On Mon, Aug 25, 2025 at 5:05 PM Amit Kapila <[email protected]> wrote:
>
> A few comments on 0001:
>
Some more comments:
1.
+ /*
+ * Return false if the leader apply worker has stopped retaining
+ * information for detecting conflicts. This implies that update_deleted
+ * can no longer be reliably detected.
+ */
+ if (!retention_active)
+ return false;
+
/*
* For conflict detection, we use the conflict slot's xmin value instead
* of invoking GetOldestNonRemovableTransactionId(). The slot.xmin acts as
@@ -3254,7 +3315,15 @@ FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid,
oldestxmin = slot->data.xmin;
SpinLockRelease(&slot->mutex);
- Assert(TransactionIdIsValid(oldestxmin));
+ /*
+ * Return false if the conflict detection slot.xmin is set to
+ * InvalidTransactionId. This situation arises if the current worker is
+ * either a table synchronization or parallel apply worker, and the leader
+ * stopped retention immediately after checking the
+ * oldest_nonremovable_xid above.
+ */
+ if (!TransactionIdIsValid(oldestxmin))
+ return false;
If the current worker is a tablesync or parallel apply worker, it
should already have returned at the retention_active check above, since
we use the leader's oldest_nonremovable_xid to decide that. What am I
missing? This made me wonder whether we need to use the slot's xmin at
all to find the deleted tuple once we have fetched the leader's
oldest_nonremovable_xid.
2.
- * The interval is reset to a minimum value of 100ms once there is some
- * activity on the node.
+ * The interval is reset to the lesser of 100ms and
+ * max_conflict_retention_duration once there is some activities on the node.
AFAICS, the code does not adhere to this, because it applies the limit
when there is no activity, i.e. when new_xid_found is false. Is the
comment wrong, or does the code need to be updated?
3.
+
+ /* Ensure the wait time remains within the maximum limit */
+ rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval,
+ MySubscription->maxconflretention);
Can't we combine this with the calculation of max_interval a few lines
above this change? And also adjust the comments atop
adjust_xid_advance_interval() accordingly?
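To illustrate the idea in comment 3, here is a rough stand-alone sketch, not the patch's code. The function name, the helper min_int(), the parameter names, the two constants' values, and the treatment of 0 as "no retention limit" are all assumptions made for the example. It folds the retention limit into max_interval once, so no separate clamp is needed afterwards.

```c
#include <stdbool.h>

/* Assumed values, in milliseconds, for illustration only. */
#define MIN_XID_ADVANCE_INTERVAL 100
#define MAX_XID_ADVANCE_INTERVAL 180000

/* Hypothetical helper standing in for the Min() macro. */
static int
min_int(int a, int b)
{
	return a < b ? a : b;
}

/*
 * Hypothetical stand-alone version of the interval adjustment.
 *
 * cur_ms            current xid_advance_interval (0 = not initialized)
 * status_interval_s wal_receiver_status_interval in seconds (0 = disabled)
 * max_retention_ms  max_conflict_retention_duration (0 = no limit; assumed)
 */
static int
next_xid_advance_interval(int cur_ms, bool new_xid_found,
						  int status_interval_s, int max_retention_ms)
{
	int			max_interval = status_interval_s
		? status_interval_s * 1000
		: MAX_XID_ADVANCE_INTERVAL;

	/* Fold the retention limit into the cap in one place. */
	if (max_retention_ms > 0)
		max_interval = min_int(max_interval, max_retention_ms);

	/* No activity: back off by doubling, but never beyond the cap. */
	if (!new_xid_found && cur_ms > 0)
		return min_int(cur_ms * 2, max_interval);

	/* Activity, or first call: restart from the minimum, still capped. */
	return min_int(MIN_XID_ADVANCE_INTERVAL, max_interval);
}
```

With this shape, both branches respect the limit, which would also make it easier to word the comment atop the function consistently with comment 2.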
4.
if (am_leader_apply_worker() &&
- MySubscription->retaindeadtuples &&
+ MySubscription->retaindeadtuples && MySubscription->retentionactive &&
!TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid))
I think this code would look neater with one condition per line.
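For reference, the layout meant in comment 4 looks roughly like the sketch below. The struct types, the is_leader parameter, and the function name are hypothetical stand-ins so that the snippet compiles on its own; only the field names and the shape of the condition come from the quoted hunk.

```c
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;
#define InvalidTransactionId ((TransactionId) 0)
#define TransactionIdIsValid(xid) ((xid) != InvalidTransactionId)

/* Hypothetical stand-ins for MySubscription and MyLogicalRepWorker. */
typedef struct
{
	bool		retaindeadtuples;
	bool		retentionactive;
} SubscriptionStub;

typedef struct
{
	TransactionId oldest_nonremovable_xid;
} WorkerStub;

/* Same condition as in the hunk, written one test per line. */
static bool
retention_check_applies(bool is_leader,
						const SubscriptionStub *sub,
						const WorkerStub *worker)
{
	return is_leader &&
		sub->retaindeadtuples &&
		sub->retentionactive &&
		!TransactionIdIsValid(worker->oldest_nonremovable_xid);
}
```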
Apart from the above comments, I have tried to improve some code
comments in the attached patch.
--
With Regards,
Amit Kapila.
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 29d0c9a6e45..3df1828e755 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -4546,8 +4546,8 @@ wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
return;
/*
- * Stop retaining conflict information if required (See
- * should_stop_conflict_info_retention() for details).
+ * We don't need to maintain oldest_nonremovable_xid if we decide
+ * to stop retaining conflict information for this worker.
*/
if (should_stop_conflict_info_retention(rdt_data))
return;
@@ -4650,8 +4650,8 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data)
}
/*
- * Stop retaining conflict information if required (See
- * should_stop_conflict_info_retention() for details).
+ * We don't need to maintain oldest_nonremovable_xid if we decide
+ * to stop retaining conflict information for this worker.
*/
if (should_stop_conflict_info_retention(rdt_data))
return;
@@ -4771,16 +4771,16 @@ reset_retention_data_fields(RetainDeadTuplesData *rdt_data)
}
/*
- * Check whether conflict information retention should be stopped because the
- * wait time has exceeded the maximum limit (max_conflict_retention_duration).
+ * Check whether conflict information retention should be stopped due to
+ * exceeding the maximum wait time (max_conflict_retention_duration).
*
- * If retention should be stopped, proceed to the
+ * If retention should be stopped, transition to the
* RDT_STOP_CONFLICT_INFO_RETENTION phase and return true. Otherwise, return
* false.
*
- * Currently, the retention will not resume automatically unless user manually
- * disables retain_dead_tuples and re-enables it after confirming that the
- * replication slot has been dropped.
+ * Note: Retention won't be resumed automatically. The user must manually
+ * disable retain_dead_tuples and re-enable it after confirming that the
+ * replication slot maintained by the launcher has been dropped.
*/
static bool
should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
@@ -4802,9 +4802,10 @@ should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
now = rdt_data->last_recv_time ? rdt_data->last_recv_time :
GetCurrentTimestamp();
/*
- * Return if the wait time has not exceeded the maximum limit
- * (max_conflict_retention_duration). The time spent waiting for table
- * synchronization is not counted, as it's an infrequent operation.
+ * Return early if the wait time has not exceeded the configured maximum
+ * (max_conflict_retention_duration). Time spent waiting for table
+ * synchronization is excluded from this calculation, as it occurs
+ * infrequently.
*/
if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now,
MySubscription->maxconflretention +
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index b86c759394f..62ea1a00580 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -95,8 +95,8 @@ typedef struct LogicalRepWorker
* named "pg_conflict_detection". It asynchronously collects this ID to
* decide when to advance the xmin value of the slot.
*
- * This ID would be set to InvalidTransactionId if the apply worker has
- * stopped retaining information useful for conflict detection.
+ * This ID is set to InvalidTransactionId when the apply worker stops
+ * retaining information needed for conflict detection.
*/
TransactionId oldest_nonremovable_xid;