On Thu, Apr 3, 2014 at 1:23 PM, Thom Brown <t...@linux.com> wrote:
> I'm getting an error when building this:

Sorry. I ran this through Valgrind, and forgot to reset where the
relevant macro is define'd before submission. Attached revision should
build without issue.


-- 
Peter Geoghegan
*** a/src/backend/executor/nodeMergeAppend.c
--- b/src/backend/executor/nodeMergeAppend.c
*************** ExecInitMergeAppend(MergeAppend *node, E
*** 136,141 ****
--- 136,142 ----
  		sortKey->ssup_collation = node->collations[i];
  		sortKey->ssup_nulls_first = node->nullsFirst[i];
  		sortKey->ssup_attno = node->sortColIdx[i];
+ 		sortKey->leading = (i == 0);
  
  		PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey);
  	}
*** a/src/backend/utils/adt/varlena.c
--- b/src/backend/utils/adt/varlena.c
***************
*** 17,22 ****
--- 17,23 ----
  #include <ctype.h>
  #include <limits.h>
  
+ #include "access/nbtree.h"
  #include "access/tuptoaster.h"
  #include "catalog/pg_collation.h"
  #include "catalog/pg_type.h"
***************
*** 29,34 ****
--- 30,36 ----
  #include "utils/bytea.h"
  #include "utils/lsyscache.h"
  #include "utils/pg_locale.h"
+ #include "utils/sortsupport.h"
  
  
  /* GUC variable */
*************** typedef struct
*** 50,61 ****
--- 52,85 ----
  	int			skiptable[256]; /* skip distance for given mismatched char */
  } TextPositionState;
  
+ typedef struct
+ {
+ 	char	   *buf1;			/* 1st string, or poorman original string buf */
+ 	char	   *buf2;			/* 2nd string, or leading key/poor man blob */
+ 	int			buflen1;
+ 	int			buflen2;
+ #ifdef HAVE_LOCALE_T
+ 	pg_locale_t locale;
+ #endif
+ } TextSortSupport;
+ 
+ /*
+  * This should be large enough that most strings will be fit, but small enough
+  * that we feel comfortable putting it on the stack
+  */
+ #define TEXTBUFLEN		1024
+ 
  #define DatumGetUnknownP(X)			((unknown *) PG_DETOAST_DATUM(X))
  #define DatumGetUnknownPCopy(X)		((unknown *) PG_DETOAST_DATUM_COPY(X))
  #define PG_GETARG_UNKNOWN_P(n)		DatumGetUnknownP(PG_GETARG_DATUM(n))
  #define PG_GETARG_UNKNOWN_P_COPY(n) DatumGetUnknownPCopy(PG_GETARG_DATUM(n))
  #define PG_RETURN_UNKNOWN_P(x)		PG_RETURN_POINTER(x)
  
+ static void btpoorman_worker(SortSupport ssup, Oid collid);
+ static int bttextfastcmp_c(Datum x, Datum y, SortSupport ssup);
+ static int bttextfastcmp_locale(Datum x, Datum y, SortSupport ssup);
+ static int bttextfastcmp_locale_poorman(Datum x, Datum y, SortSupport ssup);
+ static Datum bttext_convert(Datum d, SortSupport ssup);
  static int32 text_length(Datum str);
  static text *text_catenate(text *t1, text *t2);
  static text *text_substring(Datum str,
*************** varstr_cmp(char *arg1, int len1, char *a
*** 1356,1365 ****
  	}
  	else
  	{
! #define STACKBUFLEN		1024
! 
! 		char		a1buf[STACKBUFLEN];
! 		char		a2buf[STACKBUFLEN];
  		char	   *a1p,
  				   *a2p;
  
--- 1380,1387 ----
  	}
  	else
  	{
! 		char		a1buf[TEXTBUFLEN];
! 		char		a2buf[TEXTBUFLEN];
  		char	   *a1p,
  				   *a2p;
  
*************** varstr_cmp(char *arg1, int len1, char *a
*** 1393,1416 ****
  			int			a2len;
  			int			r;
  
! 			if (len1 >= STACKBUFLEN / 2)
  			{
  				a1len = len1 * 2 + 2;
  				a1p = palloc(a1len);
  			}
  			else
  			{
! 				a1len = STACKBUFLEN;
  				a1p = a1buf;
  			}
! 			if (len2 >= STACKBUFLEN / 2)
  			{
  				a2len = len2 * 2 + 2;
  				a2p = palloc(a2len);
  			}
  			else
  			{
! 				a2len = STACKBUFLEN;
  				a2p = a2buf;
  			}
  
--- 1415,1438 ----
  			int			a2len;
  			int			r;
  
! 			if (len1 >= TEXTBUFLEN / 2)
  			{
  				a1len = len1 * 2 + 2;
  				a1p = palloc(a1len);
  			}
  			else
  			{
! 				a1len = TEXTBUFLEN;
  				a1p = a1buf;
  			}
! 			if (len2 >= TEXTBUFLEN / 2)
  			{
  				a2len = len2 * 2 + 2;
  				a2p = palloc(a2len);
  			}
  			else
  			{
! 				a2len = TEXTBUFLEN;
  				a2p = a2buf;
  			}
  
*************** varstr_cmp(char *arg1, int len1, char *a
*** 1475,1485 ****
  		}
  #endif   /* WIN32 */
  
! 		if (len1 >= STACKBUFLEN)
  			a1p = (char *) palloc(len1 + 1);
  		else
  			a1p = a1buf;
! 		if (len2 >= STACKBUFLEN)
  			a2p = (char *) palloc(len2 + 1);
  		else
  			a2p = a2buf;
--- 1497,1507 ----
  		}
  #endif   /* WIN32 */
  
! 		if (len1 >= TEXTBUFLEN)
  			a1p = (char *) palloc(len1 + 1);
  		else
  			a1p = a1buf;
! 		if (len2 >= TEXTBUFLEN)
  			a2p = (char *) palloc(len2 + 1);
  		else
  			a2p = a2buf;
*************** bttextcmp(PG_FUNCTION_ARGS)
*** 1683,1688 ****
--- 1705,2039 ----
  	PG_RETURN_INT32(result);
  }
  
+ Datum
+ bttextsortsupport(PG_FUNCTION_ARGS)
+ {
+ 	SortSupport		ssup = (SortSupport) PG_GETARG_POINTER(0);
+ 	Oid				collid = ssup->ssup_collation;
+ 
+ 	btpoorman_worker(ssup, collid);
+ 
+ 	PG_RETURN_VOID();
+ }
+ 
+ static void
+ btpoorman_worker(SortSupport ssup, Oid collid)
+ {
+ 	TextSortSupport	   *tss;
+ 
+ 	/*
+ 	 * If LC_COLLATE = C, we can make things quite a bit faster by using
+ 	 * memcmp() rather than strcoll().  To minimize the per-comparison
+ 	 * overhead, we make this decision just once for the whole sort.  For now,
+ 	 * don't apply the "small string optimization" to C-locale, even though
+ 	 * that should work (without any strxfrm() processing).
+ 	 */
+ 	if (lc_collate_is_c(collid))
+ 	{
+ 		ssup->comparator = bttextfastcmp_c;
+ 		ssup->converter = NULL;
+ 		ssup->proper = NULL;
+ 		return;
+ 	}
+ 
+ 	/*
+ 	 * WIN32 requires complex hacks when the database encoding is UTF-8
+ 	 * (except when using the "C" collation).  For now, we don't optimize that
+ 	 * case.
+ 	 */
+ #ifdef WIN32
+ 	if (GetDatabaseEncoding() == PG_UTF8)
+ 		PG_RETURN_VOID();
+ #endif /* WIN32 */
+ 
+ 	/*
+ 	 * We need a collation-sensitive comparison.  To make things faster,
+ 	 * we'll figure out the collation based on the locale id and cache the
+ 	 * result.  Also, since strxfrm()/strcoll() require NULL-terminated inputs,
+ 	 * prepare one or two palloc'd buffers to use as temporary workspace.  In
+ 	 * the ad-hoc comparison case we only use palloc'd buffers when we need
+ 	 * more space than we're comfortable allocating on the stack, but here we
+ 	 * can keep the buffers around for the whole sort, so it makes sense to
+ 	 * allocate them once and use them unconditionally (although we won't
+ 	 * actually need them when sorting proper actually begins and strxfrm()
+ 	 * conversion has already occurred, when sorting a leading key).
+ 	 */
+ 	tss = MemoryContextAlloc(ssup->ssup_cxt, sizeof(TextSortSupport));
+ #ifdef HAVE_LOCALE_T
+ 	tss->locale = 0;
+ #endif
+ 
+ 	if (collid != DEFAULT_COLLATION_OID)
+ 	{
+ 		if (!OidIsValid(collid))
+ 		{
+ 			/*
+ 			 * This typically means that the parser could not resolve a
+ 			 * conflict of implicit collations, so report it that way.
+ 			 */
+ 			ereport(ERROR,
+ 					(errcode(ERRCODE_INDETERMINATE_COLLATION),
+ 					 errmsg("could not determine which collation to use for string comparison"),
+ 					 errhint("Use the COLLATE clause to set the collation explicitly.")));
+ 		}
+ #ifdef HAVE_LOCALE_T
+ 		tss->locale = pg_newlocale_from_collation(collid);
+ #endif
+ 	}
+ 
+ 	tss->buf1 = MemoryContextAlloc(ssup->ssup_cxt, TEXTBUFLEN);
+ 	tss->buflen1 = TEXTBUFLEN;
+ 	tss->buf2 = MemoryContextAlloc(ssup->ssup_cxt, TEXTBUFLEN);
+ 	tss->buflen2 = TEXTBUFLEN;
+ 
+ 	ssup->ssup_extra = tss;
+ 	if (!ssup->leading)
+ 	{
+ 		ssup->comparator = bttextfastcmp_locale;
+ 		ssup->converter = NULL;
+ 		ssup->proper = NULL;
+ 		return;
+ 	}
+ 
+ 	ssup->comparator = bttextfastcmp_locale_poorman;
+ 	ssup->converter = bttext_convert;
+ 
+ 	ssup->proper = MemoryContextAllocZero(CurrentMemoryContext,
+ 										  sizeof(SortSupportData));
+ 	ssup->proper->ssup_cxt = ssup->ssup_cxt;
+ 	ssup->proper->ssup_collation = ssup->ssup_collation;
+ 	ssup->proper->ssup_reverse = ssup->ssup_reverse;
+ 	ssup->proper->ssup_nulls_first = ssup->ssup_nulls_first;
+ 	ssup->proper->ssup_attno = ssup->ssup_attno;
+ 
+ 	/*
+ 	 * Initialize the "proper" sortsupport state with a reliable
+ 	 * strcoll()-based comparator for tie-breaking.  This is the same thing
+ 	 * that this routine does directly for non-leading keys, which never have
+ 	 * poor man's normalized keys.
+ 	 *
+ 	 * This is the "true" leading key, which is technically not actually
+ 	 * leading.  The pseudo leading poor man's key is the leading key for
+ 	 * tuplesort's purposes.
+ 	 */
+ 	ssup->proper->leading = false;
+ 	ssup->proper->true_leading = true;
+ 	btpoorman_worker(ssup->proper, collid);
+ }
+ 
+ static int
+ bttextfastcmp_c(Datum x, Datum y, SortSupport ssup)
+ {
+ 	text	   *arg1 = DatumGetTextPP(x);
+ 	text	   *arg2 = DatumGetTextPP(y);
+ 	char	   *a1p,
+ 			   *a2p;
+ 	int			len1,
+ 				len2,
+ 	result;
+ 
+ 	a1p = VARDATA_ANY(arg1);
+ 	a2p = VARDATA_ANY(arg2);
+ 
+ 	len1 = VARSIZE_ANY_EXHDR(arg1);
+ 	len2 = VARSIZE_ANY_EXHDR(arg2);
+ 
+ 	result = memcmp(a1p, a2p, Min(len1, len2));
+ 	if ((result == 0) && (len1 != len2))
+ 		result = (len1 < len2) ? -1 : 1;
+ 
+ 	/* We can't afford to leak memory here. */
+ 	if (PointerGetDatum(arg1) != x)
+ 		pfree(arg1);
+ 	if (PointerGetDatum(arg2) != y)
+ 		pfree(arg2);
+ 
+ 	return result;
+ }
+ 
+ static int
+ bttextfastcmp_locale(Datum x, Datum y, SortSupport ssup)
+ {
+ 	text	   *arg1 = DatumGetTextPP(x);
+ 	text	   *arg2 = DatumGetTextPP(y);
+ 	TextSortSupport	   *tss = (TextSortSupport *) ssup->ssup_extra;
+ 	char	   *a1p,
+ 			   *a2p;
+ 	int			len1,
+ 				len2,
+ 	result;
+ 
+ 	a1p = VARDATA_ANY(arg1);
+ 	a2p = VARDATA_ANY(arg2);
+ 
+ 	len1 = VARSIZE_ANY_EXHDR(arg1);
+ 	len2 = VARSIZE_ANY_EXHDR(arg2);
+ 
+ 	if (ssup->true_leading && len1 == len2)
+ 	{
+ 		/*
+ 		 * "True" leading key.  This indicates that we're being called as a
+ 		 * reliable tie-breaker for the poor man's normalized key comparison
+ 		 * (there may be other attributes that must be subsequently compared,
+ 		 * but we're consulted first).
+ 		 *
+ 		 * In general there is a pretty good chance that we are being consulted
+ 		 * because the key is actually fully equal, especially since the two
+ 		 * text strings are already known to have the same length.  As such, it
+ 		 * seems worth it to try and give an answer using only a cheap memcmp()
+ 		 * comparison in anticipation of that reliably indicating equality
+ 		 * frequently enough for it to be worth it on balance.
+ 		 */
+ 		if (memcmp(a1p, a2p, len1) == 0)
+ 			return 0;
+ 	}
+ 
+ 	if (len1 >= tss->buflen1)
+ 	{
+ 		pfree(tss->buf1);
+ 		tss->buflen1 *= 2;
+ 		tss->buf1 = MemoryContextAlloc(ssup->ssup_cxt, tss->buflen1);
+ 	}
+ 	if (len2 >= tss->buflen2)
+ 	{
+ 		pfree(tss->buf2);
+ 		tss->buflen2 *= 2;
+ 		tss->buf2 = MemoryContextAlloc(ssup->ssup_cxt, tss->buflen2);
+ 	}
+ 
+ 	memcpy(tss->buf1, a1p, len1);
+ 	tss->buf1[len1] = '\0';
+ 	memcpy(tss->buf2, a2p, len2);
+ 	tss->buf2[len2] = '\0';
+ 
+ #ifdef HAVE_LOCALE_T
+ 	if (tss->locale)
+ 		result = strcoll_l(tss->buf1, tss->buf2, tss->locale);
+ 	else
+ #endif
+ 		result = strcoll(tss->buf1, tss->buf2);
+ 
+ 	/*
+ 	 * In some locales strcoll() can claim that nonidentical strings are equal.
+ 	 * Believing that would be bad news for a number of reasons, so we follow
+ 	 * Perl's lead and sort "equal" strings according to strcmp().
+ 	 */
+ 	if (result == 0)
+ 		result = strcmp(tss->buf1, tss->buf2);
+ 
+ 	/* We can't afford to leak memory here. */
+ 	if (PointerGetDatum(arg1) != x)
+ 		pfree(arg1);
+ 	if (PointerGetDatum(arg2) != y)
+ 		pfree(arg2);
+ 
+ 	return result;
+ }
+ 
+ /*
+  * This is only used for the leading column of a sort operation, where
+  * bttext_convert() has already prepared a poor man's normalized key
+  * representation.  The result returned here can only be trusted when non-zero.
+  */
+ static int
+ bttextfastcmp_locale_poorman(Datum x, Datum y, SortSupport ssup)
+ {
+ 	int 	result;
+ 	int 	len1, len2;
+ 	char* a = (char*) &x;
+ 	char* b = (char*) &y;
+ 
+ 	if (a[sizeof(Datum) - 1] == '\0')
+ 		len1 = strlen(a);
+ 	else
+ 		len1 = sizeof(Datum);
+ 
+ 	if (b[sizeof(Datum) - 1] == '\0')
+ 		len2 = strlen(b);
+ 	else
+ 		len2 = sizeof(Datum);
+ 
+ 	result = memcmp(a, b, Min(len1, len2));
+ 
+ 	/*
+ 	 * Lengths here cannot reliably indicate original string length.  When
+ 	 * result = 0, leave it up to the bttextfastcmp_locale() tie-breaker that
+ 	 * the core system is obligated to perform (rather than matching
+ 	 * bttextfastcmp_c()'s length tie-break behavior).  Even a strcmp() on two
+ 	 * non-truncated strxfrm() blobs cannot indicate equality in a way that is
+ 	 * generally trustworthy, for the same reason that there is a strcoll()
+ 	 * strcmp() tie-breaker (there'd still need to be a strcmp() tie-breaker on
+ 	 * the *original* string).
+ 	 */
+ 	return result;
+ }
+ 
+ static Datum
+ bttext_convert(Datum d, SortSupport ssup)
+ {
+ 	Datum				res;
+ 	char			   *pres;
+ 	int					len;
+ 	TextSortSupport	   *tss = (TextSortSupport *) ssup->ssup_extra;
+ 	text			   *full = DatumGetTextPP(d);
+ 	Size				bsize;
+ 
+ 	/*
+ 	 * Convert text into a "poor man's normalized key".  This is a
+ 	 * pass-by-value Datum that is treated as a char array.
+ 	 */
+ 	pres = (char*) &res;
+ 	/* MemSet so non-copied bytes are always NULL */
+ 	MemSet(pres, 0, sizeof(Datum));
+ 	len = VARSIZE_ANY_EXHDR(full);
+ 
+ 	/* By convention, we use buffer 1 */
+ 	if (len >= tss->buflen1)
+ 	{
+ 		pfree(tss->buf1);
+ 		tss->buflen1 *= 2;
+ 		tss->buf1 = MemoryContextAlloc(ssup->ssup_cxt, tss->buflen1);
+ 	}
+ 
+ 	/* Just like strcoll(), strxfrm() expects a NULL-terminated string */
+ 	memcpy(tss->buf1, VARDATA_ANY(full), len);
+ 	tss->buf1[len] = '\0';
+ 
+ retry:
+ 
+ #ifdef HAVE_LOCALE_T
+ 	if (tss->locale)
+ 		bsize = strxfrm_l(tss->buf2, tss->buf1, tss->buflen2, tss->locale);
+ 	else
+ #endif
+ 		bsize = strxfrm(tss->buf2, tss->buf1, tss->buflen2);
+ 
+ 	if (bsize >= tss->buflen2)
+ 	{
+ 		/*
+ 		 * The standard requires that we fit the entire blob in temp buffer in
+ 		 * order for subsequent comparisons to be well-defined.  glibc doesn't
+ 		 * actually require this, but it doesn't seem to help performance to
+ 		 * depend on that.
+ 		 */
+ 		pfree(tss->buf2);
+ 		tss->buflen2 = Max(bsize + 1, tss->buflen2 * 2);
+ 		tss->buf2 = MemoryContextAlloc(ssup->ssup_cxt, tss->buflen2);
+ 		goto retry;
+ 	}
+ 
+ 	memcpy(pres, tss->buf2, Min(sizeof(Datum), bsize));
+ 
+ 	/*
+ 	 * Iff last byte isn't NULL, as in the common case where the entire Datum
+ 	 * is filled with blob bytes, that is interpreted as indicating that every
+ 	 * Datum byte should be compared.  Otherwise, the length is determined by
+ 	 * strlen().  This is safe because the strxfrm() blob is itself
+ 	 * NULL-terminated, leaving no danger of misinterpreting any NULL bytes not
+ 	 * intended to be interpreted as logically representing termination.
+ 	 */
+ 	return res;
+ }
  
  Datum
  text_larger(PG_FUNCTION_ARGS)
*** a/src/backend/utils/sort/sortsupport.c
--- b/src/backend/utils/sort/sortsupport.c
*************** PrepareSortSupportComparisonShim(Oid cmp
*** 82,87 ****
--- 82,89 ----
  
  	ssup->ssup_extra = extra;
  	ssup->comparator = comparison_shim;
+ 	ssup->converter = NULL;
+ 	ssup->proper = NULL;
  }
  
  /*
*************** PrepareSortSupportFromOrderingOp(Oid ord
*** 104,109 ****
--- 106,113 ----
  		elog(ERROR, "operator %u is not a valid ordering operator",
  			 orderingOp);
  
+ 	ssup->converter = NULL;
+ 
  	if (issupport)
  	{
  		/* The sort support function should provide a comparator */
*** a/src/backend/utils/sort/tuplesort.c
--- b/src/backend/utils/sort/tuplesort.c
*************** bool		optimize_bounded_sort = true;
*** 150,156 ****
   * When sorting single Datums, the data value is represented directly by
   * datum1/isnull1.	If the datatype is pass-by-reference and isnull1 is false,
   * then datum1 points to a separately palloc'd data value that is also pointed
!  * to by the "tuple" pointer; otherwise "tuple" is NULL.
   *
   * While building initial runs, tupindex holds the tuple's run number.  During
   * merge passes, we re-use it to hold the input tape number that each tuple in
--- 150,159 ----
   * When sorting single Datums, the data value is represented directly by
   * datum1/isnull1.	If the datatype is pass-by-reference and isnull1 is false,
   * then datum1 points to a separately palloc'd data value that is also pointed
!  * to by the "tuple" pointer; otherwise "tuple" is NULL.  Note that there are
!  * some exceptions, as when the sort support infrastructure provides a "poor
!  * man's normalized key" representation.   When that occurs, extra precautions
!  * are taken when a comparison involving a pair of datum1s returns 0.
   *
   * While building initial runs, tupindex holds the tuple's run number.  During
   * merge passes, we re-use it to hold the input tape number that each tuple in
*************** tuplesort_begin_heap(TupleDesc tupDesc,
*** 647,657 ****
  		sortKey->ssup_collation = sortCollations[i];
  		sortKey->ssup_nulls_first = nullsFirstFlags[i];
  		sortKey->ssup_attno = attNums[i];
  
  		PrepareSortSupportFromOrderingOp(sortOperators[i], sortKey);
  	}
! 
! 	if (nkeys == 1)
  		state->onlyKey = state->sortKeys;
  
  	MemoryContextSwitchTo(oldcontext);
--- 650,667 ----
  		sortKey->ssup_collation = sortCollations[i];
  		sortKey->ssup_nulls_first = nullsFirstFlags[i];
  		sortKey->ssup_attno = attNums[i];
+ 		sortKey->leading = (i == 0);
  
  		PrepareSortSupportFromOrderingOp(sortOperators[i], sortKey);
  	}
! 	/*
! 	 * The "onlyKey" optimization cannot be used when a tie-breaker for an
! 	 * unreliable poor man's normalized key comparison is required.  Typically,
! 	 * the optimization is only of significant value to pass-by-value types
! 	 * anyway, whereas poor man's normalized keys are typically used by
! 	 * pass-by-reference types.
! 	 */
! 	if (nkeys == 1 && !state->sortKeys->converter)
  		state->onlyKey = state->sortKeys;
  
  	MemoryContextSwitchTo(oldcontext);
*************** comparetup_heap(const SortTuple *a, cons
*** 2870,2875 ****
--- 2880,2908 ----
  	rtup.t_len = ((MinimalTuple) b->tuple)->t_len + MINIMAL_TUPLE_OFFSET;
  	rtup.t_data = (HeapTupleHeader) ((char *) b->tuple - MINIMAL_TUPLE_OFFSET);
  	tupDesc = state->tupDesc;
+ 	/* If leading comparison was unreliable, do reliable comparison */
+ 	if (state->sortKeys->converter)
+ 	{
+ 		AttrNumber	attno = sortKey->ssup_attno;
+ 		Datum		datum1,
+ 					datum2;
+ 		bool		isnull1,
+ 					isnull2;
+ 
+ 		Assert(attno == sortKey->proper->ssup_attno);
+ 		Assert(sortKey->proper->true_leading);
+ 		Assert(sortKey->leading);
+ 
+ 		datum1 = heap_getattr(&ltup, attno, tupDesc, &isnull1);
+ 		datum2 = heap_getattr(&rtup, attno, tupDesc, &isnull2);
+ 
+ 		compare = ApplySortComparator(datum1, isnull1,
+ 									  datum2, isnull2,
+ 									  sortKey->proper);
+ 		if (compare != 0)
+ 			return compare;
+ 	}
+ 
  	sortKey++;
  	for (nkey = 1; nkey < state->nKeys; nkey++, sortKey++)
  	{
*************** copytup_heap(Tuplesortstate *state, Sort
*** 2910,2919 ****
  	/* set up first-column key value */
  	htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
  	htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET);
! 	stup->datum1 = heap_getattr(&htup,
! 								state->sortKeys[0].ssup_attno,
! 								state->tupDesc,
! 								&stup->isnull1);
  }
  
  static void
--- 2943,2972 ----
  	/* set up first-column key value */
  	htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
  	htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET);
! 	if (!state->sortKeys->converter)
! 	{
! 		/* Store ordinary Datum representation */
! 		stup->datum1 = heap_getattr(&htup,
! 									state->sortKeys[0].ssup_attno,
! 									state->tupDesc,
! 									&stup->isnull1);
! 	}
! 	else
! 	{
! 		Datum		temp;
! 
! 		/*
! 		 * Store "poor man's normalized key", which cannot indicate equality in
! 		 * a trustworthy manner, and may require a tie-breaker
! 		 */
! 		Assert(state->sortKeys[0].leading);
! 		temp = heap_getattr(&htup,
! 							state->sortKeys[0].ssup_attno,
! 							state->tupDesc,
! 							&stup->isnull1);
! 
! 		stup->datum1 = state->sortKeys->converter(temp, state->sortKeys);
! 	}
  }
  
  static void
*** a/src/include/catalog/pg_amproc.h
--- b/src/include/catalog/pg_amproc.h
*************** DATA(insert (	1989   26 26 1 356 ));
*** 122,127 ****
--- 122,128 ----
  DATA(insert (	1989   26 26 2 3134 ));
  DATA(insert (	1991   30 30 1 404 ));
  DATA(insert (	1994   25 25 1 360 ));
+ DATA(insert (	1994   25 25 2 3492 ));
  DATA(insert (	1996   1083 1083 1 1107 ));
  DATA(insert (	2000   1266 1266 1 1358 ));
  DATA(insert (	2002   1562 1562 1 1672 ));
*** a/src/include/catalog/pg_proc.h
--- b/src/include/catalog/pg_proc.h
*************** DATA(insert OID = 3135 ( btnamesortsuppo
*** 610,615 ****
--- 610,617 ----
  DESCR("sort support");
  DATA(insert OID = 360 (  bttextcmp		   PGNSP PGUID 12 1 0 0 0 f f f f t f i 2 0 23 "25 25" _null_ _null_ _null_ _null_ bttextcmp _null_ _null_ _null_ ));
  DESCR("less-equal-greater");
+ DATA(insert OID = 3492 ( bttextsortsupport PGNSP PGUID 12 1 0 0 0 f f f f t f i 1 0 2278 "2281" _null_ _null_ _null_ _null_ bttextsortsupport _null_ _null_ _null_ ));
+ DESCR("sort support");
  DATA(insert OID = 377 (  cash_cmp		   PGNSP PGUID 12 1 0 0 0 f f f f t f i 2 0 23 "790 790" _null_ _null_ _null_ _null_ cash_cmp _null_ _null_ _null_ ));
  DESCR("less-equal-greater");
  DATA(insert OID = 380 (  btreltimecmp	   PGNSP PGUID 12 1 0 0 0 f f f f t f i 2 0 23 "703 703" _null_ _null_ _null_ _null_ btreltimecmp _null_ _null_ _null_ ));
*** a/src/include/utils/builtins.h
--- b/src/include/utils/builtins.h
*************** extern Datum bttintervalcmp(PG_FUNCTION_
*** 312,317 ****
--- 312,318 ----
  extern Datum btcharcmp(PG_FUNCTION_ARGS);
  extern Datum btnamecmp(PG_FUNCTION_ARGS);
  extern Datum bttextcmp(PG_FUNCTION_ARGS);
+ extern Datum bttextsortsupport(PG_FUNCTION_ARGS);
  
  /*
   *		Per-opclass sort support functions for new btrees.	Like the
*** a/src/include/utils/sortsupport.h
--- b/src/include/utils/sortsupport.h
*************** typedef struct SortSupportData
*** 59,64 ****
--- 59,80 ----
  	 */
  	MemoryContext ssup_cxt;		/* Context containing sort info */
  	Oid			ssup_collation; /* Collation to use, or InvalidOid */
+ 	bool		leading;		/* Leading (poor man-applicable) key? */
+ 	bool		true_leading;	/* "True" (non-poorman's) leading key? */
+ 
+ 	/*
+ 	 * Alternative "proper" SortSupport state for leading key, used only when
+ 	 * an unreliable poor man's normalized key comparator returns 0, and the
+ 	 * core system must use this separate state to perform a trustworthy
+ 	 * comparison.  By convention this relates to the same attribute as our
+ 	 * ssup_attno.  When set initially by a SortSupport function, the core
+ 	 * system expects to be able to check if this is set, and independently
+ 	 * make reliable non-poorman's comparisons if and when this SortSupport
+ 	 * state returns an uncertain 0 result (the implication is that when this
+ 	 * applies, leading = true here, and this state relates to a poorman's
+ 	 * normalized key implementation).
+ 	 */
+ 	struct SortSupportData *proper;
  
  	/*
  	 * Additional sorting parameters; but unlike ssup_collation, these can be
*************** typedef struct SortSupportData
*** 69,75 ****
  	bool		ssup_nulls_first;		/* sort nulls first? */
  
  	/*
! 	 * These fields are workspace for callers, and should not be touched by
  	 * opclass-specific functions.
  	 */
  	AttrNumber	ssup_attno;		/* column number to sort */
--- 85,91 ----
  	bool		ssup_nulls_first;		/* sort nulls first? */
  
  	/*
! 	 * These fields are workspace for callers, and should only be touched by
  	 * opclass-specific functions.
  	 */
  	AttrNumber	ssup_attno;		/* column number to sort */
*************** typedef struct SortSupportData
*** 96,103 ****
  	int			(*comparator) (Datum x, Datum y, SortSupport ssup);
  
  	/*
! 	 * Additional sort-acceleration functions might be added here later.
  	 */
  } SortSupportData;
  
  
--- 112,125 ----
  	int			(*comparator) (Datum x, Datum y, SortSupport ssup);
  
  	/*
! 	 * Given a datum involving an opclass with relevant support, use this
! 	 * callback to convert to a pass-by-value untrustworthy Datum, or poor
! 	 * man's normalized key.  Our sort support comparator expects this.  When
! 	 * this is set, tuplesort knows not to trust a comparison results of 0.
! 	 * However, -1 or 1 can be trusted, and taken as a perfect proxy for what
! 	 * the full leading key would indicate.
  	 */
+ 	Datum		(*converter) (Datum d, SortSupport ssup);
  } SortSupportData;
  
  
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to