From: Liu Yuan <tailai...@taobao.com>

update: clean up struct cluster_driver too
------------------------------------------- >8

This patch tries to completely remove block/unblock as well as the
sd_block_handler() code from both the cluster drivers and the core sheep code,
simplifying the code a lot and also boosting performance a lot. We actually
have a very nice construct for handling blocking requests: our top/bottom style
work queue. So the requests can be nicely ordered by the block_wqueue.

Signed-off-by: Liu Yuan <tailai...@taobao.com>
---
 sheep/cluster.h           |   14 ---------
 sheep/cluster/accord.c    |   39 -------------------------
 sheep/cluster/corosync.c  |   59 ++++----------------------------------
 sheep/cluster/local.c     |   40 --------------------------
 sheep/cluster/zookeeper.c |   50 +-------------------------------
 sheep/group.c             |   69 +++++++++++----------------------------------
 6 files changed, 24 insertions(+), 247 deletions(-)

diff --git a/sheep/cluster.h b/sheep/cluster.h
index 07e5f7b..970e6ac 100644
--- a/sheep/cluster.h
+++ b/sheep/cluster.h
@@ -83,20 +83,6 @@ struct cluster_driver {
         */
        int (*notify)(void *msg, size_t msg_len);
 
-       /*
-        * Send a message to all nodes to block further events.
-        *
-        * Once the cluster driver has ensured that events are blocked on all
-        * nodes it needs to call sd_block_handler() on the node where ->block
-        * was called.
-        */
-       void (*block)(void);
-
-       /*
-        * Unblock events on all nodes, and send a a message to all nodes.
-        */
-       void (*unblock)(void *msg, size_t msg_len);
-
        struct list_head list;
 };
 
diff --git a/sheep/cluster/accord.c b/sheep/cluster/accord.c
index c3e0320..27ddb99 100644
--- a/sheep/cluster/accord.c
+++ b/sheep/cluster/accord.c
@@ -30,7 +30,6 @@ enum acrd_event_type {
        EVENT_JOIN_REQUEST = 1,
        EVENT_JOIN_RESPONSE,
        EVENT_LEAVE,
-       EVENT_BLOCK,
        EVENT_NOTIFY,
 };
 
@@ -46,8 +45,6 @@ struct acrd_event {
        uint64_t ids[SD_MAX_NODES];
 
        enum cluster_join_result join_result;
-
-       int callbacked; /* set non-zero after sd_block_handler() was called */
 };
 
 static struct sd_node this_node;
@@ -279,7 +276,6 @@ static int add_event(struct acrd_handle *ah, enum acrd_event_type type,
                memmove(i, i + 1, sizeof(*i) * (ev.nr_nodes - idx));
                break;
        case EVENT_NOTIFY:
-       case EVENT_BLOCK:
                break;
        case EVENT_JOIN_RESPONSE:
                abort();
@@ -427,29 +423,6 @@ static int accord_notify(void *msg, size_t msg_len)
        return add_event(ahandle, EVENT_NOTIFY, &this_node, msg, msg_len);
 }
 
-static void accord_block(void)
-{
-       add_event(ahandle, EVENT_BLOCK, &this_node, NULL, 0);
-}
-
-static void accord_unblock(void *msg, size_t msg_len)
-{
-       struct acrd_event ev;
-
-       pthread_mutex_lock(&queue_lock);
-
-       acrd_queue_pop(ahandle, &ev);
-
-       ev.type = EVENT_NOTIFY;
-       ev.buf_len = msg_len;
-       if (msg)
-               memcpy(ev.buf, msg, msg_len);
-
-       acrd_queue_push_back(ahandle, &ev);
-
-       pthread_mutex_unlock(&queue_lock);
-}
-
 static void acrd_handler(int listen_fd, int events, void *data)
 {
        int ret;
@@ -510,16 +483,6 @@ static void acrd_handler(int listen_fd, int events, void *data)
        case EVENT_LEAVE:
                sd_leave_handler(&ev.sender, ev.nodes, ev.nr_nodes);
                break;
-       case EVENT_BLOCK:
-               if (node_cmp(&ev.sender, &this_node) == 0 && !ev.callbacked) {
-                       ev.callbacked = 1;
-
-                       acrd_queue_push_back(ahandle, &ev);
-                       sd_block_handler();
-               } else {
-                       acrd_queue_push_back(ahandle, NULL);
-               }
-               break;
        case EVENT_NOTIFY:
                sd_notify_handler(&ev.sender, ev.buf, ev.buf_len);
                break;
@@ -592,8 +555,6 @@ struct cluster_driver cdrv_accord = {
        .join       = accord_join,
        .leave      = accord_leave,
        .notify     = accord_notify,
-       .block      = accord_block,
-       .unblock    = accord_unblock,
 };
 
 cdrv_register(cdrv_accord);
diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c
index e6655a0..6ecbf27 100644
--- a/sheep/cluster/corosync.c
+++ b/sheep/cluster/corosync.c
@@ -43,7 +43,6 @@ enum corosync_event_type {
        COROSYNC_EVENT_TYPE_JOIN_REQUEST,
        COROSYNC_EVENT_TYPE_JOIN_RESPONSE,
        COROSYNC_EVENT_TYPE_LEAVE,
-       COROSYNC_EVENT_TYPE_BLOCK,
        COROSYNC_EVENT_TYPE_NOTIFY,
 };
 
@@ -53,8 +52,6 @@ enum corosync_message_type {
        COROSYNC_MSG_TYPE_JOIN_RESPONSE,
        COROSYNC_MSG_TYPE_LEAVE,
        COROSYNC_MSG_TYPE_NOTIFY,
-       COROSYNC_MSG_TYPE_BLOCK,
-       COROSYNC_MSG_TYPE_UNBLOCK,
 };
 
 struct corosync_event {
@@ -291,8 +288,8 @@ static int __corosync_dispatch_one(struct corosync_event *cevent)
                case CJ_RES_JOIN_LATER:
                        build_node_list(cpg_nodes, nr_cpg_nodes, entries);
                        sd_join_handler(&cevent->sender.ent, entries,
-                                                      nr_cpg_nodes, cevent->result,
-                                                      cevent->msg);
+                                       nr_cpg_nodes, cevent->result,
+                                       cevent->msg);
                        break;
                }
                break;
@@ -307,21 +304,11 @@ static int __corosync_dispatch_one(struct corosync_event *cevent)
                build_node_list(cpg_nodes, nr_cpg_nodes, entries);
                sd_leave_handler(&cevent->sender.ent, entries, nr_cpg_nodes);
                break;
-       case COROSYNC_EVENT_TYPE_BLOCK:
-               if (cpg_node_equal(&cevent->sender, &this_node) &&
-                   !cevent->callbacked) {
-                       sd_block_handler();
-                       cevent->callbacked = 1;
-               }
-
-               /* block the rest messages until unblock message comes */
-               return 0;
        case COROSYNC_EVENT_TYPE_NOTIFY:
                sd_notify_handler(&cevent->sender.ent, cevent->msg,
-                                                cevent->msg_len);
+                                 cevent->msg_len);
                break;
        }
-
        return 1;
 }
 
@@ -330,7 +317,8 @@ static void __corosync_dispatch(void)
        struct corosync_event *cevent;
 
        while (!list_empty(&corosync_event_list)) {
-               cevent = list_first_entry(&corosync_event_list, typeof(*cevent), list);
+               cevent = list_first_entry(&corosync_event_list, typeof(*cevent),
+                                         list);
 
                /* update join status */
                if (!join_finished) {
@@ -362,7 +350,6 @@ static void __corosync_dispatch(void)
                } else {
                        switch (cevent->type) {
                        case COROSYNC_MSG_TYPE_JOIN_REQUEST:
-                       case COROSYNC_MSG_TYPE_BLOCK:
                                return;
                        default:
                                break;
@@ -420,16 +407,12 @@ static void cdrv_cpg_deliver(cpg_handle_t handle,
                cevent->sender = cmsg->sender;
                cevent->msg_len = cmsg->msg_len;
                break;
-       case COROSYNC_MSG_TYPE_BLOCK:
        case COROSYNC_MSG_TYPE_NOTIFY:
                cevent = zalloc(sizeof(*cevent));
                if (!cevent)
                        panic("failed to allocate memory\n");
 
-               if (cmsg->type == COROSYNC_MSG_TYPE_BLOCK)
-                       cevent->type = COROSYNC_EVENT_TYPE_BLOCK;
-               else
-                       cevent->type = COROSYNC_EVENT_TYPE_NOTIFY;
+               cevent->type = COROSYNC_EVENT_TYPE_NOTIFY;
 
                cevent->sender = cmsg->sender;
                cevent->msg_len = cmsg->msg_len;
@@ -480,14 +463,6 @@ static void cdrv_cpg_deliver(cpg_handle_t handle,
                       sizeof(*cmsg->nodes) * cmsg->nr_nodes);
 
                break;
-       case COROSYNC_MSG_TYPE_UNBLOCK:
-               cevent = update_event(COROSYNC_EVENT_TYPE_BLOCK, &cmsg->sender,
-                                     cmsg->msg, cmsg->msg_len);
-               if (!cevent)
-                       break;
-
-               cevent->type = COROSYNC_EVENT_TYPE_NOTIFY;
-               break;
        }
 
        __corosync_dispatch();
@@ -542,14 +517,6 @@ static void cdrv_cpg_confchg(cpg_handle_t handle,
                        continue;
                }
 
-               cevent = find_event(COROSYNC_EVENT_TYPE_BLOCK, left_sheep + i);
-               if (cevent) {
-                       /* the node left before sending UNBLOCK */
-                       list_del(&cevent->list);
-                       free(cevent->msg);
-                       free(cevent);
-               }
-
                cevent = zalloc(sizeof(*cevent));
                if (!cevent)
                        panic("failed to allocate memory\n");
@@ -640,18 +607,6 @@ static int corosync_leave(void)
                            NULL, 0);
 }
 
-static void corosync_block(void)
-{
-       send_message(COROSYNC_MSG_TYPE_BLOCK, 0, &this_node, NULL, 0,
-                           NULL, 0);
-}
-
-static void corosync_unblock(void *msg, size_t msg_len)
-{
-       send_message(COROSYNC_MSG_TYPE_UNBLOCK, 0, &this_node, NULL, 0,
-                    msg, msg_len);
-}
-
 static int corosync_notify(void *msg, size_t msg_len)
 {
        return send_message(COROSYNC_MSG_TYPE_NOTIFY, 0, &this_node,
@@ -739,8 +694,6 @@ struct cluster_driver cdrv_corosync = {
        .join       = corosync_join,
        .leave      = corosync_leave,
        .notify     = corosync_notify,
-       .block      = corosync_block,
-       .unblock    = corosync_unblock,
 };
 
 cdrv_register(cdrv_corosync);
diff --git a/sheep/cluster/local.c b/sheep/cluster/local.c
index 4f2d8e8..a1f0cc6 100644
--- a/sheep/cluster/local.c
+++ b/sheep/cluster/local.c
@@ -36,7 +36,6 @@ enum local_event_type {
        EVENT_JOIN_REQUEST = 1,
        EVENT_JOIN_RESPONSE,
        EVENT_LEAVE,
-       EVENT_BLOCK,
        EVENT_NOTIFY,
 };
 
@@ -52,8 +51,6 @@ struct local_event {
        pid_t pids[SD_MAX_NODES];
 
        enum cluster_join_result join_result;
-
-       int callbacked; /* set non-zero after sd_block_handler() was called */
 };
 
 
@@ -244,7 +241,6 @@ static void add_event(enum local_event_type type, struct sd_node *node,
                memmove(p, p + 1, sizeof(*p) * (ev.nr_nodes - idx));
                break;
        case EVENT_NOTIFY:
-       case EVENT_BLOCK:
                break;
        case EVENT_JOIN_RESPONSE:
                abort();
@@ -314,34 +310,6 @@ static int local_notify(void *msg, size_t msg_len)
        return 0;
 }
 
-static void local_block(void)
-{
-       shm_queue_lock();
-
-       add_event(EVENT_BLOCK, &this_node, NULL, 0);
-
-       shm_queue_unlock();
-}
-
-static void local_unblock(void *msg, size_t msg_len)
-{
-       struct local_event *ev;
-
-       shm_queue_lock();
-
-       ev = shm_queue_peek();
-
-       ev->type = EVENT_NOTIFY;
-       ev->buf_len = msg_len;
-       if (msg)
-               memcpy(ev->buf, msg, msg_len);
-       msync(ev, sizeof(*ev), MS_SYNC);
-
-       shm_queue_notify();
-
-       shm_queue_unlock();
-}
-
 static void local_handler(int listen_fd, int events, void *data)
 {
        struct signalfd_siginfo siginfo;
@@ -404,12 +372,6 @@ static void local_handler(int listen_fd, int events, void *data)
                sd_leave_handler(&ev->sender, ev->nodes, ev->nr_nodes);
                shm_queue_pop();
                break;
-       case EVENT_BLOCK:
-               if (node_eq(&ev->sender, &this_node) && !ev->callbacked) {
-                       sd_block_handler();
-                       ev->callbacked = 1;
-               }
-               break;
        case EVENT_NOTIFY:
                sd_notify_handler(&ev->sender, ev->buf, ev->buf_len);
                shm_queue_pop();
@@ -467,8 +429,6 @@ struct cluster_driver cdrv_local = {
        .join       = local_join,
        .leave      = local_leave,
        .notify     = local_notify,
-       .block      = local_block,
-       .unblock    = local_unblock,
 };
 
 cdrv_register(cdrv_local);
diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index 8499cb4..7924920 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -44,7 +44,6 @@ enum zk_event_type {
        EVENT_JOIN_REQUEST = 1,
        EVENT_JOIN_RESPONSE,
        EVENT_LEAVE,
-       EVENT_BLOCK,
        EVENT_NOTIFY,
 };
 
@@ -60,8 +59,6 @@ struct zk_event {
 
        enum cluster_join_result join_result;
 
-       int callbacked; /* set non-zero after sd_block_handler() was called */
-
        size_t buf_len;
        uint8_t buf[SD_MAX_EVENT_BUF_SIZE];
 };
@@ -82,7 +79,7 @@ static size_t nr_zk_nodes;
 
 static inline int is_blocking_event(struct zk_event *ev)
 {
-       return ev->type == EVENT_BLOCK || ev->type == EVENT_JOIN_REQUEST;
+       return ev->type == EVENT_JOIN_REQUEST;
 }
 
 /* zookeeper API wrapper */
@@ -497,7 +494,6 @@ static int add_event(zhandle_t *zh, enum zk_event_type type,
        ev.type = type;
        ev.sender = *znode;
        ev.buf_len = buf_len;
-       ev.callbacked = 0;
        if (buf)
                memcpy(ev.buf, buf, buf_len);
        zk_queue_push(zh, &ev);
@@ -514,7 +510,6 @@ static int leave_event(zhandle_t *zh, struct zk_node *znode)
        ev->type = EVENT_LEAVE;
        ev->sender = *znode;
        ev->buf_len = 0;
-       ev->callbacked = 0;
 
        nr_levents = uatomic_add_return(&nr_zk_levents, 1);
        dprintf("nr_zk_levents:%d, tail:%u\n", nr_levents, zk_levent_tail);
@@ -611,34 +606,6 @@ static int zk_notify(void *msg, size_t msg_len)
        return add_event(zhandle, EVENT_NOTIFY, &this_node, msg, msg_len);
 }
 
-static void zk_block(void)
-{
-       add_event(zhandle, EVENT_BLOCK, &this_node, NULL, 0);
-}
-
-static void zk_unblock(void *msg, size_t msg_len)
-{
-       int rc;
-       struct zk_event ev;
-       eventfd_t value = 1;
-
-       rc = zk_queue_pop(zhandle, &ev);
-       assert(rc == 0);
-
-       ev.type = EVENT_NOTIFY;
-       ev.buf_len = msg_len;
-       if (msg)
-               memcpy(ev.buf, msg, msg_len);
-
-       zk_queue_push_back(zhandle, &ev);
-
-       uatomic_dec(&zk_notify_blocked);
-
-       /* this notify is necessary */
-       dprintf("write event to efd:%d\n", efd);
-       eventfd_write(efd, value);
-}
-
 static void zk_handler(int listen_fd, int events, void *data)
 {
        int ret, rc;
@@ -764,19 +731,6 @@ static void zk_handler(int listen_fd, int events, void *data)
                build_node_list(zk_node_btroot);
                sd_leave_handler(&ev.sender.node, sd_nodes, nr_sd_nodes);
                break;
-       case EVENT_BLOCK:
-               dprintf("BLOCK\n");
-               if (node_eq(&ev.sender.node, &this_node.node)
-                               && !ev.callbacked) {
-                       uatomic_inc(&zk_notify_blocked);
-                       ev.callbacked = 1;
-                       zk_queue_push_back(zhandle, &ev);
-                       sd_block_handler();
-               } else {
-                       zk_queue_push_back(zhandle, NULL);
-               }
-
-               break;
        case EVENT_NOTIFY:
                dprintf("NOTIFY\n");
                sd_notify_handler(&ev.sender.node, ev.buf, ev.buf_len);
@@ -835,8 +789,6 @@ struct cluster_driver cdrv_zookeeper = {
        .join       = zk_join,
        .leave      = zk_leave,
        .notify     = zk_notify,
-       .block      = zk_block,
-       .unblock    = zk_unblock,
 };
 
 cdrv_register(cdrv_zookeeper);
diff --git a/sheep/group.c b/sheep/group.c
index c2679f2..638200f 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -252,7 +252,7 @@ int get_nr_copies(struct vnode_info *vnode_info)
 }
 
 static struct vdi_op_message *prepare_cluster_msg(struct request *req,
-               size_t *sizep)
+                                                 size_t *sizep)
 {
        struct vdi_op_message *msg;
        size_t size;
@@ -264,11 +264,7 @@ static struct vdi_op_message *prepare_cluster_msg(struct request *req,
 
        assert(size <= SD_MAX_EVENT_BUF_SIZE);
 
-       msg = zalloc(size);
-       if (!msg) {
-               eprintf("failed to allocate memory\n");
-               return NULL;
-       }
+       msg = xzalloc(size);
 
        memcpy(&msg->req, &req->rq, sizeof(struct sd_req));
        memcpy(&msg->rsp, &req->rp, sizeof(struct sd_rsp));
@@ -283,9 +279,11 @@ static struct vdi_op_message *prepare_cluster_msg(struct request *req,
 static void do_cluster_request(struct work *work)
 {
        struct request *req = container_of(work, struct request, work);
-       int ret;
+       int ret = SD_RES_SUCCESS;
+
+       if (has_process_work(req->op))
+               ret = do_process_work(req);
 
-       ret = do_process_work(req);
        req->rp.result = ret;
 }
 
@@ -296,32 +294,14 @@ static void cluster_op_done(struct work *work)
        size_t size;
 
        msg = prepare_cluster_msg(req, &size);
-       if (!msg)
-               panic();
 
-       sys->cdrv->unblock(msg, size);
+       /* Kick off the cluster to process_main() */
+       sys->cdrv->notify(msg, size);
 
        free(msg);
 }
 
 /*
- * Perform a blocked cluster operation.
- *
- * Must run in the main thread as it access unlocked state like
- * sys->pending_list.
- */
-void sd_block_handler(void)
-{
-       struct request *req = list_first_entry(&sys->pending_list,
-                                               struct request, pending_list);
-
-       req->work.fn = do_cluster_request;
-       req->work.done = cluster_op_done;
-
-       queue_work(sys->block_wqueue, &req->work);
-}
-
-/*
  * Execute a cluster operation by letting the cluster driver send it to all
  * nodes in the cluster.
  *
@@ -332,24 +312,10 @@ void queue_cluster_request(struct request *req)
 {
        eprintf("%p %x\n", req, req->rq.opcode);
 
-       if (has_process_work(req->op)) {
-               list_add_tail(&req->pending_list, &sys->pending_list);
-               sys->cdrv->block();
-       } else {
-               struct vdi_op_message *msg;
-               size_t size;
-
-               msg = prepare_cluster_msg(req, &size);
-               if (!msg)
-                       return;
-
-               list_add_tail(&req->pending_list, &sys->pending_list);
-
-               msg->rsp.result = SD_RES_SUCCESS;
-               sys->cdrv->notify(msg, size);
-
-               free(msg);
-       }
+       list_add_tail(&req->pending_list, &sys->pending_list);
+       req->work.fn = do_cluster_request;
+       req->work.done = cluster_op_done;
+       queue_work(sys->block_wqueue, &req->work);
 }
 
 static inline int get_nodes_nr_from(struct list_head *l)
@@ -748,20 +714,19 @@ void sd_notify_handler(struct sd_node *sender, void *data, size_t data_len)
        struct vdi_op_message *msg = data;
        struct sd_op_template *op = get_sd_op(msg->req.opcode);
        int ret = msg->rsp.result;
-       struct request *req = NULL;
 
        dprintf("size: %zd, from: %s\n", data_len, node_to_str(sender));
 
+       if (ret == SD_RES_SUCCESS && has_process_main(op))
+               ret = do_process_main(op, &msg->req, &msg->rsp, msg->data);
+
        if (is_myself(sender->addr, sender->port)) {
+               struct request *req;
+
                req = list_first_entry(&sys->pending_list, struct request,
                                       pending_list);
                list_del(&req->pending_list);
-       }
-
-       if (ret == SD_RES_SUCCESS && has_process_main(op))
-               ret = do_process_main(op, &msg->req, &msg->rsp, msg->data);
 
-       if (req) {
                msg->rsp.result = ret;
                if (has_process_main(req->op))
                        memcpy(req->data, msg->data, msg->rsp.data_length);
-- 
1.7.10.2

-- 
sheepdog mailing list
sheepdog@lists.wpkg.org
http://lists.wpkg.org/mailman/listinfo/sheepdog

Reply via email to