On 26 August 2016 at 07:40, Paul Boca <pb...@cloudbasesolutions.com> wrote:
> AF_UNIX sockets are not supported on Windows. > Instead of an AF_UNIX socket use named pipes to communicate > between components. This makes the python sockets compatible with > the named pipe used in Windows applications. > Added stream_windows.py with named pipe and localhost > tcp connections support. > > Signed-off-by: Paul-Daniel Boca <pb...@cloudbasesolutions.com> > --- > python/automake.mk | 1 + > python/ovs/jsonrpc.py | 9 +- > python/ovs/poller.py | 49 +++- > python/ovs/socket_util.py | 20 +- > python/ovs/stream_windows.py | 611 ++++++++++++++++++++++++++++++ > +++++++++++++ > python/ovs/unixctl/client.py | 6 +- > python/ovs/unixctl/server.py | 11 +- > tests/test-jsonrpc.py | 17 +- > tests/test-ovsdb.py | 8 +- > 9 files changed, 703 insertions(+), 29 deletions(-) > create mode 100644 python/ovs/stream_windows.py > > diff --git a/python/automake.mk b/python/automake.mk > index 3fe9519..7bbf382 100644 > --- a/python/automake.mk > +++ b/python/automake.mk > @@ -27,6 +27,7 @@ ovs_pyfiles = \ > python/ovs/process.py \ > python/ovs/reconnect.py \ > python/ovs/socket_util.py \ > + python/ovs/stream_windows.py \ > python/ovs/stream_unix.py \ > python/ovs/timeval.py \ > python/ovs/unixctl/__init__.py \ > diff --git a/python/ovs/jsonrpc.py b/python/ovs/jsonrpc.py > index 8ca01a0..d70f13e 100644 > --- a/python/ovs/jsonrpc.py > +++ b/python/ovs/jsonrpc.py > @@ -14,13 +14,17 @@ > > import errno > import os > +import sys > > import six > > import ovs.json > import ovs.poller > import ovs.reconnect > -import ovs.stream_unix as ovs_stream > +if sys.platform == "win32": > + import ovs.stream_windows as ovs_stream > +else: > + import ovs.stream_unix as ovs_stream > import ovs.timeval > import ovs.util > import ovs.vlog > @@ -274,6 +278,9 @@ class Connection(object): > except UnicodeError: > error = errno.EILSEQ > if error: > + if (sys.platform == "win32" > + and error == errno.WSAEWOULDBLOCK): > + error = errno.EAGAIN > if error == errno.EAGAIN: > return error, None > 
else: > diff --git a/python/ovs/poller.py b/python/ovs/poller.py > index de6bf22..970decc 100644 > --- a/python/ovs/poller.py > +++ b/python/ovs/poller.py > @@ -18,6 +18,7 @@ import ovs.vlog > import select > import socket > import os > +import sys > > try: > import eventlet.patcher > @@ -54,7 +55,8 @@ class _SelectSelect(object): > def register(self, fd, events): > if isinstance(fd, socket.socket): > fd = fd.fileno() > - assert isinstance(fd, int) > + if not sys.platform == "win32": > + assert isinstance(fd, int) > if events & POLLIN: > self.rlist.append(fd) > events &= ~POLLIN > @@ -75,18 +77,39 @@ class _SelectSelect(object): > if timeout == 0 and _using_eventlet_green_select(): > timeout = 0.1 > > - rlist, wlist, xlist = select.select(self.rlist, self.wlist, > self.xlist, > - timeout) > - events_dict = {} > - for fd in rlist: > - events_dict[fd] = events_dict.get(fd, 0) | POLLIN > - for fd in wlist: > - events_dict[fd] = events_dict.get(fd, 0) | POLLOUT > - for fd in xlist: > - events_dict[fd] = events_dict.get(fd, 0) | (POLLERR | > - POLLHUP | > - POLLNVAL) > - return list(events_dict.items()) > + if sys.platform == "win32": > + import win32event > + import winerror > + > + if timeout is None: > + timeout = 0xFFFFFFFF > + else: > + timeout = int(timeout * 1000) > + > + events = self.rlist + self.wlist + self.xlist > + if not events: > + return list() > + error = win32event.WaitForMultipleObjectsEx(events, False, > + timeout, False) > + if error == winerror.WAIT_TIMEOUT: > + return list() > + > + return [(events[error], 0)] > + else: > + rlist, wlist, xlist = select.select(self.rlist, > + self.wlist, > + self.xlist, > + timeout) > + events_dict = {} > + for fd in rlist: > + events_dict[fd] = events_dict.get(fd, 0) | POLLIN > + for fd in wlist: > + events_dict[fd] = events_dict.get(fd, 0) | POLLOUT > + for fd in xlist: > + events_dict[fd] = events_dict.get(fd, 0) | (POLLERR | > + POLLHUP | > + POLLNVAL) > + return list(events_dict.items()) > > > SelectPoll = 
_SelectSelect > diff --git a/python/ovs/socket_util.py b/python/ovs/socket_util.py > index b358b05..54f448d 100644 > --- a/python/ovs/socket_util.py > +++ b/python/ovs/socket_util.py > @@ -17,6 +17,7 @@ import os > import os.path > import random > import socket > +import sys > > import six > from six.moves import range > @@ -25,6 +26,10 @@ import ovs.fatal_signal > import ovs.poller > import ovs.vlog > > +if sys.platform == "win32": > + import win32file > + import win32event > + > vlog = ovs.vlog.Vlog("socket_util") > > > @@ -158,7 +163,15 @@ def make_unix_socket(style, nonblock, bind_path, > connect_path, short=False): > > def check_connection_completion(sock): > p = ovs.poller.SelectPoll() > - p.register(sock, ovs.poller.POLLOUT) > + if sys.platform == "win32": > + event = win32event.CreateEvent(None, False, True, None) > + win32file.WSAEventSelect(sock, event, > + win32file.FD_WRITE | > + win32file.FD_CONNECT | > + win32file.FD_CLOSE) > + p.register(event, ovs.poller.POLLOUT) > + else: > + p.register(sock, ovs.poller.POLLOUT) > pfds = p.poll(0) > if len(pfds) == 1: > revents = pfds[0][1] > @@ -228,7 +241,10 @@ def inet_open_active(style, target, default_port, > dscp): > try: > sock.connect(address) > except socket.error as e: > - if get_exception_errno(e) != errno.EINPROGRESS: > + error = get_exception_errno(e) > + if sys.platform == "win32" and error == errno.WSAEWOULDBLOCK: > + error = errno.EINPROGRESS > + if error != errno.EINPROGRESS: > raise > return 0, sock > except socket.error as e: > diff --git a/python/ovs/stream_windows.py b/python/ovs/stream_windows.py > new file mode 100644 > index 0000000..dd8d4ba > --- /dev/null > +++ b/python/ovs/stream_windows.py > @@ -0,0 +1,611 @@ > +# Copyright (c) 2010, 2011, 2012 Nicira, Inc. > +# > +# Licensed under the Apache License, Version 2.0 (the "License"); > +# you may not use this file except in compliance with the License. 
> +# You may obtain a copy of the License at: > +# > +# http://www.apache.org/licenses/LICENSE-2.0 > +# > +# Unless required by applicable law or agreed to in writing, software > +# distributed under the License is distributed on an "AS IS" BASIS, > +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > +# See the License for the specific language governing permissions and > +# limitations under the License. > + > +import errno > +import os > +import socket > +import sys > +import six > + > +import ovs.poller > +import ovs.socket_util > +import ovs.vlog > + > +import pywintypes > +import winerror > +import win32pipe > +import win32con > +import win32security > +import win32file > +import win32event > + > +vlog = ovs.vlog.Vlog("stream") > + > + > +def stream_or_pstream_needs_probes(name): > + """ 1 if the stream or pstream specified by 'name' needs periodic > probes to > + verify connectivity. For [p]streams which need probes, it can take a > long > + time to notice the connection was dropped. Returns 0 if probes aren't > + needed, and -1 if 'name' is invalid""" > + > + if PassiveStream.is_valid_name(name) or Stream.is_valid_name(name): > + # Only unix and punix are supported currently. > + return 0 > + else: > + return -1 > + > + > +class Stream(object): > + """Bidirectional byte stream. Currently only Unix domain sockets > + are implemented.""" > + > + # States. > + __S_CONNECTING = 0 > + __S_CONNECTED = 1 > + __S_DISCONNECTED = 2 > + > + # Kinds of events that one might wait for. > + W_CONNECT = 0 # Connect complete (success or failure). > + W_RECV = 1 # Data received. > + W_SEND = 2 # Send buffer room available. 
> + > + _SOCKET_METHODS = {} > + > + write = None # overlapped for write operation > + read = None # overlapped for read operation > + write_pending = False > + read_pending = False > + retry_connect = False > + > + @staticmethod > + def register_method(method, cls): > + Stream._SOCKET_METHODS[method + ":"] = cls > + > + @staticmethod > + def _find_method(name): > + for method, cls in six.iteritems(Stream._SOCKET_METHODS): > + if name.startswith(method): > + return cls > + return None > + > + @staticmethod > + def is_valid_name(name): > + """Returns True if 'name' is a stream name in the form > "TYPE:ARGS" and > + TYPE is a supported stream type (currently only "unix:" and > "tcp:"), > + otherwise False.""" > + return bool(Stream._find_method(name)) > + > + def __init__(self, sock, name, status): > + if isinstance(sock, socket.socket): > + self.socket = sock > + else: > + self.pipe = sock > + self.read = pywintypes.OVERLAPPED() > + self.read.hEvent = win32event.CreateEvent(None, True, True, > None) > + self.write = pywintypes.OVERLAPPED() > + self.write.hEvent = win32event.CreateEvent(None, True, True, > None) > + > + self.name = name > + if status == errno.EAGAIN: > + self.state = Stream.__S_CONNECTING > + elif status == 0: > + self.state = Stream.__S_CONNECTED > + else: > + self.state = Stream.__S_DISCONNECTED > + > + self.error = 0 > + > + # Default value of dscp bits for connection between controller and > manager. > + # Value of IPTOS_PREC_INTERNETCONTROL = 0xc0 which is defined > + # in <netinet/ip.h> is used. > + IPTOS_PREC_INTERNETCONTROL = 0xc0 > + DSCP_DEFAULT = IPTOS_PREC_INTERNETCONTROL >> 2 > + > + @staticmethod > + def open(name, dscp=DSCP_DEFAULT): > + """Attempts to connect a stream to a remote peer. 'name' is a > + connection name in the form "TYPE:ARGS", where TYPE is an active > stream > + class's name and ARGS are stream class-specific. Currently the > only > + supported TYPEs are "unix" and "tcp". 
> + > + Returns (error, stream): on success 'error' is 0 and 'stream' is > the > + new Stream, on failure 'error' is a positive errno value and > 'stream' > + is None. > + > + Never returns errno.EAGAIN or errno.EINPROGRESS. Instead, > returns 0 > + and a new Stream. The connect() method can be used to check for > + successful connection completion.""" > + cls = Stream._find_method(name) > + if not cls: > + return errno.EAFNOSUPPORT, None > + > + suffix = name.split(":", 1)[1] > + if name.startswith("unix:"): > + suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix) > + suffix = suffix.replace('/', '') > + suffix = suffix.replace('\\', '') > + suffix = "\\\\.\\pipe\\" + suffix > + > + saAttr = win32security.SECURITY_ATTRIBUTES() > + saAttr.bInheritHandle = 1 > + try: > + npipe = win32file.CreateFile( > + suffix, > + win32file.GENERIC_READ | > win32file.GENERIC_WRITE, > + 0, None, > + win32file.OPEN_EXISTING, > + win32file.FILE_ATTRIBUTE_NORMAL | > + win32file.FILE_FLAG_OVERLAPPED | > + win32file.FILE_FLAG_NO_BUFFERING, > + None) > + except pywintypes.error as e: > + return e.winerror, None > + > + return 0, Stream(npipe, suffix, 0) > + else: > + error, sock = cls._open(suffix, dscp) > + if error: > + return error, None > + else: > + status = ovs.socket_util.check_ > connection_completion(sock) > + return 0, Stream(sock, name, status) > + > + @staticmethod > + def _open(suffix, dscp): > + raise NotImplementedError("This method must be overrided by > subclass") > + > + @staticmethod > + def open_block(error_stream): > + """Blocks until a Stream completes its connection attempt, either > + succeeding or failing. (error, stream) should be the tuple > returned by > + Stream.open(). Returns a tuple of the same form. 
> + > + Typical usage: > + error, stream = Stream.open_block(Stream.open( > "unix:/tmp/socket"))""" > + > + # Py3 doesn't support tuple parameter unpacking - PEP 3113 > + error, stream = error_stream > + if not error: > + while True: > + error = stream.connect() > + if sys.platform == "win32" and error == > errno.WSAEWOULDBLOCK: > + error = errno.EAGAIN > + if error != errno.EAGAIN: > + break > + stream.run() > + poller = ovs.poller.Poller() > + stream.run_wait(poller) > + stream.connect_wait(poller) > + poller.block() > + assert error != errno.EINPROGRESS > + > + if error and stream: > + stream.close() > + stream = None > + return error, stream > + > + def close(self): > + if hasattr(self, "socket"): > + self.socket.close() > + > + def __scs_connecting(self): > + if hasattr(self, "socket"): > + retval = ovs.socket_util.check_connection_completion(self. > socket) > + elif self.retry_connect: > + saAttr = win32security.SECURITY_ATTRIBUTES() > + saAttr.bInheritHandle = 1 > + > + try: > + self.pipe = win32file.CreateFile( > + self.name, > + win32file.GENERIC_READ | > win32file.GENERIC_WRITE, > + 0, None, > + win32file.OPEN_EXISTING, > + win32file.FILE_ATTRIBUTE_NORMAL | > + win32file.FILE_FLAG_OVERLAPPED | > + win32file.FILE_FLAG_NO_BUFFERING, > + None) > + except pywintypes.error: > + retval = errno.EAGAIN > + self.retry_connect = True > + > + assert retval != errno.EINPROGRESS > + if retval == 0: > + self.state = Stream.__S_CONNECTED > + elif retval != errno.EAGAIN: > + self.state = Stream.__S_DISCONNECTED > + self.error = retval > + > + def connect(self): > + """Tries to complete the connection on this stream. If the > connection > + is complete, returns 0 if the connection was successful or a > positive > + errno value if it failed. 
If the connection is still in progress, > + returns errno.EAGAIN.""" > + # raise > + if self.state == Stream.__S_CONNECTING: > + self.__scs_connecting() > + > + if self.state == Stream.__S_CONNECTING: > + return errno.EAGAIN > + elif self.state == Stream.__S_CONNECTED: > + return 0 > + else: > + assert self.state == Stream.__S_DISCONNECTED > + return self.error > + > + def recv(self, n): > + """Tries to receive up to 'n' bytes from this stream. Returns a > + (error, string) tuple: > + > + - If successful, 'error' is zero and 'string' contains > between 1 > + and 'n' bytes of data. > + > + - On error, 'error' is a positive errno value. > + > + - If the connection has been closed in the normal fashion or > if 'n' > + is 0, the tuple is (0, ""). > + > + The recv function will not block waiting for data to arrive. If > no > + data have been received, it returns (errno.EAGAIN, "") > immediately.""" > + > + retval = self.connect() > + if retval != 0: > + return (retval, "") > + elif n == 0: > + return (0, "") > + if hasattr(self, "socket"): > + try: > + return (0, self.socket.recv(n)) > + except socket.error as e: > + return (ovs.socket_util.get_exception_errno(e), "") > + else: > + if self.read_pending: > + try: > + nBytesRead = win32file.GetOverlappedResult(self.pipe, > + self.read, > + False) > + self.read_pending = False > + recvBuffer = self.read_buffer[:nBytesRead] > + if six.PY3: > + return (0, bytes(recvBuffer).decode("utf-8")) > + else: > + return (0, str(recvBuffer)) > + except pywintypes.error as e: > + return (errno.EAGAIN, "") > + > + try: > + (errCode, self.read_buffer) = > win32file.ReadFile(self.pipe, > + n, > + self.read) > + > + if errCode == winerror.ERROR_IO_PENDING: > + self.read_pending = True > + return (errno.EAGAIN, "") > + # elif errCode: > + # return (errCode, "") > + > + nBytesRead = win32file.GetOverlappedResult(self.pipe, > + self.read, > + False) > + win32event.SetEvent(self.read.hEvent) > + recvBuffer = self.read_buffer[:nBytesRead] > + if 
six.PY3: > + return (0, bytes(recvBuffer).decode("utf-8")) > + else: > + return (0, str(recvBuffer)) > + except pywintypes.error as e: > + return (e.winerror, "") > + > + def send(self, buf): > + """Tries to send 'buf' on this stream. > + > + If successful, returns the number of bytes sent, between 1 and > + len(buf). 0 is only a valid return value if len(buf) is 0. > + > + On error, returns a negative errno value. > + > + Will not block. If no bytes can be immediately accepted for > + transmission, returns -errno.EAGAIN immediately.""" > + > + retval = self.connect() > + if retval != 0: > + return -retval > + elif len(buf) == 0: > + return 0 > + > + if hasattr(self, "socket"): > + try: > + # Python 3 has separate types for strings and bytes. We > must > + # have bytes here. > + if six.PY3 and not isinstance(buf, six.binary_type): > + buf = six.binary_type(buf, 'utf-8') > + return self.socket.send(buf) > + except socket.error as e: > + return -ovs.socket_util.get_exception_errno(e) > + else: > + if self.write_pending: > + try: > + nBytesWritten = win32file.GetOverlappedResult( > self.pipe, > + self.write, > + False) > + self.write_pending = False > + return nBytesWritten > + except pywintypes.error as e: > + return -errno.EAGAIN > + > + try: > + # Python 3 has separate types for strings and bytes. We > must > + # have bytes here. 
> + if not isinstance(buf, six.binary_type): > + if six.PY3: > + buf = six.binary_type(buf, 'utf-8') > + else: > + buf = six.binary_type(buf) > + > + self.write_pending = False > + (errCode, nBytesWritten) = win32file.WriteFile(self.pipe, > + buf, > + self.write) > + if errCode == winerror.ERROR_IO_PENDING: > + self.write_pending = True > + return -errno.EAGAIN > + # elif errCode: > + # return -errCode > + > + nBytesWritten = win32file.GetOverlappedResult(self.pipe, > + self.write, > + False) > + win32event.SetEvent(self.write.hEvent) > + > + return nBytesWritten > + except pywintypes.error as e: > + return -e.winerror > + > + def run(self): > + pass > + > + def run_wait(self, poller): > + pass > + > + def wait(self, poller, wait): > + if hasattr(self, "socket"): > + import win32file > + import win32event > + > + assert wait in (Stream.W_CONNECT, Stream.W_RECV, > Stream.W_SEND) > + > + if self.state == Stream.__S_DISCONNECTED: > + poller.immediate_wake() > + return > + > + if self.state == Stream.__S_CONNECTING: > + wait = Stream.W_CONNECT > + > + event = win32event.CreateEvent(None, True, True, None) > + > + if wait == Stream.W_RECV: > + win32file.WSAEventSelect(self.socket, event, > + win32file.FD_READ | > + win32file.FD_ACCEPT | > + win32file.FD_CLOSE) > + poller.fd_wait(event, ovs.poller.POLLIN) > + else: > + win32file.WSAEventSelect(self.socket, event, > + win32file.FD_WRITE | > + win32file.FD_CONNECT | > + win32file.FD_CLOSE) > + poller.fd_wait(event, ovs.poller.POLLOUT) > + else: > + if wait == Stream.W_RECV: > + if self.read: > + poller.fd_wait(self.read.hEvent, ovs.poller.POLLIN) > + else: > + if self.write: > + poller.fd_wait(self.write.hEvent, ovs.poller.POLLOUT) > + > + def connect_wait(self, poller): > + self.wait(poller, Stream.W_CONNECT) > + > + def recv_wait(self, poller): > + self.wait(poller, Stream.W_RECV) > + > + def send_wait(self, poller): > + poller.fd_wait(self.connect.hEvent, ovs.poller.POLLIN) > + self.wait(poller, Stream.W_SEND) > + > + 
def __del__(self): > + # Don't delete the file: we might have forked. > + if hasattr(self, "socket"): > + self.socket.close() > + else: > + win32file.CloseHandle(self.pipe) > + self.pipe = None > + > + > +class PassiveStream(object): > + connect = None # overlapped for read operation > + connect_pending = False > + > + @staticmethod > + def is_valid_name(name): > + """Returns True if 'name' is a passive stream name in the form > + "TYPE:ARGS" and TYPE is a supported passive stream type (currently > + "punix:" or "ptcp"), otherwise False.""" > + return name.startswith("punix:") | name.startswith("ptcp:") > + > + def __init__(self, sock, name, bind_path): > + self.name = name > + if isinstance(sock, socket.socket): > + self.socket = sock > + else: > + self.pipe = sock > + self.bind_path = bind_path > + > + @staticmethod > + def open(name): > + """Attempts to start listening for remote stream connections. > 'name' > + is a connection name in the form "TYPE:ARGS", where TYPE is an > passive > + stream class's name and ARGS are stream class-specific. Currently > the > + supported values for TYPE are "punix" and "ptcp". 
> + > + Returns (error, pstream): on success 'error' is 0 and 'pstream' > is the > + new PassiveStream, on failure 'error' is a positive errno value > and > + 'pstream' is None.""" > + # raise OSError > + suffix = name.split(":", 1)[1] > + if name.startswith("punix:"): > + suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix) > + try: > + open(suffix, 'w').close() > + except: > + return errno.EAFNOSUPPORT, None > + > + pipename = suffix.replace('/', '') > + pipename = pipename.replace('\\', '') > + pipename = "\\\\.\\pipe\\" + pipename > + > + saAttr = win32security.SECURITY_ATTRIBUTES() > + saAttr.bInheritHandle = 1 > + > + npipe = win32pipe.CreateNamedPipe( > + pipename, > + win32con.PIPE_ACCESS_DUPLEX | > + win32con.FILE_FLAG_OVERLAPPED, > + win32con.PIPE_TYPE_MESSAGE | > + win32con.PIPE_READMODE_BYTE | > + win32con.PIPE_WAIT, > + 64, 65000, 65000, 0, saAttr > + ) > + return 0, PassiveStream(npipe, pipename, suffix) > + else: > + return errno.EAFNOSUPPORT, None > + > + def close(self): > + """Closes this PassiveStream.""" > + if hasattr(self, "socket"): > + self.socket.close() > + else: > + win32pipe.DisconnectNamedPipe(self.pipe) > + if self.bind_path is not None: > + ovs.fatal_signal.unlink_file_now(self.bind_path) > + self.bind_path = None > + > + def accept(self): > + """Tries to accept a new connection on this passive stream. > Returns > + (error, stream): if successful, 'error' is 0 and 'stream' is the > new > + Stream object, and on failure 'error' is a positive errno value > and > + 'stream' is None. > + > + Will not block waiting for a connection. 
If no connection is > ready to > + be accepted, returns (errno.EAGAIN, None) immediately.""" > + > + if hasattr(self, "socket"): > + while True: > + try: > + sock, addr = self.socket.accept() > + ovs.socket_util.set_nonblocking(sock) > + if (sys.platform != "win32" > + and sock.family == socket.AF_UNIX): > + return 0, Stream(sock, "unix:%s" % addr, 0) > + return 0, Stream(sock, 'ptcp:%s:%s' % (addr[0], > + str(addr[1])), > 0) > + except socket.error as e: > + error = ovs.socket_util.get_exception_errno(e) > + if (sys.platform == "win32" and > + error == errno.WSAEWOULDBLOCK): > + error = errno.EAGAIN > + if error != errno.EAGAIN: > + # XXX rate-limit > + vlog.dbg("accept: %s" % os.strerror(error)) > + return error, None > + else: > + if self.connect_pending: > + try: > + win32file.GetOverlappedResult(self.pipe, > self.connect) > + self.connect_pending = False > + except pywintypes.error as e: > + return (errno.EAGAIN, "") > + return 0, Stream(self.pipe, "", 0) > + > + try: > + self.connect_pending = False > + self.connect = pywintypes.OVERLAPPED() > + self.connect.hEvent = win32event.CreateEvent(None, True, > + True, None) > + error = win32pipe.ConnectNamedPipe(self.pipe, > self.connect) > + if error == winerror.ERROR_IO_PENDING: > + self.connect_pending = True > + return errno.EAGAIN, None > + > + stream = Stream(self.pipe, "", 0) > + > + saAttr = win32security.SECURITY_ATTRIBUTES() > + saAttr.bInheritHandle = 1 > + self.pipe = win32pipe.CreateNamedPipe( > + self.name, > + win32con.PIPE_ACCESS_DUPLEX | > + win32con.FILE_FLAG_OVERLAPPED, > + win32con.PIPE_TYPE_MESSAGE | > + win32con.PIPE_READMODE_BYTE | > + win32con.PIPE_WAIT, > + 64, 65000, 65000, 0, saAttr > + ) > + > + return 0, stream > + except pywintypes.error as e: > + return errno.EAGAIN, None > + > + def wait(self, poller): > + if hasattr(self, "socket"): > + poller.fd_wait(self.socket, ovs.poller.POLLIN) > + else: > + poller.fd_wait(self.connect.hEvent, ovs.poller.POLLIN) > + > + def __del__(self): > + # 
Don't delete the file: we might have forked. > + if hasattr(self, "socket"): > + self.socket.close() > + else: > + win32file.CloseHandle(self.pipe) > + self.pipe = None > + > + > +def usage(name): > + return """ > +Active %s connection methods: > + unix:FILE Unix domain socket named FILE > + tcp:IP:PORT TCP socket to IP with port no of PORT > + > +Passive %s connection methods: > + punix:FILE Listen on Unix domain socket FILE""" % (name, > name) > + > + > +class UnixStream(Stream): > + @staticmethod > + def _open(suffix, dscp): > + connect_path = suffix > + return ovs.socket_util.make_unix_socket(socket.SOCK_STREAM, > + True, None, connect_path) > +Stream.register_method("unix", UnixStream) > + > + > +class TCPStream(Stream): > + @staticmethod > + def _open(suffix, dscp): > + error, sock = ovs.socket_util.inet_open_ > active(socket.SOCK_STREAM, > + suffix, 0, dscp) > + if not error: > + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) > + return error, sock > +Stream.register_method("tcp", TCPStream) > diff --git a/python/ovs/unixctl/client.py b/python/ovs/unixctl/client.py > index fde674e..ede4855 100644 > --- a/python/ovs/unixctl/client.py > +++ b/python/ovs/unixctl/client.py > @@ -13,12 +13,16 @@ > # limitations under the License. 
> > import os > +import sys > > import six > > import ovs.jsonrpc > -import ovs.stream_unix as ovs_stream > import ovs.util > +if sys.platform == "win32": > + import ovs.stream_windows as ovs_stream > +else: > + import ovs.stream_unix as ovs_stream > > > vlog = ovs.vlog.Vlog("unixctl_client") > diff --git a/python/ovs/unixctl/server.py b/python/ovs/unixctl/server.py > index 50a11d4..d457a2c 100644 > --- a/python/ovs/unixctl/server.py > +++ b/python/ovs/unixctl/server.py > @@ -22,7 +22,10 @@ from six.moves import range > > import ovs.dirs > import ovs.jsonrpc > -import ovs.stream_unix as ovs_stream > +if sys.platform == "win32": > + import ovs.stream_windows as ovs_stream > +else: > + import ovs.stream_unix as ovs_stream > import ovs.unixctl > import ovs.util > import ovs.version > @@ -148,6 +151,8 @@ class UnixctlServer(object): > def run(self): > for _ in range(10): > error, stream = self._listener.accept() > + if sys.platform == "win32" and error == errno.WSAEWOULDBLOCK: > + error = errno.EAGAIN > if not error: > rpc = ovs.jsonrpc.Connection(stream) > self._conns.append(UnixctlConnection(rpc)) > @@ -155,8 +160,8 @@ class UnixctlServer(object): > break > else: > # XXX: rate-limit > - vlog.warn("%s: accept failed: %s" % (self._listener.name, > - os.strerror(error))) > + vlog.warn("%s: accept failed: %s %d" > + % (self._listener.name, os.strerror(error), > error)) > > for conn in copy.copy(self._conns): > error = conn.run() > diff --git a/tests/test-jsonrpc.py b/tests/test-jsonrpc.py > index 18634e6..8d9010d 100644 > --- a/tests/test-jsonrpc.py > +++ b/tests/test-jsonrpc.py > @@ -23,7 +23,10 @@ import ovs.daemon > import ovs.json > import ovs.jsonrpc > import ovs.poller > -import ovs.stream >

Doesn't the above need to be changed in the previous patch? Since Alin implemented the named pipes, I will let him look at it first.

> +if sys.platform == "win32": > + import ovs.stream_windows as ovs_stream > +else: > + import ovs.stream_unix as ovs_stream > > > > def handle_rpc(rpc, msg): > @@ -53,14 +56,14 @@ def handle_rpc(rpc, msg): > > > def do_listen(name): > - error, pstream = ovs.stream.PassiveStream.open(name) > + ovs.daemon.daemonize() > + > + error, pstream = ovs_stream.PassiveStream.open(name) > if error: > sys.stderr.write("could not listen on \"%s\": %s\n" > % (name, os.strerror(error))) > sys.exit(1) > > - ovs.daemon.daemonize() > - > rpcs = [] > done = False > while True: > @@ -111,7 +114,7 @@ def do_request(name, method, params_string): > sys.stderr.write("not a valid JSON-RPC request: %s\n" % s) > sys.exit(1) > > - error, stream = ovs.stream.Stream.open_block( > ovs.stream.Stream.open(name)) > + error, stream = ovs_stream.Stream.open_block( > ovs_stream.Stream.open(name)) > if error: > sys.stderr.write("could not open \"%s\": %s\n" > % (name, os.strerror(error))) > @@ -142,7 +145,7 @@ def do_notify(name, method, params_string): > sys.stderr.write("not a valid JSON-RPC notification: %s\n" % s) > sys.exit(1) > > - error, stream = ovs.stream.Stream.open_block( > ovs.stream.Stream.open(name)) > + error, stream = ovs_stream.Stream.open_block( > ovs_stream.Stream.open(name)) > if error: > sys.stderr.write("could not open \"%s\": %s\n" > % (name, os.strerror(error))) > @@ -174,7 +177,7 @@ def main(argv): > listen LOCAL listen for connections on LOCAL > request REMOTE METHOD PARAMS send request, print reply > notify REMOTE METHOD PARAMS send notification and exit > -""" + ovs.stream.usage("JSON-RPC") > +""" + ovs_stream.usage("JSON-RPC") > > group = parser.add_argument_group(title="Commands", > description=group_description) > diff --git a/tests/test-ovsdb.py b/tests/test-ovsdb.py > index e4e3395..9f6ef49 100644 > --- a/tests/test-ovsdb.py > +++ b/tests/test-ovsdb.py > @@ -30,6 +30,10 @@ import ovs.poller > import ovs.util > from ovs.fatal_signal import signal_alarm > import six > +if 
sys.platform == "win32": > + import ovs.stream_windows as ovs_stream > +else: > + import ovs.stream_unix as ovs_stream > > > def unbox_json(json): > @@ -534,8 +538,8 @@ def do_idl(schema_file, remote, *commands): > idl = ovs.db.idl.Idl(remote, schema_helper) > > if commands: > - error, stream = ovs.stream.Stream.open_block( > - ovs.stream.Stream.open(remote)) > + error, stream = ovs_stream.Stream.open_block( > + ovs_stream.Stream.open(remote)) > if error: > sys.stderr.write("failed to connect to \"%s\"" % remote) > sys.exit(1) > -- > 2.7.2.windows.1 > _______________________________________________ > dev mailing list > dev@openvswitch.org > http://openvswitch.org/mailman/listinfo/dev > _______________________________________________ dev mailing list dev@openvswitch.org http://openvswitch.org/mailman/listinfo/dev