On Wed, Dec 4, 2019 at 10:27 PM Han Zhou <hz...@ovn.org> wrote:
>
>
>
> On Wed, Dec 4, 2019 at 8:28 AM Dumitru Ceara <dce...@redhat.com> wrote:
> >
> > The incremental processing engine might stop a run before the
> > en_runtime_data node is processed. In such cases the ed_runtime_data
> > fields might contain pointers to already deleted SB records. For
> > example, if a port binding corresponding to a patch port is removed from
> > the SB database and the incremental processing engine aborts before the
> > en_runtime_data node is processed then the corresponding local_datapath
> > hashtable entry in ed_runtime_data is stale and will store a pointer to
> > the already freed sbrec_port_binding record.
> >
> > This will cause invalid memory accesses in various places (e.g.,
> > pinctrl_run() -> prepare_ipv6_ras()).
> >
> > To fix the issue we introduce the engine_get_data() API which must be
> > called in order to safely access internal node data. If the node is in
> > state EN_STALE or EN_ABORTED, engine_get_data() returns NULL as the
> > references might be stale.
> >
> > This commit also adds an "is_valid()" method to engine nodes to allow
> > users to override the default behavior of determining if data is valid in a
> > node (e.g., for the ct-zones node the data is always safe to access).
> >
> > Also, all interactions with node data outside inc-proc-eng.c are now
> > performed through APIs and never by directly accessing the node->data
> > field. This makes it easier to ensure that we don't access invalid
> > (stale) data.
> >
> > CC: Han Zhou <hz...@ovn.org>
> > Fixes: ca278d98a4f5 ("ovn-controller: Initial use of incremental engine - 
> > quiet mode.")
> > Signed-off-by: Dumitru Ceara <dce...@redhat.com>
> >
> > ---
> > v8:
> > - First two patches were applied to master, so resending the last patch
> >   in the series as standalone patch.
> > - Address Han's comments:
> >     - Remove internal_data from engine_node.
> >     - Use the newly added engine_get_data() to make sure we access valid
> >       data outside the incremental processing engine.
> >     - Remove data storage outside the nodes and have the init()
> >       callbacks allocate and initialize required memory.
> > - Also, for better data encapsulations:
> >     - Remove all references of engine_node->data from ovn-controller.c
> >       and use inc-proc-eng APIs to access the data.
> >     - Change all init/cleanup/run/change_handlers to use data supplied
> >       as argument.
> >     - Use the newly added engine_get_input_data api to access input node
> >       data.
> >     - At init time, use engine_get_internal_data() to initialize the
> >       callback arguments for the unix cmd handlers and to initialize
> >       ofctrl.
> > ---
> >  controller/ovn-controller.c | 457 
> > +++++++++++++++++++++++---------------------
> >  lib/inc-proc-eng.c          |  59 +++++-
> >  lib/inc-proc-eng.h          | 115 ++++++++---
> >  3 files changed, 370 insertions(+), 261 deletions(-)
> >
> > diff --git a/controller/ovn-controller.c b/controller/ovn-controller.c
> > index 64c44c9..5874776 100644
> > --- a/controller/ovn-controller.c
> > +++ b/controller/ovn-controller.c
> > @@ -739,26 +739,25 @@ struct ed_type_ofctrl_is_connected {
> >      bool connected;
> >  };
> >
> > -static void
> > -en_ofctrl_is_connected_init(struct engine_node *node)
> > +static void *
> > +en_ofctrl_is_connected_init(struct engine_node *node OVS_UNUSED,
> > +                            struct engine_arg *arg OVS_UNUSED)
> >  {
> > -    struct ed_type_ofctrl_is_connected *data =
> > -        (struct ed_type_ofctrl_is_connected *)node->data;
> > -    data->connected = false;
> > +    struct ed_type_ofctrl_is_connected *data = xzalloc(sizeof *data);
> > +    return data;
> >  }
> >
> >  static void
> > -en_ofctrl_is_connected_cleanup(struct engine_node *node OVS_UNUSED)
> > +en_ofctrl_is_connected_cleanup(void *data OVS_UNUSED)
> >  {
> >  }
> >
> >  static void
> > -en_ofctrl_is_connected_run(struct engine_node *node)
> > +en_ofctrl_is_connected_run(struct engine_node *node, void *data)
> >  {
> > -    struct ed_type_ofctrl_is_connected *data =
> > -        (struct ed_type_ofctrl_is_connected *)node->data;
> > -    if (data->connected != ofctrl_is_connected()) {
> > -        data->connected = !data->connected;
> > +    struct ed_type_ofctrl_is_connected *of_data = data;
> > +    if (of_data->connected != ofctrl_is_connected()) {
> > +        of_data->connected = !of_data->connected;
> >          engine_set_node_state(node, EN_UPDATED);
> >          return;
> >      }
> > @@ -773,21 +772,24 @@ struct ed_type_addr_sets {
> >      struct sset updated;
> >  };
> >
> > -static void
> > -en_addr_sets_init(struct engine_node *node)
> > +static void *
> > +en_addr_sets_init(struct engine_node *node OVS_UNUSED,
> > +                  struct engine_arg *arg OVS_UNUSED)
> >  {
> > -    struct ed_type_addr_sets *as = (struct ed_type_addr_sets *)node->data;
> > +    struct ed_type_addr_sets *as = xzalloc(sizeof *as);
> > +
> >      shash_init(&as->addr_sets);
> >      as->change_tracked = false;
> >      sset_init(&as->new);
> >      sset_init(&as->deleted);
> >      sset_init(&as->updated);
> > +    return as;
> >  }
> >
> >  static void
> > -en_addr_sets_cleanup(struct engine_node *node)
> > +en_addr_sets_cleanup(void *data)
> >  {
> > -    struct ed_type_addr_sets *as = (struct ed_type_addr_sets *)node->data;
> > +    struct ed_type_addr_sets *as = data;
> >      expr_const_sets_destroy(&as->addr_sets);
> >      shash_destroy(&as->addr_sets);
> >      sset_destroy(&as->new);
> > @@ -796,9 +798,9 @@ en_addr_sets_cleanup(struct engine_node *node)
> >  }
> >
> >  static void
> > -en_addr_sets_run(struct engine_node *node)
> > +en_addr_sets_run(struct engine_node *node, void *data)
> >  {
> > -    struct ed_type_addr_sets *as = (struct ed_type_addr_sets *)node->data;
> > +    struct ed_type_addr_sets *as = data;
> >
> >      sset_clear(&as->new);
> >      sset_clear(&as->deleted);
> > @@ -816,9 +818,9 @@ en_addr_sets_run(struct engine_node *node)
> >  }
> >
> >  static bool
> > -addr_sets_sb_address_set_handler(struct engine_node *node)
> > +addr_sets_sb_address_set_handler(struct engine_node *node, void *data)
> >  {
> > -    struct ed_type_addr_sets *as = (struct ed_type_addr_sets *)node->data;
> > +    struct ed_type_addr_sets *as = data;
> >
> >      sset_clear(&as->new);
> >      sset_clear(&as->deleted);
> > @@ -850,21 +852,24 @@ struct ed_type_port_groups{
> >      struct sset updated;
> >  };
> >
> > -static void
> > -en_port_groups_init(struct engine_node *node)
> > +static void *
> > +en_port_groups_init(struct engine_node *node OVS_UNUSED,
> > +                    struct engine_arg *arg OVS_UNUSED)
> >  {
> > -    struct ed_type_port_groups *pg = (struct ed_type_port_groups 
> > *)node->data;
> > +    struct ed_type_port_groups *pg = xzalloc(sizeof *pg);
> > +
> >      shash_init(&pg->port_groups);
> >      pg->change_tracked = false;
> >      sset_init(&pg->new);
> >      sset_init(&pg->deleted);
> >      sset_init(&pg->updated);
> > +    return pg;
> >  }
> >
> >  static void
> > -en_port_groups_cleanup(struct engine_node *node)
> > +en_port_groups_cleanup(void *data)
> >  {
> > -    struct ed_type_port_groups *pg = (struct ed_type_port_groups 
> > *)node->data;
> > +    struct ed_type_port_groups *pg = data;
> >      expr_const_sets_destroy(&pg->port_groups);
> >      shash_destroy(&pg->port_groups);
> >      sset_destroy(&pg->new);
> > @@ -873,9 +878,9 @@ en_port_groups_cleanup(struct engine_node *node)
> >  }
> >
> >  static void
> > -en_port_groups_run(struct engine_node *node)
> > +en_port_groups_run(struct engine_node *node, void *data)
> >  {
> > -    struct ed_type_port_groups *pg = (struct ed_type_port_groups 
> > *)node->data;
> > +    struct ed_type_port_groups *pg = data;
> >
> >      sset_clear(&pg->new);
> >      sset_clear(&pg->deleted);
> > @@ -893,9 +898,9 @@ en_port_groups_run(struct engine_node *node)
> >  }
> >
> >  static bool
> > -port_groups_sb_port_group_handler(struct engine_node *node)
> > +port_groups_sb_port_group_handler(struct engine_node *node, void *data)
> >  {
> > -    struct ed_type_port_groups *pg = (struct ed_type_port_groups 
> > *)node->data;
> > +    struct ed_type_port_groups *pg = data;
> >
> >      sset_clear(&pg->new);
> >      sset_clear(&pg->deleted);
> > @@ -936,47 +941,46 @@ struct ed_type_runtime_data {
> >      struct sset active_tunnels;
> >  };
> >
> > -static void
> > -en_runtime_data_init(struct engine_node *node)
> > +static void *
> > +en_runtime_data_init(struct engine_node *node OVS_UNUSED,
> > +                     struct engine_arg *arg OVS_UNUSED)
> >  {
> > -    struct ed_type_runtime_data *data =
> > -        (struct ed_type_runtime_data *)node->data;
> > +    struct ed_type_runtime_data *data = xzalloc(sizeof *data);
> >
> >      hmap_init(&data->local_datapaths);
> >      sset_init(&data->local_lports);
> >      sset_init(&data->local_lport_ids);
> >      sset_init(&data->active_tunnels);
> > +    return data;
> >  }
> >
> >  static void
> > -en_runtime_data_cleanup(struct engine_node *node)
> > +en_runtime_data_cleanup(void *data)
> >  {
> > -    struct ed_type_runtime_data *data =
> > -        (struct ed_type_runtime_data *)node->data;
> > +    struct ed_type_runtime_data *rt_data = data;
> >
> > -    sset_destroy(&data->local_lports);
> > -    sset_destroy(&data->local_lport_ids);
> > -    sset_destroy(&data->active_tunnels);
> > +    sset_destroy(&rt_data->local_lports);
> > +    sset_destroy(&rt_data->local_lport_ids);
> > +    sset_destroy(&rt_data->active_tunnels);
> >      struct local_datapath *cur_node, *next_node;
> >      HMAP_FOR_EACH_SAFE (cur_node, next_node, hmap_node,
> > -                        &data->local_datapaths) {
> > +                        &rt_data->local_datapaths) {
> >          free(cur_node->peer_ports);
> >          free(cur_node->ports);
> > -        hmap_remove(&data->local_datapaths, &cur_node->hmap_node);
> > +        hmap_remove(&rt_data->local_datapaths, &cur_node->hmap_node);
> >          free(cur_node);
> >      }
> > -    hmap_destroy(&data->local_datapaths);
> > +    hmap_destroy(&rt_data->local_datapaths);
> >  }
> >
> >  static void
> > -en_runtime_data_run(struct engine_node *node)
> > +en_runtime_data_run(struct engine_node *node, void *data)
> >  {
> > -    struct ed_type_runtime_data *data =
> > -        (struct ed_type_runtime_data *)node->data;
> > -    struct hmap *local_datapaths = &data->local_datapaths;
> > -    struct sset *local_lports = &data->local_lports;
> > -    struct sset *local_lport_ids = &data->local_lport_ids;
> > -    struct sset *active_tunnels = &data->active_tunnels;
> > +    struct ed_type_runtime_data *rt_data = data;
> > +    struct hmap *local_datapaths = &rt_data->local_datapaths;
> > +    struct sset *local_lports = &rt_data->local_lports;
> > +    struct sset *local_lport_ids = &rt_data->local_lport_ids;
> > +    struct sset *active_tunnels = &rt_data->active_tunnels;
> >
> >      static bool first_run = true;
> >      if (first_run) {
> > @@ -1020,8 +1024,7 @@ en_runtime_data_run(struct engine_node *node)
> >      ovs_assert(chassis);
> >
> >      struct ed_type_ofctrl_is_connected *ed_ofctrl_is_connected =
> > -        (struct ed_type_ofctrl_is_connected *)engine_get_input(
> > -            "ofctrl_is_connected", node)->data;
> > +        engine_get_input_data("ofctrl_is_connected", node);
> >      if (ed_ofctrl_is_connected->connected) {
> >          /* Calculate the active tunnels only if have an an active
> >           * OpenFlow connection to br-int.
> > @@ -1075,12 +1078,11 @@ en_runtime_data_run(struct engine_node *node)
> >  }
> >
> >  static bool
> > -runtime_data_sb_port_binding_handler(struct engine_node *node)
> > +runtime_data_sb_port_binding_handler(struct engine_node *node, void *data)
> >  {
> > -    struct ed_type_runtime_data *data =
> > -        (struct ed_type_runtime_data *)node->data;
> > -    struct sset *local_lports = &data->local_lports;
> > -    struct sset *active_tunnels = &data->active_tunnels;
> > +    struct ed_type_runtime_data *rt_data = data;
> > +    struct sset *local_lports = &rt_data->local_lports;
> > +    struct sset *active_tunnels = &rt_data->active_tunnels;
> >
> >      struct ovsrec_open_vswitch_table *ovs_table =
> >          (struct ovsrec_open_vswitch_table *)EN_OVSDB_GET(
> > @@ -1119,10 +1121,10 @@ struct ed_type_ct_zones {
> >      struct simap current;
> >  };
> >
> > -static void
> > -en_ct_zones_init(struct engine_node *node)
> > +static void *
> > +en_ct_zones_init(struct engine_node *node, struct engine_arg *arg 
> > OVS_UNUSED)
> >  {
> > -    struct ed_type_ct_zones *data = node->data;
> > +    struct ed_type_ct_zones *data = xzalloc(sizeof *data);
> >      struct ovsrec_open_vswitch_table *ovs_table =
> >          (struct ovsrec_open_vswitch_table *)EN_OVSDB_GET(
> >              engine_get_input("OVS_open_vswitch", node));
> > @@ -1136,56 +1138,63 @@ en_ct_zones_init(struct engine_node *node)
> >      memset(data->bitmap, 0, sizeof data->bitmap);
> >      bitmap_set1(data->bitmap, 0); /* Zone 0 is reserved. */
> >      restore_ct_zones(bridge_table, ovs_table, &data->current, 
> > data->bitmap);
> > +    return data;
> >  }
> >
> >  static void
> > -en_ct_zones_cleanup(struct engine_node *node)
> > +en_ct_zones_cleanup(void *data)
> >  {
> > -    struct ed_type_ct_zones *data = node->data;
> > +    struct ed_type_ct_zones *ct_zones_data = data;
> >
> > -    simap_destroy(&data->current);
> > -    shash_destroy(&data->pending);
> > +    simap_destroy(&ct_zones_data->current);
> > +    shash_destroy(&ct_zones_data->pending);
> >  }
> >
> >  static void
> > -en_ct_zones_run(struct engine_node *node)
> > +en_ct_zones_run(struct engine_node *node, void *data)
> >  {
> > -    struct ed_type_ct_zones *data = node->data;
> > +    struct ed_type_ct_zones *ct_zones_data = data;
> >      struct ed_type_runtime_data *rt_data =
> > -        (struct ed_type_runtime_data *)engine_get_input(
> > -            "runtime_data", node)->data;
> > +        engine_get_input_data("runtime_data", node);
> >
> >      update_ct_zones(&rt_data->local_lports, &rt_data->local_datapaths,
> > -                    &data->current, data->bitmap, &data->pending);
> > +                    &ct_zones_data->current, ct_zones_data->bitmap,
> > +                    &ct_zones_data->pending);
> >
> >      engine_set_node_state(node, EN_UPDATED);
> >  }
> >
> > +/* The data in the ct_zones node is always valid (i.e., no stale 
> > pointers). */
> > +static bool
> > +en_ct_zones_is_valid(struct engine_node *node OVS_UNUSED)
> > +{
> > +    return true;
> > +}
> > +
> >  struct ed_type_mff_ovn_geneve {
> >      enum mf_field_id mff_ovn_geneve;
> >  };
> >
> > -static void
> > -en_mff_ovn_geneve_init(struct engine_node *node)
> > +static void *
> > +en_mff_ovn_geneve_init(struct engine_node *node OVS_UNUSED,
> > +                       struct engine_arg *arg OVS_UNUSED)
> >  {
> > -    struct ed_type_mff_ovn_geneve *data =
> > -        (struct ed_type_mff_ovn_geneve *)node->data;
> > -    data->mff_ovn_geneve = 0;
> > +    struct ed_type_mff_ovn_geneve *data = xzalloc(sizeof *data);
> > +    return data;
> >  }
> >
> >  static void
> > -en_mff_ovn_geneve_cleanup(struct engine_node *node OVS_UNUSED)
> > +en_mff_ovn_geneve_cleanup(void *data OVS_UNUSED)
> >  {
> >  }
> >
> >  static void
> > -en_mff_ovn_geneve_run(struct engine_node *node)
> > +en_mff_ovn_geneve_run(struct engine_node *node, void *data)
> >  {
> > -    struct ed_type_mff_ovn_geneve *data =
> > -        (struct ed_type_mff_ovn_geneve *)node->data;
> > +    struct ed_type_mff_ovn_geneve *ed_mff_ovn_geneve = data;
> >      enum mf_field_id mff_ovn_geneve = ofctrl_get_mf_field_id();
> > -    if (data->mff_ovn_geneve != mff_ovn_geneve) {
> > -        data->mff_ovn_geneve = mff_ovn_geneve;
> > +    if (ed_mff_ovn_geneve->mff_ovn_geneve != mff_ovn_geneve) {
> > +        ed_mff_ovn_geneve->mff_ovn_geneve = mff_ovn_geneve;
> >          engine_set_node_state(node, EN_UPDATED);
> >          return;
> >      }
> > @@ -1205,48 +1214,46 @@ struct ed_type_flow_output {
> >      struct lflow_resource_ref lflow_resource_ref;
> >  };
> >
> > -static void
> > -en_flow_output_init(struct engine_node *node)
> > +static void *
> > +en_flow_output_init(struct engine_node *node OVS_UNUSED,
> > +                    struct engine_arg *arg OVS_UNUSED)
> >  {
> > -    struct ed_type_flow_output *data =
> > -        (struct ed_type_flow_output *)node->data;
> > +    struct ed_type_flow_output *data = xzalloc(sizeof *data);
> > +
> >      ovn_desired_flow_table_init(&data->flow_table);
> >      ovn_extend_table_init(&data->group_table);
> >      ovn_extend_table_init(&data->meter_table);
> >      data->conj_id_ofs = 1;
> >      lflow_resource_init(&data->lflow_resource_ref);
> > +    return data;
> >  }
> >
> >  static void
> > -en_flow_output_cleanup(struct engine_node *node)
> > +en_flow_output_cleanup(void *data)
> >  {
> > -    struct ed_type_flow_output *data =
> > -        (struct ed_type_flow_output *)node->data;
> > -    ovn_desired_flow_table_destroy(&data->flow_table);
> > -    ovn_extend_table_destroy(&data->group_table);
> > -    ovn_extend_table_destroy(&data->meter_table);
> > -    lflow_resource_destroy(&data->lflow_resource_ref);
> > +    struct ed_type_flow_output *flow_output_data = data;
> > +    ovn_desired_flow_table_destroy(&flow_output_data->flow_table);
> > +    ovn_extend_table_destroy(&flow_output_data->group_table);
> > +    ovn_extend_table_destroy(&flow_output_data->meter_table);
> > +    lflow_resource_destroy(&flow_output_data->lflow_resource_ref);
> >  }
> >
> >  static void
> > -en_flow_output_run(struct engine_node *node)
> > +en_flow_output_run(struct engine_node *node, void *data)
> >  {
> >      struct ed_type_runtime_data *rt_data =
> > -        (struct ed_type_runtime_data *)engine_get_input(
> > -            "runtime_data", node)->data;
> > +        engine_get_input_data("runtime_data", node);
> >      struct hmap *local_datapaths = &rt_data->local_datapaths;
> >      struct sset *local_lports = &rt_data->local_lports;
> >      struct sset *local_lport_ids = &rt_data->local_lport_ids;
> >      struct sset *active_tunnels = &rt_data->active_tunnels;
> >
> >      struct ed_type_ct_zones *ct_zones_data =
> > -        (struct ed_type_ct_zones *)engine_get_input(
> > -            "ct_zones", node)->data;
> > +        engine_get_input_data("ct_zones", node);
> >      struct simap *ct_zones = &ct_zones_data->current;
> >
> >      struct ed_type_mff_ovn_geneve *ed_mff_ovn_geneve =
> > -        (struct ed_type_mff_ovn_geneve *)engine_get_input(
> > -            "mff_ovn_geneve", node)->data;
> > +        engine_get_input_data("mff_ovn_geneve", node);
> >      enum mf_field_id mff_ovn_geneve = ed_mff_ovn_geneve->mff_ovn_geneve;
> >
> >      struct ovsrec_open_vswitch_table *ovs_table =
> > @@ -1263,12 +1270,11 @@ en_flow_output_run(struct engine_node *node)
> >                  engine_get_input("SB_chassis", node),
> >                  "name");
> >      struct ed_type_addr_sets *as_data =
> > -        (struct ed_type_addr_sets *)engine_get_input("addr_sets", 
> > node)->data;
> > +        engine_get_input_data("addr_sets", node);
> >      struct shash *addr_sets = &as_data->addr_sets;
> >
> >      struct ed_type_port_groups *pg_data =
> > -        (struct ed_type_port_groups *)engine_get_input(
> > -            "port_groups", node)->data;
> > +        engine_get_input_data("port_groups", node);
> >      struct shash *port_groups = &pg_data->port_groups;
> >
> >      const struct sbrec_chassis *chassis = NULL;
> > @@ -1278,8 +1284,7 @@ en_flow_output_run(struct engine_node *node)
> >
> >      ovs_assert(br_int && chassis);
> >
> > -    struct ed_type_flow_output *fo =
> > -        (struct ed_type_flow_output *)node->data;
> > +    struct ed_type_flow_output *fo = data;
> >      struct ovn_desired_flow_table *flow_table = &fo->flow_table;
> >      struct ovn_extend_table *group_table = &fo->group_table;
> >      struct ovn_extend_table *meter_table = &fo->meter_table;
> > @@ -1359,21 +1364,19 @@ en_flow_output_run(struct engine_node *node)
> >  }
> >
> >  static bool
> > -flow_output_sb_logical_flow_handler(struct engine_node *node)
> > -{
> > -    struct ed_type_runtime_data *data =
> > -        (struct ed_type_runtime_data *)engine_get_input(
> > -                "runtime_data", node)->data;
> > -    struct hmap *local_datapaths = &data->local_datapaths;
> > -    struct sset *local_lport_ids = &data->local_lport_ids;
> > -    struct sset *active_tunnels = &data->active_tunnels;
> > +flow_output_sb_logical_flow_handler(struct engine_node *node, void *data)
> > +{
> > +    struct ed_type_runtime_data *rt_data =
> > +        engine_get_input_data("runtime_data", node);
> > +    struct hmap *local_datapaths = &rt_data->local_datapaths;
> > +    struct sset *local_lport_ids = &rt_data->local_lport_ids;
> > +    struct sset *active_tunnels = &rt_data->active_tunnels;
> >      struct ed_type_addr_sets *as_data =
> > -        (struct ed_type_addr_sets *)engine_get_input("addr_sets", 
> > node)->data;
> > +        engine_get_input_data("addr_sets", node);
> >      struct shash *addr_sets = &as_data->addr_sets;
> >
> >      struct ed_type_port_groups *pg_data =
> > -        (struct ed_type_port_groups *)engine_get_input(
> > -            "port_groups", node)->data;
> > +        engine_get_input_data("port_groups", node);
> >      struct shash *port_groups = &pg_data->port_groups;
> >
> >      struct ovsrec_open_vswitch_table *ovs_table =
> > @@ -1397,8 +1400,7 @@ flow_output_sb_logical_flow_handler(struct 
> > engine_node *node)
> >
> >      ovs_assert(br_int && chassis);
> >
> > -    struct ed_type_flow_output *fo =
> > -        (struct ed_type_flow_output *)node->data;
> > +    struct ed_type_flow_output *fo = data;
> >      struct ovn_desired_flow_table *flow_table = &fo->flow_table;
> >      struct ovn_extend_table *group_table = &fo->group_table;
> >      struct ovn_extend_table *meter_table = &fo->meter_table;
> > @@ -1442,7 +1444,7 @@ flow_output_sb_logical_flow_handler(struct 
> > engine_node *node)
> >  }
> >
> >  static bool
> > -flow_output_sb_mac_binding_handler(struct engine_node *node)
> > +flow_output_sb_mac_binding_handler(struct engine_node *node, void *data)
> >  {
> >      struct ovsdb_idl_index *sbrec_port_binding_by_name =
> >          engine_ovsdb_node_get_index(
> > @@ -1453,8 +1455,7 @@ flow_output_sb_mac_binding_handler(struct engine_node 
> > *node)
> >          (struct sbrec_mac_binding_table *)EN_OVSDB_GET(
> >              engine_get_input("SB_mac_binding", node));
> >
> > -    struct ed_type_flow_output *fo =
> > -        (struct ed_type_flow_output *)node->data;
> > +    struct ed_type_flow_output *fo = data;
> >      struct ovn_desired_flow_table *flow_table = &fo->flow_table;
> >
> >      lflow_handle_changed_neighbors(sbrec_port_binding_by_name,
> > @@ -1465,22 +1466,19 @@ flow_output_sb_mac_binding_handler(struct 
> > engine_node *node)
> >  }
> >
> >  static bool
> > -flow_output_sb_port_binding_handler(struct engine_node *node)
> > +flow_output_sb_port_binding_handler(struct engine_node *node, void *data)
> >  {
> > -    struct ed_type_runtime_data *data =
> > -        (struct ed_type_runtime_data *)engine_get_input(
> > -                "runtime_data", node)->data;
> > -    struct hmap *local_datapaths = &data->local_datapaths;
> > -    struct sset *active_tunnels = &data->active_tunnels;
> > +    struct ed_type_runtime_data *rt_data =
> > +        engine_get_input_data("runtime_data", node);
> > +    struct hmap *local_datapaths = &rt_data->local_datapaths;
> > +    struct sset *active_tunnels = &rt_data->active_tunnels;
> >
> >      struct ed_type_ct_zones *ct_zones_data =
> > -        (struct ed_type_ct_zones *)engine_get_input(
> > -            "ct_zones", node)->data;
> > +        engine_get_input_data("ct_zones", node);
> >      struct simap *ct_zones = &ct_zones_data->current;
> >
> >      struct ed_type_mff_ovn_geneve *ed_mff_ovn_geneve =
> > -        (struct ed_type_mff_ovn_geneve *)engine_get_input(
> > -            "mff_ovn_geneve", node)->data;
> > +        engine_get_input_data("mff_ovn_geneve", node);
> >      enum mf_field_id mff_ovn_geneve = ed_mff_ovn_geneve->mff_ovn_geneve;
> >
> >      struct ovsrec_open_vswitch_table *ovs_table =
> > @@ -1502,8 +1500,7 @@ flow_output_sb_port_binding_handler(struct 
> > engine_node *node)
> >      }
> >      ovs_assert(br_int && chassis);
> >
> > -    struct ed_type_flow_output *fo =
> > -        (struct ed_type_flow_output *)node->data;
> > +    struct ed_type_flow_output *fo = data;
> >      struct ovn_desired_flow_table *flow_table = &fo->flow_table;
> >
> >      struct ovsdb_idl_index *sbrec_port_binding_by_name =
> > @@ -1573,21 +1570,18 @@ flow_output_sb_port_binding_handler(struct 
> > engine_node *node)
> >  }
> >
> >  static bool
> > -flow_output_sb_multicast_group_handler(struct engine_node *node)
> > +flow_output_sb_multicast_group_handler(struct engine_node *node, void 
> > *data)
> >  {
> > -    struct ed_type_runtime_data *data =
> > -        (struct ed_type_runtime_data *)engine_get_input(
> > -                "runtime_data", node)->data;
> > -    struct hmap *local_datapaths = &data->local_datapaths;
> > +    struct ed_type_runtime_data *rt_data =
> > +        engine_get_input_data("runtime_data", node);
> > +    struct hmap *local_datapaths = &rt_data->local_datapaths;
> >
> >      struct ed_type_ct_zones *ct_zones_data =
> > -        (struct ed_type_ct_zones *)engine_get_input(
> > -            "ct_zones", node)->data;
> > +        engine_get_input_data("ct_zones", node);
> >      struct simap *ct_zones = &ct_zones_data->current;
> >
> >      struct ed_type_mff_ovn_geneve *ed_mff_ovn_geneve =
> > -        (struct ed_type_mff_ovn_geneve *)engine_get_input(
> > -            "mff_ovn_geneve", node)->data;
> > +        engine_get_input_data("mff_ovn_geneve", node);
> >      enum mf_field_id mff_ovn_geneve = ed_mff_ovn_geneve->mff_ovn_geneve;
> >
> >      struct ovsrec_open_vswitch_table *ovs_table =
> > @@ -1609,8 +1603,7 @@ flow_output_sb_multicast_group_handler(struct 
> > engine_node *node)
> >      }
> >      ovs_assert(br_int && chassis);
> >
> > -    struct ed_type_flow_output *fo =
> > -        (struct ed_type_flow_output *)node->data;
> > +    struct ed_type_flow_output *fo = data;
> >      struct ovn_desired_flow_table *flow_table = &fo->flow_table;
> >
> >      struct sbrec_multicast_group_table *multicast_group_table =
> > @@ -1627,23 +1620,21 @@ flow_output_sb_multicast_group_handler(struct 
> > engine_node *node)
> >  }
> >
> >  static bool
> > -_flow_output_resource_ref_handler(struct engine_node *node,
> > -                                 enum ref_type ref_type)
> > +_flow_output_resource_ref_handler(struct engine_node *node, void *data,
> > +                                  enum ref_type ref_type)
> >  {
> > -    struct ed_type_runtime_data *data =
> > -        (struct ed_type_runtime_data *)engine_get_input(
> > -                "runtime_data", node)->data;
> > -    struct hmap *local_datapaths = &data->local_datapaths;
> > -    struct sset *local_lport_ids = &data->local_lport_ids;
> > -    struct sset *active_tunnels = &data->active_tunnels;
> > +    struct ed_type_runtime_data *rt_data =
> > +        engine_get_input_data("runtime_data", node);
> > +    struct hmap *local_datapaths = &rt_data->local_datapaths;
> > +    struct sset *local_lport_ids = &rt_data->local_lport_ids;
> > +    struct sset *active_tunnels = &rt_data->active_tunnels;
> >
> >      struct ed_type_addr_sets *as_data =
> > -        (struct ed_type_addr_sets *)engine_get_input("addr_sets", 
> > node)->data;
> > +        engine_get_input_data("addr_sets", node);
> >      struct shash *addr_sets = &as_data->addr_sets;
> >
> >      struct ed_type_port_groups *pg_data =
> > -        (struct ed_type_port_groups *)engine_get_input(
> > -            "port_groups", node)->data;
> > +        engine_get_input_data("port_groups", node);
> >      struct shash *port_groups = &pg_data->port_groups;
> >
> >      struct ovsrec_open_vswitch_table *ovs_table =
> > @@ -1666,8 +1657,7 @@ _flow_output_resource_ref_handler(struct engine_node 
> > *node,
> >
> >      ovs_assert(br_int && chassis);
> >
> > -    struct ed_type_flow_output *fo =
> > -        (struct ed_type_flow_output *)node->data;
> > +    struct ed_type_flow_output *fo = data;
> >      struct ovn_desired_flow_table *flow_table = &fo->flow_table;
> >      struct ovn_extend_table *group_table = &fo->group_table;
> >      struct ovn_extend_table *meter_table = &fo->meter_table;
> > @@ -1774,15 +1764,15 @@ _flow_output_resource_ref_handler(struct 
> > engine_node *node,
> >  }
> >
> >  static bool
> > -flow_output_addr_sets_handler(struct engine_node *node)
> > +flow_output_addr_sets_handler(struct engine_node *node, void *data)
> >  {
> > -    return _flow_output_resource_ref_handler(node, REF_TYPE_ADDRSET);
> > +    return _flow_output_resource_ref_handler(node, data, REF_TYPE_ADDRSET);
> >  }
> >
> >  static bool
> > -flow_output_port_groups_handler(struct engine_node *node)
> > +flow_output_port_groups_handler(struct engine_node *node, void *data)
> >  {
> > -    return _flow_output_resource_ref_handler(node, REF_TYPE_PORTGROUP);
> > +    return _flow_output_resource_ref_handler(node, data, 
> > REF_TYPE_PORTGROUP);
> >  }
> >
> >  struct ovn_controller_exit_args {
> > @@ -1892,15 +1882,7 @@ main(int argc, char *argv[])
> >      stopwatch_create(CONTROLLER_LOOP_STOPWATCH_NAME, SW_MS);
> >
> >      /* Define inc-proc-engine nodes. */
> > -    struct ed_type_ct_zones ed_ct_zones;
> > -    struct ed_type_runtime_data ed_runtime_data;
> > -    struct ed_type_mff_ovn_geneve ed_mff_ovn_geneve;
> > -    struct ed_type_ofctrl_is_connected ed_ofctrl_is_connected;
> > -    struct ed_type_flow_output ed_flow_output;
> > -    struct ed_type_addr_sets ed_addr_sets;
> > -    struct ed_type_port_groups ed_port_groups;
> > -
> > -    ENGINE_NODE(ct_zones, "ct_zones");
> > +    ENGINE_NODE_CUSTOM_DATA(ct_zones, "ct_zones");
> >      ENGINE_NODE(runtime_data, "runtime_data");
> >      ENGINE_NODE(mff_ovn_geneve, "mff_ovn_geneve");
> >      ENGINE_NODE(ofctrl_is_connected, "ofctrl_is_connected");
> > @@ -1916,18 +1898,6 @@ main(int argc, char *argv[])
> >      OVS_NODES
> >  #undef OVS_NODE
> >
> > -    engine_ovsdb_node_add_index(&en_sb_chassis, "name", 
> > sbrec_chassis_by_name);
> > -    engine_ovsdb_node_add_index(&en_sb_multicast_group, "name_datapath",
> > -                                sbrec_multicast_group_by_name_datapath);
> > -    engine_ovsdb_node_add_index(&en_sb_port_binding, "name",
> > -                                sbrec_port_binding_by_name);
> > -    engine_ovsdb_node_add_index(&en_sb_port_binding, "key",
> > -                                sbrec_port_binding_by_key);
> > -    engine_ovsdb_node_add_index(&en_sb_port_binding, "datapath",
> > -                                sbrec_port_binding_by_datapath);
> > -    engine_ovsdb_node_add_index(&en_sb_datapath_binding, "key",
> > -                                sbrec_datapath_binding_by_key);
> > -
> >      /* Add dependencies between inc-proc-engine nodes. */
> >
> >      engine_add_input(&en_addr_sets, &en_sb_address_set,
> > @@ -1976,20 +1946,45 @@ main(int argc, char *argv[])
> >      engine_add_input(&en_runtime_data, &en_sb_port_binding,
> >                       runtime_data_sb_port_binding_handler);
> >
> > -    engine_init(&en_flow_output);
> > +    struct engine_arg engine_arg = {
> > +        .sb_idl = ovnsb_idl_loop.idl,
> > +        .ovs_idl = ovs_idl_loop.idl,
> > +    };
> > +    engine_init(&en_flow_output, &engine_arg);
> >
> > -    ofctrl_init(&ed_flow_output.group_table,
> > -                &ed_flow_output.meter_table,
> > +    engine_ovsdb_node_add_index(&en_sb_chassis, "name", 
> > sbrec_chassis_by_name);
> > +    engine_ovsdb_node_add_index(&en_sb_multicast_group, "name_datapath",
> > +                                sbrec_multicast_group_by_name_datapath);
> > +    engine_ovsdb_node_add_index(&en_sb_port_binding, "name",
> > +                                sbrec_port_binding_by_name);
> > +    engine_ovsdb_node_add_index(&en_sb_port_binding, "key",
> > +                                sbrec_port_binding_by_key);
> > +    engine_ovsdb_node_add_index(&en_sb_port_binding, "datapath",
> > +                                sbrec_port_binding_by_datapath);
> > +    engine_ovsdb_node_add_index(&en_sb_datapath_binding, "key",
> > +                                sbrec_datapath_binding_by_key);
> > +
> > +    struct ed_type_flow_output *flow_output_data =
> > +        engine_get_internal_data(&en_flow_output);
> > +    struct ed_type_ct_zones *ct_zones_data =
> > +        engine_get_internal_data(&en_ct_zones);
> > +    struct ed_type_runtime_data *runtime_data = NULL;
> > +
> > +    ofctrl_init(&flow_output_data->group_table,
> > +                &flow_output_data->meter_table,
> >                  get_ofctrl_probe_interval(ovs_idl_loop.idl));
> >
> >      unixctl_command_register("group-table-list", "", 0, 0,
> > -                             group_table_list, 
> > &ed_flow_output.group_table);
> > +                             group_table_list,
> > +                             &flow_output_data->group_table);
> >
> >      unixctl_command_register("meter-table-list", "", 0, 0,
> > -                             meter_table_list, 
> > &ed_flow_output.meter_table);
> > +                             meter_table_list,
> > +                             &flow_output_data->meter_table);
> >
> >      unixctl_command_register("ct-zone-list", "", 0, 0,
> > -                             ct_zone_list, &ed_ct_zones.current);
> > +                             ct_zone_list,
> > +                             &ct_zones_data->current);
> >
> >      struct pending_pkt pending_pkt = { .conn = NULL };
> >      unixctl_command_register("inject-pkt", "MICROFLOW", 1, 1, inject_pkt,
> > @@ -2065,7 +2060,10 @@ main(int argc, char *argv[])
> >              }
> >
> >              if (br_int) {
> > -                ofctrl_run(br_int, &ed_ct_zones.pending);
> > +                ct_zones_data = engine_get_data(&en_ct_zones);
> > +                if (ct_zones_data) {
> > +                    ofctrl_run(br_int, &ct_zones_data->pending);
> > +                }
> >
> >                  if (chassis) {
> >                      patch_run(ovs_idl_txn,
> > @@ -2105,41 +2103,50 @@ main(int argc, char *argv[])
> >                      }
> >                      stopwatch_stop(CONTROLLER_LOOP_STOPWATCH_NAME,
> >                                     time_msec());
> > +                    ct_zones_data = engine_get_data(&en_ct_zones);
> >                      if (ovs_idl_txn) {
> > -                        commit_ct_zones(br_int, &ed_ct_zones.pending);
> > +                        if (ct_zones_data) {
> > +                            commit_ct_zones(br_int, 
> > &ct_zones_data->pending);
> > +                        }
> >                          
> > bfd_run(ovsrec_interface_table_get(ovs_idl_loop.idl),
> >                                  br_int, chassis,
> >                                  sbrec_ha_chassis_group_table_get(
> >                                      ovnsb_idl_loop.idl),
> >                                  
> > sbrec_sb_global_table_get(ovnsb_idl_loop.idl));
> >                      }
> > -                    ofctrl_put(&ed_flow_output.flow_table,
> > -                               &ed_ct_zones.pending,
> > -                               sbrec_meter_table_get(ovnsb_idl_loop.idl),
> > -                               get_nb_cfg(sbrec_sb_global_table_get(
> > -                                              ovnsb_idl_loop.idl)),
> > -                               engine_node_changed(&en_flow_output));
> > -                    pinctrl_run(ovnsb_idl_txn,
> > -                                sbrec_datapath_binding_by_key,
> > -                                sbrec_port_binding_by_datapath,
> > -                                sbrec_port_binding_by_key,
> > -                                sbrec_port_binding_by_name,
> > -                                sbrec_mac_binding_by_lport_ip,
> > -                                sbrec_igmp_group,
> > -                                sbrec_ip_multicast,
> > -                                sbrec_dns_table_get(ovnsb_idl_loop.idl),
> > -                                sbrec_controller_event_table_get(
> > -                                    ovnsb_idl_loop.idl),
> > -                                sbrec_service_monitor_table_get(
> > -                                    ovnsb_idl_loop.idl),
> > -                                br_int, chassis,
> > -                                &ed_runtime_data.local_datapaths,
> > -                                &ed_runtime_data.active_tunnels);
> >
> > -                    if (engine_node_changed(&en_runtime_data)) {
> > -                        update_sb_monitors(ovnsb_idl_loop.idl, chassis,
> > -                                           &ed_runtime_data.local_lports,
> > -                                           
> > &ed_runtime_data.local_datapaths);
> > +                    flow_output_data = engine_get_data(&en_flow_output);
> > +                    if (flow_output_data && ct_zones_data) {
> > +                        ofctrl_put(&flow_output_data->flow_table,
> > +                                   &ct_zones_data->pending,
> > +                                   
> > sbrec_meter_table_get(ovnsb_idl_loop.idl),
> > +                                   get_nb_cfg(sbrec_sb_global_table_get(
> > +                                                   ovnsb_idl_loop.idl)),
> > +                                   engine_node_changed(&en_flow_output));
> > +                    }
> > +                    runtime_data = engine_get_data(&en_runtime_data);
> > +                    if (runtime_data) {
> > +                        pinctrl_run(ovnsb_idl_txn,
> > +                                    sbrec_datapath_binding_by_key,
> > +                                    sbrec_port_binding_by_datapath,
> > +                                    sbrec_port_binding_by_key,
> > +                                    sbrec_port_binding_by_name,
> > +                                    sbrec_mac_binding_by_lport_ip,
> > +                                    sbrec_igmp_group,
> > +                                    sbrec_ip_multicast,
> > +                                    
> > sbrec_dns_table_get(ovnsb_idl_loop.idl),
> > +                                    sbrec_controller_event_table_get(
> > +                                        ovnsb_idl_loop.idl),
> > +                                    sbrec_service_monitor_table_get(
> > +                                        ovnsb_idl_loop.idl),
> > +                                    br_int, chassis,
> > +                                    &runtime_data->local_datapaths,
> > +                                    &runtime_data->active_tunnels);
> > +                        if (engine_node_changed(&en_runtime_data)) {
> > +                            update_sb_monitors(ovnsb_idl_loop.idl, chassis,
> > +                                               &runtime_data->local_lports,
> > +                                               
> > &runtime_data->local_datapaths);
> > +                        }
> >                      }
> >                  }
> >
> > @@ -2174,9 +2181,13 @@ main(int argc, char *argv[])
> >
> >
> >              if (pending_pkt.conn) {
> > -                if (br_int && chassis) {
> > +                struct ed_type_addr_sets *as_data =
> > +                    engine_get_data(&en_addr_sets);
> > +                struct ed_type_port_groups *pg_data =
> > +                    engine_get_data(&en_port_groups);
> > +                if (br_int && chassis && as_data && pg_data) {
> >                      char *error = ofctrl_inject_pkt(br_int, 
> > pending_pkt.flow_s,
> > -                        &ed_addr_sets.addr_sets, 
> > &ed_port_groups.port_groups);
> > +                        &as_data->addr_sets, &pg_data->port_groups);
> >                      if (error) {
> >                          unixctl_command_reply_error(pending_pkt.conn, 
> > error);
> >                          free(error);
> > @@ -2214,12 +2225,16 @@ main(int argc, char *argv[])
> >          }
> >
> >          if (ovsdb_idl_loop_commit_and_wait(&ovs_idl_loop) == 1) {
> > -            struct shash_node *iter, *iter_next;
> > -            SHASH_FOR_EACH_SAFE (iter, iter_next, &ed_ct_zones.pending) {
> > -                struct ct_zone_pending_entry *ctzpe = iter->data;
> > -                if (ctzpe->state == CT_ZONE_DB_SENT) {
> > -                    shash_delete(&ed_ct_zones.pending, iter);
> > -                    free(ctzpe);
> > +            ct_zones_data = engine_get_data(&en_ct_zones);
> > +            if (ct_zones_data) {
> > +                struct shash_node *iter, *iter_next;
> > +                SHASH_FOR_EACH_SAFE (iter, iter_next,
> > +                                     &ct_zones_data->pending) {
> > +                    struct ct_zone_pending_entry *ctzpe = iter->data;
> > +                    if (ctzpe->state == CT_ZONE_DB_SENT) {
> > +                        shash_delete(&ct_zones_data->pending, iter);
> > +                        free(ctzpe);
> > +                    }
> >                  }
> >              }
> >          }
> > diff --git a/lib/inc-proc-eng.c b/lib/inc-proc-eng.c
> > index 59b5cac..9b1479a 100644
> > --- a/lib/inc-proc-eng.c
> > +++ b/lib/inc-proc-eng.c
> > @@ -103,13 +103,16 @@ engine_get_nodes(struct engine_node *node, size_t 
> > *n_count)
> >  }
> >
> >  void
> > -engine_init(struct engine_node *node)
> > +engine_init(struct engine_node *node, struct engine_arg *arg)
> >  {
> >      engine_nodes = engine_get_nodes(node, &engine_n_nodes);
> >
> >      for (size_t i = 0; i < engine_n_nodes; i++) {
> >          if (engine_nodes[i]->init) {
> > -            engine_nodes[i]->init(engine_nodes[i]);
> > +            engine_nodes[i]->data =
> > +                engine_nodes[i]->init(engine_nodes[i], arg);
> > +        } else {
> > +            engine_nodes[i]->data = NULL;
> >          }
> >      }
> >  }
> > @@ -119,8 +122,9 @@ engine_cleanup(void)
> >  {
> >      for (size_t i = 0; i < engine_n_nodes; i++) {
> >          if (engine_nodes[i]->cleanup) {
> > -            engine_nodes[i]->cleanup(engine_nodes[i]);
> > +            engine_nodes[i]->cleanup(engine_nodes[i]->data);
> >          }
> > +        free(engine_nodes[i]->data);
> >      }
> >      free(engine_nodes);
> >      engine_nodes = NULL;
> > @@ -140,9 +144,16 @@ engine_get_input(const char *input_name, struct 
> > engine_node *node)
> >      return NULL;
> >  }
> >
> > +void *
> > +engine_get_input_data(const char *input_name, struct engine_node *node)
> > +{
> > +    struct engine_node *input_node = engine_get_input(input_name, node);
> > +    return engine_get_data(input_node);
> > +}
> > +
> >  void
> >  engine_add_input(struct engine_node *node, struct engine_node *input,
> > -                 bool (*change_handler)(struct engine_node *))
> > +                 bool (*change_handler)(struct engine_node *, void *))
> >  {
> >      ovs_assert(node->n_inputs < ENGINE_MAX_INPUT);
> >      node->inputs[node->n_inputs].node = input;
> > @@ -153,7 +164,7 @@ engine_add_input(struct engine_node *node, struct 
> > engine_node *input,
> >  struct ovsdb_idl_index *
> >  engine_ovsdb_node_get_index(struct engine_node *node, const char *name)
> >  {
> > -    struct ed_type_ovsdb_table *ed = (struct ed_type_ovsdb_table 
> > *)node->data;
> > +    struct ed_type_ovsdb_table *ed = node->data;
> >      for (size_t i = 0; i < ed->n_indexes; i++) {
> >          if (!strcmp(ed->indexes[i].name, name)) {
> >              return ed->indexes[i].index;
> > @@ -167,7 +178,7 @@ void
> >  engine_ovsdb_node_add_index(struct engine_node *node, const char *name,
> >                              struct ovsdb_idl_index *index)
> >  {
> > -    struct ed_type_ovsdb_table *ed = (struct ed_type_ovsdb_table 
> > *)node->data;
> > +    struct ed_type_ovsdb_table *ed = node->data;
> >      ovs_assert(ed->n_indexes < ENGINE_MAX_OVSDB_INDEX);
> >
> >      ed->indexes[ed->n_indexes].name = name;
> > @@ -192,6 +203,19 @@ engine_set_node_state_at(struct engine_node *node,
> >      node->state = state;
> >  }
> >
> > +static bool
> > +engine_node_valid(struct engine_node *node)
> > +{
> > +    if (node->state == EN_UPDATED || node->state == EN_VALID) {
> > +        return true;
> > +    }
> > +
> > +    if (node->is_valid) {
> > +        return node->is_valid(node);
> > +    }
> > +    return false;
> > +}
> > +
> >  bool
> >  engine_node_changed(struct engine_node *node)
> >  {
> > @@ -215,6 +239,21 @@ engine_aborted(void)
> >      return engine_run_aborted;
> >  }
> >
> > +void *
> > +engine_get_data(struct engine_node *node)
> > +{
> > +    if (engine_node_valid(node)) {
> > +        return node->data;
> > +    }
> > +    return NULL;
> > +}
> > +
> > +void *
> > +engine_get_internal_data(struct engine_node *node)
> > +{
> > +    return node->data;
> > +}
> > +
> >  void
> >  engine_init_run(void)
> >  {
> > @@ -240,7 +279,7 @@ engine_recompute(struct engine_node *node, bool forced, 
> > bool allowed)
> >      }
> >
> >      /* Run the node handler which might change state. */
> > -    node->run(node);
> > +    node->run(node, node->data);
> >  }
> >
> >  /* Return true if the node could be computed, false otherwise. */
> > @@ -256,7 +295,7 @@ engine_compute(struct engine_node *node, bool 
> > recompute_allowed)
> >              /* If the input change can't be handled incrementally, run
> >               * the node handler.
> >               */
> > -            if (!node->inputs[i].change_handler(node)) {
> > +            if (!node->inputs[i].change_handler(node, node->data)) {
> >                  VLOG_DBG("node: %s, can't handle change for input %s, "
> >                           "fall back to recompute",
> >                           node->name, node->inputs[i].node->name);
> > @@ -273,7 +312,7 @@ engine_run_node(struct engine_node *node, bool 
> > recompute_allowed)
> >  {
> >      if (!node->n_inputs) {
> >          /* Run the node handler which might change state. */
> > -        node->run(node);
> > +        node->run(node, node->data);
> >          return;
> >      }
> >
> > @@ -345,7 +384,7 @@ engine_need_run(void)
> >              continue;
> >          }
> >
> > -        engine_nodes[i]->run(engine_nodes[i]);
> > +        engine_nodes[i]->run(engine_nodes[i], engine_nodes[i]->data);
> >          VLOG_DBG("input node: %s, state: %s", engine_nodes[i]->name,
> >                   engine_node_state_name[engine_nodes[i]->state]);
> >          if (engine_nodes[i]->state == EN_UPDATED) {
> > diff --git a/lib/inc-proc-eng.h b/lib/inc-proc-eng.h
> > index bb8df07..5b92971 100644
> > --- a/lib/inc-proc-eng.h
> > +++ b/lib/inc-proc-eng.h
> > @@ -68,6 +68,12 @@ struct engine_context {
> >      struct ovsdb_idl_txn *ovnsb_idl_txn;
> >  };
> >
> > +/* Arguments to be passed to the engine at engine_init(). */
> > +struct engine_arg {
> > +    struct ovsdb_idl *sb_idl;
> > +    struct ovsdb_idl *ovs_idl;
> > +};
> > +
> >  struct engine_node;
> >
> >  struct engine_node_input {
> > @@ -79,7 +85,7 @@ struct engine_node_input {
> >       *  - true: if change can be handled
> >       *  - false: if change cannot be handled (indicating full recompute 
> > needed)
> >       */
> > -    bool (*change_handler)(struct engine_node *node);
> > +    bool (*change_handler)(struct engine_node *node, void *data);
> >  };
> >
> >  enum engine_node_state {
> > @@ -106,30 +112,42 @@ struct engine_node {
> >      /* Inputs of this node. */
> >      struct engine_node_input inputs[ENGINE_MAX_INPUT];
> >
> > -    /* Data of this node. It is vague and interpreted by the related 
> > functions.
> > -     * The content of the data should be changed only by the 
> > change_handlers
> > -     * and run() function of the current node. Users should ensure that the
> > -     * data is read-only in change-handlers of the nodes that depends on 
> > this
> > -     * node. */
> > +    /* A pointer to node internal data. The data is safely accessible to
> > +     * users through the engine_get_data() API. For special cases, when the
> > +     * data is known to be valid (e.g., at init time), users can also call
> > +     * engine_get_internal_data().
> > +     */
> >      void *data;
> >
> >      /* State of the node after the last engine run. */
> >      enum engine_node_state state;
> >
> > -    /* Method to initialize data. It may be NULL. */
> > -    void (*init)(struct engine_node *);
> > +    /* Method to allocate and initialize node data. It may be NULL.
> > +     * The user supplied argument 'arg' is passed from the call to
> > +     * engine_init().
> > +     */
> > +    void *(*init)(struct engine_node *node, struct engine_arg *arg);
> >
> >      /* Method to clean up data. It may be NULL. */
> > -    void (*cleanup)(struct engine_node *);
> > +    void (*cleanup)(void *data);
> >
> >      /* Fully processes all inputs of this node and regenerates the data
> > -     * of this node */
> > -    void (*run)(struct engine_node *);
> > +     * of this node. The pointer to the node's data is passed as argument.
> > +     */
> > +    void (*run)(struct engine_node *node, void *data);
> > +
> > +    /* Method to validate if the 'internal_data' is valid. This allows 
> > users
> > +     * to customize when 'data' can be used (e.g., even if the node
> > +     * hasn't been refreshed in the last iteration, if 'data'
> > +     * doesn't store pointers to DB records it's still safe to use).
> > +     */
> > +    bool (*is_valid)(struct engine_node *);
> >  };
> >
> >  /* Initialize the data for the engine nodes. It calls each node's
> > - * init() method if not NULL. It should be called before the main loop. */
> > -void engine_init(struct engine_node *node);
> > + * init() method if not NULL passing the user supplied 'arg'.
> > + * It should be called before the main loop. */
> > +void engine_init(struct engine_node *node, struct engine_arg *arg);
> >
> >  /* Initialize the engine nodes for a new run. It should be called in the
> >   * main processing loop before every potential engine_run().
> > @@ -155,12 +173,15 @@ bool engine_need_run(void);
> >  struct engine_node * engine_get_input(const char *input_name,
> >                                        struct engine_node *);
> >
> > +/* Get the data from the input node with <name> for <node> */
> > +void *engine_get_input_data(const char *input_name, struct engine_node *);
> > +
> >  /* Add an input (dependency) for <node>, with corresponding change_handler,
> >   * which can be NULL. If the change_handler is NULL, the engine will not
> >   * be able to process the change incrementally, and will fall back to call
> >   * the run method to recompute. */
> >  void engine_add_input(struct engine_node *node, struct engine_node *input,
> > -                      bool (*change_handler)(struct engine_node *));
> > +                      bool (*change_handler)(struct engine_node *, void 
> > *));
> >
> >  /* Force the engine to recompute everything if set to true. It is used
> >   * in circumstances when we are not sure there is change or not, or
> > @@ -185,6 +206,25 @@ bool engine_has_run(void);
> >  /* Returns true if during the last engine run we had to abort processing. 
> > */
> >  bool engine_aborted(void);
> >
> > +/* Return a pointer to node data accessible for users outside the 
> > processing
> > + * engine. If the node data is not valid (e.g., last engine_run() failed or
> > + * didn't happen), the node's is_valid() method is used to determine if the
> > + * data can be safely accessed. If it's not the case, the function returns
> > + * NULL.
> > + * The content of the data should be changed only by the change_handlers
> > + * and run() function of the current node. Users should ensure that the
> > + * data is read-only in change-handlers of the nodes that depends on this
> > + * node.
> > + */
> > +void *engine_get_data(struct engine_node *node);
> > +
> > +/* Return a pointer to node data *without* performing any sanity checks on
> > + * the state of the node. This may be used only in specific cases when data
> > + * is guaranteed to be valid, e.g., immediately after initialization and
> > + * before the first engine_run().
> > + */
> > +void *engine_get_internal_data(struct engine_node *node);
> > +
> >  /* Set the state of the node and log changes. */
> >  #define engine_set_node_state(node, state) \
> >      engine_set_node_state_at(node, state, OVS_SOURCE_LOCATOR)
> > @@ -201,30 +241,42 @@ struct ed_type_ovsdb_table {
> >  };
> >
> >  #define EN_OVSDB_GET(NODE) \
> > -    (((struct ed_type_ovsdb_table *)NODE->data)->table)
> > +    (((struct ed_type_ovsdb_table *)(NODE)->data)->table)
> >
> >  struct ovsdb_idl_index * engine_ovsdb_node_get_index(struct engine_node *,
> >                                                       const char *name);
> >
> > +/* Adds an OVSDB IDL index to the node. This should be called only after
> > + * engine_init() as the index is stored in the node data.
> > + */
> >  void engine_ovsdb_node_add_index(struct engine_node *, const char *name,
> >                                   struct ovsdb_idl_index *);
> >
> >  /* Macro to define an engine node. */
> > -#define ENGINE_NODE(NAME, NAME_STR) \
> > +#define ENGINE_NODE_DEF(NAME, NAME_STR) \
> >      struct engine_node en_##NAME = { \
> >          .name = NAME_STR, \
> > -        .data = &ed_##NAME, \
> > +        .data = NULL, \
> >          .state = EN_STALE, \
> >          .init = en_##NAME##_init, \
> >          .run = en_##NAME##_run, \
> >          .cleanup = en_##NAME##_cleanup, \
> > +        .is_valid = en_##NAME##_is_valid, \
> >      };
> >
> > +#define ENGINE_NODE_CUSTOM_DATA(NAME, NAME_STR) \
> > +    ENGINE_NODE_DEF(NAME, NAME_STR)
> > +
> > +#define ENGINE_NODE(NAME, NAME_STR) \
> > +    static bool (*en_##NAME##_is_valid)(struct engine_node *node) = NULL; \
> > +    ENGINE_NODE_DEF(NAME, NAME_STR)
> > +
> >  /* Macro to define member functions of an engine node which represents
> >   * a table of OVSDB */
> >  #define ENGINE_FUNC_OVSDB(DB_NAME, TBL_NAME) \
> >  static void \
> > -en_##DB_NAME##_##TBL_NAME##_run(struct engine_node *node) \
> > +en_##DB_NAME##_##TBL_NAME##_run(struct engine_node *node, \
> > +                                void *data OVS_UNUSED) \
> >  { \
> >      const struct DB_NAME##rec_##TBL_NAME##_table *table = \
> >          EN_OVSDB_GET(node); \
> > @@ -234,10 +286,18 @@ en_##DB_NAME##_##TBL_NAME##_run(struct engine_node 
> > *node) \
> >      } \
> >      engine_set_node_state(node, EN_VALID); \
> >  } \
> > -static void (*en_##DB_NAME##_##TBL_NAME##_init)(struct engine_node *node) \
> > -            = NULL; \
> > -static void (*en_##DB_NAME##_##TBL_NAME##_cleanup)(struct engine_node 
> > *node) \
> > -            = NULL;
> > +static void *en_##DB_NAME##_##TBL_NAME##_init( \
> > +    struct engine_node *node OVS_UNUSED, \
> > +    struct engine_arg *arg) \
> > +{ \
> > +    struct ovsdb_idl *idl = arg->DB_NAME##_idl; \
> > +    struct ed_type_ovsdb_table *data = xzalloc(sizeof *data); \
> > +    data->table = DB_NAME##rec_##TBL_NAME##_table_get(idl); \
> > +    return data; \
> > +} \
> > +static void en_##DB_NAME##_##TBL_NAME##_cleanup(void *data OVS_UNUSED) \
> > +{ \
> > +}
> >
> >  /* Macro to define member functions of an engine node which represents
> >   * a table of OVN SB DB */
> > @@ -250,21 +310,16 @@ static void 
> > (*en_##DB_NAME##_##TBL_NAME##_cleanup)(struct engine_node *node) \
> >      ENGINE_FUNC_OVSDB(ovs, TBL_NAME)
> >
> >  /* Macro to define an engine node which represents a table of OVSDB */
> > -#define ENGINE_NODE_OVSDB(DB_NAME, DB_NAME_STR, TBL_NAME, TBL_NAME_STR, 
> > IDL) \
> > -    struct ed_type_ovsdb_table ed_##DB_NAME##_##TBL_NAME; \
> > -    memset(&ed_##DB_NAME##_##TBL_NAME, 0, sizeof 
> > ed_##DB_NAME##_##TBL_NAME); \
> > -    ovs_assert(IDL); \
> > -    ed_##DB_NAME##_##TBL_NAME.table = \
> > -        DB_NAME##rec_##TBL_NAME##_table_get(IDL); \
> > +#define ENGINE_NODE_OVSDB(DB_NAME, DB_NAME_STR, TBL_NAME, TBL_NAME_STR) \
> >      ENGINE_NODE(DB_NAME##_##TBL_NAME, DB_NAME_STR"_"TBL_NAME_STR)
> >
> >  /* Macro to define an engine node which represents a table of OVN SB DB */
> >  #define ENGINE_NODE_SB(TBL_NAME, TBL_NAME_STR) \
> > -    ENGINE_NODE_OVSDB(sb, "SB", TBL_NAME, TBL_NAME_STR, 
> > ovnsb_idl_loop.idl);
> > +    ENGINE_NODE_OVSDB(sb, "SB", TBL_NAME, TBL_NAME_STR);
> >
> >  /* Macro to define an engine node which represents a table of open_vswitch
> >   * DB */
> >  #define ENGINE_NODE_OVS(TBL_NAME, TBL_NAME_STR) \
> > -    ENGINE_NODE_OVSDB(ovs, "OVS", TBL_NAME, TBL_NAME_STR, 
> > ovs_idl_loop.idl);
> > +    ENGINE_NODE_OVSDB(ovs, "OVS", TBL_NAME, TBL_NAME_STR);
> >
> >  #endif /* lib/inc-proc-eng.h */
> > --
> > 1.8.3.1
> >
>
> Thanks again. I applied this to master.

Great, thanks!

_______________________________________________
dev mailing list
d...@openvswitch.org
https://mail.openvswitch.org/mailman/listinfo/ovs-dev

Reply via email to