This is an automated email from the ASF dual-hosted git repository.

avamingli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudberry.git

commit b75378a5358f0ea3bd64fd1a922ec7e783321f27
Author: Thomas Munro <[email protected]>
AuthorDate: Fri Mar 31 11:01:51 2023 +1300

    Parallel Hash Full Join.
    
    Full and right outer joins were not supported in the initial
    implementation of Parallel Hash Join because of deadlock hazards (see
    discussion).  Therefore FULL JOIN inhibited parallelism, as the other
    join strategies can't do that in parallel either.
    
    Add a new PHJ phase PHJ_BATCH_SCAN that scans for unmatched tuples on
    the inner side of one batch's hash table.  For now, sidestep the
    deadlock problem by terminating parallelism there.  The last process to
    arrive at that phase emits the unmatched tuples, while others detach and
    are free to go and work on other batches, if there are any, but
    otherwise they finish the join early.
    
    That unfairness is considered acceptable for now, because it's better
    than no parallelism at all.  The build and probe phases are run in
    parallel, and the new scan-for-unmatched phase, while serial, is usually
    applied to the smaller of the two relations and is either limited by
    some multiple of work_mem, or it's too big and is partitioned into
    batches and then the situation is improved by batch-level parallelism.
    
    Author: Melanie Plageman <[email protected]>
    Author: Thomas Munro <[email protected]>
    Reviewed-by: Thomas Munro <[email protected]>
    Discussion: 
https://postgr.es/m/CA%2BhUKG%2BA6ftXPz4oe92%2Bx8Er%2BxpGZqto70-Q_ERwRaSyA%3DafNg%40mail.gmail.com
---
 src/backend/executor/nodeHash.c         | 179 ++++++++++++++++++++++++++++++--
 src/backend/executor/nodeHashjoin.c     | 102 +++++++++++-------
 src/backend/optimizer/path/joinpath.c   |  14 ++-
 src/include/executor/hashjoin.h         |   6 +-
 src/include/executor/nodeHash.h         |   3 +
 src/test/regress/expected/join_hash.out |  65 +++++++++++-
 src/test/regress/sql/join_hash.sql      |  27 ++++-
 7 files changed, 339 insertions(+), 57 deletions(-)

diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 62d3c2da790..39db08164e0 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -2388,6 +2388,69 @@ ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
        hjstate->hj_CurTuple = NULL;
 }
 
+/*
+ * Decide if this process is allowed to run the unmatched scan.  If so, the
+ * batch barrier is advanced to PHJ_BATCH_SCAN and true is returned.
+ * Otherwise the batch is detached and false is returned.
+ */
+bool
+ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate)
+{
+       HashJoinTable hashtable = hjstate->hj_HashTable;
+       int                     curbatch = hashtable->curbatch;
+       ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
+
+       Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE);
+
+       /*
+        * It would not be deadlock-free to wait on the batch barrier, because 
it
+        * is in PHJ_BATCH_PROBE phase, and thus processes attached to it have
+        * already emitted tuples.  Therefore, we'll hold a wait-free election:
+        * only one process can continue to the next phase, and all others 
detach
+        * from this batch.  They can still go any work on other batches, if 
there
+        * are any.
+        */
+       if (!BarrierArriveAndDetachExceptLast(&batch->batch_barrier))
+       {
+               /* This process considers the batch to be done. */
+               hashtable->batches[hashtable->curbatch].done = true;
+
+               /* Make sure any temporary files are closed. */
+               
sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
+               
sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
+
+               /*
+                * Track largest batch we've seen, which would normally happen 
in
+                * ExecHashTableDetachBatch().
+                */
+               hashtable->spacePeak =
+                       Max(hashtable->spacePeak,
+                               batch->size + sizeof(dsa_pointer_atomic) * 
hashtable->nbuckets);
+               hashtable->curbatch = -1;
+               return false;
+       }
+
+       /* Now we are alone with this batch. */
+       Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN);
+       Assert(BarrierParticipants(&batch->batch_barrier) == 1);
+
+       /*
+        * Has another process decided to give up early and command all 
processes
+        * to skip the unmatched scan?
+        */
+       if (batch->skip_unmatched)
+       {
+               hashtable->batches[hashtable->curbatch].done = true;
+               ExecHashTableDetachBatch(hashtable);
+               return false;
+       }
+
+       /* Now prepare the process local state, just as for non-parallel join. 
*/
+       ExecPrepHashTableForUnmatched(hjstate);
+
+       return true;
+}
+
 /*
  * ExecScanHashTableForUnmatched
  *             scan the hash table for unmatched inner tuples
@@ -2462,6 +2525,72 @@ ExecScanHashTableForUnmatched(HashJoinState *hjstate, 
ExprContext *econtext)
        return false;
 }
 
+/*
+ * ExecParallelScanHashTableForUnmatched
+ *             scan the hash table for unmatched inner tuples, in parallel join
+ *
+ * On success, the inner tuple is stored into hjstate->hj_CurTuple and
+ * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot
+ * for the latter.
+ */
+bool
+ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate,
+                                                                         
ExprContext *econtext)
+{
+       HashJoinTable hashtable = hjstate->hj_HashTable;
+       HashJoinTuple hashTuple = hjstate->hj_CurTuple;
+
+       for (;;)
+       {
+               /*
+                * hj_CurTuple is the address of the tuple last returned from 
the
+                * current bucket, or NULL if it's time to start scanning a new
+                * bucket.
+                */
+               if (hashTuple != NULL)
+                       hashTuple = ExecParallelHashNextTuple(hashtable, 
hashTuple);
+               else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
+                       hashTuple = ExecParallelHashFirstTuple(hashtable,
+                                                                               
                   hjstate->hj_CurBucketNo++);
+               else
+                       break;                          /* finished all buckets 
*/
+
+               while (hashTuple != NULL)
+               {
+                       if 
(!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
+                       {
+                               TupleTableSlot *inntuple;
+
+                               /* insert hashtable's tuple into exec slot */
+                               inntuple = 
ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
+                                                                               
                 hjstate->hj_HashTupleSlot,
+                                                                               
                 false);        /* do not pfree */
+                               econtext->ecxt_innertuple = inntuple;
+
+                               /*
+                                * Reset temp memory each time; although this 
function doesn't
+                                * do any qual eval, the caller will, so let's 
keep it
+                                * parallel to ExecScanHashBucket.
+                                */
+                               ResetExprContext(econtext);
+
+                               hjstate->hj_CurTuple = hashTuple;
+                               return true;
+                       }
+
+                       hashTuple = ExecParallelHashNextTuple(hashtable, 
hashTuple);
+               }
+
+               /* allow this loop to be cancellable */
+               CHECK_FOR_INTERRUPTS();
+       }
+
+       /*
+        * no more unmatched tuples
+        */
+       return false;
+}
+
 /*
  * ExecHashTableReset
  *
@@ -3793,6 +3922,7 @@ ExecParallelHashEnsureBatchAccessors(HashJoinTable 
hashtable)
                accessor->shared = shared;
                accessor->preallocated = 0;
                accessor->done = false;
+               accessor->outer_eof = false;
                accessor->inner_tuples =
                        sts_attach(ParallelHashJoinBatchInner(shared),
                                           hashtable->hjstate->worker_id,
@@ -3838,25 +3968,62 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
        {
                int                     curbatch = hashtable->curbatch;
                ParallelHashJoinBatch *batch = 
hashtable->batches[curbatch].shared;
+               bool            attached = true;
 
                /* Make sure any temporary files are closed. */
                
sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
                
sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
 
-               /* Detach from the batch we were last working on. */
+               /* After attaching we always get at least to PHJ_BATCH_PROBE. */
+               Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE ||
+                          BarrierPhase(&batch->batch_barrier) == 
PHJ_BATCH_SCAN);
+
+               /*
+                * If we're abandoning the PHJ_BATCH_PROBE phase early without 
having
+                * reached the end of it, it means the plan doesn't want any 
more
+                * tuples, and it is happy to abandon any tuples buffered in 
this
+                * process's subplans.  For correctness, we can't allow any 
process to
+                * execute the PHJ_BATCH_SCAN phase, because we will never have 
the
+                * complete set of match bits.  Therefore we skip emitting 
unmatched
+                * tuples in all backends (if this is a full/right join), as if 
those
+                * tuples were all due to be emitted by this process and it has
+                * abandoned them too.
+                */
                /*
                 * CBDB_PARALLEL: Parallel Hash Left Anti Semi (Not-In) 
Join(parallel-aware)
                 * If phs_lasj_has_null is true, that means we have found null 
when building hash table,
                 * there were no batches to detach.
                 */
-               if (!hashtable->parallel_state->phs_lasj_has_null && 
BarrierArriveAndDetach(&batch->batch_barrier))
+               if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE &&
+                       !hashtable->parallel_state->phs_lasj_has_null && /* 
CBDB_PARALLEL */
+                       !hashtable->batches[curbatch].outer_eof)
+               {
+                       /*
+                        * This flag may be written to by multiple backends 
during
+                        * PHJ_BATCH_PROBE phase, but will only be read in 
PHJ_BATCH_SCAN
+                        * phase so requires no extra locking.
+                        */
+                       batch->skip_unmatched = true;
+               }
+
+               /*
+                * Even if we aren't doing a full/right outer join, we'll step 
through
+                * the PHJ_BATCH_SCAN phase just to maintain the invariant that
+                * freeing happens in PHJ_BATCH_FREE, but that'll be wait-free.
+                */
+               if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE &&
+                       !hashtable->parallel_state->phs_lasj_has_null /* 
CBDB_PARALLEL */)
+                       attached = 
BarrierArriveAndDetachExceptLast(&batch->batch_barrier);
+               if (attached && BarrierArriveAndDetach(&batch->batch_barrier))
                {
                        /*
-                        * Technically we shouldn't access the barrier because 
we're no
-                        * longer attached, but since there is no way it's 
moving after
-                        * this point it seems safe to make the following 
assertion.
+                        * We are not longer attached to the batch barrier, but 
we're the
+                        * process that was chosen to free resources and it's 
safe to
+                        * assert the current phase.  The ParallelHashJoinBatch 
can't go
+                        * away underneath us while we are attached to the 
build barrier,
+                        * making this access safe.
                         */
-                       Assert(BarrierPhase(&batch->batch_barrier) == 
PHJ_BATCH_DONE);
+                       Assert(BarrierPhase(&batch->batch_barrier) == 
PHJ_BATCH_FREE);
 
                        /* Free shared chunks and buckets. */
                        while (DsaPointerIsValid(batch->chunks))
diff --git a/src/backend/executor/nodeHashjoin.c 
b/src/backend/executor/nodeHashjoin.c
index 9ec70f16e31..7f58cafb75d 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -80,11 +80,12 @@
  * aren't enough to go around.  For each batch there is a separate barrier
  * with the following phases:
  *
- *  PHJ_BATCH_ELECTING       -- initial state
- *  PHJ_BATCH_ALLOCATING     -- one allocates buckets
- *  PHJ_BATCH_LOADING        -- all load the hash table from disk
- *  PHJ_BATCH_PROBING        -- all probe
- *  PHJ_BATCH_DONE           -- end
+ *  PHJ_BATCH_ELECT          -- initial state
+ *  PHJ_BATCH_ALLOCATE*      -- one allocates buckets
+ *  PHJ_BATCH_LOAD           -- all load the hash table from disk
+ *  PHJ_BATCH_PROBE          -- all probe
+ *  PHJ_BATCH_SCAN*          -- one does full/right unmatched scan
+ *  PHJ_BATCH_FREE*          -- one frees memory
  *
  * Batch 0 is a special case, because it starts out in phase
  * PHJ_BATCH_PROBING; populating batch 0's hash table is done during
@@ -97,11 +98,17 @@
  *
  * To avoid deadlocks, we never wait for any barrier unless it is known that
  * all other backends attached to it are actively executing the node or have
- * already arrived.  Practically, that means that we never return a tuple
- * while attached to a barrier, unless the barrier has reached its final
- * state.  In the slightly special case of the per-batch barrier, we return
- * tuples while in PHJ_BATCH_PROBING phase, but that's OK because we use
- * BarrierArriveAndDetach() to advance it to PHJ_BATCH_DONE without waiting.
+ * finished.  Practically, that means that we never emit a tuple while attached
+ * to a barrier, unless the barrier has reached a phase that means that no
+ * process will wait on it again.  We emit tuples while attached to the build
+ * barrier in phase PHJ_BUILD_RUN, and to a per-batch barrier in phase
+ * PHJ_BATCH_PROBE.  These are advanced to PHJ_BUILD_FREE and PHJ_BATCH_SCAN
+ * respectively without waiting, using BarrierArriveAndDetach() and
+ * BarrierArriveAndDetachExceptLast() respectively.  The last to detach
+ * receives a different return value so that it knows that it's safe to
+ * clean up.  Any straggler process that attaches after that phase is reached
+ * will see that it's too late to participate or access the relevant shared
+ * memory objects.
  *
  *-------------------------------------------------------------------------
  */
@@ -493,8 +500,23 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
                                        if (HJ_FILL_INNER(node))
                                        {
                                                /* set up to scan for unmatched 
inner tuples */
-                                               
ExecPrepHashTableForUnmatched(node);
-                                               node->hj_JoinState = 
HJ_FILL_INNER_TUPLES;
+                                               if (parallel)
+                                               {
+                                                       /*
+                                                        * Only one process is 
currently allow to handle
+                                                        * each batch's 
unmatched tuples, in a parallel
+                                                        * join.
+                                                        */
+                                                       if 
(ExecParallelPrepHashTableForUnmatched(node))
+                                                               
node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+                                                       else
+                                                               
node->hj_JoinState = HJ_NEED_NEW_BATCH;
+                                               }
+                                               else
+                                               {
+                                                       
ExecPrepHashTableForUnmatched(node);
+                                                       node->hj_JoinState = 
HJ_FILL_INNER_TUPLES;
+                                               }
                                        }
                                        else
                                                node->hj_JoinState = 
HJ_NEED_NEW_BATCH;
@@ -605,25 +627,13 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
                                {
                                        node->hj_MatchedOuter = true;
 
-                                       if (parallel)
-                                       {
-                                               /*
-                                                * Full/right outer joins are 
currently not supported
-                                                * for parallel joins, so we 
don't need to set the
-                                                * match bit.  Experiments show 
that it's worth
-                                                * avoiding the shared memory 
traffic on large
-                                                * systems.
-                                                */
-                                               Assert(!HJ_FILL_INNER(node));
-                                       }
-                                       else
-                                       {
-                                               /*
-                                                * This is really only needed 
if HJ_FILL_INNER(node),
-                                                * but we'll avoid the branch 
and just set it always.
-                                                */
+
+                                       /*
+                                        * This is really only needed if 
HJ_FILL_INNER(node), but
+                                        * we'll avoid the branch and just set 
it always.
+                                        */
+                                       if 
(!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple)))
                                                
HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple));
-                                       }
 
                                        /* In an antijoin, we never return a 
matched tuple */
                                        if (node->js.jointype == JOIN_ANTI ||
@@ -682,7 +692,8 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
                                 * so any unmatched inner tuples in the 
hashtable have to be
                                 * emitted before we continue to the next batch.
                                 */
-                               if (!ExecScanHashTableForUnmatched(node, 
econtext))
+                               if (!(parallel ? 
ExecParallelScanHashTableForUnmatched(node, econtext)
+                                         : ExecScanHashTableForUnmatched(node, 
econtext)))
                                {
                                        /* no more unmatched tuples */
                                        node->hj_JoinState = HJ_NEED_NEW_BATCH;
@@ -1241,6 +1252,8 @@ ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
        }
 
        /* End of this batch */
+       hashtable->batches[curbatch].outer_eof = true;
+
        return NULL;
 }
 
@@ -1521,15 +1534,34 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
                                         * hash table stays alive until 
everyone's finished
                                         * probing it, but no participant is 
allowed to wait at
                                         * this barrier again (or else a 
deadlock could occur).
-                                        * All attached participants must 
eventually call
-                                        * BarrierArriveAndDetach() so that the 
final phase
-                                        * PHJ_BATCH_DONE can be reached.
+                                        * All attached participants must 
eventually detach from
+                                        * the barrier and one worker must 
advance the phase so
+                                        * that the final phase is reached.
                                         */
                                        
ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
                                        
sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);
+
                                        return true;
+                               case PHJ_BATCH_SCAN:
+
+                                       /*
+                                        * In principle, we could help scan for 
unmatched tuples,
+                                        * since that phase is already underway 
(the thing we
+                                        * can't do under current 
deadlock-avoidance rules is wait
+                                        * for others to arrive at 
PHJ_BATCH_SCAN, because
+                                        * PHJ_BATCH_PROBE emits tuples, but in 
this case we just
+                                        * got here without waiting).  That is 
not yet done.  For
+                                        * now, we just detach and go around 
again.  We have to
+                                        * use ExecHashTableDetachBatch() 
because there's a small
+                                        * chance we'll be the last to detach, 
and then we're
+                                        * responsible for freeing memory.
+                                        */
+                                       
ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
+                                       hashtable->batches[batchno].done = true;
+                                       ExecHashTableDetachBatch(hashtable);
+                                       break;
 
-                               case PHJ_BATCH_DONE:
+                               case PHJ_BATCH_FREE:
 
                                        /*
                                         * Already done.  Detach and go around 
again (if any
diff --git a/src/backend/optimizer/path/joinpath.c 
b/src/backend/optimizer/path/joinpath.c
index ec50c66104a..bff31340128 100644
--- a/src/backend/optimizer/path/joinpath.c
+++ b/src/backend/optimizer/path/joinpath.c
@@ -2315,15 +2315,9 @@ hash_inner_and_outer(PlannerInfo *root,
                 * able to properly guarantee uniqueness.  Similarly, we can't 
handle
                 * JOIN_FULL and JOIN_RIGHT, because they can produce false null
                 * extended rows.  Also, the resulting path must not be 
parameterized.
-                * We would be able to support JOIN_FULL and JOIN_RIGHT for 
Parallel
-                * Hash, since in that case we're back to a single hash table 
with a
-                * single set of match bits for each batch, but that will 
require
-                * figuring out a deadlock-free way to wait for the probe to 
finish.
                 */
                if (joinrel->consider_parallel &&
                        save_jointype != JOIN_UNIQUE_OUTER &&
-                       save_jointype != JOIN_FULL &&
-                       save_jointype != JOIN_RIGHT &&
                        outerrel->partial_pathlist != NIL &&
                        bms_is_empty(joinrel->lateral_relids))
                {
@@ -2360,9 +2354,13 @@ hash_inner_and_outer(PlannerInfo *root,
                         * total inner path will also be parallel-safe, but if 
not, we'll
                         * have to search for the cheapest safe, 
unparameterized inner
                         * path.  If doing JOIN_UNIQUE_INNER, we can't use any 
alternative
-                        * inner path.
+                        * inner path.  If full or right join, we can't use 
parallelism
+                        * (building the hash table in each backend) because no 
one
+                        * process has all the match bits.
                         */
-                       if (cheapest_total_inner->parallel_safe)
+                       if (save_jointype == JOIN_FULL || save_jointype == 
JOIN_RIGHT)
+                               cheapest_safe_inner = NULL;
+                       else if (cheapest_total_inner->parallel_safe)
                                cheapest_safe_inner = cheapest_total_inner;
                        else if (save_jointype != JOIN_UNIQUE_INNER)
                                cheapest_safe_inner =
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index b1fbaacf5e9..b240d0ae555 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -195,6 +195,7 @@ typedef struct ParallelHashJoinBatch
        size_t          ntuples;                /* number of tuples loaded */
        size_t          old_ntuples;    /* number of tuples before 
repartitioning */
        bool            space_exhausted;
+       bool            skip_unmatched; /* whether to abandon unmatched scan */
 
        /*
         * Variable-sized SharedTuplestore objects follow this struct in memory.
@@ -239,7 +240,7 @@ typedef struct ParallelHashJoinBatchAccessor
        size_t          estimated_size; /* size of partition on disk */
        size_t          old_ntuples;    /* how many tuples before 
repartitioning? */
        bool            at_least_one_chunk; /* has this backend allocated a 
chunk? */
-
+       bool            outer_eof;              /* has this process hit end of 
batch? */
        bool            done;                   /* flag to remember that a 
batch is done */
        SharedTuplestoreAccessor *inner_tuples;
        SharedTuplestoreAccessor *outer_tuples;
@@ -305,7 +306,8 @@ typedef struct ParallelHashJoinState
 #define PHJ_BATCH_ALLOCATING                   1
 #define PHJ_BATCH_LOADING                              2
 #define PHJ_BATCH_PROBING                              3
-#define PHJ_BATCH_DONE                                 4
+#define PHJ_BATCH_SCAN                                 4
+#define PHJ_BATCH_FREE                                 5
 
 /* The phases of batch growth while hashing, for grow_batches_barrier. */
 #define PHJ_GROW_BATCHES_ELECTING              0
diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h
index 993de4519b5..36549376ef9 100644
--- a/src/include/executor/nodeHash.h
+++ b/src/include/executor/nodeHash.h
@@ -64,9 +64,12 @@ extern bool ExecScanHashBucket(HashState *hashState, 
HashJoinState *hjstate,
 extern bool ExecParallelScanHashBucket(HashState *hashState, HashJoinState 
*hjstate,
                                                                           
ExprContext *econtext);
 extern void ExecPrepHashTableForUnmatched(HashJoinState *hjstate);
+extern bool ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate);
 extern bool ExecScanHashTableForUnmatched(HashJoinState *hjstate,
                                                                                
  ExprContext *econtext);
 extern void ExecHashTableReset(HashState *hashState, HashJoinTable hashtable);
+extern bool ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate,
+                                                                               
                  ExprContext *econtext);
 extern void ExecHashTableResetMatchFlags(HashJoinTable hashtable);
 extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
                                     uint64 operatorMemKB,
diff --git a/src/test/regress/expected/join_hash.out 
b/src/test/regress/expected/join_hash.out
index 5171a7d9cf3..250704efbd7 100644
--- a/src/test/regress/expected/join_hash.out
+++ b/src/test/regress/expected/join_hash.out
@@ -315,6 +315,13 @@ $$);
  t                    | f
 (1 row)
 
+-- parallel full multi-batch hash join
+select count(*) from simple r full outer join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
 rollback to settings;
 -- The "bad" case: during execution we need to increase number of
 -- batches; in this case we plan for 1 batch, and increase at least a
@@ -816,8 +823,9 @@ select  count(*) from simple r full outer join simple s 
using (id);
 (1 row)
 
 rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
 savepoint settings;
+set enable_parallel_hash = off;
 set local max_parallel_workers_per_gather = 2;
 explain (costs off)
      select  count(*) from simple r full outer join simple s using (id);
@@ -841,7 +849,32 @@ select  count(*) from simple r full outer join simple s 
using (id);
 (1 row)
 
 rollback to settings;
--- An full outer join where every record is not matched.
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s using (id);
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Parallel Hash Full Join
+                     Hash Cond: (r.id = s.id)
+                     ->  Parallel Seq Scan on simple r
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on simple s
+(9 rows)
+
+select  count(*) from simple r full outer join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+rollback to settings;
+-- A full outer join where every record is not matched.
 -- non-parallel
 savepoint settings;
 set local max_parallel_workers_per_gather = 0;
@@ -869,8 +902,9 @@ select  count(*) from simple r full outer join simple s on 
(r.id = 0 - s.id);
 (1 row)
 
 rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
 savepoint settings;
+set enable_parallel_hash = off;
 set local max_parallel_workers_per_gather = 2;
 explain (costs off)
      select  count(*) from simple r full outer join simple s on (r.id = 0 - 
s.id);
@@ -895,6 +929,31 @@ select  count(*) from simple r full outer join simple s on 
(r.id = 0 - s.id);
  120000
 (1 row)
 
+rollback to settings;
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s on (r.id = 0 - 
s.id);
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Parallel Hash Full Join
+                     Hash Cond: ((0 - s.id) = r.id)
+                     ->  Parallel Seq Scan on simple s
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on simple r
+(9 rows)
+
+select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+ count 
+-------
+ 40000
+(1 row)
+
 rollback to settings;
 -- exercise special code paths for huge tuples (note use of non-strict
 -- expression and left join required to get the detoasted tuple into
diff --git a/src/test/regress/sql/join_hash.sql 
b/src/test/regress/sql/join_hash.sql
index 325068e9d23..01961d1ce6e 100644
--- a/src/test/regress/sql/join_hash.sql
+++ b/src/test/regress/sql/join_hash.sql
@@ -191,6 +191,8 @@ select original > 1 as initially_multibatch, final > 
original as increased_batch
 $$
   select count(*) from simple r join simple s using (id);
 $$);
+-- parallel full multi-batch hash join
+select count(*) from simple r full outer join simple s using (id);
 rollback to settings;
 
 -- The "bad" case: during execution we need to increase number of
@@ -438,15 +440,24 @@ explain (costs off)
 select  count(*) from simple r full outer join simple s using (id);
 rollback to settings;
 
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
 savepoint settings;
+set enable_parallel_hash = off;
 set local max_parallel_workers_per_gather = 2;
 explain (costs off)
      select  count(*) from simple r full outer join simple s using (id);
 select  count(*) from simple r full outer join simple s using (id);
 rollback to settings;
 
--- An full outer join where every record is not matched.
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s using (id);
+select  count(*) from simple r full outer join simple s using (id);
+rollback to settings;
+
+-- A full outer join where every record is not matched.
 
 -- non-parallel
 savepoint settings;
@@ -456,14 +467,24 @@ explain (costs off)
 select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
 rollback to settings;
 
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
 savepoint settings;
+set enable_parallel_hash = off;
 set local max_parallel_workers_per_gather = 2;
 explain (costs off)
      select  count(*) from simple r full outer join simple s on (r.id = 0 - 
s.id);
 select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
 rollback to settings;
 
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s on (r.id = 0 - 
s.id);
+select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+rollback to settings;
+
+
 -- exercise special code paths for huge tuples (note use of non-strict
 -- expression and left join required to get the detoasted tuple into
 -- the hash table)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to