基于线程的并发编程

基于线程的并发编程

线程(thread)就是运行在进程上下文中的逻辑流。线程由内核自动调度。每个线程都有它自己的线程上下文(thread context),包括一个唯一的整数线程ID(Thread ID,TID)、栈、栈指针、程序计数器、通用目的寄存器和条件码。所有运行在一个进程里的线程共享该进程的整个虚拟地址空间。

Posix线程

Posix线程(Pthreads)是在C程序中处理线程的一个标准接口。下面的Pthreads程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

void *thread(void *vargp);

int main()
{

pthread_t tid;
pthread_create(&tid, NULL, thread, NULL); // 返回时,主线程和对等线程同时运行
pthread_join(tid, NULL); // 等待对等线程终止
exit(0);
}

void *thread(void *vargp)
{

printf("Hello, world!\n");
return NULL;
}

主线程(main thread)创建一个对等线程(peer thread),然后等待它的终止。对等线程输出“Hello, world!”并终止。当主线程检测到对等线程终止后,他就通过调用exit终止该进程。
如第二行里的原型所示,每个线程例程都以一个通用指针作为输入,并返回一个通用指针。如果想传递多个参数给线程例程,那么应该将参数放到一个结构中,并传递一个指向该结构的指针。

1.写程序时忘记函数所需的头文件可以用man来查看,如pthread_create函数可以用man pthread_create命令查看所需头文件。 2.该文件编译:gcc -o hello hello.c -lpthread ,pthread不是Linux下的默认的库,也就是在链接的时候,无法找到pthread库中函数的入口地址,于是链接会失败。

创建线程

线程通过调用pthread_create函数来创建其他线程。

1
2
3
4
#include <pthread.h>
typedef void *(func)(void *);
int pthread_create(pthread_t *tid, pthread_attr_t *attr,
func *f, void *arg)
;

返回:若成功则返回0,若出错则为非零。
pthread_create函数创建一个新的线程,并带着一个输入变量arg,在新线程的上下文中运行线程例程f。能用attr参数来改变新创建线程的默认属性。
新线程可以通过调用pthread_self函数来获得它自己的线程ID。

1
2
#include <pthread.h>
pthread_t pthread_self(void);

终止线程

终止方式如下:

  • 当顶层的线程例程返回时,线程会隐式地终止。
  • 通过调用pthread_exit函数,线程会显式地终止。如果主线程调用pthread_exit,它会等待所有其他对等线程终止,然后再终止主线程和整个进程,返回值为thread_return。

    1
    2
    #include <pthread.h>
    void pthread_exit(void *thread_return);

  • 某个对等线程调用Unix的exit函数,该函数终止进程以及所有与改进程相关的线程。
  • 另一个对等线程通过以当前线程ID作为参数调用pthread_cancle函数来终止当前线程。

    1
    2
    #include <pthread.h>
    int pthread_cancel(pthread_t tid);

回收已终止线程的资源

线程通过调用pthread_join函数等待其他线程终止。

1
2
#include <pthread.h>
int pthread_join(pthread_t tid, void **thread_return);

pthread_join函数会阻塞,直到线程tid终止,将线程例程返回的(void *)指针赋值为thread_return指向的位置,然后回收已终止线程占用的所有存储器资源。 和Unix的wait函数不同,pthread_join函数只能等待一个指定的线程终止。没有办法让pthread_join等待任意某个线程终止。

分离线程

在任一个时间点,线程是可结合的(joinable)或者是分离的(detached)。一个可结合的线程能被其他线程收回其资源和杀死。一个分离的线程是不能被其他线程回收或杀死的。
线程默认是可结合的,为了避免存储器泄露,每个可结合线程都应该要么被其他线程显式回收,要么通过pthread_detach函数被分离。

1
2
#include <pthread.h>
int pthread_detach(pthread_t tid);

初始化线程

1
2
3
#include <pthread.h>
pthread_once_t once_control = PTHREAD_ONCE_INIT;
int pthread_once(pthread_once_t *once_control, void (*init_routine)(void));

once_control变量是一个全局或者静态变量,总是被初始化为PTHREAD_ONCE_INIT。第一次用参数once_control调用pthread_once时,它调用init_routine,这是一个没有输入参数,也没有返回的函数。(后面有应用)

一个基于线程的并发服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#include "csapp.h"
void echo(int connfd);
void *thread(void *vargp);

int main(int argc, char **argv)
{

int listenfd, *connfdp, port;
socklen_t clientlen = sizeof(struct sockaddr_in);
struct sockaddr_in clientaddr;
pthread_t tid;

if (argc != 2) {
fprintf(stderr, "usage: %s <port>\n", argv[0]);
exit(0);
}
port = atoi(argv[1]);
listenfd = Open_listenfd(port);
while (1) {
// 为了避免对等线程的赋值语句和主线程的accept语句间引入的竞争
connfdp = Malloc(sizeof(int));
*connfdp = Accept(listenfd, (SA *) &clientaddr, &clientlen);
Pthread_create(&tid, NULL, thread, connfdp);
}
}

void echo(int connfd)
{

size_t n;
char buf[MAXLINE];
rio_t rio;

Rio_readinitb(&rio, connfd);
while ((n = Rio_readlineb(&rio, buf, MAXLINE)) != 0) {
printf("server received %d bytes\n", n);
Rio_writen(connfd, buf, n);
}
}

void *thread(void *vargp)
{

int connfd = *((int *)vargp);
Pthread_detach(pthread_self());
Free(vargp);
echo(connfd);
Close(connfd);
return NULL;
}

对等线程的赋值语句和主线程的Accept语句间引入的竞争:主线程Accept之后,创建新线程执行thread线程例程,同时主线程继续Accept,如果主线程的Accept在新线程的赋值语句之前执行,那么之前的连接就没有被处理,而是处理的下一次连接。因此必须让每个Accept返回的已连接描述符分配到不同的动态存储器块。

编译:gcc -o echoservert echoservert.c csapp.c csapp.h -lpthread

多线程程序中的共享变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include "csapp.h"
#define N 2
void *thread(void *vargp);
char **ptr;
int main()
{

int i;
pthread_t tid;
char *msgs[N] = {
"Hello from foo",
"Hello from bar"
};

ptr = msgs;
for (i = 0; i < N; i++)
Pthread_create(&tid, NULL, thread, (void *)i);
Pthread_exit(NULL);
}

void *thread(void *vargp)
{

int myid = (int)vargp;
static int cnt = 0;
printf("[%d]: %s (cnt=%d)\n", myid, ptr[myid], ++cnt);
return NULL;
}

编译:gcc -o sharing sharing.c csapp.c csapp.h -lpthread

线程存储器模型

一组并发线程运行在一个进程的上下文中。每个线程都有它自己独立的线程上下文,包括线程ID栈指针程序计数器条件码通用目的寄存器值。每个线程和其他线程一起共享进程上下文的剩余部分。这包括整个用户虚拟地址空间,它是由只读文本(代码)、读/写数据、堆以及所有的共享库代码和数据区域组成的。
将变量映射到存储器

  • 全局变量 在运行时,虚拟存储器的读/写区域只包含每个全局变量的一个实例,任何线程都可以引用。
  • 本地自动变量 每个线程的栈都包含它自己的所有本地自动变量的实例。
  • 本地静态变量 和全局变量一样,虚拟存储器的读/写区域只包含在程序中声明的每个本地静态变量的一个实例。

用信号量同步线程

一个共享变量引入同步错误(synchronization)的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#include "csapp.h"

void *thread(void *vargp);
volatile int cnt = 0;

int main(int argc, char **argv)
{

int niters;
pthread_t tid1, tid2;
if (argc != 2) {
printf("usage: %s <niters>\n", argv[0]);
}
niters = atoi(argv[1]);

Pthread_create(&tid1, NULL, thread, &niters);
Pthread_create(&tid2, NULL, thread, &niters);
Pthread_join(tid1, NULL);
Pthread_join(tid2, NULL);

if (cnt != (2 * niters))
printf("BOOM! cnt=%d\n", cnt);
else
printf("OK cnt=%d\n", cnt);
exit(0);
}

void *thread(void *vargp)
{

int i, niters =*((int *)vargp);
for (i = 0; i < niters; i++)
cnt++;
return NULL;
}

编译:gcc -o badcnt badcnt.c csapp.c csapp.h -lpthread
执行:

1
2
3
4
➜  pthread ./badcnt 10000000
BOOM! cnt=18624047
➜ pthread ./badcnt 10000000
BOOM! cnt=12824971

会发现当niters足够大时,得到的答案会是错误的,而且每次都不同。因为当badcnt.c中的两个对等线程在一个单处理器上并发运行时,机器指令以某种顺序一个接一个地完成。这些顺序中的一些将会产生正确结果,但其他的则不会。一般而言,没有办法预测操作系统是否将为你的线程选择一个正确的顺序。

信号量

信号量s是具有非负整数值的全局变量,只能由两种特殊的操作来处理:

  • P(s) 如果s是非零的,P将s减一,并立即返回。如果s为零,那么就挂起这个线程,直到s变为非零。
  • V(s) V操作将s加一。如果有任何线程阻塞在P操作等待s变成非零,那么V操作会重启这些线程中的一个。

当有多个线程在等待同一个信号量时,不能预测V操作要重启哪个线程。
Posix标准定义了许多操作信号量的函数。

1
2
3
4
5
6
#include <semaphore.h>
// 将信号量sem初始化为value,每个信号量使用前必须初始化
int sem_init(sem_t *sem, 0, unsigned int value);
// 程序分别通过调用sem_wait和sem_post函数来执行P和V操作。
int sem_wait(sem_t *s);
int sem_post(sem_t *s);

可以用以下包装函数代替

1
2
3
#include "csapp.h"
void P(sem_t *s); // sem_wait的包装函数
void V(sem_t *s); // sem_post的包装函数

使用信号量来实现互斥

基本思想是将共享变量与一个信号量s(初始为1)联系起来,然后用P(s)和V(s)操作将相应的临界区包围起来。

临界区:对于线程i,操作共享变量cnt内容的指令构成了一个临界区(critical section)。 要确保每个线程在执行它的临界区中的指令时,拥有对共享变量的互斥的访问,这种现象称为互斥(mutual exclusion)。

以这种方式保护共享变量的信号量叫做二元信号量(binary semaphore),因为它的值为0或1。以提供互斥为目的的二元信号量也成为互斥锁(mutex)。在一个互斥锁上执行P操作称为对互斥锁加锁,V操作称为解锁。对一个互斥锁加了锁但是还没有解锁的线程称为占用这个互斥锁。
一个被用作一组可用资源的计数器的信号量称为计数信号量
用信号量正确同步前面的计数器程序实例:
1.首先声明一个信号量mutex

1
2
volatile int cnt = 0;
sem_t mutex;

2.在主例程中,pthread_create之前将mutex初始化为1

1
Sem_init(&mutex, 0, 1);

3.通过在线程例程中对共享变量cnt的更新包围P和V操作

1
2
3
4
5
for (i = 0; i < niters; i++) {
P(&mutex);
cnt++;
V(&mutex);
}

再编译执行就一定能得到正确结果了

1
2
3
4
5
➜  pthread gcc -o badcnt badcnt.c csapp.c csapp.h -lpthread
➜ pthread ./badcnt 10000000
OK cnt=20000000
➜ pthread ./badcnt 100000000
OK cnt=200000000

利用信号量来调度共享资源

一个线程通过信号量操作来通知另一个线程,程序状态中某个条件已经为真了。两个经典而有用的例子是生产者 - 消费者读者 - 写者问题。
1.生产者 - 消费者问题 生产者和消费者线程共享一个有n个槽的有限缓冲区。生产者线程反复地生成新的项目,并把它们插入到缓冲区中。消费者线程不断地从缓冲区中取出这些项目,然后消费它们。
因为插入和取出项目都涉及更新共享变量,所以我们必须保证对缓冲区的访问是互斥的。但只保证互斥访问是不够的,还需要调度对缓冲区的访问。如果缓冲区是满的,生产者就必须等到有一个槽位变为可用。如果缓冲区是空的,那么消费者必须等到有一个可用项目。
下面开发一个简单的包,叫做SBUF,用来构造生产者 - 消费者程序。
SBUF操作类型为sbuf_t的有限缓冲区。

1
2
3
4
5
6
7
8
9
10
11
// sbuf.h
#include "csapp.h"
typedef struct {
int *buf; // 存放项目的动态分配的n项整数数组
int n; // 槽位的个数
int front; // 索引值,(front+1)%n记录数组第一项
int rear; // 索引值,rear%n记录数组最后一项
sem_t mutex; // 提供互斥缓冲区访问的信号量
sem_t slots; // 空槽位数
sem_t items; // 可用项目数
} sbuf_t;

SBUF函数的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
include "csapp.h"
#include "sbuf.h"

void sbuf_init(sbuf_t *sp, int n)
{ // 为缓冲区分配堆存储器

sp->buf = Calloc(n, sizeof(int));
sp->n = n;
sp->front = sp->rear = 0; // 表示空缓冲区
Sem_init(&sp->mutex, 0, 1);
Sem_init(&sp->slots, 0, n);
Sem_init(&sp->items, 0, 0);
}

void sbuf_deinit(sbuf_t *sp)
{ // 应用程序用完缓冲区时,释放缓冲区存储

Free(sp->buf);
}

void sbuf_insert(sbuf_t *sp, int item)
{

P(&sp->slots); // wait for available slot
P(&sp->mutex); // lock the buffer
sp->buf[(++sp->rear) % (sp->n)] = item; // insert the item
V(&sp->mutex); // unlock the buffer
V(&sp->items); // announce available item
}

int sbuf_remove(sbuf_t *sp)
{

int item;
P(&sp->items); // wait for available slot
P(&sp->mutex);
item = sp->buf[(++sp->front) % (sp->n)];
V(&sp->mutex);
V(&sp->slots);
return item;
}

2.读者 - 写者问题
一组并发的线程要访问一个共享对象,有些线程只读对象,而其他的线程只修改对象。修改对象的线程叫做写者,只读对象的线程叫做读者。写者必须拥有对对象的独占的访问,而读者可以和无限多个其他的读者共享对象。
第一类读者 - 写者问题 读者优先,要求不让读者等待,除非已经有写者在占用。
第二类读者 - 写者问题 写者优先,在写者之后到的读者要等待。
第一类读者 - 写者问题的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
int readcnt; // 共享变量,统计当前在临界区中的读者数量
sem_t mutex, w; // mutex保护对readcnt的访问,w控制对访问共享对象的临界区的访问

void reader(void)
{

while (1) {
P(&mutex);
readcnt++;
if (readcnt == 1) // first in
P(&w);
V(&mutex);
/* Critical section
Reading happens */

P(&mutex);
readcnt--;
if (readcnt == 0) // last out
V(&w);
V(&mutex);
}
}

void writer(void)
{

while (1) {
P(&w);
/* Critical section
Writing happens */

V(&w);
}
}

综合:基于预线程化的并发服务器

一个基于预线程化(prethreading)的服务器通过使用生产者 - 消费者模型来降低为每一个新客户端创建一个新线程的开销。服务器是由一个主线程和一组工作者线程构成的。主线程不断地接收来自客户端的连接请求,并将得到的连接描述符放在一个有限缓冲区中。每一个工作者线程反复地从共享缓冲区中取出描述符,为客户端服务,然后等待下一个描述符。
用SBUF包实现一个预线程化的并发echo服务器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// echoservert_pre.c
#include "csapp.h"
#include "sbuf.h"
#define NTHREADS 4
#define SBUFSIZE 16

void echo_cnt(int connfd);
void *thread(void *vargp);

sbuf_t sbuf;

int main(int argc, char **argv)
{

int i, listenfd, connfd, port;
socklen_t clientlen = sizeof(struct sockaddr_in);
struct sockaddr_in clientaddr;
pthread_t tid;

if (argc != 2) {
fprintf(stderr, "usage: %s <port>\n", argv[0]);
exit(0);
}
port = atoi(argv[1]);
sbuf_init(&sbuf, SBUFSIZE); // 初始化缓冲区sbuf
listenfd = Open_listenfd(port);

for (i = 0; i < NTHREADS; i++) { // 主线程创建了一组工作者线程
Pthread_create(&tid, NULL, thread, NULL);
}

while (1) { // 进入无限的服务器循环,接受连接请求
connfd = Accept(listenfd, (SA *) &clientaddr, &clientlen);
sbuf_insert(&sbuf, connfd); // 将得到的已连接的描述符插入到缓冲区sbuf中。
}
}

void *thread(void *vargp)
{

Pthread_detach(pthread_self());
while (1) { // 每个工作者线程等待直到它能从缓冲区中取出一个已连接的描述符
int connfd = sbuf_remove(&sbuf);
echo_cnt(connfd);
Close(connfd);
}
}

echo_cnt函数的实现如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// echo_cnt.c
include "csapp.h"
static int byte_cnt; // 记录了所有客户端接收到的累计字节数
static sem_t mutex;

static void init_echo_cnt(void)
{

Sem_init(&mutex, 0, 1);
byte_cnt = 0;
}

void echo_cnt(int connfd)
{

int n;
char buf[MAXLINE];
rio_t rio;
static pthread_once_t once = PTHREAD_ONCE_INIT;
Pthread_once(&once, init_echo_cnt); // 当第一次有某个线程调用echo_once函数时,使用pthread_once调用初始化函数
Rio_readinitb(&rio, connfd);
while ((n = Rio_readlineb(&rio, buf, MAXLINE)) != 0) {
P(&mutex); // 对共享变量byte_cnt进行保护
byte_cnt += n;
printf("thread %d received %d (%d total) bytes on fd %d\n",
(int)pthread_self(), n, byte_cnt, connfd);
V(&mutex);
Rio_writen(connfd, buf, n);
}
}

编译:gcc -o echoservert_pre echoservert_pre.c sbuf.h sbuf.c csapp.h csapp.c echo_cnt.c -lpthread
reference
《深入理解计算机系统》