[
https://issues.apache.org/jira/browse/DISPATCH-1160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16670406#comment-16670406
]
ASF GitHub Bot commented on DISPATCH-1160:
------------------------------------------
Github user ted-ross commented on a diff in the pull request:
https://github.com/apache/qpid-dispatch/pull/410#discussion_r229788028
--- Diff: tests/system_tests_edge_router.py ---
@@ -345,5 +620,420 @@ def run(self):
Container(self).run()
-if __name__ == '__main__':
+class MobileAddressOneSenderTwoReceiversTest(MessagingHandler):
+ def __init__(self, receiver1_host, receiver2_host, sender_host,
address):
+ super(MobileAddressOneSenderTwoReceiversTest, self).__init__()
+ self.receiver1_host = receiver1_host
+ self.receiver2_host = receiver2_host
+ self.sender_host = sender_host
+ self.address = address
+
+ # One sender connection and two receiver connections
+ self.receiver1_conn = None
+ self.receiver2_conn = None
+ self.sender_conn = None
+
+ self.receiver1 = None
+ self.receiver2 = None
+ self.sender = None
+
+ self.count = 300
+ self.rel_count = 50
+ self.n_rcvd1 = 0
+ self.n_rcvd2 = 0
+ self.n_sent = 0
+ self.n_settled = 0
+ self.n_released = 0
+ self.error = None
+ self.timer = None
+ self.all_msgs_received = False
+ self.recvd_msg_bodies = dict()
+ self.dup_msg = None
+
+ def timeout(self):
+ if self.dup_msg:
+ self.error = "Duplicate message %s received " % self.dup_msg
+ else:
+ self.error = "Timeout Expired - n_sent=%d n_rcvd=%d
n_settled=%d n_released=%d addr=%s" % \
+ (self.n_sent, (self.n_rcvd1 + self.n_rcvd2),
self.n_settled, self.n_released, self.address)
+
+ self.receiver1_conn.close()
+ self.receiver2_conn.close()
+ self.sender_conn.close()
+
+ def on_start(self, event):
+ self.timer = event.reactor.schedule(5.0, Timeout(self))
+
+ # Create two receivers
+ self.receiver1_conn = event.container.connect(self.receiver1_host)
+ self.receiver2_conn = event.container.connect(self.receiver2_host)
+ self.receiver1 =
event.container.create_receiver(self.receiver1_conn,
+ self.address)
+ self.receiver2 =
event.container.create_receiver(self.receiver2_conn,
+ self.address)
+
+ # Create one sender
+ self.sender_conn = event.container.connect(self.sender_host)
+ self.sender = event.container.create_sender(self.sender_conn,
+ self.address)
+
+ def on_sendable(self, event):
+ while self.n_sent < self.count:
+ self.sender.send(Message(body="Message %d" % self.n_sent))
+ self.n_sent += 1
+
+ def on_message(self, event):
+ if self.recvd_msg_bodies.get(event.message.body):
+ self.dup_msg = event.message.body
+ self.timeout()
+ else:
+ self.recvd_msg_bodies[event.message.body] = event.message.body
+
+ if event.receiver == self.receiver1:
+ self.n_rcvd1 += 1
+ if event.receiver == self.receiver2:
+ self.n_rcvd2 += 1
+
+ if self.n_sent == self.n_rcvd1 + self.n_rcvd2:
+ self.all_msgs_received = True
+
+ def on_settled(self, event):
+ self.n_settled += 1
+ if self.n_settled == self.count:
+ self.receiver1.close()
+ self.receiver2.close()
+ for i in range(self.rel_count):
+ self.sender.send(Message(body="Message %d" % self.n_sent))
+ self.n_sent += 1
+
+ def on_released(self, event):
+ self.n_released += 1
+ if self.n_released == self.rel_count and self.all_msgs_received:
+ self.receiver1_conn.close()
+ self.receiver2_conn.close()
+ self.sender_conn.close()
+ self.timer.cancel()
+
+ def run(self):
+ Container(self).run()
+
+
+class MobileAddressMulticastTest(MessagingHandler):
+ def __init__(self, receiver1_host, receiver2_host, receiver3_host,
+ sender_host, address, large_msg=False, check_addr=False):
+ super(MobileAddressMulticastTest, self).__init__()
+ self.receiver1_host = receiver1_host
+ self.receiver2_host = receiver2_host
+ self.receiver3_host = receiver3_host
+ self.sender_host = sender_host
+ self.address = address
+
+ # One sender connection and two receiver connections
+ self.receiver1_conn = None
+ self.receiver2_conn = None
+ self.receiver3_conn = None
+ self.sender_conn = None
+
+ self.receiver1 = None
+ self.receiver2 = None
+ self.receiver3 = None
+ self.sender = None
+
+ self.count = 200
+ self.n_rcvd1 = 0
+ self.n_rcvd2 = 0
+ self.n_rcvd3 = 0
+ self.n_sent = 0
+ self.n_settled = 0
+ self.n_released = 0
+ self.error = None
+ self.timer = None
+ self.all_msgs_received = False
+ self.recvd1_msgs = dict()
+ self.recvd2_msgs = dict()
+ self.recvd3_msgs = dict()
+ self.dup_msg_rcvd = False
+ self.dup_msg = None
+ self.receiver_name = None
+ self.large_msg = large_msg
+ self.body = ""
+ self.r_attaches = 0
+ self.addr_timer = None
+ self.num_attempts = 0
+ self.container = None
+ self.check_addr = check_addr
+
+ if self.large_msg:
+ for i in range(10000):
+ self.body += "0123456789101112131415"
+
+ def timeout(self):
+ if self.dup_msg:
+ self.error = "%s received duplicate message %s" % \
+ (self.receiver_name, self.dup_msg)
+ else:
+ if not self.error:
+ self.error = "Timeout Expired - n_sent=%d n_rcvd1=%d " \
+ "n_rcvd2=%d n_rcvd3=%d addr=%s" % \
+ (self.n_sent, self.n_rcvd1, self.n_rcvd2,
+ self.n_rcvd3, self.address)
+ self.receiver1_conn.close()
+ self.receiver2_conn.close()
+ self.receiver3_conn.close()
+ if self.sender_conn:
+ self.sender_conn.close()
+
+ def check_address(self):
+ local_node = Node.connect(self.sender_host, timeout=TIMEOUT)
+ outs =
local_node.query(type='org.apache.qpid.dispatch.router.address')
+ found = False
+ for result in outs.results:
+ if self.address in result[0]:
+ found = True
+ self.sender_conn = self.container.connect(self.sender_host)
+ self.sender =
self.container.create_sender(self.sender_conn,
+ self.address)
+ local_node.close()
+ break
+
+ if not found:
+ self.error = "Unable to create sender because of " \
+ "absence of address in the address table"
+ self.addr_timer.cancel()
+ self.timeout()
+ local_node.close()
+
+ def create_sndr(self):
+ self.sender_conn = self.container.connect(self.sender_host)
+ self.sender = self.container.create_sender(self.sender_conn,
+ self.address)
+
+ def on_start(self, event):
+ if self.large_msg:
+ self.timer = event.reactor.schedule(10.0, Timeout(self))
+ else:
+ self.timer = event.reactor.schedule(20.0, Timeout(self))
+
+ # Create two receivers
+ self.receiver1_conn = event.container.connect(self.receiver1_host)
+ self.receiver2_conn = event.container.connect(self.receiver2_host)
+ self.receiver3_conn = event.container.connect(self.receiver3_host)
+ self.receiver1 =
event.container.create_receiver(self.receiver1_conn,
+ self.address)
+ self.receiver2 =
event.container.create_receiver(self.receiver2_conn,
+ self.address)
+ self.receiver3 =
event.container.create_receiver(self.receiver3_conn,
+ self.address)
+ self.container = event.container
+
+ def on_link_opened(self, event):
+ if event.receiver == self.receiver1 or \
+ event.receiver == self.receiver2 or \
+ event.receiver == self.receiver3:
+ self.r_attaches += 1
+ if self.r_attaches == 3:
+ self.addr_timer = event.reactor.schedule(4.0,
--- End diff --
Why do you wait so long? These tests take a very long time to run.
> Add edge address tracking module to interior routers which will inform edges
> of mobile address receiver changes
> ---------------------------------------------------------------------------------------------------------------
>
> Key: DISPATCH-1160
> URL: https://issues.apache.org/jira/browse/DISPATCH-1160
> Project: Qpid Dispatch
> Issue Type: Improvement
> Affects Versions: 1.4.1
> Reporter: Ganesh Murthy
> Assignee: Ganesh Murthy
> Priority: Major
> Fix For: 1.5.0
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]