一个例子讲清楚线程间同步、互斥量、条件变量、队列、内存池

前段时间有朋友想要了解一下多线程编程,正好有个项目上有这么个例子可以抽出来讲一讲。只要搞清楚这个例子,就一下子掌握了线程间同步、互斥量、条件变量、队列、内存池的概念和使用。

首先,线程间同步的概念。

比如,学过数字电路的人都知道,两个时钟域的信号如果没有经过同步直接接到一起的话,会引起亚稳态。原因是如果恰好输入信号在时钟边沿附近变化的话(不满足建立保持时间的情况下),信号可能处于一个中间电平,这样会导致触发器处于一个振荡状态,引起整块数字电路的不稳定。这就是数字电路中异步的概念,两个时钟都是各自free running,彼此没有关系。

再比如单片机程序中,各个不同的中断程序或者跟主程序间是异步的,因为主程序在执行的过程中随时可能被进来的中断打断,如果中断和主程序之间要通过一个共享的变量传递数据,你就要注意这个共享的变量的保护。假如主程序只读取了一半的数据而被中断打断,然后中断程序中又更新了整个变量,这样的回到主程序继续执行时读到的数据就有一半是上一次的,一半是更新过的。这样的结果显然不是我们想要的。这里只是举了一个很明显的例子。更多的情况可以搜索一下“原子操作”。

所以在多线程环境下,我们就要注意线程间共享变量的保护,这块敏感区域叫临界区(Critical area)。在单片机中,我们用中断开关来保护共享变量读写操作的完整性。在操作系统中,我们用的是互斥锁(mutex)来占有这个变量,防止它被多个线程同时访问。当一个线程访问当前已经被另一个线程占有的变量时,就会进入阻塞态,直到另一个线程完成解锁操作后,这个线程将得到继续执行。

互斥锁(mutex)是多线程编程时最重要的一个工具,用来解决多线程竞争同个资源的问题。其最底层的实现都是一个原子操作来界定lock or unlock。

接下来的例子创建了两个线程,一个是producer, 另一个是cusumer, 它们两个是异步的,中间通过一个队列来通讯。producer 向队列中发送数据,cusumer读取数据。模拟了一个场景:producer 以较快的速度向队列写数据,cusumer 处理数据较慢。这在图像帧处理时经常会碰到CPU处理和发送数据较慢,而外设采集速度较快的情况,这样多余的帧将被丢弃。队列节点使用自己写的一个内存池来分配,在malloc_node 从内存池(free_queue)里取出node; release_node 时把节点放回资源池。当对free_queue 进行操作的时候都要加锁,因为malloc_node 和release_node 可能被不同的线程调用,必须对free_queue 进行保护。这样的函数称之为是线程安全的。同理对enqueue,dequeue的操作也要对队列进行保护。

然后使用条件变量来通知consumer 队列有新数据到来。条件变量同样是被多个线程调用,也是需要带一个mutex 来进行保护的。当条件不满足时,线程会解锁mutex 进入block状态等待消息,这样才不会一直占有CPU。当条件满足或者超时时,才继续执行下面的程序。

例子中使用了pthread(POSIX thread) 的实现。其实各大操作系统都有自己的实现,FreeRTOS, Linux kernel等等,都可以拿代码过来看看学习。

请看这个多线程的例子,可以在online gdb 中运行调试:

#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include <pthread.h>
#include <stdbool.h>
#include <assert.h>
#include <sys/time.h>
#include <errno.h>

/\*\*\*\*\*\*\*\*\*\*\*\* queue manage \*\*\*\*\*\*\*\*\*\*\*/
typedef struct Node
{
    void \*data;
    struct Node \*next;
}queue\_node\_t;

#define BUFFER\_POOL\_SIZE  (640 \* 480)
#define BUFFER\_POOL\_NUM   5

struct pbuf{
    uint32\_t len;
    uint8\_t payload\[BUFFER\_POOL\_SIZE\];
};

typedef struct QueueList
{
    int sizeOfQueue;
    uint16\_t memSize;
    queue\_node\_t \*head;
    queue\_node\_t \*tail;
}queue\_t;

/\* to inform consumer\_thread \*/
static pthread\_cond\_t cap\_cond;
static pthread\_mutex\_t cap\_mutex;

/\* stream queue for communicate between two threads \*/
static queue\_t strm\_queue;
static pthread\_mutex\_t strmq\_mutex;

int strm\_queue\_init(){
    queue\_t \*q = &strm\_queue;
    q->sizeOfQueue = 0;
    q->memSize = 0;
    q->head = q->tail = NULL;

    if (pthread\_cond\_init(&cap\_cond, NULL) != 0) {
        printf("pthread\_cond\_init failed\\n");
        exit(-1);
    }
    if (pthread\_mutex\_init(&cap\_mutex, NULL) != 0) {
        printf("pthread\_mutex\_init failed\\n");
        pthread\_cond\_destroy(&cap\_cond);
        exit(-1);
    }

    if (pthread\_mutex\_init(&strmq\_mutex, NULL) != 0) {
        printf("pthread\_mutex\_init failed\\n");
        exit(-1);
    }

    return 0;
}

int malloc\_node(queue\_node\_t \*\*newNode);

int frame\_enqueue(void \*data, uint32\_t len){
    queue\_t \*q = &strm\_queue;
    queue\_node\_t \*newNode = NULL;
    struct pbuf \*p;
	assert(len < BUFFER\_POOL\_SIZE);
	if (malloc\_node(&newNode) != true) {
		//printf("no node available!\\n");
		return -1;
	}
	p = (struct pbuf \*)newNode->data;
	memcpy(p->payload, data, len);
	p->len = len;

	pthread\_mutex\_lock(&strmq\_mutex);
	if (q->sizeOfQueue == 0) {
		q->head = q->tail = newNode;
	} else {
		q->tail->next = newNode;
		q->tail = newNode;
	}
	
	q->sizeOfQueue++;
	pthread\_mutex\_unlock(&strmq\_mutex);
	
	pthread\_mutex\_lock(&cap\_mutex);
	pthread\_cond\_signal(&cap\_cond);
	pthread\_mutex\_unlock(&cap\_mutex);

	return 0;
}

int frame\_dequeue(queue\_node\_t \*\*newNode){
    queue\_t \*q = &strm\_queue;
    bool ret = false;
    pthread\_mutex\_lock(&strmq\_mutex);
    if (q->sizeOfQueue <= 0) {
        pthread\_mutex\_unlock(&strmq\_mutex);
        \*newNode = NULL;
        ret = false;
    }else{
        \*newNode = q->head;
        if (q->sizeOfQueue > 1) {
            q->head = q->head->next;
        } else {
            q->head = NULL;
            q->tail = NULL;
        }
        q->sizeOfQueue--;
        ret = true;
    }
    pthread\_mutex\_unlock(&strmq\_mutex);

    return ret;
}

/\*\*\*\*\* node pool manage \*\*\*\*\*/
static struct pbuf buffer\_pool\[BUFFER\_POOL\_NUM\];
static queue\_node\_t queue\_node\[BUFFER\_POOL\_NUM\];
/\* mutex for free\_queue when malloc and release node \*/
pthread\_mutex\_t freeq\_mutex;
queue\_t free\_queue;

int node\_pool\_init(){
    int i;
    queue\_t \*q = &free\_queue;
    queue\_node\_t \*newNode; 

    newNode = &queue\_node\[0\];
    newNode->data = (void \*)&buffer\_pool\[0\];
    q->head = q->tail = newNode;

    for (i = 1; i < BUFFER\_POOL\_NUM; i++) {   
       newNode = &queue\_node\[i\];
       newNode->data = (void \*)&buffer\_pool\[i\];      
       q->tail->next = newNode;
       q->tail = newNode;
    }
    q->sizeOfQueue = i;

    if (pthread\_mutex\_init(&freeq\_mutex, NULL) != 0) {
        printf("pthread\_mutex\_init failed\\n");
        exit(-1);
    }

    return 0;
}

/\* dequeue from free queue \*/
int malloc\_node(queue\_node\_t \*\*newNode){

    queue\_t \*q = &free\_queue;
    bool ret = false;
    pthread\_mutex\_lock(&freeq\_mutex);
    if (q->sizeOfQueue <= 0) {
        pthread\_mutex\_unlock(&freeq\_mutex);
        \*newNode = NULL;
        ret = false;
    }else{
        \*newNode = q->head;
        if (q->sizeOfQueue > 1) {
            q->head = q->head->next;
        } else {
            q->head = NULL;
            q->tail = NULL;
        }
        q->sizeOfQueue--;
        ret = true;
    }
    pthread\_mutex\_unlock(&freeq\_mutex);

    return ret;
}

/\* tailed to free queue \*/
int release\_node(queue\_node\_t \*newNode ){

    queue\_t \*q = &free\_queue;
    newNode->next = NULL;
    pthread\_mutex\_lock(&freeq\_mutex);
    if (q->sizeOfQueue == 0) {
        q->head = q->tail = newNode;
    } else {
        q->tail->next = newNode;
        q->tail = newNode;
    }

    q->sizeOfQueue++;
    pthread\_mutex\_unlock(&freeq\_mutex);
    return 0;
}

/\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\* test for communication between threads \*\*\*\*\*\*\*\*\*\*\*\*\*/
void\* producer\_thread(void \*argv\[\]){
    char message\[100\];
    printf("producer start!\\n");
    for(int i = 0; i < 100; i++){
        usleep(100000);
        sprintf(message, "Hello %d", i);
        if(0 != frame\_enqueue((void \*)message, strlen(message))){
            printf("drop message: %s!\\n", message);
        };
    }
    printf("producer stop!\\n");
}

void\* consumer\_thread(void \*argv\[\]){
    queue\_node\_t \*newNode;
    int i = 0;
    struct timeval cur\_tv;
    struct timespec to;
    int timeout\_msec = 5000;
    int ret;
    
    while(1){
        
        gettimeofday(&cur\_tv, NULL);
        to.tv\_sec = cur\_tv.tv\_sec + timeout\_msec / 1000;
        to.tv\_nsec = cur\_tv.tv\_usec \* 1000 + (timeout\_msec % 1000) \* 1000000;
        
        printf("consumer wait!\\n");        
        pthread\_mutex\_lock(&cap\_mutex);
        ret = pthread\_cond\_timedwait(&cap\_cond, &cap\_mutex, &to);
        pthread\_mutex\_unlock(&cap\_mutex);
        
        if((ret == ETIMEDOUT)){
            printf("condition wait timeout!\\n");
            break;
        }
        while(frame\_dequeue(&newNode)){
            /\* do some process \*/
            printf("%s\\n", ((struct pbuf \*)(newNode->data))->payload);
            usleep(500000);
            release\_node(newNode);
            i++;
        }
    }
    printf("consumer get %d message!\\n", i);    
    printf("consumer exit!\\n");
}

int main(int argc, char \*argv\[\])
{
    int i;
    pthread\_t pid1, pid2;
    printf("\\n--- test for communication between threads ---\\n");
    node\_pool\_init();    
    strm\_queue\_init();

    if (pthread\_create(&pid2, NULL, consumer\_thread, NULL)){
        printf("create consumer failed!\\n");
    }

    if (pthread\_create(&pid1, NULL, producer\_thread, NULL)) {
        printf("create producer failed!\\n");
        return -1;
    }
    
    pthread\_join(pid1, NULL);
    pthread\_join(pid2, NULL);
    printf("\\n--- test exit ---!\\n");
    return 0;
}



例子中模拟了 producer 的生产速度是cosumer 的5倍,所以当队列满了的时候很多数据就会被丢弃。

若要看其它更简单的使用例子,可以参看libpthread 源码 /tests目录下的小例子。OK!