This is an automated email from the ASF dual-hosted git repository.

gmurthy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/main by this push:
     new 019cef2  DISPATCH-2040 - For link routes, even if there is no change in disposition state, still copy the disposition data. This closes #1115
019cef2 is described below

commit 019cef2d33bdc1a9056ac07a192819e96e97c7ce
Author: Ganesh Murthy <gmur...@apache.org>
AuthorDate: Sun Apr 11 19:49:09 2021 -0400

    DISPATCH-2040 - For link routes, even if there is no change in disposition state, still copy the disposition data. This closes #1115
---
 src/router_node.c                 |  7 +++-
 tests/system_tests_two_routers.py | 78 +++++++++++++++++++++++++++++++++++++--
 2 files changed, 80 insertions(+), 5 deletions(-)

diff --git a/src/router_node.c b/src/router_node.c
index 552410a..3e17695 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -2050,9 +2050,12 @@ static void CORE_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t di
         return;
 
     //
-    // If the disposition has changed and the proton delivery has not already been settled, update the proton delivery.
+    // DISPATCH-2040: For link routed links, it does not matter if the passed in disp matches the pn_delivery_remote_state(pnd), we will still
+    // call qd_delivery_write_local_state and send out the disposition if the delivery is not already settled.
     //
-    if (disp != pn_delivery_remote_state(pnd) && !pn_delivery_settled(pnd)) {
+    // For non link routed links, if the disposition has changed and the proton delivery has not already been settled, update the proton delivery.
+    //
+    if ((qdr_link_is_routed(qlink) || disp != pn_delivery_remote_state(pnd)) && !pn_delivery_settled(pnd)) {
         qd_message_t *msg = qdr_delivery_message(dlv);
 
         // handle propagation of delivery state from qdr_delivery_t to proton:
diff --git a/tests/system_tests_two_routers.py b/tests/system_tests_two_routers.py
index ebaef6b..bdd062d 100644
--- a/tests/system_tests_two_routers.py
+++ b/tests/system_tests_two_routers.py
@@ -36,6 +36,8 @@ from system_test import get_inter_router_links
 from system_test import unittest
 from test_broker import FakeService
 
+from proton import Described, ulong
+
 from proton.handlers import MessagingHandler
 from proton.reactor import Container, AtLeastOnce
 from proton.utils import BlockingConnection
@@ -2076,9 +2078,6 @@ class TwoRouterExtensionStateTest(TestCase):
                                  ('connector', {'name': 'toRouterA',
                                                 'role': 'inter-router',
                                                 'port': inter_router_port}),
-
-
-
                                  ('listener', {'role': 'route-container',
                                                'host': '0.0.0.0',
                                                'port': service_port,
@@ -2165,6 +2164,79 @@ class TwoRouterExtensionStateTest(TestCase):
                 pass
         self.assertEqual([1, 2, 3], ext_data)
 
+    def test_04_test_transactional_state(self):
+        """
+        Verifies that the data sent in the state field of the disposition
+        is forwarded all the way back to the client.
+        """
+        TRANS_STATE = 52
+        RESPONSE_LOCAL_DATA = ["MyTxnIDResp",
+                               Described(ulong(Delivery.ACCEPTED), [])]
+
+        class MyExtendedService(FakeService):
+            """
+            This service receives a transfer frame and sends back a
+            disposition frame with a state field.
+            For example, this service sends a disposition with the
+            following state field
+            state=@transactional-state(52) [txn-id="MyTxnIDResp", outcome=@accepted(36) []]
+            """
+            def __init__(self, url, container_id=None):
+                self.remote_state = None
+                self.remote_data = None
+                super(MyExtendedService, self).__init__(url, container_id,
+                                                        auto_accept=False,
+                                                        auto_settle=False)
+
+            def on_message(self, event):
+                self.remote_state = event.delivery.remote_state
+                self.remote_data = event.delivery.remote.data
+                if self.remote_state == TRANS_STATE and self.remote_data == ['MyTxnID']:
+                    # This will send a disposition with
+                    # state=@transactional-state(52) [txn-id="MyTxnIDResp", outcome=@accepted(36) []]
+                    # We will make sure that this state was received
+                    # by the sender.
+                    event.delivery.local.data = RESPONSE_LOCAL_DATA
+                    event.delivery.update(TRANS_STATE)
+                event.delivery.settle()
+
+        # Start the service that connects to the route-container listener
+        # on the router with container_id="FakeService"
+        fs = MyExtendedService(self.RouterB.addresses[1],
+                               container_id="FakeService")
+
+        self.RouterA.wait_address("RoutieMcRouteFace", remotes=1, count=2)
+
+        class MyTransactionStateSender(AsyncTestSender):
+            def on_sendable(self, event):
+                # Send just one delivery with a transactional state.
+                if self.sent < self.total:
+                    self.sent += 1
+                    dlv = event.sender.delivery(str(self.sent))
+                    dlv.local.data = ["MyTxnID"]
+                    # this will send a transfer frame to the router with
+                    # state=@transactional-state(52) [txn-id="MyTxnID"]
+                    dlv.update(TRANS_STATE)
+                    event.sender.stream(self._message.encode())
+                    event.sender.advance()
+
+            def on_settled(self, event):
+                self.remote_state = event.delivery.remote_state
+                self.remote_data = event.delivery.remote.data
+                if self.remote_state == TRANS_STATE and \
+                        self.remote_data == RESPONSE_LOCAL_DATA:
+                    # This means that the router is passing the state it
+                    # received from the service all the way back to the
+                    # client. This would not happen without the fix
+                    # for DISPATCH-2040
+                    self.accepted += 1
+                    self.test_passed = True
+
+        tx = MyTransactionStateSender(self.RouterA.addresses[0], "RoutieMcRouteFace")
+        tx.wait()
+        fs.join()
+        self.assertTrue(tx.test_passed)
+
 
 class MyExtendedSender(AsyncTestSender):
     """

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to