Hi,

On 2017-03-27 22:33:03 -0700, Andres Freund wrote:
> On 2017-03-23 20:35:09 +1300, Thomas Munro wrote:
> > Here is a new patch series responding to feedback from Peter and Andres:
>
> Here's a review of 0007 & 0010 together - they're going to have to be
> applied together anyway...
> ...
> ok, ENOTIME for today...
Continuing where I dropped off tiredly yesterday.

-			ExecHashJoinSaveTuple(tuple,
-								  hashvalue,
-								  &hashtable->innerBatchFile[batchno]);
+			if (HashJoinTableIsShared(hashtable))
+				sts_puttuple(hashtable->shared_inner_batches, batchno, &hashvalue,
+							 tuple);
+			else
+				ExecHashJoinSaveTuple(tuple,
+									  hashvalue,
+									  &hashtable->innerBatchFile[batchno]);
 		}
 	}

Why isn't this done inside of ExecHashJoinSaveTuple?
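I.e. something like the sketch below - the function name and exact signature
are invented here, just to illustrate what I mean, and the outer side would
want the same treatment (or an extra parameter):

/*
 * Sketch only, not from the patch: hide the shared vs. private distinction
 * in one place, so callers don't have to branch every time they spill a
 * tuple to an inner batch.
 */
static void
ExecHashJoinSaveInnerTuple(HashJoinTable hashtable, int batchno,
						   MinimalTuple tuple, uint32 hashvalue)
{
	if (HashJoinTableIsShared(hashtable))
		sts_puttuple(hashtable->shared_inner_batches, batchno,
					 &hashvalue, tuple);
	else
		ExecHashJoinSaveTuple(tuple, hashvalue,
							  &hashtable->innerBatchFile[batchno]);
}

The same kind of wrapper could obviously dispatch to shared_outer_batches /
outerBatchFile for the outer side.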
@@ -1280,6 +1785,68 @@ ExecHashTableReset(HashJoinTable hashtable)
+		/* Rewind the shared read heads for this batch, inner and outer. */
+		sts_prepare_parallel_read(hashtable->shared_inner_batches,
+								  curbatch);
+		sts_prepare_parallel_read(hashtable->shared_outer_batches,
+								  curbatch);

It feels somewhat wrong to do this in here, rather than at the callsites.

+		}
+
+		/*
+		 * Each participant needs to make sure that data it has written for
+		 * this partition is now read-only and visible to other participants.
+		 */
+		sts_end_write(hashtable->shared_inner_batches, curbatch);
+		sts_end_write(hashtable->shared_outer_batches, curbatch);
+
+		/*
+		 * Wait again, so that all workers see the new hash table and can
+		 * safely read from batch files from any participant because they have
+		 * all ended writing.
+		 */
+		Assert(BarrierPhase(&hashtable->shared->barrier) ==
+			   PHJ_PHASE_RESETTING_BATCH(curbatch));
+		BarrierWait(&hashtable->shared->barrier, WAIT_EVENT_HASH_RESETTING);
+		Assert(BarrierPhase(&hashtable->shared->barrier) ==
+			   PHJ_PHASE_LOADING_BATCH(curbatch));
+		ExecHashUpdate(hashtable);
+
+		/* Forget the current chunks. */
+		hashtable->current_chunk = NULL;
+		return;
+	}

 /*
  * Release all the hash buckets and tuples acquired in the prior pass, and
@@ -1289,10 +1856,10 @@ ExecHashTableReset(HashJoinTable hashtable)
 	oldcxt = MemoryContextSwitchTo(hashtable->batchCxt);

 	/* Reallocate and reinitialize the hash bucket headers. */
-	hashtable->buckets = (HashJoinTuple *)
-		palloc0(nbuckets * sizeof(HashJoinTuple));
+	hashtable->buckets = (HashJoinBucketHead *)
+		palloc0(nbuckets * sizeof(HashJoinBucketHead));

-	hashtable->spaceUsed = nbuckets * sizeof(HashJoinTuple);
+	hashtable->spaceUsed = nbuckets * sizeof(HashJoinBucketHead);

 	/* Cannot be more than our previous peak; we had this size before. */
 	Assert(hashtable->spaceUsed <= hashtable->spacePeak);

@@ -1301,6 +1868,22 @@ ExecHashTableReset(HashJoinTable hashtable)
 	/* Forget the chunks (the memory was freed by the context reset above). */
 	hashtable->chunks = NULL;
+
+	/* Rewind the shared read heads for this batch, inner and outer. */
+	if (hashtable->innerBatchFile[curbatch] != NULL)
+	{
+		if (BufFileSeek(hashtable->innerBatchFile[curbatch], 0, 0L, SEEK_SET))
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not rewind hash-join temporary file: %m")));
+	}
+	if (hashtable->outerBatchFile[curbatch] != NULL)
+	{
+		if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0L, SEEK_SET))
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not rewind hash-join temporary file: %m")));
+	}
 }

 /*
@@ -1310,12 +1893,21 @@ ExecHashTableReset(HashJoinTable hashtable)
 void
 ExecHashTableResetMatchFlags(HashJoinTable hashtable)
 {
+	dsa_pointer chunk_shared = InvalidDsaPointer;
 	HashMemoryChunk chunk;
 	HashJoinTuple tuple;
 	int			i;

 	/* Reset all flags in the main table ... */
-	chunk = hashtable->chunks;
+	if (HashJoinTableIsShared(hashtable))
+	{
+		/* This only runs in the leader during rescan initialization. */
+		Assert(!IsParallelWorker());
+		hashtable->shared->chunk_work_queue = hashtable->shared->chunks;
+		chunk = pop_chunk_queue(hashtable, &chunk_shared);
+	}
+	else
+		chunk = hashtable->chunks;

Hm - doesn't pop_chunk_queue empty the work queue?

+/*
+ * Load a tuple into shared dense storage, like 'load_private_tuple'.  This
+ * version is for shared hash tables.
+ */
+static HashJoinTuple
+load_shared_tuple(HashJoinTable hashtable, MinimalTuple tuple,
+				  dsa_pointer *shared, bool respect_work_mem)
+{

Hm. Are there issues with "blessed" records being stored in shared memory?
I seem to recall you talking about it, but I see nothing addressing the
issue here?  (later) Ah, I see - you just prohibit parallelism in that
case - might be worth a comment pointing to that.

+	/* Check if some other participant has increased nbatch. */
+	if (hashtable->shared->nbatch > hashtable->nbatch)
+	{
+		Assert(respect_work_mem);
+		ExecHashIncreaseNumBatches(hashtable, hashtable->shared->nbatch);
+	}
+
+	/* Check if we need to help shrinking. */
+	if (hashtable->shared->shrink_needed && respect_work_mem)
+	{
+		hashtable->current_chunk = NULL;
+		LWLockRelease(&hashtable->shared->chunk_lock);
+		return NULL;
+	}
+
+	/* Oversized tuples get their own chunk. */
+	if (size > HASH_CHUNK_THRESHOLD)
+		chunk_size = size + HASH_CHUNK_HEADER_SIZE;
+	else
+		chunk_size = HASH_CHUNK_SIZE;
+
+	/* If appropriate, check if work_mem would be exceeded by a new chunk. */
+	if (respect_work_mem &&
+		hashtable->shared->grow_enabled &&
+		hashtable->shared->nbatch <= MAX_BATCHES_BEFORE_INCREASES_STOP &&
+		(hashtable->shared->size +
+		 chunk_size) > (work_mem * 1024L *
+						hashtable->shared->planned_participants))
+	{
+		/*
+		 * It would be exceeded.  Let's increase the number of batches, so we
+		 * can try to shrink the hash table.
+		 */
+		hashtable->shared->nbatch *= 2;
+		ExecHashIncreaseNumBatches(hashtable, hashtable->shared->nbatch);
+		hashtable->shared->chunk_work_queue = hashtable->shared->chunks;
+		hashtable->shared->chunks = InvalidDsaPointer;
+		hashtable->shared->shrink_needed = true;
+		hashtable->current_chunk = NULL;
+		LWLockRelease(&hashtable->shared->chunk_lock);
+
+		/* The caller needs to shrink the hash table. */
+		return NULL;
+	}

Hm - we could end up calling ExecHashIncreaseNumBatches twice here?
Probably harmless.

 /* ----------------------------------------------------------------
  *		ExecHashJoin
@@ -129,6 +200,14 @@ ExecHashJoin(HashJoinState *node)
 					/* no chance to not build the hash table */
 					node->hj_FirstOuterTupleSlot = NULL;
 				}
+				else if (hashNode->shared_table_data != NULL)
+				{
+					/*
+					 * The empty-outer optimization is not implemented for
+					 * shared hash tables yet.
+					 */
+					node->hj_FirstOuterTupleSlot = NULL;

Hm, why is this checking for the shared-ness of the join in a different
manner?

+				if (HashJoinTableIsShared(hashtable))
+				{
+					/*
+					 * An important optimization: if this is a
+					 * single-batch join and not an outer join, there is
+					 * no reason to synchronize again when we've finished
+					 * probing.
+					 */
+					Assert(BarrierPhase(&hashtable->shared->barrier) ==
+						   PHJ_PHASE_PROBING_BATCH(hashtable->curbatch));
+					if (hashtable->nbatch == 1 && !HJ_FILL_INNER(node))
+						return NULL;	/* end of join */
+
+					/*
+					 * Check if we are a leader that can't go further than
+					 * probing the first batch, to avoid risk of deadlock
+					 * against workers.
+					 */
+					if (!LeaderGateCanContinue(&hashtable->shared->leader_gate))
+					{
+						/*
+						 * Other backends will need to handle all future
+						 * batches written by me.  We don't detach until
+						 * after we've finished writing to all batches so
+						 * that they are flushed, otherwise another
+						 * participant might try to read them too soon.
+						 */
+						sts_end_write_all_partitions(hashNode->shared_inner_batches);
+						sts_end_write_all_partitions(hashNode->shared_outer_batches);
+						BarrierDetach(&hashtable->shared->barrier);
+						hashtable->detached_early = true;
+						return NULL;
+					}
+
+					/*
+					 * We can't start searching for unmatched tuples until
+					 * all participants have finished probing, so we
+					 * synchronize here.
+					 */
+					Assert(BarrierPhase(&hashtable->shared->barrier) ==
+						   PHJ_PHASE_PROBING_BATCH(hashtable->curbatch));
+					if (BarrierWait(&hashtable->shared->barrier,
+									WAIT_EVENT_HASHJOIN_PROBING))
+					{
+						/* Serial phase: prepare for unmatched. */
+						if (HJ_FILL_INNER(node))
+						{
+							hashtable->shared->chunk_work_queue =
+								hashtable->shared->chunks;
+							hashtable->shared->chunks = InvalidDsaPointer;
+						}
+					}

Couldn't we skip that if this isn't an outer join?  Not sure if the
complication would be worth it...

+void
+ExecShutdownHashJoin(HashJoinState *node)
+{
+	/*
+	 * By the time ExecEndHashJoin runs in a work, shared memory has been

s/work/worker/

+	 * destroyed. So this is our last chance to do any shared memory cleanup.
+	 */
+	if (node->hj_HashTable)
+		ExecHashTableDetach(node->hj_HashTable);
+}

   There is no extra charge
+ * for probing the hash table for outer path row, on the basis that
+ * read-only access to a shared hash table shouldn't be any more
+ * expensive.
+ */

Hm, that's debatable.  !shared will mostly be on the local NUMA node,
shared probably not.

  * Get hash table size that executor would use for inner relation.
  *
+ * Shared hash tables are allowed to use the work_mem of all participants
+ * combined to make up for the fact that there is only one copy shared by
+ * all.

Hm. I don't quite understand that reasoning.
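The mechanics are clear enough - spelling out what the check in
load_shared_tuple amounts to, with made-up numbers (work_mem = 4096 kB,
planned_participants = 4):

	limit = work_mem * 1024L * planned_participants
	      = 4096 * 1024 * 4
	      = 16 MB

i.e. the single shared table may grow to ~16 MB before nbatch is doubled.
If the argument is that a parallel-oblivious hash join would use about the
same total, because each of the four participants builds its own private
4 MB copy, then the comment should say that explicitly; as written it just
reads like a memory-limit increase.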
  * XXX for the moment, always assume that skew optimization will be
  * performed.  As long as SKEW_WORK_MEM_PERCENT is small, it's not worth
  * trying to determine that for sure.

If we don't do skew for parallelism, should we skip that bit?

- Andres


--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers