From fb1cca76163226e450d5e04b30e4e867446d72d0 Mon Sep 17 00:00:00 2001
From: Zhijie Hou <houzj.fnst@fujitsu.com>
Date: Mon, 8 Sep 2025 18:25:00 +0800
Subject: [PATCH v3] Add a race condition test

An injection point is introduced along with a tap-test to verify that the
conflict-relevant is not prematurely removed when a concurrent prepared
transaction is being committed on the publisher. Also, verify that the
DELAY_CHKPT_IN_COMMIT marking and timestamp acquisition occur correctly.
---
 src/backend/access/transam/twophase.c    |   6 +
 src/test/subscription/Makefile           |   4 +-
 src/test/subscription/meson.build        |   5 +-
 src/test/subscription/t/035_conflicts.pl | 160 +++++++++++++++++++++++
 4 files changed, 173 insertions(+), 2 deletions(-)

diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 3e20f448787..d8e2fce2c99 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -103,6 +103,7 @@
 #include "storage/proc.h"
 #include "storage/procarray.h"
 #include "utils/builtins.h"
+#include "utils/injection_point.h"
 #include "utils/memutils.h"
 #include "utils/timestamp.h"
 
@@ -2332,12 +2333,17 @@ RecordTransactionCommitPrepared(TransactionId xid,
 	replorigin = (replorigin_session_origin != InvalidRepOriginId &&
 				  replorigin_session_origin != DoNotReplicateId);
 
+	/* Load the injection point before entering the critical section */
+	INJECTION_POINT_LOAD("commit-after-delay-checkpoint");
+
 	START_CRIT_SECTION();
 
 	/* See notes in RecordTransactionCommit */
 	Assert((MyProc->delayChkptFlags & DELAY_CHKPT_IN_COMMIT) == 0);
 	MyProc->delayChkptFlags |= DELAY_CHKPT_IN_COMMIT;
 
+	INJECTION_POINT_CACHED("commit-after-delay-checkpoint", NULL);
+
 	/*
 	 * Ensures the DELAY_CHKPT_IN_COMMIT flag write is globally visible before
 	 * commit time is written.
diff --git a/src/test/subscription/Makefile b/src/test/subscription/Makefile
index 50b65d8f6ea..9d97e7d5c0d 100644
--- a/src/test/subscription/Makefile
+++ b/src/test/subscription/Makefile
@@ -13,9 +13,11 @@ subdir = src/test/subscription
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
-EXTRA_INSTALL = contrib/hstore
+EXTRA_INSTALL = contrib/hstore \
+	src/test/modules/injection_points
 
 export with_icu
+export enable_injection_points
 
 check:
 	$(prove_check)
diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build
index 586ffba434e..20b4e523d93 100644
--- a/src/test/subscription/meson.build
+++ b/src/test/subscription/meson.build
@@ -5,7 +5,10 @@ tests += {
   'sd': meson.current_source_dir(),
   'bd': meson.current_build_dir(),
   'tap': {
-    'env': {'with_icu': icu.found() ? 'yes' : 'no'},
+    'env': {
+      'with_icu': icu.found() ? 'yes' : 'no',
+      'enable_injection_points': get_option('injection_points') ? 'yes' : 'no',
+    },
     'tests': [
       't/001_rep_changes.pl',
       't/002_types.pl',
diff --git a/src/test/subscription/t/035_conflicts.pl b/src/test/subscription/t/035_conflicts.pl
index e06429c288f..2dc93defc34 100644
--- a/src/test/subscription/t/035_conflicts.pl
+++ b/src/test/subscription/t/035_conflicts.pl
@@ -415,6 +415,166 @@ $node_B->safe_psql('postgres', "ALTER PUBLICATION tap_pub_B ADD TABLE tab");
 $node_A->safe_psql('postgres',
 	"ALTER SUBSCRIPTION $subname_AB REFRESH PUBLICATION WITH (copy_data = false)");
 
+###############################################################################
+# Test that publisher's transactions marked with DELAY_CHKPT_IN_COMMIT prevent
+# concurrently deleted tuples on the subscriber from being removed. This test
+# also acts as a safeguard to prevent developers from moving the commit
+# timestamp acquisition before marking DELAY_CHKPT_IN_COMMIT in
+# RecordTransactionCommitPrepared.
+###############################################################################
+
+my $injection_points_supported = $node_B->check_extension('injection_points');
+
+# This test depends on an injection point to block the prepared transaction
+# commit after marking DELAY_CHKPT_IN_COMMIT flag.
+if ($injection_points_supported != 0)
+{
+	$node_B->append_conf('postgresql.conf',
+		"shared_preload_libraries = 'injection_points'
+		max_prepared_transactions = 1");
+	$node_B->restart;
+
+	# Disable the subscription on Node B for testing only one-way
+	# replication.
+	$node_B->psql('postgres', "ALTER SUBSCRIPTION $subname_BA DISABLE;");
+
+	# Wait for the apply worker to stop
+	$node_B->poll_query_until('postgres',
+		"SELECT count(*) = 0 FROM pg_stat_activity WHERE backend_type = 'logical replication apply worker'"
+	);
+
+	# Truncate the table to cleanup existing dead rows in the table. Then insert
+	# a new row.
+	$node_B->safe_psql(
+		'postgres', qq(
+		TRUNCATE tab;
+		INSERT INTO tab VALUES(1, 1);
+	));
+
+	$node_B->wait_for_catchup($subname_AB);
+
+	# Create the injection_points extension on the publisher node and attach to the
+	# commit-after-delay-checkpoint injection point.
+	$node_B->safe_psql(
+		'postgres',
+		"CREATE EXTENSION injection_points;
+		 SELECT injection_points_attach('commit-after-delay-checkpoint', 'wait');"
+	);
+
+	# Start a background session on the publisher node to perform an update and
+	# pause at the injection point.
+	my $pub_session = $node_B->background_psql('postgres');
+	$pub_session->query_until(
+		qr/starting_bg_psql/,
+		q{
+			\echo starting_bg_psql
+			BEGIN;
+			UPDATE tab SET b = 2 WHERE a = 1;
+			PREPARE TRANSACTION 'txn_with_later_commit_ts';
+			COMMIT PREPARED 'txn_with_later_commit_ts';
+		}
+	);
+
+	# Confirm the update is suspended
+	$result =
+	  $node_B->safe_psql('postgres', 'SELECT * FROM tab WHERE a = 1');
+	is($result, qq(1|1), 'publisher sees the old row');
+
+	# Delete the row on the subscriber. The deleted row should be retained due to a
+	# transaction on the publisher, which is currently marked with the
+	# DELAY_CHKPT_IN_COMMIT flag.
+	$node_A->safe_psql('postgres', "DELETE FROM tab WHERE a = 1;");
+
+	# Get the commit timestamp for the delete
+	my $sub_ts = $node_A->safe_psql('postgres',
+		"SELECT timestamp FROM pg_last_committed_xact();");
+
+	$log_location = -s $node_A->logfile;
+
+	# Confirm that the apply worker keeps requesting publisher status, while
+	# awaiting the prepared transaction to commit. Thus, the request log should
+	# appear more than once.
+	$node_A->wait_for_log(
+		qr/sending publisher status request message/,
+		$log_location);
+
+	$log_location = -s $node_A->logfile;
+
+	$node_A->wait_for_log(
+		qr/sending publisher status request message/,
+		$log_location);
+
+	# Confirm that the dead tuple cannot be removed
+	($cmdret, $stdout, $stderr) =
+	  $node_A->psql('postgres', qq(VACUUM (verbose) public.tab;));
+
+	ok($stderr =~ qr/1 are dead but not yet removable/,
+		'the deleted column is non-removable');
+
+	$log_location = -s $node_A->logfile;
+
+	# Wakeup and detach the injection point on the publisher node. The prepared
+	# transaction should now commit.
+	$node_B->safe_psql(
+		'postgres',
+		"SELECT injection_points_wakeup('commit-after-delay-checkpoint');
+		 SELECT injection_points_detach('commit-after-delay-checkpoint');"
+	);
+
+	# Close the background session on the publisher node
+	ok($pub_session->quit, "close publisher session");
+
+	# Confirm that the transaction committed
+	$result =
+	  $node_B->safe_psql('postgres', 'SELECT * FROM tab WHERE a = 1');
+	is($result, qq(1|2), 'publisher sees the new row');
+
+	# Ensure the UPDATE is replayed on subscriber
+	$node_B->wait_for_catchup($subname_AB);
+
+	$logfile = slurp_file($node_A->logfile(), $log_location);
+	ok( $logfile =~
+		  qr/conflict detected on relation "public.tab": conflict=update_deleted.*
+.*DETAIL:.* The row to be updated was deleted locally in transaction [0-9]+ at .*
+.*Remote row \(1, 2\); replica identity full \(1, 1\)/,
+		'update target row was deleted in tab');
+
+	# Remember the next transaction ID to be assigned
+	$next_xid =
+	  $node_A->safe_psql('postgres', "SELECT txid_current() + 1;");
+
+	# Confirm that the xmin value is advanced to the latest nextXid even if there is
+	# one transaction on the publisher that has not committed.
+	ok( $node_A->poll_query_until(
+			'postgres',
+			"SELECT xmin = $next_xid from pg_replication_slots WHERE slot_name = 'pg_conflict_detection'"
+		),
+		"the xmin value of slot 'pg_conflict_detection' is updated on subscriber"
+	);
+
+	# Confirm that the dead tuple can be removed now
+	($cmdret, $stdout, $stderr) =
+	  $node_A->psql('postgres', qq(VACUUM (verbose) public.tab;));
+
+	ok($stderr =~ qr/1 removed, 0 remain, 0 are dead but not yet removable/,
+		'the deleted column is removed');
+
+	# Get the commit timestamp for the publisher's update
+	my $pub_ts = $node_B->safe_psql('postgres',
+		"SELECT pg_xact_commit_timestamp(xmin) from tab where a=1;");
+
+	# Check that the commit timestamp for the update on the publisher is later than
+	# or equal to the timestamp of the local deletion, as the commit timestamp
+	# should be assigned after marking the DELAY_CHKPT_IN_COMMIT flag.
+	$result = $node_B->safe_psql('postgres',
+		"SELECT '$pub_ts'::timestamp >= '$sub_ts'::timestamp");
+	is($result, qq(t),
+		"pub UPDATE's timestamp is later than that of sub's DELETE");
+
+	# Re-enable the subscription for further tests
+	$node_B->psql('postgres', "ALTER SUBSCRIPTION $subname_BA ENABLE;");
+}
+
 ###############################################################################
 # Check that dead tuple retention stops due to the wait time surpassing
 # max_retention_duration.
-- 
2.51.0.windows.1

