I think your "phase" concept matches my "stage". I'll use "phase" too.

Agree, we should not separate the sender and receiver of an Exchange
into separate RelNodes. I didn't mean to give that impression. Maybe I
should call it "isPhaseTransition" rather than "isStageLeaf".

Hive/Tez does not have a concept of threads (as distinct from
processes). But I think the "split" concept will serve both Hive and
Drill.

> So maybe the question is, do you delineate these as two different concepts or 
> combine them into one (memory usage stages and parallelization change phases 
> (e.g. exchanges)).

Really good question. However, I'm going to punt. I think there is
more complexity over the horizon when we start modeling blocking
operators and phased pipelines, where phase 1 starts releasing memory
as phase 2 starts allocating it.

"isStageLeaf" allows us to model a collection of consecutive operators
that function as a single unit for purposes of memory allocation, and
that's a good start. (If you want to detect a change in distribution,
look at the Distribution metadata.)
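
To make that concrete, here is a minimal sketch of how the stage-level
numbers could be derived from "isStageLeaf". It is only an illustration:
"Op" is a hypothetical stand-in for a RelNode, "splitCountToFit" and
"StageMemorySketch" are names I made up, and none of this is Calcite's
actual metadata API. The figures are the ones from earlier in the thread
(100M rows * 100 bytes/row, two hash tables, 2GB per process).

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch only; Op stands in for a RelNode and is not a Calcite class. */
public class StageMemorySketch {
  /** A physical operator: its own memory in bytes across all splits,
   * whether it is a stage leaf, and its inputs. */
  public static final class Op {
    final double memory;
    final boolean isStageLeaf;
    final List<Op> inputs;

    public Op(double memory, boolean isStageLeaf, List<Op> inputs) {
      this.memory = memory;
      this.isStageLeaf = isStageLeaf;
      this.inputs = inputs;
    }
  }

  /** Sums memory over this operator and every operator below it in the
   * same stage. The walk stops at a stage leaf, because the inputs of a
   * stage leaf belong to a different stage. */
  public static double cumulativeMemoryWithinStage(Op op) {
    double total = op.memory;
    if (!op.isStageLeaf) {
      for (Op input : op.inputs) {
        total += cumulativeMemoryWithinStage(input);
      }
    }
    return total;
  }

  /** Basic formula from the proposal: stage total divided by splitCount. */
  public static double cumulativeMemoryWithinStageSplit(Op op, int splitCount) {
    return cumulativeMemoryWithinStage(op) / splitCount;
  }

  /** Assumed helper: smallest split count at which one split of the stage
   * fits in the memory available to a single process. */
  public static int splitCountToFit(Op op, double memoryPerProcess) {
    return (int) Math.ceil(cumulativeMemoryWithinStage(op) / memoryPerProcess);
  }

  public static void main(String[] args) {
    double hashTable = 100e6 * 100; // 100M rows * 100 bytes/row
    Op scan = new Op(0, true, Collections.<Op>emptyList());
    Op exchange = new Op(0, true, Arrays.asList(scan)); // stage boundary
    Op agg = new Op(hashTable, false, Arrays.asList(exchange));
    Op join = new Op(hashTable, false, Arrays.asList(agg));

    System.out.println(cumulativeMemoryWithinStage(join)); // bytes, all splits
    System.out.println(splitCountToFit(join, 2e9));        // splits for 2GB/process
  }
}
```

The walk includes the stage leaf's own memory but does not descend into
its inputs, so the scan below the exchange is charged to a different
stage. Blocking operators that release memory part-way through would
need something richer than a plain sum.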

On Tue, Feb 24, 2015 at 10:59 AM, Jacques Nadeau <[email protected]> wrote:
> some thoughts
>
> - We have generic (specific) terms we use to explain these concepts: phase
> (major fragment) & slice (minor fragment or thread).
> - It isn't clear to me why Parallelism needs to expose stageLeaf.  We are
> obviously aware of this fact but I'm not sure why it should be in the
> framework as a specialized concept.
>
> Note that for planning we also don't separate out the sending and receiving
> side of an exchange because it is often useful to reason about both
> concepts at the same time.  For example affinity mapping.
>
> To be clear, we mean phase (major fragment) as a unit between two exchanges
> (or leaf fragment and root fragments which are delineated by an exchange).
> Note that this is different from what we mean by stages which is a separate
> concept that describes memory transition states.  For example, you might
> have hash join.  The join will separate the build side versus the probe
> side as two separate stages.  Other blocking or partially blocking
> operators may also separate stages and memory accounting needs to
> understand both stages and phases.
>
> So maybe the question is, do you delineate these as two different concepts
> or combine them into one (memory usage stages and parallelization change
> phases (e.g. exchanges)).
>
>
> On Tue, Feb 24, 2015 at 10:45 AM, Julian Hyde <[email protected]> wrote:
>
>> Jesus,
>>
>> That makes sense. We basically need two carts: one in front of the
>> horse (before we've determined parallelism), and one behind (after we
>> have determined parallelism).
>>
>> As I said to Jacques, you could also use the "behind" cart with a
>> place-holder value of parallelism. But you have to be careful that you
>> don't use this calculation to determine parallelism.
>>
>> I have just checked into
>> https://github.com/julianhyde/incubator-calcite/tree/calcite-603 a new
>> metadata provider:
>>
>> interface Size {
>>   Double averageRowSize();
>>   List<Double> averageColumnSizes();
>> }
>>
>> Then I propose to add the following providers. (Recall that a metadata
>> provider is a mix-in interface to RelNode; each method is evaluated
>> for a particular RelNode.)
>>
>> interface Parallelism {
>>   /** Returns true if each physical operator implementing this
>>    * relational expression belongs to a different process than its
>>    * inputs. */
>>   boolean isStageLeaf();
>>
>>   /** Returns the number of distinct splits of the data.
>>    *
>>    * <p>For broadcast, where each copy is the same, returns 1. */
>>   int splitCount();
>> }
>>
>> interface Memory {
>>   /** Returns the expected amount of memory, in bytes, required by a
>>    * physical operator implementing this relational expression, across
>>    * all splits.
>>    *
>>    * <p>How much memory is used depends on the algorithm; for example,
>>    * an implementation of Aggregate that builds a hash table requires
>>    * approximately rowCount * averageRowSize bytes, whereas an
>>    * implementation that assumes that the input is sorted uses only
>>    * averageRowSize. */
>>   Double memory();
>>
>>   /** Returns the cumulative amount of memory, in bytes, required by
>>    * the physical operator implementing this relational expression, and
>>    * all operators within the same stage, across all splits. */
>>   Double cumulativeMemoryWithinStage();
>>
>>   /** Returns the expected cumulative amount of memory, in bytes,
>>    * required by the physical operator implementing this relational
>>    * expression, and all operators within the same stage, within each
>>    * split.
>>    *
>>    * <p>Basic formula:
>>    *   cumulativeMemoryWithinStageSplit
>>    *     = cumulativeMemoryWithinStage / Parallelism.splitCount */
>>   Double cumulativeMemoryWithinStageSplit();
>> }
>>
>> If you have not yet determined the parallelism, use
>> cumulativeMemoryWithinStage; if you have determined parallelism, use
>> cumulativeMemoryWithinStageSplit.
>>
>> What do y'all think of my terminology: split, stage, stage-leaf,
>> process, memory? (It's not going to be the same as every engine's, but
>> I hope it's at least clear.)
>>
>> Julian
>>
>> On Tue, Feb 24, 2015 at 10:34 AM, Julian Hyde <[email protected]>
>> wrote:
>> > Yes, absolutely, that's the only sane way to approach this.
>> >
>> > Calcite's metadata provider model does make it possible to use an
>> > estimate of parallelism at one stage of planning, then use the real
>> > number at a later stage.
>> >
>> > In my next email I propose some new metadata providers. Hopefully they
>> > could be fitted into Drill's planning process. Also advise whether the
>> > terminology (processes, splits, stages, memory, size) seems intuitive
>> > even though not tied to a particular execution engine.
>> >
>> > Julian
>> >
>> >
>> >> On Feb 24, 2015, at 10:23 AM, Jacques Nadeau <[email protected]> wrote:
>> >>
>> >> A little food for thought given our work on Drill...
>> >>
>> >> We punted on trying to optimize all of this at once.  Very little of the
>> >> core plan exploration process is influenced directly by degree of
>> >> parallelism.  We do manage between parallelized and not and use a few
>> >> "hints" during planning to make some decisions where we need to.
>> >> Afterwards, we do a secondary pass where we determine parallelism and
>> >> memory consumption.  Afterwards, we will do a replan with more
>> >> conservative memory settings if our first plan turns out not to fit into
>> >> available cluster memory.  While we may lose some plans that are ideal,
>> >> it makes the whole process substantially easier to reason about.
>> >>
>> >> We also did this because some of our initial experimentation where we
>> >> included a number of these things as part of the planning process caused
>> >> the planning time to get out of hand.
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> On Mon, Feb 23, 2015 at 1:26 PM, Julian Hyde <[email protected]> wrote:
>> >>
>> >>> I am currently helping the Hive team compute the memory usage of
>> >>> operators (more precisely, a set of operators that live in the same
>> >>> process) using a Calcite metadata provider.
>> >>>
>> >>> Part of this task is to create an “Average tuple size” metadata
>> >>> provider based on an “Average column size” metadata provider. This is
>> >>> fairly uncontroversial. (I’ll create a jira case when
>> >>> https://issues.apache.org/ is back up, and we can discuss details such
>> >>> as how to compute average size of columns computed using built-in
>> >>> functions, user-defined functions, or aggregate functions.)
>> >>>
>> >>> Also we want to create a “CumulativeMemoryUseWithinProcess” metadata
>> >>> provider. That would start at 0 for a table scan or exchange consumer,
>> >>> but increase for each operator building a hash-table or using sort
>> >>> buffers until we reach the edge of the process.
>> >>>
>> >>> But we realized that we also need to determine the degree of
>> >>> parallelism, because we need to multiply AverageTupleSize not by
>> >>> RowCount but by RowCountWithinPartition. (If the data set has 100M
>> >>> rows, each 100 bytes, and is bucketed 5 ways, then each process will
>> >>> need memory for 20M rows, i.e. 20M rows * 100 bytes/row = 2GB.)
>> >>>
>> >>> Now, if we are already at the stage of planning where we have already
>> >>> determined the degree of parallelism, then we could expose this as a
>> >>> ParallelismDegree metadata provider.
>> >>>
>> >>> But maybe we’re getting the cart before the horse? Maybe we should
>> >>> compute the degree of parallelism AFTER we have assigned operators to
>> >>> processes. We actually want to choose the parallelism degree such that
>> >>> we can fit all necessary data in memory. There might be several
>> >>> operators, all building hash-tables or using sort buffers. The natural
>> >>> break point (at least in Tez) is where the data needs to be
>> >>> re-partitioned, i.e. an Exchange operator.
>> >>>
>> >>> So maybe we should instead compute “CumulativeMemoryUseWithinStage”.
>> >>> (I'm coining the term “stage” to mean all operators within like
>> >>> processes, summing over all buckets.) Let’s suppose that, in the above
>> >>> data set, we have two operators, each of which needs to build a hash
>> >>> table, and we have 2GB memory available to each process.
>> >>> CumulativeMemoryUseWithinStage is 100M rows * 100 bytes/row * 2
>> >>> hash-tables = 20GB. So, the parallelism degree should be 20GB / 2GB =
>> >>> 10.
>> >>>
>> >>> 10 is a better choice of parallelism degree than 5. We lose a little
>> >>> (more nodes used, and more overhead combining the 10 partitions into
>> >>> 1) but gain a lot (we save the cost of sending the data over the
>> >>> network).
>> >>>
>> >>> Thoughts on this? How do other projects determine the degree of
>> >>> parallelism?
>> >>>
>> >>> Julian
>> >>>
>> >
>>
