On Thu, 20/01/2022 at 09:32 +1300, David Rowley wrote:
> On Fri, 31 Dec 2021 at 00:14, Yura Sokolov <y.soko...@postgrespro.ru> wrote:
> > Suggested quick (and valid) fix in the patch attached:
> > - If Append has single child, then copy its parallel awareness.
> 
> I've been looking at this and I've gone through changing my mind about
> what's the right fix quite a number of times.
> 
> My current thoughts are that I don't really like the fact that we can
> have plans in the following shape:
> 
>  Finalize Aggregate
>    ->  Gather
>          Workers Planned: 1
>          ->  Partial Aggregate
>                ->  Parallel Hash Left Join
>                      Hash Cond: (gather_append_1.fk = gather_append_2.fk)
>                      ->  Index Scan using gather_append_1_ix on gather_append_1
>                            Index Cond: (f = true)
>                      ->  Parallel Hash
>                            ->  Parallel Seq Scan on gather_append_2
> 
> It's only made safe by the fact that Gather will only use 1 worker.
> To me, it just seems too fragile to assume that's always going to be
> the case. I feel like this fix just relies on the fact that
> create_gather_path() and create_gather_merge_path() do
> "pathnode->num_workers = subpath->parallel_workers;". If someone
> decided that was to work a different way, then we risk this breaking
> again. Additionally, today we have Gather and GatherMerge, but we may
> one day end up with more node types that gather results from parallel
> workers, or even a completely different way of executing plans.
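
To make the failure mode concrete, here is a toy model. It is not
PostgreSQL code and every name in it is hypothetical. It assumes that each
participant (leader or worker) running a non-parallel-aware child produces
the child's full result, while a parallel-aware child divides its rows
among the participants:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Toy model only: none of these names exist in PostgreSQL. */
typedef struct ToyScan
{
    bool   parallel_aware; /* do participants share one scan position? */
    size_t nrows;          /* rows the underlying scan produces */
} ToyScan;

/*
 * Count the rows a Gather-like parent collects when "participants"
 * processes (leader plus workers) each run the same child scan.
 */
static size_t
toy_gather_rows(const ToyScan *scan, size_t participants)
{
    assert(scan != NULL);

    if (scan->parallel_aware)
        return scan->nrows;             /* rows are split among participants */

    return scan->nrows * participants;  /* each participant scans everything */
}
```

With a 40-row non-parallel-aware scan and two participants, the model
collects 80 rows; with a single participant, or with a parallel-aware
scan, it collects 40.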

It seems strange that the parallel_aware and parallel_safe flags neither
affect execution nor are properly checked.

The exception is parallel_safe, which is checked in ExecSerializePlan, which
is called from ExecInitParallelPlan, which in turn is called from ExecGather
and ExecGatherMerge. But it looks like this check doesn't affect execution
either.

> 
> I think a safer way to fix this is to just not remove the
> Append/MergeAppend node if the parallel_aware flag of the only-child
> and the Append/MergeAppend don't match. I've done that in the
> attached.
> 
> I believe the code at the end of add_paths_to_append_rel() can remain as is.

I found that clean_up_removed_plan_level is also called from
set_subqueryscan_references.
Is there a need to patch there as well?
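
For reference, the condition in your version boils down to the following
toy model. The struct and function names are hypothetical stand-ins, not
the real Plan/Append definitions; only the comparison mirrors the patch:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Toy stand-ins; the real Plan and Append structs carry far more state. */
typedef struct ToyPlan
{
    bool parallel_aware;
} ToyPlan;

typedef struct ToyAppend
{
    ToyPlan   plan;      /* the Append's own flags */
    ToyPlan **children;  /* child plans */
    size_t    nchildren;
} ToyAppend;

/*
 * Return the only child if the Append may be dropped, else NULL.
 * Dropping is allowed only when there is exactly one child and its
 * parallel awareness matches that of the Append.
 */
static ToyPlan *
toy_removable_child(const ToyAppend *append)
{
    assert(append != NULL);

    if (append->nchildren == 1 &&
        append->children[0]->parallel_aware == append->plan.parallel_aware)
        return append->children[0];

    return NULL;
}
```

A parallel-aware Append over a single non-parallel-aware child is kept in
this model, which is the case the regression test exercises.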

There is also a strange state:
- in the loop over subpaths, pathnode->path.parallel_safe is set to the AND
  of all the subpaths' parallel_safe flags
  (which is why I needed to copy it in my version of the patch),
- which means our AppendPath can be parallel_aware but not parallel_safe.
That is a bit ridiculous.
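
A toy sketch of how that state arises, assuming the flag is simply ANDed
over the subpaths as described above. The names are hypothetical, and
starting parallel_safe at true is a simplification made for the example:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Toy path node; not the real Path/AppendPath structs. */
typedef struct ToyPath
{
    bool parallel_aware;
    bool parallel_safe;
} ToyPath;

/*
 * Build a toy Append path: parallel_aware is taken from the caller,
 * parallel_safe is the AND of the subpaths' parallel_safe flags.
 */
static ToyPath
toy_make_append(bool parallel_aware, const ToyPath *subpaths, size_t n)
{
    ToyPath append = { parallel_aware, true };

    assert(subpaths != NULL || n == 0);

    for (size_t i = 0; i < n; i++)
        append.parallel_safe = append.parallel_safe && subpaths[i].parallel_safe;

    return append;
}
```

Passing parallel_aware = true together with a single subpath that is not
parallel_safe yields a node that is parallel_aware but not parallel_safe.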

It is also strange that an AppendPath can have more parallel_workers than
the sum of its children's parallel_workers.

So it looks like the whole machinery around parallel_aware/parallel_safe is
not consistent enough.

Either way, I attach your version of the fix, together with my tests, as a
new patch version.

regards,
Yura Sokolov
From 359df37ae76170a4621cafd3ad8b318473c94a46 Mon Sep 17 00:00:00 2001
From: Yura Sokolov <y.soko...@postgrespro.ru>
Date: Sun, 23 Jan 2022 14:53:21 +0300
Subject: [PATCH v2] Fix duplicate result rows after Append path removal.

It can happen that an Append path is created with the "parallel_aware" flag
set while its single child is not parallel aware. The Append path's parent
(Gather or Gather Merge) thinks its child is parallel_aware, but after the
Append is removed the Gather's child is no longer parallel_aware. Then, when
Gather/Gather Merge decides to run the child in several workers, or in a
worker plus the leader, it gathers duplicate result rows from the several
invocations of the child.

To fix this, don't remove the Append/MergeAppend node if its parallel_aware
flag differs from that of its single child.

Authors: David Rowley, Sokolov Yura.
---
 src/backend/optimizer/plan/setrefs.c          |  24 +++-
 .../expected/gather_removed_append.out        | 135 ++++++++++++++++++
 src/test/regress/parallel_schedule            |   1 +
 .../regress/sql/gather_removed_append.sql     |  82 +++++++++++
 4 files changed, 238 insertions(+), 4 deletions(-)
 create mode 100644 src/test/regress/expected/gather_removed_append.out
 create mode 100644 src/test/regress/sql/gather_removed_append.sql

diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c
index e44ae971b4b..a7b11b7f03a 100644
--- a/src/backend/optimizer/plan/setrefs.c
+++ b/src/backend/optimizer/plan/setrefs.c
@@ -1512,8 +1512,16 @@ set_append_references(PlannerInfo *root,
 		lfirst(l) = set_plan_refs(root, (Plan *) lfirst(l), rtoffset);
 	}
 
-	/* Now, if there's just one, forget the Append and return that child */
-	if (list_length(aplan->appendplans) == 1)
+	/*
+	 * See if it's safe to get rid of the Append entirely.  For this to be
+	 * safe, there must be only one child plan and that child plan's parallel
+	 * awareness must match that of the Append's.  The reason for the latter
+	 * is that if the Append is parallel aware and the child is not then
+	 * the calling plan may execute the non-parallel aware child multiple
+	 * times.
+	 */
+	if (list_length(aplan->appendplans) == 1 &&
+		((Plan *) linitial(aplan->appendplans))->parallel_aware == aplan->plan.parallel_aware)
 		return clean_up_removed_plan_level((Plan *) aplan,
 										   (Plan *) linitial(aplan->appendplans));
 
@@ -1576,8 +1584,16 @@ set_mergeappend_references(PlannerInfo *root,
 		lfirst(l) = set_plan_refs(root, (Plan *) lfirst(l), rtoffset);
 	}
 
-	/* Now, if there's just one, forget the MergeAppend and return that child */
-	if (list_length(mplan->mergeplans) == 1)
+	/*
+	 * See if it's safe to get rid of the MergeAppend entirely.  For this to
+	 * be safe, there must be only one child plan and that child plan's
+	 * parallel awareness must match that of the MergeAppend's.  The reason
+	 * for the latter is that if the MergeAppend is parallel aware and the
+	 * child is not then the calling plan may execute the non-parallel aware
+	 * child multiple times.
+	 */
+	if (list_length(mplan->mergeplans) == 1 &&
+		((Plan *) linitial(mplan->mergeplans))->parallel_aware == mplan->plan.parallel_aware)
 		return clean_up_removed_plan_level((Plan *) mplan,
 										   (Plan *) linitial(mplan->mergeplans));
 
diff --git a/src/test/regress/expected/gather_removed_append.out b/src/test/regress/expected/gather_removed_append.out
new file mode 100644
index 00000000000..1c2d40d7c76
--- /dev/null
+++ b/src/test/regress/expected/gather_removed_append.out
@@ -0,0 +1,135 @@
+-- Test correctness of parallel query execution after removal
+-- of Append path due to single non-trivial child.
+DROP TABLE IF EXISTS gather_append_1, gather_append_2;
+NOTICE:  table "gather_append_1" does not exist, skipping
+NOTICE:  table "gather_append_2" does not exist, skipping
+CREATE TABLE gather_append_1 (
+    fk int,
+    f bool
+);
+INSERT INTO gather_append_1 (fk, f) SELECT i, i%50=0 from generate_series(1, 2000) as i;
+CREATE INDEX gather_append_1_ix on gather_append_1 (f);
+CREATE TABLE gather_append_2 (
+    fk int,
+    val serial
+);
+INSERT INTO gather_append_2 (fk) SELECT fk from gather_append_1, generate_series(1, 5) as i;
+ANALYZE gather_append_1, gather_append_2;
+SET max_parallel_workers_per_gather = 0;
+-- Find correct rows count
+SELECT count(1)
+FROM (
+  SELECT fk FROM gather_append_1 WHERE f
+  UNION ALL
+  SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk);
+ count 
+-------
+   200
+(1 row)
+
+SET parallel_setup_cost = 0;
+SET parallel_tuple_cost = 0.1;
+SET min_parallel_table_scan_size = 0;
+SET max_parallel_workers_per_gather = 2;
+SELECT count(1)
+FROM (
+  SELECT fk FROM gather_append_1 WHERE f
+  UNION ALL
+  SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk);
+ count 
+-------
+   200
+(1 row)
+
+EXPLAIN (ANALYZE true, COSTS false, TIMING false, SUMMARY false, VERBOSE false, BUFFERS false)
+SELECT count(1)
+FROM (
+  SELECT fk FROM gather_append_1 WHERE f
+  UNION ALL
+  SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk);
+                                                  QUERY PLAN                                                   
+---------------------------------------------------------------------------------------------------------------
+ Finalize Aggregate (actual rows=1 loops=1)
+   ->  Gather (actual rows=2 loops=1)
+         Workers Planned: 1
+         Workers Launched: 1
+         ->  Partial Aggregate (actual rows=1 loops=2)
+               ->  Parallel Hash Left Join (actual rows=100 loops=2)
+                     Hash Cond: (gather_append_1.fk = gather_append_2.fk)
+                     ->  Parallel Append (actual rows=20 loops=2)
+                           ->  Index Scan using gather_append_1_ix on gather_append_1 (actual rows=40 loops=1)
+                                 Index Cond: (f = true)
+                     ->  Parallel Hash (actual rows=5000 loops=2)
+                           Buckets: 16384  Batches: 1  Memory Usage: 544kB
+                           ->  Parallel Seq Scan on gather_append_2 (actual rows=5000 loops=2)
+(13 rows)
+
+-- Result rows in root node should be equal to non-parallel count
+EXPLAIN (ANALYZE true, COSTS false, TIMING false, SUMMARY false, VERBOSE false, BUFFERS false)
+SELECT val
+FROM (
+  SELECT fk FROM gather_append_1 WHERE f
+  UNION ALL
+  SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk)
+ORDER BY val;
+                                               QUERY PLAN                                                
+---------------------------------------------------------------------------------------------------------
+ Gather Merge (actual rows=200 loops=1)
+   Workers Planned: 1
+   Workers Launched: 1
+   ->  Sort (actual rows=100 loops=2)
+         Sort Key: gather_append_2.val
+         Sort Method: quicksort  Memory: 25kB
+         Worker 0:  Sort Method: quicksort  Memory: 25kB
+         ->  Parallel Hash Left Join (actual rows=100 loops=2)
+               Hash Cond: (gather_append_1.fk = gather_append_2.fk)
+               ->  Parallel Append (actual rows=20 loops=2)
+                     ->  Index Scan using gather_append_1_ix on gather_append_1 (actual rows=40 loops=1)
+                           Index Cond: (f = true)
+               ->  Parallel Hash (actual rows=5000 loops=2)
+                     Buckets: 16384  Batches: 1  Memory Usage: 576kB
+                     ->  Parallel Seq Scan on gather_append_2 (actual rows=5000 loops=2)
+(15 rows)
+
+-- Result rows in root node should be equal to non-parallel count
+EXPLAIN (ANALYZE true, COSTS false, TIMING false, SUMMARY false, VERBOSE false, BUFFERS false)
+SELECT val
+FROM (
+  SELECT fk FROM gather_append_1 WHERE f
+  UNION ALL
+  SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk)
+ORDER BY val;
+                                               QUERY PLAN                                                
+---------------------------------------------------------------------------------------------------------
+ Gather Merge (actual rows=200 loops=1)
+   Workers Planned: 1
+   Workers Launched: 1
+   ->  Sort (actual rows=100 loops=2)
+         Sort Key: gather_append_2.val
+         Sort Method: quicksort  Memory: 25kB
+         Worker 0:  Sort Method: quicksort  Memory: 25kB
+         ->  Parallel Hash Left Join (actual rows=100 loops=2)
+               Hash Cond: (gather_append_1.fk = gather_append_2.fk)
+               ->  Parallel Append (actual rows=20 loops=2)
+                     ->  Index Scan using gather_append_1_ix on gather_append_1 (actual rows=40 loops=1)
+                           Index Cond: (f = true)
+               ->  Parallel Hash (actual rows=5000 loops=2)
+                     Buckets: 16384  Batches: 1  Memory Usage: 544kB
+                     ->  Parallel Seq Scan on gather_append_2 (actual rows=5000 loops=2)
+(15 rows)
+
diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule
index 5b0c73d7e37..84f2f81255d 100644
--- a/src/test/regress/parallel_schedule
+++ b/src/test/regress/parallel_schedule
@@ -100,6 +100,7 @@ test: rules psql psql_crosstab amutils stats_ext collate.linux.utf8
 test: select_parallel
 test: write_parallel
 test: vacuum_parallel
+test: gather_removed_append
 
 # no relation related tests can be put in this group
 test: publication subscription
diff --git a/src/test/regress/sql/gather_removed_append.sql b/src/test/regress/sql/gather_removed_append.sql
new file mode 100644
index 00000000000..df1b796a4f6
--- /dev/null
+++ b/src/test/regress/sql/gather_removed_append.sql
@@ -0,0 +1,82 @@
+-- Test correctness of parallel query execution after removal
+-- of Append path due to single non-trivial child.
+
+DROP TABLE IF EXISTS gather_append_1, gather_append_2;
+
+CREATE TABLE gather_append_1 (
+    fk int,
+    f bool
+);
+
+INSERT INTO gather_append_1 (fk, f) SELECT i, i%50=0 from generate_series(1, 2000) as i;
+
+CREATE INDEX gather_append_1_ix on gather_append_1 (f);
+
+CREATE TABLE gather_append_2 (
+    fk int,
+    val serial
+);
+
+INSERT INTO gather_append_2 (fk) SELECT fk from gather_append_1, generate_series(1, 5) as i;
+
+ANALYZE gather_append_1, gather_append_2;
+
+SET max_parallel_workers_per_gather = 0;
+
+-- Find correct rows count
+SELECT count(1)
+FROM (
+  SELECT fk FROM gather_append_1 WHERE f
+  UNION ALL
+  SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk);
+
+SET parallel_setup_cost = 0;
+SET parallel_tuple_cost = 0.1;
+SET min_parallel_table_scan_size = 0;
+SET max_parallel_workers_per_gather = 2;
+
+SELECT count(1)
+FROM (
+  SELECT fk FROM gather_append_1 WHERE f
+  UNION ALL
+  SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk);
+
+EXPLAIN (ANALYZE true, COSTS false, TIMING false, SUMMARY false, VERBOSE false, BUFFERS false)
+SELECT count(1)
+FROM (
+  SELECT fk FROM gather_append_1 WHERE f
+  UNION ALL
+  SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk);
+
+-- Result rows in root node should be equal to non-parallel count
+EXPLAIN (ANALYZE true, COSTS false, TIMING false, SUMMARY false, VERBOSE false, BUFFERS false)
+SELECT val
+FROM (
+  SELECT fk FROM gather_append_1 WHERE f
+  UNION ALL
+  SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk)
+ORDER BY val;
+
+-- Result rows in root node should be equal to non-parallel count
+EXPLAIN (ANALYZE true, COSTS false, TIMING false, SUMMARY false, VERBOSE false, BUFFERS false)
+SELECT val
+FROM (
+  SELECT fk FROM gather_append_1 WHERE f
+  UNION ALL
+  SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk)
+ORDER BY val;
-- 
2.34.1
