Author: tross
Date: Mon Oct 21 21:44:12 2013
New Revision: 1534391

URL: http://svn.apache.org/r1534391
Log:
QPID-5238 - Added several DISCOVER-* operations from the management spec.

Modified:
    qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/router.h
    qpid/trunk/qpid/extras/dispatch/src/agent.c
    qpid/trunk/qpid/extras/dispatch/src/router_agent.c

Modified: qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/router.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/router.h?rev=1534391&r1=1534390&r2=1534391&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/router.h (original)
+++ qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/router.h Mon Oct 21 
21:44:12 2013
@@ -47,5 +47,6 @@ void dx_router_send2(dx_dispatch_t *dx,
                      const char    *address,
                      dx_message_t  *msg);
 
+void dx_router_build_node_list(dx_dispatch_t *dx, dx_composed_field_t *field);
 
 #endif

Modified: qpid/trunk/qpid/extras/dispatch/src/agent.c
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/agent.c?rev=1534391&r1=1534390&r2=1534391&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/agent.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/agent.c Mon Oct 21 21:44:12 2013
@@ -35,24 +35,29 @@
 #include <string.h>
 #include <stdio.h>
 
-struct dx_agent_t {
-    dx_dispatch_t     *dx;
-    dx_hash_t         *class_hash;
-    dx_message_list_t  in_fifo;
-    dx_message_list_t  out_fifo;
-    sys_mutex_t       *lock;
-    dx_timer_t        *timer;
-    dx_address_t      *address;
-};
-
-
 struct dx_agent_class_t {
-    char                 *fqname;
+    DEQ_LINKS(dx_agent_class_t);
+    dx_hash_handle_t     *hash_handle;
     void                 *context;
     dx_agent_schema_cb_t  schema_handler;
     dx_agent_query_cb_t   query_handler;  // 0 iff class is an event.
 };
 
+DEQ_DECLARE(dx_agent_class_t, dx_agent_class_list_t);
+
+
+struct dx_agent_t {
+    dx_dispatch_t         *dx;
+    dx_hash_t             *class_hash;
+    dx_agent_class_list_t  class_list;
+    dx_message_list_t      in_fifo;
+    dx_message_list_t      out_fifo;
+    sys_mutex_t           *lock;
+    dx_timer_t            *timer;
+    dx_address_t          *address;
+    dx_agent_class_t      *container_class;
+};
+
 
 typedef struct {
     dx_agent_t          *agent;
@@ -63,20 +68,8 @@ typedef struct {
 static char *log_module = "AGENT";
 
 
-static void dx_agent_process_get(dx_agent_t *agent, dx_parsed_field_t *map, 
dx_field_iterator_t *reply_to)
+static dx_composed_field_t *dx_agent_setup_response(dx_field_iterator_t 
*reply_to)
 {
-    dx_parsed_field_t *cls = dx_parse_value_by_key(map, "type");
-    if (cls == 0)
-        return;
-
-    dx_field_iterator_t    *cls_string = dx_parse_raw(cls);
-    const dx_agent_class_t *cls_record;
-    dx_hash_retrieve_const(agent->class_hash, cls_string, (const void**) 
&cls_record);
-    if (cls_record == 0)
-        return;
-
-    dx_log(log_module, LOG_TRACE, "Received GET request for type: %s", 
cls_record->fqname);
-
     //
     // Compose the header
     //
@@ -103,9 +96,6 @@ static void dx_agent_process_get(dx_agen
     //
     field = dx_compose(DX_PERFORMATIVE_APPLICATION_PROPERTIES, field);
     dx_compose_start_map(field);
-    dx_compose_insert_string(field, "operation");
-    dx_compose_insert_string(field, "GET");
-
     dx_compose_insert_string(field, "status-code");
     dx_compose_insert_uint(field, 200);
 
@@ -113,6 +103,26 @@ static void dx_agent_process_get(dx_agen
     dx_compose_insert_string(field, "OK");
     dx_compose_end_map(field);
 
+    return field;
+}
+
+
+static void dx_agent_process_get(dx_agent_t *agent, dx_parsed_field_t *map, 
dx_field_iterator_t *reply_to)
+{
+    dx_parsed_field_t *cls = dx_parse_value_by_key(map, "type");
+    if (cls == 0)
+        return;
+
+    dx_field_iterator_t    *cls_string = dx_parse_raw(cls);
+    const dx_agent_class_t *cls_record;
+    dx_hash_retrieve_const(agent->class_hash, cls_string, (const void**) 
&cls_record);
+    if (cls_record == 0)
+        return;
+
+    dx_log(log_module, LOG_TRACE, "Received GET request for type: %s", 
dx_hash_key_by_handle(cls_record->hash_handle));
+
+    dx_composed_field_t *field = dx_agent_setup_response(reply_to);
+
     //
     // Open the Body (AMQP Value) to be filled in by the handler.
     //
@@ -147,6 +157,113 @@ static void dx_agent_process_get(dx_agen
 }
 
 
+static void dx_agent_process_discover_types(dx_agent_t *agent, 
dx_parsed_field_t *map, dx_field_iterator_t *reply_to)
+{
+    dx_log(log_module, LOG_TRACE, "Received DISCOVER-TYPES request");
+
+    dx_composed_field_t *field = dx_agent_setup_response(reply_to);
+
+    //
+    // Open the Body (AMQP Value) to be filled in by the handler.
+    //
+    field = dx_compose(DX_PERFORMATIVE_BODY_AMQP_VALUE, field);
+    dx_compose_start_map(field);
+
+    //
+    // Put entries into the map for each known entity type
+    //
+    sys_mutex_lock(agent->lock);
+    dx_agent_class_t *cls = DEQ_HEAD(agent->class_list);
+    while (cls) {
+        dx_compose_insert_string(field, (const char*) 
dx_hash_key_by_handle(cls->hash_handle));
+        dx_compose_insert_null(field); // TODO - 
https://tools.oasis-open.org/issues/browse/AMQP-87
+        cls = DEQ_NEXT(cls);
+    }
+    sys_mutex_unlock(agent->lock);
+    dx_compose_end_map(field);
+
+    //
+    // Create a message and send it.
+    //
+    dx_message_t *msg = dx_message();
+    dx_message_compose_2(msg, field);
+    dx_router_send(agent->dx, reply_to, msg);
+
+    dx_message_free(msg);
+    dx_compose_free(field);
+}
+
+
+static void dx_agent_process_discover_operations(dx_agent_t *agent, 
dx_parsed_field_t *map, dx_field_iterator_t *reply_to)
+{
+    dx_log(log_module, LOG_TRACE, "Received DISCOVER-OPERATIONS request");
+
+    dx_composed_field_t *field = dx_agent_setup_response(reply_to);
+
+    //
+    // Open the Body (AMQP Value) to be filled in by the handler.
+    //
+    field = dx_compose(DX_PERFORMATIVE_BODY_AMQP_VALUE, field);
+    dx_compose_start_map(field);
+
+    //
+    // Put entries into the map for each known entity type
+    //
+    sys_mutex_lock(agent->lock);
+    dx_agent_class_t *cls = DEQ_HEAD(agent->class_list);
+    while (cls) {
+        dx_compose_insert_string(field, (const char*) 
dx_hash_key_by_handle(cls->hash_handle));
+        dx_compose_start_list(field);
+        dx_compose_insert_string(field, "READ");
+        dx_compose_end_list(field);
+        cls = DEQ_NEXT(cls);
+    }
+    sys_mutex_unlock(agent->lock);
+    dx_compose_end_map(field);
+
+    //
+    // Create a message and send it.
+    //
+    dx_message_t *msg = dx_message();
+    dx_message_compose_2(msg, field);
+    dx_router_send(agent->dx, reply_to, msg);
+
+    dx_message_free(msg);
+    dx_compose_free(field);
+}
+
+
+static void dx_agent_process_discover_nodes(dx_agent_t *agent, 
dx_parsed_field_t *map, dx_field_iterator_t *reply_to)
+{
+    dx_log(log_module, LOG_TRACE, "Received DISCOVER-MGMT-NODES request");
+
+    dx_composed_field_t *field = dx_agent_setup_response(reply_to);
+
+    //
+    // Open the Body (AMQP Value) to be filled in by the handler.
+    //
+    field = dx_compose(DX_PERFORMATIVE_BODY_AMQP_VALUE, field);
+
+    //
+    // Put entries into the list for each known management node
+    //
+    dx_compose_start_list(field);
+    dx_compose_insert_string(field, "amqp:/_local/$management");
+    dx_router_build_node_list(agent->dx, field);
+    dx_compose_end_list(field);
+
+    //
+    // Create a message and send it.
+    //
+    dx_message_t *msg = dx_message();
+    dx_message_compose_2(msg, field);
+    dx_router_send(agent->dx, reply_to, msg);
+
+    dx_message_free(msg);
+    dx_compose_free(field);
+}
+
+
 static void dx_agent_process_request(dx_agent_t *agent, dx_message_t *msg)
 {
     //
@@ -213,6 +330,12 @@ static void dx_agent_process_request(dx_
     dx_field_iterator_t *operation_string = dx_parse_raw(operation);
     if (dx_field_iterator_equal(operation_string, (unsigned char*) "GET"))
         dx_agent_process_get(agent, map, reply_to);
+    if (dx_field_iterator_equal(operation_string, (unsigned char*) 
"DISCOVER-TYPES"))
+        dx_agent_process_discover_types(agent, map, reply_to);
+    if (dx_field_iterator_equal(operation_string, (unsigned char*) 
"DISCOVER-OPERATIONS"))
+        dx_agent_process_discover_operations(agent, map, reply_to);
+    if (dx_field_iterator_equal(operation_string, (unsigned char*) 
"DISCOVER-MGMT-NODES"))
+        dx_agent_process_discover_nodes(agent, map, reply_to);
 
     dx_parse_free(map);
     dx_field_iterator_free(ap);
@@ -253,11 +376,38 @@ static void dx_agent_rx_handler(void *co
 }
 
 
+static dx_agent_class_t *dx_agent_register_class_LH(dx_agent_t           
*agent,
+                                                    const char           
*fqname,
+                                                    void                 
*context,
+                                                    dx_agent_schema_cb_t  
schema_handler,
+                                                    dx_agent_query_cb_t   
query_handler)
+{
+    dx_agent_class_t *cls = NEW(dx_agent_class_t);
+    assert(cls);
+    DEQ_ITEM_INIT(cls);
+    cls->context        = context;
+    cls->schema_handler = schema_handler;
+    cls->query_handler  = query_handler;
+
+    dx_field_iterator_t *iter = dx_field_iterator_string(fqname, 
ITER_VIEW_ALL);
+    int result = dx_hash_insert_const(agent->class_hash, iter, cls, 
&cls->hash_handle);
+    dx_field_iterator_free(iter);
+    if (result < 0)
+        assert(false);
+
+    DEQ_INSERT_TAIL(agent->class_list, cls);
+
+    dx_log(log_module, LOG_INFO, "Manageable Entity Type (%s) %s", 
query_handler ? "object" : "event", fqname);
+    return cls;
+}
+
+
 dx_agent_t *dx_agent(dx_dispatch_t *dx)
 {
     dx_agent_t *agent = NEW(dx_agent_t);
     agent->dx         = dx;
     agent->class_hash = dx_hash(6, 10, 1);
+    DEQ_INIT(agent->class_list);
     DEQ_INIT(agent->in_fifo);
     DEQ_INIT(agent->out_fifo);
     agent->lock    = sys_mutex();
@@ -284,23 +434,12 @@ dx_agent_class_t *dx_agent_register_clas
                                           dx_agent_schema_cb_t  schema_handler,
                                           dx_agent_query_cb_t   query_handler)
 {
-    dx_agent_t *agent = dx->agent;
-
-    dx_agent_class_t *cls = NEW(dx_agent_class_t);
-    assert(cls);
-    cls->fqname = (char*) malloc(strlen(fqname) + 1);
-    strcpy(cls->fqname, fqname);
-    cls->context        = context;
-    cls->schema_handler = schema_handler;
-    cls->query_handler  = query_handler;
+    dx_agent_t       *agent = dx->agent;
+    dx_agent_class_t *cls;
 
-    dx_field_iterator_t *iter = dx_field_iterator_string(fqname, 
ITER_VIEW_ALL);
-    int result = dx_hash_insert_const(agent->class_hash, iter, cls, 0);
-    dx_field_iterator_free(iter);
-    if (result < 0)
-        assert(false);
-
-    dx_log(log_module, LOG_TRACE, "%s class registered: %s", query_handler ? 
"Object" : "Event", fqname);
+    sys_mutex_lock(agent->lock);
+    cls = dx_agent_register_class_LH(agent, fqname, context, schema_handler, 
query_handler);
+    sys_mutex_unlock(agent->lock);
     return cls;
 }
 

Modified: qpid/trunk/qpid/extras/dispatch/src/router_agent.c
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/router_agent.c?rev=1534391&r1=1534390&r2=1534391&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/router_agent.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/router_agent.c Mon Oct 21 21:44:12 2013
@@ -185,3 +185,24 @@ void dx_router_agent_setup(dx_router_t *
         dx_router_setup_class(router, 
"org.apache.qpid.dispatch.router.address", DX_ROUTER_CLASS_ADDRESS);
 }
 
+
+void dx_router_build_node_list(dx_dispatch_t *dx, dx_composed_field_t *field)
+{
+    dx_router_t *router = dx->router;
+    char         temp[1000];  // FIXME
+
+    sys_mutex_lock(router->lock);
+    dx_router_node_t *rnode = DEQ_HEAD(router->routers);
+    while (rnode) {
+        strcpy(temp, "amqp:/_topo/");
+        strcat(temp, router->router_area);
+        strcat(temp, "/");
+        const unsigned char* addr = 
dx_hash_key_by_handle(rnode->owning_addr->hash_handle);
+        strcat(temp, &((char*) addr)[1]);
+        strcat(temp, "/$management");
+        dx_compose_insert_string(field, temp);
+        rnode = DEQ_NEXT(rnode);
+    }
+    sys_mutex_unlock(router->lock);
+}
+



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to