On Wed, Nov 17, 2021 at 8:10 PM Mladen Turk <mt...@apache.org> wrote:
>
> If only a single thread can call apr_poll_drain_wakeup_pipe
> it should be OK.

Calling apr_poll{set,cb}_poll() concurrently on the same pollset is
quite undefined behaviour with our implementation(s), so I think we
are OK here.

> However I'd rather check if there is
> something to read before calling apr_file_getc since it'll block.

I already restored nonblocking mode on Unix in my previous patch; here is
a new one that does the same on Windows (I still can't compile or test it
though, and likewise for the OS/2 bits).

This new patch does not blindly set nonblocking for the pipe-socket
(like r1894917) but adds a new ->socket boolean to Windows' struct
apr_file_t (similar to the existing ->pipe flag), such that
file_socket_pipe_create() can now create a nonblocking socket (via
create_socket_pipe) for the read side of the pipe and apr_file_read()
can do the right thing (i.e. WSARecv/WSASend) when ->socket is set.

Does it work for you?

Regards,
Yann.
Index: file_io/win32/pipe.c
===================================================================
--- file_io/win32/pipe.c	(revision 1895110)
+++ file_io/win32/pipe.c	(working copy)
@@ -354,6 +354,7 @@ static apr_status_t create_socket_pipe(SOCKET *rd,
     for (;;) {
         int ns;
         int nc = 0;
+        apr_size_t nb = 0;
         /* Listening socket is nonblocking by now.
          * The accept should create the socket
          * immediatelly because we are connected already.
@@ -378,32 +379,28 @@ static apr_status_t create_socket_pipe(SOCKET *rd,
             rv =  apr_get_netos_error();
             goto cleanup;
         }
-        /* Verify the connection by reading the send identification.
-         */
-        do {
-            if (nc++)
-                Sleep(1);
-            nrd = recv(*rd, (char *)iid, sizeof(iid), 0);
-            rv = nrd == SOCKET_ERROR ? apr_get_netos_error() : APR_SUCCESS;
-        } while (APR_STATUS_IS_EAGAIN(rv));
-
-        if (nrd == sizeof(iid)) {
-            if (memcmp(uid, iid, sizeof(uid)) == 0) {
-                /* Wow, we recived what we send.
-                 * Put read side of the pipe to the blocking
-                 * mode and return.
-                 */
-                bm = 0;
-                if (ioctlsocket(*rd, FIONBIO, &bm) == SOCKET_ERROR) {
-                    rv = apr_get_netos_error();
-                    goto cleanup;
-                }
-                break;
-            }
+        /* Verify the connection by reading/waiting for the identification */
+        bm = 0;
+        if (ioctlsocket(*rd, FIONBIO, &bm) == SOCKET_ERROR) {
+            rv = apr_get_netos_error();
+            goto cleanup;
         }
-        else if (nrd == SOCKET_ERROR) {
+        nrd = recv(*rd, (char *)iid, sizeof(iid), 0);
+        if (nrd == SOCKET_ERROR) {
+            rv = apr_get_netos_error();
             goto cleanup;
         }
+        if (nrd == (int)sizeof(uid) && memcmp(iid, uid, sizeof(uid)) == 0) {
+            /* Wow we received what we send, put read side of the pipe in
+             * nonblocking mode and return.
+             */
+            bm = 1;
+            if (ioctlsocket(*rd, FIONBIO, &bm) == SOCKET_ERROR) {
+                rv = apr_get_netos_error();
+                goto cleanup;
+            }
+            break;
+        }
         closesocket(*rd);
     }
     /* We don't need the listening socket any more */
@@ -448,8 +445,8 @@ apr_status_t file_socket_pipe_create(apr_file_t **
     (*in) = (apr_file_t *)apr_pcalloc(p, sizeof(apr_file_t));
     (*in)->pool = p;
     (*in)->fname = NULL;
-    (*in)->pipe = 1;
-    (*in)->timeout = -1;
+    (*in)->socket = 1;
+    (*in)->timeout = 0; /* nonblocking reads */
     (*in)->ungetchar = -1;
     (*in)->eof_hit = 0;
     (*in)->filePtr = 0;
@@ -456,13 +453,13 @@ apr_status_t file_socket_pipe_create(apr_file_t **
     (*in)->bufpos = 0;
     (*in)->dataRead = 0;
     (*in)->direction = 0;
-    (*in)->pOverlapped = (OVERLAPPED*)apr_pcalloc(p, sizeof(OVERLAPPED));
+    (*in)->pOverlapped = NULL;
     (*in)->filehand = (HANDLE)rd;
 
     (*out) = (apr_file_t *)apr_pcalloc(p, sizeof(apr_file_t));
     (*out)->pool = p;
     (*out)->fname = NULL;
-    (*out)->pipe = 1;
+    (*out)->socket = 1;
     (*out)->timeout = -1;
     (*out)->ungetchar = -1;
     (*out)->eof_hit = 0;
@@ -470,7 +467,7 @@ apr_status_t file_socket_pipe_create(apr_file_t **
     (*out)->bufpos = 0;
     (*out)->dataRead = 0;
     (*out)->direction = 0;
-    (*out)->pOverlapped = (OVERLAPPED*)apr_pcalloc(p, sizeof(OVERLAPPED));
+    (*out)->pOverlapped = NULL;
     (*out)->filehand = (HANDLE)wr;
 
     apr_pool_cleanup_register(p, (void *)(*in), socket_pipe_cleanup,
@@ -484,7 +481,7 @@ apr_status_t file_socket_pipe_create(apr_file_t **
 apr_status_t file_socket_pipe_close(apr_file_t *file)
 {
     apr_status_t stat;
-    if (!file->pipe)
+    if (!file->socket)
         return apr_file_close(file);
     if ((stat = socket_pipe_cleanup(file)) == APR_SUCCESS) {
         apr_pool_cleanup_kill(file->pool, file, socket_pipe_cleanup);
Index: file_io/win32/readwrite.c
===================================================================
--- file_io/win32/readwrite.c	(revision 1895110)
+++ file_io/win32/readwrite.c	(working copy)
@@ -20,10 +20,12 @@
 #include "apr_strings.h"
 #include "apr_lib.h"
 #include "apr_errno.h"
-#include <malloc.h>
+#include "apr_arch_networkio.h"
 #include "apr_arch_atime.h"
 #include "apr_arch_misc.h"
 
+#include <malloc.h>
+
 /*
  * read_with_timeout() 
  * Uses async i/o to emulate unix non-blocking i/o with timeouts.
@@ -31,6 +33,7 @@
 static apr_status_t read_with_timeout(apr_file_t *file, void *buf, apr_size_t len_in, apr_size_t *nbytes)
 {
     apr_status_t rv;
+    int pipe_or_socket = (file->pipe || file->socket);
     DWORD len = (DWORD)len_in;
     DWORD bytesread = 0;
 
@@ -67,13 +70,28 @@ static apr_status_t read_with_timeout(apr_file_t *
         }
     }
 
-    if (file->pOverlapped && !file->pipe) {
+    if (file->pOverlapped && !pipe_or_socket) {
         file->pOverlapped->Offset     = (DWORD)file->filePtr;
         file->pOverlapped->OffsetHigh = (DWORD)(file->filePtr >> 32);
     }
 
-    if (ReadFile(file->filehand, buf, len, 
-                 &bytesread, file->pOverlapped)) {
+    if (file->socket && !file->pOverlapped) {
+        WSABUF wsaData;
+        DWORD flags = 0;
+
+        wsaData.buf = (char*) buf;
+        wsaData.len = (u_long)len;
+        if (WSARecv((SOCKET)file->filehand, &wsaData, 1, &bytesread,
+                    &flags, NULL, NULL) == SOCKET_ERROR) {
+            rv = apr_get_netos_error();
+            bytesread = 0;
+        }
+        else {
+            rv = APR_SUCCESS;
+        }
+    }
+    else if (ReadFile(file->filehand, buf, len, 
+                      &bytesread, file->pOverlapped)) {
         rv = APR_SUCCESS;
     }
     else {
@@ -139,7 +157,7 @@ static apr_status_t read_with_timeout(apr_file_t *
     if (rv == APR_SUCCESS && bytesread == 0)
         rv = APR_EOF;
     
-    if (rv == APR_SUCCESS && file->pOverlapped && !file->pipe) {
+    if (rv == APR_SUCCESS && file->pOverlapped && !pipe_or_socket) {
         file->filePtr += bytesread;
     }
     *nbytes = bytesread;
@@ -410,8 +428,27 @@ APR_DECLARE(apr_status_t) apr_file_write(apr_file_
             apr_thread_mutex_unlock(thefile->mutex);
         }
         return rv;
-    } else {
-        if (thefile->pipe) {
+    }
+    else if (thefile->socket && !thefile->pOverlapped) {
+        WSABUF wsaData;
+        DWORD flags = 0;
+
+        wsaData.buf = (char*) buf;
+        wsaData.len = (u_long)*nbytes;
+        if (WSASend((SOCKET)file->filehand, &wsaData, 1, &bwrote,
+                    flags, NULL, NULL) == SOCKET_ERROR) {
+            rv = apr_get_netos_error();
+            *nbytes = 0;
+        }
+        else {
+            *nbytes = bwrote;
+            rv = APR_SUCCESS;
+        }
+    }
+    else {
+        int pipe_or_socket = (thefile->pipe || thefile->socket);
+
+        if (pipe_or_socket) {
             rv = WriteFile(thefile->filehand, buf, (DWORD)*nbytes, &bwrote,
                            thefile->pOverlapped);
         }
@@ -545,7 +582,7 @@ APR_DECLARE(apr_status_t) apr_file_write(apr_file_
                 }
             }
         }
-        if (rv == APR_SUCCESS && thefile->pOverlapped && !thefile->pipe) {
+        if (rv == APR_SUCCESS && thefile->pOverlapped && !pipe_or_socket) {
             thefile->filePtr += *nbytes;
         }
     }
Index: include/arch/win32/apr_arch_file_io.h
===================================================================
--- include/arch/win32/apr_arch_file_io.h	(revision 1895110)
+++ include/arch/win32/apr_arch_file_io.h	(working copy)
@@ -162,7 +162,7 @@ apr_status_t more_finfo(apr_finfo_t *finfo, const
 struct apr_file_t {
     apr_pool_t *pool;
     HANDLE filehand;
-    BOOLEAN pipe;              /* Is this a pipe of a file? */
+    BOOLEAN pipe, socket;      /* Is this a pipe, a socket or a file? */
     OVERLAPPED *pOverlapped;
     apr_interval_time_t timeout;
     apr_int32_t flags;
Index: poll/os2/pollset.c
===================================================================
--- poll/os2/pollset.c	(revision 1895110)
+++ poll/os2/pollset.c	(working copy)
@@ -263,17 +263,14 @@ APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pol
 
         if (rtnevents) {
             if (i == 0 && pollset->wake_listen != NULL) {
+                char ch;
+                apr_size_t len = 1;
                 struct apr_sockaddr_t from_addr;
-                char buffer[16];
-                apr_size_t buflen;
-                for (;;) {
-                    buflen = sizeof(buffer);
-                    rv = apr_socket_recvfrom(&from_addr, pollset->wake_listen,
-                                             MSG_DONTWAIT, buffer, &buflen);
-                    if (rv != APR_SUCCESS) {
-                        break;
-                    }
-                    /* Woken up, drain the pipe still. */
+                rv = apr_socket_recvfrom(&from_addr, pollset->wake_listen,
+                                          MSG_DONTWAIT, &ch, &len);
+                if (rv == APR_SUCCESS) {
+                    /* Woken up, senders can fill the pipe again */
+                    apr_atomic_set32(&pollset->wakeup_set, 0);
                     rc = APR_EINTR;
                 }
             }
@@ -298,12 +295,15 @@ APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pol
 
 APR_DECLARE(apr_status_t) apr_pollset_wakeup(apr_pollset_t *pollset)
 {
-    if (pollset->wake_sender) {
+    if (!pollset->wake_sender)
+        return APR_EINIT;
+
+    if (apr_atomic_cas32(&pollset->wakeup_set, 1, 0) == 0) {
         apr_size_t len = 1;
         return apr_socket_sendto(pollset->wake_sender, pollset->wake_address, 0, "", &len);
     }
 
-    return APR_EINIT;
+    return APR_SUCCESS;
 }
 
 
Index: poll/unix/pollcb.c
===================================================================
--- poll/unix/pollcb.c	(revision 1895110)
+++ poll/unix/pollcb.c	(working copy)
@@ -214,14 +214,13 @@ APR_DECLARE(apr_status_t) apr_pollcb_poll(apr_poll
 
 APR_DECLARE(apr_status_t) apr_pollcb_wakeup(apr_pollcb_t *pollcb)
 {
-    if (pollcb->flags & APR_POLLSET_WAKEABLE) {
-        if (apr_atomic_cas32(&pollcb->wakeup_set, 1, 0) == 0)
-            return apr_file_putc(1, pollcb->wakeup_pipe[1]);
-        else
-           return APR_SUCCESS;
-    }
-    else
+    if (!(pollcb->flags & APR_POLLSET_WAKEABLE))
         return APR_EINIT;
+
+    if (apr_atomic_cas32(&pollcb->wakeup_set, 1, 0) == 0)
+        return apr_file_putc(1, pollcb->wakeup_pipe[1]);
+
+    return APR_SUCCESS;
 }
 
 APR_DECLARE(const char *) apr_pollcb_method_name(apr_pollcb_t *pollcb)
Index: poll/unix/pollset.c
===================================================================
--- poll/unix/pollset.c	(revision 1895110)
+++ poll/unix/pollset.c	(working copy)
@@ -218,14 +218,13 @@ APR_DECLARE(apr_status_t) apr_pollset_destroy(apr_
 
 APR_DECLARE(apr_status_t) apr_pollset_wakeup(apr_pollset_t *pollset)
 {
-    if (pollset->flags & APR_POLLSET_WAKEABLE) {
-        if (apr_atomic_cas32(&pollset->wakeup_set, 1, 0) == 0)
-            return apr_file_putc(1, pollset->wakeup_pipe[1]);
-        else
-           return APR_SUCCESS;
-    }
-    else
+    if (!(pollset->flags & APR_POLLSET_WAKEABLE))
         return APR_EINIT;
+
+    if (apr_atomic_cas32(&pollset->wakeup_set, 1, 0) == 0)
+        return apr_file_putc(1, pollset->wakeup_pipe[1]);
+
+    return APR_SUCCESS;
 }
 
 APR_DECLARE(apr_status_t) apr_pollset_add(apr_pollset_t *pollset,
Index: poll/unix/wakeup.c
===================================================================
--- poll/unix/wakeup.c	(revision 1895111)
+++ poll/unix/wakeup.c	(working copy)
@@ -81,8 +81,8 @@ apr_status_t apr_poll_create_wakeup_pipe(apr_pool_
 {
     apr_status_t rv;
 
-    if ((rv = apr_file_pipe_create(&wakeup_pipe[0], &wakeup_pipe[1],
-                                   pool)) != APR_SUCCESS)
+    if ((rv = apr_file_pipe_create_ex(&wakeup_pipe[0], &wakeup_pipe[1],
+                                      APR_WRITE_BLOCK, pool)))
         return rv;
 
     pfd->p = pool;
@@ -137,16 +137,9 @@ apr_status_t apr_poll_close_wakeup_pipe(apr_file_t
  */
 void apr_poll_drain_wakeup_pipe(volatile apr_uint32_t *wakeup_set, apr_file_t **wakeup_pipe)
 {
+    char ch;
 
-    while (apr_atomic_cas32(wakeup_set, 0, 1) > 0) {
-        char ch;
-        /* though we write just one byte to the other end of the pipe
-         * during wakeup, multiple threads could call the wakeup.
-         * So simply drain out from the input side of the pipe all
-         * the data.
-         */
-        if (apr_file_getc(&ch, wakeup_pipe[0]) != APR_SUCCESS)
-            break;
-    }
+    (void)apr_file_getc(&ch, wakeup_pipe[0]);
+    apr_atomic_set32(wakeup_set, 0);
 }
 

Reply via email to