Here are some review comments for the patch v2-0001.

======
Commit message

1. General

Better to use consistent terms in this message. Either "relations" or
"tables" -- not a mixture of both.

~~~

2.
Before this commit, tablesync workers were capable of syncing only one
relation. For each table, a new sync worker was launched and the worker
would exit when the worker is done with the current table.

~

SUGGESTION (2nd sentence)
For each table, a new sync worker was launched and that worker would
exit when done processing the table.

~~~

3.
Now, tablesync workers are not only limited with one relation and can
move to another relation in the same subscription. This reduces the
overhead of launching a new background worker and exiting from that
worker for each relation.

~

SUGGESTION (1st sentence)
Now, tablesync workers are not limited to processing only one relation.
When done, they can move to processing another relation in the same
subscription.

~~~

4.
A new tablesync worker gets launched only if the number of tablesync
workers for the subscription does not exceed
max_sync_workers_per_subscription. If there is a table needs to be
synced, a tablesync worker picks that up and syncs it.The worker
continues to picking new tables to sync until there is no table left
for synchronization in the subscription.

~

This seems to be missing the point that only "available" workers go
looking for more tables to process. Maybe reword something like below:

SUGGESTION
If there is a table that needs to be synced, an "available" tablesync
worker picks up that table and syncs it. Each tablesync worker
continues to pick new tables to sync until there are no tables left
requiring synchronization. If there was no "available" worker to
process the table, then a new tablesync worker will be launched,
provided the number of tablesync workers for the subscription does not
exceed max_sync_workers_per_subscription.

======
src/backend/replication/logical/launcher.c

5.
logicalrep_worker_launch

@@ -460,8 +461,10 @@ retry:
     if (is_parallel_apply_worker)
         snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
-    else
+    else if (!OidIsValid(relid))
         snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
+    else
+        snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");

     if (OidIsValid(relid))
         snprintf(bgw.bgw_name, BGW_MAXLEN,

~

5a.
I felt at least these conditions can be rearranged, so you can use
OidIsValid(relid) instead of !OidIsValid(relid).

~

5b.
Probably it can all be simplified, if you are happy to do it in one
line:

snprintf(bgw.bgw_function_name, BGW_MAXLEN,
         OidIsValid(relid) ? "TablesyncWorkerMain" :
         is_parallel_apply_worker ? "ParallelApplyWorkerMain" :
         "ApplyWorkerMain");

======
src/backend/replication/logical/tablesync.c

6. finish_sync_worker

This function is removed/renamed but there are still comments in this
file referring to 'finish_sync_worker'.

~~~

7. clean_sync_worker

I agree with the comment from Shi-san. There should still be logging
somewhere that says this tablesync worker has completed the processing
of the current table.

~~~

8. sync_worker_exit

There is inconsistent function naming for clean_sync_worker versus
sync_worker_exit.

How about: clean_sync_worker/exit_sync_worker?
Or: sync_worker_clean/sync_worker_exit?

~~~

9. process_syncing_tables_for_sync

@@ -378,7 +387,13 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
      */
     replorigin_drop_by_name(originname, true, false);

-    finish_sync_worker();
+    /*
+     * Sync worker is cleaned at this point. It's ready to sync next table,
+     * if needed.
+     */
+    SpinLockAcquire(&MyLogicalRepWorker->relmutex);
+    MyLogicalRepWorker->ready_to_reuse = true;
+    SpinLockRelease(&MyLogicalRepWorker->relmutex);

9a.
I did not quite follow the logic. It says "Sync worker is cleaned at
this point", but who is doing that? -- more details are needed. But,
why not just call clean_sync_worker() right here like it used to call
finish_sync_worker?

~

9b.
Shouldn't this "ready_to_reuse" flag be assigned within the
clean_sync_worker() function, since that is the function making it
clean for next re-use? The function comment even says so: "Prepares the
synchronization worker for reuse or exit."

======
src/backend/replication/logical/worker.c

10. General -- run_tablesync_worker, TablesyncWorkerMain

IMO these functions would more appropriately reside in tablesync.c
instead of the (common) worker.c. Was there some reason why they cannot
be put there?

~~~

11. LogicalRepApplyLoop

+ /*
+  * apply_dispatch() may have gone into apply_handle_commit()
+  * which can go into process_syncing_tables_for_sync early.
+  * Before we were able to reuse tablesync workers, that
+  * process_syncing_tables_for_sync call would exit the worker
+  * instead of preparing for reuse. Now that tablesync workers
+  * can be reused and process_syncing_tables_for_sync is not
+  * responsible for exiting. We need to take care of memory
+  * contexts here before moving to sync the nex table or exit.
+  */

11a.
IMO it does not seem good to explain the reason by describing how the
logic USED to work, with code that is removed (e.g. "Before we
were..."). It's better to describe why this is needed here based on all
the CURRENT code logic.

~

11b.
/nex table/next table/

~

12.
+ if (MyLogicalRepWorker->ready_to_reuse)
+ {
+     endofstream = true;
+ }

Unnecessary braces.

~

13.
+ /*
+  * If it's still not ready to reuse, this is probably an apply worker.
+  * End streaming before exiting.
+  */
+ if (!MyLogicalRepWorker->ready_to_reuse)
+ {
+     /* All done */
+     walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
+ }

How can we not be 100% sure of the kind of worker we are dealing with?
E.g. "probably" ?? Should this code be using macros like
am_tablesync_worker() to have some certainty what it is dealing with
here?

~~~

14. stream_build_options

+ /* stream_build_options
+  * Build logical replication streaming options.
+  *
+  * This function sets streaming options including replication slot name
+  * and origin start position. Workers need these options for logical replication.
+  */
+static void
+stream_build_options(WalRcvStreamOptions *options, char *slotname, XLogRecPtr *origin_startpos)

The function name seems a bit strange -- it's not really "building"
anything. How about something like SetStreamOptions, or
set_stream_options?

~~~

15. run_tablesync_worker

+static void
+run_tablesync_worker(WalRcvStreamOptions *options,
+                     char *slotname,
+                     char *originname,
+                     int originname_size,
+                     XLogRecPtr *origin_startpos)
+{
+    /* Set this to false for safety, in case we're already reusing the worker */
+    MyLogicalRepWorker->ready_to_reuse = false;

Maybe reword the comment so it does not say set 'this' to false.

~

16.
+    /* Start applying changes to catcup. */
+    start_apply(*origin_startpos);

typo: catcup

~~~

17. run_apply_worker

+static void
+run_apply_worker(WalRcvStreamOptions *options,
+                 char *slotname,
+                 char *originname,
+                 int originname_size,
+                 XLogRecPtr *origin_startpos)
+{
+    /* This is the leader apply worker */
+    RepOriginId originid;
+    TimeLineID startpointTLI;
+    char *err;
+    bool must_use_password;

The comment above the variable declarations seems redundant/misplaced.

~~~

18. InitializeLogRepWorker

     if (am_tablesync_worker())
         ereport(LOG,
-                (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
+                (errmsg("logical replication table synchronization worker for subscription \"%s\", relation \"%s\" with relid %u has started",
                         MySubscription->name,
-                        get_rel_name(MyLogicalRepWorker->relid))));
+                        get_rel_name(MyLogicalRepWorker->relid),
+                        MyLogicalRepWorker->relid)));
     else

I felt this code could be using the get_worker_name() function, like
the "else" does, instead of the hardwired "logical replication table
synchronization worker" string.

~~~

19.
TablesyncWorkerMain

+TablesyncWorkerMain(Datum main_arg)
+{
+    int worker_slot = DatumGetInt32(main_arg);
+    char originname[NAMEDATALEN];
+    XLogRecPtr origin_startpos = InvalidXLogRecPtr;
+    char *myslotname = NULL;
+    WalRcvStreamOptions options;
+    List *rstates;
+    SubscriptionRelState *rstate;
+    ListCell *lc;

-    /* Setup replication origin tracking. */
-    StartTransactionCommand();
-    ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
-                                       originname, sizeof(originname));
-    originid = replorigin_by_name(originname, true);
-    if (!OidIsValid(originid))
-        originid = replorigin_create(originname);
-    replorigin_session_setup(originid, 0);
-    replorigin_session_origin = originid;
-    origin_startpos = replorigin_session_get_progress(false);
-
-    /* Is the use of a password mandatory? */
-    must_use_password = MySubscription->passwordrequired &&
-        !superuser_arg(MySubscription->owner);
-
-    /* Note that the superuser_arg call can access the DB */
-    CommitTransactionCommand();
+    elog(LOG, "logical replication table synchronization worker has started");

Would it be better if that elog was using the common function
get_worker_name()?

~~~

20.
+    if (MyLogicalRepWorker->ready_to_reuse)
+    {
+    /* This transaction will be committed by clean_sync_worker. */
+    StartTransactionCommand();

The indentation is broken.

~~~

21.
+     * Check if any table whose relation state is still INIT. If a table
+     * in INIT state is found, the worker will not be finished, it will be
+     * reused instead. */

First sentence is not meaningful. Should it say: "Check if there is any
table whose relation state is still INIT." ??

~~~

22.
+    /*
+     * Pick the table for the next run if it is not already picked up
+     * by another worker.
+     *
+     * Take exclusive lock to prevent any other sync worker from picking
+     * the same table.
+     */
+    LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
+    if (rstate->state != SUBREL_STATE_SYNCDONE &&
+        !logicalrep_worker_find(MySubscription->oid, rstate->relid, false))
+    {
+        /* Update worker state for the next table */
+        MyLogicalRepWorker->relid = rstate->relid;
+        MyLogicalRepWorker->relstate = rstate->state;
+        MyLogicalRepWorker->relstate_lsn = rstate->lsn;
+        LWLockRelease(LogicalRepWorkerLock);
+        break;
+    }
+    LWLockRelease(LogicalRepWorkerLock);
 }
+
+ /*
+  * If a relation with INIT state is assigned, clean up the worker for
+  * the next iteration.
+  *
+  * If there is no more work left for this worker, break the loop to
+  * exit.
+  */
+ if ( MyLogicalRepWorker->relstate == SUBREL_STATE_INIT)
+     clean_sync_worker();
 else
- {
-     walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
- }
+     break;

I was unsure about this logic, but shouldn't the
MyLogicalRepWorker->relstate be assigned a default value prior to all
these loops, so that there can be no chance for it to be
SUBREL_STATE_INIT by accident?

~

23.
+ /* If not exited yet, then the worker will sync another table. */
+ StartTransactionCommand();
+ ereport(LOG,
+         (errmsg("logical replication table synchronization worker for subscription \"%s\" has moved to sync table \"%s\" with relid %u.",
+                 MySubscription->name, get_rel_name(MyLogicalRepWorker->relid), MyLogicalRepWorker->relid)));
+ CommitTransactionCommand();

23a.
This code seems strangely structured. Why is the "not exited yet" part
not within the preceding "if" block where the clean_sync_worker was
done?

~

23b.
Won't it be better for that errmsg to use the common function
get_worker_name() instead of having the hardcoded string?

======
src/include/replication/worker_internal.h

24.
+ /*
+  * Used to indicate whether sync worker is ready for being reused
+  * to sync another relation.
+  */
+ bool ready_to_reuse;
+

IIUC this field has no meaning except for a tablesync worker, but the
fieldname gives no indication of that at all.
To make this more obvious it might be better to put this with the other
tablesync fields:

/* Used for initial table synchronization. */
Oid relid;
char relstate;
XLogRecPtr relstate_lsn;
slock_t relmutex;

And maybe rename it according to that convention relXXX -- e.g.
'relworker_available' or something.

------
Kind Regards,
Peter Smith.
Fujitsu Australia
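
P.S. To illustrate comment 5a, here is a minimal standalone sketch of
the rearranged conditions. It is not code from the patch: Oid,
InvalidOid and OidIsValid() are redefined locally as simplified
stand-ins for the PostgreSQL definitions so that the snippet compiles
by itself, and worker_function_name() and
check_worker_function_name() are hypothetical helpers that exist only
for this example.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Local stand-ins for the PostgreSQL definitions (assumption: simplified). */
typedef unsigned int Oid;
#define InvalidOid ((Oid) 0)
#define OidIsValid(objectId) ((bool) ((objectId) != InvalidOid))

/*
 * Hypothetical helper: return the entry point name for a worker.
 * The branch order matches the patch (parallel apply is tested first),
 * but the second test uses OidIsValid(relid) rather than its negation.
 */
const char *
worker_function_name(bool is_parallel_apply_worker, Oid relid)
{
    if (is_parallel_apply_worker)
        return "ParallelApplyWorkerMain";
    else if (OidIsValid(relid))
        return "TablesyncWorkerMain";
    else
        return "ApplyWorkerMain";
}

/* Quick self-check of the three cases (16384 is an arbitrary valid Oid). */
void
check_worker_function_name(void)
{
    assert(strcmp(worker_function_name(true, InvalidOid),
                  "ParallelApplyWorkerMain") == 0);
    assert(strcmp(worker_function_name(false, 16384),
                  "TablesyncWorkerMain") == 0);
    assert(strcmp(worker_function_name(false, InvalidOid),
                  "ApplyWorkerMain") == 0);
}
```

In launcher.c the same three branches would call snprintf() on
bgw.bgw_function_name directly. Note that the one-line form in 5b
tests OidIsValid(relid) first, so it gives the same result only as long
as a parallel apply worker is never launched with a valid relid.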