This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
commit 47b958f271ea0637046c9b85de4690bd4dbebb1d Author: Clifford Jansen <cliffjan...@apache.org> AuthorDate: Thu Mar 23 17:05:49 2023 -0700 PROTON-2673: epoll raw connections - allow delivery of wake events prior to successful connect. --- c/src/proactor/epoll_raw_connection.c | 34 +++++++++++++++++--------------- c/src/proactor/raw_connection-internal.h | 2 ++ c/src/proactor/raw_connection.c | 12 ++++++++++- 3 files changed, 31 insertions(+), 17 deletions(-) diff --git a/c/src/proactor/epoll_raw_connection.c b/c/src/proactor/epoll_raw_connection.c index cb61d6a75..56bebee85 100644 --- a/c/src/proactor/epoll_raw_connection.c +++ b/c/src/proactor/epoll_raw_connection.c @@ -367,7 +367,18 @@ static void set_error(pn_raw_connection_t *conn, const char *msg, int err) { pn_event_batch_t *pni_raw_connection_process(task_t *t, uint32_t io_events, bool sched_ready) { praw_connection_t *rc = containerof(t, praw_connection_t, task); + bool task_wake = false; + bool can_wake = pni_raw_can_wake(&rc->raw_connection); lock(&rc->task.mutex); + t->working = true; + if (sched_ready) + schedule_done(t); + if (pni_task_wake_pending(&rc->task)) { + if (can_wake) + task_wake = true; // batch_next() will complete the task wake. + else + pni_task_wake_done(&rc->task); // Complete task wake without event. + } int events = io_events; int fd = rc->psocket.epoll_io.fd; if (!rc->connected) { @@ -381,26 +392,17 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t, uint32_t io_events, bool return &rc->batch; } if (events & (EPOLLHUP | EPOLLERR)) { + // A wake can be the first event. Otherwise, wait for connection to complete. + bool event_pending = task_wake || pni_raw_wake_is_pending(&rc->raw_connection) || pn_collector_peek(rc->raw_connection.collector); + t->working = event_pending; unlock(&rc->task.mutex); - return NULL; + return event_pending ? &rc->batch : NULL; } - praw_connection_connected_lh(rc); + if (events & EPOLLOUT) + praw_connection_connected_lh(rc); } unlock(&rc->task.mutex); - bool wake = false; - lock(&t->mutex); - t->working = true; - if (sched_ready) { - schedule_done(t); - if (pni_task_wake_pending(&rc->task)) { - wake = true; - pni_task_wake_done(&rc->task); - } - } - unlock(&t->mutex); - - if (wake) pni_raw_wake(&rc->raw_connection); if (events & EPOLLIN) pni_raw_read(&rc->raw_connection, fd, rcv, set_error); if (events & EPOLLOUT) pni_raw_write(&rc->raw_connection, fd, snd, set_error); rc->batch_empty = false; @@ -417,7 +419,7 @@ void pni_raw_connection_done(praw_connection_t *rc) { if (pn_collector_peek(rc->raw_connection.collector)) have_event = true; else { - pn_event_t *e = pni_raw_event_next(&rc->raw_connection); + pn_event_t *e = pni_raw_batch_next(&rc->batch); // State machine up to date. if (e) { have_event = true; diff --git a/c/src/proactor/raw_connection-internal.h b/c/src/proactor/raw_connection-internal.h index 218bf2b2d..47b0ea925 100644 --- a/c/src/proactor/raw_connection-internal.h +++ b/c/src/proactor/raw_connection-internal.h @@ -126,6 +126,8 @@ bool pni_raw_validate(pn_raw_connection_t *conn); void pni_raw_connected(pn_raw_connection_t *conn); void pni_raw_connect_failed(pn_raw_connection_t *conn); void pni_raw_wake(pn_raw_connection_t *conn); +bool pni_raw_wake_is_pending(pn_raw_connection_t *conn); +bool pni_raw_can_wake(pn_raw_connection_t *conn); void pni_raw_close(pn_raw_connection_t *conn); void pni_raw_read_close(pn_raw_connection_t *conn); void pni_raw_write_close(pn_raw_connection_t *conn); diff --git a/c/src/proactor/raw_connection.c b/c/src/proactor/raw_connection.c index a7aa21d11..fd633a284 100644 --- a/c/src/proactor/raw_connection.c +++ b/c/src/proactor/raw_connection.c @@ -475,7 +475,17 @@ void pni_raw_connect_failed(pn_raw_connection_t *conn) { } void pni_raw_wake(pn_raw_connection_t *conn) { - conn->wakepending = true; + if (conn->disconnect_state != disc_fini) + conn->wakepending = true; +} + +bool pni_raw_wake_is_pending(pn_raw_connection_t *conn) { + return conn->wakepending; +} + +bool pni_raw_can_wake(pn_raw_connection_t *conn) { + // True if DISCONNECTED event has not yet been extracted from the batch. + return (conn->disconnect_state != disc_fini); } void pni_raw_read(pn_raw_connection_t *conn, int sock, long (*recv)(int, void*, size_t), void(*set_error)(pn_raw_connection_t *, const char *, int)) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org