This patch adds support for the creation of multicolumn indexes in the C IDL to enable efficient search and retrieval of database rows by key.
Signed-off-by: Esteban Rodriguez Betancourt <esteb...@hpe.com> Co-authored-by: Lance Richardson <lrich...@redhat.com> Signed-off-by: Lance Richardson <lrich...@redhat.com> --- v7: - Reworked "row_sync" comments and changed field name to "ins_del" to (hopefully) better explain why this mechanism exists. - Removed ovsdb_idl_index_read() since ovsdb_idl_read() is equivalent (suggested by Ben Pfaff). - Removed several declared-but-not-defined functions from ovsdb-idl.h, also re-ordered indexing-related items in this file to have macros followed by data type definitions and finally function declarations. v6: - Reworked v5 memory leak fix to avoid need for dynamically allocating strings passed to ovsdb_idl_index_write(). - Fixed null pointer dereference in ovsdb_idl_initialize_cursor() when called with non-existent index name. v5: - Coding style fixes (checkpatch.py) - Fixed memory leak (missing ovsdb_datum_destroy() in ovsdb_idl_index_destroy_row__()). - Some polishing of comment and log message text. lib/ovsdb-idl-provider.h | 29 ++++ lib/ovsdb-idl.c | 401 +++++++++++++++++++++++++++++++++++++++++++++++ lib/ovsdb-idl.h | 51 ++++++ 3 files changed, 481 insertions(+) diff --git a/lib/ovsdb-idl-provider.h b/lib/ovsdb-idl-provider.h index 8cfbb95aa..59fb24015 100644 --- a/lib/ovsdb-idl-provider.h +++ b/lib/ovsdb-idl-provider.h @@ -1,4 +1,5 @@ /* Copyright (c) 2009, 2010, 2011, 2012, 2016 Nicira, Inc. + * Copyright (C) 2016 Hewlett Packard Enterprise Development LP * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -111,6 +112,7 @@ struct ovsdb_idl_table { struct hmap rows; /* Contains "struct ovsdb_idl_row"s. */ struct ovsdb_idl *idl; /* Containing idl. */ unsigned int change_seqno[OVSDB_IDL_CHANGE_MAX]; + struct shash indexes; /* Contains "struct ovsdb_idl_index"s */ struct ovs_list track_list; /* Tracked rows (ovsdb_idl_row.track_node). 
*/ struct ovsdb_idl_condition condition; bool cond_changed; @@ -122,6 +124,33 @@ struct ovsdb_idl_class { size_t n_tables; }; +/* + * Structure containing the per-column configuration of the index. + */ +struct ovsdb_idl_index_column { + const struct ovsdb_idl_column *column; /* Column used for index key. */ + column_comparator *comparer; /* Column comparison function. */ + int sorting_order; /* Sorting order (ascending or descending). */ +}; + +/* + * Defines a IDL compound index + */ +struct ovsdb_idl_index { + struct skiplist *skiplist; /* Skiplist with pointers to rows. */ + struct ovsdb_idl_index_column *columns; /* Columns configuration */ + size_t n_columns; /* Number of columns in index. */ + size_t alloc_columns; /* Size allocated memory for columns, + comparers and sorting order. */ + bool ins_del; /* True if a row in the index is being + inserted or deleted; if true, the + search key is augmented with the + UUID and address in order to discriminate + between entries with identical keys. */ + const struct ovsdb_idl_table *table; /* Table that owns this index */ + const char *index_name; /* The name of this index. */ +}; + struct ovsdb_idl_row *ovsdb_idl_get_row_arc( struct ovsdb_idl_row *src, const struct ovsdb_idl_table_class *dst_table, diff --git a/lib/ovsdb-idl.c b/lib/ovsdb-idl.c index 893143c55..5753c1dc6 100644 --- a/lib/ovsdb-idl.c +++ b/lib/ovsdb-idl.c @@ -1,4 +1,5 @@ /* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016, 2017 Nicira, Inc. + * Copyright (C) 2016 Hewlett Packard Enterprise Development LP * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -37,8 +38,10 @@ #include "ovsdb-parser.h" #include "poll-loop.h" #include "openvswitch/shash.h" +#include "skiplist.h" #include "sset.h" #include "util.h" +#include "uuid.h" #include "openvswitch/vlog.h" VLOG_DEFINE_THIS_MODULE(ovsdb_idl); @@ -228,6 +231,14 @@ ovsdb_idl_table_from_class(const struct ovsdb_idl *, static bool ovsdb_idl_track_is_set(struct ovsdb_idl_table *table); static void ovsdb_idl_send_cond_change(struct ovsdb_idl *idl); +static struct ovsdb_idl_index *ovsdb_idl_create_index_(const struct + ovsdb_idl_table *table, + size_t allocated_cols); +static void + ovsdb_idl_destroy_indexes(struct ovsdb_idl_table *table); +static void ovsdb_idl_add_to_indexes(const struct ovsdb_idl_row *); +static void ovsdb_idl_remove_from_indexes(const struct ovsdb_idl_row *); + /* Creates and returns a connection to database 'remote', which should be in a * form acceptable to jsonrpc_session_open(). The connection will maintain an * in-memory replica of the remote database whose schema is described by @@ -274,6 +285,7 @@ ovsdb_idl_create(const char *remote, const struct ovsdb_idl_class *class, memset(table->modes, default_mode, tc->n_columns); table->need_table = false; shash_init(&table->columns); + shash_init(&table->indexes); for (j = 0; j < tc->n_columns; j++) { const struct ovsdb_idl_column *column = &tc->columns[j]; @@ -329,6 +341,7 @@ ovsdb_idl_destroy(struct ovsdb_idl *idl) for (i = 0; i < idl->class->n_tables; i++) { struct ovsdb_idl_table *table = &idl->tables[i]; ovsdb_idl_condition_destroy(&table->condition); + ovsdb_idl_destroy_indexes(table); shash_destroy(&table->columns); hmap_destroy(&table->rows); free(table->modes); @@ -364,6 +377,7 @@ ovsdb_idl_clear(struct ovsdb_idl *idl) struct ovsdb_idl_arc *arc, *next_arc; if (!ovsdb_idl_row_is_orphan(row)) { + ovsdb_idl_remove_from_indexes(row); ovsdb_idl_row_unparse(row); } LIST_FOR_EACH_SAFE (arc, next_arc, src_node, &row->src_arcs) { @@ -2026,6 +2040,389 @@ ovsdb_idl_row_unparse(struct ovsdb_idl_row *row) 
(c->unparse)(row); } } + +/* The OVSDB-IDL Compound Indexes feature allows for the creation of custom + * table indexes over one or more columns in the IDL. These indexes provide + * the ability to retrieve rows matching a particular search criteria and to + * iterate over a subset of rows in a defined order. + */ + +/* Creates a new index with the provided name, attached to the given idl and + * table. Note that all indexes must be created and indexing columns added + * before the first call to ovsdb_idl_run() is made. + */ +struct ovsdb_idl_index * +ovsdb_idl_create_index(struct ovsdb_idl *idl, + const struct ovsdb_idl_table_class *tc, + const char *index_name) +{ + struct ovsdb_idl_index *index; + size_t i; + + for (i = 0; i < idl->class->n_tables; i++) { + struct ovsdb_idl_table *table = &idl->tables[i]; + + if (table->class == tc) { + index = ovsdb_idl_create_index_(table, 1); + if (!shash_add_once(&table->indexes, index_name, index)) { + VLOG_ERR("Duplicate index name '%s' in table %s", + index_name, table->class->name); + return NULL; + } + index->index_name = index_name; + return index; + } + } + OVS_NOT_REACHED(); + return NULL; +} + +/* Generic comparator that can compare each index, using the custom + * configuration (an struct ovsdb_idl_index) passed to it. + * Not intended for direct usage. 
+ */ +static int +ovsdb_idl_index_generic_comparer(const void *a, + const void *b, const void *conf) +{ + const struct ovsdb_idl_column *column; + const struct ovsdb_idl_index *index; + size_t i; + + index = CONST_CAST(struct ovsdb_idl_index *, conf); + + if (a == b) { + return 0; + } + + for (i = 0; i < index->n_columns; i++) { + int val; + if (index->columns[i].comparer) { + val = index->columns[i].comparer(a, b); + } else { + column = index->columns[i].column; + const struct ovsdb_idl_row *row_a, *row_b; + row_a = CONST_CAST(struct ovsdb_idl_row *, a); + row_b = CONST_CAST(struct ovsdb_idl_row *, b); + const struct ovsdb_datum *datum_a, *datum_b; + datum_a = ovsdb_idl_read(row_a, column); + datum_b = ovsdb_idl_read(row_b, column); + val = ovsdb_datum_compare_3way(datum_a, datum_b, &column->type); + } + + if (val) { + return val * index->columns[i].sorting_order; + } + } + + /* If ins_del is true then a row is being inserted into or deleted from + * the index list. In this case, we augment the search key with + * additional values (row UUID and memory address) to create a unique + * search key in order to locate the correct entry efficiently and to + * ensure that the correct entry is deleted in the case of a "delete" + * operation. + */ + if (index->ins_del) { + const struct ovsdb_idl_row *row_a, *row_b; + + row_a = (const struct ovsdb_idl_row *) a; + row_b = (const struct ovsdb_idl_row *) b; + int value = uuid_compare_3way(&row_a->uuid, &row_b->uuid); + + return value ? 
value : (a < b) - (a > b); + } else { + return 0; + } +} + +static struct ovsdb_idl_index * +ovsdb_idl_create_index_(const struct ovsdb_idl_table *table, + size_t allocated_cols) +{ + struct ovsdb_idl_index *index; + + index = xmalloc(sizeof (struct ovsdb_idl_index)); + index->n_columns = 0; + index->alloc_columns = allocated_cols; + index->skiplist = skiplist_create(ovsdb_idl_index_generic_comparer, index); + index->columns = xmalloc(allocated_cols * + sizeof (struct ovsdb_idl_index_column)); + index->ins_del = false; + index->table = table; + return index; +} + +static void +ovsdb_idl_destroy_indexes(struct ovsdb_idl_table *table) +{ + struct ovsdb_idl_index *index; + struct shash_node *node; + + SHASH_FOR_EACH (node, &(table->indexes)) { + index = node->data; + skiplist_destroy(index->skiplist, NULL); + free(index->columns); + } +} + +static void +ovsdb_idl_add_to_indexes(const struct ovsdb_idl_row *row) +{ + struct ovsdb_idl_table *table = row->table; + struct ovsdb_idl_index *index; + struct shash_node *node; + + SHASH_FOR_EACH (node, &(table->indexes)) { + index = node->data; + index->ins_del = true; + skiplist_insert(index->skiplist, row); + index->ins_del = false; + } +} + +static void +ovsdb_idl_remove_from_indexes(const struct ovsdb_idl_row *row) +{ + struct ovsdb_idl_table *table = row->table; + struct ovsdb_idl_index *index; + struct shash_node *node; + + SHASH_FOR_EACH (node, &(table->indexes)) { + index = node->data; + index->ins_del = true; + skiplist_delete(index->skiplist, row); + index->ins_del = false; + } +} + +/* Adds a column to an existing index (note that columns can only be added to + * an index before the first call to ovsdb_idl_run()). The 'order' parameter + * specifies whether the sort order should be ascending (OVSDB_INDEX_ASC) or + * descending (OVSDB_INDEX_DESC). The 'custom_comparer' parameter, if non-NULL, + * contains a pointer to a custom comparison function. 
A default comparison + * function is used if a custom comparison function is not provided (the + * default comparison function can only be used for columns of type string, + * uuid, integer, real, or boolean). + */ +void +ovsdb_idl_index_add_column(struct ovsdb_idl_index *index, + const struct ovsdb_idl_column *column, + int order, column_comparator *custom_comparer) +{ + /* Check that the column or table is tracked */ + if (!index->table->need_table && + !((OVSDB_IDL_MONITOR | OVSDB_IDL_ALERT) & + *ovsdb_idl_get_mode(index->table->idl, column))) { + VLOG_ERR("Can't add unmonitored column '%s' at index '%s' in " + "table '%s'.", + column->name, index->index_name, index->table->class->name); + } + if (!ovsdb_type_is_scalar(&column->type) && !custom_comparer) { + VLOG_WARN("Comparing non-scalar values."); + } + + /* Allocate more memory for column configuration */ + if (index->n_columns == index->alloc_columns) { + index->alloc_columns++; + index->columns = xrealloc(index->columns, + index->alloc_columns * + sizeof(struct ovsdb_idl_index_column)); + } + + /* Append column to index */ + int i = index->n_columns; + + index->columns[i].column = column; + index->columns[i].comparer = custom_comparer ? 
custom_comparer : NULL; + if (order == OVSDB_INDEX_ASC) { + index->columns[i].sorting_order = OVSDB_INDEX_ASC; + } else { + index->columns[i].sorting_order = OVSDB_INDEX_DESC; + } + index->n_columns++; +} + +bool +ovsdb_idl_initialize_cursor(struct ovsdb_idl *idl, + const struct ovsdb_idl_table_class *tc, + const char *index_name, + struct ovsdb_idl_index_cursor *cursor) +{ + size_t i; + + for (i = 0; i < idl->class->n_tables; i++) { + struct ovsdb_idl_table *table = &idl->tables[i]; + + if (table->class == tc) { + struct shash_node *node = shash_find(&table->indexes, index_name); + + if (!node || !node->data) { + VLOG_ERR("Cursor initialization failed, " + "index %s at table %s does not exist.", + index_name, tc->name); + cursor->index = NULL; + cursor->position = NULL; + return false; + } + cursor->index = node->data; + cursor->position = skiplist_first(cursor->index->skiplist); + return true; + } + } + VLOG_ERR("Cursor initialization failed, " + "index %s at table %s does not exist.", index_name, tc->name); + return false; +} + +/* ovsdb_idl_index_write_ writes a datum in an ovsdb_idl_row, + * and updates the corresponding field in the table record. + * Not intended for direct usage. + */ +void +ovsdb_idl_index_write_(struct ovsdb_idl_row *const_row, + const struct ovsdb_idl_column *column, + struct ovsdb_datum *datum, + const struct ovsdb_idl_table_class *class) +{ + struct ovsdb_idl_row *row = CONST_CAST(struct ovsdb_idl_row *, const_row); + size_t column_idx = column - class->columns; + + if (bitmap_is_set(row->written, column_idx)) { + free(row->new[column_idx].values); + free(row->new[column_idx].keys); + } else { + bitmap_set1(row->written, column_idx); + } + row->new[column_idx] = *datum; + (column->unparse)(row); + (column->parse)(row, &row->new[column_idx]); +} + +/* Initializes a row for use in an indexed query. + * Not intended for direct usage. 
+ */ +struct ovsdb_idl_row * +ovsdb_idl_index_init_row(struct ovsdb_idl * idl, + const struct ovsdb_idl_table_class *class) +{ + struct ovsdb_idl_row *row = xzalloc(class->allocation_size); + class->row_init(row); + row->new = xmalloc(class->n_columns * sizeof *row->new); + row->written = bitmap_allocate(class->n_columns); + row->table = ovsdb_idl_table_from_class(idl, class); + return row; +} + +/* Destroys 'row_' and frees all associated memory. This function is intended + * to be used indirectly through one of the "index_destroy_row" functions + * generated by ovsdb-idlc. + */ +void +ovsdb_idl_index_destroy_row__(const struct ovsdb_idl_row *row_) +{ + struct ovsdb_idl_row *row = CONST_CAST(struct ovsdb_idl_row *, row_); + const struct ovsdb_idl_table_class *class = row->table->class; + const struct ovsdb_idl_column *c; + size_t i; + + BITMAP_FOR_EACH_1 (i, class->n_columns, row->written) { + c = &class->columns[i]; + (c->unparse) (row); + free(row->new[i].values); + free(row->new[i].keys); + } + free(row->new); + free(row->written); + free(row); +} + +/* Moves the cursor to the first entry in the index. Returns a pointer to the + * corresponding ovsdb_idl_row, or NULL if the index list is empty. + */ +struct ovsdb_idl_row * +ovsdb_idl_index_first(struct ovsdb_idl_index_cursor *cursor) +{ + cursor->position = skiplist_first(cursor->index->skiplist); + return ovsdb_idl_index_data(cursor); +} + +/* Moves the cursor to the next record in the index list. + */ +struct ovsdb_idl_row * +ovsdb_idl_index_next(struct ovsdb_idl_index_cursor *cursor) +{ + if (!cursor->position) { + return NULL; + } + cursor->position = skiplist_next(cursor->position); + return ovsdb_idl_index_data(cursor); + } + +/* Returns the ovsdb_idl_row pointer corresponding to the record at the + * current cursor location. 
+ */ +struct ovsdb_idl_row * +ovsdb_idl_index_data(struct ovsdb_idl_index_cursor *cursor) +{ + return skiplist_get_data(cursor->position); +} + +/* Moves the cursor to the first entry in the index matching the specified + * value. If 'value' is NULL, the cursor is moved to the first entry in the + * index. Returns a pointer to the corresponding ovsdb_idl_row or NULL. + */ +struct ovsdb_idl_row * +ovsdb_idl_index_find(struct ovsdb_idl_index_cursor *cursor, + struct ovsdb_idl_row *value) +{ + if (value) { + cursor->position = skiplist_find(cursor->index->skiplist, value); + } else { + cursor->position = skiplist_first(cursor->index->skiplist); + } + return ovsdb_idl_index_data(cursor); +} + +/* Moves the cursor to the first entry in the index with a value greater than + * or equal to the given value. If 'value' is NULL, the cursor is moved to the + * first entry in the index. Returns a pointer to the corresponding + * ovsdb_idl_row or NULL if such a row does not exist. + */ +struct ovsdb_idl_row * +ovsdb_idl_index_forward_to(struct ovsdb_idl_index_cursor *cursor, + struct ovsdb_idl_row *value) +{ + if (value) { + cursor->position = skiplist_forward_to(cursor->index->skiplist, value); + } else { + cursor->position = skiplist_first(cursor->index->skiplist); + } + return ovsdb_idl_index_data(cursor); +} + +/* Returns the result of comparing two rows using the comparison function + * for this index. + * Returns: + * < 0 if a < b + * 0 if a == b + * > 0 if a > b + * When the pointer to either row is NULL, this function considers NULL to be + * greater than any other value, and NULL == NULL. 
+ */ +int +ovsdb_idl_index_compare(struct ovsdb_idl_index_cursor *cursor, + struct ovsdb_idl_row *a, struct ovsdb_idl_row *b) +{ + if (a && b) { + return ovsdb_idl_index_generic_comparer(a, b, cursor->index); + } else if (!a && !b) { + return 0; + } else if (a) { + return -1; + } else { + return 1; + } +} static void ovsdb_idl_row_clear_old(struct ovsdb_idl_row *row) @@ -2235,11 +2632,13 @@ ovsdb_idl_insert_row(struct ovsdb_idl_row *row, const struct json *row_json) ovsdb_idl_row_parse(row); ovsdb_idl_row_reparse_backrefs(row); + ovsdb_idl_add_to_indexes(row); } static void ovsdb_idl_delete_row(struct ovsdb_idl_row *row) { + ovsdb_idl_remove_from_indexes(row); ovsdb_idl_row_unparse(row); ovsdb_idl_row_clear_arcs(row, true); ovsdb_idl_row_clear_old(row); @@ -2257,10 +2656,12 @@ ovsdb_idl_modify_row(struct ovsdb_idl_row *row, const struct json *row_json) { bool changed; + ovsdb_idl_remove_from_indexes(row); ovsdb_idl_row_unparse(row); ovsdb_idl_row_clear_arcs(row, true); changed = ovsdb_idl_row_update(row, row_json, OVSDB_IDL_CHANGE_MODIFY); ovsdb_idl_row_parse(row); + ovsdb_idl_add_to_indexes(row); return changed; } diff --git a/lib/ovsdb-idl.h b/lib/ovsdb-idl.h index 281873d09..b00b10543 100644 --- a/lib/ovsdb-idl.h +++ b/lib/ovsdb-idl.h @@ -1,4 +1,5 @@ /* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016 Nicira, Inc. + * Copyright (C) 2016 Hewlett Packard Enterprise Development LP * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -41,6 +42,7 @@ #include "ovsdb-data.h" #include "openvswitch/list.h" #include "ovsdb-condition.h" +#include "skiplist.h" struct json; struct ovsdb_datum; @@ -358,4 +360,53 @@ unsigned int ovsdb_idl_set_condition(struct ovsdb_idl *, const struct ovsdb_idl_condition *); unsigned int ovsdb_idl_get_condition_seqno(const struct ovsdb_idl *); + +/* The OVSDB-IDL Compound Indexes feature allows for the creation of custom + * table indexes over one or more columns in the IDL. These indexes provide + * the ability to retrieve rows matching a particular search criteria and to + * iterate over a subset of rows in a defined order. + */ + +#define OVSDB_INDEX_DESC -1 +#define OVSDB_INDEX_ASC 1 + +/* + * Skiplist comparison function. Allows to store sorted data. + */ +typedef int (column_comparator)(const void *a, const void *b); + +struct ovsdb_idl_index_cursor { + struct ovsdb_idl_index *index; /* Index used by this cursor */ + struct skiplist_node *position; /* Current position in the index */ +}; + +struct ovsdb_idl_index *ovsdb_idl_create_index(struct ovsdb_idl *idl, + const struct ovsdb_idl_table_class *tc, + const char *index_name); +void ovsdb_idl_index_add_column(struct ovsdb_idl_index *, + const struct ovsdb_idl_column *, + int order, + column_comparator *custom_comparer); +bool ovsdb_idl_initialize_cursor(struct ovsdb_idl *, + const struct ovsdb_idl_table_class *tc, + const char *index_name, + struct ovsdb_idl_index_cursor *cursor); +void ovsdb_idl_index_write_(struct ovsdb_idl_row *, + const struct ovsdb_idl_column *, + struct ovsdb_datum *, + const struct ovsdb_idl_table_class *); +struct ovsdb_idl_row *ovsdb_idl_index_init_row(struct ovsdb_idl *, + const struct ovsdb_idl_table_class *); +void ovsdb_idl_index_destroy_row__(const struct ovsdb_idl_row *); +struct ovsdb_idl_row *ovsdb_idl_index_first(struct ovsdb_idl_index_cursor *); +struct ovsdb_idl_row *ovsdb_idl_index_next(struct ovsdb_idl_index_cursor *); +struct ovsdb_idl_row *ovsdb_idl_index_data(struct 
ovsdb_idl_index_cursor *); +struct ovsdb_idl_row *ovsdb_idl_index_find(struct ovsdb_idl_index_cursor *, + struct ovsdb_idl_row *); +struct ovsdb_idl_row *ovsdb_idl_index_forward_to( + struct ovsdb_idl_index_cursor *, + struct ovsdb_idl_row *); +int ovsdb_idl_index_compare(struct ovsdb_idl_index_cursor *, + struct ovsdb_idl_row *a, + struct ovsdb_idl_row *b); #endif /* ovsdb-idl.h */ -- 2.13.0 _______________________________________________ dev mailing list d...@openvswitch.org https://mail.openvswitch.org/mailman/listinfo/ovs-dev