Hi,

A couple days ago I posted a WIP patch [1] implementing "BRIN Sort",
i.e. a node producing sorted output for BRIN minmax indexes. One of the
challenges I mentioned in that thread is costing - that's actually not
specific to that patch, it's an issue affecting BRIN in general, not
just the proposed node, due to block-range indexes being very different
from regular indexes with explicit tuple pointers.

I mentioned I have some ideas how to improve this, and that I'll start a
separate thread to discuss this. So here we go ...


The traditional estimation problem is roughly this:

    Given a condition, how many rows will match it?

That is, given a table with X rows, we need to estimate how many rows
will match a WHERE condition, for example. And once we have the row
estimate, we can estimate the amount of I/O, cost for sorting, etc.

We have built fairly solid capability to calculate these estimates,
using per-column statistics, extended statistics, ... The calculated
estimates are not always perfect, but in general it works well.

This affects all path types etc. mostly equally - yes, some paths are
more sensitive to poor estimates (e.g. runtime may grow exponentially
with increasing rowcount).


BRIN indexes however add another layers to this - once we have estimated
the number of rows, we need to estimate the number of pages ranges this
maps to. You may estimate the WHERE condition to match 1000 rows, but
then you need to decide if that's 1 page range, 1000 page ranges or
possibly even all page ranges for the table.

It all depends on how "correlated" the data is with physical position in
the table. If you have perfectly correlated data, it may be enough to
scan a single page. If it's random, you may need to scan everything.

The existing costing uses the column correlation statistics, but sadly
that's rather insensitive to outlier values. If you have a sequential
table, and then set 1% of data to min/max (making the ranges very wide),
the correlation will remain very close to 1.0, but you'll have to scan
all the ranges (and the costing won't reflect that).

The "BRIN sort" patch needs to estimate a different thing - given a page
range, how many other page ranges overlap with it? This is roughly the
amount of stuff we'll need to scan and sort in order to produce the
first row.

These are all things we can't currently estimate - we have some rough
heuristics, but it's pretty easy to confuse those.

Therefore, I propose to calculate a couple new statistics for BRIN
indexes (assume minmax indexes, unless mentioned otherwise):


1) average number of overlapping ranges
---------------------------------------

Given a range, with how many ranges it overlaps? In a perfectly
sequential table this will be 0, so if you have a value you know it'll
match just one range. In random table, it'll be pretty close to the
number of page ranges.

This can be calculated by simply walking the ranges, sorted by minval
(see brin_minmax_count_overlaps).


2) average number of matching ranges for a value
------------------------------------------------

Given a value, how many ranges it matches? This can be calculated by
matching sampled rows to ranges (brin_minmax_match_tuples_to_ranges).

For minmax indexes this is somewhat complementary to the average number
of overlaps, the relationship is roughly this:

  avg(# of matching ranges) = 1 + avg(number of overlapping ranges)/2

The intuition is that if you assume a range randomly overlapped by other
ranges, you're likely to hit about 1/2 of them.

The reason why we want to calculate both (1) and (2) is that for other
opclasses the relationship is not that simple. For bloom opclasses we
probably can't calculate overlaps at all (or at least not that easily),
so the average number of matches is all we have. For minmax-multi, the
overlaps will probably use only the min/max values, ignoring the "gaps",
but the matches should use the gaps.


3) a bunch of other simple statistics
-------------------------------------

These are number of summarized / not-summarized ranges, all_nulls and
has_nulls ranges, which is useful to estimate IS NULL conditions etc.


The attached patch implements a PoC of this. There's a new GUC
(enable_indexam_stats) that can be used to enable/disable this (both the
ANALYZE and costing part). By default it's "off" so make sure to do

  SET enable_indexam_stats = true;

The statistics is stored in pg_statistics catalog, in a new staindexam
column (with bytea). The opclasses can implement a new support
procedure, similarly to what we do of opclass options. There's a couple
of wrinkles (should be explained in XXX comments), but in principle this
works.

The brin_minmax_stats procedure implements this for minmax opclasses,
calculating the stuff mentioned above. I've been experimenting with
different ways to calculate some of the stuff, and ANALYZE prints info
about the calculated values and timings (this can be disabled by
removing the STATS_CROSS_CHECK define).

Finally, brincostestimate() loads the statistics and uses it for
costing. At the moment it uses only the average number of overlaps.

Trivial example:

create table t (a int) with (fillfactor = 10);

  insert into t
  select (case when mod(i,22) = 0 then 100000000
               when mod(i,22) = 1 then 0
               else i end)
    from generate_series(1,300000) s(i);

  create index on t using brin (a) with (pages_per_range = 1);

The table fits 22 rows per page, and the data is mostly sequential,
except that every page has both 0 and 100000000. The correlation however
remains fairly high:

  # select correlation from pg_stats where tablename = 't';
   correlation
  -------------
     0.8303595
  (1 row)

Now, let's do a simple query:

# explain (analyze, buffers, timing off) select * from t where a = 500;

                              QUERY PLAN
------------------------------------------------------------------------
 Bitmap Heap Scan on t  (cost=154.00..254.92 rows=2 width=4)
                        (actual rows=1 loops=1)
   Recheck Cond: (a = 500)
   Rows Removed by Index Recheck: 299999
   Heap Blocks: lossy=13637
   Buffers: shared hit=13695
   ->  Bitmap Index Scan on t_a_idx  (cost=0.00..154.00 rows=26 width=0)
                                     (actual rows=136370 loops=1)
         Index Cond: (a = 500)
         Buffers: shared hit=58
 Planning:
   Buffers: shared hit=1
 Planning Time: 0.173 ms
 Execution Time: 101.972 ms
(12 rows)

That's pretty poor, because brincostestimate() still thinks it'll be
enough to read one or two page ranges (because 1/0.8 = ~1.2).

Now, with the extra statistics:

SET enable_indexam_stats = true;
ANALYZE t;

                               QUERY PLAN
----------------------------------------------------------------------
 Bitmap Heap Scan on t  (cost=157.41..17544.41 rows=2 width=4)
                        (actual rows=1 loops=1)
   Recheck Cond: (a = 500)
   Rows Removed by Index Recheck: 299999
   Heap Blocks: lossy=13637
   Buffers: shared hit=13695
   ->  Bitmap Index Scan on t_a_idx  (cost=0.00..157.41 rows=300000
                               width=0) (actual rows=136370 loops=1)
         Index Cond: (a = 500)
         Buffers: shared hit=58
 Planning:
   Buffers: shared hit=1
 Planning Time: 0.230 ms
 Execution Time: 104.603 ms
(12 rows)

So in this case we realize we actually have to scan the whole table, all
~13637 ranges, and the cost reflects that.

Feel free to experiment with other data sets.


regards

[1]
https://www.postgresql.org/message-id/e70fa091-e338-1598-9de4-6d0ef6b693e2%40enterprisedb.com

-- 
Tomas Vondra
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c
index 20b7d65b948..d2c30336981 100644
--- a/src/backend/access/brin/brin.c
+++ b/src/backend/access/brin/brin.c
@@ -95,6 +95,7 @@ brinhandler(PG_FUNCTION_ARGS)
 	amroutine->amstrategies = 0;
 	amroutine->amsupport = BRIN_LAST_OPTIONAL_PROCNUM;
 	amroutine->amoptsprocnum = BRIN_PROCNUM_OPTIONS;
+	amroutine->amstatsprocnum = BRIN_PROCNUM_STATISTICS;
 	amroutine->amcanorder = false;
 	amroutine->amcanorderbyop = false;
 	amroutine->amcanbackward = false;
diff --git a/src/backend/access/brin/brin_minmax.c b/src/backend/access/brin/brin_minmax.c
index 9e8a8e056cc..6db1aa885c0 100644
--- a/src/backend/access/brin/brin_minmax.c
+++ b/src/backend/access/brin/brin_minmax.c
@@ -10,17 +10,22 @@
  */
 #include "postgres.h"
 
+#include "access/brin.h"
 #include "access/brin_internal.h"
+#include "access/brin_revmap.h"
 #include "access/brin_tuple.h"
 #include "access/genam.h"
 #include "access/stratnum.h"
 #include "catalog/pg_amop.h"
 #include "catalog/pg_type.h"
+#include "miscadmin.h"
+#include "storage/bufmgr.h"
 #include "utils/builtins.h"
 #include "utils/datum.h"
 #include "utils/lsyscache.h"
 #include "utils/rel.h"
 #include "utils/syscache.h"
+#include "utils/timestamp.h"
 
 typedef struct MinmaxOpaque
 {
@@ -31,6 +36,7 @@ typedef struct MinmaxOpaque
 static FmgrInfo *minmax_get_strategy_procinfo(BrinDesc *bdesc, uint16 attno,
 											  Oid subtype, uint16 strategynum);
 
+#define STATS_CROSS_CHECK
 
 Datum
 brin_minmax_opcinfo(PG_FUNCTION_ARGS)
@@ -253,6 +259,1177 @@ brin_minmax_union(PG_FUNCTION_ARGS)
 	PG_RETURN_VOID();
 }
 
+/* FIXME copy of a private struct from brin.c */
+typedef struct c
+{
+	BlockNumber bo_pagesPerRange;
+	BrinRevmap *bo_rmAccess;
+	BrinDesc   *bo_bdesc;
+} BrinOpaque;
+
+/*
+ * Compare ranges by minval (collation and operator are taken from the extra
+ * argument, which is expected to be TypeCacheEntry).
+ */
+static int
+range_minval_cmp(const void *a, const void *b, void *arg)
+{
+	BrinRange *ra = *(BrinRange **) a;
+	BrinRange *rb = *(BrinRange **) b;
+	TypeCacheEntry *typentry = (TypeCacheEntry *) arg;
+	FmgrInfo   *cmpfunc = &typentry->cmp_proc_finfo;
+	Datum	c;
+
+	c = FunctionCall2Coll(cmpfunc, typentry->typcollation,
+						  ra->min_value, rb->min_value);
+	return DatumGetInt32(c);
+}
+
+/*
+ * Compare ranges by maxval (collation and operator are taken from the extra
+ * argument, which is expected to be TypeCacheEntry).
+ */
+static int
+range_maxval_cmp(const void *a, const void *b, void *arg)
+{
+	BrinRange *ra = *(BrinRange **) a;
+	BrinRange *rb = *(BrinRange **) b;
+	TypeCacheEntry *typentry = (TypeCacheEntry *) arg;
+	FmgrInfo   *cmpfunc = &typentry->cmp_proc_finfo;
+	Datum	c;
+
+	c = FunctionCall2Coll(cmpfunc, typentry->typcollation,
+						  ra->max_value, rb->max_value);
+	return DatumGetInt32(c);
+}
+
+/* compare values using an operator from typcache */
+static int
+range_values_cmp(const void *a, const void *b, void *arg)
+{
+	Datum	da = * (Datum *) a;
+	Datum	db = * (Datum *) b;
+	TypeCacheEntry *typentry = (TypeCacheEntry *) arg;
+	FmgrInfo   *cmpfunc = &typentry->cmp_proc_finfo;
+	Datum	c;
+
+	c = FunctionCall2Coll(cmpfunc, typentry->typcollation,
+						  da, db);
+	return DatumGetInt32(c);
+}
+
+/*
+ * maxval_start
+ *		Determine first index so that (maxvalue >= value).
+ *
+ * The array of ranges is expected to be sorted by maxvalue, so this is the first
+ * range that can possibly intersect with range having "value" as minval.
+ */
+static int
+maxval_start(BrinRange **ranges, int nranges, Datum value, TypeCacheEntry *typcache)
+{
+	int		start = 0,
+			end = (nranges - 1);
+
+	// everything matches
+	if (range_values_cmp(&value, &ranges[start]->max_value, typcache) <= 0)
+		return 0;
+
+	// no matches
+	if (range_values_cmp(&value, &ranges[end]->max_value, typcache) > 0)
+		return nranges;
+
+	while ((end - start) > 0)
+	{
+		int	midpoint;
+		int	r;
+
+		midpoint = start + (end - start) / 2;
+
+		r = range_values_cmp(&value, &ranges[midpoint]->max_value, typcache);
+
+		if (r <= 0)
+			end = midpoint;
+		else
+			start = (midpoint + 1);
+	}
+
+	Assert(ranges[start]->max_value >= value);
+	Assert(ranges[start-1]->max_value < value);
+
+	return start;
+}
+
+/*
+ * minval_end
+ *		Determine first index so that (minval > value).
+ *
+ * The array of ranges is expected to be sorted by minvalue, so this is the first
+ * range that can't possibly intersect with a range having "value" as maxval.
+ */
+static int
+minval_end(BrinRange **ranges, int nranges, Datum value, TypeCacheEntry *typcache)
+{
+	int		start = 0,
+			end = (nranges - 1);
+
+	// everything matches
+	if (range_values_cmp(&value, &ranges[end]->min_value, typcache) >= 0)
+		return nranges;
+
+	// no matches
+	if (range_values_cmp(&value, &ranges[start]->min_value, typcache) < 0)
+		return 0;
+
+	while ((end - start) > 0)
+	{
+		int midpoint;
+		int r;
+
+		midpoint = start + (end - start) / 2;
+
+		r = range_values_cmp(&value, &ranges[midpoint]->min_value, typcache);
+
+		if (r >= 0)
+			start = midpoint + 1;
+		else
+			end = midpoint;
+	}
+
+	Assert(ranges[start]->min_value > value);
+	Assert(ranges[start-1]->min_value <= value);
+
+	return start;
+}
+
+
+/*
+ * lower_bound
+ *		Determine first index so that (values[index] >= value).
+ *
+ * The array of ranges is expected to be sorted by maxvalue, so this is the first
+ * range that can possibly intersect with range having "value" as minval.
+ */
+static int
+lower_bound(Datum *values, int nvalues, Datum value, TypeCacheEntry *typcache)
+{
+	int		start = 0,
+			end = (nvalues - 1);
+
+	// everything matches
+	if (range_values_cmp(&value, &values[start], typcache) <= 0)
+		return 0;
+
+	// no matches
+	if (range_values_cmp(&value, &values[end], typcache) > 0)
+		return nvalues;
+
+	while ((end - start) > 0)
+	{
+		int	midpoint;
+		int	r;
+
+		midpoint = start + (end - start) / 2;
+
+		r = range_values_cmp(&value, &values[midpoint], typcache);
+
+		if (r <= 0)
+			end = midpoint;
+		else
+			start = (midpoint + 1);
+	}
+
+	Assert(values[start] >= value);
+	Assert(values[start-1] < value);
+
+	return start;
+}
+
+/*
+ * upper_bound
+ *		Determine first index so that (values[index] > value).
+ *
+ * The array of ranges is expected to be sorted by minvalue, so this is the first
+ * range that can't possibly intersect with a range having "value" as maxval.
+ */
+static int
+upper_bound(Datum *values, int nvalues, Datum value, TypeCacheEntry *typcache)
+{
+	int		start = 0,
+			end = (nvalues - 1);
+
+	// everything matches
+	if (range_values_cmp(&value, &values[end], typcache) >= 0)
+		return nvalues;
+
+	// no matches
+	if (range_values_cmp(&value, &values[start], typcache) < 0)
+		return 0;
+
+	while ((end - start) > 0)
+	{
+		int midpoint;
+		int r;
+
+		midpoint = start + (end - start) / 2;
+
+		r = range_values_cmp(&value, &values[midpoint], typcache);
+
+		if (r >= 0)
+			start = midpoint + 1;
+		else
+			end = midpoint;
+	}
+
+	Assert(values[start] > value);
+	Assert(values[start-1] <= value);
+
+	return start;
+}
+
+/*
+ * Simple histogram, with bins tracking value and two overlap counts.
+ *
+ * XXX Maybe we should have two separate histograms, one for all counts and
+ * another one for "unique" values.
+ *
+ * XXX Serialize the histogram. There might be a data set where we have very
+ * many distinct buckets (values having very different number of matching
+ * ranges) - not sure if there's some sort of upper limit (but hard to say for
+ * other opclasses, like bloom). And we don't want arbitrarily large histogram,
+ * to keep the statistics fairly small, I guess. So we'd need to pick a subset,
+ * merge buckets with "similar" counts, or approximate it somehow. For now we
+ * don't serialize it, because we don't use the histogram.
+ */
+typedef struct histogram_bin_t
+{
+	int		value;
+	int		count;
+} histogram_bin_t;
+
+typedef struct histogram_t
+{
+	int				nbins;
+	int				nbins_max;
+	histogram_bin_t	bins[FLEXIBLE_ARRAY_MEMBER];
+} histogram_t;
+
+#define HISTOGRAM_BINS_START 32
+
+/* allocate histogram with default number of bins */
+static histogram_t *
+histogram_init(void)
+{
+	histogram_t *hist;
+
+	hist = (histogram_t *) palloc0(offsetof(histogram_t, bins) +
+								   sizeof(histogram_bin_t) * HISTOGRAM_BINS_START);
+	hist->nbins_max = HISTOGRAM_BINS_START;
+
+	return hist;
+}
+
+/*
+ * histogram_add
+ *		Add a hit for a particular value to the histogram.
+ *
+ * XXX We don't sort the bins, so just do binary sort. For large number of values
+ * this might be an issue, for small number of values a linear search is fine.
+ */
+static histogram_t *
+histogram_add(histogram_t *hist, int value)
+{
+	bool	found = false;
+	histogram_bin_t *bin;
+
+	for (int i = 0; i < hist->nbins; i++)
+	{
+		if (hist->bins[i].value == value)
+		{
+			bin = &hist->bins[i];
+			found = true;
+		}
+	}
+
+	if (!found)
+	{
+		if (hist->nbins == hist->nbins_max)
+		{
+			int		nbins = (2 * hist->nbins_max);
+			hist = repalloc(hist, offsetof(histogram_t, bins) +
+								   sizeof(histogram_bin_t) * nbins);
+			hist->nbins_max = nbins;
+		}
+
+		Assert(hist->nbins < hist->nbins_max);
+
+		bin = &hist->bins[hist->nbins++];
+		bin->value = value;
+		bin->count = 0;
+	}
+
+	bin->count += 1;
+
+	Assert(bin->value == value);
+	Assert(bin->count >= 0);
+
+	return hist;
+}
+
+/* used to sort histogram bins by value */
+static int
+histogram_bin_cmp(const void *a, const void *b)
+{
+	histogram_bin_t *ba = (histogram_bin_t *) a;
+	histogram_bin_t *bb = (histogram_bin_t *) b;
+
+	if (ba->value < bb->value)
+		return -1;
+
+	if (bb->value < ba->value)
+		return 1;
+
+	return 0;
+}
+
+static void
+histogram_print(histogram_t *hist)
+{
+	return;
+
+	elog(WARNING, "----- histogram -----");
+	for (int i = 0; i < hist->nbins; i++)
+	{
+		elog(WARNING, "bin %d value %d count %d",
+			 i, hist->bins[i].value, hist->bins[i].count);
+	}
+}
+
+/*
+ * brin_minmax_count_overlaps
+ *		Calculate number of overlaps.
+ *
+ * This uses the minranges to quickly eliminate ranges that can't possibly
+ * intersect. We simply walk minranges until minval > current maxval, and
+ * we're done.
+ *
+ * Unlike brin_minmax_count_overlaps2, this does not have issues with wide
+ * ranges, so this is what we should use.
+ */
+static int
+brin_minmax_count_overlaps(BrinRange **minranges, int nranges, TypeCacheEntry *typcache)
+{
+	int noverlaps;
+
+#ifdef STATS_CROSS_CHECK
+	TimestampTz		start_ts = GetCurrentTimestamp();
+#endif
+
+	noverlaps = 0;
+	for (int i = 0; i < nranges; i++)
+	{
+		Datum	maxval = minranges[i]->max_value;
+
+		/*
+		 * Determine index of the first range with (minval > current maxval)
+		 * by binary search. We know all other ranges can't overlap the
+		 * current one. We simply subtract indexes to count ranges.
+		 */
+		int		idx = minval_end(minranges, nranges, maxval, typcache);
+
+		/* -1 because we don't count the range as intersecting with itself */
+		noverlaps += (idx - i - 1);
+	}
+
+	/*
+	 * We only count 1/2 the ranges (minval > current minval), so the total
+	 * number of overlaps is twice what we counted.
+	 */
+	noverlaps *= 2;
+
+#ifdef STATS_CROSS_CHECK
+	elog(WARNING, "----- brin_minmax_count_overlaps -----");
+	elog(WARNING, "noverlaps = %d", noverlaps);
+	elog(WARNING, "duration = %ld", TimestampDifferenceMilliseconds(start_ts,
+									GetCurrentTimestamp()));
+#endif
+
+	return noverlaps;
+}
+
+#ifdef STATS_CROSS_CHECK
+/*
+ * brin_minmax_count_overlaps2
+ *		Calculate number of overlaps.
+ *
+ * This uses the minranges/maxranges to quickly eliminate ranges that can't
+ * possibly intersect.
+ *
+ * XXX Seems rather complicated and works poorly for wide ranges (with outlier
+ * values), brin_minmax_count_overlaps is likely better.
+ */
+static int
+brin_minmax_count_overlaps2(BrinRanges *ranges,
+						   BrinRange **minranges, BrinRange **maxranges,
+						   TypeCacheEntry *typcache)
+{
+	int noverlaps;
+
+	TimestampTz		start_ts = GetCurrentTimestamp();
+
+	/*
+	 * Walk the ranges ordered by max_values, see how many ranges overlap.
+	 *
+	 * Once we get to a state where (min_value > current.max_value) for
+	 * all future ranges, we know none of them can overlap and we can
+	 * terminate. This is what min_index_lowest is for.
+	 *
+	 * XXX If there are very wide ranges (with outlier min/max values),
+	 * the min_index_lowest is going to be pretty useless, because the
+	 * range will be sorted at the very end by max_value, but will have
+	 * very low min_index, so this won't work.
+	 *
+	 * XXX We could collect a more elaborate stuff, like for example a
+	 * histogram of number of overlaps, or maximum number of overlaps.
+	 * So we'd have average, but then also an info if there are some
+	 * ranges with very many overlaps.
+	 */
+	noverlaps = 0;
+	for (int i = 0; i < ranges->nranges; i++)
+	{
+		int			idx = i+1;
+		BrinRange *ra = maxranges[i];
+		uint64		min_index = ra->min_index;
+
+#ifdef NOT_USED
+		/*
+		 * XXX Not needed, we can just count "future" ranges and then
+		 * we just multiply by 2.
+		 */
+
+		/*
+		 * What's the first range that might overlap with this one?
+		 * needs to have maxval > current.minval.
+		 */
+		while (idx > 0)
+		{
+			BrinRange *rb = maxranges[idx - 1];
+
+			/* the range is before the current one, so can't intersect */
+			if (range_values_cmp(&rb->max_value, &ra->min_value, typcache) < 0)
+				break;
+
+			idx--;
+		}
+#endif
+
+		/*
+		 * Find the first min_index that is higher than the max_value,
+		 * so that we can compare that instead of the values in the
+		 * next loop. There should be fewer value comparisons than in
+		 * the next loop, so we'll save on function calls.
+		 */
+		while (min_index < ranges->nranges)
+		{
+			if (range_values_cmp(&minranges[min_index]->min_value,
+								 &ra->max_value, typcache) > 0)
+				break;
+
+			min_index++;
+		}
+
+		/*
+		 * Walk the following ranges (ordered by max_value), and check
+		 * if it overlaps. If it matches, we look at the next one. If
+		 * not, we check if there can be more ranges.
+		 */
+		for (int j = idx; j < ranges->nranges; j++)
+		{
+			BrinRange *rb = maxranges[j];
+
+			/* the range overlaps - just continue with the next one */
+			// if (range_values_cmp(&rb->min_value, &ra->max_value, typcache) <= 0)
+			if (rb->min_index < min_index)
+			{
+				noverlaps++;
+				continue;
+			}
+
+			/*
+			 * Are there any future ranges that might overlap? We can
+			 * check the min_index_lowest to decide quickly.
+			 */
+			 if (rb->min_index_lowest >= min_index)
+					break;
+		}
+	}
+
+	/*
+	 * We only count intersect for "following" ranges when ordered by maxval,
+	 * so we only see 1/2 the overlaps. So double the result.
+	 */
+	noverlaps *= 2;
+
+	elog(WARNING, "----- brin_minmax_count_overlaps2 -----");
+	elog(WARNING, "noverlaps = %d", noverlaps);
+	elog(WARNING, "duration = %ld", TimestampDifferenceMilliseconds(start_ts,
+									GetCurrentTimestamp()));
+
+	return noverlaps;
+}
+
+/*
+ * brin_minmax_count_overlaps_bruteforce
+ *		Calculate number of overlaps by brute force.
+ *
+ * Actually compares every range to every other range. Quite expensive, used
+ * primarily to cross-check the other algorithms. 
+ */
+static int
+brin_minmax_count_overlaps_bruteforce(BrinRanges *ranges, TypeCacheEntry *typcache)
+{
+	int noverlaps;
+
+	TimestampTz		start_ts = GetCurrentTimestamp();
+
+	/*
+	 * Brute force calculation of overlapping ranges, comparing each
+	 * range to every other range - bound to be pretty expensive, as
+	 * it's pretty much O(N^2). Kept mostly for easy cross-check with
+	 * the preceding "optimized" code.
+	 */
+	noverlaps = 0;
+	for (int i = 0; i < ranges->nranges; i++)
+	{
+		BrinRange *ra = &ranges->ranges[i];
+
+		for (int j = 0; j < ranges->nranges; j++)
+		{
+			BrinRange *rb = &ranges->ranges[j];
+
+			if (i == j)
+				continue;
+
+			if (range_values_cmp(&ra->max_value, &rb->min_value, typcache) < 0)
+				continue;
+
+			if (range_values_cmp(&rb->max_value, &ra->min_value, typcache) < 0)
+				continue;
+
+			elog(DEBUG1, "[%ld,%ld] overlaps [%ld,%ld]",
+				 ra->min_value, ra->max_value,
+				 rb->min_value, rb->max_value);
+
+			noverlaps++;
+		}
+	}
+
+	elog(WARNING, "----- brin_minmax_count_overlaps_bruteforce -----");
+	elog(WARNING, "noverlaps = %d", noverlaps);
+	elog(WARNING, "duration = %ld", TimestampDifferenceMilliseconds(start_ts,
+									GetCurrentTimestamp()));
+
+	return noverlaps;
+}
+#endif
+
+/*
+ * brin_minmax_match_tuples_to_ranges
+ *		Match tuples to ranges, count average number of ranges per tuple.
+ *
+ * Alternative to brin_minmax_match_tuples_to_ranges2, leveraging ordering
+ * of values, not ranges.
+ *
+ * XXX This seems like the optimal way to do this.
+ */
+static void
+brin_minmax_match_tuples_to_ranges(BrinRanges *ranges,
+								   int numrows, HeapTuple *rows,
+								   int nvalues, Datum *values,
+								   TypeCacheEntry *typcache,
+								   int *res_nmatches,
+								   int *res_nmatches_unique,
+								   int *res_nvalues_unique)
+{
+	int		nmatches = 0;
+	int		nmatches_unique = 0;
+	int		nvalues_unique = 0;
+	int		nmatches_value = 0;
+
+	int	   *unique = (int *) palloc0(sizeof(int) * nvalues);
+
+#ifdef STATS_CROSS_CHECK
+	TimestampTz		start_ts = GetCurrentTimestamp();
+#endif
+
+	/*
+	 * Build running count of unique values. We know there are unique[i]
+	 * unique values in values array up to index "i".
+	 */
+	unique[0] = 1;
+	for (int i = 1; i < nvalues; i++)
+	{
+		if (range_values_cmp(&values[i-1], &values[i], typcache) == 0)
+			unique[i] = unique[i-1];
+		else
+			unique[i] = unique[i-1] + 1;
+	}
+
+	nvalues_unique = unique[nvalues-1];
+
+	/*
+	 * Walk the ranges, for each range determine the first/last mapping
+	 * value. Use the "unique" array to count the unique values.
+	 */
+	for (int i = 0; i < ranges->nranges; i++)
+	{
+		int		start;
+		int		end;
+
+		start = lower_bound(values, nvalues, ranges->ranges[i].min_value, typcache);
+		end = upper_bound(values, nvalues, ranges->ranges[i].max_value, typcache);
+
+		Assert(end > start);
+
+		nmatches_value = (end - start);
+		nmatches_unique += (unique[end-1] - unique[start] + 1);
+
+		nmatches += nmatches_value;
+	}
+
+#ifdef STATS_CROSS_CHECK
+	elog(WARNING, "----- brin_minmax_match_tuples_to_ranges -----");
+	elog(WARNING, "nmatches = %d %f", nmatches, (double) nmatches / numrows);
+	elog(WARNING, "nmatches unique = %d %d %f", nmatches_unique, nvalues_unique,
+		 (double) nmatches_unique / nvalues_unique);
+	elog(WARNING, "duration = %ld", TimestampDifferenceMilliseconds(start_ts,
+									GetCurrentTimestamp()));
+#endif
+
+	*res_nmatches = nmatches;
+	*res_nmatches_unique = nmatches_unique;
+	*res_nvalues_unique = nvalues_unique;
+}
+
+#ifdef STATS_CROSS_CHECK
+/*
+ * brin_minmax_match_tuples_to_ranges2
+ *		Match tuples to ranges, count average number of ranges per tuple.
+ *
+ * Match sample tuples to the ranges, so that we can count how many ranges
+ * a value matches on average. This might seem redundant to the number of
+ * overlaps, because the value is ~avg_overlaps/2.
+ *
+ * Imagine ranges arranged in "shifted" uniformly by 1/overlaps, e.g. with 3
+ * overlaps [0,100], [33,133], [66, 166] and so on. A random value will hit
+ * only half of there ranges, thus 1/2. This can be extended to randomly
+ * overlapping ranges.
+ *
+ * However, we may not be able to count overlaps for some opclasses (e.g. for
+ * bloom ranges), in which case we have at least this.
+ *
+ * This simply walks the values, and determines matching ranges by looking
+ * for lower/upper bound in ranges ordered by minval/maxval.
+ *
+ * XXX The other question is what to do about duplicate values. If we have a
+ * very frequent value in the sample, it's likely in many places/ranges. Which
+ * will skew the average, because it'll be added repeatedly. So we also count
+ * avg_ranges for unique values.
+ *
+ * XXX The relationship that (average_matches ~ average_overlaps/2) only
+ * works for minmax opclass, and can't be extended to minmax-multi. The
+ * overlaps can only consider the two extreme values (essentially treating
+ * the summary as a single minmax range), because that's what brinsort
+ * needs. But the minmax-multi range may have "gaps" (kinda the whole point
+ * of these opclasses), which affects matching tuples to ranges.
+ *
+ * XXX This also builds histograms of the number of matches, both for the
+ * raw and unique values. At the moment we don't do anything with the
+ * results, though (except for printing those).
+ */
+static void
+brin_minmax_match_tuples_to_ranges2(BrinRanges *ranges,
+								    BrinRange **minranges, BrinRange **maxranges,
+								    int numrows, HeapTuple *rows,
+								    int nvalues, Datum *values,
+								    TypeCacheEntry *typcache,
+								    int *res_nmatches,
+								    int *res_nmatches_unique,
+								    int *res_nvalues_unique)
+{
+	int		nmatches = 0;
+	int		nmatches_unique = 0;
+	int		nvalues_unique = 0;
+	histogram_t *hist = histogram_init();
+	histogram_t *hist_unique = histogram_init();
+	int		nmatches_value = 0;
+
+	TimestampTz		start_ts = GetCurrentTimestamp();
+
+	for (int i = 0; i < nvalues; i++)
+	{
+		int		start;
+		int		end;
+
+		/*
+		 * Same value as preceding, so just use the preceding count.
+		 * We don't increment the unique counters, because this is
+		 * a duplicate.
+		 */
+		if ((i > 0) && (range_values_cmp(&values[i-1], &values[i], typcache) == 0))
+		{
+			nmatches += nmatches_value;
+			hist = histogram_add(hist, nmatches_value);
+			continue;
+		}
+
+		nmatches_value = 0;
+
+		start = maxval_start(maxranges, ranges->nranges, values[i], typcache);
+		end = minval_end(minranges, ranges->nranges, values[i], typcache);
+
+		for (int j = start; j < ranges->nranges; j++)
+		{
+			if (maxranges[j]->min_index >= end)
+				continue;
+
+			if (maxranges[j]->min_index_lowest >= end)
+				break;
+
+			nmatches_value++;
+		}
+
+		hist = histogram_add(hist, nmatches_value);
+		hist_unique = histogram_add(hist_unique, nmatches_value);
+
+		nmatches += nmatches_value;
+		nmatches_unique += nmatches_value;
+		nvalues_unique++;
+	}
+
+	elog(WARNING, "----- brin_minmax_match_tuples_to_ranges2 -----");
+	elog(WARNING, "nmatches = %d %f", nmatches, (double) nmatches / numrows);
+	elog(WARNING, "nmatches unique = %d %d %f",
+		 nmatches_unique, nvalues_unique, (double) nmatches_unique / nvalues_unique);
+	elog(WARNING, "duration = %ld", TimestampDifferenceMilliseconds(start_ts,
+									GetCurrentTimestamp()));
+
+	pg_qsort(hist->bins, hist->nbins, sizeof(histogram_bin_t), histogram_bin_cmp);
+	pg_qsort(hist_unique->bins, hist_unique->nbins, sizeof(histogram_bin_t), histogram_bin_cmp);
+
+	histogram_print(hist);
+	histogram_print(hist_unique);
+
+	pfree(hist);
+	pfree(hist_unique);
+
+	*res_nmatches = nmatches;
+	*res_nmatches_unique = nmatches_unique;
+	*res_nvalues_unique = nvalues_unique;
+}
+
+/*
+ * brin_minmax_match_tuples_to_ranges_bruteforce
+ *		Match tuples to ranges, count average number of ranges per tuple.
+ *
+ * Bruteforce approach, used mostly for cross-checking.
+ */
+static void
+brin_minmax_match_tuples_to_ranges_bruteforce(BrinRanges *ranges,
+											  int numrows, HeapTuple *rows,
+											  int nvalues, Datum *values,
+											  TypeCacheEntry *typcache,
+											  int *res_nmatches,
+											  int *res_nmatches_unique,
+											  int *res_nvalues_unique)
+{
+	int nmatches = 0;
+	int nmatches_unique = 0;
+	int nvalues_unique = 0;
+
+	TimestampTz		start_ts = GetCurrentTimestamp();
+
+	for (int i = 0; i < nvalues; i++)
+	{
+		bool	is_unique;
+		int		nmatches_value = 0;
+
+		/* is this a new value? */
+		is_unique = ((i == 0) || (range_values_cmp(&values[i-1], &values[i], typcache) != 0));
+
+		/* count unique values */
+		nvalues_unique += (is_unique) ? 1 : 0;
+
+		for (int j = 0; j < ranges->nranges; j++)
+		{
+			if (range_values_cmp(&values[i], &ranges->ranges[j].min_value, typcache) < 0)
+				continue;
+
+			if (range_values_cmp(&values[i], &ranges->ranges[j].max_value, typcache) > 0)
+				continue;
+
+			nmatches_value++;
+		}
+
+		nmatches += nmatches_value;
+		nmatches_unique += (is_unique) ? nmatches_value : 0;
+	}
+
+	elog(WARNING, "----- brin_minmax_match_tuples_to_ranges_bruteforce -----");
+	elog(WARNING, "nmatches = %d %f", nmatches, (double) nmatches / numrows);
+	elog(WARNING, "nmatches unique = %d %d %f", nmatches_unique, nvalues_unique,
+		 (double) nmatches_unique / nvalues_unique);
+	elog(WARNING, "duration = %ld", TimestampDifferenceMilliseconds(start_ts,
+									GetCurrentTimestamp()));
+
+	*res_nmatches = nmatches;
+	*res_nmatches_unique = nmatches_unique;
+	*res_nvalues_unique = nvalues_unique;
+}
+#endif
+
+/*
+ * brin_minmax_stats
+ *		Calculate custom statistics for a BRIN minmax index.
+ */
+Datum
+brin_minmax_stats(PG_FUNCTION_ARGS)
+{
+	Relation		heapRel = (Relation) PG_GETARG_POINTER(0);
+	Relation		indexRel = (Relation) PG_GETARG_POINTER(1);
+	AttrNumber		attnum = PG_GETARG_INT16(2);
+	AttrNumber		heap_attnum = PG_GETARG_INT16(3);
+	HeapTuple	   *rows = (HeapTuple *) PG_GETARG_POINTER(4);
+	int				numrows = PG_GETARG_INT32(5);
+
+	BrinOpaque *opaque;
+	BlockNumber nblocks;
+	BlockNumber	nranges;
+	BlockNumber	heapBlk;
+	BrinMemTuple *dtup;
+	BrinTuple  *btup = NULL;
+	Size		btupsz = 0;
+	Buffer		buf = InvalidBuffer;
+	BrinRanges  *ranges;
+	BlockNumber	pagesPerRange;
+	BrinDesc	   *bdesc;
+	BrinMinmaxStats *stats;
+
+	Oid				typoid;
+	TypeCacheEntry *typcache;
+	BrinRange	  **minranges,
+				  **maxranges;
+	int64			noverlaps;
+	int64			prev_min_index;
+
+	/*
+	 * Mostly what brinbeginscan does to initialize BrinOpaque, except that
+	 * we use active snapshot instead of the scan snapshot.
+	 */
+	opaque = palloc_object(BrinOpaque);
+	opaque->bo_rmAccess = brinRevmapInitialize(indexRel,
+											   &opaque->bo_pagesPerRange,
+											   GetActiveSnapshot());
+	opaque->bo_bdesc = brin_build_desc(indexRel);
+
+	bdesc = opaque->bo_bdesc;
+	pagesPerRange = opaque->bo_pagesPerRange;
+
+	/* make sure the provided attnum is valid */
+	Assert((attnum > 0) && (attnum <= bdesc->bd_tupdesc->natts));
+
+	/*
+	 * We need to know the size of the table so that we know how long to iterate
+	 * on the revmap (and to pre-allocate the arrays).
+	 */
+	nblocks = RelationGetNumberOfBlocks(heapRel);
+
+	/*
+	 * How many ranges can there be? We simply look at the number of pages,
+	 * divide it by the pages_per_range.
+	 *
+	 * XXX We need to be careful not to overflow nranges, so we just divide
+	 * and then maybe add 1 for partial ranges.
+	 */
+	nranges = (nblocks / pagesPerRange);
+	if (nblocks % pagesPerRange != 0)
+		nranges += 1;
+
+	/* allocate for space, and also for the alternative ordering */
+	ranges = palloc0(offsetof(BrinRanges, ranges) + nranges * sizeof(BrinRange));
+	ranges->nranges = 0;
+
+	/* allocate an initial in-memory tuple, out of the per-range memcxt */
+	dtup = brin_new_memtuple(bdesc);
+
+	/* result stats */
+	stats = palloc0(sizeof(BrinMinmaxStats));
+	SET_VARSIZE(stats, sizeof(BrinMinmaxStats));
+
+	/*
+	 * Now scan the revmap.  We start by querying for heap page 0,
+	 * incrementing by the number of pages per range; this gives us a full
+	 * view of the table.
+	 *
+	 * XXX We count the ranges, and count the special types (not summarized,
+	 * all-null and has-null). The regular ranges are accumulated into an
+	 * array, so that we can calculate additional statistics (overlaps, hits
+	 * for sample tuples, etc).
+	 *
+	 * XXX This needs rethinking to make it work with large indexes with more
+	 * ranges than we can fit into memory (work_mem/maintenance_work_mem).
+	 */
+	for (heapBlk = 0; heapBlk < nblocks; heapBlk += pagesPerRange)
+	{
+		bool		gottuple = false;
+		BrinTuple  *tup;
+		OffsetNumber off;
+		Size		size;
+
+		stats->n_ranges++;
+
+		CHECK_FOR_INTERRUPTS();
+
+		tup = brinGetTupleForHeapBlock(opaque->bo_rmAccess, heapBlk, &buf,
+									   &off, &size, BUFFER_LOCK_SHARE,
+									   GetActiveSnapshot());
+		if (tup)
+		{
+			gottuple = true;
+			btup = brin_copy_tuple(tup, size, btup, &btupsz);
+			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+		}
+
+		/* Ranges with no indexed tuple are ignored for overlap analysis. */
+		if (!gottuple)
+		{
+			continue;
+		}
+		else
+		{
+			dtup = brin_deform_tuple(bdesc, btup, dtup);
+			if (dtup->bt_placeholder)
+			{
+				/* Placeholders can be ignored too, as if not summarized. */
+				continue;
+			}
+			else
+			{
+				BrinValues *bval;
+
+				bval = &dtup->bt_columns[attnum - 1];
+
+				/* OK this range is summarized */
+				stats->n_summarized++;
+
+				if (bval->bv_allnulls)
+					stats->n_all_nulls++;
+
+				if (bval->bv_hasnulls)
+					stats->n_has_nulls++;
+
+				if (!bval->bv_allnulls)
+				{
+					BrinRange  *range;
+
+					range = &ranges->ranges[ranges->nranges++];
+
+					range->blkno_start = heapBlk;
+					range->blkno_end = heapBlk + (pagesPerRange - 1);
+
+					range->min_value = bval->bv_values[0];
+					range->max_value = bval->bv_values[1];
+				}
+			}
+		}
+	}
+
+	if (buf != InvalidBuffer)
+		ReleaseBuffer(buf);
+
+	elog(WARNING, "extracted ranges %d from BRIN index", ranges->nranges);
+
+	/* if we have no regular ranges, we're done */
+	if (ranges->nranges == 0)
+		goto cleanup;
+
+	/*
+	 * Build auxiliary info to optimize the calculation.
+	 *
+	 * We have ranges in the blocknum order, but that is not very useful when
+	 * calculating which ranges interstect - we could cross-check every range
+	 * against every other range, but that's O(N^2) and thus may get extremely
+	 * expensive pretty quick).
+	 *
+	 * To make that cheaper, we'll build two orderings, allowing us to quickly
+	 * eliminate ranges that can't possibly overlap:
+	 *
+	 * - minranges = ranges ordered by min_value
+	 * - maxranges = ranges ordered by max_value
+	 *
+	 * To count intersections, we'll then walk maxranges (i.e. ranges ordered
+	 * by maxval), and for each following range we'll check if it overlaps.
+	 * If yes, we'll proceed to the next one, until we find a range that does
+	 * not overlap. But there might be a later page overlapping - but we can
+	 * use a min_index_lowest tracking the minimum min_index for "future"
+	 * ranges to quickly decide if there are such ranges. If there are none,
+	 * we can terminate (and proceed to the next maxranges element), else we
+	 * have to process additional ranges.
+	 *
+	 * Note: This only counts overlaps with ranges with max_value higher than
+	 * the current one - we want to count all, but the overlaps with preceding
+	 * ranges have already been counted when processing those preceding ranges.
+	 * That is, we'll end up with counting each overlap just for one of those
+	 * ranges, so we get only 1/2 the count.
+	 *
+	 * Note: We don't count the range as overlapping with itself. This needs
+	 * to be considered later, when applying the statistics.
+	 *
+	 *
+	 * XXX This will not work for very many ranges - we can have up to 2^32 of
+	 * them, so allocating a ~32B struct for each would need a lot of memory.
+	 * Not sure what to do about that, perhaps we could sample a couple ranges
+	 * and do some calculations based on that? That is, we could process all
+	 * ranges up to some number (say, statistics_target * 300, as for rows), and
+	 * then sample ranges for larger tables. Then sort the sampled ranges, and
+	 * walk through all ranges once, comparing them to the sample and counting
+	 * overlaps (having them sorted should allow making this quite efficient,
+	 * I think - following algorithm similar to the one implemented here).
+	 */
+
+	/* info about ordering for the data type */
+	typoid = get_atttype(RelationGetRelid(indexRel), attnum);
+	typcache = lookup_type_cache(typoid, TYPECACHE_CMP_PROC_FINFO);
+
+	/* shouldn't happen, I think - we use this to build the index */
+	Assert(OidIsValid(typcache->cmp_proc_finfo.fn_oid));
+
+	minranges = (BrinRange **) palloc0(ranges->nranges * sizeof(BrinRanges *));
+	maxranges = (BrinRange **) palloc0(ranges->nranges * sizeof(BrinRanges *));
+
+	/*
+	 * Build and sort the ranges min_value / max_value (just pointers
+	 * to the main array). Then go and assign the min_index to each
+	 * range, and finally walk the maxranges array backwards and track
+	 * the min_index_lowest as minimum of "future" indexes.
+	 */
+	for (int i = 0; i < ranges->nranges; i++)
+	{
+		minranges[i] = &ranges->ranges[i];
+		maxranges[i] = &ranges->ranges[i];
+	}
+
+	qsort_arg(minranges, ranges->nranges, sizeof(BrinRange *),
+			  range_minval_cmp, typcache);
+
+	qsort_arg(maxranges, ranges->nranges, sizeof(BrinRange *),
+			  range_maxval_cmp, typcache);
+
+	/*
+	 * Update the min_index for each range. If the values are equal, be sure to
+	 * pick the lowest index with that min_value.
+	 */
+	minranges[0]->min_index = 0;
+	for (int i = 1; i < ranges->nranges; i++)
+	{
+		if (range_values_cmp(&minranges[i]->min_value, &minranges[i-1]->min_value, typcache) == 0)
+			minranges[i]->min_index = minranges[i-1]->min_index;
+		else
+			minranges[i]->min_index = i;
+	}
+
+	/*
+	 * Walk the maxranges backward and assign the min_index_lowest as
+	 * a running minimum.
+	 */
+	prev_min_index = ranges->nranges;
+	for (int i = (ranges->nranges - 1); i >= 0; i--)
+	{
+		maxranges[i]->min_index_lowest = Min(maxranges[i]->min_index,
+											 prev_min_index);
+		prev_min_index = maxranges[i]->min_index_lowest;
+	}
+
+	noverlaps = brin_minmax_count_overlaps(minranges, ranges->nranges, typcache);
+
+	/* calculate average number of overlapping ranges for any range */
+	stats->avg_overlaps = (double) noverlaps / ranges->nranges;
+
+#ifdef STATS_CROSS_CHECK
+	brin_minmax_count_overlaps2(ranges, minranges, maxranges, typcache);
+	brin_minmax_count_overlaps_bruteforce(ranges, typcache);
+#endif
+
+	/* match tuples to ranges */
+	{
+		int		nvalues = 0;
+		int		nmatches,
+				nmatches_unique,
+				nvalues_unique;
+
+		Datum  *values = (Datum *) palloc0(numrows * sizeof(Datum));
+
+		TupleDesc	tdesc = RelationGetDescr(heapRel);
+
+		for (int i = 0; i < numrows; i++)
+		{
+			bool	isnull;
+			Datum	value;
+
+			value = heap_getattr(rows[i], heap_attnum, tdesc, &isnull);
+			if (!isnull)
+				values[nvalues++] = value;
+		}
+
+		qsort_arg(values, nvalues, sizeof(Datum), range_values_cmp, typcache);
+
+		/* optimized algorithm */
+		brin_minmax_match_tuples_to_ranges(ranges,
+										   numrows, rows, nvalues, values,
+										   typcache,
+										   &nmatches,
+										   &nmatches_unique,
+										   &nvalues_unique);
+
+		stats->avg_matches = (double) nmatches / numrows;
+		stats->avg_matches_unique = (double) nmatches_unique / nvalues_unique;
+
+#ifdef STATS_CROSS_CHECK
+		brin_minmax_match_tuples_to_ranges2(ranges, minranges, maxranges,
+										    numrows, rows, nvalues, values,
+										    typcache,
+										    &nmatches,
+										    &nmatches_unique,
+										    &nvalues_unique);
+
+		brin_minmax_match_tuples_to_ranges_bruteforce(ranges,
+													  numrows, rows,
+													  nvalues, values,
+													  typcache,
+													  &nmatches,
+													  &nmatches_unique,
+													  &nvalues_unique);
+#endif
+	}
+
+	/*
+	 * Possibly quite large, so release explicitly and don't rely
+	 * on the memory context to discard this.
+	 */
+	pfree(minranges);
+	pfree(maxranges);
+
+cleanup:
+	/* possibly quite large, so release explicitly */
+	pfree(ranges);
+
+	/* free the BrinOpaque, just like brinendscan() would */
+	brinRevmapTerminate(opaque->bo_rmAccess);
+	brin_free_desc(opaque->bo_bdesc);
+
+	PG_RETURN_POINTER(stats);
+}
+
 /*
  * Cache and return the procedure for the given strategy.
  *
diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c
index ff1354812bd..b7435194dc0 100644
--- a/src/backend/commands/analyze.c
+++ b/src/backend/commands/analyze.c
@@ -16,6 +16,7 @@
 
 #include <math.h>
 
+#include "access/brin_internal.h"
 #include "access/detoast.h"
 #include "access/genam.h"
 #include "access/multixact.h"
@@ -30,6 +31,7 @@
 #include "catalog/catalog.h"
 #include "catalog/index.h"
 #include "catalog/indexing.h"
+#include "catalog/pg_am.h"
 #include "catalog/pg_collation.h"
 #include "catalog/pg_inherits.h"
 #include "catalog/pg_namespace.h"
@@ -81,6 +83,7 @@ typedef struct AnlIndexData
 
 /* Default statistics target (GUC parameter) */
 int			default_statistics_target = 100;
+bool		enable_indexam_stats = false;
 
 /* A few variables that don't seem worth passing around as parameters */
 static MemoryContext anl_context = NULL;
@@ -92,7 +95,7 @@ static void do_analyze_rel(Relation onerel,
 						   AcquireSampleRowsFunc acquirefunc, BlockNumber relpages,
 						   bool inh, bool in_outer_xact, int elevel);
 static void compute_index_stats(Relation onerel, double totalrows,
-								AnlIndexData *indexdata, int nindexes,
+								AnlIndexData *indexdata, Relation *indexRels, int nindexes,
 								HeapTuple *rows, int numrows,
 								MemoryContext col_context);
 static VacAttrStats *examine_attribute(Relation onerel, int attnum,
@@ -454,15 +457,49 @@ do_analyze_rel(Relation onerel, VacuumParams *params,
 		{
 			AnlIndexData *thisdata = &indexdata[ind];
 			IndexInfo  *indexInfo;
+			bool		collectAmStats;
+			Oid			regproc;
 
 			thisdata->indexInfo = indexInfo = BuildIndexInfo(Irel[ind]);
 			thisdata->tupleFract = 1.0; /* fix later if partial */
-			if (indexInfo->ii_Expressions != NIL && va_cols == NIL)
+
+			/*
+			 * Should we collect AM-specific statistics for any of the columns?
+			 *
+			 * If AM-specific statistics are enabled (using a GUC), see if we
+			 * have an optional support procedure to build the statistics.
+			 *
+			 * If there's any such attribute, we just force building stats
+			 * even for regular index keys (not just expressions) and indexes
+			 * without predicates. It'd be good to only build the AM stats, but
+			 * for now this is good enough.
+			 *
+			 * XXX The GUC is there morestly to make it easier to enable/disable
+			 * this during development.
+			 *
+			 * FIXME Only build the AM statistics, not the other stats. And only
+			 * do that for the keys with the optional procedure. not all of them.
+			 */
+			collectAmStats = false;
+			if (enable_indexam_stats && (Irel[ind]->rd_indam->amstatsprocnum != 0))
+			{
+				for (int j = 0; j < indexInfo->ii_NumIndexAttrs; j++)
+				{
+					regproc = index_getprocid(Irel[ind], (j+1), Irel[ind]->rd_indam->amstatsprocnum);
+					if (OidIsValid(regproc))
+					{
+						collectAmStats = true;
+						break;
+					}
+				}
+			}
+
+			if ((indexInfo->ii_Expressions != NIL || collectAmStats) && va_cols == NIL)
 			{
 				ListCell   *indexpr_item = list_head(indexInfo->ii_Expressions);
 
 				thisdata->vacattrstats = (VacAttrStats **)
-					palloc(indexInfo->ii_NumIndexAttrs * sizeof(VacAttrStats *));
+					palloc0(indexInfo->ii_NumIndexAttrs * sizeof(VacAttrStats *));
 				tcnt = 0;
 				for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
 				{
@@ -483,6 +520,12 @@ do_analyze_rel(Relation onerel, VacuumParams *params,
 						if (thisdata->vacattrstats[tcnt] != NULL)
 							tcnt++;
 					}
+					else
+					{
+						thisdata->vacattrstats[tcnt] =
+							examine_attribute(Irel[ind], i + 1, NULL);
+						tcnt++;
+					}
 				}
 				thisdata->attr_cnt = tcnt;
 			}
@@ -588,7 +631,7 @@ do_analyze_rel(Relation onerel, VacuumParams *params,
 
 		if (nindexes > 0)
 			compute_index_stats(onerel, totalrows,
-								indexdata, nindexes,
+								indexdata, Irel, nindexes,
 								rows, numrows,
 								col_context);
 
@@ -822,12 +865,82 @@ do_analyze_rel(Relation onerel, VacuumParams *params,
 	anl_context = NULL;
 }
 
+/*
+ * compute_indexam_stats
+ *		Call the optional procedure to compute AM-specific statistics.
+ *
+ * We simply call the procedure, which is expected to produce a bytea value.
+ *
+ * At the moment this only deals with BRIN indexes, and bails out for other
+ * access methods, but it should be generic - use something like amoptsprocnum
+ * and just check if the procedure exists.
+ */
+static void
+compute_indexam_stats(Relation onerel,
+					  Relation indexRel, IndexInfo *indexInfo,
+					  double totalrows, AnlIndexData *indexdata,
+					  HeapTuple *rows, int numrows)
+{
+	if (!enable_indexam_stats)
+		return;
+
+	/* ignore index AMs without the optional procedure */
+	if (indexRel->rd_indam->amstatsprocnum == 0)
+		return;
+
+	/*
+	 * Look at attributes, and calculate stats for those that have the
+	 * optional stats proc for the opfamily.
+	 */
+	for (int i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
+	{
+		AttrNumber		attno = (i + 1);
+		AttrNumber		attnum = indexInfo->ii_IndexAttrNumbers[i];	/* heap attnum */
+		RegProcedure	regproc;
+		FmgrInfo	   *statsproc;
+		Datum			datum;
+		VacAttrStats   *stats;
+		MemoryContext	oldcxt;
+
+		/* do this first, as it doesn't fail when proc not defined */
+		regproc = index_getprocid(indexRel, attno, indexRel->rd_indam->amstatsprocnum);
+
+		/* ignore opclasses without the optional procedure */
+		if (!RegProcedureIsValid(regproc))
+			continue;
+
+		statsproc = index_getprocinfo(indexRel, attno, indexRel->rd_indam->amstatsprocnum);
+
+		stats = indexdata->vacattrstats[i];
+
+		if (statsproc != NULL)
+			elog(WARNING, "collecting stats on BRIN ranges %p using proc %p attnum %d",
+				 indexRel, statsproc, attno);
+
+		oldcxt = MemoryContextSwitchTo(stats->anl_context);
+
+		/* call the proc, let the AM calculate whatever it wants */
+		datum = FunctionCall6Coll(statsproc,
+								  InvalidOid, /* FIXME correct collation */
+								  PointerGetDatum(onerel),
+								  PointerGetDatum(indexRel),
+								  Int16GetDatum(attno),
+								  Int16GetDatum(attnum),
+								  PointerGetDatum(rows),
+								  Int32GetDatum(numrows));
+
+		stats->staindexam = datum;
+
+		MemoryContextSwitchTo(oldcxt);
+	}
+}
+
 /*
  * Compute statistics about indexes of a relation
  */
 static void
 compute_index_stats(Relation onerel, double totalrows,
-					AnlIndexData *indexdata, int nindexes,
+					AnlIndexData *indexdata, Relation *indexRels, int nindexes,
 					HeapTuple *rows, int numrows,
 					MemoryContext col_context)
 {
@@ -847,6 +960,7 @@ compute_index_stats(Relation onerel, double totalrows,
 	{
 		AnlIndexData *thisdata = &indexdata[ind];
 		IndexInfo  *indexInfo = thisdata->indexInfo;
+		Relation	indexRel = indexRels[ind];
 		int			attr_cnt = thisdata->attr_cnt;
 		TupleTableSlot *slot;
 		EState	   *estate;
@@ -859,6 +973,13 @@ compute_index_stats(Relation onerel, double totalrows,
 					rowno;
 		double		totalindexrows;
 
+		/*
+		 * If this is a BRIN index, try calling a procedure to collect
+		 * extra opfamily-specific statistics (if procedure defined).
+		 */
+		compute_indexam_stats(onerel, indexRel, indexInfo, totalrows,
+							  thisdata, rows, numrows);
+
 		/* Ignore index if no columns to analyze and not partial */
 		if (attr_cnt == 0 && indexInfo->ii_Predicate == NIL)
 			continue;
@@ -1661,6 +1782,13 @@ update_attstats(Oid relid, bool inh, int natts, VacAttrStats **vacattrstats)
 		values[Anum_pg_statistic_stanullfrac - 1] = Float4GetDatum(stats->stanullfrac);
 		values[Anum_pg_statistic_stawidth - 1] = Int32GetDatum(stats->stawidth);
 		values[Anum_pg_statistic_stadistinct - 1] = Float4GetDatum(stats->stadistinct);
+
+		/* optional AM-specific stats */
+		if (DatumGetPointer(stats->staindexam) != NULL)
+			values[Anum_pg_statistic_staindexam - 1] = stats->staindexam;
+		else
+			nulls[Anum_pg_statistic_staindexam - 1] = true;
+
 		i = Anum_pg_statistic_stakind1 - 1;
 		for (k = 0; k < STATISTIC_NUM_SLOTS; k++)
 		{
diff --git a/src/backend/utils/adt/selfuncs.c b/src/backend/utils/adt/selfuncs.c
index 69e0fb98f5b..dec1eb43e1f 100644
--- a/src/backend/utils/adt/selfuncs.c
+++ b/src/backend/utils/adt/selfuncs.c
@@ -7715,6 +7715,7 @@ brincostestimate(PlannerInfo *root, IndexPath *path, double loop_count,
 	Relation	indexRel;
 	ListCell   *l;
 	VariableStatData vardata;
+	double		averageOverlaps;
 
 	Assert(rte->rtekind == RTE_RELATION);
 
@@ -7762,6 +7763,7 @@ brincostestimate(PlannerInfo *root, IndexPath *path, double loop_count,
 	 * correlation statistics, we will keep it as 0.
 	 */
 	*indexCorrelation = 0;
+	averageOverlaps = 0.0;
 
 	foreach(l, path->indexclauses)
 	{
@@ -7771,6 +7773,36 @@ brincostestimate(PlannerInfo *root, IndexPath *path, double loop_count,
 		/* attempt to lookup stats in relation for this index column */
 		if (attnum != 0)
 		{
+			/*
+			 * If AM-specific statistics are enabled, try looking up the stats
+			 * for the index key. We only have this for minmax opclasses, so
+			 * we just cast it like that. But other BRIN opclasses might need
+			 * other stats so either we need to abstract this somehow, or maybe
+			 * just collect a sufficiently generic stats for all BRIN indexes.
+			 *
+			 * XXX Make this non-minmax specific.
+			 */
+			if (enable_indexam_stats)
+			{
+				BrinMinmaxStats  *amstats
+					= (BrinMinmaxStats *) get_attindexam(index->indexoid, attnum);
+
+				if (amstats)
+				{
+					elog(WARNING, "found AM stats: attnum %d n_ranges %ld n_summarized %ld n_all_nulls %ld n_has_nulls %ld avg_overlaps %f",
+						 attnum, amstats->n_ranges, amstats->n_summarized,
+						 amstats->n_all_nulls, amstats->n_has_nulls,
+						 amstats->avg_overlaps);
+
+					/*
+					 * The only thing we use at the moment is the average number
+					 * of overlaps for a single range. Use the other stuff too.
+					 */
+					averageOverlaps = Max(averageOverlaps,
+										  1.0 + amstats->avg_overlaps);
+				}
+			}
+
 			/* Simple variable -- look to stats for the underlying table */
 			if (get_relation_stats_hook &&
 				(*get_relation_stats_hook) (root, rte, attnum, &vardata))
@@ -7851,6 +7883,14 @@ brincostestimate(PlannerInfo *root, IndexPath *path, double loop_count,
 											 baserel->relid,
 											 JOIN_INNER, NULL);
 
+	/*
+	 * XXX Can we combine qualSelectivity with the average number of matching
+	 * ranges per value? qualSelectivity estimates how many tuples ar we
+	 * going to match, and average number of matches says how many ranges
+	 * will each of those match on average. We don't know how many will
+	 * be duplicate, but it gives us a worst-case estimate, at least.
+	 */
+
 	/*
 	 * Now calculate the minimum possible ranges we could match with if all of
 	 * the rows were in the perfect order in the table's heap.
@@ -7867,6 +7907,25 @@ brincostestimate(PlannerInfo *root, IndexPath *path, double loop_count,
 	else
 		estimatedRanges = Min(minimalRanges / *indexCorrelation, indexRanges);
 
+	elog(WARNING, "before index AM stats: cestimatedRanges = %f", estimatedRanges);
+
+	/*
+	 * If we found some AM stats, look at average number of overlapping ranges,
+	 * and apply that to the currently estimated ranges.
+	 *
+	 * XXX We pretty much combine this with correlation info (because it was
+	 * already applied in the estimatedRanges formula above), which might be
+	 * overly pessimistic. The overlaps stats seems somewhat redundant with
+	 * the correlation, so maybe we should do just one? The AM stats seems
+	 * like a more reliable information, because the correlation is not very
+	 * sensitive to outliers, for example. So maybe let's prefer that, and
+	 * only use the correlation as fallback when AM stats are not available?
+	 */
+	if (averageOverlaps > 0.0)
+		estimatedRanges = Min(estimatedRanges * averageOverlaps, indexRanges);
+
+	elog(WARNING, "after index AM stats: cestimatedRanges = %f", estimatedRanges);
+
 	/* we expect to visit this portion of the table */
 	selec = estimatedRanges / indexRanges;
 
diff --git a/src/backend/utils/cache/lsyscache.c b/src/backend/utils/cache/lsyscache.c
index a16a63f4957..1725f5af347 100644
--- a/src/backend/utils/cache/lsyscache.c
+++ b/src/backend/utils/cache/lsyscache.c
@@ -3138,6 +3138,47 @@ get_attavgwidth(Oid relid, AttrNumber attnum)
 	return 0;
 }
 
+
+/*
+ * get_attstaindexam
+ *
+ *	  Given the table and attribute number of a column, get the index AM
+ *	  statistics.  Return NULL if no data available.
+ *
+ * Currently this is only consulted for individual tables, not for inheritance
+ * trees, so we don't need an "inh" parameter.
+ */
+bytea *
+get_attindexam(Oid relid, AttrNumber attnum)
+{
+	HeapTuple	tp;
+
+	tp = SearchSysCache3(STATRELATTINH,
+						 ObjectIdGetDatum(relid),
+						 Int16GetDatum(attnum),
+						 BoolGetDatum(false));
+	if (HeapTupleIsValid(tp))
+	{
+		Datum	val;
+		bytea  *retval = NULL;
+		bool	isnull;
+
+		val = SysCacheGetAttr(STATRELATTINH, tp,
+							  Anum_pg_statistic_staindexam,
+							  &isnull);
+
+		if (!isnull)
+			retval = (bytea *) PG_DETOAST_DATUM(val);
+
+		// staindexam = ((Form_pg_statistic) GETSTRUCT(tp))->staindexam;
+		ReleaseSysCache(tp);
+
+		return retval;
+	}
+
+	return NULL;
+}
+
 /*
  * get_attstatsslot
  *
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 05ab087934c..06dfeb6cd8b 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -967,6 +967,16 @@ struct config_bool ConfigureNamesBool[] =
 		true,
 		NULL, NULL, NULL
 	},
+	{
+		{"enable_indexam_stats", PGC_USERSET, QUERY_TUNING_METHOD,
+			gettext_noop("Enables the planner's use of index AM stats."),
+			NULL,
+			GUC_EXPLAIN
+		},
+		&enable_indexam_stats,
+		false,
+		NULL, NULL, NULL
+	},
 	{
 		{"geqo", PGC_USERSET, QUERY_TUNING_GEQO,
 			gettext_noop("Enables genetic query optimization."),
diff --git a/src/include/access/amapi.h b/src/include/access/amapi.h
index 1dc674d2305..8437c2f0e71 100644
--- a/src/include/access/amapi.h
+++ b/src/include/access/amapi.h
@@ -216,6 +216,8 @@ typedef struct IndexAmRoutine
 	uint16		amsupport;
 	/* opclass options support function number or 0 */
 	uint16		amoptsprocnum;
+	/* opclass statistics support function number or 0 */
+	uint16		amstatsprocnum;
 	/* does AM support ORDER BY indexed column's value? */
 	bool		amcanorder;
 	/* does AM support ORDER BY result of an operator on indexed column? */
diff --git a/src/include/access/brin.h b/src/include/access/brin.h
index 887fb0a5532..0989bfa96cd 100644
--- a/src/include/access/brin.h
+++ b/src/include/access/brin.h
@@ -34,6 +34,52 @@ typedef struct BrinStatsData
 	BlockNumber revmapNumPages;
 } BrinStatsData;
 
+/*
+ * Info about ranges for BRIN Sort.
+ */
+typedef struct BrinRange
+{
+	BlockNumber blkno_start;
+	BlockNumber blkno_end;
+
+	Datum	min_value;
+	Datum	max_value;
+	bool	has_nulls;
+	bool	all_nulls;
+	bool	not_summarized;
+
+	/*
+	 * Index of the range when ordered by min_value (if there are multiple
+	 * ranges with the same min_value, it's the lowest one).
+	 */
+	uint32	min_index;
+
+	/*
+	 * Minimum min_index from all ranges with higher max_value (i.e. when
+	 * sorted by max_value). If there are multiple ranges with the same
+	 * max_value, it depends on the ordering (i.e. the ranges may get
+	 * different min_index_lowest, depending on the exact ordering).
+	 */
+	uint32	min_index_lowest;
+} BrinRange;
+
+typedef struct BrinRanges
+{
+	int			nranges;
+	BrinRange	ranges[FLEXIBLE_ARRAY_MEMBER];
+} BrinRanges;
+
+typedef struct BrinMinmaxStats
+{
+	int32		vl_len_;		/* varlena header (do not touch directly!) */
+	int64		n_ranges;
+	int64		n_summarized;
+	int64		n_all_nulls;
+	int64		n_has_nulls;
+	double		avg_overlaps;
+	double		avg_matches;
+	double		avg_matches_unique;
+} BrinMinmaxStats;
 
 #define BRIN_DEFAULT_PAGES_PER_RANGE	128
 #define BrinGetPagesPerRange(relation) \
diff --git a/src/include/access/brin_internal.h b/src/include/access/brin_internal.h
index 25186609272..ee6c6f9b709 100644
--- a/src/include/access/brin_internal.h
+++ b/src/include/access/brin_internal.h
@@ -73,6 +73,7 @@ typedef struct BrinDesc
 #define BRIN_PROCNUM_UNION			4
 #define BRIN_MANDATORY_NPROCS		4
 #define BRIN_PROCNUM_OPTIONS 		5	/* optional */
+#define BRIN_PROCNUM_STATISTICS		6	/* optional */
 /* procedure numbers up to 10 are reserved for BRIN future expansion */
 #define BRIN_FIRST_OPTIONAL_PROCNUM 11
 #define BRIN_LAST_OPTIONAL_PROCNUM	15
diff --git a/src/include/catalog/pg_amproc.dat b/src/include/catalog/pg_amproc.dat
index 4cc129bebd8..ea3de9bcba1 100644
--- a/src/include/catalog/pg_amproc.dat
+++ b/src/include/catalog/pg_amproc.dat
@@ -804,6 +804,8 @@
   amproc => 'brin_minmax_consistent' },
 { amprocfamily => 'brin/bytea_minmax_ops', amproclefttype => 'bytea',
   amprocrighttype => 'bytea', amprocnum => '4', amproc => 'brin_minmax_union' },
+{ amprocfamily => 'brin/bytea_minmax_ops', amproclefttype => 'bytea',
+  amprocrighttype => 'bytea', amprocnum => '6', amproc => 'brin_minmax_stats' },
 
 # bloom bytea
 { amprocfamily => 'brin/bytea_bloom_ops', amproclefttype => 'bytea',
@@ -835,6 +837,8 @@
   amproc => 'brin_minmax_consistent' },
 { amprocfamily => 'brin/char_minmax_ops', amproclefttype => 'char',
   amprocrighttype => 'char', amprocnum => '4', amproc => 'brin_minmax_union' },
+{ amprocfamily => 'brin/char_minmax_ops', amproclefttype => 'char',
+  amprocrighttype => 'char', amprocnum => '6', amproc => 'brin_minmax_stats' },
 
 # bloom "char"
 { amprocfamily => 'brin/char_bloom_ops', amproclefttype => 'char',
@@ -864,6 +868,8 @@
   amproc => 'brin_minmax_consistent' },
 { amprocfamily => 'brin/name_minmax_ops', amproclefttype => 'name',
   amprocrighttype => 'name', amprocnum => '4', amproc => 'brin_minmax_union' },
+{ amprocfamily => 'brin/name_minmax_ops', amproclefttype => 'name',
+  amprocrighttype => 'name', amprocnum => '6', amproc => 'brin_minmax_stats' },
 
 # bloom name
 { amprocfamily => 'brin/name_bloom_ops', amproclefttype => 'name',
@@ -893,6 +899,8 @@
   amproc => 'brin_minmax_consistent' },
 { amprocfamily => 'brin/integer_minmax_ops', amproclefttype => 'int8',
   amprocrighttype => 'int8', amprocnum => '4', amproc => 'brin_minmax_union' },
+{ amprocfamily => 'brin/integer_minmax_ops', amproclefttype => 'int8',
+  amprocrighttype => 'int8', amprocnum => '6', amproc => 'brin_minmax_stats' },
 
 { amprocfamily => 'brin/integer_minmax_ops', amproclefttype => 'int2',
   amprocrighttype => 'int2', amprocnum => '1',
@@ -905,6 +913,8 @@
   amproc => 'brin_minmax_consistent' },
 { amprocfamily => 'brin/integer_minmax_ops', amproclefttype => 'int2',
   amprocrighttype => 'int2', amprocnum => '4', amproc => 'brin_minmax_union' },
+{ amprocfamily => 'brin/integer_minmax_ops', amproclefttype => 'int2',
+  amprocrighttype => 'int2', amprocnum => '6', amproc => 'brin_minmax_stats' },
 
 { amprocfamily => 'brin/integer_minmax_ops', amproclefttype => 'int4',
   amprocrighttype => 'int4', amprocnum => '1',
@@ -917,6 +927,8 @@
   amproc => 'brin_minmax_consistent' },
 { amprocfamily => 'brin/integer_minmax_ops', amproclefttype => 'int4',
   amprocrighttype => 'int4', amprocnum => '4', amproc => 'brin_minmax_union' },
+{ amprocfamily => 'brin/integer_minmax_ops', amproclefttype => 'int4',
+  amprocrighttype => 'int4', amprocnum => '6', amproc => 'brin_minmax_stats' },
 
 # minmax multi integer: int2, int4, int8
 { amprocfamily => 'brin/integer_minmax_multi_ops', amproclefttype => 'int2',
@@ -1034,6 +1046,8 @@
   amproc => 'brin_minmax_consistent' },
 { amprocfamily => 'brin/text_minmax_ops', amproclefttype => 'text',
   amprocrighttype => 'text', amprocnum => '4', amproc => 'brin_minmax_union' },
+{ amprocfamily => 'brin/text_minmax_ops', amproclefttype => 'text',
+  amprocrighttype => 'text', amprocnum => '6', amproc => 'brin_minmax_stats' },
 
 # bloom text
 { amprocfamily => 'brin/text_bloom_ops', amproclefttype => 'text',
@@ -1062,6 +1076,8 @@
   amproc => 'brin_minmax_consistent' },
 { amprocfamily => 'brin/oid_minmax_ops', amproclefttype => 'oid',
   amprocrighttype => 'oid', amprocnum => '4', amproc => 'brin_minmax_union' },
+{ amprocfamily => 'brin/oid_minmax_ops', amproclefttype => 'oid',
+  amprocrighttype => 'oid', amprocnum => '6', amproc => 'brin_minmax_stats' },
 
 # minmax multi oid
 { amprocfamily => 'brin/oid_minmax_multi_ops', amproclefttype => 'oid',
@@ -1110,6 +1126,8 @@
   amproc => 'brin_minmax_consistent' },
 { amprocfamily => 'brin/tid_minmax_ops', amproclefttype => 'tid',
   amprocrighttype => 'tid', amprocnum => '4', amproc => 'brin_minmax_union' },
+{ amprocfamily => 'brin/tid_minmax_ops', amproclefttype => 'tid',
+  amprocrighttype => 'tid', amprocnum => '6', amproc => 'brin_minmax_stats' },
 
 # bloom tid
 { amprocfamily => 'brin/tid_bloom_ops', amproclefttype => 'tid',
@@ -1160,6 +1178,9 @@
 { amprocfamily => 'brin/float_minmax_ops', amproclefttype => 'float4',
   amprocrighttype => 'float4', amprocnum => '4',
   amproc => 'brin_minmax_union' },
+{ amprocfamily => 'brin/float_minmax_ops', amproclefttype => 'float4',
+  amprocrighttype => 'float4', amprocnum => '6',
+  amproc => 'brin_minmax_stats' },
 
 { amprocfamily => 'brin/float_minmax_ops', amproclefttype => 'float8',
   amprocrighttype => 'float8', amprocnum => '1',
@@ -1173,6 +1194,9 @@
 { amprocfamily => 'brin/float_minmax_ops', amproclefttype => 'float8',
   amprocrighttype => 'float8', amprocnum => '4',
   amproc => 'brin_minmax_union' },
+{ amprocfamily => 'brin/float_minmax_ops', amproclefttype => 'float8',
+  amprocrighttype => 'float8', amprocnum => '6',
+  amproc => 'brin_minmax_stats' },
 
 # minmax multi float
 { amprocfamily => 'brin/float_minmax_multi_ops', amproclefttype => 'float4',
@@ -1261,6 +1285,9 @@
 { amprocfamily => 'brin/macaddr_minmax_ops', amproclefttype => 'macaddr',
   amprocrighttype => 'macaddr', amprocnum => '4',
   amproc => 'brin_minmax_union' },
+{ amprocfamily => 'brin/macaddr_minmax_ops', amproclefttype => 'macaddr',
+  amprocrighttype => 'macaddr', amprocnum => '6',
+  amproc => 'brin_minmax_stats' },
 
 # minmax multi macaddr
 { amprocfamily => 'brin/macaddr_minmax_multi_ops', amproclefttype => 'macaddr',
@@ -1314,6 +1341,9 @@
 { amprocfamily => 'brin/macaddr8_minmax_ops', amproclefttype => 'macaddr8',
   amprocrighttype => 'macaddr8', amprocnum => '4',
   amproc => 'brin_minmax_union' },
+{ amprocfamily => 'brin/macaddr8_minmax_ops', amproclefttype => 'macaddr8',
+  amprocrighttype => 'macaddr8', amprocnum => '6',
+  amproc => 'brin_minmax_stats' },
 
 # minmax multi macaddr8
 { amprocfamily => 'brin/macaddr8_minmax_multi_ops',
@@ -1366,6 +1396,8 @@
   amproc => 'brin_minmax_consistent' },
 { amprocfamily => 'brin/network_minmax_ops', amproclefttype => 'inet',
   amprocrighttype => 'inet', amprocnum => '4', amproc => 'brin_minmax_union' },
+{ amprocfamily => 'brin/network_minmax_ops', amproclefttype => 'inet',
+  amprocrighttype => 'inet', amprocnum => '6', amproc => 'brin_minmax_stats' },
 
 # minmax multi inet
 { amprocfamily => 'brin/network_minmax_multi_ops', amproclefttype => 'inet',
@@ -1436,6 +1468,9 @@
 { amprocfamily => 'brin/bpchar_minmax_ops', amproclefttype => 'bpchar',
   amprocrighttype => 'bpchar', amprocnum => '4',
   amproc => 'brin_minmax_union' },
+{ amprocfamily => 'brin/bpchar_minmax_ops', amproclefttype => 'bpchar',
+  amprocrighttype => 'bpchar', amprocnum => '6',
+  amproc => 'brin_minmax_stats' },
 
 # bloom character
 { amprocfamily => 'brin/bpchar_bloom_ops', amproclefttype => 'bpchar',
@@ -1467,6 +1502,8 @@
   amproc => 'brin_minmax_consistent' },
 { amprocfamily => 'brin/time_minmax_ops', amproclefttype => 'time',
   amprocrighttype => 'time', amprocnum => '4', amproc => 'brin_minmax_union' },
+{ amprocfamily => 'brin/time_minmax_ops', amproclefttype => 'time',
+  amprocrighttype => 'time', amprocnum => '6', amproc => 'brin_minmax_stats' },
 
 # minmax multi time without time zone
 { amprocfamily => 'brin/time_minmax_multi_ops', amproclefttype => 'time',
@@ -1517,6 +1554,9 @@
 { amprocfamily => 'brin/datetime_minmax_ops', amproclefttype => 'timestamp',
   amprocrighttype => 'timestamp', amprocnum => '4',
   amproc => 'brin_minmax_union' },
+{ amprocfamily => 'brin/datetime_minmax_ops', amproclefttype => 'timestamp',
+  amprocrighttype => 'timestamp', amprocnum => '6',
+  amproc => 'brin_minmax_stats' },
 
 { amprocfamily => 'brin/datetime_minmax_ops', amproclefttype => 'timestamptz',
   amprocrighttype => 'timestamptz', amprocnum => '1',
@@ -1530,6 +1570,9 @@
 { amprocfamily => 'brin/datetime_minmax_ops', amproclefttype => 'timestamptz',
   amprocrighttype => 'timestamptz', amprocnum => '4',
   amproc => 'brin_minmax_union' },
+{ amprocfamily => 'brin/datetime_minmax_ops', amproclefttype => 'timestamptz',
+  amprocrighttype => 'timestamptz', amprocnum => '6',
+  amproc => 'brin_minmax_stats' },
 
 { amprocfamily => 'brin/datetime_minmax_ops', amproclefttype => 'date',
   amprocrighttype => 'date', amprocnum => '1',
@@ -1542,6 +1585,8 @@
   amproc => 'brin_minmax_consistent' },
 { amprocfamily => 'brin/datetime_minmax_ops', amproclefttype => 'date',
   amprocrighttype => 'date', amprocnum => '4', amproc => 'brin_minmax_union' },
+{ amprocfamily => 'brin/datetime_minmax_ops', amproclefttype => 'date',
+  amprocrighttype => 'date', amprocnum => '6', amproc => 'brin_minmax_stats' },
 
 # minmax multi datetime (date, timestamp, timestamptz)
 { amprocfamily => 'brin/datetime_minmax_multi_ops',
@@ -1668,6 +1713,9 @@
 { amprocfamily => 'brin/interval_minmax_ops', amproclefttype => 'interval',
   amprocrighttype => 'interval', amprocnum => '4',
   amproc => 'brin_minmax_union' },
+{ amprocfamily => 'brin/interval_minmax_ops', amproclefttype => 'interval',
+  amprocrighttype => 'interval', amprocnum => '6',
+  amproc => 'brin_minmax_stats' },
 
 # minmax multi interval
 { amprocfamily => 'brin/interval_minmax_multi_ops',
@@ -1721,6 +1769,9 @@
 { amprocfamily => 'brin/timetz_minmax_ops', amproclefttype => 'timetz',
   amprocrighttype => 'timetz', amprocnum => '4',
   amproc => 'brin_minmax_union' },
+{ amprocfamily => 'brin/timetz_minmax_ops', amproclefttype => 'timetz',
+  amprocrighttype => 'timetz', amprocnum => '6',
+  amproc => 'brin_minmax_stats' },
 
 # minmax multi time with time zone
 { amprocfamily => 'brin/timetz_minmax_multi_ops', amproclefttype => 'timetz',
@@ -1771,6 +1822,8 @@
   amproc => 'brin_minmax_consistent' },
 { amprocfamily => 'brin/bit_minmax_ops', amproclefttype => 'bit',
   amprocrighttype => 'bit', amprocnum => '4', amproc => 'brin_minmax_union' },
+{ amprocfamily => 'brin/bit_minmax_ops', amproclefttype => 'bit',
+  amprocrighttype => 'bit', amprocnum => '6', amproc => 'brin_minmax_stats' },
 
 # minmax bit varying
 { amprocfamily => 'brin/varbit_minmax_ops', amproclefttype => 'varbit',
@@ -1785,6 +1838,9 @@
 { amprocfamily => 'brin/varbit_minmax_ops', amproclefttype => 'varbit',
   amprocrighttype => 'varbit', amprocnum => '4',
   amproc => 'brin_minmax_union' },
+{ amprocfamily => 'brin/varbit_minmax_ops', amproclefttype => 'varbit',
+  amprocrighttype => 'varbit', amprocnum => '6',
+  amproc => 'brin_minmax_stats' },
 
 # minmax numeric
 { amprocfamily => 'brin/numeric_minmax_ops', amproclefttype => 'numeric',
@@ -1799,6 +1855,9 @@
 { amprocfamily => 'brin/numeric_minmax_ops', amproclefttype => 'numeric',
   amprocrighttype => 'numeric', amprocnum => '4',
   amproc => 'brin_minmax_union' },
+{ amprocfamily => 'brin/numeric_minmax_ops', amproclefttype => 'numeric',
+  amprocrighttype => 'numeric', amprocnum => '6',
+  amproc => 'brin_minmax_stats' },
 
 # minmax multi numeric
 { amprocfamily => 'brin/numeric_minmax_multi_ops', amproclefttype => 'numeric',
@@ -1851,6 +1910,8 @@
   amproc => 'brin_minmax_consistent' },
 { amprocfamily => 'brin/uuid_minmax_ops', amproclefttype => 'uuid',
   amprocrighttype => 'uuid', amprocnum => '4', amproc => 'brin_minmax_union' },
+{ amprocfamily => 'brin/uuid_minmax_ops', amproclefttype => 'uuid',
+  amprocrighttype => 'uuid', amprocnum => '6', amproc => 'brin_minmax_stats' },
 
 # minmax multi uuid
 { amprocfamily => 'brin/uuid_minmax_multi_ops', amproclefttype => 'uuid',
@@ -1924,6 +1985,9 @@
 { amprocfamily => 'brin/pg_lsn_minmax_ops', amproclefttype => 'pg_lsn',
   amprocrighttype => 'pg_lsn', amprocnum => '4',
   amproc => 'brin_minmax_union' },
+{ amprocfamily => 'brin/pg_lsn_minmax_ops', amproclefttype => 'pg_lsn',
+  amprocrighttype => 'pg_lsn', amprocnum => '6',
+  amproc => 'brin_minmax_stats' },
 
 # minmax multi pg_lsn
 { amprocfamily => 'brin/pg_lsn_minmax_multi_ops', amproclefttype => 'pg_lsn',
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 62a5b8e655d..1dd9177b01c 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -8407,6 +8407,10 @@
 { oid => '3386', descr => 'BRIN minmax support',
   proname => 'brin_minmax_union', prorettype => 'bool',
   proargtypes => 'internal internal internal', prosrc => 'brin_minmax_union' },
+{ oid => '9979', descr => 'BRIN minmax support',
+  proname => 'brin_minmax_stats', prorettype => 'bool',
+  proargtypes => 'internal internal int2 int2 internal int4',
+  prosrc => 'brin_minmax_stats' },
 
 # BRIN minmax multi
 { oid => '4616', descr => 'BRIN multi minmax support',
diff --git a/src/include/catalog/pg_statistic.h b/src/include/catalog/pg_statistic.h
index cdf74481398..7043b169f7c 100644
--- a/src/include/catalog/pg_statistic.h
+++ b/src/include/catalog/pg_statistic.h
@@ -121,6 +121,11 @@ CATALOG(pg_statistic,2619,StatisticRelationId)
 	anyarray	stavalues3;
 	anyarray	stavalues4;
 	anyarray	stavalues5;
+
+	/*
+	 * Statistics calculated by index AM (e.g. BRIN for ranges, etc.).
+	 */
+	bytea		staindexam;
 #endif
 } FormData_pg_statistic;
 
diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h
index 5d816ba7f4e..319f7d4aadc 100644
--- a/src/include/commands/vacuum.h
+++ b/src/include/commands/vacuum.h
@@ -155,6 +155,7 @@ typedef struct VacAttrStats
 	float4	   *stanumbers[STATISTIC_NUM_SLOTS];
 	int			numvalues[STATISTIC_NUM_SLOTS];
 	Datum	   *stavalues[STATISTIC_NUM_SLOTS];
+	Datum		staindexam;		/* index-specific stats (as bytea) */
 
 	/*
 	 * These fields describe the stavalues[n] element types. They will be
@@ -258,6 +259,7 @@ extern PGDLLIMPORT int vacuum_multixact_freeze_min_age;
 extern PGDLLIMPORT int vacuum_multixact_freeze_table_age;
 extern PGDLLIMPORT int vacuum_failsafe_age;
 extern PGDLLIMPORT int vacuum_multixact_failsafe_age;
+extern PGDLLIMPORT bool enable_indexam_stats;
 
 /* Variables for cost-based parallel vacuum */
 extern PGDLLIMPORT pg_atomic_uint32 *VacuumSharedCostBalance;
diff --git a/src/include/utils/lsyscache.h b/src/include/utils/lsyscache.h
index 50f02883052..71ce5b15d74 100644
--- a/src/include/utils/lsyscache.h
+++ b/src/include/utils/lsyscache.h
@@ -185,6 +185,7 @@ extern Oid	getBaseType(Oid typid);
 extern Oid	getBaseTypeAndTypmod(Oid typid, int32 *typmod);
 extern int32 get_typavgwidth(Oid typid, int32 typmod);
 extern int32 get_attavgwidth(Oid relid, AttrNumber attnum);
+extern bytea *get_attindexam(Oid relid, AttrNumber attnum);
 extern bool get_attstatsslot(AttStatsSlot *sslot, HeapTuple statstuple,
 							 int reqkind, Oid reqop, int flags);
 extern void free_attstatsslot(AttStatsSlot *sslot);

Reply via email to