Graceful keyboard shutdown of thread pool in Rust

While writing a Discord bot with serenity-rs, I wanted a thread pool to process the events coming from Discord. The architecture looked something like this:

use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread::{self, JoinHandle};

enum Command {/* snip */}

struct WorkerPool {
    receiver: Option<JoinHandle<()>>,
    sender: Option<Sender<Command>>,
}

impl WorkerPool {
    fn new() -> Self {
        let (tx, rx) = channel();

        let receiver_thread = thread::spawn(move || {
            let pool = rayon::ThreadPoolBuilder::new()
                .num_threads(8)
                .build()
                .unwrap();

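            while let Ok(command) = rx.recv() {
                /* do matching logic */
                // `move` hands ownership of `command` to the pool job;
                // do_command is the bot's own function (not shown).
                pool.spawn(move || do_command(command));
            }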
            eprintln!("Worker loop done.");
        });

        Self {
            receiver: Some(receiver_thread),
            sender: Some(tx),
        }
    }

    fn tx(&self) -> Sender<Command> {
        // Clone through a reference: `self.sender.unwrap()` would try to move out of `&self`.
        self.sender.as_ref().unwrap().clone()
    }
}

/* Ensure thread is joined when WorkerPool goes out of scope. */
impl Drop for WorkerPool {
    fn drop(&mut self) {
        // Drop our own Sender first so that recv() can fail once all clones are gone.
        drop(self.sender.take());
        if let Some(handle) = self.receiver.take() {
            let _ = handle.join();
        }
        eprintln!("dropped worker pool thread");
    }
}

fn main() {
    let worker_pool = WorkerPool::new();
    let discord = Discord::new(worker_pool.tx());

    // This blocks the main thread!
    discord.start();
    eprintln!("Good bye!");
}

This code does a couple of things:

  • It creates a worker pool of 8 threads plus one receiver thread. The receiver thread is lightweight: it only forwards each incoming message to the pool.
  • The WorkerPool struct keeps its own instance of Sender. The .recv() call returns an error only once every Sender has been dropped, and that ends the loop inside the receiver thread. So even after WorkerPool drops its own Sender, the receiver thread stays alive as long as any cloned Sender is still around.
  • By default in Rust, spawned threads are detached: they can outlive the thread that spawned them. That’s why I implement Drop for WorkerPool: it joins the receiver thread so that it finishes before the main thread does. It is very important to drop the Sender before joining the receiver thread; otherwise recv() never fails and the join deadlocks unconditionally. In this code, that is done by taking the Sender out of its Option and dropping the value immediately.
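The shutdown mechanics described above can be tried without rayon or serenity-rs. Below is a minimal std-only sketch, not the code from my bot: one plain std::thread per command stands in for the rayon pool, and the Command::Work variant and the run helper are hypothetical. It shows recv() failing only after every Sender is gone, and Drop releasing its own Sender before joining.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Sender};
use std::sync::Arc;
use std::thread::{self, JoinHandle};

// Hypothetical command type; the real variants are elided in the post.
enum Command {
    Work(usize),
}

struct WorkerPool {
    receiver: Option<JoinHandle<()>>,
    sender: Option<Sender<Command>>,
}

impl WorkerPool {
    // `total` is an illustrative counter that makes the processed work observable.
    fn new(total: Arc<AtomicUsize>) -> Self {
        let (tx, rx) = channel::<Command>();
        let receiver_thread = thread::spawn(move || {
            let mut jobs = Vec::new();
            // recv() returns Err only once every Sender has been dropped.
            while let Ok(command) = rx.recv() {
                let job_total = Arc::clone(&total);
                // One std thread per command stands in for rayon's pool.
                jobs.push(thread::spawn(move || match command {
                    Command::Work(amount) => {
                        job_total.fetch_add(amount, Ordering::SeqCst);
                    }
                }));
            }
            // Wait for in-flight jobs before the receiver thread exits.
            for job in jobs {
                let _ = job.join();
            }
        });
        Self {
            receiver: Some(receiver_thread),
            sender: Some(tx),
        }
    }

    fn tx(&self) -> Sender<Command> {
        // Clone through a reference instead of moving out of `&self`.
        self.sender.as_ref().expect("sender already dropped").clone()
    }
}

impl Drop for WorkerPool {
    fn drop(&mut self) {
        // Drop our own Sender first; otherwise join() below could wait forever.
        drop(self.sender.take());
        if let Some(handle) = self.receiver.take() {
            let _ = handle.join();
        }
    }
}

// Sends the commands Work(0)..Work(n-1) and returns the sum the workers accumulated.
fn run(n: usize) -> usize {
    let total = Arc::new(AtomicUsize::new(0));
    {
        let pool = WorkerPool::new(Arc::clone(&total));
        let tx = pool.tx();
        for i in 0..n {
            tx.send(Command::Work(i)).expect("receiver thread is alive");
        }
        // `tx` was declared after `pool`, so it is dropped first.
    }
    total.load(Ordering::SeqCst)
}

fn main() {
    println!("sum of processed work: {}", run(5));
}
```

Because tx is declared after pool inside run, it is dropped first, so by the time Drop joins the receiver thread no Sender is left alive and recv() returns Err.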

When I ran this code and stopped it with Ctrl+C, I expected everything to work properly: I should have seen the “Worker loop done.”, “dropped worker pool thread” and “Good bye!” messages. Unfortunately, that was not the case.

Why is this code broken?

Keyboard interrupts are generally delivered as so-called signals (SIGINT on Unix-like systems), whose default action is to terminate the process immediately. Rust does not override this behavior, which means no destructors are called and no unwinding happens. As a result, our threads are never joined gracefully; they are simply torn down by the operating system when it cleans up the process.

To fix such a program, you need to catch the interrupt signal and use it to unblock the main thread. Once main returns normally, its local variables are dropped and everything gets cleaned up correctly.
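Returning from main is enough because Rust drops local variables in reverse declaration order. This small sketch uses a hypothetical Noisy type that only logs its own drop, standing in for worker_pool and discord from main above:

```rust
use std::cell::RefCell;
use std::rc::Rc;

// Hypothetical stand-in that only records the moment it is dropped.
struct Noisy {
    name: &'static str,
    log: Rc<RefCell<Vec<&'static str>>>,
}

impl Drop for Noisy {
    fn drop(&mut self) {
        self.log.borrow_mut().push(self.name);
    }
}

// Mirrors the shape of main(): worker_pool is declared first, discord second.
fn drop_order() -> Vec<&'static str> {
    let log = Rc::new(RefCell::new(Vec::new()));
    {
        let _worker_pool = Noisy { name: "worker_pool", log: Rc::clone(&log) };
        let _discord = Noisy { name: "discord", log: Rc::clone(&log) };
        // discord.start() would block here until the interrupt handler unblocks it.
    }
    // Bind to a local so the RefCell borrow ends before `log` is dropped.
    let order = log.borrow().clone();
    order
}

fn main() {
    println!("{:?}", drop_order()); // ["discord", "worker_pool"]
}
```

The order matters in the bot: discord holds a cloned Sender, so it has to be dropped before WorkerPool's Drop joins the receiver thread. With the opposite order, that join would wait on a Sender that is still alive and deadlock.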

Cross-platform fix: ctrlc crate

I spent quite some time thinking about how to do this in a cross-platform way, especially since I know nothing about how signal handling works on the Windows family of operating systems. Fortunately, there is a very nice crate for exactly this task called ctrlc.

In my case, serenity-rs offers a way to unblock the main thread after the client has been started: the shutdown_all method of the client's ShardManager. Since it can be called through a shared reference (the shard manager sits behind an Arc), the code is very straightforward:

/* In Cargo.toml:
 * [dependencies]
 * ctrlc = "3.1"
 */
fn main() {
    let worker_pool = WorkerPool::new();
    let discord = Discord::new(worker_pool.tx());

    // Gracefully shutdown on keyboard interrupt:
    // Shard manager is behind Arc:
    // https://docs.rs/serenity/0.8.6/serenity/client/struct.Client.html#structfield.shard_manager
    let shard_manager = discord.serenity_client.shard_manager.clone();
    ctrlc::set_handler(move || shard_manager.lock().shutdown_all())
        .expect("Error setting Ctrl-C handler");

    // This blocks the main thread!
    discord.start();
    eprintln!("Good bye!");
}

ctrlc::set_handler replaces the default behavior of the SIGINT signal (and optionally of SIGTERM, behind the termination feature flag). Here, the new handler disconnects the serenity-rs Client from Discord's servers, which allows the start() method to return.
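The same idea can be sketched with the standard library alone. DummyClient below is a hypothetical stand-in, not serenity's API: start() blocks on a Condvar until shutdown_all() is called from another thread, and a helper thread plays the role of the ctrlc handler firing after a delay.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

// Hypothetical stand-in for a client whose start() blocks until shutdown.
// This is NOT serenity's API; it only mimics the shape used in main().
#[derive(Clone, Default)]
struct DummyClient {
    stopped: Arc<(Mutex<bool>, Condvar)>,
}

impl DummyClient {
    // Blocks the calling thread until shutdown_all() has been called.
    fn start(&self) {
        let (lock, cvar) = &*self.stopped;
        let mut stopped = lock.lock().unwrap();
        while !*stopped {
            stopped = cvar.wait(stopped).unwrap();
        }
    }

    // Works through a shared reference, so any thread holding a clone can call it.
    fn shutdown_all(&self) {
        let (lock, cvar) = &*self.stopped;
        *lock.lock().unwrap() = true;
        cvar.notify_all();
    }

    fn is_stopped(&self) -> bool {
        *self.stopped.0.lock().unwrap()
    }
}

// A helper thread plays the role of the ctrlc handler firing after `delay`.
fn run_until_interrupted(delay: Duration) -> bool {
    let client = DummyClient::default();
    let handle = client.clone();
    let interrupter = thread::spawn(move || {
        thread::sleep(delay);
        handle.shutdown_all();
    });
    client.start(); // returns only after shutdown_all()
    interrupter.join().unwrap();
    client.is_stopped()
}

fn main() {
    let stopped = run_until_interrupted(Duration::from_millis(50));
    eprintln!("start() returned, stopped = {}", stopped);
}
```

The while loop around cvar.wait guards against spurious wakeups, and if shutdown_all() runs before start() takes the lock, start() simply sees the flag already set and returns at once.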

Importantly, the closure you pass to set_handler must be thread-safe (it has to be FnMut() + Send + 'static). Signal handlers are hard to get right: inside one, you may safely perform only so-called reentrant (async-signal-safe) operations. Under the hood, ctrlc spawns a separate thread that runs your closure, so your (possibly non-reentrant) code executes outside the actual signal handler.
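A rough, std-only model of that split is sketched below. The names on_signal and spawn_handler_thread are hypothetical, not ctrlc's API, and the thread polls a flag for simplicity; the real crate blocks instead of polling (on Unix its signal handler writes to a pipe that the handler thread reads) and calls your closure on every interrupt rather than once. Since std cannot install signal handlers, the demo calls on_signal directly.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::Duration;

// Hypothetical "signal side": nothing but a lock-free atomic store,
// the kind of minimal work that is acceptable inside a real signal handler.
fn on_signal(flag: &AtomicBool) {
    flag.store(true, Ordering::SeqCst);
}

// Hypothetical analogue of ctrlc::set_handler (NOT its real implementation):
// a dedicated thread waits for the flag, then runs the user's closure on an
// ordinary thread, where locking, allocating and printing are all fine.
fn spawn_handler_thread<F>(flag: Arc<AtomicBool>, mut user_handler: F) -> JoinHandle<()>
where
    F: FnMut() + Send + 'static,
{
    thread::spawn(move || {
        // Simplification: poll the flag. The real crate blocks instead.
        while !flag.swap(false, Ordering::SeqCst) {
            thread::sleep(Duration::from_millis(5));
        }
        // Simplification: run once; ctrlc calls the handler on every interrupt.
        user_handler();
    })
}

fn demo() -> Vec<String> {
    let flag = Arc::new(AtomicBool::new(false));
    let log = Arc::new(Mutex::new(Vec::new()));
    let handler_log = Arc::clone(&log);
    let handler = spawn_handler_thread(Arc::clone(&flag), move || {
        // Not async-signal-safe (locks, allocates), but this runs on a normal thread.
        handler_log.lock().unwrap().push(String::from("shutting down"));
    });
    on_signal(&flag); // stands in for the OS delivering SIGINT
    handler.join().unwrap();
    let entries = log.lock().unwrap().clone();
    entries
}

fn main() {
    println!("{:?}", demo());
}
```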

tl;dr

  • Rust keeps the default signal-handling behavior, so a keyboard interrupt abruptly terminates your process without running destructors for your data.
  • ctrlc allows you to override the default behavior of keyboard interrupts in a cross-platform way.
  • Remember that the code you pass to set_handler needs to be thread-safe.
  • In the case of serenity-rs, there is a way to shut down the client in a thread-safe manner. You may need to find an equivalent for your use case. Common techniques include sending a special quit command through a channel, setting an atomic flag, or calling explicit (non-RAII) shutdown methods exposed by your types.
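Here is a minimal sketch of the quit-command technique from the last point, with hypothetical Print and Quit variants. Unlike the WorkerPool shutdown, which waits for every Sender to be dropped, an explicit Quit message stops the loop even while other Sender clones are still alive, so a clone moved into a ctrlc handler could send it.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle};

// Hypothetical commands: Quit ends the worker loop explicitly.
enum Command {
    Print(String),
    Quit,
}

fn spawn_worker() -> (Sender<Command>, JoinHandle<Vec<String>>) {
    let (tx, rx) = channel();
    let handle = thread::spawn(move || {
        let mut seen = Vec::new();
        // Stops on Quit, or when every Sender has been dropped.
        while let Ok(command) = rx.recv() {
            match command {
                Command::Print(text) => seen.push(text),
                Command::Quit => break,
            }
        }
        seen
    });
    (tx, handle)
}

fn run_with_quit() -> Vec<String> {
    let (tx, worker) = spawn_worker();
    // A clone like this one could be moved into a ctrlc handler closure.
    let handler_tx = tx.clone();
    tx.send(Command::Print(String::from("hello"))).unwrap();
    handler_tx.send(Command::Quit).unwrap();
    // `tx` is still alive here, yet the worker stops because it saw Quit.
    let seen = worker.join().unwrap();
    drop(tx);
    seen
}

fn main() {
    println!("{:?}", run_with_quit());
}
```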

Sample GitHub code

I’ve put a simplified version of this post's code on my GitHub (serenity-rs is replaced there with a dummy structure that has a similar interface). Feel free to clone it and tinker with it as you please!


Marcin Grzywaczewski

713 Words

2020-07-03 02:00 +0200