This commit transforms the 'changed' field in struct engine_node into a 'state' field. Possible node states are:
- "Stale": data in the node is not up to date with the DB.
- "Updated": data in the node is valid but was updated during the last run of the engine.
- "Valid": data in the node is valid and didn't change during the last run of the engine.
- "Aborted": during the last run, processing was aborted for this node.
This commit also further refactors the I-P engine: - instead of recursively performing all the engine processing a preprocessing stage is added (engine_get_nodes()) before the main processing loop is executed in order to topologically sort nodes in the engine such that all inputs of a given node appear in the sorted array before the node itself. This simplifies a bit the code in engine_run(). - remove the need for using an engine_run_id by using the newly added states. Signed-off-by: Dumitru Ceara <dce...@redhat.com> --- controller/ovn-controller.c | 88 ++++++++++------- lib/inc-proc-eng.c | 219 ++++++++++++++++++++++++++++++++----------- lib/inc-proc-eng.h | 75 +++++++++++---- 3 files changed, 271 insertions(+), 111 deletions(-) diff --git a/controller/ovn-controller.c b/controller/ovn-controller.c index c56190f..033eff4 100644 --- a/controller/ovn-controller.c +++ b/controller/ovn-controller.c @@ -758,10 +758,10 @@ en_ofctrl_is_connected_run(struct engine_node *node) (struct ed_type_ofctrl_is_connected *)node->data; if (data->connected != ofctrl_is_connected()) { data->connected = !data->connected; - node->changed = true; + engine_set_node_state(node, EN_UPDATED); return; } - node->changed = false; + engine_set_node_state(node, EN_VALID); } struct ed_type_addr_sets { @@ -811,7 +811,7 @@ en_addr_sets_run(struct engine_node *node) addr_sets_init(as_table, &as->addr_sets); as->change_tracked = false; - node->changed = true; + engine_set_node_state(node, EN_UPDATED); } static bool @@ -830,11 +830,14 @@ addr_sets_sb_address_set_handler(struct engine_node *node) addr_sets_update(as_table, &as->addr_sets, &as->new, &as->deleted, &as->updated); - node->changed = !sset_is_empty(&as->new) || !sset_is_empty(&as->deleted) - || !sset_is_empty(&as->updated); + if (!sset_is_empty(&as->new) || !sset_is_empty(&as->deleted) || + !sset_is_empty(&as->updated)) { + engine_set_node_state(node, EN_UPDATED); + } else { + engine_set_node_state(node, EN_VALID); + } as->change_tracked = 
true; - node->changed = true; return true; } @@ -885,7 +888,7 @@ en_port_groups_run(struct engine_node *node) port_groups_init(pg_table, &pg->port_groups); pg->change_tracked = false; - node->changed = true; + engine_set_node_state(node, EN_UPDATED); } static bool @@ -904,11 +907,14 @@ port_groups_sb_port_group_handler(struct engine_node *node) port_groups_update(pg_table, &pg->port_groups, &pg->new, &pg->deleted, &pg->updated); - node->changed = !sset_is_empty(&pg->new) || !sset_is_empty(&pg->deleted) - || !sset_is_empty(&pg->updated); + if (!sset_is_empty(&pg->new) || !sset_is_empty(&pg->deleted) || + !sset_is_empty(&pg->updated)) { + engine_set_node_state(node, EN_UPDATED); + } else { + engine_set_node_state(node, EN_VALID); + } pg->change_tracked = true; - node->changed = true; return true; } @@ -1091,7 +1097,7 @@ en_runtime_data_run(struct engine_node *node) update_ct_zones(local_lports, local_datapaths, ct_zones, ct_zone_bitmap, pending_ct_zones); - node->changed = true; + engine_set_node_state(node, EN_UPDATED); } static bool @@ -1157,10 +1163,10 @@ en_mff_ovn_geneve_run(struct engine_node *node) enum mf_field_id mff_ovn_geneve = ofctrl_get_mf_field_id(); if (data->mff_ovn_geneve != mff_ovn_geneve) { data->mff_ovn_geneve = mff_ovn_geneve; - node->changed = true; + engine_set_node_state(node, EN_UPDATED); return; } - node->changed = false; + engine_set_node_state(node, EN_VALID); } struct ed_type_flow_output { @@ -1322,7 +1328,7 @@ en_flow_output_run(struct engine_node *node) active_tunnels, flow_table); - node->changed = true; + engine_set_node_state(node, EN_UPDATED); } static bool @@ -1404,7 +1410,7 @@ flow_output_sb_logical_flow_handler(struct engine_node *node) flow_table, group_table, meter_table, lfrr, conj_id_ofs); - node->changed = true; + engine_set_node_state(node, EN_UPDATED); return handled; } @@ -1427,7 +1433,7 @@ flow_output_sb_mac_binding_handler(struct engine_node *node) lflow_handle_changed_neighbors(sbrec_port_binding_by_name, 
mac_binding_table, flow_table); - node->changed = true; + engine_set_node_state(node, EN_UPDATED); return true; } @@ -1531,7 +1537,7 @@ flow_output_sb_port_binding_handler(struct engine_node *node) chassis, ct_zones, local_datapaths, active_tunnels, flow_table); - node->changed = true; + engine_set_node_state(node, EN_UPDATED); return true; } @@ -1580,7 +1586,7 @@ flow_output_sb_multicast_group_handler(struct engine_node *node) mff_ovn_geneve, chassis, ct_zones, local_datapaths, flow_table); - node->changed = true; + engine_set_node_state(node, EN_UPDATED); return true; } @@ -1694,7 +1700,9 @@ _flow_output_resource_ref_handler(struct engine_node *node, conj_id_ofs, &changed)) { return false; } - node->changed = changed || node->changed; + if (changed) { + engine_set_node_state(node, EN_UPDATED); + } } SSET_FOR_EACH (ref_name, updated) { if (!lflow_handle_changed_ref(ref_type, ref_name, @@ -1707,7 +1715,9 @@ _flow_output_resource_ref_handler(struct engine_node *node, conj_id_ofs, &changed)) { return false; } - node->changed = changed || node->changed; + if (changed) { + engine_set_node_state(node, EN_UPDATED); + } } SSET_FOR_EACH (ref_name, new) { if (!lflow_handle_changed_ref(ref_type, ref_name, @@ -1720,7 +1730,9 @@ _flow_output_resource_ref_handler(struct engine_node *node, conj_id_ofs, &changed)) { return false; } - node->changed = changed || node->changed; + if (changed) { + engine_set_node_state(node, EN_UPDATED); + } } return true; @@ -1922,7 +1934,11 @@ main(int argc, char *argv[]) engine_add_input(&en_runtime_data, &en_sb_port_binding, runtime_data_sb_port_binding_handler); - engine_init(&en_flow_output); + /* Get the sorted engine nodes to be used for every engine run. 
*/ + size_t en_count = 0; + struct engine_node **en_nodes = engine_get_nodes(&en_flow_output, + &en_count); + engine_init(en_nodes, en_count); ofctrl_init(&ed_flow_output.group_table, &ed_flow_output.meter_table, @@ -1941,9 +1957,6 @@ main(int argc, char *argv[]) unixctl_command_register("inject-pkt", "MICROFLOW", 1, 1, inject_pkt, &pending_pkt); - uint64_t engine_run_id = 0; - bool engine_run_done = true; - unsigned int ovs_cond_seqno = UINT_MAX; unsigned int ovnsb_cond_seqno = UINT_MAX; @@ -1951,7 +1964,7 @@ main(int argc, char *argv[]) exiting = false; restart = false; while (!exiting) { - engine_run_id++; + engine_init_run(en_nodes, en_count, &en_flow_output); update_sb_db(ovs_idl_loop.idl, ovnsb_idl_loop.idl); update_ssl_config(ovsrec_ssl_table_get(ovs_idl_loop.idl)); @@ -2044,15 +2057,13 @@ main(int argc, char *argv[]) * this round of engine_run and continue processing * acculated changes incrementally later when * ofctrl_can_put() returns true. */ - if (engine_run_done) { + if (!engine_aborted(&en_flow_output)) { engine_set_abort_recompute(true); - engine_run_done = engine_run(&en_flow_output, - engine_run_id); + engine_run(en_nodes, en_count); } } else { engine_set_abort_recompute(false); - engine_run_done = true; - engine_run(&en_flow_output, engine_run_id); + engine_run(en_nodes, en_count); } } stopwatch_stop(CONTROLLER_LOOP_STOPWATCH_NAME, @@ -2071,7 +2082,7 @@ main(int argc, char *argv[]) sbrec_meter_table_get(ovnsb_idl_loop.idl), get_nb_cfg(sbrec_sb_global_table_get( ovnsb_idl_loop.idl)), - en_flow_output.changed); + engine_node_changed(&en_flow_output)); pinctrl_run(ovnsb_idl_txn, sbrec_datapath_binding_by_key, sbrec_port_binding_by_datapath, @@ -2089,7 +2100,7 @@ main(int argc, char *argv[]) &ed_runtime_data.local_datapaths, &ed_runtime_data.active_tunnels); - if (en_runtime_data.changed) { + if (engine_node_changed(&en_runtime_data)) { update_sb_monitors(ovnsb_idl_loop.idl, chassis, &ed_runtime_data.local_lports, &ed_runtime_data.local_datapaths); 
@@ -2097,17 +2108,17 @@ main(int argc, char *argv[]) } } - if (engine_need_run(&en_flow_output, engine_run_id)) { + if (engine_need_run(en_nodes, en_count, &en_flow_output)) { VLOG_DBG("engine did not run, force recompute next time: " "br_int %p, chassis %p", br_int, chassis); engine_set_force_recompute(true); poll_immediate_wake(); - } else if (!engine_run_done) { + } else if (engine_aborted(&en_flow_output)) { VLOG_DBG("engine was aborted, force recompute next time: " "br_int %p, chassis %p", br_int, chassis); engine_set_force_recompute(true); poll_immediate_wake(); - } else if (!engine_has_run(&en_flow_output, engine_run_id)) { + } else if (!engine_has_run(&en_flow_output)) { VLOG_DBG("engine did not run, and it was not needed" " either: br_int %p, chassis %p", br_int, chassis); @@ -2135,8 +2146,7 @@ main(int argc, char *argv[]) } } else { VLOG_DBG("Pending_pkt conn but br_int %p or chassis " - "%p not ready. run-id: %"PRIu64, br_int, - chassis, engine_run_id); + "%p not ready.", br_int, chassis); unixctl_command_reply_error(pending_pkt.conn, "ovn-controller not ready."); } @@ -2185,7 +2195,7 @@ main(int argc, char *argv[]) } engine_set_context(NULL); - engine_cleanup(&en_flow_output); + engine_cleanup(en_nodes, en_count); /* It's time to exit. 
Clean up the databases if we are not restarting */ if (!restart) { diff --git a/lib/inc-proc-eng.c b/lib/inc-proc-eng.c index 8a085e2..ee6afbe 100644 --- a/lib/inc-proc-eng.c +++ b/lib/inc-proc-eng.c @@ -34,6 +34,13 @@ static bool engine_force_recompute = false; static bool engine_abort_recompute = false; static const struct engine_context *engine_context; +static const char *engine_node_state_name[EN_STATE_MAX] = { + [EN_STALE] = "Stale", + [EN_UPDATED] = "Updated", + [EN_VALID] = "Valid", + [EN_ABORTED] = "Aborted", +}; + void engine_set_force_recompute(bool val) { @@ -58,26 +65,62 @@ engine_set_context(const struct engine_context *ctx) engine_context = ctx; } -void -engine_init(struct engine_node *node) +/* Builds the topologically sorted 'sorted_nodes' array starting from + * 'node'. + */ +static struct engine_node ** +engine_topo_sort(struct engine_node *node, struct engine_node **sorted_nodes, + size_t *n_count, size_t *n_size) { + /* It's not so efficient to walk the array of already sorted nodes but + * we know that sorting is done only once at startup so it's ok for now. 
+ */ + for (size_t i = 0; i < *n_count; i++) { + if (sorted_nodes[i] == node) { + return sorted_nodes; + } + } + for (size_t i = 0; i < node->n_inputs; i++) { - engine_init(node->inputs[i].node); + sorted_nodes = engine_topo_sort(node->inputs[i].node, sorted_nodes, + n_count, n_size); } - if (node->init) { - node->init(node); + if (*n_count == *n_size) { + sorted_nodes = x2nrealloc(sorted_nodes, n_size, sizeof *sorted_nodes); } + sorted_nodes[(*n_count)] = node; + (*n_count)++; + return sorted_nodes; +} + +struct engine_node ** +engine_get_nodes(struct engine_node *root_node, size_t *n_count) +{ + size_t n_size = 0; + + *n_count = 0; + return engine_topo_sort(root_node, NULL, n_count, &n_size); } void -engine_cleanup(struct engine_node *node) +engine_init(struct engine_node **nodes, size_t n_count) { - for (size_t i = 0; i < node->n_inputs; i++) { - engine_cleanup(node->inputs[i].node); + for (size_t i = 0; i < n_count; i++) { + if (nodes[i]->init) { + nodes[i]->init(nodes[i]); + } } - if (node->cleanup) { - node->cleanup(node); +} + +void +engine_cleanup(struct engine_node **nodes, size_t n_count) +{ + for (size_t i = 0; i < n_count; i++) { + if (nodes[i]->cleanup) { + nodes[i]->cleanup(nodes[i]); + } } + free(nodes); } struct engine_node * @@ -128,16 +171,66 @@ engine_ovsdb_node_add_index(struct engine_node *node, const char *name, ed->n_indexes ++; } +void +engine_set_node_state_at(struct engine_node *node, + enum engine_node_state state, + const char *where) +{ + if (node->state == state) { + return; + } + + VLOG_DBG("%s: node: %s, old_state %s, new_state %s", + where, node->name, + engine_node_state_name[node->state], + engine_node_state_name[state]); + + node->state = state; +} + +static bool +engine_node_valid(struct engine_node *node) +{ + return (node->state == EN_UPDATED || node->state == EN_VALID); +} + bool -engine_has_run(struct engine_node *node, uint64_t run_id) +engine_node_changed(struct engine_node *node) { - return node->run_id == run_id; + 
return node->state == EN_UPDATED; +} + +bool +engine_has_run(struct engine_node *node) +{ + return node->state != EN_STALE; +} + +bool +engine_aborted(struct engine_node *node) +{ + return node->state == EN_ABORTED; +} + +void +engine_init_run(struct engine_node **nodes, size_t n_count, + struct engine_node *root_node) +{ + /* No need to reinitialize if last run didn't happen. */ + if (!engine_has_run(root_node)) { + return; + } + + VLOG_DBG("Initializing new run"); + for (size_t i = 0; i < n_count; i++) { + engine_set_node_state(nodes[i], EN_STALE); + } } /* Do a full recompute (or at least try). If we're not allowed then * mark the node as "aborted". */ -static bool +static void engine_recompute(struct engine_node *node, bool forced, bool allowed) { VLOG_DBG("node: %s, recompute (%s)", node->name, @@ -145,12 +238,12 @@ engine_recompute(struct engine_node *node, bool forced, bool allowed) if (!allowed) { VLOG_DBG("node: %s, recompute aborted", node->name); - return false; + engine_set_node_state(node, EN_ABORTED); + return; } + /* Run the node handler which might change state. */ node->run(node); - VLOG_DBG("node: %s, changed: %d", node->name, node->changed); - return true; } /* Return true if the node could be computed without triggerring a full @@ -161,7 +254,7 @@ engine_compute(struct engine_node *node, bool recompute_allowed) { for (size_t i = 0; i < node->n_inputs; i++) { /* If the input node data changed call its change handler. 
*/ - if (node->inputs[i].node->changed) { + if (node->inputs[i].node->state == EN_UPDATED) { VLOG_DBG("node: %s, handle change for input %s", node->name, node->inputs[i].node->name); @@ -172,57 +265,61 @@ engine_compute(struct engine_node *node, bool recompute_allowed) VLOG_DBG("node: %s, can't handle change for input %s, " "fall back to recompute", node->name, node->inputs[i].node->name); - if (!engine_recompute(node, false, recompute_allowed)) { + engine_recompute(node, false, recompute_allowed); + if (engine_aborted(node)) { return false; } } } } - return true; } -bool engine_run(struct engine_node *node, uint64_t run_id) +static void +engine_run_node(struct engine_node *node) { - if (node->run_id == run_id) { - /* The node was already updated in this run (could be input for - * multiple other nodes). Stop processing. - */ - return true; - } - - /* Initialize the node for this run. */ - node->run_id = run_id; - node->changed = false; - if (!node->n_inputs) { + /* Run the node handler which might change state. */ node->run(node); - VLOG_DBG("node: %s, changed: %d", node->name, node->changed); - return true; + return; } + bool input_stale = false; for (size_t i = 0; i < node->n_inputs; i++) { - if (!engine_run(node->inputs[i].node, run_id)) { - return false; + if (!engine_node_valid(node->inputs[i].node)) { + /* If the input node aborted computation, move to EN_ABORTED. + * This will be propagated to following nodes. + */ + if (engine_aborted(node->inputs[i].node)) { + engine_set_node_state(node, EN_ABORTED); + } + + input_stale = true; } } - bool need_compute = false; + /* If at least one input is stale, don't change state. */ + if (input_stale) { + return; + } if (engine_force_recompute) { - return engine_recompute(node, true, !engine_abort_recompute); + engine_recompute(node, true, !engine_abort_recompute); + return; } /* If any of the inputs updated data but there is no change_handler, then * recompute the current node too. 
*/ + bool need_compute = false; for (size_t i = 0; i < node->n_inputs; i++) { - if (node->inputs[i].node->changed) { + if (node->inputs[i].node->state == EN_UPDATED) { need_compute = true; /* Trigger a recompute if we don't have a change handler. */ if (!node->inputs[i].change_handler) { - return engine_recompute(node, false, !engine_abort_recompute); + engine_recompute(node, false, !engine_abort_recompute); + return; } } } @@ -231,33 +328,47 @@ bool engine_run(struct engine_node *node, uint64_t run_id) /* If we couldn't compute the node we either aborted or triggered * a full recompute. In any case, stop processing. */ - return engine_compute(node, !engine_abort_recompute); + if (!engine_compute(node, !engine_abort_recompute)) { + return; + } } - VLOG_DBG("node: %s, changed: %d", node->name, node->changed); - return true; + /* If we reached this point, either the node was updated or its state is + * still valid. + */ + if (!engine_node_changed(node)) { + engine_set_node_state(node, EN_VALID); + } } -bool -engine_need_run(struct engine_node *node, uint64_t run_id) +void +engine_run(struct engine_node **nodes, size_t n_count) { - size_t i; + for (size_t i = 0; i < n_count; i++) { + engine_run_node(nodes[i]); + } +} - if (node->run_id == run_id) { +bool +engine_need_run(struct engine_node **nodes, size_t n_count, + struct engine_node *root_node) +{ + if (engine_has_run(root_node)) { return false; } - if (!node->n_inputs) { - node->run(node); - VLOG_DBG("input node: %s, changed: %d", node->name, node->changed); - return node->changed; - } + for (size_t i = 0; i < n_count; i++) { + /* Check only leaf nodes. 
*/ + if (nodes[i]->n_inputs) { + continue; + } - for (i = 0; i < node->n_inputs; i++) { - if (engine_need_run(node->inputs[i].node, run_id)) { + nodes[i]->run(nodes[i]); + VLOG_DBG("input node: %s, state: %s", nodes[i]->name, + engine_node_state_name[nodes[i]->state]); + if (nodes[i]->state == EN_UPDATED) { return true; } } - return false; } diff --git a/lib/inc-proc-eng.h b/lib/inc-proc-eng.h index abd41b2..69eb9b6 100644 --- a/lib/inc-proc-eng.h +++ b/lib/inc-proc-eng.h @@ -82,10 +82,21 @@ struct engine_node_input { bool (*change_handler)(struct engine_node *node); }; -struct engine_node { - /* A unique id to distinguish each iteration of the engine_run(). */ - uint64_t run_id; +enum engine_node_state { + EN_STALE, /* Data in the node is not up to date with the DB. */ + EN_UPDATED, /* Data in the node is valid but was updated during the + * last run. + */ + EN_VALID, /* Data in the node is valid and didn't change during the + * last run. + */ + EN_ABORTED, /* During the last run, processing was aborted for + * this node. + */ + EN_STATE_MAX, +}; +struct engine_node { /* A unique name for each node. */ char *name; @@ -102,8 +113,8 @@ struct engine_node { * node. */ void *data; - /* Whether the data changed in the last engine run. */ - bool changed; + /* State of the node after the last engine run. */ + enum engine_node_state state; /* Method to initialize data. It may be NULL. */ void (*init)(struct engine_node *); @@ -116,23 +127,36 @@ struct engine_node { void (*run)(struct engine_node *); }; -/* Initialize the data for the engine nodes recursively. It calls each node's +/* Return the array of topologically sorted nodes when starting from + * 'root_node'. Stores the number of nodes in 'n_count'. + * It should be called before the main loop. + */ +struct engine_node **engine_get_nodes(struct engine_node *root_node, + size_t *n_count); + +/* Initialize the data for the engine nodes. It calls each node's * init() method if not NULL. 
It should be called before the main loop. */ -void engine_init(struct engine_node *); +void engine_init(struct engine_node **nodes, size_t n_count); + +/* Initialize the engine nodes for a new run. It should be called in the + * main processing loop before every potential engine_run(). + */ +void engine_init_run(struct engine_node **nodes, size_t n_count, + struct engine_node *root_node); /* Execute the processing recursively, which should be called in the main - * loop. Returns true if the execution is compelte, false if it is aborted, - * which could happen when engine_abort_recompute is set. */ -bool engine_run(struct engine_node *, uint64_t run_id); + * loop. Updates the engine node's states accordingly. + */ +void engine_run(struct engine_node **nodes, size_t n_count); -/* Clean up the data for the engine nodes recursively. It calls each node's +/* Clean up the data for the engine nodes. It calls each node's * cleanup() method if not NULL. It should be called before the program * terminates. */ -void engine_cleanup(struct engine_node *); +void engine_cleanup(struct engine_node **nodes, size_t n_count); /* Check if engine needs to run but didn't. */ -bool -engine_need_run(struct engine_node *, uint64_t run_id); +bool engine_need_run(struct engine_node **nodes, size_t n_count, + struct engine_node *root_node); /* Get the input node with <name> for <node> */ struct engine_node * engine_get_input(const char *input_name, @@ -159,8 +183,22 @@ const struct engine_context * engine_get_context(void); void engine_set_context(const struct engine_context *); -/* Return true if the engine has run for 'node' in the 'run_id' iteration. */ -bool engine_has_run(struct engine_node *node, uint64_t run_id); +void engine_set_node_state_at(struct engine_node *node, + enum engine_node_state state, + const char *where); + +/* Return true if during the last iteration the node's data was updated. 
*/ +bool engine_node_changed(struct engine_node *node); + +/* Return true if the engine has run for 'node' in the last iteration. */ +bool engine_has_run(struct engine_node *node); + +/* Returns true if during the last engine run we had to abort processing. */ +bool engine_aborted(struct engine_node *node); + +/* Set the state of the node and log changes. */ +#define engine_set_node_state(node, state) \ + engine_set_node_state_at(node, state, OVS_SOURCE_LOCATOR) struct ed_ovsdb_index { const char *name; @@ -187,6 +225,7 @@ void engine_ovsdb_node_add_index(struct engine_node *, const char *name, struct engine_node en_##NAME = { \ .name = NAME_STR, \ .data = &ed_##NAME, \ + .state = EN_STALE, \ .init = en_##NAME##_init, \ .run = en_##NAME##_run, \ .cleanup = en_##NAME##_cleanup, \ @@ -201,10 +240,10 @@ en_##DB_NAME##_##TBL_NAME##_run(struct engine_node *node) \ const struct DB_NAME##rec_##TBL_NAME##_table *table = \ EN_OVSDB_GET(node); \ if (DB_NAME##rec_##TBL_NAME##_table_track_get_first(table)) { \ - node->changed = true; \ + engine_set_node_state(node, EN_UPDATED); \ return; \ } \ - node->changed = false; \ + engine_set_node_state(node, EN_VALID); \ } \ static void (*en_##DB_NAME##_##TBL_NAME##_init)(struct engine_node *node) \ = NULL; \ _______________________________________________ dev mailing list d...@openvswitch.org https://mail.openvswitch.org/mailman/listinfo/ovs-dev