Both Robert and Heikki expressed dissatisfaction with the fact that
B-Tree index builds don't use sortsupport. Because B-Tree index builds
cannot really use the "onlyKey" optimization, the historic oversight
of not supporting B-Tree builds (and CLUSTER) might have been at least
a little understandable [1]. But with the recent work on sortsupport
for text, it's likely that B-Tree index builds will be missing out on
rather a lot by not taking advantage of sortsupport.

Attached patch modifies tuplesort to correct this oversight. What's
really nice about it is that there is actually a net negative code
footprint:

 src/backend/access/nbtree/nbtsort.c  |  63 +++---
 src/backend/utils/sort/sortsupport.c |  77 ++++++--
 src/backend/utils/sort/tuplesort.c   | 274 +++++++++++----------------
 src/include/utils/sortsupport.h      |   3 +
 4 files changed, 203 insertions(+), 214 deletions(-)

I've been able to throw out a lot of code, including two virtually
identical inlinable comparators (that have logic for NULL-wise
comparisons). This patch pretty much justifies itself as a refactoring
exercise, without performance improvements entering into it, which is
nice. I haven't benchmarked it yet, but it seems reasonable to assume
that much the same benefits will be seen as were seen for the
MinimalTuple case (for multiple-attribute sorts, that similarly cannot
use the "onlyKey" optimization).
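
To make the refactoring point concrete, here is a minimal standalone sketch of what a single shared comparator with NULL-wise logic looks like. It is not code from the patch: SketchSortKey, sketch_cmp_long() and sketch_apply_comparator() are hypothetical names, and plain long values stand in for Datums. It only mirrors the shape of the logic that the removed inlineApplySortFunction() duplicated.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for SortSupportData; only the fields used here. */
typedef struct SketchSortKey
{
	bool		reverse;		/* descending sort? (cf. ssup_reverse) */
	bool		nulls_first;	/* NULLs before non-NULLs? (cf. ssup_nulls_first) */
	int			(*comparator) (long a, long b);
} SketchSortKey;

/* Plain three-way comparison on long, standing in for a type's comparator. */
int
sketch_cmp_long(long a, long b)
{
	return (a > b) - (a < b);
}

/*
 * One comparator for every caller: NULL ordering and sort direction both
 * come from the key, so no caller needs its own copy of the NULL handling.
 */
int
sketch_apply_comparator(long datum1, bool isnull1,
						long datum2, bool isnull2,
						const SketchSortKey *key)
{
	int			compare;

	assert(key != NULL && key->comparator != NULL);

	if (isnull1)
	{
		if (isnull2)
			compare = 0;		/* NULL "=" NULL */
		else if (key->nulls_first)
			compare = -1;		/* NULL "<" NOT_NULL */
		else
			compare = 1;		/* NULL ">" NOT_NULL */
	}
	else if (isnull2)
	{
		compare = key->nulls_first ? 1 : -1;
	}
	else
	{
		compare = key->comparator(datum1, datum2);
		if (key->reverse)
			compare = -compare;
	}

	return compare;
}
```

As in the code being replaced, the direction flag only affects the comparison of two non-NULL values. NULL placement is controlled separately by nulls_first.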

With this patch, all callers of _bt_mkscankey_nodata() now use
sortsupport. This raises the question: Why not just have
_bt_mkscankey_nodata() do it all for us? I think that continuing to
have B-Tree provide a scanKey, and working off of that, is a better
abstraction. It would save a few cycles to have the
index_getprocinfo() call currently within _bt_mkscankey_nodata() look
for BTSORTSUPPORT_PROC rather than BTORDER_PROC and be done with it,
but I'd call that a modularity violation. Tuplesort decides a strategy
(BTGreaterStrategyNumber or BTLessStrategyNumber) based on the scanKey
B-Tree private flag SK_BT_DESC, and it's appropriate for that to live
in tuplesort's head if possible, because tuplesort/sortsupport tracks
sort "direction" in a fairly intimate way. Besides, I think that
sortsupport already has enough clients for what it is.
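
A rough standalone sketch of the division of labour described above, under stated assumptions: the flag bits, strategy numbers and struct layouts below are simplified stand-ins (all SKETCH_/Sketch names are hypothetical, and the numeric values are for illustration only, not taken from the PostgreSQL headers). The point is only that the caller inspects the scanKey's flags, picks a strategy, and records direction and NULL ordering in its own per-column sort state.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Illustrative flag bits; stand-ins for SK_BT_DESC and SK_BT_NULLS_FIRST. */
#define SKETCH_FLAG_DESC		0x01
#define SKETCH_FLAG_NULLS_FIRST	0x02

/* Illustrative strategy numbers; stand-ins for the BT*StrategyNumber macros. */
enum
{
	SKETCH_LESS_STRATEGY = 1,
	SKETCH_GREATER_STRATEGY = 5
};

/* Hypothetical cut-down scanKey: what the index AM hands over. */
typedef struct SketchScanKey
{
	int			flags;
	int			attno;
} SketchScanKey;

/* Hypothetical cut-down sort state: what the sort code keeps. */
typedef struct SketchKeyInfo
{
	int			attno;
	bool		reverse;
	bool		nulls_first;
} SketchKeyInfo;

/* The sort code, not the index AM, turns the DESC flag into a strategy. */
int
sketch_strategy_from_flags(int flags)
{
	return (flags & SKETCH_FLAG_DESC) != 0 ?
		SKETCH_GREATER_STRATEGY : SKETCH_LESS_STRATEGY;
}

/* Copy what matters out of the scanKey; it can be freed afterwards. */
void
sketch_fill_key_info(const SketchScanKey *scan, SketchKeyInfo *info)
{
	int			strategy;

	assert(scan != NULL && info != NULL);
	assert(scan->attno != 0);

	strategy = sketch_strategy_from_flags(scan->flags);

	info->attno = scan->attno;
	info->nulls_first = (scan->flags & SKETCH_FLAG_NULLS_FIRST) != 0;
	info->reverse = (strategy == SKETCH_GREATER_STRATEGY);
}
```

Once the per-column state is filled in, nothing in the sort needs the scanKey again, which is why the patch can call _bt_freeskey() right after the setup loop.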

I imagine it makes sense to directly access a sortsupport-installed
comparator through a scanKey, for actual index scans (think
abbreviated keys in internal B-Tree pages), but I still want
uniformity with the other cases within tuplesort (i.e. the
MinimalTuple and Datum cases), so I'm not about to change scanKeys to
have a comparator that doesn't need a fmgr trampoline for the benefit
of this patch. Note that I don't even store a scanKey within
Tuplesortstate anymore. That uniformity is what allowed me to throw
out the per-tuplesort-case reversedirection() logic, and use generic
reversedirection() logic instead (as anticipated by current comments
within Tuplesortstate).
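
For illustration, a minimal standalone sketch of why uniform key state makes one generic routine sufficient. SketchDirKey and sketch_reverse_direction() are hypothetical names, not the patch's code; the loop simply mirrors what the patch's reversedirection() does over state->sortKeys.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for the two SortSupportData fields that matter. */
typedef struct SketchDirKey
{
	bool		reverse;		/* cf. ssup_reverse */
	bool		nulls_first;	/* cf. ssup_nulls_first */
} SketchDirKey;

/*
 * Flip the sort direction of every key.  Both fields are toggled so that
 * NULLs end up at the opposite end as well, giving an exact mirror image
 * of the original ordering.
 */
void
sketch_reverse_direction(SketchDirKey *keys, int nkeys)
{
	int			i;

	assert(keys != NULL || nkeys == 0);

	for (i = 0; i < nkeys; i++)
	{
		keys[i].reverse = !keys[i].reverse;
		keys[i].nulls_first = !keys[i].nulls_first;
	}
}
```

Calling the routine twice restores the original state, which is what the bounded-heap code relies on when it reverses the direction and later reverses it back.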

Thoughts?

[1] 
http://www.postgresql.org/message-id/cam3swzqlg8nx2yeb+67xx0gig+-folfbtqjkg+jl15_zhi1...@mail.gmail.com
-- 
Peter Geoghegan
diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c
new file mode 100644
index e8a89d2..a6c44a7
*** a/src/backend/access/nbtree/nbtsort.c
--- b/src/backend/access/nbtree/nbtsort.c
*************** _bt_load(BTWriteState *wstate, BTSpool *
*** 684,689 ****
--- 684,690 ----
  	int			i,
  				keysz = RelationGetNumberOfAttributes(wstate->index);
  	ScanKey		indexScanKey = NULL;
+ 	SortSupport sortKeys;
  
  	if (merge)
  	{
*************** _bt_load(BTWriteState *wstate, BTSpool *
*** 699,704 ****
--- 700,730 ----
  										true, &should_free2);
  		indexScanKey = _bt_mkscankey_nodata(wstate->index);
  
+ 		/* Prepare SortSupport data for each column */
+ 		sortKeys = (SortSupport) palloc0(keysz * sizeof(SortSupportData));
+ 
+ 		for (i = 0; i < keysz; i++)
+ 		{
+ 			SortSupport sortKey = sortKeys + i;
+ 			ScanKey		scanKey = indexScanKey + i;
+ 			int16		strategy;
+ 
+ 			sortKey->ssup_cxt = CurrentMemoryContext;
+ 			sortKey->ssup_collation = scanKey->sk_collation;
+ 			sortKey->ssup_nulls_first =
+ 				(scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0;
+ 			sortKey->ssup_attno = scanKey->sk_attno;
+ 
+ 			AssertState(sortKey->ssup_attno != 0);
+ 
+ 			strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ?
+ 				BTGreaterStrategyNumber : BTLessStrategyNumber;
+ 
+ 			PrepareSortSupportFromIndexRel(wstate->index, strategy, sortKey);
+ 		}
+ 
+ 		_bt_freeskey(indexScanKey);
+ 
  		for (;;)
  		{
  			load1 = true;		/* load BTSpool next ? */
*************** _bt_load(BTWriteState *wstate, BTSpool *
*** 711,753 ****
  			{
  				for (i = 1; i <= keysz; i++)
  				{
! 					ScanKey		entry;
  					Datum		attrDatum1,
  								attrDatum2;
  					bool		isNull1,
  								isNull2;
  					int32		compare;
  
! 					entry = indexScanKey + i - 1;
  					attrDatum1 = index_getattr(itup, i, tupdes, &isNull1);
  					attrDatum2 = index_getattr(itup2, i, tupdes, &isNull2);
- 					if (isNull1)
- 					{
- 						if (isNull2)
- 							compare = 0;		/* NULL "=" NULL */
- 						else if (entry->sk_flags & SK_BT_NULLS_FIRST)
- 							compare = -1;		/* NULL "<" NOT_NULL */
- 						else
- 							compare = 1;		/* NULL ">" NOT_NULL */
- 					}
- 					else if (isNull2)
- 					{
- 						if (entry->sk_flags & SK_BT_NULLS_FIRST)
- 							compare = 1;		/* NOT_NULL ">" NULL */
- 						else
- 							compare = -1;		/* NOT_NULL "<" NULL */
- 					}
- 					else
- 					{
- 						compare =
- 							DatumGetInt32(FunctionCall2Coll(&entry->sk_func,
- 														 entry->sk_collation,
- 															attrDatum1,
- 															attrDatum2));
  
! 						if (entry->sk_flags & SK_BT_DESC)
! 							compare = -compare;
! 					}
  					if (compare > 0)
  					{
  						load1 = false;
--- 737,756 ----
  			{
  				for (i = 1; i <= keysz; i++)
  				{
! 					SortSupport	entry;
  					Datum		attrDatum1,
  								attrDatum2;
  					bool		isNull1,
  								isNull2;
  					int32		compare;
  
! 					entry = sortKeys + i - 1;
  					attrDatum1 = index_getattr(itup, i, tupdes, &isNull1);
  					attrDatum2 = index_getattr(itup2, i, tupdes, &isNull2);
  
! 					compare = ApplySortComparator(attrDatum1, isNull1,
! 												  attrDatum2, isNull2,
! 												  entry);
  					if (compare > 0)
  					{
  						load1 = false;
*************** _bt_load(BTWriteState *wstate, BTSpool *
*** 781,787 ****
  												true, &should_free2);
  			}
  		}
! 		_bt_freeskey(indexScanKey);
  	}
  	else
  	{
--- 784,790 ----
  												true, &should_free2);
  			}
  		}
! 		pfree(sortKeys);
  	}
  	else
  	{
diff --git a/src/backend/utils/sort/sortsupport.c b/src/backend/utils/sort/sortsupport.c
new file mode 100644
index 2240fd0..8391b1c
*** a/src/backend/utils/sort/sortsupport.c
--- b/src/backend/utils/sort/sortsupport.c
***************
*** 21,26 ****
--- 21,27 ----
  #include "access/nbtree.h"
  #include "fmgr.h"
  #include "utils/lsyscache.h"
+ #include "utils/rel.h"
  #include "utils/sortsupport.h"
  
  
*************** PrepareSortSupportComparisonShim(Oid cmp
*** 86,113 ****
  }
  
  /*
!  * Fill in SortSupport given an ordering operator (btree "<" or ">" operator).
!  *
!  * Caller must previously have zeroed the SortSupportData structure and then
!  * filled in ssup_cxt, ssup_collation, and ssup_nulls_first.  This will fill
!  * in ssup_reverse as well as the comparator function pointer.
   */
! void
! PrepareSortSupportFromOrderingOp(Oid orderingOp, SortSupport ssup)
  {
  	Oid			sortSupportFunction;
- 	Oid			opfamily;
- 	Oid			opcintype;
- 	int16		strategy;
- 
- 	Assert(ssup->comparator == NULL);
- 
- 	/* Find the operator in pg_amop */
- 	if (!get_ordering_op_properties(orderingOp, &opfamily, &opcintype,
- 									&strategy))
- 		elog(ERROR, "operator %u is not a valid ordering operator",
- 			 orderingOp);
- 	ssup->ssup_reverse = (strategy == BTGreaterStrategyNumber);
  
  	/* Look for a sort support function */
  	sortSupportFunction = get_opfamily_proc(opfamily, opcintype, opcintype,
--- 87,98 ----
  }
  
  /*
!  * Lookup and call sortsupport function to setup state, or create shim
   */
! static void
! FinishSortSupportFunction(Oid opfamily, Oid opcintype, SortSupport ssup)
  {
  	Oid			sortSupportFunction;
  
  	/* Look for a sort support function */
  	sortSupportFunction = get_opfamily_proc(opfamily, opcintype, opcintype,
*************** PrepareSortSupportFromOrderingOp(Oid ord
*** 136,138 ****
--- 121,177 ----
  		PrepareSortSupportComparisonShim(sortFunction, ssup);
  	}
  }
+ 
+ /*
+  * Fill in SortSupport given an ordering operator (btree "<" or ">" operator).
+  *
+  * Caller must previously have zeroed the SortSupportData structure and then
+  * filled in ssup_cxt, ssup_collation, and ssup_nulls_first.  This will fill
+  * in ssup_reverse as well as the comparator function pointer.
+  */
+ void
+ PrepareSortSupportFromOrderingOp(Oid orderingOp, SortSupport ssup)
+ {
+ 	Oid			opfamily;
+ 	Oid			opcintype;
+ 	int16		strategy;
+ 
+ 	Assert(ssup->comparator == NULL);
+ 
+ 	/* Find the operator in pg_amop */
+ 	if (!get_ordering_op_properties(orderingOp, &opfamily, &opcintype,
+ 									&strategy))
+ 		elog(ERROR, "operator %u is not a valid ordering operator",
+ 			 orderingOp);
+ 	ssup->ssup_reverse = (strategy == BTGreaterStrategyNumber);
+ 
+ 	FinishSortSupportFunction(opfamily, opcintype, ssup);
+ }
+ 
+ /*
+  * Fill in SortSupport given an index relation, attribute, and strategy.
+  *
+  * Caller must previously have zeroed the SortSupportData structure and then
+  * filled in ssup_cxt, ssup_attno, ssup_collation, and ssup_nulls_first.  This
+  * will fill in ssup_reverse (based on the supplied strategy), as well as the
+  * comparator function pointer.
+  */
+ void
+ PrepareSortSupportFromIndexRel(Relation indexRel, int16 strategy,
+ 							   SortSupport ssup)
+ {
+ 	Oid			opfamily = indexRel->rd_opfamily[ssup->ssup_attno - 1];
+ 	Oid			opcintype = indexRel->rd_opcintype[ssup->ssup_attno - 1];
+ 
+ 	Assert(ssup->comparator == NULL);
+ 
+ 	/* Only btree indexes and btree ordering strategies are supported */
+ 	if (indexRel->rd_rel->relam != BTREE_AM_OID)
+ 		elog(ERROR, "unexpected non-btree AM: %u", indexRel->rd_rel->relam);
+ 	if (strategy != BTGreaterStrategyNumber &&
+ 		strategy != BTLessStrategyNumber)
+ 		elog(ERROR, "unexpected sort support strategy: %d", strategy);
+ 	ssup->ssup_reverse = (strategy == BTGreaterStrategyNumber);
+ 
+ 	FinishSortSupportFunction(opfamily, opcintype, ssup);
+ }
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
new file mode 100644
index 8e57505..0486bab
*** a/src/backend/utils/sort/tuplesort.c
--- b/src/backend/utils/sort/tuplesort.c
*************** struct Tuplesortstate
*** 257,269 ****
  										int tapenum, unsigned int len);
  
  	/*
- 	 * Function to reverse the sort direction from its current state. (We
- 	 * could dispense with this if we wanted to enforce that all variants
- 	 * represent the sort key information alike.)
- 	 */
- 	void		(*reversedirection) (Tuplesortstate *state);
- 
- 	/*
  	 * This array holds the tuples now in sort memory.  If we are in state
  	 * INITIAL, the tuples are in no particular order; if we are in state
  	 * SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS
--- 257,262 ----
*************** struct Tuplesortstate
*** 340,347 ****
  	bool		markpos_eof;	/* saved "eof_reached" */
  
  	/*
! 	 * These variables are specific to the MinimalTuple case; they are set by
! 	 * tuplesort_begin_heap and used only by the MinimalTuple routines.
  	 */
  	TupleDesc	tupDesc;
  	SortSupport sortKeys;		/* array of length nKeys */
--- 333,341 ----
  	bool		markpos_eof;	/* saved "eof_reached" */
  
  	/*
! 	 * The sortKeys variable is used by every case other than the hash index
! 	 * case; it is set by tuplesort_begin_xxx.  tupDesc is only used by the
! 	 * MinimalTuple and CLUSTER routines, though.
  	 */
  	TupleDesc	tupDesc;
  	SortSupport sortKeys;		/* array of length nKeys */
*************** struct Tuplesortstate
*** 354,361 ****
  
  	/*
  	 * These variables are specific to the CLUSTER case; they are set by
! 	 * tuplesort_begin_cluster.  Note CLUSTER also uses tupDesc and
! 	 * indexScanKey.
  	 */
  	IndexInfo  *indexInfo;		/* info about index being used for reference */
  	EState	   *estate;			/* for evaluating index expressions */
--- 348,354 ----
  
  	/*
  	 * These variables are specific to the CLUSTER case; they are set by
! 	 * tuplesort_begin_cluster.
  	 */
  	IndexInfo  *indexInfo;		/* info about index being used for reference */
  	EState	   *estate;			/* for evaluating index expressions */
*************** struct Tuplesortstate
*** 368,374 ****
  	Relation	indexRel;		/* index being built */
  
  	/* These are specific to the index_btree subcase: */
- 	ScanKey		indexScanKey;
  	bool		enforceUnique;	/* complain if we find duplicate tuples */
  
  	/* These are specific to the index_hash subcase: */
--- 361,366 ----
*************** struct Tuplesortstate
*** 395,401 ****
  #define COPYTUP(state,stup,tup) ((*(state)->copytup) (state, stup, tup))
  #define WRITETUP(state,tape,stup)	((*(state)->writetup) (state, tape, stup))
  #define READTUP(state,stup,tape,len) ((*(state)->readtup) (state, stup, tape, len))
- #define REVERSEDIRECTION(state) ((*(state)->reversedirection) (state))
  #define LACKMEM(state)		((state)->availMem < 0)
  #define USEMEM(state,amt)	((state)->availMem -= (amt))
  #define FREEMEM(state,amt)	((state)->availMem += (amt))
--- 387,392 ----
*************** static void sort_bounded_heap(Tuplesorts
*** 464,469 ****
--- 455,461 ----
  static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple,
  					  int tupleindex, bool checkIndex);
  static void tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex);
+ static void reversedirection(Tuplesortstate *state);
  static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK);
  static void markrunend(Tuplesortstate *state, int tapenum);
  static int comparetup_heap(const SortTuple *a, const SortTuple *b,
*************** static void writetup_heap(Tuplesortstate
*** 473,479 ****
  			  SortTuple *stup);
  static void readtup_heap(Tuplesortstate *state, SortTuple *stup,
  			 int tapenum, unsigned int len);
- static void reversedirection_heap(Tuplesortstate *state);
  static int comparetup_cluster(const SortTuple *a, const SortTuple *b,
  				   Tuplesortstate *state);
  static void copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup);
--- 465,470 ----
*************** static void writetup_index(Tuplesortstat
*** 490,497 ****
  			   SortTuple *stup);
  static void readtup_index(Tuplesortstate *state, SortTuple *stup,
  			  int tapenum, unsigned int len);
- static void reversedirection_index_btree(Tuplesortstate *state);
- static void reversedirection_index_hash(Tuplesortstate *state);
  static int comparetup_datum(const SortTuple *a, const SortTuple *b,
  				 Tuplesortstate *state);
  static void copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup);
--- 481,486 ----
*************** static void writetup_datum(Tuplesortstat
*** 499,505 ****
  			   SortTuple *stup);
  static void readtup_datum(Tuplesortstate *state, SortTuple *stup,
  			  int tapenum, unsigned int len);
- static void reversedirection_datum(Tuplesortstate *state);
  static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
  
  /*
--- 488,493 ----
*************** tuplesort_begin_heap(TupleDesc tupDesc,
*** 629,635 ****
  	state->copytup = copytup_heap;
  	state->writetup = writetup_heap;
  	state->readtup = readtup_heap;
- 	state->reversedirection = reversedirection_heap;
  
  	state->tupDesc = tupDesc;	/* assume we need not copy tupDesc */
  
--- 617,622 ----
*************** tuplesort_begin_cluster(TupleDesc tupDes
*** 665,671 ****
--- 652,660 ----
  						int workMem, bool randomAccess)
  {
  	Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
+ 	ScanKey			indexScanKey;
  	MemoryContext oldcontext;
+ 	int				i;
  
  	Assert(indexRel->rd_rel->relam == BTREE_AM_OID);
  
*************** tuplesort_begin_cluster(TupleDesc tupDes
*** 691,703 ****
  	state->copytup = copytup_cluster;
  	state->writetup = writetup_cluster;
  	state->readtup = readtup_cluster;
- 	state->reversedirection = reversedirection_index_btree;
  
  	state->indexInfo = BuildIndexInfo(indexRel);
- 	state->indexScanKey = _bt_mkscankey_nodata(indexRel);
  
  	state->tupDesc = tupDesc;	/* assume we need not copy tupDesc */
  
  	if (state->indexInfo->ii_Expressions != NULL)
  	{
  		TupleTableSlot *slot;
--- 680,692 ----
  	state->copytup = copytup_cluster;
  	state->writetup = writetup_cluster;
  	state->readtup = readtup_cluster;
  
  	state->indexInfo = BuildIndexInfo(indexRel);
  
  	state->tupDesc = tupDesc;	/* assume we need not copy tupDesc */
  
+ 	indexScanKey = _bt_mkscankey_nodata(indexRel);
+ 
  	if (state->indexInfo->ii_Expressions != NULL)
  	{
  		TupleTableSlot *slot;
*************** tuplesort_begin_cluster(TupleDesc tupDes
*** 715,720 ****
--- 704,735 ----
  		econtext->ecxt_scantuple = slot;
  	}
  
+ 	/* Prepare SortSupport data for each column */
+ 	state->sortKeys = (SortSupport) palloc0(state->nKeys *
+ 											sizeof(SortSupportData));
+ 
+ 	for (i = 0; i < state->nKeys; i++)
+ 	{
+ 		SortSupport sortKey = state->sortKeys + i;
+ 		ScanKey		scanKey = indexScanKey + i;
+ 		int16		strategy;
+ 
+ 		sortKey->ssup_cxt = CurrentMemoryContext;
+ 		sortKey->ssup_collation = scanKey->sk_collation;
+ 		sortKey->ssup_nulls_first =
+ 			(scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0;
+ 		sortKey->ssup_attno = scanKey->sk_attno;
+ 
+ 		AssertState(sortKey->ssup_attno != 0);
+ 
+ 		strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ?
+ 			BTGreaterStrategyNumber : BTLessStrategyNumber;
+ 
+ 		PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey);
+ 	}
+ 
+ 	_bt_freeskey(indexScanKey);
+ 
  	MemoryContextSwitchTo(oldcontext);
  
  	return state;
*************** tuplesort_begin_index_btree(Relation hea
*** 727,733 ****
--- 742,750 ----
  							int workMem, bool randomAccess)
  {
  	Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
+ 	ScanKey			indexScanKey;
  	MemoryContext oldcontext;
+ 	int				i;
  
  	oldcontext = MemoryContextSwitchTo(state->sortcontext);
  
*************** tuplesort_begin_index_btree(Relation hea
*** 751,763 ****
  	state->copytup = copytup_index;
  	state->writetup = writetup_index;
  	state->readtup = readtup_index;
- 	state->reversedirection = reversedirection_index_btree;
  
  	state->heapRel = heapRel;
  	state->indexRel = indexRel;
- 	state->indexScanKey = _bt_mkscankey_nodata(indexRel);
  	state->enforceUnique = enforceUnique;
  
  	MemoryContextSwitchTo(oldcontext);
  
  	return state;
--- 768,806 ----
  	state->copytup = copytup_index;
  	state->writetup = writetup_index;
  	state->readtup = readtup_index;
  
  	state->heapRel = heapRel;
  	state->indexRel = indexRel;
  	state->enforceUnique = enforceUnique;
  
+ 	indexScanKey = _bt_mkscankey_nodata(indexRel);
+ 	state->nKeys = RelationGetNumberOfAttributes(indexRel);
+ 	/* Prepare SortSupport data for each column */
+ 	state->sortKeys = (SortSupport) palloc0(state->nKeys *
+ 											sizeof(SortSupportData));
+ 
+ 	for (i = 0; i < state->nKeys; i++)
+ 	{
+ 		SortSupport sortKey = state->sortKeys + i;
+ 		ScanKey		scanKey = indexScanKey + i;
+ 		int16		strategy;
+ 
+ 		sortKey->ssup_cxt = CurrentMemoryContext;
+ 		sortKey->ssup_collation = scanKey->sk_collation;
+ 		sortKey->ssup_nulls_first =
+ 			(scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0;
+ 		sortKey->ssup_attno = scanKey->sk_attno;
+ 
+ 		AssertState(sortKey->ssup_attno != 0);
+ 
+ 		strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ?
+ 			BTGreaterStrategyNumber : BTLessStrategyNumber;
+ 
+ 		PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey);
+ 	}
+ 
+ 	_bt_freeskey(indexScanKey);
+ 
  	MemoryContextSwitchTo(oldcontext);
  
  	return state;
*************** tuplesort_begin_index_hash(Relation heap
*** 788,794 ****
  	state->copytup = copytup_index;
  	state->writetup = writetup_index;
  	state->readtup = readtup_index;
- 	state->reversedirection = reversedirection_index_hash;
  
  	state->heapRel = heapRel;
  	state->indexRel = indexRel;
--- 831,836 ----
*************** tuplesort_begin_datum(Oid datumType, Oid
*** 831,837 ****
  	state->copytup = copytup_datum;
  	state->writetup = writetup_datum;
  	state->readtup = readtup_datum;
- 	state->reversedirection = reversedirection_datum;
  
  	state->datumType = datumType;
  
--- 873,878 ----
*************** make_bounded_heap(Tuplesortstate *state)
*** 2599,2605 ****
  	Assert(tupcount >= state->bound);
  
  	/* Reverse sort direction so largest entry will be at root */
! 	REVERSEDIRECTION(state);
  
  	state->memtupcount = 0;		/* make the heap empty */
  	for (i = 0; i < tupcount; i++)
--- 2640,2646 ----
  	Assert(tupcount >= state->bound);
  
  	/* Reverse sort direction so largest entry will be at root */
! 	reversedirection(state);
  
  	state->memtupcount = 0;		/* make the heap empty */
  	for (i = 0; i < tupcount; i++)
*************** sort_bounded_heap(Tuplesortstate *state)
*** 2663,2669 ****
  	 * Reverse sort direction back to the original state.  This is not
  	 * actually necessary but seems like a good idea for tidiness.
  	 */
! 	REVERSEDIRECTION(state);
  
  	state->status = TSS_SORTEDINMEM;
  	state->boundUsed = true;
--- 2704,2710 ----
  	 * Reverse sort direction back to the original state.  This is not
  	 * actually necessary but seems like a good idea for tidiness.
  	 */
! 	reversedirection(state);
  
  	state->status = TSS_SORTEDINMEM;
  	state->boundUsed = true;
*************** tuplesort_heap_siftup(Tuplesortstate *st
*** 2753,2758 ****
--- 2794,2817 ----
  	memtuples[i] = *tuple;
  }
  
+ /*
+  * Function to reverse the sort direction from its current state
+  *
+  * It is not safe to call this when performing hash tuplesorts
+  */
+ static void
+ reversedirection(Tuplesortstate *state)
+ {
+ 	SortSupport sortKey = state->sortKeys;
+ 	int			nkey;
+ 
+ 	for (nkey = 0; nkey < state->nKeys; nkey++, sortKey++)
+ 	{
+ 		sortKey->ssup_reverse = !sortKey->ssup_reverse;
+ 		sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first;
+ 	}
+ }
+ 
  
  /*
   * Tape interface routines
*************** markrunend(Tuplesortstate *state, int ta
*** 2781,2853 ****
  
  
  /*
-  * Inline-able copy of FunctionCall2Coll() to save some cycles in sorting.
-  */
- static inline Datum
- myFunctionCall2Coll(FmgrInfo *flinfo, Oid collation, Datum arg1, Datum arg2)
- {
- 	FunctionCallInfoData fcinfo;
- 	Datum		result;
- 
- 	InitFunctionCallInfoData(fcinfo, flinfo, 2, collation, NULL, NULL);
- 
- 	fcinfo.arg[0] = arg1;
- 	fcinfo.arg[1] = arg2;
- 	fcinfo.argnull[0] = false;
- 	fcinfo.argnull[1] = false;
- 
- 	result = FunctionCallInvoke(&fcinfo);
- 
- 	/* Check for null result, since caller is clearly not expecting one */
- 	if (fcinfo.isnull)
- 		elog(ERROR, "function %u returned NULL", fcinfo.flinfo->fn_oid);
- 
- 	return result;
- }
- 
- /*
-  * Apply a sort function (by now converted to fmgr lookup form)
-  * and return a 3-way comparison result.  This takes care of handling
-  * reverse-sort and NULLs-ordering properly.  We assume that DESC and
-  * NULLS_FIRST options are encoded in sk_flags the same way btree does it.
-  */
- static inline int32
- inlineApplySortFunction(FmgrInfo *sortFunction, int sk_flags, Oid collation,
- 						Datum datum1, bool isNull1,
- 						Datum datum2, bool isNull2)
- {
- 	int32		compare;
- 
- 	if (isNull1)
- 	{
- 		if (isNull2)
- 			compare = 0;		/* NULL "=" NULL */
- 		else if (sk_flags & SK_BT_NULLS_FIRST)
- 			compare = -1;		/* NULL "<" NOT_NULL */
- 		else
- 			compare = 1;		/* NULL ">" NOT_NULL */
- 	}
- 	else if (isNull2)
- 	{
- 		if (sk_flags & SK_BT_NULLS_FIRST)
- 			compare = 1;		/* NOT_NULL ">" NULL */
- 		else
- 			compare = -1;		/* NOT_NULL "<" NULL */
- 	}
- 	else
- 	{
- 		compare = DatumGetInt32(myFunctionCall2Coll(sortFunction, collation,
- 													datum1, datum2));
- 
- 		if (sk_flags & SK_BT_DESC)
- 			compare = -compare;
- 	}
- 
- 	return compare;
- }
- 
- 
- /*
   * Routines specialized for HeapTuple (actually MinimalTuple) case
   */
  
--- 2840,2845 ----
*************** readtup_heap(Tuplesortstate *state, Sort
*** 2972,2991 ****
  								&stup->isnull1);
  }
  
- static void
- reversedirection_heap(Tuplesortstate *state)
- {
- 	SortSupport sortKey = state->sortKeys;
- 	int			nkey;
- 
- 	for (nkey = 0; nkey < state->nKeys; nkey++, sortKey++)
- 	{
- 		sortKey->ssup_reverse = !sortKey->ssup_reverse;
- 		sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first;
- 	}
- }
- 
- 
  /*
   * Routines specialized for the CLUSTER case (HeapTuple data, with
   * comparisons per a btree index definition)
--- 2964,2969 ----
*************** static int
*** 2995,3001 ****
  comparetup_cluster(const SortTuple *a, const SortTuple *b,
  				   Tuplesortstate *state)
  {
! 	ScanKey		scanKey = state->indexScanKey;
  	HeapTuple	ltup;
  	HeapTuple	rtup;
  	TupleDesc	tupDesc;
--- 2973,2979 ----
  comparetup_cluster(const SortTuple *a, const SortTuple *b,
  				   Tuplesortstate *state)
  {
! 	SortSupport	sortKey = state->sortKeys;
  	HeapTuple	ltup;
  	HeapTuple	rtup;
  	TupleDesc	tupDesc;
*************** comparetup_cluster(const SortTuple *a, c
*** 3005,3018 ****
  	/* Compare the leading sort key, if it's simple */
  	if (state->indexInfo->ii_KeyAttrNumbers[0] != 0)
  	{
! 		compare = inlineApplySortFunction(&scanKey->sk_func, scanKey->sk_flags,
! 										  scanKey->sk_collation,
! 										  a->datum1, a->isnull1,
! 										  b->datum1, b->isnull1);
  		if (compare != 0 || state->nKeys == 1)
  			return compare;
  		/* Compare additional columns the hard way */
! 		scanKey++;
  		nkey = 1;
  	}
  	else
--- 2983,2995 ----
  	/* Compare the leading sort key, if it's simple */
  	if (state->indexInfo->ii_KeyAttrNumbers[0] != 0)
  	{
! 		compare = ApplySortComparator(a->datum1, a->isnull1,
! 									  b->datum1, b->isnull1,
! 									  sortKey);
  		if (compare != 0 || state->nKeys == 1)
  			return compare;
  		/* Compare additional columns the hard way */
! 		sortKey++;
  		nkey = 1;
  	}
  	else
*************** comparetup_cluster(const SortTuple *a, c
*** 3030,3036 ****
  		/* If not expression index, just compare the proper heap attrs */
  		tupDesc = state->tupDesc;
  
! 		for (; nkey < state->nKeys; nkey++, scanKey++)
  		{
  			AttrNumber	attno = state->indexInfo->ii_KeyAttrNumbers[nkey];
  			Datum		datum1,
--- 3007,3013 ----
  		/* If not expression index, just compare the proper heap attrs */
  		tupDesc = state->tupDesc;
  
! 		for (; nkey < state->nKeys; nkey++, sortKey++)
  		{
  			AttrNumber	attno = state->indexInfo->ii_KeyAttrNumbers[nkey];
  			Datum		datum1,
*************** comparetup_cluster(const SortTuple *a, c
*** 3041,3051 ****
  			datum1 = heap_getattr(ltup, attno, tupDesc, &isnull1);
  			datum2 = heap_getattr(rtup, attno, tupDesc, &isnull2);
  
! 			compare = inlineApplySortFunction(&scanKey->sk_func,
! 											  scanKey->sk_flags,
! 											  scanKey->sk_collation,
! 											  datum1, isnull1,
! 											  datum2, isnull2);
  			if (compare != 0)
  				return compare;
  		}
--- 3018,3026 ----
  			datum1 = heap_getattr(ltup, attno, tupDesc, &isnull1);
  			datum2 = heap_getattr(rtup, attno, tupDesc, &isnull2);
  
! 			compare = ApplySortComparator(datum1, isnull1,
! 										  datum2, isnull2,
! 										  sortKey);
  			if (compare != 0)
  				return compare;
  		}
*************** comparetup_cluster(const SortTuple *a, c
*** 3077,3091 ****
  		FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate,
  					   r_index_values, r_index_isnull);
  
! 		for (; nkey < state->nKeys; nkey++, scanKey++)
  		{
! 			compare = inlineApplySortFunction(&scanKey->sk_func,
! 											  scanKey->sk_flags,
! 											  scanKey->sk_collation,
! 											  l_index_values[nkey],
! 											  l_index_isnull[nkey],
! 											  r_index_values[nkey],
! 											  r_index_isnull[nkey]);
  			if (compare != 0)
  				return compare;
  		}
--- 3052,3064 ----
  		FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate,
  					   r_index_values, r_index_isnull);
  
! 		for (; nkey < state->nKeys; nkey++, sortKey++)
  		{
! 			compare = ApplySortComparator(l_index_values[nkey],
! 										  l_index_isnull[nkey],
! 										  r_index_values[nkey],
! 										  r_index_isnull[nkey],
! 										  sortKey);
  			if (compare != 0)
  				return compare;
  		}
*************** comparetup_index_btree(const SortTuple *
*** 3181,3187 ****
  	 * whether any null fields are present.  Also see the special treatment
  	 * for equal keys at the end.
  	 */
! 	ScanKey		scanKey = state->indexScanKey;
  	IndexTuple	tuple1;
  	IndexTuple	tuple2;
  	int			keysz;
--- 3154,3160 ----
  	 * whether any null fields are present.  Also see the special treatment
  	 * for equal keys at the end.
  	 */
! 	SortSupport	sortKey = state->sortKeys;
  	IndexTuple	tuple1;
  	IndexTuple	tuple2;
  	int			keysz;
*************** comparetup_index_btree(const SortTuple *
*** 3191,3200 ****
  	int32		compare;
  
  	/* Compare the leading sort key */
! 	compare = inlineApplySortFunction(&scanKey->sk_func, scanKey->sk_flags,
! 									  scanKey->sk_collation,
! 									  a->datum1, a->isnull1,
! 									  b->datum1, b->isnull1);
  	if (compare != 0)
  		return compare;
  
--- 3164,3172 ----
  	int32		compare;
  
  	/* Compare the leading sort key */
! 	compare = ApplySortComparator(a->datum1, a->isnull1,
! 								  b->datum1, b->isnull1,
! 								  sortKey);
  	if (compare != 0)
  		return compare;
  
*************** comparetup_index_btree(const SortTuple *
*** 3207,3214 ****
  	tuple2 = (IndexTuple) b->tuple;
  	keysz = state->nKeys;
  	tupDes = RelationGetDescr(state->indexRel);
! 	scanKey++;
! 	for (nkey = 2; nkey <= keysz; nkey++, scanKey++)
  	{
  		Datum		datum1,
  					datum2;
--- 3179,3186 ----
  	tuple2 = (IndexTuple) b->tuple;
  	keysz = state->nKeys;
  	tupDes = RelationGetDescr(state->indexRel);
! 	sortKey++;
! 	for (nkey = 2; nkey <= keysz; nkey++, sortKey++)
  	{
  		Datum		datum1,
  					datum2;
*************** comparetup_index_btree(const SortTuple *
*** 3218,3227 ****
  		datum1 = index_getattr(tuple1, nkey, tupDes, &isnull1);
  		datum2 = index_getattr(tuple2, nkey, tupDes, &isnull2);
  
! 		compare = inlineApplySortFunction(&scanKey->sk_func, scanKey->sk_flags,
! 										  scanKey->sk_collation,
! 										  datum1, isnull1,
! 										  datum2, isnull2);
  		if (compare != 0)
  			return compare;		/* done when we find unequal attributes */
  
--- 3190,3198 ----
  		datum1 = index_getattr(tuple1, nkey, tupDes, &isnull1);
  		datum2 = index_getattr(tuple2, nkey, tupDes, &isnull2);
  
! 		compare = ApplySortComparator(datum1, isnull1,
! 									  datum2, isnull2,
! 									  sortKey);
  		if (compare != 0)
  			return compare;		/* done when we find unequal attributes */
  
*************** readtup_index(Tuplesortstate *state, Sor
*** 3395,3420 ****
  								 &stup->isnull1);
  }
  
- static void
- reversedirection_index_btree(Tuplesortstate *state)
- {
- 	ScanKey		scanKey = state->indexScanKey;
- 	int			nkey;
- 
- 	for (nkey = 0; nkey < state->nKeys; nkey++, scanKey++)
- 	{
- 		scanKey->sk_flags ^= (SK_BT_DESC | SK_BT_NULLS_FIRST);
- 	}
- }
- 
- static void
- reversedirection_index_hash(Tuplesortstate *state)
- {
- 	/* We don't support reversing direction in a hash index sort */
- 	elog(ERROR, "reversedirection_index_hash is not implemented");
- }
- 
- 
  /*
   * Routines specialized for DatumTuple case
   */
--- 3366,3371 ----
*************** readtup_datum(Tuplesortstate *state, Sor
*** 3513,3525 ****
  							 &tuplen, sizeof(tuplen));
  }
  
- static void
- reversedirection_datum(Tuplesortstate *state)
- {
- 	state->onlyKey->ssup_reverse = !state->onlyKey->ssup_reverse;
- 	state->onlyKey->ssup_nulls_first = !state->onlyKey->ssup_nulls_first;
- }
- 
  /*
   * Convenience routine to free a tuple previously loaded into sort memory
   */
--- 3464,3469 ----
diff --git a/src/include/utils/sortsupport.h b/src/include/utils/sortsupport.h
new file mode 100644
index 4417143..faae703
*** a/src/include/utils/sortsupport.h
--- b/src/include/utils/sortsupport.h
***************
*** 48,53 ****
--- 48,54 ----
  #define SORTSUPPORT_H
  
  #include "access/attnum.h"
+ #include "utils/relcache.h"
  
  typedef struct SortSupportData *SortSupport;
  
*************** ApplySortComparator(Datum datum1, bool i
*** 152,156 ****
--- 153,159 ----
  /* Other functions in utils/sort/sortsupport.c */
  extern void PrepareSortSupportComparisonShim(Oid cmpFunc, SortSupport ssup);
  extern void PrepareSortSupportFromOrderingOp(Oid orderingOp, SortSupport ssup);
+ extern void PrepareSortSupportFromIndexRel(Relation indexRel, int16 strategy,
+ 										   SortSupport ssup);
  
  #endif   /* SORTSUPPORT_H */