On 9 February 2012 14:51, Robert Haas <robertmh...@gmail.com> wrote:
> I'm not sure I entirely follow all this, but I'll look at the code
> once you have it.

I have attached a revision of the patch, with the adjustments already
described. Note that I have not made this support btree tuplesorting
yet, as there is an impedance mismatch that must be resolved,
particularly with the SortSupport stuff, and I wanted to know what you
think of the multiple key specialisation first. Arguably, we could get
away with only a single specialisation - I haven't really thought about
it much.

You say "Well, how often will we sort 10,000 integers?", and I think
that btree index creation is one very common and useful case, so I'd
like to look at that in more detail. I certainly don't see any reason
to not do it too.

This should give you performance for sorting multiple-keys that is
almost as good as the single-key optimisation that you found to be
more compelling. Obviously the need to actually call comparetup_heap
to look at non-leading sortkeys will vary from case to case, and this
is based on your test case, where there are no duplicates and thus no
need to ever do that. That isn't too far from representative, as I
think that in general, a majority of comparisons won't result in
equality.

-- 
Peter Geoghegan       http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training and Services
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
new file mode 100644
index 1452e8c..e040ace
*** a/src/backend/utils/sort/tuplesort.c
--- b/src/backend/utils/sort/tuplesort.c
***************
*** 113,118 ****
--- 113,119 ----
  #include "utils/pg_rusage.h"
  #include "utils/rel.h"
  #include "utils/sortsupport.h"
+ #include "utils/template_qsort_arg.h"
  #include "utils/tuplesort.h"
  
  
*************** struct Tuplesortstate
*** 281,286 ****
--- 282,301 ----
  	int			currentRun;
  
  	/*
+ 	 * Will invocations of a tuple-class encapsulating comparator
+ 	 * (comparetup_heap, comparetup_index_btree, etc) skip the leading key,
+ 	 * because that has already been compared elsewhere?
+ 	 */
+ 	bool		skipLeadingkey;
+ 
+ 	/*
+ 	 * This specialization function pointer is sometimes used as an alternative
+ 	 * to the standard qsort_arg, when it has been determined that we can
+ 	 * benefit from various per number-of-sortkey performance optimizations.
+ 	 */
+ 	void (*qsort_arg_spec)(void *a, size_t n, size_t es, void *arg);
+ 
+ 	/*
  	 * Unless otherwise noted, all pointer variables below are pointers to
  	 * arrays of length maxTapes, holding per-tape data.
  	 */
*************** static void readtup_datum(Tuplesortstate
*** 492,497 ****
--- 507,519 ----
  static void reversedirection_datum(Tuplesortstate *state);
  static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
  
+ /*
+  * A set of type-neutral specializations, for single sortkey and multiple
+  * sortkey sorts. These specializations are used for sorting both heap and
+  * btree index tuples that meet those criteria.
+  */
+ DO_TEMPLATE_QSORT_ARG(single, ONE_ADDITIONAL_CODE, single_comparetup_inline)
+ DO_TEMPLATE_QSORT_ARG(mult, MULTI_ADDITIONAL_CODE, mult_comparetup_inline)
  
  /*
   *		tuplesort_begin_xxx
*************** tuplesort_begin_heap(TupleDesc tupDesc,
*** 631,636 ****
--- 653,661 ----
  		PrepareSortSupportFromOrderingOp(sortOperators[i], sortKey);
  	}
  
+ 	state->qsort_arg_spec =
+ 		nkeys==1?single_qsort_arg:mult_qsort_arg;
+ 	state->skipLeadingkey = true;
  	MemoryContextSwitchTo(oldcontext);
  
  	return state;
*************** tuplesort_performsort(Tuplesortstate *st
*** 1222,1232 ****
  			 * amount of memory.  Just qsort 'em and we're done.
  			 */
  			if (state->memtupcount > 1)
! 				qsort_arg((void *) state->memtuples,
! 						  state->memtupcount,
! 						  sizeof(SortTuple),
! 						  (qsort_arg_comparator) state->comparetup,
! 						  (void *) state);
  			state->current = 0;
  			state->eof_reached = false;
  			state->markpos_offset = 0;
--- 1247,1273 ----
  			 * amount of memory.  Just qsort 'em and we're done.
  			 */
  			if (state->memtupcount > 1)
! 			{
! 				/* Use a sorting specialization if available */
! 				if (state->qsort_arg_spec)
! 					/* specialization available */
! 					state->qsort_arg_spec(
! 								(void *) state->memtuples,
! 								state->memtupcount,
! 								sizeof(SortTuple),
! 								(void *) state);
! 				else
! 					/*
! 					 * Fall back on regular qsort_arg, with function pointer
! 					 * comparator, making no compile-time assumptions about the
! 					 * number of sortkeys or the datatype(s) to be sorted.
! 					 */
! 					qsort_arg((void *) state->memtuples,
! 								  state->memtupcount,
! 								  sizeof(SortTuple),
! 								  (qsort_arg_comparator) state->comparetup,
! 								  (void *) state);
! 			}
  			state->current = 0;
  			state->eof_reached = false;
  			state->markpos_offset = 0;
*************** make_bounded_heap(Tuplesortstate *state)
*** 2407,2412 ****
--- 2448,2454 ----
  	REVERSEDIRECTION(state);
  
  	state->memtupcount = 0;		/* make the heap empty */
+ 	state->skipLeadingkey = false; /* Not using inlining optimization */
  	for (i = 0; i < tupcount; i++)
  	{
  		if (state->memtupcount >= state->bound &&
*************** comparetup_heap(const SortTuple *a, cons
*** 2660,2674 ****
  	int			nkey;
  	int32		compare;
  
! 	/* Allow interrupting long sorts */
! 	CHECK_FOR_INTERRUPTS();
! 
! 	/* Compare the leading sort key */
! 	compare = ApplySortComparator(a->datum1, a->isnull1,
! 								  b->datum1, b->isnull1,
! 								  sortKey);
! 	if (compare != 0)
! 		return compare;
  
  	/* Compare additional sort keys */
  	ltup.t_len = ((MinimalTuple) a->tuple)->t_len + MINIMAL_TUPLE_OFFSET;
--- 2702,2716 ----
  	int			nkey;
  	int32		compare;
  
! 	if (!state->skipLeadingkey)
! 	{
! 		/* Compare the leading sort key */
! 		compare = ApplySortComparator(a->datum1, a->isnull1,
! 									  b->datum1, b->isnull1,
! 									  sortKey);
! 		if (compare != 0)
! 			return compare;
! 	}
  
  	/* Compare additional sort keys */
  	ltup.t_len = ((MinimalTuple) a->tuple)->t_len + MINIMAL_TUPLE_OFFSET;
*************** comparetup_index_btree(const SortTuple *
*** 2981,2990 ****
  					   Tuplesortstate *state)
  {
  	/*
! 	 * This is similar to _bt_tuplecompare(), but we have already done the
! 	 * index_getattr calls for the first column, and we need to keep track of
! 	 * whether any null fields are present.  Also see the special treatment
! 	 * for equal keys at the end.
  	 */
  	ScanKey		scanKey = state->indexScanKey;
  	IndexTuple	tuple1;
--- 3023,3031 ----
  					   Tuplesortstate *state)
  {
  	/*
! 	 * We have already done the index_getattr calls for the first column, and we
! 	 * need to keep track of whether any null fields are present.  Also see the
! 	 * special treatment for equal keys at the end.
  	 */
  	ScanKey		scanKey = state->indexScanKey;
  	IndexTuple	tuple1;
diff --git a/src/include/c.h b/src/include/c.h
new file mode 100644
index 7396adb..939f49a
*** a/src/include/c.h
--- b/src/include/c.h
*************** extern int	fdatasync(int fildes);
*** 850,853 ****
--- 850,865 ----
  /* /port compatibility functions */
  #include "port.h"
  
+ /*
+  * Define a cross-platform "always-inline" macro. This is a very sharp tool that
+  * should be used judiciously.
+  */
+ #ifdef __always_inline
+ #define pg_always_inline __always_inline
+ #elif defined(__force_inline)
+ #define pg_always_inline __force_inline
+ #else
+ #define pg_always_inline inline
+ #endif
+ 
  #endif   /* C_H */
diff --git a/src/include/utils/sortsupport.h b/src/include/utils/sortsupport.h
new file mode 100644
index ef8d853..37b3e80
*** a/src/include/utils/sortsupport.h
--- b/src/include/utils/sortsupport.h
*************** typedef struct SortSupportData
*** 94,103 ****
  	 * INT_MIN, as callers are allowed to negate the result before using it.
  	 */
  	int			(*comparator) (Datum x, Datum y, SortSupport ssup);
- 
- 	/*
- 	 * Additional sort-acceleration functions might be added here later.
- 	 */
  } SortSupportData;
  
  
--- 94,99 ----
diff --git a/src/include/utils/template_qsort_arg.h b/src/include/utils/template_qsort_arg.h
new file mode 100644
index ...9ede3f8
*** a/src/include/utils/template_qsort_arg.h
--- b/src/include/utils/template_qsort_arg.h
***************
*** 0 ****
--- 1,219 ----
+ /*-------------------------------------------------------------------------
+  *	template_qsort_arg.h: "template" version of qsort_arg.c
+  *
+  *	This version of qsort_arg is exclusively used within tuplesort.c to more
+  *	efficiently sort common types such as integers and floats. In providing this
+  *	version, we seek to take advantage of compile-time optimizations, in
+  *	particular, the inlining of comparators.
+  *
+  *	The DO_TEMPLATE_QSORT_ARG() macro generates an inlining variant (for sorts
+  *	with a single sortKey) and a non-inlining variant for multiple sortKeys.
+  *
+  *	We rely on various function declarations, and indeed partially duplicate
+  *	some code from tuplesort.c, so this file should be considered private to
+  *	that module, rather than a generic piece of infrastructure. However, it is
+  *	reasonable for clients to use this infrastructure to generate their own
+  *	sorting specializations outside of that translation unit.
+  *
+  *	CAUTION: if you change this file, see also qsort_arg.c as well as qsort.c.
+  *	qsort_arg.c should be considered authoritative.
+  *
+  *	Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
+  *	Portions Copyright (c) 1994, Regents of the University of California
+  *
+  *	src/include/utils/template_qsort_arg.h
+  *-------------------------------------------------------------------------
+  */
+ #include "c.h"
+ 
+ #define swapcode(TYPE, parmi, parmj, n)		\
+ do {										\
+ 	size_t i = (n) / sizeof (TYPE);			\
+ 	TYPE *pi = (TYPE *)(void *)(parmi);		\
+ 	TYPE *pj = (TYPE *)(void *)(parmj);		\
+ 	do {									\
+ 		TYPE	t = *pi;					\
+ 		*pi++ = *pj;						\
+ 		*pj++ = t;							\
+ 		} while (--i > 0);					\
+ } while (0)
+ 
+ #define SWAPINIT(a, es) swaptype = ((char *)(a) - (char *)0) % sizeof(long) || \
+ 	(es) % sizeof(long) ? 2 : (es) == sizeof(long)? 0 : 1;
+ 
+ #define thistype_vecswap(a, b, n)	 if ((n) > 0) swapfunc((a), (b), (size_t)(n), swaptype)
+ 
+ #define thistype_swap(a, b)								\
+ 	if (swaptype == 0) {								\
+ 		long t = *(long *)(void *)(a);					\
+ 		*(long *)(void *)(a) = *(long *)(void *)(b);	\
+ 		*(long *)(void *)(b) = t;						\
+ 	} else												\
+ 		swapfunc(a, b, es, swaptype)
+ 
+ pg_always_inline static void
+ swapfunc(char *a, char *b, size_t n, int swaptype)
+ {
+ 	if (swaptype <= 1)
+ 		swapcode(long, a, b, n);
+ 	else
+ 		swapcode(char, a, b, n);
+ }
+ 
+ /*
+  * This macro manufactures a type-specific implementation of qsort_arg with
+  * the comparator, OUT_COMPAR, known at compile time. OUT_COMPAR is typically
+  * an inline function.
+  *
+  * OUT_COMPAR is called with two element pointers plus the sort argument, and
+  * returns an int, in line with standard qsort convention.
+  *
+  * We have void* parameters for comparetup_inline just to shut up the compiler.
+  * They could be SortTuple pointers instead, but that would make it more
+  * difficult to keep template_qsort_arg.h consistent with tuplesort.c.
+  */
+ 
+ #define DO_TEMPLATE_QSORT_ARG(SPEC_VAR, ADD_CODE, OUT_COMPAR)			\
+ void SPEC_VAR##_qsort_arg(void *a, size_t n, size_t es, void *arg);		\
+ 																					\
+ /*																					\
+  * Note that this is heavily based on comparetup_heap; the two should be kept		\
+  * consistent.																		\
+  */																					\
+ pg_always_inline static int															\
+ SPEC_VAR##_comparetup_inline(const void *a, const void *b,							\
+ 		Tuplesortstate *state)														\
+ {																					\
+ 	const SortTuple* aT = a;														\
+ 	const SortTuple* bT = b;														\
+ 	int32		compare;															\
+ 	SortSupport	sortKey = state->sortKeys;											\
+ 																					\
+ 	/* Compare the leading sort key */												\
+ 	compare = ApplySortComparator(aT->datum1, aT->isnull1,							\
+ 								  bT->datum1, bT->isnull1,							\
+ 								  sortKey);											\
+ 	if (compare != 0)																\
+ 		return compare;																\
+ 	/* Additional code (varies according to number of sortKeys) */					\
+ 	ADD_CODE																		\
+ 	return 0;																		\
+ }																					\
+ 																					\
+ pg_always_inline static char *									\
+ SPEC_VAR##med3(char *a, char *b, char *c, void *arg)			\
+ {																\
+ 	return OUT_COMPAR(a, b, arg) < 0 ?							\
+ 		(OUT_COMPAR(b, c, arg) < 0 ?							\
+ 					b : (OUT_COMPAR(a, c, arg) < 0 ? c : a))	\
+ 		: (OUT_COMPAR(b, c, arg) > 0 ?							\
+ 					b : (OUT_COMPAR(a, c, arg) < 0 ? a : c));	\
+ }																\
+ 																\
+ void																		\
+ SPEC_VAR##_qsort_arg(void *a, size_t n, size_t es, void *arg)				\
+ {																			\
+ 	char	   *pa,															\
+ 			   *pb,															\
+ 			   *pc,															\
+ 			   *pd,															\
+ 			   *pl,															\
+ 			   *pm,															\
+ 			   *pn;															\
+ 	int			d,															\
+ 				r,															\
+ 				swaptype,													\
+ 				presorted;													\
+ 																			\
+ loop:SWAPINIT(a, es);														\
+ 	if (n < 7)																\
+ 	{																		\
+ 		for (pm = (char *) a + es; pm < (char *) a + n * es; pm += es)		\
+ 			for (pl = pm; pl > (char *) a &&								\
+ 					OUT_COMPAR(pl - es, pl, arg) > 0;						\
+ 				 pl -= es)													\
+ 				thistype_swap(pl, pl - es);									\
+ 		return;																\
+ 	}																		\
+ 	presorted = 1;															\
+ 	for (pm = (char *) a + es; pm < (char *) a + n * es; pm += es)			\
+ 	{																		\
+ 		if (OUT_COMPAR(pm - es, pm, arg) > 0)								\
+ 		{																	\
+ 			presorted = 0;													\
+ 			break;															\
+ 		}																	\
+ 	}																		\
+ 	if (presorted)															\
+ 		return;																\
+ 	pm = (char *) a + (n / 2) * es;											\
+ 	if (n > 7)																\
+ 	{																		\
+ 		pl = (char *) a;													\
+ 		pn = (char *) a + (n - 1) * es;										\
+ 		if (n > 40)															\
+ 		{																	\
+ 			d = (n / 8) * es;												\
+ 			pl = SPEC_VAR##med3(pl, pl + d, pl + 2 * d, arg);				\
+ 			pm = SPEC_VAR##med3(pm - d, pm, pm + d, arg);					\
+ 			pn = SPEC_VAR##med3(pn - 2 * d, pn - d, pn, arg);				\
+ 		}																	\
+ 		pm = SPEC_VAR##med3(pl, pm, pn, arg);								\
+ 	}																		\
+ 	thistype_swap(a, pm);													\
+ 	pa = pb = (char *) a + es;												\
+ 	pc = pd = (char *) a + (n - 1) * es;									\
+ 	for (;;)																\
+ 	{																		\
+ 		while (pb <= pc &&													\
+ 					(r = OUT_COMPAR(pb, a, arg)) <= 0)						\
+ 		{																	\
+ 			if (r == 0)														\
+ 			{																\
+ 				thistype_swap(pa, pb);										\
+ 				pa += es;													\
+ 			}																\
+ 			pb += es;														\
+ 		}																	\
+ 		while (pb <= pc &&													\
+ 				(r = OUT_COMPAR(pc, a, arg)) >= 0)							\
+ 		{																	\
+ 			if (r == 0)														\
+ 			{																\
+ 				thistype_swap(pc, pd);										\
+ 				pd -= es;													\
+ 			}																\
+ 			pc -= es;														\
+ 		}																	\
+ 		if (pb > pc)														\
+ 			break;															\
+ 		thistype_swap(pb, pc);												\
+ 		pb += es;															\
+ 		pc -= es;															\
+ 	}																		\
+ 	pn = (char *) a + n * es;												\
+ 	r = Min(pa - (char *) a, pb - pa);										\
+ 	thistype_vecswap(a, pb - r, r);											\
+ 	r = Min(pd - pc, pn - pd - es);											\
+ 	thistype_vecswap(pb, pn - r, r);										\
+ 	if ((r = pb - pa) > es)													\
+ 		SPEC_VAR##_qsort_arg(a, r / es, es, arg);							\
+ 	if ((r = pd - pc) > es)													\
+ 	{																		\
+ 		/* Iterate rather than recurse to save stack space */				\
+ 		a = pn - r;															\
+ 		n = r / es;															\
+ 		goto loop;															\
+ 	}																		\
+ }
+ 
+ /*
+  * Additional code, used depending on the specialization.
+  */
+ #define MULTI_ADDITIONAL_CODE												\
+ {																			\
+ 	Assert(state->nKeys > 1);												\
+ 	return state->comparetup(aT, bT, state);								\
+ }
+ 
+ #define ONE_ADDITIONAL_CODE Assert(state->nKeys == 1);
diff --git a/src/port/qsort.c b/src/port/qsort.c
new file mode 100644
index 8e2c6d9..d1981d6
*** a/src/port/qsort.c
--- b/src/port/qsort.c
***************
*** 7,13 ****
   *	  Remove ill-considered "swap_cnt" switch to insertion sort,
   *	  in favor of a simple check for presorted input.
   *
!  *	CAUTION: if you change this file, see also qsort_arg.c
   *
   *	src/port/qsort.c
   */
--- 7,14 ----
   *	  Remove ill-considered "swap_cnt" switch to insertion sort,
   *	  in favor of a simple check for presorted input.
   *
!  *	CAUTION: if you change this file, see also qsort_arg.c and
!  *	template_qsort_arg.h
   *
   *	src/port/qsort.c
   */
diff --git a/src/port/qsort_arg.c b/src/port/qsort_arg.c
new file mode 100644
index 28d1894..0ab6198
*** a/src/port/qsort_arg.c
--- b/src/port/qsort_arg.c
***************
*** 7,13 ****
   *	  Remove ill-considered "swap_cnt" switch to insertion sort,
   *	  in favor of a simple check for presorted input.
   *
!  *	CAUTION: if you change this file, see also qsort.c
   *
   *	src/port/qsort_arg.c
   */
--- 7,13 ----
   *	  Remove ill-considered "swap_cnt" switch to insertion sort,
   *	  in favor of a simple check for presorted input.
   *
!  *	CAUTION: if you change this file, see also qsort.c and template_qsort_arg.h
   *
   *	src/port/qsort_arg.c
   */
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to