On Wed, Nov 16, 2022 at 1:50 PM houzj.f...@fujitsu.com
<houzj.f...@fujitsu.com> wrote:
>
> On Tuesday, November 15, 2022 7:58 PM houzj.f...@fujitsu.com
> <houzj.f...@fujitsu.com> wrote:
>
> I noticed that I didn't add CHECK_FOR_INTERRUPTS while retrying send message.
> So, attach the new version which adds that. Also attach the 0004 patch that
> restarts logical replication with temporarily disabling the parallel apply if
> failed to apply a transaction in parallel apply worker.
>
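
For readers following along, the retry-with-interrupt-check pattern mentioned
in the quoted text can be sketched roughly as below. This is a standalone
illustration, not code from the patch: interrupt_pending, try_send() and
send_with_retry() are hypothetical stand-ins (the flag plays the role of what
CHECK_FOR_INTERRUPTS examines, and try_send() the role of a non-blocking
send), and a real worker would presumably wait between attempts rather than
spin.

```c
#include <assert.h>
#include <signal.h>

/* Hypothetical stand-in for the state that CHECK_FOR_INTERRUPTS() examines. */
static volatile sig_atomic_t interrupt_pending = 0;

typedef enum { SEND_OK, SEND_WOULD_BLOCK } SendResult;
typedef enum { RETRY_SENT, RETRY_INTERRUPTED, RETRY_GAVE_UP } RetryOutcome;

/*
 * Hypothetical non-blocking send: reports "would block" until the
 * caller-supplied counter reaches zero, then succeeds.
 */
static SendResult
try_send(int *attempts_until_ready)
{
    if (*attempts_until_ready > 0)
    {
        (*attempts_until_ready)--;
        return SEND_WOULD_BLOCK;
    }
    return SEND_OK;
}

/*
 * Retry the send up to max_attempts times, checking for a pending
 * interrupt before every attempt so the loop cannot ignore a cancel
 * request while the queue stays full.
 */
static RetryOutcome
send_with_retry(int *attempts_until_ready, int max_attempts)
{
    assert(max_attempts > 0);

    for (int i = 0; i < max_attempts; i++)
    {
        if (interrupt_pending)
            return RETRY_INTERRUPTED;

        if (try_send(attempts_until_ready) == SEND_OK)
            return RETRY_SENT;

        /* A real worker would sleep or wait for a wakeup here. */
    }

    return RETRY_GAVE_UP;
}
```

The point of the sketch is only the ordering: the interrupt check sits inside
the loop, ahead of each send attempt, so a retry that never succeeds can still
be cancelled.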

Few comments on v48-0001
======================
1. The variable name pending_message_count seems to indicate a number
of pending messages, but normally it counts pending start/stop streams,
except probably for the rollback to savepoint case. Shall we name it
pending_stream_count and change the comments accordingly?

2. The variable name abort_toplevel_transaction seems unnecessarily
long. Shall we rename it to toplevel_xact or something like that?

3.
+ /*
+  * Increment the number of messages waiting to be processed by
+  * parallel apply worker.
+  */
+ if (!abort_toplevel_transaction)
+     pg_atomic_add_fetch_u32(&(winfo->shared->pending_message_count), 1);
+ else
+     pa_unlock_stream(xid, AccessExclusiveLock);

It is better to explain here why different actions are required for
subtransaction and transaction rather than the current comment.

4.
+
+ if (abort_toplevel_transaction)
+ {
+     (void) pa_free_worker(winfo, xid);
+ }

{} is not required here.

5.
/*
+ * Although the lock can be automatically released during transaction
+ * rollback, but we still release the lock here as we may not in a
+ * transaction.
+ */
+ pa_unlock_transaction(xid, AccessShareLock);
+

It is better to explain for which case (I think it is for empty xacts)
it will be useful to release it explicitly.

6.
+ *
+ * XXX We can avoid sending pairs of the START/STOP messages to the parallel
+ * worker because unlike apply worker it will process only one transaction at a
+ * time. However, it is not clear whether any optimization is worthwhile
+ * because these messages are sent only when the logical_decoding_work_mem
+ * threshold is exceeded.
  */
 static void
 apply_handle_stream_start(StringInfo s)

I think this comment is no longer valid, as now we need to wait for the
next stream at the stream_stop message and also need to acquire the lock
in the stream_start message. So, I think it is better to remove it unless
I am missing something.

7. I am able to compile applyparallelworker.c after commenting out a few
of the header includes. Please check if those are really required.

#include "libpq/pqformat.h"
#include "libpq/pqmq.h"
//#include "mb/pg_wchar.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/logicallauncher.h"
//#include "replication/logicalworker.h"
#include "replication/origin.h"
//#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
//#include "storage/procarray.h"
#include "tcop/tcopprot.h"
#include "utils/inval.h"
#include "utils/memutils.h"
//#include "utils/resowner.h"
#include "utils/syscache.h"

8.
+/*
+ * Is there a message sent by parallel apply worker which we need to receive?
+ */
+volatile sig_atomic_t ParallelApplyMessagePending = false;

This comment and variable are placed in applyparallelworker.c, so 'we'
in the above sentence is not clear. I think you need to use leader
apply worker instead.

9.
+static ParallelApplyWorkerInfo *pa_get_free_worker(void);

Will it be better if we name this function pa_get_available_worker()?

--
With Regards,
Amit Kapila.