On 15/03/2019 20:25, Andrey Borodin wrote:
On 11 March 2019, at 20:03, Heikki Linnakangas <hlinn...@iki.fi>
wrote:

On 10/03/2019 18:40, Andrey Borodin wrote:
One thing still bothers me. Let's assume that we have an internal
page with 2 deletable leaves. We lock these leaves in the order of
items on the internal page. Is it possible that the 2nd page has a
follow-right link to the 1st, and someone will lock the 2nd page, try
to lock the 1st, and deadlock with VACUUM?

Hmm. If the follow-right flag is set on a page, it means that its
right sibling doesn't have a downlink in the parent yet.
Nevertheless, I think I'd sleep better if we acquired the locks in
left-to-right order, to be safe.
Actually, I did not find lock coupling in the GiST code. But I decided
to lock just two pages at once (leaf, then parent, for every pair).
PFA v22 with this concurrency logic.

Good. I just noticed that the README actually does say explicitly that the child must be locked before the parent.

I rebased this over the new IntegerSet implementation, from the other thread, and did another round of refactoring, cleanups, etc. Attached is a new version of this patch. I'm also including the IntegerSet patch here, for convenience, but it's the same patch I posted at [1].

It's in pretty good shape, but one remaining issue that needs to be fixed:

During Hot Standby, the B-tree code writes a WAL record when a deleted page is recycled, to prevent the deletion from being replayed too early on the hot standby. See _bt_getbuf() and btree_xlog_reuse_page(). I think we need to do something similar in GiST.

I'll try fixing that tomorrow, unless you beat me to it. Making the changes is pretty straightforward, but it's a bit cumbersome to test.

[1] https://www.postgresql.org/message-id/1035d8e6-cfd1-0c27-8902-40d8d45eb...@iki.fi

- Heikki
>From 4c05c69020334babdc1aa406c5032ae2861e5cb5 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakan...@iki.fi>
Date: Wed, 20 Mar 2019 02:26:08 +0200
Subject: [PATCH 1/2] Add IntegerSet, to hold large sets of 64-bit ints
 efficiently.

The set is implemented as a B-tree, with a compact representation of the
leaf items using the Simple-8b algorithm, so that clusters of nearby values
take less space.

This doesn't include any use of the code yet, but we have two patches in
the works that would benefit from this:

* the GiST vacuum patch, to track empty GiST pages and internal GiST pages.

* Reducing memory usage, and also allowing more than 1 GB of memory to be
  used, to hold the dead TIDs in VACUUM.

This includes a unit test module, in src/test/modules/test_integerset.
It can be used to verify correctness, as a regression test, but if you run
it manually, it can also print memory usage and execution time of some of
the tests.

Author: Heikki Linnakangas, Andrey Borodin
Discussion: https://www.postgresql.org/message-id/b5e82599-1966-5783-733c-1a947ddb7...@iki.fi
---
 src/backend/lib/Makefile                      |    2 +-
 src/backend/lib/README                        |    2 +
 src/backend/lib/integerset.c                  | 1039 +++++++++++++++++
 src/include/lib/integerset.h                  |   25 +
 src/test/modules/Makefile                     |    1 +
 src/test/modules/test_integerset/.gitignore   |    4 +
 src/test/modules/test_integerset/Makefile     |   21 +
 src/test/modules/test_integerset/README       |    7 +
 .../expected/test_integerset.out              |   14 +
 .../test_integerset/sql/test_integerset.sql   |   11 +
 .../test_integerset/test_integerset--1.0.sql  |    8 +
 .../modules/test_integerset/test_integerset.c |  622 ++++++++++
 .../test_integerset/test_integerset.control   |    4 +
 13 files changed, 1759 insertions(+), 1 deletion(-)
 create mode 100644 src/backend/lib/integerset.c
 create mode 100644 src/include/lib/integerset.h
 create mode 100644 src/test/modules/test_integerset/.gitignore
 create mode 100644 src/test/modules/test_integerset/Makefile
 create mode 100644 src/test/modules/test_integerset/README
 create mode 100644 src/test/modules/test_integerset/expected/test_integerset.out
 create mode 100644 src/test/modules/test_integerset/sql/test_integerset.sql
 create mode 100644 src/test/modules/test_integerset/test_integerset--1.0.sql
 create mode 100644 src/test/modules/test_integerset/test_integerset.c
 create mode 100644 src/test/modules/test_integerset/test_integerset.control

diff --git a/src/backend/lib/Makefile b/src/backend/lib/Makefile
index 191ea9bca26..3c1ee1df83a 100644
--- a/src/backend/lib/Makefile
+++ b/src/backend/lib/Makefile
@@ -13,6 +13,6 @@ top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
 OBJS = binaryheap.o bipartite_match.o bloomfilter.o dshash.o hyperloglog.o \
-       ilist.o knapsack.o pairingheap.o rbtree.o stringinfo.o
+       ilist.o integerset.o knapsack.o pairingheap.o rbtree.o stringinfo.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/lib/README b/src/backend/lib/README
index ae5debe1bc6..f2fb591237d 100644
--- a/src/backend/lib/README
+++ b/src/backend/lib/README
@@ -13,6 +13,8 @@ hyperloglog.c - a streaming cardinality estimator
 
 ilist.c - single and double-linked lists
 
+integerset.c - a data structure for holding a large set of integers
+
 knapsack.c - knapsack problem solver
 
 pairingheap.c - a pairing heap
diff --git a/src/backend/lib/integerset.c b/src/backend/lib/integerset.c
new file mode 100644
index 00000000000..c9172fa2005
--- /dev/null
+++ b/src/backend/lib/integerset.c
@@ -0,0 +1,1039 @@
+/*-------------------------------------------------------------------------
+ *
+ * integerset.c
+ *	  Data structure to hold a large set of 64-bit integers efficiently
+ *
+ * IntegerSet provides an in-memory data structure to hold a set of
+ * arbitrary 64-bit integers.  Internally, the values are stored in a
+ * B-tree, with a special packed representation at the leaf level using
+ * the Simple-8b algorithm, which can pack clusters of nearby values
+ * very tightly.
+ *
+ * Memory consumption depends on the number of values stored, but also
+ * on how far the values are from each other.  In the best case, with
+ * long runs of consecutive integers, memory consumption can be as low as
+ * 0.1 bytes per integer.  In the worst case, if integers are more than
+ * 2^32 apart, it uses about 8 bytes per integer.  In typical use, the
+ * consumption per integer is somewhere between those extremes, depending
+ * on the range of integers stored, and how "clustered" they are.
+ *
+ *
+ * Interface
+ * ---------
+ *
+ *	intset_create			- Create a new empty set.
+ *	intset_add_member		- Add an integer to the set.
+ *	intset_is_member		- Test if an integer is in the set
+ *	intset_begin_iterate	- Begin iterating through all integers in set
+ *	intset_iterate_next		- Return next integer
+ *
+ *
+ * Limitations
+ * -----------
+ *
+ * - Values must be added in order.  (Random insertions would require
+ *   splitting nodes, which hasn't been implemented.)
+ *
+ * - Values cannot be added while iteration is in progress.
+ *
+ * - No support for removing values.
+ *
+ * None of these limitations are fundamental to the data structure, and
+ * could be lifted if needed, by writing some new code.  But the current
+ * users of this facility don't need them.
+ *
+ *
+ * References
+ * ----------
+ *
+ * Simple-8b encoding is based on:
+ *
+ * Vo Ngoc Anh , Alistair Moffat, Index compression using 64-bit words,
+ *   Software - Practice & Experience, v.40 n.2, p.131-147, February 2010
+ *   (https://doi.org/10.1002/spe.948)
+ *
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *	  src/backend/lib/integerset.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/htup_details.h"
+#include "lib/integerset.h"
+#include "port/pg_bitutils.h"
+#include "utils/memutils.h"
+
+
+/*
+ * Properties of Simple-8b encoding.  (These are needed here, before
+ * other definitions, so that we can size other arrays accordingly).
+ *
+ * SIMPLE8B_MAX_VALUE is the greatest integer that can be encoded.  Simple-8B
+ * uses 64-bit words, but uses four bits to indicate the "mode" of the
+ * codeword, leaving at most 60 bits for the actual integer.
+ *
+ * SIMPLE8B_MAX_VALUES_PER_CODEWORD is the maximum number of integers that
+ * can be encoded in a single codeword.
+ */
+#define SIMPLE8B_MAX_VALUE		((UINT64CONST(1) << 60) - 1)
+#define SIMPLE8B_MAX_VALUES_PER_CODEWORD 240
+
+/*
+ * Parameters for shape of the in-memory B-tree.
+ *
+ * These set the size of each internal and leaf node.  They don't necessarily
+ * need to be the same, because the tree is just an in-memory structure.
+ * With the default 64, each node is about 1 kb.
+ *
+ * If you change these, you must recalculate MAX_TREE_LEVELS, too!
+ */
+#define MAX_INTERNAL_ITEMS	64
+#define MAX_LEAF_ITEMS	64
+
+/*
+ * Maximum height of the tree.
+ *
+ * MAX_TREE_LEVELS is calculated from the "fan-out" of the B-tree.  The
+ * theoretical maximum number of items that we can store in a set is 2^64,
+ * so MAX_TREE_LEVELS should be set so that:
+ *
+ *   MAX_LEAF_ITEMS * MAX_INTERNAL_ITEMS ^ (MAX_TREE_LEVELS - 1) >= 2^64.
+ *
+ * In practice, we'll need far fewer levels, because you will run out of
+ * memory long before reaching that number, but let's be conservative.
+ */
+#define MAX_TREE_LEVELS		11
+
+/*
+ * Node structures, for the in-memory B-tree.
+ *
+ * An internal node holds a number of downlink pointers to leaf nodes, or
+ * to internal nodes on lower level.  For each downlink, the key value
+ * corresponding the lower level node is stored in a sorted array.  The
+ * stored key values are low keys.  In other words, if the downlink has value
+ * X, then all items stored on that child are >= X.
+ *
+ * Each leaf node holds a number of "items", with a varying number of
+ * integers packed into each item.  Each item consists of two 64-bit words:
+ * The first word holds the first integer stored in the item, in plain format.
+ * The second word contains between 0 and 240 more integers, packed using
+ * Simple-8b encoding.  By storing the first integer in plain, unpacked,
+ * format, we can use binary search to quickly find an item that holds (or
+ * would hold) a particular integer.  And by storing the rest in packed form,
+ * we still get pretty good memory density, if there are clusters of integers
+ * with similar values.
+ *
+ * Each leaf node also has a pointer to the next leaf node, so that the leaf
+ * nodes can be easily walked from beginning to end, when iterating.
+ */
+typedef struct intset_node intset_node;
+typedef struct intset_leaf_node intset_leaf_node;
+typedef struct intset_internal_node intset_internal_node;
+
+/* Common structure of both leaf and internal nodes. */
+struct intset_node
+{
+	uint16		level;
+	uint16		num_items;
+};
+
+/* Internal node */
+struct intset_internal_node
+{
+	/* common header, must match intset_node */
+	uint16		level;			/* >= 1 on internal nodes */
+	uint16		num_items;
+
+	/*
+	 * 'values' is an array of key values, and 'downlinks' are pointers
+	 * to lower-level nodes, corresponding to the key values.
+	 */
+	uint64		values[MAX_INTERNAL_ITEMS];
+	intset_node *downlinks[MAX_INTERNAL_ITEMS];
+};
+
+/* Leaf node */
+typedef struct
+{
+	uint64		first;		/* first integer in this item */
+	uint64		codeword;	/* simple8b encoded differences from 'first' */
+} leaf_item;
+
+#define MAX_VALUES_PER_LEAF_ITEM	(1 + SIMPLE8B_MAX_VALUES_PER_CODEWORD)
+
+struct intset_leaf_node
+{
+	/* common header, must match intset_node */
+	uint16		level;			/* 0 on leaf nodes */
+	uint16		num_items;
+
+	intset_leaf_node *next;	/* right sibling, if any */
+
+	leaf_item	items[MAX_LEAF_ITEMS];
+};
+
+/*
+ * We buffer insertions in a simple array, before packing and inserting them
+ * into the B-tree.  MAX_BUFFERED_VALUES sets the size of the buffer.  The
+ * encoder assumes that it is large enough, that we can always fill a leaf
+ * item with buffered new items.  In other words, MAX_BUFFERED_VALUES must be
+ * larger than MAX_VALUES_PER_LEAF_ITEM.
+ */
+#define MAX_BUFFERED_VALUES			(MAX_VALUES_PER_LEAF_ITEM * 2)
+
+/*
+ * IntegerSet is the top-level object representing the set.
+ *
+ * The integers are stored in an in-memory B-tree structure, and an array
+ * for newly-added integers.  IntegerSet also tracks information about memory
+ * usage, as well as the current position, when iterating the set with
+ * intset_begin_iterate / intset_iterate_next.
+ */
+struct IntegerSet
+{
+	/*
+	 * 'context' is a dedicated memory context, used to hold the IntegerSet
+	 * struct itself, as well as all the tree nodes.
+	 *
+	 * 'mem_used' tracks the amount of memory used.  We don't do anything with
+	 * it in integerset.c itself, but the callers can ask for it with
+	 * intset_memory_usage().
+	 */
+	MemoryContext context;		/* memory context holding everything */
+	uint64		mem_used;		/* amount of memory used */
+
+	uint64		num_entries;	/* total # of values in the set */
+	uint64		highest_value;	/* highest value stored in this set */
+
+	/*
+	 * B-tree to hold the packed values.
+	 *
+	 * 'rightmost_nodes' holds pointers to the rightmost node on each level.
+	 * rightmost_nodes[0] is the rightmost leaf, rightmost_nodes[1] is its
+	 * parent, and so forth, all the way up to the root. These are needed when
+	 * adding new values. (Currently, we require that new values are added at
+	 * the end.)
+	 */
+	int			num_levels;		/* height of the tree */
+	intset_node *root;			/* root node */
+	intset_node *rightmost_nodes[MAX_TREE_LEVELS];
+	intset_leaf_node *leftmost_leaf;	/* leftmost leaf node */
+
+	/*
+	 * Holding area for new items that haven't been inserted to the tree yet.
+	 */
+	uint64		buffered_values[MAX_BUFFERED_VALUES];
+	int			num_buffered_values;
+
+	/*
+	 * Iterator support.
+	 *
+	 * 'iter_values' is an array of integers ready to be returned to the
+	 * caller.  'item_node' and 'item_itemno' point to the leaf node, and
+	 * item within the leaf node, to get the next batch of values from.
+	 *
+	 * Normally, 'iter_values' points to 'iter_values_buf', which holds items
+	 * decoded from a leaf item.  But after we have scanned the whole B-tree,
+	 * we iterate through all the remaining buffered values, too, by pointing
+	 * iter_values to 'buffered_values'.
+	 */
+	uint64	   *iter_values;
+	int			iter_num_values;	/* number of elements in 'iter_values' */
+	int			iter_valueno;		/* index into 'iter_values' */
+	intset_leaf_node *iter_node;	/* current leaf node */
+	int			iter_itemno;		/* next item in 'iter_node' to decode */
+
+	uint64		iter_values_buf[MAX_VALUES_PER_LEAF_ITEM];
+};
+
+/*
+ * prototypes for internal functions.
+ */
+static void intset_update_upper(IntegerSet *intset, int level,
+				 intset_node *new_node, uint64 new_node_item);
+static void intset_flush_buffered_values(IntegerSet *intset);
+
+static int intset_binsrch_uint64(uint64 value, uint64 *arr, int arr_elems,
+				bool nextkey);
+static int intset_binsrch_leaf(uint64 value, leaf_item *arr, int arr_elems,
+				bool nextkey);
+
+static uint64 simple8b_encode(uint64 *ints, int *num_encoded, uint64 base);
+static int simple8b_decode(uint64 codeword, uint64 *decoded, uint64 base);
+static bool simple8b_contains(uint64 codeword, uint64 key, uint64 base);
+
+
+/*
+ * Create a new, initially empty, integer set.
+ */
+IntegerSet *
+intset_create(void)
+{
+	MemoryContext context;
+	IntegerSet *intset;
+
+	/*
+	 * Create a new memory context to hold everything.
+	 *
+	 * We never free any nodes, so the generational allocator works well for
+	 * us.
+	 *
+	 * Use a large block size, in the hopes that if we use a lot of memory,
+	 * the libc allocator will give it back to the OS when we free it, rather
+	 * than add it to a free-list. (On glibc, see M_MMAP_THRESHOLD.  As of this
+	 * writing, the effective threshold is somewhere between 128 kB and 4 MB.)
+	 */
+	context = GenerationContextCreate(CurrentMemoryContext,
+									  "integer set",
+									  SLAB_LARGE_BLOCK_SIZE);
+
+	/* Allocate the IntegerSet object itself, in the context. */
+	intset = (IntegerSet *) MemoryContextAlloc(context, sizeof(IntegerSet));
+	intset->context = context;
+	intset->mem_used = GetMemoryChunkSpace(intset);
+
+	intset->num_entries = 0;
+	intset->highest_value = 0;
+
+	intset->num_levels = 0;
+	intset->root = NULL;
+	memset(intset->rightmost_nodes, 0, sizeof(intset->rightmost_nodes));
+	intset->leftmost_leaf = NULL;
+
+	intset->num_buffered_values = 0;
+
+	intset->iter_node = NULL;
+	intset->iter_itemno = 0;
+	intset->iter_valueno = 0;
+	intset->iter_num_values = 0;
+
+	return intset;
+}
+
+/*
+ * Allocate a new node.
+ */
+static intset_internal_node *
+intset_new_internal_node(IntegerSet *intset)
+{
+	intset_internal_node *n;
+
+	n = (intset_internal_node *) MemoryContextAlloc(intset->context,
+													sizeof(intset_internal_node));
+	intset->mem_used += GetMemoryChunkSpace(n);
+
+	n->level = 0;		/* caller must set */
+	n->num_items = 0;
+
+	return n;
+}
+
+static intset_leaf_node *
+intset_new_leaf_node(IntegerSet *intset)
+{
+	intset_leaf_node *n;
+
+	n = (intset_leaf_node *) MemoryContextAlloc(intset->context,
+												sizeof(intset_leaf_node));
+	intset->mem_used += GetMemoryChunkSpace(n);
+
+	n->level = 0;
+	n->num_items = 0;
+	n->next = NULL;
+
+	return n;
+}
+
+/*
+ * Free the integer set
+ */
+void
+intset_free(IntegerSet *intset)
+{
+	/* everything is allocated in the memory context */
+	MemoryContextDelete(intset->context);
+}
+
+/*
+ * Return the number of entries in the integer set.
+ */
+uint64
+intset_num_entries(IntegerSet *intset)
+{
+	return intset->num_entries;
+}
+
+/*
+ * Return the amount of memory used by the integer set.
+ */
+uint64
+intset_memory_usage(IntegerSet *intset)
+{
+	return intset->mem_used;
+}
+
+/*
+ * Add a value to the set.
+ *
+ * Values must be added in order.
+ */
+void
+intset_add_member(IntegerSet *intset, uint64 x)
+{
+	if (intset->iter_node)
+		elog(ERROR, "cannot add new values to integer set when iteration is in progress");
+
+	if (x <= intset->highest_value && intset->num_entries > 0)
+		elog(ERROR, "cannot add value to integer set out of order");
+
+	if (intset->num_buffered_values >= MAX_BUFFERED_VALUES)
+	{
+		/* Time to flush our buffer */
+		intset_flush_buffered_values(intset);
+		Assert(intset->num_buffered_values < MAX_BUFFERED_VALUES);
+	}
+
+	/* Add it to the buffer of newly-added values */
+	intset->buffered_values[intset->num_buffered_values] = x;
+	intset->num_buffered_values++;
+	intset->num_entries++;
+	intset->highest_value = x;
+}
+
+/*
+ * Take a batch of buffered values, and pack them into the B-tree.
+ */
+static void
+intset_flush_buffered_values(IntegerSet *intset)
+{
+	uint64	   *values = intset->buffered_values;
+	uint64		num_values = intset->num_buffered_values;
+	int			num_packed = 0;
+	intset_leaf_node *leaf;
+
+	leaf = (intset_leaf_node *) intset->rightmost_nodes[0];
+
+	/*
+	 * If the tree is completely empty, create the first leaf page, which
+	 * is also the root.
+	 */
+	if (leaf == NULL)
+	{
+		/*
+		 * This is the very first item in the set.
+		 *
+		 * Allocate root node. It's also a leaf.
+		 */
+		leaf = intset_new_leaf_node(intset);
+
+		intset->root = (intset_node *) leaf;
+		intset->leftmost_leaf = leaf;
+		intset->rightmost_nodes[0] = (intset_node *) leaf;
+		intset->num_levels = 1;
+	}
+
+	/*
+	 * If there are fewer than MAX_VALUES_PER_LEAF_ITEM values in the
+	 * buffer, stop.  In most cases, we cannot encode that many values
+	 * in a single codeword, but this way, the encoder doesn't have to
+	 * worry about running out of input.
+	 */
+	while (num_values - num_packed >= MAX_VALUES_PER_LEAF_ITEM)
+	{
+		leaf_item	item;
+		int			num_encoded;
+
+		/*
+		 * Construct the next leaf item, packing as many buffered values
+		 * as possible.
+		 */
+		item.first = values[num_packed];
+		item.codeword = simple8b_encode(&values[num_packed + 1],
+										&num_encoded,
+										item.first);
+
+		/*
+		 * Add the item to the node, allocating a new node if the old one
+		 * is full.
+		 */
+		if (leaf->num_items >= MAX_LEAF_ITEMS)
+		{
+			/* Allocate new leaf and link it to the tree */
+			intset_leaf_node *old_leaf = leaf;
+
+			leaf = intset_new_leaf_node(intset);
+			old_leaf->next = leaf;
+			intset->rightmost_nodes[0] = (intset_node *) leaf;
+			intset_update_upper(intset, 1, (intset_node *) leaf, item.first);
+		}
+		leaf->items[leaf->num_items++] = item;
+
+		num_packed += 1 + num_encoded;
+	}
+
+	/*
+	 * Move any remaining buffered values to the beginning of the array.
+	 */
+	if (num_packed < intset->num_buffered_values)
+	{
+		memmove(&intset->buffered_values[0],
+				&intset->buffered_values[num_packed],
+				(intset->num_buffered_values - num_packed) * sizeof(uint64));
+	}
+	intset->num_buffered_values -= num_packed;
+}
+
+/*
+ * Insert a downlink into parent node, after creating a new node.
+ *
+ * Recurses if the parent node is full, too.
+ */
+static void
+intset_update_upper(IntegerSet *intset, int level, intset_node *child,
+					uint64 child_key)
+{
+	intset_internal_node *parent;
+
+	Assert(level > 0);
+
+	/*
+	 * Create a new root node, if necessary.
+	 */
+	if (level >= intset->num_levels)
+	{
+		intset_node *oldroot = intset->root;
+		uint64		downlink_key;
+
+		/* MAX_TREE_LEVELS should be more than enough, this shouldn't happen */
+		if (intset->num_levels == MAX_TREE_LEVELS)
+			elog(ERROR, "could not expand integer set, maximum number of levels reached");
+		intset->num_levels++;
+
+		/*
+		 * Get the first value on the old root page, to be used as the
+		 * downlink.
+		 */
+		if (intset->root->level == 0)
+			downlink_key = ((intset_leaf_node *) oldroot)->items[0].first;
+		else
+			downlink_key = ((intset_internal_node *) oldroot)->values[0];
+
+		parent = intset_new_internal_node(intset);
+		parent->level = level;
+		parent->values[0] = downlink_key;
+		parent->downlinks[0] = oldroot;
+		parent->num_items = 1;
+
+		intset->root = (intset_node *) parent;
+		intset->rightmost_nodes[level] = (intset_node *) parent;
+	}
+
+	/*
+	 * Place the downlink on the parent page.
+	 */
+	parent = (intset_internal_node *) intset->rightmost_nodes[level];
+
+	if (parent->num_items < MAX_INTERNAL_ITEMS)
+	{
+		parent->values[parent->num_items] = child_key;
+		parent->downlinks[parent->num_items] = child;
+		parent->num_items++;
+	}
+	else
+	{
+		/*
+		 * Doesn't fit.  Allocate new parent, with the downlink as the first
+		 * item on it, and recursively insert the downlink to the new parent
+		 * to the grandparent.
+		 */
+		parent = intset_new_internal_node(intset);
+		parent->level = level;
+		parent->values[0] = child_key;
+		parent->downlinks[0] = child;
+		parent->num_items = 1;
+
+		intset->rightmost_nodes[level] = (intset_node *) parent;
+
+		intset_update_upper(intset, level + 1, (intset_node *) parent, child_key);
+	}
+}
+
+/*
+ * Does the set contain the given value?
+ */
+bool
+intset_is_member(IntegerSet *intset, uint64 x)
+{
+	intset_node   *node;
+	intset_leaf_node *leaf;
+	int			level;
+	int			itemno;
+	leaf_item  *item;
+
+	/*
+	 * The value might be in the buffer of newly-added values.
+	 */
+	if (intset->num_buffered_values > 0 && x >= intset->buffered_values[0])
+	{
+		int			itemno;
+
+		itemno = intset_binsrch_uint64(x,
+									   intset->buffered_values,
+									   intset->num_buffered_values,
+									   false);
+		if (itemno >= intset->num_buffered_values)
+			return false;
+		else
+			return intset->buffered_values[itemno] == x;
+	}
+
+	/*
+	 * Start from the root, and walk down the B-tree to find the right leaf
+	 * node.
+	 */
+	if (!intset->root)
+		return false;
+	node = intset->root;
+	for (level = intset->num_levels - 1; level > 0; level--)
+	{
+		intset_internal_node *n = (intset_internal_node *) node;
+
+		Assert(node->level == level);
+
+		itemno = intset_binsrch_uint64(x, n->values, n->num_items, true);
+		if (itemno == 0)
+			return false;
+		node = n->downlinks[itemno - 1];
+	}
+	Assert(node->level == 0);
+	leaf = (intset_leaf_node *) node;
+
+	/*
+	 * Binary search the right item on the leaf page
+	 */
+	itemno = intset_binsrch_leaf(x, leaf->items, leaf->num_items, true);
+	if (itemno == 0)
+		return false;
+	item = &leaf->items[itemno - 1];
+
+	/* Is this a match to the first value on the item? */
+	if (item->first == x)
+		return true;
+	Assert(x > item->first);
+
+	/* Is it in the packed codeword? */
+	if (simple8b_contains(item->codeword, x, item->first))
+		return true;
+
+	return false;
+}
+
+/*
+ * Begin in-order scan through all the values.
+ *
+ * While the iteration is in-progress, you cannot add new values to the set.
+ */
+void
+intset_begin_iterate(IntegerSet *intset)
+{
+	intset->iter_node = intset->leftmost_leaf;
+	intset->iter_itemno = 0;
+	intset->iter_valueno = 0;
+	intset->iter_num_values = 0;
+	intset->iter_values = intset->iter_values_buf;
+}
+
+/*
+ * Returns the next integer, when iterating.
+ *
+ * intset_begin_iterate() must be called first.  intset_iterate_next() returns
+ * the next value in the set.  If there are no more values, *found is set
+ * to false.
+ */
+uint64
+intset_iterate_next(IntegerSet *intset, bool *found)
+{
+	for (;;)
+	{
+		if (intset->iter_valueno < intset->iter_num_values)
+		{
+			*found = true;
+			return intset->iter_values[intset->iter_valueno++];
+		}
+
+		/* Our queue is empty, decode next leaf item */
+		if (intset->iter_node && intset->iter_itemno < intset->iter_node->num_items)
+		{
+			/* Decode the next leaf item into 'iter_values'. */
+			leaf_item  *item;
+			int			num_decoded;
+
+			item = &intset->iter_node->items[intset->iter_itemno++];
+
+			intset->iter_values[0] = item->first;
+			num_decoded = simple8b_decode(item->codeword, &intset->iter_values[1], item->first);
+			intset->iter_num_values = num_decoded + 1;
+
+			intset->iter_valueno = 0;
+			continue;
+		}
+
+		/* No more items on this leaf, step to next node */
+		if (intset->iter_node)
+		{
+			/* This leaf is exhausted.  Step to the next leaf node. */
+			intset->iter_node = intset->iter_node->next;
+			intset->iter_itemno = 0;
+			intset->iter_valueno = 0;
+			intset->iter_num_values = 0;
+			continue;
+		}
+
+		/*
+		 * We have reached the end of the B-tree.  But we might still have
+		 * some integers in the buffer of newly-added values.
+		 */
+		if (intset->iter_values == intset->iter_values_buf)
+		{
+			intset->iter_values = intset->buffered_values;
+			intset->iter_num_values = intset->num_buffered_values;
+			continue;
+		}
+
+		break;
+	}
+
+	/* No more results. */
+	*found = false;
+	return 0;
+}
+
+/*
+ * intset_binsrch_uint64() -- search a sorted array of uint64s
+ *
+ * Returns the first position with key equal to or greater than the given key.
+ * The returned position would be the "insert" location for the given key,
+ * that is, the position where the new key should be inserted to.
+ *
+ * 'nextkey' affects the behavior on equal keys.  If true, and there is an
+ * equal key in the array, this returns the position immediately after the
+ * equal key.  If false, this returns the position of the equal key itself.
+ */
+static int
+intset_binsrch_uint64(uint64 item, uint64 *arr, int arr_elems, bool nextkey)
+{
+	int			low,
+				high,
+				mid;
+
+	low = 0;
+	high = arr_elems;
+	while (high > low)
+	{
+		mid = low + (high - low) / 2;
+
+		if (nextkey)
+		{
+			if (item >= arr[mid])
+				low = mid + 1;
+			else
+				high = mid;
+		}
+		else
+		{
+			if (item > arr[mid])
+				low = mid + 1;
+			else
+				high = mid;
+		}
+	}
+
+	return low;
+}
+
+/* same, but for an array of leaf items */
+static int
+intset_binsrch_leaf(uint64 item, leaf_item *arr, int arr_elems, bool nextkey)
+{
+	int			low,
+				high,
+				mid;
+
+	low = 0;
+	high = arr_elems;
+	while (high > low)
+	{
+		mid = low + (high - low) / 2;
+
+		if (nextkey)
+		{
+			if (item >= arr[mid].first)
+				low = mid + 1;
+			else
+				high = mid;
+		}
+		else
+		{
+			if (item > arr[mid].first)
+				low = mid + 1;
+			else
+				high = mid;
+		}
+	}
+
+	return low;
+}
+
+/*
+ * Simple-8b encoding.
+ *
+ * Simple-8b algorithm packs between 1 and 240 integers into 64-bit words,
+ * called "codewords".  The number of integers packed into a single codeword
+ * depends on the integers being packed: small integers are encoded using
+ * fewer bits than large integers.  A single codeword can store a single
+ * 60-bit integer, or two 30-bit integers, for example.
+ *
+ * Since we're storing a unique, sorted set of integers, we actually encode
+ * the *differences* between consecutive integers.  That way, clusters of
+ * integers that are close to each other are packed efficiently, regardless
+ * of the absolute values.
+ *
+ * In Simple-8b, each codeword consists of a 4-bit selector, which indicates
+ * how many integers are encoded in the codeword, and the encoded integers
+ * packed into the remaining 60 bits.  The selector allows for 16 different
+ * ways of using the remaining 60 bits, "modes".  The number of integers
+ * packed into a single codeword is listed in the simple8b_modes table below.
+ * For example, consider the following codeword:
+ *
+ *      20-bit integer       20-bit integer       20-bit integer
+ * 1101 00000000000000010010 01111010000100100000 00000000000000010100
+ * ^
+ * selector
+ *
+ * The selector 1101 is 13 in decimal.  From the modes table below, we see
+ * that it means that the codeword encodes three 20-bit integers.  In decimal,
+ * those integers are 18, 500000 and 20.  Because we encode deltas rather than
+ * absolute values, the actual values that they represent are 18, 500018 and
+ * 500038.
+ *
+ * Modes 0 and 1 are a bit special; they encode a run of 240 or 120 zeros
+ * (which means 240 or 120 consecutive integers, since we're encoding the
+ * deltas between integers), without using the rest of the codeword bits
+ * for anything.
+ *
+ * Simple-8b cannot encode integers larger than 60 bits.  Values larger than
+ * that are always stored in the 'first' field of a leaf item, never in the
+ * packed codeword.  If there is a sequence of integers that are more than
+ * 2^60 apart, the codeword will go unused on those items.  To represent that,
+ * we use a magic EMPTY_CODEWORD codeword.
+ */
+static const struct
+{
+	uint8		bits_per_int;
+	uint8		num_ints;
+} simple8b_modes[17] =
+{
+	{  0, 240 },	/* mode  0: 240 zeros */
+	{  0, 120 },	/* mode  1: 120 zeros */
+	{  1,  60 },	/* mode  2: sixty 1-bit integers */
+	{  2,  30 },	/* mode  3: thirty 2-bit integers */
+	{  3,  20 },	/* mode  4: twenty 3-bit integers */
+	{  4,  15 },	/* mode  5: fifteen 4-bit integers */
+	{  5,  12 },	/* mode  6: twelve 5-bit integers */
+	{  6,  10 },	/* mode  7: ten 6-bit integers */
+	{  7,   8 },	/* mode  8: eight 7-bit integers (four bits are wasted) */
+	{  8,   7 },	/* mode  9: seven 8-bit integers (four bits are wasted) */
+	{ 10,   6 },	/* mode 10: six 10-bit integers */
+	{ 12,   5 },	/* mode 11: five 12-bit integers */
+	{ 15,   4 },	/* mode 12: four 15-bit integers */
+	{ 20,   3 },	/* mode 13: three 20-bit integers */
+	{ 30,   2 },	/* mode 14: two 30-bit integers */
+	{ 60,   1 },	/* mode 15: one 60-bit integer */
+	{ 0,    0 }		/* sentinel value */
+};
+
+/*
+ * EMPTY_CODEWORD is a special value, used to indicate "no values".
+ * It is used if the next value is too large to be encoded with Simple-8b.
+ *
+ * This value looks like a 0-mode codeword, but we check for it
+ * specifically.  (In a real 0-mode codeword, all the unused bits are zero.)
+ */
+#define EMPTY_CODEWORD		(0xFFFFFFFFFFFFFFF0)
+
+/*
+ * Encode a number of integers into a Simple-8b codeword.
+ *
+ * Returns the number of integers that were encoded.
+ */
+static uint64
+simple8b_encode(uint64 *ints, int *num_encoded, uint64 base)
+{
+	int			selector;
+	int			nints;
+	int			bits;
+	uint64		diff;
+	uint64		last_val;
+	uint64		codeword;
+	uint64		diffs[60];
+	int			i;
+
+	Assert(ints[0] > base);
+
+	/*
+	 * Select the "mode" to use for the next codeword.
+	 *
+	 * In each iteration, check if the next value can be represented
+	 * in the current mode we're considering.  If it's too large, then
+	 * step up the mode to a wider one, and repeat.  If it fits, move
+	 * on to next integer.  Repeat until the codeword is full, given
+	 * the current mode.
+	 *
+	 * Note that we don't have any way to represent unused slots in the
+	 * codeword, so we require each codeword to be "full".
+	 */
+	selector = 0;
+	nints = simple8b_modes[0].num_ints;
+	bits = simple8b_modes[0].bits_per_int;
+	diff = ints[0] - base - 1;
+	last_val = ints[0];
+	i = 0;
+	for (;;)
+	{
+		if (diff >= (UINT64CONST(1) << bits))
+		{
+			/* too large, step up to next mode */
+			selector++;
+			nints = simple8b_modes[selector].num_ints;
+			bits = simple8b_modes[selector].bits_per_int;
+			if (i >= nints)
+				break;
+		}
+		else
+		{
+			if (i < 60)
+				diffs[i] = diff;
+			i++;
+			if (i >= nints)
+				break;
+
+			Assert(ints[i] > last_val);
+			diff = ints[i] - last_val - 1;
+			last_val = ints[i];
+		}
+	}
+
+	if (nints == 0)
+	{
+		/* The next value is too large to be encoded with Simple-8b */
+		Assert(i == 0);
+		*num_encoded = 0;
+		return EMPTY_CODEWORD;
+	}
+
+	/*
+	 * Encode the integers using the selected mode.  Note that we shift them
+	 * into the codeword in reverse order, so that they will come out in the
+	 * correct order in the decoder.
+	 */
+	codeword = 0;
+	if (bits > 0)
+	{
+		for (i = nints - 1; i >= 0; i--)
+		{
+			codeword <<= bits;
+			codeword |= diffs[i];
+		}
+	}
+
+	/* add selector to the codeword, and return */
+	codeword <<= 4;
+	codeword |= selector;
+
+	*num_encoded = nints;
+	return codeword;
+}
+
+/*
+ * Decode a codeword into an array of integers.
+ */
+static int
+simple8b_decode(uint64 codeword, uint64 *decoded, uint64 base)
+{
+	int			selector = codeword & 0x0f;
+	int			nints = simple8b_modes[selector].num_ints;
+	uint64		bits = simple8b_modes[selector].bits_per_int;
+	uint64		mask = (UINT64CONST(1) << bits) - 1;
+	uint64		prev_value;
+
+	if (codeword == EMPTY_CODEWORD)
+		return 0;
+
+	codeword >>= 4;		/* shift out the selector */
+
+	prev_value = base;
+	for (int i = 0; i < nints; i++)
+	{
+		uint64		diff = codeword & mask;
+
+		decoded[i] = prev_value + 1L + diff;
+		prev_value = decoded[i];
+		codeword >>= bits;
+	}
+	return nints;
+}
+
+/*
+ * This is very similar to simple8b_decode(), but instead of decoding all
+ * the values to an array, it just checks if the given integer is part of
+ * the codeword.
+ */
+static bool
+simple8b_contains(uint64 codeword, uint64 key, uint64 base)
+{
+	int			selector = codeword & 0x0f;
+	int			nints = simple8b_modes[selector].num_ints;
+	int			bits = simple8b_modes[selector].bits_per_int;
+
+	if (codeword == EMPTY_CODEWORD)
+		return false;
+
+	codeword >>= 4;		/* shift out the selector */
+
+	if (bits == 0)
+	{
+		/* Special handling for 0-bit cases. */
+		return key - base <= nints;
+	}
+	else
+	{
+		uint64		mask = (UINT64CONST(1) << bits) - 1;
+		uint64		prev_value;
+
+		prev_value = base;
+		for (int i = 0; i < nints; i++)
+		{
+			uint64		diff = codeword & mask;
+			uint64		curr_value;
+
+			curr_value = prev_value + 1L + diff;
+
+			if (curr_value >= key)
+			{
+				if (curr_value == key)
+					return true;
+				else
+					return false;
+			}
+
+			codeword >>= bits;
+			prev_value = curr_value;
+		}
+	}
+	return false;
+}
diff --git a/src/include/lib/integerset.h b/src/include/lib/integerset.h
new file mode 100644
index 00000000000..27aa3ee883c
--- /dev/null
+++ b/src/include/lib/integerset.h
@@ -0,0 +1,25 @@
+/*
+ * integerset.h
+ *	  In-memory data structure to hold a large set of integers efficiently
+ *
+ * Portions Copyright (c) 2012-2019, PostgreSQL Global Development Group
+ *
+ * src/include/lib/integerset.h
+ */
+#ifndef INTEGERSET_H
+#define INTEGERSET_H
+
+typedef struct IntegerSet IntegerSet;
+
+extern IntegerSet *intset_create(void);
+extern void intset_free(IntegerSet *intset);
+extern void intset_add_member(IntegerSet *intset, uint64 x);
+extern bool intset_is_member(IntegerSet *intset, uint64 x);
+
+extern uint64 intset_num_entries(IntegerSet *intset);
+extern uint64 intset_memory_usage(IntegerSet *intset);
+
+extern void intset_begin_iterate(IntegerSet *intset);
+extern uint64 intset_iterate_next(IntegerSet *intset, bool *found);
+
+#endif							/* INTEGERSET_H */
diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index 19d60a506e1..dfd0956aee3 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -12,6 +12,7 @@ SUBDIRS = \
 		  test_bloomfilter \
 		  test_ddl_deparse \
 		  test_extensions \
+		  test_integerset \
 		  test_parser \
 		  test_pg_dump \
 		  test_predtest \
diff --git a/src/test/modules/test_integerset/.gitignore b/src/test/modules/test_integerset/.gitignore
new file mode 100644
index 00000000000..5dcb3ff9723
--- /dev/null
+++ b/src/test/modules/test_integerset/.gitignore
@@ -0,0 +1,4 @@
+# Generated subdirectories
+/log/
+/results/
+/tmp_check/
diff --git a/src/test/modules/test_integerset/Makefile b/src/test/modules/test_integerset/Makefile
new file mode 100644
index 00000000000..3b7c4999d6f
--- /dev/null
+++ b/src/test/modules/test_integerset/Makefile
@@ -0,0 +1,21 @@
+# src/test/modules/test_integerset/Makefile
+
+MODULE_big = test_integerset
+OBJS = test_integerset.o $(WIN32RES)
+PGFILEDESC = "test_integerset - test code for src/backend/lib/integerset.c"
+
+EXTENSION = test_integerset
+DATA = test_integerset--1.0.sql
+
+REGRESS = test_integerset
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/test_integerset
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/test_integerset/README b/src/test/modules/test_integerset/README
new file mode 100644
index 00000000000..3e4226adb55
--- /dev/null
+++ b/src/test/modules/test_integerset/README
@@ -0,0 +1,7 @@
+test_integerset contains unit tests for testing the integer set implementation,
+in src/backend/lib/integerset.c
+
+The tests verify the correctness of the implementation, but they can also be
+used as a micro-benchmark: if you set the 'intset_test_stats' flag in
+test_integerset.c, the tests will print extra information about execution time
+and memory usage.
diff --git a/src/test/modules/test_integerset/expected/test_integerset.out b/src/test/modules/test_integerset/expected/test_integerset.out
new file mode 100644
index 00000000000..d7c88ded092
--- /dev/null
+++ b/src/test/modules/test_integerset/expected/test_integerset.out
@@ -0,0 +1,14 @@
+CREATE EXTENSION test_integerset;
+--
+-- These tests don't produce any interesting output.  We're checking that
+-- the operations complete without crashing or hanging and that none of their
+-- internal sanity tests fail.  They print progress information as INFOs,
+-- which are not interesting for automated tests, so suppress those.
+--
+SET client_min_messages = 'warning';
+SELECT test_integerset();
+ test_integerset 
+-----------------
+ 
+(1 row)
+
diff --git a/src/test/modules/test_integerset/sql/test_integerset.sql b/src/test/modules/test_integerset/sql/test_integerset.sql
new file mode 100644
index 00000000000..34223afa885
--- /dev/null
+++ b/src/test/modules/test_integerset/sql/test_integerset.sql
@@ -0,0 +1,11 @@
+CREATE EXTENSION test_integerset;
+
+--
+-- These tests don't produce any interesting output.  We're checking that
+-- the operations complete without crashing or hanging and that none of their
+-- internal sanity tests fail.  They print progress information as INFOs,
+-- which are not interesting for automated tests, so suppress those.
+--
+SET client_min_messages = 'warning';
+
+SELECT test_integerset();
diff --git a/src/test/modules/test_integerset/test_integerset--1.0.sql b/src/test/modules/test_integerset/test_integerset--1.0.sql
new file mode 100644
index 00000000000..d6d5a3f6cf7
--- /dev/null
+++ b/src/test/modules/test_integerset/test_integerset--1.0.sql
@@ -0,0 +1,8 @@
+/* src/test/modules/test_integerset/test_integerset--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION test_integerset" to load this file. \quit
+
+CREATE FUNCTION test_integerset()
+RETURNS pg_catalog.void STRICT
+AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/src/test/modules/test_integerset/test_integerset.c b/src/test/modules/test_integerset/test_integerset.c
new file mode 100644
index 00000000000..24a2e08c0d1
--- /dev/null
+++ b/src/test/modules/test_integerset/test_integerset.c
@@ -0,0 +1,622 @@
+/*--------------------------------------------------------------------------
+ *
+ * test_integerset.c
+ *		Test integer set data structure.
+ *
+ * Copyright (c) 2019, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		src/test/modules/test_integerset/test_integerset.c
+ *
+ * -------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "fmgr.h"
+#include "lib/integerset.h"
+#include "nodes/bitmapset.h"
+#include "utils/memutils.h"
+#include "utils/timestamp.h"
+#include "storage/block.h"
+#include "storage/itemptr.h"
+#include "miscadmin.h"
+
+/*
+ * If you enable this, the "pattern" tests will print information about
+ * how long populating, probing, and iterating the test set takes, and
+ * how much memory the test set consumed.  That can be used as a
+ * micro-benchmark of various operations and input patterns.
+ *
+ * The information is printed to the server's stderr, mostly because
+ * that's where MemoryContextStats() output goes.
+ */
+static const bool intset_test_stats = true;
+
+PG_MODULE_MAGIC;
+
+PG_FUNCTION_INFO_V1(test_integerset);
+
+/*
+ * A struct to define a pattern of integers, for use with test_pattern()
+ * function.
+ */
+typedef struct
+{
+	char	   *test_name;		/* short name of the test, for humans */
+	char	   *pattern_str;	/* a bit pattern */
+	uint64		spacing;		/* pattern repeats at this interval */
+	uint64		num_values;		/* number of integers to set in total */
+} test_spec;
+
+static const test_spec test_specs[] = {
+	{
+		"all ones", "1111111111",
+		10, 100000000
+	},
+	{
+		"alternating bits", "0101010101",
+		10, 100000000
+	},
+	{
+		"clusters of ten", "1111111111",
+		10000, 10000000
+	},
+	{
+		"clusters of hundred",
+		"1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111",
+		10000, 100000000
+	},
+	{
+		"clusters of thousand",
+		"1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111"
+		"1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111"
+		"1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111"
+		"1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111"
+		"1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111"
+		"1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111"
+		"1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111"
+		"1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111"
+		"1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111"
+		"1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111",
+		10000, 100000000
+	},
+	{
+		"one-every-64k", "1",
+		65536, 10000000
+	},
+	{
+		"sparse", "100000000000000000000000000000001",
+		10000000, 10000000
+	},
+	{
+		"single values, distance > 2^32", "1",
+		10000000000L, 1000000
+	},
+	{
+		"clusters, distance > 2^32", "10101010",
+		10000000000L, 10000000
+	},
+	{
+		"clusters, distance > 2^60", "10101010",
+		2000000000000000000L, 23 /* can't be much higher than this, or we overflow uint64 */
+	}
+};
+
+static void test_pattern(const test_spec *spec);
+static void test_empty(void);
+static void test_single_value(uint64 value);
+static void check_with_filler(IntegerSet *intset, uint64 x, uint64 value, uint64 filler_min, uint64 filler_max);
+static void test_single_value_and_filler(uint64 value, uint64 filler_min, uint64 filler_max);
+static void test_huge_distances(void);
+
+/*
+ * SQL-callable entry point to perform all tests.
+ */
+Datum
+test_integerset(PG_FUNCTION_ARGS)
+{
+	test_empty();
+
+	test_single_value(0);
+	test_single_value(1);
+	test_single_value(PG_UINT64_MAX - 1);
+	test_single_value(PG_UINT64_MAX);
+
+	/* Same tests, but with some "filler" values, so that the B-tree gets created */
+	test_single_value_and_filler(0, 1000, 2000);
+	test_single_value_and_filler(1, 1000, 2000);
+	test_single_value_and_filler(1, 1000, 2000000);
+	test_single_value_and_filler(PG_UINT64_MAX - 1, 1000, 2000);
+	test_single_value_and_filler(PG_UINT64_MAX, 1000, 2000);
+
+	test_huge_distances();
+
+	for (int i = 0; i < lengthof(test_specs); i++)
+		test_pattern(&test_specs[i]);
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * Test with a repeating pattern, defined by the 'spec'.
+ */
+static void
+test_pattern(const test_spec *spec)
+{
+	IntegerSet *intset;
+	MemoryContext test_cxt;
+	MemoryContext old_cxt;
+	TimestampTz starttime;
+	TimestampTz endtime;
+	uint64		n;
+	uint64		last_int;
+	int			patternlen;
+	uint64	   *pattern_values;
+	uint64		pattern_num_values;
+
+	elog(NOTICE, "testing intset with pattern \"%s\"", spec->test_name);
+	if (intset_test_stats)
+		fprintf(stderr, "-----\ntesting intset with pattern \"%s\"\n", spec->test_name);
+
+	/* Pre-process the pattern, creating an array of integers from it. */
+	patternlen = strlen(spec->pattern_str);
+	pattern_values = palloc(patternlen * sizeof(uint64));
+	pattern_num_values = 0;
+	for (int i = 0; i < patternlen; i++)
+	{
+		if (spec->pattern_str[i] == '1')
+			pattern_values[pattern_num_values++] = i;
+	}
+
+	/*
+	 * Allocate the integer set.
+	 *
+	 * Allocate it in a separate memory context, so that we can print its
+	 * memory usage easily.  (intset_create() creates a memory context of its
+	 * own, too, but we don't have direct access to it, so we cannot call
+	 * MemoryContextStats() on it directly).
+	 */
+	test_cxt = AllocSetContextCreate(CurrentMemoryContext,
+									 "intset test",
+									 ALLOCSET_SMALL_SIZES);
+	MemoryContextSetIdentifier(test_cxt, spec->test_name);
+	old_cxt = MemoryContextSwitchTo(test_cxt);
+	intset = intset_create();
+	MemoryContextSwitchTo(old_cxt);
+
+	/*
+	 * Add values to the set.
+	 */
+	starttime = GetCurrentTimestamp();
+
+	n = 0;
+	last_int = 0;
+	while (n < spec->num_values)
+	{
+		uint64		x = 0;
+
+		for (int i = 0; i < pattern_num_values && n < spec->num_values; i++)
+		{
+			x = last_int + pattern_values[i];
+
+			intset_add_member(intset, x);
+			n++;
+		}
+		last_int += spec->spacing;
+	}
+
+	endtime = GetCurrentTimestamp();
+
+	if (intset_test_stats)
+		fprintf(stderr, "added %lu values in %lu ms\n",
+				spec->num_values, (endtime - starttime) / 1000);
+
+	/*
+	 * Print stats on the amount of memory used.
+	 *
+	 * We print the usage reported by intset_memory_usage(), as well as the
+	 * stats from the memory context. They should be in the same ballpark,
+	 * but it's hard to automate testing that, so if you're making changes
+	 * to the implementation, just observe that manually.
+	 */
+	if (intset_test_stats)
+	{
+		uint64		mem_usage;
+
+		/*
+		 * Also print memory usage as reported by intset_memory_usage().
+		 * It should be in the same ballpark as the usage reported by
+		 * MemoryContextStats().
+		 */
+		mem_usage = intset_memory_usage(intset);
+		fprintf(stderr, "intset_memory_usage() reported %lu (%0.2f bytes / integer)\n",
+				mem_usage, (double) mem_usage / spec->num_values);
+
+		MemoryContextStats(test_cxt);
+	}
+
+	/* Check that intset_num_entries() works */
+	n = intset_num_entries(intset);
+	if (n != spec->num_values)
+		elog(ERROR, "intset_num_entries returned %lu, expected %lu", n, spec->num_values);
+
+	/*
+	 * Test random-access probes with intset_is_member()
+	 */
+	starttime = GetCurrentTimestamp();
+
+	for (n = 0; n < 1000000; n++)
+	{
+		bool		b;
+		bool		expected;
+		uint64		x;
+
+		/*
+		 * Pick next value to probe at random.  We limit the probes to the
+		 * last integer that we added to the set, plus an arbitrary constant
+		 * (1000).  There's no point in probing the whole 0 - 2^64 range, if
+		 * only a small part of the integer space is used.  We would very
+		 * rarely hit values that are actually in the set.
+		 */
+		x = (pg_lrand48() << 31) | pg_lrand48();
+		x = x % (last_int + 1000);
+
+		/* Do we expect this value to be present in the set? */
+		if (x >= last_int)
+			expected = false;
+		else
+		{
+			uint64		idx = x % spec->spacing;
+
+			if (idx >= patternlen)
+				expected = false;
+			else if (spec->pattern_str[idx] == '1')
+				expected = true;
+			else
+				expected = false;
+		}
+
+		/* Is it present according to intset_is_member() ? */
+		b = intset_is_member(intset, x);
+
+		if (b != expected)
+			elog(ERROR, "mismatch at %lu: %d vs %d", x, b, expected);
+	}
+	endtime = GetCurrentTimestamp();
+	if (intset_test_stats)
+		fprintf(stderr, "probed %lu values in %lu ms\n", n, (endtime - starttime) / 1000);
+
+	/*
+	 * Test iterator
+	 */
+	starttime = GetCurrentTimestamp();
+
+	intset_begin_iterate(intset);
+	n = 0;
+	last_int = 0;
+	while (n < spec->num_values)
+	{
+		for (int i = 0; i < pattern_num_values && n < spec->num_values; i++)
+		{
+			uint64		expected = last_int + pattern_values[i];
+			uint64		x;
+			bool		found;
+
+			x = intset_iterate_next(intset, &found);
+			if (!found)
+				break;
+
+			if (x != expected)
+				elog(ERROR, "iterate returned wrong value; got %lu, expected %lu", x, expected);
+			n++;
+		}
+		last_int += spec->spacing;
+	}
+	endtime = GetCurrentTimestamp();
+	if (intset_test_stats)
+		fprintf(stderr, "iterated %lu values in %lu ms\n", n, (endtime - starttime) / 1000);
+
+	if (n < spec->num_values)
+		elog(ERROR, "iterator stopped short after %lu entries, expected %lu", n, spec->num_values);
+	if (n > spec->num_values)
+		elog(ERROR, "iterator returned %lu entries, %lu was expected", n, spec->num_values);
+
+	intset_free(intset);
+}
+
+/*
+ * Test with a set containing a single integer.
+ */
+static void
+test_single_value(uint64 value)
+{
+	IntegerSet *intset;
+	uint64		x;
+	uint64		num_entries;
+	bool		found;
+
+	elog(NOTICE, "testing intset with single value %lu", value);
+
+	/* Create the set. */
+	intset = intset_create();
+	intset_add_member(intset, value);
+
+	/* Test intset_num_entries() */
+	num_entries = intset_num_entries(intset);
+	if (num_entries != 1)
+		elog(ERROR, "intset_num_entries returned %lu, expected %lu", num_entries, 1L);
+
+	/*
+	 * Test intset_is_member() at various special values, like 0 and the maximum
+	 * possible 64-bit integer, as well as the value itself.
+	 */
+	if (intset_is_member(intset, 0) != (value == 0))
+		elog(ERROR, "intset_is_member failed for 0");
+	if (intset_is_member(intset, 1) != (value == 1))
+		elog(ERROR, "intset_is_member failed for 1");
+	if (intset_is_member(intset, PG_UINT64_MAX) != (value == PG_UINT64_MAX))
+		elog(ERROR, "intset_is_member failed for PG_UINT64_MAX");
+	if (intset_is_member(intset, value) != true)
+		elog(ERROR, "intset_is_member failed for the tested value");
+
+	/*
+	 * Test iterator
+	 */
+	intset_begin_iterate(intset);
+	x = intset_iterate_next(intset, &found);
+	if (!found || x != value)
+		elog(ERROR, "intset_iterate_next failed for %lu", x);
+
+	x = intset_iterate_next(intset, &found);
+	if (found)
+		elog(ERROR, "intset_iterate_next failed %lu", x);
+
+	intset_free(intset);
+}
+
+/*
+ * Test with an integer set that contains:
+ *
+ * - a given single 'value', and
+ * - all integers between 'filler_min' and 'filler_max'.
+ *
+ * This exercises different codepaths than testing just with a single value,
+ * because the implementation buffers newly-added values.  If we add just
+ * single value to the set, we won't test the internal B-tree code at all,
+ * just the code that deals with the buffer.
+ */
+static void
+test_single_value_and_filler(uint64 value, uint64 filler_min, uint64 filler_max)
+{
+	IntegerSet *intset;
+	uint64		x;
+	bool		found;
+	uint64	   *iter_expected;
+	uint64		n = 0;
+	uint64		num_entries = 0;
+	uint64		mem_usage;
+
+	elog(NOTICE, "testing intset with value %lu, and all between %lu and %lu",
+		 value, filler_min, filler_max);
+
+	intset = intset_create();
+
+	iter_expected = palloc(sizeof(uint64) * (filler_max - filler_min + 1));
+	if (value < filler_min)
+	{
+		intset_add_member(intset, value);
+		iter_expected[n++] = value;
+	}
+
+	for (x = filler_min; x < filler_max; x++)
+	{
+		intset_add_member(intset, x);
+		iter_expected[n++] = x;
+	}
+
+	if (value >= filler_max)
+	{
+		intset_add_member(intset, value);
+		iter_expected[n++] = value;
+	}
+
+	/* Test intset_num_entries() */
+	num_entries = intset_num_entries(intset);
+	if (num_entries != n)
+		elog(ERROR, "intset_num_entries returned %lu, expected %lu", num_entries, n);
+
+	/*
+	 * Test intset_is_member() at various spots, at and around the values that we
+	 * expect to be set, as well as 0 and the maximum possible value.
+	 */
+	check_with_filler(intset, 0,                 value, filler_min, filler_max);
+	check_with_filler(intset, 1,                 value, filler_min, filler_max);
+	check_with_filler(intset, filler_min - 1,    value, filler_min, filler_max);
+	check_with_filler(intset, filler_min,        value, filler_min, filler_max);
+	check_with_filler(intset, filler_min + 1,    value, filler_min, filler_max);
+	check_with_filler(intset, value - 1,         value, filler_min, filler_max);
+	check_with_filler(intset, value,             value, filler_min, filler_max);
+	check_with_filler(intset, value + 1,         value, filler_min, filler_max);
+	check_with_filler(intset, filler_max - 1,    value, filler_min, filler_max);
+	check_with_filler(intset, filler_max,        value, filler_min, filler_max);
+	check_with_filler(intset, filler_max + 1,    value, filler_min, filler_max);
+	check_with_filler(intset, PG_UINT64_MAX - 1, value, filler_min, filler_max);
+	check_with_filler(intset, PG_UINT64_MAX,     value, filler_min, filler_max);
+
+	intset_begin_iterate(intset);
+	for (uint64 i = 0; i < n; i++)
+	{
+		x = intset_iterate_next(intset, &found);
+		if (!found || x != iter_expected[i])
+			elog(ERROR, "intset_iterate_next failed for %lu", x);
+	}
+	x = intset_iterate_next(intset, &found);
+	if (found)
+		elog(ERROR, "intset_iterate_next failed %lu", x);
+
+	mem_usage = intset_memory_usage(intset);
+	if (mem_usage < 5000 || mem_usage > 500000000)
+		elog(ERROR, "intset_memory_usage() reported suspicious value: %lu", mem_usage);
+
+	intset_free(intset);
+}
+
+/*
+ * Helper function for test_single_value_and_filler.
+ *
+ * Calls intset_is_member() for value 'x', and checks that the result is what
+ * we expect.
+ */
+static void
+check_with_filler(IntegerSet *intset, uint64 x,
+				  uint64 value, uint64 filler_min, uint64 filler_max)
+{
+	bool		expected;
+	bool		actual;
+
+	expected = (x == value || (filler_min <= x && x < filler_max));
+
+	actual = intset_is_member(intset, x);
+
+	if (actual != expected)
+		elog(ERROR, "intset_is_member failed for %lu", x);
+}
+
+/*
+ * Test empty set
+ */
+static void
+test_empty(void)
+{
+	IntegerSet *intset;
+	bool		found = true;
+	uint64		x;
+
+	elog(NOTICE, "testing intset with empty set");
+
+	intset = intset_create();
+
+	/* Test intset_is_member() */
+	if (intset_is_member(intset, 0) != false)
+		elog(ERROR, "intset_is_member on empty set returned true");
+	if (intset_is_member(intset, 1) != false)
+		elog(ERROR, "intset_is_member on empty set returned true");
+	if (intset_is_member(intset, PG_UINT64_MAX) != false)
+		elog(ERROR, "intset_is_member on empty set returned true");
+
+	/* Test iterator */
+	intset_begin_iterate(intset);
+	x = intset_iterate_next(intset, &found);
+	if (found)
+		elog(ERROR, "intset_iterate_next on empty set returned a value (%lu)", x);
+
+	intset_free(intset);
+}
+
+/*
+ * Test with integers that are more than 2^60 apart.
+ *
+ * The Simple-8b encoding used by the set implementation can only encode
+ * values up to 2^60.  That makes large differences like this interesting
+ * to test.
+ */
+static void
+test_huge_distances(void)
+{
+	IntegerSet *intset;
+	uint64		values[1000];
+	int			num_values = 0;
+	uint64		val = 0;
+	bool		found;
+	uint64		x;
+
+	elog(NOTICE, "testing intset with distances > 2^60 between values");
+
+	val = 0;
+	values[num_values++] = val;
+
+	val += 1152921504606846976L - 1;	/* 2^60 - 1*/
+	values[num_values++] = val;
+
+	val += 1152921504606846976L - 1;	/* 2^60 - 1*/
+	values[num_values++] = val;
+
+	val += 1152921504606846976L;		/* 2^60 */
+	values[num_values++] = val;
+
+	val += 1152921504606846976L;		/* 2^60 */
+	values[num_values++] = val;
+
+	val += 1152921504606846976L;		/* 2^60 */
+	values[num_values++] = val;
+
+	val += 1152921504606846976L + 1;	/* 2^60 + 1 */
+	values[num_values++] = val;
+
+	val += 1152921504606846976L + 1;	/* 2^60 + 1 */
+	values[num_values++] = val;
+
+	val += 1152921504606846976L + 1;	/* 2^60 + 1 */
+	values[num_values++] = val;
+
+	val += 1152921504606846976L;		/* 2^60 */
+	values[num_values++] = val;
+
+	/* we're now very close to 2^64, so we can't add large values anymore */
+
+	intset = intset_create();
+
+	/*
+	 * Add many more values to the end, to make sure that all the above
+	 * values get flushed and packed into the tree structure.
+	 */
+	while (num_values < 1000)
+	{
+		val += pg_lrand48();
+		values[num_values++] = val;
+	}
+
+	/* Add these numbers to the set */
+	for (int i = 0; i < num_values; i++)
+		intset_add_member(intset, values[i]);
+
+	/*
+	 * Test intset_is_member() around each of these values
+	 */
+	for (int i = 0; i < num_values; i++)
+	{
+		uint64		x = values[i];
+		bool		result;
+
+		if (x > 0)
+		{
+			result = intset_is_member(intset, x - 1);
+			if (result != false)
+				elog(ERROR, "intset_is_member failed for %lu", x - 1);
+		}
+
+		result = intset_is_member(intset, x);
+		if (result != true)
+			elog(ERROR, "intset_is_member failed for %lu", x);
+
+		result = intset_is_member(intset, x + 1);
+		if (result != false)
+			elog(ERROR, "intset_is_member failed for %lu", x + 1);
+	}
+
+	/*
+	 * Test iterator
+	 */
+	intset_begin_iterate(intset);
+	for (int i = 0; i < num_values; i++)
+	{
+		x = intset_iterate_next(intset, &found);
+		if (!found || x != values[i])
+			elog(ERROR, "intset_iterate_next failed for %lu", x);
+	}
+	x = intset_iterate_next(intset, &found);
+	if (found)
+		elog(ERROR, "intset_iterate_next failed %lu", x);
+}
diff --git a/src/test/modules/test_integerset/test_integerset.control b/src/test/modules/test_integerset/test_integerset.control
new file mode 100644
index 00000000000..7d20c2d7b88
--- /dev/null
+++ b/src/test/modules/test_integerset/test_integerset.control
@@ -0,0 +1,4 @@
+comment = 'Test code for integerset'
+default_version = '1.0'
+module_pathname = '$libdir/test_integerset'
+relocatable = true
-- 
2.20.1

From 1a3690be16be340f288c069c452e8428f1cc48ad Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakan...@iki.fi>
Date: Wed, 20 Mar 2019 20:24:44 +0200
Subject: [PATCH 2/2] Delete empty pages during GiST VACUUM

This commit teaches GiST to actually delete pages during VACUUM.

To do this, we scan the index twice. In the first pass, we make note of
empty leaf pages and internal pages. In the second pass, we scan through
the internal pages, looking for downlinks to the empty pages.

Heikki's CHANGES since v22:

* Only scan the empty pages after the last scan, in a multi-pass vacuum.
  (I think that's what we want...) We could actually be smarter, and
  do this as part of the second pass's scan, in a multi-pass vacuum.

* Call ReadNewTransactionId() while holding lock. I think that's needed
  for correctness.

* Use new IntegerSet implementation.

Author: Andrey Borodin
Discussion: https://www.postgresql.org/message-id/b1e4df12-6cd3-4706-bdbd-bf3283328...@yandex-team.ru
---
 src/backend/access/gist/README         |  48 ++++
 src/backend/access/gist/gist.c         |  15 ++
 src/backend/access/gist/gistutil.c     |  15 +-
 src/backend/access/gist/gistvacuum.c   | 350 +++++++++++++++++++++++--
 src/backend/access/gist/gistxlog.c     |  65 +++++
 src/backend/access/rmgrdesc/gistdesc.c |   3 +
 src/include/access/gist.h              |   4 +
 src/include/access/gist_private.h      |   7 +-
 src/include/access/gistxlog.h          |  12 +-
 src/test/regress/expected/gist.out     |   6 +-
 src/test/regress/sql/gist.sql          |   6 +-
 11 files changed, 489 insertions(+), 42 deletions(-)

diff --git a/src/backend/access/gist/README b/src/backend/access/gist/README
index 02228662b81..501b1c1a77a 100644
--- a/src/backend/access/gist/README
+++ b/src/backend/access/gist/README
@@ -413,6 +413,54 @@ emptied yet; tuples never move upwards in the tree. The final emptying loops
 through buffers at a given level until all buffers at that level have been
 emptied, and then moves down to the next level.
 
+Bulk delete algorithm (VACUUM)
+------------------------------
+
+VACUUM works in two stages:
+
+In the first stage, we scan the whole index in physical order. To make sure
+that we don't miss any dead tuples because a concurrent page split moved them,
+we check the F_FOLLOW_RIGHT flags and NSN on each page, to detect if the
+page has been concurrently split. If a concurrent page split is detected, and
+one half of the page was moved to a position that we already scanned, we "jump
+back" to scan the page again. This is the same mechanism that B-tree VACUUM
+uses, but because GiST pages already carry NSNs to detect page splits during
+searches, we don't need a separate "vacuum cycle ID" concept like B-tree does.
+
+While we scan all the pages, we also make note of any completely empty leaf
+pages. We will try to unlink them from the tree in the second stage. We also
+record the block numbers of all internal pages, in an IntegerSet. They are
+needed in the second stage, to locate parents of empty pages.
+
+In the second stage, we try to unlink the empty leaf pages from the tree, so
+that their space can be reused. If we didn't see any empty pages in the first
+stage, the second stage is skipped. In order to delete an empty page, its
+downlink must be removed from the parent. We scan all the internal pages whose
+block numbers we memorized in the first stage, and look for downlinks to the
+pages that we memorized as being empty. Whenever we find one, we acquire a
+lock on both the parent and the child page, and re-check that the child page
+is still empty. If it is, we remove the downlink, mark the child as deleted,
+and release the locks.
+
+The insertion algorithm would get confused if an internal page were completely
+empty, so we never delete the last child of an internal page, even if it's
+empty. Currently, we only support deleting leaf pages.
+
+This page deletion algorithm works on a best-effort basis. It might fail to
+find a downlink, if a concurrent page split moved it after the first stage.
+In that case, we won't be able to remove all empty pages. That's OK, it's
+not expected to happen very often, and hopefully the next VACUUM will clean
+it up, instead.
+
+When we have deleted a page, it's possible that an in-progress search will
+still descend to the page, if it saw the downlink before we removed it. The
+search will see that the page is deleted and ignore it, but as long as that
+can happen, we cannot reuse the page. To "wait out" any in-progress searches,
+when the page is deleted, it's labeled with the current next-transaction
+counter value. The page is not recycled until that XID is no longer visible
+to anyone. That's much more conservative than necessary, but let's keep it
+simple.
+
 
 Authors:
 	Teodor Sigaev	<teo...@sigaev.ru>
diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c
index 2ce5425ef98..a746e911f37 100644
--- a/src/backend/access/gist/gist.c
+++ b/src/backend/access/gist/gist.c
@@ -704,6 +704,9 @@ gistdoinsert(Relation r, IndexTuple itup, Size freespace,
 			GISTInsertStack *item;
 			OffsetNumber downlinkoffnum;
 
+			/* currently, internal pages are never deleted */
+			Assert(!GistPageIsDeleted(stack->page));
+
 			downlinkoffnum = gistchoose(state.r, stack->page, itup, giststate);
 			iid = PageGetItemId(stack->page, downlinkoffnum);
 			idxtuple = (IndexTuple) PageGetItem(stack->page, iid);
@@ -838,6 +841,18 @@ gistdoinsert(Relation r, IndexTuple itup, Size freespace,
 				}
 			}
 
+			/*
+			 * The page might have been deleted after we scanned the parent
+			 * and saw the downlink.
+			 */
+			if (GistPageIsDeleted(stack->page))
+			{
+				UnlockReleaseBuffer(stack->buffer);
+				xlocked = false;
+				state.stack = stack = stack->parent;
+				continue;
+			}
+
 			/* now state.stack->(page, buffer and blkno) points to leaf page */
 
 			gistinserttuple(&state, stack, giststate, itup,
diff --git a/src/backend/access/gist/gistutil.c b/src/backend/access/gist/gistutil.c
index f32e16eed58..4e511dfb8c2 100644
--- a/src/backend/access/gist/gistutil.c
+++ b/src/backend/access/gist/gistutil.c
@@ -23,6 +23,7 @@
 #include "storage/lmgr.h"
 #include "utils/float.h"
 #include "utils/syscache.h"
+#include "utils/snapmgr.h"
 #include "utils/lsyscache.h"
 
 
@@ -829,13 +830,21 @@ gistNewBuffer(Relation r)
 		{
 			Page		page = BufferGetPage(buffer);
 
+			/*
+			 * If the page was never initialized, it's OK to use.
+			 */
 			if (PageIsNew(page))
-				return buffer;	/* OK to use, if never initialized */
+				return buffer;
 
 			gistcheckpage(r, buffer);
 
-			if (GistPageIsDeleted(page))
-				return buffer;	/* OK to use */
+			/*
+			 * Otherwise, the page is OK to recycle if it was deleted, and is
+			 * old enough that no backend can still be interested in it.
+			 */
+			if (GistPageIsDeleted(page) &&
+				TransactionIdPrecedes(GistPageGetDeleteXid(page), RecentGlobalXmin))
+				return buffer;
 
 			LockBuffer(buffer, GIST_UNLOCK);
 		}
diff --git a/src/backend/access/gist/gistvacuum.c b/src/backend/access/gist/gistvacuum.c
index 3c1d75691e8..531b4b73a45 100644
--- a/src/backend/access/gist/gistvacuum.c
+++ b/src/backend/access/gist/gistvacuum.c
@@ -16,26 +16,49 @@
 
 #include "access/genam.h"
 #include "access/gist_private.h"
+#include "access/transam.h"
 #include "commands/vacuum.h"
+#include "lib/integerset.h"
 #include "miscadmin.h"
 #include "storage/indexfsm.h"
 #include "storage/lmgr.h"
 
-/* Working state needed by gistbulkdelete */
+/*
+ * State kept across vacuum stages.
+ */
 typedef struct
 {
+	IndexBulkDeleteResult stats;	/* must be first */
+
 	IndexVacuumInfo *info;
-	IndexBulkDeleteResult *stats;
+
+	/*
+	 * These are used to memorize all internal and empty leaf pages in the 1st
+	 * vacuum phase.  They are used in the 2nd phase, to delete all the empty
+	 * pages.
+	 */
+	IntegerSet *internalPagesSet;
+	IntegerSet *emptyLeafPagesSet;
+} GistBulkDeleteResult;
+
+/* Working state needed by gistbulkdelete */
+typedef struct
+{
+	GistBulkDeleteResult *stats;
 	IndexBulkDeleteCallback callback;
 	void	   *callback_state;
 	GistNSN		startNSN;
 	BlockNumber totFreePages;	/* true total # of free pages */
 } GistVacState;
 
-static void gistvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
+static void gistvacuumscan(IndexVacuumInfo *info, GistBulkDeleteResult *stats,
 			   IndexBulkDeleteCallback callback, void *callback_state);
 static void gistvacuumpage(GistVacState *vstate, BlockNumber blkno,
 			   BlockNumber orig_blkno);
+static void gistvacuum_recycle_pages(GistBulkDeleteResult *stats);
+static bool gistdeletepage(GistBulkDeleteResult *stats,
+			   Buffer buffer, OffsetNumber downlink,
+			   Buffer leafBuffer);
 
 /*
  * VACUUM bulkdelete stage: remove index entries.
@@ -44,13 +67,15 @@ IndexBulkDeleteResult *
 gistbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
 			   IndexBulkDeleteCallback callback, void *callback_state)
 {
+	GistBulkDeleteResult *gist_stats = (GistBulkDeleteResult *) stats;
+
 	/* allocate stats if first time through, else re-use existing struct */
-	if (stats == NULL)
-		stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
+	if (gist_stats == NULL)
+		gist_stats = (GistBulkDeleteResult *) palloc0(sizeof(GistBulkDeleteResult));
 
-	gistvacuumscan(info, stats, callback, callback_state);
+	gistvacuumscan(info, gist_stats, callback, callback_state);
 
-	return stats;
+	return (IndexBulkDeleteResult *) gist_stats;
 }
 
 /*
@@ -59,6 +84,8 @@ gistbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
 IndexBulkDeleteResult *
 gistvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
 {
+	GistBulkDeleteResult *gist_stats = (GistBulkDeleteResult *) stats;
+
 	/* No-op in ANALYZE ONLY mode */
 	if (info->analyze_only)
 		return stats;
@@ -68,10 +95,26 @@ gistvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
 	 * stats from the latest gistbulkdelete call.  If it wasn't called, we
 	 * still need to do a pass over the index, to obtain index statistics.
 	 */
-	if (stats == NULL)
+	if (gist_stats == NULL)
 	{
-		stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
-		gistvacuumscan(info, stats, NULL, NULL);
+		gist_stats = (GistBulkDeleteResult *) palloc0(sizeof(GistBulkDeleteResult));
+		gistvacuumscan(info, gist_stats, NULL, NULL);
+	}
+
+	/*
+	 * If we saw any empty pages that could be recycled, try to unlink them
+	 * from the tree so that they can be reused.
+	 */
+	if (gist_stats->emptyLeafPagesSet)
+	{
+		gistvacuum_recycle_pages(gist_stats);
+		intset_free(gist_stats->emptyLeafPagesSet);
+		gist_stats->emptyLeafPagesSet = NULL;
+	}
+	if (gist_stats->internalPagesSet)
+	{
+		intset_free(gist_stats->internalPagesSet);
+		gist_stats->internalPagesSet = NULL;
 	}
 
 	/*
@@ -82,11 +125,11 @@ gistvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
 	 */
 	if (!info->estimated_count)
 	{
-		if (stats->num_index_tuples > info->num_heap_tuples)
-			stats->num_index_tuples = info->num_heap_tuples;
+		if (gist_stats->stats.num_index_tuples > info->num_heap_tuples)
+			gist_stats->stats.num_index_tuples = info->num_heap_tuples;
 	}
 
-	return stats;
+	return (IndexBulkDeleteResult *) gist_stats;
 }
 
 /*
@@ -97,15 +140,16 @@ gistvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
  * btvacuumcleanup invoke this (the latter only if no btbulkdelete call
  * occurred).
  *
- * This also adds unused/delete pages to the free space map, although that
- * is currently not very useful.  There is currently no support for deleting
- * empty pages, so recycleable pages can only be found if an error occurs
- * while the index is being expanded, leaving an all-zeros page behind.
+ * This also makes note of any empty leaf pages, as well as all internal
+ * pages. gistvacuum_recycle_pages() needs that information.  Any deleted
+ * pages are added directly to the free space map.  (They should already
+ * have been added there when they were originally deleted, but it's
+ * possible that the FSM was lost in a crash, for example.)
  *
  * The caller is responsible for initially allocating/zeroing a stats struct.
  */
 static void
-gistvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
+gistvacuumscan(IndexVacuumInfo *info, GistBulkDeleteResult *stats,
 			   IndexBulkDeleteCallback callback, void *callback_state)
 {
 	Relation	rel = info->index;
@@ -118,12 +162,19 @@ gistvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
 	 * Reset counts that will be incremented during the scan; needed in case
 	 * of multiple scans during a single VACUUM command.
 	 */
-	stats->estimated_count = false;
-	stats->num_index_tuples = 0;
-	stats->pages_deleted = 0;
+	stats->stats.estimated_count = false;
+	stats->stats.num_index_tuples = 0;
+	stats->stats.pages_deleted = 0;
+
+	if (stats->internalPagesSet != NULL)
+		intset_free(stats->internalPagesSet);
+	stats->internalPagesSet = intset_create();
+	if (stats->emptyLeafPagesSet != NULL)
+		intset_free(stats->emptyLeafPagesSet);
+	stats->emptyLeafPagesSet = intset_create();
 
 	/* Set up info to pass down to gistvacuumpage */
-	vstate.info = info;
+	stats->info = info;
 	vstate.stats = stats;
 	vstate.callback = callback;
 	vstate.callback_state = callback_state;
@@ -171,6 +222,7 @@ gistvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
 		/* Quit if we've scanned the whole relation */
 		if (blkno >= num_pages)
 			break;
+
 		/* Iterate over pages, then loop back to recheck length */
 		for (; blkno < num_pages; blkno++)
 			gistvacuumpage(&vstate, blkno, blkno);
@@ -192,8 +244,8 @@ gistvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
 		IndexFreeSpaceMapVacuum(rel);
 
 	/* update statistics */
-	stats->num_pages = num_pages;
-	stats->pages_free = vstate.totFreePages;
+	stats->stats.num_pages = num_pages;
+	stats->stats.pages_free = vstate.totFreePages;
 }
 
 /*
@@ -210,8 +262,8 @@ gistvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
 static void
 gistvacuumpage(GistVacState *vstate, BlockNumber blkno, BlockNumber orig_blkno)
 {
-	IndexVacuumInfo *info = vstate->info;
-	IndexBulkDeleteResult *stats = vstate->stats;
+	GistBulkDeleteResult *stats = vstate->stats;
+	IndexVacuumInfo *info = stats->info;
 	IndexBulkDeleteCallback callback = vstate->callback;
 	void	   *callback_state = vstate->callback_state;
 	Relation	rel = info->index;
@@ -240,12 +292,13 @@ restart:
 		/* Okay to recycle this page */
 		RecordFreeIndexPage(rel, blkno);
 		vstate->totFreePages++;
-		stats->pages_deleted++;
+		stats->stats.pages_deleted++;
 	}
 	else if (GistPageIsLeaf(page))
 	{
 		OffsetNumber todelete[MaxOffsetNumber];
 		int			ntodelete = 0;
+		int			nremain;
 		GISTPageOpaque opaque = GistPageGetOpaque(page);
 		OffsetNumber maxoff = PageGetMaxOffsetNumber(page);
 
@@ -314,12 +367,28 @@ restart:
 
 			END_CRIT_SECTION();
 
-			stats->tuples_removed += ntodelete;
+			stats->stats.tuples_removed += ntodelete;
 			/* must recompute maxoff */
 			maxoff = PageGetMaxOffsetNumber(page);
 		}
 
-		stats->num_index_tuples += maxoff - FirstOffsetNumber + 1;
+		nremain = maxoff - FirstOffsetNumber + 1;
+		if (nremain == 0)
+		{
+			/*
+			 * The page is now completely empty.  Remember its block number,
+			 * we will try to delete the page in the second stage, in
+			 * gistvacuum_recycle_pages().
+			 *
+			 * Skip this when recursing, because IntegerSet requires that the
+			 * values are added in ascending order.  The next VACUUM will pick
+			 * it up...
+			 */
+			if (blkno == orig_blkno)
+				intset_add_member(stats->emptyLeafPagesSet, blkno);
+		}
+		else
+			stats->stats.num_index_tuples += nremain;
 	}
 	else
 	{
@@ -347,6 +416,14 @@ restart:
 						 errdetail("This is caused by an incomplete page split at crash recovery before upgrading to PostgreSQL 9.1."),
 						 errhint("Please REINDEX it.")));
 		}
+
+		/*
+		 * Remember the block number of this page, so that we can revisit
+		 * it later in gistvacuum_recycle_pages(), when we search for parents
+		 * of empty children.
+		 */
+		if (blkno == orig_blkno)
+			intset_add_member(stats->internalPagesSet, blkno);
 	}
 
 	UnlockReleaseBuffer(buffer);
@@ -364,3 +441,218 @@ restart:
 		goto restart;
 	}
 }
+
+/*
+ * Scan all internal pages, and try to delete their empty child pages.
+ */
+static void
+gistvacuum_recycle_pages(GistBulkDeleteResult *stats)
+{
+	IndexVacuumInfo *info = stats->info;
+	Relation	rel = info->index;
+	BlockNumber	empty_pages_remaining;
+
+	empty_pages_remaining = intset_num_entries(stats->emptyLeafPagesSet);
+
+	/*
+	 * Rescan all inner pages to find those that have empty child pages.
+	 */
+	intset_begin_iterate(stats->internalPagesSet);
+	while (empty_pages_remaining > 0)
+	{
+		BlockNumber	blkno;
+		bool		found;
+		Buffer		buffer;
+		Page		page;
+		OffsetNumber off,
+					maxoff;
+		OffsetNumber todelete[MaxOffsetNumber];
+		BlockNumber	leafs_to_delete[MaxOffsetNumber];
+		int			ntodelete;
+		int			deleted;
+
+		blkno = intset_iterate_next(stats->internalPagesSet, &found);
+		if (!found)
+			break;
+
+		buffer = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL,
+									info->strategy);
+
+		LockBuffer(buffer, GIST_SHARE);
+		page = (Page) BufferGetPage(buffer);
+
+		if (PageIsNew(page) || GistPageIsDeleted(page) || GistPageIsLeaf(page))
+		{
+			/*
+			 * This page was an internal page earlier, but now it's something
+			 * else. Shouldn't happen...
+			 */
+			Assert(false);
+			UnlockReleaseBuffer(buffer);
+			continue;
+		}
+
+		/*
+		 * Scan all the downlinks, and see if any of them point to empty leaf
+		 * pages.
+		 */
+		maxoff = PageGetMaxOffsetNumber(page);
+		ntodelete = 0;
+		for (off = FirstOffsetNumber;
+			 off <= maxoff && ntodelete < maxoff - 1;
+			 off = OffsetNumberNext(off))
+		{
+			ItemId		iid = PageGetItemId(page, off);
+			IndexTuple	idxtuple = (IndexTuple) PageGetItem(page, iid);
+			BlockNumber leafblk;
+
+			leafblk = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
+			if (intset_is_member(stats->emptyLeafPagesSet, leafblk))
+			{
+				leafs_to_delete[ntodelete] = leafblk;
+				todelete[ntodelete++] = off;
+			}
+		}
+
+		/*
+		 * In order to avoid deadlock, the child page must be locked before
+		 * the parent, so we must release the lock on the parent, lock the
+		 * child, and then re-acquire the lock on the parent.  (And we
+		 * wouldn't want to do I/O while holding a lock, anyway.)
+		 *
+		 * While we're not holding a lock on the parent, the downlink might
+		 * get moved by a concurrent split, so we must re-check that it
+		 * still points to the same child page after we have acquired both
+		 * locks.  Also, another backend might have inserted a tuple on the
+		 * page, so that it is no longer empty.  gistdeletepage() re-checks
+		 * all these conditions.
+		 */
+		LockBuffer(buffer, GIST_UNLOCK);
+
+		deleted = 0;
+		for (int i = 0; i < ntodelete; i++)
+		{
+			Buffer		leafbuf;
+
+			/*
+			 * Don't remove the last downlink from the parent. That would
+			 * confuse the insertion code.
+			 */
+			if (PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
+				break;
+
+			leafbuf = ReadBufferExtended(rel, MAIN_FORKNUM, leafs_to_delete[i],
+										 RBM_NORMAL, info->strategy);
+			LockBuffer(leafbuf, GIST_EXCLUSIVE);
+			gistcheckpage(rel, leafbuf);
+
+			LockBuffer(buffer, GIST_EXCLUSIVE);
+			if (gistdeletepage(stats, buffer, todelete[i] - deleted, leafbuf))
+				deleted++;
+			LockBuffer(buffer, GIST_UNLOCK);
+
+			UnlockReleaseBuffer(leafbuf);
+		}
+		empty_pages_remaining -= deleted;
+
+		ReleaseBuffer(buffer);
+	}
+}
+
+
+/*
+ * gistdeletepage takes a parent page and a leaf page, and tries to delete
+ * the leaf.  Both pages must be locked.  Returns true if the deletion
+ * actually happened.  Never removes the parent's last downlink.
+ */
+static bool
+gistdeletepage(GistBulkDeleteResult *stats,
+			   Buffer parentBuffer, OffsetNumber downlink,
+			   Buffer leafBuffer)
+{
+	Page		parentPage = BufferGetPage(parentBuffer);
+	Page		leafPage = BufferGetPage(leafBuffer);
+	ItemId		iid;
+	IndexTuple	idxtuple;
+	XLogRecPtr	recptr;
+	TransactionId txid;
+
+	/* Check that the leaf is still empty */
+	if (!GistPageIsLeaf(leafPage))
+	{
+		Assert(false);
+		return false;
+	}
+	if (PageGetMaxOffsetNumber(leafPage) != InvalidOffsetNumber)
+		return false;		/* no longer empty */
+
+	if (GistFollowRight(leafPage)
+		|| GistPageGetNSN(parentPage) > GistPageGetNSN(leafPage))
+	{
+		/* Don't mess with a concurrent page split. */
+		return false;
+	}
+
+	/*
+	 * Check that the parent page still looks valid.
+	 */
+	if (PageIsNew(parentPage) ||
+		GistPageIsDeleted(parentPage) ||
+		GistPageIsLeaf(parentPage))
+	{
+		Assert(false);
+		return false;
+	}
+
+	/*
+	 * Check that old downlink is still pointing to leafBuffer.
+	 *
+	 * It might have moved by a concurrent insert. We could try to relocate
+	 * it, by scanning the page again, or perhaps even by moving right if
+	 * the page was split, but let's keep it simple and just give up.
+	 * The next VACUUM will pick this up again.
+	 */
+	if (PageGetMaxOffsetNumber(parentPage) < downlink
+		|| PageGetMaxOffsetNumber(parentPage) <= FirstOffsetNumber)
+		return false;
+
+	iid = PageGetItemId(parentPage, downlink);
+	idxtuple = (IndexTuple) PageGetItem(parentPage, iid);
+	if (BufferGetBlockNumber(leafBuffer) !=
+		ItemPointerGetBlockNumber(&(idxtuple->t_tid)))
+		return false;
+
+	/*
+	 * All good.  Proceed with the deletion.
+	 *
+	 * Like in _bt_unlink_halfdead_page, we need an upper bound on the XIDs
+	 * that could hold downlinks to this page.  We use
+	 * ReadNewTransactionId() instead of GetCurrentTransactionId(), since
+	 * we are in a VACUUM.
+	 */
+	txid = ReadNewTransactionId();
+
+	/* Mark page as deleted dropping references from internal pages */
+	START_CRIT_SECTION();
+
+	/* Remember xid of last transaction that could see this page */
+	GistPageSetDeleteXid(leafPage, txid);
+	GistPageSetDeleted(leafPage);
+	MarkBufferDirty(leafBuffer);
+	stats->stats.pages_deleted++;
+
+	MarkBufferDirty(parentBuffer);
+	/* Deleting a tuple shifts the offsets of the following tuples on the page */
+	PageIndexTupleDelete(parentPage, downlink);
+
+	if (RelationNeedsWAL(stats->info->index))
+		recptr = gistXLogPageDelete(leafBuffer, txid, parentBuffer, downlink);
+	else
+		recptr = gistGetFakeLSN(stats->info->index);
+	PageSetLSN(parentPage, recptr);
+	PageSetLSN(leafPage, recptr);
+
+	END_CRIT_SECTION();
+
+	return true;
+}
diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c
index 408bd5390af..4dbca41bab1 100644
--- a/src/backend/access/gist/gistxlog.c
+++ b/src/backend/access/gist/gistxlog.c
@@ -508,6 +508,43 @@ gistRedoCreateIndex(XLogReaderState *record)
 	UnlockReleaseBuffer(buffer);
 }
 
+/* redo page deletion */
+static void
+gistRedoPageDelete(XLogReaderState *record)
+{
+	XLogRecPtr	lsn = record->EndRecPtr;
+	gistxlogPageDelete *xldata = (gistxlogPageDelete *) XLogRecGetData(record);
+	Buffer		buffer;
+	Page		page;
+
+	/* FIXME: Are we locking the pages in correct order, for hot standby? */
+
+	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
+	{
+		page = (Page) BufferGetPage(buffer);
+
+		GistPageSetDeleteXid(page, xldata->deleteXid);
+		GistPageSetDeleted(page);
+
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(buffer);
+	}
+	if (BufferIsValid(buffer))
+		UnlockReleaseBuffer(buffer);
+
+	if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
+	{
+		page = (Page) BufferGetPage(buffer);
+
+		PageIndexTupleDelete(page, xldata->downlinkOffset);
+
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(buffer);
+	}
+	if (BufferIsValid(buffer))
+		UnlockReleaseBuffer(buffer);
+}
+
 void
 gist_redo(XLogReaderState *record)
 {
@@ -535,6 +572,9 @@ gist_redo(XLogReaderState *record)
 		case XLOG_GIST_CREATE_INDEX:
 			gistRedoCreateIndex(record);
 			break;
+		case XLOG_GIST_PAGE_DELETE:
+			gistRedoPageDelete(record);
+			break;
 		default:
 			elog(PANIC, "gist_redo: unknown op code %u", info);
 	}
@@ -653,6 +693,31 @@ gistXLogSplit(bool page_is_leaf,
 	return recptr;
 }
 
+/*
+ * Write an XLOG record describing a page deletion.  This also includes the
+ * removal of the downlink from the parent page.
+ */
+XLogRecPtr
+gistXLogPageDelete(Buffer buffer, TransactionId xid,
+				   Buffer parentBuffer, OffsetNumber downlinkOffset)
+{
+	gistxlogPageDelete xlrec;
+	XLogRecPtr	recptr;
+
+	xlrec.deleteXid = xid;
+	xlrec.downlinkOffset = downlinkOffset;
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, sizeof(gistxlogPageDelete));
+
+	XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
+	XLogRegisterBuffer(1, parentBuffer, REGBUF_STANDARD);
+
+	recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_DELETE);
+
+	return recptr;
+}
+
 /*
  * Write XLOG record describing a page update. The update can include any
  * number of deletions and/or insertions of tuples on a single index page.
diff --git a/src/backend/access/rmgrdesc/gistdesc.c b/src/backend/access/rmgrdesc/gistdesc.c
index e468c9e15aa..0861f829922 100644
--- a/src/backend/access/rmgrdesc/gistdesc.c
+++ b/src/backend/access/rmgrdesc/gistdesc.c
@@ -76,6 +76,9 @@ gist_identify(uint8 info)
 		case XLOG_GIST_CREATE_INDEX:
 			id = "CREATE_INDEX";
 			break;
+		case XLOG_GIST_PAGE_DELETE:
+			id = "PAGE_DELETE";
+			break;
 	}
 
 	return id;
diff --git a/src/include/access/gist.h b/src/include/access/gist.h
index 3234f241560..ce8bfd83ea4 100644
--- a/src/include/access/gist.h
+++ b/src/include/access/gist.h
@@ -151,6 +151,10 @@ typedef struct GISTENTRY
 #define GistPageGetNSN(page) ( PageXLogRecPtrGet(GistPageGetOpaque(page)->nsn))
 #define GistPageSetNSN(page, val) ( PageXLogRecPtrSet(GistPageGetOpaque(page)->nsn, val))
 
+/* For deleted pages, we store the last XID that could see the page in a scan */
+#define GistPageGetDeleteXid(page) ( ((PageHeader) (page))->pd_prune_xid )
+#define GistPageSetDeleteXid(page, val) ( ((PageHeader) (page))->pd_prune_xid = val)
+
 /*
  * Vector of GISTENTRY structs; user-defined methods union and picksplit
  * take it as one of their arguments
diff --git a/src/include/access/gist_private.h b/src/include/access/gist_private.h
index 463d2bfc7b9..c77d0b4dd81 100644
--- a/src/include/access/gist_private.h
+++ b/src/include/access/gist_private.h
@@ -414,12 +414,17 @@ extern bool gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
 extern SplitedPageLayout *gistSplit(Relation r, Page page, IndexTuple *itup,
 		  int len, GISTSTATE *giststate);
 
+/* gistxlog.c */
+extern XLogRecPtr gistXLogPageDelete(Buffer buffer,
+				   TransactionId xid, Buffer parentBuffer,
+				   OffsetNumber downlinkOffset);
+
 extern XLogRecPtr gistXLogUpdate(Buffer buffer,
 			   OffsetNumber *todelete, int ntodelete,
 			   IndexTuple *itup, int ntup,
 			   Buffer leftchild);
 
-XLogRecPtr gistXLogDelete(Buffer buffer, OffsetNumber *todelete,
+extern XLogRecPtr gistXLogDelete(Buffer buffer, OffsetNumber *todelete,
 			   int ntodelete, RelFileNode hnode);
 
 extern XLogRecPtr gistXLogSplit(bool page_is_leaf,
diff --git a/src/include/access/gistxlog.h b/src/include/access/gistxlog.h
index 5117aabf1af..939a1ea7559 100644
--- a/src/include/access/gistxlog.h
+++ b/src/include/access/gistxlog.h
@@ -23,7 +23,7 @@
 #define XLOG_GIST_PAGE_SPLIT		0x30
  /* #define XLOG_GIST_INSERT_COMPLETE	 0x40 */	/* not used anymore */
 #define XLOG_GIST_CREATE_INDEX		0x50
- /* #define XLOG_GIST_PAGE_DELETE		 0x60 */	/* not used anymore */
+#define XLOG_GIST_PAGE_DELETE		0x60
 
 /*
  * Backup Blk 0: updated page.
@@ -76,6 +76,16 @@ typedef struct gistxlogPageSplit
 	 */
 } gistxlogPageSplit;
 
+/*
+ * Backup Blk 0: page that was deleted.
+ * Backup Blk 1: parent page, containing the downlink to the deleted page.
+ */
+typedef struct gistxlogPageDelete
+{
+	TransactionId deleteXid;	/* last XID that could see the page in a scan */
+	OffsetNumber downlinkOffset;	/* offset of the downlink to the deleted page */
+} gistxlogPageDelete;
+
 extern void gist_redo(XLogReaderState *record);
 extern void gist_desc(StringInfo buf, XLogReaderState *record);
 extern const char *gist_identify(uint8 info);
diff --git a/src/test/regress/expected/gist.out b/src/test/regress/expected/gist.out
index f5a2993aaf2..0a43449f003 100644
--- a/src/test/regress/expected/gist.out
+++ b/src/test/regress/expected/gist.out
@@ -27,10 +27,8 @@ insert into gist_point_tbl (id, p)
 select g+100000, point(g*10+1, g*10+1) from generate_series(1, 10000) g;
 -- To test vacuum, delete some entries from all over the index.
 delete from gist_point_tbl where id % 2 = 1;
--- And also delete some concentration of values. (GiST doesn't currently
--- attempt to delete pages even when they become empty, but if it did, this
--- would exercise it)
-delete from gist_point_tbl where id < 10000;
+-- And also delete some concentration of values.
+delete from gist_point_tbl where id > 5000;
 vacuum analyze gist_point_tbl;
 -- rebuild the index with a different fillfactor
 alter index gist_pointidx SET (fillfactor = 40);
diff --git a/src/test/regress/sql/gist.sql b/src/test/regress/sql/gist.sql
index bae722fe13c..657b1954847 100644
--- a/src/test/regress/sql/gist.sql
+++ b/src/test/regress/sql/gist.sql
@@ -28,10 +28,8 @@ select g+100000, point(g*10+1, g*10+1) from generate_series(1, 10000) g;
 -- To test vacuum, delete some entries from all over the index.
 delete from gist_point_tbl where id % 2 = 1;
 
--- And also delete some concentration of values. (GiST doesn't currently
--- attempt to delete pages even when they become empty, but if it did, this
--- would exercise it)
-delete from gist_point_tbl where id < 10000;
+-- And also delete some concentration of values.
+delete from gist_point_tbl where id > 5000;
 
 vacuum analyze gist_point_tbl;
 
-- 
2.20.1
