Hi,

On 2017-03-24 21:32:57 +0530, Amit Khandekar wrote:
> diff --git a/src/backend/executor/nodeAppend.c 
> b/src/backend/executor/nodeAppend.c
> index a107545..e9e8676 100644
> --- a/src/backend/executor/nodeAppend.c
> +++ b/src/backend/executor/nodeAppend.c
> @@ -59,9 +59,47 @@
>  
>  #include "executor/execdebug.h"
>  #include "executor/nodeAppend.h"
> +#include "miscadmin.h"
> +#include "optimizer/cost.h"
> +#include "storage/spin.h"
> +
> +/*
> + * Shared state for Parallel Append.
> + *
> + * Each backend participating in a Parallel Append has its own
> + * descriptor in backend-private memory, and those objects all contain
> + * a pointer to this structure.
> + */
> +typedef struct ParallelAppendDescData
> +{
> +     LWLock          pa_lock;                /* mutual exclusion to choose 
> next subplan */
> +     int                     pa_first_plan;  /* plan to choose while 
> wrapping around plans */
> +     int                     pa_next_plan;   /* next plan to choose by any 
> worker */
> +
> +     /*
> +      * pa_finished : workers currently executing the subplan. A worker which
> +      * finishes a subplan should set pa_finished to true, so that no new
> +      * worker picks this subplan. For non-partial subplan, a worker which 
> picks
> +      * up that subplan should immediately set to true, so as to make sure
> +      * there are no more than 1 worker assigned to this subplan.
> +      */
> +     bool            pa_finished[FLEXIBLE_ARRAY_MEMBER];
> +} ParallelAppendDescData;


> +typedef ParallelAppendDescData *ParallelAppendDesc;

Pointer hiding typedefs make this Andres sad.



> @@ -291,6 +362,276 @@ ExecReScanAppend(AppendState *node)
>               if (subnode->chgParam == NULL)
>                       ExecReScan(subnode);
>       }
> +
> +     if (padesc)
> +     {
> +             padesc->pa_first_plan = padesc->pa_next_plan = 0;
> +             memset(padesc->pa_finished, 0, sizeof(bool) * node->as_nplans);
> +     }
> +

Is it actually guaranteed that none of the parallel workers are doing
something at that point?


> +/* ----------------------------------------------------------------
> + *           exec_append_parallel_next
> + *
> + *           Determine the next subplan that should be executed. Each worker 
> uses a
> + *           shared next_subplan counter index to start looking for 
> unfinished plan,
> + *           executes the subplan, then shifts ahead this counter to the next
> + *           subplan, so that other workers know which next plan to choose. 
> This
> + *           way, workers choose the subplans in round robin order, and thus 
> they
> + *           get evenly distributed among the subplans.
> + *
> + *           Returns false if and only if all subplans are already finished
> + *           processing.
> + * ----------------------------------------------------------------
> + */
> +static bool
> +exec_append_parallel_next(AppendState *state)
> +{
> +     ParallelAppendDesc padesc = state->as_padesc;
> +     int             whichplan;
> +     int             initial_plan;
> +     int             first_partial_plan = ((Append 
> *)state->ps.plan)->first_partial_plan;
> +     bool    found;
> +
> +     Assert(padesc != NULL);
> +
> +     /* Backward scan is not supported by parallel-aware plans */
> +     Assert(ScanDirectionIsForward(state->ps.state->es_direction));
> +
> +     /* The parallel leader chooses its next subplan differently */
> +     if (!IsParallelWorker())
> +             return exec_append_leader_next(state);

It's a bit weird that the leader's case does is so separate, and does
it's own lock acquisition.


> +     found = false;
> +     for (whichplan = initial_plan; whichplan != PA_INVALID_PLAN;)
> +     {
> +             /*
> +              * Ignore plans that are already done processing. These also 
> include
> +              * non-partial subplans which have already been taken by a 
> worker.
> +              */
> +             if (!padesc->pa_finished[whichplan])
> +             {
> +                     found = true;
> +                     break;
> +             }
> +
> +             /*
> +              * Note: There is a chance that just after the child plan node 
> is
> +              * chosen above, some other worker finishes this node and sets
> +              * pa_finished to true. In that case, this worker will go ahead 
> and
> +              * call ExecProcNode(child_node), which will return NULL tuple 
> since it
> +              * is already finished, and then once again this worker will 
> try to
> +              * choose next subplan; but this is ok : it's just an extra
> +              * "choose_next_subplan" operation.
> +              */

IIRC not all node types are safe against being executed again when
they've previously returned NULL.  That's why e.g. nodeMaterial.c
contains the following blurb:
        /*
         * If necessary, try to fetch another row from the subplan.
         *
         * Note: the eof_underlying state variable exists to short-circuit 
further
         * subplan calls.  It's not optional, unfortunately, because some plan
         * node types are not robust about being called again when they've 
already
         * returned NULL.
         */


> +     else if (IsA(subpath, MergeAppendPath))
> +     {
> +             MergeAppendPath *mpath = (MergeAppendPath *) subpath;
> +
> +             /*
> +              * If at all MergeAppend is partial, all its child plans have 
> to be
> +              * partial : we don't currently support a mix of partial and
> +              * non-partial MergeAppend subpaths.
> +              */

Why is that?



> +int
> +get_append_num_workers(List *partial_subpaths, List *nonpartial_subpaths)
> +{
> +     ListCell   *lc;
> +     double          log2w;
> +     int                     num_workers;
> +     int                     max_per_plan_workers;
> +
> +     /*
> +      * log2(number_of_subpaths)+1 formula seems to give an appropriate 
> number of
> +      * workers for Append path either having high number of children (> 
> 100) or
> +      * having all non-partial subpaths or subpaths with 1-2 
> parallel_workers.
> +      * Whereas, if the subpaths->parallel_workers is high, this formula is 
> not
> +      * suitable, because it does not take into account per-subpath workers.
> +      * For e.g., with workers (2, 8, 8),

That's the per-subplan workers for three subplans?  That's not
necessarily clear.


> the Append workers should be at least
> +      * 8, whereas the formula gives 2. In this case, it seems better to 
> follow
> +      * the method used for calculating parallel_workers of an unpartitioned
> +      * table : log3(table_size). So we treat the UNION query as if the data

Which "UNION query"?


> +      * belongs to a single unpartitioned table, and then derive its 
> workers. So
> +      * it will be : logb(b^w1 + b^w2 + b^w3) where w1, w2.. are per-subplan
> +      * workers and b is some logarithmic base such as 2 or 3. It turns out 
> that
> +      * this evaluates to a value just a bit greater than max(w1,w2, w3). 
> So, we
> +      * just use the maximum of workers formula. But this formula gives too 
> few
> +      * workers when all paths have single worker (meaning they are 
> non-partial)
> +      * For e.g. with workers : (1, 1, 1, 1, 1, 1), it is better to allocate 
> 3
> +      * workers, whereas this method allocates only 1.
> +      * So we use whichever method that gives higher number of workers.
> +      */
> +
> +     /* Get log2(num_subpaths) */
> +     log2w = fls(list_length(partial_subpaths) +
> +                             list_length(nonpartial_subpaths));
> +
> +     /* Avoid further calculations if we already crossed max workers limit */
> +     if (max_parallel_workers_per_gather <= log2w + 1)
> +             return max_parallel_workers_per_gather;
> +
> +
> +     /*
> +      * Get the parallel_workers value of the partial subpath having the 
> highest
> +      * parallel_workers.
> +      */
> +     max_per_plan_workers = 1;
> +     foreach(lc, partial_subpaths)
> +     {
> +             Path       *subpath = lfirst(lc);
> +             max_per_plan_workers = Max(max_per_plan_workers,
> +                                                                
> subpath->parallel_workers);
> +     }
> +
> +     /* Choose the higher of the results of the two formulae */
> +     num_workers = rint(Max(log2w, max_per_plan_workers) + 1);
> +
> +     /* In no case use more than max_parallel_workers_per_gather workers. */
> +     num_workers = Min(num_workers, max_parallel_workers_per_gather);
> +
> +     return num_workers;
> +}

Hm.  I'm not really convinced by the logic here.  Wouldn't it be better
to try to compute the minimum total cost across all workers for
1..#max_workers for the plans in an iterative manner?  I.e. try to map
each of the subplans to 1 (if non-partial) or N workers (partial) using
some fitting algorith (e.g. always choosing the worker(s) that currently
have the least work assigned).  I think the current algorithm doesn't
lead to useful #workers for e.g. cases with a lot of non-partial,
high-startup plans - imo a quite reasonable scenario.


I'm afraid this is too late for v10 - do you agree?

- Andres


-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to