diff --git a/contrib/test_decoding/t/002_online_activation.pl b/contrib/test_decoding/t/002_online_activation.pl
new file mode 100644
index 00000000000..08512a05ae6
--- /dev/null
+++ b/contrib/test_decoding/t/002_online_activation.pl
@@ -0,0 +1,87 @@
+
+# Copyright (c) 2021-2024, PostgreSQL Global Development Group
+
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize publisher node
+my $node = PostgreSQL::Test::Cluster->new('node');
+$node->init(allows_streaming => 1);
+$node->append_conf('postgresql.conf', 'synchronous_commit = on');
+$node->start;
+
+my ($ret, $stdout, $stderr);
+$ret = $node->safe_psql('postgres', 'show wal_level');
+print $ret;
+
+$ret = $node->safe_psql('postgres', 'SELECT pg_get_logical_decoding_status()');
+is($ret, 'disabled', 'logical decoding must be disabled');
+
+# check if a logical replication slot cannot be craeted while logical
+# decoding is disabled.
+($ret, $stdout, $stderr) = $node->psql(
+    'postgres',
+    q[SELECT pg_create_logical_replication_slot('regression_slot1', 'test_decoding')]
+    );
+ok( $stderr =~ /ERROR:  logical decoding requires "wal_level" >= "logical"/,
+    "logical replication slot cannot be created while logical decoding is disableed");
+
+# Activate logical info logging.
+$node->safe_psql('postgres',
+		 'SELECT pg_activate_logical_decoding()');
+$ret = $node->safe_psql('postgres', 'SELECT pg_get_logical_decoding_status()');
+is($ret, 'ready', 'logical decoding gets activated');
+
+# Now we can create a logical replication slot.
+$ret = $node->safe_psql(
+    'postgres',
+    q[SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot1', 'test_decoding')]
+    );
+is($ret, 'init', 'successfully create logical replication slot');
+
+# Cannot deactivate logical decoding while having valid slots.
+($ret, $stdout, $stderr) = $node->psql(
+    'postgres',
+    q[SELECT pg_deactivate_logical_decoding()]
+    );
+ok( $stderr =~ /ERROR:  cannot deactivate logical decoding while having valid logical replication slots/,
+    "cannot deactivate logical decoding while having valid logical slots");
+
+$node->safe_psql(
+    'postgres',
+    q[
+CREATE TABLE test (a int primary key, b int);
+INSERT INTO test values (1, 100), (2, 200);
+UPDATE test SET b = b + 10 WHERE a = 1;
+    ]);
+
+$ret = $node->safe_psql(
+    'postgres',
+    q[SELECT data FROM pg_logical_slot_get_changes('regression_slot1', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]
+    );
+is($ret, 'BEGIN
+table public.test: INSERT: a[integer]:1 b[integer]:100
+table public.test: INSERT: a[integer]:2 b[integer]:200
+COMMIT
+BEGIN
+table public.test: UPDATE: a[integer]:1 b[integer]:110
+COMMIT', 'logical decoding works fine');
+
+# Drop the replication slot.
+$node->safe_psql(
+    'postgres',
+    q[SELECT pg_drop_replication_slot('regression_slot1')]);
+
+# Deactivate logical decoding.
+$node->safe_psql(
+    'postgres',
+    q[SELECT pg_deactivate_logical_decoding()]);
+
+$ret = $node->safe_psql('postgres', 'SELECT pg_get_logical_decoding_status()');
+is($ret, 'disabled', 'logical decoding gets activated');
+
+done_testing();
+
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 6cdc68d981a..66c62745718 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -8894,8 +8894,8 @@ log_heap_update(Relation reln, Buffer oldbuf,
 /*
  * Perform XLogInsert of an XLOG_HEAP2_NEW_CID record
  *
- * This is only used in wal_level >= WAL_LEVEL_LOGICAL, and only for catalog
- * tuples.
+ * This is only used when XLogLogicalInfoActive() is true, and only for
+ * catalog tuples.
  */
 static XLogRecPtr
 log_heap_new_cid(Relation relation, HeapTuple tup)
diff --git a/src/backend/access/rmgrdesc/xlogdesc.c b/src/backend/access/rmgrdesc/xlogdesc.c
index 363294d6234..8c5be07aa37 100644
--- a/src/backend/access/rmgrdesc/xlogdesc.c
+++ b/src/backend/access/rmgrdesc/xlogdesc.c
@@ -167,6 +167,13 @@ xlog_desc(StringInfo buf, XLogReaderState *record)
 		memcpy(&wal_level, rec, sizeof(int));
 		appendStringInfo(buf, "wal_level %s", get_wal_level_string(wal_level));
 	}
+	else if (info == XLOG_LOGICAL_DECODING_STATUS)
+	{
+		bool		logical_decoding;
+
+		memcpy(&logical_decoding, rec, sizeof(bool));
+		appendStringInfoString(buf, logical_decoding ? "true" : "false");
+	}
 }
 
 const char *
@@ -218,6 +225,9 @@ xlog_identify(uint8 info)
 		case XLOG_CHECKPOINT_REDO:
 			id = "CHECKPOINT_REDO";
 			break;
+		case XLOG_LOGICAL_DECODING_STATUS:
+			id = "LOGICAL_DECODING_STATUS";
+			break;
 	}
 
 	return id;
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 6f58412bcab..649bde82095 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -78,6 +78,7 @@
 #include "postmaster/walsummarizer.h"
 #include "postmaster/walwriter.h"
 #include "replication/origin.h"
+#include "replication/logicalxlog.h"
 #include "replication/slot.h"
 #include "replication/snapbuild.h"
 #include "replication/walreceiver.h"
@@ -5078,6 +5079,7 @@ BootStrapXLOG(uint32 data_checksum_version)
 	checkPoint.ThisTimeLineID = BootstrapTimeLineID;
 	checkPoint.PrevTimeLineID = BootstrapTimeLineID;
 	checkPoint.fullPageWrites = fullPageWrites;
+	checkPoint.logicalDecoding = false;
 	checkPoint.wal_level = wal_level;
 	checkPoint.nextXid =
 		FullTransactionIdFromEpochAndXid(0, FirstNormalTransactionId);
@@ -5594,6 +5596,12 @@ StartupXLOG(void)
 	 */
 	RelationCacheInitFileRemove();
 
+	/*
+	 * Initialize logical decoding status, before initializing replication
+	 * slots.
+	 */
+	StartupLogicalDecodingStatus(checkPoint.logicalDecoding);
+
 	/*
 	 * Initialize replication slots, before there's a chance to remove
 	 * required resources.
@@ -7002,6 +7010,7 @@ CreateCheckPoint(int flags)
 
 	checkPoint.fullPageWrites = Insert->fullPageWrites;
 	checkPoint.wal_level = wal_level;
+	checkPoint.logicalDecoding = IsLogicalDecodingActive();
 
 	if (shutdown)
 	{
@@ -8583,6 +8592,25 @@ xlog_redo(XLogReaderState *record)
 	{
 		/* nothing to do here, just for informational purposes */
 	}
+	else if (info == XLOG_LOGICAL_DECODING_STATUS)
+	{
+		bool		enabled;
+
+		memcpy(&enabled, XLogRecGetData(record), sizeof(bool));
+
+		/*
+		 * Invalidate logical slots if we are in hot standby and the primary
+		 * disables the logical decoding.
+		 */
+		if (InRecovery && InHotStandby &&
+			IsLogicalDecodingActive() &&
+			!enabled)
+			InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_LEVEL,
+											   0, InvalidOid,
+											   InvalidTransactionId);
+
+		UpdateLogicalDecodingStatus(enabled);
+	}
 }
 
 /*
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 5050057a7e4..85953ba2588 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -898,11 +898,11 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
 
 	InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
 
-	if (wal_level != WAL_LEVEL_LOGICAL)
+	if (!XLogLogicalInfoActive())
 		ereport(WARNING,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-				 errmsg("\"wal_level\" is insufficient to publish logical changes"),
-				 errhint("Set \"wal_level\" to \"logical\" before creating subscriptions.")));
+				 errmsg("logical decoding needs to be enabled to publish logical changes"),
+				 errhint("Set \"wal_level\" to \"logical\" or activate logical decoding before creating subscriptions.")));
 
 	return myself;
 }
diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
index 1e08bbbd4eb..95bcf25a02b 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -21,6 +21,7 @@ OBJS = \
 	launcher.o \
 	logical.o \
 	logicalfuncs.o \
+	logicalxlog.o \
 	message.o \
 	origin.o \
 	proto.o \
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 4dc14fdb495..e85d26aac14 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -35,6 +35,7 @@
 #include "pgstat.h"
 #include "replication/decode.h"
 #include "replication/logical.h"
+#include "replication/logicalxlog.h"
 #include "replication/reorderbuffer.h"
 #include "replication/slotsync.h"
 #include "replication/snapbuild.h"
@@ -115,7 +116,7 @@ CheckLogicalDecodingRequirements(void)
 	 * needs the same check.
 	 */
 
-	if (wal_level < WAL_LEVEL_LOGICAL)
+	if (!XLogLogicalInfoActive())
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("logical decoding requires \"wal_level\" >= \"logical\"")));
@@ -138,7 +139,7 @@ CheckLogicalDecodingRequirements(void)
 		if (GetActiveWalLevelOnStandby() < WAL_LEVEL_LOGICAL)
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-					 errmsg("logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary")));
+					 errmsg("logical decoding on standby requires to enable logical decoding on the primary")));
 	}
 }
 
diff --git a/src/backend/replication/logical/logicalxlog.c b/src/backend/replication/logical/logicalxlog.c
new file mode 100644
index 00000000000..d409efffc28
--- /dev/null
+++ b/src/backend/replication/logical/logicalxlog.c
@@ -0,0 +1,379 @@
+/*-------------------------------------------------------------------------
+ * logicalxlog.c
+ *	   This module contains the codes for enabling or disabling to include
+ *	   logical information into WAL records and logical decoding.
+ *
+ * Copyright (c) 2012-2024, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/backend/replication/logical/logicalxlog.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/transam.h"
+#include "access/xloginsert.h"
+#include "access/xlogrecovery.h"
+#include "catalog/pg_control.h"
+#include "fmgr.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "replication/logicalxlog.h"
+#include "replication/slot.h"
+#include "storage/ipc.h"
+#include "storage/lmgr.h"
+#include "storage/procarray.h"
+#include "storage/procsignal.h"
+#include "utils/builtins.h"
+#include "utils/wait_event.h"
+
+typedef struct XLogLogicalInfoCtlData
+{
+	LogicalDecodingStatus status;
+	slock_t		mutex;
+	ConditionVariable cv;
+}			XLogLogicalInfoCtlData;
+XLogLogicalInfoCtlData *XLogLogicalInfoCtl = NULL;
+
+static void logical_decoding_activation_abort_callback(int code, Datum arg);
+static void do_activate_logical_decoding(void);
+
+/*
+ * Initialization of shared memory.
+ */
+
+Size
+LogicalXlogShmemSize(void)
+{
+	return sizeof(XLogLogicalInfoCtlData);
+}
+
+void
+LogicalXlogShmemInit(void)
+{
+	bool		found;
+
+	XLogLogicalInfoCtl = ShmemInitStruct("Logical Info Logging",
+										 sizeof(XLogLogicalInfoCtlData),
+										 &found);
+
+	if (!found)
+	{
+		XLogLogicalInfoCtl->status = LOGICAL_DECODING_STATUS_DISABLED;
+		SpinLockInit(&XLogLogicalInfoCtl->mutex);
+		ConditionVariableInit(&XLogLogicalInfoCtl->cv);
+	}
+}
+
+/*
+ * This must be called ONCE during postmaster or standalone-backend startup,
+ * before calling StartupReplicationSlots().
+ */
+void
+StartupLogicalDecodingStatus(bool enabled_at_last_checkpoint)
+{
+	LogicalDecodingStatus status;
+
+	if (enabled_at_last_checkpoint)
+		status = LOGICAL_DECODING_STATUS_READY;
+	else
+		status = LOGICAL_DECODING_STATUS_DISABLED;
+
+	/*
+	 * On standbys, we always start with the status in the last checkpoint
+	 * record. If changes of wal_level or logical decoding status is sent from
+	 * the primary, we will enable or disable the logical decoding while
+	 * replaying the WAL record and invalidate slots if necessary.
+	 */
+	if (!StandbyMode)
+	{
+		/*
+		 * Disable logical decoding if replication slots are not available.
+		 */
+		if (max_replication_slots == 0)
+			status = LOGICAL_DECODING_STATUS_DISABLED;
+
+		/*
+		 * If previously the logical decoding was active but the server
+		 * restarted with wal_level < 'replica', we disable the logical
+		 * decoding.
+		 */
+		if (wal_level < WAL_LEVEL_REPLICA)
+			status = LOGICAL_DECODING_STATUS_DISABLED;
+
+		/*
+		 * Setting wal_level to 'logical' immediately enables logical decoding
+		 * and WAL-logging logical info.
+		 */
+		if (wal_level >= WAL_LEVEL_LOGICAL)
+			status = LOGICAL_DECODING_STATUS_READY;
+	}
+
+	XLogLogicalInfoCtl->status = status;
+}
+
+void
+UpdateLogicalDecodingStatus(bool activate)
+{
+	XLogLogicalInfoCtl->status = activate
+		? LOGICAL_DECODING_STATUS_READY
+		: LOGICAL_DECODING_STATUS_DISABLED;
+}
+
+/*
+ * Is WAL-Logging logical info enabled?
+ */
+bool inline
+XLogLogicalInfoEnabled(void)
+{
+	/*
+	 * Use volatile pointer to make sure we make a fresh read of the shared
+	 * variable.
+	 */
+	volatile	XLogLogicalInfoCtlData *ctl = XLogLogicalInfoCtl;
+
+	return ctl->status >= LOGICAL_DECODING_STATUS_XLOG_LOGICALINFO;
+}
+
+/*
+ * Is logical decoding active?
+ */
+bool inline
+IsLogicalDecodingActive(void)
+{
+	/*
+	 * Use volatile pointer to make sure we make a fresh read of the shared
+	 * variable.
+	 */
+	volatile	XLogLogicalInfoCtlData *ctl = XLogLogicalInfoCtl;
+
+	return ctl->status == LOGICAL_DECODING_STATUS_READY;
+}
+
+static void
+do_activate_logical_decoding(void)
+{
+	bool		recoveryInProgress = RecoveryInProgress();
+
+	/*
+	 * Get the latest status of logical info logging. If it's already
+	 * activated, quick return.
+	 */
+	SpinLockAcquire(&XLogLogicalInfoCtl->mutex);
+	if (XLogLogicalInfoCtl->status >= LOGICAL_DECODING_STATUS_XLOG_LOGICALINFO)
+	{
+		SpinLockRelease(&XLogLogicalInfoCtl->mutex);
+		return;
+	}
+
+	/* Activate the logical info logging */
+	XLogLogicalInfoCtl->status = LOGICAL_DECODING_STATUS_XLOG_LOGICALINFO;
+	SpinLockRelease(&XLogLogicalInfoCtl->mutex);
+
+	PG_ENSURE_ERROR_CLEANUP(logical_decoding_activation_abort_callback, 0);
+	{
+		RunningTransactions running_xacts;
+
+		running_xacts = GetRunningTransactionData();
+
+		/*
+		 * GetRunningTransactionData() acquired both ProcArrayLock and
+		 * XidGenLock, we must release them.
+		 */
+		LWLockRelease(ProcArrayLock);
+		LWLockRelease(XidGenLock);
+
+		for (int i = 0; i < running_xacts->xcnt; i++)
+		{
+			TransactionId xid = running_xacts->xids[i];
+
+			/*
+			 * Upper layers should prevent that we ever need to wait on
+			 * ourselves. Check anyway, since failing to do so would either
+			 * result in an endless wait or an Assert() failure.
+			 */
+			if (TransactionIdIsCurrentTransactionId(xid))
+				elog(ERROR, "waiting for ourselves");
+
+			XactLockTableWait(xid, NULL, NULL, XLTW_None);
+		}
+	}
+	PG_END_ENSURE_ERROR_CLEANUP(logical_decoding_activation_abort_callback, 0);
+
+	START_CRIT_SECTION();
+
+	/* Activate logical decoding on the database cluster */
+	SpinLockAcquire(&XLogLogicalInfoCtl->mutex);
+	XLogLogicalInfoCtl->status = LOGICAL_DECODING_STATUS_READY;
+	SpinLockRelease(&XLogLogicalInfoCtl->mutex);
+
+	if (XLogStandbyInfoActive() && !recoveryInProgress)
+	{
+		bool		enabled = true;
+
+		XLogBeginInsert();
+		XLogRegisterData((char *) (&enabled), sizeof(bool));
+
+		XLogInsert(RM_XLOG_ID, XLOG_LOGICAL_DECODING_STATUS);
+	}
+
+	END_CRIT_SECTION();
+
+	/* Let everybody know we've activated the logical decoding */
+	ConditionVariableBroadcast(&XLogLogicalInfoCtl->cv);
+}
+
+static void
+logical_decoding_activation_abort_callback(int code, Datum arg)
+{
+	SpinLockAcquire(&XLogLogicalInfoCtl->mutex);
+	XLogLogicalInfoCtl->status = LOGICAL_DECODING_STATUS_DISABLED;
+	SpinLockRelease(&XLogLogicalInfoCtl->mutex);
+
+	/* Let everybody know we failed to activate the logical decoding */
+	ConditionVariableBroadcast(&XLogLogicalInfoCtl->cv);
+}
+
+Datum
+pg_activate_logical_decoding(PG_FUNCTION_ARGS)
+{
+	LogicalDecodingStatus status;
+
+	if (max_replication_slots == 0)
+		ereport(LOG,
+				(errmsg("logical decoding can only be activated if \"max_replication_slots\" > 0")));
+
+	if (wal_level < WAL_LEVEL_REPLICA)
+		ereport(LOG,
+				(errmsg("logical decoding can only be activated if \"wal_level\" >= \"replica\"")));
+
+	if (IsTransactionState() &&
+		GetTopTransactionIdIfAny() != InvalidTransactionId)
+		ereport(ERROR,
+				(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
+				 errmsg("cannot activate logical decoding in transaction that has performed writes")));
+
+
+	SpinLockAcquire(&(XLogLogicalInfoCtl->mutex));
+	status = XLogLogicalInfoCtl->status;
+	SpinLockRelease(&(XLogLogicalInfoCtl->mutex));
+
+	if (status == LOGICAL_DECODING_STATUS_READY)
+		PG_RETURN_VOID();
+
+	/*
+	 * Get ready to sleep until the logical decoding gets activated. We may
+	 * end up not sleeping, but we don't want to do this while holding the
+	 * spinlock.
+	 */
+	ConditionVariablePrepareToSleep(&XLogLogicalInfoCtl->cv);
+
+	for (;;)
+	{
+		SpinLockAcquire(&XLogLogicalInfoCtl->mutex);
+		status = XLogLogicalInfoCtl->status;
+		SpinLockRelease(&XLogLogicalInfoCtl->mutex);
+
+		/* Return if the logical decoding is activated */
+		if (status == LOGICAL_DECODING_STATUS_READY)
+			break;
+
+		if (status == LOGICAL_DECODING_STATUS_XLOG_LOGICALINFO)
+		{
+			/*
+			 * Someone has already started the activation process. Wait for
+			 * the activation to complete and check the status again.
+			 */
+			ConditionVariableSleep(&XLogLogicalInfoCtl->cv,
+								   WAIT_EVENT_LOGICAL_DECODING_ACTIVATION);
+
+			continue;
+		}
+
+		/* Activate logical decoding */
+		do_activate_logical_decoding();
+	}
+
+	ConditionVariableCancelSleep();
+
+	PG_RETURN_VOID();
+}
+
+Datum
+pg_deactivate_logical_decoding(PG_FUNCTION_ARGS)
+{
+	int			valid_logical_slots = 0;
+	bool		recoveryInProgress = RecoveryInProgress();
+
+	/*
+	 * Hold ReplicationSlotAllocationLock to prevent slots from newly
+	 * being created while deactivating the logical decoding.
+	 */
+	LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
+
+	for (int i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+		if (!s->in_use)
+			continue;
+
+		if (SlotIsPhysical(s))
+			continue;
+
+		if (s->data.invalidated != RS_INVAL_NONE)
+			continue;
+
+		valid_logical_slots++;
+	}
+
+	if (valid_logical_slots > 0)
+		ereport(ERROR,
+				(errmsg("cannot deactivate logical decoding while having valid logical replication slots")));
+
+	if (XLogStandbyInfoActive() && !recoveryInProgress)
+	{
+		bool		enabled = false;
+
+		XLogBeginInsert();
+		XLogRegisterData((char *) (&enabled), sizeof(bool));
+
+		XLogInsert(RM_XLOG_ID, XLOG_LOGICAL_DECODING_STATUS);
+	}
+
+	SpinLockAcquire(&(XLogLogicalInfoCtl->mutex));
+	XLogLogicalInfoCtl->status = LOGICAL_DECODING_STATUS_DISABLED;
+	SpinLockRelease(&(XLogLogicalInfoCtl->mutex));
+
+	LWLockRelease(ReplicationSlotAllocationLock);
+
+	PG_RETURN_BOOL(true);
+}
+
+Datum
+pg_get_logical_decoding_status(PG_FUNCTION_ARGS)
+{
+	LogicalDecodingStatus status;
+	const char *status_str = "unknown";
+
+	SpinLockAcquire(&(XLogLogicalInfoCtl->mutex));
+	status = XLogLogicalInfoCtl->status;
+	SpinLockRelease(&(XLogLogicalInfoCtl->mutex));
+
+	switch (status)
+	{
+		case LOGICAL_DECODING_STATUS_DISABLED:
+			status_str = "disabled";
+			break;
+		case LOGICAL_DECODING_STATUS_XLOG_LOGICALINFO:
+			status_str = "xlog-logicalinfo";
+			break;
+		case LOGICAL_DECODING_STATUS_READY:
+			status_str = "ready";
+			break;
+	}
+
+	PG_RETURN_TEXT_P(cstring_to_text(status_str));
+}
diff --git a/src/backend/replication/logical/meson.build b/src/backend/replication/logical/meson.build
index 3d36249d8ad..82cec21d5ed 100644
--- a/src/backend/replication/logical/meson.build
+++ b/src/backend/replication/logical/meson.build
@@ -7,6 +7,7 @@ backend_sources += files(
   'launcher.c',
   'logical.c',
   'logicalfuncs.c',
+  'logicalxlog.c',
   'message.c',
   'origin.c',
   'proto.c',
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index f4f80b23129..85bf4a7be99 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -1042,10 +1042,10 @@ ValidateSlotSyncParams(int elevel)
 	 * Since altering the wal_level requires a server restart, so error out in
 	 * this case regardless of elevel provided by caller.
 	 */
-	if (wal_level < WAL_LEVEL_LOGICAL)
+	if (!XLogLogicalInfoActive())
 		ereport(ERROR,
 				errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-				errmsg("replication slot synchronization requires \"wal_level\" >= \"logical\""));
+				errmsg("replication slot synchronization requires \"wal_level\" >= \"logical\" or to activate logical decoding"));
 
 	/*
 	 * A physical replication slot(primary_slot_name) is required on the
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 4a206f95277..5d90a113bf2 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -50,6 +50,7 @@
 #include "replication/slotsync.h"
 #include "replication/slot.h"
 #include "replication/walsender_private.h"
+#include "replication/logicalxlog.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
 #include "storage/proc.h"
@@ -2355,12 +2356,12 @@ RestoreSlotFromDisk(const char *name)
 	 * NB: Changing the requirements here also requires adapting
 	 * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
 	 */
-	if (cp.slotdata.database != InvalidOid && wal_level < WAL_LEVEL_LOGICAL)
+	if (cp.slotdata.database != InvalidOid && !XLogLogicalInfoActive())
 		ereport(FATAL,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("logical replication slot \"%s\" exists, but \"wal_level\" < \"logical\"",
 						NameStr(cp.slotdata.name)),
-				 errhint("Change \"wal_level\" to be \"logical\" or higher.")));
+				 errhint("Change \"wal_level\" to be \"logical\" or higher, or activate logical decoding with \"replica\".")));
 	else if (wal_level < WAL_LEVEL_REPLICA)
 		ereport(FATAL,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 7783ba854fc..8a2bf9a54db 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -32,6 +32,7 @@
 #include "postmaster/bgwriter.h"
 #include "postmaster/walsummarizer.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalxlog.h"
 #include "replication/origin.h"
 #include "replication/slot.h"
 #include "replication/slotsync.h"
@@ -148,6 +149,7 @@ CalculateShmemSize(int *num_semaphores)
 	size = add_size(size, WaitEventCustomShmemSize());
 	size = add_size(size, InjectionPointShmemSize());
 	size = add_size(size, SlotSyncShmemSize());
+	size = add_size(size, LogicalXlogShmemSize());
 
 	/* include additional requested shmem from preload libraries */
 	size = add_size(size, total_addin_request);
@@ -330,6 +332,7 @@ CreateOrAttachShmemStructs(void)
 	PgArchShmemInit();
 	ApplyLauncherShmemInit();
 	SlotSyncShmemInit();
+	LogicalXlogShmemInit();
 
 	/*
 	 * Set up other modules that need some shared memory space
diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c
index 87027f27eb7..719ec72cd9a 100644
--- a/src/backend/storage/ipc/procsignal.c
+++ b/src/backend/storage/ipc/procsignal.c
@@ -23,6 +23,7 @@
 #include "pgstat.h"
 #include "port/pg_bitutils.h"
 #include "replication/logicalworker.h"
+#include "replication/logicalxlog.h"
 #include "replication/walsender.h"
 #include "storage/condition_variable.h"
 #include "storage/ipc.h"
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index 25267f0f85d..c02ead5bad7 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -498,7 +498,7 @@ ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon,
 	 * seems OK, given that this kind of conflict should not normally be
 	 * reached, e.g. due to using a physical replication slot.
 	 */
-	if (wal_level >= WAL_LEVEL_LOGICAL && isCatalogRel)
+	if (XLogLogicalInfoActive() && isCatalogRel)
 		InvalidateObsoleteReplicationSlots(RS_INVAL_HORIZON, 0, locator.dbOid,
 										   snapshotConflictHorizon);
 }
@@ -1313,13 +1313,13 @@ LogStandbySnapshot(void)
 	 * record. Fortunately this routine isn't executed frequently, and it's
 	 * only a shared lock.
 	 */
-	if (wal_level < WAL_LEVEL_LOGICAL)
+	if (!XLogLogicalInfoActive())
 		LWLockRelease(ProcArrayLock);
 
 	recptr = LogCurrentRunningXacts(running);
 
 	/* Release lock if we kept it longer ... */
-	if (wal_level >= WAL_LEVEL_LOGICAL)
+	if (XLogLogicalInfoActive())
 		LWLockRelease(ProcArrayLock);
 
 	/* GetRunningTransactionData() acquired XidGenLock, we must release it */
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 720ef99ee83..8a8ba6765f7 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -41,6 +41,7 @@
 #include "postmaster/autovacuum.h"
 #include "replication/slotsync.h"
 #include "replication/syncrep.h"
+#include "replication/logicalxlog.h"
 #include "storage/condition_variable.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 16144c2b72d..45bf8da63db 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -131,6 +131,7 @@ HASH_GROW_BUCKETS_ELECT	"Waiting to elect a Parallel Hash participant to allocat
 HASH_GROW_BUCKETS_REALLOCATE	"Waiting for an elected Parallel Hash participant to finish allocating more buckets."
 HASH_GROW_BUCKETS_REINSERT	"Waiting for other Parallel Hash participants to finish inserting tuples into new buckets."
 LOGICAL_APPLY_SEND_DATA	"Waiting for a logical replication leader apply process to send data to a parallel apply process."
+LOGICAL_DECODING_ACTIVATION	"Waiting for logical decoding to be activated."
 LOGICAL_PARALLEL_APPLY_STATE_CHANGE	"Waiting for a logical replication parallel apply process to change state."
 LOGICAL_SYNC_DATA	"Waiting for a logical replication remote server to send data for initial table synchronization."
 LOGICAL_SYNC_STATE_CHANGE	"Waiting for a logical replication remote server to change state."
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 34ad46c067b..e5f661be85e 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -14,6 +14,7 @@
 #include "access/xlogbackup.h"
 #include "access/xlogdefs.h"
 #include "datatype/timestamp.h"
+#include "replication/logicalxlog.h"
 #include "lib/stringinfo.h"
 #include "nodes/pg_list.h"
 
@@ -123,7 +124,7 @@ extern PGDLLIMPORT int wal_level;
 #define XLogStandbyInfoActive() (wal_level >= WAL_LEVEL_REPLICA)
 
 /* Do we need to WAL-log information required only for logical replication? */
-#define XLogLogicalInfoActive() (wal_level >= WAL_LEVEL_LOGICAL)
+#define XLogLogicalInfoActive() (XLogLogicalInfoEnabled())
 
 #ifdef WAL_DEBUG
 extern PGDLLIMPORT bool XLOG_DEBUG;
diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h
index e80ff8e4140..764e22798dc 100644
--- a/src/include/catalog/pg_control.h
+++ b/src/include/catalog/pg_control.h
@@ -40,6 +40,7 @@ typedef struct CheckPoint
 	TimeLineID	PrevTimeLineID; /* previous TLI, if this record begins a new
 								 * timeline (equals ThisTimeLineID otherwise) */
 	bool		fullPageWrites; /* current full_page_writes */
+	bool		logicalDecoding;	/* true if logical decoding is enabled */
 	int			wal_level;		/* current wal_level */
 	FullTransactionId nextXid;	/* next free transaction ID */
 	Oid			nextOid;		/* next free OID */
@@ -80,6 +81,7 @@ typedef struct CheckPoint
 /* 0xC0 is used in Postgres 9.5-11 */
 #define XLOG_OVERWRITE_CONTRECORD		0xD0
 #define XLOG_CHECKPOINT_REDO			0xE0
+#define XLOG_LOGICAL_DECODING_STATUS	0xF0
 
 
 /*
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 2dcc2d42dac..5d04212ac1f 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11425,6 +11425,22 @@
   proname => 'pg_sync_replication_slots', provolatile => 'v',
   proparallel => 'u', prorettype => 'void', proargtypes => '',
   prosrc => 'pg_sync_replication_slots' },
+{ oid => '8976',
+  descr => 'enable both logical info WAL-logging and logical decoding facility',
+  proname => 'pg_activate_logical_decoding', provolatile => 'v',
+  proparallel => 'u', prorettype => 'void', proargtypes => '',
+  prosrc => 'pg_activate_logical_decoding' },
+{ oid => '8977',
+  descr => 'deactivate logical decoding facility',
+  proname => 'pg_deactivate_logical_decoding', provolatile => 'v',
+  proparallel => 'u', prorettype => 'bool', proargtypes => '',
+  prosrc => 'pg_deactivate_logical_decoding' },
+{ oid => '8978',
+  descr => 'get the current logical decoding state',
+  proname => 'pg_get_logical_decoding_status', provolatile => 'v',
+  proparallel => 'u', prorettype => 'text', proargtypes => '',
+  prosrc => 'pg_get_logical_decoding_status' },
+
 
 # event triggers
 { oid => '3566', descr => 'list objects dropped by the current command',
diff --git a/src/include/replication/logicalxlog.h b/src/include/replication/logicalxlog.h
new file mode 100644
index 00000000000..7a95b947c36
--- /dev/null
+++ b/src/include/replication/logicalxlog.h
@@ -0,0 +1,33 @@
+/*-------------------------------------------------------------------------
+ * logicalxlog.h
+ *	  Exports from replication/logical/logicalxlog.c.
+ *
+ * Copyright (c) 2012-2024, PostgreSQL Global Development Group
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef LOGICALXLOG_H
+#define LOGICALXLOG_H
+
+/*
+ * The status of WAL-logging logical info and logical decoding.
+ */
+typedef enum LogicalDecodingStatus
+{
+	LOGICAL_DECODING_STATUS_DISABLED = 0,
+	LOGICAL_DECODING_STATUS_XLOG_LOGICALINFO,
+	LOGICAL_DECODING_STATUS_READY,
+}			LogicalDecodingStatus;
+
+typedef struct XLogLogicalInfoCtlData XLogLogicalInfoCtlData;
+extern XLogLogicalInfoCtlData *XLogLogicalInfoCtl;
+extern bool XLogLogicalInfo;
+
+extern Size LogicalXlogShmemSize(void);
+extern void LogicalXlogShmemInit(void);
+extern void StartupLogicalDecodingStatus(bool enabled_at_last_checkpoint);
+extern bool XLogLogicalInfoEnabled(void);
+extern bool IsLogicalDecodingActive(void);
+extern void UpdateLogicalDecodingStatus(bool enabled);
+
+#endif							/* LOGICALXLOG_H */
diff --git a/src/test/recovery/t/035_standby_logical_decoding.pl b/src/test/recovery/t/035_standby_logical_decoding.pl
index d658fdf417c..3bc06afcd8c 100644
--- a/src/test/recovery/t/035_standby_logical_decoding.pl
+++ b/src/test/recovery/t/035_standby_logical_decoding.pl
@@ -801,7 +801,7 @@ $handle =
   make_slot_active($node_standby, 'wal_level_', 0, \$stdout, \$stderr);
 # We are not able to read from the slot as it requires wal_level >= logical on the primary server
 check_pg_recvlogical_stderr($handle,
-	"logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary"
+	"logical decoding on standby requires to enable logical decoding on the primary"
 );
 
 # Restore primary wal_level
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
index d377e7ae2b6..4a3fec73914 100644
--- a/src/test/subscription/t/001_rep_changes.pl
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -573,7 +573,7 @@ CREATE PUBLICATION tap_pub2 FOR TABLE skip_wal;
 ROLLBACK;
 });
 ok( $reterr =~
-	  m/WARNING:  "wal_level" is insufficient to publish logical changes/,
+	  m/WARNING:  logical decoding needs to be enabled to publish logical changes/,
 	'CREATE PUBLICATION while "wal_level=minimal"');
 
 done_testing();
