From 06b880dbc24eef6ffd6ed0ddf313c36164e6efb4 Mon Sep 17 00:00:00 2001
From: Andrey Borodin <amborodin@acm.org>
Date: Mon, 5 Feb 2018 15:34:42 +0300
Subject: [PATCH] GiST intrapage indexing v1

---
 src/backend/access/gist/gist.c       | 384 +++++++++++++++++++++++++++++++----
 src/backend/access/gist/gistbuild.c  |  15 +-
 src/backend/access/gist/gistget.c    |   9 +
 src/backend/access/gist/gistutil.c   | 250 +++++++++++++++--------
 src/backend/access/gist/gistvacuum.c |   5 +-
 src/backend/access/gist/gistxlog.c   |  36 +++-
 src/include/access/gist_private.h    |  31 ++-
 src/include/access/gistxlog.h        |   5 +-
 8 files changed, 603 insertions(+), 132 deletions(-)

diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c
index 51c32e4afe..ec73b232f4 100644
--- a/src/backend/access/gist/gist.c
+++ b/src/backend/access/gist/gist.c
@@ -33,7 +33,7 @@ static bool gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
 				 GISTSTATE *giststate,
 				 IndexTuple *tuples, int ntup, OffsetNumber oldoffnum,
 				 Buffer leftchild, Buffer rightchild,
-				 bool unlockbuf, bool unlockleftchild);
+				 bool unlockbuf, bool unlockleftchild, int ndeltup, OffsetNumber skipoffnum);
 static void gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
 				GISTSTATE *giststate, List *splitinfo, bool releasebuf);
 static void gistvacuumpage(Relation rel, Page page, Buffer buffer);
@@ -215,7 +215,9 @@ gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
 				BlockNumber *newblkno,
 				Buffer leftchildbuf,
 				List **splitinfo,
-				bool markfollowright)
+				bool markfollowright,
+				int ndeltup,
+				OffsetNumber skipoffnum)
 {
 	BlockNumber blkno = BufferGetBlockNumber(buffer);
 	Page		page = BufferGetPage(buffer);
@@ -248,7 +250,7 @@ gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
 	 * one-element todelete array; in the split case, it's handled implicitly
 	 * because the tuple vector passed to gistSplit won't include this tuple.
 	 */
-	is_split = gistnospace(page, itup, ntup, oldoffnum, freespace);
+	is_split = gistnospace(page, itup, ntup, oldoffnum, freespace, ndeltup);
 
 	/*
 	 * If leaf page is full, try at first to delete dead tuples. And then
@@ -257,7 +259,7 @@ gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
 	if (is_split && GistPageIsLeaf(page) && GistPageHasGarbage(page))
 	{
 		gistvacuumpage(rel, page, buffer);
-		is_split = gistnospace(page, itup, ntup, oldoffnum, freespace);
+		is_split = gistnospace(page, itup, ntup, oldoffnum, freespace, ndeltup);
 	}
 
 	if (is_split)
@@ -280,17 +282,38 @@ gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
 		 * remove the old version from the vector.
 		 */
 		itvec = gistextractpage(page, &tlen);
-		if (OffsetNumberIsValid(oldoffnum))
+
+		if (OffsetNumberIsValid(skipoffnum))
+		{
+			IndexTuple skiptuple = itvec[skipoffnum - FirstOffsetNumber];
+			int newskipgroupsize = GistTupleGetSkipCount(skiptuple) + ntup - ndeltup;
+
+			Assert(GistTupleIsSkip(skiptuple));
+			Assert(skipoffnum + newskipgroupsize <= PageGetMaxOffsetNumber(page) + ntup - ndeltup);
+			GistTupleSetSkipCount(skiptuple, newskipgroupsize);
+		}
+
+		if (OffsetNumberIsValid(oldoffnum) && ndeltup)
 		{
-			/* on inner page we should remove old tuple */
+			/* on inner page we should remove old tuples */
 			int			pos = oldoffnum - FirstOffsetNumber;
 
-			tlen--;
+			tlen -= ndeltup;
 			if (pos != tlen)
-				memmove(itvec + pos, itvec + pos + 1, sizeof(IndexTuple) * (tlen - pos));
+				memmove(itvec + pos, itvec + pos + ndeltup, sizeof(IndexTuple) * (tlen - pos));
+		}
+
+		itvec = gistjoinvector(itvec, &tlen, itup, ntup, oldoffnum);
+
+		if (GistTupleIsSkip(*itvec))
+		{
+			dist = gistSplitBySkipgroup(rel, page, itvec, tlen, giststate);
+		}
+		if (dist == NULL)
+		{
+			gistfiltervector(itvec, &tlen);
+			dist = gistSplit(rel, page, itvec, tlen, giststate);
 		}
-		itvec = gistjoinvector(itvec, &tlen, itup, ntup);
-		dist = gistSplit(rel, page, itvec, tlen, giststate);
 
 		/*
 		 * Check that split didn't produce too many pages.
@@ -357,7 +380,7 @@ gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
 		if (is_rootsplit)
 		{
 			IndexTuple *downlinks;
-			int			ndownlinks = 0;
+			int			ndownlinks = 1;
 			int			i;
 
 			rootpg.buffer = buffer;
@@ -368,8 +391,11 @@ gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
 			for (ptr = dist; ptr; ptr = ptr->next)
 				ndownlinks++;
 			downlinks = palloc(sizeof(IndexTuple) * ndownlinks);
-			for (i = 0, ptr = dist; ptr; ptr = ptr->next)
+			for (i = 1, ptr = dist; ptr; ptr = ptr->next)
 				downlinks[i++] = ptr->itup;
+			downlinks[0] = gistunion(rel,downlinks + 1, ndownlinks - 1, giststate);
+			GistTupleMakeSkip(downlinks[0]);
+			GistTupleSetSkipCount(downlinks[0], ndownlinks - 1);
 
 			rootpg.block.blkno = GIST_ROOT_BLKNO;
 			rootpg.block.num = ndownlinks;
@@ -505,26 +531,37 @@ gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
 		 */
 		START_CRIT_SECTION();
 
+		if (OffsetNumberIsValid(skipoffnum))
+		{
+			IndexTuple skiptuple = (IndexTuple) PageGetItem(page, PageGetItemId(page, skipoffnum));
+			int newskipgroupsize = GistTupleGetSkipCount(skiptuple) + ntup - ndeltup;
+
+			Assert(GistTupleIsSkip(skiptuple));
+			Assert(skipoffnum + newskipgroupsize <= PageGetMaxOffsetNumber(page) + ntup - ndeltup);
+			GistTupleSetSkipCount(skiptuple, newskipgroupsize);
+		}
+
 		/*
 		 * Delete old tuple if any, then insert new tuple(s) if any.  If
 		 * possible, use the fast path of PageIndexTupleOverwrite.
 		 */
 		if (OffsetNumberIsValid(oldoffnum))
 		{
-			if (ntup == 1)
+			int noverwrite = Min(ntup,ndeltup);
+			for (i = 0; i < noverwrite; i++)
 			{
-				/* One-for-one replacement, so use PageIndexTupleOverwrite */
-				if (!PageIndexTupleOverwrite(page, oldoffnum, (Item) *itup,
-											 IndexTupleSize(*itup)))
+				if (!PageIndexTupleOverwrite(page, oldoffnum + i, (Item) itup[i],
+											 IndexTupleSize(itup[i])))
 					elog(ERROR, "failed to add item to index page in \"%s\"",
 						 RelationGetRelationName(rel));
 			}
-			else
+
+			for (i = noverwrite; i < ndeltup; i++)
 			{
-				/* Delete old, then append new tuple(s) to page */
-				PageIndexTupleDelete(page, oldoffnum);
-				gistfillbuffer(page, itup, ntup, InvalidOffsetNumber);
+				PageIndexTupleDelete(page, oldoffnum + i);
 			}
+
+			gistfillbuffer(page, itup + noverwrite, ntup - noverwrite, oldoffnum + noverwrite);
 		}
 		else
 		{
@@ -540,17 +577,18 @@ gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
 		if (RelationNeedsWAL(rel))
 		{
 			OffsetNumber ndeloffs = 0,
-						deloffs[1];
+						deloffs[BLCKSZ /sizeof(ItemIdData)];
 
 			if (OffsetNumberIsValid(oldoffnum))
 			{
-				deloffs[0] = oldoffnum;
-				ndeloffs = 1;
+				for (i = 0; i < ndeltup; i++)
+					deloffs[i] = oldoffnum + i;
+				ndeloffs = ndeltup;
 			}
 
 			recptr = gistXLogUpdate(buffer,
 									deloffs, ndeloffs, itup, ntup,
-									leftchildbuf);
+									leftchildbuf, skipoffnum);
 
 			PageSetLSN(page, recptr);
 		}
@@ -685,6 +723,7 @@ gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate)
 
 		if (!GistPageIsLeaf(stack->page))
 		{
+			OffsetNumber skipoffnum;
 			/*
 			 * This is an internal page so continue to walk down the tree.
 			 * Find the child node that has the minimum insertion penalty.
@@ -694,9 +733,71 @@ gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate)
 			GISTInsertStack *item;
 			OffsetNumber downlinkoffnum;
 
-			downlinkoffnum = gistchoose(state.r, stack->page, itup, giststate);
+			downlinkoffnum = gistchoose(state.r, stack->page, itup, giststate, &skipoffnum);
+
+			if (OffsetNumberIsValid(skipoffnum))
+			{
+				iid = PageGetItemId(stack->page, skipoffnum);
+				idxtuple = (IndexTuple) PageGetItem(stack->page, iid);
+				Assert(GistTupleIsSkip(idxtuple));
+
+				/*
+				 * Check that the key representing the target child node is
+				 * consistent with the key we're inserting. Update it if it's not.
+				 */
+				newtup = gistgetadjusted(state.r, idxtuple, itup, giststate);
+				if (newtup)
+				{
+					/*
+					 * Swap shared lock for an exclusive one. Beware, the page may
+					 * change while we unlock/lock the page...
+					 */
+					if (!xlocked)
+					{
+						LockBuffer(stack->buffer, GIST_UNLOCK);
+						LockBuffer(stack->buffer, GIST_EXCLUSIVE);
+						xlocked = true;
+						stack->page = (Page) BufferGetPage(stack->buffer);
+
+						if (PageGetLSN(stack->page) != stack->lsn)
+						{
+							/* the page was changed while we unlocked it, retry */
+							continue;
+						}
+					}
+
+					/*
+					 * Update the tuple.
+					 *
+					 * We still hold the lock after gistinserttuple(), but it
+					 * might have to split the page to make the updated tuple fit.
+					 * In that case the updated tuple might migrate to the other
+					 * half of the split, so we have to go back to the parent and
+					 * descend back to the half that's a better fit for the new
+					 * tuple.
+					 */
+					if (gistinserttuple(&state, stack, giststate, newtup,
+										skipoffnum))
+					{
+						/*
+						 * If this was a root split, the root page continues to be
+						 * the parent and the updated tuple went to one of the
+						 * child pages, so we just need to retry from the root
+						 * page.
+						 */
+						if (stack->blkno != GIST_ROOT_BLKNO)
+						{
+							UnlockReleaseBuffer(stack->buffer);
+							xlocked = false;
+							state.stack = stack = stack->parent;
+						}
+						continue;
+					}
+				}
+			}
 			iid = PageGetItemId(stack->page, downlinkoffnum);
 			idxtuple = (IndexTuple) PageGetItem(stack->page, iid);
+			Assert(!GistTupleIsSkip(idxtuple));
 			childblkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
 
 			/*
@@ -762,6 +863,7 @@ gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate)
 					continue;
 				}
 			}
+
 			LockBuffer(stack->buffer, GIST_UNLOCK);
 			xlocked = false;
 
@@ -770,6 +872,7 @@ gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate)
 			item->blkno = childblkno;
 			item->parent = stack;
 			item->downlinkoffnum = downlinkoffnum;
+			item->skipoffnum = skipoffnum;
 			state.stack = stack = item;
 		}
 		else
@@ -983,10 +1086,18 @@ gistFindCorrectParent(Relation r, GISTInsertStack *child)
 		while (true)
 		{
 			maxoff = PageGetMaxOffsetNumber(parent->page);
+			child->skipoffnum = InvalidOffsetNumber;
 			for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
 			{
 				iid = PageGetItemId(parent->page, i);
 				idxtuple = (IndexTuple) PageGetItem(parent->page, iid);
+
+				if (GistTupleIsSkip(idxtuple))
+				{
+					child->skipoffnum = i;
+					continue;
+				}
+
 				if (ItemPointerGetBlockNumber(&(idxtuple->t_tid)) == child->blkno)
 				{
 					/* yes!!, found */
@@ -994,6 +1105,7 @@ gistFindCorrectParent(Relation r, GISTInsertStack *child)
 					return;
 				}
 			}
+			child->skipoffnum = InvalidOffsetNumber;
 
 			parent->blkno = GistPageGetOpaque(parent->page)->rightlink;
 			UnlockReleaseBuffer(parent->buffer);
@@ -1174,7 +1286,7 @@ gistinserttuple(GISTInsertState *state, GISTInsertStack *stack,
 				GISTSTATE *giststate, IndexTuple tuple, OffsetNumber oldoffnum)
 {
 	return gistinserttuples(state, stack, giststate, &tuple, 1, oldoffnum,
-							InvalidBuffer, InvalidBuffer, false, false);
+							InvalidBuffer, InvalidBuffer, false, false, 1, InvalidOffsetNumber);
 }
 
 /* ----------------
@@ -1208,7 +1320,7 @@ gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
 				 GISTSTATE *giststate,
 				 IndexTuple *tuples, int ntup, OffsetNumber oldoffnum,
 				 Buffer leftchild, Buffer rightchild,
-				 bool unlockbuf, bool unlockleftchild)
+				 bool unlockbuf, bool unlockleftchild, int ndeltup, OffsetNumber skipoffnum)
 {
 	List	   *splitinfo;
 	bool		is_split;
@@ -1220,8 +1332,9 @@ gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
 							   oldoffnum, NULL,
 							   leftchild,
 							   &splitinfo,
-							   true);
-
+							   true,
+							   ndeltup,
+							   skipoffnum);
 	/*
 	 * Before recursing up in case the page was split, release locks on the
 	 * child pages. We don't need to keep them locked when updating the
@@ -1246,6 +1359,85 @@ gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
 	return is_split;
 }
 
+inline void
+gistcheckskippage(Page page)
+{
+	OffsetNumber i,
+				maxoff;
+	int skiplast = 0;
+	bool wasskip = false;
+	Assert(false);
+
+	maxoff = PageGetMaxOffsetNumber(page);
+	for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
+	{
+		IndexTuple tup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
+
+		if (GistTupleIsSkip(tup))
+		{
+			if (skiplast)
+			{
+				elog(NOTICE, "GS: at % d expecting %d more tuples but have skipgroup %d",i, skiplast,GistTupleGetSkipCount(tup));
+			}
+			Assert(skiplast == 0);
+			skiplast = GistTupleGetSkipCount(tup);
+			wasskip = true;
+		}
+		else
+		{
+			if (!wasskip)
+				continue;
+			if (skiplast<=0)
+			{
+				elog(NOTICE, "GS: fail at %d expecting more skips", i);
+			}
+			Assert(skiplast>0);
+			skiplast--;
+		}
+	}
+}
+
+static void
+gisttestskipgroup(GISTInsertState *state, GISTInsertStack *stack,
+				 GISTSTATE *giststate, OffsetNumber skipoffnum)
+{
+	Page		page = BufferGetPage(stack->buffer);
+	IndexTuple	skiptuple = (IndexTuple) PageGetItem(page, PageGetItemId(page, skipoffnum));
+	int			skipsize;
+
+	Assert(!GistPageIsLeaf(page));
+	Assert(GistTupleIsSkip(skiptuple));
+	skipsize = GistTupleGetSkipCount(skiptuple);
+	Assert((skipsize > 0) && (skipsize < BLCKSZ / sizeof(ItemIdData)));
+	if (skipsize > GIST_SKIPGROUP_THRESHOLD)
+	{
+		IndexTuple* itvec = gistextractrange(page, skipoffnum+1, skipsize);
+		SplitedPageLayout *ptr;
+		SplitedPageLayout *dist = gistSplit(state->r, page, itvec, skipsize, giststate);
+		int totalsize = 0;
+		for (ptr = dist; ptr; ptr = ptr->next)
+		{
+			char	   *data;
+			int i;
+			itvec[totalsize++] = ptr->itup;
+			GistTupleMakeSkip(ptr->itup);
+			GistTupleSetSkipCount(ptr->itup, ptr->block.num);
+			data = (char *) (ptr->list);
+
+			for (i = 0; i < ptr->block.num; i++)
+			{
+				IndexTuple	thistup = (IndexTuple) data;
+
+				itvec[totalsize++] = thistup;
+
+				data += IndexTupleSize(thistup);
+			}
+		}
+		gistinserttuples(state, stack, giststate, itvec, totalsize, skipoffnum,
+							InvalidBuffer, InvalidBuffer, false, false, skipsize + 1, InvalidOffsetNumber);
+	}
+}
+
 /*
  * Finish an incomplete split by inserting/updating the downlinks in parent
  * page. 'splitinfo' contains all the child pages involved in the split,
@@ -1264,6 +1456,7 @@ gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
 	GISTPageSplitInfo *right;
 	GISTPageSplitInfo *left;
 	IndexTuple	tuples[2];
+	bool		lastmomentsplit;
 
 	/* A split always contains at least two halves */
 	Assert(list_length(splitinfo) >= 2);
@@ -1297,8 +1490,8 @@ gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
 
 		if (gistinserttuples(state, stack->parent, giststate,
 							 &right->downlink, 1,
-							 InvalidOffsetNumber,
-							 left->buf, right->buf, false, false))
+							 stack->downlinkoffnum,
+							 left->buf, right->buf, false, false, 0, stack->skipoffnum))
 		{
 			/*
 			 * If the parent page was split, need to relocate the original
@@ -1320,13 +1513,19 @@ gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
 	 */
 	tuples[0] = left->downlink;
 	tuples[1] = right->downlink;
-	gistinserttuples(state, stack->parent, giststate,
+	lastmomentsplit = gistinserttuples(state, stack->parent, giststate,
 					 tuples, 2,
 					 stack->downlinkoffnum,
 					 left->buf, right->buf,
-					 true,		/* Unlock parent */
-					 unlockbuf	/* Unlock stack->buffer if caller wants that */
+					 false,		/* Unlock parent */
+					 unlockbuf,	/* Unlock stack->buffer if caller wants that */
+					 1,
+					 stack->skipoffnum
 		);
+	if (OffsetNumberIsValid(stack->skipoffnum) && !lastmomentsplit)
+		gisttestskipgroup(state,stack->parent, giststate, stack->skipoffnum);
+
+	LockBuffer(stack->parent->buffer, GIST_UNLOCK);
 	Assert(left->buf == stack->buffer);
 }
 
@@ -1417,6 +1616,121 @@ gistSplit(Relation r,
 	return res;
 }
 
+SplitedPageLayout *
+gistSplitBySkipgroup(Relation r,
+		  Page page,
+		  IndexTuple *itup,		/* contains compressed entry */
+		  int len,
+		  GISTSTATE *giststate)
+{
+	IndexTuple *lvectup,
+			   *rvectup,
+			   *skiptuples;
+	OffsetNumber *skipoffsets;
+	GistSplitVector v;
+	int			i, o, p;
+	int 		skipcount = 0;
+	SplitedPageLayout *res = NULL;
+
+	/* this should never recurse very deeply, but better safe than sorry */
+	check_stack_depth();
+
+	skiptuples = (IndexTuple *) palloc(sizeof(IndexTuple) * len);
+	skipoffsets = (OffsetNumber *) palloc(sizeof(OffsetNumber) * len);
+	for (i = 0; i < len; i++)
+	{
+		if (GistTupleIsSkip(itup[i]))
+		{
+			skipoffsets[skipcount] = i;
+			skiptuples[skipcount++] = itup[i];
+		}
+	}
+
+	/*
+	 * If a single skipgroup doesn't fit on a page, no amount of splitting will
+	 * help.
+	 */
+	if (skipcount <= 1)
+		return NULL;
+
+	memset(v.spl_lisnull, true, sizeof(bool) * giststate->tupdesc->natts);
+	memset(v.spl_risnull, true, sizeof(bool) * giststate->tupdesc->natts);
+	gistSplitByKey(r, page, skiptuples, skipcount, giststate, &v, 0);
+
+	/* form left and right vector */
+	lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
+	rvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
+
+	o = 0;
+	for (i = 0; i < v.splitVector.spl_nleft; i++)
+	{
+		int index = skipoffsets[v.splitVector.spl_left[i] - 1];
+		Assert(GistTupleIsSkip(itup[index]));
+		int skipsize = GistTupleGetSkipCount(itup[index]);
+		for (p = 0; p <= skipsize; p++)
+		{
+			Assert(p==0 || !GistTupleIsSkip(itup[index + p]));
+			lvectup[o++] = itup[index + p];
+		}
+	}
+	v.splitVector.spl_nleft = o;
+
+	o = 0;
+	for (i = 0; i < v.splitVector.spl_nright; i++)
+	{
+		int index = skipoffsets[v.splitVector.spl_right[i] - 1];
+		Assert(GistTupleIsSkip(itup[index]));
+		int skipsize = GistTupleGetSkipCount(itup[index]);
+		for (p = 0; p <= skipsize; p++)
+		{
+			Assert(p==0 || !GistTupleIsSkip(itup[index + p]));
+			rvectup[o++] = itup[index + p];
+		}
+	}
+	v.splitVector.spl_nright = o;
+
+	/* finalize splitting (may need another split) */
+	if (!gistfitpage(rvectup, v.splitVector.spl_nright))
+	{
+		res = gistSplitBySkipgroup(r, page, rvectup, v.splitVector.spl_nright, giststate);
+		if (res == NULL)
+			return NULL;
+	}
+	else
+	{
+		ROTATEDIST(res);
+		res->block.num = v.splitVector.spl_nright;
+		res->list = gistfillitupvec(rvectup, v.splitVector.spl_nright, &(res->lenlist));
+		res->itup = gistFormTuple(giststate, r, v.spl_rattr, v.spl_risnull, false);
+	}
+
+	if (!gistfitpage(lvectup, v.splitVector.spl_nleft))
+	{
+		SplitedPageLayout *resptr,
+				   *subres;
+
+		resptr = subres = gistSplitBySkipgroup(r, page, lvectup, v.splitVector.spl_nleft, giststate);
+		if (subres == NULL)
+			return NULL;
+
+		/* install on list's tail */
+		while (resptr->next)
+			resptr = resptr->next;
+
+		resptr->next = res;
+		res = subres;
+	}
+	else
+	{
+		ROTATEDIST(res);
+		res->block.num = v.splitVector.spl_nleft;
+		res->list = gistfillitupvec(lvectup, v.splitVector.spl_nleft, &(res->lenlist));
+		res->itup = gistFormTuple(giststate, r, v.spl_lattr, v.spl_lisnull, false);
+	}
+
+	return res;
+}
+
 /*
  * Create a GISTSTATE and fill it with information about the index
  */
@@ -1579,7 +1893,7 @@ gistvacuumpage(Relation rel, Page page, Buffer buffer)
 
 			recptr = gistXLogUpdate(buffer,
 									deletable, ndeletable,
-									NULL, 0, InvalidBuffer);
+									NULL, 0, InvalidBuffer, InvalidOffsetNumber);
 
 			PageSetLSN(page, recptr);
 		}
diff --git a/src/backend/access/gist/gistbuild.c b/src/backend/access/gist/gistbuild.c
index 434f15f014..a91776029c 100644
--- a/src/backend/access/gist/gistbuild.c
+++ b/src/backend/access/gist/gistbuild.c
@@ -550,6 +550,7 @@ gistProcessItup(GISTBuildState *buildstate, IndexTuple itup,
 	GISTBuildBuffers *gfbb = buildstate->gfbb;
 	Relation	indexrel = buildstate->indexrel;
 	BlockNumber childblkno;
+	OffsetNumber skipoffnum;
 	Buffer		buffer;
 	bool		result = false;
 	BlockNumber blkno;
@@ -591,7 +592,7 @@ gistProcessItup(GISTBuildState *buildstate, IndexTuple itup,
 		LockBuffer(buffer, GIST_EXCLUSIVE);
 
 		page = (Page) BufferGetPage(buffer);
-		childoffnum = gistchoose(indexrel, page, itup, giststate);
+		childoffnum = gistchoose(indexrel, page, itup, giststate, &skipoffnum);
 		iid = PageGetItemId(page, childoffnum);
 		idxtuple = (IndexTuple) PageGetItem(page, iid);
 		childblkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
@@ -690,7 +691,9 @@ gistbufferinginserttuples(GISTBuildState *buildstate, Buffer buffer, int level,
 							   itup, ntup, oldoffnum, &placed_to_blk,
 							   InvalidBuffer,
 							   &splitinfo,
-							   false);
+							   false,
+							   1,
+							   InvalidOffsetNumber);
 
 	/*
 	 * If this is a root split, update the root path item kept in memory. This
@@ -721,6 +724,8 @@ gistbufferinginserttuples(GISTBuildState *buildstate, Buffer buffer, int level,
 			{
 				ItemId		iid = PageGetItemId(page, off);
 				IndexTuple	idxtuple = (IndexTuple) PageGetItem(page, iid);
+				if (GistTupleIsSkip(idxtuple))
+					continue;
 				BlockNumber childblkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
 				Buffer		childbuf = ReadBuffer(buildstate->indexrel, childblkno);
 
@@ -879,7 +884,7 @@ gistBufferingFindCorrectParent(GISTBuildState *buildstate,
 		ItemId		iid = PageGetItemId(page, *downlinkoffnum);
 		IndexTuple	idxtuple = (IndexTuple) PageGetItem(page, iid);
 
-		if (ItemPointerGetBlockNumber(&(idxtuple->t_tid)) == childblkno)
+		if (ItemPointerGetBlockNumber(&(idxtuple->t_tid)) == childblkno && !GistTupleIsSkip(idxtuple))
 		{
 			/* Still there */
 			return buffer;
@@ -897,6 +902,8 @@ gistBufferingFindCorrectParent(GISTBuildState *buildstate,
 	{
 		ItemId		iid = PageGetItemId(page, off);
 		IndexTuple	idxtuple = (IndexTuple) PageGetItem(page, iid);
+		if (GistTupleIsSkip(idxtuple))
+			continue;
 
 		if (ItemPointerGetBlockNumber(&(idxtuple->t_tid)) == childblkno)
 		{
@@ -1177,6 +1184,8 @@ gistMemorizeAllDownlinks(GISTBuildState *buildstate, Buffer parentbuf)
 	{
 		ItemId		iid = PageGetItemId(page, off);
 		IndexTuple	idxtuple = (IndexTuple) PageGetItem(page, iid);
+		if (GistTupleIsSkip(idxtuple))
+			continue;
 		BlockNumber childblkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
 
 		gistMemorizeParent(buildstate, childblkno, parentblkno);
diff --git a/src/backend/access/gist/gistget.c b/src/backend/access/gist/gistget.c
index b30b931c3b..a9406a0aa1 100644
--- a/src/backend/access/gist/gistget.c
+++ b/src/backend/access/gist/gistget.c
@@ -421,6 +421,15 @@ gistScanPage(IndexScanDesc scan, GISTSearchItem *pageItem, double *myDistances,
 
 		/* Ignore tuple if it doesn't match */
 		if (!match)
+		{
+			if (GistTupleIsSkip(it))
+			{
+				i += GistTupleGetSkipCount(it);
+			}
+			continue;
+		}
+
+		if (GistTupleIsSkip(it))
 			continue;
 
 		if (tbm && GistPageIsLeaf(page))
diff --git a/src/backend/access/gist/gistutil.c b/src/backend/access/gist/gistutil.c
index 55cccd247a..87af0ad0f7 100644
--- a/src/backend/access/gist/gistutil.c
+++ b/src/backend/access/gist/gistutil.c
@@ -55,7 +55,7 @@ gistfillbuffer(Page page, IndexTuple *itup, int len, OffsetNumber off)
  * Check space for itup vector on page
  */
 bool
-gistnospace(Page page, IndexTuple *itvec, int len, OffsetNumber todelete, Size freespace)
+gistnospace(Page page, IndexTuple *itvec, int len, OffsetNumber todelete, Size freespace, int ndeltup)
 {
 	unsigned int size = freespace,
 				deleted = 0;
@@ -64,11 +64,14 @@ gistnospace(Page page, IndexTuple *itvec, int len, OffsetNumber todelete, Size f
 	for (i = 0; i < len; i++)
 		size += IndexTupleSize(itvec[i]) + sizeof(ItemIdData);
 
-	if (todelete != InvalidOffsetNumber)
+	if (OffsetNumberIsValid(todelete))
 	{
-		IndexTuple	itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, todelete));
+		for (i = 0; i < ndeltup; i++)
+		{
+			IndexTuple	itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, todelete + i));
 
-		deleted = IndexTupleSize(itup) + sizeof(ItemIdData);
+			deleted = IndexTupleSize(itup) + sizeof(ItemIdData);
+		}
 	}
 
 	return (PageGetFreeSpace(page) + deleted < size);
@@ -106,18 +109,71 @@ gistextractpage(Page page, int *len /* out */ )
 	return itvec;
 }
 
+/*
+ * Read range into itup vector
+ */
+IndexTuple *
+gistextractrange(Page page, OffsetNumber start, int len)
+{
+	OffsetNumber i,
+				maxoff;
+	IndexTuple *itvec,
+			   *itvecnext;
+
+	maxoff = PageGetMaxOffsetNumber(page);
+	Assert(maxoff >= start + len - 1);
+
+	/* caller will use this x2 allocation */
+	itvecnext = itvec = palloc(sizeof(IndexTuple) * len * 2);
+
+	for (i = start; i < start + len; i = OffsetNumberNext(i))
+	{
+		IndexTuple tup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
+		Assert(!GistTupleIsSkip(tup));
+		*itvecnext = tup;
+		itvecnext++;
+	}
+
+	return itvec;
+}
+
 /*
  * join two vectors into one
  */
 IndexTuple *
-gistjoinvector(IndexTuple *itvec, int *len, IndexTuple *additvec, int addlen)
+gistjoinvector(IndexTuple *itvec, int *len, IndexTuple *additvec, int addlen, OffsetNumber oldoffnum)
 {
+	if (!OffsetNumberIsValid(oldoffnum))
+		oldoffnum = *len;
+	else
+		oldoffnum -= FirstOffsetNumber;
 	itvec = (IndexTuple *) repalloc((void *) itvec, sizeof(IndexTuple) * ((*len) + addlen));
-	memmove(&itvec[*len], additvec, sizeof(IndexTuple) * addlen);
+	if (oldoffnum < *len)
+		memmove(&itvec[oldoffnum + addlen], &itvec[oldoffnum], sizeof(IndexTuple) * (*len - oldoffnum));
+	memmove(&itvec[oldoffnum], additvec, sizeof(IndexTuple) * addlen);
 	*len += addlen;
 	return itvec;
 }
 
+/*
+ * join two vectors into one
+ */
+void
+gistfiltervector(IndexTuple *itvec, int *len)
+{
+	int i;
+	int newlen = 0;
+	int oldlen = *len;
+
+	for (i = 0; i < oldlen; i++)
+	{
+		if (GistTupleIsSkip(itvec[i]))
+			continue;
+		itvec[newlen++] = itvec[i];
+	}
+	*len = newlen;
+}
+
 /*
  * make plain IndexTupleVector
  */
@@ -357,6 +413,11 @@ gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *gis
 		/* need to update key */
 		newtup = gistFormTuple(giststate, r, attr, isnull, false);
 		newtup->t_tid = oldtup->t_tid;
+		if (GistTupleIsSkip(oldtup))
+		{
+			GistTupleMakeSkip(newtup);
+			GistTupleSetSkipCount(newtup, GistTupleGetSkipCount(oldtup));
+		}
 	}
 
 	return newtup;
@@ -370,10 +431,11 @@ gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *gis
  */
 OffsetNumber
 gistchoose(Relation r, Page p, IndexTuple it,	/* it has compressed entry */
-		   GISTSTATE *giststate)
+		   GISTSTATE *giststate, OffsetNumber *skipoffnum)
 {
 	OffsetNumber result;
 	OffsetNumber maxoff;
+	OffsetNumber prevskip = InvalidOffsetNumber;
 	OffsetNumber i;
 	float		best_penalty[INDEX_MAX_KEYS];
 	GISTENTRY	entry,
@@ -439,98 +501,121 @@ gistchoose(Relation r, Page p, IndexTuple it,	/* it has compressed entry */
 		bool		zero_penalty;
 		int			j;
 
-		zero_penalty = true;
-
-		/* Loop over index attributes. */
-		for (j = 0; j < r->rd_att->natts; j++)
+		if (GistTupleIsSkip(itup))
 		{
 			Datum		datum;
 			float		usize;
 			bool		IsNull;
 
-			/* Compute penalty for this column. */
-			datum = index_getattr(itup, j + 1, giststate->tupdesc, &IsNull);
-			gistdentryinit(giststate, j, &entry, datum, r, p, i,
-						   false, IsNull);
-			usize = gistpenalty(giststate, j, &entry, IsNull,
-								&identry[j], isnull[j]);
-			if (usize > 0)
-				zero_penalty = false;
+			prevskip = i;
 
-			if (best_penalty[j] < 0 || usize < best_penalty[j])
-			{
-				/*
-				 * New best penalty for column.  Tentatively select this tuple
-				 * as the target, and record the best penalty.  Then reset the
-				 * next column's penalty to "unknown" (and indirectly, the
-				 * same for all the ones to its right).  This will force us to
-				 * adopt this tuple's penalty values as the best for all the
-				 * remaining columns during subsequent loop iterations.
-				 */
-				result = i;
-				best_penalty[j] = usize;
-
-				if (j < r->rd_att->natts - 1)
-					best_penalty[j + 1] = -1;
-
-				/* we have new best, so reset keep-it decision */
-				keep_current_best = -1;
-			}
-			else if (best_penalty[j] == usize)
-			{
-				/*
-				 * The current tuple is exactly as good for this column as the
-				 * best tuple seen so far.  The next iteration of this loop
-				 * will compare the next column.
-				 */
-			}
-			else
+			datum = index_getattr(itup, 1, giststate->tupdesc, &IsNull);
+			gistdentryinit(giststate, 0, &entry, datum, r, p, i,
+							false, IsNull);
+			usize = gistpenalty(giststate, 0, &entry, IsNull,
+									&identry[0], isnull[0]);
+			if (usize > best_penalty[0] && best_penalty[0] != -1)
 			{
-				/*
-				 * The current tuple is worse for this column than the best
-				 * tuple seen so far.  Skip the remaining columns and move on
-				 * to the next tuple, if any.
-				 */
-				zero_penalty = false;	/* so outer loop won't exit */
-				break;
+				i += GistTupleGetSkipCount(itup);
 			}
 		}
-
-		/*
-		 * If we looped past the last column, and did not update "result",
-		 * then this tuple is exactly as good as the prior best tuple.
-		 */
-		if (j == r->rd_att->natts && result != i)
+		else
 		{
-			if (keep_current_best == -1)
+			zero_penalty = true;
+
+			/* Loop over index attributes. */
+			for (j = 0; j < r->rd_att->natts; j++)
 			{
-				/* we didn't make the random choice yet for this old best */
-				keep_current_best = (random() <= (MAX_RANDOM_VALUE / 2)) ? 1 : 0;
+				Datum		datum;
+				float		usize;
+				bool		IsNull;
+
+				/* Compute penalty for this column. */
+				datum = index_getattr(itup, j + 1, giststate->tupdesc, &IsNull);
+				gistdentryinit(giststate, j, &entry, datum, r, p, i,
+							false, IsNull);
+				usize = gistpenalty(giststate, j, &entry, IsNull,
+									&identry[j], isnull[j]);
+				if (usize > 0)
+					zero_penalty = false;
+
+				if (best_penalty[j] < 0 || usize < best_penalty[j])
+				{
+					/*
+					* New best penalty for column.  Tentatively select this tuple
+					* as the target, and record the best penalty.  Then reset the
+					* next column's penalty to "unknown" (and indirectly, the
+					* same for all the ones to its right).  This will force us to
+					* adopt this tuple's penalty values as the best for all the
+					* remaining columns during subsequent loop iterations.
+					*/
+					result = i;
+					*skipoffnum = prevskip;
+					best_penalty[j] = usize;
+
+					if (j < r->rd_att->natts - 1)
+						best_penalty[j + 1] = -1;
+
+					/* we have new best, so reset keep-it decision */
+					keep_current_best = -1;
+				}
+				else if (best_penalty[j] == usize)
+				{
+					/*
+					* The current tuple is exactly as good for this column as the
+					* best tuple seen so far.  The next iteration of this loop
+					* will compare the next column.
+					*/
+				}
+				else
+				{
+					/*
+					* The current tuple is worse for this column than the best
+					* tuple seen so far.  Skip the remaining columns and move on
+					* to the next tuple, if any.
+					*/
+					zero_penalty = false;	/* so outer loop won't exit */
+					break;
+				}
 			}
-			if (keep_current_best == 0)
+
+			/*
+			* If we looped past the last column, and did not update "result",
+			* then this tuple is exactly as good as the prior best tuple.
+			*/
+			if (j == r->rd_att->natts && result != i)
 			{
-				/* we choose to use the new tuple */
-				result = i;
-				/* choose again if there are even more exactly-as-good ones */
-				keep_current_best = -1;
+				if (keep_current_best == -1)
+				{
+					/* we didn't make the random choice yet for this old best */
+					keep_current_best = (random() <= (MAX_RANDOM_VALUE / 2)) ? 1 : 0;
+				}
+				if (keep_current_best == 0)
+				{
+					/* we choose to use the new tuple */
+					result = i;
+					*skipoffnum = prevskip;
+					/* choose again if there are even more exactly-as-good ones */
+					keep_current_best = -1;
+				}
 			}
-		}
 
-		/*
-		 * If we find a tuple with zero penalty for all columns, and we've
-		 * decided we don't want to search for another tuple with equal
-		 * penalty, there's no need to examine remaining tuples; just break
-		 * out of the loop and return it.
-		 */
-		if (zero_penalty)
-		{
-			if (keep_current_best == -1)
+			/*
+			* If we find a tuple with zero penalty for all columns, and we've
+			* decided we don't want to search for another tuple with equal
+			* penalty, there's no need to examine remaining tuples; just break
+			* out of the loop and return it.
+			*/
+			if (zero_penalty)
 			{
-				/* we didn't make the random choice yet for this old best */
-				keep_current_best = (random() <= (MAX_RANDOM_VALUE / 2)) ? 1 : 0;
+				if (keep_current_best == -1)
+				{
+					/* we didn't make the random choice yet for this old best */
+					keep_current_best = (random() <= (MAX_RANDOM_VALUE / 2)) ? 1 : 0;
+				}
+				if (keep_current_best == 1)
+					break;
 			}
-			if (keep_current_best == 1)
-				break;
 		}
 	}
 
@@ -643,6 +728,7 @@ gistFetchTuple(GISTSTATE *giststate, Relation r, IndexTuple tuple)
 	Datum		fetchatt[INDEX_MAX_KEYS];
 	bool		isnull[INDEX_MAX_KEYS];
 	int			i;
+	Assert(!GistTupleIsSkip(tuple));
 
 	for (i = 0; i < r->rd_att->natts; i++)
 	{
diff --git a/src/backend/access/gist/gistvacuum.c b/src/backend/access/gist/gistvacuum.c
index 22181c6299..9773ca9347 100644
--- a/src/backend/access/gist/gistvacuum.c
+++ b/src/backend/access/gist/gistvacuum.c
@@ -225,7 +225,7 @@ gistbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
 
 					recptr = gistXLogUpdate(buffer,
 											todelete, ntodelete,
-											NULL, 0, InvalidBuffer);
+											NULL, 0, InvalidBuffer, InvalidOffsetNumber);
 					PageSetLSN(page, recptr);
 				}
 				else
@@ -247,6 +247,9 @@ gistbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
 				iid = PageGetItemId(page, i);
 				idxtuple = (IndexTuple) PageGetItem(page, iid);
 
+				if (GistTupleIsSkip(idxtuple))
+					continue;
+
 				ptr = (GistBDItem *) palloc(sizeof(GistBDItem));
 				ptr->blkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
 				ptr->parentlsn = BufferGetLSNAtomic(buffer);
diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c
index 1e09126978..e46ab6f0da 100644
--- a/src/backend/access/gist/gistxlog.c
+++ b/src/backend/access/gist/gistxlog.c
@@ -82,6 +82,15 @@ gistRedoPageUpdateRecord(XLogReaderState *record)
 
 		page = (Page) BufferGetPage(buffer);
 
+		if (xldata->skipoffnum != InvalidOffsetNumber)
+		{
+			IndexTuple skiptuple = (IndexTuple) PageGetItem(page, PageGetItemId(page, xldata->skipoffnum));
+			int newskipgroupsize = GistTupleGetSkipCount(skiptuple) + xldata->ntoinsert - xldata->ntodelete;
+
+			Assert(GistTupleIsSkip(skiptuple));
+			GistTupleSetSkipCount(skiptuple, newskipgroupsize);
+		}
+
 		if (xldata->ntodelete == 1 && xldata->ntoinsert == 1)
 		{
 			/*
@@ -457,14 +466,17 @@ XLogRecPtr
 gistXLogUpdate(Buffer buffer,
 			   OffsetNumber *todelete, int ntodelete,
 			   IndexTuple *itup, int ituplen,
-			   Buffer leftchildbuf)
+			   Buffer leftchildbuf, OffsetNumber skipoffnum)
 {
 	gistxlogPageUpdate xlrec;
 	int			i;
 	XLogRecPtr	recptr;
 
+	char *start, *end;
+
 	xlrec.ntodelete = ntodelete;
 	xlrec.ntoinsert = ituplen;
+	xlrec.skipoffnum = skipoffnum;
 
 	XLogBeginInsert();
 	XLogRegisterData((char *) &xlrec, sizeof(gistxlogPageUpdate));
@@ -473,8 +485,26 @@ gistXLogUpdate(Buffer buffer,
 	XLogRegisterBufData(0, (char *) todelete, sizeof(OffsetNumber) * ntodelete);
 
 	/* new tuples */
-	for (i = 0; i < ituplen; i++)
-		XLogRegisterBufData(0, (char *) (itup[i]), IndexTupleSize(itup[i]));
+	if (ituplen > 0)
+	{
+		start = (char *) itup[0];
+		end = start + IndexTupleSize(itup[0]);
+		for (i = 1; i < ituplen; i++)
+		{
+			if (end == (char *) itup[i])
+			{
+				end += IndexTupleSize(itup[i]);
+				continue;
+			}
+			else
+			{
+				XLogRegisterBufData(0, start, end - start);
+				start = (char *) itup[i];
+				end = start + IndexTupleSize(itup[i]);
+			}
+		}
+		XLogRegisterBufData(0, start, end - start);
+	}
 
 	/*
 	 * Include a full page image of the child buf. (only necessary if a
diff --git a/src/include/access/gist_private.h b/src/include/access/gist_private.h
index 36ed7244ba..e01bfb1b2e 100644
--- a/src/include/access/gist_private.h
+++ b/src/include/access/gist_private.h
@@ -39,6 +39,8 @@
  */
 #define GIST_MAX_SPLIT_PAGES		75
 
+#define GIST_SKIPGROUP_THRESHOLD	16
+
 /* Buffer lock modes */
 #define GIST_SHARE	BUFFER_LOCK_SHARE
 #define GIST_EXCLUSIVE	BUFFER_LOCK_EXCLUSIVE
@@ -216,6 +218,8 @@ typedef struct GISTInsertStack
 	/* offset of the downlink in the parent page, that points to this page */
 	OffsetNumber downlinkoffnum;
 
+	OffsetNumber skipoffnum;
+
 	/* pointer to parent */
 	struct GISTInsertStack *parent;
 } GISTInsertStack;
@@ -276,6 +280,12 @@ typedef struct
 #define  GistTupleSetValid(itup)	ItemPointerSetOffsetNumber( &((itup)->t_tid), TUPLE_IS_VALID )
 
 
+#define INDEX_SKIP_MASK 0x2000
+
+#define GistTupleIsSkip(itup)			((((IndexTuple) (itup))->t_info & INDEX_SKIP_MASK))
+#define GistTupleMakeSkip(itup)		((((IndexTuple) (itup))->t_info |= INDEX_SKIP_MASK))
+#define GistTupleSetSkipCount(itup, skipTupleCnt)	((((IndexTuple) (itup))->t_tid.ip_posid = skipTupleCnt))
+#define GistTupleGetSkipCount(itup)	((((IndexTuple) (itup))->t_tid.ip_posid))
 
 
 /*
@@ -404,15 +414,20 @@ extern bool gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
 				OffsetNumber oldoffnum, BlockNumber *newblkno,
 				Buffer leftchildbuf,
 				List **splitinfo,
-				bool markleftchild);
+				bool markleftchild,
+				int ndeltup,
+				OffsetNumber skipoffnum);
 
 extern SplitedPageLayout *gistSplit(Relation r, Page page, IndexTuple *itup,
 		  int len, GISTSTATE *giststate);
 
+extern SplitedPageLayout *gistSplitBySkipgroup(Relation r, Page page, IndexTuple *itup,
+		  int len, GISTSTATE *giststate);
+
 extern XLogRecPtr gistXLogUpdate(Buffer buffer,
 			   OffsetNumber *todelete, int ntodelete,
 			   IndexTuple *itup, int ntup,
-			   Buffer leftchild);
+			   Buffer leftchild, OffsetNumber skipoffnum);
 
 extern XLogRecPtr gistXLogSplit(bool page_is_leaf,
 			  SplitedPageLayout *dist,
@@ -440,15 +455,18 @@ extern bool gistproperty(Oid index_oid, int attno,
 			 IndexAMProperty prop, const char *propname,
 			 bool *res, bool *isnull);
 extern bool gistfitpage(IndexTuple *itvec, int len);
-extern bool gistnospace(Page page, IndexTuple *itvec, int len, OffsetNumber todelete, Size freespace);
+extern bool gistnospace(Page page, IndexTuple *itvec, int len, OffsetNumber todelete, Size freespace, int ndeltup);
 extern void gistcheckpage(Relation rel, Buffer buf);
 extern Buffer gistNewBuffer(Relation r);
 extern void gistfillbuffer(Page page, IndexTuple *itup, int len,
 			   OffsetNumber off);
-extern IndexTuple *gistextractpage(Page page, int *len /* out */ );
+extern IndexTuple *gistextractpage(Page page, int *len /* out */);
+extern IndexTuple *gistextractrange(Page page, OffsetNumber start, int len);
 extern IndexTuple *gistjoinvector(
 			   IndexTuple *itvec, int *len,
-			   IndexTuple *additvec, int addlen);
+			   IndexTuple *additvec, int addlen, OffsetNumber oldoffnum);
+extern void gistfiltervector(IndexTuple *itvec, int *len);
+extern inline void gistcheckskippage(Page page);
 extern IndexTupleData *gistfillitupvec(IndexTuple *vec, int veclen, int *memlen);
 
 extern IndexTuple gistunion(Relation r, IndexTuple *itvec,
@@ -462,7 +480,8 @@ extern IndexTuple gistFormTuple(GISTSTATE *giststate,
 
 extern OffsetNumber gistchoose(Relation r, Page p,
 		   IndexTuple it,
-		   GISTSTATE *giststate);
+		   GISTSTATE *giststate,
+		   OffsetNumber *skipoffnum);
 
 extern void GISTInitBuffer(Buffer b, uint32 f);
 extern void gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e,
diff --git a/src/include/access/gistxlog.h b/src/include/access/gistxlog.h
index 1a2b9496d0..625a12ccd1 100644
--- a/src/include/access/gistxlog.h
+++ b/src/include/access/gistxlog.h
@@ -32,8 +32,9 @@
 typedef struct gistxlogPageUpdate
 {
 	/* number of deleted offsets */
-	uint16		ntodelete;
-	uint16		ntoinsert;
+	uint16	 	 ntodelete;
+	uint16		 ntoinsert;
+	OffsetNumber skipoffnum;
 
 	/*
 	 * In payload of blk 0 : 1. todelete OffsetNumbers 2. tuples to insert
-- 
2.14.3 (Apple Git-98)

