Hi,

attached is v2 of the patch, with a bugfix and two significant improvements:

1) bugfix: memset() I forgot to remove from ExecHashIncreaseNumBatches()

   It caused segfaults whenever we started with a single batch and then
   had to increase the number of batches.

2) 0002: postpone the batching (not just build of buckets)

   When ExecChooseHashTableSize finds out we'll probably need batching,
   we start with nbatch=1 anyway, and only switch to batching (with the
   estimated number of batches) after filling work_mem.

   This helps in two basic cases. Firstly, with over-estimates we may
   end up batching even when a single batch would do (i.e. without
   writing to temp files at all). That's really expensive, and this
   helps with that - not entirely, because we can only distinguish the
   "no batching vs. batching" case and not the number of batches, but
   that's the more interesting case anyway (the difference between
   batching with 16 or 32 batches is not large, at least in my
   experience).

   The other use case is the patch adding bloom filters, because it
   allows properly sizing the bloom filter, which needs the number of
   distinct values. The filter might be sized using the estimated
   number of tuples passed to the Hash, but then it's likely to be
   larger than needed and thus more expensive. So sizing the bloom
   filter is crucial, and this change allows accumulating enough data
   first to estimate the size of the bloom filter. More discussion in
   the other patch.

3) 0003: eliminate the MaxAllocSize limit

   Currently the size of buckets is limited by MaxAllocSize, which
   means it can't exceed 512MB (assuming I've done my math correctly),
   i.e. ~67M buckets. This removes the MaxAllocSize limit and just
   keeps the (INT_MAX/2) limit, so ~2B rows. I don't think it's worth
   extending further at this point.

   There was a discussion about this, quoting the f2fc98fb message:

      Limit the size of the hashtable pointer array to not more than
      MaxAllocSize, per reports from Kouhei Kaigai and others of
      "invalid memory alloc request size" failures.  There was
      discussion of allowing the array to get larger than that by using
      the "huge" palloc API, but so far no proof that that is actually
      a good idea, and at this point in the 9.5 cycle major changes
      from old behavior don't seem like the way to go.

   I believe the objections were correct, because it'd really waste a
   lot of memory in case of over-estimates. I do however think that
   doing (1) and (2) fixes this issue, because with those changes we
   never allocate the buckets based on the initial estimate. We pretty
   much get the exact number of buckets necessary.
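
To make the interplay of 0001 and 0002 a bit more concrete, here is a minimal standalone sketch (not code from the patch). It assumes fixed-size tuples and stops at the first work_mem overflow. The helpers sketch_next_nbatch() and sketch_nbatch_after_load() and the constant SKETCH_PTR_SIZE are hypothetical names made up for illustration; only the choice of the next nbatch mirrors the hunk in 0002.

```c
#include <stddef.h>

#define NTUP_PER_BUCKET 1	/* target load, as in nodeHash.c */
#define SKETCH_PTR_SIZE 8	/* assumed size of one bucket pointer */

/*
 * Pick the next nbatch: on the first increase jump straight to the
 * planner's estimate (if it expected batching), otherwise double.
 */
static int
sketch_next_nbatch(int oldnbatch, int nbatch_original)
{
	if (oldnbatch == 1 && nbatch_original > 1)
		return nbatch_original;
	return oldnbatch * 2;
}

/*
 * Load ntuples fixed-size tuples in un-batched mode.  Returns 1 if
 * everything fit into space_allowed, otherwise the nbatch chosen at the
 * first overflow.  (Hypothetical helper; the real code keeps going.)
 */
static int
sketch_nbatch_after_load(long ntuples, size_t tuple_size,
						 size_t space_allowed, int nbatch_original)
{
	long		nbuckets = 1;
	size_t		space_used = 0;
	long		i;

	for (i = 1; i <= ntuples; i++)
	{
		space_used += tuple_size;

		/* grow the planned bucket count with the actual tuple count */
		if (i > nbuckets * NTUP_PER_BUCKET)
			nbuckets *= 2;

		/* tuples plus the (not yet allocated) bucket array must fit */
		if (space_used + (size_t) nbuckets * SKETCH_PTR_SIZE > space_allowed)
			return sketch_next_nbatch(1, nbatch_original);
	}

	return 1;
}
```

With 16-byte tuples and a 4096-byte limit, 100 tuples stay in a single batch even though the planner estimated 4 batches, while 1000 tuples overflow and switch directly to the estimated 4 batches. In both cases the bucket count follows the tuples actually seen, not the initial estimate.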

I haven't done any performance testing yet (well, I did, but the results are not reliable enough to share here). I'll post more data in early January, once the machine completes the other tests it's running right now.

I've also been thinking about what other optimizations might be possible, and the one thing I'm wondering about is adding a HyperLogLog counter. The thing is that we size the buckets based on the number of rows, but that may be nonsense - to size the hash table, we actually need the number of distinct values. So when the Hash contains duplicate rows (say 10 rows for each value), we end up with only ~10% of buckets containing any data (about 10 tuples in a list).

If we knew the number of distinct values, we could make the buckets smaller and thus reduce the memory requirements significantly.

I mention this because the patch with bloom filters actually uses HLL to size the bloom filters, so this would not be really introducing any new code.
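
A rough sketch of the difference, assuming 8-byte bucket pointers and ignoring any minimum or maximum the real code applies. The helpers sketch_nbuckets_for() and sketch_bucket_bytes() are hypothetical names for illustration, not functions from nodeHash.c, and the distinct count is simply passed in rather than estimated by HLL.

```c
#include <stddef.h>

#define NTUP_PER_BUCKET 1	/* target load, as in nodeHash.c */
#define SKETCH_PTR_SIZE 8	/* assumed size of one bucket pointer */

/* Smallest power of two >= count / NTUP_PER_BUCKET (rounded up). */
static long
sketch_nbuckets_for(long count)
{
	long		target = (count + NTUP_PER_BUCKET - 1) / NTUP_PER_BUCKET;
	long		nbuckets = 1;

	while (nbuckets < target)
		nbuckets *= 2;

	return nbuckets;
}

/* Memory needed for the bucket pointer array sized for 'count' entries. */
static size_t
sketch_bucket_bytes(long count)
{
	return (size_t) sketch_nbuckets_for(count) * SKETCH_PTR_SIZE;
}
```

For 1,000,000 rows with 10 rows per value, sizing by rows gives 1,048,576 buckets (8MB of pointers), while sizing by the 100,000 distinct values gives 131,072 buckets (1MB).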


regards

--
Tomas Vondra                  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
From 3aaf27f722e6c90702af8f51fd11a95b8c173c38 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <to...@pgaddict.com>
Date: Sun, 27 Dec 2015 18:25:51 +0100
Subject: [PATCH 1/5] delayed build of hash buckets v2

Removes forgotten memset in ExecHashIncreaseNumBatches() causing
crashes when we start without batching and find out we need to
do batching later.
---
 src/backend/commands/explain.c  |  8 +---
 src/backend/executor/nodeHash.c | 94 +++++++++++++----------------------------
 src/include/executor/hashjoin.h |  5 ---
 3 files changed, 32 insertions(+), 75 deletions(-)

diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 12dae77..8fd9c9f 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -2116,21 +2116,17 @@ show_hash_info(HashState *hashstate, ExplainState *es)
 		if (es->format != EXPLAIN_FORMAT_TEXT)
 		{
 			ExplainPropertyLong("Hash Buckets", hashtable->nbuckets, es);
-			ExplainPropertyLong("Original Hash Buckets",
-								hashtable->nbuckets_original, es);
 			ExplainPropertyLong("Hash Batches", hashtable->nbatch, es);
 			ExplainPropertyLong("Original Hash Batches",
 								hashtable->nbatch_original, es);
 			ExplainPropertyLong("Peak Memory Usage", spacePeakKb, es);
 		}
-		else if (hashtable->nbatch_original != hashtable->nbatch ||
-				 hashtable->nbuckets_original != hashtable->nbuckets)
+		else if (hashtable->nbatch_original != hashtable->nbatch)
 		{
 			appendStringInfoSpaces(es->str, es->indent * 2);
 			appendStringInfo(es->str,
-							 "Buckets: %d (originally %d)  Batches: %d (originally %d)  Memory Usage: %ldkB\n",
+							 "Buckets: %d Batches: %d (originally %d)  Memory Usage: %ldkB\n",
 							 hashtable->nbuckets,
-							 hashtable->nbuckets_original,
 							 hashtable->nbatch,
 							 hashtable->nbatch_original,
 							 spacePeakKb);
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 5e05ec3..7708581 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -39,7 +39,7 @@
 
 
 static void ExecHashIncreaseNumBatches(HashJoinTable hashtable);
-static void ExecHashIncreaseNumBuckets(HashJoinTable hashtable);
+static void ExecHashBuildBuckets(HashJoinTable hashtable);
 static void ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node,
 					  int mcvsToUse);
 static void ExecHashSkewTableInsert(HashJoinTable hashtable,
@@ -129,9 +129,8 @@ MultiExecHash(HashState *node)
 		}
 	}
 
-	/* resize the hash table if needed (NTUP_PER_BUCKET exceeded) */
-	if (hashtable->nbuckets != hashtable->nbuckets_optimal)
-		ExecHashIncreaseNumBuckets(hashtable);
+	/* Construct the actual hash table (using the optimal number of buckets). */
+	ExecHashBuildBuckets(hashtable);
 
 	/* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
 	hashtable->spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
@@ -283,10 +282,7 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
 	 */
 	hashtable = (HashJoinTable) palloc(sizeof(HashJoinTableData));
 	hashtable->nbuckets = nbuckets;
-	hashtable->nbuckets_original = nbuckets;
-	hashtable->nbuckets_optimal = nbuckets;
 	hashtable->log2_nbuckets = log2_nbuckets;
-	hashtable->log2_nbuckets_optimal = log2_nbuckets;
 	hashtable->buckets = NULL;
 	hashtable->keepNulls = keepNulls;
 	hashtable->skewEnabled = false;
@@ -372,18 +368,11 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
 	}
 
 	/*
-	 * Prepare context for the first-scan space allocations; allocate the
-	 * hashbucket array therein, and set each bucket "empty".
-	 */
-	MemoryContextSwitchTo(hashtable->batchCxt);
-
-	hashtable->buckets = (HashJoinTuple *)
-		palloc0(nbuckets * sizeof(HashJoinTuple));
-
-	/*
 	 * Set up for skew optimization, if possible and there's a need for more
 	 * than one batch.  (In a one-batch join, there's no point in it.)
 	 */
+	MemoryContextSwitchTo(hashtable->batchCxt);
+
 	if (nbatch > 1)
 		ExecHashBuildSkewHash(hashtable, node, num_skew_mcvs);
 
@@ -654,25 +643,11 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 	 */
 	ninmemory = nfreed = 0;
 
-	/* If know we need to resize nbuckets, we can do it while rebatching. */
-	if (hashtable->nbuckets_optimal != hashtable->nbuckets)
-	{
-		/* we never decrease the number of buckets */
-		Assert(hashtable->nbuckets_optimal > hashtable->nbuckets);
-
-		hashtable->nbuckets = hashtable->nbuckets_optimal;
-		hashtable->log2_nbuckets = hashtable->log2_nbuckets_optimal;
-
-		hashtable->buckets = repalloc(hashtable->buckets,
-								sizeof(HashJoinTuple) * hashtable->nbuckets);
-	}
-
 	/*
 	 * We will scan through the chunks directly, so that we can reset the
 	 * buckets now and not have to keep track which tuples in the buckets have
 	 * already been processed. We will free the old chunks as we go.
 	 */
-	memset(hashtable->buckets, 0, sizeof(HashJoinTuple) * hashtable->nbuckets);
 	oldchunks = hashtable->chunks;
 	hashtable->chunks = NULL;
 
@@ -704,10 +679,6 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 
 				copyTuple = (HashJoinTuple) dense_alloc(hashtable, hashTupleSize);
 				memcpy(copyTuple, hashTuple, hashTupleSize);
-
-				/* and add it back to the appropriate bucket */
-				copyTuple->next = hashtable->buckets[bucketno];
-				hashtable->buckets[bucketno] = copyTuple;
 			}
 			else
 			{
@@ -753,27 +724,20 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 }
 
 /*
- * ExecHashIncreaseNumBuckets
- *		increase the original number of buckets in order to reduce
- *		number of tuples per bucket
+ * ExecHashBuildBuckets
+ *		complete building the hash table by allocating the optimal number of
+ * 		buckets and filling them with tuples
  */
 static void
-ExecHashIncreaseNumBuckets(HashJoinTable hashtable)
+ExecHashBuildBuckets(HashJoinTable hashtable)
 {
 	HashMemoryChunk chunk;
 
-	/* do nothing if not an increase (it's called increase for a reason) */
-	if (hashtable->nbuckets >= hashtable->nbuckets_optimal)
-		return;
-
 #ifdef HJDEBUG
-	printf("Increasing nbuckets %d => %d\n",
-		   hashtable->nbuckets, hashtable->nbuckets_optimal);
+	printf("Constructing table with nbuckets %d\n", hashtable->nbuckets);
 #endif
 
-	hashtable->nbuckets = hashtable->nbuckets_optimal;
-	hashtable->log2_nbuckets = hashtable->log2_nbuckets_optimal;
-
+	Assert(hashtable->buckets == NULL);
 	Assert(hashtable->nbuckets > 1);
 	Assert(hashtable->nbuckets <= (INT_MAX / 2));
 	Assert(hashtable->nbuckets == (1 << hashtable->log2_nbuckets));
@@ -785,10 +749,7 @@ ExecHashIncreaseNumBuckets(HashJoinTable hashtable)
 	 * chunks)
 	 */
 	hashtable->buckets =
-		(HashJoinTuple *) repalloc(hashtable->buckets,
-								hashtable->nbuckets * sizeof(HashJoinTuple));
-
-	memset(hashtable->buckets, 0, hashtable->nbuckets * sizeof(HashJoinTuple));
+		(HashJoinTuple *) palloc0(hashtable->nbuckets * sizeof(HashJoinTuple));
 
 	/* scan through all tuples in all chunks to rebuild the hash table */
 	for (chunk = hashtable->chunks; chunk != NULL; chunk = chunk->next)
@@ -816,7 +777,7 @@ ExecHashIncreaseNumBuckets(HashJoinTable hashtable)
 	}
 
 #ifdef HJDEBUG
-	printf("Nbuckets increased to %d, average items per bucket %.1f\n",
+	printf("Nbuckets set to %d, average items per bucket %.1f\n",
 		   hashtable->nbuckets, batchTuples / hashtable->nbuckets);
 #endif
 }
@@ -872,9 +833,15 @@ ExecHashTableInsert(HashJoinTable hashtable,
 		 */
 		HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
 
-		/* Push it onto the front of the bucket's list */
-		hashTuple->next = hashtable->buckets[bucketno];
-		hashtable->buckets[bucketno] = hashTuple;
+		/*
+		 * We only do this if we already have buckets allocated, i.e. after
+		 * the first batch.
+		 */
+		if (hashtable->buckets != NULL)
+		{
+			hashTuple->next = hashtable->buckets[bucketno];
+			hashtable->buckets[bucketno] = hashTuple;
+		}
 
 		/*
 		 * Increase the (optimal) number of buckets if we just exceeded the
@@ -882,14 +849,14 @@ ExecHashTableInsert(HashJoinTable hashtable,
 		 * batch.
 		 */
 		if (hashtable->nbatch == 1 &&
-			ntuples > (hashtable->nbuckets_optimal * NTUP_PER_BUCKET))
+			ntuples > (hashtable->nbuckets * NTUP_PER_BUCKET))
 		{
 			/* Guard against integer overflow and alloc size overflow */
-			if (hashtable->nbuckets_optimal <= INT_MAX / 2 &&
-				hashtable->nbuckets_optimal * 2 <= MaxAllocSize / sizeof(HashJoinTuple))
+			if (hashtable->nbuckets <= INT_MAX / 2 &&
+				hashtable->nbuckets * 2 <= MaxAllocSize / sizeof(HashJoinTuple))
 			{
-				hashtable->nbuckets_optimal *= 2;
-				hashtable->log2_nbuckets_optimal += 1;
+				hashtable->nbuckets *= 2;
+				hashtable->log2_nbuckets += 1;
 			}
 		}
 
@@ -898,7 +865,7 @@ ExecHashTableInsert(HashJoinTable hashtable,
 		if (hashtable->spaceUsed > hashtable->spacePeak)
 			hashtable->spacePeak = hashtable->spaceUsed;
 		if (hashtable->spaceUsed +
-			hashtable->nbuckets_optimal * sizeof(HashJoinTuple)
+			hashtable->nbuckets * sizeof(HashJoinTuple)
 			> hashtable->spaceAllowed)
 			ExecHashIncreaseNumBatches(hashtable);
 	}
@@ -1216,7 +1183,6 @@ void
 ExecHashTableReset(HashJoinTable hashtable)
 {
 	MemoryContext oldcxt;
-	int			nbuckets = hashtable->nbuckets;
 
 	/*
 	 * Release all the hash buckets and tuples acquired in the prior pass, and
@@ -1226,8 +1192,8 @@ ExecHashTableReset(HashJoinTable hashtable)
 	oldcxt = MemoryContextSwitchTo(hashtable->batchCxt);
 
 	/* Reallocate and reinitialize the hash bucket headers. */
-	hashtable->buckets = (HashJoinTuple *)
-		palloc0(nbuckets * sizeof(HashJoinTuple));
+	hashtable->buckets =
+		(HashJoinTuple *) palloc0(hashtable->nbuckets * sizeof(HashJoinTuple));
 
 	hashtable->spaceUsed = 0;
 
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index 7a51ea6..255c506 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -128,11 +128,6 @@ typedef struct HashJoinTableData
 	int			nbuckets;		/* # buckets in the in-memory hash table */
 	int			log2_nbuckets;	/* its log2 (nbuckets must be a power of 2) */
 
-	int			nbuckets_original;		/* # buckets when starting the first
-										 * hash */
-	int			nbuckets_optimal;		/* optimal # buckets (per batch) */
-	int			log2_nbuckets_optimal;	/* log2(nbuckets_optimal) */
-
 	/* buckets[i] is head of list of tuples in i'th in-memory bucket */
 	struct HashJoinTupleData **buckets;
 	/* buckets array is per-batch storage, as are all the tuples */
-- 
2.1.0

From 8c0c25fc894de49054fb6711af96cbcf61cc2b51 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <to...@pgaddict.com>
Date: Wed, 30 Dec 2015 14:02:26 +0100
Subject: [PATCH 2/5] always start in un-batched mode

---
 src/backend/executor/nodeHash.c | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 7708581..e7d4b5f 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -290,7 +290,7 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
 	hashtable->skewBucketLen = 0;
 	hashtable->nSkewBuckets = 0;
 	hashtable->skewBucketNums = NULL;
-	hashtable->nbatch = nbatch;
+	hashtable->nbatch = 1;
 	hashtable->curbatch = 0;
 	hashtable->nbatch_original = nbatch;
 	hashtable->nbatch_outstart = nbatch;
@@ -600,7 +600,15 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 	if (oldnbatch > Min(INT_MAX / 2, MaxAllocSize / (sizeof(void *) * 2)))
 		return;
 
-	nbatch = oldnbatch * 2;
+	/*
+	 * If we're incrementing the number of batches for the first time,
+	 * let's see if we should start with nbatch_original.
+	 */
+	if ((oldnbatch == 1) && (hashtable->nbatch_original > 1))
+		nbatch = hashtable->nbatch_original;
+	else
+		nbatch = oldnbatch * 2;
+
 	Assert(nbatch > 1);
 
 #ifdef HJDEBUG
-- 
2.1.0

From b07280437d5dc3cf60b2755b2562825cd0da0ec6 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <to...@pgaddict.com>
Date: Wed, 30 Dec 2015 15:56:56 +0100
Subject: [PATCH 3/5] eliminate the MaxAllocSize limit, now buckets can be up
 to ~8GB (INT_MAX/2 items)

Note: don't forget to zero buckets allocated with AllocHuge()
---
 src/backend/executor/nodeHash.c | 26 +++++++++++++++-----------
 1 file changed, 15 insertions(+), 11 deletions(-)

diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index e7d4b5f..c53d485 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -466,22 +466,19 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 
 	/*
 	 * Set nbuckets to achieve an average bucket load of NTUP_PER_BUCKET when
-	 * memory is filled, assuming a single batch; but limit the value so that
-	 * the pointer arrays we'll try to allocate do not exceed work_mem nor
-	 * MaxAllocSize.
+	 * memory is filled, assuming a single batch.
 	 *
 	 * Note that both nbuckets and nbatch must be powers of 2 to make
 	 * ExecHashGetBucketAndBatch fast.
 	 */
 	max_pointers = (work_mem * 1024L) / sizeof(HashJoinTuple);
-	max_pointers = Min(max_pointers, MaxAllocSize / sizeof(HashJoinTuple));
+
 	/* If max_pointers isn't a power of 2, must round it down to one */
 	mppow2 = 1L << my_log2(max_pointers);
 	if (max_pointers != mppow2)
 		max_pointers = mppow2 / 2;
 
-	/* Also ensure we avoid integer overflow in nbatch and nbuckets */
-	/* (this step is redundant given the current value of MaxAllocSize) */
+	/* Ensure we avoid integer overflow in nbatch and nbuckets */
 	max_pointers = Min(max_pointers, INT_MAX / 2);
 
 	dbuckets = ceil(ntuples / NTUP_PER_BUCKET);
@@ -597,7 +594,7 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 		return;
 
 	/* safety check to avoid overflow */
-	if (oldnbatch > Min(INT_MAX / 2, MaxAllocSize / (sizeof(void *) * 2)))
+	if (oldnbatch > (INT_MAX / 2))
 		return;
 
 	/*
@@ -757,7 +754,11 @@ ExecHashBuildBuckets(HashJoinTable hashtable)
 	 * chunks)
 	 */
 	hashtable->buckets =
-		(HashJoinTuple *) palloc0(hashtable->nbuckets * sizeof(HashJoinTuple));
+		(HashJoinTuple *) MemoryContextAllocHuge(hashtable->batchCxt,
+								hashtable->nbuckets * sizeof(HashJoinTuple));
+
+	/* Don't forget to zero the buckets (AllocHuge does not do that). */
+	memset(hashtable->buckets, 0, hashtable->nbuckets * sizeof(HashJoinTuple));
 
 	/* scan through all tuples in all chunks to rebuild the hash table */
 	for (chunk = hashtable->chunks; chunk != NULL; chunk = chunk->next)
@@ -860,8 +861,7 @@ ExecHashTableInsert(HashJoinTable hashtable,
 			ntuples > (hashtable->nbuckets * NTUP_PER_BUCKET))
 		{
 			/* Guard against integer overflow and alloc size overflow */
-			if (hashtable->nbuckets <= INT_MAX / 2 &&
-				hashtable->nbuckets * 2 <= MaxAllocSize / sizeof(HashJoinTuple))
+			if (hashtable->nbuckets <= INT_MAX / 2)
 			{
 				hashtable->nbuckets *= 2;
 				hashtable->log2_nbuckets += 1;
@@ -1201,7 +1201,11 @@ ExecHashTableReset(HashJoinTable hashtable)
 
 	/* Reallocate and reinitialize the hash bucket headers. */
 	hashtable->buckets =
-		(HashJoinTuple *) palloc0(hashtable->nbuckets * sizeof(HashJoinTuple));
+		(HashJoinTuple *) MemoryContextAllocHuge(hashtable->batchCxt,
+								hashtable->nbuckets * sizeof(HashJoinTuple));
+
+	/* Don't forget to zero the buckets (AllocHuge does not do that). */
+	memset(hashtable->buckets, 0, hashtable->nbuckets * sizeof(HashJoinTuple));
 
 	hashtable->spaceUsed = 0;
 
-- 
2.1.0

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)