This is an automated email from the ASF dual-hosted git repository. kgiusti pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/main by this push: new 5d693210 Fixes #482: replace reactor tests with proactor implementation (#488) 5d693210 is described below commit 5d693210b806f5441960a457c68f16fd3fc83606 Author: Ken Giusti <kgiu...@redhat.com> AuthorDate: Thu May 19 14:02:56 2022 -0400 Fixes #482: replace reactor tests with proactor implementation (#488) (cherry picked from commit ff073628c99e24eeb1904435666bb28eacfed20c) --- tests/clogger.c | 101 +++++++++++++++++++------------------ tests/test-receiver.c | 135 ++++++++++++++++++++++++++++--------------------- tests/test-sender.c | 136 ++++++++++++++++++++++++++++++-------------------- 3 files changed, 212 insertions(+), 160 deletions(-) diff --git a/tests/clogger.c b/tests/clogger.c index dfa8e3af..a60a8979 100644 --- a/tests/clogger.c +++ b/tests/clogger.c @@ -26,10 +26,8 @@ #include "proton/connection.h" #include "proton/delivery.h" -#include "proton/event.h" -#include "proton/handlers.h" #include "proton/link.h" -#include "proton/reactor.h" +#include "proton/proactor.h" #include "proton/session.h" #include "proton/transport.h" @@ -63,14 +61,14 @@ uint32_t pause_msec = 100; // pause between sending chunks (milliseconds) const char *target_address = "test-address"; const char *host_address = "127.0.0.1:5672"; const char *container_name = "Clogger"; +char proactor_address[1024]; -pn_reactor_t *reactor; +// +pn_proactor_t *proactor; pn_connection_t *pn_conn; pn_session_t *pn_ssn; pn_link_t *pn_link; pn_delivery_t *pn_dlv; // current in-flight delivery -pn_handler_t *_handler; - uint32_t bytes_sent; // number of body data bytes written out link uint32_t remote_max_frame = DEFAULT_MAX_FRAME; // used to limit amount written @@ -119,7 +117,7 @@ static void signal_handler(int signum) case SIGINT: case SIGQUIT: stop = true; - if (reactor) pn_reactor_wakeup(reactor); + if (proactor) pn_proactor_interrupt(proactor); break; default: break; @@ -193,9 +191,9 @@ bool send_message_data() pn_delivery_settle(pn_dlv); if (limit && sent == limit) { // no need to wait for acks - debug("stopping...\n"); + debug("stopping (presettled)...\n"); stop = true; - pn_reactor_wakeup(reactor); + pn_proactor_interrupt(proactor); } } pn_dlv = 0; @@ -208,10 +206,9 @@ bool send_message_data() /* Process each event posted by the proactor. Return true if client has stopped. */ -static void event_handler(pn_handler_t *handler, - pn_event_t *event, - pn_event_type_t etype) +static bool event_handler(pn_event_t *event) { + const pn_event_type_t etype = pn_event_type(event); debug("new event=%s\n", pn_event_type_name(etype)); switch (etype) { @@ -243,13 +240,12 @@ static void event_handler(pn_handler_t *handler, if (pn_link_credit(pn_link) > 0) { if (!pn_dlv) { start_message(); - pn_reactor_schedule(reactor, pause_msec, _handler); // send body after pause + pn_proactor_set_timeout(proactor, pause_msec); // send body after pause } } } } break; - case PN_TRANSPORT: { ssize_t pending = pn_transport_pending(pn_event_transport(event)); debug("PN_TRANSPORT pending=%ld\n", pending); @@ -288,31 +284,37 @@ static void event_handler(pn_handler_t *handler, // initiate clean shutdown of the endpoints debug("stopping...\n"); stop = true; - pn_reactor_wakeup(reactor); + pn_proactor_interrupt(proactor); } } } break; - case PN_TIMER_TASK: { + case PN_PROACTOR_TIMEOUT: { + if (pn_conn) pn_connection_wake(pn_conn); + } break; + + case PN_CONNECTION_WAKE: { if (!send_message_data()) { // not done sending - pn_reactor_schedule(reactor, pause_msec, _handler); + pn_proactor_set_timeout(proactor, pause_msec); } else if (limit == 0 || sent < limit) { if (pn_link_credit(pn_link) > 0) { // send next message start_message(); - pn_reactor_schedule(reactor, pause_msec, _handler); + pn_proactor_set_timeout(proactor, pause_msec); } } } break; + case PN_PROACTOR_INACTIVE: { + debug("proactor inactive!\n"); + return stop; + } break; + default: break; } -} - -static void delete_handler(pn_handler_t *handler) -{ + return false; } @@ -374,51 +376,54 @@ int main(int argc, char** argv) host_address += strlen("amqp://"); } - // convert host_address to hostname and port + // trim port from hostname char *hostname = strdup(host_address); char *port = strchr(hostname, ':'); - if (!port) { - port = "5672"; - } else { + if (port) { *port++ = 0; + } else { + port = "5672"; } - _handler = pn_handler_new(event_handler, 0, delete_handler); - pn_handler_add(_handler, pn_handshaker()); - - reactor = pn_reactor(); - pn_conn = pn_reactor_connection_to_host(reactor, - hostname, - port, - _handler); - + pn_conn = pn_connection(); // the container name should be unique for each client pn_connection_set_container(pn_conn, container_name); pn_connection_set_hostname(pn_conn, hostname); + proactor = pn_proactor(); + pn_proactor_addr(proactor_address, sizeof(proactor_address), hostname, port); + pn_proactor_connect2(proactor, pn_conn, 0, proactor_address); + free(hostname); + + bool done = false; + while (!done) { + debug("Waiting for proactor event...\n"); + pn_event_batch_t *events = pn_proactor_wait(proactor); + debug("Start new proactor batch\n"); + pn_event_t *event = pn_event_batch_next(events); + while (event) { + done = event_handler(event); + if (done) + break; - // break out of pn_reactor_process once a second to check if done - pn_reactor_set_timeout(reactor, 1000); + event = pn_event_batch_next(events); + } - pn_reactor_start(reactor); + debug("Proactor batch processing done\n"); + pn_proactor_done(proactor, events); - while (pn_reactor_process(reactor)) { - if (stop) { - // close the endpoints this will cause pn_reactor_process() to - // eventually break the loop + if (stop && pn_conn) { + debug("Stop detected - closing connection...\n"); if (pn_link) pn_link_close(pn_link); if (pn_ssn) pn_session_close(pn_ssn); - if (pn_conn) pn_connection_close(pn_conn); + pn_connection_close(pn_conn); pn_link = 0; pn_ssn = 0; pn_conn = 0; } } - if (pn_link) pn_link_free(pn_link); - if (pn_ssn) pn_session_free(pn_ssn); - if (pn_conn) pn_connection_close(pn_conn); - - pn_reactor_free(reactor); + debug("Send complete!\n"); + pn_proactor_free(proactor); if (not_accepted) { printf("Sent: %" PRIu64 " Accepted: %" PRIu64 " Not Accepted: %" PRIu64 "\n", sent, accepted, not_accepted); diff --git a/tests/test-receiver.c b/tests/test-receiver.c index 14f8c9bd..68fb46f6 100644 --- a/tests/test-receiver.c +++ b/tests/test-receiver.c @@ -20,12 +20,10 @@ #include "proton/connection.h" #include "proton/delivery.h" -#include "proton/event.h" -#include "proton/handlers.h" #include "proton/link.h" #include "proton/message.h" -#include "proton/reactor.h" #include "proton/session.h" +#include "proton/proactor.h" #include <inttypes.h> #include <signal.h> @@ -34,11 +32,13 @@ #include <string.h> #include <time.h> #include <unistd.h> +#include <assert.h> #define BOOL2STR(b) ((b)?"true":"false") bool stop = false; bool verbose = false; +bool debug_mode = false; int credit_window = 1000; char *source_address = "test-address"; // name of the source node to receive from @@ -46,17 +46,30 @@ char _addr[] = "127.0.0.1:5672"; char *host_address = _addr; char *container_name = "TestReceiver"; bool drop_connection = false; +char proactor_address[1024]; pn_connection_t *pn_conn; pn_session_t *pn_ssn; pn_link_t *pn_link; -pn_reactor_t *reactor; +pn_proactor_t *proactor; pn_message_t *in_message; // holds the current received message uint64_t count = 0; uint64_t limit = 0; // if > 0 stop after limit messages arrive +void debug(const char *format, ...) +{ + va_list args; + + if (!debug_mode) return; + + va_start(args, format); + vprintf(format, args); + va_end(args); +} + + static void signal_handler(int signum) { signal(SIGINT, SIG_IGN); @@ -66,6 +79,7 @@ static void signal_handler(int signum) case SIGINT: case SIGQUIT: stop = true; + if (proactor) pn_proactor_interrupt(proactor); break; default: break; @@ -73,23 +87,12 @@ static void signal_handler(int signum) } -// Called when reactor exits to clean up app_data -// -static void delete_handler(pn_handler_t *handler) -{ - if (in_message) { - pn_message_free(in_message); - in_message = NULL; - } -} - - -/* Process each event posted by the reactor. +/* Process each event posted by the proactor */ -static void event_handler(pn_handler_t *handler, - pn_event_t *event, - pn_event_type_t type) +static bool event_handler(pn_event_t *event) { + const pn_event_type_t type = pn_event_type(event); + debug("new event=%s\n", pn_event_type_name(type)); switch (type) { case PN_CONNECTION_INIT: { @@ -140,14 +143,31 @@ static void event_handler(pn_handler_t *handler, if (limit && count == limit) { stop = true; - pn_reactor_wakeup(reactor); } } } break; + case PN_PROACTOR_TIMEOUT: { + if (verbose) { + fprintf(stdout, "Received:%"PRIu64" of %"PRIu64"\n", count, limit); + fflush(stdout); + if (!stop) { + pn_proactor_set_timeout(proactor, 10 * 1000); + } + } + } break; + + case PN_PROACTOR_INACTIVE: { + assert(stop); // expect: inactive due to stopping + debug("proactor inactive!\n"); + return true; + } break; + default: break; } + + return false; } static void usage(void) @@ -160,21 +180,17 @@ static void usage(void) printf("-w \tCredit window [%d]\n", credit_window); printf("-E \tExit without cleanly closing the connection [off]\n"); printf("-d \tPrint periodic status updates [%s]\n", BOOL2STR(verbose)); + printf("-D \tPrint debug info [off]\n"); exit(1); } int main(int argc, char** argv) { - /* create a handler for the connection's events. - */ - pn_handler_t *handler = pn_handler_new(event_handler, 0, delete_handler); - pn_handler_add(handler, pn_handshaker()); - /* command line options */ opterr = 0; int c; - while((c = getopt(argc, argv, "i:a:s:hdw:c:E")) != -1) { + while((c = getopt(argc, argv, "i:a:s:hdDw:c:E")) != -1) { switch(c) { case 'h': usage(); break; case 'a': host_address = optarg; break; @@ -190,6 +206,7 @@ int main(int argc, char** argv) break; case 'E': drop_connection = true; break; case 'd': verbose = true; break; + case 'D': debug_mode = true; break; default: usage(); @@ -210,50 +227,52 @@ int main(int argc, char** argv) port = "5672"; } - reactor = pn_reactor(); - pn_conn = pn_reactor_connection_to_host(reactor, - host, - port, - handler); - + pn_conn = pn_connection(); // the container name should be unique for each client pn_connection_set_container(pn_conn, container_name); pn_connection_set_hostname(pn_conn, host); + proactor = pn_proactor(); + pn_proactor_addr(proactor_address, sizeof(proactor_address), host, port); + pn_proactor_connect2(proactor, pn_conn, 0, proactor_address); - // periodic wakeup - pn_reactor_set_timeout(reactor, 1000); + if (verbose) { + // print status every 10 seconds.. + pn_proactor_set_timeout(proactor, 10 * 1000); + } - pn_reactor_start(reactor); + bool done = false; + while (!done) { + debug("Waiting for proactor event...\n"); + pn_event_batch_t *events = pn_proactor_wait(proactor); + debug("Start new proactor batch\n"); - time_t last_log = time(NULL); - while (pn_reactor_process(reactor)) { - if (stop) { - if (drop_connection) // hard exit - exit(0); - // close the endpoints this will cause pn_reactor_process() to - // eventually break the loop - if (pn_link) pn_link_close(pn_link); - if (pn_ssn) pn_session_close(pn_ssn); - if (pn_conn) pn_connection_close(pn_conn); - - } else if (verbose) { + pn_event_t *event = pn_event_batch_next(events); + while (!done && event) { + done = event_handler(event); + event = pn_event_batch_next(events); + } - // periodically give status for test output logs + debug("Proactor batch processing done\n"); + pn_proactor_done(proactor, events); - time_t now = time(NULL); - if ((now - last_log) >= 10) { - fprintf(stdout, "Received:%"PRIu64" of %"PRIu64"\n", count, limit); - fflush(stdout); - last_log = now; + if (stop) { + pn_proactor_cancel_timeout(proactor); + if (drop_connection) { // hard stop + exit(0); + } + if (pn_conn) { + debug("Stop detected - closing connection...\n"); + if (pn_link) pn_link_close(pn_link); + if (pn_ssn) pn_session_close(pn_ssn); + pn_connection_close(pn_conn); + pn_link = 0; + pn_ssn = 0; + pn_conn = 0; } } } - if (pn_link) pn_link_free(pn_link); - if (pn_ssn) pn_session_free(pn_ssn); - if (pn_conn) pn_connection_close(pn_conn); - - pn_reactor_free(reactor); + pn_proactor_free(proactor); if (verbose) { fprintf(stdout, "Received:%"PRIu64" of %"PRIu64"\n", count, limit); diff --git a/tests/test-sender.c b/tests/test-sender.c index 57867b4d..d0bd448f 100644 --- a/tests/test-sender.c +++ b/tests/test-sender.c @@ -25,12 +25,10 @@ #include "proton/connection.h" #include "proton/delivery.h" -#include "proton/event.h" -#include "proton/handlers.h" #include "proton/link.h" #include "proton/message.h" -#include "proton/reactor.h" #include "proton/session.h" +#include "proton/proactor.h" #include <errno.h> #include <inttypes.h> @@ -41,6 +39,7 @@ #include <string.h> #include <time.h> #include <unistd.h> +#include <assert.h> #define BOOL2STR(b) ((b)?"true":"false") @@ -56,6 +55,7 @@ bool stop = false; bool verbose = false; +bool debug_mode = false; uint64_t limit = 1; // # messages to send uint64_t count = 0; // # sent @@ -86,11 +86,12 @@ char *target_address = "test-address"; char _addr[] = "127.0.0.1:5672"; char *host_address = _addr; char *container_name = "TestSender"; +char proactor_address[1024]; pn_connection_t *pn_conn; pn_session_t *pn_ssn; pn_link_t *pn_link; -pn_reactor_t *reactor; +pn_proactor_t *proactor; pn_message_t *out_message; @@ -109,6 +110,18 @@ const char big_string[] = "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"; +void debug(const char *format, ...) +{ + va_list args; + + if (!debug_mode) return; + + va_start(args, format); + vprintf(format, args); + va_end(args); +} + + static void add_message_annotations(pn_message_t *out_message) { // just a bunch of dummy MA @@ -228,6 +241,7 @@ static void signal_handler(int signum) case SIGINT: case SIGQUIT: stop = true; + if (proactor) pn_proactor_interrupt(proactor); break; default: break; @@ -235,20 +249,12 @@ static void signal_handler(int signum) } -static void delete_handler(pn_handler_t *handler) -{ - free(encode_buffer); - pn_message_free(out_message); - free((void *)body_data.start); -} - - -/* Process each event posted by the reactor. +/* Process each event posted by the proactor. */ -static void event_handler(pn_handler_t *handler, - pn_event_t *event, - pn_event_type_t type) +static bool event_handler(pn_event_t *event) { + const pn_event_type_t type = pn_event_type(event); + debug("new event=%s\n", pn_event_type_name(type)); switch (type) { case PN_CONNECTION_INIT: { @@ -289,8 +295,8 @@ static void event_handler(pn_handler_t *handler, ++accepted; if (limit && count == limit) { // no need to wait for acks + debug("stopping (presettled)...\n"); stop = true; - pn_reactor_wakeup(reactor); } } } @@ -332,17 +338,39 @@ static void event_handler(pn_handler_t *handler, if (limit && acked == limit) { // initiate clean shutdown of the endpoints + debug("stopping...\n"); stop = true; - pn_reactor_wakeup(reactor); } } } break; + case PN_PROACTOR_TIMEOUT: { + if (verbose) { + fprintf(stdout, + "Sent:%"PRIu64" Accepted:%"PRIu64" Rejected:%"PRIu64 + " Released:%"PRIu64" Modified:%"PRIu64" Limit:%"PRIu64"\n", + count, accepted, rejected, released, modified, limit); + fflush(stdout); + if (!stop) { + pn_proactor_set_timeout(proactor, 10 * 1000); + } + } + } break; + + case PN_PROACTOR_INACTIVE: { + assert(stop); // expect: inactive due to stopping + debug("proactor inactive!\n"); + return true; + } break; + default: break; } + + return false; } + static void usage(void) { printf("Usage: sender <options>\n"); @@ -359,6 +387,7 @@ static void usage(void) printf("-p \tMessage priority [%d]\n", priority); printf("-X \tMessage body data pattern [%c]\n", (char)body_data_pattern); printf("-d \tPrint periodic status updates [%s]\n", BOOL2STR(verbose)); + printf("-D \tPrint debug info [off]\n"); exit(1); } @@ -367,7 +396,7 @@ int main(int argc, char** argv) /* command line options */ opterr = 0; int c; - while ((c = getopt(argc, argv, "ha:c:i:ns:t:udMEp:X:")) != -1) { + while ((c = getopt(argc, argv, "ha:c:i:ns:t:udMEDp:X:")) != -1) { switch(c) { case 'h': usage(); break; case 'a': host_address = optarg; break; @@ -393,6 +422,7 @@ int main(int argc, char** argv) case 'M': add_annotations = true; break; case 'E': drop_connection = true; break; case 'X': body_data_pattern = optarg[0]; break; + case 'D': debug_mode = true; break; case 'p': if (sscanf(optarg, "%u", &priority) != 1) usage(); @@ -417,59 +447,57 @@ int main(int argc, char** argv) port = "5672"; } - pn_handler_t *handler = pn_handler_new(event_handler, 0, delete_handler); - pn_handler_add(handler, pn_handshaker()); - - reactor = pn_reactor(); - pn_conn = pn_reactor_connection_to_host(reactor, - host, - port, - handler); - + pn_conn = pn_connection(); // the container name should be unique for each client pn_connection_set_container(pn_conn, container_name); pn_connection_set_hostname(pn_conn, host); + proactor = pn_proactor(); + pn_proactor_addr(proactor_address, sizeof(proactor_address), host, port); + pn_proactor_connect2(proactor, pn_conn, 0, proactor_address); + + if (verbose) { + // print status every 10 seconds.. + pn_proactor_set_timeout(proactor, 10 * 1000); + } - // break out of pn_reactor_process once a second to check if done - pn_reactor_set_timeout(reactor, 1000); + bool done = false; + while (!done) { + debug("Waiting for proactor event...\n"); + pn_event_batch_t *events = pn_proactor_wait(proactor); + debug("Start new proactor batch\n"); + + pn_event_t *event = pn_event_batch_next(events); + while (!done && event) { + done = event_handler(event); + event = pn_event_batch_next(events); + } - pn_reactor_start(reactor); + debug("Proactor batch processing done\n"); + pn_proactor_done(proactor, events); - time_t last_log = time(NULL); - while (pn_reactor_process(reactor)) { if (stop) { + pn_proactor_cancel_timeout(proactor); if (drop_connection) { // hard stop fprintf(stdout, "Sent:%"PRIu64" Accepted:%"PRIu64" Rejected:%"PRIu64 " Released:%"PRIu64" Modified:%"PRIu64"\n", count, accepted, rejected, released, modified); + fflush(stdout); exit(0); } - if (pn_link) pn_link_close(pn_link); - if (pn_ssn) pn_session_close(pn_ssn); - if (pn_conn) pn_connection_close(pn_conn); - - } else if (verbose) { - - // periodically give status for test output logs - - time_t now = time(NULL); - if ((now - last_log) >= 10) { - fprintf(stdout, - "Sent:%"PRIu64" Accepted:%"PRIu64" Rejected:%"PRIu64 - " Released:%"PRIu64" Modified:%"PRIu64" Limit:%"PRIu64"\n", - count, accepted, rejected, released, modified, limit); - fflush(stdout); - last_log = now; + if (pn_conn) { + debug("Stop detected - closing connection...\n"); + if (pn_link) pn_link_close(pn_link); + if (pn_ssn) pn_session_close(pn_ssn); + pn_connection_close(pn_conn); + pn_link = 0; + pn_ssn = 0; + pn_conn = 0; } } } - if (pn_link) pn_link_free(pn_link); - if (pn_ssn) pn_session_free(pn_ssn); - if (pn_conn) pn_connection_close(pn_conn); - - pn_reactor_free(reactor); + pn_proactor_free(proactor); if (verbose) { fprintf(stdout, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org