Here are some review comments for the patch v2-0001.

======
Commit message

1. General
Better to use consistent terms in this message. Either "relations" or
"tables" -- not a mixture of both.

~~~

2.
Before this commit, tablesync workers were capable of syncing only one
relation. For each table, a new sync worker was launched and the worker
would exit when the worker is done with the current table.

~

SUGGESTION (2nd sentence)
For each table, a new sync worker was launched and that worker would
exit when done processing the table.

~~~

3.
Now, tablesync workers are not only limited with one relation and can
move to another relation in the same subscription. This reduces the
overhead of launching a new background worker and exiting from that
worker for each relation.

~

SUGGESTION (1st sentence)
Now, tablesync workers are not limited to processing only one
relation. When done, they can move to processing another relation in
the same subscription.

~~~

4.
A new tablesync worker gets launched only if the number of tablesync
workers for the subscription does not exceed
max_sync_workers_per_subscription. If there is a table needs to be synced,
a tablesync worker picks that up and syncs it.The worker continues to
picking new tables to sync until there is no table left for synchronization
in the subscription.

~

This seems to be missing the point that only "available" workers go
looking for more tables to process. Maybe reword something like below:

SUGGESTION
If there is a table that needs to be synced, an "available" tablesync
worker picks up that table and syncs it. Each tablesync worker
continues to pick new tables to sync until there are no tables left
requiring synchronization. If there was no "available" worker to
process the table, then a new tablesync worker will be launched,
provided the number of tablesync workers for the subscription does not
exceed max_sync_workers_per_subscription.

======
src/backend/replication/logical/launcher.c

5. logicalrep_worker_launch

@@ -460,8 +461,10 @@ retry:

  if (is_parallel_apply_worker)
  snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
- else
+ else if (!OidIsValid(relid))
  snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
+ else
+ snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");

  if (OidIsValid(relid))
  snprintf(bgw.bgw_name, BGW_MAXLEN,

~

5a.
I felt at least these conditions can be rearranged, so you can use
OidIsValid(relid) instead of !OidIsValid(relid).

~

5b.
Probably it can all be simplified, if you are happy to do it in one line:

snprintf(bgw.bgw_function_name, BGW_MAXLEN,
    OidIsValid(relid) ? "TablesyncWorkerMain" :
    is_parallel_apply_worker ? "ParallelApplyWorkerMain" :
"ApplyWorkerMain");

======
src/backend/replication/logical/tablesync.c

6. finish_sync_worker

This function is removed/renamed but there are still commenting in
this file referring to 'finish-sync_worker'

~~~

7. clean_sync_worker

I agree with comment from Shi-san. There should still be logging
somewhere that say this tablesync worker has completed the processing
of the current table.

~~~

8. sync_worker_exit

There is inconsistent function naming for clean_sync_worker versus
sync_worker_exit.

How about: clean_sync_worker/exit_sync_worker?
Or: sync_worker_clean/sync_worker_exit?

~~~

9. process_syncing_tables_for_sync

@@ -378,7 +387,13 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
  */
  replorigin_drop_by_name(originname, true, false);

- finish_sync_worker();
+ /*
+ * Sync worker is cleaned at this point. It's ready to sync next table,
+ * if needed.
+ */
+ SpinLockAcquire(&MyLogicalRepWorker->relmutex);
+ MyLogicalRepWorker->ready_to_reuse = true;
+ SpinLockRelease(&MyLogicalRepWorker->relmutex);

9a.
I did not quite follow the logic. It says "Sync worker is cleaned at
this point", but who is doing that? -- more details are needed. But,
why not just call clean_sync_worker() right here like it use to call
finish_sync_worker?

~

9b.
Shouldn't this "ready_to_use" flag be assigned within the
clean_sync_worker() function, since that is the function making is
clean for next re-use. The function comment even says so: "Prepares
the synchronization worker for reuse or exit."

======
src/backend/replication/logical/worker.c

10. General -- run_tablesync_worker, TablesyncWorkerMain

IMO these functions would be more appropriately reside in the
tablesync.c instead of the (common) worker.c. Was there some reason
why they cannot be put there?

~~~

11. LogicalRepApplyLoop

+ /*
+ * apply_dispatch() may have gone into apply_handle_commit()
+ * which can go into process_syncing_tables_for_sync early.
+ * Before we were able to reuse tablesync workers, that
+ * process_syncing_tables_for_sync call would exit the worker
+ * instead of preparing for reuse. Now that tablesync workers
+ * can be reused and process_syncing_tables_for_sync is not
+ * responsible for exiting. We need to take care of memory
+ * contexts here before moving to sync the nex table or exit.
+ */

11a.
IMO it does not seem good to explain the reason by describing how the
logic USED to work, with code that is removed (e.g. "Before we
were..."). It's better to describe why this is needed here based on
all the CURRENT code logic.

~

11b.
/nex table/next table/

~

12.
+ if (MyLogicalRepWorker->ready_to_reuse)
+ {
+ endofstream = true;
+ }

Unnecessary parentheses.

~

13.
+ /*
+ * If it's still not ready to reuse, this is probably an apply worker.
+ * End streaming before exiting.
+ */
+ if (!MyLogicalRepWorker->ready_to_reuse)
+ {
+ /* All done */
+ walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
+ }

How can we not be 100% sure of the kind of worker we are dealing with?
E.g. "probably" ??

Should this code be using macros like am_tablesync_worker() to have
some certainty what it is dealing with here?

~~~

14. stream_build_options

+ /* stream_build_options
+  * Build logical replication streaming options.
+  *
+  * This function sets streaming options including replication slot name
+  * and origin start position. Workers need these options for logical
replication.
+  */
+static void
+stream_build_options(WalRcvStreamOptions *options, char *slotname,
XLogRecPtr *origin_startpos)

The function name seem a bit strange -- it's not really "building"
anything. How about something like SetStreamOptions, or
set_stream_options.

~~~

15. run_tablesync_worker

+static void
+run_tablesync_worker(WalRcvStreamOptions *options,
+ char *slotname,
+ char *originname,
+ int originname_size,
+ XLogRecPtr *origin_startpos)
+{
+ /* Set this to false for safety, in case we're already reusing the worker */
+ MyLogicalRepWorker->ready_to_reuse = false;

Maybe reword the comment so it does not say set 'this' to false.

~

16.
+ /* Start applying changes to catcup. */
+ start_apply(*origin_startpos);

typo: catcup

~~~

17. run_apply_worker

+static void
+run_apply_worker(WalRcvStreamOptions *options,
+ char *slotname,
+ char *originname,
+ int originname_size,
+ XLogRecPtr *origin_startpos)
+{
+ /* This is the leader apply worker */
+ RepOriginId originid;
+ TimeLineID startpointTLI;
+ char    *err;
+ bool must_use_password;


The comment above the variable declarations seems redundant/misplaced.

~~

18. InitializeLogRepWorker

  if (am_tablesync_worker())
  ereport(LOG,
- (errmsg("logical replication table synchronization worker for
subscription \"%s\", table \"%s\" has started",
+ (errmsg("logical replication table synchronization worker for
subscription \"%s\", relation \"%s\" with relid %u has started",
  MySubscription->name,
- get_rel_name(MyLogicalRepWorker->relid))));
+ get_rel_name(MyLogicalRepWorker->relid),
+ MyLogicalRepWorker->relid)));
  else


I felt this code could be using get_worker_name() function like the
"else" does instead of the hardwired: "logical replication table
synchronization worker" string

~~~

19. TablesyncWorkerMain

+TablesyncWorkerMain(Datum main_arg)
+{
+ int worker_slot = DatumGetInt32(main_arg);
+ char originname[NAMEDATALEN];
+ XLogRecPtr origin_startpos = InvalidXLogRecPtr;
+ char    *myslotname = NULL;
+ WalRcvStreamOptions options;
+ List    *rstates;
+ SubscriptionRelState *rstate;
+ ListCell   *lc;

- /* Setup replication origin tracking. */
- StartTransactionCommand();
- ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
-    originname, sizeof(originname));
- originid = replorigin_by_name(originname, true);
- if (!OidIsValid(originid))
- originid = replorigin_create(originname);
- replorigin_session_setup(originid, 0);
- replorigin_session_origin = originid;
- origin_startpos = replorigin_session_get_progress(false);
-
- /* Is the use of a password mandatory? */
- must_use_password = MySubscription->passwordrequired &&
- !superuser_arg(MySubscription->owner);
-
- /* Note that the superuser_arg call can access the DB */
- CommitTransactionCommand();
+ elog(LOG, "logical replication table synchronization worker has started");

Would it be better if that elog was using the common function get_worker_name()?

~~~

20.
+ if (MyLogicalRepWorker->ready_to_reuse)
+ {
+ /* This transaction will be committed by clean_sync_worker. */
+ StartTransactionCommand();

The indentation is broken.

~~~

21.
+ * Check if any table whose relation state is still INIT. If a table
+ * in INIT state is found, the worker will not be finished, it will be
+ * reused instead.
  */

First sentence is not meaningful. Should it say: "Check if there is
any table whose relation state is still INIT." ??

~~~

22.
+ /*
+ * Pick the table for the next run if it is not already picked up
+ * by another worker.
+ *
+ * Take exclusive lock to prevent any other sync worker from picking
+ * the same table.
+ */
+ LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
+ if (rstate->state != SUBREL_STATE_SYNCDONE &&
+ !logicalrep_worker_find(MySubscription->oid, rstate->relid, false))
+ {
+ /* Update worker state for the next table */
+ MyLogicalRepWorker->relid = rstate->relid;
+ MyLogicalRepWorker->relstate = rstate->state;
+ MyLogicalRepWorker->relstate_lsn = rstate->lsn;
+ LWLockRelease(LogicalRepWorkerLock);
+ break;
+ }
+ LWLockRelease(LogicalRepWorkerLock);
  }
+
+ /*
+ * If a relation with INIT state is assigned, clean up the worker for
+ * the next iteration.
+ *
+ * If there is no more work left for this worker, break the loop to
+ * exit.
+ */
+ if ( MyLogicalRepWorker->relstate == SUBREL_STATE_INIT)
+ clean_sync_worker();
  else
- {
- walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
- }
+ break;

I was unsure about this logic, but shouldn't the
MyLogicalRepWorker->relstate be assigned a default value prior to all
these loops, so that there can be no chance for it to be
SUBREL_STATE_INIT by accident.

~

23.
+ /* If not exited yet, then the worker will sync another table. */
+ StartTransactionCommand();
+ ereport(LOG,
+ (errmsg("logical replication table synchronization worker for
subscription \"%s\" has moved to sync table \"%s\" with relid %u.",
+ MySubscription->name, get_rel_name(MyLogicalRepWorker->relid),
MyLogicalRepWorker->relid)));
+ CommitTransactionCommand();

23a
This code seems strangely structured. Why is the "not exited yet" part
not within the preceding "if" block where the clean_sync_worker was
done?

~~~

23b.
Wont it be better for that errmsg to use the common function
get_worker_name() instead of having the hardcoded string?


======
src/include/replication/worker_internal.h

24.
+ /*
+ * Used to indicate whether sync worker is ready for being reused
+ * to sync another relation.
+ */
+ bool ready_to_reuse;
+

IIUC this field has no meaning except for a tablesync worker, but the
fieldname give no indication of that at all.

To make this more obvious it might be better to put this with the
other tablesync fields:

/* Used for initial table synchronization. */
Oid relid;
char relstate;
XLogRecPtr relstate_lsn;
slock_t relmutex;
And maybe rename it according to that convention relXXX -- e.g.
'relworker_available' or something

------
Kind Regards,
Peter Smith.
Fujitsu Australia


Reply via email to