On Fri, Sep 12, 2025 at 3:39 PM Zhijie Hou (Fujitsu)
<[email protected]> wrote:
>
> Here is the V74 patch which addressed all comments.
>
+ ereport(LOG,
+ errmsg("logical replication worker for subscription \"%s\" will resume retaining the information for detecting conflicts",
+ MySubscription->name),
+ MySubscription->maxretention
+ ? errdetail("Retention of information used for conflict detection is now within the max_retention_duration of %u ms.",
+ MySubscription->maxretention)
+ : errdetail("Retention of information used for conflict detection is now indefinite."));
The detail messages don't seem to convey the correct meaning, as the
duration is being compared against something vague. How about changing
the errdetail messages as follows:
"Retention is re-enabled as the apply process is advancing its xmin
within the configured max_retention_duration of %u ms."
"Retention is re-enabled as max_retention_duration is set to unlimited."
If you agree with the above, then we can also consider changing the
existing errdetail related to the stop_retention functionality as follows:
"Retention is stopped as the apply process is not advancing its xmin
within the configured max_retention_duration of %u ms."
Apart from these, I have made some cosmetic changes in the attached.
--
With Regards,
Amit Kapila.
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 1d42ca91ea3..ed6ba5b2d7e 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -1267,10 +1267,10 @@ ApplyLauncherMain(Datum main_arg)
/*
* Compute the minimum xmin required to protect dead tuples
* required for conflict detection among all running apply
- * workers. This computation is performed under
- * LogicalRepWorkerLock to avoid accessing invalid worker
- * information in scenarios where a worker may exit and reset
- * data concurrently.
+ * workers. This computation is performed while holding
+ * LogicalRepWorkerLock to prevent accessing invalid worker
+ * data, in scenarios where a worker might exit and reset
+ * its state concurrently.
*/
if (sub->retaindeadtuples &&
sub->retentionactive &&
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 6d9ff6828a4..722a3678e7f 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -4723,12 +4723,12 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data)
/*
* Reaching this point implies should_stop_conflict_info_retention()
- * returned false earlier, indicating that the most recent duration for
+ * returned false earlier, meaning that the most recent duration for
* advancing the non-removable transaction ID is within the
* max_retention_duration or max_retention_duration is set to 0.
*
* Therefore, if conflict info retention was previously stopped due to a
- * timeout, proceed to resume retention now.
+ * timeout, it is now safe to resume retention.
*/
if (!MySubscription->retentionactive)
{
@@ -4803,8 +4803,9 @@ static void
stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
{
/*
- * Return if unable to update subretentionactive (see
- * update_retention_status).
+ * If the retention status cannot be updated (e.g., due to an active
+ * transaction), skip further processing to avoid inconsistent retention
+ * behavior.
*/
if (!update_retention_status(false))
return;
@@ -4828,10 +4829,7 @@ stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
static void
resume_conflict_info_retention(RetainDeadTuplesData *rdt_data)
{
- /*
- * Return if unable to update subretentionactive (see
- * update_retention_status).
- */
+ /* We can't resume retention without updating retention status. */
if (!update_retention_status(true))
return;
@@ -4844,27 +4842,30 @@ resume_conflict_info_retention(RetainDeadTuplesData *rdt_data)
: errdetail("Retention of information used for conflict detection is now indefinite."));
/*
- * Restart the worker to allow the launcher to initialize
- * oldest_nonremovable_xid value at startup.
+ * Restart the worker to let the launcher initialize
+ * oldest_nonremovable_xid at startup.
+ *
+ * While it's technically possible to derive this value on-the-fly using
+ * the conflict detection slot's xmin, doing so risks a race condition: the
+ * launcher might clean slot.xmin just after retention resumes. This would
+ * make oldest_nonremovable_xid unreliable, especially during xid
+ * wraparound.
*
- * An alternative approach is using the conflict detection slot.xmin to
- * initialize the oldest_nonremovable_xid on-the-fly, without restarting
- * the worker. However, this could create a race condition where the
- * launcher invalidates slot.xmin immediately after the worker resumes
- * retention, making oldest_nonremovable_xid unreliable if xid wraparound
- * occurs. While implementing a heavy lock to prevent concurrent slot
- * updates by the launcher is feasible, given that resuming is an
- * infrequent operation, it may not be worthwhile to handle it.
+ * Although this can be prevented by introducing heavyweight locking, the
+ * complexity it will bring doesn't seem worthwhile given how rarely
+ * retention is resumed.
*/
apply_worker_exit();
}
/*
- * Update pg_subscription.subretentionactive to the given value within a new
- * transaction.
+ * Updates pg_subscription.subretentionactive to the given value within a
+ * new transaction.
+ *
+ * If already inside an active transaction, skips the update and returns
+ * false.
*
- * Returns true upon successful update; however, if currently within an active
- * transaction, skip the update and return false.
+ * Returns true if the update is successfully performed.
*/
static bool
update_retention_status(bool active)
@@ -4959,10 +4960,10 @@ adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found)
!MySubscription->retentionactive)
{
/*
- * Retention has been stopped, so double the interval, but not beyond
- * 3 minutes. The wal_receiver_status_interval is not considered as a
- * maximum, since the chance of retention resuming is less than that
- * of activity resuming.
+ * Retention has been stopped, so double the interval, capped at a
+ * maximum of 3 minutes. The wal_receiver_status_interval is
+ * intentionally not used as an upper bound, since the likelihood of
+ * retention resuming is lower than that of general activity resuming.
*/
rdt_data->xid_advance_interval =
Min(rdt_data->xid_advance_interval * 2,
MAX_XID_ADVANCE_INTERVAL);
@@ -4977,8 +4978,8 @@ adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found)
}
/*
- * Ensure the wait time remains within the maximum limit when retention is
- * active.
+ * Ensure the wait time remains within the maximum retention time limit
+ * when retention is active.
*/
if (MySubscription->retentionactive)
rdt_data->xid_advance_interval =
Min(rdt_data->xid_advance_interval,