On Wed, Mar 8, 2017 at 12:58 PM, Andres Freund <and...@anarazel.de> wrote:
> 0002: Check hash join work_mem usage at the point of chunk allocation.
>
> Modify the existing hash join code to detect work_mem exhaustion at
> the point where chunks are allocated, instead of checking after every
> tuple insertion.  This matches the logic used for estimating, and more
> importantly allows for some parallelism in later patches.
>
> diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
> index 406c180..af1b66d 100644
> --- a/src/backend/executor/nodeHash.c
> +++ b/src/backend/executor/nodeHash.c
> @@ -48,7 +48,8 @@ static void ExecHashSkewTableInsert(HashJoinTable hashtable,
>                                         int bucketNumber);
>  static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable);
>
> -static void *dense_alloc(HashJoinTable hashtable, Size size);
> +static void *dense_alloc(HashJoinTable hashtable, Size size,
> +                        bool respect_work_mem);
>
> I still dislike this, but maybe Robert's point of:
>
> On 2017-02-16 08:57:21 -0500, Robert Haas wrote:
>> On Wed, Feb 15, 2017 at 9:36 PM, Andres Freund <and...@anarazel.de> wrote:
>> > Isn't it kinda weird to do this from within dense_alloc()? I mean that
>> > dumps a lot of data to disk, frees a bunch of memory and so on - not
>> > exactly what "dense_alloc" implies.  Isn't the free()ing part also
>> > dangerous, because the caller might actually use some of that memory,
>> > like e.g. in ExecHashRemoveNextSkewBucket() or such.  I haven't looked
>> > deeply enough to check whether that's an active bug, but it seems like
>> > inviting one if not.
>>
>> I haven't looked at this, but one idea might be to just rename
>> dense_alloc() to ExecHashBlahBlahSomething().  If there's a real
>> abstraction layer problem here then we should definitely fix it, but
>> maybe it's just the angle at which you hold your head.
>
> Is enough.

There is a problem here.  It can determine that it needs to increase the
number of batches, effectively splitting the current batch, but then the
caller goes on to insert the current tuple anyway, even though it may no
longer belong in this batch.  I will post a fix for that soon.  I will
also refactor it so that it doesn't do that work inside dense_alloc.
You're right, that's too weird.  In the meantime, here is a new patch
series addressing the other things you raised.
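To make the intent clearer for anyone skimming the thread, the general
shape is a bump allocator whose memory budget is consulted only when a
whole new chunk is needed, which is also the natural point for the caller
to react by increasing the number of batches.  Here is a toy standalone
version of that idea (deliberately not the nodeHash.c code; all of the
names are made up):

#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>

#define CHUNK_SIZE (32 * 1024)

typedef struct Chunk
{
    struct Chunk *next;         /* previously filled chunk, if any */
    size_t      used;           /* bytes handed out from data[] */
    char        data[CHUNK_SIZE];
} Chunk;

typedef struct DenseAllocator
{
    Chunk      *current;        /* chunk currently being filled */
    size_t      space_used;     /* bytes charged against the budget */
    size_t      space_allowed;  /* the work_mem-style budget, in bytes */
} DenseAllocator;

/*
 * Hand out "size" bytes (requests larger than CHUNK_SIZE are not handled,
 * to keep the sketch short).  The budget is checked only when a new chunk
 * must be allocated; returning NULL tells the caller that the budget would
 * be exceeded, and the caller decides how to respond.
 */
static void *
dense_alloc_sketch(DenseAllocator *a, size_t size, bool respect_budget)
{
    void       *result;

    if (a->current == NULL || CHUNK_SIZE - a->current->used < size)
    {
        Chunk      *chunk;

        if (respect_budget &&
            a->space_used + sizeof(Chunk) > a->space_allowed)
            return NULL;        /* out of budget: caller must react */

        chunk = malloc(sizeof(Chunk));
        if (chunk == NULL)
            return NULL;
        chunk->next = a->current;
        chunk->used = 0;
        a->current = chunk;
        a->space_used += sizeof(Chunk);
    }

    result = a->current->data + a->current->used;
    a->current->used += size;
    return result;
}

The point is that the check happens once per chunk rather than once per
tuple, and the caller keeps control over how to respond, which is also how
I intend to address the layering complaint about doing batch-increase work
inside the allocator itself.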
> 0003: Scan for unmatched tuples in a hash join one chunk at a time.
>
>
> @@ -1152,8 +1155,65 @@ bool
>  ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
>  {
>         HashJoinTable hashtable = hjstate->hj_HashTable;
> -       HashJoinTuple hashTuple = hjstate->hj_CurTuple;
> +       HashJoinTuple hashTuple;
> +       MinimalTuple tuple;
> +
> +       /*
> +        * First, process the queue of chunks holding tuples that are in regular
> +        * (non-skew) buckets.
> +        */
> +       for (;;)
> +       {
> +               /* Do we need a new chunk to scan? */
> +               if (hashtable->current_chunk == NULL)
> +               {
> +                       /* Have we run out of chunks to scan? */
> +                       if (hashtable->unmatched_chunks == NULL)
> +                               break;
> +
> +                       /* Pop the next chunk from the front of the queue. */
> +                       hashtable->current_chunk = hashtable->unmatched_chunks;
> +                       hashtable->unmatched_chunks = hashtable->current_chunk->next;
> +                       hashtable->current_chunk_index = 0;
> +               }
> +
> +               /* Have we reached the end of this chunk yet? */
> +               if (hashtable->current_chunk_index >= hashtable->current_chunk->used)
> +               {
> +                       /* Go around again to get the next chunk from the queue. */
> +                       hashtable->current_chunk = NULL;
> +                       continue;
> +               }
> +
> +               /* Take the next tuple from this chunk. */
> +               hashTuple = (HashJoinTuple)
> +                       (hashtable->current_chunk->data + hashtable->current_chunk_index);
> +               tuple = HJTUPLE_MINTUPLE(hashTuple);
> +               hashtable->current_chunk_index +=
> +                       MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len);
> +
> +               /* Is it unmatched? */
> +               if (!HeapTupleHeaderHasMatch(tuple))
> +               {
> +                       TupleTableSlot *inntuple;
> +
> +                       /* insert hashtable's tuple into exec slot */
> +                       inntuple = ExecStoreMinimalTuple(tuple,
> +                                                        hjstate->hj_HashTupleSlot,
> +                                                        false);        /* do not pfree */
> +                       econtext->ecxt_innertuple = inntuple;
> +
> +                       /* reset context each time (see below for explanation) */
> +                       ResetExprContext(econtext);
> +                       return true;
> +               }
> +       }
>
> I suspect this might actually be slower than the current/old logic,
> because the current_chunk tests are repeated every loop.  I think
> retaining the two loops the previous code had makes sense, i.e. one to
> find a relevant chunk, and one to iterate through all tuples in a chunk,
> checking for an unmatched one.

Ok, I've updated it to use two loops as suggested.  I couldn't measure any
speedup as a result but it's probably better code that way.
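To spell out the two-loop shape (paraphrased here rather than copied from
the patch, so the details may differ slightly from what ends up in v7):
an outer loop pops chunks off the queue and an inner loop walks the tuples
of the current chunk, so the "do we need a new chunk" tests are no longer
repeated for every tuple:

/* Outer loop: advance through the queue of chunks to be scanned. */
while (hashtable->current_chunk != NULL ||
       hashtable->unmatched_chunks != NULL)
{
    if (hashtable->current_chunk == NULL)
    {
        /* Pop the next chunk from the front of the queue. */
        hashtable->current_chunk = hashtable->unmatched_chunks;
        hashtable->unmatched_chunks = hashtable->current_chunk->next;
        hashtable->current_chunk_index = 0;
    }

    /* Inner loop: walk the tuples stored in the current chunk. */
    while (hashtable->current_chunk_index < hashtable->current_chunk->used)
    {
        hashTuple = (HashJoinTuple)
            (hashtable->current_chunk->data + hashtable->current_chunk_index);
        tuple = HJTUPLE_MINTUPLE(hashTuple);
        hashtable->current_chunk_index +=
            MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len);

        if (!HeapTupleHeaderHasMatch(tuple))
        {
            /* Emit this unmatched tuple, exactly as in the code above. */
            econtext->ecxt_innertuple =
                ExecStoreMinimalTuple(tuple, hjstate->hj_HashTupleSlot, false);
            ResetExprContext(econtext);
            return true;
        }
    }

    /* Finished this chunk; go around for the next one. */
    hashtable->current_chunk = NULL;
}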
> Have you run a performance comparison pre/post this patch?  I don't
> think there'd be a lot, but it seems important to verify that.  I'd just
> run a tpc-h pre/post comparison (prewarmed, fully cache resident,
> parallelism disabled, hugepages is my personal recipe for the least
> run-over-run variance).

I haven't been able to measure any difference in TPCH results yet, so I
contrived a simple test where there is a measurable difference: I created
a pair of tables and repeatedly ran two FULL OUTER JOIN queries.  In Q1 no
unmatched tuples are found in the hash table, and in Q2 every tuple in the
hash table turns out to be unmatched.  I consistently measure just over
10% improvement.

  CREATE TABLE t1 AS
    SELECT generate_series(1, 10000000) AS id,
           'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';
  CREATE TABLE t2 AS
    SELECT generate_series(10000001, 20000000) AS id,
           'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';
  SET work_mem = '1GB';

  -- Q1: every tuple in the hash table finds a match
  SELECT COUNT(*) FROM t1 FULL OUTER JOIN t1 other USING (id);

  -- Q2: no tuple in the hash table finds a match
  SELECT COUNT(*) FROM t1 FULL OUTER JOIN t2 USING (id);

  master:                               Q1 = 9.280s, Q2 = 9.645s
  0003-hj-refactor-unmatched-v6.patch:  Q1 = 8.341s, Q2 = 8.661s
  0003-hj-refactor-unmatched-v7.patch:  Q1 = 8.186s, Q2 = 8.642s

> 0004: Add a barrier primitive for synchronizing backends.
>
>
> +/*-------------------------------------------------------------------------
> + *
> + * barrier.c
> + *       Barriers for synchronizing cooperating processes.
> + *
> + * Copyright (c) 2017, PostgreSQL Global Development Group
> + *
> + * This implementation of barriers allows for static sets of participants
> + * known up front, or dynamic sets of participants which processes can join
> + * or leave at any time.  In the dynamic case, a phase number can be used to
> + * track progress through a parallel algorithm; in the static case it isn't
> + * needed.
>
> Why would a phase id generally not be needed in the static case?
> There's also further references to it ("Increments the current phase.")
> that dont quite jive with that.

I've extended that text at the top to explain.  Short version: there is
always a phase internally; that comment refers to whether client code
needs to examine it.  Dynamic barrier users probably do need to care what
the phase is, since progress can be made while they're not attached, so
they need a way to find out about that after they attach.  Static barrier
users generally don't need to care about the phase number, because nothing
can happen without explicit action from all participants, so they should
be in sync automatically.  Hopefully the new comments explain that better.

> + * IDENTIFICATION
> + *       src/backend/storage/ipc/barrier.c
>
> This could use a short example usage scenario. Without knowing existing
> usages of the "pattern", it's probably hard to grasp.

Examples added.

> + *-------------------------------------------------------------------------
> + */
> +
> +#include "storage/barrier.h"
>
> Aren't you missing an include of postgres.h here?

Fixed.

> +bool
> +BarrierWait(Barrier *barrier, uint32 wait_event_info)
> +{
> +       bool            first;
> +       bool            last;
> +       int             start_phase;
> +       int             next_phase;
> +
> +       SpinLockAcquire(&barrier->mutex);
> +       start_phase = barrier->phase;
> +       next_phase = start_phase + 1;
> +       ++barrier->arrived;
> +       if (barrier->arrived == 1)
> +               first = true;
> +       else
> +               first = false;
> +       if (barrier->arrived == barrier->participants)
> +       {
> +               last = true;
> +               barrier->arrived = 0;
> +               barrier->phase = next_phase;
> +       }
> +       else
> +               last = false;
> +       SpinLockRelease(&barrier->mutex);
>
> Hm. So what's the defined concurrency protocol for non-static barriers,
> when they attach after the spinlock here has been released?  I think the
> concurrency aspects deserve some commentary.  Afaics it'll correctly
> just count as the next phase - without any blocking - but that shouldn't
> have to be inferred.

A newly attaching backend may join at start_phase or next_phase, depending
on what happened above.  If we just advanced the phase (by being the last
to arrive), then a backend that attaches afterwards will be joining at
phase == next_phase, and if that new backend calls BarrierWait it'll be
waiting for the phase after that.
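To put that in code form, the interaction for a dynamic participant looks
something like the fragment below.  BarrierAttach() and BarrierPhase()
stand in here for the attach and phase-query calls, which don't appear in
the extract above, so treat this as a sketch of the protocol rather than
the exact interface:

void
dynamic_participant(Barrier *barrier)
{
    /*
     * Join whatever phase is current: either start_phase or next_phase in
     * the terms used above, depending on whether the last participant had
     * already advanced the phase by the time we attached.
     */
    int         joined_at = BarrierAttach(barrier);

    /*
     * From here on the phase cannot advance without us, because every
     * further advance requires all attached participants to arrive or
     * detach, and we are attached but have done neither.
     */
    BarrierWait(barrier, 0);    /* wait_event_info omitted for brevity */

    /* We helped complete exactly one phase. */
    Assert(BarrierPhase(barrier) == joined_at + 1);

    BarrierDetach(barrier);     /* may release waiters and advance the phase */
}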
> Things might get wonky if that new participant
> then starts waiting for the new phase, violating the assert below...
> +               Assert(barrier->phase == start_phase || barrier->phase == next_phase);

I've added a comment near that assertion that explains why it holds.
Short version: the caller is attached, so there is no way for the phase to
advance beyond next_phase without the caller's participation; the only
possibilities to consider in the wait loop are "we're still waiting" and
"the final participant arrived or detached, advancing the phase and
releasing me".  Put another way, no waiting backend can ever see the phase
advance beyond next_phase, because to make that happen the waiting backend
would have to run BarrierWait again; barrier->arrived can never reach
barrier->participants a second time while we're in that wait loop.

> +/*
> + * Detach from a barrier.  This may release other waiters from BarrierWait and
> + * advance the phase, if they were only waiting for this backend.  Return
> + * true if this participant was the last to detach.
> + */
> +bool
> +BarrierDetach(Barrier *barrier)
> +{
> +       bool            release;
> +       bool            last;
> +
> +       SpinLockAcquire(&barrier->mutex);
> +       Assert(barrier->participants > 0);
> +       --barrier->participants;
> +
> +       /*
> +        * If any other participants are waiting and we were the last participant
> +        * waited for, release them.
> +        */
> +       if (barrier->participants > 0 &&
> +               barrier->arrived == barrier->participants)
> +       {
> +               release = true;
> +               barrier->arrived = 0;
> +               barrier->phase++;
> +       }
> +       else
> +               release = false;
> +
> +       last = barrier->participants == 0;
> +       SpinLockRelease(&barrier->mutex);
> +
> +       if (release)
> +               ConditionVariableBroadcast(&barrier->condition_variable);
> +
> +       return last;
> +}
>
> Doesn't this, again, run into danger of leading to an assert failure in
> the loop in BarrierWait?

I believe this code is correct.  The assertion in BarrierWait can't fail,
because waiters know that there is no way for the phase to get any further
ahead without their help (because they are attached): again, the only
possibilities are phase == start_phase (implying that they received a
spurious condition variable signal) and phase == next_phase (the last
backend being waited on has finally arrived or detached, allowing the
other waiters to proceed).

I've attached a test module that starts N workers and makes each worker
attach, call BarrierWait a random number of times and detach, then rinse
and repeat until the phase reaches some large number and they all exit.
This exercises every interleaving of attach, wait and detach.  CREATE
EXTENSION test_barrier, then something like SELECT
test_barrier_reattach_random(4, 1000000), to verify that no assertions
are thrown and it always completes.
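In rough outline (simplified here, and using the same stand-in
BarrierAttach()/BarrierPhase() names as above rather than the module's
actual code), each worker does something like this:

static void
test_barrier_worker(Barrier *barrier, int goal_phase)
{
    for (;;)
    {
        int         phase = BarrierAttach(barrier);
        int         waits = random() % 10;  /* participate in a few phases */

        if (phase >= goal_phase)
        {
            /* The group reached the goal while we were detached. */
            BarrierDetach(barrier);
            break;
        }

        /* Help complete a random number of phases... */
        while (waits-- > 0 && BarrierPhase(barrier) < goal_phase)
            BarrierWait(barrier, 0);

        /* ...then leave, possibly releasing others waiting on us. */
        BarrierDetach(barrier);
    }
}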
> +#include "postgres.h"
>
> Huh, that normally shouldn't be in a header. I see you introduced that
> in a bunch of other places too - that really doesn't look right to me.

Fixed.

-- 
Thomas Munro
http://www.enterprisedb.com

parallel-shared-hash-v7.tgz
Description: GNU Zip compressed data
test-barrier.patch
Description: Binary data