[qpid-dispatch] branch main updated: DISPATCH-2276: Accomodate hex values in 0.36+ qpid-proton logging (#1525)
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/main by this push: new 6f75d1f DISPATCH-2276: Accomodate hex values in 0.36+ qpid-proton logging (#1525) 6f75d1f is described below commit 6f75d1f3dc1600204289e54ed395a22e5e486f26 Author: Chuck Rolke AuthorDate: Tue Feb 22 12:09:59 2022 -0500 DISPATCH-2276: Accomodate hex values in 0.36+ qpid-proton logging (#1525) * DISPATCH-2276: Accomodate hex values in 0.36+ qpid-proton logging * Fix code that extracts integer values during AMQP analysis * Convert ints from hex to decimal for web display of values line link and session numbers and credit. * DISPATCH-2276: Topology disposition test does not need to import proton Co-authored-by: chug Co-authored-by: Chuck Rolke --- tests/system_tests_topology_disposition.py | 2 -- tools/scraper/amqp_detail.py | 8 tools/scraper/parser.py| 3 +++ 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/tests/system_tests_topology_disposition.py b/tests/system_tests_topology_disposition.py index 25e8046..22b684c 100644 --- a/tests/system_tests_topology_disposition.py +++ b/tests/system_tests_topology_disposition.py @@ -23,7 +23,6 @@ import time import unittest from subprocess import PIPE, STDOUT -import proton from proton import Message from proton.handlers import MessagingHandler from proton.reactor import Container @@ -406,7 +405,6 @@ class TopologyDispositionTests (TestCase): self.assertIsNone(error) self.assertIsNone(error) -@unittest.skipIf(proton.VERSION > (0, 36, 0), "see DISPATCH-2276") def test_04_scraper_tool(self): name = 'test_04' error = str(None) diff --git a/tools/scraper/amqp_detail.py b/tools/scraper/amqp_detail.py index 0b2c3cd..b1c8147 100755 --- a/tools/scraper/amqp_detail.py +++ b/tools/scraper/amqp_detail.py @@ -681,7 +681,7 @@ class AllDetails(): sdispmap = sess.rx_rcvr_disposition_map if splf.data.is_receiver else sess.rx_sndr_disposition_map else: sdispmap = sess.tx_rcvr_disposition_map if splf.data.is_receiver else sess.tx_sndr_disposition_map -for sdid in range(int(splf.data.first), (int(splf.data.last) + 1)): +for sdid in range(int(splf.data.first, 0), (int(splf.data.last, 0) + 1)): did = str(sdid) if did in sdispmap: old_splf = sdispmap[did] @@ -829,7 +829,7 @@ class AllDetails(): tod_of_second_attach = plf.datetime if look_for_sender_delivery_id: if plf.data.name == "attach" and not plf.data.is_receiver: -current_delivery = int(plf.data.described_type.dict.get("initial-delivery_count", "0")) +current_delivery = int(plf.data.described_type.dict.get("initial-delivery_count", "0"), 0) delivery_limit = current_delivery look_for_sender_delivery_id = False @@ -838,7 +838,7 @@ class AllDetails(): # a flow in the normal direction updates the delivery limit dc = plf.data.described_type.dict.get("delivery-count", "0") lc = plf.data.described_type.dict.get("link-credit", "0") -delivery_limit = int(dc) + int(lc) # TODO: wrap at 32-bits +delivery_limit = int(dc, 0) + int(lc, 0) # TODO: wrap at 32-bits if n_attaches < 2: # a working flow before sender attach - cancel initial stall init_stall = False @@ -858,7 +858,7 @@ class AllDetails(): else: # flow in the opposite direction updates the senders current delivery # normally used to consume credit in response to a drain from receiver -current_delivery = int(plf.data.described_type.dict.get("initial-delivery_count", "0")) +current_delivery = int(plf.data.described_type.dict.get("initial-delivery_count", "0"), 0) elif plf.data.transfer: if plf.data.direction == dir_of_xfer: diff --git a/tools/scraper/parser.py
[qpid-dispatch] branch main updated: DISPATCH-2214: Reduce logging in tcp_adaptor self test
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/main by this push: new 967e00d DISPATCH-2214: Reduce logging in tcp_adaptor self test 967e00d is described below commit 967e00d14a1335557b1f3269e92d4e9020401c07 Author: Chuck Rolke AuthorDate: Wed Jul 28 09:47:46 2021 -0400 DISPATCH-2214: Reduce logging in tcp_adaptor self test * Put restraints on echo tests startup wait loop Only poll N times; delay between polls * Print qdsdat address displays if polling fails This closes #1327 --- tests/system_tests_tcp_adaptor.py | 13 - 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/tests/system_tests_tcp_adaptor.py b/tests/system_tests_tcp_adaptor.py index 71bce4c..fe26dd4 100644 --- a/tests/system_tests_tcp_adaptor.py +++ b/tests/system_tests_tcp_adaptor.py @@ -322,7 +322,7 @@ class TcpAdaptor(TestCase): # define logging levels cls.print_logs_server = False -cls.print_logs_client = True +cls.print_logs_client = False parent_path = os.path.dirname(os.getcwd()) cls.logger = Logger(title="TcpAdaptor-testClass", print_to_console=True, @@ -505,6 +505,8 @@ class TcpAdaptor(TestCase): # wait for server addresses (mobile ES_) to propagate to all interior routers interior_rtrs = [rtr for rtr in cls.router_order if rtr.startswith('I')] +poll_loops = 100 +poll_loop_delay = 0.5 # seconds found_all = False while not found_all: found_all = True @@ -528,6 +530,15 @@ class TcpAdaptor(TestCase): unseen = [srv for srv in cls.router_order if "ES_" + srv not in seen] cls.logger.log("TCP_TEST Router %s sees only %d of %d addresses. Waiting for %s" % (rtr, len(server_lines), len(cls.router_order), unseen)) +if poll_loops == 1: +# last poll loop +for line in lines: +cls.logger.log("TCP_TEST Router %s : %s" % (rtr, line)) +poll_loops -= 1 +if poll_loops == 0: +assert False, "TCP_TEST TCP_Adaptor test setup failed. Echo tests never executed." +else: +time.sleep(poll_loop_delay) cls.logger.log("TCP_TEST Done poll wait") @classmethod - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch main updated: DISPATCH-2213: [test] Fallback dest test to do less logging
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/main by this push: new feec3b5 DISPATCH-2213: [test] Fallback dest test to do less logging feec3b5 is described below commit feec3b51f083551cc932f50c1096bd4f0da7288c Author: Chuck Rolke AuthorDate: Tue Jul 27 14:27:18 2021 -0400 DISPATCH-2213: [test] Fallback dest test to do less logging Log only every 100th event and provide some delays while waiting for addresses to stabilize. This prevents this single test from consuming the entire disk quota for the entire test run and it gives the routers under test some CPU time to do real work. This closes #1323 --- tests/system_tests_fallback_dest.py | 34 +++--- 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/tests/system_tests_fallback_dest.py b/tests/system_tests_fallback_dest.py index 85e6c89..d34ea65 100644 --- a/tests/system_tests_fallback_dest.py +++ b/tests/system_tests_fallback_dest.py @@ -23,6 +23,7 @@ from system_test import unittest from system_test import Logger from proton.handlers import MessagingHandler from proton.reactor import Container +import time class AddrTimer(object): @@ -577,6 +578,11 @@ class SwitchoverTest(MessagingHandler): self.addr = addr self.count = 300 +# DISPATCH-2213 back off on logging. +self.log_sends = 100 # every 100th send +self.log_recvs = 100 # every 100th receive +self.log_released = 100 # every 100th sender released + self.sender_conn= None self.primary_conn = None self.fallback_conn = None @@ -660,9 +666,10 @@ class SwitchoverTest(MessagingHandler): if self.sender.drain_mode: n_drained = self.sender.drained() self.logger.log("%s sender.drained() drained %d credits" % (self.log_prefix, n_drained)) -self.logger.log("%s send() exit: last sent '%s' phase=%d, credit=%3d->%3d, n_tx=%4d->%4d, tx_seq=%4d->%4d, n_rel=%4d" % -(self.log_prefix, last_message.body, self.phase, e_credit, self.sender.credit, - e_n_tx, self.n_tx, e_tx_seq, self.tx_seq, self.n_rel)) +if self.n_tx > e_n_tx and self.n_tx % self.log_sends == 0: # if sent then log every Nth message +self.logger.log("%s send() exit: last sent '%s' phase=%d, credit=%3d->%3d, n_tx=%4d->%4d, tx_seq=%4d->%4d, n_rel=%4d" % +(self.log_prefix, last_message.body, self.phase, e_credit, self.sender.credit, + e_n_tx, self.n_tx, e_tx_seq, self.tx_seq, self.n_rel)) def on_sendable(self, event): if event.sender == self.sender: @@ -674,8 +681,9 @@ class SwitchoverTest(MessagingHandler): if event.receiver == self.primary_receiver: if self.phase == 0: self.n_rx += 1 -self.logger.log("%s Received phase 0 message '%s', n_rx=%d" % -(self.log_prefix, event.message.body, self.n_rx)) +if self.n_rx % self.log_recvs == 0: +self.logger.log("%s Received phase 0 message '%s', n_rx=%d" % +(self.log_prefix, event.message.body, self.n_rx)) if self.n_rx == self.count: self.logger.log("%s Triggering fallback by closing primary receiver on %s. Test phase 0->1." % (self.log_prefix, self.primary_name)) @@ -694,12 +702,15 @@ class SwitchoverTest(MessagingHandler): self.n_rel += 1 self.n_tx -= 1 self.local_rel += 1 -self.logger.log("%s Released phase 0 over fallback: msg:'%s', n_rx=%d, n_tx=%d, n_rel=%d, local_rel=%d" % -(self.log_prefix, event.message.body, self.n_rx, self.n_tx, self.n_rel, self.local_rel)) +if self.local_rel % self.log_recvs == 0: +self.logger.log("%s Released phase 0 over fallback: msg:'%s', n_rx=%d, n_tx=%d, n_rel=%d, local_rel=%d" % +(self.log_prefix, event.message.body, self.n_rx, self.n_tx, self.n_rel, self.local_rel)) +time.sleep(0.02) else: self.n_rx += 1 -self.logger.log("%s Received phase 1 over fallback: msg:'%s', n_rx=%d" % -(self.log_prefix, event.message.body, self.n_rx)) +if self.n_rx % self.log_recvs == 0: +self.logger.log("
[qpid-dispatch] branch main updated: DISPATCH-2195: Fix SERVER-PYTHON lock inversion
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/main by this push: new e90df06 DISPATCH-2195: Fix SERVER-PYTHON lock inversion e90df06 is described below commit e90df062b08fb572e5d865c69feb0f20dd202abe Author: Chuck Rolke AuthorDate: Fri Jul 23 13:55:34 2021 -0400 DISPATCH-2195: Fix SERVER-PYTHON lock inversion * Release server lock before calling qd_policy_socket_close instead of after. * As a general rule the PYTHON lock must not be taken while holding any other lock. * Make a copy of the global while holding the lock and then log the value of the copy. This closes #1295 --- src/policy.c | 3 ++- src/server.c | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/policy.c b/src/policy.c index 07f4fae..2255806 100644 --- a/src/policy.c +++ b/src/policy.c @@ -289,6 +289,7 @@ void qd_policy_socket_close(qd_policy_t *policy, const qd_connection_t *conn) { sys_mutex_lock(stats_lock); n_connections--; +uint64_t nc = n_connections; assert (n_connections >= 0); sys_mutex_unlock(stats_lock); if (policy->enableVhostPolicy) { @@ -319,7 +320,7 @@ void qd_policy_socket_close(qd_policy_t *policy, const qd_connection_t *conn) "[C%"PRIu64"] Connection '%s' closed with resources n_sessions=%d, n_senders=%d, n_receivers=%d, " "sessions_denied=%"PRIu64", senders_denied=%"PRIu64", receivers_denied=%"PRIu64", max_message_size_denied:%"PRIu64", nConnections= %"PRIu64".", conn->connection_id, hostname, conn->n_sessions, conn->n_senders, conn->n_receivers, -qpdc->sessionDenied, qpdc->senderDenied, qpdc->receiverDenied, qpdc->maxSizeMessagesDenied, n_connections); +qpdc->sessionDenied, qpdc->senderDenied, qpdc->receiverDenied, qpdc->maxSizeMessagesDenied, nc); } } diff --git a/src/server.c b/src/server.c index 82ad1f3..75527b2 100644 --- a/src/server.c +++ b/src/server.c @@ -931,12 +931,12 @@ static void qd_connection_free(qd_connection_t *qd_conn) sys_mutex_lock(qd_server->lock); DEQ_REMOVE(qd_server->conn_list, qd_conn); +sys_mutex_unlock(qd_server->lock); // If counted for policy enforcement, notify it has closed if (qd_conn->policy_counted) { qd_policy_socket_close(qd_server->qd->policy, qd_conn); } -sys_mutex_unlock(qd_server->lock); invoke_deferred_calls(qd_conn, true); // Discard any pending deferred calls sys_mutex_free(qd_conn->deferred_call_lock); - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch main updated: DISPATCH-2166: Improve message, message_content multithread access correctness
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/main by this push: new fc745cd DISPATCH-2166: Improve message, message_content multithread access correctness fc745cd is described below commit fc745cdf9759d3d2bf0bd1aea752a81e401a3653 Author: Chuck Rolke AuthorDate: Wed Jul 21 16:32:02 2021 -0400 DISPATCH-2166: Improve message, message_content multithread access correctness Many message and message_content variables have implicit thread ownership. * Each message has only one receiver that creates the content. * Each message content may have many senders that consume the content. Some variables are "owned" by the message_receive thread and are never read nor written by the message_send threads. And other variables are owned by the message send threads. Then more variables are shared among all the senders and receiver. This patch addresses the shared variables. * Unsuppress tsan qd_message_set_send_complete * Convert many bools to sys_atomic_t; adjust access methods * Add locking to some variable accesses This closes #1308 --- include/qpid/dispatch/atomic.h | 7 +-- src/message.c | 111 + src/message_private.h | 79 +++-- tests/message_test.c | 12 ++--- tests/tsan.supp| 3 -- 5 files changed, 119 insertions(+), 93 deletions(-) diff --git a/include/qpid/dispatch/atomic.h b/include/qpid/dispatch/atomic.h index 9eb09d6..32f4967 100644 --- a/include/qpid/dispatch/atomic.h +++ b/include/qpid/dispatch/atomic.h @@ -205,10 +205,11 @@ static inline void sys_atomic_destroy(sys_atomic_t *ref) #endif -#defineSET_ATOMIC_FLAG(flag) sys_atomic_set(flag, 1) -#defineCLEAR_ATOMIC_FLAG(flag) sys_atomic_set(flag, 0) +#defineSET_ATOMIC_FLAG(flag)sys_atomic_set((flag), 1) +#define CLEAR_ATOMIC_FLAG(flag)sys_atomic_set((flag), 0) +#defineSET_ATOMIC_BOOL(flag, value) sys_atomic_set((flag), ((value) ? 1 : 0)) -#define IS_ATOMIC_FLAG_SET(flag) (sys_atomic_get(flag) == 1) +#define IS_ATOMIC_FLAG_SET(flag) (sys_atomic_get(flag) == 1) /** Atomic increase: NOTE returns value *before* increase, like i++ */ static inline uint32_t sys_atomic_inc(sys_atomic_t *ref) { return sys_atomic_add((ref), 1); } diff --git a/src/message.c b/src/message.c index e867913..2d8d579 100644 --- a/src/message.c +++ b/src/message.c @@ -926,8 +926,7 @@ static void qd_message_parse_priority(qd_message_t *in_msg) qd_message_content_t *content = MSG_CONTENT(in_msg); qd_iterator_t*iter = qd_message_field_iterator(in_msg, QD_FIELD_HEADER); -content->priority_parsed = true; -content->priority_present = false; +SET_ATOMIC_FLAG(&content->priority_parsed); if (!!iter) { qd_parsed_field_t *field = qd_parse(iter); @@ -936,8 +935,8 @@ static void qd_message_parse_priority(qd_message_t *in_msg) qd_parsed_field_t *priority_field = qd_parse_sub_value(field, 1); if (qd_parse_tag(priority_field) != QD_AMQP_NULL) { uint32_t value = qd_parse_as_uint(priority_field); -content->priority = value > QDR_MAX_PRIORITY ? QDR_MAX_PRIORITY : (uint8_t) (value & 0x00ff); -content->priority_present = true; +value = MIN(value, QDR_MAX_PRIORITY); +sys_atomic_set(&content->priority, value); } } } @@ -1022,8 +1021,15 @@ qd_message_t *qd_message() ZERO(msg->content); msg->content->lock = sys_mutex(); -sys_atomic_init(&msg->content->ref_count, 1); sys_atomic_init(&msg->content->aborted, 0); +sys_atomic_init(&msg->content->discard, 0); +sys_atomic_init(&msg->content->ma_stream, 0); +sys_atomic_init(&msg->content->no_body, 0); +sys_atomic_init(&msg->content->oversize, 0); +sys_atomic_init(&msg->content->priority, QDR_DEFAULT_PRIORITY); +sys_atomic_init(&msg->content->priority_parsed, 0); +sys_atomic_init(&msg->content->receive_complete, 0); +sys_atomic_init(&msg->content->ref_count, 1); msg->content->parse_depth = QD_DEPTH_NONE; return (qd_message_t*) msg; } @@ -1040,6 +1046,8 @@ void qd_message_free(qd_message_t *in_msg) qd_buffer_list_free_buffers(&msg->ma_trace); qd_buffer_list_free_buffers(&msg->ma_ingress); +sys_atomic_destroy(&msg->send_complete); + qd_message_content_t *content = msg->content; if (msg->is_fanout) { @@ -1104,6 +1112
[qpid-dispatch] branch main updated: DISPATCH-2205: Do not process MAU after test shutdown in effect
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/main by this push: new 49e37aa DISPATCH-2205: Do not process MAU after test shutdown in effect 49e37aa is described below commit 49e37aa2c775a21b31f1f2eb5742f5ca78c26272 Author: Chuck Rolke AuthorDate: Wed Jul 21 12:25:22 2021 -0400 DISPATCH-2205: Do not process MAU after test shutdown in effect A legitimate MAU will arrive after "mobile_address" is closed. Stop processing updates after test success is declared. --- tests/system_tests_routing_protocol.py | 4 1 file changed, 4 insertions(+) diff --git a/tests/system_tests_routing_protocol.py b/tests/system_tests_routing_protocol.py index 7360177..f97584e 100644 --- a/tests/system_tests_routing_protocol.py +++ b/tests/system_tests_routing_protocol.py @@ -135,6 +135,7 @@ class RejectHigherVersionMARTest(MessagingHandler): self.receiver= None self.hello_count = 0 self.mar_count = 0 +self.finished= False def timeout(self): self.error = "Timeout Expired - hello_count: %d, mar_count: %d" % (self.hello_count, self.mar_count) @@ -168,6 +169,8 @@ class RejectHigherVersionMARTest(MessagingHandler): self.sender.target.capabilities.put_object(symbol("qd.router")) def on_message(self, event): +if self.finished: +return opcode = event.message.properties['opcode'] body = event.message.body rid= body['id'] @@ -183,6 +186,7 @@ class RejectHigherVersionMARTest(MessagingHandler): elif opcode == 'RA': if self.mar_count > 2: +self.finished = True self.fail(None) return - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch main updated: DISPATCH-2199: Container link handlers to use qd_alloc safe_ptr functions
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/main by this push: new 2b34a16 DISPATCH-2199: Container link handlers to use qd_alloc safe_ptr functions 2b34a16 is described below commit 2b34a1687911d89225d87a63786862dac9f57992 Author: Chuck Rolke AuthorDate: Fri Jul 16 16:41:31 2021 -0400 DISPATCH-2199: Container link handlers to use qd_alloc safe_ptr functions * use qd_alloc_deref_safe_ptr in cleanup_link * use qd_alloc_set_safe_ptr in qd_link_set_incoming_msg This closes #1300 --- src/container.c | 8 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/container.c b/src/container.c index b3fa7d4..dc154eb 100644 --- a/src/container.c +++ b/src/container.c @@ -355,9 +355,10 @@ static void cleanup_link(qd_link_t *link) link->pn_sess = 0; // cleanup any inbound message that has not been forwarded -qd_message_t *msg = link->incoming_msg.ptr; -if (msg && qd_alloc_sequence(msg) == link->incoming_msg.seq) +qd_message_t *msg = qd_alloc_deref_safe_ptr(&link->incoming_msg); +if (msg) { qd_message_free(msg); +} } } @@ -1224,8 +1225,7 @@ void qd_session_cleanup(qd_connection_t *qd_conn) void qd_link_set_incoming_msg(qd_link_t *link, qd_message_t *msg) { if (msg) { -link->incoming_msg.ptr = (void*) msg; -link->incoming_msg.seq = qd_alloc_sequence(msg); +qd_alloc_set_safe_ptr(&link->incoming_msg, msg); } else { qd_nullify_safe_ptr(&link->incoming_msg); } - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch main updated: DISPATCH-1772: Add logging to fallback dest self test
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/main by this push: new 8c8829b DISPATCH-1772: Add logging to fallback dest self test 8c8829b is described below commit 8c8829b88386e590e2fe14278fa2f2ece8e2a44c Author: Chuck Rolke AuthorDate: Wed Jul 14 15:01:43 2021 -0400 DISPATCH-1772: Add logging to fallback dest self test * This patch instruments the SwitchoverTest where most of the test failures show up * The on_message handler is broken up to explicitly handle receiving from primary and fallback receivers in phases 0 and 1 * Add sender drain handling This closes #1298 --- tests/system_tests_fallback_dest.py | 178 1 file changed, 118 insertions(+), 60 deletions(-) diff --git a/tests/system_tests_fallback_dest.py b/tests/system_tests_fallback_dest.py index db51172..85e6c89 100644 --- a/tests/system_tests_fallback_dest.py +++ b/tests/system_tests_fallback_dest.py @@ -20,6 +20,7 @@ from proton import Message, symbol from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, TestTimeout from system_test import unittest +from system_test import Logger from proton.handlers import MessagingHandler from proton.reactor import Container @@ -258,113 +259,113 @@ class RouterTest(TestCase): self.assertIsNone(test.error) def test_25_switchover_same_edge(self): -test = SwitchoverTest(self.ROUTER_EA1, - self.ROUTER_EA1, - self.ROUTER_EA1, +test = SwitchoverTest([self.ROUTER_EA1, "EA1"], + [self.ROUTER_EA1, "EA1"], + [self.ROUTER_EA1, "EA1"], 'dest.25') test.run() self.assertIsNone(test.error) def test_26_switchover_same_interior(self): -test = SwitchoverTest(self.ROUTER_INTA, - self.ROUTER_INTA, - self.ROUTER_INTA, +test = SwitchoverTest([self.ROUTER_INTA, "INTA"], + [self.ROUTER_INTA, "INTA"], + [self.ROUTER_INTA, "INTA"], 'dest.26') test.run() self.assertIsNone(test.error) def test_27_switchover_local_edge_alt_remote_interior(self): -test = SwitchoverTest(self.ROUTER_EA1, - self.ROUTER_INTA, - self.ROUTER_EA1, +test = SwitchoverTest([self.ROUTER_EA1, "EA1"], + [self.ROUTER_INTA, "INTA"], + [self.ROUTER_EA1, "EA1"], 'dest.27') test.run() self.assertIsNone(test.error) def test_28_switchover_local_edge_alt_remote_edge(self): -test = SwitchoverTest(self.ROUTER_EA1, - self.ROUTER_EB1, - self.ROUTER_EA1, +test = SwitchoverTest([self.ROUTER_EA1, "EA1"], + [self.ROUTER_EB1, "EB1"], + [self.ROUTER_EA1, "EA1"], 'dest.28') test.run() self.assertIsNone(test.error) def test_29_switchover_local_edge_pri_remote_interior(self): -test = SwitchoverTest(self.ROUTER_EA1, - self.ROUTER_EA1, - self.ROUTER_INTA, +test = SwitchoverTest([self.ROUTER_EA1, "EA1"], + [self.ROUTER_EA1, "EA1"], + [self.ROUTER_INTA, "INTA"], 'dest.29') test.run() self.assertIsNone(test.error) def test_30_switchover_local_interior_pri_remote_edge(self): -test = SwitchoverTest(self.ROUTER_EA1, - self.ROUTER_EA1, - self.ROUTER_EB1, +test = SwitchoverTest([self.ROUTER_EA1, "EA1"], + [self.ROUTER_EA1, "EA1"], + [self.ROUTER_EB1, "EB1"], 'dest.30') test.run() self.assertIsNone(test.error) def test_31_switchover_local_interior_alt_remote_interior(self): -test = SwitchoverTest(self.ROUTER_INTB, - self.ROUTER_INTA, - self.ROUTER_INTB, +test = SwitchoverTest([self.ROUTER_INTB, "INTB"], + [self.ROUTER_INTA,
[qpid-dispatch] branch main updated: DISPATCH-2194: Fix CONNECTOR - ENTITY_CACHE lock inversion
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/main by this push: new 269bfd4 DISPATCH-2194: Fix CONNECTOR - ENTITY_CACHE lock inversion 269bfd4 is described below commit 269bfd46f3ac43ef6d1199f8cb271d117fd1ed9a Author: Chuck Rolke AuthorDate: Fri Jul 9 16:26:59 2021 -0400 DISPATCH-2194: Fix CONNECTOR - ENTITY_CACHE lock inversion The two locks are taken in both orders. * entity_cache first, connector second is routinely used by management entity updates in a pattern shared by all entities. * connector first, entity_cache second is used only when a connector creates or deletes an associated connection. The allocation and disposal of the connection causes an implicit entity_cache lock. The fix is to avoid the connector - entity_cache lock order by allocating the connection before taking the connector lock and by freeing the connection after releasing the connector lock. This patch also corrects an improper free call in a failure path. --- include/qpid/dispatch/server.h | 29 +++ src/connection_manager.c | 9 -- src/server.c | 66 +++--- 3 files changed, 90 insertions(+), 14 deletions(-) diff --git a/include/qpid/dispatch/server.h b/include/qpid/dispatch/server.h index faf8366..7f71912 100644 --- a/include/qpid/dispatch/server.h +++ b/include/qpid/dispatch/server.h @@ -569,6 +569,35 @@ void qd_connection_invoke_deferred(qd_connection_t *conn, qd_deferred_t call, vo /** + * Schedule a call to be invoked on a thread that has ownership of this connection + * when it will be safe for the callback to perform operations related to this connection. + * A qd_deferred_call_t object has been allocated before hand to avoid taking + * the ENTITY_CACHE lock. + * + * @param conn Connection object + * @param call The function to be invoked on the connection's thread + * @param context The context to be passed back in the callback + * @param dct Pointer to preallocated qd_deferred_call_t object + */ +void qd_connection_invoke_deferred_impl(qd_connection_t *conn, qd_deferred_t call, void *context, void *dct); + + +/** + * Allocate a qd_deferred_call_t object + */ +void *qd_connection_new_qd_deferred_call_t(); + + +/** + * Deallocate a qd_deferred_call_t object + * + * @param dct Pointer to preallocated qd_deferred_call_t object + */ +void qd_connection_free_qd_deferred_call_t(void *dct); + + + +/** * Listen for incoming connections, return true if listening succeeded. */ bool qd_listener_listen(qd_listener_t *l); diff --git a/src/connection_manager.c b/src/connection_manager.c index 6ef14cd..c77999b 100644 --- a/src/connection_manager.c +++ b/src/connection_manager.c @@ -1081,16 +1081,19 @@ void qd_connection_manager_delete_connector(qd_dispatch_t *qd, void *impl) // cannot free the timer while holding ct->lock since the // timer callback may be running during the call to qd_timer_free qd_timer_t *timer = 0; +void *dct = qd_connection_new_qd_deferred_call_t(); sys_mutex_lock(ct->lock); timer = ct->timer; ct->timer = 0; ct->state = CXTR_STATE_DELETED; qd_connection_t *conn = ct->qd_conn; if (conn && conn->pn_conn) { -qd_connection_invoke_deferred(conn, deferred_close, conn->pn_conn); +qd_connection_invoke_deferred_impl(conn, deferred_close, conn->pn_conn, dct); +sys_mutex_unlock(ct->lock); +} else { +sys_mutex_unlock(ct->lock); +qd_connection_free_qd_deferred_call_t(dct); } -sys_mutex_unlock(ct->lock); - qd_timer_free(timer); DEQ_REMOVE(qd->connection_manager->connectors, ct); qd_connector_decref(ct); diff --git a/src/server.c b/src/server.c index 4910f37..24e73ee 100644 --- a/src/server.c +++ b/src/server.c @@ -561,11 +561,13 @@ static void connection_wake(qd_connection_t *ctx) { if (ctx->pn_conn) pn_connection_wake(ctx->pn_conn); } -/* Construct a new qd_connection. Thread safe. */ -qd_connection_t *qd_server_connection(qd_server_t *server, qd_server_config_t *config) +/* Construct a new qd_connection. Thread safe. + * Does not allocate any managed objects and therefore + * does not take ENTITY_CACHE lock. + */ +qd_connection_t *qd_server_connection_impl(qd_server_t *server, qd_server_config_t *config, qd_connection_t *ctx) { -qd_connection_t *ctx = new_qd_connection_t(); -if (!ctx) return NULL; +assert(ctx); ZERO(ctx); ctx->pn_conn = pn_connection(); ctx->deferred_call_lock = sys_mutex(); @@ -574,7 +576,7 @@ qd_connection_t *qd_server_connection(qd
[qpid-dispatch] branch main updated: NO-JIRA: Replace tab characters with spaces
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/main by this push: new d7688f4 NO-JIRA: Replace tab characters with spaces d7688f4 is described below commit d7688f473459909288bc833be25d801ed1849596 Author: Chuck Rolke AuthorDate: Fri Jul 2 10:09:31 2021 -0400 NO-JIRA: Replace tab characters with spaces --- src/adaptors/tcp_adaptor.c | 12 ++-- 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c index af5faa6..a5bd0d4 100644 --- a/src/adaptors/tcp_adaptor.c +++ b/src/adaptors/tcp_adaptor.c @@ -79,8 +79,8 @@ struct qdr_tcp_connection_t { bool egress_dispatcher; bool connector_closed;//only used if egress_dispatcher=true bool in_list; // This connection is in the adaptor's connections list -sys_atomic_t raw_closed_read; // proton event seen -sys_atomic_t raw_closed_write; // proton event seen or write_close called +sys_atomic_t raw_closed_read; // proton event seen +sys_atomic_t raw_closed_write; // proton event seen or write_close called bool raw_read_shutdown; // stream closed bool read_eos_seen; qdr_delivery_t *initial_delivery; @@ -381,7 +381,7 @@ static int handle_incoming(qdr_tcp_connection_t *conn, const char *msg) int count = handle_incoming_raw_read(conn, &buffers); // Grant more buffers to proton for reading if read side is still open - grant_read_buffers(conn); +grant_read_buffers(conn); // Push the bytes just read into the streaming message if (count > 0) { @@ -721,7 +721,7 @@ static void qdr_tcp_connection_ingress_accept(qdr_tcp_connection_t* tc) "", // *ssl_cipher, "", // *user, "TcpAdaptor",// *container, - tcp_conn_properties, // *connection_properties, + tcp_conn_properties, // *connection_properties, 0, // ssl_ssf, false, // ssl, "", // peer router version, @@ -983,7 +983,7 @@ static void qdr_tcp_open_server_side_connection(qdr_tcp_connection_t* tc) "", //const char *ssl_cipher, "", //const char *user, "TcpAdaptor",//const char *container, - tcp_conn_properties,// pn_data_t *connection_properties, + tcp_conn_properties,// pn_data_t *connection_properties, 0, //int ssl_ssf, false, //bool ssl, "", // peer router version, @@ -1003,7 +1003,7 @@ static void qdr_tcp_open_server_side_connection(qdr_tcp_connection_t* tc) 250, // link_capacity 0, // vhost 0, // policy_spec - info, // connection_info + info,// connection_info 0, // context_binder 0); // bind_token tc->qdr_conn = conn; - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch main updated: DISPATCH-2163: Add C include guards to generated config.h files
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/main by this push: new 5bd4ca5 DISPATCH-2163: Add C include guards to generated config.h files 5bd4ca5 is described below commit 5bd4ca594ee2593ff69700d9203633d7c4bbe83e Author: Chuck Rolke AuthorDate: Tue Jun 29 10:32:46 2021 -0400 DISPATCH-2163: Add C include guards to generated config.h files --- router/src/config.h.in | 3 +++ src/config.h.in| 3 +++ 2 files changed, 6 insertions(+) diff --git a/router/src/config.h.in b/router/src/config.h.in index 8e09608..4a61371 100644 --- a/router/src/config.h.in +++ b/router/src/config.h.in @@ -1,3 +1,5 @@ +#ifndef __router_src_config_h_in__ +#define __router_src_config_h_in__ /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -21,3 +23,4 @@ #cmakedefine DEFAULT_CONFIG_PATH "${DEFAULT_CONFIG_PATH}" #cmakedefine QPID_DISPATCH_HOME_INSTALLED "${QPID_DISPATCH_HOME_INSTALLED}" +#endif // __router_src_config_h_in__ diff --git a/src/config.h.in b/src/config.h.in index f558215..d162224 100644 --- a/src/config.h.in +++ b/src/config.h.in @@ -1,3 +1,5 @@ +#ifndef __src_config_h_in__ +#define __src_config_h_in__ /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -21,3 +23,4 @@ #define QPID_CONSOLE_STAND_ALONE_INSTALL_DIR "${CONSOLE_STAND_ALONE_INSTALL_DIR}" #cmakedefine01 QD_MEMORY_STATS #cmakedefine01 QD_HAVE_GETRLIMIT +#endif // __src_config_h_in__ - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch main updated: DISPATCH-2108: Aggregate statistics for TCP listeners and connectors
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/main by this push: new e401cb1 DISPATCH-2108: Aggregate statistics for TCP listeners and connectors e401cb1 is described below commit e401cb19bc243bd240eb61cae9a1bef9fdd2e549 Author: Chuck Rolke AuthorDate: Wed Jun 16 15:32:09 2021 -0400 DISPATCH-2108: Aggregate statistics for TCP listeners and connectors This commit adds counters * bytesIn * bytesOut * connectionsOpened * connectionsClosed to tcpListener and tcpConnector entities. Individual bytesIn and bytesOut for tcpConnections are tracked in the connection entity. The new aggregated statistics are running totals for all connections created through the listener and connector entities. Internally the tcp_bridge_config structure was renamed to tcp_bridge as the struct is not just config any more. The qd_tcp_connection no longer holds the actual structure in 'config'. Instead it holds a pointer to the repurposed structure in 'bridge'. qd_tcp_bridge_t is now a pooled object. Its lifetime is controlled by a ref count. A bridge is created synchronously with a listener or a connector and those entities hold the first reference. Tcp connections hold a reference to the bridge and target that bridge for ongoing statistics accumulation. With the ref counting a connection may be created, say, through a listener. Then before the connection is closed the listener may be deleted. The ref count keeps the bridge object alive so that the connection statistics are still aggregated. Only when the bridge object loses all of its references is the bridge finally disposed. Bridge objects dump their final statistics to the TCP_ADAPTOR log at INFO level. This commit also creates LOCK and UNLOCK macros to aid developers trying to read the code. A self test is added to make sure the entity attributes are present and hold reasonable values. This closes #2144 --- python/qpid_dispatch/management/qdrouter.json | 41 +- src/adaptors/tcp_adaptor.c| 192 +++--- src/adaptors/tcp_adaptor.h| 34 +++-- tests/system_tests_tcp_adaptor.py | 41 ++ 4 files changed, 244 insertions(+), 64 deletions(-) diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json index 0441752..2619ad0 100644 --- a/python/qpid_dispatch/management/qdrouter.json +++ b/python/qpid_dispatch/management/qdrouter.json @@ -1279,6 +1279,26 @@ "required": false, "description": "Used to identify where connection is handled.", "create": true +}, +"connectionsOpened": { +"type": "integer", +"graph": true, +"description": "The number of connections opened by this listener." +}, +"connectionsClosed": { +"type": "integer", +"graph": true, +"description": "The number of connections closed by this listener." +}, +"bytesIn": { +"type": "integer", +"graph": true, +"description": "The number of bytes sent from clients to servers on all connections to this listener." +}, +"bytesOut": { +"type": "integer", +"graph": true, +"description": "The number of bytes sent from servers to clients on all connections to this listener." } } }, @@ -1302,13 +1322,32 @@ "description": "Port number or symbolic service name.", "type": "string", "create": true - }, "siteId": { "type": "string", "required": false, "description": "Used to identify origin of connections.", "create": true +}, +"connectionsOpened": { +"type": "integer", +"g
[qpid-dispatch] branch main updated: DISPATCH-2097: Identify TCP connections with property qd.adaptor=tcp
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/main by this push: new 00a2b3b DISPATCH-2097: Identify TCP connections with property qd.adaptor=tcp 00a2b3b is described below commit 00a2b3ba484a0d991956e35fce84ae5c93bd5cd4 Author: Chuck Rolke AuthorDate: Thu Jun 10 11:06:42 2021 -0400 DISPATCH-2097: Identify TCP connections with property qd.adaptor=tcp This closes #1254 --- include/qpid/dispatch/amqp.h | 2 ++ src/adaptors/tcp_adaptor.c | 21 +++-- src/amqp.c | 2 ++ 3 files changed, 23 insertions(+), 2 deletions(-) diff --git a/include/qpid/dispatch/amqp.h b/include/qpid/dispatch/amqp.h index 6c1064f..9b8665a 100644 --- a/include/qpid/dispatch/amqp.h +++ b/include/qpid/dispatch/amqp.h @@ -168,6 +168,8 @@ extern const char * const QD_CONNECTION_PROPERTY_FAILOVER_NETHOST_KEY; extern const char * const QD_CONNECTION_PROPERTY_FAILOVER_PORT_KEY; extern const char * const QD_CONNECTION_PROPERTY_FAILOVER_SCHEME_KEY; extern const char * const QD_CONNECTION_PROPERTY_FAILOVER_HOSTNAME_KEY; +extern const char * const QD_CONNECTION_PROPERTY_ADAPTOR_KEY; +extern const char * const QD_CONNECTION_PROPERTY_TCP_ADAPTOR_VALUE; /// @} /** @name Terminus Addresses */ diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c index c262088..51cce49 100644 --- a/src/adaptors/tcp_adaptor.c +++ b/src/adaptors/tcp_adaptor.c @@ -25,6 +25,7 @@ #include "qpid/dispatch/ctools.h" #include "qpid/dispatch/protocol_adaptor.h" +#include #include #include #include @@ -671,6 +672,22 @@ static char *get_address_string(pn_raw_connection_t *socket) } } +static pn_data_t * qdr_tcp_conn_properties() +{ + // Return a new tcp connection properties map. +pn_data_t *props = pn_data(0); +pn_data_put_map(props); +pn_data_enter(props); +pn_data_put_symbol(props, + pn_bytes(strlen(QD_CONNECTION_PROPERTY_ADAPTOR_KEY), + QD_CONNECTION_PROPERTY_ADAPTOR_KEY)); +pn_data_put_string(props, + pn_bytes(strlen(QD_CONNECTION_PROPERTY_TCP_ADAPTOR_VALUE), + QD_CONNECTION_PROPERTY_TCP_ADAPTOR_VALUE)); +pn_data_exit(props); +return props; +} + static void qdr_tcp_connection_ingress_accept(qdr_tcp_connection_t* tc) { allocate_tcp_write_buffer(&tc->write_buffer); @@ -687,7 +704,7 @@ static void qdr_tcp_connection_ingress_accept(qdr_tcp_connection_t* tc) "", // *ssl_cipher, "", // *user, "TcpAdaptor",// *container, - 0, // *connection_properties, + qdr_tcp_conn_properties(), // *connection_properties, 0, // ssl_ssf, false, // ssl, "", // peer router version, @@ -935,7 +952,7 @@ static void qdr_tcp_open_server_side_connection(qdr_tcp_connection_t* tc) "", //const char *ssl_cipher, "", //const char *user, "TcpAdaptor",//const char *container, - 0, //pn_data_t *connection_properties, + qdr_tcp_conn_properties(),// pn_data_t*connection_properties, 0, //int ssl_ssf, false, //bool ssl, "", // peer router version, diff --git a/src/amqp.c b/src/amqp.c index 4dee1f5..e9defc4 100644 --- a/src/amqp.c +++ b/src/amqp.c @@ -65,6 +65,8 @@ const char * const QD_CONNECTION_PROPERTY_FAILOVER_NETHOST_KEY = "network-host" const char * const QD_CONNECTION_PROPERTY_FAILOVER_PORT_KEY = "port"; const char * const QD_CONNECTION_PROPERTY_FAILOVER_SCHEME_KEY = "scheme"; const char * const QD_CONNECTION_PROPERTY_FAILOVER_HOSTNAME_KEY = "hostname"; +const char * const QD_CONNECTION_PROPERTY_ADAPTOR_KEY
[qpid-dispatch] branch main updated: DISPATCH-1878: Account for new dispositions used for flow control
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/main by this push: new 9e87b89 DISPATCH-1878: Account for new dispositions used for flow control 9e87b89 is described below commit 9e87b8994d29cfba97789a246e5df20c4ac29cab Author: Chuck Rolke AuthorDate: Tue Jun 8 10:48:12 2021 -0400 DISPATCH-1878: Account for new dispositions used for flow control Non-terminal RECEIVED dispositions used for TCP flow control generate new disposition states. This patch closes the TCP connection upon receiving RELEASED settlement and not on MODIFIED settlement. --- src/adaptors/tcp_adaptor.c | 11 +-- src/router_node.c | 1 - 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c index 940d65f..c262088 100644 --- a/src/adaptors/tcp_adaptor.c +++ b/src/adaptors/tcp_adaptor.c @@ -1485,12 +1485,11 @@ static void qdr_tcp_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t DLV_FMT" qdr_tcp_delivery_update: disp: %"PRIu64", settled: %s", DLV_ARGS(dlv), disp, settled ? "true" : "false"); -if (settled) { -// the only settlement occurs when the initial delivery is -// settled, which occurs when the connector is unable to -// connect to the configured tcp endpoint, so in this case -// we can just close the connection -// (The end of the message is used to convey half closed status) +if (settled && disp == PN_RELEASED) { +// When the connector is unable to connect to a tcp endpoint it will +// release the message. We handle that here by closing the connection. +// Half-closed status is signalled by read_eos_seen and is not +// sufficient by itself to force a connection closure. qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, DLV_FMT" qdr_tcp_delivery_update: call pn_raw_connection_close()", DLV_ARGS(dlv)); diff --git a/src/router_node.c b/src/router_node.c index 44f13cf..6df6679 100644 --- a/src/router_node.c +++ b/src/router_node.c @@ -2067,7 +2067,6 @@ static void CORE_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t di if (disp && !pn_delivery_settled(pnd)) { uint64_t ignore = 0; qd_delivery_state_t *dstate = qdr_delivery_take_local_delivery_state(dlv, &ignore); -assert(ignore == disp); // expected: since both are from the same dlv // update if the disposition has changed or there is new state associated with it if (disp != pn_delivery_local_state(pnd) || dstate) { - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch main updated: DISPATCH-2166: Suppress tsan race in qd_message_set_send_complete
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/main by this push: new a44c5a0 DISPATCH-2166: Suppress tsan race in qd_message_set_send_complete a44c5a0 is described below commit a44c5a0c13d9dbb5e9bdd2dd8ef0239b68f1f4c3 Author: Chuck Rolke AuthorDate: Mon Jun 7 10:18:57 2021 -0400 DISPATCH-2166: Suppress tsan race in qd_message_set_send_complete --- tests/tsan.supp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/tsan.supp b/tests/tsan.supp index 8b2d294..4bf3c4e 100644 --- a/tests/tsan.supp +++ b/tests/tsan.supp @@ -79,6 +79,9 @@ race:qdr_delivery_mcast_outbound_disposition_CT # DISPATCH-2157 race:^qd_message_send$ +# DISPATCH-2166 +race:qd_message_set_send_complete + # # External libraries # - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch main updated: DISPATCH-2136: Fix tsan race: convert message aborted flag to atomic
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/main by this push: new 5ad0968 DISPATCH-2136: Fix tsan race: convert message aborted flag to atomic 5ad0968 is described below commit 5ad09687d17259ebb79550f1af46035613005100 Author: Chuck Rolke AuthorDate: Thu May 27 10:22:09 2021 -0400 DISPATCH-2136: Fix tsan race: convert message aborted flag to atomic The message aborted flag is read and written by I/O and core threads. Conversion from a naked bool to a sys_atomic ensures consistent state. This closes #1235 --- src/message.c | 33 ++--- src/message_private.h | 2 +- tests/tsan.supp | 3 --- 3 files changed, 23 insertions(+), 15 deletions(-) diff --git a/src/message.c b/src/message.c index f345da3..37d8370 100644 --- a/src/message.c +++ b/src/message.c @@ -45,6 +45,10 @@ #define LOCK sys_mutex_lock #define UNLOCK sys_mutex_unlock +// Implement bool flags with atomic variables +#defineSET_FLAG(flag) sys_atomic_set(flag, 1) +#define IS_FLAG_SET(flag) (sys_atomic_get(flag) == 1) + const char *STR_AMQP_NULL = "null"; const char *STR_AMQP_TRUE = "T"; const char *STR_AMQP_FALSE = "F"; @@ -1004,6 +1008,7 @@ qd_message_t *qd_message() ZERO(msg->content); msg->content->lock = sys_mutex(); sys_atomic_init(&msg->content->ref_count, 1); +sys_atomic_init(&msg->content->aborted, 0); msg->content->parse_depth = QD_DEPTH_NONE; return (qd_message_t*) msg; } @@ -1083,6 +1088,7 @@ void qd_message_free(qd_message_t *in_msg) qd_buffer_free(content->pending); sys_mutex_free(content->lock); +sys_atomic_destroy(&content->aborted); free_qd_message_content_t(content); } @@ -1391,13 +1397,15 @@ qd_message_t *discard_receive(pn_delivery_t *delivery, break; } else if (rc == PN_EOS || rc < 0) { // End of message or error: finalize message_receive handling -msg->content->aborted = pn_delivery_aborted(delivery); +if (pn_delivery_aborted(delivery)) { +SET_FLAG(&msg->content->aborted); +} pn_record_t *record = pn_delivery_attachments(delivery); pn_record_set(record, PN_DELIVERY_CTX, 0); if (msg->content->oversize) { // Aborting the content disposes of downstream copies. // This has no effect on the received message. -msg->content->aborted = true; +SET_FLAG(&msg->content->aborted); } qd_message_set_receive_complete((qd_message_t*) msg); break; @@ -1519,8 +1527,9 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery) content->receive_complete = true; content->q2_unblocker.handler = 0; qd_nullify_safe_ptr(&content->q2_unblocker.context); -content->aborted = pn_delivery_aborted(delivery); - +if (pn_delivery_aborted(delivery)) { +SET_FLAG(&msg->content->aborted); +} // unlink message and delivery pn_record_set(record, PN_DELIVERY_CTX, 0); } @@ -1764,11 +1773,11 @@ void qd_message_send(qd_message_t *in_msg, if (msg->sent_depth < QD_DEPTH_MESSAGE_ANNOTATIONS) { -if (content->aborted) { +if (IS_FLAG_SET(&content->aborted)) { // Message is aborted before any part of it has been sent. // Declare the message to be sent, msg->send_complete = true; -// the link has an outgoing deliver. abort it. +// If the outgoing delivery is not already aborted then abort it. if (!pn_delivery_aborted(pn_link_current(pnl))) { pn_delivery_abort(pn_link_current(pnl)); } @@ -1871,7 +1880,7 @@ void qd_message_send(qd_message_t *in_msg, pn_session_t *pns= pn_link_session(pnl); const size_t q3_upper = BUFFER_SIZE * QD_QLIMIT_Q3_UPPER; -while (!content->aborted +while (!IS_FLAG_SET(&content->aborted) && buf && pn_session_outgoing_bytes(pns) < q3_upper) { @@ -1893,7 +1902,7 @@ void qd_message_send(qd_message_t *in_msg, // send error - likely the link has failed and we will eventually // get a link detach event for this link // -content->aborted = true; +SET_FLAG(&content->aborted); msg->send_complete = true; if (!pn_delivery_aborte
[qpid-dispatch] branch main updated: DISPATCH-2154: Disallow setting message abort flag to False
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/main by this push: new 87282a6 DISPATCH-2154: Disallow setting message abort flag to False 87282a6 is described below commit 87282a6fe2ed10799c3389e80be672c2c3823528 Author: Chuck Rolke AuthorDate: Tue May 25 15:21:35 2021 -0400 DISPATCH-2154: Disallow setting message abort flag to False Messages are created in the non-aborted state. The control interface is modified to allow only setting the aborted state to True. --- include/qpid/dispatch/message.h | 3 +-- src/adaptors/http1/http1_server.c | 2 +- src/message.c | 4 ++-- src/router_core/connections.c | 6 +++--- src/router_core/delivery.c| 4 ++-- src/router_core/delivery.h| 2 +- 6 files changed, 10 insertions(+), 11 deletions(-) diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h index 82c62cf..5a66db3 100644 --- a/include/qpid/dispatch/message.h +++ b/include/qpid/dispatch/message.h @@ -594,9 +594,8 @@ bool qd_message_aborted(const qd_message_t *msg); /** * Set the aborted flag on the message. * @param msg A pointer to the message - * @param aborted */ -void qd_message_set_aborted(const qd_message_t *msg, bool aborted); +void qd_message_set_aborted(const qd_message_t *msg); /** * Return message priority diff --git a/src/adaptors/http1/http1_server.c b/src/adaptors/http1/http1_server.c index cdc137a..4b05d47 100644 --- a/src/adaptors/http1/http1_server.c +++ b/src/adaptors/http1/http1_server.c @@ -714,7 +714,7 @@ static bool _process_request(_server_request_t *hreq) while (rmsg) { if (rmsg->dlv) { qd_message_set_receive_complete(qdr_delivery_message(rmsg->dlv)); -qdr_delivery_set_aborted(rmsg->dlv, true); +qdr_delivery_set_aborted(rmsg->dlv); } _server_response_msg_free(hreq, rmsg); rmsg = DEQ_HEAD(hreq->responses); diff --git a/src/message.c b/src/message.c index 424b5be..4198599 100644 --- a/src/message.c +++ b/src/message.c @@ -2892,12 +2892,12 @@ bool qd_message_aborted(const qd_message_t *msg) return ((qd_message_pvt_t *)msg)->content->aborted; } -void qd_message_set_aborted(const qd_message_t *msg, bool aborted) +void qd_message_set_aborted(const qd_message_t *msg) { if (!msg) return; qd_message_pvt_t * msg_pvt = (qd_message_pvt_t *)msg; -msg_pvt->content->aborted = aborted; +msg_pvt->content->aborted = true; } diff --git a/src/router_core/connections.c b/src/router_core/connections.c index 2857906..bf462f3 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -893,7 +893,7 @@ void qdr_link_cleanup_deliveries_CT(qdr_core_t *core, qdr_connection_t *conn, qd } if (!qdr_delivery_receive_complete(dlv)) { -qdr_delivery_set_aborted(dlv, true); +qdr_delivery_set_aborted(dlv); qdr_delivery_continue_peers_CT(core, dlv, false); } @@ -942,7 +942,7 @@ void qdr_link_cleanup_deliveries_CT(qdr_core_t *core, qdr_connection_t *conn, qd DEQ_REMOVE_HEAD(settled); if (!qdr_delivery_receive_complete(dlv)) { -qdr_delivery_set_aborted(dlv, true); +qdr_delivery_set_aborted(dlv); qdr_delivery_continue_peers_CT(core, dlv, false); } @@ -977,7 +977,7 @@ static void qdr_link_abort_undelivered_CT(qdr_core_t *core, qdr_link_t *link) qdr_delivery_t *dlv = DEQ_HEAD(link->undelivered); while (dlv) { if (!qdr_delivery_receive_complete(dlv)) -qdr_delivery_set_aborted(dlv, true); +qdr_delivery_set_aborted(dlv); dlv = DEQ_NEXT(dlv); } sys_mutex_unlock(conn->work_lock); diff --git a/src/router_core/delivery.c b/src/router_core/delivery.c index a405219..91d99da 100644 --- a/src/router_core/delivery.c +++ b/src/router_core/delivery.c @@ -117,10 +117,10 @@ void qdr_delivery_incref(qdr_delivery_t *delivery, const char *label) } -void qdr_delivery_set_aborted(const qdr_delivery_t *delivery, bool aborted) +void qdr_delivery_set_aborted(const qdr_delivery_t *delivery) { assert(delivery); -qd_message_set_aborted(delivery->msg, aborted); +qd_message_set_aborted(delivery->msg); } bool qdr_delivery_is_aborted(const qdr_delivery_t *delivery) diff --git a/src/router_core/delivery.h b/src/router_core/delivery.h index f0bdd2b..e89bbba 100644 --- a/src/router_core/delivery.h +++ b/src/router_core/delivery.h @@ -101,7 +101,7 @@ void qdr_delivery_set_tag_sent(const qdr_delivery_t *delivery, bool tag_sent); uint64_t qdr_delivery_disposition(const qdr_delivery_t *delivery); void qdr_delivery_set_disposition(qdr_delivery
[qpid-dispatch] 02/02: DISPATCH-2100: TCP half-closed self test
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git commit 561306a4a9ae5952418713c497036f62156a90f7 Author: Chuck Rolke AuthorDate: Thu May 6 15:39:38 2021 -0400 DISPATCH-2100: TCP half-closed self test Add a self test that runs if the ncat utility is available. When available run ncat against various TCP listener and connector pairs to verify that half-closed works in a variety of network topologies. --- tests/system_tests_tcp_adaptor.py | 53 +++ 1 file changed, 53 insertions(+) diff --git a/tests/system_tests_tcp_adaptor.py b/tests/system_tests_tcp_adaptor.py index f60de91..5da2b6c 100644 --- a/tests/system_tests_tcp_adaptor.py +++ b/tests/system_tests_tcp_adaptor.py @@ -92,6 +92,20 @@ Q2_TEST_MESSAGE_SIZE = 1000 echo_timeout = 30 +def ncat_available(): +popen_args = ['ncat', '--version'] +try: +process = Process(popen_args, + name='ncat_check', + stdout=PIPE, + expect=None, + universal_newlines=True) +out = process.communicate()[0] +return True +except: +return False + + # # Test concurrent clients # @@ -881,6 +895,45 @@ class TcpAdaptor(TestCase): # Declare success self.logger.log("TCP_TEST Stop %s SUCCESS" % name) +def run_ncat(self, port, logger, expect=Process.EXIT_OK, timeout=2, data=b'abcd'): +ncat_cmd = ['ncat', '127.0.0.1', str(port)] +logger.log("Starting ncat '%s' and input '%s'" % (ncat_cmd, str(data))) +p = self.popen( +ncat_cmd, +stdin=PIPE, stdout=PIPE, stderr=PIPE, expect=expect, +universal_newlines=True) +out = p.communicate(input='abcd', timeout=timeout)[0] +try: +p.teardown() +except Exception as e: +raise Exception(out if out else str(e)) +return out + +def ncat_runner(self, tname, client, server, logger): +name = "%s_%s_%s" % (tname, client, server) +logger.log(name + " Start") +out = self.run_ncat(TcpAdaptor.tcp_client_listener_ports[client][server], logger, data=b'abcd') +logger.log("run_ncat returns: '%s'" % out) +assert(len(out) > 0) +assert("abcd" in out) +logger.log(tname + " Stop") + +# half-closed handling +def test_70_half_closed(self): +if DISABLE_SELECTOR_TESTS: +self.skipTest(DISABLE_SELECTOR_REASON) +if not ncat_available(): +self.skipTest("Ncat utility is not available") +name = "test_70_half_closed" +self.logger.log("TCP_TEST Start %s" % name) +self.ncat_runner(name, "INTA", "INTA", self.logger) +self.ncat_runner(name, "INTA", "INTB", self.logger) +self.ncat_runner(name, "INTA", "INTC", self.logger) +self.ncat_runner(name, "EA1", "EA1", self.logger) +self.ncat_runner(name, "EA1", "EB1", self.logger) +self.ncat_runner(name, "EA1", "EC2", self.logger) +self.logger.log("TCP_TEST Stop %s SUCCESS" % name) + class TcpAdaptorManagementTest(TestCase): """ - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] 01/02: DISPATCH-2100: Self test for TCP half-closed handling
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git commit 42b9bbce19320d1743ef3e063863fe8608dc2546 Author: Chuck Rolke AuthorDate: Thu May 6 15:31:50 2021 -0400 DISPATCH-2100: Self test for TCP half-closed handling This patch improves the echo server program half-closed handling. When the client socket closes ensure that data received from the client is echoed back to the client before closing the socket from the server end. --- tests/TCP_echo_server.py | 20 +++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/tests/TCP_echo_server.py b/tests/TCP_echo_server.py index 678132c..47dcb4f 100755 --- a/tests/TCP_echo_server.py +++ b/tests/TCP_echo_server.py @@ -230,7 +230,25 @@ class TcpEchoServer: split_chunk_for_display(recv_data))) sel.modify(sock, selectors.EVENT_READ | selectors.EVENT_WRITE, data=data) else: -logger.log('%s Closing connection to %s:%d' % (self.prefix, data.addr[0], data.addr[1])) +while data.outb: +logger.log('%s Client closed: flush client input to %s:%d' % (self.prefix, data.addr[0], data.addr[1])) +try: +sent = sock.send(data.outb) +data.outb = data.outb[sent:] +except IOError: +logger.log('%s Connection to %s:%d IOError: %s' % + (self.prefix, data.addr[0], data.addr[1], traceback.format_exc())) +sel.unregister(sock) +sock.close() +return 0 +except Exception: +self.error = ('%s Connection to %s:%d exception: %s' % + (self.prefix, data.addr[0], data.addr[1], traceback.format_exc())) +logger.log(self.error) +sel.unregister(sock) +sock.close() +return 1 +logger.log('%s Client closed: closing connection to %s:%d' % (self.prefix, data.addr[0], data.addr[1])) sel.unregister(sock) sock.close() return 0 - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch main updated (c12faeb -> 561306a)
This is an automated email from the ASF dual-hosted git repository. chug pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git. from c12faeb DISPATCH-1539 Require Python >= 3.6.8 since Proton 0.35 is dropping Python 2 (#1216) new 42b9bbc DISPATCH-2100: Self test for TCP half-closed handling new 561306a DISPATCH-2100: TCP half-closed self test The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: tests/TCP_echo_server.py | 20 ++- tests/system_tests_tcp_adaptor.py | 53 +++ 2 files changed, 72 insertions(+), 1 deletion(-) - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch main updated: DISPATCH-2056: Do not reference qdr connection after it is closed (2)
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/main by this push: new fcc035c DISPATCH-2056: Do not reference qdr connection after it is closed (2) fcc035c is described below commit fcc035c6ef6a84c7418c777a18be5723458f1cce Author: Chuck Rolke AuthorDate: Mon May 10 10:44:49 2021 -0400 DISPATCH-2056: Do not reference qdr connection after it is closed (2) Fix a second instance of same problem that was fixed in commit 5a1abe9 --- src/adaptors/tcp_adaptor.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c index 435c3b9..1175e5f 100644 --- a/src/adaptors/tcp_adaptor.c +++ b/src/adaptors/tcp_adaptor.c @@ -154,8 +154,9 @@ static void on_activate(void *context) qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] on_activate", conn->conn_id); while (qdr_connection_process(conn->qdr_conn)) {} if (conn->egress_dispatcher && conn->connector_closed) { -qdr_connection_closed(conn->qdr_conn); qdr_connection_set_context(conn->qdr_conn, 0); +qdr_connection_closed(conn->qdr_conn); +conn->qdr_conn = 0; free_qdr_tcp_connection(conn); } } - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch main updated: DISPATCH-2056: Do not reference qdr connection after it is closed
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/main by this push: new cff75f9 DISPATCH-2056: Do not reference qdr connection after it is closed cff75f9 is described below commit cff75f9fceff73f7be0cdb2d6590f3a95e4c4281 Author: Chuck Rolke AuthorDate: Fri May 7 17:22:23 2021 -0400 DISPATCH-2056: Do not reference qdr connection after it is closed This closes #1201 --- src/adaptors/tcp_adaptor.c | 9 + 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c index 92b9774..435c3b9 100644 --- a/src/adaptors/tcp_adaptor.c +++ b/src/adaptors/tcp_adaptor.c @@ -489,15 +489,16 @@ static void handle_disconnected(qdr_tcp_connection_t* conn) conn->conn_id, conn->outgoing_id); qdr_link_detach(conn->outgoing, QD_LOST, 0); } -if (conn->qdr_conn) { -qdr_connection_closed(conn->qdr_conn); -qdr_connection_set_context(conn->qdr_conn, 0); -} if (conn->initial_delivery) { qdr_delivery_remote_state_updated(tcp_adaptor->core, conn->initial_delivery, PN_RELEASED, true, 0, false); qdr_delivery_decref(tcp_adaptor->core, conn->initial_delivery, "tcp-adaptor.handle_disconnected - initial_delivery"); conn->initial_delivery = 0; } +if (conn->qdr_conn) { +qdr_connection_set_context(conn->qdr_conn, 0); +qdr_connection_closed(conn->qdr_conn); +conn->qdr_conn = 0; +} //need to free on core thread to avoid deleting while in use by management agent qdr_action_t *action = qdr_action(qdr_del_tcp_connection_CT, "delete_tcp_connection"); - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch main updated: DISPATCH-2104: Fix display of plain number in qdstat
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/main by this push: new a5267f5 DISPATCH-2104: Fix display of plain number in qdstat a5267f5 is described below commit a5267f599ab00b2ca4e0c6583f0f7064ec36aa87 Author: Chuck Rolke AuthorDate: Fri May 7 09:29:36 2021 -0400 DISPATCH-2104: Fix display of plain number in qdstat This closes #1193 --- tools/qdstat.in | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/qdstat.in b/tools/qdstat.in index 0cba019..f94ed52 100755 --- a/tools/qdstat.in +++ b/tools/qdstat.in @@ -652,7 +652,7 @@ class BusManager(object): row = [] row.append(al.address) row.append(al.direction) -row.append(al.phase) +row.append(PlainNum(al.phase)) row.append(al.externalAddress) row.append(al.linkRef) row.append(al.operStatus) - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch main updated: DISPATCH-2091: Ignore RX window when raw connection is write_closed
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/main by this push: new 7ef0a0f DISPATCH-2091: Ignore RX window when raw connection is write_closed 7ef0a0f is described below commit 7ef0a0f0262a830df35d3bd39b9346c5db56940b Author: Chuck Rolke AuthorDate: Fri Apr 30 10:13:25 2021 -0400 DISPATCH-2091: Ignore RX window when raw connection is write_closed When TCP adaptor receives an EOS in the incoming streaming link then it calls pn_raw_write_close to the corresponding PN raw connection. This sends a FIN to the raw connection peer. If the adaptor then continues to honor the RX window closure then it never reads data from the raw connection that now has a close in progress. Further, when the incoming TCP window is full then the raw connection never returns the client's FIN and the connection is stuck open. This patch causes the adaptor to override backpressure flow control and to keep reading from a raw connection after a write_close has been effected. This closes #1177 --- src/adaptors/tcp_adaptor.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c index 4a260bb..e75f1ee 100644 --- a/src/adaptors/tcp_adaptor.c +++ b/src/adaptors/tcp_adaptor.c @@ -231,7 +231,7 @@ static int handle_incoming_raw_read(qdr_tcp_connection_t *conn, qd_buffer_list_t int free_count = 0; const bool was_open = conn->bytes_unacked < TCP_MAX_CAPACITY; -while ((count + conn->bytes_unacked < TCP_MAX_CAPACITY) +while ((conn->raw_closed_write || count + conn->bytes_unacked < TCP_MAX_CAPACITY) && (n = pn_raw_connection_take_read_buffers(conn->pn_raw_conn, raw_buffers, READ_BUFFERS)) ) { for (size_t i = 0; i < n && raw_buffers[i].bytes; ++i) { - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch main updated: DISPATCH-2088: Free message_stream_data objects on connection thread
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/main by this push: new 18bce61 DISPATCH-2088: Free message_stream_data objects on connection thread 18bce61 is described below commit 18bce61e79bd8731be0241ba04db3841159da5e5 Author: Chuck Rolke AuthorDate: Thu Apr 29 15:21:15 2021 -0400 DISPATCH-2088: Free message_stream_data objects on connection thread and not on the core thread. Move flush_outgoing_bufs call to handle_disconnected function. This closes #1174 --- src/adaptors/tcp_adaptor.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c index ddd7409..4a260bb 100644 --- a/src/adaptors/tcp_adaptor.c +++ b/src/adaptors/tcp_adaptor.c @@ -452,7 +452,6 @@ static void free_qdr_tcp_connection(qdr_tcp_connection_t* tc) if (tc->activate_timer) { qd_timer_free(tc->activate_timer); } -flush_outgoing_buffs(tc); sys_mutex_free(tc->activation_lock); //proactor will free the socket free_qdr_tcp_connection_t(tc); @@ -495,6 +494,8 @@ static void handle_disconnected(qdr_tcp_connection_t* conn) qdr_delivery_decref(tcp_adaptor->core, conn->initial_delivery, "tcp-adaptor.handle_disconnected - initial_delivery"); conn->initial_delivery = 0; } +flush_outgoing_buffs(conn); + //need to free on core thread to avoid deleting while in use by management agent qdr_action_t *action = qdr_action(qdr_del_tcp_connection_CT, "delete_tcp_connection"); - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch main updated: DISPATCH-2090: [tools] Scraper handles non-terminal dispositions
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/main by this push: new 04c1001 DISPATCH-2090: [tools] Scraper handles non-terminal dispositions 04c1001 is described below commit 04c10019bd69427ef70566087ec2f1bb945ffc86 Author: Chuck Rolke AuthorDate: Wed Apr 28 10:18:49 2021 -0400 DISPATCH-2090: [tools] Scraper handles non-terminal dispositions Non-terminal dispositions were added to effect TCP adaptor flow control in the 1.16 release. Scraper needs adjustments to handle them. * Report negative disposition delta times as 0 seconds * Allow non-terminal dispositions to be overwritten without logging an error --- tools/scraper/amqp_detail.py | 17 +++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/tools/scraper/amqp_detail.py b/tools/scraper/amqp_detail.py index 8a2de9b..0b2c3cd 100755 --- a/tools/scraper/amqp_detail.py +++ b/tools/scraper/amqp_detail.py @@ -449,6 +449,9 @@ class AllDetails(): :param t0: :return: """ +if ttest < t0: +# Never return negative deltas +return "0.00" delta = ttest - t0 t = float(delta.seconds) + float(delta.microseconds) / 100.0 return "%0.06f" % t @@ -681,8 +684,18 @@ class AllDetails(): for sdid in range(int(splf.data.first), (int(splf.data.last) + 1)): did = str(sdid) if did in sdispmap: -sys.stderr.write("ERROR: Delivery ID collision in disposition map. connid:%s, \n" % - (splf.data.conn_id)) +old_splf = sdispmap[did] +if "state=@received" in old_splf.line: +# Existing disposition is non-terminal. +# Don't complain when it is overwritten by another non-terminal +# or by a terminal disposition. +pass +else: +# Current state is terminal disposition. Complain when overwritten. +sys.stderr.write("ERROR: Delivery ID collision in disposition map. connid:%s, \n" % + (splf.data.conn_id)) +sys.stderr.write(" old: %s, %s\n" % (old_splf.fid, old_splf.line)) +sys.stderr.write(" new: %s, %s\n" % (splf.fid, splf.line)) sdispmap[did] = splf def rollup_disposition_counts(self, state, conn, sess, link): - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch main updated: DISPATCH-2089: [tools] Scraper must html-escape transfer payload
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/main by this push: new 4da6849 DISPATCH-2089: [tools] Scraper must html-escape transfer payload 4da6849 is described below commit 4da68495bf88d33898ab63d0d3095ffc0ca28046 Author: Chuck Rolke AuthorDate: Tue Apr 27 17:39:44 2021 -0400 DISPATCH-2089: [tools] Scraper must html-escape transfer payload --- tools/scraper/parser.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/scraper/parser.py b/tools/scraper/parser.py index e7ca2bf..2bff233 100755 --- a/tools/scraper/parser.py +++ b/tools/scraper/parser.py @@ -762,7 +762,7 @@ class ParsedLogLine(object): self.highlighted("more", res.transfer_more, common.color_of("more")), self.highlighted("resume", res.transfer_resume, common.color_of("aborted")), self.highlighted("aborted", res.transfer_aborted, common.color_of("aborted")), -showdat, spl[-SEQUENCE_TRANSFER_SIZE:], +showdat, common.html_escape(spl[-SEQUENCE_TRANSFER_SIZE:]), res.transfer_size) res.sdorg_str = "%s %s (%s) %s (%s%s%s%s)\\n%s" % ( res.name, res.channel_handle, res.delivery_id, res.transfer_size, - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch main updated: DISPATCH-2043: TCP adaptor logging improvements
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/main by this push: new 80c685d DISPATCH-2043: TCP adaptor logging improvements 80c685d is described below commit 80c685d7398b504238066bd5400fed2f5dc26326 Author: Chuck Rolke AuthorDate: Mon Apr 26 09:25:36 2021 -0400 DISPATCH-2043: TCP adaptor logging improvements * add connection role to several logs * in handle_incoming report conn,link,delivery to most logs This closes #1157 --- src/adaptors/tcp_adaptor.c | 121 +++-- 1 file changed, 61 insertions(+), 60 deletions(-) diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c index 2f895ab..9f34403 100644 --- a/src/adaptors/tcp_adaptor.c +++ b/src/adaptors/tcp_adaptor.c @@ -275,14 +275,14 @@ static int handle_incoming(qdr_tcp_connection_t *conn, const char *msg) // if we don't yet have a reply-to address and credit. if (conn->ingress && !conn->reply_to) { qd_log(log, QD_LOG_DEBUG, -"[C%"PRIu64"][L%"PRIu64"] Waiting for reply-to address before initiating ingress stream message", -conn->conn_id, conn->incoming_id); +"[C%"PRIu64"][L%"PRIu64"] Waiting for reply-to address before initiating %s ingress stream message", +conn->conn_id, conn->incoming_id, qdr_tcp_connection_role_name(conn)); return 0; } if (!conn->flow_enabled) { qd_log(log, QD_LOG_DEBUG, -"[C%"PRIu64"][L%"PRIu64"] Waiting for credit before initiating ingress stream message", -conn->conn_id, conn->incoming_id); +"[C%"PRIu64"][L%"PRIu64"] Waiting for credit before initiating %s ingress stream message", +conn->conn_id, conn->incoming_id, qdr_tcp_connection_role_name(conn)); return 0; } @@ -301,7 +301,7 @@ static int handle_incoming(qdr_tcp_connection_t *conn, const char *msg) qd_compose_insert_string(props, conn->global_id); // subject qd_compose_insert_string(props, conn->reply_to); // reply-to qd_log(log, QD_LOG_DEBUG, - "[C%"PRIu64"][L%"PRIu64"] Initiating ingress stream incoming link for %s connection to: %s reply: %s", + "[C%"PRIu64"][L%"PRIu64"] Initiating listener (ingress) stream incoming link for %s connection to: %s reply: %s", conn->conn_id, conn->incoming_id, qdr_tcp_connection_role_name(conn), conn->config.address, conn->reply_to); } else { @@ -309,8 +309,8 @@ static int handle_incoming(qdr_tcp_connection_t *conn, const char *msg) qd_compose_insert_string(props, conn->global_id); // subject qd_compose_insert_null(props);// reply-to qd_log(log, QD_LOG_DEBUG, - "[C%"PRIu64"][L%"PRIu64"] Initiating ingress stream incoming link for %s connection to: %s", - conn->conn_id, conn->incoming_id, qdr_tcp_connection_role_name(conn), conn->reply_to); + "[C%"PRIu64"][L%"PRIu64"] Initiating connector (egress) stream incoming link for connection to: %s", + conn->conn_id, conn->incoming_id, conn->reply_to); } //qd_compose_insert_null(props); // correlation-id //qd_compose_insert_null(props); // content-type @@ -332,17 +332,19 @@ static int handle_incoming(qdr_tcp_connection_t *conn, const char *msg) conn->instream = qdr_link_deliver(conn->incoming, msg, 0, false, 0, 0, 0, 0); qd_log(log, QD_LOG_DEBUG, - "[C%"PRIu64"][L%"PRIu64"][D%"PRIu32"] Initiating ingress stream message with 0 bytes", - conn->conn_id, conn->incoming_id, conn->instream->delivery_id); + "[C%"PRIu64"][L%"PRIu64"][D%"PRIu32"] Initiating empty %s incoming stream message", + conn->conn_id, conn->incoming_id, conn->instream->delivery_id, + qdr_tcp_connection_role_name(conn)); conn->incoming_started = true; } +qdr_delivery_t *conn_instream = conn->instream; // Don't read from proton if in Q2 holdoff if (conn->q2_blocked) { qd_log(log, QD_LOG_DEBUG, - "[C%"PRIu64"] handle_incoming q2_blocked for
[qpid-dispatch] branch main updated: DISPATCH-2074: Remove unused function
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/main by this push: new 49d84ea DISPATCH-2074: Remove unused function 49d84ea is described below commit 49d84eab4be0bf0627895f23ec7f6fc728f03fe9 Author: Chuck Rolke AuthorDate: Fri Apr 23 16:52:03 2021 -0400 DISPATCH-2074: Remove unused function --- src/adaptors/tcp_adaptor.c | 6 -- 1 file changed, 6 deletions(-) diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c index f6cbe7f..2f895ab 100644 --- a/src/adaptors/tcp_adaptor.c +++ b/src/adaptors/tcp_adaptor.c @@ -119,12 +119,6 @@ static inline uint64_t qdr_tcp_conn_linkid(const qdr_tcp_connection_t *conn) return conn->instream ? conn->incoming_id : conn->outgoing_id; } -static inline const char * qdr_link_direction_name(const qdr_link_t *link) -{ -assert(link); -return qdr_link_direction(link) == QD_OUTGOING ? "outgoing" : "incoming"; -} - static inline const char * qdr_tcp_connection_role_name(const qdr_tcp_connection_t *tc) { assert(tc); - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch main updated: DISPATCH-2075: Bump required proton to 0.34
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/main by this push: new 5955ba6 DISPATCH-2075: Bump required proton to 0.34 5955ba6 is described below commit 5955ba6210e726a76fc98ab338ba13a10048ae43 Author: Chuck Rolke AuthorDate: Fri Apr 23 16:50:19 2021 -0400 DISPATCH-2075: Bump required proton to 0.34 --- CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index d8d4d0a..a05369f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -106,7 +106,7 @@ if (NOT DEFINED Python_FIND_FRAMEWORK) endif () find_package(Python REQUIRED COMPONENTS Interpreter Development) find_package(Threads REQUIRED) -find_package(Proton 0.33.0 REQUIRED COMPONENTS Core Proactor) +find_package(Proton 0.34.0 REQUIRED COMPONENTS Core Proactor) message(STATUS "Found Proton: ${Proton_LIBRARIES} (found version \"${Proton_VERSION}\")" ) ## ## Optional dependencies - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch main updated: DISPATCH-1878: fix log format specifier
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/main by this push: new e354322 DISPATCH-1878: fix log format specifier e354322 is described below commit e354322bcf5081bac2b5c48b1a893f8fdd0b126a Author: Chuck Rolke AuthorDate: Fri Apr 23 14:10:45 2021 -0400 DISPATCH-1878: fix log format specifier --- src/adaptors/tcp_adaptor.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c index 9fcf267..f6cbe7f 100644 --- a/src/adaptors/tcp_adaptor.c +++ b/src/adaptors/tcp_adaptor.c @@ -338,7 +338,7 @@ static int handle_incoming(qdr_tcp_connection_t *conn, const char *msg) conn->instream = qdr_link_deliver(conn->incoming, msg, 0, false, 0, 0, 0, 0); qd_log(log, QD_LOG_DEBUG, - "[C%"PRIu64"][L%"PRIu64"][D%"PRIu64"] Initiating ingress stream message with 0 bytes", + "[C%"PRIu64"][L%"PRIu64"][D%"PRIu32"] Initiating ingress stream message with 0 bytes", conn->conn_id, conn->incoming_id, conn->instream->delivery_id); conn->incoming_started = true; - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch main updated: DISPATCH-1878: Handle half-closed TCP connections without losing data
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/main by this push: new d288c21 DISPATCH-1878: Handle half-closed TCP connections without losing data d288c21 is described below commit d288c21fe5ecc7a2d78eaf1f38e973fce2356e45 Author: Chuck Rolke AuthorDate: Fri Apr 23 09:27:15 2021 -0400 DISPATCH-1878: Handle half-closed TCP connections without losing data Restructure the code several ways to handle closed reads and closed writes without necessarily closing the whole connection and losing data in flight. This is a squashed and rebased commit of work during development and tracked under PR#1129. This closes #1129 --- src/adaptors/tcp_adaptor.c | 539 ++--- 1 file changed, 365 insertions(+), 174 deletions(-) diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c index eb10f8a..9fcf267 100644 --- a/src/adaptors/tcp_adaptor.c +++ b/src/adaptors/tcp_adaptor.c @@ -57,11 +57,14 @@ struct qdr_tcp_connection_t { qdr_delivery_t *outstream; bool ingress; bool flow_enabled; +bool incoming_started; bool egress_dispatcher; bool connector_closed;//only used if egress_dispatcher=true bool in_list; // This connection is in the adaptor's connections list -bool raw_closed_read; -bool raw_closed_write; +bool raw_closed_read; // proton event seen +bool raw_closed_write; // proton event seen or write_close called +bool raw_read_shutdown; // stream closed +bool read_eos_seen; qdr_delivery_t *initial_delivery; qd_timer_t *activate_timer; qd_bridge_config_tconfig; @@ -116,6 +119,26 @@ static inline uint64_t qdr_tcp_conn_linkid(const qdr_tcp_connection_t *conn) return conn->instream ? conn->incoming_id : conn->outgoing_id; } +static inline const char * qdr_link_direction_name(const qdr_link_t *link) +{ +assert(link); +return qdr_link_direction(link) == QD_OUTGOING ? "outgoing" : "incoming"; +} + +static inline const char * qdr_tcp_connection_role_name(const qdr_tcp_connection_t *tc) +{ +assert(tc); +return tc->ingress ? "listener" : "connector"; +} + +static const char * qdr_tcp_quadrant_id(const qdr_tcp_connection_t *tc, const qdr_link_t *link) +{ +if (tc->ingress) +return link->link_direction == QD_INCOMING ? "(listener incoming)" : "(listener outgoing)"; +else +return link->link_direction == QD_INCOMING ? "(connector incoming)" : "(connector outgoing)"; +} + static void on_activate(void *context) { qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) context; @@ -136,22 +159,22 @@ static void grant_read_buffers(qdr_tcp_connection_t *conn) pn_raw_buffer_t raw_buffers[READ_BUFFERS]; // Give proactor more read buffers for the socket -if (!pn_raw_connection_is_read_closed(conn->pn_raw_conn)) { -size_t desired = pn_raw_connection_read_buffers_capacity(conn->pn_raw_conn); -qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Granted %zu read buffers", conn->conn_id, desired); -while (desired) { -size_t i; -for (i = 0; i < desired && i < READ_BUFFERS; ++i) { -qd_buffer_t *buf = qd_buffer(); -raw_buffers[i].bytes = (char*) qd_buffer_base(buf); -raw_buffers[i].capacity = qd_buffer_capacity(buf); -raw_buffers[i].size = 0; -raw_buffers[i].offset = 0; -raw_buffers[i].context = (uintptr_t) buf; -} -desired -= i; -pn_raw_connection_give_read_buffers(conn->pn_raw_conn, raw_buffers, i); +size_t desired = pn_raw_connection_read_buffers_capacity(conn->pn_raw_conn); +qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, +"[C%"PRIu64"][L%"PRIu64"] Granting %zu to pn_raw_connection_give_read_buffers()", +conn->conn_id, conn->incoming_id, desired); +while (desired) { +size_t i; +for (i = 0; i < desired && i < READ_BUFFERS; ++i) { +qd_buffer_t *buf = qd_buffer(); +raw_buffers[i].bytes = (char*) qd_buffer_base(buf); +raw_buffers[i].capacity = qd_buffer_capacity(buf); +raw_buffers[i].size = 0; +raw_buffers[i].offset = 0; +raw_buffers[i].context = (uintptr_t) buf; } +
[qpid-dispatch] branch main updated: DISPATCH-2038: python default args must not be mutable
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/main by this push: new 440acaa DISPATCH-2038: python default args must not be mutable 440acaa is described below commit 440acaad5ff5af1938d2762325ff3566f8b662d4 Author: Chuck Rolke AuthorDate: Tue Apr 6 12:09:01 2021 -0400 DISPATCH-2038: python default args must not be mutable --- tests/system_test.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/system_test.py b/tests/system_test.py index bc317a8..96b188c 100755 --- a/tests/system_test.py +++ b/tests/system_test.py @@ -943,7 +943,9 @@ class AsyncTestReceiver(MessagingHandler): Empty = Queue.Empty def __init__(self, address, source, conn_args=None, container_id=None, - wait=True, recover_link=False, msg_args={}): + wait=True, recover_link=False, msg_args=None): +if msg_args is None: +msg_args = {} super(AsyncTestReceiver, self).__init__(**msg_args) self.address = address self.source = source - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch main updated: DISPATCH-1985: Update Logger class in system_test
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/main by this push: new 46a580e DISPATCH-1985: Update Logger class in system_test 46a580e is described below commit 46a580e7c9c309ee18dbd124ea8996d339138aa8 Author: Chuck Rolke AuthorDate: Tue Apr 6 10:23:33 2021 -0400 DISPATCH-1985: Update Logger class in system_test Move new Logger out of tcp adaptor test and into system_test. --- tests/system_test.py | 16 +--- tests/system_tests_tcp_adaptor.py | 55 ++- 2 files changed, 21 insertions(+), 50 deletions(-) diff --git a/tests/system_test.py b/tests/system_test.py index 35d3860..bc317a8 100755 --- a/tests/system_test.py +++ b/tests/system_test.py @@ -1387,13 +1387,19 @@ class Logger(object): """ Record an event log for a self test. May print per-event or save events to be printed later. +Optional file opened in 'append' mode to which each log line is written. """ -def __init__(self, title="Logger", print_to_console=False, save_for_dump=True): +def __init__(self, + title="Logger", + print_to_console=False, + save_for_dump=True, + ofilename=None): self.title = title self.print_to_console = print_to_console self.save_for_dump = save_for_dump self.logs = [] +self.ofilename = ofilename def log(self, msg): ts = Timestamp() @@ -1402,14 +1408,18 @@ class Logger(object): if self.print_to_console: print("%s %s" % (ts, msg)) sys.stdout.flush() +if self.ofilename is not None: +with open(self.ofilename, 'a') as f_out: +f_out.write("%s %s\n" % (ts, msg)) +f_out.flush() def dump(self): print(self) sys.stdout.flush() +@property def __str__(self): -lines = [] -lines.append(self.title) +lines = [self.title] for ts, msg in self.logs: lines.append("%s %s" % (ts, msg)) res = str('\n'.join(lines)) diff --git a/tests/system_tests_tcp_adaptor.py b/tests/system_tests_tcp_adaptor.py index 3fc27dd..277cbbe 100644 --- a/tests/system_tests_tcp_adaptor.py +++ b/tests/system_tests_tcp_adaptor.py @@ -28,11 +28,15 @@ import sys import time import traceback -from system_test import TestCase, Qdrouterd, main_module, TIMEOUT -from system_test import Timestamp -from system_test import unittest -from system_test import SkipIfNeeded +from system_test import Logger +from system_test import main_module from system_test import Process +from system_test import Qdrouterd +from system_test import SkipIfNeeded +from system_test import TestCase +from system_test import TIMEOUT +from system_test import unittest + from subprocess import PIPE # Tests in this file are organized by classes that inherit TestCase. @@ -85,49 +89,6 @@ Q2_DELAY_SECONDS = 1.0 Q2_TEST_MESSAGE_SIZE = 1000 -class Logger(): -""" -Record event logs as existing Logger. Also add: -* ofile - optional file opened in 'append' mode to which each log line is written -TODO: Replace system_test Logger with this after merging dev-protocol-adaptors branch -""" - -def __init__(self, - title="Logger", - print_to_console=False, - save_for_dump=True, - ofilename=None): -self.title = title -self.print_to_console = print_to_console -self.save_for_dump = save_for_dump -self.logs = [] -self.ofilename = ofilename - -def log(self, msg): -ts = Timestamp() -if self.save_for_dump: -self.logs.append((ts, msg)) -if self.print_to_console: -print("%s %s" % (ts, msg)) -sys.stdout.flush() -if self.ofilename is not None: -with open(self.ofilename, 'a') as f_out: -f_out.write("%s %s\n" % (ts, msg)) -f_out.flush() - -def dump(self): -print(self) -sys.stdout.flush() - -@property -def __str__(self): -lines = [self.title] -for ts, msg in self.logs: -lines.append("%s %s" % (ts, msg)) -res = str('\n'.join(lines)) -return res - - class TcpAdaptor(TestCase): """ 6 edge routers connected via 3 interior routers. - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch master updated: DISPATCH-2003: Memory usage as uint32 underreports larger sizes
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/master by this push: new 380a53c DISPATCH-2003: Memory usage as uint32 underreports larger sizes 380a53c is described below commit 380a53c0084a6164912290ec9fb5c675cbf28e52 Author: Chuck Rolke AuthorDate: Tue Mar 16 17:03:17 2021 -0400 DISPATCH-2003: Memory usage as uint32 underreports larger sizes --- src/router_core/agent_router.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/router_core/agent_router.c b/src/router_core/agent_router.c index 33d544a..7d83eac 100644 --- a/src/router_core/agent_router.c +++ b/src/router_core/agent_router.c @@ -246,7 +246,7 @@ static void qdr_agent_write_column_CT(qd_composed_field_t *body, int col, qdr_co break; case QDR_ROUTER_MEMORY_USAGE: { -uint32_t size = qd_router_memory_usage(); +uint64_t size = qd_router_memory_usage(); if (size) qd_compose_insert_ulong(body, size); else // memory usage not available - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch master updated: DISPATCH-1981: TCP self test to force Q2 flow control
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/master by this push: new 4439fea DISPATCH-1981: TCP self test to force Q2 flow control 4439fea is described below commit 4439fea1fbcfcffb2a2d6657bcd13e4d6ec96bad Author: Chuck Rolke AuthorDate: Mon Mar 8 11:29:10 2021 -0500 DISPATCH-1981: TCP self test to force Q2 flow control The echo server is rewritten to generate Q2 holdoff more reliably. The self test uses the connection-stall feature of the the echo server. The server does not read the message content, beyond normal TCP window and python prefetch, so that the router gets into a Q2 block state. This closes #1060 --- tests/TCP_echo_server.py | 56 -- tests/system_tests_tcp_adaptor.py | 227 -- 2 files changed, 263 insertions(+), 20 deletions(-) diff --git a/tests/TCP_echo_server.py b/tests/TCP_echo_server.py index 9e1a286..678132c 100755 --- a/tests/TCP_echo_server.py +++ b/tests/TCP_echo_server.py @@ -46,7 +46,6 @@ class ClientRecord(object): the inbound and outbound data list buffers for this socket's payload. """ - def __init__(self, address): self.addr = address self.inb = b'' @@ -87,7 +86,8 @@ def split_chunk_for_display(raw_bytes): class TcpEchoServer: -def __init__(self, prefix="ECHO_SERVER", port="0", echo_count=0, timeout=0.0, logger=None): +def __init__(self, prefix="ECHO_SERVER", port="0", echo_count=0, timeout=0.0, logger=None, + conn_stall=0.0, close_on_conn=False, close_on_data=False): """ Start echo server in separate thread @@ -104,6 +104,9 @@ class TcpEchoServer: self.echo_count = echo_count self.timeout = timeout self.logger = logger +self.conn_stall = conn_stall +self.close_on_conn = close_on_conn +self.close_on_data = close_on_data self.keep_running = True self.HOST = '127.0.0.1' self.is_running = False @@ -115,7 +118,11 @@ class TcpEchoServer: def run(self): """ -Run server in daemon thread +Run server in daemon thread. +A single thread runs multiple sockets through selectors. +Note that timeouts and such are done in line and processing stops for +all sockets when one socket is timing out. For the intended one-at-a-time +test cases this works but it is not general solution for all cases. :return: """ try: @@ -160,11 +167,12 @@ class TcpEchoServer: for key, mask in events: if key.data is None: if key.fileobj is self.sock: -self.do_accept(key.fileobj, sel, self.logger) +self.do_accept(key.fileobj, sel, self.logger, self.conn_stall, self.close_on_conn) else: pass # Only listener 'sock' has None in opaque data field else: -total_echoed += self.do_service(key, mask, sel, self.logger) +n_echoed = self.do_service(key, mask, sel, self.logger, self.close_on_data) +total_echoed += n_echoed if n_echoed > 0 else 0 else: pass # select timeout. probably. @@ -176,14 +184,22 @@ class TcpEchoServer: self.is_running = False -def do_accept(self, sock, sel, logger): +def do_accept(self, sock, sel, logger, conn_stall, close_on_conn): conn, addr = sock.accept() logger.log('%s Accepted connection from %s:%d' % (self.prefix, addr[0], addr[1])) +if conn_stall > 0.0: +logger.log('%s Connection from %s:%d stall start' % (self.prefix, addr[0], addr[1])) +time.sleep(conn_stall) +logger.log('%s Connection from %s:%d stall end' % (self.prefix, addr[0], addr[1])) +if close_on_conn: +logger.log('%s Connection from %s:%d closing due to close_on_conn' % (self.prefix, addr[0], addr[1])) +conn.close() +return conn.setblocking(False) events = selectors.EVENT_READ | selectors.EVENT_WRITE sel.register(conn, events, data=ClientRecord(addr)) -def do_service(self, key, mask, sel, logger): +def do_service(self, key, mask, sel, logger, close_on_data): retval = 0 sock = key.fileobj data = key.data @@ -205,6 +221,11 @@ class TcpEchoServer: return 1
[qpid-dispatch] branch master updated: DISPATCH-1981: Delete TCP Q2 holdoff check; q2 holdoff be triggered reliably
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/master by this push: new d5809df DISPATCH-1981: Delete TCP Q2 holdoff check; q2 holdoff be triggered reliably d5809df is described below commit d5809df5b5dfc49027685744e7b1405eb80baf70 Author: Chuck Rolke AuthorDate: Wed Feb 24 17:17:58 2021 -0500 DISPATCH-1981: Delete TCP Q2 holdoff check; q2 holdoff be triggered reliably --- tests/system_tests_tcp_adaptor.py | 14 -- 1 file changed, 14 deletions(-) diff --git a/tests/system_tests_tcp_adaptor.py b/tests/system_tests_tcp_adaptor.py index faa115a..7ffd1a3 100644 --- a/tests/system_tests_tcp_adaptor.py +++ b/tests/system_tests_tcp_adaptor.py @@ -680,20 +680,6 @@ class TcpAdaptor(TestCase): sys.stdout.flush() assert result is None, "TCP_TEST Stop %s FAIL: %s" % (name, result) -# search the router log file to verify Q2 was hit -time.sleep(0.005) # wait for log file to be written -block_ct = 0 -unblock_ct = 0 -with io.open(self.INTA.logfile_path) as f: -for line in f: -if 'client link blocked on Q2 limit' in line: -block_ct += 1 -if 'client link unblocked from Q2 limit' in line: -unblock_ct += 1 -self.assertTrue(block_ct > 0) -self.assertEqual(block_ct, unblock_ct) -self.logger.log("TCP_TEST Stop %s SUCCESS" % name) - @SkipIfNeeded(DISABLE_SELECTOR_TESTS, DISABLE_SELECTOR_REASON) def test_20_tcp_connect_disconnect(self): name = "test_20_tcp_connect_disconnect" - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch master updated: DISPATCH-1947: Add self test to verify that Q2 limit is in effect
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/master by this push: new ffc4c9c DISPATCH-1947: Add self test to verify that Q2 limit is in effect ffc4c9c is described below commit ffc4c9cc6d1871cf517754b5f060988229b6cec9 Author: Chuck Rolke AuthorDate: Tue Feb 23 17:09:09 2021 -0500 DISPATCH-1947: Add self test to verify that Q2 limit is in effect --- tests/system_tests_tcp_adaptor.py | 16 1 file changed, 16 insertions(+) diff --git a/tests/system_tests_tcp_adaptor.py b/tests/system_tests_tcp_adaptor.py index 09a50ec..bbe02dc 100644 --- a/tests/system_tests_tcp_adaptor.py +++ b/tests/system_tests_tcp_adaptor.py @@ -22,6 +22,7 @@ from __future__ import division from __future__ import absolute_import from __future__ import print_function +import io import os import sys import time @@ -678,6 +679,21 @@ class TcpAdaptor(TestCase): print(result) sys.stdout.flush() assert result is None, "TCP_TEST Stop %s FAIL: %s" % (name, result) + +# search the router log file to verify Q2 was hit +time.sleep(0.005) # wait for log file to be written +block_ct = 0 +unblock_ct = 0 +with io.open(self.INTA.logfile_path) as f: +for line in f: +if 'client link blocked on Q2 limit' in line: +block_ct += 1 +if 'client link unblocked from Q2 limit' in line: +unblock_ct += 1 +self.assertTrue(block_ct > 0) +self.assertEqual(block_ct, unblock_ct) + + self.logger.log("TCP_TEST Stop %s SUCCESS" % name) @SkipIfNeeded(DISABLE_SELECTOR_TESTS, DISABLE_SELECTOR_REASON) - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch master updated: DISPATCH-1947: TCP Adaptor flow control
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/master by this push: new c6b2055 DISPATCH-1947: TCP Adaptor flow control c6b2055 is described below commit c6b205574ed5925cde40d8f126902d28ff5a32e2 Author: Chuck Rolke AuthorDate: Tue Feb 23 16:48:05 2021 -0500 DISPATCH-1947: TCP Adaptor flow control This closes #1056 --- src/adaptors/tcp_adaptor.c | 88 -- 1 file changed, 78 insertions(+), 10 deletions(-) diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c index 69bed1a..0b83123 100644 --- a/src/adaptors/tcp_adaptor.c +++ b/src/adaptors/tcp_adaptor.c @@ -17,6 +17,7 @@ * under the License. */ +#include "tcp_adaptor.h" #include #include #include @@ -26,7 +27,6 @@ #include "qpid/dispatch/ctools.h" #include "qpid/dispatch/protocol_adaptor.h" #include "delivery.h" -#include "tcp_adaptor.h" #include #include @@ -78,6 +78,9 @@ struct qdr_tcp_connection_t { int outgoing_buff_count; // number of buffers with data int outgoing_buff_idx;// first buffer with data +sys_atomic_tq2_restart; // signal to resume receive +boolq2_blocked; // stop reading from raw conn + DEQ_LINKS(qdr_tcp_connection_t); }; @@ -148,7 +151,37 @@ static void grant_read_buffers(qdr_tcp_connection_t *conn) } } -static int handle_incoming(qdr_tcp_connection_t *conn) + +// Per-message callback to resume receiving after Q2 is unblocked on the +// incoming link. +// This routine must be thread safe: the thread on which it is running +// is not an IO thread that owns the underlying pn_raw_conn. +// +void qdr_tcp_q2_unblocked_handler(const qd_alloc_safe_ptr_t context) +{ +qdr_tcp_connection_t *tc = (qdr_tcp_connection_t*)qd_alloc_deref_safe_ptr(&context); +if (tc == 0) { +// bad news. +assert(false); +return; +} + +// prevent the tc from being deleted while running: +sys_mutex_lock(tc->activation_lock); + +if (tc && tc->pn_raw_conn) { +sys_atomic_set(&tc->q2_restart, 1); +pn_raw_connection_wake(tc->pn_raw_conn); +} + +sys_mutex_unlock(tc->activation_lock); +} + + +// Fetch incoming raw incoming buffers from proton and pass them to +// existing delivery or create a new delivery. +// If close is pending then do not give more buffers to proton. +static int handle_incoming_impl(qdr_tcp_connection_t *conn, bool close_pending) { // // Don't initiate an ingress stream message if we don't yet have a reply-to address and credit. @@ -163,6 +196,16 @@ static int handle_incoming(qdr_tcp_connection_t *conn) return 0; } +// +// Don't read from proton if in Q2 holdoff +// +if (conn->q2_blocked) { +qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] handle_incoming q2_blocked", conn->conn_id); +return 0; +} + +// Read all buffers available from proton. +// Collect buffers for ingress; free empty buffers. qd_buffer_list_t buffers; DEQ_INIT(buffers); pn_raw_buffer_t raw_buffers[READ_BUFFERS]; @@ -182,14 +225,20 @@ static int handle_incoming(qdr_tcp_connection_t *conn) } } } - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Took %zu read buffers", conn->conn_id, DEQ_SIZE(buffers)); qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Freed %i read buffers", conn->conn_id, free_count); -grant_read_buffers(conn); + +// Only grant more buffers to proton for reading if close is not pending +if (!close_pending) { +grant_read_buffers(conn); +} if (conn->instream) { -// @TODO(kgiusti): handle Q2 block event: -qd_message_stream_data_append(qdr_delivery_message(conn->instream), &buffers, 0); +qd_message_stream_data_append(qdr_delivery_message(conn->instream), &buffers, &conn->q2_blocked); +if (conn->q2_blocked) { +// note: unit tests grep for this log! +qd_log(tcp_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"] client link blocked on Q2 limit", conn->conn_id); +} qdr_delivery_continue(tcp_adaptor->core, conn->instream, false); qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Continuing message with %i bytes", conn->conn_id, conn->incoming_id, count); } else { @@ -230,6 +279,10 @@ static int handle_incoming(qdr_tcp_connection_t *conn)
[qpid-dispatch] branch master updated: DISPATCH-1968: Avoid proton calls on a closed raw connections
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/master by this push: new 5d27cfa DISPATCH-1968: Avoid proton calls on a closed raw connections 5d27cfa is described below commit 5d27cfadde1c86745af554c50d02c03b70e288b7 Author: Chuck Rolke AuthorDate: Mon Feb 22 11:24:15 2021 -0500 DISPATCH-1968: Avoid proton calls on a closed raw connections * Do not write new buffers if connection is CLOSED_WRITE * Do not call connection_wake if CLOSED_READ or CLOSED_WRITE This closes #1047 --- src/adaptors/tcp_adaptor.c | 45 + 1 file changed, 41 insertions(+), 4 deletions(-) diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c index 6cc8855..69bed1a 100644 --- a/src/adaptors/tcp_adaptor.c +++ b/src/adaptors/tcp_adaptor.c @@ -56,6 +56,8 @@ struct qdr_tcp_connection_t { bool egress_dispatcher; bool connector_closed;//only used if egress_dispatcher=true bool in_list; // This connection is in the adaptor's connections list +bool raw_closed_read; +bool raw_closed_write; qdr_delivery_t *initial_delivery; qd_timer_t *activate_timer; qd_bridge_config_tconfig; @@ -122,6 +124,9 @@ static void on_activate(void *context) static void grant_read_buffers(qdr_tcp_connection_t *conn) { +if (conn->raw_closed_read) +return; + pn_raw_buffer_t raw_buffers[READ_BUFFERS]; // Give proactor more read buffers for the socket if (!pn_raw_connection_is_read_closed(conn->pn_raw_conn)) { @@ -231,6 +236,31 @@ static int handle_incoming(qdr_tcp_connection_t *conn) return count; } + +static void flush_outgoing_buffs(qdr_tcp_connection_t *conn) +{ +// Flush buffers staged for writing to raw conn +// and free possible references to stream data objects. +if (conn->outgoing_buff_count > 0) { +for (size_t i = conn->outgoing_buff_idx; +i < conn->outgoing_buff_idx + conn->outgoing_buff_count; +++i) { +if (conn->outgoing_buffs[i].context) { +qd_message_stream_data_release( + (qd_message_stream_data_t*)conn->outgoing_buffs[i].context); +} +} +} +conn->outgoing_buff_count = 0; + +// Flush in-progress stream data object +if (conn->outgoing_stream_data) { +free_qd_message_stream_data_t(conn->outgoing_stream_data); +conn->outgoing_stream_data = 0; +} +} + + static void free_qdr_tcp_connection(qdr_tcp_connection_t* tc) { qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Freeing tcp_connection %p", tc->conn_id, (void*) tc); @@ -240,9 +270,7 @@ static void free_qdr_tcp_connection(qdr_tcp_connection_t* tc) if (tc->activate_timer) { qd_timer_free(tc->activate_timer); } -if (tc->outgoing_stream_data) { -free_qd_message_stream_data_t(tc->outgoing_stream_data); -} +flush_outgoing_buffs(tc); sys_mutex_free(tc->activation_lock); //proactor will free the socket free_qdr_tcp_connection_t(tc); @@ -348,6 +376,7 @@ static int read_message_body(qdr_tcp_connection_t *conn, qd_message_t *msg, pn_r return used; } + static bool write_outgoing_buffs(qdr_tcp_connection_t *conn) { // Send the outgoing buffs to pn_raw_conn. @@ -383,6 +412,12 @@ static bool write_outgoing_buffs(qdr_tcp_connection_t *conn) static void handle_outgoing(qdr_tcp_connection_t *conn) { if (conn->outstream) { +if (conn->raw_closed_write) { +// flush outgoing buffers and free attached stream_data objects +flush_outgoing_buffs(conn); +// give no more buffers to raw connection +return; +} qd_message_t *msg = qdr_delivery_message(conn->outstream); bool read_more_body = true; @@ -534,11 +569,13 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void } case PN_RAW_CONNECTION_CLOSED_READ: { qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] PN_RAW_CONNECTION_CLOSED_READ", conn->conn_id); +conn->raw_closed_read = true; pn_raw_connection_close(conn->pn_raw_conn); break; } case PN_RAW_CONNECTION_CLOSED_WRITE: { qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] PN_RAW_CONNECTION_CLOSED_WRITE", conn->conn_id); +conn->raw_closed_write = true; pn_raw_connection_close(conn->pn_raw_conn); break; } @@ -1186,7 +1223,7 @@ static void qdr_tcp_activate(void *notused, qdr_connection_t *c)
[qpid-dispatch] branch master updated: DISPATCH-1964: TCP adaptor connection object should be a pooled type
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/master by this push: new 9594d55 DISPATCH-1964: TCP adaptor connection object should be a pooled type 9594d55 is described below commit 9594d55d3e5bdcd2ef1ff2d7f33c45358ec8832a Author: Chuck Rolke AuthorDate: Mon Feb 15 11:38:37 2021 -0500 DISPATCH-1964: TCP adaptor connection object should be a pooled type --- src/adaptors/tcp_adaptor.c | 9 ++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c index 51d9316..fce2219 100644 --- a/src/adaptors/tcp_adaptor.c +++ b/src/adaptors/tcp_adaptor.c @@ -22,6 +22,7 @@ #include #include #include +#include "qpid/dispatch/alloc_pool.h" #include "qpid/dispatch/ctools.h" #include "qpid/dispatch/protocol_adaptor.h" #include "delivery.h" @@ -79,6 +80,8 @@ struct qdr_tcp_connection_t { }; DEQ_DECLARE(qdr_tcp_connection_t, qdr_tcp_connection_list_t); +ALLOC_DECLARE(qdr_tcp_connection_t); +ALLOC_DEFINE(qdr_tcp_connection_t); typedef struct qdr_tcp_adaptor_t { qdr_core_t *core; @@ -241,7 +244,7 @@ static void free_qdr_tcp_connection(qdr_tcp_connection_t* tc) } sys_mutex_free(tc->activation_lock); //proactor will free the socket -free(tc); +free_qdr_tcp_connection_t(tc); } static void handle_disconnected(qdr_tcp_connection_t* conn) @@ -597,7 +600,7 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void static qdr_tcp_connection_t *qdr_tcp_connection_ingress(qd_tcp_listener_t* listener) { -qdr_tcp_connection_t* tc = NEW(qdr_tcp_connection_t); +qdr_tcp_connection_t* tc = new_qdr_tcp_connection_t(); ZERO(tc); tc->activation_lock = sys_mutex(); tc->ingress = true; @@ -682,7 +685,7 @@ static void qdr_tcp_open_server_side_connection(qdr_tcp_connection_t* tc) static qdr_tcp_connection_t *qdr_tcp_connection_egress(qd_bridge_config_t *config, qd_server_t *server, qdr_delivery_t *initial_delivery) { -qdr_tcp_connection_t* tc = NEW(qdr_tcp_connection_t); +qdr_tcp_connection_t* tc = new_qdr_tcp_connection_t(); ZERO(tc); tc->activation_lock = sys_mutex(); if (initial_delivery) { - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch master updated: DISPATCH-1955: TCP adaptor adds byte totals and other stats to logs
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/master by this push: new c5785d9 DISPATCH-1955: TCP adaptor adds byte totals and other stats to logs c5785d9 is described below commit c5785d9f557f340ad71440e30ef7a3ffc9a7e058 Author: Chuck Rolke AuthorDate: Mon Feb 15 11:19:45 2021 -0500 DISPATCH-1955: TCP adaptor adds byte totals and other stats to logs 1. Add byte total to logs where incremental values are shown. 2. Add management stat summary when connection is closed/deleted. This closes #1026 --- src/adaptors/tcp_adaptor.c | 12 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c index a38167f..51d9316 100644 --- a/src/adaptors/tcp_adaptor.c +++ b/src/adaptors/tcp_adaptor.c @@ -567,7 +567,7 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void int read = handle_incoming(conn); conn->last_in_time = tcp_adaptor->core->uptime_ticks; conn->bytes_in += read; -qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] PN_RAW_CONNECTION_READ Read %i bytes", conn->conn_id, read); +qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] PN_RAW_CONNECTION_READ Read %i bytes. Total read %"PRIu64" bytes", conn->conn_id, read, conn->bytes_in); while (qdr_connection_process(conn->qdr_conn)) {} break; } @@ -583,9 +583,9 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void } } } -qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] PN_RAW_CONNECTION_WRITTEN Wrote %zu bytes", conn->conn_id, written); conn->last_out_time = tcp_adaptor->core->uptime_ticks; conn->bytes_out += written; +qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] PN_RAW_CONNECTION_WRITTEN Wrote %zu bytes. Total written %"PRIu64" bytes", conn->conn_id, written, conn->bytes_out); while (qdr_connection_process(conn->qdr_conn)) {} break; } @@ -1527,8 +1527,12 @@ static void qdr_del_tcp_connection_CT(qdr_core_t *core, qdr_action_t *action, bo qdr_tcp_connection_t *conn = (qdr_tcp_connection_t*) action->args.general.context_1; if (conn->in_list) { DEQ_REMOVE(tcp_adaptor->connections, conn); -qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] qdr_del_tcp_connection_CT %s (%zu)", - conn->conn_id, conn->config.host_port, DEQ_SIZE(tcp_adaptor->connections)); +qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, + "[C%"PRIu64"] qdr_del_tcp_connection_CT %s deleted. bytes_in=%"PRIu64", bytes_out=%"PRId64", " + "opened_time=%"PRId64", last_in_time=%"PRId64", last_out_time=%"PRId64". Connections remaining %zu", + conn->conn_id, conn->config.host_port, + conn->bytes_in, conn->bytes_out, conn->opened_time, conn->last_in_time, conn->last_out_time, + DEQ_SIZE(tcp_adaptor->connections)); } free_qdr_tcp_connection(conn); } - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch master updated: DISPATCH-1946: Ensure that router schema is presented to json as a string
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/master by this push: new e78a18a DISPATCH-1946: Ensure that router schema is presented to json as a string e78a18a is described below commit e78a18ae77451afc5dda064467f0695a3742fd41 Author: Chuck Rolke AuthorDate: Mon Feb 8 09:35:41 2021 -0500 DISPATCH-1946: Ensure that router schema is presented to json as a string This closes #1013 --- python/qpid_dispatch_internal/management/qdrouter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/qpid_dispatch_internal/management/qdrouter.py b/python/qpid_dispatch_internal/management/qdrouter.py index 09fefb3..610e6f4 100644 --- a/python/qpid_dispatch_internal/management/qdrouter.py +++ b/python/qpid_dispatch_internal/management/qdrouter.py @@ -40,7 +40,7 @@ class QdSchema(schema.Schema): def __init__(self): """Load schema.""" -qd_schema = get_data('qpid_dispatch.management', 'qdrouter.json') +qd_schema = get_data('qpid_dispatch.management', 'qdrouter.json').decode('utf8') try: super(QdSchema, self).__init__(**json.loads(qd_schema, **JSON_LOAD_KWARGS)) except Exception as e: - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch master updated: DISPATCH-1854: Add delivery id and print in log prefix
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/master by this push: new 54dfd04 DISPATCH-1854: Add delivery id and print in log prefix 54dfd04 is described below commit 54dfd04cb854fa2bf6f4e828660160051138c070 Author: Chuck Rolke AuthorDate: Mon Jan 4 12:33:40 2021 -0500 DISPATCH-1854: Add delivery id and print in log prefix Delivery Id is similar to connection id and link id. It will be printed with log statements as a common prefix allowing for easier delivery tracking. Implementation notes: 1. Connection/Link/Delivery Id caching This patch saves the connection id in links and the connection and link ids in the delivery. The these copies are made once at object creation and used as necessary. The cached copies eliminate hunting for the values at log-statement time. The caching is slightly complicated by initial delivery handoff in the adaptors. That handoff is logged as the delivery's connection and link ids are rewritten. For instance: TCP_ADAPTOR (debug) [C1][L1][D121] initial_delivery ownership passed to [C22][L68][D121] Here a TCP_ADAPTOR egress dispatcher on [C1][L1] is passing delivery [D121] to tcp [C22][L68]. The cache strategy may print connection and link ids after the connection or link has disappeared. That's usually not a problem and the strategy eliminates the defensive code required to test if the connection or link still exists. 2. Delivery ids replace printing the address of the delivery Delivery addresses get reused a lot and grepping for them is hard. 3. Common macros to print the connection, link, and delivery ids from a delivery DLV_FMT - the format string defining conn-link-delivery log prefix DLV_ARGS - accessor to get log prefix values from delivery DLV_FMT is a quoted string similar to PRIu64 and PRIu32 and as such must be used outside of quotation marks in the source. DLV_ARGS(dlv) takes an argument which is a pointer to a qd_delivery_t object. A typical usage changes code like this: -qd_log(core->log, QD_LOG_DEBUG, "Delivery decref_CT: dlv:%lx rc:%"PRIu32" link:%"PRIu64" %s", - (long) dlv, ref_count - 1, link_identity, label); +qd_log(core->log, QD_LOG_DEBUG, DLV_FMT" Delivery decref_CT: rc:%"PRIu32" %s", + DLV_ARGS(dlv), ref_count - 1, label); This closes #962 --- src/adaptors/tcp_adaptor.c| 16 +++--- src/dispatch.c| 2 ++ src/router_core/connections.c | 2 ++ src/router_core/core_link_endpoint.c | 5 +++ src/router_core/delivery.c| 57 ++- src/router_core/delivery.h| 11 +++ src/router_core/forwarder.c | 5 +++ src/router_core/router_core_private.h | 1 + src/router_core/transfer.c| 24 +++ src/router_node.c | 2 ++ 10 files changed, 87 insertions(+), 38 deletions(-) diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c index d813e42..3b886b9 100644 --- a/src/adaptors/tcp_adaptor.c +++ b/src/adaptors/tcp_adaptor.c @@ -661,6 +661,8 @@ static void qdr_tcp_open_server_side_connection(qdr_tcp_connection_t* tc) qdr_terminus_t *source = qdr_terminus(0); qdr_terminus_set_address(source, tc->config.address); +// This attach passes the ownership of the delivery from the core-side connection and link +// to the adaptor-side outgoing connection and link. tc->outgoing = qdr_link_first_attach(conn, QD_OUTGOING, source, //qdr_terminus_t *source, @@ -671,6 +673,10 @@ static void qdr_tcp_open_server_side_connection(qdr_tcp_connection_t* tc) tc->initial_delivery, &(tc->outgoing_id)); if (!!tc->initial_delivery) { +qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, DLV_FMT" initial_delivery ownership passed to "DLV_FMT, + DLV_ARGS(tc->initial_delivery), tc->outgoing->conn_id, tc->outgoing->identity, tc->initial_delivery->delivery_id); +tc->initial_delivery->conn_id = tc->outgoing->conn_id; +tc->initial_delivery->link_id = tc->outgoing->identity; qdr_delivery_decref(tcp_adaptor->core, tc->initial_delivery, "tcp-adaptor - passing initial_delivery into new link"); tc->initial_delivery = 0; } @@ -1065,9 +1071
[qpid-dispatch] branch master updated: DISPATCH-1901: Allow any proton:io reason failing connection to bad host name
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/master by this push: new 27ae419 DISPATCH-1901: Allow any proton:io reason failing connection to bad host name 27ae419 is described below commit 27ae419b514ca962a37fa1cfedc7a67ae67447b3 Author: Chuck Rolke AuthorDate: Mon Jan 4 10:47:28 2021 -0500 DISPATCH-1901: Allow any proton:io reason failing connection to bad host name The system_tests_bad_configuration test sets up a connector to a host that is never reachable. The test expects the connection to fail. On Linux proton normally returns 'Name or service not known'. On macOS there is some other specific reason. Another class of errors shows up when DNS is unavailable. On Linux the reason is 'Temporary failure in name resolution'. This patch stops looking for specific text from the proton:io error messages. The test declares success when a qpid-dispatch log message indicates a connection failure to the host in question. This patch also removes nsswitch.conf binding from BWRAP as that workaround is not necessary any more. This closes #966 --- tests/CMakeLists.txt| 5 + tests/system_tests_bad_configuration.py | 11 --- 2 files changed, 5 insertions(+), 11 deletions(-) diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 42661e2..db8da65 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -67,10 +67,7 @@ target_link_libraries(clogger ${Proton_LIBRARIES}) # running tests in parallel (the ctest -j option) without port clashes set(USE_BWRAP OFF CACHE BOOL "Wrap test executions with bwrap (https://github.com/containers/bubblewrap)") if(USE_BWRAP) - # Inaccessible DNS servers produce "proton:io Temporary failure in name resolution". - # For system_tests_bad_configuration we need to get "proton:io Name or service not known", - # so blank /etc/nsswitch.conf to achieve that. - set(BWRAP_ARGS bwrap --bind / / --bind /dev/zero /etc/nsswitch.conf --unshare-net --dev /dev --die-with-parent --) + set(BWRAP_ARGS bwrap --bind / / --unshare-net --dev /dev --die-with-parent --) else() set(BWRAP_ARGS "") endif() diff --git a/tests/system_tests_bad_configuration.py b/tests/system_tests_bad_configuration.py index c651d86..c679790 100644 --- a/tests/system_tests_bad_configuration.py +++ b/tests/system_tests_bad_configuration.py @@ -51,12 +51,13 @@ class RouterTestBadConfiguration(TestCase): """ super(RouterTestBadConfiguration, cls).setUpClass() cls.name = "test-router" +cls.unresolvable_host_name = 'unresolvable.host.name' cls.config = Qdrouterd.Config([ ('router', {'mode': 'standalone', 'id': 'QDR.A'}), # Define a connector that uses an unresolvable hostname ('connector', {'name': 'UnresolvableConn', - 'host': 'unresolvable.host.name', + 'host': cls.unresolvable_host_name, 'port': 'amqp'}), ('listener', {'port': cls.tester.get_port()}), @@ -111,12 +112,8 @@ class RouterTestBadConfiguration(TestCase): """ with open('../setUpClass/test-router.log', 'r') as router_log: log_lines = router_log.read().split("\n") -expected_errors = [ -"proton:io Name or service not known", # Linux -"proton:io unknown node or service", # macOS -] -errors_caught = [line for line in log_lines - if any([expected_error in line for expected_error in expected_errors])] +expected_log_snip = "Connection to %s" % self.unresolvable_host_name +errors_caught = [line for line in log_lines if expected_log_snip in line and "failed" in line] self.error_caught = any(errors_caught) - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch master updated: DISPATCH-1895: Modify tcp self test not to use http listeners
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/master by this push: new 276514d DISPATCH-1895: Modify tcp self test not to use http listeners 276514d is described below commit 276514d3755aa075bdb04d8b1599c2f859073dd5 Author: Chuck Rolke AuthorDate: Thu Dec 17 15:08:44 2020 -0500 DISPATCH-1895: Modify tcp self test not to use http listeners As noted in jira comment, the test failure is the http-libwebsockets code doing a wild 64-bit realloc. The failure has nothing to do with the tcp test itself and should be directed to a new jira blaming the libwebsockets code. That said, the tcp test is using http listeners purely for easier test debugging. The http listeners may be removed with no loss of test functionality and that is what this patch does. --- tests/system_tests_tcp_adaptor.py | 12 ++-- 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/system_tests_tcp_adaptor.py b/tests/system_tests_tcp_adaptor.py index 236f4c5..301b1bc 100644 --- a/tests/system_tests_tcp_adaptor.py +++ b/tests/system_tests_tcp_adaptor.py @@ -159,7 +159,7 @@ class TcpAdaptor(TestCase): nodest_listener_ports = {} # Each router has a console listener -http_listener_ports = {} +#http_listener_ports = {} # local timeout in seconds to wait for one echo client to finish echo_timeout = 30 @@ -183,7 +183,7 @@ class TcpAdaptor(TestCase): Launch a router through the system_test framework. For each router: * normal listener first - * http listener for console connections + #* http listener for console connections * tcp listener for 'nodest', which will never exist * tcp connector to echo server whose address is the same as this router's name * six tcp listeners, one for each server on each router on the network @@ -196,7 +196,7 @@ class TcpAdaptor(TestCase): config = [ ('router', {'mode': mode, 'id': name}), ('listener', {'port': cls.amqp_listener_ports[name]}), -('listener', {'port': cls.http_listener_ports[name], 'http': 'yes'}), +#('listener', {'port': cls.http_listener_ports[name], 'http': 'yes'}), ('tcpListener', {'host': "0.0.0.0", 'port': cls.nodest_listener_ports[name], 'address': 'nodest', @@ -235,7 +235,7 @@ class TcpAdaptor(TestCase): tl_ports[tcp_listener] = cls.tester.get_port() cls.tcp_client_listener_ports[rtr] = tl_ports cls.nodest_listener_ports[rtr] = cls.tester.get_port() -cls.http_listener_ports[rtr] = cls.tester.get_port() +#cls.http_listener_ports[rtr] = cls.tester.get_port() inter_router_port_AB = cls.tester.get_port() inter_router_port_BC = cls.tester.get_port() @@ -263,8 +263,8 @@ class TcpAdaptor(TestCase): (rtr, tcp_listener, cls.tcp_client_listener_ports[rtr][tcp_listener])) p_out.append("%s_nodest_listener=%d" % (rtr, cls.nodest_listener_ports[rtr])) -p_out.append("%s_http_listener=%d" % - (rtr, cls.http_listener_ports[rtr])) +#p_out.append("%s_http_listener=%d" % +# (rtr, cls.http_listener_ports[rtr])) p_out.append("inter_router_port_AB=%d" % inter_router_port_AB) p_out.append("inter_router_port_BC=%d" % inter_router_port_BC) p_out.append("INTA_edge_port=%d" % cls.INTA_edge_port) - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch master updated: DISPATCH-1883: Tcp self test helpers require python selectors
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/master by this push: new e0fa62c DISPATCH-1883: Tcp self test helpers require python selectors e0fa62c is described below commit e0fa62c117557207239f603ff77ba68c4e2cee59 Author: Chuck Rolke AuthorDate: Wed Dec 16 10:37:05 2020 -0500 DISPATCH-1883: Tcp self test helpers require python selectors * add requirement to README * include selectors in github workflow, travis, and dockerfiles This closes #949 --- .github/workflows/build.yaml | 2 +- .travis.yml | 2 ++ README| 7 ++- dockerfiles/Dockerfile-fedora | 1 + dockerfiles/Dockerfile-ubuntu | 1 + 5 files changed, 11 insertions(+), 2 deletions(-) diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index ae4790c..1e51d53 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -197,7 +197,7 @@ jobs: architecture: x64 - name: Install Python runtime/test dependencies -run: python -m pip install tox quart +run: python -m pip install tox quart selectors - name: Install Linux runtime/test dependencies if: ${{ runner.os == 'Linux' }} diff --git a/.travis.yml b/.travis.yml index f334f6b..aee6913 100644 --- a/.travis.yml +++ b/.travis.yml @@ -69,6 +69,8 @@ jobs: - python -m pip install --user --upgrade tox virtualenv==20.0.23 # Install quart to run the http2 tests. - python -m pip install --user quart +# DISPATCH-1883: Install selectors to run tcp echo server/client tools +- python -m pip install --user selectors env: - CC=clang-10 - CXX=clang++-10 diff --git a/README b/README index b394e2b..b0ff0fe 100644 --- a/README +++ b/README @@ -75,10 +75,15 @@ The HTTP2 system tests will run only if 2. Python Web Microframework Quart version >= 0.13 3. curl is available -To install pip and Quart +The TCP system tests (tests/system_tests_tcp_adaptor.py) use the +Python selectors module when running echo clients and servers. +The TCP system tests run only if Python selectors is available. + +To install pip, Quart, and selectors - curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py - python3 get-pip.py - pip3 install --user quart + - pip3 install --user selectors Test Suite Code Coverage (GNU tools only) diff --git a/dockerfiles/Dockerfile-fedora b/dockerfiles/Dockerfile-fedora index 72b7f18..5330e24 100644 --- a/dockerfiles/Dockerfile-fedora +++ b/dockerfiles/Dockerfile-fedora @@ -35,6 +35,7 @@ RUN dnf -y install gcc gcc-c++ cmake openssl-devel cyrus-sasl-devel cyrus-sasl-p RUN curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py RUN python3 get-pip.py RUN pip3 install --user quart +RUN pip3 install --user selectors # Create a main directory and clone the qpid-proton repo from github RUN mkdir /main && cd /main && git clone https://gitbox.apache.org/repos/asf/qpid-proton.git && cd /main/qpid-proton && mkdir /main/qpid-proton/build diff --git a/dockerfiles/Dockerfile-ubuntu b/dockerfiles/Dockerfile-ubuntu index dde2c8b..ecbcd43 100644 --- a/dockerfiles/Dockerfile-ubuntu +++ b/dockerfiles/Dockerfile-ubuntu @@ -32,6 +32,7 @@ RUN apt-get update && \ RUN curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py RUN python3 get-pip.py RUN pip3 install --user quart +RUN pip3 install --user selectors RUN git clone https://gitbox.apache.org/repos/asf/qpid-dispatch.git && cd /qpid-dispatch && git submodule add https://gitbox.apache.org/repos/asf/qpid-proton.git && git submodule update --init - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch master updated: NO-JIRA: Add include guard to http2_adaptor.h file
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/master by this push: new 4115ecd NO-JIRA: Add include guard to http2_adaptor.h file 4115ecd is described below commit 4115ecd92e1d23ee2091a8852fd9e7ca07aa0b09 Author: Chuck Rolke AuthorDate: Tue Dec 15 16:24:31 2020 -0500 NO-JIRA: Add include guard to http2_adaptor.h file --- src/adaptors/http2/http2_adaptor.h | 4 1 file changed, 4 insertions(+) diff --git a/src/adaptors/http2/http2_adaptor.h b/src/adaptors/http2/http2_adaptor.h index c27e2a4..decbfc1 100644 --- a/src/adaptors/http2/http2_adaptor.h +++ b/src/adaptors/http2/http2_adaptor.h @@ -1,3 +1,6 @@ +#ifndef __http2_adaptor_h__ +#define __http2_adaptor_h__ 1 + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -210,3 +213,4 @@ ALLOC_DECLARE(qdr_http2_connection_t); ALLOC_DECLARE(qd_http2_buffer_t); +#endif // __http2_adaptor_h__ - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch master updated: DISPATCH-1882: TCP test handles empty echo servers dictionary
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/master by this push: new f632206 DISPATCH-1882: TCP test handles empty echo servers dictionary f632206 is described below commit f6322060d6677415b693ba24b337cfd75cf419c7 Author: Chuck Rolke AuthorDate: Mon Dec 14 09:54:59 2020 -0500 DISPATCH-1882: TCP test handles empty echo servers dictionary --- tests/system_tests_tcp_adaptor.py | 11 +++ 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/tests/system_tests_tcp_adaptor.py b/tests/system_tests_tcp_adaptor.py index e73ec9a..236f4c5 100644 --- a/tests/system_tests_tcp_adaptor.py +++ b/tests/system_tests_tcp_adaptor.py @@ -167,6 +167,9 @@ class TcpAdaptor(TestCase): # TCP siteId for listeners and connectors site = "mySite" +# Each router has an echo server to which it connects +echo_servers = {} + @classmethod def setUpClass(cls): """Start a router""" @@ -361,7 +364,6 @@ class TcpAdaptor(TestCase): cls.print_logs_client = True # start echo servers -cls.echo_servers = {} for rtr in cls.router_order: test_name = "TcpAdaptor" server_prefix = "ECHO_SERVER %s ES_%s" % (test_name, rtr) @@ -407,9 +409,10 @@ class TcpAdaptor(TestCase): def tearDownClass(cls): # stop echo servers for rtr in cls.router_order: -server = cls.echo_servers[rtr] -cls.logger.log("TCP_TEST Stopping echo server %s" % rtr) -server.wait() +server = cls.echo_servers.get(rtr) +if not server is None: +cls.logger.log("TCP_TEST Stopping echo server %s" % rtr) +server.wait() super(TcpAdaptor, cls).tearDownClass() # - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch master updated (06eef72 -> 31d69f1)
This is an automated email from the ASF dual-hosted git repository. chug pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git. from 06eef72 NO-JIRA: readme tweak, prod CI jobs new dfe6643 Dataplane: Moved protocol-adapter functions from router_core.h into proto_adaptor.h new 50ff2d4 Dataplane: Re-factored direct-AMQP to use the protocol-adaptor interface. new 852b57e Dataplane: Exposed the protocol name with the connection entity. new b5b4938 Dataplane: Improved sasl-plain test by using symbolic attribute names, not positional indices. new 3bd7195 Dataplane: Added adaptor plugin capability. Started first reference adaptor (TCP). new 2d241e5 Dataplane: Added documentation for the protocol adaptor callbacks. new 1bf5083 Dataplane: Removed old handler call which is not used anymore. new 513c365 Dataplane: minor cleanup new 86cafb9 Dataplane: Added setter for dynamic in qdr_terminus_t new a42f462 Dataplane: Renamed tcp_adaptor to reference_adaptor. Added more test content to the reference adaptor. It now sends messages to a fixed address. Fixed qdr_terminus_format to show the dynamically-assigned address for dynamis termini. new d2b87bb Dataplane: connection-activate is now routed through the protocol adapter that handles the connection. new ff4d6cc Dataplane: Updated the reference adaptor to implement connection activation new 962d52e Dataplane: Fixed order problem in shutting down the router. Disabled the reference adaptor by default (uncomment the last line to re-enable). The reference adaptor causes test failures. new c09b305 Dataplane: Added a 5th message compose variant to provide: - optional properties - optional application-properties - optional body in the form of a buffer list - indication of receive-complete new b6982f6 Dataplane: Changed the new compose function to have only one field for headers. This field can have both properties and application properties. It's more efficient put together like this. new 84f1d3e Dataplane: Added message method to set send-complete. Added reference code to receive messages (non streamed). new ccd1b42 Dataplane: Exposed access to connection-ids from server. Moved the generation of the "connection opened" log from router_node.c to the core module. This causes the log to be raised for all protocol adaptors. new 33066cb Dataplane: Added calls in message.h for streaming putput from adaptors. Renamed qdr_deliver_continue* to qdr_delivery_continue* new a98fead Dataplane: Set proper buffer refcount in messages during buffer-extend. This ensures that the streaming buffers are properly freed when no longer needed. new 8a2326d Dataplane: Updates to the message-extend (return buffer count for flow control). Added bidirectional streaming test to ref adaptor. new b9a4a87 Dataplane: Added API for streaming data out of messages. This commit adds the requirement for Proton raw-connection support. new 63ab97f Dataplane: Added no_route and initial_delivery on link-first-attach. new a9368ea Dataplane: (from gsim) Implementation of qd_message_read_body. new 3934871 Dataplane: Added implementation of qd_message_release_body. new 18a1986 Dataplane - Added qd_buffer_list_append function to efficiently accumulate data in buffer lists. new 2f4610b Dataplane: WIP changes new 99f0aca Dataplane: Message parsing bug fixed: now properly handles empty var-length fields. new 2c407b7 Dataplane: Fixed message parsing so it can handle partial and streaming content. new cd70101 Dataplane: disabled reference adaptor new 0cc281b Dataplane: WIP new 4846f1c Dataplane - Added the body_data data structure for reading streaming messages. WIP - The following functions (in message.c) need to be implemented: find_last_buffer qd_message_body_data_iterator qd_message_body_data_buffer_count qd_message_body_data_buffers qd_message_body_data_release new bcc6c80 DISPATCH-1742 Dataplane: Fixups from rebase to master. new b85bed4 DISPATCH-1742: Added receive_complete parameter to qd_message_compose_3() and qd_message_compose_3() in message.c new 3126244 DISPATCH-1743 - Introduce a HTTP/2 Adapter. This adaptor will act as the HTTP/2 <-->AMQP bridge new 820b005 DISPATCH-1742 - Completed implementation of outbound streaming path new 8244178 DISPATCH-1742 - Fixed compilation error new f061cf7 DISPATCH-1744: refactor common HTTP code new 365839e DISPATCH-1742 - Removed compiler warning by initializing a boolean value. new 96e6021 DISPATCH-1742 - Handle zero-length body sections. Fixed a bug in body content length calculation. new d533e9d DISPATCH-1744: Fixed minor error in field name new 5aebe32 DISPATCH-1654: Initial TCP adaptor new 70ea8c9 DISPA
[qpid-dispatch] branch dev-protocol-adaptors-2 updated: DISPATCH-1876: reproducer self test
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push: new 3088407 DISPATCH-1876: reproducer self test 3088407 is described below commit 3088407e2c635a71f0cc1564c18dd7239c138ebe Author: Chuck Rolke AuthorDate: Tue Dec 8 16:14:37 2020 -0500 DISPATCH-1876: reproducer self test --- tests/system_tests_tcp_adaptor.py | 14 ++ 1 file changed, 14 insertions(+) diff --git a/tests/system_tests_tcp_adaptor.py b/tests/system_tests_tcp_adaptor.py index 0f45987..4c4131f 100644 --- a/tests/system_tests_tcp_adaptor.py +++ b/tests/system_tests_tcp_adaptor.py @@ -672,6 +672,20 @@ class TcpAdaptor(TestCase): assert result is None, "TCP_TEST Stop %s FAIL: %s" % (name, result) self.logger.log("TCP_TEST Stop %s SUCCESS" % name) +@SkipIfNeeded(DISABLE_SELECTOR_TESTS, DISABLE_SELECTOR_REASON) +def test_20_tcp_connect_disconnect(self): +self.skipTest("DISPATCH-1876 reproducer: disabled until DISPATCH-1876 is fixed") +name = "test_20_tcp_connect_disconnect" +self.logger.log("TCP_TEST Start %s" % name) +pairs = [self.EchoPair(self.INTA, self.INTA, sizes=[0])] +result = self.do_tcp_echo_n_routers(name, pairs) +if result is not None: +print(result) +sys.stdout.flush() +assert result is None, "TCP_TEST Stop %s FAIL: %s" % (name, result) +# TODO: This test passes but in passing router INTA crashes undetected. +self.logger.log("TCP_TEST Stop %s SUCCESS" % name) + # concurrent messages @SkipIfNeeded(DISABLE_SELECTOR_TESTS, DISABLE_SELECTOR_REASON) def test_50_concurrent(self): - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch dev-protocol-adaptors-2 updated: DISPATCH-1877: TCP echo client test has connect/disconnect capability
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push: new bf0672e DISPATCH-1877: TCP echo client test has connect/disconnect capability bf0672e is described below commit bf0672e7932fe14b77f49627a3ae64da8f836c7d Author: Chuck Rolke AuthorDate: Tue Dec 8 15:16:16 2020 -0500 DISPATCH-1877: TCP echo client test has connect/disconnect capability With '--size 0' or '--count 0' the test connects and disconnects quickly. --- tests/TCP_echo_client.py | 74 +--- 1 file changed, 39 insertions(+), 35 deletions(-) diff --git a/tests/TCP_echo_client.py b/tests/TCP_echo_client.py index e747896..87d0082 100755 --- a/tests/TCP_echo_client.py +++ b/tests/TCP_echo_client.py @@ -105,35 +105,39 @@ class TcpEchoClient: total_sent = 0 total_rcvd = 0 -# outbound payload -payload_out = [] -out_list_idx = 0 # current _out array being sent -out_byte_idx = 0 # next-to-send in current array -out_ready_to_send = True -# Generate unique content for each message so you can tell where the message -# or fragment belongs in the whole stream. Chunks look like: -#b'[localhost:3:6:0]g' -#host: localhost -#port: 3 -#index: 6 -#offset into message: 0 -CONTENT_CHUNK_SIZE = 50 # Content repeats after chunks this big - used by echo server, too -for idx in range(self.count): -body_msg = "" -padchar = "abcdefghijklmnopqrstuvwxyz@#$%"[idx % 30] -while len(body_msg) < self.size: -chunk = "[%s:%d:%d:%d]" % (self.host, self.port, idx, len(body_msg)) -padlen = CONTENT_CHUNK_SIZE - len(chunk) -chunk += padchar * padlen -body_msg += chunk -if len(body_msg) > self.size: -body_msg = body_msg[:self.size] -payload_out.append(bytearray(body_msg.encode())) -# incoming payloads -payload_in = [] -in_list_idx = 0 # current _in array being received -for i in range(self.count): -payload_in.append(bytearray()) +if self.count > 0 and self.size > 0: +# outbound payload only if count and size both greater than zero +payload_out = [] +out_list_idx = 0 # current _out array being sent +out_byte_idx = 0 # next-to-send in current array +out_ready_to_send = True +# Generate unique content for each message so you can tell where the message +# or fragment belongs in the whole stream. Chunks look like: +#b'[localhost:3:6:0]g' +#host: localhost +#port: 3 +#index: 6 +#offset into message: 0 +CONTENT_CHUNK_SIZE = 50 # Content repeats after chunks this big - used by echo server, too +for idx in range(self.count): +body_msg = "" +padchar = "abcdefghijklmnopqrstuvwxyz@#$%"[idx % 30] +while len(body_msg) < self.size: +chunk = "[%s:%d:%d:%d]" % (self.host, self.port, idx, len(body_msg)) +padlen = CONTENT_CHUNK_SIZE - len(chunk) +chunk += padchar * padlen +body_msg += chunk +if len(body_msg) > self.size: +body_msg = body_msg[:self.size] +payload_out.append(bytearray(body_msg.encode())) +# incoming payloads +payload_in = [] +in_list_idx = 0 # current _in array being received +for i in range(self.count): +payload_in.append(bytearray()) +else: +# when count or size .LE. zero then just connect-disconnect +self.keep_running = False # set up connection host_address = (self.host, self.port) @@ -231,9 +235,9 @@ def main(argv): p.add_argument('--port', '-p', type=int, help='Required target port number') p.add_argument('--size', '-s', type=int, default=100, const=1, nargs='?', - help='Size of payload in bytes') +
[qpid-dispatch] branch dev-protocol-adaptors-2 updated: DISPATCH-1873: TCP echo test client prints errors to stderr
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push: new 0aaba5a DISPATCH-1873: TCP echo test client prints errors to stderr 0aaba5a is described below commit 0aaba5a5eda991f4499ebe7698b1da4adbff1f95 Author: Chuck Rolke AuthorDate: Tue Dec 8 09:38:38 2020 -0500 DISPATCH-1873: TCP echo test client prints errors to stderr Fix applies only when run as main. --- tests/TCP_echo_client.py | 12 1 file changed, 12 insertions(+) diff --git a/tests/TCP_echo_client.py b/tests/TCP_echo_client.py index aa29bc9..e747896 100755 --- a/tests/TCP_echo_client.py +++ b/tests/TCP_echo_client.py @@ -191,6 +191,8 @@ class TcpEchoClient: else: # socket closed self.keep_running = False +if not in_list_idx == self.count: +self.error = "ERROR server closed. Echoed %d of %d messages." % (in_list_idx, self.count) if self.keep_running and mask & selectors.EVENT_WRITE: if out_ready_to_send: n_sent = self.sock.send(payload_out[out_list_idx][out_byte_idx:]) @@ -299,10 +301,20 @@ def main(argv): keep_running = False except Exception: +client.error = "ERROR: exception : '%s'" % traceback.format_exc() if logger is not None: logger.log("%s Exception: %s" % (prefix, traceback.format_exc())) retval = 1 +if client.error is not None: +# write client errors to stderr +def eprint(*args, **kwargs): +print(*args, file=sys.stderr, **kwargs) + +elines = client.error.split("\n") +for line in elines: +eprint("ERROR:", prefix, line) + return retval - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch dev-protocol-adaptors-2 updated: NO-JIRA: Tcp test client data verification success case efficiency gain
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push: new a2f111f NO-JIRA: Tcp test client data verification success case efficiency gain a2f111f is described below commit a2f111f2512db94538cd8aa461e9aba651d5f889 Author: Chuck Rolke AuthorDate: Fri Dec 4 16:43:59 2020 -0500 NO-JIRA: Tcp test client data verification success case efficiency gain --- tests/TCP_echo_client.py | 20 +++- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/tests/TCP_echo_client.py b/tests/TCP_echo_client.py index fe673b4..aa29bc9 100755 --- a/tests/TCP_echo_client.py +++ b/tests/TCP_echo_client.py @@ -168,15 +168,17 @@ class TcpEchoClient: # Received all bytes of all chunks - done. self.keep_running = False # Verify the received data -for idxc in range(self.count): -for idxs in range(self.size): -ob = payload_out[idxc][idxs] -ib = payload_in[idxc][idxs] -if ob != ib: -self.error = "%s ERROR Rcvd message verify fail. row:%d, col:%d, " \ - "expected:%s, actual:%s" \ - % (self.prefix, idxc, idxs, repr(ob), repr(ib)) -break +if not payload_in == payload_out: +for idxc in range(self.count): +if not payload_in[idxc] == payload_out[idxc]: +for idxs in range(self.size): +ob = payload_out[idxc][idxs] +ib = payload_in[idxc][idxs] +if ob != ib: +self.error = "%s ERROR Rcvd message verify fail. row:%d, col:%d, " \ + "expected:%s, actual:%s" \ + % (self.prefix, idxc, idxs, repr(ob), repr(ib)) +break else: out_ready_to_send = True sel.modify(sock, selectors.EVENT_READ | selectors.EVENT_WRITE) - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch dev-protocol-adaptors-2 updated: DISPATCH-1846: Fix TCP adaptor test stall
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push: new b182b2e DISPATCH-1846: Fix TCP adaptor test stall b182b2e is described below commit b182b2ea7ed571f5ec9f30bc3ab0a9d08f70af82 Author: Chuck Rolke AuthorDate: Fri Dec 4 11:19:35 2020 -0500 DISPATCH-1846: Fix TCP adaptor test stall * Wait until all echo server addresses are known to all interior routers. * Add 'ES_' prefix to echo server addresses. This prevents confusing echo server mobile address 'INTA' with router address 'INTA'. (Note that everything works without the prefix. For debugging the unique addresses are easier.) * Turn off echo server logging. This closes #934 --- tests/TCP_echo_client.py | 1 - tests/system_tests_tcp_adaptor.py | 36 2 files changed, 32 insertions(+), 5 deletions(-) diff --git a/tests/TCP_echo_client.py b/tests/TCP_echo_client.py index 56280aa..fe673b4 100755 --- a/tests/TCP_echo_client.py +++ b/tests/TCP_echo_client.py @@ -147,7 +147,6 @@ class TcpEchoClient: selectors.EVENT_READ | selectors.EVENT_WRITE) # event loop -time.sleep(0.1) # DISPATCH-1820 investigation while self.keep_running: if self.timeout > 0.0: elapsed = time.time() - start_time diff --git a/tests/system_tests_tcp_adaptor.py b/tests/system_tests_tcp_adaptor.py index 3c2b9c3..0f45987 100644 --- a/tests/system_tests_tcp_adaptor.py +++ b/tests/system_tests_tcp_adaptor.py @@ -35,6 +35,7 @@ from system_test import QdManager from system_test import unittest from system_test import DIR from system_test import SkipIfNeeded +from system_test import Process from qpid_dispatch.management.client import Node from subprocess import PIPE, STDOUT @@ -199,7 +200,7 @@ class TcpAdaptor(TestCase): 'siteId': cls.site}), ('tcpConnector', {'host': "127.0.0.1", 'port': cls.tcp_server_listener_ports[name], - 'address': name, + 'address': 'ES_' + name, 'siteId': cls.site}) ] if connection: @@ -208,7 +209,7 @@ class TcpAdaptor(TestCase): for rtr in cls.router_order: listener = {'host': "0.0.0.0", 'port': cls.tcp_client_listener_ports[name][rtr], -'address': rtr, +'address': 'ES_' + rtr, 'siteId': cls.site} tup = [(('tcpListener', listener))] listeners.extend( tup ) @@ -356,14 +357,14 @@ class TcpAdaptor(TestCase): cls.INTC.wait_router_connected('INTB') # define logging levels -cls.print_logs_server = True +cls.print_logs_server = False cls.print_logs_client = True # start echo servers cls.echo_servers = {} for rtr in cls.router_order: test_name = "TcpAdaptor" -server_prefix = "ECHO_SERVER %s %s" % (test_name, rtr) +server_prefix = "ECHO_SERVER %s ES_%s" % (test_name, rtr) server_logger = Logger(title=test_name, print_to_console=cls.print_logs_server, save_for_dump=False, @@ -375,6 +376,33 @@ class TcpAdaptor(TestCase): assert server.is_running cls.echo_servers[rtr] = server +# wait for server addresses (mobile ES_) to propagate to all interior routers +interior_rtrs = [rtr for rtr in cls.router_order if rtr.startswith('I')] +found_all = False +while not found_all: +found_all = True +cls.logger.log("TCP_TEST Poll wait for echo server addresses to propagate") +for rtr in interior_rtrs: +# query each interior for addresses +p = Process( +['qdstat', '-b', str(cls.router_dict[rtr].addresses[0]), '-a'], +name='qdstat-snap1', stdout=PIPE, expect=None, +universal_newlines=True) +out = p.communicate()[0] +# examine what this router can see; signal poll loop to continue or not +lines = out.split("\n") +
[qpid-dispatch] branch dev-protocol-adaptors-2 updated: DISPATCH-1863: add proton event names to event handler logs
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push: new dee76fd DISPATCH-1863: add proton event names to event handler logs dee76fd is described below commit dee76fdee0fa76a2afbe936535aac66e751741bd Author: Chuck Rolke AuthorDate: Wed Dec 2 16:31:37 2020 -0500 DISPATCH-1863: add proton event names to event handler logs --- src/adaptors/tcp_adaptor.c | 28 ++-- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c index 2b6fa98..a8d176a 100644 --- a/src/adaptors/tcp_adaptor.c +++ b/src/adaptors/tcp_adaptor.c @@ -496,12 +496,12 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void case PN_RAW_CONNECTION_CONNECTED: { if (conn->ingress) { qdr_tcp_connection_ingress_accept(conn); -qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] Accepted from %s (global_id=%s)", conn->conn_id, conn->remote_address, conn->global_id); +qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] PN_RAW_CONNECTION_CONNECTED Ingress accepted to %s from %s (global_id=%s)", conn->conn_id, conn->config.host_port, conn->remote_address, conn->global_id); break; } else { conn->remote_address = get_address_string(conn->pn_raw_conn); conn->opened_time = tcp_adaptor->core->uptime_ticks; -qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] Connected", conn->conn_id); +qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] PN_RAW_CONNECTION_CONNECTED Egress connected to %s", conn->conn_id, conn->remote_address); if (!!conn->initial_delivery) { qdr_tcp_open_server_side_connection(conn); conn->initial_delivery = 0; @@ -512,34 +512,34 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void } } case PN_RAW_CONNECTION_CLOSED_READ: { -qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Closed for reading", conn->conn_id); +qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] PN_RAW_CONNECTION_CLOSED_READ", conn->conn_id); pn_raw_connection_close(conn->pn_raw_conn); break; } case PN_RAW_CONNECTION_CLOSED_WRITE: { -qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Closed for writing", conn->conn_id); +qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] PN_RAW_CONNECTION_CLOSED_WRITE", conn->conn_id); pn_raw_connection_close(conn->pn_raw_conn); break; } case PN_RAW_CONNECTION_DISCONNECTED: { -qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] Disconnected", conn->conn_id); +qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] PN_RAW_CONNECTION_DISCONNECTED", conn->conn_id); handle_disconnected(conn); break; } case PN_RAW_CONNECTION_NEED_WRITE_BUFFERS: { -qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Need write buffers", conn->conn_id); +qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] PN_RAW_CONNECTION_NEED_WRITE_BUFFERS", conn->conn_id); while (qdr_connection_process(conn->qdr_conn)) {} handle_outgoing(conn); break; } case PN_RAW_CONNECTION_NEED_READ_BUFFERS: { -qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Need read buffers", conn->conn_id); +qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] PN_RAW_CONNECTION_NEED_READ_BUFFERS", conn->conn_id); while (qdr_connection_process(conn->qdr_conn)) {} handle_incoming(conn); break; } case PN_RAW_CONNECTION_WAKE: { -qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Wake-up", conn->conn_id); +qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] PN_RAW_CONNECTION_WAKE", conn->conn_id); while (qdr_connection_process(conn->qdr_conn)) {} break; } @@ -547,7 +547,7 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void int read = handle_incoming(conn); conn->last_in_time = tcp_adaptor->core->uptime_ticks; conn->bytes_in += read; -qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Read %i bytes", conn->conn_id, read); +qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] PN_RAW_CONNECTION_READ Read %i bytes", conn->conn_id, read); while (qdr_connection_process(conn->qdr_conn)) {}
[qpid-dispatch] branch dev-protocol-adaptors-2 updated: DISPATCH-1862: TCP adaptor - rename structure members
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push: new 3f9d377 DISPATCH-1862: TCP adaptor - rename structure members 3f9d377 is described below commit 3f9d377178c81d110061ff89d81fc2bf808062bb Author: Chuck Rolke AuthorDate: Wed Dec 2 15:45:03 2020 -0500 DISPATCH-1862: TCP adaptor - rename structure members --- src/adaptors/tcp_adaptor.c | 72 +++--- 1 file changed, 36 insertions(+), 36 deletions(-) diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c index 98df531..2b6fa98 100644 --- a/src/adaptors/tcp_adaptor.c +++ b/src/adaptors/tcp_adaptor.c @@ -40,13 +40,13 @@ typedef struct qdr_tcp_connection_t qdr_tcp_connection_t; struct qdr_tcp_connection_t { qd_handler_context_t context; char *reply_to; -qdr_connection_t *conn; +qdr_connection_t *qdr_conn; uint64_t conn_id; qdr_link_t *incoming; uint64_t incoming_id; qdr_link_t *outgoing; uint64_t outgoing_id; -pn_raw_connection_t *socket; +pn_raw_connection_t *pn_raw_conn; qdr_delivery_t *instream; qdr_delivery_t *outstream; bool ingress; @@ -107,10 +107,10 @@ static void on_activate(void *context) qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) context; qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] on_activate", conn->conn_id); -while (qdr_connection_process(conn->conn)) {} +while (qdr_connection_process(conn->qdr_conn)) {} if (conn->egress_dispatcher && conn->connector_closed) { -qdr_connection_closed(conn->conn); -qdr_connection_set_context(conn->conn, 0); +qdr_connection_closed(conn->qdr_conn); +qdr_connection_set_context(conn->qdr_conn, 0); free_qdr_tcp_connection(conn); } } @@ -119,8 +119,8 @@ static void grant_read_buffers(qdr_tcp_connection_t *conn) { pn_raw_buffer_t raw_buffers[READ_BUFFERS]; // Give proactor more read buffers for the socket -if (!pn_raw_connection_is_read_closed(conn->socket)) { -size_t desired = pn_raw_connection_read_buffers_capacity(conn->socket); +if (!pn_raw_connection_is_read_closed(conn->pn_raw_conn)) { +size_t desired = pn_raw_connection_read_buffers_capacity(conn->pn_raw_conn); while (desired) { size_t i; for (i = 0; i < desired && i < READ_BUFFERS; ++i) { @@ -132,7 +132,7 @@ static void grant_read_buffers(qdr_tcp_connection_t *conn) raw_buffers[i].context = (uintptr_t) buf; } desired -= i; -pn_raw_connection_give_read_buffers(conn->socket, raw_buffers, i); +pn_raw_connection_give_read_buffers(conn->pn_raw_conn, raw_buffers, i); } } } @@ -157,7 +157,7 @@ static int handle_incoming(qdr_tcp_connection_t *conn) pn_raw_buffer_t raw_buffers[READ_BUFFERS]; size_t n; int count = 0; -while ( (n = pn_raw_connection_take_read_buffers(conn->socket, raw_buffers, READ_BUFFERS)) ) { +while ( (n = pn_raw_connection_take_read_buffers(conn->pn_raw_conn, raw_buffers, READ_BUFFERS)) ) { for (size_t i = 0; i < n && raw_buffers[i].bytes; ++i) { qd_buffer_t *buf = (qd_buffer_t*) raw_buffers[i].context; qd_buffer_insert(buf, raw_buffers[i].size); @@ -254,8 +254,8 @@ static void handle_disconnected(qdr_tcp_connection_t* conn) qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] handle_disconnected - detach outgoing", conn->conn_id, conn->outgoing_id); qdr_link_detach(conn->outgoing, QD_LOST, 0); } -qdr_connection_closed(conn->conn); -qdr_connection_set_context(conn->conn, 0); +qdr_connection_closed(conn->qdr_conn); +qdr_connection_set_context(conn->qdr_conn, 0); //need to free on core thread to avoid deleting while in use by management agent qdr_action_t *action = qdr_action(qdr_del_tcp_connection_CT, "delete_tcp_connection"); action->args.general.context_1 = conn; @@ -334,7 +334,7 @@ static bool write_outgoing_buffs(qdr_tcp_connection_t *conn) if (conn->outgoing_buff_count == 0) { result = true; } else { -size_t used = pn_raw_connection_write_buffers(conn->socket, +size_t used = pn_raw_connection_write_buffers(conn->pn_raw_conn, &conn->outgoing_buffs[conn->outgoing_buff_idx],
[qpid-dispatch] branch dev-protocol-adaptors-2 updated: NO-JIRA: remove stray proton library function definition
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push: new fddb8e1 NO-JIRA: remove stray proton library function definition fddb8e1 is described below commit fddb8e1ff93439f5e3a0beec5700a34e9ac6f599 Author: Chuck Rolke AuthorDate: Tue Dec 1 09:34:30 2020 -0500 NO-JIRA: remove stray proton library function definition --- src/adaptors/tcp_adaptor.c | 1 - 1 file changed, 1 deletion(-) diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c index e26a94d..9c94c86 100644 --- a/src/adaptors/tcp_adaptor.c +++ b/src/adaptors/tcp_adaptor.c @@ -546,7 +546,6 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void } case PN_RAW_CONNECTION_WRITTEN: { pn_raw_buffer_t buffs[WRITE_BUFFERS]; -size_t pn_raw_connection_take_written_buffers(pn_raw_connection_t *connection, pn_raw_buffer_t *buffers, size_t num); size_t n; size_t written = 0; while ( (n = pn_raw_connection_take_written_buffers(conn->socket, buffs, WRITE_BUFFERS)) ) { - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch dev-protocol-adaptors-2 updated: DISPATCH-1851: Show proper link id in second_attach
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push: new 648ff4c DISPATCH-1851: Show proper link id in second_attach 648ff4c is described below commit 648ff4ce4bf87237ae27c845119d679f91c3b58a Author: Chuck Rolke AuthorDate: Wed Nov 25 10:35:52 2020 -0500 DISPATCH-1851: Show proper link id in second_attach --- src/adaptors/tcp_adaptor.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c index fe8ee26..e26a94d 100644 --- a/src/adaptors/tcp_adaptor.c +++ b/src/adaptors/tcp_adaptor.c @@ -907,8 +907,8 @@ static void qdr_tcp_second_attach(void *context, qdr_link_t *link, void* link_context = qdr_link_get_context(link); if (link_context) { qdr_tcp_connection_t* tc = (qdr_tcp_connection_t*) link_context; -qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_second_attach", tc->conn_id, qdr_tcp_conn_linkid(tc)); if (qdr_link_direction(link) == QD_OUTGOING) { +qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_second_attach", tc->conn_id, tc->outgoing_id); if (tc->ingress) { qdr_tcp_connection_copy_reply_to(tc, qdr_terminus_get_address(source)); // for ingress, can start reading from socket once we have @@ -919,6 +919,7 @@ static void qdr_tcp_second_attach(void *context, qdr_link_t *link, } qdr_link_flow(tcp_adaptor->core, link, 10, false); } else if (!tc->ingress) { +qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_second_attach", tc->conn_id, tc->incoming_id); //for egress we can start reading from the socket once we //have the link to send messages over grant_read_buffers(tc); - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch dev-protocol-adaptors-2 updated: DISPATCH-1851: TCP adaptor - add more connection and link IDs to log output
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push: new 29f4929 DISPATCH-1851: TCP adaptor - add more connection and link IDs to log output 29f4929 is described below commit 29f49295b3e695211b07241651fc5375894d71fe Author: Chuck Rolke AuthorDate: Tue Nov 24 09:40:29 2020 -0500 DISPATCH-1851: TCP adaptor - add more connection and link IDs to log output This closes #928 --- src/adaptors/tcp_adaptor.c | 201 - 1 file changed, 143 insertions(+), 58 deletions(-) diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c index 7476808..fe8ee26 100644 --- a/src/adaptors/tcp_adaptor.c +++ b/src/adaptors/tcp_adaptor.c @@ -94,6 +94,12 @@ static void qdr_del_tcp_connection_CT(qdr_core_t *core, qdr_action_t *action, bo static void handle_disconnected(qdr_tcp_connection_t* conn); static void free_qdr_tcp_connection(qdr_tcp_connection_t* conn); +static inline uint64_t qdr_tcp_conn_linkid(const qdr_tcp_connection_t *conn) +{ +assert(conn); +return conn->instream ? conn->incoming_id : conn->outgoing_id; +} + static void on_activate(void *context) { qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) context; @@ -227,18 +233,23 @@ static void free_qdr_tcp_connection(qdr_tcp_connection_t* tc) static void handle_disconnected(qdr_tcp_connection_t* conn) { +qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] handle_disconnected", conn->conn_id); if (conn->instream) { +qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] handle_disconnected - close instream", conn->conn_id, conn->incoming_id); qd_message_set_receive_complete(qdr_delivery_message(conn->instream)); qdr_delivery_continue(tcp_adaptor->core, conn->instream, true); qdr_delivery_decref(tcp_adaptor->core, conn->instream, "tcp-adaptor.handle_disconnected"); } if (conn->outstream) { +qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] handle_disconnected close outstream", conn->conn_id, conn->outgoing_id); qdr_delivery_decref(tcp_adaptor->core, conn->outstream, "tcp-adaptor.handle_disconnected"); } if (conn->incoming) { +qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] handle_disconnected - detach incoming", conn->conn_id, conn->incoming_id); qdr_link_detach(conn->incoming, QD_LOST, 0); } if (conn->outgoing) { +qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] handle_disconnected - detach outgoing", conn->conn_id, conn->outgoing_id); qdr_link_detach(conn->outgoing, QD_LOST, 0); } qdr_connection_closed(conn->conn); @@ -868,7 +879,14 @@ static void qdr_tcp_first_attach(void *context, qdr_connection_t *conn, qdr_link qdr_terminus_t *source, qdr_terminus_t *target, qd_session_class_t session_class) { -qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_first_attach"); +void *tcontext = qdr_connection_get_context(conn); +if (tcontext) { +qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) tcontext; +qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_first_attach: NOOP", conn->conn_id, qdr_tcp_conn_linkid(conn)); +} else { +qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_first_attach: no link context"); +assert(false); +} } static void qdr_tcp_connection_copy_reply_to(qdr_tcp_connection_t* tc, qd_iterator_t* reply_to) @@ -886,11 +904,10 @@ static void qdr_tcp_connection_copy_global_id(qdr_tcp_connection_t* tc, qd_itera static void qdr_tcp_second_attach(void *context, qdr_link_t *link, qdr_terminus_t *source, qdr_terminus_t *target) { -qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_second_attach"); - void* link_context = qdr_link_get_context(link); if (link_context) { qdr_tcp_connection_t* tc = (qdr_tcp_connection_t*) link_context; +qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_second_attach", tc->conn_id, qdr_tcp_conn_linkid(tc)); if (qdr_link_direction(link) == QD_OUTGOING) { if (tc->ingress) { qdr_tcp_connection_copy_reply_t
[qpid-dispatch] branch dev-protocol-adaptors-2 updated: DISPATCH-1848: TCP echo client/server handle socket errors better
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push: new 6c88bd7 DISPATCH-1848: TCP echo client/server handle socket errors better 6c88bd7 is described below commit 6c88bd7cfa40d00e3e8e007ea2368226bfe1cb6f Author: Chuck Rolke AuthorDate: Wed Nov 18 15:13:56 2020 -0500 DISPATCH-1848: TCP echo client/server handle socket errors better And various PEP-8 compliance changes. --- tests/TCP_echo_client.py | 85 ++- tests/TCP_echo_server.py | 94 2 files changed, 101 insertions(+), 78 deletions(-) diff --git a/tests/TCP_echo_client.py b/tests/TCP_echo_client.py index 8465ad5..56280aa 100755 --- a/tests/TCP_echo_client.py +++ b/tests/TCP_echo_client.py @@ -38,14 +38,16 @@ import types from system_test import Logger from system_test import TIMEOUT -class GracefulKiller: - kill_now = False - def __init__(self): -signal.signal(signal.SIGINT, self.exit_gracefully) -signal.signal(signal.SIGTERM, self.exit_gracefully) - def exit_gracefully(self,signum, frame): -self.kill_now = True +class GracefulExitSignaler: +kill_now = False + +def __init__(self): +signal.signal(signal.SIGINT, self.exit_gracefully) +signal.signal(signal.SIGTERM, self.exit_gracefully) + +def exit_gracefully(self, signum, frame): +self.kill_now = True def split_chunk_for_display(raw_bytes): @@ -63,24 +65,24 @@ def split_chunk_for_display(raw_bytes): return result -class TcpEchoClient(): +class TcpEchoClient: def __init__(self, prefix, host, port, size, count, timeout, logger): -''' +""" :param host: connect to this host :param port: connect to this port :param size: size of individual payload chunks in bytes :param count: number of payload chunks -:param strategy: "1" Send one payload; # TODO +:param strategy: "1" Send one payload; # TODO more strategies Recv one payload :param logger: Logger() object :return: -''' +""" # Start up self.sock = None self.prefix = prefix self.host = host -self.port = port +self.port = int(port) self.size = size self.count = count self.timeout = timeout @@ -94,12 +96,12 @@ class TcpEchoClient(): self._thread.start() def run(self): -self.logger.log("%s Client is starting up" % (self.prefix)) +self.logger.log("%s Client is starting up" % self.prefix) try: start_time = time.time() self.is_running = True self.logger.log('%s Connecting to host:%s, port:%d, size:%d, count:%d' % - (self.prefix, self.host, self.port, self.size, self.count)) +(self.prefix, self.host, self.port, self.size, self.count)) total_sent = 0 total_rcvd = 0 @@ -115,7 +117,7 @@ class TcpEchoClient(): #port: 3 #index: 6 #offset into message: 0 -CONTENT_CHUNK_SIZE = 50 # Content repeats after chunks this big - used by echo server, too +CONTENT_CHUNK_SIZE = 50 # Content repeats after chunks this big - used by echo server, too for idx in range(self.count): body_msg = "" padchar = "abcdefghijklmnopqrstuvwxyz@#$%"[idx % 30] @@ -128,8 +130,8 @@ class TcpEchoClient(): body_msg = body_msg[:self.size] payload_out.append(bytearray(body_msg.encode())) # incoming payloads -payload_in = [] -in_list_idx = 0 # current _in array being received +payload_in = [] +in_list_idx = 0 # current _in array being received for i in range(self.count): payload_in.append(bytearray()) @@ -145,13 +147,13 @@ class TcpEchoClient(): selectors.EVENT_READ | selectors.EVENT_WRITE) # event loop -time.sleep(0.1) # DISPATCH-1820 investigation +time.sleep(0.1) # DISPATCH-1820 investigation while self.keep_running: if self.timeout > 0.0: elapsed = time.time() - start_time if elapsed > self.timeout: self.exit_status = "%s Exiting due to timeout. Total sent= %d, total rcvd= %d" % \ -(self.prefix, total_sent, total_rcvd
[qpid-dispatch] branch dev-protocol-adaptors-2 updated: DISPATCH-1846: Improve tcp adaptor basic connectivity test
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push: new 40dc885 DISPATCH-1846: Improve tcp adaptor basic connectivity test 40dc885 is described below commit 40dc885914a2fbac47f57a806022cd84d85148cc Author: Chuck Rolke AuthorDate: Wed Nov 18 13:25:36 2020 -0500 DISPATCH-1846: Improve tcp adaptor basic connectivity test Use every tcp listener to connect to every echo server in the router network. This test hangs at test_01_tcp_INTA_INTC. This sounds like issue https://issues.apache.org/jira/browse/DISPATCH-1829 "multi-hop TCP does not seem to work" --- tests/system_tests_tcp_adaptor.py | 145 ++ 1 file changed, 38 insertions(+), 107 deletions(-) diff --git a/tests/system_tests_tcp_adaptor.py b/tests/system_tests_tcp_adaptor.py index 173626d..3c2b9c3 100644 --- a/tests/system_tests_tcp_adaptor.py +++ b/tests/system_tests_tcp_adaptor.py @@ -335,6 +335,17 @@ class TcpAdaptor(TestCase): cls.EC1 = cls.routers[7] cls.EC2 = cls.routers[8] +cls.router_dict = {} +cls.router_dict['INTA'] = cls.INTA +cls.router_dict['INTB'] = cls.INTB +cls.router_dict['INTC'] = cls.INTC +cls.router_dict['EA1'] = cls.EA1 +cls.router_dict['EA2'] = cls.EA2 +cls.router_dict['EB1'] = cls.EB1 +cls.router_dict['EB2'] = cls.EB2 +cls.router_dict['EC1'] = cls.EC1 +cls.router_dict['EC2'] = cls.EC2 + cls.logger.log("TCP_TEST INTA waiting for connection to INTB") cls.INTA.wait_router_connected('INTB') cls.logger.log("TCP_TEST INTB waiting for connection to INTA") @@ -566,115 +577,23 @@ class TcpAdaptor(TestCase): # A series of 1-byte messsages, one at a time, to prove general connectivity # @SkipIfNeeded(DISABLE_SELECTOR_TESTS, DISABLE_SELECTOR_REASON) -def test_01a_tcp_INTA_INTA(self): +def test_01_tcp_basic_connectivity(self): """ -Connectivity - INTA only +Echo a series of 1-byte messages, one at a time, to prove general connectivity. +Every listener is tried. Proves every router can forward to servers on +every other router. """ -name = "test_01_tcp_INTA_INTA" -self.logger.log("TCP_TEST Start %s" % name) -pairs = [self.EchoPair(self.INTA, self.INTA)] -result = self.do_tcp_echo_n_routers(name, pairs) -if result is not None: -print(result) -sys.stdout.flush() -assert result is None, "TCP_TEST Stop %s FAIL: %s" % (name, result) -self.logger.log("TCP_TEST Stop %s SUCCESS" % name) - -@SkipIfNeeded(DISABLE_SELECTOR_TESTS, DISABLE_SELECTOR_REASON) -def test_01b_tcp_INTB_INTB(self): -name = "test_01b_tcp_INTB_INTB" -self.logger.log("TCP_TEST Start %s" % name) -pairs = [self.EchoPair(self.INTB, self.INTB)] -result = self.do_tcp_echo_n_routers(name, pairs) -if result is not None: -print(result) -sys.stdout.flush() -assert result is None, "TCP_TEST Stop %s FAIL: %s" % (name, result) -self.logger.log("TCP_TEST Stop %s SUCCESS" % name) - -@SkipIfNeeded(DISABLE_SELECTOR_TESTS, DISABLE_SELECTOR_REASON) -def test_01c_tcp_INTC_INTC(self): -name = "test_01c_tcp_INTC_INTC" -self.logger.log("TCP_TEST Start %s" % name) -pairs = [self.EchoPair(self.INTC, self.INTC)] -result = self.do_tcp_echo_n_routers(name, pairs) -if result is not None: -print(result) -sys.stdout.flush() -assert result is None, "TCP_TEST Stop %s FAIL: %s" % (name, result) -self.logger.log("TCP_TEST Stop %s SUCCESS" % name) - -@SkipIfNeeded(DISABLE_SELECTOR_TESTS, DISABLE_SELECTOR_REASON) -def test_03_tcp_INTA_INTB(self): -name = "test_03_tcp_INTA_INTB" -self.logger.log("TCP_TEST Start %s" % name) -pairs = [self.EchoPair(self.INTA, self.INTB)] -result = self.do_tcp_echo_n_routers(name, pairs) -if result is not None: -print(result) -sys.stdout.flush() -assert result is None, "TCP_TEST Stop %s FAIL: %s" % (name, result) -self.logger.log("TCP_TEST Stop %s SUCCESS" % name) - -@SkipIfNeeded(DISABLE_SELECTOR_TESTS, DISABLE_SELECTOR_REASON) -def test_04_tcp_EA1_EA1(self): -name = "test_04_tcp_EA1_EA1"
[qpid-dispatch] branch dev-protocol-adaptors-2 updated: DISPATCH-1831: Extend TCP adaptor test interior router backbone
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push: new 1e2bfed DISPATCH-1831: Extend TCP adaptor test interior router backbone 1e2bfed is described below commit 1e2bfed3639d96fe89fa16a7d01959dba65fc7fc Author: Chuck Rolke AuthorDate: Wed Nov 18 11:52:32 2020 -0500 DISPATCH-1831: Extend TCP adaptor test interior router backbone --- tests/system_tests_tcp_adaptor.py | 103 ++ 1 file changed, 70 insertions(+), 33 deletions(-) diff --git a/tests/system_tests_tcp_adaptor.py b/tests/system_tests_tcp_adaptor.py index db82d46..173626d 100644 --- a/tests/system_tests_tcp_adaptor.py +++ b/tests/system_tests_tcp_adaptor.py @@ -107,36 +107,41 @@ class Logger(): class TcpAdaptor(TestCase): """ -4 edge routers connected via 2 interior routers. -6 echo servers are connected via tcpConnector, one to each router. -Each router has 7 listeners, one for each server and +6 edge routers connected via 3 interior routers. +9 echo servers are connected via tcpConnector, one to each router. +Each router has 10 listeners, one for each server and another for which there is no server. """ -# +---++-++-++---+ -# | EA1 |<-->| INTA |<==>| INTB |<-->| EB1 | -# +---+| || |+---+ -# +---+| || |+---+ -# | EA2 |<-->| || |<-->| EB2 | -# +---++-++-++---+ +# +---++-++-++-++---+ +# | EA1 |<-->| INTA |<==>| INTB |<==>| INTC |<-->| EC1 | +# +---+| || || |+---+ +# +---+| || || |+---+ +# | EA2 |<-->| || || |<-->| EC2 | +# +---++-++-++-++---+ +#^ ^ +#| | +# +---+ +---+ +# | EB1 | | EB2 | +# +---+ +---+ # # Each router tcp-connects to a like-named echo server. # Each router has tcp-listeners for every echo server # -# ++ ++ ++ ++ ++ ++ -# +--|tcp |-|tcp |-|tcp |-|tcp |-|tcp |-|tcp |--+ -# | |lsnr| |lsnr| |lsnr| |lsnr| |lsnr| |lsnr| | -# | |EA1 | |EA2 | |INTA| |INTB| |EB1 | |EB2 | | -# | ++ ++ ++ ++ ++ ++ | -# | +-+ +--+ -# | Router | tcp | | echo | -# | EA1 |connector|->|server| -# | +-+ | EA1 | -# | | +--+ -# +-+ +# ++ ++ ++ ++ ++ ++ ++ ++ ++ +# +--|tcp |-|tcp |-|tcp |-|tcp |-|tcp |-|tcp |-|tcp |-|tcp |-|tcp |--+ +# | |lsnr| |lsnr| |lsnr| |lsnr| |lsnr| |lsnr| |lsnr| |lsnr| |lsnr| | +# | |EA1 | |EA2 | |INTA| |EB1 | |EB2 | |INTB| |EC1 | |EC2 | |INTC| | +# | ++ ++ ++ ++ ++ ++ ++ ++ ++ | +# | +-+ +--+ +# | Router | tcp | | echo | +# | EA1 |connector|->|server| +# | +-+ | EA1 | +# | | +--+ +# +--+ # # Allocate routers in this order -router_order = ['INTA', 'INTB', 'EA1', 'EA2', 'EB1', 'EB2'] +router_order = ['INTA', 'INTB', 'INTC', 'EA1', 'EA2', 'EB1', 'EB2', 'EC1', 'EC2'] # List indexed in router_order # First listener in each router is normal AMQP for test setup and mgmt. @@ -228,9 +233,11 @@ class TcpAdaptor(TestCase): cls.nodest_listener_ports[rtr] = cls.tester.get_port() cls.http_liste
[qpid-dispatch] branch dev-protocol-adaptors-2 updated: DISPATCH-1826: Fix tcp adaptor stall on large messages
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push: new f926a6c DISPATCH-1826: Fix tcp adaptor stall on large messages f926a6c is described below commit f926a6c1b9b61f08a2fdd7c7436496bd771516d5 Author: Chuck Rolke AuthorDate: Wed Nov 18 10:40:35 2020 -0500 DISPATCH-1826: Fix tcp adaptor stall on large messages --- src/adaptors/tcp_adaptor.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c index 742a42c..7476808 100644 --- a/src/adaptors/tcp_adaptor.c +++ b/src/adaptors/tcp_adaptor.c @@ -512,6 +512,7 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void case PN_RAW_CONNECTION_NEED_WRITE_BUFFERS: { qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Need write buffers", conn->conn_id); while (qdr_connection_process(conn->conn)) {} +handle_outgoing(conn); break; } case PN_RAW_CONNECTION_NEED_READ_BUFFERS: { - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch master updated: DISPATCH-1786: Fallback dest random failures: defer starting sender
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/master by this push: new fd0d8dd DISPATCH-1786: Fallback dest random failures: defer starting sender fd0d8dd is described below commit fd0d8dd77b5101193fc35d9c459e1bd1b9754034 Author: Chuck Rolke AuthorDate: Fri Nov 13 15:18:57 2020 -0500 DISPATCH-1786: Fallback dest random failures: defer starting sender Don't create sender before the fallback receiver has opened. This fix is not proven to fix intermittent failures. However, it removes a case where fallback would not work. --- tests/system_tests_fallback_dest.py | 9 + 1 file changed, 9 insertions(+) diff --git a/tests/system_tests_fallback_dest.py b/tests/system_tests_fallback_dest.py index c70c226..d55d2ef 100644 --- a/tests/system_tests_fallback_dest.py +++ b/tests/system_tests_fallback_dest.py @@ -583,6 +583,8 @@ class SwitchoverTest(MessagingHandler): self.sender_conn= None self.primary_conn = None self.fallback_conn = None +self.primary_open = False +self.fallback_open = False self.error = None self.n_tx = 0 self.n_rx = 0 @@ -615,7 +617,14 @@ class SwitchoverTest(MessagingHandler): self.fallback_receiver.source.capabilities.put_object(symbol("qd.fallback")) def on_link_opened(self, event): +receiver_event = False if event.receiver == self.primary_receiver: +self.primary_open = True +receiver_event = True +if event.receiver == self.fallback_receiver: +self.fallback_open = True +receiver_event = True +if receiver_event and self.primary_open and self.fallback_open: self.sender = event.container.create_sender(self.sender_conn, self.addr, name=(self.addr + "_sender")) def on_link_closed(self, event): - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch dev-protocol-adaptors-2 updated: DISPATCH-1830: fix inadvertent file rename that hides ECHO_CLIENT logs from Scraper
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push: new c430b53 DISPATCH-1830: fix inadvertent file rename that hides ECHO_CLIENT logs from Scraper c430b53 is described below commit c430b537d2f4c642545c0ee3aa342bc8622e0c2a Author: Chuck Rolke AuthorDate: Thu Nov 12 07:40:45 2020 -0500 DISPATCH-1830: fix inadvertent file rename that hides ECHO_CLIENT logs from Scraper --- tests/system_tests_tcp_adaptor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system_tests_tcp_adaptor.py b/tests/system_tests_tcp_adaptor.py index d4fc81e..db82d46 100644 --- a/tests/system_tests_tcp_adaptor.py +++ b/tests/system_tests_tcp_adaptor.py @@ -389,7 +389,7 @@ class TcpAdaptor(TestCase): self.client_logger = Logger(title=self.client_prefix, print_to_console=self.print_client_logs, save_for_dump=False, - ofilename="../setUpClass/TcpAdaptor_runner_%s.log" % self.name) + ofilename="../setUpClass/TcpAdaptor_echo_client_%s.log" % self.name) try: self.e_client = TcpEchoClient(prefix=self.client_prefix, - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch dev-protocol-adaptors-2 updated: DISPATCH-1830: Fix TCP test glitches
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push: new 62a017a DISPATCH-1830: Fix TCP test glitches 62a017a is described below commit 62a017a9a8b94cb30d3909c3afa6bbfb59bc2b87 Author: Chuck Rolke AuthorDate: Wed Nov 11 15:46:29 2020 -0500 DISPATCH-1830: Fix TCP test glitches * Enable all tests * Fix misnamed tests * Don't set a timeout on echo servers * Fix printing success even when test failed * Don't log redundant 'test stop' messages --- tests/system_tests_tcp_adaptor.py | 24 +++- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/tests/system_tests_tcp_adaptor.py b/tests/system_tests_tcp_adaptor.py index ac98d1b..d4fc81e 100644 --- a/tests/system_tests_tcp_adaptor.py +++ b/tests/system_tests_tcp_adaptor.py @@ -335,7 +335,6 @@ class TcpAdaptor(TestCase): cls.logger.log("TCP_TEST Launching echo server '%s'" % server_prefix) server = TcpEchoServer(prefix=server_prefix, port=cls.tcp_server_listener_ports[rtr], - timeout=TIMEOUT, logger=server_logger) assert server.is_running cls.echo_servers[rtr] = server @@ -520,7 +519,7 @@ class TcpAdaptor(TestCase): self.logger.log("TCP_TEST %s Client %s exited normally" % (test_name, runner.name)) runner.client_final = True -if complete: +if complete and result is None: self.logger.log("TCP_TEST %s SUCCESS" % test_name) break @@ -536,7 +535,6 @@ class TcpAdaptor(TestCase): result = "TCP_TEST %s failed. Exception: %s" % \ (test_name, traceback.format_exc()) -self.logger.log("TCP_TEST Stop %s do_tcp_echo_n_routers" % test_name) return result # @@ -570,7 +568,7 @@ class TcpAdaptor(TestCase): self.logger.log("TCP_TEST Stop %s SUCCESS" % name) @SkipIfNeeded(DISABLE_SELECTOR_TESTS, DISABLE_SELECTOR_REASON) -def xtest_03_tcp_INTA_INTB(self): +def test_03_tcp_INTA_INTB(self): name = "test_03_tcp_INTA_INTB" self.logger.log("TCP_TEST Start %s" % name) pairs = [self.EchoPair(self.INTA, self.INTB)] @@ -582,7 +580,7 @@ class TcpAdaptor(TestCase): self.logger.log("TCP_TEST Stop %s SUCCESS" % name) @SkipIfNeeded(DISABLE_SELECTOR_TESTS, DISABLE_SELECTOR_REASON) -def xtest_04_tcp_EA1_EA1(self): +def test_04_tcp_EA1_EA1(self): name = "test_04_tcp_EA1_EA1" self.logger.log("TCP_TEST Start %s" % name) pairs = [self.EchoPair(self.EA1, self.EA1)] @@ -594,7 +592,7 @@ class TcpAdaptor(TestCase): self.logger.log("TCP_TEST Stop %s SUCCESS" % name) @SkipIfNeeded(DISABLE_SELECTOR_TESTS, DISABLE_SELECTOR_REASON) -def xtest_05_tcp_EA1_EA2(self): +def test_05_tcp_EA1_EA2(self): name = "test_05_tcp_EA1_EA2" self.logger.log("TCP_TEST Start %s" % name) pairs = [self.EchoPair(self.EA1, self.EA2)] @@ -606,8 +604,8 @@ class TcpAdaptor(TestCase): self.logger.log("TCP_TEST Stop %s SUCCESS" % name) @SkipIfNeeded(DISABLE_SELECTOR_TESTS, DISABLE_SELECTOR_REASON) -def xtest_06_tcp_EA1_INTA(self): -name = "xtest_06_tcp_EA1_INTA" +def test_06_tcp_EA1_INTA(self): +name = "test_06_tcp_EA1_INTA" self.logger.log("TCP_TEST Start %s" % name) pairs = [self.EchoPair(self.EA1, self.INTA)] result = self.do_tcp_echo_n_routers(name, pairs) @@ -618,8 +616,8 @@ class TcpAdaptor(TestCase): self.logger.log("TCP_TEST Stop %s SUCCESS" % name) @SkipIfNeeded(DISABLE_SELECTOR_TESTS, DISABLE_SELECTOR_REASON) -def xtest_07_tcp_EA1_INTB(self): -name = "xtest_07_tcp_EA1_INTB" +def test_07_tcp_EA1_INTB(self): +name = "test_07_tcp_EA1_INTB" self.logger.log("TCP_TEST Start %s" % name) pairs = [self.EchoPair(self.EA1, self.INTB)] result = self.do_tcp_echo_n_routers(name, pairs) @@ -630,8 +628,8 @@ class TcpAdaptor(TestCase): self.logger.log("TCP_TEST Stop %s SUCCESS" % name) @SkipIfNeeded(DISABLE_SELECTOR_TESTS, DISABLE_SELECTOR_REASON) -def xtest_08_tcp_EA1_EB1(self): -name = "xtest_08_tcp_EA1_EB1" +def test
[qpid-dispatch] branch dev-protocol-adaptors-2 updated: DISPATCH-1830: TCP test allows concurrent echo session
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push: new 425fab4 DISPATCH-1830: TCP test allows concurrent echo session 425fab4 is described below commit 425fab4272d3750d6b494bfc4597a19d2b4997f6 Author: Chuck Rolke AuthorDate: Wed Nov 11 11:08:09 2020 -0500 DISPATCH-1830: TCP test allows concurrent echo session * In tearDownClass: call super to stop routers * Include only simple tests that pass (on developer's system) * More complex cases are commented out in the test definition with "def xtest_" instead of "def test_" --- tests/system_tests_tcp_adaptor.py | 411 -- 1 file changed, 305 insertions(+), 106 deletions(-) diff --git a/tests/system_tests_tcp_adaptor.py b/tests/system_tests_tcp_adaptor.py index 7d5cc6a..ac98d1b 100644 --- a/tests/system_tests_tcp_adaptor.py +++ b/tests/system_tests_tcp_adaptor.py @@ -120,7 +120,7 @@ class TcpAdaptor(TestCase): # +---++-++-++---+ # # Each router tcp-connects to a like-named echo server. -# Each router has tcp-listeners for ever echo server +# Each router has tcp-listeners for every echo server # # ++ ++ ++ ++ ++ ++ # +--|tcp |-|tcp |-|tcp |-|tcp |-|tcp |-|tcp |--+ @@ -228,9 +228,9 @@ class TcpAdaptor(TestCase): cls.nodest_listener_ports[rtr] = cls.tester.get_port() cls.http_listener_ports[rtr] = cls.tester.get_port() -inter_router_port = cls.tester.get_port() -cls.INTA_edge_port = cls.tester.get_port() -cls.INTB_edge_port = cls.tester.get_port() +inter_router_port = cls.tester.get_port() +cls.INTA_edge_port = cls.tester.get_port() +cls.INTB_edge_port = cls.tester.get_port() cls.logger = Logger(title="TcpAdaptor-testClass", print_to_console=True, @@ -265,7 +265,7 @@ class TcpAdaptor(TestCase): for line in p_out: o_file.write("set %s\n" % line) -# Write a script to run scraper on this test's log file +# Write a script to run scraper on this test's log files scraper_abspath = os.path.join(os.environ.get('BUILD_DIR'), 'tests', 'scraper', 'scraper.py') logs_dir = os.path.abspath("../setUpClass") main_log = "TcpAdaptor.log" @@ -273,7 +273,7 @@ class TcpAdaptor(TestCase): big_test_log = "TcpAdaptor_all.log" int_logs = "I*.log" edge_logs= "E*.log" -log_modules_spec = "--log-modules TCP_ADAPTOR,TCP_TEST,ECHO_SERVER,ECHO_CLIENT" +log_modules_spec = "--log-modules TCP_ADAPTOR,TCP_TEST,ECHO_SERVER,ECHO_CLIENT" html_output = "TcpAdaptor.html" with open("../setUpClass/TcpAdaptor-run-scraper.sh", 'w') as o_file: @@ -319,7 +319,7 @@ class TcpAdaptor(TestCase): cls.logger.log("TCP_TEST INTB waiting for connection to INTA") cls.INTB.wait_router_connected('INTA') -# define logging +# define logging levels cls.print_logs_server = True cls.print_logs_client = True @@ -340,10 +340,6 @@ class TcpAdaptor(TestCase): assert server.is_running cls.echo_servers[rtr] = server -# sleep so user can verify the mess -# cls.logger.log("Verify the setup while I sleep 5 min: firefox http://localhost:%d"; % cls.http_listener_ports["INTA"]) -# time.sleep(300) - @classmethod def tearDownClass(cls): # stop echo servers @@ -351,113 +347,213 @@ class TcpAdaptor(TestCase): server = cls.echo_servers[rtr] cls.logger.log("TCP_TEST Stopping echo server %s" % rtr) server.wait() +super(TcpAdaptor, cls).tearDownClass() -def do_test_echo(self, test_name, logger, client, server, size, count, print_client_logs): -# Run echo client. Return true if it works. - -# Each router has a listener for the echo server attached to every router -listener_port = self.tcp_client_listener_ports[client][server] +# +# Test concurrent clients +# +class EchoClientRunner(): +""" +Launch an echo client upon construction. +Provide poll interface for checking done/error. +Provide wait/join to shut down. +""" +def __init__(self, test_name, client_n, logger, client, server, size, count, print_client_logs): +"&q
[qpid-dispatch] branch dev-protocol-adaptors-2 updated: DISPATCH-1807: Improve TCP Adaptor self test
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push: new 3a80d73 DISPATCH-1807: Improve TCP Adaptor self test 3a80d73 is described below commit 3a80d73d8866d24f4b1790c99295d3f20793afba Author: Chuck Rolke AuthorDate: Mon Nov 9 18:30:13 2020 -0500 DISPATCH-1807: Improve TCP Adaptor self test * Add a logger that writes component logs to separate log files. * Emit a script that scrapes the logs and produces html result. The current test setup sends one one-byte message from the listening router to the router that is hosting the echo server. This is adequate until all router listeners can forward to any echo server. Later the test may add larger message sizes and concurrent activity. --- tests/system_tests_tcp_adaptor.py | 112 ++ 1 file changed, 90 insertions(+), 22 deletions(-) diff --git a/tests/system_tests_tcp_adaptor.py b/tests/system_tests_tcp_adaptor.py index d8019ef..7d5cc6a 100644 --- a/tests/system_tests_tcp_adaptor.py +++ b/tests/system_tests_tcp_adaptor.py @@ -23,13 +23,14 @@ from __future__ import absolute_import from __future__ import print_function import os +import sys import time import traceback from threading import Event from threading import Timer from system_test import TestCase, Qdrouterd, main_module, TIMEOUT -from system_test import Logger +from system_test import Timestamp from system_test import QdManager from system_test import unittest from system_test import DIR @@ -37,6 +38,13 @@ from system_test import SkipIfNeeded from qpid_dispatch.management.client import Node from subprocess import PIPE, STDOUT +# Tests in this file are organized by classes that inherit TestCase. +# The first instance is TcpAdaptor(TestCase). +# The tests emit files that are named starting with 'TcpAdaptor'. This includes +# logs and shell scripts. +# Subsequent TestCase subclasses must follow this pattern and emit files named +# with the test class name at the beginning of the emitted files. + try: from TCP_echo_client import TcpEchoClient from TCP_echo_server import TcpEchoServer @@ -55,6 +63,47 @@ except ImportError: DISABLE_SELECTOR_TESTS = True DISABLE_SELECTOR_REASON = "Python selectors module is not available on this platform." +class Logger(): +""" +Record event logs as existing Logger. Also add: +* ofile - optional file opened in 'append' mode to which each log line is written +TODO: Replace system_test Logger with this after merging dev-protocol-adaptors branch +""" +def __init__(self, + title="Logger", + print_to_console=False, + save_for_dump=True, + ofilename=None): +self.title = title +self.print_to_console = print_to_console +self.save_for_dump = save_for_dump +self.logs = [] +self.ofilename = ofilename + +def log(self, msg): +ts = Timestamp() +if self.save_for_dump: +self.logs.append( (ts, msg) ) +if self.print_to_console: +print("%s %s" % (ts, msg)) +sys.stdout.flush() +if self.ofilename is not None: +with open(self.ofilename, 'a') as f_out: +f_out.write("%s %s\n" % (ts, msg)) +f_out.flush() + +def dump(self): +print(self) +sys.stdout.flush() + +def __str__(self): +lines = [] +lines.append(self.title) +for ts, msg in self.logs: +lines.append("%s %s" % (ts, msg)) +res = str('\n'.join(lines)) +return res + class TcpAdaptor(TestCase): """ @@ -185,17 +234,10 @@ class TcpAdaptor(TestCase): cls.logger = Logger(title="TcpAdaptor-testClass", print_to_console=True, -save_for_dump=False) +save_for_dump=False, +ofilename='../setUpClass/TcpAdaptor.log') # Write a dummy log line for scraper. -# With this the test log can be identified and consumed in scraper. -# 1. Capture test log output to file 'test.log'. -# 2. Edit away prefix (e.g. '71: ') so each line starts with time of day. -# 3. Edit away ctest lines and fragments that have no time of day. -# 4. Run scraper: -# 'scraper -lm TCP_ADAPTOR,TCP_TEST,ECHO_SERVER,ECHO_CLIENT -f I*.log E*.log test.log > test.html' -# 5. Profit: -# 'firefox test.html&
[qpid-dispatch] branch dev-protocol-adaptors-2 updated: DISPACH-1807: TCP self test integration with scraper
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push: new 61fe9a8 DISPACH-1807: TCP self test integration with scraper 61fe9a8 is described below commit 61fe9a8773eae07ac2e4914be56a17fbe3d22f51 Author: Chuck Rolke AuthorDate: Fri Nov 6 08:56:59 2020 -0500 DISPACH-1807: TCP self test integration with scraper Add hook and instructions for merging TCP adaptor self test log with router logs in scraper. There is still a manual step that will be unnecessary in upcoming commits. --- tests/system_tests_tcp_adaptor.py | 11 +++ 1 file changed, 11 insertions(+) diff --git a/tests/system_tests_tcp_adaptor.py b/tests/system_tests_tcp_adaptor.py index 52c945a..d8019ef 100644 --- a/tests/system_tests_tcp_adaptor.py +++ b/tests/system_tests_tcp_adaptor.py @@ -187,6 +187,17 @@ class TcpAdaptor(TestCase): print_to_console=True, save_for_dump=False) +# Write a dummy log line for scraper. +# With this the test log can be identified and consumed in scraper. +# 1. Capture test log output to file 'test.log'. +# 2. Edit away prefix (e.g. '71: ') so each line starts with time of day. +# 3. Edit away ctest lines and fragments that have no time of day. +# 4. Run scraper: +# 'scraper -lm TCP_ADAPTOR,TCP_TEST,ECHO_SERVER,ECHO_CLIENT -f I*.log E*.log test.log > test.html' +# 5. Profit: +# 'firefox test.html' +cls.logger.log("SERVER (info) Container Name: TCP_TEST") + # Create a scoreboard for the ports p_out = [] for rtr in cls.router_order: - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch dev-protocol-adaptors-2 updated: DISPATCH-1807: Rewrite TCP adaptor self test
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push: new 66a4d9f DISPATCH-1807: Rewrite TCP adaptor self test 66a4d9f is described below commit 66a4d9ffe7126bc93df543458d33569dfe2c5b2d Author: Chuck Rolke AuthorDate: Thu Nov 5 12:18:20 2020 -0500 DISPATCH-1807: Rewrite TCP adaptor self test Create multi-router network and an echo server for each router. Then launch echo clients to chosen routers targeting the echo server on any other router. --- tests/system_tests_tcp_adaptor.py | 420 -- 1 file changed, 314 insertions(+), 106 deletions(-) diff --git a/tests/system_tests_tcp_adaptor.py b/tests/system_tests_tcp_adaptor.py index 40f667b..52c945a 100644 --- a/tests/system_tests_tcp_adaptor.py +++ b/tests/system_tests_tcp_adaptor.py @@ -33,6 +33,7 @@ from system_test import Logger from system_test import QdManager from system_test import unittest from system_test import DIR +from system_test import SkipIfNeeded from qpid_dispatch.management.client import Node from subprocess import PIPE, STDOUT @@ -45,75 +46,256 @@ except ImportError: class TCP_echo_server(object): pass -class TcpAdaptorOneRouterEcho(TestCase): + +DISABLE_SELECTOR_TESTS = False +DISABLE_SELECTOR_REASON = '' +try: +import selectors +except ImportError: +DISABLE_SELECTOR_TESTS = True +DISABLE_SELECTOR_REASON = "Python selectors module is not available on this platform." + + +class TcpAdaptor(TestCase): """ -Run echo tests through a stand-alone router +4 edge routers connected via 2 interior routers. +6 echo servers are connected via tcpConnector, one to each router. +Each router has 7 listeners, one for each server and +another for which there is no server. """ -amqp_listener_port = None -tcp_client_listener_port = None -tcp_server_listener_port = None +# +---++-++-++---+ +# | EA1 |<-->| INTA |<==>| INTB |<-->| EB1 | +# +---+| || |+---+ +# +---+| || |+---+ +# | EA2 |<-->| || |<-->| EB2 | +# +---++-++-++---+ +# +# Each router tcp-connects to a like-named echo server. +# Each router has tcp-listeners for ever echo server +# +# ++ ++ ++ ++ ++ ++ +# +--|tcp |-|tcp |-|tcp |-|tcp |-|tcp |-|tcp |--+ +# | |lsnr| |lsnr| |lsnr| |lsnr| |lsnr| |lsnr| | +# | |EA1 | |EA2 | |INTA| |INTB| |EB1 | |EB2 | | +# | ++ ++ ++ ++ ++ ++ | +# | +-+ +--+ +# | Router | tcp | | echo | +# | EA1 |connector|->|server| +# | +-+ | EA1 | +# | | +--+ +# +-+ +# + +# Allocate routers in this order +router_order = ['INTA', 'INTB', 'EA1', 'EA2', 'EB1', 'EB2'] + +# List indexed in router_order +# First listener in each router is normal AMQP for test setup and mgmt. +amqp_listener_ports = {} + +# Each router listens for TCP where the tcp-address is the router name. +# Each router has N listeners, one for the echo server connected to each router. +tcp_client_listener_ports = {} + +# Each router connects to an echo server +tcp_server_listener_ports = {} + +# Each router has a TCP listener that has no associated server +nodest_listener_ports = {} + +# Each router has a console listener +http_listener_ports = {} + +# local timeout in seconds to wait for one echo client to finish +echo_timeout = 30 -echo_timeout = 30 # local timeout to wait for one echo client to finish +# TCP siteId for listeners and connectors +site = "mySite" @classmethod def setUpClass(cls): """Start a router""" -super(TcpAdaptorOneRouterEcho, cls).setUpClass() +super(TcpAdaptor, cls).setUpClass() -def router(name, mode, l_amqp, l_tcp_client, l_tcp_server, addr, site, extra=None): +if DISABLE_SELECTOR_TESTS: +return + +def router(name, mode, connection, extra=None): +""" +Launch a router through the system_test framework. +For each route
[qpid-dispatch] branch dev-protocol-adaptors-2 updated: DISPATCH-1825: Skip TCP self test if selectors module is absent
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push: new ae262ac DISPATCH-1825: Skip TCP self test if selectors module is absent ae262ac is described below commit ae262acb8c7e800b5870f35246fd31b691d44a05 Author: Chuck Rolke AuthorDate: Tue Nov 3 15:41:29 2020 -0500 DISPATCH-1825: Skip TCP self test if selectors module is absent Create dummy echo client and server modules if import fails due to lack of 'selectors'. Skip tests if import of selectors fails --- tests/system_tests_tcp_adaptor.py | 21 + 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/tests/system_tests_tcp_adaptor.py b/tests/system_tests_tcp_adaptor.py index c2b1dd1..40f667b 100644 --- a/tests/system_tests_tcp_adaptor.py +++ b/tests/system_tests_tcp_adaptor.py @@ -32,14 +32,20 @@ from system_test import TestCase, Qdrouterd, main_module, TIMEOUT from system_test import Logger from system_test import QdManager from system_test import unittest -from system_test import Process from system_test import DIR from qpid_dispatch.management.client import Node from subprocess import PIPE, STDOUT -from TCP_echo_client import TcpEchoClient -from TCP_echo_server import TcpEchoServer -class TcpAdaptorOneRouterEcho(TestCase, Process): +try: +from TCP_echo_client import TcpEchoClient +from TCP_echo_server import TcpEchoServer +except ImportError: +class TCP_echo_client(object): +pass +class TCP_echo_server(object): +pass + +class TcpAdaptorOneRouterEcho(TestCase): """ Run echo tests through a stand-alone router """ @@ -150,6 +156,13 @@ class TcpAdaptorOneRouterEcho(TestCase, Process): Run many echo clients. :return: """ +# run test only if selectors module is available +try: +import selectors +except ImportError: +# server and client modules require selectors +self.skipTest("Python selectors module is not available on this platform.") + # define logging print_logs_server = True print_logs_client = True - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch dev-protocol-adaptors-2 updated: DISPATCH-1820: Fix python tox errors
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push: new ca1c17f DISPATCH-1820: Fix python tox errors ca1c17f is described below commit ca1c17f8a4aebe568e285f4135f94f20d3f047bb Author: Chuck Rolke AuthorDate: Tue Nov 3 15:32:32 2020 -0500 DISPATCH-1820: Fix python tox errors --- tests/TCP_echo_server.py | 6 ++ tests/system_tests_tcp_adaptor.py | 5 ++--- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/tests/TCP_echo_server.py b/tests/TCP_echo_server.py index d8a23d4..e74c780 100755 --- a/tests/TCP_echo_server.py +++ b/tests/TCP_echo_server.py @@ -132,9 +132,7 @@ class TcpEchoServer(): except Exception as exc: self.error = ('%s Opening listen socket %s:%d exception: %s' % (self.prefix, self.HOST, self.port, traceback.format_exc())) -logger.log(self.error) -sel.unregister(sock) -sock.close() +self.logger.log(self.error) return 1 # set up selector @@ -190,7 +188,7 @@ class TcpEchoServer(): if mask & selectors.EVENT_READ: try: recv_data = sock.recv(1024) -except ConnectionResetError as exc: +except Exception as exc: logger.log('%s Connection to %s:%d closed by peer' % (self.prefix, data.addr[0], data.addr[1])) sel.unregister(sock) diff --git a/tests/system_tests_tcp_adaptor.py b/tests/system_tests_tcp_adaptor.py index e290cec..c2b1dd1 100644 --- a/tests/system_tests_tcp_adaptor.py +++ b/tests/system_tests_tcp_adaptor.py @@ -39,7 +39,6 @@ from subprocess import PIPE, STDOUT from TCP_echo_client import TcpEchoClient from TCP_echo_server import TcpEchoServer - class TcpAdaptorOneRouterEcho(TestCase, Process): """ Run echo tests through a stand-alone router @@ -173,10 +172,10 @@ class TcpAdaptorOneRouterEcho(TestCase, Process): for count in [1, 10]: # make sure server is still running if server.error is not None: -logger.log("%s Server stopped with error: %s" % (name, server.error)) +self.logger.log("%s Server stopped with error: %s" % (self.name, server.error)) result = False if server.exit_status is not None: -logger.log("%s Server stopped with status: %s" % (name, server.exit_status)) +self.logger.log("%s Server stopped with status: %s" % (self.name, server.exit_status)) result = False # run another test client if result: - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch dev-protocol-adaptors-2 updated: DISPATCH-1820: Improve logging to investigate TCP test hangs
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push: new a5cefb5 DISPATCH-1820: Improve logging to investigate TCP test hangs a5cefb5 is described below commit a5cefb5e34f4a3fa9c763f3e3a29b187a2e85472 Author: Chuck Rolke AuthorDate: Tue Nov 3 14:41:52 2020 -0500 DISPATCH-1820: Improve logging to investigate TCP test hangs --- tests/TCP_echo_client.py | 1 + tests/TCP_echo_server.py | 28 tests/system_tests_tcp_adaptor.py | 67 +-- 3 files changed, 74 insertions(+), 22 deletions(-) diff --git a/tests/TCP_echo_client.py b/tests/TCP_echo_client.py index ae5364c..8465ad5 100755 --- a/tests/TCP_echo_client.py +++ b/tests/TCP_echo_client.py @@ -94,6 +94,7 @@ class TcpEchoClient(): self._thread.start() def run(self): +self.logger.log("%s Client is starting up" % (self.prefix)) try: start_time = time.time() self.is_running = True diff --git a/tests/TCP_echo_server.py b/tests/TCP_echo_server.py index 34f72bb..d8a23d4 100755 --- a/tests/TCP_echo_server.py +++ b/tests/TCP_echo_server.py @@ -123,11 +123,19 @@ class TcpEchoServer(): total_echoed = 0 # set up listening socket -self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) -self.sock.bind((self.HOST, self.port)) -self.sock.listen() -self.sock.setblocking(False) -self.logger.log('%s Listening on host:%s, port:%d' % (self.prefix, self.HOST, self.port)) +try: +self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +self.sock.bind((self.HOST, self.port)) +self.sock.listen() +self.sock.setblocking(False) +self.logger.log('%s Listening on host:%s, port:%d' % (self.prefix, self.HOST, self.port)) +except Exception as exc: +self.error = ('%s Opening listen socket %s:%d exception: %s' % + (self.prefix, self.HOST, self.port, traceback.format_exc())) +logger.log(self.error) +sel.unregister(sock) +sock.close() +return 1 # set up selector sel = selectors.DefaultSelector() @@ -183,10 +191,18 @@ class TcpEchoServer(): try: recv_data = sock.recv(1024) except ConnectionResetError as exc: -logger.log('%s Connection to %s:%d closed by peer' % (self.prefix, data.addr[0], data.addr[1])) +logger.log('%s Connection to %s:%d closed by peer' % + (self.prefix, data.addr[0], data.addr[1])) sel.unregister(sock) sock.close() return 0 +except Exception as exc: +self.error = ('%s Connection to %s:%d exception: %s' % + (self.prefix, data.addr[0], data.addr[1], traceback.format_exc())) +logger.log(self.error) +sel.unregister(sock) +sock.close() +return 1 if recv_data: data.outb += recv_data logger.log('%s read from: %s:%d len:%d: %s' % (self.prefix, data.addr[0], data.addr[1], len(recv_data), diff --git a/tests/system_tests_tcp_adaptor.py b/tests/system_tests_tcp_adaptor.py index c2df88d..e290cec 100644 --- a/tests/system_tests_tcp_adaptor.py +++ b/tests/system_tests_tcp_adaptor.py @@ -23,8 +23,8 @@ from __future__ import absolute_import from __future__ import print_function import os +import time import traceback -from time import sleep from threading import Event from threading import Timer @@ -48,6 +48,8 @@ class TcpAdaptorOneRouterEcho(TestCase, Process): tcp_client_listener_port = None tcp_server_listener_port = None +echo_timeout = 30 # local timeout to wait for one echo client to finish + @classmethod def setUpClass(cls): """Start a router""" @@ -81,14 +83,22 @@ class TcpAdaptorOneRouterEcho(TestCase, Process): router('A', 'interior', cls.amqp_listener_port, cls.tcp_client_listener_port, cls.tcp_server_listener_port, "some_address", "best_site") -cls.logger = Logger(title="TcpAdaptorOneRouterEcho-testClass", print_to_console=True) +cls.logger = Logger(title="TcpAdaptorOneRouterEcho-testClass", +print_to_console=True, +save_for_dump=False) +
[qpid-dispatch] branch dev-protocol-adaptors-2 updated: DISPATCH-1806: Account properly for write buffer byte count
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push: new 95c3d93 DISPATCH-1806: Account properly for write buffer byte count 95c3d93 is described below commit 95c3d936f7d6d90fd3760f4cc450f8ca5a4b805a Author: Chuck Rolke AuthorDate: Tue Nov 3 14:19:39 2020 -0500 DISPATCH-1806: Account properly for write buffer byte count Use proper indexes to get written buffer sizes. --- src/adaptors/tcp_adaptor.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c index c91a855..ee3c436 100644 --- a/src/adaptors/tcp_adaptor.c +++ b/src/adaptors/tcp_adaptor.c @@ -305,8 +305,6 @@ static bool write_outgoing_buffs(qdr_tcp_connection_t *conn) &conn->outgoing_buffs[conn->outgoing_buff_idx], conn->outgoing_buff_count); result = used == conn->outgoing_buff_count; -conn->outgoing_buff_count -= used; -conn->outgoing_buff_idx += used; int bytes_written = 0; for (size_t i = 0; i < used; i++) { @@ -319,6 +317,9 @@ static bool write_outgoing_buffs(qdr_tcp_connection_t *conn) } qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Writing %i bytes", conn->conn_id, bytes_written); + +conn->outgoing_buff_count -= used; +conn->outgoing_buff_idx += used; } return result; } - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch dev-protocol-adaptors-2 updated: DISPATCH-1824: Fix TCP adaptor listener and connector shutdown leaks
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push: new 4e2fbf0 DISPATCH-1824: Fix TCP adaptor listener and connector shutdown leaks 4e2fbf0 is described below commit 4e2fbf07a3186c9c9652fac8cc1d4ce2fd4c45f1 Author: Chuck Rolke AuthorDate: Tue Nov 3 13:59:30 2020 -0500 DISPATCH-1824: Fix TCP adaptor listener and connector shutdown leaks --- src/adaptors/tcp_adaptor.c | 15 +++ 1 file changed, 15 insertions(+) diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c index 85c9a6f..c91a855 100644 --- a/src/adaptors/tcp_adaptor.c +++ b/src/adaptors/tcp_adaptor.c @@ -1054,12 +1054,27 @@ static void qdr_tcp_adaptor_final(void *adaptor_context) qd_log(tcp_adaptor->log_source, QD_LOG_CRITICAL, "Shutting down TCP protocol adaptor"); qdr_tcp_adaptor_t *adaptor = (qdr_tcp_adaptor_t*) adaptor_context; +qd_tcp_listener_t *tl = DEQ_HEAD(adaptor->listeners); +while (tl) { +qd_tcp_listener_t *next = DEQ_NEXT(tl); +free_qd_tcp_listener_t(tl); +tl = next; +} + +qd_tcp_connector_t *tr = DEQ_HEAD(adaptor->connectors); +while (tr) { +qd_tcp_connector_t *next = DEQ_NEXT(tr); +free_qd_tcp_connector_t(tr); +tr = next; +} + qdr_tcp_connection_t *tc = DEQ_HEAD(adaptor->connections); while (tc) { qdr_tcp_connection_t *next = DEQ_NEXT(tc); free_qdr_tcp_connection(tc); tc = next; } + qdr_protocol_adaptor_free(adaptor->core, adaptor->adaptor); free(adaptor); tcp_adaptor = NULL; - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch dev-protocol-adaptors-2 updated: DISPATCH-1820: TCP tests hang, this gets many of them started
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push: new a57772b DISPATCH-1820: TCP tests hang, this gets many of them started a57772b is described below commit a57772b3a732df4b17830a34d34c88aae6861770 Author: Chuck Rolke AuthorDate: Tue Nov 3 11:35:15 2020 -0500 DISPATCH-1820: TCP tests hang, this gets many of them started --- tests/TCP_echo_client.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/TCP_echo_client.py b/tests/TCP_echo_client.py index 57152e3..ae5364c 100755 --- a/tests/TCP_echo_client.py +++ b/tests/TCP_echo_client.py @@ -144,6 +144,7 @@ class TcpEchoClient(): selectors.EVENT_READ | selectors.EVENT_WRITE) # event loop +time.sleep(0.1) # DISPATCH-1820 investigation while self.keep_running: if self.timeout > 0.0: elapsed = time.time() - start_time - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch dev-protocol-adaptors-2 updated: DISPATCH-1823: TCP adaptor honors discard flag in core callbacks
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push: new 5772502 DISPATCH-1823: TCP adaptor honors discard flag in core callbacks 5772502 is described below commit 577250222160ccb33e907a90cc3c635e3f5ebcd1 Author: Chuck Rolke AuthorDate: Tue Nov 3 11:30:55 2020 -0500 DISPATCH-1823: TCP adaptor honors discard flag in core callbacks --- src/adaptors/tcp_adaptor.c | 21 + 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c index 6d71204..85c9a6f 100644 --- a/src/adaptors/tcp_adaptor.c +++ b/src/adaptors/tcp_adaptor.c @@ -1299,16 +1299,21 @@ void qdra_tcp_connection_get_CT(qdr_core_t *core, static void qdr_add_tcp_connection_CT(qdr_core_t *core, qdr_action_t *action, bool discard) { -qdr_tcp_connection_t *conn = (qdr_tcp_connection_t*) action->args.general.context_1; -DEQ_INSERT_TAIL(tcp_adaptor->connections, conn); -qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "Added tcp connection %s (%zu)", conn->config.host_port, DEQ_SIZE(tcp_adaptor->connections)); +if (!discard) { +qdr_tcp_connection_t *conn = (qdr_tcp_connection_t*) action->args.general.context_1; +DEQ_INSERT_TAIL(tcp_adaptor->connections, conn); +qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] qdr_add_tcp_connection_CT %s (%zu)", +conn->conn_id, conn->config.host_port, DEQ_SIZE(tcp_adaptor->connections)); +} } static void qdr_del_tcp_connection_CT(qdr_core_t *core, qdr_action_t *action, bool discard) { -qdr_tcp_connection_t *conn = (qdr_tcp_connection_t*) action->args.general.context_1; -DEQ_REMOVE(tcp_adaptor->connections, conn); -qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Removed tcp connection %s (%zu)", - conn->conn_id, conn->config.host_port, DEQ_SIZE(tcp_adaptor->connections)); -free_qdr_tcp_connection(conn); +if (!discard) { +qdr_tcp_connection_t *conn = (qdr_tcp_connection_t*) action->args.general.context_1; +DEQ_REMOVE(tcp_adaptor->connections, conn); +qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] qdr_del_tcp_connection_CT %s (%zu)", +conn->conn_id, conn->config.host_port, DEQ_SIZE(tcp_adaptor->connections)); +free_qdr_tcp_connection(conn); +} } - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch dev-protocol-adaptors-2 updated: DISPATCH-1817: TCP adaptor leaks streaming_data_t objects at shutdown
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push: new ca14f7f DISPATCH-1817: TCP adaptor leaks streaming_data_t objects at shutdown ca14f7f is described below commit ca14f7ff755b4870daa46ecd13636fadc075b1c4 Author: Chuck Rolke AuthorDate: Tue Nov 3 11:22:10 2020 -0500 DISPATCH-1817: TCP adaptor leaks streaming_data_t objects at shutdown In the adaptor final function run down connections that are still open and free the streaming objects they hold. This closes #907 --- src/adaptors/tcp_adaptor.c | 17 +++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c index d711c7a..6d71204 100644 --- a/src/adaptors/tcp_adaptor.c +++ b/src/adaptors/tcp_adaptor.c @@ -194,7 +194,7 @@ static int handle_incoming(qdr_tcp_connection_t *conn) static void free_qdr_tcp_connection(qdr_tcp_connection_t* tc) { -qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "Freeing %p", (void*) tc); +qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Freeing tcp_connection %p", tc->conn_id, (void*) tc); if (tc->reply_to) { free(tc->reply_to); } @@ -207,6 +207,10 @@ static void free_qdr_tcp_connection(qdr_tcp_connection_t* tc) if (tc->activate_timer) { qd_timer_free(tc->activate_timer); } +if (tc->outgoing_stream_data) { +free_qd_message_stream_data_t(tc->outgoing_stream_data); +} + //proactor will free the socket free(tc); } @@ -1047,7 +1051,15 @@ static void qdr_tcp_adaptor_init(qdr_core_t *core, void **adaptor_context) static void qdr_tcp_adaptor_final(void *adaptor_context) { +qd_log(tcp_adaptor->log_source, QD_LOG_CRITICAL, "Shutting down TCP protocol adaptor"); qdr_tcp_adaptor_t *adaptor = (qdr_tcp_adaptor_t*) adaptor_context; + +qdr_tcp_connection_t *tc = DEQ_HEAD(adaptor->connections); +while (tc) { +qdr_tcp_connection_t *next = DEQ_NEXT(tc); +free_qdr_tcp_connection(tc); +tc = next; +} qdr_protocol_adaptor_free(adaptor->core, adaptor->adaptor); free(adaptor); tcp_adaptor = NULL; @@ -1296,6 +1308,7 @@ static void qdr_del_tcp_connection_CT(qdr_core_t *core, qdr_action_t *action, bo { qdr_tcp_connection_t *conn = (qdr_tcp_connection_t*) action->args.general.context_1; DEQ_REMOVE(tcp_adaptor->connections, conn); -qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "Removed tcp connection %s (%zu)", conn->config.host_port, DEQ_SIZE(tcp_adaptor->connections)); +qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Removed tcp connection %s (%zu)", + conn->conn_id, conn->config.host_port, DEQ_SIZE(tcp_adaptor->connections)); free_qdr_tcp_connection(conn); } - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch dev-protocol-adaptors-2 updated: DISPATCH-1807: Add self tests for tcp protocol adaptor
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push: new e29ea39 DISPATCH-1807: Add self tests for tcp protocol adaptor e29ea39 is described below commit e29ea396acfab940e54db36b569a865720ac9f8a Author: Chuck Rolke AuthorDate: Fri Oct 30 09:40:35 2020 -0400 DISPATCH-1807: Add self tests for tcp protocol adaptor * echo_server and echo_client modified to have a test class that runs the test proper. * tcp_adaptor test runs the client and server test classes in separate threads and not in separate processes. This closes #905 --- tests/TCP_echo_client.py | 422 +- tests/TCP_echo_server.py | 357 +++- tests/system_tests_tcp_adaptor.py | 147 +++-- 3 files changed, 532 insertions(+), 394 deletions(-) diff --git a/tests/TCP_echo_client.py b/tests/TCP_echo_client.py index 5f2a687..57152e3 100755 --- a/tests/TCP_echo_client.py +++ b/tests/TCP_echo_client.py @@ -19,25 +19,33 @@ # under the License. # +from __future__ import unicode_literals +from __future__ import division +from __future__ import absolute_import +from __future__ import print_function + import argparse import os import selectors +import signal import socket import sys +from threading import Thread import time import traceback import types from system_test import Logger +from system_test import TIMEOUT +class GracefulKiller: + kill_now = False + def __init__(self): +signal.signal(signal.SIGINT, self.exit_gracefully) +signal.signal(signal.SIGTERM, self.exit_gracefully) -class EchoLogger(Logger): -def __init__(self, prefix="ECHO_LOGGER", title="EchoLogger", print_to_console=False, save_for_dump=False): -self.prefix = prefix + ' ' if len(prefix) > 0 else '' -super(EchoLogger, self).__init__(title=title, print_to_console=print_to_console, save_for_dump=save_for_dump) - -def log(self, msg): -super(EchoLogger, self).log(self.prefix + msg) + def exit_gracefully(self,signum, frame): +self.kill_now = True def split_chunk_for_display(raw_bytes): @@ -55,192 +63,240 @@ def split_chunk_for_display(raw_bytes): return result -def main_except(host, port, size, count, timeout, logger): -''' -:param host: connect to this host -:param port: connect to this port -:param size: size of individual payload chunks in bytes -:param count: number of payload chunks -:param strategy: "1" Send one payload; # TODO - Recv one payload -:param logger: Logger() object -:return: -''' -# Start up -start_time = time.time() -logger.log('Connecting to host:%s, port:%d, size:%d, count:%d' % (host, port, size, count)) -keep_going = True -total_sent = 0 -total_rcvd = 0 - -# outbound payload -payload_out = [] -out_list_idx = 0 # current _out array being sent -out_byte_idx = 0 # next-to-send in current array -out_ready_to_send = True -# Generate unique content for each message so you can tell where the message -# or fragment belongs in the whole stream. Chunks look like: -#b'[localhost:3:6:0]g' -#host: localhost -#port: 3 -#index: 6 -#offset into message: 0 -MAGIC_SIZE = 50 # Content repeats after chunks this big - used by echo server, too -for idx in range(count): -body_msg = "" -padchar = "abcdefghijklmnopqrstuvwxyz@#$%"[idx % 30] -while len(body_msg) < size: -chunk = "[%s:%d:%d:%d]" % (host, port, idx, len(body_msg)) -padlen = MAGIC_SIZE - len(chunk) -chunk += padchar * padlen -body_msg += chunk -if len(body_msg) > size: -body_msg = body_msg[:size] -payload_out.append(bytearray(body_msg.encode())) -# incoming payloads -payload_in = [] -in_list_idx = 0 # current _in array being received -for i in range(count): -payload_in.append(bytearray()) - -# set up connection -host_address = (host, port) -sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) -sock.connect(host_address) -sock.setblocking(False) - -# set up selector -sel = selectors.DefaultSelector() -sel.register(sock, - selectors.EVENT_READ | selectors.EVENT_WRITE) - -# event loop -while keep_going: -if timeout > 0.0: -elapsed = time.time() - start_time -if elapsed > timeout: -logger.log("Exiting due to timeout. To
[qpid-dispatch] 01/02: Revert "NO-JIRA: add dummy system_tests_tcp_adaptor.py for temporary CI fix"
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git commit 76c00c978c10687300fbefaff29fa70fca4d84b3 Author: Chuck Rolke AuthorDate: Tue Oct 27 15:03:33 2020 -0400 Revert "NO-JIRA: add dummy system_tests_tcp_adaptor.py for temporary CI fix" This reverts commit d338f851bcaa9259606ff067a659f80b884a77b4. --- tests/system_tests_tcp_adaptor.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) diff --git a/tests/system_tests_tcp_adaptor.py b/tests/system_tests_tcp_adaptor.py deleted file mode 100644 index e69de29..000 - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] 02/02: DISPATCH-1807: Add TCP protocol adaptor tests
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git commit 2d074ac41800f18a9549f97b82e49911027607ec Author: Chuck Rolke AuthorDate: Tue Oct 27 15:04:06 2020 -0400 DISPATCH-1807: Add TCP protocol adaptor tests Rewrite do-nothing tcp_adaptor test * Mistakenly committed test cmake including it before it's ready * It's still not ready but is finding some form Improve echo server * For messages larger than 100 bytes only print the first and last 50 bytes to logs. Improve echo client * Send unique data for each message * Improve logging * Add timeout * Don't use socket after closing it --- tests/TCP_echo_client.py | 102 --- tests/TCP_echo_server.py | 29 ++-- tests/system_tests_tcp_adaptor.py | 145 ++ 3 files changed, 262 insertions(+), 14 deletions(-) diff --git a/tests/TCP_echo_client.py b/tests/TCP_echo_client.py index 83f1b44..5f2a687 100755 --- a/tests/TCP_echo_client.py +++ b/tests/TCP_echo_client.py @@ -24,12 +24,38 @@ import os import selectors import socket import sys +import time import traceback import types from system_test import Logger -def main_except(host, port, size, count, logger): + +class EchoLogger(Logger): +def __init__(self, prefix="ECHO_LOGGER", title="EchoLogger", print_to_console=False, save_for_dump=False): +self.prefix = prefix + ' ' if len(prefix) > 0 else '' +super(EchoLogger, self).__init__(title=title, print_to_console=print_to_console, save_for_dump=save_for_dump) + +def log(self, msg): +super(EchoLogger, self).log(self.prefix + msg) + + +def split_chunk_for_display(raw_bytes): +""" +Given some raw bytes, return a display string +Only show the beginning and end of largish (2xMAGIC_SIZE) arrays. +:param raw_bytes: +:return: display string +""" +MAGIC_SIZE = 50 # Content repeats after chunks this big - used by echo client, too +if len(raw_bytes) > 2 * MAGIC_SIZE: +result = repr(raw_bytes[:MAGIC_SIZE]) + " ... " + repr(raw_bytes[-MAGIC_SIZE:]) +else: +result = repr(raw_bytes) +return result + + +def main_except(host, port, size, count, timeout, logger): ''' :param host: connect to this host :param port: connect to this port @@ -41,16 +67,36 @@ def main_except(host, port, size, count, logger): :return: ''' # Start up +start_time = time.time() logger.log('Connecting to host:%s, port:%d, size:%d, count:%d' % (host, port, size, count)) keep_going = True +total_sent = 0 +total_rcvd = 0 # outbound payload payload_out = [] out_list_idx = 0 # current _out array being sent out_byte_idx = 0 # next-to-send in current array out_ready_to_send = True -for i in range(count): -payload_out.append(bytearray([i & 255] * size)) +# Generate unique content for each message so you can tell where the message +# or fragment belongs in the whole stream. Chunks look like: +#b'[localhost:3:6:0]g' +#host: localhost +#port: 3 +#index: 6 +#offset into message: 0 +MAGIC_SIZE = 50 # Content repeats after chunks this big - used by echo server, too +for idx in range(count): +body_msg = "" +padchar = "abcdefghijklmnopqrstuvwxyz@#$%"[idx % 30] +while len(body_msg) < size: +chunk = "[%s:%d:%d:%d]" % (host, port, idx, len(body_msg)) +padlen = MAGIC_SIZE - len(chunk) +chunk += padchar * padlen +body_msg += chunk +if len(body_msg) > size: +body_msg = body_msg[:size] +payload_out.append(bytearray(body_msg.encode())) # incoming payloads payload_in = [] in_list_idx = 0 # current _in array being received @@ -70,11 +116,17 @@ def main_except(host, port, size, count, logger): # event loop while keep_going: -for key, mask in sel.select(timeout=1): +if timeout > 0.0: +elapsed = time.time() - start_time +if elapsed > timeout: +logger.log("Exiting due to timeout. Total sent= %d, total rcvd= %d" % (total_sent, total_rcvd)) +break +for key, mask in sel.select(timeout=0.1): sock = key.fileobj if mask & selectors.EVENT_READ: recv_data = sock.recv(1024) if recv_data: +total_rcvd = len(recv_data) payload_
[qpid-dispatch] branch dev-protocol-adaptors-2 updated (d338f85 -> 2d074ac)
This is an automated email from the ASF dual-hosted git repository. chug pushed a change to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git. from d338f85 NO-JIRA: add dummy system_tests_tcp_adaptor.py for temporary CI fix new 76c00c9 Revert "NO-JIRA: add dummy system_tests_tcp_adaptor.py for temporary CI fix" new 2d074ac DISPATCH-1807: Add TCP protocol adaptor tests The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: tests/TCP_echo_client.py | 102 --- tests/TCP_echo_server.py | 29 ++-- tests/system_tests_tcp_adaptor.py | 145 ++ 3 files changed, 262 insertions(+), 14 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch dev-protocol-adaptors-2 updated: DISPATCH-1807: Replace assert with raise; fixes tox test failure
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push: new a02a0ef DISPATCH-1807: Replace assert with raise; fixes tox test failure a02a0ef is described below commit a02a0ef0a0445b5033fe1ea2496bb77dda35d217 Author: Chuck Rolke AuthorDate: Tue Oct 27 09:53:42 2020 -0400 DISPATCH-1807: Replace assert with raise; fixes tox test failure --- tests/CMakeLists.txt | 3 +++ tests/TCP_echo_server.py | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index cf36a1d..be46d3a 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -161,6 +161,7 @@ foreach(py_test_module system_tests_open_properties system_tests_http2 system_tests_http1_adaptor +system_tests_tcp_adaptor ) add_test(${py_test_module} ${TEST_WRAP} ${PYTHON_TEST_COMMAND} -v ${py_test_module}) @@ -233,6 +234,8 @@ file(COPY ${CMAKE_CURRENT_SOURCE_DIR}/policy-2/policy-photoserver-sasl.sasldb D file(COPY ${CMAKE_CURRENT_SOURCE_DIR}/policy-3/test-sender-receiver-limits.json DESTINATION ${CMAKE_CURRENT_BINARY_DIR}/policy-3) file(COPY ${CMAKE_CURRENT_SOURCE_DIR}/policy-4/management-access.json DESTINATION ${CMAKE_CURRENT_BINARY_DIR}/policy-4/) file(COPY ${CMAKE_CURRENT_SOURCE_DIR}/authservice.py DESTINATION ${CMAKE_CURRENT_BINARY_DIR}) +file(COPY ${CMAKE_CURRENT_SOURCE_DIR}/TCP_echo_server.py DESTINATION ${CMAKE_CURRENT_BINARY_DIR}) +file(COPY ${CMAKE_CURRENT_SOURCE_DIR}/TCP_echo_client.py DESTINATION ${CMAKE_CURRENT_BINARY_DIR}) # following install() functions will be called only if you do a make "install" install(FILES ${SYSTEM_TEST_FILES} diff --git a/tests/TCP_echo_server.py b/tests/TCP_echo_server.py index bdfecfe..44aefd7 100755 --- a/tests/TCP_echo_server.py +++ b/tests/TCP_echo_server.py @@ -104,7 +104,7 @@ def main_except(sock, port, echo_count, timeout, logger): if key.fileobj is sock: do_accept(key.fileobj, sel, logger) else: -assert(False, "Only listener 'sock' has None in opaque data field") +raise Exception("Only listener 'sock' has None in opaque data field") else: total_echoed += do_service(key, mask, sel, logger) else: - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch dev-protocol-adaptors-2 updated: DISPATCH-1807: TCP self test - improve test server
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push: new 4aff90c DISPATCH-1807: TCP self test - improve test server 4aff90c is described below commit 4aff90cbd9d31140da437ffdf2e3aca9a999e9a2 Author: Chuck Rolke AuthorDate: Tue Oct 27 09:02:06 2020 -0400 DISPATCH-1807: TCP self test - improve test server * Add timeout * Add exit-on-byte-count * Improve logging for merge with Scraper --- tests/TCP_echo_server.py | 132 --- 1 file changed, 112 insertions(+), 20 deletions(-) diff --git a/tests/TCP_echo_server.py b/tests/TCP_echo_server.py index 61e95a2..bdfecfe 100755 --- a/tests/TCP_echo_server.py +++ b/tests/TCP_echo_server.py @@ -22,8 +22,10 @@ import argparse import os import selectors +from signal import signal, SIGINT import socket import sys +import time import traceback import types @@ -31,14 +33,50 @@ from system_test import Logger HOST = '127.0.0.1' -def main_except(port, logger): +class ClientRecord(object): +""" +Object to register with the selector 'data' field +for incoming user connections. This is *not* used +for the listening socket. +This object holds the socketId in the address and +the inbound and outbound data list buffers for this +socket's payload. +""" +def __init__(self, address): +self.addr = address +self.inb = b'' +self.outb = b'' + +def __repr__(self): +return str(self.addr) + " len(in)=" + str(len(self.inb)) + " len(out)=" + str(len(self.outb)) + +def __str__(self): +return self.__repr__() + + +class EchoLogger(Logger): +def __init__(self, prefix="ECHO_LOGGER", title="EchoLogger", print_to_console=False, save_for_dump=False): +self.prefix = prefix + ' ' if len(prefix) > 0 else '' +super(EchoLogger, self).__init__(title=title, print_to_console=print_to_console, save_for_dump=save_for_dump) + +def log(self, msg): +super(EchoLogger, self).log(self.prefix + msg) + + +def main_except(sock, port, echo_count, timeout, logger): ''' +:param lsock: socket to listen on :param port: port to listen on +:param echo_count: exit after echoing this many bytes +:param timeout: exit after this many seconds :param logger: Logger() object :return: ''' +# set up spontaneous exit settings +start_time = time.time() +total_echoed = 0 + # set up listening socket -sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.bind((HOST, port)) sock.listen() sock.setblocking(False) @@ -50,29 +88,48 @@ def main_except(port, logger): # event loop while True: -events = sel.select(timeout=None) -for key, mask in events: -if key.data is None: -do_accept(key.fileobj, sel, logger) -else: -do_service(key, mask, sel, logger) +if timeout > 0.0: +elapsed = time.time() - start_time +if elapsed > timeout: +logger.log("Exiting due to timeout. Total echoed = %d" % total_echoed) +break +if echo_count > 0: +if total_echoed >= echo_count: +logger.log("Exiting due to echo byte count. Total echoed = %d" % total_echoed) +break +events = sel.select(timeout=0.1) +if events: +for key, mask in events: +if key.data is None: +if key.fileobj is sock: +do_accept(key.fileobj, sel, logger) +else: +assert(False, "Only listener 'sock' has None in opaque data field") +else: +total_echoed += do_service(key, mask, sel, logger) +else: +pass # select timeout. probably. + +sel.unregister(sock) +sock.close() def do_accept(sock, sel, logger): conn, addr = sock.accept() logger.log('Accepted connection from %s:%d' % (addr[0], addr[1])) conn.setblocking(False) -data = types.SimpleNamespace(addr=addr, inb=b'', outb=b'') events = selectors.EVENT_READ | selectors.EVENT_WRITE -sel.register(conn, events, data=data) +sel.register(conn, events, data=ClientRecord(addr)) def do_service(key, mask, sel, logger): +retval = 0 sock = key.fileobj data = key.data if mask & selectors.EVENT_READ: recv_data = sock.recv(1024) if recv_data:
[qpid-dispatch] branch master updated: DISPATCH-1751: Fix 32-bit self test receiving unexpected incoming-capacity
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/master by this push: new 5371dfb DISPATCH-1751: Fix 32-bit self test receiving unexpected incoming-capacity 5371dfb is described below commit 5371dfbedc60fd6c429c237d7053c3e15f0d25fe Author: Chuck Rolke AuthorDate: Fri Oct 23 09:52:23 2020 -0400 DISPATCH-1751: Fix 32-bit self test receiving unexpected incoming-capacity Dispatch listener config defines a 'maxFrameSize' and a 'maxSessionFrames'. The idea is to have the 'maxFrameSize' go out in AMQP Open frames as 'max-frame-size' and the 'maxSessionFrames' go out in AMQP Begin frames as 'incoming-capacity'. To accomplish this Proton accepts a maxFrameSize and a capacity. For Dispatch to get Proton to emit the 'incoming-capacity' then Dispatch must specify 'capacity' as the product of 'maxFrameSize' and 'maxSessionFrames'. Furthermore, Proton specifies the capacity value as a system-dependent 'size_t' value type. For 64-bit systems the chosen defaults work just fine although the capacity is a huge 30+ Terabytes. For 32-bit systems the 30+ Tb number does not work since that does not fit into a 32-bit size_t. Dispatch uses a smaller fallback maximum for 32-bit systems. The self test was failing on 32-bit systems because the self test did not account for the 32-bit smaller fallback maximum. This patch fixes the self test to account for the fallback maximum on 32-bit systems. This patch also rewrites for readability the 32-bit/64-bit capacity calculations in the mission C code. It also computes capacity values using 64-bit numbers even on 32-bit systems to avoid size_t overflow truncation errors. This patch does *not* use a different strategy for driving Proton to use simpler calculation methods when essentially unlimited capacity values are in effect. Future work in this area might address some of the issues brought up during Pull Request reviews. Thank you to the reviewers! * Modify Proton to accept an 'incoming-capacity' value instead of or in addition to 'capacity'. * Modify Dispatch not to set a capacity at all in Proton when no flow control in effect. This closes #890 --- src/connection_manager.c| 44 - tests/system_tests_protocol_settings.py | 28 - 2 files changed, 49 insertions(+), 23 deletions(-) diff --git a/src/connection_manager.c b/src/connection_manager.c index aa95417..65cdfb6 100644 --- a/src/connection_manager.c +++ b/src/connection_manager.c @@ -438,25 +438,35 @@ static qd_error_t load_server_config(qd_dispatch_t *qd, qd_server_config_t *conf config->max_frame_size = QD_AMQP_MIN_MAX_FRAME_SIZE; // -// Given session frame count and max frame size compute session incoming_capacity +// Given session frame count and max frame size, compute session incoming_capacity +// On 64-bit systems the capacity has no practial limit. +// On 32-bit systems the largest default capacity is half the process address space. // -if (ssn_frames == 0) -config->incoming_capacity = (sizeof(size_t) < 8) ? 0x7FFFLL : 0x7FFFLL * config->max_frame_size; -else { -uint64_t mfs = (uint64_t) config->max_frame_size; -uint64_t trial_ic = ssn_frames * mfs; -uint64_t limit= (sizeof(size_t) < 8) ? (1ll << 31) - 1 : 0; -if (limit == 0 || trial_ic < limit) { -// Silently promote incoming capacity of zero to one -config->incoming_capacity = -(trial_ic < QD_AMQP_MIN_MAX_FRAME_SIZE ? QD_AMQP_MIN_MAX_FRAME_SIZE : trial_ic); +bool is_64bit = sizeof(size_t) == 8; +#define MAX_32BIT_CAPACITY ((size_t)(2147483647)) +if (ssn_frames == 0) { +config->incoming_capacity = is_64bit ? MAX_32BIT_CAPACITY * (size_t)config->max_frame_size : MAX_32BIT_CAPACITY; +} else { +// Limited incoming frames. +if (is_64bit) { +// Specify this to proton by setting capacity to be +// the product (max_frame_size * ssn_frames). +config->incoming_capacity = (size_t)config->max_frame_size * (size_t)ssn_frames; } else { -config->incoming_capacity = limit; -uint64_t computed_ssn_frames = limit / mfs; -qd_log(qd->connection_manager->log_source, QD_LOG_WARNING, - "Server configuation for I/O adapter entity name:'%s', host:'%s', port:'%s', &q
[qpid-dispatch] branch master updated: Revert "DISPATCH-1751: Rework how AMQP session incoming-window is derived"
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/master by this push: new c1a7f59 Revert "DISPATCH-1751: Rework how AMQP session incoming-window is derived" c1a7f59 is described below commit c1a7f59baaf0ba5944074b3c9544732538b2e310 Author: Chuck Rolke AuthorDate: Wed Oct 21 16:14:45 2020 -0400 Revert "DISPATCH-1751: Rework how AMQP session incoming-window is derived" This reverts commit 7525ac665c3600805615c86796482914cdba783d. --- src/connection_manager.c| 42 + tests/system_tests_protocol_settings.py | 22 + 2 files changed, 23 insertions(+), 41 deletions(-) diff --git a/src/connection_manager.c b/src/connection_manager.c index 04ce135..aa95417 100644 --- a/src/connection_manager.c +++ b/src/connection_manager.c @@ -438,33 +438,25 @@ static qd_error_t load_server_config(qd_dispatch_t *qd, qd_server_config_t *conf config->max_frame_size = QD_AMQP_MIN_MAX_FRAME_SIZE; // -// Given session frame count and max frame size, compute session incoming_capacity -// On 64-bit systems the capacity has no limit. -// On 32-bit systems the largest capacity is defined as half the process address space. +// Given session frame count and max frame size compute session incoming_capacity // -if (ssn_frames != 0) { -// Limited incoming frames. -// Specify this to proton by setting capacity to be -// the product (max_frame_size * ssn_frames). -size_t capacity = config->max_frame_size * ssn_frames; - -if (sizeof(size_t) == 8) { -// 64-bit systems use the configured, unbounded capacity -config->incoming_capacity = capacity; +if (ssn_frames == 0) +config->incoming_capacity = (sizeof(size_t) < 8) ? 0x7FFFLL : 0x7FFFLL * config->max_frame_size; +else { +uint64_t mfs = (uint64_t) config->max_frame_size; +uint64_t trial_ic = ssn_frames * mfs; +uint64_t limit= (sizeof(size_t) < 8) ? (1ll << 31) - 1 : 0; +if (limit == 0 || trial_ic < limit) { +// Silently promote incoming capacity of zero to one +config->incoming_capacity = +(trial_ic < QD_AMQP_MIN_MAX_FRAME_SIZE ? QD_AMQP_MIN_MAX_FRAME_SIZE : trial_ic); } else { -// 32-bit systems have an upper bound to the capacity -#define AMQP_MAX_WINDOW_SIZE (2147483647) -if (capacity <= AMQP_MAX_WINDOW_SIZE) { -config->incoming_capacity = capacity; -} else { -config->incoming_capacity = AMQP_MAX_WINDOW_SIZE; - -qd_log(qd->connection_manager->log_source, QD_LOG_WARNING, -"Server configuation for I/O adapter entity name:'%s', host:'%s', port:'%s', " -"requested maxSessionFrames truncated from %"PRId64" to %"PRId64, -config->name, config->host, config->port, ssn_frames, -AMQP_MAX_WINDOW_SIZE / config->max_frame_size); -} +config->incoming_capacity = limit; +uint64_t computed_ssn_frames = limit / mfs; +qd_log(qd->connection_manager->log_source, QD_LOG_WARNING, + "Server configuation for I/O adapter entity name:'%s', host:'%s', port:'%s', " + "requested maxSessionFrames truncated from %"PRId64" to %"PRId64, + config->name, config->host, config->port, ssn_frames, computed_ssn_frames); } } diff --git a/tests/system_tests_protocol_settings.py b/tests/system_tests_protocol_settings.py index 6b0d91d..546ffe6 100644 --- a/tests/system_tests_protocol_settings.py +++ b/tests/system_tests_protocol_settings.py @@ -26,7 +26,6 @@ from system_test import TestCase, Qdrouterd, main_module from system_test import unittest from proton.utils import BlockingConnection import subprocess -import sys class MaxFrameMaxSessionFramesTest(TestCase): """System tests setting proton negotiated size max-frame-size and incoming-window""" @@ -235,11 +234,8 @@ class MaxSessionFramesDefaultTest(TestCase): # if frame size not set then a default is used self.assertTrue(" max-frame-size=16384" in open_lines[0]) begin_lines = [s for s in log_lines if "-> @begin" in s] -# incoming-window is defaulted to 2^31-1 (64-bit) or 2^17-1 (32-bit) -is_64bits = sys.maxsize > 2 ** 32 -expected = " incoming-w
[qpid-dispatch] branch dev-protocol-adaptors-2 updated: DISPATCH-1807: TCP self tests - add a TCP_echo_client
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push: new 63ea082 DISPATCH-1807: TCP self tests - add a TCP_echo_client 63ea082 is described below commit 63ea0829241cb95eea7fc1d67f2cb4c6e26b4e41 Author: Chuck Rolke AuthorDate: Wed Oct 21 13:33:41 2020 -0400 DISPATCH-1807: TCP self tests - add a TCP_echo_client usage: TCP_echo_client.py [-h] [--host HOST] [--port PORT] [--size [SIZE]] [--count [COUNT]] [--log] optional arguments: -h, --helpshow this help message and exit --host HOST, -b HOST Required target host --port PORT, -p PORT Required target port number --size [SIZE], -s [SIZE] Size of payload in bytes --count [COUNT], -c [COUNT] Number of payloads to process --log, -l Write activity log to console example: ./TCP_echo_client.py -b localhost -p 9191 -s 100 -c 20 Writes 100 20-byte message where each message is sent and received before the next message is started. --- tests/TCP_echo_client.py | 162 +++ 1 file changed, 162 insertions(+) diff --git a/tests/TCP_echo_client.py b/tests/TCP_echo_client.py new file mode 100755 index 000..83f1b44 --- /dev/null +++ b/tests/TCP_echo_client.py @@ -0,0 +1,162 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import argparse +import os +import selectors +import socket +import sys +import traceback +import types + +from system_test import Logger + +def main_except(host, port, size, count, logger): +''' +:param host: connect to this host +:param port: connect to this port +:param size: size of individual payload chunks in bytes +:param count: number of payload chunks +:param strategy: "1" Send one payload; # TODO + Recv one payload +:param logger: Logger() object +:return: +''' +# Start up +logger.log('Connecting to host:%s, port:%d, size:%d, count:%d' % (host, port, size, count)) +keep_going = True + +# outbound payload +payload_out = [] +out_list_idx = 0 # current _out array being sent +out_byte_idx = 0 # next-to-send in current array +out_ready_to_send = True +for i in range(count): +payload_out.append(bytearray([i & 255] * size)) +# incoming payloads +payload_in = [] +in_list_idx = 0 # current _in array being received +for i in range(count): +payload_in.append(bytearray()) + +# set up connection +host_address = (host, port) +sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +sock.connect(host_address) +sock.setblocking(False) + +# set up selector +sel = selectors.DefaultSelector() +sel.register(sock, + selectors.EVENT_READ | selectors.EVENT_WRITE) + +# event loop +while keep_going: +for key, mask in sel.select(timeout=1): +sock = key.fileobj +if mask & selectors.EVENT_READ: +recv_data = sock.recv(1024) +if recv_data: +payload_in[in_list_idx].extend(recv_data) +if len(payload_in[in_list_idx]) == size: +logger.log("Rcvd message %d" % in_list_idx) +in_list_idx += 1 +if in_list_idx == count: +# Received all bytes of all chunks - done. +keep_going = False +else: +out_ready_to_send = True +sel.modify(sock, selectors.EVENT_READ | selectors.EVENT_WRITE) +elif len(payload_in[in_list_idx]) > size: +error = "CRITICAL Rcvd message too big. Expected:%d, actual:%d" % \ +
[qpid-dispatch] branch master updated: DISPATCH-1751: Rework how AMQP session incoming-window is derived
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/master by this push: new 7525ac6 DISPATCH-1751: Rework how AMQP session incoming-window is derived 7525ac6 is described below commit 7525ac665c3600805615c86796482914cdba783d Author: Chuck Rolke AuthorDate: Tue Oct 20 14:14:00 2020 -0400 DISPATCH-1751: Rework how AMQP session incoming-window is derived Proton allows specification of a session 'capacity'. Initially the incoming-window will be (capacity / max-frame-size), defining at most how many max-size transfers will definitely be accepted on the session. Dispatch listener config defines a maxFrameSize and a maxSessionFrames the product of which is equal to the capacity to be configured in Proton. Dispatch vhostUserGroup policy defines a maxFrameSize and a maxSessionWindow. The maxSessionWindow is passed directly to Proton as the capacity. This closes #847 --- src/connection_manager.c| 42 - tests/system_tests_protocol_settings.py | 22 - 2 files changed, 41 insertions(+), 23 deletions(-) diff --git a/src/connection_manager.c b/src/connection_manager.c index aa95417..04ce135 100644 --- a/src/connection_manager.c +++ b/src/connection_manager.c @@ -438,25 +438,33 @@ static qd_error_t load_server_config(qd_dispatch_t *qd, qd_server_config_t *conf config->max_frame_size = QD_AMQP_MIN_MAX_FRAME_SIZE; // -// Given session frame count and max frame size compute session incoming_capacity +// Given session frame count and max frame size, compute session incoming_capacity +// On 64-bit systems the capacity has no limit. +// On 32-bit systems the largest capacity is defined as half the process address space. // -if (ssn_frames == 0) -config->incoming_capacity = (sizeof(size_t) < 8) ? 0x7FFFLL : 0x7FFFLL * config->max_frame_size; -else { -uint64_t mfs = (uint64_t) config->max_frame_size; -uint64_t trial_ic = ssn_frames * mfs; -uint64_t limit= (sizeof(size_t) < 8) ? (1ll << 31) - 1 : 0; -if (limit == 0 || trial_ic < limit) { -// Silently promote incoming capacity of zero to one -config->incoming_capacity = -(trial_ic < QD_AMQP_MIN_MAX_FRAME_SIZE ? QD_AMQP_MIN_MAX_FRAME_SIZE : trial_ic); +if (ssn_frames != 0) { +// Limited incoming frames. +// Specify this to proton by setting capacity to be +// the product (max_frame_size * ssn_frames). +size_t capacity = config->max_frame_size * ssn_frames; + +if (sizeof(size_t) == 8) { +// 64-bit systems use the configured, unbounded capacity +config->incoming_capacity = capacity; } else { -config->incoming_capacity = limit; -uint64_t computed_ssn_frames = limit / mfs; -qd_log(qd->connection_manager->log_source, QD_LOG_WARNING, - "Server configuation for I/O adapter entity name:'%s', host:'%s', port:'%s', " - "requested maxSessionFrames truncated from %"PRId64" to %"PRId64, - config->name, config->host, config->port, ssn_frames, computed_ssn_frames); +// 32-bit systems have an upper bound to the capacity +#define AMQP_MAX_WINDOW_SIZE (2147483647) +if (capacity <= AMQP_MAX_WINDOW_SIZE) { +config->incoming_capacity = capacity; +} else { +config->incoming_capacity = AMQP_MAX_WINDOW_SIZE; + +qd_log(qd->connection_manager->log_source, QD_LOG_WARNING, +"Server configuation for I/O adapter entity name:'%s', host:'%s', port:'%s', " +"requested maxSessionFrames truncated from %"PRId64" to %"PRId64, +config->name, config->host, config->port, ssn_frames, +AMQP_MAX_WINDOW_SIZE / config->max_frame_size); +} } } diff --git a/tests/system_tests_protocol_settings.py b/tests/system_tests_protocol_settings.py index 546ffe6..6b0d91d 100644 --- a/tests/system_tests_protocol_settings.py +++ b/tests/system_tests_protocol_settings.py @@ -26,6 +26,7 @@ from system_test import TestCase, Qdrouterd, main_module from system_test import unittest from proton.utils import BlockingConnection import subprocess +import sys class MaxFrameMaxSessionFramesTest(TestCase): """System tests setting proton negotiated size max-frame-size and incoming-window"&quo
[qpid-dispatch] 02/04: DISPATCH-1654: allow content to be seen when available
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git commit c56a1fba5165589c11a7779580a251c45e20828a Author: Gordon Sim AuthorDate: Wed Oct 7 12:09:02 2020 +0100 DISPATCH-1654: allow content to be seen when available --- src/message.c | 7 +++ 1 file changed, 7 insertions(+) diff --git a/src/message.c b/src/message.c index 23381db..a3a8152 100644 --- a/src/message.c +++ b/src/message.c @@ -1554,6 +1554,13 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery) // the entire message. We'll be back later to finish it up. // Return the message so that the caller can start sending out whatever we have received so far // +// push what we do have for testing/processing +LOCK(content->lock); +qd_buffer_set_fanout(content->pending, content->fanout); +DEQ_INSERT_TAIL(content->buffers, content->pending); +content->pending = 0; +UNLOCK(content->lock); +content->pending = qd_buffer(); break; } } - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] 03/04: DISPATCH-1654: fix for streaming message
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git commit a8e548bfca44851305a8b6a59cfb21272ca668ca Author: Gordon Sim AuthorDate: Wed Oct 7 12:09:57 2020 +0100 DISPATCH-1654: fix for streaming message --- include/qpid/dispatch/amqp.h| 2 ++ include/qpid/dispatch/message.h | 17 +++ include/qpid/dispatch/parse.h | 2 ++ src/adaptors/tcp_adaptor.c | 3 ++ src/amqp.c | 1 + src/message.c | 28 +- src/message_private.h | 2 ++ src/parse.c | 12 +++- src/router_node.c | 64 - 9 files changed, 97 insertions(+), 34 deletions(-) diff --git a/include/qpid/dispatch/amqp.h b/include/qpid/dispatch/amqp.h index aca440c..6c1064f 100644 --- a/include/qpid/dispatch/amqp.h +++ b/include/qpid/dispatch/amqp.h @@ -112,6 +112,7 @@ extern const char * const QD_MA_TRACE;///< Trace extern const char * const QD_MA_TO; ///< To-Override extern const char * const QD_MA_PHASE;///< Phase for override address extern const char * const QD_MA_CLASS;///< Message-Class +extern const char * const QD_MA_STREAM; ///< Indicate streaming message #define QD_MA_PREFIX_LEN (9) #define QD_MA_INGRESS_LEN (16) @@ -119,6 +120,7 @@ extern const char * const QD_MA_CLASS;///< Message-Class #define QD_MA_TO_LEN (11) #define QD_MA_PHASE_LEN (14) #define QD_MA_CLASS_LEN (14) +#define QD_MA_STREAM_LEN (15) extern const int QD_MA_MAX_KEY_LEN; ///< strlen of longest key name extern const int QD_MA_N_KEYS; ///< number of router annotation keys diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h index 9978d1c..b797eca 100644 --- a/include/qpid/dispatch/message.h +++ b/include/qpid/dispatch/message.h @@ -197,6 +197,23 @@ void qd_message_set_phase_annotation(qd_message_t *msg, int phase); int qd_message_get_phase_annotation(const qd_message_t *msg); /** + * Indicate whether message should be considered to be streaming. + * + * @param msg Pointer to an outgoing message. + * @param stream true if the message is streaming + * + */ +void qd_message_set_stream_annotation(qd_message_t *msg, bool stream); +/** + * Test whether received message should be considered to be streaming. + * + * @param msg Pointer to an outgoing message. + * @return true if the received message has the streaming annotation set, else false. + * + */ +int qd_message_is_streaming(qd_message_t *msg); + +/** * Set the value for the QD_MA_INGRESS field in the outgoing message * annotations for the message. * diff --git a/include/qpid/dispatch/parse.h b/include/qpid/dispatch/parse.h index 7fed15d..f6e5fd7 100644 --- a/include/qpid/dispatch/parse.h +++ b/include/qpid/dispatch/parse.h @@ -301,6 +301,7 @@ void qd_parse_annotations( qd_parsed_field_t**ma_phase, qd_parsed_field_t**ma_to_override, qd_parsed_field_t**ma_trace, +qd_parsed_field_t**ma_stream, qd_iterator_pointer_t *blob_pointer, uint32_t *blob_item_count); @@ -312,6 +313,7 @@ typedef enum { QD_MAE_TRACE, QD_MAE_TO, QD_MAE_PHASE, +QD_MAE_STREAM, QD_MAE_NONE } qd_ma_enum_t; diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c index f5d0ac7..cd22384 100644 --- a/src/adaptors/tcp_adaptor.c +++ b/src/adaptors/tcp_adaptor.c @@ -145,6 +145,8 @@ static int handle_incoming(qdr_tcp_connection_t *conn) } else { qd_message_t *msg = qd_message(); +qd_message_set_stream_annotation(msg, true); + qd_composed_field_t *props = qd_compose(QD_PERFORMATIVE_PROPERTIES, 0); qd_compose_start_list(props); qd_compose_insert_null(props); // message-id @@ -966,6 +968,7 @@ static void qdr_tcp_adaptor_init(qdr_core_t *core, void **adaptor_context) adaptor->log_source = qd_log_source("TCP_ADAPTOR"); DEQ_INIT(adaptor->listeners); DEQ_INIT(adaptor->connectors); +DEQ_INIT(adaptor->connections); *adaptor_context = adaptor; tcp_adaptor = adaptor; diff --git a/src/amqp.c b/src/amqp.c index 7c9d9c0..8da6db2 100644 --- a/src/amqp.c +++ b/src/amqp.c @@ -30,6 +30,7 @@ const char * const QD_MA_TRACE = "x-opt-qd.trace"; const char * const QD_MA_TO = "x-opt-qd.to"; const char * const QD_MA_PHASE = "x-opt-qd.phase"; const char * const QD_MA_CLASS = "x-opt-qd.class"; +const char * const QD_MA_STREAM = "x-opt-qd.stream"; const int QD_MA_MAX_KEY_LEN = 16; const int QD_MA_N_KEYS = 5; // max number of router annotations to send/receive const int QD_MA_FILTER_LEN = 5; // N tailing inbound entries to search for stripp
[qpid-dispatch] 01/04: DISPATCH-1654: need to set to field on message
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git commit 9dc69d55b003cd097efa8b30edaed5b3f789a10d Author: Gordon Sim AuthorDate: Mon Oct 5 20:14:42 2020 +0100 DISPATCH-1654: need to set to field on message --- src/adaptors/tcp_adaptor.c | 12 +--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c index 0e06059..f5d0ac7 100644 --- a/src/adaptors/tcp_adaptor.c +++ b/src/adaptors/tcp_adaptor.c @@ -149,9 +149,15 @@ static int handle_incoming(qdr_tcp_connection_t *conn) qd_compose_start_list(props); qd_compose_insert_null(props); // message-id qd_compose_insert_null(props); // user-id -qd_compose_insert_null(props); // to -qd_compose_insert_string(props, conn->global_id); // subject -qd_compose_insert_string(props, conn->reply_to);// reply-to +if (conn->ingress) { +qd_compose_insert_string(props, conn->config.address); // to +qd_compose_insert_string(props, conn->global_id); // subject +qd_compose_insert_string(props, conn->reply_to);// reply-to +} else { +qd_compose_insert_string(props, conn->reply_to); // to +qd_compose_insert_string(props, conn->global_id); // subject +qd_compose_insert_null(props);// reply-to +} //qd_compose_insert_null(props); // correlation-id //qd_compose_insert_null(props); // content-type //qd_compose_insert_null(props); // content-encoding - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] 04/04: DISPATCH-1806: Rearrange TCP adaptor outbound body data handling
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git commit 4fe73d8b299f343f8a57cce80932a12c16f0385d Author: Chuck Rolke AuthorDate: Wed Oct 14 11:34:33 2020 -0400 DISPATCH-1806: Rearrange TCP adaptor outbound body data handling --- src/adaptors/tcp_adaptor.c | 130 +++-- 1 file changed, 101 insertions(+), 29 deletions(-) diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c index cd22384..397f8bd 100644 --- a/src/adaptors/tcp_adaptor.c +++ b/src/adaptors/tcp_adaptor.c @@ -63,6 +63,14 @@ struct qdr_tcp_connection_t { uint64_t last_in_time; uint64_t last_out_time; +qd_message_body_data_t *outgoing_body_data; // current segment +size_t outgoing_body_bytes; // bytes received from current segment +int outgoing_body_offset; // buffer offset into current segment + +pn_raw_buffer_t outgoing_buffs[WRITE_BUFFERS]; +int outgoing_buff_count; // number of buffers with data +int outgoing_buff_idx;// first buffer with data + DEQ_LINKS(qdr_tcp_connection_t); }; @@ -220,16 +228,18 @@ static void handle_disconnected(qdr_tcp_connection_t* conn) static int read_message_body(qdr_tcp_connection_t *conn, qd_message_t *msg, pn_raw_buffer_t *buffers, int count) { int used = 0; -qd_message_body_data_t *body_data; -while (used < count) { -qd_message_body_data_result_t body_data_result = qd_message_next_body_data(msg, &body_data); + +// Advance to next body_data vbin segment if necessary. +// Return early if no data to process or error +if (conn->outgoing_body_data == 0) { +qd_message_body_data_result_t body_data_result = qd_message_next_body_data(msg, &conn->outgoing_body_data); if (body_data_result == QD_MESSAGE_BODY_DATA_OK) { -used += qd_message_body_data_buffers(body_data, buffers + used, used, count - used); -if (used > 0) { -buffers[used-1].context = (uintptr_t) body_data; -} +// a new body_data segment has been found +conn->outgoing_body_bytes = 0; +conn->outgoing_body_offset = 0; +// continue to process this segment } else if (body_data_result == QD_MESSAGE_BODY_DATA_INCOMPLETE) { -return used; +return 0; } else { switch (body_data_result) { case QD_MESSAGE_BODY_DATA_NO_MORE: @@ -245,36 +255,98 @@ static int read_message_body(qdr_tcp_connection_t *conn, qd_message_t *msg, pn_r return -1; } } + +// A valid body_data is in place. +// Try to get a buffer set from it. +used = qd_message_body_data_buffers(conn->outgoing_body_data, buffers, conn->outgoing_body_offset, count); +if (used > 0) { +// Accumulate the lengths of the returned buffers. +for (int i=0; ioutgoing_body_bytes += buffers[i].size; +} + +// Buffers returned should never exceed the body_data payload length +assert(conn->outgoing_body_bytes <= conn->outgoing_body_data->payload.length); + +if (conn->outgoing_body_bytes == conn->outgoing_body_data->payload.length) { +// This buffer set consumes the remainder of the body_data segment. +// Attach the body_data struct to the last buffer so that the struct +// can be freed after the buffer has been transmitted by raw connection out. +buffers[used-1].context = (uintptr_t) conn->outgoing_body_data; + +// Erase the body_data struct from the connection so that +// a new one gets created on the next pass. +conn->outgoing_body_data = 0; +} else { +// Returned buffer set did not consume the entire body_data segment. +// Leave existing body_data struct in place for use on next pass. +// Add the number of returned buffers to the offset for the next pass. +conn->outgoing_body_offset += used; +} +} else { +// No buffers returned. +// This sender has caught up with all data available on the input stream. +} return used; } +static bool write_outgoing_buffs(qdr_tcp_connection_t *conn) +{ +// Send the outgoing buffs to pn_raw_conn. +// Return true if all the buffers went out. +bool result; + +if (conn->outgoing_buff_count == 0) { +result = true; +} else { +size_t used = pn_raw_connection_write_buffers(conn->socket, + &conn->outgoing_buffs[conn->outgoing_buff_idx], +
[qpid-dispatch] branch dev-protocol-adaptors-2 updated (eccba23 -> 4fe73d8)
This is an automated email from the ASF dual-hosted git repository. chug pushed a change to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git. from eccba23 DISPATCH-1807: TCP adaptor test echo server new 9dc69d5 DISPATCH-1654: need to set to field on message new c56a1fb DISPATCH-1654: allow content to be seen when available new a8e548b DISPATCH-1654: fix for streaming message new 4fe73d8 DISPATCH-1806: Rearrange TCP adaptor outbound body data handling The 4 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: include/qpid/dispatch/amqp.h| 2 + include/qpid/dispatch/message.h | 17 + include/qpid/dispatch/parse.h | 2 + src/adaptors/tcp_adaptor.c | 145 +++- src/amqp.c | 1 + src/message.c | 35 +- src/message_private.h | 2 + src/parse.c | 12 +++- src/router_node.c | 64 +- 9 files changed, 214 insertions(+), 66 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch dev-protocol-adaptors-2 updated: DISPATCH-1807: TCP adaptor test echo server
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push: new eccba23 DISPATCH-1807: TCP adaptor test echo server eccba23 is described below commit eccba23ad05df2490e23790ea6690eee41bd99f4 Author: Chuck Rolke AuthorDate: Thu Oct 15 19:26:35 2020 -0400 DISPATCH-1807: TCP adaptor test echo server --- tests/TCP_echo_server.py | 120 +++ 1 file changed, 120 insertions(+) diff --git a/tests/TCP_echo_server.py b/tests/TCP_echo_server.py new file mode 100755 index 000..61e95a2 --- /dev/null +++ b/tests/TCP_echo_server.py @@ -0,0 +1,120 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import argparse +import os +import selectors +import socket +import sys +import traceback +import types + +from system_test import Logger + +HOST = '127.0.0.1' + +def main_except(port, logger): +''' +:param port: port to listen on +:param logger: Logger() object +:return: +''' +# set up listening socket +sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +sock.bind((HOST, port)) +sock.listen() +sock.setblocking(False) +logger.log('Listening on host:%s, port:%d' % (HOST, port)) + +# set up selector +sel = selectors.DefaultSelector() +sel.register(sock, selectors.EVENT_READ, data=None) + +# event loop +while True: +events = sel.select(timeout=None) +for key, mask in events: +if key.data is None: +do_accept(key.fileobj, sel, logger) +else: +do_service(key, mask, sel, logger) + +def do_accept(sock, sel, logger): +conn, addr = sock.accept() +logger.log('Accepted connection from %s:%d' % (addr[0], addr[1])) +conn.setblocking(False) +data = types.SimpleNamespace(addr=addr, inb=b'', outb=b'') +events = selectors.EVENT_READ | selectors.EVENT_WRITE +sel.register(conn, events, data=data) + +def do_service(key, mask, sel, logger): +sock = key.fileobj +data = key.data +if mask & selectors.EVENT_READ: +recv_data = sock.recv(1024) +if recv_data: +data.outb += recv_data +logger.log('ECHO read from: %s:%d len:%d: %s' % (data.addr[0], data.addr[1], len(recv_data), repr(recv_data))) +else: +logger.log('Closing connection to %s:%d' % (data.addr[0], data.addr[1])) +sel.unregister(sock) +sock.close() +if mask & selectors.EVENT_WRITE: +if data.outb: +sent = sock.send(data.outb) +if sent > 0: +logger.log('ECHO write to : %s:%d len:%d: %s' % (data.addr[0], data.addr[1], sent, repr(data.outb[:sent]))) +else: +logger.log('ECHO write to : %s:%d len:0' % (data.addr[0], data.addr[1])) +data.outb = data.outb[sent:] + + +def main(argv): +try: +# parse args +p = argparse.ArgumentParser() +p.add_argument('--port', '-p', + help='Required listening port number') +p.add_argument('--log', '-l', + action='store_true', + help='Write activity log to console') +del argv[0] +args = p.parse_args(argv) + +# port +if args.port is None: +raise Exception("User must specify a port number") +port = int(args.port) + +# logging +logger = Logger(title = "TCP_echo_server port %d" % port, +print_to_console = args.log, +save_for_dump = False) + +main_except(port, logger) +return 0 +except Exception as e: +traceback.print_exc() +return 1 + + +if __name__ == "__main__": +sys.exit(main(sys.argv)) - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch dev-protocol-adaptors-2 updated: NO-JIRA: Assert offset vs. body_data_buffer_count only when offset nonzero.
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push: new e9b5dc7 NO-JIRA: Assert offset vs. body_data_buffer_count only when offset nonzero. e9b5dc7 is described below commit e9b5dc7551e79a8163f00a2bcabeb0ea421868db Author: Chuck Rolke AuthorDate: Wed Oct 14 11:27:11 2020 -0400 NO-JIRA: Assert offset vs. body_data_buffer_count only when offset nonzero. Prevents offset=0 and buffer_count=0 asserts. --- src/message.c | 14 -- 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/message.c b/src/message.c index 2894086..9d3c5c5 100644 --- a/src/message.c +++ b/src/message.c @@ -2435,12 +2435,14 @@ int qd_message_body_data_buffers(qd_message_body_data_t *body_data, pn_raw_buffe // // Skip the buffer offset // -assert(offset < qd_message_body_data_buffer_count(body_data)); -while (offset > 0 && payload_len > 0) { -payload_len -= qd_buffer_size(buffer) - data_offset; -offset--; -data_offset = 0; -buffer = DEQ_NEXT(buffer); +if (offset > 0) { +assert(offset < qd_message_body_data_buffer_count(body_data)); +while (offset > 0 && payload_len > 0) { +payload_len -= qd_buffer_size(buffer) - data_offset; +offset--; +data_offset = 0; +buffer = DEQ_NEXT(buffer); +} } // - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch master updated: DISPATCH-1794: Set annotation OUT pad length equal to IN strip/test length
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/master by this push: new f60330b DISPATCH-1794: Set annotation OUT pad length equal to IN strip/test length f60330b is described below commit f60330be10b66a47f99bf904412fb18a2ad57b53 Author: Chuck Rolke AuthorDate: Tue Oct 13 11:26:29 2020 -0400 DISPATCH-1794: Set annotation OUT pad length equal to IN strip/test length --- src/amqp.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/amqp.c b/src/amqp.c index 0dab017..7c9d9c0 100644 --- a/src/amqp.c +++ b/src/amqp.c @@ -31,7 +31,7 @@ const char * const QD_MA_TO = "x-opt-qd.to"; const char * const QD_MA_PHASE = "x-opt-qd.phase"; const char * const QD_MA_CLASS = "x-opt-qd.class"; const int QD_MA_MAX_KEY_LEN = 16; -const int QD_MA_N_KEYS = 4; // max number of router annotations to send/receive +const int QD_MA_N_KEYS = 5; // max number of router annotations to send/receive const int QD_MA_FILTER_LEN = 5; // N tailing inbound entries to search for stripping const char * const QD_CAPABILITY_ROUTER_CONTROL = "qd.router"; - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch dev-protocol-adaptors updated: DISPATCH-1799: Add unit test for message body_data functions
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch dev-protocol-adaptors in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/dev-protocol-adaptors by this push: new 25ff354 DISPATCH-1799: Add unit test for message body_data functions 25ff354 is described below commit 25ff3545d66d34cefafbe330194013ff53c20341 Author: Chuck Rolke AuthorDate: Tue Oct 13 09:23:47 2020 -0400 DISPATCH-1799: Add unit test for message body_data functions Passes messages with varying vbin sizes and vbin segment counts. The unit test framework then tests each combination with varying qd_buffer sizes. This closes #874 --- tests/message_test.c | 238 ++- 1 file changed, 237 insertions(+), 1 deletion(-) diff --git a/tests/message_test.c b/tests/message_test.c index 2966765..135adc8 100644 --- a/tests/message_test.c +++ b/tests/message_test.c @@ -24,8 +24,10 @@ #include #include #include +#include -static unsigned char buffer[1]; +#define FLAT_BUF_SIZE (10) +static unsigned char buffer[FLAT_BUF_SIZE]; static size_t flatten_bufs(qd_message_content_t *content) { @@ -703,6 +705,239 @@ exit: return result; } +// +// Testing protocol adapter 'body_data' interfaces +// + +static void body_data_generate_message(qd_message_t *msg, char *s_chunk_size, char *s_n_chunks) +{ +// Fill a message with n_chunks of vbin chunk_size body data. + +int chunk_size = atoi(s_chunk_size); +int n_chunks = atoi(s_n_chunks); + +// Add message headers +qd_message_compose_1(msg, "whom-it-may-concern", 0); + +// Add the chunks. This creates the test state for not-flattened buffers. +for (int j=0; jbuffers); +qd_message_extend(msg, field); + +// Clean up temporary resources +free(buf2); +qd_compose_free(field); +qd_message_free(mule); +} +} + +static void free_body_data_list(qd_message_t *msg_in) +{ +// DISPATCH-1800 - this should not be required here +qd_message_pvt_t *msg = (qd_message_pvt_t *)msg_in; +qd_message_body_data_t *bd = DEQ_HEAD(msg->body_data_list); +while (bd) { +qd_message_body_data_t *next = DEQ_NEXT(bd); +free_qd_message_body_data_t(bd); +bd = next; +} + +} + +static char *check_body_data(char *s_chunk_size, char *s_n_chunks, bool flatten) +{ +// Fill a message with n chunks of vbin chunk_size body data. +// Then test by retrieving n chunks from a message copy and verifing. +// +// 'flatten' messes with message buffers after they have been composed. +// * Not flattened means that vbin headers stand alone in separate buffers and +// vbin data always starts in the first byte of a new buffer. This is the +// buffer condition when a message is forwarded between adaptors on a single +// router. The receiver and sender have two messages but share message content. +// * Flattened means that vbin headers and vbin data are packed into the buffer +// list. This is the buffer condition when a message is forwarded between +// routers and the receiver is handling the vbin segments. + +int chunk_size = atoi(s_chunk_size); +int n_chunks = atoi(s_n_chunks); + +char *result = 0; +int received; // got this much of chunk_size chunk + +// Messages for setting/sensing body data +qd_message_t *msg = qd_message(); +qd_message_t *copy= qd_message_copy(msg); +qd_message_pvt_t *msg_pvt = (qd_message_pvt_t *)msg; + +// Set the original message content +body_data_generate_message(msg, s_chunk_size, s_n_chunks); + +// flatten if required +if (flatten) { +// check that the flatten buffer is big enough +int vbin_size = chunk_size > 511 ? 8 : 5; // per-chunk vbin descriptor overhead +int header_size = 100; // leave plenty of allocaton for header +int msg_size = n_chunks * (chunk_size + vbin_size) + header_size; +assert(msg_size < FLAT_BUF_SIZE); + +// compress message into flatten buffer +size_t flat_size = flatten_bufs(MSG_CONTENT(msg)); + +// erase buffer list in msg and copy +qd_buffer_list_free_buffers(&msg_pvt->content->buffers); + +// reconstruct buffer list from flat buffer +qd_buffer_list_append(&msg_pvt->content->buffers, buffer, flat_size); +} + +// check the chunks +// Define the number of raw buffers to be extracted on each loop +#define N_PN_RAW_BUFFS (2) + +qd_message_body_data_t *body_data; + +for (int j=0; jpayload.length != chunk_size) { +printf("** check_body_data: BUFFER_SIZE=%zu, pn-buf-array-size:%d, " +
[qpid-dispatch] branch dev-protocol-adaptors updated: DISPATCH-1778: Extra data included in adaptor outbound streams
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch dev-protocol-adaptors in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/dev-protocol-adaptors by this push: new 2a410c8 DISPATCH-1778: Extra data included in adaptor outbound streams 2a410c8 is described below commit 2a410c8525af4f4377c2fea11adaf1ff5a8b56db Author: Chuck Rolke AuthorDate: Mon Oct 5 13:22:16 2020 -0400 DISPATCH-1778: Extra data included in adaptor outbound streams Body_data processing failed to contain output to the current section. This patch prevents qd_message_body_data_buffers from sending too much data if new sections are added before the current section is disposed. --- src/message.c | 6 ++ 1 file changed, 6 insertions(+) diff --git a/src/message.c b/src/message.c index c5d8725..21c226b 100644 --- a/src/message.c +++ b/src/message.c @@ -2450,6 +2450,12 @@ int qd_message_body_data_buffers(qd_message_body_data_t *body_data, pn_raw_buffe buffers[idx].size = qd_buffer_size(buffer) - (buffer == body_data->payload.buffer ? body_data->payload.offset : 0); buffers[idx].offset = 0; +if (buffer == body_data->last_buffer) { +// Don't process beyond the end of this body_data section +actual_count++; +break; +} + buffer = DEQ_NEXT(buffer); actual_count++; idx++; - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-dispatch] branch master updated: DISPATCH-1781: Scraper injects any log module output into data web page
This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/master by this push: new 0207301 DISPATCH-1781: Scraper injects any log module output into data web page 0207301 is described below commit 0207301f8d092fb661862c452a1faef90b794cc6 Author: Chuck Rolke AuthorDate: Thu Sep 24 09:43:18 2020 -0400 DISPATCH-1781: Scraper injects any log module output into data web page Add command line settings to specify a list of log modules instead of hard-coding 'SCRAPER' as the injected log module. These log module statements are injected verbatim into the main scraper web page. Command line adds: --log-modules LOG_MODULES, -lm LOG_MODULES Include list of named modules log entries in main log display. Specify multiple modules as a CSV string. Defaults to "SCRAPER" This will be useful for upcoming protocol adaptors. scraper --log-modules TCP_ADAPTOR -f *.log > test.html firefox test.html --- tools/scraper/common.py | 19 + tools/scraper/parser.py | 53 ++-- tools/scraper/scraper.py | 6 ++ 3 files changed, 58 insertions(+), 20 deletions(-) diff --git a/tools/scraper/common.py b/tools/scraper/common.py index 8f2b9c0..cdd04ec 100755 --- a/tools/scraper/common.py +++ b/tools/scraper/common.py @@ -101,6 +101,10 @@ class Common(): # when --no-data is in effect, how many log lines were skipped? data_skipped = 0 +# List of router log module names to include verbatim. +# Defaults to "SCRAPER". Overridden by command line. +verbatim_include_list = ["SCRAPER"] + def router_id_index(self, id): """ Given a router full container name, return the index in router_ids table @@ -110,6 +114,21 @@ class Common(): """ return self.router_ids.index(id) +def module_key_in_line(self, key, line): +''' +Sense if the key is a log module name in the log line. +The name can't be too far into the string or else it finds +false positives when a user uses qdstat to get a log file. +MAX_POSITION defines what constitutes 'too far'. +:param key: +:param line: +:return: +''' +MAX_POSITION = 40 +assert len(key) > 0 +st = line.find(key) +return st >= 0 and st <= MAX_POSITION + def log_letter_of(idx): ''' diff --git a/tools/scraper/parser.py b/tools/scraper/parser.py index edf110a..c0c998a 100755 --- a/tools/scraper/parser.py +++ b/tools/scraper/parser.py @@ -389,7 +389,6 @@ class ParsedLogLine(object): router_ls_key = "ROUTER_LS (info)" transfer_key = "@transfer(20)" proton_frame_key = "FRAME: " -scraper_key = "SCRAPER (" def sender_settle_mode_of(self, value): if value == "0": @@ -808,12 +807,18 @@ class ParsedLogLine(object): :param _comn: :param _router: """ -if not (ParsedLogLine.server_trace_key in _line or -ParsedLogLine.protocol_trace_key in _line or -(ParsedLogLine.policy_trace_key in _line and "lookup_user:" in _line) or # open (not begin, attach) -ParsedLogLine.server_info_key in _line or -ParsedLogLine.router_ls_key in _line or -ParsedLogLine.scraper_key in _line): +verbatim_module = None +if len(_comn.verbatim_include_list) > 0: +for modx in _comn.verbatim_include_list: +if _comn.module_key_in_line(modx, _line): +verbatim_module = modx +break +if not (_comn.module_key_in_line(self.server_trace_key, _line) or +_comn.module_key_in_line(self.protocol_trace_key, _line) or +(_comn.module_key_in_line(self.policy_trace_key, _line) and "lookup_user:" in _line) or # open (not begin, attach) +_comn.module_key_in_line(self.server_info_key, _line) or +_comn.module_key_in_line(self.router_ls_key, _line) or +verbatim_module is not None): raise ValueError("Line is not a candidate for parsing") self.index = _log_index # router prefix 0 for A, 1 for B self.instance = _instance # router instance in log file @@ -877,17 +882,20 @@ class ParsedLogLine(object): raise ValueError("Line too late outside time-of-day limits") # Pull out scraper literal logs