On 19.8.2014 19:05, Robert Haas wrote:
> On Sat, Aug 16, 2014 at 9:31 AM, Tomas Vondra <t...@fuzzy.cz> wrote:
>> On 12.8.2014 00:30, Tomas Vondra wrote:
>>> On 11.8.2014 20:25, Robert Haas wrote:
>>>> It also strikes me that when there's only 1 batch, the set of bits
>>>> that map onto the batch number is zero-width, and one zero-width bit
>>>> range is as good as another.  In other words, if you're only planning
>>>> to do one batch, you can easily grow the number of buckets on the fly.
>>>> Growing the number of buckets only becomes difficult once you have
>>>> more than one batch.
>>
>> ...
>>
>>> I was considering reversing the bits of the hash, because that's
>>> pretty much the simplest solution. But I think you're right it might
>>> actually work like this:
>>>
>>>   * Are more batches needed?
>>>
>>>       (yes) => just use nbuckets = my_log2(work_mem / tuple_size)
>>>
>>>       (no) => go ahead, until processing all tuples or hitting work_mem
>>>
>>>               (work_mem) => meh, use the same nbuckets above
>>>
>>>               (all tuples) => compute optimal nbuckets / resize
>>>
>>>
>>> But I need to think about this a bit. So far it seems to me there's no
>>> way additional batches might benefit from increasing nbuckets further.
>>
>> I think this is a simple and solid solution, solving the batchno
>> computation issues quite nicely. Attached is v10 patch (bare and
>> combined with the dense allocation), that does this:
>>
>> 1) when we know we'll need batching, buckets are sized for full work_mem
>>    (using the estimated tuple width, etc.)
>>
>> 2) without the batching, we estimate the 'right number of buckets' for
>>    the estimated number of tuples, and keep track of the optimal number
>>    as tuples are added to the hash table
>>
>>    - if we discover we need to start batching, we keep the current
>>      optimal value (which should be the same as the max number of
>>      buckets) and don't mess with it anymore (making it possible to
>>      compute batch IDs just like before)
>>
>>    - also, on the first rebatch (nbatch=1 => nbatch=2) the hash table
>>      is resized as part of the rebatch
>>
>>    - if the hash build completes without batching, we do the resize
>>
>> I believe the patch is pretty much perfect. I plan to do more thorough
>> testing on a wide range of queries in the next few days.
>>
>> I also removed the 'enable_hash_resize' GUC, because it would be more
>> complex to implement this properly after doing the resize as part of
>> rebatch etc.. So either it would make the patch more complex, or it
>> wouldn't do what the name promises.
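
To illustrate why growing nbuckets is only safe while nbatch == 1, here
is a minimal sketch of how the bucket and batch numbers are cut out of
the hash value. It is a simplified stand-in for
ExecHashGetBucketAndBatch, not the actual function: the name
get_bucket_and_batch and its parameter list are made up for this
example, and both nbuckets and nbatch are assumed to be powers of 2.

```c
#include <assert.h>
#include <stdint.h>

/*
 * Simplified, hypothetical stand-in for ExecHashGetBucketAndBatch.
 * The low log2_nbuckets bits select the bucket; the bits right above
 * them select the batch (only when there is more than one batch).
 */
void
get_bucket_and_batch(uint32_t hashvalue, int log2_nbuckets, int nbatch,
					 int *bucketno, int *batchno)
{
	uint32_t	nbuckets = ((uint32_t) 1) << log2_nbuckets;

	/* nbatch must be a power of 2 for the masking below to work */
	assert(nbatch > 0 && (nbatch & (nbatch - 1)) == 0);

	*bucketno = (int) (hashvalue & (nbuckets - 1));

	if (nbatch > 1)
		*batchno = (int) ((hashvalue >> log2_nbuckets) &
						  ((uint32_t) nbatch - 1));
	else
		*batchno = 0;			/* zero-width bit range */
}
```

For the hash value 0xB5 and nbatch = 4, going from 16 to 32 buckets
moves the tuple from batch 3 to batch 1, because the batch bits sit
right above the bucket bits. With nbatch = 1 the batch number is always
0, so the same resize is harmless.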
> 
> A variety of trivial comments on this:
> 
> PostgreSQL style is un-cuddled curly braces.  Also, multi-line
> comments need to start with a line containing only /* and end with a
> line containing only */.  In one place you've added curly braces
> around a single-line block that is otherwise unmodified; please don't
> do that.  In one place, you have "becase" instead of "because".  In
> another place, you write "add if after it" but it should say "add it
> after it" or maybe better "add the new one after it".  Avoid using
> punctuation like "=>" in comments to illustrate the connection between
> sentences; instead, use a connecting word like "then" or "therefore"
> or whatever is appropriate; in this instance, a period followed by the
> start of a new sentence seems sufficient.  Revert the removal of a
> single line of whitespace near the top of nodeHash.c.
> 
> There are too many things marked XXX in this patch.  They should
> either be fixed, if they are real problems, or they should be
> commented in a way that doesn't give rise to the idea that they're
> problems if they aren't.

OK, thanks for pointing this out. Attached is v11 of the patch (both
separate and combined with the dense allocation, as before).

I fixed as many of those issues as possible. All the XXX items were
obsolete, except for one in the chunk_alloc function.

I have also removed one constant (HASH_SKEW_CHUNK_SIZE).

> 
> OK, now on to some more substantive stuff:
> 
> 1. It's not clear to me what the overall effect of this patch on
> memory utilization is.  Reducing NTUP_PER_BUCKET from 10 to 1 is going
> to use, on the average, 10x as much bucket-header memory per tuple.
> Specifically, I think it means we'll use about 8 bytes of
> bucket-header memory per tuple instead of 0.8 bytes per tuple.  If the
> tuples are narrow, that could be significant; concerns have been
> expressed about that here in the past.  Increasing the number of
> buckets could also increase memory usage.  On the other hand, the
> dense allocation stuff probably saves a ton of memory, so maybe we end
> up ahead overall, but I'm not sure.  Your thoughts, and maybe some test
> results with narrow and wide tuples, would be appreciated.

The effect of the dense allocation was briefly discussed in this thread,
along with some quick measurements:

http://www.postgresql.org/message-id/53beea9e.2080...@fuzzy.cz

The dense allocation removes pretty much all the palloc overhead. For a
40B tuple, I did get this before the dense allocation

   HashBatchContext: 1451221040 total in 182 blocks; 2826592 free (11 chunks); 1448394448 used

and this with the dense allocation patch applied

   HashBatchContext: 729651520 total in 21980 blocks; 0 free (0 chunks); 729651520 used

So it's pretty much a 50% reduction in memory consumption.
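
As a quick sanity check of that figure, here is a minimal sketch that
computes the relative saving from two byte counts. The helper name
percent_saved is made up for illustration; it is not part of the patch.

```c
#include <assert.h>

/* Hypothetical helper: saving in percent when going from before to after. */
double
percent_saved(double before_bytes, double after_bytes)
{
	assert(before_bytes > 0.0);

	return 100.0 * (before_bytes - after_bytes) / before_bytes;
}
```

With the two "used" values above (1448394448 and 729651520) this gives
roughly 49.6%.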

Now, the palloc header is >=16B, and removing it more than compensates
for the increased number of buckets (in the worst case, there may be ~2x
buckets per tuple, which is 2x8B pointers).

I did a number of tests, and the results are quite consistent with this.
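
The "~2x buckets per tuple" worst case can be sketched by replaying the
doubling rule from the patch outside the executor. This is only an
approximation of what ExecHashTableInsert does (it ignores skew tuples
and the nbatch == 1 condition); grown_nbuckets and
bucket_bytes_per_tuple are hypothetical helpers written for this
example, and the 8-byte pointer size is an assumption (64-bit build).

```c
#include <assert.h>
#include <stddef.h>

#define NTUP_PER_BUCKET 1

/*
 * Hypothetical helper: starting from a power-of-2 bucket count, keep
 * doubling while the tuple count reaches nbuckets * NTUP_PER_BUCKET.
 */
long
grown_nbuckets(long initial_nbuckets, long ntuples)
{
	long		nbuckets = initial_nbuckets;

	assert(nbuckets > 0 && (nbuckets & (nbuckets - 1)) == 0);

	while (ntuples >= nbuckets * NTUP_PER_BUCKET)
		nbuckets *= 2;

	return nbuckets;
}

/* Hypothetical helper: bytes of bucket pointers spent per stored tuple. */
double
bucket_bytes_per_tuple(long nbuckets, long ntuples, size_t pointer_size)
{
	assert(ntuples > 0);

	return (double) nbuckets * (double) pointer_size / (double) ntuples;
}
```

Starting from 1024 buckets, 1024 tuples end up with 2048 buckets, i.e.
16 bytes of bucket pointers per tuple, which is no more than the >=16B
palloc header removed by the dense allocation. With 2047 tuples the
table also ends up with 2048 buckets, which is roughly 8 bytes per
tuple.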


> 2. But, on the positive side, modulo the memory utilization questions
> mentioned above, I would expect the impact on hash join performance to
> be positive.  Going from 10 tuples per bucket to just 1 should help,
> and on cases where the actual load factor would have ended up much
> higher because of poor estimation, increasing the number of buckets on
> the fly should help even more.  I haven't tested this, though.

Yes, those are the results I was getting.

I've done a number of tests, and not a single one showed a slowdown so
far. Most well-estimated queries were 2-3x faster, and the poorly
estimated ones were orders of magnitude faster.

We're working on getting this tested on a range of workloads; I'll post
some results once I have them.


> I haven't had a chance to completely go through this yet, so these are
> just some initial thoughts.

Thanks!

Tomas
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 781a736..d128e08 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -1900,18 +1900,21 @@ show_hash_info(HashState *hashstate, ExplainState *es)
 		if (es->format != EXPLAIN_FORMAT_TEXT)
 		{
 			ExplainPropertyLong("Hash Buckets", hashtable->nbuckets, es);
+			ExplainPropertyLong("Original Hash Buckets",
+								hashtable->nbuckets_original, es);
 			ExplainPropertyLong("Hash Batches", hashtable->nbatch, es);
 			ExplainPropertyLong("Original Hash Batches",
 								hashtable->nbatch_original, es);
 			ExplainPropertyLong("Peak Memory Usage", spacePeakKb, es);
 		}
-		else if (hashtable->nbatch_original != hashtable->nbatch)
+		else if ((hashtable->nbatch_original != hashtable->nbatch) ||
+				 (hashtable->nbuckets_original != hashtable->nbuckets))
 		{
 			appendStringInfoSpaces(es->str, es->indent * 2);
 			appendStringInfo(es->str,
-			"Buckets: %d  Batches: %d (originally %d)  Memory Usage: %ldkB\n",
-							 hashtable->nbuckets, hashtable->nbatch,
-							 hashtable->nbatch_original, spacePeakKb);
+			"Buckets: %d (originally %d)  Batches: %d (originally %d)  Memory Usage: %ldkB\n",
+							 hashtable->nbuckets, hashtable->nbuckets_original,
+							 hashtable->nbatch, hashtable->nbatch_original, spacePeakKb);
 		}
 		else
 		{
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 6a9d956..688b6b4 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -37,8 +37,8 @@
 #include "utils/lsyscache.h"
 #include "utils/syscache.h"
 
-
 static void ExecHashIncreaseNumBatches(HashJoinTable hashtable);
+static void ExecHashIncreaseNumBuckets(HashJoinTable hashtable);
 static void ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node,
 					  int mcvsToUse);
 static void ExecHashSkewTableInsert(HashJoinTable hashtable,
@@ -49,6 +49,9 @@ static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable);
 
 static char * chunk_alloc(HashJoinTable hashtable, int tupleSize);
 
+/* Memory needed for optimal number of buckets. */
+#define BUCKETS_SPACE(htab)	((htab)->nbuckets_optimal * sizeof(HashJoinTuple))
+
 /* ----------------------------------------------------------------
  *		ExecHash
  *
@@ -117,6 +120,7 @@ MultiExecHash(HashState *node)
 				/* It's a skew tuple, so put it into that hash table */
 				ExecHashSkewTableInsert(hashtable, slot, hashvalue,
 										bucketNumber);
+				hashtable->skewTuples += 1;
 			}
 			else
 			{
@@ -127,10 +131,31 @@ MultiExecHash(HashState *node)
 		}
 	}
 
+	/* resize the hash table if needed (NTUP_PER_BUCKET exceeded) */
+	if (hashtable->nbuckets != hashtable->nbuckets_optimal)
+	{
+
+		/* We never decrease the number of buckets. */
+		Assert(hashtable->nbuckets_optimal > hashtable->nbuckets);
+
+#ifdef HJDEBUG
+		printf("Increasing nbuckets %d => %d\n",
+			   hashtable->nbuckets, hashtable->nbuckets_optimal);
+#endif
+
+		ExecHashIncreaseNumBuckets(hashtable);
+
+	}
+
+	/* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
+	hashtable->spaceUsed += BUCKETS_SPACE(hashtable);
+	if (hashtable->spaceUsed > hashtable->spacePeak)
+			hashtable->spacePeak = hashtable->spaceUsed;
+
 	/* must provide our own instrumentation support */
 	if (node->ps.instrument)
 		InstrStopNode(node->ps.instrument, hashtable->totalTuples);
-	
+
 	/*
 	 * We do not return the hash table directly because it's not a subtype of
 	 * Node, and so would violate the MultiExecProcNode API.  Instead, our
@@ -224,6 +249,7 @@ ExecEndHash(HashState *node)
 	ExecEndNode(outerPlan);
 }
 
+
 /* ----------------------------------------------------------------
  *		ExecHashTableCreate
  *
@@ -271,7 +297,10 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
 	 */
 	hashtable = (HashJoinTable) palloc(sizeof(HashJoinTableData));
 	hashtable->nbuckets = nbuckets;
+	hashtable->nbuckets_original = nbuckets;
+	hashtable->nbuckets_optimal = nbuckets;
 	hashtable->log2_nbuckets = log2_nbuckets;
+	hashtable->log2_nbuckets_optimal = log2_nbuckets;
 	hashtable->buckets = NULL;
 	hashtable->keepNulls = keepNulls;
 	hashtable->skewEnabled = false;
@@ -285,6 +314,7 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
 	hashtable->nbatch_outstart = nbatch;
 	hashtable->growEnabled = true;
 	hashtable->totalTuples = 0;
+	hashtable->skewTuples = 0;
 	hashtable->innerBatchFile = NULL;
 	hashtable->outerBatchFile = NULL;
 	hashtable->spaceUsed = 0;
@@ -386,7 +416,7 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
  */
 
 /* Target bucket loading (tuples per bucket) */
-#define NTUP_PER_BUCKET			10
+#define NTUP_PER_BUCKET			1
 
 void
 ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
@@ -396,6 +426,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 {
 	int			tupsize;
 	double		inner_rel_bytes;
+	double		buckets_bytes;
 	long		hash_table_bytes;
 	long		skew_table_bytes;
 	long		max_pointers;
@@ -418,6 +449,13 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 	inner_rel_bytes = ntuples * tupsize;
 
 	/*
+	 * Estimate memory needed for buckets, assuming all the tuples fit into
+	 * a single batch (consider NTUP_PER_BUCKET tuples per bucket) - buckets
+	 * are just usual 'HashJoinTuple' (pointers to HashJoinTupleData).
+	 */
+	buckets_bytes = sizeof(HashJoinTuple) * (1L << my_log2(ntuples / NTUP_PER_BUCKET));
+
+	/*
 	 * Target in-memory hashtable size is work_mem kilobytes.
 	 */
 	hash_table_bytes = work_mem * 1024L;
@@ -468,16 +506,13 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 	/* also ensure we avoid integer overflow in nbatch and nbuckets */
 	max_pointers = Min(max_pointers, INT_MAX / 2);
 
-	if (inner_rel_bytes > hash_table_bytes)
+	if ((inner_rel_bytes + buckets_bytes) > hash_table_bytes)
 	{
 		/* We'll need multiple batches */
 		long		lbuckets;
 		double		dbatch;
 		int			minbatch;
-
-		lbuckets = (hash_table_bytes / tupsize) / NTUP_PER_BUCKET;
-		lbuckets = Min(lbuckets, max_pointers);
-		nbuckets = (int) lbuckets;
+		double		batch_bytes;
 
 		dbatch = ceil(inner_rel_bytes / hash_table_bytes);
 		dbatch = Min(dbatch, max_pointers);
@@ -485,6 +520,40 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 		nbatch = 2;
 		while (nbatch < minbatch)
 			nbatch <<= 1;
+
+		/* assuming equally-sized batches, compute bytes of hash table */
+		batch_bytes = inner_rel_bytes / nbatch;
+
+		/* when batching, for the buckets, assume full work_mem */
+		lbuckets = 1 << my_log2(hash_table_bytes / (tupsize * NTUP_PER_BUCKET + sizeof(HashJoinTuple)));
+		buckets_bytes = lbuckets * sizeof(HashJoinTuple);
+
+		/*
+		 * Increase the nbatch until we get both tuples and buckets into work_mem.
+		 * 
+		 * The loop should not execute more than once in most cases, because tuples are
+		 * usually much wider than buckets (usually 8B pointers), so using only
+		 * (batch_bytes/2) should get us below work_mem.
+		 *
+		 * The worst case is that (nbuckets == 2*ntuples-1), giving us about twice the
+		 * number of buckets, i.e. about 2*sizeof(void*) per tuple. But that's
+		 * the consequence of how NTUP_PER_BUCKET is chosen, and work_mem limit.
+		 */
+
+		while (batch_bytes + buckets_bytes > hash_table_bytes)
+		{
+
+			/* increment number of batches, and re-evaluate the amount of memory */
+			nbatch <<= 1;
+
+			/* assuming equally-sized batches, compute bytes of hash table and buckets */
+			batch_bytes = inner_rel_bytes / nbatch;
+
+		}
+
+		/* protect against nbucket overflow */
+		lbuckets = Min(lbuckets, max_pointers);
+		nbuckets = (int) lbuckets;
 	}
 	else
 	{
@@ -561,6 +630,7 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 	MemoryContext oldcxt;
 	long		ninmemory;
 	long		nfreed;
+
 	HashChunk	oldchunks = hashtable->chunks_batch;
 
 	/* do nothing if we've decided to shut off growth */
@@ -614,24 +684,38 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 	 */
 	ninmemory = nfreed = 0;
 
-	/* we're going to scan through the chunks directly, not buckets, so we can reset the
-	 * buckets and reinsert all the tuples there - just set them to NULL */
-	memset(hashtable->buckets, 0, sizeof(void*) * hashtable->nbuckets);
+	/* 
+	 * We will scan the tuples through chunks, not buckets, so we can reset the
+	 * buckets and reinsert all the tuples there. So resetting buckets is OK.
+	 */
+	memset(hashtable->buckets, 0, sizeof(HashJoinTuple) * hashtable->nbuckets);
+
+	/* If we need to do a resize of buckets, we can do it while rebatching. */
+	if (hashtable->nbuckets_optimal != hashtable->nbuckets)
+	{
+		hashtable->nbuckets = hashtable->nbuckets_optimal;
+		hashtable->log2_nbuckets = hashtable->log2_nbuckets_optimal;
 
-	/* reset the chunks (we have a copy in oldchunks) - this way we'll allocate */
+		hashtable->buckets = repalloc(hashtable->buckets,
+									  sizeof(HashJoinTuple) * hashtable->nbuckets);
+	}
+
+	/* Reset the chunks, so that we allocate everything into a new list of chunks. */
 	hashtable->chunks_batch = NULL;
 
-	/* so, let's scan through the old chunks, and all tuples in each chunk */
-	while (oldchunks != NULL) {
+	/* Scan through the old chunks, keep only tuples from the current batch. */
+	while (oldchunks != NULL)
+	{
 
 		/* pointer to the next memory chunk (may be NULL for the last chunk) */
 		HashChunk nextchunk = oldchunks->next;
 
-		/* position within the buffer (up to oldchunks->used) */
-		size_t idx = 0;	
+		/* position within the chunk (up to oldchunks->used) */
+		size_t idx = 0;
 
 		/* process all tuples stored in this chunk (and then free it) */
-		while (idx < oldchunks->used) {
+		while (idx < oldchunks->used)
+		{
 
 			/* get the hashjoin tuple and mintuple stored right after it */
 			HashJoinTuple hashTuple = (HashJoinTuple)(oldchunks->data + idx);
@@ -703,6 +787,93 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 }
 
 /*
+ * ExecHashIncreaseNumBuckets
+ *		increase the original number of buckets in order to reduce
+ *		number of tuples per bucket
+ */
+static void
+ExecHashIncreaseNumBuckets(HashJoinTable hashtable)
+{
+	HashChunk	chunk;
+
+	/* do nothing if not an increase (it's called increase for a reason) */
+	if (hashtable->nbuckets >= hashtable->nbuckets_optimal)
+		return;
+
+	/*
+	 * We already know the optimal number of buckets, so let's just
+	 * compute the log2_nbuckets for it.
+	 */
+	hashtable->nbuckets = hashtable->nbuckets_optimal;
+	hashtable->log2_nbuckets = my_log2(hashtable->nbuckets_optimal);
+
+	Assert(hashtable->nbuckets > 1);
+	Assert(hashtable->nbuckets <= (INT_MAX/2));
+	Assert(hashtable->nbuckets == (1 << hashtable->log2_nbuckets));
+
+#ifdef HJDEBUG
+	printf("Increasing nbuckets to %d\n", hashtable->nbuckets);
+#endif
+
+	/* 
+	 * Just reallocate the proper number of buckets - we don't need to
+	 * walk through them - we can walk the dense-allocated chunks
+	 * (just like in ExecHashIncreaseNumBatches, but without all the
+	 * copying into new chunks)
+	 */
+
+	hashtable->buckets = (HashJoinTuple *) repalloc(hashtable->buckets,
+								hashtable->nbuckets * sizeof(HashJoinTuple));
+
+	memset(hashtable->buckets, 0, sizeof(void*) * hashtable->nbuckets);
+
+	/* scan through all tuples in all chunks, rebuild the hash table */
+	chunk = hashtable->chunks_batch;
+	while (chunk != NULL)
+	{
+
+		/* position within the buffer (up to chunk->used) */
+		size_t idx = 0;
+
+		/* process all tuples stored in this chunk (and then free it) */
+		while (idx < chunk->used)
+		{
+
+			/* get the hashjoin tuple and mintuple stored right after it */
+			HashJoinTuple hashTuple = (HashJoinTuple)(chunk->data + idx);
+			MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple);
+
+			int		hashTupleSize = (HJTUPLE_OVERHEAD + tuple->t_len);
+
+			int		bucketno;
+			int		batchno;
+
+			ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
+									  &bucketno, &batchno);
+
+			/* just add the tuple to the proper bucket */
+			hashTuple->next = hashtable->buckets[bucketno];
+			hashtable->buckets[bucketno] = hashTuple;
+
+			/* bytes occupied in memory HJ tuple overhead + actual tuple length */
+			idx += hashTupleSize;
+
+		}
+
+		/* proceed to the next chunk */
+		chunk = chunk->next;
+
+	}
+
+#ifdef HJDEBUG
+	printf("Nbuckets increased to %d, average items per bucket %.1f\n",
+		   hashtable->nbuckets, batchTuples / hashtable->nbuckets);
+#endif
+
+}
+
+
+/*
  * ExecHashTableInsert
  *		insert a tuple into the hash table depending on the hash value
  *		it may just go to a temp file for later batches
@@ -755,12 +926,34 @@ ExecHashTableInsert(HashJoinTable hashtable,
 		hashTuple->next = hashtable->buckets[bucketno];
 		hashtable->buckets[bucketno] = hashTuple;
 
+		/*
+		 * Increase the (optimal) number of buckets if we just exceeded NTUP_PER_BUCKET,
+		 * but only when there's still a single batch at this moment.
+		 */
+		if ((hashtable->nbatch == 1) &&
+			((hashtable->totalTuples - hashtable->skewTuples)
+				>= (hashtable->nbuckets_optimal * NTUP_PER_BUCKET)))
+		{
+			/* overflow protection */
+			if (hashtable->nbuckets_optimal <= (INT_MAX/2))
+			{
+				hashtable->nbuckets_optimal *= 2;
+				hashtable->log2_nbuckets_optimal += 1;
+			}
+		}
+
 		/* Account for space used, and back off if we've used too much */
 		hashtable->spaceUsed += hashTupleSize;
 		if (hashtable->spaceUsed > hashtable->spacePeak)
 			hashtable->spacePeak = hashtable->spaceUsed;
-		if (hashtable->spaceUsed > hashtable->spaceAllowed)
+
+		/*
+		 * Do we need to increase number of batches? Account for space required
+		 * by buckets (optimal number).
+		 */
+		if (hashtable->spaceUsed + BUCKETS_SPACE(hashtable) > hashtable->spaceAllowed)
 			ExecHashIncreaseNumBatches(hashtable);
+
 	}
 	else
 	{
@@ -883,7 +1076,10 @@ ExecHashGetHashValue(HashJoinTable hashtable,
  * functions are good about randomizing all their output bits, else we are
  * likely to have very skewed bucket or batch occupancy.)
  *
- * nbuckets doesn't change over the course of the join.
+ * The nbuckets/log2_nbuckets may change while (nbatch==1) because of dynamic
+ * buckets growth. Once we start batching, the value is fixed and does not
+ * change over the course of join (making it possible to compute batch number
+ * the way we do here).
  *
  * nbatch is always a power of 2; we increase it only by doubling it.  This
  * effectively adds one more bit to the top of the batchno.
@@ -1090,8 +1286,10 @@ ExecHashTableReset(HashJoinTable hashtable)
 
 	MemoryContextSwitchTo(oldcxt);
 
-	/* reset the chunks too (the memory was allocated within batchCxt, so it's
-	 * already freed by resetting the batch context -just set it to NULL) */
+	/*
+	 * Reset the chunks too (the memory was allocated within batchCxt, so it's
+	 * already freed by resetting the batch context -just set it to NULL).
+	 */
 	hashtable->chunks_batch = NULL;
 
 }
@@ -1347,23 +1545,32 @@ ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue)
 static
 char * chunk_alloc(HashJoinTable hashtable, int size) {
 
-	/* maybe we should use MAXALIGN(size) here ... */
+	/* XXX maybe we should use MAXALIGN(size) here ... */
 
-	/* if tuple size is larger than of 1/8 of chunk size, allocate a separate chunk */
-	if (size > (HASH_CHUNK_SIZE/8)) {
+	/* if requested size is larger than of 1/8 of chunk size, allocate a separate chunk */
+	if (size > (HASH_CHUNK_SIZE/8))
+	{
 
 		/* allocate new chunk and put it at the beginning of the list */
-		HashChunk newChunk = (HashChunk)MemoryContextAlloc(hashtable->batchCxt, offsetof(HashChunkData, data) + size);
+		HashChunk newChunk
+			= (HashChunk)MemoryContextAlloc(hashtable->batchCxt,
+											offsetof(HashChunkData, data) + size);
 
 		newChunk->maxlen = size;
 		newChunk->used = 0;
 		newChunk->ntuples = 0;
 
-		/* If there already is a chunk, add if after it so we can still use the space in it */
-		if (hashtable->chunks_batch != NULL) {
+		/*
+		 * If there already is a chunk, add the new one after it, so we can still use the
+		 * space in the existing one.
+		 */
+		if (hashtable->chunks_batch != NULL)
+		{
 			newChunk->next = hashtable->chunks_batch->next;
 			hashtable->chunks_batch->next = newChunk;
-		} else {
+		}
+		else
+		{
 			newChunk->next = hashtable->chunks_batch;
 			hashtable->chunks_batch = newChunk;
 		}
@@ -1375,12 +1582,18 @@ char * chunk_alloc(HashJoinTable hashtable, int size) {
 
 	}
 
-	/* ok, it's within chunk size, let's see if we have enough space for it in the current
-	 * chunk => if not, allocate a new chunk (chunks==NULL means this is the first chunk) */
-	if ((hashtable->chunks_batch == NULL) || (hashtable->chunks_batch->maxlen - hashtable->chunks_batch->used) < size) {
+	/*
+	 * Requested size is less than 1/8 of a chunk, so place it in the current chunk if there
+	 * is enough free space. If not, allocate a new chunk and add it there.
+	 */
+	if ((hashtable->chunks_batch == NULL) ||
+		(hashtable->chunks_batch->maxlen - hashtable->chunks_batch->used) < size)
+	{
 
 		/* allocate new chunk and put it at the beginning of the list */
-		HashChunk newChunk = (HashChunk)MemoryContextAlloc(hashtable->batchCxt, offsetof(HashChunkData, data) + HASH_CHUNK_SIZE);
+		HashChunk newChunk
+			= (HashChunk)MemoryContextAlloc(hashtable->batchCxt,
+											offsetof(HashChunkData, data) + HASH_CHUNK_SIZE);
 
 		newChunk->maxlen = HASH_CHUNK_SIZE;
 		newChunk->used = 0;
@@ -1421,7 +1634,6 @@ ExecHashSkewTableInsert(HashJoinTable hashtable,
 	hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
 	hashTuple = (HashJoinTuple) MemoryContextAlloc(hashtable->batchCxt,
 												   hashTupleSize);
-
 	hashTuple->hashvalue = hashvalue;
 	memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
 	HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index c80a4d1..eda7502 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -103,7 +103,6 @@ typedef struct HashSkewBucket
 #define SKEW_MIN_OUTER_FRACTION  0.01
 
 #define HASH_CHUNK_SIZE			(16*1024L)	/* 16kB chunks by default */
-#define HASH_SKEW_CHUNK_SIZE	(4*1024L)	/* 4kB chunks for skew buckets */
 
 typedef struct HashChunkData
 {
@@ -124,6 +123,10 @@ typedef struct HashJoinTableData
 	int			nbuckets;		/* # buckets in the in-memory hash table */
 	int			log2_nbuckets;	/* its log2 (nbuckets must be a power of 2) */
 
+	int			nbuckets_original;	/* # buckets when starting the first hash */
+	int			nbuckets_optimal;	/* optimal # buckets (per batch) */
+	int			log2_nbuckets_optimal;	/* log2 of nbuckets_optimal */
+
 	/* buckets[i] is head of list of tuples in i'th in-memory bucket */
 	struct HashJoinTupleData **buckets;
 	/* buckets array is per-batch storage, as are all the tuples */
@@ -145,6 +148,7 @@ typedef struct HashJoinTableData
 	bool		growEnabled;	/* flag to shut off nbatch increases */
 
 	double		totalTuples;	/* # tuples obtained from inner plan */
+	double		skewTuples;		/* # tuples inserted into skew hash table */
 
 	/*
 	 * These arrays are allocated for the life of the hash join, but only if
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 781a736..d128e08 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -1900,18 +1900,21 @@ show_hash_info(HashState *hashstate, ExplainState *es)
 		if (es->format != EXPLAIN_FORMAT_TEXT)
 		{
 			ExplainPropertyLong("Hash Buckets", hashtable->nbuckets, es);
+			ExplainPropertyLong("Original Hash Buckets",
+								hashtable->nbuckets_original, es);
 			ExplainPropertyLong("Hash Batches", hashtable->nbatch, es);
 			ExplainPropertyLong("Original Hash Batches",
 								hashtable->nbatch_original, es);
 			ExplainPropertyLong("Peak Memory Usage", spacePeakKb, es);
 		}
-		else if (hashtable->nbatch_original != hashtable->nbatch)
+		else if ((hashtable->nbatch_original != hashtable->nbatch) ||
+				 (hashtable->nbuckets_original != hashtable->nbuckets))
 		{
 			appendStringInfoSpaces(es->str, es->indent * 2);
 			appendStringInfo(es->str,
-			"Buckets: %d  Batches: %d (originally %d)  Memory Usage: %ldkB\n",
-							 hashtable->nbuckets, hashtable->nbatch,
-							 hashtable->nbatch_original, spacePeakKb);
+			"Buckets: %d (originally %d)  Batches: %d (originally %d)  Memory Usage: %ldkB\n",
+							 hashtable->nbuckets, hashtable->nbuckets_original,
+							 hashtable->nbatch, hashtable->nbatch_original, spacePeakKb);
 		}
 		else
 		{
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 589b2f1..688b6b4 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -37,8 +37,8 @@
 #include "utils/lsyscache.h"
 #include "utils/syscache.h"
 
-
 static void ExecHashIncreaseNumBatches(HashJoinTable hashtable);
+static void ExecHashIncreaseNumBuckets(HashJoinTable hashtable);
 static void ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node,
 					  int mcvsToUse);
 static void ExecHashSkewTableInsert(HashJoinTable hashtable,
@@ -47,6 +47,10 @@ static void ExecHashSkewTableInsert(HashJoinTable hashtable,
 						int bucketNumber);
 static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable);
 
+static char * chunk_alloc(HashJoinTable hashtable, int tupleSize);
+
+/* Memory needed for optimal number of buckets. */
+#define BUCKETS_SPACE(htab)	((htab)->nbuckets_optimal * sizeof(HashJoinTuple))
 
 /* ----------------------------------------------------------------
  *		ExecHash
@@ -116,6 +120,7 @@ MultiExecHash(HashState *node)
 				/* It's a skew tuple, so put it into that hash table */
 				ExecHashSkewTableInsert(hashtable, slot, hashvalue,
 										bucketNumber);
+				hashtable->skewTuples += 1;
 			}
 			else
 			{
@@ -126,6 +131,27 @@ MultiExecHash(HashState *node)
 		}
 	}
 
+	/* resize the hash table if needed (NTUP_PER_BUCKET exceeded) */
+	if (hashtable->nbuckets != hashtable->nbuckets_optimal)
+	{
+
+		/* We never decrease the number of buckets. */
+		Assert(hashtable->nbuckets_optimal > hashtable->nbuckets);
+
+#ifdef HJDEBUG
+		printf("Increasing nbuckets %d => %d\n",
+			   hashtable->nbuckets, hashtable->nbuckets_optimal);
+#endif
+
+		ExecHashIncreaseNumBuckets(hashtable);
+
+	}
+
+	/* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
+	hashtable->spaceUsed += BUCKETS_SPACE(hashtable);
+	if (hashtable->spaceUsed > hashtable->spacePeak)
+			hashtable->spacePeak = hashtable->spaceUsed;
+
 	/* must provide our own instrumentation support */
 	if (node->ps.instrument)
 		InstrStopNode(node->ps.instrument, hashtable->totalTuples);
@@ -271,7 +297,10 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
 	 */
 	hashtable = (HashJoinTable) palloc(sizeof(HashJoinTableData));
 	hashtable->nbuckets = nbuckets;
+	hashtable->nbuckets_original = nbuckets;
+	hashtable->nbuckets_optimal = nbuckets;
 	hashtable->log2_nbuckets = log2_nbuckets;
+	hashtable->log2_nbuckets_optimal = log2_nbuckets;
 	hashtable->buckets = NULL;
 	hashtable->keepNulls = keepNulls;
 	hashtable->skewEnabled = false;
@@ -285,6 +314,7 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
 	hashtable->nbatch_outstart = nbatch;
 	hashtable->growEnabled = true;
 	hashtable->totalTuples = 0;
+	hashtable->skewTuples = 0;
 	hashtable->innerBatchFile = NULL;
 	hashtable->outerBatchFile = NULL;
 	hashtable->spaceUsed = 0;
@@ -294,6 +324,8 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
 	hashtable->spaceAllowedSkew =
 		hashtable->spaceAllowed * SKEW_WORK_MEM_PERCENT / 100;
 
+	hashtable->chunks_batch = NULL;
+
 	/*
 	 * Get info about the hash functions to be used for each hash key. Also
 	 * remember whether the join operators are strict.
@@ -384,7 +416,7 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
  */
 
 /* Target bucket loading (tuples per bucket) */
-#define NTUP_PER_BUCKET			10
+#define NTUP_PER_BUCKET			1
 
 void
 ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
@@ -394,6 +426,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 {
 	int			tupsize;
 	double		inner_rel_bytes;
+	double		buckets_bytes;
 	long		hash_table_bytes;
 	long		skew_table_bytes;
 	long		max_pointers;
@@ -416,6 +449,13 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 	inner_rel_bytes = ntuples * tupsize;
 
 	/*
+	 * Estimate memory needed for buckets, assuming all the tuples fit into
+	 * a single batch (consider NTUP_PER_BUCKET tuples per bucket) - buckets
+	 * are just usual 'HashJoinTuple' (pointers to HashJoinTupleData).
+	 */
+	buckets_bytes = sizeof(HashJoinTuple) * (1L << my_log2(ntuples / NTUP_PER_BUCKET));
+
+	/*
 	 * Target in-memory hashtable size is work_mem kilobytes.
 	 */
 	hash_table_bytes = work_mem * 1024L;
@@ -466,16 +506,13 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 	/* also ensure we avoid integer overflow in nbatch and nbuckets */
 	max_pointers = Min(max_pointers, INT_MAX / 2);
 
-	if (inner_rel_bytes > hash_table_bytes)
+	if ((inner_rel_bytes + buckets_bytes) > hash_table_bytes)
 	{
 		/* We'll need multiple batches */
 		long		lbuckets;
 		double		dbatch;
 		int			minbatch;
-
-		lbuckets = (hash_table_bytes / tupsize) / NTUP_PER_BUCKET;
-		lbuckets = Min(lbuckets, max_pointers);
-		nbuckets = (int) lbuckets;
+		double		batch_bytes;
 
 		dbatch = ceil(inner_rel_bytes / hash_table_bytes);
 		dbatch = Min(dbatch, max_pointers);
@@ -483,6 +520,40 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 		nbatch = 2;
 		while (nbatch < minbatch)
 			nbatch <<= 1;
+
+		/* assuming equally-sized batches, compute bytes of hash table */
+		batch_bytes = inner_rel_bytes / nbatch;
+
+		/* when batching, for the buckets, assume full work_mem */
+		lbuckets = 1L << my_log2(hash_table_bytes / (tupsize * NTUP_PER_BUCKET + sizeof(HashJoinTuple)));
+		buckets_bytes = lbuckets * sizeof(HashJoinTuple);
+
+		/*
+		 * Increase nbatch until both tuples and buckets fit into work_mem.
+		 *
+		 * The loop should not execute more than once in most cases, because tuples
+		 * are usually much wider than buckets (usually 8B pointers), so halving
+		 * batch_bytes once should get us below work_mem.
+		 *
+		 * The worst case is that (nbuckets == 2*ntuples-1), giving us about twice the
+		 * number of buckets, i.e. about 2*sizeof(void*) per tuple. But that's
+		 * the consequence of how NTUP_PER_BUCKET is chosen, and of the work_mem limit.
+		 */
+
+		while (batch_bytes + buckets_bytes > hash_table_bytes)
+		{
+
+			/* double the number of batches, and re-evaluate the amount of memory */
+			nbatch <<= 1;
+
+			/* assuming equally-sized batches, recompute bytes of tuples per batch (buckets_bytes is fixed) */
+			batch_bytes = inner_rel_bytes / nbatch;
+
+		}
+
+		/* protect against nbucket overflow */
+		lbuckets = Min(lbuckets, max_pointers);
+		nbuckets = (int) lbuckets;
 	}
 	else
 	{
@@ -556,11 +627,12 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 	int			oldnbatch = hashtable->nbatch;
 	int			curbatch = hashtable->curbatch;
 	int			nbatch;
-	int			i;
 	MemoryContext oldcxt;
 	long		ninmemory;
 	long		nfreed;
 
+	HashChunk	oldchunks = hashtable->chunks_batch;
+
 	/* do nothing if we've decided to shut off growth */
 	if (!hashtable->growEnabled)
 		return;
@@ -612,51 +684,84 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 	 */
 	ninmemory = nfreed = 0;
 
-	for (i = 0; i < hashtable->nbuckets; i++)
+	/* If we need to do a resize of buckets, we can do it while rebatching. */
+	if (hashtable->nbuckets_optimal != hashtable->nbuckets)
+	{
+		hashtable->nbuckets = hashtable->nbuckets_optimal;
+		hashtable->log2_nbuckets = hashtable->log2_nbuckets_optimal;
+
+		hashtable->buckets = repalloc(hashtable->buckets,
+									  sizeof(HashJoinTuple) * hashtable->nbuckets);
+	}
+
+	/*
+	 * We will scan the tuples through chunks, not buckets, so we can reset the
+	 * buckets (all of them, including any just added) and reinsert the tuples.
+	 */
+	memset(hashtable->buckets, 0, sizeof(HashJoinTuple) * hashtable->nbuckets);
+
+	/* Reset the chunks, so that we allocate everything into a new list of chunks. */
+	hashtable->chunks_batch = NULL;
+
+	/* Scan through the old chunks, keep only tuples from the current batch. */
+	while (oldchunks != NULL)
 	{
-		HashJoinTuple prevtuple;
-		HashJoinTuple tuple;
 
-		prevtuple = NULL;
-		tuple = hashtable->buckets[i];
+		/* pointer to the next memory chunk (may be NULL for the last chunk) */
+		HashChunk nextchunk = oldchunks->next;
+
+		/* position within the chunk (up to oldchunks->used) */
+		size_t idx = 0;
 
-		while (tuple != NULL)
+		/* process all tuples stored in this chunk (and then free it) */
+		while (idx < oldchunks->used)
 		{
-			/* save link in case we delete */
-			HashJoinTuple nexttuple = tuple->next;
-			int			bucketno;
-			int			batchno;
+
+			/* get the hashjoin tuple and mintuple stored right after it */
+			HashJoinTuple hashTuple = (HashJoinTuple)(oldchunks->data + idx);
+			MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple);
+
+			int		hashTupleSize = (HJTUPLE_OVERHEAD + tuple->t_len);
+
+			int		bucketno;
+			int		batchno;
 
 			ninmemory++;
-			ExecHashGetBucketAndBatch(hashtable, tuple->hashvalue,
+			ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
 									  &bucketno, &batchno);
-			Assert(bucketno == i);
+
 			if (batchno == curbatch)
 			{
-				/* keep tuple */
-				prevtuple = tuple;
+				/* keep tuple - this means we need to copy it into the new chunks */
+				HashJoinTuple copyTuple = (HashJoinTuple) chunk_alloc(hashtable, hashTupleSize);
+				memcpy(copyTuple, hashTuple, hashTupleSize);
+
+				/* and of course add it to the new buckets - just push the copy onto the front
+				 * of the bucket's list */
+				copyTuple->next = hashtable->buckets[bucketno];
+				hashtable->buckets[bucketno] = copyTuple;
 			}
 			else
 			{
 				/* dump it out */
 				Assert(batchno > curbatch);
-				ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(tuple),
-									  tuple->hashvalue,
+				ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(hashTuple),
+									  hashTuple->hashvalue,
 									  &hashtable->innerBatchFile[batchno]);
-				/* and remove from hash table */
-				if (prevtuple)
-					prevtuple->next = nexttuple;
-				else
-					hashtable->buckets[i] = nexttuple;
-				/* prevtuple doesn't change */
-				hashtable->spaceUsed -=
-					HJTUPLE_OVERHEAD + HJTUPLE_MINTUPLE(tuple)->t_len;
-				pfree(tuple);
+
+				hashtable->spaceUsed -= hashTupleSize;
 				nfreed++;
 			}
 
-			tuple = nexttuple;
+			/* advance by the bytes occupied in memory: HJ tuple overhead + actual tuple length */
+			idx += hashTupleSize;
+
 		}
+
+		/* so, we're done with this chunk - free it and proceed to the next one */
+		pfree(oldchunks);
+		oldchunks = nextchunk;
+
 	}
 
 #ifdef HJDEBUG
@@ -682,6 +787,93 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 }
 
 /*
+ * ExecHashIncreaseNumBuckets
+ *		increase the original number of buckets in order to reduce
+ *		number of tuples per bucket
+ */
+static void
+ExecHashIncreaseNumBuckets(HashJoinTable hashtable)
+{
+	HashChunk	chunk;
+
+	/* do nothing if not an increase (it's called increase for a reason) */
+	if (hashtable->nbuckets >= hashtable->nbuckets_optimal)
+		return;
+
+	/*
+	 * We already know the optimal number of buckets, so let's just
+	 * compute the log2_nbuckets for it.
+	 */
+	hashtable->nbuckets = hashtable->nbuckets_optimal;
+	hashtable->log2_nbuckets = my_log2(hashtable->nbuckets_optimal);
+
+	Assert(hashtable->nbuckets > 1);
+	Assert(hashtable->nbuckets <= (INT_MAX/2));
+	Assert(hashtable->nbuckets == (1 << hashtable->log2_nbuckets));
+
+#ifdef HJDEBUG
+	printf("Increasing nbuckets to %d\n", hashtable->nbuckets);
+#endif
+
+	/*
+	 * Just reallocate the proper number of buckets - we don't need to
+	 * walk through them - we can walk the dense-allocated chunks
+	 * (just like in ExecHashIncreaseNumBatches, but without all the
+	 * copying into new chunks)
+	 */
+
+	hashtable->buckets = (HashJoinTuple *) repalloc(hashtable->buckets,
+								hashtable->nbuckets * sizeof(HashJoinTuple));
+
+	memset(hashtable->buckets, 0, sizeof(HashJoinTuple) * hashtable->nbuckets);
+
+	/* scan through all tuples in all chunks, rebuild the hash table */
+	chunk = hashtable->chunks_batch;
+	while (chunk != NULL)
+	{
+
+		/* position within the buffer (up to chunk->used) */
+		size_t idx = 0;
+
+		/* process all tuples stored in this chunk (the chunk itself is kept) */
+		while (idx < chunk->used)
+		{
+
+			/* get the hashjoin tuple and mintuple stored right after it */
+			HashJoinTuple hashTuple = (HashJoinTuple)(chunk->data + idx);
+			MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple);
+
+			int		hashTupleSize = (HJTUPLE_OVERHEAD + tuple->t_len);
+
+			int		bucketno;
+			int		batchno;
+
+			ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
+									  &bucketno, &batchno);
+
+			/* just add the tuple to the proper bucket */
+			hashTuple->next = hashtable->buckets[bucketno];
+			hashtable->buckets[bucketno] = hashTuple;
+
+			/* advance by the bytes occupied in memory: HJ tuple overhead + actual tuple length */
+			idx += hashTupleSize;
+
+		}
+
+		/* proceed to the next chunk */
+		chunk = chunk->next;
+
+	}
+
+#ifdef HJDEBUG
+	printf("Nbuckets increased to %d, average items per bucket %.1f\n",
+		   hashtable->nbuckets, (hashtable->totalTuples - hashtable->skewTuples) / hashtable->nbuckets);
+#endif
+
+}
+
+
+/*
  * ExecHashTableInsert
  *		insert a tuple into the hash table depending on the hash value
  *		it may just go to a temp file for later batches
@@ -717,8 +909,8 @@ ExecHashTableInsert(HashJoinTable hashtable,
 
 		/* Create the HashJoinTuple */
 		hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
-		hashTuple = (HashJoinTuple) MemoryContextAlloc(hashtable->batchCxt,
-													   hashTupleSize);
+		hashTuple = (HashJoinTuple) chunk_alloc(hashtable, hashTupleSize);
+
 		hashTuple->hashvalue = hashvalue;
 		memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
 
@@ -734,12 +926,34 @@ ExecHashTableInsert(HashJoinTable hashtable,
 		hashTuple->next = hashtable->buckets[bucketno];
 		hashtable->buckets[bucketno] = hashTuple;
 
+		/*
+		 * Increase the (optimal) number of buckets if we just exceeded NTUP_PER_BUCKET,
+		 * but only when there's still a single batch at this moment.
+		 */
+		if ((hashtable->nbatch == 1) &&
+			((hashtable->totalTuples - hashtable->skewTuples)
+				>= (hashtable->nbuckets_optimal * NTUP_PER_BUCKET)))
+		{
+			/* overflow protection */
+			if (hashtable->nbuckets_optimal <= (INT_MAX/2))
+			{
+				hashtable->nbuckets_optimal *= 2;
+				hashtable->log2_nbuckets_optimal += 1;
+			}
+		}
+
 		/* Account for space used, and back off if we've used too much */
 		hashtable->spaceUsed += hashTupleSize;
 		if (hashtable->spaceUsed > hashtable->spacePeak)
 			hashtable->spacePeak = hashtable->spaceUsed;
-		if (hashtable->spaceUsed > hashtable->spaceAllowed)
+
+		/*
+		 * Do we need to increase number of batches? Account for space required
+		 * by buckets (optimal number).
+		 */
+		if (hashtable->spaceUsed + BUCKETS_SPACE(hashtable) > hashtable->spaceAllowed)
 			ExecHashIncreaseNumBatches(hashtable);
+
 	}
 	else
 	{
@@ -862,7 +1076,10 @@ ExecHashGetHashValue(HashJoinTable hashtable,
  * functions are good about randomizing all their output bits, else we are
  * likely to have very skewed bucket or batch occupancy.)
  *
- * nbuckets doesn't change over the course of the join.
+ * The nbuckets/log2_nbuckets may change while (nbatch==1) because of dynamic
+ * bucket growth. Once we start batching, the values are fixed and do not
+ * change over the course of the join (making it possible to compute the batch
+ * number the way we do here).
  *
  * nbatch is always a power of 2; we increase it only by doubling it.  This
  * effectively adds one more bit to the top of the batchno.
@@ -1068,6 +1285,13 @@ ExecHashTableReset(HashJoinTable hashtable)
 	hashtable->spaceUsed = 0;
 
 	MemoryContextSwitchTo(oldcxt);
+
+	/*
+	 * Reset the chunks too (the memory was allocated within batchCxt, so it's
+	 * already freed by resetting the batch context; just set the pointer to NULL).
+	 */
+	hashtable->chunks_batch = NULL;
+
 }
 
 /*
@@ -1318,6 +1542,76 @@ ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue)
 	return INVALID_SKEW_BUCKET_NO;
 }
 
+static
+char * chunk_alloc(HashJoinTable hashtable, int size) {
+
+	/* XXX maybe we should use MAXALIGN(size) here ... */
+
+	/* if the requested size is larger than 1/8 of the chunk size, allocate a separate chunk */
+	if (size > (HASH_CHUNK_SIZE/8))
+	{
+
+		/* allocate a new chunk sized for just this tuple and link it into the list */
+		HashChunk newChunk
+			= (HashChunk)MemoryContextAlloc(hashtable->batchCxt,
+											offsetof(HashChunkData, data) + size);
+
+		newChunk->maxlen = size;
+		newChunk->used = 0;
+		newChunk->ntuples = 0;
+
+		/*
+		 * If there already is a chunk, add the new one after it, so we can still use the
+		 * space in the existing one.
+		 */
+		if (hashtable->chunks_batch != NULL)
+		{
+			newChunk->next = hashtable->chunks_batch->next;
+			hashtable->chunks_batch->next = newChunk;
+		}
+		else
+		{
+			newChunk->next = hashtable->chunks_batch;
+			hashtable->chunks_batch = newChunk;
+		}
+
+		newChunk->used += size;
+		newChunk->ntuples += 1;
+
+		return newChunk->data;
+
+	}
+
+	/*
+	 * Requested size is at most 1/8 of a chunk, so place it in the current chunk if there
+	 * is enough free space. If not, allocate a new chunk and add it there.
+	 */
+	if ((hashtable->chunks_batch == NULL) ||
+		(hashtable->chunks_batch->maxlen - hashtable->chunks_batch->used) < size)
+	{
+
+		/* allocate new chunk and put it at the beginning of the list */
+		HashChunk newChunk
+			= (HashChunk)MemoryContextAlloc(hashtable->batchCxt,
+											offsetof(HashChunkData, data) + HASH_CHUNK_SIZE);
+
+		newChunk->maxlen = HASH_CHUNK_SIZE;
+		newChunk->used = 0;
+		newChunk->ntuples = 0;
+
+		newChunk->next = hashtable->chunks_batch;
+		hashtable->chunks_batch = newChunk;
+	}
+
+	/* OK, we have enough space in the chunk, let's add the tuple */
+	hashtable->chunks_batch->used += size;
+	hashtable->chunks_batch->ntuples += 1;
+
+	/* return a pointer to the start of the tuple memory */
+	return hashtable->chunks_batch->data + (hashtable->chunks_batch->used - size);
+
+}
+
 /*
  * ExecHashSkewTableInsert
  *
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index 3beae40..eda7502 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -102,12 +102,31 @@ typedef struct HashSkewBucket
 #define SKEW_WORK_MEM_PERCENT  2
 #define SKEW_MIN_OUTER_FRACTION  0.01
 
+#define HASH_CHUNK_SIZE			(16*1024L)	/* 16kB chunks by default */
+
+typedef struct HashChunkData
+{
+	int			ntuples;	/* number of tuples stored in this chunk */
+	size_t		maxlen;		/* length of the buffer */
+	size_t		used;		/* number of chunk bytes already used */
+
+	struct HashChunkData   *next;	/* pointer to the next chunk (linked list) */
+
+	char		data[1];	/* buffer allocated at the end */
+} HashChunkData;
+
+typedef struct HashChunkData* HashChunk;
+
 
 typedef struct HashJoinTableData
 {
 	int			nbuckets;		/* # buckets in the in-memory hash table */
 	int			log2_nbuckets;	/* its log2 (nbuckets must be a power of 2) */
 
+	int			nbuckets_original;	/* # buckets when starting the first hash */
+	int			nbuckets_optimal;	/* optimal # buckets (per batch) */
+	int			log2_nbuckets_optimal;	/* log2 of nbuckets_optimal */
+
 	/* buckets[i] is head of list of tuples in i'th in-memory bucket */
 	struct HashJoinTupleData **buckets;
 	/* buckets array is per-batch storage, as are all the tuples */
@@ -129,6 +148,7 @@ typedef struct HashJoinTableData
 	bool		growEnabled;	/* flag to shut off nbatch increases */
 
 	double		totalTuples;	/* # tuples obtained from inner plan */
+	double		skewTuples;		/* # tuples inserted into skew buckets */
 
 	/*
 	 * These arrays are allocated for the life of the hash join, but only if
@@ -157,6 +177,10 @@ typedef struct HashJoinTableData
 
 	MemoryContext hashCxt;		/* context for whole-hash-join storage */
 	MemoryContext batchCxt;		/* context for this-batch-only storage */
+
+	/* used for dense allocation of tuples (into linked chunks) */
+	HashChunk	chunks_batch;	/* one list for the whole batch */
+
 }	HashJoinTableData;
 
 #endif   /* HASHJOIN_H */
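
For anyone following the nbuckets/batchno reasoning in the patch comments, here is a minimal standalone sketch (not part of the patch) of why nbuckets can grow freely only while nbatch == 1. It assumes the usual split of the hash value: the low log2_nbuckets bits pick the bucket and the next bits pick the batch. SketchTable and the sketch_* functions are hypothetical names made up for this illustration; they are not the executor's real structures.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-in for the few HashJoinTableData fields that matter here. */
typedef struct SketchTable
{
	int			log2_nbuckets;	/* nbuckets == 1 << log2_nbuckets */
	int			nbatch;			/* always a power of 2 */
} SketchTable;

/* The low log2_nbuckets bits of the hash select the bucket. */
int
sketch_bucketno(const SketchTable *t, uint32_t hashvalue)
{
	uint32_t	nbuckets = (uint32_t) 1 << t->log2_nbuckets;

	return (int) (hashvalue & (nbuckets - 1));
}

/* The bits just above the bucket bits select the batch. */
int
sketch_batchno(const SketchTable *t, uint32_t hashvalue)
{
	/* with a single batch the batch field is zero bits wide */
	if (t->nbatch == 1)
		return 0;

	return (int) ((hashvalue >> t->log2_nbuckets) & (uint32_t) (t->nbatch - 1));
}

/* Follow one hash value through a doubling of nbuckets (1024 -> 2048). */
void
sketch_selfcheck(void)
{
	uint32_t	h = 0xABCD1234u;
	SketchTable one_small = {10, 1};
	SketchTable one_big = {11, 1};
	SketchTable four_small = {10, 4};
	SketchTable four_big = {11, 4};

	/* single batch: the tuple stays in batch 0 whatever nbuckets is */
	assert(sketch_batchno(&one_small, h) == 0);
	assert(sketch_batchno(&one_big, h) == 0);

	/* four batches: doubling nbuckets shifts the batch bits, batch 0 -> 2 */
	assert(sketch_batchno(&four_small, h) == 0);
	assert(sketch_batchno(&four_big, h) == 2);
}
```

With one batch, doubling nbuckets only means re-linking the in-memory tuples into the new bucket array. With several batches, the same doubling would change which batch a hash value maps to, and tuples already written to batch files would end up in the wrong file.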
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers