On Tue, Oct 4, 2011 at 1:46 PM, Heikki Linnakangas <
heikki.linnakan...@enterprisedb.com> wrote:

> Ok. Could you phrase that as a code comment?
>
> Here's a version of the patch I've been working on. There's no functional
> changes, just a lot of moving things around, comment changes, etc. to
> hopefully make it more readable.


Thanks for your work on this patch. Patch with comment is attached.

------
With best regards,
Alexander Korotkov.
*** a/src/backend/access/gist/gistproc.c
--- b/src/backend/access/gist/gistproc.c
***************
*** 26,31 **** static bool gist_box_leaf_consistent(BOX *key, BOX *query,
--- 26,35 ----
  static double size_box(Datum dbox);
  static bool rtree_internal_consistent(BOX *key, BOX *query,
  						  StrategyNumber strategy);
+ static BOX *empty_box(void);
+ 
+ /* Minimal possible ratio of split */
+ #define LIMIT_RATIO 0.3
  
  
  /**************************************************
***************
*** 49,78 **** rt_box_union(PG_FUNCTION_ARGS)
  	PG_RETURN_BOX_P(n);
  }
  
- static Datum
- rt_box_inter(PG_FUNCTION_ARGS)
- {
- 	BOX		   *a = PG_GETARG_BOX_P(0);
- 	BOX		   *b = PG_GETARG_BOX_P(1);
- 	BOX		   *n;
- 
- 	n = (BOX *) palloc(sizeof(BOX));
- 
- 	n->high.x = Min(a->high.x, b->high.x);
- 	n->high.y = Min(a->high.y, b->high.y);
- 	n->low.x = Max(a->low.x, b->low.x);
- 	n->low.y = Max(a->low.y, b->low.y);
- 
- 	if (n->high.x < n->low.x || n->high.y < n->low.y)
- 	{
- 		pfree(n);
- 		/* Indicate "no intersection" by returning NULL pointer */
- 		n = NULL;
- 	}
- 
- 	PG_RETURN_BOX_P(n);
- }
- 
  /*
   * The GiST Consistent method for boxes
   *
--- 53,58 ----
***************
*** 194,279 **** gist_box_penalty(PG_FUNCTION_ARGS)
  	PG_RETURN_POINTER(result);
  }
  
- static void
- chooseLR(GIST_SPLITVEC *v,
- 		 OffsetNumber *list1, int nlist1, BOX *union1,
- 		 OffsetNumber *list2, int nlist2, BOX *union2)
- {
- 	bool		firstToLeft = true;
- 
- 	if (v->spl_ldatum_exists || v->spl_rdatum_exists)
- 	{
- 		if (v->spl_ldatum_exists && v->spl_rdatum_exists)
- 		{
- 			BOX			LRl = *union1,
- 						LRr = *union2;
- 			BOX			RLl = *union2,
- 						RLr = *union1;
- 			double		sizeLR,
- 						sizeRL;
- 
- 			adjustBox(&LRl, DatumGetBoxP(v->spl_ldatum));
- 			adjustBox(&LRr, DatumGetBoxP(v->spl_rdatum));
- 			adjustBox(&RLl, DatumGetBoxP(v->spl_ldatum));
- 			adjustBox(&RLr, DatumGetBoxP(v->spl_rdatum));
- 
- 			sizeLR = size_box(DirectFunctionCall2(rt_box_inter, BoxPGetDatum(&LRl), BoxPGetDatum(&LRr)));
- 			sizeRL = size_box(DirectFunctionCall2(rt_box_inter, BoxPGetDatum(&RLl), BoxPGetDatum(&RLr)));
- 
- 			if (sizeLR > sizeRL)
- 				firstToLeft = false;
- 
- 		}
- 		else
- 		{
- 			float		p1,
- 						p2;
- 			GISTENTRY	oldUnion,
- 						addon;
- 
- 			gistentryinit(oldUnion, (v->spl_ldatum_exists) ? v->spl_ldatum : v->spl_rdatum,
- 						  NULL, NULL, InvalidOffsetNumber, FALSE);
- 
- 			gistentryinit(addon, BoxPGetDatum(union1), NULL, NULL, InvalidOffsetNumber, FALSE);
- 			DirectFunctionCall3(gist_box_penalty, PointerGetDatum(&oldUnion), PointerGetDatum(&addon), PointerGetDatum(&p1));
- 			gistentryinit(addon, BoxPGetDatum(union2), NULL, NULL, InvalidOffsetNumber, FALSE);
- 			DirectFunctionCall3(gist_box_penalty, PointerGetDatum(&oldUnion), PointerGetDatum(&addon), PointerGetDatum(&p2));
- 
- 			if ((v->spl_ldatum_exists && p1 > p2) || (v->spl_rdatum_exists && p1 < p2))
- 				firstToLeft = false;
- 		}
- 	}
- 
- 	if (firstToLeft)
- 	{
- 		v->spl_left = list1;
- 		v->spl_right = list2;
- 		v->spl_nleft = nlist1;
- 		v->spl_nright = nlist2;
- 		if (v->spl_ldatum_exists)
- 			adjustBox(union1, DatumGetBoxP(v->spl_ldatum));
- 		v->spl_ldatum = BoxPGetDatum(union1);
- 		if (v->spl_rdatum_exists)
- 			adjustBox(union2, DatumGetBoxP(v->spl_rdatum));
- 		v->spl_rdatum = BoxPGetDatum(union2);
- 	}
- 	else
- 	{
- 		v->spl_left = list2;
- 		v->spl_right = list1;
- 		v->spl_nleft = nlist2;
- 		v->spl_nright = nlist1;
- 		if (v->spl_ldatum_exists)
- 			adjustBox(union2, DatumGetBoxP(v->spl_ldatum));
- 		v->spl_ldatum = BoxPGetDatum(union2);
- 		if (v->spl_rdatum_exists)
- 			adjustBox(union1, DatumGetBoxP(v->spl_rdatum));
- 		v->spl_rdatum = BoxPGetDatum(union1);
- 	}
- 
- 	v->spl_ldatum_exists = v->spl_rdatum_exists = false;
- }
- 
  /*
   * Trivial split: half of entries will be placed on one page
   * and another half - to another
--- 174,179 ----
***************
*** 338,536 **** fallbackSplit(GistEntryVector *entryvec, GIST_SPLITVEC *v)
  }
  
  /*
!  * The GiST PickSplit method
   *
!  * New linear algorithm, see 'New Linear Node Splitting Algorithm for R-tree',
!  * C.H.Ang and T.C.Tan
   *
!  * This is used for both boxes and points.
   */
  Datum
  gist_box_picksplit(PG_FUNCTION_ARGS)
  {
  	GistEntryVector *entryvec = (GistEntryVector *) PG_GETARG_POINTER(0);
  	GIST_SPLITVEC *v = (GIST_SPLITVEC *) PG_GETARG_POINTER(1);
! 	OffsetNumber i;
! 	OffsetNumber *listL,
! 			   *listR,
! 			   *listB,
! 			   *listT;
! 	BOX		   *unionL,
! 			   *unionR,
! 			   *unionB,
! 			   *unionT;
! 	int			posL,
! 				posR,
! 				posB,
! 				posT;
! 	BOX			pageunion;
! 	BOX		   *cur;
! 	char		direction = ' ';
! 	bool		allisequal = true;
! 	OffsetNumber maxoff;
! 	int			nbytes;
  
- 	posL = posR = posB = posT = 0;
  	maxoff = entryvec->n - 1;
  
! 	cur = DatumGetBoxP(entryvec->vector[FirstOffsetNumber].key);
! 	memcpy((void *) &pageunion, (void *) cur, sizeof(BOX));
  
! 	/* find MBR */
! 	for (i = OffsetNumberNext(FirstOffsetNumber); i <= maxoff; i = OffsetNumberNext(i))
  	{
! 		cur = DatumGetBoxP(entryvec->vector[i].key);
! 		if (allisequal && (
! 						   pageunion.high.x != cur->high.x ||
! 						   pageunion.high.y != cur->high.y ||
! 						   pageunion.low.x != cur->low.x ||
! 						   pageunion.low.y != cur->low.y
! 						   ))
! 			allisequal = false;
! 
! 		adjustBox(&pageunion, cur);
  	}
  
! 	if (allisequal)
  	{
  		/*
! 		 * All entries are the same
  		 */
  		fallbackSplit(entryvec, v);
  		PG_RETURN_POINTER(v);
  	}
  
  	nbytes = (maxoff + 2) * sizeof(OffsetNumber);
! 	listL = (OffsetNumber *) palloc(nbytes);
! 	listR = (OffsetNumber *) palloc(nbytes);
! 	listB = (OffsetNumber *) palloc(nbytes);
! 	listT = (OffsetNumber *) palloc(nbytes);
! 	unionL = (BOX *) palloc(sizeof(BOX));
! 	unionR = (BOX *) palloc(sizeof(BOX));
! 	unionB = (BOX *) palloc(sizeof(BOX));
! 	unionT = (BOX *) palloc(sizeof(BOX));
! 
! #define ADDLIST( list, unionD, pos, num ) do { \
! 	if ( pos ) { \
! 		if ( (unionD)->high.x < cur->high.x ) (unionD)->high.x	= cur->high.x; \
! 		if ( (unionD)->low.x  > cur->low.x	) (unionD)->low.x	= cur->low.x; \
! 		if ( (unionD)->high.y < cur->high.y ) (unionD)->high.y	= cur->high.y; \
! 		if ( (unionD)->low.y  > cur->low.y	) (unionD)->low.y	= cur->low.y; \
! 	} else { \
! 			memcpy( (void*)(unionD), (void*) cur, sizeof( BOX ) );	\
! 	} \
! 	(list)[pos] = num; \
! 	(pos)++; \
! } while(0)
  
! 	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
! 	{
! 		cur = DatumGetBoxP(entryvec->vector[i].key);
! 		if (cur->low.x - pageunion.low.x < pageunion.high.x - cur->high.x)
! 			ADDLIST(listL, unionL, posL, i);
! 		else
! 			ADDLIST(listR, unionR, posR, i);
! 		if (cur->low.y - pageunion.low.y < pageunion.high.y - cur->high.y)
! 			ADDLIST(listB, unionB, posB, i);
! 		else
! 			ADDLIST(listT, unionT, posT, i);
! 	}
  
! #define LIMIT_RATIO 0.1
! #define _IS_BADRATIO(x,y)	( (y) == 0 || (float)(x)/(float)(y) < LIMIT_RATIO )
! #define IS_BADRATIO(x,y) ( _IS_BADRATIO((x),(y)) || _IS_BADRATIO((y),(x)) )
! 	/* bad disposition, try to split by centers of boxes  */
! 	if (IS_BADRATIO(posR, posL) && IS_BADRATIO(posT, posB))
  	{
! 		double		avgCenterX = 0.0,
! 					avgCenterY = 0.0;
! 		double		CenterX,
! 					CenterY;
  
! 		for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
  		{
! 			cur = DatumGetBoxP(entryvec->vector[i].key);
! 			avgCenterX += ((double) cur->high.x + (double) cur->low.x) / 2.0;
! 			avgCenterY += ((double) cur->high.y + (double) cur->low.y) / 2.0;
  		}
! 
! 		avgCenterX /= maxoff;
! 		avgCenterY /= maxoff;
! 
! 		posL = posR = posB = posT = 0;
! 		for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
  		{
! 			cur = DatumGetBoxP(entryvec->vector[i].key);
! 
! 			CenterX = ((double) cur->high.x + (double) cur->low.x) / 2.0;
! 			CenterY = ((double) cur->high.y + (double) cur->low.y) / 2.0;
  
! 			if (CenterX < avgCenterX)
! 				ADDLIST(listL, unionL, posL, i);
! 			else if (CenterX == avgCenterX)
  			{
! 				if (posL > posR)
! 					ADDLIST(listR, unionR, posR, i);
! 				else
! 					ADDLIST(listL, unionL, posL, i);
  			}
  			else
- 				ADDLIST(listR, unionR, posR, i);
- 
- 			if (CenterY < avgCenterY)
- 				ADDLIST(listB, unionB, posB, i);
- 			else if (CenterY == avgCenterY)
  			{
! 				if (posB > posT)
! 					ADDLIST(listT, unionT, posT, i);
! 				else
! 					ADDLIST(listB, unionB, posB, i);
  			}
- 			else
- 				ADDLIST(listT, unionT, posT, i);
  		}
! 
! 		if (IS_BADRATIO(posR, posL) && IS_BADRATIO(posT, posB))
  		{
! 			fallbackSplit(entryvec, v);
! 			PG_RETURN_POINTER(v);
  		}
  	}
  
! 	/* which split more optimal? */
! 	if (Max(posL, posR) < Max(posB, posT))
! 		direction = 'x';
! 	else if (Max(posL, posR) > Max(posB, posT))
! 		direction = 'y';
! 	else
  	{
! 		Datum		interLR = DirectFunctionCall2(rt_box_inter,
! 												  BoxPGetDatum(unionL),
! 												  BoxPGetDatum(unionR));
! 		Datum		interBT = DirectFunctionCall2(rt_box_inter,
! 												  BoxPGetDatum(unionB),
! 												  BoxPGetDatum(unionT));
! 		double		sizeLR,
! 					sizeBT;
! 
! 		sizeLR = size_box(interLR);
! 		sizeBT = size_box(interBT);
! 
! 		if (sizeLR < sizeBT)
! 			direction = 'x';
! 		else
! 			direction = 'y';
! 	}
  
! 	if (direction == 'x')
! 		chooseLR(v,
! 				 listL, posL, unionL,
! 				 listR, posR, unionR);
! 	else
! 		chooseLR(v,
! 				 listB, posB, unionB,
! 				 listT, posT, unionT);
  
  	PG_RETURN_POINTER(v);
  }
  
--- 238,825 ----
  }
  
  /*
!  * Represents information about entry which can be present to any group without
!  * affecting overlap over selected axis ("common entry").
!  */
! typedef struct
! {
! 	/* Index of entry in the initial array */
! 	int			index;
! 	/* Delta between penalties of entry insertion into different groups */
! 	double		delta;
! }	CommonEntry;
! 
! /*
!  * Context for g_box_consider_split. Contains information about currently
!  * selected split and some general information.
!  */
! typedef struct
! {
! 	/* number of splitting entries */
! 	int			entriesCount;
! 	/* minimum bounding box across all entries */
! 	BOX			boundingBox;
! 
! 	/* Information about currently selected split follows */
! 
! 	bool		first;	/* true if no split was selected yet */
! 
! 	double		leftUpper;		/* upper bound of left interval */
! 	double		rightLower;		/* lower bound of right interval */
! 
! 	float4		ratio;
! 	float4		overlap;
! 	int			dim;			/* axis of this split */
! 	double		range;			/* width of general MBR projection to the selected axis */
! }	ConsiderSplitContext;
! 
! /*
!  * Interval represents projection of box to axis.
!  */
! typedef struct
! {
! 	double		lower,
! 				upper;
! }	SplitInterval;
! 
! /*
!  * Interval comparison function by lower bound of the interval;
!  */
! static int
! interval_cmp_lower(const void *i1, const void *i2)
! {
! 	double		lower1 = ((SplitInterval *) i1)->lower,
! 				lower2 = ((SplitInterval *) i2)->lower;
! 
! 	if (lower1 < lower2)
! 		return -1;
! 	else if (lower1 > lower2)
! 		return 1;
! 	else
! 		return 0;
! }
! 
! /*
!  * Interval comparison function by upper bound of the interval;
!  */
! static int
! interval_cmp_upper(const void *i1, const void *i2)
! {
! 	double		upper1 = ((SplitInterval *) i1)->upper,
! 				upper2 = ((SplitInterval *) i2)->upper;
! 
! 	if (upper1 < upper2)
! 		return -1;
! 	else if (upper1 > upper2)
! 		return 1;
! 	else
! 		return 0;
! }
! 
! /*
!  * Replace negative value with zero.
!  */
! static inline float
! non_negative(float val)
! {
! 	if (val >= 0.0f)
! 		return val;
! 	else
! 		return 0.0f;
! }
! 
! /*
!  * Consider replacement of currently selected split with the better one.
!  */
! static void inline
! g_box_consider_split(ConsiderSplitContext *context, int dimNum,
! 					 double rightLower, int minLeftCount,
! 					 double leftUpper, int maxLeftCount)
! {
! 	int			leftCount,
! 				rightCount;
! 	float4		ratio,
! 				overlap;
! 	double		range;
! 
! 	/*
! 	 * Calculate entries distribution ration assuming most uniform
! 	 * distribution of common entries.
! 	 */
! 	if (minLeftCount >= (context->entriesCount + 1) / 2)
! 	{
! 		leftCount = minLeftCount;
! 	}
! 	else
! 	{
! 		if (maxLeftCount <= context->entriesCount / 2)
! 			leftCount = maxLeftCount;
! 		else
! 			leftCount = context->entriesCount / 2;
! 	}
! 	rightCount = context->entriesCount - leftCount;
! 
! 	/*
! 	 * Ratio of split - quotient between size of lesser group and total entries
! 	 * count.
! 	 */
! 	ratio = ((float4) Min(leftCount, rightCount)) /
! 		((float4) context->entriesCount);
! 
! 	if (ratio > LIMIT_RATIO)
! 	{
! 		bool selectthis = false;
! 
! 		/*
! 		 * The ratio is acceptable, so compare current split with previously
! 		 * selected one. Between splits of one dimension we search for
! 		 * minimal overlap (allowing negative values) and minimal ration
! 		 * (between same overlaps. We switch dimension if find less overlap
! 		 * (non-negative) or less range with same overlap.
! 		 */
! 		if (dimNum == 0)
! 			range = context->boundingBox.high.x - context->boundingBox.low.x;
! 		else
! 			range = context->boundingBox.high.y - context->boundingBox.low.y;
! 
! 		overlap = (leftUpper - rightLower) / range;
! 
! 		/* If there is no previous selection, select this */
! 		if (context->first)
! 			selectthis = true;
! 		else if (context->dim == dimNum)
! 		{
! 			/*
! 			 * Within the same dimension, choose the new split if it has a
! 			 * smaller overlap, or same overlap but better ratio.
! 			 */
! 			if (overlap < context->overlap ||
! 				(overlap == context->overlap && ratio > context->ratio))
! 				selectthis = true;
! 		}
! 		else
! 		{
! 			/*
! 			 * Across dimensions, choose the new split if it has a
! 			 * smaller *non-negative* overlap, or same *non-negative* overlap
! 			 * but bigger range. This condition differs from described in the
! 			 * article. On the datasets where leaf MBRs don't overlap
! 			 * themselves, non-overlapping splits (i.e. splits which have zero
! 			 * *non-negative* overlap) are frequently possible. In this case
! 			 * splits tends to be along one dimension, because most distant
! 			 * non-overlapping splits (i.e. having lowest negative overlap)
! 			 * appears to be in the same dimension as in the previous split.
! 			 * Therefore MBRs appear to be very prolonged along another
! 			 * dimension, that leads to bad search performance. Using range
! 			 * as the second split criteria makes MBRs more quadratic. Using
! 			 * *non-negative* overlap instead of overlap as the first split
! 			 * criteria gives to range criteria chance to matter, because
! 			 * non-overlapping splits are equivalent in this criteria.
! 			 */
! 			if (non_negative(overlap) < non_negative(context->overlap) ||
! 				(range > context->range &&
! 				 non_negative(overlap) <= non_negative(context->overlap)))
! 				selectthis = true;
! 		}
! 
! 		if (selectthis)
! 		{
! 			/* save information about selected split */
! 			context->first = false;
! 			context->ratio = ratio;
! 			context->range = range;
! 			context->overlap = overlap;
! 			context->rightLower = rightLower;
! 			context->leftUpper = leftUpper;
! 			context->dim = dimNum;
! 		}
! 	}
! }
! 
! /*
!  * Create new empty BOX
!  */
! static BOX *
! empty_box(void)
! {
! 	BOX		   *result;
! 
! 	result = (BOX *) palloc(sizeof(BOX));
! 	result->low.x = 0.0f;
! 	result->low.y = 0.0f;
! 	result->high.x = 0.0f;
! 	result->high.y = 0.0f;
! 	return result;
! }
! 
! /*
!  * Return increase of original BOX area by new BOX area insertion.
!  */
! static double
! box_penalty(BOX *original, BOX *new)
! {
! 	double		union_width,
! 				union_height;
! 
! 	union_width = Max(original->high.x, new->high.x) -
! 		Min(original->low.x, new->low.x);
! 	union_height = Max(original->high.y, new->high.y) -
! 		Min(original->low.y, new->low.y);
! 	return union_width * union_height - (original->high.x - original->low.x) *
! 		(original->high.y - original->low.y);
! }
! 
! /*
!  * Compare common entries by their deltas.
!  */
! static int
! common_entry_cmp(const void *i1, const void *i2)
! {
! 	double		delta1 = ((CommonEntry *) i1)->delta,
! 				delta2 = ((CommonEntry *) i2)->delta;
! 
! 	if (delta1 < delta2)
! 		return -1;
! 	else if (delta1 > delta2)
! 		return 1;
! 	else
! 		return 0;
! }
! 
! /*
!  * --------------------------------------------------------------------------
!  * Double sorting split algorithm. Finds split of boxes by considering splits
!  * along each axis. Searching splits along axes are based on the notions of
!  * split pair and corner split pair.
   *
!  * Split pair is a pair <a, b> where 'a' is upper bound of left group and 'b'
!  * is lower bound of right group. Lower bound or left group and upper bound of
!  * right group are assumed to be the general bounds along axis.
   *
!  * Corner split pair is a split pair <a, b> where 'a' is upper bound of some
!  * interval, 'b' is lower bound of some, 'a' can't be lower or 'b' can't be
!  * higher with split pair property preservation.
!  *
!  * Split along axis divides entries to the following groups:
!  * 1) Entries which should be placed to the left group
!  * 2) Entries which should be placed to the right group
!  * 3) "Common entries" which can be placed to any of groups without affecting
!  *	  of overlap along selected axis.
!  *
!  * Split along axis is selected by overlap along that axis and some other
!  * criteria (see g_box_consider_split). When split along axis is selected,
!  * "common entries" are distributed by penalty with group boxes.
!  *
!  * For details see:
!  * "A new double sorting-based node splitting algorithm for R-tree", A. Korotkov
!  * http://syrcose.ispras.ru/2011/files/SYRCoSE2011_Proceedings.pdf#page=36
!  * --------------------------------------------------------------------------
   */
  Datum
  gist_box_picksplit(PG_FUNCTION_ARGS)
  {
  	GistEntryVector *entryvec = (GistEntryVector *) PG_GETARG_POINTER(0);
  	GIST_SPLITVEC *v = (GIST_SPLITVEC *) PG_GETARG_POINTER(1);
! 	OffsetNumber i,
! 				maxoff;
! 	ConsiderSplitContext context;
! 	BOX		   *box,
! 			   *leftBox,
! 			   *rightBox;
! 	int			dim,
! 				commonEntriesCount,
! 				nbytes;
! 	SplitInterval *intervalsLower,
! 			   *intervalsUpper;
! 	CommonEntry *commonEntries;
! 	int			nentries;
! 
! #define PLACE_LEFT(box, off)					\
! 	do {										\
! 		if (v->spl_nleft > 0)					\
! 			adjustBox(leftBox, box);			\
! 		else									\
! 			*leftBox = *(box);					\
! 		v->spl_left[v->spl_nleft++] = off;		\
! 	} while(0)
! 
! #define PLACE_RIGHT(box, off)					\
! 	do {										\
! 		if (v->spl_nright > 0)					\
! 			adjustBox(rightBox, box);			\
! 		else									\
! 			*rightBox = *(box);					\
! 		v->spl_right[v->spl_nright++] = off;	\
! 	} while(0)
! 
! 	memset(&context, 0, sizeof(ConsiderSplitContext));
  
  	maxoff = entryvec->n - 1;
+ 	nentries = context.entriesCount = maxoff - FirstOffsetNumber + 1;
  
! 	/* Allocate arrays for intervals along axes */
! 	intervalsLower = (SplitInterval *) palloc(nentries * sizeof(SplitInterval));
! 	intervalsUpper = (SplitInterval *) palloc(nentries * sizeof(SplitInterval));
  
! 	/*
! 	 * Calculate the overall minimum bounding box over all the entries.
! 	 */
! 	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
  	{
! 		box = DatumGetBoxP(entryvec->vector[i].key);
! 		if (i == FirstOffsetNumber)
! 			context.boundingBox = *box;
! 		else
! 			adjustBox(&context.boundingBox, box);
  	}
  
! 	/*
! 	 * Iterate over axes for optimal split searching.
! 	 */
! 	context.first = true;		/* nothing selected yet */
! 	for (dim = 0; dim < 2; dim++)
  	{
+ 		double		leftUpper,
+ 					rightLower;
+ 		int			i1,
+ 					i2;
+ 
+ 		/* Prepare array of intervals along axis */
+ 		for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
+ 		{
+ 			box = DatumGetBoxP(entryvec->vector[i].key);
+ 			if (dim == 0)
+ 			{
+ 				intervalsLower[i - FirstOffsetNumber].lower = box->low.x;
+ 				intervalsLower[i - FirstOffsetNumber].upper = box->high.x;
+ 			}
+ 			else
+ 			{
+ 				intervalsLower[i - FirstOffsetNumber].lower = box->low.y;
+ 				intervalsLower[i - FirstOffsetNumber].upper = box->high.y;
+ 			}
+ 		}
+ 
  		/*
! 		 * Make two arrays of intervals: sorted by lower bound and sorted by
! 		 * upper bound.
  		 */
+ 		memcpy(intervalsUpper, intervalsLower,
+ 			   sizeof(SplitInterval) * nentries);
+ 		qsort(intervalsLower, nentries, sizeof(SplitInterval),
+ 			  interval_cmp_lower);
+ 		qsort(intervalsUpper, nentries, sizeof(SplitInterval),
+ 			  interval_cmp_upper);
+ 
+ 		/*
+ 		 * Iterate over lower bound of right group finding corresponding
+ 		 * lowest upper bound of left group.
+ 		 */
+ 		i1 = 0;
+ 		i2 = 0;
+ 		rightLower = intervalsLower[i1].lower;
+ 		leftUpper = intervalsUpper[i2].lower;
+ 
+ 		while (true)
+ 		{
+ 			/*
+ 			 * Find next lower bound of right group.
+ 			 */
+ 			while (i1 < nentries && rightLower == intervalsLower[i1].lower)
+ 			{
+ 				leftUpper = Max(leftUpper, intervalsLower[i1].upper);
+ 				i1++;
+ 			}
+ 			if (i1 >= nentries)
+ 				break;
+ 			rightLower = intervalsLower[i1].lower;
+ 
+ 			/*
+ 			 * Find count of intervals which anyway should be placed to the
+ 			 * left group.
+ 			 */
+ 			while (i2 < nentries && intervalsUpper[i2].upper <= leftUpper)
+ 				i2++;
+ 
+ 			/*
+ 			 * Consider found split.
+ 			 */
+ 			g_box_consider_split(&context, dim, rightLower, i1, leftUpper, i2);
+ 		}
+ 
+ 		/*
+ 		 * Iterate over upper bound of left group finding corresponding
+ 		 * highest lower bound of right group.
+ 		 */
+ 		i1 = nentries - 1;
+ 		i2 = nentries - 1;
+ 		rightLower = intervalsLower[i1].upper;
+ 		leftUpper = intervalsUpper[i2].upper;
+ 
+ 		while (true)
+ 		{
+ 			/*
+ 			 * Find next upper bound of left group.
+ 			 */
+ 			while (i2 >= 0 && leftUpper == intervalsUpper[i2].upper)
+ 			{
+ 				rightLower = Min(rightLower, intervalsUpper[i2].lower);
+ 				i2--;
+ 			}
+ 			if (i2 < 0)
+ 				break;
+ 			leftUpper = intervalsUpper[i2].upper;
+ 
+ 			/*
+ 			 * Find count of intervals which anyway should be placed to the
+ 			 * right group.
+ 			 */
+ 			while (i1 >= 0 && intervalsLower[i1].lower >= rightLower)
+ 				i1--;
+ 
+ 			/*
+ 			 * Consider found split.
+ 			 */
+ 			g_box_consider_split(&context, dim,
+ 								 rightLower, i1 + 1, leftUpper, i2 + 1);
+ 		}
+ 	}
+ 
+ 	/*
+ 	 * If we failed to find any acceptable splits, use trivial split.
+ 	 */
+ 	if (context.first)
+ 	{
  		fallbackSplit(entryvec, v);
  		PG_RETURN_POINTER(v);
  	}
  
+ 	/* Allocate vectors for results */
  	nbytes = (maxoff + 2) * sizeof(OffsetNumber);
! 	v->spl_left = (OffsetNumber *) palloc(nbytes);
! 	v->spl_right = (OffsetNumber *) palloc(nbytes);
! 	v->spl_nleft = 0;
! 	v->spl_nright = 0;
  
! 	/* Allocate boxes of left and right groups */
! 	leftBox = empty_box();
! 	rightBox = empty_box();
  
! 	/*
! 	 * Allocate an array for "Common entries" - entries which can be placed
! 	 * to either group without affecting overlap along selected axis.
! 	 */
! 	commonEntriesCount = 0;
! 	commonEntries = (CommonEntry *) palloc(nentries * sizeof(CommonEntry));
! 
! 	/*
! 	 * Distribute entries which can be distributed unambiguously, and collect
! 	 * "common entries".
! 	 */
! 	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
  	{
! 		double		lower,
! 					upper;
  
! 		/*
! 		 * Get upper and lower bounds along selected axis.
! 		 */
! 		box = DatumGetBoxP(entryvec->vector[i].key);
! 		if (context.dim == 0)
  		{
! 			lower = box->low.x;
! 			upper = box->high.x;
  		}
! 		else
  		{
! 			lower = box->low.y;
! 			upper = box->high.y;
! 		}
  
! 		if (upper <= context.leftUpper)
! 		{
! 			/* Fits to the left group */
! 			if (lower >= context.rightLower)
  			{
! 				/* Fits also to the right group, so "common entry" */
! 				commonEntries[commonEntriesCount++].index = i;
  			}
  			else
  			{
! 				/* Doesn't fit to the right group, so join to the left group */
! 				PLACE_LEFT(box, i);
  			}
  		}
! 		else
  		{
! 			/*
! 			 * Each entry should fit on either left or right group, so since
! 			 * this didn't fit on the left group, it better fit in the right
! 			 * group.
! 			 */
! 			Assert(lower >= context.rightLower);
! 
! 			/* Doesn't fit to the left group, so join to the right group */
! 			PLACE_RIGHT(box, i);
  		}
  	}
  
! 	/*
! 	 * Distribute "common entries", if any
! 	 */
! 	if (commonEntriesCount > 0)
  	{
! 		/*
! 		 * Calculate minimum number of entries that must be placed in both
! 		 * groups, to reach LIMIT_RATIO.
! 		 */
! 		int		m = ceil(LIMIT_RATIO * (double) nentries);
  
! 		/*
! 		 * Calculate delta between penalties of join "common entries" to
! 		 * different groups.
! 		 */
! 		for (i = 0; i < commonEntriesCount; i++)
! 		{
! 			box = DatumGetBoxP(entryvec->vector[commonEntries[i].index].key);
! 			commonEntries[i].delta = Abs(box_penalty(leftBox, box) -
! 										 box_penalty(rightBox, box));
! 		}
! 
! 		/*
! 		 * Sort "common entries" by calculated deltas in order to distribute
! 		 * the most ambiguous entries first.
! 		 */
! 		qsort(commonEntries, commonEntriesCount, sizeof(CommonEntry), common_entry_cmp);
! 
! 		/*
! 		 * Distribute "common entries" between groups.
! 		 */
! 		for (i = 0; i < commonEntriesCount; i++)
! 		{
! 			box = DatumGetBoxP(entryvec->vector[commonEntries[i].index].key);
! 
! 			/*
! 			 * Check if we have to place this entry in either group to achieve 
! 			 * LIMIT_RATIO.
! 			 */
! 			if (v->spl_nleft + (commonEntriesCount - i) <= m)
! 				PLACE_LEFT(box, commonEntries[i].index);
! 			else if (v->spl_nright + (commonEntriesCount - i) <= m)
! 				PLACE_RIGHT(box, commonEntries[i].index);
! 			else
! 			{
! 				/* Otherwise select the group by minimal penalty */
! 				if (box_penalty(leftBox, box) < box_penalty(rightBox, box))
! 					PLACE_LEFT(box, commonEntries[i].index);
! 				else
! 					PLACE_RIGHT(box, commonEntries[i].index);
! 			}
! 		}
! 	}
  
+ 	v->spl_ldatum = PointerGetDatum(leftBox);
+ 	v->spl_rdatum = PointerGetDatum(rightBox);
  	PG_RETURN_POINTER(v);
  }
  
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to