From 4933bd730d40a4e9ca5c4a427526d91d12e70fa5 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Thu, 8 Aug 2024 08:06:30 +0000
Subject: [PATCH v2] Not to store the flush position of the PREPARE record by
 using the last commit position

Previously, when the subscriber mistook the wrong lsn position
(the end position of the last commit) as the end position of the current prepare.
This does not lead to data loss but uses a special value (invalidXLogRecPtr).
---
 src/backend/replication/logical/worker.c | 26 ++++++++++++++++++++----
 1 file changed, 22 insertions(+), 4 deletions(-)

diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 6dc54c7283..58710e4c20 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1133,7 +1133,16 @@ apply_handle_prepare(StringInfo s)
 	CommitTransactionCommand();
 	pgstat_report_stat(false);
 
-	store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
+	/*
+	 * Here, we set InvalidXLogRecPtr as the last flushed WAL location because
+	 * we do not have a way to detect the actual value. Even when there are no
+	 * pending transactions while sending feedback so that all the received
+	 * changes are regarded as flushed, there is no risk that prepared
+	 * transactions are lost. Because prepared transactions have already been
+	 * flushed at the end of PREPARE applications. When a variable tracks the
+	 * last PREPARE record, we can use it as local_lsn.
+	 */
+	store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr);
 
 	in_remote_transaction = false;
 
@@ -1251,7 +1260,7 @@ apply_handle_rollback_prepared(StringInfo s)
 
 	pgstat_report_stat(false);
 
-	store_flush_position(rollback_data.rollback_end_lsn, XactLastCommitEnd);
+	store_flush_position(rollback_data.rollback_end_lsn, InvalidXLogRecPtr);
 	in_remote_transaction = false;
 
 	/* Process any tables that are being synchronized in parallel. */
@@ -1306,7 +1315,12 @@ apply_handle_stream_prepare(StringInfo s)
 
 			CommitTransactionCommand();
 
-			store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
+			/*
+			 * We set InvalidXLogRecPtr as the last flushed WAL location
+			 * because we do not have a way to detect the actual value. See
+			 * comments in apply_handle_prepare().
+			 */
+			store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr);
 
 			in_remote_transaction = false;
 
@@ -1364,7 +1378,11 @@ apply_handle_stream_prepare(StringInfo s)
 
 			CommitTransactionCommand();
 
-			MyParallelShared->last_commit_end = XactLastCommitEnd;
+			/*
+			 * InvalidXLogRecPtr as the last flushed WAL location. See comments
+			 * in apply_handle_prepare().
+			 */
+			MyParallelShared->last_commit_end = InvalidXLogRecPtr;
 
 			pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
 			pa_unlock_transaction(MyParallelShared->xid, AccessExclusiveLock);
-- 
2.43.0

