Hi,

 * avoids wasting memory on duplicated hash tables
 * avoids wasting disk space on duplicated batch files
 * avoids wasting CPU executing duplicate subplans

What's the last one referring to?

+static void
+MultiExecParallelHash(HashState *node)
+{
+    switch (BarrierPhase(build_barrier))
+    {
+        case PHJ_BUILD_ALLOCATING:
+
+            /*
+             * Either I just allocated the initial hash table in
+             * ExecHashTableCreate(), or someone else is doing that.  Either
+             * way, wait for everyone to arrive here so we can proceed, and
+             * then fall through.
+             */
+            BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ALLOCATING);

Can you add a /* fallthrough */ comment here? GCC warns if you don't. While
we currently have lots of other places without the annotation, it seems
reasonable to have it in new code.

+        case PHJ_BUILD_HASHING_INNER:
+
+            /*
+             * It's time to begin hashing, or if we just arrived here then
+             * hashing is already underway, so join in that effort.  While
+             * hashing we have to be prepared to help increase the number of
+             * batches or buckets at any time, and if we arrived here when
+             * that was already underway we'll have to help complete that work
+             * immediately so that it's safe to access batches and buckets
+             * below.
+             */
+            if (PHJ_GROW_BATCHES_PHASE(BarrierAttach(&pstate->grow_batches_barrier)) !=
+                PHJ_GROW_BATCHES_ELECTING)
+                ExecParallelHashIncreaseNumBatches(hashtable);
+            if (PHJ_GROW_BUCKETS_PHASE(BarrierAttach(&pstate->grow_buckets_barrier)) !=
+                PHJ_GROW_BUCKETS_ELECTING)
+                ExecParallelHashIncreaseNumBuckets(hashtable);
+            ExecParallelHashEnsureBatchAccessors(hashtable);

"accessors" sounds a bit weird for a bunch of pointers, but maybe that's just
my ESL senses tingling wrongly.

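To illustrate the fallthrough annotation requested above, here is a minimal standalone sketch. It is not code from the patch: the enum and the function `steps_remaining` are hypothetical names, only loosely modelled on the PHJ_BUILD_* phases. It assumes GCC's -Wimplicit-fallthrough at its default level, which accepts a recognised comment such as /* fallthrough */ placed directly before the next case label.

```c
#include <assert.h>

/* Hypothetical phases, loosely modelled on the PHJ_BUILD_* names above. */
enum build_phase
{
    PHASE_ALLOCATING,
    PHASE_HASHING_INNER,
    PHASE_DONE
};

/*
 * Count how many phases of work a participant still performs when it
 * joins at 'phase'.  Each case deliberately falls into the next one.
 */
int
steps_remaining(enum build_phase phase)
{
    int steps = 0;

    assert(phase <= PHASE_DONE);

    switch (phase)
    {
        case PHASE_ALLOCATING:
            steps++;            /* stands in for the allocation step */
            /* fallthrough */
        case PHASE_HASHING_INNER:
            steps++;            /* stands in for the hashing step */
            /* fallthrough */
        case PHASE_DONE:
            break;
    }

    return steps;
}
```

A participant arriving at PHASE_ALLOCATING runs both steps, one arriving at PHASE_HASHING_INNER runs only the second, and the comments keep the intent visible to both the compiler and the reader.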
 /* ----------------------------------------------------------------
@@ -240,12 +427,15 @@ ExecEndHash(HashState *node)
  * ----------------------------------------------------------------
  */
 HashJoinTable
-ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
+ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls)

+    /*
+     * Parallel Hash tries to use the combined work_mem of all workers to
+     * avoid the need to batch.  If that won't work, it falls back to work_mem
+     * per worker and tries to process batches in parallel.
+     */

One day we're going to need a better approach to this. I have no idea how,
but this per-node, and now per_node * max_parallelism, approach has only
implementation simplicity as its benefit.

+static HashJoinTuple
+ExecParallelHashLoadTuple(HashJoinTable hashtable, MinimalTuple tuple,
+                          dsa_pointer *shared)
+{

+static void
+ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch)
+{

+/*
+ * Get the first tuple in a given bucket identified by number.
+ */
+static HashJoinTuple
+ExecHashFirstTupleInBucket(HashJoinTable hashtable, int bucketno)
+{
+    if (hashtable->parallel_state)
+    {
+        dsa_pointer p =
+        dsa_pointer_atomic_read(&hashtable->buckets.shared[bucketno]);

Can you make this, and possibly a few other places, more readable by
introducing a temporary variable?

+/*
+ * Insert a tuple at the front of a chain of tuples in DSA memory atomically.
+ */
+static void
+ExecParallelHashPushTuple(dsa_pointer_atomic *head,
+                          HashJoinTuple tuple,
+                          dsa_pointer tuple_shared)
+{
+    do
+    {
+        tuple->next.shared = dsa_pointer_atomic_read(head);
+    } while (!dsa_pointer_atomic_compare_exchange(head,
+                                                  &tuple->next.shared,
+                                                  tuple_shared));
+}

This is hard to read.

+ * While in the phase PHJ_BUILD_HASHING_INNER a separate pair of barriers may
+ * be used repeatedly as required to coordinate expansions in the number of
+ * batches or buckets.
+ * Their phases are as follows:
+ *
+ *   PHJ_GROW_BATCHES_ELECTING       -- initial state
+ *   PHJ_GROW_BATCHES_ALLOCATING     -- one allocates new batches
+ *   PHJ_GROW_BATCHES_REPARTITIONING -- all rep

s/rep/repartition/?

 #include "access/htup_details.h"
+#include "access/parallel.h"
 #include "executor/executor.h"
 #include "executor/hashjoin.h"
 #include "executor/nodeHash.h"
 #include "executor/nodeHashjoin.h"
 #include "miscadmin.h"
+#include "pgstat.h"
 #include "utils/memutils.h"
-
+#include "utils/sharedtuplestore.h"

deletes a separator newline.

 /* ----------------------------------------------------------------
@@ -138,6 +236,18 @@ ExecHashJoin(PlanState *pstate)
                     /* no chance to not build the hash table */
                     node->hj_FirstOuterTupleSlot = NULL;
                 }
+                else if (hashNode->parallel_state != NULL)
+                {
+                    /*
+                     * The empty-outer optimization is not implemented for
+                     * shared hash tables, because no one participant can
+                     * determine that there are no outer tuples, and it's not
+                     * yet clear that it's worth the synchronization overhead
+                     * of reaching consensus to figure that out.  So we have
+                     * to build the hash table.
+                     */
+                    node->hj_FirstOuterTupleSlot = NULL;
+                }

Hm. Isn't MultiExecParallelHash already doing so?

-                node->hj_JoinState = HJ_NEED_NEW_OUTER;
+                if (hashtable->parallel_state)
+                {
+                    Barrier *build_barrier;
+
+                    build_barrier = &hashtable->parallel_state->build_barrier;
+                    if (BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER)
+                    {
+                        /*
+                         * If multi-batch, we need to hash the outer relation
+                         * up front.
+                         */
+                        if (hashtable->nbatch > 1)
+                            ExecParallelHashJoinPartitionOuter(node);
+                        BarrierArriveAndWait(build_barrier,
+                                             WAIT_EVENT_HASH_BUILD_HASHING_OUTER);
+                    }
+                    Assert(BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
+
+                    /* Each backend should now select a batch to work on. */
+                    hashtable->curbatch = -1;
+                    node->hj_JoinState = HJ_NEED_NEW_BATCH;
+
+                    continue;
+                }
+                else
+                    node->hj_JoinState = HJ_NEED_NEW_OUTER;

You know what I'm going to say about all these branches, and sigh.
If we don't split this into two versions, we at least should store
hashNode->parallel_state in a local var, so the compiler doesn't have to pull
that out of memory after every external function call (of which there are a
lot). In common cases it'll end up in a callee-saved register, and most of
the called functions won't be too register starved (on x86-64).

+/*
+ * Choose a batch to work on, and attach to it.  Returns true if successful,
+ * false if there are no more batches.
+ */
+static bool
+ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
+{

+                    /*
+                     * This batch is ready to probe.  Return control to
+                     * caller.  We stay attached to batch_barrier so that the
+                     * hash table stays alive until everyone's finish probing

*finished?

+                case PHJ_BATCH_DONE:
+
+                    /*
+                     * Already done.  Detach and go around again (if any
+                     * remain).
+                     */
+                    BarrierDetach(batch_barrier);
+
+                    /*
+                     * We didn't work on this batch, but we need to observe
+                     * its size for EXPLAIN.
+                     */
+                    ExecParallelHashUpdateSpacePeak(hashtable, batchno);
+                    hashtable->batches[batchno].done = true;
+                    hashtable->curbatch = -1;
+                    break;

Hm, maybe I'm missing something, but why is it guaranteed that "we didn't
work on this batch"?

+void
+ExecShutdownHashJoin(HashJoinState *node)
+{
+    /*
+     * By the time ExecEndHashJoin runs in a worker, shared memory has been
+     * destroyed.  So this is our last chance to do any shared memory cleanup.
+     */

This comment doesn't really make much sense to me.

+void
+ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *cxt)
+{

Could use a header comment.

a) The executor side is starting to look good.
b) This is a lot of code.
c) I'm tired, planner has to wait till tomorrow.

- Andres