这是一道作业题,题目如下:
Implement a multi-access threaded queue with multiple threads inserting and multiple threads extracting from the queue. Use mutex-locks to synchronize access to the queue. Document the time for 1000 insertions and 1000 extractions each by 64 insertions threads (Producers) and 64 extraction threads (Consumer).
- 语言限制:C/C++
- PS:不能直接使用STL中现有的并发访问队列,请基于普通的queue或自行实现。
完整代码下载地址(仅供参考):pthread_ThreadedQueue.zip
基本思路
使用 pthread 来完成本项目。
对 C++ STL 中的 queue 进行封装,增加线程互斥的功能,使其成为线程安全的队列。方法是从 queue 继承一个子类ThreadedQueue
,然后覆盖其中的读写函数,每次对队列进行读或写操作前都要先“加锁”,操作完成后“解锁”。
在驱动程序中,编写producer
和consumer
函数,分别进行 1000 次入队和出队操作,并记录所用时间,将其保存在数组producer_time
和producer_time
中。在 main 函数中,创建并执行 64 个 producer 线程和 64 个 consumer 线程。
最后显示每个线程所用的时间。由于所有线程总共进行了 64000 次入队操作和 64000 次出队操作,因此最终队列应该为空。
ThreadedQueue 类
查阅 STL 文档,对标准库 queue 类中的函数全部进行覆盖。
template <typename T>
class ThreadedQueue : public queue<T> {
public:
ThreadedQueue();
~ThreadedQueue();
bool empty() const;
size_t size() const;
void push(const T& val);
void push(T& val);
bool pop();
T& front();
const T& front() const;
T& back();
const T& back() const;
private:
mutable pthread_mutex_t queue_lock;
};
需注意queue_lock
必须是 mutable 的,这是因为类中存在的 const 方法需要对锁的状态进行修改。锁的初始化和销毁分别在类的构造函数和析构函数中进行。
在每个操作函数中,使用以下两条语句来加锁或解锁:
pthread_mutex_lock(&queue_lock); // 加锁
pthread_mutex_unlock(&queue_lock); // 解锁
我改写了 STL 中的pop()
方法,使其返回 bool 而不是 void。在出队之前,先判断队列是否为空,如果为空则直接返回 false;如果不控才将一个元素出队并返回 true。其实现如下:
template <typename T>
bool ThreadedQueue<T>::pop() {
pthread_mutex_lock(&queue_lock);
bool result = false;
if(!queue<T>::empty()) {
queue<T>::pop();
result = true;
}
pthread_mutex_unlock(&queue_lock);
return result;
}
其它函数的实现较为简单,本文档中省略,源代码见ThreadedQueue.cpp
文件。
本文地址:https://www.jeddd.com/article/cpp-pthread-multi-access-threaded-queue.html
producer 和 consumer 线程
定义常量(个人不喜欢用 define 定义常量,因此使用 const 变量的方法来代替):
const int THREAD_NUM = 64; // 线程数
const int OPERATION_NUM = 1000; // 线程内的操作次数
声明三个全局变量,其作用如注释:
ThreadedQueue<int> que; // 线程安全队列
double producer_time[THREAD_NUM]; // 线程计时
double consumer_time[THREAD_NUM]; // 线程计时
producer 线程中做 1000 次push
,并记录时间。
void* producer(void* arg)
{
auto start = std::chrono::steady_clock::now();
int num = *(int*)arg;
for(int i = 0; i < OPERATION_NUM; i++) {
que.push(i);
}
auto end = std::chrono::steady_clock::now();
std::chrono::duration<double> diff = end-start;
producer_time[num] = diff.count();
}
consumer 线程中做 1000 次pop
,并记录时间。当一次pop
返回 false 的时候,代表队列为空,出队失败,因此需要将循环变量 i 减去 1,表示当次循环无效,需要重新进行。
void* consumer(void* arg)
{
auto start = std::chrono::steady_clock::now();
int num = *(int*)arg;
for(int i = 0; i < OPERATION_NUM; i++) {
if(que.pop() == false) { // 队列为空时出队失败
i--; // 计数减1
}
}
auto end = std::chrono::steady_clock::now();
std::chrono::duration<double> diff = end-start;
consumer_time[num] = diff.count();
}
计时函数需要引入头文件<chrono>
。
驱动程序
创建 64 个 producer 及 consumer 线程,然后等待所有线程结束后显示每个线程的运行时间。最后打印队列的 size 确认执行结果。
int main()
{
pthread_t tids[THREAD_NUM]; // 存放线程ID
int args[THREAD_NUM];
for (int i = 0; i < THREAD_NUM; i++)
{
args[i] = i;
pthread_create(&tids[i], NULL, producer, (void*)&args[i]); // 创建producer线程
pthread_create(&tids[i], NULL, consumer, (void*)&args[i]); // 创建comsumer线程
}
for(int i = 0; i < THREAD_NUM; i++) {
pthread_join(tids[i], NULL);
}
cout << "Time costs by threads:" << endl;
for(int i = 0; i < THREAD_NUM; i++) {
cout << "Producer No." << i << ": " << producer_time[i]<< endl;
}
for(int i = 0; i < THREAD_NUM; i++) {
cout << "Consumer No." << i << ": " << consumer_time[i] << endl;
}
cout << "After run: queue size is " << que.size() << endl;
return 0;
}
实验结果
Time costs by threads:
Producer No.0: 0.0035453
Producer No.1: 0.0097541
Producer No.2: 0.0039374
Producer No.3: 0.0046643
Producer No.4: 0.0011017
Producer No.5: 0.0115519
Producer No.6: 0.029984
Producer No.7: 0.0124825
Producer No.8: 0.0319004
Producer No.9: 0.0091504
Producer No.10: 0.0092195
Producer No.11: 0.0098956
Producer No.12: 0.016065
Producer No.13: 0.0104092
Producer No.14: 0.0120921
Producer No.15: 0.024926
Producer No.16: 0.0277262
Producer No.17: 0.0019175
Producer No.18: 0.0071028
Producer No.19: 0.0150646
Producer No.20: 0.0390921
Producer No.21: 0.0235649
Producer No.22: 0.0211355
Producer No.23: 0.0382783
Producer No.24: 0.027969
Producer No.25: 0.0005259
Producer No.26: 0.0163282
Producer No.27: 0.0182741
Producer No.28: 0.0082836
Producer No.29: 0.0274859
Producer No.30: 0.0373441
Producer No.31: 0.0108248
Producer No.32: 0.0328676
Producer No.33: 0.0216773
Producer No.34: 0.0332299
Producer No.35: 0.0150974
Producer No.36: 0.0250234
Producer No.37: 0.0279427
Producer No.38: 0.0364425
Producer No.39: 0.030739
Producer No.40: 0.0245329
Producer No.41: 0.0127556
Producer No.42: 0.0303114
Producer No.43: 0.0135522
Producer No.44: 0.0268419
Producer No.45: 0.0119628
Producer No.46: 0.0075869
Producer No.47: 0.0033807
Producer No.48: 0.0304128
Producer No.49: 0.0201584
Producer No.50: 0.030793
Producer No.51: 0.025274
Producer No.52: 0.0135567
Producer No.53: 0.0231418
Producer No.54: 0.0222596
Producer No.55: 0.0241113
Producer No.56: 0.0081788
Producer No.57: 0.0125965
Producer No.58: 0.0185869
Producer No.59: 0.0131793
Producer No.60: 0.0149767
Producer No.61: 0.0095411
Producer No.62: 0.0142676
Producer No.63: 0.0086034
Consumer No.0: 0.0082464
Consumer No.1: 0.0025075
Consumer No.2: 0.0112725
Consumer No.3: 0.0054444
Consumer No.4: 0.0120974
Consumer No.5: 0.0129196
Consumer No.6: 0.0077034
Consumer No.7: 0.0118207
Consumer No.8: 0.0119351
Consumer No.9: 0.0191973
Consumer No.10: 0.0072163
Consumer No.11: 0.040856
Consumer No.12: 0.0162112
Consumer No.13: 0.0081399
Consumer No.14: 0.0094534
Consumer No.15: 0.0351553
Consumer No.16: 0.0017596
Consumer No.17: 0.0472846
Consumer No.18: 0.0046676
Consumer No.19: 0.0075435
Consumer No.20: 0.0281079
Consumer No.21: 0.0124427
Consumer No.22: 0.0132705
Consumer No.23: 0.0398564
Consumer No.24: 0.0276478
Consumer No.25: 0.037193
Consumer No.26: 0.0156065
Consumer No.27: 0.0095608
Consumer No.28: 0.0185973
Consumer No.29: 0.0280992
Consumer No.30: 0.0212333
Consumer No.31: 0.028526
Consumer No.32: 0.0328673
Consumer No.33: 0.0164981
Consumer No.34: 0.0265682
Consumer No.35: 0.0329799
Consumer No.36: 0.0305522
Consumer No.37: 0.016546
Consumer No.38: 0.0316502
Consumer No.39: 0.0267266
Consumer No.40: 0.0311787
Consumer No.41: 0.036088
Consumer No.42: 0.0288491
Consumer No.43: 0.0343447
Consumer No.44: 0.0292084
Consumer No.45: 0.0280437
Consumer No.46: 0.0189063
Consumer No.47: 0.0311268
Consumer No.48: 0.0021924
Consumer No.49: 0.0310998
Consumer No.50: 0.0152394
Consumer No.51: 0.0319721
Consumer No.52: 0.0179619
Consumer No.53: 0.0215205
Consumer No.54: 0.0172351
Consumer No.55: 0.0005207
Consumer No.56: 0.0110845
Consumer No.57: 0.0186043
Consumer No.58: 0.0120241
Consumer No.59: 0.0140537
Consumer No.60: 0.0104268
Consumer No.61: 0.0131224
Consumer No.62: 0.0091303
Consumer No.63: 0.0123637
After run: queue size is 0
最后,队列的 size 为 0,说明总入队次数等于总出队次数,符合预期。
本文地址:https://www.jeddd.com/article/cpp-pthread-multi-access-threaded-queue.html
这不还是串行访问队列,也没有并行啊
pthread_create创建的线程并行运行
谢谢指教~
关于第4条,分开来是不能编译的,现已修复
没事哈,只是想到Matrix后台以前也有过类似的阻塞队列,所以就点进来看了。不过后来由于种种原因就重构了。
噢噢!原来是Matrix的大佬
编译通不过啊,函数undefined
加上-pthread参数