生产者-消费者问题-信号量&共享内存
生产者-消费者问题
[TOC]
1. 问题简介
- 生产者消费者问题,亦称有限缓冲问题,是多进程同步问题
- 问题要求共享固定大小缓冲区的两个(或多个)进程——生产者 / 消费者,协同的产生数据放入缓冲区,读取数据拿出缓冲区
算法需要解决的是
- 保证在缓冲区满时,生产者不再放入数据
- 保证在缓冲区空时,消费者不再读取数据
- 缓冲池应作为一种临界资源,使得进程互斥的去访问
- 可以使用信号量机制 / 管程实现
2. 信号量
- 信号量分为内核信号量(由内核控制路径使用)和用户进程信号量,我们操作的主要是用户级
- 用户级信号量包括
System V
和POSIX
,笔记中按照System V
实现,但是书上貌似是POSIX
- Linux中与信号量操作相关的函数声明都在
<sys/sem.h>
文件中,其中包括了semget
/semop
/semctl
等 计数信号量具备
V
(signal()
) /P
(wait()
) 两种操作,V
操作将会增加信号量的数值,P
操作将会减少信号量的数值,其运作方式为:- 初始化信号量,给定一个非负整数值
- 运行
P
,信号量的数值会减少一,当运行完成后数值非负时,方可进入临界区 - 运行
V
,信号量的数值会增加一,当运行完成后数值非负时,等待进程方可进入临界区
2.0. 信号量定义
在
System V
标准下,所有信号量都必须在一个信号量集合中,信号量集合的定义如下:struct semid_ds { struct ipc_perm sem_perm; /* Ownership and permissions */ time_t sem_otime; /* Last semop time */ time_t sem_ctime; /* Last change time */ unsigned short sem_nsems; /* No. of semaphores in set */ };
其中,
sem_perm
表示了信号量集的ipc_perm
,也就是规定了从属与权限struct ipc_perm { key_t __key; /* Key supplied to semget(2) */ uid_t uid; /* Effective UID of owner */ gid_t gid; /* Effective GID of owner */ uid_t cuid; /* Effective UID of creator */ gid_t cgid; /* Effective GID of creator */ unsigned short mode; /* Permissions */ unsigned short __seq; /* Sequence number */ };
sem_otime
是最后一次调用semop
函数的时间sem_ctime
是最近一次对该信号量存在更改的时间
信号量的相关信息被定义于
sem
结构体中struct sem { short sempid; /* pid of last operation */ ushort semval; /* current value */ ushort semncnt; /* num procs awaiting increase in semval */ ushort semzcnt; /* num procs awaiting semval = 0 */ };
- 其中
sempid
为最后一次调用此信号量的进程PID semval
是信号量的值,就是决定阻塞与否的那个semncnt
是等待信号量值增加的进程个数semzcnt
是等待semval == 0
的进程个数
- 其中
2.1. semget
- Manual / OpenGroup
- 该函数返回值是一个信号量集合,也就是说,一次调用该函数,可以创建或者打开一个信号量集,如果有错误会返回-1,具有原子性
函数原型为
int semget(key_t key, int nsems, int semflg);
key_t key
: 是信号量集的键值,系统会据此传入的键值返回一个信号量标识符,之后所有对信号量的操作都要通过信号量标识符而非键值进行间接操作。键值与信号量标识符一一对应,且标识符是系统由键值计算得到,所以为了区别在多信号量集合中键值的唯一性,可以通过手动指定的方法,也可以使用ftok
函数key_t ftok(const char *pathname, int proj_id);
nsems
: 表示创建的信号量集合中含有的信号量的个数(该参数对非创建调用无效)semflg
: 表示权限或操作类型,和Linux文件系统中的权限类似,八进制数表示时,低位起第一位表示创建者的操作权限,第二位表示同组用户的操作权限,第三位表示其他用户的操作权限。4 为 可读,2 为可写, 6 为可读写,0 为不可读写
2.2. semop
- Manual / OpenGroup / die.net
- 该函数用于更改信号量的值,具有原子性
函数原型为
int semop(int semid, struct sembuf *sops, size_t nsops);
semid
: 表示信号量标识符sops
: 操作结构体,在每一个sembuf
结构体中,包含下列成员struct sembuf { unsigned short int sem_num; /* semaphore number */ short int sem_op; /* semaphore operation */ short int sem_flg; /* operation flag */ };
- `sem_num` : 表示要操作的信号量的标识符
- `sem_op` : 表示操作
- 当 `sem_op > 0` 时,表示进程解除对当前资源的占用状态,若在 `sembuf` 中的 `sem_flg` 值为 `SEM_UNDO` ,则 `semval` 会减去 `sem_op` 的绝对值
-
具体的可以移步[OpenGroup](https://pubs.opengroup.org/onlinepubs/9699919799/)去看,每一个 `sops` 指定的操作都会在 `semid` 和 `sem_num` 对应的信号量上执行
nsops
: 数组长度,即操作个数- 操作成功返回0, 否则返回-1报错
2.3. semctl
- Manual / OpenGroup
- 函数用于提供
cmd
规定的一系列信号量操作控制, 精确控制信号量的复杂变化,具有原子性 函数原型为
int semctl(int semid, int semnum, int cmd, [union semun] ...);
cmd
: 表示动作控制指令,常用有如下值IPC_RMID
: 删除信号量GETALL
: 读取信号量集合中的所有信号量的值GETVAL
: 读取信号量集合中的某一信号量的值SETALL
: 设置信号量集合中的所有信号量的值SETVAL
: 设置信号量集合中的某一信号量的值
使用
cmd
对信号量进行操作时,需要使用函数的第四个变量 (需要自己在代码中某一位置声明)union semun { int val; struct semid_ds *buf; unsigned short *array; } arg;
3. 基于信号量机制实现生产者消费者
- 代码存在了Github,可以通过curl 获取,有BUG的话发PR呀,谢谢~ 传送门
/**
* @file PandC.c
* @author FancyKing ([email protected])
* @brief 使用信号量机制与共享内存模拟生产者消费者问题
* 你可以在这里看到实验结果的截图
* https://imgchr.com/i/Ejptx0
* https://imgchr.com/i/EXz1OS
* @version 1.0
* @date 2019-05-18 - 2019-05-19
*
* @copyright Copyright (c) 2019
*
*/
#include <stdio.h>
#include <stdlib.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/shm.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
#define N 10
/**
* @brief 由于使用了 'semctl' https://linux.die.net/man/2/semctl
* 所以在此补充定义 semun 类型
*/
union semun {
int val;
struct semid_ds *buf;
unsigned short *array;
};
/**
* @brief 初始化 semid 信号量的值为 init_value
*
* @param semid 信号量标识符,使用 'semget' 的返回值得到
* @param init_value 欲初始化的信号量初始值
*/
void initialize_value_of_sem(int semid, int init_value) {
union semun sem_union_temp;
sem_union_temp.val = init_value;
if (semctl(semid, 0, SETVAL, sem_union_temp) == -1) {
printf("Initialize the value of semaphore ERROR\n");
exit(1);
}
}
/**
* @brief 依据信号量标识符删除信号量
*
* @param semid 信号量标识符,使用 'semget' 的返回值得到
*/
void delete_sem(int semid) {
union semun sem_union_temp;
if (semctl(semid, 0, IPC_RMID, sem_union_temp) == -1) {
printf("Delete the semaphore ERROR\n");
exit(1);
}
}
void signal(int semid) {
struct sembuf sembuf_temp;
// sem_flg 设置为 SEM_UNDO,表示在进程退出时,撤销生存周期所做的所有操作
// 避免因为进程异常退出而造成的死锁
sembuf_temp.sem_flg = SEM_UNDO;
// 信号量集合中的信号量编号,从 0 开始,所以该处表示有一个信号量
sembuf_temp.sem_num = 0;
// 将信号量的值 +sem_op,设置为正数即为增加信号量的值
// 若执行完毕,存在信号量的值为非负数,则资源可用
sembuf_temp.sem_op = 1;
// semop 的参数依次为 信号量标识符, 操作结构体, 操作信号量个数
if (semop(semid, &sembuf_temp, 1) == -1) {
printf("Signal ERROR\n");
exit(1);
}
}
/**
* @brief 对 semop 函数进行封装
*
* @param semid 欲操作信号量的标识符
*/
void wait(int semid) {
struct sembuf sembuf_temp;
// sem_flg 设置为 SEM_UNDO,表示在进程退出时,撤销生存周期所做的所有操作
// 避免因为进程异常退出而造成的死锁
sembuf_temp.sem_flg = SEM_UNDO;
// 信号量集合中的信号量编号,从 0 开始,所以该处表示有一个信号量
sembuf_temp.sem_num = 0;
// 将信号量的值 +sem_op,设置为负数即为减少信号量的值
// 若执行完毕,存在信号量的值为负数,则为进程阻塞,直到资源可用
sembuf_temp.sem_op = -1;
// semop 的参数依次为 信号量标识符, 操作结构体, 操作信号量个数
if (semop(semid, &sembuf_temp, 1) == -1) {
printf("Wait ERROR\n");
}
}
// 共享内存中的结构体
struct shared_data {
int in, out;
char buff[N];
FILE *fp_in;
FILE *fp_out;
};
struct shared_data *shared;
int empty, full, mutex;
int shared_memory_id;
void *shared_memory_addr;
int producer() {
wait(empty);
wait(mutex);
// 进入临界区
// 共享数据读入字符
char ch = fgetc(shared->fp_in);
// 已经读到文件尾部
if (ch == EOF) {
signal(mutex);
return 0;
}
// 保存读取的资源,记录循环标号
shared->buff[shared->in] = ch;
printf("%c\n", ch);
shared->in = (shared->in + 1) % N;
signal(mutex);
signal(full);
// 退出临界区
return 1;
}
int consumer() {
wait(full);
wait(mutex);
// 进入临界区
// 打印循环标号,取出共享数据
// printf("out_id: %d:", shared->out);
char out_char = shared->buff[shared->out];
if (out_char == EOF) {
signal(mutex);
return 0;
}
// 写入文件,输出流刷新,并打印到终端
shared->out = (shared->out + 1) % N;
fprintf(shared->fp_out, "%c", out_char);
fflush(shared->fp_out);
printf("%c\n", out_char);
signal(mutex);
signal(empty);
// 退出临界区
return 1;
}
int main(int argc, char const *argv[]) {
// 申请共享内存,如果要打开一个已经存在的合法共享内存, shmflg 可为 0
// 参数分别为 共享内存标识符,相关数据结构体,
shared_memory_id =
shmget(12345, sizeof(struct shared_data), 0666 | IPC_CREAT);
// shmat 三个参数分别为 共享内存标识符,指定连接地址,动作标记,挂接共享内存
// 当 shmaddr 为 NULL,则系统自动选择一个地址
shared_memory_addr = shmat(shared_memory_id, 0, 0);
shared = (struct shared_data *)shared_memory_addr;
// 打开文件输入输出流
shared->fp_in = fopen("in", "r");
shared->fp_out = fopen("out", "w");
if (shared->fp_in == NULL) {
printf("Read input file ERROR");
return 0;
}
shared->in = 0;
shared->out = 0;
// 得到信号量标识符,便于后续控制
empty = semget(3000, 1, 0666 | IPC_CREAT);
full = semget(3001, 1, 0666 | IPC_CREAT);
mutex = semget(3002, 1, 0666 | IPC_CREAT);
// 按照实际情况初始化信号量数值
initialize_value_of_sem(empty, N);
initialize_value_of_sem(full, 0);
initialize_value_of_sem(mutex, 1);
// 进程ID,其实貌似也就是个int
pid_t pidone, pidtwo;
while ((pidone = fork()) == -1)
;
if (pidone > 0) {
while ((pidtwo = fork()) == -1)
;
do {
printf("Producing below ");
} while (producer());
} else {
do {
printf("\t\t Recieveing below ");
} while (consumer());
printf("\n");
}
fclose(shared->fp_out);
fclose(shared->fp_in);
return 0;
}