Thanks Dumitru. Please see my comments inline below.

On Mon, Nov 18, 2019 at 6:07 AM Dumitru Ceara <dce...@redhat.com> wrote:
>
> This commit transforms the 'changed' field in struct engine_node in a
> 'state' field. Possible node states are:
> - "Stale": data in the node is not up to date with the DB.
> - "Updated": data in the node is valid but was updated during
>   the last run of the engine.
> - "Valid": data in the node is valid and didn't change during
>   the last run of the engine.
> - "Aborted": during the last run, processing was aborted for
>   this node.
>
> This commit also further refactors the I-P engine:
> - instead of recursively performing all the engine processing a
>   preprocessing stage is added (engine_get_nodes()) before the main
processing
>   loop is executed in order to topologically sort nodes in the engine such
>   that all inputs of a given node appear in the sorted array before the
node
>   itself. This simplifies a bit the code in engine_run().

Could you explain the reason for changing it to non-recursive? It seems to add
more code rather than simplify it, and effort is needed to ensure the
correctness of the new code. There is probably some benefit that makes the
later patches easier, but it is not obvious to me. Could you help point it out
if that's the case?

> - remove the need for using an engine_run_id by using the newly added
states.
>
> Signed-off-by: Dumitru Ceara <dce...@redhat.com>
> ---
>  controller/ovn-controller.c |   88 ++++++++++-------
>  lib/inc-proc-eng.c          |  219
++++++++++++++++++++++++++++++++-----------
>  lib/inc-proc-eng.h          |   75 +++++++++++----
>  3 files changed, 271 insertions(+), 111 deletions(-)
>
> diff --git a/controller/ovn-controller.c b/controller/ovn-controller.c
> index c56190f..033eff4 100644
> --- a/controller/ovn-controller.c
> +++ b/controller/ovn-controller.c
> @@ -758,10 +758,10 @@ en_ofctrl_is_connected_run(struct engine_node *node)
>          (struct ed_type_ofctrl_is_connected *)node->data;
>      if (data->connected != ofctrl_is_connected()) {
>          data->connected = !data->connected;
> -        node->changed = true;
> +        engine_set_node_state(node, EN_UPDATED);
>          return;
>      }
> -    node->changed = false;
> +    engine_set_node_state(node, EN_VALID);
>  }
>
>  struct ed_type_addr_sets {
> @@ -811,7 +811,7 @@ en_addr_sets_run(struct engine_node *node)
>      addr_sets_init(as_table, &as->addr_sets);
>
>      as->change_tracked = false;
> -    node->changed = true;
> +    engine_set_node_state(node, EN_UPDATED);
>  }
>
>  static bool
> @@ -830,11 +830,14 @@ addr_sets_sb_address_set_handler(struct engine_node
*node)
>      addr_sets_update(as_table, &as->addr_sets, &as->new,
>                       &as->deleted, &as->updated);
>
> -    node->changed = !sset_is_empty(&as->new) ||
!sset_is_empty(&as->deleted)
> -                    || !sset_is_empty(&as->updated);
> +    if (!sset_is_empty(&as->new) || !sset_is_empty(&as->deleted) ||
> +            !sset_is_empty(&as->updated)) {
> +        engine_set_node_state(node, EN_UPDATED);
> +    } else {
> +        engine_set_node_state(node, EN_VALID);
> +    }
>
>      as->change_tracked = true;
> -    node->changed = true;
>      return true;
>  }
>
> @@ -885,7 +888,7 @@ en_port_groups_run(struct engine_node *node)
>      port_groups_init(pg_table, &pg->port_groups);
>
>      pg->change_tracked = false;
> -    node->changed = true;
> +    engine_set_node_state(node, EN_UPDATED);
>  }
>
>  static bool
> @@ -904,11 +907,14 @@ port_groups_sb_port_group_handler(struct
engine_node *node)
>      port_groups_update(pg_table, &pg->port_groups, &pg->new,
>                       &pg->deleted, &pg->updated);
>
> -    node->changed = !sset_is_empty(&pg->new) ||
!sset_is_empty(&pg->deleted)
> -                    || !sset_is_empty(&pg->updated);
> +    if (!sset_is_empty(&pg->new) || !sset_is_empty(&pg->deleted) ||
> +            !sset_is_empty(&pg->updated)) {
> +        engine_set_node_state(node, EN_UPDATED);
> +    } else {
> +        engine_set_node_state(node, EN_VALID);
> +    }
>
>      pg->change_tracked = true;
> -    node->changed = true;
>      return true;
>  }
>
> @@ -1091,7 +1097,7 @@ en_runtime_data_run(struct engine_node *node)
>      update_ct_zones(local_lports, local_datapaths, ct_zones,
>                      ct_zone_bitmap, pending_ct_zones);
>
> -    node->changed = true;
> +    engine_set_node_state(node, EN_UPDATED);
>  }
>
>  static bool
> @@ -1157,10 +1163,10 @@ en_mff_ovn_geneve_run(struct engine_node *node)
>      enum mf_field_id mff_ovn_geneve = ofctrl_get_mf_field_id();
>      if (data->mff_ovn_geneve != mff_ovn_geneve) {
>          data->mff_ovn_geneve = mff_ovn_geneve;
> -        node->changed = true;
> +        engine_set_node_state(node, EN_UPDATED);
>          return;
>      }
> -    node->changed = false;
> +    engine_set_node_state(node, EN_VALID);
>  }
>
>  struct ed_type_flow_output {
> @@ -1322,7 +1328,7 @@ en_flow_output_run(struct engine_node *node)
>                   active_tunnels,
>                   flow_table);
>
> -    node->changed = true;
> +    engine_set_node_state(node, EN_UPDATED);
>  }
>
>  static bool
> @@ -1404,7 +1410,7 @@ flow_output_sb_logical_flow_handler(struct
engine_node *node)
>                flow_table, group_table, meter_table, lfrr,
>                conj_id_ofs);
>
> -    node->changed = true;
> +    engine_set_node_state(node, EN_UPDATED);
>      return handled;
>  }
>
> @@ -1427,7 +1433,7 @@ flow_output_sb_mac_binding_handler(struct
engine_node *node)
>      lflow_handle_changed_neighbors(sbrec_port_binding_by_name,
>              mac_binding_table, flow_table);
>
> -    node->changed = true;
> +    engine_set_node_state(node, EN_UPDATED);
>      return true;
>  }
>
> @@ -1531,7 +1537,7 @@ flow_output_sb_port_binding_handler(struct
engine_node *node)
>              chassis, ct_zones, local_datapaths,
>              active_tunnels, flow_table);
>
> -    node->changed = true;
> +    engine_set_node_state(node, EN_UPDATED);
>      return true;
>  }
>
> @@ -1580,7 +1586,7 @@ flow_output_sb_multicast_group_handler(struct
engine_node *node)
>              mff_ovn_geneve, chassis, ct_zones, local_datapaths,
>              flow_table);
>
> -    node->changed = true;
> +    engine_set_node_state(node, EN_UPDATED);
>      return true;
>
>  }
> @@ -1694,7 +1700,9 @@ _flow_output_resource_ref_handler(struct
engine_node *node,
>                      conj_id_ofs, &changed)) {
>              return false;
>          }
> -        node->changed = changed || node->changed;
> +        if (changed) {
> +            engine_set_node_state(node, EN_UPDATED);
> +        }
>      }
>      SSET_FOR_EACH (ref_name, updated) {
>          if (!lflow_handle_changed_ref(ref_type, ref_name,
> @@ -1707,7 +1715,9 @@ _flow_output_resource_ref_handler(struct
engine_node *node,
>                      conj_id_ofs, &changed)) {
>              return false;
>          }
> -        node->changed = changed || node->changed;
> +        if (changed) {
> +            engine_set_node_state(node, EN_UPDATED);
> +        }
>      }
>      SSET_FOR_EACH (ref_name, new) {
>          if (!lflow_handle_changed_ref(ref_type, ref_name,
> @@ -1720,7 +1730,9 @@ _flow_output_resource_ref_handler(struct
engine_node *node,
>                      conj_id_ofs, &changed)) {
>              return false;
>          }
> -        node->changed = changed || node->changed;
> +        if (changed) {
> +            engine_set_node_state(node, EN_UPDATED);
> +        }
>      }
>
>      return true;
> @@ -1922,7 +1934,11 @@ main(int argc, char *argv[])
>      engine_add_input(&en_runtime_data, &en_sb_port_binding,
>                       runtime_data_sb_port_binding_handler);
>
> -    engine_init(&en_flow_output);
> +    /* Get the sorted engine nodes to be used for every engine run. */
> +    size_t en_count = 0;
> +    struct engine_node **en_nodes = engine_get_nodes(&en_flow_output,
> +                                                     &en_count);
> +    engine_init(en_nodes, en_count);

I think engine_get_nodes() and engine_init() can be combined. We only need
to expose the engine_init() interface, which can call engine_get_nodes()
internally and store n_count and nodes internally in the engine module.

In addition, there can be more than one root node in the DAG. Originally we
could just call engine_run(root_node) for each root node. Now, with the
non-recursive approach, I think we can handle this by:
  - not passing the root_node to engine_get_nodes; instead,
the engine can just process all engine nodes.
  - removing the root_node parameter from all interfaces.

>
>      ofctrl_init(&ed_flow_output.group_table,
>                  &ed_flow_output.meter_table,
> @@ -1941,9 +1957,6 @@ main(int argc, char *argv[])
>      unixctl_command_register("inject-pkt", "MICROFLOW", 1, 1, inject_pkt,
>                               &pending_pkt);
>
> -    uint64_t engine_run_id = 0;
> -    bool engine_run_done = true;
> -
>      unsigned int ovs_cond_seqno = UINT_MAX;
>      unsigned int ovnsb_cond_seqno = UINT_MAX;
>
> @@ -1951,7 +1964,7 @@ main(int argc, char *argv[])
>      exiting = false;
>      restart = false;
>      while (!exiting) {
> -        engine_run_id++;
> +        engine_init_run(en_nodes, en_count, &en_flow_output);
>
>          update_sb_db(ovs_idl_loop.idl, ovnsb_idl_loop.idl);
>          update_ssl_config(ovsrec_ssl_table_get(ovs_idl_loop.idl));
> @@ -2044,15 +2057,13 @@ main(int argc, char *argv[])
>                               * this round of engine_run and continue
processing
>                               * acculated changes incrementally later when
>                               * ofctrl_can_put() returns true. */
> -                            if (engine_run_done) {
> +                            if (!engine_aborted(&en_flow_output)) {
>                                  engine_set_abort_recompute(true);
> -                                engine_run_done =
engine_run(&en_flow_output,
> -
engine_run_id);
> +                                engine_run(en_nodes, en_count);
>                              }
>                          } else {
>                              engine_set_abort_recompute(false);
> -                            engine_run_done = true;
> -                            engine_run(&en_flow_output, engine_run_id);
> +                            engine_run(en_nodes, en_count);
>                          }
>                      }
>                      stopwatch_stop(CONTROLLER_LOOP_STOPWATCH_NAME,
> @@ -2071,7 +2082,7 @@ main(int argc, char *argv[])
>                                 sbrec_meter_table_get(ovnsb_idl_loop.idl),
>                                 get_nb_cfg(sbrec_sb_global_table_get(
>                                                ovnsb_idl_loop.idl)),
> -                               en_flow_output.changed);
> +                               engine_node_changed(&en_flow_output));
>                      pinctrl_run(ovnsb_idl_txn,
>                                  sbrec_datapath_binding_by_key,
>                                  sbrec_port_binding_by_datapath,
> @@ -2089,7 +2100,7 @@ main(int argc, char *argv[])
>                                  &ed_runtime_data.local_datapaths,
>                                  &ed_runtime_data.active_tunnels);
>
> -                    if (en_runtime_data.changed) {
> +                    if (engine_node_changed(&en_runtime_data)) {
>                          update_sb_monitors(ovnsb_idl_loop.idl, chassis,
>                                             &ed_runtime_data.local_lports,
>
&ed_runtime_data.local_datapaths);
> @@ -2097,17 +2108,17 @@ main(int argc, char *argv[])
>                  }
>
>              }
> -            if (engine_need_run(&en_flow_output, engine_run_id)) {
> +            if (engine_need_run(en_nodes, en_count, &en_flow_output)) {
>                  VLOG_DBG("engine did not run, force recompute next time:
"
>                              "br_int %p, chassis %p", br_int, chassis);
>                  engine_set_force_recompute(true);
>                  poll_immediate_wake();
> -            } else if (!engine_run_done) {
> +            } else if (engine_aborted(&en_flow_output)) {
>                  VLOG_DBG("engine was aborted, force recompute next time:
"
>                           "br_int %p, chassis %p", br_int, chassis);
>                  engine_set_force_recompute(true);
>                  poll_immediate_wake();
> -            } else if (!engine_has_run(&en_flow_output, engine_run_id)) {
> +            } else if (!engine_has_run(&en_flow_output)) {
>                  VLOG_DBG("engine did not run, and it was not needed"
>                           " either: br_int %p, chassis %p",
>                           br_int, chassis);
> @@ -2135,8 +2146,7 @@ main(int argc, char *argv[])
>                      }
>                  } else {
>                      VLOG_DBG("Pending_pkt conn but br_int %p or chassis "
> -                             "%p not ready. run-id: %"PRIu64, br_int,
> -                             chassis, engine_run_id);
> +                             "%p not ready.", br_int, chassis);
>                      unixctl_command_reply_error(pending_pkt.conn,
>                          "ovn-controller not ready.");
>                  }
> @@ -2185,7 +2195,7 @@ main(int argc, char *argv[])
>      }
>
>      engine_set_context(NULL);
> -    engine_cleanup(&en_flow_output);
> +    engine_cleanup(en_nodes, en_count);
>
>      /* It's time to exit.  Clean up the databases if we are not
restarting */
>      if (!restart) {
> diff --git a/lib/inc-proc-eng.c b/lib/inc-proc-eng.c
> index 8a085e2..ee6afbe 100644
> --- a/lib/inc-proc-eng.c
> +++ b/lib/inc-proc-eng.c
> @@ -34,6 +34,13 @@ static bool engine_force_recompute = false;
>  static bool engine_abort_recompute = false;
>  static const struct engine_context *engine_context;
>
> +static const char *engine_node_state_name[EN_STATE_MAX] = {
> +    [EN_STALE]   = "Stale",
> +    [EN_UPDATED] = "Updated",
> +    [EN_VALID]   = "Valid",
> +    [EN_ABORTED] = "Aborted",
> +};
> +
>  void
>  engine_set_force_recompute(bool val)
>  {
> @@ -58,26 +65,62 @@ engine_set_context(const struct engine_context *ctx)
>      engine_context = ctx;
>  }
>
> -void
> -engine_init(struct engine_node *node)
> +/* Builds the topologically sorted 'sorted_nodes' array starting from
> + * 'node'.
> + */
> +static struct engine_node **
> +engine_topo_sort(struct engine_node *node, struct engine_node
**sorted_nodes,
> +                 size_t *n_count, size_t *n_size)
>  {
> +    /* It's not so efficient to walk the array of already sorted nodes
but
> +     * we know that sorting is done only once at startup so it's ok for
now.
> +     */
> +    for (size_t i = 0; i < *n_count; i++) {
> +        if (sorted_nodes[i] == node) {
> +            return sorted_nodes;
> +        }
> +    }
> +
>      for (size_t i = 0; i < node->n_inputs; i++) {
> -        engine_init(node->inputs[i].node);
> +        sorted_nodes = engine_topo_sort(node->inputs[i].node,
sorted_nodes,
> +                                        n_count, n_size);
>      }
> -    if (node->init) {
> -        node->init(node);
> +    if (*n_count == *n_size) {
> +        sorted_nodes = x2nrealloc(sorted_nodes, n_size, sizeof
*sorted_nodes);
>      }
> +    sorted_nodes[(*n_count)] = node;
> +    (*n_count)++;
> +    return sorted_nodes;
> +}
> +
> +struct engine_node **
> +engine_get_nodes(struct engine_node *root_node, size_t *n_count)
> +{
> +    size_t n_size = 0;
> +
> +    *n_count = 0;
> +    return engine_topo_sort(root_node, NULL, n_count, &n_size);
>  }
>
>  void
> -engine_cleanup(struct engine_node *node)
> +engine_init(struct engine_node **nodes, size_t n_count)
>  {
> -    for (size_t i = 0; i < node->n_inputs; i++) {
> -        engine_cleanup(node->inputs[i].node);
> +    for (size_t i = 0; i < n_count; i++) {
> +        if (nodes[i]->init) {
> +            nodes[i]->init(nodes[i]);
> +        }
>      }
> -    if (node->cleanup) {
> -        node->cleanup(node);
> +}
> +
> +void
> +engine_cleanup(struct engine_node **nodes, size_t n_count)
> +{
> +    for (size_t i = 0; i < n_count; i++) {
> +        if (nodes[i]->cleanup) {
> +            nodes[i]->cleanup(nodes[i]);
> +        }
>      }
> +    free(nodes);
>  }
>
>  struct engine_node *
> @@ -128,16 +171,66 @@ engine_ovsdb_node_add_index(struct engine_node
*node, const char *name,
>      ed->n_indexes ++;
>  }
>
> +void
> +engine_set_node_state_at(struct engine_node *node,
> +                         enum engine_node_state state,
> +                         const char *where)
> +{
> +    if (node->state == state) {
> +        return;
> +    }
> +
> +    VLOG_DBG("%s: node: %s, old_state %s, new_state %s",
> +             where, node->name,
> +             engine_node_state_name[node->state],
> +             engine_node_state_name[state]);
> +
> +    node->state = state;
> +}
> +
> +static bool
> +engine_node_valid(struct engine_node *node)
> +{
> +    return (node->state == EN_UPDATED || node->state == EN_VALID);
> +}
> +
>  bool
> -engine_has_run(struct engine_node *node, uint64_t run_id)
> +engine_node_changed(struct engine_node *node)
>  {
> -    return node->run_id == run_id;
> +    return node->state == EN_UPDATED;
> +}
> +
> +bool
> +engine_has_run(struct engine_node *node)
> +{

engine_has_run() should go through all nodes. If any node is NOT STALE,
return true. The engine hasn't run only if all nodes are STALE. (Originally it
was easier to tell by utilizing engine_run_id.)
If some nodes are STALE and some are not, it means the engine has run but aborted.

> +    return node->state != EN_STALE;
> +}
> +
> +bool
> +engine_aborted(struct engine_node *node)
> +{
> +    return node->state == EN_ABORTED;
> +}
> +
> +void
> +engine_init_run(struct engine_node **nodes, size_t n_count,
> +                struct engine_node *root_node)
> +{
> +    /* No need to reinitialize if last run didn't happen. */
> +    if (!engine_has_run(root_node)) {

I think there is a problem here. If, in the last round, the root node didn't run
because processing was aborted at some intermediate node, many other nodes could
have run and had their state already changed to VALID/UPDATED. Now if we don't do the
init, those nodes may contain invalid data since it is a new round of
iteration, but their state says they are valid.

We don't need to pass "root_node" for engine_init_run. We can just reset
all nodes to STALE state.


> +        return;
> +    }
> +
> +    VLOG_DBG("Initializing new run");
> +    for (size_t i = 0; i < n_count; i++) {
> +        engine_set_node_state(nodes[i], EN_STALE);
> +    }
>  }
>
>  /* Do a full recompute (or at least try). If we're not allowed then
>   * mark the node as "aborted".
>   */
> -static bool
> +static void
>  engine_recompute(struct engine_node *node, bool forced, bool allowed)
>  {
>      VLOG_DBG("node: %s, recompute (%s)", node->name,
> @@ -145,12 +238,12 @@ engine_recompute(struct engine_node *node, bool
forced, bool allowed)
>
>      if (!allowed) {
>          VLOG_DBG("node: %s, recompute aborted", node->name);
> -        return false;
> +        engine_set_node_state(node, EN_ABORTED);
> +        return;
>      }
>
> +    /* Run the node handler which might change state. */
>      node->run(node);
> -    VLOG_DBG("node: %s, changed: %d", node->name, node->changed);
> -    return true;
>  }
>
>  /* Return true if the node could be computed without triggerring a full
> @@ -161,7 +254,7 @@ engine_compute(struct engine_node *node, bool
recompute_allowed)
>  {
>      for (size_t i = 0; i < node->n_inputs; i++) {
>          /* If the input node data changed call its change handler. */
> -        if (node->inputs[i].node->changed) {
> +        if (node->inputs[i].node->state == EN_UPDATED) {
>              VLOG_DBG("node: %s, handle change for input %s",
>                       node->name, node->inputs[i].node->name);
>
> @@ -172,57 +265,61 @@ engine_compute(struct engine_node *node, bool
recompute_allowed)
>                  VLOG_DBG("node: %s, can't handle change for input %s, "
>                           "fall back to recompute",
>                           node->name, node->inputs[i].node->name);
> -                if (!engine_recompute(node, false, recompute_allowed)) {
> +                engine_recompute(node, false, recompute_allowed);
> +                if (engine_aborted(node)) {

The aborted state was propagated through the recursive logic, but it is not
the case in this new implementation. Is this on purpose?

>                      return false;
>                  }
>              }
>          }
>      }
> -
>      return true;
>  }
>
> -bool engine_run(struct engine_node *node, uint64_t run_id)
> +static void
> +engine_run_node(struct engine_node *node)
>  {
> -    if (node->run_id == run_id) {
> -        /* The node was already updated in this run (could be input for
> -         * multiple other nodes). Stop processing.
> -         */
> -        return true;
> -    }
> -
> -    /* Initialize the node for this run. */
> -    node->run_id = run_id;
> -    node->changed = false;
> -
>      if (!node->n_inputs) {
> +        /* Run the node handler which might change state. */
>          node->run(node);
> -        VLOG_DBG("node: %s, changed: %d", node->name, node->changed);
> -        return true;
> +        return;
>      }
>
> +    bool input_stale = false;
>      for (size_t i = 0; i < node->n_inputs; i++) {
> -        if (!engine_run(node->inputs[i].node, run_id)) {
> -            return false;
> +        if (!engine_node_valid(node->inputs[i].node)) {
> +            /* If the input node aborted computation, move to EN_ABORTED.
> +             * This will be propagated to following nodes.
> +             */
> +            if (engine_aborted(node->inputs[i].node)) {
> +                engine_set_node_state(node, EN_ABORTED);
> +            }
> +
> +            input_stale = true;
>          }
>      }
>
> -    bool need_compute = false;
> +    /* If at least one input is stale, don't change state. */
> +    if (input_stale) {
> +        return;
> +    }
>
>      if (engine_force_recompute) {
> -        return engine_recompute(node, true, !engine_abort_recompute);
> +        engine_recompute(node, true, !engine_abort_recompute);
> +        return;
>      }
>
>      /* If any of the inputs updated data but there is no change_handler,
then
>       * recompute the current node too.
>       */
> +    bool need_compute = false;
>      for (size_t i = 0; i < node->n_inputs; i++) {
> -        if (node->inputs[i].node->changed) {
> +        if (node->inputs[i].node->state == EN_UPDATED) {
>              need_compute = true;
>
>              /* Trigger a recompute if we don't have a change handler. */
>              if (!node->inputs[i].change_handler) {
> -                return engine_recompute(node, false,
!engine_abort_recompute);
> +                engine_recompute(node, false, !engine_abort_recompute);
> +                return;
>              }
>          }
>      }
> @@ -231,33 +328,47 @@ bool engine_run(struct engine_node *node, uint64_t
run_id)
>          /* If we couldn't compute the node we either aborted or triggered
>           * a full recompute. In any case, stop processing.
>           */
> -        return engine_compute(node, !engine_abort_recompute);
> +        if (!engine_compute(node, !engine_abort_recompute)) {
> +            return;
> +        }
>      }
>
> -    VLOG_DBG("node: %s, changed: %d", node->name, node->changed);
> -    return true;
> +    /* If we reached this point, either the node was updated or its
state is
> +     * still valid.
> +     */
> +    if (!engine_node_changed(node)) {
> +        engine_set_node_state(node, EN_VALID);
> +    }
>  }
>
> -bool
> -engine_need_run(struct engine_node *node, uint64_t run_id)
> +void
> +engine_run(struct engine_node **nodes, size_t n_count)
>  {
> -    size_t i;
> +    for (size_t i = 0; i < n_count; i++) {

If an input node didn't finish the run, e.g. it was aborted, then we shouldn't
continue running the nodes that depend on it.

> +        engine_run_node(nodes[i]);
> +    }
> +}
>
> -    if (node->run_id == run_id) {
> +bool
> +engine_need_run(struct engine_node **nodes, size_t n_count,
> +                struct engine_node *root_node)
> +{
> +    if (engine_has_run(root_node)) {
>          return false;
>      }
>
> -    if (!node->n_inputs) {
> -        node->run(node);
> -        VLOG_DBG("input node: %s, changed: %d", node->name,
node->changed);
> -        return node->changed;
> -    }
> +    for (size_t i = 0; i < n_count; i++) {
> +        /* Check only leaf nodes. */
> +        if (nodes[i]->n_inputs) {
> +            continue;
> +        }
>
> -    for (i = 0; i < node->n_inputs; i++) {
> -        if (engine_need_run(node->inputs[i].node, run_id)) {
> +        nodes[i]->run(nodes[i]);
> +        VLOG_DBG("input node: %s, state: %s", nodes[i]->name,
> +                 engine_node_state_name[nodes[i]->state]);
> +        if (nodes[i]->state == EN_UPDATED) {
>              return true;
>          }
>      }
> -
>      return false;
>  }
> diff --git a/lib/inc-proc-eng.h b/lib/inc-proc-eng.h
> index abd41b2..69eb9b6 100644
> --- a/lib/inc-proc-eng.h
> +++ b/lib/inc-proc-eng.h
> @@ -82,10 +82,21 @@ struct engine_node_input {
>      bool (*change_handler)(struct engine_node *node);
>  };
>
> -struct engine_node {
> -    /* A unique id to distinguish each iteration of the engine_run(). */
> -    uint64_t run_id;
> +enum engine_node_state {
> +    EN_STALE,     /* Data in the node is not up to date with the DB. */
> +    EN_UPDATED,   /* Data in the node is valid but was updated during the
> +                   * last run.
> +                   */
> +    EN_VALID,     /* Data in the node is valid and didn't change during
the
> +                   * last run.
> +                   */
> +    EN_ABORTED,   /* During the last run, processing was aborted for
> +                   * this node.
> +                   */
> +    EN_STATE_MAX,
> +};
>
> +struct engine_node {
>      /* A unique name for each node. */
>      char *name;
>
> @@ -102,8 +113,8 @@ struct engine_node {
>       * node. */
>      void *data;
>
> -    /* Whether the data changed in the last engine run. */
> -    bool changed;
> +    /* State of the node after the last engine run. */
> +    enum engine_node_state state;
>
>      /* Method to initialize data. It may be NULL. */
>      void (*init)(struct engine_node *);
> @@ -116,23 +127,36 @@ struct engine_node {
>      void (*run)(struct engine_node *);
>  };
>
> -/* Initialize the data for the engine nodes recursively. It calls each
node's
> +/* Return the array of topologically sorted nodes when starting from
> + * 'root_node'. Stores the number of nodes in 'n_count'.
> + * It should be called before the main loop.
> + */
> +struct engine_node **engine_get_nodes(struct engine_node *root_node,
> +                                      size_t *n_count);
> +
> +/* Initialize the data for the engine nodes. It calls each node's
>   * init() method if not NULL. It should be called before the main loop.
*/
> -void engine_init(struct engine_node *);
> +void engine_init(struct engine_node **nodes, size_t n_count);
> +
> +/* Initialize the engine nodes for a new run. It should be called in the
> + * main processing loop before every potential engine_run().
> + */
> +void engine_init_run(struct engine_node **nodes, size_t n_count,
> +                     struct engine_node *root_node);
>
>  /* Execute the processing recursively, which should be called in the main

This comment should be updated since you changed it to be non-recursive.

> - * loop. Returns true if the execution is compelte, false if it is
aborted,
> - * which could happen when engine_abort_recompute is set. */
> -bool engine_run(struct engine_node *, uint64_t run_id);
> + * loop. Updates the engine node's states accordingly.
> + */
> +void engine_run(struct engine_node **nodes, size_t n_count);
>
> -/* Clean up the data for the engine nodes recursively. It calls each
node's
> +/* Clean up the data for the engine nodes. It calls each node's
>   * cleanup() method if not NULL. It should be called before the program
>   * terminates. */
> -void engine_cleanup(struct engine_node *);
> +void engine_cleanup(struct engine_node **nodes, size_t n_count);
>
>  /* Check if engine needs to run but didn't. */
> -bool
> -engine_need_run(struct engine_node *, uint64_t run_id);
> +bool engine_need_run(struct engine_node **nodes, size_t n_count,
> +                     struct engine_node *root_node);
>
>  /* Get the input node with <name> for <node> */
>  struct engine_node * engine_get_input(const char *input_name,
> @@ -159,8 +183,22 @@ const struct engine_context *
engine_get_context(void);
>
>  void engine_set_context(const struct engine_context *);
>
> -/* Return true if the engine has run for 'node' in the 'run_id'
iteration. */
> -bool engine_has_run(struct engine_node *node, uint64_t run_id);
> +void engine_set_node_state_at(struct engine_node *node,
> +                              enum engine_node_state state,
> +                              const char *where);
> +
> +/* Return true if during the last iteration the node's data was updated.
*/
> +bool engine_node_changed(struct engine_node *node);
> +
> +/* Return true if the engine has run for 'node' in the last iteration. */
> +bool engine_has_run(struct engine_node *node);
> +
> +/* Returns true if during the last engine run we had to abort
processing. */
> +bool engine_aborted(struct engine_node *node);
> +
> +/* Set the state of the node and log changes. */
> +#define engine_set_node_state(node, state) \
> +    engine_set_node_state_at(node, state, OVS_SOURCE_LOCATOR)
>
>  struct ed_ovsdb_index {
>      const char *name;
> @@ -187,6 +225,7 @@ void engine_ovsdb_node_add_index(struct engine_node
*, const char *name,
>      struct engine_node en_##NAME = { \
>          .name = NAME_STR, \
>          .data = &ed_##NAME, \
> +        .state = EN_STALE, \
>          .init = en_##NAME##_init, \
>          .run = en_##NAME##_run, \
>          .cleanup = en_##NAME##_cleanup, \
> @@ -201,10 +240,10 @@ en_##DB_NAME##_##TBL_NAME##_run(struct engine_node
*node) \
>      const struct DB_NAME##rec_##TBL_NAME##_table *table = \
>          EN_OVSDB_GET(node); \
>      if (DB_NAME##rec_##TBL_NAME##_table_track_get_first(table)) { \
> -        node->changed = true; \
> +        engine_set_node_state(node, EN_UPDATED); \
>          return; \
>      } \
> -    node->changed = false; \
> +    engine_set_node_state(node, EN_VALID); \
>  } \
>  static void (*en_##DB_NAME##_##TBL_NAME##_init)(struct engine_node
*node) \
>              = NULL; \
>
_______________________________________________
dev mailing list
d...@openvswitch.org
https://mail.openvswitch.org/mailman/listinfo/ovs-dev

Reply via email to