This is an automated email from the ASF dual-hosted git repository.

tross pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f621c4  DISPATCH-1275 - Enabled deletion of connections based on 
connection id. Added related policy code. By default, a user has privileges to 
delete a connection; a policy is required to disallow this. Inter-router 
connections can never be deleted. This closes #467
9f621c4 is described below

commit 9f621c432f7e9dd49156292d1f7e53be4fd7eaf9
Author: Ganesh Murthy <gmur...@redhat.com>
AuthorDate: Wed Mar 6 12:34:13 2019 -0500

    DISPATCH-1275 - Enabled deletion of connections based on connection id. 
Added related policy code. By default, a user has privileges to delete a 
connection; a policy is required to disallow this. Inter-router connections 
can never be deleted.
    This closes #467
---
 docs/books/user-guide/configuration-security.adoc  |   5 +
 include/qpid/dispatch/amqp.h                       |   2 +
 include/qpid/dispatch/router_core.h                |   5 +-
 python/qpid_dispatch/management/qdrouter.json      |  16 ++
 .../qpid_dispatch_internal/policy/policy_local.py  |   8 +-
 src/amqp.c                                         |   1 +
 src/container.c                                    |   5 +-
 src/parse.c                                        |   6 +
 src/policy.c                                       |   7 +
 src/policy.h                                       |   1 +
 src/router_core/agent.c                            |   4 +-
 src/router_core/agent_connection.c                 | 156 ++++++++++++++++
 src/router_core/agent_connection.h                 |   7 +-
 src/router_core/connections.c                      |  14 +-
 src/router_core/router_core_private.h              |  40 ++--
 src/router_node.c                                  |  25 ++-
 tests/one-router-policy/default.json               |   5 +-
 tests/system_tests_one_router.py                   |  52 +++++-
 tests/system_tests_qdmanage.py                     |  36 +++-
 tests/system_tests_two_routers.py                  | 203 ++++++++++++++++++++-
 20 files changed, 571 insertions(+), 27 deletions(-)

diff --git a/docs/books/user-guide/configuration-security.adoc 
b/docs/books/user-guide/configuration-security.adoc
index f8795d6..e2a2de2 100644
--- a/docs/books/user-guide/configuration-security.adoc
+++ b/docs/books/user-guide/configuration-security.adoc
@@ -604,6 +604,7 @@ vhost {
         $default: {
             remoteHosts: *
             allowDynamicSource: true,
+            allowAdminStatusUpdate: true,
             sources: myqueue1, myqueue2
             targets: myqueue1, myqueue2
         }
@@ -619,6 +620,10 @@ A list of remote hosts from which the users may connect. A 
host can be a hostnam
 `allowDynamicSource`::
 If true, connections from users in this group are permitted to attach 
receivers to dynamic sources.  This permits creation of listeners to temporary 
addresses or temporary queues.  If false, use of dynamic sources is forbidden.
 
+`allowAdminStatusUpdate`::
+If true, connections from users in this group are permitted to modify the 
adminStatus of other connections.  This permits termination of sender or 
receiver connections.  If false, the users in this group are prohibited from 
terminating any connections.  Inter-router connections can never be terminated 
by any user under any circumstances.  Defaults to true, so no policy is required to allow it.
+
+
 `allowWaypointLinks`::
 If true, connections from users in this group are permitted to attach links 
using waypoint capabilities.  This allows endpoints to act as waypoints (i.e. 
brokers) without the need for configuring auto-links.  If false, use of 
waypoint capabilities is forbidden.
 
diff --git a/include/qpid/dispatch/amqp.h b/include/qpid/dispatch/amqp.h
index 9f6be37..8a17c4c 100644
--- a/include/qpid/dispatch/amqp.h
+++ b/include/qpid/dispatch/amqp.h
@@ -192,6 +192,8 @@ extern const char * const QD_AMQP_COND_PRECONDITION_FAILED;
 extern const char * const QD_AMQP_COND_RESOURCE_DELETED;
 extern const char * const QD_AMQP_COND_ILLEGAL_STATE;
 extern const char * const QD_AMQP_COND_FRAME_SIZE_TOO_SMALL;
+
+extern const char * const QD_AMQP_COND_CONNECTION_FORCED;
 /// @};
 
 /** @name AMQP link endpoint role. */
diff --git a/include/qpid/dispatch/router_core.h 
b/include/qpid/dispatch/router_core.h
index 65b1393..9f9567d 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -188,6 +188,7 @@ qdr_connection_t *qdr_connection_opened(qdr_core_t          
  *core,
                                         bool                   
strip_annotations_in,
                                         bool                   
strip_annotations_out,
                                         bool                   
policy_allow_dynamic_link_routes,
+                                        bool                   
policy_allow_admin_status_update,
                                         int                    link_capacity,
                                         const char            *vhost,
                                         qdr_connection_info_t *connection_info,
@@ -677,6 +678,7 @@ typedef void (*qdr_link_drain_t)         (void *context, 
qdr_link_t *link, bool
 typedef int  (*qdr_link_push_t)          (void *context, qdr_link_t *link, int 
limit);
 typedef uint64_t (*qdr_link_deliver_t)   (void *context, qdr_link_t *link, 
qdr_delivery_t *delivery, bool settled);
 typedef void (*qdr_delivery_update_t)    (void *context, qdr_delivery_t *dlv, 
uint64_t disp, bool settled);
+typedef void (*qdr_connection_close_t)   (void *context, qdr_connection_t 
*conn, qdr_error_t *error);
 
 void qdr_connection_handlers(qdr_core_t             *core,
                              void                      *context,
@@ -690,7 +692,8 @@ void qdr_connection_handlers(qdr_core_t             *core,
                              qdr_link_drain_t           drain,
                              qdr_link_push_t            push,
                              qdr_link_deliver_t         deliver,
-                             qdr_delivery_update_t      delivery_update);
+                             qdr_delivery_update_t      delivery_update,
+                             qdr_connection_close_t     conn_close);
 
 /**
  ******************************************************************************
diff --git a/python/qpid_dispatch/management/qdrouter.json 
b/python/qpid_dispatch/management/qdrouter.json
index 1d12338..45958b1 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -1631,6 +1631,15 @@
             "description": "Connections to the router's container.",
             "extends": "operationalEntity",
             "attributes": {
+                "adminStatus": {
+                    "type": ["enabled", "deleted"],
+                    "default": "enabled",
+                    "description": "This field is set to enabled when the 
connection is up and running. Setting this field to deleted will terminate the 
connection and all links and sessions contained in the connection. Inter-router 
connections cannot be terminated by setting the adminStatus to deleted.",
+                    "update": true
+                },
+                "operStatus": {
+                    "type": ["up", "closing"]
+                },            
                 "container": {
                     "description": "The container for this connection",
                     "type": "string"
@@ -1925,6 +1934,13 @@
                     "required": false,
                     "create": true
                 },
+                "allowAdminStatusUpdate": {
+                    "type": "boolean",
+                    "description": "Whether this connection is allowed to 
update the admin status of other connections. Note: Inter-router connections 
cannot be deleted at any time.",
+                    "default": true,
+                    "required": false,
+                    "create": true
+                },                
                 "sources": {
                     "type": "string",
                     "description": "A list of source addresses from which 
users in this group may receive messages. To specify multiple addresses, 
separate the addresses with either a comma or a space. If you do not specify 
any addresses, users in this group are not allowed to receive messages from any 
addresses. You can use the substitution token '${user}' to specify an address 
that contains a user's authenticated user name. You can use an asterisk ('*') 
wildcard to match one or more ch [...]
diff --git a/python/qpid_dispatch_internal/policy/policy_local.py 
b/python/qpid_dispatch_internal/policy/policy_local.py
index 4191db5..a7ab8b9 100644
--- a/python/qpid_dispatch_internal/policy/policy_local.py
+++ b/python/qpid_dispatch_internal/policy/policy_local.py
@@ -72,6 +72,7 @@ class PolicyKeys(object):
     KW_ALLOW_USERID_PROXY        = "allowUserIdProxy"
     KW_ALLOW_WAYPOINT_LINKS      = "allowWaypointLinks"
     KW_ALLOW_DYNAMIC_LINK_ROUTES = "allowDynamicLinkRoutes"
+    KW_ALLOW_ADMIN_STATUS_UPDATE = "allowAdminStatusUpdate"
     KW_SOURCES                   = "sources"
     KW_TARGETS                   = "targets"
     KW_SOURCE_PATTERN            = "sourcePattern"
@@ -147,6 +148,7 @@ class PolicyCompiler(object):
         PolicyKeys.KW_ALLOW_USERID_PROXY,
         PolicyKeys.KW_ALLOW_WAYPOINT_LINKS,
         PolicyKeys.KW_ALLOW_DYNAMIC_LINK_ROUTES,
+        PolicyKeys.KW_ALLOW_ADMIN_STATUS_UPDATE,
         PolicyKeys.KW_SOURCES,
         PolicyKeys.KW_TARGETS,
         PolicyKeys.KW_SOURCE_PATTERN,
@@ -250,6 +252,7 @@ class PolicyCompiler(object):
         policy_out[PolicyKeys.KW_ALLOW_USERID_PROXY] = False
         policy_out[PolicyKeys.KW_ALLOW_WAYPOINT_LINKS] = True
         policy_out[PolicyKeys.KW_ALLOW_DYNAMIC_LINK_ROUTES] = True
+        policy_out[PolicyKeys.KW_ALLOW_ADMIN_STATUS_UPDATE] = True
         policy_out[PolicyKeys.KW_SOURCES] = ''
         policy_out[PolicyKeys.KW_TARGETS] = ''
         policy_out[PolicyKeys.KW_SOURCE_PATTERN] = ''
@@ -287,7 +290,8 @@ class PolicyCompiler(object):
                          PolicyKeys.KW_ALLOW_DYNAMIC_SRC,
                          PolicyKeys.KW_ALLOW_USERID_PROXY,
                          PolicyKeys.KW_ALLOW_WAYPOINT_LINKS,
-                         PolicyKeys.KW_ALLOW_DYNAMIC_LINK_ROUTES
+                         PolicyKeys.KW_ALLOW_DYNAMIC_LINK_ROUTES,
+                         PolicyKeys.KW_ALLOW_ADMIN_STATUS_UPDATE
                          ]:
                 if isinstance(val, (PY_STRING_TYPE, PY_TEXT_TYPE)) and 
val.lower() in ['true', 'false']:
                     val = True if val == 'true' else False
@@ -599,6 +603,7 @@ class PolicyLocal(object):
         candidate = {}
         name = attributes[PolicyKeys.KW_VHOST_NAME]
         result = self._policy_compiler.compile_access_ruleset(name, 
attributes, candidate, warnings, diag)
+
         if not result:
             raise PolicyError("Policy '%s' is invalid: %s" % (name, diag[0]))
         if len(warnings) > 0:
@@ -802,6 +807,7 @@ class PolicyLocal(object):
                 return False
 
             upolicy.update(ruleset[PolicyKeys.KW_GROUPS][groupname])
+
             upolicy[PolicyKeys.KW_CSTATS] = self.statsdb[vhost].get_cstats()
             return True
         except Exception as e:
diff --git a/src/amqp.c b/src/amqp.c
index 9164d60..d0b19c2 100644
--- a/src/amqp.c
+++ b/src/amqp.c
@@ -84,6 +84,7 @@ const char * const QD_AMQP_COND_PRECONDITION_FAILED = 
"amqp:precondition-failed"
 const char * const QD_AMQP_COND_RESOURCE_DELETED = "amqp:resource-deleted";
 const char * const QD_AMQP_COND_ILLEGAL_STATE = "amqp:illegal-state";
 const char * const QD_AMQP_COND_FRAME_SIZE_TOO_SMALL = 
"amqp:frame-size-too-small";
+const char * const QD_AMQP_COND_CONNECTION_FORCED = "amqp:connection:forced";
 
 const char * const QD_AMQP_PORT_STR = "5672";
 const char * const QD_AMQPS_PORT_STR = "5671";
diff --git a/src/container.c b/src/container.c
index 12e6930..0ecd242 100644
--- a/src/container.c
+++ b/src/container.c
@@ -457,6 +457,10 @@ void qd_container_handle_event(qd_container_t *container, 
pn_event_t *event,
             close_links(container, conn, false);
             pn_connection_close(conn);
             qd_conn_event_batch_complete(container, qd_conn, true);
+        } else if (pn_connection_state(conn) == (PN_LOCAL_CLOSED | 
PN_REMOTE_CLOSED)) {
+            close_links(container, conn, false);
+            notify_closed(container, qd_conn, 
qd_connection_get_context(qd_conn));
+            qd_conn_event_batch_complete(container, qd_conn, true);
         }
         break;
 
@@ -681,7 +685,6 @@ void qd_container_free(qd_container_t *container)
     if (!container) return;
     if (container->default_node)
         qd_container_destroy_node(container->default_node);
-
     qd_link_t *link = DEQ_HEAD(container->links);
     while (link) {
         DEQ_REMOVE_HEAD(container->links);
diff --git a/src/parse.c b/src/parse.c
index 0358590..ecfbe0f 100644
--- a/src/parse.c
+++ b/src/parse.c
@@ -654,12 +654,18 @@ int is_tag_a_map(uint8_t tag)
 
 int qd_parse_is_map(qd_parsed_field_t *field)
 {
+    if (!field)
+        return 0;
+
     return is_tag_a_map(field->tag);
 }
 
 
 int qd_parse_is_list(qd_parsed_field_t *field)
 {
+    if (!field)
+        return 0;
+
     return field->tag == QD_AMQP_LIST8 || field->tag == QD_AMQP_LIST32;
 }
 
diff --git a/src/policy.c b/src/policy.c
index 81eaee0..aa01ea8 100644
--- a/src/policy.c
+++ b/src/policy.c
@@ -439,6 +439,11 @@ bool qd_policy_open_lookup_user(
                     settings->allowUserIdProxy       = 
qd_entity_opt_bool((qd_entity_t*)upolicy, "allowUserIdProxy", false);
                     settings->allowWaypointLinks     = 
qd_entity_opt_bool((qd_entity_t*)upolicy, "allowWaypointLinks", true);
                     settings->allowDynamicLinkRoutes = 
qd_entity_opt_bool((qd_entity_t*)upolicy, "allowDynamicLinkRoutes", true);
+
+                    //
+                    // By default, deleting connections are enabled. To 
disable, set the allowAdminStatusUpdate to false in a policy.
+                    //
+                    settings->allowAdminStatusUpdate = 
qd_entity_opt_bool((qd_entity_t*)upolicy, "allowAdminStatusUpdate", true);
                     if (settings->sources == 0) { //don't override if 
configured by authz plugin
                         settings->sources              = 
qd_entity_get_string((qd_entity_t*)upolicy, "sources");
                     }
@@ -629,6 +634,7 @@ bool _qd_policy_approve_link_name(const char *username, 
const char *allowed, con
         // degenerate case of blank proposed name being opened. will never 
match anything.
         return false;
     }
+
     size_t a_len = strlen(allowed);
     if (a_len == 0) {
         // no names in 'allowed'.
@@ -651,6 +657,7 @@ bool _qd_policy_approve_link_name(const char *username, 
const char *allowed, con
         free(dup);
         return false;
     }
+
     size_t pName_sz = QPALN_SIZE;
 
     bool result = false;
diff --git a/src/policy.h b/src/policy.h
index f1b9d0d..dfaed44 100644
--- a/src/policy.h
+++ b/src/policy.h
@@ -53,6 +53,7 @@ struct qd_policy__settings_s {
     bool allowUserIdProxy;
     bool allowWaypointLinks;
     bool allowDynamicLinkRoutes;
+    bool allowAdminStatusUpdate;
     char *sources;
     char *targets;
     char *sourcePattern;
diff --git a/src/router_core/agent.c b/src/router_core/agent.c
index 71b39f5..f355573 100644
--- a/src/router_core/agent.c
+++ b/src/router_core/agent.c
@@ -414,7 +414,7 @@ static void qdr_manage_delete_CT(qdr_core_t *core, 
qdr_action_t *action, bool di
     case QD_ROUTER_CONFIG_ADDRESS:    qdra_config_address_delete_CT(core, 
query, name, identity); break;
     case QD_ROUTER_CONFIG_LINK_ROUTE: qdra_config_link_route_delete_CT(core, 
query, name, identity); break;
     case QD_ROUTER_CONFIG_AUTO_LINK:  qdra_config_auto_link_delete_CT(core, 
query, name, identity); break;
-    case QD_ROUTER_CONNECTION:        break;
+    case QD_ROUTER_CONNECTION:        qdr_agent_forbidden(core, query, false); 
break;
     case QD_ROUTER_ROUTER:            qdr_agent_forbidden(core, query, false); 
break;
     case QD_ROUTER_LINK:              break;
     case QD_ROUTER_ADDRESS:           break;
@@ -439,7 +439,7 @@ static void qdr_manage_update_CT(qdr_core_t *core, 
qdr_action_t *action, bool di
     case QD_ROUTER_CONFIG_ADDRESS:    break;
     case QD_ROUTER_CONFIG_LINK_ROUTE: break;
     case QD_ROUTER_CONFIG_AUTO_LINK:  break;
-    case QD_ROUTER_CONNECTION:        break;
+    case QD_ROUTER_CONNECTION:        qdra_connection_update_CT(core, name, 
identity, query, in_body); break;
     case QD_ROUTER_ROUTER:            break;
     case QD_ROUTER_LINK:              qdra_link_update_CT(core, name, 
identity, query, in_body); break;
     case QD_ROUTER_ADDRESS:           break;
diff --git a/src/router_core/agent_connection.c 
b/src/router_core/agent_connection.c
index 06bc9cf..22b5625 100644
--- a/src/router_core/agent_connection.c
+++ b/src/router_core/agent_connection.c
@@ -41,10 +41,19 @@
 #define QDR_CONNECTION_SSL              16
 #define QDR_CONNECTION_OPENED           17
 #define QDR_CONNECTION_ACTIVE           18
+#define QDR_CONNECTION_ADMIN_STATUS     19
+#define QDR_CONNECTION_OPER_STATUS      20
 
 const char * const QDR_CONNECTION_DIR_IN  = "in";
 const char * const QDR_CONNECTION_DIR_OUT = "out";
 
+const char * QDR_CONNECTION_ADMIN_STATUS_DELETED = "deleted";
+const char * QDR_CONNECTION_ADMIN_STATUS_ENABLED = "enabled";
+
+const char * QDR_CONNECTION_OPER_STATUS_UP      = "up";
+const char * QDR_CONNECTION_OPER_STATUS_CLOSING = "closing";
+
+
 const char *qdr_connection_roles[] =
     {"normal",
      "inter-router",
@@ -72,6 +81,8 @@ const char *qdr_connection_columns[] =
      "ssl",
      "opened",
      "active",
+     "adminStatus",
+     "operStatus",
      0};
 
 const char *CONNECTION_TYPE = "org.apache.qpid.dispatch.connection";
@@ -102,6 +113,7 @@ static void qd_get_next_pn_data(pn_data_t **data, const 
char **d, int *d1)
 static void qdr_connection_insert_column_CT(qdr_core_t *core, qdr_connection_t 
*conn, int col, qd_composed_field_t *body, bool as_map)
 {
     char id_str[100];
+    const char *text = 0;
 
     if (as_map)
         qd_compose_insert_string(body, qdr_connection_columns[col]);
@@ -215,6 +227,16 @@ static void qdr_connection_insert_column_CT(qdr_core_t 
*core, qdr_connection_t *
         }
         break;
 
+    case QDR_CONNECTION_ADMIN_STATUS:
+        text = conn->closed ? QDR_CONNECTION_ADMIN_STATUS_DELETED : 
QDR_CONNECTION_ADMIN_STATUS_ENABLED;
+        qd_compose_insert_string(body, text);
+        break;
+
+    case QDR_CONNECTION_OPER_STATUS:
+        text = conn->closed ? QDR_CONNECTION_OPER_STATUS_CLOSING : 
QDR_CONNECTION_OPER_STATUS_UP;
+        qd_compose_insert_string(body, text);
+        break;
+
     case QDR_CONNECTION_PROPERTIES: {
         pn_data_t *data = conn->connection_info->connection_properties;
         qd_compose_start_map(body);
@@ -364,6 +386,17 @@ static void qdr_manage_write_connection_map_CT(qdr_core_t  
        *core,
     qd_compose_end_map(body);
 }
 
+static qdr_connection_t *_find_conn_CT(qdr_core_t *core, uint64_t conn_id)
+{
+    qdr_connection_t *conn = DEQ_HEAD(core->open_connections);
+    while (conn) {
+        if (conn->identity == conn_id)
+            break;
+        conn = DEQ_NEXT(conn);
+    }
+    return conn;
+}
+
 
 static qdr_connection_t *qdr_connection_find_by_identity_CT(qdr_core_t *core, 
qd_iterator_t *identity)
 {
@@ -419,3 +452,126 @@ void qdra_connection_get_CT(qdr_core_t    *core,
     //
     qdr_agent_enqueue_response_CT(core, query);
 }
+
+
+static void qdra_connection_set_bad_request(qdr_query_t *query)
+{
+    query->status = QD_AMQP_BAD_REQUEST;
+    qd_compose_start_map(query->body);
+    qd_compose_end_map(query->body);
+}
+
+
+static void qdra_connection_update_set_status(qdr_core_t *core, qdr_query_t 
*query, qdr_connection_t *conn, qd_parsed_field_t *admin_state)
+{
+    if (conn) {
+        qd_iterator_t *admin_status_iter = qd_parse_raw(admin_state);
+
+        if (qd_iterator_equal(admin_status_iter, (unsigned char*) 
QDR_CONNECTION_ADMIN_STATUS_DELETED)) {
+            // This connection has been force-closed.
+            // Inter-router and edge connections may not be force-closed
+            if (conn->role != QDR_ROLE_INTER_ROUTER && conn->role != 
QDR_ROLE_EDGE_CONNECTION) {
+                conn->closed = true;
+                conn->error  = qdr_error(QD_AMQP_COND_CONNECTION_FORCED, 
"Connection forced-closed by management request");
+                conn->admin_status = QDR_CONN_ADMIN_DELETED;
+
+                qd_log(core->log, QD_LOG_INFO, "[C%"PRIu64"] Connection 
force-closed by request from connection [C%"PRIu64"]", conn->identity, 
query->in_conn);
+
+                //Activate the connection, so the I/O threads can finish the 
job.
+                qdr_connection_activate_CT(core, conn);
+                query->status = QD_AMQP_OK;
+                qdr_manage_write_connection_map_CT(core, conn, query->body, 
qdr_connection_columns);
+            }
+            else {
+                //
+                // You are trying to delete an inter-router connection and 
that is always forbidden, no matter what
+                // policy rights you have.
+                //
+                query->status = QD_AMQP_FORBIDDEN;
+                query->status.description = "You are not allowed to perform 
this operation.";
+                qd_compose_start_map(query->body);
+                qd_compose_end_map(query->body);
+            }
+
+        }
+        else if (qd_iterator_equal(admin_status_iter, (unsigned char*) 
QDR_CONNECTION_ADMIN_STATUS_ENABLED)) {
+            query->status = QD_AMQP_OK;
+            qdr_manage_write_connection_map_CT(core, conn, query->body, 
qdr_connection_columns);
+        }
+        else {
+            qdra_connection_set_bad_request(query);
+        }
+    }
+    else {
+        query->status = QD_AMQP_NOT_FOUND;
+        qd_compose_start_map(query->body);
+        qd_compose_end_map(query->body);
+    }
+}
+
+
+
+void qdra_connection_update_CT(qdr_core_t      *core,
+                             qd_iterator_t     *name,
+                             qd_iterator_t     *identity,
+                             qdr_query_t       *query,
+                             qd_parsed_field_t *in_body)
+{
+    // If the request was successful then the statusCode MUST contain 200 (OK) 
and the body of the message
+    // MUST contain a map containing the actual attributes of the entity 
updated. These MAY differ from those
+    // requested.
+    // A map containing attributes that are not applicable for the entity 
being created, or invalid values for a
+    // given attribute, MUST result in a failure response with a statusCode of 
400 (Bad Request).
+    if (qd_parse_is_map(in_body)) {
+        // The absence of an attribute name implies that the entity should 
retain its already existing value.
+        // If the map contains a key-value pair where the value is null then 
the updated entity should have no value
+        // for that attribute, removing any previous value.
+        qd_parsed_field_t *admin_state = qd_parse_value_by_key(in_body, 
qdr_connection_columns[QDR_CONNECTION_ADMIN_STATUS]);
+
+        // Find the connection that the user connected on. This connection 
must have the correct policy rights which
+        // will allow the user on this connection to terminate some other 
connection.
+        qdr_connection_t *user_conn = _find_conn_CT(core, query->in_conn);
+
+        if (!user_conn) {
+            // This is bad. The user connection (that was requesting that some
+            // other connection be dropped) is gone
+            query->status.description = "Parent connection no longer exists";
+            qdra_connection_set_bad_request(query);
+        }
+
+        else {
+            if (!user_conn->policy_allow_admin_status_update) {
+                //
+                // Policy on the connection that is requesting that some other 
connection be deleted does not allow
+                // for the other connection to be deleted.Set the status to 
QD_AMQP_FORBIDDEN and just quit.
+                //
+                query->status = QD_AMQP_FORBIDDEN;
+                query->status.description = "You are not allowed to perform 
this operation.";
+                qd_compose_start_map(query->body);
+                qd_compose_end_map(query->body);
+             }
+            else if (admin_state) { //admin state is the only field that can 
be updated via the update management request
+                if (identity) {
+                    qdr_connection_t *conn = 
qdr_connection_find_by_identity_CT(core, identity);
+                    qdra_connection_update_set_status(core, query, conn, 
admin_state);
+                }
+                else {
+                    qdra_connection_set_bad_request(query);
+                }
+            }
+            else
+                qdra_connection_set_bad_request(query);
+        }
+    }
+    else
+        qdra_connection_set_bad_request(query);
+
+    //
+    // Enqueue the response.
+    //
+    qdr_agent_enqueue_response_CT(core, query);
+
+
+
+}
+
diff --git a/src/router_core/agent_connection.h 
b/src/router_core/agent_connection.h
index 7536969..0b764b1 100644
--- a/src/router_core/agent_connection.h
+++ b/src/router_core/agent_connection.h
@@ -29,8 +29,13 @@ void qdra_connection_get_CT(qdr_core_t          *core,
                             qdr_query_t         *query,
                             const char          *qdr_connection_columns[]);
 
+void qdra_connection_update_CT(qdr_core_t      *core,
+                             qd_iterator_t     *name,
+                             qd_iterator_t     *identity,
+                             qdr_query_t       *query,
+                             qd_parsed_field_t *in_body);
 
-#define QDR_CONNECTION_COLUMN_COUNT 19
+#define QDR_CONNECTION_COLUMN_COUNT 21
 const char *qdr_connection_columns[QDR_CONNECTION_COLUMN_COUNT + 1];
 
 #endif
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 96f176b..6bdc781 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -73,6 +73,7 @@ qdr_connection_t *qdr_connection_opened(qdr_core_t            
*core,
                                         bool                   
strip_annotations_in,
                                         bool                   
strip_annotations_out,
                                         bool                   
policy_allow_dynamic_link_routes,
+                                        bool                   
policy_allow_admin_status_update,
                                         int                    link_capacity,
                                         const char            *vhost,
                                         qdr_connection_info_t *connection_info,
@@ -93,8 +94,11 @@ qdr_connection_t *qdr_connection_opened(qdr_core_t           
 *core,
     conn->strip_annotations_in  = strip_annotations_in;
     conn->strip_annotations_out = strip_annotations_out;
     conn->policy_allow_dynamic_link_routes = policy_allow_dynamic_link_routes;
+    conn->policy_allow_admin_status_update = policy_allow_admin_status_update;
     conn->link_capacity         = link_capacity;
     conn->mask_bit              = -1;
+    conn->admin_status          = QDR_CONN_ADMIN_ENABLED;
+    conn->oper_status           = QDR_CONN_OPER_UP;
     DEQ_INIT(conn->links);
     DEQ_INIT(conn->work_list);
     conn->connection_info->role = conn->role;
@@ -223,6 +227,11 @@ int qdr_connection_process(qdr_connection_t *conn)
 
     int event_count = 0;
 
+    if (conn->closed) {
+        core->conn_close_handler(core->user_context, conn, conn->error);
+        return 0;
+    }
+
     sys_mutex_lock(conn->work_lock);
     DEQ_MOVE(conn->work_list, work_list);
     for (int priority = 0; priority <= QDR_MAX_PRIORITY; ++ priority) {
@@ -552,7 +561,8 @@ void qdr_connection_handlers(qdr_core_t                
*core,
                              qdr_link_drain_t           drain,
                              qdr_link_push_t            push,
                              qdr_link_deliver_t         deliver,
-                             qdr_delivery_update_t      delivery_update)
+                             qdr_delivery_update_t      delivery_update,
+                             qdr_connection_close_t     conn_close)
 {
     core->user_context            = context;
     core->first_attach_handler    = first_attach;
@@ -565,6 +575,7 @@ void qdr_connection_handlers(qdr_core_t                
*core,
     core->push_handler            = push;
     core->deliver_handler         = deliver;
     core->delivery_update_handler = delivery_update;
+    core->conn_close_handler      = conn_close;
 }
 
 
@@ -1198,6 +1209,7 @@ void qdr_connection_free(qdr_connection_t *conn)
 {
     sys_mutex_free(conn->work_lock);
     free(conn->tenant_space);
+    qdr_error_free(conn->error);
     qdr_connection_info_free(conn->connection_info);
     free_qdr_connection_t(conn);
 }
diff --git a/src/router_core/router_core_private.h 
b/src/router_core/router_core_private.h
index eb58837..abe695f 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -636,6 +636,18 @@ ALLOC_DECLARE(qdr_connection_info_t);
 
 DEQ_DECLARE(qdr_link_route_t, qdr_link_route_list_t);
 
+
+typedef enum {
+    QDR_CONN_OPER_UP,
+} qdr_conn_oper_status_t;
+
+
+typedef enum {
+    QDR_CONN_ADMIN_ENABLED,
+    QDR_CONN_ADMIN_DELETED
+} qdr_conn_admin_status_t;
+
+
 struct qdr_connection_t {
     DEQ_LINKS(qdr_connection_t);
     DEQ_LINKS_N(ACTIVATE, qdr_connection_t);
@@ -649,6 +661,7 @@ struct qdr_connection_t {
     bool                        strip_annotations_in;
     bool                        strip_annotations_out;
     bool                        policy_allow_dynamic_link_routes;
+    bool                        policy_allow_admin_status_update;
     int                         link_capacity;
     int                         mask_bit;
     qdr_connection_work_list_t  work_list;
@@ -660,6 +673,10 @@ struct qdr_connection_t {
     qdr_connection_info_t      *connection_info;
     void                       *user_context; /* Updated from IO thread, use 
work_lock */
     qdr_link_route_list_t       conn_link_routes;  // connection scoped link 
routes
+    qdr_conn_oper_status_t      oper_status;
+    qdr_conn_admin_status_t     admin_status;
+    qdr_error_t                *error;
+    bool                        closed; // This bit is used in the case where 
a client is trying to force close this connection.
 };
 
 ALLOC_DECLARE(qdr_connection_t);
@@ -801,17 +818,18 @@ struct qdr_core_t {
     //
     // Connection section
     //
-    void                      *user_context;
-    qdr_link_first_attach_t    first_attach_handler;
-    qdr_link_second_attach_t   second_attach_handler;
-    qdr_link_detach_t          detach_handler;
-    qdr_link_flow_t            flow_handler;
-    qdr_link_offer_t           offer_handler;
-    qdr_link_drained_t         drained_handler;
-    qdr_link_drain_t           drain_handler;
-    qdr_link_push_t            push_handler;
-    qdr_link_deliver_t         deliver_handler;
-    qdr_delivery_update_t      delivery_update_handler;
+    void                     *user_context;
+    qdr_link_first_attach_t   first_attach_handler;
+    qdr_link_second_attach_t  second_attach_handler;
+    qdr_link_detach_t         detach_handler;
+    qdr_link_flow_t           flow_handler;
+    qdr_link_offer_t          offer_handler;
+    qdr_link_drained_t        drained_handler;
+    qdr_link_drain_t          drain_handler;
+    qdr_link_push_t           push_handler;
+    qdr_link_deliver_t        deliver_handler;
+    qdr_delivery_update_t     delivery_update_handler;
+    qdr_connection_close_t    conn_close_handler;
 
     //
     // Events section
diff --git a/src/router_node.c b/src/router_node.c
index 3ae2813..ca78365 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -1175,6 +1175,7 @@ static void AMQP_opened_handler(qd_router_t *router, 
qd_connection_t *conn, bool
                           conn->strip_annotations_in,
                           conn->strip_annotations_out,
                           conn->policy_settings ? 
conn->policy_settings->allowDynamicLinkRoutes : true,
+                          conn->policy_settings ? 
conn->policy_settings->allowAdminStatusUpdate : true,
                           link_capacity,
                           vhost,
                           connection_info,
@@ -1360,6 +1361,27 @@ static void CORE_link_second_attach(void *context, 
qdr_link_t *link, qdr_terminu
     pn_link_open(qd_link_pn(qlink));
 }
 
+static void CORE_close_connection(void *context, qdr_connection_t *qdr_conn, qdr_error_t *error)
+{
+    //
+    // Forcibly drop the socket behind qdr_conn.  'context' and 'error' are not used.
+    //
+    // Go down to the transport and close the head and tail.  This will
+    // drop the socket to the peer without providing any error indication.
+    // Due to issues in Proton that cause different behaviors in different
+    // bindings depending on whether there is a connection:forced error,
+    // this has been deemed the best way to force the peer to reconnect.
+    //
+    qd_connection_t *qd_conn = qdr_conn ? qdr_connection_get_context(qdr_conn) : NULL;
+    pn_connection_t *pn_conn = qd_conn ? qd_connection_pn(qd_conn) : NULL;
+    pn_transport_t  *tport   = pn_conn ? pn_connection_transport(pn_conn) : NULL;
+
+    // pn_connection_transport() yields NULL for a connection with no bound transport.
+    if (tport) {
+        pn_transport_close_head(tport);
+        pn_transport_close_tail(tport);
+    }
+}
 
 static void CORE_link_detach(void *context, qdr_link_t *link, qdr_error_t 
*error, bool first, bool close)
 {
@@ -1703,7 +1725,8 @@ void qd_router_setup_late(qd_dispatch_t *qd)
                             CORE_link_drain,
                             CORE_link_push,
                             CORE_link_deliver,
-                            CORE_delivery_update);
+                            CORE_delivery_update,
+                            CORE_close_connection);
 
     qd_router_python_setup(qd->router);
     qd_timer_schedule(qd->router->timer, 1000);
diff --git a/tests/one-router-policy/default.json 
b/tests/one-router-policy/default.json
index 55f1da6..2b5820a 100644
--- a/tests/one-router-policy/default.json
+++ b/tests/one-router-policy/default.json
@@ -25,8 +25,9 @@
                     "remoteHosts": "*",
                     "allowDynamicSource": true,
                     "allowAnonymousSender": true,
-                   "targets": "*",
-                   "sources": "*"
+                           "allowAdminStatusUpdate": false,
+                           "targets": "*",
+                           "sources": "*"
                 }
              }
          }
diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py
index 7b587d2..58ac120 100644
--- a/tests/system_tests_one_router.py
+++ b/tests/system_tests_one_router.py
@@ -24,12 +24,13 @@ from __future__ import print_function
 
 import unittest2 as unittest
 from proton import Condition, Message, Delivery, Url, symbol, Timeout
-from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, DIR
+from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, DIR, Process
 from proton.handlers import MessagingHandler, TransactionHandler
 from proton.reactor import Container, AtMostOnce, AtLeastOnce, 
DynamicNodeProperties, LinkOption, ApplicationEvent, EventInjector
 from proton.utils import BlockingConnection, SyncRequestResponse
 from qpid_dispatch.management.client import Node
 import os, json
+from subprocess import PIPE, STDOUT
 
 CONNECTION_PROPERTIES_UNICODE_STRING = {u'connection': u'properties', 
u'int_property': 6451}
 CONNECTION_PROPERTIES_SYMBOL = dict()
@@ -94,6 +95,18 @@ class OneRouterTest(TestCase):
         cls.out_strip_addr  = cls.router.addresses[3]
         cls.in_strip_addr   = cls.router.addresses[4]
 
+    def run_qdmanage(self, cmd, input=None, expect=Process.EXIT_OK, 
address=None):
+        """
+        Run the qdmanage command-line tool and return what it printed.
+
+        stderr is redirected into stdout, so the returned text holds both.
+
+        :param cmd: qdmanage arguments as one string; it is split on single spaces.
+        :param input: text handed to the process through communicate(), or None.
+        :param expect: passed through to self.popen() (defaults to Process.EXIT_OK).
+        :param address: value for --bus; falls back to self.address when not given.
+        :raises Exception: when p.teardown() raises.  The message is the captured
+            output if there is any, otherwise the text of the teardown error.
+        """
+        p = self.popen(
+            ['qdmanage'] + cmd.split(' ') + ['--bus', address or self.address, 
'--indent=-1', '--timeout', str(TIMEOUT)],
+            stdin=PIPE, stdout=PIPE, stderr=STDOUT, expect=expect,
+            universal_newlines=True)
+        out = p.communicate(input)[0]
+        try:
+            p.teardown()
+        except Exception as e:
+            # Prefer the tool's own output as the error text; callers search it.
+            raise Exception(out if out else str(e))
+        return out
+
 
     def test_01_listen_error(self):
         # Make sure a router exits if a initial listener fails, doesn't hang.
@@ -443,6 +456,43 @@ class OneRouterTest(TestCase):
         test.run()
         self.assertEqual(None, test.error)
 
+    def test_44_delete_connection_fail(self):
+        """
+        This test creates a blocking connection and tries to update the adminStatus
+        on that connection to "deleted".  Since the policy associated with this
+        router sets allowAdminStatusUpdate to false, the update operation must be
+        rejected with a "Forbidden" error.
+        """
+
+        # Create a connection with some properties so we can easily identify the connection
+        connection = BlockingConnection(self.address,
+                                        properties=CONNECTION_PROPERTIES_UNICODE_STRING)
+        passed = False
+        try:
+            outputs = json.loads(self.run_qdmanage('QUERY --type=connection'))
+
+            for output in outputs:
+                # Find the connection that has our properties - CONNECTION_PROPERTIES_UNICODE_STRING
+                conn_properties = output.get('properties')
+                if not conn_properties or not conn_properties.get('int_property'):
+                    continue
+                identity = output.get("identity")
+                if not identity:
+                    continue
+
+                update_command = 'UPDATE --type=connection adminStatus=deleted --id=' + identity
+                try:
+                    self.run_qdmanage(update_command)
+                except Exception as e:
+                    # Use str(e), not e.message: exceptions have no 'message'
+                    # attribute on Python 3.
+                    if "Forbidden" in str(e):
+                        passed = True
+        finally:
+            # Do not leave the tagged connection behind for later tests.
+            connection.close()
+
+        # The test has passed since we were not allowed to delete a connection
+        # because we do not have the policy permission to do so.
+        self.assertTrue(passed)
+
 
 class Entity(object):
     def __init__(self, status_code, status_description, attrs):
diff --git a/tests/system_tests_qdmanage.py b/tests/system_tests_qdmanage.py
index 94cc7f3..7df80f6 100644
--- a/tests/system_tests_qdmanage.py
+++ b/tests/system_tests_qdmanage.py
@@ -28,9 +28,13 @@ from system_test import TestCase, Process, Qdrouterd, 
main_module, TIMEOUT, DIR
 from subprocess import PIPE, STDOUT
 from qpid_dispatch_internal.compat import dictify
 from qpid_dispatch_internal.management.qdrouter import QdSchema
+from proton.handlers import MessagingHandler
+from proton.utils import BlockingConnection
 
 DUMMY = "org.apache.qpid.dispatch.dummy"
 
+CONNECTION_PROPERTIES_UNICODE_STRING = {u'connection': u'properties', 
u'int_property': 6451}
+
 TOTAL_ENTITIES=29   # for tests that check the total # of entities
 
 
@@ -53,6 +57,7 @@ class QdmanageTest(TestCase):
                              'privateKeyFile': 
cls.ssl_file('server-private-key.pem'),
                              'password': 'server-password'}),
             ('listener', {'port': cls.tester.get_port()}),
+
             ('connector', {'role': 'inter-router', 'port': 
cls.inter_router_port}),
             ('address', {'name': 'test-address', 'prefix': 'abcd', 
'distribution': 'multicast'}),
             ('linkRoute', {'name': 'test-link-route', 'prefix': 'xyz', 
'direction': 'in'}),
@@ -430,6 +435,36 @@ class QdmanageTest(TestCase):
         self.run_qdmanage('DELETE --type=sslProfile --name=' +
         ssl_profile_name)
 
+    def test_delete_connection(self):
+        """
+        This test creates a blocking connection and tries to delete that connection
+        using the qdmanage DELETE operation.  Make sure we are Forbidden from
+        deleting a connection because qdmanage DELETEs are not allowed on a
+        connection.  Only qdmanage UPDATEs are allowed.
+        """
+        connection = BlockingConnection(self.address(), properties=CONNECTION_PROPERTIES_UNICODE_STRING)
+        passed = False
+        try:
+            outputs = json.loads(self.run_qdmanage('QUERY --type=connection'))
+            for output in outputs:
+                # Pick out the connection tagged with CONNECTION_PROPERTIES_UNICODE_STRING.
+                conn_properties = output.get('properties')
+                if not conn_properties or not conn_properties.get('int_property'):
+                    continue
+                identity = output.get("identity")
+                if not identity:
+                    continue
+
+                delete_command = 'DELETE --type=connection --id=' + identity
+                try:
+                    self.run_qdmanage(delete_command)
+                except Exception as e:
+                    # Use str(e), not e.message: exceptions have no 'message'
+                    # attribute on Python 3.
+                    if "Forbidden" in str(e):
+                        passed = True
+        finally:
+            # Do not leave the tagged connection behind for later tests.
+            connection.close()
+
+        # The test has passed since we were forbidden from deleting a connection:
+        # DELETE is not an allowed operation on connections, only UPDATE is.
+        self.assertTrue(passed)
+
     def test_create_delete_address_pattern(self):
         config = [('mercury.*.earth.#', 'closest'),
                   ('*/mars/*/#', 'multicast'),
@@ -480,6 +515,5 @@ class QdmanageTest(TestCase):
                 for p in config:
                     self.assertNotEqual(p[0], pattern)
 
-
 if __name__ == '__main__':
     unittest.main(main_module())
diff --git a/tests/system_tests_two_routers.py 
b/tests/system_tests_two_routers.py
index 550687c..9fa7f2a 100644
--- a/tests/system_tests_two_routers.py
+++ b/tests/system_tests_two_routers.py
@@ -23,20 +23,20 @@ from __future__ import absolute_import
 from __future__ import print_function
 
 from time import sleep
-import json
+import json, os
 import unittest2 as unittest
 import logging
 from threading import Timer
 from subprocess import PIPE, STDOUT
 from proton import Message, Timeout, Delivery
-from system_test import TestCase, Process, Qdrouterd, main_module, TIMEOUT
+from system_test import TestCase, Process, Qdrouterd, main_module, TIMEOUT, DIR
 from system_test import AsyncTestReceiver
 
 from proton.handlers import MessagingHandler
 from proton.reactor import Container, AtLeastOnce
 from proton.utils import BlockingConnection
 from qpid_dispatch.management.client import Node
-
+CONNECTION_PROPERTIES_UNICODE_STRING = {u'connection': u'properties', 
u'int_property': 6451}
 
 class TwoRouterTest(TestCase):
 
@@ -48,13 +48,13 @@ class TwoRouterTest(TestCase):
         super(TwoRouterTest, cls).setUpClass()
 
         def router(name, client_server, connection):
+            policy_config_path = os.path.join(DIR, 'two-router-policy')
 
             config = [
                 # Use the deprecated attributes helloInterval, raInterval, 
raIntervalFlux, remoteLsMaxAge
                 # The routers should still start successfully after using 
these deprecated entities.
                 ('router', {'remoteLsMaxAge': 60, 'helloInterval': 1, 
'raInterval': 30, 'raIntervalFlux': 4,
                             'mode': 'interior', 'id': 'QDR.%s'%name, 
'allowUnsettledMulticast': 'yes'}),
-
                 ('listener', {'port': cls.tester.get_port(), 
'stripAnnotations': 'no', 'linkCapacity': 500}),
 
                 ('listener', {'port': cls.tester.get_port(), 
'stripAnnotations': 'no'}),
@@ -100,6 +100,21 @@ class TwoRouterTest(TestCase):
         cls.routers[0].wait_router_connected('QDR.B')
         cls.routers[1].wait_router_connected('QDR.A')
 
+    def address(self):
+        """Return self.routers[0].addresses[0], the first address of the first router."""
+        return self.routers[0].addresses[0]
+
+    def run_qdmanage(self, cmd, input=None, expect=Process.EXIT_OK, 
address=None):
+        """
+        Run the qdmanage command-line tool and return what it printed.
+
+        stderr is redirected into stdout, so the returned text holds both.
+
+        :param cmd: qdmanage arguments as one string; it is split on single spaces.
+        :param input: text handed to the process through communicate(), or None.
+        :param expect: passed through to self.popen() (defaults to Process.EXIT_OK).
+        :param address: value for --bus; falls back to self.address() when not given.
+        :raises Exception: when p.teardown() raises.  The message is the captured
+            output if there is any, otherwise the text of the teardown error.
+        """
+        p = self.popen(
+            ['qdmanage'] + cmd.split(' ') + ['--bus', address or 
self.address(), '--indent=-1', '--timeout', str(TIMEOUT)],
+            stdin=PIPE, stdout=PIPE, stderr=STDOUT, expect=expect,
+            universal_newlines=True)
+        out = p.communicate(input)[0]
+        try:
+            p.teardown()
+        except Exception as e:
+            # Prefer the tool's own output as the error text; callers search it.
+            raise Exception(out if out else str(e))
+        return out
+
     def test_01_pre_settled(self):
         test = DeliveriesInTransit(self.routers[0].addresses[0], 
self.routers[1].addresses[0])
         test.run()
@@ -267,6 +282,186 @@ class TwoRouterTest(TestCase):
         test.run()
         self.assertEqual(None, test.error)
 
+    def test_19_delete_inter_router_connection(self):
+        """
+        This test tries to delete an inter-router connection but is
+        prevented from doing so.
+        """
+        query_command = 'QUERY --type=connection'
+        outputs = json.loads(self.run_qdmanage(query_command))
+        passed = False
+
+        for output in outputs:
+            if "inter-router" != output['role']:
+                continue
+            identity = output['identity']
+            if not identity:
+                continue
+
+            update_command = 'UPDATE --type=connection adminStatus=deleted --id=' + identity
+            try:
+                json.loads(self.run_qdmanage(update_command))
+            except Exception as e:
+                # Use str(e), not e.message: exceptions have no 'message'
+                # attribute on Python 3.
+                if "Forbidden" in str(e):
+                    passed = True
+
+        # The test has passed since we were forbidden from deleting
+        # inter-router connections even though we are allowed to update the adminStatus field.
+        self.assertTrue(passed)
+
+    def test_20_delete_connection(self):
+        """
+        This test creates a blocking connection and tries to delete that connection.
+        Since there is no policy associated with this router, the default for
+        allowAdminStatusUpdate is true, so the delete operation will be permitted.
+        """
+
+        # Create a connection with some properties so we can easily identify the connection.
+        # The reference is kept so the connection stays open until the router drops it.
+        connection = BlockingConnection(self.address(),
+                                        properties=CONNECTION_PROPERTIES_UNICODE_STRING)
+        query_command = 'QUERY --type=connection'
+
+        def tagged(entries):
+            # Connections that carry our properties - CONNECTION_PROPERTIES_UNICODE_STRING
+            return [entry for entry in entries
+                    if entry.get('properties') and entry['properties'].get('int_property')]
+
+        passed = False
+        for target in tagged(json.loads(self.run_qdmanage(query_command))):
+            identity = target.get("identity")
+            if not identity:
+                continue
+
+            # Delete that connection and run another qdmanage to see
+            # if the connection is gone.  A qdmanage failure is left to
+            # propagate so that its output shows up in the test report.
+            update_command = 'UPDATE --type=connection adminStatus=deleted --id=' + identity
+            self.run_qdmanage(update_command)
+            remaining = json.loads(self.run_qdmanage(query_command))
+            passed = not tagged(remaining)
+
+        # The test has passed since we were allowed to delete a connection
+        # because we have the policy permission to do so.
+        self.assertTrue(passed)
+
+    def test_21_delete_connection_with_receiver(self):
+        """
+        Delete, via the management agent, a connection that has a receiver
+        attached and check that the handler reports no error.
+        """
+        test = DeleteConnectionWithReceiver(self.routers[0].addresses[0])
+        test.run()
+        # Check the outcome only after the handler has actually run.
+        self.assertEqual(test.error, None)
+
+class DeleteConnectionWithReceiver(MessagingHandler):
+    """
+    Opens a tagged connection with a receiver on it, then uses a second
+    (management) connection to:
+      1. QUERY the connections and find the tagged one   (reply on mgmt_receiver)
+      2. UPDATE its adminStatus to 'deleted'              (reply on mgmt_receiver_1)
+      3. QUERY again and check the tagged one is gone     (reply on mgmt_receiver_2)
+
+    self.error is None on success, otherwise a description of the failure.
+    """
+    def __init__(self, address):
+        super(DeleteConnectionWithReceiver, self).__init__()
+        self.address = address
+        self.num_messages = 0
+        self.mgmt_receiver = None
+        self.mgmt_receiver_1 = None
+        self.mgmt_receiver_2 = None
+        self.conn_to_kill = None
+        self.receiver_to_kill = None
+        self.mgmt_conn = None
+        self.mgmt_sender = None
+        self.timer = None
+        self.success = False
+        self.error = None
+
+    def on_start(self, event):
+        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
+
+        # Create a receiver connection with some properties so it
+        # can be easily identified.
+        self.conn_to_kill = event.container.connect(self.address, properties=CONNECTION_PROPERTIES_UNICODE_STRING)
+        self.receiver_to_kill = event.container.create_receiver(self.conn_to_kill, "hello_world")
+        self.mgmt_conn = event.container.connect(self.address)
+        self.mgmt_sender = event.container.create_sender(self.mgmt_conn)
+        self.mgmt_receiver = event.container.create_receiver(self.mgmt_conn, None, dynamic=True)
+        self.mgmt_receiver_1 = event.container.create_receiver(self.mgmt_conn,
+                                                               None,
+                                                               dynamic=True)
+        self.mgmt_receiver_2 = event.container.create_receiver(self.mgmt_conn,
+                                                               None,
+                                                               dynamic=True)
+
+    def timeout(self):
+        # Report only counters this handler really maintains.
+        self.error = "Timeout Expired: management requests sent=%d" % self.num_messages
+        self.mgmt_conn.close()
+        # If we timed out the tagged connection may still be open; close it too.
+        self.conn_to_kill.close()
+
+    def bail(self, error):
+        self.error = error
+        self.timer.cancel()
+        self.mgmt_conn.close()
+        self.conn_to_kill.close()
+
+    def on_sendable(self, event):
+        if event.sender == self.mgmt_sender:
+            # Send the initial QUERY exactly once.
+            if self.num_messages < 1:
+                request = Message()
+                request.address = "amqp:/_local/$management"
+                request.properties = {u'type': u'org.apache.qpid.dispatch.connection',
+                                      u'operation': u'QUERY'}
+                request.reply_to = self.mgmt_receiver.remote_source.address
+                event.sender.send(request)
+                self.num_messages += 1
+
+    def on_message(self, event):
+        if event.receiver == self.mgmt_receiver:
+            # Step 1 reply: locate the tagged connection and ask for its deletion.
+            attribute_names = event.message.body['attributeNames']
+            property_index = attribute_names.index('properties')
+            identity_index = attribute_names.index('identity')
+
+            for result in event.message.body['results']:
+                properties = result[property_index]
+                if not properties or not properties.get('int_property'):
+                    continue
+                identity = result[identity_index]
+                if identity:
+                    request = Message()
+                    request.address = "amqp:/_local/$management"
+                    request.properties = {
+                        u'identity': identity,
+                        u'type': u'org.apache.qpid.dispatch.connection',
+                        u'operation': u'UPDATE'
+                    }
+                    request.body = {
+                        u'adminStatus':  u'deleted'}
+                    request.reply_to = self.mgmt_receiver_1.remote_source.address
+                    self.mgmt_sender.send(request)
+        elif event.receiver == self.mgmt_receiver_1:
+            # Step 2 reply: once the update is acknowledged, query again.
+            if event.message.properties['statusDescription'] == 'OK' and event.message.body['adminStatus'] == 'deleted':
+                request = Message()
+                request.address = "amqp:/_local/$management"
+                request.properties = {u'type': u'org.apache.qpid.dispatch.connection',
+                                      u'operation': u'QUERY'}
+                request.reply_to = self.mgmt_receiver_2.remote_source.address
+                self.mgmt_sender.send(request)
+
+        elif event.receiver == self.mgmt_receiver_2:
+            # Step 3 reply: the tagged connection must no longer be listed.
+            attribute_names = event.message.body['attributeNames']
+            property_index = attribute_names.index('properties')
+
+            for result in event.message.body['results']:
+                properties = result[property_index]
+                if properties and properties.get('int_property'):
+                    self.bail("Connection not deleted")
+                    # Stop here so the failure is not overwritten by bail(None).
+                    return
+            self.bail(None)
+
+    def run(self):
+        Container(self).run()
 
 class Timeout(object):
     def __init__(self, parent):


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to