Ok, I'll include that in the next version.
Mind if I respin the series after you take a look over the rest of the patches?
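
For reference, here is a minimal standalone sketch of the branch order in the
snippet quoted below. It only models how retval is chosen and does not use
pywin32. The names connect_status, try_open_pipe and PipeBusy are hypothetical
and not part of the patch, and the socket branch returns 0 as a placeholder
for whatever check_connection_completion() would report.

```python
import errno


class PipeBusy(Exception):
    """Hypothetical stand-in for pywintypes.error with ERROR_PIPE_BUSY."""


def connect_status(has_socket, is_windows, retry_connect, try_open_pipe):
    """Return (retval, retry_connect) following the quoted branch order.

    try_open_pipe is a hypothetical callable that raises PipeBusy while the
    pipe is busy and OSError on any other failure.
    """
    if has_socket:
        # Socket path: placeholder for check_connection_completion().
        return 0, retry_connect
    if is_windows:
        if not retry_connect:
            # No retry requested, so the pipe is already connected.
            return 0, False
        try:
            try_open_pipe()
            return 0, False
        except PipeBusy:
            # Still busy: keep the retry flag set and try again later.
            return errno.EAGAIN, True
        except OSError:
            return errno.ENOENT, False
    # Assumption: a stream without a socket only exists on Windows.
    raise AssertionError("no socket on a non-Windows platform")
```

The explicit platform check makes the named-pipe case visible in the code
instead of relying on a bare else.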

> From: Guru Shetty [mailto:g...@ovn.org] 
> Sent: Tuesday, January 3, 2017 9:09 PM
> To: Alin Balutoiu <abalut...@cloudbasesolutions.com>
> Cc: d...@openvswitch.org
> Subject: Re: [ovs-dev] [PATCH V3 2/5] Python tests: Ported UNIX sockets to 
> Windows
> 
> 
> 
> On 3 January 2017 at 11:03, Alin Balutoiu
> <abalut...@cloudbasesolutions.com> wrote:
> Thanks for the comment.
> The socket attribute of the class Stream cannot be None if the code runs on 
> Unix.
> Therefore the condition "self.socket is not None" is False only if sockets are
> not used and named pipes are used instead (i.e. it runs on Windows).
> 
> However, if you prefer I can replace it with the following code:
>
> I like it better with the below piece. So please go ahead and send it as part
> of the next version.
>  
>     if self.socket is not None:
>         retval = ovs.socket_util.check_connection_completion(self.socket)
>         assert retval != errno.EINPROGRESS
>     elif sys.platform == 'win32':
>         if self.retry_connect:
>             try:
>                 self.pipe = winutils.create_file(self._pipename)
>                 self._retry_connect = False
>                 retval = 0
>             except pywintypes.error as e:
>                 if e.winerror == winutils.winerror.ERROR_PIPE_BUSY:
>                     retval = errno.EAGAIN
>                 else:
>                     self._retry_connect = False
>                     retval = errno.ENOENT
>         else:
>             # If retry_connect is false, it means it's already
>             # connected so we can set the value of retval to 0
>             retval = 0
> 
> 
> > From: Guru Shetty [mailto:g...@ovn.org]
> > Sent: Tuesday, January 3, 2017 8:17 PM
> > To: Alin Balutoiu <abalut...@cloudbasesolutions.com>
> > Cc: d...@openvswitch.org
> > Subject: Re: [ovs-dev] [PATCH V3 2/5] Python tests: Ported UNIX sockets to 
> > Windows
> >
> >
> >
> > On 3 January 2017 at 08:46, Alin Balutoiu
> > <abalut...@cloudbasesolutions.com> wrote:
> > From: Alin Balutoiu <abalut...@cloudbasesolutions.com>
> >
> > Unix sockets (AF_UNIX) are not supported on Windows.
> > The replacement of Unix sockets on Windows is implemented
> > using named pipes, which mimic the behaviour
> > of Unix sockets.
> >
> > Instead of using Unix sockets to communicate
> > between components, named pipes are used. This
> > makes the Python sockets compatible with the
> > named pipes used in Windows applications.
> >
> > Signed-off-by: Paul-Daniel Boca <pb...@cloudbasesolutions.com>
> > Signed-off-by: Alin Balutoiu <abalut...@cloudbasesolutions.com>
> > Acked-by: Alin Gabriel Serdean <aserdean@cloudbasesolutions>
> > Tested-by: Alin Gabriel Serdean <aserdean@cloudbasesolutions>
> > ---
> > V2: No changes.
> > V3: Changed Signed-off-by name and added previous Acked-by's, Tested-by's.
> >
> > I intend to add the following diff:
> >
> > diff --git a/python/ovs/stream.py b/python/ovs/stream.py
> > index e8e5700..af35afd 100644
> > --- a/python/ovs/stream.py
> > +++ b/python/ovs/stream.py
> > @@ -251,7 +251,7 @@ class Stream(object):
> >                  else:
> >                      self._retry_connect = False
> >                      retval = errno.ENOENT
> > -        else:
> > +        elif sys.platform == 'win32':
> >              # Windows only, if retry_connect is false, it means it's already
> >              # connected so we can set the value of retval to 0
> >              retval = 0
> >
> >
> > ---
> >  python/ovs/jsonrpc.py        |   6 +
> >  python/ovs/poller.py         |  72 ++++++---
> >  python/ovs/socket_util.py    |  31 +++-
> >  python/ovs/stream.py         | 351 ++++++++++++++++++++++++++++++++++++++++---
> >  python/ovs/unixctl/server.py |   4 +
> >  tests/test-jsonrpc.py        |  16 +-
> >  6 files changed, 436 insertions(+), 44 deletions(-)
> >
> > diff --git a/python/ovs/jsonrpc.py b/python/ovs/jsonrpc.py
> > index 6300c67..5a11500 100644
> > --- a/python/ovs/jsonrpc.py
> > +++ b/python/ovs/jsonrpc.py
> > @@ -14,6 +14,7 @@
> >
> >  import errno
> >  import os
> > +import sys
> >
> >  import six
> >
> > @@ -274,6 +275,11 @@ class Connection(object):
> >                      except UnicodeError:
> >                          error = errno.EILSEQ
> >                  if error:
> > +                    if (sys.platform == "win32" and
> > +                            error == errno.WSAEWOULDBLOCK):
> > +                        # WSAEWOULDBLOCK would be the equivalent on Windows
> > +                        # for EAGAIN on Unix.
> > +                        error = errno.EAGAIN
> >                      if error == errno.EAGAIN:
> >                          return error, None
> >                      else:
> > diff --git a/python/ovs/poller.py b/python/ovs/poller.py
> > index d7cb7d3..d836483 100644
> > --- a/python/ovs/poller.py
> > +++ b/python/ovs/poller.py
> > @@ -18,6 +18,10 @@ import ovs.vlog
> >  import select
> >  import socket
> >  import os
> > +import sys
> > +
> > +if sys.platform == "win32":
> > +    import ovs.winutils as winutils
> >
> >  try:
> >      from OpenSSL import SSL
> > @@ -62,7 +66,9 @@ class _SelectSelect(object):
> >          if SSL and isinstance(fd, SSL.Connection):
> >              fd = fd.fileno()
> >
> > -        assert isinstance(fd, int)
> > +        if sys.platform != 'win32':
> > +            # Skip this on Windows, it also register events
> > +            assert isinstance(fd, int)
> >          if events & POLLIN:
> >              self.rlist.append(fd)
> >              events &= ~POLLIN
> > @@ -73,28 +79,58 @@ class _SelectSelect(object):
> >              self.xlist.append(fd)
> >
> >      def poll(self, timeout):
> > -        if timeout == -1:
> > -            # epoll uses -1 for infinite timeout, select uses None.
> > -            timeout = None
> > -        else:
> > -            timeout = float(timeout) / 1000
> >          # XXX workaround a bug in eventlet
> >          # see https://github.com/eventlet/eventlet/pull/25
> >          if timeout == 0 and _using_eventlet_green_select():
> >              timeout = 0.1
> > +        if sys.platform == 'win32':
> > +            events = self.rlist + self.wlist + self.xlist
> > +            if not events:
> > +                return []
> > +            if len(events) > winutils.win32event.MAXIMUM_WAIT_OBJECTS:
> > +                raise WindowsError("Cannot handle more than maximum wait "
> > +                                   "objects\n")
> > +
> > +            # win32event.INFINITE timeout is -1
> > +            # timeout must be an int number, expressed in ms
> > +            if timeout == 0.1:
> > +                timeout = 100
> > +            else:
> > +                timeout = int(timeout)
> > +
> > +            # Wait until any of the events is set to signaled
> > +            try:
> > +                retval = winutils.win32event.WaitForMultipleObjects(
> > +                    events,
> > +                    False,  # Wait all
> > +                    timeout)
> > +            except winutils.pywintypes.error:
> > +                return [(0, POLLERR)]
> >
> > -        rlist, wlist, xlist = select.select(self.rlist, self.wlist, self.xlist,
> > -                                            timeout)
> > -        events_dict = {}
> > -        for fd in rlist:
> > -            events_dict[fd] = events_dict.get(fd, 0) | POLLIN
> > -        for fd in wlist:
> > -            events_dict[fd] = events_dict.get(fd, 0) | POLLOUT
> > -        for fd in xlist:
> > -            events_dict[fd] = events_dict.get(fd, 0) | (POLLERR |
> > -                                                        POLLHUP |
> > -                                                        POLLNVAL)
> > -        return list(events_dict.items())
> > +            if retval == winutils.winerror.WAIT_TIMEOUT:
> > +                return []
> > +
> > +            return [(events[retval], 0)]
> > +        else:
> > +            if timeout == -1:
> > +                # epoll uses -1 for infinite timeout, select uses None.
> > +                timeout = None
> > +            else:
> > +                timeout = float(timeout) / 1000
> > +            rlist, wlist, xlist = select.select(self.rlist,
> > +                                                self.wlist,
> > +                                                self.xlist,
> > +                                                timeout)
> > +            events_dict = {}
> > +            for fd in rlist:
> > +                events_dict[fd] = events_dict.get(fd, 0) | POLLIN
> > +            for fd in wlist:
> > +                events_dict[fd] = events_dict.get(fd, 0) | POLLOUT
> > +            for fd in xlist:
> > +                events_dict[fd] = events_dict.get(fd, 0) | (POLLERR |
> > +                                                            POLLHUP |
> > +                                                            POLLNVAL)
> > +            return list(events_dict.items())
> >
> >
> >  SelectPoll = _SelectSelect
> > diff --git a/python/ovs/socket_util.py b/python/ovs/socket_util.py
> > index b358b05..fb6cee4 100644
> > --- a/python/ovs/socket_util.py
> > +++ b/python/ovs/socket_util.py
> > @@ -17,6 +17,7 @@ import os
> >  import os.path
> >  import random
> >  import socket
> > +import sys
> >
> >  import six
> >  from six.moves import range
> > @@ -25,6 +26,10 @@ import ovs.fatal_signal
> >  import ovs.poller
> >  import ovs.vlog
> >
> > +if sys.platform == 'win32':
> > +    import ovs.winutils as winutils
> > +    import win32file
> > +
> >  vlog = ovs.vlog.Vlog("socket_util")
> >
> >
> > @@ -158,7 +163,17 @@ def make_unix_socket(style, nonblock, bind_path, connect_path, short=False):
> >
> >  def check_connection_completion(sock):
> >      p = ovs.poller.SelectPoll()
> > -    p.register(sock, ovs.poller.POLLOUT)
> > +    if sys.platform == "win32":
> > +        event = winutils.get_new_event(None, False, True, None)
> > +        # Receive notification of readiness for writing, of completed
> > +        # connection or multipoint join operation, and of socket closure.
> > +        win32file.WSAEventSelect(sock, event,
> > +                                 win32file.FD_WRITE |
> > +                                 win32file.FD_CONNECT |
> > +                                 win32file.FD_CLOSE)
> > +        p.register(event, ovs.poller.POLLOUT)
> > +    else:
> > +        p.register(sock, ovs.poller.POLLOUT)
> >      pfds = p.poll(0)
> >      if len(pfds) == 1:
> >          revents = pfds[0][1]
> > @@ -228,7 +243,12 @@ def inet_open_active(style, target, default_port, dscp):
> >          try:
> >              sock.connect(address)
> >          except socket.error as e:
> > -            if get_exception_errno(e) != errno.EINPROGRESS:
> > +            error = get_exception_errno(e)
> > +            if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK:
> > +                # WSAEWOULDBLOCK would be the equivalent on Windows
> > +                # for EINPROGRESS on Unix.
> > +                error = errno.EINPROGRESS
> > +            if error != errno.EINPROGRESS:
> >                  raise
> >          return 0, sock
> >      except socket.error as e:
> > @@ -257,9 +277,12 @@ def get_null_fd():
> >      global null_fd
> >      if null_fd < 0:
> >          try:
> > -            null_fd = os.open("/dev/null", os.O_RDWR)
> > +            # os.devnull ensures compatibility with Windows, returns
> > +            # '/dev/null' for Unix and 'nul' for Windows
> > +            null_fd = os.open(os.devnull, os.O_RDWR)
> >          except OSError as e:
> > -            vlog.err("could not open /dev/null: %s" % os.strerror(e.errno))
> > +            vlog.err("could not open %s: %s" % (os.devnull,
> > +                                                os.strerror(e.errno)))
> >              return -e.errno
> >      return null_fd
> >
> > diff --git a/python/ovs/stream.py b/python/ovs/stream.py
> > index cd57eb3..e8e5700 100644
> > --- a/python/ovs/stream.py
> > +++ b/python/ovs/stream.py
> > @@ -15,6 +15,7 @@
> >  import errno
> >  import os
> >  import socket
> > +import sys
> >
> >  import six
> >
> > @@ -27,6 +28,13 @@ try:
> >  except ImportError:
> >      SSL = None
> >
> > +if sys.platform == 'win32':
> > +    import ovs.winutils as winutils
> > +    import pywintypes
> > +    import win32event
> > +    import win32file
> > +    import win32pipe
> > +
> >  vlog = ovs.vlog.Vlog("stream")
> >
> >
> > @@ -63,6 +71,13 @@ class Stream(object):
> >      _SSL_certificate_file = None
> >      _SSL_ca_cert_file = None
> >
> > +    # Windows only
> > +    _write = None                # overlapped for write operation
> > +    _read = None                 # overlapped for read operation
> > +    _write_pending = False
> > +    _read_pending = False
> > +    _retry_connect = False
> > +
> >      @staticmethod
> >      def register_method(method, cls):
> >          Stream._SOCKET_METHODS[method + ":"] = cls
> > @@ -81,8 +96,23 @@ class Stream(object):
> >          otherwise False."""
> >          return bool(Stream._find_method(name))
> >
> > -    def __init__(self, socket, name, status):
> > +    def __init__(self, socket, name, status, pipe=None, is_server=False):
> >          self.socket = socket
> > +        self.pipe = pipe
> > +        if sys.platform == 'win32':
> > +            self._read = pywintypes.OVERLAPPED()
> > +            self._read.hEvent = winutils.get_new_event()
> > +            self._write = pywintypes.OVERLAPPED()
> > +            self._write.hEvent = winutils.get_new_event()
> > +            if pipe is not None:
> > +                # Flag to check if fd is a server HANDLE.  In the case of a
> > +                # server handle we have to issue a disconnect before closing
> > +                # the actual handle.
> > +                self._server = is_server
> > +                suffix = name.split(":", 1)[1]
> > +                suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
> > +                self._pipename = winutils.get_pipe_name(suffix)
> > +
> >          self.name = name
> >          if status == errno.EAGAIN:
> >              self.state = Stream.__S_CONNECTING
> > @@ -120,6 +150,38 @@ class Stream(object):
> >          suffix = name.split(":", 1)[1]
> >          if name.startswith("unix:"):
> >              suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
> > +            if sys.platform == 'win32':
> > +                pipename = winutils.get_pipe_name(suffix)
> > +
> > +                if len(suffix) > 255:
> > +                    # Return invalid argument if the name is too long
> > +                    return errno.ENOENT, None
> > +
> > +                try:
> > +                    # In case of "unix:" argument, the assumption is that
> > +                    # there is a file created in the path (suffix).
> > +                    open(suffix, 'r').close()
> > +                except:
> > +                    return errno.ENOENT, None
> > +
> > +                try:
> > +                    npipe = winutils.create_file(pipename)
> > +                    try:
> > +                        winutils.set_pipe_mode(npipe,
> > +                                               win32pipe.PIPE_READMODE_BYTE)
> > +                    except pywintypes.error as e:
> > +                        return errno.ENOENT, None
> > +                except pywintypes.error as e:
> > +                    if e.winerror == winutils.winerror.ERROR_PIPE_BUSY:
> > +                        # Pipe is busy, set the retry flag to true and retry
> > +                        # again during the connect function.
> > +                        Stream.retry_connect = True
> > +                        return 0, cls(None, name, errno.EAGAIN,
> > +                                      pipe=win32file.INVALID_HANDLE_VALUE,
> > +                                      is_server=False)
> > +                    return errno.ENOENT, None
> > +                return 0, cls(None, name, 0, pipe=npipe, is_server=False)
> > +
> >          error, sock = cls._open(suffix, dscp)
> >          if error:
> >              return error, None
> > @@ -145,6 +207,10 @@ class Stream(object):
> >          if not error:
> >              while True:
> >                  error = stream.connect()
> > +                if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK:
> > +                    # WSAEWOULDBLOCK would be the equivalent on Windows
> > +                    # for EAGAIN on Unix.
> > +                    error = errno.EAGAIN
> >                  if error != errno.EAGAIN:
> >                      break
> >                  stream.run()
> > @@ -152,7 +218,8 @@ class Stream(object):
> >                  stream.run_wait(poller)
> >                  stream.connect_wait(poller)
> >                  poller.block()
> > -            assert error != errno.EINPROGRESS
> > +            if stream.socket is not None:
> > +                assert error != errno.EINPROGRESS
> >
> >          if error and stream:
> >              stream.close()
> > @@ -160,11 +227,35 @@ class Stream(object):
> >          return error, stream
> >
> >      def close(self):
> > -        self.socket.close()
> > +        if self.socket is not None:
> > +            self.socket.close()
> > +        if self.pipe is not None:
> > +            if self._server:
> > +                win32pipe.DisconnectNamedPipe(self.pipe)
> > +            winutils.close_handle(self.pipe, vlog.warn)
> > +            winutils.close_handle(self._read.hEvent, vlog.warn)
> > +            winutils.close_handle(self._write.hEvent, vlog.warn)
> >
> >      def __scs_connecting(self):
> > -        retval = ovs.socket_util.check_connection_completion(self.socket)
> > -        assert retval != errno.EINPROGRESS
> > +        if self.socket is not None:
> > +            retval = ovs.socket_util.check_connection_completion(self.socket)
> > +            assert retval != errno.EINPROGRESS
> > +        elif sys.platform == 'win32' and self.retry_connect:
> > +            try:
> > +                self.pipe = winutils.create_file(self._pipename)
> > +                self._retry_connect = False
> > +                retval = 0
> > +            except pywintypes.error as e:
> > +                if e.winerror == winutils.winerror.ERROR_PIPE_BUSY:
> > +                    retval = errno.EAGAIN
> > +                else:
> > +                    self._retry_connect = False
> > +                    retval = errno.ENOENT
> > +        else:
> > +            # Windows only, if retry_connect is false, it means it's already
> > +            # connected so we can set the value of retval to 0
> > +            retval = 0
> > +
> >          if retval == 0:
> >              self.state = Stream.__S_CONNECTED
> >          elif retval != errno.EAGAIN:
> > @@ -209,11 +300,63 @@ class Stream(object):
> >          elif n == 0:
> >              return (0, "")
> >
> > +        if sys.platform == 'win32' and self.socket is None:
> > +            return self.__recv_windows(n)
> > +
> >          try:
> >              return (0, self.socket.recv(n))
> >          except socket.error as e:
> >              return (ovs.socket_util.get_exception_errno(e), "")
> >
> > +    def __recv_windows(self, n):
> > +        if self._read_pending:
> > +            try:
> > +                nBytesRead = winutils.get_overlapped_result(self.pipe,
> > +                                                            self._read,
> > +                                                            False)
> > +                self._read_pending = False
> > +                recvBuffer = self._read_buffer[:nBytesRead]
> > +
> > +                return (0, winutils.get_decoded_buffer(recvBuffer))
> > +            except pywintypes.error as e:
> > +                if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE:
> > +                    # The operation is still pending, try again
> > +                    self._read_pending = True
> > +                    return (errno.EAGAIN, "")
> > +                elif e.winerror in winutils.pipe_disconnected_errors:
> > +                    # If the pipe was disconnected, return 0.
> > +                    return (0, "")
> > +                else:
> > +                    return (errno.EINVAL, "")
> > +
> > +        (errCode, self._read_buffer) = winutils.read_file(self.pipe,
> > +                                                          n,
> > +                                                          self._read)
> > +        if errCode:
> > +            if errCode == winutils.winerror.ERROR_IO_PENDING:
> > +                self._read_pending = True
> > +                return (errno.EAGAIN, "")
> > +            elif errCode in winutils.pipe_disconnected_errors:
> > +                # If the pipe was disconnected, return 0.
> > +                return (0, "")
> > +            else:
> > +                return (errCode, "")
> > +
> > +        try:
> > +            nBytesRead = winutils.get_overlapped_result(self.pipe,
> > +                                                        self._read,
> > +                                                        False)
> > +            winutils.win32event.SetEvent(self._read.hEvent)
> > +        except pywintypes.error as e:
> > +            if e.winerror in winutils.pipe_disconnected_errors:
> > +                # If the pipe was disconnected, return 0.
> > +                return (0, "")
> > +            else:
> > +                return (e.winerror, "")
> > +
> > +        recvBuffer = self._read_buffer[:nBytesRead]
> > +        return (0, winutils.get_decoded_buffer(recvBuffer))
> > +
> >      def send(self, buf):
> >          """Tries to send 'buf' on this stream.
> >
> > @@ -231,6 +374,9 @@ class Stream(object):
> >          elif len(buf) == 0:
> >              return 0
> >
> > +        if sys.platform == 'win32' and self.socket is None:
> > +            return self.__send_windows(buf)
> > +
> >          try:
> >              # Python 3 has separate types for strings and bytes.  We must have
> >              # bytes here.
> > @@ -240,6 +386,40 @@ class Stream(object):
> >          except socket.error as e:
> >              return -ovs.socket_util.get_exception_errno(e)
> >
> > +    def __send_windows(self, buf):
> > +        if self._write_pending:
> > +            try:
> > +                nBytesWritten = winutils.get_overlapped_result(self.pipe,
> > +                                                               self._write,
> > +                                                               False)
> > +                self._write_pending = False
> > +                return nBytesWritten
> > +            except pywintypes.error as e:
> > +                if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE:
> > +                    # The operation is still pending, try again
> > +                    self._write_pending = True
> > +                    return -errno.EAGAIN
> > +                elif e.winerror in winutils.pipe_disconnected_errors:
> > +                    # If the pipe was disconnected, return connection reset.
> > +                    return -errno.ECONNRESET
> > +                else:
> > +                    return -errno.EINVAL
> > +
> > +        buf = winutils.get_encoded_buffer(buf)
> > +        self._write_pending = False
> > +        (errCode, nBytesWritten) = winutils.write_file(self.pipe,
> > +                                                       buf,
> > +                                                       self._write)
> > +        if errCode:
> > +            if errCode == winutils.winerror.ERROR_IO_PENDING:
> > +                self._write_pending = True
> > +                return -errno.EAGAIN
> > +            if (not nBytesWritten and
> > +                    errCode in winutils.pipe_disconnected_errors):
> > +                # If the pipe was disconnected, return connection reset.
> > +                return -errno.ECONNRESET
> > +        return nBytesWritten
> > +
> >      def run(self):
> >          pass
> >
> > @@ -255,11 +435,52 @@ class Stream(object):
> >
> >          if self.state == Stream.__S_CONNECTING:
> >              wait = Stream.W_CONNECT
> > +
> > +        if sys.platform == 'win32':
> > +            self.__wait_windows(poller, wait)
> > +            return
> > +
> >          if wait == Stream.W_RECV:
> >              poller.fd_wait(self.socket, ovs.poller.POLLIN)
> >          else:
> >              poller.fd_wait(self.socket, ovs.poller.POLLOUT)
> >
> > +    def __wait_windows(self, poller, wait):
> > +        if self.socket is not None:
> > +            if wait == Stream.W_RECV:
> > +                read_flags = (win32file.FD_READ |
> > +                              win32file.FD_ACCEPT |
> > +                              win32file.FD_CLOSE)
> > +                try:
> > +                    win32file.WSAEventSelect(self.socket,
> > +                                             self._read.hEvent,
> > +                                             read_flags)
> > +                except pywintypes.error as e:
> > +                    vlog.err("failed to associate events with socket: %s"
> > +                             % e.strerror)
> > +                poller.fd_wait(self._read.hEvent, ovs.poller.POLLIN)
> > +            else:
> > +                write_flags = (win32file.FD_WRITE |
> > +                               win32file.FD_CONNECT |
> > +                               win32file.FD_CLOSE)
> > +                try:
> > +                    win32file.WSAEventSelect(self.socket,
> > +                                             self._write.hEvent,
> > +                                             write_flags)
> > +                except pywintypes.error as e:
> > +                    vlog.err("failed to associate events with socket: %s"
> > +                             % e.strerror)
> > +                poller.fd_wait(self._write.hEvent, ovs.poller.POLLOUT)
> > +        else:
> > +            if wait == Stream.W_RECV:
> > +                if self._read:
> > +                    poller.fd_wait(self._read.hEvent, ovs.poller.POLLIN)
> > +            elif wait == Stream.W_SEND:
> > +                if self._write:
> > +                    poller.fd_wait(self._write.hEvent, ovs.poller.POLLOUT)
> > +            elif wait == Stream.W_CONNECT:
> > +                return
> > +
> >      def connect_wait(self, poller):
> >          self.wait(poller, Stream.W_CONNECT)
> >
> > @@ -267,11 +488,22 @@ class Stream(object):
> >          self.wait(poller, Stream.W_RECV)
> >
> >      def send_wait(self, poller):
> > +        if sys.platform == 'win32':
> > +            poller.fd_wait(self.connect.hEvent, ovs.poller.POLLIN)
> >          self.wait(poller, Stream.W_SEND)
> >
> >      def __del__(self):
> >          # Don't delete the file: we might have forked.
> > -        self.socket.close()
> > +        if self.socket is not None:
> > +            self.socket.close()
> > +        if self.pipe is not None:
> > +            # Check if there are any remaining valid handles and close them
> > +            if self.pipe:
> > +                winutils.close_handle(self.pipe)
> > +            if self._read.hEvent:
> > +                winutils.close_handle(self._read.hEvent)
> > +            if self._write.hEvent:
> > +                winutils.close_handle(self._write.hEvent)
> >
> >      @staticmethod
> >      def ssl_set_private_key_file(file_name):
> > @@ -287,6 +519,10 @@ class Stream(object):
> >
> >
> >  class PassiveStream(object):
> > +    # Windows only
> > +    connect = None                  # overlapped for read operation
> > +    connect_pending = False
> > +
> >      @staticmethod
> >      def is_valid_name(name):
> >          """Returns True if 'name' is a passive stream name in the form
> > @@ -294,9 +530,18 @@ class PassiveStream(object):
> >          "punix:" or "ptcp"), otherwise False."""
> >          return name.startswith("punix:") | name.startswith("ptcp:")
> >
> > -    def __init__(self, sock, name, bind_path):
> > +    def __init__(self, sock, name, bind_path, pipe=None):
> >          self.name = name
> > +        self.pipe = pipe
> >          self.socket = sock
> > +        if pipe is not None:
> > +            self.connect = pywintypes.OVERLAPPED()
> > +            self.connect.hEvent = winutils.get_new_event(bManualReset=True)
> > +            self.connect_pending = False
> > +            suffix = name.split(":", 1)[1]
> > +            suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
> > +            self._pipename = winutils.get_pipe_name(suffix)
> > +
> >          self.bind_path = bind_path
> >
> >      @staticmethod
> > @@ -315,11 +560,27 @@ class PassiveStream(object):
> >          bind_path = name[6:]
> >          if name.startswith("punix:"):
> >              bind_path = ovs.util.abs_file_name(ovs.dirs.RUNDIR, bind_path)
> > -            error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
> > -                                                           True, bind_path,
> > -                                                           None)
> > -            if error:
> > -                return error, None
> > +            if sys.platform != 'win32':
> > +                error, sock = ovs.socket_util.make_unix_socket(
> > +                    socket.SOCK_STREAM, True, bind_path, None)
> > +                if error:
> > +                    return error, None
> > +            else:
> > +                # Branch used only on Windows
> > +                try:
> > +                    open(bind_path, 'w').close()
> > +                except:
> > +                    return errno.ENOENT, None
> > +
> > +                pipename = winutils.get_pipe_name(bind_path)
> > +                if len(pipename) > 255:
> > +                    # Return invalid argument if the name is too long
> > +                    return errno.ENOENT, None
> > +
> > +                npipe = winutils.create_named_pipe(pipename)
> > +                if not npipe:
> > +                    return errno.ENOENT, None
> > +                return 0, PassiveStream(None, name, bind_path, pipe=npipe)
> >
> >          elif name.startswith("ptcp:"):
> >              sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
> > @@ -341,7 +602,11 @@ class PassiveStream(object):
> >
> >      def close(self):
> >          """Closes this PassiveStream."""
> > -        self.socket.close()
> > +        if self.socket is not None:
> > +            self.socket.close()
> > +        if self.pipe is not None:
> > +            winutils.close_handle(self.pipe, vlog.warn)
> > +            winutils.close_handle(self.connect.hEvent, vlog.warn)
> >          if self.bind_path is not None:
> >              ovs.fatal_signal.unlink_file_now(self.bind_path)
> >              self.bind_path = None
> > @@ -354,28 +619,80 @@ class PassiveStream(object):
> >
> >          Will not block waiting for a connection.  If no connection is ready to
> >          be accepted, returns (errno.EAGAIN, None) immediately."""
> > -
> > +        if sys.platform == 'win32' and self.socket is None:
> > +            return self.__accept_windows()
> >          while True:
> >              try:
> >                  sock, addr = self.socket.accept()
> >                  ovs.socket_util.set_nonblocking(sock)
> > -                if (sock.family == socket.AF_UNIX):
> > +                if (sys.platform != 'win32' and sock.family == socket.AF_UNIX):
> >                      return 0, Stream(sock, "unix:%s" % addr, 0)
> >                  return 0, Stream(sock, 'ptcp:%s:%s' % (addr[0],
> >                                                         str(addr[1])), 0)
> >              except socket.error as e:
> >                  error = ovs.socket_util.get_exception_errno(e)
> > +                if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK:
> > +                    # WSAEWOULDBLOCK would be the equivalent on Windows
> > +                    # for EAGAIN on Unix.
> > +                    error = errno.EAGAIN
> >                  if error != errno.EAGAIN:
> >                      # XXX rate-limit
> >                      vlog.dbg("accept: %s" % os.strerror(error))
> >                  return error, None
> >
> > +    def __accept_windows(self):
> > +        if self.connect_pending:
> > +            try:
> > +                winutils.get_overlapped_result(self.pipe, self.connect, False)
> > +            except pywintypes.error as e:
> > +                if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE:
> > +                    # The operation is still pending, try again
> > +                    self.connect_pending = True
> > +                    return errno.EAGAIN, None
> > +                else:
> > +                    if self.pipe:
> > +                        win32pipe.DisconnectNamedPipe(self.pipe)
> > +                    return errno.EINVAL, None
> > +            self.connect_pending = False
> > +
> > +        error = winutils.connect_named_pipe(self.pipe, self.connect)
> > +        if error:
> > +            if error == winutils.winerror.ERROR_IO_PENDING:
> > +                self.connect_pending = True
> > +                return errno.EAGAIN, None
> > +            elif error != winutils.winerror.ERROR_PIPE_CONNECTED:
> > +                if self.pipe:
> > +                    win32pipe.DisconnectNamedPipe(self.pipe)
> > +                self.connect_pending = False
> > +                return errno.EINVAL, None
> > +            else:
> > +                win32event.SetEvent(self.connect.hEvent)
> > +
> > +        npipe = winutils.create_named_pipe(self._pipename)
> > +        if not npipe:
> > +            return errno.ENOENT, None
> > +
> > +        old_pipe = self.pipe
> > +        self.pipe = npipe
> > +        winutils.win32event.ResetEvent(self.connect.hEvent)
> > +        return 0, Stream(None, self.name, 0, pipe=old_pipe)
> > +
> >      def wait(self, poller):
> > -        poller.fd_wait(self.socket, ovs.poller.POLLIN)
> > +        if sys.platform != 'win32' or self.socket is not None:
> > +            poller.fd_wait(self.socket, ovs.poller.POLLIN)
> > +        else:
> > +            poller.fd_wait(self.connect.hEvent, ovs.poller.POLLIN)
> >
> >      def __del__(self):
> >          # Don't delete the file: we might have forked.
> > -        self.socket.close()
> > +        if self.socket is not None:
> > +            self.socket.close()
> > +        if self.pipe is not None:
> > +            # Check if there are any remaining valid handles and close them
> > +            if self.pipe:
> > +                winutils.close_handle(self.pipe)
> > +            if self._connect.hEvent:
> > +                winutils.close_handle(self._read.hEvent)
> >
> >
> >  def usage(name):
> > diff --git a/python/ovs/unixctl/server.py b/python/ovs/unixctl/server.py
> > index 8595ed8..3f3e051 100644
> > --- a/python/ovs/unixctl/server.py
> > +++ b/python/ovs/unixctl/server.py
> > @@ -148,6 +148,10 @@ class UnixctlServer(object):
> >      def run(self):
> >          for _ in range(10):
> >              error, stream = self._listener.accept()
> > +            if sys.platform == "win32" and error == errno.WSAEWOULDBLOCK:
> > +                # WSAEWOULDBLOCK would be the equivalent on Windows
> > +                # for EAGAIN on Unix.
> > +                error = errno.EAGAIN
> >              if not error:
> >                  rpc = ovs.jsonrpc.Connection(stream)
> >                  self._conns.append(UnixctlConnection(rpc))
> > diff --git a/tests/test-jsonrpc.py b/tests/test-jsonrpc.py
> > index 18634e6..3eabcd7 100644
> > --- a/tests/test-jsonrpc.py
> > +++ b/tests/test-jsonrpc.py
> > @@ -53,11 +53,17 @@ def handle_rpc(rpc, msg):
> >
> >
> >  def do_listen(name):
> > -    error, pstream = ovs.stream.PassiveStream.open(name)
> > -    if error:
> > -        sys.stderr.write("could not listen on \"%s\": %s\n"
> > -                         % (name, os.strerror(error)))
> > -        sys.exit(1)
> > +    if sys.platform != 'win32' or (
> > +            ovs.daemon._detach and ovs.daemon._detached):
> > +        # On Windows the child is a new process created which should be the
> > +        # one that creates the PassiveStream. Without this check, the new
> > +        # child process will create a new PassiveStream overwriting the one
> > +        # that the parent process created.
> > +        error, pstream = ovs.stream.PassiveStream.open(name)
> > +        if error:
> > +            sys.stderr.write("could not listen on \"%s\": %s\n"
> > +                             % (name, os.strerror(error)))
> > +            sys.exit(1)
> >
> >      ovs.daemon.daemonize()
> >
> > --
> > 2.10.0.windows.1
> > _______________________________________________
> > dev mailing list
> > d...@openvswitch.org
> > https://mail.openvswitch.org/mailman/listinfo/ovs-dev