Currently all OVSDB database queries except for UUID lookups all result
in linear lookups over the entire table, even if an index is present.

This patch modifies ovsdb_query() to attempt an index lookup first, if
possible. If no matching indexes are present then a linear index is
still conducted.

To test this, I set up an ovsdb database with a variable number of rows
and timed the average of how long ovsdb-client took to query a single
row. The first two tests involved a linear scan that didn't match any
rows, so there was no overhead associated with sending or encoding
output. The post-patch linear scan was a worst case scenario where the
table did have an appropriate index but the conditions made its usage
impossible. The indexed lookup test was for a matching row, which did
also include overhead associated with a match. The results are included
in the table below.

Rows                   | 100k | 200k | 300k | 400k | 500k
-----------------------+------+------+------+------+-----
Pre-patch linear scan  |  9ms | 24ms | 37ms | 49ms | 61ms
Post-patch linear scan |  9ms | 24ms | 38ms | 49ms | 61ms
Indexed lookup         |  3ms |  3ms |  3ms |  3ms |  3ms

I also tested the performance of ovsdb_query() by wrapping it in a loop
and measuring the time it took to perform 1000 linear scans on 1, 10,
100k, and 200k rows. This test showed that the new index checking code
did not slow down worst case lookups to a statistically detectable
degree.

Reported-at: https://issues.redhat.com/browse/FDP-590
Signed-off-by: Mike Pattrick <m...@redhat.com>

---

v3:
 - Included txn in index code
 - Added benchmarks
 - Refactored code
 - Added more tests
 - Now a mock row is created to perform the search with standard
 functions
Signed-off-by: Mike Pattrick <m...@redhat.com>
---
 ovsdb/execution.c        |  20 +++--
 ovsdb/query.c            | 174 +++++++++++++++++++++++++++++++++++----
 ovsdb/query.h            |   6 +-
 ovsdb/rbac.c             |  15 ++--
 ovsdb/rbac.h             |  10 ++-
 ovsdb/row.h              |  28 +++++++
 ovsdb/transaction.c      |  29 +------
 ovsdb/transaction.h      |   5 ++
 tests/ovsdb-execution.at | 108 +++++++++++++++++++++++-
 tests/ovsdb-macros.at    |  10 +++
 tests/ovsdb-query.at     |  18 ++--
 tests/ovsdb-server.at    |   2 +-
 tests/ovsdb-tool.at      |   2 +-
 tests/test-ovsdb.c       |  15 +++-
 14 files changed, 363 insertions(+), 79 deletions(-)

diff --git a/ovsdb/execution.c b/ovsdb/execution.c
index f4cc9e802..212839bca 100644
--- a/ovsdb/execution.c
+++ b/ovsdb/execution.c
@@ -459,7 +459,7 @@ ovsdb_execute_select(struct ovsdb_execution *x, struct 
ovsdb_parser *parser,
     if (!error) {
         struct ovsdb_row_set rows = OVSDB_ROW_SET_INITIALIZER;
 
-        ovsdb_query_distinct(table, &condition, &columns, &rows);
+        ovsdb_query_distinct(table, &condition, &columns, &rows, x->txn);
         ovsdb_row_set_sort(&rows, &sort);
         json_object_put(result, "rows",
                         ovsdb_row_set_to_json(&rows, &columns));
@@ -545,8 +545,8 @@ ovsdb_execute_update(struct ovsdb_execution *x, struct 
ovsdb_parser *parser,
         ur.row = row;
         ur.columns = &columns;
         if (ovsdb_rbac_update(x->db, table, &columns, &condition, x->role,
-                              x->id)) {
-            ovsdb_query(table, &condition, update_row_cb, &ur);
+                              x->id, x->txn)) {
+            ovsdb_query(table, &condition, update_row_cb, &ur, x->txn);
         } else {
             error = ovsdb_perm_error("RBAC rules for client \"%s\" role "
                                      "\"%s\" prohibit modification of "
@@ -626,7 +626,7 @@ ovsdb_execute_mutate(struct ovsdb_execution *x, struct 
ovsdb_parser *parser,
                             json_integer_create(hmap_count(&table->rows)));
         } else {
             size_t row_count = 0;
-            ovsdb_query(table, &condition, count_row_cb, &row_count);
+            ovsdb_query(table, &condition, count_row_cb, &row_count, x->txn);
             json_object_put(result, "count",
                             json_integer_create(row_count));
         }
@@ -636,8 +636,8 @@ ovsdb_execute_mutate(struct ovsdb_execution *x, struct 
ovsdb_parser *parser,
         mr.mutations = &mutations;
         mr.error = &error;
         if (ovsdb_rbac_mutate(x->db, table, &mutations, &condition, x->role,
-                              x->id)) {
-            ovsdb_query(table, &condition, mutate_row_cb, &mr);
+                              x->id, x->txn)) {
+            ovsdb_query(table, &condition, mutate_row_cb, &mr, x->txn);
         } else {
             error = ovsdb_perm_error("RBAC rules for client \"%s\" role "
                                      "\"%s\" prohibit mutate operation on "
@@ -693,8 +693,9 @@ ovsdb_execute_delete(struct ovsdb_execution *x, struct 
ovsdb_parser *parser,
         dr.table = table;
         dr.txn = x->txn;
 
-        if (ovsdb_rbac_delete(x->db, table, &condition, x->role, x->id)) {
-            ovsdb_query(table, &condition, delete_row_cb, &dr);
+        if (ovsdb_rbac_delete(x->db, table, &condition, x->role, x->id,
+                              x->txn)) {
+            ovsdb_query(table, &condition, delete_row_cb, &dr, x->txn);
         } else {
             error = ovsdb_perm_error("RBAC rules for client \"%s\" role "
                                      "\"%s\" prohibit row deletion from "
@@ -811,7 +812,8 @@ ovsdb_execute_wait(struct ovsdb_execution *x, struct 
ovsdb_parser *parser,
         aux.actual = &actual;
         aux.expected = &expected;
         aux.equal = &equal;
-        ovsdb_query(table, &condition, ovsdb_execute_wait_query_cb, &aux);
+        ovsdb_query(table, &condition, ovsdb_execute_wait_query_cb, &aux,
+                    x->txn);
         if (equal) {
             /* We know that every row in 'actual' is also in 'expected'.  We
              * also know that all of the rows in 'actual' are distinct and that
diff --git a/ovsdb/query.c b/ovsdb/query.c
index eebe56412..586b90118 100644
--- a/ovsdb/query.c
+++ b/ovsdb/query.c
@@ -21,32 +21,174 @@
 #include "condition.h"
 #include "row.h"
 #include "table.h"
+#include "transaction.h"
+
+struct txn_state {
+    const struct ovsdb_condition *cnd;
+    struct ovsdb_table *table;
+    bool match;
+};
+
+static bool
+search_txn(const struct ovsdb_row *old, const struct ovsdb_row *new,
+           const unsigned long int *changed OVS_UNUSED, void *aux)
+{
+    struct txn_state *ts = aux;
+
+    if (ts->match) {
+        return false;
+    }
+
+    if (new && new->table == ts->table) {
+        if (ovsdb_condition_match_every_clause(new, ts->cnd)) {
+            ts->match = true;
+            return false;
+        }
+    }
+
+    if (old && old->table == ts->table) {
+        if (ovsdb_condition_match_every_clause(old, ts->cnd)) {
+            ts->match = true;
+            return false;
+        }
+    }
+
+    return true;
+}
+
+static bool
+ovsdb_query_index(struct ovsdb_table *table,
+                  const struct ovsdb_condition *cnd,
+                  const struct ovsdb_row **out,
+                  const struct ovsdb_txn *txn)
+{
+    size_t n_fields = shash_count(&table->schema->columns);
+    struct ovsdb_row *row;
+    bool ret = false;
+
+    if (txn) {
+        /* Check the transaction first. If a modified row matches the
+         * conditions then bail out before an index search. There are a lot of
+         * different possible conditions and including support for returning
+         * mid-transaction rows adds a lot of complications. */
+
+        struct txn_state ts = {
+            .cnd = cnd,
+            .table = table,
+            .match = false,
+        };
+
+        ovsdb_txn_for_each_change(txn, search_txn, &ts);
+
+        if (ts.match) {
+            return false;
+        }
+    }
+
+    /* Construct a mock row. */
+    row = xzalloc(sizeof *row + sizeof *row->fields * n_fields);
+    row->table = table;
+
+    for (size_t c = 0; c < cnd->n_clauses; c++) {
+        const struct ovsdb_clause *cnd_cls = &cnd->clauses[c];
+
+        if (cnd_cls->function != OVSDB_F_EQ) {
+            goto done;
+        }
+
+        row->fields[cnd_cls->index] = cnd_cls->arg;
+    }
+
+    /* Per index search. */
+    for (size_t idx = 0; idx < table->schema->n_indexes; idx++) {
+        const struct ovsdb_column_set *index = &table->schema->indexes[idx];
+        size_t matches = 0;
+        size_t hash;
+
+        if (index->n_columns > cnd->n_clauses) {
+            /* Only index search if condition counts are greater or equal to
+             * index length. */
+            continue;
+        }
+
+        hash = 0;
+
+        /* The conditions may not be in the same order as the index. */
+        for (size_t i = 0; i < index->n_columns && matches == i; i++) {
+            const struct ovsdb_column *idx_col = index->columns[i];
+
+            if (row->fields[idx_col->index].n) {
+                hash = ovsdb_datum_hash(&row->fields[idx_col->index],
+                                        &idx_col->type, hash);
+                matches++;
+            }
+        }
+
+        if (matches != index->n_columns) {
+            continue;
+        }
+
+        /* The index matches so a linear search isn't needed. */
+        ret = true;
+
+        /* The return code doesn't matter in this case, because this function
+         * returns true on a suitable index instead of a matching key. */
+        *out = ovsdb_index_search(&table->indexes[idx], row, idx, hash);
+
+        /* If the index has fewer rows than the condition, verify that all
+         * conditions are true. */
+        if (*out && index->n_columns != cnd->n_clauses) {
+            if (!ovsdb_condition_match_every_clause(*out, cnd)) {
+                /* Non-index condition doesn't match. */
+                *out = NULL;
+            }
+        }
+
+        /* In the case that there was a matching index but no matching row, the
+         * index check is still considered to be a success. */
+        break;
+    }
+
+done:
+    free(row);
+    return ret;
+}
 
 void
 ovsdb_query(struct ovsdb_table *table, const struct ovsdb_condition *cnd,
-            bool (*output_row)(const struct ovsdb_row *, void *aux), void *aux)
+            bool (*output_row)(const struct ovsdb_row *, void *aux), void *aux,
+            const struct ovsdb_txn *txn)
 {
+    const struct ovsdb_row *row = NULL;
+
     if (cnd->n_clauses > 0
         && cnd->clauses[0].column->index == OVSDB_COL_UUID
         && cnd->clauses[0].function == OVSDB_F_EQ) {
         /* Optimize the case where the query has a clause of the form "uuid ==
          * <some-uuid>", since we have an index on UUID. */
-        const struct ovsdb_row *row;
 
         row = ovsdb_table_get_row(table, &cnd->clauses[0].arg.keys[0].uuid);
         if (row && row->table == table &&
             ovsdb_condition_match_every_clause(row, cnd)) {
             output_row(row, aux);
         }
-    } else {
-        /* Linear scan. */
-        const struct ovsdb_row *row;
+        return;
+    }
 
-        HMAP_FOR_EACH_SAFE (row, hmap_node, &table->rows) {
-            if (ovsdb_condition_match_every_clause(row, cnd) &&
-                !output_row(row, aux)) {
-                break;
-            }
+    /* Check the indexes. */
+    if (ovsdb_query_index(table, cnd, &row, txn)) {
+        if (row) {
+            output_row(row, aux);
+            return;
+        }
+        return;
+    }
+
+    /* Linear scan. */
+    HMAP_FOR_EACH_SAFE (row, hmap_node, &table->rows) {
+        if (ovsdb_condition_match_every_clause(row, cnd) &&
+            !output_row(row, aux)) {
+            break;
         }
     }
 }
@@ -62,9 +204,10 @@ query_row_set_cb(const struct ovsdb_row *row, void 
*results_)
 void
 ovsdb_query_row_set(struct ovsdb_table *table,
                     const struct ovsdb_condition *condition,
-                    struct ovsdb_row_set *results)
+                    struct ovsdb_row_set *results,
+                    const struct ovsdb_txn *txn)
 {
-    ovsdb_query(table, condition, query_row_set_cb, results);
+    ovsdb_query(table, condition, query_row_set_cb, results, txn);
 }
 
 static bool
@@ -79,11 +222,12 @@ void
 ovsdb_query_distinct(struct ovsdb_table *table,
                      const struct ovsdb_condition *condition,
                      const struct ovsdb_column_set *columns,
-                     struct ovsdb_row_set *results)
+                     struct ovsdb_row_set *results,
+                     const struct ovsdb_txn *txn)
 {
     if (!columns || ovsdb_column_set_contains(columns, OVSDB_COL_UUID)) {
         /* All the result rows are guaranteed to be distinct anyway. */
-        ovsdb_query_row_set(table, condition, results);
+        ovsdb_query_row_set(table, condition, results, txn);
         return;
     } else {
         /* Use hash table to drop duplicates. */
@@ -91,7 +235,7 @@ ovsdb_query_distinct(struct ovsdb_table *table,
         struct ovsdb_row_hash hash;
 
         ovsdb_row_hash_init(&hash, columns);
-        ovsdb_query(table, condition, query_distinct_cb, &hash);
+        ovsdb_query(table, condition, query_distinct_cb, &hash, txn);
         HMAP_FOR_EACH (node, hmap_node, &hash.rows) {
             ovsdb_row_set_add_row(results, node->row);
         }
diff --git a/ovsdb/query.h b/ovsdb/query.h
index 5cc9b11da..e1bdef762 100644
--- a/ovsdb/query.h
+++ b/ovsdb/query.h
@@ -27,11 +27,11 @@ struct ovsdb_txn;
 
 void ovsdb_query(struct ovsdb_table *, const struct ovsdb_condition *,
                  bool (*output_row)(const struct ovsdb_row *, void *aux),
-                 void *aux);
+                 void *aux, const struct ovsdb_txn *);
 void ovsdb_query_row_set(struct ovsdb_table *, const struct ovsdb_condition *,
-                         struct ovsdb_row_set *);
+                         struct ovsdb_row_set *, const struct ovsdb_txn *);
 void ovsdb_query_distinct(struct ovsdb_table *, const struct ovsdb_condition *,
                           const struct ovsdb_column_set *,
-                          struct ovsdb_row_set *);
+                          struct ovsdb_row_set *, const struct ovsdb_txn *);
 
 #endif /* ovsdb/query.h */
diff --git a/ovsdb/rbac.c b/ovsdb/rbac.c
index a3fe97120..d543df41d 100644
--- a/ovsdb/rbac.c
+++ b/ovsdb/rbac.c
@@ -219,7 +219,8 @@ denied:
 bool
 ovsdb_rbac_delete(const struct ovsdb *db, struct ovsdb_table *table,
                   struct ovsdb_condition *condition,
-                  const char *role, const char *id)
+                  const char *role, const char *id,
+                  const struct ovsdb_txn *txn)
 {
     const struct ovsdb_table_schema *ts = table->schema;
     const struct ovsdb_row *perms;
@@ -244,7 +245,7 @@ ovsdb_rbac_delete(const struct ovsdb *db, struct 
ovsdb_table *table,
     rd.role = role;
     rd.id = id;
 
-    ovsdb_query(table, condition, rbac_delete_cb, &rd);
+    ovsdb_query(table, condition, rbac_delete_cb, &rd, txn);
 
     if (rd.permitted) {
         return true;
@@ -309,7 +310,8 @@ ovsdb_rbac_update(const struct ovsdb *db,
                   struct ovsdb_table *table,
                   struct ovsdb_column_set *columns,
                   struct ovsdb_condition *condition,
-                  const char *role, const char *id)
+                  const char *role, const char *id,
+                  const struct ovsdb_txn *txn)
 {
     static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
     const struct ovsdb_table_schema *ts = table->schema;
@@ -348,7 +350,7 @@ ovsdb_rbac_update(const struct ovsdb *db,
     ru.modifiable = datum;
     ru.permitted = true;
 
-    ovsdb_query(table, condition, rbac_update_cb, &ru);
+    ovsdb_query(table, condition, rbac_update_cb, &ru, txn);
 
     if (ru.permitted) {
         return true;
@@ -398,7 +400,8 @@ ovsdb_rbac_mutate(const struct ovsdb *db,
                   struct ovsdb_table *table,
                   struct ovsdb_mutation_set *mutations,
                   struct ovsdb_condition *condition,
-                  const char *role, const char *id)
+                  const char *role, const char *id,
+                  const struct ovsdb_txn *txn)
 {
     static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
     const struct ovsdb_table_schema *ts = table->schema;
@@ -437,7 +440,7 @@ ovsdb_rbac_mutate(const struct ovsdb *db,
     rm.modifiable = datum;
     rm.permitted = true;
 
-    ovsdb_query(table, condition, rbac_mutate_cb, &rm);
+    ovsdb_query(table, condition, rbac_mutate_cb, &rm, txn);
 
     if (rm.permitted) {
         return true;
diff --git a/ovsdb/rbac.h b/ovsdb/rbac.h
index 42f2ef925..b46a6235e 100644
--- a/ovsdb/rbac.h
+++ b/ovsdb/rbac.h
@@ -25,6 +25,7 @@ struct ovsdb_condition;
 struct ovsdb_mutation_set;
 struct ovsdb_row;
 struct ovsdb_table;
+struct ovsdb_txn;
 
 bool ovsdb_rbac_insert(const struct ovsdb *,
                        const struct ovsdb_table *,
@@ -33,16 +34,19 @@ bool ovsdb_rbac_insert(const struct ovsdb *,
 bool ovsdb_rbac_delete(const struct ovsdb *,
                        struct ovsdb_table *,
                        struct ovsdb_condition *,
-                       const char *role, const char *id);
+                       const char *role, const char *id,
+                       const struct ovsdb_txn *);
 bool ovsdb_rbac_update(const struct ovsdb *,
                        struct ovsdb_table *,
                        struct ovsdb_column_set *,
                        struct ovsdb_condition *condition,
-                       const char *role, const char *id);
+                       const char *role, const char *id,
+                       const struct ovsdb_txn *);
 bool ovsdb_rbac_mutate(const struct ovsdb *,
                        struct ovsdb_table *,
                        struct ovsdb_mutation_set *,
                        struct ovsdb_condition *,
-                       const char *role, const char *id);
+                       const char *role, const char *id,
+                       const struct ovsdb_txn *);
 
 #endif /* ovsdb/rbac.h */
diff --git a/ovsdb/row.h b/ovsdb/row.h
index 6f5e58acb..f585b9fdc 100644
--- a/ovsdb/row.h
+++ b/ovsdb/row.h
@@ -22,6 +22,7 @@
 #include "openvswitch/hmap.h"
 #include "openvswitch/list.h"
 #include "ovsdb-data.h"
+#include "table.h"
 
 struct ovsdb_column_set;
 
@@ -152,6 +153,33 @@ ovsdb_row_hash(const struct ovsdb_row *row)
 {
     return uuid_hash(ovsdb_row_get_uuid(row));
 }
+
+/* Returns the offset in bytes from the start of an ovsdb_row for 'table' to
+ * the hmap_node for the index numbered 'i'. */
+static inline size_t
+ovsdb_row_index_offset__(const struct ovsdb_table *table, size_t i)
+{
+    size_t n_fields = shash_count(&table->schema->columns);
+    return (offsetof(struct ovsdb_row, fields)
+            + n_fields * sizeof(struct ovsdb_datum)
+            + i * sizeof(struct hmap_node));
+}
+
+/* Returns the hmap_node in 'row' for the index numbered 'i'. */
+static inline struct hmap_node *
+ovsdb_row_get_index_node(struct ovsdb_row *row, size_t i)
+{
+    return (void *) ((char *) row + ovsdb_row_index_offset__(row->table, i));
+}
+
+/* Returns the ovsdb_row given 'index_node', which is a pointer to that row's
+ * hmap_node for the index numbered 'i' within 'table'. */
+static inline struct ovsdb_row *
+ovsdb_row_from_index_node(struct hmap_node *index_node,
+                          const struct ovsdb_table *table, size_t i)
+{
+    return (void *) ((char *) index_node - ovsdb_row_index_offset__(table, i));
+}
 
 /* An unordered collection of rows. */
 struct ovsdb_row_set {
diff --git a/ovsdb/transaction.c b/ovsdb/transaction.c
index 484a88e1c..1c29ff123 100644
--- a/ovsdb/transaction.c
+++ b/ovsdb/transaction.c
@@ -194,33 +194,6 @@ ovsdb_txn_row_abort(struct ovsdb_txn *txn OVS_UNUSED,
     return NULL;
 }
 
-/* Returns the offset in bytes from the start of an ovsdb_row for 'table' to
- * the hmap_node for the index numbered 'i'. */
-static size_t
-ovsdb_row_index_offset__(const struct ovsdb_table *table, size_t i)
-{
-    size_t n_fields = shash_count(&table->schema->columns);
-    return (offsetof(struct ovsdb_row, fields)
-            + n_fields * sizeof(struct ovsdb_datum)
-            + i * sizeof(struct hmap_node));
-}
-
-/* Returns the hmap_node in 'row' for the index numbered 'i'. */
-static struct hmap_node *
-ovsdb_row_get_index_node(struct ovsdb_row *row, size_t i)
-{
-    return (void *) ((char *) row + ovsdb_row_index_offset__(row->table, i));
-}
-
-/* Returns the ovsdb_row given 'index_node', which is a pointer to that row's
- * hmap_node for the index numbered 'i' within 'table'. */
-static struct ovsdb_row *
-ovsdb_row_from_index_node(struct hmap_node *index_node,
-                          const struct ovsdb_table *table, size_t i)
-{
-    return (void *) ((char *) index_node - ovsdb_row_index_offset__(table, i));
-}
-
 void
 ovsdb_txn_abort(struct ovsdb_txn *txn)
 {
@@ -889,7 +862,7 @@ check_max_rows(struct ovsdb_txn *txn)
     return NULL;
 }
 
-static struct ovsdb_row *
+struct ovsdb_row *
 ovsdb_index_search(struct hmap *index, struct ovsdb_row *row, size_t i,
                    uint32_t hash)
 {
diff --git a/ovsdb/transaction.h b/ovsdb/transaction.h
index f659838dc..d94205414 100644
--- a/ovsdb/transaction.h
+++ b/ovsdb/transaction.h
@@ -17,8 +17,10 @@
 #define OVSDB_TRANSACTION_H 1
 
 #include <stdbool.h>
+#include <stdint.h>
 #include "compiler.h"
 
+struct hmap;
 struct json;
 struct ovsdb;
 struct ovsdb_row;
@@ -66,6 +68,9 @@ typedef bool ovsdb_txn_row_cb_func(const struct ovsdb_row 
*old,
                                    void *aux);
 void ovsdb_txn_for_each_change(const struct ovsdb_txn *,
                                ovsdb_txn_row_cb_func *, void *aux);
+struct ovsdb_row *ovsdb_index_search(struct hmap *index,
+                                     struct ovsdb_row *, size_t i,
+                                     uint32_t hash);
 
 void ovsdb_txn_add_comment(struct ovsdb_txn *, const char *);
 const char *ovsdb_txn_get_comment(const struct ovsdb_txn *);
diff --git a/tests/ovsdb-execution.at b/tests/ovsdb-execution.at
index 1ffa2b738..b1a461b79 100644
--- a/tests/ovsdb-execution.at
+++ b/tests/ovsdb-execution.at
@@ -11,7 +11,7 @@ ordinal_schema () {
          "columns": {
            "number": {"type": "integer"},
            "name": {"type": "string"}},
-         "indexes": [["number"]]}},
+         "indexes": [["number"], ["number", "name"]]}},
      "version": "5.1.3",
      "cksum": "12345678 9"}
 EOF
@@ -273,6 +273,112 @@ dnl Attempt to insert second row with same UUID (fails).
 [{"details":"This UUID would duplicate a UUID already present within the table 
or deleted within the same transaction.","error":"duplicate 
uuid","syntax":"\"ffffffff-971b-4cba-bf42-520515973b7e\""}]
 ]])
 
+OVSDB_CHECK_EXECUTION([insert rows, query by index],
+  [ordinal_schema],
+dnl Insert initial rows.
+  [[[["ordinals",
+      {"op": "insert",
+       "table": "ordinals",
+       "row": {"number": 0, "name": "zero"}}]]],
+   [[["ordinals",
+      {"op": "insert",
+       "table": "ordinals",
+       "row": {"number": 1, "name": "one"}}]]],
+   [[["ordinals",
+      {"op": "delete",
+       "table": "ordinals",
+       "where": [["number", "==", 1]]}]]],
+   [[["ordinals",
+      {"op": "insert",
+       "table": "ordinals",
+       "row": {"number": 2, "name": "two"}}]]],
+   [[["ordinals",
+      {"op": "insert",
+       "table": "ordinals",
+       "row": {"number": 3, "name": "three"}}]]],
+dnl Query the deleted row.
+   [[["ordinals",
+      {"op": "select",
+       "table": "ordinals",
+       "where": [["number", "==", 1]]}]]],
+dnl Query a non-deleted row with first index.
+   [[["ordinals",
+      {"op": "select",
+       "table": "ordinals",
+       "where": [["number", "==", 3]]}]]],
+dnl Query with second index.
+   [[["ordinals",
+      {"op": "select",
+       "table": "ordinals",
+       "where": [["name", "==", "two"], ["number", "==", 2]]}]]],
+dnl Delete a row and query in the same transaction.
+   [[["ordinals",
+      {"op": "delete",
+       "table": "ordinals",
+       "where": [["number", "==", 2]]},
+      {"op": "select",
+       "table": "ordinals",
+       "where": [["number", "==", 2]]}]]],
+dnl Mutate a row and query in the same transaction.
+   [[["ordinals",
+      {"op": "mutate",
+       "table": "ordinals",
+       "mutations": [["number", "+=", 2]],
+       "where": [["number", "==", 0]]},
+      {"op": "select",
+       "table": "ordinals",
+       "where": [["number", "==", 0]]},
+      {"op": "select",
+       "table": "ordinals",
+       "where": [["number", "==", 2]]}]]]],
+[[[{"uuid":["uuid","<0>"]}]
+[{"uuid":["uuid","<1>"]}]
+[{"count":1}]
+[{"uuid":["uuid","<2>"]}]
+[{"uuid":["uuid","<3>"]}]
+[{"rows":[]}]
+[{"rows":[{"_uuid":["uuid","<3>"],"_version":["uuid","<4>"],"name":"three","number":3}]}]
+[{"rows":[{"_uuid":["uuid","<2>"],"_version":["uuid","<5>"],"name":"two","number":2}]}]
+[{"count":1},{"rows":[]}]
+[{"count":1},{"rows":[]},{"rows":[{"_uuid":["uuid","<0>"],"_version":["uuid","<6>"],"name":"zero","number":2}]}]
+]])
+
+OVSDB_CHECK_EXECUTION([reinsert indexed rows mid flight],
+  [ordinal_schema],
+dnl Insert initial rows.
+  [[[["ordinals",
+      {"op": "insert",
+       "table": "ordinals",
+       "row": {"number": 0, "name": "orig"}
+      }]]],
+  [[["ordinals",
+     {"op": "select",
+      "table": "ordinals",
+      "where": [["number", "==", 0]]
+      },
+     {"op": "insert",
+      "table": "ordinals",
+      "uuid-name": "dup",
+      "row": {"number": 0, "name": "dup"}
+      },
+     {"op": "select",
+      "table": "ordinals",
+      "sort": ["name"],
+      "where": [["number", "==", 0]]
+      },
+     {"op": "delete",
+      "table": "ordinals",
+      "where": [["_uuid", "==", ["named-uuid", "dup"]]]
+      },
+     {"op": "select",
+      "table": "ordinals",
+      "where": [["number", "==", 0]]
+      }]]]
+   ],
+[[[{"uuid":["uuid","<0>"]}]
+[{"rows":[{"_uuid":["uuid","<0>"],"_version":["uuid","<1>"],"name":"orig","number":0}]},{"uuid":["uuid","<2>"]},{"rows":[{"_uuid":["uuid","<2>"],"_version":["uuid","<3>"],"name":"dup","number":0},{"_uuid":["uuid","<0>"],"_version":["uuid","<1>"],"name":"orig","number":0}]},{"count":1},{"rows":[{"_uuid":["uuid","<0>"],"_version":["uuid","<1>"],"name":"orig","number":0}]}]
+]])
+
 OVSDB_CHECK_EXECUTION([insert rows, query by value],
   [ordinal_schema],
   [[[["ordinals",
diff --git a/tests/ovsdb-macros.at b/tests/ovsdb-macros.at
index 503b8b722..ac726fd1f 100644
--- a/tests/ovsdb-macros.at
+++ b/tests/ovsdb-macros.at
@@ -38,6 +38,16 @@ m4_define([OVSDB_CHECK_POSITIVE],
 ], [])
    AT_CLEANUP])
 
+# OVSDB_CHECK_POSITIVE_IDX(TITLE, TEST-OVSDB-ARGS, OUTPUT, [KEYWORDS], 
[PREREQ], [INDEX])
+#
+# Runs "test-ovsdb TEST-OVSDB-ARGS" twice, with and without an index, and 
checks
+# that it exits with status 0 and prints OUTPUT on stdout.
+#
+# TITLE is provided to AT_SETUP and KEYWORDS to AT_KEYWORDS.
+m4_define([OVSDB_CHECK_POSITIVE_IDX],
+  [OVSDB_CHECK_POSITIVE($1, [$2], $3, $4, $5)
+  OVSDB_CHECK_POSITIVE([indexed $1], [m4_bpatsubst([$2], ["columns":], 
["indexes": [$6], "columns":])], $3, $4, $5)])
+
 # OVSDB_CHECK_POSITIVE_PY(TITLE, TEST-OVSDB-ARGS, OUTPUT, [KEYWORDS], [PREREQ],
 #                          [PY-CHECK])
 #
diff --git a/tests/ovsdb-query.at b/tests/ovsdb-query.at
index 0814fda2a..7d891da06 100644
--- a/tests/ovsdb-query.at
+++ b/tests/ovsdb-query.at
@@ -1,6 +1,6 @@
 AT_BANNER([OVSDB -- queries])
 
-OVSDB_CHECK_POSITIVE([queries on scalars],
+OVSDB_CHECK_POSITIVE_IDX([queries on scalars],
   [[query \
     '{"columns":
         {"i": {"type": "integer"},
@@ -90,9 +90,9 @@ query 24: 111-1
 query 25: 1----
 query 26: 1-111
 query 27: --1--],
-  [query])
+  [query], [], [["i"], ["r"], ["s"], ["u"]])
 
-OVSDB_CHECK_POSITIVE([queries on sets],
+OVSDB_CHECK_POSITIVE_IDX([queries on sets],
   [[query \
     '{"columns": {"i": {"type": {"key": "integer", "min": 0, "max": 
"unlimited"}}}}' \
     '[{"i": ["set", []]},
@@ -169,11 +169,11 @@ query 28: 1---1 ---
 query 29: 1111- ---
 query 30: 1-1-- ---
 query 31: 11--- ---
-query 32: 1---- ---], [query])
+query 32: 1---- ---], [query], [], [["i"]])
 
 # This is the same as the "set" test except that it adds values,
 # all of which always match.
-OVSDB_CHECK_POSITIVE([queries on maps (1)],
+OVSDB_CHECK_POSITIVE_IDX([queries on maps (1)],
   [[query \
     '{"columns": {"i": {"type": {"key": "integer",
                                  "value": "boolean",
@@ -253,11 +253,11 @@ query 28: 1---1 ---
 query 29: 1111- ---
 query 30: 1-1-- ---
 query 31: 11--- ---
-query 32: 1---- ---], [query])
+query 32: 1---- ---], [query], [], [["i"]])
 
 # This is the same as the "set" test except that it adds values,
 # and those values don't always match.
-OVSDB_CHECK_POSITIVE([queries on maps (2)],
+OVSDB_CHECK_POSITIVE_IDX([queries on maps (2)],
   [[query \
     '{"columns": {"i": {"type": {"key": "integer",
                                  "value": "boolean",
@@ -346,7 +346,7 @@ query 28: 1-1-1 --11- --1--
 query 29: 11111 11-1- 1----
 query 30: 1-111 ---1- -----
 query 31: 111-1 -1-1- 1----
-query 32: 1-1-1 ---1- -----], [query])
+query 32: 1-1-1 ---1- -----], [query], [], [["i"]])
 
 OVSDB_CHECK_POSITIVE([UUID-distinct queries on scalars],
   [[query-distinct \
@@ -439,7 +439,7 @@ query 24: abc-e
 query 25: a----
 query 26: a-cde
 query 27: --c--],
-  [query])
+  [query], [], [["i"], ["r"], ["s"], ["u"]])
 
 OVSDB_CHECK_POSITIVE([Boolean-distinct queries on scalars],
   [[query-distinct \
diff --git a/tests/ovsdb-server.at b/tests/ovsdb-server.at
index ce6d32aee..7b4b280e1 100644
--- a/tests/ovsdb-server.at
+++ b/tests/ovsdb-server.at
@@ -1007,7 +1007,7 @@ ovsdb_check_online_compaction() {
         AT_CHECK([[uuidfilt db | grep -v ^OVSDB | \
             sed 's/"_date":[0-9]*/"_date":0/' |  sed 's/"_is_diff":true,//' | \
             ovstest test-json --multiple -]], [0],
-[[{"cksum":"12345678 
9","name":"ordinals","tables":{"ordinals":{"columns":{"name":{"type":"string"},"number":{"type":"integer"}},"indexes":[["number"]]}},"version":"5.1.3"}
+[[{"cksum":"12345678 
9","name":"ordinals","tables":{"ordinals":{"columns":{"name":{"type":"string"},"number":{"type":"integer"}},"indexes":[["number"],["number","name"]]}},"version":"5.1.3"}
 {"_comment":"add row for zero 0","_date":0,"ordinals":{"<0>":{"name":"zero"}}}
 {"_comment":"delete row for 0","_date":0,"ordinals":{"<0>":null}}
 {"_comment":"add back row for zero 
0","_date":0,"ordinals":{"<1>":{"name":"zero"}}}
diff --git a/tests/ovsdb-tool.at b/tests/ovsdb-tool.at
index d8d2b1c99..cf4a0fd2e 100644
--- a/tests/ovsdb-tool.at
+++ b/tests/ovsdb-tool.at
@@ -95,7 +95,7 @@ AT_CHECK(
 dnl Check that all the crap is in fact in the database log.
 AT_CHECK([[uuidfilt db | grep -v ^OVSDB | sed 's/"_date":[0-9]*/"_date":0/' | \
             sed 's/"_is_diff":true,//' | ovstest test-json --multiple -]], [0],
-  [[{"cksum":"12345678 
9","name":"ordinals","tables":{"ordinals":{"columns":{"name":{"type":"string"},"number":{"type":"integer"}},"indexes":[["number"]]}},"version":"5.1.3"}
+  [[{"cksum":"12345678 
9","name":"ordinals","tables":{"ordinals":{"columns":{"name":{"type":"string"},"number":{"type":"integer"}},"indexes":[["number"],["number","name"]]}},"version":"5.1.3"}
 {"_comment":"add row for zero 0","_date":0,"ordinals":{"<0>":{"name":"zero"}}}
 {"_comment":"delete row for 0","_date":0,"ordinals":{"<0>":null}}
 {"_comment":"add back row for zero 
0","_date":0,"ordinals":{"<1>":{"name":"zero"}}}
diff --git a/tests/test-ovsdb.c b/tests/test-ovsdb.c
index 41c1525f4..997a56301 100644
--- a/tests/test-ovsdb.c
+++ b/tests/test-ovsdb.c
@@ -1277,7 +1277,8 @@ do_execute_mutations(struct ovs_cmdl_context *ctx)
     ovsdb_table_destroy(table); /* Also destroys 'ts'. */
 }
 
-/* Inserts a row, without bothering to update metadata such as refcounts. */
+/* Inserts a row and update the indexes, without bothering to update metadata
+ * such as refcounts. */
 static void
 put_row(struct ovsdb_table *table, struct ovsdb_row *row)
 {
@@ -1285,6 +1286,14 @@ put_row(struct ovsdb_table *table, struct ovsdb_row *row)
     if (!ovsdb_table_get_row(table, uuid)) {
         hmap_insert(&table->rows, &row->hmap_node, uuid_hash(uuid));
     }
+
+    for (size_t i = 0; i < table->schema->n_indexes; i++) {
+        const struct ovsdb_column_set *index = &table->schema->indexes[i];
+        struct hmap_node *node = ovsdb_row_get_index_node(row, i);
+        uint32_t hash = ovsdb_row_hash_columns(row, index, 0);
+
+        hmap_insert(&table->indexes[i], node, hash);
+    }
 }
 
 struct do_query_cbdata {
@@ -1362,7 +1371,7 @@ do_query(struct ovs_cmdl_context *ctx)
                                                     NULL, &cnd));
 
         memset(cbdata.counts, 0, cbdata.n_rows * sizeof *cbdata.counts);
-        ovsdb_query(table, &cnd, do_query_cb, &cbdata);
+        ovsdb_query(table, &cnd, do_query_cb, &cbdata, NULL);
 
         printf("query %2"PRIuSIZE":", i);
         for (j = 0; j < cbdata.n_rows; j++) {
@@ -1490,7 +1499,7 @@ do_query_distinct(struct ovs_cmdl_context *ctx)
             classes[j].count = 0;
         }
         ovsdb_row_set_init(&results);
-        ovsdb_query_distinct(table, &cnd, &columns, &results);
+        ovsdb_query_distinct(table, &cnd, &columns, &results, NULL);
         for (j = 0; j < results.n_rows; j++) {
             size_t k;
 
-- 
2.39.3

_______________________________________________
dev mailing list
d...@openvswitch.org
https://mail.openvswitch.org/mailman/listinfo/ovs-dev

Reply via email to