From 8e083d7e7f34039fbedc5d306d088eca4744f6df Mon Sep 17 00:00:00 2001
From: Zhijie Hou <houzj.fnst@cn.fujitsu.com>
Date: Thu, 8 May 2025 10:53:53 +0800
Subject: [PATCH v1] Injection points to reproduce the confirmed_flush issue.

---
 src/backend/postmaster/bgwriter.c           |  2 +-
 src/backend/replication/logical/logical.c   | 11 +++++++++++
 src/backend/replication/logical/tablesync.c |  3 +++
 src/backend/replication/logical/worker.c    |  6 ++++++
 src/backend/replication/walsender.c         |  4 ++++
 5 files changed, 25 insertions(+), 1 deletion(-)

diff --git a/src/backend/postmaster/bgwriter.c b/src/backend/postmaster/bgwriter.c
index 72f5acceec7..3bbe8ff6ab2 100644
--- a/src/backend/postmaster/bgwriter.c
+++ b/src/backend/postmaster/bgwriter.c
@@ -67,7 +67,7 @@ int			BgWriterDelay = 200;
  * Interval in which standby snapshots are logged into the WAL stream, in
  * milliseconds.
  */
-#define LOG_SNAPSHOT_INTERVAL_MS 15000
+#define LOG_SNAPSHOT_INTERVAL_MS 1500000
 
 /*
  * LSN and timestamp at which we last issued a LogStandbySnapshot(), to avoid
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index a8d2e024d34..ff498c6b539 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1828,6 +1828,11 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 
 		SpinLockAcquire(&MyReplicationSlot->mutex);
 
+		if (lsn < MyReplicationSlot->data.confirmed_flush)
+			elog(LOG, "confirmed_flush moved backwards from %X/%X to %X/%X",
+				LSN_FORMAT_ARGS(MyReplicationSlot->data.confirmed_flush),
+				LSN_FORMAT_ARGS(lsn));
+
 		MyReplicationSlot->data.confirmed_flush = lsn;
 
 		/* if we're past the location required for bumping xmin, do so */
@@ -1893,6 +1898,12 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 	else
 	{
 		SpinLockAcquire(&MyReplicationSlot->mutex);
+
+		if (lsn < MyReplicationSlot->data.confirmed_flush)
+			elog(LOG, "confirmed_flush moved backwards from %X/%X to %X/%X",
+				LSN_FORMAT_ARGS(MyReplicationSlot->data.confirmed_flush),
+				LSN_FORMAT_ARGS(lsn));
+
 		MyReplicationSlot->data.confirmed_flush = lsn;
 		SpinLockRelease(&MyReplicationSlot->mutex);
 	}
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 8e1e8762f62..f0c2cad40a4 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -122,6 +122,7 @@
 #include "utils/snapmgr.h"
 #include "utils/syscache.h"
 #include "utils/usercontext.h"
+#include "utils/injection_point.h"
 
 typedef enum
 {
@@ -1551,6 +1552,8 @@ copy_table_done:
 		 "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X",
 		 originname, LSN_FORMAT_ARGS(*origin_startpos));
 
+	if (MyLogicalRepWorker->relid % 2 == 0)
+		INJECTION_POINT("table-sync-wait-2", NULL);
 	/*
 	 * We are done with the initial data synchronization, update the state.
 	 */
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 4151a4b2a96..9fc967b8463 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -192,6 +192,7 @@
 #include "utils/snapmgr.h"
 #include "utils/syscache.h"
 #include "utils/usercontext.h"
+#include "utils/injection_point.h"
 
 #define NAPTIME_PER_CYCLE 1000	/* max sleep time between cycles (1s) */
 
@@ -2451,6 +2452,8 @@ apply_handle_insert(StringInfo s)
 	slot_fill_defaults(rel, estate, remoteslot);
 	MemoryContextSwitchTo(oldctx);
 
+	elog(LOG, "handle remote insert");
+
 	/* For a partitioned table, insert the tuple into a partition. */
 	if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
 		apply_handle_tuple_routing(edata,
@@ -3725,6 +3728,9 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 			 * now.
 			 */
 			AcceptInvalidationMessages();
+
+			if (am_leader_apply_worker())
+				INJECTION_POINT("reread-sub", NULL);
 			maybe_reread_subscription();
 
 			/* Process any table synchronization changes. */
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 9fa8beb6103..f41fc398f9f 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -95,6 +95,7 @@
 #include "utils/ps_status.h"
 #include "utils/timeout.h"
 #include "utils/timestamp.h"
+#include "utils/injection_point.h"
 
 /* Minimum interval used by walsender for stats flushes, in ms */
 #define WALSENDER_STATS_FLUSH_INTERVAL         1000
@@ -2810,6 +2811,9 @@ WalSndLoop(WalSndSendDataCallback send_data)
 			SyncRepInitConfig();
 		}
 
+		if (send_data == XLogSendLogical)
+			INJECTION_POINT("process-replies", NULL);
+
 		/* Check for input from the client */
 		ProcessRepliesIfAny();
 
-- 
2.34.1

