On Wed, 2006-03-22 at 07:48 +0000, Simon Riggs wrote:
> On Tue, 2006-03-21 at 17:47 -0500, Tom Lane wrote:
> 
> > I'm fairly unconvinced about Simon's underlying premise --- that we
> > can't make good use of work_mem in sorting after the run building phase
> > --- anyway.  
> 
> We can make good use of memory, but there does come a point in final
> merging where too much is of no further benefit. That point seems to be
> at about 256 blocks per tape; patch enclosed for testing. (256 blocks
> per tape roughly doubles performance over 32 blocks at that stage).
> 
> That is never the case during run building - more is always better.
> 
> > If we cut back our memory usage 
> Simon inserts the words: "too far"
> > then we'll be forcing a
> > significantly more-random access pattern to the temp file(s) during
> > merging, because we won't be able to pre-read as much at a time.
> 
> Yes, that's right.
> 
> If we have 512MB of memory, that gives us enough for 2000 tapes, yet the
> run-building phase might only produce a few runs. There's just no way that
> all 512MB of memory is needed to optimise the performance of reading in a
> few tapes at the time of the final merge.
> 
> I'm suggesting we always keep 2MB per active tape, or the full
> allocation, whichever is lower. In the above example that could release
> over 500MB of memory, which more importantly can be reused by subsequent
> sorts if/when they occur.
> 
> 
> Enclosed are two patches:
> 1. mergebuffers.patch allows measurement of the effects of different
> merge buffer sizes, current default=32
> 
> 2. reassign2.patch which implements the two kinds of resource
> deallocation/reassignment proposed.

I missed a couple of minor points in the patch: reassign3.patch is attached
to completely replace reassign2.patch.

Recent test results show that with a 512MB test sort we can reclaim 97%
of memory during final merge with only a noise level (+2%) increase in
overall elapsed time. (That's just an example; your mileage may vary.) So
a large query would use and keep about 536MB memory rather than 1536MB.

Best Regards, Simon Riggs
Index: src/backend/utils/sort/tuplesort.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/utils/sort/tuplesort.c,v
retrieving revision 1.65
diff -c -r1.65 tuplesort.c
*** src/backend/utils/sort/tuplesort.c	10 Mar 2006 23:19:00 -0000	1.65
--- src/backend/utils/sort/tuplesort.c	22 Mar 2006 09:34:58 -0000
***************
*** 179,186 ****
   */
  #define MINORDER		6		/* minimum merge order */
  #define TAPE_BUFFER_OVERHEAD		(BLCKSZ * 3)
! #define MERGE_BUFFER_SIZE			(BLCKSZ * 32)
! 
  /*
   * Private state of a Tuplesort operation.
   */
--- 179,187 ----
   */
  #define MINORDER		6		/* minimum merge order */
  #define TAPE_BUFFER_OVERHEAD		(BLCKSZ * 3)
! #define OPTIMAL_MERGE_BUFFER_SIZE	(BLCKSZ * 32)
! #define PREFERRED_MERGE_BUFFER_SIZE (BLCKSZ * 256)
! #define REUSE_SPACE_LIMIT           RELSEG_SIZE
  /*
   * Private state of a Tuplesort operation.
   */
***************
*** 255,260 ****
--- 256,270 ----
  	 */
  	int			currentRun;
  
+     /*
+      * These variables are used during final merge to reassign resources
+      * as they become available for each tape
+      */
+     int         lastPrereadTape;    /* last tape preread from */
+     int         numPrereads;        /* num times last tape has been selected */
+     int         reassignableSlots;  /* how many slots can be reassigned */
+     long        reassignableMem;    /* how much memory can be reassigned */
+ 
  	/*
  	 * Unless otherwise noted, all pointer variables below are pointers
  	 * to arrays of length maxTapes, holding per-tape data.
***************
*** 294,299 ****
--- 304,310 ----
  	int		   *tp_runs;		/* # of real runs on each tape */
  	int		   *tp_dummy;		/* # of dummy runs for each tape (D[]) */
  	int		   *tp_tapenum;		/* Actual tape numbers (TAPE[]) */
+ 
  	int			activeTapes;	/* # of active input tapes in merge pass */
  
  	/*
***************
*** 398,408 ****
--- 409,423 ----
  
  static Tuplesortstate *tuplesort_begin_common(int workMem, bool randomAccess);
  static void puttuple_common(Tuplesortstate *state, SortTuple *tuple);
+ static void grow_memtuples(Tuplesortstate *state);
+ static void shrink_memtuples(Tuplesortstate *state);
  static void inittapes(Tuplesortstate *state);
  static void selectnewtape(Tuplesortstate *state);
  static void mergeruns(Tuplesortstate *state);
  static void mergeonerun(Tuplesortstate *state);
  static void beginmerge(Tuplesortstate *state);
+ static void assignResourcesUniformly(Tuplesortstate *state, bool initialAssignment);
+ static void reassignresources(Tuplesortstate *state, int srcTape);
  static void mergepreread(Tuplesortstate *state);
  static void mergeprereadone(Tuplesortstate *state, int srcTape);
  static void dumptuples(Tuplesortstate *state, bool alltuples);
***************
*** 727,733 ****
   * moves around with tuple addition/removal, this might result in thrashing.
   * Small increases in the array size are likely to be pretty inefficient.
   */
! static bool
  grow_memtuples(Tuplesortstate *state)
  {
  	/*
--- 742,748 ----
   * moves around with tuple addition/removal, this might result in thrashing.
   * Small increases in the array size are likely to be pretty inefficient.
   */
! static void
  grow_memtuples(Tuplesortstate *state)
  {
  	/*
***************
*** 740,752 ****
  	 * this assumption should be good.  But let's check it.)
  	 */
  	if (state->availMem <= (long) (state->memtupsize * sizeof(SortTuple)))
! 		return false;
  	/*
  	 * On a 64-bit machine, allowedMem could be high enough to get us into
  	 * trouble with MaxAllocSize, too.
  	 */
  	if ((Size) (state->memtupsize * 2) >= MaxAllocSize / sizeof(SortTuple))
! 		return false;
  
  	FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
  	state->memtupsize *= 2;
--- 755,767 ----
  	 * this assumption should be good.  But let's check it.)
  	 */
  	if (state->availMem <= (long) (state->memtupsize * sizeof(SortTuple)))
! 		return;
  	/*
  	 * On a 64-bit machine, allowedMem could be high enough to get us into
  	 * trouble with MaxAllocSize, too.
  	 */
  	if ((Size) (state->memtupsize * 2) >= MaxAllocSize / sizeof(SortTuple))
! 		return;
  
  	FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
  	state->memtupsize *= 2;
***************
*** 756,762 ****
  	USEMEM(state, GetMemoryChunkSpace(state->memtuples));
  	if (LACKMEM(state))
  		elog(ERROR, "unexpected out-of-memory situation during sort");
! 	return true;
  }
  
  /*
--- 771,849 ----
  	USEMEM(state, GetMemoryChunkSpace(state->memtuples));
  	if (LACKMEM(state))
  		elog(ERROR, "unexpected out-of-memory situation during sort");
! 	return;
! }
! 
! static void
! shrink_memtuples(Tuplesortstate *state)
! {
!     int  oldMemTupSize = state->memtupsize;
!     int  newMemTupSize;
!     long oldMemTuplesAlloc = GetMemoryChunkSpace(state->memtuples);
! 
!     if (state->memtupcount > 0)
!     {
! 		elog(LOG, "unexpected attempt to shrink sort memory");
!         return;
!     }
! 
!     Assert(state->status == TSS_FINALMERGE || state->status == TSS_SORTEDONTAPE);
! 
!     /*
!      * Set new size, based upon earlier memory usage
!      *
!      * We don't care exactly how many tuples are stored, however we already
!      * know that the existing setting filled available memory. The merge
!      * order was set according to OPTIMAL_MERGE_BUFFER_SIZE, on the assumption
!      * that we might have to cope with a merge of maxTapes. However, we
!      * would prefer to merge using PREFERRED_MERGE_BUFFER_SIZE for each active
!      * tape. Any more memory than this is likely to be a waste, so we can 
!      * reduce memory in proportion to the ratio of activeTapes to maxTapes
!      * and thus allow it to be reused by subsequent sorts or hashes within 
!      * the same execution. For large memory settings this will be a major
!      * win, since we may have allocated enough memory for 1000s of tapes,
!      * yet might only need to merge back on a few tapes.
!      */
!    newMemTupSize = (int) ((state->memtupsize * state->activeTapes * 
!                     (PREFERRED_MERGE_BUFFER_SIZE / OPTIMAL_MERGE_BUFFER_SIZE))/  
!                      state->tapeRange);
! 
!     if (newMemTupSize >= state->memtupsize)
!         return;
! 
!     pfree(state->memtuples);
! 	FREEMEM(state, oldMemTuplesAlloc);
! 
!     /*
!      * We don't need a memtuples array at all if we have randomAccess
!      */
!     if (state->status == TSS_SORTEDONTAPE)
!     {
! #ifdef TRACE_SORT
! 	if (trace_sort)
! 		elog(LOG, "releasing sort resources prior to enabling randomAccess: %s",
! 		    pg_rusage_show(&state->ru_start));
! #endif
!         state->memtupsize = 0;
!         return;
!     }
! 
! 	state->memtupsize = newMemTupSize;
! 	state->memtuples = (SortTuple *)
! 		  palloc(state->memtupsize * sizeof(SortTuple));
! 	USEMEM(state, oldMemTuplesAlloc);
! 
! #ifdef TRACE_SORT
! 	if (trace_sort)
! 		elog(LOG, "shrinking resources to %d%% (from %d to %d slots): %s",
!             (100*newMemTupSize) / oldMemTupSize,
!             oldMemTupSize, state->memtupsize,
! 		    pg_rusage_show(&state->ru_start));
! #endif
! 
! 	if (LACKMEM(state))
! 		elog(ERROR, "unexpected out-of-memory situation during sort");
! 	return;
  }
  
  /*
***************
*** 834,840 ****
  			 */
  			if (state->memtupcount >= state->memtupsize - 1)
  			{
! 				(void) grow_memtuples(state);
  				Assert(state->memtupcount < state->memtupsize);
  			}
  			state->memtuples[state->memtupcount++] = *tuple;
--- 921,927 ----
  			 */
  			if (state->memtupcount >= state->memtupsize - 1)
  			{
! 				grow_memtuples(state);
  				Assert(state->memtupcount < state->memtupsize);
  			}
  			state->memtuples[state->memtupcount++] = *tuple;
***************
*** 1115,1120 ****
--- 1202,1209 ----
  				tuplesort_heap_siftup(state, false);
  				if ((tupIndex = state->mergenext[srcTape]) == 0)
  				{
+                     reassignresources(state, srcTape);
+  
  					/*
  					 * out of preloaded data on this tape, try to read more
  					 *
***************
*** 1125,1133 ****
--- 1214,1236 ----
  
  					/*
  					 * if still no data, we've reached end of run on this tape
+                      * so we can permanently reassign the resources used by
+                      * srcTape onto any remaining tapes for remainder of the
+                      * final merge
  					 */
  					if ((tupIndex = state->mergenext[srcTape]) == 0)
+                     {
+                         state->reassignableSlots += state->mergeavailslots[srcTape];
+                         state->reassignableMem += state->mergeavailmem[srcTape];
+                         state->numPrereads = 0;
+ #ifdef TRACE_SORT
+                     	if (trace_sort)
+                     		elog(LOG, "final merge: tape %d exhausted: %s",
+                                 srcTape,
+                     		    pg_rusage_show(&state->ru_start));
+ #endif
  						return true;
+                     }
  				}
  				/* pull next preread tuple from list, insert in heap */
  				newtup = &state->memtuples[tupIndex];
***************
*** 1150,1155 ****
--- 1253,1365 ----
  }
  
  /*
+  * Assign space uniformly across remaining active tapes
+  */
+ static void 
+ assignResourcesUniformly(Tuplesortstate *state, bool initialAssignment)
+ {
+     int slotsPerTape;
+     long spacePerTape;
+     int activeTapes = state->activeTapes;
+     int srcTape;
+ 
+ 	Assert(activeTapes > 0);
+ 
+     if (initialAssignment)
+     {
+     	slotsPerTape = (state->memtupsize - state->mergefirstfree) / activeTapes;
+     	Assert(slotsPerTape > 0);
+     	spacePerTape = state->availMem / activeTapes;
+ 
+     	for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
+     	{
+     		if (state->mergeactive[srcTape])
+     		{
+     			state->mergeavailslots[srcTape] = slotsPerTape;
+     			state->mergeavailmem[srcTape] = spacePerTape;
+     		}
+     	}
+     }
+     else
+     {
+         slotsPerTape = state->reassignableSlots / activeTapes;
+         if (slotsPerTape == 0)
+             return;
+         spacePerTape = state->reassignableMem / activeTapes;
+ #ifdef TRACE_SORT
+     	if (trace_sort)
+     		elog(LOG, "reassigning resources; each tape gets: +%d slots, +%ld mem: %s",
+                 slotsPerTape,
+                 spacePerTape,
+     		    pg_rusage_show(&state->ru_start));
+ #endif
+ 
+     	for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
+     	{
+     		if (state->mergeactive[srcTape])
+     		{
+     			state->mergeavailslots[srcTape] += slotsPerTape;
+     			state->mergeavailmem[srcTape] += spacePerTape;
+                 if (--activeTapes <= 0)
+                     break;
+     		}
+     	}
+     }
+ 
+     state->reassignableSlots = 0;
+     state->reassignableMem = 0;
+ }
+ 
+ /*
+  * If we have any reassignable resources from earlier tapes that have been 
+  * made empty by previous final run prereads, consider how to reassign them
+  */
+ static void
+ reassignresources(Tuplesortstate *state, int srcTape)
+ {
+     if (state->reassignableSlots <= state->activeTapes)
+         return;
+ 
+     /*
+      * We expect each tape to need to be preread about 2*number of tapes in 
+      * final merge. That gives us time to decide whether we should allocate the
+      * reassignable resources evenly, or whether we should give them all to a 
+      * single tape, which may be appropriate in some cases
+      */
+     if (state->numPrereads == 0 ||
+         state->lastPrereadTape == srcTape || 
+         state->activeTapes == 1)
+     {
+         state->numPrereads++;
+         state->lastPrereadTape = srcTape;
+ 
+         /*
+          * If we can confirm that the emerging pattern of prereads
+          * is consistently on a single tape, then hand
+          * over all the spare resources to that tape
+          */
+         if (state->numPrereads > 3 || state->activeTapes == 1)
+         {
+ #ifdef TRACE_SORT
+         	if (trace_sort)
+         		elog(LOG, "reassigning memory to tape %d, +%d slots, +%ld mem: %s",
+                     srcTape,
+                     state->reassignableSlots,
+                     state->reassignableMem,
+         		    pg_rusage_show(&state->ru_start));
+ #endif
+             state->mergeavailslots[srcTape] += state->reassignableSlots;
+             state->mergeavailmem[srcTape] += state->reassignableMem;
+             state->reassignableSlots = 0;
+             state->reassignableMem = 0;
+             state->numPrereads = 0;
+         }
+     }
+     else
+             assignResourcesUniformly(state, false);
+ }
+ 
+ /*
   * Fetch the next tuple in either forward or back direction.
   * Returns NULL if no more tuples.	If *should_free is set, the
   * caller must pfree the returned tuple when done with it.
***************
*** 1231,1237 ****
  	 * the MERGE_BUFFER_SIZE workspace.
  	 */
  	mOrder = (allowedMem - TAPE_BUFFER_OVERHEAD) /
! 		(MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD);
  
  	/* Even in minimum memory, use at least a MINORDER merge */
  	mOrder = Max(mOrder, MINORDER);
--- 1441,1447 ----
  	 * the MERGE_BUFFER_SIZE workspace.
  	 */
  	mOrder = (allowedMem - TAPE_BUFFER_OVERHEAD) /
! 		(OPTIMAL_MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD);
  
  	/* Even in minimum memory, use at least a MINORDER merge */
  	mOrder = Max(mOrder, MINORDER);
***************
*** 1436,1443 ****
  				/* Tell logtape.c we won't be writing anymore */
  				LogicalTapeSetForgetFreeSpace(state->tapeset);
  				/* Initialize for the final merge pass */
- 				beginmerge(state);
  				state->status = TSS_FINALMERGE;
  				return;
  			}
  		}
--- 1646,1653 ----
  				/* Tell logtape.c we won't be writing anymore */
  				LogicalTapeSetForgetFreeSpace(state->tapeset);
  				/* Initialize for the final merge pass */
  				state->status = TSS_FINALMERGE;
+ 				beginmerge(state);
  				return;
  			}
  		}
***************
*** 1506,1511 ****
--- 1716,1722 ----
  	state->result_tape = state->tp_tapenum[state->tapeRange];
  	LogicalTapeFreeze(state->tapeset, state->result_tape);
  	state->status = TSS_SORTEDONTAPE;
+     shrink_memtuples(state);
  }
  
  /*
***************
*** 1523,1528 ****
--- 1734,1740 ----
  	SortTuple  *tup;
  	long		priorAvail,
  				spaceFreed;
+     int         numtapes;
  
  	/*
  	 * Start the merge by loading one tuple from each active source tape into
***************
*** 1530,1535 ****
--- 1742,1749 ----
  	 */
  	beginmerge(state);
  
+     numtapes = state->activeTapes;
+ 
  	/*
  	 * Execute merge by repeatedly extracting lowest tuple in heap, writing it
  	 * out, and replacing it with next tuple from same tape (if there is
***************
*** 1576,1582 ****
  
  #ifdef TRACE_SORT
  	if (trace_sort)
! 		elog(LOG, "finished %d-way merge step: %s", state->activeTapes,
  			 pg_rusage_show(&state->ru_start));
  #endif
  }
--- 1790,1796 ----
  
  #ifdef TRACE_SORT
  	if (trace_sort)
! 		elog(LOG, "finished %d-way merge step: %s", numtapes,
  			 pg_rusage_show(&state->ru_start));
  #endif
  }
***************
*** 1595,1602 ****
  	int			activeTapes;
  	int			tapenum;
  	int			srcTape;
- 	int			slotsPerTape;
- 	long		spacePerTape;
  
  	/* Heap should be empty here */
  	Assert(state->memtupcount == 0);
--- 1809,1814 ----
***************
*** 1628,1649 ****
  	state->mergefreelist = 0;	/* nothing in the freelist */
  	state->mergefirstfree = activeTapes;  /* 1st slot avail for preread */
  
! 	/*
! 	 * Initialize space allocation to let each active input tape have an equal
! 	 * share of preread space.
! 	 */
! 	Assert(activeTapes > 0);
! 	slotsPerTape = (state->memtupsize - state->mergefirstfree) / activeTapes;
! 	Assert(slotsPerTape > 0);
! 	spacePerTape = state->availMem / activeTapes;
! 	for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
! 	{
! 		if (state->mergeactive[srcTape])
! 		{
! 			state->mergeavailslots[srcTape] = slotsPerTape;
! 			state->mergeavailmem[srcTape] = spacePerTape;
! 		}
! 	}
  
  	/*
  	 * Preread as many tuples as possible (and at least one) from each active
--- 1840,1849 ----
  	state->mergefreelist = 0;	/* nothing in the freelist */
  	state->mergefirstfree = activeTapes;  /* 1st slot avail for preread */
  
!     if (state->status == TSS_FINALMERGE)
!         shrink_memtuples(state);
! 
!     assignResourcesUniformly(state, true);
  
  	/*
  	 * Preread as many tuples as possible (and at least one) from each active
***************
*** 1722,1727 ****
--- 1922,1928 ----
  
  	if (!state->mergeactive[srcTape])
  		return;					/* tape's run is already exhausted */
+ 
  	priorAvail = state->availMem;
  	state->availMem = state->mergeavailmem[srcTape];
  	while ((state->mergeavailslots[srcTape] > 0 && !LACKMEM(state)) ||
***************
*** 1730,1736 ****
--- 1931,1939 ----
  		/* read next tuple, if any */
  		if ((tuplen = getlen(state, srcTape, true)) == 0)
  		{
+             /* remove this tape from the active set */
  			state->mergeactive[srcTape] = false;
+             --state->activeTapes;
  			break;
  		}
  		READTUP(state, &stup, srcTape, tuplen);
---------------------------(end of broadcast)---------------------------
TIP 2: Don't 'kill -9' the postmaster

Reply via email to