Index: src/backend/executor/nodeLimit.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/executor/nodeLimit.c,v
retrieving revision 1.27
diff -c -r1.27 nodeLimit.c
*** src/backend/executor/nodeLimit.c	26 Jul 2006 19:31:50 -0000	1.27
--- src/backend/executor/nodeLimit.c	14 Sep 2006 14:53:39 -0000
***************
*** 270,275 ****
--- 270,295 ----
  	/* Reset position to start-of-scan */
  	node->position = 0;
  	node->subSlot = NULL;
+ 
+ 	/* This is a bit of a kluge 
+ 	 *
+ 	 * I'm not aware of any abstract way to communicate between the two nodes.
+ 	 * Perhaps I should add a new method in nodeSort like ExecAdviseSort and
+ 	 * have a dispatcher in ExecNode that only has one branch. It isn't likely
+ 	 * to be abstract enough to handle other forms of "advice" though.
+ 	 */
+ 	if (!node->noCount && (IsA(outerPlanState(node), SortState)))
+ 	{
+ 		SortState *sortState = (SortState*)outerPlanState(node);
+ 		int64 tuples_needed = node->count + node->offset;
+ 
+ 		/* check for overflow */
+ 		if (tuples_needed < 0)
+ 			return;
+ 			
+ 		sortState->bounded = true;
+ 		sortState->bound = tuples_needed;
+ 	}
  }
  
  /* ----------------------------------------------------------------
Index: src/backend/executor/nodeSort.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/executor/nodeSort.c,v
retrieving revision 1.57
diff -c -r1.57 nodeSort.c
*** src/backend/executor/nodeSort.c	27 Jun 2006 16:53:02 -0000	1.57
--- src/backend/executor/nodeSort.c	14 Sep 2006 14:53:40 -0000
***************
*** 88,93 ****
--- 88,97 ----
  											  plannode->sortColIdx,
  											  work_mem,
  											  node->randomAccess);
+ 		if (node->bounded) {
+ 			tuplesort_set_bound(tuplesortstate, node->bound);
+ 		}
+ 		
  		node->tuplesortstate = (void *) tuplesortstate;
  
  		/*
***************
*** 166,171 ****
--- 170,176 ----
  										 EXEC_FLAG_BACKWARD |
  										 EXEC_FLAG_MARK)) != 0;
  
+ 	sortstate->bounded = false;
  	sortstate->sort_Done = false;
  	sortstate->tuplesortstate = NULL;
  
Index: src/backend/utils/misc/guc.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/utils/misc/guc.c,v
retrieving revision 1.350
diff -c -r1.350 guc.c
*** src/backend/utils/misc/guc.c	2 Sep 2006 23:12:16 -0000	1.350
--- src/backend/utils/misc/guc.c	14 Sep 2006 14:53:46 -0000
***************
*** 102,107 ****
--- 102,108 ----
  #ifdef TRACE_SORT
  extern bool trace_sort;
  #endif
+ extern bool enable_bounded_sort;
  
  static const char *assign_log_destination(const char *value,
  					   bool doit, GucSource source);
***************
*** 935,940 ****
--- 936,951 ----
  		false, NULL, NULL
  	},
  #endif
+ 	{
+ 		{
+ 			"enable_bounded_sort", PGC_USERSET, QUERY_TUNING_METHOD,
+ 			gettext_noop("Enables incremental bounded sorts using heap sort"),
+ 			NULL,
+ 			GUC_NOT_IN_SAMPLE
+ 		},
+ 		&enable_bounded_sort,
+ 		true, NULL, NULL
+ 	},
  
  #ifdef WAL_DEBUG
  	{
Index: src/backend/utils/sort/tuplesort.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/utils/sort/tuplesort.c,v
retrieving revision 1.68
diff -c -r1.68 tuplesort.c
*** src/backend/utils/sort/tuplesort.c	14 Jul 2006 14:52:25 -0000	1.68
--- src/backend/utils/sort/tuplesort.c	14 Sep 2006 14:53:48 -0000
***************
*** 113,118 ****
--- 113,119 ----
  
  
  /* GUC variable */
+ bool		enable_bounded_sort = true;
  #ifdef TRACE_SORT
  bool		trace_sort = false;
  #endif
***************
*** 159,164 ****
--- 160,166 ----
  typedef enum
  {
  	TSS_INITIAL,				/* Loading tuples; still within memory limit */
+ 	TSS_BOUNDED,				/* Loading tuples into max-heap (backwards heap) */
  	TSS_BUILDRUNS,				/* Loading tuples; writing to tape */
  	TSS_SORTEDINMEM,			/* Sort completed entirely in memory */
  	TSS_SORTEDONTAPE,			/* Sort completed, final run is on tape */
***************
*** 166,171 ****
--- 168,190 ----
  } TupSortStatus;
  
  /*
+  * Parameters to pass to heap functions to dictate whether to generate a
+  * max-heap or min-heap. Normally we use a min-heap but when performing a
+  * bounded sort we need a max-heap so we can find the high tuples to throw out.
+  *
+  * It's important the values be 1 and -1 so we can multiply the regular
+  * comparator by them to get the expected results.
+  */
+ 
+ typedef enum
+ {
+ 	NOT_IN_HEAP_ORDER = 0,			/* memtuples is not currently in heap form */
+ 	HEAP_ORDER_MIN = 1,				/* Min-heap (ascending sort order) */
+ 	HEAP_ORDER_MAX = -1 			/* Max-heap (descending sort order) */
+ } HeapOrder;
+ 	
+ 
+ /*
   * Parameters for calculation of number of tapes to use --- see inittapes()
   * and tuplesort_merge_order().
   *
***************
*** 182,193 ****
--- 201,220 ----
  
  /*
   * Private state of a Tuplesort operation.
+  *
+  * Currently heaporder is:
+  *  - always HEAP_ORDER_MIN if we're in TSS_BUILDRUNS
+  *  - always HEAP_ORDER_MAX if we're in TSS_BOUNDED
+  *  - always NOT_IN_HEAP_ORDER otherwise
   */
  struct Tuplesortstate
  {
  	TupSortStatus status;		/* enumerated value as shown above */
  	int			nKeys;			/* number of columns in sort key */
  	bool		randomAccess;	/* did caller request random access? */
+ 	bool		bounded;		/* did caller specify a maximum number of tuples to return? */
+ 	unsigned	bound;			/* if bounded, what is the maximum number of tuples */
+ 	HeapOrder   heaporder;		/* if memtuples is in heap form, which direction */
  	long		availMem;		/* remaining memory available, in bytes */
  	long		allowedMem;		/* total memory allowed, in bytes */
  	int			maxTapes;		/* number of tapes (Knuth's T) */
***************
*** 405,410 ****
--- 432,439 ----
  static void mergepreread(Tuplesortstate *state);
  static void mergeprereadone(Tuplesortstate *state, int srcTape);
  static void dumptuples(Tuplesortstate *state, bool alltuples);
+ static void tuplesort_heapify(Tuplesortstate *state, int tupindex, HeapOrder heaporder);
+ static void tuplesort_unheapify(Tuplesortstate *state, int checkIndex);
  static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple,
  					  int tupleindex, bool checkIndex);
  static void tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex);
***************
*** 492,497 ****
--- 521,527 ----
  #endif
  
  	state->status = TSS_INITIAL;
+ 	state->heaporder = NOT_IN_HEAP_ORDER;
  	state->randomAccess = randomAccess;
  	state->allowedMem = workMem * 1024L;
  	state->availMem = state->allowedMem;
***************
*** 522,527 ****
--- 552,589 ----
  	return state;
  }
  
+ /*
+  * tuplesort_set_bound - tell tuplesort only the first <bound> sorted tuples
+  * are needed.  Must be called before inserting any tuples.
+  *
+  * Once more than <bound> tuples are loaded the sort may switch to a max-heap
+  * holding just the <bound> smallest tuples, discarding the rest.  Ignored if
+  * enable_bounded_sort is off or <bound> is negative or above UINT_MAX/4.
+  */
+ void
+ tuplesort_set_bound(Tuplesortstate *state, int64 bound)
+ {
+ 	Assert(state->memtupcount == 0);
+ 	Assert(state->status == TSS_INITIAL);
+ 
+ 	if (!enable_bounded_sort)
+ 		return;
+ 
+ 	/* negative values would wrap around in the unsigned cast below */
+ 	if (bound < 0 || bound > (int64) (UINT_MAX / 4))
+ 	{
+ 		state->bounded = false;
+ #ifdef TRACE_SORT
+ 		if (trace_sort)
+ 			elog(LOG, "refusing to use bounded sort for out-of-range limit %lld", (long long) bound);
+ #endif
+ 		return;
+ 	}
+ 
+ 	state->bounded = true;
+ 	state->bound = (unsigned) bound;
+ }
+ 
  Tuplesortstate *
  tuplesort_begin_heap(TupleDesc tupDesc,
  					 int nkeys,
***************
*** 857,864 ****
--- 919,942 ----
  				(void) grow_memtuples(state);
  				Assert(state->memtupcount < state->memtupsize);
  			}
+ 
+ 			Assert(state->heaporder == NOT_IN_HEAP_ORDER);
+ 
  			state->memtuples[state->memtupcount++] = *tuple;
  
+ 			/* Check if it's time to switch over to a bounded heapsort */
+ 			if (state->bounded && (state->memtupcount > state->bound*2 || 
+ 								   (state->memtupcount > state->bound && LACKMEM(state)))) 
+ 			{
+ #ifdef TRACE_SORT
+ 				if (trace_sort) elog(LOG, "switching to bounded heap sort at %d tuples", state->memtupcount);
+ #endif
+ 				tuplesort_heapify(state, 0, HEAP_ORDER_MAX);
+ 				Assert(state->memtupcount == state->bound);
+ 				state->status = TSS_BOUNDED;
+ 				return;
+ 			}
+ 
  			/*
  			 * Done if we still fit in available memory and have array slots.
  			 */
***************
*** 875,880 ****
--- 953,977 ----
  			 */
  			dumptuples(state, false);
  			break;
+ 
+ 		case TSS_BOUNDED:
+ 			Assert(state->memtupcount == state->bound);
+ 
+ 			/* We need to be careful to avoid inserting our tuple if it would
+ 			 * only be removed since we may have memtuples completely full here
+ 			 * (if the bound just barely fit in ram). Besides that's probably a
+ 			 * common case and it's worth optimizing for.
+ 			 */
+ 			if (COMPARETUP(state, tuple, &state->memtuples[0]) >= 0)
+ 				/* we can throw out the new tuple -- it's >= the top of the heap */
+ 				return;
+ 			else {
+ 				tuplesort_heap_siftup(state, false);
+ 				tuplesort_heap_insert(state, tuple, 0, false);
+ 			}
+ 			
+ 			break;
+ 
  		case TSS_BUILDRUNS:
  
  			/*
***************
*** 924,949 ****
  	switch (state->status)
  	{
  		case TSS_INITIAL:
  
  			/*
  			 * We were able to accumulate all the tuples within the allowed
  			 * amount of memory.  Just qsort 'em and we're done.
  			 */
! 			if (state->memtupcount > 1)
  			{
  				qsort_tuplesortstate = state;
  				qsort((void *) state->memtuples, state->memtupcount,
  					  sizeof(SortTuple), qsort_comparetup);
  			}
  			state->current = 0;
  			state->eof_reached = false;
  			state->markpos_offset = 0;
  			state->markpos_eof = false;
  			state->status = TSS_SORTEDINMEM;
  			break;
! 		case TSS_BUILDRUNS:
  
  			/*
  			 * Finish tape-based sort.	First, flush all tuples remaining in
  			 * memory out to tape; then merge until we have a single remaining
  			 * run (or, if !randomAccess, one run per tape). Note that
--- 1021,1076 ----
  	switch (state->status)
  	{
  		case TSS_INITIAL:
+ 			Assert(state->heaporder == NOT_IN_HEAP_ORDER);
  
  			/*
  			 * We were able to accumulate all the tuples within the allowed
  			 * amount of memory.  Just qsort 'em and we're done.
  			 */
! 			if (state->memtupcount > 1) 
  			{
+ #ifdef TRACE_SORT
+ 				if (trace_sort) elog(LOG, "doing qsort of %d tuples", state->memtupcount);
+ #endif
  				qsort_tuplesortstate = state;
  				qsort((void *) state->memtuples, state->memtupcount,
  					  sizeof(SortTuple), qsort_comparetup);
  			}
+ 
  			state->current = 0;
  			state->eof_reached = false;
  			state->markpos_offset = 0;
  			state->markpos_eof = false;
  			state->status = TSS_SORTEDINMEM;
  			break;
! 
! 		case TSS_BOUNDED:
! 			Assert(state->heaporder == HEAP_ORDER_MAX);
  
  			/*
+ 			 * We were able to accumulate all the tuples required for output in
+ 			 * memory using a heap to eliminate excess tuples and, if relevant,
+ 			 * duplicates.
+ 			 */
+ 			if (state->memtupcount > 1) 
+ 			{
+ #ifdef TRACE_SORT
+ 				if (trace_sort) elog(LOG, "doing unheapify of %d tuples", state->memtupcount);
+ #endif
+ 				tuplesort_unheapify(state, false);
+ 			}
+ 			
+ 			state->current = 0;
+ 			state->eof_reached = false;
+ 			state->markpos_offset = 0;
+ 			state->markpos_eof = false;
+ 			state->status = TSS_SORTEDINMEM;
+ 			break;
+ 
+ 		case TSS_BUILDRUNS:
+ 			Assert(state->heaporder == HEAP_ORDER_MIN);
+ 			
+ 			/*
  			 * Finish tape-based sort.	First, flush all tuples remaining in
  			 * memory out to tape; then merge until we have a single remaining
  			 * run (or, if !randomAccess, one run per tape). Note that
***************
*** 1298,1304 ****
  inittapes(Tuplesortstate *state)
  {
  	int			maxTapes,
- 				ntuples,
  				j;
  	long		tapeSpace;
  
--- 1425,1430 ----
***************
*** 1353,1372 ****
  	 * Convert the unsorted contents of memtuples[] into a heap. Each tuple is
  	 * marked as belonging to run number zero.
  	 *
- 	 * NOTE: we pass false for checkIndex since there's no point in comparing
- 	 * indexes in this step, even though we do intend the indexes to be part
- 	 * of the sort key...
  	 */
! 	ntuples = state->memtupcount;
! 	state->memtupcount = 0;		/* make the heap empty */
! 	for (j = 0; j < ntuples; j++)
! 	{
! 		/* Must copy source tuple to avoid possible overwrite */
! 		SortTuple	stup = state->memtuples[j];
! 
! 		tuplesort_heap_insert(state, &stup, 0, false);
! 	}
! 	Assert(state->memtupcount == ntuples);
  
  	state->currentRun = 0;
  
--- 1479,1486 ----
  	 * Convert the unsorted contents of memtuples[] into a heap. Each tuple is
  	 * marked as belonging to run number zero.
  	 *
  	 */
! 	tuplesort_heapify(state, 0, HEAP_ORDER_MIN);
  
  	state->currentRun = 0;
  
***************
*** 1980,1989 ****
   * as the front of the sort key; otherwise, no.
   */
  
! #define HEAPCOMPARE(tup1,tup2) \
! 	(checkIndex && ((tup1)->tupindex != (tup2)->tupindex) ? \
! 	 ((tup1)->tupindex) - ((tup2)->tupindex) : \
! 	 COMPARETUP(state, tup1, tup2))
  
  /*
   * Insert a new tuple into an empty or existing heap, maintaining the
--- 2094,2194 ----
   * as the front of the sort key; otherwise, no.
   */
  
! #define HEAPCOMPARE(tup1,tup2,order)	/* order: 1 or -1 (a HeapOrder) */	\
! 	((order) *													\
! 	 (checkIndex && ((tup1)->tupindex != (tup2)->tupindex) ?	\
! 	  ((tup1)->tupindex) - ((tup2)->tupindex) : 				\
! 	  COMPARETUP(state, tup1, tup2)))
! 
! /*
!  * Convert the unordered contents of memtuples[] into a heap of the requested
!  * order, storing tupindex in every tuple.  Does nothing if memtuples[] is
!  * already a heap of that order.  Called from inittapes() (min-heap) and when
!  * switching to a bounded sort (max-heap).
!  *
!  * If the sort is bounded, at most state->bound tuples are kept: once the heap
!  * is full, each further tuple either is skipped or displaces the heap's top.
!  *
!  * NOTE: checkIndex is passed as false because every tuple gets the same
!  * tupindex here, so there is no point in comparing tupindexes.
!  */
! static void
! tuplesort_heapify(Tuplesortstate *state, int tupindex, HeapOrder heaporder)
! {
! 	int			i, memtupcount;
! 
! 	if (state->heaporder == heaporder)
! 		return;
! 
! 	Assert(heaporder == HEAP_ORDER_MIN ||
! 		   heaporder == HEAP_ORDER_MAX);
! 
! 	state->heaporder = heaporder;
! 	memtupcount = state->memtupcount;
! 	state->memtupcount = 0;		/* make the heap empty */
! 	for (i = 0; i < memtupcount; i++)
! 	{
! 		SortTuple	stup = state->memtuples[i];	/* copy: insert may overwrite */
! 
! 		/* heap is full and this tuple would be thrown right back out: skip */
! 		if (state->bounded &&
! 			state->memtupcount >= state->bound &&
! 			heaporder * COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0)
! 			continue;
! 
! 		tuplesort_heap_insert(state, &stup, tupindex, false);
! 
! 		/* over the bound: discard the heap's top tuple */
! 		if (state->bounded && state->memtupcount > state->bound)
! 			tuplesort_heap_siftup(state, false);
! 	}
! 	/* parenthesized because "==" binds tighter than "?:"; nothing is dropped if under bound */
! 	Assert(state->memtupcount ==
! 		   ((state->bounded && memtupcount > (int) state->bound) ? (int) state->bound : memtupcount));
! }
! 
! /*
!  * tuplesort_unheapify - convert a max-heap in memtuples[] into an ascending
!  * sorted array, in place.
!  *
!  * Only HEAP_ORDER_MAX heaps are supported (this is asserted).  Repeatedly
!  * removing the top of a max-heap yields tuples in descending order; storing
!  * each one in the slot the shrinking heap has just vacated fills the array
!  * from the back, which leaves it in ascending order.  A min-heap would come
!  * out reversed, but nothing currently needs to unheapify one.
!  *
!  * checkIndex is passed through to tuplesort_heap_siftup().
!  *
!  * On return memtupcount is unchanged and heaporder is NOT_IN_HEAP_ORDER.
!  */
! static void
! tuplesort_unheapify(Tuplesortstate *state, int checkIndex)
! {
! 	int			memtupcount = state->memtupcount;
! 
! 	Assert(state->heaporder == HEAP_ORDER_MAX);
! 
! 	while (state->memtupcount > 0)
! 	{
! 		SortTuple	stup = state->memtuples[0];	/* current top of the heap */
! 
! 		/*
! 		 * siftup decrements memtupcount while discarding the top, so the heap
! 		 * now occupies slots [0, memtupcount) and slot memtupcount is the one
! 		 * just freed.  (Storing at memtupcount+1 would never fill slot 0 and
! 		 * would write one element past the original contents.)
! 		 */
! 		tuplesort_heap_siftup(state, checkIndex);
! 		state->memtuples[state->memtupcount] = stup;
! 	}
! 
! 	/* the loop counted memtupcount down to zero; every tuple is still present */
! 	state->memtupcount = memtupcount;
! 	/* memtuples[] is now a plain sorted array, no longer a heap */
! 	state->heaporder = NOT_IN_HEAP_ORDER;
! 
! 	Assert(state->memtupcount == memtupcount);
! }
  
  /*
   * Insert a new tuple into an empty or existing heap, maintaining the
***************
*** 2002,2007 ****
--- 2207,2215 ----
  	SortTuple  *memtuples;
  	int			j;
  
+ 	Assert(state->heaporder == HEAP_ORDER_MIN || 
+ 		   state->heaporder == HEAP_ORDER_MAX);
+ 
  	/*
  	 * Save the tupleindex --- see notes above about writing on *tuple.
  	 * It's a historical artifact that tupleindex is passed as a separate
***************
*** 2022,2028 ****
  	{
  		int			i = (j - 1) >> 1;
  
! 		if (HEAPCOMPARE(tuple, &memtuples[i]) >= 0)
  			break;
  		memtuples[j] = memtuples[i];
  		j = i;
--- 2230,2236 ----
  	{
  		int			i = (j - 1) >> 1;
  
! 		if (HEAPCOMPARE(tuple, &memtuples[i], state->heaporder) >= 0)
  			break;
  		memtuples[j] = memtuples[i];
  		j = i;
***************
*** 2042,2047 ****
--- 2250,2258 ----
  	int			i,
  				n;
  
+ 	Assert(state->heaporder == HEAP_ORDER_MIN || 
+ 		   state->heaporder == HEAP_ORDER_MAX);
+ 
  	if (--state->memtupcount <= 0)
  		return;
  	n = state->memtupcount;
***************
*** 2054,2062 ****
  		if (j >= n)
  			break;
  		if (j + 1 < n &&
! 			HEAPCOMPARE(&memtuples[j], &memtuples[j + 1]) > 0)
  			j++;
! 		if (HEAPCOMPARE(tuple, &memtuples[j]) <= 0)
  			break;
  		memtuples[i] = memtuples[j];
  		i = j;
--- 2265,2273 ----
  		if (j >= n)
  			break;
  		if (j + 1 < n &&
! 			HEAPCOMPARE(&memtuples[j], &memtuples[j + 1], state->heaporder) > 0)
  			j++;
! 		if (HEAPCOMPARE(tuple, &memtuples[j], state->heaporder) <= 0)
  			break;
  		memtuples[i] = memtuples[j];
  		i = j;
Index: src/include/nodes/execnodes.h
===================================================================
RCS file: /projects/cvsroot/pgsql/src/include/nodes/execnodes.h,v
retrieving revision 1.160
diff -c -r1.160 execnodes.h
*** src/include/nodes/execnodes.h	25 Aug 2006 04:06:56 -0000	1.160
--- src/include/nodes/execnodes.h	14 Sep 2006 14:53:50 -0000
***************
*** 1227,1232 ****
--- 1227,1235 ----
  {
  	ScanState	ss;				/* its first field is NodeTag */
  	bool		randomAccess;	/* need random access to sort output? */
+ 	bool		unique;			/* can we throw out duplicates? */
+ 	bool		bounded;		/* is the result set bounded */
+ 	int64		bound;			/* how many tuples are needed */
  	bool		sort_Done;		/* sort completed yet? */
  	void	   *tuplesortstate; /* private state of tuplesort.c */
  } SortState;
***************
*** 1296,1301 ****
--- 1299,1306 ----
  	PlanState	ps;				/* its first field is NodeTag */
  	FmgrInfo   *eqfunctions;	/* per-field lookup data for equality fns */
  	MemoryContext tempContext;	/* short-term context for comparisons */
+ 	bool		bounded;		/* is the result set bounded */
+ 	int64		bound;			/* how many tuples are needed */
  } UniqueState;
  
  /* ----------------
Index: src/include/utils/tuplesort.h
===================================================================
RCS file: /projects/cvsroot/pgsql/src/include/utils/tuplesort.h,v
retrieving revision 1.22
diff -c -r1.22 tuplesort.h
*** src/include/utils/tuplesort.h	13 Jul 2006 16:49:20 -0000	1.22
--- src/include/utils/tuplesort.h	14 Sep 2006 14:53:50 -0000
***************
*** 84,89 ****
--- 84,98 ----
  extern void tuplesort_markpos(Tuplesortstate *state);
  extern void tuplesort_restorepos(Tuplesortstate *state);
  
+ /* This must be called before any tuples are inserted, to advise tuplesort that
+  * only this many tuples are interesting. If that many tuples fit in memory and
+  * we haven't already overflowed to disk then tuplesort will switch to a bounded
+  * heap sort and throw away the uninteresting tuples.
+  */
+ 
+ extern void 
+ tuplesort_set_bound(Tuplesortstate *state, int64 bound);
+ 
  /*
   * This routine selects an appropriate sorting function to implement
   * a sort operator as efficiently as possible.
