一个例子讲清楚线程间同步、互斥量、条件变量、队列、内存池
前段时间有朋友想要了解一下多线程编程,正好有个项目上有这么个例子可以抽出来讲一讲。只要搞清楚这个例子,就一下子掌握了线程间同步、互斥量、条件变量、队列、内存池的概念和使用。
首先,线程间同步的概念。
比如,学过数字电路的人都知道,两个时钟域的信号如果没有经过同步直接接到一起的话,会引起亚稳态。原因是如果恰好输入信号在时钟边沿附近变化的话(不满足建立保持时间的情况下),信号可能处于一个中间电平,这样会导致触发器处于一个振荡状态,引起整块数字电路的不稳定。这就是数字电路中异步的概念,两个时钟都是各自free running,彼此没有关系。
再比如单片机程序中,各个不同的中断程序或者跟主程序间是异步的,因为主程序在执行的过程中随时可能被进来的中断打断,如果中断和主程序之间要通过一个共享的变量传递数据,你就要注意这个共享的变量的保护。假如主程序只读取了一半的数据而被中断打断,然后中断程序中又更新了整个变量,这样的回到主程序继续执行时读到的数据就有一半是上一次的,一半是更新过的。这样的结果显然不是我们想要的。这里只是举了一个很明显的例子。更多的情况可以搜索一下“原子操作”。
所以在多线程环境下,我们就要注意线程间共享变量的保护,这块敏感区域叫临界区(Critical area)。在单片机中,我们用中断开关来保护共享变量读写操作的完整性。在操作系统中,我们用的是互斥锁(mutex)来占有这个变量,防止它被多个线程同时访问。当一个线程访问当前已经被另一个线程占有的变量时,就会进入阻塞态,直到另一个线程完成解锁操作后,这个线程将得到继续执行。
互斥锁(mutex)是多线程编程时最重要的一个工具,用来解决多线程竞争同个资源的问题。其最底层的实现都是一个原子操作来界定lock or unlock。
接下来的例子创建了两个线程,一个是producer, 另一个是cusumer, 它们两个是异步的,中间通过一个队列来通讯。producer 向队列中发送数据,cusumer读取数据。模拟了一个场景:producer 以较快的速度向队列写数据,cusumer 处理数据较慢。这在图像帧处理时经常会碰到CPU处理和发送数据较慢,而外设采集速度较快的情况,这样多余的帧将被丢弃。队列节点使用自己写的一个内存池来分配,在malloc_node 从内存池(free_queue)里取出node; release_node 时把节点放回资源池。当对free_queue 进行操作的时候都要加锁,因为malloc_node 和release_node 可能被不同的线程调用,必须对free_queue 进行保护。这样的函数称之为是线程安全的。同理对enqueue,dequeue的操作也要对队列进行保护。
然后使用条件变量来通知consumer 队列有新数据到来。条件变量同样是被多个线程调用,也是需要带一个mutex 来进行保护的。当条件不满足时,线程会解锁mutex 进入block状态等待消息,这样才不会一直占有CPU。当条件满足或者超时时,才继续执行下面的程序。
例子中使用了pthread(POSIX thread) 的实现。其实各大操作系统都有自己的实现,FreeRTOS, Linux kernel等等,都可以拿代码过来看看学习。
请看这个多线程的例子,可以在online gdb 中运行调试:
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include <pthread.h>
#include <stdbool.h>
#include <assert.h>
#include <sys/time.h>
#include <errno.h>
/\*\*\*\*\*\*\*\*\*\*\*\* queue manage \*\*\*\*\*\*\*\*\*\*\*/
typedef struct Node
{
void \*data;
struct Node \*next;
}queue\_node\_t;
#define BUFFER\_POOL\_SIZE (640 \* 480)
#define BUFFER\_POOL\_NUM 5
struct pbuf{
uint32\_t len;
uint8\_t payload\[BUFFER\_POOL\_SIZE\];
};
typedef struct QueueList
{
int sizeOfQueue;
uint16\_t memSize;
queue\_node\_t \*head;
queue\_node\_t \*tail;
}queue\_t;
/\* to inform consumer\_thread \*/
static pthread\_cond\_t cap\_cond;
static pthread\_mutex\_t cap\_mutex;
/\* stream queue for communicate between two threads \*/
static queue\_t strm\_queue;
static pthread\_mutex\_t strmq\_mutex;
int strm\_queue\_init(){
queue\_t \*q = &strm\_queue;
q->sizeOfQueue = 0;
q->memSize = 0;
q->head = q->tail = NULL;
if (pthread\_cond\_init(&cap\_cond, NULL) != 0) {
printf("pthread\_cond\_init failed\\n");
exit(-1);
}
if (pthread\_mutex\_init(&cap\_mutex, NULL) != 0) {
printf("pthread\_mutex\_init failed\\n");
pthread\_cond\_destroy(&cap\_cond);
exit(-1);
}
if (pthread\_mutex\_init(&strmq\_mutex, NULL) != 0) {
printf("pthread\_mutex\_init failed\\n");
exit(-1);
}
return 0;
}
int malloc\_node(queue\_node\_t \*\*newNode);
int frame\_enqueue(void \*data, uint32\_t len){
queue\_t \*q = &strm\_queue;
queue\_node\_t \*newNode = NULL;
struct pbuf \*p;
assert(len < BUFFER\_POOL\_SIZE);
if (malloc\_node(&newNode) != true) {
//printf("no node available!\\n");
return -1;
}
p = (struct pbuf \*)newNode->data;
memcpy(p->payload, data, len);
p->len = len;
pthread\_mutex\_lock(&strmq\_mutex);
if (q->sizeOfQueue == 0) {
q->head = q->tail = newNode;
} else {
q->tail->next = newNode;
q->tail = newNode;
}
q->sizeOfQueue++;
pthread\_mutex\_unlock(&strmq\_mutex);
pthread\_mutex\_lock(&cap\_mutex);
pthread\_cond\_signal(&cap\_cond);
pthread\_mutex\_unlock(&cap\_mutex);
return 0;
}
int frame\_dequeue(queue\_node\_t \*\*newNode){
queue\_t \*q = &strm\_queue;
bool ret = false;
pthread\_mutex\_lock(&strmq\_mutex);
if (q->sizeOfQueue <= 0) {
pthread\_mutex\_unlock(&strmq\_mutex);
\*newNode = NULL;
ret = false;
}else{
\*newNode = q->head;
if (q->sizeOfQueue > 1) {
q->head = q->head->next;
} else {
q->head = NULL;
q->tail = NULL;
}
q->sizeOfQueue--;
ret = true;
}
pthread\_mutex\_unlock(&strmq\_mutex);
return ret;
}
/\*\*\*\*\* node pool manage \*\*\*\*\*/
static struct pbuf buffer\_pool\[BUFFER\_POOL\_NUM\];
static queue\_node\_t queue\_node\[BUFFER\_POOL\_NUM\];
/\* mutex for free\_queue when malloc and release node \*/
pthread\_mutex\_t freeq\_mutex;
queue\_t free\_queue;
int node\_pool\_init(){
int i;
queue\_t \*q = &free\_queue;
queue\_node\_t \*newNode;
newNode = &queue\_node\[0\];
newNode->data = (void \*)&buffer\_pool\[0\];
q->head = q->tail = newNode;
for (i = 1; i < BUFFER\_POOL\_NUM; i++) {
newNode = &queue\_node\[i\];
newNode->data = (void \*)&buffer\_pool\[i\];
q->tail->next = newNode;
q->tail = newNode;
}
q->sizeOfQueue = i;
if (pthread\_mutex\_init(&freeq\_mutex, NULL) != 0) {
printf("pthread\_mutex\_init failed\\n");
exit(-1);
}
return 0;
}
/\* dequeue from free queue \*/
int malloc\_node(queue\_node\_t \*\*newNode){
queue\_t \*q = &free\_queue;
bool ret = false;
pthread\_mutex\_lock(&freeq\_mutex);
if (q->sizeOfQueue <= 0) {
pthread\_mutex\_unlock(&freeq\_mutex);
\*newNode = NULL;
ret = false;
}else{
\*newNode = q->head;
if (q->sizeOfQueue > 1) {
q->head = q->head->next;
} else {
q->head = NULL;
q->tail = NULL;
}
q->sizeOfQueue--;
ret = true;
}
pthread\_mutex\_unlock(&freeq\_mutex);
return ret;
}
/\* tailed to free queue \*/
int release\_node(queue\_node\_t \*newNode ){
queue\_t \*q = &free\_queue;
newNode->next = NULL;
pthread\_mutex\_lock(&freeq\_mutex);
if (q->sizeOfQueue == 0) {
q->head = q->tail = newNode;
} else {
q->tail->next = newNode;
q->tail = newNode;
}
q->sizeOfQueue++;
pthread\_mutex\_unlock(&freeq\_mutex);
return 0;
}
/\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\* test for communication between threads \*\*\*\*\*\*\*\*\*\*\*\*\*/
void\* producer\_thread(void \*argv\[\]){
char message\[100\];
printf("producer start!\\n");
for(int i = 0; i < 100; i++){
usleep(100000);
sprintf(message, "Hello %d", i);
if(0 != frame\_enqueue((void \*)message, strlen(message))){
printf("drop message: %s!\\n", message);
};
}
printf("producer stop!\\n");
}
void\* consumer\_thread(void \*argv\[\]){
queue\_node\_t \*newNode;
int i = 0;
struct timeval cur\_tv;
struct timespec to;
int timeout\_msec = 5000;
int ret;
while(1){
gettimeofday(&cur\_tv, NULL);
to.tv\_sec = cur\_tv.tv\_sec + timeout\_msec / 1000;
to.tv\_nsec = cur\_tv.tv\_usec \* 1000 + (timeout\_msec % 1000) \* 1000000;
printf("consumer wait!\\n");
pthread\_mutex\_lock(&cap\_mutex);
ret = pthread\_cond\_timedwait(&cap\_cond, &cap\_mutex, &to);
pthread\_mutex\_unlock(&cap\_mutex);
if((ret == ETIMEDOUT)){
printf("condition wait timeout!\\n");
break;
}
while(frame\_dequeue(&newNode)){
/\* do some process \*/
printf("%s\\n", ((struct pbuf \*)(newNode->data))->payload);
usleep(500000);
release\_node(newNode);
i++;
}
}
printf("consumer get %d message!\\n", i);
printf("consumer exit!\\n");
}
int main(int argc, char \*argv\[\])
{
int i;
pthread\_t pid1, pid2;
printf("\\n--- test for communication between threads ---\\n");
node\_pool\_init();
strm\_queue\_init();
if (pthread\_create(&pid2, NULL, consumer\_thread, NULL)){
printf("create consumer failed!\\n");
}
if (pthread\_create(&pid1, NULL, producer\_thread, NULL)) {
printf("create producer failed!\\n");
return -1;
}
pthread\_join(pid1, NULL);
pthread\_join(pid2, NULL);
printf("\\n--- test exit ---!\\n");
return 0;
}
例子中模拟了 producer 的生产速度是cosumer 的5倍,所以当队列满了的时候很多数据就会被丢弃。
若要看其它更简单的使用例子,可以参看libpthread 源码 /tests目录下的小例子。OK!