Author: tross
Date: Mon Oct 21 21:44:12 2013
New Revision: 1534391
URL: http://svn.apache.org/r1534391
Log:
QPID-5238 - Added several DISCOVER-* operations from the management spec.
Modified:
qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/router.h
qpid/trunk/qpid/extras/dispatch/src/agent.c
qpid/trunk/qpid/extras/dispatch/src/router_agent.c
Modified: qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/router.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/router.h?rev=1534391&r1=1534390&r2=1534391&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/router.h (original)
+++ qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/router.h Mon Oct 21
21:44:12 2013
@@ -47,5 +47,6 @@ void dx_router_send2(dx_dispatch_t *dx,
const char *address,
dx_message_t *msg);
+void dx_router_build_node_list(dx_dispatch_t *dx, dx_composed_field_t *field);
#endif
Modified: qpid/trunk/qpid/extras/dispatch/src/agent.c
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/agent.c?rev=1534391&r1=1534390&r2=1534391&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/agent.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/agent.c Mon Oct 21 21:44:12 2013
@@ -35,24 +35,29 @@
#include <string.h>
#include <stdio.h>
-struct dx_agent_t {
- dx_dispatch_t *dx;
- dx_hash_t *class_hash;
- dx_message_list_t in_fifo;
- dx_message_list_t out_fifo;
- sys_mutex_t *lock;
- dx_timer_t *timer;
- dx_address_t *address;
-};
-
-
struct dx_agent_class_t {
- char *fqname;
+ DEQ_LINKS(dx_agent_class_t);
+ dx_hash_handle_t *hash_handle;
void *context;
dx_agent_schema_cb_t schema_handler;
dx_agent_query_cb_t query_handler; // 0 iff class is an event.
};
+DEQ_DECLARE(dx_agent_class_t, dx_agent_class_list_t);
+
+
+struct dx_agent_t {
+ dx_dispatch_t *dx;
+ dx_hash_t *class_hash;
+ dx_agent_class_list_t class_list;
+ dx_message_list_t in_fifo;
+ dx_message_list_t out_fifo;
+ sys_mutex_t *lock;
+ dx_timer_t *timer;
+ dx_address_t *address;
+ dx_agent_class_t *container_class;
+};
+
typedef struct {
dx_agent_t *agent;
@@ -63,20 +68,8 @@ typedef struct {
static char *log_module = "AGENT";
-static void dx_agent_process_get(dx_agent_t *agent, dx_parsed_field_t *map,
dx_field_iterator_t *reply_to)
+static dx_composed_field_t *dx_agent_setup_response(dx_field_iterator_t
*reply_to)
{
- dx_parsed_field_t *cls = dx_parse_value_by_key(map, "type");
- if (cls == 0)
- return;
-
- dx_field_iterator_t *cls_string = dx_parse_raw(cls);
- const dx_agent_class_t *cls_record;
- dx_hash_retrieve_const(agent->class_hash, cls_string, (const void**)
&cls_record);
- if (cls_record == 0)
- return;
-
- dx_log(log_module, LOG_TRACE, "Received GET request for type: %s",
cls_record->fqname);
-
//
// Compose the header
//
@@ -103,9 +96,6 @@ static void dx_agent_process_get(dx_agen
//
field = dx_compose(DX_PERFORMATIVE_APPLICATION_PROPERTIES, field);
dx_compose_start_map(field);
- dx_compose_insert_string(field, "operation");
- dx_compose_insert_string(field, "GET");
-
dx_compose_insert_string(field, "status-code");
dx_compose_insert_uint(field, 200);
@@ -113,6 +103,26 @@ static void dx_agent_process_get(dx_agen
dx_compose_insert_string(field, "OK");
dx_compose_end_map(field);
+ return field;
+}
+
+
+static void dx_agent_process_get(dx_agent_t *agent, dx_parsed_field_t *map,
dx_field_iterator_t *reply_to)
+{
+ dx_parsed_field_t *cls = dx_parse_value_by_key(map, "type");
+ if (cls == 0)
+ return;
+
+ dx_field_iterator_t *cls_string = dx_parse_raw(cls);
+ const dx_agent_class_t *cls_record;
+ dx_hash_retrieve_const(agent->class_hash, cls_string, (const void**)
&cls_record);
+ if (cls_record == 0)
+ return;
+
+ dx_log(log_module, LOG_TRACE, "Received GET request for type: %s",
dx_hash_key_by_handle(cls_record->hash_handle));
+
+ dx_composed_field_t *field = dx_agent_setup_response(reply_to);
+
//
// Open the Body (AMQP Value) to be filled in by the handler.
//
@@ -147,6 +157,113 @@ static void dx_agent_process_get(dx_agen
}
+static void dx_agent_process_discover_types(dx_agent_t *agent,
dx_parsed_field_t *map, dx_field_iterator_t *reply_to)
+{
+ dx_log(log_module, LOG_TRACE, "Received DISCOVER-TYPES request");
+
+ dx_composed_field_t *field = dx_agent_setup_response(reply_to);
+
+ //
+ // Open the Body (AMQP Value) to be filled in by the handler.
+ //
+ field = dx_compose(DX_PERFORMATIVE_BODY_AMQP_VALUE, field);
+ dx_compose_start_map(field);
+
+ //
+ // Put entries into the map for each known entity type
+ //
+ sys_mutex_lock(agent->lock);
+ dx_agent_class_t *cls = DEQ_HEAD(agent->class_list);
+ while (cls) {
+ dx_compose_insert_string(field, (const char*)
dx_hash_key_by_handle(cls->hash_handle));
+ dx_compose_insert_null(field); // TODO -
https://tools.oasis-open.org/issues/browse/AMQP-87
+ cls = DEQ_NEXT(cls);
+ }
+ sys_mutex_unlock(agent->lock);
+ dx_compose_end_map(field);
+
+ //
+ // Create a message and send it.
+ //
+ dx_message_t *msg = dx_message();
+ dx_message_compose_2(msg, field);
+ dx_router_send(agent->dx, reply_to, msg);
+
+ dx_message_free(msg);
+ dx_compose_free(field);
+}
+
+
+static void dx_agent_process_discover_operations(dx_agent_t *agent,
dx_parsed_field_t *map, dx_field_iterator_t *reply_to)
+{
+ dx_log(log_module, LOG_TRACE, "Received DISCOVER-OPERATIONS request");
+
+ dx_composed_field_t *field = dx_agent_setup_response(reply_to);
+
+ //
+ // Open the Body (AMQP Value) to be filled in by the handler.
+ //
+ field = dx_compose(DX_PERFORMATIVE_BODY_AMQP_VALUE, field);
+ dx_compose_start_map(field);
+
+ //
+ // Put entries into the map for each known entity type
+ //
+ sys_mutex_lock(agent->lock);
+ dx_agent_class_t *cls = DEQ_HEAD(agent->class_list);
+ while (cls) {
+ dx_compose_insert_string(field, (const char*)
dx_hash_key_by_handle(cls->hash_handle));
+ dx_compose_start_list(field);
+ dx_compose_insert_string(field, "READ");
+ dx_compose_end_list(field);
+ cls = DEQ_NEXT(cls);
+ }
+ sys_mutex_unlock(agent->lock);
+ dx_compose_end_map(field);
+
+ //
+ // Create a message and send it.
+ //
+ dx_message_t *msg = dx_message();
+ dx_message_compose_2(msg, field);
+ dx_router_send(agent->dx, reply_to, msg);
+
+ dx_message_free(msg);
+ dx_compose_free(field);
+}
+
+
+static void dx_agent_process_discover_nodes(dx_agent_t *agent,
dx_parsed_field_t *map, dx_field_iterator_t *reply_to)
+{
+ dx_log(log_module, LOG_TRACE, "Received DISCOVER-MGMT-NODES request");
+
+ dx_composed_field_t *field = dx_agent_setup_response(reply_to);
+
+ //
+ // Open the Body (AMQP Value) to be filled in by the handler.
+ //
+ field = dx_compose(DX_PERFORMATIVE_BODY_AMQP_VALUE, field);
+
+ //
+ // Put entries into the list for each known management node
+ //
+ dx_compose_start_list(field);
+ dx_compose_insert_string(field, "amqp:/_local/$management");
+ dx_router_build_node_list(agent->dx, field);
+ dx_compose_end_list(field);
+
+ //
+ // Create a message and send it.
+ //
+ dx_message_t *msg = dx_message();
+ dx_message_compose_2(msg, field);
+ dx_router_send(agent->dx, reply_to, msg);
+
+ dx_message_free(msg);
+ dx_compose_free(field);
+}
+
+
static void dx_agent_process_request(dx_agent_t *agent, dx_message_t *msg)
{
//
@@ -213,6 +330,12 @@ static void dx_agent_process_request(dx_
dx_field_iterator_t *operation_string = dx_parse_raw(operation);
if (dx_field_iterator_equal(operation_string, (unsigned char*) "GET"))
dx_agent_process_get(agent, map, reply_to);
+ if (dx_field_iterator_equal(operation_string, (unsigned char*)
"DISCOVER-TYPES"))
+ dx_agent_process_discover_types(agent, map, reply_to);
+ if (dx_field_iterator_equal(operation_string, (unsigned char*)
"DISCOVER-OPERATIONS"))
+ dx_agent_process_discover_operations(agent, map, reply_to);
+ if (dx_field_iterator_equal(operation_string, (unsigned char*)
"DISCOVER-MGMT-NODES"))
+ dx_agent_process_discover_nodes(agent, map, reply_to);
dx_parse_free(map);
dx_field_iterator_free(ap);
@@ -253,11 +376,38 @@ static void dx_agent_rx_handler(void *co
}
+static dx_agent_class_t *dx_agent_register_class_LH(dx_agent_t
*agent,
+ const char
*fqname,
+ void
*context,
+ dx_agent_schema_cb_t
schema_handler,
+ dx_agent_query_cb_t
query_handler)
+{
+ dx_agent_class_t *cls = NEW(dx_agent_class_t);
+ assert(cls);
+ DEQ_ITEM_INIT(cls);
+ cls->context = context;
+ cls->schema_handler = schema_handler;
+ cls->query_handler = query_handler;
+
+ dx_field_iterator_t *iter = dx_field_iterator_string(fqname,
ITER_VIEW_ALL);
+ int result = dx_hash_insert_const(agent->class_hash, iter, cls,
&cls->hash_handle);
+ dx_field_iterator_free(iter);
+ if (result < 0)
+ assert(false);
+
+ DEQ_INSERT_TAIL(agent->class_list, cls);
+
+ dx_log(log_module, LOG_INFO, "Manageable Entity Type (%s) %s",
query_handler ? "object" : "event", fqname);
+ return cls;
+}
+
+
dx_agent_t *dx_agent(dx_dispatch_t *dx)
{
dx_agent_t *agent = NEW(dx_agent_t);
agent->dx = dx;
agent->class_hash = dx_hash(6, 10, 1);
+ DEQ_INIT(agent->class_list);
DEQ_INIT(agent->in_fifo);
DEQ_INIT(agent->out_fifo);
agent->lock = sys_mutex();
@@ -284,23 +434,12 @@ dx_agent_class_t *dx_agent_register_clas
dx_agent_schema_cb_t schema_handler,
dx_agent_query_cb_t query_handler)
{
- dx_agent_t *agent = dx->agent;
-
- dx_agent_class_t *cls = NEW(dx_agent_class_t);
- assert(cls);
- cls->fqname = (char*) malloc(strlen(fqname) + 1);
- strcpy(cls->fqname, fqname);
- cls->context = context;
- cls->schema_handler = schema_handler;
- cls->query_handler = query_handler;
+ dx_agent_t *agent = dx->agent;
+ dx_agent_class_t *cls;
- dx_field_iterator_t *iter = dx_field_iterator_string(fqname,
ITER_VIEW_ALL);
- int result = dx_hash_insert_const(agent->class_hash, iter, cls, 0);
- dx_field_iterator_free(iter);
- if (result < 0)
- assert(false);
-
- dx_log(log_module, LOG_TRACE, "%s class registered: %s", query_handler ?
"Object" : "Event", fqname);
+ sys_mutex_lock(agent->lock);
+ cls = dx_agent_register_class_LH(agent, fqname, context, schema_handler,
query_handler);
+ sys_mutex_unlock(agent->lock);
return cls;
}
Modified: qpid/trunk/qpid/extras/dispatch/src/router_agent.c
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/router_agent.c?rev=1534391&r1=1534390&r2=1534391&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/router_agent.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/router_agent.c Mon Oct 21 21:44:12 2013
@@ -185,3 +185,24 @@ void dx_router_agent_setup(dx_router_t *
dx_router_setup_class(router,
"org.apache.qpid.dispatch.router.address", DX_ROUTER_CLASS_ADDRESS);
}
+
+void dx_router_build_node_list(dx_dispatch_t *dx, dx_composed_field_t *field)
+{
+ dx_router_t *router = dx->router;
+ char temp[1000]; // FIXME
+
+ sys_mutex_lock(router->lock);
+ dx_router_node_t *rnode = DEQ_HEAD(router->routers);
+ while (rnode) {
+ strcpy(temp, "amqp:/_topo/");
+ strcat(temp, router->router_area);
+ strcat(temp, "/");
+ const unsigned char* addr =
dx_hash_key_by_handle(rnode->owning_addr->hash_handle);
+ strcat(temp, &((char*) addr)[1]);
+ strcat(temp, "/$management");
+ dx_compose_insert_string(field, temp);
+ rnode = DEQ_NEXT(rnode);
+ }
+ sys_mutex_unlock(router->lock);
+}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]