From 601cbb02265a5373d298e6803715d30c2370a111 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Wed, 17 Sep 2025 18:15:33 +0900
Subject: [PATCH v8 3/3] Avoid setting ReplicationState in case of ERROR

---
 contrib/test_decoding/expected/replorigin.out |  3 ++
 contrib/test_decoding/sql/replorigin.sql      |  3 ++
 src/backend/replication/logical/origin.c      | 31 +++++++++++++------
 3 files changed, 27 insertions(+), 10 deletions(-)

diff --git a/contrib/test_decoding/expected/replorigin.out b/contrib/test_decoding/expected/replorigin.out
index c85e1a01b23..4f64ea8942f 100644
--- a/contrib/test_decoding/expected/replorigin.out
+++ b/contrib/test_decoding/expected/replorigin.out
@@ -41,6 +41,9 @@ SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
 SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
 ERROR:  duplicate key value violates unique constraint "pg_replication_origin_roname_index"
 DETAIL:  Key (roname)=(regress_test_decoding: regression_slot) already exists.
+-- ensure session setup with invalid pid fail
+SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot', -1);
+ERROR:  could not find replication state slot for replication origin with OID 1 which was acquired by -1
 --ensure deletions work (once)
 SELECT pg_replication_origin_create('regress_test_decoding: temp');
  pg_replication_origin_create 
diff --git a/contrib/test_decoding/sql/replorigin.sql b/contrib/test_decoding/sql/replorigin.sql
index e71ee02d050..d899d5cdc18 100644
--- a/contrib/test_decoding/sql/replorigin.sql
+++ b/contrib/test_decoding/sql/replorigin.sql
@@ -26,6 +26,9 @@ SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
 -- ensure duplicate creations fail
 SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
 
+-- ensure session setup with invalid pid fail
+SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot', -1);
+
 --ensure deletions work (once)
 SELECT pg_replication_origin_create('regress_test_decoding: temp');
 SELECT pg_replication_origin_drop('regress_test_decoding: temp');
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 98d47e1beb8..0bbc96bcee5 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -1122,6 +1122,8 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
 	static bool registered_cleanup;
 	int			i;
 	int			free_slot = -1;
+	ReplicationState *candidate_state = NULL;
+	bool		initialized = false;
 
 	if (!registered_cleanup)
 	{
@@ -1168,34 +1170,43 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
 		}
 
 		/* ok, found slot */
-		session_replication_state = curstate;
+		candidate_state = curstate;
 		break;
 	}
 
 
-	if (session_replication_state == NULL && free_slot == -1)
+	if (candidate_state == NULL && free_slot == -1)
 		ereport(ERROR,
 				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
 				 errmsg("could not find free replication state slot for replication origin with ID %d",
 						node),
 				 errhint("Increase \"max_active_replication_origins\" and try again.")));
-	else if (session_replication_state == NULL)
+	else if (candidate_state == NULL)
 	{
 		/* initialize new slot */
-		session_replication_state = &replication_states[free_slot];
-		Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
-		Assert(session_replication_state->local_lsn == InvalidXLogRecPtr);
-		session_replication_state->roident = node;
+		candidate_state = &replication_states[free_slot];
+		Assert(candidate_state->remote_lsn == InvalidXLogRecPtr);
+		Assert(candidate_state->local_lsn == InvalidXLogRecPtr);
+		candidate_state->roident = node;
+		initialized = true;
 	}
 
 
-	Assert(session_replication_state->roident != InvalidRepOriginId);
+	Assert(candidate_state->roident != InvalidRepOriginId);
 
 	if (acquired_by == 0)
-		session_replication_state->acquired_by = MyProcPid;
-	else if (session_replication_state->acquired_by != acquired_by)
+		candidate_state->acquired_by = MyProcPid;
+	else if (candidate_state->acquired_by != acquired_by)
+	{
+		if (initialized)
+			candidate_state->roident = InvalidRepOriginId;
+
 		elog(ERROR, "could not find replication state slot for replication origin with OID %u which was acquired by %d",
 			 node, acquired_by);
+	}
+
+	/* Candidate slot looks ok, use it */
+	session_replication_state = candidate_state;
 
 	LWLockRelease(ReplicationOriginLock);
 
-- 
2.47.3

