(qpid-proton) branch main updated: PROTON-2790: finer grained session flow control
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git The following commit(s) were added to refs/heads/main by this push: new 60ab050bd PROTON-2790: finer grained session flow control 60ab050bd is described below commit 60ab050bd4da40fd845a0a329bb134bdb4e3903a Author: Clifford Jansen AuthorDate: Tue Oct 8 17:22:34 2024 -0700 PROTON-2790: finer grained session flow control --- c/docs/buffering.md| 2 +- c/include/proton/session.h | 82 +++ c/include/proton/transport.h | 8 +- c/include/proton/types.h | 7 + c/src/core/engine-internal.h | 6 + c/src/core/engine.c| 86 ++- c/src/core/transport.c | 75 +++--- c/tests/connection_driver_test.cpp | 7 +- c/tests/engine_test.cpp| 290 + 9 files changed, 533 insertions(+), 30 deletions(-) diff --git a/c/docs/buffering.md b/c/docs/buffering.md index 71dafbf7b..32567a2f0 100644 --- a/c/docs/buffering.md +++ b/c/docs/buffering.md @@ -16,7 +16,7 @@ gets a @ref PN_LINK_FLOW event. The AMQP protocol allows peers to exchange session limits so they can predict their buffering requirements for incoming data ( -`pn_session_set_incoming_capacity()` and +`pn_session_set_incoming_incoming_window_and_lwm()` and `pn_session_set_outgoing_window()`). Proton will not exceed those limits when sending to or receiving from the peer. However proton does *not* limit the amount of data buffered in local memory at the request of the application. It diff --git a/c/include/proton/session.h b/c/include/proton/session.h index e09d41113..ac30ccd2b 100644 --- a/c/include/proton/session.h +++ b/c/include/proton/session.h @@ -194,6 +194,8 @@ PN_EXTERN void pn_session_open(pn_session_t *session); PN_EXTERN void pn_session_close(pn_session_t *session); /** + * **Deprecated** - Use ::pn_session_incoming_window(). + * * Get the incoming capacity of the session measured in bytes. 
* * The incoming capacity of a session determines how much incoming @@ -205,6 +207,8 @@ PN_EXTERN void pn_session_close(pn_session_t *session); PN_EXTERN size_t pn_session_get_incoming_capacity(pn_session_t *session); /** + * **Deprecated** - Use ::pn_session_set_incoming_window_and_lwm(). + * * Set the incoming capacity for a session object. * * The incoming capacity of a session determines how much incoming message @@ -223,6 +227,84 @@ PN_EXTERN size_t pn_session_get_incoming_capacity(pn_session_t *session); */ PN_EXTERN void pn_session_set_incoming_capacity(pn_session_t *session, size_t capacity); +/** + * Get the maximum incoming window window for a session object. + * + * The maximum incoming window can be set by ::pn_session_set_incoming_window_and_lwm. + * + * @param[in] session the session object + * @return the maximum size of the incoming window or 0 if not set. + **/ +PN_EXTERN pn_frame_count_t pn_session_incoming_window(pn_session_t *session); + +/** + * Get the low water mark for the session incoming window. + * + * The low water mark governs how frequently the session updates the remote + * peer with changes to the incoming window. + * + * A value of zero indicates that Proton will choose a default strategy for + * updating the peer. + * + * The low water mark can be set by ::pn_session_set_incoming_window_and_lwm. + * + * @param[in] session the session object + * @return the low water mark of incoming window. + **/ +PN_EXTERN pn_frame_count_t pn_session_incoming_window_lwm(pn_session_t *session); + +/** + * Set the maximum incoming window and low water mark for a session object. + * + * The session incoming window is a count of the number of AMQP transfer frames + * that can be accepted and buffered locally by the session object until + * processed by the application (i.e. consumed by ::pn_link_recv or dropped by + * ::pn_link_advance). The maximum bytes buffered by the session will never + * exceed (max_incoming_window * max_frame_size). 
The incoming window frame count + * decreases 1-1 with incoming AMQP transfer frames. Whenever the application + * processes the buffered incoming bytes, the incoming window increases to the + * largest frame count that can be used by the peer without causing the local + * buffered bytes to exceed the maximum stated above. + * + * The session will defer updating the peer with a changed incoming window until + * it drops below the low water mark (lwm). Too many updates can delay + * other traffic on the connection without providing improved performance on the + * session. Too few can leave a remote sender frequently unable to send due + * to a closed window. The best balance is application specific. Note that the + * session incoming window is always updated along with the link credit on any + * of its child links, so the frequency of link
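The doc comment above describes the window arithmetic in words. A minimal sketch of that arithmetic follows, assuming a toy model rather than Proton's internal state: `window_model_t`, `on_transfer_frame`, `should_update_peer` and `send_update` are hypothetical names, and the update rule (send only once the peer's view of the window is below the low water mark) is a reading of the comment, not the library's code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical model of a session incoming window; not Proton's internals. */
typedef struct {
  uint32_t max_window;  /* maximum incoming window, in transfer frames */
  uint32_t lwm;         /* low water mark, in transfer frames */
  uint32_t peer_window; /* window the remote peer currently believes it has */
} window_model_t;

/* Upper bound on buffered bytes: max_incoming_window * max_frame_size. */
static uint64_t max_buffered_bytes(uint32_t max_window, uint32_t max_frame_size) {
  return (uint64_t)max_window * max_frame_size;
}

/* Each incoming transfer frame decreases the window 1-1. */
static void on_transfer_frame(window_model_t *w) {
  assert(w->peer_window > 0);
  w->peer_window--;
}

/* Defer updates until the window has dropped below the low water mark. */
static bool should_update_peer(const window_model_t *w) {
  return w->peer_window < w->lwm;
}

/* Reopen the window as far as the locally buffered frames allow. */
static void send_update(window_model_t *w, uint32_t buffered_frames) {
  assert(buffered_frames <= w->max_window);
  w->peer_window = w->max_window - buffered_frames;
}
```

With a maximum window of 10 frames and a low water mark of 4, this model sends no update until the seventh frame arrives, which is the trade-off between update traffic and a stalled sender that the comment describes.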
(qpid-proton) branch main updated: PROTON-2832: proactor raw connection shutdown race with simultaneous epoll event
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git The following commit(s) were added to refs/heads/main by this push: new bbe9a4b15 PROTON-2832: proactor raw connection shutdown race with simultaneous epoll event bbe9a4b15 is described below commit bbe9a4b15476060beccff32fde8cda667a204dc0 Author: Clifford Jansen AuthorDate: Thu Aug 1 10:03:49 2024 -0700 PROTON-2832: proactor raw connection shutdown race with simultaneous epoll event --- c/src/proactor/epoll_raw_connection.c | 42 ++- 1 file changed, 32 insertions(+), 10 deletions(-) diff --git a/c/src/proactor/epoll_raw_connection.c b/c/src/proactor/epoll_raw_connection.c index 350c16ba8..10870ffe7 100644 --- a/c/src/proactor/epoll_raw_connection.c +++ b/c/src/proactor/epoll_raw_connection.c @@ -48,6 +48,8 @@ struct praw_connection_t { pn_event_batch_t batch; struct addrinfo *addrinfo; /* Resolved address list */ struct addrinfo *ai; /* Current connect address */ + int current_arm; /* Active epoll io events */ + bool armed; bool connected; bool disconnected; bool hup_detected; @@ -101,7 +103,8 @@ static void praw_connection_start(praw_connection_t *prc, int fd) { pclosefd(prc->task.proactor, fd); } ee->fd = fd; - ee->wanted = EPOLLIN | EPOLLOUT; + prc->current_arm = ee->wanted = EPOLLIN | EPOLLOUT; + prc->armed = true; start_polling(ee, efd); // TODO: check for error } @@ -148,6 +151,8 @@ static void praw_connection_init(praw_connection_t *prc, pn_proactor_t *p, pn_ra prc->connected = false; prc->disconnected = false; prc->first_schedule = false; + prc->armed = false; + prc->current_arm = 0; prc->taddr = NULL; prc->batch.next_event = pni_raw_batch_next; @@ -173,6 +178,17 @@ static void praw_connection_cleanup(praw_connection_t *prc) { // else proactor_disconnect logic owns prc and its final free } +static void praw_initiate_cleanup(praw_connection_t *prc) { + if (prc->armed) { +// Possible 
race with epoll event. Wait for it to clear. +// Force EPOLLHUP callback if not already pending. +shutdown(prc->psocket.epoll_io.fd, SHUT_RDWR); +return; + } + pni_raw_finalize(&prc->raw_connection); + praw_connection_cleanup(prc); +} + pn_raw_connection_t *pn_raw_connection(void) { praw_connection_t *conn = (praw_connection_t*) calloc(1, sizeof(praw_connection_t)); if (!conn) return NULL; @@ -400,10 +416,13 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t, uint32_t io_events, bool else pni_task_wake_done(&rc->task); // Complete task wake without event. } + if (io_events) { +rc->armed = false; +rc->current_arm = 0; + } if (pni_raw_finished(&rc->raw_connection)) { unlock(&rc->task.mutex); -pni_raw_finalize(&rc->raw_connection); -praw_connection_cleanup(rc); +praw_initiate_cleanup(rc); return NULL; } int events = io_events; @@ -492,8 +511,7 @@ void pni_raw_connection_done(praw_connection_t *rc) { if (pni_raw_finished(raw) && !ready) { // If raw connection has no more work to do and safe to free resources, do so. -pni_raw_finalize(raw); -praw_connection_cleanup(rc); +praw_initiate_cleanup(rc); } else if (ready) { // Already scheduled to run. Skip poll. Remember if we want a read. rc->read_check = pni_raw_can_read(raw); @@ -509,12 +527,16 @@ void pni_raw_connection_done(praw_connection_t *rc) { // If wanted == 0 and hup_detected, blocking not possible, so skip arming until // application provides read buffers. if (wanted || !rc->hup_detected) { - rc->psocket.epoll_io.wanted = wanted; - rearm_polling(&rc->psocket.epoll_io, p->epollfd); // TODO: check for error + // Arm only if there is a change. + if (!rc->armed || (wanted != rc->current_arm)) { +rc->psocket.epoll_io.wanted = rc->current_arm = wanted; +rc->armed = true; +rearm_polling(&rc->psocket.epoll_io, p->epollfd); // TODO: check for error + } } } - // praw_connection_cleanup() may have been called above. Can no longer touch rc or raw. + // praw_initiate_cleanup() may have been called above. 
Can no longer touch rc or raw. lock(&p->sched_mutex); tslot_t *resume_thread; @@ -525,6 +547,6 @@ void pni_raw_connection_done(praw_connection_t *rc) { } void pni_raw_connection_forced_shutdown(praw_connection_t *rc) { - pni_raw_finalize(&rc->raw_connection); - praw_connection_cleanup(rc); + rc->armed = false; // Tear down. No epoll event callbacks. + praw_initiate_cleanup(rc); } - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
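The PROTON-2832 change above combines two ideas: rearm epoll only when the wanted mask changes, and defer the final free while an epoll event may still be in flight. A minimal sketch of that bookkeeping follows, assuming a stand-alone model with no real epoll calls; `conn_model_t` and its helper functions are hypothetical names, and the counter stands in for `rearm_polling()`.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for the armed/current_arm bookkeeping in the diff. */
typedef struct {
  int current_arm;         /* event mask most recently armed */
  bool armed;              /* true while an epoll event may still be delivered */
  int rearm_calls;         /* counts simulated rearm operations */
  bool shutdown_requested; /* models shutdown(fd, SHUT_RDWR) forcing an event */
  bool freed;              /* set once resources are released */
} conn_model_t;

/* Arm only if not armed already or if the wanted mask changed. */
static void maybe_rearm(conn_model_t *c, int wanted) {
  assert(!c->freed);
  if (!c->armed || wanted != c->current_arm) {
    c->current_arm = wanted;
    c->armed = true;
    c->rearm_calls++;
  }
}

/* Delivery of an epoll event disarms the one-shot registration. */
static void on_io_event(conn_model_t *c) {
  assert(!c->freed);
  c->armed = false;
  c->current_arm = 0;
}

/* Free immediately only when no epoll event can race with the free. */
static void initiate_cleanup(conn_model_t *c) {
  assert(!c->freed);
  if (c->armed) {
    c->shutdown_requested = true; /* wait for the forced event to arrive */
    return;
  }
  c->freed = true;
}
```

In this model a second `maybe_rearm()` with the same mask is a no-op, and `initiate_cleanup()` on an armed connection only requests shutdown; the free happens on the later call that follows the event.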
(qpid-proton) branch main updated: PROTON-2818: Move epoll proctor connection logic to a task thread.
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git The following commit(s) were added to refs/heads/main by this push: new 36fe831ec PROTON-2818: Move epoll proctor connection logic to a task thread. 36fe831ec is described below commit 36fe831ec8df52dea56ae04eec02304f2572f13a Author: Clifford Jansen AuthorDate: Sun May 12 11:04:19 2024 -0700 PROTON-2818: Move epoll proctor connection logic to a task thread. --- c/src/proactor/epoll-internal.h | 1 + c/src/proactor/epoll.c| 57 ++--- c/src/proactor/epoll_raw_connection.c | 69 --- c/tests/raw_wake_test.cpp | 1 - 4 files changed, 84 insertions(+), 44 deletions(-) diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h index 8f765121e..550324ccd 100644 --- a/c/src/proactor/epoll-internal.h +++ b/c/src/proactor/epoll-internal.h @@ -238,6 +238,7 @@ typedef struct pconnection_t { bool server;/* accept, not connect */ bool tick_pending; bool queued_disconnect; /* deferred from pn_proactor_disconnect() */ + bool first_schedule; pn_condition_t *disconnect_condition; // Following values only changed by (sole) working task: uint32_t current_arm; // active epoll io events diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c index 7abd884ef..7714c23fe 100644 --- a/c/src/proactor/epoll.c +++ b/c/src/proactor/epoll.c @@ -810,6 +810,7 @@ static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_con pc->wbuf_current = NULL; pc->hog_count = 0; pc->batch.next_event = pconnection_batch_next; + pc->first_schedule = false; if (server) { pn_transport_set_server(pc->driver.transport); @@ -1122,6 +1123,7 @@ static void write_flush(pconnection_t *pc) { static void pconnection_connected_lh(pconnection_t *pc); static void pconnection_maybe_connect_lh(pconnection_t *pc); +static bool pconnection_first_connect_lh(pconnection_t *pc); static pn_event_batch_t 
*pconnection_process(pconnection_t *pc, uint32_t events, bool sched_ready, bool topup) { bool waking = false; @@ -1139,6 +1141,17 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, } if (sched_ready) schedule_done(&pc->task); + if (pc->first_schedule) { +pc->first_schedule = false; +assert(!topup && !events); +if (!pc->queued_disconnect) { + if (pconnection_first_connect_lh(pc)) { +unlock(&pc->task.mutex); +return NULL; + } +} + } + if (topup) { // Only called by the batch owner. Does not loop, just "tops up" // once. May be back depending on hog_count. @@ -1396,6 +1409,7 @@ static void pconnection_maybe_connect_lh(pconnection_t *pc) { int pgetaddrinfo(const char *host, const char *port, int flags, struct addrinfo **res) { + // NOTE: getaddrinfo can block on DNS lookup (PROTON-2812). struct addrinfo hints = { 0 }; hints.ai_family = AF_UNSPEC; hints.ai_socktype = SOCK_STREAM; @@ -1416,7 +1430,27 @@ bool schedule_if_inactive(pn_proactor_t *p) { return false; } +// Call from pconnection_process with task lock held. +// Return true if the socket is connecting and there are no Proton events to deliver. +static bool pconnection_first_connect_lh(pconnection_t *pc) { + unlock(&pc->task.mutex); + // TODO: move this step to a separate worker thread that scales in response to multiple blocking DNS lookups. + int gai_error = pgetaddrinfo(pc->host, pc->port, 0, &pc->addrinfo); + lock(&pc->task.mutex); + + if (!gai_error) { +pc->ai = pc->addrinfo; +pconnection_maybe_connect_lh(pc); /* Start connection attempts */ +if (pc->psocket.epoll_io.fd != -1 && !pc->queued_disconnect && !pni_task_wake_pending(&pc->task)) + return true; + } else { +psocket_gai_error(&pc->psocket, gai_error, "connect to "); + } + return false; +} + void pn_proactor_connect2(pn_proactor_t *p, pn_connection_t *c, pn_transport_t *t, const char *addr) { + // Called from an arbitrary thread. Do setup prior to getaddrinfo, then switch to a worker thread. 
size_t addrlen = strlen(addr); pconnection_t *pc = (pconnection_t*) malloc(sizeof(pconnection_t)+addrlen); assert(pc); // TODO: memory safety @@ -1430,27 +1464,8 @@ void pn_proactor_connect2(pn_proactor_t *p, pn_connection_t *c, pn_transport_t * lock(&pc->task.mutex); proactor_add(&pc->task); pn_connection_open(pc->driver.connection); /* Auto-open */ - - bool notify = false; - - if (pc->disconnected) { -notify = schedule(&pc->task);/* Error during initialization */ - } else { -int gai_error = pgetadd
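The PROTON-2818 diff above releases the task lock around the potentially blocking address lookup and retakes it before touching shared state. A minimal sketch of that pattern follows, assuming a plain pthread mutex and a direct `getaddrinfo()` call; `task_model_t` and `first_connect_locked` are hypothetical names, not Proton's `pconnection_t` or `pconnection_first_connect_lh()`.

```c
#include <assert.h>
#include <netdb.h>
#include <pthread.h>
#include <stdbool.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>

/* Hypothetical reduced task state; the real structure holds far more. */
typedef struct {
  pthread_mutex_t mutex;
  bool first_schedule;       /* true until the first run on a worker thread */
  struct addrinfo *addrinfo; /* resolved address list, owned by the task */
} task_model_t;

/* Call with t->mutex held; returns with t->mutex held.
 * Returns 0 on success or a getaddrinfo() error code. */
static int first_connect_locked(task_model_t *t, const char *host, const char *port) {
  assert(t->first_schedule);
  t->first_schedule = false;

  struct addrinfo hints;
  memset(&hints, 0, sizeof(hints));
  hints.ai_family = AF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;
  struct addrinfo *res = NULL;

  /* Drop the lock so other threads are not stalled by a slow DNS lookup. */
  pthread_mutex_unlock(&t->mutex);
  int gai_error = getaddrinfo(host, port, &hints, &res);
  pthread_mutex_lock(&t->mutex);

  /* Shared state is only written after the lock is held again. */
  if (gai_error == 0) t->addrinfo = res;
  return gai_error;
}
```

The caller remains responsible for releasing the result with `freeaddrinfo()`. Any state read before the unlock may have changed by the time the lock is retaken, which is presumably why the diff rechecks `queued_disconnect` afterwards.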
(qpid-proton) branch main updated: NO-JIRA: placate C compiler warning with explicit cast for printf argument
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

The following commit(s) were added to refs/heads/main by this push:
     new f8fd68f19 NO-JIRA: placate C compiler warning with explicit cast for printf argument
f8fd68f19 is described below

commit f8fd68f194e2c6426c030a933cf6fd16df5d72ff
Author: Clifford Jansen
AuthorDate: Wed Apr 17 20:39:25 2024 -0700

    NO-JIRA: placate C compiler warning with explicit cast for printf argument
---
 c/tests/threaderciser.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/c/tests/threaderciser.c b/c/tests/threaderciser.c
index f51e01934..311ca78d4 100644
--- a/c/tests/threaderciser.c
+++ b/c/tests/threaderciser.c
@@ -423,7 +423,7 @@ static bool handle(global *g, pn_event_t *e) {
       if (lctx->pn_listener) {
         pn_netaddr_str(pn_listener_addr(lctx->pn_listener), lctx->addr, sizeof(lctx->addr));
       }
-      debug("[%p] listening on %s", lctx->pn_listener, lctx->addr);
+      debug("[%p] listening on %s", (void *)lctx->pn_listener, lctx->addr);
       pthread_mutex_unlock(&lctx->lock);
       cpool_connect(&g->connections_active, g->proactor, lctx->addr); /* Initial connection */
       break;
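The cast in this commit matters because the `%p` conversion expects an argument of type `void *`, and variadic arguments are not converted automatically. A minimal sketch of the same fix in isolation follows; `struct listener` and `format_listener` are hypothetical names used only for illustration.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical object type; any non-void object pointer has the same issue. */
struct listener {
  int unused;
};

/* Formats the listener's address into buf and returns snprintf()'s result.
 * The explicit (void *) cast matches what the %p conversion requires. */
static int format_listener(char *buf, size_t size, struct listener *l) {
  assert(buf != NULL && size > 0);
  return snprintf(buf, size, "[%p] listening", (void *)l);
}
```

The textual form produced by `%p` is implementation-defined, so callers should not parse it.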
(qpid-proton) branch main updated: PROTON-2792: [C++] check that scheduled tasks are active under lock
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

The following commit(s) were added to refs/heads/main by this push:
     new 674b0c6db PROTON-2792: [C++] check that scheduled tasks are active under lock
674b0c6db is described below

commit 674b0c6dbddc3a20bab17c60381a145d7b692ff4
Author: Andrew Stitcher
AuthorDate: Thu Mar 7 21:19:03 2024 -0500

    PROTON-2792: [C++] check that scheduled tasks are active under lock

    Previously we checked whether the tasks were active without locking
    which was bad.
---
 cpp/include/proton/work_queue.hpp   | 2 +-
 cpp/src/proactor_container_impl.cpp | 13 ++---
 2 files changed, 11 insertions(+), 4 deletions(-)

diff --git a/cpp/include/proton/work_queue.hpp b/cpp/include/proton/work_queue.hpp
index e917fa3ce..defe1aaa7 100644
--- a/cpp/include/proton/work_queue.hpp
+++ b/cpp/include/proton/work_queue.hpp
@@ -280,7 +280,7 @@ class work {
     /// **Unsettled API**
     ///
     /// Execute the piece of work
-    void operator()() { item_(); }
+    void operator()() const { item_(); }

     ~work() = default;

diff --git a/cpp/src/proactor_container_impl.cpp b/cpp/src/proactor_container_impl.cpp
index e965be730..228002e38 100644
--- a/cpp/src/proactor_container_impl.cpp
+++ b/cpp/src/proactor_container_impl.cpp
@@ -542,9 +542,16 @@ void container::impl::run_timer_jobs() {

     // NB. We copied the due tasks in reverse order so execute from end
     for (int i = tasks.size()-1; i>=0; --i) {
-        if(is_active_.count(tasks[i].w_handle)) {
-            tasks[i].task();
-            is_active_.erase(tasks[i].w_handle);
+        const auto& task = tasks[i];
+        bool active;
+
+        {
+            GUARD(deferred_lock_);
+            // NB. erase returns the number of items erased
+            active = is_active_.erase(task.w_handle);
+        }
+        if (active) {
+            task.task();
         }
     }
 }
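The fix above folds the "is it active?" check and the erase into one step performed under the lock, then runs the task outside the lock. The same idea can be sketched in C, assuming a pthread mutex in place of the C++ `GUARD` macro and a fixed-size flag array in place of the container; `mark_active` and `claim` are hypothetical names.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

#define MAX_HANDLES 16

/* Hypothetical stand-ins for deferred_lock_ and is_active_. */
static pthread_mutex_t active_lock = PTHREAD_MUTEX_INITIALIZER;
static bool active[MAX_HANDLES];

/* Record that a scheduled task is still wanted. */
static void mark_active(int handle) {
  assert(handle >= 0 && handle < MAX_HANDLES);
  pthread_mutex_lock(&active_lock);
  active[handle] = true;
  pthread_mutex_unlock(&active_lock);
}

/* Test and clear in a single critical section. Returns true for exactly one
 * caller per mark_active(); that caller may then run the task unlocked. */
static bool claim(int handle) {
  assert(handle >= 0 && handle < MAX_HANDLES);
  pthread_mutex_lock(&active_lock);
  bool was_active = active[handle];
  active[handle] = false;
  pthread_mutex_unlock(&active_lock);
  return was_active;
}
```

Because the test and the clear share one critical section, a concurrent cancel cannot slip in between them, and the task body runs without holding the lock.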
[qpid-proton] 01/02: PROTON-2763: Raw connection double DISCONNECT event
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git commit 0987726b3dd34da77bf826ac3cfb5b6fc19ba60b Author: Clifford Jansen AuthorDate: Wed Sep 13 19:06:04 2023 -0700 PROTON-2763: Raw connection double DISCONNECT event --- c/src/proactor/epoll_raw_connection.c| 16 c/src/proactor/raw_connection-internal.h | 1 + c/src/proactor/raw_connection.c | 7 ++- 3 files changed, 19 insertions(+), 5 deletions(-) diff --git a/c/src/proactor/epoll_raw_connection.c b/c/src/proactor/epoll_raw_connection.c index c0e731066..83f950b02 100644 --- a/c/src/proactor/epoll_raw_connection.c +++ b/c/src/proactor/epoll_raw_connection.c @@ -382,6 +382,12 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t, uint32_t io_events, bool else pni_task_wake_done(&rc->task); // Complete task wake without event. } + if (pni_raw_finished(&rc->raw_connection)) { +unlock(&rc->task.mutex); +pni_raw_finalize(&rc->raw_connection); +praw_connection_cleanup(rc); +return NULL; + } int events = io_events; int fd = rc->psocket.epoll_io.fd; if (!rc->connected) { @@ -445,18 +451,18 @@ void pni_raw_connection_done(praw_connection_t *rc) { bool have_event = pni_raw_batch_has_events(raw); lock(&rc->task.mutex); + bool wake_pending = pni_task_wake_pending(&rc->task) && pni_raw_can_wake(raw); pn_proactor_t *p = rc->task.proactor; tslot_t *ts = rc->task.runner; rc->task.working = false; // The task may be in the ready state even if we've got no raw connection // wakes outstanding because we dealt with it already in pni_raw_batch_next() - notify = (pni_task_wake_pending(&rc->task) || have_event) && schedule(&rc->task); + notify = (wake_pending || have_event) && schedule(&rc->task); ready = rc->task.ready; // No need to poll. Already scheduled. 
unlock(&rc->task.mutex); - bool finished_disconnect = raw->state==conn_fini && !ready && !raw->disconnectpending; - if (finished_disconnect) { -// If we're closed and we've sent the disconnect then close + if (pni_raw_finished(raw) && !ready) { +// If raw connection has no more work to do and safe to free resources, do so. pni_raw_finalize(raw); praw_connection_cleanup(rc); } else if (ready) { @@ -479,6 +485,8 @@ void pni_raw_connection_done(praw_connection_t *rc) { } } + // praw_connection_cleanup() may have been called above. Can no longer touch rc or raw. + lock(&p->sched_mutex); tslot_t *resume_thread; notify |= unassign_thread(p, ts, UNUSED, &resume_thread); diff --git a/c/src/proactor/raw_connection-internal.h b/c/src/proactor/raw_connection-internal.h index cdfd5a852..fe0e29fe0 100644 --- a/c/src/proactor/raw_connection-internal.h +++ b/c/src/proactor/raw_connection-internal.h @@ -141,6 +141,7 @@ pn_event_t *pni_raw_event_next(pn_raw_connection_t *conn); bool pni_raw_batch_has_events(pn_raw_connection_t *conn); void pni_raw_initialize(pn_raw_connection_t *conn); void pni_raw_finalize(pn_raw_connection_t *conn); +bool pni_raw_finished(pn_raw_connection_t *conn); #ifdef __cplusplus } diff --git a/c/src/proactor/raw_connection.c b/c/src/proactor/raw_connection.c index 89fbbd1dd..0c2118e01 100644 --- a/c/src/proactor/raw_connection.c +++ b/c/src/proactor/raw_connection.c @@ -484,10 +484,15 @@ bool pni_raw_wake_is_pending(pn_raw_connection_t *conn) { } bool pni_raw_can_wake(pn_raw_connection_t *conn) { - // True if DISCONNECTED event has not yet been extracted from the batch. + // True if DISCONNECTED event has not yet been generated. return (conn->disconnect_state != disc_fini); } +bool pni_raw_finished(pn_raw_connection_t *conn) { + // True if state machine is in final state and application has consumed final DISCONNECTED event. 
+ return (conn->disconnect_state == disc_fini && pn_collector_peek(conn->collector) == NULL); +} + void pni_raw_read(pn_raw_connection_t *conn, int sock, long (*recv)(int, void*, size_t), void(*set_error)(pn_raw_connection_t *, const char *, int)) { assert(conn); - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
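The PROTON-2763 change separates two questions that the old code answered with one flag: whether a wake can still be delivered, and whether the connection can be freed. A minimal sketch of the two predicates follows, assuming a reduced model in which an integer count replaces the event collector; `raw_model_t`, `can_wake` and `finished` are hypothetical names mirroring `pni_raw_can_wake()` and `pni_raw_finished()`.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical reduction of the raw connection disconnect state machine. */
typedef enum { DISC_PENDING, DISC_FINI } disc_state_t;

typedef struct {
  disc_state_t disconnect_state; /* DISC_FINI once DISCONNECTED was generated */
  int queued_events;             /* events the application has not consumed */
} raw_model_t;

/* A wake is only meaningful before the DISCONNECTED event is generated. */
static bool can_wake(const raw_model_t *r) {
  assert(r->queued_events >= 0);
  return r->disconnect_state != DISC_FINI;
}

/* Safe to free only in the final state with nothing left to deliver. */
static bool finished(const raw_model_t *r) {
  assert(r->queued_events >= 0);
  return r->disconnect_state == DISC_FINI && r->queued_events == 0;
}
```

The window between "DISCONNECTED generated" and "DISCONNECTED consumed" is where neither predicate is true, and is presumably where the double event could arise before this fix.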
[qpid-proton] 02/02: PROTON-2764: schedule failed raw connections from pn_listener_raw_accept() so they can process events and cleanup
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit c542df80c51abdc52ccde9eb7b581922563e8240
Author: Clifford Jansen
AuthorDate: Wed Sep 13 19:18:24 2023 -0700

    PROTON-2764: schedule failed raw connections from pn_listener_raw_accept() so they can process events and cleanup
---
 c/src/proactor/epoll_raw_connection.c | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/c/src/proactor/epoll_raw_connection.c b/c/src/proactor/epoll_raw_connection.c
index 83f950b02..7e4dcb4a4 100644
--- a/c/src/proactor/epoll_raw_connection.c
+++ b/c/src/proactor/epoll_raw_connection.c
@@ -244,10 +244,12 @@ void pn_listener_raw_accept(pn_listener_t *l, pn_raw_connection_t *rc) {
     praw_connection_connected_lh(prc);
   } else {
     psocket_error(prc, err, "pn_listener_accept");
+    pni_raw_connect_failed(&prc->raw_connection);
+    notify = schedule(&prc->task);
   }

   if (!l->task.working && listener_has_event(l)) {
-    notify = schedule(&l->task);
+    notify |= schedule(&l->task);
   }
   unlock(&prc->task.mutex);
   unlock(&l->task.mutex);
[qpid-proton] branch main updated (cb637b79e -> c542df80c)
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

    from cb637b79e PROTON-2748: Raw connection async close fix and tests. First part of pull 402
     new 0987726b3 PROTON-2763: Raw connection double DISCONNECT event
     new c542df80c PROTON-2764: schedule failed raw connections from pn_listener_raw_accept() so they can process events and cleanup

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.

Summary of changes:
 c/src/proactor/epoll_raw_connection.c    | 20 +++-
 c/src/proactor/raw_connection-internal.h | 1 +
 c/src/proactor/raw_connection.c          | 7 ++-
 3 files changed, 22 insertions(+), 6 deletions(-)
[qpid-proton] branch main updated: PROTON-2748: Raw connection async close fix and tests. First part of pull 402
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git The following commit(s) were added to refs/heads/main by this push: new cb637b79e PROTON-2748: Raw connection async close fix and tests. First part of pull 402 cb637b79e is described below commit cb637b79ee2a4cb74c5332975c510f672c2c05fa Author: Clifford Jansen AuthorDate: Mon Sep 11 23:00:12 2023 -0700 PROTON-2748: Raw connection async close fix and tests. First part of pull 402 --- c/src/proactor/epoll_raw_connection.c| 96 --- c/src/proactor/raw_connection-internal.h | 2 + c/src/proactor/raw_connection.c | 35 ++- c/tests/raw_wake_test.cpp| 417 ++- 4 files changed, 499 insertions(+), 51 deletions(-) diff --git a/c/src/proactor/epoll_raw_connection.c b/c/src/proactor/epoll_raw_connection.c index 9b85b15f1..c0e731066 100644 --- a/c/src/proactor/epoll_raw_connection.c +++ b/c/src/proactor/epoll_raw_connection.c @@ -50,7 +50,8 @@ struct praw_connection_t { struct addrinfo *ai; /* Current connect address */ bool connected; bool disconnected; - bool batch_empty; + bool hup_detected; + bool read_check; }; static void psocket_error(praw_connection_t *rc, int err, const char* msg) { @@ -318,10 +319,7 @@ static pn_event_t *pni_raw_batch_next(pn_event_batch_t *batch) { unlock(&rc->task.mutex); if (waking) pni_raw_wake(raw); - pn_event_t *e = pni_raw_event_next(raw); - if (!e || pn_event_type(e) == PN_RAW_CONNECTION_DISCONNECTED) -rc->batch_empty = true; - return e; + return pni_raw_event_next(raw); } task_t *pni_psocket_raw_task(psocket_t* ps) { @@ -393,10 +391,10 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t, uint32_t io_events, bool if (rc->disconnected) { pni_raw_connect_failed(&rc->raw_connection); unlock(&rc->task.mutex); - rc->batch_empty = false; return &rc->batch; } if (events & (EPOLLHUP | EPOLLERR)) { + // Continuation of praw_connection_maybe_connect_lh() logic. 
// A wake can be the first event. Otherwise, wait for connection to complete. bool event_pending = task_wake || pni_raw_wake_is_pending(&rc->raw_connection) || pn_collector_peek(rc->raw_connection.collector); t->working = event_pending; @@ -405,35 +403,46 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t, uint32_t io_events, bool } if (events & EPOLLOUT) praw_connection_connected_lh(rc); +unlock(&rc->task.mutex); +return &rc->batch; } unlock(&rc->task.mutex); - if (events & EPOLLIN) pni_raw_read(&rc->raw_connection, fd, rcv, set_error); + if (events & EPOLLERR) { +// Read and write sides closed via RST. Tear down immediately. +int soerr; +socklen_t soerrlen = sizeof(soerr); +int ec = getsockopt(fd, SOL_SOCKET, SO_ERROR, &soerr, &soerrlen); +if (ec == 0 && soerr) { + psocket_error(rc, soerr, "async disconnect"); +} +pni_raw_async_disconnect(&rc->raw_connection); +return &rc->batch; + } + if (events & EPOLLHUP) { +rc->hup_detected = true; + } + + if (events & (EPOLLIN | EPOLLRDHUP) || rc->read_check) { +pni_raw_read(&rc->raw_connection, fd, rcv, set_error); +rc->read_check = false; + } if (events & EPOLLOUT) pni_raw_write(&rc->raw_connection, fd, snd, set_error); - rc->batch_empty = false; return &rc->batch; } void pni_raw_connection_done(praw_connection_t *rc) { bool notify = false; bool ready = false; - bool have_event = false; - - // If !batch_empty, can't be sure state machine up to date, so reschedule task if necessary. - if (!rc->batch_empty) { -if (pn_collector_peek(rc->raw_connection.collector)) - have_event = true; -else { - pn_event_t *e = pni_raw_batch_next(&rc->batch); - // State machine up to date. - if (e) { -have_event = true; -// Sole event. Can put back without order issues. -// Edge case, performance not important. 
-pn_collector_put(rc->raw_connection.collector, pn_event_class(e), pn_event_context(e), pn_event_type(e)); - } -} - } + pn_raw_connection_t *raw = &rc->raw_connection; + int fd = rc->psocket.epoll_io.fd; + + // Try write + if (pni_raw_can_write(raw)) pni_raw_write(raw, fd, snd, set_error); + pni_raw_process_shutdown(raw, fd, shutr, shutw); + + // Update state machine and check for possible pending event. + bool have_event = pni_raw_batch_has_events(raw); lock(&rc->task.mutex); pn_proactor_t *p = rc->task.proactor; @@ -442,24 +451,31 @@ void pni_raw_connection_done(praw_connection_t *rc) { // The task
[qpid-proton] branch main updated: PROTON-2736: tls library - restore fixes backed out by mistake in previous jaeger/oltp commit
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git The following commit(s) were added to refs/heads/main by this push: new 0585bfde4 PROTON-2736: tls library - restore fixes backed out by mistake in previous jaeger/oltp commit 0585bfde4 is described below commit 0585bfde42171e12bc209537f74245c898a04964 Author: Clifford Jansen AuthorDate: Mon May 29 09:56:12 2023 -0700 PROTON-2736: tls library - restore fixes backed out by mistake in previous jaeger/oltp commit --- c/src/tls/openssl.c | 38 -- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/c/src/tls/openssl.c b/c/src/tls/openssl.c index 8c96a35d6..0c3b6bd66 100644 --- a/c/src/tls/openssl.c +++ b/c/src/tls/openssl.c @@ -2112,7 +2112,8 @@ static void decrypt(pn_tls_t *tls) { pbuffer_t *pending = next_decrypt_pending(tls); bool peek_needed = false; - while (true) { + bool decrypt_done = false; + while (!decrypt_done) { if (tls->pn_tls_err) return; @@ -2166,23 +2167,32 @@ static void decrypt(pn_tls_t *tls) { } // Done if not possible to move any more bytes from input to output bufs -if (tls->dec_closed) break; -if ((!pending || tls->dec_wblocked) // write side -&& (!curr_result || tls->dec_rblocked)) // read side - break; - } +if ( (tls->dec_closed || !pending || tls->dec_wblocked) /* write side */ && + (!curr_result || tls->dec_rblocked) ) /* read side */ { + decrypt_done = true; + if (peek_needed && !tls->pn_tls_err && !tls->dec_closed) { +// Set dec_rpending. 
+// Make OpenSSL process input to at least first decrypted byte (if any) +char unused; +int pcount = SSL_peek(tls->ssl, &unused, 1); +tls->dec_rpending = (pcount == 1); +if (pcount <= 0) { + check_error_reason(tls, pcount); +} - if (!tls->pn_tls_err && peek_needed) { -// Make OpenSSL examine the next buffered TLS record (if exists and complete) -char unused; -int pcount = SSL_peek(tls->ssl, &unused, 1); -tls->dec_rpending = (pcount == 1); -if (pcount <= 0) { - check_error_reason(tls, pcount); +// Peek may have made more room in buffer (i.e. handshake followed by large +// incomplete application record and dec_wblocked). If we did not process an +// application record, we must have processed at least one non-app record. +// No longer write blocked after peek. PROTON-2736. +if (!tls->dec_rpending && tls->dec_wblocked) { + decrypt_done = false; + tls->dec_wblocked = false; +} + } } } - if (!tls->pn_tls_err && !tls->handshake_ok && SSL_do_handshake(tls->ssl) == 1) { + if (!tls->handshake_ok && SSL_do_handshake(tls->ssl) == 1) { tls->handshake_ok = true; tls->can_shutdown = true; } - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-proton] branch main updated: PROTON-2736: tls library - stay in decrypt loop long enough to finalize reads and state vars
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

The following commit(s) were added to refs/heads/main by this push:
     new e637082ac PROTON-2736: tls library - stay in decrypt loop long enough to finalize reads and state vars
e637082ac is described below

commit e637082ac62c6caefbc3ac94ef01e98a3c0b5902
Author: Clifford Jansen
AuthorDate: Sun May 14 16:16:26 2023 -0700

    PROTON-2736: tls library - stay in decrypt loop long enough to finalize reads and state vars
---
 c/src/tls/openssl.c | 5 -
 1 file changed, 5 deletions(-)

diff --git a/c/src/tls/openssl.c b/c/src/tls/openssl.c
index 2ed4d5298..0c3b6bd66 100644
--- a/c/src/tls/openssl.c
+++ b/c/src/tls/openssl.c
@@ -2166,9 +2166,6 @@ static void decrypt(pn_tls_t *tls) {
       }
     }

-    if (tls->pn_tls_err || tls->dec_closed)
-      return;
-
     // Done if not possible to move any more bytes from input to output bufs
     if ( (tls->dec_closed || !pending || tls->dec_wblocked) /* write side */ &&
          (!curr_result || tls->dec_rblocked) ) /* read side */ {
@@ -2181,8 +2178,6 @@ static void decrypt(pn_tls_t *tls) {
         tls->dec_rpending = (pcount == 1);
         if (pcount <= 0) {
           check_error_reason(tls, pcount);
-          if (tls->pn_tls_err || tls->dec_closed)
-            return;
         }

         // Peek may have made more room in buffer (i.e. handshake followed by large
[qpid-proton] branch main updated: PROTON-2736: tls library - catch unblocking of decrypt stream for processed non-application records
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git The following commit(s) were added to refs/heads/main by this push: new 939a48f42 PROTON-2736: tls library - catch unblocking of decrypt stream for processed non-application records 939a48f42 is described below commit 939a48f420aa4c68123132d582c7217625e20d38 Author: Clifford Jansen AuthorDate: Sun May 14 11:57:23 2023 -0700 PROTON-2736: tls library - catch unblocking of decrypt stream for processed non-application records --- c/src/tls/openssl.c | 43 +-- 1 file changed, 29 insertions(+), 14 deletions(-) diff --git a/c/src/tls/openssl.c b/c/src/tls/openssl.c index 8c96a35d6..2ed4d5298 100644 --- a/c/src/tls/openssl.c +++ b/c/src/tls/openssl.c @@ -2112,7 +2112,8 @@ static void decrypt(pn_tls_t *tls) { pbuffer_t *pending = next_decrypt_pending(tls); bool peek_needed = false; - while (true) { + bool decrypt_done = false; + while (!decrypt_done) { if (tls->pn_tls_err) return; @@ -2165,24 +2166,38 @@ static void decrypt(pn_tls_t *tls) { } } +if (tls->pn_tls_err || tls->dec_closed) + return; + // Done if not possible to move any more bytes from input to output bufs -if (tls->dec_closed) break; -if ((!pending || tls->dec_wblocked) // write side -&& (!curr_result || tls->dec_rblocked)) // read side - break; - } +if ( (tls->dec_closed || !pending || tls->dec_wblocked) /* write side */ && + (!curr_result || tls->dec_rblocked) ) /* read side */ { + decrypt_done = true; + if (peek_needed && !tls->pn_tls_err && !tls->dec_closed) { +// Set dec_rpending. 
+// Make OpenSSL process input to at least first decrypted byte (if any) +char unused; +int pcount = SSL_peek(tls->ssl, &unused, 1); +tls->dec_rpending = (pcount == 1); +if (pcount <= 0) { + check_error_reason(tls, pcount); + if (tls->pn_tls_err || tls->dec_closed) +return; +} - if (!tls->pn_tls_err && peek_needed) { -// Make OpenSSL examine the next buffered TLS record (if exists and complete) -char unused; -int pcount = SSL_peek(tls->ssl, &unused, 1); -tls->dec_rpending = (pcount == 1); -if (pcount <= 0) { - check_error_reason(tls, pcount); +// Peek may have made more room in buffer (i.e. handshake followed by large +// incomplete application record and dec_wblocked). If we did not process an +// application record, we must have processed at least one non-app record. +// No longer write blocked after peek. PROTON-2736. +if (!tls->dec_rpending && tls->dec_wblocked) { + decrypt_done = false; + tls->dec_wblocked = false; +} + } } } - if (!tls->pn_tls_err && !tls->handshake_ok && SSL_do_handshake(tls->ssl) == 1) { + if (!tls->handshake_ok && SSL_do_handshake(tls->ssl) == 1) { tls->handshake_ok = true; tls->can_shutdown = true; } - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-proton] branch main updated: PROTON-2699: disable fdlimit test by default
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

The following commit(s) were added to refs/heads/main by this push:
     new af4382b85 PROTON-2699: disable fdlimit test by default
af4382b85 is described below

commit af4382b85fac30a5a352fd0057d67a04113f610d
Author: Clifford Jansen
AuthorDate: Wed Mar 29 23:27:10 2023 -0700

    PROTON-2699: disable fdlimit test by default
---
 c/tests/CMakeLists.txt | 23 +++
 1 file changed, 15 insertions(+), 8 deletions(-)

diff --git a/c/tests/CMakeLists.txt b/c/tests/CMakeLists.txt
index 2645ba5f8..83f6e2213 100644
--- a/c/tests/CMakeLists.txt
+++ b/c/tests/CMakeLists.txt
@@ -115,14 +115,21 @@ if (CMAKE_CXX_COMPILER)
 
     set(pypath "${CMAKE_SOURCE_DIR}/tests/py")
 
-    # unset TEST_EXE_PREFIX as valgrind does not run successfully when fds are limited
-    pn_add_test(
-      UNWRAPPED
-      NAME c-fdlimit-tests
-      PREPEND_ENVIRONMENT "PATH=${path}" "PYTHONPATH=${pypath}" "${test_env}"
-      APPEND_ENVIRONMENT "TEST_EXE_PREFIX="
-      WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
-      COMMAND ${Python_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/fdlimit.py)
+    # Disable fdlimit test by default. Flaky throughout its history and implemntations.
+    # Very useful for proactor development, so keep around.
+    set(DEFAULT_FDLIMIT OFF)
+    option(FDLIMIT "Run the fdlimit recovery test" ${DEFAULT_FDLIMIT})
+    if (FDLIMIT)
+      # unset TEST_EXE_PREFIX as valgrind does not run successfully when fds are limited
+      pn_add_test(
+        UNWRAPPED
+        NAME c-fdlimit-tests
+        PREPEND_ENVIRONMENT "PATH=${path}" "PYTHONPATH=${pypath}" "${test_env}"
+        APPEND_ENVIRONMENT "TEST_EXE_PREFIX="
+        WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
+        COMMAND ${Python_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/fdlimit.py)
+    endif()
+
   endif(HAS_PROACTOR)
 
   if(HAS_TLS)
[qpid-proton] branch main updated: PROTON-2695: separate raw wake test to run only on proactors with raw wake support
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git The following commit(s) were added to refs/heads/main by this push: new dba4a2e33 PROTON-2695: separate raw wake test to run only on proactors with raw wake support dba4a2e33 is described below commit dba4a2e33282b1a4cba2aa68ff1aeea9013b4f6d Author: Clifford Jansen AuthorDate: Mon Mar 27 09:27:40 2023 -0700 PROTON-2695: separate raw wake test to run only on proactors with raw wake support --- c/tests/CMakeLists.txt | 7 ++- c/tests/raw_connection_test.cpp | 86 c/tests/raw_wake_test.cpp | 121 3 files changed, 127 insertions(+), 87 deletions(-) diff --git a/c/tests/CMakeLists.txt b/c/tests/CMakeLists.txt index 8ff0d8da9..2645ba5f8 100644 --- a/c/tests/CMakeLists.txt +++ b/c/tests/CMakeLists.txt @@ -79,9 +79,14 @@ if (CMAKE_CXX_COMPILER) add_c_test(c-proactor-test pn_test_proactor.cpp proactor_test.cpp) target_link_libraries(c-proactor-test qpid-proton-core qpid-proton-proactor ${PLATFORM_LIBS}) -add_c_test(c-raw-connection-test raw_connection_test.cpp pn_test_proactor.cpp $) +add_c_test(c-raw-connection-test raw_connection_test.cpp $) target_link_libraries(c-raw-connection-test qpid-proton-core ${PLATFORM_LIBS} ${PROACTOR_LIBS}) +if (PROACTOR_OK STREQUAL "epoll") + add_c_test(c-raw-wake-test raw_wake_test.cpp pn_test_proactor.cpp $) + target_link_libraries(c-raw-wake-test qpid-proton-core ${PLATFORM_LIBS} ${PROACTOR_LIBS}) +endif() + add_c_test(c-ssl-proactor-test pn_test_proactor.cpp ssl_proactor_test.cpp) target_link_libraries(c-ssl-proactor-test qpid-proton-core qpid-proton-proactor ${PLATFORM_LIBS}) diff --git a/c/tests/raw_connection_test.cpp b/c/tests/raw_connection_test.cpp index 39c7d4e32..0f31c4910 100644 --- a/c/tests/raw_connection_test.cpp +++ b/c/tests/raw_connection_test.cpp @@ -832,89 +832,3 @@ TEST_CASE("raw connection") { } } } - -// WAKE tests require a running proactor. 
- -#include "../src/proactor/proactor-internal.h" -#include "./pn_test_proactor.hpp" -#include -#include - -namespace { - -class common_handler : public handler { - bool close_on_wake_; - pn_raw_connection_t *last_server_; - -public: - explicit common_handler() : close_on_wake_(false), last_server_(0) {} - - void set_close_on_wake(bool b) { close_on_wake_ = b; } - - pn_raw_connection_t *last_server() { return last_server_; } - - bool handle(pn_event_t *e) override { -switch (pn_event_type(e)) { - /* Always stop on these noteworthy events */ -case PN_LISTENER_OPEN: -case PN_LISTENER_CLOSE: -case PN_PROACTOR_INACTIVE: - return true; - -case PN_LISTENER_ACCEPT: { - listener = pn_event_listener(e); - pn_raw_connection_t *rc = pn_raw_connection(); - pn_listener_raw_accept(listener, rc); - last_server_ = rc; - return false; -} break; - -case PN_RAW_CONNECTION_WAKE: { - if (close_on_wake_) { -pn_raw_connection_t *rc = pn_event_raw_connection(e); -pn_raw_connection_close(rc); - } - return true; -} break; - - -default: - return false; -} - } -}; - - -} // namespace - -// Test waking up a connection that is idle -TEST_CASE("proactor_raw_connection_wake") { - common_handler h; - proactor p(&h); - pn_listener_t *l = p.listen(); - REQUIRE_RUN(p, PN_LISTENER_OPEN); - - pn_raw_connection_t *rc = pn_raw_connection(); - std::string addr = ":" + pn_test::listening_port(l); - pn_proactor_raw_connect(pn_listener_proactor(l), rc, addr.c_str()); - - - REQUIRE_RUN(p, PN_LISTENER_ACCEPT); - REQUIRE_RUN(p, PN_RAW_CONNECTION_NEED_READ_BUFFERS); - REQUIRE_RUN(p, PN_RAW_CONNECTION_NEED_READ_BUFFERS); - CHECK(pn_proactor_get(p) == NULL); /* idle */ -pn_raw_connection_wake(rc); - REQUIRE_RUN(p, PN_RAW_CONNECTION_WAKE); - CHECK(pn_proactor_get(p) == NULL); /* idle */ - - h.set_close_on_wake(true); - pn_raw_connection_wake(rc); - REQUIRE_RUN(p, PN_RAW_CONNECTION_WAKE); - REQUIRE_RUN(p, PN_RAW_CONNECTION_DISCONNECTED); - pn_raw_connection_wake(h.last_server()); - REQUIRE_RUN(p, 
PN_RAW_CONNECTION_WAKE); - REQUIRE_RUN(p, PN_RAW_CONNECTION_DISCONNECTED); - pn_listener_close(l); - REQUIRE_RUN(p, PN_LISTENER_CLOSE); - REQUIRE_RUN(p, PN_PROACTOR_INACTIVE); -} diff --git a/c/tests/raw_wake_test.cpp b/c/tests/raw_wake_test.cpp new file mode 100644 index 0..4a5dc23d3 --- /dev/null +++ b/c/tests/raw_wake_test.cpp @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under
[qpid-proton] branch main updated: PROTON-2696: fix clang warning/error on new raw connection test
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

The following commit(s) were added to refs/heads/main by this push:
     new 229ae1dff PROTON-2696: fix clang warning/error on new raw connection test
229ae1dff is described below

commit 229ae1dffc0b3999b50005767ad92a051fd8fe93
Author: Clifford Jansen
AuthorDate: Fri Mar 24 11:47:38 2023 -0700

    PROTON-2696: fix clang warning/error on new raw connection test
---
 c/tests/raw_connection_test.cpp | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/c/tests/raw_connection_test.cpp b/c/tests/raw_connection_test.cpp
index 9f5a9b72e..39c7d4e32 100644
--- a/c/tests/raw_connection_test.cpp
+++ b/c/tests/raw_connection_test.cpp
@@ -843,12 +843,11 @@ TEST_CASE("raw connection") {
 namespace {
 
 class common_handler : public handler {
-  handler *accept_; // Handler for accepted connections
   bool close_on_wake_;
   pn_raw_connection_t *last_server_;
 
 public:
-  explicit common_handler(handler *accept = 0) : accept_(accept), close_on_wake_(false), last_server_(0) {}
+  explicit common_handler() : close_on_wake_(false), last_server_(0) {}
 
   void set_close_on_wake(bool b) { close_on_wake_ = b; }
[qpid-proton] branch main updated (34b4d930d -> f13bb8132)
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

    from 34b4d930d PROTON-2691: Fix -Wstrict-prototypes compile warning from Clang (#389)
     new d08e4a22e PROTON-2695: epoll raw connections - reschedule task to finish unprocessed events from an event batch.
     new 47b958f27 PROTON-2673: epoll raw connections - allow delivery of wake events prior to successful connect.
     new f13bb8132 PROTON-2673: clarify doc for pn_raw_connection_wake()

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.

Summary of changes:
 c/include/proton/raw_connection.h        | 11 +++-
 c/src/proactor/epoll_raw_connection.c    | 60 +++---
 c/src/proactor/raw_connection-internal.h |  2 +
 c/src/proactor/raw_connection.c          | 12 -
 c/tests/CMakeLists.txt                   |  2 +-
 c/tests/raw_connection_test.cpp          | 87
 6 files changed, 154 insertions(+), 20 deletions(-)
[qpid-proton] 02/03: PROTON-2673: epoll raw connections - allow delivery of wake events prior to successful connect.
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git commit 47b958f271ea0637046c9b85de4690bd4dbebb1d Author: Clifford Jansen AuthorDate: Thu Mar 23 17:05:49 2023 -0700 PROTON-2673: epoll raw connections - allow delivery of wake events prior to successful connect. --- c/src/proactor/epoll_raw_connection.c| 34 +--- c/src/proactor/raw_connection-internal.h | 2 ++ c/src/proactor/raw_connection.c | 12 ++- 3 files changed, 31 insertions(+), 17 deletions(-) diff --git a/c/src/proactor/epoll_raw_connection.c b/c/src/proactor/epoll_raw_connection.c index cb61d6a75..56bebee85 100644 --- a/c/src/proactor/epoll_raw_connection.c +++ b/c/src/proactor/epoll_raw_connection.c @@ -367,7 +367,18 @@ static void set_error(pn_raw_connection_t *conn, const char *msg, int err) { pn_event_batch_t *pni_raw_connection_process(task_t *t, uint32_t io_events, bool sched_ready) { praw_connection_t *rc = containerof(t, praw_connection_t, task); + bool task_wake = false; + bool can_wake = pni_raw_can_wake(&rc->raw_connection); lock(&rc->task.mutex); + t->working = true; + if (sched_ready) +schedule_done(t); + if (pni_task_wake_pending(&rc->task)) { +if (can_wake) + task_wake = true; // batch_next() will complete the task wake. +else + pni_task_wake_done(&rc->task); // Complete task wake without event. + } int events = io_events; int fd = rc->psocket.epoll_io.fd; if (!rc->connected) { @@ -381,26 +392,17 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t, uint32_t io_events, bool return &rc->batch; } if (events & (EPOLLHUP | EPOLLERR)) { + // A wake can be the first event. Otherwise, wait for connection to complete. + bool event_pending = task_wake || pni_raw_wake_is_pending(&rc->raw_connection) || pn_collector_peek(rc->raw_connection.collector); + t->working = event_pending; unlock(&rc->task.mutex); - return NULL; + return event_pending ? 
&rc->batch : NULL; } -praw_connection_connected_lh(rc); +if (events & EPOLLOUT) + praw_connection_connected_lh(rc); } unlock(&rc->task.mutex); - bool wake = false; - lock(&t->mutex); - t->working = true; - if (sched_ready) { -schedule_done(t); -if (pni_task_wake_pending(&rc->task)) { - wake = true; - pni_task_wake_done(&rc->task); -} - } - unlock(&t->mutex); - - if (wake) pni_raw_wake(&rc->raw_connection); if (events & EPOLLIN) pni_raw_read(&rc->raw_connection, fd, rcv, set_error); if (events & EPOLLOUT) pni_raw_write(&rc->raw_connection, fd, snd, set_error); rc->batch_empty = false; @@ -417,7 +419,7 @@ void pni_raw_connection_done(praw_connection_t *rc) { if (pn_collector_peek(rc->raw_connection.collector)) have_event = true; else { - pn_event_t *e = pni_raw_event_next(&rc->raw_connection); + pn_event_t *e = pni_raw_batch_next(&rc->batch); // State machine up to date. if (e) { have_event = true; diff --git a/c/src/proactor/raw_connection-internal.h b/c/src/proactor/raw_connection-internal.h index 218bf2b2d..47b0ea925 100644 --- a/c/src/proactor/raw_connection-internal.h +++ b/c/src/proactor/raw_connection-internal.h @@ -126,6 +126,8 @@ bool pni_raw_validate(pn_raw_connection_t *conn); void pni_raw_connected(pn_raw_connection_t *conn); void pni_raw_connect_failed(pn_raw_connection_t *conn); void pni_raw_wake(pn_raw_connection_t *conn); +bool pni_raw_wake_is_pending(pn_raw_connection_t *conn); +bool pni_raw_can_wake(pn_raw_connection_t *conn); void pni_raw_close(pn_raw_connection_t *conn); void pni_raw_read_close(pn_raw_connection_t *conn); void pni_raw_write_close(pn_raw_connection_t *conn); diff --git a/c/src/proactor/raw_connection.c b/c/src/proactor/raw_connection.c index a7aa21d11..fd633a284 100644 --- a/c/src/proactor/raw_connection.c +++ b/c/src/proactor/raw_connection.c @@ -475,7 +475,17 @@ void pni_raw_connect_failed(pn_raw_connection_t *conn) { } void pni_raw_wake(pn_raw_connection_t *conn) { - conn->wakepending = true; + if (conn->disconnect_state != 
disc_fini) +conn->wakepending = true; +} + +bool pni_raw_wake_is_pending(pn_raw_connection_t *conn) { + return conn->wakepending; +} + +bool pni_raw_can_wake(pn_raw_connection_t *conn) { + // True if DISCONNECTED event has not yet been extracted from the batch. + return (conn->disconnect_state != disc_fini); } void pni_raw_read(pn_raw_connection_t *conn, int sock, long (*recv)(int, void*, size_t), void(*set_error)(pn_raw_connection_t *, const char *, int)) { - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-proton] 03/03: PROTON-2673: clarify doc for pn_raw_connection_wake()
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit f13bb8132d9e744c4bc4ff66d283f9d4e34965f8
Author: Clifford Jansen
AuthorDate: Thu Mar 23 23:53:12 2023 -0700

    PROTON-2673: clarify doc for pn_raw_connection_wake()
---
 c/include/proton/raw_connection.h | 11 ++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/c/include/proton/raw_connection.h b/c/include/proton/raw_connection.h
index d19ff6dc5..ef5fd33a7 100644
--- a/c/include/proton/raw_connection.h
+++ b/c/include/proton/raw_connection.h
@@ -244,10 +244,19 @@ PNP_EXTERN bool pn_raw_connection_is_write_closed(pn_raw_connection_t *connectio
  * Return a @ref PN_RAW_CONNECTION_WAKE event for @p connection as soon as possible.
  *
  * At least one wake event will be returned, serialized with other @ref proactor_events
- * for the same raw connection. Wakes can be "coalesced" - if several
+ * for the same raw connection, except as noted. Wakes can be "coalesced" - if several
  * @ref pn_raw_connection_wake() calls happen close together, there may be only one
  * @ref PN_RAW_CONNECTION_WAKE event that occurs after all of them.
  *
+ * A @ref PN_RAW_CONNECTION_WAKE event will never follow a
+ * @ref PN_RAW_CONNECTION_DISCONNECTED event. I.e. it will be dropped.
+ *
+ * The result of this call is undefined if called after a @ref PN_RAW_CONNECTION_DISCONNECTED
+ * event has been delivered and its event batch has been released by a call to
+ * @ref pn_proactor_done(). It is also undefined if called before the return of either
+ * @ref pn_proactor_raw_connect() or @ref pn_listener_raw_accept() for client or server
+ * raw connections respectively.
+ *
  * @note Thread-safe
  */
 PNP_EXTERN void pn_raw_connection_wake(pn_raw_connection_t *connection);
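The wake rules documented in this commit (repeated wakes may coalesce into one event, and no wake is delivered after DISCONNECTED) can be illustrated with a small stand-alone model. This is only a sketch under stated assumptions: the `model_*` names, the event enum and the struct are hypothetical and are not part of the Proton API. The single `wake_pending` flag loosely mirrors the `wakepending` field visible in the PROTON-2673 diff; the ordering of WAKE before DISCONNECTED is a simplification chosen for the model.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical event kinds for the model; not Proton's pn_event_type_t. */
typedef enum { EV_NONE, EV_WAKE, EV_DISCONNECTED } model_event_t;

/* Hypothetical stand-in for a raw connection's wake-related state. */
typedef struct {
  bool wake_pending;   /* a single flag, so repeated wakes coalesce */
  bool disconnecting;  /* DISCONNECTED is due but not yet delivered */
  bool finished;       /* DISCONNECTED has been delivered */
} model_conn_t;

/* Request a wake. The request is dropped once DISCONNECTED was delivered. */
static void model_wake(model_conn_t *c) {
  assert(c != NULL);
  if (!c->finished) c->wake_pending = true;
}

/* Begin closing: DISCONNECTED becomes the final event of the connection. */
static void model_close(model_conn_t *c) {
  assert(c != NULL);
  if (!c->finished) c->disconnecting = true;
}

/* Return the next event. In this model a pending wake is delivered
 * before DISCONNECTED, so a WAKE never follows DISCONNECTED. */
static model_event_t model_next(model_conn_t *c) {
  assert(c != NULL);
  if (c->finished) return EV_NONE;
  if (c->wake_pending) {
    c->wake_pending = false;
    return EV_WAKE;
  }
  if (c->disconnecting) {
    c->disconnecting = false;
    c->finished = true;
    return EV_DISCONNECTED;
  }
  return EV_NONE;
}
```

In this model, three back-to-back `model_wake()` calls produce one `EV_WAKE`, and a `model_wake()` issued after `EV_DISCONNECTED` produces nothing. The cases the real documentation calls undefined (calling after the batch is released, or before connect/accept returns) are deliberately not modelled.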
[qpid-proton] 01/03: PROTON-2695: epoll raw connections - reschedule task to finish unprocessed events from an event batch.
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git commit d08e4a22e86610243dab1e5c08a8fd4c1c0d001b Author: Clifford Jansen AuthorDate: Sun Mar 19 23:00:12 2023 -0700 PROTON-2695: epoll raw connections - reschedule task to finish unprocessed events from an event batch. --- c/src/proactor/epoll_raw_connection.c | 28 ++- c/tests/CMakeLists.txt| 2 +- c/tests/raw_connection_test.cpp | 87 +++ 3 files changed, 114 insertions(+), 3 deletions(-) diff --git a/c/src/proactor/epoll_raw_connection.c b/c/src/proactor/epoll_raw_connection.c index 919a4b808..cb61d6a75 100644 --- a/c/src/proactor/epoll_raw_connection.c +++ b/c/src/proactor/epoll_raw_connection.c @@ -50,6 +50,7 @@ struct praw_connection_t { struct addrinfo *ai; /* Current connect address */ bool connected; bool disconnected; + bool batch_empty; }; static void psocket_error(praw_connection_t *rc, int err, const char* msg) { @@ -317,7 +318,10 @@ static pn_event_t *pni_raw_batch_next(pn_event_batch_t *batch) { unlock(&rc->task.mutex); if (waking) pni_raw_wake(raw); - return pni_raw_event_next(raw); + pn_event_t *e = pni_raw_event_next(raw); + if (!e || pn_event_type(e) == PN_RAW_CONNECTION_DISCONNECTED) +rc->batch_empty = true; + return e; } task_t *pni_psocket_raw_task(psocket_t* ps) { @@ -373,6 +377,7 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t, uint32_t io_events, bool if (rc->disconnected) { pni_raw_connect_failed(&rc->raw_connection); unlock(&rc->task.mutex); + rc->batch_empty = false; return &rc->batch; } if (events & (EPOLLHUP | EPOLLERR)) { @@ -398,19 +403,38 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t, uint32_t io_events, bool if (wake) pni_raw_wake(&rc->raw_connection); if (events & EPOLLIN) pni_raw_read(&rc->raw_connection, fd, rcv, set_error); if (events & EPOLLOUT) pni_raw_write(&rc->raw_connection, fd, snd, set_error); + rc->batch_empty = false; return 
&rc->batch; } void pni_raw_connection_done(praw_connection_t *rc) { bool notify = false; bool ready = false; + bool have_event = false; + + // If !batch_empty, can't be sure state machine up to date, so reschedule task if necessary. + if (!rc->batch_empty) { +if (pn_collector_peek(rc->raw_connection.collector)) + have_event = true; +else { + pn_event_t *e = pni_raw_event_next(&rc->raw_connection); + // State machine up to date. + if (e) { +have_event = true; +// Sole event. Can put back without order issues. +// Edge case, performance not important. +pn_collector_put(rc->raw_connection.collector, pn_event_class(e), pn_event_context(e), pn_event_type(e)); + } +} + } + lock(&rc->task.mutex); pn_proactor_t *p = rc->task.proactor; tslot_t *ts = rc->task.runner; rc->task.working = false; - notify = pni_task_wake_pending(&rc->task) && schedule(&rc->task); // The task may be in the ready state even if we've got no raw connection // wakes outstanding because we dealt with it already in pni_raw_batch_next() + notify = (pni_task_wake_pending(&rc->task) || have_event) && schedule(&rc->task); ready = rc->task.ready; unlock(&rc->task.mutex); diff --git a/c/tests/CMakeLists.txt b/c/tests/CMakeLists.txt index 641ba3c75..8ff0d8da9 100644 --- a/c/tests/CMakeLists.txt +++ b/c/tests/CMakeLists.txt @@ -79,7 +79,7 @@ if (CMAKE_CXX_COMPILER) add_c_test(c-proactor-test pn_test_proactor.cpp proactor_test.cpp) target_link_libraries(c-proactor-test qpid-proton-core qpid-proton-proactor ${PLATFORM_LIBS}) -add_c_test(c-raw-connection-test raw_connection_test.cpp $) +add_c_test(c-raw-connection-test raw_connection_test.cpp pn_test_proactor.cpp $) target_link_libraries(c-raw-connection-test qpid-proton-core ${PLATFORM_LIBS} ${PROACTOR_LIBS}) add_c_test(c-ssl-proactor-test pn_test_proactor.cpp ssl_proactor_test.cpp) diff --git a/c/tests/raw_connection_test.cpp b/c/tests/raw_connection_test.cpp index 0f31c4910..9f5a9b72e 100644 --- a/c/tests/raw_connection_test.cpp +++ 
b/c/tests/raw_connection_test.cpp @@ -832,3 +832,90 @@ TEST_CASE("raw connection") { } } } + +// WAKE tests require a running proactor. + +#include "../src/proactor/proactor-internal.h" +#include "./pn_test_proactor.hpp" +#include +#include + +namespace { + +class common_handler : public handler { + handler *accept_; // Handler for accepted connections + bool close_on_wake_; + pn_raw_connection_t *last_server_; + +public: + explicit common_handler(handler *accept =
[qpid-proton] branch main updated: PROTON-2658: fix Proton-C TLS buffer leak
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git The following commit(s) were added to refs/heads/main by this push: new 2eab5c0f3 PROTON-2658: fix Proton-C TLS buffer leak 2eab5c0f3 is described below commit 2eab5c0f3be03412600d3682803cca19bf658159 Author: Clifford Jansen AuthorDate: Mon Dec 5 10:06:56 2022 -0800 PROTON-2658: fix Proton-C TLS buffer leak --- c/src/tls/openssl.c | 58 +++- c/tests/tls_test.cpp | 68 2 files changed, 125 insertions(+), 1 deletion(-) diff --git a/c/src/tls/openssl.c b/c/src/tls/openssl.c index ab125cb82..8c96a35d6 100644 --- a/c/src/tls/openssl.c +++ b/c/src/tls/openssl.c @@ -222,7 +222,8 @@ struct pn_tls_t { static void encrypt(pn_tls_t *); static void decrypt(pn_tls_t *); static int pn_tls_alpn_cb(SSL *ssn, const unsigned char **out, unsigned char *outlen, const unsigned char *in, unsigned int inlen, void *arg); - +static void release_buffers(pn_tls_t * +); static void tls_initialize_buffers(pn_tls_t *tls) { // Link together free lists @@ -1143,6 +1144,8 @@ int pn_tls_stop(pn_tls_t *tls) { if (tls->stopped) return PN_ARG_ERR; tls->stopped = true; + release_buffers(tls); + if (tls->validating) validate_strict(tls); return 0; } @@ -1927,6 +1930,59 @@ static buff_ptr current_decrypted_result(pn_tls_t *tls) { return tls->dresult_first_blank; } +static void release_buffers(pn_tls_t *tls) { + // encrypt input + for(;tls->encrypt_first_pending;) { +buff_ptr p = tls->encrypt_first_pending; +assert(tls->encrypt_buffers[p-1].type == buff_encrypt_pending); +if (!tls->encrypt_first_done) { + tls->encrypt_first_done = p; +} +if (tls->encrypt_last_done) { + tls->encrypt_buffers[tls->encrypt_last_done-1].next = p; +} +tls->encrypt_last_done = p; +tls->encrypt_first_pending = tls->encrypt_buffers[p-1].next; + +tls->encrypt_buffers[p-1].next = 0; +tls->encrypt_buffers[p-1].type = buff_encrypt_done; + } + + // decrypt input + 
for(;tls->decrypt_first_pending;) { +buff_ptr p = tls->decrypt_first_pending; +assert(tls->decrypt_buffers[p-1].type == buff_decrypt_pending); +if (!tls->decrypt_first_done) { + tls->decrypt_first_done = p; +} +if (tls->decrypt_last_done) { + tls->decrypt_buffers[tls->decrypt_last_done-1].next = p; +} +tls->decrypt_last_done = p; +tls->decrypt_first_pending = tls->decrypt_buffers[p-1].next; + +tls->decrypt_buffers[p-1].next = 0; +tls->decrypt_buffers[p-1].type = buff_decrypt_done; + } + + // encrypt output + for(;tls->eresult_first_blank;) { +buff_ptr p = tls->eresult_first_blank; +assert(tls->eresult_buffers[p-1].type == buff_eresult_blank); +blank_eresult_pop(tls, p); +encrypted_result_add(tls, p); + } + + // decrypt output + for(;tls->dresult_first_blank;) { +buff_ptr p = tls->dresult_first_blank; +assert(tls->dresult_buffers[p-1].type == buff_dresult_blank); +blank_dresult_pop(tls, p); +decrypted_result_add(tls, p); + } + +} + static bool try_shutdown(pn_tls_t *tls) { assert(tls->enc_closed && !tls->encrypt_first_pending && !tls->ssl_shutdown); bool success = false; diff --git a/c/tests/tls_test.cpp b/c/tests/tls_test.cpp index 44b4b972f..c735abe44 100644 --- a/c/tests/tls_test.cpp +++ b/c/tests/tls_test.cpp @@ -471,3 +471,71 @@ TEST_CASE("missing client cert") { set_rbuf(&early_data_buf, NULL, 0, 0); reset_rbuf(&early_data_buf); } + +TEST_CASE("buffer release on pn_tls_stop()") { + pn_raw_buffer_t bufs[4]; + size_t give_count = 0; + size_t take_count = 0; + TestPeer client(false); + client.init(); // ... 
up to pn_tls_start() + pn_tls_t *tls = client.tls; + + // Confirm we can add them but not take out empty buffers until after pn_tls_top() + size_t count = 1; + for (size_t i = 0; i < count; i++) +bufs[i] = new_rbuf(512); + give_count += count; + REQUIRE(pn_tls_give_encrypt_input_buffers(tls, bufs, count) == count); + REQUIRE(pn_tls_take_encrypt_input_buffers(tls, bufs, count) == 0); + + count = 2; + for (size_t i = 0; i < count; i++) +bufs[i] = new_rbuf(512); + give_count += count; + REQUIRE(pn_tls_give_decrypt_input_buffers(tls, bufs, count) == count); + REQUIRE(pn_tls_take_decrypt_input_buffers(tls, bufs, count) == 0); + + count = 3; + for (size_t i = 0; i < count; i++) +bufs[i] = new_rbuf(512); + give_count += count; + REQUIRE(pn_tls_give_encrypt_output_buffers(tls, bufs, count) == count); + REQUIRE(pn_tls_get_encrypt_output_buffer_count(tls) == 0); + REQUIRE(pn_tls_take_encrypt_output_buffers(tls, bufs, count) == 0); + + count
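The `release_buffers()` loops in this commit walk a list that is linked by 1-based array indices, where 0 means "no buffer", and append each pending entry to the tail of the matching "done" list. The following stand-alone sketch shows that splice pattern in isolation. The `model_*` names, the two-value type enum and the width of `buff_ptr` are assumptions made for illustration; only the index convention and the order of the pointer updates are taken from the diff above.

```c
#include <assert.h>
#include <stddef.h>

typedef size_t buff_ptr; /* 1-based index into the array; 0 means "none" (width is an assumption) */

typedef enum { BUF_PENDING, BUF_DONE } model_type_t; /* hypothetical subset of buffer states */

typedef struct {
  buff_ptr next;      /* index of the next buffer in the same list, or 0 */
  model_type_t type;  /* which list the buffer currently belongs to */
} model_buf_t;

typedef struct {
  model_buf_t *bufs;       /* backing array, addressed as bufs[p - 1] */
  buff_ptr first_pending;  /* head of the pending list */
  buff_ptr first_done;     /* head of the done list */
  buff_ptr last_done;      /* tail of the done list */
} model_lists_t;

/* Move every pending buffer, in order, to the tail of the done list.
 * Returns the number of buffers moved. */
static size_t model_release_pending(model_lists_t *l) {
  size_t moved = 0;
  while (l->first_pending) {
    buff_ptr p = l->first_pending;
    assert(l->bufs[p - 1].type == BUF_PENDING);
    if (!l->first_done) l->first_done = p;                /* done list was empty */
    if (l->last_done) l->bufs[l->last_done - 1].next = p; /* link after old tail */
    l->last_done = p;
    l->first_pending = l->bufs[p - 1].next;               /* advance before unlinking */
    l->bufs[p - 1].next = 0;
    l->bufs[p - 1].type = BUF_DONE;
    moved++;
  }
  return moved;
}
```

Reading `next` into `first_pending` before clearing it is what keeps the walk safe while the entry is being relinked onto the other list.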
[qpid-proton] branch main updated: PROTON-2643: C ssl driver - avoid hang in handshake in older versions of OpenSSL
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

The following commit(s) were added to refs/heads/main by this push:
     new c9d8caa07 PROTON-2643: C ssl driver - avoid hang in handshake in older versions of OpenSSL
c9d8caa07 is described below

commit c9d8caa07a229f3d255159316fa55441ff638752
Author: Clifford Jansen
AuthorDate: Thu Nov 24 10:31:55 2022 -0800

    PROTON-2643: C ssl driver - avoid hang in handshake in older versions of OpenSSL
---
 c/src/ssl/openssl.c | 5 +
 1 file changed, 5 insertions(+)

diff --git a/c/src/ssl/openssl.c b/c/src/ssl/openssl.c
index 145f60e0b..caa7ea011 100644
--- a/c/src/ssl/openssl.c
+++ b/c/src/ssl/openssl.c
@@ -114,6 +114,7 @@ struct pni_ssl_t {
   bool ssl_closed;      // shutdown complete, or SSL error
   bool read_blocked;    // SSL blocked until more network data is read
   bool write_blocked;   // SSL blocked until data is written to network
+  bool handshake_ok;
 
   int err_reason;
   char *subject;
@@ -1275,6 +1276,10 @@ static ssize_t process_output_ssl( pn_transport_t *transport, unsigned int layer
       ssl->write_blocked = false;
       work_pending = work_pending || max_len > 0;
       ssl_log(transport, PN_LEVEL_TRACE, "Read %d bytes from BIO Layer", available );
+    } else if ( !ssl->handshake_ok && !ssl->ssl_closed ) {
+      // OpenSSL bug workaround 1.0.x -> unknown. Harmless in all versions.
+      // See PROTON-2643. SSL_do_handshake() prevents forgetting to refill the BIO.
+      ssl->handshake_ok = (SSL_do_handshake(ssl->ssl) == 1);
     }
   }
[qpid-proton] branch main updated: PROTON-2647: make send-abort example handle variations in number of FLOW events
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git The following commit(s) were added to refs/heads/main by this push: new 6255d0b46 PROTON-2647: make send-abort example handle variations in number of FLOW events 6255d0b46 is described below commit 6255d0b4698472d45a15550a217be0aa07277db2 Author: Clifford Jansen AuthorDate: Sun Nov 6 23:47:58 2022 -0800 PROTON-2647: make send-abort example handle variations in number of FLOW events --- c/examples/send-abort.c | 24 +++- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/c/examples/send-abort.c b/c/examples/send-abort.c index d68352063..7f28addb3 100644 --- a/c/examples/send-abort.c +++ b/c/examples/send-abort.c @@ -137,19 +137,25 @@ static bool handle(app_data_t* app, pn_event_t* event) { pn_delivery(sender, pn_dtag((const char *)&app->sent, sizeof(app->sent))); pn_link_send(sender, msgbuf.start, msgbuf.size - HOLDBACK); /* Send some part of message */ app->in_progress = true; - /* Return from this link flow event and abort the message on next, */ + /* Return from this link flow event and abort the message on future FLOW, */ break; } else { pn_delivery_t * pnd = pn_link_current(sender); - pn_delivery_abort(pnd); - /* aborted delivery is presettled and never ack'd. */ - if (++app->aborted == app->message_count) { -printf("%d messages started and aborted\n", app->aborted); -pn_connection_close(pn_event_connection(event)); -/* Continue handling events till we receive TRANSPORT_CLOSED */ + if (pn_delivery_pending(pnd) == 0) { +// All message data from pn_link_send has been processed to physical frames. +pn_delivery_abort(pnd); +/* aborted delivery is presettled and never ack'd. 
*/ +if (++app->aborted == app->message_count) { + printf("%d messages started and aborted\n", app->aborted); + pn_connection_close(pn_event_connection(event)); + /* Continue handling events till we receive TRANSPORT_CLOSED */ +} +++app->sent; +app->in_progress = false; + } else { +// Keep checking FLOW events until all message data forwarded to peer. +break; } - ++app->sent; - app->in_progress = false; } } break;
[qpid-proton] branch main updated: PROTON-2641: previous commit ref should be this, epoll proactor change socket read() to recv()
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git The following commit(s) were added to refs/heads/main by this push: new 2821e5bfc PROTON-2641: previous commit ref should be this, epoll proactor change socket read() to recv() 2821e5bfc is described below commit 2821e5bfc3ff5a789f901ff96ff67e038b55881f Author: Clifford Jansen AuthorDate: Mon Oct 31 12:20:48 2022 -0700 PROTON-2641: previous commit ref should be this, epoll proactor change socket read() to recv()
[qpid-proton] branch main updated: PROTON-2461: epoll proactor change socket read() to recv()
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git The following commit(s) were added to refs/heads/main by this push: new 93960f1e2 PROTON-2461: epoll proactor change socket read() to recv() 93960f1e2 is described below commit 93960f1e2129cf98200bdb2ab31e9ad868f71f61 Author: Clifford Jansen AuthorDate: Mon Oct 31 11:59:20 2022 -0700 PROTON-2461: epoll proactor change socket read() to recv() --- c/src/proactor/epoll.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c index e78b91f67..ae9971b38 100644 --- a/c/src/proactor/epoll.c +++ b/c/src/proactor/epoll.c @@ -1209,14 +1209,14 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, if (!pconnection_rclosed(pc)) { pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver); if (rbuf.size > 0 && !pc->read_blocked) { - ssize_t n = read(pc->psocket.epoll_io.fd, rbuf.start, rbuf.size); + ssize_t n = recv(pc->psocket.epoll_io.fd, rbuf.start, rbuf.size, 0); if (n > 0) { pn_connection_driver_read_done(&pc->driver, n); // If n == rbuf.size then we should enlarge the buffer and see if there is more to read if ((size_t)n==rbuf.size) { rbuf = pn_connection_driver_read_buffer_sized(&pc->driver, n*2); if (rbuf.size > 0) { -n = read(pc->psocket.epoll_io.fd, rbuf.start, rbuf.size); +n = recv(pc->psocket.epoll_io.fd, rbuf.start, rbuf.size, 0); if (n > 0) { pn_connection_driver_read_done(&pc->driver, n); }
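For readers following the read() to recv() change above: on a connected socket, recv() with a flags argument of 0 is generally equivalent to read(). The following is a minimal sketch of that equivalence, assuming a POSIX system. The helper name roundtrip_with_recv and the use of socketpair() are illustrative assumptions and are not part of Proton.

```c
#include <assert.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical helper (not Proton code): write msg into one end of a
 * local socket pair and read it back from the other end with
 * recv(fd, buf, len, 0), mirroring the call shape used in the diff.
 * Returns the number of bytes received, or -1 on failure. */
static ssize_t roundtrip_with_recv(const char *msg, char *out, size_t out_len) {
  assert(msg && out);
  int fds[2];
  if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) != 0) return -1;
  ssize_t n = -1;
  size_t len = strlen(msg);
  if (write(fds[0], msg, len) == (ssize_t)len) {
    /* flags == 0: no special behaviour requested, same data as read() */
    n = recv(fds[1], out, out_len, 0);
  }
  close(fds[0]);
  close(fds[1]);
  return n;
}
```

Unlike read(), recv() only works on sockets, so the sketch uses a socket pair rather than a pipe.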
[qpid-proton] branch main updated: PROTON-2640: fix JIRA ref on previous commit for default max frame size
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git The following commit(s) were added to refs/heads/main by this push: new 50c10c6d7 PROTON-2640: fix JIRA ref on previous commit for default max frame size 50c10c6d7 is described below commit 50c10c6d769dbf440ae5a760379ad0103881aa8e Author: Clifford Jansen AuthorDate: Mon Oct 31 10:26:27 2022 -0700 PROTON-2640: fix JIRA ref on previous commit for default max frame size --- c/src/core/engine-internal.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/c/src/core/engine-internal.h b/c/src/core/engine-internal.h index 56cf4d6fd..5ff6ae4ce 100644 --- a/c/src/core/engine-internal.h +++ b/c/src/core/engine-internal.h @@ -140,7 +140,7 @@ struct pn_transport_t { pn_data_t *remote_desired_capabilities; pn_data_t *remote_properties; pn_data_t *disp_data; - // DEFAULT_MAX_FRAME_SIZE see PROTON-2460 + // DEFAULT_MAX_FRAME_SIZE see PROTON-2640 #define PN_DEFAULT_MAX_FRAME_SIZE (32*1024) uint32_t local_max_frame; uint32_t remote_max_frame;
[qpid-proton] branch main updated: PROTON-2460: if max frame size not set by application, set default max frame size to 32k
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git The following commit(s) were added to refs/heads/main by this push: new e245c19d8 PROTON-2460: if max frame size not set by application, set default max frame size to 32k e245c19d8 is described below commit e245c19d8ea421994eacffd46bf078ae169ce8b6 Author: Clifford Jansen AuthorDate: Mon Oct 31 10:21:41 2022 -0700 PROTON-2460: if max frame size not set by application, set default max frame size to 32k --- c/src/core/engine-internal.h | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/c/src/core/engine-internal.h b/c/src/core/engine-internal.h index 7554dd36c..56cf4d6fd 100644 --- a/c/src/core/engine-internal.h +++ b/c/src/core/engine-internal.h @@ -140,9 +140,8 @@ struct pn_transport_t { pn_data_t *remote_desired_capabilities; pn_data_t *remote_properties; pn_data_t *disp_data; - //#define PN_DEFAULT_MAX_FRAME_SIZE (16*1024) -/* This is wrong and bad we should really use a sensible starting size not unlimited */ -#define PN_DEFAULT_MAX_FRAME_SIZE (0) /* for now, allow unlimited size */ + // DEFAULT_MAX_FRAME_SIZE see PROTON-2460 +#define PN_DEFAULT_MAX_FRAME_SIZE (32*1024) uint32_t local_max_frame; uint32_t remote_max_frame; pn_condition_t remote_condition;
[qpid-proton] branch main updated: PROTON-2633: epoll proactor write flush functionality
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git The following commit(s) were added to refs/heads/main by this push: new b69fc15c7 PROTON-2633: epoll proactor write flush functionality b69fc15c7 is described below commit b69fc15c7d5eff72872706c2093ca149786c9a7a Author: Clifford Jansen AuthorDate: Sun Oct 30 23:54:47 2022 -0700 PROTON-2633: epoll proactor write flush functionality --- c/include/proton/proactor.h | 16 c/src/proactor/epoll.c | 10 ++ c/src/proactor/libuv.c | 3 +++ c/src/proactor/win_iocp.cpp | 3 +++ 4 files changed, 32 insertions(+) diff --git a/c/include/proton/proactor.h b/c/include/proton/proactor.h index 93b9c894b..1d7ff6c8c 100644 --- a/c/include/proton/proactor.h +++ b/c/include/proton/proactor.h @@ -292,6 +292,22 @@ PNP_EXTERN void pn_proactor_release_connection(pn_connection_t *connection); */ PNP_EXTERN void pn_connection_wake(pn_connection_t *connection); +/** + * **Unsettled API** Send available AMQP protocol frames to the remote peer. + * + * Generate as many currently availabe AMQP frames for @p connection that can be sent on + * the network to the remote peer without blocking. May help reduce latency, at the expense of + * extra processing overhead, for event handlers that spend a long time processing a + * single event batch. Has little effect if called soon before a call to + * pn_proactor_done(). + * + * @note **Not thread-safe**. Call this function from a connection + * event handler. + * + * @note If @p connection does not belong to a proactor, this call does nothing. + */ +PNP_EXTERN void pn_connection_write_flush(pn_connection_t *connection); + /** * Return the proactor associated with a connection. 
* diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c index 35a728984..e78b91f67 100644 --- a/c/src/proactor/epoll.c +++ b/c/src/proactor/epoll.c @@ -2946,3 +2946,13 @@ int64_t pn_proactor_now_64(void) { clock_gettime(CLOCK_MONOTONIC, &t); return ((int64_t)t.tv_sec) * 1000 + t.tv_nsec / 100; } + +void pn_connection_write_flush(pn_connection_t *c) { + pconnection_t *pc = get_pconnection(c); + if (pc) { +// Assume can write and have frames to write. Booleans will be correctly re-evaluated in write_flush(). +pc->write_blocked = false; +pc->output_drained = false; +write_flush(pc); // May generate transport event. + } +} diff --git a/c/src/proactor/libuv.c b/c/src/proactor/libuv.c index 9179823a0..aebd4a238 100644 --- a/c/src/proactor/libuv.c +++ b/c/src/proactor/libuv.c @@ -1368,6 +1368,9 @@ int64_t pn_proactor_now_64(void) { return uv_hrtime() / 100; // uv_hrtime returns time in nanoseconds } +// Empty stub for pending write flush functionality. +void pn_connection_write_flush(pn_connection_t *connection) {} + // Empty stubs for raw connection code pn_raw_connection_t *pn_raw_connection(void) { return NULL; } void pn_proactor_raw_connect(pn_proactor_t *p, pn_raw_connection_t *rc, const char *addr) {} diff --git a/c/src/proactor/win_iocp.cpp b/c/src/proactor/win_iocp.cpp index 7e7f1378d..4293d4464 100644 --- a/c/src/proactor/win_iocp.cpp +++ b/c/src/proactor/win_iocp.cpp @@ -3421,6 +3421,9 @@ int64_t pn_proactor_now_64(void) { return GetTickCount64(); } +// Empty stub for pending write flush functionality. +void pn_connection_write_flush(pn_connection_t *connection) {} + // Empty stubs for raw connection code pn_raw_connection_t *pn_raw_connection(void) { return NULL; } void pn_proactor_raw_connect(pn_proactor_t *p, pn_raw_connection_t *rc, const char *addr) {}
[qpid-proton] branch main updated: PROTON-2622: tls library - ensure capacity values match given capacity
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git The following commit(s) were added to refs/heads/main by this push: new 2e8c767a3 PROTON-2622: tls library - ensure capacity values match given capacity 2e8c767a3 is described below commit 2e8c767a3bf2d2488a9edeac854243dcfe37068d Author: Clifford Jansen AuthorDate: Wed Oct 26 12:17:58 2022 -0700 PROTON-2622: tls library - ensure capacity values match given capacity --- c/src/tls/openssl.c | 18 -- 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/c/src/tls/openssl.c b/c/src/tls/openssl.c index 4aadc8861..ab125cb82 100644 --- a/c/src/tls/openssl.c +++ b/c/src/tls/openssl.c @@ -442,8 +442,18 @@ size_t pn_tls_get_session_error_string(pn_tls_t* tls, char *buf, size_t buf_len) return strlen(buf); } -size_t pn_tls_get_encrypt_input_buffer_capacity(pn_tls_t *tls) { return tls->encrypt_buffer_empty_count; } -size_t pn_tls_get_decrypt_input_buffer_capacity(pn_tls_t *tls) { return tls->decrypt_buffer_empty_count; } +size_t pn_tls_get_encrypt_input_buffer_capacity(pn_tls_t *tls) { + if (tls->enc_closed || tls->stopped || tls->pn_tls_err) +return 0; + return tls->encrypt_buffer_empty_count; +} + +size_t pn_tls_get_decrypt_input_buffer_capacity(pn_tls_t *tls) { +if (tls->dec_closed || tls->stopped || tls->pn_tls_err) +return 0; +return tls->decrypt_buffer_empty_count; +} + size_t pn_tls_get_encrypt_output_buffer_capacity(pn_tls_t *tls) { return tls->eresult_empty_count; } size_t pn_tls_get_decrypt_output_buffer_capacity(pn_tls_t *tls) { return tls->dresult_empty_count; } @@ -1566,8 +1576,6 @@ static void pbuffer_to_raw_buffer(pbuffer_t *pbuf, pn_raw_buffer_t *rbuf) { size_t pn_tls_give_encrypt_input_buffers(pn_tls_t* tls, pn_raw_buffer_t const* bufs, size_t count_bufs) { assert(tls); - if (tls->enc_closed || tls->stopped || tls->pn_tls_err) -return 0; size_t can_take = pn_min(count_bufs, 
pn_tls_get_encrypt_input_buffer_capacity(tls)); if ( can_take==0 ) return 0; @@ -1602,8 +1610,6 @@ size_t pn_tls_give_encrypt_input_buffers(pn_tls_t* tls, pn_raw_buffer_t const* b size_t pn_tls_give_decrypt_input_buffers(pn_tls_t* tls, pn_raw_buffer_t const* bufs, size_t count_bufs) { assert(tls); - if (tls->dec_closed || tls->stopped || tls->pn_tls_err) -return 0; size_t can_take = pn_min(count_bufs, pn_tls_get_decrypt_input_buffer_capacity(tls)); if ( can_take==0 ) return 0;
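The PROTON-2622 diff above moves the closed/stopped/error check out of the "give buffers" functions and into the capacity getters, so that the capacity reported to the application and the number of buffers actually accepted come from the same place. A minimal sketch of that pattern follows. The toy_engine_t type and both function names are hypothetical stand-ins, not the Proton TLS API.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for the TLS engine state (not pn_tls_t). */
typedef struct {
  bool closed;
  bool stopped;
  int err;
  size_t empty_slots; /* free input buffer slots */
} toy_engine_t;

/* The getter is the single place that knows a dead engine has no capacity. */
static size_t toy_input_capacity(const toy_engine_t *e) {
  assert(e);
  if (e->closed || e->stopped || e->err) return 0;
  return e->empty_slots;
}

/* The "give" path relies on the getter instead of repeating the state check,
 * so it can never accept more than the getter advertised. */
static size_t toy_give_buffers(toy_engine_t *e, size_t count) {
  size_t cap = toy_input_capacity(e);
  size_t can_take = count < cap ? count : cap;
  e->empty_slots -= can_take;
  return can_take;
}
```

With the check in one place, an application that sizes its next call from the reported capacity sees 0 as soon as the engine is closed, rather than a stale non-zero count followed by a refused call.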
[qpid-proton] branch main updated (a4375a835 -> c1171d9ab)
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git from a4375a835 PROTON-2590: [Python] Remove use of "from __future__ import ..." new f8f538645 PROTON-2612: TLS OpenSSL library: initialize read raw buffer size to use whole capacity new c1171d9ab PROTON-2613: TLS OpenSSL library: fix SSL_write configuration for partial writes The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: c/src/tls/openssl.c | 5 + 1 file changed, 5 insertions(+)
[qpid-proton] 01/02: PROTON-2612: TLS OpenSSL library: initialize read raw buffer size to use whole capacity
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git commit f8f5386455c8bfec3cc40fe8271acdd50b4f2f4f Author: Clifford Jansen AuthorDate: Thu Sep 15 10:14:55 2022 -0700 PROTON-2612: TLS OpenSSL library: initialize read raw buffer size to use whole capacity --- c/src/tls/openssl.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/c/src/tls/openssl.c b/c/src/tls/openssl.c index d36232604..c05e09b99 100644 --- a/c/src/tls/openssl.c +++ b/c/src/tls/openssl.c @@ -1646,6 +1646,7 @@ size_t pn_tls_give_encrypt_output_buffers(pn_tls_t* tls, pn_raw_buffer_t const* // Get next free assert(tls->eresult_buffers[current-1].type == buff_empty); raw_buffer_to_pbuffer(bufs + i, &tls->eresult_buffers[current-1], buff_eresult_blank); +tls->eresult_buffers[current-1].size = 0; previous = current; current = tls->eresult_buffers[current-1].next; } @@ -1673,6 +1674,7 @@ size_t pn_tls_give_decrypt_output_buffers(pn_tls_t* tls, pn_raw_buffer_t const* // Get next free assert(tls->dresult_buffers[current-1].type == buff_empty); raw_buffer_to_pbuffer(bufs + i, &tls->dresult_buffers[current-1], buff_dresult_blank); +tls->dresult_buffers[current-1].size = 0; previous = current; current = tls->dresult_buffers[current-1].next; }
[qpid-proton] 02/02: PROTON-2613: TLS OpenSSL library: fix SSL_write configuration for partial writes
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git commit c1171d9ab347cbd1442a7f77ad6dd4db74f8619b Author: Clifford Jansen AuthorDate: Thu Sep 15 10:25:12 2022 -0700 PROTON-2613: TLS OpenSSL library: fix SSL_write configuration for partial writes --- c/src/tls/openssl.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/c/src/tls/openssl.c b/c/src/tls/openssl.c index c05e09b99..4aadc8861 100644 --- a/c/src/tls/openssl.c +++ b/c/src/tls/openssl.c @@ -1205,6 +1205,9 @@ static int init_ssl_socket(pn_tls_t *ssl, pn_tls_config_t *domain) return -1; } + // Enable "write as much as you hve buffer space for", similar to BIOs and raw sockets. + SSL_set_mode(ssl->ssl, SSL_MODE_ENABLE_PARTIAL_WRITE); + // store backpointer SSL_set_ex_data(ssl->ssl, tls_ex_data_index, ssl);
[qpid-proton] branch main updated: PROTON-2586: TLS OpenSSL library - incomplete decryption/encryption of staged buffers
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git The following commit(s) were added to refs/heads/main by this push: new b9fe1739d PROTON-2586: TLS OpenSSL library - incomplete decryption/encryption of staged buffers b9fe1739d is described below commit b9fe1739d4f2ba59c136a7f5e87b30bab5950e5a Author: Clifford Jansen AuthorDate: Tue Aug 2 22:59:23 2022 -0700 PROTON-2586: TLS OpenSSL library - incomplete decryption/encryption of staged buffers PROTON-2586: TLS OpenSSL library - incomplete decryption/encryption of staged buffers --- c/src/tls/openssl.c | 17 +++-- 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/c/src/tls/openssl.c b/c/src/tls/openssl.c index 39d865d2d..d36232604 100644 --- a/c/src/tls/openssl.c +++ b/c/src/tls/openssl.c @@ -1949,10 +1949,12 @@ static void encrypt(pn_tls_t *tls) { while (true) { // Insert unencrypted data into BIO. +// This loop must run even if tls->pn_tls_err, in order to extract the error message for the peer. // OpenSSL maps each write to a separate TLS record. // The SSL can take 16KB + a bit before blocking. // TODO: consider allowing application to configure BIO buffer size on encrypt side. -while (pending && !tls->enc_wblocked && tls->can_shutdown && !tls->pn_tls_err) { + +while (pending && !tls->enc_wblocked && tls->can_shutdown) { size_t n = pending->size - tls->encrypt_pending_offset; if (n) { char *bytes = pending->bytes + pending->offset + tls->encrypt_pending_offset; @@ -2012,8 +2014,9 @@ static void encrypt(pn_tls_t *tls) { } } -// Done if output buffers exhausted or all available encrypted bytes drained from BIO. 
-if (!curr_result || tls->enc_rblocked) +// Done if not possible to move any more bytes from input to output bufs +if ((!pending || tls->enc_wblocked || !tls->can_shutdown) // write side +&& (!curr_result || tls->enc_rblocked)) // read side break; } } @@ -2095,8 +2098,10 @@ static void decrypt(pn_tls_t *tls) { } } -// Done if outbufs exhausted or all inbufs decrypted -if (!curr_result || tls->dec_rblocked || tls->dec_closed) +// Done if not possible to move any more bytes from input to output bufs +if (tls->dec_closed) break; +if ((!pending || tls->dec_wblocked) // write side +&& (!curr_result || tls->dec_rblocked)) // read side break; } @@ -2124,7 +2129,7 @@ int pn_tls_process(pn_tls_t* tls) { decrypt(tls); // Do this first. May generate handshake or other on encrypt side. if (tls->validating) validate_strict(tls); } - // We keep sending if there is a "minor" error that may result in an error message for the peer + // We keep sending as long as we might generate an error message for the peer if (!(tls->pn_tls_err && tls->openssl_err_type == SSL_ERROR_SYSCALL)) { encrypt(tls); if (tls->validating) validate_strict(tls);
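The key change in the PROTON-2586 diff above is the loop exit test in encrypt(): the old test stopped as soon as the read (output) side could not progress, while the new one stops only when neither the write side nor the read side can move more bytes. The sketch below restates the new encrypt-side condition as a standalone predicate. The function name and its boolean parameters are hypothetical; they stand in for the pending, enc_wblocked, can_shutdown, curr_result and enc_rblocked values seen in the diff.

```c
#include <stdbool.h>

/* Hypothetical restatement of the new exit test in encrypt().
 * have_pending: unencrypted input is still queued
 * wblocked:     the SSL object cannot accept more input right now
 * can_shutdown: the session is in a state where writes are allowed
 * have_result:  an output buffer is available for encrypted bytes
 * rblocked:     no more encrypted bytes can be drained right now */
static bool toy_encrypt_done(bool have_pending, bool wblocked, bool can_shutdown,
                             bool have_result, bool rblocked) {
  bool write_side_stalled = !have_pending || wblocked || !can_shutdown;
  bool read_side_stalled = !have_result || rblocked;
  /* Keep looping while either side can still make progress. */
  return write_side_stalled && read_side_stalled;
}
```

The decrypt side in the diff follows the same two-sided shape, with an additional early exit when dec_closed is set.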
[qpid-proton] branch main updated: PROTON-2535: TLS OpenSSL library: remove BIO on decrypt side for accurate errors and byte counts
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git The following commit(s) were added to refs/heads/main by this push: new c33f1abc1 PROTON-2535: TLS OpenSSL library: remove BIO on decrypt side for accurate errors and byte counts c33f1abc1 is described below commit c33f1abc17936ea4f8a4d1545ea626cd9fd3577f Author: Clifford Jansen AuthorDate: Fri Jun 24 10:32:04 2022 -0700 PROTON-2535: TLS OpenSSL library: remove BIO on decrypt side for accurate errors and byte counts --- c/experimental/raw_plus_tls2.c | 408 - c/src/tls/openssl.c| 108 +-- c/tests/tls_test.cpp | 236 +++- 3 files changed, 486 insertions(+), 266 deletions(-) diff --git a/c/experimental/raw_plus_tls2.c b/c/experimental/raw_plus_tls2.c index 9b285e6ed..f65c290d1 100644 --- a/c/experimental/raw_plus_tls2.c +++ b/c/experimental/raw_plus_tls2.c @@ -29,20 +29,23 @@ #include #include #include -#include #include /* * Jabberwock raw connection example with and without TLS. * - * One client and one server take turns sending some lines of the poem. - * The simple "application" logic resides in some_jabber() and gobble_jabber(). + * One client and one server take turns sending a line of the poem. + * The simple "application" logic resides in line_of_jabber() and gobble_jabber(). * handle_outgoing() and handle_incoming() handle the application IO * plumbing to use Proton raw connections, with or without TLS. * * See the "no_tls" option to contrast the approach compared to TLS usage. * + * This example is frugal in the number of buffers it uses, giving at most one + * pn_raw_buffer_t at any time to the read or write sides of raw connections and + * the TLS engine. + * * In this example orderly termination of the connection is initiated by one side. * The initiator does not wait for any close handshake from the peer. * The peer looks for confirmation of orderly closure from the initiator. 
@@ -70,23 +73,44 @@ pn_tls_config_set_credentials(DOMAIN, CERTIFICATE(NAME), SSL_FILE(NAME "-private-key.pem"), SSL_PW) #endif +static void jfatal(const char *file, int line) { + fprintf(stderr, "epoll proactor failure in %s:%d\n", __FILE__, __LINE__); + abort(); \ +} +#define jcheck( EXPR ) \ + { if (!(EXPR)) jfatal(__FILE__,__LINE__); } + +static uintptr_t JBR_UNUSED = 0; +static uintptr_t JBR_INUSE = 1; + // A raw buffer "pool", in name only -static void rbuf_pool_get(pn_raw_buffer_t *bufs, uint32_t num) { - memset(bufs, 0, sizeof(pn_raw_buffer_t) * num); - while (num--) { -bufs->bytes = calloc(1, 4096); -bufs->capacity = 4096; -bufs++; - } +static void rbuf_pool_get(pn_raw_buffer_t *buf) { + memset(buf, 0, sizeof(pn_raw_buffer_t)); + buf->bytes = calloc(1, 4096); + buf->capacity = 4096; + buf->context = JBR_UNUSED; } -static void rbuf_pool_return(pn_raw_buffer_t *buf) { - free(buf->bytes); +static void rbuf_pool_return(pn_raw_buffer_t *rbuf) { + free(rbuf->bytes); +} + +// Buffer is "in use" if ownership transferred to raw connection or TLS until a future event. +static void buf_set_in_use(pn_raw_buffer_t *rbuf, bool in_use) { + rbuf->context = in_use ? 
JBR_INUSE : JBR_UNUSED; +} +static bool buf_in_use(pn_raw_buffer_t *rbuf) { + return rbuf->context == JBR_INUSE; +} +static bool buf_unused(pn_raw_buffer_t *rbuf) { + return !buf_in_use(rbuf); } -static void rbuf_pool_multi_return(pn_raw_buffer_t *buf, size_t n) { - while(n--) -rbuf_pool_return(buf++); +static void buf_reset(pn_raw_buffer_t *rbuf) { + rbuf->size = 0; + rbuf->offset = 0; + memset(rbuf->bytes, 0, rbuf->capacity); + buf_set_in_use(rbuf, false); } static size_t size_t_min(size_t a, size_t b) { @@ -120,10 +144,15 @@ typedef struct jabber_connection_t { bool connecting; bool tls_closing; + bool input_done; bool orderly_close_initiated; bool orderly_close_detected; bool tls_error; + pn_raw_buffer_t out_wire_buf; + pn_raw_buffer_t in_wire_buf; + pn_raw_buffer_t out_app_buf; + pn_raw_buffer_t in_app_buf; bool is_server; bool jabber_turn; char *alpn_protocol; @@ -145,30 +174,24 @@ static const char* jlines[] = { static size_t jlines_count = sizeof(jlines) / sizeof(jlines[0]); -static size_t some_jabber(jabber_connection_t *jc, pn_raw_buffer_t *rbufp, size_t nrbufs) { +// Provide one line of poem into rbuf. +static void line_of_jabber(jabber_connection_t *jc, pn_raw_buffer_t *rbufp) { jabber_t *j = jc->parent; const char *self = jc->is_server ? "server" : "client"; - size_t actual = 0; - // Simuate varying output for each run by choo
[qpid-proton] branch main updated: PROTON-2544: temporary warning-as-error disable for DH_xxx calls in openssl related code
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git The following commit(s) were added to refs/heads/main by this push: new 5aaa655cb PROTON-2544: temporary warning-as-error disable for DH_xxx calls in openssl related code 5aaa655cb is described below commit 5aaa655cb1023e9fb48e59f64e3d0c1307ea54f2 Author: Clifford Jansen AuthorDate: Wed Jun 1 18:28:03 2022 -0700 PROTON-2544: temporary warning-as-error disable for DH_xxx calls in openssl related code --- c/src/ssl/openssl.c | 7 +++ c/src/tls/openssl.c | 7 +++ 2 files changed, 14 insertions(+) diff --git a/c/src/ssl/openssl.c b/c/src/ssl/openssl.c index 2681d2d63..145f60e0b 100644 --- a/c/src/ssl/openssl.c +++ b/c/src/ssl/openssl.c @@ -371,6 +371,10 @@ static int verify_callback(int preverify_ok, X509_STORE_CTX *ctx) return preverify_ok; } +// Temporary: PROTON-2544 for build. Next release: replace or remove DH_xxx() functions. +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wdeprecated-declarations" + // This was introduced in v1.1 #if OPENSSL_VERSION_NUMBER < 0x1010 int DH_set0_pqg(DH *dh, BIGNUM *p, BIGNUM *q, BIGNUM *g) @@ -570,6 +574,9 @@ static bool pni_init_ssl_domain( pn_ssl_domain_t * domain, pn_ssl_mode_t mode ) return true; } +// PROTON-2544: see earlier related push. Temporary only. +#pragma GCC diagnostic pop + pn_ssl_domain_t *pn_ssl_domain( pn_ssl_mode_t mode ) { pn_ssl_domain_t *domain = (pn_ssl_domain_t *) calloc(1, sizeof(pn_ssl_domain_t)); diff --git a/c/src/tls/openssl.c b/c/src/tls/openssl.c index d26836c0d..de07aca62 100644 --- a/c/src/tls/openssl.c +++ b/c/src/tls/openssl.c @@ -692,6 +692,10 @@ static int verify_callback(int preverify_ok, X509_STORE_CTX *ctx) return preverify_ok; } +// Temporary: PROTON-2544 for build. Next release: replace or remove DH_xxx() functions. 
+#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wdeprecated-declarations" + // This was introduced in v1.1 #if OPENSSL_VERSION_NUMBER < 0x1010 int DH_set0_pqg(DH *dh, BIGNUM *p, BIGNUM *q, BIGNUM *g) @@ -868,6 +872,9 @@ static bool pni_init_ssl_domain( pn_tls_config_t * domain, pn_tls_mode_t mode ) return true; } +// PROTON-2544: see earlier related push. Temporary only. +#pragma GCC diagnostic pop + pn_tls_config_t *pn_tls_config( pn_tls_mode_t mode ) { pn_tls_config_t *domain = (pn_tls_config_t *) calloc(1, sizeof(pn_tls_config_t));
[qpid-proton] branch main updated: PROTON-1870: openssl layer: flush TLS error alert to peer for better error reporting
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git The following commit(s) were added to refs/heads/main by this push: new dfebbe8c PROTON-1870: openssl layer: flush TLS error alert to peer for better error reporting dfebbe8c is described below commit dfebbe8c10666fecb65f74c99199fc7e013997ef Author: Clifford Jansen AuthorDate: Thu May 12 07:11:45 2022 -0700 PROTON-1870: openssl layer: flush TLS error alert to peer for better error reporting --- c/src/ssl/openssl.c | 35 +-- 1 file changed, 29 insertions(+), 6 deletions(-) diff --git a/c/src/ssl/openssl.c b/c/src/ssl/openssl.c index b4b75645..2681d2d6 100644 --- a/c/src/ssl/openssl.c +++ b/c/src/ssl/openssl.c @@ -114,6 +114,7 @@ struct pni_ssl_t { bool ssl_closed; // shutdown complete, or SSL error bool read_blocked;// SSL blocked until more network data is read bool write_blocked; // SSL blocked until data is written to network + int err_reason; char *subject; X509 *peer_certificate; @@ -193,14 +194,30 @@ static void ssl_log_clear_data(pn_transport_t *transport, const char *data, size } // unrecoverable SSL failure occurred, notify transport and generate error code. -static int ssl_failed(pn_transport_t *transport) +static int ssl_failed(pn_transport_t *transport, int reason) { pni_ssl_t *ssl = transport->ssl; - SSL_set_shutdown(ssl->ssl, SSL_SENT_SHUTDOWN|SSL_RECEIVED_SHUTDOWN); - ssl->ssl_closed = true; + bool first_fail = true; + if (ssl->err_reason != 0) +first_fail = false; // No need for second transport error event. + else +ssl->err_reason = reason; + ssl->app_input_closed = ssl->app_output_closed = PN_EOS; // fake a shutdown so the i/o processing code will close properly SSL_set_shutdown(ssl->ssl, SSL_SENT_SHUTDOWN|SSL_RECEIVED_SHUTDOWN); + + if (first_fail && reason == SSL_ERROR_SSL) { +// Protocol error. 
Toss input and pending app output but best efforts for SSL generated output (i.e. error alerts) +ssl->out_count = 0; // Discard. No more writing into SSL engine allowed. + } else { +// SSL_ERROR_SYS or unknown error. SSL engine state unknown. Stop IO in all directions immediately. +ssl->ssl_closed = true; + } + // transport->io_layers[layer] is updated in handle_error_ssl + if (!first_fail) +return PN_EOS; + // try to grab the first SSL error to add to the failure log char buf[256] = "Unknown error"; unsigned long ssl_err = ERR_get_error(); @@ -208,6 +225,7 @@ static int ssl_failed(pn_transport_t *transport) ERR_error_string_n( ssl_err, buf, sizeof(buf) ); } ssl_log_flush(transport, PN_LEVEL_ERROR);// spit out any remaining errors to the log file + // Following call invokes hanlde_error_ssl which sets transport->io_layers[layer] pn_do_error(transport, "amqp:connection:framing-error", "SSL Failure: %s", buf); return PN_EOS; } @@ -1059,7 +1077,7 @@ static ssize_t process_input_ssl( pn_transport_t *transport, unsigned int layer, break; default: // unexpected error -return (ssize_t)ssl_failed(transport); +return (ssize_t)ssl_failed(transport, reason); } } else { if (BIO_should_write( ssl->bio_ssl )) { @@ -1152,7 +1170,12 @@ static ssize_t process_input_ssl( pn_transport_t *transport, unsigned int layer, static void handle_error_ssl(pn_transport_t *transport, unsigned int layer) { - transport->io_layers[layer] = &ssl_closed_layer; + // External errors do not affect SSL operation and graceful shutdown. 
+ if (transport->ssl->err_reason == SSL_ERROR_SSL) { +transport->io_layers[layer] = &ssl_input_closed_layer; + } else { +transport->io_layers[layer] = &ssl_closed_layer; + } } static ssize_t process_output_ssl( pn_transport_t *transport, unsigned int layer, char *buffer, size_t max_len) @@ -1209,7 +1232,7 @@ static ssize_t process_output_ssl( pn_transport_t *transport, unsigned int layer break; default: // unexpected error - return (ssize_t)ssl_failed(transport); + return (ssize_t)ssl_failed(transport, reason); } } else { if (BIO_should_read( ssl->bio_ssl )) {
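The reworked ssl_failed() in the PROTON-1870 diff above does two things: it latches the first failure reason so that only one transport error is reported, and it leaves the output path open after a first protocol-level failure so that a TLS alert can still reach the peer. A simplified sketch of that control flow follows. All names here (toy_ssl_t, toy_failed, the TOY_ERR_* constants) are hypothetical stand-ins for the Proton and OpenSSL identifiers, and the sketch omits the logging and layer switching done by the real code.

```c
#include <stdbool.h>

/* Hypothetical stand-ins for SSL_ERROR_SSL and SSL_ERROR_SYSCALL. */
enum { TOY_ERR_NONE = 0, TOY_ERR_PROTOCOL = 1, TOY_ERR_SYSCALL = 5 };

typedef struct {
  int err_reason; /* first failure reason, 0 if none yet */
  bool closed;    /* all I/O stopped */
  int reports;    /* number of errors reported to the transport */
} toy_ssl_t;

static void toy_failed(toy_ssl_t *s, int reason) {
  bool first_fail = (s->err_reason == TOY_ERR_NONE);
  if (first_fail) s->err_reason = reason; /* latch only the first reason */

  if (first_fail && reason == TOY_ERR_PROTOCOL) {
    /* Protocol error: keep the output side alive so the engine can
     * flush its own alert; the real code also discards app output here. */
  } else {
    /* Unknown engine state, or a repeat failure: stop everything. */
    s->closed = true;
  }

  if (!first_fail) return; /* no second error event */
  s->reports++;
}
```

In the diff, handle_error_ssl() then uses the latched reason to choose between the input-closed layer and the fully closed layer.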
[qpid-proton] branch main updated: PROTON-2519: TLS Library - null pointer reference
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git The following commit(s) were added to refs/heads/main by this push: new d409829 PROTON-2519: TLS Library - null pointer reference d409829 is described below commit d4098290d6c56cd059b87e24dcb83429f8655d20 Author: Cliff Jansen AuthorDate: Mon Mar 14 22:27:28 2022 -0700 PROTON-2519: TLS Library - null pointer reference --- c/src/tls/openssl.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/c/src/tls/openssl.c b/c/src/tls/openssl.c index 779e2b8..d26836c 100644 --- a/c/src/tls/openssl.c +++ b/c/src/tls/openssl.c @@ -2271,6 +2271,8 @@ static int pn_tls_alpn_cb(SSL *ssn, int pn_tls_config_set_alpn_protocols(pn_tls_config_t *domain, const char **protocols, size_t protocol_count) { unsigned char *wire_bytes; size_t wb_len; + if ((protocols == NULL && protocol_count != 0) || (protocols && protocol_count == 0)) +return PN_ARG_ERR; if (protocols == NULL && protocol_count == 0) { free(domain->alpn_list); domain->alpn_list = NULL;
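The guard added in the PROTON-2519 diff above rejects the two inconsistent argument combinations (a NULL list with a non-zero count, or a non-NULL list with a zero count) before the list is dereferenced. A minimal standalone sketch of the same check follows. The function name toy_check_protocols and the TOY_ARG_ERR value are hypothetical and only stand in for the Proton function and PN_ARG_ERR.

```c
#include <stddef.h>

/* Hypothetical stand-in for PN_ARG_ERR; the real value is not assumed. */
#define TOY_ARG_ERR (-1)

/* Returns 0 when the (list, count) pair is consistent, TOY_ARG_ERR otherwise.
 * Consistent pairs: (NULL, 0) meaning "clear the list", or (non-NULL, n > 0). */
static int toy_check_protocols(const char **protocols, size_t count) {
  if ((protocols == NULL && count != 0) || (protocols != NULL && count == 0))
    return TOY_ARG_ERR;
  return 0;
}
```

Only after this check is it safe for the caller to walk protocols[0] through protocols[count - 1].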
[qpid-proton] branch main updated: PROTON-2512: TLS library - remove unused code
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

The following commit(s) were added to refs/heads/main by this push:
     new 532bbd8  PROTON-2512: TLS library - remove unused code
532bbd8 is described below

commit 532bbd84cfe761c54a685c38edca864aeea3d0cd
Author: Cliff Jansen
AuthorDate: Mon Mar 14 22:11:41 2022 -0700

    PROTON-2512: TLS library - remove unused code
---
 c/src/tls/openssl.c | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/c/src/tls/openssl.c b/c/src/tls/openssl.c
index 7b889c5..779e2b8 100644
--- a/c/src/tls/openssl.c
+++ b/c/src/tls/openssl.c
@@ -1462,10 +1462,6 @@ static inline uint32_t room(pbuffer_t const *b) {
   return 0;
 }
 
-static inline size_t size_min(uint32_t a, uint32_t b) {
-  return (a <= b) ? a : b;
-}
-
 bool pn_tls_is_secure(pn_tls_t * tls) {
   return tls->handshake_ok && !tls->pn_tls_err && !tls->stopped;
 }
[qpid-proton] branch main updated: PROTON-2517: for pn_data used by emitters, save point, rewind, restore. This closes #362
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

The following commit(s) were added to refs/heads/main by this push:
     new 4c0b03f  PROTON-2517: for pn_data used by emitters, save point, rewind, restore. This closes #362
4c0b03f is described below

commit 4c0b03f8e4dcb632b8232f6c5b97ce20156ae3f3
Author: Cliff Jansen
AuthorDate: Mon Mar 14 12:16:10 2022 -0700

    PROTON-2517: for pn_data used by emitters, save point, rewind, restore. This closes #362
---
 c/src/core/emitters.h | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/c/src/core/emitters.h b/c/src/core/emitters.h
index 1d10f34..5a5614e 100644
--- a/c/src/core/emitters.h
+++ b/c/src/core/emitters.h
@@ -534,12 +534,23 @@ static inline void emit_copy(pni_emitter_t* emitter, pni_compound_context* compo
   if (!data || pn_data_size(data) == 0) {
     pni_emitter_writef8(emitter, PNE_NULL);
   } else {
+    pn_handle_t point = pn_data_point(data);
+    pn_data_rewind(data);
     pni_emitter_data(emitter, data);
+    pn_data_restore(data, point);
   }
   compound->count++;
 }
 
 static inline void emit_multiple(pni_emitter_t* emitter, pni_compound_context* compound, pn_data_t* data) {
+  pn_handle_t point = {0};
+  if (data) {
+    point = pn_data_point(data);
+    pn_data_rewind(data);
+    // Rewind and position to first node so data type is defined.
+    pn_data_next(data);
+  }
+
   emit_accumulated_nulls(emitter, compound);
   if (!data || pn_data_size(data) == 0) {
     pni_emitter_writef8(emitter, PNE_NULL);
@@ -560,7 +571,10 @@ static inline void emit_multiple(pni_emitter_t* emitter, pni_compound_context* c
   } else {
     pni_emitter_data(emitter, data);
   }
+
   compound->count++;
+  if (data)
+    pn_data_restore(data, point);
 }
 
 static inline void emit_described_type_copy(pni_emitter_t* emitter, pni_compound_context* compound, uint64_t descriptor, pn_data_t* data) {
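The save point / rewind / restore sequence in PROTON-2517 lets the emitter walk a data object from the start without disturbing the caller's current position. The sketch below shows the same idea on a toy cursor; `cursor_t` and the `cursor_*` functions are hypothetical stand-ins invented for this example, not the `pn_data_t` API.

```c
#include <assert.h>
#include <stddef.h>

/* Toy cursor over an int array; a hypothetical stand-in for pn_data_t. */
typedef struct {
  const int *items;
  size_t size;
  size_t pos;   /* caller-visible read position */
} cursor_t;

size_t cursor_point(const cursor_t *c) { return c->pos; }
void cursor_rewind(cursor_t *c) { c->pos = 0; }
void cursor_restore(cursor_t *c, size_t point) { c->pos = point; }

/* Visit every item from the start, then put the cursor back where it was. */
int cursor_sum_all(cursor_t *c) {
  assert(c);
  size_t point = cursor_point(c);   /* save */
  cursor_rewind(c);                 /* rewind */
  int sum = 0;
  while (c->pos < c->size)
    sum += c->items[c->pos++];
  cursor_restore(c, point);         /* restore */
  return sum;
}
```

A caller that had already advanced to position 2 of `{1, 2, 3}` would get a sum of 6 and still find the cursor at position 2 afterwards, which is the property the commit adds for `emit_copy()` and `emit_multiple()`.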
[qpid-proton] branch main updated: PROTON-2472: prevent early consumption of ready list on EINTR
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

The following commit(s) were added to refs/heads/main by this push:
     new 3a063e3  PROTON-2472: prevent early consumption of ready list on EINTR
3a063e3 is described below

commit 3a063e363c6a3a8807ac82955c5bb60926d6f0dd
Author: Cliff Jansen
AuthorDate: Wed Feb 2 23:54:29 2022 -0800

    PROTON-2472: prevent early consumption of ready list on EINTR
---
 c/src/proactor/epoll-internal.h |  2 +-
 c/src/proactor/epoll.c          | 41 ++++++++++++++++-------------------------
 2 files changed, 17 insertions(+), 26 deletions(-)

diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h
index 8db12a1..79dddaa 100644
--- a/c/src/proactor/epoll-internal.h
+++ b/c/src/proactor/epoll-internal.h
@@ -192,7 +192,7 @@ struct pn_proactor_t {
   tslot_t *last_earmark;
   task_t *sched_ready_first;
   task_t *sched_ready_last;
-  task_t *sched_ready_current; // TODO: remove or use for sceduling priority or fairness
+  bool sched_ready_pending;
   unsigned int sched_ready_count;
   task_t *resched_first;
   task_t *resched_last;
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 6a243be..1ff68ef 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -250,8 +250,8 @@ void task_init(task_t *tsk, task_type_t t, pn_proactor_t *p) {
  * are needed to cross or reconcile the two portions of the list.
  */
 
-// Call with sched lock held.
-static void pop_ready_task(task_t *tsk) {
+// Call with sched lock held and sched_ready_count > 0.
+static task_t *sched_ready_pop_front(pn_proactor_t *p) {
   // every task on the sched_ready_list is either currently running,
   // or to be scheduled.  schedule() will not "see" any of the ready_next
   // pointers until ready and working have transitioned to 0
@@ -262,22 +262,19 @@ static void pop_ready_task(task_t *tsk) {
   // !ready .. schedule() .. on ready_list .. on sched_ready_list .. working task .. !sched_ready && !ready
   //
   // Intervening locks at each transition ensures ready_next has memory coherence throughout the ready task scheduling cycle.
-  // TODO: sched_ready list changed to sequential processing.  Review need for sched_ready_current.
-  pn_proactor_t *p = tsk->proactor;
-  if (tsk == p->sched_ready_current)
-    p->sched_ready_current = tsk->ready_next;
-  assert (tsk == p->sched_ready_first);
   assert (p->sched_ready_count);
+  task_t *tsk = p->sched_ready_first;
   p->sched_ready_count--;
   if (tsk == p->sched_ready_last) {
     p->sched_ready_first = p->sched_ready_last = NULL;
   } else {
     p->sched_ready_first = tsk->ready_next;
   }
-  if (!p->sched_ready_first) {
-    p->sched_ready_last = NULL;
-    assert(p->sched_ready_count == 0);
+  if (p->sched_ready_count == 0) {
+    assert(!p->sched_ready_first);
+    p->sched_ready_pending = false;
   }
+  return tsk;
 }
 
 // Call only as the poller task that has already called schedule_ready_list() and already
@@ -2273,8 +2270,6 @@ static void schedule_ready_list(pn_proactor_t *p) {
     if (!p->sched_ready_first)
       p->sched_ready_first = p->ready_list_first;
     p->sched_ready_last = p->ready_list_last;
-    if (!p->sched_ready_current)
-      p->sched_ready_current = p->sched_ready_first;
     p->ready_list_first = p->ready_list_last = NULL;
 
     // Track sched_ready_count to know how many threads may be needed.
@@ -2403,17 +2398,14 @@ static task_t *next_runnable(pn_proactor_t *p, tslot_t *ts) {
     }
   }
 
-  // rest of sched_ready list
-  while (p->sched_ready_count) {
-    tsk = p->sched_ready_current;
+  // sched_ready list tasks deferred in poller_do_epoll()
+  while (p->sched_ready_pending) {
+    tsk = sched_ready_pop_front(p);
     assert(tsk->ready); // eventfd_mutex required post ready set and pre move to sched_ready_list
     if (post_ready(p, tsk)) {
-      pop_ready_task(tsk);  // updates sched_ready_current
       assert(!tsk->runnables_idx && !tsk->runner);
       assign_thread(ts, tsk);
       return tsk;
-    } else {
-      pop_ready_task(tsk);
     }
   }
 
@@ -2501,7 +2493,7 @@ static pn_event_batch_t *next_event_batch(pn_proactor_t* p, bool can_block) {
 static bool poller_do_epoll(struct pn_proactor_t* p, tslot_t *ts, bool can_block) {
   // As poller with lots to do, be mindful of hogging the sched lock.  Release when making kernel calls.
   assert(!p->resched_cutoff);
-  assert(!p->sched_ready_first);
+  assert(!p->sched_ready_first && !p->sched_ready_pending);
   int n_events;
   task_t *tsk;
   bool unpolled_work = false;
@@ -2608,21 +2600,20 @@ static bool poller_do_epoll(struct pn_proactor_t* p, tslot_t *ts, bool can_block
   if (warm_tries < 0)
     warm_tries = 0;
-  task_t
[qpid-proton] branch main updated: PROTON-2472: epoll proactor - fix ready list tracking when EINTR in epoll_wait
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

The following commit(s) were added to refs/heads/main by this push:
     new 4650f3e  PROTON-2472: epoll proactor - fix ready list tracking when EINTR in epoll_wait
4650f3e is described below

commit 4650f3e0e549b1d964d40484ae0c5d7bfea3faf5
Author: Cliff Jansen
AuthorDate: Wed Feb 2 11:20:13 2022 -0800

    PROTON-2472: epoll proactor - fix ready list tracking when EINTR in epoll_wait
---
 c/src/proactor/epoll.c | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 19867af..6a243be 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -2265,7 +2265,8 @@ static pn_event_batch_t *process(task_t *tsk) {
 
 // Call with both sched_mutex and eventfd_mutex held
 static void schedule_ready_list(pn_proactor_t *p) {
-  // append ready_list_first..ready_list_last to end of sched_ready_last
+  // Append ready_list_first..ready_list_last to end of sched_ready_last
+  // May see several in single do_epoll() if EINTR.
   if (p->ready_list_first) {
     if (p->sched_ready_last)
       p->sched_ready_last->ready_next = p->ready_list_first;  // join them
@@ -2275,11 +2276,11 @@ static void schedule_ready_list(pn_proactor_t *p) {
     if (!p->sched_ready_current)
       p->sched_ready_current = p->sched_ready_first;
     p->ready_list_first = p->ready_list_last = NULL;
-  }
 
-  // Track sched_ready_count to know how many threads may be needed.
-  p->sched_ready_count = p->ready_list_count;
-  p->ready_list_count = 0;
+    // Track sched_ready_count to know how many threads may be needed.
+    p->sched_ready_count += p->ready_list_count;
+    p->ready_list_count = 0;
+  }
 }
 
 // Call with schedule lock and eventfd lock held.  Called only by poller thread.
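The change from `=` to `+=` in the commit above matters because the ready list may be spliced onto the scheduled list more than once before the scheduled list is drained. A small standalone sketch of that bookkeeping follows; `node_t`, `list_t`, `list_push` and `list_splice` are hypothetical names for this illustration and do not mirror the proactor's real structures or locking.

```c
#include <assert.h>
#include <stddef.h>

typedef struct node { struct node *next; } node_t;
typedef struct { node_t *first, *last; unsigned count; } list_t;

/* Append one node to the tail of the list. */
void list_push(list_t *l, node_t *n) {
  n->next = NULL;
  if (l->last) l->last->next = n; else l->first = n;
  l->last = n;
  l->count++;
}

/* Move everything in src onto the end of dst.  May be called several
 * times before dst is drained, so the count must accumulate. */
void list_splice(list_t *dst, list_t *src) {
  assert(dst && src);
  if (!src->first) return;          /* nothing to move, leave dst untouched */
  if (dst->last) dst->last->next = src->first; else dst->first = src->first;
  dst->last = src->last;
  dst->count += src->count;         /* "=" here would lose earlier entries */
  src->first = src->last = NULL;
  src->count = 0;
}
```

With plain assignment, splicing two nodes and then one more would leave the destination holding three nodes but reporting a count of one, which is the kind of mismatch the commit's title refers to.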
[qpid-proton] branch main updated: PROTON-2112: prevent fd overflow recovery during proactror shutdown
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

The following commit(s) were added to refs/heads/main by this push:
     new 4eda877  PROTON-2112: prevent fd overflow recovery during proactror shutdown
4eda877 is described below

commit 4eda877fdca73e38633a201103e57fc1d8525832
Author: Cliff Jansen
AuthorDate: Wed Jan 19 17:29:51 2022 -0800

    PROTON-2112: prevent fd overflow recovery during proactror shutdown
---
 c/src/proactor/epoll.c | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index adce4cc..19867af 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -756,7 +756,7 @@ static void proactor_rearm_overflow(pn_proactor_t *p) {
 // Close an FD and rearm overflow listeners.  Call with no listener locks held.
 int pclosefd(pn_proactor_t *p, int fd) {
   int err = close(fd);
-  if (!err) proactor_rearm_overflow(p);
+  if (!err && !p->shutting_down) proactor_rearm_overflow(p);
   return err;
 }
 
@@ -1639,13 +1639,14 @@ void pn_listener_free(pn_listener_t *l) {
 static void listener_begin_close(pn_listener_t* l) {
   if (!l->task.closing) {
     l->task.closing = true;
+    bool polling = !l->task.proactor->shutting_down;  // Is poller still running?
 
     /* Close all listening sockets */
     for (size_t i = 0; i < l->acceptors_size; ++i) {
       acceptor_t *a = &l->acceptors[i];
       psocket_t *ps = &a->psocket;
       if (ps->epoll_io.fd >= 0) {
-        if (a->armed) {
+        if (a->armed && polling) {
           shutdown(ps->epoll_io.fd, SHUT_RD);  // Force epoll event and callback
         } else {
           int fd = ps->epoll_io.fd;
[qpid-proton] branch main updated: PROTON-2484: epoll proactor - missing null pointer logic generously found by coverity
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

The following commit(s) were added to refs/heads/main by this push:
     new 9ddecea  PROTON-2484: epoll proactor - missing null pointer logic generously found by coverity
9ddecea is described below

commit 9ddecea80f366c6898671a2545b762a44b675052
Author: Cliff Jansen
AuthorDate: Fri Jan 14 10:30:38 2022 -0800

    PROTON-2484: epoll proactor - missing null pointer logic generously found by coverity
---
 c/src/proactor/epoll.c | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index e375f11..adce4cc 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -1476,18 +1476,18 @@ void pn_connection_wake(pn_connection_t* c) {
 }
 
 void pn_proactor_release_connection(pn_connection_t *c) {
-  bool notify = false;
   pconnection_t *pc = get_pconnection(c);
-  pn_proactor_t *p = pc->task.proactor;
   if (pc) {
+    bool notify = false;
+    pn_proactor_t *p = pc->task.proactor;
     set_pconnection(c, NULL);
     lock(&pc->task.mutex);
     pn_connection_driver_release_connection(&pc->driver);
     pconnection_begin_close(pc);
     notify = schedule(&pc->task);
     unlock(&pc->task.mutex);
+    if (notify) notify_poller(p);
   }
-  if (notify) notify_poller(p);
 }
 
 //
[qpid-proton] branch main updated: PROTON-2484: epoll proactor - update fix for new introduced TSAN race
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

The following commit(s) were added to refs/heads/main by this push:
     new 5101a6f  PROTON-2484: epoll proactor - update fix for new introduced TSAN race
5101a6f is described below

commit 5101a6f72485bd21e39ca62c956edd122954d260
Author: Cliff Jansen
AuthorDate: Thu Jan 13 11:24:54 2022 -0800

    PROTON-2484: epoll proactor - update fix for new introduced TSAN race
---
 c/src/proactor/epoll.c | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index aa1bd1b..e375f11 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -1670,9 +1670,9 @@ static void listener_begin_close(pn_listener_t* l) {
 
 void pn_listener_close(pn_listener_t* l) {
   bool notify = false;
-  pn_proactor_t *p = l->task.proactor;
   lock(&l->task.mutex);
-  if (l->task.proactor && !l->task.closing) {
+  pn_proactor_t *p = l->task.proactor;
+  if (p && !l->task.closing) {
     listener_begin_close(l);
     notify = schedule(&l->task);
   }
[qpid-proton] branch main updated: PROTON-2484: epoll proactor - remove task memory reference after possible free
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

The following commit(s) were added to refs/heads/main by this push:
     new a19aa87  PROTON-2484: epoll proactor - remove task memory reference after possible free
a19aa87 is described below

commit a19aa87e66e4207da0dee51d0315bc087aabcd30
Author: Cliff Jansen
AuthorDate: Thu Jan 13 09:19:35 2022 -0800

    PROTON-2484: epoll proactor - remove task memory reference after possible free
---
 c/src/proactor/epoll.c                | 22 ++++++++++++++--------
 c/src/proactor/epoll_raw_connection.c |  8 +++++---
 2 files changed, 19 insertions(+), 11 deletions(-)

diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index d2f394a..aa1bd1b 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -900,6 +900,7 @@ static void pconnection_forced_shutdown(pconnection_t *pc) {
 // Called from timer_manager with no locks.
 void pni_pconnection_timeout(pconnection_t *pc) {
   bool notify = false;
+  pn_proactor_t *p = pc->task.proactor;
   uint64_t now = pn_proactor_now_64();
   lock(&pc->task.mutex);
   if (!pc->task.closing) {
@@ -912,7 +913,7 @@ void pni_pconnection_timeout(pconnection_t *pc) {
   }
   unlock(&pc->task.mutex);
   if (notify)
-    notify_poller(pc->task.proactor);
+    notify_poller(p);
 }
 
 static pn_event_t *pconnection_batch_next(pn_event_batch_t *batch) {
@@ -1460,21 +1461,24 @@ static void pconnection_tick(pconnection_t *pc) {
 }
 
 void pn_connection_wake(pn_connection_t* c) {
-  bool notify = false;
   pconnection_t *pc = get_pconnection(c);
   if (pc) {
+    pn_proactor_t *p = pc->task.proactor;
+    bool notify = false;
+
     lock(&pc->task.mutex);
     if (!pc->task.closing) {
       notify = pni_task_wake(&pc->task);
     }
     unlock(&pc->task.mutex);
+    if (notify) notify_poller(p);
   }
-  if (notify) notify_poller(pc->task.proactor);
 }
 
 void pn_proactor_release_connection(pn_connection_t *c) {
   bool notify = false;
   pconnection_t *pc = get_pconnection(c);
+  pn_proactor_t *p = pc->task.proactor;
   if (pc) {
     set_pconnection(c, NULL);
     lock(&pc->task.mutex);
@@ -1483,7 +1487,7 @@ void pn_proactor_release_connection(pn_connection_t *c) {
     notify = schedule(&pc->task);
     unlock(&pc->task.mutex);
   }
-  if (notify) notify_poller(pc->task.proactor);
+  if (notify) notify_poller(p);
 }
 
 //
@@ -1666,13 +1670,14 @@ static void listener_begin_close(pn_listener_t* l) {
 
 void pn_listener_close(pn_listener_t* l) {
   bool notify = false;
+  pn_proactor_t *p = l->task.proactor;
   lock(&l->task.mutex);
   if (l->task.proactor && !l->task.closing) {
     listener_begin_close(l);
     notify = schedule(&l->task);
   }
   unlock(&l->task.mutex);
-  if (notify) notify_poller(l->task.proactor);
+  if (notify) notify_poller(p);
 }
 
 static void listener_forced_shutdown(pn_listener_t *l) {
@@ -1859,8 +1864,9 @@ pn_record_t *pn_listener_attachments(pn_listener_t *l) {
 
 void pn_listener_accept2(pn_listener_t *l, pn_connection_t *c, pn_transport_t *t) {
   pconnection_t *pc = (pconnection_t*) malloc(sizeof(pconnection_t));
-  assert(pc); // TODO: memory safety
-  const char *err = pconnection_setup(pc, pn_listener_proactor(l), c, t, true, "", 0);
+  pn_proactor_t *p = pn_listener_proactor(l);
+  assert(pc && p); // TODO: memory safety
+  const char *err = pconnection_setup(pc, p, c, t, true, "", 0);
   if (err) {
     PN_LOG_DEFAULT(PN_SUBSYSTEM_EVENT, PN_LEVEL_ERROR, "pn_listener_accept failure: %s", err);
     return;
@@ -1895,7 +1901,7 @@ void pn_listener_accept2(pn_listener_t *l, pn_connection_t *c, pn_transport_t *t
   notify = schedule(&l->task);
   unlock(&pc->task.mutex);
   unlock(&l->task.mutex);
-  if (notify) notify_poller(l->task.proactor);
+  if (notify) notify_poller(p);
 }
 
 
diff --git a/c/src/proactor/epoll_raw_connection.c b/c/src/proactor/epoll_raw_connection.c
index 89e315f..a025698 100644
--- a/c/src/proactor/epoll_raw_connection.c
+++ b/c/src/proactor/epoll_raw_connection.c
@@ -211,8 +211,9 @@ void pn_proactor_raw_connect(pn_proactor_t *p, pn_raw_connection_t *rc, const ch
 
 void pn_listener_raw_accept(pn_listener_t *l, pn_raw_connection_t *rc) {
   assert(rc);
+  pn_proactor_t *p = pn_listener_proactor(l);
   praw_connection_t *prc = containerof(rc, praw_connection_t, raw_connection);
-  praw_connection_init(prc, pn_listener_proactor(l), rc);
+  praw_connection_init(prc, p, rc);
   // TODO: fuller sanity check on input args
 
   int err = 0;
@@ -246,7 +247,7 @@ void pn_listener_raw_accept(pn_listener_t *l, pn_raw_connection_t *rc) {
   }
   unlock(&prc->task.mutex);
   unlock(
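The pattern in the PROTON-2484 commit above is to copy the proactor pointer into a local before doing anything that may let the task be freed, and to use only the local afterwards. The single-threaded sketch below imitates that with an explicit `free()`; `hub_t`, `work_t`, `work_finish` and `work_close` are hypothetical names made up for this example and the real code involves locks and other threads that are not modeled here.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical long-lived owner, standing in for the proactor. */
typedef struct { int notifications; } hub_t;

/* Hypothetical short-lived object, standing in for a task. */
typedef struct { hub_t *hub; } work_t;

/* Finishing the work item releases it; the pointer is dead afterwards. */
bool work_finish(work_t *w) {
  free(w);
  return true;   /* caller should notify the hub */
}

/* Safe ordering: read w->hub first, then never touch w again. */
void work_close(work_t *w) {
  assert(w && w->hub);
  hub_t *hub = w->hub;            /* cache before w can go away */
  bool notify = work_finish(w);   /* w may be freed from here on */
  if (notify)
    hub->notifications++;         /* uses the cached pointer, not w->hub */
}
```

Writing `w->hub->notifications++` after `work_finish(w)` would read freed memory, which is the same hazard the commit removes from calls such as `notify_poller(pc->task.proactor)` made after the task mutex has been released.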
[qpid-proton] branch main updated: PROTON-2483: epoll proactor TSAN race - ensure task locks not held when calling pni_timer_set()
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

The following commit(s) were added to refs/heads/main by this push:
     new c6db2b0  PROTON-2483: epoll proactor TSAN race - ensure task locks not held when calling pni_timer_set()
c6db2b0 is described below

commit c6db2b089c6c40c46b5e37a701164bbce3f5ebf9
Author: Cliff Jansen
AuthorDate: Wed Jan 12 13:00:14 2022 -0800

    PROTON-2483: epoll proactor TSAN race - ensure task locks not held when calling pni_timer_set()
---
 c/src/proactor/epoll-internal.h |  3 ++-
 c/src/proactor/epoll.c          | 36 ++++++++++++++++++++++++++++--------
 c/src/proactor/epoll_timer.c    | 14 +++++++++-----
 3 files changed, 39 insertions(+), 14 deletions(-)

diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h
index 984724f..8db12a1 100644
--- a/c/src/proactor/epoll-internal.h
+++ b/c/src/proactor/epoll-internal.h
@@ -157,6 +157,7 @@ struct pn_proactor_t {
   bool need_timeout;
   bool timeout_set; /* timeout has been set by user and not yet cancelled or generated event */
   bool timeout_processed;  /* timeout event dispatched in the most recent event batch */
+  pmutex timeout_mutex;
   int task_count;
 
   // ready list subsystem
@@ -391,7 +392,7 @@ void pni_raw_connection_done(praw_connection_t *rc);
 
 pni_timer_t *pni_timer(pni_timer_manager_t *tm, pconnection_t *c);
 void pni_timer_free(pni_timer_t *timer);
-void pni_timer_set(pni_timer_t *timer, uint64_t deadline);
+bool pni_timer_set(pni_timer_t *timer, uint64_t deadline);
 bool pni_timer_manager_init(pni_timer_manager_t *tm);
 void pni_timer_manager_finalize(pni_timer_manager_t *tm);
 pn_event_batch_t *pni_timer_manager_process(pni_timer_manager_t *tm, bool timeout, bool sched_ready);
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 3784d05..d2f394a 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -835,8 +835,8 @@ static void pconnection_final_free(pconnection_t *pc) {
   pmutex_finalize(&pc->rearm_mutex);
   pn_condition_free(pc->disconnect_condition);
   pn_connection_driver_destroy(&pc->driver);
-  task_finalize(&pc->task);
   pni_timer_free(pc->timer);
+  task_finalize(&pc->task);
   free(pc);
 }
 
@@ -1453,7 +1453,8 @@ static void pconnection_tick(pconnection_t *pc) {
       lock(&pc->task.mutex);
       pc->expected_timeout = next;
       unlock(&pc->task.mutex);
-      pni_timer_set(pc->timer, next);
+      if (pni_timer_set(pc->timer, next))
+        notify_poller(pc->task.proactor);
     }
   }
 }
@@ -1957,6 +1958,7 @@ pn_proactor_t *pn_proactor() {
   pmutex_init(&p->eventfd_mutex);
   pmutex_init(&p->sched_mutex);
   pmutex_init(&p->tslot_mutex);
+  pmutex_init(&p->timeout_mutex);
 
   if ((p->epollfd = epoll_create(1)) >= 0) {
     if ((p->eventfd = eventfd(0, EFD_NONBLOCK)) >= 0) {
@@ -1979,6 +1981,7 @@ pn_proactor_t *pn_proactor() {
   if (p->eventfd >= 0) close(p->eventfd);
   if (p->interruptfd >= 0) close(p->interruptfd);
   pni_timer_manager_finalize(&p->timer_manager);
+  pmutex_finalize(&p->timeout_mutex);
   pmutex_finalize(&p->tslot_mutex);
   pmutex_finalize(&p->sched_mutex);
   pmutex_finalize(&p->eventfd_mutex);
@@ -2014,6 +2017,7 @@ void pn_proactor_free(pn_proactor_t *p) {
 
   pni_timer_manager_finalize(&p->timer_manager);
   pn_collector_free(p->collector);
+  pmutex_finalize(&p->timeout_mutex);
   pmutex_finalize(&p->tslot_mutex);
   pmutex_finalize(&p->sched_mutex);
   pmutex_finalize(&p->eventfd_mutex);
@@ -2768,26 +2772,42 @@ void pn_proactor_interrupt(pn_proactor_t *p) {
 void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) {
   bool notify = false;
+  lock(&p->timeout_mutex);
+
   lock(&p->task.mutex);
   p->timeout_set = true;
   if (t == 0) {
-    pni_timer_set(p->timer, 0);
     p->need_timeout = true;
-    notify = schedule(&p->task);
-  } else {
-    pni_timer_set(p->timer, t + pn_proactor_now_64());
+    if (schedule(&p->task))
+      notify = true;
   }
   unlock(&p->task.mutex);
+
+  if (t == 0) {
+    if (pni_timer_set(p->timer, 0))
+      notify = true;
+  } else {
+    if (pni_timer_set(p->timer, t + pn_proactor_now_64()))
+      notify = true;
+  }
+  unlock(&p->timeout_mutex);
   if (notify) notify_poller(p);
 }
 
 void pn_proactor_cancel_timeout(pn_proactor_t *p) {
+  bool notify = false;
+  lock(&p->timeout_mutex);
+
   lock(&p->task.mutex);
   p->timeout_set = false;
   p->need_timeout = false;
-  pni_timer_set(p->timer, 0);
-  bool notify = schedule_if_inactive(p);
+  if (schedule_if_inactive(p))
+    notify = true;
   unlock(&p->task.mutex);
+
+  if (pni_timer_set(p->time
[qpid-proton] branch main updated: PROTON-2436: epoll raw connection TSAN race fix
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

The following commit(s) were added to refs/heads/main by this push:
     new 3701f04  PROTON-2436: epoll raw connection TSAN race fix
3701f04 is described below

commit 3701f04534cbfa457e1724e0177ca8df234de565
Author: Cliff Jansen
AuthorDate: Tue Jan 11 07:01:34 2022 -0800

    PROTON-2436: epoll raw connection TSAN race fix
---
 c/src/proactor/epoll-internal.h       | 3 ++-
 c/src/proactor/epoll.c                | 5 ++++-
 c/src/proactor/epoll_raw_connection.c | 8 ++++++--
 3 files changed, 12 insertions(+), 4 deletions(-)

diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h
index f0afc9e..984724f 100644
--- a/c/src/proactor/epoll-internal.h
+++ b/c/src/proactor/epoll-internal.h
@@ -381,7 +381,8 @@ void configure_socket(int sock);
 accepted_t *listener_accepted_next(pn_listener_t *listener);
 
 task_t *pni_psocket_raw_task(psocket_t *ps);
-pn_event_batch_t *pni_raw_connection_process(task_t *t, bool sched_ready);
+psocket_t *pni_task_raw_psocket(task_t *t);
+pn_event_batch_t *pni_raw_connection_process(task_t *t, uint32_t io_events, bool sched_ready);
 
 typedef struct praw_connection_t praw_connection_t;
 task_t *pni_raw_connection_task(praw_connection_t *rc);
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 61e4dbd..adce8cd 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -2221,8 +2221,11 @@ static pn_event_batch_t *process(task_t *tsk) {
     break;
   }
   case RAW_CONNECTION: {
+    psocket_t *ps = pni_task_raw_psocket(tsk);
+    uint32_t events = ps->sched_io_events;
+    if (events) ps->sched_io_events = 0;
     unlock(&p->sched_mutex);
-    batch = pni_raw_connection_process(tsk, tsk_ready);
+    batch = pni_raw_connection_process(tsk, events, tsk_ready);
     break;
   }
   case TIMER_MANAGER: {
diff --git a/c/src/proactor/epoll_raw_connection.c b/c/src/proactor/epoll_raw_connection.c
index 94d6460..89e315f 100644
--- a/c/src/proactor/epoll_raw_connection.c
+++ b/c/src/proactor/epoll_raw_connection.c
@@ -320,6 +320,10 @@ task_t *pni_psocket_raw_task(psocket_t* ps) {
   return &containerof(ps, praw_connection_t, psocket)->task;
 }
 
+psocket_t *pni_task_raw_psocket(task_t *t) {
+  return &containerof(t, praw_connection_t, task)->psocket;
+}
+
 praw_connection_t *pni_batch_raw_connection(pn_event_batch_t *batch) {
   return (batch->next_event == pni_raw_batch_next) ?
     containerof(batch, praw_connection_t, batch) : NULL;
@@ -349,10 +353,10 @@ static void set_error(pn_raw_connection_t *conn, const char *msg, int err) {
   psocket_error(containerof(conn, praw_connection_t, raw_connection), err, msg);
 }
 
-pn_event_batch_t *pni_raw_connection_process(task_t *t, bool sched_ready) {
+pn_event_batch_t *pni_raw_connection_process(task_t *t, uint32_t io_events, bool sched_ready) {
   praw_connection_t *rc = containerof(t, praw_connection_t, task);
   lock(&rc->task.mutex);
-  int events = rc->psocket.sched_io_events;
+  int events = io_events;
   int fd = rc->psocket.epoll_io.fd;
   if (!rc->connected) {
     if (events & (EPOLLHUP | EPOLLERR)) {
[qpid-proton] branch main updated: PROTON-2362: epoll proactor - handle connection wake and listener close before task setup complete
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

The following commit(s) were added to refs/heads/main by this push:
     new f734e9e  PROTON-2362: epoll proactor - handle connection wake and listener close before task setup complete
f734e9e is described below

commit f734e9ea41ff39d23a489090c04851f2ccd3e187
Author: Cliff Jansen
AuthorDate: Mon Dec 20 09:29:47 2021 -0800

    PROTON-2362: epoll proactor - handle connection wake and listener close before task setup complete
---
 c/src/proactor/epoll.c | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 2f491ca..6ad043f 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -965,7 +965,7 @@ static inline bool pconnection_wclosed(pconnection_t *pc) {
    close/shutdown.  Let read()/write() return 0 or -1 to trigger cleanup logic.
 */
 static int pconnection_rearm_check(pconnection_t *pc) {
-  if (pconnection_rclosed(pc) && pconnection_wclosed(pc)) {
+  if ((pconnection_rclosed(pc) && pconnection_wclosed(pc)) || pc->psocket.epoll_io.fd == -1) {
     return 0;
   }
   uint32_t wanted_now = (pc->read_blocked && !pconnection_rclosed(pc)) ? EPOLLIN : 0;
@@ -1652,7 +1652,7 @@ static void listener_begin_close(pn_listener_t* l) {
 void pn_listener_close(pn_listener_t* l) {
   bool notify = false;
   lock(&l->task.mutex);
-  if (!l->task.closing) {
+  if (l->task.proactor && !l->task.closing) {
     listener_begin_close(l);
     notify = schedule(&l->task);
   }
[qpid-proton] branch main updated: PROTON-2519: epoll proactor - clarify unpolled work and move assertion outside loop repeated with EINTR
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

The following commit(s) were added to refs/heads/main by this push:
     new c8bcd4a  PROTON-2519: epoll proactor - clarify unpolled work and move assertion outside loop repeated with EINTR
c8bcd4a is described below

commit c8bcd4a0991607851cea2c611c550a59db1f47b6
Author: Cliff Jansen
AuthorDate: Fri Dec 17 12:42:58 2021 -0800

    PROTON-2519: epoll proactor - clarify unpolled work and move assertion outside loop repeated with EINTR
---
 c/src/proactor/epoll.c | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 6207267..2f491ca 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -2471,9 +2471,11 @@ static pn_event_batch_t *next_event_batch(pn_proactor_t* p, bool can_block) {
 // Call with sched lock.  Return true if !can_block and no new events to process.
 static bool poller_do_epoll(struct pn_proactor_t* p, tslot_t *ts, bool can_block) {
   // As poller with lots to do, be mindful of hogging the sched lock.  Release when making kernel calls.
+  assert(!p->resched_cutoff);
+  assert(!p->sched_ready_first);
   int n_events;
   task_t *tsk;
-  assert(!p->resched_cutoff);
+  bool unpolled_work = false;
 
   while (true) {
     assert(p->n_runnables == 0);
@@ -2484,13 +2486,15 @@ static bool poller_do_epoll(struct pn_proactor_t* p, tslot_t *ts, bool can_block
     p->last_earmark = NULL;
 
     bool unfinished_earmarks = p->earmark_count > 0;
-    bool epoll_immediate = p->resched_first || unfinished_earmarks || !can_block;
-    assert(!p->sched_ready_first);
+    if (unfinished_earmarks || p->resched_first)
+      unpolled_work = true;
+    bool epoll_immediate = unpolled_work || !can_block;
 
     // Determine if notify_poller() can be avoided.
     if (!epoll_immediate) {
       lock(&p->eventfd_mutex);
       if (p->ready_list_first) {
+        unpolled_work = true;
         epoll_immediate = true;
       } else {
         // Poller may sleep.  Enable eventfd wakeup.
@@ -2508,7 +2512,6 @@ static bool poller_do_epoll(struct pn_proactor_t* p, tslot_t *ts, bool can_block
     lock(&p->sched_mutex);
     p->poller_suspended = false;
 
-    bool unpolled_work = false;
     if (p->resched_first) {
       // Defer future resched tasks until next do_epoll()
       p->resched_cutoff = p->resched_last;
[qpid-proton] 03/03: PROTON-2362: epoll proactor: remove ready list sneak peek optimization causing early task deletion
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit 996b9b114fdb4682c8114ad700705446ff3a24fd
Author: Cliff Jansen
AuthorDate: Wed Dec 15 17:54:51 2021 -0800

    PROTON-2362: epoll proactor: remove ready list sneak peek optimization causing early task deletion
---
 c/src/proactor/epoll.c | 68 +++---
 1 file changed, 9 insertions(+), 59 deletions(-)

diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 6a5beb4..6207267 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -991,35 +991,6 @@ static inline void pconnection_rearm(pconnection_t *pc, int wanted_now) {
   // Return immediately.  pc may have just been freed by another thread.
 }
 
-/* Only call when context switch is imminent.  Sched lock is highly contested. */
-// Call with both task and sched locks.
-static bool pconnection_sched_sync(pconnection_t *pc) {
-  uint32_t sync_events = 0;
-  uint32_t sync_args = pc->tick_pending << 1;
-  if (pc->psocket.sched_io_events) {
-    pc->new_events = pc->psocket.sched_io_events;
-    pc->psocket.sched_io_events = 0;
-    pc->current_arm = 0;  // or outside lock?
-    sync_events = pc->new_events;
-  }
-  if (pc->task.sched_ready) {
-    pc->task.sched_ready = false;
-    schedule_done(&pc->task);
-    sync_args |= 1;
-  }
-  pc->task.sched_pending = false;
-
-  if (sync_args || sync_events) {
-    // Only replace if poller has found new work for us.
-    pc->process_args = (1 << 2) | sync_args;
-    pc->process_events = sync_events;
-  }
-
-  // Indicate if there are free proactor threads
-  pn_proactor_t *p = pc->task.proactor;
-  return p->poller_suspended || p->suspend_list_head;
-}
-
 /* Call with task lock and having done a write_flush() to "know" the value of wbuf_remaining */
 static inline bool pconnection_work_pending(pconnection_t *pc) {
   if (pc->new_events || pni_task_wake_pending(&pc->task) || pc->tick_pending || pc->queued_disconnect)
@@ -1038,14 +1009,9 @@ static void pconnection_done(pconnection_t *pc) {
   bool self_sched = false;
   lock(&pc->task.mutex);
   pc->task.working = false;  // So we can schedule() ourself if necessary.  We remain the de facto
-                             // working task instance while the lock is held.  Need sched_sync too to drain
-                             // a possible stale sched_ready.
+                             // working task instance while the lock is held.
   pc->hog_count = 0;
   bool has_event = pconnection_has_event(pc);
-  // Do as little as possible while holding the sched lock
-  lock(&p->sched_mutex);
-  pconnection_sched_sync(pc);
-  unlock(&p->sched_mutex);
 
   if (has_event || pconnection_work_pending(pc)) {
     self_sched = true;
@@ -1295,9 +1261,6 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
   }
 
   // Never stop working while work remains.  hog_count exception to this rule is elsewhere.
-  lock(&pc->task.proactor->sched_mutex);
-  bool workers_free = pconnection_sched_sync(pc);
-  unlock(&pc->task.proactor->sched_mutex);
 
   if (pconnection_work_pending(pc)) {
     goto retry;  // TODO: get rid of goto without adding more locking
@@ -1314,8 +1277,13 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
     }
   }
 
+  // check one last time for new io before context switch
+  bool workers_free;
+  pn_proactor_t *p = pc->task.proactor;
+  lock(&p->sched_mutex);
+  workers_free = (p->suspend_list_head != NULL);
+  unlock(&p->sched_mutex);
   if (workers_free && !pc->task.closing && !pc->io_doublecheck) {
-    // check one last time for new io before context switch
     pc->io_doublecheck = true;
     pc->read_blocked = false;
     pc->write_blocked = false;
@@ -1831,25 +1799,7 @@ static void listener_done(pn_listener_t *l) {
   bool notify = false;
   l->task.working = false;
 
-  lock(&p->sched_mutex);
-  int n_events = 0;
-  for (size_t i = 0; i < l->acceptors_size; i++) {
-    psocket_t *ps = &l->acceptors[i].psocket;
-    if (ps->sched_io_events) {
-      ps->working_io_events = ps->sched_io_events;
-      ps->sched_io_events = 0;
-    }
-    if (ps->working_io_events)
-      n_events++;
-  }
-
-  if (l->task.sched_ready) {
-    l->task.sched_ready = false;
-    schedule_done(&l->task);
-  }
-  unlock(&p->sched_mutex);
-
-  if (!n_events && listener_can_free(l)) {
+  if (listener_can_free(l)) {
     unlock(&l->task.mutex);
     pn_listener_free(l);
     lock(&p->sched_mutex);
@@ -1859,7 +1809,7 @@ static void listener
[qpid-proton] 01/03: PROTON-2362: epoll proactor: prevent opportunistic warm task from running twice
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git commit 4f8ac48c48e66d1650308bc1be694dae3215fc9a Author: Cliff Jansen AuthorDate: Wed Dec 15 17:43:30 2021 -0800 PROTON-2362: epoll proactor: prevent opportunistic warm task from running twice --- c/src/proactor/epoll-internal.h | 4 ++-- c/src/proactor/epoll.c | 46 +++-- 2 files changed, 32 insertions(+), 18 deletions(-) diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h index 8e9e1b2..155277b 100644 --- a/c/src/proactor/epoll-internal.h +++ b/c/src/proactor/epoll-internal.h @@ -102,7 +102,7 @@ typedef struct task_t { tslot_t *prev_runner; bool sched_ready; bool sched_pending; /* If true, one or more unseen epoll or other events to process() */ - bool runnable ; /* on one of the runnable lists */ + int runnables_idx;/* 0 means unset, idx-1 is array position */ } task_t; typedef enum { @@ -198,7 +198,7 @@ struct pn_proactor_t { task_t *resched_cutoff; // last resched task of current poller work snapshot. TODO: superseded by polled_resched_count? task_t *resched_next; unsigned int resched_count; - unsigned int polled_resched_count; + unsigned int polled_resched_count; pmutex tslot_mutex; int earmark_count; bool earmark_drain; diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c index ea2e25a..2363f48 100644 --- a/c/src/proactor/epoll.c +++ b/c/src/proactor/epoll.c @@ -47,6 +47,8 @@ non-proactor-task -> proactor-task tslot -> sched + TODO: doc new work: warm (assigned), earmarked (assigned), runnables (unordered), sched_ready + list (ordered), resched list (ordered). TODO: document role of sched_pending and how sched_XXX (i.e. sched_interrupt) transitions from "private to the scheduler" to "visible to the task". TODO: document task.working duration can be long: from xxx_process() to xxx_done() or null batch. 
@@ -442,7 +444,7 @@ static void assign_thread(tslot_t *ts, task_t *tsk) { assert(!tsk->runner); tsk->runner = ts; tsk->prev_runner = NULL; - tsk->runnable = false; + tsk->runnables_idx = 0; ts->task = tsk; ts->prev_task = NULL; } @@ -539,10 +541,9 @@ static void remove_earmark(tslot_t *ts) { static void make_runnable(task_t *tsk) { pn_proactor_t *p = tsk->proactor; assert(p->n_runnables <= p->runnables_capacity); - assert(!tsk->runnable); + assert(!tsk->runnables_idx); if (tsk->runner) return; - tsk->runnable = true; // Track it as normal or warm or earmarked if (pni_warm_sched) { tslot_t *ts = tsk->prev_runner; @@ -552,8 +553,11 @@ static void make_runnable(task_t *tsk) { p->warm_runnables[p->n_warm_runnables++] = tsk; assign_thread(ts, tsk); } -else - p->runnables[p->n_runnables++] = tsk; +else { + p->runnables[p->n_runnables] = tsk; + tsk->runnables_idx = p->n_runnables + 1; // off by one accounting + p->n_runnables++; +} return; } if (ts->state == UNUSED && !p->earmark_drain) { @@ -563,7 +567,9 @@ static void make_runnable(task_t *tsk) { } } } - p->runnables[p->n_runnables++] = tsk; + p->runnables[p->n_runnables] = tsk; + tsk->runnables_idx = p->n_runnables + 1; // off by one accounting + p->n_runnables++; } @@ -2339,7 +2345,7 @@ static task_t *post_event(pn_proactor_t *p, struct epoll_event *evp) { break; } } - if (tsk && !tsk->runnable && !tsk->runner && !on_sched_ready_list(tsk, p)) + if (tsk && !tsk->runnables_idx && !tsk->runner && !on_sched_ready_list(tsk, p)) return tsk; return NULL; } @@ -2348,7 +2354,7 @@ static task_t *post_event(pn_proactor_t *p, struct epoll_event *evp) { static inline task_t *post_ready(pn_proactor_t *p, task_t *tsk) { tsk->sched_ready = true; tsk->sched_pending = true; - if (!tsk->runnable && !tsk->runner) + if (!tsk->runnables_idx && !tsk->runner) return tsk; return NULL; } @@ -2386,33 +2392,38 @@ static task_t *next_runnable(pn_proactor_t *p, tslot_t *ts) { return ts->task; } - // warm pairing ? 
+ // Take any runnables task if it results in a warm pairing. task_t *tsk = ts->prev_task; - if (tsk && (tsk->runnable)) { + if (tsk && (tsk->runnables_idx)) { +// A task can self delete, so don't allow it to run twice. +task_t **runnables_slot = &p->runnables[tsk->runnables_idx -1]; +assert(*runnables_slot == tsk); +*runnables_slot = NULL; assign_thread(ts, tsk); return tsk;
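The index bookkeeping in this commit can be sketched in isolation. The following is a minimal illustration, not the proactor's code: the `task` and `sched` types, the fixed capacity, and the helpers `take_warm()` and `next_in_order()` are hypothetical names chosen for the example. It assumes only what the diff states: a stored index of 0 means "not on the array", the real position is index minus one, and a task claimed out of order has its slot set to NULL so it cannot be picked up a second time.

```c
#include <assert.h>
#include <stddef.h>

#define RUNNABLES_CAPACITY 8   /* hypothetical fixed capacity for the sketch */

typedef struct task {
  int runnables_idx;           /* 0 means unset, idx-1 is array position */
} task;

typedef struct sched {
  task *runnables[RUNNABLES_CAPACITY];
  int n_runnables;
} sched;

/* Append a task and record its position, stored 1-based so 0 can mean "unset". */
static void make_runnable(sched *s, task *t) {
  assert(s->n_runnables < RUNNABLES_CAPACITY);
  assert(!t->runnables_idx);
  s->runnables[s->n_runnables] = t;
  t->runnables_idx = s->n_runnables + 1;
  s->n_runnables++;
}

/* Claim a task out of order: blank its slot so a later scan cannot run it again. */
static task *take_warm(sched *s, task *t) {
  if (!t || !t->runnables_idx) return NULL;
  task **slot = &s->runnables[t->runnables_idx - 1];
  assert(*slot == t);
  *slot = NULL;
  t->runnables_idx = 0;
  return t;
}

/* In-order scan that skips the NULL holes left behind by take_warm(). */
static task *next_in_order(sched *s, int *cursor) {
  while (*cursor < s->n_runnables) {
    task *t = s->runnables[(*cursor)++];
    if (t) {
      t->runnables_idx = 0;
      return t;
    }
  }
  return NULL;
}
```

Storing the position on the task makes the out-of-order claim constant time, and the NULL hole keeps the array stable for any scan already in progress.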
[qpid-proton] 02/03: PROTON-2362: epoll proactor: fix locking in listener overflow processing
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git commit cfd300e7d41ddfd39ae3e130fde8d1d654cfd099 Author: Cliff Jansen AuthorDate: Wed Dec 15 17:48:20 2021 -0800 PROTON-2362: epoll proactor: fix locking in listener overflow processing --- c/src/proactor/epoll-internal.h | 1 + c/src/proactor/epoll.c | 17 - 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h index 155277b..f0afc9e 100644 --- a/c/src/proactor/epoll-internal.h +++ b/c/src/proactor/epoll-internal.h @@ -302,6 +302,7 @@ struct pn_listener_t { size_t pending_count; /* number of pending accepted connections */ size_t backlog; /* size of pending accepted array */ bool close_dispatched; + int overflow_count; uint32_t sched_io_events; }; diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c index 2363f48..6a5beb4 100644 --- a/c/src/proactor/epoll.c +++ b/c/src/proactor/epoll.c @@ -715,6 +715,7 @@ static acceptor_t *acceptor_list_next(acceptor_t **start) { // Add an overflowing acceptor to the overflow list. Called with listener task lock held. 
static void acceptor_set_overflow(acceptor_t *a) { a->overflowed = true; + a->listener->overflow_count++; pn_proactor_t *p = a->listener->task.proactor; lock(&p->overflow_mutex); acceptor_list_append(&p->overflow, a); @@ -740,6 +741,7 @@ static void proactor_rearm_overflow(pn_proactor_t *p) { assert(!a->armed); assert(a->overflowed); a->overflowed = false; +l->overflow_count++; if (rearming) { rearm(p, &a->psocket.epoll_io); a->armed = true; @@ -1617,7 +1619,7 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in // call with lock held and task.working false static inline bool listener_can_free(pn_listener_t *l) { - return l->task.closing && l->close_dispatched && !l->task.ready && !l->active_count; + return l->task.closing && l->close_dispatched && !l->task.ready && !l->active_count && !l->overflow_count; } static inline void listener_final_free(pn_listener_t *l) { @@ -1675,10 +1677,6 @@ static void listener_begin_close(pn_listener_t* l) { } assert(!l->pending_count); -unlock(&l->task.mutex); -/* Remove all acceptors from the overflow list. closing flag prevents re-insertion.*/ -proactor_rearm_overflow(pn_listener_proactor(l)); -lock(&l->task.mutex); pn_collector_put(l->collector, PN_CLASSCLASS(pn_listener), l, PN_LISTENER_CLOSE); } } @@ -1799,8 +1797,17 @@ static pn_event_t *listener_batch_next(pn_event_batch_t *batch) { static void listener_done(pn_listener_t *l) { pn_proactor_t *p = l->task.proactor; tslot_t *ts = l->task.runner; + lock(&l->task.mutex); + if (l->close_dispatched && l->overflow_count) { +unlock(&l->task.mutex); +/* Remove all acceptors from the overflow list. 
closing flag prevents re-insertion.*/ +proactor_rearm_overflow(pn_listener_proactor(l)); +lock(&l->task.mutex); +assert(l->overflow_count == 0); + } + // Just in case the app didn't accept all the pending accepts // Shuffle the list back to start at 0 memmove(&l->pending_accepteds[0], &l->pending_accepteds[l->pending_first], l->pending_count * sizeof(accepted_t)); - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-proton] branch main updated (89082f1 -> 996b9b1)
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git.

    from 89082f1  PROTON-2476: Restore truncation behaviour for transfer frame traces
     new 4f8ac48  PROTON-2362: epoll proactor: prevent opportunistic warm task from running twice
     new cfd300e  PROTON-2362: epoll proactor: fix locking in listener overflow processing
     new 996b9b1  PROTON-2362: epoll proactor: remove ready list sneak peek optimization causing early task deletion

The 3 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.

Summary of changes:
 c/src/proactor/epoll-internal.h |   5 +-
 c/src/proactor/epoll.c          | 131
 2 files changed, 54 insertions(+), 82 deletions(-)
[qpid-proton] 03/03: PROTON-2362: epoll proactor fix for tsan_tr2.txt. Make scheduling and re-scheduling completely separate.
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git commit d31f829baafd3a45208c85e7d791452b4e997235 Author: Cliff Jansen AuthorDate: Mon Nov 22 10:25:23 2021 -0800 PROTON-2362: epoll proactor fix for tsan_tr2.txt. Make scheduling and re-scheduling completely separate. --- c/src/proactor/epoll-internal.h | 14 ++- c/src/proactor/epoll.c | 226 +++- c/tests/proactor_test.cpp | 30 -- 3 files changed, 168 insertions(+), 102 deletions(-) diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h index f0f57af..8e9e1b2 100644 --- a/c/src/proactor/epoll-internal.h +++ b/c/src/proactor/epoll-internal.h @@ -88,8 +88,9 @@ typedef struct task_t { bool working; bool ready;// ready to run and on ready list. Poller notified by eventfd. bool waking; - bool on_ready_list;// todo: protected by eventfd_mutex or sched mutex? needed? + unsigned int ready_generation; struct task_t *ready_next; // ready list, guarded by proactor eventfd_mutex + struct task_t *resched_next; // resched list, guarded by sched mutex bool closing; // Next 4 are protected by the proactor mutex struct task_t* next; /* Protected by proactor.mutex */ @@ -164,6 +165,8 @@ struct pn_proactor_t { bool ready_list_active; task_t *ready_list_first; task_t *ready_list_last; + unsigned int ready_list_count; + unsigned int ready_list_generation; // protected by both eventfd_mutex and a single p->poller instance // Interrupts have a dedicated eventfd because they must be async-signal safe. int interruptfd; // If the process runs out of file descriptors, disarm listening sockets temporarily and save them here. 
@@ -188,7 +191,14 @@ struct pn_proactor_t { tslot_t *last_earmark; task_t *sched_ready_first; task_t *sched_ready_last; - task_t *sched_ready_current; + task_t *sched_ready_current; // TODO: remove or use for sceduling priority or fairness + unsigned int sched_ready_count; + task_t *resched_first; + task_t *resched_last; + task_t *resched_cutoff; // last resched task of current poller work snapshot. TODO: superseded by polled_resched_count? + task_t *resched_next; + unsigned int resched_count; + unsigned int polled_resched_count; pmutex tslot_mutex; int earmark_count; bool earmark_drain; diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c index 31edfbe..ea2e25a 100644 --- a/c/src/proactor/epoll.c +++ b/c/src/proactor/epoll.c @@ -260,28 +260,31 @@ static void pop_ready_task(task_t *tsk) { // !ready .. schedule() .. on ready_list .. on sched_ready_list .. working task .. !sched_ready && !ready // // Intervening locks at each transition ensures ready_next has memory coherence throughout the ready task scheduling cycle. + // TODO: sched_ready list changed to sequential processing. Review need for sched_ready_current. 
pn_proactor_t *p = tsk->proactor; if (tsk == p->sched_ready_current) p->sched_ready_current = tsk->ready_next; - if (tsk == p->sched_ready_first) { -// normal code path -if (tsk == p->sched_ready_last) { - p->sched_ready_first = p->sched_ready_last = NULL; -} else { - p->sched_ready_first = tsk->ready_next; -} -if (!p->sched_ready_first) - p->sched_ready_last = NULL; + assert (tsk == p->sched_ready_first); + assert (p->sched_ready_count); + p->sched_ready_count--; + if (tsk == p->sched_ready_last) { +p->sched_ready_first = p->sched_ready_last = NULL; } else { -// tsk is not first in a multi-element list -task_t *prev = NULL; -for (task_t *i = p->sched_ready_first; i != tsk; i = i->ready_next) - prev = i; -prev->ready_next = tsk->ready_next; -if (tsk == p->sched_ready_last) - p->sched_ready_last = prev; +p->sched_ready_first = tsk->ready_next; } - tsk->on_ready_list = false; + if (!p->sched_ready_first) { +p->sched_ready_last = NULL; +assert(p->sched_ready_count == 0); + } +} + +// Call only as the poller task that has already called schedule_ready_list() and already +// incremented p->ready_list_generation. All list elements before sched_ready_last have +// correct generation from mutex barrier and cannot have tsk->ready_generation set to a +// new generation until after the poller task releases the sched lock and allows tsk to +// run again. +inline static bool on_sched_ready_list(task_t *tsk, pn_proactor_t *p) { + return tsk->ready_generation && (tsk->ready_generation != p->ready_list_generation); } // part1: call with tsk->owner lock held, return true if notify_poller required by caller. @@ -294,8 +297,10 @@ bool schedule(task_t *tsk) { tsk->ready = true;
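The generation test in `on_sched_ready_list()` can be illustrated with a small stand-alone sketch. This is an assumption-laden model rather than the proactor's implementation: the diff is truncated before it shows where `ready_generation` is written, so the sketch assumes that scheduling stamps a task with the current list generation, that the poller bumps the generation when it takes its snapshot, and that the stamp is cleared when the task runs. The type and function names below are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>

typedef struct proactor {
  unsigned int ready_list_generation;
} proactor;

typedef struct task {
  unsigned int ready_generation;   /* 0 means "not scheduled" */
} task;

/* Start at 1 so that 0 stays reserved for "not scheduled". */
static void proactor_init(proactor *p) {
  p->ready_list_generation = 1;
}

/* Assumed behaviour: scheduling stamps the task with the current generation. */
static void schedule_task(proactor *p, task *t) {
  t->ready_generation = p->ready_list_generation;
}

/* Assumed behaviour: the poller moves the ready list aside, then bumps the generation. */
static void poller_take_snapshot(proactor *p) {
  p->ready_list_generation++;
  if (p->ready_list_generation == 0)
    p->ready_list_generation = 1;   /* skip the reserved value on wraparound */
}

/* Same predicate as the diff: stamped, but with an older generation. */
static bool on_snapshot(const task *t, const proactor *p) {
  return t->ready_generation && (t->ready_generation != p->ready_list_generation);
}

/* Assumed behaviour: running the task clears the stamp. */
static void task_runs(task *t) {
  t->ready_generation = 0;
}
```

Under these assumptions the predicate answers "is this task in the poller's current snapshot?" in constant time, which is what allows the list walk removed from `pop_ready_task()` to go away.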
[qpid-proton] 01/03: PROTON-2362: epoll proactor fix for tsan_tr1.txt. Check earmark edge case at same time and with same lock as for unassign_thread.
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git commit 239a39eb2d04f0588081975a4f173bb5f121d1fa Author: Cliff Jansen AuthorDate: Sun Nov 21 12:37:32 2021 -0800 PROTON-2362: epoll proactor fix for tsan_tr1.txt. Check earmark edge case at same time and with same lock as for unassign_thread. --- c/src/proactor/epoll-internal.h | 3 +- c/src/proactor/epoll.c| 114 ++ c/src/proactor/epoll_raw_connection.c | 4 +- 3 files changed, 65 insertions(+), 56 deletions(-) diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h index 5f7ed9b..f0f57af 100644 --- a/c/src/proactor/epoll-internal.h +++ b/c/src/proactor/epoll-internal.h @@ -348,7 +348,7 @@ int pclosefd(pn_proactor_t *p, int fd); void proactor_add(task_t *tsk); bool proactor_remove(task_t *tsk); -bool unassign_thread(tslot_t *ts, tslot_state new_state); +bool unassign_thread(pn_proactor_t *p, tslot_t *ts, tslot_state new_state, tslot_t **resume_thread); void task_init(task_t *tsk, task_type_t t, pn_proactor_t *p); static void task_finalize(task_t* tsk) { @@ -385,6 +385,7 @@ void pni_timer_manager_finalize(pni_timer_manager_t *tm); pn_event_batch_t *pni_timer_manager_process(pni_timer_manager_t *tm, bool timeout, bool sched_ready); void pni_pconnection_timeout(pconnection_t *pc); void pni_proactor_timeout(pn_proactor_t *p); +void pni_resume(pn_proactor_t *p, tslot_t *ts); // Generic wake primitives for a task. 
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c index 1adaeb3..c481274 100644 --- a/c/src/proactor/epoll.c +++ b/c/src/proactor/epoll.c @@ -406,7 +406,26 @@ static void resume(pn_proactor_t *p, tslot_t *ts) { pthread_cond_signal(&ts->cond); } unlock(&ts->mutex); +} + +// Call with no lock +void pni_resume(pn_proactor_t *p, tslot_t *ts) { + resume(p, ts); +} + +// Call with shed_lock held +// Caller must resume() return value if not null +static tslot_t *resume_one_thread(pn_proactor_t *p) { + // If pn_proactor_get has an early return, we need to resume one suspended thread (if any) + // to be the new poller. + tslot_t *ts = p->suspend_list_head; + if (ts) { +LL_REMOVE(p, suspend_list, ts); +p->suspend_list_count--; +ts->state = PROCESSING; + } + return ts; } // Call with sched lock @@ -445,11 +464,14 @@ static bool reschedule(task_t *tsk) { return notify; } -// Call with sched lock -bool unassign_thread(tslot_t *ts, tslot_state new_state) { +// Call with sched lock. +// If true returned, caller must call notify_poller() after releasing the lock. +// If resume_thread is set, the caller must call resume() after releasing the lock. +bool unassign_thread(pn_proactor_t *p, tslot_t *ts, tslot_state new_state, tslot_t **resume_thread) { task_t *tsk = ts->task; bool notify = false; bool deleting = (ts->state == DELETING); + *resume_thread = NULL; ts->task = NULL; ts->state = new_state; if (tsk) { @@ -460,7 +482,6 @@ bool unassign_thread(tslot_t *ts, tslot_state new_state) { // Check if unseen events or schedule() calls occurred while task was working. if (tsk && !deleting) { -pn_proactor_t *p = tsk->proactor; ts->prev_task = tsk; if (tsk->sched_pending) { // Make sure the task is already scheduled or put it on the ready list @@ -482,6 +503,19 @@ bool unassign_thread(tslot_t *ts, tslot_state new_state) { } } } + + // Earmark drain accounting. + if (ts->earmark_override) { +// This thread "stole" the task previously assigned to thread ts->earmark_override. 
+if (ts->earmark_override->generation == ts->earmark_override_gen) { + // Other (overridden) thread not seen since this thread completed the pending work on the task. + // Thread is perhaps gone forever, which may leave us short of a poller thread and hanging. + // Find a thread to resume if available. Worst case is a spurious resume/suspend by an idle thread. + *resume_thread = resume_one_thread(p); +} +ts->earmark_override = NULL; + } + return notify; } @@ -1014,10 +1048,11 @@ static void pconnection_done(pconnection_t *pc) { pconnection_cleanup(pc); // pc may be undefined now lock(&p->sched_mutex); - notify = unassign_thread(ts, UNUSED); + tslot_t *resume_thread; + notify = unassign_thread(p, ts, UNUSED, &resume_thread); unlock(&p->sched_mutex); - if (notify) -notify_poller(p); + if (notify) notify_poller(p); + if (resume_thread) resume(p, resume_thread); return; } } @@ -1029,10 +1064,12 @@ static void pconnection_done(pconnection_t *pc) { if (wanted) pconnection_rearm(pc, wanted); // May free pc on another thre
[qpid-proton] 02/03: PROTON-2362: epoll proactor fix for tsan_tr3.txt. Use safe local variable not subject to deletion in another thread.
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit 586d94464d2ad6fc69e04aaa42f0df54a788561c
Author: Cliff Jansen
AuthorDate: Sun Nov 21 13:09:25 2021 -0800

    PROTON-2362: epoll proactor fix for tsan_tr3.txt. Use safe local variable not subject to deletion in another thread.
---
 c/src/proactor/epoll.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index c481274..31edfbe 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -1451,7 +1451,7 @@ void pn_proactor_connect2(pn_proactor_t *p, pn_connection_t *c, pn_transport_t *
   }
   /* We need to issue INACTIVE on immediate failure */
   unlock(&pc->task.mutex);
-  if (notify) notify_poller(pc->task.proactor);
+  if (notify) notify_poller(p);
 }
 
 static void pconnection_tick(pconnection_t *pc) {
[qpid-proton] branch main updated (8b816f4 -> d31f829)
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git.

    from 8b816f4  NO-JIRA upgrade from Catch v2.13.6 to v2.13.7 (#342)
     new 239a39e  PROTON-2362: epoll proactor fix for tsan_tr1.txt. Check earmark edge case at same time and with same lock as for unassign_thread.
     new 586d944  PROTON-2362: epoll proactor fix for tsan_tr3.txt. Use safe local variable not subject to deletion in another thread.
     new d31f829  PROTON-2362: epoll proactor fix for tsan_tr2.txt. Make scheduling and re-scheduling completely separate.

The 3 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.

Summary of changes:
 c/src/proactor/epoll-internal.h       |  17 +-
 c/src/proactor/epoll.c                | 348 +++---
 c/src/proactor/epoll_raw_connection.c |   4 +-
 c/tests/proactor_test.cpp             |  30 ++-
 4 files changed, 237 insertions(+), 162 deletions(-)
[qpid-proton] branch main updated: PROTON-2422: fix epoll timer ordering bug
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git The following commit(s) were added to refs/heads/main by this push: new 63abb68 PROTON-2422: fix epoll timer ordering bug 63abb68 is described below commit 63abb683de7dcc5ded69c2e095fae68e14e9d463 Author: Cliff Jansen AuthorDate: Fri Oct 29 00:35:14 2021 -0700 PROTON-2422: fix epoll timer ordering bug --- c/src/proactor/epoll_timer.c | 54 +++- 1 file changed, 33 insertions(+), 21 deletions(-) diff --git a/c/src/proactor/epoll_timer.c b/c/src/proactor/epoll_timer.c index 26de7d2..c154881 100644 --- a/c/src/proactor/epoll_timer.c +++ b/c/src/proactor/epoll_timer.c @@ -73,36 +73,48 @@ static void timerfd_drain(int fd) { // Struct to manage the ordering of timers on the heap ordered list and manage the lifecycle if // the parent timer is self-deleting. +// This needs a Proton class definition to provide a custom compare() function. It otherwise +// remains private to this file and opts out of the general object machinery. +// This works because the pn_list class is already meant to work with non-Proton objects using a +// different compare operator (pn_void_compare). typedef struct timer_deadline_t { uint64_t list_deadline; // Heap ordering deadline. Must not change while on list. pni_timer_t *timer; // Parent timer. NULL means orphaned and to be deleted. bool resequenced;// An out-of-order connection timeout caught and handled. } timer_deadline_t; -static void timer_deadline_initialize(void *object) { - timer_deadline_t *td = (timer_deadline_t *) object; - memset(td, 0 , sizeof(*td)); -} - -static void timer_deadline_finalize(void *object) { - assert(((timer_deadline_t *) object)->list_deadline == 0); -} +static const pn_class_t *timer_deadline_reify(void *p); +// The pn_list_t calls this to maintain its sorted heap. 
static intptr_t timer_deadline_compare(void *oa, void *ob) { timer_deadline_t *a = (timer_deadline_t *) oa; timer_deadline_t *b = (timer_deadline_t *) ob; return a->list_deadline - b->list_deadline; } -#define timer_deadline_inspect NULL -#define timer_deadline_hashcode NULL #define CID_timer_deadline CID_pn_void +#define timer_deadline_new NULL +#define timer_deadline_initialize NULL +#define timer_deadline_incref pn_void_incref +#define timer_deadline_decref pn_void_decref +#define timer_deadline_refcount pn_void_refcount +#define timer_deadline_finalize NULL +#define timer_deadline_free NULL +#define timer_deadline_hashcode NULL +#define timer_deadline_inspect NULL -static timer_deadline_t* pni_timer_deadline(void) { - static const pn_class_t timer_deadline_clazz = PN_CLASS(timer_deadline); - return (timer_deadline_t *) pn_class_new(&timer_deadline_clazz, sizeof(timer_deadline_t)); +static const pn_class_t timer_deadline_clazz = PN_METACLASS(timer_deadline); +static const pn_class_t *timer_deadline_reify(void *p) { return &timer_deadline_clazz; } + +static timer_deadline_t* timer_deadline_t_new(void) { + // Just the struct. Not a Proton class based object. + return (timer_deadline_t *) calloc(1, sizeof(timer_deadline_t)); } +static void timer_deadline_t_free(timer_deadline_t* td) { + assert(td->list_deadline == 0); + free(td); +} struct pni_timer_t { uint64_t deadline; @@ -119,7 +131,7 @@ pni_timer_t *pni_timer(pni_timer_manager_t *tm, pconnection_t *c) { if (!timer) return NULL; if (c) { // Connections are tracked on the timer_heap. Allocate the tracking struct. -td = pni_timer_deadline(); +td = timer_deadline_t_new(); if (!td) { free(timer); return NULL; @@ -147,14 +159,14 @@ void pni_timer_free(pni_timer_t *timer) { lock(&tm->deletion_mutex); if (td) { if (td->list_deadline) - td->timer = NULL; // Orphan. timer_manager does eventual pn_free() in process(). + td->timer = NULL; // Orphan. timer_manager does eventual timer_deadline_t_free() in process(). 
else can_free_td = true; } unlock(&tm->deletion_mutex); unlock(&tm->task.mutex); if (can_free_td) { -pn_free(td); +timer_deadline_t_free(td); } free(timer); } @@ -171,8 +183,8 @@ bool pni_timer_manager_init(pni_timer_manager_t *tm) { task_init(&tm->task, TIMER_MANAGER, p); pmutex_init(&tm->deletion_mutex); - // PN_VOID turns off ref counting for the elements in the list. - tm->timers_heap = pn_list(PN_VOID, 0); + // Heap sorted pn_list_t using timer_deadline_compare() to determine ordering. + tm->timers_heap = pn_list(&timer_deadline_clazz, 0); if (!tm->timers_heap) return false; tm->proactor_timer = pni_timer(tm, NULL); @@ -200,7 +212,7 @@ void pni_timer_manager_finalize(pni_timer_manager_t *tm) { for (si
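The role of the compare function in keeping timers ordered by deadline can be shown with a small stand-alone sketch. It is not Proton code: `qsort()` stands in for the heap-sorted `pn_list_t`, the `deadline_entry` type is hypothetical, and the comparison is written in three-way form rather than the subtraction used in the commit.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical stand-in for timer_deadline_t: only the ordering key is kept. */
typedef struct deadline_entry {
  uint64_t list_deadline;
} deadline_entry;

/* Three-way compare on the deadline: negative, zero or positive. */
static int deadline_compare(const void *oa, const void *ob) {
  const deadline_entry *a = (const deadline_entry *) oa;
  const deadline_entry *b = (const deadline_entry *) ob;
  return (a->list_deadline > b->list_deadline) - (a->list_deadline < b->list_deadline);
}

/* Order entries so the earliest deadline comes first. */
static void sort_by_deadline(deadline_entry *entries, size_t n) {
  qsort(entries, n, sizeof(entries[0]), deadline_compare);
}
```

As the struct comment in the diff notes, the ordering key must not change while an entry is on the list, because the container only consults the compare function when entries are added or removed.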
[qpid-proton] branch main updated: PROTON-2403: libuv proactor update for newer libuv library versions
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

The following commit(s) were added to refs/heads/main by this push:
     new 471d043  PROTON-2403: libuv proactor update for newer libuv library versions
471d043 is described below

commit 471d043db648e9fc855ae2d9330fa5dfb4332dd3
Author: Cliff Jansen
AuthorDate: Fri Jul 2 09:46:05 2021 -0700

    PROTON-2403: libuv proactor update for newer libuv library versions
---
 c/src/proactor/libuv.c | 11 +--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/c/src/proactor/libuv.c b/c/src/proactor/libuv.c
index 3e50685..b450919 100644
--- a/c/src/proactor/libuv.c
+++ b/c/src/proactor/libuv.c
@@ -177,6 +177,7 @@ typedef struct pconnection_t {
   uv_timer_t timer;
   uv_write_t write;
   size_t writing;               /* size of pending write request, 0 if none pending */
+  bool read_started;
   uv_shutdown_t shutdown;
 
   /* Locked for thread-safe access */
@@ -936,9 +937,12 @@ static bool leader_process_pconnection(pconnection_t *pc) {
       uv_shutdown(&pc->shutdown, (uv_stream_t*)&pc->tcp, NULL);
     }
   }
-  if (!err && rbuf.size > 0) {
+  if (!err && !pc->read_started && rbuf.size > 0) {
     what = "read";
     err = uv_read_start((uv_stream_t*)&pc->tcp, alloc_read_buffer, on_read);
+    if (!err) {
+      pc->read_started = true;
+    }
   }
   if (err) {
     /* Some IO requests failed, generate the error events */
@@ -952,7 +956,10 @@ static bool leader_process_pconnection(pconnection_t *pc) {
 /* Detach a connection from the UV loop so it can be used safely by a worker */
 void pconnection_detach(pconnection_t *pc) {
   if (pc->connected && !pc->writing) { /* Can't detach while a write is pending */
-    uv_read_stop((uv_stream_t*)&pc->tcp);
+    if (pc->read_started) {
+      uv_read_stop((uv_stream_t*)&pc->tcp);
+      pc->read_started = false;
+    }
     uv_timer_stop(&pc->timer);
   }
 }
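The `read_started` guard in this commit follows a common pattern: remember whether a start call succeeded, and only issue the matching stop when it did. The sketch below models that pattern without libuv. The `fake_stream` type and the `fake_read_start()` / `fake_read_stop()` helpers are hypothetical stand-ins that only count calls; they are not libuv functions.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for a stream handle; it only counts calls. */
typedef struct fake_stream {
  int start_calls;
  int stop_calls;
} fake_stream;

static int fake_read_start(fake_stream *s) { s->start_calls++; return 0; }
static void fake_read_stop(fake_stream *s) { s->stop_calls++; }

typedef struct conn {
  fake_stream stream;
  bool read_started;   /* true only between a successful start and its stop */
} conn;

/* Start reading at most once; record success so a repeat call is a no-op. */
static int conn_want_read(conn *c) {
  int err = 0;
  if (!c->read_started) {
    err = fake_read_start(&c->stream);
    if (!err) c->read_started = true;
  }
  return err;
}

/* Stop reading only if a start is outstanding. */
static void conn_detach(conn *c) {
  if (c->read_started) {
    fake_read_stop(&c->stream);
    c->read_started = false;
  }
}
```

With the flag in place, repeated calls to `conn_want_read()` reach the underlying start function once, and `conn_detach()` becomes idempotent.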
[qpid-proton] branch main updated: PROTON-2397: fix to Windows implementation
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

The following commit(s) were added to refs/heads/main by this push:
     new a453a87  PROTON-2397: fix to Windows implementation
a453a87 is described below

commit a453a878ce944d0d5bd6e6630ef4f0098febecd6
Author: Cliff Jansen
AuthorDate: Thu Jun 24 15:34:50 2021 -0700

    PROTON-2397: fix to Windows implementation
---
 c/src/ssl/schannel.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/c/src/ssl/schannel.cpp b/c/src/ssl/schannel.cpp
index d167678..5d2ff8b 100644
--- a/c/src/ssl/schannel.cpp
+++ b/c/src/ssl/schannel.cpp
@@ -487,7 +487,7 @@ pn_ssl_domain_t *pn_ssl_domain( pn_ssl_mode_t mode )
     return NULL;
   }
   if (mode == PN_SSL_MODE_CLIENT)
-    domain->verify_mode == PN_SSL_VERIFY_PEER_NAME;
+    domain->verify_mode = PN_SSL_VERIFY_PEER_NAME;
   return domain;
 }
 
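The one-character fix above replaces a comparison with an assignment. A minimal sketch of why the original line had no effect follows. The enum and struct are hypothetical stand-ins for the real Proton types, and the `(void)` cast is added only to make the discarded result explicit.

```c
#include <assert.h>

/* Hypothetical stand-ins for the verify modes and the domain object. */
typedef enum {
  VERIFY_NULL = 0,
  VERIFY_PEER,
  ANONYMOUS_PEER,
  VERIFY_PEER_NAME
} verify_mode;

typedef struct domain {
  verify_mode mode;
} domain;

/* Buggy form: the comparison is evaluated and discarded, so mode is unchanged. */
static void set_client_default_buggy(domain *d) {
  (void)(d->mode == VERIFY_PEER_NAME);
}

/* Fixed form: the assignment actually stores the new default. */
static void set_client_default_fixed(domain *d) {
  d->mode = VERIFY_PEER_NAME;
}
```

A comparison used as a statement is valid C and C++, which is why this kind of slip compiles; many compilers can flag it with an unused-value warning.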
[qpid-proton] branch main updated: PROTON-2397: test fixes and extra test
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git The following commit(s) were added to refs/heads/main by this push: new 1151c36 PROTON-2397: test fixes and extra test 1151c36 is described below commit 1151c36ee65a43d380f12eeebead2d2fe73b76d7 Author: Cliff Jansen AuthorDate: Wed Jun 23 08:55:03 2021 -0700 PROTON-2397: test fixes and extra test --- python/tests/proton_tests/ssl.py | 51 +--- 1 file changed, 48 insertions(+), 3 deletions(-) diff --git a/python/tests/proton_tests/ssl.py b/python/tests/proton_tests/ssl.py index a2efeb2..55facd0 100644 --- a/python/tests/proton_tests/ssl.py +++ b/python/tests/proton_tests/ssl.py @@ -100,8 +100,33 @@ class SslTest(common.Test): def test_anonymous_cipher(self): if os.name == "nt": raise Skipped("Windows SChannel lacks anonymous cipher support.") -""" By default, both the server and the client support anonymous -ciphers - they should connect without need for a certificate. +""" With no configuration at all, the client default +VERIFY_PEER_NAME should preclude anonymous cipher TLS negotiation. +""" +server = SslTest.SslTestConnection(self.server_domain, mode=Transport.SERVER) +client = SslTest.SslTestConnection(self.client_domain) + +# check that no SSL connection exists +assert not server.ssl.cipher_name() +assert not client.ssl.protocol_name() + +# client.transport.trace(Transport.TRACE_DRV) +# server.transport.trace(Transport.TRACE_DRV) + +client.connection.open() +server.connection.open() +self._pump(client, server) + +assert client.transport.closed +assert server.transport.closed +assert client.connection.state & Endpoint.REMOTE_UNINIT +assert server.connection.state & Endpoint.REMOTE_UNINIT + +def test_simple_anonymous(self): +if os.name == "nt": +raise Skipped("Windows SChannel lacks anonymous cipher support.") +""" The simplest SSL configuration using anonymous +ciphers. 
""" self.client_domain.set_peer_authentication(SSLDomain.ANONYMOUS_PEER) server = SslTest.SslTestConnection(self.server_domain, mode=Transport.SERVER) @@ -147,13 +172,33 @@ class SslTest(common.Test): server.connection.close() self._pump(client, server) -def test_server_certificate(self): +def test_server_certificate_fail(self): +""" Test that default configured clients cannot connect to a server that has +a certificate configured. +""" + self.server_domain.set_credentials(self._testpath("server-certificate.pem"), + self._testpath("server-private-key.pem"), + "server-password") +server = SslTest.SslTestConnection(self.server_domain, mode=Transport.SERVER) +client = SslTest.SslTestConnection(self.client_domain) + +client.connection.open() +server.connection.open() +self._pump(client, server) + +assert client.transport.closed +assert server.transport.closed +assert client.connection.state & Endpoint.REMOTE_UNINIT +assert server.connection.state & Endpoint.REMOTE_UNINIT + +def test_server_certificate_no_verify(self): """ Test that anonymous clients can still connect to a server that has a certificate configured. """ self.server_domain.set_credentials(self._testpath("server-certificate.pem"), self._testpath("server-private-key.pem"), "server-password") +self.client_domain.set_peer_authentication(SSLDomain.ANONYMOUS_PEER) server = SslTest.SslTestConnection(self.server_domain, mode=Transport.SERVER) client = SslTest.SslTestConnection(self.client_domain) - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-proton] branch main updated: PROTON-2397: make client TLS connection verification defaults consistent: verify peer certificate and name
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git The following commit(s) were added to refs/heads/main by this push: new 605bc58 PROTON-2397: make client TLS connection verification defaults consistent: verify peer certificate and name 605bc58 is described below commit 605bc58009549dff2678961455a7ec86b0acede4 Author: Cliff Jansen AuthorDate: Thu Jun 17 23:04:21 2021 -0700 PROTON-2397: make client TLS connection verification defaults consistent: verify peer certificate and name --- c/include/proton/ssl.h | 5 +++-- c/src/ssl/openssl.c | 6 -- c/src/ssl/schannel.cpp | 2 ++ c/tests/proactor_test.cpp| 35 +++ python/proton/_reactor.py| 12 +++- python/proton/_transport.py | 3 ++- python/tests/proton_tests/connect.py | 25 +++-- python/tests/proton_tests/sasl.py| 6 ++ python/tests/proton_tests/ssl.py | 3 ++- ruby/examples/ssl_send.rb| 3 ++- ruby/lib/core/ssl_domain.rb | 7 --- 11 files changed, 78 insertions(+), 29 deletions(-) diff --git a/c/include/proton/ssl.h b/c/include/proton/ssl.h index 03acbf5..a84a9b5 100644 --- a/c/include/proton/ssl.h +++ b/c/include/proton/ssl.h @@ -195,7 +195,7 @@ PN_EXTERN int pn_ssl_domain_set_trusted_ca_db(pn_ssl_domain_t *domain, * identity as contained in the certificate to be valid (see * ::pn_ssl_set_peer_hostname). * - * ANONYMOUS_PEER is configured by default. + * VERIFY_PEER_NAME is configured by default. */ typedef enum { PN_SSL_VERIFY_NULL = 0, /**< internal use only */ @@ -208,7 +208,8 @@ typedef enum { * Configure the level of verification used on the peer certificate. * * This method controls how the peer's certificate is validated, if at all. By default, - * neither servers nor clients attempt to verify their peers (PN_SSL_ANONYMOUS_PEER). + * servers do not attempt to verify their peers (PN_SSL_ANONYMOUS_PEER) but + * clients attempt to verify both the certificate and peer name (PN_SSL_VERIFY_PEER_NAME). 
* Once certificates and trusted CAs are configured, peer verification can be enabled. * * @note In order to verify a peer, a trusted CA must be configured. See diff --git a/c/src/ssl/openssl.c b/c/src/ssl/openssl.c index 3a98200..b4b7564 100644 --- a/c/src/ssl/openssl.c +++ b/c/src/ssl/openssl.c @@ -562,12 +562,6 @@ pn_ssl_domain_t *pn_ssl_domain( pn_ssl_mode_t mode ) return NULL; } - // Insecure, but backward compatible client default - if (mode==PN_SSL_MODE_CLIENT && - pn_ssl_domain_set_peer_authentication(domain, PN_SSL_ANONYMOUS_PEER, NULL)) { -free(domain); -return NULL; - } return domain; } diff --git a/c/src/ssl/schannel.cpp b/c/src/ssl/schannel.cpp index 3056890..d167678 100644 --- a/c/src/ssl/schannel.cpp +++ b/c/src/ssl/schannel.cpp @@ -486,6 +486,8 @@ pn_ssl_domain_t *pn_ssl_domain( pn_ssl_mode_t mode ) pn_ssl_domain_free(domain); return NULL; } + if (mode == PN_SSL_MODE_CLIENT) +domain->verify_mode == PN_SSL_VERIFY_PEER_NAME; return domain; } diff --git a/c/tests/proactor_test.cpp b/c/tests/proactor_test.cpp index e13fcf4..73e285e 100644 --- a/c/tests/proactor_test.cpp +++ b/c/tests/proactor_test.cpp @@ -533,9 +533,21 @@ TEST_CASE("proactor_ssl") { pn_listener_t *l = p.listen(":0", &listener); REQUIRE_RUN(p, PN_LISTENER_OPEN); - /* Basic SSL connection */ + /* Not Anonymous by default */ p.connect(l, &client); - /* Open ok at both ends */ + REQUIRE_RUN(p, PN_TRANSPORT_ERROR); + CHECK_THAT(*client.last_condition, + cond_matches("amqp:connection:framing-error", "SSL")); + REQUIRE_RUN(p, PN_TRANSPORT_CLOSED); + REQUIRE_RUN(p, PN_TRANSPORT_ERROR); + REQUIRE_RUN(p, PN_TRANSPORT_CLOSED); + + /* Deliberate use of Anonymous */ + pn_ssl_domain_t *cd = client.ssl_domain; + REQUIRE(0 == pn_ssl_domain_set_peer_authentication( + cd, PN_SSL_ANONYMOUS_PEER, NULL)); + pn_connection_t *c = pn_connection(); + p.connect(l, &client, c); REQUIRE_RUN(p, PN_CONNECTION_REMOTE_OPEN); REQUIRE_RUN(p, PN_CONNECTION_REMOTE_OPEN); CHECK_THAT(*server.last_condition, cond_empty()); @@ 
-544,11 +556,10 @@ TEST_CASE("proactor_ssl") { REQUIRE_RUN(p, PN_TRANSPORT_CLOSED); /* Verify peer with good hostname */ - pn_ssl_domain_t *cd = client.ssl_domain; REQUIRE(0 == pn_ssl_domain_set_trusted_ca_db(cd, CERTIFICATE("tserver"))); REQUIRE(0 == pn_ssl_domain_set_peer_authentication( cd, PN_SSL_VERIFY_PEER_NAME, NULL)); - pn_connection_t *c = pn_connection(); + c = pn_connection(); pn_connection_set_hostname(c, "test_server"); p.connect(l, &client, c); REQ
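The change above makes certificate and peer-name verification the client default, so anonymous operation has to be requested explicitly. Python's standard `ssl` module follows the same convention, which allows a minimal stdlib-only sketch of what a secure default with an explicit opt-out looks like. This is an analogy, not the Proton API, and the helper name `insecure_client_context` is hypothetical.

```python
import ssl


def insecure_client_context() -> ssl.SSLContext:
    """Hypothetical helper: explicitly opt out of peer verification."""
    ctx = ssl.create_default_context()
    # Hostname checking must be switched off before verify_mode may be relaxed.
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE
    return ctx


# Default client context: verifies both the certificate and the peer name.
default_ctx = ssl.create_default_context()
# Relaxed context: roughly the role ANONYMOUS_PEER plays in the diff above.
relaxed_ctx = insecure_client_context()
print(default_ctx.check_hostname, default_ctx.verify_mode == ssl.CERT_REQUIRED)
print(relaxed_ctx.check_hostname, relaxed_ctx.verify_mode == ssl.CERT_NONE)
```

As in the updated Proton tests, the insecure configuration is a deliberate, visible step rather than something a client gets by omission.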
[qpid-cpp] branch main updated: QPID-8527: Fix hang in qpidd with TLS connections.
cliffjansen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-cpp.git

The following commit(s) were added to refs/heads/main by this push:
     new 21a8667 QPID-8527: Fix hang in qpidd with TLS connections.
21a8667 is described below

commit 21a866749828aa2354234e4c5d3d1cc8ef2b9a80
Author: Cliff Jansen
AuthorDate: Thu May 20 07:34:11 2021 -0700

    QPID-8527: Fix hang in qpidd with TLS connections.
---
 src/qpid/sys/posix/AsynchIO.cpp | 7 +++
 1 file changed, 7 insertions(+)

diff --git a/src/qpid/sys/posix/AsynchIO.cpp b/src/qpid/sys/posix/AsynchIO.cpp
index a05e66f..9b842f3 100644
--- a/src/qpid/sys/posix/AsynchIO.cpp
+++ b/src/qpid/sys/posix/AsynchIO.cpp
@@ -27,6 +27,7 @@
 #include "qpid/sys/Probes.h"
 #include "qpid/sys/DispatchHandle.h"
 #include "qpid/sys/Time.h"
+#include "qpid/sys/ssl/SslSocket.h"
 #include "qpid/log/Statement.h"
 
 // TODO The basic algorithm here is not really POSIX specific and with a
@@ -461,6 +462,12 @@ void AsynchIO::readable(DispatchHandle& h) {
                 // Stop reading if we've overrun our timeslot
                 if ( duration > threadMaxIoTimeNs) {
                     QPID_PROBE4(asynchio_read_finished_maxtime, &h, duration, total, readCalls);
+                    // epoll cannot see into an SslSocket's buffered input and may hang.
+                    const qpid::sys::ssl::SslSocket *s = dynamic_cast<const qpid::sys::ssl::SslSocket*>(&socket);
+                    if (s) {
+                        // Schedule a future readable callback. QPID-8527
+                        call(boost::bind(&AsynchIO::readable, this, _1));
+                    }
                     break;
                 }
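The fix above re-queues a readable callback when the read loop gives up its time slot on an SSL socket, because input already held in a user-space buffer produces no further readiness event from the poller. The following is a toy model of that situation, not the qpid-cpp code: `BufferedReader`, `pending_calls` and the read limit are all hypothetical stand-ins.

```python
from collections import deque


class BufferedReader:
    """Toy stand-in for a socket whose input sits in a user-space buffer."""

    def __init__(self, chunks, max_reads_per_slot):
        self.buffered = deque(chunks)       # data the poller cannot see
        self.max_reads = max_reads_per_slot
        self.pending_calls = deque()        # stand-in for queued callbacks
        self.received = []

    def readable(self, reschedule):
        reads = 0
        while self.buffered:
            if reads >= self.max_reads:
                # Time slot used up: without a reschedule nothing would
                # call readable() again for the buffered remainder.
                if reschedule:
                    self.pending_calls.append(self.readable)
                break
            self.received.append(self.buffered.popleft())
            reads += 1

    def run(self, reschedule):
        self.readable(reschedule)           # the single poller notification
        while self.pending_calls:
            self.pending_calls.popleft()(reschedule)
        return len(self.buffered)           # items left stranded


stranded_without = BufferedReader(range(10), 3).run(reschedule=False)
stranded_with = BufferedReader(range(10), 3).run(reschedule=True)
print(stranded_without, stranded_with)
```

Without the reschedule the model leaves seven items unread after its only notification; with it, the reader drains the buffer over several calls.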
[qpid-proton] branch main updated (22ff802 -> 69339de)
cliffjansen pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git.

    from 22ff802 PROTON-2354: Better fix for some 'macOS' test failures
     add 69339de PROTON-2344: fix Python BlockingConnection resource leaks, memory and socket fds

No new revisions were added by this update.

Summary of changes:
 python/proton/_utils.py | 14 --
 1 file changed, 8 insertions(+), 6 deletions(-)
[qpid-proton] 01/01: PROTON-2344: fix Python BlockingConnection resource leaks, memory and socket fds
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-proton.git commit 69339dedbb2ec8d6f934e7de08d021e525312c03 Author: Cliff Jansen AuthorDate: Fri Apr 2 11:46:39 2021 -0700 PROTON-2344: fix Python BlockingConnection resource leaks, memory and socket fds --- python/proton/_utils.py | 14 -- 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/python/proton/_utils.py b/python/proton/_utils.py index 52bdfc2..275f12b 100644 --- a/python/proton/_utils.py +++ b/python/proton/_utils.py @@ -440,12 +440,16 @@ class BlockingConnection(Handler): self.conn.close() self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_ACTIVE), msg="Closing connection") +if self.conn.transport: +# Close tail to force transport cleanup without waiting/hanging for peer close frame. +self.conn.transport.close_tail() finally: self.conn.free() # Nothing left to block on. Allow reactor to clean up. 
self.run() -self.conn = None -self.container.global_handler = None # break circular ref: container to cadapter.on_error +if self.conn: +self.conn.handler = None # break cyclical reference +self.conn = None self.container.stop_events() self.container = None @@ -502,9 +506,6 @@ class BlockingConnection(Handler): raise Timeout(txt) finally: self.container.timeout = container_timeout -if self.disconnected or self._is_closed(): -self.container.stop() -self.conn.handler = None # break cyclical reference if self.disconnected and not self._is_closed(): raise ConnectionException( "Connection %s disconnected: %s" % (self.url, self.disconnected)) @@ -534,7 +535,8 @@ class BlockingConnection(Handler): self.on_transport_closed(event) def on_transport_closed(self, event): -self.disconnected = event.transport.condition or "unknown" +if not self.closing: +self.disconnected = event.transport.condition or "unknown" class AtomicCount(object): - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
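The `close()` change above clears `conn.handler` before dropping the connection so that the object graph contains no reference cycle. A small stdlib-only sketch of why that matters on CPython follows; `Connection`, `Blocking` and `collected_promptly` are hypothetical stand-ins, not the Proton classes, and the result relies on CPython's reference counting.

```python
import gc
import weakref


class Connection:
    handler = None


class Blocking:
    def __init__(self):
        self.conn = Connection()
        self.conn.handler = self      # cycle: Blocking -> conn -> Blocking

    def close(self):
        if self.conn:
            self.conn.handler = None  # break the cycle before dropping it
            self.conn = None


def collected_promptly(call_close):
    """Return True if the object is freed by reference counting alone."""
    gc_was_enabled = gc.isenabled()
    gc.disable()                      # rule out the cycle collector
    try:
        obj = Blocking()
        ref = weakref.ref(obj)
        if call_close:
            obj.close()
        del obj
        return ref() is None
    finally:
        if gc_was_enabled:
            gc.enable()
        gc.collect()                  # clean up any cycle left behind


with_close = collected_promptly(True)
without_close = collected_promptly(False)
print(with_close, without_close)
```

An object stuck in a cycle is only reclaimed when the cycle collector eventually runs, which is one way sockets and memory can linger longer than expected.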
[qpid-proton] branch master created (now 69339de)
cliffjansen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git.

      at 69339de PROTON-2344: fix Python BlockingConnection resource leaks, memory and socket fds

This branch includes the following new commits:

     new 69339de PROTON-2344: fix Python BlockingConnection resource leaks, memory and socket fds

The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.
[qpid-proton] branch master updated: PROTON-2334: fix libuv proactor PN_PROACTOR_INACTIVE event generation
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-proton.git The following commit(s) were added to refs/heads/master by this push: new 6353ad9 PROTON-2334: fix libuv proactor PN_PROACTOR_INACTIVE event generation 6353ad9 is described below commit 6353ad99c23d3a9861ddcc4642df88c68e62698c Author: Cliff Jansen AuthorDate: Mon Feb 22 09:41:04 2021 -0800 PROTON-2334: fix libuv proactor PN_PROACTOR_INACTIVE event generation --- c/src/proactor/libuv.c | 27 +++ 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/c/src/proactor/libuv.c b/c/src/proactor/libuv.c index 31f1e7c..b06b1eb 100644 --- a/c/src/proactor/libuv.c +++ b/c/src/proactor/libuv.c @@ -253,6 +253,7 @@ struct pn_proactor_t { bool batch_working; /* batch is being processed in a worker thread */ bool need_interrupt; /* Need a PN_PROACTOR_INTERRUPT event */ bool need_inactive; /* need INACTIVE event */ + bool timeout_processed; }; @@ -383,6 +384,13 @@ static inline work_t *batch_work(pn_event_batch_t *batch) { return NULL; } +static void check_for_inactive(pn_proactor_t *p) { + /* No future events: no active socket io, no pending timer, no + current event processing. 
*/ + if (!p->batch_working && !p->active && !p->need_interrupt && p->timeout_state == TM_NONE) +p->need_inactive = true; +} + /* Total count of listener and connections for PN_PROACTOR_INACTIVE */ static void add_active(pn_proactor_t *p) { uv_mutex_lock(&p->lock); @@ -393,7 +401,7 @@ static void add_active(pn_proactor_t *p) { static void remove_active_lh(pn_proactor_t *p) { assert(p->active > 0); if (--p->active == 0) { -p->need_inactive = true; +check_for_inactive(p); } } @@ -503,9 +511,7 @@ static int pconnection_init(pconnection_t *pc) { uv_close((uv_handle_t*)&pc->tcp, NULL); } } - if (!err) { -add_active(pc->work.proactor); - } else { + if (err) { pconnection_error(pc, err, "initialization"); } return err; @@ -856,7 +862,7 @@ static pn_event_batch_t *get_batch_lh(pn_proactor_t *p) { } if (p->timeout_state == TM_FIRED) { p->timeout_state = TM_NONE; - remove_active_lh(p); + p->timeout_processed = true; return proactor_batch_lh(p, PN_PROACTOR_TIMEOUT); } } @@ -1084,6 +1090,10 @@ void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) { pn_proactor_t *bp = batch_proactor(batch); /* Proactor events */ if (bp == p) { p->batch_working = false; +if (p->timeout_processed) { + p->timeout_processed = false; + check_for_inactive(p); +} } uv_mutex_unlock(&p->lock); notify(p); @@ -1141,7 +1151,6 @@ void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) { uv_mutex_lock(&p->lock); p->timeout = t; // This timeout *replaces* any existing timeout - if (p->timeout_state == TM_NONE) ++p->active; p->timeout_state = TM_REQUEST; uv_mutex_unlock(&p->lock); notify(p); @@ -1151,7 +1160,7 @@ void pn_proactor_cancel_timeout(pn_proactor_t *p) { uv_mutex_lock(&p->lock); if (p->timeout_state != TM_NONE) { p->timeout_state = TM_NONE; -remove_active_lh(p); +check_for_inactive(p); notify(p); } uv_mutex_unlock(&p->lock); @@ -1160,6 +1169,7 @@ void pn_proactor_cancel_timeout(pn_proactor_t *p) { void pn_proactor_connect2(pn_proactor_t *p, pn_connection_t *c, pn_transport_t *t, 
const char *addr) { pconnection_t *pc = pconnection(p, c, t, false); assert(pc); /* TODO aconway 2017-03-31: memory safety */ + add_active(p); pn_connection_open(pc->driver.connection); /* Auto-open */ parse_addr(&pc->addr, addr); work_start(&pc->work); @@ -1314,9 +1324,10 @@ pn_record_t *pn_listener_attachments(pn_listener_t *l) { } void pn_listener_accept2(pn_listener_t *l, pn_connection_t *c, pn_transport_t *t) { - uv_mutex_lock(&l->lock); pconnection_t *pc = pconnection(l->work.proactor, c, t, true); assert(pc); + add_active(l->work.proactor); + uv_mutex_lock(&l->lock); /* Get the socket from the accept event that we are processing */ pn_event_t *e = pn_collector_prev(l->collector); assert(pn_event_type(e) == PN_LISTENER_ACCEPT); - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
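The libuv change above centralises the decision to emit PN_PROACTOR_INACTIVE in `check_for_inactive()`, which only sets the flag when no source of future events remains. A simplified Python model of that condition is sketched below; the class `ProactorModel` is hypothetical and keeps only the fields the condition reads.

```python
from dataclasses import dataclass

TM_NONE, TM_REQUEST, TM_FIRED = range(3)


@dataclass
class ProactorModel:
    active: int = 0                 # listeners plus connections
    timeout_state: int = TM_NONE
    batch_working: bool = False
    need_interrupt: bool = False
    need_inactive: bool = False

    def check_for_inactive(self):
        # Same condition as in the diff: nothing can produce a future event.
        if (not self.batch_working and not self.active
                and not self.need_interrupt and self.timeout_state == TM_NONE):
            self.need_inactive = True

    def remove_active(self):
        assert self.active > 0
        self.active -= 1
        if self.active == 0:
            self.check_for_inactive()


p = ProactorModel(active=1, timeout_state=TM_REQUEST)
p.remove_active()                   # last connection gone, timer still pending
pending_timer_result = p.need_inactive
p.timeout_state = TM_NONE           # timer cancelled
p.check_for_inactive()
print(pending_timer_result, p.need_inactive)
```

In this model a pending timer keeps the proactor from being reported inactive without the timer having to be counted in `active`, which matches the removal of `++p->active` from `pn_proactor_set_timeout()`.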
[qpid-proton] branch master updated: PROTON-2337: epoll proactor - missing lock on variable increment, found by TSAN
cliffjansen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

The following commit(s) were added to refs/heads/master by this push:
     new ff8ca1a PROTON-2337: epoll proactor - missing lock on variable increment, found by TSAN
ff8ca1a is described below

commit ff8ca1a0c96cbabf99a79e248610a03ddf016262
Author: Cliff Jansen
AuthorDate: Thu Feb 18 22:19:58 2021 -0800

    PROTON-2337: epoll proactor - missing lock on variable increment, found by TSAN
---
 c/src/proactor/epoll.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 8c792e6..876c55a 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -2407,12 +2407,12 @@ static pn_event_batch_t *next_event_batch(pn_proactor_t* p, bool can_block) {
   lock(&p->tslot_mutex);
   tslot_t * ts = find_tslot(p);
   unlock(&p->tslot_mutex);
-  ts->generation++; // wrapping OK. Just looking for any change
 
   lock(&p->sched_mutex);
   assert(ts->task == NULL || ts->earmarked);
   assert(ts->state == UNUSED || ts->state == NEW);
   ts->state = PROCESSING;
+  ts->generation++; // wrapping OK. Just looking for any change
 
   // Process outstanding epoll events until we get a batch or need to block.
   while (true) {
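The one-line move above places the `generation` increment inside the region guarded by `sched_mutex`. The general rule, that a shared counter is only modified while its guarding lock is held, can be sketched in Python as follows. This is a simplified illustration with several writers, not the epoll proactor's threading model, and the `Slot` class is hypothetical.

```python
import threading


class Slot:
    def __init__(self):
        self.sched_mutex = threading.Lock()
        self.generation = 0

    def begin_processing(self):
        with self.sched_mutex:
            # The read-modify-write happens only while the mutex is held.
            self.generation += 1


def hammer(slot, rounds):
    for _ in range(rounds):
        slot.begin_processing()


slot = Slot()
threads = [threading.Thread(target=hammer, args=(slot, 10_000)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(slot.generation)
```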
[qpid-proton] 02/02: PROTON-2334: c-fdlimit-tests fix for libuv immediate close connection drain behaviour for EMFILE accept() error
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-proton.git commit 039c1fef0954491292c0fd171f22abc63fc541c5 Author: Cliff Jansen AuthorDate: Fri Feb 12 10:02:43 2021 -0800 PROTON-2334: c-fdlimit-tests fix for libuv immediate close connection drain behaviour for EMFILE accept() error --- c/tests/fdlimit.py | 31 --- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/c/tests/fdlimit.py b/c/tests/fdlimit.py index 8ef60ac..ae750ef 100644 --- a/c/tests/fdlimit.py +++ b/c/tests/fdlimit.py @@ -64,25 +64,26 @@ class FdLimitTest(unittest.TestCase): # PN_TRANSPORT_CLOSED: amqp:connection:framing-error: connection aborted # PN_TRANSPORT_CLOSED: proton:io: Connection reset by peer - disconnected :5672 (connection aborted) for i in range(fdlimit): -receiver = test_subprocess.Popen(["receive", "", b.port, str(i)], stdout=self.devnull) +receiver = subprocess.Popen(["receive", "", b.port, str(i)], stdout=self.devnull, stderr=subprocess.STDOUT) receivers.append(receiver) # Allow these subprocesses time to establish ahead of the upcoming test sender. time.sleep(1) -# All FDs are now in use, send attempt will (with present implementation) hang -with test_subprocess.Popen(["send", "", b.port, "x"], - stdout=self.devnull, stderr=subprocess.STDOUT) as sender: -time.sleep(1) # polling for None immediately would always succeed, regardless whether send hangs or not -self.assertIsNone(sender.poll()) - -# Kill receivers to free up FDs -for r in receivers: -r.kill() -for r in receivers: -r.wait() - -# Sender now succeeded and exited -self.assertEqual(sender.wait(), 0) +# All FDs are now in use, new send should not succeed. May fail by hanging (epoll) or by +# immediate failure (libuv). But poll() should never be 0. 
+sender = subprocess.Popen(["send", "", b.port, "x"], + stdout=self.devnull, stderr=subprocess.STDOUT) +time.sleep(1) # polling for None immediately would always succeed, regardless whether send hangs or not +self.assertNotEqual(sender.poll(), 0) + +# Kill receivers to free up FDs +for r in receivers: +r.kill() +for r in receivers: +r.wait() + +# Sender completes on its own +sender.wait() # Additional send/receive should succeed now self.assertIn("10 messages sent", test_subprocess.check_output( - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
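The revised test asserts `sender.poll() != 0` because a sender that cannot connect may either hang (epoll) or fail immediately (libuv), and both outcomes are acceptable. A short stdlib sketch of the two `poll()` results involved is given below; the child commands are hypothetical stand-ins for the `send` example.

```python
import subprocess
import sys

# A child that stays alive (stand-in for a sender that hangs) ...
hanging = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(30)"])
# ... and one that fails straight away (stand-in for an immediate error).
failing = subprocess.Popen([sys.executable, "-c", "raise SystemExit(1)"])
failing.wait()

hanging_status = hanging.poll()     # None while the process is still running
failing_status = failing.poll()     # the exit code once it has finished

# Clean up the long-running child so the sketch leaves nothing behind.
hanging.kill()
hanging.wait()
print(hanging_status, failing_status)
```

Neither result equals 0, so a single `assertNotEqual(sender.poll(), 0)` covers both proactor behaviours while still catching an unexpected success.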
[qpid-proton] branch master updated (e4f3c34 -> 039c1fe)
cliffjansen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git.

    from e4f3c34 NO-JIRA Add two forgotten test files to the project (#296)
     new 09dfeea PROTON-2334: threaderciser test - simultaneous connect and pn_proactor_disconnect()
     new 039c1fe PROTON-2334: c-fdlimit-tests fix for libuv immediate close connection drain behaviour for EMFILE accept() error

The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.

Summary of changes:
 c/src/proactor/libuv.c | 2 +-
 c/tests/fdlimit.py | 31 ---
 2 files changed, 17 insertions(+), 16 deletions(-)
[qpid-proton] 01/02: PROTON-2334: threaderciser test - simultaneous connect and pn_proactor_disconnect()
cliffjansen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit 09dfeea5db115a99217dbc64900db8f2496e2787
Author: Cliff Jansen
AuthorDate: Fri Feb 12 08:33:53 2021 -0800

    PROTON-2334: threaderciser test - simultaneous connect and pn_proactor_disconnect()
---
 c/src/proactor/libuv.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/c/src/proactor/libuv.c b/c/src/proactor/libuv.c
index fa51cc4..31f1e7c 100644
--- a/c/src/proactor/libuv.c
+++ b/c/src/proactor/libuv.c
@@ -654,7 +654,6 @@ static int lsocket(pn_listener_t *l, struct addrinfo *ai) {
 
 /* Listen on all available addresses */
 static void leader_listen_lh(pn_listener_t *l) {
-  add_active(l->work.proactor);
   int err = leader_resolve(l->work.proactor, &l->addr, true);
   if (!err) {
     /* Allocate enough space for the pn_netaddr_t addresses */
@@ -1170,6 +1169,7 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in
   work_init(&l->work, p, T_LISTENER);
   parse_addr(&l->addr, addr);
   l->backlog = backlog;
+  add_active(l->work.proactor); /* Owned by proactor. Track it for PN_PROACTOR_INACTIVE. */;
   work_start(&l->work);
 }
[qpid-proton] branch master updated: PROTON-2328: epoll proactor, use local variable not zeroed task member to track warm pairings
cliffjansen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

The following commit(s) were added to refs/heads/master by this push:
     new b4ce612 PROTON-2328: epoll proactor, use local variable not zeroed task member to track warm pairings
b4ce612 is described below

commit b4ce6127df4e9be84ebc5a6a98750bf4feb403ad
Author: Cliff Jansen
AuthorDate: Mon Jan 25 09:06:29 2021 -0800

    PROTON-2328: epoll proactor, use local variable not zeroed task member to track warm pairings
---
 c/src/proactor/epoll.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 7467683..8c792e6 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -461,7 +461,7 @@ bool unassign_thread(tslot_t *ts, tslot_state new_state) {
 
   if (tsk && !deleting) {
     pn_proactor_t *p = tsk->proactor;
-    ts->prev_task = ts->task;
+    ts->prev_task = tsk;
     if (tsk->sched_pending) {
       // Make sure the task is already scheduled or put it on the ready list
       if (tsk->sched_ready) {
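Judging by the commit title, the bug was that `ts->task` had already been zeroed by the time it was copied into `prev_task`, whereas the local `tsk` still held the original value. The pattern of capturing a value in a local before clearing the member can be sketched as follows; the `Slot` class and `unassign` function are hypothetical and much simpler than `unassign_thread()`.

```python
class Slot:
    def __init__(self, task):
        self.task = task
        self.prev_task = None


def unassign(slot, use_local):
    tsk = slot.task        # captured before the member is cleared
    slot.task = None       # the member is zeroed early on
    # Reading the member now yields None; the local still has the task.
    slot.prev_task = tsk if use_local else slot.task
    return slot.prev_task


from_member = unassign(Slot("conn-1"), use_local=False)
from_local = unassign(Slot("conn-1"), use_local=True)
print(from_member, from_local)
```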
[qpid-proton] 01/03: PROTON-2326: epoll proactor refactor - "schedule" instead of "wake", "task" instead of "context"
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-proton.git commit 38c2dab1083d1cada116360d7219192614467277 Author: Cliff Jansen AuthorDate: Sun Jan 24 10:54:21 2021 -0800 PROTON-2326: epoll proactor refactor - "schedule" instead of "wake", "task" instead of "context" --- c/src/proactor/epoll-internal.h | 99 ++- c/src/proactor/epoll.c| 1184 + c/src/proactor/epoll_raw_connection.c | 126 ++-- c/src/proactor/epoll_timer.c | 88 +-- 4 files changed, 749 insertions(+), 748 deletions(-) diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h index 66fb15e..21226a9 100644 --- a/c/src/proactor/epoll-internal.h +++ b/c/src/proactor/epoll-internal.h @@ -57,7 +57,7 @@ typedef pthread_mutex_t pmutex; typedef struct pni_timer_t pni_timer_t; typedef enum { - WAKE, /* see if any work to do in proactor/psocket context */ + EVENT_FD, /* schedule() or pn_proactor_interrupt() */ LISTENER_IO, PCONNECTION_IO, RAW_CONNECTION_IO, @@ -67,7 +67,7 @@ typedef enum { // Data to use with epoll. typedef struct epoll_extended_t { int fd; - epoll_type_t type; // io/timer/wakeup + epoll_type_t type; // io/timer/eventfd uint32_t wanted; // events to poll for bool polling; pmutex barrier_mutex; @@ -79,36 +79,36 @@ typedef enum { LISTENER, RAW_CONNECTION, TIMER_MANAGER -} pcontext_type_t; +} task_type_t; -typedef struct pcontext_t { +typedef struct task_t { pmutex mutex; pn_proactor_t *proactor; /* Immutable */ - pcontext_type_t type; + task_type_t type; bool working; - bool on_wake_list; - bool wake_pending; // unprocessed eventfd wake callback - struct pcontext_t *wake_next; // wake list, guarded by proactor eventfd_mutex + bool on_ready_list; + bool ready;// ready to run and on ready list. Poller notified by eventfd. 
+ struct task_t *ready_next; // ready list, guarded by proactor eventfd_mutex bool closing; // Next 4 are protected by the proactor mutex - struct pcontext_t* next; /* Protected by proactor.mutex */ - struct pcontext_t* prev; /* Protected by proactor.mutex */ + struct task_t* next; /* Protected by proactor.mutex */ + struct task_t* prev; /* Protected by proactor.mutex */ int disconnect_ops; /* ops remaining before disconnect complete */ bool disconnecting; /* pn_proactor_disconnect */ // Protected by schedule mutex tslot_t *runner __attribute__((aligned(64))); /* designated or running thread */ tslot_t *prev_runner; - bool sched_wake; + bool sched_ready; bool sched_pending; /* If true, one or more unseen epoll or other events to process() */ - bool runnable ; /* in need of scheduling */ -} pcontext_t; + bool runnable ; /* on one of the runnable lists */ +} task_t; typedef enum { NEW, UNUSED, /* pn_proactor_done() called, may never come back */ SUSPENDED, - PROCESSING, /* Hunting for a context */ - BATCHING, /* Doing work on behalf of a context */ + PROCESSING, /* Hunting for a task */ + BATCHING, /* Doing work on behalf of a task */ DELETING, POLLING } tslot_state; @@ -121,8 +121,8 @@ struct tslot_t { bool suspended; volatile bool scheduled; tslot_state state; - pcontext_t *context; - pcontext_t *prev_context; + task_t *task; + task_t *prev_task; bool earmarked; tslot_t *suspend_list_prev; tslot_t *suspend_list_next; @@ -131,7 +131,7 @@ struct tslot_t { }; typedef struct pni_timer_manager_t { - pcontext_t context; + task_t task; epoll_extended_t epoll_timer; pmutex deletion_mutex; pni_timer_t *proactor_timer; @@ -141,12 +141,12 @@ typedef struct pni_timer_manager_t { } pni_timer_manager_t; struct pn_proactor_t { - pcontext_t context; + task_t task; pni_timer_manager_t timer_manager; - epoll_extended_t epoll_wake; + epoll_extended_t epoll_schedule; /* ready list */ epoll_extended_t epoll_interrupt; pn_event_batch_t batch; - pcontext_t *contexts; /* track in-use 
contexts for PN_PROACTOR_INACTIVE and disconnect */ + task_t *tasks; /* track in-use tasks for PN_PROACTOR_INACTIVE and disconnect */ pni_timer_t *timer; size_t disconnects_pending; /* unfinished proactor disconnects*/ // need_xxx flags indicate we should generate PN_PROACTOR_XXX on the next update_batch() @@ -155,21 +155,21 @@ struct pn_proactor_t { bool need_timeout; bool timeout_set; /* timeout has been set by user and not yet cancelled or generated event */ bool timeout_processed; /* timeout event dispatched in the most recent event batch */ - int context_count; + int task_count;
[qpid-proton] branch master updated (ea4dbf1 -> 4fc4c78)
cliffjansen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git.

    from ea4dbf1 PROTON-2327: Fix example build breakage on cmake 2.8.12
     new 38c2dab PROTON-2326: epoll proactor refactor - "schedule" instead of "wake", "task" instead of "context"
     new f6734ed PROTON-2326: epoll proactor refactor - provide proactor as direct argument to notify_poller(), not indirect via task
     new 4fc4c78 PROTON-2326: epoll proactor refactor - make all tasks wakeable in generic manner. This closes #290

The 3 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.

Summary of changes:
 c/src/proactor/epoll-internal.h | 121 ++--
 c/src/proactor/epoll.c | 1192 -
 c/src/proactor/epoll_raw_connection.c | 142 ++--
 c/src/proactor/epoll_timer.c | 88 +--
 4 files changed, 780 insertions(+), 763 deletions(-)
[qpid-proton] 02/03: PROTON-2326: epoll proactor refactor - provide proactor as direct argument to notify_poller(), not indirect via task
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-proton.git commit f6734ed6cdab7ca7ff84d208774735a0edad7731 Author: Cliff Jansen AuthorDate: Wed Jan 20 22:33:34 2021 -0800 PROTON-2326: epoll proactor refactor - provide proactor as direct argument to notify_poller(), not indirect via task --- c/src/proactor/epoll-internal.h | 2 +- c/src/proactor/epoll.c| 49 --- c/src/proactor/epoll_raw_connection.c | 19 ++ c/src/proactor/epoll_timer.c | 4 +-- 4 files changed, 34 insertions(+), 40 deletions(-) diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h index 21226a9..0d63f90 100644 --- a/c/src/proactor/epoll-internal.h +++ b/c/src/proactor/epoll-internal.h @@ -352,7 +352,7 @@ static void task_finalize(task_t* tsk) { } bool schedule(task_t *tsk); -void notify_poller(task_t *tsk); +void notify_poller(pn_proactor_t *p); void schedule_done(task_t *tsk); void psocket_init(psocket_t* ps, epoll_type_t type); diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c index ae0c37b..c4f028d 100644 --- a/c/src/proactor/epoll.c +++ b/c/src/proactor/epoll.c @@ -314,8 +314,7 @@ bool schedule(task_t *tsk) { } // part2: unblock epoll_wait(). Make OS call without lock held. 
-void notify_poller(task_t *tsk) {
-  pn_proactor_t *p = tsk->proactor;
+void notify_poller(pn_proactor_t *p) {
   if (p->eventfd == -1) return;
   rearm(p, &p->epoll_schedule);
@@ -706,7 +705,7 @@ static void proactor_rearm_overflow(pn_proactor_t *p) {
     } else
       notify = schedule(&l->task);
     unlock(&l->task.mutex);
-    if (notify) notify_poller(&l->task);
+    if (notify) notify_poller(p);
     a = acceptor_list_next(&ovflw);
   }
 }
@@ -871,7 +870,7 @@ void pni_pconnection_timeout(pconnection_t *pc) {
   }
   unlock(&pc->task.mutex);
   if (notify)
-    notify_poller(&pc->task);
+    notify_poller(pc->task.proactor);
 }

 static pn_event_t *pconnection_batch_next(pn_event_batch_t *batch) {
@@ -1018,7 +1017,7 @@ static void pconnection_done(pconnection_t *pc) {
       notify = unassign_thread(ts, UNUSED);
       unlock(&p->sched_mutex);
       if (notify)
-        notify_poller(&p->task);
+        notify_poller(p);
       return;
     }
   }
@@ -1033,7 +1032,7 @@ static void pconnection_done(pconnection_t *pc) {
     if (unassign_thread(ts, UNUSED))
       notify = true;
     unlock(&p->sched_mutex);
-    if (notify) notify_poller(&p->task);
+    if (notify) notify_poller(p);
     return;
   }

@@ -1399,7 +1398,6 @@ void pn_proactor_connect2(pn_proactor_t *p, pn_connection_t *c, pn_transport_t *
   pn_connection_open(pc->driver.connection); /* Auto-open */

   bool notify = false;
-  bool notify_proactor = false;

   if (pc->disconnected) {
     notify = schedule(&pc->task);/* Error during initialization */
@@ -1414,14 +1412,13 @@ void pn_proactor_connect2(pn_proactor_t *p, pn_connection_t *c, pn_transport_t *
       psocket_gai_error(&pc->psocket, gai_error, "connect to ");
       notify = schedule(&pc->task);
       lock(&p->task.mutex);
-      notify_proactor = schedule_if_inactive(p);
+      notify |= schedule_if_inactive(p);
       unlock(&p->task.mutex);
     }
   }
   /* We need to issue INACTIVE on immediate failure */
   unlock(&pc->task.mutex);
-  if (notify) notify_poller(&pc->task);
-  if (notify_proactor) notify_poller(&p->task);
+  if (notify) notify_poller(pc->task.proactor);
 }

 static void pconnection_tick(pconnection_t *pc) {
@@ -1449,7 +1446,7 @@ void pn_connection_wake(pn_connection_t* c) {
     }
     unlock(&pc->task.mutex);
   }
-  if (notify) notify_poller(&pc->task);
+  if (notify) notify_poller(pc->task.proactor);
 }

 void pn_proactor_release_connection(pn_connection_t *c) {
@@ -1463,7 +1460,7 @@ void pn_proactor_release_connection(pn_connection_t *c) {
     notify = schedule(&pc->task);
     unlock(&pc->task.mutex);
   }
-  if (notify) notify_poller(&pc->task);
+  if (notify) notify_poller(pc->task.proactor);
 }

 //
@@ -1576,7 +1573,7 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in
   }
   proactor_add(&l->task);
   unlock(&l->task.mutex);
-  if (notify) notify_poller(&l->task);
+  if (notify) notify_poller(p);
   return;
 }

@@ -1656,7 +1653,7 @@ void pn_listener_close(pn_listener_t* l) {
     notify = schedule(&l->task);
   }
   unlock(&l->task.mutex);
-  if (notify) notify_poller(&l->task);
+  if (notify) notify_poller(l->task.proactor);
 }

 static void listener_forced_shutdown(pn_listener_t *l) {
@@ -18
[qpid-proton] 03/03: PROTON-2326: epoll proactor refactor - make all tasks wakeable in generic manner. This closes #290
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit 4fc4c7869f470c688777c4f040b0fc31219f4f42
Author: Cliff Jansen
AuthorDate: Sun Jan 24 11:18:30 2021 -0800

    PROTON-2326: epoll proactor refactor - make all tasks wakeable in generic manner. This closes #290
---
 c/src/proactor/epoll-internal.h       | 26 --
 c/src/proactor/epoll.c                | 11 +--
 c/src/proactor/epoll_raw_connection.c | 19 ++-
 3 files changed, 39 insertions(+), 17 deletions(-)

diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h
index 0d63f90..e8c9f0a 100644
--- a/c/src/proactor/epoll-internal.h
+++ b/c/src/proactor/epoll-internal.h
@@ -86,8 +86,9 @@ typedef struct task_t {
   pn_proactor_t *proactor;  /* Immutable */
   task_type_t type;
   bool working;
-  bool on_ready_list;
   bool ready;                // ready to run and on ready list.  Poller notified by eventfd.
+  bool waking;
+  bool on_ready_list;        // todo: protected by eventfd_mutex or sched mutex? needed?
   struct task_t *ready_next; // ready list, guarded by proactor eventfd_mutex
   bool closing;
   // Next 4 are protected by the proactor mutex
@@ -223,7 +224,6 @@ typedef struct pconnection_t {
   pni_timer_t *timer;
   const char *host, *port;
   uint32_t new_events;
-  int wake_count; // TODO: protected by task.mutex so should be moved in there (also really bool)
   bool server;                /* accept, not connect */
   bool tick_pending;
   bool queued_disconnect;     /* deferred from pn_proactor_disconnect() */
@@ -382,6 +382,28 @@ pn_event_batch_t *pni_timer_manager_process(pni_timer_manager_t *tm, bool timeou

 void pni_pconnection_timeout(pconnection_t *pc);
 void pni_proactor_timeout(pn_proactor_t *p);
+// Generic wake primitives for a task.
+
+// Call with task lock held.  Must call notify_poller() if returns true.
+static inline bool pni_task_wake(task_t *tsk) {
+  if (!tsk->waking) {
+    tsk->waking = true;
+    return schedule(tsk);
+  }
+  return false;
+}
+
+// Call with task lock held.
+static inline bool pni_task_wake_pending(task_t *tsk) {
+  return tsk->waking;
+}
+
+// Call with task lock held and only from the running task.
+static inline void pni_task_wake_done(task_t *tsk) {
+  tsk->waking = false;
+}
+
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index c4f028d..7467683 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -49,6 +49,7 @@
  TODO: document role of sched_pending and how sched_XXX (i.e. sched_interrupt)
  transitions from "private to the scheduler" to "visible to the task".
+ TODO: document task.working duration can be long: from xxx_process() to xxx_done() or null batch.
  */

@@ -742,7 +743,6 @@ static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_con
   psocket_init(&pc->psocket, PCONNECTION_IO);
   pni_parse_addr(addr, pc->addr_buf, addrlen+1, &pc->host, &pc->port);
   pc->new_events = 0;
-  pc->wake_count = 0;
   pc->tick_pending = false;
   pc->queued_disconnect = false;
   pc->disconnect_condition = NULL;
@@ -980,7 +980,7 @@ static bool pconnection_sched_sync(pconnection_t *pc) {

 /* Call with task lock and having done a write_flush() to "know" the value of wbuf_remaining */
 static inline bool pconnection_work_pending(pconnection_t *pc) {
-  if (pc->new_events || pc->wake_count || pc->tick_pending || pc->queued_disconnect)
+  if (pc->new_events || pni_task_wake_pending(&pc->task) || pc->tick_pending || pc->queued_disconnect)
     return true;
   if (!pc->read_blocked && !pconnection_rclosed(pc))
     return true;
@@ -1153,9 +1153,9 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
     return &pc->batch;
   }
   bool closed = pconnection_rclosed(pc) && pconnection_wclosed(pc);
-  if (pc->wake_count) {
+  if (pni_task_wake_pending(&pc->task)) {
     waking = !closed;
-    pc->wake_count = 0;
+    pni_task_wake_done(&pc->task);
   }
   if (pc->tick_pending) {
     pc->tick_pending = false;
@@ -1441,8 +1441,7 @@ void pn_connection_wake(pn_connection_t* c) {
   if (pc) {
     lock(&pc->task.mutex);
     if (!pc->task.closing) {
-      pc->wake_count++;
-      notify = schedule(&pc->task);
+      notify = pni_task_wake(&pc->task);
     }
     unlock(&pc->task.mutex);
   }
diff --git a/c/src/proactor/epoll_raw_connection.c b/c/src/proactor/epoll_raw_connection.c
index 3fd2b36..8722ff0 100644
--- a/c/src/proactor/epoll_raw_connection.c
+++ b/c/src/proactor/epoll_raw_connection.c
@@ -50,7 +50,6 @@ struct praw_connection_t {
   struct addrinfo *ai;               /* Curre
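
The generic wake primitives added by PROTON-2326 replace the per-connection wake_count with a single boolean on the task. A minimal standalone sketch of that coalescing behaviour follows. Everything named sketch_* is hypothetical and exists only for illustration; in particular, sketch_schedule() is an assumed stand-in for Proton's schedule(), and the task mutex that the real primitives require the caller to hold is omitted.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical reduced task record; not Proton's task_t. */
typedef struct {
  bool ready;   /* task is already queued to run */
  bool waking;  /* a wake was requested and has not been consumed yet */
} sketch_task_t;

/* Assumed stand-in for schedule(): returns true only when the task newly
 * becomes ready, i.e. when the caller would have to notify the poller. */
bool sketch_schedule(sketch_task_t *t) {
  if (t->ready) return false;
  t->ready = true;
  return true;
}

/* Request a wake. Repeated requests before the task runs collapse into one. */
bool sketch_task_wake(sketch_task_t *t) {
  if (!t->waking) {
    t->waking = true;
    return sketch_schedule(t);
  }
  return false;
}

/* Query whether a wake is outstanding. */
bool sketch_task_wake_pending(const sketch_task_t *t) {
  return t->waking;
}

/* Consume the outstanding wake; meant to be called by the running task. */
void sketch_task_wake_done(sketch_task_t *t) {
  t->waking = false;
}
```

In this sketch a second wake request issued before the task has run returns false, so the poller would be notified at most once per pending wake, which is the property a counter is not needed for.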
[qpid-proton] branch master updated: PROTON-1516: add tests for empty last frame in a streamed message
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

The following commit(s) were added to refs/heads/master by this push:
     new 8c0e209  PROTON-1516: add tests for empty last frame in a streamed message
8c0e209 is described below

commit 8c0e2099cc25c6fb9b53f25335898564a945f9f6
Author: Cliff Jansen
AuthorDate: Mon Dec 14 09:59:43 2020 -0800

    PROTON-1516: add tests for empty last frame in a streamed message
---
 c/tests/connection_driver_test.cpp  | 42 +
 python/tests/proton_tests/engine.py | 30 ++
 2 files changed, 72 insertions(+)

diff --git a/c/tests/connection_driver_test.cpp b/c/tests/connection_driver_test.cpp
index 8a86db5..ff04e81 100644
--- a/c/tests/connection_driver_test.cpp
+++ b/c/tests/connection_driver_test.cpp
@@ -575,3 +575,45 @@ TEST_CASE("driver_settle_incomplete_receiver") {
              cond_empty());
   CHECK_THAT(*pn_connection_condition(d.server.connection), cond_empty());
 }
+
+/* Empty last frame in streaming message.
+*/
+TEST_CASE("driver_empty_last_frame") {
+  send_client_handler client;
+  delivery_handler server;
+  pn_test::driver_pair d(client, server);
+
+  d.run();
+  pn_link_t *rcv = server.link;
+  pn_link_t *snd = client.link;
+  char data[100] = {0};          /* Dummy data to send. */
+  char rbuf[sizeof(data)] = {0}; /* Read buffer for incoming data. */
+  pn_link_flow(rcv, 1);
+  pn_delivery_t *sd = pn_delivery(snd, pn_bytes("1")); /* Prepare to send */
+  d.run();
+
+  /* Send/receive a frame */
+  CHECK(sizeof(data) == pn_link_send(snd, data, sizeof(data)));
+  server.log_clear();
+  d.run();
+  CHECK_THAT(ETYPES(PN_DELIVERY), Equals(server.log_clear()));
+  CHECK(sizeof(data) == pn_link_recv(rcv, rbuf, sizeof(data)));
+  CHECK(pn_delivery_partial(pn_link_current(rcv)));
+  d.run();
+
+  /* Advance after all data transfered over wire. */
+  CHECK(pn_link_advance(snd));
+  server.log_clear();
+  d.run();
+  CHECK_THAT(ETYPES(PN_DELIVERY), Equals(server.log_clear()));
+  CHECK(PN_EOS == pn_link_recv(rcv, rbuf, sizeof(data)));
+  CHECK(!pn_delivery_partial(pn_link_current(rcv)));
+
+  pn_delivery_settle(sd);
+  sd = NULL;
+  pn_delivery_settle(pn_link_current(rcv));
+  d.run();
+  CHECK_THAT(*pn_connection_remote_condition(d.client.connection),
+             cond_empty());
+  CHECK_THAT(*pn_connection_condition(d.server.connection), cond_empty());
+}
diff --git a/python/tests/proton_tests/engine.py b/python/tests/proton_tests/engine.py
index 1b4c02b..70d10c8 100644
--- a/python/tests/proton_tests/engine.py
+++ b/python/tests/proton_tests/engine.py
@@ -935,6 +935,36 @@ class TransferTest(Test):
     self.pump()
     assert self.rcv.current.aborted

+  def test_multiframe_last_empty(self):
+    self.rcv.flow(1)
+    sd = self.snd.delivery("tag")
+    msg_p1 = b"this is a test"
+    n = self.snd.send(msg_p1)
+    assert n == len(msg_p1)
+
+    self.pump()
+
+    assert len(msg_p1) == self.rcv.current.pending
+    assert self.rcv.current.partial
+    msg_p2 = b"this is more."
+    n = self.snd.send(msg_p2)
+    assert n == len(msg_p2)
+
+    self.pump()
+
+    msg = msg_p1 + msg_p2
+    assert len(msg) == self.rcv.current.pending
+    assert self.rcv.current.partial
+    # Advance.  Should send empty xfer frame with more flag false.
+    assert self.snd.advance()
+
+    self.pump()
+
+    assert len(msg) == self.rcv.current.pending
+    assert not self.rcv.current.partial
+    binary = self.rcv.recv(self.rcv.current.pending)
+    assert binary == msg
+
   def test_disposition(self):
     self.rcv.flow(1)

-
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-proton] branch master updated: PROTON-1914: early settlement of inbound streamed message. This closes #279
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-proton.git The following commit(s) were added to refs/heads/master by this push: new 72fccd6 PROTON-1914: early settlement of inbound streamed message. This closes #279 new 3c638f9 PROTON-1914: early settlement of inbound streamed message. This closes #279 72fccd6 is described below commit 72fccd66580dc510540010794e1f05f8a57183f2 Author: Cliff Jansen AuthorDate: Fri Dec 11 10:29:30 2020 -0800 PROTON-1914: early settlement of inbound streamed message. This closes #279 --- c/src/core/engine-internal.h | 2 + c/src/core/engine.c| 2 + c/src/core/transport.c | 78 -- c/tests/connection_driver_test.cpp | 23 +++ 4 files changed, 78 insertions(+), 27 deletions(-) diff --git a/c/src/core/engine-internal.h b/c/src/core/engine-internal.h index 11718c9..832d29d 100644 --- a/c/src/core/engine-internal.h +++ b/c/src/core/engine-internal.h @@ -305,6 +305,7 @@ struct pn_link_t { pn_sequence_t available; pn_sequence_t credit; pn_sequence_t queued; + pn_sequence_t more_id; int drained; // number of drained credits uint8_t snd_settle_mode; uint8_t rcv_settle_mode; @@ -313,6 +314,7 @@ struct pn_link_t { bool drain_flag_mode; // receiver only bool drain; bool detached; + bool more_pending; }; struct pn_disposition_t { diff --git a/c/src/core/engine.c b/c/src/core/engine.c index bfc8613..1aa1992 100644 --- a/c/src/core/engine.c +++ b/c/src/core/engine.c @@ -1177,6 +1177,7 @@ pn_link_t *pn_link_new(int type, pn_session_t *session, const char *name) link->available = 0; link->credit = 0; link->queued = 0; + link->more_id = 0; link->drain = false; link->drain_flag_mode = true; link->drained = 0; @@ -1186,6 +1187,7 @@ pn_link_t *pn_link_new(int type, pn_session_t *session, const char *name) link->remote_snd_settle_mode = PN_SND_MIXED; link->remote_rcv_settle_mode = PN_RCV_FIRST; link->detached = false; + link->more_pending = 
false; link->properties = 0; link->remote_properties = 0; diff --git a/c/src/core/transport.c b/c/src/core/transport.c index fe6ebf1..0467eef 100644 --- a/c/src/core/transport.c +++ b/c/src/core/transport.c @@ -1519,12 +1519,36 @@ int pn_do_transfer(pn_transport_t *transport, uint8_t frame_type, uint16_t chann if (!link) { return pn_do_error(transport, "amqp:invalid-field", "no such handle: %u", handle); } - pn_delivery_t *delivery; - if (link->unsettled_tail && !link->unsettled_tail->done) { -delivery = link->unsettled_tail; -if (settled_set && !settled && delivery->remote.settled) - return pn_do_error(transport, "amqp:invalid-field", "invalid transition from settled to unsettled"); + pn_delivery_t *delivery = NULL; + bool new_delivery = false; + if (link->more_pending) { +// Ongoing multiframe delivery. +if (link->unsettled_tail && !link->unsettled_tail->done) { + delivery = link->unsettled_tail; + if (settled_set && !settled && delivery->remote.settled) +return pn_do_error(transport, "amqp:invalid-field", "invalid transition from settled to unsettled"); + if (id_present && id != delivery->state.id) +return pn_do_error(transport, "amqp:invalid-field", "invalid delivery-id for a continuation transfer"); +} else { + // Application has already settled. Delivery is no more. + // Ignore content and look for transition to a new delivery. + if (!id_present || id == link->more_id) { +// Still old delivery. +if (!more || aborted) + link->more_pending = false; + } else { +// New id. 
+new_delivery = true; +link->more_pending = false; + } +} } else { +new_delivery = true; + } + + if (new_delivery) { +assert(!link->more_pending); +assert(delivery == NULL); pn_delivery_map_t *incoming = &ssn->state.incoming; if (!ssn->state.incoming_init) { @@ -1550,17 +1574,38 @@ int pn_do_transfer(pn_transport_t *transport, uint8_t frame_type, uint16_t chann link->queued++; } - pn_buffer_append(delivery->bytes, payload->start, payload->size); - ssn->incoming_bytes += payload->size; - delivery->done = !more; + if (delivery) { +pn_buffer_append(delivery->bytes, payload->start, payload->size); +if (more) { + if (!link->more_pending) { +// First frame of a multi-frame transfer. Remember at link level. +link->more_pending = true; +assert(id_present); // Id MUST be set on first frame, and already c
[qpid-proton] branch master updated: PROTON-2172: proactor fdlimit test. Should work after PROTON-1496.
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

The following commit(s) were added to refs/heads/master by this push:
     new 7735f1b  PROTON-2172: proactor fdlimit test. Should work after PROTON-1496.
7735f1b is described below

commit 7735f1b7b39b36f5fd292410385e6a5a9c4fb68a
Author: Cliff Jansen
AuthorDate: Mon Nov 23 10:27:47 2020 -0800

    PROTON-2172: proactor fdlimit test. Should work after PROTON-1496.
---
 c/tests/fdlimit.py | 7 ---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/c/tests/fdlimit.py b/c/tests/fdlimit.py
index 6848c68..2fe2e71 100644
--- a/c/tests/fdlimit.py
+++ b/c/tests/fdlimit.py
@@ -51,8 +51,7 @@ class FdLimitTest(unittest.TestCase):
         if cls.devnull:
             cls.devnull.close()

-    # @unittest.skipUnless(prlimit_available, "prlimit not available")
-    @unittest.skip("temporarily disabled (epoll fix pending)")
+    @unittest.skipUnless(prlimit_available, "prlimit not available")
     def test_fd_limit_broker(self):
         """Check behaviour when running out of file descriptors on accept"""
         # Not too many FDs but not too few either, some are used for system purposes.
@@ -64,9 +63,11 @@ class FdLimitTest(unittest.TestCase):
             # NOTE: broker does not log a file descriptor related error at any point in the test, only
             # PN_TRANSPORT_CLOSED: amqp:connection:framing-error: connection aborted
             # PN_TRANSPORT_CLOSED: proton:io: Connection reset by peer - disconnected :5672 (connection aborted)
-            for i in range(fdlimit + 1):
+            for i in range(fdlimit):
                 receiver = test_subprocess.Popen(["receive", "", b.port, str(i)], stdout=self.devnull)
                 receivers.append(receiver)
+            # Allow these subprocesses time to establish ahead of the upcoming test sender.
+            time.sleep(1)

             # All FDs are now in use, send attempt will (with present implementation) hang
             with test_subprocess.Popen(["send", "", b.port, "x"],
[qpid-proton] branch master updated: PROTON-2304: fix epoll proactor hang removing and replacing same canceled timer at front of timers list repeatedly
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

The following commit(s) were added to refs/heads/master by this push:
     new 9722a7f  PROTON-2304: fix epoll proactor hang removing and replacing same canceled timer at front of timers list repeatedly
9722a7f is described below

commit 9722a7f9d3afadffe1883305aff64bfaf977f596
Author: Cliff Jansen
AuthorDate: Mon Nov 16 22:06:07 2020 -0800

    PROTON-2304: fix epoll proactor hang removing and replacing same canceled timer at front of timers list repeatedly
---
 c/src/proactor/epoll_timer.c | 24 +---
 1 file changed, 13 insertions(+), 11 deletions(-)

diff --git a/c/src/proactor/epoll_timer.c b/c/src/proactor/epoll_timer.c
index 6c7c3db..6d288e1 100644
--- a/c/src/proactor/epoll_timer.c
+++ b/c/src/proactor/epoll_timer.c
@@ -326,17 +326,19 @@ pn_event_batch_t *pni_timer_manager_process(pni_timer_manager_t *tm, bool timeou
         lock(&tm->context.mutex);
       } else {
         uint64_t deadline = td->timer->deadline;
-        if (deadline && deadline <= now) {
-          td->timer->deadline = 0;
-          pconnection_t *pc = td->timer->connection;
-          lock(&tm->deletion_mutex);     // Prevent connection from deleting itself when tm->context.mutex dropped.
-          unlock(&tm->context.mutex);
-          pni_pconnection_timeout(pc);
-          unlock(&tm->deletion_mutex);
-          lock(&tm->context.mutex);
-        } else {
-          td->list_deadline = td->timer->deadline;
-          pn_list_minpush(tm->timers_heap, td);
+        if (deadline) {
+          if (deadline <= now) {
+            td->timer->deadline = 0;
+            pconnection_t *pc = td->timer->connection;
+            lock(&tm->deletion_mutex);     // Prevent connection from deleting itself when tm->context.mutex dropped.
+            unlock(&tm->context.mutex);
+            pni_pconnection_timeout(pc);
+            unlock(&tm->deletion_mutex);
+            lock(&tm->context.mutex);
+          } else {
+            td->list_deadline = deadline;
+            pn_list_minpush(tm->timers_heap, td);
+          }
         }
       }
     }
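
The branch restructuring in PROTON-2304 can be read as a three-way decision for each timer taken off the front of the heap. The following is a rough sketch only, with hypothetical sketch_* names and none of Proton's locking or heap handling. It contrasts the pre-fix and post-fix branch structure as they appear in the diff above, assuming (as the diff suggests) that a deadline of 0 marks a canceled timer.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-in for a timer record; not Proton's pni_timer_t. */
typedef struct {
  uint64_t deadline;  /* 0 is assumed to mean canceled or not set */
  int fired;          /* number of timeout callbacks delivered */
} sketch_timer_t;

typedef enum { SKETCH_DROP, SKETCH_FIRE, SKETCH_REQUEUE } sketch_action_t;

/* Post-fix structure: a canceled timer is neither fired nor pushed back. */
sketch_action_t sketch_classify(sketch_timer_t *t, uint64_t now) {
  if (t->deadline == 0)
    return SKETCH_DROP;
  if (t->deadline <= now) {
    t->deadline = 0;
    t->fired++;
    return SKETCH_FIRE;
  }
  return SKETCH_REQUEUE;
}

/* Pre-fix structure, for comparison: the else branch also catches
 * deadline == 0, so a canceled timer is pushed back onto the heap. */
sketch_action_t sketch_classify_old(sketch_timer_t *t, uint64_t now) {
  if (t->deadline && t->deadline <= now) {
    t->deadline = 0;
    t->fired++;
    return SKETCH_FIRE;
  }
  return SKETCH_REQUEUE;
}
```

With the old structure a canceled timer is re-queued with a list deadline of 0, which would sort to the front of a min-heap again. That is consistent with the repeated remove-and-replace described in the commit subject.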
[qpid-proton] branch master updated: PROTON-2292: backout tsan suppression. Apply fix to forced shutdown code path.
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

The following commit(s) were added to refs/heads/master by this push:
     new 2ec1ba5  PROTON-2292: backout tsan suppression. Apply fix to forced shutdown code path.
2ec1ba5 is described below

commit 2ec1ba5026ad821f0f7f6919cd14762f04dcadd2
Author: Cliff Jansen
AuthorDate: Tue Nov 10 09:50:07 2020 -0800

    PROTON-2292: backout tsan suppression. Apply fix to forced shutdown code path.
---
 c/src/proactor/epoll.c | 2 +-
 tests/tsan.supp        | 4
 2 files changed, 1 insertion(+), 5 deletions(-)

diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index d2822b4..30ef5f1 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -1985,7 +1985,6 @@ void pn_proactor_free(pn_proactor_t *p) {
   p->eventfd = -1;
   close(p->interruptfd);
   p->interruptfd = -1;
-  pni_timer_manager_finalize(&p->timer_manager);
   while (p->contexts) {
     pcontext_t *ctx = p->contexts;
     p->contexts = ctx->next;
@@ -2001,6 +2000,7 @@ void pn_proactor_free(pn_proactor_t *p) {
     }
   }

+  pni_timer_manager_finalize(&p->timer_manager);
   pn_collector_free(p->collector);
   pmutex_finalize(&p->tslot_mutex);
   pmutex_finalize(&p->sched_mutex);
diff --git a/tests/tsan.supp b/tests/tsan.supp
index 5a1c61a..f0f9020 100644
--- a/tests/tsan.supp
+++ b/tests/tsan.supp
@@ -30,7 +30,3 @@ race:cpp/examples/broker
 # found by threaderciser, in c/src/proactor/epoll.c
 race:^listener_final_free$
 race:^pn_proactor_connect2$
-
-# PROTON-2292 & PROTON-1496: false suspected collision between set/free
-mutex:^pni_timer_set$
-mutex:^pni_timer_free$
[qpid-proton] branch master updated: PROTON-2292: update tsan.supp for new epoll proactor timer code
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

The following commit(s) were added to refs/heads/master by this push:
     new 8c8a8bc  PROTON-2292: update tsan.supp for new epoll proactor timer code
8c8a8bc is described below

commit 8c8a8bc14cb5a24382bbb08e42bc1a7b69188a95
Author: Cliff Jansen
AuthorDate: Mon Nov 9 23:44:18 2020 -0800

    PROTON-2292: update tsan.supp for new epoll proactor timer code
---
 tests/tsan.supp | 4
 1 file changed, 4 insertions(+)

diff --git a/tests/tsan.supp b/tests/tsan.supp
index f0f9020..5a1c61a 100644
--- a/tests/tsan.supp
+++ b/tests/tsan.supp
@@ -30,3 +30,7 @@ race:cpp/examples/broker
 # found by threaderciser, in c/src/proactor/epoll.c
 race:^listener_final_free$
 race:^pn_proactor_connect2$
+
+# PROTON-2292 & PROTON-1496: false suspected collision between set/free
+mutex:^pni_timer_set$
+mutex:^pni_timer_free$
[qpid-proton] branch master updated: PROTON-2292: TSAN failure in epoll_timer.c. Replace missing lock spotted by TSAN and Coverity.
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

The following commit(s) were added to refs/heads/master by this push:
     new 06833ac  PROTON-2292: TSAN failure in epoll_timer.c. Replace missing lock spotted by TSAN and Coverity.
06833ac is described below

commit 06833acb10f30343d1e2e970a31e11cb290e32ff
Author: Cliff Jansen
AuthorDate: Mon Nov 9 00:25:31 2020 -0800

    PROTON-2292: TSAN failure in epoll_timer.c. Replace missing lock spotted by TSAN and Coverity.
---
 c/src/proactor/epoll_timer.c | 1 +
 1 file changed, 1 insertion(+)

diff --git a/c/src/proactor/epoll_timer.c b/c/src/proactor/epoll_timer.c
index 58f0211..6c7c3db 100644
--- a/c/src/proactor/epoll_timer.c
+++ b/c/src/proactor/epoll_timer.c
@@ -287,6 +287,7 @@ void pni_timer_set(pni_timer_t *timer, uint64_t deadline) {

 pn_event_batch_t *pni_timer_manager_process(pni_timer_manager_t *tm, bool timeout, bool wake) {
   uint64_t now = pn_proactor_now_64();
+  lock(&tm->context.mutex);
   tm->context.working = true;
   if (timeout)
     tm->timerfd_deadline = 0;
[qpid-proton] branch master updated: PROTON-1496: epoll proactor - improved timers implementation with single timerfd kernel resource
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-proton.git The following commit(s) were added to refs/heads/master by this push: new d31ad96 PROTON-1496: epoll proactor - improved timers implementation with single timerfd kernel resource d31ad96 is described below commit d31ad9652a1a63f856ed10772e362b4a155ecbf4 Author: Cliff Jansen AuthorDate: Sun Nov 8 11:56:58 2020 -0800 PROTON-1496: epoll proactor - improved timers implementation with single timerfd kernel resource --- c/CMakeLists.txt | 2 +- c/src/proactor/epoll-internal.h | 62 -- c/src/proactor/epoll.c| 338 -- c/src/proactor/epoll_raw_connection.c | 1 + c/src/proactor/epoll_timer.c | 380 ++ 5 files changed, 510 insertions(+), 273 deletions(-) diff --git a/c/CMakeLists.txt b/c/CMakeLists.txt index 2967176..99e328b 100644 --- a/c/CMakeLists.txt +++ b/c/CMakeLists.txt @@ -340,7 +340,7 @@ if (PROACTOR STREQUAL "epoll" OR (NOT PROACTOR AND NOT BUILD_PROACTOR)) check_symbol_exists(epoll_wait "sys/epoll.h" HAVE_EPOLL) if (HAVE_EPOLL) set (PROACTOR_OK epoll) -set (qpid-proton-proactor src/proactor/epoll.c src/proactor/epoll_raw_connection.c ${qpid-proton-proactor-common}) +set (qpid-proton-proactor src/proactor/epoll.c src/proactor/epoll_raw_connection.c src/proactor/epoll_timer.c ${qpid-proton-proactor-common}) set (PROACTOR_LIBS Threads::Threads ${TIME_LIB}) endif() endif() diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h index 1b8edd3..b14b485 100644 --- a/c/src/proactor/epoll-internal.h +++ b/c/src/proactor/epoll-internal.h @@ -54,14 +54,14 @@ extern "C" { typedef struct acceptor_t acceptor_t; typedef struct tslot_t tslot_t; typedef pthread_mutex_t pmutex; +typedef struct pni_timer_t pni_timer_t; typedef enum { WAKE, /* see if any work to do in proactor/psocket context */ - PCONNECTION_IO, - PCONNECTION_TIMER, LISTENER_IO, - PROACTOR_TIMER, - RAW_CONNECTION_IO + 
PCONNECTION_IO, + RAW_CONNECTION_IO, + TIMER } epoll_type_t; // Data to use with epoll. @@ -73,19 +73,12 @@ typedef struct epoll_extended_t { pmutex barrier_mutex; } epoll_extended_t; -typedef struct ptimer_t { - pmutex mutex; - epoll_extended_t epoll_io; - bool timer_active; - bool in_doubt; // 0 or 1 callbacks are possible - bool shutting_down; -} ptimer_t; - typedef enum { PROACTOR, PCONNECTION, LISTENER, - RAW_CONNECTION + RAW_CONNECTION, + TIMER_MANAGER } pcontext_type_t; typedef struct pcontext_t { @@ -137,13 +130,24 @@ struct tslot_t { unsigned int earmark_override_gen; }; +typedef struct pni_timer_manager_t { + pcontext_t context; + epoll_extended_t epoll_timer; + pmutex deletion_mutex; + pni_timer_t *proactor_timer; + pn_list_t *timers_heap; + uint64_t timerfd_deadline; + bool sched_timeout; +} pni_timer_manager_t; + struct pn_proactor_t { pcontext_t context; - ptimer_t timer; + pni_timer_manager_t timer_manager; epoll_extended_t epoll_wake; epoll_extended_t epoll_interrupt; pn_event_batch_t batch; pcontext_t *contexts; /* track in-use contexts for PN_PROACTOR_INACTIVE and disconnect */ + pni_timer_t *timer; size_t disconnects_pending; /* unfinished proactor disconnects*/ // need_xxx flags indicate we should generate PN_PROACTOR_XXX on the next update_batch() bool need_interrupt; @@ -151,7 +155,6 @@ struct pn_proactor_t { bool need_timeout; bool timeout_set; /* timeout has been set by user and not yet cancelled or generated event */ bool timeout_processed; /* timeout event dispatched in the most recent event batch */ - bool timer_armed; /* timer is armed in epoll */ int context_count; // wake subsystem @@ -167,7 +170,6 @@ struct pn_proactor_t { pmutex overflow_mutex; // Sched vars specific to proactor context. - bool sched_timeout; bool sched_interrupt; // Global scheduling/poller vars. 
@@ -220,13 +222,12 @@ typedef struct psocket_t { typedef struct pconnection_t { psocket_t psocket; pcontext_t context; - ptimer_t timer; // TODO: review one timerfd per connection + pni_timer_t *timer; const char *host, *port; uint32_t new_events; int wake_count; // TODO: protected by context.mutex so should be moved in there (also really bool) bool server;/* accept, not connect */ bool tick_pending; - bool timer_armed; bool queued_disconnect; /* deferred from pn_proactor_disconnect() */ pn_condition_t *disconnect_condition; // Following values only changed by (sole) working context: @@ -250,7 +251,7 @@ typedef struct pconnection_t { struct addrinfo *ai; /* Current connect address */ pmutex rearm_mutex;/* protects pconne
[qpid-proton] branch master updated: NO-JIRA: Split out epoll proactor poller logic to separate routine for readability
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-proton.git The following commit(s) were added to refs/heads/master by this push: new 34ca28f NO-JIRA: Split out epoll proactor poller logic to separate routine for readability 34ca28f is described below commit 34ca28fae4a880151440c33312ca7f723441c2b2 Author: Cliff Jansen AuthorDate: Tue Oct 20 00:04:13 2020 -0700 NO-JIRA: Split out epoll proactor poller logic to separate routine for readability --- c/src/proactor/epoll.c | 292 + 1 file changed, 151 insertions(+), 141 deletions(-) diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c index 01d9db8..4c00d47 100644 --- a/c/src/proactor/epoll.c +++ b/c/src/proactor/epoll.c @@ -101,7 +101,7 @@ // Maybe futex is even better? // See other "TODO" in code. // -// Consider case of large number of wakes: proactor_do_epoll() could start by +// Consider case of large number of wakes: next_event_batch() could start by // looking for pending wakes before a kernel call to epoll_wait(), or there // could be several eventfds with random assignment of wakeables. 
@@ -691,6 +691,7 @@ static void set_pconnection(pn_connection_t* c, pconnection_t *pc) { static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool timeout, bool wake, bool topup); static void write_flush(pconnection_t *pc); static void listener_begin_close(pn_listener_t* l); +static bool poller_do_epoll(struct pn_proactor_t* p, tslot_t *ts, bool can_block); static void poller_done(struct pn_proactor_t* p, tslot_t *ts); static inline pconnection_t *psocket_pconnection(psocket_t* ps) { @@ -2580,7 +2581,7 @@ static pcontext_t *next_runnable(pn_proactor_t *p, tslot_t *ts) { return NULL; } -static pn_event_batch_t *proactor_do_epoll(pn_proactor_t* p, bool can_block) { +static pn_event_batch_t *next_event_batch(pn_proactor_t* p, bool can_block) { lock(&p->tslot_mutex); tslot_t * ts = find_tslot(p); unlock(&p->tslot_mutex); @@ -2591,9 +2592,9 @@ static pn_event_batch_t *proactor_do_epoll(pn_proactor_t* p, bool can_block) { assert(ts->state == UNUSED || ts->state == NEW); ts->state = PROCESSING; + // Process outstanding epoll events until we get a batch or need to block. while (true) { -// Process outstanding epoll events until we get a batch or need to block. - +// First see if there are any contexts waiting to run and perhaps generate new Proton events, pcontext_t *ctx = next_runnable(p, ts); if (ctx) { ts->state = BATCHING; @@ -2611,155 +2612,164 @@ static pn_event_batch_t *proactor_do_epoll(pn_proactor_t* p, bool can_block) { continue; // Long time may have passed. Back to beginning. } -// poll or wait for a runnable context +// Poll or wait for a runnable context if (p->poller == NULL) { + bool return_immediately; p->poller = ts; - // As poller with lots to do, be mindful of hogging the sched lock. Release when making kernel calls. 
- assert(p->n_runnables == 0); - if (p->thread_count > p->thread_capacity) -grow_poller_bufs(p); - p->next_runnable = 0; - p->n_warm_runnables = 0; - p->last_earmark = NULL; - - bool unfinished_earmarks = p->earmark_count > 0; - bool new_wakes = false; - bool epoll_immediate = unfinished_earmarks || !can_block; - assert(!p->sched_wake_first); - if (!epoll_immediate) { -lock(&p->eventfd_mutex); -if (p->wake_list_first) { - epoll_immediate = true; - new_wakes = true; -} else { - p->wakes_in_progress = false; -} -unlock(&p->eventfd_mutex); + // Get new epoll events (if any) and mark the relevant contexts as runnable + return_immediately = poller_do_epoll(p, ts, can_block); + p->poller = NULL; + if (return_immediately) { +// Check if another thread is available to continue epoll-ing. +tslot_t *res_ts = resume_one_thread(p); +ts->state = UNUSED; +unlock(&p->sched_mutex); +if (res_ts) resume(p, res_ts); +return NULL; } - int timeout = (epoll_immediate) ? 0 : -1; - p->poller_suspended = (timeout == -1); + poller_done(p, ts); // put suspended threads to work. +} else if (!can_block) { + ts->state = UNUSED; unlock(&p->sched_mutex); + return NULL; +} else { + // TODO: loop while !poller_suspended, since new work coming + suspend(p, ts); +} + } // while +} - int n = epoll_wait(p->epollfd, p->kevents, p->kevents_capacity, timeout); - - lock(&p->sched_mutex); - p->poller_suspended = false; +// Call with sche
[qpid-proton] branch master updated: PROTON-2277: move comment associated with moved code. grrr
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

The following commit(s) were added to refs/heads/master by this push:
     new e2d82fc  PROTON-2277: move comment associated with moved code. grrr
e2d82fc is described below

commit e2d82fc1fe05e79277fc67c53b087446238a838f
Author: Cliff Jansen
AuthorDate: Mon Sep 21 09:58:19 2020 -0700

    PROTON-2277: move comment associated with moved code. grrr
---
 c/src/proactor/epoll-internal.h | 7 +++
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h
index 56cfdda..1b8edd3 100644
--- a/c/src/proactor/epoll-internal.h
+++ b/c/src/proactor/epoll-internal.h
@@ -242,9 +242,9 @@ typedef struct pconnection_t {
   const char *wbuf_current;
   size_t wbuf_remaining;
   size_t wbuf_completed;
-  pn_event_type_t current_event_type;
-  uint32_t process_args;
-  uint32_t process_events;
+  pn_event_type_t current_event_type;  /* Sole use for debugging, i.e. crash analysis of optimized code. */
+  uint32_t process_args;               /* Sole use for debugging */
+  uint32_t process_events;             /* Sole use for debugging */
   struct pn_netaddr_t local, remote; /* Actual addresses */
   struct addrinfo *addrinfo;         /* Resolved address list */
   struct addrinfo *ai;               /* Current connect address */
@@ -252,7 +252,6 @@ typedef struct pconnection_t {
   bool io_doublecheck;               /* callbacks made and new IO may have arrived */
   bool sched_timeout;
   char addr_buf[1];
-  // For debugging help for core dumps with optimized code.
 } pconnection_t;

 /*
[qpid-proton] branch master updated: PROTON-2277: move debug variable memory to separate from net address buffer
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

The following commit(s) were added to refs/heads/master by this push:
     new 4ee924e  PROTON-2277: move debug variable memory to separate from net address buffer
4ee924e is described below

commit 4ee924eaf0616ed93c4495affbf07bd79c9565a0
Author: Cliff Jansen
AuthorDate: Mon Sep 21 09:45:14 2020 -0700

    PROTON-2277: move debug variable memory to separate from net address buffer
---
 c/src/proactor/epoll-internal.h | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h
index c0c2b68..56cfdda 100644
--- a/c/src/proactor/epoll-internal.h
+++ b/c/src/proactor/epoll-internal.h
@@ -242,6 +242,9 @@ typedef struct pconnection_t {
   const char *wbuf_current;
   size_t wbuf_remaining;
   size_t wbuf_completed;
+  pn_event_type_t current_event_type;
+  uint32_t process_args;
+  uint32_t process_events;
   struct pn_netaddr_t local, remote; /* Actual addresses */
   struct addrinfo *addrinfo; /* Resolved address list */
   struct addrinfo *ai; /* Current connect address */
@@ -250,9 +253,6 @@ typedef struct pconnection_t {
   bool sched_timeout;
   char addr_buf[1];
   // For debugging help for core dumps with optimized code.
-  pn_event_type_t current_event_type;
-  uint32_t process_args;
-  uint32_t process_events;
 } pconnection_t;
 
 /*
[qpid-proton] branch master updated: PROTON-2277: maintain some extra epoll related state to aid debugging
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-proton.git The following commit(s) were added to refs/heads/master by this push: new 31a5cd2 PROTON-2277: maintain some extra epoll related state to aid debugging 31a5cd2 is described below commit 31a5cd2e3f162cff25ee2b7b2cf54f4a93fa6aa0 Author: Cliff Jansen AuthorDate: Fri Sep 18 10:51:35 2020 -0700 PROTON-2277: maintain some extra epoll related state to aid debugging --- c/src/proactor/epoll-internal.h | 6 ++ c/src/proactor/epoll.c | 28 2 files changed, 30 insertions(+), 4 deletions(-) diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h index 78cad14..c0c2b68 100644 --- a/c/src/proactor/epoll-internal.h +++ b/c/src/proactor/epoll-internal.h @@ -190,6 +190,8 @@ struct pn_proactor_t { int earmark_count; bool earmark_drain; bool sched_wakes_pending; + // For debugging help for core dumps with optimized code. + pn_event_type_t current_event_type; // Mostly read only: after init or once thread_count stabilizes pn_collector_t *collector __attribute__((aligned(64))); @@ -247,6 +249,10 @@ typedef struct pconnection_t { bool io_doublecheck; /* callbacks made and new IO may have arrived */ bool sched_timeout; char addr_buf[1]; + // For debugging help for core dumps with optimized code. 
+ pn_event_type_t current_event_type; + uint32_t process_args; + uint32_t process_events; } pconnection_t; /* diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c index 1e693fe..01d9db8 100644 --- a/c/src/proactor/epoll.c +++ b/c/src/proactor/epoll.c @@ -1012,7 +1012,10 @@ static pn_event_t *pconnection_batch_next(pn_event_batch_t *batch) { } } } - if (e) pc->output_drained = false; + if (e) { +pc->output_drained = false; +pc->current_event_type = pn_event_type(e); + } return e; } @@ -1074,23 +1077,34 @@ static inline void pconnection_rearm(pconnection_t *pc, int wanted_now) { /* Only call when context switch is imminent. Sched lock is highly contested. */ // Call with both context and sched locks. static bool pconnection_sched_sync(pconnection_t *pc, bool *timerfd_fired) { + uint32_t sync_events = 0; + uint32_t sync_args = 0; *timerfd_fired = false; if (pc->sched_timeout) { *timerfd_fired = true;; pc->timer_armed = false; pc->sched_timeout = false; +sync_args |= (1 << 1); } if (pc->psocket.sched_io_events) { pc->new_events = pc->psocket.sched_io_events; pc->psocket.sched_io_events = 0; pc->current_arm = 0; // or outside lock? +sync_events = pc->new_events; } if (pc->context.sched_wake) { pc->context.sched_wake = false; wake_done(&pc->context); +sync_args |= 1; } pc->context.sched_pending = false; + if (sync_args || sync_events) { +// Only replace if poller has found new work for us. +pc->process_args = sync_args; +pc->process_events = sync_events; + } + // Indicate if there are free proactor threads pn_proactor_t *p = pc->context.proactor; return p->poller_suspended || p->suspend_list_head; @@ -1233,7 +1247,10 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool waking = false; bool tick_required = false; bool immediate_write = false; - + if (!topup) { +pc->process_events = events; +pc->process_args = (timeout << 1) | sched_wake; + } // Don't touch data exclusive to working thread (yet). 
if (timeout) { rearm_timer = true; @@ -2206,8 +2223,11 @@ static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) { lock(&p->context.mutex); proactor_update_batch(p); pn_event_t *e = pn_collector_next(p->collector); - if (e && pn_event_type(e) == PN_PROACTOR_TIMEOUT) -p->timeout_processed = true; + if (e) { +p->current_event_type = pn_event_type(e); +if (p->current_event_type == PN_PROACTOR_TIMEOUT) + p->timeout_processed = true; + } unlock(&p->context.mutex); return pni_log_event(p, e); } - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
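The debugging fields added in this commit pack two booleans into one word with `(timeout << 1) | sched_wake`. A minimal sketch of that packing, and of how it could be decoded when inspecting a core dump, might look like the following. The `dbg_*` names and the enum constants are hypothetical and do not appear in Proton; only the bit layout is taken from the diff.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical bit positions mirroring (timeout << 1) | sched_wake in the diff. */
enum { DBG_ARG_WAKE = 1, DBG_ARG_TIMEOUT = 2 };

/* Pack the two flags into a single word: bit 0 is wake, bit 1 is timeout. */
static uint32_t dbg_pack_args(bool timeout, bool sched_wake) {
  return ((uint32_t)timeout << 1) | (uint32_t)sched_wake;
}

/* Decode helpers, e.g. for reading a saved value back during crash analysis. */
static bool dbg_had_timeout(uint32_t args) { return (args & DBG_ARG_TIMEOUT) != 0; }
static bool dbg_had_wake(uint32_t args) { return (args & DBG_ARG_WAKE) != 0; }
```

Keeping both flags in one `uint32_t` means a single memory read in a debugger shows why the connection was last scheduled.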
[qpid-proton] branch master updated: PROTON-2226: remove assertion check that generates false positive
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

The following commit(s) were added to refs/heads/master by this push:
     new c8aba5e  PROTON-2226: remove assertion check that generates false positive
c8aba5e is described below

commit c8aba5ecb67d140aca03da3c1e20510c85bc0fdf
Author: Cliff Jansen
AuthorDate: Tue Aug 11 08:37:36 2020 -0700

    PROTON-2226: remove assertion check that generates false positive
---
 c/src/core/engine.c | 1 -
 1 file changed, 1 deletion(-)

diff --git a/c/src/core/engine.c b/c/src/core/engine.c
index e1a6610..8300e42 100644
--- a/c/src/core/engine.c
+++ b/c/src/core/engine.c
@@ -686,7 +686,6 @@ static void pni_add_work(pn_connection_t *connection, pn_delivery_t *delivery)
 {
   if (!delivery->work)
   {
-    assert(!delivery->local.settled); // never allow settled deliveries
     LL_ADD(connection, work, delivery);
     delivery->work = true;
   }
[qpid-proton] branch master updated: PROTON-2258: mark pn_work_head(), pn_work_next(), and Python equivalents as deprecated.
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-proton.git The following commit(s) were added to refs/heads/master by this push: new ee88029 PROTON-2258: mark pn_work_head(), pn_work_next(), and Python equivalents as deprecated. ee88029 is described below commit ee880294923cdf81a0f693282dde6df1e320b599 Author: Cliff Jansen AuthorDate: Thu Aug 6 17:03:03 2020 -0700 PROTON-2258: mark pn_work_head(), pn_work_next(), and Python equivalents as deprecated. --- c/include/proton/delivery.h | 2 ++ c/src/core/engine.c | 5 + python/cproton.i| 4 python/proton/_delivery.py | 4 +++- python/proton/_endpoints.py | 4 +++- 5 files changed, 17 insertions(+), 2 deletions(-) diff --git a/c/include/proton/delivery.h b/c/include/proton/delivery.h index ec034e9..118fbf6 100644 --- a/c/include/proton/delivery.h +++ b/c/include/proton/delivery.h @@ -328,6 +328,7 @@ PN_EXTERN bool pn_delivery_buffered(pn_delivery_t *delivery); * @return the first delivery object that needs to be serviced, else * NULL if none */ +PN_DEPRECATED("Use the PN_DELIVERY event to track deliveries with pending operations") PN_EXTERN pn_delivery_t *pn_work_head(pn_connection_t *connection); /** @@ -339,6 +340,7 @@ PN_EXTERN pn_delivery_t *pn_work_head(pn_connection_t *connection); * @return the next delivery that has pending operations, else * NULL if none */ +PN_DEPRECATED("Use the PN_DELIVERY event to track deliveries with pending operations") PN_EXTERN pn_delivery_t *pn_work_next(pn_delivery_t *delivery); /** diff --git a/c/src/core/engine.c b/c/src/core/engine.c index 02062f6..e1a6610 100644 --- a/c/src/core/engine.c +++ b/c/src/core/engine.c @@ -19,6 +19,9 @@ * */ +/* for pn_work_head and related deprecations */ +#define PN_USE_DEPRECATED_API 1 + #include "engine-internal.h" #include "framing.h" @@ -2407,3 +2410,5 @@ const char *pn_disposition_type_name(uint64_t d) { default: return "unknown"; } 
} + +#undef PN_USE_DEPRECATED_API diff --git a/python/cproton.i b/python/cproton.i index 0dcc2ce..2a7bff9 100644 --- a/python/cproton.i +++ b/python/cproton.i @@ -22,6 +22,10 @@ #if defined(_WIN32) && ! defined(__CYGWIN__) #include #endif + +/* TODO: Remove once pn_work_head() and related have been removed from Proton */ +#define PN_USE_DEPRECATED_API 1 + #include #include #include diff --git a/python/proton/_delivery.py b/python/proton/_delivery.py index f5c955f..0f871cf 100644 --- a/python/proton/_delivery.py +++ b/python/proton/_delivery.py @@ -445,7 +445,9 @@ class Delivery(Wrapper): @property def work_next(self): -""" +"""Deprecated: use on_message(), on_accepted(), on_rejected(), +on_released(), and on_settled() instead. + The next :class:`Delivery` on the connection that has pending operations. diff --git a/python/proton/_endpoints.py b/python/proton/_endpoints.py index 50cf677..e528de6 100644 --- a/python/proton/_endpoints.py +++ b/python/proton/_endpoints.py @@ -472,7 +472,9 @@ class Connection(Wrapper, Endpoint): @property def work_head(self): -""" +"""Deprecated: use on_message(), on_accepted(), on_rejected(), +on_released(), and on_settled() instead. + Extracts the first delivery on the connection that has pending operations. - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
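The deprecation in this commit pairs a warning attribute on the public declarations with an opt-out define for code that must keep calling the old functions (here `engine.c` and the Python binding). A minimal sketch of that general pattern follows. `MYLIB_DEPRECATED`, `MYLIB_USE_DEPRECATED_API` and `mylib_old_work_count` are hypothetical names chosen for illustration, and Proton's actual `PN_DEPRECATED` definition may differ.

```c
#include <assert.h>

/* Hypothetical opt-out switch, defined before the declarations it affects,
   analogous to the "#define PN_USE_DEPRECATED_API 1" lines in the diff. */
#define MYLIB_USE_DEPRECATED_API 1

/* Hypothetical macro: expands to a compiler warning attribute unless the
   including file has opted out. */
#if defined(MYLIB_USE_DEPRECATED_API)
#  define MYLIB_DEPRECATED(message)
#elif defined(__GNUC__)
#  define MYLIB_DEPRECATED(message) __attribute__((deprecated(message)))
#else
#  define MYLIB_DEPRECATED(message)
#endif

/* A deprecated declaration; callers without the opt-out get a warning. */
MYLIB_DEPRECATED("use the event-driven API instead")
int mylib_old_work_count(void);

int mylib_old_work_count(void) { return 0; }
```

Removing the `MYLIB_USE_DEPRECATED_API` line and compiling a caller with GCC or Clang would produce a deprecation warning at each call site, which is the effect the commit aims for in application code.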
[qpid-cpp] branch master updated: QPID-8453: fix boundary test between map8 and map32 encodings for AMQP 1.0 messages
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-cpp.git

The following commit(s) were added to refs/heads/master by this push:
     new 9be8db5  QPID-8453: fix boundary test between map8 and map32 encodings for AMQP 1.0 messages
9be8db5 is described below

commit 9be8db506b10584872dd14f8fef5cc8c53118f79
Author: Cliff Jansen
AuthorDate: Mon Jul 13 10:18:14 2020 -0700

    QPID-8453: fix boundary test between map8 and map32 encodings for AMQP 1.0 messages
---
 src/qpid/amqp/MapEncoder.cpp        | 4 +++-
 src/qpid/amqp/MapSizeCalculator.cpp | 2 +-
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/src/qpid/amqp/MapEncoder.cpp b/src/qpid/amqp/MapEncoder.cpp
index cf8ef4e..43e1954 100644
--- a/src/qpid/amqp/MapEncoder.cpp
+++ b/src/qpid/amqp/MapEncoder.cpp
@@ -116,9 +116,11 @@ void MapEncoder::handleString(const CharSequence& key, const CharSequence& value
 
 void MapEncoder::writeMetaData(size_t size, size_t count, const Descriptor* d)
 {
-    if (count > 255 || size > 255) {
+
+    if (count > 255 || (size+1) > 255) {
         writeMap32MetaData((uint32_t) size, (uint32_t) count, d);
     } else {
+        /* can use more compact format */
         writeMap8MetaData((uint8_t) size, (uint8_t) count, d);
     }
 }
diff --git a/src/qpid/amqp/MapSizeCalculator.cpp b/src/qpid/amqp/MapSizeCalculator.cpp
index 2da1521..4107df6 100644
--- a/src/qpid/amqp/MapSizeCalculator.cpp
+++ b/src/qpid/amqp/MapSizeCalculator.cpp
@@ -139,7 +139,7 @@ size_t MapSizeCalculator::getTotalSizeRequired(const Descriptor* d) const
     size_t result(size);
     if (d) result += d->getSize();
     result += 1/*typecode*/;
-    if (count * 2 > 255 || size > 255) {
+    if (count * 2 > 255 || (size+1) > 255) {
         result += 4/*size*/ + 4/*count*/;
     } else {
         result += 1/*size*/ + 1/*count*/;
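The boundary fix above changes `size > 255` to `(size+1) > 255`. A plausible reading is that the one-byte size field of the compact map encoding must also account for the one-byte count field, so the key and value data may occupy at most 254 bytes. The sketch below illustrates that check in C under this assumption; `fits_compact_map` and its constants are hypothetical names, not part of qpid-cpp.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Assumed layout of the compact form: a one-byte size field whose value
   covers the one-byte count field plus the encoded keys and values. */
enum { COMPACT_MAX = 255, COUNT_FIELD_BYTES = 1 };

/* data_size: encoded bytes of keys and values only.
   count: value to be written in the count field.
   Returns true when both fit the one-byte fields of the compact form.
   The comparison is written as a subtraction on the constant side so a
   very large data_size cannot wrap around. */
static bool fits_compact_map(size_t data_size, size_t count) {
  return count <= COMPACT_MAX &&
         data_size <= (size_t)(COMPACT_MAX - COUNT_FIELD_BYTES);
}
```

With the old test a map with exactly 255 bytes of data would have been sent in the compact form even though the size field would then need to hold 256.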
[qpid-proton] branch master updated: PROTON-2240: epoll proactor - fix cases where a fired timer is not rearmed as expected
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-proton.git The following commit(s) were added to refs/heads/master by this push: new 4a22a0d PROTON-2240: epoll proactor - fix cases where a fired timer is not rearmed as expected 4a22a0d is described below commit 4a22a0dc59016fdf64dc7ed056467a52a699460e Author: Cliff Jansen AuthorDate: Wed Jun 17 22:53:07 2020 -0700 PROTON-2240: epoll proactor - fix cases where a fired timer is not rearmed as expected --- c/src/proactor/epoll.c | 33 + 1 file changed, 25 insertions(+), 8 deletions(-) diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c index 20376e4..0f6a606 100644 --- a/c/src/proactor/epoll.c +++ b/c/src/proactor/epoll.c @@ -1054,6 +1054,15 @@ static inline bool pconnection_wclosed(pconnection_t *pc) { return pn_connection_driver_write_closed(&pc->driver); } +// Call with pc context locked. +static void pconnection_rearm_timer(pconnection_t *pc) { + if (!pc->timer_armed && !pc->timer.shutting_down && + pc->timer.epoll_io.fd >= 0 && pc->timer.epoll_io.polling) { +pc->timer_armed = true; +rearm(pc->psocket.proactor, &pc->timer.epoll_io); + } +} + /* Call only from working context (no competitor for pc->current_arm or connection driver). If true returned, caller must do pconnection_rearm(). @@ -1093,9 +1102,11 @@ static inline void pconnection_rearm(pconnection_t *pc) { /* Only call when context switch is imminent. Sched lock is highly contested. */ // Call with both context and sched locks. 
-static bool pconnection_sched_sync(pconnection_t *pc) { +static bool pconnection_sched_sync(pconnection_t *pc, bool *timerfd_fired) { + *timerfd_fired = false; if (pc->sched_timeout) { -pc->tick_pending = true; +*timerfd_fired = true;; +pc->timer_armed = false; pc->sched_timeout = false; } if (pc->psocket.sched_io_events) { @@ -1135,10 +1146,14 @@ static void pconnection_done(pconnection_t *pc) { // working context while the lock is held. Need sched_sync too to drain possible stale wake. pc->hog_count = 0; bool has_event = pconnection_has_event(pc); + bool timerfd_fired; // Do as little as possible while holding the sched lock lock(&p->sched_mutex); - pconnection_sched_sync(pc); + pconnection_sched_sync(pc, &timerfd_fired); unlock(&p->sched_mutex); + if (timerfd_fired) +if (ptimer_callback(&pc->timer) != 0) + pc->tick_pending = true; if (has_event || pconnection_work_pending(pc)) { self_wake = true; @@ -1159,6 +1174,7 @@ static void pconnection_done(pconnection_t *pc) { if (self_wake) notify = wake(&pc->context); + pconnection_rearm_timer(pc); bool rearm = pconnection_rearm_check(pc); unlock(&pc->context.mutex); @@ -1405,9 +1421,13 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, } // Never stop working while work remains. hog_count exception to this rule is elsewhere. 
+ bool timerfd_fired; lock(&pc->context.proactor->sched_mutex); - bool workers_free = pconnection_sched_sync(pc); + bool workers_free = pconnection_sched_sync(pc, &timerfd_fired); unlock(&pc->context.proactor->sched_mutex); + if (timerfd_fired) +if (ptimer_callback(&pc->timer) != 0) + pc->tick_pending = true; if (pconnection_work_pending(pc)) { goto retry; // TODO: get rid of goto without adding more locking @@ -1433,10 +1453,7 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, goto retry; } - if (!pc->timer_armed && !pc->timer.shutting_down && pc->timer.epoll_io.fd >= 0) { -pc->timer_armed = true; -rearm(pc->psocket.proactor, &pc->timer.epoll_io); - } + pconnection_rearm_timer(pc); bool rearm_pc = pconnection_rearm_check(pc); // holds rearm_mutex until pconnection_rearm() below unlock(&pc->context.mutex); - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
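The new `pconnection_rearm_timer()` helper centralises the guard that decides whether the connection timer may be rearmed, and `pconnection_sched_sync()` now clears `timer_armed` when it sees that the timer fired. A simplified sketch of that state machine follows, assuming one-shot style registration where a fired timer has to be explicitly rearmed. The `demo_*` types and functions are hypothetical stand-ins, and the counter replaces the real `rearm()` call.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical, simplified stand-in for the timer state touched in the diff. */
typedef struct {
  int fd;              /* timer descriptor, -1 once closed */
  bool polling;        /* registered with the poller */
  bool shutting_down;
  bool armed;          /* a registration is currently outstanding */
  int rearm_calls;     /* counts simulated rearm() calls */
} demo_timer_t;

/* Rearm only if not already armed and the descriptor is still usable,
   mirroring the condition in pconnection_rearm_timer(). */
static void demo_rearm_timer(demo_timer_t *t) {
  if (!t->armed && !t->shutting_down && t->fd >= 0 && t->polling) {
    t->armed = true;
    t->rearm_calls++;  /* stands in for rearm(proactor, &timer.epoll_io) */
  }
}

/* Called when the poller reports that the timer fired: the registration is
   spent, so clear the flag to let the next rearm attempt through. */
static void demo_timer_fired(demo_timer_t *t) { t->armed = false; }
```

If the flag were left set after a fire, every later call to the rearm helper would be a no-op, which matches the symptom named in the commit title.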
[qpid-proton] branch master updated: PROTON-2211: fix fd leak on connections in new epoll proactor implementation
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-proton.git The following commit(s) were added to refs/heads/master by this push: new 9e1fff5 PROTON-2211: fix fd leak on connections in new epoll proactor implementation 9e1fff5 is described below commit 9e1fff5f7998fcb171c958c10a84d72b4df7eb93 Author: Cliff Jansen AuthorDate: Thu May 7 07:33:16 2020 -0700 PROTON-2211: fix fd leak on connections in new epoll proactor implementation --- c/src/proactor/epoll.c | 4 1 file changed, 4 insertions(+) diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c index 6667365..6ae9660 100644 --- a/c/src/proactor/epoll.c +++ b/c/src/proactor/epoll.c @@ -953,7 +953,11 @@ static void pconnection_cleanup(pconnection_t *pc) { stop_polling(&pc->psocket.epoll_io, pc->psocket.proactor->epollfd); if (fd != -1) pclosefd(pc->psocket.proactor, fd); + + fd = pc->timer.epoll_io.fd; stop_polling(&pc->timer.epoll_io, pc->psocket.proactor->epollfd); + if (fd != -1) +pclosefd(pc->psocket.proactor, fd); ptimer_finalize(&pc->timer); lock(&pc->context.mutex); - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
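The leak fix saves the timer descriptor before calling `stop_polling()` and closes it afterwards. The sketch below shows why the save has to come first, under the assumption (suggested by the saved-fd pattern in the diff, not confirmed by it) that deregistering clears the stored descriptor without closing it. `demo_io_t`, `demo_stop_polling` and `demo_cleanup` are hypothetical, and plain `close()` stands in for `pclosefd()`.

```c
#include <assert.h>
#include <unistd.h>

/* Hypothetical stand-in for the structure holding the polled descriptor. */
typedef struct { int fd; } demo_io_t;

/* Assumed behaviour: deregistering forgets the descriptor but does not close it. */
static void demo_stop_polling(demo_io_t *io) { io->fd = -1; }

/* Returns 0 when the descriptor was closed (or there was none), -1 on error. */
static int demo_cleanup(demo_io_t *io) {
  int fd = io->fd;        /* save first: the next call clears io->fd */
  demo_stop_polling(io);
  return (fd != -1) ? close(fd) : 0;
}
```

Without the saved copy, the only reference to the open descriptor is lost once polling stops, so one descriptor leaks per connection.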
[qpid-proton] branch master updated: PROTON-2203: fix duplicate listener socket rearming and rationalize rearming locking
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-proton.git The following commit(s) were added to refs/heads/master by this push: new 9f1e5f0 PROTON-2203: fix duplicate listener socket rearming and rationalize rearming locking 9f1e5f0 is described below commit 9f1e5f0cd9cd1c146cee94ca4d6ff6ed4b71c139 Author: Cliff Jansen AuthorDate: Thu Apr 30 22:47:17 2020 -0700 PROTON-2203: fix duplicate listener socket rearming and rationalize rearming locking --- c/src/proactor/epoll-internal.h | 1 - c/src/proactor/epoll.c | 40 +--- 2 files changed, 13 insertions(+), 28 deletions(-) diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h index 20b01ac..fd02817 100644 --- a/c/src/proactor/epoll-internal.h +++ b/c/src/proactor/epoll-internal.h @@ -276,7 +276,6 @@ struct pn_listener_t { size_t pending_count; /* number of pending accepted connections */ size_t backlog; /* size of pending accepted array */ bool close_dispatched; - pmutex rearm_mutex; /* orders rearms/disarms, nothing else */ uint32_t sched_io_events; }; diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c index 76b5c37..6667365 100644 --- a/c/src/proactor/epoll.c +++ b/c/src/proactor/epoll.c @@ -848,15 +848,11 @@ static void proactor_rearm_overflow(pn_proactor_t *p) { assert(a->overflowed); a->overflowed = false; if (rearming) { - lock(&l->rearm_mutex); + rearm(p, &a->psocket.epoll_io); a->armed = true; } else notify = wake(&l->context); unlock(&l->context.mutex); -if (rearming) { - rearm(p, &a->psocket.epoll_io); - unlock(&l->rearm_mutex); -} if (notify) wake_notify(&l->context); a = acceptor_list_next(&ovflw); } @@ -1642,7 +1638,6 @@ pn_listener_t *pn_listener() { } pn_proactor_t *unknown = NULL; // won't know until pn_proactor_listen pcontext_init(&l->context, LISTENER, unknown); -pmutex_init(&l->rearm_mutex); } return l; } @@ -1702,11 +1697,9 @@ void 
pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in ps->epoll_io.fd = fd; ps->epoll_io.wanted = EPOLLIN; ps->epoll_io.polling = false; - lock(&l->rearm_mutex); start_polling(&ps->epoll_io, ps->proactor->epollfd); // TODO: check for error l->active_count++; acceptor->armed = true; - unlock(&l->rearm_mutex); } else { close(fd); } @@ -1745,7 +1738,6 @@ static inline bool listener_can_free(pn_listener_t *l) { static inline void listener_final_free(pn_listener_t *l) { pcontext_finalize(&l->context); - pmutex_finalize(&l->rearm_mutex); free(l->acceptors); free(l->pending_accepteds); free(l); @@ -1780,7 +1772,6 @@ static void listener_begin_close(pn_listener_t* l) { acceptor_t *a = &l->acceptors[i]; psocket_t *ps = &a->psocket; if (ps->epoll_io.fd >= 0) { -lock(&l->rearm_mutex); if (a->armed) { shutdown(ps->epoll_io.fd, SHUT_RD); // Force epoll event and callback } else { @@ -1789,7 +1780,6 @@ static void listener_begin_close(pn_listener_t* l) { ps->epoll_io.fd = -1; l->active_count--; } -unlock(&l->rearm_mutex); } } /* Close all sockets waiting for a pn_listener_accept2() */ @@ -1869,17 +1859,13 @@ static pn_event_batch_t *listener_process(pn_listener_t *l, int n_events, bool w uint32_t events = ps->working_io_events; ps->working_io_events = 0; if (l->context.closing) { - lock(&l->rearm_mutex); l->acceptors[i].armed = false; stop_polling(&ps->epoll_io, ps->proactor->epollfd); - unlock(&l->rearm_mutex); close(ps->epoll_io.fd); ps->epoll_io.fd = -1; l->active_count--; } else { - lock(&l->rearm_mutex); l->acceptors[i].armed = false; - unlock(&l->rearm_mutex); if (events & EPOLLRDHUP) { /* Calls listener_begin_close which closes all the listener's sockets */ psocket_error(ps, errno, "listener epoll"); @@ -1929,29 +1915,29 @@ static pn_event_t *listener_batch_next(pn_event_batch_t *batch) { static void listener_done(pn_listener_t *l) { pn_proactor_t *p = l->context.proactor; tslot_t *ts = l->context.runner; + lock(&l->context.mutex); // Just in case 
the app didn't accept all the pending accepts //
[qpid-proton] branch master updated: PROTON-2172: temporarily disable test pending proper epoll timers fix
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

The following commit(s) were added to refs/heads/master by this push:
     new b496997  PROTON-2172: temporarily disable test pending proper epoll timers fix
b496997 is described below

commit b496997bf7ec201e1469f5d1707f55f821bddab2
Author: Cliff Jansen
AuthorDate: Thu Apr 23 11:48:14 2020 -0700

    PROTON-2172: temporarily disable test pending proper epoll timers fix
---
 c/tests/fdlimit.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/c/tests/fdlimit.py b/c/tests/fdlimit.py
index 349ab0b..6848c68 100644
--- a/c/tests/fdlimit.py
+++ b/c/tests/fdlimit.py
@@ -51,7 +51,8 @@ class FdLimitTest(unittest.TestCase):
         if cls.devnull:
             cls.devnull.close()
 
-    @unittest.skipUnless(prlimit_available, "prlimit not available")
+    # @unittest.skipUnless(prlimit_available, "prlimit not available")
+    @unittest.skip("temporarily disabled (epoll fix pending)")
     def test_fd_limit_broker(self):
         """Check behaviour when running out of file descriptors on accept"""
         # Not too many FDs but not too few either, some are used for system purposes.
[qpid-proton] 06/09: PROTON-2130: epoll reworking: - Only keep fd in epoll_extended_t remove from ptimer_t, psocket_t - Remove backpointers and consistently use structure embedding to go from: psocket
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-proton.git commit 9e1990b1345f9917648659399f198e74fba7d5d7 Author: Andrew Stitcher AuthorDate: Thu Mar 19 23:35:36 2020 -0400 PROTON-2130: epoll reworking: - Only keep fd in epoll_extended_t remove from ptimer_t, psocket_t - Remove backpointers and consistently use structure embedding to go from: psocket->pconnection; psocket->pn_listener; psocket->acceptor; pcontext->pconnection; pcontext->pn_listener; pn_event_batch->pn_proactor; pn_batch_event->pn_listener; pn_batch_event->pconnection; - Move address string from being stored in psocket to being stored in pconnection and pn_listener - saves strings for multiple listening sockets - Rationalise post_event by switching on event types instead of the previous ad hoc event type detection. --- c/src/proactor/epoll-internal.h | 37 + c/src/proactor/epoll.c | 167 +--- 2 files changed, 107 insertions(+), 97 deletions(-) diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h index 6a13e7f..e639e48 100644 --- a/c/src/proactor/epoll-internal.h +++ b/c/src/proactor/epoll-internal.h @@ -57,7 +57,6 @@ typedef enum { // Data to use with epoll. 
typedef struct epoll_extended_t { - struct psocket_t *psocket; // pconnection, listener, or NULL -> proactor int fd; epoll_type_t type; // io/timer/wakeup uint32_t wanted; // events to poll for @@ -67,7 +66,6 @@ typedef struct epoll_extended_t { typedef struct ptimer_t { pmutex mutex; - int timerfd; epoll_extended_t epoll_io; bool timer_active; bool in_doubt; // 0 or 1 callbacks are possible @@ -131,19 +129,6 @@ struct tslot_t { unsigned int earmark_override_gen; }; -/* common to connection and listener */ -typedef struct psocket_t { - pn_proactor_t *proactor; - // Remaining protected by the pconnection/listener mutex - int sockfd; - epoll_extended_t epoll_io; - pn_listener_t *listener; /* NULL for a connection socket */ - char addr_buf[PN_MAX_ADDR]; - const char *host, *port; - uint32_t sched_io_events; - uint32_t working_io_events; -} psocket_t; - struct pn_proactor_t { pcontext_t context; ptimer_t timer; @@ -213,9 +198,21 @@ struct pn_proactor_t { bool shutting_down; }; +/* common to connection and listener */ +typedef struct psocket_t { + pn_proactor_t *proactor; + // Remaining protected by the pconnection/listener mutex + epoll_extended_t epoll_io; + uint32_t sched_io_events; + uint32_t working_io_events; +} psocket_t; + typedef struct pconnection_t { psocket_t psocket; pcontext_t context; + ptimer_t timer; // TODO: review one timerfd per connection + char addr_buf[PN_MAX_ADDR]; + const char *host, *port; uint32_t new_events; int wake_count; bool server;/* accept, not connect */ @@ -223,7 +220,6 @@ typedef struct pconnection_t { bool timer_armed; bool queued_disconnect; /* deferred from pn_proactor_disconnect() */ pn_condition_t *disconnect_condition; - ptimer_t timer; // TODO: review one timerfd per connection // Following values only changed by (sole) working context: uint32_t current_arm; // active epoll io events bool connected; @@ -256,18 +252,21 @@ typedef struct pconnection_t { struct acceptor_t{ psocket_t psocket; + struct pn_netaddr_t addr; /* 
listening address */ + pn_listener_t *listener; + acceptor_t *next; /* next listener list member */ int accepted_fd; bool armed; bool overflowed; - acceptor_t *next; /* next listener list member */ - struct pn_netaddr_t addr; /* listening address */ }; struct pn_listener_t { + pcontext_t context; acceptor_t *acceptors; /* Array of listening sockets */ size_t acceptors_size; + char addr_buf[PN_MAX_ADDR]; + const char *host, *port; int active_count; /* Number of listener sockets registered with epoll */ - pcontext_t context; pn_condition_t *condition; pn_collector_t *collector; pn_event_batch_t batch; diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c index f8ebbf4..b891133 100644 --- a/c/src/proactor/epoll.c +++ b/c/src/proactor/epoll.c @@ -177,18 +177,16 @@ static void memory_barrier(epoll_extended_t *ee) { */ static bool ptimer_init(ptimer_t *pt, struct psocket_t *ps) { - pt->timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK); pmutex_init(&pt->mutex); pt->timer_active = false; pt->in_doubt = false; pt->shutting_down = false; epoll_type_t type = ps ? PCONNECTION_TIMER : PROACTOR_TIMER; - pt->epoll_io.psocket = ps; - pt->epoll_io.fd = pt->timerfd; + pt->epoll_io.fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
[qpid-proton] 05/09: PROTON-2130: Split out structs from epoll.c to make it easier to mess with them
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-proton.git commit b860538ba698a6d6a85cc47a44e9e522d4825d47 Author: Andrew Stitcher AuthorDate: Mon Mar 16 13:09:05 2020 -0400 PROTON-2130: Split out structs from epoll.c to make it easier to mess with them --- c/src/proactor/epoll-internal.h | 288 c/src/proactor/epoll.c | 238 + 2 files changed, 289 insertions(+), 237 deletions(-) diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h new file mode 100644 index 000..6a13e7f --- /dev/null +++ b/c/src/proactor/epoll-internal.h @@ -0,0 +1,288 @@ +#ifndef PROACTOR_EPOLL_INTERNAL_H +#define PROACTOR_EPOLL_INTERNAL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +#include +#include +#include + +#include +#include +#include + + +#include +#include + +#include "netaddr-internal.h" + +#ifdef __cplusplus +extern "C" { +#endif + +//typedef struct pn_proactor_t pn_proactor_t; +//typedef struct pn_listener_t pn_listener_t; +//typedef struct pn_connection_driver_t pn_connection_driver_t; +typedef struct acceptor_t acceptor_t; +typedef struct tslot_t tslot_t; +typedef pthread_mutex_t pmutex; + +typedef enum { + WAKE, /* see if any work to do in proactor/psocket context */ + PCONNECTION_IO, + PCONNECTION_TIMER, + LISTENER_IO, + PROACTOR_TIMER +} epoll_type_t; + +// Data to use with epoll. +typedef struct epoll_extended_t { + struct psocket_t *psocket; // pconnection, listener, or NULL -> proactor + int fd; + epoll_type_t type; // io/timer/wakeup + uint32_t wanted; // events to poll for + bool polling; + pmutex barrier_mutex; +} epoll_extended_t; + +typedef struct ptimer_t { + pmutex mutex; + int timerfd; + epoll_extended_t epoll_io; + bool timer_active; + bool in_doubt; // 0 or 1 callbacks are possible + bool shutting_down; +} ptimer_t; + +typedef enum { + PROACTOR, + PCONNECTION, + LISTENER, + WAKEABLE +} pcontext_type_t; + +typedef struct pcontext_t { + pmutex mutex; + pn_proactor_t *proactor; /* Immutable */ + void *owner; /* Instance governed by the context */ + pcontext_type_t type; + bool working; + bool on_wake_list; + bool wake_pending; // unprocessed eventfd wake callback (convert to bool?) 
+ struct pcontext_t *wake_next; // wake list, guarded by proactor eventfd_mutex + bool closing; + // Next 4 are protected by the proactor mutex + struct pcontext_t* next; /* Protected by proactor.mutex */ + struct pcontext_t* prev; /* Protected by proactor.mutex */ + int disconnect_ops; /* ops remaining before disconnect complete */ + bool disconnecting; /* pn_proactor_disconnect */ + // Protected by schedule mutex + tslot_t *runner __attribute__((aligned(64))); /* designated or running thread */ + tslot_t *prev_runner; + bool sched_wake; + bool sched_pending; /* If true, one or more unseen epoll or other events to process() */ + bool runnable ; /* in need of scheduling */ +} pcontext_t; + +typedef enum { + NEW, + UNUSED, /* pn_proactor_done() called, may never come back */ + SUSPENDED, + PROCESSING, /* Hunting for a context */ + BATCHING, /* Doing work on behalf of a context */ + DELETING, + POLLING +} tslot_state; + +// Epoll proactor's concept of a worker thread provided by the application. +struct tslot_t { + pmutex mutex; // suspend and resume + pthread_cond_t cond; + unsigned int generation; + bool suspended; + volatile bool scheduled; + tslot_state state; + pcontext_t *context; + pcontext_t *prev_context; + bool earmarked; + tslot_t *suspend_list_prev; + tslot_t *suspend_list_next; + tslot_t *earmark_override; // on earmark_drain, which thread was unassigned + unsigned int earmark_override_gen; +}; + +/* common to connection and listener */ +typedef struct psocket_t { + pn_proactor_t *proactor; + // Remaining protected by the pconnection/l
[qpid-proton] 04/09: PROTON-2130: epoll proactor race/deadlock fixes
cliffjansen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit 7b000f12448a93a7aa1168fbca416a036547a92a
Author: Cliff Jansen
AuthorDate: Wed Dec 11 10:19:08 2019 -0800

    PROTON-2130: epoll proactor race/deadlock fixes
---
 c/src/proactor/epoll.c | 66 +++---
 1 file changed, 41 insertions(+), 25 deletions(-)

diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 21c611f..9390595 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -87,6 +87,7 @@
 #include
 #include
 #include
+#include
 #include "./netaddr-internal.h" /* Include after socket/inet headers */
@@ -940,6 +941,8 @@ static void write_flush(pconnection_t *pc);
 static void listener_begin_close(pn_listener_t* l);
 static void proactor_add(pcontext_t *ctx);
 static bool proactor_remove(pcontext_t *ctx);
+static void poller_done(struct pn_proactor_t* p, tslot_t *ts);
+
 static inline pconnection_t *psocket_pconnection(psocket_t* ps) {
   return ps->listener ? NULL : (pconnection_t*)ps;
@@ -2928,14 +2931,36 @@ static pn_event_batch_t *proactor_do_epoll(pn_proactor_t* p, bool can_block) {
         }
       }
-      // Create a list of available threads to put to work.
+      poller_done(p, ts); // put suspended threads to work.
+      // p->poller has been released, so a new poller may already be running.
+    } else if (!can_block) {
+      ts->state = UNUSED;
+      unlock(&p->sched_mutex);
+      return NULL;
+    } else {
+      // TODO: loop while !poller_suspended, since new work coming
+      suspend(p, ts);
+    }
+  } // while
+}
-      int resume_list_count = 0;
+// Call with sched lock, but only from poller context.
+static void poller_done(struct pn_proactor_t* p, tslot_t *ts) {
+  // Create a list of available threads to put to work.
+  // ts is the poller thread
+  int resume_list_count = 0;
+  tslot_t **resume_list2 = NULL;
+
+  if (p->suspend_list_count) {
+    int max_resumes = p->n_warm_runnables + p->n_runnables;
+    max_resumes = pn_min(max_resumes, p->suspend_list_count);
+    if (max_resumes) {
+      resume_list2 = (tslot_t **) alloca(max_resumes * sizeof(tslot_t *));
       for (int i = 0; i < p->n_warm_runnables ; i++) {
-        ctx = p->warm_runnables[i];
+        pcontext_t *ctx = p->warm_runnables[i];
         tslot_t *tsp = ctx->runner;
         if (tsp->state == SUSPENDED) {
-          p->resume_list[resume_list_count++] = tsp;
+          resume_list2[resume_list_count++] = tsp;
           LL_REMOVE(p, suspend_list, tsp);
           p->suspend_list_count--;
           tsp->state = PROCESSING;
@@ -2956,34 +2981,25 @@ static pn_event_batch_t *proactor_do_epoll(pn_proactor_t* p, bool can_block) {
       for (int i = 0; i < new_runners; i++) {
         tslot_t *tsp = p->suspend_list_head;
         assert(tsp);
-        p->resume_list[resume_list_count++] = tsp;
+        resume_list2[resume_list_count++] = tsp;
         LL_REMOVE(p, suspend_list, tsp);
         p->suspend_list_count--;
         tsp->state = PROCESSING;
       }
+    }
+  }
+  p->poller = NULL;
-      p->poller = NULL;
-      // New poller may run concurrently. Touch only this thread's stack for rest of block.
-
-      if (resume_list_count) {
-        unlock(&p->sched_mutex);
-        for (int i = 0; i < resume_list_count; i++) {
-          resume(p, p->resume_list[i]);
-        }
-        lock(&p->sched_mutex);
-      }
-    } else if (!can_block) {
-      ts->state = UNUSED;
-      unlock(&p->sched_mutex);
-      return NULL;
-    } else {
-      // TODO: loop while !poller_suspended, since new work coming
-      suspend(p, ts);
+  if (resume_list_count) {
+    // Allows a new poller to run concurrently. Touch only stack vars.
+    unlock(&p->sched_mutex);
+    for (int i = 0; i < resume_list_count; i++) {
+      resume(p, resume_list2[i]);
     }
-  } // while
+    lock(&p->sched_mutex);
+  }
 }
-
 pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
   return proactor_do_epoll(p, true);
 }
@@ -3042,12 +3058,12 @@ void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
     if (wake(&p->context))
       notify = true;
+    unlock(&p->context.mutex);
     lock(&p->sched_mutex);
     tslot_t *ts = p->context.runner;
     if (unassign_thread(ts, UNUSED))
       notify = true;
     unlock(&p->sched_mutex);
-    unlock(&p->context.mutex);
     if (notify)
       wake_notify(&p->context);
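The poller_done() change above follows a two-phase pattern: pick the threads to resume while the scheduler lock is held, then drop the lock and work only from a stack-local list. The following is a minimal sketch of that pattern, not the Proton code. All demo_* names are hypothetical, a fixed-size array stands in for the alloca() call, and setting a flag stands in for resume().

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

#define DEMO_MAX_SLOTS 8

/* Hypothetical stand-in for tslot_t: one application worker thread. */
typedef struct demo_slot_t {
  bool suspended;
  bool resumed;      /* set by the stand-in for resume() */
} demo_slot_t;

/* Hypothetical stand-in for the scheduler part of pn_proactor_t. */
typedef struct demo_sched_t {
  pthread_mutex_t sched_mutex;
  demo_slot_t *suspend_list[DEMO_MAX_SLOTS];
  int suspend_list_count;
  bool has_poller;
} demo_sched_t;

/* Call with sched_mutex held; it is held again on return.
   Returns the number of threads resumed. */
static int demo_poller_done(demo_sched_t *s, int runnable_count) {
  demo_slot_t *resume_list[DEMO_MAX_SLOTS];  /* lives on this thread's stack */
  int resume_count = 0;
  assert(s->suspend_list_count <= DEMO_MAX_SLOTS);
  int max_resumes = runnable_count < s->suspend_list_count
    ? runnable_count : s->suspend_list_count;

  /* Phase 1: edit shared scheduler state while the lock is held. */
  while (resume_count < max_resumes) {
    demo_slot_t *ts = s->suspend_list[--s->suspend_list_count];
    ts->suspended = false;
    resume_list[resume_count++] = ts;
  }
  s->has_poller = false;   /* another thread may now become the poller */

  /* Phase 2: drop the lock; use only the stack list and the slots it names. */
  if (resume_count) {
    pthread_mutex_unlock(&s->sched_mutex);
    for (int i = 0; i < resume_count; i++)
      resume_list[i]->resumed = true;   /* stand-in for resume(p, ts) */
    pthread_mutex_lock(&s->sched_mutex);
  }
  return resume_count;
}
```

Because the list is private to the calling thread, a new poller that starts as soon as has_poller is cleared cannot overwrite it, which appears to be the reason the diff replaces the shared p->resume_list with resume_list2.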
[qpid-proton] 09/09: PROTON-2130: swap include file ordering for PROTON-2195/2196, trim trailing whitespace
cliffjansen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit 72635e3e882266be5a9c479bb4cfc1fb92b9d694
Author: Cliff Jansen
AuthorDate: Wed Apr 22 22:21:21 2020 -0700

    PROTON-2130: swap include file ordering for PROTON-2195/2196, trim trailing whitespace
---
 c/src/proactor/epoll.c | 10 +++---
 1 file changed, 3 insertions(+), 7 deletions(-)

diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 2eb1ac2..baf1638 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -43,8 +43,8 @@
  already running. The context will know if it needs to put itself back on
  the wake list to be runnable later to process the pending events.
- Lock ordering - never add locks right to left:
-    context -> sched -> wake
+ Lock ordering - never add locks right to left:
+    context -> sched -> wake
     non-proactor-context -> proactor-context
     tslot -> sched
 */
@@ -57,8 +57,8 @@
 /* Avoid GNU extensions, in particular the incompatible alternative strerror_r() */
 #undef _GNU_SOURCE
-#include "epoll-internal.h"
 #include "proactor-internal.h"
+#include "epoll-internal.h"
 #include "core/engine-internal.h"
 #include "core/logger_private.h"
 #include "core/util.h"
@@ -700,9 +700,6 @@ static void proactor_add(pcontext_t *ctx);
 static bool proactor_remove(pcontext_t *ctx);
 static void poller_done(struct pn_proactor_t* p, tslot_t *ts);
-// Type safe version of containerof
-#define containerof(ptr, type, member) ((type *)((char *)(1 ? (ptr) : &((type *)0)->member) - offsetof(type, member)))
-
 static inline pconnection_t *psocket_pconnection(psocket_t* ps) {
   return ps->epoll_io.type == PCONNECTION_IO ? containerof(ps, pconnection_t, psocket) : NULL;
 }
@@ -2815,7 +2812,6 @@ pn_event_batch_t *pn_proactor_get(struct pn_proactor_t* p) {
 // Call with no locks
 static inline void check_earmark_override(pn_proactor_t *p, tslot_t *ts) {
-
   if (!ts || !ts->earmark_override)
     return;
   if (ts->earmark_override->generation == ts->earmark_override_gen) {
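The containerof() macro that this commit deletes from epoll.c (presumably because one of the reordered headers now supplies it) recovers a pointer to an enclosing struct from a pointer to an embedded member. A small sketch of how psocket_pconnection() relies on it is shown here. The macro text is copied from the diff; the demo_* types are simplified, hypothetical stand-ins for psocket_t and pconnection_t.

```c
#include <assert.h>
#include <stddef.h>

/* Type safe version of containerof, as quoted in the diff. The unevaluated
   branch of the conditional makes the compiler check that ptr has the same
   pointer type as the named member. */
#define containerof(ptr, type, member) \
  ((type *)((char *)(1 ? (ptr) : &((type *)0)->member) - offsetof(type, member)))

/* Hypothetical, simplified stand-ins for the proactor structs. */
typedef enum { DEMO_PCONNECTION_IO, DEMO_LISTENER_IO } demo_type_t;

typedef struct demo_psocket_t {
  demo_type_t type;
} demo_psocket_t;

typedef struct demo_pconnection_t {
  int id;
  demo_psocket_t psocket;   /* embedded member, so no back-pointer is stored */
} demo_pconnection_t;

/* Mirrors the shape of psocket_pconnection(): NULL unless the socket is
   embedded in a connection. */
static demo_pconnection_t *demo_psocket_pconnection(demo_psocket_t *ps) {
  assert(ps != NULL);
  return ps->type == DEMO_PCONNECTION_IO
    ? containerof(ps, demo_pconnection_t, psocket) : NULL;
}
```

Passing a pointer of the wrong type as the first argument produces a compiler diagnostic for the mismatched conditional operands, which a plain cast-based containerof would not.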
[qpid-proton] 03/09: PROTON-2130: epoll proactor io bytes accounting fix for shutdown and error
cliffjansen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit 664e436e36267ce3e7354c8de0949dacfb13d485
Author: Cliff Jansen
AuthorDate: Mon Dec 2 09:42:59 2019 -0800

    PROTON-2130: epoll proactor io bytes accounting fix for shutdown and error
---
 c/src/proactor/epoll.c | 4
 1 file changed, 4 insertions(+)

diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 1283d91..21c611f 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -1417,6 +1417,7 @@ static bool pconnection_write(pconnection_t *pc) {
   } else if (errno == EWOULDBLOCK) {
     pc->write_blocked = true;
   } else if (!(errno == EAGAIN || errno == EINTR)) {
+    pc->wbuf_remaining = 0;
     return false;
   }
   return true;
@@ -1586,6 +1587,7 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
               pc->read_blocked = true;
           }
           else if (n == 0) {
+            pc->read_blocked = true;
             pn_connection_driver_read_close(&pc->driver);
           }
           else if (errno == EWOULDBLOCK)
@@ -1594,6 +1596,8 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
             psocket_error(&pc->psocket, errno, pc->disconnected ? "disconnected" : "on read from");
           }
         }
+      } else {
+        pc->read_blocked = true;
       }
       if (tick_required) {
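The first hunk above changes how a failed write is accounted for: on a hard error the count of unwritten bytes is cleared before returning false. A rough sketch of that classification follows. The three error branches mirror the diff; the success branch, the demo_* names, and passing the result and errno in as parameters are assumptions made so the logic can be exercised without a socket.

```c
#include <assert.h>
#include <errno.h>
#include <stdbool.h>
#include <stddef.h>
#include <sys/types.h>

/* Hypothetical stand-in for the write-side fields of pconnection_t. */
typedef struct demo_conn_t {
  size_t wbuf_remaining;   /* bytes still waiting to be written */
  bool write_blocked;
} demo_conn_t;

/* n is the return value of a write/send call, err the errno it left behind.
   Returns false only when the connection should stop writing for good. */
static bool demo_after_write(demo_conn_t *c, ssize_t n, int err) {
  assert(c != NULL);
  if (n >= 0) {
    /* Assumed success path: consume what was sent, never underflowing. */
    size_t sent = (size_t)n;
    c->wbuf_remaining -= sent < c->wbuf_remaining ? sent : c->wbuf_remaining;
  } else if (err == EWOULDBLOCK) {
    c->write_blocked = true;           /* wait for the next writable event */
  } else if (!(err == EAGAIN || err == EINTR)) {
    c->wbuf_remaining = 0;             /* hard error: nothing left to write */
    return false;
  }
  return true;
}
```

Clearing wbuf_remaining on the hard-error path means later checks for pending output no longer see bytes that can never be sent, which seems to be the accounting problem named in the commit subject.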
[qpid-proton] branch master updated (b569a4f -> 72635e3)
cliffjansen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git.

    from b569a4f  PROTON-2196: Small proactor tidy ups
     new 667f78c  PROTON-2130: epoll proactor changed to use serialized calls to epoll_wait for multiple events
     new b3d1004  PROTON-2130: epoll proactor: fix unwritten output bytes, pick up PROTON-2030 and PROTON-2131
     new 664e436  PROTON-2130: epoll proactor io bytes accounting fix for shutdown and error
     new 7b000f1  PROTON-2130: epoll proactor race/deadlock fixes
     new b860538  PROTON-2130: Split out structs from epoll.c to make it easier to mess with them
     new 9e1990b  PROTON-2130: epoll reworking: - Only keep fd in epoll_extended_t remove from ptimer_t, psocket_t - Remove backpointers and consistently use structure embedding to go from: psocket->pconnection; psocket->pn_listener; psocket->acceptor; pcontext->pconnection; pcontext->pn_listener; pn_event_batch->pn_proactor; pn_batch_event->pn_listener; pn_batch_event->pconnection; - Move address string from being stored in psocket to being stored in pconnection and pn_ [...]
     new ce6d021  PROTON-2130: Substantially reduce memory use for proactor connections
     new 4281367  PROTON-2130: more epoll reworking: - Rework queued accepts so that we do multiple at once - This should allow app to accept new connections a little more efficiently. - We limit the number of accepted connections to the specified backlog - If the app doesn't accept all the connections in a single batch we don't rearm the listener until they do, as a form of accept flow control.
     new 72635e3  PROTON-2130: swap include file ordering for PROTON-2195/2196, trim trailing whitespace

The 9 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.

Summary of changes:
 c/src/proactor/epoll-internal.h |  286 ++
 c/src/proactor/epoll.c          | 1838 +++
 c/tests/proactor_test.cpp       |    2 +-
 3 files changed, 1554 insertions(+), 572 deletions(-)
 create mode 100644 c/src/proactor/epoll-internal.h