From a5e6a3130e8e135ed2f88e0e47cb74c953dc0383 Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Fri, 28 Jul 2023 19:25:29 +1000
Subject: [PATCH v1] Switch on worker type

---
 src/backend/replication/logical/tablesync.c | 28 +++++++++++++--------
 src/backend/replication/logical/worker.c    | 39 ++++++++++++++++-------------
 2 files changed, 39 insertions(+), 28 deletions(-)

diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 6d46165..867105a 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -647,18 +647,24 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 void
 process_syncing_tables(XLogRecPtr current_lsn)
 {
-	/*
-	 * Skip for parallel apply workers because they only operate on tables
-	 * that are in a READY state. See pa_can_start() and
-	 * should_apply_changes_for_rel().
-	 */
-	if (am_parallel_apply_worker())
-		return;
+	switch (MyLogicalRepWorker->type)
+	{
+		case LR_WORKER_TABLESYNC:
+			process_syncing_tables_for_sync(current_lsn);
+			break;
 
-	if (am_tablesync_worker())
-		process_syncing_tables_for_sync(current_lsn);
-	else
-		process_syncing_tables_for_apply(current_lsn);
+		case LR_WORKER_APPLY:
+			process_syncing_tables_for_apply(current_lsn);
+			break;
+
+		case LR_WORKER_APPLY_PARALLEL:
+			/*
+			 * Skip for parallel apply workers because they only operate on tables
+			 * that are in a READY state. See pa_can_start() and
+			 * should_apply_changes_for_rel().
+			 */
+			break;
+	}
 }
 
 /*
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index dd353fd..e3dc861 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -486,25 +486,30 @@ ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
 static bool
 should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
 {
-	if (am_tablesync_worker())
-		return MyLogicalRepWorker->relid == rel->localreloid;
-	else if (am_parallel_apply_worker())
+	switch (MyLogicalRepWorker->type)
 	{
-		/* We don't synchronize rel's that are in unknown state. */
-		if (rel->state != SUBREL_STATE_READY &&
-			rel->state != SUBREL_STATE_UNKNOWN)
-			ereport(ERROR,
-					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-					 errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
-							MySubscription->name),
-					 errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
-
-		return rel->state == SUBREL_STATE_READY;
+		case LR_WORKER_TABLESYNC:
+			return MyLogicalRepWorker->relid == rel->localreloid;
+
+		case LR_WORKER_APPLY_PARALLEL:
+			/* We don't synchronize rel's that are in unknown state. */
+			if (rel->state != SUBREL_STATE_READY &&
+				rel->state != SUBREL_STATE_UNKNOWN)
+				ereport(ERROR,
+						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						 errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
+								MySubscription->name),
+						 errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
+
+			return rel->state == SUBREL_STATE_READY;
+
+		case LR_WORKER_APPLY:
+			return (rel->state == SUBREL_STATE_READY ||
+					(rel->state == SUBREL_STATE_SYNCDONE &&
+					 rel->statelsn <= remote_final_lsn));
 	}
-	else
-		return (rel->state == SUBREL_STATE_READY ||
-				(rel->state == SUBREL_STATE_SYNCDONE &&
-				 rel->statelsn <= remote_final_lsn));
+
+	return false; /* dummy for compiler */
 }
 
 /*
-- 
1.8.3.1

