Author: gsim Date: Fri Oct 10 12:41:36 2014 New Revision: 1630790 URL: http://svn.apache.org/r1630790 Log: PROTON-640 Windows IO completion port implementation for pn_io and pn_selectable classes
Added: qpid/proton/branches/examples/proton-c/src/windows/iocp.c qpid/proton/branches/examples/proton-c/src/windows/iocp.h qpid/proton/branches/examples/proton-c/src/windows/write_pipeline.c Modified: qpid/proton/branches/examples/proton-c/CMakeLists.txt qpid/proton/branches/examples/proton-c/include/proton/io.h qpid/proton/branches/examples/proton-c/include/proton/selector.h qpid/proton/branches/examples/proton-c/src/messenger/messenger.c qpid/proton/branches/examples/proton-c/src/posix/io.c qpid/proton/branches/examples/proton-c/src/posix/selector.c qpid/proton/branches/examples/proton-c/src/windows/io.c qpid/proton/branches/examples/proton-c/src/windows/selector.c Modified: qpid/proton/branches/examples/proton-c/CMakeLists.txt URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/proton-c/CMakeLists.txt?rev=1630790&r1=1630789&r2=1630790&view=diff ============================================================================== --- qpid/proton/branches/examples/proton-c/CMakeLists.txt (original) +++ qpid/proton/branches/examples/proton-c/CMakeLists.txt Fri Oct 10 12:41:36 2014 @@ -81,7 +81,7 @@ add_custom_command ( # Select driver if(PN_WINAPI) - set (pn_io_impl src/windows/io.c) + set (pn_io_impl src/windows/io.c src/windows/iocp.c src/windows/write_pipeline.c) set (pn_selector_impl src/windows/selector.c) set (pn_driver_impl src/windows/driver.c) else(PN_WINAPI) Modified: qpid/proton/branches/examples/proton-c/include/proton/io.h URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/proton-c/include/proton/io.h?rev=1630790&r1=1630789&r2=1630790&view=diff ============================================================================== --- qpid/proton/branches/examples/proton-c/include/proton/io.h (original) +++ qpid/proton/branches/examples/proton-c/include/proton/io.h Fri Oct 10 12:41:36 2014 @@ -44,6 +44,7 @@ typedef int pn_socket_t; #endif typedef struct pn_io_t pn_io_t; +typedef struct pn_selector_t pn_selector_t; PN_EXTERN pn_io_t 
*pn_io(void); PN_EXTERN void pn_io_free(pn_io_t *io); @@ -58,6 +59,7 @@ PN_EXTERN int pn_pipe(pn_io_t *io, pn_so PN_EXTERN ssize_t pn_read(pn_io_t *io, pn_socket_t socket, void *buf, size_t size); PN_EXTERN ssize_t pn_write(pn_io_t *io, pn_socket_t socket, const void *buf, size_t size); PN_EXTERN bool pn_wouldblock(pn_io_t *io); +PN_EXTERN pn_selector_t *pn_io_selector(pn_io_t *io); #ifdef __cplusplus } Modified: qpid/proton/branches/examples/proton-c/include/proton/selector.h URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/proton-c/include/proton/selector.h?rev=1630790&r1=1630789&r2=1630790&view=diff ============================================================================== --- qpid/proton/branches/examples/proton-c/include/proton/selector.h (original) +++ qpid/proton/branches/examples/proton-c/include/proton/selector.h Fri Oct 10 12:41:36 2014 @@ -34,9 +34,7 @@ extern "C" { #define PN_WRITABLE (2) #define PN_EXPIRED (4) -typedef struct pn_selector_t pn_selector_t; - -PN_EXTERN pn_selector_t *pn_selector(void); +pn_selector_t *pni_selector(void); PN_EXTERN void pn_selector_free(pn_selector_t *selector); PN_EXTERN void pn_selector_add(pn_selector_t *selector, pn_selectable_t *selectable); PN_EXTERN void pn_selector_update(pn_selector_t *selector, pn_selectable_t *selectable); Modified: qpid/proton/branches/examples/proton-c/src/messenger/messenger.c URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/proton-c/src/messenger/messenger.c?rev=1630790&r1=1630789&r2=1630790&view=diff ============================================================================== --- qpid/proton/branches/examples/proton-c/src/messenger/messenger.c (original) +++ qpid/proton/branches/examples/proton-c/src/messenger/messenger.c Fri Oct 10 12:41:36 2014 @@ -612,7 +612,7 @@ pn_messenger_t *pn_messenger(const char pni_selectable_set_context(m->interruptor, m); m->listeners = pn_list(0, 0); m->connections = pn_list(0, 0); - m->selector = pn_selector(); + 
m->selector = pn_io_selector(m->io); m->collector = pn_collector(); m->credit_mode = LINK_CREDIT_EXPLICIT; m->credit_batch = 1024; Modified: qpid/proton/branches/examples/proton-c/src/posix/io.c URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/proton-c/src/posix/io.c?rev=1630790&r1=1630789&r2=1630790&view=diff ============================================================================== --- qpid/proton/branches/examples/proton-c/src/posix/io.c (original) +++ qpid/proton/branches/examples/proton-c/src/posix/io.c Fri Oct 10 12:41:36 2014 @@ -21,6 +21,7 @@ #include <proton/io.h> #include <proton/object.h> +#include <proton/selector.h> #include <ctype.h> #include <errno.h> @@ -43,6 +44,7 @@ struct pn_io_t { char host[MAX_HOST]; char serv[MAX_SERV]; pn_error_t *error; + pn_selector_t *selector; bool wouldblock; }; @@ -51,6 +53,7 @@ void pn_io_initialize(void *obj) pn_io_t *io = (pn_io_t *) obj; io->error = pn_error(); io->wouldblock = false; + io->selector = NULL; } void pn_io_finalize(void *obj) @@ -275,3 +278,10 @@ bool pn_wouldblock(pn_io_t *io) { return io->wouldblock; } + +pn_selector_t *pn_io_selector(pn_io_t *io) +{ + if (io->selector == NULL) + io->selector = pni_selector(); + return io->selector; +} Modified: qpid/proton/branches/examples/proton-c/src/posix/selector.c URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/proton-c/src/posix/selector.c?rev=1630790&r1=1630789&r2=1630790&view=diff ============================================================================== --- qpid/proton/branches/examples/proton-c/src/posix/selector.c (original) +++ qpid/proton/branches/examples/proton-c/src/posix/selector.c Fri Oct 10 12:41:36 2014 @@ -65,7 +65,7 @@ void pn_selector_finalize(void *obj) #define pn_selector_compare NULL #define pn_selector_inspect NULL -pn_selector_t *pn_selector(void) +pn_selector_t *pni_selector(void) { static const pn_class_t clazz = PN_CLASS(pn_selector); pn_selector_t *selector = (pn_selector_t *) 
pn_new(sizeof(pn_selector_t), &clazz); Modified: qpid/proton/branches/examples/proton-c/src/windows/io.c URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/proton-c/src/windows/io.c?rev=1630790&r1=1630789&r2=1630790&view=diff ============================================================================== --- qpid/proton/branches/examples/proton-c/src/windows/io.c (original) +++ qpid/proton/branches/examples/proton-c/src/windows/io.c Fri Oct 10 12:41:36 2014 @@ -27,32 +27,47 @@ #error "Proton requires Windows API support for XP or later." #endif #include <winsock2.h> +#include <mswsock.h> #include <Ws2tcpip.h> #define PN_WINAPI -#include "../platform.h" +#include "platform.h" #include <proton/io.h> #include <proton/object.h> +#include <proton/selector.h> +#include "iocp.h" +#include "util.h" #include <ctype.h> #include <errno.h> #include <stdio.h> #include <assert.h> -static int pni_error_from_wsaerr(pn_error_t *error, const char *msg) { - errno = WSAGetLastError(); - return pn_i_error_from_errno(error, msg); +int pni_win32_error(pn_error_t *error, const char *msg, HRESULT code) +{ + // Error code can be from GetLastError or WSAGetLastError, + char err[1024] = {0}; + FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS | + FORMAT_MESSAGE_MAX_WIDTH_MASK, NULL, code, 0, (LPSTR)&err, sizeof(err), NULL); + return pn_error_format(error, PN_ERR, "%s: %s", msg, err); +} + +static void io_log(const char *fmt, ...) 
+{ + va_list ap; + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + fflush(stderr); } - -#define MAX_HOST (1024) -#define MAX_SERV (64) - struct pn_io_t { - char host[MAX_HOST]; - char serv[MAX_SERV]; + char host[NI_MAXHOST]; + char serv[NI_MAXSERV]; pn_error_t *error; + bool trace; bool wouldblock; + iocp_t *iocp; }; void pn_io_initialize(void *obj) @@ -60,21 +75,24 @@ void pn_io_initialize(void *obj) pn_io_t *io = (pn_io_t *) obj; io->error = pn_error(); io->wouldblock = false; + io->trace = pn_env_bool("PN_TRACE_DRV"); /* Request WinSock 2.2 */ WORD wsa_ver = MAKEWORD(2, 2); WSADATA unused; int err = WSAStartup(wsa_ver, &unused); if (err) { - pni_error_from_wsaerr(io->error, "pipe"); - fprintf(stderr, "Can't load WinSock: %d\n", err); + pni_win32_error(io->error, "WSAStartup", WSAGetLastError()); + fprintf(stderr, "Can't load WinSock: %s\n", pn_error_text(io->error)); } + io->iocp = pni_iocp(); } void pn_io_finalize(void *obj) { pn_io_t *io = (pn_io_t *) obj; pn_error_free(io->error); + pn_free(io->iocp); WSACleanup(); } @@ -100,20 +118,40 @@ pn_error_t *pn_io_error(pn_io_t *io) return io->error; } +static void ensure_unique(pn_io_t *io, pn_socket_t new_socket) +{ + // A brand new socket can have the same HANDLE value as a previous + // one after a socketclose. If the application closes one itself + // (i.e. not using pn_close), we don't find out about it until here. + iocpdesc_t *iocpd = pni_iocpdesc_map_get(io->iocp, new_socket); + if (iocpd) { + if (io->trace) + io_log("Stale external socket reference discarded\n"); + // Re-use means former socket instance was closed + assert(iocpd->ops_in_progress == 0); + assert(iocpd->external); + // Clean up the straggler as best we can + pn_socket_t sock = iocpd->socket; + iocpd->socket = INVALID_SOCKET; + pni_iocpdesc_map_del(io->iocp, sock); // may free the iocpdesc_t depending on refcount + } +} + + /* - * Windows pipes don't work with select(), so a socket based pipe - * workaround is provided.
They do work with completion ports, so the - * workaround can be disposed with in future. + * This heavyweight surrogate pipe could be replaced with a normal Windows pipe + * now that select() is no longer used. If interrupt semantics are all that is + * needed, a simple user space counter and reserved completion status would + * probably suffice. */ -static int pni_socket_pair(SOCKET sv[2]); +static int pni_socket_pair(pn_io_t *io, SOCKET sv[2]); int pn_pipe(pn_io_t *io, pn_socket_t *dest) { - int n = pni_socket_pair(dest); + int n = pni_socket_pair(io, dest); if (n) { - pni_error_from_wsaerr(io->error, "pipe"); + pni_win32_error(io->error, "pipe", WSAGetLastError()); } - return n; } @@ -125,9 +163,14 @@ static void pn_configure_sock(pn_io_t *i if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag)) != 0) { perror("setsockopt"); } + + u_long nonblock = 1; + if (ioctlsocket(sock, FIONBIO, &nonblock)) { + perror("ioctlsocket"); + } } -static inline pn_socket_t pn_create_socket(void); +static inline pn_socket_t pni_create_socket(); pn_socket_t pn_listen(pn_io_t *io, const char *host, const char *port) { @@ -138,34 +181,43 @@ pn_socket_t pn_listen(pn_io_t *io, const return INVALID_SOCKET; } - pn_socket_t sock = pn_create_socket(); + pn_socket_t sock = pni_create_socket(); if (sock == INVALID_SOCKET) { - pni_error_from_wsaerr(io->error, "pn_create_socket"); + pni_win32_error(io->error, "pni_create_socket", WSAGetLastError()); return INVALID_SOCKET; } + ensure_unique(io, sock); - BOOL optval = 1; - if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (const char *) &optval, sizeof(optval)) == -1) { - pni_error_from_wsaerr(io->error, "setsockopt"); + bool optval = 1; + if (setsockopt(sock, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (const char *) &optval, + sizeof(optval)) == -1) { + pni_win32_error(io->error, "setsockopt", WSAGetLastError()); closesocket(sock); return INVALID_SOCKET; } if (bind(sock, addr->ai_addr, addr->ai_addrlen) == -1) { - 
pni_error_from_wsaerr(io->error, "bind"); + pni_win32_error(io->error, "bind", WSAGetLastError()); freeaddrinfo(addr); closesocket(sock); return INVALID_SOCKET; } - freeaddrinfo(addr); if (listen(sock, 50) == -1) { - pni_error_from_wsaerr(io->error, "listen"); + pni_win32_error(io->error, "listen", WSAGetLastError()); closesocket(sock); return INVALID_SOCKET; } + iocpdesc_t *iocpd = pni_iocpdesc_create(io->iocp, sock, false); + if (!iocpd) { + pn_i_error_from_errno(io->error, "register"); + closesocket(sock); + return INVALID_SOCKET; + } + + pni_iocpdesc_start(iocpd); return sock; } @@ -181,66 +233,83 @@ pn_socket_t pn_connect(pn_io_t *io, cons return INVALID_SOCKET; } - pn_socket_t sock = pn_create_socket(); + pn_socket_t sock = pni_create_socket(); if (sock == INVALID_SOCKET) { - pni_error_from_wsaerr(io->error, "pn_create_socket"); + pni_win32_error(io->error, "proton pni_create_socket", WSAGetLastError()); + freeaddrinfo(addr); return INVALID_SOCKET; } + ensure_unique(io, sock); pn_configure_sock(io, sock); - - if (connect(sock, addr->ai_addr, addr->ai_addrlen) != 0) { - if (WSAGetLastError() != WSAEWOULDBLOCK) { - pni_error_from_wsaerr(io->error, "connect"); - freeaddrinfo(addr); - closesocket(sock); - return INVALID_SOCKET; - } - } - - freeaddrinfo(addr); - - return sock; + return pni_iocp_begin_connect(io->iocp, sock, addr, io->error); } -pn_socket_t pn_accept(pn_io_t *io, pn_socket_t socket, char *name, size_t size) +pn_socket_t pn_accept(pn_io_t *io, pn_socket_t listen_sock, char *name, size_t size) { struct sockaddr_in addr = {0}; addr.sin_family = AF_INET; socklen_t addrlen = sizeof(addr); - pn_socket_t sock = accept(socket, (struct sockaddr *) &addr, &addrlen); - if (sock == INVALID_SOCKET) { - pni_error_from_wsaerr(io->error, "accept"); - return sock; + iocpdesc_t *listend = pni_iocpdesc_map_get(io->iocp, listen_sock); + pn_socket_t accept_sock; + + if (listend) + accept_sock = pni_iocp_end_accept(listend, (struct sockaddr *) &addr, &addrlen, 
&io->wouldblock, io->error); + else { + // User supplied socket + accept_sock = accept(listen_sock, (struct sockaddr *) &addr, &addrlen); + if (accept_sock == INVALID_SOCKET) + pni_win32_error(io->error, "sync accept", WSAGetLastError()); + } + + if (accept_sock == INVALID_SOCKET) + return accept_sock; + + int code = getnameinfo((struct sockaddr *) &addr, addrlen, io->host, NI_MAXHOST, + io->serv, NI_MAXSERV, 0); + if (code) + code = getnameinfo((struct sockaddr *) &addr, addrlen, io->host, NI_MAXHOST, + io->serv, NI_MAXSERV, NI_NUMERICHOST | NI_NUMERICSERV); + if (code) { + pn_error_format(io->error, PN_ERR, "getnameinfo: %s\n", gai_strerror(code)); + pn_close(io, accept_sock); + return INVALID_SOCKET; } else { - int code; - if ((code = getnameinfo((struct sockaddr *) &addr, addrlen, io->host, MAX_HOST, io->serv, MAX_SERV, 0))) { - pn_error_format(io->error, PN_ERR, "getnameinfo: %s\n", gai_strerror(code)); - if (closesocket(sock) == -1) - pni_error_from_wsaerr(io->error, "closesocket"); - return INVALID_SOCKET; - } else { - pn_configure_sock(io, sock); - snprintf(name, size, "%s:%s", io->host, io->serv); - return sock; + pn_configure_sock(io, accept_sock); + snprintf(name, size, "%s:%s", io->host, io->serv); + if (listend) { + pni_iocpdesc_start(pni_iocpdesc_map_get(io->iocp, accept_sock)); } + return accept_sock; } } -static inline pn_socket_t pn_create_socket(void) { +static inline pn_socket_t pni_create_socket() { return socket(AF_INET, SOCK_STREAM, getprotobyname("tcp")->p_proto); } ssize_t pn_send(pn_io_t *io, pn_socket_t sockfd, const void *buf, size_t len) { - ssize_t count = send(sockfd, (const char *) buf, len, 0); - io->wouldblock = count < 0 && WSAGetLastError() == WSAEWOULDBLOCK; + ssize_t count; + iocpdesc_t *iocpd = pni_iocpdesc_map_get(io->iocp, sockfd); + if (iocpd) { + count = pni_iocp_begin_write(iocpd, buf, len, &io->wouldblock, io->error); + } else { + count = send(sockfd, (const char *) buf, len, 0); + io->wouldblock = count < 0 && 
WSAGetLastError() == WSAEWOULDBLOCK; + } return count; } ssize_t pn_recv(pn_io_t *io, pn_socket_t socket, void *buf, size_t size) { - ssize_t count = recv(socket, (char *) buf, size, 0); - io->wouldblock = count < 0 && WSAGetLastError() == WSAEWOULDBLOCK; + ssize_t count; + iocpdesc_t *iocpd = pni_iocpdesc_map_get(io->iocp, socket); + if (iocpd) { + count = pni_iocp_recv(iocpd, buf, size, &io->wouldblock, io->error); + } else { + count = recv(socket, (char *) buf, size, 0); + io->wouldblock = count < 0 && WSAGetLastError() == WSAEWOULDBLOCK; + } return count; } @@ -257,7 +326,12 @@ ssize_t pn_read(pn_io_t *io, pn_socket_t void pn_close(pn_io_t *io, pn_socket_t socket) { - closesocket(socket); + iocpdesc_t *iocpd = pni_iocpdesc_map_get(io->iocp, socket); + if (iocpd) + pni_iocp_begin_close(iocpd); + else { + closesocket(socket); + } } bool pn_wouldblock(pn_io_t *io) @@ -265,8 +339,24 @@ bool pn_wouldblock(pn_io_t *io) return io->wouldblock; } +pn_selector_t *pn_io_selector(pn_io_t *io) +{ + if (io->iocp->selector == NULL) + io->iocp->selector = pni_selector_create(io->iocp); + return io->iocp->selector; +} -static int pni_socket_pair (SOCKET sv[2]) { +static void configure_pipe_socket(pn_io_t *io, pn_socket_t sock) +{ + u_long v = 1; + ioctlsocket (sock, FIONBIO, &v); + ensure_unique(io, sock); + iocpdesc_t *iocpd = pni_iocpdesc_create(io->iocp, sock, false); + pni_iocpdesc_start(iocpd); +} + + +static int pni_socket_pair (pn_io_t *io, SOCKET sv[2]) { // no socketpair on windows. 
provide pipe() semantics using sockets SOCKET sock = socket(AF_INET, SOCK_STREAM, getprotobyname("tcp")->p_proto); @@ -330,9 +420,9 @@ static int pni_socket_pair (SOCKET sv[2] } } - u_long v = 1; - ioctlsocket (sv[0], FIONBIO, &v); - ioctlsocket (sv[1], FIONBIO, &v); + configure_pipe_socket(io, sv[0]); + configure_pipe_socket(io, sv[1]); closesocket(sock); return 0; } + Added: qpid/proton/branches/examples/proton-c/src/windows/iocp.c URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/proton-c/src/windows/iocp.c?rev=1630790&view=auto ============================================================================== --- qpid/proton/branches/examples/proton-c/src/windows/iocp.c (added) +++ qpid/proton/branches/examples/proton-c/src/windows/iocp.c Fri Oct 10 12:41:36 2014 @@ -0,0 +1,1138 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef _WIN32_WINNT +#define _WIN32_WINNT 0x0501 +#endif +#if _WIN32_WINNT < 0x0501 +#error "Proton requires Windows API support for XP or later." 
+#endif +#include <winsock2.h> +#include <mswsock.h> +#include <Ws2tcpip.h> +#define PN_WINAPI + +#include "../platform.h" +#include <proton/object.h> +#include <proton/io.h> +#include <proton/selector.h> +#include <proton/error.h> +#include <proton/transport.h> +#include "iocp.h" +#include "util.h" +#include <assert.h> + +/* + * Windows IO Completion Port support for Proton. + * + * Overlapped writes are used to avoid lengthy stalls between write + * completion and starting a new write. Non-overlapped reads are used + * since Windows accumulates inbound traffic without stalling and + * managing read buffers would not avoid a memory copy at the pn_read + * boundary. + */ + +// Max number of overlapped accepts per listener +#define IOCP_MAX_ACCEPTS 10 + +// AcceptEx squishes the local and remote addresses and optional data +// all together when accepting the connection. Reserve enough for +// IPv6 addresses, even if the socket is IPv4. The 16 bytes padding +// per address is required by AcceptEx. +#define IOCP_SOCKADDRMAXLEN (sizeof(sockaddr_in6) + 16) +#define IOCP_SOCKADDRBUFLEN (2 * IOCP_SOCKADDRMAXLEN) + +static void iocp_log(const char *fmt, ...) 
+{ + va_list ap; + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + fflush(stderr); +} + +static void set_iocp_error_status(pn_error_t *error, int code, HRESULT status) +{ + char buf[512]; + if (FormatMessage(FORMAT_MESSAGE_MAX_WIDTH_MASK | FORMAT_MESSAGE_FROM_SYSTEM, + 0, status, 0, buf, sizeof(buf), 0)) + pn_error_set(error, code, buf); + else { + fprintf(stderr, "pn internal Windows error: %lu\n", GetLastError()); + } +} + +static void reap_check(iocpdesc_t *); +static void bind_to_completion_port(iocpdesc_t *iocpd); +static void iocp_shutdown(iocpdesc_t *iocpd); +static void start_reading(iocpdesc_t *iocpd); +static bool is_listener(iocpdesc_t *iocpd); +static void release_sys_sendbuf(SOCKET s); + +static void iocpdesc_fail(iocpdesc_t *iocpd, HRESULT status, const char* text) +{ + pni_win32_error(iocpd->error, text, status); + if (iocpd->iocp->iocp_trace) { + iocp_log("connection terminated: %s\n", pn_error_text(iocpd->error)); + } + if (!is_listener(iocpd) && !iocpd->write_closed && !pni_write_pipeline_size(iocpd->pipeline)) + iocp_shutdown(iocpd); + iocpd->write_closed = true; + iocpd->read_closed = true; + pni_events_update(iocpd, iocpd->events | PN_READABLE | PN_WRITABLE); +} + +// Helper functions to use specialized IOCP AcceptEx() and ConnectEx() +static LPFN_ACCEPTEX lookup_accept_ex(SOCKET s) +{ + GUID guid = WSAID_ACCEPTEX; + DWORD bytes = 0; + LPFN_ACCEPTEX fn; + WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid), + &fn, sizeof(fn), &bytes, NULL, NULL); + assert(fn); + return fn; +} + +static LPFN_CONNECTEX lookup_connect_ex(SOCKET s) +{ + GUID guid = WSAID_CONNECTEX; + DWORD bytes = 0; + LPFN_CONNECTEX fn; + WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid), + &fn, sizeof(fn), &bytes, NULL, NULL); + assert(fn); + return fn; +} + +static LPFN_GETACCEPTEXSOCKADDRS lookup_get_accept_ex_sockaddrs(SOCKET s) +{ + GUID guid = WSAID_GETACCEPTEXSOCKADDRS; + DWORD bytes = 0; + LPFN_GETACCEPTEXSOCKADDRS fn; + 
WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid), + &fn, sizeof(fn), &bytes, NULL, NULL); + assert(fn); + return fn; +} + +// match accept socket to listener socket +static iocpdesc_t *create_same_type_socket(iocpdesc_t *iocpd) +{ + sockaddr_storage sa; + socklen_t salen = sizeof(sa); + if (getsockname(iocpd->socket, (sockaddr*)&sa, &salen) == -1) + return NULL; + SOCKET s = socket(sa.ss_family, SOCK_STREAM, 0); // Currently only work with SOCK_STREAM + if (s == INVALID_SOCKET) + return NULL; + return pni_iocpdesc_create(iocpd->iocp, s, false); +} + +static bool is_listener(iocpdesc_t *iocpd) +{ + return iocpd && iocpd->acceptor; +} + +// === Async accept processing + +typedef struct { + iocp_result_t base; + iocpdesc_t *new_sock; + char address_buffer[IOCP_SOCKADDRBUFLEN]; + DWORD unused; +} accept_result_t; + +static accept_result_t *accept_result(iocpdesc_t *listen_sock) { + accept_result_t *result = (accept_result_t *) pn_new(sizeof(accept_result_t), 0); + memset(result, 0, sizeof(accept_result_t)); + if (result) { + result->base.type = IOCP_ACCEPT; + result->base.iocpd = listen_sock; + } + return result; +} + +static void reset_accept_result(accept_result_t *result) { + memset(&result->base.overlapped, 0, sizeof (OVERLAPPED)); + memset(&result->address_buffer, 0, IOCP_SOCKADDRBUFLEN); +} + +struct pni_acceptor_t { + int accept_queue_size; + pn_list_t *accepts; + iocpdesc_t *listen_sock; + bool signalled; + LPFN_ACCEPTEX fn_accept_ex; + LPFN_GETACCEPTEXSOCKADDRS fn_get_accept_ex_sockaddrs; +}; + +#define pni_acceptor_compare NULL +#define pni_acceptor_inspect NULL +#define pni_acceptor_hashcode NULL + +static void pni_acceptor_initialize(void *object) +{ + pni_acceptor_t *acceptor = (pni_acceptor_t *) object; + acceptor->accepts = pn_list(IOCP_MAX_ACCEPTS, 0); +} + +static void pni_acceptor_finalize(void *object) +{ + pni_acceptor_t *acceptor = (pni_acceptor_t *) object; + size_t len = pn_list_size(acceptor->accepts); + for (size_t i = 0; i < 
len; i++) + pn_free(pn_list_get(acceptor->accepts, i)); + pn_free(acceptor->accepts); +} + +static pni_acceptor_t *pni_acceptor(iocpdesc_t *iocpd) +{ + static const pn_class_t clazz = PN_CLASS(pni_acceptor); + pni_acceptor_t *acceptor = (pni_acceptor_t *) pn_new(sizeof(pni_acceptor_t), &clazz); + acceptor->listen_sock = iocpd; + acceptor->accept_queue_size = 0; + acceptor->signalled = false; + pn_socket_t sock = acceptor->listen_sock->socket; + acceptor->fn_accept_ex = lookup_accept_ex(sock); + acceptor->fn_get_accept_ex_sockaddrs = lookup_get_accept_ex_sockaddrs(sock); + return acceptor; +} + +static void begin_accept(pni_acceptor_t *acceptor, accept_result_t *result) +{ + if (acceptor->listen_sock->closing) { + if (result) { + pn_free(result); + acceptor->accept_queue_size--; + } + if (acceptor->accept_queue_size == 0) + acceptor->signalled = true; + return; + } + + if (result) { + reset_accept_result(result); + } else { + if (acceptor->accept_queue_size < IOCP_MAX_ACCEPTS && + pn_list_size(acceptor->accepts) == acceptor->accept_queue_size ) { + result = accept_result(acceptor->listen_sock); + acceptor->accept_queue_size++; + } else { + // an async accept is still pending or max concurrent accepts already hit + return; + } + } + + result->new_sock = create_same_type_socket(acceptor->listen_sock); + if (result->new_sock) { + // Not yet connected. 
+ result->new_sock->read_closed = true; + result->new_sock->write_closed = true; + + bool success = acceptor->fn_accept_ex(acceptor->listen_sock->socket, result->new_sock->socket, + result->address_buffer, 0, IOCP_SOCKADDRMAXLEN, IOCP_SOCKADDRMAXLEN, + &result->unused, (LPOVERLAPPED) result); + if (!success && WSAGetLastError() != ERROR_IO_PENDING) { + result->base.status = WSAGetLastError(); + pn_list_add(acceptor->accepts, result); + pni_events_update(acceptor->listen_sock, acceptor->listen_sock->events | PN_READABLE); + } else { + acceptor->listen_sock->ops_in_progress++; + // This socket is equally involved in the async operation. + result->new_sock->ops_in_progress++; + } + } else { + iocpdesc_fail(acceptor->listen_sock, WSAGetLastError(), "create accept socket"); + } +} + +static void complete_accept(accept_result_t *result, HRESULT status) +{ + result->new_sock->ops_in_progress--; + iocpdesc_t *ld = result->base.iocpd; + if (ld->read_closed) { + if (!result->new_sock->closing) + pni_iocp_begin_close(result->new_sock); + pn_free(result); // discard + reap_check(ld); + } else { + result->base.status = status; + pn_list_add(ld->acceptor->accepts, result); + pni_events_update(ld, ld->events | PN_READABLE); + } +} + +pn_socket_t pni_iocp_end_accept(iocpdesc_t *ld, sockaddr *addr, socklen_t *addrlen, bool *would_block, pn_error_t *error) +{ + if (!is_listener(ld)) { + set_iocp_error_status(error, PN_ERR, WSAEOPNOTSUPP); + return INVALID_SOCKET; + } + if (ld->read_closed) { + set_iocp_error_status(error, PN_ERR, WSAENOTSOCK); + return INVALID_SOCKET; + } + if (pn_list_size(ld->acceptor->accepts) == 0) { + if (ld->events & PN_READABLE && ld->iocp->iocp_trace) + iocp_log("listen socket readable with no available accept completions\n"); + *would_block = true; + return INVALID_SOCKET; + } + + accept_result_t *result = (accept_result_t *) pn_list_get(ld->acceptor->accepts, 0); + pn_list_del(ld->acceptor->accepts, 0, 1); + if (!pn_list_size(ld->acceptor->accepts)) + 
pni_events_update(ld, ld->events & ~PN_READABLE); // No pending accepts + + pn_socket_t accept_sock; + if (result->base.status) { + accept_sock = INVALID_SOCKET; + pni_win32_error(ld->error, "accept failure", result->base.status); + if (ld->iocp->iocp_trace) + iocp_log("%s\n", pn_error_text(ld->error)); + // App never sees this socket so close it here. + pni_iocp_begin_close(result->new_sock); + } else { + accept_sock = result->new_sock->socket; + // AcceptEx special setsockopt: + setsockopt(accept_sock, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char*)&ld->socket, + sizeof (SOCKET)); + if (addr && addrlen && *addrlen > 0) { + sockaddr_storage *local_addr = NULL; + sockaddr_storage *remote_addr = NULL; + int local_addrlen, remote_addrlen; + LPFN_GETACCEPTEXSOCKADDRS fn = ld->acceptor->fn_get_accept_ex_sockaddrs; + fn(result->address_buffer, 0, IOCP_SOCKADDRMAXLEN, IOCP_SOCKADDRMAXLEN, + (SOCKADDR **) &local_addr, &local_addrlen, (SOCKADDR **) &remote_addr, + &remote_addrlen); + *addrlen = pn_min(*addrlen, remote_addrlen); + memmove(addr, remote_addr, *addrlen); + } + } + + if (accept_sock != INVALID_SOCKET) { + // Connected. 
+ result->new_sock->read_closed = false; + result->new_sock->write_closed = false; + } + + // Done with the completion result, so reuse it + result->new_sock = NULL; + begin_accept(ld->acceptor, result); + return accept_sock; +} + + +// === Async connect processing + +typedef struct { + iocp_result_t base; + char address_buffer[IOCP_SOCKADDRBUFLEN]; + struct addrinfo *addrinfo; +} connect_result_t; + +#define connect_result_initialize NULL +#define connect_result_compare NULL +#define connect_result_inspect NULL +#define connect_result_hashcode NULL + +static void connect_result_finalize(void *object) +{ + connect_result_t *result = (connect_result_t *) object; + // Do not release addrinfo until ConnectEx completes + if (result->addrinfo) + freeaddrinfo(result->addrinfo); +} + +static connect_result_t *connect_result(iocpdesc_t *iocpd, struct addrinfo *addr) { + static const pn_class_t clazz = PN_CLASS(connect_result); + connect_result_t *result = (connect_result_t *) pn_new(sizeof(connect_result_t), &clazz); + if (result) { + memset(result, 0, sizeof(connect_result_t)); + result->base.type = IOCP_CONNECT; + result->base.iocpd = iocpd; + result->addrinfo = addr; + } + return result; +} + +pn_socket_t pni_iocp_begin_connect(iocp_t *iocp, pn_socket_t sock, struct addrinfo *addr, pn_error_t *error) +{ + // addr lives for the duration of the async connect. Caller has passed ownership here. + // See connect_result_finalize(). 
+ // Use of Windows-specific ConnectEx() requires our socket to be "loosely" pre-bound: + sockaddr_storage sa; + memset(&sa, 0, sizeof(sa)); + sa.ss_family = addr->ai_family; + if (bind(sock, (SOCKADDR *) &sa, addr->ai_addrlen)) { + pni_win32_error(error, "begin async connection", WSAGetLastError()); + if (iocp->iocp_trace) + iocp_log("%s\n", pn_error_text(error)); + closesocket(sock); + freeaddrinfo(addr); + return INVALID_SOCKET; + } + + iocpdesc_t *iocpd = pni_iocpdesc_create(iocp, sock, false); + bind_to_completion_port(iocpd); + LPFN_CONNECTEX fn_connect_ex = lookup_connect_ex(iocpd->socket); + connect_result_t *result = connect_result(iocpd, addr); + DWORD unused; + bool success = fn_connect_ex(iocpd->socket, result->addrinfo->ai_addr, result->addrinfo->ai_addrlen, + NULL, 0, &unused, (LPOVERLAPPED) result); + if (!success && WSAGetLastError() != ERROR_IO_PENDING) { + pni_win32_error(error, "ConnectEx failure", WSAGetLastError()); + pn_free(result); + iocpd->write_closed = true; + iocpd->read_closed = true; + pni_iocp_begin_close(iocpd); + sock = INVALID_SOCKET; + if (iocp->iocp_trace) + iocp_log("%s\n", pn_error_text(error)); + } else { + iocpd->ops_in_progress++; + } + return sock; +} + +static void complete_connect(connect_result_t *result, HRESULT status) +{ + iocpdesc_t *iocpd = result->base.iocpd; + if (iocpd->closing) { + pn_free(result); + reap_check(iocpd); + return; + } + + if (status) { + iocpdesc_fail(iocpd, status, "Connect failure"); + } else { + release_sys_sendbuf(iocpd->socket); + if (setsockopt(iocpd->socket, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0)) { + iocpdesc_fail(iocpd, WSAGetLastError(), "Connect failure (update context)"); + } else { + pni_events_update(iocpd, PN_WRITABLE); + start_reading(iocpd); + } + } + pn_free(result); + return; +} + + +// === Async writes + +static bool write_in_progress(iocpdesc_t *iocpd) +{ + return pni_write_pipeline_size(iocpd->pipeline) != 0; +} + +write_result_t *pni_write_result(iocpdesc_t *iocpd, 
const char *buf, size_t buflen) +{ + write_result_t *result = (write_result_t *) calloc(sizeof(write_result_t), 1); + if (result) { + result->base.type = IOCP_WRITE; + result->base.iocpd = iocpd; + result->buffer.start = buf; + result->buffer.size = buflen; + } + return result; +} + +static int submit_write(write_result_t *result, const void *buf, size_t len) +{ + WSABUF wsabuf; + wsabuf.buf = (char *) buf; + wsabuf.len = len; + memset(&result->base.overlapped, 0, sizeof (OVERLAPPED)); + return WSASend(result->base.iocpd->socket, &wsabuf, 1, NULL, 0, + (LPOVERLAPPED) result, 0); +} + +ssize_t pni_iocp_begin_write(iocpdesc_t *iocpd, const void *buf, size_t len, bool *would_block, pn_error_t *error) +{ + if (len == 0) return 0; + *would_block = false; + if (is_listener(iocpd)) { + set_iocp_error_status(error, PN_ERR, WSAEOPNOTSUPP); + return SOCKET_ERROR; + } + if (iocpd->closing) { + set_iocp_error_status(error, PN_ERR, WSAESHUTDOWN); + return SOCKET_ERROR; + } + if (iocpd->write_closed) { + assert(pn_error_code(iocpd->error)); + pn_error_copy(error, iocpd->error); + if (iocpd->iocp->iocp_trace) + iocp_log("write error: %s\n", pn_error_text(error)); + return SOCKET_ERROR; + } + if (!(iocpd->events & PN_WRITABLE)) { + *would_block = true; + return SOCKET_ERROR; + } + + size_t written = 0; + size_t requested = len; + const char *outgoing = (const char *) buf; + size_t available = pni_write_pipeline_reserve(iocpd->pipeline, len); + if (!available) { + *would_block = true; + return SOCKET_ERROR; + } + + for (size_t wr_count = 0; wr_count < available; wr_count++) { + write_result_t *result = pni_write_pipeline_next(iocpd->pipeline); + assert(result); + result->base.iocpd = iocpd; + ssize_t actual_len = pn_min(len, result->buffer.size); + result->requested = actual_len; + memmove((void *)result->buffer.start, outgoing, actual_len); + outgoing += actual_len; + written += actual_len; + len -= actual_len; + + int werror = submit_write(result, 
result->buffer.start, actual_len); + if (werror && WSAGetLastError() != ERROR_IO_PENDING) { + pni_write_pipeline_return(iocpd->pipeline, result); + iocpdesc_fail(iocpd, WSAGetLastError(), "overlapped send"); + return SOCKET_ERROR; + } + iocpd->ops_in_progress++; + } + + if (!pni_write_pipeline_writable(iocpd->pipeline)) + pni_events_update(iocpd, iocpd->events & ~PN_WRITABLE); + return written; +} + +static void complete_write(write_result_t *result, DWORD xfer_count, HRESULT status) +{ + iocpdesc_t *iocpd = result->base.iocpd; + if (iocpd->closing) { + pni_write_pipeline_return(iocpd->pipeline, result); + if (!iocpd->write_closed && !write_in_progress(iocpd)) + iocp_shutdown(iocpd); + reap_check(iocpd); + return; + } + if (status == 0 && xfer_count > 0) { + if (xfer_count != result->requested) { + // Is this recoverable? How to preserve order if multiple overlapped writes? + pni_write_pipeline_return(iocpd->pipeline, result); + iocpdesc_fail(iocpd, WSA_OPERATION_ABORTED, "Partial overlapped write on socket"); + return; + } else { + // Success. 
+ pni_write_pipeline_return(iocpd->pipeline, result); + if (pni_write_pipeline_writable(iocpd->pipeline)) + pni_events_update(iocpd, iocpd->events | PN_WRITABLE); + return; + } + } + pni_write_pipeline_return(iocpd->pipeline, result); + iocpdesc_fail(iocpd, status, "IOCP async write error"); +} + + +// === Async reads + +struct read_result_t { + iocp_result_t base; + size_t drain_count; + char unused_buf[1]; +}; + +static read_result_t *read_result(iocpdesc_t *iocpd) +{ + read_result_t *result = (read_result_t *) calloc(sizeof(read_result_t), 1); + if (result) { + result->base.type = IOCP_READ; + result->base.iocpd = iocpd; + } + return result; +} + +static void begin_zero_byte_read(iocpdesc_t *iocpd) +{ + if (iocpd->read_in_progress) return; + if (iocpd->read_closed) { + pni_events_update(iocpd, iocpd->events | PN_READABLE); + return; + } + + read_result_t *result = iocpd->read_result; + memset(&result->base.overlapped, 0, sizeof (OVERLAPPED)); + DWORD flags = 0; + WSABUF wsabuf; + wsabuf.buf = result->unused_buf; + wsabuf.len = 0; + int rc = WSARecv(iocpd->socket, &wsabuf, 1, NULL, &flags, + &result->base.overlapped, 0); + if (rc && WSAGetLastError() != ERROR_IO_PENDING) { + iocpdesc_fail(iocpd, WSAGetLastError(), "IOCP read error"); + return; + } + iocpd->ops_in_progress++; + iocpd->read_in_progress = true; +} + +static void drain_until_closed(iocpdesc_t *iocpd) { + int max_drain = 16 * 1024; + char buf[512]; + read_result_t *result = iocpd->read_result; + while (result->drain_count < max_drain) { + int rv = recv(iocpd->socket, buf, 512, 0); + if (rv > 0) + result->drain_count += rv; + else if (rv == 0) { + iocpd->read_closed = true; + return; + } else if (WSAGetLastError() == WSAEWOULDBLOCK) { + // wait a little longer + start_reading(iocpd); + return; + } + else + break; + } + // Graceful close indication unlikely, force the issue + if (iocpd->iocp->iocp_trace) + if (result->drain_count >= max_drain) + iocp_log("graceful close on reader abandoned (too many 
chars)\n"); + else + iocp_log("graceful close on reader abandoned: %d\n", WSAGetLastError()); + iocpd->read_closed = true; + closesocket(iocpd->socket); + iocpd->socket = INVALID_SOCKET; +} + + +static void complete_read(read_result_t *result, DWORD xfer_count, HRESULT status) +{ + iocpdesc_t *iocpd = result->base.iocpd; + iocpd->read_in_progress = false; + + if (iocpd->closing) { + // Application no longer reading, but we are looking for a zero length read + if (!iocpd->read_closed) + drain_until_closed(iocpd); + reap_check(iocpd); + return; + } + + if (status == 0 && xfer_count == 0) { + // Success. + pni_events_update(iocpd, iocpd->events | PN_READABLE); + } else { + iocpdesc_fail(iocpd, status, "IOCP read complete error"); + } +} + +ssize_t pni_iocp_recv(iocpdesc_t *iocpd, void *buf, size_t size, bool *would_block, pn_error_t *error) +{ + if (size == 0) return 0; + *would_block = false; + if (is_listener(iocpd)) { + set_iocp_error_status(error, PN_ERR, WSAEOPNOTSUPP); + return SOCKET_ERROR; + } + if (iocpd->closing) { + // Previous call to pn_close() + set_iocp_error_status(error, PN_ERR, WSAESHUTDOWN); + return SOCKET_ERROR; + } + if (iocpd->read_closed) { + if (pn_error_code(iocpd->error)) + pn_error_copy(error, iocpd->error); + else + set_iocp_error_status(error, PN_ERR, WSAENOTCONN); + return SOCKET_ERROR; + } + + size_t count = recv(iocpd->socket, (char *) buf, size, 0); + if (count > 0) { + pni_events_update(iocpd, iocpd->events & ~PN_READABLE); + begin_zero_byte_read(iocpd); + return count; + } else if (count == 0) { + iocpd->read_closed = true; + return 0; + } + if (WSAGetLastError() == WSAEWOULDBLOCK) + *would_block = true; + else + set_iocp_error_status(error, PN_ERR, WSAGetLastError()); + return SOCKET_ERROR; +} + +static void start_reading(iocpdesc_t *iocpd) +{ + begin_zero_byte_read(iocpd); +} + + +// === The iocp descriptor + +static void pni_iocpdesc_initialize(void *object) +{ + iocpdesc_t *iocpd = (iocpdesc_t *) object; + memset(iocpd, 0, 
sizeof(iocpdesc_t)); + iocpd->socket = INVALID_SOCKET; +} + +static void pni_iocpdesc_finalize(void *object) +{ + iocpdesc_t *iocpd = (iocpdesc_t *) object; + pn_free(iocpd->acceptor); + pn_error_free(iocpd->error); + if (iocpd->pipeline) + if (write_in_progress(iocpd)) + iocp_log("iocp descriptor write leak\n"); + else + pn_free(iocpd->pipeline); + if (iocpd->read_in_progress) + iocp_log("iocp descriptor read leak\n"); + else + free(iocpd->read_result); +} + +static uintptr_t pni_iocpdesc_hashcode(void *object) +{ + iocpdesc_t *iocpd = (iocpdesc_t *) object; + return iocpd->socket; +} + +#define pni_iocpdesc_compare NULL +#define pni_iocpdesc_inspect NULL + +// Reference counted in the iocpdesc map, zombie_list, selector. +static iocpdesc_t *pni_iocpdesc(pn_socket_t s) +{ + static pn_class_t clazz = PN_CLASS(pni_iocpdesc); + assert (s != INVALID_SOCKET); + iocpdesc_t *iocpd = (iocpdesc_t *) pn_new(sizeof(iocpdesc_t), &clazz); + assert(iocpd); + iocpd->socket = s; + return iocpd; +} + +static bool is_listener_socket(pn_socket_t s) +{ + BOOL tval = false; + int tvalsz = sizeof(tval); + int code = getsockopt(s, SOL_SOCKET, SO_ACCEPTCONN, (char *)&tval, &tvalsz); + return code == 0 && tval; +} + +iocpdesc_t *pni_iocpdesc_create(iocp_t *iocp, pn_socket_t s, bool external) { + assert(!pni_iocpdesc_map_get(iocp, s)); + bool listening = is_listener_socket(s); + iocpdesc_t *iocpd = pni_iocpdesc(s); + if (iocpd) { + iocpd->iocp = iocp; + iocpd->external = external; + iocpd->error = pn_error(); + if (listening) { + iocpd->acceptor = pni_acceptor(iocpd); + } else { + iocpd->pipeline = pni_write_pipeline(iocpd); + iocpd->read_result = read_result(iocpd); + } + pni_iocpdesc_map_push(iocpd); + } + return iocpd; +} + +// === Fast lookup of a socket's iocpdesc_t + +iocpdesc_t *pni_iocpdesc_map_get(iocp_t *iocp, pn_socket_t s) { + iocpdesc_t *iocpd = (iocpdesc_t *) pn_hash_get(iocp->iocpdesc_map, s); + return iocpd; +} + +void pni_iocpdesc_map_push(iocpdesc_t *iocpd) { + 
pn_hash_put(iocpd->iocp->iocpdesc_map, iocpd->socket, iocpd); + pn_decref(iocpd); + assert(pn_refcount(iocpd) == 1); +} + +void pni_iocpdesc_map_del(iocp_t *iocp, pn_socket_t s) { + pn_hash_del(iocp->iocpdesc_map, (uintptr_t) s); +} + +static void bind_to_completion_port(iocpdesc_t *iocpd) +{ + if (iocpd->bound) return; + if (!iocpd->iocp->completion_port) { + iocpdesc_fail(iocpd, WSAEINVAL, "Incomplete setup, no completion port."); + return; + } + + if (CreateIoCompletionPort ((HANDLE) iocpd->socket, iocpd->iocp->completion_port, 0, 0)) + iocpd->bound = true; + else { + iocpdesc_fail(iocpd, GetLastError(), "IOCP socket setup."); + } +} + +static void release_sys_sendbuf(SOCKET s) +{ + // Set the socket's send buffer size to zero. + int sz = 0; + int status = setsockopt(s, SOL_SOCKET, SO_SNDBUF, (const char *)&sz, sizeof(int)); + assert(status == 0); +} + +void pni_iocpdesc_start(iocpdesc_t *iocpd) +{ + if (iocpd->bound) return; + bind_to_completion_port(iocpd); + if (is_listener(iocpd)) { + begin_accept(iocpd->acceptor, NULL); + } + else { + release_sys_sendbuf(iocpd->socket); + pni_events_update(iocpd, PN_WRITABLE); + start_reading(iocpd); + } +} + +static void complete(iocp_result_t *result, bool success, DWORD num_transferred) { + result->iocpd->ops_in_progress--; + DWORD status = success ? 
0 : GetLastError(); + + switch (result->type) { + case IOCP_ACCEPT: + complete_accept((accept_result_t *) result, status); + break; + case IOCP_CONNECT: + complete_connect((connect_result_t *) result, status); + break; + case IOCP_WRITE: + complete_write((write_result_t *) result, num_transferred, status); + break; + case IOCP_READ: + complete_read((read_result_t *) result, num_transferred, status); + break; + default: + assert(false); + } +} + +void pni_iocp_drain_completions(iocp_t *iocp) +{ + while (true) { + DWORD timeout_ms = 0; + DWORD num_transferred = 0; + ULONG_PTR completion_key = 0; + OVERLAPPED *overlapped = 0; + + bool good_op = GetQueuedCompletionStatus (iocp->completion_port, &num_transferred, + &completion_key, &overlapped, timeout_ms); + if (!overlapped) + return; // timed out + iocp_result_t *result = (iocp_result_t *) overlapped; + complete(result, good_op, num_transferred); + } +} + +// returns: -1 on error, 0 on timeout, 1 successful completion +int pni_iocp_wait_one(iocp_t *iocp, int timeout, pn_error_t *error) { + DWORD win_timeout = (timeout < 0) ? INFINITE : (DWORD) timeout; + DWORD num_transferred = 0; + ULONG_PTR completion_key = 0; + OVERLAPPED *overlapped = 0; + + bool good_op = GetQueuedCompletionStatus (iocp->completion_port, &num_transferred, + &completion_key, &overlapped, win_timeout); + if (!overlapped) + if (GetLastError() == WAIT_TIMEOUT) + return 0; + else { + if (error) + pni_win32_error(error, "GetQueuedCompletionStatus", GetLastError()); + return -1; + } + + iocp_result_t *result = (iocp_result_t *) overlapped; + complete(result, good_op, num_transferred); + return 1; +} + +// === Close (graceful and otherwise) + +// zombie_list is for sockets transitioning out of iocp on their way to zero ops_in_progress +// and fully closed. + +static void zombie_list_add(iocpdesc_t *iocpd) +{ + assert(iocpd->closing); + if (!iocpd->ops_in_progress) { + // No need to make a zombie. 
+ if (iocpd->socket != INVALID_SOCKET) { + closesocket(iocpd->socket); + iocpd->socket = INVALID_SOCKET; + iocpd->read_closed = true; + } + return; + } + // Allow 2 seconds for graceful shutdown before releasing socket resource. + iocpd->reap_time = pn_i_now() + 2000; + pn_list_add(iocpd->iocp->zombie_list, iocpd); +} + +static void reap_check(iocpdesc_t *iocpd) +{ + if (iocpd->closing && !iocpd->ops_in_progress) { + if (iocpd->socket != INVALID_SOCKET) { + closesocket(iocpd->socket); + iocpd->socket = INVALID_SOCKET; + } + pn_list_remove(iocpd->iocp->zombie_list, iocpd); + // iocpd is decref'ed and possibly released + } +} + +pn_timestamp_t pni_zombie_deadline(iocp_t *iocp) +{ + if (pn_list_size(iocp->zombie_list)) { + iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(iocp->zombie_list, 0); + return iocpd->reap_time; + } + return 0; +} + +void pni_zombie_check(iocp_t *iocp, pn_timestamp_t now) +{ + pn_list_t *zl = iocp->zombie_list; + // Look for stale zombies that should have been reaped by "now" + for (size_t idx = 0; idx < pn_list_size(zl); idx++) { + iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(zl, idx); + if (iocpd->reap_time > now) + return; + if (iocpd->socket == INVALID_SOCKET) + continue; + assert(iocpd->ops_in_progress > 0); + if (iocp->iocp_trace) + iocp_log("async close: graceful close timeout exceeded\n"); + closesocket(iocpd->socket); + iocpd->socket = INVALID_SOCKET; + iocpd->read_closed = true; + // outstanding ops should complete immediately now + } +} + +static void drain_zombie_completions(iocp_t *iocp) +{ + // No more pn_selector_select() from App, but zombies still need care and feeding + // until their outstanding async actions complete. 
+ pni_iocp_drain_completions(iocp); + + // Discard any that have no pending async IO + size_t sz = pn_list_size(iocp->zombie_list); + for (size_t idx = 0; idx < sz;) { + iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(iocp->zombie_list, idx); + if (!iocpd->ops_in_progress) { + pn_list_del(iocp->zombie_list, idx, 1); + sz--; + } else { + idx++; + } + } + + pn_timestamp_t now = pn_i_now(); + pn_timestamp_t deadline = now + 2000; + + while (pn_list_size(iocp->zombie_list)) { + if (now >= deadline) + break; + int rv = pni_iocp_wait_one(iocp, deadline - now, NULL); + if (rv < 0) { + iocp_log("unexpected IOCP failure on Proton IO shutdown %d\n", GetLastError()); + break; + } + now = pn_i_now(); + } + if (now >= deadline && pn_list_size(iocp->zombie_list)) + // Should only happen if really slow TCP handshakes, i.e. total network failure + iocp_log("network failure on Proton shutdown\n"); +} + +static pn_list_t *iocp_map_close_all(iocp_t *iocp) +{ + // Zombify stragglers, i.e. no pn_close() from the application. + pn_list_t *externals = pn_list(0, PN_REFCOUNT); + for (pn_handle_t entry = pn_hash_head(iocp->iocpdesc_map); entry; + entry = pn_hash_next(iocp->iocpdesc_map, entry)) { + iocpdesc_t *iocpd = (iocpdesc_t *) pn_hash_value(iocp->iocpdesc_map, entry); + // Just listeners first. + if (is_listener(iocpd)) { + if (iocpd->external) { + // Owned by application, just keep a temporary reference to it. + // iocp_result_t structs must not be free'd until completed or + // the completion port is closed. + if (iocpd->ops_in_progress) + pn_list_add(externals, iocpd); + pni_iocpdesc_map_del(iocp, iocpd->socket); + } else { + // Make it a zombie. 
+ pni_iocp_begin_close(iocpd); + } + } + } + pni_iocp_drain_completions(iocp); + + for (pn_handle_t entry = pn_hash_head(iocp->iocpdesc_map); entry; + entry = pn_hash_next(iocp->iocpdesc_map, entry)) { + iocpdesc_t *iocpd = (iocpdesc_t *) pn_hash_value(iocp->iocpdesc_map, entry); + if (iocpd->external) { + iocpd->read_closed = true; // Do not consume from read side + iocpd->write_closed = true; // Do not shutdown write side + if (iocpd->ops_in_progress) + pn_list_add(externals, iocpd); + pni_iocpdesc_map_del(iocp, iocpd->socket); + } else { + // Make it a zombie. + pni_iocp_begin_close(iocpd); + } + } + return externals; +} + +static void zombie_list_hard_close_all(iocp_t *iocp) +{ + pni_iocp_drain_completions(iocp); + size_t zs = pn_list_size(iocp->zombie_list); + for (size_t i = 0; i < zs; i++) { + iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(iocp->zombie_list, i); + if (iocpd->socket != INVALID_SOCKET) { + closesocket(iocpd->socket); + iocpd->socket = INVALID_SOCKET; + iocpd->read_closed = true; + iocpd->write_closed = true; + } + } + pni_iocp_drain_completions(iocp); + + // Zombies should be all gone. Do a sanity check. + zs = pn_list_size(iocp->zombie_list); + int remaining = 0; + int ops = 0; + for (size_t i = 0; i < zs; i++) { + iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(iocp->zombie_list, i); + remaining++; + ops += iocpd->ops_in_progress; + } + if (remaining) + iocp_log("Proton: %d unfinished close operations (ops count = %d)\n", remaining, ops); +} + +static void iocp_shutdown(iocpdesc_t *iocpd) +{ + if (shutdown(iocpd->socket, SD_SEND)) { + if (iocpd->iocp->iocp_trace) + iocp_log("socket shutdown failed %d\n", WSAGetLastError()); + } + iocpd->write_closed = true; + if (iocpd->read_closed) { + closesocket(iocpd->socket); + iocpd->socket = INVALID_SOCKET; + } +} + +void pni_iocp_begin_close(iocpdesc_t *iocpd) +{ + assert (!iocpd->closing); + if (is_listener(iocpd)) { + // Listening socket is easy. Close the socket which will cancel async ops. 
+ pn_socket_t old_sock = iocpd->socket; + iocpd->socket = INVALID_SOCKET; + iocpd->closing = true; + iocpd->read_closed = true; + iocpd->write_closed = true; + closesocket(old_sock); + // Pending accepts will now complete. Zombie can die when all consumed. + zombie_list_add(iocpd); + pni_iocpdesc_map_del(iocpd->iocp, old_sock); // may pn_free *iocpd + } else { + // Continue async operation looking for graceful close confirmation or timeout. + pn_socket_t old_sock = iocpd->socket; + iocpd->closing = true; + if (!iocpd->write_closed && !write_in_progress(iocpd)) + iocp_shutdown(iocpd); + zombie_list_add(iocpd); + pni_iocpdesc_map_del(iocpd->iocp, old_sock); // may pn_free *iocpd + } +} + + +// === iocp_t + +#define pni_iocp_hashcode NULL +#define pni_iocp_compare NULL +#define pni_iocp_inspect NULL + +void pni_iocp_initialize(void *obj) +{ + iocp_t *iocp = (iocp_t *) obj; + memset(iocp, 0, sizeof(iocp_t)); + pni_shared_pool_create(iocp); + iocp->completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); + assert(iocp->completion_port != NULL); + iocp->iocpdesc_map = pn_hash(0, 0.75, PN_REFCOUNT); + iocp->zombie_list = pn_list(0, PN_REFCOUNT); + iocp->iocp_trace = pn_env_bool("PN_TRACE_DRV"); + iocp->selector = NULL; +} + +void pni_iocp_finalize(void *obj) +{ + iocp_t *iocp = (iocp_t *) obj; + // Move sockets to closed state, except external sockets. + pn_list_t *externals = iocp_map_close_all(iocp); + // Now everything with ops_in_progress is in the zombie_list or the externals list. 
+ assert(!pn_hash_head(iocp->iocpdesc_map)); + pn_free(iocp->iocpdesc_map); + + drain_zombie_completions(iocp); // Last chance for graceful close + zombie_list_hard_close_all(iocp); + CloseHandle(iocp->completion_port); // This cancels all our async ops + iocp->completion_port = NULL; + + if (pn_list_size(externals) && iocp->iocp_trace) + iocp_log("%d external sockets not closed and removed from Proton IOCP control\n", pn_list_size(externals)); + + // Now safe to free everything that might be touched by a former async operation. + pn_free(externals); + pn_free(iocp->zombie_list); + pni_shared_pool_free(iocp); +} + +iocp_t *pni_iocp() +{ + static const pn_class_t clazz = PN_CLASS(pni_iocp); + iocp_t *iocp = (iocp_t *) pn_new(sizeof(iocp_t), &clazz); + return iocp; +} Added: qpid/proton/branches/examples/proton-c/src/windows/iocp.h URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/proton-c/src/windows/iocp.h?rev=1630790&view=auto ============================================================================== --- qpid/proton/branches/examples/proton-c/src/windows/iocp.h (added) +++ qpid/proton/branches/examples/proton-c/src/windows/iocp.h Fri Oct 10 12:41:36 2014 @@ -0,0 +1,141 @@ +#ifndef PROTON_SRC_IOCP_H +#define PROTON_SRC_IOCP_H 1 + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <proton/import_export.h> +#include <proton/selectable.h> +#include <proton/type_compat.h> + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct pni_acceptor_t pni_acceptor_t; +typedef struct write_result_t write_result_t; +typedef struct read_result_t read_result_t; +typedef struct write_pipeline_t write_pipeline_t; +typedef struct iocpdesc_t iocpdesc_t; + + +// One per pn_io_t. + +struct iocp_t { + HANDLE completion_port; + pn_hash_t *iocpdesc_map; + pn_list_t *zombie_list; + int shared_pool_size; + char *shared_pool_memory; + write_result_t **shared_results; + write_result_t **available_results; + int shared_available_count; + size_t writer_count; + int loopback_bufsize; + bool iocp_trace; + pn_selector_t *selector; +}; + + +// One for each socket. +// This iocpdesc_t structure is ref counted by the iocpdesc_map, zombie_list, +// selector->iocp_descriptors list. It should remain ref counted in the +// zombie_list until ops_in_progress == 0 or the completion port is closed. 
+ +struct iocpdesc_t { + pn_socket_t socket; + iocp_t *iocp; + pni_acceptor_t *acceptor; + pn_error_t *error; + int ops_in_progress; + bool read_in_progress; + write_pipeline_t *pipeline; + read_result_t *read_result; + bool external; // true if socket set up outside Proton + bool bound; // associated with the completion port + bool closing; // pn_close called + bool read_closed; // EOF or read error + bool write_closed; // shutdown sent or write error + pn_selector_t *selector; + pn_selectable_t *selectable; + int events; + int interests; + pn_timestamp_t deadline; + iocpdesc_t *triggered_list_next; + iocpdesc_t *triggered_list_prev; + iocpdesc_t *deadlines_next; + iocpdesc_t *deadlines_prev; + pn_timestamp_t reap_time; +}; + +typedef enum { IOCP_ACCEPT, IOCP_CONNECT, IOCP_READ, IOCP_WRITE } iocp_type_t; + +typedef struct { + OVERLAPPED overlapped; + iocp_type_t type; + iocpdesc_t *iocpd; + HRESULT status; +} iocp_result_t; + +struct write_result_t { + iocp_result_t base; + size_t requested; + bool in_use; + pn_bytes_t buffer; +}; + +iocpdesc_t *pni_iocpdesc_create(iocp_t *, pn_socket_t s, bool external); +iocpdesc_t *pni_iocpdesc_map_get(iocp_t *, pn_socket_t s); +void pni_iocpdesc_map_del(iocp_t *, pn_socket_t s); +void pni_iocpdesc_map_push(iocpdesc_t *iocpd); +void pni_iocpdesc_start(iocpdesc_t *iocpd); +void pni_iocp_drain_completions(iocp_t *); +int pni_iocp_wait_one(iocp_t *, int timeout, pn_error_t *); +void pni_iocp_start_accepting(iocpdesc_t *iocpd); +pn_socket_t pni_iocp_end_accept(iocpdesc_t *ld, sockaddr *addr, socklen_t *addrlen, bool *would_block, pn_error_t *error); +pn_socket_t pni_iocp_begin_connect(iocp_t *, pn_socket_t sock, struct addrinfo *addr, pn_error_t *error); +ssize_t pni_iocp_begin_write(iocpdesc_t *, const void *, size_t, bool *, pn_error_t *); +ssize_t pni_iocp_recv(iocpdesc_t *iocpd, void *buf, size_t size, bool *would_block, pn_error_t *error); +void pni_iocp_begin_close(iocpdesc_t *iocpd); +iocp_t *pni_iocp(); + +void 
pni_events_update(iocpdesc_t *iocpd, int events); +write_result_t *pni_write_result(iocpdesc_t *iocpd, const char *buf, size_t buflen); +write_pipeline_t *pni_write_pipeline(iocpdesc_t *iocpd); +size_t pni_write_pipeline_size(write_pipeline_t *); +bool pni_write_pipeline_writable(write_pipeline_t *); +void pni_write_pipeline_return(write_pipeline_t *, write_result_t *); +size_t pni_write_pipeline_reserve(write_pipeline_t *, size_t); +write_result_t *pni_write_pipeline_next(write_pipeline_t *); +void pni_shared_pool_create(iocp_t *); +void pni_shared_pool_free(iocp_t *); +void pni_zombie_check(iocp_t *, pn_timestamp_t); +pn_timestamp_t pni_zombie_deadline(iocp_t *); + +pn_selector_t *pni_selector_create(iocp_t *iocp); + +int pni_win32_error(pn_error_t *error, const char *msg, HRESULT code); + +#ifdef __cplusplus +} +#endif + +#endif /* iocp.h */ Modified: qpid/proton/branches/examples/proton-c/src/windows/selector.c URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/proton-c/src/windows/selector.c?rev=1630790&r1=1630789&r2=1630790&view=diff ============================================================================== --- qpid/proton/branches/examples/proton-c/src/windows/selector.c (original) +++ qpid/proton/branches/examples/proton-c/src/windows/selector.c Fri Oct 10 12:41:36 2014 @@ -19,21 +19,6 @@ * */ -/* - * Copy of posix poll-based selector with minimal changes to use - * select(). TODO: fully native implementaton with I/O completion - * ports. - * - * This implementation comments out the posix max_fds arg to select - * which has no meaning on windows. The number of fd_set slots are - * configured at compile time via FD_SETSIZE, chosen "large enough" - * for the limited scalability of select() at the expense of - * 3*N*sizeof(unsigned int) bytes per driver instance. select (and - * associated macros like FD_ZERO) are otherwise unaffected - * performance-wise by increasing FD_SETSIZE. 
- */ - -#define FD_SETSIZE 2048 #ifndef _WIN32_WINNT #define _WIN32_WINNT 0x0501 #endif @@ -44,37 +29,53 @@ #include <Ws2tcpip.h> #define PN_WINAPI -#include "../platform.h" +#include "platform.h" +#include <proton/object.h> #include <proton/io.h> #include <proton/selector.h> #include <proton/error.h> #include <assert.h> -#include "../selectable.h" -#include "../util.h" +#include "selectable.h" +#include "util.h" +#include "iocp.h" + +static void interests_update(iocpdesc_t *iocpd, int interests); +static void deadlines_update(iocpdesc_t *iocpd, pn_timestamp_t t); struct pn_selector_t { - fd_set readfds; - fd_set writefds; - fd_set exceptfds; + iocp_t *iocp; pn_timestamp_t *deadlines; size_t capacity; pn_list_t *selectables; + pn_list_t *iocp_descriptors; pn_timestamp_t deadline; size_t current; + iocpdesc_t *current_triggered; pn_timestamp_t awoken; pn_error_t *error; + iocpdesc_t *triggered_list_head; + iocpdesc_t *triggered_list_tail; + iocpdesc_t *deadlines_head; + iocpdesc_t *deadlines_tail; }; void pn_selector_initialize(void *obj) { pn_selector_t *selector = (pn_selector_t *) obj; + selector->iocp = NULL; selector->deadlines = NULL; selector->capacity = 0; selector->selectables = pn_list(0, 0); + selector->iocp_descriptors = pn_list(0, PN_REFCOUNT); selector->deadline = 0; selector->current = 0; + selector->current_triggered = NULL; selector->awoken = 0; selector->error = pn_error(); + selector->triggered_list_head = NULL; + selector->triggered_list_tail = NULL; + selector->deadlines_head = NULL; + selector->deadlines_tail = NULL; } void pn_selector_finalize(void *obj) @@ -82,28 +83,51 @@ void pn_selector_finalize(void *obj) pn_selector_t *selector = (pn_selector_t *) obj; free(selector->deadlines); pn_free(selector->selectables); + pn_free(selector->iocp_descriptors); pn_error_free(selector->error); + selector->iocp->selector = NULL; } #define pn_selector_hashcode NULL #define pn_selector_compare NULL #define pn_selector_inspect NULL -pn_selector_t 
*pn_selector(void) +pn_selector_t *pni_selector() { static const pn_class_t clazz = PN_CLASS(pn_selector); pn_selector_t *selector = (pn_selector_t *) pn_new(sizeof(pn_selector_t), &clazz); return selector; } +pn_selector_t *pni_selector_create(iocp_t *iocp) +{ + pn_selector_t *selector = pni_selector(); + selector->iocp = iocp; + return selector; +} + void pn_selector_add(pn_selector_t *selector, pn_selectable_t *selectable) { assert(selector); assert(selectable); assert(pni_selectable_get_index(selectable) < 0); + pn_socket_t sock = pn_selectable_fd(selectable); + + iocpdesc_t *iocpd = NULL; + if (sock != INVALID_SOCKET) { + iocpd = pni_iocpdesc_map_get(selector->iocp, sock); + if (!iocpd) { + // Socket created outside proton. Hook it up to iocp. + iocpd = pni_iocpdesc_create(selector->iocp, sock, true); + pni_iocpdesc_start(iocpd); + } else { + assert(iocpd->iocp == selector->iocp); + } + } if (pni_selectable_get_index(selectable) < 0) { pn_list_add(selector->selectables, selectable); + pn_list_add(selector->iocp_descriptors, iocpd); size_t size = pn_list_size(selector->selectables); if (selector->capacity < size) { @@ -112,6 +136,10 @@ void pn_selector_add(pn_selector_t *sele } pni_selectable_set_index(selectable, size - 1); + if (iocpd) { + iocpd->selector = selector; + iocpd->selectable = selectable; + } } pn_selector_update(selector, selectable); @@ -121,18 +149,22 @@ void pn_selector_update(pn_selector_t *s { int idx = pni_selectable_get_index(selectable); assert(idx >= 0); - /* - selector->fds[idx].fd = pn_selectable_fd(selectable); - selector->fds[idx].events = 0; - selector->fds[idx].revents = 0; - if (pn_selectable_capacity(selectable) > 0) { - selector->fds[idx].events |= POLLIN; - } - if (pn_selectable_pending(selectable) > 0) { - selector->fds[idx].events |= POLLOUT; - } - */ selector->deadlines[idx] = pn_selectable_deadline(selectable); + + pn_socket_t sock = pn_selectable_fd(selectable); + iocpdesc_t *iocpd = (iocpdesc_t *) 
pn_list_get(selector->iocp_descriptors, idx); + if (iocpd) { + assert(sock == iocpd->socket || iocpd->closing); + int interests = 0; + if (pn_selectable_capacity(selectable) > 0) { + interests |= PN_READABLE; + } + if (pn_selectable_pending(selectable) > 0) { + interests |= PN_WRITABLE; + } + interests_update(iocpd, interests); + deadlines_update(iocpd, selector->deadlines[idx]); + } } void pn_selector_remove(pn_selector_t *selector, pn_selectable_t *selectable) @@ -142,107 +174,94 @@ void pn_selector_remove(pn_selector_t *s int idx = pni_selectable_get_index(selectable); assert(idx >= 0); + iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(selector->iocp_descriptors, idx); + if (iocpd) { + if (selector->current_triggered == iocpd) + selector->current_triggered = iocpd->triggered_list_next; + interests_update(iocpd, 0); + deadlines_update(iocpd, 0); + assert(selector->triggered_list_head != iocpd && !iocpd->triggered_list_prev); + assert(selector->deadlines_head != iocpd && !iocpd->deadlines_prev); + iocpd->selector = NULL; + iocpd->selectable = NULL; + } pn_list_del(selector->selectables, idx, 1); + pn_list_del(selector->iocp_descriptors, idx, 1); size_t size = pn_list_size(selector->selectables); for (size_t i = idx; i < size; i++) { pn_selectable_t *sel = (pn_selectable_t *) pn_list_get(selector->selectables, i); pni_selectable_set_index(sel, i); } - pni_selectable_set_index(selectable, -1); } int pn_selector_select(pn_selector_t *selector, int timeout) { assert(selector); - - FD_ZERO(&selector->readfds); - FD_ZERO(&selector->writefds); - FD_ZERO(&selector->exceptfds); - - size_t size = pn_list_size(selector->selectables); - if (size > FD_SETSIZE) { - // This Windows limitation will go away when switching to completion ports - pn_error_set(selector->error, PN_ERR, "maximum sockets exceeded for Windows selector"); - return PN_ERR; - } + pn_error_clear(selector->error); + pn_timestamp_t deadline = 0; + pn_timestamp_t now = pn_i_now(); if (timeout) { - pn_timestamp_t 
deadline = 0; - for (size_t i = 0; i < size; i++) { - pn_timestamp_t d = selector->deadlines[i]; - if (d) - deadline = (deadline == 0) ? d : pn_min(deadline, d); - } - - if (deadline) { - pn_timestamp_t now = pn_i_now(); - int delta = selector->deadline - now; - if (delta < 0) { - timeout = 0; - } else if (delta < timeout) { - timeout = delta; - } - } - } - - struct timeval to = {0}; - struct timeval *to_arg = &to; - // block only if (timeout == 0) and (closed_count == 0) - if (timeout > 0) { - // convert millisecs to sec and usec: - to.tv_sec = timeout/1000; - to.tv_usec = (timeout - (to.tv_sec * 1000)) * 1000; - } - else if (timeout < 0) { - to_arg = NULL; + if (selector->deadlines_head) + deadline = selector->deadlines_head->deadline; } - - for (size_t i = 0; i < size; i++) { - pn_selectable_t *sel = (pn_selectable_t *) pn_list_get(selector->selectables, i); - pn_socket_t fd = pn_selectable_fd(sel); - if (pn_selectable_capacity(sel) > 0) { - FD_SET(fd, &selector->readfds); - } - if (pn_selectable_pending(sel) > 0) { - FD_SET(fd, &selector->writefds); + if (deadline) { + int delta = deadline - now; + if (delta < 0) { + delta = 0; + } + if (timeout < 0) + timeout = delta; + else if (timeout > delta) + timeout = delta; + } + deadline = (timeout >= 0) ? now + timeout : 0; + + // Process all currently available completions, even if matched events available + pni_iocp_drain_completions(selector->iocp); + pni_zombie_check(selector->iocp, now); + // Loop until an interested event is matched, or until deadline + while (true) { + if (selector->triggered_list_head) + break; + if (deadline && deadline <= now) + break; + pn_timestamp_t completion_deadline = deadline; + pn_timestamp_t zd = pni_zombie_deadline(selector->iocp); + if (zd) + completion_deadline = completion_deadline ? pn_min(zd, completion_deadline) : zd; + + int completion_timeout = (!completion_deadline) ? 
-1 : completion_deadline - now; + int rv = pni_iocp_wait_one(selector->iocp, completion_timeout, selector->error); + if (rv < 0) + return pn_error_code(selector->error); + + now = pn_i_now(); + if (zd && zd <= now) { + pni_zombie_check(selector->iocp, now); } } - int result = select(0 /* ignored in win32 */, &selector->readfds, &selector->writefds, &selector->exceptfds, to_arg); - if (result == -1) { - pn_i_error_from_errno(selector->error, "select"); - } else { - selector->current = 0; - selector->awoken = pn_i_now(); + selector->current = 0; + selector->awoken = now; + selector->current_triggered = selector->triggered_list_head; + for (iocpdesc_t *iocpd = selector->deadlines_head; iocpd; iocpd = iocpd->deadlines_next) { + if (iocpd->deadline <= now) + pni_events_update(iocpd, iocpd->events | PN_EXPIRED); + else + break; } - return pn_error_code(selector->error); } pn_selectable_t *pn_selector_next(pn_selector_t *selector, int *events) { - pn_list_t *l = selector->selectables; - size_t size = pn_list_size(l); - while (selector->current < size) { - pn_selectable_t *sel = (pn_selectable_t *) pn_list_get(l, selector->current); - pn_timestamp_t deadline = selector->deadlines[selector->current]; - int ev = 0; - pn_socket_t fd = pn_selectable_fd(sel); - if (FD_ISSET(fd, &selector->readfds)) { - ev |= PN_READABLE; - } - if (FD_ISSET(fd, &selector->writefds)) { - ev |= PN_WRITABLE; - } - if (deadline && selector->awoken >= deadline) { - ev |= PN_EXPIRED; - } - selector->current++; - if (ev) { - *events = ev; - return sel; - } + if (selector->current_triggered) { + iocpdesc_t *iocpd = selector->current_triggered; + *events = iocpd->interests & iocpd->events; + selector->current_triggered = iocpd->triggered_list_next; + return iocpd->selectable; } return NULL; } @@ -252,3 +271,91 @@ void pn_selector_free(pn_selector_t *sel assert(selector); pn_free(selector); } + + +static void triggered_list_add(pn_selector_t *selector, iocpdesc_t *iocpd) +{ + if 
(iocpd->triggered_list_prev || selector->triggered_list_head == iocpd) + return; // already in list + LL_ADD(selector, triggered_list, iocpd); +} + +static void triggered_list_remove(pn_selector_t *selector, iocpdesc_t *iocpd) +{ + if (!iocpd->triggered_list_prev && selector->triggered_list_head != iocpd) + return; // not in list + LL_REMOVE(selector, triggered_list, iocpd); + iocpd->triggered_list_prev = NULL; + iocpd->triggered_list_next = NULL; +} + + +void pni_events_update(iocpdesc_t *iocpd, int events) +{ + int old_events = iocpd->events; + if (old_events == events) + return; + iocpd->events = events; + if (iocpd->selector) { + if (iocpd->events & iocpd->interests) + triggered_list_add(iocpd->selector, iocpd); + else + triggered_list_remove(iocpd->selector, iocpd); + } +} + +static void interests_update(iocpdesc_t *iocpd, int interests) +{ + int old_interests = iocpd->interests; + if (old_interests == interests) + return; + iocpd->interests = interests; + if (iocpd->selector) { + if (iocpd->events & iocpd->interests) + triggered_list_add(iocpd->selector, iocpd); + else + triggered_list_remove(iocpd->selector, iocpd); + } +} + +static void deadlines_remove(pn_selector_t *selector, iocpdesc_t *iocpd) +{ + if (!iocpd->deadlines_prev && selector->deadlines_head != iocpd) + return; // not in list + LL_REMOVE(selector, deadlines, iocpd); + iocpd->deadlines_prev = NULL; + iocpd->deadlines_next = NULL; +} + + +static void deadlines_update(iocpdesc_t *iocpd, pn_timestamp_t deadline) +{ + if (deadline == iocpd->deadline) + return; + iocpd->deadline = deadline; + pn_selector_t *selector = iocpd->selector; + if (!deadline) { + deadlines_remove(selector, iocpd); + pni_events_update(iocpd, iocpd->events & ~PN_EXPIRED); + interests_update(iocpd, iocpd->interests & ~PN_EXPIRED); + } else { + if (iocpd->deadlines_prev || selector->deadlines_head == iocpd) { + deadlines_remove(selector, iocpd); + pni_events_update(iocpd, iocpd->events & ~PN_EXPIRED); + } + 
interests_update(iocpd, iocpd->interests | PN_EXPIRED); + iocpdesc_t *dl_iocpd = LL_HEAD(selector, deadlines); + while (dl_iocpd && dl_iocpd->deadline <= deadline) + dl_iocpd = dl_iocpd->deadlines_next; + if (dl_iocpd) { + // insert + iocpd->deadlines_prev = dl_iocpd->deadlines_prev; + iocpd->deadlines_next = dl_iocpd; + dl_iocpd->deadlines_prev = iocpd; + if (selector->deadlines_head == dl_iocpd) + selector->deadlines_head = iocpd; + } else { + LL_ADD(selector, deadlines, iocpd); // append + } + } +} Added: qpid/proton/branches/examples/proton-c/src/windows/write_pipeline.c URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/proton-c/src/windows/write_pipeline.c?rev=1630790&view=auto ============================================================================== --- qpid/proton/branches/examples/proton-c/src/windows/write_pipeline.c (added) +++ qpid/proton/branches/examples/proton-c/src/windows/write_pipeline.c Fri Oct 10 12:41:36 2014 @@ -0,0 +1,312 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/* + * A simple write buffer pool. Each socket has a dedicated "primary" + * buffer and can borrow from a shared pool with limited size tuning. + * Could enhance e.g. 
with separate pools per network interface and fancier + * memory tuning based on interface speed, system resources, and + * number of connections, etc. + */ + +#ifndef _WIN32_WINNT +#define _WIN32_WINNT 0x0501 +#endif +#if _WIN32_WINNT < 0x0501 +#error "Proton requires Windows API support for XP or later." +#endif +#include <winsock2.h> +#include <Ws2tcpip.h> +#define PN_WINAPI + +#include "platform.h" +#include <proton/object.h> +#include <proton/io.h> +#include <proton/selector.h> +#include <proton/error.h> +#include <assert.h> +#include "selectable.h" +#include "util.h" +#include "iocp.h" + +// Max overlapped writes per socket +#define IOCP_MAX_OWRITES 16 +// Write buffer size +#define IOCP_WBUFSIZE 16384 + +static void pipeline_log(const char *fmt, ...) +{ + va_list ap; + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + fflush(stderr); +} + +void pni_shared_pool_create(iocp_t *iocp) +{ + // TODO: more pools (or larger one) when using multiple non-loopback interfaces + iocp->shared_pool_size = 16; + char *env = getenv("PNI_WRITE_BUFFERS"); // Internal: for debugging + if (env) { + int sz = atoi(env); + if (sz >= 0 && sz < 256) { + iocp->shared_pool_size = sz; + } + } + iocp->loopback_bufsize = 0; + env = getenv("PNI_LB_BUFSIZE"); // Internal: for debugging + if (env) { + int sz = atoi(env); + if (sz >= 0 && sz <= 128 * 1024) { + iocp->loopback_bufsize = sz; + } + } + + if (iocp->shared_pool_size) { + iocp->shared_pool_memory = (char *) VirtualAlloc(NULL, IOCP_WBUFSIZE * iocp->shared_pool_size, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE); + HRESULT status = GetLastError(); + if (!iocp->shared_pool_memory) { + perror("Proton write buffer pool allocation failure\n"); + iocp->shared_pool_size = 0; + iocp->shared_available_count = 0; + return; + } + + iocp->shared_results = (write_result_t **) malloc(iocp->shared_pool_size * sizeof(write_result_t *)); + iocp->available_results = (write_result_t **) malloc(iocp->shared_pool_size * sizeof(write_result_t 
*)); + iocp->shared_available_count = iocp->shared_pool_size; + char *mem = iocp->shared_pool_memory; + for (int i = 0; i < iocp->shared_pool_size; i++) { + iocp->shared_results[i] = iocp->available_results[i] = pni_write_result(NULL, mem, IOCP_WBUFSIZE); + mem += IOCP_WBUFSIZE; + } + } +} + +void pni_shared_pool_free(iocp_t *iocp) +{ + for (int i = 0; i < iocp->shared_pool_size; i++) { + write_result_t *result = iocp->shared_results[i]; + if (result->in_use) + pipeline_log("Proton buffer pool leak\n"); + else + free(result); + } + if (iocp->shared_pool_size) { + free(iocp->shared_results); + free(iocp->available_results); + if (iocp->shared_pool_memory) { + if (!VirtualFree(iocp->shared_pool_memory, 0, MEM_RELEASE)) { + perror("write buffers release failed"); + } + iocp->shared_pool_memory = NULL; + } + } +} + +static void shared_pool_push(write_result_t *result) +{ + iocp_t *iocp = result->base.iocpd->iocp; + assert(iocp->shared_available_count < iocp->shared_pool_size); + iocp->available_results[iocp->shared_available_count++] = result; +} + +static write_result_t *shared_pool_pop(iocp_t *iocp) +{ + return iocp->shared_available_count ? 
iocp->available_results[--iocp->shared_available_count] : NULL; +} + +struct write_pipeline_t { + iocpdesc_t *iocpd; + size_t pending_count; + write_result_t *primary; + size_t reserved_count; + size_t next_primary_index; + size_t depth; + bool is_writer; +}; + +#define write_pipeline_compare NULL +#define write_pipeline_inspect NULL +#define write_pipeline_hashcode NULL + +static void write_pipeline_initialize(void *object) +{ + write_pipeline_t *pl = (write_pipeline_t *) object; + pl->pending_count = 0; + const char *pribuf = (const char *) malloc(IOCP_WBUFSIZE); + pl->primary = pni_write_result(NULL, pribuf, IOCP_WBUFSIZE); + pl->depth = 0; + pl->is_writer = false; +} + +static void write_pipeline_finalize(void *object) +{ + write_pipeline_t *pl = (write_pipeline_t *) object; + free((void *)pl->primary->buffer.start); + free(pl->primary); +} + +write_pipeline_t *pni_write_pipeline(iocpdesc_t *iocpd) +{ + static const pn_class_t clazz = PN_CLASS(write_pipeline); + write_pipeline_t *pipeline = (write_pipeline_t *) pn_new(sizeof(write_pipeline_t), &clazz); + pipeline->iocpd = iocpd; + pipeline->primary->base.iocpd = iocpd; + return pipeline; +} + +static void confirm_as_writer(write_pipeline_t *pl) +{ + if (!pl->is_writer) { + iocp_t *iocp = pl->iocpd->iocp; + iocp->writer_count++; + pl->is_writer = true; + } +} + +static void remove_as_writer(write_pipeline_t *pl) +{ + if (!pl->is_writer) + return; + iocp_t *iocp = pl->iocpd->iocp; + assert(iocp->writer_count); + pl->is_writer = false; + iocp->writer_count--; +} + +/* + * Optimal depth will depend on properties of the NIC, server, and driver. For now, + * just distinguish between loopback interfaces and the rest. Optimizations in the + * loopback stack allow decent performance with depth 1 and actually cause major + * performance hiccups if set to large values. 
+ */ +static void set_depth(write_pipeline_t *pl) +{ + pl->depth = 1; + sockaddr_storage sa; + socklen_t salen = sizeof(sa); + char buf[INET6_ADDRSTRLEN]; + DWORD buflen = sizeof(buf); + + if (getsockname(pl->iocpd->socket,(sockaddr*) &sa, &salen) == 0 && + getnameinfo((sockaddr*) &sa, salen, buf, buflen, NULL, 0, NI_NUMERICHOST) == 0) { + if ((sa.ss_family == AF_INET6 && strcmp(buf, "::1")) || + (sa.ss_family == AF_INET && strncmp(buf, "127.", 4))) { + // not loopback + pl->depth = IOCP_MAX_OWRITES; + } else { + iocp_t *iocp = pl->iocpd->iocp; + if (iocp->loopback_bufsize) { + const char *p = (const char *) realloc((void *) pl->primary->buffer.start, iocp->loopback_bufsize); + if (p) { + pl->primary->buffer.start = p; + pl->primary->buffer.size = iocp->loopback_bufsize; + } + } + } + } +} + +// Reserve as many buffers as possible for count bytes. +size_t pni_write_pipeline_reserve(write_pipeline_t *pl, size_t count) +{ + if (pl->primary->in_use) + return 0; // I.e. io->wouldblock + if (!pl->depth) + set_depth(pl); + if (pl->depth == 1) { + // always use the primary + pl->reserved_count = 1; + pl->next_primary_index = 0; + return 1; + } + + iocp_t *iocp = pl->iocpd->iocp; + confirm_as_writer(pl); + int wanted = (count / IOCP_WBUFSIZE); + if (count % IOCP_WBUFSIZE) + wanted++; + size_t pending = pl->pending_count; + assert(pending < pl->depth); + int bufs = pn_min(wanted, pl->depth - pending); + // Can draw from shared pool or the primary... but share with others. 
+ size_t writers = iocp->writer_count; + int shared_count = (iocp->shared_available_count + writers - 1) / writers; + bufs = pn_min(bufs, shared_count + 1); + pl->reserved_count = pending + bufs; + + if (bufs == wanted && + pl->reserved_count < (pl->depth / 2) && + iocp->shared_available_count > (2 * writers + bufs)) { + // No shortage: keep the primary as spare for future use + pl->next_primary_index = pl->reserved_count; + } else if (bufs == 1) { + pl->next_primary_index = pending; + } else { + // let approx 1/3 drain before replenishing + pl->next_primary_index = ((pl->reserved_count + 2) / 3) - 1; + if (pl->next_primary_index < pending) + pl->next_primary_index = pending; + } + return bufs; +} + +write_result_t *pni_write_pipeline_next(write_pipeline_t *pl) +{ + size_t sz = pl->pending_count; + if (sz >= pl->reserved_count) + return NULL; + write_result_t *result; + if (sz == pl->next_primary_index) { + result = pl->primary; + } else { + assert(pl->iocpd->iocp->shared_available_count > 0); + result = shared_pool_pop(pl->iocpd->iocp); + } + + result->in_use = true; + pl->pending_count++; + return result; +} + +void pni_write_pipeline_return(write_pipeline_t *pl, write_result_t *result) +{ + result->in_use = false; + pl->pending_count--; + pl->reserved_count = 0; + if (result != pl->primary) + shared_pool_push(result); + if (pl->pending_count == 0) + remove_as_writer(pl); +} + +bool pni_write_pipeline_writable(write_pipeline_t *pl) +{ + // Only writable if not full and we can guarantee a buffer: + return pl->pending_count < pl->depth && !pl->primary->in_use; +} + +size_t pni_write_pipeline_size(write_pipeline_t *pl) +{ + return pl->pending_count; +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org