On Mon, Sep 19, 2022 at 10:04 PM Aleksander Alekseev <
aleksan...@timescale.com> wrote:

> Hi Himanshu,
>
> > I have changed this in the attached patch.
>
> If it's not too much trouble could you please base your changes on v4
> that I submitted? I put some effort into writing a proper commit
> message, editing the comments, etc. The easiest way of doing this is
> using `git am` and `git format-patch`.
>
> Please find it attached.

-- 
Regards,
Himanshu Upadhyaya
EnterpriseDB: http://www.enterprisedb.com
From 3f3290ffb857117385f79f85aa599c588340924b Mon Sep 17 00:00:00 2001
From: Himanshu Upadhyaya <himanshu.upadhy...@enterprisedb.com>
Date: Tue, 20 Sep 2022 14:23:31 +0530
Subject: [PATCH v5] Implement HOT chain validation in verify_heapam()

Himanshu Upadhyaya, reviewed by Robert Haas, Aleksander Alekseev

Discussion: https://postgr.es/m/CAPF61jBBR2-iE-EmN_9v0hcQEfyz_17e5Lbb0%2Bu2%3D9ukA9sWmQ%40mail.gmail.com
---
 contrib/amcheck/verify_heapam.c | 214 ++++++++++++++++++++++++++++++++
 1 file changed, 214 insertions(+)

diff --git a/contrib/amcheck/verify_heapam.c b/contrib/amcheck/verify_heapam.c
index c875f3e5a2..e90e1ef2f3 100644
--- a/contrib/amcheck/verify_heapam.c
+++ b/contrib/amcheck/verify_heapam.c
@@ -399,6 +399,9 @@ verify_heapam(PG_FUNCTION_ARGS)
 	for (ctx.blkno = first_block; ctx.blkno <= last_block; ctx.blkno++)
 	{
 		OffsetNumber maxoff;
+		OffsetNumber predecessor[MaxOffsetNumber] = {0};
+		OffsetNumber successor[MaxOffsetNumber] = {0};
+		bool		lp_valid[MaxOffsetNumber] = {false};
 
 		CHECK_FOR_INTERRUPTS();
 
@@ -433,6 +436,8 @@ verify_heapam(PG_FUNCTION_ARGS)
 		for (ctx.offnum = FirstOffsetNumber; ctx.offnum <= maxoff;
 			 ctx.offnum = OffsetNumberNext(ctx.offnum))
 		{
+			OffsetNumber nextoffnum;
+
 			ctx.itemid = PageGetItemId(ctx.page, ctx.offnum);
 
 			/* Skip over unused/dead line pointers */
@@ -469,6 +474,12 @@ verify_heapam(PG_FUNCTION_ARGS)
 					report_corruption(&ctx,
 									  psprintf("line pointer redirection to unused item at offset %u",
 											   (unsigned) rdoffnum));
+
+				/*
+				 * make entry in successor array, redirected tuple will be
+				 * validated at the time when we loop over successor array
+				 */
+				successor[ctx.offnum] = rdoffnum;
 				continue;
 			}
 
@@ -504,9 +515,205 @@ verify_heapam(PG_FUNCTION_ARGS)
 			/* It should be safe to examine the tuple's header, at least */
 			ctx.tuphdr = (HeapTupleHeader) PageGetItem(ctx.page, ctx.itemid);
 			ctx.natts = HeapTupleHeaderGetNatts(ctx.tuphdr);
+			lp_valid[ctx.offnum] = true;
 
 			/* Ok, ready to check this next tuple */
 			check_tuple(&ctx);
+
+			/*
+			 * Add the data to the successor array. It will be used later to
+			 * generate the predecessor array.
+			 *
+			 * We need to access the tuple's header to populate the
+			 * predecessor array. However the tuple is not necessarily sanity
+			 * checked yet so delaying construction of predecessor array until
+			 * all tuples are sanity checked.
+			 */
+			nextoffnum = ItemPointerGetOffsetNumber(&(ctx.tuphdr)->t_ctid);
+			if (ItemPointerGetBlockNumber(&(ctx.tuphdr)->t_ctid) == ctx.blkno &&
+				nextoffnum != ctx.offnum)
+			{
+				successor[ctx.offnum] = nextoffnum;
+			}
+		}
+
+		/*
+		 * Loop over offset and populate predecessor array from all entries
+		 * that are present in successor array.
+		 */
+		ctx.attnum = -1;
+		for (ctx.offnum = FirstOffsetNumber; ctx.offnum <= maxoff;
+			 ctx.offnum = OffsetNumberNext(ctx.offnum))
+		{
+			ItemId		curr_lp;
+			ItemId		next_lp;
+			HeapTupleHeader curr_htup;
+			HeapTupleHeader next_htup;
+			TransactionId curr_xmax;
+			TransactionId next_xmin;
+
+			OffsetNumber nextoffnum = successor[ctx.offnum];
+
+			curr_lp = PageGetItemId(ctx.page, ctx.offnum);
+			if (nextoffnum == 0)
+			{
+				/*
+				 * This is either the last updated tuple in the chain or a
+				 * corruption raised for this tuple.
+				 */
+				continue;
+			}
+			if (ItemIdIsRedirected(curr_lp) && lp_valid[nextoffnum])
+			{
+				next_lp = PageGetItemId(ctx.page, nextoffnum);
+				next_htup = (HeapTupleHeader) PageGetItem(ctx.page, next_lp);;
+				if (!HeapTupleHeaderIsHeapOnly(next_htup))
+				{
+					report_corruption(&ctx,
+									  psprintf("redirected tuple at line pointer offset %u is not heap only tuple",
+											   (unsigned) nextoffnum));
+				}
+				if ((next_htup->t_infomask & HEAP_UPDATED) == 0)
+				{
+					report_corruption(&ctx,
+									  psprintf("redirected tuple at line pointer offset %u is not heap updated tuple",
+											   (unsigned) nextoffnum));
+				}
+				continue;
+			}
+
+			/*
+			 * Add a line pointer offset to the predecessor array. 1) If xmax
+			 * is matching with xmin of next tuple (reaching via its t_ctid).
+			 * 2) If the next tuple is in the same page. Raise corruption if
+			 * we have two tuples having the same predecessor.
+			 *
+			 * We add the offset to the predecessor array irrespective of the
+			 * transaction (t_xmin) status. We will do validation related to
+			 * the transaction status (and also all other validations) when we
+			 * loop over the predecessor array.
+			 */
+			if (lp_valid[nextoffnum] && lp_valid[ctx.offnum])
+			{
+				curr_htup = (HeapTupleHeader) PageGetItem(ctx.page, curr_lp);
+				curr_xmax = HeapTupleHeaderGetUpdateXid(curr_htup);
+
+				next_lp = PageGetItemId(ctx.page, nextoffnum);
+				next_htup = (HeapTupleHeader) PageGetItem(ctx.page, next_lp);
+				next_xmin = HeapTupleHeaderGetXmin(next_htup);
+				if (TransactionIdIsValid(curr_xmax) &&
+					TransactionIdEquals(curr_xmax, next_xmin))
+				{
+					if (predecessor[nextoffnum] != 0)
+					{
+						report_corruption(&ctx,
+										  psprintf("updated version at offset %u is also the updated version of tuple at offset %u",
+												   (unsigned) nextoffnum, (unsigned) predecessor[nextoffnum]));
+						continue;
+					}
+					predecessor[nextoffnum] = ctx.offnum;
+				}
+				/* Non matching xmax with xmin is not a corruption */
+			}
+
+		}
+
+		/* Loop over offsets and validate the data in the predecessor array. */
+		for (OffsetNumber currentoffnum = FirstOffsetNumber; currentoffnum <= maxoff;
+			 currentoffnum = OffsetNumberNext(currentoffnum))
+		{
+			HeapTupleHeader pred_htup;
+			HeapTupleHeader curr_htup;
+			TransactionId pred_xmin;
+			TransactionId curr_xmin;
+			TransactionId curr_xmax;
+			CommandId	pred_cmin;
+			CommandId	curr_cmin;
+			ItemId		pred_lp;
+			ItemId		curr_lp;
+
+			ctx.offnum = predecessor[currentoffnum];
+			ctx.attnum = -1;
+
+			if (ctx.offnum == 0)
+			{
+				/*
+				 * Either the root of the chain or an xmin-aborted tuple from
+				 * an abandoned portion of the HOT chain.
+				 */
+				continue;
+			}
+
+			/*
+			 * Validation via the predecessor array. 1) If the predecessor's
+			 * xmin is aborted or in progress, the current tuples xmin should
+			 * be aborted or in progress respectively. Also both xmin's must
+			 * be equal. 2) If the predecessor's xmin is not frozen, then
+			 * current tuple's shouldn't be either. 3) If the predecessor's
+			 * xmin is equal to the current tuple's xmin, the current tuple's
+			 * cmin should be greater than the predecessor's cmin. 4) If the
+			 * current tuple is not HOT then its predecessor's tuple must not
+			 * be HEAP_HOT_UPDATED. 5) If the current tuple is HOT then its
+			 * predecessor's tuple must be HEAP_HOT_UPDATED.
+			 */
+			curr_lp = PageGetItemId(ctx.page, currentoffnum);
+			curr_htup = (HeapTupleHeader) PageGetItem(ctx.page, curr_lp);
+			curr_xmin = HeapTupleHeaderGetXmin(curr_htup);
+			curr_xmax = HeapTupleHeaderGetUpdateXid(curr_htup);
+			curr_cmin = HeapTupleHeaderGetRawCommandId(curr_htup);
+
+			ctx.itemid = pred_lp = PageGetItemId(ctx.page, ctx.offnum);
+			pred_htup = (HeapTupleHeader) PageGetItem(ctx.page, pred_lp);
+			pred_xmin = HeapTupleHeaderGetXmin(pred_htup);
+			pred_cmin = HeapTupleHeaderGetRawCommandId(pred_htup);
+
+			if (!TransactionIdEquals(pred_xmin, curr_xmin) &&
+				!TransactionIdDidCommit(pred_xmin))
+			{
+				report_corruption(&ctx,
+								  psprintf("tuple with uncommitted xmin %u was updated to produce a tuple at offset %u with differing xmin %u",
+										   (unsigned) pred_xmin, (unsigned) currentoffnum, (unsigned) curr_xmin));
+			}
+			if (pred_xmin != FrozenTransactionId && curr_xmin == FrozenTransactionId)
+			{
+				report_corruption(&ctx,
+								  psprintf("unfrozen tuple was updated to produce a tuple at offset %u which is not frozen",
+										   (unsigned) currentoffnum));
+			}
+
+			/*
+			 * Not a corruption if current tuple is updated/deleted by a
+			 * different transaction, means t_cid will point to cmax (that is
+			 * command id of deleting transaction) and cid of predecessor not
+			 * necessarily will be smaller than cid of current tuple. t_cid
+			 * can hold combo command id but we are not worrying here since
+			 * combo command id of the next updated tuple (if present) must be
+			 * greater than combo command id of the current tuple. So here we
+			 * are not checking HEAP_COMBOCID flag and simply doing t_cid
+			 * comparison.
+			 */
+			if (TransactionIdEquals(pred_xmin, curr_xmin) &&
+				(TransactionIdEquals(curr_xmin, curr_xmax) ||
+				 !TransactionIdIsValid(curr_xmax)) && pred_cmin >= curr_cmin)
+			{
+				report_corruption(&ctx,
+								  psprintf("tuple with xmin %u has cid %u was updated to produce a tuple at offset %u with same xmin but has cid %u",
+										   (unsigned) pred_xmin, (unsigned) pred_cmin, (unsigned) currentoffnum, (unsigned) curr_cmin));
+			}
+			if (HeapTupleHeaderIsHeapOnly(curr_htup) &&
+				!HeapTupleHeaderIsHotUpdated(pred_htup))
+			{
+				report_corruption(&ctx,
+								  psprintf("non-heap-only update produced a heap-only tuple at offset %u",
+										   (unsigned) currentoffnum));
+			}
+			if (!HeapTupleHeaderIsHeapOnly(curr_htup) &&
+				HeapTupleHeaderIsHotUpdated(pred_htup))
+			{
+				report_corruption(&ctx,
+								  psprintf("heap-only update produced a non-heap only tuple at offset %u",
+										   (unsigned) currentoffnum));
+			}
 		}
 
 		/* clean up */
@@ -640,6 +847,7 @@ check_tuple_header(HeapCheckContext *ctx)
 {
 	HeapTupleHeader tuphdr = ctx->tuphdr;
 	uint16		infomask = tuphdr->t_infomask;
+	TransactionId curr_xmax = HeapTupleHeaderGetUpdateXid(tuphdr);
 	bool		result = true;
 	unsigned	expected_hoff;
 
@@ -651,6 +859,12 @@ check_tuple_header(HeapCheckContext *ctx)
 		result = false;
 	}
 
+	if (!TransactionIdIsValid(curr_xmax) && HeapTupleHeaderIsHotUpdated(tuphdr))
+	{
+		report_corruption(ctx,
+						  psprintf("tuple has been updated, but xmax is 0"));
+		result = false;
+	}
 	if ((ctx->tuphdr->t_infomask & HEAP_XMAX_COMMITTED) &&
 		(ctx->tuphdr->t_infomask & HEAP_XMAX_IS_MULTI))
 	{
-- 
2.25.1

Reply via email to