Author: tross
Date: Wed Nov 13 20:11:31 2013
New Revision: 1541689

URL: http://svn.apache.org/r1541689
Log:
QPID-5319 - Added management access for connections.
Applied patch from Ernie Allen

Modified:
    qpid/dispatch/trunk/src/server.c
    qpid/dispatch/trunk/src/server_private.h
    qpid/dispatch/trunk/tools/src/py/qdstat

Modified: qpid/dispatch/trunk/src/server.c
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/server.c?rev=1541689&r1=1541688&r2=1541689&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/server.c (original)
+++ qpid/dispatch/trunk/src/server.c Wed Nov 13 20:11:31 2013
@@ -19,6 +19,7 @@
 
 #include <qpid/dispatch/ctools.h>
 #include <qpid/dispatch/threading.h>
+#include <qpid/dispatch/agent.h>
 #include <qpid/dispatch/log.h>
 #include "server_private.h"
 #include "timer_private.h"
@@ -27,6 +28,7 @@
 #include "work_queue.h"
 #include <stdio.h>
 #include <time.h>
+#include <string.h>
 
 static char *module="SERVER";
 static __thread dx_server_t *thread_server = 0;
@@ -64,6 +66,7 @@ struct dx_server_t {
     dx_signal_handler_cb_t   signal_handler;
     void                    *signal_context;
     int                      pending_signal;
+    dx_connection_list_t     connections;
 };
 
 
@@ -102,6 +105,7 @@ static void thread_process_listeners(dx_
         cxtr = pn_listener_accept(listener);
         dx_log(module, LOG_TRACE, "Accepting Connection from %s", 
pn_connector_name(cxtr));
         ctx = new_dx_connection_t();
+        DEQ_ITEM_INIT(ctx);
         ctx->state        = CONN_STATE_OPENING;
         ctx->owner_thread = CONTEXT_NO_OWNER;
         ctx->enqueued     = 0;
@@ -119,6 +123,10 @@ static void thread_process_listeners(dx_
         pn_connection_set_context(conn, ctx);
         ctx->pn_conn = conn;
 
+        dx_log(module, LOG_DEBUG, "added listener connection");
+        // dx_server->lock is already locked
+        DEQ_INSERT_TAIL(dx_server->connections, ctx);
+
         //
         // Get a pointer to the transport so we can insert security components 
into it
         //
@@ -529,6 +537,9 @@ static void *thread_run(void *arg)
                 }
 
                 sys_mutex_lock(dx_server->lock);
+                DEQ_REMOVE(dx_server->connections, ctx);
+                dx_log(module, LOG_DEBUG, "removed %s connection",
+                        ctx->connector ? "connector" : "listener");
                 free_dx_connection_t(ctx);
                 pn_connector_free(work);
                 if (conn)
@@ -604,6 +615,7 @@ static void cxtr_try_open(void *context)
         return;
 
     dx_connection_t *ctx = new_dx_connection_t();
+    DEQ_ITEM_INIT(ctx);
     ctx->server       = ct->server;
     ctx->state        = CONN_STATE_CONNECTING;
     ctx->owner_thread = CONTEXT_NO_OWNER;
@@ -621,6 +633,8 @@ static void cxtr_try_open(void *context)
     //
     sys_mutex_lock(ct->server->lock);
     ctx->pn_cxtr = pn_connector(ct->server->driver, ct->config->host, 
ct->config->port, (void*) ctx);
+    DEQ_INSERT_TAIL(ct->server->connections, ctx);
+    dx_log(module, LOG_DEBUG, "added connector connection");
     sys_mutex_unlock(ct->server->lock);
 
     ct->ctx   = ctx;
@@ -637,6 +651,7 @@ dx_server_t *dx_server(int thread_count,
     if (dx_server == 0)
         return 0;
 
+    DEQ_INIT(dx_server->connections);
     dx_server->thread_count    = thread_count;
     dx_server->container_name  = container_name;
     dx_server->driver          = pn_driver();
@@ -671,12 +686,6 @@ dx_server_t *dx_server(int thread_count,
 }
 
 
-void dx_server_setup_agent(dx_dispatch_t *dx)
-{
-    // TODO
-}
-
-
 void dx_server_free(dx_server_t *dx_server)
 {
     int i;
@@ -971,6 +980,7 @@ dx_user_fd_t *dx_user_fd(dx_dispatch_t *
         return 0;
 
     dx_connection_t *ctx = new_dx_connection_t();
+    DEQ_ITEM_INIT(ctx);
     ctx->server       = dx_server;
     ctx->state        = CONN_STATE_USER;
     ctx->owner_thread = CONTEXT_NO_OWNER;
@@ -1037,3 +1047,66 @@ void dx_server_timer_cancel_LH(dx_timer_
     DEQ_REMOVE(timer->server->pending_timers, timer);
 }
 
+
+/**
+ * Schema handler passed to dx_agent_register_class for the connection class.
+ * Currently a stub: it performs no work and ignores both arguments.
+ */
+static void server_schema_handler(void *context, void *correlator)
+{
+    (void) context;
+    (void) correlator;
+}
+
+
+/**
+ * Agent query handler for the connection management class.
+ *
+ * Walks the server's connection list while holding the server lock and, for
+ * every connection, reports the attributes "state", "container", "host",
+ * "sasl", "role" and "dir" through the dx_agent_value_* calls, finishing each
+ * record with dx_agent_value_complete (second argument is non-zero while more
+ * connections follow).
+ *
+ * @param context The dx_server_t whose connections are reported.
+ * @param id      Not used by this handler.
+ * @param cor     Correlator handed through to the dx_agent_value_* calls.
+ */
+static void server_query_handler(void* context, const char *id, void *cor)
+{
+    dx_server_t *dx_server = (dx_server_t*) context;
+    sys_mutex_lock(dx_server->lock);
+    const char               *conn_state;
+    const dx_server_config_t *config;
+    const char               *pn_container_name;
+    const char               *direction;
+
+    dx_connection_t *conn = DEQ_HEAD(dx_server->connections);
+    while (conn) {
+        switch (conn->state) {
+        case CONN_STATE_CONNECTING:  conn_state = "Connecting";  break;
+        case CONN_STATE_OPENING:     conn_state = "Opening";     break;
+        case CONN_STATE_OPERATIONAL: conn_state = "Operational"; break;
+        case CONN_STATE_FAILED:      conn_state = "Failed";      break;
+        case CONN_STATE_USER:        conn_state = "User";        break;
+        default:                     conn_state = "undefined";   break;
+        }
+        dx_agent_value_string(cor, "state", conn_state);
+
+        // Get the remote container name using the proton connection.
+        // NOTE(review): pn_conn is checked first in case a connection is on the
+        // list before its proton connection has been attached -- confirm whether
+        // that can happen for connector contexts.
+        pn_container_name = conn->pn_conn ? pn_connection_remote_container(conn->pn_conn) : 0;
+        if (pn_container_name)
+            dx_agent_value_string(cor, "container", pn_container_name);
+        else
+            dx_agent_value_null(cor, "container");
+
+        // and now for some config entries
+        if (conn->connector) {
+            config = conn->connector->config;
+            direction = "out";
+            // Bounded formatting: the configured host and port strings have no
+            // length limit visible here, so an over-long value is truncated
+            // instead of overflowing the stack buffer.
+            char host[1000];
+            snprintf(host, sizeof(host), "%s:%s", config->host, config->port);
+            dx_agent_value_string(cor, "host", host);
+        } else {
+            config = conn->listener->config;
+            direction = "in";
+            dx_agent_value_string(cor, "host", pn_connector_name(conn->pn_cxtr));
+        }
+
+        dx_agent_value_string(cor, "sasl", config->sasl_mechanisms);
+        dx_agent_value_string(cor, "role", config->role);
+        dx_agent_value_string(cor, "dir",  direction);
+
+        // Advance before completing so the "more records follow" flag is known.
+        conn = DEQ_NEXT(conn);
+        dx_agent_value_complete(cor, conn != 0);
+    }
+    sys_mutex_unlock(dx_server->lock);
+}
+
+
+/**
+ * Register the "org.apache.qpid.dispatch.connection" class with the agent,
+ * using dx->server as the handler context and the schema/query handlers
+ * defined in this file.
+ *
+ * @param dx The dispatch instance passed to dx_agent_register_class.
+ */
+void dx_server_setup_agent(dx_dispatch_t *dx)
+{
+    dx_agent_register_class(dx,
+                            "org.apache.qpid.dispatch.connection",
+                            dx->server,
+                            server_schema_handler,
+                            server_query_handler);
+}

Modified: qpid/dispatch/trunk/src/server_private.h
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/server_private.h?rev=1541689&r1=1541688&r2=1541689&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/server_private.h (original)
+++ qpid/dispatch/trunk/src/server_private.h Wed Nov 13 20:11:31 2013
@@ -23,7 +23,9 @@
 #include <qpid/dispatch/user_fd.h>
 #include <qpid/dispatch/timer.h>
 #include <qpid/dispatch/alloc.h>
+#include <qpid/dispatch/ctools.h>
 #include <proton/driver.h>
+#include <proton/engine.h>
 #include <proton/driver_extras.h>
 
 void dx_server_timer_pending_LH(dx_timer_t *timer);
@@ -68,6 +70,7 @@ struct dx_connector_t {
 
 
 struct dx_connection_t {
+    DEQ_LINKS(dx_connection_t);
     dx_server_t     *server;
     conn_state_t     state;
     int              owner_thread;
@@ -96,5 +99,6 @@ ALLOC_DECLARE(dx_connector_t);
 ALLOC_DECLARE(dx_connection_t);
 ALLOC_DECLARE(dx_user_fd_t);
 
+DEQ_DECLARE(dx_connection_t, dx_connection_list_t);
 
 #endif

Modified: qpid/dispatch/trunk/tools/src/py/qdstat
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/tools/src/py/qdstat?rev=1541689&r1=1541688&r2=1541689&view=diff
==============================================================================
--- qpid/dispatch/trunk/tools/src/py/qdstat (original)
+++ qpid/dispatch/trunk/tools/src/py/qdstat Wed Nov 13 20:11:31 2013
@@ -108,8 +108,43 @@ class BusManager:
     def Disconnect(self):
         self.M.stop()
 
-    def displayConn(self):
-        pass
+    def displayConnections(self):
+        """Request the connection list and print it as a formatted table.
+
+        Sends a GET request for type org.apache.qpid.dispatch.connection to
+        self.address, reads one response, and renders one table row per entry
+        in the response body.
+        """
+        disp = Display(prefix="  ")
+        columns = ("state", "host", "container", "sasl-mechanisms", "role", "dir")
+        heads = [Header(text) for text in columns]
+
+        request = Message()
+        response = Message()
+
+        request.address = self.address
+        request.reply_to = self.reply
+        request.correlation_id = 1
+        request.properties = {u'operation':u'GET',
+                              u'type':u'org.apache.qpid.dispatch.connection'}
+
+        self.M.put(request)
+        self.M.send()
+
+        self.M.recv()
+        self.M.get(response)
+
+        # Response keys, listed in the same order as the column headers.
+        fields = ('state', 'host', 'container', 'sasl', 'role', 'dir')
+        rows = [[conn[key] for key in fields] for conn in response.body]
+        disp.formattedTable("Connections", heads, rows)
 
     def _addr_class(self, addr):
         if not addr:
@@ -301,6 +336,7 @@ class BusManager:
         elif main == 'n': self.displayRouterNodes()
         elif main == 'a': self.displayAddresses()
         elif main == 'm': self.displayMemory()
+        elif main == 'c': self.displayConnections()
 
     def display(self, names):
         self.displayMain(names, config._types)



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to