On Sun, Mar 26, 2023 at 9:52 AM Melanie Plageman
<melanieplage...@gmail.com> wrote:
> I have some very minor pieces of feedback, mainly about extraneous
> commas that made me uncomfortable ;)

Offensive punctuation removed.

> > discussion).  Therefore FULL JOIN inhibited page-based parallelism,
> > as the other join strategies can't do it either.
>
> I actually don't quite understand what this means? It's been awhile for
> me, so perhaps I'm being dense, but what is page-based parallelism?

Reworded.  I just meant our usual kind of "partial path" parallelism
(the kind where you don't know anything at all about the values of the
tuples that each process sees, and the work is typically chopped up by
storage pages at the scan level).

> > That unfairness is considered acceptable for now, because it's better
> > than no parallelism at all.  The build and probe phases are run in
> > parallel, and the new scan-for-unmatched phase, while serial, is usually
> > applied to the smaller of the two relations and is either limited by
> > some multiple of work_mem, or it's too big and is partitioned into
> > batches and then the situation is improved by batch-level parallelism.
> > In future work on deadlock avoidance strategies, we may find a way to
> > parallelize the new phase safely.
>
> Is it worth mentioning something about parallel-oblivious parallel hash
> join not being able to do this still? Or is that obvious?

That's kind of what I meant above.

> > @@ -3116,18 +3256,31 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)

> full/right joins should never fall into this code path, right?

Yeah, this is the normal way we detach from a batch.  This is reached
when shutting down the executor early, or when moving to the next
batch, etc.

***

I found another problem.  I realised that ... FULL JOIN ... LIMIT n
might be able to give wrong answers with unlucky scheduling.
Unfortunately I have not yet been able to reproduce the phenomenon I
am imagining, but I can't think of any mechanism that prevents the
following sequence of events:

P0 probes, pulls n tuples from the outer relation and then the
executor starts to shut down (see commit 3452dc52 which pushed down
LIMIT), but just then P1 attaches, right before P0 does.  P1
continues, and finds < n outer tuples while probing and then runs out
so it enters the unmatched scan phase, and starts emitting bogusly
unmatched tuples.  Some outer tuples we needed to get the complete set
of match bits and thus the right answer were buffered inside P0's
subplan and abandoned.

I've attached a simple fixup for this problem.  Short version: if
you're abandoning your PHJ_BATCH_PROBE phase without reaching the end,
you must be shutting down, so the executor must think it's OK to
abandon tuples this process has buffered, so it must also be OK to
throw all unmatched tuples out the window too, as if this process was
about to emit them.  Right?

***

With all the long and abstract discussion of hard-to-explain problems
in this thread and related threads, I thought I should take a step
back and figure out a way to demonstrate what this thing really does
visually.  I wanted to show that this is a very useful feature that
unlocks previously unobtainable parallelism, and to show the
compromise we've had to make so far in an intuitive way.  With some
extra instrumentation hacked up locally, I produced the attached
"progress" graphs for a very simple query: SELECT COUNT(*) FROM r FULL
JOIN s USING (i).  Imagine a time axis along the bottom, but I didn't
bother to add numbers because I'm just trying to convey the 'shape' of
execution with relative times and synchronisation points.

Figures 1-3 show that phases 'h' (hash) and 'p' (probe) are
parallelised and finish sooner as we add more processes to help out,
but 's' (= the unmatched inner tuple scan) is not.  Note that if all
inner tuples are matched, 's' becomes extremely small and the
parallelism is almost as fair as a plain old inner join, but here I've
maximised it: all inner tuples were unmatched, because the two
relations have no matches at all.  Even if we achieve perfect linear
scalability for the other phases, the speedup will be governed by
https://en.wikipedia.org/wiki/Amdahl%27s_law and the only thing that
can mitigate that is if there is more useful work those early-quitting
processes could do somewhere else in your query plan.

Figure 4 shows that it gets a lot fairer in a multi-batch join,
because there is usually useful work to do on other batches of the
same join.  Notice how processes initially work on loading, probing
and scanning different batches to reduce contention, but they are
capable of ganging up to load and/or probe the same batch if there is
nothing else left to do (for example P2 and P3 both work on p5 near
the end).  For now, they can't do that for the s phases.  (BTW, the
little gaps before loading are the allocation phases, which I didn't
bother to label because the labels wouldn't fit on them; this
visualisation technique is a WIP.)

With the "opportunistic" change we are discussing for later work,
figure 4 would become completely fair (P0 and P2 would be able to join
in and help out with s6 and s7), but single-batch figures 1-3 would
not (that would require a different executor design AFAICT, or a
eureka insight we haven't had yet; see long-winded discussion).

The last things I'm thinking about now:  Are the planner changes
right?  Are the tests enough?  I suspect we'll finish up changing that
chunk-based approach yet again in future work on memory efficiency,
but I'm OK with that; this change suits the current problem and we
don't know what we'll eventually settle on with more research.

Attachment: g.pdf
Description: Adobe PDF document

From 0bd32dd1a7e45e95a62f7f587bfba64bed87da28 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Fri, 24 Mar 2023 14:19:07 +1300
Subject: [PATCH v13 1/4] Scan for unmatched hash join tuples in memory order.

In a full/right outer join, we need to scan every tuple in the hash
table to find the ones that were not matched while probing so that we
can emit null-extended inner tuples.  Previously we did that in hash
table bucket order, which means that we dereferenced pointers to tuples
that were randomly scattered in memory (ie in an order derived from the
hash of the join key).

Change to a memory-order scan, using the linked list of memory chunks
that hold the tuples.  This isn't really being done for performance
reasons (a subject for later work), but it certainly can't be worse than
the previous random order.  The goal for now is to harmonize the scan
logic with a subsequent patch that will parallelize full joins.

Author: Melanie Plageman <melanieplage...@gmail.com>
Reviewed-by: Thomas Munro <thomas.mu...@gmail.com>
Discussion: https://postgr.es/m/CA%2BhUKG%2BA6ftXPz4oe92%2Bx8Er%2BxpGZqto70-Q_ERwRaSyA%3DafNg%40mail.gmail.com
---
 src/backend/executor/nodeHash.c | 85 +++++++++++++--------------------
 src/include/executor/hashjoin.h |  4 ++
 2 files changed, 38 insertions(+), 51 deletions(-)

diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 748c9b0024..91fd806c97 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -517,6 +517,8 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
 		hashtable->spaceAllowed * SKEW_HASH_MEM_PERCENT / 100;
 	hashtable->chunks = NULL;
 	hashtable->current_chunk = NULL;
+	hashtable->unmatched_scan_chunk = NULL;
+	hashtable->unmatched_scan_idx = 0;
 	hashtable->parallel_state = state->parallel_state;
 	hashtable->area = state->ps.state->es_query_dsa;
 	hashtable->batches = NULL;
@@ -2058,16 +2060,10 @@ ExecParallelScanHashBucket(HashJoinState *hjstate,
 void
 ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
 {
-	/*----------
-	 * During this scan we use the HashJoinState fields as follows:
-	 *
-	 * hj_CurBucketNo: next regular bucket to scan
-	 * hj_CurSkewBucketNo: next skew bucket (an index into skewBucketNums)
-	 * hj_CurTuple: last tuple returned, or NULL to start next bucket
-	 *----------
-	 */
-	hjstate->hj_CurBucketNo = 0;
-	hjstate->hj_CurSkewBucketNo = 0;
+	HashJoinTable hashtable = hjstate->hj_HashTable;
+
+	hashtable->unmatched_scan_chunk = hashtable->chunks;
+	hashtable->unmatched_scan_idx = 0;
 	hjstate->hj_CurTuple = NULL;
 }
 
@@ -2083,58 +2079,45 @@ bool
 ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
 {
 	HashJoinTable hashtable = hjstate->hj_HashTable;
-	HashJoinTuple hashTuple = hjstate->hj_CurTuple;
+	HashMemoryChunk chunk;
 
-	for (;;)
+	while ((chunk = hashtable->unmatched_scan_chunk))
 	{
-		/*
-		 * hj_CurTuple is the address of the tuple last returned from the
-		 * current bucket, or NULL if it's time to start scanning a new
-		 * bucket.
-		 */
-		if (hashTuple != NULL)
-			hashTuple = hashTuple->next.unshared;
-		else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
-		{
-			hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo];
-			hjstate->hj_CurBucketNo++;
-		}
-		else if (hjstate->hj_CurSkewBucketNo < hashtable->nSkewBuckets)
+		while (hashtable->unmatched_scan_idx < chunk->used)
 		{
-			int			j = hashtable->skewBucketNums[hjstate->hj_CurSkewBucketNo];
+			HashJoinTuple hashTuple = (HashJoinTuple)
+			(HASH_CHUNK_DATA(hashtable->unmatched_scan_chunk) +
+			 hashtable->unmatched_scan_idx);
 
-			hashTuple = hashtable->skewBucket[j]->tuples;
-			hjstate->hj_CurSkewBucketNo++;
-		}
-		else
-			break;				/* finished all buckets */
+			MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple);
+			int			hashTupleSize = (HJTUPLE_OVERHEAD + tuple->t_len);
 
-		while (hashTuple != NULL)
-		{
-			if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
-			{
-				TupleTableSlot *inntuple;
+			/* next tuple in this chunk */
+			hashtable->unmatched_scan_idx += MAXALIGN(hashTupleSize);
 
-				/* insert hashtable's tuple into exec slot */
-				inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
-												 hjstate->hj_HashTupleSlot,
-												 false);	/* do not pfree */
-				econtext->ecxt_innertuple = inntuple;
+			if (HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
+				continue;
 
-				/*
-				 * Reset temp memory each time; although this function doesn't
-				 * do any qual eval, the caller will, so let's keep it
-				 * parallel to ExecScanHashBucket.
-				 */
-				ResetExprContext(econtext);
+			/* insert hashtable's tuple into exec slot */
+			econtext->ecxt_innertuple =
+				ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
+									  hjstate->hj_HashTupleSlot,
+									  false);
 
-				hjstate->hj_CurTuple = hashTuple;
-				return true;
-			}
+			/*
+			 * Reset temp memory each time; although this function doesn't do
+			 * any qual eval, the caller will, so let's keep it parallel to
+			 * ExecScanHashBucket.
+			 */
+			ResetExprContext(econtext);
 
-			hashTuple = hashTuple->next.unshared;
+			hjstate->hj_CurTuple = hashTuple;
+			return true;
 		}
 
+		hashtable->unmatched_scan_chunk = chunk->next.unshared;
+		hashtable->unmatched_scan_idx = 0;
+
 		/* allow this loop to be cancellable */
 		CHECK_FOR_INTERRUPTS();
 	}
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index acb7592ca0..0abd888d1e 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -352,6 +352,10 @@ typedef struct HashJoinTableData
 	/* used for dense allocation of tuples (into linked chunks) */
 	HashMemoryChunk chunks;		/* one list for the whole batch */
 
+	/* current position in unmatched scan (full/right join only) */
+	HashMemoryChunk unmatched_scan_chunk;
+	size_t		unmatched_scan_idx;
+
 	/* Shared and private state for Parallel Hash. */
 	HashMemoryChunk current_chunk;	/* this backend's current chunk */
 	dsa_area   *area;			/* DSA area to allocate memory from */
-- 
2.39.2

From 9f9a3f56040a3b68ce74a17b57ce3ffe4aca5036 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Fri, 24 Mar 2023 15:23:14 +1300
Subject: [PATCH v13 2/4] Parallel Hash Full Join.

Full and right outer joins were not supported in the initial
implementation of Parallel Hash Join because of deadlock hazards.
Therefore FULL JOIN inhibited parallelism (the other join strategies
can't do it either).

Add a new PHJ phase PHJ_BATCH_SCAN that scans for unmatched tuples on
the inner side of one batch's hash table.  For now, sidestep the
deadlock problem by terminating parallelism there.  The last process to
arrive at that phase emits the unmatched tuples, while others detach and
are free to go and work on other batches, if there are any, but
otherwise they finish the join early.

That unfairness is considered acceptable for now, because it's better
than no parallelism at all.  The build and probe phases are run in
parallel, and the new scan-for-unmatched phase, while serial, is usually
applied to the smaller of the two relations and is either limited by
some multiple of work_mem, or it's too big and is partitioned into
batches and then the situation is improved by batch-level parallelism.
In future work on deadlock avoidance strategies, we may find a way to
parallelize the new phase safely.

Author: Melanie Plageman <melanieplage...@gmail.com>
Reviewed-by: Thomas Munro <thomas.mu...@gmail.com>
Discussion: https://postgr.es/m/CA%2BhUKG%2BA6ftXPz4oe92%2Bx8Er%2BxpGZqto70-Q_ERwRaSyA%3DafNg%40mail.gmail.com
---
 src/backend/executor/nodeHash.c         | 163 +++++++++++++++++++++++-
 src/backend/executor/nodeHashjoin.c     |  91 +++++++++----
 src/backend/optimizer/path/joinpath.c   |  14 +-
 src/include/executor/hashjoin.h         |   7 +-
 src/include/executor/nodeHash.h         |   3 +
 src/test/regress/expected/join_hash.out |  58 ++++++++-
 src/test/regress/sql/join_hash.sql      |  25 +++-
 7 files changed, 314 insertions(+), 47 deletions(-)

diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 91fd806c97..58789be71a 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -2067,6 +2067,66 @@ ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
 	hjstate->hj_CurTuple = NULL;
 }
 
+/*
+ * ExecParallelPrepHashTableForUnmatched
+ *		set up for a series of ExecParallelScanHashTableForUnmatched calls
+ *		return true if this worker is elected to do the unmatched inner scan
+ */
+bool
+ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate)
+{
+	HashJoinTable hashtable = hjstate->hj_HashTable;
+	int			curbatch = hashtable->curbatch;
+	ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
+
+	hjstate->hj_CurTuple = NULL;
+	if (curbatch < 0)
+		return false;
+
+	/*
+	 * It would not be deadlock-free to wait on the batch barrier, because it
+	 * is in PHJ_BATCH_PROBE phase, and thus processes attached to it have
+	 * already emitted tuples.  Therefore, we'll hold a wait-free election:
+	 * only one process can continue to the next phase, and all others detach
+	 * from this batch.  They can still go and work on other batches, if there
+	 * are any.
+	 */
+	Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE);
+	if (!BarrierArriveAndDetachExceptLast(&batch->batch_barrier))
+	{
+		/* This process considers the batch to be done. */
+		hashtable->batches[hashtable->curbatch].done = true;
+
+		/* Make sure any temporary files are closed. */
+		sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
+		sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
+
+		/*
+		 * Track largest batch we've seen, which would normally happen in
+		 * ExecHashTableDetachBatch().
+		 */
+		hashtable->spacePeak =
+			Max(hashtable->spacePeak,
+				batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets);
+		hashtable->curbatch = -1;
+		return false;
+	}
+
+	/* Now we are alone with this batch. */
+	Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN);
+	Assert(BarrierParticipants(&batch->batch_barrier) == 1);
+
+	/*
+	 * See also ExecParallelHashJoinNewBatch()'s assertion that
+	 * batch->work_queue == batch->chunks.  That is, we are now ready to start
+	 * processing all chunks by consuming from work_queue.
+	 */
+	hashtable->unmatched_scan_chunk = NULL;
+	hashtable->unmatched_scan_idx = 0;
+
+	return true;
+}
+
 /*
  * ExecScanHashTableForUnmatched
  *		scan the hash table for unmatched inner tuples
@@ -2128,6 +2188,80 @@ ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
 	return false;
 }
 
+/*
+ * ExecParallelScanHashTableForUnmatched
+ *		scan the hash table for unmatched inner tuples, in parallel join
+ *
+ * On success, the inner tuple is stored into hjstate->hj_CurTuple and
+ * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot
+ * for the latter.
+ */
+bool
+ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate,
+									  ExprContext *econtext)
+{
+	HashJoinTable hashtable = hjstate->hj_HashTable;
+	int			curbatch = hashtable->curbatch;
+	ParallelHashJoinBatchAccessor *accessor = &hashtable->batches[curbatch];
+	HashMemoryChunk chunk;
+
+	for (;;)
+	{
+		if (!(chunk = hashtable->unmatched_scan_chunk))
+		{
+
+			/*
+			 * Since only one process can run this currently, we don't need to
+			 * bother with interlocking, when popping chunks off the work
+			 * queue.
+			 */
+			chunk = (HashMemoryChunk)
+				dsa_get_address(hashtable->area, accessor->shared->work_queue);
+			if (!chunk)
+				break;
+			accessor->shared->work_queue = chunk->next.shared;
+			hashtable->unmatched_scan_chunk = chunk;
+			hashtable->unmatched_scan_idx = 0;
+		}
+
+		while (hashtable->unmatched_scan_idx < chunk->used)
+		{
+			HashJoinTuple hashTuple = (HashJoinTuple)
+			(HASH_CHUNK_DATA(chunk) + hashtable->unmatched_scan_idx);
+
+			hashtable->unmatched_scan_idx += MAXALIGN(HJTUPLE_OVERHEAD +
+													  HJTUPLE_MINTUPLE(hashTuple)->t_len);
+
+			if (HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
+				continue;
+
+			/* insert hashtable's tuple into exec slot */
+			econtext->ecxt_innertuple =
+				ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
+									  hjstate->hj_HashTupleSlot, false);
+
+			/*
+			 * Reset temp memory each time; although this function doesn't do
+			 * any qual eval, the caller will, so let's keep it parallel to
+			 * ExecScanHashBucket.
+			 */
+			ResetExprContext(econtext);
+
+			hjstate->hj_CurTuple = hashTuple;
+			return true;
+		}
+
+		hashtable->unmatched_scan_chunk = NULL;
+
+		CHECK_FOR_INTERRUPTS();
+	}
+
+	/*
+	 * no more unmatched tuples
+	 */
+	return false;
+}
+
 /*
  * ExecHashTableReset
  *
@@ -2908,6 +3042,12 @@ ExecParallelHashTupleAlloc(HashJoinTable hashtable, size_t size,
 	chunk->next.shared = hashtable->batches[curbatch].shared->chunks;
 	hashtable->batches[curbatch].shared->chunks = chunk_shared;
 
+	/*
+	 * Also make this the head of the work_queue list.  This is used as a
+	 * cursor for scanning all chunks in the batch.
+	 */
+	hashtable->batches[curbatch].shared->work_queue = chunk_shared;
+
 	if (size <= HASH_CHUNK_THRESHOLD)
 	{
 		/*
@@ -3116,18 +3256,31 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
 	{
 		int			curbatch = hashtable->curbatch;
 		ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
+		bool		attached = true;
 
 		/* Make sure any temporary files are closed. */
 		sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
 		sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
 
-		/* Detach from the batch we were last working on. */
-		if (BarrierArriveAndDetach(&batch->batch_barrier))
+		/* After attaching we always get at least to PHJ_BATCH_PROBE. */
+		Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE ||
+			   BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN);
+
+		/*
+		 * Even if we aren't doing a full/right outer join, we'll step through
+		 * the PHJ_BATCH_SCAN phase just to maintain the invariant that freeing
+		 * happens in PHJ_BATCH_FREE, but that'll be wait-free.
+		 */
+		if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE)
+			attached = BarrierArriveAndDetachExceptLast(&batch->batch_barrier);
+		if (attached && BarrierArriveAndDetach(&batch->batch_barrier))
 		{
 			/*
-			 * Technically we shouldn't access the barrier because we're no
-			 * longer attached, but since there is no way it's moving after
-			 * this point it seems safe to make the following assertion.
+			 * We are no longer attached to the batch barrier, but we're the
+			 * process that was chosen to free resources and it's safe to
+			 * assert the current phase.  The ParallelHashJoinBatch can't go
+			 * away underneath us while we are attached to the build barrier,
+			 * making this access safe.
 			 */
 			Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_FREE);
 
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index f189fb4d28..93bf0ad6e9 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -86,6 +86,7 @@
  *  PHJ_BATCH_ALLOCATE*      -- one allocates buckets
  *  PHJ_BATCH_LOAD           -- all load the hash table from disk
  *  PHJ_BATCH_PROBE          -- all probe
+ *  PHJ_BATCH_SCAN*          -- one does full/right unmatched scan
  *  PHJ_BATCH_FREE*          -- one frees memory
  *
  * Batch 0 is a special case, because it starts out in phase
@@ -103,9 +104,10 @@
  * to a barrier, unless the barrier has reached a phase that means that no
  * process will wait on it again.  We emit tuples while attached to the build
  * barrier in phase PHJ_BUILD_RUN, and to a per-batch barrier in phase
- * PHJ_BATCH_PROBE.  These are advanced to PHJ_BUILD_FREE and PHJ_BATCH_FREE
- * respectively without waiting, using BarrierArriveAndDetach().  The last to
- * detach receives a different return value so that it knows that it's safe to
+ * PHJ_BATCH_PROBE.  These are advanced to PHJ_BUILD_FREE and PHJ_BATCH_SCAN
+ * respectively without waiting, using BarrierArriveAndDetach() and
+ * BarrierArriveAndDetachExceptLast() respectively.  The last to detach
+ * receives a different return value so that it knows that it's safe to
  * clean up.  Any straggler process that attaches after that phase is reached
  * will see that it's too late to participate or access the relevant shared
  * memory objects.
@@ -393,8 +395,23 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 					if (HJ_FILL_INNER(node))
 					{
 						/* set up to scan for unmatched inner tuples */
-						ExecPrepHashTableForUnmatched(node);
-						node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+						if (parallel)
+						{
+							/*
+							 * Only one process is currently allowed to handle
+							 * each batch's unmatched tuples, in a parallel
+							 * join.
+							 */
+							if (ExecParallelPrepHashTableForUnmatched(node))
+								node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+							else
+								node->hj_JoinState = HJ_NEED_NEW_BATCH;
+						}
+						else
+						{
+							ExecPrepHashTableForUnmatched(node);
+							node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+						}
 					}
 					else
 						node->hj_JoinState = HJ_NEED_NEW_BATCH;
@@ -487,25 +504,13 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 				{
 					node->hj_MatchedOuter = true;
 
-					if (parallel)
-					{
-						/*
-						 * Full/right outer joins are currently not supported
-						 * for parallel joins, so we don't need to set the
-						 * match bit.  Experiments show that it's worth
-						 * avoiding the shared memory traffic on large
-						 * systems.
-						 */
-						Assert(!HJ_FILL_INNER(node));
-					}
-					else
-					{
-						/*
-						 * This is really only needed if HJ_FILL_INNER(node),
-						 * but we'll avoid the branch and just set it always.
-						 */
+
+					/*
+					 * This is really only needed if HJ_FILL_INNER(node), but
+					 * we'll avoid the branch and just set it always.
+					 */
+					if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple)))
 						HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple));
-					}
 
 					/* In an antijoin, we never return a matched tuple */
 					if (node->js.jointype == JOIN_ANTI)
@@ -563,7 +568,8 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 				 * so any unmatched inner tuples in the hashtable have to be
 				 * emitted before we continue to the next batch.
 				 */
-				if (!ExecScanHashTableForUnmatched(node, econtext))
+				if (!(parallel ? ExecParallelScanHashTableForUnmatched(node, econtext)
+					  : ExecScanHashTableForUnmatched(node, econtext)))
 				{
 					/* no more unmatched tuples */
 					node->hj_JoinState = HJ_NEED_NEW_BATCH;
@@ -1197,13 +1203,44 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
 					 * hash table stays alive until everyone's finished
 					 * probing it, but no participant is allowed to wait at
 					 * this barrier again (or else a deadlock could occur).
-					 * All attached participants must eventually call
-					 * BarrierArriveAndDetach() so that the final phase
-					 * PHJ_BATCH_FREE can be reached.
+					 * All attached participants must eventually detach from
+					 * the barrier and one worker must advance the phase so
+					 * that the final phase is reached.
 					 */
 					ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
 					sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);
+
+					/*
+					 * This is a good place to assert that the batch's
+					 * work_queue has been set to point to the head of the
+					 * chunk list.  No new chunks are created once this phase
+					 * is reached, and we don't want to have to do any extra
+					 * IPC just to set up the work_queue that the later
+					 * PHJ_BATCH_SCAN phase wants, so instead require that it
+					 * has been maintained correctly by the earlier phases.
+					 */
+					Assert(hashtable->batches[batchno].shared->work_queue ==
+						   hashtable->batches[batchno].shared->chunks);
+
 					return true;
+				case PHJ_BATCH_SCAN:
+
+					/*
+					 * In principle, we could help scan for unmatched tuples,
+					 * since that phase is already underway (the thing we can't
+					 * do under current deadlock-avoidance rules is wait for
+					 * others to arrive at PHJ_BATCH_SCAN, because
+					 * PHJ_BATCH_PROBE emits tuples, but in this case we just
+					 * got here without waiting).  That is not yet done.  For
+					 * now, we just detach and go around again.  We have to use
+					 * ExecHashTableDetachBatch() because there's a small
+					 * chance we'll be the last to detach, and then we're
+					 * responsible for freeing memory.
+					 */
+					ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
+					hashtable->batches[batchno].done = true;
+					ExecHashTableDetachBatch(hashtable);
+					break;
 
 				case PHJ_BATCH_FREE:
 
diff --git a/src/backend/optimizer/path/joinpath.c b/src/backend/optimizer/path/joinpath.c
index e6ef0deb23..bd51e4f972 100644
--- a/src/backend/optimizer/path/joinpath.c
+++ b/src/backend/optimizer/path/joinpath.c
@@ -2193,15 +2193,9 @@ hash_inner_and_outer(PlannerInfo *root,
 		 * able to properly guarantee uniqueness.  Similarly, we can't handle
 		 * JOIN_FULL and JOIN_RIGHT, because they can produce false null
 		 * extended rows.  Also, the resulting path must not be parameterized.
-		 * We would be able to support JOIN_FULL and JOIN_RIGHT for Parallel
-		 * Hash, since in that case we're back to a single hash table with a
-		 * single set of match bits for each batch, but that will require
-		 * figuring out a deadlock-free way to wait for the probe to finish.
 		 */
 		if (joinrel->consider_parallel &&
 			save_jointype != JOIN_UNIQUE_OUTER &&
-			save_jointype != JOIN_FULL &&
-			save_jointype != JOIN_RIGHT &&
 			outerrel->partial_pathlist != NIL &&
 			bms_is_empty(joinrel->lateral_relids))
 		{
@@ -2235,9 +2229,13 @@ hash_inner_and_outer(PlannerInfo *root,
 			 * total inner path will also be parallel-safe, but if not, we'll
 			 * have to search for the cheapest safe, unparameterized inner
 			 * path.  If doing JOIN_UNIQUE_INNER, we can't use any alternative
-			 * inner path.
+			 * inner path.  If full or right join, we can't use parallelism
+			 * (building the hash table in each backend) because no one
+			 * process has all the match bits.
 			 */
-			if (cheapest_total_inner->parallel_safe)
+			if (save_jointype == JOIN_FULL || save_jointype == JOIN_RIGHT)
+				cheapest_safe_inner = NULL;
+			else if (cheapest_total_inner->parallel_safe)
 				cheapest_safe_inner = cheapest_total_inner;
 			else if (save_jointype != JOIN_UNIQUE_INNER)
 				cheapest_safe_inner =
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index 0abd888d1e..7615025d73 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -154,6 +154,7 @@ typedef struct ParallelHashJoinBatch
 	Barrier		batch_barrier;	/* synchronization for joining this batch */
 
 	dsa_pointer chunks;			/* chunks of tuples loaded */
+	dsa_pointer work_queue;		/* cursor for processing all chunks */
 	size_t		size;			/* size of buckets + chunks in memory */
 	size_t		estimated_size; /* size of buckets + chunks while writing */
 	size_t		ntuples;		/* number of tuples loaded */
@@ -266,7 +267,8 @@ typedef struct ParallelHashJoinState
 #define PHJ_BATCH_ALLOCATE				1
 #define PHJ_BATCH_LOAD					2
 #define PHJ_BATCH_PROBE					3
-#define PHJ_BATCH_FREE					4
+#define PHJ_BATCH_SCAN					4
+#define PHJ_BATCH_FREE					5
 
 /* The phases of batch growth while hashing, for grow_batches_barrier. */
 #define PHJ_GROW_BATCHES_ELECT			0
@@ -356,6 +358,9 @@ typedef struct HashJoinTableData
 	HashMemoryChunk unmatched_scan_chunk;
 	size_t		unmatched_scan_idx;
 
+	/* index of tuple within current chunk for serial unmatched inner scan */
+	size_t		current_chunk_idx;
+
 	/* Shared and private state for Parallel Hash. */
 	HashMemoryChunk current_chunk;	/* this backend's current chunk */
 	dsa_area   *area;			/* DSA area to allocate memory from */
diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h
index d7634af05c..56d5350c61 100644
--- a/src/include/executor/nodeHash.h
+++ b/src/include/executor/nodeHash.h
@@ -56,8 +56,11 @@ extern void ExecHashGetBucketAndBatch(HashJoinTable hashtable,
 extern bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
 extern bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
 extern void ExecPrepHashTableForUnmatched(HashJoinState *hjstate);
+extern bool ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate);
 extern bool ExecScanHashTableForUnmatched(HashJoinState *hjstate,
 										  ExprContext *econtext);
+extern bool ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate,
+												  ExprContext *econtext);
 extern void ExecHashTableReset(HashJoinTable hashtable);
 extern void ExecHashTableResetMatchFlags(HashJoinTable hashtable);
 extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
diff --git a/src/test/regress/expected/join_hash.out b/src/test/regress/expected/join_hash.out
index 3ec07bc1af..027f3888b0 100644
--- a/src/test/regress/expected/join_hash.out
+++ b/src/test/regress/expected/join_hash.out
@@ -784,8 +784,9 @@ select  count(*) from simple r full outer join simple s using (id);
 (1 row)
 
 rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
 savepoint settings;
+set enable_parallel_hash = off;
 set local max_parallel_workers_per_gather = 2;
 explain (costs off)
      select  count(*) from simple r full outer join simple s using (id);
@@ -806,7 +807,32 @@ select  count(*) from simple r full outer join simple s using (id);
 (1 row)
 
 rollback to settings;
--- An full outer join where every record is not matched.
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s using (id);
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Parallel Hash Full Join
+                     Hash Cond: (r.id = s.id)
+                     ->  Parallel Seq Scan on simple r
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on simple s
+(9 rows)
+
+select  count(*) from simple r full outer join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+rollback to settings;
+-- A full outer join where every record is not matched.
 -- non-parallel
 savepoint settings;
 set local max_parallel_workers_per_gather = 0;
@@ -829,8 +855,9 @@ select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
 (1 row)
 
 rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
 savepoint settings;
+set enable_parallel_hash = off;
 set local max_parallel_workers_per_gather = 2;
 explain (costs off)
      select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
@@ -850,6 +877,31 @@ select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
  40000
 (1 row)
 
+rollback to settings;
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Parallel Hash Full Join
+                     Hash Cond: ((0 - s.id) = r.id)
+                     ->  Parallel Seq Scan on simple s
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on simple r
+(9 rows)
+
+select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+ count 
+-------
+ 40000
+(1 row)
+
 rollback to settings;
 -- exercise special code paths for huge tuples (note use of non-strict
 -- expression and left join required to get the detoasted tuple into
diff --git a/src/test/regress/sql/join_hash.sql b/src/test/regress/sql/join_hash.sql
index 77dbc182d5..ba1b3e6e1b 100644
--- a/src/test/regress/sql/join_hash.sql
+++ b/src/test/regress/sql/join_hash.sql
@@ -435,15 +435,24 @@ explain (costs off)
 select  count(*) from simple r full outer join simple s using (id);
 rollback to settings;
 
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
 savepoint settings;
+set enable_parallel_hash = off;
 set local max_parallel_workers_per_gather = 2;
 explain (costs off)
      select  count(*) from simple r full outer join simple s using (id);
 select  count(*) from simple r full outer join simple s using (id);
 rollback to settings;
 
--- An full outer join where every record is not matched.
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s using (id);
+select  count(*) from simple r full outer join simple s using (id);
+rollback to settings;
+
+-- A full outer join where every record is not matched.
 
 -- non-parallel
 savepoint settings;
@@ -453,14 +462,24 @@ explain (costs off)
 select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
 rollback to settings;
 
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
 savepoint settings;
+set enable_parallel_hash = off;
 set local max_parallel_workers_per_gather = 2;
 explain (costs off)
      select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
 select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
 rollback to settings;
 
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+rollback to settings;
+
+
 -- exercise special code paths for huge tuples (note use of non-strict
 -- expression and left join required to get the detoasted tuple into
 -- the hash table)
-- 
2.39.2

From f806c211d2d1128303f7ed29bf8d9b8395d3a859 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Mon, 27 Mar 2023 16:32:22 +1300
Subject: [PATCH v13 3/4] XXX fixup

---
 src/backend/executor/nodeHash.c     | 34 +++++++++++++++++++++++++++++
 src/backend/executor/nodeHashjoin.c |  2 ++
 src/include/executor/hashjoin.h     |  3 ++-
 3 files changed, 38 insertions(+), 1 deletion(-)

diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 58789be71a..6539b32d45 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -2116,6 +2116,17 @@ ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate)
 	Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN);
 	Assert(BarrierParticipants(&batch->batch_barrier) == 1);
 
+	/*
+	 * Has another process decided to give up early and command all processes
+	 * to skip the unmatched scan?
+	 */
+	if (batch->skip_unmatched)
+	{
+		hashtable->batches[hashtable->curbatch].done = true;
+		ExecHashTableDetachBatch(hashtable);
+		return false;
+	}
+
 	/*
 	 * See also ExecParallelHashJoinNewBatch()'s assertion that
 	 * batch->work_queue == batch->chunks.  That is, we are now ready to start
@@ -3211,6 +3222,7 @@ ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable)
 		accessor->shared = shared;
 		accessor->preallocated = 0;
 		accessor->done = false;
+		accessor->outer_eof = false;
 		accessor->inner_tuples =
 			sts_attach(ParallelHashJoinBatchInner(shared),
 					   ParallelWorkerNumber + 1,
@@ -3266,6 +3278,28 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
 		Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE ||
 			   BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN);
 
+		/*
+		 * If we're abandoning the PHJ_BATCH_PROBE phase early without having
+		 * reached the end of it, it means the plan doesn't want any more
+		 * tuples, and it is happy to abandon any tuples buffered in this
+		 * process's subplans.  For correctness, we can't allow any process to
+		 * execute the PHJ_BATCH_SCAN phase, because we will never have the
+		 * complete set of match bits.  Therefore we skip emitting unmatched
+		 * tuples in all backends (if this is a full/right join), as if those
+		 * tuples were all due to be emitted by this process and it has
+		 * abandoned them too.
+		 */
+		if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE &&
+			!hashtable->batches[curbatch].outer_eof)
+		{
+			/*
+			 * This flag may be written to by multiple backends during
+			 * PHJ_BATCH_PROBE phase, but will only be read in PHJ_BATCH_SCAN
+			 * phase so requires no extra locking.
+			 */
+			batch->skip_unmatched = true;
+		}
+
 		/*
 		 * Even if we aren't doing a full/right outer join, we'll step through
 		 * the PHJ_BATCH_SCAN phase just to maintain the invariant that freeing
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 93bf0ad6e9..c8af59f106 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -972,6 +972,8 @@ ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
 	}
 
 	/* End of this batch */
+	hashtable->batches[curbatch].outer_eof = true;
+
 	return NULL;
 }
 
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index 7615025d73..f5fcf7d4e6 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -160,6 +160,7 @@ typedef struct ParallelHashJoinBatch
 	size_t		ntuples;		/* number of tuples loaded */
 	size_t		old_ntuples;	/* number of tuples before repartitioning */
 	bool		space_exhausted;
+	bool		skip_unmatched;	/* whether to abandon unmatched scan */
 
 	/*
 	 * Variable-sized SharedTuplestore objects follow this struct in memory.
@@ -204,7 +205,7 @@ typedef struct ParallelHashJoinBatchAccessor
 	size_t		estimated_size; /* size of partition on disk */
 	size_t		old_ntuples;	/* how many tuples before repartitioning? */
 	bool		at_least_one_chunk; /* has this backend allocated a chunk? */
-
+	bool		outer_eof;		/* has this process hit end of batch? */
 	bool		done;			/* flag to remember that a batch is done */
 	SharedTuplestoreAccessor *inner_tuples;
 	SharedTuplestoreAccessor *outer_tuples;
-- 
2.39.2
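
For anyone skimming the fixup without the tree at hand, here is a standalone sketch of the flag handshake it adds.  It is a single-threaded illustration only, not executor code: every Sketch*/sketch_* name is made up for this example, the two enum values merely stand in for PHJ_BATCH_PROBE and PHJ_BATCH_SCAN, and the barrier that elects one process for the scan phase is assumed rather than modelled.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-ins for the two batch barrier phases in the patch. */
typedef enum
{
	SKETCH_PHASE_PROBE,
	SKETCH_PHASE_SCAN
} SketchPhase;

/* Shared per-batch state; plays the role of ParallelHashJoinBatch. */
typedef struct
{
	bool		skip_unmatched; /* whether to abandon unmatched scan */
} SketchSharedBatch;

/* Per-process state; plays the role of ParallelHashJoinBatchAccessor. */
typedef struct
{
	bool		outer_eof;		/* has this process hit end of batch? */
	bool		done;			/* flag to remember that a batch is done */
} SketchAccessor;

/* A process has read past the last outer tuple of the batch. */
void
sketch_outer_exhausted(SketchAccessor *acc)
{
	acc->outer_eof = true;
}

/*
 * A process detaches from the batch.  Leaving the probe phase before
 * reaching end-of-batch means the match bits can never be complete, so
 * the unmatched scan is vetoed for every process.
 */
void
sketch_detach(SketchSharedBatch *shared, const SketchAccessor *acc,
			  SketchPhase phase)
{
	if (phase == SKETCH_PHASE_PROBE && !acc->outer_eof)
		shared->skip_unmatched = true;
}

/*
 * The process elected to run the unmatched scan checks the veto first.
 * Returns true if the scan should go ahead.
 */
bool
sketch_prep_unmatched(const SketchSharedBatch *shared, SketchAccessor *acc)
{
	if (shared->skip_unmatched)
	{
		acc->done = true;
		return false;
	}
	return true;
}
```

In the sketch, a process that calls sketch_outer_exhausted() before sketch_detach() leaves the flag alone, while one that detaches mid-probe sets it, after which sketch_prep_unmatched() returns false for whoever is elected.  The real patch relies on the barrier ordering between the two phases to make the unlocked write safe; the sketch does not attempt to show that.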
