Repository: qpid-dispatch Updated Branches: refs/heads/master e95eb3406 -> a6134950e
DISPATCH-1194 - Fixed credit propagation for routed links set up asynchronously. Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/a6134950 Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/a6134950 Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/a6134950 Branch: refs/heads/master Commit: a6134950e8f45e23408fec8a09c3bd0d8d03d568 Parents: e95eb34 Author: Ted Ross <tr...@redhat.com> Authored: Tue Dec 11 13:59:16 2018 -0500 Committer: Ted Ross <tr...@redhat.com> Committed: Tue Dec 11 13:59:55 2018 -0500 ---------------------------------------------------------------------- src/router_core/connections.c | 7 + src/router_core/forwarder.c | 4 +- src/router_core/router_core_private.h | 2 + src/router_core/transfer.c | 7 + tests/system_tests_link_route_credit.py | 276 +++++++++++++++++++++++++++ 5 files changed, 295 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a6134950/src/router_core/connections.c ---------------------------------------------------------------------- diff --git a/src/router_core/connections.c b/src/router_core/connections.c index 642fdd5..a810a1e 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -882,6 +882,7 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t *core, link->oper_status = QDR_LINK_OPER_DOWN; link->insert_prefix = 0; link->strip_prefix = 0; + link->attach_count = 1; link->strip_annotations_in = conn->strip_annotations_in; link->strip_annotations_out = conn->strip_annotations_out; @@ -1304,6 +1305,11 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act qdr_terminus_t *target = action->args.connection.target; // + // Start the attach count. + // + link->attach_count = 1; + + // // Put the link into the proper lists for tracking. // DEQ_INSERT_TAIL(core->open_links, link); @@ -1422,6 +1428,7 @@ static void qdr_link_inbound_second_attach_CT(qdr_core_t *core, qdr_action_t *ac qdr_terminus_t *target = action->args.connection.target; link->oper_status = QDR_LINK_OPER_UP; + link->attach_count++; // // Mark the link as an edge link if it's inside an edge connection. http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a6134950/src/router_core/forwarder.c ---------------------------------------------------------------------- diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c index b4478c9..9d2437c 100644 --- a/src/router_core/forwarder.c +++ b/src/router_core/forwarder.c @@ -911,6 +911,7 @@ void qdr_forward_link_direct_CT(qdr_core_t *core, out_link->link_type = QD_LINK_ENDPOINT; out_link->link_direction = qdr_link_direction(in_link) == QD_OUTGOING ? QD_INCOMING : QD_OUTGOING; out_link->admin_enabled = true; + out_link->attach_count = 1; if (strip) { out_link->strip_prefix = strip; @@ -939,7 +940,8 @@ void qdr_forward_link_direct_CT(qdr_core_t *core, qdr_connection_enqueue_work_CT(core, conn, work); if (qdr_link_direction(in_link) == QD_OUTGOING && in_link->credit_to_core > 0) { - qdr_link_issue_credit_CT(core, out_link, in_link->credit_to_core, false); + qdr_link_issue_credit_CT(core, out_link, in_link->credit_stored, in_link->drain_mode); + in_link->credit_stored = 0; } } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a6134950/src/router_core/router_core_private.h ---------------------------------------------------------------------- diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index 7177471..63fd5df 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -419,6 +419,7 @@ struct qdr_link_t { char *name; char *disambiguated_name; char *terminus_addr; + int attach_count; ///< 1 or 2 depending on the state of the lifecycle int detach_count; ///< 0, 1, or 2 depending on the state of the lifecycle qdr_address_t *owning_addr; ///< [ref] Address record that owns this link qdr_link_t *connected_link; ///< [ref] If this is a link-route, reference the connected link @@ -433,6 +434,7 @@ struct qdr_link_t { int capacity; int credit_to_core; ///< Number of the available credits incrementally given to the core int credit_pending; ///< Number of credits to be issued once consumers are available + int credit_stored; ///< Number of credits given to the link before it was ready to process them. bool admin_enabled; bool strip_annotations_in; bool strip_annotations_out; http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a6134950/src/router_core/transfer.c ---------------------------------------------------------------------- diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c index 1cce8de..c4b90c9 100644 --- a/src/router_core/transfer.c +++ b/src/router_core/transfer.c @@ -781,6 +781,13 @@ static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discar qdr_link_enqueue_work_CT(core, clink, work); } } else { + if (link->attach_count == 1) + // + // The link is half-open. Store the pending credit to be dealt with once the link is + // progressed to the next step. + // + link->credit_stored += credit; + // // Handle the replenishing of credit outbound // http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a6134950/tests/system_tests_link_route_credit.py ---------------------------------------------------------------------- diff --git a/tests/system_tests_link_route_credit.py b/tests/system_tests_link_route_credit.py index e8c824e..8674519 100644 --- a/tests/system_tests_link_route_credit.py +++ b/tests/system_tests_link_route_credit.py @@ -151,6 +151,174 @@ class RouterTest(TestCase): test.run() self.assertEqual(None, test.error) + def test_08_dest_sender_initial_credit_same_edge(self): + test = LRDestSenderFlowTest(self.routers[2].addresses[0], + self.routers[2].addresses[1], + self.routers[2].addresses[0], + 'queue.08', 5) + test.run() + self.assertEqual(None, test.error) + + def test_09_dest_sender_initial_credit_same_interior(self): + test = LRDestSenderFlowTest(self.routers[0].addresses[0], + self.routers[0].addresses[1], + self.routers[0].addresses[0], + 'queue.09', 5) + test.run() + self.assertEqual(None, test.error) + + def test_10_dest_sender_initial_credit_edge_edge(self): + test = LRDestSenderFlowTest(self.routers[2].addresses[0], + self.routers[3].addresses[1], + self.routers[0].addresses[0], + 'queue.10', 5) + test.run() + self.assertEqual(None, test.error) + + def test_11_dest_sender_initial_credit_interior_interior(self): + test = LRDestSenderFlowTest(self.routers[0].addresses[0], + self.routers[1].addresses[1], + self.routers[0].addresses[0], + 'queue.11', 5) + test.run() + self.assertEqual(None, test.error) + + def test_12_dest_sender_initial_credit_edge_interior(self): + test = LRDestSenderFlowTest(self.routers[2].addresses[0], + self.routers[0].addresses[1], + self.routers[0].addresses[0], + 'queue.12', 5) + test.run() + self.assertEqual(None, test.error) + + def test_13_dest_sender_initial_credit_interior_edge(self): + test = LRDestSenderFlowTest(self.routers[0].addresses[0], + self.routers[2].addresses[1], + self.routers[0].addresses[0], + 'queue.13', 5) + test.run() + self.assertEqual(None, test.error) + + def test_14_dest_sender_initial_credit_edge_interior_interior_edge(self): + test = LRDestSenderFlowTest(self.routers[2].addresses[0], + self.routers[4].addresses[1], + self.routers[0].addresses[0], + 'queue.14', 5) + test.run() + self.assertEqual(None, test.error) + + def test_15_dest_receiver_same_edge(self): + test = LRDestReceiverFlowTest(self.routers[2].addresses[1], + self.routers[2].addresses[0], + self.routers[2].addresses[0], + 'queue.15', 0) + test.run() + self.assertEqual(None, test.error) + + def test_16_dest_receiver_same_interior(self): + test = LRDestReceiverFlowTest(self.routers[0].addresses[1], + self.routers[0].addresses[0], + self.routers[0].addresses[0], + 'queue.16', 0) + test.run() + self.assertEqual(None, test.error) + + def test_17_dest_receiver_edge_edge(self): + test = LRDestReceiverFlowTest(self.routers[2].addresses[1], + self.routers[3].addresses[0], + self.routers[0].addresses[0], + 'queue.17', 0) + test.run() + self.assertEqual(None, test.error) + + def test_18_dest_receiver_interior_interior(self): + test = LRDestReceiverFlowTest(self.routers[0].addresses[1], + self.routers[1].addresses[0], + self.routers[0].addresses[0], + 'queue.18', 0) + test.run() + self.assertEqual(None, test.error) + + def test_19_dest_receiver_edge_interior(self): + test = LRDestReceiverFlowTest(self.routers[2].addresses[1], + self.routers[0].addresses[0], + self.routers[0].addresses[0], + 'queue.19', 0) + test.run() + self.assertEqual(None, test.error) + + def test_20_dest_receiver_interior_edge(self): + test = LRDestReceiverFlowTest(self.routers[0].addresses[1], + self.routers[2].addresses[0], + self.routers[0].addresses[0], + 'queue.20', 0) + test.run() + self.assertEqual(None, test.error) + + def test_21_dest_receiver_edge_interior_interior_edge(self): + test = LRDestReceiverFlowTest(self.routers[2].addresses[1], + self.routers[4].addresses[0], + self.routers[1].addresses[0], + 'queue.21', 0) + test.run() + self.assertEqual(None, test.error) + + def test_22_dest_receiver_initial_credit_same_edge(self): + test = LRDestReceiverFlowTest(self.routers[2].addresses[1], + self.routers[2].addresses[0], + self.routers[2].addresses[0], + 'queue.22', 5) + test.run() + self.assertEqual(None, test.error) + + def test_23_dest_receiver_initial_credit_same_interior(self): + test = LRDestReceiverFlowTest(self.routers[0].addresses[1], + self.routers[0].addresses[0], + self.routers[0].addresses[0], + 'queue.23', 5) + test.run() + self.assertEqual(None, test.error) + + def test_24_dest_receiver_initial_credit_edge_edge(self): + test = LRDestReceiverFlowTest(self.routers[2].addresses[1], + self.routers[3].addresses[0], + self.routers[0].addresses[0], + 'queue.24', 5) + test.run() + self.assertEqual(None, test.error) + + def test_25_dest_receiver_initial_credit_interior_interior(self): + test = LRDestReceiverFlowTest(self.routers[0].addresses[1], + self.routers[1].addresses[0], + self.routers[0].addresses[0], + 'queue.25', 5) + test.run() + self.assertEqual(None, test.error) + + def test_26_dest_receiver_initial_credit_edge_interior(self): + test = LRDestReceiverFlowTest(self.routers[2].addresses[1], + self.routers[0].addresses[0], + self.routers[0].addresses[0], + 'queue.26', 5) + test.run() + self.assertEqual(None, test.error) + + def test_27_dest_receiver_initial_credit_interior_edge(self): + test = LRDestReceiverFlowTest(self.routers[0].addresses[1], + self.routers[2].addresses[0], + self.routers[0].addresses[0], + 'queue.27', 5) + test.run() + self.assertEqual(None, test.error) + + def test_28_dest_receiver_initial_credit_edge_interior_interior_edge(self): + test = LRDestReceiverFlowTest(self.routers[2].addresses[1], + self.routers[4].addresses[0], + self.routers[1].addresses[0], + 'queue.28', 5) + test.run() + self.assertEqual(None, test.error) + class Entity(object): def __init__(self, status_code, status_description, attrs): @@ -303,5 +471,113 @@ class LRDestSenderFlowTest(MessagingHandler): container.run() +class LRDestReceiverFlowTest(MessagingHandler): + def __init__(self, receiver_host, sender_host, probe_host, address, initial_credit): + super(LRDestReceiverFlowTest, self).__init__(prefetch=0) + self.receiver_host = receiver_host + self.sender_host = sender_host + self.probe_host = probe_host + self.address = address + self.initial_credit = initial_credit + self.delta_credit = 7 + self.final_credit = initial_credit + 2 * self.delta_credit + self.expected_credit = initial_credit + + self.receiver_conn = None + self.sender_conn = None + self.probe_conn = None + self.probe_sender = None + self.probe_receiver = None + self.probe_reply = None + self.receiver = None + self.sender = None + self.error = None + self.last_action = "Test initialization" + + def fail(self, text): + self.error = text + self.receiver_conn.close() + self.sender_conn.close() + self.probe_conn.close() + self.timer.cancel() + + def timeout(self): + self.error = "Timeout Expired - last_action: %s" % (self.last_action) + self.receiver_conn.close() + self.sender_conn.close() + self.probe_conn.close() + + def poll_timeout(self): + self.probe() + + def on_start(self, event): + self.reactor = event.reactor + self.timer = event.reactor.schedule(7.0, Timeout(self)) + self.receiver_conn = event.container.connect(self.receiver_host) + self.sender_conn = event.container.connect(self.sender_host) + self.probe_conn = event.container.connect(self.probe_host) + self.probe_receiver = event.container.create_receiver(self.probe_conn, dynamic=True) + self.probe_receiver.flow(1000) + self.last_action = "on_start" + + def probe(self): + self.probe_sender.send(self.proxy.read_address('Cqueue')) + + def on_link_opened(self, event): + if event.receiver == self.probe_receiver: + self.probe_reply = self.probe_receiver.remote_source.address + self.proxy = RouterProxy(self.probe_reply) + self.probe_sender = event.container.create_sender(self.probe_conn, '$management') + elif event.sender == self.probe_sender: + self.probe() + self.last_action = "probing" + elif event.sender == self.sender: + if self.initial_credit == 0: + self.expected_credit += self.delta_credit + self.receiver.flow(self.delta_credit) + + def on_link_opening(self, event): + if event.receiver: + self.receiver = event.receiver + if event.receiver.remote_target.address == self.address: + event.receiver.target.address = self.address + event.receiver.open() + if self.initial_credit > 0: + self.receiver.flow(self.initial_credit) + self.expected_credit = self.initial_credit + else: + self.fail("Incorrect address on incoming receiver: got %s, expected %s" % + (event.receiver.remote_target.address, self.address)) + + def on_sendable(self, event): + if event.sender == self.sender: + if event.sender.credit == self.expected_credit: + if self.expected_credit == self.final_credit: + self.fail(None) + else: + self.expected_credit += self.delta_credit + self.receiver.flow(self.delta_credit) + else: + self.fail("Unexpected sender credit: got %d, expected %d" % + (event.sender.credit, self.expected_credit)) + + def on_message(self, event): + if event.receiver == self.probe_receiver: + response = self.proxy.response(event.message); + self.last_action = "Handling probe response: remote: %d container: %d" \ + % (response.remoteCount, response.containerCount) + if response.status_code == 200 and response.remoteCount + response.containerCount == 1: + self.sender = event.container.create_sender(self.receiver_conn, self.address) + self.last_action = "opening test sender" + else: + self.poll_timer = self.reactor.schedule(0.5, PollTimeout(self)) + + + def run(self): + container = Container(self) + container.container_id = 'LRC_R' + container.run() + + if __name__== '__main__': unittest.main(main_module()) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org