[ https://issues.apache.org/jira/browse/DISPATCH-1160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16670407#comment-16670407 ]
ASF GitHub Bot commented on DISPATCH-1160: ------------------------------------------ Github user ted-ross commented on a diff in the pull request: https://github.com/apache/qpid-dispatch/pull/410#discussion_r229788582 --- Diff: tests/system_tests_edge_router.py --- @@ -345,5 +620,420 @@ def run(self): Container(self).run() -if __name__ == '__main__': +class MobileAddressOneSenderTwoReceiversTest(MessagingHandler): + def __init__(self, receiver1_host, receiver2_host, sender_host, address): + super(MobileAddressOneSenderTwoReceiversTest, self).__init__() + self.receiver1_host = receiver1_host + self.receiver2_host = receiver2_host + self.sender_host = sender_host + self.address = address + + # One sender connection and two receiver connections + self.receiver1_conn = None + self.receiver2_conn = None + self.sender_conn = None + + self.receiver1 = None + self.receiver2 = None + self.sender = None + + self.count = 300 + self.rel_count = 50 + self.n_rcvd1 = 0 + self.n_rcvd2 = 0 + self.n_sent = 0 + self.n_settled = 0 + self.n_released = 0 + self.error = None + self.timer = None + self.all_msgs_received = False + self.recvd_msg_bodies = dict() + self.dup_msg = None + + def timeout(self): + if self.dup_msg: + self.error = "Duplicate message %s received " % self.dup_msg + else: + self.error = "Timeout Expired - n_sent=%d n_rcvd=%d n_settled=%d n_released=%d addr=%s" % \ + (self.n_sent, (self.n_rcvd1 + self.n_rcvd2), self.n_settled, self.n_released, self.address) + + self.receiver1_conn.close() + self.receiver2_conn.close() + self.sender_conn.close() + + def on_start(self, event): + self.timer = event.reactor.schedule(5.0, Timeout(self)) + + # Create two receivers + self.receiver1_conn = event.container.connect(self.receiver1_host) + self.receiver2_conn = event.container.connect(self.receiver2_host) + self.receiver1 = event.container.create_receiver(self.receiver1_conn, + self.address) + self.receiver2 = event.container.create_receiver(self.receiver2_conn, + self.address) + + # Create one sender + self.sender_conn = event.container.connect(self.sender_host) + self.sender = event.container.create_sender(self.sender_conn, + self.address) + + def on_sendable(self, event): + while self.n_sent < self.count: + self.sender.send(Message(body="Message %d" % self.n_sent)) + self.n_sent += 1 + + def on_message(self, event): + if self.recvd_msg_bodies.get(event.message.body): + self.dup_msg = event.message.body + self.timeout() + else: + self.recvd_msg_bodies[event.message.body] = event.message.body + + if event.receiver == self.receiver1: + self.n_rcvd1 += 1 + if event.receiver == self.receiver2: + self.n_rcvd2 += 1 + + if self.n_sent == self.n_rcvd1 + self.n_rcvd2: + self.all_msgs_received = True + + def on_settled(self, event): + self.n_settled += 1 + if self.n_settled == self.count: + self.receiver1.close() + self.receiver2.close() + for i in range(self.rel_count): + self.sender.send(Message(body="Message %d" % self.n_sent)) + self.n_sent += 1 + + def on_released(self, event): + self.n_released += 1 + if self.n_released == self.rel_count and self.all_msgs_received: + self.receiver1_conn.close() + self.receiver2_conn.close() + self.sender_conn.close() + self.timer.cancel() + + def run(self): + Container(self).run() + + +class MobileAddressMulticastTest(MessagingHandler): + def __init__(self, receiver1_host, receiver2_host, receiver3_host, + sender_host, address, large_msg=False, check_addr=False): + super(MobileAddressMulticastTest, self).__init__() + self.receiver1_host = receiver1_host + self.receiver2_host = receiver2_host + self.receiver3_host = receiver3_host + self.sender_host = sender_host + self.address = address + + # One sender connection and two receiver connections + self.receiver1_conn = None + self.receiver2_conn = None + self.receiver3_conn = None + self.sender_conn = None + + self.receiver1 = None + self.receiver2 = None + self.receiver3 = None + self.sender = None + + self.count = 200 + self.n_rcvd1 = 0 + self.n_rcvd2 = 0 + self.n_rcvd3 = 0 + self.n_sent = 0 + self.n_settled = 0 + self.n_released = 0 + self.error = None + self.timer = None + self.all_msgs_received = False + self.recvd1_msgs = dict() + self.recvd2_msgs = dict() + self.recvd3_msgs = dict() + self.dup_msg_rcvd = False + self.dup_msg = None + self.receiver_name = None + self.large_msg = large_msg + self.body = "" + self.r_attaches = 0 + self.addr_timer = None + self.num_attempts = 0 + self.container = None + self.check_addr = check_addr + + if self.large_msg: + for i in range(10000): + self.body += "0123456789101112131415" + + def timeout(self): + if self.dup_msg: + self.error = "%s received duplicate message %s" % \ + (self.receiver_name, self.dup_msg) + else: + if not self.error: + self.error = "Timeout Expired - n_sent=%d n_rcvd1=%d " \ + "n_rcvd2=%d n_rcvd3=%d addr=%s" % \ + (self.n_sent, self.n_rcvd1, self.n_rcvd2, + self.n_rcvd3, self.address) + self.receiver1_conn.close() + self.receiver2_conn.close() + self.receiver3_conn.close() + if self.sender_conn: + self.sender_conn.close() + + def check_address(self): + local_node = Node.connect(self.sender_host, timeout=TIMEOUT) --- End diff -- Is Node a synchronous client? If so, I don't think it should be used in a reactive environment. > Add edge address tracking module to interior routers which will inform edges > of mobile address receiver changes > --------------------------------------------------------------------------------------------------------------- > > Key: DISPATCH-1160 > URL: https://issues.apache.org/jira/browse/DISPATCH-1160 > Project: Qpid Dispatch > Issue Type: Improvement > Affects Versions: 1.4.1 > Reporter: Ganesh Murthy > Assignee: Ganesh Murthy > Priority: Major > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@qpid.apache.org For additional commands, e-mail: dev-h...@qpid.apache.org