Both Robert and Heikki expressed dissatisfaction with the fact that B-Tree index builds don't use sortsupport. Because B-Tree index builds cannot really use the "onlyKey" optimization, the historic oversight of not supporting B-Tree builds (and CLUSTER) might have been at least a little understandable [1]. But with the recent work on sortsupport for text, it's likely that B-Tree index builds will be missing out on rather a lot by not taking advantage of sortsupport.
Attached patch modifies tuplesort to correct this oversight. What's really nice about it is that there is actually a net negative code footprint: src/backend/access/nbtree/nbtsort.c | 63 +++--- src/backend/utils/sort/sortsupport.c | 77 ++++++-- src/backend/utils/sort/tuplesort.c | 274 +++++++++++---------------- src/include/utils/sortsupport.h | 3 + 4 files changed, 203 insertions(+), 214 deletions(-) I've been able to throw out a lot of code, including two virtually identical inlinable comparators (that have logic for NULL-wise comparisons). This patch pretty much justifies itself as a refactoring exercise, without performance improvements entering into it, which is nice. I haven't benchmarked it yet, but it seems reasonable to assume that much the same benefits will be seen as were seen for the MinimalTuple case (for multiple-attribute sorts, that similarly cannot use the "onlyKey" optimization). With this patch, all callers of _bt_mkscankey_nodata() now use sortsupport. This raises the question: Why not just have _bt_mkscankey_nodata() do it all for us? I think that continuing to have B-Tree provide a scanKey, and working off of that is a better abstraction. It would save a few cycles to have the index_getprocinfo() call currently within _bt_mkscankey_nodata() look for BTSORTSUPPORT_PROC rather than BTORDER_PROC and be done with it, but I'd call that a modularity violation. Tuplesort decides a strategy (BTGreaterStrategyNumber or BTLessStrategyNumber) based on the scanKey B-Tree private flag SK_BT_DESC, and it's appropriate for that to live in tuplesort's head if possible, because tuplesort/sortsupport tracks sort "direction" in a fairly intimate way. Besides, I think that sortsupport already has enough clients for what it is. 
I imagine it makes sense to directly access a sortsupport-installed comparator through a scanKey, for actual index scans (think abbreviated keys in internal B-Tree pages), but I still want uniformity with the other cases within tuplesort (i.e. the MinimalTuple and Datum cases), so I'm not about to change scanKeys to have a comparator that doesn't need a fmgr trampoline for the benefit of this patch. Note that I don't even store a scanKey within Tuplesortstate anymore. That uniformity is what allowed me to throw out the per-tuplesort-case reversedirection() logic, and use generic reversedirection() logic instead (as anticipated by current comments within Tuplesortstate). Thoughts? [1] http://www.postgresql.org/message-id/cam3swzqlg8nx2yeb+67xx0gig+-folfbtqjkg+jl15_zhi1...@mail.gmail.com -- Peter Geoghegan
diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c new file mode 100644 index e8a89d2..a6c44a7 *** a/src/backend/access/nbtree/nbtsort.c --- b/src/backend/access/nbtree/nbtsort.c *************** _bt_load(BTWriteState *wstate, BTSpool * *** 684,689 **** --- 684,690 ---- int i, keysz = RelationGetNumberOfAttributes(wstate->index); ScanKey indexScanKey = NULL; + SortSupport sortKeys; if (merge) { *************** _bt_load(BTWriteState *wstate, BTSpool * *** 699,704 **** --- 700,730 ---- true, &should_free2); indexScanKey = _bt_mkscankey_nodata(wstate->index); + /* Prepare SortSupport data for each column */ + sortKeys = (SortSupport) palloc0(keysz * sizeof(SortSupportData)); + + for (i = 0; i < keysz; i++) + { + SortSupport sortKey = sortKeys + i; + ScanKey scanKey = indexScanKey + i; + int16 strategy; + + sortKey->ssup_cxt = CurrentMemoryContext; + sortKey->ssup_collation = scanKey->sk_collation; + sortKey->ssup_nulls_first = + (scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0; + sortKey->ssup_attno = scanKey->sk_attno; + + AssertState(sortKey->ssup_attno != 0); + + strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ? + BTGreaterStrategyNumber : BTLessStrategyNumber; + + PrepareSortSupportFromIndexRel(wstate->index, strategy, sortKey); + } + + _bt_freeskey(indexScanKey); + for (;;) { load1 = true; /* load BTSpool next ? */ *************** _bt_load(BTWriteState *wstate, BTSpool * *** 711,753 **** { for (i = 1; i <= keysz; i++) { ! ScanKey entry; Datum attrDatum1, attrDatum2; bool isNull1, isNull2; int32 compare; ! 
entry = indexScanKey + i - 1; attrDatum1 = index_getattr(itup, i, tupdes, &isNull1); attrDatum2 = index_getattr(itup2, i, tupdes, &isNull2); - if (isNull1) - { - if (isNull2) - compare = 0; /* NULL "=" NULL */ - else if (entry->sk_flags & SK_BT_NULLS_FIRST) - compare = -1; /* NULL "<" NOT_NULL */ - else - compare = 1; /* NULL ">" NOT_NULL */ - } - else if (isNull2) - { - if (entry->sk_flags & SK_BT_NULLS_FIRST) - compare = 1; /* NOT_NULL ">" NULL */ - else - compare = -1; /* NOT_NULL "<" NULL */ - } - else - { - compare = - DatumGetInt32(FunctionCall2Coll(&entry->sk_func, - entry->sk_collation, - attrDatum1, - attrDatum2)); ! if (entry->sk_flags & SK_BT_DESC) ! compare = -compare; ! } if (compare > 0) { load1 = false; --- 737,756 ---- { for (i = 1; i <= keysz; i++) { ! SortSupport entry; Datum attrDatum1, attrDatum2; bool isNull1, isNull2; int32 compare; ! entry = sortKeys + i - 1; attrDatum1 = index_getattr(itup, i, tupdes, &isNull1); attrDatum2 = index_getattr(itup2, i, tupdes, &isNull2); ! compare = ApplySortComparator(attrDatum1, isNull1, ! attrDatum2, isNull2, ! entry); if (compare > 0) { load1 = false; *************** _bt_load(BTWriteState *wstate, BTSpool * *** 781,787 **** true, &should_free2); } } ! _bt_freeskey(indexScanKey); } else { --- 784,790 ---- true, &should_free2); } } ! pfree(sortKeys); } else { diff --git a/src/backend/utils/sort/sortsupport.c b/src/backend/utils/sort/sortsupport.c new file mode 100644 index 2240fd0..8391b1c *** a/src/backend/utils/sort/sortsupport.c --- b/src/backend/utils/sort/sortsupport.c *************** *** 21,26 **** --- 21,27 ---- #include "access/nbtree.h" #include "fmgr.h" #include "utils/lsyscache.h" + #include "utils/rel.h" #include "utils/sortsupport.h" *************** PrepareSortSupportComparisonShim(Oid cmp *** 86,113 **** } /* ! * Fill in SortSupport given an ordering operator (btree "<" or ">" operator). ! * ! * Caller must previously have zeroed the SortSupportData structure and then ! 
* filled in ssup_cxt, ssup_collation, and ssup_nulls_first. This will fill ! * in ssup_reverse as well as the comparator function pointer. */ ! void ! PrepareSortSupportFromOrderingOp(Oid orderingOp, SortSupport ssup) { Oid sortSupportFunction; - Oid opfamily; - Oid opcintype; - int16 strategy; - - Assert(ssup->comparator == NULL); - - /* Find the operator in pg_amop */ - if (!get_ordering_op_properties(orderingOp, &opfamily, &opcintype, - &strategy)) - elog(ERROR, "operator %u is not a valid ordering operator", - orderingOp); - ssup->ssup_reverse = (strategy == BTGreaterStrategyNumber); /* Look for a sort support function */ sortSupportFunction = get_opfamily_proc(opfamily, opcintype, opcintype, --- 87,98 ---- } /* ! * Lookup and call sortsupport function to setup state, or create shim */ ! static void ! FinishSortSupportFunction(Oid opfamily, Oid opcintype, SortSupport ssup) { Oid sortSupportFunction; /* Look for a sort support function */ sortSupportFunction = get_opfamily_proc(opfamily, opcintype, opcintype, *************** PrepareSortSupportFromOrderingOp(Oid ord *** 136,138 **** --- 121,177 ---- PrepareSortSupportComparisonShim(sortFunction, ssup); } } + + /* + * Fill in SortSupport given an ordering operator (btree "<" or ">" operator). + * + * Caller must previously have zeroed the SortSupportData structure and then + * filled in ssup_cxt, ssup_collation, and ssup_nulls_first. This will fill + * in ssup_reverse as well as the comparator function pointer. 
+ */ + void + PrepareSortSupportFromOrderingOp(Oid orderingOp, SortSupport ssup) + { + Oid opfamily; + Oid opcintype; + int16 strategy; + + Assert(ssup->comparator == NULL); + + /* Find the operator in pg_amop */ + if (!get_ordering_op_properties(orderingOp, &opfamily, &opcintype, + &strategy)) + elog(ERROR, "operator %u is not a valid ordering operator", + orderingOp); + ssup->ssup_reverse = (strategy == BTGreaterStrategyNumber); + + FinishSortSupportFunction(opfamily, opcintype, ssup); + } + + /* + * Fill in SortSupport given an index relation, attribute, and strategy. + * + * Caller must previously have zeroed the SortSupportData structure and then + * filled in ssup_cxt, ssup_attno, ssup_collation, and ssup_nulls_first. This + * will fill in ssup_reverse (based on the supplied strategy), as well as the + * comparator function pointer. + */ + void + PrepareSortSupportFromIndexRel(Relation indexRel, int16 strategy, + SortSupport ssup) + { + Oid opfamily = indexRel->rd_opfamily[ssup->ssup_attno - 1]; + Oid opcintype = indexRel->rd_opcintype[ssup->ssup_attno - 1]; + + Assert(ssup->comparator == NULL); + + /* Find the operator in pg_amop */ + if (indexRel->rd_rel->relam != BTREE_AM_OID) + elog(ERROR, "unexpected non-btree AM: %u", indexRel->rd_rel->relam); + if (strategy != BTGreaterStrategyNumber && + strategy != BTLessStrategyNumber) + elog(ERROR, "unexpected sort support strategy: %d", strategy); + ssup->ssup_reverse = (strategy == BTGreaterStrategyNumber); + + FinishSortSupportFunction(opfamily, opcintype, ssup); + } diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c new file mode 100644 index 8e57505..0486bab *** a/src/backend/utils/sort/tuplesort.c --- b/src/backend/utils/sort/tuplesort.c *************** struct Tuplesortstate *** 257,269 **** int tapenum, unsigned int len); /* - * Function to reverse the sort direction from its current state. 
(We - * could dispense with this if we wanted to enforce that all variants - * represent the sort key information alike.) - */ - void (*reversedirection) (Tuplesortstate *state); - - /* * This array holds the tuples now in sort memory. If we are in state * INITIAL, the tuples are in no particular order; if we are in state * SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS --- 257,262 ---- *************** struct Tuplesortstate *** 340,347 **** bool markpos_eof; /* saved "eof_reached" */ /* ! * These variables are specific to the MinimalTuple case; they are set by ! * tuplesort_begin_heap and used only by the MinimalTuple routines. */ TupleDesc tupDesc; SortSupport sortKeys; /* array of length nKeys */ --- 333,341 ---- bool markpos_eof; /* saved "eof_reached" */ /* ! * The sortKeys variable is used by every case other than the hash index ! * case; it is set by tuplesort_begin_xxx. tupDesc is only used by the ! * MinimalTuple and CLUSTER routines, though. */ TupleDesc tupDesc; SortSupport sortKeys; /* array of length nKeys */ *************** struct Tuplesortstate *** 354,361 **** /* * These variables are specific to the CLUSTER case; they are set by ! * tuplesort_begin_cluster. Note CLUSTER also uses tupDesc and ! * indexScanKey. */ IndexInfo *indexInfo; /* info about index being used for reference */ EState *estate; /* for evaluating index expressions */ --- 348,354 ---- /* * These variables are specific to the CLUSTER case; they are set by ! * tuplesort_begin_cluster. 
*/ IndexInfo *indexInfo; /* info about index being used for reference */ EState *estate; /* for evaluating index expressions */ *************** struct Tuplesortstate *** 368,374 **** Relation indexRel; /* index being built */ /* These are specific to the index_btree subcase: */ - ScanKey indexScanKey; bool enforceUnique; /* complain if we find duplicate tuples */ /* These are specific to the index_hash subcase: */ --- 361,366 ---- *************** struct Tuplesortstate *** 395,401 **** #define COPYTUP(state,stup,tup) ((*(state)->copytup) (state, stup, tup)) #define WRITETUP(state,tape,stup) ((*(state)->writetup) (state, tape, stup)) #define READTUP(state,stup,tape,len) ((*(state)->readtup) (state, stup, tape, len)) - #define REVERSEDIRECTION(state) ((*(state)->reversedirection) (state)) #define LACKMEM(state) ((state)->availMem < 0) #define USEMEM(state,amt) ((state)->availMem -= (amt)) #define FREEMEM(state,amt) ((state)->availMem += (amt)) --- 387,392 ---- *************** static void sort_bounded_heap(Tuplesorts *** 464,469 **** --- 455,461 ---- static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple, int tupleindex, bool checkIndex); static void tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex); + static void reversedirection(Tuplesortstate *state); static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK); static void markrunend(Tuplesortstate *state, int tapenum); static int comparetup_heap(const SortTuple *a, const SortTuple *b, *************** static void writetup_heap(Tuplesortstate *** 473,479 **** SortTuple *stup); static void readtup_heap(Tuplesortstate *state, SortTuple *stup, int tapenum, unsigned int len); - static void reversedirection_heap(Tuplesortstate *state); static int comparetup_cluster(const SortTuple *a, const SortTuple *b, Tuplesortstate *state); static void copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup); --- 465,470 ---- *************** static void 
writetup_index(Tuplesortstat *** 490,497 **** SortTuple *stup); static void readtup_index(Tuplesortstate *state, SortTuple *stup, int tapenum, unsigned int len); - static void reversedirection_index_btree(Tuplesortstate *state); - static void reversedirection_index_hash(Tuplesortstate *state); static int comparetup_datum(const SortTuple *a, const SortTuple *b, Tuplesortstate *state); static void copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup); --- 481,486 ---- *************** static void writetup_datum(Tuplesortstat *** 499,505 **** SortTuple *stup); static void readtup_datum(Tuplesortstate *state, SortTuple *stup, int tapenum, unsigned int len); - static void reversedirection_datum(Tuplesortstate *state); static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup); /* --- 488,493 ---- *************** tuplesort_begin_heap(TupleDesc tupDesc, *** 629,635 **** state->copytup = copytup_heap; state->writetup = writetup_heap; state->readtup = readtup_heap; - state->reversedirection = reversedirection_heap; state->tupDesc = tupDesc; /* assume we need not copy tupDesc */ --- 617,622 ---- *************** tuplesort_begin_cluster(TupleDesc tupDes *** 665,671 **** --- 652,660 ---- int workMem, bool randomAccess) { Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess); + ScanKey indexScanKey; MemoryContext oldcontext; + int i; Assert(indexRel->rd_rel->relam == BTREE_AM_OID); *************** tuplesort_begin_cluster(TupleDesc tupDes *** 691,703 **** state->copytup = copytup_cluster; state->writetup = writetup_cluster; state->readtup = readtup_cluster; - state->reversedirection = reversedirection_index_btree; state->indexInfo = BuildIndexInfo(indexRel); - state->indexScanKey = _bt_mkscankey_nodata(indexRel); state->tupDesc = tupDesc; /* assume we need not copy tupDesc */ if (state->indexInfo->ii_Expressions != NULL) { TupleTableSlot *slot; --- 680,692 ---- state->copytup = copytup_cluster; state->writetup = writetup_cluster; state->readtup 
= readtup_cluster; state->indexInfo = BuildIndexInfo(indexRel); state->tupDesc = tupDesc; /* assume we need not copy tupDesc */ + indexScanKey = _bt_mkscankey_nodata(indexRel); + if (state->indexInfo->ii_Expressions != NULL) { TupleTableSlot *slot; *************** tuplesort_begin_cluster(TupleDesc tupDes *** 715,720 **** --- 704,735 ---- econtext->ecxt_scantuple = slot; } + /* Prepare SortSupport data for each column */ + state->sortKeys = (SortSupport) palloc0(state->nKeys * + sizeof(SortSupportData)); + + for (i = 0; i < state->nKeys; i++) + { + SortSupport sortKey = state->sortKeys + i; + ScanKey scanKey = indexScanKey + i; + int16 strategy; + + sortKey->ssup_cxt = CurrentMemoryContext; + sortKey->ssup_collation = scanKey->sk_collation; + sortKey->ssup_nulls_first = + (scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0; + sortKey->ssup_attno = scanKey->sk_attno; + + AssertState(sortKey->ssup_attno != 0); + + strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ? + BTGreaterStrategyNumber : BTLessStrategyNumber; + + PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey); + } + + _bt_freeskey(indexScanKey); + MemoryContextSwitchTo(oldcontext); return state; *************** tuplesort_begin_index_btree(Relation hea *** 727,733 **** --- 742,750 ---- int workMem, bool randomAccess) { Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess); + ScanKey indexScanKey; MemoryContext oldcontext; + int i; oldcontext = MemoryContextSwitchTo(state->sortcontext); *************** tuplesort_begin_index_btree(Relation hea *** 751,763 **** state->copytup = copytup_index; state->writetup = writetup_index; state->readtup = readtup_index; - state->reversedirection = reversedirection_index_btree; state->heapRel = heapRel; state->indexRel = indexRel; - state->indexScanKey = _bt_mkscankey_nodata(indexRel); state->enforceUnique = enforceUnique; MemoryContextSwitchTo(oldcontext); return state; --- 768,806 ---- state->copytup = copytup_index; state->writetup = writetup_index; 
state->readtup = readtup_index; state->heapRel = heapRel; state->indexRel = indexRel; state->enforceUnique = enforceUnique; + indexScanKey = _bt_mkscankey_nodata(indexRel); + state->nKeys = RelationGetNumberOfAttributes(indexRel); + /* Prepare SortSupport data for each column */ + state->sortKeys = (SortSupport) palloc0(state->nKeys * + sizeof(SortSupportData)); + + for (i = 0; i < state->nKeys; i++) + { + SortSupport sortKey = state->sortKeys + i; + ScanKey scanKey = indexScanKey + i; + int16 strategy; + + sortKey->ssup_cxt = CurrentMemoryContext; + sortKey->ssup_collation = scanKey->sk_collation; + sortKey->ssup_nulls_first = + (scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0; + sortKey->ssup_attno = scanKey->sk_attno; + + AssertState(sortKey->ssup_attno != 0); + + strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ? + BTGreaterStrategyNumber : BTLessStrategyNumber; + + PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey); + } + + _bt_freeskey(indexScanKey); + MemoryContextSwitchTo(oldcontext); return state; *************** tuplesort_begin_index_hash(Relation heap *** 788,794 **** state->copytup = copytup_index; state->writetup = writetup_index; state->readtup = readtup_index; - state->reversedirection = reversedirection_index_hash; state->heapRel = heapRel; state->indexRel = indexRel; --- 831,836 ---- *************** tuplesort_begin_datum(Oid datumType, Oid *** 831,837 **** state->copytup = copytup_datum; state->writetup = writetup_datum; state->readtup = readtup_datum; - state->reversedirection = reversedirection_datum; state->datumType = datumType; --- 873,878 ---- *************** make_bounded_heap(Tuplesortstate *state) *** 2599,2605 **** Assert(tupcount >= state->bound); /* Reverse sort direction so largest entry will be at root */ ! REVERSEDIRECTION(state); state->memtupcount = 0; /* make the heap empty */ for (i = 0; i < tupcount; i++) --- 2640,2646 ---- Assert(tupcount >= state->bound); /* Reverse sort direction so largest entry will be at root */ ! 
reversedirection(state); state->memtupcount = 0; /* make the heap empty */ for (i = 0; i < tupcount; i++) *************** sort_bounded_heap(Tuplesortstate *state) *** 2663,2669 **** * Reverse sort direction back to the original state. This is not * actually necessary but seems like a good idea for tidiness. */ ! REVERSEDIRECTION(state); state->status = TSS_SORTEDINMEM; state->boundUsed = true; --- 2704,2710 ---- * Reverse sort direction back to the original state. This is not * actually necessary but seems like a good idea for tidiness. */ ! reversedirection(state); state->status = TSS_SORTEDINMEM; state->boundUsed = true; *************** tuplesort_heap_siftup(Tuplesortstate *st *** 2753,2758 **** --- 2794,2817 ---- memtuples[i] = *tuple; } + /* + * Function to reverse the sort direction from its current state + * + * It is not safe to call this when performing hash tuplesorts + */ + static void + reversedirection(Tuplesortstate *state) + { + SortSupport sortKey = state->sortKeys; + int nkey; + + for (nkey = 0; nkey < state->nKeys; nkey++, sortKey++) + { + sortKey->ssup_reverse = !sortKey->ssup_reverse; + sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first; + } + } + /* * Tape interface routines *************** markrunend(Tuplesortstate *state, int ta *** 2781,2853 **** /* - * Inline-able copy of FunctionCall2Coll() to save some cycles in sorting. 
- */ - static inline Datum - myFunctionCall2Coll(FmgrInfo *flinfo, Oid collation, Datum arg1, Datum arg2) - { - FunctionCallInfoData fcinfo; - Datum result; - - InitFunctionCallInfoData(fcinfo, flinfo, 2, collation, NULL, NULL); - - fcinfo.arg[0] = arg1; - fcinfo.arg[1] = arg2; - fcinfo.argnull[0] = false; - fcinfo.argnull[1] = false; - - result = FunctionCallInvoke(&fcinfo); - - /* Check for null result, since caller is clearly not expecting one */ - if (fcinfo.isnull) - elog(ERROR, "function %u returned NULL", fcinfo.flinfo->fn_oid); - - return result; - } - - /* - * Apply a sort function (by now converted to fmgr lookup form) - * and return a 3-way comparison result. This takes care of handling - * reverse-sort and NULLs-ordering properly. We assume that DESC and - * NULLS_FIRST options are encoded in sk_flags the same way btree does it. - */ - static inline int32 - inlineApplySortFunction(FmgrInfo *sortFunction, int sk_flags, Oid collation, - Datum datum1, bool isNull1, - Datum datum2, bool isNull2) - { - int32 compare; - - if (isNull1) - { - if (isNull2) - compare = 0; /* NULL "=" NULL */ - else if (sk_flags & SK_BT_NULLS_FIRST) - compare = -1; /* NULL "<" NOT_NULL */ - else - compare = 1; /* NULL ">" NOT_NULL */ - } - else if (isNull2) - { - if (sk_flags & SK_BT_NULLS_FIRST) - compare = 1; /* NOT_NULL ">" NULL */ - else - compare = -1; /* NOT_NULL "<" NULL */ - } - else - { - compare = DatumGetInt32(myFunctionCall2Coll(sortFunction, collation, - datum1, datum2)); - - if (sk_flags & SK_BT_DESC) - compare = -compare; - } - - return compare; - } - - - /* * Routines specialized for HeapTuple (actually MinimalTuple) case */ --- 2840,2845 ---- *************** readtup_heap(Tuplesortstate *state, Sort *** 2972,2991 **** &stup->isnull1); } - static void - reversedirection_heap(Tuplesortstate *state) - { - SortSupport sortKey = state->sortKeys; - int nkey; - - for (nkey = 0; nkey < state->nKeys; nkey++, sortKey++) - { - sortKey->ssup_reverse = !sortKey->ssup_reverse; - 
sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first; - } - } - - /* * Routines specialized for the CLUSTER case (HeapTuple data, with * comparisons per a btree index definition) --- 2964,2969 ---- *************** static int *** 2995,3001 **** comparetup_cluster(const SortTuple *a, const SortTuple *b, Tuplesortstate *state) { ! ScanKey scanKey = state->indexScanKey; HeapTuple ltup; HeapTuple rtup; TupleDesc tupDesc; --- 2973,2979 ---- comparetup_cluster(const SortTuple *a, const SortTuple *b, Tuplesortstate *state) { ! SortSupport sortKey = state->sortKeys; HeapTuple ltup; HeapTuple rtup; TupleDesc tupDesc; *************** comparetup_cluster(const SortTuple *a, c *** 3005,3018 **** /* Compare the leading sort key, if it's simple */ if (state->indexInfo->ii_KeyAttrNumbers[0] != 0) { ! compare = inlineApplySortFunction(&scanKey->sk_func, scanKey->sk_flags, ! scanKey->sk_collation, ! a->datum1, a->isnull1, ! b->datum1, b->isnull1); if (compare != 0 || state->nKeys == 1) return compare; /* Compare additional columns the hard way */ ! scanKey++; nkey = 1; } else --- 2983,2995 ---- /* Compare the leading sort key, if it's simple */ if (state->indexInfo->ii_KeyAttrNumbers[0] != 0) { ! compare = ApplySortComparator(a->datum1, a->isnull1, ! b->datum1, b->isnull1, ! sortKey); if (compare != 0 || state->nKeys == 1) return compare; /* Compare additional columns the hard way */ ! sortKey++; nkey = 1; } else *************** comparetup_cluster(const SortTuple *a, c *** 3030,3036 **** /* If not expression index, just compare the proper heap attrs */ tupDesc = state->tupDesc; ! for (; nkey < state->nKeys; nkey++, scanKey++) { AttrNumber attno = state->indexInfo->ii_KeyAttrNumbers[nkey]; Datum datum1, --- 3007,3013 ---- /* If not expression index, just compare the proper heap attrs */ tupDesc = state->tupDesc; ! 
for (; nkey < state->nKeys; nkey++, sortKey++) { AttrNumber attno = state->indexInfo->ii_KeyAttrNumbers[nkey]; Datum datum1, *************** comparetup_cluster(const SortTuple *a, c *** 3041,3051 **** datum1 = heap_getattr(ltup, attno, tupDesc, &isnull1); datum2 = heap_getattr(rtup, attno, tupDesc, &isnull2); ! compare = inlineApplySortFunction(&scanKey->sk_func, ! scanKey->sk_flags, ! scanKey->sk_collation, ! datum1, isnull1, ! datum2, isnull2); if (compare != 0) return compare; } --- 3018,3026 ---- datum1 = heap_getattr(ltup, attno, tupDesc, &isnull1); datum2 = heap_getattr(rtup, attno, tupDesc, &isnull2); ! compare = ApplySortComparator(datum1, isnull1, ! datum2, isnull2, ! sortKey); if (compare != 0) return compare; } *************** comparetup_cluster(const SortTuple *a, c *** 3077,3091 **** FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate, r_index_values, r_index_isnull); ! for (; nkey < state->nKeys; nkey++, scanKey++) { ! compare = inlineApplySortFunction(&scanKey->sk_func, ! scanKey->sk_flags, ! scanKey->sk_collation, ! l_index_values[nkey], ! l_index_isnull[nkey], ! r_index_values[nkey], ! r_index_isnull[nkey]); if (compare != 0) return compare; } --- 3052,3064 ---- FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate, r_index_values, r_index_isnull); ! for (; nkey < state->nKeys; nkey++, sortKey++) { ! compare = ApplySortComparator(l_index_values[nkey], ! l_index_isnull[nkey], ! r_index_values[nkey], ! r_index_isnull[nkey], ! sortKey); if (compare != 0) return compare; } *************** comparetup_index_btree(const SortTuple * *** 3181,3187 **** * whether any null fields are present. Also see the special treatment * for equal keys at the end. */ ! ScanKey scanKey = state->indexScanKey; IndexTuple tuple1; IndexTuple tuple2; int keysz; --- 3154,3160 ---- * whether any null fields are present. Also see the special treatment * for equal keys at the end. */ ! 
SortSupport sortKey = state->sortKeys; IndexTuple tuple1; IndexTuple tuple2; int keysz; *************** comparetup_index_btree(const SortTuple * *** 3191,3200 **** int32 compare; /* Compare the leading sort key */ ! compare = inlineApplySortFunction(&scanKey->sk_func, scanKey->sk_flags, ! scanKey->sk_collation, ! a->datum1, a->isnull1, ! b->datum1, b->isnull1); if (compare != 0) return compare; --- 3164,3172 ---- int32 compare; /* Compare the leading sort key */ ! compare = ApplySortComparator(a->datum1, a->isnull1, ! b->datum1, b->isnull1, ! sortKey); if (compare != 0) return compare; *************** comparetup_index_btree(const SortTuple * *** 3207,3214 **** tuple2 = (IndexTuple) b->tuple; keysz = state->nKeys; tupDes = RelationGetDescr(state->indexRel); ! scanKey++; ! for (nkey = 2; nkey <= keysz; nkey++, scanKey++) { Datum datum1, datum2; --- 3179,3186 ---- tuple2 = (IndexTuple) b->tuple; keysz = state->nKeys; tupDes = RelationGetDescr(state->indexRel); ! sortKey++; ! for (nkey = 2; nkey <= keysz; nkey++, sortKey++) { Datum datum1, datum2; *************** comparetup_index_btree(const SortTuple * *** 3218,3227 **** datum1 = index_getattr(tuple1, nkey, tupDes, &isnull1); datum2 = index_getattr(tuple2, nkey, tupDes, &isnull2); ! compare = inlineApplySortFunction(&scanKey->sk_func, scanKey->sk_flags, ! scanKey->sk_collation, ! datum1, isnull1, ! datum2, isnull2); if (compare != 0) return compare; /* done when we find unequal attributes */ --- 3190,3198 ---- datum1 = index_getattr(tuple1, nkey, tupDes, &isnull1); datum2 = index_getattr(tuple2, nkey, tupDes, &isnull2); ! compare = ApplySortComparator(datum1, isnull1, ! datum2, isnull2, ! 
sortKey); if (compare != 0) return compare; /* done when we find unequal attributes */ *************** readtup_index(Tuplesortstate *state, Sor *** 3395,3420 **** &stup->isnull1); } - static void - reversedirection_index_btree(Tuplesortstate *state) - { - ScanKey scanKey = state->indexScanKey; - int nkey; - - for (nkey = 0; nkey < state->nKeys; nkey++, scanKey++) - { - scanKey->sk_flags ^= (SK_BT_DESC | SK_BT_NULLS_FIRST); - } - } - - static void - reversedirection_index_hash(Tuplesortstate *state) - { - /* We don't support reversing direction in a hash index sort */ - elog(ERROR, "reversedirection_index_hash is not implemented"); - } - - /* * Routines specialized for DatumTuple case */ --- 3366,3371 ---- *************** readtup_datum(Tuplesortstate *state, Sor *** 3513,3525 **** &tuplen, sizeof(tuplen)); } - static void - reversedirection_datum(Tuplesortstate *state) - { - state->onlyKey->ssup_reverse = !state->onlyKey->ssup_reverse; - state->onlyKey->ssup_nulls_first = !state->onlyKey->ssup_nulls_first; - } - /* * Convenience routine to free a tuple previously loaded into sort memory */ --- 3464,3469 ---- diff --git a/src/include/utils/sortsupport.h b/src/include/utils/sortsupport.h new file mode 100644 index 4417143..faae703 *** a/src/include/utils/sortsupport.h --- b/src/include/utils/sortsupport.h *************** *** 48,53 **** --- 48,54 ---- #define SORTSUPPORT_H #include "access/attnum.h" + #include "utils/relcache.h" typedef struct SortSupportData *SortSupport; *************** ApplySortComparator(Datum datum1, bool i *** 152,156 **** --- 153,159 ---- /* Other functions in utils/sort/sortsupport.c */ extern void PrepareSortSupportComparisonShim(Oid cmpFunc, SortSupport ssup); extern void PrepareSortSupportFromOrderingOp(Oid orderingOp, SortSupport ssup); + extern void PrepareSortSupportFromIndexRel(Relation indexRel, int16 strategy, + SortSupport ssup); #endif /* SORTSUPPORT_H */
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers