From f314184765c5180b26cea8ffba2038941e4b199c Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Thu, 15 Jun 2023 12:19:49 +1000
Subject: [PATCH v1] Consistent naming of LR workers

---
 .../replication/logical/applyparallelworker.c      | 34 +++++++++++++++-------
 src/backend/replication/logical/launcher.c         |  6 ++--
 src/backend/replication/logical/tablesync.c        |  8 +++--
 src/backend/replication/logical/worker.c           | 27 +++++++++++------
 src/include/replication/worker_internal.h          |  4 +++
 5 files changed, 54 insertions(+), 25 deletions(-)

diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index 82c1ddc..979b1d4 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -716,7 +716,9 @@ ProcessParallelApplyInterrupts(void)
 	if (ShutdownRequestPending)
 	{
 		ereport(LOG,
-				(errmsg("logical replication parallel apply worker for subscription \"%s\" has finished",
+		/* translator: first %s is the name of logical replication worker */
+				(errmsg("%s for subscription \"%s\" has finished",
+						LR_WORKER_NAME_APPLY_PARALLEL,
 						MySubscription->name)));
 
 		proc_exit(0);
@@ -821,8 +823,9 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh)
 			Assert(shmq_res == SHM_MQ_DETACHED);
 
 			ereport(ERROR,
+			/* translator: first %s is the name of logical replication worker */
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-					 errmsg("lost connection to the logical replication apply worker")));
+					 errmsg("lost connection to the %s", LR_WORKER_NAME_APPLY)));
 		}
 
 		MemoryContextReset(ApplyMessageContext);
@@ -1024,9 +1027,9 @@ HandleParallelApplyMessage(StringInfo msg)
 				 */
 				if (edata.context)
 					edata.context = psprintf("%s\n%s", edata.context,
-											 _("logical replication parallel apply worker"));
+											LR_WORKER_NAME_APPLY_PARALLEL);
 				else
-					edata.context = pstrdup(_("logical replication parallel apply worker"));
+					edata.context = pstrdup(LR_WORKER_NAME_APPLY_PARALLEL);
 
 				/*
 				 * Context beyond that should use the error context callbacks
@@ -1040,7 +1043,8 @@ HandleParallelApplyMessage(StringInfo msg)
 				 */
 				ereport(ERROR,
 						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-						 errmsg("logical replication parallel apply worker exited due to error"),
+				/* translator: first %s is the name of logical replication worker */
+						 errmsg("%s exited due to error", LR_WORKER_NAME_APPLY_PARALLEL),
 						 errcontext("%s", edata.context)));
 			}
 
@@ -1054,7 +1058,9 @@ HandleParallelApplyMessage(StringInfo msg)
 			break;
 
 		default:
-			elog(ERROR, "unrecognized message type received from logical replication parallel apply worker: %c (message length %d bytes)",
+			/* translator: first %s is the name of logical replication worker */
+			elog(ERROR, "unrecognized message type received %s: %c (message length %d bytes)",
+				 LR_WORKER_NAME_APPLY_PARALLEL,
 				 msgtype, msg->len);
 	}
 }
@@ -1126,8 +1132,9 @@ HandleParallelApplyMessages(void)
 		}
 		else
 			ereport(ERROR,
+			/* translator: first %s is the name of logical replication worker */
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-					 errmsg("lost connection to the logical replication parallel apply worker")));
+					 errmsg("lost connection to the %s", LR_WORKER_NAME_APPLY_PARALLEL)));
 	}
 
 	MemoryContextSwitchTo(oldcontext);
@@ -1215,7 +1222,9 @@ pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo,
 							   bool stream_locked)
 {
 	ereport(LOG,
-			(errmsg("logical replication apply worker will serialize the remaining changes of remote transaction %u to a file",
+			/* translator: first %s is the name of logical replication worker */
+			(errmsg("%s will serialize the remaining changes of remote transaction %u to a file",
+					LR_WORKER_NAME_APPLY,
 					winfo->shared->xid)));
 
 	/*
@@ -1299,8 +1308,9 @@ pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo)
 	 */
 	if (pa_get_xact_state(winfo->shared) != PARALLEL_TRANS_FINISHED)
 		ereport(ERROR,
+		/* translator: first %s is the name of logical replication worker */
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-				 errmsg("lost connection to the logical replication parallel apply worker")));
+				 errmsg("lost connection to the %s", LR_WORKER_NAME_APPLY_PARALLEL)));
 }
 
 /*
@@ -1373,7 +1383,8 @@ pa_start_subtrans(TransactionId current_xid, TransactionId top_xid)
 		pa_savepoint_name(MySubscription->oid, current_xid,
 						  spname, sizeof(spname));
 
-		elog(DEBUG1, "defining savepoint %s in logical replication parallel apply worker", spname);
+		/* translator: second %s is the name of logical replication worker */
+		elog(DEBUG1, "defining savepoint %s in %s", LR_WORKER_NAME_APPLY_PARALLEL, spname);
 
 		/* We must be in transaction block to define the SAVEPOINT. */
 		if (!IsTransactionBlock())
@@ -1468,7 +1479,8 @@ pa_stream_abort(LogicalRepStreamAbortData *abort_data)
 
 		pa_savepoint_name(MySubscription->oid, subxid, spname, sizeof(spname));
 
-		elog(DEBUG1, "rolling back to savepoint %s in logical replication parallel apply worker", spname);
+		/* translator: second %s is the name of logical replication worker */
+		elog(DEBUG1, "rolling back to savepoint %s in %s", LR_WORKER_NAME_APPLY_PARALLEL, spname);
 
 		/*
 		 * Search the subxactlist, determine the offset tracked for the
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 87b5593..ee47793 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -465,13 +465,13 @@ retry:
 
 	if (OidIsValid(relid))
 		snprintf(bgw.bgw_name, BGW_MAXLEN,
-				 "logical replication worker for subscription %u sync %u", subid, relid);
+				 "%s for subscription %u sync %u", LR_WORKER_NAME_TABLESYNC, subid, relid);
 	else if (is_parallel_apply_worker)
 		snprintf(bgw.bgw_name, BGW_MAXLEN,
-				 "logical replication parallel apply worker for subscription %u", subid);
+				 "%s for subscription %u", LR_WORKER_NAME_APPLY_PARALLEL, subid);
 	else
 		snprintf(bgw.bgw_name, BGW_MAXLEN,
-				 "logical replication apply worker for subscription %u", subid);
+				 "%s for subscription %u", LR_WORKER_NAME_APPLY, subid);
 
 	if (is_parallel_apply_worker)
 		snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index abae8d4..a410520 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -150,7 +150,9 @@ finish_sync_worker(void)
 
 	StartTransactionCommand();
 	ereport(LOG,
-			(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
+	/* translator: first %s is the name of logical replication worker */
+			(errmsg("%s for subscription \"%s\", table \"%s\" has finished",
+					LR_WORKER_NAME_TABLESYNC,
 					MySubscription->name,
 					get_rel_name(MyLogicalRepWorker->relid))));
 	CommitTransactionCommand();
@@ -619,7 +621,9 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 			if (AllTablesyncsReady())
 			{
 				ereport(LOG,
-						(errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
+				/* translator: first %s is the name of logical replication worker */
+						(errmsg("%s for subscription \"%s\" will restart so that two_phase can be enabled",
+								LR_WORKER_NAME_APPLY,
 								MySubscription->name)));
 				should_exit = true;
 			}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 0ee764d..b8c9eba 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -442,11 +442,11 @@ static const char *
 get_worker_name(void)
 {
 	if (am_tablesync_worker())
-		return _("logical replication table synchronization worker");
+		return LR_WORKER_NAME_TABLESYNC;
 	else if (am_parallel_apply_worker())
-		return _("logical replication parallel apply worker");
+		return LR_WORKER_NAME_APPLY_PARALLEL;
 	else
-		return _("logical replication apply worker");
+		return LR_WORKER_NAME_APPLY;
 }
 
 /*
@@ -509,7 +509,9 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
 			rel->state != SUBREL_STATE_UNKNOWN)
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-					 errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
+			/* translator: first %s is the name of logical replication worker */
+					 errmsg("%s for subscription \"%s\" will stop",
+							LR_WORKER_NAME_APPLY_PARALLEL,
 							MySubscription->name),
 					 errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
 
@@ -1071,7 +1073,8 @@ apply_handle_begin_prepare(StringInfo s)
 	if (am_tablesync_worker())
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
-				 errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
+				 errmsg_internal("%s received a BEGIN PREPARE message",
+								 LR_WORKER_NAME_TABLESYNC)));
 
 	/* There must not be an active streaming transaction. */
 	Assert(!TransactionIdIsValid(stream_xid));
@@ -1310,7 +1313,8 @@ apply_handle_stream_prepare(StringInfo s)
 	if (am_tablesync_worker())
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
-				 errmsg_internal("tablesync worker received a STREAM PREPARE message")));
+				 errmsg_internal("%s received a STREAM PREPARE message",
+								 LR_WORKER_NAME_TABLESYNC)));
 
 	logicalrep_read_stream_prepare(s, &prepare_data);
 	set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
@@ -3950,7 +3954,9 @@ maybe_reread_subscription(void)
 	{
 		if (am_parallel_apply_worker())
 			ereport(LOG,
-					(errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
+			/* translator: first %s is the name of logical replication worker */
+					(errmsg("%s for subscription \"%s\" will stop because of a parameter change",
+							LR_WORKER_NAME_APPLY_PARALLEL,
 							MySubscription->name)));
 		else
 			ereport(LOG,
@@ -4512,7 +4518,9 @@ InitializeApplyWorker(void)
 
 	if (am_tablesync_worker())
 		ereport(LOG,
-				(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
+		/* translator: first %s is the name of logical replication worker */
+				(errmsg("%s for subscription \"%s\", table \"%s\" has started",
+						LR_WORKER_NAME_TABLESYNC,
 						MySubscription->name,
 						get_rel_name(MyLogicalRepWorker->relid))));
 	else
@@ -4707,7 +4715,8 @@ ApplyWorkerMain(Datum main_arg)
 		}
 
 		ereport(DEBUG1,
-				(errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
+				(errmsg_internal("%s for subscription \"%s\" two_phase is %s",
+								 LR_WORKER_NAME_APPLY,
 								 MySubscription->name,
 								 MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
 								 MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 343e781..aeb26dd 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -26,6 +26,10 @@
 #include "storage/shm_toc.h"
 #include "storage/spin.h"
 
+/* Names for the different kinds of logical replication workers. */
+#define LR_WORKER_NAME_TABLESYNC _("logical replication table synchronization worker")
+#define LR_WORKER_NAME_APPLY _("logical replication apply worker")
+#define LR_WORKER_NAME_APPLY_PARALLEL _("logical replication parallel apply worker")
 
 typedef struct LogicalRepWorker
 {
-- 
1.8.3.1

