This is an automated email from the ASF dual-hosted git repository. tross pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/master by this push: new b463667 DISPATCH-1907 - Separated policy-spec, distribute policy spec with connection and core subscriptions. b463667 is described below commit b46366762aeebcf3f03f03c200f0b6d2e3d5688e Author: Ted Ross <tr...@apache.org> AuthorDate: Thu Jan 7 11:04:05 2021 -0500 DISPATCH-1907 - Separated policy-spec, distribute policy spec with connection and core subscriptions. --- include/qpid/dispatch/policy_spec.h | 39 ++++++++++++++++ include/qpid/dispatch/protocol_adaptor.h | 4 +- include/qpid/dispatch/router_core.h | 3 +- src/adaptors/http1/http1_client.c | 3 +- src/adaptors/http1/http1_server.c | 3 +- src/adaptors/http2/http2_adaptor.c | 6 +-- src/adaptors/reference_adaptor.c | 3 +- src/adaptors/tcp_adaptor.c | 58 +++++++++++------------ src/policy.c | 70 ++++++++++++++-------------- src/policy.h | 28 ++++------- src/python_embedded.c | 2 +- src/remote_sasl.c | 4 +- src/router_core/agent_conn_link_route.c | 3 +- src/router_core/agent_connection.c | 3 +- src/router_core/connections.c | 6 +-- src/router_core/forwarder.c | 5 +- src/router_core/management_agent.c | 2 +- src/router_core/modules/mobile_sync/mobile.c | 11 +++-- src/router_core/router_core_private.h | 6 +-- src/router_node.c | 5 +- src/server.c | 2 +- 21 files changed, 146 insertions(+), 120 deletions(-) diff --git a/include/qpid/dispatch/policy_spec.h b/include/qpid/dispatch/policy_spec.h new file mode 100644 index 0000000..69e50ed --- /dev/null +++ b/include/qpid/dispatch/policy_spec.h @@ -0,0 +1,39 @@ +#ifndef __policy_spec_h__ +#define __policy_spec_h__ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +typedef struct { + int maxFrameSize; + int maxSessionWindow; + int maxSessions; + int maxSenders; + int maxReceivers; + uint64_t maxMessageSize; + bool allowDynamicSource; + bool allowAnonymousSender; + bool allowUserIdProxy; + bool allowWaypointLinks; + bool allowFallbackLinks; + bool allowDynamicLinkRoutes; + bool allowAdminStatusUpdate; + bool outgoingConnection; +} qd_policy_spec_t; + +#endif diff --git a/include/qpid/dispatch/protocol_adaptor.h b/include/qpid/dispatch/protocol_adaptor.h index ae7036f..537970e 100644 --- a/include/qpid/dispatch/protocol_adaptor.h +++ b/include/qpid/dispatch/protocol_adaptor.h @@ -20,6 +20,7 @@ */ #include <qpid/dispatch/router_core.h> +#include <qpid/dispatch/policy_spec.h> typedef struct qdr_protocol_adaptor_t qdr_protocol_adaptor_t; typedef struct qdr_connection_t qdr_connection_t; @@ -369,10 +370,9 @@ qdr_connection_t *qdr_connection_opened(qdr_core_t *core, const char *remote_container_id, bool strip_annotations_in, bool strip_annotations_out, - bool policy_allow_dynamic_link_routes, - bool policy_allow_admin_status_update, int link_capacity, const char *vhost, + const qd_policy_spec_t *policy_spec, qdr_connection_info_t *connection_info, qdr_connection_bind_context_t context_binder, void *bind_token); diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h index 2b54bb8..cd70f43 100644 --- a/include/qpid/dispatch/router_core.h +++ b/include/qpid/dispatch/router_core.h @@ -24,6 +24,7 @@ #include <qpid/dispatch/compose.h> #include <qpid/dispatch/parse.h> #include <qpid/dispatch/router.h> +#include <qpid/dispatch/policy_spec.h> /** @@ -98,7 +99,7 @@ void qdr_core_route_table_handlers(qdr_core_t *core, ****************************************************************************** */ typedef void (*qdr_receive_t) (void *context, qd_message_t *msg, int link_maskbit, int inter_router_cost, - uint64_t conn_id); + uint64_t conn_id, const qd_policy_spec_t *policy); /** * qdr_core_subscribe diff --git a/src/adaptors/http1/http1_client.c b/src/adaptors/http1/http1_client.c index 3c58f3b..5115ce4 100644 --- a/src/adaptors/http1/http1_client.c +++ b/src/adaptors/http1/http1_client.c @@ -302,10 +302,9 @@ static void _setup_client_connection(qdr_http1_connection_t *hconn) 0, // remote container id false, // strip annotations in false, // strip annotations out - false, // allow dynamic link routes - false, // allow admin status update DEFAULT_CAPACITY, 0, // vhost + 0, // policy_spec info, 0, // bind context 0); // bind token diff --git a/src/adaptors/http1/http1_server.c b/src/adaptors/http1/http1_server.c index 45d31f2..9481a4a 100644 --- a/src/adaptors/http1/http1_server.c +++ b/src/adaptors/http1/http1_server.c @@ -194,10 +194,9 @@ static qdr_http1_connection_t *_create_server_connection(qd_http_connector_t *ct 0, // remote container id false, // strip annotations in false, // strip annotations out - false, // allow dynamic link routes - false, // allow admin status update DEFAULT_CAPACITY, 0, // vhost + 0, // policy_spec info, 0, // bind context 0); // bind token diff --git a/src/adaptors/http2/http2_adaptor.c b/src/adaptors/http2/http2_adaptor.c index 250cf33..00c1041 100644 --- a/src/adaptors/http2/http2_adaptor.c +++ b/src/adaptors/http2/http2_adaptor.c @@ -1991,10 +1991,9 @@ qdr_http2_connection_t *qdr_http_connection_ingress_accept(qdr_http2_connection_ 0, false, false, - false, - false, 250, 0, + 0, info, 0, 0); @@ -2216,10 +2215,9 @@ qdr_http2_connection_t *qdr_http_connection_egress(qd_http_connector_t *connecto 0, false, false, - false, - false, 250, 0, + 0, info, 0, 0); diff --git a/src/adaptors/reference_adaptor.c b/src/adaptors/reference_adaptor.c index 08e7433..2fd3ff2 100644 --- a/src/adaptors/reference_adaptor.c +++ b/src/adaptors/reference_adaptor.c @@ -427,10 +427,9 @@ static void on_startup(void *context) 0, // remote_container_id false, // strip_annotations_in false, // strip_annotations_out - false, // policy_allow_dynamic_link_routes - false, // policy_allow_admin_status_update 250, // link_capacity 0, // vhost + 0, // policy_spec info, // connection_info 0, // context_binder 0); // bind_token diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c index 3b886b9..6ea544f 100644 --- a/src/adaptors/tcp_adaptor.c +++ b/src/adaptors/tcp_adaptor.c @@ -455,21 +455,20 @@ static void qdr_tcp_connection_ingress_accept(qdr_tcp_connection_t* tc) tc->conn_id = qd_server_allocate_connection_id(tc->server); qdr_connection_t *conn = qdr_connection_opened(tcp_adaptor->core, tcp_adaptor->adaptor, - true, - QDR_ROLE_NORMAL, - 1, - tc->conn_id, - 0, - 0, - false, - false, - false, - false, - 250, - 0, - info, - 0, - 0); + true, // incoming + QDR_ROLE_NORMAL, // role + 1, // cost + tc->conn_id, // management_id + 0, // label + 0, // remote_container_id + false, // strip_annotations_in + false, // strip_annotations_out + 250, // link_capacity + 0, // vhost + 0, // policy_spec + info, // connection_info + 0, // context_binder + 0); // bind_token tc->qdr_conn = conn; qdr_connection_set_context(conn, tc); @@ -640,21 +639,20 @@ static void qdr_tcp_open_server_side_connection(qdr_tcp_connection_t* tc) qdr_connection_t *conn = qdr_connection_opened(tcp_adaptor->core, tcp_adaptor->adaptor, - false, - QDR_ROLE_NORMAL, - 1, - tc->conn_id, - 0, - 0, - false, - false, - false, - false, - 250, - 0, - info, - 0, - 0); + false, // incoming + QDR_ROLE_NORMAL, // role + 1, // cost + tc->conn_id, // management_id + 0, // label + 0, // remote_container_id + false, // strip_annotations_in + false, // strip_annotations_out + 250, // link_capacity + 0, // vhost + 0, // policy_spec + info, // connection_info + 0, // context_binder + 0); // bind_token tc->qdr_conn = conn; qdr_connection_set_context(conn, tc); diff --git a/src/policy.c b/src/policy.c index 7486dd7..42697b6 100644 --- a/src/policy.c +++ b/src/policy.c @@ -549,27 +549,27 @@ bool qd_policy_open_fetch_settings( if (result2) { int truthy = PyObject_IsTrue(result2); if (truthy) { - settings->maxFrameSize = qd_entity_opt_long((qd_entity_t*)upolicy, "maxFrameSize", 0); - settings->maxSessionWindow = qd_entity_opt_long((qd_entity_t*)upolicy, "maxSessionWindow", 0); - settings->maxSessions = qd_entity_opt_long((qd_entity_t*)upolicy, "maxSessions", 0); - settings->maxSenders = qd_entity_opt_long((qd_entity_t*)upolicy, "maxSenders", 0); - settings->maxReceivers = qd_entity_opt_long((qd_entity_t*)upolicy, "maxReceivers", 0); - settings->maxMessageSize = qd_entity_opt_long((qd_entity_t*)upolicy, "maxMessageSize", 0); - if (!settings->allowAnonymousSender) { //don't override if enabled by authz plugin - settings->allowAnonymousSender = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowAnonymousSender", false); + settings->spec.maxFrameSize = qd_entity_opt_long((qd_entity_t*)upolicy, "maxFrameSize", 0); + settings->spec.maxSessionWindow = qd_entity_opt_long((qd_entity_t*)upolicy, "maxSessionWindow", 0); + settings->spec.maxSessions = qd_entity_opt_long((qd_entity_t*)upolicy, "maxSessions", 0); + settings->spec.maxSenders = qd_entity_opt_long((qd_entity_t*)upolicy, "maxSenders", 0); + settings->spec.maxReceivers = qd_entity_opt_long((qd_entity_t*)upolicy, "maxReceivers", 0); + settings->spec.maxMessageSize = qd_entity_opt_long((qd_entity_t*)upolicy, "maxMessageSize", 0); + if (!settings->spec.allowAnonymousSender) { //don't override if enabled by authz plugin + settings->spec.allowAnonymousSender = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowAnonymousSender", false); } - if (!settings->allowDynamicSource) { //don't override if enabled by authz plugin - settings->allowDynamicSource = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowDynamicSource", false); + if (!settings->spec.allowDynamicSource) { //don't override if enabled by authz plugin + settings->spec.allowDynamicSource = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowDynamicSource", false); } - settings->allowUserIdProxy = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowUserIdProxy", false); - settings->allowWaypointLinks = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowWaypointLinks", true); - settings->allowFallbackLinks = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowFallbackLinks", true); - settings->allowDynamicLinkRoutes = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowDynamicLinkRoutes", true); + settings->spec.allowUserIdProxy = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowUserIdProxy", false); + settings->spec.allowWaypointLinks = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowWaypointLinks", true); + settings->spec.allowFallbackLinks = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowFallbackLinks", true); + settings->spec.allowDynamicLinkRoutes = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowDynamicLinkRoutes", true); // // By default, deleting connections are enabled. To disable, set the allowAdminStatusUpdate to false in a policy. // - settings->allowAdminStatusUpdate = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowAdminStatusUpdate", true); + settings->spec.allowAdminStatusUpdate = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowAdminStatusUpdate", true); if (settings->sources == 0) { //don't override if configured by authz plugin settings->sources = qd_entity_get_string((qd_entity_t*)upolicy, "sources"); } @@ -640,8 +640,8 @@ bool qd_policy_approve_amqp_session(pn_session_t *ssn, qd_connection_t *qd_conn) { bool result = true; if (qd_conn->policy_settings) { - if (qd_conn->policy_settings->maxSessions) { - if (qd_conn->n_sessions == qd_conn->policy_settings->maxSessions) { + if (qd_conn->policy_settings->spec.maxSessions) { + if (qd_conn->n_sessions == qd_conn->policy_settings->spec.maxSessions) { qd_policy_deny_amqp_session(ssn, qd_conn); result = false; } @@ -672,9 +672,9 @@ bool qd_policy_approve_amqp_session(pn_session_t *ssn, qd_connection_t *qd_conn) void qd_policy_apply_session_settings(pn_session_t *ssn, qd_connection_t *qd_conn) { size_t capacity; - if (qd_conn->policy_settings && qd_conn->policy_settings->maxSessionWindow - && !qd_conn->policy_settings->outgoingConnection) { - capacity = qd_conn->policy_settings->maxSessionWindow; + if (qd_conn->policy_settings && qd_conn->policy_settings->spec.maxSessionWindow + && !qd_conn->policy_settings->spec.outgoingConnection) { + capacity = qd_conn->policy_settings->spec.maxSessionWindow; } else { const qd_server_config_t * cf = qd_connection_config(qd_conn); capacity = cf->incoming_capacity; @@ -1107,8 +1107,8 @@ bool qd_policy_approve_amqp_sender_link(pn_link_t *pn_link, qd_connection_t *qd_ const char *hostip = qd_connection_remote_ip(qd_conn); const char *vhost = pn_connection_remote_hostname(qd_connection_pn(qd_conn)); - if (qd_conn->policy_settings->maxSenders) { - if (qd_conn->n_senders == qd_conn->policy_settings->maxSenders) { + if (qd_conn->policy_settings->spec.maxSenders) { + if (qd_conn->n_senders == qd_conn->policy_settings->spec.maxSenders) { // Max sender limit specified and violated. qd_log(qd_server_dispatch(qd_conn->server)->policy->log_source, QD_LOG_INFO, "[C%"PRIu64"] DENY AMQP Attach sender for user '%s', rhost '%s', vhost '%s' based on maxSenders limit", @@ -1126,7 +1126,7 @@ bool qd_policy_approve_amqp_sender_link(pn_link_t *pn_link, qd_connection_t *qd_ bool lookup; if (target && *target) { // a target is specified - if (!qd_conn->policy_settings->allowWaypointLinks) { + if (!qd_conn->policy_settings->spec.allowWaypointLinks) { bool waypoint = qd_policy_terminus_is_waypoint(pn_link_remote_target(pn_link)); if (waypoint) { qd_log(qd_server_dispatch(qd_conn->server)->policy->log_source, QD_LOG_INFO, @@ -1137,7 +1137,7 @@ bool qd_policy_approve_amqp_sender_link(pn_link_t *pn_link, qd_connection_t *qd_ } } - if (!qd_conn->policy_settings->allowFallbackLinks) { + if (!qd_conn->policy_settings->spec.allowFallbackLinks) { bool fallback = qd_policy_terminus_is_fallback(pn_link_remote_target(pn_link)); if (fallback) { qd_log(qd_server_dispatch(qd_conn->server)->policy->log_source, QD_LOG_INFO, @@ -1161,7 +1161,7 @@ bool qd_policy_approve_amqp_sender_link(pn_link_t *pn_link, qd_connection_t *qd_ } else { // A sender with no remote target. // This happens all the time with anonymous relay - lookup = qd_conn->policy_settings->allowAnonymousSender; + lookup = qd_conn->policy_settings->spec.allowAnonymousSender; qd_log(qd_server_dispatch(qd_conn->server)->policy->log_source, (lookup ? QD_LOG_TRACE : QD_LOG_INFO), "[C%"PRIu64"] %s AMQP Attach anonymous sender for user '%s', rhost '%s', vhost '%s'", qd_conn->connection_id, (lookup ? "ALLOW" : "DENY"), qd_conn->user_id, hostip, vhost); @@ -1180,8 +1180,8 @@ bool qd_policy_approve_amqp_receiver_link(pn_link_t *pn_link, qd_connection_t *q const char *hostip = qd_connection_remote_ip(qd_conn); const char *vhost = pn_connection_remote_hostname(qd_connection_pn(qd_conn)); - if (qd_conn->policy_settings->maxReceivers) { - if (qd_conn->n_receivers == qd_conn->policy_settings->maxReceivers) { + if (qd_conn->policy_settings->spec.maxReceivers) { + if (qd_conn->n_receivers == qd_conn->policy_settings->spec.maxReceivers) { // Max sender limit specified and violated. qd_log(qd_server_dispatch(qd_conn->server)->policy->log_source, QD_LOG_INFO, "[C%"PRIu64"] DENY AMQP Attach receiver for user '%s', rhost '%s', vhost '%s' based on maxReceivers limit", @@ -1197,7 +1197,7 @@ bool qd_policy_approve_amqp_receiver_link(pn_link_t *pn_link, qd_connection_t *q // Approve receiver link based on source bool dynamic_src = pn_terminus_is_dynamic(pn_link_remote_source(pn_link)); if (dynamic_src) { - bool lookup = qd_conn->policy_settings->allowDynamicSource; + bool lookup = qd_conn->policy_settings->spec.allowDynamicSource; qd_log(qd_server_dispatch(qd_conn->server)->policy->log_source, (lookup ? QD_LOG_TRACE : QD_LOG_INFO), "[C%"PRIu64"] %s AMQP Attach receiver dynamic source for user '%s', rhost '%s', vhost '%s',", qd_conn->connection_id, (lookup ? "ALLOW" : "DENY"), qd_conn->user_id, hostip, vhost); @@ -1210,7 +1210,7 @@ bool qd_policy_approve_amqp_receiver_link(pn_link_t *pn_link, qd_connection_t *q const char * source = pn_terminus_get_address(pn_link_remote_source(pn_link)); if (source && *source) { // a source is specified - if (!qd_conn->policy_settings->allowWaypointLinks) { + if (!qd_conn->policy_settings->spec.allowWaypointLinks) { bool waypoint = qd_policy_terminus_is_waypoint(pn_link_remote_source(pn_link)); if (waypoint) { qd_log(qd_server_dispatch(qd_conn->server)->policy->log_source, QD_LOG_INFO, @@ -1221,7 +1221,7 @@ bool qd_policy_approve_amqp_receiver_link(pn_link_t *pn_link, qd_connection_t *q } } - if (!qd_conn->policy_settings->allowFallbackLinks) { + if (!qd_conn->policy_settings->spec.allowFallbackLinks) { bool fallback = qd_policy_terminus_is_fallback(pn_link_remote_source(pn_link)); if (fallback) { qd_log(qd_server_dispatch(qd_conn->server)->policy->log_source, QD_LOG_INFO, @@ -1286,10 +1286,10 @@ void qd_policy_amqp_open(qd_connection_t *qd_conn) { // This connection is allowed by policy. // Apply transport policy settings if (qd_policy_open_fetch_settings(policy, vhost, settings_name, qd_conn->policy_settings)) { - if (qd_conn->policy_settings->maxFrameSize > 0) - pn_transport_set_max_frame(pn_trans, qd_conn->policy_settings->maxFrameSize); - if (qd_conn->policy_settings->maxSessions > 0) - pn_transport_set_channel_max(pn_trans, qd_conn->policy_settings->maxSessions - 1); + if (qd_conn->policy_settings->spec.maxFrameSize > 0) + pn_transport_set_max_frame(pn_trans, qd_conn->policy_settings->spec.maxFrameSize); + if (qd_conn->policy_settings->spec.maxSessions > 0) + pn_transport_set_channel_max(pn_trans, qd_conn->policy_settings->spec.maxSessions - 1); const qd_server_config_t *cf = qd_connection_config(qd_conn); if (cf && cf->multi_tenant) { char vhost_name_buf[SETTINGS_NAME_SIZE]; @@ -1348,7 +1348,7 @@ void qd_policy_amqp_open_connector(qd_connection_t *qd_conn) { ZERO(qd_conn->policy_settings); if (qd_policy_open_fetch_settings(policy, policy_vhost, POLICY_VHOST_GROUP, qd_conn->policy_settings)) { - qd_conn->policy_settings->outgoingConnection = true; + qd_conn->policy_settings->spec.outgoingConnection = true; qd_conn->policy_counted = true; // Count senders and receivers for this connection } else { qd_log(policy->log_source, diff --git a/src/policy.h b/src/policy.h index ea6ecd7..f05c0d9 100644 --- a/src/policy.h +++ b/src/policy.h @@ -25,6 +25,7 @@ #include "qpid/dispatch/static_assert.h" #include "qpid/dispatch/alloc.h" #include "qpid/dispatch/alloc_pool.h" +#include "qpid/dispatch/policy_spec.h" #include "config.h" #include "entity.h" @@ -44,21 +45,12 @@ struct qd_policy_denial_counts_s { typedef struct qd_policy_t qd_policy_t; +// +// Policy settings are defined in include/qpid/dispatch/policy_settings.h +// + struct qd_policy__settings_s { - int maxFrameSize; - int maxSessionWindow; - int maxSessions; - int maxSenders; - int maxReceivers; - uint64_t maxMessageSize; - bool allowDynamicSource; - bool allowAnonymousSender; - bool allowUserIdProxy; - bool allowWaypointLinks; - bool allowFallbackLinks; - bool allowDynamicLinkRoutes; - bool allowAdminStatusUpdate; - bool outgoingConnection; + qd_policy_spec_t spec; char *sources; char *targets; char *sourcePattern; @@ -208,10 +200,10 @@ void qd_policy_settings_free(qd_policy_settings_t *settings); * @param[in] isReceiver indication to check using receiver settings */ bool qd_policy_approve_link_name(const char *username, - const qd_policy_settings_t *settings, - const char *proposed, - bool isReceiver - ); + const qd_policy_settings_t *settings, + const char *proposed, + bool isReceiver + ); /** Add a hostname to the lookup parse_tree * Note that the parse_tree may store an 'optimised' pattern for a given diff --git a/src/python_embedded.c b/src/python_embedded.c index 2207563..dadf768 100644 --- a/src/python_embedded.c +++ b/src/python_embedded.c @@ -631,7 +631,7 @@ static qd_error_t iter_to_py_attr(qd_iterator_t *iter, } static void qd_io_rx_handler(void *context, qd_message_t *msg, int link_id, int inter_router_cost, - uint64_t ignore) + uint64_t ignore, const qd_policy_spec_t *policy_spec) { IoAdapter *self = (IoAdapter*) context; diff --git a/src/remote_sasl.c b/src/remote_sasl.c index d8ffb22..624ee91 100644 --- a/src/remote_sasl.c +++ b/src/remote_sasl.c @@ -331,8 +331,8 @@ static void set_policy_settings(pn_connection_t* conn, permissions_t* permission if (permissions->sources.start && permissions->sources.capacity) { qd_conn->policy_settings->sources = qd_policy_compile_allowed_csv(permissions->sources.start); } - qd_conn->policy_settings->allowDynamicSource = true; - qd_conn->policy_settings->allowAnonymousSender = true; + qd_conn->policy_settings->spec.allowDynamicSource = true; + qd_conn->policy_settings->spec.allowAnonymousSender = true; } } diff --git a/src/router_core/agent_conn_link_route.c b/src/router_core/agent_conn_link_route.c index b2feefe..7e8b0d3 100644 --- a/src/router_core/agent_conn_link_route.c +++ b/src/router_core/agent_conn_link_route.c @@ -180,7 +180,8 @@ void qdra_conn_link_route_create_CT(qdr_core_t *core, } // fail if forbidden by policy - if (!conn->policy_allow_dynamic_link_routes) { + bool allow = conn->policy_spec ? conn->policy_spec->allowDynamicLinkRoutes : true; + if (!allow) { query->status = QD_AMQP_FORBIDDEN; goto exit; } diff --git a/src/router_core/agent_connection.c b/src/router_core/agent_connection.c index 3e5dbc8..b8cfa7b 100644 --- a/src/router_core/agent_connection.c +++ b/src/router_core/agent_connection.c @@ -608,7 +608,8 @@ void qdra_connection_update_CT(qdr_core_t *core, admin_status_bad_or_forbidden = true; } else { - if (!user_conn->policy_allow_admin_status_update) { + bool allow = user_conn->policy_spec ? user_conn->policy_spec->allowAdminStatusUpdate : true; + if (!allow) { // // Policy on the connection that is requesting that some other connection be deleted does not allow // for the other connection to be deleted.Set the status to QD_AMQP_FORBIDDEN and just quit. diff --git a/src/router_core/connections.c b/src/router_core/connections.c index 05f69c6..2d833e1 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -77,10 +77,9 @@ qdr_connection_t *qdr_connection_opened(qdr_core_t *core, const char *remote_container_id, bool strip_annotations_in, bool strip_annotations_out, - bool policy_allow_dynamic_link_routes, - bool policy_allow_admin_status_update, int link_capacity, const char *vhost, + const qd_policy_spec_t *policy_spec, qdr_connection_info_t *connection_info, qdr_connection_bind_context_t context_binder, void *bind_token) @@ -99,8 +98,7 @@ qdr_connection_t *qdr_connection_opened(qdr_core_t *core, conn->inter_router_cost = cost; conn->strip_annotations_in = strip_annotations_in; conn->strip_annotations_out = strip_annotations_out; - conn->policy_allow_dynamic_link_routes = policy_allow_dynamic_link_routes; - conn->policy_allow_admin_status_update = policy_allow_admin_status_update; + conn->policy_spec = policy_spec; conn->link_capacity = link_capacity; conn->mask_bit = -1; conn->admin_status = QDR_CONN_ADMIN_ENABLED; diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c index 1d1f884..09df514 100644 --- a/src/router_core/forwarder.c +++ b/src/router_core/forwarder.c @@ -309,7 +309,7 @@ void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *out_link, qdr_delivery void qdr_forward_on_message(qdr_core_t *core, qdr_general_work_t *work) { - work->on_message(work->on_message_context, work->msg, work->maskbit, work->inter_router_cost, work->in_conn_id); + work->on_message(work->on_message_context, work->msg, work->maskbit, work->inter_router_cost, work->in_conn_id, work->policy_spec); qd_message_free(work->msg); } @@ -324,7 +324,7 @@ void qdr_forward_on_message_CT(qdr_core_t *core, qdr_subscription_t *sub, qdr_li // // The handler runs in-core. Invoke it right now. // - sub->on_message(sub->on_message_context, msg, mask_bit, cost, identity); + sub->on_message(sub->on_message_context, msg, mask_bit, cost, identity, link ? link->conn->policy_spec : 0); } else { // // The handler runs in an IO thread. Defer its invocation. @@ -336,6 +336,7 @@ void qdr_forward_on_message_CT(qdr_core_t *core, qdr_subscription_t *sub, qdr_li work->maskbit = mask_bit; work->inter_router_cost = cost; work->in_conn_id = identity; + work->policy_spec = link ? link->conn->policy_spec : 0; qdr_post_general_work_CT(core, work); } } diff --git a/src/router_core/management_agent.c b/src/router_core/management_agent.c index 9f729e9..564991c 100644 --- a/src/router_core/management_agent.c +++ b/src/router_core/management_agent.c @@ -499,7 +499,7 @@ static bool qd_can_handle_request(qd_parsed_field_t *properties_fld, * */ void qdr_management_agent_on_message(void *context, qd_message_t *msg, int unused_link_id, int unused_cost, - uint64_t in_conn_id) + uint64_t in_conn_id, const qd_policy_spec_t *policy_spec) { qdr_core_t *core = (qdr_core_t*) context; qd_iterator_t *app_properties_iter = qd_message_field_iterator(msg, QD_FIELD_APPLICATION_PROPERTIES); diff --git a/src/router_core/modules/mobile_sync/mobile.c b/src/router_core/modules/mobile_sync/mobile.c index 948bc38..daf947c 100644 --- a/src/router_core/modules/mobile_sync/mobile.c +++ b/src/router_core/modules/mobile_sync/mobile.c @@ -694,11 +694,12 @@ static void qcm_mobile_sync_on_mau_CT(qdrm_mobile_sync_t *msync, qd_parsed_field } -static void qcm_mobile_sync_on_message_CT(void *context, - qd_message_t *msg, - int unused_link_maskbit, - int unused_inter_router_cost, - uint64_t unused_conn_id) +static void qcm_mobile_sync_on_message_CT(void *context, + qd_message_t *msg, + int unused_link_maskbit, + int unused_inter_router_cost, + uint64_t unused_conn_id, + const qd_policy_spec_t *unused_policy_spec) { qdrm_mobile_sync_t *msync = (qdrm_mobile_sync_t*) context; qd_iterator_t *ap_iter = qd_message_field_iterator(msg, QD_FIELD_APPLICATION_PROPERTIES); diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index 14da0b8..e877bbd 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -239,6 +239,7 @@ struct qdr_general_work_t { void *on_message_context; uint64_t in_conn_id; uint64_t mobile_seq; + const qd_policy_spec_t *policy_spec; qdr_delivery_cleanup_list_t delivery_cleanup_list; qdr_global_stats_handler_t stats_handler; void *context; @@ -671,8 +672,6 @@ struct qdr_connection_t { qdr_conn_identifier_t *alt_conn_id; bool strip_annotations_in; bool strip_annotations_out; - bool policy_allow_dynamic_link_routes; - bool policy_allow_admin_status_update; int link_capacity; int mask_bit; ///< set only if inter-router connection qdr_connection_work_list_t work_list; @@ -693,6 +692,7 @@ struct qdr_connection_t { bool enable_protocol_trace; // Has trace level logging been turned on for this connection. bool has_streaming_links; ///< one or more of this connection's links are for streaming messages qdr_link_list_t streaming_link_pool; ///< pool of links available for streaming messages + const qd_policy_spec_t *policy_spec; }; DEQ_DECLARE(qdr_connection_t, qdr_connection_list_t); @@ -942,7 +942,7 @@ ALLOC_DECLARE(qdr_terminus_t); void *router_core_thread(void *arg); uint64_t qdr_identifier(qdr_core_t* core); -void qdr_management_agent_on_message(void *context, qd_message_t *msg, int link_id, int cost, uint64_t in_conn_id); +void qdr_management_agent_on_message(void *context, qd_message_t *msg, int link_id, int cost, uint64_t in_conn_id, const qd_policy_spec_t *policy_spec); void qdr_route_table_setup_CT(qdr_core_t *core); qdr_agent_t *qdr_agent(qdr_core_t *core); void qdr_agent_setup_subscriptions(qdr_agent_t *agent, qdr_core_t *core); diff --git a/src/router_node.c b/src/router_node.c index a61fe87..f1a2496 100644 --- a/src/router_node.c +++ b/src/router_node.c @@ -508,7 +508,7 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link) int tenant_space_len; const char *tenant_space = qdr_connection_get_tenant_space(qdr_conn, &tenant_space_len); if (conn->policy_settings) - check_user = !conn->policy_settings->allowUserIdProxy; + check_user = !conn->policy_settings->spec.allowUserIdProxy; // // Validate the content of the delivery as an AMQP message. This is done partially, only @@ -1225,10 +1225,9 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool pn_connection_remote_container(pn_conn), conn->strip_annotations_in, conn->strip_annotations_out, - conn->policy_settings ? conn->policy_settings->allowDynamicLinkRoutes : true, - conn->policy_settings ? conn->policy_settings->allowAdminStatusUpdate : true, link_capacity, vhost, + !!conn->policy_settings ? &conn->policy_settings->spec : 0, connection_info, bind_connection_context, conn); diff --git a/src/server.c b/src/server.c index 4c8cb00..58d06a6 100644 --- a/src/server.c +++ b/src/server.c @@ -1782,5 +1782,5 @@ sys_mutex_t *qd_server_get_activation_lock(qd_server_t * server) } uint64_t qd_connection_max_message_size(const qd_connection_t *c) { - return (c && c->policy_settings) ? c->policy_settings->maxMessageSize : 0; + return (c && c->policy_settings) ? c->policy_settings->spec.maxMessageSize : 0; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org