From 71ce0f09c601f6dfacf0fb807f240e5c7e6374ab Mon Sep 17 00:00:00 2001
From: Andrey <amborodin@acm.org>
Date: Thu, 15 Aug 2019 16:43:16 +0300
Subject: [PATCH] In amcheck nbtree do rightlink verification with lock
 coupling

When doing nbtree verification we ignore rightlink-leftling invariant violations
unless ShareLock is taken. To ensure detection of corruption, we couple page
locks and recheck links consistency again.
---
 contrib/amcheck/verify_nbtree.c | 41 +++++++++++++++++++++++++++++++++
 1 file changed, 41 insertions(+)

diff --git a/contrib/amcheck/verify_nbtree.c b/contrib/amcheck/verify_nbtree.c
index ef7e411cdb..c6467530ed 100644
--- a/contrib/amcheck/verify_nbtree.c
+++ b/contrib/amcheck/verify_nbtree.c
@@ -122,6 +122,7 @@ static void bt_index_check_internal(Oid indrelid, bool parentcheck,
 static inline bool btree_index_checkable(Relation rel);
 static void bt_check_every_level(Relation rel, Relation heaprel,
 					 bool readonly, bool heapallindexed);
+static void bt_recheck_block_rightlink(BtreeCheckState *state, BlockNumber leftblockno);
 static BtreeLevel bt_check_level_from_leftmost(BtreeCheckState *state,
 							 BtreeLevel level);
 static void bt_target_page_check(BtreeCheckState *state);
@@ -560,6 +561,42 @@ bt_check_every_level(Relation rel, Relation heaprel, bool readonly,
 	MemoryContextDelete(state->targetcontext);
 }
 
+/*
+ * Verify that coherence of rightling's leftlink under lock
+ */
+static void bt_recheck_block_rightlink(BtreeCheckState *state, BlockNumber leftblockno)
+{
+	Buffer		lbuffer, rbuffer;
+	Page		lpage,rpage;
+	BTPageOpaque lopaque, ropaque;
+
+	/* Read and lock left block */
+	lbuffer = ReadBufferExtended(state->rel, MAIN_FORKNUM, leftblockno, RBM_NORMAL,
+								state->checkstrategy);
+	LockBuffer(lbuffer, BT_READ);
+	lpage = BufferGetPage(lbuffer);
+	lopaque = (BTPageOpaque) PageGetSpecialPointer(lpage);
+
+	/* Read right block by following lopaque->btpo_next of left block */
+	rbuffer = ReadBufferExtended(state->rel, MAIN_FORKNUM, lopaque->btpo_next, RBM_NORMAL,
+								 state->checkstrategy);
+	/* Here we are going to couple locks on left block and right block */
+	LockBuffer(rbuffer, BT_READ);
+	rpage = BufferGetPage(rbuffer);
+	ropaque = (BTPageOpaque) PageGetSpecialPointer(rpage);
+
+	if (ropaque->btpo_prev != leftblockno)
+		ereport(ERROR,
+				(errcode(ERRCODE_INDEX_CORRUPTED),
+				 errmsg("left link/right link pair in index \"%s\" not in agreement",
+						RelationGetRelationName(state->rel)),
+				 errdetail_internal("Block=%u left block=%u left link from block=%u.",
+									leftblockno, lopaque->btpo_next, ropaque->btpo_prev)));
+
+	UnlockReleaseBuffer(lbuffer);
+	UnlockReleaseBuffer(rbuffer);
+}
+
 /*
  * Given a left-most block at some level, move right, verifying each page
  * individually (with more verification across pages for "readonly"
@@ -713,6 +750,7 @@ bt_check_level_from_leftmost(BtreeCheckState *state, BtreeLevel level)
 		/*
 		 * readonly mode can only ever land on live pages and half-dead pages,
 		 * so sibling pointers should always be in mutual agreement
+		 * if not in readonly mode - we have to recheck links under lock
 		 */
 		if (state->readonly && opaque->btpo_prev != leftcurrent)
 			ereport(ERROR,
@@ -721,6 +759,9 @@ bt_check_level_from_leftmost(BtreeCheckState *state, BtreeLevel level)
 							RelationGetRelationName(state->rel)),
 					 errdetail_internal("Block=%u left block=%u left link from block=%u.",
 										current, leftcurrent, opaque->btpo_prev)));
+		else if (opaque->btpo_prev != leftcurrent && level.level == 0)
+		/* on leaf level - recheck rightlinks */
+			bt_recheck_block_rightlink(state, leftcurrent);
 
 		/* Check level, which must be valid for non-ignorable page */
 		if (level.level != opaque->btpo.level)
-- 
2.20.1

