Add a new database service model, 'relay', which is needed to scale out read-mostly database access, e.g., ovn-controller connections to OVN_Southbound.

In this service model, ovsdb-server connects to an existing OVSDB
server and maintains an in-memory copy of the database.  It serves
read-only transactions and monitor requests on its own, but forwards
write transactions to the relay source.

Key differences from active-backup replication:
- Support for "write" transactions (added in the next commit; until
  then, write operations on a relay database are rejected).
- No on-disk storage (probably faster operation).
- Support for multiple remotes (connecting to a clustered db).
- Doesn't try to keep a connection for as long as possible; instead,
  it reconnects quickly to other remotes to avoid missing updates.
- No need to know the complete database schema beforehand, only the
  schema name.
- Can be used along with other standalone and clustered databases in
  the same ovsdb-server process (doesn't turn the whole jsonrpc server
  into read-only mode).
- Supports the modern version of monitors (monitor_cond_since), because
  it is based on ovsdb-cs.
- Can be chained, i.e., multiple relays can be connected to one another
  in a row or in a tree-like form.
- Doesn't increase availability.
- Cannot be converted to other service models or become a main active
  server.

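A relay database is opened from a `relay:DB_NAME:<list of remotes>` argument rather than from a file name, which is why only the schema name has to be known up front (see open_db() in the ovsdb-server.c part of the patch). The following is a minimal, standalone sketch of that parsing step, for illustration only: `parse_relay_target()` is a hypothetical helper that is not part of this patch, and it uses plain `malloc()` instead of the OVS `xmemdup0()` wrapper. It rejects an empty database name and an empty remote list, and returns the remote list untouched so that it could be handed to ovsdb_relay_add_db().

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical helper, not part of this patch: splits a
 * "relay:DB_NAME:<list of remotes>" argument.  On success, returns true,
 * stores a newly allocated copy of DB_NAME in '*db_name' (the caller frees
 * it) and points '*remotes' into 'target' just past the separating ':'. */
static bool
parse_relay_target(const char *target, char **db_name, const char **remotes)
{
    static const char prefix[] = "relay:";
    const size_t prefix_len = sizeof prefix - 1;

    assert(target && db_name && remotes);
    *db_name = NULL;
    *remotes = NULL;

    if (strncmp(target, prefix, prefix_len)) {
        return false;           /* Not a relay target, e.g. a db file path. */
    }

    /* The first ':' after the prefix ends DB_NAME; later colons belong to
     * the remotes themselves (e.g. "tcp:IP:PORT"). */
    const char *name = target + prefix_len;
    const char *colon = strchr(name, ':');
    if (!colon || colon == name || colon[1] == '\0') {
        return false;           /* Missing DB_NAME or empty remote list. */
    }

    size_t name_len = (size_t) (colon - name);
    char *copy = malloc(name_len + 1);
    if (!copy) {
        return false;
    }
    memcpy(copy, name, name_len);
    copy[name_len] = '\0';

    *db_name = copy;
    *remotes = colon + 1;
    return true;
}
```

For example, `relay:OVN_Southbound:tcp:127.0.0.1:6642` yields the name `OVN_Southbound` and the remote list `tcp:127.0.0.1:6642`. Only the first ':' after the prefix separates the name from the remotes, so colons inside the remotes are preserved.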
Acked-by: Dumitru Ceara <dce...@redhat.com>
Signed-off-by: Ilya Maximets <i.maxim...@ovn.org>
---
 ovsdb/_server.ovsschema |   7 +-
 ovsdb/_server.xml       |  18 ++-
 ovsdb/automake.mk       |   2 +
 ovsdb/execution.c       |   5 +
 ovsdb/ovsdb-server.c    | 100 ++++++++----
 ovsdb/ovsdb.c           |   2 +
 ovsdb/ovsdb.h           |   3 +
 ovsdb/relay.c           | 343 ++++++++++++++++++++++++++++++++++++++++
 ovsdb/relay.h           |  34 ++++
 9 files changed, 473 insertions(+), 41 deletions(-)
 create mode 100644 ovsdb/relay.c
 create mode 100644 ovsdb/relay.h

diff --git a/ovsdb/_server.ovsschema b/ovsdb/_server.ovsschema
index a867e5cbf..e3d9d893b 100644
--- a/ovsdb/_server.ovsschema
+++ b/ovsdb/_server.ovsschema
@@ -1,13 +1,14 @@
 {"name": "_Server",
- "version": "1.1.0",
- "cksum": "3236486585 698",
+ "version": "1.2.0",
+ "cksum": "3009684573 744",
  "tables": {
    "Database": {
      "columns": {
        "name": {"type": "string"},
        "model": {
          "type": {"key": {"type": "string",
-                          "enum": ["set", ["standalone", "clustered"]]}}},
+                          "enum": ["set",
+                                   ["standalone", "clustered", "relay"]]}}},
        "connected": {"type": "boolean"},
        "leader": {"type": "boolean"},
        "schema": {
diff --git a/ovsdb/_server.xml b/ovsdb/_server.xml
index 70cd22db7..37297da73 100644
--- a/ovsdb/_server.xml
+++ b/ovsdb/_server.xml
@@ -60,12 +60,15 @@
 
     <column name="model">
       The storage model: <code>standalone</code> for a standalone or
-      active-backup database, <code>clustered</code> for a clustered database.
+      active-backup database, <code>clustered</code> for a clustered database,
+      <code>relay</code> for a relay database.
     </column>
 
     <column name="schema">
       The database schema, as a JSON string.  In the case of a clustered
-      database, this is empty until it finishes joining its cluster.
+      database, this is empty until it finishes joining its cluster.  In the
+      case of a relay database, this is empty until it connects to the relay
+      source.
     </column>
 
     <group title="Clustered Databases">
@@ -85,20 +88,21 @@
 
       <column name="leader">
         True if the database is the leader in its cluster.  For a standalone or
-        active-backup database, this is always true.
+        active-backup database, this is always true.  For a relay database,
+        this is always false.
       </column>
 
       <column name="cid">
         The cluster ID for this database, which is the same for all of the
-        servers that host this particular clustered database.  For a standalone
-        or active-backup database, this is empty.
+        servers that host this particular clustered database.  For a
+        standalone, active-backup or relay database, this is empty.
       </column>
 
       <column name="sid">
         The server ID for this database, different for each server that hosts a
         particular clustered database.  A server that hosts more than one
         clustered database will have a different <code>sid</code> in each one.
-        For a standalone or active-backup database, this is empty.
+        For a standalone, active-backup or relay database, this is empty.
       </column>
 
       <column name="index">
@@ -112,7 +116,7 @@
         </p>
 
         <p>
-          For a standalone or active-backup database, this is empty.
+          For a standalone, active-backup or relay database, this is empty.
         </p>
       </column>
     </group>
diff --git a/ovsdb/automake.mk b/ovsdb/automake.mk
index 446d6c136..05c8ebbdf 100644
--- a/ovsdb/automake.mk
+++ b/ovsdb/automake.mk
@@ -34,6 +34,8 @@ ovsdb_libovsdb_la_SOURCES = \
 	ovsdb/rbac.h \
 	ovsdb/replication.c \
 	ovsdb/replication.h \
+	ovsdb/relay.c \
+	ovsdb/relay.h \
 	ovsdb/row.c \
 	ovsdb/row.h \
 	ovsdb/server.c \
diff --git a/ovsdb/execution.c b/ovsdb/execution.c
index f6150e944..dd2569055 100644
--- a/ovsdb/execution.c
+++ b/ovsdb/execution.c
@@ -196,6 +196,11 @@ ovsdb_execute_compose(struct ovsdb *db, const struct ovsdb_session *session,
                                     "%s operation not allowed on "
                                     "table in reserved database %s",
                                     op_name, db->schema->name);
+            } else if (db->is_relay) {
+                error = ovsdb_error("not allowed",
+                                    "%s operation not allowed when "
+                                    "database server is in relay mode",
+                                    op_name);
             }
         }
         if (error) {
diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c
index 23bd226a3..ddf868d16 100644
--- a/ovsdb/ovsdb-server.c
+++ b/ovsdb/ovsdb-server.c
@@ -44,6 +44,7 @@
 #include "openvswitch/poll-loop.h"
 #include "process.h"
 #include "replication.h"
+#include "relay.h"
 #include "row.h"
 #include "simap.h"
 #include "openvswitch/shash.h"
@@ -225,6 +226,8 @@ main_loop(struct server_config *config,
             }
         }
 
+        ovsdb_relay_run();
+
         struct shash_node *next;
         SHASH_FOR_EACH_SAFE (node, next, all_dbs) {
             struct db *db = node->data;
@@ -273,6 +276,8 @@ main_loop(struct server_config *config,
             replication_wait();
         }
 
+        ovsdb_relay_wait();
+
         ovsdb_jsonrpc_server_wait(jsonrpc);
         unixctl_server_wait(unixctl);
         SHASH_FOR_EACH(node, all_dbs) {
@@ -546,6 +551,9 @@ close_db(struct server_config *config, struct db *db, char *comment)
 {
     if (db) {
         ovsdb_jsonrpc_server_remove_db(config->jsonrpc, db->db, comment);
+        if (db->db->is_relay) {
+            ovsdb_relay_del_db(db->db);
+        }
         ovsdb_destroy(db->db);
         free(db->filename);
         free(db);
@@ -554,6 +562,28 @@ close_db(struct server_config *config, struct db *db, char *comment)
     }
 }
 
+static void
+update_schema(struct ovsdb *db, const struct ovsdb_schema *schema, void *aux)
+{
+    struct server_config *config = aux;
+
+    if (!db->schema || strcmp(schema->version, db->schema->version)) {
+        ovsdb_jsonrpc_server_reconnect(
+            config->jsonrpc, false,
+            (db->schema
+            ? xasprintf("database %s schema changed", db->name)
+            : xasprintf("database %s connected to storage", db->name)));
+    }
+
+    ovsdb_replace(db, ovsdb_create(ovsdb_schema_clone(schema), NULL));
+
+    /* Force update to schema in _Server database. */
+    struct db *dbp = shash_find_data(config->all_dbs, db->name);
+    if (dbp) {
+        dbp->row_uuid = UUID_ZERO;
+    }
+}
+
 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
 parse_txn(struct server_config *config, struct db *db,
           const struct ovsdb_schema *schema, const struct json *txn_json,
@@ -575,21 +605,7 @@ parse_txn(struct server_config *config, struct db *db,
         if (error) {
             return error;
         }
-
-        if (!db->db->schema ||
-            strcmp(schema->version, db->db->schema->version)) {
-            ovsdb_jsonrpc_server_reconnect(
-                config->jsonrpc, false,
-                (db->db->schema
-                ? xasprintf("database %s schema changed", db->db->name)
-                : xasprintf("database %s connected to storage",
-                            db->db->name)));
-        }
-
-        ovsdb_replace(db->db, ovsdb_create(ovsdb_schema_clone(schema), NULL));
-
-        /* Force update to schema in _Server database. */
-        db->row_uuid = UUID_ZERO;
+        update_schema(db->db, schema, config);
     }
 
     if (txn_json) {
@@ -660,27 +676,45 @@ add_db(struct server_config *config, struct db *db)
 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
 open_db(struct server_config *config, const char *filename)
 {
+    const char *relay_prefix = "relay:";
+    const char *relay_remotes = NULL;
+    const int relay_prefix_len = strlen(relay_prefix);
+    struct ovsdb_storage *storage;
+    struct ovsdb_error *error;
     struct db *db;
+    bool is_relay;
+    char *name;
+
+    is_relay = !strncmp(filename, relay_prefix, relay_prefix_len);
+    if (!is_relay) {
+        /* If we know that the file is already open, return a good error
+         * message.  Otherwise, if the file is open, we'll fail later on with
+         * a harder to interpret file locking error. */
+        if (is_already_open(config, filename)) {
+            return ovsdb_error(NULL, "%s: already open", filename);
+        }
 
-    /* If we know that the file is already open, return a good error message.
-     * Otherwise, if the file is open, we'll fail later on with a harder to
-     * interpret file locking error. */
-    if (is_already_open(config, filename)) {
-        return ovsdb_error(NULL, "%s: already open", filename);
-    }
+        error = ovsdb_storage_open(filename, true, &storage);
+        if (error) {
+            return error;
+        }
+        name = xstrdup(filename);
+    } else {
+        /* Parsing the relay in format 'relay:DB_NAME:<list of remotes>'. */
+        relay_remotes = strchr(filename + relay_prefix_len, ':');
 
-    struct ovsdb_storage *storage;
-    struct ovsdb_error *error;
-    error = ovsdb_storage_open(filename, true, &storage);
-    if (error) {
-        return error;
+        if (!relay_remotes || relay_remotes[1] == '\0') {
+            return ovsdb_error(NULL, "%s: invalid syntax", filename);
+        }
+        name = xmemdup0(filename, relay_remotes - filename);
+        storage = ovsdb_storage_create_unbacked(name + relay_prefix_len);
+        relay_remotes++; /* Skip the ':'. */
     }
-
     db = xzalloc(sizeof *db);
-    db->filename = xstrdup(filename);
+    db->filename = name;
 
     struct ovsdb_schema *schema;
-    if (ovsdb_storage_is_clustered(storage)) {
+    if (is_relay || ovsdb_storage_is_clustered(storage)) {
         schema = NULL;
     } else {
         struct json *txn_json;
@@ -716,6 +750,10 @@ open_db(struct server_config *config, const char *filename)
     }
 
     add_db(config, db);
+
+    if (is_relay) {
+        ovsdb_relay_add_db(db->db, relay_remotes, update_schema, config);
+    }
     return NULL;
 }
 
@@ -1153,11 +1191,11 @@ update_database_status(struct ovsdb_row *row, struct db *db)
 {
     ovsdb_util_write_string_column(row, "name", db->db->name);
     ovsdb_util_write_string_column(row, "model",
-                                   ovsdb_storage_get_model(db->db->storage));
+        db->db->is_relay ? "relay" : ovsdb_storage_get_model(db->db->storage));
     ovsdb_util_write_bool_column(row, "connected",
                                  ovsdb_storage_is_connected(db->db->storage));
     ovsdb_util_write_bool_column(row, "leader",
-                                 ovsdb_storage_is_leader(db->db->storage));
+        db->db->is_relay ? false : ovsdb_storage_is_leader(db->db->storage));
     ovsdb_util_write_uuid_column(row, "cid",
                                  ovsdb_storage_get_cid(db->db->storage));
     ovsdb_util_write_uuid_column(row, "sid",
diff --git a/ovsdb/ovsdb.c b/ovsdb/ovsdb.c
index e019631e9..999cd0d75 100644
--- a/ovsdb/ovsdb.c
+++ b/ovsdb/ovsdb.c
@@ -421,6 +421,8 @@ ovsdb_create(struct ovsdb_schema *schema, struct ovsdb_storage *storage)
     ovs_list_init(&db->triggers);
     db->run_triggers_now = db->run_triggers = false;
 
+    db->is_relay = false;
+
     shash_init(&db->tables);
     if (schema) {
         SHASH_FOR_EACH (node, &schema->tables) {
diff --git a/ovsdb/ovsdb.h b/ovsdb/ovsdb.h
index 72e127c84..16bd5f5ec 100644
--- a/ovsdb/ovsdb.h
+++ b/ovsdb/ovsdb.h
@@ -91,6 +91,9 @@ struct ovsdb {
     bool need_txn_history;     /* Need to maintain history of transactions. */
     unsigned int n_txn_history; /* Current number of history transactions. */
     struct ovs_list txn_history; /* Contains "struct ovsdb_txn_history_node. */
+
+    /* Relay mode. */
+    bool is_relay;
 };
 
 struct ovsdb *ovsdb_create(struct ovsdb_schema *, struct ovsdb_storage *);
diff --git a/ovsdb/relay.c b/ovsdb/relay.c
new file mode 100644
index 000000000..740b34ddf
--- /dev/null
+++ b/ovsdb/relay.c
@@ -0,0 +1,343 @@
+/*
+ * Copyright (c) 2021, Red Hat, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+
+#include "relay.h"
+
+#include "coverage.h"
+#include "jsonrpc.h"
+#include "openvswitch/hmap.h"
+#include "openvswitch/json.h"
+#include "openvswitch/list.h"
+#include "openvswitch/poll-loop.h"
+#include "openvswitch/shash.h"
+#include "openvswitch/vlog.h"
+#include "ovsdb.h"
+#include "ovsdb-cs.h"
+#include "ovsdb-error.h"
+#include "row.h"
+#include "table.h"
+#include "transaction.h"
+#include "util.h"
+
+VLOG_DEFINE_THIS_MODULE(relay);
+
+static struct shash relay_dbs = SHASH_INITIALIZER(&relay_dbs);
+
+struct relay_ctx {
+    struct ovsdb *db;
+    struct ovsdb_cs *cs;
+
+    /* Schema updates. */
+    struct ovsdb_schema *new_schema;
+    schema_change_callback schema_change_cb;
+    void *schema_change_aux;
+};
+
+static struct json *
+ovsdb_relay_compose_monitor_request(const struct json *schema_json, void *ctx_)
+{
+    struct json *monitor_request = json_object_create();
+    struct relay_ctx *ctx = ctx_;
+    struct ovsdb_schema *schema;
+    struct ovsdb *db = ctx->db;
+    struct ovsdb_error *error;
+
+    error = ovsdb_schema_from_json(schema_json, &schema);
+    if (error) {
+        char *msg = ovsdb_error_to_string_free(error);
+        VLOG_WARN("%s: Failed to parse db schema: %s", db->name, msg);
+        free(msg);
+        /* There is nothing we can really do here. */
+        return monitor_request;
+    }
+
+    const struct shash_node *node;
+    SHASH_FOR_EACH (node, &schema->tables) {
+        struct json *monitor_request_array = json_array_create_empty();
+        struct ovsdb_table_schema *table = node->data;
+
+        json_array_add(monitor_request_array, json_object_create());
+        json_object_put(monitor_request, table->name, monitor_request_array);
+    }
+
+    if (!db->schema || !ovsdb_schema_equal(schema, db->schema)) {
+        VLOG_DBG("database %s schema changed.", db->name);
+        if (ctx->new_schema) {
+            ovsdb_schema_destroy(ctx->new_schema);
+        }
+        /* We will update the schema later, when we receive actual data
+         * from the monitor, in order to avoid sitting with an empty database
+         * until the monitor reply. */
+        ctx->new_schema = schema;
+    } else {
+        ovsdb_schema_destroy(schema);
+    }
+    return monitor_request;
+}
+
+static struct ovsdb_cs_ops relay_cs_ops = {
+    .compose_monitor_requests = ovsdb_relay_compose_monitor_request,
+};
+
+void
+ovsdb_relay_add_db(struct ovsdb *db, const char *remote,
+                   schema_change_callback schema_change_cb,
+                   void *schema_change_aux)
+{
+    struct relay_ctx *ctx;
+
+    if (!db || !remote) {
+        return;
+    }
+
+    ctx = shash_find_data(&relay_dbs, db->name);
+    if (ctx) {
+        ovsdb_cs_set_remote(ctx->cs, remote, true);
+        VLOG_DBG("%s: relay source set to '%s'", db->name, remote);
+        return;
+    }
+
+    db->is_relay = true;
+    ctx = xzalloc(sizeof *ctx);
+    ctx->schema_change_cb = schema_change_cb;
+    ctx->schema_change_aux = schema_change_aux;
+    ctx->db = db;
+    ctx->cs = ovsdb_cs_create(db->name, 3, &relay_cs_ops, ctx);
+    shash_add(&relay_dbs, db->name, ctx);
+    ovsdb_cs_set_leader_only(ctx->cs, false);
+    ovsdb_cs_set_remote(ctx->cs, remote, true);
+
+    VLOG_DBG("added database: %s, %s", db->name, remote);
+}
+
+void
+ovsdb_relay_del_db(struct ovsdb *db)
+{
+    struct relay_ctx *ctx;
+
+    if (!db) {
+        return;
+    }
+
+    ctx = shash_find_and_delete(&relay_dbs, db->name);
+    if (!ctx) {
+        VLOG_WARN("Failed to remove relay database %s: not found.", db->name);
+        return;
+    }
+
+    VLOG_DBG("removed database: %s", db->name);
+
+    db->is_relay = false;
+    ovsdb_cs_destroy(ctx->cs);
+    free(ctx);
+}
+
+static struct ovsdb_error *
+ovsdb_relay_process_row_update(struct ovsdb_table *table,
+                               const struct ovsdb_cs_row_update *ru,
+                               struct ovsdb_txn *txn)
+{
+    const struct uuid *uuid = &ru->row_uuid;
+    struct ovsdb_error *error = NULL;
+
+    /* XXX: ovsdb-cs module returns shash which was previously part of a json
+     * structure and we need json row format in order to use ovsdb_row*
+     * functions.  Creating a json object out of shash. */
+    struct json *json_row = json_object_create();
+    struct shash *obj = json_row->object;
+    json_row->object = CONST_CAST(struct shash *, ru->columns);
+
+    switch (ru->type) {
+    case OVSDB_CS_ROW_DELETE:
+        error = ovsdb_table_execute_delete(txn, uuid, table);
+        break;
+
+    case OVSDB_CS_ROW_INSERT:
+        error = ovsdb_table_execute_insert(txn, uuid, table, json_row);
+        break;
+
+    case OVSDB_CS_ROW_UPDATE:
+        error = ovsdb_table_execute_update(txn, uuid, table, json_row, false);
+        break;
+
+    case OVSDB_CS_ROW_XOR:
+        error = ovsdb_table_execute_update(txn, uuid, table, json_row, true);
+        break;
+
+    default:
+        OVS_NOT_REACHED();
+    }
+
+    json_row->object = obj;
+    json_destroy(json_row);
+
+    return error;
+}
+
+static struct ovsdb_error *
+ovsdb_relay_parse_update__(struct ovsdb *db,
+                           const struct ovsdb_cs_db_update *du)
+{
+    struct ovsdb_error *error = NULL;
+    struct ovsdb_txn *txn;
+
+    txn = ovsdb_txn_create(db);
+
+    for (size_t i = 0; i < du->n; i++) {
+        const struct ovsdb_cs_table_update *tu = &du->table_updates[i];
+        struct ovsdb_table *table = ovsdb_get_table(db, tu->table_name);
+
+        if (!table) {
+            error = ovsdb_error("unknown table", "unknown table %s",
+                                tu->table_name);
+            goto exit;
+        }
+
+        for (size_t j = 0; j < tu->n; j++) {
+            const struct ovsdb_cs_row_update *ru = &tu->row_updates[j];
+
+            error = ovsdb_relay_process_row_update(table, ru, txn);
+            if (error) {
+                goto exit;
+            }
+        }
+    }
+
+exit:
+    if (error) {
+        ovsdb_txn_abort(txn);
+        return error;
+    } else {
+        /* Commit transaction. */
+        error = ovsdb_txn_propose_commit_block(txn, false);
+    }
+
+    return error;
+}
+
+static struct ovsdb_error *
+ovsdb_relay_clear(struct ovsdb *db)
+{
+    struct ovsdb_txn *txn = ovsdb_txn_create(db);
+    struct shash_node *table_node;
+
+    SHASH_FOR_EACH (table_node, &db->tables) {
+        struct ovsdb_table *table = table_node->data;
+        struct ovsdb_row *row, *next;
+
+        HMAP_FOR_EACH_SAFE (row, next, hmap_node, &table->rows) {
+            ovsdb_txn_row_delete(txn, row);
+        }
+    }
+
+    return ovsdb_txn_propose_commit_block(txn, false);
+}
+
+static void
+ovsdb_relay_parse_update(struct relay_ctx *ctx,
+                         const struct ovsdb_cs_update_event *update)
+{
+    if (!ctx->db) {
+        return;
+    }
+
+    if (update->monitor_reply && ctx->new_schema) {
+        /* There was a schema change.  Updating a database with a new schema
+         * before processing monitor reply with the new data. */
+        ctx->schema_change_cb(ctx->db, ctx->new_schema,
+                              ctx->schema_change_aux);
+        ovsdb_schema_destroy(ctx->new_schema);
+        ctx->new_schema = NULL;
+    }
+
+    struct ovsdb_cs_db_update *du;
+    struct ovsdb_error *error = ovsdb_cs_parse_db_update(update->table_updates,
+                                                         update->version, &du);
+    if (!error) {
+        if (update->clear) {
+            error = ovsdb_relay_clear(ctx->db);
+        }
+        if (!error) {
+            error = ovsdb_relay_parse_update__(ctx->db, du);
+        }
+    }
+    ovsdb_cs_db_update_destroy(du);
+    if (error) {
+        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
+        if (!VLOG_DROP_WARN(&rl)) {
+            char *s = ovsdb_error_to_string(error);
+            VLOG_WARN_RL(&rl, "%s", s);
+            free(s);
+        }
+        /* Something bad happened.  Try to recover. */
+        if (!strcmp(ovsdb_error_get_tag(error), "consistency violation")) {
+            ovsdb_cs_flag_inconsistency(ctx->cs);
+        } else {
+            ovsdb_cs_force_reconnect(ctx->cs);
+        }
+        ovsdb_error_destroy(error);
+    }
+}
+
+void
+ovsdb_relay_run(void)
+{
+    struct shash_node *node;
+    SHASH_FOR_EACH (node, &relay_dbs) {
+        struct relay_ctx *ctx = node->data;
+        struct ovs_list events;
+
+        ovsdb_cs_run(ctx->cs, &events);
+
+        struct ovsdb_cs_event *event;
+        LIST_FOR_EACH_POP (event, list_node, &events) {
+            if (!ctx->db) {
+                ovsdb_cs_event_destroy(event);
+                continue;
+            }
+
+            switch (event->type) {
+            case OVSDB_CS_EVENT_TYPE_RECONNECT:
+                /* Nothing to do. */
+                break;
+
+            case OVSDB_CS_EVENT_TYPE_UPDATE:
+                ovsdb_relay_parse_update(ctx, &event->update);
+                break;
+
+            case OVSDB_CS_EVENT_TYPE_TXN_REPLY:
+            case OVSDB_CS_EVENT_TYPE_LOCKED:
+                /* Not expected. */
+                break;
+            }
+            ovsdb_cs_event_destroy(event);
+        }
+    }
+}
+
+void
+ovsdb_relay_wait(void)
+{
+    struct shash_node *node;
+
+    SHASH_FOR_EACH (node, &relay_dbs) {
+        struct relay_ctx *ctx = node->data;
+
+        ovsdb_cs_wait(ctx->cs);
+    }
+}
diff --git a/ovsdb/relay.h b/ovsdb/relay.h
new file mode 100644
index 000000000..68586e9db
--- /dev/null
+++ b/ovsdb/relay.h
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2021, Red Hat, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef OVSDB_RELAY_H
+#define OVSDB_RELAY_H 1
+
+struct json;
+struct ovsdb;
+struct ovsdb_schema;
+
+typedef void (*schema_change_callback)(struct ovsdb *,
+                                       const struct ovsdb_schema *, void *aux);
+
+void ovsdb_relay_add_db(struct ovsdb *, const char *remote,
+                        schema_change_callback schema_change_cb,
+                        void *schema_change_aux);
+void ovsdb_relay_del_db(struct ovsdb *);
+void ovsdb_relay_run(void);
+void ovsdb_relay_wait(void);
+
+#endif /* OVSDB_RELAY_H */
-- 
2.31.1

_______________________________________________
dev mailing list
d...@openvswitch.org
https://mail.openvswitch.org/mailman/listinfo/ovs-dev
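On a relay database, this commit refuses operations that would modify the data: the execution.c hunk adds the `is_relay` check next to the existing read-only-mode and reserved-database checks and returns a "not allowed" error. As a rough, self-contained illustration of that policy, the sketch below classifies OVSDB operation names (RFC 7047) into read-only ones, which a relay can answer from its in-memory copy, and modifying ones, which it refuses until write forwarding arrives in the next commit. The `relay_op_allowed()` function and its table are hypothetical, and the read-only flags are an assumption that should be checked against the operation table in execution.c.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical table of OVSDB operation names (RFC 7047) with a flag saying
 * whether the operation only reads the database.  The flags are an
 * assumption for illustration; check them against execution.c. */
struct relay_op_info {
    const char *name;
    bool read_only;
};

static const struct relay_op_info relay_ops[] = {
    { "insert", false },  { "select", true },   { "update", false },
    { "mutate", false },  { "delete", false },  { "wait", true },
    { "commit", false },  { "abort", true },    { "comment", true },
    { "assert", true },
};

/* Returns true if 'op_name' may be executed against a relay database in
 * this commit: read-only operations are served from the local in-memory
 * copy, while modifying operations are refused.  Unknown names are refused
 * as well (the real server rejects those with a separate error). */
static bool
relay_op_allowed(const char *op_name)
{
    assert(op_name);
    for (size_t i = 0; i < sizeof relay_ops / sizeof relay_ops[0]; i++) {
        if (!strcmp(relay_ops[i].name, op_name)) {
            return relay_ops[i].read_only;
        }
    }
    return false;
}
```

Because the check is made per operation, a transaction that mixes a `select` with an `insert` would, under these assumptions, still fail on the `insert`.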