Table of Contents
When a task isn't instant, you usually don't want to run it on the UI thread.
I present here a simple enough framework to run tasks in background.
It ensures tasks can be canceled and that the worker can be cleanly and instantly stopped guaranteeing there's no pending tasks or threads and that resources are correctly released. This is especially important if you don't want your users to wait on quitting.
If you want to try it, I made a complete runnable example which lets you start and interrupt tasks in a minimal cli application. It's available at https://github.com/Canop/blog-cancelable-background-task.
A task
First, to make it clearer, here's our starting point, a simple non cancelable task:
/// The task to execute
pub struct Task {
id: usize,
}
#[derive(Debug)]
pub enum TaskError {
// Your execution error
// Error boilerplate omitted
}
impl Task {
pub fn execute(
self,
) -> Result<(), TaskError> {
for i in 1..4 {
print(format!("Task {}: {}/4", self.id, i));
std::thread::sleep(std::time::Duration::from_secs(1));
}
Ok(())
}
}
This example task prints 4 times, sleeping one second at each point.
Such task can't be cleanly canceled. Once it's started, the execution must go till the end.
Now, let's make it cancelable.
A cancelable task
Some languages like C# and Java tried to introduce forced thread interruption. Let's just say that this didn't work well.
There's no other way, for a function to be cancelable, than to have it check, one way or another, whether it should continue working.
A perfectly OK solution is to use an Arc<AtomicBool>
, which you can update from outside the worker and check in the worker.
I usually prefer to use a channel
, which is more flexible: it allows you to send several kind of interruptions if the need arises, and it can be used while sleeping
.
The rest of this article will assume the use of crossbeam channels.
To return what happened, be it a task completion or a cancelation, Rust's Result
is perfectly suited.
Here's what our task becomes:
/// The task to execute
pub struct Task {
id: usize,
// probably other fields
}
#[derive(Debug)]
pub enum TaskError {
Interrupted,
// probably more variants
}
impl Task {
pub fn execute(self, interrupt: Receiver<()>) -> Result<(), TaskError> {
for i in 1..=4 {
log(format!("Task {}: {}/4", self.id, i));
if interrupt.recv_timeout(std::time::Duration::from_secs(1)).is_ok() {
return Err(TaskError::Interrupted);
}
}
Ok(())
}
}
Instead of just sleeping, we sleep but also watch for any interrupt coming down the channel.
Common tasks aren't usually sleeping a lot.
So instead of interrupt.recv_timeout
, you would regularly check if interrupt.is_full()
.
Checking whether an interrupt signal has arrived is very cheap so you should do it frequently for a better user experience.
The worker
We want our worker
- to accept new tasks and queue them if there aren't too many of them
- to execute queued tasks, one after the other
- to allow cancelation of the current task
- when dropped, to cancel the task, if any, then gracefully stop
Many problems don't require specific task cancelation, so you may remove the cancel_current_task
function, but you'll need a mechanism canceling tasks on worker finishing anyway.
Channels are used to communicate with the background thread, but calls to the workers are just synchronous calls to the execute
and cancel_current_taks
function.
The worker thread, created in Worker::new
waits until there's either a call to die or a new task.
The task is executed on the worker's thread.
use {
crate::*,
crossbeam::channel::{self, select, Sender},
std::thread,
};
/// Maximum number of tasks that can be queued.
/// If this limit is reached, no new task will be queued.
///
/// If you don't want to drop tasks, use `channel::unbounded`
/// instead of `channed::bounded` when creating `s_task`
const MAX_QUEUED_TASKS: usize = 3;
/// Manage a thread to execute tasks
pub struct Worker {
thread: Option<thread::JoinHandle<()>>,
s_die: Option<Sender<()>>, // to send the die signal
s_cancel: Sender<()>, // to send interrupt to the current task
s_task: Sender<Task>, // to send tasks to the worker thread
}
impl Worker {
pub fn new() -> Self {
let (s_task, r_task) = channel::bounded::<Task>(MAX_QUEUED_TASKS);
let (s_die, r_die) = channel::bounded(1);
let (s_cancel, r_cancel) = channel::bounded(1);
let thread = thread::spawn(move || {
loop {
select! {
recv(r_die) -> _ => {
log("worker thread is stopping");
break;
}
recv(r_task) -> ps => {
match ps {
Ok(task) => {
if !r_die.is_empty() { continue; }
let _ = r_cancel.try_recv(); // clean any unconsumed cancel
match task.execute(r_cancel.clone()) {
Ok(()) => {
log("Task done");
}
Err(TaskError::Interrupted) => {
log("Task interrupted");
}
Err(e) => {
log(format!("Task error: {}", e));
}
}
}
Err(e) => {
log(format!("Channel error: {}", e)); // unlikely
break;
}
}
}
}
}
});
Self {
thread: Some(thread),
s_die: Some(s_die),
s_task,
s_cancel,
}
}
/// Request a task execution, unless too many of them are already queued
pub fn execute(&self, task: Task) {
if self.s_task.try_send(task).is_ok() {
log("Queued task");
} else {
log("Too many tasks in the queue, dropping one");
}
}
/// Request the current task to be interrupted
pub fn cancel_current_task(&self) {
let _ = self.s_cancel.try_send(());
}
/// Make the worker thread stop /// (interrupting the current task if any)
pub fn die(&mut self) {
self.cancel_current_task(); // interrupt current task if any
if let Some(sender) = self.s_die.take() {
let _ = sender.send(());
}
if let Some(thread) = self.thread.take() {
if thread.join().is_err() {
log("child_thread.join() failed"); // should not happen
} else {
log("Worker gracefully stopped");
}
}
}
}
impl Drop for Worker {
fn drop(&mut self) {
self.die();
}
}
The devil is in the details, so I'll explain them below.
Task cancelation
Canceling the current task is straightforward:
When worker.cancel_current_task()
is called, a signal is sent to the s_cancel
channel.
The task's execute
function has a receiver for that channel, and stops when it receives the signal.
The signal is sent with try_send
instead of send
so that it doesn't block if the channel is full.
There's no need to queue cancelations.
The small detail to pay attention to here is that there might be a cancel signal already present in the channel when launching a new task, either because the previous task finished on its own without checking the channel, or because the signal was launched too late.
There are several ways to deal with that.
For example it's possible to send the id of the task to stop and to have the task check it was the intended recipient.
But here, with no task parallelism, the simplest solution is to just remove any unconsumed signal in this channel when starting the task. This is done with
let _ = r_cancel.try_recv(); // clean any unconsumed cancel
Worker end
The worker may be explicitely stopped with worker.die()
but for better ergonomics this function is also called in the Drop
implementation: when the scope of the worker ends, it's cleanly stopped.
This function does the following operations:
- it sends the cancelation signal to any running task
- it sends the die signal, and removes the sender from the struct: the worker dies only once
- then it waits for the thread to end on its own, ensuring the task can cleanly end without having the thread disappear at the wrong time
So, what happens ?
If no taks is running, the recv(r_die) -> _ => {
arm of the select!
is immediately triggered and the loop breaks.
If a task is running, the thread will be executing the match task.execute(r_cancel.clone())
expression until the task ends.
Then, in the next iteration of the loop, the select!
is expected to fall on the recv(r_die) -> _ => {
arm and then break the loop.
Where's the trap ?
There may be several tasks queued for execution. The arm order doesn't mean the next iteration of the loop will select the die arm just because there's a signal in the channel. If tasks were queued before, the "execute task" arm will be triggered first.
A solution is to just not execute them if there's a signal in the die channel. That's the purpose of this test:
if !r_die.is_empty() { continue; }
Go further
Make it a lib, and the worker generic on the task
My advice here is "Don't".
With experience, you'll notice that every time you need such worker the problem is a little different.
You need to know and understand a working pattern, and I even suggest you start from one you know to be working, but, codewise, the reusable parts are the standard thread library and a channel one.
Return a value at end of task, or during computations
Sometimes, your background task must return a value at end, or a task report on interruption.
Broot, for example, searches the disk in background and return the filtered file tree in response, but stops for a different search as soon as the user types a key.
In some cases, you may even want to return information during task execution. This is for example the case of bacon which runs programs in background and displays diagnostics in real time.
In both case, channels are used. A simple solution is for the worker to return a struct wrapping a channel in which the task can send its production. This struct can also wrap a specific interrupt channel.
Rust async
In Rust async, for example with tokio, things are a little different but the concepts are the same.
The Task
especially wouldn't change apart the async
keyword and the use of async-channel instead of crossbeam.
The worker would be quite similar but with no explicit thread.
If you're interested, tell me and I'll make a follow-up article with a tokio focus.