From: Numan Siddique <num...@ovn.org>

Presently, replication is not allowed if there is a schema version mismatch
between the schema returned by the active ovsdb-server and the local db
schema. This is causing failures in OVN DB HA deployments during upgrades.

In the case of an OpenStack tripleo deployment with OVN, the OVN DB
ovsdb-servers are deployed on a multi-node controller cluster in
active/standby mode. During minor updates or major upgrades, the cluster is
updated one node at a time. If node A is running the active OVN DB
ovsdb-servers and it is updated, another node B becomes active. After the
update, when the OVN DB ovsdb-servers on A are started, they fail to
replicate from the active server if there is a schema version mismatch.

This patch addresses this issue by allowing replication even if there is a
schema version mismatch, but only if all the tables of the active db schema
and their columns are present in the local db schema.

This should not result in any data loss.

Signed-off-by: Numan Siddique <num...@ovn.org>
---
v1 -> v2
--------
 * Addressed review comments from Ben. The schema version numbers are
   not checked.


 ovsdb/replication.c        | 156 +++++++++++++++++++++++++++++++------
 tests/ovsdb-replication.at |  23 ++++++
 tests/ovsdb-server.at      | 109 ++++++++++++++++++++++++++
 3 files changed, 263 insertions(+), 25 deletions(-)

diff --git a/ovsdb/replication.c b/ovsdb/replication.c
index 752b3c89c..6191cb934 100644
--- a/ovsdb/replication.c
+++ b/ovsdb/replication.c
@@ -43,7 +43,7 @@ static struct uuid server_uuid;
 static struct jsonrpc_session *session;
 static unsigned int session_seqno = UINT_MAX;
 
-static struct jsonrpc_msg *create_monitor_request(struct ovsdb *db);
+static struct jsonrpc_msg *create_monitor_request(struct ovsdb_schema *);
 static void add_monitored_table(struct ovsdb_table_schema *table,
                                 struct json *monitor_requests);
 
@@ -100,16 +100,27 @@ enum ovsdb_replication_state {
 static enum ovsdb_replication_state state;
 
 
+struct replication_db {
+    struct ovsdb *db;              /* Local database being replicated to. */
+    bool schema_version_higher;    /* True once 'active_db_schema' is set. */
+     /* Schema received from the active server when it differs from the
+      * local schema but replication is still possible. NULL otherwise. */
+    struct ovsdb_schema *active_db_schema;
+};
+
+static bool check_replication_possible(struct ovsdb_schema *local_db_schema,
+                                       struct ovsdb_schema *active_db_schema);
+
 /* All DBs known to ovsdb-server.  The actual replication dbs are stored
  * in 'replication dbs', which is a subset of all dbs and remote dbs whose
  * schema matches.  */
 static struct shash local_dbs = SHASH_INITIALIZER(&local_dbs);
 static struct shash *replication_dbs;
 
-static struct shash *replication_db_clone(struct shash *dbs);
+static struct shash *replication_dbs_create(void);
 static void replication_dbs_destroy(void);
 /* Find 'struct ovsdb' by name within 'replication_dbs' */
-static struct ovsdb* find_db(const char *db_name);
+static struct replication_db *find_db(const char *db_name);
 
 
 void
@@ -152,8 +163,8 @@ send_schema_requests(const struct json *result)
         if (name->type == JSON_STRING) {
             /* Send one schema request for each remote DB. */
             const char *db_name = json_string(name);
-            struct ovsdb *db = find_db(db_name);
-            if (db) {
+            struct replication_db *rdb = find_db(db_name);
+            if (rdb) {
                 struct jsonrpc_msg *request =
                     jsonrpc_create_request(
                         "get_schema",
@@ -161,7 +172,7 @@ send_schema_requests(const struct json *result)
                             json_string_create(db_name)),
                         NULL);
 
-                request_ids_add(request->id, db);
+                request_ids_add(request->id, rdb->db);
                 jsonrpc_session_send(session, request);
             }
         }
@@ -206,11 +217,11 @@ replication_run(void)
                 && msg->params->array.n == 2
                 && msg->params->array.elems[0]->type == JSON_STRING) {
                 char *db_name = msg->params->array.elems[0]->string;
-                struct ovsdb *db = find_db(db_name);
-                if (db) {
+                struct replication_db *rdb = find_db(db_name);
+                if (rdb) {
                     struct ovsdb_error *error;
                     error = process_notification(msg->params->array.elems[1],
-                                                 db);
+                                                 rdb->db);
                     if (error) {
                         ovsdb_error_assert(error);
                         state = RPL_S_ERR;
@@ -218,6 +229,7 @@ replication_run(void)
                 }
             }
         } else if (msg->type == JSONRPC_REPLY) {
+            struct replication_db *rdb;
             struct ovsdb *db;
             if (!request_ids_lookup_and_free(msg->id, &db)) {
                 VLOG_WARN("received unexpected reply");
@@ -256,7 +268,7 @@ replication_run(void)
                 jsonrpc_session_send(session, request);
 
                 replication_dbs_destroy();
-                replication_dbs = replication_db_clone(&local_dbs);
+                replication_dbs = replication_dbs_create();
                 state = RPL_S_DB_REQUESTED;
                 break;
             }
@@ -284,17 +296,37 @@ replication_run(void)
                     state = RPL_S_ERR;
                 }
 
-                if (db != find_db(schema->name)) {
+                rdb = find_db(schema->name);
+                if (!rdb) {
                     /* Unexpected schema. */
                     VLOG_WARN("unexpected schema %s", schema->name);
                     state = RPL_S_ERR;
-                } else if (!ovsdb_schema_equal(schema, db->schema)) {
+                } else if (!ovsdb_schema_equal(schema, rdb->db->schema)) {
                     /* Schmea version mismatch. */
-                    VLOG_INFO("Schema version mismatch, %s not replicated",
+                    VLOG_INFO("Schema version mismatch, checking if %s can "
+                              "still be replicated or not.",
                               schema->name);
-                    shash_find_and_delete(replication_dbs, schema->name);
+                    if (check_replication_possible(rdb->db->schema, schema)) {
+                        VLOG_INFO("%s can be replicated.", schema->name);
+                        rdb->schema_version_higher = true;
+                        if (rdb->active_db_schema) {
+                            ovsdb_schema_destroy(rdb->active_db_schema);
+                        }
+                        rdb->active_db_schema = schema;
+                    } else {
+                        VLOG_INFO("%s cannot be replicated.", schema->name);
+                        struct replication_db *r =
+                            shash_find_and_delete(replication_dbs,
+                                                  schema->name);
+                        if (r->active_db_schema) {
+                            ovsdb_schema_destroy(r->active_db_schema);
+                        }
+                        free(r);
+                        ovsdb_schema_destroy(schema);
+                    }
+                } else {
+                    ovsdb_schema_destroy(schema);
                 }
-                ovsdb_schema_destroy(schema);
 
                 /* After receiving schemas, reset the local databases that
                  * will be monitored and send out monitor requests for them. */
@@ -306,11 +338,13 @@ replication_run(void)
                         state = RPL_S_ERR;
                     } else {
                         SHASH_FOR_EACH (node, replication_dbs) {
-                            db = node->data;
+                            rdb = node->data;
                             struct jsonrpc_msg *request =
-                                create_monitor_request(db);
+                                create_monitor_request(
+                                    rdb->schema_version_higher ?
+                                    rdb->active_db_schema : rdb->db->schema);
 
-                            request_ids_add(request->id, db);
+                            request_ids_add(request->id, rdb->db);
                             jsonrpc_session_send(session, request);
                             VLOG_DBG("Send monitor requests");
                             state = RPL_S_MONITOR_REQUESTED;
@@ -509,7 +543,7 @@ replication_destroy(void)
     shash_destroy(&local_dbs);
 }
 
-static struct ovsdb *
+static struct replication_db *
 find_db(const char *db_name)
 {
     return shash_find_data(replication_dbs, db_name);
@@ -541,11 +575,10 @@ reset_database(struct ovsdb *db)
  * Caller is responsible for disposing 'request'.
  */
 static struct jsonrpc_msg *
-create_monitor_request(struct ovsdb *db)
+create_monitor_request(struct ovsdb_schema *schema)
 {
     struct jsonrpc_msg *request;
     struct json *monitor;
-    struct ovsdb_schema *schema = db->schema;
     const char *db_name = schema->name;
 
     struct json *monitor_request = json_object_create();
@@ -779,14 +812,18 @@ request_ids_clear(void)
 }
 
 static struct shash *
-replication_db_clone(struct shash *dbs)
+replication_dbs_create(void)
 {
     struct shash *new = xmalloc(sizeof *new);
     shash_init(new);
 
     struct shash_node *node;
-    SHASH_FOR_EACH (node, dbs) {
-        shash_add(new, node->name, node->data);
+    SHASH_FOR_EACH (node, &local_dbs) {
+        struct replication_db *repl_db = xmalloc(sizeof *repl_db);
+        repl_db->db = node->data;
+        repl_db->schema_version_higher = false;
+        repl_db->active_db_schema = NULL;
+        shash_add(new, node->name, repl_db);
     }
 
     return new;
@@ -795,7 +832,24 @@ replication_db_clone(struct shash *dbs)
 static void
 replication_dbs_destroy(void)
 {
-    shash_destroy(replication_dbs);
+    if (!replication_dbs) {
+        return;
+    }
+
+    struct shash_node *node, *next;
+
+    SHASH_FOR_EACH_SAFE (node, next, replication_dbs) {
+        hmap_remove(&replication_dbs->map, &node->node);
+        struct replication_db *rdb = node->data;
+        if (rdb->active_db_schema) {
+            ovsdb_schema_destroy(rdb->active_db_schema);
+        }
+        free(rdb);
+        free(node->name);
+        free(node);
+    }
+
+    hmap_destroy(&replication_dbs->map);
     free(replication_dbs);
     replication_dbs = NULL;
 }
@@ -877,6 +931,58 @@ replication_status(void)
     return ds_steal_cstr(&ds);
 }
 
+/* Checks whether the local db can be replicated from the active db, given
+ * their schemas.
+ *
+ * Returns true if every table and column of 'active_db_schema' is present in
+ * 'local_db_schema' and each column's ovsdb_column_to_json() output is equal
+ * in both.  Otherwise logs the first mismatch found and returns false.
+ */
+
+static bool
+check_replication_possible(struct ovsdb_schema *local_db_schema,
+                           struct ovsdb_schema *active_db_schema)
+{
+    struct shash_node *node;
+    SHASH_FOR_EACH (node, &active_db_schema->tables) {
+        struct ovsdb_table_schema *ldb_table_schema =
+            shash_find_data(&local_db_schema->tables, node->name);
+        if (!ldb_table_schema) {
+            VLOG_INFO("Table - %s not present in the local db schema",
+                      node->name);
+            return false;
+        }
+
+        /* Every column of the active table must exist in the local table
+         * and compare equal to it in its JSON form. */
+        struct ovsdb_table_schema *adb_table_schema = node->data;
+        struct shash_node *n;
+        SHASH_FOR_EACH (n, &adb_table_schema->columns) {
+            struct ovsdb_column *ldb_col =
+                shash_find_data(&ldb_table_schema->columns, n->name);
+            if (!ldb_col) {
+                VLOG_INFO("Column - %s not present in the local "
+                          "db schema table - %s", n->name, node->name);
+                return false;
+            }
+
+            struct json *ldb_col_json = ovsdb_column_to_json(ldb_col);
+            struct json *adb_col_json = ovsdb_column_to_json(n->data);
+            bool cols_equal = json_equal(ldb_col_json, adb_col_json);
+            json_destroy(ldb_col_json);
+            json_destroy(adb_col_json);
+
+            if (!cols_equal) {
+                VLOG_INFO("Column - %s mismatch in the local "
+                          "db schema table - %s", n->name, node->name);
+                return false;
+            }
+        }
+    }
+
+    return true;
+}
+
 void
 replication_usage(void)
 {
diff --git a/tests/ovsdb-replication.at b/tests/ovsdb-replication.at
index f81381bdb..82c416052 100644
--- a/tests/ovsdb-replication.at
+++ b/tests/ovsdb-replication.at
@@ -19,6 +19,29 @@ replication_schema () {
     }
 EOF
 }
+replication_schema_v2 () {
+    cat <<'EOF'
+    {"name": "mydb",
+     "tables": {
+       "a": {
+         "columns": {
+           "number": {"type": "integer"},
+           "name": {"type": "string"}},
+         "indexes": [["number"]]},
+       "b": {
+         "columns": {
+           "number": {"type": "integer"},
+           "name": {"type": "string"},
+           "foo" : {"type": "string"}},
+         "indexes": [["number"]]},
+       "c": {
+         "columns": {
+           "number": {"type": "integer"},
+           "name": {"type": "string"}},
+         "indexes": [["number"]]}}
+    }
+EOF
+}
 ]
 m4_divert_pop([PREPARE_TESTS])
 
diff --git a/tests/ovsdb-server.at b/tests/ovsdb-server.at
index 09629410a..0b15758f2 100644
--- a/tests/ovsdb-server.at
+++ b/tests/ovsdb-server.at
@@ -1903,3 +1903,112 @@ AT_CHECK([uuidfilt output], [0], [[[{"details":"insert operation not allowed whe
 ], [ignore])
 OVSDB_SERVER_SHUTDOWN
 AT_CLEANUP
+
+AT_SETUP([ovsdb-server replication with schema mismatch])
+AT_KEYWORDS([ovsdb server replication])
+replication_schema > subset_schema
+replication_schema_v2 > superset_schema
+
+AT_CHECK([ovsdb-tool create db1 subset_schema], [0], [stdout], [ignore])
+AT_CHECK([ovsdb-tool create db2 superset_schema], [0], [stdout], [ignore])
+
+dnl Add some data to both DBs
+AT_CHECK([ovsdb-tool transact db1 \
+'[["mydb",
+  {"op": "insert",
+   "table": "a",
+   "row": {"number": 9, "name": "nine"}}]]'], [0], [ignore], [ignore])
+
+AT_CHECK([ovsdb-tool transact db2 \
+'[["mydb",
+  {"op": "insert",
+   "table": "a",
+   "row": {"number": 10, "name": "ten"}}]]'], [0], [ignore], [ignore])
+
+dnl Start both 'db1' and 'db2'.
+AT_CHECK([ovsdb-server --detach --no-chdir --log-file=ovsdb-server1.log --pidfile="`pwd`"/pid --remote=punix:db.sock --unixctl="`pwd`"/unixctl db1 --active ], [0], [ignore], [ignore])
+on_exit 'test ! -e pid || kill `cat pid`'
+
+
+AT_CHECK([ovsdb-server --detach --no-chdir --log-file=ovsdb-server2.log --pidfile="`pwd`"/pid2 --remote=punix:db2.sock --unixctl="`pwd`"/unixctl2 db2], [0], [ignore], [ignore])
+on_exit 'test ! -e pid2 || kill `cat pid2`'
+
+OVS_WAIT_UNTIL([ovs-appctl -t "`pwd`"/unixctl ovsdb-server/sync-status |grep active])
+OVS_WAIT_UNTIL([ovs-appctl -t "`pwd`"/unixctl2 ovsdb-server/sync-status |grep active])
+
+AT_CHECK([ovsdb-client dump unix:db.sock a number name], 0, [dnl
+a table
+name number
+---- ------
+nine 9
+])
+
+AT_CHECK([ovsdb-client dump unix:db2.sock a number name], 0, [dnl
+a table
+name number
+---- ------
+ten  10
+])
+
+# Replicate db1 from db2. It should fail since db2 schema
+# doesn't match with db1 and has additional tables/columns.
+AT_CHECK([ovs-appctl -t "`pwd`"/unixctl ovsdb-server/set-active-ovsdb-server unix:db2.sock])
+AT_CHECK([ovs-appctl -t "`pwd`"/unixctl ovsdb-server/connect-active-ovsdb-server])
+
+OVS_WAIT_UNTIL(
+  [test 1 = `cat ovsdb-server1.log | grep "Schema version mismatch, checking if mydb can still be replicated or not" | wc -l]`
+)
+
+OVS_WAIT_UNTIL(
+  [test 1 = `cat ovsdb-server1.log | grep "mydb cannot be replicated" | wc -l]`
+)
+
+OVS_WAIT_UNTIL([ovs-appctl -t "`pwd`"/unixctl ovsdb-server/sync-status |grep active])
+
+# Replicate db2 from db1. This should be successful.
+AT_CHECK([ovs-appctl -t "`pwd`"/unixctl ovsdb-server/disconnect-active-ovsdb-server])
+AT_CHECK([ovs-appctl -t "`pwd`"/unixctl2 ovsdb-server/set-active-ovsdb-server unix:db.sock])
+AT_CHECK([ovs-appctl -t "`pwd`"/unixctl2 ovsdb-server/connect-active-ovsdb-server])
+
+OVS_WAIT_UNTIL(
+  [test 1 = `cat ovsdb-server2.log | grep "Schema version mismatch, checking if mydb can still be replicated or not" | wc -l]`
+)
+
+OVS_WAIT_UNTIL(
+  [test 1 = `cat ovsdb-server2.log | grep "mydb can be replicated" | wc -l]`
+)
+
+OVS_WAIT_UNTIL([ovs-appctl -t "`pwd`"/unixctl2 ovsdb-server/sync-status |grep replicating])
+
+AT_CHECK([ovsdb-client dump unix:db.sock a number name], 0, [dnl
+a table
+name number
+---- ------
+nine 9
+])
+
+AT_CHECK([ovsdb-client dump unix:db2.sock a number name], 0, [dnl
+a table
+name number
+---- ------
+nine 9
+])
+
+AT_CHECK([ovsdb-client transact unix:db.sock \
+'[["mydb",
+  {"op": "insert",
+   "table": "a",
+   "row": {"number": 6, "name": "six"}}]]'], [0], [ignore], [ignore])
+
+OVS_WAIT_UNTIL([test 1 = `ovsdb-client dump unix:db2.sock a number name | grep six | wc -l`])
+
+AT_CHECK([
+  ovsdb-client dump unix:db2.sock a number name], 0, [dnl
+a table
+name number
+---- ------
+nine 9
+six  6
+])
+
+AT_CLEANUP
-- 
2.21.0

_______________________________________________
dev mailing list
d...@openvswitch.org
https://mail.openvswitch.org/mailman/listinfo/ovs-dev

Reply via email to