Author: tross Date: Wed Nov 13 20:11:31 2013 New Revision: 1541689 URL: http://svn.apache.org/r1541689 Log: QPID-5319 - Added management access for connections. Applied patch from Ernie Allen
Modified: qpid/dispatch/trunk/src/server.c qpid/dispatch/trunk/src/server_private.h qpid/dispatch/trunk/tools/src/py/qdstat Modified: qpid/dispatch/trunk/src/server.c URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/server.c?rev=1541689&r1=1541688&r2=1541689&view=diff ============================================================================== --- qpid/dispatch/trunk/src/server.c (original) +++ qpid/dispatch/trunk/src/server.c Wed Nov 13 20:11:31 2013 @@ -19,6 +19,7 @@ #include <qpid/dispatch/ctools.h> #include <qpid/dispatch/threading.h> +#include <qpid/dispatch/agent.h> #include <qpid/dispatch/log.h> #include "server_private.h" #include "timer_private.h" @@ -27,6 +28,7 @@ #include "work_queue.h" #include <stdio.h> #include <time.h> +#include <string.h> static char *module="SERVER"; static __thread dx_server_t *thread_server = 0; @@ -64,6 +66,7 @@ struct dx_server_t { dx_signal_handler_cb_t signal_handler; void *signal_context; int pending_signal; + dx_connection_list_t connections; }; @@ -102,6 +105,7 @@ static void thread_process_listeners(dx_ cxtr = pn_listener_accept(listener); dx_log(module, LOG_TRACE, "Accepting Connection from %s", pn_connector_name(cxtr)); ctx = new_dx_connection_t(); + DEQ_ITEM_INIT(ctx); ctx->state = CONN_STATE_OPENING; ctx->owner_thread = CONTEXT_NO_OWNER; ctx->enqueued = 0; @@ -119,6 +123,10 @@ static void thread_process_listeners(dx_ pn_connection_set_context(conn, ctx); ctx->pn_conn = conn; + dx_log(module, LOG_DEBUG, "added listener connection"); + // dx_server->lock is already locked + DEQ_INSERT_TAIL(dx_server->connections, ctx); + // // Get a pointer to the transport so we can insert security components into it // @@ -529,6 +537,9 @@ static void *thread_run(void *arg) } sys_mutex_lock(dx_server->lock); + DEQ_REMOVE(dx_server->connections, ctx); + dx_log(module, LOG_DEBUG, "removed %s connection", + ctx->connector ? "connector" : "listener"); free_dx_connection_t(ctx); pn_connector_free(work); if (conn) @@ -604,6 +615,7 @@ static void cxtr_try_open(void *context) return; dx_connection_t *ctx = new_dx_connection_t(); + DEQ_ITEM_INIT(ctx); ctx->server = ct->server; ctx->state = CONN_STATE_CONNECTING; ctx->owner_thread = CONTEXT_NO_OWNER; @@ -621,6 +633,8 @@ static void cxtr_try_open(void *context) // sys_mutex_lock(ct->server->lock); ctx->pn_cxtr = pn_connector(ct->server->driver, ct->config->host, ct->config->port, (void*) ctx); + DEQ_INSERT_TAIL(ct->server->connections, ctx); + dx_log(module, LOG_DEBUG, "added connector connection"); sys_mutex_unlock(ct->server->lock); ct->ctx = ctx; @@ -637,6 +651,7 @@ dx_server_t *dx_server(int thread_count, if (dx_server == 0) return 0; + DEQ_INIT(dx_server->connections); dx_server->thread_count = thread_count; dx_server->container_name = container_name; dx_server->driver = pn_driver(); @@ -671,12 +686,6 @@ dx_server_t *dx_server(int thread_count, } -void dx_server_setup_agent(dx_dispatch_t *dx) -{ - // TODO -} - - void dx_server_free(dx_server_t *dx_server) { int i; @@ -971,6 +980,7 @@ dx_user_fd_t *dx_user_fd(dx_dispatch_t * return 0; dx_connection_t *ctx = new_dx_connection_t(); + DEQ_ITEM_INIT(ctx); ctx->server = dx_server; ctx->state = CONN_STATE_USER; ctx->owner_thread = CONTEXT_NO_OWNER; @@ -1037,3 +1047,66 @@ void dx_server_timer_cancel_LH(dx_timer_ DEQ_REMOVE(timer->server->pending_timers, timer); } + +static void server_schema_handler(void *context, void *correlator) +{ +} + + +static void server_query_handler(void* context, const char *id, void *cor) +{ + dx_server_t *dx_server = (dx_server_t*) context; + sys_mutex_lock(dx_server->lock); + const char *conn_state; + const dx_server_config_t *config; + const char *pn_container_name; + const char *direction; + + dx_connection_t *conn = DEQ_HEAD(dx_server->connections); + while (conn) { + switch (conn->state) { + case CONN_STATE_CONNECTING: conn_state = "Connecting"; break; + case CONN_STATE_OPENING: conn_state = "Opening"; break; + case CONN_STATE_OPERATIONAL: conn_state = "Operational"; break; + case CONN_STATE_FAILED: conn_state = "Failed"; break; + case CONN_STATE_USER: conn_state = "User"; break; + default: conn_state = "undefined"; break; + } + dx_agent_value_string(cor, "state", conn_state); + // get remote container name using proton connection + pn_container_name = pn_connection_remote_container(conn->pn_conn); + if (pn_container_name) + dx_agent_value_string(cor, "container", pn_container_name); + else + dx_agent_value_null(cor, "container"); + + // and now for some config entries + if (conn->connector) { + config = conn->connector->config; + direction = "out"; + char host[1000]; + strcpy(host, config->host); + strcat(host, ":"); + strcat(host, config->port); + dx_agent_value_string(cor, "host", host); + } else { + config = conn->listener->config; + direction = "in"; + dx_agent_value_string(cor, "host", pn_connector_name(conn->pn_cxtr)); + } + + dx_agent_value_string(cor, "sasl", config->sasl_mechanisms); + dx_agent_value_string(cor, "role", config->role); + dx_agent_value_string(cor, "dir", direction); + + conn = DEQ_NEXT(conn); + dx_agent_value_complete(cor, conn != 0); + } + sys_mutex_unlock(dx_server->lock); +} + + +void dx_server_setup_agent(dx_dispatch_t *dx) +{ + dx_agent_register_class(dx, "org.apache.qpid.dispatch.connection", dx->server, server_schema_handler, server_query_handler); +} Modified: qpid/dispatch/trunk/src/server_private.h URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/server_private.h?rev=1541689&r1=1541688&r2=1541689&view=diff ============================================================================== --- qpid/dispatch/trunk/src/server_private.h (original) +++ qpid/dispatch/trunk/src/server_private.h Wed Nov 13 20:11:31 2013 @@ -23,7 +23,9 @@ #include <qpid/dispatch/user_fd.h> #include <qpid/dispatch/timer.h> #include <qpid/dispatch/alloc.h> +#include <qpid/dispatch/ctools.h> #include <proton/driver.h> +#include <proton/engine.h> #include <proton/driver_extras.h> void dx_server_timer_pending_LH(dx_timer_t *timer); @@ -68,6 +70,7 @@ struct dx_connector_t { struct dx_connection_t { + DEQ_LINKS(dx_connection_t); dx_server_t *server; conn_state_t state; int owner_thread; @@ -96,5 +99,6 @@ ALLOC_DECLARE(dx_connector_t); ALLOC_DECLARE(dx_connection_t); ALLOC_DECLARE(dx_user_fd_t); +DEQ_DECLARE(dx_connection_t, dx_connection_list_t); #endif Modified: qpid/dispatch/trunk/tools/src/py/qdstat URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tools/src/py/qdstat?rev=1541689&r1=1541688&r2=1541689&view=diff ============================================================================== --- qpid/dispatch/trunk/tools/src/py/qdstat (original) +++ qpid/dispatch/trunk/tools/src/py/qdstat Wed Nov 13 20:11:31 2013 @@ -108,8 +108,43 @@ class BusManager: def Disconnect(self): self.M.stop() - def displayConn(self): - pass + def displayConnections(self): + disp = Display(prefix=" ") + heads = [] + heads.append(Header("state")) + heads.append(Header("host")) + heads.append(Header("container")) + heads.append(Header("sasl-mechanisms")) + heads.append(Header("role")) + heads.append(Header("dir")) + + rows = [] + + request = Message() + response = Message() + + request.address = self.address + request.reply_to = self.reply + request.correlation_id = 1 + request.properties = {u'operation':u'GET', u'type':u'org.apache.qpid.dispatch.connection'} + + self.M.put(request) + self.M.send() + + self.M.recv() + self.M.get(response) + for conn in response.body: + row = [] + row.append(conn['state']) + row.append(conn['host']) + row.append(conn['container']) + row.append(conn['sasl']) + row.append(conn['role']) + row.append(conn['dir']) + rows.append(row) + title = "Connections" + dispRows = rows + disp.formattedTable(title, heads, dispRows) def _addr_class(self, addr): if not addr: @@ -301,6 +336,7 @@ class BusManager: elif main == 'n': self.displayRouterNodes() elif main == 'a': self.displayAddresses() elif main == 'm': self.displayMemory() + elif main == 'c': self.displayConnections() def display(self, names): self.displayMain(names, config._types) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org