Add support for monitor_cond_since / update3 to python-ovs to allow more efficient reconnections when connecting to clustered OVSDB servers.
Signed-off-by: Terry Wilson <twil...@redhat.com> --- python/ovs/db/idl.py | 231 ++++++++++++++++++++++++++++++++++++------- tests/ovsdb-idl.at | 2 +- 2 files changed, 197 insertions(+), 36 deletions(-) diff --git a/python/ovs/db/idl.py b/python/ovs/db/idl.py index 87ee06cde..c5d3ccdba 100644 --- a/python/ovs/db/idl.py +++ b/python/ovs/db/idl.py @@ -13,6 +13,7 @@ # limitations under the License. import collections +import enum import functools import uuid @@ -36,6 +37,7 @@ ROW_DELETE = "delete" OVSDB_UPDATE = 0 OVSDB_UPDATE2 = 1 +OVSDB_UPDATE3 = 2 CLUSTERED = "clustered" RELAY = "relay" @@ -45,6 +47,60 @@ Notice = collections.namedtuple('Notice', ('event', 'row', 'updates')) Notice.__new__.__defaults__ = (None,) # default updates=None +class Monitor(enum.IntEnum): + monitor = OVSDB_UPDATE + monitor_cond = OVSDB_UPDATE2 + monitor_cond_since = OVSDB_UPDATE3 + + +class ConditionState(object): + def __init__(self): + self._ack_cond = None + self._req_cond = None + self._new_cond = [True] + + def __iter__(self): + return iter([self._new_cond, self._req_cond, self._ack_cond]) + + @property + def new(self): + """The latest freshly initialized condition change""" + return self._new_cond + + @property + def acked(self): + """The last condition change that has been accepted by the server""" + return self._ack_cond + + @property + def latest(self): + """The most recent condition change""" + return next(cond for cond in self if cond is not None) + + @staticmethod + def is_true(condition): + return condition == [True] + + def init(self, cond): + """Signal that a a condition change is being initiated""" + self._new_cond = cond + + def ack(self): + """Signal that a condition change has been acked""" + if self._req_cond is not None: + self._ack_cond, self._req_cond = (self._req_cond, None) + + def request(self): + """Signal that a condition change has been requested""" + if self._new_cond is not None: + self._req_cond, self._new_cond = (self._new_cond, None) + + def reset(self): + """Reset a requested condition change back to new""" + if self._req_cond is not None and self._new_cond is None: + self._new_cond, self._req_cond = (self._req_cond, None) + + class Idl(object): """Open vSwitch Database Interface Definition Language (OVSDB IDL). @@ -102,7 +158,13 @@ class Idl(object): IDL_S_SERVER_MONITOR_REQUESTED = 2 IDL_S_DATA_MONITOR_REQUESTED = 3 IDL_S_DATA_MONITOR_COND_REQUESTED = 4 - IDL_S_MONITORING = 5 + IDL_S_DATA_MONITOR_COND_SINCE_REQUESTED = 5 + IDL_S_MONITORING = 6 + + monitor_map = { + Monitor.monitor: IDL_S_SERVER_MONITOR_REQUESTED, + Monitor.monitor_cond: IDL_S_DATA_MONITOR_COND_REQUESTED, + Monitor.monitor_cond_since: IDL_S_DATA_MONITOR_COND_SINCE_REQUESTED} def __init__(self, remote, schema_helper, probe_interval=None, leader_only=True): @@ -146,10 +208,12 @@ class Idl(object): remotes = self._parse_remotes(remote) self._session = ovs.jsonrpc.Session.open_multiple(remotes, probe_interval=probe_interval) + self._request_id = None self._monitor_request_id = None self._last_seqno = None self.change_seqno = 0 self.uuid = uuid.uuid1() + self.last_id = str(uuid.UUID(int=0)) # Server monitor. self._server_schema_request_id = None @@ -176,6 +240,9 @@ class Idl(object): self.txn = None self._outstanding_txns = {} + self.cond_changed = False + self.cond_seqno = 0 + for table in schema.tables.values(): for column in table.columns.values(): if not hasattr(column, 'alert'): @@ -183,8 +250,7 @@ class Idl(object): table.need_table = False table.rows = custom_index.IndexedRows(table) table.idl = self - table.condition = [True] - table.cond_changed = False + table.condition = ConditionState() def _parse_remotes(self, remote): # If remote is - @@ -222,6 +288,38 @@ class Idl(object): update.""" self._session.close() + def ack_conditions(self): + """Mark all requested table conditions as acked""" + for table in self.tables.values(): + table.condition.ack() + + def sync_conditions(self): + """Synchronize condition state when the FSM is restarted + + If a non-zero last_id is available for the DB, then upon reconnect + the IDL should first request acked conditions to avoid missing updates + about records that were added before the transaction with + txn-id == last_id. If there were requested condition changes in flight + and the IDL client didn't set new conditions, then reset the requested + conditions to new to trigger a follow-up monitor_cond_change request + """ + ack_all = self.last_id == str(uuid.UUID(int=0)) + for table in self.tables.values(): + if ack_all: + table.condition.request() + table.condition.ack() + else: + table.condition.reset() + self.cond_changed = True + + def restart_fsm(self): + # Resync data DB table conditions to avoid missing updated due to + # conditions that were in flight or changed locally while the + # connection was down. + self.sync_conditions() + self.__send_server_schema_request() + self.state = self.IDL_S_SERVER_SCHEMA_REQUESTED + def run(self): """Processes a batch of messages from the database server. Returns True if the database as seen through the IDL changed, False if it did @@ -256,7 +354,7 @@ class Idl(object): if seqno != self._last_seqno: self._last_seqno = seqno self.__txn_abort_all() - self.__send_server_schema_request() + self.restart_fsm() if self.lock_name: self.__send_lock_request() break @@ -264,8 +362,19 @@ class Idl(object): msg = self._session.recv() if msg is None: break + is_response = msg.type in (ovs.jsonrpc.Message.T_REPLY, + ovs.jsonrpc.Message.T_ERROR) + + if is_response and self._request_id and self._request_id == msg.id: + self._request_id = None + # process_response follows if (msg.type == ovs.jsonrpc.Message.T_NOTIFY + and msg.method == "update3" + and len(msg.params) == 3): + self.__parse_update(msg.params[2], OVSDB_UPDATE3) + self.last_id = msg.params[1] + elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.method == "update2" and len(msg.params) == 2): # Database contents changed. @@ -290,11 +399,18 @@ class Idl(object): try: self.change_seqno += 1 self._monitor_request_id = None - self.__clear() - if self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED: + if (self.state == + self.IDL_S_DATA_MONITOR_COND_SINCE_REQUESTED): + # If 'found' is false, clear table rows for new dump + if not msg.result[0]: + self.__clear() + self.__parse_update(msg.result[2], OVSDB_UPDATE3) + elif self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED: + self.__clear() self.__parse_update(msg.result, OVSDB_UPDATE2) else: assert self.state == self.IDL_S_DATA_MONITOR_REQUESTED + self.__clear() self.__parse_update(msg.result, OVSDB_UPDATE) self.state = self.IDL_S_MONITORING @@ -368,11 +484,17 @@ class Idl(object): elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo": # Reply to our echo request. Ignore it. pass + elif (msg.type == ovs.jsonrpc.Message.T_ERROR and + self.state == ( + self.IDL_S_DATA_MONITOR_COND_SINCE_REQUESTED) and + self._monitor_request_id == msg.id): + if msg.error == "unknown method": + self.__send_monitor_request(Monitor.monitor_cond) elif (msg.type == ovs.jsonrpc.Message.T_ERROR and self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED and self._monitor_request_id == msg.id): if msg.error == "unknown method": - self.__send_monitor_request() + self.__send_monitor_request(Monitor.monitor) elif (msg.type == ovs.jsonrpc.Message.T_ERROR and self._server_schema_request_id is not None and self._server_schema_request_id == msg.id): @@ -388,6 +510,13 @@ class Idl(object): and self.__txn_process_reply(msg)): # __txn_process_reply() did everything needed. pass + elif (msg.type == ovs.jsonrpc.Message.T_REPLY and + self.state == self.IDL_S_MONITORING): + # Mark the last requested conditions as acked and if further + # condition changes were pending, send them now. + self.ack_conditions() + self.send_cond_change() + self.cond_seqno += 1 else: # This can happen if a transaction is destroyed before we # receive the reply, so keep the log level low. @@ -397,14 +526,36 @@ class Idl(object): return initial_change_seqno != self.change_seqno - def send_cond_change(self): - if not self._session.is_connected(): + def compose_cond_change(self): + if not self.cond_changed: return + change_requests = {} for table in self.tables.values(): - if table.cond_changed: - self.__send_cond_change(table, table.condition) - table.cond_changed = False + # Always use the most recent conditions set by the IDL client when + # requesting monitor_cond_change + if table.condition.new is not None: + change_requests[table.name] = [ + {"where": table.condition.new}] + table.condition.request() + + if not change_requests: + return + + self.cond_changed = False + old_uuid = str(self.uuid) + self.uuid = uuid.uuid1() + params = [old_uuid, str(self.uuid), change_requests] + return ovs.jsonrpc.Message.create_request( + "monitor_cond_change", params) + + def send_cond_change(self): + if not self._session.is_connected() or self._request_id is not None: + return + + msg = self.compose_cond_change() + if msg: + self.send_request(msg) def cond_change(self, table_name, cond): """Sets the condition for 'table_name' to 'cond', which should be a @@ -420,9 +571,16 @@ class Idl(object): if cond == []: cond = [False] - if table.condition != cond: - table.condition = cond - table.cond_changed = True + + # Compare the new condition to the last known condition + if table.condition.latest != cond: + table.condition.init(cond) + self.cond_changed = True + p = ovs.poller.Poller() + p.immediate_wake() + return self.cond_seqno + 1 + + return self.cond_seqno def wait(self, poller): """Arranges for poller.block() to wake up when self.run() has something @@ -501,14 +659,6 @@ class Idl(object): to doing nothing to avoid overhead where it is not needed. """ - def __send_cond_change(self, table, cond): - monitor_cond_change = {table.name: [{"where": cond}]} - old_uuid = str(self.uuid) - self.uuid = uuid.uuid1() - params = [old_uuid, str(self.uuid), monitor_cond_change] - msg = ovs.jsonrpc.Message.create_request("monitor_cond_change", params) - self._session.send(msg) - def __clear(self): changed = False @@ -517,6 +667,8 @@ class Idl(object): changed = True table.rows = custom_index.IndexedRows(table) + self.cond_seqno = 0 + if changed: self.change_seqno += 1 @@ -571,11 +723,18 @@ class Idl(object): self._db_change_aware_request_id = msg.id self._session.send(msg) - def __send_monitor_request(self): - if (self.state in [self.IDL_S_SERVER_MONITOR_REQUESTED, - self.IDL_S_INITIAL]): + def send_request(self, request): + self._request_id = request.id + if self._session.is_connected(): + return self._session.send(request) + + def __send_monitor_request(self, max_version=Monitor.monitor_cond_since): + if self.state == self.IDL_S_INITIAL: self.state = self.IDL_S_DATA_MONITOR_COND_REQUESTED method = "monitor_cond" + elif self.state == self.IDL_S_SERVER_MONITOR_REQUESTED: + self.state = self.monitor_map[Monitor(max_version)] + method = Monitor(max_version).name else: self.state = self.IDL_S_DATA_MONITOR_REQUESTED method = "monitor" @@ -589,22 +748,24 @@ class Idl(object): (column not in self.readonly[table.name])): columns.append(column) monitor_request = {"columns": columns} - if method == "monitor_cond" and table.condition != [True]: - monitor_request["where"] = table.condition - table.cond_change = False + if method in ("monitor_cond", "monitor_cond_since") and ( + not ConditionState.is_true(table.condition.acked)): + monitor_request["where"] = table.condition.acked monitor_requests[table.name] = [monitor_request] - msg = ovs.jsonrpc.Message.create_request( - method, [self._db.name, str(self.uuid), monitor_requests]) + args = [self._db.name, str(self.uuid), monitor_requests] + if method == "monitor_cond_since": + args.append(str(self.last_id)) + msg = ovs.jsonrpc.Message.create_request(method, args) self._monitor_request_id = msg.id - self._session.send(msg) + self.send_request(msg) def __send_server_schema_request(self): self.state = self.IDL_S_SERVER_SCHEMA_REQUESTED msg = ovs.jsonrpc.Message.create_request( "get_schema", [self._server_db_name, str(self.uuid)]) self._server_schema_request_id = msg.id - self._session.send(msg) + self.send_request(msg) def __send_server_monitor_request(self): self.state = self.IDL_S_SERVER_MONITOR_REQUESTED @@ -624,7 +785,7 @@ class Idl(object): str(self.server_monitor_uuid), monitor_requests]) self._server_monitor_request_id = msg.id - self._session.send(msg) + self.send_request(msg) def __parse_update(self, update, version, tables=None): try: @@ -668,7 +829,7 @@ class Idl(object): self.cooperative_yield() - if version == OVSDB_UPDATE2: + if version in (OVSDB_UPDATE2, OVSDB_UPDATE3): changes = self.__process_update2(table, uuid, row_update) if changes: notices.append(changes) diff --git a/tests/ovsdb-idl.at b/tests/ovsdb-idl.at index 0f229b2f9..1de7f4e67 100644 --- a/tests/ovsdb-idl.at +++ b/tests/ovsdb-idl.at @@ -2293,7 +2293,7 @@ m4_define([OVSDB_CHECK_CLUSTER_IDL], # Checks that monitor_cond_since works fine when disconnects happen # with cond_change requests in flight (i.e., IDL is properly updated). -OVSDB_CHECK_CLUSTER_IDL_C([simple idl, monitor_cond_since, cluster disconnect], +OVSDB_CHECK_CLUSTER_IDL([simple idl, monitor_cond_since, cluster disconnect], 3, [['["idltest", {"op": "insert", -- 2.31.1 _______________________________________________ dev mailing list d...@openvswitch.org https://mail.openvswitch.org/mailman/listinfo/ovs-dev