From f7d2a623b2c36ef69eb3c5b74fe2619e5f054e49 Mon Sep 17 00:00:00 2001
From: Pavel Borisov <pashkin.elfe@gmail.com>
Date: Tue, 11 Aug 2020 12:05:51 +0400
Subject: [PATCH v4] Covering SP-GiST index - support for INCLUDE columns

Adding INCLUDE colums for SPGiST index is intended to increase the speed of queries by making scan index only likewise
in btree and GiST index. These included values are added only to leaf tuples and they are not used in index tree search
but they can be fetched during index scan.

The other point of included columns is to overcome SP-GiST limitation of being single-column in principle. I.e. in
certain cases a single covering SP-GiST index can replace several separate ones with less disk space and shared buffers
consumption, faster update etc. Also there can be included any data types without SP-GiST supported opclasses.

Discussion: https://www.postgresql.org/message-id/flat/CALT9ZEFi-vMp4faht9f9Junb1nO3NOSjhpxTmbm1UGLMsLqiEQ@mail.gmail.com
---
 doc/src/sgml/indices.sgml                     |   4 +-
 doc/src/sgml/ref/create_index.sgml            |   4 +-
 doc/src/sgml/spgist.sgml                      |   8 +
 src/backend/access/spgist/README              |   2 +-
 src/backend/access/spgist/spgdoinsert.c       | 172 +++++---
 src/backend/access/spgist/spginsert.c         |   5 +-
 src/backend/access/spgist/spgscan.c           |  87 +++-
 src/backend/access/spgist/spgutils.c          | 382 ++++++++++++++++--
 src/backend/access/spgist/spgvacuum.c         |  25 +-
 src/backend/access/spgist/spgxlog.c           |   6 +-
 src/include/access/spgist_private.h           | 160 +++++---
 src/test/regress/expected/amutils.out         |   4 +-
 src/test/regress/expected/index_including.out |   1 -
 .../expected/index_including_spgist.out       | 139 +++++++
 src/test/regress/parallel_schedule            |   2 +-
 src/test/regress/serial_schedule              |   1 +
 .../regress/sql/index_including_spgist.sql    |  81 ++++
 17 files changed, 898 insertions(+), 185 deletions(-)
 create mode 100644 src/test/regress/expected/index_including_spgist.out
 create mode 100644 src/test/regress/sql/index_including_spgist.sql

diff --git a/doc/src/sgml/indices.sgml b/doc/src/sgml/indices.sgml
index 28adaba72d..c89cc6cb08 100644
--- a/doc/src/sgml/indices.sgml
+++ b/doc/src/sgml/indices.sgml
@@ -1194,8 +1194,8 @@ CREATE UNIQUE INDEX tab_x_y ON tab(x) INCLUDE (y);
    likely to not need to access the heap.  If the heap tuple must be visited
    anyway, it costs nothing more to get the column's value from there.
    Other restrictions are that expressions are not currently supported as
-   included columns, and that only B-tree and GiST indexes currently support
-   included columns.
+   included columns, and that only B-tree, GiST and SP-GiST indexes currently
+   support included columns.
   </para>
 
   <para>
diff --git a/doc/src/sgml/ref/create_index.sgml b/doc/src/sgml/ref/create_index.sgml
index ff87b2d28f..3d360bcf47 100644
--- a/doc/src/sgml/ref/create_index.sgml
+++ b/doc/src/sgml/ref/create_index.sgml
@@ -187,8 +187,8 @@ CREATE [ UNIQUE ] INDEX [ CONCURRENTLY ] [ [ IF NOT EXISTS ] <replaceable class=
        </para>
 
        <para>
-        Currently, the B-tree and the GiST index access methods support this
-        feature.  In B-tree and the GiST indexes, the values of columns listed
+        Currently, the B-tree, GiST and SP-GiST index access methods support
+        this feature.  In these indexes, the values of columns listed
         in the <literal>INCLUDE</literal> clause are included in leaf tuples
         which correspond to heap tuples, but are not included in upper-level
         index entries used for tree navigation.
diff --git a/doc/src/sgml/spgist.sgml b/doc/src/sgml/spgist.sgml
index 0e04a08679..868a140a6a 100644
--- a/doc/src/sgml/spgist.sgml
+++ b/doc/src/sgml/spgist.sgml
@@ -240,6 +240,14 @@
   inner tuples that are passed through to reach the leaf level.
  </para>
 
+ <para>
+  In case when <acronym>SP-GiST</acronym> index is created with
+  <literal>INCLUDE</literal> clause i.e. covering index, leaf tuples also
+  contain data from included columns. This data is stored uncompressed and can have
+  data types without any SP-GiST operator class.
+
+ </para>
+
  <para>
   Inner tuples are more complex, since they are branching points in the
   search tree.  Each inner tuple contains a set of one or more
diff --git a/src/backend/access/spgist/README b/src/backend/access/spgist/README
index b55b073832..87e08431fa 100644
--- a/src/backend/access/spgist/README
+++ b/src/backend/access/spgist/README
@@ -73,8 +73,8 @@ Leaf tuple consists of:
     Example:
         radix tree - the rest of string (postfix)
         quad and k-d tree - the point itself
-
   ItemPointer to the heap
+  optional included colums values
 
 
 NULLS HANDLING
diff --git a/src/backend/access/spgist/spgdoinsert.c b/src/backend/access/spgist/spgdoinsert.c
index 934d65b89f..4c133b7106 100644
--- a/src/backend/access/spgist/spgdoinsert.c
+++ b/src/backend/access/spgist/spgdoinsert.c
@@ -22,7 +22,7 @@
 #include "miscadmin.h"
 #include "storage/bufmgr.h"
 #include "utils/rel.h"
-
+#include "access/htup_details.h"
 
 /*
  * SPPageDesc tracks all info about a page we are inserting into.  In some
@@ -220,7 +220,7 @@ addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
 		SpGistBlockIsRoot(current->blkno))
 	{
 		/* Tuple is not part of a chain */
-		leafTuple->nextOffset = InvalidOffsetNumber;
+		SGLT_SET_OFFSET(leafTuple->nextOffset, InvalidOffsetNumber);
 		current->offnum = SpGistPageAddNewItem(state, current->page,
 											   (Item) leafTuple, leafTuple->size,
 											   NULL, false);
@@ -253,7 +253,7 @@ addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
 											 PageGetItemId(current->page, current->offnum));
 		if (head->tupstate == SPGIST_LIVE)
 		{
-			leafTuple->nextOffset = head->nextOffset;
+			SGLT_SET_OFFSET(leafTuple->nextOffset, SGLT_GET_OFFSET(head->nextOffset));
 			offnum = SpGistPageAddNewItem(state, current->page,
 										  (Item) leafTuple, leafTuple->size,
 										  NULL, false);
@@ -264,14 +264,14 @@ addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
 			 */
 			head = (SpGistLeafTuple) PageGetItem(current->page,
 												 PageGetItemId(current->page, current->offnum));
-			head->nextOffset = offnum;
+			SGLT_SET_OFFSET(head->nextOffset, offnum);
 
 			xlrec.offnumLeaf = offnum;
 			xlrec.offnumHeadLeaf = current->offnum;
 		}
 		else if (head->tupstate == SPGIST_DEAD)
 		{
-			leafTuple->nextOffset = InvalidOffsetNumber;
+			SGLT_SET_OFFSET(leafTuple->nextOffset, InvalidOffsetNumber);
 			PageIndexTupleDelete(current->page, current->offnum);
 			if (PageAddItem(current->page,
 							(Item) leafTuple, leafTuple->size,
@@ -362,13 +362,13 @@ checkSplitConditions(Relation index, SpGistState *state,
 		{
 			/* We could see a DEAD tuple as first/only chain item */
 			Assert(i == current->offnum);
-			Assert(it->nextOffset == InvalidOffsetNumber);
+			Assert(SGLT_GET_OFFSET(it->nextOffset) == InvalidOffsetNumber);
 			/* Don't count it in result, because it won't go to other page */
 		}
 		else
 			elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
 
-		i = it->nextOffset;
+		i = SGLT_GET_OFFSET(it->nextOffset);
 	}
 
 	*nToSplit = n;
@@ -437,7 +437,7 @@ moveLeafs(Relation index, SpGistState *state,
 		{
 			/* We could see a DEAD tuple as first/only chain item */
 			Assert(i == current->offnum);
-			Assert(it->nextOffset == InvalidOffsetNumber);
+			Assert(SGLT_GET_OFFSET(it->nextOffset) == InvalidOffsetNumber);
 			/* We don't want to move it, so don't count it in size */
 			toDelete[nDelete] = i;
 			nDelete++;
@@ -446,7 +446,7 @@ moveLeafs(Relation index, SpGistState *state,
 		else
 			elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
 
-		i = it->nextOffset;
+		i = SGLT_GET_OFFSET(it->nextOffset);
 	}
 
 	/* Find a leaf page that will hold them */
@@ -475,7 +475,7 @@ moveLeafs(Relation index, SpGistState *state,
 			 * don't care).  We're modifying the tuple on the source page
 			 * here, but it's okay since we're about to delete it.
 			 */
-			it->nextOffset = r;
+			SGLT_SET_OFFSET(it->nextOffset, r);
 
 			r = SpGistPageAddNewItem(state, npage, (Item) it, it->size,
 									 &startOffset, false);
@@ -490,7 +490,7 @@ moveLeafs(Relation index, SpGistState *state,
 	}
 
 	/* add the new tuple as well */
-	newLeafTuple->nextOffset = r;
+	SGLT_SET_OFFSET(newLeafTuple->nextOffset, r);
 	r = SpGistPageAddNewItem(state, npage,
 							 (Item) newLeafTuple, newLeafTuple->size,
 							 &startOffset, false);
@@ -709,6 +709,9 @@ doPickSplit(Relation index, SpGistState *state,
 	int			nToDelete,
 				nToInsert,
 				maxToInclude;
+	Datum	   *leafChainDatums;
+	bool	   *leafChainIsnulls;
+	const int	natts = IndexRelationGetNumberOfAttributes(index);
 
 	in.level = level;
 
@@ -723,14 +726,16 @@ doPickSplit(Relation index, SpGistState *state,
 	toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
 	newLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n);
 	leafPageSelect = (uint8 *) palloc(sizeof(uint8) * n);
-
 	STORE_STATE(state, xlrec.stateSrc);
 
+	leafChainDatums = (Datum *) palloc(n * natts * sizeof(Datum));
+	leafChainIsnulls = (bool *) palloc(n * natts * sizeof(bool));
+
 	/*
-	 * Form list of leaf tuples which will be distributed as split result;
-	 * also, count up the amount of space that will be freed from current.
-	 * (Note that in the non-root case, we won't actually delete the old
-	 * tuples, only replace them with redirects or placeholders.)
+	 * Collect leaf tuples which will be distributed as split result; also,
+	 * count up the amount of space that will be freed from current. (Note
+	 * that in the non-root case, we won't actually delete the old tuples,
+	 * only replace them with redirects or placeholders.)
 	 *
 	 * Note: the SGLTDATUM calls here are safe even when dealing with a nulls
 	 * page.  For a pass-by-value data type we will fetch a word that must
@@ -738,7 +743,15 @@ doPickSplit(Relation index, SpGistState *state,
 	 * tuples must have size at least SGDTSIZE).  For a pass-by-reference type
 	 * we are just computing a pointer that isn't going to get dereferenced.
 	 * So it's not worth guarding the calls with isNulls checks.
+	 *
+	 * Datums and isnulls of all leaf tuple attributes in a chain are
+	 * collected into 2-d arrays: (number of tuples in chain) x (number of
+	 * attributes) First attribute is key, the other - included attributes (if
+	 * any). After picksplit we need to form new leaf tuples as key attribute
+	 * length can change which can affect alignment of every include
+	 * attribute.
 	 */
+
 	nToInsert = 0;
 	nToDelete = 0;
 	spaceToDelete = 0;
@@ -759,6 +772,8 @@ doPickSplit(Relation index, SpGistState *state,
 			{
 				in.datums[nToInsert] = SGLTDATUM(it, state);
 				heapPtrs[nToInsert] = it->heapPtr;
+				SpGistDeformLeafTuple(it, state, leafChainDatums + nToInsert * natts,
+									  leafChainIsnulls + nToInsert * natts, isNulls);
 				nToInsert++;
 				toDelete[nToDelete] = i;
 				nToDelete++;
@@ -784,6 +799,9 @@ doPickSplit(Relation index, SpGistState *state,
 			{
 				in.datums[nToInsert] = SGLTDATUM(it, state);
 				heapPtrs[nToInsert] = it->heapPtr;
+
+				SpGistDeformLeafTuple(it, state, leafChainDatums + nToInsert * natts,
+									  leafChainIsnulls + nToInsert * natts, isNulls);
 				nToInsert++;
 				toDelete[nToDelete] = i;
 				nToDelete++;
@@ -795,7 +813,7 @@ doPickSplit(Relation index, SpGistState *state,
 			{
 				/* We could see a DEAD tuple as first/only chain item */
 				Assert(i == current->offnum);
-				Assert(it->nextOffset == InvalidOffsetNumber);
+				Assert(SGLT_GET_OFFSET(it->nextOffset) == InvalidOffsetNumber);
 				toDelete[nToDelete] = i;
 				nToDelete++;
 				/* replacing it with redirect will save no space */
@@ -803,7 +821,7 @@ doPickSplit(Relation index, SpGistState *state,
 			else
 				elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
 
-			i = it->nextOffset;
+			i = SGLT_GET_OFFSET(it->nextOffset);
 		}
 	}
 	in.nTuples = nToInsert;
@@ -816,10 +834,17 @@ doPickSplit(Relation index, SpGistState *state,
 	 */
 	in.datums[in.nTuples] = SGLTDATUM(newLeafTuple, state);
 	heapPtrs[in.nTuples] = newLeafTuple->heapPtr;
+
+	SpGistDeformLeafTuple(newLeafTuple, state, leafChainDatums + (in.nTuples) * natts,
+						  leafChainIsnulls + (in.nTuples) * natts, isNulls);
 	in.nTuples++;
 
 	memset(&out, 0, sizeof(out));
 
+	/*
+	 * Process collected key values of tuples from the chain. Included values
+	 * are used to build fresh leaf tuples unchanged.
+	 */
 	if (!isNulls)
 	{
 		/*
@@ -837,9 +862,11 @@ doPickSplit(Relation index, SpGistState *state,
 		totalLeafSizes = 0;
 		for (i = 0; i < in.nTuples; i++)
 		{
-			newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
-										   out.leafTupleDatums[i],
-										   false);
+			*(leafChainDatums + i * natts) = (Datum) out.leafTupleDatums[i];
+			*(leafChainIsnulls + i * natts) = false;
+
+			newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i, leafChainDatums + i * natts,
+										   leafChainIsnulls + i * natts);
 			totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
 		}
 	}
@@ -860,9 +887,14 @@ doPickSplit(Relation index, SpGistState *state,
 		totalLeafSizes = 0;
 		for (i = 0; i < in.nTuples; i++)
 		{
-			newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
-										   (Datum) 0,
-										   true);
+			/*
+			 * Nulls tree can contain only null key values.
+			 */
+			*(leafChainDatums + i * natts) = (Datum) 0;
+			*(leafChainIsnulls + i * natts) = true;
+
+			newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i, leafChainDatums + i * natts,
+										   leafChainIsnulls + i * natts);
 			totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
 		}
 	}
@@ -1196,10 +1228,10 @@ doPickSplit(Relation index, SpGistState *state,
 		if (ItemPointerIsValid(&nodes[n]->t_tid))
 		{
 			Assert(ItemPointerGetBlockNumber(&nodes[n]->t_tid) == leafBlock);
-			it->nextOffset = ItemPointerGetOffsetNumber(&nodes[n]->t_tid);
+			SGLT_SET_OFFSET(it->nextOffset, ItemPointerGetOffsetNumber(&nodes[n]->t_tid));
 		}
 		else
-			it->nextOffset = InvalidOffsetNumber;
+			SGLT_SET_OFFSET(it->nextOffset, InvalidOffsetNumber);
 
 		/* Insert it on page */
 		newoffset = SpGistPageAddNewItem(state, BufferGetPage(leafBuffer),
@@ -1889,67 +1921,83 @@ spgSplitNodeAction(Relation index, SpGistState *state,
  */
 bool
 spgdoinsert(Relation index, SpGistState *state,
-			ItemPointer heapPtr, Datum datum, bool isnull)
+			ItemPointer heapPtr, Datum *datum, bool *isnull)
 {
 	int			level = 0;
-	Datum		leafDatum;
+	Datum	   *leafDatum;
 	int			leafSize;
 	SPPageDesc	current,
 				parent;
 	FmgrInfo   *procinfo = NULL;
+	int			i;
 
 	/*
 	 * Look up FmgrInfo of the user-defined choose function once, to save
 	 * cycles in the loop below.
 	 */
-	if (!isnull)
+	if (!isnull[0])
 		procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC);
 
 	/*
 	 * Prepare the leaf datum to insert.
-	 *
+	 */
+
+	leafDatum = (Datum *) palloc0(sizeof(Datum) * (IndexRelationGetNumberOfAttributes(index)));
+
+	/*
 	 * If an optional "compress" method is provided, then call it to form the
-	 * leaf datum from the input datum.  Otherwise store the input datum as
-	 * is.  Since we don't use index_form_tuple in this AM, we have to make
-	 * sure value to be inserted is not toasted; FormIndexDatum doesn't
-	 * guarantee that.  But we assume the "compress" method to return an
-	 * untoasted value.
+	 * key datum from the input datum.  Otherwise store the input datum as is.
+	 * Since we don't use index_form_tuple in this AM, we have to make sure
+	 * value to be inserted is not toasted; FormIndexDatum doesn't guarantee
+	 * that.  But we assume the "compress" method to return an untoasted
+	 * value.
 	 */
-	if (!isnull)
+	if (!isnull[0])
 	{
 		if (OidIsValid(index_getprocid(index, 1, SPGIST_COMPRESS_PROC)))
 		{
 			FmgrInfo   *compressProcinfo = NULL;
 
 			compressProcinfo = index_getprocinfo(index, 1, SPGIST_COMPRESS_PROC);
-			leafDatum = FunctionCall1Coll(compressProcinfo,
-										  index->rd_indcollation[0],
-										  datum);
+			leafDatum[0] = FunctionCall1Coll(compressProcinfo,
+											 index->rd_indcollation[0],
+											 datum[0]);
 		}
 		else
 		{
 			Assert(state->attLeafType.type == state->attType.type);
 
 			if (state->attType.attlen == -1)
-				leafDatum = PointerGetDatum(PG_DETOAST_DATUM(datum));
+				leafDatum[0] = PointerGetDatum(PG_DETOAST_DATUM(datum[0]));
 			else
-				leafDatum = datum;
+				leafDatum[0] = datum[0];
 		}
 	}
 	else
-		leafDatum = (Datum) 0;
+		leafDatum[0] = (Datum) 0;
+
+	for (i = 1; i < IndexRelationGetNumberOfAttributes(index); i++)
+	{
+		if (!isnull[i])
+		{
+			if (TupleDescAttr(state->includeTupdesc, i - 1)->attlen == -1)
+				leafDatum[i] = PointerGetDatum(PG_DETOAST_DATUM(datum[i]));
+			else
+				leafDatum[i] = datum[i];
+		}
+		else
+			leafDatum[i] = (Datum) 0;
+	}
+
 
 	/*
-	 * Compute space needed for a leaf tuple containing the given datum.
+	 * Compute space needed on a page for a leaf tuple containing the given
+	 * datum.
 	 *
 	 * If it isn't gonna fit, and the opclass can't reduce the datum size by
 	 * suffixing, bail out now rather than getting into an endless loop.
 	 */
-	if (!isnull)
-		leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
-			SpGistGetTypeSize(&state->attLeafType, leafDatum);
-	else
-		leafSize = SGDTSIZE + sizeof(ItemIdData);
+	leafSize = SpgLeafSize(state, leafDatum, isnull) + sizeof(ItemIdData);
 
 	if (leafSize > SPGIST_PAGE_CAPACITY && !state->config.longValuesOK)
 		ereport(ERROR,
@@ -1961,7 +2009,7 @@ spgdoinsert(Relation index, SpGistState *state,
 				 errhint("Values larger than a buffer page cannot be indexed.")));
 
 	/* Initialize "current" to the appropriate root page */
-	current.blkno = isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO;
+	current.blkno = isnull[0] ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO;
 	current.buffer = InvalidBuffer;
 	current.page = NULL;
 	current.offnum = FirstOffsetNumber;
@@ -1995,7 +2043,7 @@ spgdoinsert(Relation index, SpGistState *state,
 			 */
 			current.buffer =
 				SpGistGetBuffer(index,
-								GBUF_LEAF | (isnull ? GBUF_NULLS : 0),
+								GBUF_LEAF | (isnull[0] ? GBUF_NULLS : 0),
 								Min(leafSize, SPGIST_PAGE_CAPACITY),
 								&isNew);
 			current.blkno = BufferGetBlockNumber(current.buffer);
@@ -2037,7 +2085,7 @@ spgdoinsert(Relation index, SpGistState *state,
 		current.page = BufferGetPage(current.buffer);
 
 		/* should not arrive at a page of the wrong type */
-		if (isnull ? !SpGistPageStoresNulls(current.page) :
+		if (isnull[0] ? !SpGistPageStoresNulls(current.page) :
 			SpGistPageStoresNulls(current.page))
 			elog(ERROR, "SPGiST index page %u has wrong nulls flag",
 				 current.blkno);
@@ -2054,7 +2102,7 @@ spgdoinsert(Relation index, SpGistState *state,
 			{
 				/* it fits on page, so insert it and we're done */
 				addLeafTuple(index, state, leafTuple,
-							 &current, &parent, isnull, isNew);
+							 &current, &parent, isnull[0], isNew);
 				break;
 			}
 			else if ((sizeToSplit =
@@ -2068,14 +2116,14 @@ spgdoinsert(Relation index, SpGistState *state,
 				 * chain to another leaf page rather than splitting it.
 				 */
 				Assert(!isNew);
-				moveLeafs(index, state, &current, &parent, leafTuple, isnull);
+				moveLeafs(index, state, &current, &parent, leafTuple, isnull[0]);
 				break;			/* we're done */
 			}
 			else
 			{
 				/* picksplit */
 				if (doPickSplit(index, state, &current, &parent,
-								leafTuple, level, isnull, isNew))
+								leafTuple, level, isnull[0], isNew))
 					break;		/* doPickSplit installed new tuples */
 
 				/* leaf tuple will not be inserted yet */
@@ -2110,8 +2158,8 @@ spgdoinsert(Relation index, SpGistState *state,
 			innerTuple = (SpGistInnerTuple) PageGetItem(current.page,
 														PageGetItemId(current.page, current.offnum));
 
-			in.datum = datum;
-			in.leafDatum = leafDatum;
+			in.datum = datum[0];
+			in.leafDatum = leafDatum[0];
 			in.level = level;
 			in.allTheSame = innerTuple->allTheSame;
 			in.hasPrefix = (innerTuple->prefixSize > 0);
@@ -2121,7 +2169,7 @@ spgdoinsert(Relation index, SpGistState *state,
 
 			memset(&out, 0, sizeof(out));
 
-			if (!isnull)
+			if (!isnull[0])
 			{
 				/* use user-defined choose method */
 				FunctionCall2Coll(procinfo,
@@ -2158,11 +2206,11 @@ spgdoinsert(Relation index, SpGistState *state,
 					/* Adjust level as per opclass request */
 					level += out.result.matchNode.levelAdd;
 					/* Replace leafDatum and recompute leafSize */
-					if (!isnull)
+					if (!isnull[0])
 					{
-						leafDatum = out.result.matchNode.restDatum;
-						leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
-							SpGistGetTypeSize(&state->attLeafType, leafDatum);
+						leafDatum[0] = out.result.matchNode.restDatum;
+						leafSize = SpgLeafSize(state, leafDatum, isnull) +
+							sizeof(ItemIdData);
 					}
 
 					/*
@@ -2227,6 +2275,6 @@ spgdoinsert(Relation index, SpGistState *state,
 		SpGistSetLastUsedPage(index, parent.buffer);
 		UnlockReleaseBuffer(parent.buffer);
 	}
-
+	pfree(leafDatum);
 	return true;
 }
diff --git a/src/backend/access/spgist/spginsert.c b/src/backend/access/spgist/spginsert.c
index e4508a2b92..b54ae85f6e 100644
--- a/src/backend/access/spgist/spginsert.c
+++ b/src/backend/access/spgist/spginsert.c
@@ -55,8 +55,7 @@ spgistBuildCallback(Relation index, ItemPointer tid, Datum *values,
 	 * lock on some buffer.  So we need to be willing to retry.  We can flush
 	 * any temp data when retrying.
 	 */
-	while (!spgdoinsert(index, &buildstate->spgstate, tid,
-						*values, *isnull))
+	while (!spgdoinsert(index, &buildstate->spgstate, tid, values, isnull))
 	{
 		MemoryContextReset(buildstate->tmpCtx);
 	}
@@ -226,7 +225,7 @@ spginsert(Relation index, Datum *values, bool *isnull,
 	 * to avoid cumulative memory consumption.  That means we also have to
 	 * redo initSpGistState(), but it's cheap enough not to matter.
 	 */
-	while (!spgdoinsert(index, &spgstate, ht_ctid, *values, *isnull))
+	while (!spgdoinsert(index, &spgstate, ht_ctid, values, isnull))
 	{
 		MemoryContextReset(insertCtx);
 		initSpGistState(&spgstate, index);
diff --git a/src/backend/access/spgist/spgscan.c b/src/backend/access/spgist/spgscan.c
index 4d506bfb9a..5a3c7c50cf 100644
--- a/src/backend/access/spgist/spgscan.c
+++ b/src/backend/access/spgist/spgscan.c
@@ -28,7 +28,8 @@
 
 typedef void (*storeRes_func) (SpGistScanOpaque so, ItemPointer heapPtr,
 							   Datum leafValue, bool isNull, bool recheck,
-							   bool recheckDistances, double *distances);
+							   bool recheckDistances, double *distances,
+							   SpGistLeafTuple leafTuple);
 
 /*
  * Pairing heap comparison function for the SpGistSearchItem queue.
@@ -88,6 +89,9 @@ spgFreeSearchItem(SpGistScanOpaque so, SpGistSearchItem *item)
 	if (item->traversalValue)
 		pfree(item->traversalValue);
 
+	if (item->isLeaf && item->leafTuple)
+		pfree(item->leafTuple);
+
 	pfree(item);
 }
 
@@ -134,6 +138,8 @@ spgAddStartItem(SpGistScanOpaque so, bool isnull)
 	startEntry->recheck = false;
 	startEntry->recheckDistances = false;
 
+	startEntry->leafTuple = NULL;
+
 	spgAddSearchItemToQueue(so, startEntry);
 }
 
@@ -438,14 +444,30 @@ spgendscan(IndexScanDesc scan)
  * Leaf SpGistSearchItem constructor, called in queue context
  */
 static SpGistSearchItem *
-spgNewHeapItem(SpGistScanOpaque so, int level, ItemPointer heapPtr,
+spgNewHeapItem(SpGistScanOpaque so, int level, SpGistLeafTuple leafTuple,
 			   Datum leafValue, bool recheck, bool recheckDistances,
 			   bool isnull, double *distances)
 {
 	SpGistSearchItem *item = spgAllocSearchItem(so, isnull, distances);
 
+	/*
+	 * If there are include attributes search item in the queue should contain
+	 * them.
+	 */
+	if (so->state.includeTupdesc)
+	{
+		Assert(so->state.includeTupdesc->natts);
+
+		item->leafTuple = palloc(leafTuple->size);
+		memcpy(item->leafTuple, leafTuple, leafTuple->size);
+	}
+	else
+	{
+		item->leafTuple = NULL;
+	}
+
 	item->level = level;
-	item->heapPtr = *heapPtr;
+	item->heapPtr = leafTuple->heapPtr;
 	/* copy value to queue cxt out of tmp cxt */
 	item->value = isnull ? (Datum) 0 :
 		datumCopy(leafValue, so->state.attLeafType.attbyval,
@@ -503,6 +525,8 @@ spgLeafTest(SpGistScanOpaque so, SpGistSearchItem *item,
 		in.returnData = so->want_itup;
 		in.leafDatum = SGLTDATUM(leafTuple, &so->state);
 
+
+
 		out.leafValue = (Datum) 0;
 		out.recheck = false;
 		out.distances = NULL;
@@ -528,7 +552,7 @@ spgLeafTest(SpGistScanOpaque so, SpGistSearchItem *item,
 			/* the scan is ordered -> add the item to the queue */
 			MemoryContext oldCxt = MemoryContextSwitchTo(so->traversalCxt);
 			SpGistSearchItem *heapItem = spgNewHeapItem(so, item->level,
-														&leafTuple->heapPtr,
+														leafTuple,
 														leafValue,
 														recheck,
 														recheckDistances,
@@ -543,8 +567,10 @@ spgLeafTest(SpGistScanOpaque so, SpGistSearchItem *item,
 		{
 			/* non-ordered scan, so report the item right away */
 			Assert(!recheckDistances);
+
 			storeRes(so, &leafTuple->heapPtr, leafValue, isnull,
-					 recheck, false, NULL);
+					 recheck, false, NULL, leafTuple);
+
 			*reportedSome = true;
 		}
 	}
@@ -736,7 +762,7 @@ spgTestLeafTuple(SpGistScanOpaque so,
 				/* dead tuple should be first in chain */
 				Assert(offset == ItemPointerGetOffsetNumber(&item->heapPtr));
 				/* No live entries on this page */
-				Assert(leafTuple->nextOffset == InvalidOffsetNumber);
+				Assert(SGLT_GET_OFFSET(leafTuple->nextOffset) == InvalidOffsetNumber);
 				return SpGistBreakOffsetNumber;
 			}
 		}
@@ -750,7 +776,7 @@ spgTestLeafTuple(SpGistScanOpaque so,
 
 	spgLeafTest(so, item, leafTuple, isnull, reportedSome, storeRes);
 
-	return leafTuple->nextOffset;
+	return SGLT_GET_OFFSET(leafTuple->nextOffset);
 }
 
 /*
@@ -782,8 +808,8 @@ redirect:
 		{
 			/* We store heap items in the queue only in case of ordered search */
 			Assert(so->numberOfNonNullOrderBys > 0);
-			storeRes(so, &item->heapPtr, item->value, item->isNull,
-					 item->recheck, item->recheckDistances, item->distances);
+			storeRes(so, &item->heapPtr, item->value, item->isNull, item->recheck,
+					 item->recheckDistances, item->distances, item->leafTuple);
 			reportedSome = true;
 		}
 		else
@@ -877,7 +903,7 @@ redirect:
 static void
 storeBitmap(SpGistScanOpaque so, ItemPointer heapPtr,
 			Datum leafValue, bool isnull, bool recheck, bool recheckDistances,
-			double *distances)
+			double *distances, SpGistLeafTuple leafTuple)
 {
 	Assert(!recheckDistances && !distances);
 	tbm_add_tuples(so->tbm, heapPtr, 1, recheck);
@@ -904,7 +930,7 @@ spggetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
 static void
 storeGettuple(SpGistScanOpaque so, ItemPointer heapPtr,
 			  Datum leafValue, bool isnull, bool recheck, bool recheckDistances,
-			  double *nonNullDistances)
+			  double *nonNullDistances, SpGistLeafTuple leafTuple)
 {
 	Assert(so->nPtrs < MaxIndexTuplesPerPage);
 	so->heapPtrs[so->nPtrs] = *heapPtr;
@@ -949,9 +975,38 @@ storeGettuple(SpGistScanOpaque so, ItemPointer heapPtr,
 		 * Reconstruct index data.  We have to copy the datum out of the temp
 		 * context anyway, so we may as well create the tuple here.
 		 */
-		so->reconTups[so->nPtrs] = heap_form_tuple(so->indexTupDesc,
-												   &leafValue,
-												   &isnull);
+		if (so->state.includeTupdesc)
+		{
+			/* Add included attributes */
+			Datum	   *leafDatums;
+			bool	   *leafIsnulls;
+
+			Assert(so->state.includeTupdesc->natts);
+
+			leafDatums = (Datum *) palloc(sizeof(Datum) * (so->state.includeTupdesc->natts + 1));
+			leafIsnulls = (bool *) palloc(sizeof(bool) * (so->state.includeTupdesc->natts + 1));
+
+			SpGistDeformLeafTuple(leafTuple, &so->state, leafDatums, leafIsnulls, isnull);
+
+			/*
+			 * override key value extracted from LeafTuple in case we've
+			 * reconstructed it already
+			 */
+			leafDatums[0] = leafValue;
+			leafIsnulls[0] = isnull;
+
+			so->reconTups[so->nPtrs] = heap_form_tuple(so->indexTupDesc,
+													   leafDatums,
+													   leafIsnulls);
+			pfree(leafDatums);
+			pfree(leafIsnulls);
+		}
+		else
+		{
+			so->reconTups[so->nPtrs] = heap_form_tuple(so->indexTupDesc,
+													   &leafValue,
+													   &isnull);
+		}
 	}
 	so->nPtrs++;
 }
@@ -1019,6 +1074,10 @@ spgcanreturn(Relation index, int attno)
 {
 	SpGistCache *cache;
 
+	/* Included attributes always can be fetched for index-only scans */
+	if (attno > 1)
+		return true;
+
 	/* We can do it if the opclass config function says so */
 	cache = spgGetCache(index);
 
diff --git a/src/backend/access/spgist/spgutils.c b/src/backend/access/spgist/spgutils.c
index 0efe05e552..3ca47ff53d 100644
--- a/src/backend/access/spgist/spgutils.c
+++ b/src/backend/access/spgist/spgutils.c
@@ -31,7 +31,18 @@
 #include "utils/index_selfuncs.h"
 #include "utils/lsyscache.h"
 #include "utils/syscache.h"
+#include "access/itup.h"
+#include "access/detoast.h"
+#include "access/toast_internals.h"
+#include "access/heaptoast.h"
+#include "utils/expandeddatum.h"
 
+/* Does att's datatype allow packing into the 1-byte-header varlena format? */
+#define ATT_IS_PACKABLE(att) \
+		((att)->attlen == -1 && (att)->attstorage != TYPSTORAGE_PLAIN)
+
+Size		spgIncludedDataSize(TupleDesc tupleDesc, Datum *values,
+								bool *isnull, Size start);
 
 /*
  * SP-GiST handler function: return IndexAmRoutine with access method parameters
@@ -49,7 +60,7 @@ spghandler(PG_FUNCTION_ARGS)
 	amroutine->amcanorderbyop = true;
 	amroutine->amcanbackward = false;
 	amroutine->amcanunique = false;
-	amroutine->amcanmulticol = false;
+	amroutine->amcanmulticol = true;
 	amroutine->amoptionalkey = true;
 	amroutine->amsearcharray = false;
 	amroutine->amsearchnulls = true;
@@ -57,7 +68,7 @@ spghandler(PG_FUNCTION_ARGS)
 	amroutine->amclusterable = false;
 	amroutine->ampredlocks = false;
 	amroutine->amcanparallel = false;
-	amroutine->amcaninclude = false;
+	amroutine->amcaninclude = true;
 	amroutine->amusemaintenanceworkmem = false;
 	amroutine->amparallelvacuumoptions =
 		VACUUM_OPTION_PARALLEL_BULKDEL | VACUUM_OPTION_PARALLEL_COND_CLEANUP;
@@ -116,14 +127,21 @@ spgGetCache(Relation index)
 		cache = MemoryContextAllocZero(index->rd_indexcxt,
 									   sizeof(SpGistCache));
 
-		/* SPGiST doesn't support multi-column indexes */
-		Assert(index->rd_att->natts == 1);
+		/*
+		 * SPGiST should have one key column and can also have included
+		 * columns
+		 */
+		if (IndexRelationGetNumberOfKeyAttributes(index) != 1)
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("SPGiST index can have only one key column")));
 
 		/*
-		 * Get the actual data type of the indexed column from the index
-		 * tupdesc.  We pass this to the opclass config function so that
-		 * polymorphic opclasses are possible.
+		 * Get the actual data type of the key column from the index tupdesc.
+		 * We pass this to the opclass config function so that polymorphic
+		 * opclasses are possible.
 		 */
+
 		atttype = TupleDescAttr(index->rd_att, 0)->atttypid;
 
 		/* Call the config function to get config info for the opclass */
@@ -156,6 +174,7 @@ spgGetCache(Relation index)
 		fillTypeDesc(&cache->attPrefixType, cache->config.prefixType);
 		fillTypeDesc(&cache->attLabelType, cache->config.labelType);
 
+
 		/* Last, get the lastUsedPages data from the metapage */
 		metabuffer = ReadBuffer(index, SPGIST_METAPAGE_BLKNO);
 		LockBuffer(metabuffer, BUFFER_LOCK_SHARE);
@@ -177,7 +196,23 @@ spgGetCache(Relation index)
 		/* assume it's up to date */
 		cache = (SpGistCache *) index->rd_amcache;
 	}
+	/* Form descriptor for included columns if any */
+	if (IndexRelationGetNumberOfAttributes(index) > 1)
+	{
+		int			i;
+
+		cache->includeTupdesc = CreateTemplateTupleDesc(
+														IndexRelationGetNumberOfAttributes(index) - 1);
 
+		for (i = 0; i < IndexRelationGetNumberOfAttributes(index) - 1; i++)
+		{
+			TupleDescInitEntry(cache->includeTupdesc, i + 1, NULL,
+							   TupleDescAttr(index->rd_att, i + 1)->atttypid,
+							   -1, 0);
+		}
+	}
+	else
+		cache->includeTupdesc = NULL;
 	return cache;
 }
 
@@ -190,6 +225,7 @@ initSpGistState(SpGistState *state, Relation index)
 	/* Get cached static information about index */
 	cache = spgGetCache(index);
 
+	state->includeTupdesc = cache->includeTupdesc;
 	state->config = cache->config;
 	state->attType = cache->attType;
 	state->attLeafType = cache->attLeafType;
@@ -603,7 +639,7 @@ spgoptions(Datum reloptions, bool validate)
 
 /*
  * Get the space needed to store a non-null datum of the indicated type.
- * Note the result is already rounded up to a MAXALIGN boundary.
+ * Note the result is not maxaligned and this should be done by caller if needed.
  * Also, we follow the SPGiST convention that pass-by-val types are
  * just stored in their Datum representation (compare memcpyDatum).
  */
@@ -619,7 +655,7 @@ SpGistGetTypeSize(SpGistTypeDesc *att, Datum datum)
 	else
 		size = VARSIZE_ANY(datum);
 
-	return MAXALIGN(size);
+	return size;
 }
 
 /*
@@ -642,36 +678,202 @@ memcpyDatum(void *target, SpGistTypeDesc *att, Datum datum)
 }
 
 /*
- * Construct a leaf tuple containing the given heap TID and datum value
+ * Private version of heap_compute_data_size with start address not
+ * necessarily MAXALIGNed. The reason is that start address (and alignment)
+ * influence alignment of each of next values and overall size of included
+ * data area in SpGiST leaf tuple.
+ */
+Size
+spgIncludedDataSize(TupleDesc tupleDesc,
+					Datum *values,
+					bool *isnull, Size start)
+{
+	Size		data_length = 0;
+	int			i;
+	int			numberOfAttributes = tupleDesc->natts;
+
+	data_length = start;
+	for (i = 0; i < numberOfAttributes; i++)
+	{
+		Datum		val;
+		Form_pg_attribute atti;
+
+		if (isnull[i])
+			continue;
+
+		val = values[i];
+		atti = TupleDescAttr(tupleDesc, i);
+
+		if (ATT_IS_PACKABLE(atti) &&
+			VARATT_CAN_MAKE_SHORT(DatumGetPointer(val)))
+		{
+			/*
+			 * we're anticipating converting to a short varlena header, so
+			 * adjust length and don't count any alignment
+			 */
+			data_length += VARATT_CONVERTED_SHORT_SIZE(DatumGetPointer(val));
+		}
+		else if (atti->attlen == -1 &&
+				 VARATT_IS_EXTERNAL_EXPANDED(DatumGetPointer(val)))
+		{
+			/*
+			 * we want to flatten the expanded value so that the constructed
+			 * tuple doesn't depend on it
+			 */
+			data_length = att_align_nominal(data_length, atti->attalign);
+			data_length += EOH_get_flat_size(DatumGetEOHP(val));
+		}
+		else
+		{
+			data_length = att_align_datum(data_length, atti->attalign,
+										  atti->attlen, val);
+			data_length = att_addlength_datum(data_length, atti->attlen,
+											  val);
+		}
+	}
+	return data_length - start;
+}
+
+/* Calculate overall leaf tuple size. SGLTHDRSZ is MAXALIGNed only for backward
+ * compatibility and there might be gap between header and key data. After key
+ * data there are no such gaps more than is is necessary for each value
+ * alignment. Overall result is MAXALIGNed.*/
+unsigned int
+SpgLeafSize(SpGistState *state, Datum *datum, bool *isnull)
+{
+	/* compute space needed, nullmask size and offset for include attributes */
+	unsigned int size = SGLTHDRSZ;
+	unsigned int i;
+
+	if (!isnull[0])
+		/* key attribute size (not maxaligned) */
+		size += SpGistGetTypeSize(&state->attLeafType, datum[0]);
+
+	if (state->includeTupdesc)
+	{
+		Assert(state->includeTupdesc->natts);
+		if (state->includeTupdesc->natts + 1 >= INDEX_MAX_KEYS)
+			ereport(ERROR,
+					(errcode(ERRCODE_TOO_MANY_COLUMNS),
+					 errmsg("number of index columns (%d) exceeds limit (%d)",
+							state->includeTupdesc->natts, INDEX_MAX_KEYS)));
+		/* nullmask size */
+		for (i = 1; i <= state->includeTupdesc->natts; i++)
+		{
+			if (isnull[i])
+			{
+				size += (state->includeTupdesc->natts / 8) + 1;
+				break;
+			}
+		}
+		/* overall included attributes size each with added proper alignment. */
+		size += spgIncludedDataSize(state->includeTupdesc, datum + 1, isnull + 1, size);
+	}
+	return MAXALIGN(size);
+}
+
+/*
+ * Construct a leaf tuple containing the given heap TID, key data and included
+ * columns data. Key data starts from MAXALIGN boundary for backward compatibility.
+ * Nullmask apply only to included attributes and is placed just after key data if
+ * there is at least one NULL among included attributes. It doesn't need alignment.
+ * Then all included columns data follow aligned by their typealign's.
  */
 SpGistLeafTuple
 spgFormLeafTuple(SpGistState *state, ItemPointer heapPtr,
-				 Datum datum, bool isnull)
+				 Datum *datum, bool *isnull)
 {
 	SpGistLeafTuple tup;
-	unsigned int size;
+	unsigned int size = SGLTHDRSZ;
+	unsigned int include_offset = 0;
+	unsigned int nullmask_size = 0;
+	unsigned int data_offset = 0;
+	unsigned int data_size = 0;
+	uint16		tupmask = 0;
+	int			i;
 
-	/* compute space needed (note result is already maxaligned) */
-	size = SGLTHDRSZ;
-	if (!isnull)
-		size += SpGistGetTypeSize(&state->attLeafType, datum);
+	/*
+	 * Calculate space needed. If there are include attributes also calculate
+	 * sizes and offsets needed for heap_fill_tuple
+	 */
+	if (!isnull[0])
+		/* key attribute size (not maxaligned) */
+		size += SpGistGetTypeSize(&state->attLeafType, datum[0]);
+
+	if (state->includeTupdesc)
+	{
+		Assert(state->includeTupdesc->natts);
+		if (state->includeTupdesc->natts + 1 >= INDEX_MAX_KEYS)
+			ereport(ERROR,
+					(errcode(ERRCODE_TOO_MANY_COLUMNS),
+					 errmsg("number of index columns (%d) exceeds limit (%d)",
+							state->includeTupdesc->natts, INDEX_MAX_KEYS)));
+
+		include_offset = size;
+
+		for (i = 1; i <= state->includeTupdesc->natts; i++)
+		{
+			if (isnull[i])
+			{
+				nullmask_size = (state->includeTupdesc->natts / 8) + 1;
+				size += nullmask_size;
+				break;
+			}
+		}
+
+		/*
+		 * Alignment of all included attributes is counted inside data_size.
+		 * data_offset itself is not aligned.
+		 */
+		data_size = spgIncludedDataSize(state->includeTupdesc, datum + 1, isnull + 1, size);
+		data_offset = size;
+
+		size += data_size;
+	}
 
 	/*
 	 * Ensure that we can replace the tuple with a dead tuple later.  This
-	 * test is unnecessary when !isnull, but let's be safe.
+	 * test is unnecessary when !isnull[0], but let's be safe.
 	 */
 	if (size < SGDTSIZE)
 		size = SGDTSIZE;
 
 	/* OK, form the tuple */
-	tup = (SpGistLeafTuple) palloc0(size);
+	tup = (SpGistLeafTuple) palloc0(MAXALIGN(size));
 
-	tup->size = size;
-	tup->nextOffset = InvalidOffsetNumber;
+	tup->size = MAXALIGN(size);
+	SGLT_SET_OFFSET(tup->nextOffset, InvalidOffsetNumber);
 	tup->heapPtr = *heapPtr;
-	if (!isnull)
-		memcpyDatum(SGLTDATAPTR(tup), &state->attLeafType, datum);
 
+	if (!isnull[0])
+		memcpyDatum(SGLTDATAPTR(tup), &state->attLeafType, datum[0]);
+
+	/* Add included columns data to leaf tuple if any. */
+	if (state->includeTupdesc)
+	{
+		/*
+		 * The start of include attributes tuple is not aligned by default.
+		 * All values alignment should be done by heap_fill_tuple
+		 * automaticaly. If there is a nulls mask it is included just after
+		 * key attribute data and it should not be aligned.
+		 */
+		heap_fill_tuple(state->includeTupdesc, datum + 1, isnull + 1,
+						(char *) tup + data_offset,
+						data_size, &tupmask,
+						(nullmask_size ? (bits8 *) tup + include_offset : NULL));
+
+		if (nullmask_size)
+			SGLT_SET_CONTAINSNULLMASK(tup->nextOffset, 1);
+
+		/*
+		 * We do this because heap_fill_tuple wants to initialize a "tupmask"
+		 * which is used for HeapTuples, but the only relevant info is the
+		 * "has variable attributes" field. We have already set the hasnull
+		 * bit above.
+		 */
+		if (tupmask & HEAP_HASVARWIDTH)
+			SGLT_SET_CONTAINSVARATT(tup->nextOffset, 1);
+	}
 	return tup;
 }
 
@@ -688,10 +890,10 @@ spgFormNodeTuple(SpGistState *state, Datum label, bool isnull)
 	unsigned int size;
 	unsigned short infomask = 0;
 
-	/* compute space needed (note result is already maxaligned) */
+	/* compute space needed */
 	size = SGNTHDRSZ;
 	if (!isnull)
-		size += SpGistGetTypeSize(&state->attLabelType, label);
+		size += MAXALIGN(SpGistGetTypeSize(&state->attLabelType, label));
 
 	/*
 	 * Here we make sure that the size will fit in the field reserved for it
@@ -735,7 +937,7 @@ spgFormInnerTuple(SpGistState *state, bool hasPrefix, Datum prefix,
 
 	/* Compute size needed */
 	if (hasPrefix)
-		prefixSize = SpGistGetTypeSize(&state->attPrefixType, prefix);
+		prefixSize = MAXALIGN(SpGistGetTypeSize(&state->attPrefixType, prefix));
 	else
 		prefixSize = 0;
 
@@ -1046,3 +1248,133 @@ spgproperty(Oid index_oid, int attno,
 
 	return true;
 }
+
+/*
+ * Convert an SpGist tuple into palloc'd Datum/isnull arrays.
+ *
+ */
+void
+SpGistDeformLeafTuple(SpGistLeafTuple tup, SpGistState *state, Datum *datum, bool *isnull,
+					  bool key_isnull)
+{
+	unsigned int include_offset;	/* offset of include data */
+	int			off;
+	bits8	   *nullmask_ptr = NULL;	/* ptr to null bitmap in tuple */
+	char	   *tp;
+	bool		slow = false;	/* can we use/set attcacheoff? */
+	int			i;
+
+	if (key_isnull)
+	{
+		datum[0] = (Datum) 0;
+		isnull[0] = true;
+	}
+	else
+	{
+		datum[0] = SGLTDATUM(tup, state);
+		isnull[0] = false;
+	}
+
+	if (state->includeTupdesc)
+	{
+		Assert(state->includeTupdesc->natts);
+		if (state->includeTupdesc->natts + 1 >= INDEX_MAX_KEYS)
+			ereport(ERROR,
+					(errcode(ERRCODE_TOO_MANY_COLUMNS),
+					 errmsg("number of index columns (%d) exceeds limit (%d)",
+							state->includeTupdesc->natts, INDEX_MAX_KEYS)));
+
+		include_offset = key_isnull ? SGLTHDRSZ : SGLTHDRSZ + SpGistGetTypeSize(&state->attLeafType, datum[0]);
+
+		tp = (char *) tup;
+		off = include_offset;
+
+		if (SGLT_GET_CONTAINSNULLMASK(tup->nextOffset))
+		{
+			nullmask_ptr = (bits8 *) tp + include_offset;
+			off += (state->includeTupdesc->natts) / 8 + 1;
+		}
+
+		if (state->attLeafType.attlen > 0 && !SGLT_GET_CONTAINSVARATT(tup->nextOffset) &&
+			!SGLT_GET_CONTAINSNULLMASK(tup->nextOffset))
+			/* can use attcacheoff for all attributes */
+		{
+			for (i = 1; i <= state->includeTupdesc->natts; i++)
+			{
+				Form_pg_attribute thisatt = TupleDescAttr(state->includeTupdesc, i - 1);
+
+				isnull[i] = false;
+				if (thisatt->attcacheoff >= 0)
+					off = thisatt->attcacheoff;
+				else
+				{
+					off = att_align_nominal(off, thisatt->attalign);
+					thisatt->attcacheoff = off;
+				}
+				datum[i] = fetchatt(thisatt, tp + off);
+				off = att_addlength_pointer(off, thisatt->attlen, tp + off);
+			}
+		}
+		else
+
+			/*
+			 * general case: can use cache until first null or varlen
+			 * attribute
+			 */
+		{
+			if (state->attLeafType.attlen <= 0)
+				slow = true;	/* can't use attcacheoff at all */
+
+			for (i = 1; i <= state->includeTupdesc->natts; i++)
+			{
+				Form_pg_attribute thisatt = TupleDescAttr(state->includeTupdesc, i - 1);
+
+				if (SGLT_GET_CONTAINSNULLMASK(tup->nextOffset))
+				{
+					if (att_isnull(i - 1, nullmask_ptr))
+					{
+						datum[i] = (Datum) 0;
+						isnull[i] = true;
+						slow = true;	/* can't use attcacheoff anymore */
+						continue;
+					}
+				}
+
+				isnull[i] = false;
+
+				if (!slow && thisatt->attcacheoff >= 0)
+					off = thisatt->attcacheoff;
+				else if (thisatt->attlen == -1)
+				{
+					/*
+					 * We can only cache the offset for a varlena attribute if
+					 * the offset is already suitably aligned, so that there
+					 * would be no pad bytes in any case: then the offset will
+					 * be valid for either an aligned or unaligned value.
+					 */
+					if (!slow && off == att_align_nominal(off, thisatt->attalign))
+						thisatt->attcacheoff = off;
+					else
+					{
+						off = att_align_pointer(off, thisatt->attalign, -1, tp + off);
+						slow = true;
+					}
+				}
+				else
+				{
+					/* not varlena, so safe to use att_align_nominal */
+					off = att_align_nominal(off, thisatt->attalign);
+
+					if (!slow)
+						thisatt->attcacheoff = off;
+				}
+
+				datum[i] = fetchatt(thisatt, tp + off);
+				off = att_addlength_pointer(off, thisatt->attlen, tp + off);
+
+				if (thisatt->attlen <= 0)
+					slow = true;	/* can't use attcacheoff anymore */
+			}
+		}
+	}
+}
diff --git a/src/backend/access/spgist/spgvacuum.c b/src/backend/access/spgist/spgvacuum.c
index bd98707f3c..a0d76901fc 100644
--- a/src/backend/access/spgist/spgvacuum.c
+++ b/src/backend/access/spgist/spgvacuum.c
@@ -168,23 +168,28 @@ vacuumLeafPage(spgBulkDeleteState *bds, Relation index, Buffer buffer,
 			}
 
 			/* Form predecessor map, too */
-			if (lt->nextOffset != InvalidOffsetNumber)
+			if (SGLT_GET_OFFSET(lt->nextOffset) != InvalidOffsetNumber)
 			{
 				/* paranoia about corrupted chain links */
-				if (lt->nextOffset < FirstOffsetNumber ||
-					lt->nextOffset > max ||
-					predecessor[lt->nextOffset] != InvalidOffsetNumber)
+				if (SGLT_GET_OFFSET(lt->nextOffset) < FirstOffsetNumber ||
+					SGLT_GET_OFFSET(lt->nextOffset) > max ||
+					predecessor[SGLT_GET_OFFSET(lt->nextOffset)] != InvalidOffsetNumber)
 					elog(ERROR, "inconsistent tuple chain links in page %u of index \"%s\"",
 						 BufferGetBlockNumber(buffer),
 						 RelationGetRelationName(index));
-				predecessor[lt->nextOffset] = i;
+				predecessor[SGLT_GET_OFFSET(lt->nextOffset)] = i;
 			}
 		}
 		else if (lt->tupstate == SPGIST_REDIRECT)
 		{
 			SpGistDeadTuple dt = (SpGistDeadTuple) lt;
 
-			Assert(dt->nextOffset == InvalidOffsetNumber);
+			/*
+			 * Dead tuple nextOffset is allowed to have any values of two
+			 * highest bits in case it is inherited from SpGistLeafTuple where
+			 * these bits has their own meaning.
+			 */
+			Assert(SGLT_GET_OFFSET(dt->nextOffset) == InvalidOffsetNumber);
 			Assert(ItemPointerIsValid(&dt->pointer));
 
 			/*
@@ -201,7 +206,7 @@ vacuumLeafPage(spgBulkDeleteState *bds, Relation index, Buffer buffer,
 		}
 		else
 		{
-			Assert(lt->nextOffset == InvalidOffsetNumber);
+			Assert(SGLT_GET_OFFSET(lt->nextOffset) == InvalidOffsetNumber);
 		}
 	}
 
@@ -250,7 +255,7 @@ vacuumLeafPage(spgBulkDeleteState *bds, Relation index, Buffer buffer,
 		prevLive = deletable[i] ? InvalidOffsetNumber : i;
 
 		/* scan down the chain ... */
-		j = head->nextOffset;
+		j = SGLT_GET_OFFSET(head->nextOffset);
 		while (j != InvalidOffsetNumber)
 		{
 			SpGistLeafTuple lt;
@@ -301,7 +306,7 @@ vacuumLeafPage(spgBulkDeleteState *bds, Relation index, Buffer buffer,
 				interveningDeletable = false;
 			}
 
-			j = lt->nextOffset;
+			j = SGLT_GET_OFFSET(lt->nextOffset);
 		}
 
 		if (prevLive == InvalidOffsetNumber)
@@ -366,7 +371,7 @@ vacuumLeafPage(spgBulkDeleteState *bds, Relation index, Buffer buffer,
 		lt = (SpGistLeafTuple) PageGetItem(page,
 										   PageGetItemId(page, chainSrc[i]));
 		Assert(lt->tupstate == SPGIST_LIVE);
-		lt->nextOffset = chainDest[i];
+		SGLT_SET_OFFSET(lt->nextOffset, chainDest[i]);
 	}
 
 	MarkBufferDirty(buffer);
diff --git a/src/backend/access/spgist/spgxlog.c b/src/backend/access/spgist/spgxlog.c
index 7be2291d07..4022e3af07 100644
--- a/src/backend/access/spgist/spgxlog.c
+++ b/src/backend/access/spgist/spgxlog.c
@@ -122,8 +122,8 @@ spgRedoAddLeaf(XLogReaderState *record)
 
 				head = (SpGistLeafTuple) PageGetItem(page,
 													 PageGetItemId(page, xldata->offnumHeadLeaf));
-				Assert(head->nextOffset == leafTupleHdr.nextOffset);
-				head->nextOffset = xldata->offnumLeaf;
+				Assert(SGLT_GET_OFFSET(head->nextOffset) == SGLT_GET_OFFSET(leafTupleHdr.nextOffset));
+				SGLT_SET_OFFSET(head->nextOffset, xldata->offnumLeaf);
 			}
 		}
 		else
@@ -822,7 +822,7 @@ spgRedoVacuumLeaf(XLogReaderState *record)
 			lt = (SpGistLeafTuple) PageGetItem(page,
 											   PageGetItemId(page, chainSrc[i]));
 			Assert(lt->tupstate == SPGIST_LIVE);
-			lt->nextOffset = chainDest[i];
+			SGLT_SET_OFFSET(lt->nextOffset, chainDest[i]);
 		}
 
 		PageSetLSN(page, lsn);
diff --git a/src/include/access/spgist_private.h b/src/include/access/spgist_private.h
index 00b98ec6a0..8d03adb8f5 100644
--- a/src/include/access/spgist_private.h
+++ b/src/include/access/spgist_private.h
@@ -141,6 +141,7 @@ typedef struct SpGistState
 	SpGistTypeDesc attLeafType; /* type of leaf-tuple values */
 	SpGistTypeDesc attPrefixType;	/* type of inner-tuple prefix values */
 	SpGistTypeDesc attLabelType;	/* type of node label values */
+	TupleDesc	includeTupdesc; /* tuple descriptor of included columns */
 
 	char	   *deadTupleStorage;	/* workspace for spgFormDeadTuple */
 
@@ -148,6 +149,98 @@ typedef struct SpGistState
 	bool		isBuild;		/* true if doing index build */
 } SpGistState;
 
+/*
+ * SPGiST leaf tuple: carries a datum and a heap tuple TID
+ *
+ * In the simplest case, the datum is the same as the indexed value; but
+ * it could also be a suffix or some other sort of delta that permits
+ * reconstruction given knowledge of the prefix path traversed to get here.
+ *
+ * The size field is wider than could possibly be needed for an on-disk leaf
+ * tuple, but this allows us to form leaf tuples even when the datum is too
+ * wide to be stored immediately, and it costs nothing because of alignment
+ * considerations.
+ *
+ * Normally, nextOffset links to the next tuple belonging to the same parent
+ * node (which must be on the same page).  But when the root page is a leaf
+ * page, we don't chain its tuples, so nextOffset is always 0 on the root.
+ *
+ * size must be a multiple of MAXALIGN; also, it must be at least SGDTSIZE
+ * so that the tuple can be converted to REDIRECT status later.  (This
+ * restriction only adds bytes for the null-datum case, otherwise alignment
+ * restrictions force it anyway.)
+ *
+ * In a leaf tuple for a NULL indexed value, there's no useful datum value;
+ * however, the SGDTSIZE limit ensures that's there's a Datum word there
+ * anyway, so SGLTDATUM can be applied safely as long as you don't do
+ * anything with the result.
+ *
+ * Minimum space to store SpGistLeafTuple on a page is 12 bytes tuple header
+ * and 4 bytes ItemIdData so 14 lower bits of nextOffset (accessed as
+ * SGLT_GET/SET_OFFSET) is enough to store actual tuple number on a page even
+ * if page size is 64Kb. Two higher bits are to store per-tuple
+ * information is there nulls mask exist and is there any included attribute
+ * of variable length type.
+ */
+
+typedef struct SpGistLeafTupleData
+{
+	unsigned int tupstate:2,	/* LIVE/REDIRECT/DEAD/PLACEHOLDER */
+				size:30;		/* large enough for any palloc'able value */
+	OffsetNumber nextOffset;	/* higher 1 bit = 1 if included values has
+								 * nulls, 2 bit = 1 if included values contain
+								 * variable length values, lower 15 bits - is
+								 * "actual" nextOffset i.e. number of next
+								 * tuple in chain on a page, or
+								 * InvalidOffsetNumber. They SHOULD NOT be
+								 * set/read directly,
+								 * SGLT_SET_XXX/SGLT_GET_XXX macros must be
+								 * used instead. */
+	ItemPointerData heapPtr;	/* TID of represented heap tuple */
+	/* leaf datum follows */
+
+	/*
+	 * if SGLT_GET_CONTAINSNULLMASK nullmask follows. Its size (number of
+	 * included columns/8)+1
+	 */
+	/* include attributes follow if any */
+} SpGistLeafTupleData;
+
+typedef SpGistLeafTupleData *SpGistLeafTuple;
+
+#define SGLTHDRSZ			MAXALIGN(sizeof(SpGistLeafTupleData))
+#define SGLTDATAPTR(x)		(((char *) (x)) + SGLTHDRSZ)
+#define SGLTDATUM(x, s)		((s)->attLeafType.attbyval ? \
+							*(Datum *) SGLTDATAPTR(x) : \
+							PointerGetDatum(SGLTDATAPTR(x)))
+/*
+ * Accessor macros to get and set actual 14-bit offset and two bit flags from/to
+ * nextOffset value.
+ */
+#define SGLT_GET_OFFSET(x) 	( (x) & 0x3FFF )
+#define SGLT_GET_CONTAINSNULLMASK(x) ( (x) >> 15 )
+#define SGLT_GET_CONTAINSVARATT(x) ( ( (x) & 4000 ) >> 14 )
+#define SGLT_SET_OFFSET(x,o) ( (x) = ( (x) & 0xC000 ) | ( (o) & 0x3FFF) )
+#define SGLT_SET_CONTAINSNULLMASK(x,n) ( (x) = ( (n) << 15 ) | ( (x) & 0x3FFF ) )
+#define SGLT_SET_CONTAINSVARATT(x,v) ( (x) = ( (v) << 14 ) | ( (x) & 0xBFFF ) )
+
+#define SGLT_GET_INCLUDE_TUPSIZE(x) SGLT_GET_OFFSET(x)
+#define SGLT_SET_INCLUDE_TUPSIZE(x,o) SGLT_SET_OFFSET(x,o)
+
+extern char *SpGistFormIncludeTuple(TupleDesc tupleDescriptor, Datum *values,
+									bool *isnull, uint16 *tupdata);
+
+/*
+ * SPGiST dead tuple: declaration for examining non-live tuples
+ *
+ * The tupstate field of this struct must match those of regular inner and
+ * leaf tuples, and its size field must match a leaf tuple's.
+ * Also, the pointer field must be in the same place as a leaf tuple's heapPtr
+ * field, to satisfy some Asserts that we make when replacing a leaf tuple
+ * with a dead tuple.
+ * We don't use nextOffset, but it's needed to align the pointer field.
+ */
+
 typedef struct SpGistSearchItem
 {
 	pairingheap_node phNode;	/* pairing heap node */
@@ -160,14 +253,14 @@ typedef struct SpGistSearchItem
 	bool		isLeaf;			/* SearchItem is heap item */
 	bool		recheck;		/* qual recheck is needed */
 	bool		recheckDistances;	/* distance recheck is needed */
-
+	SpGistLeafTuple leafTuple;
 	/* array with numberOfOrderBys entries */
 	double		distances[FLEXIBLE_ARRAY_MEMBER];
+	/* if there are include columns SpGistLeafTupleData follow */
 } SpGistSearchItem;
 
 #define SizeOfSpGistSearchItem(n_distances) \
 	(offsetof(SpGistSearchItem, distances) + sizeof(double) * (n_distances))
-
 /*
  * Private state of an index scan
  */
@@ -241,6 +334,7 @@ typedef struct SpGistCache
 	SpGistTypeDesc attLeafType; /* type of leaf-tuple values */
 	SpGistTypeDesc attPrefixType;	/* type of inner-tuple prefix values */
 	SpGistTypeDesc attLabelType;	/* type of node label values */
+	TupleDesc	includeTupdesc;
 
 	SpGistLUPCache lastUsedPages;	/* local storage of last-used info */
 } SpGistCache;
@@ -321,60 +415,6 @@ typedef SpGistNodeTupleData *SpGistNodeTuple;
 							 *(Datum *) SGNTDATAPTR(x) : \
 							 PointerGetDatum(SGNTDATAPTR(x)))
 
-/*
- * SPGiST leaf tuple: carries a datum and a heap tuple TID
- *
- * In the simplest case, the datum is the same as the indexed value; but
- * it could also be a suffix or some other sort of delta that permits
- * reconstruction given knowledge of the prefix path traversed to get here.
- *
- * The size field is wider than could possibly be needed for an on-disk leaf
- * tuple, but this allows us to form leaf tuples even when the datum is too
- * wide to be stored immediately, and it costs nothing because of alignment
- * considerations.
- *
- * Normally, nextOffset links to the next tuple belonging to the same parent
- * node (which must be on the same page).  But when the root page is a leaf
- * page, we don't chain its tuples, so nextOffset is always 0 on the root.
- *
- * size must be a multiple of MAXALIGN; also, it must be at least SGDTSIZE
- * so that the tuple can be converted to REDIRECT status later.  (This
- * restriction only adds bytes for the null-datum case, otherwise alignment
- * restrictions force it anyway.)
- *
- * In a leaf tuple for a NULL indexed value, there's no useful datum value;
- * however, the SGDTSIZE limit ensures that's there's a Datum word there
- * anyway, so SGLTDATUM can be applied safely as long as you don't do
- * anything with the result.
- */
-typedef struct SpGistLeafTupleData
-{
-	unsigned int tupstate:2,	/* LIVE/REDIRECT/DEAD/PLACEHOLDER */
-				size:30;		/* large enough for any palloc'able value */
-	OffsetNumber nextOffset;	/* next tuple in chain, or InvalidOffsetNumber */
-	ItemPointerData heapPtr;	/* TID of represented heap tuple */
-	/* leaf datum follows */
-} SpGistLeafTupleData;
-
-typedef SpGistLeafTupleData *SpGistLeafTuple;
-
-#define SGLTHDRSZ			MAXALIGN(sizeof(SpGistLeafTupleData))
-#define SGLTDATAPTR(x)		(((char *) (x)) + SGLTHDRSZ)
-#define SGLTDATUM(x, s)		((s)->attLeafType.attbyval ? \
-							 *(Datum *) SGLTDATAPTR(x) : \
-							 PointerGetDatum(SGLTDATAPTR(x)))
-
-/*
- * SPGiST dead tuple: declaration for examining non-live tuples
- *
- * The tupstate field of this struct must match those of regular inner and
- * leaf tuples, and its size field must match a leaf tuple's.
- * Also, the pointer field must be in the same place as a leaf tuple's heapPtr
- * field, to satisfy some Asserts that we make when replacing a leaf tuple
- * with a dead tuple.
- * We don't use nextOffset, but it's needed to align the pointer field.
- * pointer and xid are only valid when tupstate = REDIRECT.
- */
 typedef struct SpGistDeadTupleData
 {
 	unsigned int tupstate:2,	/* LIVE/REDIRECT/DEAD/PLACEHOLDER */
@@ -394,7 +434,6 @@ typedef SpGistDeadTupleData *SpGistDeadTuple;
  * size plus sizeof(ItemIdData) (for the line pointer).  This works correctly
  * so long as tuple sizes are always maxaligned.
  */
-
 /* Page capacity after allowing for fixed header and special space */
 #define SPGIST_PAGE_CAPACITY  \
 	MAXALIGN_DOWN(BLCKSZ - \
@@ -456,9 +495,10 @@ extern void SpGistInitPage(Page page, uint16 f);
 extern void SpGistInitBuffer(Buffer b, uint16 f);
 extern void SpGistInitMetapage(Page page);
 extern unsigned int SpGistGetTypeSize(SpGistTypeDesc *att, Datum datum);
+extern unsigned int SpgLeafSize(SpGistState *state, Datum *datum, bool *isnull);
 extern SpGistLeafTuple spgFormLeafTuple(SpGistState *state,
 										ItemPointer heapPtr,
-										Datum datum, bool isnull);
+										Datum *datum, bool *isnull);
 extern SpGistNodeTuple spgFormNodeTuple(SpGistState *state,
 										Datum label, bool isnull);
 extern SpGistInnerTuple spgFormInnerTuple(SpGistState *state,
@@ -466,6 +506,8 @@ extern SpGistInnerTuple spgFormInnerTuple(SpGistState *state,
 										  int nNodes, SpGistNodeTuple *nodes);
 extern SpGistDeadTuple spgFormDeadTuple(SpGistState *state, int tupstate,
 										BlockNumber blkno, OffsetNumber offnum);
+extern void SpGistDeformLeafTuple(SpGistLeafTuple tup, SpGistState *state,
+								  Datum *datum, bool *isnull, bool key_value_isnull);
 extern Datum *spgExtractNodeLabels(SpGistState *state,
 								   SpGistInnerTuple innerTuple);
 extern OffsetNumber SpGistPageAddNewItem(SpGistState *state, Page page,
@@ -484,7 +526,7 @@ extern void spgPageIndexMultiDelete(SpGistState *state, Page page,
 									int firststate, int reststate,
 									BlockNumber blkno, OffsetNumber offnum);
 extern bool spgdoinsert(Relation index, SpGistState *state,
-						ItemPointer heapPtr, Datum datum, bool isnull);
+						ItemPointer heapPtr, Datum *datum, bool *isnull);
 
 /* spgproc.c */
 extern double *spg_key_orderbys_distances(Datum key, bool isLeaf,
diff --git a/src/test/regress/expected/amutils.out b/src/test/regress/expected/amutils.out
index d92a6d12c6..93e6a43b6d 100644
--- a/src/test/regress/expected/amutils.out
+++ b/src/test/regress/expected/amutils.out
@@ -169,9 +169,9 @@ select amname, prop, pg_indexam_has_property(a.oid, prop) as p
  hash   | bogus         | 
  spgist | can_order     | f
  spgist | can_unique    | f
- spgist | can_multi_col | f
+ spgist | can_multi_col | t
  spgist | can_exclude   | t
- spgist | can_include   | f
+ spgist | can_include   | t
  spgist | bogus         | 
 (36 rows)
 
diff --git a/src/test/regress/expected/index_including.out b/src/test/regress/expected/index_including.out
index 8e5d53e712..4fd2b7e878 100644
--- a/src/test/regress/expected/index_including.out
+++ b/src/test/regress/expected/index_including.out
@@ -356,7 +356,6 @@ CREATE INDEX on tbl USING brin(c1, c2) INCLUDE (c3, c4);
 ERROR:  access method "brin" does not support included columns
 CREATE INDEX on tbl USING gist(c3) INCLUDE (c1, c4);
 CREATE INDEX on tbl USING spgist(c3) INCLUDE (c4);
-ERROR:  access method "spgist" does not support included columns
 CREATE INDEX on tbl USING gin(c1, c2) INCLUDE (c3, c4);
 ERROR:  access method "gin" does not support included columns
 CREATE INDEX on tbl USING hash(c1, c2) INCLUDE (c3, c4);
diff --git a/src/test/regress/expected/index_including_spgist.out b/src/test/regress/expected/index_including_spgist.out
new file mode 100644
index 0000000000..fa64766fb7
--- /dev/null
+++ b/src/test/regress/expected/index_including_spgist.out
@@ -0,0 +1,139 @@
+/*
+ * 1.1. test CREATE INDEX with buffered build
+ */
+-- Regular index with included columns
+CREATE TABLE tbl_spgist (c1 int, c2 int, c3 int, c4 box);
+-- size is chosen to exceed page size and trigger actual truncation
+INSERT INTO tbl_spgist SELECT x, 2*x, 3*x, box(point(x,x+1),point(2*x,2*x+1)) FROM generate_series(1,8000) AS x;
+CREATE INDEX tbl_spgist_idx ON tbl_spgist using spgist (c4) INCLUDE (c1,c2,c3);
+SELECT pg_get_indexdef(i.indexrelid)
+FROM pg_index i JOIN pg_class c ON i.indexrelid = c.oid
+WHERE i.indrelid = 'tbl_spgist'::regclass ORDER BY c.relname;
+                                     pg_get_indexdef                                     
+-----------------------------------------------------------------------------------------
+ CREATE INDEX tbl_spgist_idx ON public.tbl_spgist USING spgist (c4) INCLUDE (c1, c2, c3)
+(1 row)
+
+SELECT * FROM tbl_spgist where c4 <@ box(point(1,1),point(10,10));
+ c1 | c2 | c3 |     c4      
+----+----+----+-------------
+  1 |  2 |  3 | (2,3),(1,2)
+  2 |  4 |  6 | (4,5),(2,3)
+  3 |  6 |  9 | (6,7),(3,4)
+  4 |  8 | 12 | (8,9),(4,5)
+(4 rows)
+
+SET enable_bitmapscan TO off;
+EXPLAIN  (costs off) SELECT * FROM tbl_spgist where c4 <@ box(point(1,1),point(10,10));
+                     QUERY PLAN                     
+----------------------------------------------------
+ Index Only Scan using tbl_spgist_idx on tbl_spgist
+   Index Cond: (c4 <@ '(10,10),(1,1)'::box)
+(2 rows)
+
+SET enable_bitmapscan TO default;
+DROP TABLE tbl_spgist;
+/*
+ * 1.2. test CREATE INDEX with inserts
+ */
+-- Regular index with included columns
+CREATE TABLE tbl_spgist (c1 int, c2 int, c3 int, c4 box);
+-- size is chosen to exceed page size and trigger actual truncation
+CREATE INDEX tbl_spgist_idx ON tbl_spgist using spgist (c4) INCLUDE (c1,c2,c3);
+INSERT INTO tbl_spgist SELECT x, 2*x, 3*x, box(point(x,x+1),point(2*x,2*x+1)) FROM generate_series(1,8000) AS x;
+SELECT pg_get_indexdef(i.indexrelid)
+FROM pg_index i JOIN pg_class c ON i.indexrelid = c.oid
+WHERE i.indrelid = 'tbl_spgist'::regclass ORDER BY c.relname;
+                                     pg_get_indexdef                                     
+-----------------------------------------------------------------------------------------
+ CREATE INDEX tbl_spgist_idx ON public.tbl_spgist USING spgist (c4) INCLUDE (c1, c2, c3)
+(1 row)
+
+SELECT * FROM tbl_spgist where c4 <@ box(point(1,1),point(10,10));
+ c1 | c2 | c3 |     c4      
+----+----+----+-------------
+  1 |  2 |  3 | (2,3),(1,2)
+  2 |  4 |  6 | (4,5),(2,3)
+  3 |  6 |  9 | (6,7),(3,4)
+  4 |  8 | 12 | (8,9),(4,5)
+(4 rows)
+
+SET enable_bitmapscan TO off;
+EXPLAIN  (costs off) SELECT * FROM tbl_spgist where c4 <@ box(point(1,1),point(10,10));
+                     QUERY PLAN                     
+----------------------------------------------------
+ Index Only Scan using tbl_spgist_idx on tbl_spgist
+   Index Cond: (c4 <@ '(10,10),(1,1)'::box)
+(2 rows)
+
+SET enable_bitmapscan TO default;
+DROP TABLE tbl_spgist;
+/*
+ * 2. CREATE INDEX CONCURRENTLY
+ */
+CREATE TABLE tbl_spgist (c1 int, c2 int, c3 int, c4 box);
+INSERT INTO tbl_spgist SELECT x, 2*x, 3*x, box(point(x,x+1),point(2*x,2*x+1)) FROM generate_series(1,10) AS x;
+CREATE INDEX CONCURRENTLY tbl_spgist_idx ON tbl_spgist using spgist (c4) INCLUDE (c1,c2,c3);
+SELECT indexdef FROM pg_indexes WHERE tablename = 'tbl_spgist' ORDER BY indexname;
+                                        indexdef                                         
+-----------------------------------------------------------------------------------------
+ CREATE INDEX tbl_spgist_idx ON public.tbl_spgist USING spgist (c4) INCLUDE (c1, c2, c3)
+(1 row)
+
+DROP TABLE tbl_spgist;
+/*
+ * 3. REINDEX
+ */
+CREATE TABLE tbl_spgist (c1 int, c2 int, c3 int, c4 box);
+INSERT INTO tbl_spgist SELECT x, 2*x, 3*x, box(point(x,x+1),point(2*x,2*x+1)) FROM generate_series(1,10) AS x;
+CREATE INDEX tbl_spgist_idx ON tbl_spgist using spgist (c4) INCLUDE (c1,c3);
+SELECT indexdef FROM pg_indexes WHERE tablename = 'tbl_spgist' ORDER BY indexname;
+                                      indexdef                                       
+-------------------------------------------------------------------------------------
+ CREATE INDEX tbl_spgist_idx ON public.tbl_spgist USING spgist (c4) INCLUDE (c1, c3)
+(1 row)
+
+REINDEX INDEX tbl_spgist_idx;
+SELECT indexdef FROM pg_indexes WHERE tablename = 'tbl_spgist' ORDER BY indexname;
+                                      indexdef                                       
+-------------------------------------------------------------------------------------
+ CREATE INDEX tbl_spgist_idx ON public.tbl_spgist USING spgist (c4) INCLUDE (c1, c3)
+(1 row)
+
+ALTER TABLE tbl_spgist DROP COLUMN c1;
+SELECT indexdef FROM pg_indexes WHERE tablename = 'tbl_spgist' ORDER BY indexname;
+ indexdef 
+----------
+(0 rows)
+
+DROP TABLE tbl_spgist;
+/*
+ * 4. Update, delete values in indexed table.
+ */
+CREATE TABLE tbl_spgist (c1 int, c2 int, c3 int, c4 box);
+INSERT INTO tbl_spgist SELECT x, 2*x, 3*x, box(point(x,x+1),point(2*x,2*x+1)) FROM generate_series(1,10) AS x;
+CREATE INDEX tbl_spgist_idx ON tbl_spgist using spgist (c4) INCLUDE (c1,c3);
+UPDATE tbl_spgist SET c1 = 100 WHERE c1 = 2;
+UPDATE tbl_spgist SET c1 = 1 WHERE c1 = 3;
+DELETE FROM tbl_spgist WHERE c1 = 5 OR c3 = 12;
+DROP TABLE tbl_spgist;
+/*
+ * 5. Alter column type.
+ */
+CREATE TABLE tbl_spgist (c1 int, c2 int, c3 int, c4 box);
+INSERT INTO tbl_spgist SELECT x, 2*x, 3*x, box(point(x,x+1),point(2*x,2*x+1)) FROM generate_series(1,10) AS x;
+CREATE INDEX tbl_spgist_idx ON tbl_spgist using spgist (c4) INCLUDE (c1,c3);
+ALTER TABLE tbl_spgist ALTER c1 TYPE bigint;
+ALTER TABLE tbl_spgist ALTER c3 TYPE bigint;
+\d tbl_spgist
+             Table "public.tbl_spgist"
+ Column |  Type   | Collation | Nullable | Default 
+--------+---------+-----------+----------+---------
+ c1     | bigint  |           |          | 
+ c2     | integer |           |          | 
+ c3     | bigint  |           |          | 
+ c4     | box     |           |          | 
+Indexes:
+    "tbl_spgist_idx" spgist (c4) INCLUDE (c1, c3)
+
+DROP TABLE tbl_spgist;
diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule
index 026ea880cd..985458a1a8 100644
--- a/src/test/regress/parallel_schedule
+++ b/src/test/regress/parallel_schedule
@@ -50,7 +50,7 @@ test: copy copyselect copydml insert insert_conflict
 # ----------
 test: create_misc create_operator create_procedure
 # These depend on create_misc and create_operator
-test: create_index create_index_spgist create_view index_including index_including_gist
+test: create_index create_index_spgist create_view index_including index_including_gist index_including_spgist
 
 # ----------
 # Another group of parallel tests
diff --git a/src/test/regress/serial_schedule b/src/test/regress/serial_schedule
index 979d926119..f3df961535 100644
--- a/src/test/regress/serial_schedule
+++ b/src/test/regress/serial_schedule
@@ -68,6 +68,7 @@ test: create_index_spgist
 test: create_view
 test: index_including
 test: index_including_gist
+test: index_including_spgist
 test: create_aggregate
 test: create_function_3
 test: create_cast
diff --git a/src/test/regress/sql/index_including_spgist.sql b/src/test/regress/sql/index_including_spgist.sql
new file mode 100644
index 0000000000..a59e73aa22
--- /dev/null
+++ b/src/test/regress/sql/index_including_spgist.sql
@@ -0,0 +1,81 @@
+/*
+ * 1.1. test CREATE INDEX with buffered build
+ */
+
+-- Regular index with included columns
+CREATE TABLE tbl_spgist (c1 int, c2 int, c3 int, c4 box);
+-- size is chosen to exceed page size and trigger actual truncation
+INSERT INTO tbl_spgist SELECT x, 2*x, 3*x, box(point(x,x+1),point(2*x,2*x+1)) FROM generate_series(1,8000) AS x;
+CREATE INDEX tbl_spgist_idx ON tbl_spgist using spgist (c4) INCLUDE (c1,c2,c3);
+SELECT pg_get_indexdef(i.indexrelid)
+FROM pg_index i JOIN pg_class c ON i.indexrelid = c.oid
+WHERE i.indrelid = 'tbl_spgist'::regclass ORDER BY c.relname;
+SELECT * FROM tbl_spgist where c4 <@ box(point(1,1),point(10,10));
+SET enable_bitmapscan TO off;
+EXPLAIN  (costs off) SELECT * FROM tbl_spgist where c4 <@ box(point(1,1),point(10,10));
+SET enable_bitmapscan TO default;
+DROP TABLE tbl_spgist;
+
+/*
+ * 1.2. test CREATE INDEX with inserts
+ */
+
+-- Regular index with included columns
+CREATE TABLE tbl_spgist (c1 int, c2 int, c3 int, c4 box);
+-- size is chosen to exceed page size and trigger actual truncation
+CREATE INDEX tbl_spgist_idx ON tbl_spgist using spgist (c4) INCLUDE (c1,c2,c3);
+INSERT INTO tbl_spgist SELECT x, 2*x, 3*x, box(point(x,x+1),point(2*x,2*x+1)) FROM generate_series(1,8000) AS x;
+SELECT pg_get_indexdef(i.indexrelid)
+FROM pg_index i JOIN pg_class c ON i.indexrelid = c.oid
+WHERE i.indrelid = 'tbl_spgist'::regclass ORDER BY c.relname;
+SELECT * FROM tbl_spgist where c4 <@ box(point(1,1),point(10,10));
+SET enable_bitmapscan TO off;
+EXPLAIN  (costs off) SELECT * FROM tbl_spgist where c4 <@ box(point(1,1),point(10,10));
+SET enable_bitmapscan TO default;
+DROP TABLE tbl_spgist;
+
+/*
+ * 2. CREATE INDEX CONCURRENTLY
+ */
+CREATE TABLE tbl_spgist (c1 int, c2 int, c3 int, c4 box);
+INSERT INTO tbl_spgist SELECT x, 2*x, 3*x, box(point(x,x+1),point(2*x,2*x+1)) FROM generate_series(1,10) AS x;
+CREATE INDEX CONCURRENTLY tbl_spgist_idx ON tbl_spgist using spgist (c4) INCLUDE (c1,c2,c3);
+SELECT indexdef FROM pg_indexes WHERE tablename = 'tbl_spgist' ORDER BY indexname;
+DROP TABLE tbl_spgist;
+
+
+/*
+ * 3. REINDEX
+ */
+CREATE TABLE tbl_spgist (c1 int, c2 int, c3 int, c4 box);
+INSERT INTO tbl_spgist SELECT x, 2*x, 3*x, box(point(x,x+1),point(2*x,2*x+1)) FROM generate_series(1,10) AS x;
+CREATE INDEX tbl_spgist_idx ON tbl_spgist using spgist (c4) INCLUDE (c1,c3);
+SELECT indexdef FROM pg_indexes WHERE tablename = 'tbl_spgist' ORDER BY indexname;
+REINDEX INDEX tbl_spgist_idx;
+SELECT indexdef FROM pg_indexes WHERE tablename = 'tbl_spgist' ORDER BY indexname;
+ALTER TABLE tbl_spgist DROP COLUMN c1;
+SELECT indexdef FROM pg_indexes WHERE tablename = 'tbl_spgist' ORDER BY indexname;
+DROP TABLE tbl_spgist;
+
+/*
+ * 4. Update, delete values in indexed table.
+ */
+CREATE TABLE tbl_spgist (c1 int, c2 int, c3 int, c4 box);
+INSERT INTO tbl_spgist SELECT x, 2*x, 3*x, box(point(x,x+1),point(2*x,2*x+1)) FROM generate_series(1,10) AS x;
+CREATE INDEX tbl_spgist_idx ON tbl_spgist using spgist (c4) INCLUDE (c1,c3);
+UPDATE tbl_spgist SET c1 = 100 WHERE c1 = 2;
+UPDATE tbl_spgist SET c1 = 1 WHERE c1 = 3;
+DELETE FROM tbl_spgist WHERE c1 = 5 OR c3 = 12;
+DROP TABLE tbl_spgist;
+
+/*
+ * 5. Alter column type.
+ */
+CREATE TABLE tbl_spgist (c1 int, c2 int, c3 int, c4 box);
+INSERT INTO tbl_spgist SELECT x, 2*x, 3*x, box(point(x,x+1),point(2*x,2*x+1)) FROM generate_series(1,10) AS x;
+CREATE INDEX tbl_spgist_idx ON tbl_spgist using spgist (c4) INCLUDE (c1,c3);
+ALTER TABLE tbl_spgist ALTER c1 TYPE bigint;
+ALTER TABLE tbl_spgist ALTER c3 TYPE bigint;
+\d tbl_spgist
+DROP TABLE tbl_spgist;
+
-- 
2.28.0

