On Wed, Nov 17, 2021 at 8:10 PM Mladen Turk <mt...@apache.org> wrote: > > If only a single thread can call apr_poll_drain_wakeup_pipe > it shoud be OK.
Calling apr_poll{set,cb}_poll() concurrently on the same pollset is quite undefined behaviour with our implementation(s), so I think we are OK here. > However I'd rather check if there is > something to read before calling apr_file_getc since it'll block. I restored nonblocking on unix in my patch already, here is a new one that does the same on Windows (I still can't compile/test it though, likewise for the OS/2 bits..). This new patch does not blindly set nonblocking for the pipe-socket (like r1894917) but adds a new ->socket boolean to Windows' struct apr_file_t (similar to the existing ->pipe flag), such that file_socket_pipe_create() can now create a nonblocking socket (via create_socket_pipe) for the read side of the pipe and apr_file_read() can do the right thing (i.e. WSARecv/WSASend) when ->socket is set. Does it work for you? Regards; Yann.
Index: file_io/win32/pipe.c =================================================================== --- file_io/win32/pipe.c (revision 1895110) +++ file_io/win32/pipe.c (working copy) @@ -354,6 +354,7 @@ static apr_status_t create_socket_pipe(SOCKET *rd, for (;;) { int ns; int nc = 0; + apr_size_t nb = 0; /* Listening socket is nonblocking by now. * The accept should create the socket * immediatelly because we are connected already. @@ -378,32 +379,28 @@ static apr_status_t create_socket_pipe(SOCKET *rd, rv = apr_get_netos_error(); goto cleanup; } - /* Verify the connection by reading the send identification. - */ - do { - if (nc++) - Sleep(1); - nrd = recv(*rd, (char *)iid, sizeof(iid), 0); - rv = nrd == SOCKET_ERROR ? apr_get_netos_error() : APR_SUCCESS; - } while (APR_STATUS_IS_EAGAIN(rv)); - - if (nrd == sizeof(iid)) { - if (memcmp(uid, iid, sizeof(uid)) == 0) { - /* Wow, we recived what we send. - * Put read side of the pipe to the blocking - * mode and return. - */ - bm = 0; - if (ioctlsocket(*rd, FIONBIO, &bm) == SOCKET_ERROR) { - rv = apr_get_netos_error(); - goto cleanup; - } - break; - } + /* Verify the connection by reading/waiting for the identification */ + bm = 0; + if (ioctlsocket(*rd, FIONBIO, &bm) == SOCKET_ERROR) { + rv = apr_get_netos_error(); + goto cleanup; } - else if (nrd == SOCKET_ERROR) { + nrd = recv(*rd, (char *)iid, sizeof(iid), 0); + if (nrd == SOCKET_ERROR) { + rv = apr_get_netos_error(); goto cleanup; } + if (nrd == (int)sizeof(uid) && memcmp(iid, uid, sizeof(uid)) == 0) { + /* Wow we received what we send, put read side of the pipe in + * nonblocking mode and return. + */ + bm = 1; + if (ioctlsocket(*rd, FIONBIO, &bm) == SOCKET_ERROR) { + rv = apr_get_netos_error(); + goto cleanup; + } + break; + } closesocket(*rd); } /* We don't need the listening socket any more */ @@ -448,8 +445,8 @@ apr_status_t file_socket_pipe_create(apr_file_t ** (*in) = (apr_file_t *)apr_pcalloc(p, sizeof(apr_file_t)); (*in)->pool = p; (*in)->fname = NULL; - (*in)->pipe = 1; - (*in)->timeout = -1; + (*in)->socket = 1; + (*in)->timeout = 0; /* nonblocking reads */ (*in)->ungetchar = -1; (*in)->eof_hit = 0; (*in)->filePtr = 0; @@ -456,13 +453,13 @@ apr_status_t file_socket_pipe_create(apr_file_t ** (*in)->bufpos = 0; (*in)->dataRead = 0; (*in)->direction = 0; - (*in)->pOverlapped = (OVERLAPPED*)apr_pcalloc(p, sizeof(OVERLAPPED)); + (*in)->pOverlapped = NULL; (*in)->filehand = (HANDLE)rd; (*out) = (apr_file_t *)apr_pcalloc(p, sizeof(apr_file_t)); (*out)->pool = p; (*out)->fname = NULL; - (*out)->pipe = 1; + (*out)->socket = 1; (*out)->timeout = -1; (*out)->ungetchar = -1; (*out)->eof_hit = 0; @@ -470,7 +467,7 @@ apr_status_t file_socket_pipe_create(apr_file_t ** (*out)->bufpos = 0; (*out)->dataRead = 0; (*out)->direction = 0; - (*out)->pOverlapped = (OVERLAPPED*)apr_pcalloc(p, sizeof(OVERLAPPED)); + (*out)->pOverlapped = NULL; (*out)->filehand = (HANDLE)wr; apr_pool_cleanup_register(p, (void *)(*in), socket_pipe_cleanup, @@ -484,7 +481,7 @@ apr_status_t file_socket_pipe_create(apr_file_t ** apr_status_t file_socket_pipe_close(apr_file_t *file) { apr_status_t stat; - if (!file->pipe) + if (!file->socket) return apr_file_close(file); if ((stat = socket_pipe_cleanup(file)) == APR_SUCCESS) { apr_pool_cleanup_kill(file->pool, file, socket_pipe_cleanup); Index: file_io/win32/readwrite.c =================================================================== --- file_io/win32/readwrite.c (revision 1895110) +++ file_io/win32/readwrite.c (working copy) @@ -20,10 +20,12 @@ #include "apr_strings.h" #include "apr_lib.h" #include "apr_errno.h" -#include <malloc.h> +#include "apr_arch_networkio.h" #include "apr_arch_atime.h" #include "apr_arch_misc.h" +#include <malloc.h> + /* * read_with_timeout() * Uses async i/o to emulate unix non-blocking i/o with timeouts. @@ -31,6 +33,7 @@ static apr_status_t read_with_timeout(apr_file_t *file, void *buf, apr_size_t len_in, apr_size_t *nbytes) { apr_status_t rv; + int pipe_or_socket = (file->pipe || file->socket); DWORD len = (DWORD)len_in; DWORD bytesread = 0; @@ -67,13 +70,28 @@ static apr_status_t read_with_timeout(apr_file_t * } } - if (file->pOverlapped && !file->pipe) { + if (file->pOverlapped && !pipe_or_socket) { file->pOverlapped->Offset = (DWORD)file->filePtr; file->pOverlapped->OffsetHigh = (DWORD)(file->filePtr >> 32); } - if (ReadFile(file->filehand, buf, len, - &bytesread, file->pOverlapped)) { + if (file->socket && !file->pOverlapped) { + WSABUF wsaData; + DWORD flags = 0; + + wsaData.buf = (char*) buf; + wsaData.len = (u_long)len; + if (WSARecv((SOCKET)file->filehand, &wsaData, 1, &bytesread, + &flags, NULL, NULL) == SOCKET_ERROR) { + rv = apr_get_netos_error(); + bytesread = 0; + } + else { + rv = APR_SUCCESS; + } + } + else if (ReadFile(file->filehand, buf, len, + &bytesread, file->pOverlapped)) { rv = APR_SUCCESS; } else { @@ -139,7 +157,7 @@ static apr_status_t read_with_timeout(apr_file_t * if (rv == APR_SUCCESS && bytesread == 0) rv = APR_EOF; - if (rv == APR_SUCCESS && file->pOverlapped && !file->pipe) { + if (rv == APR_SUCCESS && file->pOverlapped && !pipe_or_socket) { file->filePtr += bytesread; } *nbytes = bytesread; @@ -410,8 +428,27 @@ APR_DECLARE(apr_status_t) apr_file_write(apr_file_ apr_thread_mutex_unlock(thefile->mutex); } return rv; - } else { - if (thefile->pipe) { + } + else if (thefile->socket && !thefile->pOverlapped) { + WSABUF wsaData; + DWORD flags = 0; + + wsaData.buf = (char*) buf; + wsaData.len = (u_long)*nbytes; + if (WSASend((SOCKET)file->filehand, &wsaData, 1, &bwrote, + flags, NULL, NULL) == SOCKET_ERROR) { + rv = apr_get_netos_error(); + *nbytes = 0; + } + else { + *nbytes = bwrote; + rv = APR_SUCCESS; + } + } + else { + int pipe_or_socket = (thefile->pipe || thefile->socket); + + if (pipe_or_socket) { rv = WriteFile(thefile->filehand, buf, (DWORD)*nbytes, &bwrote, thefile->pOverlapped); } @@ -545,7 +582,7 @@ APR_DECLARE(apr_status_t) apr_file_write(apr_file_ } } } - if (rv == APR_SUCCESS && thefile->pOverlapped && !thefile->pipe) { + if (rv == APR_SUCCESS && thefile->pOverlapped && !pipe_or_socket) { thefile->filePtr += *nbytes; } } Index: include/arch/win32/apr_arch_file_io.h =================================================================== --- include/arch/win32/apr_arch_file_io.h (revision 1895110) +++ include/arch/win32/apr_arch_file_io.h (working copy) @@ -162,7 +162,7 @@ apr_status_t more_finfo(apr_finfo_t *finfo, const struct apr_file_t { apr_pool_t *pool; HANDLE filehand; - BOOLEAN pipe; /* Is this a pipe of a file? */ + BOOLEAN pipe, socket; /* Is this a pipe, a socket or a file? */ OVERLAPPED *pOverlapped; apr_interval_time_t timeout; apr_int32_t flags; Index: poll/os2/pollset.c =================================================================== --- poll/os2/pollset.c (revision 1895110) +++ poll/os2/pollset.c (working copy) @@ -263,17 +263,14 @@ APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pol if (rtnevents) { if (i == 0 && pollset->wake_listen != NULL) { + char ch; + apr_size_t len = 1; struct apr_sockaddr_t from_addr; - char buffer[16]; - apr_size_t buflen; - for (;;) { - buflen = sizeof(buffer); - rv = apr_socket_recvfrom(&from_addr, pollset->wake_listen, - MSG_DONTWAIT, buffer, &buflen); - if (rv != APR_SUCCESS) { - break; - } - /* Woken up, drain the pipe still. */ + rv = apr_socket_recvfrom(&from_addr, pollset->wake_listen, + MSG_DONTWAIT, &ch, &len); + if (rv == APR_SUCCESS) { + /* Woken up, senders can fill the pipe again */ + apr_atomic_set32(&pollset->wakeup_set, 0); rc = APR_EINTR; } } @@ -298,12 +295,15 @@ APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pol APR_DECLARE(apr_status_t) apr_pollset_wakeup(apr_pollset_t *pollset) { - if (pollset->wake_sender) { + if (!pollset->wake_sender) + return APR_EINIT; + + if (apr_atomic_cas32(&pollset->wakeup_set, 1, 0) == 0) { apr_size_t len = 1; return apr_socket_sendto(pollset->wake_sender, pollset->wake_address, 0, "", &len); } - return APR_EINIT; + return APR_SUCCESS; } Index: poll/unix/pollcb.c =================================================================== --- poll/unix/pollcb.c (revision 1895110) +++ poll/unix/pollcb.c (working copy) @@ -214,14 +214,13 @@ APR_DECLARE(apr_status_t) apr_pollcb_poll(apr_poll APR_DECLARE(apr_status_t) apr_pollcb_wakeup(apr_pollcb_t *pollcb) { - if (pollcb->flags & APR_POLLSET_WAKEABLE) { - if (apr_atomic_cas32(&pollcb->wakeup_set, 1, 0) == 0) - return apr_file_putc(1, pollcb->wakeup_pipe[1]); - else - return APR_SUCCESS; - } - else + if (!(pollcb->flags & APR_POLLSET_WAKEABLE)) return APR_EINIT; + + if (apr_atomic_cas32(&pollcb->wakeup_set, 1, 0) == 0) + return apr_file_putc(1, pollcb->wakeup_pipe[1]); + + return APR_SUCCESS; } APR_DECLARE(const char *) apr_pollcb_method_name(apr_pollcb_t *pollcb) Index: poll/unix/pollset.c =================================================================== --- poll/unix/pollset.c (revision 1895110) +++ poll/unix/pollset.c (working copy) @@ -218,14 +218,13 @@ APR_DECLARE(apr_status_t) apr_pollset_destroy(apr_ APR_DECLARE(apr_status_t) apr_pollset_wakeup(apr_pollset_t *pollset) { - if (pollset->flags & APR_POLLSET_WAKEABLE) { - if (apr_atomic_cas32(&pollset->wakeup_set, 1, 0) == 0) - return apr_file_putc(1, pollset->wakeup_pipe[1]); - else - return APR_SUCCESS; - } - else + if (!(pollset->flags & APR_POLLSET_WAKEABLE)) return APR_EINIT; + + if (apr_atomic_cas32(&pollset->wakeup_set, 1, 0) == 0) + return apr_file_putc(1, pollset->wakeup_pipe[1]); + + return APR_SUCCESS; } APR_DECLARE(apr_status_t) apr_pollset_add(apr_pollset_t *pollset, Index: poll/unix/wakeup.c =================================================================== --- poll/unix/wakeup.c (revision 1895111) +++ poll/unix/wakeup.c (working copy) @@ -81,8 +81,8 @@ apr_status_t apr_poll_create_wakeup_pipe(apr_pool_ { apr_status_t rv; - if ((rv = apr_file_pipe_create(&wakeup_pipe[0], &wakeup_pipe[1], - pool)) != APR_SUCCESS) + if ((rv = apr_file_pipe_create_ex(&wakeup_pipe[0], &wakeup_pipe[1], + APR_WRITE_BLOCK, pool))) return rv; pfd->p = pool; @@ -137,16 +137,9 @@ apr_status_t apr_poll_close_wakeup_pipe(apr_file_t */ void apr_poll_drain_wakeup_pipe(volatile apr_uint32_t *wakeup_set, apr_file_t **wakeup_pipe) { + char ch; - while (apr_atomic_cas32(wakeup_set, 0, 1) > 0) { - char ch; - /* though we write just one byte to the other end of the pipe - * during wakeup, multiple threads could call the wakeup. - * So simply drain out from the input side of the pipe all - * the data. - */ - if (apr_file_getc(&ch, wakeup_pipe[0]) != APR_SUCCESS) - break; - } + (void)apr_file_getc(&ch, wakeup_pipe[0]); + apr_atomic_set32(wakeup_set, 0); }