Hi all,

A couple of days ago I've shared a WIP patch [1] implementing BRIN
indexes based on bloom filters. One inherent limitation of that approach
is that it can only support equality conditions - that's perfectly fine
in many cases (e.g. with UUIDs it's rare to use range queries, etc.).

[1]
https://www.postgresql.org/message-id/5d78b774-7e9c-c94e-12cf-fef51cc89b1a%402ndquadrant.com

But in other cases that restriction is pretty unacceptable, e.g. with
timestamps that are queried mostly using range conditions. A common
issue is that while the data is initially well correlated (giving us
nice narrow min/max ranges in the BRIN index), this degrades over time
(typically due to DELETE/UPDATE and then new rows routed to free space).
There are not many options to prevent this, and fixing it pretty much
requires CLUSTER on the table.

This patch addresses this by BRIN indexes with more complex "summary".
Instead of keeping just a single "minmax interval", we maintain a list
of "minmax intervals", which allows us to track "gaps" in the data.

To illustrate the improvement, consider this table:

    create table a (val float8) with (fillfactor = 90);
    insert into a select i::float from generate_series(1,10000000) s(i);
    update a set val = 1 where random() < 0.01;
    update a set val = 10000000 where random() < 0.01;

Which means the column 'val' is almost perfectly correlated with the
position in the table (which would be great for BRIN minmax indexes),
but then 1% of the values is set to 1 and 10.000.000. That means pretty
much every range will be [1,10000000], which makes this BRIN index
mostly useless, as illustrated by these explain plans:

    create index on a using brin (val) with (pages_per_range = 16);

    explain analyze select * from a where val = 100;
                                  QUERY PLAN
    --------------------------------------------------------------------
     Bitmap Heap Scan on a  (cost=54.01..10691.02 rows=8 width=8)
                            (actual time=5.901..785.520 rows=1 loops=1)
       Recheck Cond: (val = '100'::double precision)
       Rows Removed by Index Recheck: 9999999
       Heap Blocks: lossy=49020
       ->  Bitmap Index Scan on a_val_idx
             (cost=0.00..54.00 rows=3400 width=0)
             (actual time=5.792..5.792 rows=490240 loops=1)
             Index Cond: (val = '100'::double precision)
     Planning time: 0.119 ms
     Execution time: 785.583 ms
    (8 rows)

    explain analyze select * from a where val between 100 and 10000;
                                  QUERY PLAN
    ------------------------------------------------------------------
     Bitmap Heap Scan on a  (cost=55.94..25132.00 rows=7728 width=8)
                      (actual time=5.939..858.125 rows=9695 loops=1)
       Recheck Cond: ((val >= '100'::double precision) AND
                      (val <= '10000'::double precision))
       Rows Removed by Index Recheck: 9990305
       Heap Blocks: lossy=49020
       ->  Bitmap Index Scan on a_val_idx
             (cost=0.00..54.01 rows=10200 width=0)
             (actual time=5.831..5.831 rows=490240 loops=1)
             Index Cond: ((val >= '100'::double precision) AND
                          (val <= '10000'::double precision))
     Planning time: 0.139 ms
     Execution time: 871.132 ms
    (8 rows)

Obviously, the queries do scan the whole table and then eliminate most
of the rows in "Index Recheck". Decreasing pages_per_range does not
really make a measurable difference in this case - it eliminates maybe
10% of the rechecks, but most pages still have very wide minmax range.

With the patch, it looks about like this:

    create index on a using brin (val float8_minmax_multi_ops)
                            with (pages_per_range = 16);

    explain analyze select * from a where val = 100;
                                  QUERY PLAN
    -------------------------------------------------------------------
     Bitmap Heap Scan on a  (cost=830.01..11467.02 rows=8 width=8)
                            (actual time=7.772..8.533 rows=1 loops=1)
       Recheck Cond: (val = '100'::double precision)
       Rows Removed by Index Recheck: 3263
       Heap Blocks: lossy=16
       ->  Bitmap Index Scan on a_val_idx
             (cost=0.00..830.00 rows=3400 width=0)
             (actual time=7.729..7.729 rows=160 loops=1)
             Index Cond: (val = '100'::double precision)
     Planning time: 0.124 ms
     Execution time: 8.580 ms
    (8 rows)


    explain analyze select * from a where val between 100 and 10000;
                                 QUERY PLAN
    ------------------------------------------------------------------
     Bitmap Heap Scan on a  (cost=831.94..25908.00 rows=7728 width=8)
                        (actual time=9.318..23.715 rows=9695 loops=1)
       Recheck Cond: ((val >= '100'::double precision) AND
                      (val <= '10000'::double precision))
       Rows Removed by Index Recheck: 3361
       Heap Blocks: lossy=64
       ->  Bitmap Index Scan on a_val_idx
             (cost=0.00..830.01 rows=10200 width=0)
             (actual time=9.274..9.274 rows=640 loops=1)
             Index Cond: ((val >= '100'::double precision) AND
                          (val <= '10000'::double precision))
     Planning time: 0.138 ms
     Execution time: 36.100 ms
    (8 rows)

That is, the timings drop from 785ms/871ms to 9ms/36s. The index is a
bit larger (1700kB instead of 150kB), but it's still orders of
magnitudes smaller than btree index (which is ~214MB in this case).

The index build is slower than the regular BRIN indexes (about
comparable to btree), but I'm sure it can be significantly improved.
Also, I'm sure it's not bug-free.

Two additional notes:

1) The patch does break the current BRIN indexes, because it requires
passing all SearchKeys to the "consistent" BRIN function at once
(otherwise we couldn't eliminate individual intervals in the summary),
while currently the BRIN only deals with one SearchKey at a time. And I
haven't modified the existing brin_minmax_consistent() function (yeah,
I'm lazy, but this should be enough for interested people to try it out
I believe).

2) It only works with float8 (and also timestamp data types) for now,
but it should be straightforward to add support for additional data
types. Most of that will be about adding catalog definitions anyway.


regards


-- 
Tomas Vondra                  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
diff --git a/src/backend/access/brin/brin_minmax_multi.c b/src/backend/access/brin/brin_minmax_multi.c
index 6ec3297..94d696e 100644
--- a/src/backend/access/brin/brin_minmax_multi.c
+++ b/src/backend/access/brin/brin_minmax_multi.c
@@ -29,9 +29,6 @@ typedef struct MinmaxMultiOpaque
 	FmgrInfo	strategy_procinfos[BTMaxStrategyNumber];
 } MinmaxMultiOpaque;
 
-#define		MODE_VALUES			1
-#define		MODE_RANGES			2
-
 #define		MINMAX_MAX_VALUES	64
 
 typedef struct MinmaxMultiRanges
@@ -39,12 +36,13 @@ typedef struct MinmaxMultiRanges
 	/* varlena header (do not touch directly!) */
 	int32	vl_len_;
 
-	int		mode;		/* values or ranges in the data array */
+	/* maxvalues >= (2*nranges + nvalues) */
 	int		maxvalues;	/* maximum number of values in the buffer */
-	int		numvalues;	/* number of values in the data buffer */
+	int		nranges;	/* number of ranges stored in the array */
+	int		nvalues;	/* number of values in the data array */
 
 	/* values stored for this range - either raw values, or ranges */
-	Datum 	data[FLEXIBLE_ARRAY_MEMBER];
+	Datum 	values[FLEXIBLE_ARRAY_MEMBER];
 
 } MinmaxMultiRanges;
 
@@ -68,13 +66,11 @@ minmax_multi_init(int maxvalues)
 	/*
 	 * Allocate the range list with space for the max number of values.
 	 */
-	len = offsetof(MinmaxMultiRanges, data) + maxvalues * sizeof(Datum);
+	len = offsetof(MinmaxMultiRanges, values) + maxvalues * sizeof(Datum);
 
 	ranges = (MinmaxMultiRanges *) palloc0(len);
 
-	ranges->mode = MINMAX_MAX_VALUES;
 	ranges->maxvalues = maxvalues;
-	ranges->mode = MODE_VALUES;	/* start by accumulating values */
 
 	SET_VARSIZE(ranges, len);
 
@@ -87,6 +83,12 @@ typedef struct compare_context
 	Oid			colloid;
 } compare_context;
 
+typedef struct DatumRange {
+	Datum	minval;
+	Datum	maxval;
+	bool	collapsed;
+} DatumRange;
+
 static int
 compare_values(const void *a, const void *b, void *arg)
 {
@@ -109,27 +111,77 @@ compare_values(const void *a, const void *b, void *arg)
 	return 0;
 }
 
+static int
+compare_ranges(const void *a, const void *b, void *arg)
+{
+	DatumRange ra = *(DatumRange *)a;
+	DatumRange rb = *(DatumRange *)b;
+	Datum r;
+
+	compare_context *cxt = (compare_context *)arg;
+
+	r = FunctionCall2Coll(cxt->cmpFn, cxt->colloid, ra.minval, rb.minval);
+
+	if (DatumGetBool(r))
+		return -1;
+
+	r = FunctionCall2Coll(cxt->cmpFn, cxt->colloid, rb.minval, ra.minval);
+
+	if (DatumGetBool(r))
+		return 1;
+
+	return 0;
+}
+
+/*
 static void
 print_range(char * label, int numvalues, Datum *values)
 {
-	int i;
+	int idx;
 	StringInfoData str;
 
 	initStringInfo(&str);
 
-	for (i = 0; i < (numvalues/2); i++)
+	idx = 0;
+	while (idx < 2*ranges->nranges)
+	{
+		if (idx == 0)
+			appendStringInfoString(&str, "RANGES: [");
+		else
+			appendStringInfoString(&str, ", ");
+
+		appendStringInfo(&str, "%d => [%.9f, %.9f]", idx/2, DatumGetFloat8(values[idx]), DatumGetFloat8(values[idx+1]));
+
+		idx += 2;
+	}
+
+	if (ranges->nranges > 0)
+		appendStringInfoString(&str, "]");
+
+	if ((ranges->nranges > 0) && (ranges->nvalues > 0))
+		appendStringInfoString(&str, " ");
+
+	while (idx < 2*ranges->nranges + ranges->nvalues)
 	{
-		if (i > 0)
+		if (idx == 2*ranges->nranges)
+			appendStringInfoString(&str, "VALUES: [");
+		else
 			appendStringInfoString(&str, ", ");
 
-		appendStringInfo(&str, "\n\t%d => [%.9f, %.9f]", i, DatumGetFloat8(values[2*i]), DatumGetFloat8(values[2*i+1]));
+		appendStringInfo(&str, "%.9f", DatumGetFloat8(values[idx]));
+
+		idx++;
 	}
 
+	if (ranges->nvalues > 0)
+		appendStringInfoString(&str, "]");
+
 	elog(WARNING, "%s : %s", label, str.data);
 
 	resetStringInfo(&str);
 	pfree(str.data);
 }
+*/
 
 /*
  * minmax_multi_contains_value
@@ -143,33 +195,18 @@ minmax_multi_contains_value(BrinDesc *bdesc, Oid colloid,
 	int			i;
 	FmgrInfo   *cmpFn;
 
-	Datum a, b;
-
-	memcpy(&a, &ranges->data[0], sizeof(Datum));
-	memcpy(&b, &ranges->data[ranges->numvalues-1], sizeof(Datum));
-
 	/*
-	 * If we're still collecting exact values, let's see if we have an
-	 * exact match by using equality strategy.
+	 * First inspect the ranges, if there are any. We first check the whole
+	 * range, and only when there's still a chance of getting a match we
+	 * inspect the individual ranges.
 	 */
-	if (ranges->mode == MODE_VALUES)
+	if (ranges->nranges > 0)
 	{
-		cmpFn = minmax_multi_get_strategy_procinfo(bdesc, attno, typid,
-											 BTEqualStrategyNumber);
+		Datum	compar;
+		bool	match = true;
 
-		for (i = 0; i < ranges->numvalues; i++)
-		{
-			Datum compar
-				= FunctionCall2Coll(cmpFn, colloid, newval, ranges->data[i]);
-
-			/* found an exact match */
-			if (DatumGetBool(compar))
-				return true;
-		}
-	}
-	else /* MODE_RANGES */
-	{
-		Datum compar;
+		Datum	minvalue = ranges->values[0];
+		Datum	maxvalue = ranges->values[2*ranges->nranges - 1];
 
 		/*
 		 * Otherwise, need to compare the new value with boundaries of all
@@ -178,36 +215,36 @@ minmax_multi_contains_value(BrinDesc *bdesc, Oid colloid,
 		 */
 		cmpFn = minmax_multi_get_strategy_procinfo(bdesc, attno, typid,
 											 BTLessStrategyNumber);
-		compar = FunctionCall2Coll(cmpFn, colloid, newval, ranges->data[0]);
+		compar = FunctionCall2Coll(cmpFn, colloid, newval, minvalue);
 
 		/* smaller than the smallest value in the range list */
 		if (DatumGetBool(compar))
-			return false;
+			match = false;
 
 		/*
 		 * And now compare it to the existing maximum (last value in the
-		 * data array).
+		 * data array). But only if we haven't already ruled out a possible
+		 * match in the minvalue check.
 		 */
-		cmpFn = minmax_multi_get_strategy_procinfo(bdesc, attno, typid,
-											 BTGreaterStrategyNumber);
-		compar = FunctionCall2Coll(cmpFn, colloid, newval,
-								   ranges->data[ranges->numvalues-1]);
-		if (DatumGetBool(compar))
-			return false;
+		if (match)
+		{
+			cmpFn = minmax_multi_get_strategy_procinfo(bdesc, attno, typid,
+												BTGreaterStrategyNumber);
+			compar = FunctionCall2Coll(cmpFn, colloid, newval, maxvalue);
+
+			if (DatumGetBool(compar))
+				match = false;
+		}
 
 		/*
-		 * So it's in the general range, but is it actually covered by
-		 * any of the ranges? Each range uses two values.
+		 * So it's in the general range, but is it actually covered by any
+		 * of the ranges? Repeat the check for each range.
 		 */
-		for (i = 0; i < (ranges->numvalues/2); i++)
+		for (i = 0; i < ranges->nranges && match; i++)
 		{
-			Datum	minvalue = 0;
-			Datum	maxvalue = 0;
-			Datum	compar;
-
 			/* copy the min/max values from the ranges */
-			minvalue = ranges->data[2*i];
-			maxvalue = ranges->data[2*i+1];
+			minvalue = ranges->values[2*i];
+			maxvalue = ranges->values[2*i+1];
 
 			/*
 			 * Otherwise, need to compare the new value with boundaries of all
@@ -235,7 +272,22 @@ minmax_multi_contains_value(BrinDesc *bdesc, Oid colloid,
 		}
 	}
 
-	/* the value is not covered by the BRIN tuple */
+	/* so we're done with the ranges, now let's inspect the exact values */
+	for (i = 2*ranges->nranges; i < 2*ranges->nranges + ranges->nvalues; i++)
+	{
+		Datum compar;
+
+		cmpFn = minmax_multi_get_strategy_procinfo(bdesc, attno, typid,
+											 BTEqualStrategyNumber);
+
+		compar = FunctionCall2Coll(cmpFn, colloid, newval, ranges->values[i]);
+
+		/* found an exact match */
+		if (DatumGetBool(compar))
+			return true;
+	}
+
+	/* the value is not covered by this BRIN tuple */
 	return false;
 }
 
@@ -250,110 +302,207 @@ minmax_multi_add_value(BrinDesc *bdesc, Oid colloid,
 					   MinmaxMultiRanges *ranges, Datum newval)
 {
 	int			i;
-	Datum		values[MINMAX_MAX_VALUES+2];
-	int			nvalues;
-	double		mindistance;
-	int			minidx;
 
 	/* context for sorting */
 	compare_context cxt;
 
+	Assert(ranges->maxvalues >= 2*ranges->nranges + ranges->nvalues);
+
 	/*
-	 * If we're still collecting exact values, let's see if there's a bit
-	 * more space for one additional one.
+	 * If there's space in the values array, copy it in and we're done.
+	 *
+	 * If we get duplicates, it doesn't matter as we'll deduplicate the
+	 * values later.
 	 */
-	if (ranges->mode == MODE_VALUES)
+	if (ranges->maxvalues > 2*ranges->nranges + ranges->nvalues)
 	{
-		/* yep, there's free space, so let's just squash it in */
-		if (ranges->numvalues < ranges->maxvalues)
-		{
-			ranges->data[ranges->numvalues++] = newval;
-			return;
-		}
+		ranges->values[2*ranges->nranges + ranges->nvalues] = newval;
+		ranges->nvalues++;
+		return;
+	}
 
+	/*
+	 * There's not enough space, so try deduplicating the values array,
+	 * including the new value.
+	 *
+	 * XXX maybe try deduplicating using memcmp first, instead of using
+	 * the (possibly) fairly complex/expensive comparator.
+	 *
+	 * XXX The if is somewhat unnecessary, because nvalues is always >= 0
+	 * so we do this always.
+	 */
+	if (ranges->nvalues >= 0)
+	{
+		FmgrInfo   *cmpFn;
+		int			nvalues = ranges->nvalues + 1;	/* space for newval */
+		Datum	   *values = palloc(sizeof(Datum) * nvalues);
+		int			idx;
+		DatumRange *ranges_tmp;
+		int			nranges;
+		int			count;
+
+		/* sort the values */
 		cxt.colloid = colloid;
 		cxt.cmpFn = minmax_multi_get_strategy_procinfo(bdesc, attno, attr->atttypid,
 												 BTLessStrategyNumber);
 
+		/* copy the existing value and the new value too */
+		memcpy(values, &ranges->values[2*ranges->nranges], ranges->nvalues * sizeof(Datum));
+		values[ranges->nvalues] = newval;
+
+		/* the actual sort of all the values */
+		qsort_arg(values, nvalues, sizeof(Datum), compare_values, (void *) &cxt);
+
+		/* equality for duplicate detection */
+		cmpFn = minmax_multi_get_strategy_procinfo(bdesc, attno, attr->atttypid,
+												 BTEqualStrategyNumber);
+
+		/* keep the first value */
+		idx = 1;
+		for (i = 1; i < nvalues; i++)
+		{
+			Datum compar;
+
+			/* is this a new value (different from the previous one)? */
+			compar = FunctionCall2Coll(cmpFn, colloid, values[i-1], values[i]);
+
+			/* not equal, we have to store it */
+			if (!DatumGetBool(compar))
+				values[idx++] = values[i];
+		}
+
 		/*
-		 * Nope, we have to switch to list of ranges. We simply sort the
-		 * values and set the mode to MODE_RANGES.
+		 * Have we managed to reduce the number of values? if yes, we can just
+		 * copy it back and we're done.
 		 */
-		qsort_arg(ranges->data, ranges->numvalues, sizeof(Datum),
-				  compare_values, (void *) &cxt);
+		if (idx < nvalues)
+		{
+			memcpy(&ranges->values[2*ranges->nranges], values, idx * sizeof(Datum));
+			ranges->nvalues = idx;
+			pfree(values);
+			return;
+		}
 
-		ranges->mode = MODE_RANGES;
-	}
+		Assert(idx == nvalues);
 
-	Assert(ranges->mode == MODE_RANGES);
-	Assert(ranges->maxvalues == ranges->numvalues);
+		/*
+		 * Nope, that didn't work, we have to merge some of the ranges. To do
+		 * that we'll turn the values to "collapsed" ranges (min==max), and
+		 * then merge a bunch of "closest ranges to cut the space requirements
+		 * in half.
+		 *
+		 * XXX Do a merge sort, instead of just using qsort.
+		 */
+		nranges = (ranges->nranges + nvalues);
+		ranges_tmp = palloc0(sizeof(DatumRange) * nranges);
 
-	// print_range("START", ranges->numvalues, ranges->data);
+		idx = 0;
 
-	/*
-	 * In the range mode we're guaranteed to have a full data array, because
-	 * that's the only situation we switch to MODE_RANGES. So we need to
-	 * decide which two ranges to merge, to reduce the number of ranges
-	 * to fit back into the BRIN tuple.
-	 *
-	 * We do that by treating the new value as a collapsed range, and then
-	 * merge the two ranges closest to each other.
-	 */
+		/* ranges */
+		for (i = 0; i < ranges->nranges; i++)
+		{
+			ranges_tmp[idx].minval = ranges->values[2*i];
+			ranges_tmp[idx].maxval = ranges->values[2*i+1];
+			ranges_tmp[idx].collapsed = false;
+			idx++;
+		}
 
-	nvalues = ranges->numvalues;
-	memcpy(values, ranges->data, ranges->numvalues * sizeof(Datum));
+		/* values as collapsed ranges */
+		for (i = 0; i < nvalues; i++)
+		{
+			ranges_tmp[idx].minval = values[i];
+			ranges_tmp[idx].maxval = values[i];
+			ranges_tmp[idx].collapsed = true;
+			idx++;
+		}
 
-	/* add the new value as a collapsed range (that's why we add it twice) */
-	memcpy(&values[nvalues++], &newval, sizeof(Datum));
-	memcpy(&values[nvalues++], &newval, sizeof(Datum));
+		Assert(idx == nranges);
 
-	/* sort the data again */
-	cxt.colloid = colloid;
-	cxt.cmpFn = minmax_multi_get_strategy_procinfo(bdesc, attno, attr->atttypid,
-											 BTLessStrategyNumber);
+		/* sort the ranges */
+		qsort_arg(ranges_tmp, nranges, sizeof(DatumRange), compare_ranges, (void *) &cxt);
 
-	qsort_arg(values, nvalues, sizeof(Datum), compare_values, (void *) &cxt);
+		/* Now combine as many ranges until the number of values to store
+		 * gets to half of MINMAX_MAX_VALUES. The collapsed ranges will be
+		 * stored as a single value.
+		 */
+		count = ranges->nranges * 2 + nvalues;
 
-	// print_range("MIDDLE", nvalues, values);
+		while (count > MINMAX_MAX_VALUES/2)
+		{
+			int		minidx = 0;
+			double	mindistance = DatumGetFloat8(ranges_tmp[1].minval) - DatumGetFloat8(ranges_tmp[0].maxval);
 
-	/*
-	 * now look for the two ranges closest to each other
-	 *
-	 * FIXME This simply uses DatumGetFloat8, which works for timestamps,
-	 * but we need to support custom fuctions too. 
-	 */
-	minidx = 0;
-	mindistance = DatumGetFloat8(values[2]) - DatumGetFloat8(values[1]);
+			/* pick the two closest ranges */
+			for (i = 1; i < (nranges-1); i++)
+			{
+				double	distance = DatumGetFloat8(ranges_tmp[i+1].minval) - DatumGetFloat8(ranges_tmp[i-1].maxval);
+				if (distance < mindistance)
+				{
+					mindistance = distance;
+					minidx = i;
+				}
+			}
 
-	/* extra range at the end, for the collapsed range we added */
-	for (i = 0; i < (ranges->maxvalues/2); i++)
-	{
-		double distance
-			= DatumGetFloat8(values[2*(i+1)]) - DatumGetFloat8(values[2*(i+1)-1]);
+			/*
+			 * Update the count of Datum values we need to store, depending
+			 * on what type of ranges we merged.
+			 *
+			 * 2 - when both ranges are 'regular'
+			 * 1 - when regular + collapsed
+			 * 0 - when both collapsed
+			 */
+			if (!ranges_tmp[minidx].collapsed && !ranges_tmp[minidx+1].collapsed)	/* both regular */
+				count -= 2;
+			else if (!ranges_tmp[minidx].collapsed || !ranges_tmp[minidx+1].collapsed) /* one regular */
+				count -= 1;
 
-		if (distance < mindistance)
-		{
-			mindistance = distance;
-			minidx = i;
-		}
-	}
+			/*
+			 * combine the two selected ranges, the new range is definiely
+			 * not collapsed
+			 */
+			ranges_tmp[minidx].maxval = ranges_tmp[minidx+1].maxval;
+			ranges_tmp[minidx].collapsed = false;
 
-	memset(ranges->data, 0, ranges->maxvalues * sizeof(Datum));
+			for (i = minidx+1; i < nranges-1; i++)
+				ranges_tmp[i] = ranges_tmp[i+1];
 
-	/*
-	 * Now we know which ranges to merge, which we do simply while copying
-	 * the data back into the range list.
-	 */
-	memcpy(ranges->data, values, (2*minidx + 1) * sizeof(Datum));
+			nranges--;
 
-	// print_range("END1", ranges->numvalues, ranges->data);
+			/*
+			 * we can never get zero values
+			 *
+			 * XXX Actually we should never get below (MINMAX_MAX_VALUES/2 - 1)
+			 * values or so.
+			 */
+			Assert(count > 0);
+		}
 
-	memcpy(ranges->data + (2*minidx + 1), &values[2*minidx+3], (ranges->maxvalues - 2*minidx-1) * sizeof(Datum));
+		/* first copy in the regular ranges */
+		ranges->nranges = 0;
+		for (i = 0; i < nranges; i++)
+		{
+			if (!ranges_tmp[i].collapsed)
+			{
+				ranges->values[2*ranges->nranges    ] = ranges_tmp[i].minval;
+				ranges->values[2*ranges->nranges + 1] = ranges_tmp[i].maxval;
+				ranges->nranges++;
+			}
+		}
 
-	// print_range("END2", ranges->numvalues, ranges->data);
+		/* now copy in the collapsed ones */
+		ranges->nvalues = 0;
+		for (i = 0; i < nranges; i++)
+		{
+			if (ranges_tmp[i].collapsed)
+			{
+				ranges->values[2*ranges->nranges + ranges->nvalues] = ranges_tmp[i].minval;
+				ranges->nvalues++;
+			}
+		}
 
-	Assert(minmax_multi_contains_value(bdesc, colloid, attno, attr->atttypid,
-			ranges, newval));
+		pfree(ranges_tmp);
+		pfree(values);
+	}
 }
 
 
@@ -439,7 +588,7 @@ brin_minmax_multi_add_value(PG_FUNCTION_ARGS)
 	/* */
 	minmax_multi_add_value(bdesc, colloid, attno, attr, ranges, newval);
 
-	PG_RETURN_BOOL(updated);
+	PG_RETURN_BOOL(true);
 }
 
 /*
@@ -462,6 +611,7 @@ brin_minmax_multi_consistent(PG_FUNCTION_ARGS)
 	MinmaxMultiRanges	*ranges;
 	int			keyno;
 	int			rangeno;
+	int			i;
 
 	/*
 	 * First check if there are any IS NULL scan keys, and if we're
@@ -512,10 +662,10 @@ brin_minmax_multi_consistent(PG_FUNCTION_ARGS)
 	ranges = (MinmaxMultiRanges *) DatumGetPointer(column->bv_values[0]);
 
 	/* inspect the ranges, and for each one evaluate the scan keys */
-	for (rangeno = 0; rangeno < (ranges->maxvalues/2); rangeno++)
+	for (rangeno = 0; rangeno < ranges->nranges; rangeno++)
 	{
-		Datum	minval = ranges->data[2*rangeno];
-		Datum	maxval = ranges->data[2*rangeno+1];
+		Datum	minval = ranges->values[2*rangeno];
+		Datum	maxval = ranges->values[2*rangeno+1];
 
 		/* assume the range is matching, and we'll try to prove otherwise */
 		bool	matching = true;
@@ -543,11 +693,39 @@ brin_minmax_multi_consistent(PG_FUNCTION_ARGS)
 					break;
 
 				case BTEqualStrategyNumber:
-					/* FIXME this inspects the whole range again ... */
-					matches = minmax_multi_contains_value(bdesc, colloid, attno,
-														  subtype, ranges, value);
-					break;
+				{
+					Datum		compar;
+					FmgrInfo   *cmpFn;
 
+					/* by default this range does not match */
+					matches = false;
+
+					/*
+					 * Otherwise, need to compare the new value with boundaries of all
+					 * the ranges. First check if it's less than the absolute minimum,
+					 * which is the first value in the array.
+					 */
+					cmpFn = minmax_multi_get_strategy_procinfo(bdesc, attno, subtype,
+															   BTLessStrategyNumber);
+					compar = FunctionCall2Coll(cmpFn, colloid, value, minval);
+
+					/* smaller than the smallest value in this range */
+					if (DatumGetBool(compar))
+						break;
+
+					cmpFn = minmax_multi_get_strategy_procinfo(bdesc, attno, subtype,
+															   BTGreaterStrategyNumber);
+					compar = FunctionCall2Coll(cmpFn, colloid, value, maxval);
+
+					/* larger than the largest value in this range */
+					if (DatumGetBool(compar))
+						break;
+
+					/* haven't managed to eliminate this range, so consider it matching */
+					matches = true;
+
+					break;
+				}
 				case BTGreaterEqualStrategyNumber:
 				case BTGreaterStrategyNumber:
 					finfo = minmax_multi_get_strategy_procinfo(bdesc, attno, subtype,
@@ -577,6 +755,60 @@ brin_minmax_multi_consistent(PG_FUNCTION_ARGS)
 			PG_RETURN_DATUM(BoolGetDatum(true));
 	}
 
+	/* and now inspect the values */
+	for (i = 0; i < ranges->nvalues; i++)
+	{
+		Datum	val = ranges->values[2*ranges->nranges + i];
+
+		/* assume the range is matching, and we'll try to prove otherwise */
+		bool	matching = true;
+
+		for (keyno = 0; keyno < nkeys; keyno++)
+		{
+			Datum	matches;
+			ScanKey	key = keys[keyno];
+
+			/* we've already dealt with NULL keys at the beginning */
+			if (key->sk_flags & SK_ISNULL)
+				continue;
+
+			attno = key->sk_attno;
+			subtype = key->sk_subtype;
+			value = key->sk_argument;
+			switch (key->sk_strategy)
+			{
+				case BTLessStrategyNumber:
+				case BTLessEqualStrategyNumber:
+				case BTEqualStrategyNumber:
+				case BTGreaterEqualStrategyNumber:
+				case BTGreaterStrategyNumber:
+				
+					finfo = minmax_multi_get_strategy_procinfo(bdesc, attno, subtype,
+															   key->sk_strategy);
+					matches = FunctionCall2Coll(finfo, colloid, value, val);
+					break;
+
+				default:
+					/* shouldn't happen */
+					elog(ERROR, "invalid strategy number %d", key->sk_strategy);
+					matches = 0;
+					break;
+			}
+
+			/* the range has to match all the scan keys */
+			matching &= DatumGetBool(matches);
+
+			/* once we find a non-matching key, we're done */
+			if (! matching)
+				break;
+		}
+
+		/* have we found a range matching all scan keys? if yes, we're
+		 * done */
+		if (matching)
+			PG_RETURN_DATUM(BoolGetDatum(true));
+	}
+
 	PG_RETURN_DATUM(BoolGetDatum(false));
 }
 
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to