On Tue, Jan 31, 2023 at 7:20 PM Robert Haas <robertmh...@gmail.com> wrote:

> On Mon, Jan 30, 2023 at 8:24 AM Himanshu Upadhyaya
> <upadhyaya.himan...@gmail.com> wrote:
> > Before this we stop the node by "$node->stop;" and then only we progress to
> > manual corruption. This will abort all running/in-progress transactions.
> > So, if we create an in-progress transaction and comment "$node->stop;"
> > then somehow all the code that we have for manual corruption does not work.
> >
> > I think it is required to stop the server and then only proceed for manual corruption?
> > If this is the case then please suggest if there is a way to get an in-progress transaction
> > that we can use for manual corruption.
>
> How about using a prepared transaction?
>
Thanks, yes, it's working fine with a prepared transaction.
Please find attached the v9 patch incorporating all the review comments.


-- 
Regards,
Himanshu Upadhyaya
EnterpriseDB: http://www.enterprisedb.com
From d241937841c5990ff4df00d1abc6bfbb3491ac3e Mon Sep 17 00:00:00 2001
From: Himanshu Upadhyaya <himanshu.upadhyaya@enterprisedb.com>
Date: Sun, 5 Feb 2023 12:50:21 +0530
Subject: [PATCH v9] Implement HOT chain validation in verify_heapam()

Himanshu Upadhyaya, reviewed by Robert Haas, Aleksander Alekseev, Andres Freund
---
 contrib/amcheck/verify_heapam.c           | 297 ++++++++++++++++++++++
 src/bin/pg_amcheck/t/004_verify_heapam.pl | 241 +++++++++++++++++-
 2 files changed, 527 insertions(+), 11 deletions(-)

diff --git a/contrib/amcheck/verify_heapam.c b/contrib/amcheck/verify_heapam.c
index 4fcfd6df72..01c639b5e1 100644
--- a/contrib/amcheck/verify_heapam.c
+++ b/contrib/amcheck/verify_heapam.c
@@ -399,9 +399,14 @@ verify_heapam(PG_FUNCTION_ARGS)
 	for (ctx.blkno = first_block; ctx.blkno <= last_block; ctx.blkno++)
 	{
 		OffsetNumber maxoff;
+		OffsetNumber predecessor[MaxOffsetNumber];
+		OffsetNumber successor[MaxOffsetNumber];
+		bool		lp_valid[MaxOffsetNumber];
 
 		CHECK_FOR_INTERRUPTS();
 
+		memset(predecessor, 0, sizeof(OffsetNumber) * MaxOffsetNumber);
+
 		/* Optionally skip over all-frozen or all-visible blocks */
 		if (skip_option != SKIP_PAGES_NONE)
 		{
@@ -433,6 +438,10 @@ verify_heapam(PG_FUNCTION_ARGS)
 		for (ctx.offnum = FirstOffsetNumber; ctx.offnum <= maxoff;
 			 ctx.offnum = OffsetNumberNext(ctx.offnum))
 		{
+			OffsetNumber nextoffnum;
+
+			successor[ctx.offnum] = InvalidOffsetNumber;
+			lp_valid[ctx.offnum] = false;
 			ctx.itemid = PageGetItemId(ctx.page, ctx.offnum);
 
 			/* Skip over unused/dead line pointers */
@@ -469,6 +478,13 @@ verify_heapam(PG_FUNCTION_ARGS)
 					report_corruption(&ctx,
 									  psprintf("line pointer redirection to unused item at offset %u",
 											   (unsigned) rdoffnum));
+
+				/*
+				 * Make entry in successor array, redirected lp will be
+				 * validated at the time when we loop over successor array.
+				 */
+				successor[ctx.offnum] = rdoffnum;
+				lp_valid[ctx.offnum] = true;
 				continue;
 			}
 
@@ -504,9 +520,283 @@ verify_heapam(PG_FUNCTION_ARGS)
 			/* It should be safe to examine the tuple's header, at least */
 			ctx.tuphdr = (HeapTupleHeader) PageGetItem(ctx.page, ctx.itemid);
 			ctx.natts = HeapTupleHeaderGetNatts(ctx.tuphdr);
+			lp_valid[ctx.offnum] = true;
 
 			/* Ok, ready to check this next tuple */
 			check_tuple(&ctx);
+
+			/*
+			 * Add the data to the successor array if next updated tuple is in
+			 * the same page. It will be used later to generate the
+			 * predecessor array.
+			 *
+			 * We need to access the tuple's header to populate the
+			 * predecessor array. However, the tuple is not necessarily sanity
+			 * checked yet, so we delay construction of the predecessor array
+			 * until all tuples are sanity checked.
+			 */
+			nextoffnum = ItemPointerGetOffsetNumber(&(ctx.tuphdr)->t_ctid);
+			if (ItemPointerGetBlockNumber(&(ctx.tuphdr)->t_ctid) == ctx.blkno &&
+				nextoffnum != ctx.offnum)
+			{
+				successor[ctx.offnum] = nextoffnum;
+			}
+		}
+
+		/*
+		 * Loop over offset and populate predecessor array from all entries
+		 * that are present in successor array.
+		 */
+		ctx.attnum = -1;
+		for (ctx.offnum = FirstOffsetNumber; ctx.offnum <= maxoff;
+			 ctx.offnum = OffsetNumberNext(ctx.offnum))
+		{
+			ItemId		curr_lp;
+			ItemId		next_lp;
+			HeapTupleHeader curr_htup;
+			HeapTupleHeader next_htup;
+			TransactionId curr_xmax;
+			TransactionId next_xmin;
+
+			OffsetNumber nextoffnum = successor[ctx.offnum];
+
+			if (nextoffnum == InvalidOffsetNumber)
+			{
+				/*
+				 * This is either the last updated tuple in the chain or a
+				 * corrupted Tuple/lp or unused/dead line pointer.
+				 */
+				continue;
+			}
+
+			/*
+			 * Add data to the predecessor array even if the current or
+			 * successor's LP is not valid. We will not process/validate these
+			 * offset entries while looping over the predecessor array but
+			 * having all entries in the predecessor array will help in
+			 * identifying (and validating) the root of a chain.
+			 */
+			if (!lp_valid[ctx.offnum] || !lp_valid[nextoffnum])
+			{
+				predecessor[nextoffnum] = ctx.offnum;
+				continue;
+			}
+
+			curr_lp = PageGetItemId(ctx.page, ctx.offnum);
+			if (ItemIdIsRedirected(curr_lp))
+			{
+				next_lp = PageGetItemId(ctx.page, nextoffnum);
+				if (ItemIdIsRedirected(next_lp))
+				{
+					report_corruption(&ctx,
+									  psprintf("redirected line pointer points to another redirected line pointer at offset %u",
+											   (unsigned) nextoffnum));
+					continue;
+				}
+				next_htup = (HeapTupleHeader) PageGetItem(ctx.page, next_lp);
+				if (!HeapTupleHeaderIsHeapOnly(next_htup))
+				{
+					report_corruption(&ctx,
+									  psprintf("redirected line pointer points to a non-heap-only tuple at offset %u",
+											   (unsigned) nextoffnum));
+				}
+				if ((next_htup->t_infomask & HEAP_UPDATED) == 0)
+				{
+					report_corruption(&ctx,
+									  psprintf("redirected line pointer points to a non-heap-updated tuple at offset %u",
+											   (unsigned) nextoffnum));
+				}
+
+				/*
+				 * Add data related to redirected offset to predecessor array
+				 * so that we can differentiate between all the cases of
+				 * missing offset in predecessor array, this will help in
+				 * validating the root of chain when we loop over predecessor
+				 * array.
+				 */
+				predecessor[nextoffnum] = ctx.offnum;
+				continue;
+			}
+
+			/*
+			 * Add a line pointer offset to the predecessor array if xmax is
+			 * matching with xmin of next tuple (reaching via its t_ctid).
+			 * Raise corruption if we have two tuples having the same
+			 * predecessor.
+			 *
+			 * We add the offset to the predecessor array irrespective of the
+			 * transaction (t_xmin) status. We will do validation related to
+			 * the transaction status (and also all other validations) when we
+			 * loop over the predecessor array.
+			 */
+			curr_htup = (HeapTupleHeader) PageGetItem(ctx.page, curr_lp);
+			curr_xmax = HeapTupleHeaderGetUpdateXid(curr_htup);
+
+			next_lp = PageGetItemId(ctx.page, nextoffnum);
+			next_htup = (HeapTupleHeader) PageGetItem(ctx.page, next_lp);
+			next_xmin = HeapTupleHeaderGetXmin(next_htup);
+			if (TransactionIdIsValid(curr_xmax) &&
+				TransactionIdEquals(curr_xmax, next_xmin))
+			{
+				if (predecessor[nextoffnum] != InvalidOffsetNumber)
+				{
+					report_corruption(&ctx,
+									  psprintf("updated version at offset %u is also the updated version of tuple at offset %u",
+											   (unsigned) nextoffnum, (unsigned) predecessor[nextoffnum]));
+					continue;
+				}
+			}
+			/* Non matching xmax with xmin is not a corruption */
+			predecessor[nextoffnum] = ctx.offnum;
+
+		}
+
+		/* Loop over offsets and validate the data in the predecessor array. */
+		for (OffsetNumber currentoffnum = FirstOffsetNumber; currentoffnum <= maxoff;
+			 currentoffnum = OffsetNumberNext(currentoffnum))
+		{
+			HeapTupleHeader pred_htup;
+			HeapTupleHeader curr_htup;
+			TransactionId pred_xmin;
+			TransactionId curr_xmin;
+			TransactionId pred_xmax;
+			ItemId		pred_lp;
+			ItemId		curr_lp;
+			bool		pred_in_progress;
+			XidBoundsViolation xid_status;
+
+			ctx.offnum = predecessor[currentoffnum];
+			ctx.attnum = -1;
+
+			/* Don't process if predecessor or current offset is not valid */
+			if (!lp_valid[currentoffnum] ||
+				(ctx.offnum != InvalidOffsetNumber && !lp_valid[ctx.offnum]))
+				continue;
+
+			curr_lp = PageGetItemId(ctx.page, currentoffnum);
+			if (ItemIdIsRedirected(curr_lp))
+				continue;
+
+			curr_htup = (HeapTupleHeader) PageGetItem(ctx.page, curr_lp);
+			curr_xmin = HeapTupleHeaderGetXmin(curr_htup);
+			xid_status = get_xid_status(curr_xmin, &ctx, NULL);
+			if (!(xid_status == XID_BOUNDS_OK || xid_status == XID_INVALID))
+				continue;
+
+			if (ctx.offnum == InvalidOffsetNumber)
+			{
+				/*
+				 * No harm in overriding value of ctx.offnum as we will always
+				 * continue if we are here.
+				 */
+				ctx.offnum = currentoffnum;
+				if (TransactionIdIsInProgress(curr_xmin) ||
+					TransactionIdDidCommit(curr_xmin))
+				{
+					/* Root of chain must not be HEAP_ONLY_TUPLE. */
+					if (HeapTupleHeaderIsHeapOnly(curr_htup))
+					{
+						report_corruption(&ctx,
+										  psprintf("tuple is root of chain but is marked as heap-only tuple"));
+					}
+					continue;
+				}
+				else
+				{
+					/*
+					 * xmin aborted tuple from an abandoned portion of the
+					 * chain.
+					 */
+					continue;
+				}
+			}
+
+			ctx.itemid = pred_lp = PageGetItemId(ctx.page, ctx.offnum);
+
+			/*
+			 * Redirected LP were validated previously, so don't need any
+			 * validation.
+			 */
+			if (ItemIdIsRedirected(pred_lp))
+				continue;
+
+			pred_htup = (HeapTupleHeader) PageGetItem(ctx.page, pred_lp);
+			pred_xmin = HeapTupleHeaderGetXmin(pred_htup);
+			xid_status = get_xid_status(pred_xmin, &ctx, NULL);
+			if (!(xid_status == XID_BOUNDS_OK || xid_status == XID_INVALID))
+				continue;
+
+			pred_xmax = HeapTupleHeaderGetUpdateXid(pred_htup);
+
+			/*
+			 * No need to validate if XMIN and XMAX are not matching
+			 * because they can be from different chains in case of a
+			 * mismatch.
+			 */
+			if (!(TransactionIdIsValid(pred_xmax) &&
+				  TransactionIdEquals(pred_xmax, curr_xmin)))
+			{
+				continue;
+			}
+
+			/*
+			 * If the predecessor's xmin is in progress then current tuple's
+			 * xmin should either be aborted (in case of subtransaction) or
+			 * in-progress.
+			 */
+			pred_in_progress = TransactionIdIsInProgress(pred_xmin);
+			if (pred_in_progress && TransactionIdDidCommit(curr_xmin))
+			{
+				/* Re-check to avoid race condition */
+				if (TransactionIdIsInProgress(pred_xmin))
+				{
+					report_corruption(&ctx,
+									  psprintf("tuple with in-progress xmin %u was updated to produce a tuple at offset %u with committed xmin %u",
+											   (unsigned) pred_xmin, (unsigned) currentoffnum, (unsigned) curr_xmin));
+				}
+				else
+				{
+					pred_in_progress = false;
+				}
+			}
+
+			/* raise corruption if pred_xmin is aborted but curr_xmin is not */
+			if (!pred_in_progress && !TransactionIdDidCommit(pred_xmin))
+			{
+				if (TransactionIdIsInProgress(curr_xmin))
+					report_corruption(&ctx,
+									  psprintf("tuple with aborted xmin %u was updated to produce a tuple at offset %u with in-progress xmin %u",
+											   (unsigned) pred_xmin, (unsigned) currentoffnum, (unsigned) curr_xmin));
+				else if (TransactionIdDidCommit(curr_xmin))
+					report_corruption(&ctx,
+									  psprintf("tuple with aborted xmin %u was updated to produce a tuple at offset %u with committed xmin %u",
+											   (unsigned) pred_xmin, (unsigned) currentoffnum, (unsigned) curr_xmin));
+
+			}
+
+			/*
+			 * If the current tuple is HOT then its predecessor's tuple must
+			 * be HEAP_HOT_UPDATED.
+			 */
+			if (!HeapTupleHeaderIsHotUpdated(pred_htup) &&
+				HeapTupleHeaderIsHeapOnly(curr_htup))
+			{
+				report_corruption(&ctx,
+								  psprintf("non-heap-only update produced a heap-only tuple at offset %u",
+										   (unsigned) currentoffnum));
+			}
+
+			/*
+			 * If the current tuple is not HOT then its predecessor's tuple
+			 * must not be HEAP_HOT_UPDATED.
+			 */
+			if (HeapTupleHeaderIsHotUpdated(pred_htup) &&
+				!HeapTupleHeaderIsHeapOnly(curr_htup))
+			{
+				report_corruption(&ctx,
+								  psprintf("heap-only update produced a non-heap only tuple at offset %u",
+										   (unsigned) currentoffnum));
+			}
 		}
 
 		/* clean up */
@@ -638,6 +928,7 @@ check_tuple_header(HeapCheckContext *ctx)
 {
 	HeapTupleHeader tuphdr = ctx->tuphdr;
 	uint16		infomask = tuphdr->t_infomask;
+	TransactionId curr_xmax = HeapTupleHeaderGetUpdateXid(tuphdr);
 	bool		result = true;
 	unsigned	expected_hoff;
 
@@ -649,6 +940,12 @@ check_tuple_header(HeapCheckContext *ctx)
 		result = false;
 	}
 
+	if (!TransactionIdIsValid(curr_xmax) && HeapTupleHeaderIsHotUpdated(tuphdr))
+	{
+		report_corruption(ctx,
+						  psprintf("tuple has been HOT updated, but xmax is 0"));
+		result = false;
+	}
 	if ((ctx->tuphdr->t_infomask & HEAP_XMAX_COMMITTED) &&
 		(ctx->tuphdr->t_infomask & HEAP_XMAX_IS_MULTI))
 	{
diff --git a/src/bin/pg_amcheck/t/004_verify_heapam.pl b/src/bin/pg_amcheck/t/004_verify_heapam.pl
index 215c30eaa8..f5b4092343 100644
--- a/src/bin/pg_amcheck/t/004_verify_heapam.pl
+++ b/src/bin/pg_amcheck/t/004_verify_heapam.pl
@@ -174,12 +174,16 @@ sub write_tuple
 # Set umask so test directories and files are created with default permissions
 umask(0077);
 
+my $pred_xmax;
+my $pred_posid;
+my $aborted_xid;
 # Set up the node.  Once we create and corrupt the table,
 # autovacuum workers visiting the table could crash the backend.
 # Disable autovacuum so that won't happen.
 my $node = PostgreSQL::Test::Cluster->new('test');
 $node->init;
 $node->append_conf('postgresql.conf', 'autovacuum=off');
+$node->append_conf('postgresql.conf','max_prepared_transactions=100');
 
 # Start the node and load the extensions.  We depend on both
 # amcheck and pageinspect for this test.
@@ -217,7 +221,9 @@ my $rel = $node->safe_psql('postgres',
 my $relpath = "$pgdata/$rel";
 
 # Insert data and freeze public.test
-use constant ROWCOUNT => 16;
+use constant ROWCOUNT => 37 ; # Total row count in this page.
+use constant ROWCOUNT_HOTCHAIN => 21; # Row count related to test of HOT chains validations and redirected LP.
+# First insert data needed for non-HOT chain validation.
 $node->safe_psql(
 	'postgres', qq(
 	INSERT INTO public.test (a, b, c)
@@ -227,7 +233,54 @@ $node->safe_psql(
 			repeat('w', 10000)
 		);
 	VACUUM FREEZE public.test
-	)) for (1 .. ROWCOUNT);
+	)) for (1 .. ROWCOUNT-ROWCOUNT_HOTCHAIN);
+
+# Data for Redirected LP.
+$node->safe_psql(
+	'postgres', qq(
+		INSERT INTO public.test (a, b, c)
+			VALUES ( x'DEADF9F9DEADF9F9'::bigint, 'abcdefg', generate_series(1,2));
+		UPDATE public.test SET c = 'a' WHERE c = '1';
+		UPDATE public.test SET c = 'a' WHERE c = '2';
+		VACUUM FREEZE public.test;
+	));
+
+# Data for HOT chains validation, so not calling VACUUM FREEZE.
+$node->safe_psql(
+	'postgres', qq(
+		INSERT INTO public.test (a, b, c)
+			VALUES ( x'DEADF9F9DEADF9F9'::bigint, 'abcdefg', generate_series(3,11));
+		UPDATE public.test SET c = 'a' WHERE c = '3';
+		UPDATE public.test SET c = 'a' WHERE c = '6';
+		UPDATE public.test SET c = 'a' WHERE c = '7';
+		UPDATE public.test SET c = 'a' WHERE c = '8';
+		UPDATE public.test SET c = 'a' WHERE c = '9';
+		UPDATE public.test SET c = 'a' WHERE c = '10';
+		UPDATE public.test SET c = 'a' WHERE c = '11';
+	));
+
+# Need one aborted transaction to test corruption in HOT chain.
+$node->safe_psql(
+	'postgres', qq(
+		BEGIN;
+			UPDATE public.test SET c = 'a' WHERE c = '5';
+		ABORT;
+	));
+
+# Need one in-progress transaction to test a few corruptions in HOT chains.
+# We use a prepared transaction here because it will not be aborted
+# even if we stop the node.
+$node->safe_psql(
+	'postgres', qq(
+		BEGIN;
+			PREPARE TRANSACTION 'in_progress_tx';
+	));
+my $in_progress_xid = $node->safe_psql(
+				'postgres', qq(
+					SELECT transaction FROM pg_prepared_xacts;
+				));
+
+
 
 my $relfrozenxid = $node->safe_psql('postgres',
 	q(select relfrozenxid from pg_class where relname = 'test'));
@@ -249,12 +302,21 @@ if ($datfrozenxid <= 3 || $datfrozenxid >= $relfrozenxid)
 my @lp_off;
 for my $tup (0 .. ROWCOUNT - 1)
 {
-	push(
-		@lp_off,
-		$node->safe_psql(
-			'postgres', qq(
-select lp_off from heap_page_items(get_raw_page('test', 'main', 0))
-	offset $tup limit 1)));
+	my $islpredirected = $node->safe_psql('postgres',
+		qq(select lp_flags from heap_page_items(get_raw_page('test', 'main', 0)) offset $tup limit 1));
+	if ($islpredirected != 2)
+	{
+		push(
+			@lp_off,
+			$node->safe_psql(
+				'postgres', qq(
+			select lp_off from heap_page_items(get_raw_page('test', 'main', 0))
+				offset $tup limit 1)));
+	}
+	else
+	{
+		push(@lp_off, (-1));
+	}
 }
 
 # Sanity check that our 'test' table on disk layout matches expectations.  If
@@ -271,6 +333,10 @@ for (my $tupidx = 0; $tupidx < ROWCOUNT; $tupidx++)
 {
 	my $offnum = $tupidx + 1;        # offnum is 1-based, not zero-based
 	my $offset = $lp_off[$tupidx];
+	if ($offset == -1)
+	{
+		next;
+	}
 	my $tup = read_tuple($file, $offset);
 
 	# Sanity-check that the data appears on the page where we expect.
@@ -283,7 +349,7 @@ for (my $tupidx = 0; $tupidx < ROWCOUNT; $tupidx++)
 		$node->clean_node;
 		plan skip_all =>
 		  sprintf(
-			"Page layout differs from our expectations: expected (%x, %x, \"%s\"), got (%x, %x, \"%s\")",
+			"Page layout of index %d differs from our expectations: expected (%x, %x, \"%s\"), got (%x, %x, \"%s\")", $tupidx,
 			0xDEADF9F9, 0xDEADF9F9, "abcdefg", $a_1, $a_2, $b);
 		exit;
 	}
@@ -318,6 +384,9 @@ use constant HEAP_XMAX_INVALID   => 0x0800;
 use constant HEAP_NATTS_MASK     => 0x07FF;
 use constant HEAP_XMAX_IS_MULTI  => 0x1000;
 use constant HEAP_KEYS_UPDATED   => 0x2000;
+use constant HEAP_HOT_UPDATED    => 0x4000;
+use constant HEAP_ONLY_TUPLE     => 0x8000;
+use constant HEAP_UPDATED        => 0x2000;
 
 # Helper function to generate a regular expression matching the header we
 # expect verify_heapam() to return given which fields we expect to be non-null.
@@ -349,9 +418,49 @@ for (my $tupidx = 0; $tupidx < ROWCOUNT; $tupidx++)
 {
 	my $offnum = $tupidx + 1;        # offnum is 1-based, not zero-based
 	my $offset = $lp_off[$tupidx];
+	my $header = header(0, $offnum, undef);
+	# An offset of -1 means this is a redirected line pointer.
+	if ($offset == -1)
+	{	# at offnum 19 we will unset HEAP_ONLY_TUPLE and HEAP_UPDATED flags.
+		if ($offnum == 17)
+		{
+			push @expected,
+			  qr/${header}redirected line pointer points to a non-heap-only tuple at offset \d+/;
+			push @expected,
+			  qr/${header}redirected line pointer points to a non-heap-updated tuple at offset \d+/;
+		}
+		elsif ($offnum == 18)
+		{
+			# we re-set lp offset to 17, we need to rewrite the 4 bytes values so that line pointer will be
+			# lp.off = 17, lp_flags = 2, lp_len = 0.
+			if ($ENDIANNESS eq 'little')
+			{
+				sysseek($file, 92, 0)
+				  or BAIL_OUT("sysseek failed: $!");
+				syswrite(
+					$file,
+					pack("L",
+						0x00010011)
+				) or BAIL_OUT("syswrite failed: $!");
+			}
+			else
+			{
+				sysseek($file, 92, 0)
+				  or BAIL_OUT("sysseek failed: $!");
+				syswrite(
+					$file,
+					pack("L",
+						0x11000100)
+				) or BAIL_OUT("syswrite failed: $!");
+
+			}
+			push @expected,
+			  qr/${header}redirected line pointer points to another redirected line pointer at offset \d+/;
+		}
+		next;
+	}
 	my $tup = read_tuple($file, $offset);
 
-	my $header = header(0, $offnum, undef);
 	if ($offnum == 1)
 	{
 		# Corruptly set xmin < relfrozenxid
@@ -502,7 +611,7 @@ for (my $tupidx = 0; $tupidx < ROWCOUNT; $tupidx++)
 		push @expected,
 		  qr/${header}multitransaction ID 4 equals or exceeds next valid multitransaction ID 1/;
 	}
-	elsif ($offnum == 15)    # Last offnum must equal ROWCOUNT
+	elsif ($offnum == 15)
 	{
 		# Set both HEAP_XMAX_COMMITTED and HEAP_XMAX_IS_MULTI
 		$tup->{t_infomask} |= HEAP_XMAX_COMMITTED;
@@ -512,6 +621,112 @@ for (my $tupidx = 0; $tupidx < ROWCOUNT; $tupidx++)
 		push @expected,
 		  qr/${header}multitransaction ID 4000000000 precedes relation minimum multitransaction ID threshold 1/;
 	}
+	# Test for redirected line pointer.
+	# offnum 17 and 18 are redirected line pointer, so don't need any tuple
+	# validation.
+	elsif ($offnum == 19)
+	{
+		# unset HEAP_ONLY_TUPLE and HEAP_UPDATED flag.
+		$tup->{t_infomask2} &= ~HEAP_ONLY_TUPLE;
+		$tup->{t_infomask} &= ~HEAP_UPDATED;
+	}
+	# offnum 18 is redirected lp and is redirected to offset 20,
+	# We have corrupted it to route its lp.off to point it to line pointer at
+	# offset 17.
+
+	# Test related to HOT chains.
+	elsif ($offnum == 21)
+	{
+		# Unset HEAP_HOT_UPDATED.
+		$tup->{t_infomask2} &= ~HEAP_HOT_UPDATED;
+		$pred_xmax = $tup->{t_xmax}; # to be used for tuple at offnum 22.
+		$pred_posid = $tup->{ip_posid}; # to be used for tuple at offnum 22.
+		push @expected,
+		  qr/${header}non-heap-only update produced a heap-only tuple at offset \d+/;
+	}
+	elsif ($offnum == 22)
+	{
+		# Set ip_posid and t_xmax from ip_posid and t_xmax of tuple at offnum 21.
+		$tup->{t_xmax} = $pred_xmax;
+		$tup->{ip_posid} = $pred_posid;
+		push @expected,
+		  qr/${header}updated version at offset \d+ is also the updated version of tuple at offset \d+/;
+	}
+	elsif ($offnum == 23)
+	{
+		# Get aborted xid, that is needed to test corruption at offnum 24.
+		$aborted_xid = $tup->{t_xmax};
+	}
+	elsif ($offnum == 24)
+	{
+		# Set xmin to aborted xid.
+		$tup->{t_xmin} = $aborted_xid;
+		$tup->{t_infomask} &= ~HEAP_XMIN_COMMITTED;
+		push @expected,
+		  qr/${header}tuple with aborted xmin \d+ was updated to produce a tuple at offset \d+ with committed xmin \d+/;
+	}
+	elsif ($offnum == 25)
+	{
+		# Raised corruption as root of HOT chain can't be HEAP_ONLY_TUPLE.
+		# set HEAP_ONLY_TUPLE.
+		$tup->{t_infomask2} |= HEAP_ONLY_TUPLE;
+		push @expected,
+		  qr/${header}tuple is root of chain but is marked as heap-only tuple/;
+	}
+	elsif ($offnum == 26)
+	{
+		# Next updated Tuple at offnum 33 is corrupted.
+		push @expected,
+		  qr/${header}heap-only update produced a non-heap only tuple at offset \d+/;
+	}
+	elsif ($offnum == 27)
+	{
+		# set xmax to invalid transaction id.
+		$tup->{t_xmax} = 0;
+		push @expected,
+		  qr/${header}tuple has been HOT updated, but xmax is 0/;
+	}
+	elsif ($offnum == 28)
+	{
+		# Set xmin to the in-progress xid.
+		$tup->{t_xmin} = $in_progress_xid;
+		$tup->{t_infomask} &= ~HEAP_XMIN_COMMITTED;
+		push @expected,
+		  qr/${header}tuple with in-progress xmin \d+ was updated to produce a tuple at offset \d+ with committed xmin \d+/;
+	}
+	elsif ($offnum == 29)
+	{
+		# Set xmin to the aborted xid and xmax to the in-progress xid.
+		$tup->{t_xmin} = $aborted_xid;
+		$tup->{t_xmax} = $in_progress_xid;
+		$tup->{t_infomask} &= ~HEAP_XMIN_COMMITTED;
+		push @expected,
+		  qr/${header}tuple with aborted xmin \d+ was updated to produce a tuple at offset \d+ with in-progress xmin \d+/;
+	}
+	# Tuple at offnum 30 is an update of tuple at offnum 21.
+
+	# Tuple at offnum 31 is an update of tuple at 24.
+
+	# Tuple at offnum 32 is an update of tuple at 25.
+
+	# Tuple at offnum 33 is an update of tuple at offnum 26.
+	elsif($offnum == 33)
+	{
+		# Unset HEAP_ONLY_TUPLE, corruption will be raised for tuple at offnum #26
+		$tup->{t_infomask2} &= ~HEAP_ONLY_TUPLE;
+	}
+	# Tuple at offnum 34 is an update of corrupted tuple at offnum 27.
+	# Tuple at offnum 35 is an update of corrupted tuple at offnum 28.
+	# Tuple at offnum 36 is an update of tuple at offnum 29.
+	elsif ($offnum == 36)
+	{
+		# Set xmin to the in-progress xid.
+		$tup->{t_xmin} = $in_progress_xid;
+		$tup->{t_infomask} &= ~HEAP_XMIN_COMMITTED;
+	}
+	# Tuple at offnum 37 is an update of the tuple at offnum 23.
+	# That update was made by an aborted transaction; it is needed so that we have
+	# an aborted transaction xid to test aborted-transaction corruption at offnum 24.
 	write_tuple($file, $offset, $tup);
 }
 close($file)
@@ -523,6 +738,10 @@ $node->start;
 $node->command_checks_all(
 	[ 'pg_amcheck', '--no-dependent-indexes', '-p', $port, 'postgres' ],
 	2, [@expected], [], 'Expected corruption message output');
+$node->safe_psql(
+        'postgres', qq(
+                        COMMIT PREPARED 'in_progress_tx';
+        ));
 
 $node->teardown_node;
 $node->clean_node;
-- 
2.25.1

Reply via email to