Author: gsim Date: Fri Oct 10 12:43:24 2014 New Revision: 1630802 URL: http://svn.apache.org/r1630802 Log: QPID-658 Windows driver.c with selectors and selectables
Modified: qpid/proton/branches/examples/proton-c/src/windows/driver.c Modified: qpid/proton/branches/examples/proton-c/src/windows/driver.c URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/proton-c/src/windows/driver.c?rev=1630802&r1=1630801&r2=1630802&view=diff ============================================================================== --- qpid/proton/branches/examples/proton-c/src/windows/driver.c (original) +++ qpid/proton/branches/examples/proton-c/src/windows/driver.c Fri Oct 10 12:43:24 2014 @@ -19,59 +19,39 @@ * */ -/* - * Copy of posix poll-based driver with minimal changes to use - * select(). TODO: fully native implementaton with I/O completion - * ports. - * - * This implementation comments out the posix max_fds arg to select - * which has no meaning on windows. The number of fd_set slots are - * configured at compile time via FD_SETSIZE, chosen "large enough" - * for the limited scalability of select() at the expense of - * 2*N*sizeof(unsigned int) bytes per driver instance. select (and - * associated macros like FD_ZERO) are otherwise unaffected - * performance-wise by increasing FD_SETSIZE. - */ - -#define FD_SETSIZE 2048 -#ifndef _WIN32_WINNT -#define _WIN32_WINNT 0x0501 -#endif -#if _WIN32_WINNT < 0x0501 -#error "Proton requires Windows API support for XP or later." 
-#endif -#include <winsock2.h> -#include <Ws2tcpip.h> -#define PN_WINAPI - #include <assert.h> #include <stdio.h> #include <ctype.h> #include <sys/types.h> #include <fcntl.h> -#include "../platform.h" -#include <proton/io.h> #include <proton/driver.h> #include <proton/driver_extras.h> #include <proton/error.h> +#include <proton/io.h> #include <proton/sasl.h> #include <proton/ssl.h> #include <proton/util.h> +#include <proton/object.h> +#include <proton/selector.h> +#include <proton/types.h> +#include "../selectable.h" #include "../util.h" +#include "../platform.h" #include "../ssl/ssl-internal.h" -#include <proton/types.h> - -/* Posix compatibility helpers */ - -static int pn_socket_pair(SOCKET sv[2]); -#define close(sock) closesocket(sock) -static int pn_i_error_from_errno_wrap(pn_error_t *error, const char *msg) { - errno = WSAGetLastError(); - return pn_i_error_from_errno(error, msg); -} -#define pn_i_error_from_errno(e,m) pn_i_error_from_errno_wrap(e,m) +/* + * This driver provides limited thread safety for some operations on pn_connector_t objects. + * + * These calls are: pn_connector_process(), pn_connector_activate(), pn_connector_activated(), + * pn_connector_close(), and others that only touch the connection object, e.g. + * pn_connector_context(). These calls provide limited safety in that simultaneous calls are + * not allowed to the same pn_connector_t object. + * + * The application must call pn_driver_wakeup() and resume its wait loop logic if a call to + * pn_driver_wait() may have overlapped with any of the above calls that could affect a pn_driver_wait() + * outcome. 
+ */ /* Decls */ @@ -81,82 +61,137 @@ static int pn_i_error_from_errno_wrap(pn struct pn_driver_t { pn_error_t *error; pn_io_t *io; + pn_selector_t *selector; pn_listener_t *listener_head; pn_listener_t *listener_tail; pn_listener_t *listener_next; pn_connector_t *connector_head; pn_connector_t *connector_tail; - pn_connector_t *connector_next; + pn_listener_t *ready_listener_head; + pn_listener_t *ready_listener_tail; + pn_connector_t *ready_connector_head; + pn_connector_t *ready_connector_tail; + pn_selectable_t *ctrl_selectable; size_t listener_count; size_t connector_count; - size_t closed_count; - fd_set readfds; - fd_set writefds; - fd_set exceptfds; - // int max_fds; - bool overflow; pn_socket_t ctrl[2]; //pipe for updating selectable status - pn_trace_t trace; - pn_timestamp_t wakeup; }; +typedef enum {LISTENER, CONNECTOR} sel_type_t; + struct pn_listener_t { + sel_type_t type; pn_driver_t *driver; pn_listener_t *listener_next; pn_listener_t *listener_prev; - int idx; + pn_listener_t *ready_listener_next; + pn_listener_t *ready_listener_prev; + void *context; + pn_selectable_t *selectable; bool pending; - pn_socket_t fd; bool closed; - void *context; }; #define PN_NAME_MAX (256) struct pn_connector_t { + sel_type_t type; pn_driver_t *driver; pn_connector_t *connector_next; pn_connector_t *connector_prev; + pn_connector_t *ready_connector_next; + pn_connector_t *ready_connector_prev; char name[PN_NAME_MAX]; + pn_timestamp_t wakeup; + pn_timestamp_t posted_wakeup; + pn_connection_t *connection; + pn_transport_t *transport; + pn_sasl_t *sasl; + pn_listener_t *listener; + void *context; + pn_selectable_t *selectable; int idx; + int status; + int posted_status; + pn_trace_t trace; bool pending_tick; bool pending_read; bool pending_write; - pn_socket_t fd; - int status; - pn_trace_t trace; bool closed; - pn_timestamp_t wakeup; - pn_connection_t *connection; - pn_transport_t *transport; - pn_sasl_t *sasl; bool input_done; bool output_done; - pn_listener_t 
*listener; - void *context; }; +static void get_new_events(pn_driver_t *); + /* Impls */ // listener +static void driver_listener_readable(pn_selectable_t *sel) +{ + // do nothing +} + +static void driver_listener_writable(pn_selectable_t *sel) +{ + // do nothing +} + +static void driver_listener_expired(pn_selectable_t *sel) +{ + // do nothing +} + +static ssize_t driver_listener_capacity(pn_selectable_t *sel) +{ + return 1; +} + +static ssize_t driver_listener_pending(pn_selectable_t *sel) +{ + return 0; +} + +static pn_timestamp_t driver_listener_deadline(pn_selectable_t *sel) +{ + return 0; +} + +static void driver_listener_finalize(pn_selectable_t *sel) +{ + // do nothing +} + + static void pn_driver_add_listener(pn_driver_t *d, pn_listener_t *l) { if (!l->driver) return; LL_ADD(d, listener, l); l->driver = d; d->listener_count++; + pn_selector_add(d->selector, l->selectable); +} + +static void ready_listener_list_remove(pn_driver_t *d, pn_listener_t *l) +{ + LL_REMOVE(d, ready_listener, l); + l->ready_listener_next = NULL; + l->ready_listener_prev = NULL; } static void pn_driver_remove_listener(pn_driver_t *d, pn_listener_t *l) { if (!l->driver) return; + pn_selector_remove(d->selector, l->selectable); + if (l == d->ready_listener_head || l->ready_listener_prev) + ready_listener_list_remove(d, l); + if (l == d->listener_next) { d->listener_next = l->listener_next; } - LL_REMOVE(d, listener, l); l->driver = NULL; d->listener_count--; @@ -169,7 +204,7 @@ pn_listener_t *pn_listener(pn_driver_t * pn_socket_t sock = pn_listen(driver->io, host, port); - if (sock == INVALID_SOCKET) { + if (sock == PN_INVALID_SOCKET) { return NULL; } else { pn_listener_t *l = pn_listener_fd(driver, sock, context); @@ -186,15 +221,24 @@ pn_listener_t *pn_listener_fd(pn_driver_ pn_listener_t *l = (pn_listener_t *) malloc(sizeof(pn_listener_t)); if (!l) return NULL; + l->type = LISTENER; l->driver = driver; l->listener_next = NULL; l->listener_prev = NULL; - l->idx = 0; + 
l->ready_listener_next = NULL; + l->ready_listener_prev = NULL; l->pending = false; - l->fd = fd; l->closed = false; l->context = context; - + l->selectable = pni_selectable(driver_listener_capacity, + driver_listener_pending, + driver_listener_deadline, + driver_listener_readable, + driver_listener_writable, + driver_listener_expired, + driver_listener_finalize); + pni_selectable_set_fd(l->selectable, fd); + pni_selectable_set_context(l->selectable, l); pn_driver_add_listener(driver, l); return l; } @@ -202,7 +246,7 @@ pn_listener_t *pn_listener_fd(pn_driver_ pn_socket_t pn_listener_get_fd(pn_listener_t *listener) { assert(listener); - return listener->fd; + return pn_selectable_fd(listener->selectable); } pn_listener_t *pn_listener_head(pn_driver_t *driver) @@ -234,8 +278,8 @@ pn_connector_t *pn_listener_accept(pn_li if (!l || !l->pending) return NULL; char name[PN_NAME_MAX]; - pn_socket_t sock = pn_accept(l->driver->io, l->fd, name, PN_NAME_MAX); - if (sock == INVALID_SOCKET) { + pn_socket_t sock = pn_accept(l->driver->io, pn_selectable_fd(l->selectable), name, PN_NAME_MAX); + if (sock == PN_INVALID_SOCKET) { return NULL; } else { if (l->driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV)) @@ -252,8 +296,7 @@ void pn_listener_close(pn_listener_t *l) if (!l) return; if (l->closed) return; - if (close(l->fd) == -1) - perror("close"); + pn_close(l->driver->io, pn_selectable_fd(l->selectable)); l->closed = true; } @@ -262,45 +305,85 @@ void pn_listener_free(pn_listener_t *l) if (!l) return; if (l->driver) pn_driver_remove_listener(l->driver, l); + pn_selectable_free(l->selectable); free(l); } // connector +static ssize_t driver_connection_capacity(pn_selectable_t *sel) +{ + pn_connector_t *c = (pn_connector_t *) pni_selectable_get_context(sel); + return c->posted_status & PN_SEL_RD ? 
1 : 0; +} + +static ssize_t driver_connection_pending(pn_selectable_t *sel) +{ + pn_connector_t *c = (pn_connector_t *) pni_selectable_get_context(sel); + return c->posted_status & PN_SEL_WR ? 1 : 0; +} + +static pn_timestamp_t driver_connection_deadline(pn_selectable_t *sel) +{ + pn_connector_t *c = (pn_connector_t *) pni_selectable_get_context(sel); + return c->posted_wakeup; +} + +static void driver_connection_readable(pn_selectable_t *sel) +{ + // do nothing +} + +static void driver_connection_writable(pn_selectable_t *sel) +{ + // do nothing +} + +static void driver_connection_expired(pn_selectable_t *sel) +{ + // do nothing +} + +static void driver_connection_finalize(pn_selectable_t *sel) +{ + // do nothing +} + static void pn_driver_add_connector(pn_driver_t *d, pn_connector_t *c) { if (!c->driver) return; LL_ADD(d, connector, c); c->driver = d; d->connector_count++; + pn_selector_add(d->selector, c->selectable); +} + +static void ready_connector_list_remove(pn_driver_t *d, pn_connector_t *c) +{ + LL_REMOVE(d, ready_connector, c); + c->ready_connector_next = NULL; + c->ready_connector_prev = NULL; } static void pn_driver_remove_connector(pn_driver_t *d, pn_connector_t *c) { if (!c->driver) return; - if (c == d->connector_next) { - d->connector_next = c->connector_next; - } + pn_selector_remove(d->selector, c->selectable); + if (c == d->ready_connector_head || c->ready_connector_prev) + ready_connector_list_remove(d, c); LL_REMOVE(d, connector, c); c->driver = NULL; d->connector_count--; - if (c->closed) { - d->closed_count--; - } } -pn_connector_t *pn_connector(pn_driver_t *driver, const char *hostarg, +pn_connector_t *pn_connector(pn_driver_t *driver, const char *host, const char *port, void *context) { if (!driver) return NULL; - // convert "0.0.0.0" to "127.0.0.1" on Windows for outgoing sockets - const char *host = strcmp("0.0.0.0", hostarg) ? 
hostarg : "127.0.0.1"; - pn_socket_t sock = pn_connect(driver->io, host, port); - pn_connector_t *c = pn_connector_fd(driver, sock, context); snprintf(c->name, PN_NAME_MAX, "%s:%s", host, port); if (driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV)) @@ -308,28 +391,28 @@ pn_connector_t *pn_connector(pn_driver_t return c; } -static void pn_connector_read(pn_connector_t *ctor); -static void pn_connector_write(pn_connector_t *ctor); - pn_connector_t *pn_connector_fd(pn_driver_t *driver, pn_socket_t fd, void *context) { if (!driver) return NULL; pn_connector_t *c = (pn_connector_t *) malloc(sizeof(pn_connector_t)); if (!c) return NULL; + c->type = CONNECTOR; c->driver = driver; c->connector_next = NULL; c->connector_prev = NULL; + c->ready_connector_next = NULL; + c->ready_connector_prev = NULL; c->pending_tick = false; c->pending_read = false; c->pending_write = false; c->name[0] = '\0'; - c->idx = 0; - c->fd = fd; c->status = PN_SEL_RD | PN_SEL_WR; + c->posted_status = -1; c->trace = driver->trace; c->closed = false; c->wakeup = 0; + c->posted_wakeup = 0; c->connection = NULL; c->transport = pn_transport(); c->sasl = pn_sasl(c->transport); @@ -337,7 +420,15 @@ pn_connector_t *pn_connector_fd(pn_drive c->output_done = false; c->context = context; c->listener = NULL; - + c->selectable = pni_selectable(driver_connection_capacity, + driver_connection_pending, + driver_connection_deadline, + driver_connection_readable, + driver_connection_writable, + driver_connection_expired, + driver_connection_finalize); + pni_selectable_set_fd(c->selectable, fd); + pni_selectable_set_context(c->selectable, c); pn_connector_trace(c, driver->trace); pn_driver_add_connector(driver, c); @@ -347,7 +438,7 @@ pn_connector_t *pn_connector_fd(pn_drive pn_socket_t pn_connector_get_fd(pn_connector_t *connector) { assert(connector); - return connector->fd; + return pn_selectable_fd(connector->selectable); } pn_connector_t *pn_connector_head(pn_driver_t *driver) @@ -380,8 +471,15 @@ 
pn_transport_t *pn_connector_transport(p void pn_connector_set_connection(pn_connector_t *ctor, pn_connection_t *connection) { if (!ctor) return; + if (ctor->connection) { + pn_decref(ctor->connection); + pn_transport_unbind(ctor->transport); + } ctor->connection = connection; - pn_transport_bind(ctor->transport, connection); + if (ctor->connection) { + pn_incref(ctor->connection); + pn_transport_bind(ctor->transport, connection); + } if (ctor->transport) pn_transport_trace(ctor->transport, ctor->trace); } @@ -418,10 +516,8 @@ void pn_connector_close(pn_connector_t * if (!ctor) return; ctor->status = 0; - if (close(ctor->fd) == -1) - perror("close"); + pn_close(ctor->driver->io, pn_selectable_fd(ctor->selectable)); ctor->closed = true; - ctor->driver->closed_count++; } bool pn_connector_closed(pn_connector_t *ctor) @@ -434,9 +530,11 @@ void pn_connector_free(pn_connector_t *c if (!ctor) return; if (ctor->driver) pn_driver_remove_connector(ctor->driver, ctor); - ctor->connection = NULL; pn_transport_free(ctor->transport); ctor->transport = NULL; + if (ctor->connection) pn_decref(ctor->connection); + ctor->connection = NULL; + pn_selectable_free(ctor->selectable); free(ctor); } @@ -487,6 +585,7 @@ void pn_connector_process(pn_connector_t if (c->closed) return; pn_transport_t *transport = c->transport; + pn_socket_t sock = pn_selectable_fd(c->selectable); /// /// Socket read @@ -497,7 +596,7 @@ void pn_connector_process(pn_connector_t c->status |= PN_SEL_RD; if (c->pending_read) { c->pending_read = false; - ssize_t n = pn_recv(c->driver->io, c->fd, pn_transport_tail(transport), capacity); + ssize_t n = pn_recv(c->driver->io, sock, pn_transport_tail(transport), capacity); if (n < 0) { if (errno != EAGAIN) { perror("read"); @@ -540,7 +639,7 @@ void pn_connector_process(pn_connector_t c->status |= PN_SEL_WR; if (c->pending_write) { c->pending_write = false; - ssize_t n = pn_send(c->driver->io, c->fd, pn_transport_head(transport), pending); + ssize_t n = 
pn_send(c->driver->io, sock, pn_transport_head(transport), pending); if (n < 0) { // XXX if (errno != EAGAIN) { @@ -574,35 +673,41 @@ void pn_connector_process(pn_connector_t // driver +static pn_selectable_t *create_ctrl_selectable(pn_socket_t fd); + pn_driver_t *pn_driver() { pn_driver_t *d = (pn_driver_t *) malloc(sizeof(pn_driver_t)); if (!d) return NULL; + d->error = pn_error(); d->io = pn_io(); + d->selector = pn_io_selector(d->io); d->listener_head = NULL; d->listener_tail = NULL; d->listener_next = NULL; + d->ready_listener_head = NULL; + d->ready_listener_tail = NULL; d->connector_head = NULL; d->connector_tail = NULL; - d->connector_next = NULL; + d->ready_connector_head = NULL; + d->ready_connector_tail = NULL; d->listener_count = 0; d->connector_count = 0; - d->closed_count = 0; - // d->max_fds = 0; d->ctrl[0] = 0; d->ctrl[1] = 0; d->trace = ((pn_env_bool("PN_TRACE_RAW") ? PN_TRACE_RAW : PN_TRACE_OFF) | (pn_env_bool("PN_TRACE_FRM") ? PN_TRACE_FRM : PN_TRACE_OFF) | (pn_env_bool("PN_TRACE_DRV") ? 
PN_TRACE_DRV : PN_TRACE_OFF)); - d->wakeup = 0; // XXX - if (pn_socket_pair(d->ctrl)) { + if (pn_pipe(d->io, d->ctrl)) { perror("Can't create control pipe"); free(d); return NULL; } + d->ctrl_selectable = create_ctrl_selectable(d->ctrl[0]); + pn_selector_add(d->selector, d->ctrl_selectable); return d; } @@ -626,8 +731,9 @@ void pn_driver_free(pn_driver_t *d) { if (!d) return; - close(d->ctrl[0]); - close(d->ctrl[1]); + pn_selectable_free(d->ctrl_selectable); + pn_close(d->io, d->ctrl[0]); + pn_close(d->io, d->ctrl[1]); while (d->connector_head) pn_connector_free(d->connector_head); while (d->listener_head) @@ -640,7 +746,7 @@ void pn_driver_free(pn_driver_t *d) int pn_driver_wakeup(pn_driver_t *d) { if (d) { - ssize_t count = send(d->ctrl[1], "x", 1, 0); + ssize_t count = pn_write(d->io, d->ctrl[1], "x", 1); if (count <= 0) { return count; } else { @@ -651,158 +757,57 @@ int pn_driver_wakeup(pn_driver_t *d) } } -static void pn_driver_rebuild(pn_driver_t *d) +void pn_driver_wait_1(pn_driver_t *d) { - d->wakeup = 0; - d->overflow = false; - int r_avail = FD_SETSIZE; - int w_avail = FD_SETSIZE; - // d->max_fds = -1; - FD_ZERO(&d->readfds); - FD_ZERO(&d->writefds); - FD_ZERO(&d->exceptfds); - - FD_SET(d->ctrl[0], &d->readfds); - // if (d->ctrl[0] > d->max_fds) d->max_fds = d->ctrl[0]; - - pn_listener_t *l = d->listener_head; - for (unsigned i = 0; i < d->listener_count; i++) { - if (r_avail) { - FD_SET(l->fd, &d->readfds); - // if (l->fd > d->max_fds) d->max_fds = l->fd; - r_avail--; - l = l->listener_next; - } - else { - d->overflow = true; - break; - } - } +} +int pn_driver_wait_2(pn_driver_t *d, int timeout) +{ + // These lists will normally be empty + while (d->ready_listener_head) + ready_listener_list_remove(d, d->ready_listener_head); + while (d->ready_connector_head) + ready_connector_list_remove(d, d->ready_connector_head); pn_connector_t *c = d->connector_head; for (unsigned i = 0; i < d->connector_count; i++) { - if (!c->closed) { - FD_SET(c->fd, 
&d->exceptfds); - d->wakeup = pn_timestamp_min(d->wakeup, c->wakeup); - if (c->status & PN_SEL_RD) { - if (r_avail) { - FD_SET(c->fd, &d->readfds); - r_avail--; - } - else { - d->overflow = true; - break; - } - } - if (c->status & PN_SEL_WR) { - if (w_avail) { - FD_SET(c->fd, &d->writefds); - w_avail--; - } - else { - d->overflow = true; - break; - } - } - // if (c->fd > d->max_fds) d->max_fds = c->fd; + // Optimistically use a snapshot of the non-threadsafe vars. + // If they are in flux, the app will guarantee progress with a pn_driver_wakeup(). + int current_status = c->status; + pn_timestamp_t current_wakeup = c->wakeup; + if (c->posted_status != current_status || c->posted_wakeup != current_wakeup) { + c->posted_status = current_status; + c->posted_wakeup = current_wakeup; + pn_selector_update(c->driver->selector, c->selectable); + } + if (c->closed) { + c->pending_read = false; + c->pending_write = false; + c->pending_tick = false; + LL_ADD(d, ready_connector, c); } c = c->connector_next; } -} -void pn_driver_wait_1(pn_driver_t *d) -{ - pn_driver_rebuild(d); -} + if (d->ready_connector_head) + timeout = 0; // We found closed connections -int pn_driver_wait_2(pn_driver_t *d, int timeout) -{ - if (d->overflow) - return pn_error_set(d->error, PN_ERR, "maximum driver sockets exceeded"); - if (d->wakeup) { - pn_timestamp_t now = pn_i_now(); - if (now >= d->wakeup) - timeout = 0; - else - timeout = (timeout < 0) ? 
d->wakeup-now : pn_min(timeout, d->wakeup - now); - } - - struct timeval to = {0}; - struct timeval *to_arg = &to; - // block only if (timeout == 0) and (closed_count == 0) - if (d->closed_count == 0) { - if (timeout > 0) { - // convert millisecs to sec and usec: - to.tv_sec = timeout/1000; - to.tv_usec = (timeout - (to.tv_sec * 1000)) * 1000; - } - else if (timeout < 0) { - to_arg = NULL; - } - } - int nfds = select(/* d->max_fds */ 0, &d->readfds, &d->writefds, &d->exceptfds, to_arg); - if (nfds == SOCKET_ERROR) { - errno = WSAGetLastError(); - pn_i_error_from_errno(d->error, "select"); + int code = pn_selector_select(d->selector, timeout); + if (code) { + pn_error_set(d->error, code, "select"); return -1; } + get_new_events(d); return 0; } int pn_driver_wait_3(pn_driver_t *d) { - bool woken = false; - if (FD_ISSET(d->ctrl[0], &d->readfds)) { - woken = true; - //clear the pipe - char buffer[512]; - while (recv(d->ctrl[0], buffer, 512, 0) == 512); - } - - pn_listener_t *l = d->listener_head; - while (l) { - l->pending = (FD_ISSET(l->fd, &d->readfds)); - l = l->listener_next; - } - - pn_timestamp_t now = pn_i_now(); - pn_connector_t *c = d->connector_head; - while (c) { - if (c->closed) { - c->pending_read = false; - c->pending_write = false; - c->pending_tick = false; - } else { - c->pending_read = FD_ISSET(c->fd, &d->readfds); - c->pending_write = FD_ISSET(c->fd, &d->writefds); - c->pending_tick = (c->wakeup && c->wakeup <= now); -// Unlike Posix no distinction of POLLERR and POLLHUP -// if (idx && d->fds[idx].revents & POLLERR) -// pn_connector_close(c); -// else if (idx && (d->fds[idx].revents & POLLHUP)) { -// [...] -// Strategy, defer error to a recv or send if read or write pending. -// Otherwise proclaim the connection dead. - if (!c->pending_read && !c->pending_write) { - if (FD_ISSET(c->fd, &d->exceptfds)) { - // can't defer error to a read or write, close now. - // How to get WSAlastError() equivalent info? 
- fprintf(stderr, "connector cleanup on unknown error %s\n", c->name); - pn_connector_close(c); - } - } - } - c = c->connector_next; - } - - d->listener_next = d->listener_head; - d->connector_next = d->connector_head; - - return woken ? PN_INTR : 0; + // no-op with new selector/selectables + return 0; } -// + // XXX - pn_driver_wait has been divided into three internal functions as a // temporary workaround for a multi-threading problem. A multi-threaded // application must hold a lock on parts 1 and 3, but not on part 2. @@ -821,103 +826,75 @@ int pn_driver_wait(pn_driver_t *d, int t return pn_driver_wait_3(d); } +static void get_new_events(pn_driver_t *d) +{ + bool woken = false; + int events; + pn_selectable_t *sel; + while ((sel = pn_selector_next(d->selector, &events)) != NULL) { + if (sel == d->ctrl_selectable) { + woken = true; + //clear the pipe + char buffer[512]; + while (pn_read(d->io, d->ctrl[0], buffer, 512) == 512); + continue; + } + + void *ctx = pni_selectable_get_context(sel); + sel_type_t *type = (sel_type_t *) ctx; + if (*type == CONNECTOR) { + pn_connector_t *c = (pn_connector_t *) ctx; + if (!c->closed) { + LL_ADD(d, ready_connector, c); + c->pending_read = events & PN_READABLE; + c->pending_write = events & PN_WRITABLE; + c->pending_tick = events & PN_EXPIRED; + } + } else { + pn_listener_t *l = (pn_listener_t *) ctx; + LL_ADD(d, ready_listener, l); + l->pending = events & PN_READABLE; + } + } +} + pn_listener_t *pn_driver_listener(pn_driver_t *d) { if (!d) return NULL; - while (d->listener_next) { - pn_listener_t *l = d->listener_next; - d->listener_next = l->listener_next; - - if (l->pending) { + pn_listener_t *l = d->ready_listener_head; + while (l) { + ready_listener_list_remove(d, l); + if (l->pending) return l; - } + l = d->ready_listener_head; } - return NULL; } pn_connector_t *pn_driver_connector(pn_driver_t *d) { if (!d) return NULL; - while (d->connector_next) { - pn_connector_t *c = d->connector_next; - d->connector_next = 
c->connector_next; - + pn_connector_t *c = d->ready_connector_head; + while (c) { + ready_connector_list_remove(d, c); if (c->closed || c->pending_read || c->pending_write || c->pending_tick) { return c; } + c = d->ready_connector_head; } - return NULL; } -static int pn_socket_pair (SOCKET sv[2]) { - // no socketpair on windows. provide pipe() semantics using sockets - - SOCKET sock = socket(AF_INET, SOCK_STREAM, getprotobyname("tcp")->p_proto); - if (sock == INVALID_SOCKET) { - perror("socket"); - return -1; - } - - BOOL b = 1; - if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (const char *) &b, sizeof(b)) == -1) { - perror("setsockopt"); - closesocket(sock); - return -1; - } - else { - struct sockaddr_in addr = {0}; - addr.sin_family = AF_INET; - addr.sin_port = 0; - addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK); - - if (bind(sock, (struct sockaddr *)&addr, sizeof(addr)) == -1) { - perror("bind"); - closesocket(sock); - return -1; - } - } - - if (listen(sock, 50) == -1) { - perror("listen"); - closesocket(sock); - return -1; - } - - if ((sv[1] = socket(AF_INET, SOCK_STREAM, getprotobyname("tcp")->p_proto)) == INVALID_SOCKET) { - perror("sock1"); - closesocket(sock); - return -1; - } - else { - struct sockaddr addr = {0}; - int l = sizeof(addr); - if (getsockname(sock, &addr, &l) == -1) { - perror("getsockname"); - closesocket(sock); - return -1; - } - - if (connect(sv[1], &addr, sizeof(addr)) == -1) { - int err = WSAGetLastError(); - fprintf(stderr, "connect wsaerrr %d\n", err); - closesocket(sock); - closesocket(sv[1]); - return -1; - } - - if ((sv[0] = accept(sock, &addr, &l)) == INVALID_SOCKET) { - perror("accept"); - closesocket(sock); - closesocket(sv[1]); - return -1; - } - } - - u_long v = 1; - ioctlsocket (sv[0], FIONBIO, &v); - ioctlsocket (sv[1], FIONBIO, &v); - closesocket(sock); - return 0; +static pn_selectable_t *create_ctrl_selectable(pn_socket_t fd) +{ + // ctrl input only needs to know about read events, just like a listener. 
+ pn_selectable_t *sel = pni_selectable(driver_listener_capacity, + driver_listener_pending, + driver_listener_deadline, + driver_listener_readable, + driver_listener_writable, + driver_listener_expired, + driver_listener_finalize); + pni_selectable_set_fd(sel, fd); + return sel; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org