On Fri, Feb 12, 2021 at 11:02 AM Melanie Plageman
<melanieplage...@gmail.com> wrote:
> I just attached the diff.

Squashed into one patch for the cfbot to chew on, with a few minor
adjustments to a few comments.
From 87c74af25940b0fc85186b0defe6e21ea2324c28 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplage...@gmail.com>
Date: Wed, 4 Nov 2020 14:25:33 -0800
Subject: [PATCH v5] Parallel Hash {Full,Right} Outer Join.

Previously, parallel full and right outer joins were not supported due
to a potential deadlock hazard posed by allowing workers to wait on a
barrier after barrier participants have started emitting tuples. More
details on the deadlock hazard can be found in the referenced
discussion.

For now, sidestep the problem by terminating parallelism for the
unmatched inner tuple scan. The last process to arrive at the barrier
prepares for the unmatched inner tuple scan in HJ_NEED_NEW_OUTER and
transitions to HJ_FILL_INNER, scanning the hash table and emitting
unmatched inner tuples.

To align parallel and serial hash join, change
ExecScanHashTableForUnmatched() to also scan HashMemoryChunks for the
unmatched tuple scan instead of accessing tuples through the hash table
buckets.

Author: Melanie Plageman <melanieplage...@gmail.com>
Author: Thomas Munro <thomas.mu...@gmail.com>
Discussion: https://postgr.es/m/CA%2BhUKG%2BA6ftXPz4oe92%2Bx8Er%2BxpGZqto70-Q_ERwRaSyA%3DafNg%40mail.gmail.com
---
 src/backend/executor/nodeHash.c         | 205 ++++++++++++++++++------
 src/backend/executor/nodeHashjoin.c     |  58 +++----
 src/backend/optimizer/path/joinpath.c   |  14 +-
 src/include/executor/hashjoin.h         |  15 +-
 src/include/executor/nodeHash.h         |   3 +
 src/test/regress/expected/join_hash.out |  56 ++++++-
 src/test/regress/sql/join_hash.sql      |  23 ++-
 7 files changed, 283 insertions(+), 91 deletions(-)

diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index c5f2d1d22b..6305688efd 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -510,6 +510,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
 		hashtable->spaceAllowed * SKEW_HASH_MEM_PERCENT / 100;
 	hashtable->chunks = NULL;
 	hashtable->current_chunk = NULL;
+	hashtable->current_chunk_idx = 0;
 	hashtable->parallel_state = state->parallel_state;
 	hashtable->area = state->ps.state->es_query_dsa;
 	hashtable->batches = NULL;
@@ -2046,16 +2047,72 @@ void
 ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
 {
 	/*----------
-	 * During this scan we use the HashJoinState fields as follows:
+	 * During this scan we use the HashJoinTable fields as follows:
 	 *
-	 * hj_CurBucketNo: next regular bucket to scan
-	 * hj_CurSkewBucketNo: next skew bucket (an index into skewBucketNums)
-	 * hj_CurTuple: last tuple returned, or NULL to start next bucket
+	 * current_chunk: current HashMemoryChunk to scan
+	 * current_chunk_idx: index in current HashMemoryChunk
 	 *----------
 	 */
+	HashJoinTable hashtable = hjstate->hj_HashTable;
+
 	hjstate->hj_CurBucketNo = 0;
 	hjstate->hj_CurSkewBucketNo = 0;
 	hjstate->hj_CurTuple = NULL;
+	hashtable->current_chunk = hashtable->chunks;
+	hashtable->current_chunk_idx = 0;
+}
+
+/*
+ * ExecParallelPrepHashTableForUnmatched
+ *		set up for a series of ExecParallelScanHashTableForUnmatched calls
+ *		return true if this worker is elected to do the unmatched inner scan
+ */
+bool
+ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate)
+{
+	/*----------
+	 * During this scan we use the ParallelHashJoinBatchAccessor fields for the
+	 * current batch as follows:
+	 *
+	 * current_chunk: current HashMemoryChunk to scan
+	 * current_chunk_idx: index in current HashMemoryChunk
+	 *----------
+	 */
+	HashJoinTable hashtable = hjstate->hj_HashTable;
+	int			curbatch = hashtable->curbatch;
+	ParallelHashJoinBatchAccessor *batch_accessor = &hashtable->batches[curbatch];
+	ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
+	bool		last = false;
+
+	hjstate->hj_CurBucketNo = 0;
+	hjstate->hj_CurSkewBucketNo = 0;
+	hjstate->hj_CurTuple = NULL;
+	if (curbatch < 0)
+		return false;
+	last = BarrierArriveAndDetachExceptLast(&batch->batch_barrier);
+	if (!last)
+	{
+		hashtable->batches[hashtable->curbatch].done = true;
+		/* Make sure any temporary files are closed. */
+		sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
+		sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
+
+		/*
+		 * Track the largest batch we've been attached to.  Though each
+		 * backend might see a different subset of batches, explain.c will
+		 * scan the results from all backends to find the largest value.
+		 */
+		hashtable->spacePeak =
+			Max(hashtable->spacePeak, batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets);
+		hashtable->curbatch = -1;
+	}
+	else
+	{
+		batch_accessor->shared_chunk = batch->chunks;
+		batch_accessor->current_chunk = dsa_get_address(hashtable->area, batch_accessor->shared_chunk);
+		batch_accessor->current_chunk_idx = 0;
+	}
+	return last;
 }
 
 /*
@@ -2069,60 +2126,110 @@ ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
 bool
 ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
 {
+	HashMemoryChunk next;
 	HashJoinTable hashtable = hjstate->hj_HashTable;
-	HashJoinTuple hashTuple = hjstate->hj_CurTuple;
 
-	for (;;)
+	while (hashtable->current_chunk)
 	{
-		/*
-		 * hj_CurTuple is the address of the tuple last returned from the
-		 * current bucket, or NULL if it's time to start scanning a new
-		 * bucket.
-		 */
-		if (hashTuple != NULL)
-			hashTuple = hashTuple->next.unshared;
-		else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
-		{
-			hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo];
-			hjstate->hj_CurBucketNo++;
-		}
-		else if (hjstate->hj_CurSkewBucketNo < hashtable->nSkewBuckets)
+		while (hashtable->current_chunk_idx < hashtable->current_chunk->used)
 		{
-			int			j = hashtable->skewBucketNums[hjstate->hj_CurSkewBucketNo];
+			HashJoinTuple hashTuple = (HashJoinTuple) (HASH_CHUNK_DATA(hashtable->current_chunk) +
+													   hashtable->current_chunk_idx);
+			MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple);
+			int			hashTupleSize = (HJTUPLE_OVERHEAD + tuple->t_len);
+
+			/* next tuple in this chunk */
+			hashtable->current_chunk_idx += MAXALIGN(hashTupleSize);
+
+			if (HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
+				continue;
+
+			/* insert hashtable's tuple into exec slot */
+			econtext->ecxt_innertuple =
+				ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
+									  hjstate->hj_HashTupleSlot,
+									  false);
+
+			/*
+			 * Reset temp memory each time; although this function doesn't do
+			 * any qual eval, the caller will, so let's keep it parallel to
+			 * ExecScanHashBucket.
+			 */
+			ResetExprContext(econtext);
 
-			hashTuple = hashtable->skewBucket[j]->tuples;
-			hjstate->hj_CurSkewBucketNo++;
+			hjstate->hj_CurTuple = hashTuple;
+			return true;
 		}
-		else
-			break;				/* finished all buckets */
 
-		while (hashTuple != NULL)
+		next = hashtable->current_chunk->next.unshared;
+		hashtable->current_chunk = next;
+		hashtable->current_chunk_idx = 0;
+
+		/* allow this loop to be cancellable */
+		CHECK_FOR_INTERRUPTS();
+	}
+
+	/*
+	 * no more unmatched tuples
+	 */
+	return false;
+}
+
+/*
+ * ExecParallelScanHashTableForUnmatched
+ *		scan the hash table for unmatched inner tuples, in parallel
+ *
+ * On success, the inner tuple is stored into hjstate->hj_CurTuple and
+ * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot
+ * for the latter.
+ */
+bool
+ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate,
+									  ExprContext *econtext)
+{
+	HashJoinTable hashtable = hjstate->hj_HashTable;
+	int			curbatch = hashtable->curbatch;
+	ParallelHashJoinBatchAccessor *accessor = &hashtable->batches[curbatch];
+
+	/*
+	 * Only one worker should execute this function. Since tuples have already
+	 * been emitted, it is hazardous for workers to wait at the batch_barrier
+	 * again. Instead, all workers except the last will detach and the last
+	 * will conduct this unmatched inner tuple scan.
+	 */
+	Assert(BarrierParticipants(&accessor->shared->batch_barrier) == 1);
+	while (accessor->current_chunk)
+	{
+		while (accessor->current_chunk_idx < accessor->current_chunk->used)
 		{
-			if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
-			{
-				TupleTableSlot *inntuple;
+			HashJoinTuple hashTuple = (HashJoinTuple) (HASH_CHUNK_DATA(accessor->current_chunk) +
+													   accessor->current_chunk_idx);
 
-				/* insert hashtable's tuple into exec slot */
-				inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
-												 hjstate->hj_HashTupleSlot,
-												 false);	/* do not pfree */
-				econtext->ecxt_innertuple = inntuple;
+			accessor->current_chunk_idx += MAXALIGN(HJTUPLE_OVERHEAD +
+													HJTUPLE_MINTUPLE(hashTuple)->t_len);
 
-				/*
-				 * Reset temp memory each time; although this function doesn't
-				 * do any qual eval, the caller will, so let's keep it
-				 * parallel to ExecScanHashBucket.
-				 */
-				ResetExprContext(econtext);
+			if (HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
+				continue;
 
-				hjstate->hj_CurTuple = hashTuple;
-				return true;
-			}
+			/* insert hashtable's tuple into exec slot */
+			econtext->ecxt_innertuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
+															  hjstate->hj_HashTupleSlot, false);
+
+			/*
+			 * Reset temp memory each time; although this function doesn't do
+			 * any qual eval, the caller will, so let's keep it parallel to
+			 * ExecScanHashBucket.
+			 */
+			ResetExprContext(econtext);
 
-			hashTuple = hashTuple->next.unshared;
+			hjstate->hj_CurTuple = hashTuple;
+			return true;
 		}
 
-		/* allow this loop to be cancellable */
+		accessor->shared_chunk = accessor->current_chunk->next.shared;
+		accessor->current_chunk = dsa_get_address(hashtable->area, accessor->shared_chunk);
+		accessor->current_chunk_idx = 0;
+
 		CHECK_FOR_INTERRUPTS();
 	}
 
@@ -3131,13 +3238,6 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
 		/* Detach from the batch we were last working on. */
 		if (BarrierArriveAndDetach(&batch->batch_barrier))
 		{
-			/*
-			 * Technically we shouldn't access the barrier because we're no
-			 * longer attached, but since there is no way it's moving after
-			 * this point it seems safe to make the following assertion.
-			 */
-			Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_DONE);
-
 			/* Free shared chunks and buckets. */
 			while (DsaPointerIsValid(batch->chunks))
 			{
@@ -3271,6 +3371,9 @@ ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable, int batchno)
 	hashtable->current_chunk = NULL;
 	hashtable->current_chunk_shared = InvalidDsaPointer;
 	hashtable->batches[batchno].at_least_one_chunk = false;
+	hashtable->batches[batchno].shared_chunk = InvalidDsaPointer;
+	hashtable->batches[batchno].current_chunk = NULL;
+	hashtable->batches[batchno].current_chunk_idx = 0;
 }
 
 /*
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 510bdd39ad..dc526b49bd 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -82,7 +82,8 @@
  *  PHJ_BATCH_ALLOCATING     -- one allocates buckets
  *  PHJ_BATCH_LOADING        -- all load the hash table from disk
  *  PHJ_BATCH_PROBING        -- all probe
- *  PHJ_BATCH_DONE           -- end
+ *  PHJ_BATCH_FILLING_INNER  -- full/right outer scan
+ *  PHJ_BATCH_DONE           -- done
  *
  * Batch 0 is a special case, because it starts out in phase
  * PHJ_BATCH_PROBING; populating batch 0's hash table is done during
@@ -99,7 +100,9 @@
  * while attached to a barrier, unless the barrier has reached its final
  * state.  In the slightly special case of the per-batch barrier, we return
  * tuples while in PHJ_BATCH_PROBING phase, but that's OK because we use
- * BarrierArriveAndDetach() to advance it to PHJ_BATCH_DONE without waiting.
+ * BarrierArriveAndDetach() to advance it to PHJ_BATCH_DONE or
+ * PHJ_BATCH_DONE_INNER, depending on whether or not the join requires
+ * a scan for unmatched inner tuples, without waiting.
  *
  *-------------------------------------------------------------------------
  */
@@ -360,9 +363,19 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 					/* end of batch, or maybe whole join */
 					if (HJ_FILL_INNER(node))
 					{
-						/* set up to scan for unmatched inner tuples */
-						ExecPrepHashTableForUnmatched(node);
-						node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+						if (parallel)
+						{
+							if (ExecParallelPrepHashTableForUnmatched(node))
+								node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+							else
+								node->hj_JoinState = HJ_NEED_NEW_BATCH;
+						}
+						else
+						{
+							/* set up to scan for unmatched inner tuples */
+							ExecPrepHashTableForUnmatched(node);
+							node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+						}
 					}
 					else
 						node->hj_JoinState = HJ_NEED_NEW_BATCH;
@@ -455,25 +468,13 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 				{
 					node->hj_MatchedOuter = true;
 
-					if (parallel)
-					{
-						/*
-						 * Full/right outer joins are currently not supported
-						 * for parallel joins, so we don't need to set the
-						 * match bit.  Experiments show that it's worth
-						 * avoiding the shared memory traffic on large
-						 * systems.
-						 */
-						Assert(!HJ_FILL_INNER(node));
-					}
-					else
-					{
-						/*
-						 * This is really only needed if HJ_FILL_INNER(node),
-						 * but we'll avoid the branch and just set it always.
-						 */
+
+					/*
+					 * This is really only needed if HJ_FILL_INNER(node), but
+					 * we'll avoid the branch and just set it always.
+					 */
+					if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple)))
 						HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple));
-					}
 
 					/* In an antijoin, we never return a matched tuple */
 					if (node->js.jointype == JOIN_ANTI)
@@ -531,7 +532,8 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 				 * so any unmatched inner tuples in the hashtable have to be
 				 * emitted before we continue to the next batch.
 				 */
-				if (!ExecScanHashTableForUnmatched(node, econtext))
+				if (!(parallel ? ExecParallelScanHashTableForUnmatched(node, econtext)
+					  : ExecScanHashTableForUnmatched(node, econtext)))
 				{
 					/* no more unmatched tuples */
 					node->hj_JoinState = HJ_NEED_NEW_BATCH;
@@ -1173,13 +1175,15 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
 					 * hash table stays alive until everyone's finished
 					 * probing it, but no participant is allowed to wait at
 					 * this barrier again (or else a deadlock could occur).
-					 * All attached participants must eventually call
-					 * BarrierArriveAndDetach() so that the final phase
-					 * PHJ_BATCH_DONE can be reached.
+					 * All attached participants must eventually detach from
+					 * the barrier and one worker must advance the phase so
+					 * that the final phase is reached.
 					 */
 					ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
 					sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);
 					return true;
+				case PHJ_BATCH_FILLING_INNER:
+					/* Fall through. */
 
 				case PHJ_BATCH_DONE:
 
diff --git a/src/backend/optimizer/path/joinpath.c b/src/backend/optimizer/path/joinpath.c
index 57ce97fd53..dd9e26dcd3 100644
--- a/src/backend/optimizer/path/joinpath.c
+++ b/src/backend/optimizer/path/joinpath.c
@@ -1845,15 +1845,9 @@ hash_inner_and_outer(PlannerInfo *root,
 		 * able to properly guarantee uniqueness.  Similarly, we can't handle
 		 * JOIN_FULL and JOIN_RIGHT, because they can produce false null
 		 * extended rows.  Also, the resulting path must not be parameterized.
-		 * We would be able to support JOIN_FULL and JOIN_RIGHT for Parallel
-		 * Hash, since in that case we're back to a single hash table with a
-		 * single set of match bits for each batch, but that will require
-		 * figuring out a deadlock-free way to wait for the probe to finish.
 		 */
 		if (joinrel->consider_parallel &&
 			save_jointype != JOIN_UNIQUE_OUTER &&
-			save_jointype != JOIN_FULL &&
-			save_jointype != JOIN_RIGHT &&
 			outerrel->partial_pathlist != NIL &&
 			bms_is_empty(joinrel->lateral_relids))
 		{
@@ -1887,9 +1881,13 @@ hash_inner_and_outer(PlannerInfo *root,
 			 * total inner path will also be parallel-safe, but if not, we'll
 			 * have to search for the cheapest safe, unparameterized inner
 			 * path.  If doing JOIN_UNIQUE_INNER, we can't use any alternative
-			 * inner path.
+			 * inner path.  If full or right join, we can't use parallelism
+			 * (building the hash table in each backend) because no one
+			 * process has all the match bits.
 			 */
-			if (cheapest_total_inner->parallel_safe)
+			if (save_jointype == JOIN_FULL || save_jointype == JOIN_RIGHT)
+				cheapest_safe_inner = NULL;
+			else if (cheapest_total_inner->parallel_safe)
 				cheapest_safe_inner = cheapest_total_inner;
 			else if (save_jointype != JOIN_UNIQUE_INNER)
 				cheapest_safe_inner =
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index d74034f64f..66fea4ac58 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -205,6 +205,15 @@ typedef struct ParallelHashJoinBatchAccessor
 	bool		at_least_one_chunk; /* has this backend allocated a chunk? */
 
 	bool		done;			/* flag to remember that a batch is done */
+
+	/*
+	 * While doing the unmatched inner scan, the assigned worker may emit
+	 * tuples. Thus, we must keep track of where it was in the hashtable so it
+	 * can return to the correct offset within the correct chunk.
+	 */
+	dsa_pointer shared_chunk;
+	HashMemoryChunk current_chunk;
+	size_t		current_chunk_idx;
 	SharedTuplestoreAccessor *inner_tuples;
 	SharedTuplestoreAccessor *outer_tuples;
 } ParallelHashJoinBatchAccessor;
@@ -265,7 +274,8 @@ typedef struct ParallelHashJoinState
 #define PHJ_BATCH_ALLOCATING			1
 #define PHJ_BATCH_LOADING				2
 #define PHJ_BATCH_PROBING				3
-#define PHJ_BATCH_DONE					4
+#define PHJ_BATCH_FILLING_INNER			4
+#define PHJ_BATCH_DONE		            5
 
 /* The phases of batch growth while hashing, for grow_batches_barrier. */
 #define PHJ_GROW_BATCHES_ELECTING		0
@@ -351,6 +361,9 @@ typedef struct HashJoinTableData
 	/* used for dense allocation of tuples (into linked chunks) */
 	HashMemoryChunk chunks;		/* one list for the whole batch */
 
+	/* index of tuple within current chunk for serial unmatched inner scan */
+	size_t		current_chunk_idx;
+
 	/* Shared and private state for Parallel Hash. */
 	HashMemoryChunk current_chunk;	/* this backend's current chunk */
 	dsa_area   *area;			/* DSA area to allocate memory from */
diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h
index 3fbe02e80d..7460dfff64 100644
--- a/src/include/executor/nodeHash.h
+++ b/src/include/executor/nodeHash.h
@@ -56,8 +56,11 @@ extern void ExecHashGetBucketAndBatch(HashJoinTable hashtable,
 extern bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
 extern bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
 extern void ExecPrepHashTableForUnmatched(HashJoinState *hjstate);
+extern bool ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate);
 extern bool ExecScanHashTableForUnmatched(HashJoinState *hjstate,
 										  ExprContext *econtext);
+extern bool ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate,
+												  ExprContext *econtext);
 extern void ExecHashTableReset(HashJoinTable hashtable);
 extern void ExecHashTableResetMatchFlags(HashJoinTable hashtable);
 extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
diff --git a/src/test/regress/expected/join_hash.out b/src/test/regress/expected/join_hash.out
index 3a91c144a2..4ca0e01756 100644
--- a/src/test/regress/expected/join_hash.out
+++ b/src/test/regress/expected/join_hash.out
@@ -767,8 +767,9 @@ select  count(*) from simple r full outer join simple s using (id);
 (1 row)
 
 rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
 savepoint settings;
+set enable_parallel_hash = off;
 set local max_parallel_workers_per_gather = 2;
 explain (costs off)
      select  count(*) from simple r full outer join simple s using (id);
@@ -788,6 +789,31 @@ select  count(*) from simple r full outer join simple s using (id);
  20000
 (1 row)
 
+rollback to settings;
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s using (id);
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Parallel Hash Full Join
+                     Hash Cond: (r.id = s.id)
+                     ->  Parallel Seq Scan on simple r
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on simple s
+(9 rows)
+
+select  count(*) from simple r full outer join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
 rollback to settings;
 -- An full outer join where every record is not matched.
 -- non-parallel
@@ -812,8 +838,9 @@ select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
 (1 row)
 
 rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
 savepoint settings;
+set enable_parallel_hash = off;
 set local max_parallel_workers_per_gather = 2;
 explain (costs off)
      select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
@@ -833,6 +860,31 @@ select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
  40000
 (1 row)
 
+rollback to settings;
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Parallel Hash Full Join
+                     Hash Cond: ((0 - s.id) = r.id)
+                     ->  Parallel Seq Scan on simple s
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on simple r
+(9 rows)
+
+select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+ count 
+-------
+ 40000
+(1 row)
+
 rollback to settings;
 -- exercise special code paths for huge tuples (note use of non-strict
 -- expression and left join required to get the detoasted tuple into
diff --git a/src/test/regress/sql/join_hash.sql b/src/test/regress/sql/join_hash.sql
index 68c1a8c7b6..504b3611ca 100644
--- a/src/test/regress/sql/join_hash.sql
+++ b/src/test/regress/sql/join_hash.sql
@@ -418,7 +418,16 @@ explain (costs off)
 select  count(*) from simple r full outer join simple s using (id);
 rollback to settings;
 
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
+savepoint settings;
+set enable_parallel_hash = off;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s using (id);
+select  count(*) from simple r full outer join simple s using (id);
+rollback to settings;
+
+-- parallelism is possible with parallel-aware full hash join
 savepoint settings;
 set local max_parallel_workers_per_gather = 2;
 explain (costs off)
@@ -436,14 +445,24 @@ explain (costs off)
 select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
 rollback to settings;
 
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
 savepoint settings;
+set enable_parallel_hash = off;
 set local max_parallel_workers_per_gather = 2;
 explain (costs off)
      select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
 select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
 rollback to settings;
 
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+rollback to settings;
+
+
 -- exercise special code paths for huge tuples (note use of non-strict
 -- expression and left join required to get the detoasted tuple into
 -- the hash table)
-- 
2.30.1

Reply via email to