On 30.6.2014 23:12, Tomas Vondra wrote:
> Hi,
>
> attached is v5 of the patch. The main change is that scaling the number
> of buckets is done only once, after the initial hash table is built. The
> main advantage of this is lower cost. This also allowed some cleanup of
> unnecessary code.
>
> However, this new patch causes warning like this:
>
> WARNING: temporary file leak: File 231 still referenced
>
> I've been eyeballing the code for a while now, but I still fail to see
> where this comes from :-( Any ideas?
Meh, the patches are wrong - I hadn't realized the tight coupling
between buckets and batches in ExecHashGetBucketAndBatch:
*bucketno = hashvalue & (nbuckets - 1);
*batchno = (hashvalue >> hashtable->log2_nbuckets) & (nbatch - 1);
The previous patches worked mostly by pure luck (nbuckets was set only
while processing the first batch), but once I moved the code to the end,
it started failing. And by "worked" I mean "didn't throw an error, but
probably returned bogus results with (nbatch > 1)".
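To illustrate (a minimal standalone sketch, not patch code - the helper
name and the example hash value are made up, only the formula comes from
ExecHashGetBucketAndBatch):

#include <stdint.h>
#include <stdio.h>

/* the original formula - batchno sits right above the bucket bits */
static int
batchno_old(uint32_t hashvalue, int log2_nbuckets, int nbatch)
{
    return (int) ((hashvalue >> log2_nbuckets) & (nbatch - 1));
}

int
main(void)
{
    uint32_t hashvalue = 0x00000C00;    /* bits 10 and 11 set */

    /* batch computed while the table had 1024 buckets (log2 = 10) */
    printf("batch before resize: %d\n", batchno_old(hashvalue, 10, 4));

    /* same tuple, after nbuckets doubled to 2048 (log2 = 11) */
    printf("batch after resize:  %d\n", batchno_old(hashvalue, 11, 4));

    return 0;
}

This prints batch 3 before the resize and batch 1 after it, so a tuple
spilled to the batch 3 temp file is never visited again once nbuckets
doubles - which matches the file leaks and the bogus rowcounts.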
However, ISTM this nbuckets-nbatch coupling is not really necessary -
it's only constructed this way to derive independent batch and bucket
numbers from a single hash value. So I went and changed the code like this:
*bucketno = hashvalue & (nbuckets - 1);
*batchno = (hashvalue >> (32 - hashtable->log2_nbatch));
I.e. using the top bits for batchno, low bits for bucketno (as before).
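A quick sanity check of the two properties the new formula needs (again
a standalone sketch under the same assumptions, not patch code):

#include <assert.h>
#include <stdint.h>
#include <stdio.h>

/* the new formula - batchno = top log2(nbatch) bits of the hash value */
static int
batchno_new(uint32_t hashvalue, int log2_nbatch)
{
    return (int) (hashvalue >> (32 - log2_nbatch));
}

int
main(void)
{
    uint32_t i;

    for (i = 0; i < 100000; i++)
    {
        uint32_t hv = i * 2654435761u;      /* scatter the bits */
        int before = batchno_new(hv, 3);    /* nbatch = 8 */
        int after = batchno_new(hv, 4);     /* nbatch = 16 */

        /* doubling nbatch appends one bit at the bottom of batchno,
         * so the batch number can only stay the same or increase */
        assert(after == 2 * before || after == 2 * before + 1);
    }

    printf("batchno is stable across nbatch doubling\n");
    return 0;
}

The batch number no longer depends on log2_nbuckets at all, so the
buckets can grow freely, and doubling nbatch maps batch b to (2b) or
(2b+1), which satisfies the "remain the same or increase" requirement
from the comment above ExecHashGetBucketAndBatch.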
Hopefully I got it right this time. At least it seems to work for the
cases that failed before (no file leaks, correct rowcounts so far).
regards
Tomas
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 0d9663c..db3a953 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -1900,18 +1900,20 @@ show_hash_info(HashState *hashstate, ExplainState *es)
if (es->format != EXPLAIN_FORMAT_TEXT)
{
ExplainPropertyLong("Hash Buckets", hashtable->nbuckets, es);
+ ExplainPropertyLong("Original Hash Buckets",
+ hashtable->nbuckets_original, es);
ExplainPropertyLong("Hash Batches", hashtable->nbatch, es);
ExplainPropertyLong("Original Hash Batches",
hashtable->nbatch_original, es);
ExplainPropertyLong("Peak Memory Usage", spacePeakKb, es);
}
- else if (hashtable->nbatch_original != hashtable->nbatch)
+ else if ((hashtable->nbatch_original != hashtable->nbatch) || (hashtable->nbuckets_original != hashtable->nbuckets))
{
appendStringInfoSpaces(es->str, es->indent * 2);
appendStringInfo(es->str,
- "Buckets: %d Batches: %d (originally %d) Memory Usage: %ldkB\n",
- hashtable->nbuckets, hashtable->nbatch,
- hashtable->nbatch_original, spacePeakKb);
+ "Buckets: %d (originally %d) Batches: %d (originally %d) Memory Usage: %ldkB\n",
+ hashtable->nbuckets, hashtable->nbuckets_original,
+ hashtable->nbatch, hashtable->nbatch_original, spacePeakKb);
}
else
{
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 589b2f1..48cca62 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -37,8 +37,10 @@
#include "utils/lsyscache.h"
#include "utils/syscache.h"
+bool enable_hashjoin_bucket = true;
static void ExecHashIncreaseNumBatches(HashJoinTable hashtable);
+static void ExecHashIncreaseNumBuckets(HashJoinTable hashtable);
static void ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node,
int mcvsToUse);
static void ExecHashSkewTableInsert(HashJoinTable hashtable,
@@ -48,6 +50,33 @@ static void ExecHashSkewTableInsert(HashJoinTable hashtable,
static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable);
+/*
+ * Compute appropriate size for hashtable given the estimated size of the
+ * relation to be hashed (number of rows and average row width).
+ *
+ * This is exported so that the planner's costsize.c can use it.
+ */
+
+/* Target bucket loading (tuples per bucket) */
+#define NTUP_PER_BUCKET 10
+
+/* Multiple of NTUP_PER_BUCKET that triggers an increase of nbuckets.
+ *
+ * Once the load factor (in units of NTUP_PER_BUCKET) reaches the
+ * threshold b, we double the number of buckets, halving the load to
+ * a = b/2. To oscillate around 1.0 (i.e. around NTUP_PER_BUCKET on
+ * average), these two equations should hold:
+ *
+ * b = 2a (growth)
+ * (a + b)/2 = 1 (oscillate around NTUP_PER_BUCKET)
+ *
+ * Solving them gives a = 2/3 and b = 4/3 = 1.3333. With a higher
+ * growth factor (4*nbuckets, i.e. b = 4a) the threshold would be
+ * b = 1.6, with (b = 8a) it would be 1.7777, etc.
+ *
+ * Let's start with doubling the bucket count, i.e. 1.333. */
+#define NTUP_GROW_THRESHOLD (NTUP_PER_BUCKET * NTUP_GROW_COEFFICIENT)
+
/* ----------------------------------------------------------------
* ExecHash
*
@@ -116,6 +145,7 @@ MultiExecHash(HashState *node)
/* It's a skew tuple, so put it into that hash table */
ExecHashSkewTableInsert(hashtable, slot, hashvalue,
bucketNumber);
+ hashtable->skewTuples += 1;
}
else
{
@@ -126,6 +156,23 @@ MultiExecHash(HashState *node)
}
}
+ /* If the average number of tuples per bucket exceeds the defined
+ * threshold, increase the number of buckets to get below it. */
+ if (enable_hashjoin_bucket) {
+
+ /* consider only tuples in the non-skew buckets */
+ double nonSkewTuples = (hashtable->totalTuples - hashtable->skewTuples);
+
+ if ((nonSkewTuples / hashtable->nbatch) > (hashtable->nbuckets * NTUP_GROW_THRESHOLD)) {
+
+#ifdef HJDEBUG
+ printf("Increasing nbucket to %d (average per bucket = %.1f)\n",
+ nbuckets, (batchTuples / hashtable->nbuckets));
+#endif
+ ExecHashIncreaseNumBuckets(hashtable);
+ }
+ }
+
/* must provide our own instrumentation support */
if (node->ps.instrument)
InstrStopNode(node->ps.instrument, hashtable->totalTuples);
@@ -239,6 +286,7 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
int nbatch;
int num_skew_mcvs;
int log2_nbuckets;
+ int log2_nbatch;
int nkeys;
int i;
ListCell *ho;
@@ -263,6 +311,10 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
log2_nbuckets = my_log2(nbuckets);
Assert(nbuckets == (1 << log2_nbuckets));
+ /* nbatch must be a power of 2 */
+ log2_nbatch = my_log2(nbatch);
+ Assert(nbatch == (1 << log2_nbatch));
+
/*
* Initialize the hash table control block.
*
@@ -271,6 +323,7 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
*/
hashtable = (HashJoinTable) palloc(sizeof(HashJoinTableData));
hashtable->nbuckets = nbuckets;
+ hashtable->nbuckets_original = nbuckets;
hashtable->log2_nbuckets = log2_nbuckets;
hashtable->buckets = NULL;
hashtable->keepNulls = keepNulls;
@@ -281,10 +334,12 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
hashtable->skewBucketNums = NULL;
hashtable->nbatch = nbatch;
hashtable->curbatch = 0;
+ hashtable->log2_nbatch = log2_nbatch;
hashtable->nbatch_original = nbatch;
hashtable->nbatch_outstart = nbatch;
hashtable->growEnabled = true;
hashtable->totalTuples = 0;
+ hashtable->skewTuples = 0;
hashtable->innerBatchFile = NULL;
hashtable->outerBatchFile = NULL;
hashtable->spaceUsed = 0;
@@ -375,17 +430,6 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
return hashtable;
}
-
-/*
- * Compute appropriate size for hashtable given the estimated size of the
- * relation to be hashed (number of rows and average row width).
- *
- * This is exported so that the planner's costsize.c can use it.
- */
-
-/* Target bucket loading (tuples per bucket) */
-#define NTUP_PER_BUCKET 10
-
void
ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
int *numbuckets,
@@ -605,6 +649,7 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
MemoryContextSwitchTo(oldcxt);
hashtable->nbatch = nbatch;
+ hashtable->log2_nbatch += 1;
/*
* Scan through the existing hash table entries and dump out any that are
@@ -682,6 +727,116 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
}
/*
+ * ExecHashIncreaseNumBuckets
+ * increase the original number of buckets in order to reduce
+ * number of tuples per bucket
+ */
+static void
+ExecHashIncreaseNumBuckets(HashJoinTable hashtable)
+{
+ int i;
+ int oldnbuckets = hashtable->nbuckets;
+ HashJoinTuple *oldbuckets = hashtable->buckets;
+ MemoryContext oldcxt;
+ double batchTuples = (hashtable->totalTuples / hashtable->nbatch);
+
+ /*
+ * Determine the proper number of buckets, i.e. stop once the average
+ * per bucket gets below the threshold (1.33 * NTUP_PER_BUCKET).
+ *
+ * Also, check for overflow - this can only happen with extremely large
+ * work_mem values, because (INT_MAX/2) means ~8GB only for the buckets.
+ * With tuples, the hash table would require tens of GBs of work_mem.
+ *
+ * XXX Technically there's also a limit on the buckets fitting into
+ * work_mem (with NTUP_PER_BUCKET tuples), but it can't really be
+ * exceeded, because filling work_mem adds another batch (the number
+ * of tuples per batch drops, so more buckets aren't needed anymore).
+ *
+ * That is, something like this will be enforced implicitly:
+ *
+ * work_mem * 1024L >= (nbuckets * tupsize * NTUP_GROW_THRESHOLD)
+ *
+ * So it's enough to check only the overflow here.
+ */
+
+ /* double the number of buckets until we get below the growth threshold, or
+ * until we hit the overflow protection */
+ while ((batchTuples > (hashtable->nbuckets * NTUP_GROW_THRESHOLD))
+ && (hashtable->nbuckets <= (INT_MAX/2))) {
+ hashtable->nbuckets *= 2;
+ hashtable->log2_nbuckets += 1;
+ }
+
+ /* XXX Not sure if we should update the info about used space here.
+ * The code seems to ignore the space used for 'buckets' and we're not
+ * allocating more space for tuples (just shuffling them to the new
+ * buckets). And the amount of memory used for buckets is quite small
+ * (just an array of pointers, thus ~8kB per 1k buckets on 64-bit). */
+
+ /* XXX Should we disable growth if (nbuckets * NTUP_PER_BUCKET)
+ * reaches work_mem (or something like that)? We shouldn't really
+ * get into such position (should be handled by increasing the
+ * number of batches, which is called right before this). */
+
+ /* XXX Maybe adding info into hashjoin explain output (e.g. initial
+ * nbuckets, time spent growing the table) would be appropriate. */
+
+ Assert(hashtable->nbuckets > 1);
+ Assert(hashtable->nbuckets <= (INT_MAX/2));
+ Assert(hashtable->nbuckets == (1 << hashtable->log2_nbuckets));
+
+#ifdef HJDEBUG
+ printf("Increasing nbuckets to %d\n", hashtable->nbuckets);
+#endif
+
+ /* TODO Maybe it'd be better to resize the buckets in place (should be possible,
+ * but when I tried it I always ended up with a strange infinite loop). */
+
+ /* allocate a new bucket list (use the batch context as before) */
+ oldcxt = MemoryContextSwitchTo(hashtable->batchCxt);
+
+ hashtable->buckets = (HashJoinTuple *)
+ palloc0(hashtable->nbuckets * sizeof(HashJoinTuple));
+
+ MemoryContextSwitchTo(oldcxt);
+
+ /* walk through the old buckets, moving the tuples into the new table */
+ for (i = 0; i < oldnbuckets; i++)
+ {
+
+ HashJoinTuple tuple = oldbuckets[i];
+
+ while (tuple != NULL)
+ {
+ /* save link in case we delete */
+ HashJoinTuple nexttuple = tuple->next;
+ int bucketno;
+ int batchno;
+
+ ExecHashGetBucketAndBatch(hashtable, tuple->hashvalue,
+ &bucketno, &batchno);
+
+ /* move it to the correct bucket (in the new bucket array) */
+ tuple->next = hashtable->buckets[bucketno];
+ hashtable->buckets[bucketno] = tuple;
+
+ /* process the next tuple */
+ tuple = nexttuple;
+ }
+ }
+
+ pfree(oldbuckets);
+
+#ifdef HJDEBUG
+ printf("Nbuckets increased to %d, average items per bucket %.1f\n",
+ hashtable->nbuckets, batchTuples / hashtable->nbuckets);
+#endif
+
+}
+
+
+/*
* ExecHashTableInsert
* insert a tuple into the hash table depending on the hash value
* it may just go to a temp file for later batches
@@ -740,6 +895,7 @@ ExecHashTableInsert(HashJoinTable hashtable,
hashtable->spacePeak = hashtable->spaceUsed;
if (hashtable->spaceUsed > hashtable->spaceAllowed)
ExecHashIncreaseNumBatches(hashtable);
+
}
else
{
@@ -856,14 +1012,12 @@ ExecHashGetHashValue(HashJoinTable hashtable,
* chains), and must only cause the batch number to remain the same or
* increase. Our algorithm is
* bucketno = hashvalue MOD nbuckets
- * batchno = (hashvalue DIV nbuckets) MOD nbatch
+ * batchno = highest log2(nbatch) bits of hashvalue
* where nbuckets and nbatch are both expected to be powers of 2, so we can
* do the computations by shifting and masking. (This assumes that all hash
* functions are good about randomizing all their output bits, else we are
* likely to have very skewed bucket or batch occupancy.)
*
- * nbuckets doesn't change over the course of the join.
- *
* nbatch is always a power of 2; we increase it only by doubling it. This
* effectively adds one more bit to the top of the batchno.
*/
@@ -880,7 +1034,7 @@ ExecHashGetBucketAndBatch(HashJoinTable hashtable,
{
/* we can do MOD by masking, DIV by shifting */
*bucketno = hashvalue & (nbuckets - 1);
- *batchno = (hashvalue >> hashtable->log2_nbuckets) & (nbatch - 1);
+ *batchno = (hashvalue >> (32 - hashtable->log2_nbatch));
}
else
{
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 3a31a75..c92cc26 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -788,6 +788,15 @@ static struct config_bool ConfigureNamesBool[] =
NULL, NULL, NULL
},
{
+ {"enable_hashjoin_bucket", PGC_USERSET, QUERY_TUNING_METHOD,
+ gettext_noop("Enables dynamic increase of hash buckets."),
+ NULL
+ },
+ &enable_hashjoin_bucket,
+ true,
+ NULL, NULL, NULL
+ },
+ {
{"geqo", PGC_USERSET, QUERY_TUNING_GEQO,
gettext_noop("Enables genetic query optimization."),
gettext_noop("This algorithm attempts to do planning without "
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index 3beae40..82648ab 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -106,6 +106,7 @@ typedef struct HashSkewBucket
typedef struct HashJoinTableData
{
int nbuckets; /* # buckets in the in-memory hash table */
+ int nbuckets_original; /* # buckets when the hash table was created */
int log2_nbuckets; /* its log2 (nbuckets must be a power of 2) */
/* buckets[i] is head of list of tuples in i'th in-memory bucket */
@@ -122,6 +123,7 @@ typedef struct HashJoinTableData
int nbatch; /* number of batches */
int curbatch; /* current batch #; 0 during 1st pass */
+ int log2_nbatch; /* its log2 (nbatch must be a power of 2) */
int nbatch_original; /* nbatch when we started inner scan */
int nbatch_outstart; /* nbatch when we started outer scan */
@@ -129,6 +131,7 @@ typedef struct HashJoinTableData
bool growEnabled; /* flag to shut off nbatch increases */
double totalTuples; /* # tuples obtained from inner plan */
+ double skewTuples; /* # tuples inserted into skew buckets */
/*
* These arrays are allocated for the life of the hash join, but only if
diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h
index 75be5bd..15604cb 100644
--- a/src/include/executor/nodeHash.h
+++ b/src/include/executor/nodeHash.h
@@ -50,4 +50,6 @@ extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
int *num_skew_mcvs);
extern int ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue);
+extern bool enable_hashjoin_bucket;
+
#endif /* NODEHASH_H */
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index 75e2afb..60b8da8 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -62,6 +62,7 @@ extern bool enable_material;
extern bool enable_mergejoin;
extern bool enable_hashjoin;
extern int constraint_exclusion;
+extern bool enable_hashjoin_bucket;
extern double clamp_row_est(double nrows);
extern double index_pages_fetched(double tuples_fetched, BlockNumber pages,