From 05b543dfc54a30538e408ed898e7682a954df6b9 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Wed, 4 Nov 2020 14:25:33 -0800
Subject: [PATCH v8 3/3] Parallel Hash {Full,Right} Outer Join.

Previously, parallel full and right outer joins were not supported due
to a potential deadlock hazard (see discussion).

For now, sidestep the problem by terminating parallelism for the
unmatched inner tuple scan. The last process to arrive at the barrier
prepares for the unmatched inner tuple scan in HJ_NEED_NEW_OUTER and
transitions to HJ_FILL_INNER, scanning the hash table and emitting
unmatched inner tuples.  Other processes are free to go and work on
other batches, if there are any.

To make parallel and serial hash join more consistent, change the serial
version to scan match bits in tuple chunk order, instead of doing it in
hash table bucket order.

Author: Melanie Plageman <melanieplageman@gmail.com>
Reviewed-by: Thomas Munro <thomas.munro@gmail.com>
Discussion: https://postgr.es/m/CA%2BhUKG%2BA6ftXPz4oe92%2Bx8Er%2BxpGZqto70-Q_ERwRaSyA%3DafNg%40mail.gmail.com
---
 src/backend/executor/nodeHash.c         | 205 ++++++++++++++++++------
 src/backend/executor/nodeHashjoin.c     |  59 +++----
 src/backend/optimizer/path/joinpath.c   |  14 +-
 src/include/executor/hashjoin.h         |  15 +-
 src/include/executor/nodeHash.h         |   3 +
 src/test/regress/expected/join_hash.out |  56 ++++++-
 src/test/regress/sql/join_hash.sql      |  23 ++-
 7 files changed, 283 insertions(+), 92 deletions(-)

diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 3ba688e8e0..e7d420ee12 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -517,6 +517,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
 		hashtable->spaceAllowed * SKEW_HASH_MEM_PERCENT / 100;
 	hashtable->chunks = NULL;
 	hashtable->current_chunk = NULL;
+	hashtable->current_chunk_idx = 0;
 	hashtable->parallel_state = state->parallel_state;
 	hashtable->area = state->ps.state->es_query_dsa;
 	hashtable->batches = NULL;
@@ -2070,16 +2071,72 @@ void
 ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
 {
 	/*----------
-	 * During this scan we use the HashJoinState fields as follows:
+	 * During this scan we use the HashJoinTable fields as follows:
 	 *
-	 * hj_CurBucketNo: next regular bucket to scan
-	 * hj_CurSkewBucketNo: next skew bucket (an index into skewBucketNums)
-	 * hj_CurTuple: last tuple returned, or NULL to start next bucket
+	 * current_chunk: current HashMemoryChunk to scan
+	 * current_chunk_idx: index in current HashMemoryChunk
 	 *----------
 	 */
+	HashJoinTable hashtable = hjstate->hj_HashTable;
+
 	hjstate->hj_CurBucketNo = 0;
 	hjstate->hj_CurSkewBucketNo = 0;
 	hjstate->hj_CurTuple = NULL;
+	hashtable->current_chunk = hashtable->chunks;
+	hashtable->current_chunk_idx = 0;
+}
+
+/*
+ * ExecParallelPrepHashTableForUnmatched
+ *		set up for a series of ExecParallelScanHashTableForUnmatched calls
+ *		return true if this worker is elected to do the unmatched inner scan
+ */
+bool
+ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate)
+{
+	/*----------
+	 * During this scan we use the ParallelHashJoinBatchAccessor fields for the
+	 * current batch as follows:
+	 *
+	 * current_chunk: current HashMemoryChunk to scan
+	 * current_chunk_idx: index in current HashMemoryChunk
+	 *----------
+	 */
+	HashJoinTable hashtable = hjstate->hj_HashTable;
+	int			curbatch = hashtable->curbatch;
+	ParallelHashJoinBatchAccessor *batch_accessor = &hashtable->batches[curbatch];
+	ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
+	bool		last = false;
+
+	hjstate->hj_CurBucketNo = 0;
+	hjstate->hj_CurSkewBucketNo = 0;
+	hjstate->hj_CurTuple = NULL;
+	if (curbatch < 0)
+		return false;
+	last = BarrierArriveAndDetachExceptLast(&batch->batch_barrier);
+	if (!last)
+	{
+		hashtable->batches[hashtable->curbatch].done = true;
+		/* Make sure any temporary files are closed. */
+		sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
+		sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
+
+		/*
+		 * Track the largest batch we've been attached to.  Though each
+		 * backend might see a different subset of batches, explain.c will
+		 * scan the results from all backends to find the largest value.
+		 */
+		hashtable->spacePeak =
+			Max(hashtable->spacePeak, batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets);
+		hashtable->curbatch = -1;
+	}
+	else
+	{
+		batch_accessor->shared_chunk = batch->chunks;
+		batch_accessor->current_chunk = dsa_get_address(hashtable->area, batch_accessor->shared_chunk);
+		batch_accessor->current_chunk_idx = 0;
+	}
+	return last;
 }
 
 /*
@@ -2093,60 +2150,110 @@ ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
 bool
 ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
 {
+	HashMemoryChunk next;
 	HashJoinTable hashtable = hjstate->hj_HashTable;
-	HashJoinTuple hashTuple = hjstate->hj_CurTuple;
 
-	for (;;)
+	while (hashtable->current_chunk)
 	{
-		/*
-		 * hj_CurTuple is the address of the tuple last returned from the
-		 * current bucket, or NULL if it's time to start scanning a new
-		 * bucket.
-		 */
-		if (hashTuple != NULL)
-			hashTuple = hashTuple->next.unshared;
-		else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
-		{
-			hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo];
-			hjstate->hj_CurBucketNo++;
-		}
-		else if (hjstate->hj_CurSkewBucketNo < hashtable->nSkewBuckets)
+		while (hashtable->current_chunk_idx < hashtable->current_chunk->used)
 		{
-			int			j = hashtable->skewBucketNums[hjstate->hj_CurSkewBucketNo];
+			HashJoinTuple hashTuple = (HashJoinTuple) (HASH_CHUNK_DATA(hashtable->current_chunk) +
+													   hashtable->current_chunk_idx);
+			MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple);
+			int			hashTupleSize = (HJTUPLE_OVERHEAD + tuple->t_len);
+
+			/* next tuple in this chunk */
+			hashtable->current_chunk_idx += MAXALIGN(hashTupleSize);
+
+			if (HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
+				continue;
+
+			/* insert hashtable's tuple into exec slot */
+			econtext->ecxt_innertuple =
+				ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
+									  hjstate->hj_HashTupleSlot,
+									  false);
+
+			/*
+			 * Reset temp memory each time; although this function doesn't do
+			 * any qual eval, the caller will, so let's keep it parallel to
+			 * ExecScanHashBucket.
+			 */
+			ResetExprContext(econtext);
 
-			hashTuple = hashtable->skewBucket[j]->tuples;
-			hjstate->hj_CurSkewBucketNo++;
+			hjstate->hj_CurTuple = hashTuple;
+			return true;
 		}
-		else
-			break;				/* finished all buckets */
 
-		while (hashTuple != NULL)
+		next = hashtable->current_chunk->next.unshared;
+		hashtable->current_chunk = next;
+		hashtable->current_chunk_idx = 0;
+
+		/* allow this loop to be cancellable */
+		CHECK_FOR_INTERRUPTS();
+	}
+
+	/*
+	 * no more unmatched tuples
+	 */
+	return false;
+}
+
+/*
+ * ExecParallelScanHashTableForUnmatched
+ *		scan the hash table for unmatched inner tuples, in parallel
+ *
+ * On success, the inner tuple is stored into hjstate->hj_CurTuple and
+ * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot
+ * for the latter.
+ */
+bool
+ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate,
+									  ExprContext *econtext)
+{
+	HashJoinTable hashtable = hjstate->hj_HashTable;
+	int			curbatch = hashtable->curbatch;
+	ParallelHashJoinBatchAccessor *accessor = &hashtable->batches[curbatch];
+
+	/*
+	 * Only one worker should execute this function. Since tuples have already
+	 * been emitted, it is hazardous for workers to wait at the batch_barrier
+	 * again. Instead, all workers except the last will detach and the last
+	 * will conduct this unmatched inner tuple scan.
+	 */
+	Assert(BarrierParticipants(&accessor->shared->batch_barrier) == 1);
+	while (accessor->current_chunk)
+	{
+		while (accessor->current_chunk_idx < accessor->current_chunk->used)
 		{
-			if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
-			{
-				TupleTableSlot *inntuple;
+			HashJoinTuple hashTuple = (HashJoinTuple) (HASH_CHUNK_DATA(accessor->current_chunk) +
+													   accessor->current_chunk_idx);
 
-				/* insert hashtable's tuple into exec slot */
-				inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
-												 hjstate->hj_HashTupleSlot,
-												 false);	/* do not pfree */
-				econtext->ecxt_innertuple = inntuple;
+			accessor->current_chunk_idx += MAXALIGN(HJTUPLE_OVERHEAD +
+													HJTUPLE_MINTUPLE(hashTuple)->t_len);
 
-				/*
-				 * Reset temp memory each time; although this function doesn't
-				 * do any qual eval, the caller will, so let's keep it
-				 * parallel to ExecScanHashBucket.
-				 */
-				ResetExprContext(econtext);
+			if (HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
+				continue;
 
-				hjstate->hj_CurTuple = hashTuple;
-				return true;
-			}
+			/* insert hashtable's tuple into exec slot */
+			econtext->ecxt_innertuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
+															  hjstate->hj_HashTupleSlot, false);
+
+			/*
+			 * Reset temp memory each time; although this function doesn't do
+			 * any qual eval, the caller will, so let's keep it parallel to
+			 * ExecScanHashBucket.
+			 */
+			ResetExprContext(econtext);
 
-			hashTuple = hashTuple->next.unshared;
+			hjstate->hj_CurTuple = hashTuple;
+			return true;
 		}
 
-		/* allow this loop to be cancellable */
+		accessor->shared_chunk = accessor->current_chunk->next.shared;
+		accessor->current_chunk = dsa_get_address(hashtable->area, accessor->shared_chunk);
+		accessor->current_chunk_idx = 0;
+
 		CHECK_FOR_INTERRUPTS();
 	}
 
@@ -3152,13 +3259,6 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
 		/* Detach from the batch we were last working on. */
 		if (BarrierArriveAndDetach(&batch->batch_barrier))
 		{
-			/*
-			 * Technically we shouldn't access the barrier because we're no
-			 * longer attached, but since there is no way it's moving after
-			 * this point it seems safe to make the following assertion.
-			 */
-			Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_FREE);
-
 			/* Free shared chunks and buckets. */
 			while (DsaPointerIsValid(batch->chunks))
 			{
@@ -3305,6 +3405,9 @@ ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable, int batchno)
 	hashtable->current_chunk = NULL;
 	hashtable->current_chunk_shared = InvalidDsaPointer;
 	hashtable->batches[batchno].at_least_one_chunk = false;
+	hashtable->batches[batchno].shared_chunk = InvalidDsaPointer;
+	hashtable->batches[batchno].current_chunk = NULL;
+	hashtable->batches[batchno].current_chunk_idx = 0;
 }
 
 /*
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 8d2d0f4070..145c075151 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -86,6 +86,7 @@
  *  PHJ_BATCH_ALLOCATE*      -- one allocates buckets
  *  PHJ_BATCH_LOAD           -- all load the hash table from disk
  *  PHJ_BATCH_PROBE          -- all probe
+ *  PHJ_BATCH_SCAN*          -- full/right outer scan
  *  PHJ_BATCH_FREE*          -- one frees memory
  *
  * Batch 0 is a special case, because it starts out in phase
@@ -103,9 +104,10 @@
  * to a barrier, unless the barrier has reached a phase that means that no
  * process will wait on it again.  We emit tuples while attached to the build
  * barrier in phase PHJ_BUILD_RUN, and to a per-batch barrier in phase
- * PHJ_BATCH_PROBE.  These are advanced to PHJ_BUILD_FREE and PHJ_BATCH_FREE
- * respectively without waiting, using BarrierArriveAndDetach().  The last to
- * detach receives a different return value so that it knows that it's safe to
+ * PHJ_BATCH_PROBE.  These are advanced to PHJ_BUILD_FREE and PHJ_BATCH_SCAN
+ * respectively without waiting, using BarrierArriveAndDetach() and
+ * BarrierArriveAndDetachExceptLast() respectively.  The last to detach
+ * receives a different return value so that it knows that it's safe to
  * clean up.  Any straggler process that attaches after that phase is reached
  * will see that it's too late to participate or access the relevant shared
  * memory objects.
@@ -394,9 +396,19 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 					/* end of batch, or maybe whole join */
 					if (HJ_FILL_INNER(node))
 					{
-						/* set up to scan for unmatched inner tuples */
-						ExecPrepHashTableForUnmatched(node);
-						node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+						if (parallel)
+						{
+							if (ExecParallelPrepHashTableForUnmatched(node))
+								node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+							else
+								node->hj_JoinState = HJ_NEED_NEW_BATCH;
+						}
+						else
+						{
+							/* set up to scan for unmatched inner tuples */
+							ExecPrepHashTableForUnmatched(node);
+							node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+						}
 					}
 					else
 						node->hj_JoinState = HJ_NEED_NEW_BATCH;
@@ -489,25 +501,13 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 				{
 					node->hj_MatchedOuter = true;
 
-					if (parallel)
-					{
-						/*
-						 * Full/right outer joins are currently not supported
-						 * for parallel joins, so we don't need to set the
-						 * match bit.  Experiments show that it's worth
-						 * avoiding the shared memory traffic on large
-						 * systems.
-						 */
-						Assert(!HJ_FILL_INNER(node));
-					}
-					else
-					{
-						/*
-						 * This is really only needed if HJ_FILL_INNER(node),
-						 * but we'll avoid the branch and just set it always.
-						 */
+
+					/*
+					 * This is really only needed if HJ_FILL_INNER(node), but
+					 * we'll avoid the branch and just set it always.
+					 */
+					if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple)))
 						HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple));
-					}
 
 					/* In an antijoin, we never return a matched tuple */
 					if (node->js.jointype == JOIN_ANTI)
@@ -565,7 +565,8 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 				 * so any unmatched inner tuples in the hashtable have to be
 				 * emitted before we continue to the next batch.
 				 */
-				if (!ExecScanHashTableForUnmatched(node, econtext))
+				if (!(parallel ? ExecParallelScanHashTableForUnmatched(node, econtext)
+					  : ExecScanHashTableForUnmatched(node, econtext)))
 				{
 					/* no more unmatched tuples */
 					node->hj_JoinState = HJ_NEED_NEW_BATCH;
@@ -1199,13 +1200,15 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
 					 * hash table stays alive until everyone's finished
 					 * probing it, but no participant is allowed to wait at
 					 * this barrier again (or else a deadlock could occur).
-					 * All attached participants must eventually call
-					 * BarrierArriveAndDetach() so that the final phase
-					 * PHJ_BATCH_FREE can be reached.
+					 * All attached participants must eventually detach from
+					 * the barrier and one worker must advance the phase so
+					 * that the final phase is reached.
 					 */
 					ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
 					sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);
 					return true;
+				case PHJ_BATCH_SCAN:
+					/* Fall through. */
 
 				case PHJ_BATCH_FREE:
 
diff --git a/src/backend/optimizer/path/joinpath.c b/src/backend/optimizer/path/joinpath.c
index 0f3ad8aa65..07726e84c8 100644
--- a/src/backend/optimizer/path/joinpath.c
+++ b/src/backend/optimizer/path/joinpath.c
@@ -2106,15 +2106,9 @@ hash_inner_and_outer(PlannerInfo *root,
 		 * able to properly guarantee uniqueness.  Similarly, we can't handle
 		 * JOIN_FULL and JOIN_RIGHT, because they can produce false null
 		 * extended rows.  Also, the resulting path must not be parameterized.
-		 * We would be able to support JOIN_FULL and JOIN_RIGHT for Parallel
-		 * Hash, since in that case we're back to a single hash table with a
-		 * single set of match bits for each batch, but that will require
-		 * figuring out a deadlock-free way to wait for the probe to finish.
 		 */
 		if (joinrel->consider_parallel &&
 			save_jointype != JOIN_UNIQUE_OUTER &&
-			save_jointype != JOIN_FULL &&
-			save_jointype != JOIN_RIGHT &&
 			outerrel->partial_pathlist != NIL &&
 			bms_is_empty(joinrel->lateral_relids))
 		{
@@ -2148,9 +2142,13 @@ hash_inner_and_outer(PlannerInfo *root,
 			 * total inner path will also be parallel-safe, but if not, we'll
 			 * have to search for the cheapest safe, unparameterized inner
 			 * path.  If doing JOIN_UNIQUE_INNER, we can't use any alternative
-			 * inner path.
+			 * inner path.  If full or right join, we can't use parallelism
+			 * (building the hash table in each backend) because no one
+			 * process has all the match bits.
 			 */
-			if (cheapest_total_inner->parallel_safe)
+			if (save_jointype == JOIN_FULL || save_jointype == JOIN_RIGHT)
+				cheapest_safe_inner = NULL;
+			else if (cheapest_total_inner->parallel_safe)
 				cheapest_safe_inner = cheapest_total_inner;
 			else if (save_jointype != JOIN_UNIQUE_INNER)
 				cheapest_safe_inner =
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index 176fbef149..fb0f1339d3 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -205,6 +205,15 @@ typedef struct ParallelHashJoinBatchAccessor
 	bool		at_least_one_chunk; /* has this backend allocated a chunk? */
 
 	bool		done;			/* flag to remember that a batch is done */
+
+	/*
+	 * While doing the unmatched inner scan, the assigned worker may emit
+	 * tuples. Thus, we must keep track of where it was in the hashtable so it
+	 * can return to the correct offset within the correct chunk.
+	 */
+	dsa_pointer shared_chunk;
+	HashMemoryChunk current_chunk;
+	size_t		current_chunk_idx;
 	SharedTuplestoreAccessor *inner_tuples;
 	SharedTuplestoreAccessor *outer_tuples;
 } ParallelHashJoinBatchAccessor;
@@ -266,7 +275,8 @@ typedef struct ParallelHashJoinState
 #define PHJ_BATCH_ALLOCATE				1
 #define PHJ_BATCH_LOAD					2
 #define PHJ_BATCH_PROBE					3
-#define PHJ_BATCH_FREE					4
+#define PHJ_BATCH_SCAN					4
+#define PHJ_BATCH_FREE					5
 
 /* The phases of batch growth while hashing, for grow_batches_barrier. */
 #define PHJ_GROW_BATCHES_ELECT			0
@@ -352,6 +362,9 @@ typedef struct HashJoinTableData
 	/* used for dense allocation of tuples (into linked chunks) */
 	HashMemoryChunk chunks;		/* one list for the whole batch */
 
+	/* index of tuple within current chunk for serial unmatched inner scan */
+	size_t		current_chunk_idx;
+
 	/* Shared and private state for Parallel Hash. */
 	HashMemoryChunk current_chunk;	/* this backend's current chunk */
 	dsa_area   *area;			/* DSA area to allocate memory from */
diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h
index 3fbe02e80d..7460dfff64 100644
--- a/src/include/executor/nodeHash.h
+++ b/src/include/executor/nodeHash.h
@@ -56,8 +56,11 @@ extern void ExecHashGetBucketAndBatch(HashJoinTable hashtable,
 extern bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
 extern bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
 extern void ExecPrepHashTableForUnmatched(HashJoinState *hjstate);
+extern bool ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate);
 extern bool ExecScanHashTableForUnmatched(HashJoinState *hjstate,
 										  ExprContext *econtext);
+extern bool ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate,
+												  ExprContext *econtext);
 extern void ExecHashTableReset(HashJoinTable hashtable);
 extern void ExecHashTableResetMatchFlags(HashJoinTable hashtable);
 extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
diff --git a/src/test/regress/expected/join_hash.out b/src/test/regress/expected/join_hash.out
index 3a91c144a2..4ca0e01756 100644
--- a/src/test/regress/expected/join_hash.out
+++ b/src/test/regress/expected/join_hash.out
@@ -767,8 +767,9 @@ select  count(*) from simple r full outer join simple s using (id);
 (1 row)
 
 rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
 savepoint settings;
+set enable_parallel_hash = off;
 set local max_parallel_workers_per_gather = 2;
 explain (costs off)
      select  count(*) from simple r full outer join simple s using (id);
@@ -788,6 +789,31 @@ select  count(*) from simple r full outer join simple s using (id);
  20000
 (1 row)
 
+rollback to settings;
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s using (id);
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Parallel Hash Full Join
+                     Hash Cond: (r.id = s.id)
+                     ->  Parallel Seq Scan on simple r
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on simple s
+(9 rows)
+
+select  count(*) from simple r full outer join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
 rollback to settings;
 -- An full outer join where every record is not matched.
 -- non-parallel
@@ -812,8 +838,9 @@ select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
 (1 row)
 
 rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
 savepoint settings;
+set enable_parallel_hash = off;
 set local max_parallel_workers_per_gather = 2;
 explain (costs off)
      select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
@@ -833,6 +860,31 @@ select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
  40000
 (1 row)
 
+rollback to settings;
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Parallel Hash Full Join
+                     Hash Cond: ((0 - s.id) = r.id)
+                     ->  Parallel Seq Scan on simple s
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on simple r
+(9 rows)
+
+select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+ count 
+-------
+ 40000
+(1 row)
+
 rollback to settings;
 -- exercise special code paths for huge tuples (note use of non-strict
 -- expression and left join required to get the detoasted tuple into
diff --git a/src/test/regress/sql/join_hash.sql b/src/test/regress/sql/join_hash.sql
index 68c1a8c7b6..504b3611ca 100644
--- a/src/test/regress/sql/join_hash.sql
+++ b/src/test/regress/sql/join_hash.sql
@@ -418,7 +418,16 @@ explain (costs off)
 select  count(*) from simple r full outer join simple s using (id);
 rollback to settings;
 
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
+savepoint settings;
+set enable_parallel_hash = off;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s using (id);
+select  count(*) from simple r full outer join simple s using (id);
+rollback to settings;
+
+-- parallelism is possible with parallel-aware full hash join
 savepoint settings;
 set local max_parallel_workers_per_gather = 2;
 explain (costs off)
@@ -436,14 +445,24 @@ explain (costs off)
 select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
 rollback to settings;
 
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
 savepoint settings;
+set enable_parallel_hash = off;
 set local max_parallel_workers_per_gather = 2;
 explain (costs off)
      select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
 select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
 rollback to settings;
 
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+rollback to settings;
+
+
 -- exercise special code paths for huge tuples (note use of non-strict
 -- expression and left join required to get the detoasted tuple into
 -- the hash table)
-- 
2.32.0

