From 2cbf3265c1a35a8ba04d3d0b37fa68ffb8e2c25d Mon Sep 17 00:00:00 2001
From: "Chao Li (Evan)" <lic@highgo.com>
Date: Thu, 28 May 2026 16:52:32 +0800
Subject: [PATCH v1] Fix race during concurrent logical decoding activation

Concurrent logical slot creation can let two backends enter logical decoding
activation at the same time. If one backend is interrupted after enabling
logical WAL logging but before marking logical decoding enabled, another
backend can complete activation meanwhile. The interrupted backend then runs
abort_logical_decoding_activation(), whose cleanup path assumes logical
decoding has not been enabled yet. In assertion builds this can trip the
!LogicalDecodingCtl->logical_decoding_enabled assertion; in non-assertion
builds it can incorrectly clear xlog_logical_info after another backend has
completed activation.

Serialize logical decoding activation with an activation_in_progress flag
protected by LogicalDecodingControlLock, and use a condition variable to wait
for an in-progress activation to finish. The cleanup path clears the flag and
wakes waiters if the owning backend is interrupted.

Author: Chao Li <lic@highgo.com>
---
 src/backend/replication/logical/logicalctl.c  | 116 ++++++++++++++++--
 .../utils/activity/wait_event_names.txt       |   1 +
 2 files changed, 105 insertions(+), 12 deletions(-)

diff --git a/src/backend/replication/logical/logicalctl.c b/src/backend/replication/logical/logicalctl.c
index 72f68ec58ef..cd5835cd876 100644
--- a/src/backend/replication/logical/logicalctl.c
+++ b/src/backend/replication/logical/logicalctl.c
@@ -67,6 +67,7 @@
 #include "catalog/pg_control.h"
 #include "miscadmin.h"
 #include "replication/slot.h"
+#include "storage/condition_variable.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
 #include "storage/proc.h"
@@ -74,6 +75,7 @@
 #include "storage/procsignal.h"
 #include "storage/subsystems.h"
 #include "utils/injection_point.h"
+#include "utils/wait_event.h"
 
 /*
  * Struct for controlling the logical decoding status.
@@ -95,13 +97,21 @@ typedef struct LogicalDecodingCtlData
 
 	/* True if logical decoding might need to be disabled */
 	bool		pending_disable;
+
+	/* True if a backend is currently enabling logical decoding */
+	bool		activation_in_progress;
+
+	/* Signaled when activation_in_progress changes */
+	ConditionVariable activation_cv;
 } LogicalDecodingCtlData;
 
 static LogicalDecodingCtlData *LogicalDecodingCtl = NULL;
 
 static void LogicalDecodingCtlShmemRequest(void *arg);
+static void LogicalDecodingCtlShmemInit(void *arg);
 
 const ShmemCallbacks LogicalDecodingCtlShmemCallbacks = {
+	.init_fn = LogicalDecodingCtlShmemInit,
 	.request_fn = LogicalDecodingCtlShmemRequest,
 };
 
@@ -124,6 +134,8 @@ bool		XLogLogicalInfo = false;
 static bool XLogLogicalInfoUpdatePending = false;
 
 static void update_xlog_logical_info(void);
+static void begin_logical_decoding_activation(bool *activation_started);
+static void finish_logical_decoding_activation(bool *activation_started);
 static void abort_logical_decoding_activation(int code, Datum arg);
 static void write_logical_decoding_status_update_record(bool status);
 
@@ -136,6 +148,12 @@ LogicalDecodingCtlShmemRequest(void *arg)
 		);
 }
 
+static void
+LogicalDecodingCtlShmemInit(void *arg)
+{
+	ConditionVariableInit(&LogicalDecodingCtl->activation_cv);
+}
+
 /*
  * Initialize the logical decoding status in shmem at server startup. This
  * must be called ONCE during postmaster or standalone-backend startup.
@@ -262,19 +280,32 @@ write_logical_decoding_status_update_record(bool status)
 static void
 abort_logical_decoding_activation(int code, Datum arg)
 {
-	Assert(MyReplicationSlot);
-	Assert(!LogicalDecodingCtl->logical_decoding_enabled);
+	bool	   *activation_started = (bool *) DatumGetPointer(arg);
+	bool		need_update = false;
+
+	if (!*activation_started)
+		return;
 
 	elog(DEBUG1, "aborting logical decoding activation process");
 
 	/*
-	 * Abort the change to xlog_logical_info. We don't need to check
-	 * CheckLogicalSlotExists() as we're still holding a logical slot.
+	 * Abort the change to xlog_logical_info, unless the activation has
+	 * already completed.  The activation flag prevents another backend from
+	 * starting its own activation attempt while this backend is in process of
+	 * enabling logical decoding.
 	 */
 	LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
-	LogicalDecodingCtl->xlog_logical_info = false;
+	if (!LogicalDecodingCtl->logical_decoding_enabled)
+	{
+		LogicalDecodingCtl->xlog_logical_info = false;
+		need_update = true;
+	}
+	LogicalDecodingCtl->activation_in_progress = false;
 	LWLockRelease(LogicalDecodingControlLock);
 
+	*activation_started = false;
+	ConditionVariableBroadcast(&LogicalDecodingCtl->activation_cv);
+
 	/*
 	 * Some processes might have already started logical info WAL logging, so
 	 * tell all running processes to update their caches. We don't need to
@@ -282,7 +313,55 @@ abort_logical_decoding_activation(int code, Datum arg)
 	 * always safe to write logical information to WAL records, even when not
 	 * strictly required.
 	 */
-	EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO);
+	if (need_update)
+		EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO);
+}
+
+/*
+ * Wait for any concurrent activation to complete, then mark this backend as
+ * the one currently enabling logical decoding.  If activation completed while
+ * we were waiting, this still marks activation_in_progress so that the caller
+ * can run EnableLogicalDecoding(), which will recheck and return quickly.
+ */
+static void
+begin_logical_decoding_activation(bool *activation_started)
+{
+	Assert(!*activation_started);
+
+	LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
+
+	while (LogicalDecodingCtl->activation_in_progress)
+	{
+		ConditionVariablePrepareToSleep(&LogicalDecodingCtl->activation_cv);
+		LWLockRelease(LogicalDecodingControlLock);
+
+		ConditionVariableSleep(&LogicalDecodingCtl->activation_cv,
+							   WAIT_EVENT_LOGICAL_DECODING_ACTIVATION);
+
+		LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
+	}
+	ConditionVariableCancelSleep();
+
+	LogicalDecodingCtl->activation_in_progress = true;
+	*activation_started = true;
+
+	LWLockRelease(LogicalDecodingControlLock);
+}
+
+/*
+ * Finish the activation attempt and wake any backend waiting to activate.
+ */
+static void
+finish_logical_decoding_activation(bool *activation_started)
+{
+	Assert(*activation_started);
+
+	LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
+	LogicalDecodingCtl->activation_in_progress = false;
+	LWLockRelease(LogicalDecodingControlLock);
+
+	*activation_started = false;
+	ConditionVariableBroadcast(&LogicalDecodingCtl->activation_cv);
 }
 
 /*
@@ -294,14 +373,15 @@ abort_logical_decoding_activation(int code, Datum arg)
  * The caller should use CheckLogicalDecodingRequirements() before calling this
  * function to make sure that the logical decoding status can be modified.
  *
- * Note that there is no interlock between logical decoding activation
- * and slot creation. To ensure enabling logical decoding, the caller
- * needs to call this function after creating a logical slot before
- * initializing the logical decoding context.
+ * Concurrent logical decoding activations are serialized here. To ensure
+ * enabling logical decoding, the caller needs to call this function after
+ * creating a logical slot before initializing the logical decoding context.
  */
 void
 EnsureLogicalDecodingEnabled(void)
 {
+	bool		activation_started = false;
+
 	Assert(MyReplicationSlot);
 	Assert(wal_level >= WAL_LEVEL_REPLICA);
 
@@ -324,11 +404,23 @@ EnsureLogicalDecodingEnabled(void)
 	 * Ensure to abort the activation process in cases where there in an
 	 * interruption during the wait.
 	 */
-	PG_ENSURE_ERROR_CLEANUP(abort_logical_decoding_activation, (Datum) 0);
+	PG_ENSURE_ERROR_CLEANUP(abort_logical_decoding_activation,
+							PointerGetDatum(&activation_started));
 	{
+		begin_logical_decoding_activation(&activation_started);
+
+		/*
+		 * Another backend may have completed activation while we waited for
+		 * the activation flag. EnableLogicalDecoding() rechecks the enabled
+		 * state while holding LogicalDecodingControlLock, so it is safe to
+		 * call unconditionally here.
+		 */
 		EnableLogicalDecoding();
+
+		finish_logical_decoding_activation(&activation_started);
 	}
-	PG_END_ENSURE_ERROR_CLEANUP(abort_logical_decoding_activation, (Datum) 0);
+	PG_END_ENSURE_ERROR_CLEANUP(abort_logical_decoding_activation,
+								PointerGetDatum(&activation_started));
 }
 
 /*
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 560659f9568..2e59fafccb4 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -138,6 +138,7 @@ HASH_GROW_BUCKETS_ELECT	"Waiting to elect a Parallel Hash participant to allocat
 HASH_GROW_BUCKETS_REALLOCATE	"Waiting for an elected Parallel Hash participant to finish allocating more buckets."
 HASH_GROW_BUCKETS_REINSERT	"Waiting for other Parallel Hash participants to finish inserting tuples into new buckets."
 LOGICAL_APPLY_SEND_DATA	"Waiting for a logical replication leader apply process to send data to a parallel apply process."
+LOGICAL_DECODING_ACTIVATION	"Waiting for another backend to finish enabling logical decoding."
 LOGICAL_PARALLEL_APPLY_STATE_CHANGE	"Waiting for a logical replication parallel apply process to change state."
 LOGICAL_SYNC_DATA	"Waiting for a logical replication remote server to send data for initial table synchronization."
 LOGICAL_SYNC_STATE_CHANGE	"Waiting for a logical replication remote server to change state."
-- 
2.50.1 (Apple Git-155)

