> On Oct 24, 2025, at 23:22, vignesh C <[email protected]> wrote:
>
> Regards,
> Vignesh
> <v20251024-0001-Rename-sync_error_count-to-tbl_sync_error_.patch><v20251024-0002-Add-worker-type-argument-to-logicalrep_wor.patch><v20251024-0003-New-worker-for-sequence-synchronization-du.patch><v20251024-0004-Documentation-for-sequence-synchronization.patch>
The changes in 0001 are straightforward and look good. I haven’t reviewed 0004
yet. I have a few comments on 0002 and 0003.
1 - 0002
```
* We are only interested in the leader apply worker or table sync worker.
+ * For apply workers, the relid should be set to InvalidOid, as they manage
+ * changes across all tables and sequences. For table sync workers, the relid
+ * should be set to the OID of the relation being synchronized.
*/
LogicalRepWorker *
-logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
+logicalrep_worker_find(Oid subid, Oid relid, LogicalRepWorkerType wtype,
+ bool only_running)
{
int i;
LogicalRepWorker *res = NULL;
Assert(LWLockHeldByMe(LogicalRepWorkerLock));
```
The comment says that “for apply workers, the relid should be set to
InvalidOid”, so is it worth adding an assert for that?
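A minimal sketch of the check I have in mind, written as a standalone predicate so it compiles outside the tree. The typedefs, the reduced enum and the helper name `relid_ok_for_wtype` are stand-ins I made up for illustration; in the patch this would simply be an `Assert()` at the top of logicalrep_worker_find().

```c
#include <stdbool.h>

/* Stand-ins for the PostgreSQL definitions, so the sketch compiles alone. */
typedef unsigned int Oid;
#define InvalidOid ((Oid) 0)
#define OidIsValid(oid) ((oid) != InvalidOid)

/* Reduced stand-in enum; the real one has more members. */
typedef enum LogicalRepWorkerType
{
	WORKERTYPE_TABLESYNC,
	WORKERTYPE_APPLY
} LogicalRepWorkerType;

/*
 * Hypothetical helper: returns true unless an apply worker is looked up
 * with a valid relid, which the function comment says must not happen.
 */
bool
relid_ok_for_wtype(Oid relid, LogicalRepWorkerType wtype)
{
	return wtype != WORKERTYPE_APPLY || !OidIsValid(relid);
}
```

In logicalrep_worker_find() the same condition could be written inline, e.g. `Assert(wtype != WORKERTYPE_APPLY || !OidIsValid(relid));`.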
2 - 0002
```
- /* Search for attached worker for a given subscription id. */
+ /* Search for the attached worker matching the specified criteria. */
for (i = 0; i < max_logical_replication_workers; i++)
{
```
Minor issues with the comment:
* We are not searching for a specific worker, so “the” should be “a”.
* “attached” is confusing. In the old comment, “attached” was tied to “a given
subscription id”, but now, attached to what?
So my suggested revision is:
“/* Search for a logical replication worker matching the specified criteria */”
3 - 0002
```
/*
* Stop the logical replication worker for subid/relid, if any.
+ *
+ * Similar to logicalrep_worker_find, relid should be set to a valid OID only
+ * for table sync workers.
*/
void
-logicalrep_worker_stop(Oid subid, Oid relid)
+logicalrep_worker_stop(Oid subid, Oid relid, LogicalRepWorkerType wtype)
```
The comment should be updated: subid/relid => subid/relid/wtype.
4 - 0002
```
@@ -477,7 +477,8 @@ ProcessSyncingTablesForApply(XLogRecPtr current_lsn)
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
syncworker =
logicalrep_worker_find(MyLogicalRepWorker->subid,
- rstate->relid, false);
+ rstate->relid,
+ WORKERTYPE_TABLESYNC, true);
```
Why was only_running changed from false to true? This commit adds a new worker
type, but it is not meant to change the existing logic.
5 - 0003
```
+/*
+ * Reset the last_seqsync_start_time of the sequencesync worker in the
+ * subscription's apply worker.
+ */
+void
+logicalrep_reset_seqsync_start_time(void)
+{
+ LogicalRepWorker *worker;
+
+ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
+ /*
+ * Set the last_seqsync_start_time for the sequence worker in the apply
+ * worker instead of the sequence sync worker, as the sequence sync worker
+ * has finished and is about to exit.
+ */
+ worker = logicalrep_worker_find(MyLogicalRepWorker->subid, InvalidOid,
+ WORKERTYPE_APPLY, true);
+ if (worker)
+ worker->last_seqsync_start_time = 0;
+
+ LWLockRelease(LogicalRepWorkerLock);
+}
```
Two comments on this new function:
* The function comment and the in-code comment are redundant. I suggest moving
the in-code comment into the function comment.
* Why is LW_SHARED used? We are writing worker->last_seqsync_start_time, so
shouldn’t LW_EXCLUSIVE be used?
6 - 0003
```
+ /*
+ * Count running sync workers for this subscription, while we have the
+ * lock.
+ */
+ nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
+ LWLockRelease(LogicalRepWorkerLock);
+
+ launch_sync_worker(nsyncworkers, InvalidOid,
+ &MyLogicalRepWorker->last_seqsync_start_time);
```
I think there could be a race condition here. The lock is acquired in
LW_SHARED mode, so multiple callers may get the same nsyncworkers. The sync
worker is then launched based on that value, which may be stale, because
another worker might be started between LWLockRelease() and
launch_sync_worker().
If that cannot happen because only one caller ever calls
ProcessSyncingSequencesForApply(), then why is the lock needed?
7 - 0003
```
+ if (insuffperm_seqs->len)
+ {
+ appendStringInfo(combined_error_detail, "Insufficient permission for sequence(s): (%s)",
+ insuffperm_seqs->data);
+ appendStringInfoString(combined_error_hint, "Grant permissions for the sequence(s).");
+ }
```
“Grant permissions” is unclear. Should it be “Grant UPDATE privilege”?
8 - 0003
```
+ appendStringInfoString(combined_error_hint, " For mismatched sequences, alter or re-create local sequences to have matching parameters as publishers.");
```
“To have matching parameters as publishers” does not read well grammatically.
Maybe revise it to “to match the publisher’s parameters”.
9 - 0003
```
+ /*
+ * current_indexes is not incremented sequentially because some
+ * sequences may be missing, and the number of fetched rows may not
+ * match the batch size. The `hash_search` with HASH_REMOVE takes care
+ * of the count.
+ */
```
Typo: current_indexes => current_index
10 - 0003
```
- /* Find the leader apply worker and signal it. */
- logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
+ /*
+ * This is a clean exit of the sequencesync worker; reset the
+ * last_seqsync_start_time.
+ */
+ if (wtype == WORKERTYPE_SEQUENCESYNC)
+ logicalrep_reset_seqsync_start_time();
+ else
+ /* Find the leader apply worker and signal it. */
+ logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
```
The comment “This is a clean exit of the sequencesync worker” is specific to the
“if” branch, so I suggest moving it into the “if”. Also, “This is a clean exit of
the sequencesync worker” is not needed; dropping it keeps the comment consistent
with the one in “else”.
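Roughly, the layout I am suggesting looks like the sketch below. It is a standalone mock-up, not the patch code: the enum, the two stub functions and the call counters are made up so the fragment compiles on its own, and they stand in for logicalrep_reset_seqsync_start_time() and logicalrep_worker_wakeup().

```c
/* Reduced stand-in for LogicalRepWorkerType. */
typedef enum WorkerTypeSketch
{
	SKETCH_TABLESYNC,
	SKETCH_SEQUENCESYNC
} WorkerTypeSketch;

/* Counters that only exist so the stubs have an observable effect. */
int reset_calls = 0;
int wakeup_calls = 0;

/* Stub standing in for logicalrep_reset_seqsync_start_time(). */
static void reset_seqsync_start_time_stub(void) { reset_calls++; }

/* Stub standing in for logicalrep_worker_wakeup(subid, InvalidOid). */
static void wakeup_leader_stub(void) { wakeup_calls++; }

void
finish_sync_worker_sketch(WorkerTypeSketch wtype)
{
	if (wtype == SKETCH_SEQUENCESYNC)
	{
		/* Reset the last_seqsync_start_time. */
		reset_seqsync_start_time_stub();
	}
	else
	{
		/* Find the leader apply worker and signal it. */
		wakeup_leader_stub();
	}
}
```

Each branch then carries exactly one short comment describing what that branch does.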
11 - 0003
```
+void
+launch_sync_worker(int nsyncworkers, Oid relid, TimestampTz *last_start_time)
+{
+ /* If there is a free sync worker slot, start a new sync worker */
+ if (nsyncworkers < max_sync_workers_per_subscription)
+ {
```
The entire function body is under an “if”, so we can do “if (!…) return” instead,
which saves a level of indentation.
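As a rough illustration of the early-return shape, here is a simplified standalone sketch. The function name, the boolean return value and the hard-coded limit are assumptions made only so the example compiles by itself; the real launch_sync_worker() returns void and takes relid and last_start_time as well.

```c
#include <stdbool.h>

/* Stand-in for the GUC of the same name; the value here is arbitrary. */
int max_sync_workers_per_subscription = 2;

/*
 * Simplified stand-in for launch_sync_worker(): returns true when a worker
 * would be started, false when no slot is free.
 */
bool
launch_sync_worker_sketch(int nsyncworkers)
{
	/* No free sync worker slot: nothing to do. */
	if (nsyncworkers >= max_sync_workers_per_subscription)
		return false;

	/* The code that starts the worker would go here, one level shallower. */
	return true;
}
```

The behaviour is unchanged; only the nesting of the main body goes away.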
Best regards,
—
Chao Li (Evan)
HighGo Software Co., Ltd.
https://www.highgo.com/