Repository: qpid-dispatch Updated Branches: refs/heads/crolke-DISPATCH-188-1 8b3f0c761 -> 25af9cd7a
Allow config file settings sources and targets to be type string or list. Refactor c-to-python routines. Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/25af9cd7 Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/25af9cd7 Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/25af9cd7 Branch: refs/heads/crolke-DISPATCH-188-1 Commit: 25af9cd7a60aeaeb26da80f9f94fee8fbac80d46 Parents: 8b3f0c7 Author: Chuck Rolke <[email protected]> Authored: Fri Mar 18 13:44:14 2016 -0400 Committer: Chuck Rolke <[email protected]> Committed: Fri Mar 18 13:44:14 2016 -0400 ---------------------------------------------------------------------- .../policy/policy_local.py | 36 ++-- src/policy.c | 179 +++++++++++-------- src/policy.h | 19 +- tests/router_policy_test.py | 6 +- 4 files changed, 152 insertions(+), 88 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/25af9cd7/python/qpid_dispatch_internal/policy/policy_local.py ---------------------------------------------------------------------- diff --git a/python/qpid_dispatch_internal/policy/policy_local.py b/python/qpid_dispatch_internal/policy/policy_local.py index c000d6d..ea965c7 100644 --- a/python/qpid_dispatch_internal/policy/policy_local.py +++ b/python/qpid_dispatch_internal/policy/policy_local.py @@ -208,8 +208,8 @@ class PolicyCompiler(object): policy_out[PolicyKeys.KW_MAX_RECEIVERS] = 10 policy_out[PolicyKeys.KW_ALLOW_DYNAMIC_SRC] = False policy_out[PolicyKeys.KW_ALLOW_ANONYMOUS_SENDER] = False - policy_out[PolicyKeys.KW_SOURCES] = [] - policy_out[PolicyKeys.KW_TARGETS] = [] + policy_out[PolicyKeys.KW_SOURCES] = '' + policy_out[PolicyKeys.KW_TARGETS] = '' cerror = [] for key, val in policy_in.iteritems(): @@ -239,10 +239,23 @@ class PolicyCompiler(object): elif key in [PolicyKeys.KW_SOURCES, PolicyKeys.KW_TARGETS ]: - val = [x.strip(' ') for x in val.split(PolicyKeys.KC_CONFIG_LIST_SEP)] + # accept a string or list + if type(val) is str: + # 'abc, def, mytarget' + val = [x.strip(' ') for x in val.split(PolicyKeys.KC_CONFIG_LIST_SEP)] + elif type(val) is list: + # ['abc', 'def', 'mytarget'] + pass + elif type(val) is unicode: + # u'abc, def, mytarget' + val = [x.strip(' ') for x in str(val).split(PolicyKeys.KC_CONFIG_LIST_SEP)] + else: + errors.append("Application '%s' user group '%s' option '%s' has illegal value '%s'. Type must be 'str' or 'list' but is '%s;" % + (appname, usergroup, key, val, type(val))) # deduplicate address lists val = list(set(val)) - policy_out[key] = val + # output result is CSV string with no white space between values: 'abc,def,mytarget' + policy_out[key] = ','.join(val) return True @@ -640,7 +653,6 @@ class PolicyLocal(object): upolicy.update(ruleset[PolicyKeys.KW_SETTINGS][name]) upolicy[PolicyKeys.KW_CSTATS] = self.statsdb[appname].get_cstats() - return True except Exception, e: #print str(e) @@ -653,11 +665,15 @@ class PolicyLocal(object): @param conn_id: @return: """ - facts = self._connections[conn_id] - stats = self.statsdb[facts.app] - stats.disconnect(facts.conn_name, facts.user, facts.host) - self._connections.remove(conn_id) - + try: + facts = self._connections[conn_id] + stats = self.statsdb[facts.app] + stats.disconnect(facts.conn_name, facts.user, facts.host) + del self._connections[conn_id] + except Exception, e: + #print str(e) + self._manager.log_trace( + "Policy internal error closing connection id %s. %s" % (conn_id, str(e))) def test_load_config(self): ruleset_str = '["policyAccessRuleset", {"applicationName": "photoserver","maxConnections": 50,"maxConnPerUser": 5,"maxConnPerHost": 20,"userGroups": {"anonymous": "anonymous","users": "u1, u2","paidsubscribers": "p1, p2","test": "zeke, ynot","admin": "alice, bob","superuser": "ellen"},"ingressHostGroups": {"Ten18": "10.18.0.0-10.18.255.255","EllensWS": "72.135.2.9","TheLabs": "10.48.0.0-10.48.255.255, 192.168.100.0-192.168.100.255","localhost": "127.0.0.1, ::1","TheWorld": "*"},"ingressPolicies": {"anonymous": "TheWorld","users": "TheWorld","paidsubscribers": "TheWorld","test": "TheLabs","admin": "Ten18, TheLabs, localhost","superuser": "EllensWS, localhost"},"connectionAllowDefault": true,' http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/25af9cd7/src/policy.c ---------------------------------------------------------------------- diff --git a/src/policy.c b/src/policy.c index 1b55bf5..075e89b 100644 --- a/src/policy.c +++ b/src/policy.c @@ -41,6 +41,15 @@ #include "qpid/dispatch/log.h" +/** + * Private Function Prototypes + */ +void qd_policy_private_deny_amqp_connection(pn_connection_t *conn, const char *cond_name, const char *cond_descr); +void qd_policy_deny_amqp_session(pn_session_t *ssn, qd_connection_t *qd_conn); +void _qd_policy_deny_amqp_link(pn_link_t *link, qd_connection_t *qd_conn, char * s_or_r); +void _qd_policy_deny_amqp_sender_link(pn_link_t *pn_link, qd_connection_t *qd_conn); +void _qd_policy_deny_amqp_receiver_link(pn_link_t *pn_link, qd_connection_t *qd_conn); + // // TODO: when policy dev is more complete lower the log level // @@ -241,23 +250,29 @@ void qd_policy_socket_close(void *context, const qd_connection_t *conn) // HACK ALERT: TODO: This should be deferred to a Python thread qd_python_lock_state_t lock_state = qd_python_lock(); PyObject *module = PyImport_ImportModule("qpid_dispatch_internal.policy.policy_manager"); - PyObject *close_connection = module ? PyObject_GetAttrString(module, "policy_close_connection") : NULL; - Py_XDECREF(module); - PyObject *result = close_connection ? PyObject_CallFunction(close_connection, "(OK)", - (PyObject *)policy->py_policy_manager, - conn->connection_id) : NULL; - Py_XDECREF(close_connection); - if (!result) { - qd_python_unlock(lock_state); - return; + if (module) { + PyObject *close_connection = PyObject_GetAttrString(module, "policy_close_connection"); + if (close_connection) { + PyObject *result = PyObject_CallFunction(close_connection, "(OK)", + (PyObject *)policy->py_policy_manager, + conn->connection_id); + if (result) { + Py_XDECREF(result); + } else { + qd_log(policy->log_source, POLICY_LOG_LEVEL, "Connection close failed: result"); + } + Py_XDECREF(close_connection); + } else { + qd_log(policy->log_source, POLICY_LOG_LEVEL, "Connection close failed: close_connection"); + } + Py_XDECREF(module); + } else { + qd_log(policy->log_source, POLICY_LOG_LEVEL, "Connection close failed: module"); } - Py_XDECREF(result); - qd_python_unlock(lock_state); - } - const char *hostname = qdpn_connector_name(conn->pn_cxtr); if (policy->max_connection_limit > 0) { + const char *hostname = qdpn_connector_name(conn->pn_cxtr); qd_log(policy->log_source, POLICY_LOG_LEVEL, "Connection '%s' closed. N connections=%d", hostname, n_connections); } } @@ -295,65 +310,83 @@ bool qd_policy_open_lookup_user( qd_policy_settings_t *settings) { // Lookup the user/host/app for allow/deny and to get settings name + bool res = false; qd_python_lock_state_t lock_state = qd_python_lock(); PyObject *module = PyImport_ImportModule("qpid_dispatch_internal.policy.policy_manager"); - PyObject *lookup_user = module ? PyObject_GetAttrString(module, "policy_lookup_user") : NULL; - PyObject *result = lookup_user ? PyObject_CallFunction(lookup_user, "(OssssK)", - (PyObject *)policy->py_policy_manager, - username, hostip, app, conn_name, conn_id) : NULL; - Py_XDECREF(lookup_user); - if (!result) { - Py_XDECREF(module); + if (module) { + PyObject *lookup_user = PyObject_GetAttrString(module, "policy_lookup_user"); + if (lookup_user) { + PyObject *result = PyObject_CallFunction(lookup_user, "(OssssK)", + (PyObject *)policy->py_policy_manager, + username, hostip, app, conn_name, conn_id); + if (result) { + const char *res_string = PyString_AsString(result); + strncpy(name_buf, res_string, name_buf_size); + Py_XDECREF(result); + res = true; // settings name returned + } else { + qd_log(policy->log_source, POLICY_LOG_LEVEL, "Internal: lookup_user: result"); + } + Py_XDECREF(lookup_user); + } else { + qd_log(policy->log_source, POLICY_LOG_LEVEL, "Internal: lookup_user: lookup_user"); + } + } + if (!res) { + if (module) { + Py_XDECREF(module); + } qd_python_unlock(lock_state); - qd_log(policy->log_source, - POLICY_LOG_LEVEL, - "PyObject lookup_user is Null"); return false; } - const char *res_string = PyString_AsString(result); - strncpy(name_buf, res_string, name_buf_size); - Py_XDECREF(result); + // if (name_buf[0]) { - // Go get the settings + // Go get the named settings + res = false; PyObject *upolicy = PyDict_New(); - PyObject *lookup_settings = module ? PyObject_GetAttrString(module, "policy_lookup_settings") : NULL; - PyObject *result2 = lookup_settings ? PyObject_CallFunction(lookup_settings, "(OssO)", - (PyObject *)policy->py_policy_manager, - app, name_buf, upolicy) : NULL; - Py_XDECREF(lookup_settings); - if (!result2) { + if (upolicy) { + PyObject *lookup_settings = PyObject_GetAttrString(module, "policy_lookup_settings"); + if (lookup_settings) { + PyObject *result2 = PyObject_CallFunction(lookup_settings, "(OssO)", + (PyObject *)policy->py_policy_manager, + app, name_buf, upolicy); + if (result2) { + settings->maxFrameSize = qd_entity_opt_long((qd_entity_t*)upolicy, "maxFrameSize", 0); + settings->maxMessageSize = qd_entity_opt_long((qd_entity_t*)upolicy, "maxMessageSize", 0); + settings->maxSessionWindow = qd_entity_opt_long((qd_entity_t*)upolicy, "maxSessionWindow", 0); + settings->maxSessions = qd_entity_opt_long((qd_entity_t*)upolicy, "maxSessions", 0); + settings->maxSenders = qd_entity_opt_long((qd_entity_t*)upolicy, "maxSenders", 0); + settings->maxReceivers = qd_entity_opt_long((qd_entity_t*)upolicy, "maxReceivers", 0); + settings->allowAnonymousSender = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowAnonymousSender", false); + settings->allowDynamicSrc = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowDynamicSrc", false); + settings->sources = qd_entity_get_string((qd_entity_t*)upolicy, "sources"); + settings->targets = qd_entity_get_string((qd_entity_t*)upolicy, "targets"); + settings->denialCounts = (qd_policy_denial_counts_t*) + qd_entity_get_long((qd_entity_t*)upolicy, "denialCounts"); + Py_XDECREF(result2); + res = true; // named settings content returned + } else { + qd_log(policy->log_source, POLICY_LOG_LEVEL, "Internal: lookup_user: result2"); + } + Py_XDECREF(lookup_settings); + } else { + qd_log(policy->log_source, POLICY_LOG_LEVEL, "Internal: lookup_user: lookup_settings"); + } Py_XDECREF(upolicy); - qd_python_unlock(lock_state); - qd_log(policy->log_source, - POLICY_LOG_LEVEL, - "PyObject lookup_settings is Null"); - return false; + } else { + qd_log(policy->log_source, POLICY_LOG_LEVEL, "Internal: lookup_user: upolicy"); } - Py_XDECREF(result2); - settings->maxFrameSize = qd_entity_opt_long((qd_entity_t*)upolicy, "maxFrameSize", 0); - settings->maxMessageSize = qd_entity_opt_long((qd_entity_t*)upolicy, "maxMessageSize", 0); - settings->maxSessionWindow = qd_entity_opt_long((qd_entity_t*)upolicy, "maxSessionWindow", 0); - settings->maxSessions = qd_entity_opt_long((qd_entity_t*)upolicy, "maxSessions", 0); - settings->maxSenders = qd_entity_opt_long((qd_entity_t*)upolicy, "maxSenders", 0); - settings->maxReceivers = qd_entity_opt_long((qd_entity_t*)upolicy, "maxReceivers", 0); - settings->allowAnonymousSender = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowAnonymousSender", false); - settings->allowDynamicSrc = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowDynamicSrc", false); - settings->sources = qd_entity_get_string((qd_entity_t*)upolicy, "sources"); - settings->targets = qd_entity_get_string((qd_entity_t*)upolicy, "targets"); - settings->denialCounts = (qd_policy_denial_counts_t*) - qd_entity_get_long((qd_entity_t*)upolicy, "denialCounts"); - Py_XDECREF(upolicy); } Py_XDECREF(module); qd_python_unlock(lock_state); qd_log(policy->log_source, POLICY_LOG_LEVEL, - "Policy AMQP Open lookup_user: %s, hostip: %s, app: %s, connection: %s. Usergroup: '%s'", - username, hostip, app, conn_name, name_buf); + "Policy AMQP Open lookup_user: %s, hostip: %s, app: %s, connection: %s. Usergroup: '%s'%s", + username, hostip, app, conn_name, name_buf, (res ? "" : " Internal error.")); - return true; + return res; } /** Set the error condition and close the connection. @@ -372,8 +405,11 @@ void qd_policy_private_deny_amqp_connection(pn_connection_t *conn, const char *c } -// -// +/** Internal function to deny an amqp session + * The session is closed with a condition and the denial is counted. + * @param[in,out] ssn proton session + * @param[in,out] qd_conn dispatch connection + */ void qd_policy_deny_amqp_session(pn_session_t *ssn, qd_connection_t *qd_conn) { pn_condition_t * cond = pn_session_condition(ssn); @@ -401,15 +437,15 @@ void qd_policy_deny_amqp_session(pn_session_t *ssn, qd_connection_t *qd_conn) // bool qd_policy_approve_amqp_session(pn_session_t *ssn, qd_connection_t *qd_conn) { - if (qd_conn->policy_settings) { - if (qd_conn->policy_settings->maxSessions) { - if (qd_conn->n_sessions == qd_conn->policy_settings->maxSessions) { - qd_policy_deny_amqp_session(ssn, qd_conn); - return false; - } + if (qd_conn->policy_settings) { + if (qd_conn->policy_settings->maxSessions) { + if (qd_conn->n_sessions == qd_conn->policy_settings->maxSessions) { + qd_policy_deny_amqp_session(ssn, qd_conn); + return false; + } + } } - } - return true; + return true; } @@ -417,11 +453,11 @@ bool qd_policy_approve_amqp_session(pn_session_t *ssn, qd_connection_t *qd_conn) // void qd_policy_apply_session_settings(pn_session_t *ssn, qd_connection_t *qd_conn) { - if (qd_conn->policy_settings && qd_conn->policy_settings->maxSessionWindow) { - pn_session_set_incoming_capacity(ssn, qd_conn->policy_settings->maxSessionWindow); - } else { - pn_session_set_incoming_capacity(ssn, 1000000); - } + if (qd_conn->policy_settings && qd_conn->policy_settings->maxSessionWindow) { + pn_session_set_incoming_capacity(ssn, qd_conn->policy_settings->maxSessionWindow); + } else { + pn_session_set_incoming_capacity(ssn, 1000000); + } } // @@ -464,6 +500,9 @@ void _qd_policy_deny_amqp_receiver_link(pn_link_t *pn_link, qd_connection_t *qd_ qd_conn->policy_settings->denialCounts->receiverDenied++; } + +// +// bool qd_policy_approve_amqp_sender_link(pn_link_t *pn_link, qd_connection_t *qd_conn) { if (qd_conn->policy_settings->maxSenders) { http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/25af9cd7/src/policy.h ---------------------------------------------------------------------- diff --git a/src/policy.h b/src/policy.h index 88b0f48..d4fc2b2 100644 --- a/src/policy.h +++ b/src/policy.h @@ -32,6 +32,7 @@ typedef struct qd_policy_denial_counts_s qd_policy_denial_counts_t; +// TODO: Provide locking struct qd_policy_denial_counts_s { int sessionDenied; int senderDenied; @@ -76,9 +77,19 @@ qd_error_t qd_entity_configure_policy(qd_policy_t *policy, qd_entity_t *entity); qd_error_t qd_register_policy_manager(qd_policy_t *policy, void *policy_manager); +/** Allocate counts statistics block. + * Called from Python + */ long qd_policy_c_counts_alloc(); + +/** Free counts statistics block. + * Called from Python + */ void qd_policy_c_counts_free(long ccounts); +/** Refresh a counts statistics block + * Called from Python + */ qd_error_t qd_policy_c_counts_refresh(long ccounts, qd_entity_t*entity); @@ -107,7 +118,7 @@ void qd_policy_socket_close(void *context, const qd_connection_t *conn); /** Approve a new session based on connection's policy. * Sessions denied are closed and counted. * - * @param[in] ssn proton session being closed + * @param[in] ssn proton session being approved * @param[in] qd_conn dispatch connection with policy settings and counts **/ bool qd_policy_approve_amqp_session(pn_session_t *ssn, qd_connection_t *qd_conn); @@ -115,7 +126,7 @@ bool qd_policy_approve_amqp_session(pn_session_t *ssn, qd_connection_t *qd_conn) /** Apply policy or default settings for a new session. * - * @param[in] ssn proton session being closed + * @param[in] ssn proton session being set * @param[in] qd_conn dispatch connection with policy settings and counts **/ void qd_policy_apply_session_settings(pn_session_t *ssn, qd_connection_t *qd_conn); @@ -124,7 +135,7 @@ void qd_policy_apply_session_settings(pn_session_t *ssn, qd_connection_t *qd_con /** Approve a new sender link based on connection's policy. * Links denied are closed and counted. * - * @param[in] pn_link proton link being closed + * @param[in] pn_link proton link being approved * @param[in] qd_conn dispatch connection with policy settings and counts **/ bool qd_policy_approve_amqp_sender_link(pn_link_t *pn_link, qd_connection_t *qd_conn); @@ -133,7 +144,7 @@ bool qd_policy_approve_amqp_sender_link(pn_link_t *pn_link, qd_connection_t *qd_ /** Approve a new receiver link based on connection's policy. * Links denied are closed and counted. * - * @param[in] pn_link proton link being closed + * @param[in] pn_link proton link being approved * @param[in] qd_conn dispatch connection with policy settings and counts **/ bool qd_policy_approve_amqp_receiver_link(pn_link_t *pn_link, qd_connection_t *qd_conn); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/25af9cd7/tests/router_policy_test.py ---------------------------------------------------------------------- diff --git a/tests/router_policy_test.py b/tests/router_policy_test.py index 78a91a6..99165c9 100644 --- a/tests/router_policy_test.py +++ b/tests/router_policy_test.py @@ -168,10 +168,8 @@ class PolicyFile(TestCase): self.assertTrue(upolicy['maxReceivers'] == 44) self.assertTrue(upolicy['allowAnonymousSender']) self.assertTrue(upolicy['allowDynamicSrc']) - self.assertTrue(len(upolicy['targets']) == 1) - self.assertTrue('private' in upolicy['targets']) - self.assertTrue(len(upolicy['sources']) == 1) - self.assertTrue('private' in upolicy['sources']) + self.assertTrue(upolicy['targets'] == 'private') + self.assertTrue(upolicy['sources'] == 'private') def test_policy1_test_zeke_bad_IP(self): self.assertTrue( --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
