Repository: qpid-dispatch Updated Branches: refs/heads/master c98cb9049 -> 098ec7d13
DISPATCH-802 - Refuse transaction coordination links if they can't be routed to a coordinator since the router by itself cannot coordinate transactions. (cherry picked from commit d17f7b58b95886ca4ee03aac7deba4eb5cad679d) Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/098ec7d1 Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/098ec7d1 Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/098ec7d1 Branch: refs/heads/master Commit: 098ec7d13744bbbd31fce173bc9a76daab945e9c Parents: c98cb90 Author: Ganesh Murthy <gmur...@redhat.com> Authored: Fri Aug 4 15:19:15 2017 -0400 Committer: Ganesh Murthy <gmur...@redhat.com> Committed: Mon Aug 14 13:15:21 2017 -0400 ---------------------------------------------------------------------- include/qpid/dispatch/router_core.h | 9 +++++ src/router_core/connections.c | 20 +++++++++- src/router_core/router_core_private.h | 1 + src/router_core/terminus.c | 6 ++- tests/system_tests_one_router.py | 61 +++++++++++++++++++++++++++++- 5 files changed, 93 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/098ec7d1/include/qpid/dispatch/router_core.h ---------------------------------------------------------------------- diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h index 18499e0..0c47fae 100644 --- a/include/qpid/dispatch/router_core.h +++ b/include/qpid/dispatch/router_core.h @@ -304,6 +304,15 @@ bool qdr_terminus_has_capability(qdr_terminus_t *term, const char *capability); bool qdr_terminus_is_anonymous(qdr_terminus_t *term); /** + * qdr_terminus_is_coordinator + * + * Indicates if the terminus is a coordinator. + * @param term A qdr_terminus pointer returned by qdr_terminus() + * @return true iff the terminus is a coordinator + */ +bool qdr_terminus_is_coordinator(qdr_terminus_t *term); + +/** * qdr_terminus_is_dynamic * * Indicate whether this terminus represents a dynamic endpoint. http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/098ec7d1/src/router_core/connections.c ---------------------------------------------------------------------- diff --git a/src/router_core/connections.c b/src/router_core/connections.c index 264068d..4010ecb 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -860,6 +860,11 @@ void qdr_link_outbound_detach_CT(qdr_core_t *core, qdr_link_t *link, qdr_error_t work->error = qdr_error("qd:connection-role", "Link attach forbidden on inter-router connection"); break; + case QDR_CONDITION_COORDINATOR_NOT_FOUND: + work->error = qdr_error(QD_AMQP_COND_NOT_FOUND, "Link attach forbidden, there is no route to a coordinator, " + "the router cannot coordinate transactions by itself. Try setting up a linkRoute to a coordinator and try again"); + break; + case QDR_CONDITION_NONE: work->error = 0; break; @@ -1373,7 +1378,20 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act qdr_terminus_free(target); } - } else { + } + + else if (qdr_terminus_is_coordinator(target)) { + // + // This target terminus is a coordinator. + // If we got here, it means that the coordinator link attach could not be link routed to a broker (or to the next router). + // The router should reject this link because the router cannot coordinate transactions itself. + // + // The attach response should have a null target to indicate refusal and the immediately coming detach. + // Now, send back a detach with the error amqp:not-found + qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_COORDINATOR_NOT_FOUND, true); + } + else + { // // Associate the link with the address. With this association, it will be unnecessary // to do an address lookup for deliveries that arrive on this link. http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/098ec7d1/src/router_core/router_core_private.h ---------------------------------------------------------------------- diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index 052ed7f..0b52283 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -48,6 +48,7 @@ typedef enum { QDR_CONDITION_ROUTED_LINK_LOST, QDR_CONDITION_FORBIDDEN, QDR_CONDITION_WRONG_ROLE, + QDR_CONDITION_COORDINATOR_NOT_FOUND, QDR_CONDITION_NONE } qdr_condition_t; http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/098ec7d1/src/router_core/terminus.c ---------------------------------------------------------------------- diff --git a/src/router_core/terminus.c b/src/router_core/terminus.c index a6db7ea..62012f3 100644 --- a/src/router_core/terminus.c +++ b/src/router_core/terminus.c @@ -44,7 +44,6 @@ qdr_terminus_t *qdr_terminus(pn_terminus_t *pn) qdr_terminus_t *term = new_qdr_terminus_t(); ZERO(term); - term->coordinator = false; term->properties = pn_data(0); term->filter = pn_data(0); term->outcomes = pn_data(0); @@ -145,6 +144,11 @@ bool qdr_terminus_is_anonymous(qdr_terminus_t *term) return term == 0 || (term->address == 0 && !term->dynamic); } +bool qdr_terminus_is_coordinator(qdr_terminus_t *term) +{ + return term->coordinator; +} + bool qdr_terminus_is_dynamic(qdr_terminus_t *term) { http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/098ec7d1/tests/system_tests_one_router.py ---------------------------------------------------------------------- diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py index f520871..2f12670 100644 --- a/tests/system_tests_one_router.py +++ b/tests/system_tests_one_router.py @@ -18,9 +18,9 @@ # import unittest -from proton import Condition, Message, Delivery, PENDING, ACCEPTED, REJECTED +from proton import Condition, Message, Delivery, PENDING, ACCEPTED, REJECTED, Url from system_test import TestCase, Qdrouterd, main_module, TIMEOUT -from proton.handlers import MessagingHandler +from proton.handlers import MessagingHandler, TransactionHandler from proton.reactor import Container, AtMostOnce, AtLeastOnce from proton.utils import BlockingConnection, SyncRequestResponse from qpid_dispatch.management.client import Node @@ -1141,6 +1141,11 @@ class RouterTest(TestCase): test.run() self.assertEqual(None, test.error) + def test_25_reject_coordinator(self): + test = RejectCoordinatorTest(self.address) + test.run() + self.assertTrue(test.passed) + def test_reject_disposition(self): test = RejectDispositionTest(self.address) test.run() @@ -1616,6 +1621,58 @@ class BatchedSettlementTest(MessagingHandler): Container(self).run() +class RejectCoordinatorTest(MessagingHandler, TransactionHandler): + def __init__(self, url): + super(RejectCoordinatorTest, self).__init__(prefetch=0) + self.url = Url(url) + self.error = "Link attach forbidden, there is no route to a coordinator, the router cannot coordinate " \ + "transactions by itself. Try setting up a linkRoute to a coordinator and try again" + self.container = None + self.conn = None + self.sender = None + self.timer = None + self.passed = False + self.link_error = False + self.link_remote_close = False + + def timeout(self): + self.conn.close() + + def check_if_done(self): + if self.link_remote_close and self.link_error: + self.passed = True + self.conn.close() + self.timer.cancel() + + def on_start(self, event): + self.timer = event.reactor.schedule(TIMEOUT, Timeout(self)) + self.container = event.container + self.conn = self.container.connect(self.url) + self.sender = self.container.create_sender(self.conn, self.url.path) + # declare_transaction tries to create a link with name "txn-ctrl" to the + # transaction coordinator which has its own target, it has no address + # The router cannot coordinate transactions itself and so there will be a link error when this + # link is attempted to be created + self.container.declare_transaction(self.conn, handler=self) + + def on_link_error(self, event): + link = event.link + # If the link name is 'txn-ctrl' and there is a link error and it matches self.error, then we know + # that the router has rejected the link because it cannot coordinate transactions itself + if link.name == "txn-ctrl" and link.remote_condition.description == self.error: + self.link_error = True + self.check_if_done() + + def on_link_remote_close(self, event): + link = event.link + if link.name == "txn-ctrl": + self.link_remote_close = True + self.check_if_done() + + def run(self): + Container(self).run() + + class PresettledOverflowTest(MessagingHandler): def __init__(self, address): super(PresettledOverflowTest, self).__init__(prefetch=0) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org