From e91ec07029df3422184e316d47e2b1e4b74835cc Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Thu, 17 Sep 2020 14:10:28 -0700
Subject: [PATCH v1] Support Parallel FOJ and ROJ

To support parallel FOJ and ROJ,
- re-enable setting match bit for tuples in the hash table
- a single worker preps for unmatched inner tuple scan in
  HJ_NEED_NEW_OUTER and transitions to HJ_FILL_INNER to avoid deadlock.
        ExecParallelScanHashTableForUnmatched() is no longer safe for
	multiple workers. A single worker will scan each HashMemoryChunk
	in the hash table, freeing it after finishing with it.
---
 src/backend/executor/nodeHash.c         | 116 ++++++++++++++++++++++--
 src/backend/executor/nodeHashjoin.c     |  60 ++++++------
 src/backend/optimizer/path/joinpath.c   |   9 +-
 src/backend/postmaster/pgstat.c         |   3 +
 src/backend/storage/ipc/barrier.c       |  23 ++++-
 src/include/executor/hashjoin.h         |   8 +-
 src/include/executor/nodeHash.h         |   3 +
 src/include/nodes/execnodes.h           |   2 +
 src/include/pgstat.h                    |   1 +
 src/include/storage/barrier.h           |   1 +
 src/test/regress/expected/join_hash.out |  56 +++++++++++-
 src/test/regress/sql/join_hash.sql      |  23 ++++-
 12 files changed, 261 insertions(+), 44 deletions(-)

diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index ea69eeb2a1..e6487cc35b 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -2056,6 +2056,52 @@ ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
 	hjstate->hj_CurBucketNo = 0;
 	hjstate->hj_CurSkewBucketNo = 0;
 	hjstate->hj_CurTuple = NULL;
+	hjstate->hj_AllocatedBucketRange = 0;
+}
+
+/*
+ * ExecPrepHashTableForUnmatched
+ *		set up for a series of ExecScanHashTableForUnmatched calls
+ *		return true if this worker is elected to do the unmatched inner scan
+ */
+bool
+ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate)
+{
+	HashJoinTable hashtable = hjstate->hj_HashTable;
+	int curbatch = hashtable->curbatch;
+	ParallelHashJoinBatchAccessor *batch_accessor  = &hashtable->batches[curbatch];
+	ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
+	bool last = false;
+	hjstate->hj_CurBucketNo = 0;
+	hjstate->hj_CurSkewBucketNo = 0;
+	hjstate->hj_CurTuple = NULL;
+	hjstate->hj_AllocatedBucketRange = 0;
+	if (curbatch < 0)
+		return false;
+	last = BarrierDetachOrElect(&batch->batch_barrier);
+	if (!last)
+	{
+		hashtable->batches[hashtable->curbatch].done = true;
+		/* Make sure any temporary files are closed. */
+		sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
+		sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
+		/*
+		 * Track the largest batch we've been attached to.  Though each
+		 * backend might see a different subset of batches, explain.c will
+		 * scan the results from all backends to find the largest value.
+		 */
+		hashtable->spacePeak =
+			Max(hashtable->spacePeak,
+			    batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets);
+		hashtable->curbatch = -1;
+	}
+	else
+	{
+		batch_accessor->shared_chunk = batch->chunks;
+		batch_accessor->current_chunk = dsa_get_address(hashtable->area, batch_accessor->shared_chunk);
+		batch_accessor->current_chunk_idx = 0;
+	}
+	return last;
 }
 
 /*
@@ -2132,6 +2178,65 @@ ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
 	return false;
 }
 
+/*
+ * ExecParallelScanHashTableForUnmatched
+ *		scan the hash table for unmatched inner tuples, in parallel
+ *
+ * On success, the inner tuple is stored into hjstate->hj_CurTuple and
+ * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot
+ * for the latter.
+ */
+bool
+ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate,
+									  ExprContext *econtext)
+{
+	dsa_pointer next;
+	HashJoinTable hashtable = hjstate->hj_HashTable;
+	int curbatch = hashtable->curbatch;
+	ParallelHashJoinBatchAccessor *accessor = &hashtable->batches[curbatch];
+
+	while (accessor->current_chunk)
+	{
+		while (accessor->current_chunk_idx < accessor->current_chunk->used)
+		{
+			HashJoinTuple hashTuple = (HashJoinTuple) (
+				HASH_CHUNK_DATA(accessor->current_chunk) + accessor->current_chunk_idx
+			);
+			accessor->current_chunk_idx += MAXALIGN(HJTUPLE_OVERHEAD + HJTUPLE_MINTUPLE(hashTuple)->t_len);
+
+			if (HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
+				continue;
+
+			/* insert hashtable's tuple into exec slot */
+			econtext->ecxt_innertuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple), hjstate->hj_HashTupleSlot,false);
+
+			/*
+			 * Reset temp memory each time; although this function doesn't
+			 * do any qual eval, the caller will, so let's keep it parallel to
+			 * ExecScanHashBucket.
+			 */
+			ResetExprContext(econtext);
+
+			hjstate->hj_CurTuple = hashTuple;
+			return true;
+		}
+
+		next = accessor->current_chunk->next.shared;
+		dsa_free(hashtable->area, accessor->shared_chunk);
+		accessor->shared_chunk = next;
+		accessor->current_chunk = dsa_get_address(hashtable->area, accessor->shared_chunk);
+		accessor->current_chunk_idx = 0;
+
+		CHECK_FOR_INTERRUPTS();
+	}
+
+	accessor->shared->chunks = InvalidDsaPointer;
+	/*
+	 * no more unmatched tuples
+	 */
+	return false;
+}
+
 /*
  * ExecHashTableReset
  *
@@ -2971,6 +3076,7 @@ ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch)
 		 * up the Barrier.
 		 */
 		BarrierInit(&shared->batch_barrier, 0);
+		pg_atomic_init_u32(&shared->bucket, 0);
 		if (i == 0)
 		{
 			/* Batch 0 doesn't need to be loaded. */
@@ -3131,13 +3237,6 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
 		/* Detach from the batch we were last working on. */
 		if (BarrierArriveAndDetach(&batch->batch_barrier))
 		{
-			/*
-			 * Technically we shouldn't access the barrier because we're no
-			 * longer attached, but since there is no way it's moving after
-			 * this point it seems safe to make the following assertion.
-			 */
-			Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_DONE);
-
 			/* Free shared chunks and buckets. */
 			while (DsaPointerIsValid(batch->chunks))
 			{
@@ -3271,6 +3370,9 @@ ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable, int batchno)
 	hashtable->current_chunk = NULL;
 	hashtable->current_chunk_shared = InvalidDsaPointer;
 	hashtable->batches[batchno].at_least_one_chunk = false;
+	hashtable->batches[batchno].shared_chunk = InvalidDsaPointer;
+	hashtable->batches[batchno].current_chunk = NULL;
+	hashtable->batches[batchno].current_chunk_idx = 0;
 }
 
 /*
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 5532b91a71..23e652d130 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -82,7 +82,9 @@
  *  PHJ_BATCH_ALLOCATING     -- one allocates buckets
  *  PHJ_BATCH_LOADING        -- all load the hash table from disk
  *  PHJ_BATCH_PROBING        -- all probe
- *  PHJ_BATCH_DONE           -- end
+
+ *  PHJ_BATCH_DONE           -- queries not requiring inner fill done
+ *  PHJ_BATCH_FILL_INNER_DONE -- inner fill completed, all queries done
  *
  * Batch 0 is a special case, because it starts out in phase
  * PHJ_BATCH_PROBING; populating batch 0's hash table is done during
@@ -238,6 +240,7 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 				 * from the outer plan node.  If we succeed, we have to stash
 				 * it away for later consumption by ExecHashJoinOuterGetTuple.
 				 */
+				//volatile int mybp = 0; while (mybp == 0) {};
 				if (HJ_FILL_INNER(node))
 				{
 					/* no chance to not build the hash table */
@@ -360,9 +363,19 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 					/* end of batch, or maybe whole join */
 					if (HJ_FILL_INNER(node))
 					{
-						/* set up to scan for unmatched inner tuples */
-						ExecPrepHashTableForUnmatched(node);
-						node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+						if (parallel)
+						{
+							if (ExecParallelPrepHashTableForUnmatched(node))
+								node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+							else
+								node->hj_JoinState = HJ_NEED_NEW_BATCH;
+						}
+						else
+						{
+							/* set up to scan for unmatched inner tuples */
+							ExecPrepHashTableForUnmatched(node);
+							node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+						}
 					}
 					else
 						node->hj_JoinState = HJ_NEED_NEW_BATCH;
@@ -455,25 +468,13 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 				{
 					node->hj_MatchedOuter = true;
 
-					if (parallel)
-					{
-						/*
-						 * Full/right outer joins are currently not supported
-						 * for parallel joins, so we don't need to set the
-						 * match bit.  Experiments show that it's worth
-						 * avoiding the shared memory traffic on large
-						 * systems.
-						 */
-						Assert(!HJ_FILL_INNER(node));
-					}
-					else
-					{
-						/*
-						 * This is really only needed if HJ_FILL_INNER(node),
-						 * but we'll avoid the branch and just set it always.
-						 */
+
+					/*
+					 * This is really only needed if HJ_FILL_INNER(node),
+					 * but we'll avoid the branch and just set it always.
+					 */
+					if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple)))
 						HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple));
-					}
 
 					/* In an antijoin, we never return a matched tuple */
 					if (node->js.jointype == JOIN_ANTI)
@@ -531,7 +532,8 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 				 * so any unmatched inner tuples in the hashtable have to be
 				 * emitted before we continue to the next batch.
 				 */
-				if (!ExecScanHashTableForUnmatched(node, econtext))
+				if (!(parallel ? ExecParallelScanHashTableForUnmatched(node, econtext)
+							   : ExecScanHashTableForUnmatched(node, econtext)))
 				{
 					/* no more unmatched tuples */
 					node->hj_JoinState = HJ_NEED_NEW_BATCH;
@@ -742,6 +744,7 @@ ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
 	hjstate->hj_CurBucketNo = 0;
 	hjstate->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
 	hjstate->hj_CurTuple = NULL;
+	hjstate->hj_AllocatedBucketRange = 0;
 
 	hjstate->hj_OuterHashKeys = ExecInitExprList(node->hashkeys,
 												 (PlanState *) hjstate);
@@ -1173,15 +1176,17 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
 					 * hash table stays alive until everyone's finished
 					 * probing it, but no participant is allowed to wait at
 					 * this barrier again (or else a deadlock could occur).
-					 * All attached participants must eventually call
-					 * BarrierArriveAndDetach() so that the final phase
-					 * PHJ_BATCH_DONE can be reached.
+					 * All attached participants must eventually detach from
+					 * the barrier and one worker must advance the phase
+					 * so that the final phase is reached.
 					 */
 					ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
 					sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);
 					return true;
-
 				case PHJ_BATCH_DONE:
+					/* Fall through. */
+
+				case PHJ_BATCH_FILL_INNER_DONE:
 
 					/*
 					 * Already done.  Detach and go around again (if any
@@ -1360,6 +1365,7 @@ ExecReScanHashJoin(HashJoinState *node)
 	node->hj_CurBucketNo = 0;
 	node->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
 	node->hj_CurTuple = NULL;
+	node->hj_AllocatedBucketRange = 0;
 
 	node->hj_MatchedOuter = false;
 	node->hj_FirstOuterTupleSlot = NULL;
diff --git a/src/backend/optimizer/path/joinpath.c b/src/backend/optimizer/path/joinpath.c
index db54a6ba2e..d30ea4d86d 100644
--- a/src/backend/optimizer/path/joinpath.c
+++ b/src/backend/optimizer/path/joinpath.c
@@ -1852,8 +1852,6 @@ hash_inner_and_outer(PlannerInfo *root,
 		 */
 		if (joinrel->consider_parallel &&
 			save_jointype != JOIN_UNIQUE_OUTER &&
-			save_jointype != JOIN_FULL &&
-			save_jointype != JOIN_RIGHT &&
 			outerrel->partial_pathlist != NIL &&
 			bms_is_empty(joinrel->lateral_relids))
 		{
@@ -1887,9 +1885,12 @@ hash_inner_and_outer(PlannerInfo *root,
 			 * total inner path will also be parallel-safe, but if not, we'll
 			 * have to search for the cheapest safe, unparameterized inner
 			 * path.  If doing JOIN_UNIQUE_INNER, we can't use any alternative
-			 * inner path.
+			 * inner path.  If full or right join, we can't use parallelism
+			 * at all because no one process has all the match bits.
 			 */
-			if (cheapest_total_inner->parallel_safe)
+			if (save_jointype == JOIN_FULL || save_jointype == JOIN_RIGHT)
+				cheapest_safe_inner = NULL;
+			else if (cheapest_total_inner->parallel_safe)
 				cheapest_safe_inner = cheapest_total_inner;
 			else if (save_jointype != JOIN_UNIQUE_INNER)
 				cheapest_safe_inner =
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index e6be2b7836..f6f242d806 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3782,6 +3782,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
 		case WAIT_EVENT_HASH_BATCH_LOAD:
 			event_name = "HashBatchLoad";
 			break;
+		case WAIT_EVENT_HASH_BATCH_PROBE:
+			event_name = "HashBatchProbe";
+			break;
 		case WAIT_EVENT_HASH_BUILD_ALLOCATE:
 			event_name = "HashBuildAllocate";
 			break;
diff --git a/src/backend/storage/ipc/barrier.c b/src/backend/storage/ipc/barrier.c
index 3e200e02cc..5ebd18314f 100644
--- a/src/backend/storage/ipc/barrier.c
+++ b/src/backend/storage/ipc/barrier.c
@@ -204,6 +204,28 @@ BarrierArriveAndDetach(Barrier *barrier)
 {
 	return BarrierDetachImpl(barrier, true);
 }
+/*
+ * Upon arriving at the barrier, if this worker is not the last worker attached,
+ * detach from the barrier and return false. If this worker is the last worker,
+ * remain attached and advance the phase of the barrier, return true to indicate
+ * you are the last or "elected" worker who is still attached to the barrier.
+ * Another name I considered was BarrierUniqueify or BarrierSoloAssign
+ */
+bool
+BarrierDetachOrElect(Barrier *barrier)
+{
+	SpinLockAcquire(&barrier->mutex);
+	if (barrier->participants > 1)
+	{
+		--barrier->participants;
+		SpinLockRelease(&barrier->mutex);
+		return false;
+	}
+	Assert(barrier->participants == 1);
+	++barrier->phase;
+	SpinLockRelease(&barrier->mutex);
+	return true;
+}
 
 /*
  * Attach to a barrier.  All waiting participants will now wait for this
@@ -221,7 +243,6 @@ BarrierAttach(Barrier *barrier)
 	++barrier->participants;
 	phase = barrier->phase;
 	SpinLockRelease(&barrier->mutex);
-
 	return phase;
 }
 
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index eb5daba36b..8cf36cb6c5 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -160,6 +160,8 @@ typedef struct ParallelHashJoinBatch
 	size_t		old_ntuples;	/* number of tuples before repartitioning */
 	bool		space_exhausted;
 
+	pg_atomic_uint32 bucket;	/* bucket allocator for unmatched inner scan */
+
 	/*
 	 * Variable-sized SharedTuplestore objects follow this struct in memory.
 	 * See the accessor macros below.
@@ -205,6 +207,9 @@ typedef struct ParallelHashJoinBatchAccessor
 	bool		at_least_one_chunk; /* has this backend allocated a chunk? */
 
 	bool		done;			/* flag to remember that a batch is done */
+	dsa_pointer shared_chunk; /* current chunk in hashtable for scanning for unmatched inner tuples serially */
+	HashMemoryChunk current_chunk;
+	size_t current_chunk_idx;
 	SharedTuplestoreAccessor *inner_tuples;
 	SharedTuplestoreAccessor *outer_tuples;
 } ParallelHashJoinBatchAccessor;
@@ -265,7 +270,8 @@ typedef struct ParallelHashJoinState
 #define PHJ_BATCH_ALLOCATING			1
 #define PHJ_BATCH_LOADING				2
 #define PHJ_BATCH_PROBING				3
-#define PHJ_BATCH_DONE					4
+#define PHJ_BATCH_DONE			        4
+#define PHJ_BATCH_FILL_INNER_DONE		5
 
 /* The phases of batch growth while hashing, for grow_batches_barrier. */
 #define PHJ_GROW_BATCHES_ELECTING		0
diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h
index 2db4e2f672..a642736d54 100644
--- a/src/include/executor/nodeHash.h
+++ b/src/include/executor/nodeHash.h
@@ -56,8 +56,11 @@ extern void ExecHashGetBucketAndBatch(HashJoinTable hashtable,
 extern bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
 extern bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
 extern void ExecPrepHashTableForUnmatched(HashJoinState *hjstate);
+extern bool ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate);
 extern bool ExecScanHashTableForUnmatched(HashJoinState *hjstate,
 										  ExprContext *econtext);
+extern bool ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate,
+												  ExprContext *econtext);
 extern void ExecHashTableReset(HashJoinTable hashtable);
 extern void ExecHashTableResetMatchFlags(HashJoinTable hashtable);
 extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index a5ab1aed14..a6b2446958 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1921,6 +1921,7 @@ typedef struct MergeJoinState
  *								tuple, or NULL if starting search
  *								(hj_CurXXX variables are undefined if
  *								OuterTupleSlot is empty!)
+ *		hj_AllocatedBucketRange	range allocated for parallel unmatched scan
  *		hj_OuterTupleSlot		tuple slot for outer tuples
  *		hj_HashTupleSlot		tuple slot for inner (hashed) tuples
  *		hj_NullOuterTupleSlot	prepared null tuple for right/full outer joins
@@ -1947,6 +1948,7 @@ typedef struct HashJoinState
 	uint32		hj_CurHashValue;
 	int			hj_CurBucketNo;
 	int			hj_CurSkewBucketNo;
+	int			hj_AllocatedBucketRange;
 	HashJoinTuple hj_CurTuple;
 	TupleTableSlot *hj_OuterTupleSlot;
 	TupleTableSlot *hj_HashTupleSlot;
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 0dfbac46b4..62d5f1d16b 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -856,6 +856,7 @@ typedef enum
 	WAIT_EVENT_HASH_BATCH_ALLOCATE,
 	WAIT_EVENT_HASH_BATCH_ELECT,
 	WAIT_EVENT_HASH_BATCH_LOAD,
+	WAIT_EVENT_HASH_BATCH_PROBE,
 	WAIT_EVENT_HASH_BUILD_ALLOCATE,
 	WAIT_EVENT_HASH_BUILD_ELECT,
 	WAIT_EVENT_HASH_BUILD_HASH_INNER,
diff --git a/src/include/storage/barrier.h b/src/include/storage/barrier.h
index d71927cc2f..8b563fa7e0 100644
--- a/src/include/storage/barrier.h
+++ b/src/include/storage/barrier.h
@@ -37,6 +37,7 @@ typedef struct Barrier
 extern void BarrierInit(Barrier *barrier, int num_workers);
 extern bool BarrierArriveAndWait(Barrier *barrier, uint32 wait_event_info);
 extern bool BarrierArriveAndDetach(Barrier *barrier);
+extern bool BarrierDetachOrElect(Barrier *barrier);
 extern int	BarrierAttach(Barrier *barrier);
 extern bool BarrierDetach(Barrier *barrier);
 extern int	BarrierPhase(Barrier *barrier);
diff --git a/src/test/regress/expected/join_hash.out b/src/test/regress/expected/join_hash.out
index 3a91c144a2..4ca0e01756 100644
--- a/src/test/regress/expected/join_hash.out
+++ b/src/test/regress/expected/join_hash.out
@@ -767,8 +767,9 @@ select  count(*) from simple r full outer join simple s using (id);
 (1 row)
 
 rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
 savepoint settings;
+set enable_parallel_hash = off;
 set local max_parallel_workers_per_gather = 2;
 explain (costs off)
      select  count(*) from simple r full outer join simple s using (id);
@@ -788,6 +789,31 @@ select  count(*) from simple r full outer join simple s using (id);
  20000
 (1 row)
 
+rollback to settings;
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s using (id);
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Parallel Hash Full Join
+                     Hash Cond: (r.id = s.id)
+                     ->  Parallel Seq Scan on simple r
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on simple s
+(9 rows)
+
+select  count(*) from simple r full outer join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
 rollback to settings;
 -- An full outer join where every record is not matched.
 -- non-parallel
@@ -812,8 +838,9 @@ select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
 (1 row)
 
 rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
 savepoint settings;
+set enable_parallel_hash = off;
 set local max_parallel_workers_per_gather = 2;
 explain (costs off)
      select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
@@ -833,6 +860,31 @@ select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
  40000
 (1 row)
 
+rollback to settings;
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Parallel Hash Full Join
+                     Hash Cond: ((0 - s.id) = r.id)
+                     ->  Parallel Seq Scan on simple s
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on simple r
+(9 rows)
+
+select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+ count 
+-------
+ 40000
+(1 row)
+
 rollback to settings;
 -- exercise special code paths for huge tuples (note use of non-strict
 -- expression and left join required to get the detoasted tuple into
diff --git a/src/test/regress/sql/join_hash.sql b/src/test/regress/sql/join_hash.sql
index 68c1a8c7b6..504b3611ca 100644
--- a/src/test/regress/sql/join_hash.sql
+++ b/src/test/regress/sql/join_hash.sql
@@ -418,7 +418,16 @@ explain (costs off)
 select  count(*) from simple r full outer join simple s using (id);
 rollback to settings;
 
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
+savepoint settings;
+set enable_parallel_hash = off;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s using (id);
+select  count(*) from simple r full outer join simple s using (id);
+rollback to settings;
+
+-- parallelism is possible with parallel-aware full hash join
 savepoint settings;
 set local max_parallel_workers_per_gather = 2;
 explain (costs off)
@@ -436,14 +445,24 @@ explain (costs off)
 select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
 rollback to settings;
 
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
 savepoint settings;
+set enable_parallel_hash = off;
 set local max_parallel_workers_per_gather = 2;
 explain (costs off)
      select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
 select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
 rollback to settings;
 
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+rollback to settings;
+
+
 -- exercise special code paths for huge tuples (note use of non-strict
 -- expression and left join required to get the detoasted tuple into
 -- the hash table)
-- 
2.20.1

