Thanks for the review. I will send the patches soon.

> From: Guru Shetty [mailto:g...@ovn.org] 
> Sent: Tuesday, January 3, 2017 9:35 PM
> To: Alin Balutoiu <abalut...@cloudbasesolutions.com>
> Cc: d...@openvswitch.org
> Subject: Re: [ovs-dev] [PATCH V3 2/5] Python tests: Ported UNIX sockets to 
> Windows
> 
> 
> 
> On 3 January 2017 at 11:16, Alin Balutoiu 
> <mailto:abalut...@cloudbasesolutions.com> wrote:
> Ok, I'll include that in the next version.
> Mind if I respin the series after you take a look over the rest of the 
> patches?
> 
> I am done looking at the rest of the patches. When you respin, please fix the 
> spelling of "intented" in the first patch of the series. Also, in the first 
> patch, edit (and add) AUTHORS file to include yourself. 
> 
> Thanks.
>  
> 
> > From: Guru Shetty [mailto:g...@ovn.org]
> > Sent: Tuesday, January 3, 2017 9:09 PM
> > To: Alin Balutoiu <abalut...@cloudbasesolutions.com>
> > Cc: d...@openvswitch.org
> > Subject: Re: [ovs-dev] [PATCH V3 2/5] Python tests: Ported UNIX sockets to 
> > Windows
> >
> >
> >
> > On 3 January 2017 at 11:03, Alin Balutoiu 
> > <mailto:abalut...@cloudbasesolutions.com> wrote:
> > Thanks for the comment.
> > The socket attribute of the class Stream cannot be None if the code runs on 
> > Unix.
> > Therefore the condition "self.socket is not None" is True only if sockets 
> > are not used,
> > and named pipes are used instead (i.e. it runs on Windows).
> >
> > However, if you prefer I can replace it with the following code:
> > I like it better with the below piece. So please go ahead and send it as 
> > part of the next version.
> >
> >     if self.socket is not None:
> >         retval = ovs.socket_util.check_connection_completion(self.socket)
> >         assert retval != errno.EINPROGRESS
> >     elif sys.platform == 'win32':
> >         if self.retry_connect:
> >             try:
> >                 self.pipe = winutils.create_file(self._pipename)
> >                 self._retry_connect = False
> >                 retval = 0
> >             except pywintypes.error as e:
> >                 if e.winerror == winutils.winerror.ERROR_PIPE_BUSY:
> >                     retval = errno.EAGAIN
> >                 else:
> >                     self._retry_connect = False
> >                     retval = errno.ENOENT
> >         else:
> >             # If retry_connect is false, it means it's already
> >             # connected so we can set the value of retval to 0
> >             retval = 0
> >
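The branch order in the snippet above can be checked in isolation. This is
only a minimal sketch: connecting_retval, try_connect and PipeBusy are
hypothetical names standing in for the Stream attributes and the winutils
calls, and the socket path is reduced to a plain "return 0".

```python
import errno


class PipeBusy(Exception):
    """Hypothetical stand-in for pywintypes.error with ERROR_PIPE_BUSY."""


def connecting_retval(has_socket, retry_connect, try_connect):
    """Return (retval, retry_connect) following the snippet's branch order.

    'try_connect' is a hypothetical callable standing in for
    winutils.create_file().  It raises PipeBusy when the pipe is busy and
    OSError for any other failure.
    """
    if has_socket:
        # Socket path: the real code asks check_connection_completion().
        return 0, retry_connect
    if not retry_connect:
        # Named pipe path, nothing left to retry: already connected.
        return 0, False
    try:
        try_connect()
        return 0, False
    except PipeBusy:
        # Busy pipe: keep the retry flag set and report EAGAIN.
        return errno.EAGAIN, True
    except OSError:
        # Any other failure: stop retrying and report ENOENT.
        return errno.ENOENT, False
```

With this shape, a busy pipe is the only case that leaves the retry flag
set, which matches the intent of the comment in the last branch.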
> >
> > > From: Guru Shetty [mailto:g...@ovn.org]
> > > Sent: Tuesday, January 3, 2017 8:17 PM
> > > To: Alin Balutoiu <abalut...@cloudbasesolutions.com>
> > > Cc: d...@openvswitch.org
> > > Subject: Re: [ovs-dev] [PATCH V3 2/5] Python tests: Ported UNIX sockets 
> > > to Windows
> > >
> > >
> > >
> > > On 3 January 2017 at 08:46, Alin Balutoiu 
> > > <mailto:abalut...@cloudbasesolutions.com> wrote:
> > > From: Alin Balutoiu <abalut...@cloudbasesolutions.com>
> > >
> > > Unix sockets (AF_UNIX) are not supported on Windows.
> > > On Windows they are replaced with named pipes, which
> > > are used to mimic the behaviour of Unix sockets.
> > >
> > > Named pipes are used instead of Unix sockets to
> > > communicate between components. This makes the
> > > Python sockets compatible with the named pipes
> > > used in Windows applications.
> > >
> > > Signed-off-by: Paul-Daniel Boca <pb...@cloudbasesolutions.com>
> > > Signed-off-by: Alin Balutoiu <abalut...@cloudbasesolutions.com>
> > > Acked-by: Alin Gabriel Serdean <aserdean@cloudbasesolutions>
> > > Tested-by: Alin Gabriel Serdean <aserdean@cloudbasesolutions>
> > > ---
> > > V2: No changes.
> > > V3: Changed Signed-off-by name and added previous Acked-by's, Tested-by's.
> > >
> > > I intend to add the following diff:
> > >
> > > diff --git a/python/ovs/stream.py b/python/ovs/stream.py
> > > index e8e5700..af35afd 100644
> > > --- a/python/ovs/stream.py
> > > +++ b/python/ovs/stream.py
> > > @@ -251,7 +251,7 @@ class Stream(object):
> > >                  else:
> > >                      self._retry_connect = False
> > >                      retval = errno.ENOENT
> > > -        else:
> > > +        elif sys.platform == 'win32':
> > >              # Windows only, if retry_connect is false, it means it's already
> > >              # connected so we can set the value of retval to 0
> > >              retval = 0
> > >
> > >
> > > ---
> > >  python/ovs/jsonrpc.py        |   6 +
> > >  python/ovs/poller.py         |  72 ++++++---
> > >  python/ovs/socket_util.py    |  31 +++-
> > >  python/ovs/stream.py         | 351 ++++++++++++++++++++++++++++++++++++++++---
> > >  python/ovs/unixctl/server.py |   4 +
> > >  tests/test-jsonrpc.py        |  16 +-
> > >  6 files changed, 436 insertions(+), 44 deletions(-)
> > >
> > > diff --git a/python/ovs/jsonrpc.py b/python/ovs/jsonrpc.py
> > > index 6300c67..5a11500 100644
> > > --- a/python/ovs/jsonrpc.py
> > > +++ b/python/ovs/jsonrpc.py
> > > @@ -14,6 +14,7 @@
> > >
> > >  import errno
> > >  import os
> > > +import sys
> > >
> > >  import six
> > >
> > > @@ -274,6 +275,11 @@ class Connection(object):
> > >                      except UnicodeError:
> > >                          error = errno.EILSEQ
> > >                  if error:
> > > +                    if (sys.platform == "win32" and
> > > +                            error == errno.WSAEWOULDBLOCK):
> > > +                        # WSAEWOULDBLOCK would be the equivalent on Windows
> > > +                        # for EAGAIN on Unix.
> > > +                        error = errno.EAGAIN
> > >                      if error == errno.EAGAIN:
> > >                          return error, None
> > >                      else:
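
The WSAEWOULDBLOCK translation shows up in several places in this patch.
One possible way to keep it in a single spot is sketched below. The helper
name normalize_errno is hypothetical and not part of the patch, and the
getattr() lookup is there only because errno.WSAEWOULDBLOCK is not defined
on non-Windows builds of Python.

```python
import errno

# errno.WSAEWOULDBLOCK only exists on Windows builds of Python, so look it
# up defensively instead of referencing it directly.
_WSAEWOULDBLOCK = getattr(errno, "WSAEWOULDBLOCK", None)


def normalize_errno(error, would_block=errno.EAGAIN):
    """Map WSAEWOULDBLOCK to 'would_block'; return other codes unchanged.

    'normalize_errno' is a hypothetical helper.  Callers that expect
    EINPROGRESS (as in inet_open_active()) would pass
    would_block=errno.EINPROGRESS.
    """
    if _WSAEWOULDBLOCK is not None and error == _WSAEWOULDBLOCK:
        return would_block
    return error
```

On Unix the helper is a no-op, so call sites would not need their own
sys.platform check.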
> > > diff --git a/python/ovs/poller.py b/python/ovs/poller.py
> > > index d7cb7d3..d836483 100644
> > > --- a/python/ovs/poller.py
> > > +++ b/python/ovs/poller.py
> > > @@ -18,6 +18,10 @@ import ovs.vlog
> > >  import select
> > >  import socket
> > >  import os
> > > +import sys
> > > +
> > > +if sys.platform == "win32":
> > > +    import ovs.winutils as winutils
> > >
> > >  try:
> > >      from OpenSSL import SSL
> > > @@ -62,7 +66,9 @@ class _SelectSelect(object):
> > >          if SSL and isinstance(fd, SSL.Connection):
> > >              fd = fd.fileno()
> > >
> > > -        assert isinstance(fd, int)
> > > +        if sys.platform != 'win32':
> > > +            # Skip this on Windows, where events are also registered
> > > +            assert isinstance(fd, int)
> > >          if events & POLLIN:
> > >              self.rlist.append(fd)
> > >              events &= ~POLLIN
> > > @@ -73,28 +79,58 @@ class _SelectSelect(object):
> > >              self.xlist.append(fd)
> > >
> > >      def poll(self, timeout):
> > > -        if timeout == -1:
> > > -            # epoll uses -1 for infinite timeout, select uses None.
> > > -            timeout = None
> > > -        else:
> > > -            timeout = float(timeout) / 1000
> > >          # XXX workaround a bug in eventlet
> > >          # see https://github.com/eventlet/eventlet/pull/25
> > >          if timeout == 0 and _using_eventlet_green_select():
> > >              timeout = 0.1
> > > +        if sys.platform == 'win32':
> > > +            events = self.rlist + self.wlist + self.xlist
> > > +            if not events:
> > > +                return []
> > > +            if len(events) > winutils.win32event.MAXIMUM_WAIT_OBJECTS:
> > > +                raise WindowsError("Cannot handle more than maximum wait "
> > > +                                   "objects\n")
> > > +
> > > +            # win32event.INFINITE timeout is -1
> > > +            # timeout must be an int number, expressed in ms
> > > +            if timeout == 0.1:
> > > +                timeout = 100
> > > +            else:
> > > +                timeout = int(timeout)
> > > +
> > > +            # Wait until any of the events is set to signaled
> > > +            try:
> > > +                retval = winutils.win32event.WaitForMultipleObjects(
> > > +                    events,
> > > +                    False,  # Wait all
> > > +                    timeout)
> > > +            except winutils.pywintypes.error:
> > > +                    return [(0, POLLERR)]
> > >
> > > -        rlist, wlist, xlist = select.select(self.rlist, self.wlist, self.xlist,
> > > -                                            timeout)
> > > -        events_dict = {}
> > > -        for fd in rlist:
> > > -            events_dict[fd] = events_dict.get(fd, 0) | POLLIN
> > > -        for fd in wlist:
> > > -            events_dict[fd] = events_dict.get(fd, 0) | POLLOUT
> > > -        for fd in xlist:
> > > -            events_dict[fd] = events_dict.get(fd, 0) | (POLLERR |
> > > -                                                        POLLHUP |
> > > -                                                        POLLNVAL)
> > > -        return list(events_dict.items())
> > > +            if retval == winutils.winerror.WAIT_TIMEOUT:
> > > +                return []
> > > +
> > > +            return [(events[retval], 0)]
> > > +        else:
> > > +            if timeout == -1:
> > > +                # epoll uses -1 for infinite timeout, select uses None.
> > > +                timeout = None
> > > +            else:
> > > +                timeout = float(timeout) / 1000
> > > +            rlist, wlist, xlist = select.select(self.rlist,
> > > +                                                self.wlist,
> > > +                                                self.xlist,
> > > +                                                timeout)
> > > +            events_dict = {}
> > > +            for fd in rlist:
> > > +                events_dict[fd] = events_dict.get(fd, 0) | POLLIN
> > > +            for fd in wlist:
> > > +                events_dict[fd] = events_dict.get(fd, 0) | POLLOUT
> > > +            for fd in xlist:
> > > +                events_dict[fd] = events_dict.get(fd, 0) | (POLLERR |
> > > +                                                            POLLHUP |
> > > +                                                            POLLNVAL)
> > > +            return list(events_dict.items())
> > >
> > >
> > >  SelectPoll = _SelectSelect
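
The two branches of poll() expect different timeout units: select.select()
takes seconds (None for "wait forever"), while the Windows branch passes
whole milliseconds, with -1 meaning infinite according to the comment in
the patch. A minimal sketch of keeping the value in milliseconds until the
last moment follows. All three function names are hypothetical, and
applying the eventlet workaround as 100 ms in both branches is an
assumption about the intent, not something the patch states.

```python
def normalize_timeout_ms(timeout_ms, using_eventlet=False):
    """Apply the eventlet workaround while the value is still in ms.

    Assumption: the workaround is meant to be 100 ms on every platform.
    """
    if timeout_ms == 0 and using_eventlet:
        return 100
    return timeout_ms


def to_select_timeout(timeout_ms):
    """Timeout for select.select(): seconds as a float, None for infinite."""
    if timeout_ms == -1:
        return None
    return float(timeout_ms) / 1000


def to_wait_timeout(timeout_ms):
    """Timeout for the Windows wait call: an int in ms, -1 left unchanged."""
    return int(timeout_ms)
```

For example, to_select_timeout(normalize_timeout_ms(0, True)) gives 0.1
seconds and to_wait_timeout(normalize_timeout_ms(0, True)) gives 100 ms,
so both branches would wait for the same amount of time.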
> > > diff --git a/python/ovs/socket_util.py b/python/ovs/socket_util.py
> > > index b358b05..fb6cee4 100644
> > > --- a/python/ovs/socket_util.py
> > > +++ b/python/ovs/socket_util.py
> > > @@ -17,6 +17,7 @@ import os
> > >  import os.path
> > >  import random
> > >  import socket
> > > +import sys
> > >
> > >  import six
> > >  from six.moves import range
> > > @@ -25,6 +26,10 @@ import ovs.fatal_signal
> > >  import ovs.poller
> > >  import ovs.vlog
> > >
> > > +if sys.platform == 'win32':
> > > +    import ovs.winutils as winutils
> > > +    import win32file
> > > +
> > >  vlog = ovs.vlog.Vlog("socket_util")
> > >
> > >
> > > @@ -158,7 +163,17 @@ def make_unix_socket(style, nonblock, bind_path, connect_path, short=False):
> > >
> > >  def check_connection_completion(sock):
> > >      p = ovs.poller.SelectPoll()
> > > -    p.register(sock, ovs.poller.POLLOUT)
> > > +    if sys.platform == "win32":
> > > +        event = winutils.get_new_event(None, False, True, None)
> > > +        # Receive notification of readiness for writing, of completed
> > > +        # connection or multipoint join operation, and of socket closure.
> > > +        win32file.WSAEventSelect(sock, event,
> > > +                                 win32file.FD_WRITE |
> > > +                                 win32file.FD_CONNECT |
> > > +                                 win32file.FD_CLOSE)
> > > +        p.register(event, ovs.poller.POLLOUT)
> > > +    else:
> > > +        p.register(sock, ovs.poller.POLLOUT)
> > >      pfds = p.poll(0)
> > >      if len(pfds) == 1:
> > >          revents = pfds[0][1]
> > > @@ -228,7 +243,12 @@ def inet_open_active(style, target, default_port, dscp):
> > >          try:
> > >              sock.connect(address)
> > >          except socket.error as e:
> > > -            if get_exception_errno(e) != errno.EINPROGRESS:
> > > +            error = get_exception_errno(e)
> > > +            if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK:
> > > +                # WSAEWOULDBLOCK would be the equivalent on Windows
> > > +                # for EINPROGRESS on Unix.
> > > +                error = errno.EINPROGRESS
> > > +            if error != errno.EINPROGRESS:
> > >                  raise
> > >          return 0, sock
> > >      except socket.error as e:
> > > @@ -257,9 +277,12 @@ def get_null_fd():
> > >      global null_fd
> > >      if null_fd < 0:
> > >          try:
> > > -            null_fd = os.open("/dev/null", os.O_RDWR)
> > > +            # os.devnull ensures compatibility with Windows, returns
> > > +            # '/dev/null' for Unix and 'nul' for Windows
> > > +            null_fd = os.open(os.devnull, os.O_RDWR)
> > >          except OSError as e:
> > > -            vlog.err("could not open /dev/null: %s" % os.strerror(e.errno))
> > > +            vlog.err("could not open %s: %s" % (os.devnull,
> > > +                                                os.strerror(e.errno)))
> > >              return -e.errno
> > >      return null_fd
> > >
> > > diff --git a/python/ovs/stream.py b/python/ovs/stream.py
> > > index cd57eb3..e8e5700 100644
> > > --- a/python/ovs/stream.py
> > > +++ b/python/ovs/stream.py
> > > @@ -15,6 +15,7 @@
> > >  import errno
> > >  import os
> > >  import socket
> > > +import sys
> > >
> > >  import six
> > >
> > > @@ -27,6 +28,13 @@ try:
> > >  except ImportError:
> > >      SSL = None
> > >
> > > +if sys.platform == 'win32':
> > > +    import ovs.winutils as winutils
> > > +    import pywintypes
> > > +    import win32event
> > > +    import win32file
> > > +    import win32pipe
> > > +
> > >  vlog = ovs.vlog.Vlog("stream")
> > >
> > >
> > > @@ -63,6 +71,13 @@ class Stream(object):
> > >      _SSL_certificate_file = None
> > >      _SSL_ca_cert_file = None
> > >
> > > +    # Windows only
> > > +    _write = None                # overlapped for write operation
> > > +    _read = None                 # overlapped for read operation
> > > +    _write_pending = False
> > > +    _read_pending = False
> > > +    _retry_connect = False
> > > +
> > >      @staticmethod
> > >      def register_method(method, cls):
> > >          Stream._SOCKET_METHODS[method + ":"] = cls
> > > @@ -81,8 +96,23 @@ class Stream(object):
> > >          otherwise False."""
> > >          return bool(Stream._find_method(name))
> > >
> > > -    def __init__(self, socket, name, status):
> > > +    def __init__(self, socket, name, status, pipe=None, is_server=False):
> > >          self.socket = socket
> > > +        self.pipe = pipe
> > > +        if sys.platform == 'win32':
> > > +            self._read = pywintypes.OVERLAPPED()
> > > +            self._read.hEvent = winutils.get_new_event()
> > > +            self._write = pywintypes.OVERLAPPED()
> > > +            self._write.hEvent = winutils.get_new_event()
> > > +            if pipe is not None:
> > > +                # Flag to check if fd is a server HANDLE.  In the case of a
> > > +                # server handle we have to issue a disconnect before closing
> > > +                # the actual handle.
> > > +                self._server = is_server
> > > +                suffix = name.split(":", 1)[1]
> > > +                suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
> > > +                self._pipename = winutils.get_pipe_name(suffix)
> > > +
> > >          self.name = name
> > >          if status == errno.EAGAIN:
> > >              self.state = Stream.__S_CONNECTING
> > > @@ -120,6 +150,38 @@ class Stream(object):
> > >          suffix = name.split(":", 1)[1]
> > >          if name.startswith("unix:"):
> > >              suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
> > > +            if sys.platform == 'win32':
> > > +                pipename = winutils.get_pipe_name(suffix)
> > > +
> > > +                if len(suffix) > 255:
> > > +                    # Return invalid argument if the name is too long
> > > +                    return errno.ENOENT, None
> > > +
> > > +                try:
> > > +                    # In case of "unix:" argument, the assumption is that
> > > +                    # there is a file created in the path (suffix).
> > > +                    open(suffix, 'r').close()
> > > +                except:
> > > +                    return errno.ENOENT, None
> > > +
> > > +                try:
> > > +                    npipe = winutils.create_file(pipename)
> > > +                    try:
> > > +                        winutils.set_pipe_mode(npipe,
> > > +                                               win32pipe.PIPE_READMODE_BYTE)
> > > +                    except pywintypes.error as e:
> > > +                        return errno.ENOENT, None
> > > +                except pywintypes.error as e:
> > > +                    if e.winerror == winutils.winerror.ERROR_PIPE_BUSY:
> > > +                        # Pipe is busy, set the retry flag to true and retry
> > > +                        # again during the connect function.
> > > +                        Stream.retry_connect = True
> > > +                        return 0, cls(None, name, errno.EAGAIN,
> > > +                                      pipe=win32file.INVALID_HANDLE_VALUE,
> > > +                                      is_server=False)
> > > +                    return errno.ENOENT, None
> > > +                return 0, cls(None, name, 0, pipe=npipe, is_server=False)
> > > +
> > >          error, sock = cls._open(suffix, dscp)
> > >          if error:
> > >              return error, None
> > > @@ -145,6 +207,10 @@ class Stream(object):
> > >          if not error:
> > >              while True:
> > >                  error = stream.connect()
> > > +                if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK:
> > > +                    # WSAEWOULDBLOCK would be the equivalent on Windows
> > > +                    # for EAGAIN on Unix.
> > > +                    error = errno.EAGAIN
> > >                  if error != errno.EAGAIN:
> > >                      break
> > >                  stream.run()
> > > @@ -152,7 +218,8 @@ class Stream(object):
> > >                  stream.run_wait(poller)
> > >                  stream.connect_wait(poller)
> > >                  poller.block()
> > > -            assert error != errno.EINPROGRESS
> > > +            if stream.socket is not None:
> > > +                assert error != errno.EINPROGRESS
> > >
> > >          if error and stream:
> > >              stream.close()
> > > @@ -160,11 +227,35 @@ class Stream(object):
> > >          return error, stream
> > >
> > >      def close(self):
> > > -        self.socket.close()
> > > +        if self.socket is not None:
> > > +            self.socket.close()
> > > +        if self.pipe is not None:
> > > +            if self._server:
> > > +                win32pipe.DisconnectNamedPipe(self.pipe)
> > > +            winutils.close_handle(self.pipe, vlog.warn)
> > > +            winutils.close_handle(self._read.hEvent, vlog.warn)
> > > +            winutils.close_handle(self._write.hEvent, vlog.warn)
> > >
> > >      def __scs_connecting(self):
> > > -        retval = ovs.socket_util.check_connection_completion(self.socket)
> > > -        assert retval != errno.EINPROGRESS
> > > +        if self.socket is not None:
> > > +            retval = ovs.socket_util.check_connection_completion(self.socket)
> > > +            assert retval != errno.EINPROGRESS
> > > +        elif sys.platform == 'win32' and self.retry_connect:
> > > +            try:
> > > +                self.pipe = winutils.create_file(self._pipename)
> > > +                self._retry_connect = False
> > > +                retval = 0
> > > +            except pywintypes.error as e:
> > > +                if e.winerror == winutils.winerror.ERROR_PIPE_BUSY:
> > > +                    retval = errno.EAGAIN
> > > +                else:
> > > +                    self._retry_connect = False
> > > +                    retval = errno.ENOENT
> > > +        else:
> > > +            # Windows only, if retry_connect is false, it means it's already
> > > +            # connected so we can set the value of retval to 0
> > > +            retval = 0
> > > +
> > >          if retval == 0:
> > >              self.state = Stream.__S_CONNECTED
> > >          elif retval != errno.EAGAIN:
> > > @@ -209,11 +300,63 @@ class Stream(object):
> > >          elif n == 0:
> > >              return (0, "")
> > >
> > > +        if sys.platform == 'win32' and self.socket is None:
> > > +            return self.__recv_windows(n)
> > > +
> > >          try:
> > >              return (0, self.socket.recv(n))
> > >          except socket.error as e:
> > >              return (ovs.socket_util.get_exception_errno(e), "")
> > >
> > > +    def __recv_windows(self, n):
> > > +        if self._read_pending:
> > > +            try:
> > > +                nBytesRead = winutils.get_overlapped_result(self.pipe,
> > > +                                                            self._read,
> > > +                                                            False)
> > > +                self._read_pending = False
> > > +                recvBuffer = self._read_buffer[:nBytesRead]
> > > +
> > > +                return (0, winutils.get_decoded_buffer(recvBuffer))
> > > +            except pywintypes.error as e:
> > > +                if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE:
> > > +                    # The operation is still pending, try again
> > > +                    self._read_pending = True
> > > +                    return (errno.EAGAIN, "")
> > > +                elif e.winerror in winutils.pipe_disconnected_errors:
> > > +                    # If the pipe was disconnected, return 0.
> > > +                    return (0, "")
> > > +                else:
> > > +                    return (errno.EINVAL, "")
> > > +
> > > +        (errCode, self._read_buffer) = winutils.read_file(self.pipe,
> > > +                                                          n,
> > > +                                                          self._read)
> > > +        if errCode:
> > > +            if errCode == winutils.winerror.ERROR_IO_PENDING:
> > > +                self._read_pending = True
> > > +                return (errno.EAGAIN, "")
> > > +            elif errCode in winutils.pipe_disconnected_errors:
> > > +                # If the pipe was disconnected, return 0.
> > > +                return (0, "")
> > > +            else:
> > > +                return (errCode, "")
> > > +
> > > +        try:
> > > +            nBytesRead = winutils.get_overlapped_result(self.pipe,
> > > +                                                        self._read,
> > > +                                                        False)
> > > +            winutils.win32event.SetEvent(self._read.hEvent)
> > > +        except pywintypes.error as e:
> > > +            if e.winerror in winutils.pipe_disconnected_errors:
> > > +                # If the pipe was disconnected, return 0.
> > > +                return (0, "")
> > > +            else:
> > > +                return (e.winerror, "")
> > > +
> > > +        recvBuffer = self._read_buffer[:nBytesRead]
> > > +        return (0, winutils.get_decoded_buffer(recvBuffer))
> > > +
> > >      def send(self, buf):
> > >          """Tries to send 'buf' on this stream.
> > >
> > > @@ -231,6 +374,9 @@ class Stream(object):
> > >          elif len(buf) == 0:
> > >              return 0
> > >
> > > +        if sys.platform == 'win32' and self.socket is None:
> > > +            return self.__send_windows(buf)
> > > +
> > >          try:
> > >              # Python 3 has separate types for strings and bytes.  We must have
> > >              # bytes here.
> > > @@ -240,6 +386,40 @@ class Stream(object):
> > >          except socket.error as e:
> > >              return -ovs.socket_util.get_exception_errno(e)
> > >
> > > +    def __send_windows(self, buf):
> > > +        if self._write_pending:
> > > +            try:
> > > +                nBytesWritten = winutils.get_overlapped_result(self.pipe,
> > > +                                                               self._write,
> > > +                                                               False)
> > > +                self._write_pending = False
> > > +                return nBytesWritten
> > > +            except pywintypes.error as e:
> > > +                if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE:
> > > +                    # The operation is still pending, try again
> > > +                    self._write_pending = True
> > > +                    return -errno.EAGAIN
> > > +                elif e.winerror in winutils.pipe_disconnected_errors:
> > > +                    # If the pipe was disconnected, return connection reset.
> > > +                    return -errno.ECONNRESET
> > > +                else:
> > > +                    return -errno.EINVAL
> > > +
> > > +        buf = winutils.get_encoded_buffer(buf)
> > > +        self._write_pending = False
> > > +        (errCode, nBytesWritten) = winutils.write_file(self.pipe,
> > > +                                                       buf,
> > > +                                                       self._write)
> > > +        if errCode:
> > > +            if errCode == winutils.winerror.ERROR_IO_PENDING:
> > > +                self._write_pending = True
> > > +                return -errno.EAGAIN
> > > +            if (not nBytesWritten and
> > > +                    errCode in winutils.pipe_disconnected_errors):
> > > +                # If the pipe was disconnected, return connection reset.
> > > +                return -errno.ECONNRESET
> > > +        return nBytesWritten
> > > +
> > >      def run(self):
> > >          pass
> > >
> > > @@ -255,11 +435,52 @@ class Stream(object):
> > >
> > >          if self.state == Stream.__S_CONNECTING:
> > >              wait = Stream.W_CONNECT
> > > +
> > > +        if sys.platform == 'win32':
> > > +            self.__wait_windows(poller, wait)
> > > +            return
> > > +
> > >          if wait == Stream.W_RECV:
> > >              poller.fd_wait(self.socket, ovs.poller.POLLIN)
> > >          else:
> > >              poller.fd_wait(self.socket, ovs.poller.POLLOUT)
> > >
> > > +    def __wait_windows(self, poller, wait):
> > > +        if self.socket is not None:
> > > +            if wait == Stream.W_RECV:
> > > +                read_flags = (win32file.FD_READ |
> > > +                              win32file.FD_ACCEPT |
> > > +                              win32file.FD_CLOSE)
> > > +                try:
> > > +                    win32file.WSAEventSelect(self.socket,
> > > +                                             self._read.hEvent,
> > > +                                             read_flags)
> > > +                except pywintypes.error as e:
> > > +                    vlog.err("failed to associate events with socket: %s"
> > > +                             % e.strerror)
> > > +                poller.fd_wait(self._read.hEvent, ovs.poller.POLLIN)
> > > +            else:
> > > +                write_flags = (win32file.FD_WRITE |
> > > +                               win32file.FD_CONNECT |
> > > +                               win32file.FD_CLOSE)
> > > +                try:
> > > +                    win32file.WSAEventSelect(self.socket,
> > > +                                             self._write.hEvent,
> > > +                                             write_flags)
> > > +                except pywintypes.error as e:
> > > +                    vlog.err("failed to associate events with socket: %s"
> > > +                             % e.strerror)
> > > +                poller.fd_wait(self._write.hEvent, ovs.poller.POLLOUT)
> > > +        else:
> > > +            if wait == Stream.W_RECV:
> > > +                if self._read:
> > > +                    poller.fd_wait(self._read.hEvent, ovs.poller.POLLIN)
> > > +            elif wait == Stream.W_SEND:
> > > +                if self._write:
> > > +                    poller.fd_wait(self._write.hEvent, ovs.poller.POLLOUT)
> > > +            elif wait == Stream.W_CONNECT:
> > > +                return
> > > +
> > >      def connect_wait(self, poller):
> > >          self.wait(poller, Stream.W_CONNECT)
> > >
> > > @@ -267,11 +488,22 @@ class Stream(object):
> > >          self.wait(poller, Stream.W_RECV)
> > >
> > >      def send_wait(self, poller):
> > > +        if sys.platform == 'win32':
> > > +            poller.fd_wait(self.connect.hEvent, ovs.poller.POLLIN)
> > >          self.wait(poller, Stream.W_SEND)
> > >
> > >      def __del__(self):
> > >          # Don't delete the file: we might have forked.
> > > -        self.socket.close()
> > > +        if self.socket is not None:
> > > +            self.socket.close()
> > > +        if self.pipe is not None:
> > > +            # Check if there are any remaining valid handles and close them
> > > +            if self.pipe:
> > > +                winutils.close_handle(self.pipe)
> > > +            if self._read.hEvent:
> > > +                winutils.close_handle(self._read.hEvent)
> > > +            if self._write.hEvent:
> > > +                winutils.close_handle(self._write.hEvent)
> > >
> > >      @staticmethod
> > >      def ssl_set_private_key_file(file_name):
> > > @@ -287,6 +519,10 @@ class Stream(object):
> > >
> > >
> > >  class PassiveStream(object):
> > > +    # Windows only
> > > +    connect = None                  # overlapped for read operation
> > > +    connect_pending = False
> > > +
> > >      @staticmethod
> > >      def is_valid_name(name):
> > >          """Returns True if 'name' is a passive stream name in the form
> > > @@ -294,9 +530,18 @@ class PassiveStream(object):
> > >          "punix:" or "ptcp"), otherwise False."""
> > >          return name.startswith("punix:") | name.startswith("ptcp:")
> > >
> > > -    def __init__(self, sock, name, bind_path):
> > > +    def __init__(self, sock, name, bind_path, pipe=None):
> > >          self.name = name
> > > +        self.pipe = pipe
> > >          self.socket = sock
> > > +        if pipe is not None:
> > > +            self.connect = pywintypes.OVERLAPPED()
> > > +            self.connect.hEvent = winutils.get_new_event(bManualReset=True)
> > > +            self.connect_pending = False
> > > +            suffix = name.split(":", 1)[1]
> > > +            suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
> > > +            self._pipename = winutils.get_pipe_name(suffix)
> > > +
> > >          self.bind_path = bind_path
> > >
> > >      @staticmethod
> > > @@ -315,11 +560,27 @@ class PassiveStream(object):
> > >          bind_path = name[6:]
> > >          if name.startswith("punix:"):
> > >              bind_path = ovs.util.abs_file_name(ovs.dirs.RUNDIR, bind_path)
> > > -            error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
> > > -                                                           True, bind_path,
> > > -                                                           None)
> > > -            if error:
> > > -                return error, None
> > > +            if sys.platform != 'win32':
> > > +                error, sock = ovs.socket_util.make_unix_socket(
> > > +                    socket.SOCK_STREAM, True, bind_path, None)
> > > +                if error:
> > > +                    return error, None
> > > +            else:
> > > +                # Branch used only on Windows
> > > +                try:
> > > +                    open(bind_path, 'w').close()
> > > +                except:
> > > +                    return errno.ENOENT, None
> > > +
> > > +                pipename = winutils.get_pipe_name(bind_path)
> > > +                if len(pipename) > 255:
> > > +                    # Return invalid argument if the name is too long
> > > +                    return errno.ENOENT, None
> > > +
> > > +                npipe = winutils.create_named_pipe(pipename)
> > > +                if not npipe:
> > > +                    return errno.ENOENT, None
> > > +                return 0, PassiveStream(None, name, bind_path, pipe=npipe)
> > >
> > >          elif name.startswith("ptcp:"):
> > >              sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
> > > @@ -341,7 +602,11 @@ class PassiveStream(object):
> > >
> > >      def close(self):
> > >          """Closes this PassiveStream."""
> > > -        self.socket.close()
> > > +        if self.socket is not None:
> > > +            self.socket.close()
> > > +        if self.pipe is not None:
> > > +            winutils.close_handle(self.pipe, vlog.warn)
> > > +            winutils.close_handle(self.connect.hEvent, vlog.warn)
> > >          if self.bind_path is not None:
> > >              ovs.fatal_signal.unlink_file_now(self.bind_path)
> > >              self.bind_path = None
> > > @@ -354,28 +619,80 @@ class PassiveStream(object):
> > >
> > >          Will not block waiting for a connection.  If no connection is ready to
> > >          be accepted, returns (errno.EAGAIN, None) immediately."""
> > > -
> > > +        if sys.platform == 'win32' and self.socket is None:
> > > +            return self.__accept_windows()
> > >          while True:
> > >              try:
> > >                  sock, addr = self.socket.accept()
> > >                  ovs.socket_util.set_nonblocking(sock)
> > > -                if (sock.family == socket.AF_UNIX):
> > > +                if (sys.platform != 'win32' and sock.family == socket.AF_UNIX):
> > >                      return 0, Stream(sock, "unix:%s" % addr, 0)
> > >                  return 0, Stream(sock, 'ptcp:%s:%s' % (addr[0],
> > >                                                         str(addr[1])), 0)
> > >              except socket.error as e:
> > >                  error = ovs.socket_util.get_exception_errno(e)
> > > +                if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK:
> > > +                    # WSAEWOULDBLOCK would be the equivalent on Windows
> > > +                    # for EAGAIN on Unix.
> > > +                    error = errno.EAGAIN
> > >                  if error != errno.EAGAIN:
> > >                      # XXX rate-limit
> > >                      vlog.dbg("accept: %s" % os.strerror(error))
> > >                  return error, None
> > >
> > > +    def __accept_windows(self):
> > > +        if self.connect_pending:
> > > +            try:
> > > +                winutils.get_overlapped_result(self.pipe, self.connect, False)
> > > +            except pywintypes.error as e:
> > > +                if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE:
> > > +                    # The operation is still pending, try again
> > > +                    self.connect_pending = True
> > > +                    return errno.EAGAIN, None
> > > +                else:
> > > +                    if self.pipe:
> > > +                        win32pipe.DisconnectNamedPipe(self.pipe)
> > > +                    return errno.EINVAL, None
> > > +            self.connect_pending = False
> > > +
> > > +        error = winutils.connect_named_pipe(self.pipe, self.connect)
> > > +        if error:
> > > +            if error == winutils.winerror.ERROR_IO_PENDING:
> > > +                self.connect_pending = True
> > > +                return errno.EAGAIN, None
> > > +            elif error != winutils.winerror.ERROR_PIPE_CONNECTED:
> > > +                if self.pipe:
> > > +                    win32pipe.DisconnectNamedPipe(self.pipe)
> > > +                self.connect_pending = False
> > > +                return errno.EINVAL, None
> > > +            else:
> > > +                win32event.SetEvent(self.connect.hEvent)
> > > +
> > > +        npipe = winutils.create_named_pipe(self._pipename)
> > > +        if not npipe:
> > > +            return errno.ENOENT, None
> > > +
> > > +        old_pipe = self.pipe
> > > +        self.pipe = npipe
> > > +        winutils.win32event.ResetEvent(self.connect.hEvent)
> > > +        return 0, Stream(None, self.name, 0, pipe=old_pipe)
> > > +
> > >      def wait(self, poller):
> > > -        poller.fd_wait(self.socket, ovs.poller.POLLIN)
> > > +        if sys.platform != 'win32' or self.socket is not None:
> > > +            poller.fd_wait(self.socket, ovs.poller.POLLIN)
> > > +        else:
> > > +            poller.fd_wait(self.connect.hEvent, ovs.poller.POLLIN)
> > >
> > >      def __del__(self):
> > >          # Don't delete the file: we might have forked.
> > > -        self.socket.close()
> > > +        if self.socket is not None:
> > > +            self.socket.close()
> > > +        if self.pipe is not None:
> > > +            # Check if there are any remaining valid handles and close them
> > > +            if self.pipe:
> > > +                winutils.close_handle(self.pipe)
> > > +            if self._connect.hEvent:
> > > +                winutils.close_handle(self._read.hEvent)
> > >
> > >
> > >  def usage(name):
> > > diff --git a/python/ovs/unixctl/server.py b/python/ovs/unixctl/server.py
> > > index 8595ed8..3f3e051 100644
> > > --- a/python/ovs/unixctl/server.py
> > > +++ b/python/ovs/unixctl/server.py
> > > @@ -148,6 +148,10 @@ class UnixctlServer(object):
> > >      def run(self):
> > >          for _ in range(10):
> > >              error, stream = self._listener.accept()
> > > +            if sys.platform == "win32" and error == errno.WSAEWOULDBLOCK:
> > > +                # WSAEWOULDBLOCK would be the equivalent on Windows
> > > +                # for EAGAIN on Unix.
> > > +                error = errno.EAGAIN
> > >              if not error:
> > >                  rpc = ovs.jsonrpc.Connection(stream)
> > >                  self._conns.append(UnixctlConnection(rpc))
> > > diff --git a/tests/test-jsonrpc.py b/tests/test-jsonrpc.py
> > > index 18634e6..3eabcd7 100644
> > > --- a/tests/test-jsonrpc.py
> > > +++ b/tests/test-jsonrpc.py
> > > @@ -53,11 +53,17 @@ def handle_rpc(rpc, msg):
> > >
> > >
> > >  def do_listen(name):
> > > -    error, pstream = ovs.stream.PassiveStream.open(name)
> > > -    if error:
> > > -        sys.stderr.write("could not listen on \"%s\": %s\n"
> > > -                         % (name, os.strerror(error)))
> > > -        sys.exit(1)
> > > +    if sys.platform != 'win32' or (
> > > +            ovs.daemon._detach and ovs.daemon._detached):
> > > +        # On Windows the child is a new process created which should be the
> > > +        # one that creates the PassiveStream. Without this check, the new
> > > +        # child process will create a new PassiveStream overwriting the one
> > > +        # that the parent process created.
> > > +        error, pstream = ovs.stream.PassiveStream.open(name)
> > > +        if error:
> > > +            sys.stderr.write("could not listen on \"%s\": %s\n"
> > > +                             % (name, os.strerror(error)))
> > > +            sys.exit(1)
> > >
> > >      ovs.daemon.daemonize()
> > >
> > > --
> > > 2.10.0.windows.1
> > > _______________________________________________
> > > dev mailing list
> > > d...@openvswitch.org
> > > https://mail.openvswitch.org/mailman/listinfo/ovs-dev

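The quoted patch maps WSAEWOULDBLOCK to EAGAIN in two places
(PassiveStream.accept() and UnixctlServer.run()). A minimal sketch of how
that mapping could live in one helper follows. The name
normalize_accept_errno and its platform parameter are hypothetical and not
part of the patch; errno.WSAEWOULDBLOCK is only defined on Windows, so the
sketch looks it up defensively.

```python
import errno
import sys


def normalize_accept_errno(error, platform=sys.platform):
    """Return errno.EAGAIN for WSAEWOULDBLOCK on Windows, else 'error'.

    Hypothetical helper, not part of the quoted patch.
    """
    # errno.WSAEWOULDBLOCK exists only in Windows builds of Python.
    wsa_would_block = getattr(errno, "WSAEWOULDBLOCK", None)
    if (platform == "win32" and wsa_would_block is not None
            and error == wsa_would_block):
        return errno.EAGAIN
    return error
```

With such a helper, both call sites would reduce to a single
"error = normalize_accept_errno(error)" line before the EAGAIN check.
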