- Make read_ready a manual-reset event.

- Signal read_ready in open instead of in the listen_client_thread.

- Don't reset read_ready when the listen_client thread terminates;
  instead do it in close().

- Rearrange open and change its error handling.

- Add a wait_open_pipe method that waits for a pipe instance to be
  available and then calls open_pipe.  Use it when opening a writer if
  we can't connect immediately.  This can happen if the system is
  heavily loaded and/or if many writers are trying to open
  simultaneously.
---
 winsup/cygwin/fhandler.h       |   1 +
 winsup/cygwin/fhandler_fifo.cc | 267 +++++++++++++++++++++------------
 2 files changed, 168 insertions(+), 100 deletions(-)

diff --git a/winsup/cygwin/fhandler.h b/winsup/cygwin/fhandler.h
index 3bc04cf13..2516c93b4 100644
--- a/winsup/cygwin/fhandler.h
+++ b/winsup/cygwin/fhandler.h
@@ -1323,6 +1323,7 @@ class fhandler_fifo: public fhandler_base
   static NTSTATUS npfs_handle (HANDLE &);
   HANDLE create_pipe_instance (bool);
   NTSTATUS open_pipe (HANDLE&);
+  NTSTATUS wait_open_pipe (HANDLE&);
   int add_client_handler ();
   void delete_client_handler (int);
   bool listen_client ();
diff --git a/winsup/cygwin/fhandler_fifo.cc b/winsup/cygwin/fhandler_fifo.cc
index 21faf4ec2..5c3df5497 100644
--- a/winsup/cygwin/fhandler_fifo.cc
+++ b/winsup/cygwin/fhandler_fifo.cc
@@ -222,7 +222,64 @@ fhandler_fifo::open_pipe (HANDLE& ph)
                              openflags & O_CLOEXEC ? 0 : OBJ_INHERIT,
                              npfsh, NULL);
   sharing = FILE_SHARE_READ | FILE_SHARE_WRITE;
-  status = NtOpenFile (&ph, access, &attr, &io, sharing, 0);
+  return NtOpenFile (&ph, access, &attr, &io, sharing, 0);
+}
+
+/* Wait up to 100ms for a pipe instance to be available, then connect. */
+NTSTATUS
+fhandler_fifo::wait_open_pipe (HANDLE& ph)
+{
+  HANDLE npfsh;
+  HANDLE evt;
+  NTSTATUS status;
+  IO_STATUS_BLOCK io;
+  ULONG pwbuf_size;
+  PFILE_PIPE_WAIT_FOR_BUFFER pwbuf;
+  LONGLONG stamp;
+  LONGLONG orig_timeout = -100 * NS100PERSEC / MSPERSEC;   /* 100ms */
+
+  status = npfs_handle (npfsh);
+  if (!NT_SUCCESS (status))
+    return status;
+  if (!(evt = create_event ()))
+    api_fatal ("Can't create event, %E");
+  pwbuf_size
+    = offsetof (FILE_PIPE_WAIT_FOR_BUFFER, Name) + get_pipe_name ()->Length;
+  pwbuf = (PFILE_PIPE_WAIT_FOR_BUFFER) alloca (pwbuf_size);
+  pwbuf->Timeout.QuadPart = orig_timeout;
+  pwbuf->NameLength = get_pipe_name ()->Length;
+  pwbuf->TimeoutSpecified = TRUE;
+  memcpy (pwbuf->Name, get_pipe_name ()->Buffer, get_pipe_name ()->Length);
+  stamp = get_clock (CLOCK_MONOTONIC)->n100secs ();
+  bool retry;
+  do
+    {
+      retry = false;
+      status = NtFsControlFile (npfsh, evt, NULL, NULL, &io, FSCTL_PIPE_WAIT,
+                               pwbuf, pwbuf_size, NULL, 0);
+      if (status == STATUS_PENDING)
+       {
+         if (WaitForSingleObject (evt, INFINITE) == WAIT_OBJECT_0)
+           status = io.Status;
+         else
+           api_fatal ("WFSO failed, %E");
+       }
+      if (NT_SUCCESS (status))
+       status = open_pipe (ph);
+      if (STATUS_PIPE_NO_INSTANCE_AVAILABLE (status))
+       {
+         /* Another writer has grabbed the pipe instance.  Adjust
+            the timeout and keep waiting if there's time left. */
+         pwbuf->Timeout.QuadPart = orig_timeout
+           + get_clock (CLOCK_MONOTONIC)->n100secs () - stamp;
+         if (pwbuf->Timeout.QuadPart < 0)
+           retry = true;
+         else
+           status = STATUS_IO_TIMEOUT;
+       }
+    }
+  while (retry);
+  NtClose (evt);
   return status;
 }
 
@@ -294,7 +351,6 @@ void
 fhandler_fifo::record_connection (fifo_client_handler& fc,
                                  fifo_client_connect_state s)
 {
-  SetEvent (write_ready);
   fc.state = s;
   maybe_eof (false);
   ResetEvent (writer_opening);
@@ -327,9 +383,6 @@ fhandler_fifo::listen_client_thread ()
       if (add_client_handler () < 0)
        api_fatal ("Can't add a client handler, %E");
 
-      /* Allow a writer to open. */
-      SetEvent (read_ready);
-
       /* Listen for a writer to connect to the new client handler. */
       fifo_client_handler& fc = fc_handler[nhandlers - 1];
       NTSTATUS status;
@@ -405,19 +458,13 @@ fhandler_fifo::listen_client_thread ()
 out:
   if (conn_evt)
     NtClose (conn_evt);
-  ResetEvent (read_ready);
   return 0;
 }
 
 int
 fhandler_fifo::open (int flags, mode_t)
 {
-  enum
-  {
-   success,
-   error_errno_set,
-   error_set_errno
-  } res;
+  int saved_errno = 0;
 
   if (flags & O_PATH)
     return open_fs (flags);
@@ -437,8 +484,7 @@ fhandler_fifo::open (int flags, mode_t)
       break;
     default:
       set_errno (EINVAL);
-      res = error_errno_set;
-      goto out;
+      goto err;
     }
 
   debug_only_printf ("reader %d, writer %d, duplexer %d", reader, writer, 
duplexer);
@@ -454,135 +500,151 @@ fhandler_fifo::open (int flags, mode_t)
 
   char npbuf[MAX_PATH];
   __small_sprintf (npbuf, "r-event.%08x.%016X", get_dev (), get_ino ());
-  if (!(read_ready = CreateEvent (sa_buf, false, false, npbuf)))
+  if (!(read_ready = CreateEvent (sa_buf, true, false, npbuf)))
     {
       debug_printf ("CreateEvent for %s failed, %E", npbuf);
-      res = error_set_errno;
-      goto out;
+      __seterrno ();
+      goto err;
     }
   npbuf[0] = 'w';
   if (!(write_ready = CreateEvent (sa_buf, true, false, npbuf)))
     {
       debug_printf ("CreateEvent for %s failed, %E", npbuf);
-      res = error_set_errno;
-      goto out;
+      __seterrno ();
+      goto err_close_read_ready;
     }
   npbuf[0] = 'o';
   if (!(writer_opening = CreateEvent (sa_buf, true, false, npbuf)))
     {
       debug_printf ("CreateEvent for %s failed, %E", npbuf);
-      res = error_set_errno;
-      goto out;
-    }
-
-  /* If we're a duplexer, create the pipe and the first client handler. */
-  if (duplexer)
-    {
-      HANDLE ph = NULL;
-
-      if (add_client_handler () < 0)
-       {
-         res = error_errno_set;
-         goto out;
-       }
-      NTSTATUS status = open_pipe (ph);
-      if (NT_SUCCESS (status))
-       {
-         record_connection (fc_handler[0]);
-         set_handle (ph);
-         set_pipe_non_blocking (ph, flags & O_NONBLOCK);
-       }
-      else
-       {
-         __seterrno_from_nt_status (status);
-         res = error_errno_set;
-         goto out;
-       }
+      __seterrno ();
+      goto err_close_write_ready;
     }
 
-  /* If we're reading, start the listen_client thread (which should
-     signal read_ready), and wait for a writer. */
+  /* If we're reading, signal read_ready and start the listen_client
+     thread. */
   if (reader)
     {
       if (!listen_client ())
        {
          debug_printf ("create of listen_client thread failed");
-         res = error_errno_set;
-         goto out;
+         goto err_close_writer_opening;
        }
-      else if (!duplexer && !wait (write_ready))
-       {
-         res = error_errno_set;
-         goto out;
-       }
-      else
+      SetEvent (read_ready);
+
+      /* If we're a duplexer, we need a handle for writing. */
+      if (duplexer)
        {
-         init_fixup_before ();
-         res = success;
+         HANDLE ph = NULL;
+         NTSTATUS status;
+
+         while (1)
+           {
+             status = open_pipe (ph);
+             if (NT_SUCCESS (status))
+               {
+                 set_handle (ph);
+                 set_pipe_non_blocking (ph, flags & O_NONBLOCK);
+                 break;
+               }
+             else if (status == STATUS_OBJECT_NAME_NOT_FOUND)
+               {
+                 /* The pipe hasn't been created yet. */
+                 yield ();
+                 continue;
+               }
+             else
+               {
+                 __seterrno_from_nt_status (status);
+                 goto err_close_reader;
+               }
+           }
        }
+      /* Not a duplexer; wait for a writer to connect. */
+      else if (!wait (write_ready))
+       goto err_close_reader;
+      init_fixup_before ();
+      goto success;
     }
 
-  /* If we're writing, wait for read_ready and then connect to the
-     pipe.  This should always succeed quickly if the reader's
-     listen_client thread is running.  Then signal write_ready.  */
+  /* If we're writing, wait for read_ready, connect to the pipe, and
+     signal write_ready.  */
   if (writer)
     {
+      NTSTATUS status;
+
       SetEvent (writer_opening);
+      if (!wait (read_ready))
+       {
+         ResetEvent (writer_opening);
+         goto err_close_writer_opening;
+       }
       while (1)
        {
-         if (!wait (read_ready))
-           {
-             ResetEvent (writer_opening);
-             res = error_errno_set;
-             goto out;
-           }
-         NTSTATUS status = open_pipe (get_handle ());
+         status = open_pipe (get_handle ());
          if (NT_SUCCESS (status))
+           goto writer_success;
+         else if (status == STATUS_OBJECT_NAME_NOT_FOUND)
            {
-             set_pipe_non_blocking (get_handle (), flags & O_NONBLOCK);
-             SetEvent (write_ready);
-             res = success;
-             goto out;
+             /* The pipe hasn't been created yet. */
+             yield ();
+             continue;
            }
          else if (STATUS_PIPE_NO_INSTANCE_AVAILABLE (status))
-           Sleep (1);
+           break;
          else
            {
              debug_printf ("create of writer failed");
              __seterrno_from_nt_status (status);
-             res = error_errno_set;
              ResetEvent (writer_opening);
-             goto out;
+             goto err_close_writer_opening;
            }
        }
-    }
-out:
-  if (res == error_set_errno)
-    __seterrno ();
-  if (res != success)
-    {
-      if (read_ready)
-       {
-         NtClose (read_ready);
-         read_ready = NULL;
-       }
-      if (write_ready)
-       {
-         NtClose (write_ready);
-         write_ready = NULL;
-       }
-      if (writer_opening)
+
+      /* We should get here only if the system is heavily loaded
+        and/or many writers are trying to connect simultaneously */
+      while (1)
        {
-         NtClose (writer_opening);
-         writer_opening = NULL;
+         SetEvent (writer_opening);
+         if (!wait (read_ready))
+           {
+             ResetEvent (writer_opening);
+             goto err_close_writer_opening;
+           }
+         status = wait_open_pipe (get_handle ());
+         if (NT_SUCCESS (status))
+           goto writer_success;
+         else if (status == STATUS_IO_TIMEOUT)
+           continue;
+         else
+           {
+             debug_printf ("create of writer failed");
+             __seterrno_from_nt_status (status);
+             ResetEvent (writer_opening);
+             goto err_close_writer_opening;
+           }
        }
-      if (get_handle ())
-       NtClose (get_handle ());
-      if (listen_client_thr)
-       stop_listen_client ();
     }
-  debug_printf ("res %d", res);
-  return res == success;
+writer_success:
+  set_pipe_non_blocking (get_handle (), flags & O_NONBLOCK);
+  SetEvent (write_ready);
+success:
+  return 1;
+err_close_reader:
+  saved_errno = get_errno ();
+  close ();
+  set_errno (saved_errno);
+  return 0;
+err_close_writer_opening:
+  NtClose (writer_opening);
+err_close_write_ready:
+  NtClose (write_ready);
+err_close_read_ready:
+  NtClose (read_ready);
+err:
+  if (get_handle ())
+    NtClose (get_handle ());
+  return 0;
 }
 
 off_t
@@ -938,6 +1000,11 @@ fhandler_fifo::close ()
      handler or another thread. */
   fifo_client_unlock ();
   stop_listen_client ();
+  if (reader)
+    /* FIXME: There could be several readers open because of
+       dup/fork/exec; we should only reset read_ready when the last
+       one closes. */
+    ResetEvent (read_ready);
   if (read_ready)
     NtClose (read_ready);
   if (write_ready)
-- 
2.21.0

Reply via email to