From afd6e9fc22bed5e135d3cc5a5b970466f771c22f Mon Sep 17 00:00:00 2001
From: "Chao Li (Evan)" <lic@highgo.com>
Date: Tue, 30 Dec 2025 12:55:31 +0800
Subject: [PATCH v3 2/2] Consolidate replication origin session globals into a
 single state struct

Replace the separate global variables used to track replication origin
session state (origin ID, origin LSN, and origin timestamp) with a single
RepOriginSessionState struct.

This refactoring groups logically related session state into one object,
reducing the proliferation of loosely related globals and making the code
easier to read and reason about. All existing users are updated to refer
to replorigin_session_state.{origin, origin_lsn, origin_timestamp}.

There is no intended behavior change; this is a mechanical cleanup to
improve clarity and maintainability of replication origin handling.

Author: Chao Li <lic@highgo.com>
Discussion: https://postgr.es/m/CAEoWx2=pYvfRthXHTzSrOsf5_FfyY4zJyK4zV2v4W=yjUij1cA@mail.gmail.com
---
 src/backend/access/transam/twophase.c         | 30 ++++++++---------
 src/backend/access/transam/xact.c             | 32 +++++++++----------
 src/backend/access/transam/xloginsert.c       |  6 ++--
 .../replication/logical/applyparallelworker.c |  6 ++--
 src/backend/replication/logical/origin.c      | 18 +++++------
 src/backend/replication/logical/tablesync.c   |  4 +--
 src/backend/replication/logical/worker.c      | 28 ++++++++--------
 src/include/replication/origin.h              | 11 +++++--
 8 files changed, 70 insertions(+), 65 deletions(-)

diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 3bc85986829..fd69f687bf2 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -1157,13 +1157,13 @@ EndPrepare(GlobalTransaction gxact)
 	Assert(hdr->magic == TWOPHASE_MAGIC);
 	hdr->total_len = records.total_len + sizeof(pg_crc32c);
 
-	replorigin = (replorigin_session_origin != InvalidRepOriginId &&
-				  replorigin_session_origin != DoNotReplicateId);
+	replorigin = (replorigin_session_state.origin != InvalidRepOriginId &&
+				  replorigin_session_state.origin != DoNotReplicateId);
 
 	if (replorigin)
 	{
-		hdr->origin_lsn = replorigin_session_origin_lsn;
-		hdr->origin_timestamp = replorigin_session_origin_timestamp;
+		hdr->origin_lsn = replorigin_session_state.origin_lsn;
+		hdr->origin_timestamp = replorigin_session_state.origin_timestamp;
 	}
 
 	/*
@@ -1211,7 +1211,7 @@ EndPrepare(GlobalTransaction gxact)
 	if (replorigin)
 	{
 		/* Move LSNs forward for this replication origin */
-		replorigin_session_advance(replorigin_session_origin_lsn,
+		replorigin_session_advance(replorigin_session_state.origin_lsn,
 								   gxact->prepare_end_lsn);
 	}
 
@@ -2330,8 +2330,8 @@ RecordTransactionCommitPrepared(TransactionId xid,
 	 * Are we using the replication origins feature?  Or, in other words, are
 	 * we replaying remote actions?
 	 */
-	replorigin = (replorigin_session_origin != InvalidRepOriginId &&
-				  replorigin_session_origin != DoNotReplicateId);
+	replorigin = (replorigin_session_state.origin != InvalidRepOriginId &&
+				  replorigin_session_state.origin != DoNotReplicateId);
 
 	/* Load the injection point before entering the critical section */
 	INJECTION_POINT_LOAD("commit-after-delay-checkpoint");
@@ -2376,7 +2376,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
 
 	if (replorigin)
 		/* Move LSNs forward for this replication origin */
-		replorigin_session_advance(replorigin_session_origin_lsn,
+		replorigin_session_advance(replorigin_session_state.origin_lsn,
 								   XactLastRecEnd);
 
 	/*
@@ -2387,12 +2387,12 @@ RecordTransactionCommitPrepared(TransactionId xid,
 	 * We don't need to WAL-log anything here, as the commit record written
 	 * above already contains the data.
 	 */
-	if (!replorigin || replorigin_session_origin_timestamp == 0)
-		replorigin_session_origin_timestamp = committs;
+	if (!replorigin || replorigin_session_state.origin_timestamp == 0)
+		replorigin_session_state.origin_timestamp = committs;
 
 	TransactionTreeSetCommitTsData(xid, nchildren, children,
-								   replorigin_session_origin_timestamp,
-								   replorigin_session_origin);
+								   replorigin_session_state.origin_timestamp,
+								   replorigin_session_state.origin);
 
 	/*
 	 * We don't currently try to sleep before flush here ... nor is there any
@@ -2445,8 +2445,8 @@ RecordTransactionAbortPrepared(TransactionId xid,
 	 * Are we using the replication origins feature?  Or, in other words, are
 	 * we replaying remote actions?
 	 */
-	replorigin = (replorigin_session_origin != InvalidRepOriginId &&
-				  replorigin_session_origin != DoNotReplicateId);
+	replorigin = (replorigin_session_state.origin != InvalidRepOriginId &&
+				  replorigin_session_state.origin != DoNotReplicateId);
 
 	/*
 	 * Catch the scenario where we aborted partway through
@@ -2472,7 +2472,7 @@ RecordTransactionAbortPrepared(TransactionId xid,
 
 	if (replorigin)
 		/* Move LSNs forward for this replication origin */
-		replorigin_session_advance(replorigin_session_origin_lsn,
+		replorigin_session_advance(replorigin_session_state.origin_lsn,
 								   XactLastRecEnd);
 
 	/* Always flush, since we're about to remove the 2PC state file */
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 1b5c1f6b763..0c32f633c6d 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1413,8 +1413,8 @@ RecordTransactionCommit(void)
 		 * Are we using the replication origins feature?  Or, in other words,
 		 * are we replaying remote actions?
 		 */
-		replorigin = (replorigin_session_origin != InvalidRepOriginId &&
-					  replorigin_session_origin != DoNotReplicateId);
+		replorigin = (replorigin_session_state.origin != InvalidRepOriginId &&
+					  replorigin_session_state.origin != DoNotReplicateId);
 
 		/*
 		 * Mark ourselves as within our "commit critical section".  This
@@ -1462,7 +1462,7 @@ RecordTransactionCommit(void)
 
 		if (replorigin)
 			/* Move LSNs forward for this replication origin */
-			replorigin_session_advance(replorigin_session_origin_lsn,
+			replorigin_session_advance(replorigin_session_state.origin_lsn,
 									   XactLastRecEnd);
 
 		/*
@@ -1475,12 +1475,12 @@ RecordTransactionCommit(void)
 		 * written above already contains the data.
 		 */
 
-		if (!replorigin || replorigin_session_origin_timestamp == 0)
-			replorigin_session_origin_timestamp = GetCurrentTransactionStopTimestamp();
+		if (!replorigin || replorigin_session_state.origin_timestamp == 0)
+			replorigin_session_state.origin_timestamp = GetCurrentTransactionStopTimestamp();
 
 		TransactionTreeSetCommitTsData(xid, nchildren, children,
-									   replorigin_session_origin_timestamp,
-									   replorigin_session_origin);
+									   replorigin_session_state.origin_timestamp,
+									   replorigin_session_state.origin);
 	}
 
 	/*
@@ -1810,8 +1810,8 @@ RecordTransactionAbort(bool isSubXact)
 	 * Are we using the replication origins feature?  Or, in other words, are
 	 * we replaying remote actions?
 	 */
-	replorigin = (replorigin_session_origin != InvalidRepOriginId &&
-				  replorigin_session_origin != DoNotReplicateId);
+	replorigin = (replorigin_session_state.origin != InvalidRepOriginId &&
+				  replorigin_session_state.origin != DoNotReplicateId);
 
 	/* Fetch the data we need for the abort record */
 	nrels = smgrGetPendingDeletes(false, &rels);
@@ -1838,7 +1838,7 @@ RecordTransactionAbort(bool isSubXact)
 
 	if (replorigin)
 		/* Move LSNs forward for this replication origin */
-		replorigin_session_advance(replorigin_session_origin_lsn,
+		replorigin_session_advance(replorigin_session_state.origin_lsn,
 								   XactLastRecEnd);
 
 	/*
@@ -5927,12 +5927,12 @@ XactLogCommitRecord(TimestampTz commit_time,
 	}
 
 	/* dump transaction origin information */
-	if (replorigin_session_origin != InvalidRepOriginId)
+	if (replorigin_session_state.origin != InvalidRepOriginId)
 	{
 		xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN;
 
-		xl_origin.origin_lsn = replorigin_session_origin_lsn;
-		xl_origin.origin_timestamp = replorigin_session_origin_timestamp;
+		xl_origin.origin_lsn = replorigin_session_state.origin_lsn;
+		xl_origin.origin_timestamp = replorigin_session_state.origin_timestamp;
 	}
 
 	if (xl_xinfo.xinfo != 0)
@@ -6080,12 +6080,12 @@ XactLogAbortRecord(TimestampTz abort_time,
 	 * Dump transaction origin information. We need this during recovery to
 	 * update the replication origin progress.
 	 */
-	if (replorigin_session_origin != InvalidRepOriginId)
+	if (replorigin_session_state.origin != InvalidRepOriginId)
 	{
 		xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN;
 
-		xl_origin.origin_lsn = replorigin_session_origin_lsn;
-		xl_origin.origin_timestamp = replorigin_session_origin_timestamp;
+		xl_origin.origin_lsn = replorigin_session_state.origin_lsn;
+		xl_origin.origin_timestamp = replorigin_session_state.origin_timestamp;
 	}
 
 	if (xl_xinfo.xinfo != 0)
diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c
index a56d5a55282..ed576ae578b 100644
--- a/src/backend/access/transam/xloginsert.c
+++ b/src/backend/access/transam/xloginsert.c
@@ -861,11 +861,11 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 
 	/* followed by the record's origin, if any */
 	if ((curinsert_flags & XLOG_INCLUDE_ORIGIN) &&
-		replorigin_session_origin != InvalidRepOriginId)
+		replorigin_session_state.origin != InvalidRepOriginId)
 	{
 		*(scratch++) = (char) XLR_BLOCK_ID_ORIGIN;
-		memcpy(scratch, &replorigin_session_origin, sizeof(replorigin_session_origin));
-		scratch += sizeof(replorigin_session_origin);
+		memcpy(scratch, &replorigin_session_state.origin, sizeof(replorigin_session_state.origin));
+		scratch += sizeof(replorigin_session_state.origin);
 	}
 
 	/* followed by toplevel XID, if not already included in previous record */
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index a4aafcf5b6e..15c600faa64 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -962,7 +962,7 @@ ParallelApplyWorkerMain(Datum main_arg)
 	 * origin which was already acquired by its leader process.
 	 */
 	replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
-	replorigin_session_origin = originid;
+	replorigin_session_state.origin = originid;
 	CommitTransactionCommand();
 
 	/*
@@ -1430,8 +1430,8 @@ pa_stream_abort(LogicalRepStreamAbortData *abort_data)
 	 * Update origin state so we can restart streaming from correct position
 	 * in case of crash.
 	 */
-	replorigin_session_origin_lsn = abort_data->abort_lsn;
-	replorigin_session_origin_timestamp = abort_data->abort_time;
+	replorigin_session_state.origin_lsn = abort_data->abort_lsn;
+	replorigin_session_state.origin_timestamp = abort_data->abort_time;
 
 	/*
 	 * If the two XIDs are the same, it's in fact abort of toplevel xact, so
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 2635bf3d54f..96c9c7e5f7e 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -160,9 +160,9 @@ typedef struct ReplicationStateCtl
 } ReplicationStateCtl;
 
 /* external variables */
-RepOriginId replorigin_session_origin = InvalidRepOriginId; /* assumed identity */
-XLogRecPtr	replorigin_session_origin_lsn = InvalidXLogRecPtr;
-TimestampTz replorigin_session_origin_timestamp = 0;
+RepOriginSessionState replorigin_session_state = {
+	InvalidRepOriginId, InvalidXLogRecPtr,
+0};
 
 /*
  * Base address into a shared memory array of replication states of size
@@ -1295,10 +1295,10 @@ replorigin_session_get_progress(bool flush)
 void
 replorigin_session_clear_state(bool xact_only)
 {
-	replorigin_session_origin_lsn = InvalidXLogRecPtr;
-	replorigin_session_origin_timestamp = 0;
+	replorigin_session_state.origin_lsn = InvalidXLogRecPtr;
+	replorigin_session_state.origin_timestamp = 0;
 	if (!xact_only)
-		replorigin_session_origin = InvalidRepOriginId;
+		replorigin_session_state.origin = InvalidRepOriginId;
 }
 
 
@@ -1434,7 +1434,7 @@ pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
 {
 	replorigin_check_prerequisites(false, false);
 
-	PG_RETURN_BOOL(replorigin_session_origin != InvalidRepOriginId);
+	PG_RETURN_BOOL(replorigin_session_state.origin != InvalidRepOriginId);
 }
 
 
@@ -1478,8 +1478,8 @@ pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("no replication origin is configured")));
 
-	replorigin_session_origin_lsn = location;
-	replorigin_session_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
+	replorigin_session_state.origin_lsn = location;
+	replorigin_session_state.origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
 
 	PG_RETURN_VOID();
 }
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 47104ed676c..61952e7d995 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -1318,7 +1318,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 		 */
 		originid = replorigin_by_name(originname, false);
 		replorigin_session_setup(originid, 0);
-		replorigin_session_origin = originid;
+		replorigin_session_state.origin = originid;
 		*origin_startpos = replorigin_session_get_progress(false);
 
 		CommitTransactionCommand();
@@ -1405,7 +1405,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
 
 	replorigin_session_setup(originid, 0);
-	replorigin_session_origin = originid;
+	replorigin_session_state.origin = originid;
 
 	/*
 	 * If the user did not opt to run as the owner of the subscription
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 4df177664b7..135c65e56bd 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1318,8 +1318,8 @@ apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
 	 * Update origin state so we can restart streaming from correct position
 	 * in case of crash.
 	 */
-	replorigin_session_origin_lsn = prepare_data->end_lsn;
-	replorigin_session_origin_timestamp = prepare_data->prepare_time;
+	replorigin_session_state.origin_lsn = prepare_data->end_lsn;
+	replorigin_session_state.origin_timestamp = prepare_data->prepare_time;
 
 	PrepareTransactionBlock(gid);
 }
@@ -1421,8 +1421,8 @@ apply_handle_commit_prepared(StringInfo s)
 	 * Update origin state so we can restart streaming from correct position
 	 * in case of crash.
 	 */
-	replorigin_session_origin_lsn = prepare_data.end_lsn;
-	replorigin_session_origin_timestamp = prepare_data.commit_time;
+	replorigin_session_state.origin_lsn = prepare_data.end_lsn;
+	replorigin_session_state.origin_timestamp = prepare_data.commit_time;
 
 	FinishPreparedTransaction(gid, true);
 	end_replication_step();
@@ -1479,8 +1479,8 @@ apply_handle_rollback_prepared(StringInfo s)
 		 * Update origin state so we can restart streaming from correct
 		 * position in case of crash.
 		 */
-		replorigin_session_origin_lsn = rollback_data.rollback_end_lsn;
-		replorigin_session_origin_timestamp = rollback_data.rollback_time;
+		replorigin_session_state.origin_lsn = rollback_data.rollback_end_lsn;
+		replorigin_session_state.origin_timestamp = rollback_data.rollback_time;
 
 		/* There is no transaction when ABORT/ROLLBACK PREPARED is called */
 		begin_replication_step();
@@ -2526,8 +2526,8 @@ apply_handle_commit_internal(LogicalRepCommitData *commit_data)
 		 * Update origin state so we can restart streaming from correct
 		 * position in case of crash.
 		 */
-		replorigin_session_origin_lsn = commit_data->end_lsn;
-		replorigin_session_origin_timestamp = commit_data->committime;
+		replorigin_session_state.origin_lsn = commit_data->end_lsn;
+		replorigin_session_state.origin_timestamp = commit_data->committime;
 
 		CommitTransactionCommand();
 
@@ -2940,7 +2940,7 @@ apply_handle_update_internal(ApplyExecutionData *edata,
 		 */
 		if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
 									&conflicttuple.origin, &conflicttuple.ts) &&
-			conflicttuple.origin != replorigin_session_origin)
+			conflicttuple.origin != replorigin_session_state.origin)
 		{
 			TupleTableSlot *newslot;
 
@@ -2982,7 +2982,7 @@ apply_handle_update_internal(ApplyExecutionData *edata,
 									   &conflicttuple.xmin,
 									   &conflicttuple.origin,
 									   &conflicttuple.ts) &&
-			conflicttuple.origin != replorigin_session_origin)
+			conflicttuple.origin != replorigin_session_state.origin)
 			type = CT_UPDATE_DELETED;
 		else
 			type = CT_UPDATE_MISSING;
@@ -3135,7 +3135,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
 		 */
 		if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
 									&conflicttuple.origin, &conflicttuple.ts) &&
-			conflicttuple.origin != replorigin_session_origin)
+			conflicttuple.origin != replorigin_session_state.origin)
 		{
 			conflicttuple.slot = localslot;
 			ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_ORIGIN_DIFFERS,
@@ -3477,7 +3477,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 												   &conflicttuple.xmin,
 												   &conflicttuple.origin,
 												   &conflicttuple.ts) &&
-						conflicttuple.origin != replorigin_session_origin)
+						conflicttuple.origin != replorigin_session_state.origin)
 						type = CT_UPDATE_DELETED;
 					else
 						type = CT_UPDATE_MISSING;
@@ -3503,7 +3503,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 				if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
 											&conflicttuple.origin,
 											&conflicttuple.ts) &&
-					conflicttuple.origin != replorigin_session_origin)
+					conflicttuple.origin != replorigin_session_state.origin)
 				{
 					TupleTableSlot *newslot;
 
@@ -5652,7 +5652,7 @@ run_apply_worker(void)
 	if (!OidIsValid(originid))
 		originid = replorigin_create(originname);
 	replorigin_session_setup(originid, 0);
-	replorigin_session_origin = originid;
+	replorigin_session_state.origin = originid;
 	origin_startpos = replorigin_session_get_progress(false);
 	CommitTransactionCommand();
 
diff --git a/src/include/replication/origin.h b/src/include/replication/origin.h
index ab34ef97c46..8eace3a16db 100644
--- a/src/include/replication/origin.h
+++ b/src/include/replication/origin.h
@@ -40,9 +40,14 @@ typedef struct xl_replorigin_drop
  */
 #define MAX_RONAME_LEN	512
 
-extern PGDLLIMPORT RepOriginId replorigin_session_origin;
-extern PGDLLIMPORT XLogRecPtr replorigin_session_origin_lsn;
-extern PGDLLIMPORT TimestampTz replorigin_session_origin_timestamp;
+typedef struct RepOriginSessionState
+{
+	RepOriginId origin;
+	XLogRecPtr	origin_lsn;
+	TimestampTz origin_timestamp;
+}			RepOriginSessionState;
+
+extern PGDLLIMPORT RepOriginSessionState replorigin_session_state;
 
 /* GUCs */
 extern PGDLLIMPORT int max_active_replication_origins;
-- 
2.39.5 (Apple Git-154)

