Hi, Vladimir. Here is a picture of how Calcite can optimize the two-aggregates
problem:

step 1:
Without any hints for pruning, BOTH implementations of each aggregation are
built and held in the memo.
For the top aggregation, the one-pass implementation requests a
HASH_DISTRIBUTED[a] distribution from its input, while the two-pass
implementation requests an "ANY" distribution from its input.
When the bottom aggregation gets built, it also builds two implementations. So
we get 4 valid candidates:

// Candidate 1. top agg is one-pass and bottom agg is one-pass
PhysicalAggregate[group=[a], F2_phase2(c)]
  Exchange[dist[a]]
    PhysicalAggregate[group=[a,b], F1_phase2(c)]
      Exchange[dist[a,b]]
        ...

// Candidate 2. top agg is one-pass and bottom agg is two-pass
PhysicalAggregate[group=[a], F2_phase2(c)]
  Exchange[dist[a]]
    PhysicalAggregate[group=[a,b], F1_phase2(c)]
      Exchange[dist[a,b]]
        PhysicalAggregate[group=[a,b], F1_phase1(c)]
          ...

// Candidate 3. top agg is two-pass, bottom agg is one-pass
PhysicalAggregate[group=[a], F2_phase2(c)]
  Exchange[dist[a]]
    PhysicalAggregate[group=[a], F2_phase1(c)]
      PhysicalAggregate[group=[a,b], F1_phase2(c)]
        Exchange[dist[a,b]]
          ...

// Candidate 4. top agg is two-pass, bottom agg is two-pass
PhysicalAggregate[group=[a], F2_phase2(c)]
  Exchange[dist[a]]
    PhysicalAggregate[group=[a], F2_phase1(c)]
      PhysicalAggregate[group=[a,b], F1_phase2(c)]
        Exchange[dist[a,b]]
          PhysicalAggregate[group=[a,b], F1_phase1(c)]
            ...
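
The F1_phase1/F1_phase2 split above relies on the aggregate function being
decomposable into a local partial step and a global merge step. A minimal
stand-alone sketch (toy code, not the Calcite API) using MIN as the function:

```java
import java.util.*;

public class TwoPhaseMin {
    // Phase 1: each node pre-aggregates its local rows (partial MIN per group).
    static Map<String, Integer> phase1(List<Map.Entry<String, Integer>> rows) {
        Map<String, Integer> partial = new HashMap<>();
        for (Map.Entry<String, Integer> row : rows) {
            partial.merge(row.getKey(), row.getValue(), Math::min);
        }
        return partial;
    }

    // Phase 2: after the exchange, merge the partial results per group.
    static Map<String, Integer> phase2(List<Map<String, Integer>> partials) {
        Map<String, Integer> result = new HashMap<>();
        for (Map<String, Integer> partial : partials) {
            partial.forEach((k, v) -> result.merge(k, v, Math::min));
        }
        return result;
    }
}
```

The two-pass plan is profitable exactly when phase 1 shrinks the data that has
to cross the exchange; that trade-off is what step 3 below leaves to the cost
model.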

step 2:
No matter which aggregation is built first, the Calcite framework passes the
HASH_DISTRIBUTED[a] trait requirement through the bottom aggregation, for both
of its implementations. Note that a concrete physical node only needs to
consider its own implementation. And we get two more valid candidates:

// Candidate 5. passThrough called on candidate1
PhysicalAggregate[group=[a], F2_phase2(c)]
  PhysicalAggregate[group=[a,b], F1_phase2(c)]  // deliver dist[a]
    Exchange[dist[a]]
      ...

// Candidate 6. passThrough called on candidate2
PhysicalAggregate[group=[a], F2_phase2(c)]
  PhysicalAggregate[group=[a,b], F1_phase2(c)]  // deliver dist[a]
    Exchange[dist[a]]
      PhysicalAggregate[group=[a,b], F1_phase1(c)]
        ...
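
Candidates 5 and 6 are valid because hashing on [a] alone already collocates
every [a,b] group: rows that agree on (a, b) necessarily agree on a. A tiny
stand-alone check of that property (toy code, not the Calcite API):

```java
import java.util.*;

public class Collocation {
    // Assign a row to a partition by hashing only key [a].
    static int partition(int a, int numPartitions) {
        return Math.floorMod(Integer.hashCode(a), numPartitions);
    }

    // A group on [a, b] is collocated iff all its rows land in one partition.
    // Since the partition depends only on a, this always holds under dist[a].
    static boolean groupsCollocated(List<int[]> rows, int numPartitions) {
        Map<List<Integer>, Integer> groupToPartition = new HashMap<>();
        for (int[] row : rows) {
            int p = partition(row[0], numPartitions);
            Integer prev = groupToPartition.putIfAbsent(List.of(row[0], row[1]), p);
            if (prev != null && prev != p) {
                return false;
            }
        }
        return true;
    }
}
```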

step 3:
The cost model chooses the best candidate.
Note that Candidate 5 is not always the best. For example, when it is
detected, from stats or other sources, that data is skewed on key [a],
Candidate 2 may be better. Conversely, when it is detected that
NDV(a, b) = 0.99 * ROWCOUNT(), Candidate 5 is preferred over Candidate 6, as
the partial aggregate can reduce little data. So it is not wasteful to build
all those candidates.

Most of the above work is done by the Calcite framework. Users only need to:
1. Fire both implementations when the aggregations are built.
2. Override the passThroughTraits method.
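
For step 2, the passThroughTraits override essentially encodes one decision:
a hash-distribution requirement can be pushed below the aggregate iff its keys
are a subset of the group keys. A stand-alone toy sketch of that decision (the
names are mine; the real Calcite method works on RelTraitSet objects):

```java
import java.util.*;

public class PassThroughSketch {
    // Toy stand-in for the passThroughTraits logic: an aggregate grouping on
    // groupKeys can pass a hash-distribution requirement down to its input
    // iff the required keys are a subset of its group keys (its output
    // columns).  Returns the distribution to request from the input, or null
    // if the requirement cannot be passed through.
    static Set<String> passThroughDistribution(Set<String> required, Set<String> groupKeys) {
        if (!required.isEmpty() && groupKeys.containsAll(required)) {
            return required; // e.g. required [a] passes through group [a, b]
        }
        return null; // cannot satisfy; the framework will add an Exchange
    }
}
```

Under this rule, the required dist[a] passes through the bottom aggregate on
group [a, b], which is exactly how Candidates 5 and 6 arise.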

Thanks,
Jinpeng Wu


On Thu, May 27, 2021 at 8:19 AM Haisheng Yuan <hy...@apache.org> wrote:

> Another point I would like to mention is that it is not recommended to
> override the methods "passThrough" and "derive" directly; override
> "passThroughTraits" and "deriveTraits" instead, so that we can make sure
> only the same type of physical node is created and no nested relnodes or
> additional RelSets are created, unless you know you have to create a
> different type of node. For example, if the table foo has a btree index
> on column a, and the parent relnode is requesting ordering on column a,
> then we may consider overriding "passThrough" of TableScan to return an
> IndexScan instead of a TableScan.
>
> Regards,
> Haisheng Yuan
> On 2021/05/26 22:45:20, Haisheng Yuan <hy...@apache.org> wrote:
> > Hi Vladimir,
> >
> > 1. You need a logical rule to split the aggregate into a local aggregate
> and global aggregate, for example:
> >
> https://github.com/greenplum-db/gporca/blob/master/libgpopt/src/xforms/CXformSplitGbAgg.cpp
> > Only implementation rules can convert a logical node to a physical node
> or multiple physical nodes.
> > After physical implementation, you have 2 physical alternatives:
> > 1) single phase global physical aggregate,
> > 2) 2 phase physical aggregate with local and global aggregate.
> > It should be up to the cost model to decide which one to choose.
> >
> > 2. Given a desired traitset from the parent node, the current relnode only
> > needs to generate a single relnode after passing down the traitset. Given a
> > traitset delivered by a child node, the current relnode only derives a single
> > relnode. Quite unlike other optimizers, in Calcite's top-down optimizer, you
> > don't need to worry about issuing multiple optimization requests to inputs;
> > that is handled by the Calcite framework behind the scenes. i.e.
> > SELECT a, b, min(c) from foo group by a, b;
> > In many other optimizers, we probably need to ask the aggregate to issue 3
> > distribution requests for the tablescan on foo, which are
> > 1) hash distributed by a,
> > 2) hash distributed by b,
> > 3) hash distributed by a, b
> > However, in Calcite's top-down optimizer, your physical implementation rule
> > for the global aggregate only needs to generate a single physical node with
> > hash distribution by a, b. In case the table foo happens to be distributed
> > by a, or b, the derive() method will tell you there is an opportunity. This
> > is a feature where Calcite's top-down optimizer excels over other
> > optimizers, because it can dramatically reduce the search space while
> > keeping the optimal optimization opportunity.
> >
> > 3. This is by design. Nodes produced from "passThrough" and "derive" are
> > just sibling physical nodes with different traitsets; we only need the
> > initial physical nodes after implementation to avoid unnecessary
> > operations. The fundamental reason is that, unlike the Orca optimizer,
> > where physical nodes and physical properties are separate things, Calcite's
> > logical/physical nodes contain the traitset. With regard to the latter
> > question, can you give an example?
> >
> > Regards,
> > Haisheng Yuan
> >
> >
> > On 2021/05/26 20:11:57, Vladimir Ozerov <ppoze...@gmail.com> wrote:
> > > Hi,
> > >
> > > I tried to optimize a certain combination of operators for the
> distributed
> > > engine and got stuck with the trait propagation in the top-down
> engine. I
> > > want to ask the community for advice on whether the problem is solvable
> > > with the current Apache Calcite implementation or not.
> > >
> > > Consider the following logical tree:
> > > 3: LogicalAggregate[group=[a], F2(c)]
> > > 2:  LogicalAggregate[group=[a,b], F1(c)]
> > > 1:    LogicalScan[t]
> > >
> > > Consider that these two aggregates cannot be merged or simplified for
> > > whatever reason. We have only a set of physical rules to translate this
> > > logical tree to a physical tree. Also, there could be any number of
> > > other operators between these two aggregates. We omit them for clarity,
> > > assuming that the distribution is not destroyed.
> > >
> > > In the distributed environment, non-collocated aggregates are often
> > > implemented in two phases: local pre-aggregation and final aggregation,
> > > with an exchange in between. Consider that the Scan operator is hash
> > > distributed by some key other than [a] or [b]. If we optimize operators
> > > without considering the whole plan, we may optimize each operator
> > > independently, which would give us the following plan:
> > > 3: PhysicalAggregate[group=[a], F2_phase2(c)]             // HASH_DISTRIBUTED [a]
> > > 3:   Exchange[a]                                          // HASH_DISTRIBUTED [a]
> > > 3:     PhysicalAggregate[group=[a], F2_phase1(c)]         // HASH_DISTRIBUTED [a,b]
> > > 2:       PhysicalAggregate[group=[a,b], F1_phase2(c)]     // HASH_DISTRIBUTED [a,b]
> > > 2:         Exchange[a, b]                                 // HASH_DISTRIBUTED [a,b]
> > > 2:           PhysicalAggregate[group=[a,b], F1_phase1(c)] // HASH_DISTRIBUTED [d]
> > > 1:             PhysicalScan[t]                            // HASH_DISTRIBUTED [d]
> > >
> > > This plan is not optimal, because we re-hash inputs twice. A better
> plan
> > > that we want to get:
> > > 3: PhysicalAggregate[group=[a], F2(c)]                // HASH_DISTRIBUTED [a]
> > > 2:   PhysicalAggregate[group=[a,b], F1_phase2(c)]     // HASH_DISTRIBUTED [a]
> > > 2:     Exchange[a]                                    // HASH_DISTRIBUTED [a]
> > > 2:       PhysicalAggregate[group=[a,b], F1_phase1(c)] // HASH_DISTRIBUTED [d]
> > > 1:         PhysicalScan[t]                            // HASH_DISTRIBUTED [d]
> > >
> > > In this case, we take advantage of the fact that the distribution [a]
> is
> > > compatible with [a,b]. Therefore we may enforce only [a], instead of
> doing
> > > [a,b] and then [a]. Since exchange operators are very expensive, this
> > > optimization may bring a significant boost to the query engine. Now the
> > > question - how do we reach that state? Intuitively, a pass-through is
> > > exactly what we need. We may pass the optimization request from top
> > > aggregate to bottom aggregate to find physical implementations shared
> by
> > > [a]. But the devil is in the details - when and how exactly to pass
> this
> > > request?
> > >
> > > Typically, we have a conversion rule that converts a logical aggregate
> to a
> > > physical aggregate. We may invoke "convert" on the input to initiate
> the
> > > pass-through:
> > >
> > > RelNode convert(...) {
> > >     return new PhysicalAggregate(
> > >         convert(input, HASH_DISTRIBUTED[a])
> > >     )
> > > }
> > >
> > > The first problem - we cannot create the normal physical aggregate here
> > > because we do not know input traits yet. The final decision whether to
> do a
> > > one-phase or two-phase aggregate can be made only in the
> > > "PhysicalNode.derive" method when concrete input traits are resolved.
> > > Therefore the converter rule should create a kind of "template"
> physical
> > > operator, which would be used to construct the final operator(s) when
> input
> > > traits are resolved. AFAIU Enumerable works similarly: we create operators
> > > with virtually arbitrary traits taken from logical nodes in the conversion
> > > rules. Only later do we create the normal nodes in the derive() methods.
> > >
> > > The second problem - our top aggregate doesn't actually need the
> > > HASH_DISTRIBUTED[a] input. Instead, it may accept inputs with any
> > > distribution. What we really need is to inform the input (bottom
> aggregate)
> > > that it should look for additional implementations that satisfy
> > > HASH_DISTRIBUTED[a]. Therefore, enforcing a specific distribution on
> the
> > > input using the "convert" method is not what we need because this
> > > conversion might enforce unnecessary exchanges.
> > >
> > > The third problem - derivation. Consider that we delivered the
> optimization
> > > request to the bottom aggregate. As an implementor, what am I supposed
> to
> > > do in this method? I cannot return the final aggregate from here
> because
> > > the real input traits are not derived yet. Therefore, I can only return
> > > another template, hoping that the "derive" method will be called on it.
> > > However, this will not happen because trait derivation is skipped on
> the
> > > nodes emitted from pass-through. See "DeriveTrait.perform" [1].
> > >
> > > BottomAggregate {
> > >     RelNode passThrough(distribution=HASH_DISTRIBUTED[a]) {
> > >         // ???
> > >     }
> > > }
> > >
> > > I feel that I am either going in the wrong direction, or there are gaps
> > > in the product that disallow such an optimization. So I would like to ask
> > > the community to assist with the following questions:
> > > 1. In the top-down optimizer, how should we convert a logical node to a
> > > physical node, provided that "derive" is not called yet? I have a gut
> > > feeling that the trait propagation is currently not implemented to the
> full
> > > extent because, based on the Cascades paper, I would expect that parent
> > > physical
> > > nodes are produced after the child physical nodes. But in our rules,
> this
> > > is not the case - some physical nodes are produced before the trait
> > > derivation.
> > > 2. How to propagate several optimization requests to inputs? We need
> either
> > > inputs with a specific distribution or inputs with an arbitrary
> > > distribution in the example above. It seems that to achieve that, I
> need to
> > > emit several alternative nodes with different requirements to inputs.
> Does
> > > it make sense?
> > > 3. Why are nodes produced from the "passThrough" method excluded from
> trait
> > > derivation? If this is by design, how can I preserve the optimization
> > > request to satisfy it on the derivation stage when input traits are
> > > available?
> > >
> > > Regards,
> > > Vladimir.
> > >
> > > [1]
> > >
> https://github.com/apache/calcite/blob/calcite-1.26.0/core/src/main/java/org/apache/calcite/plan/volcano/TopDownRuleDriver.java#L828
> > >
> >
>