On Friday, December 16, 2022 3:08 PM Masahiko Sawada <sawada.m...@gmail.com> 
wrote:
> 
> 
> Here are some minor comments:

Thanks for the comments!

> ---
> +pa_has_spooled_message_pending()
> +{
> +       PartialFileSetState fileset_state;
> +
> +       fileset_state = pa_get_fileset_state();
> +
> +       if (fileset_state != FS_UNKNOWN)
> +               return true;
> +       else
> +               return false;
> +}
> 
> I think we can simply do:
> 
> return (fileset_state != FS_UNKNOWN);

Will change.

> 
> Or do we need this function in the first place? I think we can do in
> LogicalParallelApplyLoop() like:

I intended not to expose the fileset state in the main loop, so it may be
better to keep this function.
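
As a rough sketch of that idea, combining it with the simplified return
suggested above: the snippet below is standalone C, not the patch itself. The
enum value FS_SOME_OTHER_STATE, the static variable, and the simplified
pa_set_fileset_state() signature are assumptions made only so the example
compiles on its own.

```c
#include <assert.h>
#include <stdbool.h>

/* Stand-in enum: only FS_UNKNOWN appears in this thread. */
typedef enum PartialFileSetState
{
    FS_UNKNOWN,
    FS_SOME_OTHER_STATE     /* hypothetical placeholder for any known state */
} PartialFileSetState;

/* Hypothetical storage; the patch keeps this state elsewhere. */
static PartialFileSetState current_fileset_state = FS_UNKNOWN;

/* Simplified stand-in setter, used here only to drive the example. */
static void
pa_set_fileset_state(PartialFileSetState state)
{
    assert(state == FS_UNKNOWN || state == FS_SOME_OTHER_STATE);
    current_fileset_state = state;
}

static PartialFileSetState
pa_get_fileset_state(void)
{
    return current_fileset_state;
}

/*
 * The main loop only asks a yes/no question; the fileset state itself
 * stays hidden behind this function.
 */
static bool
pa_has_spooled_message_pending(void)
{
    return (pa_get_fileset_state() != FS_UNKNOWN);
}
```

With this shape, the caller in the main loop would only need something like
"if (pa_has_spooled_message_pending()) ..." and would never reference the
PartialFileSetState values directly.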

> ---
> +       active_workers = list_copy(ParallelApplyWorkerPool);
> +
> +       foreach(lc, active_workers)
> +       {
> +               int                     slot_no;
> +               uint16          generation;
> +               ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
> +
> +               LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
> +               napplyworkers = logicalrep_pa_worker_count(MyLogicalRepWorker->subid);
> +               LWLockRelease(LogicalRepWorkerLock);
> +
> +               if (napplyworkers <= max_parallel_apply_workers_per_subscription / 2)
> +                       return;
> +
> 
> Calling logicalrep_pa_worker_count() with lwlock for each worker seems
> not efficient to me. I think we can get the number of workers once at
> the top of this function and return if it's already lower than the
> maximum pool size. Otherwise, we attempt to stop extra workers.

How about directly checking the length of the worker pool list here? That
seems simpler and doesn't need a lock.
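
To illustrate the check I have in mind, here is a minimal standalone sketch,
assuming the pool list is private to the leader apply worker so that its
length can be read without LogicalRepWorkerLock. The function name
pa_pool_exceeds_target() and the default value of 4 are hypothetical; in the
patch the length would come from list_length(ParallelApplyWorkerPool).

```c
#include <assert.h>
#include <stdbool.h>

/* Stand-in for the GUC; the value 4 is only an example. */
static int max_parallel_apply_workers_per_subscription = 4;

/*
 * Return true if the pool currently holds more workers than the target
 * size (half of the per-subscription maximum), i.e. extra workers may be
 * stopped. This mirrors the quoted condition, which returns early when
 * the count is <= max / 2.
 */
static bool
pa_pool_exceeds_target(int pool_length)
{
    assert(pool_length >= 0);
    return pool_length > max_parallel_apply_workers_per_subscription / 2;
}
```

The caller would evaluate this once before touching any worker and return
immediately when it is false.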

> ---
> +bool
> +pa_free_worker(ParallelApplyWorkerInfo *winfo, TransactionId xid)
> +{
> 
> 
> Is there any reason why this function has the XID as a separate
> argument? It seems to me that since we always call this function with
> 'winfo' and 'winfo->shared->xid', we can remove xid from the function
> argument.
> 
> ---
> +       /* Initialize shared memory area. */
> +       SpinLockAcquire(&winfo->shared->mutex);
> +       winfo->shared->xact_state = PARALLEL_TRANS_UNKNOWN;
> +       winfo->shared->xid = xid;
> +       SpinLockRelease(&winfo->shared->mutex);
> 
> It's practically no problem but is there any reason why some fields of
> ParallelApplyWorkerInfo are initialized in pa_setup_dsm() whereas some
> fields are done here?

We could be reusing an old worker from the pool here, in which case we need to
update these fields with the new streaming transaction's information.
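
A simplified sketch of why the reset happens at this point rather than in
pa_setup_dsm(): the struct below is a cut-down stand-in with only the two
fields from the quoted hunk, the spinlock calls are shown as comments, and
the names pa_reset_for_new_xact() and PARALLEL_TRANS_OTHER_EXAMPLE are
hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TransactionId;     /* xids are 32-bit in PostgreSQL */

typedef enum ParallelTransState
{
    PARALLEL_TRANS_UNKNOWN,
    PARALLEL_TRANS_OTHER_EXAMPLE    /* hypothetical leftover state */
} ParallelTransState;

/* Cut-down stand-in: the real shared struct also carries a mutex. */
typedef struct ParallelApplyWorkerShared
{
    ParallelTransState xact_state;
    TransactionId xid;
} ParallelApplyWorkerShared;

/*
 * Per-transaction reset. A worker taken from the pool still holds the
 * values of the previous streamed transaction, so these fields must be
 * overwritten every time a worker is assigned, not only at DSM setup.
 */
static void
pa_reset_for_new_xact(ParallelApplyWorkerShared *shared, TransactionId xid)
{
    assert(shared != NULL);
    /* SpinLockAcquire(&shared->mutex) in the patch */
    shared->xact_state = PARALLEL_TRANS_UNKNOWN;
    shared->xid = xid;
    /* SpinLockRelease(&shared->mutex) in the patch */
}
```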

I will address the other comments, except for the ones above that are still
being discussed.

Best regards,
Hou zj
