From 6645685c4d53996a99f5621218b624df46e9041d Mon Sep 17 00:00:00 2001
From: Anthonin Bonnefoy <anthonin.bonnefoy@datadoghq.com>
Date: Wed, 12 Mar 2025 09:45:15 +0100
Subject: Avoid using deleted context with replication command

When a replication command creates a new replication slot with snapshot
export, SnapBuildExportSnapshot starts a new transaction. Starting this
transaction saves the memory context that is current at that point, which
is the replication command context, so that it can be restored when the
transaction ends.

Currently, this memory context is deleted at the end of
exec_replication_command. Thus, during the next replication command,
SnapBuildClearExportedSnapshot aborts the previous transaction and
restores the already-deleted memory context as CurrentMemoryContext.

To fix this issue, the replication command MemoryContext is kept alive
if there's an ongoing snapshot export. This context will be restored
during the next replication command when the snapshot transaction is
aborted.
---
 src/backend/replication/logical/snapbuild.c   |  9 +++
 src/backend/replication/walsender.c           | 60 ++++++++++++++-----
 src/include/replication/snapbuild.h           |  1 +
 src/test/recovery/meson.build                 |  1 +
 .../recovery/t/045_replication_commands.pl    | 32 ++++++++++
 5 files changed, 89 insertions(+), 14 deletions(-)
 create mode 100644 src/test/recovery/t/045_replication_commands.pl

diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index b64e53de017..7b524327e20 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -630,6 +630,15 @@ SnapBuildResetExportedSnapshotState(void)
 	ExportInProgress = false;
 }
 
+/*
+ * Report whether a snapshot export is in progress (the ExportInProgress flag).
+ */
+bool
+SnapBuildExportInProgress(void)
+{
+	return ExportInProgress;
+}
+
 /*
  * Handle the effects of a single heap change, appropriate to the current state
  * of the snapshot builder and returns whether changes made at (xid, lsn) can
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 446d10c1a7d..4b5a954b308 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1975,22 +1975,44 @@ exec_replication_command(const char *cmd_string)
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("cannot execute new commands while WAL sender is in stopping mode")));
 
-	/*
-	 * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
-	 * command arrives. Clean up the old stuff if there's anything.
-	 */
-	SnapBuildClearExportedSnapshot();
+	if (SnapBuildExportInProgress())
+	{
+		Assert(IsTransactionState());
+		Assert(MemoryContextIsValid(CurTransactionContext));
+
+		/*
+		 * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the
+		 * next command arrives. It started a transaction that will be aborted
+		 * and will restore the MemoryContext used at the time of the
+		 * transaction start. We need to save the CurrentMemoryContext before
+		 * this.
+		 */
+		old_context = CurrentMemoryContext;
+		SnapBuildClearExportedSnapshot();
+
+		/*
+		 * Set the restored MemoryContext as our cmd_context
+		 */
+		cmd_context = CurrentMemoryContext;
+	}
+	else
+	{
+		/*
+		 * Prepare to parse and execute the command. Since a transaction could
+		 * be started, this context may be stored in
+		 * transactionState->priorContext and live longer than the
+		 * CurrentMemoryContext. Thus, we need to create the replication
+		 * command context under the TopMemoryContext to avoid being deleted
+		 * by the parent while still referenced.
+		 */
+		cmd_context = AllocSetContextCreate(TopMemoryContext,
+											"Replication command context",
+											ALLOCSET_DEFAULT_SIZES);
+		old_context = MemoryContextSwitchTo(cmd_context);
+	}
 
 	CHECK_FOR_INTERRUPTS();
 
-	/*
-	 * Prepare to parse and execute the command.
-	 */
-	cmd_context = AllocSetContextCreate(CurrentMemoryContext,
-										"Replication command context",
-										ALLOCSET_DEFAULT_SIZES);
-	old_context = MemoryContextSwitchTo(cmd_context);
-
 	replication_scanner_init(cmd_string, &scanner);
 
 	/*
@@ -2164,7 +2186,17 @@ exec_replication_command(const char *cmd_string)
 
 	/* done */
 	MemoryContextSwitchTo(old_context);
-	MemoryContextDelete(cmd_context);
+
+	if (SnapBuildExportInProgress())
+
+		/*
+		 * A snapshot export started a transaction that needs to live until
+		 * the next command. We need to keep the cmd_context alive as it will
+		 * be restored when the transaction aborts.
+		 */
+		Assert(IsTransactionState());
+	else
+		MemoryContextDelete(cmd_context);
 
 	/*
 	 * We need not update ps display or pg_stat_activity, because PostgresMain
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index 44031dcf6e3..bf300ba9b6d 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -76,6 +76,7 @@ extern Snapshot SnapBuildInitialSnapshot(SnapBuild *builder);
 extern const char *SnapBuildExportSnapshot(SnapBuild *builder);
 extern void SnapBuildClearExportedSnapshot(void);
 extern void SnapBuildResetExportedSnapshotState(void);
+extern bool SnapBuildExportInProgress(void);
 
 extern SnapBuildState SnapBuildCurrentState(SnapBuild *builder);
 extern Snapshot SnapBuildGetOrBuildSnapshot(SnapBuild *builder);
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index 057bcde1434..7e33dd897b2 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -53,6 +53,7 @@ tests += {
       't/042_low_level_backup.pl',
       't/043_no_contrecord_switch.pl',
       't/044_invalidate_inactive_slots.pl',
+      't/045_replication_commands.pl',
     ],
   },
 }
diff --git a/src/test/recovery/t/045_replication_commands.pl b/src/test/recovery/t/045_replication_commands.pl
new file mode 100644
index 00000000000..385f666aa96
--- /dev/null
+++ b/src/test/recovery/t/045_replication_commands.pl
@@ -0,0 +1,32 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+# Test running a replication command right after a snapshot export
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize and start a primary node set up for logical streaming
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+$node_primary->init(allows_streaming => 'logical');
+$node_primary->start;
+
+my $primary_host = $node_primary->host;
+my $primary_port = $node_primary->port;
+my $connstr_db = "host=$primary_host port=$primary_port replication=database dbname=postgres";
+# Create a slot that exports a snapshot, then drop it, in a single psql call
+my ($ret, $stdout, $stderr) = $node_primary->psql(
+	'postgres', qq[
+		CREATE_REPLICATION_SLOT "test_slot" LOGICAL "test_decoding" ( SNAPSHOT "export");
+		DROP_REPLICATION_SLOT "test_slot";
+	],
+	on_error_die => 1,
+	extra_params => [ '-d', $connstr_db ]);
+ok($ret == 0, "Create and drop of replication slot");
+
+# Testcase end
+# =============================================================================
+
+done_testing();
+
-- 
2.39.5 (Apple Git-154)

