Implementing a Concurrently Accessed Multithreaded Queue with the C++ STL and pthread


The task: a queue accessed concurrently by multiple threads

Implement a multi-access threaded queue with multiple threads inserting into and multiple threads extracting from the queue. Use mutex locks to synchronize access to the queue. Document the time for 1000 insertions and 1000 extractions each by 64 insertion threads (Producers) and 64 extraction threads (Consumers).

  • Language restriction: C/C++
  • Note: an existing concurrent queue may not be used directly; build on an ordinary STL queue or implement one yourself.

Solution

Basic approach

The project is implemented with pthreads.

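The C++ STL queue is wrapped with mutual exclusion so that it becomes a thread-safe queue. Concretely, a subclass ThreadedQueue is derived from queue and its read and write member functions are redefined: every read or write of the queue first acquires ("locks") a mutex and releases ("unlocks") it once the operation is complete. Because std::queue's member functions are not virtual, the new versions hide the base-class functions rather than override them.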
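Hiding instead of overriding has a practical consequence: code that reaches the object through a std::queue<T>& reference calls the unlocked base-class function. A minimal sketch of the effect, where HidingQueue and hiding_demo are hypothetical names that do not appear in the original code:

```cpp
#include <cassert>
#include <pthread.h>
#include <queue>

// Hypothetical subclass: push() takes the mutex and counts how many pushes
// went through it. std::queue<T>::push is not virtual, so this push() only
// hides the base-class version; it does not override it.
template <typename T>
class HidingQueue : public std::queue<T> {
public:
    HidingQueue() { pthread_mutex_init(&lock_, nullptr); }
    ~HidingQueue() { pthread_mutex_destroy(&lock_); }
    HidingQueue(const HidingQueue&) = delete;
    HidingQueue& operator=(const HidingQueue&) = delete;

    void push(const T& val) {
        pthread_mutex_lock(&lock_);
        std::queue<T>::push(val);
        ++locked_pushes;
        pthread_mutex_unlock(&lock_);
    }

    int locked_pushes = 0;  // number of pushes that went through the lock

private:
    pthread_mutex_t lock_;
};

// Pushes once through the derived type and once through a base reference,
// then reports how many of the two pushes were actually locked.
inline int hiding_demo() {
    HidingQueue<int> q;
    q.push(1);                  // HidingQueue::push: locked
    std::queue<int>& base = q;
    base.push(2);               // std::queue::push: bypasses the lock
    assert(q.size() == 2);      // both elements are in the queue...
    return q.locked_pushes;     // ...but only one push was locked
}
```

This is one reason the comment thread below recommends composition: with a private std::queue member there is no base-class path around the lock.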

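In the driver program, the functions producer and consumer each perform 1000 enqueue or dequeue operations and record the elapsed time in the arrays producer_time and consumer_time, respectively. The main function creates and runs 64 producer threads and 64 consumer threads.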

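Finally, the time taken by each thread is printed. Since all threads together perform 64 × 1000 = 64000 enqueue operations and 64000 dequeue operations, the queue should end up empty.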

The ThreadedQueue class

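Following the STL documentation, the main member functions of the standard queue class (empty, size, push, pop, front, back) are redefined: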

#include <pthread.h>
#include <cstddef>
#include <queue>
using std::queue;

template <typename T>
class ThreadedQueue : public queue<T> {
public:
    ThreadedQueue();
    ~ThreadedQueue();
    bool empty() const;
    size_t size() const;
    void push(const T& val);
    void push(T&& val);   // rvalue overload, matching std::queue::push(T&&)
    bool pop();
    T& front();
    const T& front() const;
    T& back();
    const T& back() const;

private:
    mutable pthread_mutex_t queue_lock;
};

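Note that queue_lock must be mutable: const member functions such as empty() and size() still have to lock and unlock it, and locking changes the mutex's state. The lock is initialized in the constructor (pthread_mutex_init) and destroyed in the destructor (pthread_mutex_destroy).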
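A minimal sketch of that arrangement, assuming the same inheritance-based design as ThreadedQueue (LockedQueue and const_size_demo are hypothetical names, not the author's source). It also deletes the copy operations, because POSIX does not allow a copy of a pthread_mutex_t to be used for synchronization:

```cpp
#include <cassert>
#include <cstddef>
#include <pthread.h>
#include <queue>

template <typename T>
class LockedQueue : public std::queue<T> {
public:
    LockedQueue() { pthread_mutex_init(&queue_lock, nullptr); }   // create the lock
    ~LockedQueue() { pthread_mutex_destroy(&queue_lock); }        // destroy the lock

    // A pthread_mutex_t must not be copied, so copying the queue is disabled.
    LockedQueue(const LockedQueue&) = delete;
    LockedQueue& operator=(const LockedQueue&) = delete;

    void push(const T& val) {
        pthread_mutex_lock(&queue_lock);
        std::queue<T>::push(val);
        pthread_mutex_unlock(&queue_lock);
    }

    // const member function: it may lock queue_lock only because the
    // mutex member is declared mutable.
    std::size_t size() const {
        pthread_mutex_lock(&queue_lock);
        std::size_t n = std::queue<T>::size();
        pthread_mutex_unlock(&queue_lock);
        return n;
    }

private:
    mutable pthread_mutex_t queue_lock;
};

// Reads the size through a const reference, where only const members are callable.
inline void const_size_demo() {
    LockedQueue<int> q;
    q.push(1);
    q.push(2);
    const LockedQueue<int>& cq = q;
    assert(cq.size() == 2);   // size() locks the mutable queue_lock
}
```

Without the mutable keyword, &queue_lock inside size() would have type const pthread_mutex_t*, and the call to pthread_mutex_lock would not compile.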

Each member function locks and unlocks the mutex with the following two statements:

pthread_mutex_lock(&queue_lock);    // lock
pthread_mutex_unlock(&queue_lock);  // unlock
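Pairing the two calls by hand works, but if the code between them throws (std::queue::push can throw std::bad_alloc, for example), the unlock is skipped and every other thread then blocks forever. A minimal RAII sketch, assuming a hypothetical PthreadLockGuard helper that is not part of the original code:

```cpp
#include <cassert>
#include <pthread.h>

// Hypothetical RAII helper: locks in the constructor and unlocks in the
// destructor, so the mutex is released on every exit path, including exceptions.
class PthreadLockGuard {
public:
    explicit PthreadLockGuard(pthread_mutex_t& m) : m_(m) { pthread_mutex_lock(&m_); }
    ~PthreadLockGuard() { pthread_mutex_unlock(&m_); }
    PthreadLockGuard(const PthreadLockGuard&) = delete;
    PthreadLockGuard& operator=(const PthreadLockGuard&) = delete;

private:
    pthread_mutex_t& m_;
};

// Leaves a guarded scope by throwing, then checks that the mutex is free again.
inline bool guard_demo() {
    pthread_mutex_t m;
    pthread_mutex_init(&m, nullptr);
    try {
        PthreadLockGuard guard(m);
        throw 42;                                       // abnormal exit from the scope
    } catch (int) {
    }
    bool unlocked = (pthread_mutex_trylock(&m) == 0);   // succeeds only if unlocked
    if (unlocked) pthread_mutex_unlock(&m);
    pthread_mutex_destroy(&m);
    return unlocked;
}
```

Inside a member function, PthreadLockGuard guard(queue_lock); would replace the explicit lock/unlock pair; std::lock_guard provides the same behavior for std::mutex.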

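Unlike the STL version, my pop() returns bool instead of void. Before dequeuing, it checks whether the queue is empty: if it is, pop() returns false immediately; otherwise it removes one element and returns true. The implementation: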

template <typename T>
bool ThreadedQueue<T>::pop() {
    pthread_mutex_lock(&queue_lock);
    bool result = false;
    if(!queue<T>::empty()) {
        queue<T>::pop();
        result = true;
    }
    pthread_mutex_unlock(&queue_lock);
    return result;
}
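This pop() only removes an element. A consumer that also needs the value would call front() and then pop(), which are two separate critical sections: another consumer can pop in between, and the reference returned by front() is still in use after the lock has been released. A hedged sketch of one fix, assuming a hypothetical try_pop(T& out) member that copies the front element out and removes it under a single lock (TryPopQueue is also hypothetical, and it holds the std::queue as a member instead of inheriting from it):

```cpp
#include <cassert>
#include <pthread.h>
#include <queue>

template <typename T>
class TryPopQueue {
public:
    TryPopQueue() { pthread_mutex_init(&lock_, nullptr); }
    ~TryPopQueue() { pthread_mutex_destroy(&lock_); }
    TryPopQueue(const TryPopQueue&) = delete;
    TryPopQueue& operator=(const TryPopQueue&) = delete;

    void push(const T& val) {
        pthread_mutex_lock(&lock_);
        q_.push(val);
        pthread_mutex_unlock(&lock_);
    }

    // Reads and removes the front element inside ONE critical section.
    // Returns false, leaving out untouched, if the queue is empty.
    bool try_pop(T& out) {
        pthread_mutex_lock(&lock_);
        bool ok = !q_.empty();
        if (ok) {
            out = q_.front();
            q_.pop();
        }
        pthread_mutex_unlock(&lock_);
        return ok;
    }

private:
    std::queue<T> q_;
    pthread_mutex_t lock_;
};
```

A consumer loop would use it exactly like the original bool-returning pop(), but it receives the element as well, for example: int v; if (q.try_pop(v)) { /* use v */ }.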

The other functions are straightforward and are omitted here; see ThreadedQueue.cpp for the source code.
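As an illustration of the omitted functions, here is a minimal sketch of the two push overloads, assuming they mirror std::queue's push(const T&) and push(T&&). PushQueue and push_demo are hypothetical names; this is not the author's source:

```cpp
#include <cassert>
#include <pthread.h>
#include <queue>
#include <string>
#include <utility>

template <typename T>
class PushQueue : public std::queue<T> {
public:
    PushQueue() { pthread_mutex_init(&lock_, nullptr); }
    ~PushQueue() { pthread_mutex_destroy(&lock_); }
    PushQueue(const PushQueue&) = delete;
    PushQueue& operator=(const PushQueue&) = delete;

    void push(const T& val) {               // copies val into the queue
        pthread_mutex_lock(&lock_);
        std::queue<T>::push(val);
        pthread_mutex_unlock(&lock_);
    }

    void push(T&& val) {                    // moves val into the queue
        pthread_mutex_lock(&lock_);
        std::queue<T>::push(std::move(val));
        pthread_mutex_unlock(&lock_);
    }

private:
    pthread_mutex_t lock_;
};

inline void push_demo() {
    PushQueue<std::string> q;
    std::string a = "copy";
    q.push(a);                     // lvalue: push(const T&) copies, a is unchanged
    q.push(std::string("move"));   // rvalue: push(T&&) moves the temporary in
    assert(a == "copy");
    assert(q.size() == 2);
}
```

Single-threaded checks such as the ones in push_demo may call the unlocked base-class size(); with several threads running, every accessor has to take the lock as well.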

Producer and consumer threads

Define the constants (I prefer const variables to #define):

const int THREAD_NUM = 64;        // number of producer threads (and of consumer threads)
const int OPERATION_NUM = 1000;   // operations per thread

Declare three global variables; their roles are given in the comments:

ThreadedQueue<int> que;             // the thread-safe queue
double producer_time[THREAD_NUM];   // elapsed time of each producer thread (seconds)
double consumer_time[THREAD_NUM];   // elapsed time of each consumer thread (seconds)

Each producer thread performs 1000 push operations and records its elapsed time.

void* producer(void* arg)
{
    auto start = std::chrono::steady_clock::now();
    int num = *(int*)arg;
    for(int i = 0; i < OPERATION_NUM; i++) {
        que.push(i);
    }
    auto end = std::chrono::steady_clock::now();
    std::chrono::duration<double> diff = end-start;
    producer_time[num] = diff.count();
    return nullptr;  // a void* thread function must return a value
}

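Each consumer thread performs 1000 pop operations and records its elapsed time. A pop() that returns false means the queue was empty and nothing was dequeued, so the loop counter i is decremented to discard that iteration and retry it. As a result, a consumer spins (busy-waits) while the queue is empty, and that spinning is included in its measured time.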

void* consumer(void* arg)
{
    auto start = std::chrono::steady_clock::now();
    int num = *(int*)arg;
    for(int i = 0; i < OPERATION_NUM; i++) {
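        if(que.pop() == false) {  // the queue was empty, so nothing was dequeued
            i--;  // do not count this iteration; retry it
        }
    }
    auto end = std::chrono::steady_clock::now();
    std::chrono::duration<double> diff = end-start;
    consumer_time[num] = diff.count();
    return nullptr;  // a void* thread function must return a value
}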
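The retry loop keeps an idle consumer spinning on the mutex. A common alternative, swapped in here and not part of the original solution, is a blocking pop built on a POSIX condition variable (pthread_cond_wait and pthread_cond_signal). The class BlockingIntQueue, the demo functions, and the scaled-down size (4 producers and 4 consumers, 1000 operations each) are hypothetical choices for this sketch:

```cpp
#include <cassert>
#include <pthread.h>
#include <queue>

class BlockingIntQueue {
public:
    BlockingIntQueue() {
        pthread_mutex_init(&lock_, nullptr);
        pthread_cond_init(&not_empty_, nullptr);
    }
    ~BlockingIntQueue() {
        pthread_cond_destroy(&not_empty_);
        pthread_mutex_destroy(&lock_);
    }
    BlockingIntQueue(const BlockingIntQueue&) = delete;
    BlockingIntQueue& operator=(const BlockingIntQueue&) = delete;

    void push(int val) {
        pthread_mutex_lock(&lock_);
        q_.push(val);
        pthread_cond_signal(&not_empty_);        // wake one waiting consumer
        pthread_mutex_unlock(&lock_);
    }

    int pop() {                                  // blocks until an element exists
        pthread_mutex_lock(&lock_);
        while (q_.empty())                       // re-check: wakeups may be spurious
            pthread_cond_wait(&not_empty_, &lock_);  // releases lock_ while sleeping
        int val = q_.front();
        q_.pop();
        pthread_mutex_unlock(&lock_);
        return val;
    }

private:
    std::queue<int> q_;
    pthread_mutex_t lock_;
    pthread_cond_t not_empty_;
};

static BlockingIntQueue g_demo_queue;
static long g_demo_sums[4];

static void* demo_producer(void*) {
    for (int i = 0; i < 1000; i++) g_demo_queue.push(i);
    return nullptr;
}

static void* demo_consumer(void* arg) {
    int id = *static_cast<int*>(arg);
    long sum = 0;
    for (int i = 0; i < 1000; i++) sum += g_demo_queue.pop();  // no i-- retry needed
    g_demo_sums[id] = sum;
    return nullptr;
}

// Runs 4 producers and 4 consumers and returns the sum of all popped values.
inline long blocking_demo() {
    pthread_t prod[4], cons[4];
    int ids[4];
    for (int i = 0; i < 4; i++) {
        ids[i] = i;
        pthread_create(&cons[i], nullptr, demo_consumer, &ids[i]);
        pthread_create(&prod[i], nullptr, demo_producer, nullptr);
    }
    for (int i = 0; i < 4; i++) {
        pthread_join(prod[i], nullptr);
        pthread_join(cons[i], nullptr);
    }
    long total = 0;
    for (int i = 0; i < 4; i++) total += g_demo_sums[i];
    return total;
}
```

Every value 0 to 999 is pushed four times, so blocking_demo() should return 4 × 499500 = 1998000. The while loop around pthread_cond_wait is required because POSIX permits spurious wakeups.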

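The timing code requires the <chrono> header. The driver also needs <pthread.h> and <iostream> (cout and endl are used unqualified, i.e. with using namespace std;), and the program must be compiled and linked with -pthread.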

Driver program

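Create 64 producer threads and 64 consumer threads, wait for all of them to finish, and then print each thread's running time. Finally, print the queue's size to check the result.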

int main()
{
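    // Separate ID arrays: storing both threads in tids[i] would overwrite each
    // producer's ID, so the producers would never be joined.
    pthread_t producer_tids[THREAD_NUM];  // producer thread IDs
    pthread_t consumer_tids[THREAD_NUM];  // consumer thread IDs
    int args[THREAD_NUM];                 // index passed to each thread

    for (int i = 0; i < THREAD_NUM; i++)
    {
        args[i] = i;
        pthread_create(&producer_tids[i], NULL, producer, (void*)&args[i]);  // create a producer thread
        pthread_create(&consumer_tids[i], NULL, consumer, (void*)&args[i]);  // create a consumer thread
    }

    // Wait for every producer and every consumer before reading the timings.
    for(int i = 0; i < THREAD_NUM; i++) {
        pthread_join(producer_tids[i], NULL);
        pthread_join(consumer_tids[i], NULL);
    }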

    cout << "Time costs by threads:" << endl;
    for(int i = 0; i < THREAD_NUM; i++) {
        cout << "Producer No." << i << ": " << producer_time[i]<< endl;
    }
    for(int i = 0; i < THREAD_NUM; i++) {
        cout << "Consumer No." << i << ": " << consumer_time[i] << endl;
    }
    cout << "After run: queue size is " << que.size() << endl;

    return 0;
}
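The 128 per-thread numbers are easier to compare when condensed. A minimal sketch of a helper that main could call as summarize(producer_time, THREAD_NUM) and summarize(consumer_time, THREAD_NUM); TimeSummary and summarize are hypothetical names. Because the threads overlap in time, the sum of per-thread times is not the wall-clock time of the run; measuring that would take one extra steady_clock reading before the threads are created and one after they are joined.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

// Condensed view of per-thread timings, in seconds.
struct TimeSummary {
    double total;  // sum of the per-thread times
    double mean;   // average per-thread time
    double max;    // slowest thread
};

// times: array of n non-negative durations (max starts at 0.0).
inline TimeSummary summarize(const double* times, std::size_t n) {
    TimeSummary s{0.0, 0.0, 0.0};
    for (std::size_t i = 0; i < n; i++) {
        s.total += times[i];
        s.max = std::max(s.max, times[i]);
    }
    s.mean = n ? s.total / n : 0.0;
    return s;
}
```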

Results

Time costs by threads:
Producer No.0: 0.0035453
Producer No.1: 0.0097541
Producer No.2: 0.0039374
Producer No.3: 0.0046643
Producer No.4: 0.0011017
Producer No.5: 0.0115519
Producer No.6: 0.029984
Producer No.7: 0.0124825
Producer No.8: 0.0319004
Producer No.9: 0.0091504
Producer No.10: 0.0092195
Producer No.11: 0.0098956
Producer No.12: 0.016065
Producer No.13: 0.0104092
Producer No.14: 0.0120921
Producer No.15: 0.024926
Producer No.16: 0.0277262
Producer No.17: 0.0019175
Producer No.18: 0.0071028
Producer No.19: 0.0150646
Producer No.20: 0.0390921
Producer No.21: 0.0235649
Producer No.22: 0.0211355
Producer No.23: 0.0382783
Producer No.24: 0.027969
Producer No.25: 0.0005259
Producer No.26: 0.0163282
Producer No.27: 0.0182741
Producer No.28: 0.0082836
Producer No.29: 0.0274859
Producer No.30: 0.0373441
Producer No.31: 0.0108248
Producer No.32: 0.0328676
Producer No.33: 0.0216773
Producer No.34: 0.0332299
Producer No.35: 0.0150974
Producer No.36: 0.0250234
Producer No.37: 0.0279427
Producer No.38: 0.0364425
Producer No.39: 0.030739
Producer No.40: 0.0245329
Producer No.41: 0.0127556
Producer No.42: 0.0303114
Producer No.43: 0.0135522
Producer No.44: 0.0268419
Producer No.45: 0.0119628
Producer No.46: 0.0075869
Producer No.47: 0.0033807
Producer No.48: 0.0304128
Producer No.49: 0.0201584
Producer No.50: 0.030793
Producer No.51: 0.025274
Producer No.52: 0.0135567
Producer No.53: 0.0231418
Producer No.54: 0.0222596
Producer No.55: 0.0241113
Producer No.56: 0.0081788
Producer No.57: 0.0125965
Producer No.58: 0.0185869
Producer No.59: 0.0131793
Producer No.60: 0.0149767
Producer No.61: 0.0095411
Producer No.62: 0.0142676
Producer No.63: 0.0086034
Consumer No.0: 0.0082464
Consumer No.1: 0.0025075
Consumer No.2: 0.0112725
Consumer No.3: 0.0054444
Consumer No.4: 0.0120974
Consumer No.5: 0.0129196
Consumer No.6: 0.0077034
Consumer No.7: 0.0118207
Consumer No.8: 0.0119351
Consumer No.9: 0.0191973
Consumer No.10: 0.0072163
Consumer No.11: 0.040856
Consumer No.12: 0.0162112
Consumer No.13: 0.0081399
Consumer No.14: 0.0094534
Consumer No.15: 0.0351553
Consumer No.16: 0.0017596
Consumer No.17: 0.0472846
Consumer No.18: 0.0046676
Consumer No.19: 0.0075435
Consumer No.20: 0.0281079
Consumer No.21: 0.0124427
Consumer No.22: 0.0132705
Consumer No.23: 0.0398564
Consumer No.24: 0.0276478
Consumer No.25: 0.037193
Consumer No.26: 0.0156065
Consumer No.27: 0.0095608
Consumer No.28: 0.0185973
Consumer No.29: 0.0280992
Consumer No.30: 0.0212333
Consumer No.31: 0.028526
Consumer No.32: 0.0328673
Consumer No.33: 0.0164981
Consumer No.34: 0.0265682
Consumer No.35: 0.0329799
Consumer No.36: 0.0305522
Consumer No.37: 0.016546
Consumer No.38: 0.0316502
Consumer No.39: 0.0267266
Consumer No.40: 0.0311787
Consumer No.41: 0.036088
Consumer No.42: 0.0288491
Consumer No.43: 0.0343447
Consumer No.44: 0.0292084
Consumer No.45: 0.0280437
Consumer No.46: 0.0189063
Consumer No.47: 0.0311268
Consumer No.48: 0.0021924
Consumer No.49: 0.0310998
Consumer No.50: 0.0152394
Consumer No.51: 0.0319721
Consumer No.52: 0.0179619
Consumer No.53: 0.0215205
Consumer No.54: 0.0172351
Consumer No.55: 0.0005207
Consumer No.56: 0.0110845
Consumer No.57: 0.0186043
Consumer No.58: 0.0120241
Consumer No.59: 0.0140537
Consumer No.60: 0.0104268
Consumer No.61: 0.0131224
Consumer No.62: 0.0091303
Consumer No.63: 0.0123637
After run: queue size is 0

At the end, the queue's size is 0, which shows that the total number of enqueues equals the total number of dequeues, as expected.

Full source code

Local download: pthread_ThreadedQueue.zip


Comments


9 comments

Isn't this still serial access to the queue? There's no parallelism here.

Jed replying to @jack

Threads created with `pthread_create` run in parallel.

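1. Prefer composition over inheritance; inheriting directly from std::queue is not recommended.
2. Since this is C++ anyway, why not use std::thread, std::mutex and std::lock_guard?
3. With many readers and a single writer, do all the read operations really need mutual exclusion?
4. The template's declaration and definition are in separate files; does that really compile?...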

Jed replying to @Tommy

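Thank you very much for the pointers! I don't have much experience with parallel programming yet, so the code quality is fairly low.
About point 4: splitting them does not compile. I made that mistake at the time and fixed it by putting both the declarations and the definitions in `ThreadedQueue.h`. The blog had the wrong version uploaded; sorry about that, it has now been fixed.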

Tommy replying to @Jed

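No worries. It just reminded me that the Matrix backend once had a similar blocking queue, so I clicked in to take a look. It was later refactored for various reasons.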

Jed replying to @Tommy

Oh! So you're one of the Matrix experts!

It doesn't build: the functions are undefined.

Jed replying to @J2

Add the `-pthread` flag.

J2 replying to @Jed

Solved, that was indeed the problem.
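Tommy's points 1 and 2 can be sketched together: the queue is a private member (composition) protected by std::mutex and std::lock_guard, and std::thread replaces pthread_create. StdThreadedQueue and std_thread_demo are hypothetical names; try_pop follows the original bool-returning pop() but also hands the element back.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

template <typename T>
class StdThreadedQueue {
public:
    void push(T val) {
        std::lock_guard<std::mutex> guard(m_);   // unlocked when guard leaves scope
        q_.push(std::move(val));
    }

    bool try_pop(T& out) {
        std::lock_guard<std::mutex> guard(m_);
        if (q_.empty()) return false;
        out = std::move(q_.front());
        q_.pop();
        return true;
    }

    std::size_t size() const {
        std::lock_guard<std::mutex> guard(m_);   // m_ is mutable, so this works in a const method
        return q_.size();
    }

private:
    std::queue<T> q_;        // composition: a member, not a base class
    mutable std::mutex m_;
};

// `threads` producers and `threads` consumers, `ops` operations each.
inline std::size_t std_thread_demo(int threads, int ops) {
    StdThreadedQueue<int> q;
    std::vector<std::thread> workers;
    for (int t = 0; t < threads; t++) {
        workers.emplace_back([&q, ops] {          // producer
            for (int i = 0; i < ops; i++) q.push(i);
        });
        workers.emplace_back([&q, ops] {          // consumer
            int v = 0;
            for (int done = 0; done < ops; ) {
                if (q.try_pop(v)) done++;          // count only successful pops
                else std::this_thread::yield();    // queue empty: yield, then retry
            }
        });
    }
    for (std::thread& w : workers) w.join();
    return q.size();                               // 0 when pushes == pops
}
```

std_thread_demo(64, 1000) reproduces the assignment's 64 + 64 threads. On point 3, C++17's std::shared_mutex with std::shared_lock lets read-only calls such as size() run concurrently, but in a queue both producers and consumers modify the container, so they would still need exclusive locks.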