From 39cdd382fe483c5e5576fc02f7a5ed7928293fa4 Mon Sep 17 00:00:00 2001
From: "Chao Li (Evan)" <lic@highgo.com>
Date: Tue, 30 Dec 2025 12:55:31 +0800
Subject: [PATCH v4 2/2] Consolidate replication origin session globals into a
 single state struct

Replace the separate global variables used to track replication origin
session state (origin ID, origin LSN, and origin timestamp) with a single
RepOriginSessionState struct.

This refactoring groups logically related session state into one object,
reducing the proliferation of loosely related globals and making the code
easier to read and reason about. All existing users are updated to refer
to replorigin_session_state.{origin, origin_lsn, origin_timestamp}.

There is no intended behavior change; this is a mechanical cleanup to
improve clarity and maintainability of replication origin handling.

Author: Chao Li <lic@highgo.com>
Discussion: https://postgr.es/m/CAEoWx2=pYvfRthXHTzSrOsf5_FfyY4zJyK4zV2v4W=yjUij1cA@mail.gmail.com
---
 src/backend/access/transam/twophase.c         | 32 ++++++++---------
 src/backend/access/transam/xact.c             | 36 +++++++++----------
 src/backend/access/transam/xloginsert.c       |  6 ++--
 .../replication/logical/applyparallelworker.c |  6 ++--
 src/backend/replication/logical/origin.c      | 22 ++++++------
 src/backend/replication/logical/tablesync.c   |  4 +--
 src/backend/replication/logical/worker.c      | 28 +++++++--------
 src/include/replication/origin.h              | 11 ++++--
 src/tools/pgindent/typedefs.list              |  1 +
 9 files changed, 76 insertions(+), 70 deletions(-)

diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 3bc85986829..7d34692f05f 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -1157,13 +1157,13 @@ EndPrepare(GlobalTransaction gxact)
 	Assert(hdr->magic == TWOPHASE_MAGIC);
 	hdr->total_len = records.total_len + sizeof(pg_crc32c);
 
-	replorigin = (replorigin_session_origin != InvalidRepOriginId &&
-				  replorigin_session_origin != DoNotReplicateId);
+	replorigin = (replorigin_session_state.origin != InvalidRepOriginId &&
+				  replorigin_session_state.origin != DoNotReplicateId);
 
 	if (replorigin)
 	{
-		hdr->origin_lsn = replorigin_session_origin_lsn;
-		hdr->origin_timestamp = replorigin_session_origin_timestamp;
+		hdr->origin_lsn = replorigin_session_state.origin_lsn;
+		hdr->origin_timestamp = replorigin_session_state.origin_timestamp;
 	}
 
 	/*
@@ -1211,7 +1211,7 @@ EndPrepare(GlobalTransaction gxact)
 	if (replorigin)
 	{
 		/* Move LSNs forward for this replication origin */
-		replorigin_session_advance(replorigin_session_origin_lsn,
+		replorigin_session_advance(replorigin_session_state.origin_lsn,
 								   gxact->prepare_end_lsn);
 	}
 
@@ -2330,8 +2330,8 @@ RecordTransactionCommitPrepared(TransactionId xid,
 	 * Are we using the replication origins feature?  Or, in other words, are
 	 * we replaying remote actions?
 	 */
-	replorigin = (replorigin_session_origin != InvalidRepOriginId &&
-				  replorigin_session_origin != DoNotReplicateId);
+	replorigin = (replorigin_session_state.origin != InvalidRepOriginId &&
+				  replorigin_session_state.origin != DoNotReplicateId);
 
 	/* Load the injection point before entering the critical section */
 	INJECTION_POINT_LOAD("commit-after-delay-checkpoint");
@@ -2376,23 +2376,23 @@ RecordTransactionCommitPrepared(TransactionId xid,
 
 	if (replorigin)
 		/* Move LSNs forward for this replication origin */
-		replorigin_session_advance(replorigin_session_origin_lsn,
+		replorigin_session_advance(replorigin_session_state.origin_lsn,
 								   XactLastRecEnd);
 
 	/*
 	 * Record commit timestamp.  The value comes from plain commit timestamp
 	 * if replorigin is not enabled, or replorigin already set a value for us
-	 * in replorigin_session_origin_timestamp otherwise.
+	 * in replorigin_session_state.origin_timestamp otherwise.
 	 *
 	 * We don't need to WAL-log anything here, as the commit record written
 	 * above already contains the data.
 	 */
-	if (!replorigin || replorigin_session_origin_timestamp == 0)
-		replorigin_session_origin_timestamp = committs;
+	if (!replorigin || replorigin_session_state.origin_timestamp == 0)
+		replorigin_session_state.origin_timestamp = committs;
 
 	TransactionTreeSetCommitTsData(xid, nchildren, children,
-								   replorigin_session_origin_timestamp,
-								   replorigin_session_origin);
+								   replorigin_session_state.origin_timestamp,
+								   replorigin_session_state.origin);
 
 	/*
 	 * We don't currently try to sleep before flush here ... nor is there any
@@ -2445,8 +2445,8 @@ RecordTransactionAbortPrepared(TransactionId xid,
 	 * Are we using the replication origins feature?  Or, in other words, are
 	 * we replaying remote actions?
 	 */
-	replorigin = (replorigin_session_origin != InvalidRepOriginId &&
-				  replorigin_session_origin != DoNotReplicateId);
+	replorigin = (replorigin_session_state.origin != InvalidRepOriginId &&
+				  replorigin_session_state.origin != DoNotReplicateId);
 
 	/*
 	 * Catch the scenario where we aborted partway through
@@ -2472,7 +2472,7 @@ RecordTransactionAbortPrepared(TransactionId xid,
 
 	if (replorigin)
 		/* Move LSNs forward for this replication origin */
-		replorigin_session_advance(replorigin_session_origin_lsn,
+		replorigin_session_advance(replorigin_session_state.origin_lsn,
 								   XactLastRecEnd);
 
 	/* Always flush, since we're about to remove the 2PC state file */
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 1b5c1f6b763..5e7122b23f3 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1413,8 +1413,8 @@ RecordTransactionCommit(void)
 		 * Are we using the replication origins feature?  Or, in other words,
 		 * are we replaying remote actions?
 		 */
-		replorigin = (replorigin_session_origin != InvalidRepOriginId &&
-					  replorigin_session_origin != DoNotReplicateId);
+		replorigin = (replorigin_session_state.origin != InvalidRepOriginId &&
+					  replorigin_session_state.origin != DoNotReplicateId);
 
 		/*
 		 * Mark ourselves as within our "commit critical section".  This
@@ -1462,25 +1462,25 @@ RecordTransactionCommit(void)
 
 		if (replorigin)
 			/* Move LSNs forward for this replication origin */
-			replorigin_session_advance(replorigin_session_origin_lsn,
+			replorigin_session_advance(replorigin_session_state.origin_lsn,
 									   XactLastRecEnd);
 
 		/*
 		 * Record commit timestamp.  The value comes from plain commit
 		 * timestamp if there's no replication origin; otherwise, the
-		 * timestamp was already set in replorigin_session_origin_timestamp by
-		 * replication.
+		 * timestamp was already set in
+		 * replorigin_session_state.origin_timestamp by replication.
 		 *
 		 * We don't need to WAL-log anything here, as the commit record
 		 * written above already contains the data.
 		 */
 
-		if (!replorigin || replorigin_session_origin_timestamp == 0)
-			replorigin_session_origin_timestamp = GetCurrentTransactionStopTimestamp();
+		if (!replorigin || replorigin_session_state.origin_timestamp == 0)
+			replorigin_session_state.origin_timestamp = GetCurrentTransactionStopTimestamp();
 
 		TransactionTreeSetCommitTsData(xid, nchildren, children,
-									   replorigin_session_origin_timestamp,
-									   replorigin_session_origin);
+									   replorigin_session_state.origin_timestamp,
+									   replorigin_session_state.origin);
 	}
 
 	/*
@@ -1810,8 +1810,8 @@ RecordTransactionAbort(bool isSubXact)
 	 * Are we using the replication origins feature?  Or, in other words, are
 	 * we replaying remote actions?
 	 */
-	replorigin = (replorigin_session_origin != InvalidRepOriginId &&
-				  replorigin_session_origin != DoNotReplicateId);
+	replorigin = (replorigin_session_state.origin != InvalidRepOriginId &&
+				  replorigin_session_state.origin != DoNotReplicateId);
 
 	/* Fetch the data we need for the abort record */
 	nrels = smgrGetPendingDeletes(false, &rels);
@@ -1838,7 +1838,7 @@ RecordTransactionAbort(bool isSubXact)
 
 	if (replorigin)
 		/* Move LSNs forward for this replication origin */
-		replorigin_session_advance(replorigin_session_origin_lsn,
+		replorigin_session_advance(replorigin_session_state.origin_lsn,
 								   XactLastRecEnd);
 
 	/*
@@ -5927,12 +5927,12 @@ XactLogCommitRecord(TimestampTz commit_time,
 	}
 
 	/* dump transaction origin information */
-	if (replorigin_session_origin != InvalidRepOriginId)
+	if (replorigin_session_state.origin != InvalidRepOriginId)
 	{
 		xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN;
 
-		xl_origin.origin_lsn = replorigin_session_origin_lsn;
-		xl_origin.origin_timestamp = replorigin_session_origin_timestamp;
+		xl_origin.origin_lsn = replorigin_session_state.origin_lsn;
+		xl_origin.origin_timestamp = replorigin_session_state.origin_timestamp;
 	}
 
 	if (xl_xinfo.xinfo != 0)
@@ -6080,12 +6080,12 @@ XactLogAbortRecord(TimestampTz abort_time,
 	 * Dump transaction origin information. We need this during recovery to
 	 * update the replication origin progress.
 	 */
-	if (replorigin_session_origin != InvalidRepOriginId)
+	if (replorigin_session_state.origin != InvalidRepOriginId)
 	{
 		xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN;
 
-		xl_origin.origin_lsn = replorigin_session_origin_lsn;
-		xl_origin.origin_timestamp = replorigin_session_origin_timestamp;
+		xl_origin.origin_lsn = replorigin_session_state.origin_lsn;
+		xl_origin.origin_timestamp = replorigin_session_state.origin_timestamp;
 	}
 
 	if (xl_xinfo.xinfo != 0)
diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c
index a56d5a55282..ed576ae578b 100644
--- a/src/backend/access/transam/xloginsert.c
+++ b/src/backend/access/transam/xloginsert.c
@@ -861,11 +861,11 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 
 	/* followed by the record's origin, if any */
 	if ((curinsert_flags & XLOG_INCLUDE_ORIGIN) &&
-		replorigin_session_origin != InvalidRepOriginId)
+		replorigin_session_state.origin != InvalidRepOriginId)
 	{
 		*(scratch++) = (char) XLR_BLOCK_ID_ORIGIN;
-		memcpy(scratch, &replorigin_session_origin, sizeof(replorigin_session_origin));
-		scratch += sizeof(replorigin_session_origin);
+		memcpy(scratch, &replorigin_session_state.origin, sizeof(replorigin_session_state.origin));
+		scratch += sizeof(replorigin_session_state.origin);
 	}
 
 	/* followed by toplevel XID, if not already included in previous record */
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index a4aafcf5b6e..15c600faa64 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -962,7 +962,7 @@ ParallelApplyWorkerMain(Datum main_arg)
 	 * origin which was already acquired by its leader process.
 	 */
 	replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
-	replorigin_session_origin = originid;
+	replorigin_session_state.origin = originid;
 	CommitTransactionCommand();
 
 	/*
@@ -1430,8 +1430,8 @@ pa_stream_abort(LogicalRepStreamAbortData *abort_data)
 	 * Update origin state so we can restart streaming from correct position
 	 * in case of crash.
 	 */
-	replorigin_session_origin_lsn = abort_data->abort_lsn;
-	replorigin_session_origin_timestamp = abort_data->abort_time;
+	replorigin_session_state.origin_lsn = abort_data->abort_lsn;
+	replorigin_session_state.origin_timestamp = abort_data->abort_time;
 
 	/*
 	 * If the two XIDs are the same, it's in fact abort of toplevel xact, so
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index fc9e14ecbc5..5324da2719f 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -160,9 +160,9 @@ typedef struct ReplicationStateCtl
 } ReplicationStateCtl;
 
 /* external variables */
-RepOriginId replorigin_session_origin = InvalidRepOriginId; /* assumed identity */
-XLogRecPtr	replorigin_session_origin_lsn = InvalidXLogRecPtr;
-TimestampTz replorigin_session_origin_timestamp = 0;
+RepOriginSessionState replorigin_session_state = {
+	InvalidRepOriginId, InvalidXLogRecPtr, 0
+};
 
 /*
  * Base address into a shared memory array of replication states of size
@@ -896,7 +896,7 @@ replorigin_redo(XLogReaderState *record)
  * Tell the replication origin progress machinery that a commit from 'node'
  * that originated at the LSN remote_commit on the remote node was replayed
  * successfully and that we don't need to do so again. In combination with
- * setting up replorigin_session_origin_lsn and replorigin_session_origin
+ * setting up replorigin_session_state.origin_lsn and replorigin_session_state.origin
  * that ensures we won't lose knowledge about that after a crash if the
  * transaction had a persistent effect (think of asynchronous commits).
  *
@@ -1295,10 +1295,10 @@ replorigin_session_get_progress(bool flush)
 void
 replorigin_session_clear_state(bool xact_only)
 {
-	replorigin_session_origin_lsn = InvalidXLogRecPtr;
-	replorigin_session_origin_timestamp = 0;
+	replorigin_session_state.origin_lsn = InvalidXLogRecPtr;
+	replorigin_session_state.origin_timestamp = 0;
 	if (!xact_only)
-		replorigin_session_origin = InvalidRepOriginId;
+		replorigin_session_state.origin = InvalidRepOriginId;
 }
 
 
@@ -1408,7 +1408,7 @@ pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
 	pid = PG_GETARG_INT32(1);
 	replorigin_session_setup(origin, pid);
 
-	replorigin_session_origin = origin;
+	replorigin_session_state.origin = origin;
 
 	pfree(name);
 
@@ -1438,7 +1438,7 @@ pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
 {
 	replorigin_check_prerequisites(false, false);
 
-	PG_RETURN_BOOL(replorigin_session_origin != InvalidRepOriginId);
+	PG_RETURN_BOOL(replorigin_session_state.origin != InvalidRepOriginId);
 }
 
 
@@ -1482,8 +1482,8 @@ pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("no replication origin is configured")));
 
-	replorigin_session_origin_lsn = location;
-	replorigin_session_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
+	replorigin_session_state.origin_lsn = location;
+	replorigin_session_state.origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
 
 	PG_RETURN_VOID();
 }
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 47104ed676c..61952e7d995 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -1318,7 +1318,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 		 */
 		originid = replorigin_by_name(originname, false);
 		replorigin_session_setup(originid, 0);
-		replorigin_session_origin = originid;
+		replorigin_session_state.origin = originid;
 		*origin_startpos = replorigin_session_get_progress(false);
 
 		CommitTransactionCommand();
@@ -1405,7 +1405,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
 
 	replorigin_session_setup(originid, 0);
-	replorigin_session_origin = originid;
+	replorigin_session_state.origin = originid;
 
 	/*
 	 * If the user did not opt to run as the owner of the subscription
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 4df177664b7..135c65e56bd 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1318,8 +1318,8 @@ apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
 	 * Update origin state so we can restart streaming from correct position
 	 * in case of crash.
 	 */
-	replorigin_session_origin_lsn = prepare_data->end_lsn;
-	replorigin_session_origin_timestamp = prepare_data->prepare_time;
+	replorigin_session_state.origin_lsn = prepare_data->end_lsn;
+	replorigin_session_state.origin_timestamp = prepare_data->prepare_time;
 
 	PrepareTransactionBlock(gid);
 }
@@ -1421,8 +1421,8 @@ apply_handle_commit_prepared(StringInfo s)
 	 * Update origin state so we can restart streaming from correct position
 	 * in case of crash.
 	 */
-	replorigin_session_origin_lsn = prepare_data.end_lsn;
-	replorigin_session_origin_timestamp = prepare_data.commit_time;
+	replorigin_session_state.origin_lsn = prepare_data.end_lsn;
+	replorigin_session_state.origin_timestamp = prepare_data.commit_time;
 
 	FinishPreparedTransaction(gid, true);
 	end_replication_step();
@@ -1479,8 +1479,8 @@ apply_handle_rollback_prepared(StringInfo s)
 		 * Update origin state so we can restart streaming from correct
 		 * position in case of crash.
 		 */
-		replorigin_session_origin_lsn = rollback_data.rollback_end_lsn;
-		replorigin_session_origin_timestamp = rollback_data.rollback_time;
+		replorigin_session_state.origin_lsn = rollback_data.rollback_end_lsn;
+		replorigin_session_state.origin_timestamp = rollback_data.rollback_time;
 
 		/* There is no transaction when ABORT/ROLLBACK PREPARED is called */
 		begin_replication_step();
@@ -2526,8 +2526,8 @@ apply_handle_commit_internal(LogicalRepCommitData *commit_data)
 		 * Update origin state so we can restart streaming from correct
 		 * position in case of crash.
 		 */
-		replorigin_session_origin_lsn = commit_data->end_lsn;
-		replorigin_session_origin_timestamp = commit_data->committime;
+		replorigin_session_state.origin_lsn = commit_data->end_lsn;
+		replorigin_session_state.origin_timestamp = commit_data->committime;
 
 		CommitTransactionCommand();
 
@@ -2940,7 +2940,7 @@ apply_handle_update_internal(ApplyExecutionData *edata,
 		 */
 		if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
 									&conflicttuple.origin, &conflicttuple.ts) &&
-			conflicttuple.origin != replorigin_session_origin)
+			conflicttuple.origin != replorigin_session_state.origin)
 		{
 			TupleTableSlot *newslot;
 
@@ -2982,7 +2982,7 @@ apply_handle_update_internal(ApplyExecutionData *edata,
 									   &conflicttuple.xmin,
 									   &conflicttuple.origin,
 									   &conflicttuple.ts) &&
-			conflicttuple.origin != replorigin_session_origin)
+			conflicttuple.origin != replorigin_session_state.origin)
 			type = CT_UPDATE_DELETED;
 		else
 			type = CT_UPDATE_MISSING;
@@ -3135,7 +3135,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
 		 */
 		if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
 									&conflicttuple.origin, &conflicttuple.ts) &&
-			conflicttuple.origin != replorigin_session_origin)
+			conflicttuple.origin != replorigin_session_state.origin)
 		{
 			conflicttuple.slot = localslot;
 			ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_ORIGIN_DIFFERS,
@@ -3477,7 +3477,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 												   &conflicttuple.xmin,
 												   &conflicttuple.origin,
 												   &conflicttuple.ts) &&
-						conflicttuple.origin != replorigin_session_origin)
+						conflicttuple.origin != replorigin_session_state.origin)
 						type = CT_UPDATE_DELETED;
 					else
 						type = CT_UPDATE_MISSING;
@@ -3503,7 +3503,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 				if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
 											&conflicttuple.origin,
 											&conflicttuple.ts) &&
-					conflicttuple.origin != replorigin_session_origin)
+					conflicttuple.origin != replorigin_session_state.origin)
 				{
 					TupleTableSlot *newslot;
 
@@ -5652,7 +5652,7 @@ run_apply_worker(void)
 	if (!OidIsValid(originid))
 		originid = replorigin_create(originname);
 	replorigin_session_setup(originid, 0);
-	replorigin_session_origin = originid;
+	replorigin_session_state.origin = originid;
 	origin_startpos = replorigin_session_get_progress(false);
 	CommitTransactionCommand();
 
diff --git a/src/include/replication/origin.h b/src/include/replication/origin.h
index ab34ef97c46..8eace3a16db 100644
--- a/src/include/replication/origin.h
+++ b/src/include/replication/origin.h
@@ -40,9 +40,14 @@ typedef struct xl_replorigin_drop
  */
 #define MAX_RONAME_LEN	512
 
-extern PGDLLIMPORT RepOriginId replorigin_session_origin;
-extern PGDLLIMPORT XLogRecPtr replorigin_session_origin_lsn;
-extern PGDLLIMPORT TimestampTz replorigin_session_origin_timestamp;
+typedef struct RepOriginSessionState
+{
+	RepOriginId origin;
+	XLogRecPtr	origin_lsn;
+	TimestampTz origin_timestamp;
+}			RepOriginSessionState;
+
+extern PGDLLIMPORT RepOriginSessionState replorigin_session_state;
 
 /* GUCs */
 extern PGDLLIMPORT int max_active_replication_origins;
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index ceb3fc5d980..92a6567d4f1 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2569,6 +2569,7 @@ ReorderBufferTupleCidKey
 ReorderBufferUpdateProgressTxnCB
 ReorderTuple
 RepOriginId
+RepOriginSessionState
 ReparameterizeForeignPathByChild_function
 ReplaceVarsFromTargetList_context
 ReplaceVarsNoMatchOption
-- 
2.39.5 (Apple Git-154)

