From 9a6a6ca4f1877f0a112c63b6e47074fa15dd1d4b Mon Sep 17 00:00:00 2001
From: Matthias van de Meent <boekewurm+postgres@gmail.com>
Date: Fri, 25 Apr 2025 17:29:42 +0200
Subject: [PATCH v01 3/4] SP-GIST: Fix visibility issues in IOS

Previously, SP-GIST IOS could buffer tuples from pages while VACUUM came
along and cleaned up an ALL_DEAD tuple, marking the tuple's page
ALL_VISIBLE again and making IOS mistakenly believe the tuple is indeed
visible.

With this commit, buffer pins now conflict with SP-GIST vacuum, and we now
do visibility checks on heap items returned from index pages while we
hold the pin, so that the IOS infrastructure knows to recheck the heap
page even if that heap page has become ALL_VISIBLE after we released the
index page.

Idea from Heikki Linnakangas

Backpatch: 17-
---
 src/include/access/spgist_private.h   |  5 ++
 src/backend/access/spgist/spgscan.c   | 72 +++++++++++++++++++++------
 src/backend/access/spgist/spgvacuum.c |  2 +-
 3 files changed, 64 insertions(+), 15 deletions(-)

diff --git a/src/include/access/spgist_private.h b/src/include/access/spgist_private.h
index e7cbe10a89b..dd1dce660e8 100644
--- a/src/include/access/spgist_private.h
+++ b/src/include/access/spgist_private.h
@@ -21,6 +21,7 @@
 #include "storage/buf.h"
 #include "utils/geo_decls.h"
 #include "utils/relcache.h"
+#include "tableam.h"
 
 
 typedef struct SpGistOptions
@@ -175,6 +176,7 @@ typedef struct SpGistSearchItem
 	bool		isLeaf;			/* SearchItem is heap item */
 	bool		recheck;		/* qual recheck is needed */
 	bool		recheckDistances;	/* distance recheck is needed */
+	uint8		visrecheck;		/* IOS: TMVC_Result of contained heap tuple */
 
 	/* array with numberOfOrderBys entries */
 	double		distances[FLEXIBLE_ARRAY_MEMBER];
@@ -223,6 +225,7 @@ typedef struct SpGistScanOpaqueData
 
 	/* These fields are only used in amgettuple scans: */
 	bool		want_itup;		/* are we reconstructing tuples? */
+	Buffer		vmbuf;			/* IOS: used for table_index_vischeck_tuples */
 	TupleDesc	reconTupDesc;	/* if so, descriptor for reconstructed tuples */
 	int			nPtrs;			/* number of TIDs found on current page */
 	int			iPtr;			/* index for scanning through same */
@@ -231,6 +234,8 @@ typedef struct SpGistScanOpaqueData
 	bool		recheckDistances[MaxIndexTuplesPerPage];	/* distance recheck
 															 * flags */
 	HeapTuple	reconTups[MaxIndexTuplesPerPage];	/* reconstructed tuples */
+	/* support for IOS */
+	uint8	   *visrecheck;		/* IOS vis check results, counted by nPtrs */
 
 	/* distances (for recheck) */
 	IndexOrderByDistance *distances[MaxIndexTuplesPerPage];
diff --git a/src/backend/access/spgist/spgscan.c b/src/backend/access/spgist/spgscan.c
index 03293a7816e..f3e5d9bf868 100644
--- a/src/backend/access/spgist/spgscan.c
+++ b/src/backend/access/spgist/spgscan.c
@@ -30,7 +30,8 @@
 typedef void (*storeRes_func) (SpGistScanOpaque so, ItemPointer heapPtr,
 							   Datum leafValue, bool isNull,
 							   SpGistLeafTuple leafTuple, bool recheck,
-							   bool recheckDistances, double *distances);
+							   bool recheckDistances, double *distances,
+							   TMVC_Result vischeck);
 
 /*
  * Pairing heap comparison function for the SpGistSearchItem queue.
@@ -142,6 +143,7 @@ spgAddStartItem(SpGistScanOpaque so, bool isnull)
 	startEntry->traversalValue = NULL;
 	startEntry->recheck = false;
 	startEntry->recheckDistances = false;
+	startEntry->visrecheck = TMVC_Unchecked;
 
 	spgAddSearchItemToQueue(so, startEntry);
 }
@@ -189,6 +191,11 @@ resetSpGistScanOpaque(SpGistScanOpaque so)
 
 		for (i = 0; i < so->nPtrs; i++)
 			pfree(so->reconTups[i]);
+
+		if (BufferIsValid(so->vmbuf))
+			ReleaseBuffer(so->vmbuf);
+
+		so->vmbuf = InvalidBuffer;
 	}
 	so->iPtr = so->nPtrs = 0;
 }
@@ -387,6 +394,13 @@ spgrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
 		memmove(scan->keyData, scankey,
 				scan->numberOfKeys * sizeof(ScanKeyData));
 
+	/* prepare index-only scan requirements */
+	if (scan->xs_want_itup)
+	{
+		if (so->visrecheck == NULL)
+			so->visrecheck = palloc(MaxIndexTuplesPerPage);
+	}
+
 	/* initialize order-by data if needed */
 	if (orderbys && scan->numberOfOrderBys > 0)
 	{
@@ -453,6 +467,9 @@ spgendscan(IndexScanDesc scan)
 		pfree(scan->xs_orderbynulls);
 	}
 
+	if (BufferIsValid(so->vmbuf))
+		ReleaseBuffer(so->vmbuf);
+
 	pfree(so);
 }
 
@@ -502,6 +519,7 @@ spgNewHeapItem(SpGistScanOpaque so, int level, SpGistLeafTuple leafTuple,
 	item->isLeaf = true;
 	item->recheck = recheck;
 	item->recheckDistances = recheckDistances;
+	item->visrecheck = TMVC_Unchecked;
 
 	return item;
 }
@@ -515,7 +533,8 @@ spgNewHeapItem(SpGistScanOpaque so, int level, SpGistLeafTuple leafTuple,
 static bool
 spgLeafTest(SpGistScanOpaque so, SpGistSearchItem *item,
 			SpGistLeafTuple leafTuple, bool isnull,
-			bool *reportedSome, storeRes_func storeRes)
+			bool *reportedSome, storeRes_func storeRes,
+			Relation tableRel)
 {
 	Datum		leafValue;
 	double	   *distances;
@@ -571,6 +590,15 @@ spgLeafTest(SpGistScanOpaque so, SpGistSearchItem *item,
 
 	if (result)
 	{
+		TMVC_Result vischeck = TMVC_Unchecked;
+
+		if (so->want_itup)
+		{
+			Assert(PointerIsValid(so->items));
+			vischeck = table_index_vischeck_tuple(tableRel, &so->vmbuf,
+												  &item->heapPtr);
+		}
+
 		/* item passes the scankeys */
 		if (so->numberOfNonNullOrderBys > 0)
 		{
@@ -583,6 +611,7 @@ spgLeafTest(SpGistScanOpaque so, SpGistSearchItem *item,
 														recheckDistances,
 														isnull,
 														distances);
+			heapItem->visrecheck = vischeck;
 
 			spgAddSearchItemToQueue(so, heapItem);
 
@@ -593,7 +622,7 @@ spgLeafTest(SpGistScanOpaque so, SpGistSearchItem *item,
 			/* non-ordered scan, so report the item right away */
 			Assert(!recheckDistances);
 			storeRes(so, &leafTuple->heapPtr, leafValue, isnull,
-					 leafTuple, recheck, false, NULL);
+					 leafTuple, recheck, false, NULL, vischeck);
 			*reportedSome = true;
 		}
 	}
@@ -765,7 +794,8 @@ spgTestLeafTuple(SpGistScanOpaque so,
 				 Page page, OffsetNumber offset,
 				 bool isnull, bool isroot,
 				 bool *reportedSome,
-				 storeRes_func storeRes)
+				 storeRes_func storeRes,
+				 Relation tablerel)
 {
 	SpGistLeafTuple leafTuple = (SpGistLeafTuple)
 		PageGetItem(page, PageGetItemId(page, offset));
@@ -801,7 +831,8 @@ spgTestLeafTuple(SpGistScanOpaque so,
 
 	Assert(ItemPointerIsValid(&leafTuple->heapPtr));
 
-	spgLeafTest(so, item, leafTuple, isnull, reportedSome, storeRes);
+	spgLeafTest(so, item, leafTuple, isnull, reportedSome, storeRes,
+				tablerel);
 
 	return SGLT_GET_NEXTOFFSET(leafTuple);
 }
@@ -814,8 +845,8 @@ spgTestLeafTuple(SpGistScanOpaque so,
  * next page boundary once we have reported at least one tuple.
  */
 static void
-spgWalk(Relation index, SpGistScanOpaque so, bool scanWholeIndex,
-		storeRes_func storeRes)
+spgWalk(Relation index, Relation table, SpGistScanOpaque so,
+		bool scanWholeIndex, storeRes_func storeRes)
 {
 	Buffer		buffer = InvalidBuffer;
 	bool		reportedSome = false;
@@ -837,7 +868,8 @@ redirect:
 			Assert(so->numberOfNonNullOrderBys > 0);
 			storeRes(so, &item->heapPtr, item->value, item->isNull,
 					 item->leafTuple, item->recheck,
-					 item->recheckDistances, item->distances);
+					 item->recheckDistances, item->distances,
+					 item->visrecheck);
 			reportedSome = true;
 		}
 		else
@@ -876,7 +908,8 @@ redirect:
 					for (offset = FirstOffsetNumber; offset <= max; offset++)
 						(void) spgTestLeafTuple(so, item, page, offset,
 												isnull, true,
-												&reportedSome, storeRes);
+												&reportedSome, storeRes,
+												table);
 				}
 				else
 				{
@@ -886,7 +919,8 @@ redirect:
 						Assert(offset >= FirstOffsetNumber && offset <= max);
 						offset = spgTestLeafTuple(so, item, page, offset,
 												  isnull, false,
-												  &reportedSome, storeRes);
+												  &reportedSome, storeRes,
+												  table);
 						if (offset == SpGistRedirectOffsetNumber)
 							goto redirect;
 					}
@@ -931,7 +965,8 @@ static void
 storeBitmap(SpGistScanOpaque so, ItemPointer heapPtr,
 			Datum leafValue, bool isnull,
 			SpGistLeafTuple leafTuple, bool recheck,
-			bool recheckDistances, double *distances)
+			bool recheckDistances, double *distances,
+			TMVC_Result vischeck)
 {
 	Assert(!recheckDistances && !distances);
 	tbm_add_tuples(so->tbm, heapPtr, 1, recheck);
@@ -949,7 +984,7 @@ spggetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
 	so->tbm = tbm;
 	so->ntids = 0;
 
-	spgWalk(scan->indexRelation, so, true, storeBitmap);
+	spgWalk(scan->indexRelation, scan->heapRelation, so, true, storeBitmap);
 
 	return so->ntids;
 }
@@ -959,7 +994,8 @@ static void
 storeGettuple(SpGistScanOpaque so, ItemPointer heapPtr,
 			  Datum leafValue, bool isnull,
 			  SpGistLeafTuple leafTuple, bool recheck,
-			  bool recheckDistances, double *nonNullDistances)
+			  bool recheckDistances, double *nonNullDistances,
+			  TMVC_Result vischeck)
 {
 	Assert(so->nPtrs < MaxIndexTuplesPerPage);
 	so->heapPtrs[so->nPtrs] = *heapPtr;
@@ -1018,6 +1054,8 @@ storeGettuple(SpGistScanOpaque so, ItemPointer heapPtr,
 		so->reconTups[so->nPtrs] = heap_form_tuple(so->reconTupDesc,
 												   leafDatums,
 												   leafIsnulls);
+
+		so->visrecheck[so->nPtrs] = vischeck;
 	}
 	so->nPtrs++;
 }
@@ -1042,6 +1080,11 @@ spggettuple(IndexScanDesc scan, ScanDirection dir)
 			scan->xs_recheck = so->recheck[so->iPtr];
 			scan->xs_hitup = so->reconTups[so->iPtr];
 
+			if (so->want_itup)
+				scan->xs_visrecheck = so->visrecheck[so->iPtr];
+
+			Assert(!scan->xs_want_itup || scan->xs_visrecheck != TMVC_Unchecked);
+
 			if (so->numberOfOrderBys > 0)
 				index_store_float8_orderby_distances(scan, so->orderByTypes,
 													 so->distances[so->iPtr],
@@ -1070,7 +1113,8 @@ spggettuple(IndexScanDesc scan, ScanDirection dir)
 		}
 		so->iPtr = so->nPtrs = 0;
 
-		spgWalk(scan->indexRelation, so, false, storeGettuple);
+		spgWalk(scan->indexRelation, scan->heapRelation, so, false,
+				storeGettuple);
 
 		if (so->nPtrs == 0)
 			break;				/* must have completed scan */
diff --git a/src/backend/access/spgist/spgvacuum.c b/src/backend/access/spgist/spgvacuum.c
index 0da069fd4d7..d0680a5073e 100644
--- a/src/backend/access/spgist/spgvacuum.c
+++ b/src/backend/access/spgist/spgvacuum.c
@@ -629,7 +629,7 @@ spgvacuumpage(spgBulkDeleteState *bds, BlockNumber blkno)
 
 	buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno,
 								RBM_NORMAL, bds->info->strategy);
-	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+	LockBufferForCleanup(buffer);
 	page = (Page) BufferGetPage(buffer);
 
 	if (PageIsNew(page))
-- 
2.45.2

