On Wed, Dec 25, 2024 at 8:13 AM Zhijie Hou (Fujitsu)
<[email protected]> wrote:
>
> Attach the new version patch set which addressed all other comments.
>
Review comments on 0001 and 0002
=============================
1.
/*
+ * Reset all data fields except those used to determine the timing for the
+ * next round of transaction ID advancement.
+ */
+ data->phase = RCI_GET_CANDIDATE_XID;
+ data->remote_lsn = InvalidXLogRecPtr;
There is no comment in the data-structure RetainConflictInfoData that
indicates the fields used to determine the timing for the next round
of transaction ID advancement. Can we add a comment in
RetainConflictInfoData to indicate the same?
2.
+ int xid_advancement_interval; /* how much time (ms) to wait
+ * before attempting to advance
+ * the non-removable transaction
+ * ID */
} RetainConflictInfoData;
Shall we rename it to something a bit simpler, say
xid_advance_interval? If you agree with this change, we can probably
rename adjust_xid_advancement_interval() to
adjust_xid_advance_interval() as well.
3.
+/*
+ * The minimum (100ms) and maximum (3 minutes) intervals for advancing
+ * non-removable transaction IDs.
+ */
+#define MIN_XID_ADVANCEMENT_INTERVAL 100
+#define MAX_XID_ADVANCEMENT_INTERVAL 180000
Is there any reason to keep the maximum value as 3 minutes? If not,
then mention in the comment that the value is arbitrary but sufficient
to avoid causing any undue network traffic.
4.
@@ -4129,7 +4149,7 @@ get_candidate_xid(RetainConflictInfoData *data)
* can consider the other interval or a separate GUC if the need arises.
*/
if (!TimestampDifferenceExceeds(data->candidate_xid_time, now,
- wal_receiver_status_interval * 1000))
+ data->xid_advancement_interval))
The comment atop the above change in the second patch needs to change.
5.
+static void
+adjust_xid_advancement_interval(RetainConflictInfoData *data, bool new_xid_found)
Let's move this function to after can_advance_nonremovable_xid(). This
keeps the functions that handle the retain_data_phases transitions
together.
Apart from the above, I have changed a few comments in the attached
patch. Please include those after review and combine 0001 and 0002
into one patch.
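
To make the behavior discussed in comments 2, 3, and 5 concrete, here is
a minimal standalone sketch of the interval adjustment as I understand
it from the patch comments: double the interval when no new transaction
ID was found, cap it at wal_receiver_status_interval (or 3 minutes if
that is zero), and reset it to 100ms once a new transaction ID is seen.
This is not the patch code. SketchRetainData, sketch_adjust_interval(),
and the status_interval_sec parameter are hypothetical names used only
for illustration, and the sketch assumes the caller initializes the
interval to the minimum value.

```c
#include <stdbool.h>

/* Values taken from the patch under review (milliseconds). */
#define MIN_XID_ADVANCEMENT_INTERVAL 100
#define MAX_XID_ADVANCEMENT_INTERVAL 180000

/* Hypothetical stand-in for the relevant field of RetainConflictInfoData. */
typedef struct SketchRetainData
{
	int			xid_advancement_interval;	/* ms */
} SketchRetainData;

/*
 * status_interval_sec stands in for the wal_receiver_status_interval GUC
 * (seconds, 0 means disabled). It is passed as a parameter here only so
 * that the sketch is self-contained.
 */
static void
sketch_adjust_interval(SketchRetainData *data, bool new_xid_found,
					   int status_interval_sec)
{
	int			max_interval;

	if (new_xid_found)
	{
		/* Activity on the node: go back to the minimum interval. */
		data->xid_advancement_interval = MIN_XID_ADVANCEMENT_INTERVAL;
		return;
	}

	max_interval = status_interval_sec
		? status_interval_sec * 1000
		: MAX_XID_ADVANCEMENT_INTERVAL;

	/* Double the interval, but never go beyond the cap. */
	if (data->xid_advancement_interval > max_interval / 2)
		data->xid_advancement_interval = max_interval;
	else
		data->xid_advancement_interval *= 2;
}
```

Starting from 100ms with no activity and wal_receiver_status_interval
set to 10s, repeated calls step through 200, 400, ..., 6400 and then
stay at 10000. With the GUC set to zero, the interval settles at 180000.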
--
With Regards,
Amit Kapila.
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index d5772a6b22..090aae126b 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -391,8 +391,9 @@ static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
static BufFile *stream_fd = NULL;
/*
- * The remote WAL position that has been applied and flushed locally. Refer to
- * send_feedback() for details on its usage.
+ * The remote WAL position that has been applied and flushed locally. We
+ * record this information while sending feedback to the server and use this
+ * both while sending feedback and advancing oldest_nonremovable_xid.
*/
static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
@@ -3849,7 +3850,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
wait_time = NAPTIME_PER_CYCLE;
/*
- * Ensure to wake up when it's possible to attempt advancing the
+ * Ensure to wake up when it's possible to attempt to advance the
* non-removable transaction ID.
*/
if (data.phase == RCI_GET_CANDIDATE_XID &&
data.xid_advancement_interval)
@@ -4066,7 +4067,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
*
* The overall state progression is: GET_CANDIDATE_XID ->
* REQUEST_PUBLISHER_STATUS -> WAIT_FOR_PUBLISHER_STATUS -> (loop to
- * REQUEST_PUBLISHER_STATUS if concurrent remote transactions persist) ->
+ * REQUEST_PUBLISHER_STATUS till concurrent remote transactions end) ->
* WAIT_FOR_LOCAL_FLUSH -> loop back to GET_CANDIDATE_XID.
*
* Retaining the dead tuples for this period is sufficient for ensuring
@@ -4184,11 +4185,14 @@ get_candidate_xid(RetainConflictInfoData *data)
/*
* Adjust the interval for advancing non-removable transaction IDs.
*
- * If no new transaction ID has been assigned since the last advancement, the
- * interval is doubled. This increase is limited by the
- * wal_receiver_status_interval if it is not zero, or otherwise restricted to a
- * maximum of 3 minutes. If a new transaction ID is detected, the interval is
- * reset to a minimum of 100ms.
+ * We double the interval to try advancing the non-removable transaction IDs
+ * if there is no activity on the node. The maximum value of the interval is
+ * capped by wal_receiver_status_interval if it is not zero, otherwise to
+ * 3 minutes, which should be sufficient to avoid using CPU or network
+ * resources without much benefit.
+ *
+ * The interval is reset to a minimum value of 100ms once there is some
+ * activity on the node.
*/
static void
adjust_xid_advancement_interval(RetainConflictInfoData *data, bool new_xid_found)
@@ -4200,8 +4204,8 @@ adjust_xid_advancement_interval(RetainConflictInfoData *data, bool new_xid_found
: MAX_XID_ADVANCEMENT_INTERVAL;
/*
- * No new transaction ID assigned since the last check, so double the
- * interval, but not beyond the maximum allowable value.
+ * No new transaction ID has been assigned since the last check, so
+ * double the interval, but not beyond the maximum allowable value.
*/
data->xid_advancement_interval =
Min(data->xid_advancement_interval * 2,
max_interval);
@@ -4331,11 +4335,8 @@ wait_for_local_flush(RetainConflictInfoData *data)
* effort.
*
* It is safe to add new tables with initial states to the subscription
- * after this check because WAL positions of changes from these new
- * tables, which will be applied, should be greater than remote_lsn and
- * are included in transactions with later commit timestamps. So, there is
- * no need to wait for these changes to be applied in this round of
- * advancement.
+ * after this check because any changes applied to these tables should have
+ * a WAL position greater than the data->remote_lsn.
*/
if (!AllTablesyncsReady())
return;
@@ -4354,8 +4355,7 @@ wait_for_local_flush(RetainConflictInfoData *data)
/*
* Reaching here means the remote WAL position has been received, and all
* transactions up to that position on the publisher have been applied and
- * flushed locally. So, now we can advance the non-removable transaction
- * ID.
+ * flushed locally. So, we can advance the non-removable transaction ID.
*/
SpinLockAcquire(&MyLogicalRepWorker->relmutex);
MyLogicalRepWorker->oldest_nonremovable_xid = data->candidate_xid;
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 1eab8a5e46..22cdd0a591 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -87,20 +87,18 @@ typedef struct LogicalRepWorker
bool parallel_apply;
/*
- * The changes made by this and later transactions are still non-removable
- * to allow for the detection of update_deleted conflicts when applying
+ * The changes made by this and later transactions shouldn't be removed.
+ * This allows the detection of update_deleted conflicts when applying
* changes in this logical replication worker.
*
* Note that this info cannot directly protect dead tuples from being
* prematurely frozen or removed. The logical replication launcher
* asynchronously collects this info to determine whether to advance the
- * xmin value of the replication slot.
+ * xmin value of its replication slot.
*
- * Therefore, FullTransactionId that includes both the transaction ID and
- * its epoch is used here instead of a single Transaction ID. This is
- * critical because without considering the epoch, the transaction ID
- * alone may appear as if it is in the future due to transaction ID
- * wraparound.
+ * We need to use FullTransactionId here because without considering the
+ * epoch, the transaction ID alone may appear as if it is in the future due
+ * to the transaction ID wraparound.
*/
FullTransactionId oldest_nonremovable_xid;