Repository: qpid-dispatch
Updated Branches:
  refs/heads/master e95eb3406 -> a6134950e


DISPATCH-1194 - Fixed credit propagation for routed links set up asynchronously.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/a6134950
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/a6134950
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/a6134950

Branch: refs/heads/master
Commit: a6134950e8f45e23408fec8a09c3bd0d8d03d568
Parents: e95eb34
Author: Ted Ross <tr...@redhat.com>
Authored: Tue Dec 11 13:59:16 2018 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Tue Dec 11 13:59:55 2018 -0500

----------------------------------------------------------------------
 src/router_core/connections.c           |   7 +
 src/router_core/forwarder.c             |   4 +-
 src/router_core/router_core_private.h   |   2 +
 src/router_core/transfer.c              |   7 +
 tests/system_tests_link_route_credit.py | 276 +++++++++++++++++++++++++++
 5 files changed, 295 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a6134950/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 642fdd5..a810a1e 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -882,6 +882,7 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t       *core,
     link->oper_status    = QDR_LINK_OPER_DOWN;
     link->insert_prefix = 0;
     link->strip_prefix = 0;
+    link->attach_count = 1;
 
     link->strip_annotations_in  = conn->strip_annotations_in;
     link->strip_annotations_out = conn->strip_annotations_out;
@@ -1304,6 +1305,11 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
     qdr_terminus_t    *target = action->args.connection.target;
 
     //
+    // Start the attach count.
+    //
+    link->attach_count = 1;
+
+    //
     // Put the link into the proper lists for tracking.
     //
     DEQ_INSERT_TAIL(core->open_links, link);
@@ -1422,6 +1428,7 @@ static void qdr_link_inbound_second_attach_CT(qdr_core_t *core, qdr_action_t *ac
     qdr_terminus_t   *target = action->args.connection.target;
 
     link->oper_status = QDR_LINK_OPER_UP;
+    link->attach_count++;
 
     //
     // Mark the link as an edge link if it's inside an edge connection.

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a6134950/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index b4478c9..9d2437c 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -911,6 +911,7 @@ void qdr_forward_link_direct_CT(qdr_core_t       *core,
     out_link->link_type      = QD_LINK_ENDPOINT;
     out_link->link_direction = qdr_link_direction(in_link) == QD_OUTGOING ? QD_INCOMING : QD_OUTGOING;
     out_link->admin_enabled  = true;
+    out_link->attach_count   = 1;
 
     if (strip) {
         out_link->strip_prefix = strip;
@@ -939,7 +940,8 @@ void qdr_forward_link_direct_CT(qdr_core_t       *core,
     qdr_connection_enqueue_work_CT(core, conn, work);
 
     if (qdr_link_direction(in_link) == QD_OUTGOING && in_link->credit_to_core > 0) {
-        qdr_link_issue_credit_CT(core, out_link, in_link->credit_to_core, false);
+        qdr_link_issue_credit_CT(core, out_link, in_link->credit_stored, in_link->drain_mode);
+        in_link->credit_stored = 0;
     }
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a6134950/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 7177471..63fd5df 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -419,6 +419,7 @@ struct qdr_link_t {
     char                    *name;
     char                    *disambiguated_name;
     char                    *terminus_addr;
+    int                      attach_count;       ///< 1 or 2 depending on the state of the lifecycle
     int                      detach_count;       ///< 0, 1, or 2 depending on the state of the lifecycle
     qdr_address_t           *owning_addr;        ///< [ref] Address record that owns this link
     qdr_link_t              *connected_link;     ///< [ref] If this is a link-route, reference the connected link
@@ -433,6 +434,7 @@ struct qdr_link_t {
     int                      capacity;
     int                      credit_to_core; ///< Number of the available credits incrementally given to the core
     int                      credit_pending; ///< Number of credits to be issued once consumers are available
+    int                      credit_stored;  ///< Number of credits given to the link before it was ready to process them.
     bool                     admin_enabled;
     bool                     strip_annotations_in;
     bool                     strip_annotations_out;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a6134950/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 1cce8de..c4b90c9 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -781,6 +781,13 @@ static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discar
             qdr_link_enqueue_work_CT(core, clink, work);
         }
     } else {
+        if (link->attach_count == 1)
+            //
+            // The link is half-open.  Store the pending credit to be dealt with once the link is
+            // progressed to the next step.
+            //
+            link->credit_stored += credit;
+
         //
         // Handle the replenishing of credit outbound
         //

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a6134950/tests/system_tests_link_route_credit.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_link_route_credit.py b/tests/system_tests_link_route_credit.py
index e8c824e..8674519 100644
--- a/tests/system_tests_link_route_credit.py
+++ b/tests/system_tests_link_route_credit.py
@@ -151,6 +151,174 @@ class RouterTest(TestCase):
         test.run()
         self.assertEqual(None, test.error)
 
+    def test_08_dest_sender_initial_credit_same_edge(self):
+        test = LRDestSenderFlowTest(self.routers[2].addresses[0],
+                                    self.routers[2].addresses[1],
+                                    self.routers[2].addresses[0],
+                                    'queue.08', 5)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_09_dest_sender_initial_credit_same_interior(self):
+        test = LRDestSenderFlowTest(self.routers[0].addresses[0],
+                                    self.routers[0].addresses[1],
+                                    self.routers[0].addresses[0],
+                                    'queue.09', 5)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_10_dest_sender_initial_credit_edge_edge(self):
+        test = LRDestSenderFlowTest(self.routers[2].addresses[0],
+                                    self.routers[3].addresses[1],
+                                    self.routers[0].addresses[0],
+                                    'queue.10', 5)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_11_dest_sender_initial_credit_interior_interior(self):
+        test = LRDestSenderFlowTest(self.routers[0].addresses[0],
+                                    self.routers[1].addresses[1],
+                                    self.routers[0].addresses[0],
+                                    'queue.11', 5)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_12_dest_sender_initial_credit_edge_interior(self):
+        test = LRDestSenderFlowTest(self.routers[2].addresses[0],
+                                    self.routers[0].addresses[1],
+                                    self.routers[0].addresses[0],
+                                    'queue.12', 5)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_13_dest_sender_initial_credit_interior_edge(self):
+        test = LRDestSenderFlowTest(self.routers[0].addresses[0],
+                                    self.routers[2].addresses[1],
+                                    self.routers[0].addresses[0],
+                                    'queue.13', 5)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_14_dest_sender_initial_credit_edge_interior_interior_edge(self):
+        test = LRDestSenderFlowTest(self.routers[2].addresses[0],
+                                    self.routers[4].addresses[1],
+                                    self.routers[0].addresses[0],
+                                    'queue.14', 5)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_15_dest_receiver_same_edge(self):
+        test = LRDestReceiverFlowTest(self.routers[2].addresses[1],
+                                      self.routers[2].addresses[0],
+                                      self.routers[2].addresses[0],
+                                      'queue.15', 0)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_16_dest_receiver_same_interior(self):
+        test = LRDestReceiverFlowTest(self.routers[0].addresses[1],
+                                      self.routers[0].addresses[0],
+                                      self.routers[0].addresses[0],
+                                      'queue.16', 0)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_17_dest_receiver_edge_edge(self):
+        test = LRDestReceiverFlowTest(self.routers[2].addresses[1],
+                                      self.routers[3].addresses[0],
+                                      self.routers[0].addresses[0],
+                                      'queue.17', 0)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_18_dest_receiver_interior_interior(self):
+        test = LRDestReceiverFlowTest(self.routers[0].addresses[1],
+                                      self.routers[1].addresses[0],
+                                      self.routers[0].addresses[0],
+                                      'queue.18', 0)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_19_dest_receiver_edge_interior(self):
+        test = LRDestReceiverFlowTest(self.routers[2].addresses[1],
+                                      self.routers[0].addresses[0],
+                                      self.routers[0].addresses[0],
+                                      'queue.19', 0)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_20_dest_receiver_interior_edge(self):
+        test = LRDestReceiverFlowTest(self.routers[0].addresses[1],
+                                      self.routers[2].addresses[0],
+                                      self.routers[0].addresses[0],
+                                      'queue.20', 0)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_21_dest_receiver_edge_interior_interior_edge(self):
+        test = LRDestReceiverFlowTest(self.routers[2].addresses[1],
+                                      self.routers[4].addresses[0],
+                                      self.routers[1].addresses[0],
+                                      'queue.21', 0)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_22_dest_receiver_initial_credit_same_edge(self):
+        test = LRDestReceiverFlowTest(self.routers[2].addresses[1],
+                                      self.routers[2].addresses[0],
+                                      self.routers[2].addresses[0],
+                                      'queue.22', 5)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_23_dest_receiver_initial_credit_same_interior(self):
+        test = LRDestReceiverFlowTest(self.routers[0].addresses[1],
+                                      self.routers[0].addresses[0],
+                                      self.routers[0].addresses[0],
+                                      'queue.23', 5)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_24_dest_receiver_initial_credit_edge_edge(self):
+        test = LRDestReceiverFlowTest(self.routers[2].addresses[1],
+                                      self.routers[3].addresses[0],
+                                      self.routers[0].addresses[0],
+                                      'queue.24', 5)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_25_dest_receiver_initial_credit_interior_interior(self):
+        test = LRDestReceiverFlowTest(self.routers[0].addresses[1],
+                                      self.routers[1].addresses[0],
+                                      self.routers[0].addresses[0],
+                                      'queue.25', 5)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_26_dest_receiver_initial_credit_edge_interior(self):
+        test = LRDestReceiverFlowTest(self.routers[2].addresses[1],
+                                      self.routers[0].addresses[0],
+                                      self.routers[0].addresses[0],
+                                      'queue.26', 5)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_27_dest_receiver_initial_credit_interior_edge(self):
+        test = LRDestReceiverFlowTest(self.routers[0].addresses[1],
+                                      self.routers[2].addresses[0],
+                                      self.routers[0].addresses[0],
+                                      'queue.27', 5)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_28_dest_receiver_initial_credit_edge_interior_interior_edge(self):
+        test = LRDestReceiverFlowTest(self.routers[2].addresses[1],
+                                      self.routers[4].addresses[0],
+                                      self.routers[1].addresses[0],
+                                      'queue.28', 5)
+        test.run()
+        self.assertEqual(None, test.error)
+
 
 class Entity(object):
     def __init__(self, status_code, status_description, attrs):
@@ -303,5 +471,113 @@ class LRDestSenderFlowTest(MessagingHandler):
         container.run()
 
 
+class LRDestReceiverFlowTest(MessagingHandler):
+    def __init__(self, receiver_host, sender_host, probe_host, address, initial_credit):
+        super(LRDestReceiverFlowTest, self).__init__(prefetch=0)
+        self.receiver_host   = receiver_host
+        self.sender_host     = sender_host
+        self.probe_host      = probe_host
+        self.address         = address
+        self.initial_credit  = initial_credit
+        self.delta_credit    = 7
+        self.final_credit    = initial_credit + 2 * self.delta_credit
+        self.expected_credit = initial_credit
+
+        self.receiver_conn  = None
+        self.sender_conn    = None
+        self.probe_conn     = None
+        self.probe_sender   = None
+        self.probe_receiver = None
+        self.probe_reply    = None
+        self.receiver       = None
+        self.sender         = None
+        self.error          = None
+        self.last_action    = "Test initialization"
+
+    def fail(self, text):
+        self.error = text
+        self.receiver_conn.close()
+        self.sender_conn.close()
+        self.probe_conn.close()
+        self.timer.cancel()
+
+    def timeout(self):
+        self.error = "Timeout Expired - last_action: %s" % (self.last_action)
+        self.receiver_conn.close()
+        self.sender_conn.close()
+        self.probe_conn.close()
+
+    def poll_timeout(self):
+        self.probe()
+
+    def on_start(self, event):
+        self.reactor        = event.reactor
+        self.timer          = event.reactor.schedule(7.0, Timeout(self))
+        self.receiver_conn  = event.container.connect(self.receiver_host)
+        self.sender_conn    = event.container.connect(self.sender_host)
+        self.probe_conn     = event.container.connect(self.probe_host)
+        self.probe_receiver = event.container.create_receiver(self.probe_conn, dynamic=True)
+        self.probe_receiver.flow(1000)
+        self.last_action = "on_start"
+
+    def probe(self):
+        self.probe_sender.send(self.proxy.read_address('Cqueue'))
+
+    def on_link_opened(self, event):
+        if event.receiver == self.probe_receiver:
+            self.probe_reply  = self.probe_receiver.remote_source.address
+            self.proxy        = RouterProxy(self.probe_reply)
+            self.probe_sender = event.container.create_sender(self.probe_conn, '$management')
+        elif event.sender == self.probe_sender:
+            self.probe()
+            self.last_action = "probing"
+        elif event.sender == self.sender:
+            if self.initial_credit == 0:
+                self.expected_credit += self.delta_credit
+                self.receiver.flow(self.delta_credit)
+
+    def on_link_opening(self, event):
+        if event.receiver:
+            self.receiver = event.receiver
+            if event.receiver.remote_target.address == self.address:
+                event.receiver.target.address = self.address
+                event.receiver.open()
+                if self.initial_credit > 0:
+                    self.receiver.flow(self.initial_credit)
+                    self.expected_credit = self.initial_credit
+            else:
+                self.fail("Incorrect address on incoming receiver: got %s, expected %s" %
+                          (event.receiver.remote_target.address, self.address))
+
+    def on_sendable(self, event):
+        if event.sender == self.sender:
+            if event.sender.credit == self.expected_credit:
+                if self.expected_credit == self.final_credit:
+                    self.fail(None)
+                else:
+                    self.expected_credit += self.delta_credit
+                    self.receiver.flow(self.delta_credit)
+            else:
+                self.fail("Unexpected sender credit: got %d, expected %d" %
+                          (event.sender.credit, self.expected_credit))
+
+    def on_message(self, event):
+        if event.receiver == self.probe_receiver:
+            response = self.proxy.response(event.message);
+            self.last_action = "Handling probe response: remote: %d container: %d" \
+                               % (response.remoteCount, response.containerCount)
+            if response.status_code == 200 and response.remoteCount + response.containerCount == 1:
+                self.sender = event.container.create_sender(self.receiver_conn, self.address)
+                self.last_action = "opening test sender"
+            else:
+                self.poll_timer = self.reactor.schedule(0.5, PollTimeout(self))
+                
+
+    def run(self):
+        container = Container(self)
+        container.container_id = 'LRC_R'
+        container.run()
+
+
 if __name__== '__main__':
     unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to