This is an automated email from the ASF dual-hosted git repository. tross pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/master by this push: new 9f621c4 DISPATCH-1275 - Enabled deletion of connections based on connection id. Added related policy code. By default, a user has privileges to delete a connection. A policy needs to disallow this. Inter-router connections can never be deleted This closes #467 9f621c4 is described below commit 9f621c432f7e9dd49156292d1f7e53be4fd7eaf9 Author: Ganesh Murthy <gmur...@redhat.com> AuthorDate: Wed Mar 6 12:34:13 2019 -0500 DISPATCH-1275 - Enabled deletion of connections based on connection id. Added related policy code. By default, a user has privileges to delete a connection. A policy needs to disallow this. Inter-router connections can never be deleted This closes #467 --- docs/books/user-guide/configuration-security.adoc | 5 + include/qpid/dispatch/amqp.h | 2 + include/qpid/dispatch/router_core.h | 5 +- python/qpid_dispatch/management/qdrouter.json | 16 ++ .../qpid_dispatch_internal/policy/policy_local.py | 8 +- src/amqp.c | 1 + src/container.c | 5 +- src/parse.c | 6 + src/policy.c | 7 + src/policy.h | 1 + src/router_core/agent.c | 4 +- src/router_core/agent_connection.c | 156 ++++++++++++++++ src/router_core/agent_connection.h | 7 +- src/router_core/connections.c | 14 +- src/router_core/router_core_private.h | 40 ++-- src/router_node.c | 25 ++- tests/one-router-policy/default.json | 5 +- tests/system_tests_one_router.py | 52 +++++- tests/system_tests_qdmanage.py | 36 +++- tests/system_tests_two_routers.py | 203 ++++++++++++++++++++- 20 files changed, 571 insertions(+), 27 deletions(-) diff --git a/docs/books/user-guide/configuration-security.adoc b/docs/books/user-guide/configuration-security.adoc index f8795d6..e2a2de2 100644 --- a/docs/books/user-guide/configuration-security.adoc +++ b/docs/books/user-guide/configuration-security.adoc @@ -604,6 +604,7 @@ vhost { $default: { remoteHosts: * allowDynamicSource: true, + allowAdminStatusUpdate: true, sources: myqueue1, myqueue2 targets: myqueue1, myqueue2 } @@ -619,6 +620,10 @@ A list of remote hosts from which the users may connect. A host can be a hostnam `allowDynamicSource`:: If true, connections from users in this group are permitted to attach receivers to dynamic sources. This permits creation of listners to temporary addresses or termporary queues. If false, use of dynamic sources is forbidden. +`allowAdminStatusUpdate`:: +If true, connections from users in this group are permitted to modify the adminStatus of connections. This permits termination of sender or receiver connections. If false, the users of this group are prohibited from terminating any connections. Inter-router connections can never be terminated by any user under any circumstance. Defaults to true, no policy required. + + `allowWaypointLinks`:: If true, connections from users in this group are permitted to attach links using waypoint capabilities. This allows endpoints to act as waypoints (i.e. brokers) without the need for configuring auto-links. If false, use of waypoint capabilities is forbidden. diff --git a/include/qpid/dispatch/amqp.h b/include/qpid/dispatch/amqp.h index 9f6be37..8a17c4c 100644 --- a/include/qpid/dispatch/amqp.h +++ b/include/qpid/dispatch/amqp.h @@ -192,6 +192,8 @@ extern const char * const QD_AMQP_COND_PRECONDITION_FAILED; extern const char * const QD_AMQP_COND_RESOURCE_DELETED; extern const char * const QD_AMQP_COND_ILLEGAL_STATE; extern const char * const QD_AMQP_COND_FRAME_SIZE_TOO_SMALL; + +extern const char * const QD_AMQP_COND_CONNECTION_FORCED; /// @}; /** @name AMQP link endpoint role. */ diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h index 65b1393..9f9567d 100644 --- a/include/qpid/dispatch/router_core.h +++ b/include/qpid/dispatch/router_core.h @@ -188,6 +188,7 @@ qdr_connection_t *qdr_connection_opened(qdr_core_t *core, bool strip_annotations_in, bool strip_annotations_out, bool policy_allow_dynamic_link_routes, + bool policy_allow_admin_status_update, int link_capacity, const char *vhost, qdr_connection_info_t *connection_info, @@ -677,6 +678,7 @@ typedef void (*qdr_link_drain_t) (void *context, qdr_link_t *link, bool typedef int (*qdr_link_push_t) (void *context, qdr_link_t *link, int limit); typedef uint64_t (*qdr_link_deliver_t) (void *context, qdr_link_t *link, qdr_delivery_t *delivery, bool settled); typedef void (*qdr_delivery_update_t) (void *context, qdr_delivery_t *dlv, uint64_t disp, bool settled); +typedef void (*qdr_connection_close_t) (void *context, qdr_connection_t *conn, qdr_error_t *error); void qdr_connection_handlers(qdr_core_t *core, void *context, @@ -690,7 +692,8 @@ void qdr_connection_handlers(qdr_core_t *core, qdr_link_drain_t drain, qdr_link_push_t push, qdr_link_deliver_t deliver, - qdr_delivery_update_t delivery_update); + qdr_delivery_update_t delivery_update, + qdr_connection_close_t conn_close); /** ****************************************************************************** diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json index 1d12338..45958b1 100644 --- a/python/qpid_dispatch/management/qdrouter.json +++ b/python/qpid_dispatch/management/qdrouter.json @@ -1631,6 +1631,15 @@ "description": "Connections to the router's container.", "extends": "operationalEntity", "attributes": { + "adminStatus": { + "type": ["enabled", "deleted"], + "default": "enabled", + "description": "This field is set to enabled when the connection is up and running. Setting this field to deleted will terminate the connection and all links and sessions contained in the connection. Inter-router connections cannot be terminated by setting the adminStatus to deleted.", + "update": true + }, + "operStatus": { + "type": ["up", "closing"] + }, "container": { "description": "The container for this connection", "type": "string" @@ -1925,6 +1934,13 @@ "required": false, "create": true }, + "allowAdminStatusUpdate": { + "type": "boolean", + "description": "Whether this connection is allowed to update the admin status of other connections. Note: Inter-router connections cannot be deleted at any time.", + "default": true, + "required": false, + "create": true + }, "sources": { "type": "string", "description": "A list of source addresses from which users in this group may receive messages. To specify multiple addresses, separate the addresses with either a comma or a space. If you do not specify any addresses, users in this group are not allowed to receive messages from any addresses. You can use the substitution token '${user}' to specify an address that contains a user's authenticated user name. You can use an asterisk ('*') wildcard to match one or more ch [...] diff --git a/python/qpid_dispatch_internal/policy/policy_local.py b/python/qpid_dispatch_internal/policy/policy_local.py index 4191db5..a7ab8b9 100644 --- a/python/qpid_dispatch_internal/policy/policy_local.py +++ b/python/qpid_dispatch_internal/policy/policy_local.py @@ -72,6 +72,7 @@ class PolicyKeys(object): KW_ALLOW_USERID_PROXY = "allowUserIdProxy" KW_ALLOW_WAYPOINT_LINKS = "allowWaypointLinks" KW_ALLOW_DYNAMIC_LINK_ROUTES = "allowDynamicLinkRoutes" + KW_ALLOW_ADMIN_STATUS_UPDATE = "allowAdminStatusUpdate" KW_SOURCES = "sources" KW_TARGETS = "targets" KW_SOURCE_PATTERN = "sourcePattern" @@ -147,6 +148,7 @@ class PolicyCompiler(object): PolicyKeys.KW_ALLOW_USERID_PROXY, PolicyKeys.KW_ALLOW_WAYPOINT_LINKS, PolicyKeys.KW_ALLOW_DYNAMIC_LINK_ROUTES, + PolicyKeys.KW_ALLOW_ADMIN_STATUS_UPDATE, PolicyKeys.KW_SOURCES, PolicyKeys.KW_TARGETS, PolicyKeys.KW_SOURCE_PATTERN, @@ -250,6 +252,7 @@ class PolicyCompiler(object): policy_out[PolicyKeys.KW_ALLOW_USERID_PROXY] = False policy_out[PolicyKeys.KW_ALLOW_WAYPOINT_LINKS] = True policy_out[PolicyKeys.KW_ALLOW_DYNAMIC_LINK_ROUTES] = True + policy_out[PolicyKeys.KW_ALLOW_ADMIN_STATUS_UPDATE] = True policy_out[PolicyKeys.KW_SOURCES] = '' policy_out[PolicyKeys.KW_TARGETS] = '' policy_out[PolicyKeys.KW_SOURCE_PATTERN] = '' @@ -287,7 +290,8 @@ class PolicyCompiler(object): PolicyKeys.KW_ALLOW_DYNAMIC_SRC, PolicyKeys.KW_ALLOW_USERID_PROXY, PolicyKeys.KW_ALLOW_WAYPOINT_LINKS, - PolicyKeys.KW_ALLOW_DYNAMIC_LINK_ROUTES + PolicyKeys.KW_ALLOW_DYNAMIC_LINK_ROUTES, + PolicyKeys.KW_ALLOW_ADMIN_STATUS_UPDATE ]: if isinstance(val, (PY_STRING_TYPE, PY_TEXT_TYPE)) and val.lower() in ['true', 'false']: val = True if val == 'true' else False @@ -599,6 +603,7 @@ class PolicyLocal(object): candidate = {} name = attributes[PolicyKeys.KW_VHOST_NAME] result = self._policy_compiler.compile_access_ruleset(name, attributes, candidate, warnings, diag) + if not result: raise PolicyError("Policy '%s' is invalid: %s" % (name, diag[0])) if len(warnings) > 0: @@ -802,6 +807,7 @@ class PolicyLocal(object): return False upolicy.update(ruleset[PolicyKeys.KW_GROUPS][groupname]) + upolicy[PolicyKeys.KW_CSTATS] = self.statsdb[vhost].get_cstats() return True except Exception as e: diff --git a/src/amqp.c b/src/amqp.c index 9164d60..d0b19c2 100644 --- a/src/amqp.c +++ b/src/amqp.c @@ -84,6 +84,7 @@ const char * const QD_AMQP_COND_PRECONDITION_FAILED = "amqp:precondition-failed" const char * const QD_AMQP_COND_RESOURCE_DELETED = "amqp:resource-deleted"; const char * const QD_AMQP_COND_ILLEGAL_STATE = "amqp:illegal-state"; const char * const QD_AMQP_COND_FRAME_SIZE_TOO_SMALL = "amqp:frame-size-too-small"; +const char * const QD_AMQP_COND_CONNECTION_FORCED = "amqp:connection:forced"; const char * const QD_AMQP_PORT_STR = "5672"; const char * const QD_AMQPS_PORT_STR = "5671"; diff --git a/src/container.c b/src/container.c index 12e6930..0ecd242 100644 --- a/src/container.c +++ b/src/container.c @@ -457,6 +457,10 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event, close_links(container, conn, false); pn_connection_close(conn); qd_conn_event_batch_complete(container, qd_conn, true); + } else if (pn_connection_state(conn) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) { + close_links(container, conn, false); + notify_closed(container, qd_conn, qd_connection_get_context(qd_conn)); + qd_conn_event_batch_complete(container, qd_conn, true); } break; @@ -681,7 +685,6 @@ void qd_container_free(qd_container_t *container) if (!container) return; if (container->default_node) qd_container_destroy_node(container->default_node); - qd_link_t *link = DEQ_HEAD(container->links); while (link) { DEQ_REMOVE_HEAD(container->links); diff --git a/src/parse.c b/src/parse.c index 0358590..ecfbe0f 100644 --- a/src/parse.c +++ b/src/parse.c @@ -654,12 +654,18 @@ int is_tag_a_map(uint8_t tag) int qd_parse_is_map(qd_parsed_field_t *field) { + if (!field) + return 0; + return is_tag_a_map(field->tag); } int qd_parse_is_list(qd_parsed_field_t *field) { + if (!field) + return 0; + return field->tag == QD_AMQP_LIST8 || field->tag == QD_AMQP_LIST32; } diff --git a/src/policy.c b/src/policy.c index 81eaee0..aa01ea8 100644 --- a/src/policy.c +++ b/src/policy.c @@ -439,6 +439,11 @@ bool qd_policy_open_lookup_user( settings->allowUserIdProxy = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowUserIdProxy", false); settings->allowWaypointLinks = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowWaypointLinks", true); settings->allowDynamicLinkRoutes = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowDynamicLinkRoutes", true); + + // + // By default, deleting connections are enabled. To disable, set the allowAdminStatusUpdate to false in a policy. + // + settings->allowAdminStatusUpdate = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowAdminStatusUpdate", true); if (settings->sources == 0) { //don't override if configured by authz plugin settings->sources = qd_entity_get_string((qd_entity_t*)upolicy, "sources"); } @@ -629,6 +634,7 @@ bool _qd_policy_approve_link_name(const char *username, const char *allowed, con // degenerate case of blank proposed name being opened. will never match anything. return false; } + size_t a_len = strlen(allowed); if (a_len == 0) { // no names in 'allowed'. @@ -651,6 +657,7 @@ bool _qd_policy_approve_link_name(const char *username, const char *allowed, con free(dup); return false; } + size_t pName_sz = QPALN_SIZE; bool result = false; diff --git a/src/policy.h b/src/policy.h index f1b9d0d..dfaed44 100644 --- a/src/policy.h +++ b/src/policy.h @@ -53,6 +53,7 @@ struct qd_policy__settings_s { bool allowUserIdProxy; bool allowWaypointLinks; bool allowDynamicLinkRoutes; + bool allowAdminStatusUpdate; char *sources; char *targets; char *sourcePattern; diff --git a/src/router_core/agent.c b/src/router_core/agent.c index 71b39f5..f355573 100644 --- a/src/router_core/agent.c +++ b/src/router_core/agent.c @@ -414,7 +414,7 @@ static void qdr_manage_delete_CT(qdr_core_t *core, qdr_action_t *action, bool di case QD_ROUTER_CONFIG_ADDRESS: qdra_config_address_delete_CT(core, query, name, identity); break; case QD_ROUTER_CONFIG_LINK_ROUTE: qdra_config_link_route_delete_CT(core, query, name, identity); break; case QD_ROUTER_CONFIG_AUTO_LINK: qdra_config_auto_link_delete_CT(core, query, name, identity); break; - case QD_ROUTER_CONNECTION: break; + case QD_ROUTER_CONNECTION: qdr_agent_forbidden(core, query, false); break; case QD_ROUTER_ROUTER: qdr_agent_forbidden(core, query, false); break; case QD_ROUTER_LINK: break; case QD_ROUTER_ADDRESS: break; @@ -439,7 +439,7 @@ static void qdr_manage_update_CT(qdr_core_t *core, qdr_action_t *action, bool di case QD_ROUTER_CONFIG_ADDRESS: break; case QD_ROUTER_CONFIG_LINK_ROUTE: break; case QD_ROUTER_CONFIG_AUTO_LINK: break; - case QD_ROUTER_CONNECTION: break; + case QD_ROUTER_CONNECTION: qdra_connection_update_CT(core, name, identity, query, in_body); break; case QD_ROUTER_ROUTER: break; case QD_ROUTER_LINK: qdra_link_update_CT(core, name, identity, query, in_body); break; case QD_ROUTER_ADDRESS: break; diff --git a/src/router_core/agent_connection.c b/src/router_core/agent_connection.c index 06bc9cf..22b5625 100644 --- a/src/router_core/agent_connection.c +++ b/src/router_core/agent_connection.c @@ -41,10 +41,19 @@ #define QDR_CONNECTION_SSL 16 #define QDR_CONNECTION_OPENED 17 #define QDR_CONNECTION_ACTIVE 18 +#define QDR_CONNECTION_ADMIN_STATUS 19 +#define QDR_CONNECTION_OPER_STATUS 20 const char * const QDR_CONNECTION_DIR_IN = "in"; const char * const QDR_CONNECTION_DIR_OUT = "out"; +const char * QDR_CONNECTION_ADMIN_STATUS_DELETED = "deleted"; +const char * QDR_CONNECTION_ADMIN_STATUS_ENABLED = "enabled"; + +const char * QDR_CONNECTION_OPER_STATUS_UP = "up"; +const char * QDR_CONNECTION_OPER_STATUS_CLOSING = "closing"; + + const char *qdr_connection_roles[] = {"normal", "inter-router", @@ -72,6 +81,8 @@ const char *qdr_connection_columns[] = "ssl", "opened", "active", + "adminStatus", + "operStatus", 0}; const char *CONNECTION_TYPE = "org.apache.qpid.dispatch.connection"; @@ -102,6 +113,7 @@ static void qd_get_next_pn_data(pn_data_t **data, const char **d, int *d1) static void qdr_connection_insert_column_CT(qdr_core_t *core, qdr_connection_t *conn, int col, qd_composed_field_t *body, bool as_map) { char id_str[100]; + const char *text = 0; if (as_map) qd_compose_insert_string(body, qdr_connection_columns[col]); @@ -215,6 +227,16 @@ static void qdr_connection_insert_column_CT(qdr_core_t *core, qdr_connection_t * } break; + case QDR_CONNECTION_ADMIN_STATUS: + text = conn->closed ? QDR_CONNECTION_ADMIN_STATUS_DELETED : QDR_CONNECTION_ADMIN_STATUS_ENABLED; + qd_compose_insert_string(body, text); + break; + + case QDR_CONNECTION_OPER_STATUS: + text = conn->closed ? QDR_CONNECTION_OPER_STATUS_CLOSING : QDR_CONNECTION_OPER_STATUS_UP; + qd_compose_insert_string(body, text); + break; + case QDR_CONNECTION_PROPERTIES: { pn_data_t *data = conn->connection_info->connection_properties; qd_compose_start_map(body); @@ -364,6 +386,17 @@ static void qdr_manage_write_connection_map_CT(qdr_core_t *core, qd_compose_end_map(body); } +static qdr_connection_t *_find_conn_CT(qdr_core_t *core, uint64_t conn_id) +{ + qdr_connection_t *conn = DEQ_HEAD(core->open_connections); + while (conn) { + if (conn->identity == conn_id) + break; + conn = DEQ_NEXT(conn); + } + return conn; +} + static qdr_connection_t *qdr_connection_find_by_identity_CT(qdr_core_t *core, qd_iterator_t *identity) { @@ -419,3 +452,126 @@ void qdra_connection_get_CT(qdr_core_t *core, // qdr_agent_enqueue_response_CT(core, query); } + + +static void qdra_connection_set_bad_request(qdr_query_t *query) +{ + query->status = QD_AMQP_BAD_REQUEST; + qd_compose_start_map(query->body); + qd_compose_end_map(query->body); +} + + +static void qdra_connection_update_set_status(qdr_core_t *core, qdr_query_t *query, qdr_connection_t *conn, qd_parsed_field_t *admin_state) +{ + if (conn) { + qd_iterator_t *admin_status_iter = qd_parse_raw(admin_state); + + if (qd_iterator_equal(admin_status_iter, (unsigned char*) QDR_CONNECTION_ADMIN_STATUS_DELETED)) { + // This connection has been force-closed. + // Inter-router and edge connections may not be force-closed + if (conn->role != QDR_ROLE_INTER_ROUTER && conn->role != QDR_ROLE_EDGE_CONNECTION) { + conn->closed = true; + conn->error = qdr_error(QD_AMQP_COND_CONNECTION_FORCED, "Connection forced-closed by management request"); + conn->admin_status = QDR_CONN_ADMIN_DELETED; + + qd_log(core->log, QD_LOG_INFO, "[C%"PRIu64"] Connection force-closed by request from connection [C%"PRIu64"]", conn->identity, query->in_conn); + + //Activate the connection, so the I/O threads can finish the job. + qdr_connection_activate_CT(core, conn); + query->status = QD_AMQP_OK; + qdr_manage_write_connection_map_CT(core, conn, query->body, qdr_connection_columns); + } + else { + // + // You are trying to delete an inter-router connection and that is always forbidden, no matter what + // policy rights you have. + // + query->status = QD_AMQP_FORBIDDEN; + query->status.description = "You are not allowed to perform this operation."; + qd_compose_start_map(query->body); + qd_compose_end_map(query->body); + } + + } + else if (qd_iterator_equal(admin_status_iter, (unsigned char*) QDR_CONNECTION_ADMIN_STATUS_ENABLED)) { + query->status = QD_AMQP_OK; + qdr_manage_write_connection_map_CT(core, conn, query->body, qdr_connection_columns); + } + else { + qdra_connection_set_bad_request(query); + } + } + else { + query->status = QD_AMQP_NOT_FOUND; + qd_compose_start_map(query->body); + qd_compose_end_map(query->body); + } +} + + + +void qdra_connection_update_CT(qdr_core_t *core, + qd_iterator_t *name, + qd_iterator_t *identity, + qdr_query_t *query, + qd_parsed_field_t *in_body) +{ + // If the request was successful then the statusCode MUST contain 200 (OK) and the body of the message + // MUST contain a map containing the actual attributes of the entity updated. These MAY differ from those + // requested. + // A map containing attributes that are not applicable for the entity being created, or invalid values for a + // given attribute, MUST result in a failure response with a statusCode of 400 (Bad Request). + if (qd_parse_is_map(in_body)) { + // The absence of an attribute name implies that the entity should retain its already existing value. + // If the map contains a key-value pair where the value is null then the updated entity should have no value + // for that attribute, removing any previous value. + qd_parsed_field_t *admin_state = qd_parse_value_by_key(in_body, qdr_connection_columns[QDR_CONNECTION_ADMIN_STATUS]); + + // Find the connection that the user connected on. This connection must have the correct policy rights which + // will allow the user on this connection to terminate some other connection. + qdr_connection_t *user_conn = _find_conn_CT(core, query->in_conn); + + if (!user_conn) { + // This is bad. The user connection (that was requesting that some + // other connection be dropped) is gone + query->status.description = "Parent connection no longer exists"; + qdra_connection_set_bad_request(query); + } + + else { + if (!user_conn->policy_allow_admin_status_update) { + // + // Policy on the connection that is requesting that some other connection be deleted does not allow + // for the other connection to be deleted.Set the status to QD_AMQP_FORBIDDEN and just quit. + // + query->status = QD_AMQP_FORBIDDEN; + query->status.description = "You are not allowed to perform this operation."; + qd_compose_start_map(query->body); + qd_compose_end_map(query->body); + } + else if (admin_state) { //admin state is the only field that can be updated via the update management request + if (identity) { + qdr_connection_t *conn = qdr_connection_find_by_identity_CT(core, identity); + qdra_connection_update_set_status(core, query, conn, admin_state); + } + else { + qdra_connection_set_bad_request(query); + } + } + else + qdra_connection_set_bad_request(query); + } + } + else + qdra_connection_set_bad_request(query); + + // + // Enqueue the response. + // + qdr_agent_enqueue_response_CT(core, query); + + + +} + diff --git a/src/router_core/agent_connection.h b/src/router_core/agent_connection.h index 7536969..0b764b1 100644 --- a/src/router_core/agent_connection.h +++ b/src/router_core/agent_connection.h @@ -29,8 +29,13 @@ void qdra_connection_get_CT(qdr_core_t *core, qdr_query_t *query, const char *qdr_connection_columns[]); +void qdra_connection_update_CT(qdr_core_t *core, + qd_iterator_t *name, + qd_iterator_t *identity, + qdr_query_t *query, + qd_parsed_field_t *in_body); -#define QDR_CONNECTION_COLUMN_COUNT 19 +#define QDR_CONNECTION_COLUMN_COUNT 21 const char *qdr_connection_columns[QDR_CONNECTION_COLUMN_COUNT + 1]; #endif diff --git a/src/router_core/connections.c b/src/router_core/connections.c index 96f176b..6bdc781 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -73,6 +73,7 @@ qdr_connection_t *qdr_connection_opened(qdr_core_t *core, bool strip_annotations_in, bool strip_annotations_out, bool policy_allow_dynamic_link_routes, + bool policy_allow_admin_status_update, int link_capacity, const char *vhost, qdr_connection_info_t *connection_info, @@ -93,8 +94,11 @@ qdr_connection_t *qdr_connection_opened(qdr_core_t *core, conn->strip_annotations_in = strip_annotations_in; conn->strip_annotations_out = strip_annotations_out; conn->policy_allow_dynamic_link_routes = policy_allow_dynamic_link_routes; + conn->policy_allow_admin_status_update = policy_allow_admin_status_update; conn->link_capacity = link_capacity; conn->mask_bit = -1; + conn->admin_status = QDR_CONN_ADMIN_ENABLED; + conn->oper_status = QDR_CONN_OPER_UP; DEQ_INIT(conn->links); DEQ_INIT(conn->work_list); conn->connection_info->role = conn->role; @@ -223,6 +227,11 @@ int qdr_connection_process(qdr_connection_t *conn) int event_count = 0; + if (conn->closed) { + core->conn_close_handler(core->user_context, conn, conn->error); + return 0; + } + sys_mutex_lock(conn->work_lock); DEQ_MOVE(conn->work_list, work_list); for (int priority = 0; priority <= QDR_MAX_PRIORITY; ++ priority) { @@ -552,7 +561,8 @@ void qdr_connection_handlers(qdr_core_t *core, qdr_link_drain_t drain, qdr_link_push_t push, qdr_link_deliver_t deliver, - qdr_delivery_update_t delivery_update) + qdr_delivery_update_t delivery_update, + qdr_connection_close_t conn_close) { core->user_context = context; core->first_attach_handler = first_attach; @@ -565,6 +575,7 @@ void qdr_connection_handlers(qdr_core_t *core, core->push_handler = push; core->deliver_handler = deliver; core->delivery_update_handler = delivery_update; + core->conn_close_handler = conn_close; } @@ -1198,6 +1209,7 @@ void qdr_connection_free(qdr_connection_t *conn) { sys_mutex_free(conn->work_lock); free(conn->tenant_space); + qdr_error_free(conn->error); qdr_connection_info_free(conn->connection_info); free_qdr_connection_t(conn); } diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index eb58837..abe695f 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -636,6 +636,18 @@ ALLOC_DECLARE(qdr_connection_info_t); DEQ_DECLARE(qdr_link_route_t, qdr_link_route_list_t); + +typedef enum { + QDR_CONN_OPER_UP, +} qdr_conn_oper_status_t; + + +typedef enum { + QDR_CONN_ADMIN_ENABLED, + QDR_CONN_ADMIN_DELETED +} qdr_conn_admin_status_t; + + struct qdr_connection_t { DEQ_LINKS(qdr_connection_t); DEQ_LINKS_N(ACTIVATE, qdr_connection_t); @@ -649,6 +661,7 @@ struct qdr_connection_t { bool strip_annotations_in; bool strip_annotations_out; bool policy_allow_dynamic_link_routes; + bool policy_allow_admin_status_update; int link_capacity; int mask_bit; qdr_connection_work_list_t work_list; @@ -660,6 +673,10 @@ struct qdr_connection_t { qdr_connection_info_t *connection_info; void *user_context; /* Updated from IO thread, use work_lock */ qdr_link_route_list_t conn_link_routes; // connection scoped link routes + qdr_conn_oper_status_t oper_status; + qdr_conn_admin_status_t admin_status; + qdr_error_t *error; + bool closed; // This bit is used in the case where a client is trying to force close this connection. }; ALLOC_DECLARE(qdr_connection_t); @@ -801,17 +818,18 @@ struct qdr_core_t { // // Connection section // - void *user_context; - qdr_link_first_attach_t first_attach_handler; - qdr_link_second_attach_t second_attach_handler; - qdr_link_detach_t detach_handler; - qdr_link_flow_t flow_handler; - qdr_link_offer_t offer_handler; - qdr_link_drained_t drained_handler; - qdr_link_drain_t drain_handler; - qdr_link_push_t push_handler; - qdr_link_deliver_t deliver_handler; - qdr_delivery_update_t delivery_update_handler; + void *user_context; + qdr_link_first_attach_t first_attach_handler; + qdr_link_second_attach_t second_attach_handler; + qdr_link_detach_t detach_handler; + qdr_link_flow_t flow_handler; + qdr_link_offer_t offer_handler; + qdr_link_drained_t drained_handler; + qdr_link_drain_t drain_handler; + qdr_link_push_t push_handler; + qdr_link_deliver_t deliver_handler; + qdr_delivery_update_t delivery_update_handler; + qdr_connection_close_t conn_close_handler; // // Events section diff --git a/src/router_node.c b/src/router_node.c index 3ae2813..ca78365 100644 --- a/src/router_node.c +++ b/src/router_node.c @@ -1175,6 +1175,7 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool conn->strip_annotations_in, conn->strip_annotations_out, conn->policy_settings ? conn->policy_settings->allowDynamicLinkRoutes : true, + conn->policy_settings ? conn->policy_settings->allowAdminStatusUpdate : true, link_capacity, vhost, connection_info, @@ -1360,6 +1361,27 @@ static void CORE_link_second_attach(void *context, qdr_link_t *link, qdr_terminu pn_link_open(qd_link_pn(qlink)); } +static void CORE_close_connection(void *context, qdr_connection_t *qdr_conn, qdr_error_t *error) +{ + if (qdr_conn) { + qd_connection_t *qd_conn = qdr_connection_get_context(qdr_conn); + if (qd_conn) { + pn_connection_t *pn_conn = qd_connection_pn(qd_conn); + if (pn_conn) { + // + // Go down to the transport and close the head and tail. This will + // drop the socket to the peer without providing any error indication. + // Due to issues in Proton that cause different behaviors in different + // bindings depending on whether there is a connection:forced error, + // this has been deemed the best way to force the peer to reconnect. + // + pn_transport_t *tport = pn_connection_transport(pn_conn); + pn_transport_close_head(tport); + pn_transport_close_tail(tport); + } + } + } +} static void CORE_link_detach(void *context, qdr_link_t *link, qdr_error_t *error, bool first, bool close) { @@ -1703,7 +1725,8 @@ void qd_router_setup_late(qd_dispatch_t *qd) CORE_link_drain, CORE_link_push, CORE_link_deliver, - CORE_delivery_update); + CORE_delivery_update, + CORE_close_connection); qd_router_python_setup(qd->router); qd_timer_schedule(qd->router->timer, 1000); diff --git a/tests/one-router-policy/default.json b/tests/one-router-policy/default.json index 55f1da6..2b5820a 100644 --- a/tests/one-router-policy/default.json +++ b/tests/one-router-policy/default.json @@ -25,8 +25,9 @@ "remoteHosts": "*", "allowDynamicSource": true, "allowAnonymousSender": true, - "targets": "*", - "sources": "*" + "allowAdminStatusUpdate": false, + "targets": "*", + "sources": "*" } } } diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py index 7b587d2..58ac120 100644 --- a/tests/system_tests_one_router.py +++ b/tests/system_tests_one_router.py @@ -24,12 +24,13 @@ from __future__ import print_function import unittest2 as unittest from proton import Condition, Message, Delivery, Url, symbol, Timeout -from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, DIR +from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, DIR, Process from proton.handlers import MessagingHandler, TransactionHandler from proton.reactor import Container, AtMostOnce, AtLeastOnce, DynamicNodeProperties, LinkOption, ApplicationEvent, EventInjector from proton.utils import BlockingConnection, SyncRequestResponse from qpid_dispatch.management.client import Node import os, json +from subprocess import PIPE, STDOUT CONNECTION_PROPERTIES_UNICODE_STRING = {u'connection': u'properties', u'int_property': 6451} CONNECTION_PROPERTIES_SYMBOL = dict() @@ -94,6 +95,18 @@ class OneRouterTest(TestCase): cls.out_strip_addr = cls.router.addresses[3] cls.in_strip_addr = cls.router.addresses[4] + def run_qdmanage(self, cmd, input=None, expect=Process.EXIT_OK, address=None): + p = self.popen( + ['qdmanage'] + cmd.split(' ') + ['--bus', address or self.address, '--indent=-1', '--timeout', str(TIMEOUT)], + stdin=PIPE, stdout=PIPE, stderr=STDOUT, expect=expect, + universal_newlines=True) + out = p.communicate(input)[0] + try: + p.teardown() + except Exception as e: + raise Exception(out if out else str(e)) + return out + def test_01_listen_error(self): # Make sure a router exits if a initial listener fails, doesn't hang. @@ -443,6 +456,43 @@ class OneRouterTest(TestCase): test.run() self.assertEqual(None, test.error) + def test_44_delete_connection_fail(self): + """ + This test creates a blocking connection and tries to update the adminStatus on that connection to "deleted". + Since the policy associated with this router set allowAdminStatusUpdate as false, + the update operation will not be permitted. + """ + + # Create a connection with some properties so we can easily identify the connection + connection = BlockingConnection(self.address, + properties=CONNECTION_PROPERTIES_UNICODE_STRING) + query_command = 'QUERY --type=connection' + outputs = json.loads(self.run_qdmanage(query_command)) + identity = None + passed = False + + for output in outputs: + if output.get('properties'): + conn_properties = output['properties'] + # Find the connection that has our properties - CONNECTION_PROPERTIES_UNICODE_STRING + # Delete that connection and run another qdmanage to see + # if the connection is gone. + if conn_properties.get('int_property'): + identity = output.get("identity") + print (identity) + if identity: + update_command = 'UPDATE --type=connection adminStatus=deleted --id=' + identity + try: + outputs = json.loads(self.run_qdmanage(update_command)) + except Exception as e: + print (e) + if "Forbidden" in e.message: + passed = True + + # The test has passed since we were not allowed to delete a connection + # because we do not have the policy permission to do so. + self.assertTrue(passed) + class Entity(object): def __init__(self, status_code, status_description, attrs): diff --git a/tests/system_tests_qdmanage.py b/tests/system_tests_qdmanage.py index 94cc7f3..7df80f6 100644 --- a/tests/system_tests_qdmanage.py +++ b/tests/system_tests_qdmanage.py @@ -28,9 +28,13 @@ from system_test import TestCase, Process, Qdrouterd, main_module, TIMEOUT, DIR from subprocess import PIPE, STDOUT from qpid_dispatch_internal.compat import dictify from qpid_dispatch_internal.management.qdrouter import QdSchema +from proton.handlers import MessagingHandler +from proton.utils import BlockingConnection DUMMY = "org.apache.qpid.dispatch.dummy" +CONNECTION_PROPERTIES_UNICODE_STRING = {u'connection': u'properties', u'int_property': 6451} + TOTAL_ENTITIES=29 # for tests that check the total # of entities @@ -53,6 +57,7 @@ class QdmanageTest(TestCase): 'privateKeyFile': cls.ssl_file('server-private-key.pem'), 'password': 'server-password'}), ('listener', {'port': cls.tester.get_port()}), + ('connector', {'role': 'inter-router', 'port': cls.inter_router_port}), ('address', {'name': 'test-address', 'prefix': 'abcd', 'distribution': 'multicast'}), ('linkRoute', {'name': 'test-link-route', 'prefix': 'xyz', 'direction': 'in'}), @@ -430,6 +435,36 @@ class QdmanageTest(TestCase): self.run_qdmanage('DELETE --type=sslProfile --name=' + ssl_profile_name) + def test_delete_connection(self): + """ + This test creates a blocking connection and tries to delete that connection using qdmanage DELETE operation. + Make sure we are Forbidden from deleting a connection because qdmanage DELETEs are not allowed on a connection + Only qdmanage UPDATEs are allowed.. + :return: + """ + connection = BlockingConnection(self.address(), properties=CONNECTION_PROPERTIES_UNICODE_STRING) + query_command = 'QUERY --type=connection' + outputs = json.loads(self.run_qdmanage(query_command)) + identity = None + passed = False + for output in outputs: + if output.get('properties'): + conn_properties = output['properties'] + if conn_properties.get('int_property'): + identity = output.get("identity") + print (identity) + if identity: + delete_command = 'DELETE --type=connection --id=' + identity + try: + outs = json.loads(self.run_qdmanage(delete_command)) + except Exception as e: + if "Forbidden" in e.message: + passed = True + + # The test has passed since we were forbidden from deleting a connection + # due to lack of policy permissions. + self.assertTrue(passed) + def test_create_delete_address_pattern(self): config = [('mercury.*.earth.#', 'closest'), ('*/mars/*/#', 'multicast'), @@ -480,6 +515,5 @@ class QdmanageTest(TestCase): for p in config: self.assertNotEqual(p[0], pattern) - if __name__ == '__main__': unittest.main(main_module()) diff --git a/tests/system_tests_two_routers.py b/tests/system_tests_two_routers.py index 550687c..9fa7f2a 100644 --- a/tests/system_tests_two_routers.py +++ b/tests/system_tests_two_routers.py @@ -23,20 +23,20 @@ from __future__ import absolute_import from __future__ import print_function from time import sleep -import json +import json, os import unittest2 as unittest import logging from threading import Timer from subprocess import PIPE, STDOUT from proton import Message, Timeout, Delivery -from system_test import TestCase, Process, Qdrouterd, main_module, TIMEOUT +from system_test import TestCase, Process, Qdrouterd, main_module, TIMEOUT, DIR from system_test import AsyncTestReceiver from proton.handlers import MessagingHandler from proton.reactor import Container, AtLeastOnce from proton.utils import BlockingConnection from qpid_dispatch.management.client import Node - +CONNECTION_PROPERTIES_UNICODE_STRING = {u'connection': u'properties', u'int_property': 6451} class TwoRouterTest(TestCase): @@ -48,13 +48,13 @@ class TwoRouterTest(TestCase): super(TwoRouterTest, cls).setUpClass() def router(name, client_server, connection): + policy_config_path = os.path.join(DIR, 'two-router-policy') config = [ # Use the deprecated attributes helloInterval, raInterval, raIntervalFlux, remoteLsMaxAge # The routers should still start successfully after using these deprecated entities. ('router', {'remoteLsMaxAge': 60, 'helloInterval': 1, 'raInterval': 30, 'raIntervalFlux': 4, 'mode': 'interior', 'id': 'QDR.%s'%name, 'allowUnsettledMulticast': 'yes'}), - ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no', 'linkCapacity': 500}), ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no'}), @@ -100,6 +100,21 @@ class TwoRouterTest(TestCase): cls.routers[0].wait_router_connected('QDR.B') cls.routers[1].wait_router_connected('QDR.A') + def address(self): + return self.routers[0].addresses[0] + + def run_qdmanage(self, cmd, input=None, expect=Process.EXIT_OK, address=None): + p = self.popen( + ['qdmanage'] + cmd.split(' ') + ['--bus', address or self.address(), '--indent=-1', '--timeout', str(TIMEOUT)], + stdin=PIPE, stdout=PIPE, stderr=STDOUT, expect=expect, + universal_newlines=True) + out = p.communicate(input)[0] + try: + p.teardown() + except Exception as e: + raise Exception(out if out else str(e)) + return out + def test_01_pre_settled(self): test = DeliveriesInTransit(self.routers[0].addresses[0], self.routers[1].addresses[0]) test.run() @@ -267,6 +282,186 @@ class TwoRouterTest(TestCase): test.run() self.assertEqual(None, test.error) + def test_19_delete_inter_router_connection(self): + """ + This test tries to delete an inter-router connection but is + prevented from doing so. + """ + query_command = 'QUERY --type=connection' + outputs = json.loads(self.run_qdmanage(query_command)) + identity = None + passed = False + + for output in outputs: + if "inter-router" == output['role']: + identity = output['identity'] + if identity: + update_command = 'UPDATE --type=connection adminStatus=deleted --id=' + identity + try: + json.loads(self.run_qdmanage(update_command)) + except Exception as e: + if "Forbidden" in e.message: + passed = True + + # The test has passed since we were forbidden from deleting + # inter-router connections even though we are allowed to update the adminStatus field. + self.assertTrue(passed) + + def test_20_delete_connection(self): + """ + This test creates a blocking connection and tries to delete that connection. + Since there is no policy associated with this router, the default for allowAdminStatusUpdate is true, + the delete operation will be permitted. + """ + + # Create a connection with some properties so we can easily identify the connection + connection = BlockingConnection(self.address(), + properties=CONNECTION_PROPERTIES_UNICODE_STRING) + query_command = 'QUERY --type=connection' + outputs = json.loads(self.run_qdmanage(query_command)) + identity = None + passed = False + + print () + + for output in outputs: + if output.get('properties'): + conn_properties = output['properties'] + # Find the connection that has our properties - CONNECTION_PROPERTIES_UNICODE_STRING + # Delete that connection and run another qdmanage to see + # if the connection is gone. + if conn_properties.get('int_property'): + identity = output.get("identity") + if identity: + update_command = 'UPDATE --type=connection adminStatus=deleted --id=' + identity + try: + self.run_qdmanage(update_command) + query_command = 'QUERY --type=connection' + outputs = json.loads( + self.run_qdmanage(query_command)) + no_properties = True + for output in outputs: + if output.get('properties'): + no_properties = False + conn_properties = output['properties'] + if conn_properties.get('int_property'): + passed = False + break + else: + passed = True + if no_properties: + passed = True + except Exception as e: + passed = False + + # The test has passed since we were allowed to delete a connection + # because we have the policy permission to do so. + self.assertTrue(passed) + + def test_21_delete_connection_with_receiver(self): + test = DeleteConnectionWithReceiver(self.routers[0].addresses[0]) + self.assertEqual(test.error, None) + test.run() + +class DeleteConnectionWithReceiver(MessagingHandler): + def __init__(self, address): + super(DeleteConnectionWithReceiver, self).__init__() + self.address = address + self.num_messages = 0 + self.mgmt_receiver = None + self.mgmt_receiver_1 = None + self.mgmt_receiver_2 = None + self.conn_to_kill = None + self.mgmt_conn = None + self.mgmt_sender = None + self.success = False + self.error = None + + + def on_start(self, event): + self.timer = event.reactor.schedule(TIMEOUT, Timeout(self)) + + # Create a receiver connection with some properties so it + # can be easily identified. + self.conn_to_kill = event.container.connect(self.address, properties=CONNECTION_PROPERTIES_UNICODE_STRING) + self.receiver_to_kill = event.container.create_receiver(self.conn_to_kill, "hello_world") + self.mgmt_conn = event.container.connect(self.address) + self.mgmt_sender = event.container.create_sender(self.mgmt_conn) + self.mgmt_receiver = event.container.create_receiver(self.mgmt_conn, None, dynamic=True) + self.mgmt_receiver_1 = event.container.create_receiver(self.mgmt_conn, + None, + dynamic=True) + self.mgmt_receiver_2 = event.container.create_receiver(self.mgmt_conn, + None, + dynamic=True) + def timeout(self): + self.error = "Timeout Expired: sent=%d, received=%d" % (self.n_sent, self.n_received) + self.mgmt_conn.close() + + def bail(self, error): + self.error = error + self.timer.cancel() + self.mgmt_conn.close() + self.conn_to_kill.close() + + def on_sendable(self, event): + if event.sender == self.mgmt_sender: + if self.num_messages < 1: + request = Message() + request.address = "amqp:/_local/$management" + request.properties = {u'type': u'org.apache.qpid.dispatch.connection', + u'operation': u'QUERY'} + request.reply_to = self.mgmt_receiver.remote_source.address + event.sender.send(request) + self.num_messages += 1 + + def on_message(self, event): + if event.receiver == self.mgmt_receiver: + attribute_names = event.message.body['attributeNames'] + property_index = attribute_names .index('properties') + identity_index = attribute_names .index('identity') + + for result in event.message.body['results']: + if result[property_index]: + properties = result[property_index] + if properties.get('int_property'): + identity = result[identity_index] + print (identity) + if identity: + request = Message() + request.address = "amqp:/_local/$management" + request.properties = { + u'identity': identity, + u'type': u'org.apache.qpid.dispatch.connection', + u'operation': u'UPDATE' + } + request.body = { + u'adminStatus': u'deleted'} + request.reply_to = self.mgmt_receiver_1.remote_source.address + self.mgmt_sender.send(request) + elif event.receiver == self.mgmt_receiver_1: + if event.message.properties['statusDescription'] == 'OK' and event.message.body['adminStatus'] == 'deleted': + request = Message() + request.address = "amqp:/_local/$management" + request.properties = {u'type': u'org.apache.qpid.dispatch.connection', + u'operation': u'QUERY'} + request.reply_to = self.mgmt_receiver_2.remote_source.address + self.mgmt_sender.send(request) + + elif event.receiver == self.mgmt_receiver_2: + attribute_names = event.message.body['attributeNames'] + property_index = attribute_names .index('properties') + identity_index = attribute_names .index('identity') + + for result in event.message.body['results']: + if result[property_index]: + properties = result[property_index] + if properties and properties.get('int_property'): + self.bail("Connection not deleted") + self.bail(None) + + def run(self): + Container(self).run() class Timeout(object): def __init__(self, parent): --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org