From 86f162871f612f2f533c0d0e7574f02ac08e6321 Mon Sep 17 00:00:00 2001
From: Zhijie Hou <houzj.fnst@cn.fujitsu.com>
Date: Thu, 8 May 2025 10:53:53 +0800
Subject: [PATCH v222] 2 injection points

---
 src/backend/replication/logical/tablesync.c | 14 ++++++++++++++
 src/backend/replication/logical/worker.c    |  6 ++++++
 src/backend/replication/walsender.c         |  4 ++++
 3 files changed, 24 insertions(+)

diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 8e1e8762f62..4e8626f5a61 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -122,6 +122,7 @@
 #include "utils/snapmgr.h"
 #include "utils/syscache.h"
 #include "utils/usercontext.h"
+#include "utils/injection_point.h"
 
 typedef enum
 {
@@ -293,6 +294,15 @@ invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
 static void
 process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 {
+	if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
+		current_lsn >= MyLogicalRepWorker->relstate_lsn)
+	{
+		if (MyLogicalRepWorker->relid % 2 == 0)
+			INJECTION_POINT("table-sync-done-2");
+		else
+			INJECTION_POINT("table-sync-done-1");
+	}
+
 	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
 
 	if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
@@ -1551,6 +1561,10 @@ copy_table_done:
 		 "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X",
 		 originname, LSN_FORMAT_ARGS(*origin_startpos));
 
+	if (MyLogicalRepWorker->relid % 2 == 0)
+		INJECTION_POINT("table-sync-wait-2");
+	else
+		INJECTION_POINT("table-sync-wait-1");
 	/*
 	 * We are done with the initial data synchronization, update the state.
 	 */
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 4151a4b2a96..d269b69df77 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -192,6 +192,7 @@
 #include "utils/snapmgr.h"
 #include "utils/syscache.h"
 #include "utils/usercontext.h"
+#include "utils/injection_point.h"
 
 #define NAPTIME_PER_CYCLE 1000	/* max sleep time between cycles (1s) */
 
@@ -2451,6 +2452,9 @@ apply_handle_insert(StringInfo s)
 	slot_fill_defaults(rel, estate, remoteslot);
 	MemoryContextSwitchTo(oldctx);
 
+	if (am_leader_apply_worker())
+		elog(LOG, "handle remote insert");
+
 	/* For a partitioned table, insert the tuple into a partition. */
 	if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
 		apply_handle_tuple_routing(edata,
@@ -4589,6 +4593,8 @@ run_apply_worker()
 	must_use_password = MySubscription->passwordrequired &&
 		!MySubscription->ownersuperuser;
 
+	INJECTION_POINT("connecting-walsender");
+
 	LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
 											true, must_use_password,
 											MySubscription->name, &err);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 9fa8beb6103..c74dea40815 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -95,6 +95,7 @@
 #include "utils/ps_status.h"
 #include "utils/timeout.h"
 #include "utils/timestamp.h"
+#include "utils/injection_point.h"
 
 /* Minimum interval used by walsender for stats flushes, in ms */
 #define WALSENDER_STATS_FLUSH_INTERVAL         1000
@@ -2810,6 +2811,9 @@ WalSndLoop(WalSndSendDataCallback send_data)
 			SyncRepInitConfig();
 		}
 
+		if (send_data == XLogSendLogical)
+			INJECTION_POINT("process-replies");
+
 		/* Check for input from the client */
 		ProcessRepliesIfAny();
 
-- 
2.30.0.windows.2

