From: Liu Yuan <tailai...@taobao.com> update: clean up struct cluster_driver too ------------------------------------------- >8
This patch tries to completely remove block/unblock as well as sd_block_handler() code from both cluster driver and core sheep code, simplifying the code a lot and also boosting performance a lot. We actually have a very nice construct to do blocking request handling: our top/bottom style work queue. So the requests can be nicely ordered by a block_wqueue. Signed-off-by: Liu Yuan <tailai...@taobao.com> --- sheep/cluster.h | 14 --------- sheep/cluster/accord.c | 39 ------------------------- sheep/cluster/corosync.c | 59 ++++---------------------------------- sheep/cluster/local.c | 40 -------------------------- sheep/cluster/zookeeper.c | 50 +------------------------------- sheep/group.c | 69 +++++++++++---------------------------------- 6 files changed, 24 insertions(+), 247 deletions(-) diff --git a/sheep/cluster.h b/sheep/cluster.h index 07e5f7b..970e6ac 100644 --- a/sheep/cluster.h +++ b/sheep/cluster.h @@ -83,20 +83,6 @@ struct cluster_driver { */ int (*notify)(void *msg, size_t msg_len); - /* - * Send a message to all nodes to block further events. - * - * Once the cluster driver has ensured that events are blocked on all - * nodes it needs to call sd_block_handler() on the node where ->block - * was called. - */ - void (*block)(void); - - /* - * Unblock events on all nodes, and send a a message to all nodes. 
- */ - void (*unblock)(void *msg, size_t msg_len); - struct list_head list; }; diff --git a/sheep/cluster/accord.c b/sheep/cluster/accord.c index c3e0320..27ddb99 100644 --- a/sheep/cluster/accord.c +++ b/sheep/cluster/accord.c @@ -30,7 +30,6 @@ enum acrd_event_type { EVENT_JOIN_REQUEST = 1, EVENT_JOIN_RESPONSE, EVENT_LEAVE, - EVENT_BLOCK, EVENT_NOTIFY, }; @@ -46,8 +45,6 @@ struct acrd_event { uint64_t ids[SD_MAX_NODES]; enum cluster_join_result join_result; - - int callbacked; /* set non-zero after sd_block_handler() was called */ }; static struct sd_node this_node; @@ -279,7 +276,6 @@ static int add_event(struct acrd_handle *ah, enum acrd_event_type type, memmove(i, i + 1, sizeof(*i) * (ev.nr_nodes - idx)); break; case EVENT_NOTIFY: - case EVENT_BLOCK: break; case EVENT_JOIN_RESPONSE: abort(); @@ -427,29 +423,6 @@ static int accord_notify(void *msg, size_t msg_len) return add_event(ahandle, EVENT_NOTIFY, &this_node, msg, msg_len); } -static void accord_block(void) -{ - add_event(ahandle, EVENT_BLOCK, &this_node, NULL, 0); -} - -static void accord_unblock(void *msg, size_t msg_len) -{ - struct acrd_event ev; - - pthread_mutex_lock(&queue_lock); - - acrd_queue_pop(ahandle, &ev); - - ev.type = EVENT_NOTIFY; - ev.buf_len = msg_len; - if (msg) - memcpy(ev.buf, msg, msg_len); - - acrd_queue_push_back(ahandle, &ev); - - pthread_mutex_unlock(&queue_lock); -} - static void acrd_handler(int listen_fd, int events, void *data) { int ret; @@ -510,16 +483,6 @@ static void acrd_handler(int listen_fd, int events, void *data) case EVENT_LEAVE: sd_leave_handler(&ev.sender, ev.nodes, ev.nr_nodes); break; - case EVENT_BLOCK: - if (node_cmp(&ev.sender, &this_node) == 0 && !ev.callbacked) { - ev.callbacked = 1; - - acrd_queue_push_back(ahandle, &ev); - sd_block_handler(); - } else { - acrd_queue_push_back(ahandle, NULL); - } - break; case EVENT_NOTIFY: sd_notify_handler(&ev.sender, ev.buf, ev.buf_len); break; @@ -592,8 +555,6 @@ struct cluster_driver cdrv_accord = { .join = 
accord_join, .leave = accord_leave, .notify = accord_notify, - .block = accord_block, - .unblock = accord_unblock, }; cdrv_register(cdrv_accord); diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c index e6655a0..6ecbf27 100644 --- a/sheep/cluster/corosync.c +++ b/sheep/cluster/corosync.c @@ -43,7 +43,6 @@ enum corosync_event_type { COROSYNC_EVENT_TYPE_JOIN_REQUEST, COROSYNC_EVENT_TYPE_JOIN_RESPONSE, COROSYNC_EVENT_TYPE_LEAVE, - COROSYNC_EVENT_TYPE_BLOCK, COROSYNC_EVENT_TYPE_NOTIFY, }; @@ -53,8 +52,6 @@ enum corosync_message_type { COROSYNC_MSG_TYPE_JOIN_RESPONSE, COROSYNC_MSG_TYPE_LEAVE, COROSYNC_MSG_TYPE_NOTIFY, - COROSYNC_MSG_TYPE_BLOCK, - COROSYNC_MSG_TYPE_UNBLOCK, }; struct corosync_event { @@ -291,8 +288,8 @@ static int __corosync_dispatch_one(struct corosync_event *cevent) case CJ_RES_JOIN_LATER: build_node_list(cpg_nodes, nr_cpg_nodes, entries); sd_join_handler(&cevent->sender.ent, entries, - nr_cpg_nodes, cevent->result, - cevent->msg); + nr_cpg_nodes, cevent->result, + cevent->msg); break; } break; @@ -307,21 +304,11 @@ static int __corosync_dispatch_one(struct corosync_event *cevent) build_node_list(cpg_nodes, nr_cpg_nodes, entries); sd_leave_handler(&cevent->sender.ent, entries, nr_cpg_nodes); break; - case COROSYNC_EVENT_TYPE_BLOCK: - if (cpg_node_equal(&cevent->sender, &this_node) && - !cevent->callbacked) { - sd_block_handler(); - cevent->callbacked = 1; - } - - /* block the rest messages until unblock message comes */ - return 0; case COROSYNC_EVENT_TYPE_NOTIFY: sd_notify_handler(&cevent->sender.ent, cevent->msg, - cevent->msg_len); + cevent->msg_len); break; } - return 1; } @@ -330,7 +317,8 @@ static void __corosync_dispatch(void) struct corosync_event *cevent; while (!list_empty(&corosync_event_list)) { - cevent = list_first_entry(&corosync_event_list, typeof(*cevent), list); + cevent = list_first_entry(&corosync_event_list, typeof(*cevent), + list); /* update join status */ if (!join_finished) { @@ -362,7 +350,6 @@ static void 
__corosync_dispatch(void) } else { switch (cevent->type) { case COROSYNC_MSG_TYPE_JOIN_REQUEST: - case COROSYNC_MSG_TYPE_BLOCK: return; default: break; @@ -420,16 +407,12 @@ static void cdrv_cpg_deliver(cpg_handle_t handle, cevent->sender = cmsg->sender; cevent->msg_len = cmsg->msg_len; break; - case COROSYNC_MSG_TYPE_BLOCK: case COROSYNC_MSG_TYPE_NOTIFY: cevent = zalloc(sizeof(*cevent)); if (!cevent) panic("failed to allocate memory\n"); - if (cmsg->type == COROSYNC_MSG_TYPE_BLOCK) - cevent->type = COROSYNC_EVENT_TYPE_BLOCK; - else - cevent->type = COROSYNC_EVENT_TYPE_NOTIFY; + cevent->type = COROSYNC_EVENT_TYPE_NOTIFY; cevent->sender = cmsg->sender; cevent->msg_len = cmsg->msg_len; @@ -480,14 +463,6 @@ static void cdrv_cpg_deliver(cpg_handle_t handle, sizeof(*cmsg->nodes) * cmsg->nr_nodes); break; - case COROSYNC_MSG_TYPE_UNBLOCK: - cevent = update_event(COROSYNC_EVENT_TYPE_BLOCK, &cmsg->sender, - cmsg->msg, cmsg->msg_len); - if (!cevent) - break; - - cevent->type = COROSYNC_EVENT_TYPE_NOTIFY; - break; } __corosync_dispatch(); @@ -542,14 +517,6 @@ static void cdrv_cpg_confchg(cpg_handle_t handle, continue; } - cevent = find_event(COROSYNC_EVENT_TYPE_BLOCK, left_sheep + i); - if (cevent) { - /* the node left before sending UNBLOCK */ - list_del(&cevent->list); - free(cevent->msg); - free(cevent); - } - cevent = zalloc(sizeof(*cevent)); if (!cevent) panic("failed to allocate memory\n"); @@ -640,18 +607,6 @@ static int corosync_leave(void) NULL, 0); } -static void corosync_block(void) -{ - send_message(COROSYNC_MSG_TYPE_BLOCK, 0, &this_node, NULL, 0, - NULL, 0); -} - -static void corosync_unblock(void *msg, size_t msg_len) -{ - send_message(COROSYNC_MSG_TYPE_UNBLOCK, 0, &this_node, NULL, 0, - msg, msg_len); -} - static int corosync_notify(void *msg, size_t msg_len) { return send_message(COROSYNC_MSG_TYPE_NOTIFY, 0, &this_node, @@ -739,8 +694,6 @@ struct cluster_driver cdrv_corosync = { .join = corosync_join, .leave = corosync_leave, .notify = corosync_notify, - 
.block = corosync_block, - .unblock = corosync_unblock, }; cdrv_register(cdrv_corosync); diff --git a/sheep/cluster/local.c b/sheep/cluster/local.c index 4f2d8e8..a1f0cc6 100644 --- a/sheep/cluster/local.c +++ b/sheep/cluster/local.c @@ -36,7 +36,6 @@ enum local_event_type { EVENT_JOIN_REQUEST = 1, EVENT_JOIN_RESPONSE, EVENT_LEAVE, - EVENT_BLOCK, EVENT_NOTIFY, }; @@ -52,8 +51,6 @@ struct local_event { pid_t pids[SD_MAX_NODES]; enum cluster_join_result join_result; - - int callbacked; /* set non-zero after sd_block_handler() was called */ }; @@ -244,7 +241,6 @@ static void add_event(enum local_event_type type, struct sd_node *node, memmove(p, p + 1, sizeof(*p) * (ev.nr_nodes - idx)); break; case EVENT_NOTIFY: - case EVENT_BLOCK: break; case EVENT_JOIN_RESPONSE: abort(); @@ -314,34 +310,6 @@ static int local_notify(void *msg, size_t msg_len) return 0; } -static void local_block(void) -{ - shm_queue_lock(); - - add_event(EVENT_BLOCK, &this_node, NULL, 0); - - shm_queue_unlock(); -} - -static void local_unblock(void *msg, size_t msg_len) -{ - struct local_event *ev; - - shm_queue_lock(); - - ev = shm_queue_peek(); - - ev->type = EVENT_NOTIFY; - ev->buf_len = msg_len; - if (msg) - memcpy(ev->buf, msg, msg_len); - msync(ev, sizeof(*ev), MS_SYNC); - - shm_queue_notify(); - - shm_queue_unlock(); -} - static void local_handler(int listen_fd, int events, void *data) { struct signalfd_siginfo siginfo; @@ -404,12 +372,6 @@ static void local_handler(int listen_fd, int events, void *data) sd_leave_handler(&ev->sender, ev->nodes, ev->nr_nodes); shm_queue_pop(); break; - case EVENT_BLOCK: - if (node_eq(&ev->sender, &this_node) && !ev->callbacked) { - sd_block_handler(); - ev->callbacked = 1; - } - break; case EVENT_NOTIFY: sd_notify_handler(&ev->sender, ev->buf, ev->buf_len); shm_queue_pop(); @@ -467,8 +429,6 @@ struct cluster_driver cdrv_local = { .join = local_join, .leave = local_leave, .notify = local_notify, - .block = local_block, - .unblock = local_unblock, }; 
cdrv_register(cdrv_local); diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c index 8499cb4..7924920 100644 --- a/sheep/cluster/zookeeper.c +++ b/sheep/cluster/zookeeper.c @@ -44,7 +44,6 @@ enum zk_event_type { EVENT_JOIN_REQUEST = 1, EVENT_JOIN_RESPONSE, EVENT_LEAVE, - EVENT_BLOCK, EVENT_NOTIFY, }; @@ -60,8 +59,6 @@ struct zk_event { enum cluster_join_result join_result; - int callbacked; /* set non-zero after sd_block_handler() was called */ - size_t buf_len; uint8_t buf[SD_MAX_EVENT_BUF_SIZE]; }; @@ -82,7 +79,7 @@ static size_t nr_zk_nodes; static inline int is_blocking_event(struct zk_event *ev) { - return ev->type == EVENT_BLOCK || ev->type == EVENT_JOIN_REQUEST; + return ev->type == EVENT_JOIN_REQUEST; } /* zookeeper API wrapper */ @@ -497,7 +494,6 @@ static int add_event(zhandle_t *zh, enum zk_event_type type, ev.type = type; ev.sender = *znode; ev.buf_len = buf_len; - ev.callbacked = 0; if (buf) memcpy(ev.buf, buf, buf_len); zk_queue_push(zh, &ev); @@ -514,7 +510,6 @@ static int leave_event(zhandle_t *zh, struct zk_node *znode) ev->type = EVENT_LEAVE; ev->sender = *znode; ev->buf_len = 0; - ev->callbacked = 0; nr_levents = uatomic_add_return(&nr_zk_levents, 1); dprintf("nr_zk_levents:%d, tail:%u\n", nr_levents, zk_levent_tail); @@ -611,34 +606,6 @@ static int zk_notify(void *msg, size_t msg_len) return add_event(zhandle, EVENT_NOTIFY, &this_node, msg, msg_len); } -static void zk_block(void) -{ - add_event(zhandle, EVENT_BLOCK, &this_node, NULL, 0); -} - -static void zk_unblock(void *msg, size_t msg_len) -{ - int rc; - struct zk_event ev; - eventfd_t value = 1; - - rc = zk_queue_pop(zhandle, &ev); - assert(rc == 0); - - ev.type = EVENT_NOTIFY; - ev.buf_len = msg_len; - if (msg) - memcpy(ev.buf, msg, msg_len); - - zk_queue_push_back(zhandle, &ev); - - uatomic_dec(&zk_notify_blocked); - - /* this notify is necessary */ - dprintf("write event to efd:%d\n", efd); - eventfd_write(efd, value); -} - static void zk_handler(int listen_fd, int events, 
void *data) { int ret, rc; @@ -764,19 +731,6 @@ static void zk_handler(int listen_fd, int events, void *data) build_node_list(zk_node_btroot); sd_leave_handler(&ev.sender.node, sd_nodes, nr_sd_nodes); break; - case EVENT_BLOCK: - dprintf("BLOCK\n"); - if (node_eq(&ev.sender.node, &this_node.node) - && !ev.callbacked) { - uatomic_inc(&zk_notify_blocked); - ev.callbacked = 1; - zk_queue_push_back(zhandle, &ev); - sd_block_handler(); - } else { - zk_queue_push_back(zhandle, NULL); - } - - break; case EVENT_NOTIFY: dprintf("NOTIFY\n"); sd_notify_handler(&ev.sender.node, ev.buf, ev.buf_len); @@ -835,8 +789,6 @@ struct cluster_driver cdrv_zookeeper = { .join = zk_join, .leave = zk_leave, .notify = zk_notify, - .block = zk_block, - .unblock = zk_unblock, }; cdrv_register(cdrv_zookeeper); diff --git a/sheep/group.c b/sheep/group.c index c2679f2..638200f 100644 --- a/sheep/group.c +++ b/sheep/group.c @@ -252,7 +252,7 @@ int get_nr_copies(struct vnode_info *vnode_info) } static struct vdi_op_message *prepare_cluster_msg(struct request *req, - size_t *sizep) + size_t *sizep) { struct vdi_op_message *msg; size_t size; @@ -264,11 +264,7 @@ static struct vdi_op_message *prepare_cluster_msg(struct request *req, assert(size <= SD_MAX_EVENT_BUF_SIZE); - msg = zalloc(size); - if (!msg) { - eprintf("failed to allocate memory\n"); - return NULL; - } + msg = xzalloc(size); memcpy(&msg->req, &req->rq, sizeof(struct sd_req)); memcpy(&msg->rsp, &req->rp, sizeof(struct sd_rsp)); @@ -283,9 +279,11 @@ static struct vdi_op_message *prepare_cluster_msg(struct request *req, static void do_cluster_request(struct work *work) { struct request *req = container_of(work, struct request, work); - int ret; + int ret = SD_RES_SUCCESS; + + if (has_process_work(req->op)) + ret = do_process_work(req); - ret = do_process_work(req); req->rp.result = ret; } @@ -296,32 +294,14 @@ static void cluster_op_done(struct work *work) size_t size; msg = prepare_cluster_msg(req, &size); - if (!msg) - panic(); - 
sys->cdrv->unblock(msg, size); + /* Kick off the cluster to process_main() */ + sys->cdrv->notify(msg, size); free(msg); } /* - * Perform a blocked cluster operation. - * - * Must run in the main thread as it access unlocked state like - * sys->pending_list. - */ -void sd_block_handler(void) -{ - struct request *req = list_first_entry(&sys->pending_list, - struct request, pending_list); - - req->work.fn = do_cluster_request; - req->work.done = cluster_op_done; - - queue_work(sys->block_wqueue, &req->work); -} - -/* * Execute a cluster operation by letting the cluster driver send it to all * nodes in the cluster. * @@ -332,24 +312,10 @@ void queue_cluster_request(struct request *req) { eprintf("%p %x\n", req, req->rq.opcode); - if (has_process_work(req->op)) { - list_add_tail(&req->pending_list, &sys->pending_list); - sys->cdrv->block(); - } else { - struct vdi_op_message *msg; - size_t size; - - msg = prepare_cluster_msg(req, &size); - if (!msg) - return; - - list_add_tail(&req->pending_list, &sys->pending_list); - - msg->rsp.result = SD_RES_SUCCESS; - sys->cdrv->notify(msg, size); - - free(msg); - } + list_add_tail(&req->pending_list, &sys->pending_list); + req->work.fn = do_cluster_request; + req->work.done = cluster_op_done; + queue_work(sys->block_wqueue, &req->work); } static inline int get_nodes_nr_from(struct list_head *l) @@ -748,20 +714,19 @@ void sd_notify_handler(struct sd_node *sender, void *data, size_t data_len) struct vdi_op_message *msg = data; struct sd_op_template *op = get_sd_op(msg->req.opcode); int ret = msg->rsp.result; - struct request *req = NULL; dprintf("size: %zd, from: %s\n", data_len, node_to_str(sender)); + if (ret == SD_RES_SUCCESS && has_process_main(op)) + ret = do_process_main(op, &msg->req, &msg->rsp, msg->data); + if (is_myself(sender->addr, sender->port)) { + struct request *req; + req = list_first_entry(&sys->pending_list, struct request, pending_list); list_del(&req->pending_list); - } - - if (ret == SD_RES_SUCCESS && 
has_process_main(op)) - ret = do_process_main(op, &msg->req, &msg->rsp, msg->data); - if (req) { msg->rsp.result = ret; if (has_process_main(req->op)) memcpy(req->data, msg->data, msg->rsp.data_length); -- 1.7.10.2 -- sheepdog mailing list sheepdog@lists.wpkg.org http://lists.wpkg.org/mailman/listinfo/sheepdog