Add support for monitor_cond_since / update3 to python-ovs to
allow more efficient reconnections when connecting to clustered
OVSDB servers.

Signed-off-by: Terry Wilson <twil...@redhat.com>
---
 python/ovs/db/idl.py | 245 ++++++++++++++++++++++++++++++++++++-------
 tests/ovsdb-idl.at   |   2 +-
 2 files changed, 211 insertions(+), 36 deletions(-)

diff --git a/python/ovs/db/idl.py b/python/ovs/db/idl.py
index 60e58b03e..0d5e00208 100644
--- a/python/ovs/db/idl.py
+++ b/python/ovs/db/idl.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 
 import collections
+import enum
 import functools
 import uuid
 
@@ -36,6 +37,7 @@ ROW_DELETE = "delete"
 
 OVSDB_UPDATE = 0
 OVSDB_UPDATE2 = 1
+OVSDB_UPDATE3 = 2
 
 CLUSTERED = "clustered"
 RELAY = "relay"
@@ -75,6 +77,65 @@ class ColumnDefaultDict(dict):
         return item in self.keys()
 
 
+class Monitor(enum.IntEnum):
+    monitor = OVSDB_UPDATE
+    monitor_cond = OVSDB_UPDATE2
+    monitor_cond_since = OVSDB_UPDATE3
+
+
+class ConditionState(object):
+    def __init__(self):
+        self._ack_cond = None
+        self._req_cond = None
+        self._new_cond = [True]
+
+    def __iter__(self):
+        return iter([self._new_cond, self._req_cond, self._ack_cond])
+
+    @property
+    def new(self):
+        """The latest freshly initialized condition change"""
+        return self._new_cond
+
+    @property
+    def acked(self):
+        """The last condition change that has been accepted by the server"""
+        return self._ack_cond
+
+    @property
+    def requested(self):
+        """A condition that's been requested, but not acked by the server"""
+        return self._req_cond
+
+    @property
+    def latest(self):
+        """The most recent condition change"""
+        return next(cond for cond in self if cond is not None)
+
+    @staticmethod
+    def is_true(condition):
+        return condition == [True]
+
+    def init(self, cond):
+        """Signal that a condition change is being initiated"""
+        self._new_cond = cond
+
+    def ack(self):
+        """Signal that a condition change has been acked"""
+        if self._req_cond is not None:
+            self._ack_cond, self._req_cond = (self._req_cond, None)
+
+    def request(self):
+        """Signal that a condition change has been requested"""
+        if self._new_cond is not None:
+            self._req_cond, self._new_cond = (self._new_cond, None)
+
+    def reset(self):
+        """Reset a requested condition change back to new"""
+        if self._req_cond is not None and self._new_cond is None:
+            self._new_cond, self._req_cond = (self._req_cond, None)
+
+
 class Idl(object):
     """Open vSwitch Database Interface Definition Language (OVSDB IDL).
 
@@ -132,7 +193,13 @@ class Idl(object):
     IDL_S_SERVER_MONITOR_REQUESTED = 2
     IDL_S_DATA_MONITOR_REQUESTED = 3
     IDL_S_DATA_MONITOR_COND_REQUESTED = 4
-    IDL_S_MONITORING = 5
+    IDL_S_DATA_MONITOR_COND_SINCE_REQUESTED = 5
+    IDL_S_MONITORING = 6
+
+    monitor_map = {
+        Monitor.monitor: IDL_S_SERVER_MONITOR_REQUESTED,
+        Monitor.monitor_cond: IDL_S_DATA_MONITOR_COND_REQUESTED,
+        Monitor.monitor_cond_since: IDL_S_DATA_MONITOR_COND_SINCE_REQUESTED}
 
     def __init__(self, remote, schema_helper, probe_interval=None,
                  leader_only=True):
@@ -176,10 +243,12 @@ class Idl(object):
         remotes = self._parse_remotes(remote)
         self._session = ovs.jsonrpc.Session.open_multiple(remotes,
             probe_interval=probe_interval)
+        self._request_id = None
         self._monitor_request_id = None
         self._last_seqno = None
         self.change_seqno = 0
         self.uuid = uuid.uuid1()
+        self.last_id = str(uuid.UUID(int=0))
 
         # Server monitor.
         self._server_schema_request_id = None
@@ -206,6 +275,9 @@ class Idl(object):
         self.txn = None
         self._outstanding_txns = {}
 
+        self.cond_changed = False
+        self.cond_seqno = 0
+
         for table in schema.tables.values():
             for column in table.columns.values():
                 if not hasattr(column, 'alert'):
@@ -213,8 +285,7 @@ class Idl(object):
             table.need_table = False
             table.rows = custom_index.IndexedRows(table)
             table.idl = self
-            table.condition = [True]
-            table.cond_changed = False
+            table.condition = ConditionState()
 
     def _parse_remotes(self, remote):
         # If remote is -
@@ -252,6 +323,38 @@ class Idl(object):
         update."""
         self._session.close()
 
+    def ack_conditions(self):
+        """Mark all requested table conditions as acked"""
+        for table in self.tables.values():
+            table.condition.ack()
+
+    def sync_conditions(self):
+        """Synchronize condition state when the FSM is restarted
+
+        If a non-zero last_id is available for the DB, then upon reconnect
+        the IDL should first request acked conditions to avoid missing updates
+        about records that were added before the transaction with
+        txn-id == last_id. If there were requested condition changes in flight
+        and the IDL client didn't set new conditions, then reset the requested
+        conditions to new to trigger a follow-up monitor_cond_change request
+        """
+        ack_all = self.last_id == str(uuid.UUID(int=0))
+        for table in self.tables.values():
+            if ack_all:
+                table.condition.request()
+                table.condition.ack()
+            else:
+                table.condition.reset()
+                self.cond_changed = True
+
+    def restart_fsm(self):
+        # Resync data DB table conditions to avoid missing updates due to
+        # conditions that were in flight or changed locally while the
+        # connection was down.
+        self.sync_conditions()
+        self.__send_server_schema_request()
+        self.state = self.IDL_S_SERVER_SCHEMA_REQUESTED
+
     def run(self):
         """Processes a batch of messages from the database server.  Returns
         True if the database as seen through the IDL changed, False if it did
@@ -286,7 +389,7 @@ class Idl(object):
             if seqno != self._last_seqno:
                 self._last_seqno = seqno
                 self.__txn_abort_all()
-                self.__send_server_schema_request()
+                self.restart_fsm()
                 if self.lock_name:
                     self.__send_lock_request()
                 break
@@ -294,8 +397,20 @@ class Idl(object):
             msg = self._session.recv()
             if msg is None:
                 break
+            is_response = msg.type in (ovs.jsonrpc.Message.T_REPLY,
+                                       ovs.jsonrpc.Message.T_ERROR)
+
+            if is_response and self._request_id and self._request_id == msg.id:
+                self._request_id = None
+                # process_response follows
 
             if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
+                    and msg.method == "update3"
+                    and len(msg.params) == 3):
+                # Database contents changed.
+                self.__parse_update(msg.params[2], OVSDB_UPDATE3)
+                self.last_id = msg.params[1]
+            elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
                     and msg.method == "update2"
                     and len(msg.params) == 2):
                 # Database contents changed.
@@ -320,11 +435,18 @@ class Idl(object):
                 try:
                     self.change_seqno += 1
                     self._monitor_request_id = None
-                    self.__clear()
-                    if self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED:
+                    if (self.state ==
+                            self.IDL_S_DATA_MONITOR_COND_SINCE_REQUESTED):
+                        # If 'found' is false, clear table rows for new dump
+                        if not msg.result[0]:
+                            self.__clear()
+                        self.__parse_update(msg.result[2], OVSDB_UPDATE3)
+                    elif self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED:
+                        self.__clear()
                         self.__parse_update(msg.result, OVSDB_UPDATE2)
                     else:
                         assert self.state == self.IDL_S_DATA_MONITOR_REQUESTED
+                        self.__clear()
                         self.__parse_update(msg.result, OVSDB_UPDATE)
                     self.state = self.IDL_S_MONITORING
 
@@ -398,11 +520,17 @@ class Idl(object):
             elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
                 # Reply to our echo request.  Ignore it.
                 pass
+            elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
+                  self.state == (
+                      self.IDL_S_DATA_MONITOR_COND_SINCE_REQUESTED) and
+                      self._monitor_request_id == msg.id):
+                if msg.error == "unknown method":
+                    self.__send_monitor_request(Monitor.monitor_cond)
             elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
                   self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED and
                   self._monitor_request_id == msg.id):
                 if msg.error == "unknown method":
-                    self.__send_monitor_request()
+                    self.__send_monitor_request(Monitor.monitor)
             elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
                   self._server_schema_request_id is not None and
                   self._server_schema_request_id == msg.id):
@@ -418,6 +546,13 @@ class Idl(object):
                   and self.__txn_process_reply(msg)):
                 # __txn_process_reply() did everything needed.
                 pass
+            elif (msg.type == ovs.jsonrpc.Message.T_REPLY and
+                  self.state == self.IDL_S_MONITORING):
+                # Mark the last requested conditions as acked and if further
+                # condition changes were pending, send them now.
+                self.ack_conditions()
+                self.send_cond_change()
+                self.cond_seqno += 1
             else:
                 # This can happen if a transaction is destroyed before we
                 # receive the reply, so keep the log level low.
@@ -427,14 +562,36 @@ class Idl(object):
 
         return initial_change_seqno != self.change_seqno
 
-    def send_cond_change(self):
-        if not self._session.is_connected():
+    def compose_cond_change(self):
+        if not self.cond_changed:
             return
 
+        change_requests = {}
         for table in self.tables.values():
-            if table.cond_changed:
-                self.__send_cond_change(table, table.condition)
-                table.cond_changed = False
+            # Always use the most recent conditions set by the IDL client when
+            # requesting monitor_cond_change
+            if table.condition.new is not None:
+                change_requests[table.name] = [
+                    {"where": table.condition.new}]
+                table.condition.request()
+
+        if not change_requests:
+            return
+
+        self.cond_changed = False
+        old_uuid = str(self.uuid)
+        self.uuid = uuid.uuid1()
+        params = [old_uuid, str(self.uuid), change_requests]
+        return ovs.jsonrpc.Message.create_request(
+            "monitor_cond_change", params)
+
+    def send_cond_change(self):
+        if not self._session.is_connected() or self._request_id is not None:
+            return
+
+        msg = self.compose_cond_change()
+        if msg:
+            self.send_request(msg)
 
     def cond_change(self, table_name, cond):
         """Sets the condition for 'table_name' to 'cond', which should be a
@@ -450,13 +607,28 @@ class Idl(object):
 
         if cond == []:
             cond = [False]
-        if table.condition != cond:
-            table.condition = cond
-            table.cond_changed = True
+
+        # Compare the new condition to the last known condition
+        if table.condition.latest != cond:
+            table.condition.init(cond)
+            self.cond_changed = True
+
+        # New condition will be sent out after all already requested ones
+        # are acked.
+        if table.condition.new:
+            any_reqs = any(t.condition.request for t in self.tables.values())
+            return self.cond_seqno + int(any_reqs) + 1
+
+        # Already requested conditions should be up to date at
+        # self.cond_seqno + 1 while acked conditions are already up to date
+        return self.cond_seqno + int(bool(table.condition.requested))
 
     def wait(self, poller):
         """Arranges for poller.block() to wake up when self.run() has something
         to do or when activity occurs on a transaction on 'self'."""
+        if self.cond_changed:
+            poller.immediate_wake()
+            return
         self._session.wait(poller)
         self._session.recv_wait(poller)
 
@@ -531,14 +703,6 @@ class Idl(object):
         to doing nothing to avoid overhead where it is not needed.
         """
 
-    def __send_cond_change(self, table, cond):
-        monitor_cond_change = {table.name: [{"where": cond}]}
-        old_uuid = str(self.uuid)
-        self.uuid = uuid.uuid1()
-        params = [old_uuid, str(self.uuid), monitor_cond_change]
-        msg = ovs.jsonrpc.Message.create_request("monitor_cond_change", params)
-        self._session.send(msg)
-
     def __clear(self):
         changed = False
 
@@ -547,6 +711,8 @@ class Idl(object):
                 changed = True
                 table.rows = custom_index.IndexedRows(table)
 
+        self.cond_seqno = 0
+
         if changed:
             self.change_seqno += 1
 
@@ -601,11 +767,18 @@ class Idl(object):
         self._db_change_aware_request_id = msg.id
         self._session.send(msg)
 
-    def __send_monitor_request(self):
-        if (self.state in [self.IDL_S_SERVER_MONITOR_REQUESTED,
-                           self.IDL_S_INITIAL]):
+    def send_request(self, request):
+        self._request_id = request.id
+        if self._session.is_connected():
+            return self._session.send(request)
+
+    def __send_monitor_request(self, max_version=Monitor.monitor_cond_since):
+        if self.state == self.IDL_S_INITIAL:
             self.state = self.IDL_S_DATA_MONITOR_COND_REQUESTED
             method = "monitor_cond"
+        elif self.state == self.IDL_S_SERVER_MONITOR_REQUESTED:
+            self.state = self.monitor_map[Monitor(max_version)]
+            method = Monitor(max_version).name
         else:
             self.state = self.IDL_S_DATA_MONITOR_REQUESTED
             method = "monitor"
@@ -619,22 +792,24 @@ class Idl(object):
                         (column not in self.readonly[table.name])):
                     columns.append(column)
             monitor_request = {"columns": columns}
-            if method == "monitor_cond" and table.condition != [True]:
-                monitor_request["where"] = table.condition
-                table.cond_change = False
+            if method in ("monitor_cond", "monitor_cond_since") and (
+                    not ConditionState.is_true(table.condition.acked)):
+                monitor_request["where"] = table.condition.acked
             monitor_requests[table.name] = [monitor_request]
 
-        msg = ovs.jsonrpc.Message.create_request(
-            method, [self._db.name, str(self.uuid), monitor_requests])
+        args = [self._db.name, str(self.uuid), monitor_requests]
+        if method == "monitor_cond_since":
+            args.append(str(self.last_id))
+        msg = ovs.jsonrpc.Message.create_request(method, args)
         self._monitor_request_id = msg.id
-        self._session.send(msg)
+        self.send_request(msg)
 
     def __send_server_schema_request(self):
         self.state = self.IDL_S_SERVER_SCHEMA_REQUESTED
         msg = ovs.jsonrpc.Message.create_request(
             "get_schema", [self._server_db_name, str(self.uuid)])
         self._server_schema_request_id = msg.id
-        self._session.send(msg)
+        self.send_request(msg)
 
     def __send_server_monitor_request(self):
         self.state = self.IDL_S_SERVER_MONITOR_REQUESTED
@@ -654,7 +829,7 @@ class Idl(object):
                              str(self.server_monitor_uuid),
                              monitor_requests])
         self._server_monitor_request_id = msg.id
-        self._session.send(msg)
+        self.send_request(msg)
 
     def __parse_update(self, update, version, tables=None):
         try:
@@ -698,7 +873,7 @@ class Idl(object):
 
                 self.cooperative_yield()
 
-                if version == OVSDB_UPDATE2:
+                if version in (OVSDB_UPDATE2, OVSDB_UPDATE3):
                     changes = self.__process_update2(table, uuid, row_update)
                     if changes:
                         notices.append(changes)
diff --git a/tests/ovsdb-idl.at b/tests/ovsdb-idl.at
index 3adb9d638..86a75f920 100644
--- a/tests/ovsdb-idl.at
+++ b/tests/ovsdb-idl.at
@@ -2319,7 +2319,7 @@ m4_define([OVSDB_CHECK_CLUSTER_IDL],
 
 # Checks that monitor_cond_since works fine when disconnects happen
 # with cond_change requests in flight (i.e., IDL is properly updated).
-OVSDB_CHECK_CLUSTER_IDL_C([simple idl, monitor_cond_since, cluster disconnect],
+OVSDB_CHECK_CLUSTER_IDL([simple idl, monitor_cond_since, cluster disconnect],
   3,
   [['["idltest",
        {"op": "insert",
-- 
2.31.1

_______________________________________________
dev mailing list
d...@openvswitch.org
https://mail.openvswitch.org/mailman/listinfo/ovs-dev

Reply via email to