The Python IDL now works with the "monitor_cond" method. Add a test
for backward compatibility with the old "monitor" method.

Signed-off-by: Liran Schour <lir...@il.ibm.com>

---
v2->v3:
* cond_update() receives a single condition
---
 python/ovs/db/data.py |  18 ++++-
 python/ovs/db/idl.py  | 181 ++++++++++++++++++++++++++++++++++++++++++++------
 tests/ovsdb-idl.at    |  97 +++++++++++++++++++++++++++
 3 files changed, 271 insertions(+), 25 deletions(-)

diff --git a/python/ovs/db/data.py b/python/ovs/db/data.py
index 3075ee6..162ab19 100644
--- a/python/ovs/db/data.py
+++ b/python/ovs/db/data.py
@@ -146,7 +146,7 @@ class Atom(object):
                 % (self.to_string(), base.enum.to_string()))
         elif base.type in [ovs.db.types.IntegerType, ovs.db.types.RealType]:
             if ((base.min is None or self.value >= base.min) and
-                (base.max is None or self.value <= base.max)):
+                    (base.max is None or self.value <= base.max)):
                 pass
             elif base.min is not None and base.max is not None:
                 raise ConstraintViolation(
@@ -155,7 +155,7 @@ class Atom(object):
             elif base.min is not None:
                 raise ConstraintViolation(
                     "%s is less than minimum allowed value %.15g"
-                            % (self.to_string(), base.min))
+                    % (self.to_string(), base.min))
             else:
                 raise ConstraintViolation(
                     "%s is greater than maximum allowed value %.15g"
@@ -313,7 +313,7 @@ class Datum(object):
         that this function accepts."""
         is_map = type_.is_map()
         if (is_map or
-            (type(json) == list and len(json) > 0 and json[0] == "set")):
+                (type(json) == list and len(json) > 0 and json[0] == "set")):
             if is_map:
                 class_ = "map"
             else:
@@ -388,6 +388,18 @@ class Datum(object):
             s.append(tail)
         return ''.join(s)
 
+    def diff(self, datum):
+        if self.type.n_max > 1 or len(self.values) == 0:
+            for k, v in six.iteritems(datum.values):
+                if k in self.values and v == self.values[k]:
+                    del self.values[k]
+                else:
+                    self.values[k] = v
+        else:
+            return datum
+
+        return self
+
     def as_list(self):
         if self.type.is_map():
             return [[k.value, v.value] for k, v in six.iteritems(self.values)]
diff --git a/python/ovs/db/idl.py b/python/ovs/db/idl.py
index 3187db9..693ec91 100644
--- a/python/ovs/db/idl.py
+++ b/python/ovs/db/idl.py
@@ -32,6 +32,9 @@ ROW_CREATE = "create"
 ROW_UPDATE = "update"
 ROW_DELETE = "delete"
 
+OVSDB_UPDATE = 0
+OVSDB_UPDATE2 = 1
+
 
 class Idl(object):
     """Open vSwitch Database Interface Definition Language (OVSDB IDL).
@@ -85,6 +88,10 @@ class Idl(object):
       currently being constructed, if there is one, or None otherwise.
 """
 
+    IDL_S_INITIAL = 0
+    IDL_S_MONITOR_REQUESTED = 1
+    IDL_S_MONITOR_COND_REQUESTED = 2
+
     def __init__(self, remote, schema):
         """Creates and returns a connection to the database named 'db_name' on
         'remote', which should be in a form acceptable to
@@ -115,6 +122,8 @@ class Idl(object):
         self._monitor_request_id = None
         self._last_seqno = None
         self.change_seqno = 0
+        self.uuid = uuid.uuid1()
+        self.state = self.IDL_S_INITIAL
 
         # Database locking.
         self.lock_name = None          # Name of lock we need, None if none.
@@ -133,6 +142,7 @@ class Idl(object):
             table.need_table = False
             table.rows = {}
             table.idl = self
+            table.condition = []
 
     def close(self):
         """Closes the connection to the database.  The IDL will no longer
@@ -179,11 +189,15 @@ class Idl(object):
             if msg is None:
                 break
             if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
-                and msg.method == "update"
-                and len(msg.params) == 2
-                and msg.params[0] is None):
+                    and msg.method == "update2"
+                    and len(msg.params) == 2):
+                # Database contents changed.
+                self.__parse_update(msg.params[1], OVSDB_UPDATE2)
+            elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
+                    and msg.method == "update"
+                    and len(msg.params) == 2):
                 # Database contents changed.
-                self.__parse_update(msg.params[1])
+                self.__parse_update(msg.params[1], OVSDB_UPDATE)
             elif (msg.type == ovs.jsonrpc.Message.T_REPLY
                   and self._monitor_request_id is not None
                   and self._monitor_request_id == msg.id):
@@ -192,10 +206,15 @@ class Idl(object):
                     self.change_seqno += 1
                     self._monitor_request_id = None
                     self.__clear()
-                    self.__parse_update(msg.result)
-                except error.Error as e:
+                    if self.state == self.IDL_S_MONITOR_COND_REQUESTED:
+                        self.__parse_update(msg.result, OVSDB_UPDATE2)
+                    else:
+                        assert self.state == self.IDL_S_MONITOR_REQUESTED
+                        self.__parse_update(msg.result, OVSDB_UPDATE)
+
+                except error.Error, e:
                     vlog.err("%s: parse error in received schema: %s"
-                              % (self._session.get_name(), e))
+                             % (self._session.get_name(), e))
                     self.__error()
             elif (msg.type == ovs.jsonrpc.Message.T_REPLY
                   and self._lock_request_id is not None
@@ -213,6 +232,11 @@ class Idl(object):
             elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
                 # Reply to our echo request.  Ignore it.
                 pass
+            elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
+                  self.state == self.IDL_S_MONITOR_COND_REQUESTED and
+                  self._monitor_request_id == msg.id):
+                if msg.error == "unknown method":
+                    self.__send_monitor_request()
             elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
                                ovs.jsonrpc.Message.T_REPLY)
                   and self.__txn_process_reply(msg)):
@@ -227,6 +251,19 @@ class Idl(object):
 
         return initial_change_seqno != self.change_seqno
 
+    def cond_update(self, table_name, cond):
+        """Change conditions for this IDL session. If session is not already
+        connected, add the condition to the table and submit it on the next
+        monitor request. Otherwise send monitor_cond_update with the requested
+        changes."""
+        table = self.tables.get(table_name)
+        if not table:
+            raise error.Error('Unknown table "%s"' % table_name)
+        if self._session.is_connected():
+            self.__send_cond_update(table, cond)
+        else:
+            table.condition = cond
+
     def wait(self, poller):
         """Arranges for poller.block() to wake up when self.run() has something
         to do or when activity occurs on a transaction on 'self'."""
@@ -282,6 +319,14 @@ class Idl(object):
         :type updates:  Row
         """
 
+    def __send_cond_update(self, table, cond):
+        monitor_cond_update = {table.name: [{"where": cond}]}
+        old_uuid = str(self.uuid)
+        self.uuid = uuid.uuid1()
+        params = [old_uuid, str(self.uuid), monitor_cond_update]
+        msg = ovs.jsonrpc.Message.create_request("monitor_cond_update", params)
+        self._session.send(msg)
+
     def __clear(self):
         changed = False
 
@@ -331,36 +376,47 @@ class Idl(object):
 
     def __parse_lock_notify(self, params, new_has_lock):
         if (self.lock_name is not None
-            and type(params) in (list, tuple)
-            and params
-            and params[0] == self.lock_name):
+                and type(params) in (list, tuple)
+                and params
+                and params[0] == self.lock_name):
             self.__update_has_lock(new_has_lock)
             if not new_has_lock:
                 self.is_lock_contended = True
 
     def __send_monitor_request(self):
+        if self.state == self.IDL_S_INITIAL:
+            self.state = self.IDL_S_MONITOR_COND_REQUESTED
+            method = "monitor_cond"
+        else:
+            self.state = self.IDL_S_MONITOR_REQUESTED
+            method = "monitor"
+
         monitor_requests = {}
         for table in six.itervalues(self.tables):
             columns = []
             for column in six.iterkeys(table.columns):
                 if ((table.name not in self.readonly) or
-                    (table.name in self.readonly) and
-                    (column not in self.readonly[table.name])):
+                        (table.name in self.readonly) and
+                        (column not in self.readonly[table.name])):
                     columns.append(column)
             monitor_requests[table.name] = {"columns": columns}
+            if method == "monitor_cond" and table.condition:
+                monitor_requests[table.name]["where"] = table.condition
+                table.condition = None
+
         msg = ovs.jsonrpc.Message.create_request(
-            "monitor", [self._db.name, None, monitor_requests])
+            method, [self._db.name, str(self.uuid), monitor_requests])
         self._monitor_request_id = msg.id
         self._session.send(msg)
 
-    def __parse_update(self, update):
+    def __parse_update(self, update, version):
         try:
-            self.__do_parse_update(update)
-        except error.Error as e:
+            self.__do_parse_update(update, version)
+        except error.Error, e:
             vlog.err("%s: error parsing update: %s"
                      % (self._session.get_name(), e))
 
-    def __do_parse_update(self, table_updates):
+    def __do_parse_update(self, table_updates, version):
         if type(table_updates) != dict:
             raise error.Error("<table-updates> is not an object",
                               table_updates)
@@ -389,6 +445,11 @@ class Idl(object):
                                       'is not an object'
                                       % (table_name, uuid_string))
 
+                if version == OVSDB_UPDATE2:
+                    if self.__process_update2(table, uuid, row_update):
+                        self.change_seqno += 1
+                    continue
+
                 parser = ovs.db.parser.Parser(row_update, "row-update")
                 old = parser.get_optional("old", [dict])
                 new = parser.get_optional("new", [dict])
@@ -401,6 +462,46 @@ class Idl(object):
                 if self.__process_update(table, uuid, old, new):
                     self.change_seqno += 1
 
+    def __process_update2(self, table, uuid, row_update):
+        row = table.rows.get(uuid)
+        changed = False
+        if "delete" in row_update:
+            if row:
+                del table.rows[uuid]
+                self.notify(ROW_DELETE, row)
+                changed = True
+            else:
+                # XXX rate-limit
+                vlog.warn("cannot delete missing row %s from table"
+                          "%s" % (uuid, table.name))
+        elif "insert" in row_update or "initial" in row_update:
+            if row:
+                vlog.warn("cannot add existing row %s from table"
+                          " %s" % (uuid, table.name))
+                del table.rows[uuid]
+            row = self.__create_row(table, uuid)
+            if "insert" in row_update:
+                row_update = row_update['insert']
+            else:
+                row_update = row_update['initial']
+            self.__add_default(table, row_update)
+            if self.__row_update(table, row, row_update):
+                changed = True
+                self.notify(ROW_CREATE, row)
+        elif "modify" in row_update:
+            if not row:
+                raise error.Error('Modify non-existing row')
+
+            self.__apply_diff(table, row, row_update['modify'])
+            self.notify(ROW_UPDATE, row,
+                        Row.from_json(self, table,
+                                      uuid, row_update['modify']))
+            changed = True
+        else:
+            raise error.Error('<row-update> unknown operation',
+                              row_update)
+        return changed
+
     def __process_update(self, table, uuid, old, new):
         """Returns True if a column changed, False otherwise."""
         row = table.rows.get(uuid)
@@ -441,6 +542,42 @@ class Idl(object):
                 self.notify(op, row, Row.from_json(self, table, uuid, old))
         return changed
 
+    def __column_name(self, column):
+        if column.type.key.type == ovs.db.types.UuidType:
+            return ovs.ovsuuid.to_json(column.type.key.type.default)
+        else:
+            return column.type.key.type.default
+
+    def __add_default(self, table, row_update):
+        for column in table.columns.itervalues():
+            if column.name not in row_update:
+                if ((table.name not in self.readonly) or
+                        (table.name in self.readonly) and
+                        (column.name not in self.readonly[table.name])):
+                    if column.type.n_min != 0 and not column.type.is_map():
+                        row_update[column.name] = self.__column_name(column)
+
+    def __apply_diff(self, table, row, row_diff):
+        for column_name, datum_json in row_diff.iteritems():
+            column = table.columns.get(column_name)
+            if not column:
+                # XXX rate-limit
+                vlog.warn("unknown column %s updating table %s"
+                          % (column_name, table.name))
+                continue
+
+            try:
+                datum = ovs.db.data.Datum.from_json(column.type, datum_json)
+            except error.Error, e:
+                # XXX rate-limit
+                vlog.warn("error parsing column %s in table %s: %s"
+                          % (column_name, table.name, e))
+                continue
+
+            datum = row._data[column_name].diff(datum)
+            if datum != row._data[column_name]:
+                row._data[column_name] = datum
+
     def __row_update(self, table, row, row_json):
         changed = False
         for column_name, datum_json in six.iteritems(row_json):
@@ -593,7 +730,7 @@ class Row(object):
         assert self._idl.txn
 
         if ((self._table.name in self._idl.readonly) and
-            (column_name in self._idl.readonly[self._table.name])):
+                (column_name in self._idl.readonly[self._table.name])):
             vlog.warn("attempting to write to readonly column %s"
                       % column_name)
             return
@@ -829,8 +966,8 @@ class Transaction(object):
     def _substitute_uuids(self, json):
         if type(json) in (list, tuple):
             if (len(json) == 2
-                and json[0] == 'uuid'
-                and ovs.ovsuuid.is_valid_string(json[1])):
+                    and json[0] == 'uuid'
+                    and ovs.ovsuuid.is_valid_string(json[1])):
                 uuid = ovs.ovsuuid.from_string(json[1])
                 row = self._txn_rows.get(uuid, None)
                 if row and row._data is None:
@@ -967,14 +1104,14 @@ class Transaction(object):
                 for column_name, datum in six.iteritems(row._changes):
                     if row._data is not None or not datum.is_default():
                         row_json[column_name] = (
-                                self._substitute_uuids(datum.to_json()))
+                            self._substitute_uuids(datum.to_json()))
 
                         # If anything really changed, consider it an update.
                         # We can't suppress not-really-changed values earlier
                         # or transactions would become nonatomic (see the big
                         # comment inside Transaction._write()).
                         if (not any_updates and row._data is not None and
-                            row._data[column_name] != datum):
+                                row._data[column_name] != datum):
                             any_updates = True
 
                 if row._data is None or row_json:
diff --git a/tests/ovsdb-idl.at b/tests/ovsdb-idl.at
index 4baac46..2238653 100644
--- a/tests/ovsdb-idl.at
+++ b/tests/ovsdb-idl.at
@@ -646,6 +646,103 @@ OVSDB_CHECK_IDL_FETCH_COLUMNS([simple idl, initially populated],
 003: done
 ]])
 
+m4_define([OVSDB_CHECK_IDL_WO_MONITOR_COND_PY],
+  [AT_SETUP([$1 - Python])
+   AT_SKIP_IF([test $HAVE_PYTHON = no])
+   AT_KEYWORDS([ovsdb server idl Python monitor $4])
+   AT_CHECK([ovsdb-tool create db $abs_srcdir/idltest.ovsschema],
+                  [0], [stdout], [ignore])
+   AT_CHECK([ovsdb-server '-vPATTERN:console:ovsdb-server|%c|%m' --detach --no-chdir --pidfile="`pwd`"/pid --remote=punix:socket --unixctl="`pwd`"/unixctl db], [0], [ignore], [ignore])
+   AT_CHECK([ovs-appctl -t "`pwd`"/unixctl ovsdb-server/disable-monitor-cond])
+   AT_CHECK([$PYTHON $srcdir/test-ovsdb.py  -t10 idl $srcdir/idltest.ovsschema unix:socket $2],
+            [0], [stdout], [ignore], [kill `cat pid`])
+   AT_CHECK([sort stdout | ${PERL} $srcdir/uuidfilt.pl]m4_if([$5],,, [[| $5]]),
+            [0], [$3], [], [kill `cat pid`])
+   OVSDB_SERVER_SHUTDOWN
+   AT_CLEANUP])
+
+
+m4_define([OVSDB_CHECK_IDL_WO_MONITOR_COND],
+   [OVSDB_CHECK_IDL_WO_MONITOR_COND_PY($@)])
+
+
+OVSDB_CHECK_IDL_WO_MONITOR_COND([simple idl disable monitor-cond],
+  [['["idltest",
+      {"op": "insert",
+       "table": "simple",
+       "row": {"i": 1,
+               "r": 2.0,
+               "b": true,
+               "s": "mystring",
+               "u": ["uuid", "84f5c8f5-ac76-4dbc-a24f-8860eb407fc1"],
+               "ia": ["set", [1, 2, 3]],
+               "ra": ["set", [-0.5]],
+               "ba": ["set", [true]],
+               "sa": ["set", ["abc", "def"]],
+               "ua": ["set", [["uuid", "69443985-7806-45e2-b35f-574a04e720f9"],
+                              ["uuid", "aad11ef0-816a-4b01-93e6-03b8b4256b98"]]]}},
+      {"op": "insert",
+       "table": "simple",
+       "row": {}}]' \
+    '["idltest",
+      {"op": "update",
+       "table": "simple",
+       "where": [],
+       "row": {"b": true}}]' \
+    '["idltest",
+      {"op": "update",
+       "table": "simple",
+       "where": [],
+       "row": {"r": 123.5}}]' \
+    '["idltest",
+      {"op": "insert",
+       "table": "simple",
+       "row": {"i": -1,
+               "r": 125,
+               "b": false,
+               "s": "",
+               "ia": ["set", [1]],
+               "ra": ["set", [1.5]],
+               "ba": ["set", [false]],
+               "sa": ["set", []],
+               "ua": ["set", []]}}]' \
+    '["idltest",
+      {"op": "update",
+       "table": "simple",
+       "where": [["i", "<", 1]],
+       "row": {"s": "newstring"}}]' \
+    '["idltest",
+      {"op": "delete",
+       "table": "simple",
+       "where": [["i", "==", 0]]}]' \
+    'reconnect']],
+  [[000: empty
+001: {"error":null,"result":[{"uuid":["uuid","<0>"]},{"uuid":["uuid","<1>"]}]}
+002: i=0 r=0 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
+002: i=1 r=2 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
+003: {"error":null,"result":[{"count":2}]}
+004: i=0 r=0 b=true s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
+004: i=1 r=2 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
+005: {"error":null,"result":[{"count":2}]}
+006: i=0 r=123.5 b=true s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
+006: i=1 r=123.5 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
+007: {"error":null,"result":[{"uuid":["uuid","<6>"]}]}
+008: i=-1 r=125 b=false s= u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>
+008: i=0 r=123.5 b=true s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
+008: i=1 r=123.5 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
+009: {"error":null,"result":[{"count":2}]}
+010: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>
+010: i=0 r=123.5 b=true s=newstring u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
+010: i=1 r=123.5 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
+011: {"error":null,"result":[{"count":1}]}
+012: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>
+012: i=1 r=123.5 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
+013: reconnect
+014: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>
+014: i=1 r=123.5 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
+015: done
+]])
+
 m4_define([OVSDB_CHECK_IDL_TRACK_C],
   [AT_SETUP([$1 - C])
    AT_KEYWORDS([ovsdb server idl tracking positive $5])
-- 
2.1.4


_______________________________________________
dev mailing list
dev@openvswitch.org
http://openvswitch.org/mailman/listinfo/dev

Reply via email to