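Let's start with the diagram:
memcached uses a multi-threaded model: one master thread and several worker threads. The master communicates with each worker through a pipe.
Each worker thread owns a queue whose elements are of type CQ_ITEM.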
    typedef struct {
        pthread_t thread_id;               /* unique ID of this thread */
        struct event_base *base;           /* libevent handle this thread uses */
        struct event notify_event;         /* listen event for notify pipe */
        int notify_receive_fd;             /* receiving end of notify pipe */
        int notify_send_fd;                /* sending end of notify pipe */
        struct thread_stats stats;         /* Stats generated by this thread */
        struct conn_queue *new_conn_queue; /* queue of new connections to handle */
        cache_t *suffix_cache;             /* suffix cache */
        logger *l;                         /* logger buffer */
        void *lru_bump_buf;                /* async LRU bump buffer */
    } LIBEVENT_THREAD;

    /* An item in the connection queue. */
    typedef struct conn_queue_item CQ_ITEM;
    struct conn_queue_item {
        int sfd;
        enum conn_states init_state;
        int event_flags;
        int read_buffer_size;
        enum network_transport transport;
        conn *c;
        CQ_ITEM *next;
    };

    /* A connection queue. */
    typedef struct conn_queue CQ;
    struct conn_queue {
        CQ_ITEM *head;
        CQ_ITEM *tail;
        pthread_mutex_t lock;
    };
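To make the queue structure concrete, here is a minimal sketch of a mutex-protected linked-list queue with the same head/tail/lock layout as `struct conn_queue`. The names `demo_item`, `demo_queue`, `demo_cq_init`, `demo_cq_push` and `demo_cq_pop` are made up for this illustration, and the item carries only an `sfd` field; this is not memcached's actual implementation.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Hypothetical, simplified stand-in for CQ_ITEM: only the fields
 * this sketch needs. */
typedef struct demo_item {
    int sfd;                 /* accepted socket descriptor */
    struct demo_item *next;  /* next item in the queue */
} demo_item;

/* Hypothetical stand-in for struct conn_queue. */
typedef struct {
    demo_item *head;
    demo_item *tail;
    pthread_mutex_t lock;
} demo_queue;

static void demo_cq_init(demo_queue *q) {
    pthread_mutex_init(&q->lock, NULL);
    q->head = NULL;
    q->tail = NULL;
}

/* Append an item at the tail. In the model described above, the
 * master thread is the producer. */
static void demo_cq_push(demo_queue *q, demo_item *item) {
    item->next = NULL;
    pthread_mutex_lock(&q->lock);
    if (q->tail == NULL)
        q->head = item;        /* queue was empty */
    else
        q->tail->next = item;
    q->tail = item;
    pthread_mutex_unlock(&q->lock);
}

/* Remove and return the item at the head, or NULL if the queue is
 * empty. The owning worker thread is the consumer. */
static demo_item *demo_cq_pop(demo_queue *q) {
    pthread_mutex_lock(&q->lock);
    demo_item *item = q->head;
    if (item != NULL) {
        q->head = item->next;
        if (q->head == NULL)
            q->tail = NULL;    /* queue became empty */
    }
    pthread_mutex_unlock(&q->lock);
    return item;
}
```

The lock matters because the producer and the consumer run on different threads: the master pushes while the worker pops.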
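memcached uses libevent for event handling; the master and each worker have their own event_base.
Initially, the master listens for incoming connections, while each worker thread listens for read events on its notify pipe.
When a connection arrives, the master thread accepts it, wraps conn_fd in a CQ_ITEM object, pushes that item onto one worker thread's queue, and writes a byte to that worker's pipe to trigger the pipe's read event.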
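The hand-off just described can be sketched roughly as follows. This is an illustration under stated assumptions, not memcached's code: `demo_worker`, `demo_workers_init` and `demo_dispatch` are hypothetical names, the worker is picked round-robin (the text above only says "one worker thread"), and the queue is inlined into the worker struct for brevity. The `'c'` notification byte matches the `case 'c'` branch handled by the worker callback.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>

#define DEMO_WORKERS 4  /* assumed worker count for this sketch */

typedef struct demo_item {
    int sfd;
    struct demo_item *next;
} demo_item;

/* Hypothetical, trimmed-down analogue of LIBEVENT_THREAD. */
typedef struct {
    int notify_receive_fd;   /* worker reads wakeups here */
    int notify_send_fd;      /* master writes wakeups here */
    demo_item *head, *tail;  /* pending connections */
    pthread_mutex_t lock;
} demo_worker;

static demo_worker demo_workers[DEMO_WORKERS];
static int demo_last = -1;   /* index of the last worker chosen */

/* Create one notify pipe per worker. Returns 0 on success. */
static int demo_workers_init(void) {
    for (int i = 0; i < DEMO_WORKERS; i++) {
        int fds[2];
        if (pipe(fds) != 0)
            return -1;
        demo_workers[i].notify_receive_fd = fds[0];
        demo_workers[i].notify_send_fd = fds[1];
        demo_workers[i].head = demo_workers[i].tail = NULL;
        pthread_mutex_init(&demo_workers[i].lock, NULL);
    }
    return 0;
}

/* Master side: queue the accepted fd on a worker, then wake that
 * worker. Returns the chosen worker index, or -1 on failure. */
static int demo_dispatch(int sfd) {
    demo_item *item = malloc(sizeof(*item));
    if (item == NULL)
        return -1;
    item->sfd = sfd;
    item->next = NULL;

    /* Assumption: simple round-robin selection. */
    int idx = (demo_last + 1) % DEMO_WORKERS;
    demo_last = idx;
    demo_worker *w = &demo_workers[idx];

    /* Push first, so the item is already queued when the worker
     * wakes up and pops. */
    pthread_mutex_lock(&w->lock);
    if (w->tail == NULL)
        w->head = item;
    else
        w->tail->next = item;
    w->tail = item;
    pthread_mutex_unlock(&w->lock);

    /* One byte on the pipe triggers the worker's read event. */
    if (write(w->notify_send_fd, "c", 1) != 1)
        return -1;
    return idx;
}
```

Note the ordering: the item is enqueued before the byte is written, so a worker that wakes up immediately will always find something to pop.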
The worker thread in question then runs the callback registered for the pipe's read event, thread_libevent_process:
    /*
     * Processes an incoming "handle a new connection" item. This is called when
     * input arrives on the libevent wakeup pipe.
     */
    static void thread_libevent_process(int fd, short which, void *arg) {
        LIBEVENT_THREAD *me = arg;
        CQ_ITEM *item;
        char buf[1];
        unsigned int timeout_fd;

        if (read(fd, buf, 1) != 1) {
            if (settings.verbose > 0)
                fprintf(stderr, "Can't read from libevent pipe\n");
            return;
        }

        switch (buf[0]) {
        case 'c':
            item = cq_pop(me->new_conn_queue);

            if (NULL != item) {
                conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                                   item->read_buffer_size, item->transport,
                                   me->base);
                if (c == NULL) {
                    if (IS_UDP(item->transport)) {
                        fprintf(stderr, "Can't listen for events on UDP socket\n");
                        exit(1);
                    } else {
                        if (settings.verbose > 0) {
                            fprintf(stderr, "Can't listen for events on fd %d\n",
                                    item->sfd);
                        }
                        close(item->sfd);
                    }
                } else {
                    c->thread = me;
                }
                cqi_free(item);
            }
            break;

        case 'r':
            item = cq_pop(me->new_conn_queue);

            if (NULL != item) {
                conn_worker_readd(item->c);
                cqi_free(item);
            }
            break;

        /* we were told to pause and report in */
        case 'p':
            register_thread_initialized();
            break;

        /* a client socket timed out */
        case 't':
            if (read(fd, &timeout_fd, sizeof(timeout_fd)) != sizeof(timeout_fd)) {
                if (settings.verbose > 0)
                    fprintf(stderr, "Can't read timeout fd from libevent pipe\n");
                return;
            }
            conn_close_idle(conns[timeout_fd]);
            break;
        }
    }
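Inside conn_new, the read event for conn_fd is registered with the worker's own event_base (passed in as me->base).
From this point on, the worker thread listens for I/O events on the connection.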