On Friday, December 16, 2022 3:08 PM Masahiko Sawada <sawada.m...@gmail.com> wrote: > > >Here are some minor comments:
Thanks for the comments! > --- > +pa_has_spooled_message_pending() > +{ > + PartialFileSetState fileset_state; > + > + fileset_state = pa_get_fileset_state(); > + > + if (fileset_state != FS_UNKNOWN) > + return true; > + else > + return false; > +} > > I think we can simply do: > > return (fileset_state != FS_UNKNOWN); Will change. > > Or do we need this function in the first place? I think we can do in > LogicalParallelApplyLoop() like: I was intended to not expose the file state in the main loop, so maybe better to keep this function. > --- > + active_workers = list_copy(ParallelApplyWorkerPool); > + > + foreach(lc, active_workers) > + { > + int slot_no; > + uint16 generation; > + ParallelApplyWorkerInfo *winfo = > (ParallelApplyWorkerInfo *) lfirst(lc); > + > + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); > + napplyworkers = > logicalrep_pa_worker_count(MyLogicalRepWorker->subid); > + LWLockRelease(LogicalRepWorkerLock); > + > + if (napplyworkers <= > max_parallel_apply_workers_per_subscription / 2) > + return; > + > > Calling logicalrep_pa_worker_count() with lwlock for each worker seems > not efficient to me. I think we can get the number of workers once at > the top of this function and return if it's already lower than the > maximum pool size. Otherwise, we attempt to stop extra workers. How about we directly check the length of worker pool list here which seems simpler and don't need to lock ? > --- > +bool > +pa_free_worker(ParallelApplyWorkerInfo *winfo, TransactionId xid) > +{ > > > Is there any reason why this function has the XID as a separate > argument? It seems to me that since we always call this function with > 'winfo' and 'winfo->shared->xid', we can remove xid from the function > argument. > > --- > + /* Initialize shared memory area. */ > + SpinLockAcquire(&winfo->shared->mutex); > + winfo->shared->xact_state = PARALLEL_TRANS_UNKNOWN; > + winfo->shared->xid = xid; > + SpinLockRelease(&winfo->shared->mutex); > > It's practically no problem but is there any reason why some fields of > ParallelApplyWorkerInfo are initialized in pa_setup_dsm() whereas some > fields are done here? We could be using old worker in the pool here in which case we need to update these fields with the new streaming transaction information. I will address other comments except above ones which are being discussed. Best regards, Hou zj