On 1/9/24 23:49, Ilya Maximets wrote: > Refactoring of the replication code, so each database is handled > separately from each other. Supposed to work the same way as before > with the only difference that each backup database will have its own > connection to the source and will have its own state machine. > > From the user's perspective, the only visible difference is that > ovsdb-server/sync-status appctl now shows the status of each > database separately. > > If one of the connections is permanently broken, all the databases > will be switched to active. This is done in order to preserve the > old behavior where we had only one connection. > > Signed-off-by: Ilya Maximets <i.maxim...@ovn.org> > ---
Hi Ilya, > ovsdb/ovsdb-server.c | 74 +++-- > ovsdb/replication.c | 676 ++++++++++++++++++++----------------------- > ovsdb/replication.h | 36 +-- > 3 files changed, 384 insertions(+), 402 deletions(-) > > diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c > index 7e95b3813..9a3b0add1 100644 > --- a/ovsdb/ovsdb-server.c > +++ b/ovsdb/ovsdb-server.c > @@ -166,12 +166,12 @@ ovsdb_replication_init(const char *sync_from, const > char *exclude, > struct shash *all_dbs, const struct uuid *server_uuid, > int probe_interval) > { > - replication_init(sync_from, exclude, server_uuid, probe_interval); > struct shash_node *node; > SHASH_FOR_EACH (node, all_dbs) { > struct db *db = node->data; > if (node->name[0] != '_' && db->db) { > - replication_add_local_db(node->name, db->db); > + replication_set_db(db->db, sync_from, exclude, > + server_uuid, probe_interval); > } > } > } > @@ -228,11 +228,20 @@ main_loop(struct server_config *config, > report_error_if_changed(reconfigure_ssl(all_dbs), &ssl_error); > ovsdb_jsonrpc_server_run(jsonrpc); > > + replication_run(); > if (*is_backup) { > - replication_run(); > - if (!replication_is_alive()) { > - disconnect_active_server(); > - *is_backup = false; > + SHASH_FOR_EACH (node, all_dbs) { > + struct db *db = node->data; > + if (db->db->name[0] != '_' && !replication_is_alive(db->db)) > { > + *is_backup = false; > + break; > + } > + } > + if (!*is_backup) { > + SHASH_FOR_EACH (node, all_dbs) { > + struct db *db = node->data; > + replication_remove_db(db->db); > + } > } > } > > @@ -283,10 +292,8 @@ main_loop(struct server_config *config, > update_server_status(all_dbs); > > memory_wait(); > - if (*is_backup) { > - replication_wait(); > - } > > + replication_wait(); > ovsdb_relay_wait(); > > ovsdb_jsonrpc_server_wait(jsonrpc); > @@ -518,7 +525,7 @@ main(int argc, char *argv[]) > &server_config); > unixctl_command_register("ovsdb-server/get-sync-exclude-tables", "", > 0, 0, ovsdb_server_get_sync_exclude_tables, > - NULL); > + 
&server_config); > unixctl_command_register("ovsdb-server/sync-status", "", > 0, 0, ovsdb_server_get_sync_status, > &server_config); > @@ -606,6 +613,9 @@ close_db(struct server_config *config, struct db *db, > char *comment) > if (db->db->is_relay) { > ovsdb_relay_del_db(db->db); > } > + if (*config->is_backup) { > + replication_remove_db(db->db); > + } > ovsdb_destroy(db->db); > free(db->filename); > free(db); > @@ -1500,8 +1510,12 @@ ovsdb_server_disconnect_active_ovsdb_server(struct > unixctl_conn *conn, > void *config_) > { > struct server_config *config = config_; > + struct shash_node *node; > > - disconnect_active_server(); > + SHASH_FOR_EACH (node, config->all_dbs) { > + struct db *db = node->data; > + replication_remove_db(db->db); > + } > *config->is_backup = false; > save_config(config); > unixctl_command_reply(conn, NULL); > @@ -1520,7 +1534,11 @@ > ovsdb_server_set_active_ovsdb_server_probe_interval(struct unixctl_conn *conn, > *config->replication_probe_interval = probe_interval; > save_config(config); > if (*config->is_backup) { > - replication_set_probe_interval(probe_interval); > + const struct uuid *server_uuid; > + server_uuid = ovsdb_jsonrpc_server_get_uuid(config->jsonrpc); > + ovsdb_replication_init(*config->sync_from, *config->sync_exclude, > + config->all_dbs, server_uuid, > + *config->replication_probe_interval); > } > unixctl_command_reply(conn, NULL); > } else { > @@ -1557,7 +1575,7 @@ ovsdb_server_set_sync_exclude_tables(struct > unixctl_conn *conn, > { > struct server_config *config = config_; > > - char *err = set_excluded_tables(argv[1], true); > + char *err = parse_excluded_tables(argv[1]); > if (!err) { > free(*config->sync_exclude); > *config->sync_exclude = xstrdup(argv[1]); > @@ -1569,7 +1587,6 @@ ovsdb_server_set_sync_exclude_tables(struct > unixctl_conn *conn, > config->all_dbs, server_uuid, > *config->replication_probe_interval); > } > - err = set_excluded_tables(argv[1], false); > } > unixctl_command_reply(conn, err); > 
free(err); > @@ -1579,11 +1596,11 @@ static void > ovsdb_server_get_sync_exclude_tables(struct unixctl_conn *conn, > int argc OVS_UNUSED, > const char *argv[] OVS_UNUSED, > - void *arg_ OVS_UNUSED) > + void *config_) > { > - char *reply = get_excluded_tables(); > - unixctl_command_reply(conn, reply); > - free(reply); > + struct server_config *config = config_; > + > + unixctl_command_reply(conn, *config->sync_exclude); > } > > static void > @@ -1842,13 +1859,6 @@ remove_db(struct server_config *config, struct > shash_node *node, char *comment) > shash_delete(config->all_dbs, node); > > save_config(config); > - if (*config->is_backup) { > - const struct uuid *server_uuid; > - server_uuid = ovsdb_jsonrpc_server_get_uuid(config->jsonrpc); > - ovsdb_replication_init(*config->sync_from, *config->sync_exclude, > - config->all_dbs, server_uuid, > - *config->replication_probe_interval); > - } > } > > static void > @@ -1990,7 +2000,17 @@ ovsdb_server_get_sync_status(struct unixctl_conn > *conn, int argc OVS_UNUSED, > ds_put_format(&ds, "state: %s\n", is_backup ? 
"backup" : "active"); > > if (is_backup) { > - ds_put_and_free_cstr(&ds, replication_status()); > + const struct shash_node **db_nodes = shash_sort(config->all_dbs); > + > + for (size_t i = 0; i < shash_count(config->all_dbs); i++) { > + const struct db *db = db_nodes[i]->data; > + > + if (db->db && db->db->name[0] != '_') { > + ds_put_and_free_cstr(&ds, replication_status(db->db)); > + ds_put_char(&ds, '\n'); > + } > + } > + free(db_nodes); > } > > unixctl_command_reply(conn, ds_cstr(&ds)); > @@ -2154,7 +2174,7 @@ parse_options(int argc, char *argv[], > break; > > case OPT_SYNC_EXCLUDE: { > - char *err = set_excluded_tables(optarg, false); > + char *err = parse_excluded_tables(optarg); > if (err) { > ovs_fatal(0, "%s", err); > } > diff --git a/ovsdb/replication.c b/ovsdb/replication.c > index 477c69d70..d0d48aad5 100644 > --- a/ovsdb/replication.c > +++ b/ovsdb/replication.c > @@ -38,16 +38,7 @@ > > VLOG_DEFINE_THIS_MODULE(replication); > > -static char *sync_from; > static struct uuid server_uuid; > -static struct jsonrpc_session *session; > -static unsigned int session_seqno = UINT_MAX; > - > -static struct jsonrpc_msg *create_monitor_request(struct ovsdb_schema *); > -static void add_monitored_table(struct ovsdb_table_schema *table, > - struct json *monitor_requests); > - > -static struct ovsdb_error *reset_database(struct ovsdb *db); > > static struct ovsdb_error *process_notification(struct json *, struct ovsdb > *); > static struct ovsdb_error *process_table_update(struct json *table_update, > @@ -55,27 +46,6 @@ static struct ovsdb_error *process_table_update(struct > json *table_update, > struct ovsdb *database, > struct ovsdb_txn *txn); > > -/* Maps from db name to sset of table names. 
*/ > -static struct shash excluded_tables = SHASH_INITIALIZER(&excluded_tables); > - > -static void excluded_tables_clear(void); > -static void excluded_tables_add(const char *database, const char *table); > -static bool excluded_tables_find(const char *database, const char *table); > - > - > -/* Keep track of request IDs of all outstanding OVSDB requests. */ > -static struct hmap request_ids = HMAP_INITIALIZER(&request_ids); > - > -struct request_ids_hmap_node { > - struct hmap_node hmap; > - struct json *request_id; > - struct ovsdb *db; /* associated database */ > -}; > -void request_ids_add(const struct json *id, struct ovsdb *db); > -bool request_ids_lookup_and_free(const struct json *id, struct ovsdb **db); > -static void request_ids_destroy(void); > -void request_ids_clear(void); > - > enum ovsdb_replication_state { > RPL_S_INIT, > RPL_S_SERVER_ID_REQUESTED, > @@ -85,168 +55,231 @@ enum ovsdb_replication_state { > RPL_S_REPLICATING, > RPL_S_ERR /* Error, no longer replicating. */ > }; > -static enum ovsdb_replication_state state; > > - > struct replication_db { > struct ovsdb *db; > + > bool schema_version_higher; > /* Points to the schema received from the active server if > * the local db schema version is higher. NULL otherwise. */ > struct ovsdb_schema *active_db_schema; > + > + char *sync_from; > + char *excluded_tables_str; > + struct sset excluded_tables; > + > + struct json *request_id; /* Id of the outstanding OVSDB request. */ > + > + struct jsonrpc_session *session; > + unsigned int session_seqno; > + > + enum ovsdb_replication_state state; > }; > > static bool is_replication_possible(struct ovsdb_schema *local_db_schema, > struct ovsdb_schema *active_db_schema); > > +static struct jsonrpc_msg *create_monitor_request(struct replication_db *, > + struct ovsdb_schema *); > +static void add_monitored_table(struct ovsdb_table_schema *table, > + struct json *monitor_requests); > + > + > /* All DBs known to ovsdb-server. 
The actual replication dbs are stored > * in 'replication dbs', which is a subset of all dbs and remote dbs whose > * schema matches. */ > -static struct shash local_dbs = SHASH_INITIALIZER(&local_dbs); > -static struct shash *replication_dbs; > +static struct shash replication_dbs = SHASH_INITIALIZER(&replication_dbs); > + > +static void replication_db_destroy(struct replication_db *); > +static struct ovsdb_error *reset_database(struct replication_db *); > > -static struct shash *replication_dbs_create(void); > -static void replication_dbs_destroy(void); > /* Find 'struct ovsdb' by name within 'replication_dbs' */ > static struct replication_db *find_db(const char *db_name); > + > +static char *set_excluded_tables(struct replication_db *, const char > *excluded) > + OVS_WARN_UNUSED_RESULT; > + > +static void request_id_set(struct replication_db *, const struct json *id); > +static void request_id_clear(struct replication_db *); > +static bool request_id_compare_and_free(struct replication_db *, > + const struct json *id); > > > void > -replication_init(const char *sync_from_, const char *exclude_tables, > - const struct uuid *server, int probe_interval) > +replication_set_db(struct ovsdb *db, const char *sync_from, > + const char *exclude_tables, const struct uuid *server, > + int probe_interval) > { > - free(sync_from); > - sync_from = xstrdup(sync_from_); > - /* Caller should have verified that the 'exclude_tables' is > - * parseable. An error here is unexpected. */ > - ovs_assert(!set_excluded_tables(exclude_tables, false)); > + struct replication_db *rdb = find_db(db->name); > > - replication_dbs_destroy(); > + if (uuid_is_zero(&server_uuid)) { > + /* Keep a copy of local server uuid. 
*/ > + server_uuid = *server; > + } else { > + ovs_assert(uuid_equals(&server_uuid, server)); > + } > + > + ovs_assert(sync_from); > + > + if (rdb > + && nullable_string_is_equal(rdb->excluded_tables_str, exclude_tables) > + && nullable_string_is_equal(rdb->sync_from, sync_from)) { > + jsonrpc_session_set_probe_interval(rdb->session, probe_interval); > + return; > + } > > - shash_clear(&local_dbs); > - if (session) { > - jsonrpc_session_close(session); > + if (!rdb) { > + rdb = xzalloc(sizeof *rdb); > + rdb->db = db; > + sset_init(&rdb->excluded_tables); > + rdb->schema_version_higher = false; > + shash_add(&replication_dbs, db->name, rdb); > + } else { > + replication_db_destroy(rdb); > } > > - session = jsonrpc_session_open(sync_from, true); > - session_seqno = UINT_MAX; > + rdb->sync_from = xstrdup(sync_from); > + rdb->excluded_tables_str = nullable_xstrdup(exclude_tables); > + /* Caller should have verified that the 'exclude_tables' is > + * parseable. An error here is unexpected. */ > + ovs_assert(!set_excluded_tables(rdb, exclude_tables)); > > - jsonrpc_session_set_probe_interval(session, probe_interval); > + rdb->session = jsonrpc_session_open(rdb->sync_from, true); > + rdb->session_seqno = UINT_MAX; > > - /* Keep a copy of local server uuid. */ > - server_uuid = *server; > + jsonrpc_session_set_probe_interval(rdb->session, probe_interval); > > - state = RPL_S_INIT; > + rdb->state = RPL_S_INIT; > } > > void > -replication_add_local_db(const char *database, struct ovsdb *db) > +replication_remove_db(const struct ovsdb *db) > { > - shash_add_assert(&local_dbs, database, db); > + struct replication_db *rdb; > + > + rdb = shash_find_and_delete(&replication_dbs, db->name); > + if (rdb) { > + replication_db_destroy(rdb); > + free(rdb); > + } > } > > -static void > -send_schema_requests(const struct json *result) > +static bool > +json_array_contains_string(const struct json *js, const char *str) Doesn't this fit better in json.[ch]? 
> { > - for (size_t i = 0; i < result->array.n; i++) { > - const struct json *name = result->array.elems[i]; > - if (name->type == JSON_STRING) { > - /* Send one schema request for each remote DB. */ > - const char *db_name = json_string(name); > - struct replication_db *rdb = find_db(db_name); > - if (rdb) { > - struct jsonrpc_msg *request = > - jsonrpc_create_request( > - "get_schema", > - json_array_create_1( > - json_string_create(db_name)), > - NULL); > - > - request_ids_add(request->id, rdb->db); > - jsonrpc_session_send(session, request); > - } > + bool found = false; > + I guess this comment is more relevant if we move the function to json.[ch], but should we guard against incorrect use? E.g.: ovs_assert(js->type == JSON_ARRAY); Or, without aborting, just return false early if 'js' is not an array? > + for (size_t i = 0; i < js->array.n; i++) { > + const struct json *elem = js->array.elems[i]; > + > + if (elem->type == JSON_STRING && !strcmp(json_string(elem), str)) { > + found = true; > + break; > } > } > + return found; > } > > -void > -replication_run(void) > +static void > +send_schema_request(struct replication_db *rdb) > +{ > + struct jsonrpc_msg *request = > + jsonrpc_create_request( > + "get_schema", > + json_array_create_1(json_string_create(rdb->db->name)), > + NULL); > + > + request_id_set(rdb, request->id); > + jsonrpc_session_send(rdb->session, request); > +} > + > +static void > +replication_run_db(struct replication_db *rdb) > { > - if (!session) { > + if (!rdb->session) { > return; > } > > - jsonrpc_session_run(session); > + jsonrpc_session_run(rdb->session); > > - for (int i = 0; jsonrpc_session_is_connected(session) && i < 50; i++) { > + for (int i = 0; i < 50; i++) { > struct jsonrpc_msg *msg; > unsigned int seqno; > > - seqno = jsonrpc_session_get_seqno(session); > - if (seqno != session_seqno || state == RPL_S_INIT) { > - session_seqno = seqno; > - request_ids_clear(); > + if (!jsonrpc_session_is_connected(rdb->session)) { > + break; > + } > +
seqno = jsonrpc_session_get_seqno(rdb->session); > + if (seqno != rdb->session_seqno || rdb->state == RPL_S_INIT) { > + rdb->session_seqno = seqno; > + request_id_clear(rdb); > + > struct jsonrpc_msg *request; > request = jsonrpc_create_request("get_server_id", > json_array_create_empty(), > NULL); > - request_ids_add(request->id, NULL); > - jsonrpc_session_send(session, request); > + request_id_set(rdb, request->id); > + jsonrpc_session_send(rdb->session, request); > > - state = RPL_S_SERVER_ID_REQUESTED; > - VLOG_DBG("send server ID request."); > + rdb->state = RPL_S_SERVER_ID_REQUESTED; > + VLOG_DBG("%s: send server ID request.", rdb->db->name); > } > > - msg = jsonrpc_session_recv(session); > + msg = jsonrpc_session_recv(rdb->session); > if (!msg) { > continue; > } > > - if (msg->type == JSONRPC_NOTIFY && state != RPL_S_ERR > + if (msg->type == JSONRPC_NOTIFY && rdb->state != RPL_S_ERR > && !strcmp(msg->method, "update")) { > if (msg->params->type == JSON_ARRAY > && msg->params->array.n == 2 > && msg->params->array.elems[0]->type == JSON_STRING) { > char *db_name = msg->params->array.elems[0]->string; > - struct replication_db *rdb = find_db(db_name); > - if (rdb) { > + > + if (!strcmp(db_name, rdb->db->name)) { > struct ovsdb_error *error; > error = process_notification(msg->params->array.elems[1], > rdb->db); > if (error) { > ovsdb_error_assert(error); > - state = RPL_S_ERR; > + rdb->state = RPL_S_ERR; > } > + } else { > + VLOG_WARN("%s: received update for unexpected database > %s", > + rdb->db->name, db_name); > + rdb->state = RPL_S_ERR; > } > } > } else if (msg->type == JSONRPC_REPLY) { > - struct replication_db *rdb; > - struct ovsdb *db; > - if (!request_ids_lookup_and_free(msg->id, &db)) { > - VLOG_WARN("received unexpected reply"); > + if (!request_id_compare_and_free(rdb, msg->id)) { > + VLOG_WARN("%s: received unexpected reply.", rdb->db->name); > goto next; > } > > - switch (state) { > + switch (rdb->state) { > case RPL_S_SERVER_ID_REQUESTED: { > 
struct uuid uuid; > if (msg->result->type != JSON_STRING || > !uuid_from_string(&uuid, json_string(msg->result))) { > struct ovsdb_error *error; > error = ovsdb_error("get_server_id failed", > - "Server ID is not valid UUID"); > + "%s: Server ID is not valid UUID", > + rdb->db->name); > > ovsdb_error_assert(error); > - state = RPL_S_ERR; > + rdb->state = RPL_S_ERR; > break; > } > > if (uuid_equals(&uuid, &server_uuid)) { > struct ovsdb_error *error; > error = ovsdb_error("Server ID check failed", > - "Self replicating is not allowed"); > + "%s: Self replicating is not > allowed", > + rdb->db->name); > > ovsdb_error_assert(error); > - state = RPL_S_ERR; > + rdb->state = RPL_S_ERR; > break; > } > > @@ -254,25 +287,32 @@ replication_run(void) > request = jsonrpc_create_request("list_dbs", > json_array_create_empty(), > NULL); > - request_ids_add(request->id, NULL); > - jsonrpc_session_send(session, request); > + request_id_set(rdb, request->id); > + jsonrpc_session_send(rdb->session, request); > > - replication_dbs_destroy(); > - replication_dbs = replication_dbs_create(); > - state = RPL_S_DB_REQUESTED; > + rdb->state = RPL_S_DB_REQUESTED; > break; > } > case RPL_S_DB_REQUESTED: > if (msg->result->type != JSON_ARRAY) { > struct ovsdb_error *error; > error = ovsdb_error("list_dbs failed", > - "list_dbs response is not array"); > + "%s: list_dbs response is not array", > + rdb->db->name); > + ovsdb_error_assert(error); > + rdb->state = RPL_S_ERR; > + } else if (!json_array_contains_string(msg->result, > + rdb->db->name)) { > + struct ovsdb_error *error; > + error = ovsdb_error("list_dbs failed", > + "%s: database name is not in the > list", > + rdb->db->name); > ovsdb_error_assert(error); > - state = RPL_S_ERR; > + rdb->state = RPL_S_ERR; > } else { > - send_schema_requests(msg->result); > - VLOG_DBG("Send schema requests"); > - state = RPL_S_SCHEMA_REQUESTED; > + send_schema_request(rdb); > + VLOG_DBG("%s: send schema request.", rdb->db->name); > + rdb->state = 
RPL_S_SCHEMA_REQUESTED; > } > break; > > @@ -283,19 +323,22 @@ replication_run(void) > error = ovsdb_schema_from_json(msg->result, &schema); > if (error) { > ovsdb_error_assert(error); > - state = RPL_S_ERR; > + rdb->state = RPL_S_ERR; > + break; > } > > - rdb = find_db(schema->name); > - if (!rdb) { > + if (strcmp(rdb->db->name, schema->name)) { > /* Unexpected schema. */ > - VLOG_WARN("unexpected schema %s", schema->name); > - state = RPL_S_ERR; > + VLOG_WARN("%s: unexpected schema %s.", > + rdb->db->name, schema->name); > + rdb->state = RPL_S_ERR; > + ovsdb_schema_destroy(schema); > + break; > } else if (!ovsdb_schema_equal(schema, rdb->db->schema)) { > /* Schmea version mismatch. */ > - VLOG_INFO("Schema version mismatch, checking if %s can " > - "still be replicated or not.", > - schema->name); > + VLOG_INFO("%s: Schema version mismatch, checking if %s > can" > + " still be replicated or not.", > + rdb->db->name, schema->name); > if (is_replication_possible(rdb->db->schema, schema)) { > VLOG_INFO("%s can be replicated.", schema->name); > rdb->schema_version_higher = true; > @@ -305,68 +348,48 @@ replication_run(void) > rdb->active_db_schema = schema; > } else { > VLOG_INFO("%s cannot be replicated.", schema->name); > - struct replication_db *r = > - shash_find_and_delete(replication_dbs, > - schema->name); > - if (r->active_db_schema) { > - ovsdb_schema_destroy(r->active_db_schema); > - } > - free(r); > + rdb->state = RPL_S_ERR; > ovsdb_schema_destroy(schema); > + break; > } > } else { > ovsdb_schema_destroy(schema); > } > > - /* After receiving schemas, reset the local databases that > - * will be monitored and send out monitor requests for them. 
> */ > - if (hmap_is_empty(&request_ids)) { > - struct shash_node *node; > - > - if (shash_is_empty(replication_dbs)) { > - VLOG_WARN("Nothing to replicate."); > - state = RPL_S_ERR; > - } else { > - SHASH_FOR_EACH (node, replication_dbs) { > - rdb = node->data; > - struct jsonrpc_msg *request = > - create_monitor_request( > - rdb->schema_version_higher ? > - rdb->active_db_schema : rdb->db->schema); > - > - request_ids_add(request->id, rdb->db); > - jsonrpc_session_send(session, request); > - VLOG_DBG("Send monitor requests"); > - state = RPL_S_MONITOR_REQUESTED; > - } > - } > - } > + /* Send out a monitor request. */ > + struct jsonrpc_msg *request = > + create_monitor_request(rdb, rdb->schema_version_higher > + ? rdb->active_db_schema > + : rdb->db->schema); > + > + request_id_set(rdb, request->id); > + jsonrpc_session_send(rdb->session, request); > + VLOG_DBG("%s: send monitor request.", rdb->db->name); > + rdb->state = RPL_S_MONITOR_REQUESTED; > break; > } > > case RPL_S_MONITOR_REQUESTED: { > /* Reply to monitor requests. */ > struct ovsdb_error *error; > - VLOG_INFO("Monitor request received. Resetting the > database"); > + VLOG_INFO("%s: Monitor reply received. " > + "Resetting the database.", rdb->db->name); > /* Resetting the database here has few risks. If the > * process_notification() fails, the database is completely > * lost locally. In case that node becomes active, then > * there is a chance of complete data loss in the > active/standy > * cluster. */ > - error = reset_database(db); > + error = reset_database(rdb); > if (!error) { > - error = process_notification(msg->result, db); > + error = process_notification(msg->result, rdb->db); > } > if (error) { > ovsdb_error_assert(error); > - state = RPL_S_ERR; > + rdb->state = RPL_S_ERR; > } else { > - /* Transition to replicating state after receiving > - * all replies of "monitor" requests. 
*/ > - if (hmap_is_empty(&request_ids)) { > - VLOG_DBG("Listening to monitor updates"); > - state = RPL_S_REPLICATING; > - } > + VLOG_DBG("%s: Listening to monitor updates.", > + rdb->db->name); > + rdb->state = RPL_S_REPLICATING; > } > break; > } > @@ -378,7 +401,7 @@ replication_run(void) > case RPL_S_INIT: > case RPL_S_REPLICATING: > default: > - OVS_NOT_REACHED(); > + VLOG_WARN("%s: received unexpected reply.", rdb->db->name); Why do we just warn here instead of aborting? I assume reaching this branch is just as unlikely as it was before the patch (when it was OVS_NOT_REACHED()), right? > } > } > next: > @@ -386,24 +409,40 @@ replication_run(void) > } > } > > +void > +replication_run(void) > +{ > + struct shash_node *node; > + > + SHASH_FOR_EACH (node, &replication_dbs) { > + replication_run_db(node->data); > + } > +} > + > void > replication_wait(void) > { > - if (session) { > - jsonrpc_session_wait(session); > - jsonrpc_session_recv_wait(session); > + struct shash_node *node; > + > + SHASH_FOR_EACH (node, &replication_dbs) { > + struct replication_db *rdb = node->data; > + > + if (rdb->session) { > + jsonrpc_session_wait(rdb->session); > + jsonrpc_session_recv_wait(rdb->session); > + } > } > } > > -/* Parse 'excluded' to rebuild 'excluded_tables'. If 'dryrun' is false, the > - * current set of excluded tables will be wiped out, regardless of whether > - * 'excluded' can be parsed. If 'dryrun' is true, only parses 'excluded' and > +/* Parse 'excluded' to rebuild 'rdb->excluded_tables'. If 'rdb' is not NULL, > + * the current set of excluded tables will be wiped out, regardless of > whether > + * 'excluded' can be parsed. If 'rdb' is NULL, only parses 'excluded' and > * reports any errors, without modifying the list of exclusions. > * > - * On error, returns the error string, which the caller is > - * responsible for freeing. Returns NULL otherwise.
*/ > -char * OVS_WARN_UNUSED_RESULT > -set_excluded_tables(const char *excluded, bool dryrun) > + * On error, returns the error string, which the caller is responsible for > + * freeing. Returns NULL otherwise. */ > +static char * OVS_WARN_UNUSED_RESULT > +set_excluded_tables__(struct replication_db *rdb, const char *excluded) > { > struct sset set = SSET_INITIALIZER(&set); > char *err = NULL; > @@ -411,17 +450,22 @@ set_excluded_tables(const char *excluded, bool dryrun) > if (excluded) { > const char *longname; > > - if (!dryrun) { > - /* Can only add to an empty shash. */ > - excluded_tables_clear(); > + if (rdb) { > + /* Can only add to an empty set. */ > + sset_clear(&rdb->excluded_tables); > } > > sset_from_delimited_string(&set, excluded, " ,"); > SSET_FOR_EACH (longname, &set) { > + if (rdb && !strchr(longname, ':')) { > + sset_add(&rdb->excluded_tables, longname); > + continue; > + } > + > char *database = xstrdup(longname), *table = NULL; > strtok_r(database, ":", &table); > - if (table && !dryrun) { > - excluded_tables_add(database, table); > + if (table && rdb && !strcmp(rdb->db->name, database)) { > + sset_add(&rdb->excluded_tables, table); > } > > free(database); > @@ -434,120 +478,74 @@ set_excluded_tables(const char *excluded, bool dryrun) > > done: > sset_destroy(&set); > - if (err && !dryrun) { > + if (err && rdb) { > /* On error, destroy the partially built 'excluded_tables'. 
*/ > - excluded_tables_clear(); > + sset_clear(&rdb->excluded_tables); > } > return err; > } > > char * OVS_WARN_UNUSED_RESULT > -get_excluded_tables(void) > +parse_excluded_tables(const char *excluded) > { > - struct shash_node *node; > - struct sset set = SSET_INITIALIZER(&set); > - > - SHASH_FOR_EACH (node, &excluded_tables) { > - const char *database = node->name; > - const char *table; > - struct sset *tables = node->data; > - > - SSET_FOR_EACH (table, tables) { > - sset_add_and_free(&set, xasprintf("%s:%s", database, table)); > - } > - } > - > - /* Output the table list in an sorted order, so that > - * the output string will not depend on the hash function > - * that used to implement the hmap data structure. This is > - * only useful for writting unit tests. */ > - const char **sorted = sset_sort(&set); > - struct ds ds = DS_EMPTY_INITIALIZER; > - size_t i; > - for (i = 0; i < sset_count(&set); i++) { > - ds_put_format(&ds, "%s,", sorted[i]); > - } > - > - ds_chomp(&ds, ','); > - > - free(sorted); > - sset_destroy(&set); > - > - return ds_steal_cstr(&ds); > + return set_excluded_tables__(NULL, excluded); > } > > -static void > -excluded_tables_clear(void) > +static char * OVS_WARN_UNUSED_RESULT > +set_excluded_tables(struct replication_db *rdb, const char *excluded) > { > - struct shash_node *node; > - SHASH_FOR_EACH (node, &excluded_tables) { > - struct sset *tables = node->data; > - sset_destroy(tables); > - } > - > - shash_clear_free_data(&excluded_tables); > + return set_excluded_tables__(rdb, excluded); > } > > -static void > -excluded_tables_add(const char *database, const char *table) > +char * OVS_WARN_UNUSED_RESULT > +get_excluded_tables(const struct ovsdb *db) > { > - struct sset *tables = shash_find_data(&excluded_tables, database); > + const struct replication_db *rdb = find_db(db->name); > > - if (!tables) { > - tables = xmalloc(sizeof *tables); > - sset_init(tables); > - shash_add(&excluded_tables, database, tables); > + if (!rdb) { > + return 
xstrdup(""); > } > > - sset_add(tables, table); > -} > + struct sset set = SSET_INITIALIZER(&set); > + const char *table; > + char *result; > > -static bool > -excluded_tables_find(const char *database, const char *table) > -{ > - struct sset *tables = shash_find_data(&excluded_tables, database); > - return tables && sset_contains(tables, table); > -} > + SSET_FOR_EACH (table, &rdb->excluded_tables) { > + sset_add_and_free(&set, xasprintf("%s:%s", rdb->db->name, table)); > + } > > -void > -disconnect_active_server(void) > -{ > - jsonrpc_session_close(session); > - session = NULL; > + result = sset_join(&set, ",", ""); > + sset_destroy(&set); > + > + return result; > } > > void > replication_destroy(void) > { > - excluded_tables_clear(); > - shash_destroy(&excluded_tables); > + struct shash_node *node; > > - if (sync_from) { > - free(sync_from); > - sync_from = NULL; > + SHASH_FOR_EACH (node, &replication_dbs) { > + replication_db_destroy(node->data); > } > - > - request_ids_destroy(); > - replication_dbs_destroy(); > - > - shash_destroy(&local_dbs); > + shash_destroy_free_data(&replication_dbs); > } > > static struct replication_db * > find_db(const char *db_name) > { > - return shash_find_data(replication_dbs, db_name); > + return shash_find_data(&replication_dbs, db_name); > } > > static struct ovsdb_error * > -reset_database(struct ovsdb *db) > +reset_database(struct replication_db *rdb) > { > - struct ovsdb_txn *txn = ovsdb_txn_create(db); > + struct ovsdb_txn *txn = ovsdb_txn_create(rdb->db); > struct shash_node *table_node; > > - SHASH_FOR_EACH (table_node, &db->tables) { > + SHASH_FOR_EACH (table_node, &rdb->db->tables) { > /* Delete all rows if the table is not excluded. 
*/ > - if (!excluded_tables_find(db->schema->name, table_node->name)) { > + if (!sset_contains(&rdb->excluded_tables, table_node->name)) { > struct ovsdb_table *table = table_node->data; > struct ovsdb_row *row; > HMAP_FOR_EACH_SAFE (row, hmap_node, &table->rows) { > @@ -565,7 +563,7 @@ reset_database(struct ovsdb *db) > * Caller is responsible for disposing 'request'. > */ > static struct jsonrpc_msg * > -create_monitor_request(struct ovsdb_schema *schema) > +create_monitor_request(struct replication_db *rdb, struct ovsdb_schema > *schema) > { > struct jsonrpc_msg *request; > struct json *monitor; > @@ -579,7 +577,7 @@ create_monitor_request(struct ovsdb_schema *schema) > struct ovsdb_table_schema *table = nodes[j]->data; > > /* Monitor all tables not excluded. */ > - if (!excluded_tables_find(db_name, table->name)) { > + if (!sset_contains(&rdb->excluded_tables, table->name)) { > add_monitored_table(table, monitor_request); > } > } > @@ -689,114 +687,76 @@ process_table_update(struct json *table_update, const > char *table_name, > return NULL; > } > > -void > -request_ids_add(const struct json *id, struct ovsdb *db) > +static void > +request_id_set(struct replication_db *rdb, const struct json *id) > { > - struct request_ids_hmap_node *node = xmalloc(sizeof *node); > + ovs_assert(!rdb->request_id); > + rdb->request_id = json_clone(id); > +} > > - node->request_id = json_clone(id); > - node->db = db; > - hmap_insert(&request_ids, &node->hmap, json_hash(id, 0)); > +static void > +request_id_clear(struct replication_db *rdb) > +{ > + json_destroy(rdb->request_id); > + rdb->request_id = NULL; > } > > -/* Look up 'id' from 'request_ids', if found, remove the found id from > - * 'request_ids' and free its memory. If not found, 'request_ids' does > - * not change. Sets '*db' to the database for the request (NULL if not > - * found). > +/* Compare 'id' with sent 'request_id'. If it mtches, clear the current Typo: mtches > + * 'request_id'. 
If it doesn't match, 'request_id' does not change. > * > - * Return true if 'id' is found, false otherwise. > + * Return true if 'id' matches, false otherwise. > */ > -bool > -request_ids_lookup_and_free(const struct json *id, struct ovsdb **db) > +static bool > +request_id_compare_and_free(struct replication_db *rdb, const struct json > *id) > { > - struct request_ids_hmap_node *node; > - > - HMAP_FOR_EACH_WITH_HASH (node, hmap, json_hash(id, 0), &request_ids) { > - if (json_equal(id, node->request_id)) { > - hmap_remove(&request_ids, &node->hmap); > - *db = node->db; > - json_destroy(node->request_id); > - free(node); > - return true; > - } > + if (rdb->request_id && json_equal(id, rdb->request_id)) { > + request_id_clear(rdb); > + return true; > } > - > - *db = NULL; > return false; > } > > static void > -request_ids_destroy(void) > +replication_db_destroy(struct replication_db *rdb) > { > - struct request_ids_hmap_node *node; > - > - HMAP_FOR_EACH_POP (node, hmap, &request_ids) { > - json_destroy(node->request_id); > - free(node); > + if (!rdb) { > + return; > } > - hmap_destroy(&request_ids); > -} > > -void > -request_ids_clear(void) > -{ > - request_ids_destroy(); > - hmap_init(&request_ids); > -} > + free(rdb->sync_from); > + rdb->sync_from = NULL; > > -static struct shash * > -replication_dbs_create(void) > -{ > - struct shash *new = xmalloc(sizeof *new); > - shash_init(new); > + free(rdb->excluded_tables_str); > + rdb->excluded_tables_str = NULL; > + sset_destroy(&rdb->excluded_tables); > > - struct shash_node *node; > - SHASH_FOR_EACH (node, &local_dbs) { > - struct replication_db *repl_db = xmalloc(sizeof *repl_db); > - repl_db->db = node->data; > - repl_db->schema_version_higher = false; > - repl_db->active_db_schema = NULL; > - shash_add(new, node->name, repl_db); > - } > + request_id_clear(rdb); > > - return new; > -} > - > -static void > -replication_dbs_destroy(void) > -{ > - if (!replication_dbs) { > - return; > + if (rdb->session) { > + 
jsonrpc_session_close(rdb->session); > + rdb->session = NULL; > } > > - struct shash_node *node; > - > - SHASH_FOR_EACH_SAFE (node, replication_dbs) { > - hmap_remove(&replication_dbs->map, &node->node); > - struct replication_db *rdb = node->data; > - if (rdb->active_db_schema) { > - ovsdb_schema_destroy(rdb->active_db_schema); > - } > - free(rdb); > - free(node->name); > - free(node); > + if (rdb->active_db_schema) { > + ovsdb_schema_destroy(rdb->active_db_schema); > + rdb->active_db_schema = NULL; > } > > - hmap_destroy(&replication_dbs->map); > - free(replication_dbs); > - replication_dbs = NULL; > + rdb->schema_version_higher = false; > } > > /* Return true if replication just started or is ongoing. > * Return false if the connection failed, or the replication > * was not able to start. */ > bool > -replication_is_alive(void) > +replication_is_alive(const struct ovsdb *db) > { > - if (session) { > - return jsonrpc_session_is_alive(session) && state != RPL_S_ERR; > + const struct replication_db *rdb = find_db(db->name); > + > + if (!rdb || !rdb->session) { > + return false; > } > - return false; > + return jsonrpc_session_is_alive(rdb->session) && rdb->state != RPL_S_ERR; > } > > /* Return the last error reported on a connection by 'session'. The > @@ -806,60 +766,60 @@ replication_is_alive(void) > * Return a negative value if replication session has error, or the > * replication was not able to start. */ > int > -replication_get_last_error(void) > +replication_get_last_error(const struct ovsdb *db) > { > + const struct replication_db *rdb = find_db(db->name); > int err = 0; > > - if (session) { > - err = jsonrpc_session_get_last_error(session); > + if (rdb && rdb->session) { > + err = jsonrpc_session_get_last_error(rdb->session); > if (!err) { > - err = (state == RPL_S_ERR) ? ENOENT : 0; > + err = (rdb->state == RPL_S_ERR) ? 
ENOENT : 0; > } > } > > return err; > } > > -char * > -replication_status(void) > +char * OVS_WARN_UNUSED_RESULT > +replication_status(const struct ovsdb *db) > { > - bool alive = session && jsonrpc_session_is_alive(session); > + const struct replication_db *rdb = find_db(db->name); > + > + if (!rdb) { > + return xasprintf("%s is not configured for replication", db->name); > + } > + > + bool alive = rdb->session && jsonrpc_session_is_alive(rdb->session); > struct ds ds = DS_EMPTY_INITIALIZER; > > + ds_put_format(&ds, "database: %s\n", db->name); > if (alive) { > - switch(state) { > + switch (rdb->state) { > case RPL_S_INIT: > case RPL_S_SERVER_ID_REQUESTED: > case RPL_S_DB_REQUESTED: > case RPL_S_SCHEMA_REQUESTED: > case RPL_S_MONITOR_REQUESTED: > - ds_put_format(&ds, "connecting: %s", sync_from); > + ds_put_format(&ds, "connecting: %s", rdb->sync_from); > break; > case RPL_S_REPLICATING: { > - struct shash_node *node; > - > - ds_put_format(&ds, "replicating: %s\n", sync_from); > - ds_put_cstr(&ds, "database:"); > - SHASH_FOR_EACH (node, replication_dbs) { > - ds_put_format(&ds, " %s,", node->name); > - } > - ds_chomp(&ds, ','); > + ds_put_format(&ds, "replicating: %s\n", rdb->sync_from); > > - if (!shash_is_empty(&excluded_tables)) { > - ds_put_char(&ds, '\n'); > + if (!sset_is_empty(&rdb->excluded_tables)) { > ds_put_cstr(&ds, "exclude: "); > - ds_put_and_free_cstr(&ds, get_excluded_tables()); > + ds_put_and_free_cstr(&ds, get_excluded_tables(db)); > } > break; > } > case RPL_S_ERR: > - ds_put_format(&ds, "Replication to (%s) failed\n", sync_from); > + ds_put_format(&ds, "Replication to (%s) failed", rdb->sync_from); > break; > default: > OVS_NOT_REACHED(); > } > } else { > - ds_put_format(&ds, "not connected to %s", sync_from); > + ds_put_format(&ds, "not connected to %s", rdb->sync_from); > } > return ds_steal_cstr(&ds); > } > @@ -913,10 +873,12 @@ is_replication_possible(struct ovsdb_schema > *local_db_schema, > } > > void > -replication_set_probe_interval(int 
probe_interval) > +replication_set_probe_interval(const struct ovsdb *db, int probe_interval) > { > - if (session) { > - jsonrpc_session_set_probe_interval(session, probe_interval); > + const struct replication_db *rdb = find_db(db->name); > + > + if (rdb && rdb->session) { > + jsonrpc_session_set_probe_interval(rdb->session, probe_interval); > } > } > > diff --git a/ovsdb/replication.h b/ovsdb/replication.h > index 6d1be820f..f5e226753 100644 > --- a/ovsdb/replication.h > +++ b/ovsdb/replication.h > @@ -26,41 +26,41 @@ struct ovsdb; > * API Usage > *=========== > * > - * - replication_init() needs to be called whenever OVSDB server switches > into > + * - replication_set_db() needs to be called whenever database switches into > * the backup mode. > * > - * - replication_add_local_db() should be called immediately after to add all > - * known database that OVSDB server owns, one at a time. > + * - replication_remove_db() needs to be called whenever backup database > + * switches into an active mode. > * > * - replication_destroy() should be called when OVSDB server shutdown to > * reclaim resources. > * > * - replication_run(), replication_wait(), replication_is_alive() and > * replication_get_last_error() should be call within the main loop > - * whenever OVSDB server runs in the backup mode. > + * whenever OVSDB has backup databases. > * > - * - set_excluded_tables(), get_excluded_tables(), disconnect_active_server() > - * and replication_usage() are support functions used mainly by unixctl > - * commands. > + * - parse_excluded_tables(), get_excluded_tables() and replication_usage() > + * are support functions used mainly by unixctl commands. 
> */ > > #define REPLICATION_DEFAULT_PROBE_INTERVAL 60000 > > -void replication_init(const char *sync_from, const char *exclude_tables, > - const struct uuid *server, int probe_interval); > +void replication_set_db(struct ovsdb *, const char *sync_from, > + const char *exclude_tables, const struct uuid > *server, > + int probe_interval); > +void replication_remove_db(const struct ovsdb *); > + > void replication_run(void); > void replication_wait(void); > void replication_destroy(void); > void replication_usage(void); > -void replication_add_local_db(const char *databse, struct ovsdb *db); > -bool replication_is_alive(void); > -int replication_get_last_error(void); > -char *replication_status(void); > -void replication_set_probe_interval(int); > +bool replication_is_alive(const struct ovsdb *); > +int replication_get_last_error(const struct ovsdb *); > +char *replication_status(const struct ovsdb *); > +void replication_set_probe_interval(const struct ovsdb *, int > probe_interval); > > -char *set_excluded_tables(const char *excluded, bool dryrun) > - OVS_WARN_UNUSED_RESULT; > -char *get_excluded_tables(void) OVS_WARN_UNUSED_RESULT; > -void disconnect_active_server(void); > +char *parse_excluded_tables(const char *excluded) OVS_WARN_UNUSED_RESULT; > +char *get_excluded_tables(const struct ovsdb *) OVS_WARN_UNUSED_RESULT; > +void disconnect_active_server(const struct ovsdb *); This disconnect_active_server() declaration should be removed too: the API usage comment above no longer lists it, and this patch drops its only call in ovsdb-server.c's main_loop(). Regards, Dumitru _______________________________________________ dev mailing list d...@openvswitch.org https://mail.openvswitch.org/mailman/listinfo/ovs-dev