On 26 August 2016 at 07:40, Paul Boca <pb...@cloudbasesolutions.com> wrote:

> AF_UNIX sockets are not supported on Windows.
> Instead of an AF_UNIX socket use named pipes to communicate
> between components. This makes the python sockets compatible with
> the named pipe used in Windows applications.
> Added stream_windows.py with named pipe and localhost
> tcp connections support.
>
> Signed-off-by: Paul-Daniel Boca <pb...@cloudbasesolutions.com>
> ---
>  python/automake.mk           |   1 +
>  python/ovs/jsonrpc.py        |   9 +-
>  python/ovs/poller.py         |  49 +++-
>  python/ovs/socket_util.py    |  20 +-
>  python/ovs/stream_windows.py | 611 +++++++++++++++++++++++++++++++++++++++++++
>  python/ovs/unixctl/client.py |   6 +-
>  python/ovs/unixctl/server.py |  11 +-
>  tests/test-jsonrpc.py        |  17 +-
>  tests/test-ovsdb.py          |   8 +-
>  9 files changed, 703 insertions(+), 29 deletions(-)
>  create mode 100644 python/ovs/stream_windows.py
>
> diff --git a/python/automake.mk b/python/automake.mk
> index 3fe9519..7bbf382 100644
> --- a/python/automake.mk
> +++ b/python/automake.mk
> @@ -27,6 +27,7 @@ ovs_pyfiles = \
>         python/ovs/process.py \
>         python/ovs/reconnect.py \
>         python/ovs/socket_util.py \
> +       python/ovs/stream_windows.py \
>         python/ovs/stream_unix.py \
>         python/ovs/timeval.py \
>         python/ovs/unixctl/__init__.py \
> diff --git a/python/ovs/jsonrpc.py b/python/ovs/jsonrpc.py
> index 8ca01a0..d70f13e 100644
> --- a/python/ovs/jsonrpc.py
> +++ b/python/ovs/jsonrpc.py
> @@ -14,13 +14,17 @@
>
>  import errno
>  import os
> +import sys
>
>  import six
>
>  import ovs.json
>  import ovs.poller
>  import ovs.reconnect
> -import ovs.stream_unix as ovs_stream
> +if sys.platform == "win32":
> +    import ovs.stream_windows as ovs_stream
> +else:
> +    import ovs.stream_unix as ovs_stream
>  import ovs.timeval
>  import ovs.util
>  import ovs.vlog
> @@ -274,6 +278,9 @@ class Connection(object):
>                      except UnicodeError:
>                          error = errno.EILSEQ
>                  if error:
> +                    if (sys.platform == "win32"
> +                       and error == errno.WSAEWOULDBLOCK):
> +                        error = errno.EAGAIN
>                      if error == errno.EAGAIN:
>                          return error, None
>                      else:
> diff --git a/python/ovs/poller.py b/python/ovs/poller.py
> index de6bf22..970decc 100644
> --- a/python/ovs/poller.py
> +++ b/python/ovs/poller.py
> @@ -18,6 +18,7 @@ import ovs.vlog
>  import select
>  import socket
>  import os
> +import sys
>
>  try:
>      import eventlet.patcher
> @@ -54,7 +55,8 @@ class _SelectSelect(object):
>      def register(self, fd, events):
>          if isinstance(fd, socket.socket):
>              fd = fd.fileno()
> -        assert isinstance(fd, int)
> +        if not sys.platform == "win32":
> +            assert isinstance(fd, int)
>          if events & POLLIN:
>              self.rlist.append(fd)
>              events &= ~POLLIN
> @@ -75,18 +77,39 @@ class _SelectSelect(object):
>          if timeout == 0 and _using_eventlet_green_select():
>              timeout = 0.1
>
> -        rlist, wlist, xlist = select.select(self.rlist, self.wlist, self.xlist,
> -                                            timeout)
> -        events_dict = {}
> -        for fd in rlist:
> -            events_dict[fd] = events_dict.get(fd, 0) | POLLIN
> -        for fd in wlist:
> -            events_dict[fd] = events_dict.get(fd, 0) | POLLOUT
> -        for fd in xlist:
> -            events_dict[fd] = events_dict.get(fd, 0) | (POLLERR |
> -                                                        POLLHUP |
> -                                                        POLLNVAL)
> -        return list(events_dict.items())
> +        if sys.platform == "win32":
> +            import win32event
> +            import winerror
> +
> +            if timeout is None:
> +                timeout = 0xFFFFFFFF
> +            else:
> +                timeout = int(timeout * 1000)
> +
> +            events = self.rlist + self.wlist + self.xlist
> +            if not events:
> +                return list()
> +            error = win32event.WaitForMultipleObjectsEx(events, False,
> +                                                        timeout, False)
> +            if error == winerror.WAIT_TIMEOUT:
> +                return list()
> +
> +            return [(events[error], 0)]
> +        else:
> +            rlist, wlist, xlist = select.select(self.rlist,
> +                                                self.wlist,
> +                                                self.xlist,
> +                                                timeout)
> +            events_dict = {}
> +            for fd in rlist:
> +                events_dict[fd] = events_dict.get(fd, 0) | POLLIN
> +            for fd in wlist:
> +                events_dict[fd] = events_dict.get(fd, 0) | POLLOUT
> +            for fd in xlist:
> +                events_dict[fd] = events_dict.get(fd, 0) | (POLLERR |
> +                                                            POLLHUP |
> +                                                            POLLNVAL)
> +            return list(events_dict.items())
>
>
>  SelectPoll = _SelectSelect
> diff --git a/python/ovs/socket_util.py b/python/ovs/socket_util.py
> index b358b05..54f448d 100644
> --- a/python/ovs/socket_util.py
> +++ b/python/ovs/socket_util.py
> @@ -17,6 +17,7 @@ import os
>  import os.path
>  import random
>  import socket
> +import sys
>
>  import six
>  from six.moves import range
> @@ -25,6 +26,10 @@ import ovs.fatal_signal
>  import ovs.poller
>  import ovs.vlog
>
> +if sys.platform == "win32":
> +    import win32file
> +    import win32event
> +
>  vlog = ovs.vlog.Vlog("socket_util")
>
>
> @@ -158,7 +163,15 @@ def make_unix_socket(style, nonblock, bind_path, connect_path, short=False):
>
>  def check_connection_completion(sock):
>      p = ovs.poller.SelectPoll()
> -    p.register(sock, ovs.poller.POLLOUT)
> +    if sys.platform == "win32":
> +        event = win32event.CreateEvent(None, False, True, None)
> +        win32file.WSAEventSelect(sock, event,
> +                                win32file.FD_WRITE |
> +                                win32file.FD_CONNECT |
> +                                win32file.FD_CLOSE)
> +        p.register(event, ovs.poller.POLLOUT)
> +    else:
> +        p.register(sock, ovs.poller.POLLOUT)
>      pfds = p.poll(0)
>      if len(pfds) == 1:
>          revents = pfds[0][1]
> @@ -228,7 +241,10 @@ def inet_open_active(style, target, default_port, dscp):
>          try:
>              sock.connect(address)
>          except socket.error as e:
> -            if get_exception_errno(e) != errno.EINPROGRESS:
> +            error = get_exception_errno(e)
> +            if sys.platform == "win32" and error == errno.WSAEWOULDBLOCK:
> +                error = errno.EINPROGRESS
> +            if error != errno.EINPROGRESS:
>                  raise
>          return 0, sock
>      except socket.error as e:
> diff --git a/python/ovs/stream_windows.py b/python/ovs/stream_windows.py
> new file mode 100644
> index 0000000..dd8d4ba
> --- /dev/null
> +++ b/python/ovs/stream_windows.py
> @@ -0,0 +1,611 @@
> +# Copyright (c) 2010, 2011, 2012 Nicira, Inc.
> +#
> +# Licensed under the Apache License, Version 2.0 (the "License");
> +# you may not use this file except in compliance with the License.
> +# You may obtain a copy of the License at:
> +#
> +#     http://www.apache.org/licenses/LICENSE-2.0
> +#
> +# Unless required by applicable law or agreed to in writing, software
> +# distributed under the License is distributed on an "AS IS" BASIS,
> +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> +# See the License for the specific language governing permissions and
> +# limitations under the License.
> +
> +import errno
> +import os
> +import socket
> +import sys
> +import six
> +
> +import ovs.poller
> +import ovs.socket_util
> +import ovs.vlog
> +
> +import pywintypes
> +import winerror
> +import win32pipe
> +import win32con
> +import win32security
> +import win32file
> +import win32event
> +
> +vlog = ovs.vlog.Vlog("stream")
> +
> +
> +def stream_or_pstream_needs_probes(name):
> +    """ 1 if the stream or pstream specified by 'name' needs periodic probes to
> +    verify connectivity.  For [p]streams which need probes, it can take a long
> +    time to notice the connection was dropped.  Returns 0 if probes aren't
> +    needed, and -1 if 'name' is invalid"""
> +
> +    if PassiveStream.is_valid_name(name) or Stream.is_valid_name(name):
> +        # Only unix and punix are supported currently.
> +        return 0
> +    else:
> +        return -1
> +
> +
> +class Stream(object):
> +    """Bidirectional byte stream.  Currently only Unix domain sockets
> +    are implemented."""
> +
> +    # States.
> +    __S_CONNECTING = 0
> +    __S_CONNECTED = 1
> +    __S_DISCONNECTED = 2
> +
> +    # Kinds of events that one might wait for.
> +    W_CONNECT = 0               # Connect complete (success or failure).
> +    W_RECV = 1                  # Data received.
> +    W_SEND = 2                  # Send buffer room available.
> +
> +    _SOCKET_METHODS = {}
> +
> +    write = None                # overlapped for write operation
> +    read = None                 # overlapped for read operation
> +    write_pending = False
> +    read_pending = False
> +    retry_connect = False
> +
> +    @staticmethod
> +    def register_method(method, cls):
> +        Stream._SOCKET_METHODS[method + ":"] = cls
> +
> +    @staticmethod
> +    def _find_method(name):
> +        for method, cls in six.iteritems(Stream._SOCKET_METHODS):
> +            if name.startswith(method):
> +                return cls
> +        return None
> +
> +    @staticmethod
> +    def is_valid_name(name):
> +        """Returns True if 'name' is a stream name in the form "TYPE:ARGS" and
> +        TYPE is a supported stream type (currently only "unix:" and "tcp:"),
> +        otherwise False."""
> +        return bool(Stream._find_method(name))
> +
> +    def __init__(self, sock, name, status):
> +        if isinstance(sock, socket.socket):
> +            self.socket = sock
> +        else:
> +            self.pipe = sock
> +            self.read = pywintypes.OVERLAPPED()
> +            self.read.hEvent = win32event.CreateEvent(None, True, True, None)
> +            self.write = pywintypes.OVERLAPPED()
> +            self.write.hEvent = win32event.CreateEvent(None, True, True, None)
> +
> +        self.name = name
> +        if status == errno.EAGAIN:
> +            self.state = Stream.__S_CONNECTING
> +        elif status == 0:
> +            self.state = Stream.__S_CONNECTED
> +        else:
> +            self.state = Stream.__S_DISCONNECTED
> +
> +        self.error = 0
> +
> +    # Default value of dscp bits for connection between controller and manager.
> +    # Value of IPTOS_PREC_INTERNETCONTROL = 0xc0 which is defined
> +    # in <netinet/ip.h> is used.
> +    IPTOS_PREC_INTERNETCONTROL = 0xc0
> +    DSCP_DEFAULT = IPTOS_PREC_INTERNETCONTROL >> 2
> +
> +    @staticmethod
> +    def open(name, dscp=DSCP_DEFAULT):
> +        """Attempts to connect a stream to a remote peer.  'name' is a
> +        connection name in the form "TYPE:ARGS", where TYPE is an active
> stream
> +        class's name and ARGS are stream class-specific.  Currently the
> only
> +        supported TYPEs are "unix" and "tcp".
> +
> +        Returns (error, stream): on success 'error' is 0 and 'stream' is
> the
> +        new Stream, on failure 'error' is a positive errno value and
> 'stream'
> +        is None.
> +
> +        Never returns errno.EAGAIN or errno.EINPROGRESS.  Instead,
> returns 0
> +        and a new Stream.  The connect() method can be used to check for
> +        successful connection completion."""
> +        cls = Stream._find_method(name)
> +        if not cls:
> +            return errno.EAFNOSUPPORT, None
> +
> +        suffix = name.split(":", 1)[1]
> +        if name.startswith("unix:"):
> +            suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
> +            suffix = suffix.replace('/', '')
> +            suffix = suffix.replace('\\', '')
> +            suffix = "\\\\.\\pipe\\" + suffix
> +
> +            saAttr = win32security.SECURITY_ATTRIBUTES()
> +            saAttr.bInheritHandle = 1
> +            try:
> +                npipe = win32file.CreateFile(
> +                            suffix,
> +                            win32file.GENERIC_READ | win32file.GENERIC_WRITE,
> +                            0, None,
> +                            win32file.OPEN_EXISTING,
> +                            win32file.FILE_ATTRIBUTE_NORMAL |
> +                            win32file.FILE_FLAG_OVERLAPPED |
> +                            win32file.FILE_FLAG_NO_BUFFERING,
> +                            None)
> +            except pywintypes.error as e:
> +                return e.winerror, None
> +
> +            return 0, Stream(npipe, suffix, 0)
> +        else:
> +            error, sock = cls._open(suffix, dscp)
> +            if error:
> +                return error, None
> +            else:
> +                status = ovs.socket_util.check_connection_completion(sock)
> +                return 0, Stream(sock, name, status)
> +
> +    @staticmethod
> +    def _open(suffix, dscp):
> +        raise NotImplementedError("This method must be overrided by subclass")
> +
> +    @staticmethod
> +    def open_block(error_stream):
> +        """Blocks until a Stream completes its connection attempt, either
> +        succeeding or failing.  (error, stream) should be the tuple
> returned by
> +        Stream.open().  Returns a tuple of the same form.
> +
> +        Typical usage:
> +        error, stream = Stream.open_block(Stream.open("unix:/tmp/socket"))"""
> +
> +        # Py3 doesn't support tuple parameter unpacking - PEP 3113
> +        error, stream = error_stream
> +        if not error:
> +            while True:
> +                error = stream.connect()
> +                if sys.platform == "win32" and error == errno.WSAEWOULDBLOCK:
> +                    error = errno.EAGAIN
> +                if error != errno.EAGAIN:
> +                    break
> +                stream.run()
> +                poller = ovs.poller.Poller()
> +                stream.run_wait(poller)
> +                stream.connect_wait(poller)
> +                poller.block()
> +            assert error != errno.EINPROGRESS
> +
> +        if error and stream:
> +            stream.close()
> +            stream = None
> +        return error, stream
> +
> +    def close(self):
> +        if hasattr(self, "socket"):
> +            self.socket.close()
> +
> +    def __scs_connecting(self):
> +        if hasattr(self, "socket"):
> +            retval = ovs.socket_util.check_connection_completion(self.socket)
> +        elif self.retry_connect:
> +            saAttr = win32security.SECURITY_ATTRIBUTES()
> +            saAttr.bInheritHandle = 1
> +
> +            try:
> +                self.pipe = win32file.CreateFile(
> +                            self.name,
> +                            win32file.GENERIC_READ | win32file.GENERIC_WRITE,
> +                            0, None,
> +                            win32file.OPEN_EXISTING,
> +                            win32file.FILE_ATTRIBUTE_NORMAL |
> +                            win32file.FILE_FLAG_OVERLAPPED |
> +                            win32file.FILE_FLAG_NO_BUFFERING,
> +                            None)
> +            except pywintypes.error:
> +                retval = errno.EAGAIN
> +                self.retry_connect = True
> +
> +        assert retval != errno.EINPROGRESS
> +        if retval == 0:
> +            self.state = Stream.__S_CONNECTED
> +        elif retval != errno.EAGAIN:
> +            self.state = Stream.__S_DISCONNECTED
> +            self.error = retval
> +
> +    def connect(self):
> +        """Tries to complete the connection on this stream.  If the
> connection
> +        is complete, returns 0 if the connection was successful or a
> positive
> +        errno value if it failed.  If the connection is still in progress,
> +        returns errno.EAGAIN."""
> +        # raise
> +        if self.state == Stream.__S_CONNECTING:
> +            self.__scs_connecting()
> +
> +        if self.state == Stream.__S_CONNECTING:
> +            return errno.EAGAIN
> +        elif self.state == Stream.__S_CONNECTED:
> +            return 0
> +        else:
> +            assert self.state == Stream.__S_DISCONNECTED
> +            return self.error
> +
> +    def recv(self, n):
> +        """Tries to receive up to 'n' bytes from this stream.  Returns a
> +        (error, string) tuple:
> +
> +            - If successful, 'error' is zero and 'string' contains
> between 1
> +              and 'n' bytes of data.
> +
> +            - On error, 'error' is a positive errno value.
> +
> +            - If the connection has been closed in the normal fashion or
> if 'n'
> +              is 0, the tuple is (0, "").
> +
> +        The recv function will not block waiting for data to arrive.  If
> no
> +        data have been received, it returns (errno.EAGAIN, "")
> immediately."""
> +
> +        retval = self.connect()
> +        if retval != 0:
> +            return (retval, "")
> +        elif n == 0:
> +            return (0, "")
> +        if hasattr(self, "socket"):
> +            try:
> +                return (0, self.socket.recv(n))
> +            except socket.error as e:
> +                return (ovs.socket_util.get_exception_errno(e), "")
> +        else:
> +            if self.read_pending:
> +                try:
> +                    nBytesRead = win32file.GetOverlappedResult(self.pipe,
> +                                                        self.read,
> +                                                        False)
> +                    self.read_pending = False
> +                    recvBuffer = self.read_buffer[:nBytesRead]
> +                    if six.PY3:
> +                        return (0, bytes(recvBuffer).decode("utf-8"))
> +                    else:
> +                        return (0, str(recvBuffer))
> +                except pywintypes.error as e:
> +                    return (errno.EAGAIN, "")
> +
> +            try:
> +                (errCode, self.read_buffer) = win32file.ReadFile(self.pipe,
> +                                                             n,
> +                                                             self.read)
> +
> +                if errCode == winerror.ERROR_IO_PENDING:
> +                    self.read_pending = True
> +                    return (errno.EAGAIN, "")
> +                # elif errCode:
> +                    # return (errCode, "")
> +
> +                nBytesRead = win32file.GetOverlappedResult(self.pipe,
> +                                                        self.read,
> +                                                        False)
> +                win32event.SetEvent(self.read.hEvent)
> +                recvBuffer = self.read_buffer[:nBytesRead]
> +                if six.PY3:
> +                    return (0, bytes(recvBuffer).decode("utf-8"))
> +                else:
> +                    return (0, str(recvBuffer))
> +            except pywintypes.error as e:
> +                return (e.winerror, "")
> +
> +    def send(self, buf):
> +        """Tries to send 'buf' on this stream.
> +
> +        If successful, returns the number of bytes sent, between 1 and
> +        len(buf).  0 is only a valid return value if len(buf) is 0.
> +
> +        On error, returns a negative errno value.
> +
> +        Will not block.  If no bytes can be immediately accepted for
> +        transmission, returns -errno.EAGAIN immediately."""
> +
> +        retval = self.connect()
> +        if retval != 0:
> +            return -retval
> +        elif len(buf) == 0:
> +            return 0
> +
> +        if hasattr(self, "socket"):
> +            try:
> +                # Python 3 has separate types for strings and bytes.  We
> must
> +                # have bytes here.
> +                if six.PY3 and not isinstance(buf, six.binary_type):
> +                    buf = six.binary_type(buf, 'utf-8')
> +                return self.socket.send(buf)
> +            except socket.error as e:
> +                return -ovs.socket_util.get_exception_errno(e)
> +        else:
> +            if self.write_pending:
> +                try:
> +                    nBytesWritten = win32file.GetOverlappedResult(self.pipe,
> +                                                            self.write,
> +                                                            False)
> +                    self.write_pending = False
> +                    return nBytesWritten
> +                except pywintypes.error as e:
> +                    return -errno.EAGAIN
> +
> +            try:
> +                # Python 3 has separate types for strings and bytes.  We
> must
> +                # have bytes here.
> +                if not isinstance(buf, six.binary_type):
> +                    if six.PY3:
> +                        buf = six.binary_type(buf, 'utf-8')
> +                    else:
> +                        buf = six.binary_type(buf)
> +
> +                self.write_pending = False
> +                (errCode, nBytesWritten) = win32file.WriteFile(self.pipe,
> +                                                            buf,
> +                                                            self.write)
> +                if errCode == winerror.ERROR_IO_PENDING:
> +                    self.write_pending = True
> +                    return -errno.EAGAIN
> +                # elif errCode:
> +                    # return -errCode
> +
> +                nBytesWritten = win32file.GetOverlappedResult(self.pipe,
> +                                                            self.write,
> +                                                            False)
> +                win32event.SetEvent(self.write.hEvent)
> +
> +                return nBytesWritten
> +            except pywintypes.error as e:
> +                return -e.winerror
> +
> +    def run(self):
> +        pass
> +
> +    def run_wait(self, poller):
> +        pass
> +
> +    def wait(self, poller, wait):
> +        if hasattr(self, "socket"):
> +            import win32file
> +            import win32event
> +
> +            assert wait in (Stream.W_CONNECT, Stream.W_RECV, Stream.W_SEND)
> +
> +            if self.state == Stream.__S_DISCONNECTED:
> +                poller.immediate_wake()
> +                return
> +
> +            if self.state == Stream.__S_CONNECTING:
> +                wait = Stream.W_CONNECT
> +
> +            event = win32event.CreateEvent(None, True, True, None)
> +
> +            if wait == Stream.W_RECV:
> +                win32file.WSAEventSelect(self.socket, event,
> +                                        win32file.FD_READ |
> +                                        win32file.FD_ACCEPT |
> +                                        win32file.FD_CLOSE)
> +                poller.fd_wait(event, ovs.poller.POLLIN)
> +            else:
> +                win32file.WSAEventSelect(self.socket, event,
> +                                        win32file.FD_WRITE |
> +                                        win32file.FD_CONNECT |
> +                                        win32file.FD_CLOSE)
> +                poller.fd_wait(event, ovs.poller.POLLOUT)
> +        else:
> +            if wait == Stream.W_RECV:
> +                if self.read:
> +                    poller.fd_wait(self.read.hEvent, ovs.poller.POLLIN)
> +            else:
> +                if self.write:
> +                    poller.fd_wait(self.write.hEvent, ovs.poller.POLLOUT)
> +
> +    def connect_wait(self, poller):
> +        self.wait(poller, Stream.W_CONNECT)
> +
> +    def recv_wait(self, poller):
> +        self.wait(poller, Stream.W_RECV)
> +
> +    def send_wait(self, poller):
> +        poller.fd_wait(self.connect.hEvent, ovs.poller.POLLIN)
> +        self.wait(poller, Stream.W_SEND)
> +
> +    def __del__(self):
> +        # Don't delete the file: we might have forked.
> +        if hasattr(self, "socket"):
> +            self.socket.close()
> +        else:
> +            win32file.CloseHandle(self.pipe)
> +            self.pipe = None
> +
> +
> +class PassiveStream(object):
> +    connect = None                  # overlapped for read operation
> +    connect_pending = False
> +
> +    @staticmethod
> +    def is_valid_name(name):
> +        """Returns True if 'name' is a passive stream name in the form
> +        "TYPE:ARGS" and TYPE is a supported passive stream type (currently
> +        "punix:" or "ptcp"), otherwise False."""
> +        return name.startswith("punix:") | name.startswith("ptcp:")
> +
> +    def __init__(self, sock, name, bind_path):
> +        self.name = name
> +        if isinstance(sock, socket.socket):
> +            self.socket = sock
> +        else:
> +            self.pipe = sock
> +        self.bind_path = bind_path
> +
> +    @staticmethod
> +    def open(name):
> +        """Attempts to start listening for remote stream connections.
> 'name'
> +        is a connection name in the form "TYPE:ARGS", where TYPE is an
> passive
> +        stream class's name and ARGS are stream class-specific. Currently
> the
> +        supported values for TYPE are "punix" and "ptcp".
> +
> +        Returns (error, pstream): on success 'error' is 0 and 'pstream'
> is the
> +        new PassiveStream, on failure 'error' is a positive errno value
> and
> +        'pstream' is None."""
> +        # raise OSError
> +        suffix = name.split(":", 1)[1]
> +        if name.startswith("punix:"):
> +            suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
> +            try:
> +                open(suffix, 'w').close()
> +            except:
> +                return errno.EAFNOSUPPORT, None
> +
> +            pipename = suffix.replace('/', '')
> +            pipename = pipename.replace('\\', '')
> +            pipename = "\\\\.\\pipe\\" + pipename
> +
> +            saAttr = win32security.SECURITY_ATTRIBUTES()
> +            saAttr.bInheritHandle = 1
> +
> +            npipe = win32pipe.CreateNamedPipe(
> +                        pipename,
> +                        win32con.PIPE_ACCESS_DUPLEX |
> +                        win32con.FILE_FLAG_OVERLAPPED,
> +                        win32con.PIPE_TYPE_MESSAGE |
> +                        win32con.PIPE_READMODE_BYTE |
> +                        win32con.PIPE_WAIT,
> +                        64, 65000, 65000, 0, saAttr
> +                        )
> +            return 0, PassiveStream(npipe, pipename, suffix)
> +        else:
> +            return errno.EAFNOSUPPORT, None
> +
> +    def close(self):
> +        """Closes this PassiveStream."""
> +        if hasattr(self, "socket"):
> +            self.socket.close()
> +        else:
> +            win32pipe.DisconnectNamedPipe(self.pipe)
> +        if self.bind_path is not None:
> +            ovs.fatal_signal.unlink_file_now(self.bind_path)
> +            self.bind_path = None
> +
> +    def accept(self):
> +        """Tries to accept a new connection on this passive stream.
> Returns
> +        (error, stream): if successful, 'error' is 0 and 'stream' is the
> new
> +        Stream object, and on failure 'error' is a positive errno value
> and
> +        'stream' is None.
> +
> +        Will not block waiting for a connection.  If no connection is
> ready to
> +        be accepted, returns (errno.EAGAIN, None) immediately."""
> +
> +        if hasattr(self, "socket"):
> +            while True:
> +                try:
> +                    sock, addr = self.socket.accept()
> +                    ovs.socket_util.set_nonblocking(sock)
> +                    if (sys.platform != "win32"
> +                       and sock.family == socket.AF_UNIX):
> +                        return 0, Stream(sock, "unix:%s" % addr, 0)
> +                    return 0, Stream(sock, 'ptcp:%s:%s' % (addr[0],
> +                                                           str(addr[1])), 0)
> +                except socket.error as e:
> +                    error = ovs.socket_util.get_exception_errno(e)
> +                    if (sys.platform == "win32" and
> +                        error == errno.WSAEWOULDBLOCK):
> +                        error = errno.EAGAIN
> +                    if error != errno.EAGAIN:
> +                        # XXX rate-limit
> +                        vlog.dbg("accept: %s" % os.strerror(error))
> +                    return error, None
> +        else:
> +            if self.connect_pending:
> +                try:
> +                    win32file.GetOverlappedResult(self.pipe, self.connect)
> +                    self.connect_pending = False
> +                except pywintypes.error as e:
> +                    return (errno.EAGAIN, "")
> +                return 0, Stream(self.pipe, "", 0)
> +
> +            try:
> +                self.connect_pending = False
> +                self.connect = pywintypes.OVERLAPPED()
> +                self.connect.hEvent = win32event.CreateEvent(None, True,
> +                                                            True, None)
> +                error = win32pipe.ConnectNamedPipe(self.pipe, self.connect)
> +                if error == winerror.ERROR_IO_PENDING:
> +                    self.connect_pending = True
> +                    return errno.EAGAIN, None
> +
> +                stream = Stream(self.pipe, "", 0)
> +
> +                saAttr = win32security.SECURITY_ATTRIBUTES()
> +                saAttr.bInheritHandle = 1
> +                self.pipe = win32pipe.CreateNamedPipe(
> +                        self.name,
> +                        win32con.PIPE_ACCESS_DUPLEX |
> +                        win32con.FILE_FLAG_OVERLAPPED,
> +                        win32con.PIPE_TYPE_MESSAGE |
> +                        win32con.PIPE_READMODE_BYTE |
> +                        win32con.PIPE_WAIT,
> +                        64, 65000, 65000, 0, saAttr
> +                        )
> +
> +                return 0, stream
> +            except pywintypes.error as e:
> +                return errno.EAGAIN, None
> +
> +    def wait(self, poller):
> +        if hasattr(self, "socket"):
> +            poller.fd_wait(self.socket, ovs.poller.POLLIN)
> +        else:
> +            poller.fd_wait(self.connect.hEvent, ovs.poller.POLLIN)
> +
> +    def __del__(self):
> +        # Don't delete the file: we might have forked.
> +        if hasattr(self, "socket"):
> +            self.socket.close()
> +        else:
> +            win32file.CloseHandle(self.pipe)
> +            self.pipe = None
> +
> +
> +def usage(name):
> +    return """
> +Active %s connection methods:
> +  unix:FILE               Unix domain socket named FILE
> +  tcp:IP:PORT             TCP socket to IP with port no of PORT
> +
> +Passive %s connection methods:
> +  punix:FILE              Listen on Unix domain socket FILE""" % (name, name)
> +
> +
> +class UnixStream(Stream):
> +    @staticmethod
> +    def _open(suffix, dscp):
> +        connect_path = suffix
> +        return ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
> +                                                True, None, connect_path)
> +Stream.register_method("unix", UnixStream)
> +
> +
> +class TCPStream(Stream):
> +    @staticmethod
> +    def _open(suffix, dscp):
> +        error, sock = ovs.socket_util.inet_open_active(socket.SOCK_STREAM,
> +                                                       suffix, 0, dscp)
> +        if not error:
> +            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
> +        return error, sock
> +Stream.register_method("tcp", TCPStream)
> diff --git a/python/ovs/unixctl/client.py b/python/ovs/unixctl/client.py
> index fde674e..ede4855 100644
> --- a/python/ovs/unixctl/client.py
> +++ b/python/ovs/unixctl/client.py
> @@ -13,12 +13,16 @@
>  # limitations under the License.
>
>  import os
> +import sys
>
>  import six
>
>  import ovs.jsonrpc
> -import ovs.stream_unix as ovs_stream
>  import ovs.util
> +if sys.platform == "win32":
> +    import ovs.stream_windows as ovs_stream
> +else:
> +    import ovs.stream_unix as ovs_stream
>
>
>  vlog = ovs.vlog.Vlog("unixctl_client")
> diff --git a/python/ovs/unixctl/server.py b/python/ovs/unixctl/server.py
> index 50a11d4..d457a2c 100644
> --- a/python/ovs/unixctl/server.py
> +++ b/python/ovs/unixctl/server.py
> @@ -22,7 +22,10 @@ from six.moves import range
>
>  import ovs.dirs
>  import ovs.jsonrpc
> -import ovs.stream_unix as ovs_stream
> +if sys.platform == "win32":
> +    import ovs.stream_windows as ovs_stream
> +else:
> +    import ovs.stream_unix as ovs_stream
>  import ovs.unixctl
>  import ovs.util
>  import ovs.version
> @@ -148,6 +151,8 @@ class UnixctlServer(object):
>      def run(self):
>          for _ in range(10):
>              error, stream = self._listener.accept()
> +            if sys.platform == "win32" and error == errno.WSAEWOULDBLOCK:
> +                error = errno.EAGAIN
>              if not error:
>                  rpc = ovs.jsonrpc.Connection(stream)
>                  self._conns.append(UnixctlConnection(rpc))
> @@ -155,8 +160,8 @@ class UnixctlServer(object):
>                  break
>              else:
>                  # XXX: rate-limit
> -                vlog.warn("%s: accept failed: %s" % (self._listener.name,
> -                                                     os.strerror(error)))
> +                vlog.warn("%s: accept failed: %s %d"
> +                          % (self._listener.name, os.strerror(error), error))
>
>          for conn in copy.copy(self._conns):
>              error = conn.run()
> diff --git a/tests/test-jsonrpc.py b/tests/test-jsonrpc.py
> index 18634e6..8d9010d 100644
> --- a/tests/test-jsonrpc.py
> +++ b/tests/test-jsonrpc.py
> @@ -23,7 +23,10 @@ import ovs.daemon
>  import ovs.json
>  import ovs.jsonrpc
>  import ovs.poller
> -import ovs.stream
>
Doesn't the above need to be changed in the previous patch?
Since Alin implemented the named pipes, I will let him look at it first.



> +if sys.platform == "win32":
> +    import ovs.stream_windows as ovs_stream
> +else:
> +    import ovs.stream_unix as ovs_stream
>
>

>
>  def handle_rpc(rpc, msg):
> @@ -53,14 +56,14 @@ def handle_rpc(rpc, msg):
>
>
>  def do_listen(name):
> -    error, pstream = ovs.stream.PassiveStream.open(name)
> +    ovs.daemon.daemonize()
> +
> +    error, pstream = ovs_stream.PassiveStream.open(name)
>      if error:
>          sys.stderr.write("could not listen on \"%s\": %s\n"
>                           % (name, os.strerror(error)))
>          sys.exit(1)
>
> -    ovs.daemon.daemonize()
> -
>      rpcs = []
>      done = False
>      while True:
> @@ -111,7 +114,7 @@ def do_request(name, method, params_string):
>          sys.stderr.write("not a valid JSON-RPC request: %s\n" % s)
>          sys.exit(1)
>
> -    error, stream = ovs.stream.Stream.open_block(ovs.stream.Stream.open(name))
> +    error, stream = ovs_stream.Stream.open_block(ovs_stream.Stream.open(name))
>      if error:
>          sys.stderr.write("could not open \"%s\": %s\n"
>                           % (name, os.strerror(error)))
> @@ -142,7 +145,7 @@ def do_notify(name, method, params_string):
>          sys.stderr.write("not a valid JSON-RPC notification: %s\n" % s)
>          sys.exit(1)
>
> -    error, stream = ovs.stream.Stream.open_block(ovs.stream.Stream.open(name))
> +    error, stream = ovs_stream.Stream.open_block(ovs_stream.Stream.open(name))
>      if error:
>          sys.stderr.write("could not open \"%s\": %s\n"
>                           % (name, os.strerror(error)))
> @@ -174,7 +177,7 @@ def main(argv):
>  listen LOCAL             listen for connections on LOCAL
>  request REMOTE METHOD PARAMS   send request, print reply
>  notify REMOTE METHOD PARAMS  send notification and exit
> -""" + ovs.stream.usage("JSON-RPC")
> +""" + ovs_stream.usage("JSON-RPC")
>
>      group = parser.add_argument_group(title="Commands",
>                                        description=group_description)
> diff --git a/tests/test-ovsdb.py b/tests/test-ovsdb.py
> index e4e3395..9f6ef49 100644
> --- a/tests/test-ovsdb.py
> +++ b/tests/test-ovsdb.py
> @@ -30,6 +30,10 @@ import ovs.poller
>  import ovs.util
>  from ovs.fatal_signal import signal_alarm
>  import six
> +if sys.platform == "win32":
> +    import ovs.stream_windows as ovs_stream
> +else:
> +    import ovs.stream_unix as ovs_stream
>
>
>  def unbox_json(json):
> @@ -534,8 +538,8 @@ def do_idl(schema_file, remote, *commands):
>      idl = ovs.db.idl.Idl(remote, schema_helper)
>
>      if commands:
> -        error, stream = ovs.stream.Stream.open_block(
> -            ovs.stream.Stream.open(remote))
> +        error, stream = ovs_stream.Stream.open_block(
> +            ovs_stream.Stream.open(remote))
>          if error:
>              sys.stderr.write("failed to connect to \"%s\"" % remote)
>              sys.exit(1)
> --
> 2.7.2.windows.1
> _______________________________________________
> dev mailing list
> dev@openvswitch.org
> http://openvswitch.org/mailman/listinfo/dev
>
_______________________________________________
dev mailing list
dev@openvswitch.org
http://openvswitch.org/mailman/listinfo/dev

Reply via email to