On Tue, Oct 4, 2011 at 1:46 PM, Heikki Linnakangas < heikki.linnakan...@enterprisedb.com> wrote:
> Ok. Could you phrase that as a code comment?
>
> Here's a version of the patch I've been working on. There are no functional
> changes, just a lot of moving things around, comment changes, etc. to
> hopefully make it more readable.

Thanks for your work on this patch. Patch with comment is attached.

------
With best regards,
Alexander Korotkov.
*** a/src/backend/access/gist/gistproc.c --- b/src/backend/access/gist/gistproc.c *************** *** 26,31 **** static bool gist_box_leaf_consistent(BOX *key, BOX *query, --- 26,35 ---- static double size_box(Datum dbox); static bool rtree_internal_consistent(BOX *key, BOX *query, StrategyNumber strategy); + static BOX *empty_box(void); + + /* Minimal possible ratio of split */ + #define LIMIT_RATIO 0.3 /************************************************** *************** *** 49,78 **** rt_box_union(PG_FUNCTION_ARGS) PG_RETURN_BOX_P(n); } - static Datum - rt_box_inter(PG_FUNCTION_ARGS) - { - BOX *a = PG_GETARG_BOX_P(0); - BOX *b = PG_GETARG_BOX_P(1); - BOX *n; - - n = (BOX *) palloc(sizeof(BOX)); - - n->high.x = Min(a->high.x, b->high.x); - n->high.y = Min(a->high.y, b->high.y); - n->low.x = Max(a->low.x, b->low.x); - n->low.y = Max(a->low.y, b->low.y); - - if (n->high.x < n->low.x || n->high.y < n->low.y) - { - pfree(n); - /* Indicate "no intersection" by returning NULL pointer */ - n = NULL; - } - - PG_RETURN_BOX_P(n); - } - /* * The GiST Consistent method for boxes * --- 53,58 ---- *************** *** 194,279 **** gist_box_penalty(PG_FUNCTION_ARGS) PG_RETURN_POINTER(result); } - static void - chooseLR(GIST_SPLITVEC *v, - OffsetNumber *list1, int nlist1, BOX *union1, - OffsetNumber *list2, int nlist2, BOX *union2) - { - bool firstToLeft = true; - - if (v->spl_ldatum_exists || v->spl_rdatum_exists) - { - if (v->spl_ldatum_exists && v->spl_rdatum_exists) - { - BOX LRl = *union1, - LRr = *union2; - BOX RLl = *union2, - RLr = *union1; - double sizeLR, - sizeRL; - - adjustBox(&LRl, DatumGetBoxP(v->spl_ldatum)); - adjustBox(&LRr, DatumGetBoxP(v->spl_rdatum)); - adjustBox(&RLl, DatumGetBoxP(v->spl_ldatum)); - adjustBox(&RLr, DatumGetBoxP(v->spl_rdatum)); - - sizeLR = size_box(DirectFunctionCall2(rt_box_inter, BoxPGetDatum(&LRl), BoxPGetDatum(&LRr))); - sizeRL = size_box(DirectFunctionCall2(rt_box_inter, BoxPGetDatum(&RLl), BoxPGetDatum(&RLr))); - - if (sizeLR > sizeRL) - 
firstToLeft = false; - - } - else - { - float p1, - p2; - GISTENTRY oldUnion, - addon; - - gistentryinit(oldUnion, (v->spl_ldatum_exists) ? v->spl_ldatum : v->spl_rdatum, - NULL, NULL, InvalidOffsetNumber, FALSE); - - gistentryinit(addon, BoxPGetDatum(union1), NULL, NULL, InvalidOffsetNumber, FALSE); - DirectFunctionCall3(gist_box_penalty, PointerGetDatum(&oldUnion), PointerGetDatum(&addon), PointerGetDatum(&p1)); - gistentryinit(addon, BoxPGetDatum(union2), NULL, NULL, InvalidOffsetNumber, FALSE); - DirectFunctionCall3(gist_box_penalty, PointerGetDatum(&oldUnion), PointerGetDatum(&addon), PointerGetDatum(&p2)); - - if ((v->spl_ldatum_exists && p1 > p2) || (v->spl_rdatum_exists && p1 < p2)) - firstToLeft = false; - } - } - - if (firstToLeft) - { - v->spl_left = list1; - v->spl_right = list2; - v->spl_nleft = nlist1; - v->spl_nright = nlist2; - if (v->spl_ldatum_exists) - adjustBox(union1, DatumGetBoxP(v->spl_ldatum)); - v->spl_ldatum = BoxPGetDatum(union1); - if (v->spl_rdatum_exists) - adjustBox(union2, DatumGetBoxP(v->spl_rdatum)); - v->spl_rdatum = BoxPGetDatum(union2); - } - else - { - v->spl_left = list2; - v->spl_right = list1; - v->spl_nleft = nlist2; - v->spl_nright = nlist1; - if (v->spl_ldatum_exists) - adjustBox(union2, DatumGetBoxP(v->spl_ldatum)); - v->spl_ldatum = BoxPGetDatum(union2); - if (v->spl_rdatum_exists) - adjustBox(union1, DatumGetBoxP(v->spl_rdatum)); - v->spl_rdatum = BoxPGetDatum(union1); - } - - v->spl_ldatum_exists = v->spl_rdatum_exists = false; - } - /* * Trivial split: half of entries will be placed on one page * and another half - to another --- 174,179 ---- *************** *** 338,536 **** fallbackSplit(GistEntryVector *entryvec, GIST_SPLITVEC *v) } /* ! * The GiST PickSplit method * ! * New linear algorithm, see 'New Linear Node Splitting Algorithm for R-tree', ! * C.H.Ang and T.C.Tan * ! * This is used for both boxes and points. 
*/ Datum gist_box_picksplit(PG_FUNCTION_ARGS) { GistEntryVector *entryvec = (GistEntryVector *) PG_GETARG_POINTER(0); GIST_SPLITVEC *v = (GIST_SPLITVEC *) PG_GETARG_POINTER(1); ! OffsetNumber i; ! OffsetNumber *listL, ! *listR, ! *listB, ! *listT; ! BOX *unionL, ! *unionR, ! *unionB, ! *unionT; ! int posL, ! posR, ! posB, ! posT; ! BOX pageunion; ! BOX *cur; ! char direction = ' '; ! bool allisequal = true; ! OffsetNumber maxoff; ! int nbytes; - posL = posR = posB = posT = 0; maxoff = entryvec->n - 1; ! cur = DatumGetBoxP(entryvec->vector[FirstOffsetNumber].key); ! memcpy((void *) &pageunion, (void *) cur, sizeof(BOX)); ! /* find MBR */ ! for (i = OffsetNumberNext(FirstOffsetNumber); i <= maxoff; i = OffsetNumberNext(i)) { ! cur = DatumGetBoxP(entryvec->vector[i].key); ! if (allisequal && ( ! pageunion.high.x != cur->high.x || ! pageunion.high.y != cur->high.y || ! pageunion.low.x != cur->low.x || ! pageunion.low.y != cur->low.y ! )) ! allisequal = false; ! ! adjustBox(&pageunion, cur); } ! if (allisequal) { /* ! * All entries are the same */ fallbackSplit(entryvec, v); PG_RETURN_POINTER(v); } nbytes = (maxoff + 2) * sizeof(OffsetNumber); ! listL = (OffsetNumber *) palloc(nbytes); ! listR = (OffsetNumber *) palloc(nbytes); ! listB = (OffsetNumber *) palloc(nbytes); ! listT = (OffsetNumber *) palloc(nbytes); ! unionL = (BOX *) palloc(sizeof(BOX)); ! unionR = (BOX *) palloc(sizeof(BOX)); ! unionB = (BOX *) palloc(sizeof(BOX)); ! unionT = (BOX *) palloc(sizeof(BOX)); ! ! #define ADDLIST( list, unionD, pos, num ) do { \ ! if ( pos ) { \ ! if ( (unionD)->high.x < cur->high.x ) (unionD)->high.x = cur->high.x; \ ! if ( (unionD)->low.x > cur->low.x ) (unionD)->low.x = cur->low.x; \ ! if ( (unionD)->high.y < cur->high.y ) (unionD)->high.y = cur->high.y; \ ! if ( (unionD)->low.y > cur->low.y ) (unionD)->low.y = cur->low.y; \ ! } else { \ ! memcpy( (void*)(unionD), (void*) cur, sizeof( BOX ) ); \ ! } \ ! (list)[pos] = num; \ ! (pos)++; \ ! } while(0) ! 
for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) ! { ! cur = DatumGetBoxP(entryvec->vector[i].key); ! if (cur->low.x - pageunion.low.x < pageunion.high.x - cur->high.x) ! ADDLIST(listL, unionL, posL, i); ! else ! ADDLIST(listR, unionR, posR, i); ! if (cur->low.y - pageunion.low.y < pageunion.high.y - cur->high.y) ! ADDLIST(listB, unionB, posB, i); ! else ! ADDLIST(listT, unionT, posT, i); ! } ! #define LIMIT_RATIO 0.1 ! #define _IS_BADRATIO(x,y) ( (y) == 0 || (float)(x)/(float)(y) < LIMIT_RATIO ) ! #define IS_BADRATIO(x,y) ( _IS_BADRATIO((x),(y)) || _IS_BADRATIO((y),(x)) ) ! /* bad disposition, try to split by centers of boxes */ ! if (IS_BADRATIO(posR, posL) && IS_BADRATIO(posT, posB)) { ! double avgCenterX = 0.0, ! avgCenterY = 0.0; ! double CenterX, ! CenterY; ! for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) { ! cur = DatumGetBoxP(entryvec->vector[i].key); ! avgCenterX += ((double) cur->high.x + (double) cur->low.x) / 2.0; ! avgCenterY += ((double) cur->high.y + (double) cur->low.y) / 2.0; } ! ! avgCenterX /= maxoff; ! avgCenterY /= maxoff; ! ! posL = posR = posB = posT = 0; ! for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) { ! cur = DatumGetBoxP(entryvec->vector[i].key); ! ! CenterX = ((double) cur->high.x + (double) cur->low.x) / 2.0; ! CenterY = ((double) cur->high.y + (double) cur->low.y) / 2.0; ! if (CenterX < avgCenterX) ! ADDLIST(listL, unionL, posL, i); ! else if (CenterX == avgCenterX) { ! if (posL > posR) ! ADDLIST(listR, unionR, posR, i); ! else ! ADDLIST(listL, unionL, posL, i); } else - ADDLIST(listR, unionR, posR, i); - - if (CenterY < avgCenterY) - ADDLIST(listB, unionB, posB, i); - else if (CenterY == avgCenterY) { ! if (posB > posT) ! ADDLIST(listT, unionT, posT, i); ! else ! ADDLIST(listB, unionB, posB, i); } - else - ADDLIST(listT, unionT, posT, i); } ! ! if (IS_BADRATIO(posR, posL) && IS_BADRATIO(posT, posB)) { ! fallbackSplit(entryvec, v); ! PG_RETURN_POINTER(v); } } ! 
/* which split more optimal? */ ! if (Max(posL, posR) < Max(posB, posT)) ! direction = 'x'; ! else if (Max(posL, posR) > Max(posB, posT)) ! direction = 'y'; ! else { ! Datum interLR = DirectFunctionCall2(rt_box_inter, ! BoxPGetDatum(unionL), ! BoxPGetDatum(unionR)); ! Datum interBT = DirectFunctionCall2(rt_box_inter, ! BoxPGetDatum(unionB), ! BoxPGetDatum(unionT)); ! double sizeLR, ! sizeBT; ! ! sizeLR = size_box(interLR); ! sizeBT = size_box(interBT); ! ! if (sizeLR < sizeBT) ! direction = 'x'; ! else ! direction = 'y'; ! } ! if (direction == 'x') ! chooseLR(v, ! listL, posL, unionL, ! listR, posR, unionR); ! else ! chooseLR(v, ! listB, posB, unionB, ! listT, posT, unionT); PG_RETURN_POINTER(v); } --- 238,825 ---- } /* ! * Represents information about entry which can be present to any group without ! * affecting overlap over selected axis ("common entry"). ! */ ! typedef struct ! { ! /* Index of entry in the initial array */ ! int index; ! /* Delta between penalties of entry insertion into different groups */ ! double delta; ! } CommonEntry; ! ! /* ! * Context for g_box_consider_split. Contains information about currently ! * selected split and some general information. ! */ ! typedef struct ! { ! /* number of splitting entries */ ! int entriesCount; ! /* minimum bounding box across all entries */ ! BOX boundingBox; ! ! /* Information about currently selected split follows */ ! ! bool first; /* true if no split was selected yet */ ! ! double leftUpper; /* upper bound of left interval */ ! double rightLower; /* lower bound of right interval */ ! ! float4 ratio; ! float4 overlap; ! int dim; /* axis of this split */ ! double range; /* width of general MBR projection to the selected axis */ ! } ConsiderSplitContext; ! ! /* ! * Interval represents projection of box to axis. ! */ ! typedef struct ! { ! double lower, ! upper; ! } SplitInterval; ! ! /* ! * Interval comparison function by lower bound of the interval; ! */ ! static int ! 
interval_cmp_lower(const void *i1, const void *i2) ! { ! double lower1 = ((SplitInterval *) i1)->lower, ! lower2 = ((SplitInterval *) i2)->lower; ! ! if (lower1 < lower2) ! return -1; ! else if (lower1 > lower2) ! return 1; ! else ! return 0; ! } ! ! /* ! * Interval comparison function by upper bound of the interval; ! */ ! static int ! interval_cmp_upper(const void *i1, const void *i2) ! { ! double upper1 = ((SplitInterval *) i1)->upper, ! upper2 = ((SplitInterval *) i2)->upper; ! ! if (upper1 < upper2) ! return -1; ! else if (upper1 > upper2) ! return 1; ! else ! return 0; ! } ! ! /* ! * Replace negative value with zero. ! */ ! static inline float ! non_negative(float val) ! { ! if (val >= 0.0f) ! return val; ! else ! return 0.0f; ! } ! ! /* ! * Consider replacement of currently selected split with the better one. ! */ ! static void inline ! g_box_consider_split(ConsiderSplitContext *context, int dimNum, ! double rightLower, int minLeftCount, ! double leftUpper, int maxLeftCount) ! { ! int leftCount, ! rightCount; ! float4 ratio, ! overlap; ! double range; ! ! /* ! * Calculate entries distribution ratio assuming most uniform ! * distribution of common entries. ! */ ! if (minLeftCount >= (context->entriesCount + 1) / 2) ! { ! leftCount = minLeftCount; ! } ! else ! { ! if (maxLeftCount <= context->entriesCount / 2) ! leftCount = maxLeftCount; ! else ! leftCount = context->entriesCount / 2; ! } ! rightCount = context->entriesCount - leftCount; ! ! /* ! * Ratio of split - quotient between size of lesser group and total entries ! * count. ! */ ! ratio = ((float4) Min(leftCount, rightCount)) / ! ((float4) context->entriesCount); ! ! if (ratio > LIMIT_RATIO) ! { ! bool selectthis = false; ! ! /* ! * The ratio is acceptable, so compare current split with previously ! * selected one. Between splits of one dimension we search for ! * minimal overlap (allowing negative values) and maximal ratio ! * (between same overlaps). We switch dimension if we find less overlap !
* (non-negative) or less range with same overlap. ! */ ! if (dimNum == 0) ! range = context->boundingBox.high.x - context->boundingBox.low.x; ! else ! range = context->boundingBox.high.y - context->boundingBox.low.y; ! ! overlap = (leftUpper - rightLower) / range; ! ! /* If there is no previous selection, select this */ ! if (context->first) ! selectthis = true; ! else if (context->dim == dimNum) ! { ! /* ! * Within the same dimension, choose the new split if it has a ! * smaller overlap, or same overlap but better ratio. ! */ ! if (overlap < context->overlap || ! (overlap == context->overlap && ratio > context->ratio)) ! selectthis = true; ! } ! else ! { ! /* ! * Across dimensions, choose the new split if it has a ! * smaller *non-negative* overlap, or same *non-negative* overlap ! * but bigger range. This condition differs from described in the ! * article. On the datasets where leaf MBRs don't overlap ! * themselves, non-overlapping splits (i.e. splits which have zero ! * *non-negative* overlap) are frequently possible. In this case ! * splits tends to be along one dimension, because most distant ! * non-overlapping splits (i.e. having lowest negative overlap) ! * appears to be in the same dimension as in the previous split. ! * Therefore MBRs appear to be very prolonged along another ! * dimension, that leads to bad search performance. Using range ! * as the second split criteria makes MBRs more quadratic. Using ! * *non-negative* overlap instead of overlap as the first split ! * criteria gives to range criteria chance to matter, because ! * non-overlapping splits are equivalent in this criteria. ! */ ! if (non_negative(overlap) < non_negative(context->overlap) || ! (range > context->range && ! non_negative(overlap) <= non_negative(context->overlap))) ! selectthis = true; ! } ! ! if (selectthis) ! { ! /* save information about selected split */ ! context->first = false; ! context->ratio = ratio; ! context->range = range; ! context->overlap = overlap; ! 
context->rightLower = rightLower; ! context->leftUpper = leftUpper; ! context->dim = dimNum; ! } ! } ! } ! ! /* ! * Create new empty BOX ! */ ! static BOX * ! empty_box(void) ! { ! BOX *result; ! ! result = (BOX *) palloc(sizeof(BOX)); ! result->low.x = 0.0f; ! result->low.y = 0.0f; ! result->high.x = 0.0f; ! result->high.y = 0.0f; ! return result; ! } ! ! /* ! * Return increase of original BOX area by new BOX area insertion. ! */ ! static double ! box_penalty(BOX *original, BOX *new) ! { ! double union_width, ! union_height; ! ! union_width = Max(original->high.x, new->high.x) - ! Min(original->low.x, new->low.x); ! union_height = Max(original->high.y, new->high.y) - ! Min(original->low.y, new->low.y); ! return union_width * union_height - (original->high.x - original->low.x) * ! (original->high.y - original->low.y); ! } ! ! /* ! * Compare common entries by their deltas. ! */ ! static int ! common_entry_cmp(const void *i1, const void *i2) ! { ! double delta1 = ((CommonEntry *) i1)->delta, ! delta2 = ((CommonEntry *) i2)->delta; ! ! if (delta1 < delta2) ! return -1; ! else if (delta1 > delta2) ! return 1; ! else ! return 0; ! } ! ! /* ! * -------------------------------------------------------------------------- ! * Double sorting split algorithm. Finds split of boxes by considering splits ! * along each axis. Searching splits along axes are based on the notions of ! * split pair and corner split pair. * ! * Split pair is a pair <a, b> where 'a' is upper bound of left group and 'b' ! * is lower bound of right group. Lower bound of left group and upper bound of ! * right group are assumed to be the general bounds along axis. * ! * Corner split pair is a split pair <a, b> where 'a' is upper bound of some ! * interval, 'b' is lower bound of some, 'a' can't be lower or 'b' can't be ! * higher with split pair property preservation. ! * ! * Split along axis divides entries to the following groups: ! * 1) Entries which should be placed to the left group !
* 2) Entries which should be placed to the right group ! * 3) "Common entries" which can be placed to any of groups without affecting ! * of overlap along selected axis. ! * ! * Split along axis is selected by overlap along that axis and some other ! * criteria (see g_box_consider_split). When split along axis is selected, ! * "common entries" are distributed by penalty with group boxes. ! * ! * For details see: ! * "A new double sorting-based node splitting algorithm for R-tree", A. Korotkov ! * http://syrcose.ispras.ru/2011/files/SYRCoSE2011_Proceedings.pdf#page=36 ! * -------------------------------------------------------------------------- */ Datum gist_box_picksplit(PG_FUNCTION_ARGS) { GistEntryVector *entryvec = (GistEntryVector *) PG_GETARG_POINTER(0); GIST_SPLITVEC *v = (GIST_SPLITVEC *) PG_GETARG_POINTER(1); ! OffsetNumber i, ! maxoff; ! ConsiderSplitContext context; ! BOX *box, ! *leftBox, ! *rightBox; ! int dim, ! commonEntriesCount, ! nbytes; ! SplitInterval *intervalsLower, ! *intervalsUpper; ! CommonEntry *commonEntries; ! int nentries; ! ! #define PLACE_LEFT(box, off) \ ! do { \ ! if (v->spl_nleft > 0) \ ! adjustBox(leftBox, box); \ ! else \ ! *leftBox = *(box); \ ! v->spl_left[v->spl_nleft++] = off; \ ! } while(0) ! ! #define PLACE_RIGHT(box, off) \ ! do { \ ! if (v->spl_nright > 0) \ ! adjustBox(rightBox, box); \ ! else \ ! *rightBox = *(box); \ ! v->spl_right[v->spl_nright++] = off; \ ! } while(0) ! ! memset(&context, 0, sizeof(ConsiderSplitContext)); maxoff = entryvec->n - 1; + nentries = context.entriesCount = maxoff - FirstOffsetNumber + 1; ! /* Allocate arrays for intervals along axes */ ! intervalsLower = (SplitInterval *) palloc(nentries * sizeof(SplitInterval)); ! intervalsUpper = (SplitInterval *) palloc(nentries * sizeof(SplitInterval)); ! /* ! * Calculate the overall minimum bounding box over all the entries. ! */ ! for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) { ! box = DatumGetBoxP(entryvec->vector[i].key); ! 
if (i == FirstOffsetNumber) ! context.boundingBox = *box; ! else ! adjustBox(&context.boundingBox, box); } ! /* ! * Iterate over axes for optimal split searching. ! */ ! context.first = true; /* nothing selected yet */ ! for (dim = 0; dim < 2; dim++) { + double leftUpper, + rightLower; + int i1, + i2; + + /* Prepare array of intervals along axis */ + for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) + { + box = DatumGetBoxP(entryvec->vector[i].key); + if (dim == 0) + { + intervalsLower[i - FirstOffsetNumber].lower = box->low.x; + intervalsLower[i - FirstOffsetNumber].upper = box->high.x; + } + else + { + intervalsLower[i - FirstOffsetNumber].lower = box->low.y; + intervalsLower[i - FirstOffsetNumber].upper = box->high.y; + } + } + /* ! * Make two arrays of intervals: sorted by lower bound and sorted by ! * upper bound. */ + memcpy(intervalsUpper, intervalsLower, + sizeof(SplitInterval) * nentries); + qsort(intervalsLower, nentries, sizeof(SplitInterval), + interval_cmp_lower); + qsort(intervalsUpper, nentries, sizeof(SplitInterval), + interval_cmp_upper); + + /* + * Iterate over lower bound of right group finding corresponding + * lowest upper bound of left group. + */ + i1 = 0; + i2 = 0; + rightLower = intervalsLower[i1].lower; + leftUpper = intervalsUpper[i2].lower; + + while (true) + { + /* + * Find next lower bound of right group. + */ + while (i1 < nentries && rightLower == intervalsLower[i1].lower) + { + leftUpper = Max(leftUpper, intervalsLower[i1].upper); + i1++; + } + if (i1 >= nentries) + break; + rightLower = intervalsLower[i1].lower; + + /* + * Find count of intervals which anyway should be placed to the + * left group. + */ + while (i2 < nentries && intervalsUpper[i2].upper <= leftUpper) + i2++; + + /* + * Consider found split. + */ + g_box_consider_split(&context, dim, rightLower, i1, leftUpper, i2); + } + + /* + * Iterate over upper bound of left group finding corresponding + * highest lower bound of right group. 
+ */ + i1 = nentries - 1; + i2 = nentries - 1; + rightLower = intervalsLower[i1].upper; + leftUpper = intervalsUpper[i2].upper; + + while (true) + { + /* + * Find next upper bound of left group. + */ + while (i2 >= 0 && leftUpper == intervalsUpper[i2].upper) + { + rightLower = Min(rightLower, intervalsUpper[i2].lower); + i2--; + } + if (i2 < 0) + break; + leftUpper = intervalsUpper[i2].upper; + + /* + * Find count of intervals which anyway should be placed to the + * right group. + */ + while (i1 >= 0 && intervalsLower[i1].lower >= rightLower) + i1--; + + /* + * Consider found split. + */ + g_box_consider_split(&context, dim, + rightLower, i1 + 1, leftUpper, i2 + 1); + } + } + + /* + * If we failed to find any acceptable splits, use trivial split. + */ + if (context.first) + { fallbackSplit(entryvec, v); PG_RETURN_POINTER(v); } + /* Allocate vectors for results */ nbytes = (maxoff + 2) * sizeof(OffsetNumber); ! v->spl_left = (OffsetNumber *) palloc(nbytes); ! v->spl_right = (OffsetNumber *) palloc(nbytes); ! v->spl_nleft = 0; ! v->spl_nright = 0; ! /* Allocate boxes of left and right groups */ ! leftBox = empty_box(); ! rightBox = empty_box(); ! /* ! * Allocate an array for "Common entries" - entries which can be placed ! * to either group without affecting overlap along selected axis. ! */ ! commonEntriesCount = 0; ! commonEntries = (CommonEntry *) palloc(nentries * sizeof(CommonEntry)); ! ! /* ! * Distribute entries which can be distributed unambiguously, and collect ! * "common entries". ! */ ! for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) { ! double lower, ! upper; ! /* ! * Get upper and lower bounds along selected axis. ! */ ! box = DatumGetBoxP(entryvec->vector[i].key); ! if (context.dim == 0) { ! lower = box->low.x; ! upper = box->high.x; } ! else { ! lower = box->low.y; ! upper = box->high.y; ! } ! if (upper <= context.leftUpper) ! { ! /* Fits to the left group */ ! if (lower >= context.rightLower) { ! 
/* Fits also to the right group, so "common entry" */ ! commonEntries[commonEntriesCount++].index = i; } else { ! /* Doesn't fit to the right group, so join to the left group */ ! PLACE_LEFT(box, i); } } ! else { ! /* ! * Each entry should fit on either left or right group, so since ! * this didn't fit on the left group, it better fit in the right ! * group. ! */ ! Assert(lower >= context.rightLower); ! ! /* Doesn't fit to the left group, so join to the right group */ ! PLACE_RIGHT(box, i); } } ! /* ! * Distribute "common entries", if any ! */ ! if (commonEntriesCount > 0) { ! /* ! * Calculate minimum number of entries that must be placed in both ! * groups, to reach LIMIT_RATIO. ! */ ! int m = ceil(LIMIT_RATIO * (double) nentries); ! /* ! * Calculate delta between penalties of join "common entries" to ! * different groups. ! */ ! for (i = 0; i < commonEntriesCount; i++) ! { ! box = DatumGetBoxP(entryvec->vector[commonEntries[i].index].key); ! commonEntries[i].delta = Abs(box_penalty(leftBox, box) - ! box_penalty(rightBox, box)); ! } ! ! /* ! * Sort "common entries" by calculated deltas in order to distribute ! * the most ambiguous entries first. ! */ ! qsort(commonEntries, commonEntriesCount, sizeof(CommonEntry), common_entry_cmp); ! ! /* ! * Distribute "common entries" between groups. ! */ ! for (i = 0; i < commonEntriesCount; i++) ! { ! box = DatumGetBoxP(entryvec->vector[commonEntries[i].index].key); ! ! /* ! * Check if we have to place this entry in either group to achieve ! * LIMIT_RATIO. ! */ ! if (v->spl_nleft + (commonEntriesCount - i) <= m) ! PLACE_LEFT(box, commonEntries[i].index); ! else if (v->spl_nright + (commonEntriesCount - i) <= m) ! PLACE_RIGHT(box, commonEntries[i].index); ! else ! { ! /* Otherwise select the group by minimal penalty */ ! if (box_penalty(leftBox, box) < box_penalty(rightBox, box)) ! PLACE_LEFT(box, commonEntries[i].index); ! else ! PLACE_RIGHT(box, commonEntries[i].index); ! } ! } ! 
} + v->spl_ldatum = PointerGetDatum(leftBox); + v->spl_rdatum = PointerGetDatum(rightBox); PG_RETURN_POINTER(v); }
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers