On Mon, Sep 26, 2022 at 8:41 AM wangw.f...@fujitsu.com
<wangw.f...@fujitsu.com> wrote:
>
> On Thur, Sep 22, 2022 at 18:12 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
> >
> > 3.
> > ApplyWorkerMain()
> > {
> > ...
> > ...
> > +
> > +	if (server_version >= 160000 &&
> > +		MySubscription->stream == SUBSTREAM_PARALLEL)
> > +		options.proto.logical.streaming = pstrdup("parallel");
> >
> > After deciding here whether the parallel streaming mode is enabled or
> > not, we recheck the same thing in apply_handle_stream_abort() and
> > parallel_apply_can_start(). In parallel_apply_can_start(), we do it
> > via two different checks. How about storing this information say in
> > structure MyLogicalRepWorker in ApplyWorkerMain() and then use it at
> > other places?
>
> Improved as suggested.
> Added a new flag "in_parallel_apply" to structure MyLogicalRepWorker.
>
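The suggestion quoted above amounts to deriving the parallel-apply decision once and caching it in the worker's shared state, so later call sites test one flag instead of repeating the version and streaming-mode checks. A minimal sketch of that pattern, assuming simplified stand-in types: SubStreamMode, WorkerState, decide_parallel_apply(), should_read_abort_info() and can_start_parallel_apply() are hypothetical and only mimic the names used in this thread; they are not PostgreSQL's definitions.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Hypothetical stand-ins: these only mimic the names used in the thread
 * (SUBSTREAM_PARALLEL, MyLogicalRepWorker->in_parallel_apply).
 */
typedef enum
{
	SUBSTREAM_OFF,
	SUBSTREAM_ON,
	SUBSTREAM_PARALLEL
} SubStreamMode;

typedef struct
{
	bool		in_parallel_apply;	/* decided once, read everywhere else */
} WorkerState;

static WorkerState worker_state;
static WorkerState *MyWorker = &worker_state;

/* Derive the decision once, at worker start (hypothetical helper). */
static void
decide_parallel_apply(int server_version, SubStreamMode stream)
{
	MyWorker->in_parallel_apply = (server_version >= 160000 &&
								   stream == SUBSTREAM_PARALLEL);
}

/* Later call sites consult the cached flag instead of re-checking. */
static bool
should_read_abort_info(void)
{
	return MyWorker->in_parallel_apply;
}

static bool
can_start_parallel_apply(void)
{
	return MyWorker->in_parallel_apply;
}
```

The design choice is simply that the condition lives in one place: if the criteria for parallel apply change, only decide_parallel_apply() needs updating.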
Can we name the variable in_parallel_apply as parallel_apply and set it
in logicalrep_worker_launch() instead of in ParallelApplyWorkerMain()?

A few other comments:
==================
1.
+	if (is_subworker &&
+		nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
+	{
+		LWLockRelease(LogicalRepWorkerLock);
+
+		ereport(DEBUG1,
+				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+				 errmsg("out of parallel apply workers"),
+				 errhint("You might need to increase max_parallel_apply_workers_per_subscription.")));

I think it is better to keep the level of this message as LOG. Similar
messages elsewhere use WARNING or LOG. Here, I prefer LOG because the
system can still proceed without blocking anything.

2.
+/* Reset replication origin tracking. */
+void
+parallel_apply_replorigin_reset(void)
+{
+	bool		started_tx = false;
+
+	/* This function might be called inside or outside of transaction. */
+	if (!IsTransactionState())
+	{
+		StartTransactionCommand();
+		started_tx = true;
+	}

Why do we need a transaction in this function?

3. A few suggestions to improve the patch:

diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 1623c9e2fa..d9c519dfab 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1264,6 +1264,10 @@ apply_handle_stream_prepare(StringInfo s)
 		case TRANS_LEADER_SEND_TO_PARALLEL:
 			Assert(winfo);
 
+			/*
+			 * The origin can be active only in one process. See
+			 * apply_handle_stream_commit.
+			 */
 			parallel_apply_replorigin_reset();
 
 			/* Send STREAM PREPARE message to the parallel apply worker. */
@@ -1623,12 +1627,7 @@ apply_handle_stream_abort(StringInfo s)
 					(errcode(ERRCODE_PROTOCOL_VIOLATION),
 					 errmsg_internal("STREAM ABORT message without STREAM STOP")));
 
-	/*
-	 * Check whether the publisher sends abort_lsn and abort_time.
-	 *
-	 * Note that the parallel apply worker is only started when the publisher
-	 * sends abort_lsn and abort_time.
-	 */
+	/* We receive abort information only when we can apply in parallel. */
 	if (MyLogicalRepWorker->in_parallel_apply)
 		read_abort_info = true;
 
@@ -1656,7 +1655,13 @@ apply_handle_stream_abort(StringInfo s)
 			Assert(winfo);
 
 			if (subxid == xid)
+			{
+				/*
+				 * The origin can be active only in one process. See
+				 * apply_handle_stream_commit.
+				 */
 				parallel_apply_replorigin_reset();
+			}
 
 			/* Send STREAM ABORT message to the parallel apply worker. */
 			parallel_apply_send_data(winfo, s->len, s->data);
@@ -1858,6 +1863,12 @@ apply_handle_stream_commit(StringInfo s)
 		case TRANS_LEADER_SEND_TO_PARALLEL:
 			Assert(winfo);
 
+			/*
+			 * We need to reset the replication origin before sending the commit
+			 * message and set it up again after confirming that parallel worker
+			 * has processed the message. This is required because origin can be
+			 * active only in one process at-a-time.
+			 */
 			parallel_apply_replorigin_reset();
 
 			/* Send STREAM COMMIT message to the parallel apply worker. */
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 4cbfb43492..2bd9664f86 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -70,11 +70,7 @@ typedef struct LogicalRepWorker
 	 */
 	pid_t		apply_leader_pid;
 
-	/*
-	 * Indicates whether to use parallel apply workers.
-	 *
-	 * Determined based on streaming parameter and publisher version.
-	 */
+	/* Indicates whether apply can be performed parallelly. */
 	bool		in_parallel_apply;
 

-- 
With Regards,
Amit Kapila.
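The rule stated in the proposed apply_handle_stream_commit comment (a replication origin can be active in only one process at a time, so the leader resets it before handing the commit to the parallel apply worker and sets it up again after confirming the worker has processed the message) can be modelled as a toy ownership hand-off. This is a sequential, single-process sketch only: ReplOrigin, origin_setup(), origin_reset() and hand_off_commit() are hypothetical and are not PostgreSQL's replorigin API, and the real leader waits on the parallel worker asynchronously rather than calling it inline.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Toy model of a replication origin that may be active in only one
 * process at a time. All names here are hypothetical, not PostgreSQL's.
 */
typedef struct
{
	int			acquired_by;	/* 0 means no process holds the origin */
} ReplOrigin;

/* Make the origin active in process `pid`; fails if another holds it. */
static bool
origin_setup(ReplOrigin *origin, int pid)
{
	if (origin->acquired_by != 0 && origin->acquired_by != pid)
		return false;
	origin->acquired_by = pid;
	return true;
}

/* Release the origin, but only if `pid` is the current holder. */
static void
origin_reset(ReplOrigin *origin, int pid)
{
	if (origin->acquired_by == pid)
		origin->acquired_by = 0;
}

/*
 * Leader-side hand-off for STREAM COMMIT, simulated sequentially:
 * reset, let the worker use the origin, then re-acquire it.
 * Returns true if every step succeeded.
 */
static bool
hand_off_commit(ReplOrigin *origin, int leader, int worker)
{
	origin_reset(origin, leader);		/* leader gives up the origin */
	if (!origin_setup(origin, worker))	/* worker applies the commit */
		return false;
	origin_reset(origin, worker);		/* worker finishes and releases it */
	return origin_setup(origin, leader);	/* leader re-acquires it */
}
```

Skipping the first origin_reset() in hand_off_commit() would make the worker's origin_setup() fail, which is the situation the reset calls in the patch are meant to avoid.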