On 12.8.2014 00:30, Tomas Vondra wrote:
> On 11.8.2014 20:25, Robert Haas wrote:
>> It also strikes me that when there's only 1 batch, the set of bits
>> that map onto the batch number is zero-width, and one zero-width bit
>> range is as good as another.  In other words, if you're only planning
>> to do one batch, you can easily grow the number of buckets on the fly.
>> Growing the number of buckets only becomes difficult once you have
>> more than one batch.

...

> I was considering using reversing the bits of the hash, because that's
> pretty much the simplest solution. But I think you're right it might
> actually work like this:
>
>   * Are more batches needed?
>
>       (yes) => just use nbuckets = my_log2(work_mem / tuple_size)
>
>       (no) => go ahead, until processing all tuples or hitting work_mem
>
>               (work_mem) => meh, use the same nbuckets above
>
>               (all tuples) => compute optimal nbuckets / resize
>
>
> But I need to think about this a bit. So far it seems to me there's no
> way additional batches might benefit from increasing nbuckets further.

I think this is a simple and solid solution, solving the batchno
computation issues quite nicely. Attached is v10 patch (bare and
combined with the dense allocation), that does this:

1) when we know we'll need batching, buckets are sized for full work_mem
   (using the estimated tuple width, etc.)

2) without the batching, we estimate the 'right number of buckets' for
   the estimated number of tuples, and keep track of the optimal number
   as tuples are added to the hash table

   - if we discover we need to start batching, we keep the current
     optimal value (which should be the same as the max number of
     buckets) and don't mess with it anymore (making it possible to
     compute batch IDs just like before)

   - also, on the fist rebatch (nbatch=1 => nbatch=2) the hash table
     is resized as part of the rebatch

   - if the hash build completes without batching, we do the resize

I believe the patch is pretty much perfect. I plan to do more thorough
testing on a wide range of queries in the next few days.

I also removed the 'enable_hash_resize' GUC, because it would be more
complex to implement this properly after doing the resize as part of
rebatch etc.. So either it would make the patch more complex, or it
wouldn't do what the name promises.

regards
Tomas

diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 781a736..ee3fffb 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -1900,18 +1900,20 @@ show_hash_info(HashState *hashstate, ExplainState *es)
 		if (es->format != EXPLAIN_FORMAT_TEXT)
 		{
 			ExplainPropertyLong("Hash Buckets", hashtable->nbuckets, es);
+			ExplainPropertyLong("Original Hash Buckets",
+								hashtable->nbuckets_original, es);
 			ExplainPropertyLong("Hash Batches", hashtable->nbatch, es);
 			ExplainPropertyLong("Original Hash Batches",
 								hashtable->nbatch_original, es);
 			ExplainPropertyLong("Peak Memory Usage", spacePeakKb, es);
 		}
-		else if (hashtable->nbatch_original != hashtable->nbatch)
+		else if ((hashtable->nbatch_original != hashtable->nbatch) || (hashtable->nbuckets_original != hashtable->nbuckets))
 		{
 			appendStringInfoSpaces(es->str, es->indent * 2);
 			appendStringInfo(es->str,
-			"Buckets: %d  Batches: %d (originally %d)  Memory Usage: %ldkB\n",
-							 hashtable->nbuckets, hashtable->nbatch,
-							 hashtable->nbatch_original, spacePeakKb);
+			"Buckets: %d (originally %d)  Batches: %d (originally %d)  Memory Usage: %ldkB\n",
+							 hashtable->nbuckets, hashtable->nbuckets_original,
+							 hashtable->nbatch, hashtable->nbatch_original, spacePeakKb);
 		}
 		else
 		{
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 589b2f1..a5d4812 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -37,8 +37,8 @@
 #include "utils/lsyscache.h"
 #include "utils/syscache.h"
 
-
 static void ExecHashIncreaseNumBatches(HashJoinTable hashtable);
+static void ExecHashIncreaseNumBuckets(HashJoinTable hashtable);
 static void ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node,
 					  int mcvsToUse);
 static void ExecHashSkewTableInsert(HashJoinTable hashtable,
@@ -47,6 +47,10 @@ static void ExecHashSkewTableInsert(HashJoinTable hashtable,
 						int bucketNumber);
 static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable);
 
+static char * chunk_alloc(HashJoinTable hashtable, int tupleSize);
+
+/* Memory needed for optimal number of buckets. */
+#define BUCKETS_SPACE(htab)	((htab)->nbuckets_optimal * sizeof(HashJoinTuple))
 
 /* ----------------------------------------------------------------
  *		ExecHash
@@ -116,6 +120,7 @@ MultiExecHash(HashState *node)
 				/* It's a skew tuple, so put it into that hash table */
 				ExecHashSkewTableInsert(hashtable, slot, hashvalue,
 										bucketNumber);
+				hashtable->skewTuples += 1;
 			}
 			else
 			{
@@ -126,10 +131,30 @@ MultiExecHash(HashState *node)
 		}
 	}
 
+	/* resize the hash table if needed (NTUP_PER_BUCKET exceeded) */
+	if (hashtable->nbuckets != hashtable->nbuckets_optimal) {
+
+		/* We never decrease the number of buckets. */
+		Assert(hashtable->nbuckets_optimal > hashtable->nbuckets);
+
+#ifdef HJDEBUG
+		printf("Increasing nbuckets %d => %d\n",
+			   hashtable->nbuckets, hashtable->nbuckets_optimal);
+#endif
+
+		ExecHashIncreaseNumBuckets(hashtable);
+
+	}
+
+	/* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
+	hashtable->spaceUsed += BUCKETS_SPACE(hashtable);
+	if (hashtable->spaceUsed > hashtable->spacePeak)
+			hashtable->spacePeak = hashtable->spaceUsed;
+
 	/* must provide our own instrumentation support */
 	if (node->ps.instrument)
 		InstrStopNode(node->ps.instrument, hashtable->totalTuples);
-
+	
 	/*
 	 * We do not return the hash table directly because it's not a subtype of
 	 * Node, and so would violate the MultiExecProcNode API.  Instead, our
@@ -223,7 +248,6 @@ ExecEndHash(HashState *node)
 	ExecEndNode(outerPlan);
 }
 
-
 /* ----------------------------------------------------------------
  *		ExecHashTableCreate
  *
@@ -271,7 +295,10 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
 	 */
 	hashtable = (HashJoinTable) palloc(sizeof(HashJoinTableData));
 	hashtable->nbuckets = nbuckets;
+	hashtable->nbuckets_original = nbuckets;
+	hashtable->nbuckets_optimal = nbuckets;
 	hashtable->log2_nbuckets = log2_nbuckets;
+	hashtable->log2_nbuckets_optimal = log2_nbuckets;
 	hashtable->buckets = NULL;
 	hashtable->keepNulls = keepNulls;
 	hashtable->skewEnabled = false;
@@ -285,6 +312,7 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
 	hashtable->nbatch_outstart = nbatch;
 	hashtable->growEnabled = true;
 	hashtable->totalTuples = 0;
+	hashtable->skewTuples = 0;
 	hashtable->innerBatchFile = NULL;
 	hashtable->outerBatchFile = NULL;
 	hashtable->spaceUsed = 0;
@@ -294,6 +322,8 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
 	hashtable->spaceAllowedSkew =
 		hashtable->spaceAllowed * SKEW_WORK_MEM_PERCENT / 100;
 
+	hashtable->chunks_batch = NULL;
+
 	/*
 	 * Get info about the hash functions to be used for each hash key. Also
 	 * remember whether the join operators are strict.
@@ -375,7 +405,6 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
 	return hashtable;
 }
 
-
 /*
  * Compute appropriate size for hashtable given the estimated size of the
  * relation to be hashed (number of rows and average row width).
@@ -384,7 +413,7 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
  */
 
 /* Target bucket loading (tuples per bucket) */
-#define NTUP_PER_BUCKET			10
+#define NTUP_PER_BUCKET			1
 
 void
 ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
@@ -394,6 +423,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 {
 	int			tupsize;
 	double		inner_rel_bytes;
+	double		buckets_bytes;
 	long		hash_table_bytes;
 	long		skew_table_bytes;
 	long		max_pointers;
@@ -416,6 +446,13 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 	inner_rel_bytes = ntuples * tupsize;
 
 	/*
+	 * Estimate memory needed for buckets, assuming all the tuples fit into
+	 * a single batch (consider NTUP_PER_BUCKET tuples per bucket) - buckets
+	 * are just usual 'HashJoinTuple' (pointers to HashJoinTupleData).
+	 */
+	buckets_bytes = sizeof(HashJoinTuple) * my_log2(ntuples / NTUP_PER_BUCKET);
+
+	/*
 	 * Target in-memory hashtable size is work_mem kilobytes.
 	 */
 	hash_table_bytes = work_mem * 1024L;
@@ -466,23 +503,56 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 	/* also ensure we avoid integer overflow in nbatch and nbuckets */
 	max_pointers = Min(max_pointers, INT_MAX / 2);
 
-	if (inner_rel_bytes > hash_table_bytes)
+	if ((inner_rel_bytes + buckets_bytes) > hash_table_bytes)
 	{
 		/* We'll need multiple batches */
 		long		lbuckets;
 		double		dbatch;
 		int			minbatch;
-
-		lbuckets = (hash_table_bytes / tupsize) / NTUP_PER_BUCKET;
-		lbuckets = Min(lbuckets, max_pointers);
-		nbuckets = (int) lbuckets;
+		double		batch_bytes;
 
 		dbatch = ceil(inner_rel_bytes / hash_table_bytes);
 		dbatch = Min(dbatch, max_pointers);
 		minbatch = (int) dbatch;
 		nbatch = 2;
+
+		/* increase nbatch until the hashtable fits into work_mem */
 		while (nbatch < minbatch)
 			nbatch <<= 1;
+
+		/* assuming equally-sized batches, compute bytes of hash table */
+		batch_bytes = inner_rel_bytes / nbatch;
+
+		/* when batching, for the buckets, assume full work_mem */
+		lbuckets = 1 << my_log2(hash_table_bytes / (tupsize * NTUP_PER_BUCKET + sizeof(HashJoinTuple)));
+		buckets_bytes = lbuckets * sizeof(HashJoinTuple);
+
+		/*
+		 * XXX the while loop should not really repeat more than once, or maybe twice,
+		 * becase at this point (batch_bytes < work_mem) and it only may not fit
+		 * if (batch_bytes + buckets_bytes > work_mem). But the tuples are generally
+		 * much larger than buckets (which are just pointers).
+		 *
+		 * Adding another batch reduces the _expected_ number of tuples in the batch,
+		 * however the buckets are sized for full work_mem utilization (e.g. because
+		 * of underestimate, which is a common problem). So the while loop only tweak
+		 * batch_bytes, not the number of buckets (or buckets_bytes).
+		 */
+
+		/* we need to get both tuples and buckets into work_mem */
+		while (batch_bytes + buckets_bytes > hash_table_bytes) {
+
+			/* increment number of batches, and re-evaluate the amount of memory */
+			nbatch <<= 1;
+
+			/* assuming equally-sized batches, compute bytes of hash table and buckets */
+			batch_bytes = inner_rel_bytes / nbatch;
+
+		}
+
+		/* protect against nbucket overflow */
+		lbuckets = Min(lbuckets, max_pointers);
+		nbuckets = (int) lbuckets;
 	}
 	else
 	{
@@ -556,11 +626,12 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 	int			oldnbatch = hashtable->nbatch;
 	int			curbatch = hashtable->curbatch;
 	int			nbatch;
-	int			i;
 	MemoryContext oldcxt;
 	long		ninmemory;
 	long		nfreed;
 
+	HashChunk	oldchunks = hashtable->chunks_batch;
+
 	/* do nothing if we've decided to shut off growth */
 	if (!hashtable->growEnabled)
 		return;
@@ -612,51 +683,81 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 	 */
 	ninmemory = nfreed = 0;
 
-	for (i = 0; i < hashtable->nbuckets; i++)
-	{
-		HashJoinTuple prevtuple;
-		HashJoinTuple tuple;
+	/* we're going to scan through the chunks directly, not buckets, so we can reset the
+	 * buckets and reinsert all the tuples there - just set them to NULL */
+	memset(hashtable->buckets, 0, sizeof(HashJoinTuple) * hashtable->nbuckets);
 
-		prevtuple = NULL;
-		tuple = hashtable->buckets[i];
+	/* if we need to do a resize of buckets, we can do it right during the rebatch */
+	if (hashtable->nbuckets_optimal != hashtable->nbuckets) {
 
-		while (tuple != NULL)
-		{
-			/* save link in case we delete */
-			HashJoinTuple nexttuple = tuple->next;
-			int			bucketno;
-			int			batchno;
+		hashtable->nbuckets = hashtable->nbuckets_optimal;
+		hashtable->log2_nbuckets = hashtable->log2_nbuckets_optimal;
+
+		hashtable->buckets = repalloc(hashtable->buckets,
+									  sizeof(HashJoinTuple) * hashtable->nbuckets);
+
+	}
+
+	/* reset the chunks (we have a copy in oldchunks) - this way we'll allocate */
+	hashtable->chunks_batch = NULL;
+
+	/* so, let's scan through the old chunks, and all tuples in each chunk */
+	while (oldchunks != NULL) {
+
+		/* pointer to the next memory chunk (may be NULL for the last chunk) */
+		HashChunk nextchunk = oldchunks->next;
+
+		/* position within the buffer (up to oldchunks->used) */
+		size_t idx = 0;
+
+		/* process all tuples stored in this chunk (and then free it) */
+		while (idx < oldchunks->used) {
+
+			/* get the hashjoin tuple and mintuple stored right after it */
+			HashJoinTuple hashTuple = (HashJoinTuple)(oldchunks->data + idx);
+			MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple);
+
+			int		hashTupleSize = (HJTUPLE_OVERHEAD + tuple->t_len);
+
+			int		bucketno;
+			int		batchno;
 
 			ninmemory++;
-			ExecHashGetBucketAndBatch(hashtable, tuple->hashvalue,
+			ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
 									  &bucketno, &batchno);
-			Assert(bucketno == i);
+
 			if (batchno == curbatch)
 			{
-				/* keep tuple */
-				prevtuple = tuple;
+				/* keep tuple - this means we need to copy it into the new chunks */
+				HashJoinTuple copyTuple = (HashJoinTuple) chunk_alloc(hashtable, hashTupleSize);
+				memcpy(copyTuple, hashTuple, hashTupleSize);
+
+				/* and of course add it to the new buckets - just push the copy it onto the front
+				 * of the bucket's list */
+				copyTuple->next = hashtable->buckets[bucketno];
+				hashtable->buckets[bucketno] = copyTuple;
 			}
 			else
 			{
 				/* dump it out */
 				Assert(batchno > curbatch);
-				ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(tuple),
-									  tuple->hashvalue,
+				ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(hashTuple),
+									  hashTuple->hashvalue,
 									  &hashtable->innerBatchFile[batchno]);
-				/* and remove from hash table */
-				if (prevtuple)
-					prevtuple->next = nexttuple;
-				else
-					hashtable->buckets[i] = nexttuple;
-				/* prevtuple doesn't change */
-				hashtable->spaceUsed -=
-					HJTUPLE_OVERHEAD + HJTUPLE_MINTUPLE(tuple)->t_len;
-				pfree(tuple);
+
+				hashtable->spaceUsed -= hashTupleSize;
 				nfreed++;
 			}
 
-			tuple = nexttuple;
+			/* bytes occupied in memory HJ tuple overhead + actual tuple length */
+			idx += hashTupleSize;
+
 		}
+
+		/* so, we're done with this chunk - free it and proceed to the next one */
+		pfree(oldchunks);
+		oldchunks = nextchunk;
+
 	}
 
 #ifdef HJDEBUG
@@ -682,6 +783,106 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 }
 
 /*
+ * ExecHashIncreaseNumBuckets
+ *		increase the original number of buckets in order to reduce
+ *		number of tuples per bucket
+ */
+static void
+ExecHashIncreaseNumBuckets(HashJoinTable hashtable)
+{
+	HashChunk	chunk;
+
+	/* do nothing if not an increase (it's called increase for a reason) */
+	if (hashtable->nbuckets >= hashtable->nbuckets_optimal)
+		return;
+
+	/*
+	 * We already know the optimal number of buckets, so let's just
+	 * compute the log2_nbuckets for it.
+	 */
+	hashtable->nbuckets = hashtable->nbuckets_optimal;
+	hashtable->log2_nbuckets = my_log2(hashtable->nbuckets_optimal);
+
+	/* XXX Not sure if we should update the info about used space here.
+	 * The code seems to ignore the space used for 'buckets' and we're not
+	 * allocating more space for tuples (just shuffling them to the new
+	 * buckets). And the amount of memory used for buckets is quite small
+	 * (just an array of pointers, thus ~8kB per 1k buckets on 64-bit). */
+
+	/* XXX Should we disable growth if (nbuckets * NTUP_PER_BUCKET)
+	 * reaches work_mem (or something like that)? We shouldn't really
+	 * get into such position (should be handled by increasing the
+	 * number of batches, which is called right before this). */
+
+	/* XXX Maybe adding info into hashjoin explain output (e.g. initial
+	 * nbuckets, time spent growing the table) would be appropriate. */
+
+	Assert(hashtable->nbuckets > 1);
+	Assert(hashtable->nbuckets <= (INT_MAX/2));
+	Assert(hashtable->nbuckets == (1 << hashtable->log2_nbuckets));
+
+#ifdef HJDEBUG
+	printf("Increasing nbuckets to %d\n", hashtable->nbuckets);
+#endif
+
+	/* Just reallocate the proper number of buckets - we don't need to
+	 * walk through them - we can walk the dense-allocated chunks
+	 * (just like in ExecHashIncreaseNumBatches, but without all the
+	 * copying into new chunks)
+	 */
+
+	/* just resize the existing bucket list to the right size */
+	hashtable->buckets = (HashJoinTuple *) repalloc(hashtable->buckets,
+								hashtable->nbuckets * sizeof(HashJoinTuple));
+
+	/* XXX Maybe not needed? */
+	memset(hashtable->buckets, 0, sizeof(void*) * hashtable->nbuckets);
+
+	/* so, let's scan through the chunks, and all tuples in each chunk */
+	chunk = hashtable->chunks_batch;
+	while (chunk != NULL) {
+
+		/* position within the buffer (up to chunk->used) */
+		size_t idx = 0;
+
+		/* process all tuples stored in this chunk (and then free it) */
+		while (idx < chunk->used) {
+
+			/* get the hashjoin tuple and mintuple stored right after it */
+			HashJoinTuple hashTuple = (HashJoinTuple)(chunk->data + idx);
+			MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple);
+
+			int		hashTupleSize = (HJTUPLE_OVERHEAD + tuple->t_len);
+
+			int		bucketno;
+			int		batchno;
+
+			ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
+									  &bucketno, &batchno);
+
+			/* just add the tuple to the proper bucket */
+			hashTuple->next = hashtable->buckets[bucketno];
+			hashtable->buckets[bucketno] = hashTuple;
+
+			/* bytes occupied in memory HJ tuple overhead + actual tuple length */
+			idx += hashTupleSize;
+
+		}
+
+		/* proceed to the next chunk */
+		chunk = chunk->next;
+
+	}
+
+#ifdef HJDEBUG
+	printf("Nbuckets increased to %d, average items per bucket %.1f\n",
+		   hashtable->nbuckets, batchTuples / hashtable->nbuckets);
+#endif
+
+}
+
+
+/*
  * ExecHashTableInsert
  *		insert a tuple into the hash table depending on the hash value
  *		it may just go to a temp file for later batches
@@ -717,8 +918,8 @@ ExecHashTableInsert(HashJoinTable hashtable,
 
 		/* Create the HashJoinTuple */
 		hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
-		hashTuple = (HashJoinTuple) MemoryContextAlloc(hashtable->batchCxt,
-													   hashTupleSize);
+		hashTuple = (HashJoinTuple) chunk_alloc(hashtable, hashTupleSize);
+
 		hashTuple->hashvalue = hashvalue;
 		memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
 
@@ -734,12 +935,30 @@ ExecHashTableInsert(HashJoinTable hashtable,
 		hashTuple->next = hashtable->buckets[bucketno];
 		hashtable->buckets[bucketno] = hashTuple;
 
+		/* Increase the (optimal) number of buckets if we just exceeded NTUP_PER_BUCKET,
+		 * but only when there's still a single batch at this moment. */
+		if ((hashtable->nbatch == 1) &&
+			((hashtable->totalTuples - hashtable->skewTuples)
+				>= (hashtable->nbuckets_optimal * NTUP_PER_BUCKET))) {
+
+			/* overflow protection */
+			if (hashtable->nbuckets_optimal <= (INT_MAX/2)) {
+				hashtable->nbuckets_optimal *= 2;
+				hashtable->log2_nbuckets_optimal += 1;
+			}
+		}
+
 		/* Account for space used, and back off if we've used too much */
 		hashtable->spaceUsed += hashTupleSize;
 		if (hashtable->spaceUsed > hashtable->spacePeak)
 			hashtable->spacePeak = hashtable->spaceUsed;
-		if (hashtable->spaceUsed > hashtable->spaceAllowed)
+
+		/* Do we need to increase number of batches? Account for space required
+		 * by buckets (optimal number). */
+		if (hashtable->spaceUsed + BUCKETS_SPACE(hashtable) > hashtable->spaceAllowed) {
 			ExecHashIncreaseNumBatches(hashtable);
+		}
+
 	}
 	else
 	{
@@ -862,7 +1081,10 @@ ExecHashGetHashValue(HashJoinTable hashtable,
  * functions are good about randomizing all their output bits, else we are
  * likely to have very skewed bucket or batch occupancy.)
  *
- * nbuckets doesn't change over the course of the join.
+ * The nbuckets/log2_nbuckets may change until (nbatch>1) because of dynamic
+ * buckets growth. Once we start batching, the value is fixed and does not
+ * change over the course of join (making it possible to compute batchno
+ * the way we do here).
  *
  * nbatch is always a power of 2; we increase it only by doubling it.  This
  * effectively adds one more bit to the top of the batchno.
@@ -1068,6 +1290,11 @@ ExecHashTableReset(HashJoinTable hashtable)
 	hashtable->spaceUsed = 0;
 
 	MemoryContextSwitchTo(oldcxt);
+
+	/* reset the chunks too (the memory was allocated within batchCxt, so it's
+	 * already freed by resetting the batch context -just set it to NULL) */
+	hashtable->chunks_batch = NULL;
+
 }
 
 /*
@@ -1318,6 +1545,61 @@ ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue)
 	return INVALID_SKEW_BUCKET_NO;
 }
 
+static
+char * chunk_alloc(HashJoinTable hashtable, int size) {
+
+	/* maybe we should use MAXALIGN(size) here ... */
+
+	/* if tuple size is larger than of 1/8 of chunk size, allocate a separate chunk */
+	if (size > (HASH_CHUNK_SIZE/8)) {
+
+		/* allocate new chunk and put it at the beginning of the list */
+		HashChunk newChunk = (HashChunk)MemoryContextAlloc(hashtable->batchCxt, offsetof(HashChunkData, data) + size);
+
+		newChunk->maxlen = size;
+		newChunk->used = 0;
+		newChunk->ntuples = 0;
+
+		/* If there already is a chunk, add if after it so we can still use the space in it */
+		if (hashtable->chunks_batch != NULL) {
+			newChunk->next = hashtable->chunks_batch->next;
+			hashtable->chunks_batch->next = newChunk;
+		} else {
+			newChunk->next = hashtable->chunks_batch;
+			hashtable->chunks_batch = newChunk;
+		}
+
+		newChunk->used += size;
+		newChunk->ntuples += 1;
+
+		return newChunk->data;
+
+	}
+
+	/* ok, it's within chunk size, let's see if we have enough space for it in the current
+	 * chunk => if not, allocate a new chunk (chunks==NULL means this is the first chunk) */
+	if ((hashtable->chunks_batch == NULL) || (hashtable->chunks_batch->maxlen - hashtable->chunks_batch->used) < size) {
+
+		/* allocate new chunk and put it at the beginning of the list */
+		HashChunk newChunk = (HashChunk)MemoryContextAlloc(hashtable->batchCxt, offsetof(HashChunkData, data) + HASH_CHUNK_SIZE);
+
+		newChunk->maxlen = HASH_CHUNK_SIZE;
+		newChunk->used = 0;
+		newChunk->ntuples = 0;
+
+		newChunk->next = hashtable->chunks_batch;
+		hashtable->chunks_batch = newChunk;
+	}
+
+	/* OK, we have enough space in the chunk, let's add the tuple */
+	hashtable->chunks_batch->used += size;
+	hashtable->chunks_batch->ntuples += 1;
+
+	/* allocate pointer to the start of the tuple memory */
+	return hashtable->chunks_batch->data + (hashtable->chunks_batch->used - size);
+
+}
+
 /*
  * ExecHashSkewTableInsert
  *
@@ -1340,6 +1622,7 @@ ExecHashSkewTableInsert(HashJoinTable hashtable,
 	hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
 	hashTuple = (HashJoinTuple) MemoryContextAlloc(hashtable->batchCxt,
 												   hashTupleSize);
+
 	hashTuple->hashvalue = hashvalue;
 	memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
 	HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index 3beae40..75f86c9 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -102,12 +102,32 @@ typedef struct HashSkewBucket
 #define SKEW_WORK_MEM_PERCENT  2
 #define SKEW_MIN_OUTER_FRACTION  0.01
 
+#define HASH_CHUNK_SIZE			(16*1024L)	/* 16kB chunks by default */
+#define HASH_SKEW_CHUNK_SIZE	(4*1024L)	/* 4kB chunks for skew buckets */
+
+typedef struct HashChunkData
+{
+	int			ntuples;	/* number of tuples stored in this chunk */
+	size_t		maxlen;		/* length of the buffer */
+	size_t		used;		/* number of chunk bytes already used */
+
+	struct HashChunkData   *next;	/* pointer to the next chunk (linked list) */
+
+	char		data[1];	/* buffer allocated at the end */
+} HashChunkData;
+
+typedef struct HashChunkData* HashChunk;
+
 
 typedef struct HashJoinTableData
 {
 	int			nbuckets;		/* # buckets in the in-memory hash table */
 	int			log2_nbuckets;	/* its log2 (nbuckets must be a power of 2) */
 
+	int			nbuckets_original;	/* # buckets when starting the first hash */
+	int			nbuckets_optimal;	/* optimal # buckets (per batch) */
+	int			log2_nbuckets_optimal;	/* same as log2_nbuckets optimal */
+
 	/* buckets[i] is head of list of tuples in i'th in-memory bucket */
 	struct HashJoinTupleData **buckets;
 	/* buckets array is per-batch storage, as are all the tuples */
@@ -129,6 +149,7 @@ typedef struct HashJoinTableData
 	bool		growEnabled;	/* flag to shut off nbatch increases */
 
 	double		totalTuples;	/* # tuples obtained from inner plan */
+	double		skewTuples;		/* # tuples inserted into skew tuples */
 
 	/*
 	 * These arrays are allocated for the life of the hash join, but only if
@@ -157,6 +178,10 @@ typedef struct HashJoinTableData
 
 	MemoryContext hashCxt;		/* context for whole-hash-join storage */
 	MemoryContext batchCxt;		/* context for this-batch-only storage */
+
+	/* used for dense allocation of tuples (into linked chunks) */
+	HashChunk	chunks_batch;	/*  one list for the whole batch */
+
 }	HashJoinTableData;
 
 #endif   /* HASHJOIN_H */
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 781a736..ee3fffb 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -1900,18 +1900,20 @@ show_hash_info(HashState *hashstate, ExplainState *es)
 		if (es->format != EXPLAIN_FORMAT_TEXT)
 		{
 			ExplainPropertyLong("Hash Buckets", hashtable->nbuckets, es);
+			ExplainPropertyLong("Original Hash Buckets",
+								hashtable->nbuckets_original, es);
 			ExplainPropertyLong("Hash Batches", hashtable->nbatch, es);
 			ExplainPropertyLong("Original Hash Batches",
 								hashtable->nbatch_original, es);
 			ExplainPropertyLong("Peak Memory Usage", spacePeakKb, es);
 		}
-		else if (hashtable->nbatch_original != hashtable->nbatch)
+		else if ((hashtable->nbatch_original != hashtable->nbatch) || (hashtable->nbuckets_original != hashtable->nbuckets))
 		{
 			appendStringInfoSpaces(es->str, es->indent * 2);
 			appendStringInfo(es->str,
-			"Buckets: %d  Batches: %d (originally %d)  Memory Usage: %ldkB\n",
-							 hashtable->nbuckets, hashtable->nbatch,
-							 hashtable->nbatch_original, spacePeakKb);
+			"Buckets: %d (originally %d)  Batches: %d (originally %d)  Memory Usage: %ldkB\n",
+							 hashtable->nbuckets, hashtable->nbuckets_original,
+							 hashtable->nbatch, hashtable->nbatch_original, spacePeakKb);
 		}
 		else
 		{
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 6a9d956..a5d4812 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -37,8 +37,8 @@
 #include "utils/lsyscache.h"
 #include "utils/syscache.h"
 
-
 static void ExecHashIncreaseNumBatches(HashJoinTable hashtable);
+static void ExecHashIncreaseNumBuckets(HashJoinTable hashtable);
 static void ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node,
 					  int mcvsToUse);
 static void ExecHashSkewTableInsert(HashJoinTable hashtable,
@@ -49,6 +49,9 @@ static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable);
 
 static char * chunk_alloc(HashJoinTable hashtable, int tupleSize);
 
+/* Memory needed for optimal number of buckets. */
+#define BUCKETS_SPACE(htab)	((htab)->nbuckets_optimal * sizeof(HashJoinTuple))
+
 /* ----------------------------------------------------------------
  *		ExecHash
  *
@@ -117,6 +120,7 @@ MultiExecHash(HashState *node)
 				/* It's a skew tuple, so put it into that hash table */
 				ExecHashSkewTableInsert(hashtable, slot, hashvalue,
 										bucketNumber);
+				hashtable->skewTuples += 1;
 			}
 			else
 			{
@@ -127,6 +131,26 @@ MultiExecHash(HashState *node)
 		}
 	}
 
+	/* resize the hash table if needed (NTUP_PER_BUCKET exceeded) */
+	if (hashtable->nbuckets != hashtable->nbuckets_optimal) {
+
+		/* We never decrease the number of buckets. */
+		Assert(hashtable->nbuckets_optimal > hashtable->nbuckets);
+
+#ifdef HJDEBUG
+		printf("Increasing nbuckets %d => %d\n",
+			   hashtable->nbuckets, hashtable->nbuckets_optimal);
+#endif
+
+		ExecHashIncreaseNumBuckets(hashtable);
+
+	}
+
+	/* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
+	hashtable->spaceUsed += BUCKETS_SPACE(hashtable);
+	if (hashtable->spaceUsed > hashtable->spacePeak)
+			hashtable->spacePeak = hashtable->spaceUsed;
+
 	/* must provide our own instrumentation support */
 	if (node->ps.instrument)
 		InstrStopNode(node->ps.instrument, hashtable->totalTuples);
@@ -271,7 +295,10 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
 	 */
 	hashtable = (HashJoinTable) palloc(sizeof(HashJoinTableData));
 	hashtable->nbuckets = nbuckets;
+	hashtable->nbuckets_original = nbuckets;
+	hashtable->nbuckets_optimal = nbuckets;
 	hashtable->log2_nbuckets = log2_nbuckets;
+	hashtable->log2_nbuckets_optimal = log2_nbuckets;
 	hashtable->buckets = NULL;
 	hashtable->keepNulls = keepNulls;
 	hashtable->skewEnabled = false;
@@ -285,6 +312,7 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
 	hashtable->nbatch_outstart = nbatch;
 	hashtable->growEnabled = true;
 	hashtable->totalTuples = 0;
+	hashtable->skewTuples = 0;
 	hashtable->innerBatchFile = NULL;
 	hashtable->outerBatchFile = NULL;
 	hashtable->spaceUsed = 0;
@@ -377,7 +405,6 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
 	return hashtable;
 }
 
-
 /*
  * Compute appropriate size for hashtable given the estimated size of the
  * relation to be hashed (number of rows and average row width).
@@ -386,7 +413,7 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
  */
 
 /* Target bucket loading (tuples per bucket) */
-#define NTUP_PER_BUCKET			10
+#define NTUP_PER_BUCKET			1
 
 void
 ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
@@ -396,6 +423,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 {
 	int			tupsize;
 	double		inner_rel_bytes;
+	double		buckets_bytes;
 	long		hash_table_bytes;
 	long		skew_table_bytes;
 	long		max_pointers;
@@ -418,6 +446,13 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 	inner_rel_bytes = ntuples * tupsize;
 
 	/*
+	 * Estimate memory needed for buckets, assuming all the tuples fit into
+	 * a single batch (consider NTUP_PER_BUCKET tuples per bucket) - buckets
+	 * are just usual 'HashJoinTuple' (pointers to HashJoinTupleData).
+	 */
+	buckets_bytes = sizeof(HashJoinTuple) * my_log2(ntuples / NTUP_PER_BUCKET);
+
+	/*
 	 * Target in-memory hashtable size is work_mem kilobytes.
 	 */
 	hash_table_bytes = work_mem * 1024L;
@@ -468,23 +503,56 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 	/* also ensure we avoid integer overflow in nbatch and nbuckets */
 	max_pointers = Min(max_pointers, INT_MAX / 2);
 
-	if (inner_rel_bytes > hash_table_bytes)
+	if ((inner_rel_bytes + buckets_bytes) > hash_table_bytes)
 	{
 		/* We'll need multiple batches */
 		long		lbuckets;
 		double		dbatch;
 		int			minbatch;
-
-		lbuckets = (hash_table_bytes / tupsize) / NTUP_PER_BUCKET;
-		lbuckets = Min(lbuckets, max_pointers);
-		nbuckets = (int) lbuckets;
+		double		batch_bytes;
 
 		dbatch = ceil(inner_rel_bytes / hash_table_bytes);
 		dbatch = Min(dbatch, max_pointers);
 		minbatch = (int) dbatch;
 		nbatch = 2;
+
+		/* increase nbatch until the hashtable fits into work_mem */
 		while (nbatch < minbatch)
 			nbatch <<= 1;
+
+		/* assuming equally-sized batches, compute bytes of hash table */
+		batch_bytes = inner_rel_bytes / nbatch;
+
+		/* when batching, for the buckets, assume full work_mem */
+		lbuckets = 1 << my_log2(hash_table_bytes / (tupsize * NTUP_PER_BUCKET + sizeof(HashJoinTuple)));
+		buckets_bytes = lbuckets * sizeof(HashJoinTuple);
+
+		/*
+		 * XXX the while loop should not really repeat more than once, or maybe twice,
+		 * becase at this point (batch_bytes < work_mem) and it only may not fit
+		 * if (batch_bytes + buckets_bytes > work_mem). But the tuples are generally
+		 * much larger than buckets (which are just pointers).
+		 *
+		 * Adding another batch reduces the _expected_ number of tuples in the batch,
+		 * however the buckets are sized for full work_mem utilization (e.g. because
+		 * of underestimate, which is a common problem). So the while loop only tweak
+		 * batch_bytes, not the number of buckets (or buckets_bytes).
+		 */
+
+		/* we need to get both tuples and buckets into work_mem */
+		while (batch_bytes + buckets_bytes > hash_table_bytes) {
+
+			/* increment number of batches, and re-evaluate the amount of memory */
+			nbatch <<= 1;
+
+			/* assuming equally-sized batches, compute bytes of hash table and buckets */
+			batch_bytes = inner_rel_bytes / nbatch;
+
+		}
+
+		/* protect against nbucket overflow */
+		lbuckets = Min(lbuckets, max_pointers);
+		nbuckets = (int) lbuckets;
 	}
 	else
 	{
@@ -561,6 +629,7 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 	MemoryContext oldcxt;
 	long		ninmemory;
 	long		nfreed;
+
 	HashChunk	oldchunks = hashtable->chunks_batch;
 
 	/* do nothing if we've decided to shut off growth */
@@ -616,7 +685,18 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 
 	/* we're going to scan through the chunks directly, not buckets, so we can reset the
 	 * buckets and reinsert all the tuples there - just set them to NULL */
-	memset(hashtable->buckets, 0, sizeof(void*) * hashtable->nbuckets);
+	memset(hashtable->buckets, 0, sizeof(HashJoinTuple) * hashtable->nbuckets);
+
+	/* if we need to do a resize of buckets, we can do it right during the rebatch */
+	if (hashtable->nbuckets_optimal != hashtable->nbuckets) {
+
+		hashtable->nbuckets = hashtable->nbuckets_optimal;
+		hashtable->log2_nbuckets = hashtable->log2_nbuckets_optimal;
+
+		hashtable->buckets = repalloc(hashtable->buckets,
+									  sizeof(HashJoinTuple) * hashtable->nbuckets);
+
+	}
 
 	/* reset the chunks (we have a copy in oldchunks) - this way we'll allocate */
 	hashtable->chunks_batch = NULL;
@@ -628,7 +708,7 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 		HashChunk nextchunk = oldchunks->next;
 
 		/* position within the buffer (up to oldchunks->used) */
-		size_t idx = 0;	
+		size_t idx = 0;
 
 		/* process all tuples stored in this chunk (and then free it) */
 		while (idx < oldchunks->used) {
@@ -703,6 +783,106 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 }
 
 /*
+ * ExecHashIncreaseNumBuckets
+ *		increase the original number of buckets in order to reduce
+ *		number of tuples per bucket
+ */
+static void
+ExecHashIncreaseNumBuckets(HashJoinTable hashtable)
+{
+	HashChunk	chunk;
+
+	/* do nothing if not an increase (it's called increase for a reason) */
+	if (hashtable->nbuckets >= hashtable->nbuckets_optimal)
+		return;
+
+	/*
+	 * We already know the optimal number of buckets, so let's just
+	 * compute the log2_nbuckets for it.
+	 */
+	hashtable->nbuckets = hashtable->nbuckets_optimal;
+	hashtable->log2_nbuckets = my_log2(hashtable->nbuckets_optimal);
+
+	/* XXX Not sure if we should update the info about used space here.
+	 * The code seems to ignore the space used for 'buckets' and we're not
+	 * allocating more space for tuples (just shuffling them to the new
+	 * buckets). And the amount of memory used for buckets is quite small
+	 * (just an array of pointers, thus ~8kB per 1k buckets on 64-bit). */
+
+	/* XXX Should we disable growth if (nbuckets * NTUP_PER_BUCKET)
+	 * reaches work_mem (or something like that)? We shouldn't really
+	 * get into such position (should be handled by increasing the
+	 * number of batches, which is called right before this). */
+
+	/* XXX Maybe adding info into hashjoin explain output (e.g. initial
+	 * nbuckets, time spent growing the table) would be appropriate. */
+
+	Assert(hashtable->nbuckets > 1);
+	Assert(hashtable->nbuckets <= (INT_MAX/2));
+	Assert(hashtable->nbuckets == (1 << hashtable->log2_nbuckets));
+
+#ifdef HJDEBUG
+	printf("Increasing nbuckets to %d\n", hashtable->nbuckets);
+#endif
+
+	/* Just reallocate the proper number of buckets - we don't need to
+	 * walk through them - we can walk the dense-allocated chunks
+	 * (just like in ExecHashIncreaseNumBatches, but without all the
+	 * copying into new chunks)
+	 */
+
+	/* just resize the existing bucket list to the right size */
+	hashtable->buckets = (HashJoinTuple *) repalloc(hashtable->buckets,
+								hashtable->nbuckets * sizeof(HashJoinTuple));
+
+	/* XXX Maybe not needed? */
+	memset(hashtable->buckets, 0, sizeof(void*) * hashtable->nbuckets);
+
+	/* so, let's scan through the chunks, and all tuples in each chunk */
+	chunk = hashtable->chunks_batch;
+	while (chunk != NULL) {
+
+		/* position within the buffer (up to chunk->used) */
+		size_t idx = 0;
+
+		/* process all tuples stored in this chunk (and then free it) */
+		while (idx < chunk->used) {
+
+			/* get the hashjoin tuple and mintuple stored right after it */
+			HashJoinTuple hashTuple = (HashJoinTuple)(chunk->data + idx);
+			MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple);
+
+			int		hashTupleSize = (HJTUPLE_OVERHEAD + tuple->t_len);
+
+			int		bucketno;
+			int		batchno;
+
+			ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
+									  &bucketno, &batchno);
+
+			/* just add the tuple to the proper bucket */
+			hashTuple->next = hashtable->buckets[bucketno];
+			hashtable->buckets[bucketno] = hashTuple;
+
+			/* bytes occupied in memory HJ tuple overhead + actual tuple length */
+			idx += hashTupleSize;
+
+		}
+
+		/* proceed to the next chunk */
+		chunk = chunk->next;
+
+	}
+
+#ifdef HJDEBUG
+	printf("Nbuckets increased to %d, average items per bucket %.1f\n",
+		   hashtable->nbuckets, batchTuples / hashtable->nbuckets);
+#endif
+
+}
+
+
+/*
  * ExecHashTableInsert
  *		insert a tuple into the hash table depending on the hash value
  *		it may just go to a temp file for later batches
@@ -755,12 +935,30 @@ ExecHashTableInsert(HashJoinTable hashtable,
 		hashTuple->next = hashtable->buckets[bucketno];
 		hashtable->buckets[bucketno] = hashTuple;
 
+		/* Increase the (optimal) number of buckets if we just exceeded NTUP_PER_BUCKET,
+		 * but only when there's still a single batch at this moment. */
+		if ((hashtable->nbatch == 1) &&
+			((hashtable->totalTuples - hashtable->skewTuples)
+				>= (hashtable->nbuckets_optimal * NTUP_PER_BUCKET))) {
+
+			/* overflow protection */
+			if (hashtable->nbuckets_optimal <= (INT_MAX/2)) {
+				hashtable->nbuckets_optimal *= 2;
+				hashtable->log2_nbuckets_optimal += 1;
+			}
+		}
+
 		/* Account for space used, and back off if we've used too much */
 		hashtable->spaceUsed += hashTupleSize;
 		if (hashtable->spaceUsed > hashtable->spacePeak)
 			hashtable->spacePeak = hashtable->spaceUsed;
-		if (hashtable->spaceUsed > hashtable->spaceAllowed)
+
+		/* Do we need to increase number of batches? Account for space required
+		 * by buckets (optimal number). */
+		if (hashtable->spaceUsed + BUCKETS_SPACE(hashtable) > hashtable->spaceAllowed) {
 			ExecHashIncreaseNumBatches(hashtable);
+		}
+
 	}
 	else
 	{
@@ -883,7 +1081,10 @@ ExecHashGetHashValue(HashJoinTable hashtable,
  * functions are good about randomizing all their output bits, else we are
  * likely to have very skewed bucket or batch occupancy.)
  *
- * nbuckets doesn't change over the course of the join.
+ * The nbuckets/log2_nbuckets may change until (nbatch>1) because of dynamic
+ * buckets growth. Once we start batching, the value is fixed and does not
+ * change over the course of join (making it possible to compute batchno
+ * the way we do here).
  *
  * nbatch is always a power of 2; we increase it only by doubling it.  This
  * effectively adds one more bit to the top of the batchno.
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index c80a4d1..75f86c9 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -124,6 +124,10 @@ typedef struct HashJoinTableData
 	int			nbuckets;		/* # buckets in the in-memory hash table */
 	int			log2_nbuckets;	/* its log2 (nbuckets must be a power of 2) */
 
+	int			nbuckets_original;	/* # buckets when starting the first hash */
+	int			nbuckets_optimal;	/* optimal # buckets (per batch) */
+	int			log2_nbuckets_optimal;	/* same as log2_nbuckets optimal */
+
 	/* buckets[i] is head of list of tuples in i'th in-memory bucket */
 	struct HashJoinTupleData **buckets;
 	/* buckets array is per-batch storage, as are all the tuples */
@@ -145,6 +149,7 @@ typedef struct HashJoinTableData
 	bool		growEnabled;	/* flag to shut off nbatch increases */
 
 	double		totalTuples;	/* # tuples obtained from inner plan */
+	double		skewTuples;		/* # tuples inserted into skew tuples */
 
 	/*
 	 * These arrays are allocated for the life of the hash join, but only if
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to