------------------------------------------------------------ revno: 51 revision-id: [EMAIL PROTECTED] parent: [EMAIL PROTECTED] committer: Peter Somogyi <[EMAIL PROTECTED]> branch nick: ctdb timestamp: Fri 2007-01-05 18:13:35 +0100 message: ibw: modified tridge's code - in my point of view ibw_alloc_send and node-centric params are the basics of these important changes. Also tried to avoid memcpy/memdup where it was possible. modified: common/ctdb.c ctdb.c-20061127094323-t50f58d65iaao5of-2 common/ctdb_call.c ctdb_call.c-20061128065342-to93h6eejj5kon81-1 ib/ibw_ctdb_init.c ibw_ctdb_init.c-20070102171305-cn2z4k7ibx8141d5-1 include/ctdb_private.h ctdb_private.h-20061117234101-o3qt14umlg9en8z0-13 tcp/tcp_init.c tcp_init.c-20061128004937-x70q1cu5xzg5g2tm-2 === modified file 'common/ctdb.c' --- a/common/ctdb.c 2007-01-02 17:16:39 +0000 +++ b/common/ctdb.c 2007-01-05 17:13:35 +0000 @@ -30,14 +30,14 @@ int ctdb_set_transport(struct ctdb_context *ctdb, const char *transport) { int ctdb_tcp_init(struct ctdb_context *ctdb); -#ifdef HAVE_INFINIBAND +#ifdef USE_INFINIBAND int ctdb_ibw_init(struct ctdb_context *ctdb); #endif /*HAVE_INFINIBAND*/ if (strcmp(transport, "tcp") == 0) { return ctdb_tcp_init(ctdb); } -#ifdef HAVE_INFINIBAND +#ifdef USE_INFINIBAND if (strcmp(transport, "ib") == 0) { return ctdb_ibw_init(ctdb); } @@ -256,10 +256,15 @@ } } +void ctdb_stopped(struct ctdb_context *ctdb) +{ +} + static const struct ctdb_upcalls ctdb_upcalls = { .recv_pkt = ctdb_recv_pkt, .node_dead = ctdb_node_dead, - .node_connected = ctdb_node_connected + .node_connected = ctdb_node_connected, + .stopped = ctdb_stopped }; /*
=== modified file 'common/ctdb_call.c' --- a/common/ctdb_call.c 2006-12-19 01:03:10 +0000 +++ b/common/ctdb_call.c 2007-01-05 17:13:35 +0000 @@ -31,12 +31,10 @@ /* queue a packet or die */ -static void ctdb_queue_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) +static inline void ctdb_queue_packet(struct ctdb_node *node, struct ctdb_req_header *hdr) { - struct ctdb_node *node; - node = ctdb->nodes[hdr->destnode]; - if (ctdb->methods->queue_pkt(node, (uint8_t *)hdr, hdr->length) != 0) { - ctdb_fatal(ctdb, "Unable to queue packet\n"); + if (node->ctdb->methods->queue_pkt(node, (uint8_t *)hdr, hdr->length) != 0) { + ctdb_fatal(node->ctdb, "Unable to queue packet\n"); } } @@ -121,6 +119,9 @@ struct ctdb_reply_error *r; char *msg; int len; + struct ctdb_node *node; + + node = ctdb->nodes[hdr->srcnode]; va_start(ap, fmt); msg = talloc_vasprintf(ctdb, fmt, ap); @@ -130,7 +131,7 @@ va_end(ap); len = strlen(msg)+1; - r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r) + len); + r = ctdb->methods->allocate_pkt(node, sizeof(*r) + len); CTDB_NO_MEMORY_FATAL(ctdb, r); r->hdr.length = sizeof(*r) + len; r->hdr.operation = CTDB_REPLY_ERROR; @@ -143,9 +144,8 @@ talloc_free(msg); - ctdb_queue_packet(ctdb, &r->hdr); - - talloc_free(r); + ctdb_queue_packet(node, &r->hdr); + ctdb->methods->dealloc_pkt(node, r); } @@ -157,8 +157,11 @@ struct ctdb_ltdb_header *header) { struct ctdb_reply_redirect *r; - - r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r)); + struct ctdb_node *node; + + node = ctdb->nodes[c->hdr.srcnode]; + + r = ctdb->methods->allocate_pkt(node, sizeof(*r)); CTDB_NO_MEMORY_FATAL(ctdb, r); r->hdr.length = sizeof(*r); r->hdr.operation = CTDB_REPLY_REDIRECT; @@ -167,9 +170,8 @@ r->hdr.reqid = c->hdr.reqid; r->dmaster = header->dmaster; - ctdb_queue_packet(ctdb, &r->hdr); - - talloc_free(r); + ctdb_queue_packet(node, &r->hdr); + ctdb->methods->dealloc_pkt(node, r); } /* @@ -186,13 +188,18 @@ { struct ctdb_req_dmaster *r; int len; + struct ctdb_node *node; + uint32_t destnode; + + destnode = ctdb_lmaster(ctdb, key); + node = ctdb->nodes[destnode]; len = sizeof(*r) + key->dsize + data->dsize; - r = ctdb->methods->allocate_pkt(ctdb, len); + r = ctdb->methods->allocate_pkt(node, len); CTDB_NO_MEMORY_FATAL(ctdb, r); r->hdr.length = len; r->hdr.operation = CTDB_REQ_DMASTER; - r->hdr.destnode = ctdb_lmaster(ctdb, key); + r->hdr.destnode = destnode; r->hdr.srcnode = ctdb->vnn; r->hdr.reqid = c->hdr.reqid; r->dmaster = header->laccessor; @@ -205,14 +212,14 @@ /* we are the lmaster - don't send to ourselves */ ctdb_request_dmaster(ctdb, &r->hdr); } else { - ctdb_queue_packet(ctdb, &r->hdr); + ctdb_queue_packet(node, &r->hdr); /* update the ltdb to record the new dmaster */ header->dmaster = r->hdr.destnode; ctdb_ltdb_store(ctdb, *key, header, *data); } - talloc_free(r); + ctdb->methods->dealloc_pkt(node, r); } @@ -229,7 +236,9 @@ TDB_DATA key, data; struct ctdb_ltdb_header header; int ret; + struct ctdb_node *node; + node = ctdb->nodes[c->dmaster]; key.dptr = c->data; key.dsize = c->keylen; data.dptr = c->data + c->keylen; @@ -255,7 +264,7 @@ } /* send the CTDB_REPLY_DMASTER */ - r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r) + data.dsize); + r = ctdb->methods->allocate_pkt(node, sizeof(*r) + data.dsize); CTDB_NO_MEMORY_FATAL(ctdb, r); r->hdr.length = sizeof(*r) + data.dsize; r->hdr.operation = CTDB_REPLY_DMASTER; @@ -265,9 +274,8 @@ r->datalen = data.dsize; memcpy(&r->data[0], data.dptr, data.dsize); - ctdb_queue_packet(ctdb, &r->hdr); - - talloc_free(r); + ctdb_queue_packet(node, &r->hdr); + ctdb->methods->dealloc_pkt(node, r); } @@ -281,7 +289,9 @@ struct ctdb_reply_call *r; int ret; struct ctdb_ltdb_header header; + struct ctdb_node *node; + node = ctdb->nodes[hdr->srcnode]; key.dptr = c->data; key.dsize = c->keylen; call_data.dptr = c->data + c->keylen; @@ -317,7 +327,7 @@ call_data.dsize?&call_data:NULL, &reply_data, c->hdr.srcnode); - r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r) + reply_data.dsize); + r = ctdb->methods->allocate_pkt(node, sizeof(*r) + reply_data.dsize); CTDB_NO_MEMORY_FATAL(ctdb, r); r->hdr.length = sizeof(*r) + reply_data.dsize; r->hdr.operation = CTDB_REPLY_CALL; @@ -327,10 +337,10 @@ r->datalen = reply_data.dsize; memcpy(&r->data[0], reply_data.dptr, reply_data.dsize); - ctdb_queue_packet(ctdb, &r->hdr); + ctdb_queue_packet(node, &r->hdr); + ctdb->methods->dealloc_pkt(node, r); talloc_free(reply_data.dptr); - talloc_free(r); } enum call_state {CTDB_CALL_WAIT, CTDB_CALL_DONE, CTDB_CALL_ERROR}; @@ -440,7 +450,10 @@ { struct ctdb_reply_redirect *c = (struct ctdb_reply_redirect *)hdr; struct ctdb_call_state *state; - + struct ctdb_node *node; +#ifdef USE_INFINIBAND + uint8_t *r; +#endif /* USE_INFINIBAND */ state = idr_find(ctdb->idr, hdr->reqid); talloc_steal(state, c); @@ -453,7 +466,18 @@ /* send it off again */ state->node = ctdb->nodes[c->dmaster]; - ctdb_queue_packet(ctdb, &state->c->hdr); + node = ctdb->nodes[state->c->hdr.destnode]; + +#ifdef USE_INFINIBAND + r = ctdb->methods->allocate_pkt(node, state->c->hdr.length); + memcpy(r, &state->c->hdr, state->c->hdr.length); +#endif /* USE_INFINIBAND */ + + ctdb_queue_packet(node, &state->c->hdr); + +#ifdef USE_INFINIBAND + ctdb->methods->dealloc_pkt(node, r); +#endif /* USE_INFINIBAND */ } /* @@ -520,6 +544,7 @@ int ret; struct ctdb_ltdb_header header; TDB_DATA data; + struct ctdb_node *node; /* if we are the dmaster for this key then we don't need to @@ -538,8 +563,9 @@ state = talloc_zero(ctdb, struct ctdb_call_state); CTDB_NO_MEMORY_NULL(ctdb, state); + node = ctdb->nodes[header.dmaster]; len = sizeof(*state->c) + key.dsize + (call_data?call_data->dsize:0); - state->c = ctdb->methods->allocate_pkt(ctdb, len); + state->c = ctdb->methods->allocate_pkt(node, len); CTDB_NO_MEMORY_NULL(ctdb, state->c); state->c->hdr.length = len; @@ -566,7 +592,12 @@ talloc_set_destructor(state, ctdb_call_destructor); - ctdb_queue_packet(ctdb, &state->c->hdr); + ctdb_queue_packet(node, &state->c->hdr); + +#ifdef USE_INFINIBAND + ctdb->methods->dealloc_pkt(node, state->c); + state->c = NULL; +#endif /* USE_INFINIBAND */ event_add_timed(ctdb->ev, state, timeval_current_ofs(CTDB_REQ_TIMEOUT, 0), ctdb_call_timeout, state); === modified file 'ib/ibw_ctdb_init.c' --- a/ib/ibw_ctdb_init.c 2007-01-04 15:44:41 +0000 +++ b/ib/ibw_ctdb_init.c 2007-01-05 17:13:35 +0000 @@ -29,6 +29,9 @@ #include "ibwrapper.h" #include "ibw_ctdb.h" +/* not nice; temporary workaround for the current implementation... */ +static void *last_key = NULL; + static int ctdb_ibw_listen(struct ctdb_context *ctdb, int backlog) { struct ibw_ctx *ictx = talloc_get_type(ctdb->private, struct ibw_ctx); @@ -108,14 +111,12 @@ /* * transport packet allocator - allows transport to control memory for packets */ -static void *ctdb_ibw_allocate_pkt(struct ctdb_context *ctdb, size_t size) +static void *ctdb_ibw_allocate_pkt(struct ctdb_node *node, size_t size) { struct ibw_conn *conn = NULL; void *buf = NULL; - void *key; /* TODO: expand the param list with this */ - /* TODO2: !!! I need "node" or ibw_conn here */ - if (ibw_alloc_send_buf(conn, &buf, &key, (int)size)) + if (ibw_alloc_send_buf(conn, &buf, &last_key, size)) return NULL; return buf; @@ -124,20 +125,40 @@ static int ctdb_ibw_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length) { struct ibw_conn *conn = talloc_get_type(node->private, struct ibw_conn); - void *key = NULL; /* TODO: expand the param list with this */ - - assert(conn!=NULL); - return ibw_send(conn, data, key, length); + int rc; + + rc = ibw_send(conn, data, last_key, length); + last_key = NULL; + + return rc; +} + +static void ctdb_ibw_dealloc_pkt(struct ctdb_node *node, void *data) +{ + if (last_key) { + struct ibw_conn *conn = talloc_get_type(node->private, struct ibw_conn); + + assert(conn!=NULL); + ibw_cancel_send_buf(conn, data, last_key); + } /* else ibw_send is already using it and will free it after completion */ +} + +static int ctdb_ibw_stop(struct ctdb_context *cctx) +{ + struct ibw_ctx *ictx = talloc_get_type(cctx->private, struct ibw_ctx); + + assert(ictx!=NULL); + return ibw_stop(ictx); } static const struct ctdb_methods ctdb_ibw_methods = { .start = ctdb_ibw_start, .add_node = ctdb_ibw_add_node, .queue_pkt = ctdb_ibw_queue_pkt, - .allocate_pkt = ctdb_ibw_allocate_pkt - -// .dealloc_pkt = ctdb_ibw_dealloc_pkt -// .stop = ctdb_ibw_stop + .allocate_pkt = ctdb_ibw_allocate_pkt, + + .dealloc_pkt = ctdb_ibw_dealloc_pkt, + .stop = ctdb_ibw_stop }; /* @@ -146,7 +167,7 @@ int ctdb_ibw_init(struct ctdb_context *ctdb) { struct ibw_ctx *ictx; - + ictx = ibw_init( NULL, //struct ibw_initattr *attr, /* TODO */ 0, //int nattr, /* TODO */ === modified file 'include/ctdb_private.h' --- a/include/ctdb_private.h 2006-12-19 23:32:31 +0000 +++ b/include/ctdb_private.h 2007-01-05 17:13:35 +0000 @@ -55,7 +55,9 @@ int (*start)(struct ctdb_context *); /* start protocol processing */ int (*add_node)(struct ctdb_node *); /* setup a new node */ int (*queue_pkt)(struct ctdb_node *, uint8_t *data, uint32_t length); - void *(*allocate_pkt)(struct ctdb_context *, size_t ); + void *(*allocate_pkt)(struct ctdb_node *, size_t); + void (*dealloc_pkt)(struct ctdb_node *, void *data); + int (*stop)(struct ctdb_context *); /* initiate stopping the protocol */ }; /* @@ -70,6 +72,9 @@ /* node_connected is called when a connection to a node is established */ void (*node_connected)(struct ctdb_node *); + + /* protocol has been stopped */ + void (*stopped)(struct ctdb_context *); }; /* main state of the ctdb daemon */ === modified file 'tcp/tcp_init.c' --- a/tcp/tcp_init.c 2006-12-19 01:07:07 +0000 +++ b/tcp/tcp_init.c 2007-01-05 17:13:35 +0000 @@ -67,21 +67,31 @@ /* transport packet allocator - allows transport to control memory for packets */ -void *ctdb_tcp_allocate_pkt(struct ctdb_context *ctdb, size_t size) +void *ctdb_tcp_allocate_pkt(struct ctdb_node *node, size_t size) { /* tcp transport needs to round to 8 byte alignment to ensure that we can use a length header and 64 bit elements in structures */ size = (size+(CTDB_TCP_ALIGNMENT-1)) & ~(CTDB_TCP_ALIGNMENT-1); - return talloc_size(ctdb, size); -} - + return talloc_size(node, size); +} + +void ctdb_tcp_dealloc_pkt(struct ctdb_node *node, void *buf) +{ + talloc_free(buf); +} + +int ctdb_tcp_stop(struct ctdb_context *ctdb) +{ + return 0; +} static const struct ctdb_methods ctdb_tcp_methods = { .start = ctdb_tcp_start, .add_node = ctdb_tcp_add_node, .queue_pkt = ctdb_tcp_queue_pkt, - .allocate_pkt = ctdb_tcp_allocate_pkt + .allocate_pkt = ctdb_tcp_allocate_pkt, + .dealloc_pkt = ctdb_tcp_dealloc_pkt }; /*