On Wed, 2006-03-22 at 07:48 +0000, Simon Riggs wrote: > On Tue, 2006-03-21 at 17:47 -0500, Tom Lane wrote: > > > I'm fairly unconvinced about Simon's underlying premise --- that we > > can't make good use of work_mem in sorting after the run building phase > > --- anyway. > > We can make good use of memory, but there does come a point in final > merging where too much is of no further benefit. That point seems to be > at about 256 blocks per tape; patch enclosed for testing. (256 blocks > per tape roughly doubles performance over 32 blocks at that stage). > > That is never the case during run building - more is always better. > > > If we cut back our memory usage > Simon inserts the words: "too far" > > then we'll be forcing a > > significantly more-random access pattern to the temp file(s) during > > merging, because we won't be able to pre-read as much at a time. > > Yes, thats right. > > If we have 512MB of memory that gives us enough for 2000 tapes, yet the > initial runs might only build a few runs. There's just no way that all > 512MB of memory is needed to optimise the performance of reading in a > few tapes at time of final merge. > > I'm suggesting we always keep 2MB per active tape, or the full > allocation, whichever is lower. In the above example that could release > over 500MB of memory, which more importantly can be reused by subsequent > sorts if/when they occur. > > > Enclose two patches: > 1. mergebuffers.patch allows measurement of the effects of different > merge buffer sizes, current default=32 > > 2. reassign2.patch which implements the two kinds of resource > deallocation/reassignment proposed.
I missed a couple of minor points in the patch: reassign3.patch is attached to completely replace reassign2.patch. Recent test results show that with a 512MB test sort we can reclaim 97% of memory during the final merge with only a noise-level (+2%) increase in overall elapsed time. (That's just an example; your mileage may vary.) So a large query would use and keep about 536MB of memory rather than 1536MB. Best Regards, Simon Riggs
Index: src/backend/utils/sort/tuplesort.c =================================================================== RCS file: /projects/cvsroot/pgsql/src/backend/utils/sort/tuplesort.c,v retrieving revision 1.65 diff -c -r1.65 tuplesort.c *** src/backend/utils/sort/tuplesort.c 10 Mar 2006 23:19:00 -0000 1.65 --- src/backend/utils/sort/tuplesort.c 22 Mar 2006 09:34:58 -0000 *************** *** 179,186 **** */ #define MINORDER 6 /* minimum merge order */ #define TAPE_BUFFER_OVERHEAD (BLCKSZ * 3) ! #define MERGE_BUFFER_SIZE (BLCKSZ * 32) ! /* * Private state of a Tuplesort operation. */ --- 179,187 ---- */ #define MINORDER 6 /* minimum merge order */ #define TAPE_BUFFER_OVERHEAD (BLCKSZ * 3) ! #define OPTIMAL_MERGE_BUFFER_SIZE (BLCKSZ * 32) ! #define PREFERRED_MERGE_BUFFER_SIZE (BLCKSZ * 256) ! #define REUSE_SPACE_LIMIT RELSEG_SIZE /* * Private state of a Tuplesort operation. */ *************** *** 255,260 **** --- 256,270 ---- */ int currentRun; + /* + * These variables are used during final merge to reassign resources + * as they become available for each tape + */ + int lastPrereadTape; /* last tape preread from */ + int numPrereads; /* num times last tape has been selected */ + int reassignableSlots; /* how many slots can be reassigned */ + long reassignableMem; /* how much memory can be reassigned */ + /* * Unless otherwise noted, all pointer variables below are pointers * to arrays of length maxTapes, holding per-tape data. 
*************** *** 294,299 **** --- 304,310 ---- int *tp_runs; /* # of real runs on each tape */ int *tp_dummy; /* # of dummy runs for each tape (D[]) */ int *tp_tapenum; /* Actual tape numbers (TAPE[]) */ + int activeTapes; /* # of active input tapes in merge pass */ /* *************** *** 398,408 **** --- 409,423 ---- static Tuplesortstate *tuplesort_begin_common(int workMem, bool randomAccess); static void puttuple_common(Tuplesortstate *state, SortTuple *tuple); + static void grow_memtuples(Tuplesortstate *state); + static void shrink_memtuples(Tuplesortstate *state); static void inittapes(Tuplesortstate *state); static void selectnewtape(Tuplesortstate *state); static void mergeruns(Tuplesortstate *state); static void mergeonerun(Tuplesortstate *state); static void beginmerge(Tuplesortstate *state); + static void assignResourcesUniformly(Tuplesortstate *state, bool initialAssignment); + static void reassignresources(Tuplesortstate *state, int srcTape); static void mergepreread(Tuplesortstate *state); static void mergeprereadone(Tuplesortstate *state, int srcTape); static void dumptuples(Tuplesortstate *state, bool alltuples); *************** *** 727,733 **** * moves around with tuple addition/removal, this might result in thrashing. * Small increases in the array size are likely to be pretty inefficient. */ ! static bool grow_memtuples(Tuplesortstate *state) { /* --- 742,748 ---- * moves around with tuple addition/removal, this might result in thrashing. * Small increases in the array size are likely to be pretty inefficient. */ ! static void grow_memtuples(Tuplesortstate *state) { /* *************** *** 740,752 **** * this assumption should be good. But let's check it.) */ if (state->availMem <= (long) (state->memtupsize * sizeof(SortTuple))) ! return false; /* * On a 64-bit machine, allowedMem could be high enough to get us into * trouble with MaxAllocSize, too. */ if ((Size) (state->memtupsize * 2) >= MaxAllocSize / sizeof(SortTuple)) ! 
return false; FREEMEM(state, GetMemoryChunkSpace(state->memtuples)); state->memtupsize *= 2; --- 755,767 ---- * this assumption should be good. But let's check it.) */ if (state->availMem <= (long) (state->memtupsize * sizeof(SortTuple))) ! return; /* * On a 64-bit machine, allowedMem could be high enough to get us into * trouble with MaxAllocSize, too. */ if ((Size) (state->memtupsize * 2) >= MaxAllocSize / sizeof(SortTuple)) ! return; FREEMEM(state, GetMemoryChunkSpace(state->memtuples)); state->memtupsize *= 2; *************** *** 756,762 **** USEMEM(state, GetMemoryChunkSpace(state->memtuples)); if (LACKMEM(state)) elog(ERROR, "unexpected out-of-memory situation during sort"); ! return true; } /* --- 771,849 ---- USEMEM(state, GetMemoryChunkSpace(state->memtuples)); if (LACKMEM(state)) elog(ERROR, "unexpected out-of-memory situation during sort"); ! return; ! } ! ! static void ! shrink_memtuples(Tuplesortstate *state) ! { ! int oldMemTupSize = state->memtupsize; ! int newMemTupSize; ! long oldMemTuplesAlloc = GetMemoryChunkSpace(state->memtuples); ! ! if (state->memtupcount > 0) ! { ! elog(LOG, "unexpected attempt to shrink sort memory"); ! return; ! } ! ! Assert(state->status == TSS_FINALMERGE || state->status == TSS_SORTEDONTAPE); ! ! /* ! * Set new size, based upon earlier memory usage ! * ! * We don't care exactly how many tuples are stored, however we already ! * know that the existing setting filled available memory. The merge ! * order was set according to OPTIMAL_MERGE_BUFFER_SIZE, on the assumption ! * that we might have to cope with a merge of maxTapes. However, we ! * would prefer to merge using PREFERRED_MERGE_BUFFER_SIZE for each active ! * tape. Any more memory than this is likely to be a waste, so we can ! * reduce memory in proporton to the ratio of activeTapes to maxTapes ! * and thus allow it to be reused by subsequent sorts or hashes within ! * the same execution. For large memory settings this will be a major ! 
* win, since we may have allocated enough memory for 1000s of tapes, ! * yet might only need to merge back on a few tapes. ! */ ! newMemTupSize = (int) ((state->memtupsize * state->activeTapes * ! (PREFERRED_MERGE_BUFFER_SIZE / OPTIMAL_MERGE_BUFFER_SIZE))/ ! state->tapeRange); ! ! if (newMemTupSize >= state->memtupsize) ! return; ! ! pfree(state->memtuples); ! FREEMEM(state, oldMemTuplesAlloc); ! ! /* ! * We don't need a memtuples array at all if we have randomAccess ! */ ! if (state->status == TSS_SORTEDONTAPE) ! { ! #ifdef TRACE_SORT ! if (trace_sort) ! elog(LOG, "releasing sort resources prior to enabling randomAccess: %s", ! pg_rusage_show(&state->ru_start)); ! #endif ! state->memtupsize = 0; ! return; ! } ! ! state->memtupsize = newMemTupSize; ! state->memtuples = (SortTuple *) ! palloc(state->memtupsize * sizeof(SortTuple)); ! USEMEM(state, oldMemTuplesAlloc); ! ! #ifdef TRACE_SORT ! if (trace_sort) ! elog(LOG, "shrinking resources to %d%% (from %d to %d slots): %s", ! (100*newMemTupSize) / oldMemTupSize, ! oldMemTupSize, state->memtupsize, ! pg_rusage_show(&state->ru_start)); ! #endif ! ! if (LACKMEM(state)) ! elog(ERROR, "unexpected out-of-memory situation during sort"); ! return; } /* *************** *** 834,840 **** */ if (state->memtupcount >= state->memtupsize - 1) { ! (void) grow_memtuples(state); Assert(state->memtupcount < state->memtupsize); } state->memtuples[state->memtupcount++] = *tuple; --- 921,927 ---- */ if (state->memtupcount >= state->memtupsize - 1) { ! 
grow_memtuples(state); Assert(state->memtupcount < state->memtupsize); } state->memtuples[state->memtupcount++] = *tuple; *************** *** 1115,1120 **** --- 1202,1209 ---- tuplesort_heap_siftup(state, false); if ((tupIndex = state->mergenext[srcTape]) == 0) { + reassignresources(state, srcTape); + /* * out of preloaded data on this tape, try to read more * *************** *** 1125,1133 **** --- 1214,1236 ---- /* * if still no data, we've reached end of run on this tape + * so we can permanently reassign the resources used by + * srcTape onto any remaining tapes for remainder of the + * final merge */ if ((tupIndex = state->mergenext[srcTape]) == 0) + { + state->reassignableSlots += state->mergeavailslots[srcTape]; + state->reassignableMem += state->mergeavailmem[srcTape]; + state->numPrereads = 0; + #ifdef TRACE_SORT + if (trace_sort) + elog(LOG, "final merge: tape %d exhausted: %s", + srcTape, + pg_rusage_show(&state->ru_start)); + #endif return true; + } } /* pull next preread tuple from list, insert in heap */ newtup = &state->memtuples[tupIndex]; *************** *** 1150,1155 **** --- 1253,1365 ---- } /* + * Assign space uniformly across remaining active tapes + */ + static void + assignResourcesUniformly(Tuplesortstate *state, bool initialAssignment) + { + int slotsPerTape; + long spacePerTape; + int activeTapes = state->activeTapes; + int srcTape; + + Assert(activeTapes > 0); + + if (initialAssignment) + { + slotsPerTape = (state->memtupsize - state->mergefirstfree) / activeTapes; + Assert(slotsPerTape > 0); + spacePerTape = state->availMem / activeTapes; + + for (srcTape = 0; srcTape < state->maxTapes; srcTape++) + { + if (state->mergeactive[srcTape]) + { + state->mergeavailslots[srcTape] = slotsPerTape; + state->mergeavailmem[srcTape] = spacePerTape; + } + } + } + else + { + slotsPerTape = state->reassignableSlots / activeTapes; + if (slotsPerTape == 0) + return; + spacePerTape = state->reassignableMem / activeTapes; + #ifdef TRACE_SORT + if 
(trace_sort) + elog(LOG, "reassigning resources; each tape gets: +%d slots, +%ld mem: %s", + slotsPerTape, + spacePerTape, + pg_rusage_show(&state->ru_start)); + #endif + + for (srcTape = 0; srcTape < state->maxTapes; srcTape++) + { + if (state->mergeactive[srcTape]) + { + state->mergeavailslots[srcTape] += slotsPerTape; + state->mergeavailmem[srcTape] += spacePerTape; + if (--activeTapes <= 0) + break; + } + } + } + + state->reassignableSlots = 0; + state->reassignableMem = 0; + } + + /* + * If we have any reassignable resources from earlier tapes that have been + * made empty by previous final run prereads, consider how to reassign them + */ + static void + reassignresources(Tuplesortstate *state, int srcTape) + { + if (state->reassignableSlots <= state->activeTapes) + return; + + /* + * We expect each tape to need to be preread about 2*number of tapes in + * final merge. That gives us time to decide whether we should allocate the + * reassignable resources evenly, or whether we should give them all to a + * single tape, which may be appropriate in some cases + */ + if (state->numPrereads == 0 || + state->lastPrereadTape == srcTape || + state->activeTapes == 1) + { + state->numPrereads++; + state->lastPrereadTape = srcTape; + + /* + * If confirm that the emerging pattern of prereads + * is consistently on a single tape, then hand + * over all the spare resources to that tape + */ + if (state->numPrereads > 3 || state->activeTapes == 1) + { + #ifdef TRACE_SORT + if (trace_sort) + elog(LOG, "reassigning memory to tape %d, +%d slots, +%ld mem: %s", + srcTape, + state->reassignableSlots, + state->reassignableMem, + pg_rusage_show(&state->ru_start)); + #endif + state->mergeavailslots[srcTape] += state->reassignableSlots; + state->mergeavailmem[srcTape] += state->reassignableMem; + state->reassignableSlots = 0; + state->reassignableMem = 0; + state->numPrereads = 0; + } + } + else + assignResourcesUniformly(state, false); + } + + /* * Fetch the next tuple in either 
forward or back direction. * Returns NULL if no more tuples. If *should_free is set, the * caller must pfree the returned tuple when done with it. *************** *** 1231,1237 **** * the MERGE_BUFFER_SIZE workspace. */ mOrder = (allowedMem - TAPE_BUFFER_OVERHEAD) / ! (MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD); /* Even in minimum memory, use at least a MINORDER merge */ mOrder = Max(mOrder, MINORDER); --- 1441,1447 ---- * the MERGE_BUFFER_SIZE workspace. */ mOrder = (allowedMem - TAPE_BUFFER_OVERHEAD) / ! (OPTIMAL_MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD); /* Even in minimum memory, use at least a MINORDER merge */ mOrder = Max(mOrder, MINORDER); *************** *** 1436,1443 **** /* Tell logtape.c we won't be writing anymore */ LogicalTapeSetForgetFreeSpace(state->tapeset); /* Initialize for the final merge pass */ - beginmerge(state); state->status = TSS_FINALMERGE; return; } } --- 1646,1653 ---- /* Tell logtape.c we won't be writing anymore */ LogicalTapeSetForgetFreeSpace(state->tapeset); /* Initialize for the final merge pass */ state->status = TSS_FINALMERGE; + beginmerge(state); return; } } *************** *** 1506,1511 **** --- 1716,1722 ---- state->result_tape = state->tp_tapenum[state->tapeRange]; LogicalTapeFreeze(state->tapeset, state->result_tape); state->status = TSS_SORTEDONTAPE; + shrink_memtuples(state); } /* *************** *** 1523,1528 **** --- 1734,1740 ---- SortTuple *tup; long priorAvail, spaceFreed; + int numtapes; /* * Start the merge by loading one tuple from each active source tape into *************** *** 1530,1535 **** --- 1742,1749 ---- */ beginmerge(state); + numtapes = state->activeTapes; + /* * Execute merge by repeatedly extracting lowest tuple in heap, writing it * out, and replacing it with next tuple from same tape (if there is *************** *** 1576,1582 **** #ifdef TRACE_SORT if (trace_sort) ! 
elog(LOG, "finished %d-way merge step: %s", state->activeTapes, pg_rusage_show(&state->ru_start)); #endif } --- 1790,1796 ---- #ifdef TRACE_SORT if (trace_sort) ! elog(LOG, "finished %d-way merge step: %s", numtapes, pg_rusage_show(&state->ru_start)); #endif } *************** *** 1595,1602 **** int activeTapes; int tapenum; int srcTape; - int slotsPerTape; - long spacePerTape; /* Heap should be empty here */ Assert(state->memtupcount == 0); --- 1809,1814 ---- *************** *** 1628,1649 **** state->mergefreelist = 0; /* nothing in the freelist */ state->mergefirstfree = activeTapes; /* 1st slot avail for preread */ ! /* ! * Initialize space allocation to let each active input tape have an equal ! * share of preread space. ! */ ! Assert(activeTapes > 0); ! slotsPerTape = (state->memtupsize - state->mergefirstfree) / activeTapes; ! Assert(slotsPerTape > 0); ! spacePerTape = state->availMem / activeTapes; ! for (srcTape = 0; srcTape < state->maxTapes; srcTape++) ! { ! if (state->mergeactive[srcTape]) ! { ! state->mergeavailslots[srcTape] = slotsPerTape; ! state->mergeavailmem[srcTape] = spacePerTape; ! } ! } /* * Preread as many tuples as possible (and at least one) from each active --- 1840,1849 ---- state->mergefreelist = 0; /* nothing in the freelist */ state->mergefirstfree = activeTapes; /* 1st slot avail for preread */ ! if (state->status == TSS_FINALMERGE) ! shrink_memtuples(state); ! ! 
assignResourcesUniformly(state, true); /* * Preread as many tuples as possible (and at least one) from each active *************** *** 1722,1727 **** --- 1922,1928 ---- if (!state->mergeactive[srcTape]) return; /* tape's run is already exhausted */ + priorAvail = state->availMem; state->availMem = state->mergeavailmem[srcTape]; while ((state->mergeavailslots[srcTape] > 0 && !LACKMEM(state)) || *************** *** 1730,1736 **** --- 1931,1939 ---- /* read next tuple, if any */ if ((tuplen = getlen(state, srcTape, true)) == 0) { + /* remove this tape from the active set */ state->mergeactive[srcTape] = false; + --state->activeTapes; break; } READTUP(state, &stup, srcTape, tuplen);
---------------------------(end of broadcast)--------------------------- TIP 2: Don't 'kill -9' the postmaster