It's similar to the processing we do for address sets.  There are a few
more mechanics involved because we need to split NB port groups per
datapath.

We currently only partially implement incremental processing of
port_group changes in the lflow node.  That is, we deal with the case
when the sets of "switches per port group" don't change.  In that
specific case ACL lflows don't need to be reprocessed.

In a synthetic benchmark that created (in this order):
- 500 switches
- 2000 port groups
- 4 ACLs per port group
- 10000 ports distributed equally between the switches and port groups

we measured the following ovn-northd CPU usage:

  +-------------------------+------------+--------------------+
  | Incremental processing? | --wait=sb? | northd avg cpu (%) |
  +-------------------------+------------+--------------------+
  |           N             |     Y      |        84.2        |
  +-------------------------+------------+--------------------+
  |           Y             |     Y      |        41.5        |
  +-------------------------+------------+--------------------+
  |           N             |     N      |        93.2        |
  +-------------------------+------------+--------------------+
  |           Y             |     N      |        53.6        |
  +-------------------------+------------+--------------------+

where '--wait=sb' set to 'Y'  means the benchmark was waiting for the
port and port group operations to be propagated to the Southbound DB
before continuing to the next operation.

Reported-at: https://bugzilla.redhat.com/show_bug.cgi?id=2228162
Signed-off-by: Dumitru Ceara <dce...@redhat.com>
---
 northd/en-lflow.c        |   17 ++
 northd/en-lflow.h        |    1 
 northd/en-port-group.c   |  451 ++++++++++++++++++++++++++++++++++++++++------
 northd/en-port-group.h   |   36 +++-
 northd/inc-proc-northd.c |   13 +
 northd/ovn-northd.c      |    4 
 tests/ovn-northd.at      |  246 +++++++++++++++++++++++++
 7 files changed, 708 insertions(+), 60 deletions(-)

diff --git a/northd/en-lflow.c b/northd/en-lflow.c
index 7f6a7872b2..1321f79036 100644
--- a/northd/en-lflow.c
+++ b/northd/en-lflow.c
@@ -119,6 +119,23 @@ lflow_northd_handler(struct engine_node *node,
     return true;
 }
 
+bool
+lflow_port_group_handler(struct engine_node *node, void *data OVS_UNUSED)
+{
+    struct port_group_data *pg_data =
+        engine_get_input_data("port_group", node);
+
+    /* If the set of switches per port group didn't change then there's no
+     * need to reprocess lflows.  Otherwise, there might be a need to add
+     * port-group ACLs to new switches. */
+    if (!pg_data->ls_port_groups_sets_unchanged) {
+        return false;
+    }
+
+    engine_set_node_state(node, EN_UPDATED);
+    return true;
+}
+
 void *en_lflow_init(struct engine_node *node OVS_UNUSED,
                      struct engine_arg *arg OVS_UNUSED)
 {
diff --git a/northd/en-lflow.h b/northd/en-lflow.h
index 5e3fbc25e3..5417b2faff 100644
--- a/northd/en-lflow.h
+++ b/northd/en-lflow.h
@@ -13,5 +13,6 @@ void en_lflow_run(struct engine_node *node, void *data);
 void *en_lflow_init(struct engine_node *node, struct engine_arg *arg);
 void en_lflow_cleanup(void *data);
 bool lflow_northd_handler(struct engine_node *, void *data);
+bool lflow_port_group_handler(struct engine_node *, void *data);
 
 #endif /* EN_LFLOW_H */
diff --git a/northd/en-port-group.c b/northd/en-port-group.c
index 2c36410246..6902695a01 100644
--- a/northd/en-port-group.c
+++ b/northd/en-port-group.c
@@ -33,15 +33,46 @@ static struct ls_port_group *ls_port_group_create(
 static void ls_port_group_destroy(struct ls_port_group_table *,
                                   struct ls_port_group *);
 
+static bool ls_port_group_process(
+    struct ls_port_group_table *,
+    struct port_group_to_ls_table *,
+    const struct hmap *ls_ports,
+    const struct nbrec_port_group *,
+    struct hmapx *updated_ls_port_groups
+);
+
+static void ls_port_group_record_clear(
+    struct ls_port_group_table *,
+    struct port_group_to_ls *,
+    struct hmapx *updated_ls_port_groups);
+static void ls_port_group_record_prune(struct ls_port_group *);
+
 static struct ls_port_group_record *ls_port_group_record_add(
     struct ls_port_group *,
     const struct nbrec_port_group *,
     const char *port_name);
 
+static struct ls_port_group_record *ls_port_group_record_find(
+    struct ls_port_group *, const struct nbrec_port_group *nb_pg);
+
 static void ls_port_group_record_destroy(
     struct ls_port_group *,
     struct ls_port_group_record *);
 
+static struct port_group_to_ls *port_group_to_ls_create(
+    struct port_group_to_ls_table *,
+    const struct nbrec_port_group *);
+static void port_group_to_ls_destroy(struct port_group_to_ls_table *,
+                                     struct port_group_to_ls *);
+
+static void update_sb_port_group(struct sorted_array *nb_ports,
+                                 const struct sbrec_port_group *sb_pg);
+static void sync_port_group(struct ovsdb_idl_txn *, const char *sb_pg_name,
+                            struct sorted_array *ports,
+                            struct shash *sb_port_groups);
+static const struct sbrec_port_group *sb_port_group_lookup_by_name(
+    struct ovsdb_idl_index *sbrec_port_group_by_name, const char *name);
+
 void
 ls_port_group_table_init(struct ls_port_group_table *table)
 {
@@ -82,39 +113,16 @@ ls_port_group_table_find(const struct ls_port_group_table 
*table,
 }
 
 void
-ls_port_group_table_build(struct ls_port_group_table *ls_port_groups,
-                          const struct nbrec_port_group_table *pg_table,
-                          const struct hmap *ls_ports)
+ls_port_group_table_build(
+    struct ls_port_group_table *ls_port_groups,
+    struct port_group_to_ls_table *port_group_to_switches,
+    const struct nbrec_port_group_table *pg_table,
+    const struct hmap *ls_ports)
 {
     const struct nbrec_port_group *nb_pg;
     NBREC_PORT_GROUP_TABLE_FOR_EACH (nb_pg, pg_table) {
-        for (size_t i = 0; i < nb_pg->n_ports; i++) {
-            const char *port_name = nb_pg->ports[i]->name;
-            const struct ovn_datapath *od =
-                northd_get_datapath_for_port(ls_ports, port_name);
-
-            if (!od) {
-                static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
-                VLOG_ERR_RL(&rl, "lport %s in port group %s not found.",
-                            port_name, nb_pg->name);
-                continue;
-            }
-
-            if (!od->nbs) {
-                static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
-                VLOG_WARN_RL(&rl, "lport %s in port group %s has no lswitch.",
-                             nb_pg->ports[i]->name,
-                             nb_pg->name);
-                continue;
-            }
-
-            struct ls_port_group *ls_pg =
-                ls_port_group_table_find(ls_port_groups, od->nbs);
-            if (!ls_pg) {
-                ls_pg = ls_port_group_create(ls_port_groups, od->nbs, od->sb);
-            }
-            ls_port_group_record_add(ls_pg, nb_pg, port_name);
-        }
+        ls_port_group_process(ls_port_groups, port_group_to_switches,
+                              ls_ports, nb_pg, NULL);
     }
 }
 
@@ -145,18 +153,11 @@ ls_port_group_table_sync(
             get_sb_port_group_name(ls_pg_rec->nb_pg->name,
                                    ls_pg->sb_datapath_key,
                                    &sb_name);
-            sb_port_group = shash_find_and_delete(&sb_port_groups,
-                                                  ds_cstr(&sb_name));
-            if (!sb_port_group) {
-                sb_port_group = sbrec_port_group_insert(ovnsb_txn);
-                sbrec_port_group_set_name(sb_port_group, ds_cstr(&sb_name));
-            }
-
-            const char **nb_port_names = sset_array(&ls_pg_rec->ports);
-            sbrec_port_group_set_ports(sb_port_group,
-                                       nb_port_names,
-                                       sset_count(&ls_pg_rec->ports));
-            free(nb_port_names);
+            struct sorted_array ports =
+                sorted_array_from_sset(&ls_pg_rec->ports);
+            sync_port_group(ovnsb_txn, ds_cstr(&sb_name),
+                            &ports, &sb_port_groups);
+            sorted_array_destroy(&ports);
         }
     }
     ds_destroy(&sb_name);
@@ -201,31 +202,165 @@ ls_port_group_destroy(struct ls_port_group_table 
*ls_port_groups,
     }
 }
 
+/* Processes a NB.Port_Group record and stores any updated ls_port_groups
+ * in 'updated_ls_port_groups' (if non-NULL).  Returns true if an
+ * ls_port_group had to be created or destroyed.
+ */
+static bool
+ls_port_group_process(struct ls_port_group_table *ls_port_groups,
+                      struct port_group_to_ls_table *port_group_to_switches,
+                      const struct hmap *ls_ports,
+                      const struct nbrec_port_group *nb_pg,
+                      struct hmapx *updated_ls_port_groups)
+{
+    struct hmapx cleared_ls_port_groups =
+        HMAPX_INITIALIZER(&cleared_ls_port_groups);
+    bool ls_port_group_created = false;
+
+    struct port_group_to_ls *pg_ls =
+        port_group_to_ls_table_find(port_group_to_switches, nb_pg);
+    if (!pg_ls) {
+        pg_ls = port_group_to_ls_create(port_group_to_switches, nb_pg);
+    } else {
+        /* Clear all old records corresponding to this port group; we'll
+         * reprocess it below. */
+        ls_port_group_record_clear(ls_port_groups, pg_ls,
+                                   &cleared_ls_port_groups);
+    }
+
+    for (size_t i = 0; i < nb_pg->n_ports; i++) {
+        const char *port_name = nb_pg->ports[i]->name;
+        const struct ovn_datapath *od =
+            northd_get_datapath_for_port(ls_ports, port_name);
+
+        if (!od) {
+            static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
+            VLOG_ERR_RL(&rl, "lport %s in port group %s not found.",
+                        port_name, nb_pg->name);
+            continue;
+        }
+
+        if (!od->nbs) {
+            static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
+            VLOG_WARN_RL(&rl, "lport %s in port group %s has no lswitch.",
+                         nb_pg->ports[i]->name,
+                         nb_pg->name);
+            continue;
+        }
+
+        struct ls_port_group *ls_pg =
+            ls_port_group_table_find(ls_port_groups, od->nbs);
+        if (!ls_pg) {
+            ls_pg = ls_port_group_create(ls_port_groups, od->nbs, od->sb);
+            ls_port_group_created = true;
+        }
+        ls_port_group_record_add(ls_pg, nb_pg, port_name);
+        hmapx_add(&pg_ls->switches,
+                  CONST_CAST(struct nbrec_logical_switch *, od->nbs));
+        if (updated_ls_port_groups) {
+            hmapx_add(updated_ls_port_groups, ls_pg);
+        }
+    }
+
+    bool ls_port_group_destroyed = false;
+    struct hmapx_node *node;
+    HMAPX_FOR_EACH (node, &cleared_ls_port_groups) {
+        struct ls_port_group *ls_pg = node->data;
+
+        ls_port_group_record_prune(ls_pg);
+
+        if (hmap_is_empty(&ls_pg->nb_pgs)) {
+            ls_port_group_destroy(ls_port_groups, ls_pg);
+            ls_port_group_destroyed = true;
+        }
+    }
+    hmapx_destroy(&cleared_ls_port_groups);
+
+    return ls_port_group_created || ls_port_group_destroyed;
+}
+
+/* Clears the ports of all the struct ls_port_group_record that correspond
+ * to 'pg_ls' on the logical switches it is known to span.  Stores the
+ * ls_port_groups whose records were cleared in 'cleared_ls_port_groups'.
+ */
+static void
+ls_port_group_record_clear(struct ls_port_group_table *ls_port_groups,
+                           struct port_group_to_ls *pg_ls,
+                           struct hmapx *cleared_ls_port_groups)
+{
+    struct hmapx_node *node;
+
+    HMAPX_FOR_EACH (node, &pg_ls->switches) {
+        const struct nbrec_logical_switch *nbs = node->data;
+
+        struct ls_port_group *ls_pg =
+            ls_port_group_table_find(ls_port_groups, nbs);
+        if (!ls_pg) {
+            continue;
+        }
+
+        /* Clear ports in the port group record. */
+        struct ls_port_group_record *ls_pg_rec =
+            ls_port_group_record_find(ls_pg, pg_ls->nb_pg);
+        if (!ls_pg_rec) {
+            continue;
+        }
+
+        sset_clear(&ls_pg_rec->ports);
+        hmapx_add(cleared_ls_port_groups, ls_pg);
+    }
+}
+
+static void
+ls_port_group_record_prune(struct ls_port_group *ls_pg)
+{
+    struct ls_port_group_record *ls_pg_rec;
+
+    HMAP_FOR_EACH_SAFE (ls_pg_rec, key_node, &ls_pg->nb_pgs) {
+        if (sset_is_empty(&ls_pg_rec->ports)) {
+            ls_port_group_record_destroy(ls_pg, ls_pg_rec);
+        }
+    }
+}
+
 static struct ls_port_group_record *
 ls_port_group_record_add(struct ls_port_group *ls_pg,
                          const struct nbrec_port_group *nb_pg,
                          const char *port_name)
 {
-    struct ls_port_group_record *ls_pg_rec = NULL;
+    struct ls_port_group_record *ls_pg_rec =
+        ls_port_group_record_find(ls_pg, nb_pg);
     size_t hash = uuid_hash(&nb_pg->header_.uuid);
 
-    HMAP_FOR_EACH_WITH_HASH (ls_pg_rec, key_node, hash, &ls_pg->nb_pgs) {
-        if (ls_pg_rec->nb_pg == nb_pg) {
-            goto done;
-        }
+    if (!ls_pg_rec) {
+        ls_pg_rec = xzalloc(sizeof *ls_pg_rec);
+        *ls_pg_rec = (struct ls_port_group_record) {
+            .nb_pg = nb_pg,
+            .ports = SSET_INITIALIZER(&ls_pg_rec->ports),
+        };
+        hmap_insert(&ls_pg->nb_pgs, &ls_pg_rec->key_node, hash);
     }
 
-    ls_pg_rec = xzalloc(sizeof *ls_pg_rec);
-    *ls_pg_rec = (struct ls_port_group_record) {
-        .nb_pg = nb_pg,
-        .ports = SSET_INITIALIZER(&ls_pg_rec->ports),
-    };
-    hmap_insert(&ls_pg->nb_pgs, &ls_pg_rec->key_node, hash);
-done:
     sset_add(&ls_pg_rec->ports, port_name);
     return ls_pg_rec;
 }
 
+static struct ls_port_group_record *
+ls_port_group_record_find(struct ls_port_group *ls_pg,
+                          const struct nbrec_port_group *nb_pg)
+{
+    size_t hash = uuid_hash(&nb_pg->header_.uuid);
+    struct ls_port_group_record *ls_pg_rec;
+
+    HMAP_FOR_EACH_WITH_HASH (ls_pg_rec, key_node, hash, &ls_pg->nb_pgs) {
+        if (ls_pg_rec->nb_pg == nb_pg) {
+            return ls_pg_rec;
+        }
+    }
+    return NULL;
+}
+
+
 static void
 ls_port_group_record_destroy(struct ls_port_group *ls_pg,
                              struct ls_port_group_record *ls_pg_rec)
@@ -237,6 +372,71 @@ ls_port_group_record_destroy(struct ls_port_group *ls_pg,
     }
 }
 
+void
+port_group_to_ls_table_init(struct port_group_to_ls_table *table)
+{
+    *table = (struct port_group_to_ls_table) {
+        .entries = HMAP_INITIALIZER(&table->entries),
+    };
+}
+
+void
+port_group_to_ls_table_clear(struct port_group_to_ls_table *table)
+{
+    struct port_group_to_ls *pg_ls;
+    HMAP_FOR_EACH_SAFE (pg_ls, key_node, &table->entries) {
+        port_group_to_ls_destroy(table, pg_ls);
+    }
+}
+
+void
+port_group_to_ls_table_destroy(struct port_group_to_ls_table *table)
+{
+    port_group_to_ls_table_clear(table);
+    hmap_destroy(&table->entries);
+}
+
+struct port_group_to_ls *
+port_group_to_ls_table_find(const struct port_group_to_ls_table *table,
+                            const struct nbrec_port_group *nb_pg)
+{
+    struct port_group_to_ls *pg_ls;
+
+    HMAP_FOR_EACH_WITH_HASH (pg_ls, key_node, uuid_hash(&nb_pg->header_.uuid),
+                             &table->entries) {
+        if (nb_pg == pg_ls->nb_pg) {
+            return pg_ls;
+        }
+    }
+    return NULL;
+}
+
+static struct port_group_to_ls *
+port_group_to_ls_create(struct port_group_to_ls_table *table,
+                        const struct nbrec_port_group *nb_pg)
+{
+    struct port_group_to_ls *pg_ls = xmalloc(sizeof *pg_ls);
+
+    *pg_ls = (struct port_group_to_ls) {
+        .nb_pg = nb_pg,
+        .switches = HMAPX_INITIALIZER(&pg_ls->switches),
+    };
+    hmap_insert(&table->entries, &pg_ls->key_node,
+                uuid_hash(&nb_pg->header_.uuid));
+    return pg_ls;
+}
+
+static void
+port_group_to_ls_destroy(struct port_group_to_ls_table *table,
+                         struct port_group_to_ls *pg_ls)
+{
+    if (pg_ls) {
+        hmapx_destroy(&pg_ls->switches);
+        hmap_remove(&table->entries, &pg_ls->key_node);
+        free(pg_ls);
+    }
+}
+
 /* Incremental processing implementation. */
 static struct port_group_input
 port_group_get_input_data(struct engine_node *node)
@@ -259,6 +459,7 @@ en_port_group_init(struct engine_node *node OVS_UNUSED,
     struct port_group_data *pg_data = xmalloc(sizeof *pg_data);
 
     ls_port_group_table_init(&pg_data->ls_port_groups);
+    port_group_to_ls_table_init(&pg_data->port_groups_to_ls);
     return pg_data;
 }
 
@@ -268,6 +469,15 @@ en_port_group_cleanup(void *data_)
     struct port_group_data *data = data_;
 
     ls_port_group_table_destroy(&data->ls_port_groups);
+    port_group_to_ls_table_destroy(&data->port_groups_to_ls);
+}
+
+void
+en_port_group_clear_tracked_data(void *data_)
+{
+    struct port_group_data *data = data_;
+
+    data->ls_port_groups_sets_unchanged = false;
 }
 
 void
@@ -280,7 +490,10 @@ en_port_group_run(struct engine_node *node, void *data_)
     stopwatch_start(PORT_GROUP_RUN_STOPWATCH_NAME, time_msec());
 
     ls_port_group_table_clear(&data->ls_port_groups);
+    port_group_to_ls_table_clear(&data->port_groups_to_ls);
+
     ls_port_group_table_build(&data->ls_port_groups,
+                              &data->port_groups_to_ls,
                               input_data.nbrec_port_group_table,
                               input_data.ls_ports);
 
@@ -291,3 +504,133 @@ en_port_group_run(struct engine_node *node, void *data_)
     stopwatch_stop(PORT_GROUP_RUN_STOPWATCH_NAME, time_msec());
     engine_set_node_state(node, EN_UPDATED);
 }
+
+bool
+port_group_nb_port_group_handler(struct engine_node *node, void *data_)
+{
+    struct port_group_input input_data = port_group_get_input_data(node);
+    struct port_group_data *data = data_;
+    bool success = true;
+
+    const struct nbrec_port_group_table *nb_pg_table =
+        EN_OVSDB_GET(engine_get_input("NB_port_group", node));
+    const struct nbrec_port_group *nb_pg;
+
+    /* Return false if a port group is created or deleted.
+     * Handle I-P for only updated port groups. */
+    NBREC_PORT_GROUP_TABLE_FOR_EACH_TRACKED (nb_pg, nb_pg_table) {
+        if (nbrec_port_group_is_new(nb_pg) ||
+                nbrec_port_group_is_deleted(nb_pg)) {
+            return false;
+        }
+    }
+
+    struct hmapx updated_ls_port_groups =
+        HMAPX_INITIALIZER(&updated_ls_port_groups);
+
+    NBREC_PORT_GROUP_TABLE_FOR_EACH_TRACKED (nb_pg, nb_pg_table) {
+        /* Changes that create or destroy a ls_port_group can't be
+         * incrementally processed; the rest can. */
+        if (ls_port_group_process(&data->ls_port_groups,
+                                  &data->port_groups_to_ls,
+                                  input_data.ls_ports,
+                                  nb_pg, &updated_ls_port_groups)) {
+            success = false;
+            break;
+        }
+    }
+
+    /* If changes have been successfully processed incrementally then update
+     * the SB too. */
+    if (success) {
+        struct ovsdb_idl_index *sbrec_port_group_by_name =
+            engine_ovsdb_node_get_index(
+                    engine_get_input("SB_port_group", node),
+                    "sbrec_port_group_by_name");
+        struct ds sb_pg_name = DS_EMPTY_INITIALIZER;
+
+        struct hmapx_node *updated_node;
+        HMAPX_FOR_EACH (updated_node, &updated_ls_port_groups) {
+            const struct ls_port_group *ls_pg = updated_node->data;
+            struct ls_port_group_record *ls_pg_rec;
+
+            HMAP_FOR_EACH (ls_pg_rec, key_node, &ls_pg->nb_pgs) {
+                get_sb_port_group_name(ls_pg_rec->nb_pg->name,
+                                        ls_pg->sb_datapath_key,
+                                        &sb_pg_name);
+
+                const struct sbrec_port_group *sb_pg =
+                    sb_port_group_lookup_by_name(sbrec_port_group_by_name,
+                                                 ds_cstr(&sb_pg_name));
+                if (!sb_pg) {
+                    success = false;
+                    break;
+                }
+                struct sorted_array nb_ports =
+                    sorted_array_from_sset(&ls_pg_rec->ports);
+                update_sb_port_group(&nb_ports, sb_pg);
+                sorted_array_destroy(&nb_ports);
+            }
+        }
+        ds_destroy(&sb_pg_name);
+    }
+
+    data->ls_port_groups_sets_unchanged = success;
+    engine_set_node_state(node, EN_UPDATED);
+    hmapx_destroy(&updated_ls_port_groups);
+    return success;
+}
+
+static void
+sb_port_group_apply_diff(const void *arg, const char *item, bool add)
+{
+    const struct sbrec_port_group *pg = arg;
+    if (add) {
+        sbrec_port_group_update_ports_addvalue(pg, item);
+    } else {
+        sbrec_port_group_update_ports_delvalue(pg, item);
+    }
+}
+
+static void
+update_sb_port_group(struct sorted_array *nb_ports,
+                     const struct sbrec_port_group *sb_pg)
+{
+    struct sorted_array sb_ports = sorted_array_from_dbrec(sb_pg, ports);
+    sorted_array_apply_diff(nb_ports, &sb_ports,
+                            sb_port_group_apply_diff, sb_pg);
+    sorted_array_destroy(&sb_ports);
+}
+
+static void
+sync_port_group(struct ovsdb_idl_txn *ovnsb_txn, const char *sb_pg_name,
+                struct sorted_array *ports,
+                struct shash *sb_port_groups)
+{
+    const struct sbrec_port_group *sb_port_group =
+        shash_find_and_delete(sb_port_groups, sb_pg_name);
+    if (!sb_port_group) {
+        sb_port_group = sbrec_port_group_insert(ovnsb_txn);
+        sbrec_port_group_set_name(sb_port_group, sb_pg_name);
+        sbrec_port_group_set_ports(sb_port_group, ports->arr, ports->n);
+    } else {
+        update_sb_port_group(ports, sb_port_group);
+    }
+}
+
+/* Finds and returns the port group with the given 'name', or NULL
+ * if no such port group exists. */
+static const struct sbrec_port_group *
+sb_port_group_lookup_by_name(struct ovsdb_idl_index *sbrec_port_group_by_name,
+                             const char *name)
+{
+    struct sbrec_port_group *target = sbrec_port_group_index_init_row(
+        sbrec_port_group_by_name);
+    sbrec_port_group_index_set_name(target, name);
+
+    struct sbrec_port_group *retval = sbrec_port_group_index_find(
+        sbrec_port_group_by_name, target);
+
+    sbrec_port_group_index_destroy_row(target);
+    return retval;
+}
diff --git a/northd/en-port-group.h b/northd/en-port-group.h
index 5cbf6c6c4a..c3975f64ee 100644
--- a/northd/en-port-group.h
+++ b/northd/en-port-group.h
@@ -18,6 +18,7 @@
 
 #include <stdint.h>
 
+#include "lib/hmapx.h"
 #include "lib/inc-proc-eng.h"
 #include "lib/ovn-nb-idl.h"
 #include "lib/ovn-sb-idl.h"
@@ -54,9 +55,33 @@ struct ls_port_group *ls_port_group_table_find(
     const struct ls_port_group_table *,
     const struct nbrec_logical_switch *);
 
-void ls_port_group_table_build(struct ls_port_group_table *ls_port_groups,
-                               const struct nbrec_port_group_table *,
-                               const struct hmap *ls_ports);
+/* Per port group map of datapaths with ports in the group. */
+struct port_group_to_ls_table {
+    struct hmap entries; /* Stores struct port_group_to_ls. */
+};
+
+struct port_group_to_ls {
+    struct hmap_node key_node; /* Index on 'nb_pg->header_.uuid'. */
+
+    const struct nbrec_port_group *nb_pg;
+
+    /* Map of 'struct nbrec_logical_switch *' with ports in the group. */
+    struct hmapx switches;
+};
+
+void port_group_to_ls_table_init(struct port_group_to_ls_table *);
+void port_group_to_ls_table_clear(struct port_group_to_ls_table *);
+void port_group_to_ls_table_destroy(struct port_group_to_ls_table *);
+
+struct port_group_to_ls *port_group_to_ls_table_find(
+    const struct port_group_to_ls_table *,
+    const struct nbrec_port_group *);
+
+void ls_port_group_table_build(
+    struct ls_port_group_table *ls_port_groups,
+    struct port_group_to_ls_table *port_group_to_switches,
+    const struct nbrec_port_group_table *,
+    const struct hmap *ls_ports);
 void ls_port_group_table_sync(const struct ls_port_group_table *ls_port_groups,
                               const struct sbrec_port_group_table *,
                               struct ovsdb_idl_txn *ovnsb_txn);
@@ -75,10 +100,15 @@ struct port_group_input {
 
 struct port_group_data {
     struct ls_port_group_table ls_port_groups;
+    struct port_group_to_ls_table port_groups_to_ls;
+    bool ls_port_groups_sets_unchanged;
 };
 
 void *en_port_group_init(struct engine_node *, struct engine_arg *);
 void en_port_group_cleanup(void *data);
+void en_port_group_clear_tracked_data(void *data);
 void en_port_group_run(struct engine_node *, void *data);
 
+bool port_group_nb_port_group_handler(struct engine_node *, void *data);
+
 #endif /* EN_PORT_GROUP_H */
diff --git a/northd/inc-proc-northd.c b/northd/inc-proc-northd.c
index 6d5f9e8d16..bd598ba5e2 100644
--- a/northd/inc-proc-northd.c
+++ b/northd/inc-proc-northd.c
@@ -137,7 +137,7 @@ static ENGINE_NODE(mac_binding_aging_waker, 
"mac_binding_aging_waker");
 static ENGINE_NODE(northd_output, "northd_output");
 static ENGINE_NODE(sync_to_sb, "sync_to_sb");
 static ENGINE_NODE(sync_to_sb_addr_set, "sync_to_sb_addr_set");
-static ENGINE_NODE(port_group, "port_group");
+static ENGINE_NODE_WITH_CLEAR_TRACK_DATA(port_group, "port_group");
 static ENGINE_NODE(fdb_aging, "fdb_aging");
 static ENGINE_NODE(fdb_aging_waker, "fdb_aging_waker");
 
@@ -193,7 +193,7 @@ void inc_proc_northd_init(struct ovsdb_idl_loop *nb,
     engine_add_input(&en_lflow, &en_sb_multicast_group, NULL);
     engine_add_input(&en_lflow, &en_sb_igmp_group, NULL);
     engine_add_input(&en_lflow, &en_northd, lflow_northd_handler);
-    engine_add_input(&en_lflow, &en_port_group, NULL);
+    engine_add_input(&en_lflow, &en_port_group, lflow_port_group_handler);
 
     engine_add_input(&en_sync_to_sb_addr_set, &en_nb_address_set,
                      sync_to_sb_addr_set_nb_address_set_handler);
@@ -202,7 +202,8 @@ void inc_proc_northd_init(struct ovsdb_idl_loop *nb,
     engine_add_input(&en_sync_to_sb_addr_set, &en_northd, NULL);
     engine_add_input(&en_sync_to_sb_addr_set, &en_sb_address_set, NULL);
 
-    engine_add_input(&en_port_group, &en_nb_port_group, NULL);
+    engine_add_input(&en_port_group, &en_nb_port_group,
+                     port_group_nb_port_group_handler);
     engine_add_input(&en_port_group, &en_sb_port_group, NULL);
     /* No need for an explicit handler for northd changes.  Port changes
      * that affect port_groups trigger updates to the NB.Port_Group
@@ -287,6 +288,12 @@ void inc_proc_northd_init(struct ovsdb_idl_loop *nb,
                                 "sbrec_address_set_by_name",
                                 sbrec_address_set_by_name);
 
+    struct ovsdb_idl_index *sbrec_port_group_by_name
+        = ovsdb_idl_index_create1(sb->idl, &sbrec_port_group_col_name);
+    engine_ovsdb_node_add_index(&en_sb_port_group,
+                                "sbrec_port_group_by_name",
+                                sbrec_port_group_by_name);
+
     struct ovsdb_idl_index *sbrec_fdb_by_dp_and_port
         = ovsdb_idl_index_create2(sb->idl, &sbrec_fdb_col_dp_key,
                                   &sbrec_fdb_col_port_key);
diff --git a/northd/ovn-northd.c b/northd/ovn-northd.c
index 4fa1b039ea..44385d604c 100644
--- a/northd/ovn-northd.c
+++ b/northd/ovn-northd.c
@@ -836,6 +836,10 @@ main(int argc, char *argv[])
         ovsdb_idl_omit_alert(ovnsb_idl_loop.idl,
                              &sbrec_multicast_group_columns[i]);
     }
+    for (size_t i = 0; i < SBREC_PORT_GROUP_N_COLUMNS; i++) {
+        ovsdb_idl_omit_alert(ovnsb_idl_loop.idl,
+                             &sbrec_port_group_columns[i]);
+    }
 
     unixctl_command_register("sb-connection-status", "", 0, 0,
                              ovn_conn_show, ovnsb_idl_loop.idl);
diff --git a/tests/ovn-northd.at b/tests/ovn-northd.at
index 1a12513d7a..a04ba2b23f 100644
--- a/tests/ovn-northd.at
+++ b/tests/ovn-northd.at
@@ -8936,6 +8936,252 @@ AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats sync_to_sb_a
 AT_CLEANUP
 ])
 
+OVN_FOR_EACH_NORTHD_NO_HV([
+AT_SETUP([Port group incremental processing])
+ovn_start
+
+check ovn-nbctl ls-add sw1 \
+  -- lsp-add sw1 sw1.1     \
+  -- lsp-add sw1 sw1.2     \
+  -- lsp-add sw1 sw1.3     \
+  -- ls-add sw2            \
+  -- lsp-add sw2 sw2.1     \
+  -- lsp-add sw2 sw2.2     \
+  -- lsp-add sw2 sw2.3
+
+check ovn-nbctl --wait=sb sync
+sw1_key=$(fetch_column Datapath_Binding tunnel_key external_ids:name=sw1)
+sw2_key=$(fetch_column Datapath_Binding tunnel_key external_ids:name=sw2)
+
+check_acl_lflows() {
+AT_CHECK_UNQUOTED([ovn-sbctl lflow-list sw1 | grep ls_in_acl_eval | grep eth.src==41:41:41:41:41:41 -c], [ignore], [dnl
+$1
+])
+AT_CHECK_UNQUOTED([ovn-sbctl lflow-list sw1 | grep ls_in_acl_eval | grep eth.src==42:42:42:42:42:42 -c], [ignore], [dnl
+$2
+])
+AT_CHECK_UNQUOTED([ovn-sbctl lflow-list sw2 | grep ls_in_acl_eval | grep eth.src==41:41:41:41:41:41 -c], [ignore], [dnl
+$3
+])
+AT_CHECK_UNQUOTED([ovn-sbctl lflow-list sw2 | grep ls_in_acl_eval | grep eth.src==42:42:42:42:42:42 -c], [ignore], [dnl
+$4
+])
+}
+
+AS_BOX([Create new PG1 and PG2])
+check as northd ovn-appctl -t NORTHD_TYPE inc-engine/clear-stats
+check ovn-nbctl --wait=sb -- pg-add pg1 -- pg-add pg2
+dnl The northd node should not recompute, it should handle nb_global update
+dnl though, therefore "compute: 1".
+AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats northd], [0], [dnl
+Node: northd
+- recompute:            0
+- compute:              1
+- abort:                0
+])
+dnl The port_group node recomputes every time a NB port group is added/deleted.
+AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats port_group], [0], [dnl
+Node: port_group
+- recompute:            1
+- compute:              0
+- abort:                0
+])
+dnl The port_group node is an input for the lflow node.  A port_group
+dnl recompute triggers an lflow recompute.
+AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats lflow], [0], [dnl
+Node: lflow
+- recompute:            1
+- compute:              0
+- abort:                0
+])
+
+AS_BOX([Add ACLs on PG1 and PG2])
+check ovn-nbctl --wait=sb             \
+  -- acl-add pg1 from-lport 1 eth.src==41:41:41:41:41:41 allow \
+  -- acl-add pg2 from-lport 1 eth.src==42:42:42:42:42:42 allow
+
+AS_BOX([Add one port from the two switches to PG1])
+check as northd ovn-appctl -t NORTHD_TYPE inc-engine/clear-stats
+check ovn-nbctl --wait=sb         \
+  -- pg-set-ports pg1 sw1.1 sw2.1
+check_column "sw1.1" sb:Port_Group ports name="${sw1_key}_pg1"
+check_column "sw2.1" sb:Port_Group ports name="${sw2_key}_pg1"
+
+dnl The northd node should not recompute, it should handle nb_global update
+dnl though, therefore "compute: 1".
+AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats northd], [0], [dnl
+Node: northd
+- recompute:            0
+- compute:              1
+- abort:                0
+])
+dnl The port_group node recomputes also every time a port from a new switch
+dnl is added to the group.
+AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats port_group], [0], [dnl
+Node: port_group
+- recompute:            1
+- compute:              0
+- abort:                0
+])
+dnl The port_group node is an input for the lflow node.  A port_group
+dnl recompute triggers an lflow recompute.
+AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats lflow], [0], [dnl
+Node: lflow
+- recompute:            1
+- compute:              0
+- abort:                0
+])
+dnl Expect ACL1 on sw1 and sw2
+check_acl_lflows 1 0 1 0
+
+AS_BOX([Add one port from the two switches to PG2])
+check as northd ovn-appctl -t NORTHD_TYPE inc-engine/clear-stats
+check ovn-nbctl --wait=sb \
+  -- pg-set-ports pg2 sw1.2 sw2.2
+check_column "sw1.1" sb:Port_Group ports name="${sw1_key}_pg1"
+check_column "sw2.1" sb:Port_Group ports name="${sw2_key}_pg1"
+check_column "sw1.2" sb:Port_Group ports name="${sw1_key}_pg2"
+check_column "sw2.2" sb:Port_Group ports name="${sw2_key}_pg2"
+
+dnl The northd node should not recompute, it should handle nb_global update
+dnl though, therefore "compute: 1".
+AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats northd], [0], [dnl
+Node: northd
+- recompute:            0
+- compute:              1
+- abort:                0
+])
+dnl The port_group node recomputes also every time a port from a new switch
+dnl is added to the group.
+AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats port_group], [0], [dnl
+Node: port_group
+- recompute:            1
+- compute:              0
+- abort:                0
+])
+dnl The port_group node is an input for the lflow node.  A port_group
+dnl recompute triggers an lflow recompute (for ACLs).
+AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats lflow], [0], [dnl
+Node: lflow
+- recompute:            1
+- compute:              0
+- abort:                0
+])
+dnl Expect both ACLs on sw1 and sw2
+check_acl_lflows 1 1 1 1
+
+AS_BOX([Add one more port from the two switches to PG1 and PG2])
+check as northd ovn-appctl -t NORTHD_TYPE inc-engine/clear-stats
+check ovn-nbctl --wait=sb                     \
+  -- pg-set-ports pg1 sw1.1 sw2.1 sw1.3 sw2.3 \
+  -- pg-set-ports pg2 sw1.2 sw2.2 sw1.3 sw2.3
+check_column "sw1.1 sw1.3" sb:Port_Group ports name="${sw1_key}_pg1"
+check_column "sw2.1 sw2.3" sb:Port_Group ports name="${sw2_key}_pg1"
+check_column "sw1.2 sw1.3" sb:Port_Group ports name="${sw1_key}_pg2"
+check_column "sw2.2 sw2.3" sb:Port_Group ports name="${sw2_key}_pg2"
+
+dnl The northd node should not recompute, it should handle nb_global update
+dnl though, therefore "compute: 1".
+AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats northd], [0], [dnl
+Node: northd
+- recompute:            0
+- compute:              1
+- abort:                0
+])
+dnl We did not change the set of switches a pg is applied to, there should be
+dnl no recompute.
+AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats port_group], [0], [dnl
+Node: port_group
+- recompute:            0
+- compute:              1
+- abort:                0
+])
+dnl We did not change the set of switches a pg is applied to, there should be
+dnl no recompute.
+AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats lflow], [0], [dnl
+Node: lflow
+- recompute:            0
+- compute:              1
+- abort:                0
+])
+dnl Expect both ACLs on sw1 and sw2
+check_acl_lflows 1 1 1 1
+
+AS_BOX([Remove the last port from PG1 and PG2])
+check as northd ovn-appctl -t NORTHD_TYPE inc-engine/clear-stats
+check ovn-nbctl --wait=sb         \
+  -- pg-set-ports pg1 sw1.1 sw2.1 \
+  -- pg-set-ports pg2 sw1.2 sw2.2
+check_column "sw1.1" sb:Port_Group ports name="${sw1_key}_pg1"
+check_column "sw2.1" sb:Port_Group ports name="${sw2_key}_pg1"
+check_column "sw1.2" sb:Port_Group ports name="${sw1_key}_pg2"
+check_column "sw2.2" sb:Port_Group ports name="${sw2_key}_pg2"
+
+dnl The northd node should not recompute, it should handle nb_global update
+dnl though, therefore "compute: 1".
+AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats northd], [0], [dnl
+Node: northd
+- recompute:            0
+- compute:              1
+- abort:                0
+])
+dnl We did not change the set of switches a pg is applied to, there should be
+dnl no recompute.
+AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats port_group], [0], [dnl
+Node: port_group
+- recompute:            0
+- compute:              1
+- abort:                0
+])
+dnl We did not change the set of switches a pg is applied to, there should be
+dnl no recompute.
+AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats lflow], [0], [dnl
+Node: lflow
+- recompute:            0
+- compute:              1
+- abort:                0
+])
+dnl Expect both ACLs on sw1 and sw2
+check_acl_lflows 1 1 1 1
+
+AS_BOX([Remove the second port from PG1 and PG2])
+check as northd ovn-appctl -t NORTHD_TYPE inc-engine/clear-stats
+check ovn-nbctl --wait=sb         \
+  -- pg-set-ports pg1 sw1.1 \
+  -- pg-set-ports pg2 sw1.2
+check_column "sw1.1" sb:Port_Group ports name="${sw1_key}_pg1"
+check_column "sw1.2" sb:Port_Group ports name="${sw1_key}_pg2"
+
+dnl The northd node should not recompute, it should handle nb_global update
+dnl though, therefore "compute: 1".
+AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats northd], [0], [dnl
+Node: northd
+- recompute:            0
+- compute:              1
+- abort:                0
+])
+dnl We changed the set of switches a pg is applied to, there should be
+dnl a recompute.
+AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats port_group], [0], [dnl
+Node: port_group
+- recompute:            1
+- compute:              0
+- abort:                0
+])
+dnl We changed the set of switches a pg is applied to, there should be
+dnl a recompute (for ACLs).
+AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats lflow], [0], [dnl
+Node: lflow
+- recompute:            1
+- compute:              0
+- abort:                0
+])
+dnl Expect both ACLs on sw1 and not on sw2.
+check_acl_lflows 1 1 0 0
+
+AT_CLEANUP
+])
+
 OVN_FOR_EACH_NORTHD([
 AT_SETUP([Check default drop])
 AT_KEYWORDS([drop])


_______________________________________________
dev mailing list
d...@openvswitch.org
https://mail.openvswitch.org/mailman/listinfo/ovs-dev

Reply via email to