From 1900bb2c1305b59713d66fa6194515cfdbf16c93 Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Tue, 8 Aug 2023 17:53:42 +1000
Subject: [PATCH v4] Switch on worker type.

Now that the LogicalRepWorker has an enum type we can code to switch on that type,
which should be more efficient than cascading if/else.
---
 src/backend/replication/logical/launcher.c  | 51 ++++++++++++++++-------------
 src/backend/replication/logical/tablesync.c | 31 +++++++++++-------
 src/backend/replication/logical/worker.c    | 42 ++++++++++++++----------
 3 files changed, 73 insertions(+), 51 deletions(-)

diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 7dc078a..45a7739 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -468,30 +468,35 @@ retry:
 	bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
 	snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
 
-	if (is_parallel_apply_wkr)
-	{
-		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
-		snprintf(bgw.bgw_name, BGW_MAXLEN,
-				 "logical replication parallel apply worker for subscription %u",
-				 subid);
-		snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
-	}
-	else if (is_tablesync_wkr)
+	switch (wtype)
 	{
-		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
-		snprintf(bgw.bgw_name, BGW_MAXLEN,
-				 "logical replication tablesync worker for subscription %u sync %u",
-				 subid,
-				 relid);
-		snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
-	}
-	else
-	{
-		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
-		snprintf(bgw.bgw_name, BGW_MAXLEN,
-				 "logical replication apply worker for subscription %u",
-				 subid);
-		snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
+		case WORKERTYPE_APPLY:
+			snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
+			snprintf(bgw.bgw_name, BGW_MAXLEN,
+					 "logical replication apply worker for subscription %u",
+					 subid);
+			snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
+			break;
+
+		case WORKERTYPE_PARALLEL_APPLY:
+			snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
+			snprintf(bgw.bgw_name, BGW_MAXLEN,
+					 "logical replication parallel apply worker for subscription %u",
+					 subid);
+			snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
+			break;
+
+		case WORKERTYPE_TABLESYNC:
+			snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
+			snprintf(bgw.bgw_name, BGW_MAXLEN,
+					 "logical replication tablesync worker for subscription %u sync %u",
+					 subid,
+					 relid);
+			snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
+			break;
+
+		case WORKERTYPE_UNKNOWN:
+			ereport(ERROR, errmsg_internal("logicalrep_worker_launch: unknown worker type"));
 	}
 
 	bgw.bgw_restart_time = BGW_NEVER_RESTART;
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 25343be..0b12461 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -649,18 +649,27 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 void
 process_syncing_tables(XLogRecPtr current_lsn)
 {
-	/*
-	 * Skip for parallel apply workers because they only operate on tables
-	 * that are in a READY state. See pa_can_start() and
-	 * should_apply_changes_for_rel().
-	 */
-	if (is_parallel_apply_worker(MyLogicalRepWorker))
-		return;
+	switch (MyLogicalRepWorker->type)
+	{
+		case WORKERTYPE_TABLESYNC:
+			process_syncing_tables_for_sync(current_lsn);
+			break;
 
-	if (is_tablesync_worker(MyLogicalRepWorker))
-		process_syncing_tables_for_sync(current_lsn);
-	else
-		process_syncing_tables_for_apply(current_lsn);
+		case WORKERTYPE_APPLY:
+			process_syncing_tables_for_apply(current_lsn);
+			break;
+
+		case WORKERTYPE_PARALLEL_APPLY:
+			/*
+			 * Skip for parallel apply workers because they only operate on tables
+			 * that are in a READY state. See pa_can_start() and
+			 * should_apply_changes_for_rel().
+			 */
+			break;
+
+		case WORKERTYPE_UNKNOWN:
+			ereport(ERROR, errmsg_internal("process_syncing_tables: Unknown worker type"));
+	}
 }
 
 /*
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 82b09ff..93188f5 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -485,25 +485,33 @@ ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
 static bool
 should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
 {
-	if (is_tablesync_worker(MyLogicalRepWorker))
-		return MyLogicalRepWorker->relid == rel->localreloid;
-	else if (is_parallel_apply_worker(MyLogicalRepWorker))
+	switch (MyLogicalRepWorker->type)
 	{
-		/* We don't synchronize rel's that are in unknown state. */
-		if (rel->state != SUBREL_STATE_READY &&
-			rel->state != SUBREL_STATE_UNKNOWN)
-			ereport(ERROR,
-					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-					 errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
-							MySubscription->name),
-					 errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
-
-		return rel->state == SUBREL_STATE_READY;
+		case WORKERTYPE_TABLESYNC:
+			return MyLogicalRepWorker->relid == rel->localreloid;
+
+		case WORKERTYPE_APPLY:
+			return (rel->state == SUBREL_STATE_READY ||
+					(rel->state == SUBREL_STATE_SYNCDONE &&
+					 rel->statelsn <= remote_final_lsn));
+
+		case WORKERTYPE_PARALLEL_APPLY:
+				/* We don't synchronize rel's that are in unknown state. */
+				if (rel->state != SUBREL_STATE_READY &&
+					rel->state != SUBREL_STATE_UNKNOWN)
+						ereport(ERROR,
+								(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+								 errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
+										MySubscription->name),
+								 errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
+
+				return rel->state == SUBREL_STATE_READY;
+
+		case WORKERTYPE_UNKNOWN:
+				ereport(ERROR, errmsg_internal("should_apply_changes_for_rel: Unknown worker type"));
 	}
-	else
-		return (rel->state == SUBREL_STATE_READY ||
-				(rel->state == SUBREL_STATE_SYNCDONE &&
-				 rel->statelsn <= remote_final_lsn));
+
+	return false; /* dummy for compiler */
 }
 
 /*
-- 
1.8.3.1

