From: Anton Ivanov <anton.iva...@cambridgegreys.com>

1. Make pool size user-definable.
2. Expose pool destruction.
3. Make pools resizeable at runtime.
4. Split pool start and completion to allow background execution.
5. Add a simplified API for safely walking a single hash.

Signed-off-by: Anton Ivanov <anton.iva...@cambridgegreys.com>
---
 lib/ovn-parallel-hmap.c | 290 +++++++++++++++++++++++++++++++---------
 lib/ovn-parallel-hmap.h |  77 ++++++++++-
 northd/northd.c         |  72 +++-------
 3 files changed, 321 insertions(+), 118 deletions(-)

diff --git a/lib/ovn-parallel-hmap.c b/lib/ovn-parallel-hmap.c
index b8c7ac786..1b3883441 100644
--- a/lib/ovn-parallel-hmap.c
+++ b/lib/ovn-parallel-hmap.c
@@ -51,7 +51,6 @@ static bool can_parallelize = false;
  * accompanied by a fence. It does not need to be atomic or be
  * accessed under a lock.
  */
-static bool workers_must_exit = false;
 
 static struct ovs_list worker_pools = OVS_LIST_INITIALIZER(&worker_pools);
 
@@ -70,10 +69,27 @@ static void merge_hash_results(struct worker_pool *pool 
OVS_UNUSED,
                                void *fin_result, void *result_frags,
                                int index);
 
+
+static bool init_control(struct worker_control *control, int id,
+                         struct worker_pool *pool);
+
+static void cleanup_control(struct worker_pool *pool, int id);
+
+static void free_controls(struct worker_pool *pool);
+
+static struct worker_control *alloc_controls(int size);
+
+static void *standard_helper_thread(void *arg);
+
+struct worker_pool *ovn_add_standard_pool(int size)
+{
+    return add_worker_pool(standard_helper_thread, size);
+}
+
 bool
-ovn_stop_parallel_processing(void)
+ovn_stop_parallel_processing(struct worker_pool *pool)
 {
-    return workers_must_exit;
+    return pool->workers_must_exit;
 }
 
 bool
@@ -92,11 +108,67 @@ ovn_can_parallelize_hashes(bool force_parallel)
     return can_parallelize;
 }
 
+
+void
+destroy_pool(struct worker_pool *pool) {
+    char sem_name[256];
+
+    free_controls(pool);
+    sem_close(pool->done);
+    sprintf(sem_name, MAIN_SEM_NAME, sembase, pool);
+    sem_unlink(sem_name);
+    free(pool);
+}
+
+bool
+ovn_resize_pool(struct worker_pool *pool, int size)
+{
+    int i;
+
+    ovs_assert(pool != NULL);
+
+    if (!size) {
+        size = pool_size;
+    }
+
+    ovs_mutex_lock(&init_mutex);
+
+    if (can_parallelize) {
+        free_controls(pool);
+        pool->size = size;
+
+        /* Allocate new control structures. */
+
+        pool->controls = alloc_controls(size);
+        pool->workers_must_exit = false;
+
+        for (i = 0; i < pool->size; i++) {
+            if (! init_control(&pool->controls[i], i, pool)) {
+                goto cleanup;
+            }
+        }
+    }
+    ovs_mutex_unlock(&init_mutex);
+    return true;
+cleanup:
+
+    /* Something went wrong when opening semaphores. In this case
+     * it is better to shut off parallel processing altogether
+     */
+
+    VLOG_INFO("Failed to initialize parallel processing, error %d", errno);
+    can_parallelize = false;
+    free_controls(pool);
+
+    ovs_mutex_unlock(&init_mutex);
+    return false;
+}
+
+
 struct worker_pool *
-ovn_add_worker_pool(void *(*start)(void *))
+ovn_add_worker_pool(void *(*start)(void *), int size)
 {
     struct worker_pool *new_pool = NULL;
-    struct worker_control *new_control;
     bool test = false;
     int i;
     char sem_name[256];
@@ -113,38 +185,29 @@ ovn_add_worker_pool(void *(*start)(void *))
         ovs_mutex_unlock(&init_mutex);
     }
 
+    if (!size) {
+        size = pool_size;
+    }
+
     ovs_mutex_lock(&init_mutex);
     if (can_parallelize) {
         new_pool = xmalloc(sizeof(struct worker_pool));
-        new_pool->size = pool_size;
-        new_pool->controls = NULL;
+        new_pool->size = size;
+        new_pool->start = start;
         sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
         new_pool->done = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
         if (new_pool->done == SEM_FAILED) {
             goto cleanup;
         }
 
-        new_pool->controls =
-            xmalloc(sizeof(struct worker_control) * new_pool->size);
+        new_pool->controls = alloc_controls(size);
+        new_pool->workers_must_exit = false;
 
         for (i = 0; i < new_pool->size; i++) {
-            new_control = &new_pool->controls[i];
-            new_control->id = i;
-            new_control->done = new_pool->done;
-            new_control->data = NULL;
-            ovs_mutex_init(&new_control->mutex);
-            new_control->finished = ATOMIC_VAR_INIT(false);
-            sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
-            new_control->fire = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
-            if (new_control->fire == SEM_FAILED) {
+            if (!init_control(&new_pool->controls[i], i, new_pool)) {
                 goto cleanup;
             }
         }
-
-        for (i = 0; i < pool_size; i++) {
-            new_pool->controls[i].worker =
-                ovs_thread_create("worker pool helper", start, 
&new_pool->controls[i]);
-        }
         ovs_list_push_back(&worker_pools, &new_pool->list_node);
     }
     ovs_mutex_unlock(&init_mutex);
@@ -157,16 +220,7 @@ cleanup:
 
     VLOG_INFO("Failed to initialize parallel processing, error %d", errno);
     can_parallelize = false;
-    if (new_pool->controls) {
-        for (i = 0; i < new_pool->size; i++) {
-            if (new_pool->controls[i].fire != SEM_FAILED) {
-                sem_close(new_pool->controls[i].fire);
-                sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
-                sem_unlink(sem_name);
-                break; /* semaphores past this one are uninitialized */
-            }
-        }
-    }
+    free_controls(new_pool);
     if (new_pool->done != SEM_FAILED) {
         sem_close(new_pool->done);
         sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
@@ -176,7 +230,6 @@ cleanup:
     return NULL;
 }
 
-
 /* Initializes 'hmap' as an empty hash table with mask N. */
 void
 ovn_fast_hmap_init(struct hmap *hmap, ssize_t mask)
@@ -221,13 +274,9 @@ ovn_fast_hmap_size_for(struct hmap *hmap, int size)
 /* Run a thread pool which uses a callback function to process results
  */
 void
-ovn_run_pool_callback(struct worker_pool *pool,
-                      void *fin_result, void *result_frags,
-                      void (*helper_func)(struct worker_pool *pool,
-                                          void *fin_result,
-                                          void *result_frags, int index))
+ovn_start_pool(struct worker_pool *pool)
 {
-    int index, completed;
+    int index;
 
     /* Ensure that all worker threads see the same data as the
      * main thread.
@@ -238,8 +287,20 @@ ovn_run_pool_callback(struct worker_pool *pool,
     for (index = 0; index < pool->size; index++) {
         sem_post(pool->controls[index].fire);
     }
+}
 
-    completed = 0;
+/* Complete a thread pool which uses a callback function to process results
+ */
+void
+ovn_complete_pool_callback(struct worker_pool *pool,
+                           void *fin_result, void *result_frags,
+                           void (*helper_func)(struct worker_pool *pool,
+                                               void *fin_result,
+                                               void *result_frags,
+                                               int index))
+{
+    int index;
+    int completed = 0;
 
     do {
         bool test;
@@ -282,6 +343,19 @@ ovn_run_pool_callback(struct worker_pool *pool,
     } while (completed < pool->size);
 }
 
+/* Run a thread pool which uses a callback function to process results
+ */
+void
+ovn_run_pool_callback(struct worker_pool *pool,
+                      void *fin_result, void *result_frags,
+                      void (*helper_func)(struct worker_pool *pool,
+                                          void *fin_result,
+                                          void *result_frags, int index))
+{
+    ovn_start_pool(pool);
+    ovn_complete_pool_callback(pool, fin_result, result_frags, helper_func);
+}
+
 /* Run a thread pool - basic, does not do results processing.
  */
 void
@@ -350,29 +424,99 @@ ovn_run_pool_list(struct worker_pool *pool,
 }
 
 void
-ovn_update_hashrow_locks(struct hmap *lflows, struct hashrow_locks *hrl)
+ovn_update_hashrow_locks(struct hmap *target, struct hashrow_locks *hrl)
 {
     int i;
-    if (hrl->mask != lflows->mask) {
+    if (hrl->mask != target->mask) {
         if (hrl->row_locks) {
             free(hrl->row_locks);
         }
-        hrl->row_locks = xcalloc(sizeof(struct ovs_mutex), lflows->mask + 1);
-        hrl->mask = lflows->mask;
-        for (i = 0; i <= lflows->mask; i++) {
+        hrl->row_locks = xcalloc(sizeof(struct ovs_mutex), target->mask + 1);
+        hrl->mask = target->mask;
+        for (i = 0; i <= target->mask; i++) {
             ovs_mutex_init(&hrl->row_locks[i]);
         }
     }
 }
 
+static bool
+init_control(struct worker_control *control, int id,
+             struct worker_pool *pool)
+{
+    char sem_name[256];
+    control->id = id;
+    control->done = pool->done;
+    control->data = NULL;
+    ovs_mutex_init(&control->mutex);
+    control->finished = ATOMIC_VAR_INIT(false);
+    sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, id);
+    control->fire = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
+    control->pool = pool;
+    control->worker = 0;
+    if (control->fire == SEM_FAILED) {
+        return false;
+    }
+    control->worker =
+        ovs_thread_create("worker pool helper", pool->start, control);
+    return true;
+}
+
 static void
-worker_pool_hook(void *aux OVS_UNUSED) {
+cleanup_control(struct worker_pool *pool, int id)
+{
+    char sem_name[256];
+    struct worker_control *control = &pool->controls[id];
+
+    if (control->fire != SEM_FAILED) {
+        sem_close(control->fire);
+        sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, id);
+        sem_unlink(sem_name);
+    }
+}
+
+static void
+free_controls(struct worker_pool *pool)
+{
+    int i;
+    if (pool->controls) {
+        pool->workers_must_exit = true;
+        for (i = 0; i < pool->size ; i++) {
+            if (pool->controls[i].fire != SEM_FAILED) {
+                sem_post(pool->controls[i].fire);
+            }
+        }
+        for (i = 0; i < pool->size ; i++) {
+            if (pool->controls[i].worker) {
+                pthread_join(pool->controls[i].worker, NULL);
+                pool->controls[i].worker = 0;
+            }
+        }
+        for (i = 0; i < pool->size; i++) {
+                cleanup_control(pool, i);
+            }
+        free(pool->controls);
+        pool->controls = NULL;
+        pool->workers_must_exit = false;
+    }
+}
+
+static struct worker_control *alloc_controls(int size)
+{
     int i;
+    struct worker_control *controls =
+        xcalloc(sizeof(struct worker_control), size);
+
+    for (i = 0; i < size ; i++) {
+        controls[i].fire = SEM_FAILED;
+    }
+    return controls;
+}
+
+static void
+worker_pool_hook(void *aux OVS_UNUSED) {
     static struct worker_pool *pool;
     char sem_name[256];
 
-    workers_must_exit = true;
-
     /* All workers must honour the must_exit flag and check for it regularly.
      * We can make it atomic and check it via atomics in workers, but that
      * is not really necessary as it is set just once - when the program
@@ -383,17 +527,7 @@ worker_pool_hook(void *aux OVS_UNUSED) {
     /* Wake up the workers after the must_exit flag has been set */
 
     LIST_FOR_EACH (pool, list_node, &worker_pools) {
-        for (i = 0; i < pool->size ; i++) {
-            sem_post(pool->controls[i].fire);
-        }
-        for (i = 0; i < pool->size ; i++) {
-            pthread_join(pool->controls[i].worker, NULL);
-        }
-        for (i = 0; i < pool->size ; i++) {
-            sem_close(pool->controls[i].fire);
-            sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, i);
-            sem_unlink(sem_name);
-        }
+        free_controls(pool);
         sem_close(pool->done);
         sprintf(sem_name, MAIN_SEM_NAME, sembase, pool);
         sem_unlink(sem_name);
@@ -458,4 +592,40 @@ merge_hash_results(struct worker_pool *pool OVS_UNUSED,
     hmap_destroy(&res_frags[index]);
 }
 
+static void *
+standard_helper_thread(void *arg)
+{
+    struct worker_control *control = (struct worker_control *) arg;
+    struct helper_data *hd;
+    int bnum;
+    struct hmap_node *element, *next;
+
+    while (!stop_parallel_processing(control->pool)) {
+        wait_for_work(control);
+        hd = (struct helper_data *) control->data;
+        if (stop_parallel_processing(control->pool)) {
+            return NULL;
+        }
+        if (hd) {
+            for (bnum = control->id; bnum <= hd->target->mask;
+                 bnum += control->pool->size)
+            {
+                element = hmap_first_in_bucket_num(hd->target, bnum);
+                while (element) {
+                    if (stop_parallel_processing(control->pool)) {
+                        return NULL;
+                    }
+                    next = element->next;
+                    hd->element_func(element, hd->target, hd->data);
+                    element = next;
+                }
+            }
+
+        }
+        post_completed_work(control);
+    }
+    return NULL;
+}
+
+
 #endif
diff --git a/lib/ovn-parallel-hmap.h b/lib/ovn-parallel-hmap.h
index 2df132ea8..4cdd5c4e5 100644
--- a/lib/ovn-parallel-hmap.h
+++ b/lib/ovn-parallel-hmap.h
@@ -83,6 +83,7 @@ struct worker_control {
     void *data; /* Pointer to data to be processed. */
     void *workload; /* back-pointer to the worker pool structure. */
     pthread_t worker;
+    struct worker_pool *pool;
 };
 
 struct worker_pool {
@@ -90,16 +91,31 @@ struct worker_pool {
     struct ovs_list list_node; /* List of pools - used in cleanup/exit. */
     struct worker_control *controls; /* "Handles" in this pool. */
     sem_t *done; /* Work completion semaphorew. */
+    void *(*start)(void *); /* Work function. */
+    bool workers_must_exit; /* Pool to be destroyed flag. */
+};
+
+
+struct helper_data {
+    struct hmap *target;
+    void (*element_func)(struct hmap_node *element, struct hmap *target,
+                    void *data);
+    void *data;
 };
 
 /* Add a worker pool for thread function start() which expects a pointer to
- * a worker_control structure as an argument. */
+ * a worker_control structure as an argument.
+ * If size is non-zero, it is used for pool sizing. If size is zero, pool
+ * size uses system defaults.
+ */
 
-struct worker_pool *ovn_add_worker_pool(void *(*start)(void *));
+struct worker_pool *ovn_add_worker_pool(void *(*start)(void *), int size);
+
+struct worker_pool *ovn_add_standard_pool(int size);
 
 /* Setting this to true will make all processing threads exit */
 
-bool ovn_stop_parallel_processing(void);
+bool ovn_stop_parallel_processing(struct worker_pool *pool);
 
 /* Build a hmap pre-sized for size elements */
 
@@ -143,6 +159,35 @@ void ovn_run_pool_callback(struct worker_pool *pool, void 
*fin_result,
                            void *fin_result, void *result_frags, int index));
 
 
+/* Start a pool running in the background.
+ */
+
+void ovn_start_pool(struct worker_pool *pool);
+
+/* Complete a background pool run, merge results from hash frags into a final
+ * hash result.
+ * The hash frags must be pre-sized to the same size.
+ */
+
+void ovn_complete_pool_hash(struct worker_pool *pool,
+                       struct hmap *result, struct hmap *result_frags);
+/* Complete a background pool run, merge results from list frags into a final
+ * list result.
+ */
+
+void ovn_complete_pool_list(struct worker_pool *pool,
+                       struct ovs_list *result, struct ovs_list *result_frags);
+
+/* Complete a background pool run, call a callback function to perform
+ * processing of results.
+ */
+
+void ovn_complete_pool_callback(struct worker_pool *pool, void *fin_result,
+                           void *result_frags,
+                           void (*helper_func)(struct worker_pool *pool,
+                           void *fin_result, void *result_frags, int index));
+
+
 /* Returns the first node in 'hmap' in the bucket in which the given 'hash'
  * would land, or a null pointer if that bucket is empty. */
 
@@ -230,7 +275,7 @@ struct hashrow_locks {
 
 /* Update an hash row locks structure to match the current hash size */
 
-void ovn_update_hashrow_locks(struct hmap *lflows, struct hashrow_locks *hrl);
+void ovn_update_hashrow_locks(struct hmap *, struct hashrow_locks *);
 
 /* Lock a hash row */
 static inline void lock_hash_row(struct hashrow_locks *hrl, uint32_t hash)
@@ -253,6 +298,10 @@ static inline void init_hash_row_locks(struct 
hashrow_locks *hrl)
 
 bool ovn_can_parallelize_hashes(bool force_parallel);
 
+void ovn_destroy_pool(struct worker_pool *pool);
+
+bool ovn_resize_pool(struct worker_pool *pool, int size);
+
 /* Use the OVN library functions for stuff which OVS has not defined
  * If OVS has defined these, they will still compile using the OVN
  * local names, but will be dropped by the linker in favour of the OVS
@@ -263,9 +312,11 @@ bool ovn_can_parallelize_hashes(bool force_parallel);
 
 #define can_parallelize_hashes(force) ovn_can_parallelize_hashes(force)
 
-#define stop_parallel_processing() ovn_stop_parallel_processing()
+#define stop_parallel_processing(pool) ovn_stop_parallel_processing(pool)
+
+#define add_worker_pool(start, size) ovn_add_worker_pool(start, size)
 
-#define add_worker_pool(start) ovn_add_worker_pool(start)
+#define add_standard_pool(start, size) ovn_add_standard_pool(start, size)
 
 #define fast_hmap_size_for(hmap, size) ovn_fast_hmap_size_for(hmap, size)
 
@@ -286,6 +337,20 @@ bool ovn_can_parallelize_hashes(bool force_parallel);
 #define run_pool_callback(pool, fin_result, result_frags, helper_func) \
     ovn_run_pool_callback(pool, fin_result, result_frags, helper_func)
 
+#define start_pool(pool) ovn_start_pool(pool)
+
+#define complete_pool_hash(pool, result, result_frags) \
+    ovn_complete_pool_hash(pool, result, result_frags)
+
+#define complete_pool_list(pool, result, result_frags) \
+    ovn_complete_pool_list(pool, result, result_frags)
+
+#define complete_pool_callback(pool, fin_result, result_frags, helper_func) \
+    ovn_complete_pool_callback(pool, fin_result, result_frags, helper_func)
+
+#define destroy_pool(pool) ovn_destroy_pool(pool)
+
+#define resize_pool(pool, size) ovn_resize_pool(pool, size)
 
 
 #ifdef __clang__
diff --git a/northd/northd.c b/northd/northd.c
index d1b87891c..7724d27e9 100644
--- a/northd/northd.c
+++ b/northd/northd.c
@@ -12871,16 +12871,10 @@ build_lswitch_and_lrouter_iterate_by_op(struct 
ovn_port *op,
                                       &lsi->actions);
 }
 
-struct lflows_thread_pool {
-    struct worker_pool *pool;
-};
-
-
 static void *
 build_lflows_thread(void *arg)
 {
     struct worker_control *control = (struct worker_control *) arg;
-    struct lflows_thread_pool *workload;
     struct lswitch_flow_build_info *lsi;
 
     struct ovn_datapath *od;
@@ -12889,21 +12883,20 @@ build_lflows_thread(void *arg)
     struct ovn_igmp_group *igmp_group;
     int bnum;
 
-    while (!stop_parallel_processing()) {
+    while (!stop_parallel_processing(control->pool)) {
         wait_for_work(control);
-        workload = (struct lflows_thread_pool *) control->workload;
         lsi = (struct lswitch_flow_build_info *) control->data;
-        if (stop_parallel_processing()) {
+        if (stop_parallel_processing(control->pool)) {
             return NULL;
         }
-        if (lsi && workload) {
+        if (lsi) {
             /* Iterate over bucket ThreadID, ThreadID+size, ... */
             for (bnum = control->id;
                     bnum <= lsi->datapaths->mask;
-                    bnum += workload->pool->size)
+                    bnum += control->pool->size)
             {
                 HMAP_FOR_EACH_IN_PARALLEL (od, key_node, bnum, lsi->datapaths) 
{
-                    if (stop_parallel_processing()) {
+                    if (stop_parallel_processing(control->pool)) {
                         return NULL;
                     }
                     build_lswitch_and_lrouter_iterate_by_od(od, lsi);
@@ -12911,10 +12904,10 @@ build_lflows_thread(void *arg)
             }
             for (bnum = control->id;
                     bnum <= lsi->ports->mask;
-                    bnum += workload->pool->size)
+                    bnum += control->pool->size)
             {
                 HMAP_FOR_EACH_IN_PARALLEL (op, key_node, bnum, lsi->ports) {
-                    if (stop_parallel_processing()) {
+                    if (stop_parallel_processing(control->pool)) {
                         return NULL;
                     }
                     build_lswitch_and_lrouter_iterate_by_op(op, lsi);
@@ -12922,10 +12915,10 @@ build_lflows_thread(void *arg)
             }
             for (bnum = control->id;
                     bnum <= lsi->lbs->mask;
-                    bnum += workload->pool->size)
+                    bnum += control->pool->size)
             {
                 HMAP_FOR_EACH_IN_PARALLEL (lb, hmap_node, bnum, lsi->lbs) {
-                    if (stop_parallel_processing()) {
+                    if (stop_parallel_processing(control->pool)) {
                         return NULL;
                     }
                     build_lswitch_arp_nd_service_monitor(lb, lsi->lflows,
@@ -12943,11 +12936,11 @@ build_lflows_thread(void *arg)
             }
             for (bnum = control->id;
                     bnum <= lsi->igmp_groups->mask;
-                    bnum += workload->pool->size)
+                    bnum += control->pool->size)
             {
                 HMAP_FOR_EACH_IN_PARALLEL (
                         igmp_group, hmap_node, bnum, lsi->igmp_groups) {
-                    if (stop_parallel_processing()) {
+                    if (stop_parallel_processing(control->pool)) {
                         return NULL;
                     }
                     build_lswitch_ip_mcast_igmp_mld(igmp_group, lsi->lflows,
@@ -12962,43 +12955,18 @@ build_lflows_thread(void *arg)
 }
 
 static bool pool_init_done = false;
-static struct lflows_thread_pool *build_lflows_pool = NULL;
+static struct worker_pool *build_lflows_pool = NULL;
 
 static void
 init_lflows_thread_pool(void)
 {
-    int index;
 
     if (!pool_init_done) {
-        struct worker_pool *pool = add_worker_pool(build_lflows_thread);
+        build_lflows_pool = add_worker_pool(build_lflows_thread, 0);
         pool_init_done = true;
-        if (pool) {
-            build_lflows_pool = xmalloc(sizeof(*build_lflows_pool));
-            build_lflows_pool->pool = pool;
-            for (index = 0; index < build_lflows_pool->pool->size; index++) {
-                build_lflows_pool->pool->controls[index].workload =
-                    build_lflows_pool;
-            }
-        }
     }
 }
 
-/* TODO: replace hard cutoffs by configurable via commands. These are
- * temporary defines to determine single-thread to multi-thread processing
- * cutoff.
- * Setting to 1 forces "all parallel" lflow build.
- */
-
-static void
-noop_callback(struct worker_pool *pool OVS_UNUSED,
-              void *fin_result OVS_UNUSED,
-              void *result_frags OVS_UNUSED,
-              int index OVS_UNUSED)
-{
-    /* Do nothing */
-}
-
-
 static void
 build_lswitch_and_lrouter_flows(struct hmap *datapaths, struct hmap *ports,
                                 struct hmap *port_groups, struct hmap *lflows,
@@ -13022,16 +12990,16 @@ build_lswitch_and_lrouter_flows(struct hmap 
*datapaths, struct hmap *ports,
         struct lswitch_flow_build_info *lsiv;
         int index;
 
-        lsiv = xcalloc(sizeof(*lsiv), build_lflows_pool->pool->size);
+        lsiv = xcalloc(sizeof(*lsiv), build_lflows_pool->size);
         if (use_logical_dp_groups) {
             lflow_segs = NULL;
         } else {
-            lflow_segs = xcalloc(sizeof(*lflow_segs), 
build_lflows_pool->pool->size);
+            lflow_segs = xcalloc(sizeof(*lflow_segs), build_lflows_pool->size);
         }
 
         /* Set up "work chunks" for each thread to work on. */
 
-        for (index = 0; index < build_lflows_pool->pool->size; index++) {
+        for (index = 0; index < build_lflows_pool->size; index++) {
             if (use_logical_dp_groups) {
                 /* if dp_groups are in use we lock a shared lflows hash
                  * on a per-bucket level instead of merging hash frags */
@@ -13053,17 +13021,17 @@ build_lswitch_and_lrouter_flows(struct hmap 
*datapaths, struct hmap *ports,
             ds_init(&lsiv[index].match);
             ds_init(&lsiv[index].actions);
 
-            build_lflows_pool->pool->controls[index].data = &lsiv[index];
+            build_lflows_pool->controls[index].data = &lsiv[index];
         }
 
         /* Run thread pool. */
         if (use_logical_dp_groups) {
-            run_pool_callback(build_lflows_pool->pool, NULL, NULL, 
noop_callback);
+            run_pool_callback(build_lflows_pool, NULL, NULL, NULL);
         } else {
-            run_pool_hash(build_lflows_pool->pool, lflows, lflow_segs);
+            run_pool_hash(build_lflows_pool, lflows, lflow_segs);
         }
 
-        for (index = 0; index < build_lflows_pool->pool->size; index++) {
+        for (index = 0; index < build_lflows_pool->size; index++) {
             ds_destroy(&lsiv[index].match);
             ds_destroy(&lsiv[index].actions);
         }
-- 
2.20.1

_______________________________________________
dev mailing list
d...@openvswitch.org
https://mail.openvswitch.org/mailman/listinfo/ovs-dev

Reply via email to