生产者-消费者问题

1. 问题简介

  • 生产者消费者问题,亦称有限缓冲问题,是多进程同步问题
  • 问题要求共享固定大小缓冲区的两个(或多个)进程——生产者 / 消费者,协同的产生数据放入缓冲区,读取数据拿出缓冲区
  • 算法需要解决的是
    • 保证在缓冲区满时,生产者不再放入数据
    • 保证在缓冲区空时,消费者不再读取数据
    • 缓冲池应作为一种临界资源,使得进程互斥的去访问
  • 可以使用信号量机制 / 管程实现

2. 信号量

  • 信号量分为内核信号量(由内核控制路径使用)和用户进程信号量,我们操作的主要是用户级
  • 用户级信号量包括 System VPOSIX ,笔记中按照 System V 实现,但是书上貌似是 POSIX
  • Linux中与信号量操作相关的函数声明都在 <sys/sem.h> 文件中,其中包括了 semget / semop / semctl
  • 计数信号量具备 V (signal()) / P (wait()) 两种操作,V 操作将会增加信号量的数值, P 操作将会减少信号量的数值,其运作方式为:
    1. 初始化信号量,给定一个非负整数值
    2. 运行 P ,信号量的数值会减少一,当运行完成后数值非负时,方可进入临界区
    3. 运行 V ,信号量的数值会增加一,当运行完成后数值非负时,等待进程方可进入临界区

2.0. 信号量定义

  • System V 标准下,所有信号量都必须在一个信号量集合中,信号量集合的定义如下:

    struct semid_ds {
        struct ipc_perm sem_perm;  /* Ownership and permissions */
        time_t          sem_otime; /* Last semop time */
        time_t          sem_ctime; /* Last change time */
        unsigned short  sem_nsems; /* No. of semaphores in set */
    };
    • 其中, sem_perm 表示了信号量集的 ipc_perm ,也就是规定了从属与权限

      struct ipc_perm {
          key_t          __key; /* Key supplied to semget(2) */
          uid_t          uid;   /* Effective UID of owner */
          gid_t          gid;   /* Effective GID of owner */
          uid_t          cuid;  /* Effective UID of creator */
          gid_t          cgid;  /* Effective GID of creator */
          unsigned short mode;  /* Permissions */
          unsigned short __seq; /* Sequence number */
      };
    • sem_otime 是最后一次调用 semop 函数的时间

    • sem_ctime 是最近一次对该信号量存在更改的时间

  • 信号量的相关信息被定义于 sem 结构体中

    struct sem {
      short   sempid;         /* pid of last operation */
      ushort  semval;         /* current value */
      ushort  semncnt;        /* num procs awaiting increase in semval */
      ushort  semzcnt;        /* num procs awaiting semval = 0 */
    };
    • 其中 sempid 为最后一次调用此信号量的进程PID
    • semval 是信号量的值,就是决定阻塞与否的那个
    • semncnt 是等待信号量值增加的进程个数
    • semzcnt 是等待 semval == 0 的进程个数

2.1. semget - Manual / OpenGroup

  • 该函数返回值是一个信号量集合,也就是说,一次调用该函数,可以创建或者打开一个信号量集,如果有错误会返回-1,具有原子性

  • 函数原型为

    int semget(key_t key, int nsems, int semflg);
    • key_t key : 是信号量集的键值,系统会据此传入的键值返回一个信号量标识符,之后所有对信号量的操作都要通过信号量标识符而非键值进行间接操作。键值与信号量标识符一一对应,且标识符是系统由键值计算得到,所以为了区别在多信号量集合中键值的唯一性,可以通过手动指定的方法,也可以使用 ftok 函数

      key_t ftok(const char *pathname, int proj_id);
    • nsems : 表示创建的信号量集合中含有的信号量的个数(该参数对非创建调用无效)

    • semflg : 表示权限或操作类型,和Linux文件系统中的权限类似,八进制数表示时,低位起第一位表示创建者的操作权限,第二位表示同组用户的操作权限,第三位表示其他用户的操作权限。4 为 可读,2 为可写, 6 为可读写,0 为不可读写

2.2. semop - Manual / OpenGroup / die.net

  • 该函数用于更改信号量的值,具有原子性

  • 函数原型为

    int semop(int semid, struct sembuf *sops, size_t nsops);
    • semid : 表示信号量标识符

    • sops : 操作结构体,在每一个 sembuf 结构体中,包含下列成员

      struct sembuf {
        unsigned short int sem_num;   /* semaphore number */
        short int sem_op;     /* semaphore operation */
        short int sem_flg;        /* operation flag */
      };
      • sem_num : 表示要操作的信号量的标识符
      • sem_op : 表示操作
      • sem_op > 0 时,表示进程解除对当前资源的占用状态,若在 sembuf 中的 sem_flg 值为 SEM_UNDO ,则 semval 会减去 sem_op 的绝对值

      具体的可以移步OpenGroup去看,每一个 sops 指定的操作都会在 semidsem_num 对应的信号量上执行

    • nsops : 数组长度,即操作个数

    • 操作成功返回0, 否则返回-1报错

2.3. semctl - Manual / OpenGroup

  • 函数用于提供 cmd 规定的一系列信号量操作控制, 精确控制信号量的复杂变化,具有原子性

  • 函数原型为

    int semctl(int semid, int semnum, int cmd, [union semun] ...);
    • cmd : 表示动作控制指令,常用有如下值

      • IPC_RMID : 删除信号量
      • GETALL : 读取信号量集合中的所有信号量的值
      • GETVAL : 读取信号量集合中的某一信号量的值
      • SETALL : 设置信号量集合中的所有信号量的值
      • SETVAL : 设置信号量集合中的某一信号量的值
    • 使用 cmd 对信号量进行操作时,需要使用函数的第四个变量 (需要自己在代码中某一位置声明)

      union semun {
          int val;
          struct semid_ds *buf;
          unsigned short  *array;
      } arg;

3. 基于信号量机制实现生产者消费者

  • 代码存在了Github,可以通过curl 获取,有BUG的话发PR呀,谢谢~ 传送门
/**
 * @file PandC.c
 * @author FancyKing ([email protected])
 * @brief 使用信号量机制与共享内存模拟生产者消费者问题
 *        你可以在这里看到实验结果的截图
 *        https://imgchr.com/i/Ejptx0
 *        https://imgchr.com/i/EXz1OS
 * @version 1.0
 * @date 2019-05-18 - 2019-05-19
 *
 * @copyright Copyright (c) 2019
 *
 */

#include <stdio.h>
#include <stdlib.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/shm.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>

#define N 10

/**
 * @brief 由于使用了 'semctl' https://linux.die.net/man/2/semctl
 *        所以在此补充定义 semun 类型
 */
union semun {
  int val;
  struct semid_ds *buf;
  unsigned short *array;
};

/**
 * @brief 初始化 semid 信号量的值为 init_value
 *
 * @param semid 信号量标识符,使用 'semget' 的返回值得到
 * @param init_value 欲初始化的信号量初始值
 */
void initialize_value_of_sem(int semid, int init_value) {
  union semun sem_union_temp;
  sem_union_temp.val = init_value;
  if (semctl(semid, 0, SETVAL, sem_union_temp) == -1) {
    printf("Initialize the value of semaphore ERROR\n");
    exit(1);
  }
}

/**
 * @brief 依据信号量标识符删除信号量
 *
 * @param semid 信号量标识符,使用 'semget' 的返回值得到
 */
void delete_sem(int semid) {
  union semun sem_union_temp;
  if (semctl(semid, 0, IPC_RMID, sem_union_temp) == -1) {
    printf("Delete the semaphore ERROR\n");
    exit(1);
  }
}

void signal(int semid) {
  struct sembuf sembuf_temp;
  // sem_flg 设置为 SEM_UNDO,表示在进程退出时,撤销生存周期所做的所有操作
  // 避免因为进程异常退出而造成的死锁
  sembuf_temp.sem_flg = SEM_UNDO;
  // 信号量集合中的信号量编号,从 0 开始,所以该处表示有一个信号量
  sembuf_temp.sem_num = 0;
  // 将信号量的值 +sem_op,设置为正数即为增加信号量的值
  // 若执行完毕,存在信号量的值为非负数,则资源可用
  sembuf_temp.sem_op = 1;
  // semop 的参数依次为 信号量标识符, 操作结构体, 操作信号量个数
  if (semop(semid, &sembuf_temp, 1) == -1) {
    printf("Signal ERROR\n");
    exit(1);
  }
}

/**
 * @brief 对 semop 函数进行封装
 *
 * @param semid 欲操作信号量的标识符
 */
void wait(int semid) {
  struct sembuf sembuf_temp;
  // sem_flg 设置为 SEM_UNDO,表示在进程退出时,撤销生存周期所做的所有操作
  // 避免因为进程异常退出而造成的死锁
  sembuf_temp.sem_flg = SEM_UNDO;
  // 信号量集合中的信号量编号,从 0 开始,所以该处表示有一个信号量
  sembuf_temp.sem_num = 0;
  // 将信号量的值 +sem_op,设置为负数即为减少信号量的值
  // 若执行完毕,存在信号量的值为负数,则为进程阻塞,直到资源可用
  sembuf_temp.sem_op = -1;
  // semop 的参数依次为 信号量标识符, 操作结构体, 操作信号量个数
  if (semop(semid, &sembuf_temp, 1) == -1) {
    printf("Wait ERROR\n");
  }
}

// 共享内存中的结构体
struct shared_data {
  int in, out;
  char buff[N];
  FILE *fp_in;
  FILE *fp_out;
};
struct shared_data *shared;
int empty, full, mutex;
int shared_memory_id;
void *shared_memory_addr;

int producer() {
  wait(empty);
  wait(mutex);
  // 进入临界区

  // 共享数据读入字符
  char ch = fgetc(shared->fp_in);
  // 已经读到文件尾部
  if (ch == EOF) {
    signal(mutex);
    return 0;
  }
  // 保存读取的资源,记录循环标号
  shared->buff[shared->in] = ch;
  printf("%c\n", ch);
  shared->in = (shared->in + 1) % N;

  signal(mutex);
  signal(full);
  // 退出临界区

  return 1;
}

int consumer() {
  wait(full);
  wait(mutex);
  // 进入临界区

  // 打印循环标号,取出共享数据
  // printf("out_id: %d:", shared->out);
  char out_char = shared->buff[shared->out];
  if (out_char == EOF) {
    signal(mutex);
    return 0;
  }
  // 写入文件,输出流刷新,并打印到终端
  shared->out = (shared->out + 1) % N;
  fprintf(shared->fp_out, "%c", out_char);
  fflush(shared->fp_out);
  printf("%c\n", out_char);

  signal(mutex);
  signal(empty);
  // 退出临界区

  return 1;
}

int main(int argc, char const *argv[]) {
  // 申请共享内存,如果要打开一个已经存在的合法共享内存, shmflg 可为 0
  // 参数分别为 共享内存标识符,相关数据结构体,
  shared_memory_id =
      shmget(12345, sizeof(struct shared_data), 0666 | IPC_CREAT);

  // shmat 三个参数分别为 共享内存标识符,指定连接地址,动作标记,挂接共享内存
  // 当 shmaddr 为 NULL,则系统自动选择一个地址
  shared_memory_addr = shmat(shared_memory_id, 0, 0);
  shared = (struct shared_data *)shared_memory_addr;
  // 打开文件输入输出流
  shared->fp_in = fopen("in", "r");
  shared->fp_out = fopen("out", "w");
  if (shared->fp_in == NULL) {
    printf("Read input file ERROR");
    return 0;
  }
  shared->in = 0;
  shared->out = 0;
  // 得到信号量标识符,便于后续控制
  empty = semget(3000, 1, 0666 | IPC_CREAT);
  full = semget(3001, 1, 0666 | IPC_CREAT);
  mutex = semget(3002, 1, 0666 | IPC_CREAT);
  // 按照实际情况初始化信号量数值
  initialize_value_of_sem(empty, N);
  initialize_value_of_sem(full, 0);
  initialize_value_of_sem(mutex, 1);

  // 进程ID,其实貌似也就是个int
  pid_t pidone, pidtwo;
  while ((pidone = fork()) == -1)
    ;
  if (pidone > 0) {
    while ((pidtwo = fork()) == -1)
      ;
    do {
      printf("Producing below ");
    } while (producer());
  } else {
    do {
      printf("\t\t  Recieveing below ");
    } while (consumer());
    printf("\n");
  }

  fclose(shared->fp_out);
  fclose(shared->fp_in);

  return 0;
}

标签: Linux, OS

添加新评论