Repository: qpid-dispatch
Updated Branches:
  refs/heads/crolke-DISPATCH-188-1 8b3f0c761 -> 25af9cd7a


Allow config file settings sources and targets to be type string or list.
Refactor c-to-python routines.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/25af9cd7
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/25af9cd7
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/25af9cd7

Branch: refs/heads/crolke-DISPATCH-188-1
Commit: 25af9cd7a60aeaeb26da80f9f94fee8fbac80d46
Parents: 8b3f0c7
Author: Chuck Rolke <[email protected]>
Authored: Fri Mar 18 13:44:14 2016 -0400
Committer: Chuck Rolke <[email protected]>
Committed: Fri Mar 18 13:44:14 2016 -0400

----------------------------------------------------------------------
 .../policy/policy_local.py                      |  36 ++--
 src/policy.c                                    | 179 +++++++++++--------
 src/policy.h                                    |  19 +-
 tests/router_policy_test.py                     |   6 +-
 4 files changed, 152 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/25af9cd7/python/qpid_dispatch_internal/policy/policy_local.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/policy/policy_local.py 
b/python/qpid_dispatch_internal/policy/policy_local.py
index c000d6d..ea965c7 100644
--- a/python/qpid_dispatch_internal/policy/policy_local.py
+++ b/python/qpid_dispatch_internal/policy/policy_local.py
@@ -208,8 +208,8 @@ class PolicyCompiler(object):
         policy_out[PolicyKeys.KW_MAX_RECEIVERS] = 10
         policy_out[PolicyKeys.KW_ALLOW_DYNAMIC_SRC] = False
         policy_out[PolicyKeys.KW_ALLOW_ANONYMOUS_SENDER] = False
-        policy_out[PolicyKeys.KW_SOURCES] = []
-        policy_out[PolicyKeys.KW_TARGETS] = []
+        policy_out[PolicyKeys.KW_SOURCES] = ''
+        policy_out[PolicyKeys.KW_TARGETS] = ''
 
         cerror = []
         for key, val in policy_in.iteritems():
@@ -239,10 +239,23 @@ class PolicyCompiler(object):
             elif key in [PolicyKeys.KW_SOURCES,
                          PolicyKeys.KW_TARGETS
                          ]:
-                val = [x.strip(' ') for x in 
val.split(PolicyKeys.KC_CONFIG_LIST_SEP)]
+                # accept a string or list
+                if type(val) is str:
+                    # 'abc, def, mytarget'
+                    val = [x.strip(' ') for x in 
val.split(PolicyKeys.KC_CONFIG_LIST_SEP)]
+                elif type(val) is list:
+                    # ['abc', 'def', 'mytarget']
+                    pass
+                elif type(val) is unicode:
+                    # u'abc, def, mytarget'
+                    val = [x.strip(' ') for x in 
str(val).split(PolicyKeys.KC_CONFIG_LIST_SEP)]
+                else:
+                    errors.append("Application '%s' user group '%s' option 
'%s' has illegal value '%s'. Type must be 'str' or 'list' but is '%s;" %
+                                  (appname, usergroup, key, val, type(val)))
                 # deduplicate address lists
                 val = list(set(val))
-                policy_out[key] = val
+                # output result is CSV string with no white space between 
values: 'abc,def,mytarget'
+                policy_out[key] = ','.join(val)
         return True
 
 
@@ -640,7 +653,6 @@ class PolicyLocal(object):
 
             upolicy.update(ruleset[PolicyKeys.KW_SETTINGS][name])
             upolicy[PolicyKeys.KW_CSTATS] = self.statsdb[appname].get_cstats()
-
             return True
         except Exception, e:
             #print str(e)
@@ -653,11 +665,15 @@ class PolicyLocal(object):
         @param conn_id:
         @return:
         """
-        facts = self._connections[conn_id]
-        stats = self.statsdb[facts.app]
-        stats.disconnect(facts.conn_name, facts.user, facts.host)
-        self._connections.remove(conn_id)
-
+        try:
+            facts = self._connections[conn_id]
+            stats = self.statsdb[facts.app]
+            stats.disconnect(facts.conn_name, facts.user, facts.host)
+            del self._connections[conn_id]
+        except Exception, e:
+            #print str(e)
+            self._manager.log_trace(
+                "Policy internal error closing connection id %s. %s" % 
(conn_id, str(e)))
 
     def test_load_config(self):
         ruleset_str = '["policyAccessRuleset", {"applicationName": 
"photoserver","maxConnections": 50,"maxConnPerUser": 5,"maxConnPerHost": 
20,"userGroups": {"anonymous":       "anonymous","users":           "u1, 
u2","paidsubscribers": "p1, p2","test":            "zeke, ynot","admin":        
   "alice, bob","superuser":       "ellen"},"ingressHostGroups": {"Ten18":     
"10.18.0.0-10.18.255.255","EllensWS":  "72.135.2.9","TheLabs":   
"10.48.0.0-10.48.255.255, 192.168.100.0-192.168.100.255","localhost": 
"127.0.0.1, ::1","TheWorld":  "*"},"ingressPolicies": {"anonymous":       
"TheWorld","users":           "TheWorld","paidsubscribers": "TheWorld","test":  
          "TheLabs","admin":           "Ten18, TheLabs, localhost","superuser": 
      "EllensWS, localhost"},"connectionAllowDefault": true,'

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/25af9cd7/src/policy.c
----------------------------------------------------------------------
diff --git a/src/policy.c b/src/policy.c
index 1b55bf5..075e89b 100644
--- a/src/policy.c
+++ b/src/policy.c
@@ -41,6 +41,15 @@
 #include "qpid/dispatch/log.h"
 
 
+/**
+ * Private Function Prototypes
+ */
+void qd_policy_private_deny_amqp_connection(pn_connection_t *conn, const char 
*cond_name, const char *cond_descr);
+void qd_policy_deny_amqp_session(pn_session_t *ssn, qd_connection_t *qd_conn);
+void _qd_policy_deny_amqp_link(pn_link_t *link, qd_connection_t *qd_conn, char 
* s_or_r);
+void _qd_policy_deny_amqp_sender_link(pn_link_t *pn_link, qd_connection_t 
*qd_conn);
+void _qd_policy_deny_amqp_receiver_link(pn_link_t *pn_link, qd_connection_t 
*qd_conn);
+
 //
 // TODO: when policy dev is more complete lower the log level
 //
@@ -241,23 +250,29 @@ void qd_policy_socket_close(void *context, const 
qd_connection_t *conn)
         // HACK ALERT: TODO: This should be deferred to a Python thread
         qd_python_lock_state_t lock_state = qd_python_lock();
         PyObject *module = 
PyImport_ImportModule("qpid_dispatch_internal.policy.policy_manager");
-        PyObject *close_connection = module ? PyObject_GetAttrString(module, 
"policy_close_connection") : NULL;
-        Py_XDECREF(module);
-        PyObject *result = close_connection ? 
PyObject_CallFunction(close_connection, "(OK)", 
-                                                               (PyObject 
*)policy->py_policy_manager, 
-                                                               
conn->connection_id) : NULL;
-        Py_XDECREF(close_connection);
-        if (!result) {
-            qd_python_unlock(lock_state);
-            return;
+        if (module) {
+            PyObject *close_connection = PyObject_GetAttrString(module, 
"policy_close_connection");
+            if (close_connection) {
+                PyObject *result = PyObject_CallFunction(close_connection, 
"(OK)",
+                                                         (PyObject 
*)policy->py_policy_manager,
+                                                          conn->connection_id);
+                if (result) {
+                    Py_XDECREF(result);
+                } else {
+                    qd_log(policy->log_source, POLICY_LOG_LEVEL, "Connection 
close failed: result");
+                }
+                Py_XDECREF(close_connection);
+            } else {
+                qd_log(policy->log_source, POLICY_LOG_LEVEL, "Connection close 
failed: close_connection");
+            }
+            Py_XDECREF(module);
+        } else {
+            qd_log(policy->log_source, POLICY_LOG_LEVEL, "Connection close 
failed: module");
         }
-        Py_XDECREF(result);
-
         qd_python_unlock(lock_state);
-
     }
-    const char *hostname = qdpn_connector_name(conn->pn_cxtr);
     if (policy->max_connection_limit > 0) {
+        const char *hostname = qdpn_connector_name(conn->pn_cxtr);
         qd_log(policy->log_source, POLICY_LOG_LEVEL, "Connection '%s' closed. 
N connections=%d", hostname, n_connections);
     }
 }
@@ -295,65 +310,83 @@ bool qd_policy_open_lookup_user(
     qd_policy_settings_t *settings)
 {
     // Lookup the user/host/app for allow/deny and to get settings name
+    bool res = false;
     qd_python_lock_state_t lock_state = qd_python_lock();
     PyObject *module = 
PyImport_ImportModule("qpid_dispatch_internal.policy.policy_manager");
-    PyObject *lookup_user = module ? PyObject_GetAttrString(module, 
"policy_lookup_user") : NULL;
-    PyObject *result = lookup_user ? PyObject_CallFunction(lookup_user, 
"(OssssK)", 
-                                                           (PyObject 
*)policy->py_policy_manager, 
-                                                           username, hostip, 
app, conn_name, conn_id) : NULL;
-    Py_XDECREF(lookup_user);
-    if (!result) {
-        Py_XDECREF(module);
+    if (module) {
+        PyObject *lookup_user = PyObject_GetAttrString(module, 
"policy_lookup_user");
+        if (lookup_user) {
+            PyObject *result = PyObject_CallFunction(lookup_user, "(OssssK)",
+                                                     (PyObject 
*)policy->py_policy_manager,
+                                                     username, hostip, app, 
conn_name, conn_id);
+            if (result) {
+                const char *res_string = PyString_AsString(result);
+                strncpy(name_buf, res_string, name_buf_size);
+                Py_XDECREF(result);
+                res = true; // settings name returned
+            } else {
+                qd_log(policy->log_source, POLICY_LOG_LEVEL, "Internal: 
lookup_user: result");
+            }
+            Py_XDECREF(lookup_user);
+        } else {
+            qd_log(policy->log_source, POLICY_LOG_LEVEL, "Internal: 
lookup_user: lookup_user");
+        }
+    }
+    if (!res) {
+        if (module) {
+            Py_XDECREF(module);
+        }
         qd_python_unlock(lock_state);
-        qd_log(policy->log_source,
-               POLICY_LOG_LEVEL,
-               "PyObject lookup_user is Null");
         return false;
     }
-    const char *res_string = PyString_AsString(result);
-    strncpy(name_buf, res_string, name_buf_size);
-    Py_XDECREF(result);
 
+    // 
     if (name_buf[0]) {
-        // Go get the settings
+        // Go get the named settings
+        res = false;
         PyObject *upolicy = PyDict_New();
-        PyObject *lookup_settings = module ? PyObject_GetAttrString(module, 
"policy_lookup_settings") : NULL;
-        PyObject *result2 = lookup_settings ? 
PyObject_CallFunction(lookup_settings, "(OssO)", 
-                                                            (PyObject 
*)policy->py_policy_manager, 
-                                                            app, name_buf, 
upolicy) : NULL;
-        Py_XDECREF(lookup_settings);
-        if (!result2) {
+        if (upolicy) {
+            PyObject *lookup_settings = PyObject_GetAttrString(module, 
"policy_lookup_settings");
+            if (lookup_settings) {
+                PyObject *result2 = PyObject_CallFunction(lookup_settings, 
"(OssO)",
+                                                        (PyObject 
*)policy->py_policy_manager,
+                                                        app, name_buf, 
upolicy);
+                if (result2) {
+                    settings->maxFrameSize         = 
qd_entity_opt_long((qd_entity_t*)upolicy, "maxFrameSize", 0);
+                    settings->maxMessageSize       = 
qd_entity_opt_long((qd_entity_t*)upolicy, "maxMessageSize", 0);
+                    settings->maxSessionWindow     = 
qd_entity_opt_long((qd_entity_t*)upolicy, "maxSessionWindow", 0);
+                    settings->maxSessions          = 
qd_entity_opt_long((qd_entity_t*)upolicy, "maxSessions", 0);
+                    settings->maxSenders           = 
qd_entity_opt_long((qd_entity_t*)upolicy, "maxSenders", 0);
+                    settings->maxReceivers         = 
qd_entity_opt_long((qd_entity_t*)upolicy, "maxReceivers", 0);
+                    settings->allowAnonymousSender = 
qd_entity_opt_bool((qd_entity_t*)upolicy, "allowAnonymousSender", false);
+                    settings->allowDynamicSrc      = 
qd_entity_opt_bool((qd_entity_t*)upolicy, "allowDynamicSrc", false);
+                    settings->sources              = 
qd_entity_get_string((qd_entity_t*)upolicy, "sources");
+                    settings->targets              = 
qd_entity_get_string((qd_entity_t*)upolicy, "targets");
+                    settings->denialCounts         = 
(qd_policy_denial_counts_t*)
+                                                    
qd_entity_get_long((qd_entity_t*)upolicy, "denialCounts");
+                    Py_XDECREF(result2);
+                    res = true; // named settings content returned
+                } else {
+                    qd_log(policy->log_source, POLICY_LOG_LEVEL, "Internal: 
lookup_user: result2");
+                }
+                Py_XDECREF(lookup_settings);
+            } else {
+                qd_log(policy->log_source, POLICY_LOG_LEVEL, "Internal: 
lookup_user: lookup_settings");
+            }
             Py_XDECREF(upolicy);
-            qd_python_unlock(lock_state);
-            qd_log(policy->log_source,
-                   POLICY_LOG_LEVEL,
-                   "PyObject lookup_settings is Null");
-            return false;
+        } else {
+            qd_log(policy->log_source, POLICY_LOG_LEVEL, "Internal: 
lookup_user: upolicy");
         }
-        Py_XDECREF(result2);
-        settings->maxFrameSize         = 
qd_entity_opt_long((qd_entity_t*)upolicy, "maxFrameSize", 0);
-        settings->maxMessageSize       = 
qd_entity_opt_long((qd_entity_t*)upolicy, "maxMessageSize", 0);
-        settings->maxSessionWindow     = 
qd_entity_opt_long((qd_entity_t*)upolicy, "maxSessionWindow", 0);
-        settings->maxSessions          = 
qd_entity_opt_long((qd_entity_t*)upolicy, "maxSessions", 0);
-        settings->maxSenders           = 
qd_entity_opt_long((qd_entity_t*)upolicy, "maxSenders", 0);
-        settings->maxReceivers         = 
qd_entity_opt_long((qd_entity_t*)upolicy, "maxReceivers", 0);
-        settings->allowAnonymousSender = 
qd_entity_opt_bool((qd_entity_t*)upolicy, "allowAnonymousSender", false);
-        settings->allowDynamicSrc      = 
qd_entity_opt_bool((qd_entity_t*)upolicy, "allowDynamicSrc", false);
-        settings->sources              = 
qd_entity_get_string((qd_entity_t*)upolicy, "sources");
-        settings->targets              = 
qd_entity_get_string((qd_entity_t*)upolicy, "targets");
-        settings->denialCounts         = (qd_policy_denial_counts_t*)
-                                         
qd_entity_get_long((qd_entity_t*)upolicy, "denialCounts");
-        Py_XDECREF(upolicy);
     }
     Py_XDECREF(module);
     qd_python_unlock(lock_state);
 
     qd_log(policy->log_source, 
            POLICY_LOG_LEVEL, 
-           "Policy AMQP Open lookup_user: %s, hostip: %s, app: %s, connection: 
%s. Usergroup: '%s'", 
-           username, hostip, app, conn_name, name_buf);
+           "Policy AMQP Open lookup_user: %s, hostip: %s, app: %s, connection: 
%s. Usergroup: '%s'%s",
+           username, hostip, app, conn_name, name_buf, (res ? "" : " Internal 
error."));
 
-    return true;
+    return res;
 }
 
 /** Set the error condition and close the connection.
@@ -372,8 +405,11 @@ void 
qd_policy_private_deny_amqp_connection(pn_connection_t *conn, const char *c
 }
 
 
-//
-//
+/** Internal function to deny an amqp session
+ * The session is closed with a condition and the denial is counted.
+ * @param[in,out] ssn proton session
+ * @param[in,out] qd_conn dispatch connection
+ */
 void qd_policy_deny_amqp_session(pn_session_t *ssn, qd_connection_t *qd_conn)
 {
     pn_condition_t * cond = pn_session_condition(ssn);
@@ -401,15 +437,15 @@ void qd_policy_deny_amqp_session(pn_session_t *ssn, 
qd_connection_t *qd_conn)
 //
 bool qd_policy_approve_amqp_session(pn_session_t *ssn, qd_connection_t 
*qd_conn)
 {
-  if (qd_conn->policy_settings) {
-    if (qd_conn->policy_settings->maxSessions) {
-      if (qd_conn->n_sessions == qd_conn->policy_settings->maxSessions) {
-       qd_policy_deny_amqp_session(ssn, qd_conn);
-       return false;
-      }
+    if (qd_conn->policy_settings) {
+        if (qd_conn->policy_settings->maxSessions) {
+            if (qd_conn->n_sessions == qd_conn->policy_settings->maxSessions) {
+                qd_policy_deny_amqp_session(ssn, qd_conn);
+                return false;
+            }
+        }
     }
-  }
-  return true;
+    return true;
 }
 
 
@@ -417,11 +453,11 @@ bool qd_policy_approve_amqp_session(pn_session_t *ssn, 
qd_connection_t *qd_conn)
 //
 void qd_policy_apply_session_settings(pn_session_t *ssn, qd_connection_t 
*qd_conn)
 {
-  if (qd_conn->policy_settings && qd_conn->policy_settings->maxSessionWindow) {
-    pn_session_set_incoming_capacity(ssn, 
qd_conn->policy_settings->maxSessionWindow);
-  } else {
-    pn_session_set_incoming_capacity(ssn, 1000000);
-  }
+    if (qd_conn->policy_settings && 
qd_conn->policy_settings->maxSessionWindow) {
+        pn_session_set_incoming_capacity(ssn, 
qd_conn->policy_settings->maxSessionWindow);
+    } else {
+        pn_session_set_incoming_capacity(ssn, 1000000);
+    }
 }
 
 //
@@ -464,6 +500,9 @@ void _qd_policy_deny_amqp_receiver_link(pn_link_t *pn_link, 
qd_connection_t *qd_
     qd_conn->policy_settings->denialCounts->receiverDenied++;
 }
 
+
+//
+//
 bool qd_policy_approve_amqp_sender_link(pn_link_t *pn_link, qd_connection_t 
*qd_conn)
 {
     if (qd_conn->policy_settings->maxSenders) {

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/25af9cd7/src/policy.h
----------------------------------------------------------------------
diff --git a/src/policy.h b/src/policy.h
index 88b0f48..d4fc2b2 100644
--- a/src/policy.h
+++ b/src/policy.h
@@ -32,6 +32,7 @@
 
 typedef struct qd_policy_denial_counts_s qd_policy_denial_counts_t;
 
+// TODO: Provide locking
 struct qd_policy_denial_counts_s {
     int sessionDenied;
     int senderDenied;
@@ -76,9 +77,19 @@ qd_error_t qd_entity_configure_policy(qd_policy_t *policy, 
qd_entity_t *entity);
 qd_error_t qd_register_policy_manager(qd_policy_t *policy, void 
*policy_manager);
 
 
+/** Allocate counts statistics block.
+ * Called from Python
+ */
 long qd_policy_c_counts_alloc();
+
+/** Free counts statistics block.
+ * Called from Python
+ */
 void qd_policy_c_counts_free(long ccounts);
 
+/** Refresh a counts statistics block
+ * Called from Python
+ */
 qd_error_t qd_policy_c_counts_refresh(long ccounts, qd_entity_t*entity);
 
 
@@ -107,7 +118,7 @@ void qd_policy_socket_close(void *context, const 
qd_connection_t *conn);
 /** Approve a new session based on connection's policy.
  * Sessions denied are closed and counted.
  *
- * @param[in] ssn proton session being closed
+ * @param[in] ssn proton session being approved
  * @param[in] qd_conn dispatch connection with policy settings and counts
  **/
 bool qd_policy_approve_amqp_session(pn_session_t *ssn, qd_connection_t 
*qd_conn);
@@ -115,7 +126,7 @@ bool qd_policy_approve_amqp_session(pn_session_t *ssn, 
qd_connection_t *qd_conn)
 
 /** Apply policy or default settings for a new session.
  *
- * @param[in] ssn proton session being closed
+ * @param[in] ssn proton session being set
  * @param[in] qd_conn dispatch connection with policy settings and counts
  **/
 void qd_policy_apply_session_settings(pn_session_t *ssn, qd_connection_t 
*qd_conn);
@@ -124,7 +135,7 @@ void qd_policy_apply_session_settings(pn_session_t *ssn, 
qd_connection_t *qd_con
 /** Approve a new sender link based on connection's policy.
  * Links denied are closed and counted.
  *
- * @param[in] pn_link proton link being closed
+ * @param[in] pn_link proton link being approved
  * @param[in] qd_conn dispatch connection with policy settings and counts
  **/
 bool qd_policy_approve_amqp_sender_link(pn_link_t *pn_link, qd_connection_t 
*qd_conn);
@@ -133,7 +144,7 @@ bool qd_policy_approve_amqp_sender_link(pn_link_t *pn_link, 
qd_connection_t *qd_
 /** Approve a new receiver link based on connection's policy.
  * Links denied are closed and counted.
  *
- * @param[in] pn_link proton link being closed
+ * @param[in] pn_link proton link being approved
  * @param[in] qd_conn dispatch connection with policy settings and counts
  **/
 bool qd_policy_approve_amqp_receiver_link(pn_link_t *pn_link, qd_connection_t 
*qd_conn);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/25af9cd7/tests/router_policy_test.py
----------------------------------------------------------------------
diff --git a/tests/router_policy_test.py b/tests/router_policy_test.py
index 78a91a6..99165c9 100644
--- a/tests/router_policy_test.py
+++ b/tests/router_policy_test.py
@@ -168,10 +168,8 @@ class PolicyFile(TestCase):
         self.assertTrue(upolicy['maxReceivers']             == 44)
         self.assertTrue(upolicy['allowAnonymousSender'])
         self.assertTrue(upolicy['allowDynamicSrc'])
-        self.assertTrue(len(upolicy['targets']) == 1)
-        self.assertTrue('private' in upolicy['targets'])
-        self.assertTrue(len(upolicy['sources']) == 1)
-        self.assertTrue('private' in upolicy['sources'])
+        self.assertTrue(upolicy['targets'] == 'private')
+        self.assertTrue(upolicy['sources'] == 'private')
 
     def test_policy1_test_zeke_bad_IP(self):
         self.assertTrue(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to