On 30.6.2014 23:12, Tomas Vondra wrote:
> Hi,
> 
> attached is v5 of the patch. The main change is that scaling the number
> of buckets is done only once, after the initial hash table is built. The
> main advantage of this is lower cost. This also allowed some cleanup of
> unnecessary code.
> 
> However, this new patch causes warning like this:
> 
>     WARNING:  temporary file leak: File 231 still referenced
> 
> I've been eyeballing the code for a while now, but I still fail to see
> where this comes from :-( Any ideas?

Meh, the patches are wrong - I hadn't realized the tight coupling
between buckets and batches in ExecHashGetBucketAndBatch:

  *bucketno = hashvalue & (nbuckets - 1);
  *batchno = (hashvalue >> hashtable->log2_nbuckets) & (nbatch - 1);

The previous patches worked mostly by pure luck (nbuckets was only
changed in the first batch), but once I moved the code to the end, it
started failing. And by "worked" I mean "didn't throw an error, but
probably returned bogus results with nbatch > 1".

However, ISTM this nbuckets-nbatch coupling is not really necessary; it
is only constructed this way so that the bucket and batch numbers come
from independent bits of the hash value. So I went and changed the code
like this:

  *bucketno = hashvalue & (nbuckets - 1);
  *batchno = (hashvalue >> (32 - hashtable->log2_nbatch));

I.e. using the top bits for batchno, low bits for bucketno (as before).
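
To illustrate, here's a minimal standalone sketch (not part of the
patch; the hash value and sizes are made up, and it assumes nbatch > 1
and power-of-2 sizes, as in the formulas above). With the old formula
the batchno changes when nbuckets doubles, which violates the rule that
a tuple's batch number may only stay the same or increase; the new
formula reads only the top bits, so it's immune to nbuckets growth:

  #include <stdio.h>
  #include <stdint.h>

  /* old: the batch bits sit right above the bucket bits, so they
   * move whenever log2_nbuckets changes */
  static int old_batchno(uint32_t hash, int log2_nbuckets, int nbatch)
  {
      return (hash >> log2_nbuckets) & (nbatch - 1);
  }

  /* new: batchno comes from the top log2(nbatch) bits only */
  static int new_batchno(uint32_t hash, int log2_nbatch)
  {
      return hash >> (32 - log2_nbatch);
  }

  int main(void)
  {
      uint32_t hash = 0x12345678;

      /* old formula, nbatch = 4: growing nbuckets from 1024 to 2048
       * moves this tuple from batch 1 to batch 2 */
      printf("old, nbuckets=1024: batch %d\n", old_batchno(hash, 10, 4));
      printf("old, nbuckets=2048: batch %d\n", old_batchno(hash, 11, 4));

      /* new formula: batch 0, regardless of nbuckets */
      printf("new: batch %d\n", new_batchno(hash, 2));
      return 0;
  }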

Hopefully I got it right this time. At least it seems to be working for
the cases that failed before (no file leaks, correct rowcounts so far).
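
FWIW, with the explain.c changes below, a hash that had to grow its
bucket count now reports both values in the text format, along these
lines (illustrative numbers, not actual output):

  Buckets: 2048 (originally 1024)  Batches: 2 (originally 1)  Memory Usage: 650kB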

regards
Tomas
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 0d9663c..db3a953 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -1900,18 +1900,21 @@ show_hash_info(HashState *hashstate, ExplainState *es)
 		if (es->format != EXPLAIN_FORMAT_TEXT)
 		{
 			ExplainPropertyLong("Hash Buckets", hashtable->nbuckets, es);
+			ExplainPropertyLong("Original Hash Buckets",
+								hashtable->nbuckets_original, es);
 			ExplainPropertyLong("Hash Batches", hashtable->nbatch, es);
 			ExplainPropertyLong("Original Hash Batches",
 								hashtable->nbatch_original, es);
 			ExplainPropertyLong("Peak Memory Usage", spacePeakKb, es);
 		}
-		else if (hashtable->nbatch_original != hashtable->nbatch)
+		else if ((hashtable->nbatch_original != hashtable->nbatch) ||
+				 (hashtable->nbuckets_original != hashtable->nbuckets))
 		{
 			appendStringInfoSpaces(es->str, es->indent * 2);
 			appendStringInfo(es->str,
-			"Buckets: %d  Batches: %d (originally %d)  Memory Usage: %ldkB\n",
-							 hashtable->nbuckets, hashtable->nbatch,
-							 hashtable->nbatch_original, spacePeakKb);
+			"Buckets: %d (originally %d)  Batches: %d (originally %d)  Memory Usage: %ldkB\n",
+							 hashtable->nbuckets, hashtable->nbuckets_original,
+							 hashtable->nbatch, hashtable->nbatch_original, spacePeakKb);
 		}
 		else
 		{
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 589b2f1..48cca62 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -37,8 +37,10 @@
 #include "utils/lsyscache.h"
 #include "utils/syscache.h"
 
+bool enable_hashjoin_bucket = true;
 
 static void ExecHashIncreaseNumBatches(HashJoinTable hashtable);
+static void ExecHashIncreaseNumBuckets(HashJoinTable hashtable);
 static void ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node,
 					  int mcvsToUse);
 static void ExecHashSkewTableInsert(HashJoinTable hashtable,
@@ -48,6 +50,33 @@ static void ExecHashSkewTableInsert(HashJoinTable hashtable,
 static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable);
 
 
+/*
+ * Compute appropriate size for hashtable given the estimated size of the
+ * relation to be hashed (number of rows and average row width).
+ *
+ * This is exported so that the planner's costsize.c can use it.
+ */
+
+/* Target bucket loading (tuples per bucket) */
+#define NTUP_PER_BUCKET			10
+
+/* Multiple of NTUP_PER_BUCKET that triggers an increase of nbuckets.
+ *
+ * Once we reach the threshold we double the number of buckets, and we
+ * want the load factor to oscillate around 1.0 (i.e. NTUP_PER_BUCKET
+ * tuples per bucket on average), so these two equations should hold:
+ *
+ *   b = 2a         (growth by doubling)
+ *   (a + b)/2 = 1  (oscillate around NTUP_PER_BUCKET)
+ *
+ * which gives b = 4/3 = 1.3333 (and a = b/2). For a higher threshold,
+ * we could grow nbuckets to (4 * nbuckets), i.e. b = 4a, which gives
+ * b = 1.6; or b = 8a, giving b = 1.7777, etc.
+ *
+ * Let's start with doubling the bucket count, i.e. 1.333. */
+#define NTUP_GROW_COEFFICIENT	1.333
+#define NTUP_GROW_THRESHOLD		(NTUP_PER_BUCKET * NTUP_GROW_COEFFICIENT)
+
 /* ----------------------------------------------------------------
  *		ExecHash
  *
@@ -116,6 +145,7 @@ MultiExecHash(HashState *node)
 				/* It's a skew tuple, so put it into that hash table */
 				ExecHashSkewTableInsert(hashtable, slot, hashvalue,
 										bucketNumber);
+				hashtable->skewTuples += 1;
 			}
 			else
 			{
@@ -126,6 +156,24 @@
 		}
 	}
 
+	/* If the average number of tuples per bucket exceeds the defined
+	 * threshold, increase the number of buckets to get below it. */
+	if (enable_hashjoin_bucket) {
+
+		/* consider only tuples in the non-skew buckets */
+		double nonSkewTuples = (hashtable->totalTuples - hashtable->skewTuples);
+
+		if ((nonSkewTuples / hashtable->nbatch) >
+			(hashtable->nbuckets * NTUP_GROW_THRESHOLD)) {
+
+#ifdef HJDEBUG
+			printf("Increasing nbucket to %d (average per bucket = %.1f)\n",
+				   nbuckets,  (batchTuples / hashtable->nbuckets));
+#endif
+			ExecHashIncreaseNumBuckets(hashtable);
+		}
+	}
+
 	/* must provide our own instrumentation support */
 	if (node->ps.instrument)
 		InstrStopNode(node->ps.instrument, hashtable->totalTuples);
@@ -239,6 +286,7 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
 	int			nbatch;
 	int			num_skew_mcvs;
 	int			log2_nbuckets;
+	int			log2_nbatch;
 	int			nkeys;
 	int			i;
 	ListCell   *ho;
@@ -263,6 +311,10 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
 	log2_nbuckets = my_log2(nbuckets);
 	Assert(nbuckets == (1 << log2_nbuckets));
 
+	/* nbatch must be a power of 2 */
+	log2_nbatch = my_log2(nbatch);
+	Assert(nbatch == (1 << log2_nbatch));
+
 	/*
 	 * Initialize the hash table control block.
 	 *
@@ -271,6 +323,7 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
 	 */
 	hashtable = (HashJoinTable) palloc(sizeof(HashJoinTableData));
 	hashtable->nbuckets = nbuckets;
+	hashtable->nbuckets_original = nbuckets;
 	hashtable->log2_nbuckets = log2_nbuckets;
 	hashtable->buckets = NULL;
 	hashtable->keepNulls = keepNulls;
@@ -281,10 +334,12 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
 	hashtable->skewBucketNums = NULL;
 	hashtable->nbatch = nbatch;
 	hashtable->curbatch = 0;
+	hashtable->log2_nbatch = log2_nbatch;
 	hashtable->nbatch_original = nbatch;
 	hashtable->nbatch_outstart = nbatch;
 	hashtable->growEnabled = true;
 	hashtable->totalTuples = 0;
+	hashtable->skewTuples = 0;
 	hashtable->innerBatchFile = NULL;
 	hashtable->outerBatchFile = NULL;
 	hashtable->spaceUsed = 0;
@@ -375,17 +430,6 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
 	return hashtable;
 }
 
-
-/*
- * Compute appropriate size for hashtable given the estimated size of the
- * relation to be hashed (number of rows and average row width).
- *
- * This is exported so that the planner's costsize.c can use it.
- */
-
-/* Target bucket loading (tuples per bucket) */
-#define NTUP_PER_BUCKET			10
-
 void
 ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 						int *numbuckets,
@@ -605,6 +649,7 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 	MemoryContextSwitchTo(oldcxt);
 
 	hashtable->nbatch = nbatch;
+	hashtable->log2_nbatch += 1;
 
 	/*
 	 * Scan through the existing hash table entries and dump out any that are
@@ -682,6 +727,116 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 }
 
 /*
+ * ExecHashIncreaseNumBuckets
+ *		increase the original number of buckets in order to reduce
+ *		number of tuples per bucket
+ */
+static void
+ExecHashIncreaseNumBuckets(HashJoinTable hashtable)
+{
+	int			i;
+	int			oldnbuckets = hashtable->nbuckets;
+	HashJoinTuple  *oldbuckets = hashtable->buckets;
+	MemoryContext   oldcxt;
+	double		batchTuples = (hashtable->totalTuples / hashtable->nbatch);
+
+	/*
+	 * Determine the proper number of buckets, i.e. stop once the average
+	 * per bucket gets below the threshold (1.33 * NTUP_PER_BUCKET).
+	 * 
+	 * Also, check for overflow - this can only happen with extremely large
+	 * work_mem values, because (INT_MAX/2) means ~8GB only for the buckets.
+	 * With tuples, the hash table would require tens of GBs of work_mem.
+	 * 
+	 * XXX Technically there's also a limit on buckets fitting into work_mem
+	 * (with NTUP_PER_BUCKET tuples each), but this can't really be exceeded,
+	 * because when work_mem fills up another batch is added, so the number
+	 * of tuples per batch drops and more buckets aren't needed anymore.
+	 * 
+	 * That is, something like this will be enforced implicitly:
+	 * 
+	 *    work_mem * 1024L >= (nbuckets * tupsize * NTUP_GROW_THRESHOLD)
+	 * 
+	 * So it's enough to check only the overflow here.
+	 */
+
+	/* double the number of buckets until we get below the growth threshold, or
+	 * until we hit the overflow protection */
+	while ((batchTuples > (hashtable->nbuckets * NTUP_GROW_THRESHOLD))
+			&& (hashtable->nbuckets <= (INT_MAX/2))) {
+		hashtable->nbuckets *= 2;
+		hashtable->log2_nbuckets += 1;
+	}
+
+	/* XXX Not sure if we should update the info about used space here.
+	 * The code seems to ignore the space used for 'buckets' and we're not
+	 * allocating more space for tuples (just shuffling them to the new
+	 * buckets). And the amount of memory used for buckets is quite small
+	 * (just an array of pointers, thus ~8kB per 1k buckets on 64-bit). */
+
+	/* XXX Should we disable growth if (nbuckets * NTUP_PER_BUCKET)
+	 * reaches work_mem (or something like that)? We shouldn't really
+	 * get into such position (should be handled by increasing the
+	 * number of batches, which is called right before this). */
+
+	/* XXX Maybe adding info into hashjoin explain output (e.g. initial
+	 * nbuckets, time spent growing the table) would be appropriate. */
+
+	Assert(hashtable->nbuckets > 1);
+	Assert(hashtable->nbuckets <= (INT_MAX/2));
+	Assert(hashtable->nbuckets == (1 << hashtable->log2_nbuckets));
+
+#ifdef HJDEBUG
+	printf("Increasing nbuckets to %d\n", hashtable->nbuckets);
+#endif
+
+	/* TODO Maybe it'd be better to resize the buckets in place (should be possible,
+	 * but when I tried it I always ended up with a strange infinite loop). */
+
+	/* allocate a new bucket list (use the batch context as before) */
+	oldcxt = MemoryContextSwitchTo(hashtable->batchCxt);
+
+	hashtable->buckets = (HashJoinTuple *)
+		palloc0(hashtable->nbuckets * sizeof(HashJoinTuple));
+
+	MemoryContextSwitchTo(oldcxt);
+
+	/* walk through the old buckets, moving the tuples to the new ones */
+	for (i = 0; i < oldnbuckets; i++)
+	{
+
+		HashJoinTuple tuple = oldbuckets[i];
+
+		while (tuple != NULL)
+		{
+			/* save the next-pointer, since we're about to overwrite it */
+			HashJoinTuple nexttuple = tuple->next;
+			int			bucketno;
+			int			batchno;
+
+			ExecHashGetBucketAndBatch(hashtable, tuple->hashvalue,
+									  &bucketno, &batchno);
+
+			/* move it to the correct bucket (in the new bucket array) */
+			tuple->next = hashtable->buckets[bucketno];
+			hashtable->buckets[bucketno] = tuple;
+
+			/* process the next tuple */
+			tuple = nexttuple;
+		}
+	}
+
+	pfree(oldbuckets);
+
+#ifdef HJDEBUG
+	printf("Nbuckets increased to %d, average items per bucket %.1f\n",
+		   hashtable->nbuckets, batchTuples / hashtable->nbuckets);
+#endif
+	
+}
+
+
+/*
  * ExecHashTableInsert
  *		insert a tuple into the hash table depending on the hash value
  *		it may just go to a temp file for later batches
@@ -856,14 +1012,12 @@ ExecHashGetHashValue(HashJoinTable hashtable,
  * chains), and must only cause the batch number to remain the same or
  * increase.  Our algorithm is
  *		bucketno = hashvalue MOD nbuckets
- *		batchno = (hashvalue DIV nbuckets) MOD nbatch
+ *		batchno = top log2(nbatch) bits of the hash value
  * where nbuckets and nbatch are both expected to be powers of 2, so we can
  * do the computations by shifting and masking.  (This assumes that all hash
  * functions are good about randomizing all their output bits, else we are
  * likely to have very skewed bucket or batch occupancy.)
  *
- * nbuckets doesn't change over the course of the join.
- *
  * nbatch is always a power of 2; we increase it only by doubling it.  This
  * effectively adds one more bit to the top of the batchno.
  */
@@ -880,7 +1034,7 @@ ExecHashGetBucketAndBatch(HashJoinTable hashtable,
 	{
 		/* we can do MOD by masking, DIV by shifting */
 		*bucketno = hashvalue & (nbuckets - 1);
-		*batchno = (hashvalue >> hashtable->log2_nbuckets) & (nbatch - 1);
+		*batchno = (hashvalue >> (32 - hashtable->log2_nbatch));
 	}
 	else
 	{
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 3a31a75..c92cc26 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -788,6 +788,15 @@ static struct config_bool ConfigureNamesBool[] =
 		NULL, NULL, NULL
 	},
 	{
+		{"enable_hashjoin_bucket", PGC_USERSET, QUERY_TUNING_METHOD,
+			gettext_noop("Enables dynamic increase of hash buckets."),
+			NULL
+		},
+		&enable_hashjoin_bucket,
+		true,
+		NULL, NULL, NULL
+	},
+	{
 		{"geqo", PGC_USERSET, QUERY_TUNING_GEQO,
 			gettext_noop("Enables genetic query optimization."),
 			gettext_noop("This algorithm attempts to do planning without "
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index 3beae40..82648ab 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -106,6 +106,7 @@ typedef struct HashSkewBucket
 typedef struct HashJoinTableData
 {
 	int			nbuckets;		/* # buckets in the in-memory hash table */
+	int			nbuckets_original;	/* # buckets when the hash table was built */
 	int			log2_nbuckets;	/* its log2 (nbuckets must be a power of 2) */
 
 	/* buckets[i] is head of list of tuples in i'th in-memory bucket */
@@ -122,6 +123,7 @@ typedef struct HashJoinTableData
 
 	int			nbatch;			/* number of batches */
 	int			curbatch;		/* current batch #; 0 during 1st pass */
+	int			log2_nbatch;	/* its log2 (nbatch must be a power of 2) */
 
 	int			nbatch_original;	/* nbatch when we started inner scan */
 	int			nbatch_outstart;	/* nbatch when we started outer scan */
@@ -129,6 +131,7 @@ typedef struct HashJoinTableData
 	bool		growEnabled;	/* flag to shut off nbatch increases */
 
 	double		totalTuples;	/* # tuples obtained from inner plan */
+	double		skewTuples;		/* # tuples inserted into skew buckets */
 
 	/*
 	 * These arrays are allocated for the life of the hash join, but only if
diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h
index 75be5bd..15604cb 100644
--- a/src/include/executor/nodeHash.h
+++ b/src/include/executor/nodeHash.h
@@ -50,4 +50,6 @@ extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 						int *num_skew_mcvs);
 extern int	ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue);
 
+extern bool enable_hashjoin_bucket;
+
 #endif   /* NODEHASH_H */
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index 75e2afb..60b8da8 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -62,6 +62,7 @@ extern bool enable_material;
 extern bool enable_mergejoin;
 extern bool enable_hashjoin;
 extern int	constraint_exclusion;
+extern bool enable_hashjoin_bucket;
 
 extern double clamp_row_est(double nrows);
 extern double index_pages_fetched(double tuples_fetched, BlockNumber pages,