From 55eb5d8980c59cd5743f941517bc3af2744d761e Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Mon, 26 Sep 2022 12:59:43 +1000
Subject: [PATCH v4 2/2] Add common function
 ReplicationOriginNameForLogicalRep.

Make a common replication origin name formatting function to replace multiple
snprintf() expressions. This also includes logic previously done by
ReplicationOriginNameForTablesync().

Peter Smith, reviewed by Aleksander Alekseev
Discussion: https://postgr.es/m/CAHut%2BPsa8hhfSE6ozUK-ih7GkQziAVAf4f3bqiXEj2nQiu-43g%40mail.gmail.com
---
 src/backend/commands/subscriptioncmds.c     | 15 +++++----
 src/backend/replication/logical/tablesync.c | 36 +++++++--------------
 src/backend/replication/logical/worker.c    | 35 +++++++++++++++++---
 src/include/replication/worker_internal.h   |  4 +--
 4 files changed, 52 insertions(+), 38 deletions(-)

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index f3bfcca434..97594cd9b1 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -657,7 +657,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 
 	recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
 
-	snprintf(originname, sizeof(originname), "pg_%u", subid);
+	ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
 	replorigin_create(originname);
 
 	/*
@@ -946,8 +946,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 					 * origin and by this time the origin might be already
 					 * removed. For these reasons, passing missing_ok = true.
 					 */
-					ReplicationOriginNameForTablesync(sub->oid, relid, originname,
-													  sizeof(originname));
+					ReplicationOriginNameForLogicalRep(sub->oid, relid, originname,
+													   sizeof(originname));
 					replorigin_drop_by_name(originname, true, false);
 				}
 
@@ -1315,7 +1315,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					char		originname[NAMEDATALEN];
 					XLogRecPtr	remote_lsn;
 
-					snprintf(originname, sizeof(originname), "pg_%u", subid);
+					ReplicationOriginNameForLogicalRep(subid, InvalidOid,
+													   originname, sizeof(originname));
 					originid = replorigin_by_name(originname, false);
 					remote_lsn = replorigin_get_progress(originid, false);
 
@@ -1521,8 +1522,8 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 		 * worker so passing missing_ok = true. This can happen for the states
 		 * before SUBREL_STATE_FINISHEDCOPY.
 		 */
-		ReplicationOriginNameForTablesync(subid, relid, originname,
-										  sizeof(originname));
+		ReplicationOriginNameForLogicalRep(subid, relid, originname,
+										   sizeof(originname));
 		replorigin_drop_by_name(originname, true, false);
 	}
 
@@ -1533,7 +1534,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	RemoveSubscriptionRel(subid, InvalidOid);
 
 	/* Remove the origin tracking if exists. */
-	snprintf(originname, sizeof(originname), "pg_%u", subid);
+	ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
 	replorigin_drop_by_name(originname, true, false);
 
 	/*
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 9e52fc401c..b6e0af0a76 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -353,10 +353,10 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 		 */
 		StartTransactionCommand();
 
-		ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid,
-										  MyLogicalRepWorker->relid,
-										  originname,
-										  sizeof(originname));
+		ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
+										   MyLogicalRepWorker->relid,
+										   originname,
+										   sizeof(originname));
 
 		/*
 		 * Resetting the origin session removes the ownership of the slot.
@@ -505,10 +505,10 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 				 * error while dropping we won't restart it to drop the
 				 * origin. So passing missing_ok = true.
 				 */
-				ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid,
-												  rstate->relid,
-												  originname,
-												  sizeof(originname));
+				ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
+												   rstate->relid,
+												   originname,
+												   sizeof(originname));
 				replorigin_drop_by_name(originname, true, false);
 
 				/*
@@ -1193,18 +1193,6 @@ ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
 			 relid, GetSystemIdentifier());
 }
 
-/*
- * Form the origin name for tablesync.
- *
- * Return the name in the supplied buffer.
- */
-void
-ReplicationOriginNameForTablesync(Oid suboid, Oid relid,
-								  char *originname, Size szorgname)
-{
-	snprintf(originname, szorgname, "pg_%u_%u", suboid, relid);
-}
-
 /*
  * Start syncing the table in the sync worker.
  *
@@ -1274,10 +1262,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 		   MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);
 
 	/* Assign the origin tracking record name. */
-	ReplicationOriginNameForTablesync(MySubscription->oid,
-									  MyLogicalRepWorker->relid,
-									  originname,
-									  sizeof(originname));
+	ReplicationOriginNameForLogicalRep(MySubscription->oid,
+									   MyLogicalRepWorker->relid,
+									   originname,
+									   sizeof(originname));
 
 	if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC)
 	{
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 188c51660e..bfa32391bf 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -364,6 +364,30 @@ static void apply_error_callback(void *arg);
 static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
 static inline void reset_apply_error_context_info(void);
 
+/*
+ * Form the origin name for the subscription.
+ *
+ * This is a common function for tablesync and other workers. Tablesync workers
+ * must pass a valid relid. Other callers must pass relid = InvalidOid.
+ *
+ * Return the name in the supplied buffer.
+ */
+void
+ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
+								   char *originname, Size szoriginname)
+{
+	if (OidIsValid(relid))
+	{
+		/* Replication origin name for tablesync workers. */
+		snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid);
+	}
+	else
+	{
+		/* Replication origin name for non-tablesync workers. */
+		snprintf(originname, szoriginname, "pg_%u", suboid);
+	}
+}
+
 /*
  * Should this worker apply changes for given relation.
  *
@@ -3679,10 +3703,10 @@ ApplyWorkerMain(Datum main_arg)
 		 * Allocate the origin name in long-lived context for error context
 		 * message.
 		 */
-		ReplicationOriginNameForTablesync(MySubscription->oid,
-										  MyLogicalRepWorker->relid,
-										  originname,
-										  sizeof(originname));
+		ReplicationOriginNameForLogicalRep(MySubscription->oid,
+										   MyLogicalRepWorker->relid,
+										   originname,
+										   sizeof(originname));
 		apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
 																   originname);
 	}
@@ -3707,7 +3731,8 @@ ApplyWorkerMain(Datum main_arg)
 
 		/* Setup replication origin tracking. */
 		StartTransactionCommand();
-		snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
+		ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
+										   originname, sizeof(originname));
 		originid = replorigin_by_name(originname, true);
 		if (!OidIsValid(originid))
 			originid = replorigin_create(originname);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index f82bc518c3..2b7114ff6d 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -92,8 +92,8 @@ extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
 
 extern int	logicalrep_sync_worker_count(Oid subid);
 
-extern void ReplicationOriginNameForTablesync(Oid suboid, Oid relid,
-											  char *originname, Size szorgname);
+extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
+											   char *originname, Size szoriginname);
 extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos);
 
 extern bool AllTablesyncsReady(void);
-- 
2.37.2

