On 04.10.2011 11:51, Alexander Korotkov wrote:
On Tue, Oct 4, 2011 at 12:12 PM, Heikki Linnakangas<
heikki.linnakan...@enterprisedb.com> wrote:
Can you elaborate the consider-split algorithm? The criteria to select the
new split over the previously selected one is this:
! /*
!  * If ratio is acceptable, we should compare current split with
!  * previously selected one. If no split was selected then we select
!  * current anyway. Between splits of one dimension we search for
!  * minimal overlap (allowing negative values) and minimal ration
!  * (between same overlaps. We switch dimension if find less overlap
!  * (non-negative) or less range with same overlap.
!  */
! range = diminfo->upper - diminfo->lower;
! overlap = ((leftUpper) - (rightLower)) / range;
! if (context->first ||
!     (context->dim == dimNum &&
!      (overlap < context->overlap ||
!       (overlap == context->overlap && ratio > context->ratio))) ||
!     (context->dim != dimNum &&
!      ((range > context->range &&
!        non_negative(overlap) <= non_negative(context->overlap)) ||
!       non_negative(overlap) < non_negative(context->overlap)))
!    )
! {
Why are negative overlaps handled differently across dimensions and within
the same dimension? Your considerSplit algorithm in the SYRCoSE 2011 paper
doesn't seem to make that distinction.
Yes, I've changed this behaviour after experiments on real-life datasets. On
datasets where the data don't overlap themselves (points are always such
datasets), non-overlapping splits are frequently possible. In this case
splits tend to be along one dimension, because the most distant non-overlapping
splits (i.e. those having the lowest negative overlap) appear to be in the same
dimension as the previous split. Therefore MBRs become very elongated
along the other dimension, which is bad for search. In order to prevent this
behaviour I've made the transition to another dimension depend on the
non-negative part of overlap and on range. Using range as the split criterion
makes MBRs more square, and using the non-negative part of overlap as the
priority criterion gives range a chance to matter.
Ok. Could you phrase that as a code comment?
Here's a version of the patch I've been working on. There are no
functional changes, just a lot of moving things around, comment changes,
etc. to hopefully make it more readable.
--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com
diff --git a/src/backend/access/gist/gistproc.c b/src/backend/access/gist/gistproc.c
index 43c4b12..af1a42d 100644
--- a/src/backend/access/gist/gistproc.c
+++ b/src/backend/access/gist/gistproc.c
@@ -26,6 +26,10 @@ static bool gist_box_leaf_consistent(BOX *key, BOX *query,
static double size_box(Datum dbox);
static bool rtree_internal_consistent(BOX *key, BOX *query,
StrategyNumber strategy);
+static BOX *empty_box(void);
+
+/* Minimum acceptable split ratio (smaller group's share of all entries) */
+#define LIMIT_RATIO 0.3
/**************************************************
@@ -49,30 +53,6 @@ rt_box_union(PG_FUNCTION_ARGS)
PG_RETURN_BOX_P(n);
}
-static Datum
-rt_box_inter(PG_FUNCTION_ARGS)
-{
- BOX *a = PG_GETARG_BOX_P(0);
- BOX *b = PG_GETARG_BOX_P(1);
- BOX *n;
-
- n = (BOX *) palloc(sizeof(BOX));
-
- n->high.x = Min(a->high.x, b->high.x);
- n->high.y = Min(a->high.y, b->high.y);
- n->low.x = Max(a->low.x, b->low.x);
- n->low.y = Max(a->low.y, b->low.y);
-
- if (n->high.x < n->low.x || n->high.y < n->low.y)
- {
- pfree(n);
- /* Indicate "no intersection" by returning NULL pointer */
- n = NULL;
- }
-
- PG_RETURN_BOX_P(n);
-}
-
/*
* The GiST Consistent method for boxes
*
@@ -194,86 +174,6 @@ gist_box_penalty(PG_FUNCTION_ARGS)
PG_RETURN_POINTER(result);
}
-static void
-chooseLR(GIST_SPLITVEC *v,
- OffsetNumber *list1, int nlist1, BOX *union1,
- OffsetNumber *list2, int nlist2, BOX *union2)
-{
- bool firstToLeft = true;
-
- if (v->spl_ldatum_exists || v->spl_rdatum_exists)
- {
- if (v->spl_ldatum_exists && v->spl_rdatum_exists)
- {
- BOX LRl = *union1,
- LRr = *union2;
- BOX RLl = *union2,
- RLr = *union1;
- double sizeLR,
- sizeRL;
-
- adjustBox(&LRl, DatumGetBoxP(v->spl_ldatum));
- adjustBox(&LRr, DatumGetBoxP(v->spl_rdatum));
- adjustBox(&RLl, DatumGetBoxP(v->spl_ldatum));
- adjustBox(&RLr, DatumGetBoxP(v->spl_rdatum));
-
- sizeLR = size_box(DirectFunctionCall2(rt_box_inter, BoxPGetDatum(&LRl), BoxPGetDatum(&LRr)));
- sizeRL = size_box(DirectFunctionCall2(rt_box_inter, BoxPGetDatum(&RLl), BoxPGetDatum(&RLr)));
-
- if (sizeLR > sizeRL)
- firstToLeft = false;
-
- }
- else
- {
- float p1,
- p2;
- GISTENTRY oldUnion,
- addon;
-
- gistentryinit(oldUnion, (v->spl_ldatum_exists) ? v->spl_ldatum : v->spl_rdatum,
- NULL, NULL, InvalidOffsetNumber, FALSE);
-
- gistentryinit(addon, BoxPGetDatum(union1), NULL, NULL, InvalidOffsetNumber, FALSE);
- DirectFunctionCall3(gist_box_penalty, PointerGetDatum(&oldUnion), PointerGetDatum(&addon), PointerGetDatum(&p1));
- gistentryinit(addon, BoxPGetDatum(union2), NULL, NULL, InvalidOffsetNumber, FALSE);
- DirectFunctionCall3(gist_box_penalty, PointerGetDatum(&oldUnion), PointerGetDatum(&addon), PointerGetDatum(&p2));
-
- if ((v->spl_ldatum_exists && p1 > p2) || (v->spl_rdatum_exists && p1 < p2))
- firstToLeft = false;
- }
- }
-
- if (firstToLeft)
- {
- v->spl_left = list1;
- v->spl_right = list2;
- v->spl_nleft = nlist1;
- v->spl_nright = nlist2;
- if (v->spl_ldatum_exists)
- adjustBox(union1, DatumGetBoxP(v->spl_ldatum));
- v->spl_ldatum = BoxPGetDatum(union1);
- if (v->spl_rdatum_exists)
- adjustBox(union2, DatumGetBoxP(v->spl_rdatum));
- v->spl_rdatum = BoxPGetDatum(union2);
- }
- else
- {
- v->spl_left = list2;
- v->spl_right = list1;
- v->spl_nleft = nlist2;
- v->spl_nright = nlist1;
- if (v->spl_ldatum_exists)
- adjustBox(union2, DatumGetBoxP(v->spl_ldatum));
- v->spl_ldatum = BoxPGetDatum(union2);
- if (v->spl_rdatum_exists)
- adjustBox(union1, DatumGetBoxP(v->spl_rdatum));
- v->spl_rdatum = BoxPGetDatum(union1);
- }
-
- v->spl_ldatum_exists = v->spl_rdatum_exists = false;
-}
-
/*
* Trivial split: half of entries will be placed on one page
* and another half - to another
@@ -338,199 +238,576 @@ fallbackSplit(GistEntryVector *entryvec, GIST_SPLITVEC *v)
}
/*
- * The GiST PickSplit method
+ * Describes a "common entry": an entry that can be placed in either group
+ * without affecting the overlap along the selected axis.
+ */
+typedef struct
+{
+ /* Offset of the entry in the input entry vector */
+ int index;
+ /* Absolute difference between the penalties of adding it to each group */
+ double delta;
+} CommonEntry;
+
+/*
+ * Context for g_box_consider_split. Holds general information about the
+ * entries being split plus a description of the currently selected split.
+ */
+typedef struct
+{
+ /* number of entries being split */
+ int entriesCount;
+ /* minimum bounding box across all entries */
+ BOX boundingBox;
+
+ /* Information about currently selected split follows */
+
+ bool first; /* true if no split was selected yet */
+
+ double leftUpper; /* upper bound of left interval */
+ double rightLower; /* lower bound of right interval */
+
+ float4 ratio; /* smaller group's size / total number of entries */
+ float4 overlap; /* (leftUpper - rightLower) / range; may be negative */
+ int dim; /* axis of this split: 0 = x, 1 = y */
+ double range; /* extent of the overall bounding box along that axis */
+} ConsiderSplitContext;
+
+/*
+ * Projection of a box onto one axis, stored as a [lower, upper] interval.
+ */
+typedef struct
+{
+ double lower,
+ upper;
+} SplitInterval;
+
+/*
+ * qsort comparator: orders SplitIntervals by ascending lower bound.
+ */
+static int
+interval_cmp_lower(const void *i1, const void *i2)
+{
+ double lower1 = ((SplitInterval *) i1)->lower,
+ lower2 = ((SplitInterval *) i2)->lower;
+
+ if (lower1 < lower2)
+ return -1;
+ else if (lower1 > lower2)
+ return 1;
+ else
+ return 0;
+}
+
+/*
+ * qsort comparator: orders SplitIntervals by ascending upper bound.
+ */
+static int
+interval_cmp_upper(const void *i1, const void *i2)
+{
+ double upper1 = ((SplitInterval *) i1)->upper,
+ upper2 = ((SplitInterval *) i2)->upper;
+
+ if (upper1 < upper2)
+ return -1;
+ else if (upper1 > upper2)
+ return 1;
+ else
+ return 0;
+}
+
+/*
+ * Clamp at zero: return val if it is >= 0, otherwise return zero.
+ */
+static inline float
+non_negative(float val)
+{
+ if (val >= 0.0f)
+ return val;
+ else
+ return 0.0f;
+}
+
+/*
+ * Make the given split the selected one in *context if it beats the current.
+ */
+static void inline
+g_box_consider_split(ConsiderSplitContext *context, int dimNum,
+ double rightLower, int minLeftCount,
+ double leftUpper, int maxLeftCount)
+{
+ int leftCount,
+ rightCount;
+ float4 ratio,
+ overlap;
+ double range;
+
+ /*
+ * Calculate the entry distribution ratio, assuming the most uniform
+ * distribution of common entries.
+ */
+ if (minLeftCount >= (context->entriesCount + 1) / 2)
+ {
+ leftCount = minLeftCount;
+ }
+ else
+ {
+ if (maxLeftCount <= context->entriesCount / 2)
+ leftCount = maxLeftCount;
+ else
+ leftCount = context->entriesCount / 2;
+ }
+ rightCount = context->entriesCount - leftCount;
+
+ /*
+ * Ratio of the split: the size of the smaller group divided by the total
+ * number of entries.
+ */
+ ratio = ((float4) Min(leftCount, rightCount)) /
+ ((float4) context->entriesCount);
+
+ if (ratio > LIMIT_RATIO)
+ {
+ bool selectthis = false;
+
+ /*
+ * The ratio is acceptable, so compare current split with previously
+ * selected one. Within one dimension we prefer the smaller overlap
+ * (negative values allowed) and, for equal overlaps, the larger ratio.
+ * We switch dimension on a smaller non-negative overlap, or on a
+ * larger range when the non-negative overlap is not larger.
+ */
+ if (dimNum == 0)
+ range = context->boundingBox.high.x - context->boundingBox.low.x;
+ else
+ range = context->boundingBox.high.y - context->boundingBox.low.y;
+
+ overlap = (leftUpper - rightLower) / range;
+
+ /* If there is no previous selection, select this */
+ if (context->first)
+ selectthis = true;
+ else if (context->dim == dimNum)
+ {
+ /*
+ * Within the same dimension, choose the new split if it has a
+ * smaller overlap, or the same overlap but a larger ratio.
+ */
+ if (overlap < context->overlap ||
+ (overlap == context->overlap && ratio > context->ratio))
+ selectthis = true;
+ }
+ else
+ {
+ /*
+ * Across dimensions, compare overlaps clamped at zero, so gaps of
+ * any size tie and the wider axis wins; otherwise non-overlapping
+ * data keeps splitting along one axis, giving elongated boxes.
+ */
+ if (non_negative(overlap) < non_negative(context->overlap) ||
+ (range > context->range &&
+ non_negative(overlap) <= non_negative(context->overlap)))
+ selectthis = true;
+ }
+
+ if (selectthis)
+ {
+ /* save information about selected split */
+ context->first = false;
+ context->ratio = ratio;
+ context->range = range;
+ context->overlap = overlap;
+ context->rightLower = rightLower;
+ context->leftUpper = leftUpper;
+ context->dim = dimNum;
+ }
+ }
+}
+
+/*
+ * Allocate (with palloc) a new BOX whose four coordinates are all zero.
+ */
+static BOX *
+empty_box(void)
+{
+ BOX *result;
+
+ result = (BOX *) palloc(sizeof(BOX));
+ result->low.x = 0.0f;
+ result->low.y = 0.0f;
+ result->high.x = 0.0f;
+ result->high.y = 0.0f;
+ return result;
+}
+
+/*
+ * Return how much the area of 'original' grows when extended to cover 'new'.
+ */
+static double
+box_penalty(BOX *original, BOX *new)
+{
+ double union_width,
+ union_height;
+
+ union_width = Max(original->high.x, new->high.x) -
+ Min(original->low.x, new->low.x);
+ union_height = Max(original->high.y, new->high.y) -
+ Min(original->low.y, new->low.y);
+ return union_width * union_height - (original->high.x - original->low.x) *
+ (original->high.y - original->low.y);
+}
+
+/*
+ * qsort comparator: orders CommonEntry items by ascending delta.
+ */
+static int
+common_entry_cmp(const void *i1, const void *i2)
+{
+ double delta1 = ((CommonEntry *) i1)->delta,
+ delta2 = ((CommonEntry *) i2)->delta;
+
+ if (delta1 < delta2)
+ return -1;
+ else if (delta1 > delta2)
+ return 1;
+ else
+ return 0;
+}
+
+/*
+ * --------------------------------------------------------------------------
+ * Double sorting split algorithm. Finds split of boxes by considering splits
+ * along each axis. The search along an axis is based on the notions of
+ * split pair and corner split pair.
 *
- * New linear algorithm, see 'New Linear Node Splitting Algorithm for R-tree',
- * C.H.Ang and T.C.Tan
+ * Split pair is a pair <a, b> where 'a' is upper bound of left group and 'b'
+ * is lower bound of right group. Lower bound of left group and upper bound of
+ * right group are assumed to be the general bounds along axis.
 *
- * This is used for both boxes and points.
+ * Corner split pair is a split pair <a, b> where 'a' is upper bound of some
+ * interval, 'b' is lower bound of some interval, and 'a' can't be lowered or
+ * 'b' can't be raised while preserving the split pair property.
+ *
+ * Split along axis divides entries to the following groups:
+ * 1) Entries which should be placed to the left group
+ * 2) Entries which should be placed to the right group
+ * 3) "Common entries" which can be placed in either group without affecting
+ * the overlap along the selected axis.
+ *
+ * Split along axis is selected by overlap along that axis and some other
+ * criteria (see g_box_consider_split). When split along axis is selected,
+ * "common entries" are distributed by their penalties against the group boxes.
+ *
+ * For details see:
+ * "A new double sorting-based node splitting algorithm for R-tree", A. Korotkov
+ * http://syrcose.ispras.ru/2011/files/SYRCoSE2011_Proceedings.pdf#page=36
+ * --------------------------------------------------------------------------
 */
Datum
gist_box_picksplit(PG_FUNCTION_ARGS)
{
GistEntryVector *entryvec = (GistEntryVector *) PG_GETARG_POINTER(0);
GIST_SPLITVEC *v = (GIST_SPLITVEC *) PG_GETARG_POINTER(1);
- OffsetNumber i;
- OffsetNumber *listL,
- *listR,
- *listB,
- *listT;
- BOX *unionL,
- *unionR,
- *unionB,
- *unionT;
- int posL,
- posR,
- posB,
- posT;
- BOX pageunion;
- BOX *cur;
- char direction = ' ';
- bool allisequal = true;
- OffsetNumber maxoff;
- int nbytes;
+ OffsetNumber i,
+ maxoff;
+ ConsiderSplitContext context;
+ BOX *box,
+ *leftBox,
+ *rightBox;
+ int dim,
+ commonEntriesCount,
+ nbytes;
+ SplitInterval *intervalsLower,
+ *intervalsUpper;
+ CommonEntry *commonEntries;
+ int nentries;
+
+#define PLACE_LEFT(box, off) \
+ do { \
+ if (v->spl_nleft > 0) \
+ adjustBox(leftBox, box); \
+ else \
+ *leftBox = *(box); \
+ v->spl_left[v->spl_nleft++] = off; \
+ } while(0)
+
+#define PLACE_RIGHT(box, off) \
+ do { \
+ if (v->spl_nright > 0) \
+ adjustBox(rightBox, box); \
+ else \
+ *rightBox = *(box); \
+ v->spl_right[v->spl_nright++] = off; \
+ } while(0)
+
+ memset(&context, 0, sizeof(ConsiderSplitContext));
- posL = posR = posB = posT = 0;
maxoff = entryvec->n - 1;
+ nentries = context.entriesCount = maxoff - FirstOffsetNumber + 1;
- cur = DatumGetBoxP(entryvec->vector[FirstOffsetNumber].key);
- memcpy((void *) &pageunion, (void *) cur, sizeof(BOX));
+ /* Allocate arrays for intervals along axes */
+ intervalsLower = (SplitInterval *) palloc(nentries * sizeof(SplitInterval));
+ intervalsUpper = (SplitInterval *) palloc(nentries * sizeof(SplitInterval));
- /* find MBR */
- for (i = OffsetNumberNext(FirstOffsetNumber); i <= maxoff; i = OffsetNumberNext(i))
+ /*
+ * Calculate the overall minimum bounding box over all the entries.
+ */
+ for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
{
- cur = DatumGetBoxP(entryvec->vector[i].key);
- if (allisequal && (
- pageunion.high.x != cur->high.x ||
- pageunion.high.y != cur->high.y ||
- pageunion.low.x != cur->low.x ||
- pageunion.low.y != cur->low.y
- ))
- allisequal = false;
-
- adjustBox(&pageunion, cur);
+ box = DatumGetBoxP(entryvec->vector[i].key);
+ if (i == FirstOffsetNumber)
+ context.boundingBox = *box;
+ else
+ adjustBox(&context.boundingBox, box);
}
- if (allisequal)
+ /*
+ * Iterate over both axes, searching for the optimal split.
+ */
+ context.first = true; /* nothing selected yet */
+ for (dim = 0; dim < 2; dim++)
{
+ double leftUpper,
+ rightLower;
+ int i1,
+ i2;
+
+ /* Prepare array of intervals along axis */
+ for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
+ {
+ box = DatumGetBoxP(entryvec->vector[i].key);
+ if (dim == 0)
+ {
+ intervalsLower[i - FirstOffsetNumber].lower = box->low.x;
+ intervalsLower[i - FirstOffsetNumber].upper = box->high.x;
+ }
+ else
+ {
+ intervalsLower[i - FirstOffsetNumber].lower = box->low.y;
+ intervalsLower[i - FirstOffsetNumber].upper = box->high.y;
+ }
+ }
+
/*
- * All entries are the same
+ * Make two arrays of intervals: sorted by lower bound and sorted by
+ * upper bound.
 */
+ memcpy(intervalsUpper, intervalsLower,
+ sizeof(SplitInterval) * nentries);
+ qsort(intervalsLower, nentries, sizeof(SplitInterval),
+ interval_cmp_lower);
+ qsort(intervalsUpper, nentries, sizeof(SplitInterval),
+ interval_cmp_upper);
+
+ /*
+ * Iterate over lower bound of right group finding corresponding
+ * lowest upper bound of left group.
+ */
+ i1 = 0;
+ i2 = 0;
+ rightLower = intervalsLower[i1].lower;
+ leftUpper = intervalsUpper[i2].lower;
+
+ while (true)
+ {
+ /*
+ * Find next lower bound of right group.
+ */
+ while (i1 < nentries && rightLower == intervalsLower[i1].lower)
+ {
+ leftUpper = Max(leftUpper, intervalsLower[i1].upper);
+ i1++;
+ }
+ if (i1 >= nentries)
+ break;
+ rightLower = intervalsLower[i1].lower;
+
+ /*
+ * Count the intervals that fit in the left group (upper bound <=
+ * leftUpper); this is passed below as the maximum left group size.
+ */
+ while (i2 < nentries && intervalsUpper[i2].upper <= leftUpper)
+ i2++;
+
+ /*
+ * Consider found split.
+ */
+ g_box_consider_split(&context, dim, rightLower, i1, leftUpper, i2);
+ }
+
+ /*
+ * Iterate over upper bound of left group finding corresponding
+ * highest lower bound of right group.
+ */
+ i1 = nentries - 1;
+ i2 = nentries - 1;
+ rightLower = intervalsLower[i1].upper;
+ leftUpper = intervalsUpper[i2].upper;
+
+ while (true)
+ {
+ /*
+ * Find next upper bound of left group.
+ */
+ while (i2 >= 0 && leftUpper == intervalsUpper[i2].upper)
+ {
+ rightLower = Min(rightLower, intervalsUpper[i2].lower);
+ i2--;
+ }
+ if (i2 < 0)
+ break;
+ leftUpper = intervalsUpper[i2].upper;
+
+ /*
+ * Skip the intervals that fit in the right group (lower bound >=
+ * rightLower); the i1 + 1 remaining ones are the minimum left count.
+ */
+ while (i1 >= 0 && intervalsLower[i1].lower >= rightLower)
+ i1--;
+
+ /*
+ * Consider found split.
+ */
+ g_box_consider_split(&context, dim,
+ rightLower, i1 + 1, leftUpper, i2 + 1);
+ }
+ }
+
+ /*
+ * If we failed to find any acceptable splits, use trivial split.
+ */
+ if (context.first)
+ {
fallbackSplit(entryvec, v);
PG_RETURN_POINTER(v);
}
+ /* Allocate vectors for results */
nbytes = (maxoff + 2) * sizeof(OffsetNumber);
- listL = (OffsetNumber *) palloc(nbytes);
- listR = (OffsetNumber *) palloc(nbytes);
- listB = (OffsetNumber *) palloc(nbytes);
- listT = (OffsetNumber *) palloc(nbytes);
- unionL = (BOX *) palloc(sizeof(BOX));
- unionR = (BOX *) palloc(sizeof(BOX));
- unionB = (BOX *) palloc(sizeof(BOX));
- unionT = (BOX *) palloc(sizeof(BOX));
-
-#define ADDLIST( list, unionD, pos, num ) do { \
- if ( pos ) { \
- if ( (unionD)->high.x < cur->high.x ) (unionD)->high.x = cur->high.x; \
- if ( (unionD)->low.x > cur->low.x ) (unionD)->low.x = cur->low.x; \
- if ( (unionD)->high.y < cur->high.y ) (unionD)->high.y = cur->high.y; \
- if ( (unionD)->low.y > cur->low.y ) (unionD)->low.y = cur->low.y; \
- } else { \
- memcpy( (void*)(unionD), (void*) cur, sizeof( BOX ) ); \
- } \
- (list)[pos] = num; \
- (pos)++; \
-} while(0)
+ v->spl_left = (OffsetNumber *) palloc(nbytes);
+ v->spl_right = (OffsetNumber *) palloc(nbytes);
+ v->spl_nleft = 0;
+ v->spl_nright = 0;
- for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
- {
- cur = DatumGetBoxP(entryvec->vector[i].key);
- if (cur->low.x - pageunion.low.x < pageunion.high.x - cur->high.x)
- ADDLIST(listL, unionL, posL, i);
- else
- ADDLIST(listR, unionR, posR, i);
- if (cur->low.y - pageunion.low.y < pageunion.high.y - cur->high.y)
- ADDLIST(listB, unionB, posB, i);
- else
- ADDLIST(listT, unionT, posT, i);
- }
+ /* Allocate boxes of left and right groups */
+ leftBox = empty_box();
+ rightBox = empty_box();
-#define LIMIT_RATIO 0.1
-#define _IS_BADRATIO(x,y) ( (y) == 0 || (float)(x)/(float)(y) < LIMIT_RATIO )
-#define IS_BADRATIO(x,y) ( _IS_BADRATIO((x),(y)) || _IS_BADRATIO((y),(x)) )
- /* bad disposition, try to split by centers of boxes */
- if (IS_BADRATIO(posR, posL) && IS_BADRATIO(posT, posB))
+ /*
+ * Allocate an array for "Common entries" - entries which can be placed
+ * in either group without affecting overlap along selected axis.
+ */
+ commonEntriesCount = 0;
+ commonEntries = (CommonEntry *) palloc(nentries * sizeof(CommonEntry));
+
+ /*
+ * Distribute entries which can be distributed unambiguously, and collect
+ * "common entries".
+ */
+ for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
{
- double avgCenterX = 0.0,
- avgCenterY = 0.0;
- double CenterX,
- CenterY;
+ double lower,
+ upper;
- for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
+ /*
+ * Get upper and lower bounds along selected axis.
+ */
+ box = DatumGetBoxP(entryvec->vector[i].key);
+ if (context.dim == 0)
{
- cur = DatumGetBoxP(entryvec->vector[i].key);
- avgCenterX += ((double) cur->high.x + (double) cur->low.x) / 2.0;
- avgCenterY += ((double) cur->high.y + (double) cur->low.y) / 2.0;
+ lower = box->low.x;
+ upper = box->high.x;
}
-
- avgCenterX /= maxoff;
- avgCenterY /= maxoff;
-
- posL = posR = posB = posT = 0;
- for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
+ else
{
- cur = DatumGetBoxP(entryvec->vector[i].key);
-
- CenterX = ((double) cur->high.x + (double) cur->low.x) / 2.0;
- CenterY = ((double) cur->high.y + (double) cur->low.y) / 2.0;
+ lower = box->low.y;
+ upper = box->high.y;
+ }
- if (CenterX < avgCenterX)
- ADDLIST(listL, unionL, posL, i);
- else if (CenterX == avgCenterX)
+ if (upper <= context.leftUpper)
+ {
+ /* Fits to the left group */
+ if (lower >= context.rightLower)
{
- if (posL > posR)
- ADDLIST(listR, unionR, posR, i);
- else
- ADDLIST(listL, unionL, posL, i);
+ /* Fits also to the right group, so "common entry" */
+ commonEntries[commonEntriesCount++].index = i;
}
else
- ADDLIST(listR, unionR, posR, i);
-
- if (CenterY < avgCenterY)
- ADDLIST(listB, unionB, posB, i);
- else if (CenterY == avgCenterY)
{
- if (posB > posT)
- ADDLIST(listT, unionT, posT, i);
- else
- ADDLIST(listB, unionB, posB, i);
+ /* Doesn't fit to the right group, so join to the left group */
+ PLACE_LEFT(box, i);
}
- else
- ADDLIST(listT, unionT, posT, i);
}
-
- if (IS_BADRATIO(posR, posL) && IS_BADRATIO(posT, posB))
+ else
{
- fallbackSplit(entryvec, v);
- PG_RETURN_POINTER(v);
+ /*
+ * Each entry should fit on either left or right group, so since
+ * this didn't fit on the left group, it better fit in the right
+ * group.
+ */
+ Assert(lower >= context.rightLower);
+
+ /* Doesn't fit to the left group, so join to the right group */
+ PLACE_RIGHT(box, i);
}
}
- /* which split more optimal? */
- if (Max(posL, posR) < Max(posB, posT))
- direction = 'x';
- else if (Max(posL, posR) > Max(posB, posT))
- direction = 'y';
- else
+ /*
+ * Distribute "common entries", if any
+ */
+ if (commonEntriesCount > 0)
{
- Datum interLR = DirectFunctionCall2(rt_box_inter,
- BoxPGetDatum(unionL),
- BoxPGetDatum(unionR));
- Datum interBT = DirectFunctionCall2(rt_box_inter,
- BoxPGetDatum(unionB),
- BoxPGetDatum(unionT));
- double sizeLR,
- sizeBT;
-
- sizeLR = size_box(interLR);
- sizeBT = size_box(interBT);
-
- if (sizeLR < sizeBT)
- direction = 'x';
- else
- direction = 'y';
- }
+ /*
+ * Calculate the minimum number of entries that each group must
+ * receive to reach LIMIT_RATIO.
+ */
+ int m = ceil(LIMIT_RATIO * (double) nentries);
- if (direction == 'x')
- chooseLR(v,
- listL, posL, unionL,
- listR, posR, unionR);
- else
- chooseLR(v,
- listB, posB, unionB,
- listT, posT, unionT);
+ /*
+ * For each "common entry", calculate the absolute difference between
+ * the penalties of adding it to the left and to the right group.
+ */
+ for (i = 0; i < commonEntriesCount; i++)
+ {
+ box = DatumGetBoxP(entryvec->vector[commonEntries[i].index].key);
+ commonEntries[i].delta = Abs(box_penalty(leftBox, box) -
+ box_penalty(rightBox, box));
+ }
+
+ /*
+ * Sort "common entries" by calculated deltas in order to distribute
+ * the most ambiguous entries first.
+ */
+ qsort(commonEntries, commonEntriesCount, sizeof(CommonEntry), common_entry_cmp);
+
+ /*
+ * Distribute "common entries" between groups.
+ */
+ for (i = 0; i < commonEntriesCount; i++)
+ {
+ box = DatumGetBoxP(entryvec->vector[commonEntries[i].index].key);
+
+ /*
+ * Check whether this entry has to go to a particular group for
+ * that group to reach the minimum size m.
+ */
+ if (v->spl_nleft + (commonEntriesCount - i) <= m)
+ PLACE_LEFT(box, commonEntries[i].index);
+ else if (v->spl_nright + (commonEntriesCount - i) <= m)
+ PLACE_RIGHT(box, commonEntries[i].index);
+ else
+ {
+ /* Otherwise select the group by minimal penalty */
+ if (box_penalty(leftBox, box) < box_penalty(rightBox, box))
+ PLACE_LEFT(box, commonEntries[i].index);
+ else
+ PLACE_RIGHT(box, commonEntries[i].index);
+ }
+ }
+ }
+ v->spl_ldatum = PointerGetDatum(leftBox);
+ v->spl_rdatum = PointerGetDatum(rightBox);
PG_RETURN_POINTER(v);
}
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers