Message Queue
메시지 큐는 파이프하고 비슷한 건데, 파이프처럼 데이터를 주고받을 수 있음
다른 프로세스로 메시지를 send 하거나 다른 프로세스로부터 메시지를 receive 하도록 허용해주는 ipc라 하는 거임
파이프는 bytestream으로 데이터를 주고 받음 데이터의 경계가 없음 메시지와 메시지 간 경계가 없음
메시지 큐로 데이터를 주고받을 때에는 단위가 메시지라는 단위로 주고받아서 메시지간 분명한 경계가 있음
그리고 그것을 메시지들을 구분해서 커널에서 저장할 때, 그걸 링크드 리스트 형태로 커널에서 저장함
struct msqid_ds { /* <sys/msg.h> */
struct ipc_perm msg_perm; /* see ch08_ipc1-p.29 */
struct msg *msg_first; /* ptr to first message on queue */
struct msg *msg_last; /* ptr to last message on queue */
msglen_t msg_cbytes; /* current #bytes on queue */
msgqnum_t msg_qnum; /* # of messages on queue */
msglen_t msg_qbytes; /* max # of bytes on queue */
pid_t msg_lspid; /* pid of last msgsnd() */
pid_t msg_lrpid; /* pid of last msgrcv() */
time_t msg_stime; /* last-msgsnd() time */
time_t msg_rtime; /* last-msgrcv() time */
time_t msg_ctime; /* last-change time */
};
메시지 큐는 ipc로서 identifier로 특정 지정이 됨
파일의 inode 정보를 담고 있는 구조체 struct stat가 있듯이 메시지 큐의 정보를 담는 struct msgid_ds라는 구조체
이 구조체의 값은 msgctl로 메시지를 가져오거나 변경해서 쓸 수 있음
sys/msg.h 에 들어있음. 여러개의 멤버가 있는데 첫번째 멤버는 공통적으로 들어있는 permission임
이거는 앞 동영상에서 본 적이 있음 다섯개의 멤버가 있는데 msg_perm에는 다섯번째에 mode가 있었음 readwrite
첫번째 두번째는 owner의 uid, gid 세번째 네번째는 creator의 uid gid 가 들어감
이게 msgid_ds의 첫번째 멤버임 두번째 세번째 멤버는 메시지가 커널 안에 링크드리스트 형태로 저장된다고 함
linked list의 첫번째 메시지와 마지막 메시지를 표현하는 포인터가 있음 그게 두번째, 세번째 멤버임
double linked list로 표현돼서 첫번째 마지막 포인터만 있으면 중간의 메시지를 얼마든지 찾을 수 있음
그리고 세번째는 큐 안에 메시지들에 있는 total byte를 전부 더한 것임 메시지 큐의 전체 사이즈라고 보면 됨
qnum은 메시지 갯수가 됨 그리고 qbyte는 큐안에 잇는 최대 사이즈임
send를 하고 receive를 하는데 가장 마지막, 가장 최근에 send한 프로세스가 누구냐 정보를 가짐
그리고 가장 최근에 send나 receive 한 시간이 언제인지도 있음
그리고 메시지 큐가 가장 최근에 변경된 시간이 언제냐?! 하는 것도 있음
이런 정보들이 이 메시지 큐에 대한 정보라고 할 수 있고 이게 msgid_ds라는 struct 구조 안에 있음
여기에 있는 데이터들은 msgctl로 가져오고 변경하고 업데이트 할 수 있음
메시지 큐는 메시지들의 linked list로 되어있다고 함
single 인데 쭉 따라가면 중간 메시지를 찾을 수 있음
last가 맨 마지막을 가리키고 있음 last를 쭉 따라가서 last를 체크하면 됨
The msgget(2) system call
#include <sys/msg.h>
int msgget(key_t key, int flag);
// returns : identifier if ok, -1 on error
파일을 open 해서 파일디스크립터를 얻듯이 msgget이라는 시스템콜을 사용함
key를 넣어주고 permission이나 option을 flag로 넣어주고 msgget하면 identifier가 반환됨
flag에는 read write permission에 대한 정보가 들어감
key에는 key를 주는 세가지 방법이 있음
첫번째는 값을 직접 주는 방법 - 위험함 유니크하게 지정하지 못할 가능성이 높음.
두번째는 ftok로 pathname과 identifier를 넣어서 더 유니크하게 얻는 방법은 있지만 백퍼센트는 아님
세번째로 key 값에 ipc_private 넣으면 return 되는 identifier 값이 무조건 유니크한 값임
기존에 이미 있는 key라면 permission 체크해서 허용이 되면 return 하겠지만 permission 만족하지 않으면 EACCES 에러
EEXIST 에러는 flag에다가 create하고 exclusive 를 동시에 줄수도 있는데, 그 경우에는 key 가 있으면 open 실패
ENOENT 는 ipc_creat이런게 없다면 지정한 key 가 없으면, 없는데 get하라고 했다고 나오는 에러임
ENOSPC 는 메시지큐 최대 갯수 한계에 도달하면 더이상 만들지 못한다고 나오는 에러 메시지
ipc는 새로운 key 값으로 새로운 ipc를 get해서 만들고 ctl로 명령을 remove명령으로 삭제하지 않으면,
cmd 레벨에서 그 key를 삭제하지 않으면 시스템에 그대로 남아있음
만들고 쓰고 필요 없으면 항상 지워야함 그러지 않으면 돌때마다 key 가 만들어질 수 있음
여러 번 만들게 되면 key 가 계속 쌓임 그럼 시스템 한계를 넘어서 이런 에러가 나올 수 있음 ENOSPC
The msgsnd(2) system call
#include <sys/msg.h>
int msgsnd(int msqid, const void *ptr, size_t nbytes, int flag);
// returns : 0 if ok, -1 on error
identifier가 가리키는 메시지큐에 메시지를 쓴다는 거임
메시지는 ptr이 가리키는 곳에 메시지가 들어있음 근데 그 사이즈가 nbytes인거임
메시지는 항상 보내면 그 메시지끼리 링크드리스트로 연결되어있는 곳의 맨 끝에 붙음
struct mymesg {
long mtype; /* positive message type */
char mtext[1]; /* message data, of length nbytes */
};
ptr 구조체를 보면 두개의 멤버로 되어있음 첫번째 멤버는 long 타입의 mtype 메시지의 타입 번호가 들어있음
long은 8바이트임 mtext는 실제 데이터임 [1]이라고 되어있지만 그 사이즈만큼 들어있음
근데 그 사이즈하고 msgsnd의 아규먼트인 nbytes와 일치해야됨
flag 중 IPC_NOWAIT 기다려야 할 때 기다리지 말라는 뜻
pipe가 full 일때 write하는 프로세스가 block 됨
마찬가지로 메시지큐도 msgsnd를 할때 메시지큐가 최대사이즈가 꽉 찼을 때 block 이 됨
flag옵션에 nowait 옵션을 넣어주면
msg큐 사이즈가 full이 돼서 메시지큐 send가 블락이 되어야 할 상황에서도 블락이 안된다는 것
send를 못한거고 실패를 해서 error msg
The msgrcv(2) system call
#include <sys/msg.h>
ssize_t msgrcv(int msqid, void *ptr, size_t nbytes, long type, int flag);
// returns : size of data portion of message if ok, -1 on error
id가 가리키는 메시지큐로부터 메시지를 가져오는데 그 메시지 최대 사이즈가 nbytes임
메시지 버퍼가 ptr임 여기에 쓰는 거임 근데 메시지 타입이있음 근데 그 타입에 따라서 작동하는 방식이 세개 있음
flag가 있고 ipc nowait가 있고 msg noerror 가 있음
type == 0 : 메시지 큐에서 첫번 째 message를 받아온다.
type > 0 : 메시지 큐에서 주어진 type과 동일한 message를 받아온다.
type < 0 : 주어진 type보다 작은 type의 message들 중 가장 작은 type을 갖는 message를 받아온다. (이를 이용해 우선순위 큐를 만드는것이 가능하다)
flag에는 IPC_NOWAIT가 있는데 msgrcv도 큐에서 받으려는데 큐에 메시지가 하나도 없으면 블락이 돼서 기다리게 됨
그 상황에도 불구하고 기다리지 말라고 하는거임 그러면 실제로는 메시지 큐를 못얻어서 실패했다고 -1 리턴
또 다른 flag로 MSG_NOERROR 가 있음
nbytes는 메시지를 받을 때 nbytes를 최대 사이즈로 지정해서 이 사이즈 보다 작은 사이즈만 받으라는 거
The msgctl(2) system call
#include <sys/msg.h>
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
// returns : 0 if ok, -1 on error
id 지정하고 cmd 하고 메시지 큐의 정보를 저장하는 데이터 구조 buf 가 있음
cmd 에는 IPC_STAT이 명령을 주면, msqid_ds 의 정보를 이 구조에 대한 정보를 포인터로 버퍼에 가져오라는 거임
그 값을 변경하고 save할 수 있어야함 그게 IPC_SET
IPC_RMID는 아이디를 다 쓰고 지우는 명령
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <string.h>
#include <errno.h>
#define QKEY (key_t)0105 /* 큐의 키를 식별한다 */
#define QPERM 0660 /* 큐의 허가 */
#define MAXOBN 50 /* 개체 이름의 최대길이 */
#define MAXPRIOR 10 /* 최대 우선 순위 수준 */
struct q_entry {
long mtype;
char mtext [MAXOBN+1];
};
int warn (char *s){
fprintf (stderr, "warning: %s\n", s);
}
int init_queue(void){/* init_queue -- 큐 식별자를 획득한다. */
int queue_id;
if ( (queue_id=msgget (QKEY,IPC_CREAT|QPERM))==-1) perror("msgget failed");
return (queue_id);
}
q_entry 메시지가 이렇게 있다고 구조체를 씀 maxobn +1<- 맨끝에null chararcter때문에 있는거고
최대 50자까지 메시지에 들어간다고 함
init queue는 큐를 처음 만드는 거임 없을 때 만듦
두번째 부르면 이미 만들어져서 open 만 하면됨
get을함 key 값을 여기서 주어진 0105로 함
flag로 ipc_creat 줌
permission은 0660 줌
ipc_creat : 없으면 만들어서 identifier를 return 함 처음에는 없는거를 만들거임
두번째 부를때는 이미 있어서, 있는거는 그냥 get하면 됨 그래서 얻어진 identifier를 return 해주는 함수임
int enter (char *objname, int priority){
int len, s_qid;
struct q_entry s_entry; /* 메시지를 저장할 구조 */
if ( (len = strlen(objname)) > MAXOBN){ /* 이름의 길이, 우선순위 수준을 확인한다. */
warn ("name too long");
return (-1);
}
if (priority > MAXPRIOR || priority < 0){
warn ("invalid priority level");
return (-1);
}
if ( (s_qid=init_queue())==-1) return (-1); /* 필요에 따라 메시지 큐를 초기화한다. */
/* s_entry를 초기화한다. */
s_entry.mtype = (long) priority;
strncpy (s_entry.mtext, objname, MAXOBN);
/* 메시지를 보내고, 필요할 경우 기다린다. */
if (msgsnd (s_qid, &s_entry, len, 0) == -1){
perror ("msgsnd failed");
return (-1);
} else
return (0);
}
enter는 최종적으로 msgsnd하는 함수임
id가 있고, 아까 본 msg의 저장 구조체가 있음 그래서 argument로 메시지에 저장할 문자열을 받음
그 길이를 strlen으로 받음 그게 최대 길이 50바이트보다 크면 안됨 그럼 에러메시지 내고 끝냄
메시지큐보다 작으면 내려와서 prioirity를 받음 max보다 크면 안됨 그리고 0보다 작으면 안됨
메시지도 50보다 작은 사이즈고 prioirty도 0~10이면 init_queue해서 가져옴 id가 리턴됨
type 에다가 메시지에 들어가는 장소에 두가지가 있는데 type이고 text임
type은 가져온 priority를 넣어줌. 그리고 text는 메시지가 들어감 obj를 넣으면 됨 50보다 작았음
strncpy는 두번재꺼를 첫번재꺼에 카피하는데 최대 세번째꺼만큼 하라는 거임
실패하면 에러메시지, 실패하지 않으면 성공적으로 snd 한거임
int proc_obj (struct q_entry *msg){
printf ("\npriority: %ld name: %s\n", msg―>mtype, msg―>mtext);
}
int serve (void){
int mlen, r_qid;
struct q_entry r_entry;
/* 필요에 따라 메시지 큐를 초기화한다. */
if ((r_qid = init_queue()) == -1) return (-1);
/* 다음 메시지를 가져와 처리한다. 필요하면 기다린다. */
for (;;){
if ((mlen=msgrcv(r_qid, &r_entry, MAXOBN,(-1*MAXPRIOR), MSG_NOERROR))== -1){
perror ("msgrcv failed");
return (-1);
}
else {
/* 우리가 문자열을 가지고 있는지 확인한다. */
r_entry.mtext[mlen]='\0';
/* 객체 이름을 처리한다. */
proc_obj (&r_entry);
}
}
}
/* etest -- 큐에 객체 이름을 넣는다. */
#include <stdio.h>
#include <stdlib.h>
#include "q.h"
main (int argc, char **argv){
int priority;
if (argc != 3){
fprintf (stderr, "usage: %s objname priority\n", argv[0]);
exit (1);
}
if ((priority = atoi(arvg[2])) <= 0 || priority > MAXPRIOR){
warn ("invalid priority");
exit (2);
}
if (enter (argv[1], priority) < 0){
warn ("enter failure");
exit (3);
}
exit (0);
}
argc는 3아니면 에러임. $ ./a.out arg1 arg2
/* stest -- 큐를 위한 단순한 서버 */
#include <stdio.h>
#include "q.h"
main(){
pid_pid;
switch (pid = fork()){
case 0: /* 자식 */
serve();
break; /* 실제로는, 서버는 결코 퇴장(exit)하지 않음 */
case -1:
warn ("fork to start server failed");
break;
default:
printf ("server process pid is %d\n", pid);
}
exit( pid != -1 ? 0 : 1);
}
#include <sys/types.h> /* showmsg -- 메시지 큐의 자세한 정보를 보인다. */
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdio.h>
#include <time.h>
void mqstat_print (key_t, int, struct msqid_ds *);
main( int argc, char **argv){
key_t mkey;
int msq_id;
struct msqid_ds msq_status;
if (argc != 2){
fprintf (stderr, "usage: showmsg keyval\n"); exit (1);
}
mkey = (key_t) atoi (argv[1]); /* 메시지 큐 식별자를 얻는다. */
if (( msq_id = msgget(mkey, 0)) == -1){
perror( "msgget failed"); exit (2);
}
if (msgctl(msq_id, IPC_STAT, &msq_status) == -1){ /* 상태 정보를 얻는다. */
perror ("msgctl failed"); exit (3);
}
mqstat_print (mkey, msq_id, *msq_status); /* 상태 정보를 프린트한다. */
exit (0);
}
mqstat_print (key_t mkey, int mqid, struct msqid_ds *mstat){
printf ("\nKey %d, msg_qid %d\n\n", mkey, mqid);
printf ("%d message(s) on queue\n\n", mstat->msg_qnum);
printf ("Last send by proc %d at %s\n", mstat->msg_lspid, ctime (&(mstat->msg_stime)));
printf ("Last recv by proc %d at %s\n", mstat->msg_lrpid,
ctime (& (mstat->msg_rtime)));
}
이게 pirnt 하는 함수임
키값 id 데이터 스트럭쳐가 들어있음
mstat의 msg_qnum : 메시지 안에 있는 총 몇개가 있는지 메시지가
mstat의 msg_lspid : ls는 last send 임 이 메시지큐에 가장 마지막에 send 한 프로세스 아이디가 뭐냐 이거임
그리고 lrpid는 last receive pid 말하는 거임 가장 최근에 receive 한 프로세스 pid 가 뭐냐는 거임
stime은 send 한 time 을 말함 msgsnd를 실행한 time이 언제냐는거임 ctime은 library임 이걸 formatting 해서
문자열로 변환해주는 거임 rtime은 receive 한 타임을 말하는 거임 그 정보를 보여주는 예제임
Semaphore
#include <sys/sem.h>
struct semid_ds {
struct ipc_perm sem_perm;
struct sem *sem_base; /* ptr to array of semphores in set */
ushort_t sem_nsems; /* # of semaphores in set */
time_t sem_otime; /* last-semop() time */
time_t sem_ctime; /* last-change time */
};
struct sem {
ushort_t semval; /* semaphore value, nonegative*/
short sempid; /* PID of last successful semop(), SETVAL, SETALL*/
ushort_t semncnt; /* # awaiting semval > current value */
ushort_t semzcnt; /* # awaiting semval = 0 */
};
System V 세마포는 semaphore element로 구성된 set이다. 각 element는 struct sem의 구조를 갖는다.
한번의 시스템 콜로 전체 set에 대한 오퍼레이션을 수행한다.
semid_ds는 이 데이터 구조체를 가리키는 건데 두번째가 실질적인 어레이임 세번째는 세마포어 갯수임
그럼 base 어레이 사이즈가 2가 되는 거임 그 nsem 이 2이면
그래서 그 sem base의 어레이의 갯수가 네 개라고 하면 sem value가 각 네 개씩 있는 거임
The semget(2) system call
#include <sys/sem.h>
int semget(key_t key, int nsems, int flag);
// returns : semaphore identifier if ok, -1 on error
key 값을 넣고 flag도 비슷함 그리고 nsems라는 거는 세마포어 갯수
#define PERMS (S_IRUSR | S_IWUSR)
int semid;
if ((semid = semget(IPC_PRIVATE, 3, PERMS)) == -1)
perror("Failed to create new private semaphore");
get하고 ipc_private은 설명했었음 key를 커널에게 선택해서 주도록 해주는 거임 유니크한 키를
id가 넘어오는데, 세마포어 set에는 세마포어가 세개가 있음
#include <errno.h>
#include <sys/sem.h>
#include <sys/stat.h>
#define PERMS (S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH)
#define SET_SIZE 2
int main(int argc, char *argv[]) {
key_t mykey;
int semid;
if (argc != 3) {
fprintf(stderr, "Usage: %s pathname id\n", argv[0]);
return 1;
}
if ((mykey = ftok(argv[1], atoi(argv[2]))) == (key_t)-1) {
fprintf(stderr, "Failed to derive key from filename %s:%s\n", argv[1], strerror(errno));
return 1;
}
if ((semid = semget(mykey, SET_SIZE, PERMS | IPC_CREAT)) == -1) {
fprintf(stderr, "Failed to create semaphore with key %d:%s\n", (int)mykey, strerror(errno));
return 1;
}
printf("semid = %d\n", semid);
return 0;
}
키가 있고 id 가 있음 argc가 3아니면 에러남 $ ./a.out pathname identifier 이렇게 들어와야함
ftok 해서 들어오는 pathname로 지정해줌, 그리고 id를 int로 바꿈 -1이면 실패해서 끝내버림
성공적으로 유니크하게 키값을 가져오면 semget해서 id를 return 해야하는데
그 안에 sem 이몇개가 있나 봤는데 set_size 2로 define 해놓음 perm 도 define 해줌
ipc_creat면 이 key가 가리키는 sem이 이미 있으면 그냥 get하고 없으면 만들어서 get하는 거임 아무튼 id가 return 됨
The semctl(2) system call
#include <sys/sem.h>
int semctl(int semid, int semnum, int cmd, .. /*union semun arg*/);
semctl은 주어진 id가 있으면 여기에서 cmd를 세번째 argument에 주고
그 cmd의 종류에 따라서 두번째 num 몇 번째 sem을 말하는 건지 특정 세마포어 번호를 말함
이 cmd가 특정 sem에 대한 명령일때 세마포어 번호가 필요함.
만약 전체 sem 에 대한 cmd면 num 이 필요없음
그리고 자료구조가 sem 에 따라 가변적임 union이 가변적이라서 union으로 되어있음
union semun {
int val; /* for SETVAL */
struct semid_ds *buf; /* for IPC_STAT and IPC_SET */
unsigned short *array; /* for GETALL and SETALL */
};
union에는 struct랑 비슷한데 어떻게 다르냐면 여기에 들어가는 변수가 메모리를 공유해서 겹쳐있는거임
메모리를 하나만 사용함 overwrite하고 겹치는 변수임 어느 순간에 변수 하나만 사용하는거임
value, buffer, array 매번 cmd에 따라서 사용되는 셋 중의 사용되는 변수가 달라짐
cmd 에 따라서 사용될 수 있는 특정 세마포어를 지정하기도하고 아니기도하고,
cmd 에따라서 사용되는 변수가 다름
val은 setval 하는 명령을 쓸때 사용됨 세마포어의 값
그리고 ipc_stat이나 ipc_set 같은 명령을 사용할 대 struct semid_ds 를 가진 포인터를 사용함
그리고 명령이 getall이나 setall이면 getval은 특정 한개의 sem 의 값을 set하거나 get하는 거임
근데 get all 이나 setall은 모든 sem의 값을 get하거나 set하는 거임 이런 차이가 있음
ipc_stat, set, rmid 정보를 가져오고 변경하고 이럴 때 union의 semid_ds를 씀
명령이 getval setval 이거는 특정 sem 를 지정해줘야함 두번째 argument에 지정함
네번째 argument에는 val을 사용
밑에는 안봐도 됨
getall setall 은 array를 사용함
#include <sys/sem.h>
int initelement(int semid, int semnum, int semvalue) {
union semun {
int val;
struct semid_ds *buf;
unsigned short *array;
} arg;
arg.val = semvalue;
return semctl(semid, semnum, SETVAL, arg);
/* return semctl(semid, semnum, SETVAL, semvalue); */
}
initialize element semid 가 여러개 있음 두번째 argument에 특정 sem 번호를 지정함
그 value를 setting 해주는 setval 해주는 명령임 id 가가리키는 세마포어의 특정번호의 세마포어의 val에
초기 값을 넣어주라는 말임
#include <sys/sem.h>
int removesem(int semid) {
return semctl(semid, 0, IPC_RMID);
}
removesem 은 전체 지우는 거라서 특정 세마포어 필요없고 id 하고 rmid 하는거임
semctl(semid, 0, IPC_RMID)는 0번 element를 지우는?
IPC_RMID를 사용할 경우 semnum은 무시됨. 특정 element를 지우는 것이 아님.
'CS > 유닉스프로그래밍' 카테고리의 다른 글
14. 유닉스 - 소켓 (0) | 2020.12.10 |
---|---|
13. 유닉스 IPC - semaphore, shared memory (0) | 2020.12.10 |
11. 유닉스 IPC - IO multiplexing (0) | 2020.12.08 |
10. 유닉스 IPC - PIPE, FIFO (0) | 2020.12.08 |
9. 유닉스 시그널 프로세싱 - 2 (0) | 2020.12.07 |