Thread Pool in C++

Table of Contents

  1. Introduction
  2. How to use it
  3. Let’s build a Thread Pool
    1. Templated Class
    2. ThreadPool::Job
    3. Variables Explained
    4. The API
    5. Main Thread Loop
  4. Conclusion

Introduction

I have created a thread pool in C++ and in this article I will present and explain the code.

Before I begin, I will state that the original code I used is heavily based off of this StackOverflow answer from PhD AP EcE. If you are interested in the original, and a more simple thread pool that just works - his answer is perfect.

The original answer could only accept jobs that are a void function, that receives no arguments.

My goal was to make that thread pool much more modular, being able to accept different kinds of functions and and function signatures (all returning void, but that can be changed easily though not really needed with threads).

If you are interested in just getting the file, you can do so here and it is also available on my GitHub with a working example (which I will present here as well).

How to use it

A full main file example using the code can be found here. Note that the code uses a very basic thread safe queue to handle the printing. That class QueueTS.hpp can be found here.

I’ll be assuming you know why you need a thread pool in the first place. My example will be a simple one in which we’ll be summing a large array. We first create the function we want to execute on each of our threads. I have created the following function:

void partialSum(const std::vector<int>& ns, size_t start, size_t end, long long& result) {
  result = std::accumulate(ns.begin() + start, ns.begin() + end, 0LL);

  std::stringstream s;
  s << "Summed from " << start << " to " << end
    << " and got a total of " << result
    << " on thread " << std::this_thread::get_id();
  printQueue.push(s.str());
}

partialSum simply sums a part of the std::vector. We will have a big array in our main function, specifically of size 2202^{20} which is approx. a million. We’ll:

  1. Split it into 64 partial sums.
  2. Run all the partial sums on 4 threads.
  3. Wait for them to finish.
  4. Sum the 64 partial sums in the main thread.
  5. Output the actual sum (by doing the full sum) and the sum of the partial sums.

Again, the full code can be seen above. We have a function called fillWithRandomNumbers that does what it says to an std::vector<int>& passed to it. We also have a thread safe QueueTS.hpp file we use for the printing which is global.

Here’s our main function:

int main() {
  const size_t SIZE = 2 << 20;
  std::vector<int> numbers(SIZE);

  fillWithRandomNumbers(numbers);

  // Create a thread pool with 4 threads.
  ThreadPool<4, const std::vector<int>&, size_t, size_t, long long&> pool;

  // We want to split our code into 64 smaller chunks.
  const size_t tasks = 64;
  const size_t taskCount = SIZE / tasks;

  std::array<long long, tasks> results;
  for (size_t i = 0; i < tasks; i++) {
    results[i] = 0;
    pool.queue(partialSum, numbers, taskCount * i, taskCount * (i + 1), results[i]);
  }

  pool.start();
  while (pool.busy()) {
    std::this_thread::sleep_for(std::chrono::milliseconds(50));
  }
  pool.join();

  std::cout << "Result is:     "
    << std::accumulate(results.begin(), results.end(), 0LL)
    << "\n";
  std::cout << "Actual sum is: "
    << std::accumulate(numbers.begin(), numbers.end(), 0LL)
    << "\n";

  std::cout << "-----------------------------------------------------\n";

  while (!printQueue.empty()) {
    std::cout << printQueue.pop() << "\n";
  }
}

The main thing to focus on is our creation of the thread, in which we are passing the number of threads we are interested in, and the argument types the function is expecting to get. This gives us all the flexibility we might be interested in as we get to call any kind of function we want (that returns void).

We should also look at the API. We can see that our thread pool supports the methods queue, start, busy and join. These methods are enough for us to handle our threads. Whereas join tells all threads to stop execution of their current task and terminate, without executing any more tasks. start starts running the thread, meaning there’s an inner thread pool loop in the class.

We’ll be building all of those functions.

Let’s build a Thread Pool

First thing we’ll define is our signature, so we’ll know what we have to implement. Here’s the layout of our class:

#include <thread>
#include <mutex>
#include <functional>
#include <condition_variable>
#include <queue>
#include <array>
#include <optional>
#include <tuple>

template<size_t threadCount, typename... Args>
class ThreadPool {
public:
  typedef std::function<void(Args...)> Function;

  struct Job;

private:
  bool shouldTerminate = false;

  std::mutex queueMutex;
  std::condition_variable mutexCondition;

  std::array<threadCount, std::thread> threads;
  std::queue<Job> jobs;

public:
  ThreadPool() = default;

  ~ThreadPool();

  void start();
  void queue(const Function& func, Args... args);
  void join();
  bool busy();

private:
  void loop();
  std::optional<Job> getNextJob();
};

As you can tell we have quite a bit to cover, we’ll implement the code from top to bottom in this article for the sake of order.

Templated Class

If you are unfamiliar with templated class, or just not used to them, the definition here can be a bit a lot so let’s look at it carefully.

template<size_t threadCount, typename... Args> class ThreadPool

First, I’ll remind that the templated information is passed to the class when creating as ThreadPool<4, int, float>. The first argument we are receiving is the number of threads that we want to have. The idea of a thread pool is that we have a few threads that keep executing tasks, so we can have 1 million tasks, but still have a low number of context switches and a low number of thread overhead.

The first part is easy enough to understand. If you are unfamiliar with C++ templates, then yes, you can pass arguments like that and they will be passed as is, if you pass a number, the compiler will replace all occurances with that number and if you pass a variable, the compiler will replace all occurances with that variable.

The next part is the more tricky part. What we are saying is that, from here on out, you can pass as many types as you want into the template and we’ll store them all in Args. This is all the part says, and it just looks a bit scary. This is the trick to let us have a lot of flexibility while creating thread pools.

ThreadPool::Job

This is the part that is the most different from the original. I have defined a custom job struct for each function. The job struct is defined as such:

struct Job {
  Function task;
  std::tuple<Args...> args;

  void call() {
    std::apply(task, args);
  }
};

Note that Job is part of ThreadPool as each thread pool has it’s own kind of job that it executes tasks of. In order to start a task, we must call the call() method and it will call the function using std::apply with the arguments that were passed when the Job was created.

A small note, I didn’t find another way to do this, we’ll save the arguments as a tuple as we can pass our Args to the std::tuple template using the elipsis ... to destructure them. We are using apply here, rather than just calling the function using task(args) so we can pass it the tuple.

Variables Explained

We’ll have a look at our variables. I believe they are self explanatory, however I will explain them for the sake of completion.

The idea of the ThreadPool is we have a queue of tasks. Each thread will execute a task from it in a first come first serve (FIFO) manner. If there are no tasks to queue, the threads will sleep until they are notified of a new task, or are notified to stop terminate. The way we’ll notify them will be using the std::condition_variable. The way we’ll tell the threads to stop executing will be with shouldTerminate.

You can also see we have an std::queue of Jobs and a mutex to control threads’ access to the queue (to make it thread safe).

One note about this implementation. In the origianl, they used an std::vector to store the threads. We are using an array. This does mean that once you set the number of threads you cannot change it. If you really desire that usage, feel free to change the code accordingly.

This does mean we are limited to compile time know values, but that’s not an issue for my use cases.

The API

We’ll now cover the start, queue, join and busy methods. They are quite simple as the main part of the thread pool is the thread loop. So let’s get to it.

void start() {
  for (size_t i = 0; i < threadCount; i++) {
    threads[i] = std::thread(&ThreadPool::loop, this);
  }
}

To start the thread pool execution, we are telling all of our threads to start executing the loop function, giving it the context of this. You could call the start function directly in the constructor. Doing this, however, let’s us first queue up all of the tasks before executing them. You can also call start directly after initializing the object.

void queue(const Function& func, Args... args) {
  std::unique_lock<std::mutex> lock(queueMutex);

  Job job = { func, std::forward_as_tuple(std::forward<Args>(args)...) };
  jobs.push(job);
}

To queue a task, ignoring the syntax, we are just asking for the function handle (the same function that we have defined with ThreadPool::Function). Writing the signature like that lets us pass the arguments in the following manner.

pool.queue(func, 5, 3.14f, "Text");

This makes the code the user writes look nicer than passing an std::tuple. We are required to forward_as_tuple the arguments, but that’s just a little bit more code for the api to make the usage much nicer :D.

void join() {
  if (shouldTerminate) {
    return;
  }

  std::unique_lock<std::mutex> lock(queueMutex);
  shouldTerminate = true;
  lock.unlock();

  mutexCondition.notify_all();
  for (std::thread& activeThread : threads) {
    activeThread.join();
  }
}

joining means “tell the threads to stop whatever they are doing, and stop executing any more tasks, even if there are nore tasks”. As stated above, we can notify waiting threads that they can stop waiting for more tasks using the condition_variable. Thus, having threads waiting for new tasks, they can escape their lock.

We are getting a lock, setting the shouldTerminate to true and notify_all of the threads waiting on the queueMutex with the mutexCondition condition variable. Then we are looping over all of our threads and we are telling them to stop executing once they can using the std::thread::join method.

bool busy() {
  return !jobs.empty();
}

Lastly, for our programs (to know if we are still executing tasks), we have the busy method that returns whether the std::queue still has tasks in it.

Main Thread Loop

Now for the interesting part. Here’s the heavy lifting part of the code. Before we can actually talk about the main thread pool loop we’ll show how we are getting task to execute. We are doing so using the getNextJob defined:

std::optional<Job> getNextJob() {
  std::unique_lock<std::mutex> lock(queueMutex);
  mutexCondition.wait(lock, [this] {
    return !jobs.empty() || shouldTerminate;
  });

  if (shouldTerminate) {
    return std::nullopt;
  }

  Job job = jobs.front();
  jobs.pop();

  return job;
}

First, we’re getting a lock on the std::queue by locking our queueMutex with a std::unique_lock which gets released automatically at the end of its life (and since it’s defined on the stack, its end of life is the end of the task).

We are then waiting using std::condition_variable until we are notified and the jobs queue is not empty or we shoudlTerminate. This will get triggered by a the queue and join.

Once we finished waiting, we either need to stop executing, and then we are retuning a std::nullopt or we have another job to execute, and then we are taking it from the queue and returning it. This is another big change in the code. In the original code we are using execptions and if we need to terminate we throw a proper exception that we’ll catch in the loop method. Here we are replacing the try-catch with an std::optional return type - being an std::nullopt to terminate and a value to continue execution.

Now let’s look at the main loop as we saved the best for last.

void loop() {
  while (true) {
    std::optional job = getNextJob();

    if (!job.has_value()) {
      break; // out of the while loop.
    }

    job.value().call();
  }
}

We are asking here for the next job using the method getNextJob we just explained. This will make our thread wait until there is a new job or we need to stop execution. If the result we are getting is a std::nullopt then we break out of the while loop. Otherwise, there’s a value which is a struct Job and we can go ahead and call the call method of that struct.

This concludes the ThreadPool class and it’s implementation.

Conclusion

This is the thread pool that I am using when working on C++ projects. Now you are more than welcomed to use the code in your project or modify it to your likings.


And that concludes the code.