Thanks for having a look at this.
On Fri, 4 Feb 2022 at 13:48, Robert Haas <[email protected]> wrote:
> I think the actual rule is: every path under a Gather or GatherMerge
> must be parallel-safe.
I've adjusted the patch so that it counts parallel_aware and
parallel_safe Paths independently and verifies everything below a
Gather[Merge] is parallel_safe.
The diff stat currently looks like:
src/backend/optimizer/plan/createplan.c | 230
1 file changed, 230 insertions(+)
I still feel this is quite a bit of code for what we're getting here.
I'd be more for it if the path traversal function existed for some
other reason and I was just adding the callback functions and Asserts.
I'm keen to hear what others think about that.
David
diff --git a/src/backend/optimizer/plan/createplan.c
b/src/backend/optimizer/plan/createplan.c
index cd6d72c763..898046ca07 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -313,6 +313,20 @@ static ModifyTable *make_modifytable(PlannerInfo *root,
Plan *subplan,
List
*rowMarks, OnConflictExpr *onconflict, int epqParam);
static GatherMerge *create_gather_merge_plan(PlannerInfo *root,
GatherMergePath *best_path);
+static bool contains_a_parallel_aware_path(Path *path);
+static bool contains_only_parallel_safe_paths(Path *path);
+
+/*
+ * PathTypeCount
+ *		Counters that path_type_counter() updates for each path it is given.
+ *		Used for various checks to assert plans are sane in assert enabled builds.
+ */
+typedef struct PathTypeCount
+{
+	uint64		count;			/* number of paths counted */
+	uint64		parallel_safe_count;	/* those with parallel_safe set */
+	uint64		parallel_aware_count;	/* those with parallel_aware set */
+} PathTypeCount;
/*
@@ -389,6 +403,10 @@ create_plan_recurse(PlannerInfo *root, Path *best_path,
int flags)
/* Guard against stack overflow due to overly complex plans */
check_stack_depth();
+ /* Parallel aware paths should contain only parallel safe subpaths. */
+ Assert(!best_path->parallel_aware ||
+ contains_only_parallel_safe_paths(best_path));
+
switch (best_path->pathtype)
{
case T_SeqScan:
@@ -481,6 +499,14 @@ create_plan_recurse(PlannerInfo *root, Path *best_path,
int flags)
case T_Gather:
plan = (Plan *) create_gather_plan(root,
(GatherPath *) best_path);
+
+ /*
+ * We expect a Gather to contain at least one parallel
aware path
+ * unless running in single_copy mode.
+ */
+ Assert(((GatherPath *) best_path)->single_copy ||
+ contains_a_parallel_aware_path(((GatherPath
*)
+
best_path)->subpath));
break;
case T_Sort:
plan = (Plan *) create_sort_plan(root,
@@ -537,6 +563,9 @@ create_plan_recurse(PlannerInfo *root, Path *best_path, int
flags)
case T_GatherMerge:
plan = (Plan *) create_gather_merge_plan(root,
(GatherMergePath *) best_path);
+ /* GatherMerge must contain at least one parallel aware
path */
+ Assert(contains_a_parallel_aware_path(((GatherMergePath
*)
+
best_path)->subpath));
break;
default:
elog(ERROR, "unrecognized node type: %d",
@@ -7052,6 +7081,207 @@ make_modifytable(PlannerInfo *root, Plan *subplan,
return node;
}
+/*
+ * path_tree_walker
+ *		Walk a path tree beginning with 'path' and call the 'walker' function
+ *		for that path and each of its subpaths recursively.
+ *
+ * 'walker' is called as walker(path, context) for a path before any of its
+ * subpaths.  It is declared without a prototype, so callbacks are not type
+ * checked.  A NULL 'path' is ignored; an unknown pathtype raises an ERROR.
+ */
+static void
+path_tree_walker(Path *path, void (*walker) (), void *context)
+{
+	if (path == NULL)
+		return;
+
+	/* Guard against stack overflow due to overly complex path trees */
+	check_stack_depth();
+
+	walker(path, context);
+
+	switch (path->pathtype)
+	{
+		case T_SeqScan:
+		case T_SampleScan:
+		case T_IndexScan:
+		case T_IndexOnlyScan:
+		case T_BitmapHeapScan:
+		case T_TidScan:
+		case T_TidRangeScan:
+		case T_FunctionScan:
+		case T_TableFuncScan:
+		case T_ValuesScan:
+		case T_CteScan:
+		case T_WorkTableScan:
+		case T_NamedTuplestoreScan:
+		case T_ForeignScan:
+		case T_CustomScan:
+			/*
+			 * No subpaths are walked for these scan paths.  NOTE(review): this
+			 * skips BitmapHeapPath's bitmapqual, ForeignPath's fdw_outerpath and
+			 * CustomPath's custom_paths; confirm that is intended.
+			 */
+			break;
+		case T_SubqueryScan:
+			/* The subquery's own path hangs off the scan path; walk it too */
+			path_tree_walker(((SubqueryScanPath *) path)->subpath, walker, context);
+			break;
+		case T_HashJoin:
+		case T_MergeJoin:
+		case T_NestLoop:
+			path_tree_walker(((JoinPath *) path)->outerjoinpath, walker, context);
+			path_tree_walker(((JoinPath *) path)->innerjoinpath, walker, context);
+			break;
+		case T_Append:
+			{
+				AppendPath *apath = (AppendPath *) path;
+				ListCell   *lc;
+
+				foreach(lc, apath->subpaths)
+				{
+					Path	   *subpath = lfirst(lc);
+
+					path_tree_walker(subpath, walker, context);
+				}
+			}
+			break;
+		case T_MergeAppend:
+			{
+				MergeAppendPath *mpath = (MergeAppendPath *) path;
+				ListCell   *lc;
+
+				foreach(lc, mpath->subpaths)
+				{
+					Path	   *subpath = lfirst(lc);
+
+					path_tree_walker(subpath, walker, context);
+				}
+			}
+			break;
+		case T_Result:
+			/*
+			 * Only ProjectionPath carries a subpath.  MinMaxAggPath,
+			 * GroupResultPath and any other Result type path have none.
+			 */
+			if (IsA(path, ProjectionPath))
+				path_tree_walker(((ProjectionPath *) path)->subpath, walker, context);
+			break;
+		case T_ProjectSet:
+			path_tree_walker(((ProjectSetPath *) path)->subpath, walker, context);
+			break;
+		case T_Material:
+			path_tree_walker(((MaterialPath *) path)->subpath, walker, context);
+			break;
+		case T_Memoize:
+			path_tree_walker(((MemoizePath *) path)->subpath, walker, context);
+			break;
+		case T_Unique:
+			if (IsA(path, UpperUniquePath))
+				path_tree_walker(((UpperUniquePath *) path)->subpath, walker, context);
+			else
+			{
+				Assert(IsA(path, UniquePath));
+				path_tree_walker(((UniquePath *) path)->subpath, walker, context);
+			}
+			break;
+		case T_Gather:
+			path_tree_walker(((GatherPath *) path)->subpath, walker, context);
+			break;
+		case T_Sort:
+			path_tree_walker(((SortPath *) path)->subpath, walker, context);
+			break;
+		case T_IncrementalSort:
+			path_tree_walker(((IncrementalSortPath *) path)->spath.subpath, walker, context);
+			break;
+		case T_Group:
+			path_tree_walker(((GroupPath *) path)->subpath, walker, context);
+			break;
+		case T_Agg:
+			if (IsA(path, GroupingSetsPath))
+				path_tree_walker(((GroupingSetsPath *) path)->subpath, walker, context);
+			else
+			{
+				Assert(IsA(path, AggPath));
+				path_tree_walker(((AggPath *) path)->subpath, walker, context);
+			}
+			break;
+		case T_WindowAgg:
+			path_tree_walker(((WindowAggPath *) path)->subpath, walker, context);
+			break;
+		case T_SetOp:
+			path_tree_walker(((SetOpPath *) path)->subpath, walker, context);
+			break;
+		case T_RecursiveUnion:
+			path_tree_walker(((RecursiveUnionPath *) path)->leftpath, walker, context);
+			path_tree_walker(((RecursiveUnionPath *) path)->rightpath, walker, context);
+			break;
+		case T_LockRows:
+			path_tree_walker(((LockRowsPath *) path)->subpath, walker, context);
+			break;
+		case T_ModifyTable:
+			path_tree_walker(((ModifyTablePath *) path)->subpath, walker, context);
+			break;
+		case T_Limit:
+			path_tree_walker(((LimitPath *) path)->subpath, walker, context);
+			break;
+		case T_GatherMerge:
+			path_tree_walker(((GatherMergePath *) path)->subpath, walker, context);
+			break;
+		default:
+			elog(ERROR, "unrecognized node type: %d", (int) path->pathtype);
+			break;
+	}
+}
+
+/*
+ * path_type_counter
+ *		path_tree_walker() callback.  'context' is the PathTypeCount to update;
+ *		it is a void * since that is the type the walker passes it as.
+ */
+static void
+path_type_counter(Path *path, void *context)
+{
+	PathTypeCount *pathcount = (PathTypeCount *) context;
+
+	pathcount->count++;
+	pathcount->parallel_aware_count += path->parallel_aware ? 1 : 0;
+	pathcount->parallel_safe_count += path->parallel_safe ? 1 : 0;
+}
+
+/*
+ * contains_a_parallel_aware_path
+ *		Returns true if 'path', or any path path_tree_walker() reaches below
+ *		it, has parallel_aware set.
+ */
+static bool
+contains_a_parallel_aware_path(Path *path)
+{
+	PathTypeCount tally = {0};
+
+	/* the walker visits 'path' itself as well as its subpaths */
+	path_tree_walker(path, path_type_counter, &tally);
+
+	return tally.parallel_aware_count != 0;
+}
+
+/*
+ * contains_only_parallel_safe_paths
+ *		Returns true if 'path' and every path path_tree_walker() reaches
+ *		below it has parallel_safe set.  Also true when 'path' is NULL.
+ */
+static bool
+contains_only_parallel_safe_paths(Path *path)
+{
+	PathTypeCount tally = {0};
+
+	/* every visited path bumps count; only safe ones bump the safe count */
+	path_tree_walker(path, path_type_counter, &tally);
+
+	return tally.count == tally.parallel_safe_count;
+}
+
/*
* is_projection_capable_path
* Check whether a given Path node is able to do projection.