Hi,

 * avoids wasting memory on duplicated hash tables
 * avoids wasting disk space on duplicated batch files
 * avoids wasting CPU executing duplicate subplans

What's the last one referring to?





+static void
+MultiExecParallelHash(HashState *node)
+{

+       switch (BarrierPhase(build_barrier))
+       {
+               case PHJ_BUILD_ALLOCATING:
+
+                       /*
+                        * Either I just allocated the initial hash table in
+                        * ExecHashTableCreate(), or someone else is doing that.  Either
+                        * way, wait for everyone to arrive here so we can proceed, and
+                        * then fall through.
+                        */
+                       BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ALLOCATING);

Can you add a /* fallthrough */ comment here? GCC warns if you don't.
While we currently have lots of other places without the annotation, it
seems reasonable to have it in new code.
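
I.e. something like this keeps gcc quiet (made-up toy function, just to
show where the comment goes, right before the next case label):

/*
 * Hypothetical example: -Wimplicit-fallthrough is silenced by a comment
 * placed immediately before the case label that is fallen into.
 */
static int
phase_step(int phase)
{
    int     work = 0;

    switch (phase)
    {
        case 0:
            work++;             /* phase-0 work, then continue into phase 1 */
            /* FALLTHROUGH */
        case 1:
            work += 2;
            break;
        default:
            break;
    }
    return work;
}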


+               case PHJ_BUILD_HASHING_INNER:
+
+                       /*
+                        * It's time to begin hashing, or if we just arrived here then
+                        * hashing is already underway, so join in that effort.  While
+                        * hashing we have to be prepared to help increase the number of
+                        * batches or buckets at any time, and if we arrived here when
+                        * that was already underway we'll have to help complete that work
+                        * immediately so that it's safe to access batches and buckets
+                        * below.
+                        */
+                       if (PHJ_GROW_BATCHES_PHASE(BarrierAttach(&pstate->grow_batches_barrier)) !=
+                               PHJ_GROW_BATCHES_ELECTING)
+                               ExecParallelHashIncreaseNumBatches(hashtable);
+                       if (PHJ_GROW_BUCKETS_PHASE(BarrierAttach(&pstate->grow_buckets_barrier)) !=
+                               PHJ_GROW_BUCKETS_ELECTING)
+                               ExecParallelHashIncreaseNumBuckets(hashtable);
+                       ExecParallelHashEnsureBatchAccessors(hashtable);

"accessors" sounds a bit weird for a bunch of pointers, but maybe that's
just my ESL senses tingling wrongly.



/* ----------------------------------------------------------------
@@ -240,12 +427,15 @@ ExecEndHash(HashState *node)
  * ----------------------------------------------------------------
  */
 HashJoinTable
-ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
+ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls)

+       /*
+        * Parallel Hash tries to use the combined work_mem of all workers to
+        * avoid the need to batch.  If that won't work, it falls back to work_mem
+        * per worker and tries to process batches in parallel.
+        */

One day we're going to need a better approach to this. I have no idea
how, but this per-node, and now per-node * max_parallelism, approach has
only implementation simplicity as its benefit.
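
For concreteness, I read the quoted comment as a budget along these
lines (sketch only; the name and exact formula are my guesses, not
necessarily what the patch does):

#include <stddef.h>

/*
 * Guessed shape of the combined memory budget for a parallel hash build:
 * a leader plus nworkers workers each contribute work_mem (in kB), and
 * batching only kicks in once the total is exceeded.
 */
static size_t
parallel_hash_space_allowed(int work_mem_kb, int nworkers)
{
    return (size_t) work_mem_kb * 1024 * (nworkers + 1);
}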





+static HashJoinTuple
+ExecParallelHashLoadTuple(HashJoinTable hashtable, MinimalTuple tuple,
+                                                 dsa_pointer *shared)
+{

+static void
+ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch)
+{



+/*
+ * Get the first tuple in a given bucket identified by number.
+ */
+static HashJoinTuple
+ExecHashFirstTupleInBucket(HashJoinTable hashtable, int bucketno)
+{
+       if (hashtable->parallel_state)
+       {
+               dsa_pointer p =
+               dsa_pointer_atomic_read(&hashtable->buckets.shared[bucketno]);

Can you make this, and possibly a few other places, more readable by
introducing a temporary variable?
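
E.g. something along these lines (the quoted hunk is cut off, so the
return path and the area/unshared field names are my guesses):

static HashJoinTuple
ExecHashFirstTupleInBucket(HashJoinTable hashtable, int bucketno)
{
    if (hashtable->parallel_state)
    {
        dsa_pointer_atomic *head = &hashtable->buckets.shared[bucketno];
        dsa_pointer p = dsa_pointer_atomic_read(head);

        return (HashJoinTuple) dsa_get_address(hashtable->area, p);
    }
    else
        return hashtable->buckets.unshared[bucketno];
}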


+/*
+ * Insert a tuple at the front of a chain of tuples in DSA memory atomically.
+ */
+static void
+ExecParallelHashPushTuple(dsa_pointer_atomic *head,
+                                                 HashJoinTuple tuple,
+                                                 dsa_pointer tuple_shared)
+{
+       do
+       {
+               tuple->next.shared = dsa_pointer_atomic_read(head);
+       } while (!dsa_pointer_atomic_compare_exchange(head, &tuple->next.shared, tuple_shared));
+}

This is hard to read.
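
Maybe spelling the retry loop out with an explicit local reads better,
e.g. (same CAS loop, just reformulated):

static void
ExecParallelHashPushTuple(dsa_pointer_atomic *head,
                          HashJoinTuple tuple,
                          dsa_pointer tuple_shared)
{
    for (;;)
    {
        dsa_pointer current = dsa_pointer_atomic_read(head);

        /* link the new tuple in front of the current chain head */
        tuple->next.shared = current;

        /* succeeds only if nobody changed the head in the meantime */
        if (dsa_pointer_atomic_compare_exchange(head, &current, tuple_shared))
            break;
    }
}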


+ * While in the phase PHJ_BUILD_HASHING_INNER a separate pair of barriers may
+ * be used repeatedly as required to coordinate expansions in the number of
+ * batches or buckets.  Their phases are as follows:
+ *
+ *   PHJ_GROW_BATCHES_ELECTING       -- initial state
+ *   PHJ_GROW_BATCHES_ALLOCATING     -- one allocates new batches
+ *   PHJ_GROW_BATCHES_REPARTITIONING -- all rep

s/rep/repartition/?

 #include "access/htup_details.h"
+#include "access/parallel.h"
 #include "executor/executor.h"
 #include "executor/hashjoin.h"
 #include "executor/nodeHash.h"
 #include "executor/nodeHashjoin.h"
 #include "miscadmin.h"
+#include "pgstat.h"
 #include "utils/memutils.h"
-
+#include "utils/sharedtuplestore.h"

deletes a separator newline.
 
 
 /* ----------------------------------------------------------------
@@ -138,6 +236,18 @@ ExecHashJoin(PlanState *pstate)
                                        /* no chance to not build the hash table */
                                        node->hj_FirstOuterTupleSlot = NULL;
                                }
+                               else if (hashNode->parallel_state != NULL)
+                               {
+                                       /*
+                                        * The empty-outer optimization is not implemented for
+                                        * shared hash tables, because no one participant can
+                                        * determine that there are no outer tuples, and it's not
+                                        * yet clear that it's worth the synchronization overhead
+                                        * of reaching consensus to figure that out.  So we have
+                                        * to build the hash table.
+                                        */
+                                       node->hj_FirstOuterTupleSlot = NULL;
+                               }

Hm. Isn't MultiExecParallelHash already doing so?


-                               node->hj_JoinState = HJ_NEED_NEW_OUTER;
+                               if (hashtable->parallel_state)
+                               {
+                                       Barrier    *build_barrier;
+
+                                       build_barrier = &hashtable->parallel_state->build_barrier;
+                                       if (BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER)
+                                       {
+                                               /*
+                                                * If multi-batch, we need to hash the outer relation
+                                                * up front.
+                                                */
+                                               if (hashtable->nbatch > 1)
+                                                       ExecParallelHashJoinPartitionOuter(node);
+                                               BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_HASHING_OUTER);
+                                       }
+                                       Assert(BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
+
+                                       /* Each backend should now select a batch to work on. */
+                                       hashtable->curbatch = -1;
+                                       node->hj_JoinState = HJ_NEED_NEW_BATCH;
+
+                                       continue;
+                               }
+                               else
+                                       node->hj_JoinState = HJ_NEED_NEW_OUTER;

You know what I'm going to say about all these branches, and sigh.

If we don't split this into two versions, we at least should store
hashNode->parallel_state in a local var, so the compiler doesn't have to
pull that out of memory after every external function call (of which
there are a lot). In common cases it'll end up in a callee-saved
register, and most of the called functions won't be too register
starved (on x86-64).
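
Standalone toy to illustrate the point (not the patch's code): an
external call could clobber any memory as far as the compiler knows, so
node->parallel_state gets reloaded after each call, whereas a local copy
can live in a callee-saved register:

extern void ext_call(void);

struct node_state
{
    void       *parallel_state;
};

static int
count_parallel_iterations(struct node_state *node, int n)
{
    void       *parallel_state = node->parallel_state;  /* hoisted once */
    int         hits = 0;
    int         i;

    for (i = 0; i < n; i++)
    {
        ext_call();             /* may modify *node for all gcc knows */
        if (parallel_state != NULL)
            hits++;
    }
    return hits;
}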



+/*
+ * Choose a batch to work on, and attach to it.  Returns true if successful,
+ * false if there are no more batches.
+ */
+static bool
+ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
+{


+                                       /*
+                                        * This batch is ready to probe.  Return control to
+                                        * caller. We stay attached to batch_barrier so that the
+                                        * hash table stays alive until everyone's finish probing

*finished?


+                               case PHJ_BATCH_DONE:
+
+                                       /*
+                                        * Already done.  Detach and go around again (if any
+                                        * remain).
+                                        */
+                                       BarrierDetach(batch_barrier);
+
+                                       /*
+                                        * We didn't work on this batch, but we need to observe
+                                        * its size for EXPLAIN.
+                                        */
+                                       ExecParallelHashUpdateSpacePeak(hashtable, batchno);
+                                       hashtable->batches[batchno].done = true;
+                                       hashtable->curbatch = -1;
+                                       break;

Hm, maybe I'm missing something, but why is it guaranteed that "we
didn't work on this batch"?



+void
+ExecShutdownHashJoin(HashJoinState *node)
+{
+       /*
+        * By the time ExecEndHashJoin runs in a worker, shared memory has been
+        * destroyed.  So this is our last chance to do any shared memory cleanup.
+        */

This comment doesn't really make much sense to me.


+void
+ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *cxt)
+{

could use a header comment.
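
E.g. (wording is just a guess at what the function does):

/* ----------------------------------------------------------------
 *      ExecHashJoinReInitializeDSM
 *
 *      Reset shared state before beginning a fresh scan of this node.
 * ----------------------------------------------------------------
 */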



a) The executor side is starting to look good.
b) This is a lot of code.
c) I'm tired, planner has to wait till tomorrow.

- Andres

