From 549bd36fc39282503d2ab83f7827437ddf6f3e1b Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Thu, 12 Sep 2019 00:30:49 +1200
Subject: [PATCH] WIP: Add support for Parallel Full Hash Join.

This has an unsolved problem: it is dangerous to run BarrierArriveAndWait()
in a phase that has already emitted tuples, because some worker process P
might be blocked writing to a full tuple queue while the leader process,
which should be draining that queue, is itself running the subplan and
waiting at the barrier for P!
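
In outline, the hazard (a sketch of the cycle, not an observed stack) is:

  worker P: shm_mq_send(...)            blocks: its tuple queue is full
  leader:   BarrierArriveAndWait(...)   blocks: still waiting for P to arrive

Neither side can make progress, so the query deadlocks.  The mechanism is
otherwise straightforward: after probing, participants synchronize at the
batch barrier and enter the new PHJ_BATCH_SCAN_INNER phase, in which they
claim ranges of hash buckets from a shared atomic counter and emit any
inner tuples whose match bit is not set.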
---
 src/backend/executor/nodeHash.c         | 90 +++++++++++++++++++++++--
 src/backend/executor/nodeHashjoin.c     | 35 ++++++++--
 src/backend/optimizer/path/joinpath.c   |  9 +--
 src/backend/postmaster/pgstat.c         |  3 +
 src/backend/storage/ipc/barrier.c       |  1 -
 src/include/executor/hashjoin.h         |  5 +-
 src/include/executor/nodeHash.h         |  2 +
 src/include/nodes/execnodes.h           |  2 +
 src/include/pgstat.h                    |  1 +
 src/test/regress/expected/join_hash.out | 56 ++++++++++++++-
 src/test/regress/sql/join_hash.sql      | 23 ++++++-
 11 files changed, 204 insertions(+), 23 deletions(-)
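
For reviewers, here is a minimal standalone sketch of the chunked
work-allocation scheme used by ExecParallelScanHashTableForUnmatched
below, with C11 atomics standing in for the pg_atomic API and made-up
NBUCKETS/STEP values:

    /* Illustration only -- not part of the patch. */
    #include <stdatomic.h>
    #include <stdio.h>

    #define NBUCKETS 1000          /* hypothetical hash table size */
    #define STEP 64                /* buckets claimed per atomic fetch-add */

    static atomic_uint next_bucket;     /* shared among all workers */

    /* One worker's share of the scan; returns buckets it processed. */
    static unsigned
    scan_worker(void)
    {
        unsigned    processed = 0;

        for (;;)
        {
            /* Claim the half-open range [begin, end) in one atomic step. */
            unsigned    begin = atomic_fetch_add(&next_bucket, STEP);
            unsigned    end = begin + STEP;

            if (begin >= NBUCKETS)
                break;              /* every bucket has been handed out */
            if (end > NBUCKETS)
                end = NBUCKETS;     /* clamp the final partial range */

            for (unsigned b = begin; b < end; b++)
            {
                /* ... check bucket b for unmatched inner tuples ... */
                processed++;
            }
        }
        return processed;
    }

    int
    main(void)
    {
        atomic_init(&next_bucket, 0);
        printf("scanned %u buckets\n", scan_worker());
        return 0;
    }

Claiming several cachelines' worth of buckets per atomic operation keeps
contention on the shared counter low while still spreading the
unmatched-inner scan evenly across workers.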

diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 224cbb32ba..c366a523c6 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -2050,6 +2050,7 @@ ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
 	hjstate->hj_CurBucketNo = 0;
 	hjstate->hj_CurSkewBucketNo = 0;
 	hjstate->hj_CurTuple = NULL;
+	hjstate->hj_AllocatedBucketRange = 0;
 }
 
 /*
@@ -2126,6 +2127,87 @@ ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
 	return false;
 }
 
+/*
+ * ExecParallelScanHashTableForUnmatched
+ *		scan the hash table for unmatched inner tuples, in parallel
+ *
+ * On success, the inner tuple is stored into hjstate->hj_CurTuple and
+ * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot
+ * for the latter.
+ */
+bool
+ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate,
+									  ExprContext *econtext)
+{
+	HashJoinTable hashtable = hjstate->hj_HashTable;
+	HashJoinTuple hashTuple = hjstate->hj_CurTuple;
+
+	for (;;)
+	{
+		/*
+		 * hj_CurTuple is the address of the tuple last returned from the
+		 * current bucket, or NULL if it's time to start scanning a new
+		 * bucket.
+		 */
+		if (hashTuple != NULL)
+			hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
+		else if (hjstate->hj_CurBucketNo < hjstate->hj_AllocatedBucketRange)
+			hashTuple = ExecParallelHashFirstTuple(hashtable,
+												   hjstate->hj_CurBucketNo++);
+		else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
+		{
+			/*
+			 * Allocate a few cachelines' worth of buckets and loop around.
+			 * Testing shows that 8 is a good factor.
+			 */
+			int step = (PG_CACHE_LINE_SIZE * 8) / sizeof(dsa_pointer_atomic);
+
+			hjstate->hj_CurBucketNo =
+				pg_atomic_fetch_add_u32(&hashtable->batches[hashtable->curbatch].shared->bucket,
+										step);
+			hjstate->hj_AllocatedBucketRange =
+				Min(hjstate->hj_CurBucketNo + step, hashtable->nbuckets);
+		}
+		else
+			break;				/* finished all buckets */
+
+		while (hashTuple != NULL)
+		{
+			if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
+			{
+				TupleTableSlot *inntuple;
+
+				/* insert hashtable's tuple into exec slot */
+				inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
+												 hjstate->hj_HashTupleSlot,
+												 false);	/* do not pfree */
+				econtext->ecxt_innertuple = inntuple;
+
+				/*
+				 * Reset temp memory each time; although this function doesn't
+				 * do any qual eval, the caller will, so let's keep it
+				 * parallel to ExecScanHashBucket.
+				 */
+				ResetExprContext(econtext);
+
+				hjstate->hj_CurTuple = hashTuple;
+				return true;
+			}
+
+			hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
+		}
+
+		/* allow this loop to be cancellable */
+		CHECK_FOR_INTERRUPTS();
+	}
+
+	/*
+	 * no more unmatched tuples
+	 */
+	return false;
+}
+
+
 /*
  * ExecHashTableReset
  *
@@ -2937,6 +3019,7 @@ ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch)
 		 * up the Barrier.
 		 */
 		BarrierInit(&shared->batch_barrier, 0);
+		pg_atomic_init_u32(&shared->bucket, 0);
 		if (i == 0)
 		{
 			/* Batch 0 doesn't need to be loaded. */
@@ -3097,13 +3180,6 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
 		/* Detach from the batch we were last working on. */
 		if (BarrierArriveAndDetach(&batch->batch_barrier))
 		{
-			/*
-			 * Technically we shouldn't access the barrier because we're no
-			 * longer attached, but since there is no way it's moving after
-			 * this point it seems safe to make the following assertion.
-			 */
-			Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_DONE);
-
 			/* Free shared chunks and buckets. */
 			while (DsaPointerIsValid(batch->chunks))
 			{
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index ec37558c12..1c9a40dcff 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -82,6 +82,7 @@
  *  PHJ_BATCH_ALLOCATING     -- one allocates buckets
  *  PHJ_BATCH_LOADING        -- all load the hash table from disk
  *  PHJ_BATCH_PROBING        -- all probe
+ *  PHJ_BATCH_SCAN_INNER     -- scan unmatched inner tuples
  *  PHJ_BATCH_DONE           -- end
  *
  * Batch 0 is a special case, because it starts out in phase
@@ -97,9 +98,9 @@
  * all other backends attached to it are actively executing the node or have
  * already arrived.  Practically, that means that we never return a tuple
  * while attached to a barrier, unless the barrier has reached its final
- * state.  In the slightly special case of the per-batch barrier, we return
- * tuples while in PHJ_BATCH_PROBING phase, but that's OK because we use
- * BarrierArriveAndDetach() to advance it to PHJ_BATCH_DONE without waiting.
+ * state.
+ *
+ * TODO: WIP: That is no longer true, because of PHJ_BATCH_SCAN_INNER.
  *
  *-------------------------------------------------------------------------
  */
@@ -144,6 +145,7 @@ static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
 												 uint32 *hashvalue,
 												 TupleTableSlot *tupleSlot);
 static bool ExecHashJoinNewBatch(HashJoinState *hjstate);
+static void ExecParallelHashEndProbe(HashJoinState *hjstate);
 static bool ExecParallelHashJoinNewBatch(HashJoinState *hjstate);
 static void ExecParallelHashJoinPartitionOuter(HashJoinState *node);
 
@@ -358,6 +360,9 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 				if (TupIsNull(outerTupleSlot))
 				{
 					/* end of batch, or maybe whole join */
+					if (parallel)
+						ExecParallelHashEndProbe(node);
+
 					if (HJ_FILL_INNER(node))
 					{
 						/* set up to scan for unmatched inner tuples */
@@ -512,7 +517,8 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 				 * so any unmatched inner tuples in the hashtable have to be
 				 * emitted before we continue to the next batch.
 				 */
-				if (!ExecScanHashTableForUnmatched(node, econtext))
+				if (!(parallel ? ExecParallelScanHashTableForUnmatched(node, econtext)
+							   : ExecScanHashTableForUnmatched(node, econtext)))
 				{
 					/* no more unmatched tuples */
 					node->hj_JoinState = HJ_NEED_NEW_BATCH;
@@ -723,6 +729,7 @@ ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
 	hjstate->hj_CurBucketNo = 0;
 	hjstate->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
 	hjstate->hj_CurTuple = NULL;
+	hjstate->hj_AllocatedBucketRange = 0;
 
 	hjstate->hj_OuterHashKeys = ExecInitExprList(node->hashkeys,
 												 (PlanState *) hjstate);
@@ -1060,6 +1067,18 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
 	return true;
 }
 
+static void
+ExecParallelHashEndProbe(HashJoinState *hjstate)
+{
+	HashJoinTable hashtable = hjstate->hj_HashTable;
+	Barrier *batch_barrier =
+		&hashtable->batches[hashtable->curbatch].shared->batch_barrier;
+
+	Assert(BarrierPhase(batch_barrier) == PHJ_BATCH_PROBING);
+	BarrierArriveAndWait(batch_barrier, WAIT_EVENT_HASH_BATCH_PROBING);
+	Assert(BarrierPhase(batch_barrier) == PHJ_BATCH_SCAN_INNER);
+}
+
 /*
  * Choose a batch to work on, and attach to it.  Returns true if successful,
  * false if there are no more batches.
@@ -1155,13 +1174,16 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
 					 * probing it, but no participant is allowed to wait at
 					 * this barrier again (or else a deadlock could occur).
 					 * All attached participants must eventually call
-					 * BarrierArriveAndDetach() so that the final phase
-					 * PHJ_BATCH_DONE can be reached.
+					 * BarrierArriveAndDetach() so that the next phase can be
+					 * reached.
 					 */
 					ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
 					sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);
 					return true;
 
+				case PHJ_BATCH_SCAN_INNER:
+					/* TODO -- not right */
+
 				case PHJ_BATCH_DONE:
 
 					/*
@@ -1335,6 +1357,7 @@ ExecReScanHashJoin(HashJoinState *node)
 	node->hj_CurBucketNo = 0;
 	node->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
 	node->hj_CurTuple = NULL;
+	node->hj_AllocatedBucketRange = 0;
 
 	node->hj_MatchedOuter = false;
 	node->hj_FirstOuterTupleSlot = NULL;
diff --git a/src/backend/optimizer/path/joinpath.c b/src/backend/optimizer/path/joinpath.c
index dc28b56e74..ef613d7696 100644
--- a/src/backend/optimizer/path/joinpath.c
+++ b/src/backend/optimizer/path/joinpath.c
@@ -1853,8 +1853,6 @@ hash_inner_and_outer(PlannerInfo *root,
 		 */
 		if (joinrel->consider_parallel &&
 			save_jointype != JOIN_UNIQUE_OUTER &&
-			save_jointype != JOIN_FULL &&
-			save_jointype != JOIN_RIGHT &&
 			outerrel->partial_pathlist != NIL &&
 			bms_is_empty(joinrel->lateral_relids))
 		{
@@ -1888,9 +1886,12 @@ hash_inner_and_outer(PlannerInfo *root,
 			 * total inner path will also be parallel-safe, but if not, we'll
 			 * have to search for the cheapest safe, unparameterized inner
 			 * path.  If doing JOIN_UNIQUE_INNER, we can't use any alternative
-			 * inner path.
+			 * inner path.  If full or right join, we can't use parallelism
+			 * at all because no single process has all the match bits.
 			 */
-			if (cheapest_total_inner->parallel_safe)
+			if (save_jointype == JOIN_FULL || save_jointype == JOIN_RIGHT)
+				cheapest_safe_inner = NULL;
+			else if (cheapest_total_inner->parallel_safe)
 				cheapest_safe_inner = cheapest_total_inner;
 			else if (save_jointype != JOIN_UNIQUE_INNER)
 				cheapest_safe_inner =
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 011076c3e3..d64cb976db 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3774,6 +3774,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
 		case WAIT_EVENT_HASH_BATCH_LOADING:
 			event_name = "Hash/Batch/Loading";
 			break;
+		case WAIT_EVENT_HASH_BATCH_PROBING:
+			event_name = "Hash/Batch/Probing";
+			break;
 		case WAIT_EVENT_HASH_BUILD_ALLOCATING:
 			event_name = "Hash/Build/Allocating";
 			break;
diff --git a/src/backend/storage/ipc/barrier.c b/src/backend/storage/ipc/barrier.c
index 83cbe33107..170d002444 100644
--- a/src/backend/storage/ipc/barrier.c
+++ b/src/backend/storage/ipc/barrier.c
@@ -221,7 +221,6 @@ BarrierAttach(Barrier *barrier)
 	++barrier->participants;
 	phase = barrier->phase;
 	SpinLockRelease(&barrier->mutex);
-
 	return phase;
 }
 
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index 2c94b926d3..f68f4568df 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -160,6 +160,8 @@ typedef struct ParallelHashJoinBatch
 	size_t		old_ntuples;	/* number of tuples before repartitioning */
 	bool		space_exhausted;
 
+	pg_atomic_uint32 bucket;	/* bucket allocator for unmatched inner scan */
+
 	/*
 	 * Variable-sized SharedTuplestore objects follow this struct in memory.
 	 * See the accessor macros below.
@@ -265,7 +267,8 @@ typedef struct ParallelHashJoinState
 #define PHJ_BATCH_ALLOCATING			1
 #define PHJ_BATCH_LOADING				2
 #define PHJ_BATCH_PROBING				3
-#define PHJ_BATCH_DONE					4
+#define PHJ_BATCH_SCAN_INNER			4
+#define PHJ_BATCH_DONE					5
 
 /* The phases of batch growth while hashing, for grow_batches_barrier. */
 #define PHJ_GROW_BATCHES_ELECTING		0
diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h
index fc80f03aa8..94b0be380a 100644
--- a/src/include/executor/nodeHash.h
+++ b/src/include/executor/nodeHash.h
@@ -58,6 +58,8 @@ extern bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econ
 extern void ExecPrepHashTableForUnmatched(HashJoinState *hjstate);
 extern bool ExecScanHashTableForUnmatched(HashJoinState *hjstate,
 										  ExprContext *econtext);
+extern bool ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate,
+												  ExprContext *econtext);
 extern void ExecHashTableReset(HashJoinTable hashtable);
 extern void ExecHashTableResetMatchFlags(HashJoinTable hashtable);
 extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index b593d22c48..bdf58cd2ee 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1913,6 +1913,7 @@ typedef struct MergeJoinState
  *								tuple, or NULL if starting search
  *								(hj_CurXXX variables are undefined if
  *								OuterTupleSlot is empty!)
+ *		hj_AllocatedBucketRange	range allocated for parallel unmatched scan
  *		hj_OuterTupleSlot		tuple slot for outer tuples
  *		hj_HashTupleSlot		tuple slot for inner (hashed) tuples
  *		hj_NullOuterTupleSlot	prepared null tuple for right/full outer joins
@@ -1939,6 +1940,7 @@ typedef struct HashJoinState
 	uint32		hj_CurHashValue;
 	int			hj_CurBucketNo;
 	int			hj_CurSkewBucketNo;
+	int			hj_AllocatedBucketRange;
 	HashJoinTuple hj_CurTuple;
 	TupleTableSlot *hj_OuterTupleSlot;
 	TupleTableSlot *hj_HashTupleSlot;
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index fe076d823d..ef2fbe39ca 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -827,6 +827,7 @@ typedef enum
 	WAIT_EVENT_HASH_BATCH_ALLOCATING,
 	WAIT_EVENT_HASH_BATCH_ELECTING,
 	WAIT_EVENT_HASH_BATCH_LOADING,
+	WAIT_EVENT_HASH_BATCH_PROBING,
 	WAIT_EVENT_HASH_BUILD_ALLOCATING,
 	WAIT_EVENT_HASH_BUILD_ELECTING,
 	WAIT_EVENT_HASH_BUILD_HASHING_INNER,
diff --git a/src/test/regress/expected/join_hash.out b/src/test/regress/expected/join_hash.out
index 3a91c144a2..4ca0e01756 100644
--- a/src/test/regress/expected/join_hash.out
+++ b/src/test/regress/expected/join_hash.out
@@ -767,8 +767,9 @@ select  count(*) from simple r full outer join simple s using (id);
 (1 row)
 
 rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
 savepoint settings;
+set enable_parallel_hash = off;
 set local max_parallel_workers_per_gather = 2;
 explain (costs off)
      select  count(*) from simple r full outer join simple s using (id);
@@ -788,6 +789,31 @@ select  count(*) from simple r full outer join simple s using (id);
  20000
 (1 row)
 
+rollback to settings;
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s using (id);
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Parallel Hash Full Join
+                     Hash Cond: (r.id = s.id)
+                     ->  Parallel Seq Scan on simple r
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on simple s
+(9 rows)
+
+select  count(*) from simple r full outer join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
 rollback to settings;
 -- An full outer join where every record is not matched.
 -- non-parallel
@@ -812,8 +838,9 @@ select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
 (1 row)
 
 rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
 savepoint settings;
+set enable_parallel_hash = off;
 set local max_parallel_workers_per_gather = 2;
 explain (costs off)
      select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
@@ -833,6 +860,31 @@ select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
  40000
 (1 row)
 
+rollback to settings;
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Parallel Hash Full Join
+                     Hash Cond: ((0 - s.id) = r.id)
+                     ->  Parallel Seq Scan on simple s
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on simple r
+(9 rows)
+
+select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+ count 
+-------
+ 40000
+(1 row)
+
 rollback to settings;
 -- exercise special code paths for huge tuples (note use of non-strict
 -- expression and left join required to get the detoasted tuple into
diff --git a/src/test/regress/sql/join_hash.sql b/src/test/regress/sql/join_hash.sql
index 68c1a8c7b6..504b3611ca 100644
--- a/src/test/regress/sql/join_hash.sql
+++ b/src/test/regress/sql/join_hash.sql
@@ -418,7 +418,16 @@ explain (costs off)
 select  count(*) from simple r full outer join simple s using (id);
 rollback to settings;
 
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
+savepoint settings;
+set enable_parallel_hash = off;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s using (id);
+select  count(*) from simple r full outer join simple s using (id);
+rollback to settings;
+
+-- parallelism is possible with parallel-aware full hash join
 savepoint settings;
 set local max_parallel_workers_per_gather = 2;
 explain (costs off)
@@ -436,14 +445,24 @@ explain (costs off)
 select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
 rollback to settings;
 
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
 savepoint settings;
+set enable_parallel_hash = off;
 set local max_parallel_workers_per_gather = 2;
 explain (costs off)
      select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
 select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
 rollback to settings;
 
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+rollback to settings;
+
+
 -- exercise special code paths for huge tuples (note use of non-strict
 -- expression and left join required to get the detoasted tuple into
 -- the hash table)
-- 
2.22.0

