This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/main by this push: new 8c8829b DISPATCH-1772: Add logging to fallback dest self test 8c8829b is described below commit 8c8829b88386e590e2fe14278fa2f2ece8e2a44c Author: Chuck Rolke <c...@apache.org> AuthorDate: Wed Jul 14 15:01:43 2021 -0400 DISPATCH-1772: Add logging to fallback dest self test * This patch instruments the SwitchoverTest where most of the test failures show up * The on_message handler is broken up to explicitly handle receiving from primary and fallback receivers in phases 0 and 1 * Add sender drain handling This closes #1298 --- tests/system_tests_fallback_dest.py | 178 ++++++++++++++++++++++++------------ 1 file changed, 118 insertions(+), 60 deletions(-) diff --git a/tests/system_tests_fallback_dest.py b/tests/system_tests_fallback_dest.py index db51172..85e6c89 100644 --- a/tests/system_tests_fallback_dest.py +++ b/tests/system_tests_fallback_dest.py @@ -20,6 +20,7 @@ from proton import Message, symbol from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, TestTimeout from system_test import unittest +from system_test import Logger from proton.handlers import MessagingHandler from proton.reactor import Container @@ -258,113 +259,113 @@ class RouterTest(TestCase): self.assertIsNone(test.error) def test_25_switchover_same_edge(self): - test = SwitchoverTest(self.ROUTER_EA1, - self.ROUTER_EA1, - self.ROUTER_EA1, + test = SwitchoverTest([self.ROUTER_EA1, "EA1"], + [self.ROUTER_EA1, "EA1"], + [self.ROUTER_EA1, "EA1"], 'dest.25') test.run() self.assertIsNone(test.error) def test_26_switchover_same_interior(self): - test = SwitchoverTest(self.ROUTER_INTA, - self.ROUTER_INTA, - self.ROUTER_INTA, + test = SwitchoverTest([self.ROUTER_INTA, "INTA"], + [self.ROUTER_INTA, "INTA"], + [self.ROUTER_INTA, "INTA"], 'dest.26') test.run() self.assertIsNone(test.error) def test_27_switchover_local_edge_alt_remote_interior(self): - test = SwitchoverTest(self.ROUTER_EA1, - self.ROUTER_INTA, - self.ROUTER_EA1, + test = SwitchoverTest([self.ROUTER_EA1, "EA1"], + [self.ROUTER_INTA, "INTA"], + [self.ROUTER_EA1, "EA1"], 'dest.27') test.run() self.assertIsNone(test.error) def test_28_switchover_local_edge_alt_remote_edge(self): - test = SwitchoverTest(self.ROUTER_EA1, - self.ROUTER_EB1, - self.ROUTER_EA1, + test = SwitchoverTest([self.ROUTER_EA1, "EA1"], + [self.ROUTER_EB1, "EB1"], + [self.ROUTER_EA1, "EA1"], 'dest.28') test.run() self.assertIsNone(test.error) def test_29_switchover_local_edge_pri_remote_interior(self): - test = SwitchoverTest(self.ROUTER_EA1, - self.ROUTER_EA1, - self.ROUTER_INTA, + test = SwitchoverTest([self.ROUTER_EA1, "EA1"], + [self.ROUTER_EA1, "EA1"], + [self.ROUTER_INTA, "INTA"], 'dest.29') test.run() self.assertIsNone(test.error) def test_30_switchover_local_interior_pri_remote_edge(self): - test = SwitchoverTest(self.ROUTER_EA1, - self.ROUTER_EA1, - self.ROUTER_EB1, + test = SwitchoverTest([self.ROUTER_EA1, "EA1"], + [self.ROUTER_EA1, "EA1"], + [self.ROUTER_EB1, "EB1"], 'dest.30') test.run() self.assertIsNone(test.error) def test_31_switchover_local_interior_alt_remote_interior(self): - test = SwitchoverTest(self.ROUTER_INTB, - self.ROUTER_INTA, - self.ROUTER_INTB, + test = SwitchoverTest([self.ROUTER_INTB, "INTB"], + [self.ROUTER_INTA, "INTA"], + [self.ROUTER_INTB, "INTB"], 'dest.31') test.run() self.assertIsNone(test.error) def test_32_switchover_local_interior_alt_remote_edge(self): - test = SwitchoverTest(self.ROUTER_INTB, - self.ROUTER_EA2, - self.ROUTER_INTB, + test = SwitchoverTest([self.ROUTER_INTB, "INTB"], + [self.ROUTER_EA2, "EA2"], + [self.ROUTER_INTB, "INTB"], 'dest.32') test.run() self.assertIsNone(test.error) def test_33_switchover_local_interior_pri_remote_interior(self): - test = SwitchoverTest(self.ROUTER_INTB, - self.ROUTER_INTB, - self.ROUTER_INTA, + test = SwitchoverTest([self.ROUTER_INTB, "INTB"], + [self.ROUTER_INTB, "INTB"], + [self.ROUTER_INTA, "INTA"], 'dest.33') test.run() self.assertIsNone(test.error) def test_34_switchover_local_interior_pri_remote_edge(self): - test = SwitchoverTest(self.ROUTER_INTB, - self.ROUTER_INTB, - self.ROUTER_EB1, + test = SwitchoverTest([self.ROUTER_INTB, "INTB"], + [self.ROUTER_INTB, "INTB"], + [self.ROUTER_EB1, "EB1"], 'dest.34') test.run() self.assertIsNone(test.error) def test_35_switchover_mix_1(self): - test = SwitchoverTest(self.ROUTER_INTA, - self.ROUTER_INTB, - self.ROUTER_EA1, + test = SwitchoverTest([self.ROUTER_INTA, "INTA"], + [self.ROUTER_INTB, "INTB"], + [self.ROUTER_EA1, "EA1"], 'dest.35') test.run() self.assertIsNone(test.error) def test_36_switchover_mix_2(self): - test = SwitchoverTest(self.ROUTER_EA1, - self.ROUTER_INTB, - self.ROUTER_INTA, + test = SwitchoverTest([self.ROUTER_EA1, "EA1"], + [self.ROUTER_INTB, "INTB"], + [self.ROUTER_INTA, "INTA"], 'dest.36') test.run() self.assertIsNone(test.error) def test_37_switchover_mix_3(self): - test = SwitchoverTest(self.ROUTER_EA1, - self.ROUTER_INTB, - self.ROUTER_EB1, + test = SwitchoverTest([self.ROUTER_EA1, "EA1"], + [self.ROUTER_INTB, "INTB"], + [self.ROUTER_EB1, "EB1"], 'dest.37') test.run() self.assertIsNone(test.error) def test_38_switchover_mix_4(self): - test = SwitchoverTest(self.ROUTER_EA1, - self.ROUTER_EA2, - self.ROUTER_EB1, + test = SwitchoverTest([self.ROUTER_EA1, "EA1"], + [self.ROUTER_EA2, "EA2"], + [self.ROUTER_EB1, "EB1"], 'dest.38') test.run() self.assertIsNone(test.error) @@ -567,9 +568,12 @@ class ReceiverFirstTest(MessagingHandler): class SwitchoverTest(MessagingHandler): def __init__(self, sender_host, primary_host, fallback_host, addr): super(SwitchoverTest, self).__init__() - self.sender_host = sender_host - self.primary_host = primary_host - self.fallback_host = fallback_host + self.sender_host = sender_host[0] + self.primary_host = primary_host[0] + self.fallback_host = fallback_host[0] + self.sender_name = sender_host[1] + self.primary_name = primary_host[1] + self.fallback_name = fallback_host[1] self.addr = addr self.count = 300 @@ -586,6 +590,14 @@ class SwitchoverTest(MessagingHandler): self.tx_seq = 0 self.local_rel = 0 + self.log_prefix = "FALLBACK_TEST %s" % self.addr + self.logger = Logger("SwitchoverTest_%s" % addr, print_to_console=False) + # Prepend a convenience SERVER line for scraper tool. + # Then the logs from this test can be merged with the router logs in scraper. + self.logger.log("SERVER (info) Container Name: %s" % self.addr) + self.logger.log("%s SwitchoverTest sender:%s primary:%s fallback:%s" % + (self.log_prefix, self.sender_name, self.primary_name, self.fallback_name)) + def timeout(self): self.error = "Timeout Expired - n_tx=%d, n_rx=%d, n_rel=%d, phase=%d, local_rel=%d" % \ (self.n_tx, self.n_rx, self.n_rel, self.phase, self.local_rel) @@ -602,65 +614,111 @@ class SwitchoverTest(MessagingHandler): def on_start(self, event): self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self)) + self.logger.log("%s Opening sender connection to %s" % (self.log_prefix, self.sender_name)) self.sender_conn = event.container.connect(self.sender_host) + self.logger.log("%s Opening primary receiver connection to %s" % (self.log_prefix, self.primary_name)) self.primary_conn = event.container.connect(self.primary_host) + self.logger.log("%s Opening fallback receiver connection to %s" % (self.log_prefix, self.fallback_name)) self.fallback_conn = event.container.connect(self.fallback_host) + self.logger.log("%s Opening primary receiver to %s" % (self.log_prefix, self.primary_name)) self.primary_receiver = event.container.create_receiver(self.primary_conn, self.addr, name=(self.addr + "_primary_receiver")) + self.logger.log("%s Opening fallback receiver to %s" % (self.log_prefix, self.fallback_name)) self.fallback_receiver = event.container.create_receiver(self.fallback_conn, self.addr, name=(self.addr + "fallback_receiver")) self.fallback_receiver.source.capabilities.put_object(symbol("qd.fallback")) def on_link_opened(self, event): receiver_event = False if event.receiver == self.primary_receiver: + self.logger.log("%s Primary receiver opened" % self.log_prefix) self.primary_open = True receiver_event = True if event.receiver == self.fallback_receiver: + self.logger.log("%s Fallback receiver opened" % self.log_prefix) self.fallback_open = True receiver_event = True if receiver_event and self.primary_open and self.fallback_open: + self.logger.log("%s Opening sender to %s" % (self.log_prefix, self.sender_name)) self.sender = event.container.create_sender(self.sender_conn, self.addr, name=(self.addr + "_sender")) def on_link_closed(self, event): if event.receiver == self.primary_receiver: + self.logger.log("%s Primary receiver closed. Start phase 1 send" % self.log_prefix) self.n_rx = 0 self.n_tx = 0 self.send() def send(self): - while self.sender.credit > 0 and self.n_tx < self.count: - self.sender.send(Message("Msg %s %d %d" % (self.addr, self.tx_seq, self.n_tx))) + e_credit = self.sender.credit + e_n_tx = self.n_tx + e_tx_seq = self.tx_seq + last_message = Message("None") + while self.sender.credit > 0 and self.n_tx < self.count and not self.sender.drain_mode: + last_message = Message("Msg %s %d %d" % (self.addr, self.tx_seq, self.n_tx)) + self.sender.send(last_message) self.n_tx += 1 self.tx_seq += 1 + if self.sender.drain_mode: + n_drained = self.sender.drained() + self.logger.log("%s sender.drained() drained %d credits" % (self.log_prefix, n_drained)) + self.logger.log("%s send() exit: last sent '%s' phase=%d, credit=%3d->%3d, n_tx=%4d->%4d, tx_seq=%4d->%4d, n_rel=%4d" % + (self.log_prefix, last_message.body, self.phase, e_credit, self.sender.credit, + e_n_tx, self.n_tx, e_tx_seq, self.tx_seq, self.n_rel)) def on_sendable(self, event): if event.sender == self.sender: self.send() + else: + self.fail("%s on_sendable event not from the only sender") def on_message(self, event): - if not (self.phase == 0 and event.receiver == self.fallback_receiver): - # Phase 0 message over primary receiver. Phase 1 can come in only on primary. - self.n_rx += 1 - if self.n_rx == self.count: - if self.phase == 0: + if event.receiver == self.primary_receiver: + if self.phase == 0: + self.n_rx += 1 + self.logger.log("%s Received phase 0 message '%s', n_rx=%d" % + (self.log_prefix, event.message.body, self.n_rx)) + if self.n_rx == self.count: + self.logger.log("%s Triggering fallback by closing primary receiver on %s. Test phase 0->1." % + (self.log_prefix, self.primary_name)) self.phase = 1 self.primary_receiver.close() - else: + else: + # Phase 1 messages are unexpected on primary receiver + self.logger.log("%s Phase %d message received on primary: '%s'" % (self.log_prefix, self.phase, event.message.body)) + self.fail("Receive phase1 message on primary receiver") + elif event.receiver == self.fallback_receiver: + if self.phase == 0: + # Phase 0 message over fallback receiver. This may happen because + # primary receiver is on a distant router and the fallback receiver is local. + # Release the message to keep trying until the primary receiver kicks in. + self.release(event.delivery) + self.n_rel += 1 + self.n_tx -= 1 + self.local_rel += 1 + self.logger.log("%s Released phase 0 over fallback: msg:'%s', n_rx=%d, n_tx=%d, n_rel=%d, local_rel=%d" % + (self.log_prefix, event.message.body, self.n_rx, self.n_tx, self.n_rel, self.local_rel)) + else: + self.n_rx += 1 + self.logger.log("%s Received phase 1 over fallback: msg:'%s', n_rx=%d" % + (self.log_prefix, event.message.body, self.n_rx)) + if self.n_rx == self.count: + self.logger.log("%s Success" % self.log_prefix) self.fail(None) else: - # Phase 0 message over fallback receiver. This may happen because - # primary receiver is on a distant router and the fallback receiver is local. - # Release the message to keep trying until the primary receiver kicks in. - self.release(event.delivery) - self.n_rel += 1 - self.n_tx -= 1 - self.local_rel += 1 + self.fail("%s message received on unidentified receiver" % self.addr) def on_released(self, event): + # event type pn_delivery for sender self.n_rel += 1 self.n_tx -= 1 + self.logger.log("%s on_released: sender delivery was released. Adjusted counts: n_rel=%d, n_tx=%d" % + (self.log_prefix, self.n_rel, self.n_tx)) + if event.sender is None: + self.fail("on_released event not related to sender") def run(self): Container(self).run() + if self.error is not None: + self.logger.dump() class SenderFirstAutoLinkTest(MessagingHandler): --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org