This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
commit d08e4a22e86610243dab1e5c08a8fd4c1c0d001b Author: Clifford Jansen <cliffjan...@apache.org> AuthorDate: Sun Mar 19 23:00:12 2023 -0700 PROTON-2695: epoll raw connections - reschedule task to finish unprocessed events from an event batch. --- c/src/proactor/epoll_raw_connection.c | 28 ++++++++++- c/tests/CMakeLists.txt | 2 +- c/tests/raw_connection_test.cpp | 87 +++++++++++++++++++++++++++++++++++ 3 files changed, 114 insertions(+), 3 deletions(-) diff --git a/c/src/proactor/epoll_raw_connection.c b/c/src/proactor/epoll_raw_connection.c index 919a4b808..cb61d6a75 100644 --- a/c/src/proactor/epoll_raw_connection.c +++ b/c/src/proactor/epoll_raw_connection.c @@ -50,6 +50,7 @@ struct praw_connection_t { struct addrinfo *ai; /* Current connect address */ bool connected; bool disconnected; + bool batch_empty; }; static void psocket_error(praw_connection_t *rc, int err, const char* msg) { @@ -317,7 +318,10 @@ static pn_event_t *pni_raw_batch_next(pn_event_batch_t *batch) { unlock(&rc->task.mutex); if (waking) pni_raw_wake(raw); - return pni_raw_event_next(raw); + pn_event_t *e = pni_raw_event_next(raw); + if (!e || pn_event_type(e) == PN_RAW_CONNECTION_DISCONNECTED) + rc->batch_empty = true; + return e; } task_t *pni_psocket_raw_task(psocket_t* ps) { @@ -373,6 +377,7 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t, uint32_t io_events, bool if (rc->disconnected) { pni_raw_connect_failed(&rc->raw_connection); unlock(&rc->task.mutex); + rc->batch_empty = false; return &rc->batch; } if (events & (EPOLLHUP | EPOLLERR)) { @@ -398,19 +403,38 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t, uint32_t io_events, bool if (wake) pni_raw_wake(&rc->raw_connection); if (events & EPOLLIN) pni_raw_read(&rc->raw_connection, fd, rcv, set_error); if (events & EPOLLOUT) pni_raw_write(&rc->raw_connection, fd, snd, set_error); + rc->batch_empty = false; return &rc->batch; } void pni_raw_connection_done(praw_connection_t *rc) { bool notify = false; bool ready = false; + bool have_event = false; + + // If !batch_empty, can't be sure state machine up to date, so reschedule task if necessary. + if (!rc->batch_empty) { + if (pn_collector_peek(rc->raw_connection.collector)) + have_event = true; + else { + pn_event_t *e = pni_raw_event_next(&rc->raw_connection); + // State machine up to date. + if (e) { + have_event = true; + // Sole event. Can put back without order issues. + // Edge case, performance not important. + pn_collector_put(rc->raw_connection.collector, pn_event_class(e), pn_event_context(e), pn_event_type(e)); + } + } + } + lock(&rc->task.mutex); pn_proactor_t *p = rc->task.proactor; tslot_t *ts = rc->task.runner; rc->task.working = false; - notify = pni_task_wake_pending(&rc->task) && schedule(&rc->task); // The task may be in the ready state even if we've got no raw connection // wakes outstanding because we dealt with it already in pni_raw_batch_next() + notify = (pni_task_wake_pending(&rc->task) || have_event) && schedule(&rc->task); ready = rc->task.ready; unlock(&rc->task.mutex); diff --git a/c/tests/CMakeLists.txt b/c/tests/CMakeLists.txt index 641ba3c75..8ff0d8da9 100644 --- a/c/tests/CMakeLists.txt +++ b/c/tests/CMakeLists.txt @@ -79,7 +79,7 @@ if (CMAKE_CXX_COMPILER) add_c_test(c-proactor-test pn_test_proactor.cpp proactor_test.cpp) target_link_libraries(c-proactor-test qpid-proton-core qpid-proton-proactor ${PLATFORM_LIBS}) - add_c_test(c-raw-connection-test raw_connection_test.cpp $<TARGET_OBJECTS:qpid-proton-proactor-objects>) + add_c_test(c-raw-connection-test raw_connection_test.cpp pn_test_proactor.cpp $<TARGET_OBJECTS:qpid-proton-proactor-objects>) target_link_libraries(c-raw-connection-test qpid-proton-core ${PLATFORM_LIBS} ${PROACTOR_LIBS}) add_c_test(c-ssl-proactor-test pn_test_proactor.cpp ssl_proactor_test.cpp) diff --git a/c/tests/raw_connection_test.cpp b/c/tests/raw_connection_test.cpp index 0f31c4910..9f5a9b72e 100644 --- a/c/tests/raw_connection_test.cpp +++ b/c/tests/raw_connection_test.cpp @@ -832,3 +832,90 @@ TEST_CASE("raw connection") { } } } + +// WAKE tests require a running proactor. + +#include "../src/proactor/proactor-internal.h" +#include "./pn_test_proactor.hpp" +#include <proton/event.h> +#include <proton/listener.h> + +namespace { + +class common_handler : public handler { + handler *accept_; // Handler for accepted connections + bool close_on_wake_; + pn_raw_connection_t *last_server_; + +public: + explicit common_handler(handler *accept = 0) : accept_(accept), close_on_wake_(false), last_server_(0) {} + + void set_close_on_wake(bool b) { close_on_wake_ = b; } + + pn_raw_connection_t *last_server() { return last_server_; } + + bool handle(pn_event_t *e) override { + switch (pn_event_type(e)) { + /* Always stop on these noteworthy events */ + case PN_LISTENER_OPEN: + case PN_LISTENER_CLOSE: + case PN_PROACTOR_INACTIVE: + return true; + + case PN_LISTENER_ACCEPT: { + listener = pn_event_listener(e); + pn_raw_connection_t *rc = pn_raw_connection(); + pn_listener_raw_accept(listener, rc); + last_server_ = rc; + return false; + } break; + + case PN_RAW_CONNECTION_WAKE: { + if (close_on_wake_) { + pn_raw_connection_t *rc = pn_event_raw_connection(e); + pn_raw_connection_close(rc); + } + return true; + } break; + + + default: + return false; + } + } +}; + + +} // namespace + +// Test waking up a connection that is idle +TEST_CASE("proactor_raw_connection_wake") { + common_handler h; + proactor p(&h); + pn_listener_t *l = p.listen(); + REQUIRE_RUN(p, PN_LISTENER_OPEN); + + pn_raw_connection_t *rc = pn_raw_connection(); + std::string addr = ":" + pn_test::listening_port(l); + pn_proactor_raw_connect(pn_listener_proactor(l), rc, addr.c_str()); + + + REQUIRE_RUN(p, PN_LISTENER_ACCEPT); + REQUIRE_RUN(p, PN_RAW_CONNECTION_NEED_READ_BUFFERS); + REQUIRE_RUN(p, PN_RAW_CONNECTION_NEED_READ_BUFFERS); + CHECK(pn_proactor_get(p) == NULL); /* idle */ + pn_raw_connection_wake(rc); + REQUIRE_RUN(p, PN_RAW_CONNECTION_WAKE); + CHECK(pn_proactor_get(p) == NULL); /* idle */ + + h.set_close_on_wake(true); + pn_raw_connection_wake(rc); + REQUIRE_RUN(p, PN_RAW_CONNECTION_WAKE); + REQUIRE_RUN(p, PN_RAW_CONNECTION_DISCONNECTED); + pn_raw_connection_wake(h.last_server()); + REQUIRE_RUN(p, PN_RAW_CONNECTION_WAKE); + REQUIRE_RUN(p, PN_RAW_CONNECTION_DISCONNECTED); + pn_listener_close(l); + REQUIRE_RUN(p, PN_LISTENER_CLOSE); + REQUIRE_RUN(p, PN_PROACTOR_INACTIVE); +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org