From 1f439e0c6cadc952eecbcded2d2d249d9fec9d36 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Wed, 26 Mar 2025 14:19:50 +0900
Subject: [PATCH] Use injection_point to stabilize 035_standby_logical_decoding

---
 src/backend/storage/ipc/standby.c             | 16 ++++++++
 .../t/035_standby_logical_decoding.pl         | 39 +++++++++++++++----
 2 files changed, 47 insertions(+), 8 deletions(-)

diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index 5acb4508f85..35056eee67b 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -31,6 +31,7 @@
 #include "storage/sinvaladt.h"
 #include "storage/standby.h"
 #include "utils/hsearch.h"
+#include "utils/injection_point.h"
 #include "utils/ps_status.h"
 #include "utils/timeout.h"
 #include "utils/timestamp.h"
@@ -1287,6 +1288,21 @@ LogStandbySnapshot(void)
 
 	Assert(XLogStandbyInfoActive());
 
+	/* For testing slot invalidation due to the conflict */
+#ifdef USE_INJECTION_POINTS
+	if (IS_INJECTION_POINT_ATTACHED("log-running-xacts"))
+	{
+		/*
+		 * In 035_standby_logical_decoding.pl, RUNNING_XACTS could move slots's
+		 * xmin forward and cause random failures. Skip generating to avoid it.
+		 *
+		 * XXX What value should we return here? Originally this returns the
+		 * inserted location of RUNNING_XACT record. Based on that, here
+		 * returns the latest insert location for now.
+		 */
+		return GetInsertRecPtr();
+	}
+#endif
 	/*
 	 * Get details of any AccessExclusiveLocks being held at the moment.
 	 */
diff --git a/src/test/recovery/t/035_standby_logical_decoding.pl b/src/test/recovery/t/035_standby_logical_decoding.pl
index c31cab06f1c..1a721744ef0 100644
--- a/src/test/recovery/t/035_standby_logical_decoding.pl
+++ b/src/test/recovery/t/035_standby_logical_decoding.pl
@@ -10,6 +10,11 @@ use PostgreSQL::Test::Cluster;
 use PostgreSQL::Test::Utils;
 use Test::More;
 
+if ($ENV{enable_injection_points} ne 'yes')
+{
+	plan skip_all => 'Injection points not supported by this build';
+}
+
 my ($stdout, $stderr, $cascading_stdout, $cascading_stderr, $handle);
 
 my $node_primary = PostgreSQL::Test::Cluster->new('primary');
@@ -251,6 +256,11 @@ sub wait_until_vacuum_can_remove
 {
 	my ($vac_option, $sql, $to_vac) = @_;
 
+	# Note that from this point the checkpointer and bgwriter will wait before
+	# they write RUNNING_XACT record.
+	$node_primary->safe_psql('testdb',
+		"SELECT injection_points_attach('log-running-xacts', 'wait');");
+
 	# Get the current xid horizon,
 	my $xid_horizon = $node_primary->safe_psql('testdb',
 		qq[select pg_snapshot_xmin(pg_current_snapshot());]);
@@ -258,6 +268,10 @@ sub wait_until_vacuum_can_remove
 	# Launch our sql.
 	$node_primary->safe_psql('testdb', qq[$sql]);
 
+	# XXX If the instance does not attach 'log-running-xacts', the bgwriter
+	# pocess would generate RUNNING_XACTS record, so that the test would fail.
+	sleep(20);
+
 	# Wait until we get a newer horizon.
 	$node_primary->poll_query_until('testdb',
 		"SELECT (select pg_snapshot_xmin(pg_current_snapshot())::text::int - $xid_horizon) > 0"
@@ -268,6 +282,12 @@ sub wait_until_vacuum_can_remove
 	$node_primary->safe_psql(
 		'testdb', qq[VACUUM $vac_option verbose $to_vac;
 										  INSERT INTO flush_wal DEFAULT VALUES;]);
+
+	$node_primary->wait_for_replay_catchup($node_standby);
+
+	# Resume working processes
+	$node_primary->safe_psql('testdb',
+		"SELECT injection_points_detach('log-running-xacts');");
 }
 
 ########################
@@ -285,6 +305,14 @@ autovacuum = off
 $node_primary->dump_info;
 $node_primary->start;
 
+# Check if the extension injection_points is available, as it may be
+# possible that this script is run with installcheck, where the module
+# would not be installed by default.
+if (!$node_primary->check_extension('injection_points'))
+{
+	plan skip_all => 'Extension injection_points not installed';
+}
+
 $node_primary->psql('postgres', q[CREATE DATABASE testdb]);
 
 $node_primary->safe_psql('testdb',
@@ -528,6 +556,9 @@ is($result, qq(10), 'check replicated inserts after subscription on standby');
 $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
 $node_subscriber->stop;
 
+# Create the injection_points extension
+$node_primary->safe_psql('testdb', 'CREATE EXTENSION injection_points;');
+
 ##################################################
 # Recovery conflict: Invalidate conflicting slots, including in-use slots
 # Scenario 1: hot_standby_feedback off and vacuum FULL
@@ -557,8 +588,6 @@ wait_until_vacuum_can_remove(
 	'full', 'CREATE TABLE conflict_test(x integer, y text);
 								 DROP TABLE conflict_test;', 'pg_class');
 
-$node_primary->wait_for_replay_catchup($node_standby);
-
 # Check invalidation in the logfile and in pg_stat_database_conflicts
 check_for_invalidation('vacuum_full_', 1, 'with vacuum FULL on pg_class');
 
@@ -656,8 +685,6 @@ wait_until_vacuum_can_remove(
 	'', 'CREATE TABLE conflict_test(x integer, y text);
 							 DROP TABLE conflict_test;', 'pg_class');
 
-$node_primary->wait_for_replay_catchup($node_standby);
-
 # Check invalidation in the logfile and in pg_stat_database_conflicts
 check_for_invalidation('row_removal_', $logstart, 'with vacuum on pg_class');
 
@@ -690,8 +717,6 @@ wait_until_vacuum_can_remove(
 	'', 'CREATE ROLE create_trash;
 							 DROP ROLE create_trash;', 'pg_authid');
 
-$node_primary->wait_for_replay_catchup($node_standby);
-
 # Check invalidation in the logfile and in pg_stat_database_conflicts
 check_for_invalidation('shared_row_removal_', $logstart,
 	'with vacuum on pg_authid');
@@ -724,8 +749,6 @@ wait_until_vacuum_can_remove(
 							 INSERT INTO conflict_test(x,y) SELECT s, s::text FROM generate_series(1,4) s;
 							 UPDATE conflict_test set x=1, y=1;', 'conflict_test');
 
-$node_primary->wait_for_replay_catchup($node_standby);
-
 # message should not be issued
 ok( !$node_standby->log_contains(
 		"invalidating obsolete slot \"no_conflict_inactiveslot\"", $logstart),
-- 
2.43.5

