From b0e09eec056e6ecaf9d7bea7b22af278a62ead61 Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Mon, 14 Aug 2023 16:21:45 +1000
Subject: [PATCH v8] Switch on worker type.

---
 src/backend/replication/logical/launcher.c  | 56 +++++++++++++++--------------
 src/backend/replication/logical/tablesync.c | 31 ++++++++++------
 src/backend/replication/logical/worker.c    | 42 +++++++++++++---------
 3 files changed, 75 insertions(+), 54 deletions(-)

diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 7cc0a16..2119bf6 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -468,39 +468,43 @@ retry:
 	bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
 	snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
 
-	if (is_parallel_apply_worker)
+	switch (worker->type)
 	{
-		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
-		snprintf(bgw.bgw_name, BGW_MAXLEN,
-				 "logical replication parallel apply worker for subscription %u",
-				 subid);
-		snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
-	}
-	else if (is_tablesync_worker)
-	{
-		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
-		snprintf(bgw.bgw_name, BGW_MAXLEN,
-				 "logical replication tablesync worker for subscription %u sync %u",
-				 subid,
-				 relid);
-		snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
-	}
-	else
-	{
-		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
-		snprintf(bgw.bgw_name, BGW_MAXLEN,
-				 "logical replication apply worker for subscription %u",
-				 subid);
-		snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
+		case WORKERTYPE_APPLY:
+			snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
+			snprintf(bgw.bgw_name, BGW_MAXLEN,
+					 "logical replication apply worker for subscription %u",
+					 subid);
+			snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
+			break;
+
+		case WORKERTYPE_PARALLEL_APPLY:
+			snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
+			snprintf(bgw.bgw_name, BGW_MAXLEN,
+					 "logical replication parallel apply worker for subscription %u",
+					 subid);
+			snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
+
+			memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
+			break;
+
+		case WORKERTYPE_TABLESYNC:
+			snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
+			snprintf(bgw.bgw_name, BGW_MAXLEN,
+					 "logical replication tablesync worker for subscription %u sync %u",
+					 subid,
+					 relid);
+			snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
+			break;
+
+		case WORKERTYPE_UNKNOWN:
+			ereport(ERROR, errmsg_internal("logicalrep_worker_launch: unknown worker type"));
 	}
 
 	bgw.bgw_restart_time = BGW_NEVER_RESTART;
 	bgw.bgw_notify_pid = MyProcPid;
 	bgw.bgw_main_arg = Int32GetDatum(slot);
 
-	if (is_parallel_apply_worker)
-		memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
-
 	if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
 	{
 		/* Failed to start worker, so clean up the worker slot. */
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 67bdd14..89d9cec 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -649,18 +649,27 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 void
 process_syncing_tables(XLogRecPtr current_lsn)
 {
-	/*
-	 * Skip for parallel apply workers because they only operate on tables
-	 * that are in a READY state. See pa_can_start() and
-	 * should_apply_changes_for_rel().
-	 */
-	if (am_parallel_apply_worker())
-		return;
+	switch (MyLogicalRepWorker->type)
+	{
+		case WORKERTYPE_PARALLEL_APPLY:
+			/*
+			 * Skip for parallel apply workers because they only operate on tables
+			 * that are in a READY state. See pa_can_start() and
+			 * should_apply_changes_for_rel().
+			 */
+			break;
 
-	if (am_tablesync_worker())
-		process_syncing_tables_for_sync(current_lsn);
-	else
-		process_syncing_tables_for_apply(current_lsn);
+		case WORKERTYPE_TABLESYNC:
+			process_syncing_tables_for_sync(current_lsn);
+			break;
+
+		case WORKERTYPE_APPLY:
+			process_syncing_tables_for_apply(current_lsn);
+			break;
+
+		case WORKERTYPE_UNKNOWN:
+			ereport(ERROR, errmsg_internal("process_syncing_tables: Unknown worker type"));
+	}
 }
 
 /*
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index a20d4c1..f596366 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -485,25 +485,33 @@ ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
 static bool
 should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
 {
-	if (am_tablesync_worker())
-		return MyLogicalRepWorker->relid == rel->localreloid;
-	else if (am_parallel_apply_worker())
+	switch (MyLogicalRepWorker->type)
 	{
-		/* We don't synchronize rel's that are in unknown state. */
-		if (rel->state != SUBREL_STATE_READY &&
-			rel->state != SUBREL_STATE_UNKNOWN)
-			ereport(ERROR,
-					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-					 errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
-							MySubscription->name),
-					 errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
+		case WORKERTYPE_TABLESYNC:
+			return MyLogicalRepWorker->relid == rel->localreloid;
+
+		case WORKERTYPE_PARALLEL_APPLY:
+			/* We don't synchronize rel's that are in unknown state. */
+			if (rel->state != SUBREL_STATE_READY &&
+				rel->state != SUBREL_STATE_UNKNOWN)
+				ereport(ERROR,
+						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						 errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
+								MySubscription->name),
+						 errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
+
+			return rel->state == SUBREL_STATE_READY;
+
+		case WORKERTYPE_APPLY:
+			return (rel->state == SUBREL_STATE_READY ||
+					(rel->state == SUBREL_STATE_SYNCDONE &&
+					 rel->statelsn <= remote_final_lsn));
+
+		case WORKERTYPE_UNKNOWN:
+			ereport(ERROR, errmsg_internal("should_apply_changes_for_rel: Unknown worker type"));
+		}
 
-		return rel->state == SUBREL_STATE_READY;
-	}
-	else
-		return (rel->state == SUBREL_STATE_READY ||
-				(rel->state == SUBREL_STATE_SYNCDONE &&
-				 rel->statelsn <= remote_final_lsn));
+	return false; /* dummy for compiler */
 }
 
 /*
-- 
1.8.3.1

