Producer-Consumer C++ Multithreading

Today’s topic revolves around synchronizing two threads in a producer and consumer scenario. While semaphores are a viable option for achieving this, we will explore the use of C++ conditional variables for the same purpose. This post will be concise and to the point.

This solution is designed to address a scenario where the consumer operates at a faster pace than the producer. In a subsequent post, I will delve into the opposite situation where the producer outpaces the consumer, and I will also touch upon the concept of lock-free programming.

What is the Problem?

Let’s begin by defining the problem before presenting a solution. Imagine we have a task that comprises the following subtasks, each with a specified time:

  1. Reading data from peripheral A, which takes 20 ms.
  2. Sending this data to another peripheral B, which takes 10 ms.

If you were to implement this code without multithreading, the total time taken for the data to move from peripheral A to peripheral B would be 30 ms. So, how can we make this process faster?

Solution:

Instead of executing these subtasks sequentially, we can run them concurrently. This means that while one thread is sending data to peripheral B, another thread can simultaneously start reading from peripheral A. By doing so, we effectively eliminate the time spent waiting for data transmission to peripheral B, reducing the total data movement time from 30 ms to 20 ms.

So, to ensure the proper functioning of this concurrent approach, we need to synchronize the two threads. When thread 2 (the consumer) finishes moving data to peripheral B, it should wait for thread 1 (the producer) to provide new data before proceeding. This synchronization ensures that the threads work seamlessly together to optimize the data movement process.

Solution

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>


// shared param between threads
int g_counter = 0;

// params for thread sync

// mutex to protect shared data
std::mutex g_dataMtx;
// conditional variable
std::condition_variable g_condVar;
// cond variable mutex
std::mutex g_condVarMtx;
// cond variable flag
std::atomic<bool> g_condVarFlag;


// thread function for producer
void producer() {
	while (1) {

		// produce shared data
		g_dataMtx.lock();
		++g_counter;
		g_dataMtx.unlock();

		// notify consumer thread about new shared data
		std::unique_lock lk(g_condVarMtx);
		g_condVarFlag.store(true);
		g_condVar.notify_all();
		lk.unlock();

		// To keep producer slower than consumer.
		std::this_thread::sleep_for(std::chrono::milliseconds(1000));
	}
}

// thread function for consumer 
void consumer(){

	while (true) {

		// check if there is no ready data and wait
		if (!g_condVarFlag.load()) {
			std::unique_lock lk(g_condVarMtx);
			while (!g_condVarFlag.load())
				g_condVar.wait(lk);
		}

		// use shared data
		g_dataMtx.lock();
		std::cout << "counter : " << g_counter << std::endl;
		g_dataMtx.unlock();

		// reset flag for preventing reusage of current data
		g_condVarFlag.store(false);
	}
}

int main(){

	std::thread producerThread(producer);
	std::thread consumerThread(consumer);
	producerThread.join();
	consumerThread.join();
	return 0;
}

I won’t delve into the intricacies of how conditional variables work in this post. The primary focus here is to demonstrate the concept of synchronization as an essential element of the solution.

The key point I want to emphasize is the reason behind introducing another parameter called g_condVarFlag. There are two primary motivations for this inclusion.

Firstly, in situations where producers provide new data before the consumer returns, it’s essential to determine whether the consumer should wait for fresh data or use the shared data directly.

Secondly, based on information I’ve come across and advice from my supervisor, it’s worth noting that conditional variables may not always be completely robust. They might occasionally receive false signals and prematurely stop waiting. To mitigate this potential issue, we incorporate the g_condVarFlag check when breaking the waiting condition. This check ensures that there is indeed new data available before the consumer proceeds.

It’s important to mention that, although my personal experience with conditional variables hasn’t shown false signals in a few examples, including the g_condVarFlag check adds an extra layer of safety, accounting for potential variations in platforms, operating systems, or other unforeseen factors.

Leave a comment