This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
commit b860538ba698a6d6a85cc47a44e9e522d4825d47 Author: Andrew Stitcher <astitc...@apache.org> AuthorDate: Mon Mar 16 13:09:05 2020 -0400 PROTON-2130: Split out structs from epoll.c to make it easier to mess with them --- c/src/proactor/epoll-internal.h | 288 ++++++++++++++++++++++++++++++++++++++++ c/src/proactor/epoll.c | 238 +-------------------------------- 2 files changed, 289 insertions(+), 237 deletions(-) diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h new file mode 100644 index 0000000..6a13e7f --- /dev/null +++ b/c/src/proactor/epoll-internal.h @@ -0,0 +1,288 @@ +#ifndef PROACTOR_EPOLL_INTERNAL_H +#define PROACTOR_EPOLL_INTERNAL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <stdbool.h> +#include <stdint.h> +#include <pthread.h> + +#include <netdb.h> +#include <netinet/in.h> +#include <sys/socket.h> + + +#include <proton/connection_driver.h> +#include <proton/proactor.h> + +#include "netaddr-internal.h" + +#ifdef __cplusplus +extern "C" { +#endif + +//typedef struct pn_proactor_t pn_proactor_t; +//typedef struct pn_listener_t pn_listener_t; +//typedef struct pn_connection_driver_t pn_connection_driver_t; +typedef struct acceptor_t acceptor_t; +typedef struct tslot_t tslot_t; +typedef pthread_mutex_t pmutex; + +typedef enum { + WAKE, /* see if any work to do in proactor/psocket context */ + PCONNECTION_IO, + PCONNECTION_TIMER, + LISTENER_IO, + PROACTOR_TIMER +} epoll_type_t; + +// Data to use with epoll. +typedef struct epoll_extended_t { + struct psocket_t *psocket; // pconnection, listener, or NULL -> proactor + int fd; + epoll_type_t type; // io/timer/wakeup + uint32_t wanted; // events to poll for + bool polling; + pmutex barrier_mutex; +} epoll_extended_t; + +typedef struct ptimer_t { + pmutex mutex; + int timerfd; + epoll_extended_t epoll_io; + bool timer_active; + bool in_doubt; // 0 or 1 callbacks are possible + bool shutting_down; +} ptimer_t; + +typedef enum { + PROACTOR, + PCONNECTION, + LISTENER, + WAKEABLE +} pcontext_type_t; + +typedef struct pcontext_t { + pmutex mutex; + pn_proactor_t *proactor; /* Immutable */ + void *owner; /* Instance governed by the context */ + pcontext_type_t type; + bool working; + bool on_wake_list; + bool wake_pending; // unprocessed eventfd wake callback (convert to bool?) + struct pcontext_t *wake_next; // wake list, guarded by proactor eventfd_mutex + bool closing; + // Next 4 are protected by the proactor mutex + struct pcontext_t* next; /* Protected by proactor.mutex */ + struct pcontext_t* prev; /* Protected by proactor.mutex */ + int disconnect_ops; /* ops remaining before disconnect complete */ + bool disconnecting; /* pn_proactor_disconnect */ + // Protected by schedule mutex + tslot_t *runner __attribute__((aligned(64))); /* designated or running thread */ + tslot_t *prev_runner; + bool sched_wake; + bool sched_pending; /* If true, one or more unseen epoll or other events to process() */ + bool runnable ; /* in need of scheduling */ +} pcontext_t; + +typedef enum { + NEW, + UNUSED, /* pn_proactor_done() called, may never come back */ + SUSPENDED, + PROCESSING, /* Hunting for a context */ + BATCHING, /* Doing work on behalf of a context */ + DELETING, + POLLING +} tslot_state; + +// Epoll proactor's concept of a worker thread provided by the application. +struct tslot_t { + pmutex mutex; // suspend and resume + pthread_cond_t cond; + unsigned int generation; + bool suspended; + volatile bool scheduled; + tslot_state state; + pcontext_t *context; + pcontext_t *prev_context; + bool earmarked; + tslot_t *suspend_list_prev; + tslot_t *suspend_list_next; + tslot_t *earmark_override; // on earmark_drain, which thread was unassigned + unsigned int earmark_override_gen; +}; + +/* common to connection and listener */ +typedef struct psocket_t { + pn_proactor_t *proactor; + // Remaining protected by the pconnection/listener mutex + int sockfd; + epoll_extended_t epoll_io; + pn_listener_t *listener; /* NULL for a connection socket */ + char addr_buf[PN_MAX_ADDR]; + const char *host, *port; + uint32_t sched_io_events; + uint32_t working_io_events; +} psocket_t; + +struct pn_proactor_t { + pcontext_t context; + ptimer_t timer; + epoll_extended_t epoll_wake; + epoll_extended_t epoll_interrupt; + pn_event_batch_t batch; + pcontext_t *contexts; /* track in-use contexts for PN_PROACTOR_INACTIVE and disconnect */ + size_t disconnects_pending; /* unfinished proactor disconnects*/ + // need_xxx flags indicate we should generate PN_PROACTOR_XXX on the next update_batch() + bool need_interrupt; + bool need_inactive; + bool need_timeout; + bool timeout_set; /* timeout has been set by user and not yet cancelled or generated event */ + bool timeout_processed; /* timeout event dispatched in the most recent event batch */ + bool timer_armed; /* timer is armed in epoll */ + int context_count; + + // wake subsystem + int eventfd; + pmutex eventfd_mutex; + bool wakes_in_progress; + pcontext_t *wake_list_first; + pcontext_t *wake_list_last; + // Interrupts have a dedicated eventfd because they must be async-signal safe. + int interruptfd; + // If the process runs out of file descriptors, disarm listening sockets temporarily and save them here. + acceptor_t *overflow; + pmutex overflow_mutex; + + // Sched vars specific to proactor context. + bool sched_timeout; + bool sched_interrupt; + + // Global scheduling/poller vars. + // Warm runnables have assigned or earmarked tslots and can run right away. + // Other runnables are run as tslots come available. + pmutex sched_mutex; + int n_runnables; + int next_runnable; + int n_warm_runnables; + tslot_t *suspend_list_head; + tslot_t *suspend_list_tail; + int suspend_list_count; + tslot_t *poller; + bool poller_suspended; + tslot_t *last_earmark; + pcontext_t *sched_wake_first; + pcontext_t *sched_wake_last; + pcontext_t *sched_wake_current; + pmutex tslot_mutex; + int earmark_count; + bool earmark_drain; + bool sched_wakes_pending; + + // Mostly read only: after init or once thread_count stabilizes + pn_collector_t *collector __attribute__((aligned(64))); + pcontext_t **warm_runnables; + pcontext_t **runnables; + tslot_t **resume_list; + pn_hash_t *tslot_map; + struct epoll_event *kevents; + int epollfd; + int thread_count; + int thread_capacity; + int runnables_capacity; + int kevents_capacity; + bool shutting_down; +}; + +typedef struct pconnection_t { + psocket_t psocket; + pcontext_t context; + uint32_t new_events; + int wake_count; + bool server; /* accept, not connect */ + bool tick_pending; + bool timer_armed; + bool queued_disconnect; /* deferred from pn_proactor_disconnect() */ + pn_condition_t *disconnect_condition; + ptimer_t timer; // TODO: review one timerfd per connection + // Following values only changed by (sole) working context: + uint32_t current_arm; // active epoll io events + bool connected; + bool read_blocked; + bool write_blocked; + bool disconnected; + int hog_count; // thread hogging limiter + pn_event_batch_t batch; + pn_connection_driver_t driver; + bool output_drained; + const char *wbuf_current; + size_t wbuf_remaining; + size_t wbuf_completed; + struct pn_netaddr_t local, remote; /* Actual addresses */ + struct addrinfo *addrinfo; /* Resolved address list */ + struct addrinfo *ai; /* Current connect address */ + pmutex rearm_mutex; /* protects pconnection_rearm from out of order arming*/ + bool io_doublecheck; /* callbacks made and new IO may have arrived */ + bool sched_timeout; +} pconnection_t; + +/* + * A listener can have multiple sockets (as specified in the addrinfo). They + * are armed separately. The individual psockets can be part of at most one + * list: the global proactor overflow retry list or the per-listener list of + * pending accepts (valid inbound socket obtained, but pn_listener_accept not + * yet called by the application). These lists will be small and quick to + * traverse. + */ + +struct acceptor_t{ + psocket_t psocket; + int accepted_fd; + bool armed; + bool overflowed; + acceptor_t *next; /* next listener list member */ + struct pn_netaddr_t addr; /* listening address */ +}; + +struct pn_listener_t { + acceptor_t *acceptors; /* Array of listening sockets */ + size_t acceptors_size; + int active_count; /* Number of listener sockets registered with epoll */ + pcontext_t context; + pn_condition_t *condition; + pn_collector_t *collector; + pn_event_batch_t batch; + pn_record_t *attachments; + void *listener_context; + acceptor_t *pending_acceptors; /* list of those with a valid inbound fd*/ + int pending_count; + bool unclaimed; /* attach event dispatched but no pn_listener_attach() call yet */ + size_t backlog; + bool close_dispatched; + pmutex rearm_mutex; /* orders rearms/disarms, nothing else */ + uint32_t sched_io_events; +}; + +#ifdef __cplusplus +} +#endif +#endif // PROACTOR_EPOLL_INTERNAL_H diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c index 9390595..f8ebbf4 100644 --- a/c/src/proactor/epoll.c +++ b/c/src/proactor/epoll.c @@ -57,6 +57,7 @@ /* Avoid GNU extensions, in particular the incompatible alternative strerror_r() */ #undef _GNU_SOURCE +#include "epoll-internal.h" #include "proactor-internal.h" #include "core/engine-internal.h" #include "core/logger_private.h" @@ -128,7 +129,6 @@ static void pstrerror(int err, strerrorbuf msg) { // In general all locks to be held singly and shortly (possibly as spin locks). // See above about lock ordering. -typedef pthread_mutex_t pmutex; static void pmutex_init(pthread_mutex_t *pm){ pthread_mutexattr_t attr; @@ -144,26 +144,6 @@ static void pmutex_finalize(pthread_mutex_t *m) { pthread_mutex_destroy(m); } static inline void lock(pmutex *m) { pthread_mutex_lock(m); } static inline void unlock(pmutex *m) { pthread_mutex_unlock(m); } -typedef struct acceptor_t acceptor_t; -typedef struct tslot_t tslot_t; - -typedef enum { - WAKE, /* see if any work to do in proactor/psocket context */ - PCONNECTION_IO, - PCONNECTION_TIMER, - LISTENER_IO, - PROACTOR_TIMER } epoll_type_t; - -// Data to use with epoll. -typedef struct epoll_extended_t { - struct psocket_t *psocket; // pconnection, listener, or NULL -> proactor - int fd; - epoll_type_t type; // io/timer/wakeup - uint32_t wanted; // events to poll for - bool polling; - pmutex barrier_mutex; -} epoll_extended_t; - /* epoll_ctl()/epoll_wait() do not form a memory barrier, so cached memory writes to struct epoll_extended_t in the EPOLL_ADD thread might not be visible to epoll_wait() thread. This function creates a memory barrier, @@ -196,15 +176,6 @@ static void memory_barrier(epoll_extended_t *ee) { * TODO: review above in light of single poller thread. */ -typedef struct ptimer_t { - pmutex mutex; - int timerfd; - epoll_extended_t epoll_io; - bool timer_active; - bool in_doubt; // 0 or 1 callbacks are possible - bool shutting_down; -} ptimer_t; - static bool ptimer_init(ptimer_t *pt, struct psocket_t *ps) { pt->timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK); pmutex_init(&pt->mutex); @@ -364,60 +335,6 @@ static void stop_polling(epoll_extended_t *ee, int epollfd) { * eventfd to allow a lock-free pn_proactor_interrupt() implementation. */ -typedef enum { - PROACTOR, - PCONNECTION, - LISTENER, - WAKEABLE } pcontext_type_t; - -typedef struct pcontext_t { - pmutex mutex; - pn_proactor_t *proactor; /* Immutable */ - void *owner; /* Instance governed by the context */ - pcontext_type_t type; - bool working; - bool on_wake_list; - bool wake_pending; // unprocessed eventfd wake callback (convert to bool?) - struct pcontext_t *wake_next; // wake list, guarded by proactor eventfd_mutex - bool closing; - // Next 4 are protected by the proactor mutex - struct pcontext_t* next; /* Protected by proactor.mutex */ - struct pcontext_t* prev; /* Protected by proactor.mutex */ - int disconnect_ops; /* ops remaining before disconnect complete */ - bool disconnecting; /* pn_proactor_disconnect */ - // Protected by schedule mutex - tslot_t *runner __attribute__((aligned(64))); /* designated or running thread */ - tslot_t *prev_runner; - bool sched_wake; - bool sched_pending; /* If true, one or more unseen epoll or other events to process() */ - bool runnable ; /* in need of scheduling */ -} pcontext_t; - -typedef enum { - NEW, - UNUSED, /* pn_proactor_done() called, may never come back */ - SUSPENDED, - PROCESSING, /* Hunting for a context */ - BATCHING, /* Doing work on behalf of a context */ - DELETING, - POLLING } tslot_state; - -// Epoll proactor's concept of a worker thread provided by the application. -struct tslot_t { - pmutex mutex; // suspend and resume - pthread_cond_t cond; - unsigned int generation; - bool suspended; - volatile bool scheduled; - tslot_state state; - pcontext_t *context; - pcontext_t *prev_context; - bool earmarked; - tslot_t *suspend_list_prev; - tslot_t *suspend_list_next; - tslot_t *earmark_override; // on earmark_drain, which thread was unassigned - unsigned int earmark_override_gen; -}; // Fake thread for temporarily disabling the scheduling of a context. static struct tslot_t *REWAKE_PLACEHOLDER = (struct tslot_t*) -1; @@ -434,159 +351,6 @@ static void pcontext_finalize(pcontext_t* ctx) { pmutex_finalize(&ctx->mutex); } - -/* common to connection and listener */ -typedef struct psocket_t { - pn_proactor_t *proactor; - // Remaining protected by the pconnection/listener mutex - int sockfd; - epoll_extended_t epoll_io; - pn_listener_t *listener; /* NULL for a connection socket */ - char addr_buf[PN_MAX_ADDR]; - const char *host, *port; - uint32_t sched_io_events; - uint32_t working_io_events; -} psocket_t; - -struct pn_proactor_t { - pcontext_t context; - ptimer_t timer; - epoll_extended_t epoll_wake; - epoll_extended_t epoll_interrupt; - pn_event_batch_t batch; - pcontext_t *contexts; /* track in-use contexts for PN_PROACTOR_INACTIVE and disconnect */ - size_t disconnects_pending; /* unfinished proactor disconnects*/ - // need_xxx flags indicate we should generate PN_PROACTOR_XXX on the next update_batch() - bool need_interrupt; - bool need_inactive; - bool need_timeout; - bool timeout_set; /* timeout has been set by user and not yet cancelled or generated event */ - bool timeout_processed; /* timeout event dispatched in the most recent event batch */ - bool timer_armed; /* timer is armed in epoll */ - int context_count; - - // wake subsystem - int eventfd; - pmutex eventfd_mutex; - bool wakes_in_progress; - pcontext_t *wake_list_first; - pcontext_t *wake_list_last; - // Interrupts have a dedicated eventfd because they must be async-signal safe. - int interruptfd; - // If the process runs out of file descriptors, disarm listening sockets temporarily and save them here. - acceptor_t *overflow; - pmutex overflow_mutex; - - // Sched vars specific to proactor context. - bool sched_timeout; - bool sched_interrupt; - - // Global scheduling/poller vars. - // Warm runnables have assigned or earmarked tslots and can run right away. - // Other runnables are run as tslots come available. - pmutex sched_mutex; - int n_runnables; - int next_runnable; - int n_warm_runnables; - tslot_t *suspend_list_head; - tslot_t *suspend_list_tail; - int suspend_list_count; - tslot_t *poller; - bool poller_suspended; - tslot_t *last_earmark; - pcontext_t *sched_wake_first; - pcontext_t *sched_wake_last; - pcontext_t *sched_wake_current; - pmutex tslot_mutex; - int earmark_count; - bool earmark_drain; - bool sched_wakes_pending; - - // Mostly read only: after init or once thread_count stabilizes - pn_collector_t *collector __attribute__((aligned(64))); - pcontext_t **warm_runnables; - pcontext_t **runnables; - tslot_t **resume_list; - pn_hash_t *tslot_map; - struct epoll_event *kevents; - int epollfd; - int thread_count; - int thread_capacity; - int runnables_capacity; - int kevents_capacity; - bool shutting_down; -}; - -typedef struct pconnection_t { - psocket_t psocket; - pcontext_t context; - uint32_t new_events; - int wake_count; - bool server; /* accept, not connect */ - bool tick_pending; - bool timer_armed; - bool queued_disconnect; /* deferred from pn_proactor_disconnect() */ - pn_condition_t *disconnect_condition; - ptimer_t timer; // TODO: review one timerfd per connection - // Following values only changed by (sole) working context: - uint32_t current_arm; // active epoll io events - bool connected; - bool read_blocked; - bool write_blocked; - bool disconnected; - int hog_count; // thread hogging limiter - pn_event_batch_t batch; - pn_connection_driver_t driver; - bool output_drained; - const char *wbuf_current; - size_t wbuf_remaining; - size_t wbuf_completed; - struct pn_netaddr_t local, remote; /* Actual addresses */ - struct addrinfo *addrinfo; /* Resolved address list */ - struct addrinfo *ai; /* Current connect address */ - pmutex rearm_mutex; /* protects pconnection_rearm from out of order arming*/ - bool io_doublecheck; /* callbacks made and new IO may have arrived */ - bool sched_timeout; -} pconnection_t; - -/* - * A listener can have multiple sockets (as specified in the addrinfo). They - * are armed separately. The individual psockets can be part of at most one - * list: the global proactor overflow retry list or the per-listener list of - * pending accepts (valid inbound socket obtained, but pn_listener_accept not - * yet called by the application). These lists will be small and quick to - * traverse. - */ - -struct acceptor_t{ - psocket_t psocket; - int accepted_fd; - bool armed; - bool overflowed; - acceptor_t *next; /* next listener list member */ - struct pn_netaddr_t addr; /* listening address */ -}; - -struct pn_listener_t { - acceptor_t *acceptors; /* Array of listening sockets */ - size_t acceptors_size; - int active_count; /* Number of listener sockets registered with epoll */ - pcontext_t context; - pn_condition_t *condition; - pn_collector_t *collector; - pn_event_batch_t batch; - pn_record_t *attachments; - void *listener_context; - acceptor_t *pending_acceptors; /* list of those with a valid inbound fd*/ - int pending_count; - bool unclaimed; /* attach event dispatched but no pn_listener_attach() call yet */ - size_t backlog; - bool close_dispatched; - pmutex rearm_mutex; /* orders rearms/disarms, nothing else */ - uint32_t sched_io_events; -}; - - static void rearm(pn_proactor_t *p, epoll_extended_t *ee); /* --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org