On Mon, Mar 31, 2014 at 1:30 PM, Tom Lane <t...@sss.pgh.pa.us> wrote:
> Possibly worth noting is that on my RHEL6 and Fedora machines,
> "man strxfrm" says the same thing as the POSIX spec.  Likewise on OS X.
> I don't think we can rely on what you suggest.

Okay. The attached revision only trusts strxfrm() blobs (as far as that
goes) when the buffer passed to strxfrm() was large enough for the
entire blob to fit. We now manage two buffers as part of the text sort
support state for leading poor man's keys too (rather than just for
non-leading keys, as before): one holds a copy of the original string
so that a terminating NUL byte can be appended (like strcoll(),
strxfrm() expects that), and the other temporarily stores the blob
before it is memcpy()'d into the pass-by-value poor man's normalized
key Datum. These are the same two persistent sortsupport-state buffers
used when sorting a non-leading text key, where there is one for each
string that needs to have a NUL byte appended.

This appears to perform at least as well as the prior revision, and
possibly even appreciably better. I guess that glibc was internally
managing a buffer to do much the same thing, so perhaps we gain
something more from mostly avoiding that with a longer lived buffer.

Perhaps you could investigate the performance of this new revision too, Thom?

I'd really like to see this basic approach expanded to store pseudo
leading keys in internal B-Tree pages too, per my earlier proposals.
At the very least, B-Tree index builds should benefit from this. I
think I'm really only scratching the surface here, both in terms of
the number of places in the code where a poor man's normalized key
could usefully be exploited, as well as in terms of the number of
B-Tree operator classes that could be made to provide one.

-- 
Peter Geoghegan
*** a/src/backend/utils/adt/varlena.c
--- b/src/backend/utils/adt/varlena.c
***************
*** 17,22 ****
--- 17,23 ----
  #include <ctype.h>
  #include <limits.h>
  
+ #include "access/nbtree.h"
  #include "access/tuptoaster.h"
  #include "catalog/pg_collation.h"
  #include "catalog/pg_type.h"
***************
*** 29,34 ****
--- 30,36 ----
  #include "utils/bytea.h"
  #include "utils/lsyscache.h"
  #include "utils/pg_locale.h"
+ #include "utils/sortsupport.h"
  
  
  /* GUC variable */
*************** typedef struct
*** 50,61 ****
--- 52,85 ----
  	int			skiptable[256]; /* skip distance for given mismatched char */
  } TextPositionState;
  
+ typedef struct
+ {
+ 	char	   *buf1;			/* Also used as poorman original string buf */
+ 	char	   *buf2;			/* Used to store leading key/poor man blob */
+ 	int			buflen1;		/* Allocated size of buf1, in bytes */
+ 	int			buflen2;		/* Allocated size of buf2, in bytes */
+ #ifdef HAVE_LOCALE_T
+ 	pg_locale_t locale;			/* 0 selects plain strcoll()/strxfrm() */
+ #endif
+ } TextSortSupport;				/* Per-sort workspace kept in ssup_extra */
+ 
+ /*
+  * This should be large enough that most strings will fit, but small enough
+  * that we feel comfortable putting it on the stack
+  */
+ #define TEXTBUFLEN		1024
+ 
  #define DatumGetUnknownP(X)			((unknown *) PG_DETOAST_DATUM(X))
  #define DatumGetUnknownPCopy(X)		((unknown *) PG_DETOAST_DATUM_COPY(X))
  #define PG_GETARG_UNKNOWN_P(n)		DatumGetUnknownP(PG_GETARG_DATUM(n))
  #define PG_GETARG_UNKNOWN_P_COPY(n) DatumGetUnknownPCopy(PG_GETARG_DATUM(n))
  #define PG_RETURN_UNKNOWN_P(x)		PG_RETURN_POINTER(x)
  
+ static void btpoorman_worker(SortSupport ssup, Oid collid);
+ static int bttextfastcmp_c(Datum x, Datum y, SortSupport ssup);
+ static int bttextfastcmp_locale(Datum x, Datum y, SortSupport ssup);
+ static int bttextfastcmp_locale_poorman(Datum x, Datum y, SortSupport ssup);
+ static Datum bttext_convert(Datum d, SortSupport ssup);
  static int32 text_length(Datum str);
  static text *text_catenate(text *t1, text *t2);
  static text *text_substring(Datum str,
*************** varstr_cmp(char *arg1, int len1, char *a
*** 1356,1365 ****
  	}
  	else
  	{
! #define STACKBUFLEN		1024
! 
! 		char		a1buf[STACKBUFLEN];
! 		char		a2buf[STACKBUFLEN];
  		char	   *a1p,
  				   *a2p;
  
--- 1380,1387 ----
  	}
  	else
  	{
! 		char		a1buf[TEXTBUFLEN];
! 		char		a2buf[TEXTBUFLEN];
  		char	   *a1p,
  				   *a2p;
  
*************** varstr_cmp(char *arg1, int len1, char *a
*** 1393,1416 ****
  			int			a2len;
  			int			r;
  
! 			if (len1 >= STACKBUFLEN / 2)
  			{
  				a1len = len1 * 2 + 2;
  				a1p = palloc(a1len);
  			}
  			else
  			{
! 				a1len = STACKBUFLEN;
  				a1p = a1buf;
  			}
! 			if (len2 >= STACKBUFLEN / 2)
  			{
  				a2len = len2 * 2 + 2;
  				a2p = palloc(a2len);
  			}
  			else
  			{
! 				a2len = STACKBUFLEN;
  				a2p = a2buf;
  			}
  
--- 1415,1438 ----
  			int			a2len;
  			int			r;
  
! 			if (len1 >= TEXTBUFLEN / 2)
  			{
  				a1len = len1 * 2 + 2;
  				a1p = palloc(a1len);
  			}
  			else
  			{
! 				a1len = TEXTBUFLEN;
  				a1p = a1buf;
  			}
! 			if (len2 >= TEXTBUFLEN / 2)
  			{
  				a2len = len2 * 2 + 2;
  				a2p = palloc(a2len);
  			}
  			else
  			{
! 				a2len = TEXTBUFLEN;
  				a2p = a2buf;
  			}
  
*************** varstr_cmp(char *arg1, int len1, char *a
*** 1475,1485 ****
  		}
  #endif   /* WIN32 */
  
! 		if (len1 >= STACKBUFLEN)
  			a1p = (char *) palloc(len1 + 1);
  		else
  			a1p = a1buf;
! 		if (len2 >= STACKBUFLEN)
  			a2p = (char *) palloc(len2 + 1);
  		else
  			a2p = a2buf;
--- 1497,1507 ----
  		}
  #endif   /* WIN32 */
  
! 		if (len1 >= TEXTBUFLEN)
  			a1p = (char *) palloc(len1 + 1);
  		else
  			a1p = a1buf;
! 		if (len2 >= TEXTBUFLEN)
  			a2p = (char *) palloc(len2 + 1);
  		else
  			a2p = a2buf;
*************** bttextcmp(PG_FUNCTION_ARGS)
*** 1683,1688 ****
--- 1705,2008 ----
  	PG_RETURN_INT32(result);
  }
  
+ Datum
+ bttextsortsupport(PG_FUNCTION_ARGS)
+ {
+ 	SortSupport		sortstate;
+ 
+ 	/* Hand the caller's state, with its own collation, to the worker */
+ 	sortstate = (SortSupport) PG_GETARG_POINTER(0);
+ 	btpoorman_worker(sortstate, sortstate->ssup_collation);
+ 	PG_RETURN_VOID();
+ }
+ 
+ static void
+ btpoorman_worker(SortSupport ssup, Oid collid)
+ {
+ 	TextSortSupport	   *tss;
+ 
+ 	/*
+ 	 * Set up sort support for text.  If LC_COLLATE = C, we can make things
+ 	 * faster by using memcmp() rather than strcoll(); to minimize
+ 	 * per-comparison overhead that decision is made once for the whole sort.
+ 	 * For now, don't apply the "small string optimization" to C-locale, even
+ 	 * though that should work (without any strxfrm() processing).
+ 	 */
+ 	if (lc_collate_is_c(collid))
+ 	{
+ 		ssup->comparator = bttextfastcmp_c;
+ 		ssup->converter = NULL;
+ 		ssup->proper = NULL;
+ 		return;
+ 	}
+ 
+ 	/*
+ 	 * WIN32 requires complex hacks when the database encoding is UTF-8
+ 	 * (except when using the "C" collation).  For now, we don't optimize that
+ 	 * case.  NOTE(review): no comparator is set here; confirm callers cope.
+ 	 */
+ #ifdef WIN32
+ 	if (GetDatabaseEncoding() == PG_UTF8)
+ 		return;					/* plain return: this function is void */
+ #endif /* WIN32 */
+ 
+ 	/*
+ 	 * We need a collation-sensitive comparison.  To make things faster,
+ 	 * we'll figure out the collation based on the locale id and cache the
+ 	 * result.  Also, since strxfrm()/strcoll() require NUL-terminated inputs,
+ 	 * prepare one or two palloc'd buffers to use as temporary workspace.  In
+ 	 * the ad-hoc comparison case we only use palloc'd buffers when we need
+ 	 * more space than we're comfortable allocating on the stack, but here we
+ 	 * can keep the buffers around for the whole sort, so it makes sense to
+ 	 * allocate them once and use them unconditionally (although we won't
+ 	 * actually need them when sorting proper actually begins and strxfrm()
+ 	 * conversion has already occurred, when sorting a leading key).
+ 	 */
+ 	tss = MemoryContextAlloc(ssup->ssup_cxt, sizeof(TextSortSupport));
+ #ifdef HAVE_LOCALE_T
+ 	tss->locale = 0;
+ #endif
+ 
+ 	if (collid != DEFAULT_COLLATION_OID)
+ 	{
+ 		if (!OidIsValid(collid))
+ 		{
+ 			/*
+ 			 * This typically means that the parser could not resolve a
+ 			 * conflict of implicit collations, so report it that way.
+ 			 */
+ 			ereport(ERROR,
+ 					(errcode(ERRCODE_INDETERMINATE_COLLATION),
+ 					 errmsg("could not determine which collation to use for string comparison"),
+ 					 errhint("Use the COLLATE clause to set the collation explicitly.")));
+ 		}
+ #ifdef HAVE_LOCALE_T
+ 		tss->locale = pg_newlocale_from_collation(collid);
+ #endif
+ 	}
+ 
+ 	tss->buf1 = MemoryContextAlloc(ssup->ssup_cxt, TEXTBUFLEN);
+ 	tss->buflen1 = TEXTBUFLEN;
+ 	tss->buf2 = MemoryContextAlloc(ssup->ssup_cxt, TEXTBUFLEN);
+ 	tss->buflen2 = TEXTBUFLEN;
+ 
+ 	ssup->ssup_extra = tss;
+ 	if (!ssup->leading)
+ 	{
+ 		ssup->comparator = bttextfastcmp_locale;
+ 		ssup->converter = NULL;
+ 		ssup->proper = NULL;
+ 		return;
+ 	}
+ 
+ 	ssup->comparator = bttextfastcmp_locale_poorman;
+ 	ssup->converter = bttext_convert;
+ 
+ 	ssup->proper = MemoryContextAlloc(ssup->ssup_cxt, sizeof(SortSupportData));
+ 	ssup->proper->ssup_cxt = ssup->ssup_cxt;
+ 	ssup->proper->ssup_collation = ssup->ssup_collation;
+ 	ssup->proper->ssup_reverse = ssup->ssup_reverse;
+ 	ssup->proper->ssup_nulls_first = ssup->ssup_nulls_first;
+ 	ssup->proper->ssup_attno = ssup->ssup_attno;
+ 
+ 	/*
+ 	 * Initialize the "proper" sortsupport state with a reliable
+ 	 * strcoll()-based comparator for tie-breaking.  This is the same thing
+ 	 * that this routine does directly for non-leading keys, which never have
+ 	 * poor man's normalized keys.
+ 	 *
+ 	 * Conceptually, this is the "true" leading key, which is technically not
+ 	 * actually leading.  The pseudo leading poor man's key is the leading key
+ 	 * for tuplesort's purposes.
+ 	 */
+ 	ssup->proper->leading = false;
+ 	btpoorman_worker(ssup->proper, collid);
+ }
+ 
+ static int
+ bttextfastcmp_c(Datum x, Datum y, SortSupport ssup)
+ {
+ 	/* Bytewise comparison: in the C locale memcmp() order is sort order */
+ 	text	   *lhs = DatumGetTextPP(x);
+ 	text	   *rhs = DatumGetTextPP(y);
+ 	int			lhslen = VARSIZE_ANY_EXHDR(lhs);
+ 	int			rhslen = VARSIZE_ANY_EXHDR(rhs);
+ 	int			cmp;
+ 
+ 	cmp = memcmp(VARDATA_ANY(lhs), VARDATA_ANY(rhs), Min(lhslen, rhslen));
+ 
+ 	/* On a common prefix, the shorter string sorts first */
+ 	if (cmp == 0)
+ 	{
+ 		if (lhslen < rhslen)
+ 			cmp = -1;
+ 		else if (lhslen > rhslen)
+ 			cmp = 1;
+ 	}
+ 
+ 	/* Release any copies made above; a sort cannot afford to leak them */
+ 	if (PointerGetDatum(lhs) != x)
+ 		pfree(lhs);
+ 	if (PointerGetDatum(rhs) != y)
+ 		pfree(rhs);
+ 
+ 	return cmp;
+ }
+ 
+ /*
+  * Collation-aware text comparator: strcoll() (or strcoll_l()) applied to
+  * NUL-terminated copies kept in the sort's two persistent buffers.
+  */
+ static int
+ bttextfastcmp_locale(Datum x, Datum y, SortSupport ssup)
+ {
+ 	text	   *arg1 = DatumGetTextPP(x);
+ 	text	   *arg2 = DatumGetTextPP(y);
+ 	TextSortSupport	   *tss = (TextSortSupport *) ssup->ssup_extra;
+ 	int			len1,
+ 				len2,
+ 				result;
+ 
+ 	len1 = VARSIZE_ANY_EXHDR(arg1);
+ 	len2 = VARSIZE_ANY_EXHDR(arg2);
+ 
+ 	/* Grow to fit string + NUL; a single doubling may still be too small */
+ 	if (len1 >= tss->buflen1)
+ 	{
+ 		pfree(tss->buf1);
+ 		tss->buflen1 = Max(len1 + 1, tss->buflen1 * 2);
+ 		tss->buf1 = MemoryContextAlloc(ssup->ssup_cxt, tss->buflen1);
+ 	}
+ 	if (len2 >= tss->buflen2)
+ 	{
+ 		pfree(tss->buf2);
+ 		tss->buflen2 = Max(len2 + 1, tss->buflen2 * 2);
+ 		tss->buf2 = MemoryContextAlloc(ssup->ssup_cxt, tss->buflen2);
+ 	}
+ 
+ 	memcpy(tss->buf1, VARDATA_ANY(arg1), len1);
+ 	tss->buf1[len1] = '\0';
+ 	memcpy(tss->buf2, VARDATA_ANY(arg2), len2);
+ 	tss->buf2[len2] = '\0';
+ 
+ #ifdef HAVE_LOCALE_T
+ 	if (tss->locale)
+ 		result = strcoll_l(tss->buf1, tss->buf2, tss->locale);
+ 	else
+ #endif
+ 		result = strcoll(tss->buf1, tss->buf2);
+ 
+ 	/*
+ 	 * In some locales strcoll() can claim that nonidentical strings are equal.
+ 	 * Believing that would be bad news for a number of reasons, so we follow
+ 	 * Perl's lead and sort "equal" strings according to strcmp().
+ 	 */
+ 	if (result == 0)
+ 		result = strcmp(tss->buf1, tss->buf2);
+ 
+ 	/* We can't afford to leak memory here. */
+ 	if (PointerGetDatum(arg1) != x)
+ 		pfree(arg1);
+ 	if (PointerGetDatum(arg2) != y)
+ 		pfree(arg2);
+ 
+ 	return result;
+ }
+ 
+ /*
+  * Comparator for the leading sort column only, once bttext_convert() has
+  * replaced each value with its poor man's normalized key.  A zero result is
+  * inconclusive; only a non-zero result can be trusted.
+  */
+ static int
+ bttextfastcmp_locale_poorman(Datum x, Datum y, SortSupport ssup)
+ {
+ 	const char *keyx = (const char *) &x;
+ 	const char *keyy = (const char *) &y;
+ 	size_t		lenx,
+ 				leny;
+ 	int			cmp;
+ 
+ 	lenx = strlen(keyx);
+ 	leny = strlen(keyy);
+ 	cmp = memcmp(keyx, keyy, Min(lenx, leny));
+ 
+ 	/*
+ 	 * Only the bytes both keys have in common are compared.  Key length
+ 	 * says little here, so unlike bttextfastcmp_c() there is no length
+ 	 * tie-break: a 0 result is left for the core system's obligatory
+ 	 * bttextfastcmp_locale() tie-breaker to resolve.
+ 	 *
+ 	 * Even a strcmp() of two non-truncated strxfrm() blobs could not
+ 	 * indicate equality in a generally trustworthy way, for the same
+ 	 * reason that strcoll() results get a strcmp() tie-breaker (a strcmp()
+ 	 * tie-breaker on the *original* strings would still be needed).
+ 	 */
+ 	return cmp;
+ }
+ 
+ static Datum
+ bttext_convert(Datum d, SortSupport ssup)
+ {
+ 	Datum				res;
+ 	TextSortSupport	   *tss = (TextSortSupport *) ssup->ssup_extra;
+ 	text			   *full = DatumGetTextPP(d);
+ 	int					len = VARSIZE_ANY_EXHDR(full);
+ 	Size				bsize;
+ 
+ 	/*
+ 	 * Convert text into a "poor man's normalized key".  This is a
+ 	 * pass-by-value Datum that is treated as a cstring buffer.  A similar
+ 	 * technique is sometimes called the "small string optimization", used
+ 	 * where a string can be stored in place (typically, there is a
+ 	 * pointer/pass-by-value array union).
+ 	 *
+ 	 * Initialize res to all zeroes.
+ 	 */
+ 	res = 0;
+ 
+ 	/* By convention, we use buffer 1; grow it to fit the string + NUL */
+ 	if (len >= tss->buflen1)
+ 	{
+ 		pfree(tss->buf1);
+ 		tss->buflen1 = Max(len + 1, tss->buflen1 * 2);
+ 		tss->buf1 = MemoryContextAlloc(ssup->ssup_cxt, tss->buflen1);
+ 	}
+ 
+ 	memcpy(tss->buf1, VARDATA_ANY(full), len);
+ 	tss->buf1[len] = '\0';
+ 
+ 	/* We can't afford to leak memory here. */
+ 	if (PointerGetDatum(full) != d)
+ 		pfree(full);
+ 
+ retry:
+ #ifdef HAVE_LOCALE_T
+ 	if (tss->locale)
+ 		bsize = strxfrm_l(tss->buf2, tss->buf1, tss->buflen2, tss->locale);
+ 	else
+ #endif
+ 		bsize = strxfrm(tss->buf2, tss->buf1, tss->buflen2);
+ 
+ 	if (bsize >= tss->buflen2)
+ 	{
+ 		/*
+ 		 * The standard requires that we fit the entire blob in temp buffer in
+ 		 * order for subsequent comparisons to be well-defined.  glibc doesn't
+ 		 * actually require this, but it doesn't seem to help performance to
+ 		 * depend on that.
+ 		 */
+ 		pfree(tss->buf2);
+ 		tss->buflen2 = Max(bsize + 1, tss->buflen2 * 2);
+ 		tss->buf2 = MemoryContextAlloc(ssup->ssup_cxt, tss->buflen2);
+ 		goto retry;
+ 	}
+ 
+ 	/* Copy only real blob bytes; zeroed res supplies the terminating NUL */
+ 	memcpy(&res, tss->buf2, Min(bsize, sizeof(Datum) - 1));
+ 
+ 	return res;
+ }
  
  Datum
  text_larger(PG_FUNCTION_ARGS)
*** a/src/backend/utils/sort/sortsupport.c
--- b/src/backend/utils/sort/sortsupport.c
*************** PrepareSortSupportComparisonShim(Oid cmp
*** 82,87 ****
--- 82,89 ----
  
  	ssup->ssup_extra = extra;
  	ssup->comparator = comparison_shim;
+ 	ssup->converter = NULL;
+ 	ssup->proper = NULL;
  }
  
  /*
*************** PrepareSortSupportFromOrderingOp(Oid ord
*** 104,109 ****
--- 106,113 ----
  		elog(ERROR, "operator %u is not a valid ordering operator",
  			 orderingOp);
  
+ 	ssup->converter = NULL;
+ 
  	if (issupport)
  	{
  		/* The sort support function should provide a comparator */
*** a/src/backend/utils/sort/tuplesort.c
--- b/src/backend/utils/sort/tuplesort.c
*************** bool		optimize_bounded_sort = true;
*** 150,156 ****
   * When sorting single Datums, the data value is represented directly by
   * datum1/isnull1.	If the datatype is pass-by-reference and isnull1 is false,
   * then datum1 points to a separately palloc'd data value that is also pointed
!  * to by the "tuple" pointer; otherwise "tuple" is NULL.
   *
   * While building initial runs, tupindex holds the tuple's run number.  During
   * merge passes, we re-use it to hold the input tape number that each tuple in
--- 150,159 ----
   * When sorting single Datums, the data value is represented directly by
   * datum1/isnull1.	If the datatype is pass-by-reference and isnull1 is false,
   * then datum1 points to a separately palloc'd data value that is also pointed
!  * to by the "tuple" pointer; otherwise "tuple" is NULL.  Note that there are
!  * some exceptions, as when the sort support infrastructure provides a "poor
!  * man's normalized key" representation.   When that occurs, extra precautions
!  * are taken when a comparison involving a pair of datum1s returns 0.
   *
   * While building initial runs, tupindex holds the tuple's run number.  During
   * merge passes, we re-use it to hold the input tape number that each tuple in
*************** tuplesort_begin_heap(TupleDesc tupDesc,
*** 647,657 ****
  		sortKey->ssup_collation = sortCollations[i];
  		sortKey->ssup_nulls_first = nullsFirstFlags[i];
  		sortKey->ssup_attno = attNums[i];
  
  		PrepareSortSupportFromOrderingOp(sortOperators[i], sortKey);
  	}
! 
! 	if (nkeys == 1)
  		state->onlyKey = state->sortKeys;
  
  	MemoryContextSwitchTo(oldcontext);
--- 650,667 ----
  		sortKey->ssup_collation = sortCollations[i];
  		sortKey->ssup_nulls_first = nullsFirstFlags[i];
  		sortKey->ssup_attno = attNums[i];
+ 		sortKey->leading = (i == 0);
  
  		PrepareSortSupportFromOrderingOp(sortOperators[i], sortKey);
  	}
! 	/*
! 	 * The "onlyKey" optimization cannot be used when a tie-breaker for an
! 	 * unreliable poor man's normalized key comparison is required.  Typically,
! 	 * the optimization is only of significant value to pass-by-value types
! 	 * anyway, whereas poor man's normalized keys are typically used by
! 	 * pass-by-reference types.
! 	 */
! 	if (nkeys == 1 && !state->sortKeys->converter)
  		state->onlyKey = state->sortKeys;
  
  	MemoryContextSwitchTo(oldcontext);
*************** comparetup_heap(const SortTuple *a, cons
*** 2870,2875 ****
--- 2880,2907 ----
  	rtup.t_len = ((MinimalTuple) b->tuple)->t_len + MINIMAL_TUPLE_OFFSET;
  	rtup.t_data = (HeapTupleHeader) ((char *) b->tuple - MINIMAL_TUPLE_OFFSET);
  	tupDesc = state->tupDesc;
+ 	/* If leading comparison was unreliable, do reliable comparison */
+ 	if (state->sortKeys->converter)
+ 	{
+ 		AttrNumber	attno = sortKey->ssup_attno;
+ 		Datum		datum1,
+ 					datum2;
+ 		bool		isnull1,
+ 					isnull2;
+ 
+ 		Assert(attno == sortKey->proper->ssup_attno);
+ 		Assert(sortKey->leading);
+ 
+ 		datum1 = heap_getattr(&ltup, attno, tupDesc, &isnull1);
+ 		datum2 = heap_getattr(&rtup, attno, tupDesc, &isnull2);
+ 
+ 		compare = ApplySortComparator(datum1, isnull1,
+ 									  datum2, isnull2,
+ 									  sortKey->proper);
+ 		if (compare != 0)
+ 			return compare;
+ 	}
+ 
  	sortKey++;
  	for (nkey = 1; nkey < state->nKeys; nkey++, sortKey++)
  	{
*************** copytup_heap(Tuplesortstate *state, Sort
*** 2910,2919 ****
  	/* set up first-column key value */
  	htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
  	htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET);
! 	stup->datum1 = heap_getattr(&htup,
! 								state->sortKeys[0].ssup_attno,
! 								state->tupDesc,
! 								&stup->isnull1);
  }
  
  static void
--- 2942,2971 ----
  	/* set up first-column key value */
  	htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
  	htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET);
! 	if (!state->sortKeys->converter)
! 	{
! 		/* Store ordinary Datum representation */
! 		stup->datum1 = heap_getattr(&htup,
! 									state->sortKeys[0].ssup_attno,
! 									state->tupDesc,
! 									&stup->isnull1);
! 	}
! 	else
! 	{
! 		Datum		temp;
! 
! 		/*
! 		 * Store "poor man's normalized key", which cannot indicate equality in
! 		 * a trustworthy manner, and may require a tie-breaker
! 		 */
! 		Assert(state->sortKeys[0].leading);
! 		temp = heap_getattr(&htup,
! 							state->sortKeys[0].ssup_attno,
! 							state->tupDesc,
! 							&stup->isnull1);
! 
! 		stup->datum1 = state->sortKeys->converter(temp, state->sortKeys);
! 	}
  }
  
  static void
*** a/src/include/catalog/pg_amproc.h
--- b/src/include/catalog/pg_amproc.h
*************** DATA(insert (	1989   26 26 1 356 ));
*** 122,127 ****
--- 122,128 ----
  DATA(insert (	1989   26 26 2 3134 ));
  DATA(insert (	1991   30 30 1 404 ));
  DATA(insert (	1994   25 25 1 360 ));
+ DATA(insert (	1994   25 25 2 3492 ));
  DATA(insert (	1996   1083 1083 1 1107 ));
  DATA(insert (	2000   1266 1266 1 1358 ));
  DATA(insert (	2002   1562 1562 1 1672 ));
*** a/src/include/catalog/pg_proc.h
--- b/src/include/catalog/pg_proc.h
*************** DATA(insert OID = 3135 ( btnamesortsuppo
*** 610,615 ****
--- 610,617 ----
  DESCR("sort support");
  DATA(insert OID = 360 (  bttextcmp		   PGNSP PGUID 12 1 0 0 0 f f f f t f i 2 0 23 "25 25" _null_ _null_ _null_ _null_ bttextcmp _null_ _null_ _null_ ));
  DESCR("less-equal-greater");
+ DATA(insert OID = 3492 ( bttextsortsupport PGNSP PGUID 12 1 0 0 0 f f f f t f i 1 0 2278 "2281" _null_ _null_ _null_ _null_ bttextsortsupport _null_ _null_ _null_ ));
+ DESCR("sort support");
  DATA(insert OID = 377 (  cash_cmp		   PGNSP PGUID 12 1 0 0 0 f f f f t f i 2 0 23 "790 790" _null_ _null_ _null_ _null_ cash_cmp _null_ _null_ _null_ ));
  DESCR("less-equal-greater");
  DATA(insert OID = 380 (  btreltimecmp	   PGNSP PGUID 12 1 0 0 0 f f f f t f i 2 0 23 "703 703" _null_ _null_ _null_ _null_ btreltimecmp _null_ _null_ _null_ ));
*** a/src/include/utils/builtins.h
--- b/src/include/utils/builtins.h
*************** extern Datum bttintervalcmp(PG_FUNCTION_
*** 312,317 ****
--- 312,318 ----
  extern Datum btcharcmp(PG_FUNCTION_ARGS);
  extern Datum btnamecmp(PG_FUNCTION_ARGS);
  extern Datum bttextcmp(PG_FUNCTION_ARGS);
+ extern Datum bttextsortsupport(PG_FUNCTION_ARGS);
  
  /*
   *		Per-opclass sort support functions for new btrees.	Like the
*** a/src/include/utils/sortsupport.h
--- b/src/include/utils/sortsupport.h
*************** typedef struct SortSupportData
*** 59,64 ****
--- 59,65 ----
  	 */
  	MemoryContext ssup_cxt;		/* Context containing sort info */
  	Oid			ssup_collation; /* Collation to use, or InvalidOid */
+ 	bool		leading;		/* Leading (poor man-applicable) key? */
  
  	/*
  	 * Additional sorting parameters; but unlike ssup_collation, these can be
*************** typedef struct SortSupportData
*** 69,75 ****
  	bool		ssup_nulls_first;		/* sort nulls first? */
  
  	/*
! 	 * These fields are workspace for callers, and should not be touched by
  	 * opclass-specific functions.
  	 */
  	AttrNumber	ssup_attno;		/* column number to sort */
--- 70,76 ----
  	bool		ssup_nulls_first;		/* sort nulls first? */
  
  	/*
! 	 * These fields are workspace for callers, and should only be touched by
  	 * opclass-specific functions.
  	 */
  	AttrNumber	ssup_attno;		/* column number to sort */
*************** typedef struct SortSupportData
*** 81,86 ****
--- 82,97 ----
  	void	   *ssup_extra;		/* Workspace for opclass functions */
  
  	/*
+ 	 * Alternative "proper" SortSupport state for leading key, used only when
+ 	 * an unreliable poor man's normalized key comparator returns 0, and
+ 	 * tuplesort must use this separate state to perform a trustworthy
+ 	 * comparison.  This relates to the same attribute as our ssup_attno.  The
+ 	 * core system expects to be able to check if this is set, and pass it back
+ 	 * after having this SortSupport state return an uncertain 0 result.
+ 	 */
+ 	struct SortSupportData *proper;
+ 
+ 	/*
  	 * Function pointers are zeroed before calling the BTSORTSUPPORT function,
  	 * and must be set by it for any acceleration methods it wants to supply.
  	 * The comparator pointer must be set, others are optional.
*************** typedef struct SortSupportData
*** 96,103 ****
  	int			(*comparator) (Datum x, Datum y, SortSupport ssup);
  
  	/*
! 	 * Additional sort-acceleration functions might be added here later.
  	 */
  } SortSupportData;
  
  
--- 107,120 ----
  	int			(*comparator) (Datum x, Datum y, SortSupport ssup);
  
  	/*
! 	 * Given a datum involving an opclass with relevant support, use this
! 	 * callback to convert to a pass-by-value untrustworthy Datum, or poor
! 	 * man's normalized key.  Our sort support comparator expects this.  When
!  * this is set, tuplesort knows not to trust a comparison result of 0.
! 	 * However, -1 or 1 can be trusted, and taken as a perfect proxy for what
! 	 * the full leading key would indicate.
  	 */
+ 	Datum		(*converter) (Datum d, SortSupport ssup);
  } SortSupportData;
  
  
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to