On Sun, 2014-08-10 at 14:26 -0700, Jeff Davis wrote:
> This patch requires the Memory Accounting patch, or something similar
> to track memory usage.
>
> The attached patch enables hashagg to spill to disk, which means that
> hashagg will contain itself to work_mem even if the planner makes a
> bad misestimate of the cardinality.
New patch attached. All open items are complete, though the patch may
have a few rough edges.

Summary of changes:

 * rebased on top of latest memory accounting patch
   http://www.postgresql.org/message-id/1417497257.5584.5.camel@jeff-desktop
 * added a flag to hash_create to prevent it from creating an extra
   level of memory context
   - without this, the memory accounting would have a measurable impact
     on performance
 * cost model for the disk usage
 * intelligently choose the number of partitions for each pass of the
   data
 * explain support
 * in build_hash_table(), be more intelligent about the value of
   nbuckets to pass to BuildTupleHashTable()
   - BuildTupleHashTable tries to choose a value to keep the table in
     work_mem, but it isn't very accurate.
 * some very rudimentary testing (sanity checks, really) shows good
   results

Summary of previous discussion (my summary; I may have missed some
points):

Tom Lane requested that the patch also handle the case where transition
values grow (e.g. array_agg) beyond work_mem. I feel this patch provides
a lot of benefit as it is, and trying to handle that case would be a lot
more work (we need a way to write the transition values out to disk at a
minimum, and perhaps combine them with other transition values). I also
don't think my patch would interfere with a fix there in the future.

Tomas Vondra suggested an alternative design that more closely resembles
HashJoin: instead of filling up the hash table and then spilling any new
groups, the idea would be to split the current data into two partitions,
keep one in the hash table, and spill the other (see
ExecHashIncreaseNumBatches()). This has the advantage that it's very
fast to identify whether the tuple is part of the in-memory batch or
not; and we can avoid even looking in the memory hashtable if not.
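To make the "very fast to identify" point concrete, here is a minimal
sketch of a batch membership test. It is a simplified model, not
PostgreSQL's actual HashJoin code: the helper names batch_for_hash() and
stays_in_memory() are hypothetical, and it assumes the batch number is
just the low bits of the hash value with batch 0 kept in memory.

```c
#include <assert.h>
#include <stdint.h>

/*
 * Hypothetical, simplified model of batch splitting (an assumption for
 * illustration, not the HashJoin implementation): a tuple belongs to
 * batch (hashvalue & (nbatch - 1)). nbatch must be a power of two.
 */
static uint32_t
batch_for_hash(uint32_t hashvalue, uint32_t nbatch)
{
    /* power-of-two check, so the mask below is valid */
    assert(nbatch != 0 && (nbatch & (nbatch - 1)) == 0);
    return hashvalue & (nbatch - 1);
}

/*
 * Returns 1 if the tuple belongs to the in-memory batch (batch 0 in this
 * model), 0 if it would be written to a spill file. No hash table lookup
 * is needed to decide.
 */
static int
stays_in_memory(uint32_t hashvalue, uint32_t nbatch)
{
    return batch_for_hash(hashvalue, nbatch) == 0;
}
```

In this model, doubling nbatch from 2 to 4 moves a tuple with hash 0x12
out of the in-memory batch, no matter how many tuples of that group have
already been seen.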
The batch-splitting approach has a major downside, however: you are
likely to evict a skew value from the in-memory batch, which will result
in all subsequent tuples with that skew value going to disk. My approach
never evicts from the in-memory table until we actually finalize the
groups, so the skew values are likely to be completely processed in the
first pass.

So, the attached patch implements my original approach, which I still
feel is the best solution.

Regards,
	Jeff Davis
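For readers skimming the attached diff, the partition choice made in
save_tuple() can be sketched on its own roughly as follows. This is a
standalone approximation of that logic, not the patch code: the names
choose_partition_bits(), partition_for_hash() and ceil_log2() are
hypothetical, ceil_log2() stands in for my_log2(), and work_mem is passed
in bytes rather than read from the GUC.

```c
#include <assert.h>
#include <stdint.h>

#define MIN_PARTITIONS 1
#define MAX_PARTITIONS 256

/* ceil(log2(n)) for n >= 1; hypothetical stand-in for my_log2(). */
static int
ceil_log2(long n)
{
    int  bits = 0;
    long limit = 1;

    assert(n >= 1);
    while (limit < n)
    {
        bits++;
        limit <<= 1;
    }
    return bits;
}

/*
 * Choose how many hash bits to use for the next level of partitioning:
 * aim for partitions that fit in work_mem with 1.5x slop, clamp to the
 * allowed range, round up to a power of two, and never consume more
 * than the 32 bits available in the hash value.
 */
static int
choose_partition_bits(double group_size, double input_groups,
                      double work_mem_bytes, int input_bits)
{
    double want = (1.5 * group_size * input_groups) / work_mem_bytes;
    long   npartitions;
    int    bits;

    if (want > MAX_PARTITIONS)
        npartitions = MAX_PARTITIONS;
    else if (want < MIN_PARTITIONS)
        npartitions = MIN_PARTITIONS;
    else
    {
        /* manual ceil(), safe because want is within [1, 256] here */
        npartitions = (long) want;
        if ((double) npartitions < want)
            npartitions++;
    }

    bits = ceil_log2(npartitions);
    if (bits + input_bits >= 32)
        bits = 32 - input_bits;
    return bits;
}

/*
 * Route a tuple to an output partition: skip the bits already consumed
 * by earlier passes, then take the next partition_bits high-order bits.
 */
static uint32_t
partition_for_hash(uint32_t hashvalue, int input_bits, int partition_bits)
{
    if (partition_bits == 0)
        return 0;
    return (hashvalue << input_bits) >> (32 - partition_bits);
}
```

With 100-byte groups, one million estimated groups and 4MB of work_mem,
the sketch asks for 36 partitions and rounds up to 64 (6 bits). Each
recursive pass consumes a fresh slice of the hash value, so tuples that
collided into one partition are spread out again at the next level.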
*** a/doc/src/sgml/config.sgml --- b/doc/src/sgml/config.sgml *************** *** 3017,3022 **** include_dir 'conf.d' --- 3017,3037 ---- </listitem> </varlistentry> + <varlistentry id="guc-enable-hashagg-disk" xreflabel="enable_hashagg_disk"> + <term><varname>enable_hashagg_disk</varname> (<type>boolean</type>) + <indexterm> + <primary><varname>enable_hashagg_disk</> configuration parameter</primary> + </indexterm> + </term> + <listitem> + <para> + Enables or disables the query planner's use of hashed aggregation plan + types when the planner expects the hash table size to exceed + <varname>work_mem</varname>. The default is <literal>on</>. + </para> + </listitem> + </varlistentry> + <varlistentry id="guc-enable-hashjoin" xreflabel="enable_hashjoin"> <term><varname>enable_hashjoin</varname> (<type>boolean</type>) <indexterm> *** a/src/backend/commands/explain.c --- b/src/backend/commands/explain.c *************** *** 86,91 **** static void show_sort_group_keys(PlanState *planstate, const char *qlabel, --- 86,92 ---- List *ancestors, ExplainState *es); static void show_sort_info(SortState *sortstate, ExplainState *es); static void show_hash_info(HashState *hashstate, ExplainState *es); + static void show_hashagg_info(AggState *hashstate, ExplainState *es); static void show_tidbitmap_info(BitmapHeapScanState *planstate, ExplainState *es); static void show_instrumentation_count(const char *qlabel, int which, *************** *** 1423,1428 **** ExplainNode(PlanState *planstate, List *ancestors, --- 1424,1430 ---- case T_Agg: show_agg_keys((AggState *) planstate, ancestors, es); show_upper_qual(plan->qual, "Filter", planstate, ancestors, es); + show_hashagg_info((AggState *) planstate, es); if (plan->qual) show_instrumentation_count("Rows Removed by Filter", 1, planstate, es); *************** *** 1913,1918 **** show_sort_info(SortState *sortstate, ExplainState *es) --- 1915,1956 ---- } /* + * Show information on hash aggregate buckets and batches + */ + static void + 
show_hashagg_info(AggState *aggstate, ExplainState *es) + { + Agg *agg = (Agg *)aggstate->ss.ps.plan; + + Assert(IsA(aggstate, AggState)); + + if (agg->aggstrategy != AGG_HASHED) + return; + + if (!aggstate->hash_init_state) + { + long memPeakKb = (aggstate->hash_mem_peak + 1023) / 1024; + long diskKb = (aggstate->hash_disk + 1023) / 1024; + + if (es->format == EXPLAIN_FORMAT_TEXT) + { + appendStringInfoSpaces(es->str, es->indent * 2); + appendStringInfo( + es->str, + "Batches: %d Memory Usage: %ldkB Disk Usage:%ldkB\n", + aggstate->hash_num_batches, memPeakKb, diskKb); + } + else + { + ExplainPropertyLong("HashAgg Batches", + aggstate->hash_num_batches, es); + ExplainPropertyLong("Peak Memory Usage", memPeakKb, es); + ExplainPropertyLong("Disk Usage", diskKb, es); + } + } + } + + /* * Show information on hash buckets/batches. */ static void *** a/src/backend/executor/execGrouping.c --- b/src/backend/executor/execGrouping.c *************** *** 310,316 **** BuildTupleHashTable(int numCols, AttrNumber *keyColIdx, hash_ctl.hcxt = tablecxt; hashtable->hashtab = hash_create("TupleHashTable", nbuckets, &hash_ctl, ! HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_CONTEXT); return hashtable; } --- 310,317 ---- hash_ctl.hcxt = tablecxt; hashtable->hashtab = hash_create("TupleHashTable", nbuckets, &hash_ctl, ! HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | ! HASH_CONTEXT | HASH_NOCHILDCXT); return hashtable; } *************** *** 331,336 **** TupleHashEntry --- 332,386 ---- LookupTupleHashEntry(TupleHashTable hashtable, TupleTableSlot *slot, bool *isnew) { + uint32 hashvalue; + + hashvalue = TupleHashEntryHash(hashtable, slot); + return LookupTupleHashEntryHash(hashtable, slot, hashvalue, isnew); + } + + /* + * TupleHashEntryHash + * + * Calculate the hash value of the tuple. 
+ */ + uint32 + TupleHashEntryHash(TupleHashTable hashtable, TupleTableSlot *slot) + { + TupleHashEntryData dummy; + TupleHashTable saveCurHT; + uint32 hashvalue; + + /* + * Set up data needed by hash function. + * + * We save and restore CurTupleHashTable just in case someone manages to + * invoke this code re-entrantly. + */ + hashtable->inputslot = slot; + hashtable->in_hash_funcs = hashtable->tab_hash_funcs; + hashtable->cur_eq_funcs = hashtable->tab_eq_funcs; + + saveCurHT = CurTupleHashTable; + CurTupleHashTable = hashtable; + + dummy.firstTuple = NULL; /* flag to reference inputslot */ + hashvalue = TupleHashTableHash(&dummy, sizeof(TupleHashEntryData)); + + CurTupleHashTable = saveCurHT; + + return hashvalue; + } + + /* + * LookupTupleHashEntryHash + * + * Like LookupTupleHashEntry, but allows the caller to specify the tuple's + * hash value, to avoid recalculating it. + */ + TupleHashEntry + LookupTupleHashEntryHash(TupleHashTable hashtable, TupleTableSlot *slot, + uint32 hashvalue, bool *isnew) + { TupleHashEntry entry; MemoryContext oldContext; TupleHashTable saveCurHT; *************** *** 371,380 **** LookupTupleHashEntry(TupleHashTable hashtable, TupleTableSlot *slot, /* Search the hash table */ dummy.firstTuple = NULL; /* flag to reference inputslot */ ! entry = (TupleHashEntry) hash_search(hashtable->hashtab, ! &dummy, ! isnew ? HASH_ENTER : HASH_FIND, ! &found); if (isnew) { --- 421,429 ---- /* Search the hash table */ dummy.firstTuple = NULL; /* flag to reference inputslot */ ! entry = (TupleHashEntry) hash_search_with_hash_value( ! hashtable->hashtab, &dummy, hashvalue, isnew ? HASH_ENTER : HASH_FIND, ! 
&found); if (isnew) { *** a/src/backend/executor/nodeAgg.c --- b/src/backend/executor/nodeAgg.c *************** *** 96,101 **** --- 96,103 ---- #include "postgres.h" + #include <math.h> + #include "access/htup_details.h" #include "catalog/objectaccess.h" #include "catalog/pg_aggregate.h" *************** *** 108,121 **** --- 110,127 ---- #include "optimizer/tlist.h" #include "parser/parse_agg.h" #include "parser/parse_coerce.h" + #include "storage/buffile.h" #include "utils/acl.h" #include "utils/builtins.h" + #include "utils/dynahash.h" #include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/syscache.h" #include "utils/tuplesort.h" #include "utils/datum.h" + #define HASH_DISK_MIN_PARTITIONS 1 + #define HASH_DISK_MAX_PARTITIONS 256 /* * AggStatePerAggData - per-aggregate working state for the Agg scan *************** *** 301,306 **** typedef struct AggHashEntryData --- 307,323 ---- AggStatePerGroupData pergroup[1]; /* VARIABLE LENGTH ARRAY */ } AggHashEntryData; /* VARIABLE LENGTH STRUCT */ + typedef struct HashWork + { + BufFile *input_file; /* input partition, NULL for outer plan */ + int input_bits; /* number of bits for input partition mask */ + int64 input_groups; /* estimated number of input groups */ + + int n_output_partitions; /* number of output partitions */ + BufFile **output_partitions; /* output partition files */ + int64 *output_ntuples; /* number of tuples in each partition */ + int output_bits; /* log2(n_output_partitions) + input_bits */ + } HashWork; static void initialize_aggregates(AggState *aggstate, AggStatePerAgg peragg, *************** *** 321,331 **** static void finalize_aggregate(AggState *aggstate, Datum *resultVal, bool *resultIsNull); static Bitmapset *find_unaggregated_cols(AggState *aggstate); static bool find_unaggregated_cols_walker(Node *node, Bitmapset **colnos); ! static void build_hash_table(AggState *aggstate); ! static AggHashEntry lookup_hash_entry(AggState *aggstate, ! 
TupleTableSlot *inputslot); static TupleTableSlot *agg_retrieve_direct(AggState *aggstate); ! static void agg_fill_hash_table(AggState *aggstate); static TupleTableSlot *agg_retrieve_hash_table(AggState *aggstate); static Datum GetAggInitVal(Datum textInitVal, Oid transtype); --- 338,352 ---- Datum *resultVal, bool *resultIsNull); static Bitmapset *find_unaggregated_cols(AggState *aggstate); static bool find_unaggregated_cols_walker(Node *node, Bitmapset **colnos); ! static void build_hash_table(AggState *aggstate, long nbuckets); ! static AggHashEntry lookup_hash_entry(AggState *aggstate, HashWork *work, ! TupleTableSlot *inputslot, uint32 hashvalue); ! static HashWork *hash_work(BufFile *input_file, int64 input_groups, ! int input_bits); ! static void save_tuple(AggState *aggstate, HashWork *work, ! TupleTableSlot *slot, uint32 hashvalue); static TupleTableSlot *agg_retrieve_direct(AggState *aggstate); ! static bool agg_fill_hash_table(AggState *aggstate); static TupleTableSlot *agg_retrieve_hash_table(AggState *aggstate); static Datum GetAggInitVal(Datum textInitVal, Oid transtype); *************** *** 923,942 **** find_unaggregated_cols_walker(Node *node, Bitmapset **colnos) } /* * Initialize the hash table to empty. * * The hash table always lives in the aggcontext memory context. */ static void ! build_hash_table(AggState *aggstate) { Agg *node = (Agg *) aggstate->ss.ps.plan; MemoryContext tmpmem = aggstate->tmpcontext->ecxt_per_tuple_memory; Size entrysize; Assert(node->aggstrategy == AGG_HASHED); Assert(node->numGroups > 0); entrysize = sizeof(AggHashEntryData) + (aggstate->numaggs - 1) * sizeof(AggStatePerGroupData); --- 944,989 ---- } /* + * Estimate all memory used by a group in the hash table. 
+ */ + Size + hash_group_size(int numAggs, int inputWidth, Size transitionSpace) + { + Size size; + + /* tuple overhead */ + size = MAXALIGN(sizeof(MinimalTupleData)); + /* group key */ + size += MAXALIGN(inputWidth); + /* hash table overhead */ + size += hash_agg_entry_size(numAggs); + /* by-ref transition space */ + size += transitionSpace; + + return size; + } + + /* * Initialize the hash table to empty. * * The hash table always lives in the aggcontext memory context. */ static void ! build_hash_table(AggState *aggstate, long nbuckets) { Agg *node = (Agg *) aggstate->ss.ps.plan; MemoryContext tmpmem = aggstate->tmpcontext->ecxt_per_tuple_memory; Size entrysize; + Size hash_group_mem = hash_group_size(aggstate->numaggs, + node->plan_width, + node->transitionSpace); Assert(node->aggstrategy == AGG_HASHED); Assert(node->numGroups > 0); + /* don't exceed work_mem */ + nbuckets = Min(nbuckets, (long) ((work_mem * 1024L) / hash_group_mem)); + entrysize = sizeof(AggHashEntryData) + (aggstate->numaggs - 1) * sizeof(AggStatePerGroupData); *************** *** 944,953 **** build_hash_table(AggState *aggstate) node->grpColIdx, aggstate->eqfunctions, aggstate->hashfunctions, ! node->numGroups, entrysize, ! aggstate->aggcontext, tmpmem); } /* --- 991,1006 ---- node->grpColIdx, aggstate->eqfunctions, aggstate->hashfunctions, ! nbuckets, entrysize, ! aggstate->hashcontext, tmpmem); + + aggstate->hash_mem_min = MemoryContextMemAllocated( + aggstate->hashcontext, true); + + if (aggstate->hash_mem_min > aggstate->hash_mem_peak) + aggstate->hash_mem_peak = aggstate->hash_mem_min; } /* *************** *** 1024,1035 **** hash_agg_entry_size(int numAggs) * When called, CurrentMemoryContext should be the per-query context. */ static AggHashEntry ! lookup_hash_entry(AggState *aggstate, TupleTableSlot *inputslot) { TupleTableSlot *hashslot = aggstate->hashslot; ListCell *l; AggHashEntry entry; ! 
bool isnew; /* if first time through, initialize hashslot by cloning input slot */ if (hashslot->tts_tupleDescriptor == NULL) --- 1077,1091 ---- * When called, CurrentMemoryContext should be the per-query context. */ static AggHashEntry ! lookup_hash_entry(AggState *aggstate, HashWork *work, ! TupleTableSlot *inputslot, uint32 hashvalue) { TupleTableSlot *hashslot = aggstate->hashslot; ListCell *l; AggHashEntry entry; ! int64 hash_mem; ! bool isnew = false; ! bool *p_isnew; /* if first time through, initialize hashslot by cloning input slot */ if (hashslot->tts_tupleDescriptor == NULL) *************** *** 1049,1058 **** lookup_hash_entry(AggState *aggstate, TupleTableSlot *inputslot) hashslot->tts_isnull[varNumber] = inputslot->tts_isnull[varNumber]; } /* find or create the hashtable entry using the filtered tuple */ ! entry = (AggHashEntry) LookupTupleHashEntry(aggstate->hashtable, ! hashslot, ! &isnew); if (isnew) { --- 1105,1124 ---- hashslot->tts_isnull[varNumber] = inputslot->tts_isnull[varNumber]; } + hash_mem = MemoryContextMemAllocated(aggstate->hashcontext, true); + if (hash_mem > aggstate->hash_mem_peak) + aggstate->hash_mem_peak = hash_mem; + + if (hash_mem <= aggstate->hash_mem_min || + hash_mem < work_mem * 1024L) + p_isnew = &isnew; + else + p_isnew = NULL; + /* find or create the hashtable entry using the filtered tuple */ ! entry = (AggHashEntry) LookupTupleHashEntryHash(aggstate->hashtable, ! hashslot, hashvalue, ! p_isnew); if (isnew) { *************** *** 1060,1068 **** lookup_hash_entry(AggState *aggstate, TupleTableSlot *inputslot) --- 1126,1291 ---- initialize_aggregates(aggstate, aggstate->peragg, entry->pergroup); } + if (entry == NULL) + save_tuple(aggstate, work, inputslot, hashvalue); + return entry; } + + /* + * hash_work + * + * Construct a HashWork item, which represents one iteration of HashAgg to be + * done. Should be called in the aggregate's memory context. 
+ */ + static HashWork * + hash_work(BufFile *input_file, int64 input_groups, int input_bits) + { + HashWork *work = palloc(sizeof(HashWork)); + + work->input_file = input_file; + work->input_bits = input_bits; + work->input_groups = input_groups; + + /* + * Will be set only if we run out of memory and need to partition an + * additional level. + */ + work->n_output_partitions = 0; + work->output_partitions = NULL; + work->output_ntuples = NULL; + work->output_bits = 0; + + return work; + } + + /* + * save_tuple + * + * Not enough memory to add tuple as new entry in hash table. Save for later + * in the appropriate partition. + */ + static void + save_tuple(AggState *aggstate, HashWork *work, TupleTableSlot *slot, + uint32 hashvalue) + { + int partition; + MinimalTuple tuple; + BufFile *file; + int written; + + if (work->output_partitions == NULL) + { + Agg *agg = (Agg *) aggstate->ss.ps.plan; + Size group_size = hash_group_size(aggstate->numaggs, + agg->plan_width, + agg->transitionSpace); + double total_size = group_size * work->input_groups; + int npartitions; + int partition_bits; + + /* + * Try to make enough partitions so that each one fits in work_mem, + * with a little slop. 
+ */ + npartitions = ceil ( (1.5 * total_size) / (work_mem * 1024L) ); + + if (npartitions < HASH_DISK_MIN_PARTITIONS) + npartitions = HASH_DISK_MIN_PARTITIONS; + if (npartitions > HASH_DISK_MAX_PARTITIONS) + npartitions = HASH_DISK_MAX_PARTITIONS; + + partition_bits = my_log2(npartitions); + + /* make sure that we don't exhaust the hash bits */ + if (partition_bits + work->input_bits >= 32) + partition_bits = 32 - work->input_bits; + + /* number of partitions will be a power of two */ + npartitions = 1L << partition_bits; + + work->output_bits = partition_bits; + work->n_output_partitions = npartitions; + work->output_partitions = palloc0(sizeof(BufFile *) * npartitions); + work->output_ntuples = palloc0(sizeof(int64) * npartitions); + } + + if (work->output_bits == 0) + partition = 0; + else + partition = (hashvalue << work->input_bits) >> + (32 - work->output_bits); + + work->output_ntuples[partition]++; + + if (work->output_partitions[partition] == NULL) + work->output_partitions[partition] = BufFileCreateTemp(false); + file = work->output_partitions[partition]; + + tuple = ExecFetchSlotMinimalTuple(slot); + + written = BufFileWrite(file, (void *) &hashvalue, sizeof(uint32)); + if (written != sizeof(uint32)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write to HashAgg temporary file: %m"))); + aggstate->hash_disk += written; + + written = BufFileWrite(file, (void *) tuple, tuple->t_len); + if (written != tuple->t_len) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write to HashAgg temporary file: %m"))); + aggstate->hash_disk += written; + } + + + /* + * read_saved_tuple + * read the next tuple from a batch file. Return NULL if no more. + * + * On success, *hashvalue is set to the tuple's hash value, and the tuple + * itself is stored in the given slot. + * + * Copied with minor modifications from ExecHashJoinGetSavedTuple. 
+ */ + static TupleTableSlot * + read_saved_tuple(BufFile *file, uint32 *hashvalue, TupleTableSlot *tupleSlot) + { + uint32 header[2]; + size_t nread; + MinimalTuple tuple; + + /* + * Since both the hash value and the MinimalTuple length word are uint32, + * we can read them both in one BufFileRead() call without any type + * cheating. + */ + nread = BufFileRead(file, (void *) header, sizeof(header)); + if (nread == 0) /* end of file */ + { + ExecClearTuple(tupleSlot); + return NULL; + } + if (nread != sizeof(header)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read from HashAgg temporary file: %m"))); + *hashvalue = header[0]; + tuple = (MinimalTuple) palloc(header[1]); + tuple->t_len = header[1]; + nread = BufFileRead(file, + (void *) ((char *) tuple + sizeof(uint32)), + header[1] - sizeof(uint32)); + if (nread != header[1] - sizeof(uint32)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read from HashAgg temporary file: %m"))); + return ExecStoreMinimalTuple(tuple, tupleSlot, true); + } + + /* * ExecAgg - * *************** *** 1107,1115 **** ExecAgg(AggState *node) /* Dispatch based on strategy */ if (((Agg *) node->ss.ps.plan)->aggstrategy == AGG_HASHED) { ! if (!node->table_filled) ! agg_fill_hash_table(node); ! return agg_retrieve_hash_table(node); } else return agg_retrieve_direct(node); --- 1330,1345 ---- /* Dispatch based on strategy */ if (((Agg *) node->ss.ps.plan)->aggstrategy == AGG_HASHED) { ! TupleTableSlot *slot = NULL; ! ! while (slot == NULL) ! { ! if (!node->table_filled) ! if (!agg_fill_hash_table(node)) ! break; ! slot = agg_retrieve_hash_table(node); ! } ! return slot; } else return agg_retrieve_direct(node); *************** *** 1325,1337 **** agg_retrieve_direct(AggState *aggstate) /* * ExecAgg for hashed case: phase 1, read input and build hash table */ ! 
static void agg_fill_hash_table(AggState *aggstate) { PlanState *outerPlan; ExprContext *tmpcontext; AggHashEntry entry; TupleTableSlot *outerslot; /* * get state info from node --- 1555,1569 ---- /* * ExecAgg for hashed case: phase 1, read input and build hash table */ ! static bool agg_fill_hash_table(AggState *aggstate) { PlanState *outerPlan; ExprContext *tmpcontext; AggHashEntry entry; TupleTableSlot *outerslot; + HashWork *work; + int i; /* * get state info from node *************** *** 1340,1359 **** agg_fill_hash_table(AggState *aggstate) /* tmpcontext is the per-input-tuple expression context */ tmpcontext = aggstate->tmpcontext; /* * Process each outer-plan tuple, and then fetch the next one, until we * exhaust the outer plan. */ for (;;) { ! outerslot = ExecProcNode(outerPlan); ! if (TupIsNull(outerslot)) ! break; /* set up for advance_aggregates call */ tmpcontext->ecxt_outertuple = outerslot; /* Find or build hashtable entry for this tuple's group */ ! entry = lookup_hash_entry(aggstate, outerslot); /* Advance the aggregates */ advance_aggregates(aggstate, entry->pergroup); --- 1572,1640 ---- /* tmpcontext is the per-input-tuple expression context */ tmpcontext = aggstate->tmpcontext; + if (aggstate->hash_work == NIL) + { + aggstate->agg_done = true; + return false; + } + + work = linitial(aggstate->hash_work); + aggstate->hash_work = list_delete_first(aggstate->hash_work); + + /* if not the first time through, reinitialize */ + if (!aggstate->hash_init_state) + { + long nbuckets; + Agg *node = (Agg *) aggstate->ss.ps.plan; + + MemoryContextResetAndDeleteChildren(aggstate->hashcontext); + + /* + * If this table will hold only a partition of the input, then use a + * proportionally smaller estimate for nbuckets. + */ + nbuckets = node->numGroups >> work->input_bits; + + build_hash_table(aggstate, nbuckets); + } + + aggstate->hash_init_state = false; + /* * Process each outer-plan tuple, and then fetch the next one, until we * exhaust the outer plan. 
*/ for (;;) { ! uint32 hashvalue; ! ! CHECK_FOR_INTERRUPTS(); ! ! if (work->input_file == NULL) ! { ! outerslot = ExecProcNode(outerPlan); ! if (TupIsNull(outerslot)) ! break; ! ! hashvalue = TupleHashEntryHash(aggstate->hashtable, outerslot); ! } ! else ! { ! outerslot = read_saved_tuple(work->input_file, &hashvalue, ! aggstate->hashslot); ! if (TupIsNull(outerslot)) ! { ! BufFileClose(work->input_file); ! work->input_file = NULL; ! break; ! } ! } ! /* set up for advance_aggregates call */ tmpcontext->ecxt_outertuple = outerslot; /* Find or build hashtable entry for this tuple's group */ ! entry = lookup_hash_entry(aggstate, work, outerslot, hashvalue); /* Advance the aggregates */ advance_aggregates(aggstate, entry->pergroup); *************** *** 1362,1370 **** agg_fill_hash_table(AggState *aggstate) --- 1643,1697 ---- ResetExprContext(tmpcontext); } + if (work->input_file) + BufFileClose(work->input_file); + + /* add each output partition as a new work item */ + for (i = 0; i < work->n_output_partitions; i++) + { + BufFile *file = work->output_partitions[i]; + MemoryContext oldContext; + HashWork *new_work; + int64 input_ngroups; + + /* partition is empty */ + if (file == NULL) + continue; + + /* rewind file for reading */ + if (BufFileSeek(file, 0, 0L, SEEK_SET)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not rewind HashAgg temporary file: %m"))); + + /* + * Estimate the number of input groups for this new work item as the + * total number of tuples in its input file. Although that's a worst + * case, it's not bad here for two reasons: (1) overestimating is + * better than underestimating; and (2) we've already scanned the + * relation once, so it's likely that we've already finalized many of + * the common values. 
+ */ + input_ngroups = work->output_ntuples[i]; + + oldContext = MemoryContextSwitchTo(aggstate->aggcontext); + new_work = hash_work(file, + input_ngroups, + work->output_bits + work->input_bits); + aggstate->hash_work = lappend( + aggstate->hash_work, + new_work); + aggstate->hash_num_batches++; + MemoryContextSwitchTo(oldContext); + } + + pfree(work); + aggstate->table_filled = true; /* Initialize to walk the hash table */ ResetTupleHashIterator(aggstate->hashtable, &aggstate->hashiter); + + return true; } /* *************** *** 1396,1411 **** agg_retrieve_hash_table(AggState *aggstate) * We loop retrieving groups until we find one satisfying * aggstate->ss.ps.qual */ ! while (!aggstate->agg_done) { /* * Find the next entry in the hash table */ entry = (AggHashEntry) ScanTupleHashTable(&aggstate->hashiter); if (entry == NULL) { ! /* No more entries in hashtable, so done */ ! aggstate->agg_done = TRUE; return NULL; } --- 1723,1740 ---- * We loop retrieving groups until we find one satisfying * aggstate->ss.ps.qual */ ! for (;;) { + CHECK_FOR_INTERRUPTS(); + /* * Find the next entry in the hash table */ entry = (AggHashEntry) ScanTupleHashTable(&aggstate->hashiter); if (entry == NULL) { ! /* No more entries in hashtable, so done with this batch */ ! aggstate->table_filled = false; return NULL; } *************** *** 1636,1645 **** ExecInitAgg(Agg *node, EState *estate, int eflags) if (node->aggstrategy == AGG_HASHED) { ! build_hash_table(aggstate); aggstate->table_filled = false; /* Compute the columns we actually need to hash on */ aggstate->hash_needed = find_hash_columns(aggstate); } else { --- 1965,1997 ---- if (node->aggstrategy == AGG_HASHED) { ! MemoryContext oldContext; ! ! aggstate->hash_mem_min = 0; ! aggstate->hash_mem_peak = 0; ! aggstate->hash_num_batches = 0; ! 
aggstate->hash_init_state = true; aggstate->table_filled = false; + aggstate->hash_disk = 0; + + aggstate->hashcontext = + AllocSetContextCreate(aggstate->aggcontext, + "HashAgg Hash Table Context", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + + build_hash_table(aggstate, node->numGroups); + /* Compute the columns we actually need to hash on */ aggstate->hash_needed = find_hash_columns(aggstate); + + /* prime with initial work item to read from outer plan */ + oldContext = MemoryContextSwitchTo(aggstate->aggcontext); + aggstate->hash_work = lappend(aggstate->hash_work, + hash_work(NULL, node->numGroups, 0)); + aggstate->hash_num_batches++; + MemoryContextSwitchTo(oldContext); } else { *************** *** 2048,2079 **** ExecEndAgg(AggState *node) void ExecReScanAgg(AggState *node) { ExprContext *econtext = node->ss.ps.ps_ExprContext; ! int aggno; node->agg_done = false; node->ss.ps.ps_TupFromTlist = false; ! if (((Agg *) node->ss.ps.plan)->aggstrategy == AGG_HASHED) { /* ! * In the hashed case, if we haven't yet built the hash table then we ! * can just return; nothing done yet, so nothing to undo. If subnode's ! * chgParam is not NULL then it will be re-scanned by ExecProcNode, ! * else no reason to re-scan it at all. */ ! if (!node->table_filled) return; /* ! * If we do have the hash table and the subplan does not have any ! * parameter changes, then we can just rescan the existing hash table; ! * no need to build it again. */ ! if (node->ss.ps.lefttree->chgParam == NULL) { ResetTupleHashIterator(node->hashtable, &node->hashiter); return; } } --- 2400,2433 ---- void ExecReScanAgg(AggState *node) { + Agg *agg = (Agg *) node->ss.ps.plan; ExprContext *econtext = node->ss.ps.ps_ExprContext; ! int aggno; node->agg_done = false; node->ss.ps.ps_TupFromTlist = false; ! if (agg->aggstrategy == AGG_HASHED) { /* ! * In the hashed case, if we haven't done any execution work yet, we ! * can just return; nothing to undo. 
If subnode's chgParam is not NULL ! * then it will be re-scanned by ExecProcNode, else no reason to ! * re-scan it at all. */ ! if (node->hash_init_state) return; /* ! * If we do have the hash table, it never went to disk, and the ! * subplan does not have any parameter changes, then we can just ! * rescan the existing hash table; no need to build it again. */ ! if (node->ss.ps.lefttree->chgParam == NULL && node->hash_disk == 0) { ResetTupleHashIterator(node->hashtable, &node->hashiter); + node->table_filled = true; return; } } *************** *** 2110,2120 **** ExecReScanAgg(AggState *node) */ MemoryContextResetAndDeleteChildren(node->aggcontext); ! if (((Agg *) node->ss.ps.plan)->aggstrategy == AGG_HASHED) { /* Rebuild an empty hash table */ ! build_hash_table(node); node->table_filled = false; } else { --- 2464,2493 ---- */ MemoryContextResetAndDeleteChildren(node->aggcontext); ! if (agg->aggstrategy == AGG_HASHED) { + MemoryContext oldContext; + + node->hashcontext = + AllocSetContextCreate(node->aggcontext, + "HashAgg Hash Table Context", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + /* Rebuild an empty hash table */ ! build_hash_table(node, agg->numGroups); ! 
node->hash_init_state = true; node->table_filled = false; + node->hash_disk = 0; + node->hash_work = NIL; + + /* prime with initial work item to read from outer plan */ + oldContext = MemoryContextSwitchTo(node->aggcontext); + node->hash_work = lappend(node->hash_work, + hash_work(NULL, agg->numGroups, 0)); + node->hash_num_batches++; + MemoryContextSwitchTo(oldContext); } else { *** a/src/backend/optimizer/path/costsize.c --- b/src/backend/optimizer/path/costsize.c *************** *** 75,80 **** --- 75,81 ---- #include "access/htup_details.h" #include "executor/executor.h" + #include "executor/nodeAgg.h" #include "executor/nodeHash.h" #include "miscadmin.h" #include "nodes/nodeFuncs.h" *************** *** 113,118 **** bool enable_bitmapscan = true; --- 114,120 ---- bool enable_tidscan = true; bool enable_sort = true; bool enable_hashagg = true; + bool enable_hashagg_disk = true; bool enable_nestloop = true; bool enable_material = true; bool enable_mergejoin = true; *************** *** 1468,1474 **** cost_agg(Path *path, PlannerInfo *root, AggStrategy aggstrategy, const AggClauseCosts *aggcosts, int numGroupCols, double numGroups, Cost input_startup_cost, Cost input_total_cost, ! double input_tuples) { double output_tuples; Cost startup_cost; --- 1470,1476 ---- AggStrategy aggstrategy, const AggClauseCosts *aggcosts, int numGroupCols, double numGroups, Cost input_startup_cost, Cost input_total_cost, ! 
int input_width, double input_tuples) { double output_tuples; Cost startup_cost; *************** *** 1531,1536 **** cost_agg(Path *path, PlannerInfo *root, --- 1533,1542 ---- else { /* must be AGG_HASHED */ + double group_size = hash_group_size(aggcosts->numAggs, + input_width, + aggcosts->transitionSpace); + startup_cost = input_total_cost; startup_cost += aggcosts->transCost.startup; startup_cost += aggcosts->transCost.per_tuple * input_tuples; *************** *** 1538,1543 **** cost_agg(Path *path, PlannerInfo *root, --- 1544,1578 ---- total_cost = startup_cost; total_cost += aggcosts->finalCost * numGroups; total_cost += cpu_tuple_cost * numGroups; + + if (group_size * numGroups > (work_mem * 1024L)) + { + double groups_per_batch = (work_mem * 1024L) / group_size; + + /* first batch doesn't go to disk */ + double groups_disk = numGroups - groups_per_batch; + + /* + * Assume that the groups that go to disk are of an average number + * of tuples. This is pessimistic -- the largest groups are more + * likely to be processed in the first pass and never go to disk. + */ + double tuples_disk = groups_disk * (input_tuples / numGroups); + + int tuple_size = sizeof(uint32) /* stored hash value */ + + MAXALIGN(sizeof(MinimalTupleData)) + + MAXALIGN(input_width); + double pages_to_disk = (tuples_disk * tuple_size) / BLCKSZ; + + /* + * Write and then read back the data that's not processed in the + * first pass. Data could be read and written more times than that + * if not enough partitions are created, but the depth will be a + * very small number even for a very large amount of data, so + * ignore it here. 
+ */ + total_cost += seq_page_cost * 2 * pages_to_disk; + } output_tuples = numGroups; } *** a/src/backend/optimizer/plan/createplan.c --- b/src/backend/optimizer/plan/createplan.c *************** *** 4369,4374 **** make_agg(PlannerInfo *root, List *tlist, List *qual, --- 4369,4377 ---- node->grpColIdx = grpColIdx; node->grpOperators = grpOperators; node->numGroups = numGroups; + if (aggcosts != NULL) + node->transitionSpace = aggcosts->transitionSpace; + node->plan_width = lefttree->plan_width; copy_plan_costsize(plan, lefttree); /* only care about copying size */ cost_agg(&agg_path, root, *************** *** 4376,4381 **** make_agg(PlannerInfo *root, List *tlist, List *qual, --- 4379,4385 ---- numGroupCols, numGroups, lefttree->startup_cost, lefttree->total_cost, + lefttree->plan_width, lefttree->plan_rows); plan->startup_cost = agg_path.startup_cost; plan->total_cost = agg_path.total_cost; *** a/src/backend/optimizer/plan/planagg.c --- b/src/backend/optimizer/plan/planagg.c *************** *** 234,240 **** optimize_minmax_aggregates(PlannerInfo *root, List *tlist, cost_agg(&agg_p, root, AGG_PLAIN, aggcosts, 0, 0, best_path->startup_cost, best_path->total_cost, ! best_path->parent->rows); if (total_cost > agg_p.total_cost) return NULL; /* too expensive */ --- 234,240 ---- cost_agg(&agg_p, root, AGG_PLAIN, aggcosts, 0, 0, best_path->startup_cost, best_path->total_cost, ! best_path->parent->width, best_path->parent->rows); if (total_cost > agg_p.total_cost) return NULL; /* too expensive */ *** a/src/backend/optimizer/plan/planner.c --- b/src/backend/optimizer/plan/planner.c *************** *** 2744,2750 **** choose_hashed_grouping(PlannerInfo *root, /* plus the per-hash-entry overhead */ hashentrysize += hash_agg_entry_size(agg_costs->numAggs); ! if (hashentrysize * dNumGroups > work_mem * 1024L) return false; /* --- 2744,2751 ---- /* plus the per-hash-entry overhead */ hashentrysize += hash_agg_entry_size(agg_costs->numAggs); ! if (!enable_hashagg_disk && ! 
hashentrysize * dNumGroups > work_mem * 1024L) return false; /* *************** *** 2779,2785 **** choose_hashed_grouping(PlannerInfo *root, cost_agg(&hashed_p, root, AGG_HASHED, agg_costs, numGroupCols, dNumGroups, cheapest_path->startup_cost, cheapest_path->total_cost, ! path_rows); /* Result of hashed agg is always unsorted */ if (target_pathkeys) cost_sort(&hashed_p, root, target_pathkeys, hashed_p.total_cost, --- 2780,2786 ---- cost_agg(&hashed_p, root, AGG_HASHED, agg_costs, numGroupCols, dNumGroups, cheapest_path->startup_cost, cheapest_path->total_cost, ! path_width, path_rows); /* Result of hashed agg is always unsorted */ if (target_pathkeys) cost_sort(&hashed_p, root, target_pathkeys, hashed_p.total_cost, *************** *** 2810,2816 **** choose_hashed_grouping(PlannerInfo *root, cost_agg(&sorted_p, root, AGG_SORTED, agg_costs, numGroupCols, dNumGroups, sorted_p.startup_cost, sorted_p.total_cost, ! path_rows); else cost_group(&sorted_p, root, numGroupCols, dNumGroups, sorted_p.startup_cost, sorted_p.total_cost, --- 2811,2817 ---- cost_agg(&sorted_p, root, AGG_SORTED, agg_costs, numGroupCols, dNumGroups, sorted_p.startup_cost, sorted_p.total_cost, ! path_width, path_rows); else cost_group(&sorted_p, root, numGroupCols, dNumGroups, sorted_p.startup_cost, sorted_p.total_cost, *************** *** 2910,2916 **** choose_hashed_distinct(PlannerInfo *root, /* plus the per-hash-entry overhead */ hashentrysize += hash_agg_entry_size(0); ! if (hashentrysize * dNumDistinctRows > work_mem * 1024L) return false; /* --- 2911,2918 ---- /* plus the per-hash-entry overhead */ hashentrysize += hash_agg_entry_size(0); ! if (!enable_hashagg_disk && ! hashentrysize * dNumDistinctRows > work_mem * 1024L) return false; /* *************** *** 2929,2935 **** choose_hashed_distinct(PlannerInfo *root, cost_agg(&hashed_p, root, AGG_HASHED, NULL, numDistinctCols, dNumDistinctRows, cheapest_startup_cost, cheapest_total_cost, ! 
path_rows); /* * Result of hashed agg is always unsorted, so if ORDER BY is present we --- 2931,2937 ---- cost_agg(&hashed_p, root, AGG_HASHED, NULL, numDistinctCols, dNumDistinctRows, cheapest_startup_cost, cheapest_total_cost, ! path_width, path_rows); /* * Result of hashed agg is always unsorted, so if ORDER BY is present we *** a/src/backend/optimizer/prep/prepunion.c --- b/src/backend/optimizer/prep/prepunion.c *************** *** 851,857 **** choose_hashed_setop(PlannerInfo *root, List *groupClauses, cost_agg(&hashed_p, root, AGG_HASHED, NULL, numGroupCols, dNumGroups, input_plan->startup_cost, input_plan->total_cost, ! input_plan->plan_rows); /* * Now for the sorted case. Note that the input is *always* unsorted, --- 851,857 ---- cost_agg(&hashed_p, root, AGG_HASHED, NULL, numGroupCols, dNumGroups, input_plan->startup_cost, input_plan->total_cost, ! input_plan->plan_width, input_plan->plan_rows); /* * Now for the sorted case. Note that the input is *always* unsorted, *** a/src/backend/optimizer/util/pathnode.c --- b/src/backend/optimizer/util/pathnode.c *************** *** 1379,1385 **** create_unique_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath, numCols, pathnode->path.rows, subpath->startup_cost, subpath->total_cost, ! rel->rows); } if (all_btree && all_hash) --- 1379,1385 ---- numCols, pathnode->path.rows, subpath->startup_cost, subpath->total_cost, ! rel->width, rel->rows); } if (all_btree && all_hash) *** a/src/backend/utils/hash/dynahash.c --- b/src/backend/utils/hash/dynahash.c *************** *** 291,301 **** hash_create(const char *tabname, long nelem, HASHCTL *info, int flags) CurrentDynaHashCxt = info->hcxt; else CurrentDynaHashCxt = TopMemoryContext; ! CurrentDynaHashCxt = AllocSetContextCreate(CurrentDynaHashCxt, ! tabname, ! ALLOCSET_DEFAULT_MINSIZE, ! ALLOCSET_DEFAULT_INITSIZE, ! 
ALLOCSET_DEFAULT_MAXSIZE); } /* Initialize the hash header, plus a copy of the table name */ --- 291,303 ---- CurrentDynaHashCxt = info->hcxt; else CurrentDynaHashCxt = TopMemoryContext; ! ! if ((flags & HASH_NOCHILDCXT) == 0) ! CurrentDynaHashCxt = AllocSetContextCreate(CurrentDynaHashCxt, ! tabname, ! ALLOCSET_DEFAULT_MINSIZE, ! ALLOCSET_DEFAULT_INITSIZE, ! ALLOCSET_DEFAULT_MAXSIZE); } /* Initialize the hash header, plus a copy of the table name */ *** a/src/backend/utils/misc/guc.c --- b/src/backend/utils/misc/guc.c *************** *** 771,776 **** static struct config_bool ConfigureNamesBool[] = --- 771,785 ---- NULL, NULL, NULL }, { + {"enable_hashagg_disk", PGC_USERSET, QUERY_TUNING_METHOD, + gettext_noop("Enables the planner's use of disk-based hashed aggregation plans."), + NULL + }, + &enable_hashagg_disk, + true, + NULL, NULL, NULL + }, + { {"enable_material", PGC_USERSET, QUERY_TUNING_METHOD, gettext_noop("Enables the planner's use of materialization."), NULL *** a/src/backend/utils/misc/postgresql.conf.sample --- b/src/backend/utils/misc/postgresql.conf.sample *************** *** 270,275 **** --- 270,276 ---- #enable_bitmapscan = on #enable_hashagg = on + #enable_hashagg_disk = on #enable_hashjoin = on #enable_indexscan = on #enable_indexonlyscan = on *** a/src/include/executor/executor.h --- b/src/include/executor/executor.h *************** *** 147,152 **** extern TupleHashTable BuildTupleHashTable(int numCols, AttrNumber *keyColIdx, --- 147,158 ---- extern TupleHashEntry LookupTupleHashEntry(TupleHashTable hashtable, TupleTableSlot *slot, bool *isnew); + extern uint32 TupleHashEntryHash(TupleHashTable hashtable, + TupleTableSlot *slot); + extern TupleHashEntry LookupTupleHashEntryHash(TupleHashTable hashtable, + TupleTableSlot *slot, + uint32 hashvalue, + bool *isnew); extern TupleHashEntry FindTupleHashEntry(TupleHashTable hashtable, TupleTableSlot *slot, FmgrInfo *eqfunctions, *** a/src/include/executor/nodeAgg.h --- b/src/include/executor/nodeAgg.h 
*************** *** 22,27 **** extern void ExecEndAgg(AggState *node); --- 22,28 ---- extern void ExecReScanAgg(AggState *node); extern Size hash_agg_entry_size(int numAggs); + extern Size hash_group_size(int numAggs, int inputWidth, Size transitionSpace); extern Datum aggregate_dummy(PG_FUNCTION_ARGS); *** a/src/include/nodes/execnodes.h --- b/src/include/nodes/execnodes.h *************** *** 1759,1769 **** typedef struct AggState --- 1759,1776 ---- AggStatePerGroup pergroup; /* per-Aggref-per-group working state */ HeapTuple grp_firstTuple; /* copy of first tuple of current group */ /* these fields are used in AGG_HASHED mode: */ + MemoryContext hashcontext; /* subcontext to use for hash table */ TupleHashTable hashtable; /* hash table with one entry per group */ TupleTableSlot *hashslot; /* slot for loading hash table */ List *hash_needed; /* list of columns needed in hash table */ + bool hash_init_state; /* in initial state before execution? */ bool table_filled; /* hash table filled yet? 
*/ + int64 hash_disk; /* bytes of disk space used */ + uint64 hash_mem_min; /* memory used by empty hash table */ + uint64 hash_mem_peak; /* memory used at peak of execution */ + int hash_num_batches; /* total number of batches created */ TupleHashIterator hashiter; /* for iterating through hash table */ + List *hash_work; /* remaining work to be done */ } AggState; /* ---------------- *** a/src/include/nodes/plannodes.h --- b/src/include/nodes/plannodes.h *************** *** 666,671 **** typedef struct Agg --- 666,673 ---- AttrNumber *grpColIdx; /* their indexes in the target list */ Oid *grpOperators; /* equality operators to compare with */ long numGroups; /* estimated number of groups in input */ + Size transitionSpace; /* estimated size of by-ref transition val */ + int plan_width; /* input plan width */ } Agg; /* ---------------- *** a/src/include/optimizer/cost.h --- b/src/include/optimizer/cost.h *************** *** 57,62 **** extern bool enable_bitmapscan; --- 57,63 ---- extern bool enable_tidscan; extern bool enable_sort; extern bool enable_hashagg; + extern bool enable_hashagg_disk; extern bool enable_nestloop; extern bool enable_material; extern bool enable_mergejoin; *************** *** 102,108 **** extern void cost_agg(Path *path, PlannerInfo *root, AggStrategy aggstrategy, const AggClauseCosts *aggcosts, int numGroupCols, double numGroups, Cost input_startup_cost, Cost input_total_cost, ! double input_tuples); extern void cost_windowagg(Path *path, PlannerInfo *root, List *windowFuncs, int numPartCols, int numOrderCols, Cost input_startup_cost, Cost input_total_cost, --- 103,109 ---- AggStrategy aggstrategy, const AggClauseCosts *aggcosts, int numGroupCols, double numGroups, Cost input_startup_cost, Cost input_total_cost, ! 
int input_width, double input_tuples); extern void cost_windowagg(Path *path, PlannerInfo *root, List *windowFuncs, int numPartCols, int numOrderCols, Cost input_startup_cost, Cost input_total_cost, *** a/src/include/utils/hsearch.h --- b/src/include/utils/hsearch.h *************** *** 93,98 **** typedef struct HASHCTL --- 93,101 ---- #define HASH_COMPARE 0x400 /* Set user defined comparison function */ #define HASH_KEYCOPY 0x800 /* Set user defined key-copying function */ #define HASH_FIXED_SIZE 0x1000 /* Initial size is a hard limit */ + #define HASH_NOCHILDCXT 0x2000 /* Don't create a child context. Warning: + * hash_destroy will delete the memory context + * specified by the caller. */ /* max_dsize value to indicate expansible directory */ *** a/src/test/regress/expected/rangefuncs.out --- b/src/test/regress/expected/rangefuncs.out *************** *** 3,8 **** SELECT name, setting FROM pg_settings WHERE name LIKE 'enable%'; --- 3,9 ---- ----------------------+--------- enable_bitmapscan | on enable_hashagg | on + enable_hashagg_disk | on enable_hashjoin | on enable_indexonlyscan | on enable_indexscan | on *************** *** 12,18 **** SELECT name, setting FROM pg_settings WHERE name LIKE 'enable%'; enable_seqscan | on enable_sort | on enable_tidscan | on ! (11 rows) CREATE TABLE foo2(fooid int, f2 int); INSERT INTO foo2 VALUES(1, 11); --- 13,19 ---- enable_seqscan | on enable_sort | on enable_tidscan | on ! (12 rows) CREATE TABLE foo2(fooid int, f2 int); INSERT INTO foo2 VALUES(1, 11);
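For anyone who wants to sanity-check the disk term added to cost_agg() without building the patch, the arithmetic can be sketched as a standalone function. This is only an illustration of the formula in the costsize.c hunk above, not code from the patch: the sketch_* names are hypothetical, SKETCH_BLCKSZ assumes the default 8kB block size, SKETCH_MAXALIGN assumes 8-byte alignment, and SKETCH_MINTUPLE_HEADER is a placeholder for sizeof(MinimalTupleData) rather than its real value.

```c
#include <assert.h>
#include <stddef.h>

/* Assumed stand-ins for PostgreSQL constants (not taken from the patch). */
#define SKETCH_BLCKSZ			8192	/* default block size */
#define SKETCH_MAXALIGN(len)	(((size_t) (len) + 7) & ~(size_t) 7)
#define SKETCH_HASH_BYTES		4		/* stored uint32 hash value */
#define SKETCH_MINTUPLE_HEADER	16		/* placeholder for sizeof(MinimalTupleData) */

/*
 * Estimated number of pages spilled to disk by a hashed aggregate.
 * Returns 0 when all groups are expected to fit in work_mem.
 */
static double
sketch_hashagg_spill_pages(double num_groups, double input_tuples,
						   int input_width, double group_size,
						   long work_mem_kb)
{
	double		mem_bytes = (double) work_mem_kb * 1024.0;
	double		groups_per_batch;
	double		groups_disk;
	double		tuples_disk;
	double		tuple_size;

	assert(num_groups > 0 && group_size > 0);

	if (group_size * num_groups <= mem_bytes)
		return 0.0;

	/* the first batch stays in memory and never goes to disk */
	groups_per_batch = mem_bytes / group_size;
	groups_disk = num_groups - groups_per_batch;

	/* assume spilled groups have the average number of tuples */
	tuples_disk = groups_disk * (input_tuples / num_groups);

	tuple_size = SKETCH_HASH_BYTES
		+ SKETCH_MAXALIGN(SKETCH_MINTUPLE_HEADER)
		+ SKETCH_MAXALIGN(input_width);

	return (tuples_disk * tuple_size) / SKETCH_BLCKSZ;
}

/* Spilled data is written once and read back once, hence the factor 2. */
static double
sketch_hashagg_spill_cost(double num_groups, double input_tuples,
						  int input_width, double group_size,
						  long work_mem_kb, double seq_page_cost)
{
	return seq_page_cost * 2.0 *
		sketch_hashagg_spill_pages(num_groups, input_tuples, input_width,
								   group_size, work_mem_kb);
}
```

With work_mem at 1MB, 128-byte groups, 16384 groups and 10 tuples per group, half of the groups spill; under the placeholder sizes above that works out to 680 pages, charged twice at seq_page_cost. Like the patch, the sketch ignores any extra passes caused by recursive partitioning.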
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)