Concurrency

_images/concurrency.png

Concurrencey is the ability to run mutiple tasks simultaneously. This concept becomes more and more important as the number of cores in CPUs increases. In a single core CPU, the operating system can switch between tasks so fast that it appears that they are running simultaneously. However, in a multi-core CPU, the tasks can actually run simultaneously. As the clock frequencies of modern processor are not increasing as fast as they used to, the number of cores in CPUs is increasing. This means that in the future, the performance of a program will depend on how well it can utilize multiple cores. This is why concurency is important.

In C++ this can be acomplished using threads or processes. A process is an independent execution unit that has its own memory space. Processes are typically applications runing on your computer like a web browser or a text editor. Threads are similar to processes, but they share the same memory space. This means that threads can communicate with each other by reading and writing to the same memory locations. Threads are typically used for tasks that are closely related and need to share data.

In this chapter we are mainly going focus thread based concurrency. Before C++11 threads were not part of the standard library and you had to rely on platform specific libraries for this functionality. On Linux you mainly used the POSIX threads library (pthread) and on Windows you had to use the Windows API.

C++11 introduced the std::thread class which is a part of the standard library. This class is used to create and manage threads. C++20 introduced the std::jthread class which is similar to std::thread, but it has some additional features which we will cover in the following sections.

Threading basics

The basic way of creating a thread is to create a thread object and pass a function to it. The passed function will be executed on the new thread. Here is an example:

#include <print>
#include <thread>
#include <chrono>

void myfunc()
{
    std::print("Thread {} starting.\n", std::this_thread::get_id());
    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::print("Thread {} stopping.\n", std::this_thread::get_id());
}

int main()
{
    std::jthread t(myfunc);
    t.join();
    std::println("Main thread stopping.\n");
}

Try example

In the example above we have defined a function, myfunc() that prints its id and waits for a second and the prints that it is stopping. To executed this function on a thread, we create a std::jthread object t and pass the function to it. The thread will execute immediately when created. As the thread is executed concurrently the main routine can terminate before the thread function terminates, which can lead to undefined behavior. To solve this the thread object has special method, .join(), which can be used to wait for the thread to finish.

Note

When working with threads and output, always use std::print() or std::println() from C++23 instead of std::cout. The std::print family of functions are thread-safe and guarantee that output from different threads won’t be interleaved. Using std::cout from multiple threads without synchronization can result in garbled output.

Managing Multiple Threads

To illustrate how threads can be used in computational tasks we will implement an algorithm calculating a sum of integers.

\[\sum_{k=0}^{n} k\]

This has the closed-form solution of:

\[\sum_{k=0}^{n} k = \frac{n(n+1)}{2}\]

By having a closed-form solution we can also easily check the results of our algorithm.

In our algorithm we need a function that can calculate a part of a sum in a range. We also want it to be able to store its sum somewhere. The following code shows how this function could be implemented.

void sum(const int a, const int b, const int id, long long *sums)
{
    long long localSum = 0;

    for (int i = a; i <= b; i++)
        localSum += i;

    sums[id] = localSum;
}

The sums array contains the sums for each separate calculcation. id is the index of the current calculation performed. Now we want to create a number of threads each performing a sum over a range in the total series.

In our complete program we will use the std::print() function and std::vector for output and storing std::jthread and sums. To do this we need the following includes:

#include <thread>
#include <vector>
#include <print>

The first thing we will do in our main program is to define constants for our total range, that is from 0 to 100000000. As they will not change during the computation we declare them as const.

const int a = 0;
const int b = 100000000;

Next, we need to determine how many threads we are going to use. To be useful we will use a minimum of 8 up to the total of threads possible in the system. To calculate this we will use the std::min() function combined with the std::thread::hardware_concurrency() method in the thread library. We declare a constant, numThreads, which we assign the actual value. We also use the auto keyword so that we don’t have to bother with the actual datatype returned from the std::min() function.

const auto numThreads = std::min(8u, std::thread::hardware_concurrency());

To be able to handle multiple threads we can use a std::vector to store the jthread instances in. We declare an array threads that will hold our created threads.

std::vector< std::jthread > threads;

To handle the individual sums that the threads will comput we will declare a std::vector, sums for holding the results. We resize this vector to the number of threads being used.

std::vector< long long > sums;
sums.resize(numThreads);

Next, we will create some required constants for determining the ranges each thread will calculcate and som variables used in our thread creation loop.

const int chunkSize = (b - a + 1) / numThreads;
const int remainder = (b - a + 1) % numThreads;

std::print("Summing from {0} to {1} using {2} threads\n", a, b, numThreads);
std::print("Chunk size = {0}, Remainder = {1}\n", chunkSize, remainder);

int i0, i1;

In our thread creation loop we first calculcate i0 and i1 then we create our threads using the .emplace_back() method which will automatically call the thread constructor with the function pointer as well as the function arguments. The thread will immediately start executing the sum() function at this point.

for (auto i = 0; i < numThreads; i++)
{
    i0 = a + chunkSize * i + std::min(i, remainder);
    i1 = i0 + chunkSize - 1;

    if (i < remainder)
        i1++;

    std::print("Thread {0}: i0 = {1}, i1 = {2} (count = {3})\n", i, i0, i1, i1 - i0 + 1);
    threads.emplace_back(sum, i0, i1, i, sums.data());
}

We now have several threads running and computing our sum. Before we can calculate the total sum we need to make sure all threads have finished executing. This can be done using the .join() method on the jthread object. Waiting for all our threads can easily be done using another simple loop as shonw in the following code:

std::print("Waiting for completion...\n");

for (auto& thread : threads)
    thread.join();

This can look somewhat strange that we are sequentially waiting for the threads, but remember that the threads are executing concurrently and we need to wait for all of them. If the first thread we join take longer the other calls to the .join() method will return directly as the other threads have already completed.

We are now ready to compute our total sum and a comparison with the closed-form result using the following code:

long long totalSum = 0;
auto i = 0;

for (auto& sum : sums)
{
    totalSum += sum;
    std::print("Sum {0} = {1}\n", i, sums[i++]);
}

std::print("Total sum = {0}\n", totalSum);

long long expected = (long long)(b - a + 1) * (a + b) / 2;
std::print("Expected sum = {0}\n", expected);
std::print("Difference = {0}\n", totalSum - expected);

Note

The implementation of the current algorithm is considered thread safe as the sum function only write to a unique id in the sums vector.

The complete code is shown below:

#include <thread>
#include <vector>
#include <print>

void sum(const int a, const int b, const int id, long long *sums)
{
    long long localSum = 0;

    for (int i = a; i <= b; i++)
        localSum += i;

    sums[id] = localSum;
}

int main()
{
    const int a = 0;
    const int b = 100000000;

    const auto numThreads = std::min(8u, std::thread::hardware_concurrency());

    std::vector< std::jthread > threads;

    std::vector< long long > sums;
    sums.resize(numThreads);

    const int chunkSize = (b - a + 1) / numThreads;
    const int remainder = (b - a + 1) % numThreads;

    std::print("Summing from {0} to {1} using {2} threads\n", a, b, numThreads);
    std::print("Chunk size = {0}, Remainder = {1}\n", chunkSize, remainder);

    int i0, i1;

    for (auto i = 0; i < numThreads; i++)
    {
        i0 = a + chunkSize * i + std::min(i, remainder);
        i1 = i0 + chunkSize - 1;

        if (i < remainder)
            i1++;

        std::print("Thread {0}: i0 = {1}, i1 = {2} (count = {3})\n", i, i0, i1, i1 - i0 + 1);
        threads.emplace_back(sum, i0, i1, i, sums.data());
    }

    std::print("Waiting for completion...\n");

    for (auto& thread : threads)
        thread.join();

    long long totalSum = 0;
    auto i = 0;

    for (auto& sum : sums)
    {
        totalSum += sum;
        std::print("Sum {0} = {1}\n", i, sums[i++]);
    }

    std::print("Total sum = {0}\n", totalSum);

    long long expected = (long long)(b - a + 1) * (a + b) / 2;
    std::print("Expected sum = {0}\n", expected);
    std::print("Difference = {0}\n", totalSum - expected);

    return 0;
}

Try example

Synchronization and Mutexes

We have now covered some basic usage of threads in C++. The examples shown have been well-behaved, not writing to the same memory locations and not shared any information between threads. What happens if threads are accessing the same memory locations or variables? If threads are only reading data from shared resources are usually not a problem. The problems come when they are going to write data in the same location. You can have multiple threads running concurrently that needs to update a counter at the same time. The update operation reading, modifying and writing back can not be acomplished atomically. That is a one thread can read at the same time as another thread writes resulting in an unknown variable state. To handle this situation we need to be able to synchronise access to the shared resource. This can be done using mutexes in C++. A mutex is an object that only one thread at a time can access. It works like a lock. When a thread wants to access a resource it calls the .lock() method on the mutex, which “locks” access to the resource until the thread calls .unlock() method. If another thread has locked the mutex the .lock() method will block and wait until it is available. Preventing the thread from mosifying the shared resource.

std::mutex should not be used directly as it is not exception safe. If an exception is thrown inside the code between .lock() and .unlock() calls will lead that the mutex will be locked forever. To solve this, mutexes should always be used together with a so called lock guard (std::lock_guard). This is a special class that owns a mutex and automatically calls .unlock() when it goes out of scope.

To illustrate synchronisation we are going to implement an application that uses threads that updates a global counter. To illustrate both with and without synchronisation we are going to implement the thread function with and without mutexes.

First we implement a global counter and a mutex:

#include <vector>
#include <thread>
#include <mutex>
#include <print>

// Shared resource that multiple threads will modify
int g_counter = 0;
std::mutex g_counter_mutex;

The std::mutex class is available in the <mutex> include. Next we implement two versions of the counter update functions. First the naive version that just updates the global variable g_counter.

void increment_counter_unsafe(int iterations)
{
    for (auto i = 0; i < iterations; i++)
        g_counter++;
}

In the safe version of the function we use a lock guard using std::lock_guard class. When you create the lock guard object it holds the lock of the selected mutex until it goes out of scope. In this function we aquire the lock in the scope of the for-loop just before we update the g_counter variable. When the loop-block goes out of scope the lock instance will release the lock and call .unlock on the mutex object. The safe version of the function then becomes:

void increment_counter_safe(int iterations)
{
    for (auto i = 0; i < iterations; i++)
    {
        std::lock_guard<std::mutex> lock(g_counter_mutex);
        g_counter++;
    }
}

In out main function we first start 10 threads using the unsafe function. We then print out a comparison of the number of increments we have on the g_counter variable compared to the expected increments.

const int num_threads = 10;
const int iterations_per_thread = 100000;

// Test 1: Without mutex (race condition)

std::print("Test 1: WITHOUT mutex protection");
g_counter = 0;

{
    std::vector<std::jthread> threads;
    for (int i = 0; i < num_threads; i++)
        threads.emplace_back(increment_counter_unsafe, iterations_per_thread);
}

std::println("Expected: {}", num_threads * iterations_per_thread);
std::println("Actual:   {}", g_counter);
std::println("Lost updates: {}", num_threads * iterations_per_thread - g_counter);

The extra brackets around the thread creation loop can look a bit odd, but it make sures that the .join() methods are called on the jthread objects and that the threads are completed before continuing.

The safe increment function is called in the same way.

std::println("Test 2: WITH mutex protection (using lock_guard)");
g_counter = 0;

{
    std::vector<std::jthread> threads;
    for (int i = 0; i < num_threads; i++)
        threads.emplace_back(increment_counter_safe, iterations_per_thread);
}

std::println("Expected: {}", num_threads * iterations_per_thread);
std::println("Actual:   {}", g_counter);
std::println("Lost updates: {}", num_threads * iterations_per_thread - g_counter);

return 0;

If we run the code we get the following output:

Test 1: WITHOUT mutex protection
Expected: 1000000
Actual:   500000
Lost updates: 500000
Test 2: WITH mutex protection (using lock_guard)
Expected: 1000000
Actual:   1000000
Lost updates: 0

This example clearly shows that is very important to synchronising access to resources when multiple threads are modifying them. The complete example is shown below.

#include <iostream>
#include <vector>
#include <thread>
#include <mutex>
#include <print>


// Shared resource that multiple threads will modify
int g_counter = 0;
std::mutex g_counter_mutex;

void increment_counter_unsafe(int iterations)
{
    for (auto i = 0; i < iterations; i++)
        g_counter++;
}

void increment_counter_safe(int iterations)
{
    for (auto i = 0; i < iterations; i++)
    {
        std::lock_guard<std::mutex> lock(g_counter_mutex);
        g_counter++;
    }
}

int main()
{
    const int num_threads = 10;
    const int iterations_per_thread = 100000;
    
    // Test 1: Without mutex (race condition)
 
    std::println("Test 1: WITHOUT mutex protection");
    g_counter = 0;
    
    {
        std::vector<std::jthread> threads;
        for (int i = 0; i < num_threads; i++)
            threads.emplace_back(increment_counter_unsafe, iterations_per_thread);
    }

    std::println("Expected: {}", num_threads * iterations_per_thread);
    std::println("Actual:   {}", g_counter);
    std::println("Lost updates: {}", num_threads * iterations_per_thread - g_counter);
    
    // Test 2: With mutex (correct)
    std::println("Test 2: WITH mutex protection (using lock_guard)");
    g_counter = 0;
    
    {
        std::vector<std::jthread> threads;
        for (int i = 0; i < num_threads; i++)
            threads.emplace_back(increment_counter_safe, iterations_per_thread);
    }
    
    std::println("Expected: {}", num_threads * iterations_per_thread);
    std::println("Actual:   {}", g_counter);
    std::println("Lost updates: {}", num_threads * iterations_per_thread - g_counter);
    
    return 0;
}

Try example

Practical Parallel Computing

To illustrate the limits of threading, when it works and when it doesn’t, we are going to implement 2 similar applications. One that implements the classical SAXPY algorithm and another that is more compute heavy.

Implementing SAXPY using threads and lambda-functions

The SAXPY operation is a very common BLAS operation, which performs the following operation:

\[\mathbf{y} = \alpha \mathbf{x} + \mathbf{y}\]

A serial naive version of this operation can be implemented in C++ using the following function:

void saxpy_serial(double z[], double a, double x[], double y[], long n)
{
    std::println("Performing SAXPY serially.");

    for (long i = 0; i < n; i++)
        z[i] = a * x[i] + y[i];
}

It loops over all elements in x and y and computes the elements in the z array.

In the parallel version we are going to use jthreads combined with a lamda function for the actual SAXPY operation. Also, we are going to hide all the threading inside the parallel version of the function, so the function will have the same arguments as the serial version.

void saxpy(double z[], double a, double x[], double y[], long n)

Next we define the number of threads to use and setup our lambda function. Note that we use std::min() and std::thread::hardware_concurrency() to determine the number of threads we are going to use. The lambda function is quite simple as it can operate directly on the incoming arrays. The arguments are the start and end positions which will be provided for each thread. We can see here that we can reuse our lambda function for multiple threads.

const auto numThreads = std::min(8u, std::thread::hardware_concurrency());

std::println("Performing SAXPY with {0} threads.", numThreads);

auto lambda = [z, a, x, y](long start, long end) {
    for (long i = start; i < end; i++)
        z[i] = a * x[i] + y[i];
};

In the next step we create a vector of our threads and a loop to compute the range for each thread and create each thread using the .emplace_back() method of the vector.

std::vector<std::jthread> threads;
long chunkSize = n / numThreads;

std::print("Chunk size = {0}\n", chunkSize);

for (unsigned int i = 0; i < numThreads; i++)
{
    long start = i * chunkSize;
    long end = (i == numThreads - 1) ? n : (i + 1) * chunkSize;
    threads.emplace_back(lambda, start, end);
}

But where do we wait on thread completion? As we are implementing this in a function with its own scope we can rely on the std::jsthread destructor to do an automatic join when the object goes out of scope, which happens at the end of the function.

In the main program we allocate our arrays using unique pointers avoiding new and delete. We also initalise the arrays to some values.

const int N = 200000000; // 100 million elements

std::print("Allocating arrays of size N = {0}...\n", N);
std::print("Allocated size in MB = {0}\n", (3 * N * sizeof(double)) / (1024.0 * 1024.0));

auto x = std::make_unique<double[]>(N);
auto y = std::make_unique<double[]>(N);
auto z = std::make_unique<double[]>(N);

for (auto i = 0; i < N; i++)
{
    x[i] = static_cast<double>(i);
    y[i] = static_cast<double>(i * 2);
}

We then run our functions and measure time using the functions in the std::chrono::high_resolution_clock::now() function in the standard library.

auto startTimeThreads = std::chrono::high_resolution_clock::now();
saxpy(z.get(), 3.14, x.get(), y.get(), N);
auto endTimeThreads = std::chrono::high_resolution_clock::now();

std::print("Completed SAXPY (threaded) with N = {0} elements.\n", N);

auto startTimeSerial = std::chrono::high_resolution_clock::now();
saxpy_serial(z.get(), 3.14, x.get(), y.get(), N);
auto endTimeSerial = std::chrono::high_resolution_clock::now();

std::print("Completed SAXPY (serial) with N = {0} elements.\n", N);

auto timeThreads = std::chrono::duration<double>(endTimeThreads - startTimeThreads).count();
auto timeSerial = std::chrono::duration<double>(endTimeSerial - startTimeSerial).count();

std::print("Serial execution time = {0} seconds.\n", timeSerial);
std::print("Thread execution time = {0} seconds.\n", timeThreads);
std::print("Speedup = {0}x\n", timeSerial / timeThreads);

What speedup should we expect from this implementation? Since the SAXPY operation is very simple (just one multiplication and one addition per element), we might initially expect good parallel speedup. However, as we will see, the reality is quite different due to memory bandwidth constraints.

When we run the code we get the following output:

Allocating arrays of size N = 200000000...
Allocated size in MB = 4577.63671875
Performing SAXPY with 8 threads.
Chunk size = 25000000
Completed SAXPY (threaded) with N = 200000000 elements.
Performing SAXPY serially.
Completed SAXPY (serial) with N = 200000000 elements.
Serial execution time = 0.2421141 seconds.
Thread execution time = 0.2416036 seconds.
Speedup = 1.002112965204161x

But that can’t be right? A speedup of only 1.002? Why does this happen? This bad performance is not related compute limitation but on memory bandwidth. The SAXPY operation:

z[i] = a * x[i] + y[i]

performs only 2 floating point operations (multiplication and addition) but requires:

  • 2 memory reads - x[i] and y[i]

  • 1 memory write - z[i]

This makes the implementation memory-bound rather than compute-bound. The main problem is that modern CPUs can execute many floating point operations per cycle, but memory bandwidth is shared across all cores. Running 8 threads will try to read/write 24 values simultaneously, saturating the memory subsystem. It doesn’t help to add threads as they are competing for the same resources.

The serial code is as fast because it by itself can saturate the memory subsystem. Also there is a significant overhead of thread creation and synchronisation. Running in serial for these cases also doesn’t have any problems with cache coherency and cache line conflicts.

Threading helps when you have more compute heavy codes performing more operations per memory access. Examples can be matrix multiplication, complex calculations and encryption. In the next section we are going to create a more compute heavy application.

The complete example is available here:

#include <chrono>
#include <memory>
#include <thread>
#include <print>
#include <vector>

using namespace std;

void saxpy_serial(double z[], double a, double x[], double y[], long n)
{
    std::println("Performing SAXPY serially.");

    for (long i = 0; i < n; i++)
        z[i] = a * x[i] + y[i];
}

void saxpy(double z[], double a, double x[], double y[], long n)
{
    const auto numThreads = std::min(8u, std::thread::hardware_concurrency());

    std::println("Performing SAXPY with {0} threads.", numThreads);
    
    auto lambda = [z, a, x, y](long start, long end) {
        for (long i = start; i < end; i++)
            z[i] = a * x[i] + y[i];
    };

    std::vector<std::jthread> threads;
    long chunkSize = n / numThreads;

    std::print("Chunk size = {0}\n", chunkSize);
    
    for (unsigned int i = 0; i < numThreads; i++)
    {
        long start = i * chunkSize;
        long end = (i == numThreads - 1) ? n : (i + 1) * chunkSize;
        threads.emplace_back(lambda, start, end);
    }
    
    // jthread automatically joins on destruction
}

int main()
{
    const int N = 200000000; // 100 million elements

    std::print("Allocating arrays of size N = {0}...\n", N);
    std::print("Allocated size in MB = {0}\n", (3 * N * sizeof(double)) / (1024.0 * 1024.0));

    auto x = std::make_unique<double[]>(N);
    auto y = std::make_unique<double[]>(N);
    auto z = std::make_unique<double[]>(N);

    for (auto i = 0; i < N; i++)
    {
        x[i] = static_cast<double>(i);
        y[i] = static_cast<double>(i * 2);
    }

    auto startTimeThreads = std::chrono::high_resolution_clock::now();
    saxpy(z.get(), 3.14, x.get(), y.get(), N);
    auto endTimeThreads = std::chrono::high_resolution_clock::now();

    std::print("Completed SAXPY (threaded) with N = {0} elements.\n", N);

    auto startTimeSerial = std::chrono::high_resolution_clock::now();
    saxpy_serial(z.get(), 3.14, x.get(), y.get(), N);
    auto endTimeSerial = std::chrono::high_resolution_clock::now();

    std::print("Completed SAXPY (serial) with N = {0} elements.\n", N);

    auto timeThreads = std::chrono::duration<double>(endTimeThreads - startTimeThreads).count();
    auto timeSerial = std::chrono::duration<double>(endTimeSerial - startTimeSerial).count();

    std::print("Serial execution time = {0} seconds.\n", timeSerial);
    std::print("Thread execution time = {0} seconds.\n", timeThreads);
    std::print("Speedup = {0}x\n", timeSerial / timeThreads);

    return 0;
}

Try example

A compute heavy example

To illustrate a more compute heavy example we first create a function doing some heave computational operations on an array. The function doesn’t perform anything more important than make sure our processor cores are busy. Arguments are the array and start and end positions for the operations.

// We will use somewhat more functions from the C++ library ;)

#include <print>
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cmath>
#include <memory>
#include <thread>
#include <vector>

void heavyComputation(double *arr, size_t start, size_t end)
{
    for (size_t i = start; i < end; ++i)
    {
        arr[i] = std::sin(arr[i]) * std::cos(arr[i]) * std::sqrt(std::abs(arr[i]));
        for (int j = 0; j < 100; ++j)
            arr[i] = std::sin(arr[i]);
    }
}

To make a comparison with a serial version of the code we implement a function, processSerial() that calls the heavyComputation() function for the entire array.

void processSequential(double *data, size_t size)
{
    heavyComputation(data, 0, size);
}

The threaded version of the this function is implemented in processParallel(). This function creates a vector of threads based on the numThreads argument. The threads call the heavyComputation() function with a specific unique range. Thread synchronisation is automatic as the std::jthread will automatically join when its destructor is called.

void processParallel(double *data, size_t size, int numThreads)
{
    std::vector< std::jthread > threads;
    threads.reserve(numThreads);

    size_t chunkSize = size / numThreads;

    for (int i = 0; i < numThreads; ++i)
    {
        size_t start = i * chunkSize;
        size_t end = (i == numThreads - 1) ? size : (i + 1) * chunkSize;
        threads.emplace_back(heavyComputation, data, start, end);
    }
}

In the main program we setup some constants for the test. We use all available threads by assigning numThreads the value from the function std::thread::hardware_concurrency().

const size_t dataSize = 5000000;
int numThreads = std::thread::hardware_concurrency();

std::print("Data size: {0}\n", dataSize);
std::print("Number of threads: {0}\n", numThreads);

To follow C++ best practice regarding memory allocation we will use unique pointers and std::make_unique<> to create the memory for the arrays. This will automatically handle cleanup of the array. We create two arrays one for the serial version and one for the parallel version. We also initialise the arrays using an algorithm from the standard library, std::generate_n(), to assign the value 1.0 to all elements. This to make sure our computations are stable.

std::print("Allocating arrays...\n");

auto seqData = std::make_unique< double[] >(dataSize);
auto parData = std::make_unique< double[] >(dataSize);

std::print("Initialising arrays...\n");

std::generate_n(seqData.get(), dataSize, []() { return 1.0; });

Now it is time to execute our serial version of our computations and measure the performance. We measure the time and calculate the speedup.

std::print("Running serially...\n");

auto start = std::chrono::high_resolution_clock::now();
processSequential(seqData.get(), dataSize);
auto end = std::chrono::high_resolution_clock::now();

std::chrono::duration< double > elapsedSerially = end - start;
std::print("Time for sequential processing: {0} seconds\n", elapsedSerially.count());

std::print("Running in parallel...\n");

start = std::chrono::high_resolution_clock::now();
processParallel(parData.get(), dataSize, numThreads);
end = std::chrono::high_resolution_clock::now();

std::chrono::duration< double > elapsedParallel = end - start;

std::print("Time for parallel processing: {0} seconds\n", elapsedParallel.count());
std::print("Speedup: {0}\n", elapsedSerially.count() / elapsedParallel.count());

Running this code produce the following output:

Data size: 5000000
Number of threads: 12
Allocating arrays...
Initialising arrays...
Running serially...
Time for sequential processing: 3.114036 seconds
Running in parallel...
Time for parallel processing: 0.2145049 seconds
Speedup: 14.517318718593375

This is a significant improvement from the SAXPY results. Here we are compute-bound and having more threads running on more cores decreases the computational time.

The complete code is available below:

#include <print>
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cmath>
#include <memory>
#include <thread>
#include <vector>

// Function to perform a computationally intensive task
void heavyComputation(double *arr, size_t start, size_t end)
{
    for (size_t i = start; i < end; ++i)
    {
        arr[i] = std::sin(arr[i]) * std::cos(arr[i]) * std::sqrt(std::abs(arr[i]));
        for (int j = 0; j < 100; ++j)
            arr[i] = std::sin(arr[i]);
    }
}

// Function to process data sequentially
void processSequential(double *data, size_t size)
{
    heavyComputation(data, 0, size);
}

void processParallel(double *data, size_t size, int numThreads)
{
    std::vector< std::jthread > threads;
    threads.reserve(numThreads);

    size_t chunkSize = size / numThreads;

    for (int i = 0; i < numThreads; ++i)
    {
        size_t start = i * chunkSize;
        size_t end = (i == numThreads - 1) ? size : (i + 1) * chunkSize;
        threads.emplace_back(heavyComputation, data, start, end);
    }
}

int main()
{
    const size_t dataSize = 5000000;
    int numThreads = std::thread::hardware_concurrency();

    std::print("Data size: {0}\n", dataSize);
    std::print("Number of threads: {0}\n", numThreads);

    std::print("Allocating arrays...\n");

    auto seqData = std::make_unique< double[] >(dataSize);
    auto parData = std::make_unique< double[] >(dataSize);

    std::print("Initialising arrays...\n");

    std::generate_n(seqData.get(), dataSize, []() { return 1.0; });

    std::print("Running serially...\n");

    auto start = std::chrono::high_resolution_clock::now();
    processSequential(seqData.get(), dataSize);
    auto end = std::chrono::high_resolution_clock::now();

    std::chrono::duration< double > elapsedSerially = end - start;
    std::print("Time for sequential processing: {0} seconds\n", elapsedSerially.count());

    std::print("Running in parallel...\n");

    start = std::chrono::high_resolution_clock::now();
    processParallel(parData.get(), dataSize, numThreads);
    end = std::chrono::high_resolution_clock::now();

    std::chrono::duration< double > elapsedParallel = end - start;

    std::print("Time for parallel processing: {0} seconds\n", elapsedParallel.count());
    std::print("Speedup: {0}\n", elapsedSerially.count() / elapsedParallel.count());

    return 0;
}

Try example

Advanced Thread Patterns (Atomics)

There are other ways of implementing synchronisation that has lower performance penalties that using mutexes and lock guards. This can be done using the std::atomic<T> datatype. This data type uses hardware atomic instructions such as LOCK XADD on x86. This means that the thread never blocks - it either succeeds immediately or retries in a tight loop. There is also no OS involvement which removes any context switches. Usage of atomics is often referred to as lock-free programming and can have large performance benefits.

Note

When to use atomics vs mutexes:

  • Use std::atomic<T> for simple operations on single variables (counters, flags, simple accumulation)

  • Use std::mutex with std::lock_guard when you need to protect multiple operations or complex data structures

  • Atomics have lower overhead but are limited to simple types and operations

  • For anything more complex than updating a single value, use a mutex

To illustrate how atomics can be used we will modify the heavy computing example to use atomics calculate a sum of the computations from all threads.

First, the heavyComputation() function is modified to take an atomic value as input. The thread adds it local sum to this atomic value. Already here you can see how easy it is to use atomics. No more mutex management.

#include <algorithm>
#include <atomic>
#include <chrono>
#include <cmath>
#include <memory>
#include <thread>
#include <vector>
#include <print>

// Function to perform a computationally intensive task
void heavyComputation(double *arr, size_t start, size_t end, std::atomic<double> &sum)
{
    double localSum = 0.0;

    for (size_t i = start; i < end; ++i)
    {
        arr[i] = std::sin(arr[i]) * std::cos(arr[i]) * std::sqrt(std::abs(arr[i]));
        for (int j = 0; j < 100; ++j)
        {
            arr[i] = std::sin(arr[i]);
        }
        localSum += arr[i];
    }

    // Atomically add localSum to sum

    sum += localSum;
}

As we will reuse the heavyComputation() function we need a processSequentially() function to call when timing the results. We are also implement a function for serially initialising the array.

// Function to process data sequentially
void processSequential(double *data, size_t size, std::atomic<double> &sum)
{
    heavyComputation(data, 0, size, sum);
}

void initialiseArray(double *arr, size_t start, size_t end, double value)
{
    for (size_t i = start; i < end; ++i)
        arr[i] = value;
}

In this example for the parallel example we will also intialise the array in parallel, so we need a function that does this. This is very similar to our previous examples using a vector of std::jthreads. No atomics needed here as all threads are working on individual parts of the array.

void initialiseParallel(double *data, size_t size, double value)
{
    size_t numThreads = std::thread::hardware_concurrency();
    size_t chunkSize = size / numThreads;

    std::vector<std::jthread> threads;
    threads.reserve(numThreads);

    for (size_t i = 0; i < numThreads; ++i)
    {
        size_t start = i * chunkSize;
        size_t end = (i == numThreads - 1) ? size : (i + 1) * chunkSize;
        threads.emplace_back(initialiseArray, data, start, end, value);
    }

    // jthread automatically joins when it goes out of scope - no manual join needed
}

The parallel processing is implemented in the processParallel() function. Also very similar approach as for the initialise function. However we need to use a special function, std::ref() to pass the atomic variable to the .emplace_back() method. Otherwise, no magic.

void processParallel(double *data, size_t size, int numThreads, std::atomic<double> &sum)
{
    std::vector<std::jthread> threads;
    threads.reserve(numThreads);

    size_t chunkSize = size / numThreads;

    for (int i = 0; i < numThreads; ++i)
    {
        size_t start = i * chunkSize;
        size_t end = (i == numThreads - 1) ? size : (i + 1) * chunkSize;
        threads.emplace_back(heavyComputation, data, start, end, std::ref(sum));
    }
}

The main function is very similar to our previous examples. First we initialise some constants and arrays. We create two arrays and two atomic sums.

const size_t dataSize = 800000;
const int numThreads = std::thread::hardware_concurrency();

std::printf("Data size: %zu\n", dataSize);
std::printf("Number of threads: %d\n", numThreads);

std::print("Allocating arrays...\n");

auto seqData = std::make_unique<double[]>(dataSize);
auto parData = std::make_unique<double[]>(dataSize);

std::atomic<double> seqSum{0.0};
std::atomic<double> parSum{0.0};

The serial code the becomes:

std::print("Initialising arrays serially...\n");

auto start = std::chrono::high_resolution_clock::now();
std::generate_n(seqData.get(), dataSize, []() { return 1.0; });
auto end = std::chrono::high_resolution_clock::now();

std::chrono::duration< double > elapsedInitSerially = end - start;

std::printf("Time for serial initialisation: %f seconds\n", elapsedInitSerially.count());

std::print("Running serially...\n");

start = std::chrono::high_resolution_clock::now();
processSequential(seqData.get(), dataSize, seqSum);
end = std::chrono::high_resolution_clock::now();

std::chrono::duration<double> elapsedSerially = end - start;

std::printf("Time for sequential processing: %f seconds\n", elapsedSerially.count());
std::printf("Sum: %f\n", seqSum.load());

To retrieve the value of an atomic you use the .load() method. The parallel version is very similar.

std::print("Initialising arrays in parallel...\n");

start = std::chrono::high_resolution_clock::now();
initialiseParallel(parData.get(), dataSize, 1.0);
end = std::chrono::high_resolution_clock::now();

std::chrono::duration<double> elapsedInitParallel = end - start;

std::printf("Time for parallel initialisation: %f seconds\n", elapsedInitParallel.count());

std::print("Running in parallel...\n");

start = std::chrono::high_resolution_clock::now();
processParallel(parData.get(), dataSize, numThreads, parSum);
end = std::chrono::high_resolution_clock::now();

std::chrono::duration<double> elapsedParallel = end - start;

std::printf("Time for parallel processing: %f seconds\n", elapsedParallel.count());
std::printf("Sum: %f\n", parSum.load());
std::printf("Speedup: %fx\n", elapsedSerially.count() / elapsedParallel.count());

return 0;

Running the code produces the following output:

Data size: 800000
Number of threads: 12
Allocating arrays...
Initialising arrays serially...
Time for serial initialisation: 0.000182 seconds
Running serially...
Time for sequential processing: 0.506693 seconds
Sum: 128779.329092
Initialising arrays in parallel...
Time for parallel initialisation: 0.001853 seconds
Running in parallel...
Time for parallel processing: 0.074658 seconds
Sum: 128779.329093
Speedup: 6.786868x

Here we also see a speedup, but not as significant as for the previous example, but here we also have added the complexity of summarising the data. We can also see that it seems that the parallel version of the intialisation was memory-bound, so we could have used the serial version of the initialisation function to initialise the array.

The complete code is available below:

#include <algorithm>
#include <atomic>
#include <chrono>
#include <cmath>
#include <memory>
#include <thread>
#include <vector>
#include <print>

// Function to perform a computationally intensive task
void heavyComputation(double *arr, size_t start, size_t end, std::atomic<double> &sum)
{
    double localSum = 0.0;

    for (size_t i = start; i < end; ++i)
    {
        arr[i] = std::sin(arr[i]) * std::cos(arr[i]) * std::sqrt(std::abs(arr[i]));
        for (int j = 0; j < 100; ++j)
        {
            arr[i] = std::sin(arr[i]);
        }
        localSum += arr[i];
    }

    // Atomically add localSum to sum

    sum += localSum;

    /*
    double current = sum.load();
    while (!sum.compare_exchange_weak(current, current + localSum))
        ;
    */
}

void initialiseArray(double *arr, size_t start, size_t end, double value)
{
    for (size_t i = start; i < end; ++i)
        arr[i] = value;
}

// Function to process data sequentially
void processSequential(double *data, size_t size, std::atomic<double> &sum)
{
    heavyComputation(data, 0, size, sum);
}

void initialiseParallel(double *data, size_t size, double value)
{
    size_t numThreads = std::thread::hardware_concurrency();
    size_t chunkSize = size / numThreads;

    std::vector<std::jthread> threads;
    threads.reserve(numThreads);

    for (size_t i = 0; i < numThreads; ++i)
    {
        size_t start = i * chunkSize;
        size_t end = (i == numThreads - 1) ? size : (i + 1) * chunkSize;
        threads.emplace_back(initialiseArray, data, start, end, value);
    }

    // jthread automatically joins when it goes out of scope - no manual join needed
}

void processParallel(double *data, size_t size, int numThreads, std::atomic<double> &sum)
{
    std::vector<std::jthread> threads;
    threads.reserve(numThreads);

    size_t chunkSize = size / numThreads;

    for (int i = 0; i < numThreads; ++i)
    {
        size_t start = i * chunkSize;
        size_t end = (i == numThreads - 1) ? size : (i + 1) * chunkSize;
        threads.emplace_back(heavyComputation, data, start, end, std::ref(sum));
    }

    // jthread automatically joins when threads vector goes out of scope
    // Manual join is not needed and would cause issues
}

int main()
{
    const size_t dataSize = 800000;
    const int numThreads = std::thread::hardware_concurrency();

    std::printf("Data size: %zu\n", dataSize);
    std::printf("Number of threads: %d\n", numThreads);

    std::print("Allocating arrays...\n");

    auto seqData = std::make_unique<double[]>(dataSize);
    auto parData = std::make_unique<double[]>(dataSize);

    std::atomic<double> seqSum{0.0};
    std::atomic<double> parSum{0.0};

    // ----- SERIAL CODE ----

    std::print("Initialising arrays serially...\n");

    auto start = std::chrono::high_resolution_clock::now();
    std::generate_n(seqData.get(), dataSize, []() { return 1.0; });
    auto end = std::chrono::high_resolution_clock::now();

    std::chrono::duration< double > elapsedInitSerially = end - start;

    std::printf("Time for serial initialisation: %f seconds\n", elapsedInitSerially.count());

    std::print("Running serially...\n");

    start = std::chrono::high_resolution_clock::now();
    processSequential(seqData.get(), dataSize, seqSum);
    end = std::chrono::high_resolution_clock::now();

    std::chrono::duration<double> elapsedSerially = end - start;

    std::printf("Time for sequential processing: %f seconds\n", elapsedSerially.count());
    std::printf("Sum: %f\n", seqSum.load());

    // ----- PARALLEL CODE -----

    std::print("Initialising arrays in parallel...\n");

    start = std::chrono::high_resolution_clock::now();
    initialiseParallel(parData.get(), dataSize, 1.0);
    end = std::chrono::high_resolution_clock::now();

    std::chrono::duration<double> elapsedInitParallel = end - start;
    
    std::printf("Time for parallel initialisation: %f seconds\n", elapsedInitParallel.count());

    std::print("Running in parallel...\n");

    start = std::chrono::high_resolution_clock::now();
    processParallel(parData.get(), dataSize, numThreads, parSum);
    end = std::chrono::high_resolution_clock::now();

    std::chrono::duration<double> elapsedParallel = end - start;

    std::printf("Time for parallel processing: %f seconds\n", elapsedParallel.count());
    std::printf("Sum: %f\n", parSum.load());
    std::printf("Speedup: %fx\n", elapsedSerially.count() / elapsedParallel.count());

    return 0;
}

Try example

Async and Futures

There are more ways of implementing concurrency in C++. If your application is more task oriented and doesn’t rely on shared data the std::async() call combined with the std::future return value can be an effective way of implementing concurrency. The async call is like a non-blocking function call that returns the control back to the main thread. A return value from an async call is something called a future. This is like a placeholder of a future return value. When you want the return value you call the .get() method of the future. If the async call has finished the method will immediately return the computed return value otherwise it will block until the async call is completed. You can only call the .get() method once. Calling it multiple times will generate exceptions. It is also possible to check if there is a valid shared state using the method .valid(). It is also possible to wait for a result using the methods .wait() or .wait_for()/wait_until().

To illustrate how async and future can be used we will use our heavy computation example as a base and create functions returning a sum of a complicated computation as a result. The function we will is as follows:

#include <algorithm>
#include <chrono>
#include <cmath>
#include <future>
#include <memory>
#include <print>
#include <thread>
#include <vector>

// Function to perform a computationally intensive task
double heavyComputation(double *arr, size_t start, size_t end)
{
    double localSum = 0.0;

    for (size_t i = start; i < end; ++i)
    {
        arr[i] = std::sin(arr[i]) * std::cos(arr[i]) * std::sqrt(std::abs(arr[i]));
        for (int j = 0; j < 1000; ++j)
        {
            arr[i] = std::sin(arr[i]);
        }
        localSum += arr[i];
    }

    return localSum;
}

There is nothing special with this function. It just returns the local sum. For the asynchronous processing we implement a special function processAsync() that will do the heavy lifting. As we are going to use the std::async() function we need to be able to manage the std::future that the async function will return. We do that by declaring a vector containing a list of future instances.

double processAsync(double *data, size_t size, int numThreads)
{
    std::vector< std::future< double > > futures;
    futures.reserve(numThreads);

Next we will be making a number of calls to the async method in a loop and push back the return futures from the calls. In this case we need to use the .push_back() method because we need to catch the return values from the async call.

size_t chunkSize = size / numThreads;

for (int i = 0; i < numThreads; ++i)
{
    size_t start = i * chunkSize;
    size_t end = (i == numThreads - 1) ? size : (i + 1) * chunkSize;
    futures.push_back(std::async(std::launch::async, heavyComputation, data, start, end));
}

To calculate the sum we can just loop over our future list and call the .get() method. There is no risk of any synchronisation problems here as the method will block until the task has completed before returning a value. Finally we return the sum.

    double sum = 0.0;

    for (auto &future : futures)
        sum += future.get();

    return sum;
}

The serial sum just calls the heavyComputation() method for all elements.

double processSequential(double *data, size_t size)
{
    return heavyComputation(data, 0, size);
}

Our main program is very similar to our previous examples. We allocate two arrays one for each test using unique pointers. We initialise the arrays using the std::generate_n() algorithm. We use function from the <chrono> include from the standard library to measure execution times.

const size_t dataSize = 5000000;
const int numThreads = std::thread::hardware_concurrency();

std::println("Data size: {}", dataSize);
std::println("Number of threads: {}\n", numThreads);

std::println("Allocating and initializing arrays...");
auto seqData = std::make_unique< double[] >(dataSize);
auto parData = std::make_unique< double[] >(dataSize);

std::generate_n(seqData.get(), dataSize, []() { return 1.0; });
std::generate_n(parData.get(), dataSize, []() { return 1.0; });

std::println("Running sequentially...");
auto startSeq = std::chrono::high_resolution_clock::now();
double seqSum = processSequential(seqData.get(), dataSize);
auto endSeq = std::chrono::high_resolution_clock::now();

auto elapsedSeq = std::chrono::duration< double >(endSeq - startSeq).count();
std::println("Sequential - Time: {:.4f} s, Sum: {:.6f}\n", elapsedSeq, seqSum);

std::println("Running in parallel...");
auto startPar = std::chrono::high_resolution_clock::now();
double parSum = processAsync(parData.get(), dataSize, numThreads);
auto endPar = std::chrono::high_resolution_clock::now();

auto elapsedPar = std::chrono::duration< double >(endPar - startPar).count();
std::println("Parallel   - Time: {:.4f} s, Sum: {:.6f}\n", elapsedPar, parSum);

// Results summary
std::println("--- Performance Metrics ---");
std::println("Speedup: {:.2f}x", elapsedSeq / elapsedPar);
std::println("Efficiency: {:.1f}%", (elapsedSeq / elapsedPar / numThreads) * 100);

double sumDiff = std::abs(seqSum - parSum);
double relativeError = sumDiff / std::abs(seqSum);
std::println("Sum difference: {:.2e} (relative error: {:.2e})", sumDiff, relativeError);

// Verify results are reasonably close
if (relativeError < 1e-10)
    std::println("OK - Results match within acceptable tolerance");
else
    std::println("Warning: Large difference between sequential and parallel sums");

return 0;

When running the program we get the following results.

Data size: 5000000
Number of threads: 12

Allocating and initializing arrays...
Running sequentially...
Sequential - Time: 30.9081 s, Sum: 271552.176301

Running in parallel...
Parallel   - Time: 3.0463 s, Sum: 271552.176326

--- Performance Metrics ---
Speedup: 10.15x
Efficiency: 84.5%
Sum difference: 2.45e-05 (relative error: 9.03e-11)
OK - Results match within acceptable tolerance

This example is compute bound and the tasks are quite self contained only returning a modified array and a sum, which is a perfect example of a suitable use case for the async/future combination in the standard library.

#include <algorithm>
#include <chrono>
#include <cmath>
#include <future>
#include <memory>
#include <print>
#include <thread>
#include <vector>

// Function to perform a computationally intensive task
double heavyComputation(double *arr, size_t start, size_t end)
{
    double localSum = 0.0;

    for (size_t i = start; i < end; ++i)
    {
        arr[i] = std::sin(arr[i]) * std::cos(arr[i]) * std::sqrt(std::abs(arr[i]));
        for (int j = 0; j < 1000; ++j)
        {
            arr[i] = std::sin(arr[i]);
        }
        localSum += arr[i];
    }

    return localSum;
}

// Function to process data sequentially
double processSequential(double *data, size_t size)
{
    return heavyComputation(data, 0, size);
}

// Function to process data in parallel using std::async
double processAsync(double *data, size_t size, int numThreads)
{
    std::vector< std::future< double > > futures;
    futures.reserve(numThreads);
    
    size_t chunkSize = size / numThreads;

    for (int i = 0; i < numThreads; ++i)
    {
        size_t start = i * chunkSize;
        size_t end = (i == numThreads - 1) ? size : (i + 1) * chunkSize;
        futures.push_back(std::async(std::launch::async, heavyComputation, data, start, end));
    }

    double sum = 0.0;

    for (auto &future : futures)
        sum += future.get();

    return sum;
}

int main()
{
    const size_t dataSize = 5000000;
    const int numThreads = std::thread::hardware_concurrency();

    std::println("Data size: {}", dataSize);
    std::println("Number of threads: {}\n", numThreads);

    // Allocate separate arrays for sequential and parallel processing
    std::println("Allocating and initializing arrays...");
    auto seqData = std::make_unique< double[] >(dataSize);
    auto parData = std::make_unique< double[] >(dataSize);

    std::generate_n(seqData.get(), dataSize, []() { return 1.0; });
    std::generate_n(parData.get(), dataSize, []() { return 1.0; });

    // Sequential processing
    std::println("Running sequentially...");
    auto startSeq = std::chrono::high_resolution_clock::now();
    double seqSum = processSequential(seqData.get(), dataSize);
    auto endSeq = std::chrono::high_resolution_clock::now();

    auto elapsedSeq = std::chrono::duration< double >(endSeq - startSeq).count();
    std::println("Sequential - Time: {:.4f} s, Sum: {:.6f}\n", elapsedSeq, seqSum);

    // Parallel processing
    std::println("Running in parallel...");
    auto startPar = std::chrono::high_resolution_clock::now();
    double parSum = processAsync(parData.get(), dataSize, numThreads);
    auto endPar = std::chrono::high_resolution_clock::now();

    auto elapsedPar = std::chrono::duration< double >(endPar - startPar).count();
    std::println("Parallel   - Time: {:.4f} s, Sum: {:.6f}\n", elapsedPar, parSum);

    // Results summary
    std::println("--- Performance Metrics ---");
    std::println("Speedup: {:.2f}x", elapsedSeq / elapsedPar);
    std::println("Efficiency: {:.1f}%", (elapsedSeq / elapsedPar / numThreads) * 100);
    
    double sumDiff = std::abs(seqSum - parSum);
    double relativeError = sumDiff / std::abs(seqSum);
    std::println("Sum difference: {:.2e} (relative error: {:.2e})", sumDiff, relativeError);
    
    // Verify results are reasonably close
    if (relativeError < 1e-10)
        std::println("OK - Results match within acceptable tolerance");
    else
        std::println("Warning: Large difference between sequential and parallel sums");

    return 0;
}

Try example

Producer-Consumer Pattern with Condition Variables

One of the most common patterns in concurrent programming is the Producer-Consumer pattern. In this pattern, one or more producer threads generate data and place it in a shared buffer, while one or more consumer threads retrieve and process this data. The challenge is coordinating access to the shared buffer so that:

  • Producers wait when the buffer is full

  • Consumers wait when the buffer is empty

  • Multiple threads don’t corrupt the buffer

While we could implement this using just mutexes and busy-waiting (checking repeatedly if a condition is met), this wastes CPU cycles. A better approach is to use std::condition_variable, which allows threads to efficiently wait for a condition to become true.

Understanding Condition Variables

A std::condition_variable is a synchronization primitive that allows threads to wait until notified by another thread. It works together with a std::mutex and typically follows this pattern:

Waiting thread:

  1. Lock the mutex

  2. Check if condition is met

  3. If not, call .wait() which atomically unlocks the mutex and puts the thread to sleep

  4. When notified, the thread wakes up and automatically reacquires the lock

  5. Recheck the condition (spurious wakeups can occur)

  6. Proceed when condition is met

Notifying thread:

  1. Lock the mutex

  2. Modify shared data

  3. Unlock the mutex

  4. Call .notify_one() or .notify_all() to wake waiting threads

The key benefit is that waiting threads sleep instead of consuming CPU cycles in a busy-wait loop.

Implementing the Producer-Consumer Pattern

Let’s implement a thread-safe queue that demonstrates this pattern. First, we’ll create a SafeQueue class that encapsulates the buffer and synchronization:

template < typename T > class SafeQueue {
private:
    std::queue< T > m_queue;
    mutable std::mutex m_mutex;
    std::condition_variable m_cond;
    bool m_finished = false;
    size_t m_maxSize;

public:
    SafeQueue(size_t maxSize = 10) : m_maxSize(maxSize)
    {
    }

    // Producer: Add item to queue (blocks if full)
    void push(T item)
    {
        std::unique_lock< std::mutex > lock(m_mutex);

        // Wait until queue has space
        m_cond.wait(lock, [this]() { return m_queue.size() < m_maxSize || m_finished; });

        if (!m_finished)
        {
            m_queue.push(std::move(item));
            m_cond.notify_one(); // Notify a waiting consumer
        }
    }

    // Consumer: Remove item from queue (blocks if empty)
    std::optional< T > pop()
    {
        std::unique_lock< std::mutex > lock(m_mutex);

        // Wait until queue has data or is finished
        m_cond.wait(lock, [this]() { return !m_queue.empty() || m_finished; });

        if (m_queue.empty())
            return std::nullopt; // No more data

        T item = std::move(m_queue.front());
        m_queue.pop();
        m_cond.notify_one(); // Notify a waiting producer
        return item;
    }

    // Signal that no more items will be produced
    void finish()
    {
        std::unique_lock< std::mutex > lock(m_mutex);
        m_finished = true;
        m_cond.notify_all(); // Wake all waiting threads
    }

    size_t size() const
    {
        std::unique_lock< std::mutex > lock(m_mutex);
        return m_queue.size();
    }
};

The SafeQueue uses std::unique_lock instead of std::lock_guard because std::condition_variable needs a lock that can be unlocked and relocked (which happens during .wait()).

Now let’s create a complete example with multiple producers and consumers:

#include <chrono>
#include <print>
#include <random>
#include <thread>
#include <vector>

void producer(SafeQueue<int>& queue, int id, int itemCount)
{
    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_int_distribution<> dis(1, 100);
    std::uniform_int_distribution<> sleep(10, 100);

    for (int i = 0; i < itemCount; ++i)
    {
        int value = dis(gen);
        queue.push(value);
        std::println("Producer {} produced: {}", id, value);
        std::this_thread::sleep_for(std::chrono::milliseconds(sleep(gen)));
    }

    std::println("Producer {} finished", id);
}

void consumer(SafeQueue<int>& queue, int id, std::atomic<int>& totalConsumed)
{
    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_int_distribution<> sleep(50, 150);

    while (true)
    {
        auto item = queue.pop();
        if (!item.has_value())
        {
            std::println("Consumer {} finished (no more items)", id);
            break;
        }

        std::println("Consumer {} consumed: {}", id, item.value());
        totalConsumed++;
        std::this_thread::sleep_for(std::chrono::milliseconds(sleep(gen)));
    }
}

In the main function, we create multiple producers and consumers:

int main()
{
    const int numProducers = 3;
    const int numConsumers = 2;
    const int itemsPerProducer = 5;

    SafeQueue<int> queue(5);  // Buffer size of 5
    std::atomic<int> totalConsumed{0};

    std::println("Starting {} producers and {} consumers", numProducers, numConsumers);
    std::println("Each producer will create {} items\n", itemsPerProducer);

    // Start producers
    std::vector<std::jthread> producers;
    for (int i = 0; i < numProducers; ++i)
        producers.emplace_back(producer, std::ref(queue), i, itemsPerProducer);

    // Start consumers
    std::vector<std::jthread> consumers;
    for (int i = 0; i < numConsumers; ++i)
        consumers.emplace_back(consumer, std::ref(queue), i, std::ref(totalConsumed));

    // Wait for all producers to finish
    producers.clear();  // jthread joins on destruction

    std::println("\nAll producers finished. Signaling consumers...");
    queue.finish();

    // Wait for all consumers to finish
    consumers.clear();

    std::println("\nTotal items produced: {}", numProducers * itemsPerProducer);
    std::println("Total items consumed: {}", totalConsumed.load());

    return 0;
}

When we run this program, we see the interleaved output from multiple producers and consumers, demonstrating concurrent access to the shared queue:

Starting 3 producers and 2 consumers
Each producer will create 5 items

Producer 1 produced: 5
Consumer 0 consumed: 5
Producer 0 produced: 96
Producer 2 produced: 66
Consumer 1 consumed: 66
Consumer 0 consumed: 96
Producer 0 produced: 38
Consumer 1 consumed: 38
Producer 1 produced: 62
Producer 2 produced: 99
Producer 1 produced: 47
Consumer 0 consumed: 99
Producer 0 produced: 87
Producer 2 produced: 58
Producer 0 produced: 74
Consumer 1 consumed: 62
Producer 1 produced: 99
Consumer 1 consumed: 47
Producer 0 produced: 44
Consumer 0 consumed: 87
Producer 2 produced: 53
Producer 0 finished
Consumer 1 consumed: 58
Producer 1 produced: 94
Producer 1 finished
Consumer 0 consumed: 74
Producer 2 produced: 92
Consumer 1 consumed: 99
Consumer 0 consumed: 44
Producer 2 finished

All producers finished. Signaling consumers...
Consumer 1 consumed: 53
Consumer 1 consumed: 94
Consumer 0 consumed: 92
Consumer 0 finished (no more items)
Consumer 1 finished (no more items)

Total items produced: 15
Total items consumed: 15

Key takeaways from this pattern:

  • Efficiency: Threads sleep when waiting instead of busy-waiting

  • Safety: The mutex protects shared data from race conditions

  • Coordination: Condition variables enable efficient thread synchronization

  • Scalability: Works with any number of producers and consumers

  • Graceful shutdown: The finish() method allows clean termination

This pattern is fundamental in many applications: task queues, buffered I/O, event processing, and thread pools.

The complete code is available below:

#include <chrono>
#include <print>
#include <random>
#include <thread>

#include "safe_queue.h"

void producer(SafeQueue< int > &queue, int id, int itemCount)
{
    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_int_distribution<> dis(1, 100);
    std::uniform_int_distribution<> sleep(10, 100);

    for (int i = 0; i < itemCount; ++i)
    {
        int value = dis(gen);
        queue.push(value);
        std::println("Producer {} produced: {}", id, value);
        std::this_thread::sleep_for(std::chrono::milliseconds(sleep(gen)));
    }

    std::println("Producer {} finished", id);
}

void consumer(SafeQueue< int > &queue, int id, std::atomic< int > &totalConsumed)
{
    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_int_distribution<> sleep(50, 150);

    while (true)
    {
        auto item = queue.pop();

        if (!item.has_value())
        {
            std::println("Consumer {} finished (no more items)", id);
            break;
        }

        std::println("Consumer {} consumed: {}", id, item.value());
        totalConsumed++;
        std::this_thread::sleep_for(std::chrono::milliseconds(sleep(gen)));
    }
}

int main()
{
    const int numProducers = 3;
    const int numConsumers = 2;
    const int itemsPerProducer = 5;

    SafeQueue< int > queue(5); // Buffer size of 5
    std::atomic< int > totalConsumed{0};

    std::println("Starting {} producers and {} consumers", numProducers, numConsumers);
    std::println("Each producer will create {} items\n", itemsPerProducer);

    // Start producers
    std::vector< std::jthread > producers;
    for (int i = 0; i < numProducers; ++i)
        producers.emplace_back(producer, std::ref(queue), i, itemsPerProducer);

    // Start consumers
    std::vector< std::jthread > consumers;
    for (int i = 0; i < numConsumers; ++i)
        consumers.emplace_back(consumer, std::ref(queue), i, std::ref(totalConsumed));

    // Wait for all producers to finish
    producers.clear(); // jthread joins on destruction

    std::println("\nAll producers finished. Signaling consumers...");
    queue.finish();

    // Wait for all consumers to finish
    consumers.clear();

    std::println("\nTotal items produced: {}", numProducers * itemsPerProducer);
    std::println("Total items consumed: {}", totalConsumed.load());

    return 0;
}

Try example

Parallelism and the standard library

Since C++17 the standard library have been extended to support parallel execution of some of the built-in algorithms. The functionality is accessed through the <execution> include. To use the functionality you can tell the algorithm to use a specific execution policy. Currently available execution options are:

  • Sequenced - std::execution::seq - Executes just like normal standard library algorithms. Single threaded and strictly ordered. Baseline for correctnes and debugging.

  • Parallel - std::execution::par - Allows execution over multiple threads. Ordering is unspecified. Iterations may run concurrently but every element only processed once. You are reponsible for thread safety. You need to provide synchronisation to shared state.

  • Parallel and unsequenced - std::execution::par_unseq - Enables both multi-threading and SIMD operations. Execution order is completely unspecified and may overlap even within a single thread. Strongest performance potential, but requires you to give strict safety guarantees. No data dependencies between iterations, no locks, atomics or blocking synchronisation and no assumption about ordering.

A quick rule of thumb:

  • Use seq for correctness and debugging.

  • Use par for when iterations are indepenendent, but not vectorisation safe.

  • Use par_unseq for pure side-effect free numerical kernels.

In the following section we will illustrate how this concepts can be used with some of the common algorithms in the standard library.

Parallel for each

In this example we are going to check all numbers in a vector if they are a prime number or not. By doing this we will also have an operation that is compute-bound so that the parallelisation will be efficient. We start by implemeting a function for determining if an integer is a prime number or not. Perhaps not the most efficient implementation, but is works for this example.

#include <vector>
#include <print>
#include <algorithm>
#include <execution>
#include <chrono>

bool is_prime(int num)
{
    if (num < 2)
        return false;

    for (int i = 2; i * i <= num; ++i)
    {
        if (num % i == 0)
            return false;
    }
    return true;
};

We will use this function in our `for_each loop later. Next we initialise 2 vectors, one for a sequential version and one for the parallel version. We initialise the vectors with values from 0..N-1.

int main()
{
    const int N = 10000000;

    std::vector<int> seqVec(N);
    std::vector<int> parVec(N);

    std::generate(seqVec.begin(), seqVec.end(), [n = 0]() mutable { return n++; });
    std::generate(parVec.begin(), parVec.end(), [n = 0]() mutable { return n++; });

We are now ready to perform our first try of the parallel features of the standard library. As the algoritm doesn’t have any dependencies on previous computations we can use the policy std::execution::par_unseq. The call to the `std::for_each() function then becomes (with time keeping):

    auto startPar = std::chrono::high_resolution_clock::now();
std::for_each(std::execution::par_unseq, parVec.begin(), parVec.end(), [](int &n) {
    n = is_prime(n) ? n : 0;
});
auto endPar = std::chrono::high_resolution_clock::now();

    std::chrono::duration< double > durationPar = endPar - startPar;

The loop will write a 0 if n is a prime number and n if it is a prime number. In the end we will have an array with zeros and prime numbers only.

For comparison we can execute the same function with the std::execution::seq instead to validate that we get the same results.

    auto startSeq = std::chrono::high_resolution_clock::now();
std::for_each(std::execution::seq, seqVec.begin(), seqVec.end(), [](int &n) {
    n = is_prime(n) ? n : 0;
});
auto endSeq = std::chrono::high_resolution_clock::now();

std::chrono::duration< double > durationSeq = endSeq - startSeq;

We print out a comparison and the first 10 elements of each array.

    std::print("Time taken sequentially : {} seconds\n", durationSeq.count());
std::print("Time taken in parallel  : {} seconds\n", durationPar.count());

std::print("Speedup: {}x\n", durationSeq.count() / durationPar.count());
std::print("Data size in megabytes: {} MB\n", (N * sizeof(int)) / (1024.0 * 1024.0));

    std::print("First 10 elements after parallel for_each: ");

for (auto n : parVec | std::views::take(10))
{
    std::print("{} ", n);
}
std::print("\n");

    std::print("First 10 elements after sequential for_each: ");
for (auto n : seqVec | std::views::take(10))
{
    std::print("{} ", n);
}
std::print("\n");

Note

The print loops use another nice features in the <ranges> include of the standard library, std::view::take() that returns a view of up to a certain number of elements from a sequence.

The output from the run gives us the following output:

Time taken sequentially : 2.4123562 seconds
Time taken in parallel  : 0.4349576 seconds
Speedup: 5.546187030643907x
Data size in megabytes: 38.14697265625 MB
First 10 elements after parallel for_each: 0 0 2 3 0 5 0 7 0 0
First 10 elements after sequential for_each: 0 0 2 3 0 5 0 7 0 0

This example was run on a 6 core AMD processor. A 5.5x speedup is a good result considering that you only change a single parameter in the call to the standard library function. Important to note though is that the tasks performed needs to be compute-bound otherwise the scaling will be bad.

The complete code is available below:

#include <algorithm>
#include <chrono>
#include <execution>
#include <print>
#include <ranges>
#include <vector>

bool is_prime(int num)
{
    if (num < 2)
        return false;

    for (int i = 2; i * i <= num; ++i)
    {
        if (num % i == 0)
            return false;
    }
    return true;
};

int main()
{
    const int N = 10000000;

    std::vector< int > seqVec(N);
    std::vector< int > parVec(N);

    std::generate(seqVec.begin(), seqVec.end(), [n = 0]() mutable { return n++; });
    std::generate(parVec.begin(), parVec.end(), [n = 0]() mutable { return n++; });

    auto startPar = std::chrono::high_resolution_clock::now();
    std::for_each(std::execution::par_unseq, parVec.begin(), parVec.end(), [](int &n) { n = is_prime(n) ? n : 0; });
    auto endPar = std::chrono::high_resolution_clock::now();

    std::chrono::duration< double > durationPar = endPar - startPar;

    auto startSeq = std::chrono::high_resolution_clock::now();
    std::for_each(std::execution::seq, seqVec.begin(), seqVec.end(), [](int &n) { n = is_prime(n) ? n : 0; });
    auto endSeq = std::chrono::high_resolution_clock::now();

    std::chrono::duration< double > durationSeq = endSeq - startSeq;

    std::print("Time taken sequentially : {} seconds\n", durationSeq.count());
    std::print("Time taken in parallel  : {} seconds\n", durationPar.count());

    std::print("Speedup: {}x\n", durationSeq.count() / durationPar.count());
    std::print("Data size in megabytes: {} MB\n", (N * sizeof(int)) / (1024.0 * 1024.0));

    std::print("First 10 elements after parallel for_each: ");

    for (auto n : parVec | std::views::take(10))
    {
        std::print("{} ", n);
    }
    std::print("\n");

    std::print("First 10 elements after sequential for_each: ");
    for (auto n : seqVec | std::views::take(10))
    {
        std::print("{} ", n);
    }
    std::print("\n");

    return 0;
}

Try example (only single thread…)

Parallel sorting

Another operation than can benefit from parallel execution is sorting. In this example we are going to sort a long random vectors of integers. First we need to generate 2 long random vectors. For this we are going to use the modern random number generation functions in the random include header. First we define a random number generator in this case we use the mersenne twister generator, std::mt19937. To get our distribution we use the std::uniform_int_distribution<>. We use the std::generate() function to assign random numbers to our vector. To be able to compare the parallel sorting algorithm with the sequential version we copy the random numbers from the parData to seqData.

#include <vector>
#include <algorithm>
#include <execution>
#include <chrono>
#include <print>
#include <random>

int main()
{
    const int N = 100000000;
    std::vector< int > parData(100000000);
    std::vector< int > seqData(100000000);

    // Initialize the vector with some values

    std::print("Initializing data...\n");

    std::mt19937 gen(42); // Fixed seed for reproducibility
    std::uniform_int_distribution<> dis(1, 10000000);
    std::generate(parData.begin(), parData.end(), [&]() { return dis(gen); });
    std::copy(parData.begin(), parData.end(), seqData.begin());

Now we use the standard library function std::sort() with the execution policy std::execution::par. We time it and calcualte the run duration.

auto startPar = std::chrono::high_resolution_clock::now();
std::sort(std::execution::par, parData.begin(), parData.end());
    auto endPar = std::chrono::high_resolution_clock::now();
    std::chrono::duration<double> durationPar = endPar - startPar;

For the sequential comparison we just change the execution policy to std::execution::seq.

auto startSeq = std::chrono::high_resolution_clock::now();
std::sort(std::execution::seq, seqData.begin(), seqData.end());
auto endSeq = std::chrono::high_resolution_clock::now();
std::chrono::duration< double > durationSeq = endSeq - startSeq;

To validate that the sorting was performed correctly we can use the standard library function std::is_sorted(). This function can also be provided an execution policy, so we do that.

std::is_sorted(std::execution::par, parData.begin(), parData.end()) ?
    std::print("Parallel sort successful.\n") :
    std::print("Parallel sort failed.\n");

std::is_sorted(std::execution::par, seqData.begin(), seqData.end()) ?
    std::print("Sequential sort successful.\n") :
    std::print("Sequential sort failed.\n");

    std::print("Parallel sort time: {} seconds\n", durationPar.count());
std::print("Sequential sort time: {} seconds\n", durationSeq.count());
std::print("Speedup: {:.2f}x\n", durationSeq.count() / durationPar.count());

Running the code also shows a good speedup compared to the sequential version.

Initializing data...
Starting sort operations...
Verifying sorted data...
Parallel sort successful.
Sequential sort successful.
Parallel sort time: 1.3105185 seconds
Sequential sort time: 7.7848976 seconds
Speedup: 5.94x
#include <algorithm>
#include <chrono>
#include <execution>
#include <print>
#include <random>
#include <vector>

int main()
{
    const int N = 100000000;
    std::vector< int > parData(N);
    std::vector< int > seqData(N);

    // Initialize the vector with some values

    std::print("Initializing data...\n");

    std::mt19937 gen(42); // Fixed seed for reproducibility
    std::uniform_int_distribution<> dis(1, N);
    std::generate(parData.begin(), parData.end(), [&]() { return dis(gen); });
    std::copy(parData.begin(), parData.end(), seqData.begin());

    std::print("Starting sort operations...\n");

    // Sort the vector in parallel
    auto startPar = std::chrono::high_resolution_clock::now();
    std::sort(std::execution::par, parData.begin(), parData.end());
    auto endPar = std::chrono::high_resolution_clock::now();
    std::chrono::duration< double > durationPar = endPar - startPar;

    // Sort the vector sequentially
    auto startSeq = std::chrono::high_resolution_clock::now();
    std::sort(std::execution::seq, seqData.begin(), seqData.end());
    auto endSeq = std::chrono::high_resolution_clock::now();
    std::chrono::duration< double > durationSeq = endSeq - startSeq;

    // Check if sort opereration was successful

    std::print("Verifying sorted data...\n");

    std::is_sorted(std::execution::par, parData.begin(), parData.end()) ? std::print("Parallel sort successful.\n")
                                                                        : std::print("Parallel sort failed.\n");

    std::is_sorted(std::execution::par, seqData.begin(), seqData.end()) ? std::print("Sequential sort successful.\n")
                                                                        : std::print("Sequential sort failed.\n");

    std::print("Parallel sort time: {} seconds\n", durationPar.count());
    std::print("Sequential sort time: {} seconds\n", durationSeq.count());
    std::print("Speedup: {:.2f}x\n", durationSeq.count() / durationPar.count());

    return 0;
}

Try example (only single thread…)

Parallel transform

In the final example we are going to look at is how to use the std::transform() function from the standard library. This function applies a tranform on an input vector and transforms it either to itself or to a result array. For the parallel algorithm to be effective it needs to be compute bound otherwise the sequential version is probarbly more efficient.

In the example we will be applying some heavy floating point operations to each element of the array and write back the result to the same element. We first initalise our data structure:

int main()
{
    std::vector<double> dataSeq(size);
    std::vector<double> dataPar(size);

    // Initialize with simple values first
    std::iota(dataSeq.begin(), dataSeq.end(), 0.0);
    std::iota(dataPar.begin(), dataPar.end(), 0.0);

Next we define our computational function as a lambda function.

// Computationally intensive transformation
auto heavyTransform = [](double x) {
    double result = x;
    for (int i = 0; i < 100; ++i) {
        result = std::sin(result) * std::cos(result);
    }
    return result;
};

Now we execute the std::transform() function with a parallel execution policy.

auto startPar = std::chrono::high_resolution_clock::now();
std::transform(std::execution::par, dataPar.begin(), dataPar.end(),
                dataPar.begin(), heavyTransform);
auto endPar = std::chrono::high_resolution_clock::now();

The fourth parameter of the call could be replaced by a result vector as well. We then repeat for the sequential version as well.

auto startSeq = std::chrono::high_resolution_clock::now();
std::transform(std::execution::seq, dataSeq.begin(), dataSeq.end(),
                dataSeq.begin(), heavyTransform);
auto endSeq = std::chrono::high_resolution_clock::now();

We then calcuate the execution times and compare the vectors using the std::equal() function of the standard library.

auto durationSeq = std::chrono::duration<double>(endSeq - startSeq).count();
auto durationPar = std::chrono::duration<double>(endPar - startPar).count();

std::println("========================================");
std::println("Data size: {}", size);
std::println("Data size in MB: {:.2f}", size * sizeof(double) / (1024.0 * 1024.0));
std::println("Sequential time: {:.4f} s", durationSeq);
std::println("Parallel time: {:.4f} s", durationPar);

if (durationPar > 0) {
    std::println("Speedup: {:.2f}x", durationSeq / durationPar);
    std::println("Efficiency: {:.1f}%", (durationSeq / durationPar / std::thread::hardware_concurrency()) * 100);
}

// Verify results match
bool resultsMatch = std::equal(dataSeq.begin(), dataSeq.end(), dataPar.begin(),
    [](double a, double b) { return std::abs(a - b) < 1e-10; });
std::println("Results match: {}", resultsMatch ? "Yes" : "No");

In the full example in the source code tree we also iterate over different problem sizes to see how the speedup depends on the problem size. Running the example we get:

========================================
Data size: 10000
Data size in MB: 0.08
Sequential time: 0.0084 s
Parallel time: 0.0016 s
Speedup: 5.42x
Efficiency: 45.2%
Results match: Yes
========================================
Data size: 100000
Data size in MB: 0.76
Sequential time: 0.0844 s
Parallel time: 0.0097 s
Speedup: 8.66x
Efficiency: 72.2%
Results match: Yes
========================================
Data size: 1000000
Data size in MB: 7.63
Sequential time: 0.8038 s
Parallel time: 0.0810 s
Speedup: 9.93x
Efficiency: 82.7%
Results match: Yes
========================================
Data size: 10000000
Data size in MB: 76.29
Sequential time: 8.0549 s
Parallel time: 0.7785 s
Speedup: 10.35x
Efficiency: 86.2%
Results match: Yes
========================================
Data size: 100000000
Data size in MB: 762.94
Sequential time: 85.5011 s
Parallel time: 8.5313 s
Speedup: 10.02x
Efficiency: 83.5%
Results match: Yes

We can clearly see that having larger problem sizes and heavier computational loads benefits parallelisation. This is due to the large overhead of starting up threads.

The complete example can be found here:

#include <vector>
#include <array>
#include <algorithm>
#include <execution>
#include <chrono>
#include <print>
#include <random>
#include <cmath>                                                                                                                                                                                                                                                                                                            
#include <thread>

int main() 
{
    std::array dataSizes = {10000, 100000, 1000000, 10000000, 100000000};

    for (const auto& size : dataSizes)
    {
        std::vector<double> dataSeq(size);
        std::vector<double> dataPar(size);

        // Initialize with simple values first
        std::iota(dataSeq.begin(), dataSeq.end(), 0.0);                                                     
        std::iota(dataPar.begin(), dataPar.end(), 0.0);

        // Computationally intensive transformation
        auto heavyTransform = [](double x) {
            double result = x;
            for (int i = 0; i < 100; ++i) {
                result = std::sin(result) * std::cos(result);
            }
            return result;
        };

        // Sequential execution
        auto startSeq = std::chrono::high_resolution_clock::now();
        std::transform(std::execution::seq, dataSeq.begin(), dataSeq.end(),                                                                                                                                                                                                     
                      dataSeq.begin(), heavyTransform);
        auto endSeq = std::chrono::high_resolution_clock::now();

        // Parallel execution
        auto startPar = std::chrono::high_resolution_clock::now();
        std::transform(std::execution::par, dataPar.begin(), dataPar.end(), 
                      dataPar.begin(), heavyTransform);
        auto endPar = std::chrono::high_resolution_clock::now();

        auto durationSeq = std::chrono::duration<double>(endSeq - startSeq).count();
        auto durationPar = std::chrono::duration<double>(endPar - startPar).count();

        std::println("========================================");
        std::println("Data size: {}", size);
        std::println("Data size in MB: {:.2f}", size * sizeof(double) / (1024.0 * 1024.0));
        std::println("Sequential time: {:.4f} s", durationSeq);
        std::println("Parallel time: {:.4f} s", durationPar);
        
        if (durationPar > 0) {
            std::println("Speedup: {:.2f}x", durationSeq / durationPar);
            std::println("Efficiency: {:.1f}%", (durationSeq / durationPar / std::thread::hardware_concurrency()) * 100);
        }
        
        // Verify results match
        bool resultsMatch = std::equal(dataSeq.begin(), dataSeq.end(), dataPar.begin(),
            [](double a, double b) { return std::abs(a - b) < 1e-10; });
        std::println("Results match: {}", resultsMatch ? "Yes" : "No");
    }

    return 0;
}

Real-World Example: Parallel Stencil Computation

Best Practices and Common Pitfalls