On Wed, Feb 8, 2023 at 11:17 PM Robert Haas <robertmh...@gmail.com> wrote:

> On Sun, Feb 5, 2023 at 3:57 AM Himanshu Upadhyaya
> <upadhyaya.himan...@gmail.com> wrote:
> > Thanks, yes it's working fine with Prepared Transaction.
> > Please find attached the v9 patch incorporating all the review comments.
>
> I don't know quite how we're still going around in circles about this,
> but this code makes no sense to me at all:
>
>             /*
>              * Add data to the predecessor array even if the current or
>              * successor's LP is not valid. We will not process/validate
> these
>              * offset entries while looping over the predecessor array but
>              * having all entries in the predecessor array will help in
>              * identifying(and validating) the Root of a chain.
>              */
>             if (!lp_valid[ctx.offnum] || !lp_valid[nextoffnum])
>             {
>                 predecessor[nextoffnum] = ctx.offnum;
>                 continue;
>             }
>
> If the current offset number is not for a valid line pointer, then it
> makes no sense to talk about the successor. An invalid redirected line
> pointer is one that points off the end of the line pointer array, or
> to before the beginning of the line pointer array, or to a line
> pointer that is unused. An invalid line pointer that is LP_USED is one
> which points to a location outside the page, or to a location inside
> the page. In none of these cases does it make any sense to talk about
> the next tuple. If the line pointer isn't valid, it's pointing to some
> invalid location where there cannot possibly be a tuple. In other
> words, if lp_valid[ctx.offnum] is false, then nextoffnum is a garbage
> value, and therefore referencing predecessor[nextoffnum] is useless
> and dangerous.
>
> If the next offset number is not for a valid line pointer, we could in
> theory still assign to the predecessor array, as you propose here. In
> that case, the tuple or line pointer at ctx.offnum is pointing to the
> line pointer at nextoffnum and that is all fine. But what is the
> point? The comment claims that the point is that it will help us
> identify and validate the root of the hot chain. But if the line
> pointer at nextoffnum is not valid, it can't be the root of a hot
> chain. When we're talking about the root of a HOT chain, we're
> speaking about a tuple. If lp_valid[nextoffnum] is false, there is no
> tuple. Instead of pointing to a tuple, that line pointer is pointing
> to garbage.
>
>
Initially while implementing logic to identify the root of the HOT chain
I was getting crash and regression failure's that time I thought of having
this check along with a few other changes that were required,
but you are right, it's unnecessary to add data to the predecessor
array(in this case) and is not required. I am removing this from the patch.

-- 
Regards,
Himanshu Upadhyaya
EnterpriseDB: http://www.enterprisedb.com
From 19c93cce0189150d6bfe68237eb3d5a414a18ad9 Mon Sep 17 00:00:00 2001
From: Himanshu Upadhyaya <himanshu.upadhy...@enterprisedb.com>
Date: Thu, 9 Feb 2023 22:00:25 +0530
Subject: [PATCH v10] Implement HOT chain validation in verify_heapam()

Himanshu Upadhyaya, reviewed by Robert Haas, Aleksander Alekseev, Andres Freund
---
 contrib/amcheck/verify_heapam.c           | 303 +++++++++++++++++++++-
 src/bin/pg_amcheck/t/004_verify_heapam.pl | 241 ++++++++++++++++-
 2 files changed, 527 insertions(+), 17 deletions(-)

diff --git a/contrib/amcheck/verify_heapam.c b/contrib/amcheck/verify_heapam.c
index 4fcfd6df72..7fc984dd33 100644
--- a/contrib/amcheck/verify_heapam.c
+++ b/contrib/amcheck/verify_heapam.c
@@ -150,7 +150,7 @@ typedef struct HeapCheckContext
 } HeapCheckContext;
 
 /* Internal implementation */
-static void check_tuple(HeapCheckContext *ctx);
+static void check_tuple(HeapCheckContext *ctx, bool *lp_valid);
 static void check_toast_tuple(HeapTuple toasttup, HeapCheckContext *ctx,
 							  ToastedAttribute *ta, int32 *expected_chunk_seq,
 							  uint32 extsize);
@@ -160,7 +160,7 @@ static void check_toasted_attribute(HeapCheckContext *ctx,
 									ToastedAttribute *ta);
 
 static bool check_tuple_header(HeapCheckContext *ctx);
-static bool check_tuple_visibility(HeapCheckContext *ctx);
+static bool check_tuple_visibility(HeapCheckContext *ctx, bool *lp_valid);
 
 static void report_corruption(HeapCheckContext *ctx, char *msg);
 static void report_toast_corruption(HeapCheckContext *ctx,
@@ -399,9 +399,14 @@ verify_heapam(PG_FUNCTION_ARGS)
 	for (ctx.blkno = first_block; ctx.blkno <= last_block; ctx.blkno++)
 	{
 		OffsetNumber maxoff;
+		OffsetNumber predecessor[MaxOffsetNumber];
+		OffsetNumber successor[MaxOffsetNumber];
+		bool		lp_valid[MaxOffsetNumber];
 
 		CHECK_FOR_INTERRUPTS();
 
+		memset(predecessor, 0, sizeof(OffsetNumber) * MaxOffsetNumber);
+
 		/* Optionally skip over all-frozen or all-visible blocks */
 		if (skip_option != SKIP_PAGES_NONE)
 		{
@@ -433,6 +438,10 @@ verify_heapam(PG_FUNCTION_ARGS)
 		for (ctx.offnum = FirstOffsetNumber; ctx.offnum <= maxoff;
 			 ctx.offnum = OffsetNumberNext(ctx.offnum))
 		{
+			OffsetNumber nextoffnum;
+
+			successor[ctx.offnum] = InvalidOffsetNumber;
+			lp_valid[ctx.offnum] = false;
 			ctx.itemid = PageGetItemId(ctx.page, ctx.offnum);
 
 			/* Skip over unused/dead line pointers */
@@ -469,6 +478,13 @@ verify_heapam(PG_FUNCTION_ARGS)
 					report_corruption(&ctx,
 									  psprintf("line pointer redirection to unused item at offset %u",
 											   (unsigned) rdoffnum));
+
+				/*
+				 * Make entry in successor array, redirected lp will be
+				 * validated at the time when we loop over successor array.
+				 */
+				successor[ctx.offnum] = rdoffnum;
+				lp_valid[ctx.offnum] = true;
 				continue;
 			}
 
@@ -506,7 +522,265 @@ verify_heapam(PG_FUNCTION_ARGS)
 			ctx.natts = HeapTupleHeaderGetNatts(ctx.tuphdr);
 
 			/* Ok, ready to check this next tuple */
-			check_tuple(&ctx);
+			check_tuple(&ctx, &(lp_valid[ctx.offnum]));
+
+			/*
+			 * Add the data to the successor array if next updated tuple is in
+			 * the same page. It will be used later to generate the
+			 * predecessor array.
+			 *
+			 * We need to access the tuple's header to populate the
+			 * predecessor array. However the tuple is not necessarily sanity
+			 * checked yet so delaying construction of predecessor array until
+			 * all tuples are sanity checked.
+			 */
+			nextoffnum = ItemPointerGetOffsetNumber(&(ctx.tuphdr)->t_ctid);
+			if (ItemPointerGetBlockNumber(&(ctx.tuphdr)->t_ctid) == ctx.blkno &&
+				nextoffnum != ctx.offnum)
+			{
+				successor[ctx.offnum] = nextoffnum;
+			}
+		}
+
+		/*
+		 * Loop over offset and populate predecessor array from all entries
+		 * that are present in successor array.
+		 */
+		ctx.attnum = -1;
+		for (ctx.offnum = FirstOffsetNumber; ctx.offnum <= maxoff;
+			 ctx.offnum = OffsetNumberNext(ctx.offnum))
+		{
+			ItemId		curr_lp;
+			ItemId		next_lp;
+			HeapTupleHeader curr_htup;
+			HeapTupleHeader next_htup;
+			TransactionId curr_xmax;
+			TransactionId next_xmin;
+			OffsetNumber nextoffnum;
+
+			/*
+			 * if current lp is not valid then no need to look for its
+			 * successor.
+			 */
+			if (!lp_valid[ctx.offnum])
+			{
+				continue;
+			}
+
+			nextoffnum = successor[ctx.offnum];
+			if (nextoffnum == InvalidOffsetNumber || !lp_valid[nextoffnum])
+			{
+				/*
+				 * This is either the last updated tuple in the chain or a
+				 * corrupted Tuple/lp or unused/dead line pointer.
+				 */
+				continue;
+			}
+
+			curr_lp = PageGetItemId(ctx.page, ctx.offnum);
+			if (ItemIdIsRedirected(curr_lp))
+			{
+				next_lp = PageGetItemId(ctx.page, nextoffnum);
+				if (ItemIdIsRedirected(next_lp))
+				{
+					report_corruption(&ctx,
+									  psprintf("redirected line pointer points to another redirected line pointer at offset %u",
+											   (unsigned) nextoffnum));
+					continue;
+				}
+				next_htup = (HeapTupleHeader) PageGetItem(ctx.page, next_lp);
+				if (!HeapTupleHeaderIsHeapOnly(next_htup))
+				{
+					report_corruption(&ctx,
+									  psprintf("redirected line pointer points to a non-heap-only tuple at offset %u",
+											   (unsigned) nextoffnum));
+				}
+				if ((next_htup->t_infomask & HEAP_UPDATED) == 0)
+				{
+					report_corruption(&ctx,
+									  psprintf("redirected line pointer points to a non-heap-updated tuple at offset %u",
+											   (unsigned) nextoffnum));
+				}
+
+				/*
+				 * Add data related to redirected offset to predecessor array
+				 * so that we can differentiate between all the cases of
+				 * missing offset in predecessor array, this will help in
+				 * validating the root of chain when we loop over predecessor
+				 * array.
+				 */
+				predecessor[nextoffnum] = ctx.offnum;
+				continue;
+			}
+
+			/*
+			 * Add a line pointer offset to the predecessor array if xmax is
+			 * matching with xmin of next tuple (reaching via its t_ctid).
+			 * Raise corruption if we have two tuples having the same
+			 * predecessor.
+			 *
+			 * We add the offset to the predecessor array irrespective of the
+			 * transaction (t_xmin) status. We will do validation related to
+			 * the transaction status (and also all other validations) when we
+			 * loop over the predecessor array.
+			 */
+			curr_htup = (HeapTupleHeader) PageGetItem(ctx.page, curr_lp);
+			curr_xmax = HeapTupleHeaderGetUpdateXid(curr_htup);
+
+			next_lp = PageGetItemId(ctx.page, nextoffnum);
+			next_htup = (HeapTupleHeader) PageGetItem(ctx.page, next_lp);
+			next_xmin = HeapTupleHeaderGetXmin(next_htup);
+			if (TransactionIdIsValid(curr_xmax) &&
+				TransactionIdEquals(curr_xmax, next_xmin))
+			{
+				if (predecessor[nextoffnum] != InvalidOffsetNumber)
+				{
+					report_corruption(&ctx,
+									  psprintf("updated version at offset %u is also the updated version of tuple at offset %u",
+											   (unsigned) nextoffnum, (unsigned) predecessor[nextoffnum]));
+					continue;
+				}
+			}
+			/* Non matching xmax with xmin is not a corruption */
+			predecessor[nextoffnum] = ctx.offnum;
+
+		}
+
+		/* Loop over offsets and validate the data in the predecessor array. */
+		for (OffsetNumber currentoffnum = FirstOffsetNumber; currentoffnum <= maxoff;
+			 currentoffnum = OffsetNumberNext(currentoffnum))
+		{
+			HeapTupleHeader pred_htup;
+			HeapTupleHeader curr_htup;
+			TransactionId pred_xmin;
+			TransactionId curr_xmin;
+			TransactionId pred_xmax;
+			ItemId		pred_lp;
+			ItemId		curr_lp;
+			bool		pred_in_progress;
+
+			/* Don't process if current offset is not valid */
+			if (!lp_valid[currentoffnum])
+				continue;
+
+			curr_lp = PageGetItemId(ctx.page, currentoffnum);
+			if (ItemIdIsRedirected(curr_lp))
+				continue;
+
+			curr_htup = (HeapTupleHeader) PageGetItem(ctx.page, curr_lp);
+			curr_xmin = HeapTupleHeaderGetXmin(curr_htup);
+			ctx.offnum = predecessor[currentoffnum];
+			ctx.attnum = -1;
+
+			if (ctx.offnum == InvalidOffsetNumber)
+			{
+				/*
+				 * No harm in overriding value of ctx.offnum as we will always
+				 * continue if we are here.
+				 */
+				ctx.offnum = currentoffnum;
+				if (TransactionIdIsInProgress(curr_xmin) ||
+					TransactionIdDidCommit(curr_xmin))
+				{
+					/* Root of chain must not be HEAP_ONLY_TUPLE. */
+					if (HeapTupleHeaderIsHeapOnly(curr_htup))
+					{
+						report_corruption(&ctx,
+										  psprintf("tuple is root of chain but is marked as heap-only tuple"));
+					}
+					continue;
+				}
+				else
+				{
+					/*
+					 * xmin aborted tuple from an abandoned portion of the
+					 * chain.
+					 */
+					continue;
+				}
+			}
+
+			ctx.itemid = pred_lp = PageGetItemId(ctx.page, ctx.offnum);
+
+			/*
+			 * Redirected LP were validated previously, so don't need any
+			 * validation.
+			 */
+			if (ItemIdIsRedirected(pred_lp))
+				continue;
+
+			pred_htup = (HeapTupleHeader) PageGetItem(ctx.page, pred_lp);
+			pred_xmin = HeapTupleHeaderGetXmin(pred_htup);
+			pred_xmax = HeapTupleHeaderGetUpdateXid(pred_htup);
+
+			/*
+			 * No need to validate if XMIN and XMAX are not matching because
+			 * they can be from different chains in case of a mismatch.
+			 */
+			if (!(TransactionIdIsValid(pred_xmax) &&
+				  TransactionIdEquals(pred_xmax, curr_xmin)))
+			{
+				continue;
+			}
+
+			/*
+			 * If the predecessor's xmin is in progress then current tuple's
+			 * xmin should either be aborted (in case of subtransaction) or
+			 * in-progress.
+			 */
+			pred_in_progress = TransactionIdIsInProgress(pred_xmin);
+			if (pred_in_progress && TransactionIdDidCommit(curr_xmin))
+			{
+				/* Re-check to avoid race condition */
+				if (TransactionIdIsInProgress(pred_xmin))
+				{
+					report_corruption(&ctx,
+									  psprintf("tuple with in-progress xmin %u was updated to produce a tuple at offset %u with committed xmin %u",
+											   (unsigned) pred_xmin, (unsigned) currentoffnum, (unsigned) curr_xmin));
+				}
+				else
+				{
+					pred_in_progress = false;
+				}
+			}
+
+			/* raise corruption if pred_xmin is aborted but curr_xmin is not */
+			if (!pred_in_progress && !TransactionIdDidCommit(pred_xmin))
+			{
+				if (TransactionIdIsInProgress(curr_xmin))
+					report_corruption(&ctx,
+									  psprintf("tuple with aborted xmin %u was updated to produce a tuple at offset %u with in-progress xmin %u",
+											   (unsigned) pred_xmin, (unsigned) currentoffnum, (unsigned) curr_xmin));
+				else if (TransactionIdDidCommit(curr_xmin))
+					report_corruption(&ctx,
+									  psprintf("tuple with aborted xmin %u was updated to produce a tuple at offset %u with committed xmin %u",
+											   (unsigned) pred_xmin, (unsigned) currentoffnum, (unsigned) curr_xmin));
+
+			}
+
+			/*
+			 * If the current tuple is HOT then it's predecessor's tuple must
+			 * be HEAP_HOT_UPDATED.
+			 */
+			if (!HeapTupleHeaderIsHotUpdated(pred_htup) &&
+				HeapTupleHeaderIsHeapOnly(curr_htup))
+			{
+				report_corruption(&ctx,
+								  psprintf("non-heap-only update produced a heap-only tuple at offset %u",
+										   (unsigned) currentoffnum));
+			}
+
+			/*
+			 * If the current tuple is not HOT then its predecessor's tuple
+			 * must not be HEAP_HOT_UPDATED.
+			 */
+			if (HeapTupleHeaderIsHotUpdated(pred_htup) &&
+				!HeapTupleHeaderIsHeapOnly(curr_htup))
+			{
+				report_corruption(&ctx,
+								  psprintf("heap-only update produced a non-heap only tuple at offset %u",
+										   (unsigned) currentoffnum));
+			}
 		}
 
 		/* clean up */
@@ -638,6 +912,7 @@ check_tuple_header(HeapCheckContext *ctx)
 {
 	HeapTupleHeader tuphdr = ctx->tuphdr;
 	uint16		infomask = tuphdr->t_infomask;
+	TransactionId curr_xmax = HeapTupleHeaderGetUpdateXid(tuphdr);
 	bool		result = true;
 	unsigned	expected_hoff;
 
@@ -649,6 +924,12 @@ check_tuple_header(HeapCheckContext *ctx)
 		result = false;
 	}
 
+	if (!TransactionIdIsValid(curr_xmax) && HeapTupleHeaderIsHotUpdated(tuphdr))
+	{
+		report_corruption(ctx,
+						  psprintf("tuple has been HOT updated, but xmax is 0"));
+		result = false;
+	}
 	if ((ctx->tuphdr->t_infomask & HEAP_XMAX_COMMITTED) &&
 		(ctx->tuphdr->t_infomask & HEAP_XMAX_IS_MULTI))
 	{
@@ -720,7 +1001,7 @@ check_tuple_header(HeapCheckContext *ctx)
  * TOAST tuples -- are eligible for pruning.
  */
 static bool
-check_tuple_visibility(HeapCheckContext *ctx)
+check_tuple_visibility(HeapCheckContext *ctx, bool *lp_valid)
 {
 	TransactionId xmin;
 	TransactionId xvac;
@@ -917,7 +1198,11 @@ check_tuple_visibility(HeapCheckContext *ctx)
 			 * any such DDL changes ought to be visible to us, so perhaps we
 			 * could check anyway in that case. But, for now, let's be
 			 * conservative and treat this like any other uncommitted insert.
+			 *
+			 * Set lp_valid to true as we need to validate HOT chain with
+			 * regards to in-progress transaction.
 			 */
+			*lp_valid = true;
 			return false;
 		}
 	}
@@ -1517,7 +1802,7 @@ check_toasted_attribute(HeapCheckContext *ctx, ToastedAttribute *ta)
  * ctx->tupstore.
  */
 static void
-check_tuple(HeapCheckContext *ctx)
+check_tuple(HeapCheckContext *ctx, bool *lp_valid)
 {
 	/*
 	 * Check various forms of tuple header corruption, and if the header is
@@ -1531,9 +1816,15 @@ check_tuple(HeapCheckContext *ctx)
 	 * cannot assume our relation description matches the tuple structure, and
 	 * therefore cannot check it.
 	 */
-	if (!check_tuple_visibility(ctx))
+	if (!check_tuple_visibility(ctx, lp_valid))
 		return;
 
+	/*
+	 * We can now mark lp_valid as true because we should validated HOT chains
+	 * even if there are corruptions related to attributes.
+	 */
+	*lp_valid = true;
+
 	/*
 	 * The tuple is visible, so it must be compatible with the current version
 	 * of the relation descriptor. It might have fewer columns than are
diff --git a/src/bin/pg_amcheck/t/004_verify_heapam.pl b/src/bin/pg_amcheck/t/004_verify_heapam.pl
index 215c30eaa8..f5b4092343 100644
--- a/src/bin/pg_amcheck/t/004_verify_heapam.pl
+++ b/src/bin/pg_amcheck/t/004_verify_heapam.pl
@@ -174,12 +174,16 @@ sub write_tuple
 # Set umask so test directories and files are created with default permissions
 umask(0077);
 
+my $pred_xmax;
+my $pred_posid;
+my $aborted_xid;
 # Set up the node.  Once we create and corrupt the table,
 # autovacuum workers visiting the table could crash the backend.
 # Disable autovacuum so that won't happen.
 my $node = PostgreSQL::Test::Cluster->new('test');
 $node->init;
 $node->append_conf('postgresql.conf', 'autovacuum=off');
+$node->append_conf('postgresql.conf','max_prepared_transactions=100');
 
 # Start the node and load the extensions.  We depend on both
 # amcheck and pageinspect for this test.
@@ -217,7 +221,9 @@ my $rel = $node->safe_psql('postgres',
 my $relpath = "$pgdata/$rel";
 
 # Insert data and freeze public.test
-use constant ROWCOUNT => 16;
+use constant ROWCOUNT => 37 ; # Total row count in this page.
+use constant ROWCOUNT_HOTCHAIN => 21; # Row count related to test of HOT chains validations and redirected LP.
+# First insert data needed for non-HOT chain validation.
 $node->safe_psql(
 	'postgres', qq(
 	INSERT INTO public.test (a, b, c)
@@ -227,7 +233,54 @@ $node->safe_psql(
 			repeat('w', 10000)
 		);
 	VACUUM FREEZE public.test
-	)) for (1 .. ROWCOUNT);
+	)) for (1 .. ROWCOUNT-ROWCOUNT_HOTCHAIN);
+
+# Data for Redirected LP.
+$node->safe_psql(
+	'postgres', qq(
+		INSERT INTO public.test (a, b, c)
+			VALUES ( x'DEADF9F9DEADF9F9'::bigint, 'abcdefg', generate_series(1,2));
+		UPDATE public.test SET c = 'a' WHERE c = '1';
+		UPDATE public.test SET c = 'a' WHERE c = '2';
+		VACUUM FREEZE public.test;
+	));
+
+# Data for HOT chains validation, so not calling VACUUM FREEZE.
+$node->safe_psql(
+	'postgres', qq(
+		INSERT INTO public.test (a, b, c)
+			VALUES ( x'DEADF9F9DEADF9F9'::bigint, 'abcdefg', generate_series(3,11));
+		UPDATE public.test SET c = 'a' WHERE c = '3';
+		UPDATE public.test SET c = 'a' WHERE c = '6';
+		UPDATE public.test SET c = 'a' WHERE c = '7';
+		UPDATE public.test SET c = 'a' WHERE c = '8';
+		UPDATE public.test SET c = 'a' WHERE c = '9';
+		UPDATE public.test SET c = 'a' WHERE c = '10';
+		UPDATE public.test SET c = 'a' WHERE c = '11';
+	));
+
+# Need one aborted transaction to test corruption in HOT chain.
+$node->safe_psql(
+	'postgres', qq(
+		BEGIN;
+			UPDATE public.test SET c = 'a' WHERE c = '5';
+		ABORT;
+	));
+
+# Need one in-progress transaction to test few corruption in HOT chain.
+# We are creating PREPARE TRANSACTION here as these will not be aborted
+# even if we stop the node.
+$node->safe_psql(
+	'postgres', qq(
+		BEGIN;
+			PREPARE TRANSACTION 'in_progress_tx';
+	));
+my $in_progress_xid = $node->safe_psql(
+				'postgres', qq(
+					SELECT transaction FROM pg_prepared_xacts;
+				));
+
+
 
 my $relfrozenxid = $node->safe_psql('postgres',
 	q(select relfrozenxid from pg_class where relname = 'test'));
@@ -249,12 +302,21 @@ if ($datfrozenxid <= 3 || $datfrozenxid >= $relfrozenxid)
 my @lp_off;
 for my $tup (0 .. ROWCOUNT - 1)
 {
-	push(
-		@lp_off,
-		$node->safe_psql(
-			'postgres', qq(
-select lp_off from heap_page_items(get_raw_page('test', 'main', 0))
-	offset $tup limit 1)));
+	my $islpredirected = $node->safe_psql('postgres',
+		qq(select lp_flags from heap_page_items(get_raw_page('test', 'main', 0)) offset $tup limit 1));
+	if ($islpredirected != 2)
+	{
+		push(
+			@lp_off,
+			$node->safe_psql(
+				'postgres', qq(
+			select lp_off from heap_page_items(get_raw_page('test', 'main', 0))
+				offset $tup limit 1)));
+	}
+	else
+	{
+		push(@lp_off, (-1));
+	}
 }
 
 # Sanity check that our 'test' table on disk layout matches expectations.  If
@@ -271,6 +333,10 @@ for (my $tupidx = 0; $tupidx < ROWCOUNT; $tupidx++)
 {
 	my $offnum = $tupidx + 1;        # offnum is 1-based, not zero-based
 	my $offset = $lp_off[$tupidx];
+	if ($offset == -1)
+	{
+		next;
+	}
 	my $tup = read_tuple($file, $offset);
 
 	# Sanity-check that the data appears on the page where we expect.
@@ -283,7 +349,7 @@ for (my $tupidx = 0; $tupidx < ROWCOUNT; $tupidx++)
 		$node->clean_node;
 		plan skip_all =>
 		  sprintf(
-			"Page layout differs from our expectations: expected (%x, %x, \"%s\"), got (%x, %x, \"%s\")",
+			"Page layout of index %d differs from our expectations: expected (%x, %x, \"%s\"), got (%x, %x, \"%s\")", $tupidx,
 			0xDEADF9F9, 0xDEADF9F9, "abcdefg", $a_1, $a_2, $b);
 		exit;
 	}
@@ -318,6 +384,9 @@ use constant HEAP_XMAX_INVALID   => 0x0800;
 use constant HEAP_NATTS_MASK     => 0x07FF;
 use constant HEAP_XMAX_IS_MULTI  => 0x1000;
 use constant HEAP_KEYS_UPDATED   => 0x2000;
+use constant HEAP_HOT_UPDATED    => 0x4000;
+use constant HEAP_ONLY_TUPLE     => 0x8000;
+use constant HEAP_UPDATED        => 0x2000;
 
 # Helper function to generate a regular expression matching the header we
 # expect verify_heapam() to return given which fields we expect to be non-null.
@@ -349,9 +418,49 @@ for (my $tupidx = 0; $tupidx < ROWCOUNT; $tupidx++)
 {
 	my $offnum = $tupidx + 1;        # offnum is 1-based, not zero-based
 	my $offset = $lp_off[$tupidx];
+	my $header = header(0, $offnum, undef);
+	# offset -1 means its redirected lp.
+	if ($offset == -1)
+	{	# at offnum 19 we will unset HEAP_ONLY_TUPLE and HEAP_UPDATED flags.
+		if ($offnum == 17)
+		{
+			push @expected,
+			  qr/${header}redirected line pointer points to a non-heap-only tuple at offset \d+/;
+			push @expected,
+			  qr/${header}redirected line pointer points to a non-heap-updated tuple at offset \d+/;
+		}
+		elsif ($offnum == 18)
+		{
+			# we re-set lp offset to 17, we need to rewrite the 4 bytes values so that line pointer will be
+			# lp.off = 17, lp_flags = 2, lp_len = 0.
+			if ($ENDIANNESS eq 'little')
+			{
+				sysseek($file, 92, 0)
+				  or BAIL_OUT("sysseek failed: $!");
+				syswrite(
+					$file,
+					pack("L",
+						0x00010011)
+				) or BAIL_OUT("syswrite failed: $!");
+			}
+			else
+			{
+				sysseek($file, 92, 0)
+				  or BAIL_OUT("sysseek failed: $!");
+				syswrite(
+					$file,
+					pack("L",
+						0x11000100)
+				) or BAIL_OUT("syswrite failed: $!");
+
+			}
+			push @expected,
+			  qr/${header}redirected line pointer points to another redirected line pointer at offset \d+/;
+		}
+		next;
+	}
 	my $tup = read_tuple($file, $offset);
 
-	my $header = header(0, $offnum, undef);
 	if ($offnum == 1)
 	{
 		# Corruptly set xmin < relfrozenxid
@@ -502,7 +611,7 @@ for (my $tupidx = 0; $tupidx < ROWCOUNT; $tupidx++)
 		push @expected,
 		  qr/${header}multitransaction ID 4 equals or exceeds next valid multitransaction ID 1/;
 	}
-	elsif ($offnum == 15)    # Last offnum must equal ROWCOUNT
+	elsif ($offnum == 15)
 	{
 		# Set both HEAP_XMAX_COMMITTED and HEAP_XMAX_IS_MULTI
 		$tup->{t_infomask} |= HEAP_XMAX_COMMITTED;
@@ -512,6 +621,112 @@ for (my $tupidx = 0; $tupidx < ROWCOUNT; $tupidx++)
 		push @expected,
 		  qr/${header}multitransaction ID 4000000000 precedes relation minimum multitransaction ID threshold 1/;
 	}
+	# Test for redirected line pointer.
+	# offnum 17 and 18 are redirected line pointer, so don't need any tuple
+	# validation.
+	elsif ($offnum == 19)
+	{
+		# unset HEAP_ONLY_TUPLE and HEAP_UPDATED flag.
+		$tup->{t_infomask2} &= ~HEAP_ONLY_TUPLE;
+		$tup->{t_infomask} &= ~HEAP_UPDATED;
+	}
+	# offnum 18 is redirected lp and is redirected to offset 20,
+	# We have corrupted it to route its lp.off to point it to line pointer at
+	# offset 17.
+
+	# Test related to HOT chains.
+	elsif ($offnum == 21)
+	{
+		# Unset HEAP_HOT_UPDATED.
+		$tup->{t_infomask2} &= ~HEAP_HOT_UPDATED;
+		$pred_xmax = $tup->{t_xmax}; # to be used for tuple at offnum 22.
+		$pred_posid = $tup->{ip_posid}; # to be used for tuple at offnum 22.
+		push @expected,
+		  qr/${header}non-heap-only update produced a heap-only tuple at offset \d+/;
+	}
+	elsif ($offnum == 22)
+	{
+		# Set ip_posid and t_xmax from ip_posid and t_xmax of tuple at offnum 21.
+		$tup->{t_xmax} = $pred_xmax;
+		$tup->{ip_posid} = $pred_posid;
+		push @expected,
+		  qr/${header}updated version at offset \d+ is also the updated version of tuple at offset \d+/;
+	}
+	elsif ($offnum == 23)
+	{
+		# Get aborted xid, that is needed to test corruption at offnum 24.
+		$aborted_xid = $tup->{t_xmax};
+	}
+	elsif ($offnum == 24)
+	{
+		# Set xmin to aborted xid.
+		$tup->{t_xmin} = $aborted_xid;
+		$tup->{t_infomask} &= ~HEAP_XMIN_COMMITTED;
+		push @expected,
+		  qr/${header}tuple with aborted xmin \d+ was updated to produce a tuple at offset \d+ with committed xmin \d+/;
+	}
+	elsif ($offnum == 25)
+	{
+		# Raised corruption as root of HOT chain can't be HEAP_ONLY_TUPLE.
+		# set HEAP_ONLY_TUPLE.
+		$tup->{t_infomask2} |= HEAP_ONLY_TUPLE;
+		push @expected,
+		  qr/${header}tuple is root of chain but is marked as heap-only tuple/;
+	}
+	elsif ($offnum == 26)
+	{
+		# Next updated Tuple at offnum 33 is corrupted.
+		push @expected,
+		  qr/${header}heap-only update produced a non-heap only tuple at offset \d+/;
+	}
+	elsif ($offnum == 27)
+	{
+		# set xmax to invalid transaction id.
+		$tup->{t_xmax} = 0;
+		push @expected,
+		  qr/${header}tuple has been HOT updated, but xmax is 0/;
+	}
+	elsif ($offnum == 28)
+	{
+		# set xmax to invalid transaction id.
+		$tup->{t_xmin} = $in_progress_xid;
+		$tup->{t_infomask} &= ~HEAP_XMIN_COMMITTED;
+		push @expected,
+		  qr/${header}tuple with in-progress xmin \d+ was updated to produce a tuple at offset \d+ with committed xmin \d+/;
+	}
+	elsif ($offnum == 29)
+	{
+		# set xmax to invalid transaction id.
+		$tup->{t_xmin} = $aborted_xid;
+		$tup->{t_xmax} = $in_progress_xid;
+		$tup->{t_infomask} &= ~HEAP_XMIN_COMMITTED;
+		push @expected,
+		  qr/${header}tuple with aborted xmin \d+ was updated to produce a tuple at offset \d+ with in-progress xmin \d+/;
+	}
+	# Tuple at offnum 30 is an update of tuple at offnum 21.
+
+	# Tuple at offnum 31 is an update of tuple at 24.
+
+	# Tuple at offnum 32 is an update of tuple at 25.
+
+	# Tuple at offnum 33 is an update of tuple at offnum 26.
+	elsif($offnum == 33)
+	{
+		# Unset HEAP_ONLY_TUPLE, corrupton will be raised for tuple at offnum #26
+		$tup->{t_infomask2} &= ~HEAP_ONLY_TUPLE;
+	}
+	# Tuple at offnum 34 is an update of corrupted tuple at offnum 27.
+	# Tuple at offnum 35 is an update of corrupted tuple at offnum 28.
+	# Tuple at offnum 36 is an update of tuple at offnum 29.
+	elsif ($offnum == 36)
+	{
+		# set xmax to invalid transaction id.
+		$tup->{t_xmin} = $in_progress_xid;
+		$tup->{t_infomask} &= ~HEAP_XMIN_COMMITTED;
+	}
+	# Tuple at offnum 37 is an update of tuple at offnum 22.
+	# offset 37 is an updated tuple of tuple at offset #23 and was updated by an aborted transaction.
+	# this is needed to have aborted transaction xid to test corruption related to aborted transaction at offset #24.
 	write_tuple($file, $offset, $tup);
 }
 close($file)
@@ -523,6 +738,10 @@ $node->start;
 $node->command_checks_all(
 	[ 'pg_amcheck', '--no-dependent-indexes', '-p', $port, 'postgres' ],
 	2, [@expected], [], 'Expected corruption message output');
+$node->safe_psql(
+        'postgres', qq(
+                        COMMIT PREPARED 'in_progress_tx';
+        ));
 
 $node->teardown_node;
 $node->clean_node;
-- 
2.25.1

Reply via email to