streem language core analysis Tag201503
strm_struct
struct strm_stream{
strm_task_mode mode; //생성/필터/소비 중의 하나
unsigned int flags; //flag
strm_func start_func; //개시 함수
strm_func close_func; //후처리 함수
void* data; //스트림 고유 데이터
strm_stream* dst; //출력 스트림
strm_stream* nextd; //출력 링크
};
strm_loop()
- I/O 대응 후 이벤트를 발생시킨다.
- 발생한 이벤트에 대응하는 처리를 수행한다. 입력이라면 데이터를 읽어 들이고 나누는 작업 등이 된다.
- 이벤트 처리 결과를 파이프라인 다음 스트림으로 전송하고, 전송받은 쪽의 처리를 실행
- 이후 반복 수행
epoll의 경우 select 시스템콜과 달리 epoll에서 받은 이벤트 정보가 구조체에 전달되기 떄문에, 수신 대기 중인 I/O의 수를 고려할 필요가 없다.
초기화 단계에서 스트림(strm_stream)이 생성된다. → 스트림 결합 후 파이프라인 구성
I/O 대기 스트림은 epoll을 사용해 I/O 대기에 등록
I/O 스트림의 경우 epoll_wait를 이용해서 I/O를 기다려 데이터가 도착한 스트림을 태스크 큐에 등록
테스크 큐에 등록되는 데이터는 3개이나 첫 번째 스트림에 전달할 데이터는 없으므로 NULL이 전달
이벤트 처리 루프에서는 작업 큐에서 하나씩 정보를 검색하고 함수를 실행한다.
함수중에는(입력, 가공, 출력이 포함된다.)
strm_emit()
함수에서 다음 스트림에 데이터를 전달해야할 때는 strm_emit()함수를 실행한다.
3개의 인수를 필요로 한다.
현재 작업에 해당하는 trm_stream구조체, 전달할 데이터, 전달 후 이 하뭇에 이어서 수행될 콜백 함수
다음 스트림에 전달된다고 해서 그 스트림이 바로 실행되는것이 아닌 작업 큐에 있어야 한다.
처리 함수의 패턴
콜백에 의한 암묵적 루프
io.c의 read_cb, readline_cb
strm_emit()함수가 수행될 때 콜백함수에 동일한 함수를 지정하여 암묵적인 루프 구성
전달 받은 요소를 그대로 처리
main.c의 map_recv(), write_cb() : 전달 받은 요소의 하나씩 처리만 하고 자기 자신은 반복하지 않는 패턴 strm_emit()콜백에 NULL 지정
루프 내에서의 emit
X
Source Code Analysis
main.c main()
int main(int argc, char **argv)
{
strm_stream *strm_stdin = strm_readio(0 /* stdin*/);
strm_stream *strm_map = strm_funcmap(str_toupper);
strm_stream *strm_stdout = strm_writeio(1 /* stdout */);
/*
스트림을 구성한다. -> 각 각의 스트림을 strm_stream구조체에 만들어 넣는다.
*/
strm_connect(strm_stdin, strm_map);
strm_connect(strm_map, strm_stdout);
strm_loop();
return 0;
}
strm.h struct strm_stream
## strm_struct
struct strm_stream{
strm_task_mode mode; //생성/필터/소비 중의 하나
unsigned int flags; //flag
strm_func start_func; //개시 함수
strm_func close_func; //후처리 함수
void* data; //스트림 고유 데이터
strm_stream* dst; //출력 스트림
strm_stream* nextd; //출력 링크
};
io.c strm_readio(0)
strm_stream* strm_readio(int fd)
{
struct fd_read_buffer *buf = malloc(sizeof(struct fd_read_buffer));
buf->fd = fd;
buf->beg = buf->end = buf->buf;//char buf[1024]의 포인터를 가리킴
return strm_alloc_stream(strm_task_prod, stdio_read, read_close, (void*)buf);
}
#stdio_read, read_close 등등 함수 볼 필요있음
io.c fd_read_buffer struct
struct fd_read_buffer {
int fd;
char *beg, *end; //char buf[1024]의 포인터를 가리킴
char buf[1024];
};
core.c strm_alloc_stream()
argument
mode: 생성자 or 필터자 or 소비자 인지 확인
start_func: 개시함수
close_func: 후처리(끝낼때) 함수
data: 스트림 고유 데이타 (readio)에서 넘긴 data는 입력에 필요한 data들 집합 넘긴듯
static strm_queue *task_q;
strm_stream* strm_alloc_stream(strm_task_mode mode, strm_func start_func, strm_func close_func, void *data)
{
strm_stream *s = malloc(sizeof(strm_stream));
s->mode = mode;
s->start_func = start_func;
s->close_func = close_func;
s->data = data;
s->dst = NULL;
s->nextd = NULL;
s->flags = 0;
if (!task_q) { //작업 큐의 생성
task_q = strm_queue_alloc();
}
return s;
}
queue.c strm_queue_alloc()
strm_queue* strm_queue_alloc()
{
strm_queue *q = malloc(sizeof(strm_queue));
pthread_mutex_init(&q->mutex, NULL);
pthread_cond_init(&q->cond, NULL);
q->fi = q->fo = NULL;
return q;
}
작업큐를 생성한다.
queue.c struct strm_queue
struct strm_queue {
pthread_mutex_t mutex;
pthread_cond_t cond;
struct strm_queue_entry *fi, *fo;
};
struct strm_queue_entry {
strm_stream *strm;
strm_func func;
void *data;
struct strm_queue_entry *next;
};
작업큐의 struct이다. strm_queue가 시작이고 trm_queue_entry로 실제 작업을 의미하는듯하다. (연결리스트 느낌)
strm_queue는 strm_queue_entry를 관리하는 구조체이다.
strm_queue_entry는 실제 스트림을 관리하는 구조체이다.
strm.h
/* ----- core */
typedef enum {
strm_task_prod, /* Producer */
strm_task_filt, /* Filter */
strm_task_cons, /* Consumer */
} strm_task_mode;
io.c strm_writeio(1)
strm_stream* strm_writeio(int fd)
{
return strm_alloc_stream(strm_task_cons, write_cb, write_close, (void*)fd);
}
main.c strm_funcmap(str_toupper)
strm_stream* strm_funcmap(void *(*func)(strm_stream *, void*))
{
return strm_alloc_stream(strm_task_filt, map_recv, NULL, func);
}
//인자 func은 str_toupper함수의 포인터인데 왜 data인자쪽에 있는지 잘모르겠음
//해당 스트림에 대한 struct allocate 진행
//개시 함수로 map_recv 함수 진행, 후처리함수는 없음
//map_recv인 개시함수에서 실제 수행할 함수를 매핑한것인가
//strm->data에는 strtoupper 함수 포인터
static void map_recv(strm_stream *strm, void *data)
{
strm_map_func func = strm->data; //여기서 실제 기능을 수행하는 함수를 매핑한다.
void *d;
d = (*func)(strm, data);
strm_emit(strm, d, NULL);
}
//core.c
void strm_emit(strm_stream *strm, void *data, strm_func func)
{
strm_stream *d = strm->dst;
while (d) {
strm_task_push(d, d->start_func, data);
d = d->nextd;
}
if (func) {
strm_task_push(strm, func, NULL);
}
}
core.c
int strm_connect(strm_stream *src, strm_stream *dst)
{
strm_stream *s;
if (dst->mode == strm_task_prod) {
/* destination task should not be a producer */
return 0;
}
s = src->dst;
if (s) {
while (s->nextd) {
s = s->nextd;
}
s->nextd = dst;
}
else {
src->dst = dst;
}
if (src->mode == strm_task_prod) {
strm_task_push(src, src->start_func, NULL);
}
return 1;
}
void strm_task_push(strm_stream *s, strm_func func, void *data)
{
strm_queue_push(task_q, s, func, data);
}
//s는 prod의 strm이고 func도 prod의 start_fucn?
dst로 연결되어서 하나의 직력 파이프라인을 만들고 이 하나의 직렬 파이프라인을 queue에 넣는거 같은데 복합 연결일경우에만 nextd를 사용하는것 같음 이경우를 한번 찾아보자.
의문1. A | B | C로 되어있을떄 A B C 이 3개가 작업 큐에 들어가는 것이 아닌 A→B→C 이런형태를 만들고 A를 작업큐에 넣는것인가?
queue.c
void strm_queue_push(strm_queue *q, strm_stream *strm, strm_func func, void *data)
{
struct strm_queue_entry *e;
//queue의 fi와 fo는 무엇인가
//fo에는 queue_entry의 처음 큐 주소가 있고
//fi는 큐를 연결해주는 역할이네
if (!q) return;
//func은 start_func, data는 처리할것들
e = malloc(sizeof(struct strm_queue_entry));
pthread_mutex_lock(&q->mutex);
e->strm = strm; //prod's strm
e->func = func;
e->data = data;
e->next = NULL;
if (q->fi) {
q->fi->next = e;
}
q->fi = e;
if (!q->fo) q->fo = e;
pthread_cond_signal(&q->cond);
pthread_mutex_unlock(&q->mutex);
}
core.c
void strm_init_io_loop()
{
epoll_fd = epoll_create(10);
pthread_create(&io_worker, NULL, io_loop, NULL);
}
int strm_loop()
{
strm_init_io_loop();
for (;;) {
strm_task_exec();
if (strm_io_waiting() == 0 && !strm_queue_p(task_q)) {
break;
}
}
strm_queue_free(task_q);
return 1;
}
int strm_task_exec()
{
return strm_queue_exec(task_q);
}
strm_init_io_loop() 함수를 통해서 io event는 따로 thread를 만들고 loop를 돌린다.
io.c
int strm_io_waiting()
{
return (io_wait_num > 0);
}
queue.c
int strm_queue_p(strm_queue *q)
{
return q->fi != NULL;
}
int strm_queue_exec(strm_queue *q)
{
struct strm_queue_entry *e;
strm_stream *strm;
strm_func func;
void *data;
pthread_mutex_lock(&q->mutex);
if (!q->fo) {
pthread_cond_wait(&q->cond, &q->mutex);
}
e = q->fo;
q->fo = e->next;
if (!q->fo) q->fi = NULL;
pthread_mutex_unlock(&q->mutex);
strm = e->strm;
func = e->func;
data = e->data;
free(e);
(*func)(strm, data); //실제 함수 실행
return 1;
}
Source Code Flow Chart
Detail Analysis
function의 실행 시작은
[producer] start_func : stdio_read close_func : read_close data : fd_read_buffer structure
[filter] start_func : map_recv close_func : NULL data = func(stroupper function pointer)
[comsumer] start_func : write_cb close_func : write_close data : fd
static void stdio_read(strm_stream *strm, void *data)
{
struct fd_read_buffer *buf = strm->data;
strm_io_start_read(strm, buf->fd, read_cb);
}
void strm_io_start_read(strm_stream *strm, int fd, strm_func cb)
{
strm_io_start(strm, fd, cb, EPOLLIN);
}
static void strm_io_start(strm_stream *strm, int fd, strm_func cb, uint32_t events)
{
int n;
n = io_push(fd, strm, cb);
if (n == 0) {
io_wait_num++;
}
else {
if (errno == EPERM) { //?
/* fd must be a regular file */
/* enqueue task without waiting */
strm->flags |= STRM_IO_NOWAIT;
strm_task_push(strm, cb, NULL);
}
}
}
static int io_push(int fd, strm_stream *strm, strm_func cb)
{
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLONESHOT;
ev.data.ptr = io_task(strm, cb); //io가 발생했을 때 실행할 strm와 function setting
return epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &ev);
}
static struct io_task* io_task(strm_stream *strm, strm_func func)
{
struct io_task *t = malloc(sizeof(struct io_task));
t->strm = strm;
t->func = func; //read_cb
return t;
}
위의 stdio_read function의 실행흐름으로 보아 stdin을 위한 준비작업을 하고 epoll_ctl 시스템콜을 호출하는 것을 보아 입력 대기 이벤트에 걸어놓은것으로 추정 가능
Tip
epoll_event struct을 잘보면 events[i].data.ptr이라는 void* Pointer가 int data와 union으로 되어 있다. 이 void* ptr을 잘 쓰면 효과적인 데이터 전달이 가능하다.
A | B | C 같은 직렬 파이프 라인이 들어왔을 때의 알고리즘 로직도
각 각의 파이프라인으로 구성된 stream을 strm_stream이라는 구조체에 strm_alloc_stream함수를 통해 memory allocate를 시전한다. 이때 strm_alloc_stream이라는 함수내에서는 strm_stream 구조체 멤버변수에 알맞는 값을 세팅한다. /* mode : 해당 스트림이 producer인지, filter인지, comsumer인지 구별하는 값이 들어간다. flags : 몰라 start_func : 해당 스트림의 시작 함수 포인터를 개시한다. close_func : 해당 스트림이 끝났을 때의 함수 포인터를 개시한다. data : 해당 스트림의 데이터인데 조금 더 알아볼 필요가 있다. (정의된 함수라면 여기서 toupper같은 함수포인터가 들어가거나 stdin일 경우 읽어들일 버퍼의 주소값이 들어가기도 하던데 정확한것은 좀 더 파보아야함) dst : 지금까지 분석으로는 하나의 직렬 파이프라인을 3개의 스트림으로 쪼개었을 때 각 각의 스트림을 이어줄 필요가 있다. nextd : 이는 복합 파이프라인구성때 쓰인느거 같은데 자세히는 모르겠음. */
스트림이 나누어 졌다면 strm_connect()함수로 스트림들을 이어줄 필요가 있다. 해당 스트림들은 dst 구조체 멤버 포인터변수로 인해 Single linked List로 이루어진다.
이어진 스트림의 Producer의 Pointer를 작업 큐에 넣는다. 나머지는 연결되어 있으므로 Producer만 넣는거 같다.
큐에 넣을때는 좋은 방식을 사용하고 있다.
queue struct의 멤버변수에는 fi포인터와 fo포인터가 있는데 fo포인터가 작업큐의 첫 번째 작업 포인터를 가리키고 fi 포인터로 계속 삽입된 포인터를 연결해주고 있다. 두개의 포인터변수를 사용함으로 Single Linked List의 단점인 선형리스트를 순회하는 시간을 절역한다. O(n) -> O(1)
이제 strm_loop()함수를 볼차례이다.
strm_loop()는 큐의 목록이 없어질때 까지와 strm_io_waiting()이 False일 때까지 exec한다. strm_io_waiting함수는 현재 수신 대기중인 IO의 개수를 의미하는것 같다. strm_init_io_loop()를 thread로 둬서 io event처리를 따로 체킹하고 있다.
책에 나온 표시대로 진행되고 있는지는 아직 잘 모르겠음.
strm_queue는 strm_qeueu_entry의 첫 pointer와 각종 pthread mutex 개체를 관리한다. strm_queue_entry는 실제 작업 스트림이 Single Linked List형태로 들어가 있음. 작업 큐에 들어가 있는 개체의 형태는 각 각의 스트림이 들어가 있는 것이 아닌 따로 연결되는 것이 아닌 Prod-filter-comsumer 이형태의 파이프라인이 들어가 있음. strm_queue_entry.fo에는 producer의 Pointer가 담겨져 있음.
struct strm_queue { pthread_mutex_t mutex; pthread_cond_t cond; struct strm_queue_entry *fi, *fo; };
struct strm_queue_entry { strm_stream *strm; - prod의 strm strm_func func; - prod의 start_func void *data; - data인데 함수포인터로 쓸려나? struct strm_queue_entry *next; };
큐에 mutex_lock이랑 mutex_unlock을 자주 거는데 따로 pthread로 관리되는 io event때문임(수시로 큐에 넣기때문)
strm, func, data 변수를 선언하여 각 각의 값 대입 strm = prod's strm pointer func = prod's start func data = prod's data (*func)(strm, data); //executing function!
생각해보니까 위에는 C로 만들어진 인터프리터로 실행되는거네?
그럼 예제 프로그램을 확인해보자. stdin | {x->x.toupper()} | stdout
function의 실행 시작은
[producer] start_func : stdio_read close_func : read_close data : fd_read_buffer structure
[filter] start_func : map_recv close_func : NULL data = func(stroupper function pointer)
[comsumer] start_func : write_cb close_func : write_close data : fd
stdio_read -> strm_io_start_read -> strm_io_start -> io_push -> io_task -> epoll_ctl 하고 이작업은 끝나서 다음 strm_queue_exec 진행
내 가설은 계속 루프 돌다가 입력이 감지되었을 때 push되고 exec되는건데 왜 read가 준비되는거지 ㅇㅎ epoll_wait에서 만약 io가 감지되면 push_queue를 하는데 이 함수 내부에서 signal을 보냄 p->cond 그럼 exec queue에서 pthread_cond_wait(&q->cond, &q->mutex); 대기하다가 시그널이 옴 == 큐에 무언가가 있음하고 함수를 실행
strm_emit에는 복합 파이프 구성도 처리해서 이부분은 좀 더 유심히 봐야할듯
readline을 통해서 입력받고 만약 개행이 있다면 개행 기준으로 버퍼를 나눈다. 그럼 버퍼가 2개니까 push 함수를 2번 호출해야한다. 그래서 입력함수가 끝나고 다음에 진행될 함수를 strm_task_psuh에 넣고나서 2번쨰 버퍼를 읽을 함수를 strm_emit의 세번째 인자의 자기 자신의 함수 포인터를 strm_task_push에 넣어 암묵적인 콜백을 형성한다.
즉, strm_emit함수는 다음것을 task 큐에 push함으로 반복문같은 역할을 담당(다음 콜백 호출)
여기서 중요한것은 strm_emit으로 함수를 연속적으로 실행하여 처리하는것이 아닌 작업큐에 다음에 실행할 것들을 넣는것이다.
즉, 책의 내용인 strm_emit()함수를 호출해도 다음 작업이 즉시 실행되는 것은 아니다. 전달된 데이터는 먼저 작업 대기열에 등록된다. 함수의 실행이 끝나면 메인 루프에서 다음 작업을 꺼낸다.
짭퉁 인터프리터를 분석하면서 왜 인터프리터 언어가 컴파일 언어보다 느린지 알 수 있다.
컴파일 언어는 바로 바이너리 레벨로 만들어져 바이너리가 실행되지만 인터프리터 언어는 해당 소스를 내가 만든 프로그램을 통해서 실행되기에 느릴 수 밖에 없다.