From 22d48c240a3ecbce74f4b92d23b6d0b281bd6c32 Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.fnst@cn.fujitsu.com>
Date: Thu, 27 Feb 2025 10:49:32 +0800
Subject: [PATCH v2] Fix slot synchronization with two_phase decoding enabled

This commit fixes a bug in slot synchronization for logical replication
slots that have two_phase decoding enabled. As it stands, transactions prepared
before two-phase decoding is enabled may fail to replicate to the subscriber
after being committed on a promoted standby following a failover.

The issue arises because the two_phase_at field of a slot, which tracks the LSN
from which two-phase decoding starts, is not synchronized to standby servers.
Without this field, logical decoding might incorrectly identify prepared
transactions as already replicated to the subscriber, causing them to be
skipped.

To address the issue on HEAD, this commit makes the two_phase_at field of the slot
visible in the pg_replication_slots view and enables the slot synchronization
to copy this value to the corresponding synced slot on the standby server.

The bug has been present since the introduction of slot synchronization in
PostgreSQL 17. However, due to the need for catalog changes, backpatching this
fix is not feasible. Instead, to prevent the risk of losing prepared
transactions in prior versions, we now disallow enabling failover and two-phase
decoding together for a replication slot.

---
 doc/src/sgml/system-views.sgml                | 11 +++
 src/backend/catalog/system_views.sql          |  1 +
 src/backend/replication/logical/slotsync.c    | 14 +++-
 src/backend/replication/slotfuncs.c           |  8 +-
 src/include/catalog/pg_proc.dat               |  6 +-
 .../t/040_standby_failover_slots_sync.pl      | 81 ++++++++++++++++++-
 src/test/regress/expected/rules.out           |  3 +-
 7 files changed, 111 insertions(+), 13 deletions(-)

diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml
index 3f5a306247e..141c140331d 100644
--- a/doc/src/sgml/system-views.sgml
+++ b/doc/src/sgml/system-views.sgml
@@ -2560,6 +2560,17 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>two_phase_at</structfield> <type>pg_lsn</type>
+      </para>
+      <para>
+       The address (<literal>LSN</literal>) from which the decoding of prepared
+       transactions is enabled. <literal>NULL</literal> for physical slots and
+       for logical slots where <structfield>two_phase</structfield> is disabled.
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>inactive_since</structfield> <type>timestamptz</type>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 31d269b7ee0..a8fddd0183c 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1025,6 +1025,7 @@ CREATE VIEW pg_replication_slots AS
             L.wal_status,
             L.safe_wal_size,
             L.two_phase,
+            L.two_phase_at,
             L.inactive_since,
             L.conflicting,
             L.invalidation_reason,
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 2c0a7439be4..e22d41891e6 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -139,6 +139,7 @@ typedef struct RemoteSlot
 	bool		failover;
 	XLogRecPtr	restart_lsn;
 	XLogRecPtr	confirmed_lsn;
+	XLogRecPtr	two_phase_at;
 	TransactionId catalog_xmin;
 
 	/* RS_INVAL_NONE if valid, or the reason of invalidation */
@@ -276,7 +277,8 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
 	if (remote_dbid != slot->data.database ||
 		remote_slot->two_phase != slot->data.two_phase ||
 		remote_slot->failover != slot->data.failover ||
-		strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) != 0)
+		strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) != 0 ||
+		remote_slot->two_phase_at != slot->data.two_phase_at)
 	{
 		NameData	plugin_name;
 
@@ -287,6 +289,7 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
 		slot->data.plugin = plugin_name;
 		slot->data.database = remote_dbid;
 		slot->data.two_phase = remote_slot->two_phase;
+		slot->data.two_phase_at = remote_slot->two_phase_at;
 		slot->data.failover = remote_slot->failover;
 		SpinLockRelease(&slot->mutex);
 
@@ -788,9 +791,9 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 static bool
 synchronize_slots(WalReceiverConn *wrconn)
 {
-#define SLOTSYNC_COLUMN_COUNT 9
+#define SLOTSYNC_COLUMN_COUNT 10
 	Oid			slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID,
-	LSNOID, XIDOID, BOOLOID, BOOLOID, TEXTOID, TEXTOID};
+	LSNOID, XIDOID, BOOLOID, LSNOID, BOOLOID, TEXTOID, TEXTOID};
 
 	WalRcvExecResult *res;
 	TupleTableSlot *tupslot;
@@ -798,7 +801,7 @@ synchronize_slots(WalReceiverConn *wrconn)
 	bool		some_slot_updated = false;
 	bool		started_tx = false;
 	const char *query = "SELECT slot_name, plugin, confirmed_flush_lsn,"
-		" restart_lsn, catalog_xmin, two_phase, failover,"
+		" restart_lsn, catalog_xmin, two_phase, two_phase_at, failover,"
 		" database, invalidation_reason"
 		" FROM pg_catalog.pg_replication_slots"
 		" WHERE failover and NOT temporary";
@@ -853,6 +856,9 @@ synchronize_slots(WalReceiverConn *wrconn)
 														   &isnull));
 		Assert(!isnull);
 
+		d = slot_getattr(tupslot, ++col, &isnull);
+		remote_slot->two_phase_at = isnull ? InvalidXLogRecPtr : DatumGetLSN(d);
+
 		remote_slot->failover = DatumGetBool(slot_getattr(tupslot, ++col,
 														  &isnull));
 		Assert(!isnull);
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 146eef5871e..8a314b5ff3b 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -235,7 +235,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_GET_REPLICATION_SLOTS_COLS 19
+#define PG_GET_REPLICATION_SLOTS_COLS 20
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	XLogRecPtr	currlsn;
 	int			slotno;
@@ -406,6 +406,12 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 
 		values[i++] = BoolGetDatum(slot_contents.data.two_phase);
 
+		if (slot_contents.data.two_phase &&
+			!XLogRecPtrIsInvalid(slot_contents.data.two_phase_at))
+			values[i++] = LSNGetDatum(slot_contents.data.two_phase_at);
+		else
+			nulls[i++] = true;
+
 		if (slot_contents.inactive_since > 0)
 			values[i++] = TimestampTzGetDatum(slot_contents.inactive_since);
 		else
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 8b68b16d79d..a89488419a6 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11409,9 +11409,9 @@
   proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', prorettype => 'record',
   proargtypes => '',
-  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,timestamptz,bool,text,bool,bool}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,inactive_since,conflicting,invalidation_reason,failover,synced}',
+  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,pg_lsn,timestamptz,bool,text,bool,bool}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,two_phase_at,inactive_since,conflicting,invalidation_reason,failover,synced}',
   prosrc => 'pg_get_replication_slots' },
 { oid => '3786', descr => 'set up a logical replication slot',
   proname => 'pg_create_logical_replication_slot', provolatile => 'v',
diff --git a/src/test/recovery/t/040_standby_failover_slots_sync.pl b/src/test/recovery/t/040_standby_failover_slots_sync.pl
index 8f65142909a..67cc6374565 100644
--- a/src/test/recovery/t/040_standby_failover_slots_sync.pl
+++ b/src/test/recovery/t/040_standby_failover_slots_sync.pl
@@ -22,7 +22,11 @@ $publisher->init(
 # Disable autovacuum to avoid generating xid during stats update as otherwise
 # the new XID could then be replicated to standby at some random point making
 # slots at primary lag behind standby during slot sync.
-$publisher->append_conf('postgresql.conf', 'autovacuum = off');
+$publisher->append_conf(
+	'postgresql.conf', qq{
+autovacuum = off
+max_prepared_transactions = 1
+});
 $publisher->start;
 
 $publisher->safe_psql('postgres',
@@ -33,6 +37,7 @@ my $publisher_connstr = $publisher->connstr . ' dbname=postgres';
 # Create a subscriber node, wait for sync to complete
 my $subscriber1 = PostgreSQL::Test::Cluster->new('subscriber1');
 $subscriber1->init;
+$subscriber1->append_conf('postgresql.conf', 'max_prepared_transactions = 1');
 $subscriber1->start;
 
 # Capture the time before the logical failover slot is created on the
@@ -830,13 +835,72 @@ $primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots',
 	"'sb1_slot'");
 $primary->reload;
 
+##################################################
+# Test the synchronization of the two_phase setting for a subscription with the
+# standby. Additionally, prepare a transaction before enabling the two_phase
+# option; subsequent tests will verify if it can be correctly replicated to the
+# subscriber after committing it on the promoted standby.
+##################################################
+
+$standby1->start;
+
+# Prepare a transaction
+$primary->safe_psql(
+	'postgres', qq[
+	BEGIN;
+	INSERT INTO tab_int values(0);
+	PREPARE TRANSACTION 'test_twophase_slotsync';
+]);
+
+$primary->wait_for_replay_catchup($standby1);
+$primary->wait_for_catchup('regress_mysub1');
+
+# Disable the subscription to allow changing the two_phase option.
+$subscriber1->safe_psql('postgres',
+	"ALTER SUBSCRIPTION regress_mysub1 DISABLE");
+
+# Wait for the replication slot to become inactive on the publisher
+$primary->poll_query_until(
+	'postgres',
+	"SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'lsub1_slot' AND active='f'",
+	1);
+
+# Set two_phase to true and enable the subscription
+$subscriber1->safe_psql(
+	'postgres', qq[
+	ALTER SUBSCRIPTION regress_mysub1 SET (two_phase = true);
+	ALTER SUBSCRIPTION regress_mysub1 ENABLE;
+]);
+
+$primary->wait_for_catchup('regress_mysub1');
+
+my $two_phase_at = $primary->safe_psql('postgres',
+	"SELECT two_phase_at from pg_replication_slots WHERE slot_name = 'lsub1_slot';"
+);
+
+# Confirm that two_phase setting of lsub1_slot slot is synced to the standby
+ok( $standby1->poll_query_until(
+		'postgres',
+		"SELECT two_phase AND '$two_phase_at' = two_phase_at from pg_replication_slots WHERE slot_name = 'lsub1_slot' AND synced AND NOT temporary;"
+	),
+	'two_phase setting of slot lsub1_slot synced to standby');
+
+# Confirm that the prepared transaction is not yet replicated to the
+# subscriber.
+$result = $subscriber1->safe_psql('postgres',
+	"SELECT count(*) = 0 FROM pg_prepared_xacts;");
+is($result, 't',
+	"the prepared transaction is not replicated to the subscriber");
+
 ##################################################
 # Promote the standby1 to primary. Confirm that:
 # a) the slot 'lsub1_slot' and 'snap_test_slot' are retained on the new primary
 # b) logical replication for regress_mysub1 is resumed successfully after failover
-# c) changes can be consumed from the synced slot 'snap_test_slot'
+# c) changes from the transaction 'test_twophase_slotsync', which was prepared
+#    on the old primary, can be consumed from the synced slot 'lsub1_slot'
+#    once committed on the new primary.
+# d) changes can be consumed from the synced slot 'snap_test_slot'
 ##################################################
-$standby1->start;
 $primary->wait_for_replay_catchup($standby1);
 
 # Capture the time before the standby is promoted
@@ -876,6 +940,15 @@ is( $standby1->safe_psql(
 	't',
 	'synced slot retained on the new primary');
 
+# Commit the prepared transaction
+$standby1->safe_psql('postgres',
+	"COMMIT PREPARED 'test_twophase_slotsync';");
+$standby1->wait_for_catchup('regress_mysub1');
+
+# Confirm that the prepared transaction is replicated to the subscriber
+is($subscriber1->safe_psql('postgres', q{SELECT count(*) FROM tab_int;}),
+	"1", 'prepared data replicated from the new primary');
+
 # Insert data on the new primary
 $standby1->safe_psql('postgres',
 	"INSERT INTO tab_int SELECT generate_series(11, 20);");
@@ -883,7 +956,7 @@ $standby1->wait_for_catchup('regress_mysub1');
 
 # Confirm that data in tab_int replicated on the subscriber
 is($subscriber1->safe_psql('postgres', q{SELECT count(*) FROM tab_int;}),
-	"20", 'data replicated from the new primary');
+	"21", 'data replicated from the new primary');
 
 # Consume the data from the snap_test_slot. The synced slot should reach a
 # consistent point by restoring the snapshot at the restart_lsn serialized
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 47478969135..035769b4624 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1474,12 +1474,13 @@ pg_replication_slots| SELECT l.slot_name,
     l.wal_status,
     l.safe_wal_size,
     l.two_phase,
+    l.two_phase_at,
     l.inactive_since,
     l.conflicting,
     l.invalidation_reason,
     l.failover,
     l.synced
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, inactive_since, conflicting, invalidation_reason, failover, synced)
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, two_phase_at, inactive_since, conflicting, invalidation_reason, failover, synced)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,
-- 
2.30.0.windows.2

