Hi Russel.

Did you have a chance to check V6 patch?

This should fix the testing problem.

Ofer.

---------- Forwarded message ---------
From: Ofer Ben Yacov <ofer.benya...@gmail.com>
‪Date: יום ד׳, 17 בפבר׳ 2016 ב-17:21‬
Subject: [PATCH V6] Add Passive TCP connection to IDL.
To: <ofer.benya...@gmail.com>


From: Ofer Ben-Yacov <ofer.benya...@gmail.com>

Currently the IDL does not support passive TCP connection,
i.e. when the OVSDB connects to its manager.

This patch enables IDL to use an already-open session
(the one which was previously used for retrieving the db schema).
In addition, it enables IDL to go back to "listen mode" in case the
connection
is lost.

LIMITATIONS:
----------------------

This patch enables a **SINGLE** TCP connection from an OVSDB server to an
agent that uses IDL with {IP,PORT} pair. Therefore, the agent will support
only **ONE** OVSDB server using {IP,PORT} pair.

Future development may add multi-session server capability that will allow
an agent to use single {IP,PORT} pair to connect to multiple OVSDB servers.


CAVEAT:
--------------

When a database first connects to the agent, the agent gets the schema and
data and builds its tables. If the session disconnects, the agent goes back
to "listen mode" and accepts **ANY** TCP connection, which means that if
another database will try to connect to the agent using the same {IP,PORT}
pair, it will be connected to the IDL that has the schema and data from
the first database.

A future patch can resolve this problem.

USAGE:
-------------

To use IDL in passive mode, the following example code can be use:

(snippet)

from ovs.jsonrpc import Session
...

from neutron.agent.ovsdb.native import idlutils

...

session = Session.open('ptcp:192.168.10.10:6640')

# first call to session.run creates the PassiveStream object and second one
# accept incoming connection
session.run()
session.run()

# this static method is similar to the original neutron method but the
# rpc.close() command that would result closing the socket.
helper = idlutils.get_schema_helper_from_stream_no_close(session.stream,
        'hardware_vtep')
helper.register_all()
self.idl = idl.Idl(self.connection, helper, session)
idlutils.wait_for_change(self.idl, self.timeout)

self.poller = poller.Poller()
self.thread = threading.Thread(target=self.run)
self.thread.setDaemon(True)
self.thread.start()


TESTING:
---------------
Added unit test for passive mode. See ovsdb-idl.at file.

TODO
----
Test this patch against C implementation


Signed-off-by: "Ofer Ben-Yacov" <ofer.benya...@gmail.com>

Tested-by: "Ofer Ben-Yacov" <ofer.benya...@gmail.com>

Requested-by: Ben Pfaff <b...@nicira.com>,
Requested-by: "D M, Vikas" <vikas....@hpe.com>,
Requested-by: "Kamat, Maruti Haridas" <maruti.ka...@hpe.com>,
Requested-by: "Sukhdev Kapur" <sukh...@arista.com>,
Requested-by: "Migliaccio, Armando" <armando.migliac...@hpe.com>

---
 python/ovs/db/idl.py  | 18 +++++++++++++++---
 python/ovs/jsonrpc.py | 18 ++++++++++--------
 python/ovs/stream.py  | 37 +++++++++++++++++++++++++------------
 tests/ovsdb-idl.at    | 31 +++++++++++++++++++++++++++++++
 tests/test-ovsdb.py   | 47 ++++++++++++++++++++++++++++++-----------------
 5 files changed, 111 insertions(+), 40 deletions(-)

diff --git a/python/ovs/db/idl.py b/python/ovs/db/idl.py
index e69d35e..24e9f11 100644
--- a/python/ovs/db/idl.py
+++ b/python/ovs/db/idl.py
@@ -86,7 +86,7 @@ class Idl(object):
       currently being constructed, if there is one, or None otherwise.
 """

-    def __init__(self, remote, schema):
+    def __init__(self, remote, schema, session=None):
         """Creates and returns a connection to the database named
'db_name' on
         'remote', which should be in a form acceptable to
         ovs.jsonrpc.session.open().  The connection will maintain an
in-memory
@@ -104,7 +104,16 @@ class Idl(object):
         As a convenience to users, 'schema' may also be an instance of the
         SchemaHelper class.

-        The IDL uses and modifies 'schema' directly."""
+        The IDL uses and modifies 'schema' directly.
+
+        In passive mode ( where the OVSDB server connects to its manager ),
+        we first need to wait for the OVSDB server to connect and then
+        pass the 'session' object (while the it is still open ) and
+        the schema we retrieved from the open session to the IDL to use it.
+
+        If in active mode, do not pass 'session' and it will be created
+        by IDL by using 'remote'.
+        """

         assert isinstance(schema, SchemaHelper)
         schema = schema.get_idl_schema()
@@ -112,7 +121,10 @@ class Idl(object):
         self.tables = schema.tables
         self.readonly = schema.readonly
         self._db = schema
-        self._session = ovs.jsonrpc.Session.open(remote)
+        if session:
+            self._session = session
+        else:
+            self._session = ovs.jsonrpc.Session.open(remote)
         self._monitor_request_id = None
         self._last_seqno = None
         self.change_seqno = 0
diff --git a/python/ovs/jsonrpc.py b/python/ovs/jsonrpc.py
index e3ef6db..8f63a75 100644
--- a/python/ovs/jsonrpc.py
+++ b/python/ovs/jsonrpc.py
@@ -429,23 +429,25 @@ class Session(object):
         self.__disconnect()

         name = self.reconnect.get_name()
-        if not self.reconnect.is_passive():
-            error, self.stream = ovs.stream.Stream.open(name)
+        if self.reconnect.is_passive():
+            if self.pstream is not None:
+                self.pstream.close()
+            error, self.pstream = ovs.stream.PassiveStream.open(name)
             if not error:
-                self.reconnect.connecting(ovs.timeval.msec())
+                self.reconnect.listening(ovs.timeval.msec())
             else:
                 self.reconnect.connect_failed(ovs.timeval.msec(), error)
-        elif self.pstream is not None:
-            error, self.pstream = ovs.stream.PassiveStream.open(name)
+        else:
+            error, self.stream = ovs.stream.Stream.open(name)
             if not error:
-                self.reconnect.listening(ovs.timeval.msec())
+                self.reconnect.connecting(ovs.timeval.msec())
             else:
                 self.reconnect.connect_failed(ovs.timeval.msec(), error)

         self.seqno += 1

     def run(self):
-        if self.pstream is not None:
+        if self.pstream is not None and self.stream is None:
             error, stream = self.pstream.accept()
             if error == 0:
                 if self.rpc or self.stream:
@@ -455,11 +457,11 @@ class Session(object):
                     self.__disconnect()
                 self.reconnect.connected(ovs.timeval.msec())
                 self.rpc = Connection(stream)
+                self.stream = stream
             elif error != errno.EAGAIN:
                 self.reconnect.listen_error(ovs.timeval.msec(), error)
                 self.pstream.close()
                 self.pstream = None
-
         if self.rpc:
             backlog = self.rpc.get_backlog()
             self.rpc.run()
diff --git a/python/ovs/stream.py b/python/ovs/stream.py
index bc14836..656ba74 100644
--- a/python/ovs/stream.py
+++ b/python/ovs/stream.py
@@ -271,9 +271,9 @@ class PassiveStream(object):
     @staticmethod
     def is_valid_name(name):
         """Returns True if 'name' is a passive stream name in the form
-        "TYPE:ARGS" and TYPE is a supported passive stream type (currently
only
-        "punix:"), otherwise False."""
-        return name.startswith("punix:")
+        "TYPE:ARGS" and TYPE is a supported passive stream type (currently
+        "punix:" or "ptcp"), otherwise False."""
+        return name.startswith("punix:") | name.startswith("ptcp:")

     def __init__(self, sock, name, bind_path):
         self.name = name
@@ -284,22 +284,32 @@ class PassiveStream(object):
     def open(name):
         """Attempts to start listening for remote stream connections.
'name'
         is a connection name in the form "TYPE:ARGS", where TYPE is an
passive
-        stream class's name and ARGS are stream class-specific.  Currently
the
-        only supported TYPE is "punix".
+        stream class's name and ARGS are stream class-specific. Currently
the
+        supported values for TYPE are "punix" and "ptcp".

         Returns (error, pstream): on success 'error' is 0 and 'pstream' is
the
         new PassiveStream, on failure 'error' is a positive errno value and
         'pstream' is None."""
         if not PassiveStream.is_valid_name(name):
             return errno.EAFNOSUPPORT, None
-
-        bind_path = name[6:]
+        bind_path = None
         if name.startswith("punix:"):
+            bind_path = name[6:]
             bind_path = ovs.util.abs_file_name(ovs.dirs.RUNDIR, bind_path)
-        error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
-                                                       True, bind_path,
None)
-        if error:
-            return error, None
+            error, sock =
ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
+                                                           True, bind_path,
+                                                           None)
+            if error:
+                return error, None
+
+        elif name.startswith("ptcp:"):
+            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+            remote = name.split(':')
+            sock.bind((remote[1], int(remote[2])))
+
+        else:
+            raise Exception('Unknown connection string')

         try:
             sock.listen(10)
@@ -330,7 +340,10 @@ class PassiveStream(object):
             try:
                 sock, addr = self.socket.accept()
                 ovs.socket_util.set_nonblocking(sock)
-                return 0, Stream(sock, "unix:%s" % addr, 0)
+                if (sock.family == socket.AF_UNIX):
+                    return 0, Stream(sock, "unix:%s" % addr, 0)
+                return 0, Stream(sock, 'ptcp:%s:%s' % (addr[0],
+                                                       str(addr[1])), 0)
             except socket.error as e:
                 error = ovs.socket_util.get_exception_errno(e)
                 if error != errno.EAGAIN:
diff --git a/tests/ovsdb-idl.at b/tests/ovsdb-idl.at
index ebf82a5..813812e 100644
--- a/tests/ovsdb-idl.at
+++ b/tests/ovsdb-idl.at
@@ -111,6 +111,37 @@ m4_define([OVSDB_CHECK_IDL],
    OVSDB_CHECK_IDL_TCP_PY($@)
    OVSDB_CHECK_IDL_TCP6_PY($@)])

+
+# This test uses the Python IDL implementation with passive tcp
+m4_define([OVSDB_CHECK_IDL_PASSIVE_TCP_PY],
+  [AT_SETUP([$1 - Python ptcp])
+   AT_SKIP_IF([test $HAVE_PYTHON = no])
+   AT_KEYWORDS([ovsdb server idl positive Python with tcp socket $5])
+   AT_CHECK([ovsdb-tool create db $abs_srcdir/idltest.ovsschema],
+                  [0], [stdout], [ignore])
+   # find free TCP port
+   AT_CHECK([ovsdb-server --log-file
'-vPATTERN:console:ovsdb-server|%c|%m' --detach --no-chdir
--pidfile="`pwd`"/pid --remote=punix:socket --remote=ptcp:0:127.0.0.1
--unixctl="`pwd`"/unixctl db], [0], [ignore], [ignore])
+   PARSE_LISTENING_PORT([ovsdb-server.log], [TCP_PORT])
+   AT_CHECK([kill `cat pid`])
+
+   # start OVSDB server in passive mode
+   AT_CHECK([ovsdb-server --log-file
'-vPATTERN:console:ovsdb-server|%c|%m' --detach --no-chdir
--pidfile="`pwd`"/pid --remote=punix:socket --remote=tcp:127.0.0.1:$TCP_PORT
--unixctl="`pwd`"/unixctl db], [0], [ignore], [ignore])
+   AT_CHECK([$PYTHON $srcdir/test-ovsdb.py -t10 idl
$srcdir/idltest.ovsschema ptcp:127.0.0.1:$TCP_PORT $3],
+      [0], [stdout], [ignore], [kill `cat pid`])
+   AT_CHECK([sort stdout | ${PERL} $srcdir/uuidfilt.pl]m4_if([$6],,, [[|
$6]]),
+            [0], [$4], [], [kill `cat pid`])
+   AT_CLEANUP
+   ])
+
+
+OVSDB_CHECK_IDL_PASSIVE_TCP_PY([simple passive idl, initially empty,
select empty],
+  [],
+  [['["idltest",{"op":"select","table":"link1","where":[]}]']],
+  [[000: empty
+001: {"error":null,"result":[{"rows":[]}]}
+002: done
+]])
+
 OVSDB_CHECK_IDL([simple idl, initially empty, no ops],
   [],
   [],
diff --git a/tests/test-ovsdb.py b/tests/test-ovsdb.py
index 73c3048..c28ed6b 100644
--- a/tests/test-ovsdb.py
+++ b/tests/test-ovsdb.py
@@ -407,17 +407,29 @@ def do_idl(schema_file, remote, *commands):
         commands = commands[1:]
     else:
         schema_helper.register_all()
-    idl = ovs.db.idl.Idl(remote, schema_helper)

-    if commands:
-        error, stream = ovs.stream.Stream.open_block(
-            ovs.stream.Stream.open(remote))
-        if error:
-            sys.stderr.write("failed to connect to \"%s\"" % remote)
-            sys.exit(1)
-        rpc = ovs.jsonrpc.Connection(stream)
+    passive = remote.startswith('ptcp')
+    if passive:
+        session = ovs.jsonrpc.Session.open(remote)
+        # first call to session.run creates the PassiveStream object and
+        # second one accept incoming connection
+        session.run()
+        session.run()
+
+        rpc = session.rpc
+        idl = ovs.db.idl.Idl(remote, schema_helper, session)
     else:
-        rpc = None
+        idl = ovs.db.idl.Idl(remote, schema_helper)
+
+        if commands:
+            error, stream = ovs.stream.Stream.open_block(
+                ovs.stream.Stream.open(remote))
+            if error:
+                sys.stderr.write("failed to connect to \"%s\"" % remote)
+                sys.exit(1)
+            rpc = ovs.jsonrpc.Connection(stream)
+        else:
+            rpc = None

     symtab = {}
     seqno = 0
@@ -475,14 +487,15 @@ def do_idl(schema_file, remote, *commands):
             sys.stdout.write("%s\n" % ovs.json.to_string(reply.to_json()))
             sys.stdout.flush()

-    if rpc:
-        rpc.close()
-    while idl.change_seqno == seqno and not idl.run():
-        poller = ovs.poller.Poller()
-        idl.wait(poller)
-        poller.block()
-    print_idl(idl, step)
-    step += 1
+    if not passive:
+        if rpc:
+            rpc.close()
+        while idl.change_seqno == seqno and not idl.run():
+            poller = ovs.poller.Poller()
+            idl.wait(poller)
+            poller.block()
+        print_idl(idl, step)
+        step += 1
     idl.close()
     print("%03d: done" % step)

--
2.1.4
_______________________________________________
dev mailing list
dev@openvswitch.org
http://openvswitch.org/mailman/listinfo/dev

Reply via email to