From fa7fc19c8d021fe314cd780af7ae724c1dd53096 Mon Sep 17 00:00:00 2001
From: Anthonin Bonnefoy <anthonin.bonnefoy@datadoghq.com>
Date: Thu, 23 May 2024 11:24:44 +0200
Subject: Fix row estimation in gather paths

In parallel plans, the row count of a partial plan is estimated to
(rows/parallel_divisor). The parallel_divisor is the number of
parallel_workers plus a possible leader contribution.

When creating a gather path, we currently estimate the sum of gathered
rows to worker_rows*parallel_workers which leads to a lower estimated
row count.

This patch changes the gather path row estimation to
worker_rows*parallel_divisor to get a more accurate estimation.
---
 src/backend/optimizer/path/allpaths.c   |  7 +++----
 src/backend/optimizer/path/costsize.c   | 19 +++++++++++++++++++
 src/backend/optimizer/plan/planner.c    |  6 +++---
 src/include/nodes/pathnodes.h           |  5 +++++
 src/include/optimizer/optimizer.h       |  7 ++++++-
 src/test/regress/expected/join_hash.out | 19 +++++++++----------
 6 files changed, 45 insertions(+), 18 deletions(-)

diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c
index 4895cee994..c1244a9b83 100644
--- a/src/backend/optimizer/path/allpaths.c
+++ b/src/backend/optimizer/path/allpaths.c
@@ -3071,8 +3071,7 @@ generate_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_rows)
 	 * of partial_pathlist because of the way add_partial_path works.
 	 */
 	cheapest_partial_path = linitial(rel->partial_pathlist);
-	rows =
-		cheapest_partial_path->rows * cheapest_partial_path->parallel_workers;
+	rows = gather_rows_estimate(cheapest_partial_path);
 	simple_gather_path = (Path *)
 		create_gather_path(root, rel, cheapest_partial_path, rel->reltarget,
 						   NULL, rowsp);
@@ -3090,7 +3089,7 @@ generate_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_rows)
 		if (subpath->pathkeys == NIL)
 			continue;
 
-		rows = subpath->rows * subpath->parallel_workers;
+		rows = gather_rows_estimate(subpath);
 		path = create_gather_merge_path(root, rel, subpath, rel->reltarget,
 										subpath->pathkeys, NULL, rowsp);
 		add_path(rel, &path->path);
@@ -3274,7 +3273,7 @@ generate_useful_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_r
 													subpath,
 													useful_pathkeys,
 													-1.0);
-				rows = subpath->rows * subpath->parallel_workers;
+				rows = gather_rows_estimate(subpath);
 			}
 			else
 				subpath = (Path *) create_incremental_sort_path(root,
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index ee23ed7835..24feb513ce 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -217,6 +217,25 @@ clamp_row_est(double nrows)
 	return nrows;
 }
 
+/*
+ * gather_rows_estimate
+ *		Estimate the number of rows for gather nodes.
+ *
+ * When creating a gather (merge) path, we need to estimate the sum of rows
+ * distributed to all workers. A worker will have an estimated row set to
+ * (rows / parallel_divisor). Since parallel_divisor may include the leader
+ * contribution, we can't simply multiply workers' rows by the number of
+ * parallel_workers and instead need to reuse the parallel_divisor to get a
+ * more accurate estimation.
+ */
+double
+gather_rows_estimate(Path *partial_path)
+{
+	double		parallel_divisor = get_parallel_divisor(partial_path);
+
+	return clamp_row_est(partial_path->rows * parallel_divisor);
+}
+
 /*
  * clamp_width_est
  *		Force a tuple-width estimate to a sane value.
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index 4711f91239..c3e234902e 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -5370,8 +5370,8 @@ create_ordered_paths(PlannerInfo *root,
 																	root->sort_pathkeys,
 																	presorted_keys,
 																	limit_tuples);
-			total_groups = input_path->rows *
-				input_path->parallel_workers;
+			total_groups = gather_rows_estimate(input_path);
+
 			sorted_path = (Path *)
 				create_gather_merge_path(root, ordered_rel,
 										 sorted_path,
@@ -7543,7 +7543,7 @@ gather_grouping_paths(PlannerInfo *root, RelOptInfo *rel)
 			(presorted_keys == 0 || !enable_incremental_sort))
 			continue;
 
-		total_groups = path->rows * path->parallel_workers;
+		total_groups = gather_rows_estimate(path);
 
 		/*
 		 * We've no need to consider both a sort and incremental sort. We'll
diff --git a/src/include/nodes/pathnodes.h b/src/include/nodes/pathnodes.h
index 14ccfc1ac1..f460539d6c 100644
--- a/src/include/nodes/pathnodes.h
+++ b/src/include/nodes/pathnodes.h
@@ -1616,6 +1616,11 @@ typedef struct ParamPathInfo
  * between RelOptInfo and Path nodes can't be handled easily in a simple
  * depth-first traversal.  We also don't have read support at the moment.
  */
+#ifndef HAVE_PATH_TYPEDEF
+typedef struct Path Path;
+#define HAVE_PATH_TYPEDEF 1
+#endif
+
 typedef struct Path
 {
 	pg_node_attr(no_copy_equal, no_read, no_query_jumble)
diff --git a/src/include/optimizer/optimizer.h b/src/include/optimizer/optimizer.h
index 7b63c5cf71..d1b16d1258 100644
--- a/src/include/optimizer/optimizer.h
+++ b/src/include/optimizer/optimizer.h
@@ -35,7 +35,7 @@ typedef struct PlannerInfo PlannerInfo;
 #define HAVE_PLANNERINFO_TYPEDEF 1
 #endif
 
-/* Likewise for IndexOptInfo and SpecialJoinInfo. */
+/* Likewise for IndexOptInfo, SpecialJoinInfo and Path. */
 #ifndef HAVE_INDEXOPTINFO_TYPEDEF
 typedef struct IndexOptInfo IndexOptInfo;
 #define HAVE_INDEXOPTINFO_TYPEDEF 1
@@ -44,6 +44,10 @@ typedef struct IndexOptInfo IndexOptInfo;
 typedef struct SpecialJoinInfo SpecialJoinInfo;
 #define HAVE_SPECIALJOININFO_TYPEDEF 1
 #endif
+#ifndef HAVE_PATH_TYPEDEF
+typedef struct Path Path;
+#define HAVE_PATH_TYPEDEF 1
+#endif
 
 /* It also seems best not to include plannodes.h, params.h, or htup.h here */
 struct PlannedStmt;
@@ -92,6 +96,7 @@ extern PGDLLIMPORT int effective_cache_size;
 extern double clamp_row_est(double nrows);
 extern int32 clamp_width_est(int64 tuple_width);
 extern long clamp_cardinality_to_long(Cardinality x);
+extern double gather_rows_estimate(Path *partial_path);
 
 /* in path/indxpath.c: */
 
diff --git a/src/test/regress/expected/join_hash.out b/src/test/regress/expected/join_hash.out
index 262fa71ed8..4fc34a0e72 100644
--- a/src/test/regress/expected/join_hash.out
+++ b/src/test/regress/expected/join_hash.out
@@ -508,18 +508,17 @@ set local hash_mem_multiplier = 1.0;
 set local enable_parallel_hash = on;
 explain (costs off)
   select count(*) from simple r join extremely_skewed s using (id);
-                              QUERY PLAN                               
------------------------------------------------------------------------
- Finalize Aggregate
+                           QUERY PLAN                            
+-----------------------------------------------------------------
+ Aggregate
    ->  Gather
          Workers Planned: 1
-         ->  Partial Aggregate
-               ->  Parallel Hash Join
-                     Hash Cond: (r.id = s.id)
-                     ->  Parallel Seq Scan on simple r
-                     ->  Parallel Hash
-                           ->  Parallel Seq Scan on extremely_skewed s
-(9 rows)
+         ->  Parallel Hash Join
+               Hash Cond: (r.id = s.id)
+               ->  Parallel Seq Scan on simple r
+               ->  Parallel Hash
+                     ->  Parallel Seq Scan on extremely_skewed s
+(8 rows)
 
 select count(*) from simple r join extremely_skewed s using (id);
  count 
-- 
2.39.3 (Apple Git-146)

