This is an automated email from the ASF dual-hosted git repository.

gmurthy pushed a commit to branch main in repository
https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/main by this push: new 019cef2 DISPATCH-2040 - For link routes, even if there is no change in disposition state, still copy the disposition data. This closes #1115 019cef2 is described below commit 019cef2d33bdc1a9056ac07a192819e96e97c7ce Author: Ganesh Murthy <gmur...@apache.org> AuthorDate: Sun Apr 11 19:49:09 2021 -0400 DISPATCH-2040 - For link routes, even if there is no change in disposition state, still copy the disposition data. This closes #1115 --- src/router_node.c | 7 +++- tests/system_tests_two_routers.py | 78 +++++++++++++++++++++++++++++++++++++-- 2 files changed, 80 insertions(+), 5 deletions(-) diff --git a/src/router_node.c b/src/router_node.c index 552410a..3e17695 100644 --- a/src/router_node.c +++ b/src/router_node.c @@ -2050,9 +2050,12 @@ static void CORE_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t di return; // - // If the disposition has changed and the proton delivery has not already been settled, update the proton delivery. + // DISPATCH-2040: For link routed links, it does not matter if the passed in disp matches the pn_delivery_remote_state(pnd), we will still + // call qd_delivery_write_local_state and send out the disposition if the delivery is not already settled. // - if (disp != pn_delivery_remote_state(pnd) && !pn_delivery_settled(pnd)) { + // For non link routed links, if the disposition has changed and the proton delivery has not already been settled, update the proton delivery. 
+ // + if ((qdr_link_is_routed(qlink) || disp != pn_delivery_remote_state(pnd)) && !pn_delivery_settled(pnd)) { qd_message_t *msg = qdr_delivery_message(dlv); // handle propagation of delivery state from qdr_delivery_t to proton: diff --git a/tests/system_tests_two_routers.py b/tests/system_tests_two_routers.py index ebaef6b..bdd062d 100644 --- a/tests/system_tests_two_routers.py +++ b/tests/system_tests_two_routers.py @@ -36,6 +36,8 @@ from system_test import get_inter_router_links from system_test import unittest from test_broker import FakeService +from proton import Described, ulong + from proton.handlers import MessagingHandler from proton.reactor import Container, AtLeastOnce from proton.utils import BlockingConnection @@ -2076,9 +2078,6 @@ class TwoRouterExtensionStateTest(TestCase): ('connector', {'name': 'toRouterA', 'role': 'inter-router', 'port': inter_router_port}), - - - ('listener', {'role': 'route-container', 'host': '0.0.0.0', 'port': service_port, @@ -2165,6 +2164,79 @@ class TwoRouterExtensionStateTest(TestCase): pass self.assertEqual([1, 2, 3], ext_data) + def test_04_test_transactional_state(self): + """ + Verifies that the data sent in the state field of the disposition + is forwarded all the way back to the client. + """ + TRANS_STATE = 52 + RESPONSE_LOCAL_DATA = ["MyTxnIDResp", + Described(ulong(Delivery.ACCEPTED), [])] + + class MyExtendedService(FakeService): + """ + This service receives a transfer frame and sends back a + disposition frame with a state field. 
+ For example, this service sends a disposition with the + following state field + state=@transactional-state(52) [txn-id="MyTxnIDResp", outcome=@accepted(36) []] + """ + def __init__(self, url, container_id=None): + self.remote_state = None + self.remote_data = None + super(MyExtendedService, self).__init__(url, container_id, + auto_accept=False, + auto_settle=False) + + def on_message(self, event): + self.remote_state = event.delivery.remote_state + self.remote_data = event.delivery.remote.data + if self.remote_state == TRANS_STATE and self.remote_data == ['MyTxnID']: + # This will send a disposition with + # state=@transactional-state(52) [txn-id="MyTxnIDResp", outcome=@accepted(36) []] + # We will make sure that this state was received + # by the sender. + event.delivery.local.data = RESPONSE_LOCAL_DATA + event.delivery.update(TRANS_STATE) + event.delivery.settle() + + # Start the service that connects to the route-container listener + # on the router with container_id="FakeService" + fs = MyExtendedService(self.RouterB.addresses[1], + container_id="FakeService") + + self.RouterA.wait_address("RoutieMcRouteFace", remotes=1, count=2) + + class MyTransactionStateSender(AsyncTestSender): + def on_sendable(self, event): + # Send just one delivery with a transactional state. + if self.sent < self.total: + self.sent += 1 + dlv = event.sender.delivery(str(self.sent)) + dlv.local.data = ["MyTxnID"] + # this will send a transfer frame to the router with + # state=@transactional-state(52) [txn-id="MyTxnID"] + dlv.update(TRANS_STATE) + event.sender.stream(self._message.encode()) + event.sender.advance() + + def on_settled(self, event): + self.remote_state = event.delivery.remote_state + self.remote_data = event.delivery.remote.data + if self.remote_state == TRANS_STATE and \ + self.remote_data == RESPONSE_LOCAL_DATA: + # This means that the router is passing the state it + # received from the service all the way back to the + # client. 
This would not happen without the fix + # for DISPATCH-2040 + self.accepted += 1 + self.test_passed = True + + tx = MyTransactionStateSender(self.RouterA.addresses[0], "RoutieMcRouteFace") + tx.wait() + fs.join() + self.assertTrue(tx.test_passed) + class MyExtendedSender(AsyncTestSender): """ --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org