Thanks for the review, I will send the patches soon. > From: Guru Shetty [mailto:g...@ovn.org] > Sent: Tuesday, January 3, 2017 9:35 PM > To: Alin Balutoiu <abalut...@cloudbasesolutions.com> > Cc: d...@openvswitch.org > Subject: Re: [ovs-dev] [PATCH V3 2/5] Python tests: Ported UNIX sockets to > Windows > > > > On 3 January 2017 at 11:16, Alin Balutoiu > <mailto:abalut...@cloudbasesolutions.com> wrote: > Ok, I'll include that in the next version. > Mind if I respin the series after you take a look over the rest of the > patches? > > I am done looking at the rest of the patches. When you respin, please fix the > spelling of "intented" in the first patch of the series. Also, in the first > patch, edit (and add) AUTHORS file to include yourself. > > Thanks. > > > > From: Guru Shetty [mailto:mailto:g...@ovn.org] > > Sent: Tuesday, January 3, 2017 9:09 PM > > To: Alin Balutoiu <mailto:abalut...@cloudbasesolutions.com> > > Cc: mailto:d...@openvswitch.org > > Subject: Re: [ovs-dev] [PATCH V3 2/5] Python tests: Ported UNIX sockets to > > Windows > > > > > > > > On 3 January 2017 at 11:03, Alin Balutoiu > > <mailto:mailto:abalut...@cloudbasesolutions.com> wrote: > > Thanks for the comment. > > The socket attribute of the class Stream cannot be None if the code runs on > > Unix. > > Therefore the condition "self.socket is not None" is True only if sockets > > are not used, > > and named pipes are used instead (i.e. it runs on Windows). > > > > However, if you prefer I can replace it with the following code: > > I like it better with the below piece. So please go ahead and send it as > > part of the next version. > > > > if self.socket is not None: > > retval = ovs.socket_util.check_connection_completion(self.socket) > > assert retval != errno.EINPROGRESS > > elif sys.platform == 'win32': > > if self.retry_connect: > > try: > > self.pipe = winutils.create_file(self._pipename) > > self._retry_connect = False > > retval = 0 > > except pywintypes.error as e: > > if e.winerror == winutils.winerror.ERROR_PIPE_BUSY: > > retval = errno.EAGAIN > > else: > > self._retry_connect = False > > retval = errno.ENOENT > > else: > > # If retry_connect is false, it means it's already > > # connected so we can set the value of retval to 0 > > retval = 0 > > > > > > > From: Guru Shetty [mailto:mailto:mailto:mailto:g...@ovn.org] > > > Sent: Tuesday, January 3, 2017 8:17 PM > > > To: Alin Balutoiu <mailto:mailto:abalut...@cloudbasesolutions.com> > > > Cc: mailto:mailto:d...@openvswitch.org > > > Subject: Re: [ovs-dev] [PATCH V3 2/5] Python tests: Ported UNIX sockets > > > to Windows > > > > > > > > > > > > On 3 January 2017 at 08:46, Alin Balutoiu > > > <mailto:mailto:mailto:mailto:abalut...@cloudbasesolutions.com> wrote: > > > From: Alin Balutoiu > > > <mailto:mailto:mailto:mailto:abalut...@cloudbasesolutions.com> > > > > > > Unix sockets (AF_UNIX) are not supported on Windows. > > > The replacement of Unix sockets on Windows is implemented > > > using named pipes, we are trying to mimic the behaviour > > > of unix sockets. > > > > > > Instead of using Unix sockets to communicate > > > between components Named Pipes are used. This > > > makes the python sockets compatible with the > > > Named Pipe used in Windows applications. > > > > > > Signed-off-by: Paul-Daniel Boca > > > <mailto:mailto:mailto:mailto:pb...@cloudbasesolutions.com> > > > Signed-off-by: Alin Balutoiu > > > <mailto:mailto:mailto:mailto:abalut...@cloudbasesolutions.com> > > > Acked-by: Alin Gabriel Serdean <aserdean@cloudbasesolutions> > > > Tested-by: Alin Gabriel Serdean <aserdean@cloudbasesolutions> > > > --- > > > V2: No changes. > > > V3: Changed Signed-off-by name and added previous Acked-by's, Tested-by's. > > > > > > I intend to add the following diff: > > > > > > diff --git a/python/ovs/stream.py b/python/ovs/stream.py > > > index e8e5700..af35afd 100644 > > > --- a/python/ovs/stream.py > > > +++ b/python/ovs/stream.py > > > @@ -251,7 +251,7 @@ class Stream(object): > > > else: > > > self._retry_connect = False > > > retval = errno.ENOENT > > > - else: > > > + elif sys.platform == 'win32': > > > # Windows only, if retry_connect is false, it means it's > > > already > > > # connected so we can set the value of retval to 0 > > > retval = 0 > > > > > > > > > --- > > > python/ovs/jsonrpc.py | 6 + > > > python/ovs/poller.py | 72 ++++++--- > > > python/ovs/socket_util.py | 31 +++- > > > python/ovs/stream.py | 351 > > > ++++++++++++++++++++++++++++++++++++++++--- > > > python/ovs/unixctl/server.py | 4 + > > > tests/test-jsonrpc.py | 16 +- > > > 6 files changed, 436 insertions(+), 44 deletions(-) > > > > > > diff --git a/python/ovs/jsonrpc.py b/python/ovs/jsonrpc.py > > > index 6300c67..5a11500 100644 > > > --- a/python/ovs/jsonrpc.py > > > +++ b/python/ovs/jsonrpc.py > > > @@ -14,6 +14,7 @@ > > > > > > import errno > > > import os > > > +import sys > > > > > > import six > > > > > > @@ -274,6 +275,11 @@ class Connection(object): > > > except UnicodeError: > > > error = errno.EILSEQ > > > if error: > > > + if (sys.platform == "win32" and > > > + error == errno.WSAEWOULDBLOCK): > > > + # WSAEWOULDBLOCK would be the equivalent on > > > Windows > > > + # for EAGAIN on Unix. > > > + error = errno.EAGAIN > > > if error == errno.EAGAIN: > > > return error, None > > > else: > > > diff --git a/python/ovs/poller.py b/python/ovs/poller.py > > > index d7cb7d3..d836483 100644 > > > --- a/python/ovs/poller.py > > > +++ b/python/ovs/poller.py > > > @@ -18,6 +18,10 @@ import ovs.vlog > > > import select > > > import socket > > > import os > > > +import sys > > > + > > > +if sys.platform == "win32": > > > + import ovs.winutils as winutils > > > > > > try: > > > from OpenSSL import SSL > > > @@ -62,7 +66,9 @@ class _SelectSelect(object): > > > if SSL and isinstance(fd, SSL.Connection): > > > fd = fd.fileno() > > > > > > - assert isinstance(fd, int) > > > + if sys.platform != 'win32': > > > + # Skip this on Windows, it also register events > > > + assert isinstance(fd, int) > > > if events & POLLIN: > > > self.rlist.append(fd) > > > events &= ~POLLIN > > > @@ -73,28 +79,58 @@ class _SelectSelect(object): > > > self.xlist.append(fd) > > > > > > def poll(self, timeout): > > > - if timeout == -1: > > > - # epoll uses -1 for infinite timeout, select uses None. > > > - timeout = None > > > - else: > > > - timeout = float(timeout) / 1000 > > > # XXX workaround a bug in eventlet > > > # see https://github.com/eventlet/eventlet/pull/25 > > > if timeout == 0 and _using_eventlet_green_select(): > > > timeout = 0.1 > > > + if sys.platform == 'win32': > > > + events = self.rlist + self.wlist + self.xlist > > > + if not events: > > > + return [] > > > + if len(events) > winutils.win32event.MAXIMUM_WAIT_OBJECTS: > > > + raise WindowsError("Cannot handle more than maximum wait" > > > + "objects\n") > > > + > > > + # win32event.INFINITE timeout is -1 > > > + # timeout must be an int number, expressed in ms > > > + if timeout == 0.1: > > > + timeout = 100 > > > + else: > > > + timeout = int(timeout) > > > + > > > + # Wait until any of the events is set to signaled > > > + try: > > > + retval = winutils.win32event.WaitForMultipleObjects( > > > + events, > > > + False, # Wait all > > > + timeout) > > > + except winutils.pywintypes.error: > > > + return [(0, POLLERR)] > > > > > > - rlist, wlist, xlist = select.select(self.rlist, self.wlist, > > > self.xlist, > > > - timeout) > > > - events_dict = {} > > > - for fd in rlist: > > > - events_dict[fd] = events_dict.get(fd, 0) | POLLIN > > > - for fd in wlist: > > > - events_dict[fd] = events_dict.get(fd, 0) | POLLOUT > > > - for fd in xlist: > > > - events_dict[fd] = events_dict.get(fd, 0) | (POLLERR | > > > - POLLHUP | > > > - POLLNVAL) > > > - return list(events_dict.items()) > > > + if retval == winutils.winerror.WAIT_TIMEOUT: > > > + return [] > > > + > > > + return [(events[retval], 0)] > > > + else: > > > + if timeout == -1: > > > + # epoll uses -1 for infinite timeout, select uses None. > > > + timeout = None > > > + else: > > > + timeout = float(timeout) / 1000 > > > + rlist, wlist, xlist = select.select(self.rlist, > > > + self.wlist, > > > + self.xlist, > > > + timeout) > > > + events_dict = {} > > > + for fd in rlist: > > > + events_dict[fd] = events_dict.get(fd, 0) | POLLIN > > > + for fd in wlist: > > > + events_dict[fd] = events_dict.get(fd, 0) | POLLOUT > > > + for fd in xlist: > > > + events_dict[fd] = events_dict.get(fd, 0) | (POLLERR | > > > + POLLHUP | > > > + POLLNVAL) > > > + return list(events_dict.items()) > > > > > > > > > SelectPoll = _SelectSelect > > > diff --git a/python/ovs/socket_util.py b/python/ovs/socket_util.py > > > index b358b05..fb6cee4 100644 > > > --- a/python/ovs/socket_util.py > > > +++ b/python/ovs/socket_util.py > > > @@ -17,6 +17,7 @@ import os > > > import os.path > > > import random > > > import socket > > > +import sys > > > > > > import six > > > from six.moves import range > > > @@ -25,6 +26,10 @@ import ovs.fatal_signal > > > import ovs.poller > > > import ovs.vlog > > > > > > +if sys.platform == 'win32': > > > + import ovs.winutils as winutils > > > + import win32file > > > + > > > vlog = ovs.vlog.Vlog("socket_util") > > > > > > > > > @@ -158,7 +163,17 @@ def make_unix_socket(style, nonblock, bind_path, > > > connect_path, short=False): > > > > > > def check_connection_completion(sock): > > > p = ovs.poller.SelectPoll() > > > - p.register(sock, ovs.poller.POLLOUT) > > > + if sys.platform == "win32": > > > + event = winutils.get_new_event(None, False, True, None) > > > + # Receive notification of readiness for writing, of completed > > > + # connection or multipoint join operation, and of socket closure. > > > + win32file.WSAEventSelect(sock, event, > > > + win32file.FD_WRITE | > > > + win32file.FD_CONNECT | > > > + win32file.FD_CLOSE) > > > + p.register(event, ovs.poller.POLLOUT) > > > + else: > > > + p.register(sock, ovs.poller.POLLOUT) > > > pfds = p.poll(0) > > > if len(pfds) == 1: > > > revents = pfds[0][1] > > > @@ -228,7 +243,12 @@ def inet_open_active(style, target, default_port, > > > dscp): > > > try: > > > sock.connect(address) > > > except socket.error as e: > > > - if get_exception_errno(e) != errno.EINPROGRESS: > > > + error = get_exception_errno(e) > > > + if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK: > > > + # WSAEWOULDBLOCK would be the equivalent on Windows > > > + # for EINPROGRESS on Unix. > > > + error = errno.EINPROGRESS > > > + if error != errno.EINPROGRESS: > > > raise > > > return 0, sock > > > except socket.error as e: > > > @@ -257,9 +277,12 @@ def get_null_fd(): > > > global null_fd > > > if null_fd < 0: > > > try: > > > - null_fd = os.open("/dev/null", os.O_RDWR) > > > + # os.devnull ensures compatibility with Windows, returns > > > + # '/dev/null' for Unix and 'nul' for Windows > > > + null_fd = os.open(os.devnull, os.O_RDWR) > > > except OSError as e: > > > - vlog.err("could not open /dev/null: %s" % > > > os.strerror(e.errno)) > > > + vlog.err("could not open %s: %s" % (os.devnull, > > > + os.strerror(e.errno))) > > > return -e.errno > > > return null_fd > > > > > > diff --git a/python/ovs/stream.py b/python/ovs/stream.py > > > index cd57eb3..e8e5700 100644 > > > --- a/python/ovs/stream.py > > > +++ b/python/ovs/stream.py > > > @@ -15,6 +15,7 @@ > > > import errno > > > import os > > > import socket > > > +import sys > > > > > > import six > > > > > > @@ -27,6 +28,13 @@ try: > > > except ImportError: > > > SSL = None > > > > > > +if sys.platform == 'win32': > > > + import ovs.winutils as winutils > > > + import pywintypes > > > + import win32event > > > + import win32file > > > + import win32pipe > > > + > > > vlog = ovs.vlog.Vlog("stream") > > > > > > > > > @@ -63,6 +71,13 @@ class Stream(object): > > > _SSL_certificate_file = None > > > _SSL_ca_cert_file = None > > > > > > + # Windows only > > > + _write = None # overlapped for write operation > > > + _read = None # overlapped for read operation > > > + _write_pending = False > > > + _read_pending = False > > > + _retry_connect = False > > > + > > > @staticmethod > > > def register_method(method, cls): > > > Stream._SOCKET_METHODS[method + ":"] = cls > > > @@ -81,8 +96,23 @@ class Stream(object): > > > otherwise False.""" > > > return bool(Stream._find_method(name)) > > > > > > - def __init__(self, socket, name, status): > > > + def __init__(self, socket, name, status, pipe=None, is_server=False): > > > self.socket = socket > > > + self.pipe = pipe > > > + if sys.platform == 'win32': > > > + self._read = pywintypes.OVERLAPPED() > > > + self._read.hEvent = winutils.get_new_event() > > > + self._write = pywintypes.OVERLAPPED() > > > + self._write.hEvent = winutils.get_new_event() > > > + if pipe is not None: > > > + # Flag to check if fd is a server HANDLE. In the case > > > of a > > > + # server handle we have to issue a disconnect before > > > closing > > > + # the actual handle. > > > + self._server = is_server > > > + suffix = name.split(":", 1)[1] > > > + suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix) > > > + self._pipename = winutils.get_pipe_name(suffix) > > > + > > > http://self.name = name > > > if status == errno.EAGAIN: > > > self.state = Stream.__S_CONNECTING > > > @@ -120,6 +150,38 @@ class Stream(object): > > > suffix = name.split(":", 1)[1] > > > if name.startswith("unix:"): > > > suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix) > > > + if sys.platform == 'win32': > > > + pipename = winutils.get_pipe_name(suffix) > > > + > > > + if len(suffix) > 255: > > > + # Return invalid argument if the name is too long > > > + return errno.ENOENT, None > > > + > > > + try: > > > + # In case of "unix:" argument, the assumption is that > > > + # there is a file created in the path (suffix). > > > + open(suffix, 'r').close() > > > + except: > > > + return errno.ENOENT, None > > > + > > > + try: > > > + npipe = winutils.create_file(pipename) > > > + try: > > > + winutils.set_pipe_mode(npipe, > > > + > > > win32pipe.PIPE_READMODE_BYTE) > > > + except pywintypes.error as e: > > > + return errno.ENOENT, None > > > + except pywintypes.error as e: > > > + if e.winerror == winutils.winerror.ERROR_PIPE_BUSY: > > > + # Pipe is busy, set the retry flag to true and > > > retry > > > + # again during the connect function. > > > + Stream.retry_connect = True > > > + return 0, cls(None, name, errno.EAGAIN, > > > + > > > pipe=win32file.INVALID_HANDLE_VALUE, > > > + is_server=False) > > > + return errno.ENOENT, None > > > + return 0, cls(None, name, 0, pipe=npipe, is_server=False) > > > + > > > error, sock = cls._open(suffix, dscp) > > > if error: > > > return error, None > > > @@ -145,6 +207,10 @@ class Stream(object): > > > if not error: > > > while True: > > > error = stream.connect() > > > + if sys.platform == 'win32' and error == > > > errno.WSAEWOULDBLOCK: > > > + # WSAEWOULDBLOCK would be the equivalent on Windows > > > + # for EAGAIN on Unix. > > > + error = errno.EAGAIN > > > if error != errno.EAGAIN: > > > break > > > stream.run() > > > @@ -152,7 +218,8 @@ class Stream(object): > > > stream.run_wait(poller) > > > stream.connect_wait(poller) > > > poller.block() > > > - assert error != errno.EINPROGRESS > > > + if stream.socket is not None: > > > + assert error != errno.EINPROGRESS > > > > > > if error and stream: > > > stream.close() > > > @@ -160,11 +227,35 @@ class Stream(object): > > > return error, stream > > > > > > def close(self): > > > - self.socket.close() > > > + if self.socket is not None: > > > + self.socket.close() > > > + if self.pipe is not None: > > > + if self._server: > > > + win32pipe.DisconnectNamedPipe(self.pipe) > > > + winutils.close_handle(self.pipe, vlog.warn) > > > + winutils.close_handle(self._read.hEvent, vlog.warn) > > > + winutils.close_handle(self._write.hEvent, vlog.warn) > > > > > > def __scs_connecting(self): > > > - retval = ovs.socket_util.check_connection_completion(self.socket) > > > - assert retval != errno.EINPROGRESS > > > + if self.socket is not None: > > > + retval = > > > ovs.socket_util.check_connection_completion(self.socket) > > > + assert retval != errno.EINPROGRESS > > > + elif sys.platform == 'win32' and self.retry_connect: > > > + try: > > > + self.pipe = winutils.create_file(self._pipename) > > > + self._retry_connect = False > > > + retval = 0 > > > + except pywintypes.error as e: > > > + if e.winerror == winutils.winerror.ERROR_PIPE_BUSY: > > > + retval = errno.EAGAIN > > > + else: > > > + self._retry_connect = False > > > + retval = errno.ENOENT > > > + else: > > > + # Windows only, if retry_connect is false, it means it's > > > already > > > + # connected so we can set the value of retval to 0 > > > + retval = 0 > > > + > > > if retval == 0: > > > self.state = Stream.__S_CONNECTED > > > elif retval != errno.EAGAIN: > > > @@ -209,11 +300,63 @@ class Stream(object): > > > elif n == 0: > > > return (0, "") > > > > > > + if sys.platform == 'win32' and self.socket is None: > > > + return self.__recv_windows(n) > > > + > > > try: > > > return (0, self.socket.recv(n)) > > > except socket.error as e: > > > return (ovs.socket_util.get_exception_errno(e), "") > > > > > > + def __recv_windows(self, n): > > > + if self._read_pending: > > > + try: > > > + nBytesRead = winutils.get_overlapped_result(self.pipe, > > > + self._read, > > > + False) > > > + self._read_pending = False > > > + recvBuffer = self._read_buffer[:nBytesRead] > > > + > > > + return (0, winutils.get_decoded_buffer(recvBuffer)) > > > + except pywintypes.error as e: > > > + if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE: > > > + # The operation is still pending, try again > > > + self._read_pending = True > > > + return (errno.EAGAIN, "") > > > + elif e.winerror in winutils.pipe_disconnected_errors: > > > + # If the pipe was disconnected, return 0. > > > + return (0, "") > > > + else: > > > + return (errno.EINVAL, "") > > > + > > > + (errCode, self._read_buffer) = winutils.read_file(self.pipe, > > > + n, > > > + self._read) > > > + if errCode: > > > + if errCode == winutils.winerror.ERROR_IO_PENDING: > > > + self._read_pending = True > > > + return (errno.EAGAIN, "") > > > + elif errCode in winutils.pipe_disconnected_errors: > > > + # If the pipe was disconnected, return 0. > > > + return (0, "") > > > + else: > > > + return (errCode, "") > > > + > > > + try: > > > + nBytesRead = winutils.get_overlapped_result(self.pipe, > > > + self._read, > > > + False) > > > + winutils.win32event.SetEvent(self._read.hEvent) > > > + except pywintypes.error as e: > > > + if e.winerror in winutils.pipe_disconnected_errors: > > > + # If the pipe was disconnected, return 0. > > > + return (0, "") > > > + else: > > > + return (e.winerror, "") > > > + > > > + recvBuffer = self._read_buffer[:nBytesRead] > > > + return (0, winutils.get_decoded_buffer(recvBuffer)) > > > + > > > def send(self, buf): > > > """Tries to send 'buf' on this stream. > > > > > > @@ -231,6 +374,9 @@ class Stream(object): > > > elif len(buf) == 0: > > > return 0 > > > > > > + if sys.platform == 'win32' and self.socket is None: > > > + return self.__send_windows(buf) > > > + > > > try: > > > # Python 3 has separate types for strings and bytes. We > > > must have > > > # bytes here. > > > @@ -240,6 +386,40 @@ class Stream(object): > > > except socket.error as e: > > > return -ovs.socket_util.get_exception_errno(e) > > > > > > + def __send_windows(self, buf): > > > + if self._write_pending: > > > + try: > > > + nBytesWritten = winutils.get_overlapped_result(self.pipe, > > > + > > > self._write, > > > + False) > > > + self._write_pending = False > > > + return nBytesWritten > > > + except pywintypes.error as e: > > > + if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE: > > > + # The operation is still pending, try again > > > + self._read_pending = True > > > + return -errno.EAGAIN > > > + elif e.winerror in winutils.pipe_disconnected_errors: > > > + # If the pipe was disconnected, return connection > > > reset. > > > + return -errno.ECONNRESET > > > + else: > > > + return -errno.EINVAL > > > + > > > + buf = winutils.get_encoded_buffer(buf) > > > + self._write_pending = False > > > + (errCode, nBytesWritten) = winutils.write_file(self.pipe, > > > + buf, > > > + self._write) > > > + if errCode: > > > + if errCode == winutils.winerror.ERROR_IO_PENDING: > > > + self._write_pending = True > > > + return -errno.EAGAIN > > > + if (not nBytesWritten and > > > + errCode in winutils.pipe_disconnected_errors): > > > + # If the pipe was disconnected, return connection reset. > > > + return -errno.ECONNRESET > > > + return nBytesWritten > > > + > > > def run(self): > > > pass > > > > > > @@ -255,11 +435,52 @@ class Stream(object): > > > > > > if self.state == Stream.__S_CONNECTING: > > > wait = Stream.W_CONNECT > > > + > > > + if sys.platform == 'win32': > > > + self.__wait_windows(poller, wait) > > > + return > > > + > > > if wait == Stream.W_RECV: > > > poller.fd_wait(self.socket, ovs.poller.POLLIN) > > > else: > > > poller.fd_wait(self.socket, ovs.poller.POLLOUT) > > > > > > + def __wait_windows(self, poller, wait): > > > + if self.socket is not None: > > > + if wait == Stream.W_RECV: > > > + read_flags = (win32file.FD_READ | > > > + win32file.FD_ACCEPT | > > > + win32file.FD_CLOSE) > > > + try: > > > + win32file.WSAEventSelect(self.socket, > > > + self._read.hEvent, > > > + read_flags) > > > + except pywintypes.error as e: > > > + vlog.err("failed to associate events with socket: %s" > > > + % e.strerror) > > > + poller.fd_wait(self._read.hEvent, ovs.poller.POLLIN) > > > + else: > > > + write_flags = (win32file.FD_WRITE | > > > + win32file.FD_CONNECT | > > > + win32file.FD_CLOSE) > > > + try: > > > + win32file.WSAEventSelect(self.socket, > > > + self._write.hEvent, > > > + write_flags) > > > + except pywintypes.error as e: > > > + vlog.err("failed to associate events with socket: %s" > > > + % e.strerror) > > > + poller.fd_wait(self._write.hEvent, ovs.poller.POLLOUT) > > > + else: > > > + if wait == Stream.W_RECV: > > > + if self._read: > > > + poller.fd_wait(self._read.hEvent, ovs.poller.POLLIN) > > > + elif wait == Stream.W_SEND: > > > + if self._write: > > > + poller.fd_wait(self._write.hEvent, > > > ovs.poller.POLLOUT) > > > + elif wait == Stream.W_CONNECT: > > > + return > > > + > > > def connect_wait(self, poller): > > > self.wait(poller, Stream.W_CONNECT) > > > > > > @@ -267,11 +488,22 @@ class Stream(object): > > > self.wait(poller, Stream.W_RECV) > > > > > > def send_wait(self, poller): > > > + if sys.platform == 'win32': > > > + poller.fd_wait(self.connect.hEvent, ovs.poller.POLLIN) > > > self.wait(poller, Stream.W_SEND) > > > > > > def __del__(self): > > > # Don't delete the file: we might have forked. > > > - self.socket.close() > > > + if self.socket is not None: > > > + self.socket.close() > > > + if self.pipe is not None: > > > + # Check if there are any remaining valid handles and close > > > them > > > + if self.pipe: > > > + winutils.close_handle(self.pipe) > > > + if self._read.hEvent: > > > + winutils.close_handle(self._read.hEvent) > > > + if self._write.hEvent: > > > + winutils.close_handle(self._write.hEvent) > > > > > > @staticmethod > > > def ssl_set_private_key_file(file_name): > > > @@ -287,6 +519,10 @@ class Stream(object): > > > > > > > > > class PassiveStream(object): > > > + # Windows only > > > + connect = None # overlapped for read operation > > > + connect_pending = False > > > + > > > @staticmethod > > > def is_valid_name(name): > > > """Returns True if 'name' is a passive stream name in the form > > > @@ -294,9 +530,18 @@ class PassiveStream(object): > > > "punix:" or "ptcp"), otherwise False.""" > > > return name.startswith("punix:") | name.startswith("ptcp:") > > > > > > - def __init__(self, sock, name, bind_path): > > > + def __init__(self, sock, name, bind_path, pipe=None): > > > http://self.name = name > > > + self.pipe = pipe > > > self.socket = sock > > > + if pipe is not None: > > > + self.connect = pywintypes.OVERLAPPED() > > > + self.connect.hEvent = > > > winutils.get_new_event(bManualReset=True) > > > + self.connect_pending = False > > > + suffix = name.split(":", 1)[1] > > > + suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix) > > > + self._pipename = winutils.get_pipe_name(suffix) > > > + > > > self.bind_path = bind_path > > > > > > @staticmethod > > > @@ -315,11 +560,27 @@ class PassiveStream(object): > > > bind_path = name[6:] > > > if name.startswith("punix:"): > > > bind_path = ovs.util.abs_file_name(ovs.dirs.RUNDIR, > > > bind_path) > > > - error, sock = > > > ovs.socket_util.make_unix_socket(socket.SOCK_STREAM, > > > - True, > > > bind_path, > > > - None) > > > - if error: > > > - return error, None > > > + if sys.platform != 'win32': > > > + error, sock = ovs.socket_util.make_unix_socket( > > > + socket.SOCK_STREAM, True, bind_path, None) > > > + if error: > > > + return error, None > > > + else: > > > + # Branch used only on Windows > > > + try: > > > + open(bind_path, 'w').close() > > > + except: > > > + return errno.ENOENT, None > > > + > > > + pipename = winutils.get_pipe_name(bind_path) > > > + if len(pipename) > 255: > > > + # Return invalid argument if the name is too long > > > + return errno.ENOENT, None > > > + > > > + npipe = winutils.create_named_pipe(pipename) > > > + if not npipe: > > > + return errno.ENOENT, None > > > + return 0, PassiveStream(None, name, bind_path, > > > pipe=npipe) > > > > > > elif name.startswith("ptcp:"): > > > sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) > > > @@ -341,7 +602,11 @@ class PassiveStream(object): > > > > > > def close(self): > > > """Closes this PassiveStream.""" > > > - self.socket.close() > > > + if self.socket is not None: > > > + self.socket.close() > > > + if self.pipe is not None: > > > + winutils.close_handle(self.pipe, vlog.warn) > > > + winutils.close_handle(self.connect.hEvent, vlog.warn) > > > if self.bind_path is not None: > > > ovs.fatal_signal.unlink_file_now(self.bind_path) > > > self.bind_path = None > > > @@ -354,28 +619,80 @@ class PassiveStream(object): > > > > > > Will not block waiting for a connection. If no connection is > > > ready to > > > be accepted, returns (errno.EAGAIN, None) immediately.""" > > > - > > > + if sys.platform == 'win32' and self.socket is None: > > > + return self.__accept_windows() > > > while True: > > > try: > > > sock, addr = self.socket.accept() > > > ovs.socket_util.set_nonblocking(sock) > > > - if (sock.family == socket.AF_UNIX): > > > + if (sys.platform != 'win32' and sock.family == > > > socket.AF_UNIX): > > > return 0, Stream(sock, "unix:%s" % addr, 0) > > > return 0, Stream(sock, 'ptcp:%s:%s' % (addr[0], > > > str(addr[1])), 0) > > > except socket.error as e: > > > error = ovs.socket_util.get_exception_errno(e) > > > + if sys.platform == 'win32' and error == > > > errno.WSAEWOULDBLOCK: > > > + # WSAEWOULDBLOCK would be the equivalent on Windows > > > + # for EAGAIN on Unix. > > > + error = errno.EAGAIN > > > if error != errno.EAGAIN: > > > # XXX rate-limit > > > vlog.dbg("accept: %s" % os.strerror(error)) > > > return error, None > > > > > > + def __accept_windows(self): > > > + if self.connect_pending: > > > + try: > > > + winutils.get_overlapped_result(self.pipe, self.connect, > > > False) > > > + except pywintypes.error as e: > > > + if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE: > > > + # The operation is still pending, try again > > > + self.connect_pending = True > > > + return errno.EAGAIN, None > > > + else: > > > + if self.pipe: > > > + win32pipe.DisconnectNamedPipe(self.pipe) > > > + return errno.EINVAL, None > > > + self.connect_pending = False > > > + > > > + error = winutils.connect_named_pipe(self.pipe, self.connect) > > > + if error: > > > + if error == winutils.winerror.ERROR_IO_PENDING: > > > + self.connect_pending = True > > > + return errno.EAGAIN, None > > > + elif error != winutils.winerror.ERROR_PIPE_CONNECTED: > > > + if self.pipe: > > > + win32pipe.DisconnectNamedPipe(self.pipe) > > > + self.connect_pending = False > > > + return errno.EINVAL, None > > > + else: > > > + win32event.SetEvent(self.connect.hEvent) > > > + > > > + npipe = winutils.create_named_pipe(self._pipename) > > > + if not npipe: > > > + return errno.ENOENT, None > > > + > > > + old_pipe = self.pipe > > > + self.pipe = npipe > > > + winutils.win32event.ResetEvent(self.connect.hEvent) > > > + return 0, Stream(None, http://self.name, 0, pipe=old_pipe) > > > + > > > def wait(self, poller): > > > - poller.fd_wait(self.socket, ovs.poller.POLLIN) > > > + if sys.platform != 'win32' or self.socket is not None: > > > + poller.fd_wait(self.socket, ovs.poller.POLLIN) > > > + else: > > > + poller.fd_wait(self.connect.hEvent, ovs.poller.POLLIN) > > > > > > def __del__(self): > > > # Don't delete the file: we might have forked. > > > - self.socket.close() > > > + if self.socket is not None: > > > + self.socket.close() > > > + if self.pipe is not None: > > > + # Check if there are any remaining valid handles and close > > > them > > > + if self.pipe: > > > + winutils.close_handle(self.pipe) > > > + if self._connect.hEvent: > > > + winutils.close_handle(self._read.hEvent) > > > > > > > > > def usage(name): > > > diff --git a/python/ovs/unixctl/server.py b/python/ovs/unixctl/server.py > > > index 8595ed8..3f3e051 100644 > > > --- a/python/ovs/unixctl/server.py > > > +++ b/python/ovs/unixctl/server.py > > > @@ -148,6 +148,10 @@ class UnixctlServer(object): > > > def run(self): > > > for _ in range(10): > > > error, stream = self._listener.accept() > > > + if sys.platform == "win32" and error == errno.WSAEWOULDBLOCK: > > > + # WSAEWOULDBLOCK would be the equivalent on Windows > > > + # for EAGAIN on Unix. > > > + error = errno.EAGAIN > > > if not error: > > > rpc = ovs.jsonrpc.Connection(stream) > > > self._conns.append(UnixctlConnection(rpc)) > > > diff --git a/tests/test-jsonrpc.py b/tests/test-jsonrpc.py > > > index 18634e6..3eabcd7 100644 > > > --- a/tests/test-jsonrpc.py > > > +++ b/tests/test-jsonrpc.py > > > @@ -53,11 +53,17 @@ def handle_rpc(rpc, msg): > > > > > > > > > def do_listen(name): > > > - error, pstream = ovs.stream.PassiveStream.open(name) > > > - if error: > > > - sys.stderr.write("could not listen on \"%s\": %s\n" > > > - % (name, os.strerror(error))) > > > - sys.exit(1) > > > + if sys.platform != 'win32' or ( > > > + ovs.daemon._detach and ovs.daemon._detached): > > > + # On Windows the child is a new process created which should be > > > the > > > + # one that creates the PassiveStream. Without this check, the new > > > + # child process will create a new PassiveStream overwriting the > > > one > > > + # that the parent process created. > > > + error, pstream = ovs.stream.PassiveStream.open(name) > > > + if error: > > > + sys.stderr.write("could not listen on \"%s\": %s\n" > > > + % (name, os.strerror(error))) > > > + sys.exit(1) > > > > > > ovs.daemon.daemonize() > > > > > > -- > > > 2.10.0.windows.1 > > > _______________________________________________ > > > dev mailing list > > > mailto:mailto:mailto:mailto:d...@openvswitch.org > > > https://mail.openvswitch.org/mailman/listinfo/ovs-dev
_______________________________________________ dev mailing list d...@openvswitch.org https://mail.openvswitch.org/mailman/listinfo/ovs-dev