On Fri, Sep 12, 2025 at 3:39 PM Zhijie Hou (Fujitsu)
<[email protected]> wrote:
>
> Here is the V74 patch which addressed all comments.
>

+ ereport(LOG,
+ errmsg("logical replication worker for subscription \"%s\" will
resume retaining the information for detecting conflicts",
+    MySubscription->name),
+ MySubscription->maxretention
+ ? errdetail("Retention of information used for conflict detection is
now within the max_retention_duration of %u ms.",
+ MySubscription->maxretention)
+ : errdetail("Retention of information used for conflict detection is
now indefinite."));

The detail message doesn't seems to convey the correct meaning as the
duration is compared with something vague. How about changing
errdetail messages as follows:
"Retention is re-enabled as the apply process is advancing its xmin
within the configured max_retention_duration of %u ms."
"Retention is re-enabled as max_retention_duration is set to unlimited."

If you agree with the above then we can consider changing the existing
errdetail related to stop_retention functionality as follows:
"Retention is stopped as the apply process is not advancing its xmin
within the configured max_retention_duration of %u ms."

Apart from these, I have made some cosmetic changes in the attached.

-- 
With Regards,
Amit Kapila.
diff --git a/src/backend/replication/logical/launcher.c 
b/src/backend/replication/logical/launcher.c
index 1d42ca91ea3..ed6ba5b2d7e 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -1267,10 +1267,10 @@ ApplyLauncherMain(Datum main_arg)
                                /*
                                 * Compute the minimum xmin required to protect 
dead tuples
                                 * required for conflict detection among all 
running apply
-                                * workers. This computation is performed under
-                                * LogicalRepWorkerLock to avoid accessing 
invalid worker
-                                * information in scenarios where a worker may 
exit and reset
-                                * data concurrently.
+                                * workers. This computation is performed while 
holding
+                                * LogicalRepWorkerLock to prevent accessing 
invalid worker
+                                * data, in scenarios where a worker might exit 
and reset
+                                * its state concurrently.
                                 */
                                if (sub->retaindeadtuples &&
                                        sub->retentionactive &&
diff --git a/src/backend/replication/logical/worker.c 
b/src/backend/replication/logical/worker.c
index 6d9ff6828a4..722a3678e7f 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -4723,12 +4723,12 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data)
 
        /*
         * Reaching this point implies should_stop_conflict_info_retention()
-        * returned false earlier, indicating that the most recent duration for
+        * returned false earlier, meaning that the most recent duration for
         * advancing the non-removable transaction ID is within the
         * max_retention_duration or max_retention_duration is set to 0.
         *
         * Therefore, if conflict info retention was previously stopped due to a
-        * timeout, proceed to resume retention now.
+        * timeout, it is now safe to resume retention.
         */
        if (!MySubscription->retentionactive)
        {
@@ -4803,8 +4803,9 @@ static void
 stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
 {
        /*
-        * Return if unable to update subretentionactive (see
-        * update_retention_status).
+        * If the retention status cannot be updated (e.g., due to active
+        * transaction), skip further processing to avoid inconsistent retention
+        * behavior.
         */
        if (!update_retention_status(false))
                return;
@@ -4828,10 +4829,7 @@ stop_conflict_info_retention(RetainDeadTuplesData 
*rdt_data)
 static void
 resume_conflict_info_retention(RetainDeadTuplesData *rdt_data)
 {
-       /*
-        * Return if unable to update subretentionactive (see
-        * update_retention_status).
-        */
+       /* We can't resume retention without updating retention status. */
        if (!update_retention_status(true))
                return;
 
@@ -4844,27 +4842,30 @@ resume_conflict_info_retention(RetainDeadTuplesData 
*rdt_data)
                        : errdetail("Retention of information used for conflict 
detection is now indefinite."));
 
        /*
-        * Restart the worker to allow the launcher to initialize
-        * oldest_nonremovable_xid value at startup.
+        * Restart the worker to let the launcher initialize
+        * oldest_nonremovable_xid at startup.
+        *
+        * While it's technically possible to derive this value on-the-fly using
+        * the conflict detection slot's xmin, doing so risks a race condition: 
the
+        * launcher might clean slot.xmin just after retention resumes. This 
would
+        * make oldest_nonremovable_xid unreliable, especially during xid
+        * wraparound.
         *
-        * An alternative approach is using the conflict detection slot.xmin to
-        * initialize the oldest_nonremovable_xid on-the-fly, without restarting
-        * the worker. However, this could create a race condition where the
-        * launcher invalidates slot.xmin immediately after the worker resumes
-        * retention, making oldest_nonremovable_xid unreliable if xid 
wraparound
-        * occurs. While implementing a heavy lock to prevent concurrent slot
-        * updates by the launcher is feasible, given that resuming is an
-        * infrequent operation, it may not be worthwhile to handle it.
+        * Although this can be prevented by introducing heavy weight locking, 
the
+        * complexity it will bring doesn't seem worthwhile given how rarely
+        * retention is resumed.
         */
        apply_worker_exit();
 }
 
 /*
- * Update pg_subscription.subretentionactive to the given value within a new
- * transaction.
+ * Updates pg_subscription.subretentionactive to the given value within a
+ * new transaction.
+ *
+ * If already inside an active transaction, skips the update and returns
+ * false.
  *
- * Returns true upon successful update; however, if currently within an active
- * transaction, skip the update and return false.
+ * Returns true if the update is successfully performed.
  */
 static bool
 update_retention_status(bool active)
@@ -4959,10 +4960,10 @@ adjust_xid_advance_interval(RetainDeadTuplesData 
*rdt_data, bool new_xid_found)
                         !MySubscription->retentionactive)
        {
                /*
-                * Retention has been stopped, so double the interval, but not 
beyond
-                * 3 minutes. The wal_receiver_status_interval is not 
considered as a
-                * maximum, since the chance of retention resuming is less than 
that
-                * of activity resuming.
+                * Retention has been stopped, so double the interval-capped at 
a
+                * maximum of 3 minutes. The wal_receiver_status_interval is
+                * intentionally not used as a upper bound, since the 
likelihood of
+                * retention resuming is lower than that of general activity 
resuming.
                 */
                rdt_data->xid_advance_interval = 
Min(rdt_data->xid_advance_interval * 2,
                                                                                
         MAX_XID_ADVANCE_INTERVAL);
@@ -4977,8 +4978,8 @@ adjust_xid_advance_interval(RetainDeadTuplesData 
*rdt_data, bool new_xid_found)
        }
 
        /*
-        * Ensure the wait time remains within the maximum limit when retention 
is
-        * active.
+        * Ensure the wait time remains within the maximum retention time limit
+        * when retention is active.
         */
        if (MySubscription->retentionactive)
                rdt_data->xid_advance_interval = 
Min(rdt_data->xid_advance_interval,

Reply via email to