From: Anton Ivanov <anton.iva...@cambridgegreys.com>

Process monitor cond changes in parallel where applicable.
The cut-off at 32 entries was chosen based on experimentation on a
6-core/12-thread Ryzen. It is reasonably conservative and should be OK
for other CPUs with a higher cost of inter-thread IPC.

Signed-off-by: Anton Ivanov <anton.iva...@cambridgegreys.com>
---
 ovsdb/monitor.c | 143 ++++++++++++++++++++++++++++++++++++++++++++----
 1 file changed, 132 insertions(+), 11 deletions(-)
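A note on the scheme for reviewers: the worker threads split a table's
hash buckets by striding -- thread i starts at bucket i and advances by
the pool size, so every bucket is visited by exactly one thread and the
shared hmap needs no locking. Below is a minimal standalone sketch of
that striding pattern in plain pthreads. It is not part of the patch;
NUM_THREADS, N_BUCKETS and process_bucket() are illustrative stand-ins
only, while the patch itself uses the worker_pool/worker_control
infrastructure visible in the diff.

#include <pthread.h>
#include <stdio.h>

#define NUM_THREADS 4    /* stand-in for worker_pool->size */
#define N_BUCKETS   64   /* stand-in for mt->table->rows.mask + 1 */

static void
process_bucket(int bucket, int thread_id)
{
    /* Placeholder for per-bucket row processing. */
    printf("thread %d: bucket %d\n", thread_id, bucket);
}

struct worker_arg {
    int id;          /* like control->id */
    int pool_size;   /* like workload->size */
};

static void *
worker(void *arg_)
{
    struct worker_arg *arg = arg_;
    int bnum;

    /* Same loop shape as compose_cond_change_thread(): start at this
     * thread's own id and stride by the pool size.  The patch writes
     * the bound as "bnum <= mask"; "bnum < n_buckets" is equivalent. */
    for (bnum = arg->id; bnum < N_BUCKETS; bnum += arg->pool_size) {
        process_bucket(bnum, arg->id);
    }
    return NULL;
}

int
main(void)
{
    pthread_t threads[NUM_THREADS];
    struct worker_arg args[NUM_THREADS];
    int i;

    for (i = 0; i < NUM_THREADS; i++) {
        args[i].id = i;
        args[i].pool_size = NUM_THREADS;
        pthread_create(&threads[i], NULL, worker, &args[i]);
    }
    for (i = 0; i < NUM_THREADS; i++) {
        pthread_join(threads[i], NULL);
    }
    return 0;
}

Built with "gcc -pthread", each of the 64 buckets is printed exactly
once across the 4 threads; that disjoint partition is why the
per-thread JSON fragments in the patch can be merged afterwards
without any deduplication.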
diff --git a/ovsdb/monitor.c b/ovsdb/monitor.c
index 7c7a9faea..a7f6c1cf9 100644
--- a/ovsdb/monitor.c
+++ b/ovsdb/monitor.c
@@ -1254,6 +1254,109 @@ ovsdb_monitor_compose_update(
     return json;
 }
 
+/* Parallel processing thread for conditional updates.
+ * This parallelisation is significantly less aggressive,
+ * because ovsdb_monitor_table_condition_updated has to
+ * be run after each table.
+ */
+
+struct compose_cond_change_info {
+    compose_row_update_cb_func row_update;
+    struct json *json;
+    struct ovsdb_monitor_table *mt;
+    struct ovsdb_monitor_session_condition *condition;
+    struct ovsdb_monitor *dbmon;
+};
+
+static void *
+compose_cond_change_thread(void *arg)
+{
+    struct worker_control *control = (struct worker_control *) arg;
+    struct worker_pool *workload;
+    struct compose_cond_change_info *cci;
+    int bnum;
+
+    while (!stop_parallel_processing()) {
+        wait_for_work(control);
+        workload = (struct worker_pool *) control->workload;
+        cci = (struct compose_cond_change_info *) control->data;
+        if (stop_parallel_processing()) {
+            return NULL;
+        }
+        if (cci && workload) {
+            size_t max_columns = ovsdb_monitor_max_columns(cci->dbmon);
+            unsigned long int *changed = xmalloc(bitmap_n_bytes(max_columns));
+            struct json *table_json = NULL;
+
+            for (bnum = control->id;
+                 bnum <= cci->mt->table->rows.mask;
+                 bnum += workload->size) {
+
+                /* Iterate over all rows in table */
+                struct ovsdb_row *row;
+                HMAP_FOR_EACH_IN_PARALLEL (row,
+                        hmap_node, bnum, &cci->mt->table->rows) {
+                    struct json *row_json;
+
+                    row_json =
+                        ovsdb_monitor_compose_row_update2(cci->mt,
+                                                          cci->condition,
+                                                          OVSDB_ROW, row,
+                                                          false, changed,
+                                                          cci->mt->n_columns);
+                    if (row_json) {
+                        ovsdb_monitor_add_json_row(&cci->json,
+                                cci->mt->table->schema->name,
+                                &table_json, row_json,
+                                ovsdb_row_get_uuid(row));
+                    }
+                }
+            }
+            free(changed);
+        }
+        post_completed_work(control);
+    }
+    return NULL;
+}
+
+static struct worker_pool *monitor_pool_b = NULL;
+static bool pool_init_b_done = false;
+
+static void
+init_monitor_pool_b(void)
+{
+    if (!pool_init_b_done) {
+        if (can_parallelize_hashes(false)) {
+            monitor_pool_b = add_worker_pool(compose_cond_change_thread);
+            if (monitor_pool_b) {
+                int i;
+                for (i = 0; i < monitor_pool_b->size; i++) {
+                    monitor_pool_b->controls[i].workload = monitor_pool_b;
+                }
+            }
+        }
+    }
+    pool_init_b_done = true;
+}
+
+static void
+pool_b_merge_cb(struct worker_pool *pool OVS_UNUSED,
+                void *fin_result,
+                void *result_frags,
+                int index)
+{
+    struct json **result = fin_result;
+    struct compose_cond_change_info *cci = result_frags;
+
+    parallel_json_merge_tables(result, cci[index].json);
+}
+
+/* Rather conservative cutoff. The JSON stack speed is such that
+ * anything > 8 entries seems to perform better in parallel.
+ */
+
+#define POOL_B_CUTOFF 32
+
 static struct json*
 ovsdb_monitor_compose_cond_change_update(
     struct ovsdb_monitor *dbmon,
@@ -1264,6 +1367,8 @@ ovsdb_monitor_compose_cond_change_update(
     size_t max_columns = ovsdb_monitor_max_columns(dbmon);
     unsigned long int *changed = xmalloc(bitmap_n_bytes(max_columns));
 
+    init_monitor_pool_b();
+
     SHASH_FOR_EACH (node, &dbmon->tables) {
         struct ovsdb_monitor_table *mt = node->data;
         struct ovsdb_row *row;
@@ -1280,17 +1385,33 @@ ovsdb_monitor_compose_cond_change_update(
         }
 
         /* Iterate over all rows in table */
-        HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) {
-            struct json *row_json;
-
-            row_json = ovsdb_monitor_compose_row_update2(mt, condition,
-                                                         OVSDB_ROW, row,
-                                                         false, changed,
-                                                         mt->n_columns);
-            if (row_json) {
-                ovsdb_monitor_add_json_row(&json, mt->table->schema->name,
-                                           &table_json, row_json,
-                                           ovsdb_row_get_uuid(row));
+        if (monitor_pool_b && hmap_count(&mt->table->rows) > POOL_B_CUTOFF) {
+            struct compose_cond_change_info *cci =
+                xcalloc(sizeof(struct compose_cond_change_info),
+                        monitor_pool_b->size);
+            int i;
+            for (i = 0; i < monitor_pool_b->size; i++) {
+                cci[i].json = NULL;
+                cci[i].mt = mt;
+                cci[i].condition = condition;
+                cci[i].dbmon = dbmon;
+                monitor_pool_b->controls[i].data = &cci[i];
+            }
+            run_pool_callback(monitor_pool_b, &json, cci, pool_b_merge_cb);
+            free(cci);
+        } else {
+            HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) {
+                struct json *row_json;
+
+                row_json = ovsdb_monitor_compose_row_update2(mt, condition,
+                                                             OVSDB_ROW, row,
+                                                             false, changed,
+                                                             mt->n_columns);
+                if (row_json) {
+                    ovsdb_monitor_add_json_row(&json, mt->table->schema->name,
+                                               &table_json, row_json,
+                                               ovsdb_row_get_uuid(row));
+                }
             }
         }
         ovsdb_monitor_table_condition_updated(mt, condition);
-- 
2.20.1

_______________________________________________
dev mailing list
d...@openvswitch.org
https://mail.openvswitch.org/mailman/listinfo/ovs-dev