------------------------------------------------------------
revno: 51
revision-id: [EMAIL PROTECTED]
parent: [EMAIL PROTECTED]
committer: Peter Somogyi <[EMAIL PROTECTED]>
branch nick: ctdb
timestamp: Fri 2007-01-05 18:13:35 +0100
message:
  ibw: modified tridge's code - from my point of view
  ibw_alloc_send and node-centric params are the basics of these important changes.
  Also tried to avoid memcpy/memdup where it was possible.
modified:
  common/ctdb.c                  ctdb.c-20061127094323-t50f58d65iaao5of-2
  common/ctdb_call.c             ctdb_call.c-20061128065342-to93h6eejj5kon81-1
  ib/ibw_ctdb_init.c             ibw_ctdb_init.c-20070102171305-cn2z4k7ibx8141d5-1
  include/ctdb_private.h         ctdb_private.h-20061117234101-o3qt14umlg9en8z0-13
  tcp/tcp_init.c                 tcp_init.c-20061128004937-x70q1cu5xzg5g2tm-2
=== modified file 'common/ctdb.c'
--- a/common/ctdb.c     2007-01-02 17:16:39 +0000
+++ b/common/ctdb.c     2007-01-05 17:13:35 +0000
@@ -30,14 +30,14 @@
 int ctdb_set_transport(struct ctdb_context *ctdb, const char *transport)
 {
        int ctdb_tcp_init(struct ctdb_context *ctdb);
-#ifdef HAVE_INFINIBAND
+#ifdef USE_INFINIBAND
        int ctdb_ibw_init(struct ctdb_context *ctdb);
 #endif /*HAVE_INFINIBAND*/
 
        if (strcmp(transport, "tcp") == 0) {
                return ctdb_tcp_init(ctdb);
        }
-#ifdef HAVE_INFINIBAND
+#ifdef USE_INFINIBAND
        if (strcmp(transport, "ib") == 0) {
                return ctdb_ibw_init(ctdb);
        }
@@ -256,10 +256,15 @@
        }
 }
 
+void ctdb_stopped(struct ctdb_context *ctdb)
+{
+}
+
 static const struct ctdb_upcalls ctdb_upcalls = {
        .recv_pkt       = ctdb_recv_pkt,
        .node_dead      = ctdb_node_dead,
-       .node_connected = ctdb_node_connected
+       .node_connected = ctdb_node_connected,
+       .stopped        = ctdb_stopped
 };
 
 /*

=== modified file 'common/ctdb_call.c'
--- a/common/ctdb_call.c        2006-12-19 01:03:10 +0000
+++ b/common/ctdb_call.c        2007-01-05 17:13:35 +0000
@@ -31,12 +31,10 @@
 /*
   queue a packet or die
 */
-static void ctdb_queue_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
+static inline void ctdb_queue_packet(struct ctdb_node *node, struct ctdb_req_header *hdr)
 {
-       struct ctdb_node *node;
-       node = ctdb->nodes[hdr->destnode];
-       if (ctdb->methods->queue_pkt(node, (uint8_t *)hdr, hdr->length) != 0) {
-               ctdb_fatal(ctdb, "Unable to queue packet\n");
+       if (node->ctdb->methods->queue_pkt(node, (uint8_t *)hdr, hdr->length) != 0) {
+               ctdb_fatal(node->ctdb, "Unable to queue packet\n");
        }
 }
 
@@ -121,6 +119,9 @@
        struct ctdb_reply_error *r;
        char *msg;
        int len;
+       struct ctdb_node *node;
+
+       node = ctdb->nodes[hdr->srcnode];
 
        va_start(ap, fmt);
        msg = talloc_vasprintf(ctdb, fmt, ap);
@@ -130,7 +131,7 @@
        va_end(ap);
 
        len = strlen(msg)+1;
-       r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r) + len);
+       r = ctdb->methods->allocate_pkt(node, sizeof(*r) + len);
        CTDB_NO_MEMORY_FATAL(ctdb, r);
        r->hdr.length = sizeof(*r) + len;
        r->hdr.operation = CTDB_REPLY_ERROR;
@@ -143,9 +144,8 @@
 
        talloc_free(msg);
 
-       ctdb_queue_packet(ctdb, &r->hdr);
-
-       talloc_free(r);
+       ctdb_queue_packet(node, &r->hdr);
+       ctdb->methods->dealloc_pkt(node, r);
 }
 
 
@@ -157,8 +157,11 @@
                                    struct ctdb_ltdb_header *header)
 {
        struct ctdb_reply_redirect *r;
-
-       r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r));
+       struct ctdb_node *node;
+
+       node = ctdb->nodes[c->hdr.srcnode];
+
+       r = ctdb->methods->allocate_pkt(node, sizeof(*r));
        CTDB_NO_MEMORY_FATAL(ctdb, r);
        r->hdr.length = sizeof(*r);
        r->hdr.operation = CTDB_REPLY_REDIRECT;
@@ -167,9 +170,8 @@
        r->hdr.reqid     = c->hdr.reqid;
        r->dmaster       = header->dmaster;
 
-       ctdb_queue_packet(ctdb, &r->hdr);
-
-       talloc_free(r);
+       ctdb_queue_packet(node, &r->hdr);
+       ctdb->methods->dealloc_pkt(node, r);
 }
 
 /*
@@ -186,13 +188,18 @@
 {
        struct ctdb_req_dmaster *r;
        int len;
+       struct ctdb_node *node;
+       uint32_t destnode;
+
+       destnode = ctdb_lmaster(ctdb, key);
+       node = ctdb->nodes[destnode];
        
        len = sizeof(*r) + key->dsize + data->dsize;
-       r = ctdb->methods->allocate_pkt(ctdb, len);
+       r = ctdb->methods->allocate_pkt(node, len);
        CTDB_NO_MEMORY_FATAL(ctdb, r);
        r->hdr.length    = len;
        r->hdr.operation = CTDB_REQ_DMASTER;
-       r->hdr.destnode  = ctdb_lmaster(ctdb, key);
+       r->hdr.destnode  = destnode;
        r->hdr.srcnode   = ctdb->vnn;
        r->hdr.reqid     = c->hdr.reqid;
        r->dmaster       = header->laccessor;
@@ -205,14 +212,14 @@
                /* we are the lmaster - don't send to ourselves */
                ctdb_request_dmaster(ctdb, &r->hdr);
        } else {
-               ctdb_queue_packet(ctdb, &r->hdr);
+               ctdb_queue_packet(node, &r->hdr);
 
                /* update the ltdb to record the new dmaster */
                header->dmaster = r->hdr.destnode;
                ctdb_ltdb_store(ctdb, *key, header, *data);
        }
 
-       talloc_free(r);
+       ctdb->methods->dealloc_pkt(node, r);
 }
 
 
@@ -229,7 +236,9 @@
        TDB_DATA key, data;
        struct ctdb_ltdb_header header;
        int ret;
+       struct ctdb_node *node;
 
+       node = ctdb->nodes[c->dmaster];
        key.dptr = c->data;
        key.dsize = c->keylen;
        data.dptr = c->data + c->keylen;
@@ -255,7 +264,7 @@
        }
 
        /* send the CTDB_REPLY_DMASTER */
-       r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r) + data.dsize);
+       r = ctdb->methods->allocate_pkt(node, sizeof(*r) + data.dsize);
        CTDB_NO_MEMORY_FATAL(ctdb, r);
        r->hdr.length = sizeof(*r) + data.dsize;
        r->hdr.operation = CTDB_REPLY_DMASTER;
@@ -265,9 +274,8 @@
        r->datalen       = data.dsize;
        memcpy(&r->data[0], data.dptr, data.dsize);
 
-       ctdb_queue_packet(ctdb, &r->hdr);
-
-       talloc_free(r);
+       ctdb_queue_packet(node, &r->hdr);
+       ctdb->methods->dealloc_pkt(node, r);
 }
 
 
@@ -281,7 +289,9 @@
        struct ctdb_reply_call *r;
        int ret;
        struct ctdb_ltdb_header header;
+       struct ctdb_node *node;
 
+       node = ctdb->nodes[hdr->srcnode];
        key.dptr = c->data;
        key.dsize = c->keylen;
        call_data.dptr = c->data + c->keylen;
@@ -317,7 +327,7 @@
                        call_data.dsize?&call_data:NULL,
                        &reply_data, c->hdr.srcnode);
 
-       r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r) + reply_data.dsize);
+       r = ctdb->methods->allocate_pkt(node, sizeof(*r) + reply_data.dsize);
        CTDB_NO_MEMORY_FATAL(ctdb, r);
        r->hdr.length = sizeof(*r) + reply_data.dsize;
        r->hdr.operation = CTDB_REPLY_CALL;
@@ -327,10 +337,10 @@
        r->datalen       = reply_data.dsize;
        memcpy(&r->data[0], reply_data.dptr, reply_data.dsize);
 
-       ctdb_queue_packet(ctdb, &r->hdr);
+       ctdb_queue_packet(node, &r->hdr);
+       ctdb->methods->dealloc_pkt(node, r);
 
        talloc_free(reply_data.dptr);
-       talloc_free(r);
 }
 
 enum call_state {CTDB_CALL_WAIT, CTDB_CALL_DONE, CTDB_CALL_ERROR};
@@ -440,7 +450,10 @@
 {
        struct ctdb_reply_redirect *c = (struct ctdb_reply_redirect *)hdr;
        struct ctdb_call_state *state;
-
+       struct ctdb_node *node;
+#ifdef USE_INFINIBAND
+       uint8_t *r;
+#endif /* USE_INFINIBAND */
        state = idr_find(ctdb->idr, hdr->reqid);
 
        talloc_steal(state, c);
@@ -453,7 +466,18 @@
        /* send it off again */
        state->node = ctdb->nodes[c->dmaster];
 
-       ctdb_queue_packet(ctdb, &state->c->hdr);
+       node = ctdb->nodes[state->c->hdr.destnode];
+
+#ifdef USE_INFINIBAND
+       r = ctdb->methods->allocate_pkt(node, state->c->hdr.length);
+       memcpy(r, &state->c->hdr, state->c->hdr.length);
+#endif /* USE_INFINIBAND */
+
+       ctdb_queue_packet(node, &state->c->hdr);
+
+#ifdef USE_INFINIBAND
+       ctdb->methods->dealloc_pkt(node, r);
+#endif /* USE_INFINIBAND */
 }
 
 /*
@@ -520,6 +544,7 @@
        int ret;
        struct ctdb_ltdb_header header;
        TDB_DATA data;
+       struct ctdb_node *node;
 
        /*
          if we are the dmaster for this key then we don't need to
@@ -538,8 +563,9 @@
        state = talloc_zero(ctdb, struct ctdb_call_state);
        CTDB_NO_MEMORY_NULL(ctdb, state);
 
+       node = ctdb->nodes[header.dmaster];
        len = sizeof(*state->c) + key.dsize + (call_data?call_data->dsize:0);
-       state->c = ctdb->methods->allocate_pkt(ctdb, len);
+       state->c = ctdb->methods->allocate_pkt(node, len);
        CTDB_NO_MEMORY_NULL(ctdb, state->c);
 
        state->c->hdr.length    = len;
@@ -566,7 +592,12 @@
 
        talloc_set_destructor(state, ctdb_call_destructor);
 
-       ctdb_queue_packet(ctdb, &state->c->hdr);
+       ctdb_queue_packet(node, &state->c->hdr);
+
+#ifdef USE_INFINIBAND
+       ctdb->methods->dealloc_pkt(node, state->c);
+       state->c = NULL;
+#endif /* USE_INFINIBAND */
 
        event_add_timed(ctdb->ev, state, timeval_current_ofs(CTDB_REQ_TIMEOUT, 0), 
                        ctdb_call_timeout, state);

=== modified file 'ib/ibw_ctdb_init.c'
--- a/ib/ibw_ctdb_init.c        2007-01-04 15:44:41 +0000
+++ b/ib/ibw_ctdb_init.c        2007-01-05 17:13:35 +0000
@@ -29,6 +29,9 @@
 #include "ibwrapper.h"
 #include "ibw_ctdb.h"
 
+/* not nice; temporary workaround for the current implementation... */
+static void *last_key = NULL;
+
 static int ctdb_ibw_listen(struct ctdb_context *ctdb, int backlog)
 {
        struct ibw_ctx *ictx = talloc_get_type(ctdb->private, struct ibw_ctx);
@@ -108,14 +111,12 @@
 /*
  * transport packet allocator - allows transport to control memory for packets
  */
-static void *ctdb_ibw_allocate_pkt(struct ctdb_context *ctdb, size_t size)
+static void *ctdb_ibw_allocate_pkt(struct ctdb_node *node, size_t size)
 {
        struct ibw_conn *conn = NULL;
        void *buf = NULL;
-       void *key; /* TODO: expand the param list with this */
 
-       /* TODO2: !!! I need "node" or ibw_conn here */
-       if (ibw_alloc_send_buf(conn, &buf, &key, (int)size))
+       if (ibw_alloc_send_buf(conn, &buf, &last_key, size))
                return NULL;
 
        return buf;
@@ -124,20 +125,40 @@
 static int ctdb_ibw_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length)
 {
        struct ibw_conn *conn = talloc_get_type(node->private, struct ibw_conn);
-       void *key = NULL; /* TODO: expand the param list with this */
-
-       assert(conn!=NULL);
-       return ibw_send(conn, data, key, length);
+       int     rc;
+
+       rc = ibw_send(conn, data, last_key, length);
+       last_key = NULL;
+
+       return rc;
+}
+
+static void ctdb_ibw_dealloc_pkt(struct ctdb_node *node, void *data)
+{
+       if (last_key) {
+               struct ibw_conn *conn = talloc_get_type(node->private, struct ibw_conn);
+       
+               assert(conn!=NULL);
+               ibw_cancel_send_buf(conn, data, last_key);
+       } /* else ibw_send is already using it and will free it after completion */
+}
+
+static int ctdb_ibw_stop(struct ctdb_context *cctx)
+{
+       struct ibw_ctx *ictx = talloc_get_type(cctx->private, struct ibw_ctx);
+
+       assert(ictx!=NULL);
+       return ibw_stop(ictx);
 }
 
 static const struct ctdb_methods ctdb_ibw_methods = {
        .start     = ctdb_ibw_start,
        .add_node  = ctdb_ibw_add_node,
        .queue_pkt = ctdb_ibw_queue_pkt,
-       .allocate_pkt = ctdb_ibw_allocate_pkt
-       
-//     .dealloc_pkt = ctdb_ibw_dealloc_pkt
-//     .stop = ctdb_ibw_stop
+       .allocate_pkt = ctdb_ibw_allocate_pkt,
+
+       .dealloc_pkt = ctdb_ibw_dealloc_pkt,
+       .stop = ctdb_ibw_stop
 };
 
 /*
@@ -146,7 +167,7 @@
 int ctdb_ibw_init(struct ctdb_context *ctdb)
 {
        struct ibw_ctx *ictx;
-       
+
        ictx = ibw_init(
                NULL, //struct ibw_initattr *attr, /* TODO */
                0, //int nattr, /* TODO */

=== modified file 'include/ctdb_private.h'
--- a/include/ctdb_private.h    2006-12-19 23:32:31 +0000
+++ b/include/ctdb_private.h    2007-01-05 17:13:35 +0000
@@ -55,7 +55,9 @@
        int (*start)(struct ctdb_context *); /* start protocol processing */    
        int (*add_node)(struct ctdb_node *); /* setup a new node */     
        int (*queue_pkt)(struct ctdb_node *, uint8_t *data, uint32_t length);
-       void *(*allocate_pkt)(struct ctdb_context *, size_t );
+       void *(*allocate_pkt)(struct ctdb_node *, size_t);
+       void (*dealloc_pkt)(struct ctdb_node *, void *data);
+       int (*stop)(struct ctdb_context *); /* initiate stopping the protocol */
 };
 
 /*
@@ -70,6 +72,9 @@
 
        /* node_connected is called when a connection to a node is established */
        void (*node_connected)(struct ctdb_node *);
+
+       /* protocol has been stopped */
+       void (*stopped)(struct ctdb_context *);
 };
 
 /* main state of the ctdb daemon */

=== modified file 'tcp/tcp_init.c'
--- a/tcp/tcp_init.c    2006-12-19 01:07:07 +0000
+++ b/tcp/tcp_init.c    2007-01-05 17:13:35 +0000
@@ -67,21 +67,31 @@
 /*
   transport packet allocator - allows transport to control memory for packets
 */
-void *ctdb_tcp_allocate_pkt(struct ctdb_context *ctdb, size_t size)
+void *ctdb_tcp_allocate_pkt(struct ctdb_node *node, size_t size)
 {
        /* tcp transport needs to round to 8 byte alignment to ensure
           that we can use a length header and 64 bit elements in
           structures */
        size = (size+(CTDB_TCP_ALIGNMENT-1)) & ~(CTDB_TCP_ALIGNMENT-1);
-       return talloc_size(ctdb, size);
-}
-
+       return talloc_size(node, size);
+}
+
+void ctdb_tcp_dealloc_pkt(struct ctdb_node *node, void *buf)
+{
+       talloc_free(buf);
+}
+
+int ctdb_tcp_stop(struct ctdb_context *ctdb)
+{
+       return 0;
+}
 
 static const struct ctdb_methods ctdb_tcp_methods = {
        .start     = ctdb_tcp_start,
        .add_node  = ctdb_tcp_add_node,
        .queue_pkt = ctdb_tcp_queue_pkt,
-       .allocate_pkt = ctdb_tcp_allocate_pkt
+       .allocate_pkt = ctdb_tcp_allocate_pkt,
+       .dealloc_pkt = ctdb_tcp_dealloc_pkt
 };
 
 /*

Reply via email to