On Fri, Nov 29, 2024 at 7:54 AM Zhijie Hou (Fujitsu) wrote:
>
> It is possible to reach here if user creates a subscription with
> (connect=false,detect_update_deleted=true), in which case we could not check
> it during creation. But I agree that it would be better to report an ERROR here,
> so changed as suggested. After this change, there is no need to check the
> invalid remote lsn in apply worker and thus the error can also be removed.
>
1.
if (XLogRecPtrIsInvalid(data.remote_lsn))
+ ereport(ERROR,
+ errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot get the latest WAL position from the publisher"),
+ errdetail("The connected publisher is also a standby server."));
+
Instead of removing this message from the patch, we should change it
to elog(ERROR, ...), i.e., the internal-error category of ERROR.
2.
+ Timestamp xid_advance_attempt_time; /* when the candidate_xid is
+ * decided */
How about naming this variable candidate_xid_time /* time when the
next candidate_xid is computed */?
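For illustration, a minimal sketch of how such a field could drive the throttling in get_candidate_xid(), assuming it records when the last candidate_xid was computed and that the interval is wal_receiver_status_interval (seconds) converted to milliseconds, as in the attached diff. RetainConflictInfoData is trimmed down here, timestamp_difference_exceeds() is a local stand-in with the same semantics as TimestampDifferenceExceeds(), and should_compute_candidate_xid() is a hypothetical helper, not code from the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Microsecond timestamps, like PostgreSQL's TimestampTz (an int64). */
typedef int64_t TimestampTz;

/* Hypothetical, trimmed-down stand-in for RetainConflictInfoData. */
typedef struct RetainConflictInfoData
{
    uint64_t    candidate_xid;      /* stand-in for FullTransactionId */
    TimestampTz candidate_xid_time; /* time when the next candidate_xid is computed */
} RetainConflictInfoData;

/* Same semantics as TimestampDifferenceExceeds(): at least msec ms elapsed? */
static bool
timestamp_difference_exceeds(TimestampTz start, TimestampTz stop, int msec)
{
    return (stop - start) >= (int64_t) msec * 1000;
}

/*
 * Hypothetical helper: returns true, and records "now", only if at least
 * status_interval_sec seconds have passed since the last computation.
 */
static bool
should_compute_candidate_xid(RetainConflictInfoData *data, TimestampTz now,
                             int status_interval_sec)
{
    if (!timestamp_difference_exceeds(data->candidate_xid_time, now,
                                      status_interval_sec * 1000))
        return false;

    data->candidate_xid_time = now;
    return true;
}
```

Starting from candidate_xid_time = 0 with a 10 s interval, calls at 10 s, 15 s and 20 s return true, false, true: the second call is suppressed because only 5 s have passed since the first.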
3.
+ /* Return if the new transaction ID is unchanged */
+ if (FullTransactionIdFollowsOrEquals(MyLogicalRepWorker->oldest_nonremovable_xid,
+ full_xid))
+ return;
This comment is unclear. Can we change it to: "Return if the
oldest_nonremovable_xid can't be advanced" or something like that?
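To show why "unchanged" undersells the check: FullTransactionIdFollowsOrEquals(a, b) is true when a >= b, so the early return also fires when the new candidate is older than the current oldest_nonremovable_xid, not only when it is equal. A minimal standalone sketch, assuming a FullTransactionId that wraps a 64-bit value as in PostgreSQL; full_xid_follows_or_equals() and try_advance_nonremovable_xid() are hypothetical names.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Mirrors PostgreSQL's FullTransactionId: a 64-bit, epoch-qualified xid. */
typedef struct FullTransactionId
{
    uint64_t value;
} FullTransactionId;

/* Same comparison as FullTransactionIdFollowsOrEquals(a, b). */
static bool
full_xid_follows_or_equals(FullTransactionId a, FullTransactionId b)
{
    return a.value >= b.value;
}

/*
 * Hypothetical helper: move *oldest_nonremovable_xid forward to candidate,
 * or return false when it can't be advanced (candidate is not newer).
 */
static bool
try_advance_nonremovable_xid(FullTransactionId *oldest_nonremovable_xid,
                             FullTransactionId candidate)
{
    /* Return if the oldest_nonremovable_xid can't be advanced */
    if (full_xid_follows_or_equals(*oldest_nonremovable_xid, candidate))
        return false;

    *oldest_nonremovable_xid = candidate;
    return true;
}
```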
4.
+request_publisher_status(RetainConflictInfoData *data)
+{
+ if (!reply_message)
+ {
+ MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
+
+ reply_message = makeStringInfo();
+ MemoryContextSwitchTo(oldctx);
+ }
+ else
+ resetStringInfo(reply_message);
+
+ pq_sendbyte(reply_message, 'S');
+ pq_sendint64(reply_message, GetCurrentTimestamp());
+
+ elog(DEBUG2, "sending publisher status request message");
The name 'reply_message' sounds confusing as this is a request
message. Can we change it to request_message? Also, let's avoid
reusing the same variable among different messages as it makes the
code unclear.
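A minimal sketch of that separation, assuming a dedicated buffer used only for the publisher status request. To keep it self-contained, a fixed-size array stands in for the StringInfo allocated in ApplyContext, and buf_send_byte()/buf_send_int64() stand in for pq_sendbyte()/pq_sendint64() (the latter writes the value in network byte order). Apart from request_message, all names here are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical fixed-size stand-in for a StringInfo. */
typedef struct RequestBuffer
{
    unsigned char data[64];
    size_t        len;
} RequestBuffer;

static void
buf_reset(RequestBuffer *buf)
{
    buf->len = 0;
}

static void
buf_send_byte(RequestBuffer *buf, unsigned char b)
{
    assert(buf->len + 1 <= sizeof(buf->data));
    buf->data[buf->len++] = b;
}

/* Appends a 64-bit integer in network (big-endian) byte order. */
static void
buf_send_int64(RequestBuffer *buf, int64_t v)
{
    uint64_t u = (uint64_t) v;

    assert(buf->len + 8 <= sizeof(buf->data));
    for (int shift = 56; shift >= 0; shift -= 8)
        buf->data[buf->len++] = (unsigned char) (u >> shift);
}

/* Buffer used only for publisher status requests, never for replies. */
static RequestBuffer request_message;

/* Builds the 'S' request: message type byte followed by the send time. */
static const RequestBuffer *
build_publisher_status_request(int64_t now)
{
    buf_reset(&request_message);
    buf_send_byte(&request_message, 'S');
    buf_send_int64(&request_message, now);
    return &request_message;
}
```

Keeping one buffer per message type costs a little memory but makes it obvious at each call site which message is being built.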
Apart from the above, I have modified a few comments in the attached
diff, which applies on top of 0001 and 0002.
--
With Regards,
Amit Kapila.
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 07e9916a1b..75fd3a5f7f 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -280,8 +280,8 @@ typedef enum
/*
* The phases involved in advancing the non-removable transaction ID.
*
- * Refer to maybe_advance_nonremovable_xid() for details on how the function
- * transitions between these phases.
+ * See maybe_advance_nonremovable_xid() for details of the transition
+ * between these phases.
*/
typedef enum
{
@@ -385,7 +385,7 @@ static BufFile *stream_fd = NULL;
*/
static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
-/* Buffers for constructing outgoing messages. */
+/* Buffer for constructing outgoing messages. */
static StringInfo reply_message = NULL;
typedef struct SubXactInfo
@@ -4082,8 +4082,12 @@ get_candidate_xid(RetainConflictInfoData *data)
now = GetCurrentTimestamp();
/*
- * Compute the candidate_xid and send a message at most once per
- * wal_receiver_status_interval.
+ * Compute the candidate_xid and request the publisher status at most once
+ * per wal_receiver_status_interval. This is to avoid using CPU and network
+ * resources without making much progress.
+ *
+ * XXX The use of wal_receiver_status_interval is a bit arbitrary so we can
+ * consider a different interval or a separate GUC if the need arises.
*/
if (!TimestampDifferenceExceeds(data->xid_advance_attempt_time, now,
wal_receiver_status_interval * 1000))