Introduction
I have created a thread pool in C++ and in this article I will present and explain the code.
Before I begin, I will state that the original code I used is heavily based on this StackOverflow answer from PhD AP EcE. If you are interested in the original, simpler thread pool that just works, his answer is perfect.
The original answer could only accept jobs that are void functions receiving no arguments.
My goal was to make that thread pool much more modular, being able to accept different kinds of functions and function signatures (all returning void, though that could be changed fairly easily; return values are rarely needed with threads).
If you are interested in just getting the file, you can do so here and it is also available on my GitHub with a working example (which I will present here as well).
How to use it
A full main file example using the code can be found here. Note that the code uses a very basic thread-safe queue to handle the printing. That class, QueueTS.hpp, can be found here.
I’ll be assuming you know why you need a thread pool in the first place. My example will be a simple one in which we’ll be summing a large array. We first create the function we want to execute on each of our threads. I have created the following function:
void partialSum(const std::vector<int>& ns, size_t start, size_t end, long long& result) {
    // Sum ns[start, end) and report the partial result through the global printQueue.
    result = std::accumulate(ns.begin() + start, ns.begin() + end, 0LL);
    std::stringstream s;
    s << "Summed from " << start << " to " << end
      << " and got a total of " << result
      << " on thread " << std::this_thread::get_id();
    printQueue.push(s.str());
}
partialSum simply sums a part of the std::vector. We will have a big array in our main function, specifically of size 2 << 20, which is roughly two million elements. We'll:
- Split it into 64 partial sums.
- Run all the partial sums on 4 threads.
- Wait for them to finish.
- Sum the 64 partial sums in the main thread.
- Output the actual sum (by doing the full sum) and the sum of the partial sums.
Again, the full code can be seen above. We have a function called fillWithRandomNumbers that does what its name says to an std::vector<int>&. We also have a global, thread-safe queue (QueueTS.hpp) that we use for the printing; a minimal sketch of such a queue is shown below.
Here’s our main function:
int main() {
    const size_t SIZE = 2 << 20;
    std::vector<int> numbers(SIZE);
    fillWithRandomNumbers(numbers);
    // Create a thread pool with 4 threads.
    ThreadPool<4, const std::vector<int>&, size_t, size_t, long long&> pool;
    // We want to split the work into 64 smaller chunks.
    const size_t tasks = 64;
    const size_t taskCount = SIZE / tasks;
    std::array<long long, tasks> results;
    for (size_t i = 0; i < tasks; i++) {
        results[i] = 0;
        pool.queue(partialSum, numbers, taskCount * i, taskCount * (i + 1), results[i]);
    }
    pool.start();
    while (pool.busy()) {
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
    }
    pool.join();
    std::cout << "Result is: "
              << std::accumulate(results.begin(), results.end(), 0LL)
              << "\n";
    std::cout << "Actual sum is: "
              << std::accumulate(numbers.begin(), numbers.end(), 0LL)
              << "\n";
    std::cout << "-----------------------------------------------------\n";
    while (!printQueue.empty()) {
        std::cout << printQueue.pop() << "\n";
    }
}
The main thing to focus on is our creation of the thread pool, where we pass the number of threads we want and the argument types the function expects to receive. This gives us all the flexibility we might be interested in, as we get to call any kind of function we want (as long as it returns void).
We should also look at the API. Our thread pool supports the methods queue, start, busy and join, and these are enough for us to manage our threads. join tells all threads to stop picking up new tasks and terminate, even if tasks remain in the queue. start sets the threads running, meaning there's an inner thread pool loop inside the class.
We'll be building all of those functions.
Let’s build a Thread Pool
First thing we’ll define is our signature, so we’ll know what we have to implement. Here’s the layout of our class:
#include <thread>
#include <mutex>
#include <functional>
#include <condition_variable>
#include <queue>
#include <array>
#include <optional>
#include <tuple>

template<size_t threadCount, typename... Args>
class ThreadPool {
public:
    typedef std::function<void(Args...)> Function;
    struct Job;
private:
    bool shouldTerminate = false;
    std::mutex queueMutex;
    std::condition_variable mutexCondition;
    std::array<std::thread, threadCount> threads;
    std::queue<Job> jobs;
public:
    ThreadPool() = default;
    ~ThreadPool();
    void start();
    void queue(const Function& func, Args... args);
    void join();
    bool busy();
private:
    void loop();
    std::optional<Job> getNextJob();
};
As you can tell, we have quite a bit to cover; we'll implement the code from top to bottom in this article for the sake of order.
Templated Class
If you are unfamiliar with templated classes, or just not used to them, the definition here can be a bit much, so let's look at it carefully.
template<size_t threadCount, typename... Args> class ThreadPool
First, I'll remind you that the template arguments are passed to the class when creating it, as in ThreadPool<4, int, float>. The first argument we receive is the number of threads we want to have. The idea of a thread pool is that we have a few threads that keep executing tasks, so we can have a million tasks but still keep the number of context switches and the thread-creation overhead low.
The first part is easy enough to understand. If you are unfamiliar with C++ templates: yes, you can pass arguments like that and they will be passed as is. If you pass a number, the compiler will replace all occurrences with that number, and if you pass a compile-time constant, the compiler will replace all occurrences with its value.
The next part is the trickier one. What we are saying is that, from here on out, you can pass as many types as you want into the template and we'll store them all in Args. That's all this part says; it just looks a bit scary. This is the trick that lets us have a lot of flexibility while creating thread pools.
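As a concrete, made-up illustration: a pool declared with four threads and the argument types int and float accepts any void function with that signature (report below is just a placeholder name):
// Hypothetical example: a pool of 4 threads whose jobs take (int, float).
// Function then becomes std::function<void(int, float)>.
void report(int id, float value);

ThreadPool<4, int, float> pool;
pool.queue(report, 7, 3.14f);  // any void(int, float) callable fits
pool.start();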
ThreadPool::Job
This is the part that differs the most from the original. I have defined a custom Job struct that bundles a function together with its arguments. The struct is defined as such:
struct Job {
    Function task;
    std::tuple<Args...> args;

    // Invoke the stored function with the stored arguments.
    void call() {
        std::apply(task, args);
    }
};
Note that Job is part of ThreadPool, as each thread pool has its own kind of job that it executes. In order to start a task, we must call the call() method, which invokes the function using std::apply with the arguments that were passed when the Job was created.
A small note: I didn't find another way to do this. We save the arguments as a tuple, since we can pass our Args to the std::tuple template using the ellipsis ... to expand them. We use std::apply here, rather than just calling the function with task(args), so we can pass it the tuple.
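If std::apply is new to you, here's a tiny, self-contained sketch of what it does (unrelated to the pool itself; greet is just an example function):
#include <iostream>
#include <string>
#include <tuple>

void greet(const std::string& name, int times) {
    for (int i = 0; i < times; i++) {
        std::cout << "Hi " << name << "\n";
    }
}

int main() {
    // std::apply unpacks the tuple into the function's parameters.
    std::tuple<std::string, int> args{"Ada", 2};
    std::apply(greet, args);  // same as calling greet("Ada", 2)
}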
Variables Explained
We'll have a look at our variables. I believe they are self-explanatory, but I will go over them for the sake of completeness.
The idea of the ThreadPool is that we have a queue of tasks, and each thread executes tasks from it in a first come, first served (FIFO) manner. If there are no tasks in the queue, the threads sleep until they are notified of a new task or are told to terminate. The way we notify them is the std::condition_variable. The way we tell the threads to stop executing is the shouldTerminate flag.
You can also see we have an std::queue of Jobs and a mutex to control the threads' access to the queue (to make it thread-safe).
One note about this implementation: in the original, they used an std::vector to store the threads, while we are using an std::array. This means that once you set the number of threads you cannot change it; if you really want that flexibility, feel free to change the code accordingly. It also means we are limited to compile-time known values, but that's not an issue for my use cases. A sketch of the vector-based alternative follows.
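If you do want a runtime-sized pool, the relevant members and the start method would change roughly like this (a sketch of the alternative, not the version built in this article):
// Sketch of a runtime-sized alternative: the thread count becomes a
// constructor argument instead of a template parameter.
size_t threadCount;
std::vector<std::thread> threads;

explicit ThreadPool(size_t count) : threadCount(count) {}

void start() {
    threads.reserve(threadCount);
    for (size_t i = 0; i < threadCount; i++) {
        threads.emplace_back(&ThreadPool::loop, this);
    }
}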
The API
We'll now cover the start, queue, join and busy methods. They are quite simple, as the main part of the thread pool is the thread loop. So let's get to it.
void start() {
    for (size_t i = 0; i < threadCount; i++) {
        threads[i] = std::thread(&ThreadPool::loop, this);
    }
}
To start the thread pool execution, we tell all of our threads to start executing the loop function, giving them the context of this. You could call the start function directly in the constructor. Doing it this way, however, lets us queue up all of the tasks before executing them. You can also call start directly after initializing the object.
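For illustration, both usage orders look like this (doWork here is just a placeholder void(int) function):
// doWork is a placeholder: any void(int) function or callable works.
void doWork(int value);

ThreadPool<2, int> pool;
pool.queue(doWork, 1);       // option 1: queue everything first...
pool.queue(doWork, 2);
pool.start();                // ...then start the threads

ThreadPool<2, int> otherPool;
otherPool.start();           // option 2: start first and feed tasks later;
otherPool.queue(doWork, 3);  // queue wakes a sleeping thread via the condition variable
Next, let's look at queue itself.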
void queue(const Function& func, Args... args) {
    std::unique_lock<std::mutex> lock(queueMutex);
    Job job = { func, std::forward_as_tuple(std::forward<Args>(args)...) };
    jobs.push(job);
    // Wake a sleeping thread so tasks queued after start() get picked up.
    mutexCondition.notify_one();
}
To queue a task, ignoring the syntax, we just ask for the function handle (the same function type we defined as ThreadPool::Function). Writing the signature like that lets us pass the arguments in the following manner.
pool.queue(func, 5, 3.14f, "Text");
This makes the code the user writes look nicer than passing an std::tuple. We are required to forward_as_tuple the arguments ourselves, but that's just a little more code inside the API to make the usage much nicer :D.
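For comparison, if queue simply took a tuple, the call site would look more like this (queueTuple is hypothetical and not part of the class):
// Hypothetical tuple-taking variant, shown only for comparison:
pool.queueTuple(func, std::make_tuple(5, 3.14f, "Text"));
On to join.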
void join() {
    if (shouldTerminate) {
        return;
    }
    std::unique_lock<std::mutex> lock(queueMutex);
    shouldTerminate = true;
    lock.unlock();
    mutexCondition.notify_all();
    for (std::thread& activeThread : threads) {
        activeThread.join();
    }
}
joining means "tell the threads to stop executing any more tasks and terminate, even if there are more tasks in the queue". As stated above, we can notify waiting threads that they can stop waiting for more tasks using the condition_variable; that way, threads waiting for new tasks can escape their lock.
We take a lock, set shouldTerminate to true, unlock, and notify_all the threads waiting on the mutexCondition condition variable. Then we loop over all of our threads and tell them to stop executing once they can, using the std::thread::join method.
bool busy() {
    return !jobs.empty();
}
Lastly, so our program can tell whether tasks are still waiting to be executed, we have the busy method, which returns whether the std::queue still has tasks in it.
Main Thread Loop
Now for the interesting part. Here's where the heavy lifting happens. Before we can talk about the main thread pool loop, we'll show how we get a task to execute. We do so using getNextJob, defined as:
std::optional<Job> getNextJob() {
    std::unique_lock<std::mutex> lock(queueMutex);
    mutexCondition.wait(lock, [this] {
        return !jobs.empty() || shouldTerminate;
    });
    if (shouldTerminate) {
        return std::nullopt;
    }
    Job job = jobs.front();
    jobs.pop();
    return job;
}
First, we're getting a lock on the std::queue by locking our queueMutex with an std::unique_lock, which gets released automatically at the end of its lifetime (and since it's defined on the stack, its lifetime ends when the function returns).
We then wait on the std::condition_variable until we are notified and either the jobs queue is not empty or shouldTerminate is set. The notifications come from queue and join.
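If the predicate overload of wait is unfamiliar: it is shorthand for a loop that re-checks the condition after every wake-up, which also guards against spurious wake-ups. The call above behaves like this:
// Equivalent to the predicate overload used in getNextJob:
while (jobs.empty() && !shouldTerminate) {
    mutexCondition.wait(lock);
}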
Once we finish waiting, either we need to stop executing, in which case we return std::nullopt, or we have another job to execute, in which case we take it from the queue and return it. This is another big change from the original code. In the original, exceptions were used: if we need to terminate, we throw an appropriate exception that we catch in the loop method. Here we replace the try-catch with an std::optional return type: std::nullopt to terminate, and a value to continue execution.
Now let’s look at the main loop as we saved the best for last.
void loop() {
    while (true) {
        std::optional<Job> job = getNextJob();
        if (!job.has_value()) {
            break; // out of the while loop.
        }
        job.value().call();
    }
}
Here we ask for the next job using the getNextJob method we just explained. This makes our thread wait until there is a new job or we need to stop execution. If the result we get is std::nullopt, we break out of the while loop. Otherwise, there's a value, which is a Job struct, and we can go ahead and call its call method.
This concludes the ThreadPool class and its implementation.
Conclusion
This is the thread pool that I am using when working on C++ projects. You are more than welcome to use the code in your projects or modify it to your liking. And that concludes the code.