From ccb6f0146445b2fd2205b228c247f0ef2a515dfc Mon Sep 17 00:00:00 2001
From: Mark Dilger <mark.dilger@enterprisedb.com>
Date: Tue, 16 Mar 2021 12:32:07 -0700
Subject: [PATCH v11 1/2] Fixing amcheck tuple visibility rules

The implementation of visibility rules in the heap checking code had
diverged considerably from HeapTupleSatisfiesVacuum, upon which it
was based, and with which it was supposed to be compatible.  In
fact, the two functions were not compatible.

Removing the divergent implementation from the heap checking code,
instead copying HeapTupleSatisfiesVacuumHorizon, renaming it,
modifying it to return boolean, to not call SetHintBits, and to not
perform work to distinguish between cases where the same boolean
result will be returned anyway.

Extending amcheck's regression tests with a test case that reliably
reproduces the buggy behavior fixed in this commit, to be sure it
does not come back.
---
 contrib/amcheck/t/001_verify_heapam.pl |  13 +-
 contrib/amcheck/verify_heapam.c        | 292 ++++++++++++-------------
 2 files changed, 150 insertions(+), 155 deletions(-)

diff --git a/contrib/amcheck/t/001_verify_heapam.pl b/contrib/amcheck/t/001_verify_heapam.pl
index 6050feb712..b6fc640a53 100644
--- a/contrib/amcheck/t/001_verify_heapam.pl
+++ b/contrib/amcheck/t/001_verify_heapam.pl
@@ -4,7 +4,7 @@ use warnings;
 use PostgresNode;
 use TestLib;
 
-use Test::More tests => 80;
+use Test::More tests => 128;
 
 my ($node, $result);
 
@@ -17,6 +17,17 @@ $node->append_conf('postgresql.conf', 'autovacuum=off');
 $node->start;
 $node->safe_psql('postgres', q(CREATE EXTENSION amcheck));
 
+#
+# Check for false positives against pg_statistic.  There was a bug in the
+# visibility checking logic that resulted in a consistently reproducible
+# complaint about missing toast table entries for table pg_statistic.  The
+# problem was that main table entries were being checked despite being dead,
+# which is wrong, and though the main table entries were not corrupt, the
+# missing toast was reported.
+#
+$node->safe_psql('postgres', q(ANALYZE));
+check_all_options_uncorrupted('pg_catalog.pg_statistic', 'plain');
+
 #
 # Check a table with data loaded but no corruption, freezing, etc.
 #
diff --git a/contrib/amcheck/verify_heapam.c b/contrib/amcheck/verify_heapam.c
index 6f972e630a..e377b9ab8e 100644
--- a/contrib/amcheck/verify_heapam.c
+++ b/contrib/amcheck/verify_heapam.c
@@ -150,6 +150,8 @@ static XidBoundsViolation get_xid_status(TransactionId xid,
 										 HeapCheckContext *ctx,
 										 XidCommitStatus *status);
 
+static bool heap_tuple_satisfies_corruption_checking(HeapTupleHeader tuphdr);
+
 /*
  * Scan and report corruption in heap pages, optionally reconciling toasted
  * attributes with entries in the associated toast table.  Intended to be
@@ -658,160 +660,7 @@ check_tuple_header_and_visibilty(HeapTupleHeader tuphdr, HeapCheckContext *ctx)
 	 * HeapTupleSatisfiesVacuum.  Where possible the comments indicate which
 	 * HTSV_Result we think that function might return for this tuple.
 	 */
-	if (!HeapTupleHeaderXminCommitted(tuphdr))
-	{
-		TransactionId raw_xmin = HeapTupleHeaderGetRawXmin(tuphdr);
-
-		if (HeapTupleHeaderXminInvalid(tuphdr))
-			return false;		/* HEAPTUPLE_DEAD */
-		/* Used by pre-9.0 binary upgrades */
-		else if (infomask & HEAP_MOVED_OFF ||
-				 infomask & HEAP_MOVED_IN)
-		{
-			XidCommitStatus status;
-			TransactionId xvac = HeapTupleHeaderGetXvac(tuphdr);
-
-			switch (get_xid_status(xvac, ctx, &status))
-			{
-				case XID_INVALID:
-					report_corruption(ctx,
-									  pstrdup("old-style VACUUM FULL transaction ID is invalid"));
-					return false;	/* corrupt */
-				case XID_IN_FUTURE:
-					report_corruption(ctx,
-									  psprintf("old-style VACUUM FULL transaction ID %u equals or exceeds next valid transaction ID %u:%u",
-											   xvac,
-											   EpochFromFullTransactionId(ctx->next_fxid),
-											   XidFromFullTransactionId(ctx->next_fxid)));
-					return false;	/* corrupt */
-				case XID_PRECEDES_RELMIN:
-					report_corruption(ctx,
-									  psprintf("old-style VACUUM FULL transaction ID %u precedes relation freeze threshold %u:%u",
-											   xvac,
-											   EpochFromFullTransactionId(ctx->relfrozenfxid),
-											   XidFromFullTransactionId(ctx->relfrozenfxid)));
-					return false;	/* corrupt */
-					break;
-				case XID_PRECEDES_CLUSTERMIN:
-					report_corruption(ctx,
-									  psprintf("old-style VACUUM FULL transaction ID %u precedes oldest valid transaction ID %u:%u",
-											   xvac,
-											   EpochFromFullTransactionId(ctx->oldest_fxid),
-											   XidFromFullTransactionId(ctx->oldest_fxid)));
-					return false;	/* corrupt */
-					break;
-				case XID_BOUNDS_OK:
-					switch (status)
-					{
-						case XID_IN_PROGRESS:
-							return true;	/* HEAPTUPLE_DELETE_IN_PROGRESS */
-						case XID_COMMITTED:
-						case XID_ABORTED:
-							return false;	/* HEAPTUPLE_DEAD */
-					}
-			}
-		}
-		else
-		{
-			XidCommitStatus status;
-
-			switch (get_xid_status(raw_xmin, ctx, &status))
-			{
-				case XID_INVALID:
-					report_corruption(ctx,
-									  pstrdup("raw xmin is invalid"));
-					return false;
-				case XID_IN_FUTURE:
-					report_corruption(ctx,
-									  psprintf("raw xmin %u equals or exceeds next valid transaction ID %u:%u",
-											   raw_xmin,
-											   EpochFromFullTransactionId(ctx->next_fxid),
-											   XidFromFullTransactionId(ctx->next_fxid)));
-					return false;	/* corrupt */
-				case XID_PRECEDES_RELMIN:
-					report_corruption(ctx,
-									  psprintf("raw xmin %u precedes relation freeze threshold %u:%u",
-											   raw_xmin,
-											   EpochFromFullTransactionId(ctx->relfrozenfxid),
-											   XidFromFullTransactionId(ctx->relfrozenfxid)));
-					return false;	/* corrupt */
-				case XID_PRECEDES_CLUSTERMIN:
-					report_corruption(ctx,
-									  psprintf("raw xmin %u precedes oldest valid transaction ID %u:%u",
-											   raw_xmin,
-											   EpochFromFullTransactionId(ctx->oldest_fxid),
-											   XidFromFullTransactionId(ctx->oldest_fxid)));
-					return false;	/* corrupt */
-				case XID_BOUNDS_OK:
-					switch (status)
-					{
-						case XID_COMMITTED:
-							break;
-						case XID_IN_PROGRESS:
-							return true;	/* insert or delete in progress */
-						case XID_ABORTED:
-							return false;	/* HEAPTUPLE_DEAD */
-					}
-			}
-		}
-	}
-
-	if (!(infomask & HEAP_XMAX_INVALID) && !HEAP_XMAX_IS_LOCKED_ONLY(infomask))
-	{
-		if (infomask & HEAP_XMAX_IS_MULTI)
-		{
-			XidCommitStatus status;
-			TransactionId xmax = HeapTupleGetUpdateXid(tuphdr);
-
-			switch (get_xid_status(xmax, ctx, &status))
-			{
-					/* not LOCKED_ONLY, so it has to have an xmax */
-				case XID_INVALID:
-					report_corruption(ctx,
-									  pstrdup("xmax is invalid"));
-					return false;	/* corrupt */
-				case XID_IN_FUTURE:
-					report_corruption(ctx,
-									  psprintf("xmax %u equals or exceeds next valid transaction ID %u:%u",
-											   xmax,
-											   EpochFromFullTransactionId(ctx->next_fxid),
-											   XidFromFullTransactionId(ctx->next_fxid)));
-					return false;	/* corrupt */
-				case XID_PRECEDES_RELMIN:
-					report_corruption(ctx,
-									  psprintf("xmax %u precedes relation freeze threshold %u:%u",
-											   xmax,
-											   EpochFromFullTransactionId(ctx->relfrozenfxid),
-											   XidFromFullTransactionId(ctx->relfrozenfxid)));
-					return false;	/* corrupt */
-				case XID_PRECEDES_CLUSTERMIN:
-					report_corruption(ctx,
-									  psprintf("xmax %u precedes oldest valid transaction ID %u:%u",
-											   xmax,
-											   EpochFromFullTransactionId(ctx->oldest_fxid),
-											   XidFromFullTransactionId(ctx->oldest_fxid)));
-					return false;	/* corrupt */
-				case XID_BOUNDS_OK:
-					switch (status)
-					{
-						case XID_IN_PROGRESS:
-							return true;	/* HEAPTUPLE_DELETE_IN_PROGRESS */
-						case XID_COMMITTED:
-						case XID_ABORTED:
-							return false;	/* HEAPTUPLE_RECENTLY_DEAD or
-											 * HEAPTUPLE_DEAD */
-					}
-			}
-
-			/* Ok, the tuple is live */
-		}
-		else if (!(infomask & HEAP_XMAX_COMMITTED))
-			return true;		/* HEAPTUPLE_DELETE_IN_PROGRESS or
-								 * HEAPTUPLE_LIVE */
-		else
-			return false;		/* HEAPTUPLE_RECENTLY_DEAD or HEAPTUPLE_DEAD */
-	}
-	return true;				/* not dead */
+	return heap_tuple_satisfies_corruption_checking(tuphdr);
 }
 
 /*
@@ -1461,3 +1310,138 @@ get_xid_status(TransactionId xid, HeapCheckContext *ctx,
 	ctx->cached_status = *status;
 	return XID_BOUNDS_OK;
 }
+
+/*
+ * heap_tuple_satisfies_corruption_checking
+ *
+ * Determine the visibility of tuples for corruption checking purposes.  If a
+ * tuple might not be visible to any running transaction, then we must not
+ * check it.  This function is based heavily on
+ * HeapTupleSatisfiesVacuumHorizon, with comments indicating what we think that
+ * function would return for the tuple.
+ *
+ * Callers should first check for tuple header corruption that might cause this
+ * function to error or assert prior to calling.  Changing this function to be
+ * more robust against errors is less desireable than hardening the code prior
+ * to this function, as we don't want this function to diverge from
+ * HeapTupleSatisfiesVacuumHorizon more than necessary.
+ *
+ * Reliance on hint bits is a bit dubious during corruption checking, as the
+ * hint bits in question may themselves be corrupt.  We rely on them only to
+ * the extent that other visibility functions do, as if other functions say a
+ * tuple is visible, then it had better not be corrupt, else regular code may
+ * hit the corruption.
+ *
+ * Returns whether the tuple may be checked.
+ */
+bool
+heap_tuple_satisfies_corruption_checking(HeapTupleHeader tuphdr)
+{
+	/*
+	 * Has inserting transaction committed?
+	 *
+	 * If the inserting transaction aborted, then the tuple was never visible
+	 * to any other transaction.
+	 */
+	if (!HeapTupleHeaderXminCommitted(tuphdr))
+	{
+		if (HeapTupleHeaderXminInvalid(tuphdr))
+			return false;		/* HEAPTUPLE_DEAD */
+		/* Used by pre-9.0 binary upgrades */
+		else if (tuphdr->t_infomask & HEAP_MOVED_OFF)
+		{
+			TransactionId xvac = HeapTupleHeaderGetXvac(tuphdr);
+
+			if (TransactionIdIsCurrentTransactionId(xvac))
+				return true;	/* HEAPTUPLE_DELETE_IN_PROGRESS */
+			if (TransactionIdIsInProgress(xvac))
+				return true;	/* HEAPTUPLE_DELETE_IN_PROGRESS */
+			if (TransactionIdDidCommit(xvac))
+				return false;	/* HEAPTUPLE_DEAD */
+		}
+		/* Used by pre-9.0 binary upgrades */
+		else if (tuphdr->t_infomask & HEAP_MOVED_IN)
+		{
+			TransactionId xvac = HeapTupleHeaderGetXvac(tuphdr);
+
+			if (TransactionIdIsCurrentTransactionId(xvac))
+				return false;	/* HEAPTUPLE_INSERT_IN_PROGRESS */
+			if (TransactionIdIsInProgress(xvac))
+				return false;	/* HEAPTUPLE_INSERT_IN_PROGRESS */
+			if (!TransactionIdDidCommit(xvac))
+				return false;	/* HEAPTUPLE_DEAD */
+		}
+		else if (TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetRawXmin(tuphdr)))
+		{
+			if (tuphdr->t_infomask & HEAP_XMAX_INVALID) /* xid invalid */
+				return false;	/* HEAPTUPLE_INSERT_IN_PROGRESS */
+			/* only locked? run infomask-only check first, for performance */
+			if (HEAP_XMAX_IS_LOCKED_ONLY(tuphdr->t_infomask) ||
+				HeapTupleHeaderIsOnlyLocked(tuphdr))
+				return false;	/* HEAPTUPLE_INSERT_IN_PROGRESS */
+			/* inserted and then deleted by same xact */
+			if (TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetUpdateXid(tuphdr)))
+				return true;	/* HEAPTUPLE_DELETE_IN_PROGRESS */
+			/* deleting subtransaction must have aborted */
+			return false;		/* HEAPTUPLE_INSERT_IN_PROGRESS */
+		}
+		else if (TransactionIdIsInProgress(HeapTupleHeaderGetRawXmin(tuphdr)))
+		{
+			return false;		/* HEAPTUPLE_INSERT_IN_PROGRESS */
+		}
+		else if (!TransactionIdDidCommit(HeapTupleHeaderGetRawXmin(tuphdr)))
+			return false;		/* HEAPTUPLE_DEAD */
+
+		/* At this point the xmin is known committed */
+	}
+
+	/*
+	 * Okay, the inserter committed, so it was good at some point.  Now what
+	 * about the deleting transaction?
+	 */
+	if (tuphdr->t_infomask & HEAP_XMAX_INVALID)
+		return true;			/* HEAPTUPLE_LIVE */
+
+	if (HEAP_XMAX_IS_LOCKED_ONLY(tuphdr->t_infomask))
+		return true;			/* HEAPTUPLE_LIVE */
+
+	if (tuphdr->t_infomask & HEAP_XMAX_IS_MULTI)
+	{
+		TransactionId xmax = HeapTupleGetUpdateXid(tuphdr);
+
+		/* already checked above */
+		Assert(!HEAP_XMAX_IS_LOCKED_ONLY(tuphdr->t_infomask));
+
+		/* not LOCKED_ONLY, so it has to have an xmax */
+		Assert(TransactionIdIsValid(xmax));
+
+		if (TransactionIdIsInProgress(xmax))
+			return true;		/* HEAPTUPLE_DELETE_IN_PROGRESS */
+		else if (TransactionIdDidCommit(xmax))
+		{
+			return false;		/* HEAPTUPLE_RECENTLY_DEAD */
+		}
+
+		return true;			/* HEAPTUPLE_LIVE */
+	}
+
+	if (!(tuphdr->t_infomask & HEAP_XMAX_COMMITTED))
+	{
+		if (TransactionIdIsInProgress(HeapTupleHeaderGetRawXmax(tuphdr)))
+			return true;		/* HEAPTUPLE_DELETE_IN_PROGRESS */
+		else if (!TransactionIdDidCommit(HeapTupleHeaderGetRawXmax(tuphdr)))
+
+			/*
+			 * Not in Progress, Not Committed, so either Aborted or crashed
+			 */
+			return true;		/* HEAPTUPLE_LIVE */
+
+		/* At this point the xmax is known committed */
+	}
+
+	/*
+	 * Deleter committed, and it may have been recent enough that some open
+	 * transactions could still see the tuple, but don't count on that.
+	 */
+	return false;				/* HEAPTUPLE_RECENTLY_DEAD */
+}
-- 
2.21.1 (Apple Git-122.3)

