From 35bd37aff2be272aa1ce3a0edd70f0e3f1373563 Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Wed, 2 Aug 2023 16:34:24 +1000
Subject: [PATCH v3] Add LogicalRepWorkerType enum

---
 .../replication/logical/applyparallelworker.c      | 12 +++---
 src/backend/replication/logical/launcher.c         | 24 ++++++++----
 src/backend/replication/logical/tablesync.c        |  4 +-
 src/backend/replication/logical/worker.c           | 44 +++++++++++-----------
 src/include/replication/worker_internal.h          | 33 ++++++++--------
 src/tools/pgindent/typedefs.list                   |  1 +
 6 files changed, 64 insertions(+), 54 deletions(-)

diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index 6fb9614..6d25fde 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -265,7 +265,7 @@ static bool
 pa_can_start(void)
 {
 	/* Only leader apply workers can start parallel apply workers. */
-	if (!am_leader_apply_worker())
+	if (!IsLeaderApplyWorker())
 		return false;
 
 	/*
@@ -555,7 +555,7 @@ pa_find_worker(TransactionId xid)
 static void
 pa_free_worker(ParallelApplyWorkerInfo *winfo)
 {
-	Assert(!am_parallel_apply_worker());
+	Assert(!IsParallelApplyWorker());
 	Assert(winfo->in_use);
 	Assert(pa_get_xact_state(winfo->shared) == PARALLEL_TRANS_FINISHED);
 
@@ -1506,7 +1506,7 @@ pa_set_fileset_state(ParallelApplyWorkerShared *wshared,
 
 	if (fileset_state == FS_SERIALIZE_DONE)
 	{
-		Assert(am_leader_apply_worker());
+		Assert(IsLeaderApplyWorker());
 		Assert(MyLogicalRepWorker->stream_fileset);
 		wshared->fileset = *MyLogicalRepWorker->stream_fileset;
 	}
@@ -1522,7 +1522,7 @@ pa_get_fileset_state(void)
 {
 	PartialFileSetState fileset_state;
 
-	Assert(am_parallel_apply_worker());
+	Assert(IsParallelApplyWorker());
 
 	SpinLockAcquire(&MyParallelShared->mutex);
 	fileset_state = MyParallelShared->fileset_state;
@@ -1593,7 +1593,7 @@ pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)
 void
 pa_decr_and_wait_stream_block(void)
 {
-	Assert(am_parallel_apply_worker());
+	Assert(IsParallelApplyWorker());
 
 	/*
 	 * It is only possible to not have any pending stream chunks when we are
@@ -1620,7 +1620,7 @@ pa_decr_and_wait_stream_block(void)
 void
 pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
 {
-	Assert(am_leader_apply_worker());
+	Assert(IsLeaderApplyWorker());
 
 	/*
 	 * Unlock the shared object lock so that parallel apply worker can
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 542af7d..9022b64 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -259,7 +259,7 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
 		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
 
 		/* Skip parallel apply workers. */
-		if (isParallelApplyWorker(w))
+		if (is_worker_type(w, TYPE_PARALLEL_APPLY_WORKER))
 			continue;
 
 		if (w->in_use && w->subid == subid && w->relid == relid &&
@@ -427,6 +427,13 @@ retry:
 	}
 
 	/* Prepare the worker slot. */
+	if (OidIsValid(relid))
+		worker->type = TYPE_TABLESYNC_WORKER;
+	else if (is_parallel_apply_worker)
+		worker->type = TYPE_PARALLEL_APPLY_WORKER;
+	else
+		worker->type = TYPE_APPLY_WORKER;
+
 	worker->launch_time = now;
 	worker->in_use = true;
 	worker->generation++;
@@ -601,7 +608,7 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 
 	if (worker)
 	{
-		Assert(!isParallelApplyWorker(worker));
+		Assert(!is_worker_type(worker, TYPE_PARALLEL_APPLY_WORKER));
 		logicalrep_worker_stop_internal(worker, SIGTERM);
 	}
 
@@ -643,7 +650,7 @@ logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
 	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 
 	worker = &LogicalRepCtx->workers[slot_no];
-	Assert(isParallelApplyWorker(worker));
+	Assert(is_worker_type(worker, TYPE_PARALLEL_APPLY_WORKER));
 
 	/*
 	 * Only stop the worker if the generation matches and the worker is alive.
@@ -729,7 +736,7 @@ static void
 logicalrep_worker_detach(void)
 {
 	/* Stop the parallel apply workers. */
-	if (am_leader_apply_worker())
+	if (IsLeaderApplyWorker())
 	{
 		List	   *workers;
 		ListCell   *lc;
@@ -749,7 +756,7 @@ logicalrep_worker_detach(void)
 		{
 			LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
 
-			if (isParallelApplyWorker(w))
+			if (is_worker_type(w, TYPE_PARALLEL_APPLY_WORKER))
 				logicalrep_worker_stop_internal(w, SIGTERM);
 		}
 
@@ -772,6 +779,7 @@ logicalrep_worker_cleanup(LogicalRepWorker *worker)
 {
 	Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));
 
+	worker->type = TYPE_UNKNOWN;
 	worker->in_use = false;
 	worker->proc = NULL;
 	worker->dbid = InvalidOid;
@@ -868,7 +876,7 @@ logicalrep_pa_worker_count(Oid subid)
 	{
 		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
 
-		if (w->subid == subid && isParallelApplyWorker(w))
+		if (w->subid == subid && is_worker_type(w, TYPE_PARALLEL_APPLY_WORKER))
 			res++;
 	}
 
@@ -1237,7 +1245,7 @@ GetLeaderApplyWorkerPid(pid_t pid)
 	{
 		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
 
-		if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid)
+		if (is_worker_type(w, TYPE_PARALLEL_APPLY_WORKER) && w->proc && pid == w->proc->pid)
 		{
 			leader_pid = w->leader_pid;
 			break;
@@ -1290,7 +1298,7 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
 			nulls[1] = true;
 		values[2] = Int32GetDatum(worker_pid);
 
-		if (isParallelApplyWorker(&worker))
+		if (is_worker_type(&worker, TYPE_PARALLEL_APPLY_WORKER))
 			values[3] = Int32GetDatum(worker.leader_pid);
 		else
 			nulls[3] = true;
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 6d46165..e359286 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -652,10 +652,10 @@ process_syncing_tables(XLogRecPtr current_lsn)
 	 * that are in a READY state. See pa_can_start() and
 	 * should_apply_changes_for_rel().
 	 */
-	if (am_parallel_apply_worker())
+	if (IsParallelApplyWorker())
 		return;
 
-	if (am_tablesync_worker())
+	if (IsTablesyncWorker())
 		process_syncing_tables_for_sync(current_lsn);
 	else
 		process_syncing_tables_for_apply(current_lsn);
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index dd353fd..d165db6 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -486,9 +486,9 @@ ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
 static bool
 should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
 {
-	if (am_tablesync_worker())
+	if (IsTablesyncWorker())
 		return MyLogicalRepWorker->relid == rel->localreloid;
-	else if (am_parallel_apply_worker())
+	else if (IsParallelApplyWorker())
 	{
 		/* We don't synchronize rel's that are in unknown state. */
 		if (rel->state != SUBREL_STATE_READY &&
@@ -1054,7 +1054,7 @@ apply_handle_begin_prepare(StringInfo s)
 	LogicalRepPreparedTxnData begin_data;
 
 	/* Tablesync should never receive prepare. */
-	if (am_tablesync_worker())
+	if (IsTablesyncWorker())
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
 				 errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
@@ -1293,7 +1293,7 @@ apply_handle_stream_prepare(StringInfo s)
 				 errmsg_internal("STREAM PREPARE message without STREAM STOP")));
 
 	/* Tablesync should never receive prepare. */
-	if (am_tablesync_worker())
+	if (IsTablesyncWorker())
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
 				 errmsg_internal("tablesync worker received a STREAM PREPARE message")));
@@ -1423,7 +1423,7 @@ apply_handle_origin(StringInfo s)
 	 */
 	if (!in_streamed_transaction &&
 		(!in_remote_transaction ||
-		 (IsTransactionState() && !am_tablesync_worker())))
+		 (IsTransactionState() && !IsTablesyncWorker())))
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
 				 errmsg_internal("ORIGIN message sent out of order")));
@@ -2020,7 +2020,7 @@ apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
 	int			fileno;
 	off_t		offset;
 
-	if (!am_parallel_apply_worker())
+	if (!IsParallelApplyWorker())
 		maybe_start_skipping_changes(lsn);
 
 	/* Make sure we have an open transaction */
@@ -3440,7 +3440,7 @@ store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
 	 * Skip for parallel apply workers, because the lsn_mapping is maintained
 	 * by the leader apply worker.
 	 */
-	if (am_parallel_apply_worker())
+	if (IsParallelApplyWorker())
 		return;
 
 	/* Need to do this in permanent context */
@@ -3832,7 +3832,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
 static void
 apply_worker_exit(void)
 {
-	if (am_parallel_apply_worker())
+	if (IsParallelApplyWorker())
 	{
 		/*
 		 * Don't stop the parallel apply worker as the leader will detect the
@@ -3851,7 +3851,7 @@ apply_worker_exit(void)
 	 * subscription is still active, and so that we won't leak that hash table
 	 * entry if it isn't.
 	 */
-	if (!am_tablesync_worker())
+	if (!IsTablesyncWorker())
 		ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
 
 	proc_exit(0);
@@ -3894,7 +3894,7 @@ maybe_reread_subscription(void)
 						MySubscription->name)));
 
 		/* Ensure we remove no-longer-useful entry for worker's start time */
-		if (!am_tablesync_worker() && !am_parallel_apply_worker())
+		if (!IsTablesyncWorker() && !IsParallelApplyWorker())
 			ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
 		proc_exit(0);
 	}
@@ -3932,7 +3932,7 @@ maybe_reread_subscription(void)
 		newsub->owner != MySubscription->owner ||
 		!equal(newsub->publications, MySubscription->publications))
 	{
-		if (am_parallel_apply_worker())
+		if (IsParallelApplyWorker())
 			ereport(LOG,
 					(errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
 							MySubscription->name)));
@@ -4359,7 +4359,7 @@ start_table_sync(XLogRecPtr *origin_startpos, char **myslotname)
 {
 	char	   *syncslotname = NULL;
 
-	Assert(am_tablesync_worker());
+	Assert(IsTablesyncWorker());
 
 	PG_TRY();
 	{
@@ -4416,7 +4416,7 @@ start_apply(XLogRecPtr origin_startpos)
 			 * idle state.
 			 */
 			AbortOutOfAnyTransaction();
-			pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
+			pgstat_report_subscription_error(MySubscription->oid, !IsTablesyncWorker());
 
 			PG_RE_THROW();
 		}
@@ -4465,7 +4465,7 @@ InitializeApplyWorker(void)
 						MyLogicalRepWorker->subid)));
 
 		/* Ensure we remove no-longer-useful entry for worker's start time */
-		if (!am_tablesync_worker() && !am_parallel_apply_worker())
+		if (!IsTablesyncWorker() && !IsParallelApplyWorker())
 			ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
 		proc_exit(0);
 	}
@@ -4491,7 +4491,7 @@ InitializeApplyWorker(void)
 								  subscription_change_cb,
 								  (Datum) 0);
 
-	if (am_tablesync_worker())
+	if (IsTablesyncWorker())
 		ereport(LOG,
 				(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
 						MySubscription->name,
@@ -4545,7 +4545,7 @@ ApplyWorkerMain(Datum main_arg)
 	elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
 		 MySubscription->conninfo);
 
-	if (am_tablesync_worker())
+	if (IsTablesyncWorker())
 	{
 		start_table_sync(&origin_startpos, &myslotname);
 
@@ -4658,7 +4658,7 @@ ApplyWorkerMain(Datum main_arg)
 	options.proto.logical.twophase = false;
 	options.proto.logical.origin = pstrdup(MySubscription->origin);
 
-	if (!am_tablesync_worker())
+	if (!IsTablesyncWorker())
 	{
 		/*
 		 * Even when the two_phase mode is requested by the user, it remains
@@ -4727,7 +4727,7 @@ DisableSubscriptionAndExit(void)
 
 	/* Report the worker failed during either table synchronization or apply */
 	pgstat_report_subscription_error(MyLogicalRepWorker->subid,
-									 !am_tablesync_worker());
+									 !IsTablesyncWorker());
 
 	/* Disable the subscription */
 	StartTransactionCommand();
@@ -4735,7 +4735,7 @@ DisableSubscriptionAndExit(void)
 	CommitTransactionCommand();
 
 	/* Ensure we remove no-longer-useful entry for worker's start time */
-	if (!am_tablesync_worker() && !am_parallel_apply_worker())
+	if (!IsTablesyncWorker() && !IsParallelApplyWorker())
 		ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
 
 	/* Notify the subscription has been disabled and exit */
@@ -4761,7 +4761,7 @@ IsLogicalWorker(void)
 bool
 IsLogicalParallelApplyWorker(void)
 {
-	return IsLogicalWorker() && am_parallel_apply_worker();
+	return IsLogicalWorker() && IsParallelApplyWorker();
 }
 
 /*
@@ -4826,7 +4826,7 @@ clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
 	XLogRecPtr	myskiplsn = MySubscription->skiplsn;
 	bool		started_tx = false;
 
-	if (likely(XLogRecPtrIsInvalid(myskiplsn)) || am_parallel_apply_worker())
+	if (likely(XLogRecPtrIsInvalid(myskiplsn)) || IsParallelApplyWorker())
 		return;
 
 	if (!IsTransactionState())
@@ -5060,7 +5060,7 @@ get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
 {
 	*winfo = NULL;
 
-	if (am_parallel_apply_worker())
+	if (IsParallelApplyWorker())
 	{
 		return TRANS_PARALLEL_APPLY;
 	}
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 343e781..2c9a069 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -27,8 +27,20 @@
 #include "storage/spin.h"
 
 
+/* Different types of worker */
+typedef enum LogicalRepWorkerType
+{
+	TYPE_UNKNOWN = 0,
+	TYPE_TABLESYNC_WORKER,
+	TYPE_APPLY_WORKER,
+	TYPE_PARALLEL_APPLY_WORKER
+} LogicalRepWorkerType;
+
 typedef struct LogicalRepWorker
 {
+	/* What type of worker is this? */
+	LogicalRepWorkerType type;
+
 	/* Time at which this worker was launched. */
 	TimestampTz launch_time;
 
@@ -305,25 +317,14 @@ extern void pa_decr_and_wait_stream_block(void);
 extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
 						   XLogRecPtr remote_lsn);
 
-#define isParallelApplyWorker(worker) ((worker)->leader_pid != InvalidPid)
-
 static inline bool
-am_tablesync_worker(void)
+is_worker_type(LogicalRepWorker *worker, LogicalRepWorkerType type)
 {
-	return OidIsValid(MyLogicalRepWorker->relid);
+	return worker->type == type;
 }
 
-static inline bool
-am_leader_apply_worker(void)
-{
-	return (!am_tablesync_worker() &&
-			!isParallelApplyWorker(MyLogicalRepWorker));
-}
-
-static inline bool
-am_parallel_apply_worker(void)
-{
-	return isParallelApplyWorker(MyLogicalRepWorker);
-}
+#define IsLeaderApplyWorker() is_worker_type(MyLogicalRepWorker, TYPE_APPLY_WORKER)
+#define IsParallelApplyWorker() is_worker_type(MyLogicalRepWorker, TYPE_PARALLEL_APPLY_WORKER)
+#define IsTablesyncWorker() is_worker_type(MyLogicalRepWorker, TYPE_TABLESYNC_WORKER)
 
 #endif							/* WORKER_INTERNAL_H */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index e941fb6..0b39394 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1498,6 +1498,7 @@ LogicalRepStreamAbortData
 LogicalRepTupleData
 LogicalRepTyp
 LogicalRepWorker
+LogicalRepWorkerType
 LogicalRewriteMappingData
 LogicalTape
 LogicalTapeSet
-- 
1.8.3.1

