[qpid-proton] 03/09: PROTON-2130: epoll proactor io bytes accounting fix for shutdown and error
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-proton.git commit 664e436e36267ce3e7354c8de0949dacfb13d485 Author: Cliff Jansen AuthorDate: Mon Dec 2 09:42:59 2019 -0800 PROTON-2130: epoll proactor io bytes accounting fix for shutdown and error --- c/src/proactor/epoll.c | 4 1 file changed, 4 insertions(+) diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c index 1283d91..21c611f 100644 --- a/c/src/proactor/epoll.c +++ b/c/src/proactor/epoll.c @@ -1417,6 +1417,7 @@ static bool pconnection_write(pconnection_t *pc) { } else if (errno == EWOULDBLOCK) { pc->write_blocked = true; } else if (!(errno == EAGAIN || errno == EINTR)) { +pc->wbuf_remaining = 0; return false; } return true; @@ -1586,6 +1587,7 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, pc->read_blocked = true; } else if (n == 0) { +pc->read_blocked = true; pn_connection_driver_read_close(&pc->driver); } else if (errno == EWOULDBLOCK) @@ -1594,6 +1596,8 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, psocket_error(&pc->psocket, errno, pc->disconnected ? "disconnected" : "on read from"); } } + } else { +pc->read_blocked = true; } if (tick_required) { - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-proton] 06/09: PROTON-2130: epoll reworking: - Only keep fd in epoll_extended_t remove from ptimer_t, psocket_t - Remove backpointers and consistently use structure embedding to go from: psocket
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-proton.git commit 9e1990b1345f9917648659399f198e74fba7d5d7 Author: Andrew Stitcher AuthorDate: Thu Mar 19 23:35:36 2020 -0400 PROTON-2130: epoll reworking: - Only keep fd in epoll_extended_t remove from ptimer_t, psocket_t - Remove backpointers and consistently use structure embedding to go from: psocket->pconnection; psocket->pn_listener; psocket->acceptor; pcontext->pconnection; pcontext->pn_listener; pn_event_batch->pn_proactor; pn_batch_event->pn_listener; pn_batch_event->pconnection; - Move address string from being stored in psocket to being stored in pconnection and pn_listener - saves strings for multiple listening sockets - Rationalise post_event by switching on event types instead of the previous ad hoc event type detection. --- c/src/proactor/epoll-internal.h | 37 + c/src/proactor/epoll.c | 167 +--- 2 files changed, 107 insertions(+), 97 deletions(-) diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h index 6a13e7f..e639e48 100644 --- a/c/src/proactor/epoll-internal.h +++ b/c/src/proactor/epoll-internal.h @@ -57,7 +57,6 @@ typedef enum { // Data to use with epoll. 
typedef struct epoll_extended_t { - struct psocket_t *psocket; // pconnection, listener, or NULL -> proactor int fd; epoll_type_t type; // io/timer/wakeup uint32_t wanted; // events to poll for @@ -67,7 +66,6 @@ typedef struct epoll_extended_t { typedef struct ptimer_t { pmutex mutex; - int timerfd; epoll_extended_t epoll_io; bool timer_active; bool in_doubt; // 0 or 1 callbacks are possible @@ -131,19 +129,6 @@ struct tslot_t { unsigned int earmark_override_gen; }; -/* common to connection and listener */ -typedef struct psocket_t { - pn_proactor_t *proactor; - // Remaining protected by the pconnection/listener mutex - int sockfd; - epoll_extended_t epoll_io; - pn_listener_t *listener; /* NULL for a connection socket */ - char addr_buf[PN_MAX_ADDR]; - const char *host, *port; - uint32_t sched_io_events; - uint32_t working_io_events; -} psocket_t; - struct pn_proactor_t { pcontext_t context; ptimer_t timer; @@ -213,9 +198,21 @@ struct pn_proactor_t { bool shutting_down; }; +/* common to connection and listener */ +typedef struct psocket_t { + pn_proactor_t *proactor; + // Remaining protected by the pconnection/listener mutex + epoll_extended_t epoll_io; + uint32_t sched_io_events; + uint32_t working_io_events; +} psocket_t; + typedef struct pconnection_t { psocket_t psocket; pcontext_t context; + ptimer_t timer; // TODO: review one timerfd per connection + char addr_buf[PN_MAX_ADDR]; + const char *host, *port; uint32_t new_events; int wake_count; bool server;/* accept, not connect */ @@ -223,7 +220,6 @@ typedef struct pconnection_t { bool timer_armed; bool queued_disconnect; /* deferred from pn_proactor_disconnect() */ pn_condition_t *disconnect_condition; - ptimer_t timer; // TODO: review one timerfd per connection // Following values only changed by (sole) working context: uint32_t current_arm; // active epoll io events bool connected; @@ -256,18 +252,21 @@ typedef struct pconnection_t { struct acceptor_t{ psocket_t psocket; + struct pn_netaddr_t addr; /* 
listening address */ + pn_listener_t *listener; + acceptor_t *next; /* next listener list member */ int accepted_fd; bool armed; bool overflowed; - acceptor_t *next; /* next listener list member */ - struct pn_netaddr_t addr; /* listening address */ }; struct pn_listener_t { + pcontext_t context; acceptor_t *acceptors; /* Array of listening sockets */ size_t acceptors_size; + char addr_buf[PN_MAX_ADDR]; + const char *host, *port; int active_count; /* Number of listener sockets registered with epoll */ - pcontext_t context; pn_condition_t *condition; pn_collector_t *collector; pn_event_batch_t batch; diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c index f8ebbf4..b891133 100644 --- a/c/src/proactor/epoll.c +++ b/c/src/proactor/epoll.c @@ -177,18 +177,16 @@ static void memory_barrier(epoll_extended_t *ee) { */ static bool ptimer_init(ptimer_t *pt, struct psocket_t *ps) { - pt->timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK); pmutex_init(>mutex); pt->timer_active = false; pt->in_doubt = false; pt->shutting_down = false; epoll_type_t type = ps ? PCONNECTION_TIMER : PROACTOR_TIMER; - pt->epoll_io.psocket = ps; - pt->epoll_io.fd = pt->timerfd; + pt->epoll_io.fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK); pt->epoll_io.type = type; pt->epoll_io.wanted = EPOLLIN;
[qpid-proton] 05/09: PROTON-2130: Split out structs from epoll.c to make it easier to mess with them
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-proton.git commit b860538ba698a6d6a85cc47a44e9e522d4825d47 Author: Andrew Stitcher AuthorDate: Mon Mar 16 13:09:05 2020 -0400 PROTON-2130: Split out structs from epoll.c to make it easier to mess with them --- c/src/proactor/epoll-internal.h | 288 c/src/proactor/epoll.c | 238 + 2 files changed, 289 insertions(+), 237 deletions(-) diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h new file mode 100644 index 000..6a13e7f --- /dev/null +++ b/c/src/proactor/epoll-internal.h @@ -0,0 +1,288 @@ +#ifndef PROACTOR_EPOLL_INTERNAL_H +#define PROACTOR_EPOLL_INTERNAL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +#include +#include +#include + +#include +#include +#include + + +#include +#include + +#include "netaddr-internal.h" + +#ifdef __cplusplus +extern "C" { +#endif + +//typedef struct pn_proactor_t pn_proactor_t; +//typedef struct pn_listener_t pn_listener_t; +//typedef struct pn_connection_driver_t pn_connection_driver_t; +typedef struct acceptor_t acceptor_t; +typedef struct tslot_t tslot_t; +typedef pthread_mutex_t pmutex; + +typedef enum { + WAKE, /* see if any work to do in proactor/psocket context */ + PCONNECTION_IO, + PCONNECTION_TIMER, + LISTENER_IO, + PROACTOR_TIMER +} epoll_type_t; + +// Data to use with epoll. +typedef struct epoll_extended_t { + struct psocket_t *psocket; // pconnection, listener, or NULL -> proactor + int fd; + epoll_type_t type; // io/timer/wakeup + uint32_t wanted; // events to poll for + bool polling; + pmutex barrier_mutex; +} epoll_extended_t; + +typedef struct ptimer_t { + pmutex mutex; + int timerfd; + epoll_extended_t epoll_io; + bool timer_active; + bool in_doubt; // 0 or 1 callbacks are possible + bool shutting_down; +} ptimer_t; + +typedef enum { + PROACTOR, + PCONNECTION, + LISTENER, + WAKEABLE +} pcontext_type_t; + +typedef struct pcontext_t { + pmutex mutex; + pn_proactor_t *proactor; /* Immutable */ + void *owner; /* Instance governed by the context */ + pcontext_type_t type; + bool working; + bool on_wake_list; + bool wake_pending; // unprocessed eventfd wake callback (convert to bool?) 
+ struct pcontext_t *wake_next; // wake list, guarded by proactor eventfd_mutex + bool closing; + // Next 4 are protected by the proactor mutex + struct pcontext_t* next; /* Protected by proactor.mutex */ + struct pcontext_t* prev; /* Protected by proactor.mutex */ + int disconnect_ops; /* ops remaining before disconnect complete */ + bool disconnecting; /* pn_proactor_disconnect */ + // Protected by schedule mutex + tslot_t *runner __attribute__((aligned(64))); /* designated or running thread */ + tslot_t *prev_runner; + bool sched_wake; + bool sched_pending; /* If true, one or more unseen epoll or other events to process() */ + bool runnable ; /* in need of scheduling */ +} pcontext_t; + +typedef enum { + NEW, + UNUSED, /* pn_proactor_done() called, may never come back */ + SUSPENDED, + PROCESSING, /* Hunting for a context */ + BATCHING, /* Doing work on behalf of a context */ + DELETING, + POLLING +} tslot_state; + +// Epoll proactor's concept of a worker thread provided by the application. +struct tslot_t { + pmutex mutex; // suspend and resume + pthread_cond_t cond; + unsigned int generation; + bool suspended; + volatile bool scheduled; + tslot_state state; + pcontext_t *context; + pcontext_t *prev_context; + bool earmarked; + tslot_t *suspend_list_prev; + tslot_t *suspend_list_next; + tslot_t *earmark_override; // on earmark_drain, which thread was unassigned + unsigned int earmark_override_gen; +}; + +/* common to connection and listener */ +typedef struct psocket_t { + pn_proactor_t *proactor; + // Remaining protected by the pconnection/listener mutex + int sockfd; + epoll_extended_t
[qpid-proton] 04/09: PROTON-2130: epoll proactor race/deadlock fixes
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-proton.git commit 7b000f12448a93a7aa1168fbca416a036547a92a Author: Cliff Jansen AuthorDate: Wed Dec 11 10:19:08 2019 -0800 PROTON-2130: epoll proactor race/deadlock fixes --- c/src/proactor/epoll.c | 66 +++--- 1 file changed, 41 insertions(+), 25 deletions(-) diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c index 21c611f..9390595 100644 --- a/c/src/proactor/epoll.c +++ b/c/src/proactor/epoll.c @@ -87,6 +87,7 @@ #include #include #include +#include #include "./netaddr-internal.h" /* Include after socket/inet headers */ @@ -940,6 +941,8 @@ static void write_flush(pconnection_t *pc); static void listener_begin_close(pn_listener_t* l); static void proactor_add(pcontext_t *ctx); static bool proactor_remove(pcontext_t *ctx); +static void poller_done(struct pn_proactor_t* p, tslot_t *ts); + static inline pconnection_t *psocket_pconnection(psocket_t* ps) { return ps->listener ? NULL : (pconnection_t*)ps; @@ -2928,14 +2931,36 @@ static pn_event_batch_t *proactor_do_epoll(pn_proactor_t* p, bool can_block) { } } - // Create a list of available threads to put to work. + poller_done(p, ts); // put suspended threads to work. + // p->poller has been released, so a new poller may already be running. +} else if (!can_block) { + ts->state = UNUSED; + unlock(>sched_mutex); + return NULL; +} else { + // TODO: loop while !poller_suspended, since new work coming + suspend(p, ts); +} + } // while +} - int resume_list_count = 0; +// Call with sched lock, but only from poller context. +static void poller_done(struct pn_proactor_t* p, tslot_t *ts) { + // Create a list of available threads to put to work. 
+ // ts is the poller thread + int resume_list_count = 0; + tslot_t **resume_list2 = NULL; + + if (p->suspend_list_count) { +int max_resumes = p->n_warm_runnables + p->n_runnables; +max_resumes = pn_min(max_resumes, p->suspend_list_count); +if (max_resumes) { + resume_list2 = (tslot_t **) alloca(max_resumes * sizeof(tslot_t *)); for (int i = 0; i < p->n_warm_runnables ; i++) { -ctx = p->warm_runnables[i]; +pcontext_t *ctx = p->warm_runnables[i]; tslot_t *tsp = ctx->runner; if (tsp->state == SUSPENDED) { - p->resume_list[resume_list_count++] = tsp; + resume_list2[resume_list_count++] = tsp; LL_REMOVE(p, suspend_list, tsp); p->suspend_list_count--; tsp->state = PROCESSING; @@ -2956,34 +2981,25 @@ static pn_event_batch_t *proactor_do_epoll(pn_proactor_t* p, bool can_block) { for (int i = 0; i < new_runners; i++) { tslot_t *tsp = p->suspend_list_head; assert(tsp); -p->resume_list[resume_list_count++] = tsp; +resume_list2[resume_list_count++] = tsp; LL_REMOVE(p, suspend_list, tsp); p->suspend_list_count--; tsp->state = PROCESSING; } +} + } + p->poller = NULL; - p->poller = NULL; - // New poller may run concurrently. Touch only this thread's stack for rest of block. - - if (resume_list_count) { -unlock(>sched_mutex); -for (int i = 0; i < resume_list_count; i++) { - resume(p, p->resume_list[i]); -} -lock(>sched_mutex); - } -} else if (!can_block) { - ts->state = UNUSED; - unlock(>sched_mutex); - return NULL; -} else { - // TODO: loop while !poller_suspended, since new work coming - suspend(p, ts); + if (resume_list_count) { +// Allows a new poller to run concurrently. Touch only stack vars. 
+unlock(&p->sched_mutex); +for (int i = 0; i < resume_list_count; i++) { + resume(p, resume_list2[i]); } - } // while +lock(&p->sched_mutex); + } } - pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) { return proactor_do_epoll(p, true); } @@ -3042,12 +3058,12 @@ void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) { if (wake(&p->context)) notify = true; +unlock(&p->context.mutex); lock(&p->sched_mutex); tslot_t *ts = p->context.runner; if (unassign_thread(ts, UNUSED)) notify = true; unlock(&p->sched_mutex); -unlock(&p->context.mutex); if (notify) wake_notify(&p->context); - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-proton] 09/09: PROTON-2130: swap include file ordering for PROTON-2195/2196, trim trailing whitespace
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-proton.git commit 72635e3e882266be5a9c479bb4cfc1fb92b9d694 Author: Cliff Jansen AuthorDate: Wed Apr 22 22:21:21 2020 -0700 PROTON-2130: swap include file ordering for PROTON-2195/2196, trim trailing whitespace --- c/src/proactor/epoll.c | 10 +++--- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c index 2eb1ac2..baf1638 100644 --- a/c/src/proactor/epoll.c +++ b/c/src/proactor/epoll.c @@ -43,8 +43,8 @@ already running. The context will know if it needs to put itself back on the wake list to be runnable later to process the pending events. - Lock ordering - never add locks right to left: -context -> sched -> wake + Lock ordering - never add locks right to left: +context -> sched -> wake non-proactor-context -> proactor-context tslot -> sched */ @@ -57,8 +57,8 @@ /* Avoid GNU extensions, in particular the incompatible alternative strerror_r() */ #undef _GNU_SOURCE -#include "epoll-internal.h" #include "proactor-internal.h" +#include "epoll-internal.h" #include "core/engine-internal.h" #include "core/logger_private.h" #include "core/util.h" @@ -700,9 +700,6 @@ static void proactor_add(pcontext_t *ctx); static bool proactor_remove(pcontext_t *ctx); static void poller_done(struct pn_proactor_t* p, tslot_t *ts); -// Type safe version of containerof -#define containerof(ptr, type, member) ((type *)((char *)(1 ? (ptr) : &((type *)0)->member) - offsetof(type, member))) - static inline pconnection_t *psocket_pconnection(psocket_t* ps) { return ps->epoll_io.type == PCONNECTION_IO ? 
containerof(ps, pconnection_t, psocket) : NULL; } @@ -2815,7 +2812,6 @@ pn_event_batch_t *pn_proactor_get(struct pn_proactor_t* p) { // Call with no locks static inline void check_earmark_override(pn_proactor_t *p, tslot_t *ts) { - if (!ts || !ts->earmark_override) return; if (ts->earmark_override->generation == ts->earmark_override_gen) { - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-proton] branch master updated (b569a4f -> 72635e3)
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/qpid-proton.git. from b569a4f PROTON-2196: Small proactor tidy ups new 667f78c PROTON-2130: epoll proactor changed to use serialized calls to epoll_wait for multiple events new b3d1004 PROTON-2130: epoll proactor: fix unwritten output bytes, pick up PROTON-2030 and PROTON-2131 new 664e436 PROTON-2130: epoll proactor io bytes accounting fix for shutdown and error new 7b000f1 PROTON-2130: epoll proactor race/deadlock fixes new b860538 PROTON-2130: Split out structs from epoll.c to make it easier to mess with them new 9e1990b PROTON-2130: epoll reworking: - Only keep fd in epoll_extended_t remove from ptimer_t, psocket_t - Remove backpointers and consistently use structure embedding to go from: psocket->pconnection; psocket->pn_listener; psocket->acceptor; pcontext->pconnection; pcontext->pn_listener; pn_event_batch->pn_proactor; pn_batch_event->pn_listener; pn_batch_event->pconnection; - Move address string from being stored in psocket to being stored in pconnection and pn_ [...] new ce6d021 PROTON-2130: Substantially reduce memory use for proactor connections new 4281367 PROTON-2130: more epoll reworking: - Rework queued accepts so that we do multiple at once - This should allow app to accept new connections a little more efficiently. - We limit the number of accepted connections to the specified backlog - If the app doesn't accept all the connections in a single batch we don't rearm the listener until they do, as a form of accept flow control. new 72635e3 PROTON-2130: swap include file ordering for PROTON-2195/2196, trim trailing whitespace The 9 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. 
Summary of changes: c/src/proactor/epoll-internal.h | 286 ++ c/src/proactor/epoll.c | 1838 +++ c/tests/proactor_test.cpp |2 +- 3 files changed, 1554 insertions(+), 572 deletions(-) create mode 100644 c/src/proactor/epoll-internal.h - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-proton] 01/09: PROTON-2130: epoll proactor changed to use serialized calls to epoll_wait for multiple events
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-proton.git commit 667f78c6d2ac17256ea67da54ff5703d424328f1 Author: Cliff Jansen AuthorDate: Thu Nov 7 00:55:50 2019 -0800 PROTON-2130: epoll proactor changed to use serialized calls to epoll_wait for multiple events --- c/src/proactor/epoll.c| 1627 ++--- c/tests/proactor_test.cpp |2 +- 2 files changed, 1258 insertions(+), 371 deletions(-) diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c index 5cc3b65..d0664d9 100644 --- a/c/src/proactor/epoll.c +++ b/c/src/proactor/epoll.c @@ -19,6 +19,37 @@ * */ +/* + The epoll proactor works with multiple concurrent threads. If there is no work to do, + one thread temporarily becomes the poller thread, while other inbound threads suspend + waiting for work. The poller calls epoll_wait(), generates work lists, resumes suspended + threads, and grabs work for itself or polls again. + + A serialized grouping of Proton events is a context (connection, listener, proactor). + Each has multiple pollable fds that make it schedulable. E.g. a connection could have a + socket fd, timerfd, and (indirect) eventfd all signaled in a single epoll_wait(). + + At the conclusion of each + N = epoll_wait(..., N_MAX, timeout) + + there will be N epoll events and M wakes on the wake list. M can be very large in a + server with many active connections. The poller makes the contexts "runnable" if they are + not already running. A running context can only be made runnable once until it completes + a chunk of work and calls unassign_thread(). (N + M - duplicates) contexts will be + scheduled. A new poller will occur when next_runnable() returns NULL. + + A running context, before it stops "working" must check to see if there were new incoming + events that the poller posted to the context, but could not make it runnable since it was + already running. 
The context will know if it needs to put itself back on the wake list + to be runnable later to process the pending events. + + Lock ordering - never add locks right to left: +context -> sched -> wake +non-proactor-context -> proactor-context +tslot -> sched + */ + + /* Enable POSIX features beyond c99 for modern pthread and standard strerror_r() */ #ifndef _POSIX_C_SOURCE #define _POSIX_C_SOURCE 200809L @@ -26,8 +57,9 @@ /* Avoid GNU extensions, in particular the incompatible alternative strerror_r() */ #undef _GNU_SOURCE -#include "core/logger_private.h" #include "proactor-internal.h" +#include "core/logger_private.h" +#include "core/util.h" #include #include @@ -92,9 +124,7 @@ static void pstrerror(int err, strerrorbuf msg) { // // In general all locks to be held singly and shortly (possibly as spin locks). -// Exception: psockets+proactor for pn_proactor_disconnect (convention: acquire -// psocket first to avoid deadlock). TODO: revisit the exception and its -// awkwardness in the code (additional mutex? different type?). +// See above about lock ordering. typedef pthread_mutex_t pmutex; static void pmutex_init(pthread_mutex_t *pm){ @@ -113,14 +143,13 @@ static inline void lock(pmutex *m) { pthread_mutex_lock(m); } static inline void unlock(pmutex *m) { pthread_mutex_unlock(m); } typedef struct acceptor_t acceptor_t; +typedef struct tslot_t tslot_t; typedef enum { WAKE, /* see if any work to do in proactor/psocket context */ PCONNECTION_IO, - PCONNECTION_IO_2, PCONNECTION_TIMER, LISTENER_IO, - CHAINED_EPOLL, PROACTOR_TIMER } epoll_type_t; // Data to use with epoll. @@ -161,6 +190,8 @@ static void memory_barrier(epoll_extended_t *ee) { * The original implementation with counters to track expiry counts * was abandoned in favor of "in doubt" transitions and resolution * at shutdown. + * + * TODO: review above in light of single poller thread. 
*/ typedef struct ptimer_t { @@ -262,6 +293,14 @@ static void ptimer_finalize(ptimer_t *pt) { pmutex_finalize(>mutex); } +pn_timestamp_t pn_i_now2(void) +{ + struct timespec now; + clock_gettime(CLOCK_REALTIME, ); + return ((pn_timestamp_t)now.tv_sec) * 1000 + (now.tv_nsec / 100); +} + + // // Proactor common code // @@ -330,30 +369,6 @@ static void stop_polling(epoll_extended_t *ee, int epollfd) { * eventfd to allow a lock-free pn_proactor_interrupt() implementation. */ -/* - * epollfd and epollfd_2 - * - * This implementation allows multiple threads to call epoll_wait() - * concurrently (as opposed to having a single thread call - * epoll_wait() and feed work to helper threads). Unfortunately - * with this
[qpid-proton] 02/09: PROTON-2130: epoll proactor: fix unwritten output bytes, pick up PROTON-2030 and PROTON-2131
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-proton.git commit b3d1004800b569f69ebea58366021f7ddcd64692 Author: Cliff Jansen AuthorDate: Mon Dec 2 09:27:22 2019 -0800 PROTON-2130: epoll proactor: fix unwritten output bytes, pick up PROTON-2030 and PROTON-2131 --- c/src/proactor/epoll.c | 130 ++--- 1 file changed, 59 insertions(+), 71 deletions(-) diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c index d0664d9..1283d91 100644 --- a/c/src/proactor/epoll.c +++ b/c/src/proactor/epoll.c @@ -58,6 +58,7 @@ #undef _GNU_SOURCE #include "proactor-internal.h" +#include "core/engine-internal.h" #include "core/logger_private.h" #include "core/util.h" @@ -293,13 +294,6 @@ static void ptimer_finalize(ptimer_t *pt) { pmutex_finalize(>mutex); } -pn_timestamp_t pn_i_now2(void) -{ - struct timespec now; - clock_gettime(CLOCK_REALTIME, ); - return ((pn_timestamp_t)now.tv_sec) * 1000 + (now.tv_nsec / 100); -} - // // Proactor common code @@ -542,7 +536,7 @@ typedef struct pconnection_t { int hog_count; // thread hogging limiter pn_event_batch_t batch; pn_connection_driver_t driver; - bool wbuf_valid; + bool output_drained; const char *wbuf_current; size_t wbuf_remaining; size_t wbuf_completed; @@ -555,7 +549,7 @@ typedef struct pconnection_t { } pconnection_t; /* - * A listener can have mutiple sockets (as specified in the addrinfo). They + * A listener can have multiple sockets (as specified in the addrinfo). They * are armed separately. 
The individual psockets can be part of at most one * list: the global proactor overflow retry list or the per-listener list of * pending accepts (valid inbound socket obtained, but pn_listener_accept not @@ -1128,7 +1122,7 @@ static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_con pc->read_blocked = true; pc->write_blocked = true; pc->disconnected = false; - pc->wbuf_valid = false; + pc->output_drained = false; pc->wbuf_completed = 0; pc->wbuf_remaining = 0; pc->wbuf_current = NULL; @@ -1195,26 +1189,19 @@ static void pconnection_cleanup(pconnection_t *pc) { // else proactor_disconnect logic owns psocket and its final free } -static void invalidate_wbuf(pconnection_t *pc) { - if (pc->wbuf_valid) { -if (pc->wbuf_completed) - pn_connection_driver_write_done(>driver, pc->wbuf_completed); -pc->wbuf_completed = 0; -pc->wbuf_remaining = 0; -pc->wbuf_valid = false; - } +static void set_wbuf(pconnection_t *pc, const char *start, size_t sz) { + pc->wbuf_completed = 0; + pc->wbuf_current = start; + pc->wbuf_remaining = sz; } // Never call with any locks held. 
static void ensure_wbuf(pconnection_t *pc) { - if (!pc->wbuf_valid) { -// next connection_driver call is the expensive output generator -pn_bytes_t wbuf = pn_connection_driver_write_buffer(>driver); -pc->wbuf_completed = 0; -pc->wbuf_remaining = wbuf.size; -pc->wbuf_current = wbuf.start; -pc->wbuf_valid = true; - } + // next connection_driver call is the expensive output generator + pn_bytes_t bytes = pn_connection_driver_write_buffer(>driver); + set_wbuf(pc, bytes.start, bytes.size); + if (bytes.size == 0) +pc->output_drained = true; } // Call with lock held or from forced_shutdown @@ -1260,9 +1247,8 @@ static pn_event_t *pconnection_batch_next(pn_event_batch_t *batch) { lock(>sched_mutex); idle_threads = (p->suspend_list_head != NULL); unlock(>sched_mutex); -if (idle_threads) { +if (idle_threads && !pc->write_blocked && !pc->read_blocked) { write_flush(pc); // May generate transport event - pc->read_blocked = pc->write_blocked = false; pconnection_process(pc, 0, false, false, true); e = pn_connection_driver_next_event(>driver); } @@ -1275,7 +1261,7 @@ static pn_event_t *pconnection_batch_next(pn_event_batch_t *batch) { } } } - if (e) invalidate_wbuf(pc); + if (e) pc->output_drained = false; return e; } @@ -1299,7 +1285,6 @@ static inline bool pconnection_wclosed(pconnection_t *pc) { close/shutdown. Let read()/write() return 0 or -1 to trigger cleanup logic. */ static bool pconnection_rearm_check(pconnection_t *pc) { - assert(pc->wbuf_valid); if (pconnection_rclosed(pc) && pconnection_wclosed(pc)) { return false; } @@ -1352,7 +1337,6 @@ static bool pconnection_sched_sync(pconnection_t *pc) { /* Call with context lock and having done a write_flush() to "know" the value of wbuf_remaining */ static inline bool pconnection_work_pending(pconnection_t *pc) { - assert(pc->wbuf_valid); if (pc->new_events || pc->wake_count || pc->tick_pending || pc->queued_disconnect) return true; if (!pc->read_blocked
[qpid-proton] 07/09: PROTON-2130: Substantially reduce memory use for proactor connections
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-proton.git commit ce6d0219b4fcc839db0e53b555ec378a391eff3d Author: Andrew Stitcher AuthorDate: Tue Apr 7 14:12:42 2020 -0400 PROTON-2130: Substantially reduce memory use for proactor connections --- c/src/proactor/epoll-internal.h | 2 +- c/src/proactor/epoll.c | 13 +++-- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h index e639e48..3cdeef3 100644 --- a/c/src/proactor/epoll-internal.h +++ b/c/src/proactor/epoll-internal.h @@ -211,7 +211,6 @@ typedef struct pconnection_t { psocket_t psocket; pcontext_t context; ptimer_t timer; // TODO: review one timerfd per connection - char addr_buf[PN_MAX_ADDR]; const char *host, *port; uint32_t new_events; int wake_count; @@ -239,6 +238,7 @@ typedef struct pconnection_t { pmutex rearm_mutex;/* protects pconnection_rearm from out of order arming*/ bool io_doublecheck; /* callbacks made and new IO may have arrived */ bool sched_timeout; + char addr_buf[1]; } pconnection_t; /* diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c index b891133..a173b9b 100644 --- a/c/src/proactor/epoll.c +++ b/c/src/proactor/epoll.c @@ -866,7 +866,7 @@ static int pclosefd(pn_proactor_t *p, int fd) { static void pconnection_tick(pconnection_t *pc); -static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_connection_t *c, pn_transport_t *t, bool server, const char *addr) +static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_connection_t *c, pn_transport_t *t, bool server, const char *addr, size_t addrlen) { memset(pc, 0, sizeof(*pc)); @@ -877,7 +877,7 @@ static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_con pcontext_init(>context, PCONNECTION, p, pc); psocket_init(>psocket, p, NULL); - pni_parse_addr(addr, pc->addr_buf, 
sizeof(pc->addr_buf), >host, >port); + pni_parse_addr(addr, pc->addr_buf, addrlen+1, >host, >port); pc->new_events = 0; pc->wake_count = 0; pc->tick_pending = false; @@ -1531,9 +1531,10 @@ static bool wake_if_inactive(pn_proactor_t *p) { } void pn_proactor_connect2(pn_proactor_t *p, pn_connection_t *c, pn_transport_t *t, const char *addr) { - pconnection_t *pc = (pconnection_t*) calloc(1, sizeof(pconnection_t)); + size_t addrlen = strlen(addr); + pconnection_t *pc = (pconnection_t*) malloc(sizeof(pconnection_t)+addrlen); assert(pc); // TODO: memory safety - const char *err = pconnection_setup(pc, p, c, t, false, addr); + const char *err = pconnection_setup(pc, p, c, t, false, addr, addrlen); if (err) {/* TODO aconway 2017-09-13: errors must be reported as events */ PN_LOG_DEFAULT(PN_SUBSYSTEM_EVENT, PN_LEVEL_ERROR, "pn_proactor_connect failure: %s", err); return; @@ -1977,9 +1978,9 @@ pn_record_t *pn_listener_attachments(pn_listener_t *l) { } void pn_listener_accept2(pn_listener_t *l, pn_connection_t *c, pn_transport_t *t) { - pconnection_t *pc = (pconnection_t*) calloc(1, sizeof(pconnection_t)); + pconnection_t *pc = (pconnection_t*) malloc(sizeof(pconnection_t)); assert(pc); // TODO: memory safety - const char *err = pconnection_setup(pc, pn_listener_proactor(l), c, t, true, ""); + const char *err = pconnection_setup(pc, pn_listener_proactor(l), c, t, true, "", 0); if (err) { PN_LOG_DEFAULT(PN_SUBSYSTEM_EVENT, PN_LEVEL_ERROR, "pn_listener_accept failure: %s", err); return; - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
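[qpid-proton] 08/09: PROTON-2130: more epoll reworking: - Rework queued accepts so that we do multiple at once - This should allow app to accept new connections a little more efficiently. - We limit the number of accepted connections to the specified backlog - If the app doesn't accept all the connections in a single batch we don't rearm the listener until they do, as a form of accept flow control.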
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-proton.git commit 4281367e77b403b5b18cc82a038ef3a6d4f92238 Author: Andrew Stitcher AuthorDate: Fri Mar 27 18:31:32 2020 -0400 PROTON-2130: more epoll reworking: - Rework queued accepts so that we do multiple at once - This should allow app to accept new connections a little more efficiently. - We limit the number of accepted connections to the specified backlog - If the app doesn't accept all the connections in a single batch we don't rearm the listener until they do, as a form of accept flow control. --- c/src/proactor/epoll-internal.h | 17 +++-- c/src/proactor/epoll.c | 147 2 files changed, 96 insertions(+), 68 deletions(-) diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h index 3cdeef3..3dfe956 100644 --- a/c/src/proactor/epoll-internal.h +++ b/c/src/proactor/epoll-internal.h @@ -40,9 +40,6 @@ extern "C" { #endif -//typedef struct pn_proactor_t pn_proactor_t; -//typedef struct pn_listener_t pn_listener_t; -//typedef struct pn_connection_driver_t pn_connection_driver_t; typedef struct acceptor_t acceptor_t; typedef struct tslot_t tslot_t; typedef pthread_mutex_t pmutex; @@ -82,7 +79,6 @@ typedef enum { typedef struct pcontext_t { pmutex mutex; pn_proactor_t *proactor; /* Immutable */ - void *owner; /* Instance governed by the context */ pcontext_type_t type; bool working; bool on_wake_list; @@ -255,11 +251,14 @@ struct acceptor_t{ struct pn_netaddr_t addr; /* listening address */ pn_listener_t *listener; acceptor_t *next; /* next listener list member */ - int accepted_fd; bool armed; bool overflowed; }; +typedef struct accepted_t{ + int accepted_fd; +} accepted_t; + struct pn_listener_t { pcontext_t context; acceptor_t *acceptors; /* Array of listening sockets */ @@ -272,10 +271,10 @@ struct pn_listener_t { pn_event_batch_t batch; pn_record_t *attachments; void 
*listener_context; - acceptor_t *pending_acceptors; /* list of those with a valid inbound fd*/ - int pending_count; - bool unclaimed; /* attach event dispatched but no pn_listener_attach() call yet */ - size_t backlog; + accepted_t *pending_accepteds; /* array of accepted connections */ + size_t pending_first; /* index of first pending connection */ + size_t pending_count; /* number of pending accepted connections */ + size_t backlog; /* size of pending accepted array */ bool close_dispatched; pmutex rearm_mutex; /* orders rearms/disarms, nothing else */ uint32_t sched_io_events; diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c index a173b9b..2eb1ac2 100644 --- a/c/src/proactor/epoll.c +++ b/c/src/proactor/epoll.c @@ -337,11 +337,10 @@ static void stop_polling(epoll_extended_t *ee, int epollfd) { // Fake thread for temporarily disabling the scheduling of a context. static struct tslot_t *REWAKE_PLACEHOLDER = (struct tslot_t*) -1; -static void pcontext_init(pcontext_t *ctx, pcontext_type_t t, pn_proactor_t *p, void *o) { +static void pcontext_init(pcontext_t *ctx, pcontext_type_t t, pn_proactor_t *p) { memset(ctx, 0, sizeof(*ctx)); pmutex_init(>mutex); ctx->proactor = p; - ctx->owner = o; ctx->type = t; } @@ -659,10 +658,10 @@ static void make_runnable(pcontext_t *ctx) { -static void psocket_init(psocket_t* ps, pn_proactor_t* p, pn_listener_t *listener) +static void psocket_init(psocket_t* ps, pn_proactor_t* p, epoll_type_t type) { ps->epoll_io.fd = -1; - ps->epoll_io.type = listener ? 
LISTENER_IO : PCONNECTION_IO; + ps->epoll_io.type = type; ps->epoll_io.wanted = 0; ps->epoll_io.polling = false; ps->proactor = p; @@ -748,7 +747,7 @@ static inline bool pconnection_has_event(pconnection_t *pc) { } static inline bool listener_has_event(pn_listener_t *l) { - return pn_collector_peek(l->collector) || (l->pending_count && !l->unclaimed); + return pn_collector_peek(l->collector) || (l->pending_count); } static inline bool proactor_has_event(pn_proactor_t *p) { @@ -791,7 +790,21 @@ static void rearm(pn_proactor_t *p, epoll_extended_t *ee) { EPOLL_FATAL("arming polled file descriptor", errno); } -static void listener_list_append(acceptor_t **start, acceptor_t *item) { +static void listener_accepted_append(pn_listener_t *listener, accepted_t item) { + if (listener->pending_first+listener->pending_count >= listener->backlog) return; + + listener->pending_accepteds[listener->pending_first+listener->pending_count] = item; + listener->pending_count++; +} + +static accepted_t *listener_accepted_next(pn_listener_t *listener) { + if (!listener->pending_count) return NULL; + + listener->pending_count--; +
[qpid-proton] branch master updated: PROTON-2196: Small proactor tidy ups
This is an automated email from the ASF dual-hosted git repository. astitcher pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-proton.git The following commit(s) were added to refs/heads/master by this push: new b569a4f PROTON-2196: Small proactor tidy ups b569a4f is described below commit b569a4f9db094558f29f05a8825385583bbb8902 Author: Andrew Stitcher AuthorDate: Wed Apr 22 22:25:29 2020 -0400 PROTON-2196: Small proactor tidy ups --- c/src/proactor/epoll.c | 14 ++ c/src/proactor/libuv.c | 8 c/src/proactor/proactor-internal.h | 3 +++ c/src/proactor/win_iocp.c | 14 +++--- 4 files changed, 20 insertions(+), 19 deletions(-) diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c index 9008489..5cc3b65 100644 --- a/c/src/proactor/epoll.c +++ b/c/src/proactor/epoll.c @@ -571,7 +571,7 @@ static pconnection_t *get_pconnection(pn_connection_t* c) { pn_connection_driver_t *d = *pn_connection_driver_ptr(c); unlock(_ptr_mutex); if (!d) return NULL; - return (pconnection_t*)((char*)d-offsetof(pconnection_t, driver)); + return containerof(d, pconnection_t, driver); } static void set_pconnection(pn_connection_t* c, pconnection_t *pc) { @@ -635,13 +635,11 @@ static inline acceptor_t *psocket_acceptor(psocket_t* ps) { } static inline pconnection_t *pcontext_pconnection(pcontext_t *c) { - return c->type == PCONNECTION ? -(pconnection_t*)((char*)c - offsetof(pconnection_t, context)) : NULL; + return c->type == PCONNECTION ? containerof(c, pconnection_t, context) : NULL; } static inline pn_listener_t *pcontext_listener(pcontext_t *c) { - return c->type == LISTENER ? -(pn_listener_t*)((char*)c - offsetof(pn_listener_t, context)) : NULL; + return c->type == LISTENER ? 
containerof(c, pn_listener_t, context) : NULL; } static pn_event_t *listener_batch_next(pn_event_batch_t *batch); @@ -650,17 +648,17 @@ static pn_event_t *pconnection_batch_next(pn_event_batch_t *batch); static inline pn_proactor_t *batch_proactor(pn_event_batch_t *batch) { return (batch->next_event == proactor_batch_next) ? -(pn_proactor_t*)((char*)batch - offsetof(pn_proactor_t, batch)) : NULL; +containerof(batch, pn_proactor_t, batch) : NULL; } static inline pn_listener_t *batch_listener(pn_event_batch_t *batch) { return (batch->next_event == listener_batch_next) ? -(pn_listener_t*)((char*)batch - offsetof(pn_listener_t, batch)) : NULL; +containerof(batch, pn_listener_t, batch) : NULL; } static inline pconnection_t *batch_pconnection(pn_event_batch_t *batch) { return (batch->next_event == pconnection_batch_next) ? -(pconnection_t*)((char*)batch - offsetof(pconnection_t, batch)) : NULL; +containerof(batch, pconnection_t, batch) : NULL; } static inline bool pconnection_has_event(pconnection_t *pc) { diff --git a/c/src/proactor/libuv.c b/c/src/proactor/libuv.c index ef2cd7c..4779aba 100644 --- a/c/src/proactor/libuv.c +++ b/c/src/proactor/libuv.c @@ -316,7 +316,7 @@ static pconnection_t *get_pconnection(pn_connection_t* c) { pn_connection_driver_t *d = *pn_connection_driver_ptr(c); uv_mutex_unlock(_ptr_mutex); if (!d) return NULL; - return (pconnection_t*)((char*)d-offsetof(pconnection_t, driver)); + return containerof(d, pconnection_t, driver); } static void set_pconnection(pn_connection_t* c, pconnection_t *pc) { @@ -360,17 +360,17 @@ static pn_event_t *proactor_batch_next(pn_event_batch_t *batch); static inline pn_proactor_t *batch_proactor(pn_event_batch_t *batch) { return (batch->next_event == proactor_batch_next) ? -(pn_proactor_t*)((char*)batch - offsetof(pn_proactor_t, batch)) : NULL; +containerof(batch, pn_proactor_t, batch) : NULL; } static inline pn_listener_t *batch_listener(pn_event_batch_t *batch) { return (batch->next_event == listener_batch_next) ? 
-(pn_listener_t*)((char*)batch - offsetof(pn_listener_t, batch)) : NULL; +containerof(batch, pn_listener_t, batch) : NULL; } static inline pconnection_t *batch_pconnection(pn_event_batch_t *batch) { return (batch->next_event == pconnection_batch_next) ? -(pconnection_t*)((char*)batch - offsetof(pconnection_t, batch)) : NULL; +containerof(batch, pconnection_t, batch) : NULL; } static inline work_t *batch_work(pn_event_batch_t *batch) { diff --git a/c/src/proactor/proactor-internal.h b/c/src/proactor/proactor-internal.h index afa615a..d368a0c 100644 --- a/c/src/proactor/proactor-internal.h +++ b/c/src/proactor/proactor-internal.h @@ -27,6 +27,9 @@ #include "core/logger_private.h" +// Type safe version of containerof used to find parent structs from contained structs +#define containerof(ptr, type, member) ((type *)((char *)(1 ? (ptr) : &((type *)0)->member) - offsetof(type, member))) + #ifdef __cplusplus extern "C" { #endif diff --git a/c/src/proactor/win_iocp.c b/c/src/proactor/win_iocp.c index 0c738f4..ae5a369
[qpid-proton] branch master updated: PROTON-2196: Small proactor tidy ups
This is an automated email from the ASF dual-hosted git repository. astitcher pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-proton.git The following commit(s) were added to refs/heads/master by this push: new c856de2 PROTON-2196: Small proactor tidy ups c856de2 is described below commit c856de20b15e53bf7a2b351375bb4100f962 Author: Andrew Stitcher AuthorDate: Wed Apr 22 22:25:29 2020 -0400 PROTON-2196: Small proactor tidy ups --- c/src/proactor/epoll.c | 11 ++- c/src/proactor/libuv.c | 11 ++- c/src/proactor/proactor-internal.h | 9 + c/src/proactor/win_iocp.c | 7 --- 4 files changed, 13 insertions(+), 25 deletions(-) diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c index e68e7c8..9008489 100644 --- a/c/src/proactor/epoll.c +++ b/c/src/proactor/epoll.c @@ -675,13 +675,6 @@ static inline bool proactor_has_event(pn_proactor_t *p) { return pn_collector_peek(p->collector); } -static pn_event_t *log_event(void* p, pn_event_t *e) { - if (e) { -PN_LOG_DEFAULT(PN_SUBSYSTEM_EVENT, PN_LEVEL_DEBUG, "[%p]:(%s)", (void*)p, pn_event_type_name(pn_event_type(e))); - } - return e; -} - static void psocket_error_str(psocket_t *ps, const char *msg, const char* what) { if (!ps->listener) { pn_connection_driver_t *driver = _pconnection(ps)->driver; @@ -1746,7 +1739,7 @@ static pn_event_t *listener_batch_next(pn_event_batch_t *batch) { if (e && pn_event_type(e) == PN_LISTENER_CLOSE) l->close_dispatched = true; unlock(>context.mutex); - return log_event(l, e); + return pni_log_event(l, e); } static void listener_done(pn_listener_t *l) { @@ -1974,7 +1967,7 @@ static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) { if (e && pn_event_type(e) == PN_PROACTOR_TIMEOUT) p->timeout_processed = true; unlock(>context.mutex); - return log_event(p, e); + return pni_log_event(p, e); } static pn_event_batch_t *proactor_process(pn_proactor_t *p, pn_event_type_t event) { diff --git a/c/src/proactor/libuv.c b/c/src/proactor/libuv.c index 
fb27cd7..ef2cd7c 100644 --- a/c/src/proactor/libuv.c +++ b/c/src/proactor/libuv.c @@ -823,25 +823,18 @@ static pn_event_batch_t *proactor_batch_lh(pn_proactor_t *p, pn_event_type_t t) return >batch; } -static pn_event_t *log_event(void* p, pn_event_t *e) { - if (e) { -PN_LOG_DEFAULT(PN_SUBSYSTEM_EVENT, PN_LEVEL_DEBUG, "[%p]:(%s)", (void*)p, pn_event_type_name(pn_event_type(e))); - } - return e; -} - static pn_event_t *listener_batch_next(pn_event_batch_t *batch) { pn_listener_t *l = batch_listener(batch); uv_mutex_lock(>lock); pn_event_t *e = pn_collector_next(l->collector); uv_mutex_unlock(>lock); - return log_event(l, e); + return pni_log_event(l, e); } static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) { pn_proactor_t *p = batch_proactor(batch); assert(p->batch_working); - return log_event(p, pn_collector_next(p->collector)); + return pni_log_event(p, pn_collector_next(p->collector)); } static pn_event_t *pconnection_batch_next(pn_event_batch_t *batch) { diff --git a/c/src/proactor/proactor-internal.h b/c/src/proactor/proactor-internal.h index 88e61c0..afa615a 100644 --- a/c/src/proactor/proactor-internal.h +++ b/c/src/proactor/proactor-internal.h @@ -25,6 +25,8 @@ #include #include +#include "core/logger_private.h" + #ifdef __cplusplus extern "C" { #endif @@ -60,6 +62,13 @@ struct pn_event_batch_t { pn_event_t *(*next_event)(pn_event_batch_t *batch); }; +static inline pn_event_t *pni_log_event(void* p, pn_event_t *e) { + if (e) { +PN_LOG_DEFAULT(PN_SUBSYSTEM_EVENT, PN_LEVEL_DEBUG, "[%p]:(%s)", (void*)p, pn_event_type_name(pn_event_type(e))); + } + return e; +} + #ifdef __cplusplus } #endif diff --git a/c/src/proactor/win_iocp.c b/c/src/proactor/win_iocp.c index 07a3a78..0c738f4 100644 --- a/c/src/proactor/win_iocp.c +++ b/c/src/proactor/win_iocp.c @@ -2017,13 +2017,6 @@ static inline bool proactor_has_event(pn_proactor_t *p) { return pn_collector_peek(p->collector); } -static pn_event_t *log_event(void* p, pn_event_t *e) { - if (e) { 
-PN_LOG_DEFAULT(PN_SUBSYSTEM_EVENT, PN_LEVEL_DEBUG, "[%p]:(%s)", (void*)p, pn_event_type_name(pn_event_type(e))); - } - return e; -} - static void psocket_error_str(psocket_t *ps, const char *msg, const char* what) { if (ps->is_reaper) return; - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
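[qpid-proton] branch master updated: PROTON-2195: Finalise the proactor API around pn_event_batch_t - Move pn_event_batch_t entirely into libqpid-proton-proactor - Make the pn_event_batch_t implementation private - Removed pn_event_batch_connection_driver() API as it is not used at all and can't be used for anything.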
This is an automated email from the ASF dual-hosted git repository. astitcher pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-proton.git The following commit(s) were added to refs/heads/master by this push: new ae06b41 PROTON-2195: Finalise the proactor API around pn_event_batch_t - Move pn_event_batch_t entirely into libqpid-proton-proactor - Make the pn_event_batch_t implementation private - Removed pn_event_batch_connection_driver() API as it is not used at all and can't be used for anything. ae06b41 is described below commit ae06b4180e924b392e1b816f97aada8591255b5a Author: Andrew Stitcher AuthorDate: Wed Apr 22 13:34:51 2020 -0400 PROTON-2195: Finalise the proactor API around pn_event_batch_t - Move pn_event_batch_t entirely into libqpid-proton-proactor - Make the pn_event_batch_t implementation private - Removed pn_event_batch_connection_driver() API as it is not used at all and can't be used for anything. --- c/include/proton/connection_driver.h | 7 --- c/include/proton/event.h | 32 c/include/proton/proactor.h | 12 c/include/proton/types.h | 13 + c/src/core/connection_driver.c | 18 ++ c/src/core/event.c | 4 c/src/proactor/libuv.c | 15 --- c/src/proactor/proactor-internal.c | 4 c/src/proactor/proactor-internal.h | 8 9 files changed, 51 insertions(+), 62 deletions(-) diff --git a/c/include/proton/connection_driver.h b/c/include/proton/connection_driver.h index 06b80ee..aa703b5 100644 --- a/c/include/proton/connection_driver.h +++ b/c/include/proton/connection_driver.h @@ -91,7 +91,6 @@ typedef struct pn_connection_driver_t { pn_connection_t *connection; pn_transport_t *transport; pn_collector_t *collector; - pn_event_batch_t batch; } pn_connection_driver_t; /** @@ -234,12 +233,6 @@ PN_EXTERN void pn_connection_driver_errorf(pn_connection_driver_t *d, const char PN_EXTERN void pn_connection_driver_verrorf(pn_connection_driver_t *d, const char *name, const char *fmt, va_list); /** - * If batch is part of a 
connection_driver, return the connection_driver address, - * else return NULL - */ -PN_EXTERN pn_connection_driver_t* pn_event_batch_connection_driver(pn_event_batch_t *batch); - -/** * The write side of the transport is closed, it will no longer produce bytes to write to * external IO. Synonym for PN_TRANSPORT_HEAD_CLOSED */ diff --git a/c/include/proton/event.h b/c/include/proton/event.h index 8d62d6b..8e5fba2 100644 --- a/c/include/proton/event.h +++ b/c/include/proton/event.h @@ -539,38 +539,6 @@ PN_EXTERN pn_record_t *pn_event_attachments(pn_event_t *event); */ PN_EXTERN struct pn_condition_t *pn_event_condition(pn_event_t *event); -/** - * **Unsettled API** - A batch of events that must be handled in sequence. - * Call pn_event_batch_next() in a loop until it returns NULL to extract - * the events. - */ -typedef struct pn_event_batch_t pn_event_batch_t; - -/* NOTE: there is deliberately no peek(), more() or other look-ahead on an event - * batch. We want to know exactly which events have been handled, next() only - * allows the user to get each event exactly once, in order. - */ - -/** - * **Unsettled API** - Remove the next event from the batch and return - * it. NULL means the batch is empty. The returned event pointer is - * valid until pn_event_batch_next() is called again on the same - * batch. - */ -PN_EXTERN pn_event_t *pn_event_batch_next(pn_event_batch_t *batch); - -/** - * @cond INTERNAL - * - * pn_event_batch_next() can be re-implemented for different behaviors in different contexts. 
- */ -struct pn_event_batch_t { - pn_event_t *(*next_event)(pn_event_batch_t *batch); -}; -/** - * @endcond - */ - #ifdef __cplusplus } #endif diff --git a/c/include/proton/proactor.h b/c/include/proton/proactor.h index 88544bc..b0303c9 100644 --- a/c/include/proton/proactor.h +++ b/c/include/proton/proactor.h @@ -206,6 +206,18 @@ PNP_EXTERN pn_event_batch_t *pn_proactor_wait(pn_proactor_t *proactor); PNP_EXTERN pn_event_batch_t *pn_proactor_get(pn_proactor_t *proactor); /** + * Remove the next event from the batch and return + * it. NULL means the batch is empty. The returned event pointer is + * valid until pn_event_batch_next() is called again on the same + * batch. + * + * @note there is deliberately no peek(), more() or other look-ahead on an event + * batch. We want to know exactly which events have been handled, next() only + * allows the user to get each event exactly once, in order. + */ +PNP_EXTERN pn_event_t *pn_event_batch_next(pn_event_batch_t *batch); + +/** * Call when finished handling a batch of events. * * Must be called exactly once to match each call to pn_proactor_wait(). diff --git a/c/include/proton/types.h b/c/include/proton/types.h index
[qpid-proton] branch master updated: Passing pointer value instead of address to LocalFree.
This is an automated email from the ASF dual-hosted git repository. astitcher pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-proton.git The following commit(s) were added to refs/heads/master by this push: new 06c680b Passing pointer value instead of address to LocalFree. 06c680b is described below commit 06c680b39fe3ef2dfe1215cb1221e9360d3261bf Author: Attila Kun AuthorDate: Wed Apr 15 20:30:38 2020 +0100 Passing pointer value instead of address to LocalFree. --- c/src/ssl/schannel.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/c/src/ssl/schannel.c b/c/src/ssl/schannel.c index 5938857..18073f8 100644 --- a/c/src/ssl/schannel.c +++ b/c/src/ssl/schannel.c @@ -2075,7 +2075,7 @@ static bool server_name_matches(const char *server_name, CERT_EXTENSION *alt_nam } } } -LocalFree(&alt_name_info); +LocalFree(alt_name_info); } if (!matched) { - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-jms] branch master updated: QPIDJMS-499 Update to netty 4.1.49.Final
This is an automated email from the ASF dual-hosted git repository. tabish pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-jms.git The following commit(s) were added to refs/heads/master by this push: new f8e79b7 QPIDJMS-499 Update to netty 4.1.49.Final f8e79b7 is described below commit f8e79b71948b9f9ce201457ce3e95bf4afa8a9cf Author: Timothy Bish AuthorDate: Wed Apr 22 09:29:30 2020 -0400 QPIDJMS-499 Update to netty 4.1.49.Final --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index adf3927..98546f6 100644 --- a/pom.xml +++ b/pom.xml @@ -37,7 +37,7 @@ 0.33.4 -4.1.48.Final +4.1.49.Final 1.7.30 1.0-alpha-2 - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org