Author: tridge Date: 2006-04-12 06:08:24 +0000 (Wed, 12 Apr 2006) New Revision: 15049
WebSVN: http://websvn.samba.org/cgi-bin/viewcvs.cgi?view=rev&root=samba&rev=15049 Log: for really efficient oplock handling with thousands of open files we will need a separate messaging endpoint per open file. To make this efficient extend the messaging layer to have a new registration function for temporary message types that maps via an idtree. I have updated the LOCAL-MESSAGING test to use the new function. Modified: branches/SAMBA_4_0/source/lib/messaging/irpc.h branches/SAMBA_4_0/source/lib/messaging/messaging.c branches/SAMBA_4_0/source/lib/messaging/messaging.h branches/SAMBA_4_0/source/torture/local/messaging.c Changeset: Modified: branches/SAMBA_4_0/source/lib/messaging/irpc.h =================================================================== --- branches/SAMBA_4_0/source/lib/messaging/irpc.h 2006-04-12 04:42:40 UTC (rev 15048) +++ branches/SAMBA_4_0/source/lib/messaging/irpc.h 2006-04-12 06:08:24 UTC (rev 15049) @@ -76,14 +76,18 @@ } async; }; +typedef void (*msg_callback_t)(struct messaging_context *msg, void *private, + uint32_t msg_type, uint32_t server_id, DATA_BLOB *data); struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, uint32_t server_id, struct event_context *ev); NTSTATUS messaging_send(struct messaging_context *msg, uint32_t server, uint32_t msg_type, DATA_BLOB *data); -void messaging_register(struct messaging_context *msg, void *private, - uint32_t msg_type, - void (*fn)(struct messaging_context *, void *, uint32_t, uint32_t, DATA_BLOB *)); +NTSTATUS messaging_register(struct messaging_context *msg, void *private, + uint32_t msg_type, + msg_callback_t fn); +NTSTATUS messaging_register_tmp(struct messaging_context *msg, void *private, + msg_callback_t fn, uint32_t *msg_type); struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, uint32_t server_id, struct event_context *ev); struct messaging_context *messaging_client_init(TALLOC_CTX *mem_ctx, Modified: branches/SAMBA_4_0/source/lib/messaging/messaging.c =================================================================== --- branches/SAMBA_4_0/source/lib/messaging/messaging.c 2006-04-12 04:42:40 UTC (rev 15048) +++ branches/SAMBA_4_0/source/lib/messaging/messaging.c 2006-04-12 06:08:24 UTC (rev 15049) @@ -41,7 +41,9 @@ struct socket_context *sock; const char *base_path; const char *path; - struct dispatch_fn *dispatch; + struct dispatch_fn **dispatch; + uint32_t num_types; + struct idr_context *dispatch_tree; struct messaging_rec *pending; struct irpc_list *irpc; struct idr_context *idr; @@ -54,14 +56,13 @@ } event; }; -/* we have a linked list of dispatch handlers that this messaging - server can deal with */ +/* we have a linked list of dispatch handlers for each msg_type that + this messaging server can deal with */ struct dispatch_fn { struct dispatch_fn *next, *prev; uint32_t msg_type; void *private; - void (*fn)(struct messaging_context *msg, void *private, - uint32_t msg_type, uint32_t server_id, DATA_BLOB *data); + msg_callback_t fn; }; /* an individual message */ @@ -127,14 +128,22 @@ static void messaging_dispatch(struct messaging_context *msg, struct messaging_rec *rec) { struct dispatch_fn *d, *next; - for (d=msg->dispatch;d;d=next) { + + /* temporary IDs use an idtree, the rest use a array of pointers */ + if (rec->header->msg_type >= MSG_TMP_BASE) { + d = idr_find(msg->dispatch_tree, rec->header->msg_type); + } else if (rec->header->msg_type < msg->num_types) { + d = msg->dispatch[rec->header->msg_type]; + } else { + d = NULL; + } + + for (; d; d = next) { + DATA_BLOB data; next = d->next; - if (d->msg_type == rec->header->msg_type) { - DATA_BLOB data; - data.data = rec->packet.data + sizeof(*rec->header); - data.length = rec->header->length; - d->fn(msg, d->private, d->msg_type, rec->header->from, &data); - } + data.data = rec->packet.data + sizeof(*rec->header); + data.length = rec->header->length; + d->fn(msg, d->private, d->msg_type, rec->header->from, &data); } rec->header->length = 0; } @@ -272,34 +281,96 @@ /* Register a dispatch function for a particular message type. */ -void messaging_register(struct messaging_context *msg, void *private, - uint32_t msg_type, - void (*fn)(struct messaging_context *, void *, uint32_t, uint32_t, DATA_BLOB *)) +NTSTATUS messaging_register(struct messaging_context *msg, void *private, + uint32_t msg_type, msg_callback_t fn) { struct dispatch_fn *d; - d = talloc(msg, struct dispatch_fn); + /* possibly expand dispatch array */ + if (msg_type >= msg->num_types) { + struct dispatch_fn **dp; + int i; + dp = talloc_realloc(msg, msg->dispatch, struct dispatch_fn *, msg_type+1); + NT_STATUS_HAVE_NO_MEMORY(dp); + msg->dispatch = dp; + for (i=msg->num_types;i<=msg_type;i++) { + msg->dispatch[i] = NULL; + } + msg->num_types = msg_type+1; + } + + + d = talloc(msg->dispatch, struct dispatch_fn); + NT_STATUS_HAVE_NO_MEMORY(d); d->msg_type = msg_type; d->private = private; d->fn = fn; - DLIST_ADD(msg->dispatch, d); + + DLIST_ADD(msg->dispatch[msg_type], d); + + return NT_STATUS_OK; } /* + register a temporary message handler. The msg_type is allocated + above MSG_TMP_BASE +*/ +NTSTATUS messaging_register_tmp(struct messaging_context *msg, void *private, + msg_callback_t fn, uint32_t *msg_type) +{ + struct dispatch_fn *d; + int id; + + d = talloc_zero(msg->dispatch, struct dispatch_fn); + NT_STATUS_HAVE_NO_MEMORY(d); + d->private = private; + d->fn = fn; + + id = idr_get_new_above(msg->dispatch_tree, d, MSG_TMP_BASE, UINT16_MAX); + if (id == -1) { + talloc_free(d); + return NT_STATUS_TOO_MANY_CONTEXT_IDS; + } + + d->msg_type = (uint32_t)id; + (*msg_type) = d->msg_type; + + return NT_STATUS_OK; +} + +/* De-register the function for a particular message type. */ void messaging_deregister(struct messaging_context *msg, uint32_t msg_type, void *private) { - struct dispatch_fn *d, *next; + struct dispatch_fn *d, *list, *next; - for (d = msg->dispatch; d; d = next) { + if (msg_type >= msg->num_types) { + list = idr_find(msg->dispatch_tree, msg_type); + } else { + list = msg->dispatch[msg_type]; + } + + if (list == NULL) { + return; + } + + for (d = list; d; d = next) { next = d->next; - if (d->msg_type == msg_type && - d->private == private) { - DLIST_REMOVE(msg->dispatch, d); + if (d->private == private) { + DLIST_REMOVE(list, d); talloc_free(d); } } + + /* the list base possibly changed */ + if (list == NULL) { + if (msg_type >= msg->num_types) { + idr_remove(msg->dispatch_tree, msg_type); + } else { + msg->dispatch[msg_type] = NULL; + } + } } @@ -397,7 +468,7 @@ struct socket_address *path; char *dir; - msg = talloc(mem_ctx, struct messaging_context); + msg = talloc_zero(mem_ctx, struct messaging_context); if (msg == NULL) { return NULL; } @@ -411,15 +482,12 @@ mkdir(dir, 0700); talloc_free(dir); - msg->base_path = smbd_tmp_path(msg, "messaging"); - msg->path = messaging_path(msg, server_id); - msg->server_id = server_id; - msg->dispatch = NULL; - msg->pending = NULL; - msg->idr = idr_init(msg); - msg->irpc = NULL; - msg->names = NULL; - msg->start_time = timeval_current(); + msg->base_path = smbd_tmp_path(msg, "messaging"); + msg->path = messaging_path(msg, server_id); + msg->server_id = server_id; + msg->idr = idr_init(msg); + msg->dispatch_tree = idr_init(msg); + msg->start_time = timeval_current(); status = socket_create("unix", SOCKET_TYPE_DGRAM, &msg->sock, 0); if (!NT_STATUS_IS_OK(status)) { Modified: branches/SAMBA_4_0/source/lib/messaging/messaging.h =================================================================== --- branches/SAMBA_4_0/source/lib/messaging/messaging.h 2006-04-12 04:42:40 UTC (rev 15048) +++ branches/SAMBA_4_0/source/lib/messaging/messaging.h 2006-04-12 06:08:24 UTC (rev 15049) @@ -34,4 +34,7 @@ #define MSG_PVFS_NOTIFY 7 #define MSG_NTVFS_OPLOCK_BREAK 8 +/* temporary messaging endpoints are allocated above this line */ +#define MSG_TMP_BASE 1000 + #endif Modified: branches/SAMBA_4_0/source/torture/local/messaging.c =================================================================== --- branches/SAMBA_4_0/source/torture/local/messaging.c 2006-04-12 04:42:40 UTC (rev 15048) +++ branches/SAMBA_4_0/source/torture/local/messaging.c 2006-04-12 06:08:24 UTC (rev 15049) @@ -25,13 +25,14 @@ #include "lib/messaging/irpc.h" #include "torture/torture.h" -enum {MY_PING=1000, MY_PONG, MY_EXIT}; +static uint32_t msg_pong; + static void ping_message(struct messaging_context *msg, void *private, uint32_t msg_type, uint32_t src, DATA_BLOB *data) { NTSTATUS status; - status = messaging_send(msg, src, MY_PONG, data); + status = messaging_send(msg, src, msg_pong, data); if (!NT_STATUS_IS_OK(status)) { printf("pong failed - %s\n", nt_errstr(status)); } @@ -64,6 +65,7 @@ BOOL ret = True; struct timeval tv; int timelimit = lp_parm_int(-1, "torture", "timelimit", 10); + uint32_t msg_ping, msg_exit; lp_set_cmdline("lock dir", "lockdir.tmp"); @@ -77,8 +79,8 @@ return False; } - messaging_register(msg_server_ctx, NULL, MY_PING, ping_message); - messaging_register(msg_server_ctx, mem_ctx, MY_EXIT, exit_message); + messaging_register_tmp(msg_server_ctx, NULL, ping_message, &msg_ping); + messaging_register_tmp(msg_server_ctx, mem_ctx, exit_message, &msg_exit); msg_client_ctx = messaging_init(mem_ctx, 2, ev); @@ -87,7 +89,7 @@ return False; } - messaging_register(msg_client_ctx, &pong_count, MY_PONG, pong_message); + messaging_register_tmp(msg_client_ctx, &pong_count, pong_message, &msg_pong); tv = timeval_current(); @@ -99,8 +101,8 @@ data.data = discard_const_p(uint8_t, "testing"); data.length = strlen((const char *)data.data); - status1 = messaging_send(msg_client_ctx, 1, MY_PING, &data); - status2 = messaging_send(msg_client_ctx, 1, MY_PING, NULL); + status1 = messaging_send(msg_client_ctx, 1, msg_ping, &data); + status2 = messaging_send(msg_client_ctx, 1, msg_ping, NULL); if (!NT_STATUS_IS_OK(status1)) { printf("msg1 failed - %s\n", nt_errstr(status1)); @@ -126,7 +128,7 @@ } printf("sending exit\n"); - messaging_send(msg_client_ctx, 1, MY_EXIT, NULL); + messaging_send(msg_client_ctx, 1, msg_exit, NULL); if (ping_count != pong_count) { printf("ping test failed! received %d, sent %d\n",