From: Anton Ivanov <anton.iva...@cambridgegreys.com>

Generate structures of the type {"table":{"row uuid":"row"}}
in parallel.
This is one of the slowest operations in ovsdb as it involves
hash walks as well as conversion to JSON. Both are parallelized and the
results from worker threads working on "slices" of the hash
are merged.
Use the parallel generation to produce snapshots.

Signed-off-by: Anton Ivanov <anton.iva...@cambridgegreys.com>
---
 ovsdb/automake.mk     |   2 +
 ovsdb/file.c          | 120 +++++++++++++++++++++++++++++++++++++++---
 ovsdb/parallel-json.c |  91 ++++++++++++++++++++++++++++++++
 ovsdb/parallel-json.h |  24 +++++++++
 4 files changed, 229 insertions(+), 8 deletions(-)
 create mode 100644 ovsdb/parallel-json.c
 create mode 100644 ovsdb/parallel-json.h

diff --git a/ovsdb/automake.mk b/ovsdb/automake.mk
index 62cc02686..0fece2912 100644
--- a/ovsdb/automake.mk
+++ b/ovsdb/automake.mk
@@ -22,6 +22,8 @@ ovsdb_libovsdb_la_SOURCES = \
        ovsdb/ovsdb.h \
        ovsdb/monitor.c \
        ovsdb/monitor.h \
+       ovsdb/parallel-json.c \
+       ovsdb/parallel-json.h \
        ovsdb/query.c \
        ovsdb/query.h \
        ovsdb/raft.c \
diff --git a/ovsdb/file.c b/ovsdb/file.c
index 59220824f..7ee66932e 100644
--- a/ovsdb/file.c
+++ b/ovsdb/file.c
@@ -37,6 +37,8 @@
 #include "unixctl.h"
 #include "uuid.h"
 #include "util.h"
+#include "parallel-hmap.h"
+#include "parallel-json.h"
 #include "openvswitch/vlog.h"
 
 VLOG_DEFINE_THIS_MODULE(ovsdb_file);
@@ -367,23 +369,125 @@ ovsdb_file_change_cb(const struct ovsdb_row *old,
     return true;
 }
 
+/* Per-worker workload descriptor for parallel snapshot generation. */
+
+struct snapshot_info {
+     struct ovsdb_file_txn *ftxn;   /* Fragment this worker adds rows to. */
+     struct shash *tables;          /* Tables to walk (the db's tables). */
+};
+
+/* Worker thread: after each wait_for_work(), walks every table in
+ * 'si->tables' and adds the rows of its share of buckets to 'si->ftxn'. */
+static void *
+build_ftxn_thread(void *arg)
+{
+    struct worker_control *control = (struct worker_control *) arg;
+    struct worker_pool *workload;
+    struct snapshot_info *si;
+    struct ovsdb_row *row;
+    int bnum;
+
+    while (!stop_parallel_processing()) {
+        wait_for_work(control);
+        workload = (struct worker_pool *) control->workload;
+        si = (struct snapshot_info *) control->data;
+        if (stop_parallel_processing()) {
+            return NULL;
+        }
+        if (si && workload) {
+            struct shash_node *node;
+
+            SHASH_FOR_EACH (node, si->tables) {
+                const struct ovsdb_table *table = node->data;
+                /* Worker 'id' handles buckets id, id + size, ... */
+                for (bnum = control->id;
+                     bnum <= table->rows.mask;
+                     bnum += workload->size) {
+                    HMAP_FOR_EACH_IN_PARALLEL (row, hmap_node, bnum,
+                                               &table->rows) {
+                        if (stop_parallel_processing()) {
+                            return NULL;
+                        }
+                        ovsdb_file_txn_add_row(si->ftxn, NULL, row, NULL);
+                    }
+                }
+            }
+        }
+        post_completed_work(control);
+    }
+    return NULL;
+}
+
+static struct worker_pool *snapshot_pool = NULL;
+static bool pool_init_done = false;
+
+/* First call tries to create 'snapshot_pool'; later calls do nothing. */
+static void
+init_snapshot_thread_pool(void)
+{
+    int i;
+
+    if (pool_init_done) {
+        return;
+    }
+    pool_init_done = true;
+    snapshot_pool = can_parallelize_hashes(false)
+                    ? add_worker_pool(build_ftxn_thread) : NULL;
+    for (i = 0; snapshot_pool && i < snapshot_pool->size; i++) {
+        snapshot_pool->controls[i].workload = snapshot_pool;
+    }
+}
+
+/* run_pool_callback() hook: folds the JSON built by worker 'index' (taken
+ * from the 'result_frags' array) into the final transaction 'fin_result'. */
+static void
+ftxn_merge_cb(struct worker_pool *pool OVS_UNUSED,
+              void *fin_result, void *result_frags, int index)
+{
+    struct ovsdb_file_txn *merged = fin_result;
+    struct ovsdb_file_txn *piece = result_frags;
+
+    piece += index;
+    parallel_json_merge_tables(&merged->json, piece->json);
+}
+
+
+
 struct json *
 ovsdb_to_txn_json(const struct ovsdb *db, const char *comment)
 {
     struct ovsdb_file_txn ftxn;
 
     ovsdb_file_txn_init(&ftxn);
+    init_snapshot_thread_pool();
+
+    if (snapshot_pool) {
+        /* System supports parallel processing. */
+        struct ovsdb_file_txn *ftxns =
+            xcalloc(snapshot_pool->size, sizeof *ftxns);
+        struct snapshot_info *sis = xcalloc(snapshot_pool->size, sizeof *sis);
+        int i;
+        for (i = 0; i < snapshot_pool->size; i++) {
+            ovsdb_file_txn_init(&ftxns[i]);
+            sis[i].ftxn = &ftxns[i];
+            sis[i].tables = (struct shash *) &db->tables;
+            snapshot_pool->controls[i].data = &sis[i];
+        }
+        /* Each worker fills its own ftxns[i]; the callback merges them. */
+        run_pool_callback(snapshot_pool, &ftxn, ftxns, ftxn_merge_cb);
+        free(ftxns);
+        free(sis);
+    } else {
+        struct shash_node *node;
+        SHASH_FOR_EACH (node, &db->tables) {
+            const struct ovsdb_table *table = node->data;
+            const struct ovsdb_row *row;
 
-    struct shash_node *node;
-    SHASH_FOR_EACH (node, &db->tables) {
-        const struct ovsdb_table *table = node->data;
-        const struct ovsdb_row *row;
-
-        HMAP_FOR_EACH (row, hmap_node, &table->rows) {
-            ovsdb_file_txn_add_row(&ftxn, NULL, row, NULL);
+            HMAP_FOR_EACH (row, hmap_node, &table->rows) {
+                ovsdb_file_txn_add_row(&ftxn, NULL, row, NULL);
+            }
         }
     }
-
     return ovsdb_file_txn_annotate(ftxn.json, comment);
 }
 
diff --git a/ovsdb/parallel-json.c b/ovsdb/parallel-json.c
new file mode 100644
index 000000000..6c0af8891
--- /dev/null
+++ b/ovsdb/parallel-json.c
@@ -0,0 +1,91 @@
+/* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2016, 2017 Nicira, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+
+#include "openvswitch/json.h"
+#include "openvswitch/list.h"
+#include "parallel-hmap.h"
+#include "parallel-json.h"
+#include "openvswitch/vlog.h"
+
+VLOG_DEFINE_THIS_MODULE(ovsdb_parallel_json);
+
+
+/* Second level json object merge.
+ * The second level in ftxn is a (s)hash of rows keyed by UUID.
+ * They are unique, so we can brute force add them to the
+ * destination (s)hash.  Both 'dest' and 'inc' must be JSON objects;
+ * 'inc' is drained into 'dest' and then destroyed. */
+static void
+merge_ovsdb_rows(struct json *dest, struct json *inc)
+{
+    ovs_assert(dest->type == JSON_OBJECT);
+    ovs_assert(inc->type == JSON_OBJECT);
+    if (dest->object->map.mask == inc->object->map.mask) {
+        fast_hmap_merge(&dest->object->map, &inc->object->map);
+    } else {
+        struct shash_node *node, *next;
+        SHASH_FOR_EACH_SAFE (node, next, inc->object) {
+            shash_add_once(dest->object, node->name, node->data);
+            /* shash delete frees the name and the node struct */
+            shash_delete(inc->object, node);
+        }
+    }
+    /* The inc object should be empty at this point */
+    json_destroy(inc);
+}
+
+/* First level merge.
+ *
+ * '*dest' and 'inc' are ftxn-style objects: a (s)hash of tables, each
+ * holding a (s)hash of rows.  Ownership of 'inc' passes to this function:
+ *   - a null 'inc' is ignored;
+ *   - if '*dest' is still null, 'inc' simply becomes '*dest';
+ *   - otherwise each table of 'inc' is either merged row by row into the
+ *     table of the same name in '*dest', or moved there whole if '*dest'
+ *     has no such table yet, and the emptied 'inc' is destroyed. */
+void
+parallel_json_merge_tables(struct json **dest, struct json *inc)
+{
+    struct shash_node *entry, *next_entry;
+    struct shash *dest_tables;
+
+    if (!inc) {
+        return;
+    } else if (!*dest) {
+        *dest = inc;
+        return;
+    }
+
+    dest_tables = (*dest)->object;
+    SHASH_FOR_EACH_SAFE (entry, next_entry, inc->object) {
+        struct json *rows = entry->data;
+        struct json *existing = shash_find_data(dest_tables, entry->name);
+
+        if (!existing) {
+            /* New table for '*dest': hand the rows object over as is. */
+            shash_add(dest_tables, entry->name, rows);
+        } else {
+            /* Known table: move the rows across; this also destroys the
+             * drained 'rows' object. */
+            merge_ovsdb_rows(existing, rows);
+        }
+        /* Drop the table entry itself from 'inc'. */
+        shash_delete(inc->object, entry);
+    }
+    /* 'inc' has no entries left by now. */
+    json_destroy(inc);
+}
diff --git a/ovsdb/parallel-json.h b/ovsdb/parallel-json.h
new file mode 100644
index 000000000..967c62cc8
--- /dev/null
+++ b/ovsdb/parallel-json.h
@@ -0,0 +1,24 @@
+/* Copyright (c) 2009, 2010, 2011, 2016, 2017 Nicira, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef OVSDB_PARALLEL_JSON_H
+#define OVSDB_PARALLEL_JSON_H 1
+
+#include "compiler.h"
+#include "openvswitch/json.h"
+
+void parallel_json_merge_tables(struct json **dest, struct json *inc);
+
+#endif /* ovsdb/parallel-json.h */
-- 
2.20.1

_______________________________________________
dev mailing list
d...@openvswitch.org
https://mail.openvswitch.org/mailman/listinfo/ovs-dev

Reply via email to