From 4505b85e6357d2588e7e379724d47599ea4be0c2 Mon Sep 17 00:00:00 2001
From: Zhijie Hou <houzj.fnst@fujitsu.com>
Date: Wed, 15 Oct 2025 16:30:23 +0800
Subject: [PATCH v20251015 1/4] Reorganize tablesync Code and Introduce
 syncutils

Reorganized the tablesync code by creating a new syncutils file.
This refactoring will facilitate the development of sequence
synchronization worker code.

This commit separates code reorganization from functional changes,
making it clearer to reviewers that only existing code has been moved.
The changes in this patch can be merged with subsequent patches during
the commit process.

Author: Vignesh C
Reviewer:  Amit Kapila, Shveta Malik, Dilip Kumar, Peter Smith, Nisha Moond
Discussion: https://www.postgresql.org/message-id/CAA4eK1LC+KJiAkSrpE_NwvNdidw9F2os7GERUeSxSKv71gXysQ@mail.gmail.com
---
 src/backend/catalog/pg_subscription.c         |   4 +-
 src/backend/replication/logical/Makefile      |   1 +
 .../replication/logical/applyparallelworker.c |   2 +-
 src/backend/replication/logical/meson.build   |   1 +
 src/backend/replication/logical/syncutils.c   | 188 ++++++++++++++++
 src/backend/replication/logical/tablesync.c   | 204 +++---------------
 src/backend/replication/logical/worker.c      |  22 +-
 src/bin/pg_dump/common.c                      |   4 +-
 src/bin/pg_dump/pg_dump.c                     |   8 +-
 src/bin/pg_dump/pg_dump.h                     |   2 +-
 src/include/catalog/pg_subscription_rel.h     |   2 +-
 src/include/replication/worker_internal.h     |  14 +-
 src/tools/pgindent/typedefs.list              |   2 +-
 13 files changed, 248 insertions(+), 206 deletions(-)
 create mode 100644 src/backend/replication/logical/syncutils.c

diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index b885890de37..e06587b0265 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -506,13 +506,13 @@ RemoveSubscriptionRel(Oid subid, Oid relid)
 }
 
 /*
- * Does the subscription have any relations?
+ * Does the subscription have any tables?
  *
  * Use this function only to know true/false, and when you have no need for the
  * List returned by GetSubscriptionRelations.
  */
 bool
-HasSubscriptionRelations(Oid subid)
+HasSubscriptionTables(Oid subid)
 {
 	Relation	rel;
 	ScanKeyData skey[1];
diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
index 1e08bbbd4eb..c62c8c67521 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -28,6 +28,7 @@ OBJS = \
 	reorderbuffer.o \
 	slotsync.o \
 	snapbuild.o \
+	syncutils.o \
 	tablesync.o \
 	worker.o
 
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index 33b7ec7f029..14325581afc 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -970,7 +970,7 @@ ParallelApplyWorkerMain(Datum main_arg)
 	 * the subscription relation state.
 	 */
 	CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
-								  invalidate_syncing_table_states,
+								  InvalidateSyncingRelStates,
 								  (Datum) 0);
 
 	set_apply_error_context_origin(originname);
diff --git a/src/backend/replication/logical/meson.build b/src/backend/replication/logical/meson.build
index 6f19614c79d..9283e996ef4 100644
--- a/src/backend/replication/logical/meson.build
+++ b/src/backend/replication/logical/meson.build
@@ -14,6 +14,7 @@ backend_sources += files(
   'reorderbuffer.c',
   'slotsync.c',
   'snapbuild.c',
+  'syncutils.c',
   'tablesync.c',
   'worker.c',
 )
diff --git a/src/backend/replication/logical/syncutils.c b/src/backend/replication/logical/syncutils.c
new file mode 100644
index 00000000000..c30de1275e7
--- /dev/null
+++ b/src/backend/replication/logical/syncutils.c
@@ -0,0 +1,188 @@
+/*-------------------------------------------------------------------------
+ * syncutils.c
+ *	  PostgreSQL logical replication: common synchronization code
+ *
+ * Copyright (c) 2025, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/backend/replication/logical/syncutils.c
+ *
+ * NOTES
+ *	  This file contains code common to table synchronization workers, and
+ *	  the sequence synchronization worker.
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "catalog/pg_subscription_rel.h"
+#include "pgstat.h"
+#include "replication/worker_internal.h"
+#include "storage/ipc.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+
+/*
+ * Enum for phases of the subscription relations state.
+ *
+ * SYNC_RELATIONS_STATE_NEEDS_REBUILD indicates that the subscription relations
+ * state is no longer valid, and the subscription relations should be rebuilt.
+ *
+ * SYNC_RELATIONS_STATE_REBUILD_STARTED indicates that the subscription
+ * relations state is being rebuilt.
+ *
+ * SYNC_RELATIONS_STATE_VALID indicates that the subscription relation state is
+ * up-to-date and valid.
+ */
+typedef enum
+{
+	SYNC_RELATIONS_STATE_NEEDS_REBUILD,
+	SYNC_RELATIONS_STATE_REBUILD_STARTED,
+	SYNC_RELATIONS_STATE_VALID,
+} SyncingRelationsState;
+
+static SyncingRelationsState relation_states_validity = SYNC_RELATIONS_STATE_NEEDS_REBUILD;
+
+/*
+ * Exit routine for synchronization worker.
+ */
+pg_noreturn void
+FinishSyncWorker(void)
+{
+	/*
+	 * Commit any outstanding transaction. This is the usual case, unless
+	 * there was nothing to do for the table.
+	 */
+	if (IsTransactionState())
+	{
+		CommitTransactionCommand();
+		pgstat_report_stat(true);
+	}
+
+	/* And flush all writes. */
+	XLogFlush(GetXLogWriteRecPtr());
+
+	StartTransactionCommand();
+	ereport(LOG,
+			(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
+					MySubscription->name,
+					get_rel_name(MyLogicalRepWorker->relid))));
+	CommitTransactionCommand();
+
+	/* Find the leader apply worker and signal it. */
+	logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
+
+	/* Stop gracefully */
+	proc_exit(0);
+}
+
+/*
+ * Callback from syscache invalidation.
+ */
+void
+InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue)
+{
+	relation_states_validity = SYNC_RELATIONS_STATE_NEEDS_REBUILD;
+}
+
+/*
+ * Process possible state change(s) of relations that are being synchronized.
+ */
+void
+ProcessSyncingRelations(XLogRecPtr current_lsn)
+{
+	switch (MyLogicalRepWorker->type)
+	{
+		case WORKERTYPE_PARALLEL_APPLY:
+
+			/*
+			 * Skip for parallel apply workers because they only operate on
+			 * tables that are in a READY state. See pa_can_start() and
+			 * should_apply_changes_for_rel().
+			 */
+			break;
+
+		case WORKERTYPE_TABLESYNC:
+			ProcessSyncingTablesForSync(current_lsn);
+			break;
+
+		case WORKERTYPE_APPLY:
+			ProcessSyncingTablesForApply(current_lsn);
+			break;
+
+		case WORKERTYPE_UNKNOWN:
+			/* Should never happen. */
+			elog(ERROR, "Unknown worker type");
+	}
+}
+
+/*
+ * Common code to fetch the up-to-date sync state info into the static lists.
+ *
+ * Returns true if subscription has 1 or more tables, else false.
+ *
+ * Note: If this function started the transaction (indicated by the parameter)
+ * then it is the caller's responsibility to commit it.
+ */
+bool
+FetchRelationStates(bool *started_tx)
+{
+	static bool has_subtables = false;
+
+	*started_tx = false;
+
+	if (relation_states_validity != SYNC_RELATIONS_STATE_VALID)
+	{
+		MemoryContext oldctx;
+		List	   *rstates;
+		ListCell   *lc;
+		SubscriptionRelState *rstate;
+
+		relation_states_validity = SYNC_RELATIONS_STATE_REBUILD_STARTED;
+
+		/* Clean the old lists. */
+		list_free_deep(relation_states_not_ready);
+		relation_states_not_ready = NIL;
+
+		if (!IsTransactionState())
+		{
+			StartTransactionCommand();
+			*started_tx = true;
+		}
+
+		/* Fetch tables and sequences that are in non-ready state. */
+		rstates = GetSubscriptionRelations(MySubscription->oid, true);
+
+		/* Allocate the tracking info in a permanent memory context. */
+		oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+		foreach(lc, rstates)
+		{
+			rstate = palloc(sizeof(SubscriptionRelState));
+			memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
+			relation_states_not_ready = lappend(relation_states_not_ready, rstate);
+		}
+		MemoryContextSwitchTo(oldctx);
+
+		/*
+		 * Does the subscription have tables?
+		 *
+		 * If there were not-READY tables found then we know it does. But if
+		 * relation_states_not_ready was empty we still need to check again to
+		 * see if there are 0 tables.
+		 */
+		has_subtables = (relation_states_not_ready != NIL) ||
+			HasSubscriptionTables(MySubscription->oid);
+
+		/*
+		 * If the subscription relation cache has been invalidated since we
+		 * entered this routine, we still use and return the relations we just
+		 * finished constructing, to avoid infinite loops, but we leave the
+		 * table states marked as stale so that we'll rebuild it again on next
+		 * access. Otherwise, we mark the table states as valid.
+		 */
+		if (relation_states_validity == SYNC_RELATIONS_STATE_REBUILD_STARTED)
+			relation_states_validity = SYNC_RELATIONS_STATE_VALID;
+	}
+
+	return has_subtables;
+}
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index e6da4028d39..921395c2409 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -117,58 +117,15 @@
 #include "utils/array.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
-#include "utils/memutils.h"
 #include "utils/rls.h"
 #include "utils/snapmgr.h"
 #include "utils/syscache.h"
 #include "utils/usercontext.h"
 
-typedef enum
-{
-	SYNC_TABLE_STATE_NEEDS_REBUILD,
-	SYNC_TABLE_STATE_REBUILD_STARTED,
-	SYNC_TABLE_STATE_VALID,
-} SyncingTablesState;
-
-static SyncingTablesState table_states_validity = SYNC_TABLE_STATE_NEEDS_REBUILD;
-static List *table_states_not_ready = NIL;
-static bool FetchTableStates(bool *started_tx);
+List	   *relation_states_not_ready = NIL;
 
 static StringInfo copybuf = NULL;
 
-/*
- * Exit routine for synchronization worker.
- */
-pg_noreturn static void
-finish_sync_worker(void)
-{
-	/*
-	 * Commit any outstanding transaction. This is the usual case, unless
-	 * there was nothing to do for the table.
-	 */
-	if (IsTransactionState())
-	{
-		CommitTransactionCommand();
-		pgstat_report_stat(true);
-	}
-
-	/* And flush all writes. */
-	XLogFlush(GetXLogWriteRecPtr());
-
-	StartTransactionCommand();
-	ereport(LOG,
-			(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
-					MySubscription->name,
-					get_rel_name(MyLogicalRepWorker->relid))));
-	CommitTransactionCommand();
-
-	/* Find the leader apply worker and signal it. */
-	logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
-
-	/* Stop gracefully */
-	proc_exit(0);
-}
-
 /*
  * Wait until the relation sync state is set in the catalog to the expected
  * one; return true when it happens.
@@ -180,7 +137,7 @@ finish_sync_worker(void)
  * CATCHUP state to SYNCDONE.
  */
 static bool
-wait_for_relation_state_change(Oid relid, char expected_state)
+wait_for_table_state_change(Oid relid, char expected_state)
 {
 	char		state;
 
@@ -273,15 +230,6 @@ wait_for_worker_state_change(char expected_state)
 	return false;
 }
 
-/*
- * Callback from syscache invalidation.
- */
-void
-invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
-{
-	table_states_validity = SYNC_TABLE_STATE_NEEDS_REBUILD;
-}
-
 /*
  * Handle table synchronization cooperation from the synchronization
  * worker.
@@ -290,8 +238,8 @@ invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
  * predetermined synchronization point in the WAL stream, mark the table as
  * SYNCDONE and finish.
  */
-static void
-process_syncing_tables_for_sync(XLogRecPtr current_lsn)
+void
+ProcessSyncingTablesForSync(XLogRecPtr current_lsn)
 {
 	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
 
@@ -349,9 +297,9 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 
 		/*
 		 * Start a new transaction to clean up the tablesync origin tracking.
-		 * This transaction will be ended within the finish_sync_worker().
-		 * Now, even, if we fail to remove this here, the apply worker will
-		 * ensure to clean it up afterward.
+		 * This transaction will be ended within the FinishSyncWorker(). Now,
+		 * even, if we fail to remove this here, the apply worker will ensure
+		 * to clean it up afterward.
 		 *
 		 * We need to do this after the table state is set to SYNCDONE.
 		 * Otherwise, if an error occurs while performing the database
@@ -387,7 +335,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 		 */
 		replorigin_drop_by_name(originname, true, false);
 
-		finish_sync_worker();
+		FinishSyncWorker();
 	}
 	else
 		SpinLockRelease(&MyLogicalRepWorker->relmutex);
@@ -414,8 +362,8 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
  * If the synchronization position is reached (SYNCDONE), then the table can
  * be marked as READY and is no longer tracked.
  */
-static void
-process_syncing_tables_for_apply(XLogRecPtr current_lsn)
+void
+ProcessSyncingTablesForApply(XLogRecPtr current_lsn)
 {
 	struct tablesync_start_time_mapping
 	{
@@ -431,14 +379,14 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 	Assert(!IsTransactionState());
 
 	/* We need up-to-date sync state info for subscription tables here. */
-	FetchTableStates(&started_tx);
+	FetchRelationStates(&started_tx);
 
 	/*
 	 * Prepare a hash table for tracking last start times of workers, to avoid
 	 * immediate restarts.  We don't need it if there are no tables that need
 	 * syncing.
 	 */
-	if (table_states_not_ready != NIL && !last_start_times)
+	if (relation_states_not_ready != NIL && !last_start_times)
 	{
 		HASHCTL		ctl;
 
@@ -452,7 +400,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 	 * Clean up the hash table when we're done with all tables (just to
 	 * release the bit of memory).
 	 */
-	else if (table_states_not_ready == NIL && last_start_times)
+	else if (relation_states_not_ready == NIL && last_start_times)
 	{
 		hash_destroy(last_start_times);
 		last_start_times = NULL;
@@ -461,7 +409,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 	/*
 	 * Process all tables that are being synchronized.
 	 */
-	foreach(lc, table_states_not_ready)
+	foreach(lc, relation_states_not_ready)
 	{
 		SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
 
@@ -586,8 +534,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 					StartTransactionCommand();
 					started_tx = true;
 
-					wait_for_relation_state_change(rstate->relid,
-												   SUBREL_STATE_SYNCDONE);
+					wait_for_table_state_change(rstate->relid,
+												SUBREL_STATE_SYNCDONE);
 				}
 				else
 					LWLockRelease(LogicalRepWorkerLock);
@@ -689,37 +637,6 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 	}
 }
 
-/*
- * Process possible state change(s) of tables that are being synchronized.
- */
-void
-process_syncing_tables(XLogRecPtr current_lsn)
-{
-	switch (MyLogicalRepWorker->type)
-	{
-		case WORKERTYPE_PARALLEL_APPLY:
-
-			/*
-			 * Skip for parallel apply workers because they only operate on
-			 * tables that are in a READY state. See pa_can_start() and
-			 * should_apply_changes_for_rel().
-			 */
-			break;
-
-		case WORKERTYPE_TABLESYNC:
-			process_syncing_tables_for_sync(current_lsn);
-			break;
-
-		case WORKERTYPE_APPLY:
-			process_syncing_tables_for_apply(current_lsn);
-			break;
-
-		case WORKERTYPE_UNKNOWN:
-			/* Should never happen. */
-			elog(ERROR, "Unknown worker type");
-	}
-}
-
 /*
  * Create list of columns for COPY based on logical relation mapping.
  */
@@ -1356,7 +1273,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 		case SUBREL_STATE_SYNCDONE:
 		case SUBREL_STATE_READY:
 		case SUBREL_STATE_UNKNOWN:
-			finish_sync_worker();	/* doesn't return */
+			FinishSyncWorker(); /* doesn't return */
 	}
 
 	/* Calculate the name of the tablesync slot. */
@@ -1599,77 +1516,6 @@ copy_table_done:
 	return slotname;
 }
 
-/*
- * Common code to fetch the up-to-date sync state info into the static lists.
- *
- * Returns true if subscription has 1 or more tables, else false.
- *
- * Note: If this function started the transaction (indicated by the parameter)
- * then it is the caller's responsibility to commit it.
- */
-static bool
-FetchTableStates(bool *started_tx)
-{
-	static bool has_subrels = false;
-
-	*started_tx = false;
-
-	if (table_states_validity != SYNC_TABLE_STATE_VALID)
-	{
-		MemoryContext oldctx;
-		List	   *rstates;
-		ListCell   *lc;
-		SubscriptionRelState *rstate;
-
-		table_states_validity = SYNC_TABLE_STATE_REBUILD_STARTED;
-
-		/* Clean the old lists. */
-		list_free_deep(table_states_not_ready);
-		table_states_not_ready = NIL;
-
-		if (!IsTransactionState())
-		{
-			StartTransactionCommand();
-			*started_tx = true;
-		}
-
-		/* Fetch all non-ready tables. */
-		rstates = GetSubscriptionRelations(MySubscription->oid, true);
-
-		/* Allocate the tracking info in a permanent memory context. */
-		oldctx = MemoryContextSwitchTo(CacheMemoryContext);
-		foreach(lc, rstates)
-		{
-			rstate = palloc(sizeof(SubscriptionRelState));
-			memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
-			table_states_not_ready = lappend(table_states_not_ready, rstate);
-		}
-		MemoryContextSwitchTo(oldctx);
-
-		/*
-		 * Does the subscription have tables?
-		 *
-		 * If there were not-READY relations found then we know it does. But
-		 * if table_states_not_ready was empty we still need to check again to
-		 * see if there are 0 tables.
-		 */
-		has_subrels = (table_states_not_ready != NIL) ||
-			HasSubscriptionRelations(MySubscription->oid);
-
-		/*
-		 * If the subscription relation cache has been invalidated since we
-		 * entered this routine, we still use and return the relations we just
-		 * finished constructing, to avoid infinite loops, but we leave the
-		 * table states marked as stale so that we'll rebuild it again on next
-		 * access. Otherwise, we mark the table states as valid.
-		 */
-		if (table_states_validity == SYNC_TABLE_STATE_REBUILD_STARTED)
-			table_states_validity = SYNC_TABLE_STATE_VALID;
-	}
-
-	return has_subrels;
-}
-
 /*
  * Execute the initial sync with error handling. Disable the subscription,
  * if it's required.
@@ -1755,7 +1601,7 @@ TablesyncWorkerMain(Datum main_arg)
 
 	run_tablesync_worker();
 
-	finish_sync_worker();
+	FinishSyncWorker();
 }
 
 /*
@@ -1773,7 +1619,7 @@ AllTablesyncsReady(void)
 	bool		has_subrels = false;
 
 	/* We need up-to-date sync state info for subscription tables here. */
-	has_subrels = FetchTableStates(&started_tx);
+	has_subrels = FetchRelationStates(&started_tx);
 
 	if (started_tx)
 	{
@@ -1785,25 +1631,25 @@ AllTablesyncsReady(void)
 	 * Return false when there are no tables in subscription or not all tables
 	 * are in ready state; true otherwise.
 	 */
-	return has_subrels && (table_states_not_ready == NIL);
+	return has_subrels && (relation_states_not_ready == NIL);
 }
 
 /*
- * Return whether the subscription currently has any relations.
+ * Return whether the subscription currently has any tables.
  *
- * Note: Unlike HasSubscriptionRelations(), this function relies on cached
- * information for subscription relations. Additionally, it should not be
+ * Note: Unlike HasSubscriptionTables(), this function relies on cached
+ * information for subscription tables. Additionally, it should not be
  * invoked outside of apply or tablesync workers, as MySubscription must be
  * initialized first.
  */
 bool
-HasSubscriptionRelationsCached(void)
+HasSubscriptionTablesCached(void)
 {
 	bool		started_tx;
 	bool		has_subrels;
 
 	/* We need up-to-date subscription tables info here */
-	has_subrels = FetchTableStates(&started_tx);
+	has_subrels = FetchRelationStates(&started_tx);
 
 	if (started_tx)
 	{
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 419e478b4c6..3c58ad88476 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -91,7 +91,7 @@
  * behave as if two_phase = off. When the apply worker detects that all
  * tablesyncs have become READY (while the tri-state was PENDING) it will
  * restart the apply worker process. This happens in
- * process_syncing_tables_for_apply.
+ * ProcessSyncingTablesForApply.
  *
  * When the (re-started) apply worker finds that all tablesyncs are READY for a
  * two_phase tri-state of PENDING it start streaming messages with the
@@ -1243,7 +1243,7 @@ apply_handle_commit(StringInfo s)
 	apply_handle_commit_internal(&commit_data);
 
 	/* Process any tables that are being synchronized in parallel. */
-	process_syncing_tables(commit_data.end_lsn);
+	ProcessSyncingRelations(commit_data.end_lsn);
 
 	pgstat_report_activity(STATE_IDLE, NULL);
 	reset_apply_error_context_info();
@@ -1365,7 +1365,7 @@ apply_handle_prepare(StringInfo s)
 	in_remote_transaction = false;
 
 	/* Process any tables that are being synchronized in parallel. */
-	process_syncing_tables(prepare_data.end_lsn);
+	ProcessSyncingRelations(prepare_data.end_lsn);
 
 	/*
 	 * Since we have already prepared the transaction, in a case where the
@@ -1421,7 +1421,7 @@ apply_handle_commit_prepared(StringInfo s)
 	in_remote_transaction = false;
 
 	/* Process any tables that are being synchronized in parallel. */
-	process_syncing_tables(prepare_data.end_lsn);
+	ProcessSyncingRelations(prepare_data.end_lsn);
 
 	clear_subscription_skip_lsn(prepare_data.end_lsn);
 
@@ -1487,7 +1487,7 @@ apply_handle_rollback_prepared(StringInfo s)
 	in_remote_transaction = false;
 
 	/* Process any tables that are being synchronized in parallel. */
-	process_syncing_tables(rollback_data.rollback_end_lsn);
+	ProcessSyncingRelations(rollback_data.rollback_end_lsn);
 
 	pgstat_report_activity(STATE_IDLE, NULL);
 	reset_apply_error_context_info();
@@ -1622,7 +1622,7 @@ apply_handle_stream_prepare(StringInfo s)
 	pgstat_report_stat(false);
 
 	/* Process any tables that are being synchronized in parallel. */
-	process_syncing_tables(prepare_data.end_lsn);
+	ProcessSyncingRelations(prepare_data.end_lsn);
 
 	/*
 	 * Similar to prepare case, the subskiplsn could be left in a case of
@@ -2464,7 +2464,7 @@ apply_handle_stream_commit(StringInfo s)
 	}
 
 	/* Process any tables that are being synchronized in parallel. */
-	process_syncing_tables(commit_data.end_lsn);
+	ProcessSyncingRelations(commit_data.end_lsn);
 
 	pgstat_report_activity(STATE_IDLE, NULL);
 
@@ -4133,7 +4133,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 			maybe_reread_subscription();
 
 			/* Process any table synchronization changes. */
-			process_syncing_tables(last_received);
+			ProcessSyncingRelations(last_received);
 		}
 
 		/* Cleanup the memory. */
@@ -4623,7 +4623,7 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data)
 	 * RDT_GET_CANDIDATE_XID phase in such cases, this is unsafe. If users
 	 * concurrently add tables to the subscription, the apply worker may not
 	 * process invalidations in time. Consequently,
-	 * HasSubscriptionRelationsCached() might miss the new tables, leading to
+	 * HasSubscriptionTablesCached() might miss the new tables, leading to
 	 * premature advancement of oldest_nonremovable_xid.
 	 *
 	 * Performing the check during RDT_WAIT_FOR_LOCAL_FLUSH is safe, as
@@ -4637,7 +4637,7 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data)
 	 * subscription tables at this stage to prevent unnecessary tuple
 	 * retention.
 	 */
-	if (HasSubscriptionRelationsCached() && !AllTablesyncsReady())
+	if (HasSubscriptionTablesCached() && !AllTablesyncsReady())
 	{
 		TimestampTz now;
 
@@ -5876,7 +5876,7 @@ SetupApplyOrSyncWorker(int worker_slot)
 	 * the subscription relation state.
 	 */
 	CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
-								  invalidate_syncing_table_states,
+								  InvalidateSyncingRelStates,
 								  (Datum) 0);
 }
 
diff --git a/src/bin/pg_dump/common.c b/src/bin/pg_dump/common.c
index a1976fae607..4e7303ea631 100644
--- a/src/bin/pg_dump/common.c
+++ b/src/bin/pg_dump/common.c
@@ -244,8 +244,8 @@ getSchemaData(Archive *fout, int *numTablesPtr)
 	pg_log_info("reading subscriptions");
 	getSubscriptions(fout);
 
-	pg_log_info("reading subscription membership of tables");
-	getSubscriptionTables(fout);
+	pg_log_info("reading subscription membership of relations");
+	getSubscriptionRelations(fout);
 
 	free(inhinfo);				/* not needed any longer */
 
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 641bece12c7..890db7b08c2 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -5305,12 +5305,12 @@ getSubscriptions(Archive *fout)
 }
 
 /*
- * getSubscriptionTables
- *	  Get information about subscription membership for dumpable tables. This
+ * getSubscriptionRelations
+ *	  Get information about subscription membership for dumpable relations. This
  *    will be used only in binary-upgrade mode for PG17 or later versions.
  */
 void
-getSubscriptionTables(Archive *fout)
+getSubscriptionRelations(Archive *fout)
 {
 	DumpOptions *dopt = fout->dopt;
 	SubscriptionInfo *subinfo = NULL;
@@ -5364,7 +5364,7 @@ getSubscriptionTables(Archive *fout)
 
 		tblinfo = findTableByOid(relid);
 		if (tblinfo == NULL)
-			pg_fatal("failed sanity check, table with OID %u not found",
+			pg_fatal("failed sanity check, relation with OID %u not found",
 					 relid);
 
 		/* OK, make a DumpableObject for this relationship */
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index fa6d1a510f7..72a00e1bc20 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -829,6 +829,6 @@ extern void getPublicationNamespaces(Archive *fout);
 extern void getPublicationTables(Archive *fout, TableInfo tblinfo[],
 								 int numTables);
 extern void getSubscriptions(Archive *fout);
-extern void getSubscriptionTables(Archive *fout);
+extern void getSubscriptionRelations(Archive *fout);
 
 #endif							/* PG_DUMP_H */
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index 02f97a547dd..61b63c6bb7a 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -89,7 +89,7 @@ extern void UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
 extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn);
 extern void RemoveSubscriptionRel(Oid subid, Oid relid);
 
-extern bool HasSubscriptionRelations(Oid subid);
+extern bool HasSubscriptionTables(Oid subid);
 extern List *GetSubscriptionRelations(Oid subid, bool not_ready);
 
 extern void UpdateDeadTupleRetentionStatus(Oid subid, bool active);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index de003802612..5f11c4de217 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -251,6 +251,8 @@ extern PGDLLIMPORT bool in_remote_transaction;
 
 extern PGDLLIMPORT bool InitializingApplyWorker;
 
+extern PGDLLIMPORT List *relation_states_not_ready;
+
 extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
 												bool only_running);
@@ -272,12 +274,16 @@ extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
 											   char *originname, Size szoriginname);
 
 extern bool AllTablesyncsReady(void);
-extern bool HasSubscriptionRelationsCached(void);
+extern bool HasSubscriptionTablesCached(void);
 extern void UpdateTwoPhaseState(Oid suboid, char new_state);
 
-extern void process_syncing_tables(XLogRecPtr current_lsn);
-extern void invalidate_syncing_table_states(Datum arg, int cacheid,
-											uint32 hashvalue);
+extern void ProcessSyncingTablesForSync(XLogRecPtr current_lsn);
+extern void ProcessSyncingTablesForApply(XLogRecPtr current_lsn);
+
+pg_noreturn extern void FinishSyncWorker(void);
+extern void InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue);
+extern void ProcessSyncingRelations(XLogRecPtr current_lsn);
+extern bool FetchRelationStates(bool *started_tx);
 
 extern void stream_start_internal(TransactionId xid, bool first_segment);
 extern void stream_stop_internal(TransactionId xid);
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 5290b91e83e..ee1cab6190f 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2922,7 +2922,7 @@ SyncRepStandbyData
 SyncRequestHandler
 SyncRequestType
 SyncStandbySlotsConfigData
-SyncingTablesState
+SyncingRelationsState
 SysFKRelationship
 SysScanDesc
 SyscacheCallbackFunction
-- 
2.31.1

