From 780da196890758f6bb2c160b0cab7a30ac9f799c Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.fnst@cn.fujitsu.com>
Date: Tue, 17 Dec 2024 10:11:08 +0800
Subject: [PATCH] wait for commit time

---
 src/backend/replication/logical/worker.c | 39 ++----------------------
 1 file changed, 3 insertions(+), 36 deletions(-)

diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 6406be4f17..893db32935 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -445,7 +445,6 @@ static void request_publisher_status(RetainConflictInfoData *data);
 static void wait_for_publisher_status(RetainConflictInfoData *data,
 									  bool status_received);
 static void wait_for_local_flush(RetainConflictInfoData *data);
-static inline bool can_advance_nonremovable_xid(RetainConflictInfoData *data);
 
 static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
 static void apply_handle_insert_internal(ApplyExecutionData *edata,
@@ -3771,8 +3770,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 						 * to avoid accumulating dead rows when the worker is
 						 * busy.
 						 */
-						if (can_advance_nonremovable_xid(&data))
-							maybe_advance_nonremovable_xid(&data, false);
+						maybe_advance_nonremovable_xid(&data, false);
 					}
 					else if (c == 'k')
 					{
@@ -4196,7 +4194,7 @@ get_candidate_xid(RetainConflictInfoData *data)
 	adjust_xid_advancement_interval(data, true);
 
 	data->candidate_xid = full_xid;
-	data->phase = RCI_REQUEST_PUBLISHER_STATUS;
+	data->phase = RCI_WAIT_FOR_LOCAL_FLUSH;
 
 	/* process the next phase */
 	maybe_advance_nonremovable_xid(data, false);
@@ -4327,22 +4325,6 @@ wait_for_local_flush(RetainConflictInfoData *data)
 	Assert(!XLogRecPtrIsInvalid(data->remote_lsn) &&
 		   FullTransactionIdIsValid(data->candidate_xid));
 
-	/*
-	 * We expect the publisher and subscriber clocks to be in sync using time
-	 * sync service like NTP. Otherwise, we will advance this worker's
-	 * oldest_nonremovable_xid prematurely, leading to the removal of rows
-	 * required to detect update_delete conflict.
-	 *
-	 * XXX Consider waiting for the publisher's clock to catch up with the
-	 * subscriber's before proceeding to the next phase.
-	 */
-	if (TimestampDifferenceExceeds(data->reply_time,
-								   data->candidate_xid_time, 0))
-		ereport(ERROR,
-				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-				errmsg("oldest_nonremovable_xid transaction ID may be advanced prematurely"),
-				errdetail("The clock on the publisher is behind that of the subscriber."));
-
 	/*
 	 * Do not attempt to advance the non-removable transaction ID when table
 	 * sync is in progress. During this time, changes from a single
@@ -4362,7 +4344,7 @@ wait_for_local_flush(RetainConflictInfoData *data)
 	 * oldest_nonremovable_xid may be delayed. We can always update
 	 * last_flushpos here if we notice such a delay.
 	 */
-	if (last_flushpos < data->remote_lsn)
+	if (replorigin_session_origin_timestamp < data->candidate_xid_time)
 		return;
 
 	/*
@@ -4391,21 +4373,6 @@ wait_for_local_flush(RetainConflictInfoData *data)
 	maybe_advance_nonremovable_xid(data, false);
 }
 
-/*
- * Determine if we can attempt to advance transaction ID.
- *
- * TODO: The remote flush location (last_flushpos) is currently not updated
- * during change application, making it impossible to satisfy the condition of
- * the final phase (RCI_WAIT_FOR_LOCAL_FLUSH) for advancing the transaction ID.
- * Consider updating the remote flush position in the final phase to enable
- * advancement during change application.
- */
-static inline bool
-can_advance_nonremovable_xid(RetainConflictInfoData *data)
-{
-	return data->phase == RCI_GET_CANDIDATE_XID;
-}
-
 /*
  * Exit routine for apply workers due to subscription parameter changes.
  */
-- 
2.30.0.windows.2

