Hi,

On 2017-03-27 22:33:03 -0700, Andres Freund wrote:
> On 2017-03-23 20:35:09 +1300, Thomas Munro wrote:
> > Here is a new patch series responding to feedback from Peter and Andres:
> 
> Here's a review of 0007 & 0010 together - they're going to have to be
> applied together anyway...
> ...
> ok, ENOTIME for today...

Continuing where I dropped off tiredly yesterday.


-               ExecHashJoinSaveTuple(tuple,
-                                                         hashvalue,
-                                                         &hashtable->innerBatchFile[batchno]);
+               if (HashJoinTableIsShared(hashtable))
+                       sts_puttuple(hashtable->shared_inner_batches, batchno, &hashvalue,
+                                                tuple);
+               else
+                       ExecHashJoinSaveTuple(tuple,
+                                                                 hashvalue,
+                                                                 &hashtable->innerBatchFile[batchno]);
        }
 }

Why isn't this done inside of ExecHashJoinSaveTuple?
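
I.e. something like the untested sketch below -- the extra parameters are
made up here, and "SharedTuplestore" just stands in for whatever accessor
type the patch actually uses -- so that every call site stays a one-liner:

static void
ExecHashJoinSaveTuple(HashJoinTable hashtable, MinimalTuple tuple,
                      uint32 hashvalue, int batchno,
                      SharedTuplestore *shared_batches, BufFile **fileptr)
{
    /* Sketch only: push the shared/private decision into the helper. */
    if (HashJoinTableIsShared(hashtable))
        sts_puttuple(shared_batches, batchno, &hashvalue, tuple);
    else
    {
        /* ... existing private BufFile path, writing to *fileptr ... */
    }
}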




@@ -1280,6 +1785,68 @@ ExecHashTableReset(HashJoinTable hashtable)

+                       /* Rewind the shared read heads for this batch, inner and outer. */
+                       sts_prepare_parallel_read(hashtable->shared_inner_batches,
+                                                                         curbatch);
+                       sts_prepare_parallel_read(hashtable->shared_outer_batches,
+                                                                         curbatch);

It feels somewhat wrong to do this in here, rather than at the call sites.
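
I.e. keep ExecHashTableReset() about resetting the table, and do the rewind
right next to whatever is about to read the batch, roughly (untested):

    if (HashJoinTableIsShared(hashtable))
    {
        /* About to (re)load this batch: rewind the shared read heads here. */
        sts_prepare_parallel_read(hashtable->shared_inner_batches, curbatch);
        sts_prepare_parallel_read(hashtable->shared_outer_batches, curbatch);
    }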

+               }
+
+               /*
+                * Each participant needs to make sure that data it has written for
+                * this partition is now read-only and visible to other participants.
+                */
+               sts_end_write(hashtable->shared_inner_batches, curbatch);
+               sts_end_write(hashtable->shared_outer_batches, curbatch);
+
+               /*
+                * Wait again, so that all workers see the new hash table and can
+                * safely read from batch files from any participant because they have
+                * all ended writing.
+                */
+               Assert(BarrierPhase(&hashtable->shared->barrier) ==
+                          PHJ_PHASE_RESETTING_BATCH(curbatch));
+               BarrierWait(&hashtable->shared->barrier, WAIT_EVENT_HASH_RESETTING);
+               Assert(BarrierPhase(&hashtable->shared->barrier) ==
+                          PHJ_PHASE_LOADING_BATCH(curbatch));
+               ExecHashUpdate(hashtable);
+               ExecHashUpdate(hashtable);
+
+               /* Forget the current chunks. */
+               hashtable->current_chunk = NULL;
+               return;
+       }
 
        /*
         * Release all the hash buckets and tuples acquired in the prior pass, and
@@ -1289,10 +1856,10 @@ ExecHashTableReset(HashJoinTable hashtable)
        oldcxt = MemoryContextSwitchTo(hashtable->batchCxt);
 
        /* Reallocate and reinitialize the hash bucket headers. */
-       hashtable->buckets = (HashJoinTuple *)
-               palloc0(nbuckets * sizeof(HashJoinTuple));
+       hashtable->buckets = (HashJoinBucketHead *)
+               palloc0(nbuckets * sizeof(HashJoinBucketHead));
 
-       hashtable->spaceUsed = nbuckets * sizeof(HashJoinTuple);
+       hashtable->spaceUsed = nbuckets * sizeof(HashJoinBucketHead);
 
        /* Cannot be more than our previous peak; we had this size before. */
        Assert(hashtable->spaceUsed <= hashtable->spacePeak);
@@ -1301,6 +1868,22 @@ ExecHashTableReset(HashJoinTable hashtable)
 
        /* Forget the chunks (the memory was freed by the context reset above). */
        hashtable->chunks = NULL;
+
+       /* Rewind the shared read heads for this batch, inner and outer. */
+       if (hashtable->innerBatchFile[curbatch] != NULL)
+       {
+               if (BufFileSeek(hashtable->innerBatchFile[curbatch], 0, 0L, SEEK_SET))
+                       ereport(ERROR,
+                                       (errcode_for_file_access(),
+                                  errmsg("could not rewind hash-join temporary file: %m")));
+       }
+       if (hashtable->outerBatchFile[curbatch] != NULL)
+       {
+               if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0L, SEEK_SET))
+                       ereport(ERROR,
+                                       (errcode_for_file_access(),
+                                  errmsg("could not rewind hash-join temporary file: %m")));
+       }
 }
 
 /*
@@ -1310,12 +1893,21 @@ ExecHashTableReset(HashJoinTable hashtable)
 void
 ExecHashTableResetMatchFlags(HashJoinTable hashtable)
 {
+       dsa_pointer chunk_shared = InvalidDsaPointer;
        HashMemoryChunk chunk;
        HashJoinTuple tuple;
        int                     i;
 
        /* Reset all flags in the main table ... */
-       chunk = hashtable->chunks;
+       if (HashJoinTableIsShared(hashtable))
+       {
+               /* This only runs in the leader during rescan initialization. */
+               Assert(!IsParallelWorker());
+               hashtable->shared->chunk_work_queue = hashtable->shared->chunks;
+               chunk = pop_chunk_queue(hashtable, &chunk_shared);
+       }
+       else
+               chunk = hashtable->chunks;

Hm - doesn't pop_chunk_queue empty the work queue?


+/*
+ * Load a tuple into shared dense storage, like 'load_private_tuple'.  This
+ * version is for shared hash tables.
+ */
+static HashJoinTuple
+load_shared_tuple(HashJoinTable hashtable, MinimalTuple tuple,
+                                 dsa_pointer *shared, bool respect_work_mem)
+{

Hm. Are there issues with "blessed" records being stored in shared
memory?  I seem to recall you talking about it, but I see nothing
addressing the issue here?  (later) Ah, I see - you just prohibit
parallelism in that case - might be worth pointing to that in a comment.
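
Something like the comment below, next to load_shared_tuple(), would have
saved me the detour -- the wording is mine and the referenced location is a
placeholder for wherever the check actually lives:

    /*
     * Note: tuples stored here never contain transient ("blessed") RECORD
     * types.  Parallelism is disallowed in that case, because transient
     * record typmods are registered per-backend and other participants
     * could not interpret such tuples.
     */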


+       /* Check if some other participant has increased nbatch. */
+       if (hashtable->shared->nbatch > hashtable->nbatch)
+       {
+               Assert(respect_work_mem);
+               ExecHashIncreaseNumBatches(hashtable, hashtable->shared->nbatch);
+       }
+
+       /* Check if we need to help shrinking. */
+       if (hashtable->shared->shrink_needed && respect_work_mem)
+       {
+               hashtable->current_chunk = NULL;
+               LWLockRelease(&hashtable->shared->chunk_lock);
+               return NULL;
+       }
+
+       /* Oversized tuples get their own chunk. */
+       if (size > HASH_CHUNK_THRESHOLD)
+               chunk_size = size + HASH_CHUNK_HEADER_SIZE;
+       else
+               chunk_size = HASH_CHUNK_SIZE;
+
+       /* If appropriate, check if work_mem would be exceeded by a new chunk. */
+       if (respect_work_mem &&
+               hashtable->shared->grow_enabled &&
+               hashtable->shared->nbatch <= MAX_BATCHES_BEFORE_INCREASES_STOP &&
+               (hashtable->shared->size +
+                chunk_size) > (work_mem * 1024L *
+                                               hashtable->shared->planned_participants))
+       {
+               /*
+                * It would be exceeded.  Let's increase the number of batches, so we
+                * can try to shrink the hash table.
+                */
+               hashtable->shared->nbatch *= 2;
+               ExecHashIncreaseNumBatches(hashtable, hashtable->shared->nbatch);
+               hashtable->shared->chunk_work_queue = hashtable->shared->chunks;
+               hashtable->shared->chunks = InvalidDsaPointer;
+               hashtable->shared->shrink_needed = true;
+               hashtable->current_chunk = NULL;
+               LWLockRelease(&hashtable->shared->chunk_lock);
+
+               /* The caller needs to shrink the hash table. */
+               return NULL;
+       }

Hm - we could end up calling ExecHashIncreaseNumBatches twice here?
Probably harmless.
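
If you wanted to avoid it, something like deciding the target nbatch first
and then catching up once might do -- untested sketch, and it glosses over
the shrink_needed bookkeeping and early return that sit between the two
checks in the patch:

    int     target_nbatch = hashtable->shared->nbatch;

    if (respect_work_mem &&
        hashtable->shared->grow_enabled &&
        target_nbatch <= MAX_BATCHES_BEFORE_INCREASES_STOP &&
        hashtable->shared->size + chunk_size >
        work_mem * 1024L * hashtable->shared->planned_participants)
    {
        /* Double the batch count; shrink bookkeeping omitted in this sketch. */
        hashtable->shared->nbatch *= 2;
        target_nbatch = hashtable->shared->nbatch;
    }

    /* Catch up with whatever nbatch is now, in a single call. */
    if (target_nbatch > hashtable->nbatch)
        ExecHashIncreaseNumBatches(hashtable, target_nbatch);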




/* ----------------------------------------------------------------
  *             ExecHashJoin
@@ -129,6 +200,14 @@ ExecHashJoin(HashJoinState *node)
                                        /* no chance to not build the hash table */
                                        node->hj_FirstOuterTupleSlot = NULL;
                                }
+                               else if (hashNode->shared_table_data != NULL)
+                               {
+                                       /*
+                                        * The empty-outer optimization is not implemented for
+                                        * shared hash tables yet.
+                                        */
+                                       node->hj_FirstOuterTupleSlot = NULL;

Hm, why is this checking for the shared-ness of the join in a different
manner?
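
I guess that's because the hashtable doesn't exist yet at that point, so
HashJoinTableIsShared() can't be used -- but then a small helper over the
node would keep the test spelled the same way everywhere, e.g. (name made
up):

    /* Hypothetical: one spelling for "is this join shared?" everywhere. */
    #define HashJoinStateIsShared(hjstate) \
        (((HashState *) innerPlanState(hjstate))->shared_table_data != NULL)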


+                                       if (HashJoinTableIsShared(hashtable))
+                                       {
+                                               /*
+                                                * An important optimization: if this is a
+                                                * single-batch join and not an outer join, there is
+                                                * no reason to synchronize again when we've finished
+                                                * probing.
+                                                */
+                                               Assert(BarrierPhase(&hashtable->shared->barrier) ==
+                                                          PHJ_PHASE_PROBING_BATCH(hashtable->curbatch));
+                                               if (hashtable->nbatch == 1 && !HJ_FILL_INNER(node))
+                                                       return NULL;    /* end of join */
+
+                                               /*
+                                                * Check if we are a leader that can't go further than
+                                                * probing the first batch, to avoid risk of deadlock
+                                                * against workers.
+                                                */
+                                               if (!LeaderGateCanContinue(&hashtable->shared->leader_gate))
+                                               {
+                                                       /*
+                                                        * Other backends will need to handle all future
+                                                        * batches written by me.  We don't detach until
+                                                        * after we've finished writing to all batches so
+                                                        * that they are flushed, otherwise another
+                                                        * participant might try to read them too soon.
+                                                        */
+                                                       sts_end_write_all_partitions(hashNode->shared_inner_batches);
+                                                       sts_end_write_all_partitions(hashNode->shared_outer_batches);
+                                                       BarrierDetach(&hashtable->shared->barrier);
+                                                       hashtable->detached_early = true;
+                                                       return NULL;
+                                               }
+
+                                               /*
+                                                * We can't start searching for unmatched tuples until
+                                                * all participants have finished probing, so we
+                                                * synchronize here.
+                                                */
+                                               Assert(BarrierPhase(&hashtable->shared->barrier) ==
+                                                          PHJ_PHASE_PROBING_BATCH(hashtable->curbatch));
+                                               if (BarrierWait(&hashtable->shared->barrier,
+                                                                               WAIT_EVENT_HASHJOIN_PROBING))
+                                               {
+                                                       /* Serial phase: prepare for unmatched. */
+                                                       if (HJ_FILL_INNER(node))
+                                                       {
+                                                               hashtable->shared->chunk_work_queue =
+                                                                       hashtable->shared->chunks;
+                                                               hashtable->shared->chunks = InvalidDsaPointer;
+                                                       }
+                                               }

Couldn't we skip that if this isn't an outer join?  Not sure if the
complication would be worth it...
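
What I have in mind is roughly this (untested -- and skipping the wait
presumably needs care so the barrier phase numbering still works out for
the participants that do wait, which may be exactly the complication):

    if (HJ_FILL_INNER(node))
    {
        /* Only synchronize when someone will scan for unmatched tuples. */
        if (BarrierWait(&hashtable->shared->barrier,
                        WAIT_EVENT_HASHJOIN_PROBING))
        {
            /* Serial phase: prepare for unmatched. */
            hashtable->shared->chunk_work_queue = hashtable->shared->chunks;
            hashtable->shared->chunks = InvalidDsaPointer;
        }
    }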


+void
+ExecShutdownHashJoin(HashJoinState *node)
+{
+       /*
+        * By the time ExecEndHashJoin runs in a work, shared memory has been

s/work/worker/

+        * destroyed.  So this is our last chance to do any shared memory cleanup.
+        */
+       if (node->hj_HashTable)
+               ExecHashTableDetach(node->hj_HashTable);
+}

+           There is no extra charge
+        * for probing the hash table for outer path row, on the basis that
+        * read-only access to a shared hash table shouldn't be any more
+        * expensive.
+        */

Hm, that's debatable. !shared will mostly be on the local NUMA node,
shared probably not.


* Get hash table size that executor would use for inner relation.
         *
+        * Shared hash tables are allowed to use the work_mem of all participants
+        * combined to make up for the fact that there is only one copy shared by
+        * all.

Hm. I don't quite understand that reasoning.
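
For concreteness, what the quoted check amounts to (numbers made up): with
work_mem = 4MB and planned_participants = 4, the single shared table may
grow to

    work_mem * 1024L * planned_participants = 4096 kB * 1024 * 4 = 16 MB

before batching kicks in -- the same total as four private 4 MB hash
tables, just held in one copy.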


         * XXX for the moment, always assume that skew optimization will be
         * performed.  As long as SKEW_WORK_MEM_PERCENT is small, it's not worth
         * trying to determine that for sure.

If we don't do skew for parallelism, should we skip that bit?



- Andres

