基于线程的并发编程 线程(thread)就是运行在进程上下文中的逻辑流。线程由内核自动调度。每个线程都有它自己的线程上下文(thread context),包括一个唯一的整数线程ID(Thread ID,TID)、栈、栈指针、程序计数器、通用目的寄存器和条件码。所有运行在一个进程里的线程共享该进程的整个虚拟地址空间。
Posix线程 Posix线程(Pthreads)是在C程序中处理线程的一个标准接口。下面的Pthreads程序:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 #include <stdio.h> #include <stdlib.h> #include <pthread.h> void *thread (void *vargp) ;int main () { pthread_t tid; pthread_create(&tid, NULL , thread, NULL ); pthread_join(tid, NULL ); exit (0 ); } void *thread (void *vargp) { printf ("Hello, world!\n" ); return NULL ; }
主线程(main thread)创建一个对等线程(peer thread),然后等待它的终止。对等线程输出”Hello, world!\n”并终止。当主线程检测到对等线程终止后,他就通过调用exit终止该进程。 如第二行里的原型所示,每个线程例程都以一个通用指针作为输入,并返回一个通用指针。如果想传递多个参数给线程例程,那么应该将参数放到一个结构中,并传递一个指向该结构的指针。
1.写程序时忘记函数所需的头文件可以用man来查看,如pthread_create函数可以用man pthread_create
命令查看所需头文件。 2.该文件编译:gcc -o hello hello.c -lpthread ,pthread不是Linux下的默认的库,也就是在链接的时候,无法找到pthread库中函数的入口地址,于是链接会失败。
创建线程 线程通过调用pthread_create
函数来创建其他线程。
1 2 3 4 #include <pthread.h> typedef void *(func)(void *);int pthread_create (pthread_t *tid, pthread_attr_t *attr, func *f, void *arg) ;
返回:若成功则返回0,若出错则为非零。 pthread_create函数创建一个新的线程,并带着一个输入变量arg,在新线程的上下文中运行线程例程f。能用attr参数来改变新创建线程的默认属性。 新线程可以通过调用pthread_self函数来获得它自己的线程ID。
1 2 #include <pthread.h> pthread_t pthread_self (void ) ;
终止线程 终止方式如下:
当顶层的线程例程返回时,线程会隐式地
终止。
通过调用pthread_exit函数,线程会显式地
终止。如果主线程调用pthread_exit,它会等待所有其他对等线程终止,然后再终止主线程和整个进程,返回值为thread_return。1 2 #include <pthread.h> void pthread_exit (void *thread_return) ;
某个对等线程调用Unix的exit函数,该函数终止进程以及所有与改进程相关的线程。
另一个对等线程通过以当前线程ID作为参数调用pthread_cancle函数来终止当前线程。1 2 #include <pthread.h> int pthread_cancel (pthread_t tid) ;
回收已终止线程的资源 线程通过调用pthread_join函数等待其他线程终止。
1 2 #include <pthread.h> int pthread_join (pthread_t tid, void **thread_return) ;
pthread_join函数会阻塞,直到线程tid终止,将线程例程返回的(void *)指针赋值为thread_return指向的位置,然后回收
已终止线程占用的所有存储器资源。 和Unix的wait函数不同,pthread_join函数只能等待一个指定的线程终止。没有办法让pthread_join等待任意某个线程终止。
分离线程 在任一个时间点,线程是可结合的(joinable)
或者是分离的(detached)
。一个可结合的线程能被其他线程收回其资源和杀死。一个分离的线程是不能被其他线程回收或杀死的。 线程默认是可结合的,为了避免存储器泄露,每个可结合线程都应该要么被其他线程显式回收,要么通过pthread_detach函数被分离。
1 2 #include <pthread.h> int pthread_detach (pthread_t tid) ;
初始化线程 1 2 3 #include <pthread.h> pthread_once_t once_control = PTHREAD_ONCE_INIT;int pthread_once (pthread_once_t *once_control, void (*init_routine)(void )) ;
once_control变量是一个全局或者静态变量,总是被初始化为PTHREAD_ONCE_INIT。第一次用参数once_control调用pthread_once时,它调用init_routine,这是一个没有输入参数,也没有返回的函数。(后面有应用)
一个基于线程的并发服务器 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 #include "csapp.h" void echo (int connfd) ;void *thread (void *vargp) ;int main (int argc, char **argv) { int listenfd, *connfdp, port; socklen_t clientlen = sizeof (struct sockaddr_in); struct sockaddr_in clientaddr ; pthread_t tid; if (argc != 2 ) { fprintf (stderr , "usage: %s <port>\n" , argv[0 ]); exit (0 ); } port = atoi(argv[1 ]); listenfd = Open_listenfd(port); while (1 ) { connfdp = Malloc(sizeof (int )); *connfdp = Accept(listenfd, (SA *) &clientaddr, &clientlen); Pthread_create(&tid, NULL , thread, connfdp); } } void echo (int connfd) { size_t n; char buf[MAXLINE]; rio_t rio; Rio_readinitb(&rio, connfd); while ((n = Rio_readlineb(&rio, buf, MAXLINE)) != 0 ) { printf ("server received %d bytes\n" , n); Rio_writen(connfd, buf, n); } } void *thread (void *vargp) { int connfd = *((int *)vargp); Pthread_detach(pthread_self()); Free(vargp); echo(connfd); Close(connfd); return NULL ; }
对等线程的赋值语句和主线程的Accept语句间引入的竞争:主线程Accept之后,创建新线程执行thread线程例程,同时主线程继续Accept,如果主线程的Accept在新线程的赋值语句之前执行,那么之前的连接就没有被处理,而是处理的下一次连接。因此必须让每个Accept返回的已连接描述符分配到不同的动态存储器块。
编译:gcc -o echoservert echoservert.c csapp.c csapp.h -lpthread
多线程程序中的共享变量 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 #include "csapp.h" #define N 2 void *thread (void *vargp) ;char **ptr;int main () { int i; pthread_t tid; char *msgs[N] = { "Hello from foo" , "Hello from bar" }; ptr = msgs; for (i = 0 ; i < N; i++) Pthread_create(&tid, NULL , thread, (void *)i); Pthread_exit(NULL ); } void *thread (void *vargp) { int myid = (int )vargp; static int cnt = 0 ; printf ("[%d]: %s (cnt=%d)\n" , myid, ptr[myid], ++cnt); return NULL ; }
编译:gcc -o sharing sharing.c csapp.c csapp.h -lpthread
线程存储器模型 一组并发线程运行在一个进程的上下文中。每个线程都有它自己独立的线程上下文,包括线程ID
、栈
、栈指针
、程序计数器
、条件码
和通用目的寄存器值
。每个线程和其他线程一起共享进程上下文的剩余部分。这包括整个用户虚拟地址空间
,它是由只读文本(代码)、读/写数据、堆以及所有的共享库代码和数据区域组成的。将变量映射到存储器
全局变量 在运行时,虚拟存储器的读/写区域只包含每个全局变量的一个实例,任何线程都可以引用。
本地自动变量 每个线程的栈都包含它自己的所有本地自动变量的实例。
本地静态变量 和全局变量一样,虚拟存储器的读/写区域只包含在程序中声明的每个本地静态变量的一个实例。
用信号量同步线程 一个共享变量引入同步错误(synchronization)
的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 #include "csapp.h" void *thread (void *vargp) ;volatile int cnt = 0 ;int main (int argc, char **argv) { int niters; pthread_t tid1, tid2; if (argc != 2 ) { printf ("usage: %s <niters>\n" , argv[0 ]); } niters = atoi(argv[1 ]); Pthread_create(&tid1, NULL , thread, &niters); Pthread_create(&tid2, NULL , thread, &niters); Pthread_join(tid1, NULL ); Pthread_join(tid2, NULL ); if (cnt != (2 * niters)) printf ("BOOM! cnt=%d\n" , cnt); else printf ("OK cnt=%d\n" , cnt); exit (0 ); } void *thread (void *vargp) { int i, niters =*((int *)vargp); for (i = 0 ; i < niters; i++) cnt++; return NULL ; }
编译:gcc -o badcnt badcnt.c csapp.c csapp.h -lpthread
执行:
1 2 3 4 ➜ pthread ./badcnt 10000000 BOOM! cnt=18624047 ➜ pthread ./badcnt 10000000 BOOM! cnt=12824971
会发现当niters足够大时,得到的答案会是错误的,而且每次都不同。因为当badcnt.c中的两个对等线程在一个单处理器上并发运行时,机器指令以某种顺序一个接一个地完成。这些顺序中的一些将会产生正确结果,但其他的则不会。一般而言,没有办法预测操作系统是否将为你的线程选择一个正确的顺序。
信号量 信号量s是具有非负整数值的全局变量,只能由两种特殊的操作来处理:
P(s) 如果s是非零的,P将s减一,并立即返回。如果s为零,那么就挂起这个线程,直到s变为非零。
V(s) V操作将s加一。如果有任何线程阻塞在P操作等待s变成非零,那么V操作会重启这些线程中的一个。
当有多个线程在等待同一个信号量时,不能预测V操作要重启哪个线程。 Posix标准定义了许多操作信号量的函数。
1 2 3 4 5 6 #include <semaphore.h> int sem_init (sem_t *sem, 0 , unsigned int value) ;int sem_wait (sem_t *s) ;int sem_post (sem_t *s) ;
可以用以下包装函数代替
1 2 3 #include "csapp.h" void P (sem_t *s) ; void V (sem_t *s) ;
使用信号量来实现互斥 基本思想是将共享变量与一个信号量s(初始为1)联系起来,然后用P(s)和V(s)操作将相应的临界区包围起来。
临界区:对于线程i,操作共享变量cnt内容的指令构成了一个临界区(critical section)。 要确保每个线程在执行它的临界区中的指令时,拥有对共享变量的互斥的访问,这种现象称为互斥(mutual exclusion)。
以这种方式保护共享变量的信号量叫做二元信号量(binary semaphore),因为它的值为0或1。以提供互斥为目的的二元信号量也成为互斥锁(mutex)。在一个互斥锁上执行P操作称为对互斥锁加锁
,V操作称为解锁
。对一个互斥锁加了锁但是还没有解锁的线程称为占用
这个互斥锁。 一个被用作一组可用资源的计数器的信号量称为计数信号量
。 用信号量正确同步前面的计数器程序实例: 1.首先声明一个信号量mutex
1 2 volatile int cnt = 0 ;sem_t mutex;
2.在主例程中,pthread_create之前将mutex初始化为1
3.通过在线程例程中对共享变量cnt的更新包围P和V操作
1 2 3 4 5 for (i = 0 ; i < niters; i++) { P(&mutex); cnt++; V(&mutex); }
再编译执行就一定能得到正确结果了
1 2 3 4 5 ➜ pthread gcc -o badcnt badcnt.c csapp.c csapp.h -lpthread ➜ pthread ./badcnt 10000000 OK cnt=20000000 ➜ pthread ./badcnt 100000000 OK cnt=200000000
利用信号量来调度共享资源 一个线程通过信号量操作来通知另一个线程,程序状态中某个条件已经为真了。两个经典而有用的例子是生产者 - 消费者
和读者 - 写者
问题。 1.生产者 - 消费者问题 生产者和消费者线程共享一个有n个槽的有限缓冲区
。生产者线程反复地生成新的项目,并把它们插入到缓冲区中。消费者线程不断地从缓冲区中取出这些项目,然后消费它们。 因为插入和取出项目都涉及更新共享变量,所以我们必须保证对缓冲区的访问是互斥的。但只保证互斥访问是不够的,还需要调度对缓冲区的访问。如果缓冲区是满的,生产者就必须等到有一个槽位变为可用。如果缓冲区是空的,那么消费者必须等到有一个可用项目。 下面开发一个简单的包,叫做SBUF,用来构造生产者 - 消费者程序。 SBUF操作类型为sbuf_t的有限缓冲区。
1 2 3 4 5 6 7 8 9 10 11 #include "csapp.h" typedef struct { int *buf; int n; int front; int rear; sem_t mutex; sem_t slots; sem_t items; } sbuf_t ;
SBUF函数的实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 include "csapp.h" #include "sbuf.h" void sbuf_init (sbuf_t *sp, int n) { sp->buf = Calloc(n, sizeof (int )); sp->n = n; sp->front = sp->rear = 0 ; Sem_init(&sp->mutex, 0 , 1 ); Sem_init(&sp->slots, 0 , n); Sem_init(&sp->items, 0 , 0 ); } void sbuf_deinit (sbuf_t *sp) { Free(sp->buf); } void sbuf_insert (sbuf_t *sp, int item) { P(&sp->slots); P(&sp->mutex); sp->buf[(++sp->rear) % (sp->n)] = item; V(&sp->mutex); V(&sp->items); } int sbuf_remove (sbuf_t *sp) { int item; P(&sp->items); P(&sp->mutex); item = sp->buf[(++sp->front) % (sp->n)]; V(&sp->mutex); V(&sp->slots); return item; }
2.读者 - 写者问题 一组并发的线程要访问一个共享对象,有些线程只读对象,而其他的线程只修改对象。修改对象的线程叫做写者
,只读对象的线程叫做读者
。写者必须拥有对对象的独占的访问,而读者可以和无限多个其他的读者共享对象。第一类读者 - 写者问题
读者优先,要求不让读者等待,除非已经有写者在占用。第二类读者 - 写者问题
写者优先,在写者之后到的读者要等待。 第一类读者 - 写者问题的实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 int readcnt; sem_t mutex, w; void reader (void ) { while (1 ) { P(&mutex); readcnt++; if (readcnt == 1 ) P(&w); V(&mutex); P(&mutex); readcnt--; if (readcnt == 0 ) V(&w); V(&mutex); } } void writer (void ) { while (1 ) { P(&w); V(&w); } }
综合:基于预线程化的并发服务器 一个基于预线程化(prethreading)的服务器通过使用生产者 - 消费者模型来降低为每一个新客户端创建一个新线程的开销。服务器是由一个主线程和一组工作者线程构成的。主线程不断地接收来自客户端的连接请求,并将得到的连接描述符放在一个有限缓冲区中。每一个工作者线程反复地从共享缓冲区中取出描述符,为客户端服务,然后等待下一个描述符。 用SBUF包实现一个预线程化的并发echo服务器。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 #include "csapp.h" #include "sbuf.h" #define NTHREADS 4 #define SBUFSIZE 16 void echo_cnt (int connfd) ;void *thread (void *vargp) ;sbuf_t sbuf;int main (int argc, char **argv) { int i, listenfd, connfd, port; socklen_t clientlen = sizeof (struct sockaddr_in); struct sockaddr_in clientaddr ; pthread_t tid; if (argc != 2 ) { fprintf (stderr , "usage: %s <port>\n" , argv[0 ]); exit (0 ); } port = atoi(argv[1 ]); sbuf_init(&sbuf, SBUFSIZE); listenfd = Open_listenfd(port); for (i = 0 ; i < NTHREADS; i++) { Pthread_create(&tid, NULL , thread, NULL ); } while (1 ) { connfd = Accept(listenfd, (SA *) &clientaddr, &clientlen); sbuf_insert(&sbuf, connfd); } } void *thread (void *vargp) { Pthread_detach(pthread_self()); while (1 ) { int connfd = sbuf_remove(&sbuf); echo_cnt(connfd); Close(connfd); } }
echo_cnt函数的实现如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 include "csapp.h" static int byte_cnt; static sem_t mutex;static void init_echo_cnt (void ) { Sem_init(&mutex, 0 , 1 ); byte_cnt = 0 ; } void echo_cnt (int connfd) { int n; char buf[MAXLINE]; rio_t rio; static pthread_once_t once = PTHREAD_ONCE_INIT; Pthread_once(&once, init_echo_cnt); Rio_readinitb(&rio, connfd); while ((n = Rio_readlineb(&rio, buf, MAXLINE)) != 0 ) { P(&mutex); byte_cnt += n; printf ("thread %d received %d (%d total) bytes on fd %d\n" , (int )pthread_self(), n, byte_cnt, connfd); V(&mutex); Rio_writen(connfd, buf, n); } }
编译:gcc -o echoservert_pre echoservert_pre.c sbuf.h sbuf.c csapp.h csapp.c echo_cnt.c -lpthread
reference 《深入理解计算机系统》