Hi, Vladimir.

Firstly, let me explain how the current solution handles your problems.

It is true that the current solution is not perfect, but it does solve most
problems. One thing to clarify: build rules do not need to build all
possible candidates. The key point is that a parent should raise as many
requirements on its inputs as possible.

Your first problem is "we do not know what the parent needs". Actually, we
do not need to know. Given that the parent has raised all of its possible
requirements, we only need to provide one initial implementation. If it is
not the one the parent wants, sooner or later the passThrough method gets
called and the desired implementation gets built. The initial
implementation may be wasteful, but it is essential in the current solution.
As you have discovered, all passThrough and derive calls are applied to it,
and only to it, which helps us avoid duplicate passThrough requests.

Your second problem, and the key one, is "we do not know which physical
children are available". If a parent really fires all possible
requirements, it could be very wasteful. As a workaround, we introduced a
metadata called PulledUpTraits and rely on the RelMetadataQuery to
collect all traits that descendants could deliver. However, this does
not solve every problem, as it depends heavily on how the descendants have
been built and which traits have been derived at the moment RelMetadataQuery
is called, so some opportunities could be missed. We therefore tend to fire
all candidates that have different input requirements when the number of
candidates is acceptable, as it is for aggregations.

I also looked at your alternative solution. It is really impressive. In
summary, I see two key differences:
1. Traits "derive" and "passThrough" are applied to a logical node rather
than a physical node (the initial implementation I mentioned above).
2. The "passThrough" and "derive" procedures are integrated into
RelOptRule.onMatch.

But I still don't see how this solution handles the waste problem.
Taking the aggregations from the previous discussion as an example, what
should calculateInputTraitSet return in PhysicalAggregateRule? Will it return
ANY or dist[a]? If it returns dist[a], the two-pass implementation would
never be fired, as the rule has already asked the inputs to satisfy the
dist[a] requirement. That is, calculateInputTraitSet decides the
implementation before the children are optimized. This problem is simple to
solve: make calculateInputTraitSet return multiple requiredInputTraits
instead of a single one. calculateInputTraitSet then needs to collect all
possible trait requirements, and in the later part those requirements
are passed to the children's optimize procedure one by one.
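
To make this concrete, here is a minimal sketch in plain Java with no Calcite
dependency. The class name, the string encoding of distributions and the
method signature are hypothetical; only the name calculateInputTraitSet comes
from your snippet. It shows the rule returning one input requirement per
candidate implementation rather than a single requirement.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model: a requirement is the string "ANY" or "dist[keys]". */
public class InputRequirements {
    /**
     * Hypothetical multi-valued variant of calculateInputTraitSet.
     * Returns one input requirement per aggregate implementation.
     */
    public static List<String> calculateInputTraitSets(List<String> groupKeys) {
        List<String> required = new ArrayList<>();
        // One-pass aggregate: input must be hashed on the group keys.
        required.add("dist" + groupKeys);
        // Two-pass aggregate: any input distribution is acceptable.
        required.add("ANY");
        return required;
    }
}
```

For the top aggregation with group=[a], this sketch yields dist[a] and ANY, so
neither implementation is ruled out before the children are optimized.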

Now we come to the bottom aggregation. It already knows that a parent
requires dist[a]. Can calculateInputTraitSet omit some
requiredInputTraits now? I am afraid not, because at this moment
calculateInputTraitSet has no idea whether dist[a,b] could be a better
solution, for example when the input table is a clustered table whose
data is already clustered by [a, b]. Such opportunities can be hidden
deep in the descendants. So calculateInputTraitSet still needs to collect all
possible trait requirements for inputs, plus the requirements from parents.
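
As a rough illustration of "own requirements plus the parent's", the sketch
below uses a hypothetical class and method (not part of Calcite) and the same
toy string encoding of distributions. The rule that a parent requirement can
be forwarded only when its keys are a subset of the group keys is my
assumption for the sketch.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class BottomAggRequirements {
    /** Collects the node's own input requirements plus the parent's. */
    public static Set<String> collect(List<String> groupKeys,
                                      List<String> parentKeys) {
        Set<String> required = new LinkedHashSet<>();
        // Own candidates: one-pass on the group keys, two-pass on anything.
        required.add("dist" + groupKeys);
        required.add("ANY");
        // Assumption: the parent's hash keys can be passed through only
        // if they are all group keys of this aggregate.
        if (!parentKeys.isEmpty() && groupKeys.containsAll(parentKeys)) {
            required.add("dist" + parentKeys);
        }
        return required;
    }
}
```

With group keys [a, b] and a parent asking for [a], the set still contains the
requirement on [a, b]; the parent's request adds a candidate but removes none.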

Another interesting point: can a child reject its parent's requirement?
Note that children can always satisfy a parent's requirements; the worst
case is injecting an extra exchange node. By default, a child should never
reject a parent's requirement, as it does not know whether this requirement
would result in the best candidate (or the only candidate). Deciding
whether an input can reject a requirement is complicated. One systematic
solution is the group pruning mechanism. Either way, it is not practical to
decide this in an onMatch method call.
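
The "worst case is an extra exchange" point can be sketched as below. The
class and its methods are hypothetical and plans are plain strings. Following
your observation that distribution [a] is compatible with [a,b], I assume that
hash keys which are a subset of the required keys satisfy the requirement. An
empty required list stands for ANY.

```java
import java.util.List;

public class Enforcer {
    /** Assumption: hash keys that are a subset of the required keys satisfy it. */
    public static boolean satisfies(List<String> delivered,
                                    List<String> required) {
        return !delivered.isEmpty() && required.containsAll(delivered);
    }

    /** Returns the child plan, wrapped in an exchange only when needed. */
    public static String enforce(String childPlan,
                                 List<String> delivered,
                                 List<String> required) {
        if (required.isEmpty() || satisfies(delivered, required)) {
            return childPlan;  // requirement already met, nothing to add
        }
        // Worst case: inject an exchange that delivers the required keys.
        return "Exchange[dist" + required + "](" + childPlan + ")";
    }
}
```

Because enforce always returns some plan, the child never has to reject a
request; whether the resulting plan is worth keeping is left to costing and
pruning.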

Finally, if all required traits need to be passed to the children and the
children always satisfy those requirements, all possible candidates that
have different input requirements will be created during the
createPhysicalAggregates step. This is actually similar to the current
solution.

Please correct me if I have misunderstood anything.

Thanks,
Jinpeng Wu

On Thu, May 27, 2021 at 6:31 PM Vladimir Ozerov <ppoze...@gmail.com> wrote:

> Hi Jinpeng,
>
> Thank you, I would try this approach with aggregates. But as I mentioned in
> the previous email, it is not ideal, because we may generate wasteful
> alternatives. This is so because we generate the physical nodes outside of
> the optimization context: (1) we do not know what the parent needs, and (2)
> we do not know which physical children are available. For example, if there
> is only a single Aggregate on top of the Scan, I can generate only a single
> good alternative after the input is optimized instead of pessimistically
> emitting both 1-phase and 2-phase aggregates.
>
> Speaking of imaginary alternative implementation, consider that we have a
> slightly different API. First, we pass the optimization request to the
> "RelOptRule.onMatch". Second, the "RelOptRule.convert" triggers the
> synchronous optimization of the child group wrt the adjusted optimization
> request, and returns the List<RelTraitSet> for produced physical child
> nodes. That is, we basically converge "passThrough" and "derive" to a
> single point - "RelOptRule.onMatch". If this would be the case, there is a
> chance that we would not need to generate some physical nodes if there is
> either no demand from the parent, or there are no matching inputs.
>
> class PhysicalAggregateRule extends PhysicalRule {
>   void onMatch(RelOptRuleCall call, *RelTraitSet requiredTraits*) {
>     Aggregate logicalAgg = call.get(0);
>     RelNode input = logicalAgg.getInput();
>
>     // 1. Pass-through: calculate required input properties
>     //    based on the current optimization request. There
>     //    could be several such properties in the general
>     //    case.
>     RelTraitSet requiredInputTraits =
>       calculateInputTraitSet(logicalAgg, requiredTraits);
>
>     // 2. Derive: *optimize input* wrt the required properties.
>     //    Return collection of available physical input traits.
>     List<RelTraitSet> derivedInputTraits =
>       optimize(input, requiredInputTraits);
>
>     // 3. Create physical implementations based on (a) parent request,
>     //    (b) available inputs.
>     List<PhysicalAggregate> physicalAggs =
>       createPhysicalAggregates(requiredTraits, derivedInputTraits, ...);
>   }
> }
>
> I wonder if this idea might be viable from a theoretical standpoint. AFAIU
> the original Columbia paper doesn't address this: their APPLY_RULE task
> receives the optimization context, but it is not propagated to the rule,
> and the equivalent nodes are generated from a rule before O_INPUTS. In the
> code snippet above, the optimization context is passed to the rule, and the
> O_INPUTS is invoked from the rule before the equivalent node is emitted.
>
> Regards,
> Vladimir.
>
> On Thu, May 27, 2021 at 11:20, Jinpeng Wu <wjpabc...@gmail.com> wrote:
>
> > Hi, Vladimir. This could be a picture of how Calcite optimizes the two
> > aggregates problem:
> >
> > step 1:
> > Without any hints for pruning, BOTH implementations of each aggregation
> > should be built and held in the memo.
> > For the top aggregation, the one-pass implementation requests a
> > HASH_DISTRIBUTED [a] distribution from its input and the two-pass
> > implementation requests an "ANY" distribution from its input.
> > When the bottom aggregation gets built, it also builds two
> > implementations. So we get 4 valid candidates:
> >
> > // Candidate 1. top agg is one-pass and bottom agg is one-pass
> > PhysicalAggregate[group=[a], F2_phase2(c)]
> >   Exchange[dist[a]]
> >     PhysicalAggregate[group=[a,b], F1_phase2(c)]
> >       Exchange[dist[a,b]]
> >         ...
> >
> > // Candidate 2. top agg is one-pass and bottom agg is two-pass
> > PhysicalAggregate[group=[a], F2_phase2(c)]
> >   Exchange[dist[a]]
> >     PhysicalAggregate[group=[a,b], F1_phase2(c)]
> >       Exchange[dist[a,b]]
> >         PhysicalAggregate[group=[a,b], F1_phase1(c)]
> >           ...
> >
> > // Candidate 3. top agg is two-pass, bottom agg is one-pass
> > PhysicalAggregate[group=[a], F2_phase2(c)]
> >   Exchange[dist[a]]
> >     PhysicalAggregate[group=[a], F2_phase1(c)]
> >       PhysicalAggregate[group=[a,b], F1_phase2(c)]
> >         Exchange[dist[a,b]]
> >           ...
> >
> > // Candidate 4. top agg is two-pass, bottom agg is two-pass
> > PhysicalAggregate[group=[a], F2_phase2(c)]
> >   Exchange[dist[a]]
> >     PhysicalAggregate[group=[a], F2_phase1(c)]
> >       PhysicalAggregate[group=[a,b], F1_phase2(c)]
> >         Exchange[dist[a,b]]
> >           PhysicalAggregate[group=[a,b], F1_phase1(c)]
> >             ...
> >
> > step 2:
> > No matter which aggregation is built first, the Calcite framework passes
> > the HASH_DISTRIBUTED[a] trait requirement through both implementations
> > of the bottom aggregation. Note that a concrete physical node only needs
> > to consider its own implementation. And we get two more valid candidates:
> >
> > // Candidate 5. passThrough called on candidate1
> > PhysicalAggregate[group=[a], F2_phase2(c)]
> >   PhysicalAggregate[group=[a,b], F1_phase2(c)]  // deliver dist[a]
> >     Exchange[dist[a]]
> >       ...
> >
> > // Candidate 6. passThrough called on candidate2
> > PhysicalAggregate[group=[a], F2_phase2(c)]
> >   PhysicalAggregate[group=[a,b], F1_phase2(c)]  // deliver dist[a]
> >     Exchange[dist[a]]
> >       PhysicalAggregate[group=[a,b], F1_phase1(c)]
> >         ...
> >
> > step 3:
> > The cost model chooses the best candidate.
> > Note that Candidate 5 is not always the best. For example, when it is
> > detected, from stats or otherwise, that data is skewed on key [a],
> > Candidate 2 may be better. When it is detected that
> > NDV(a, b) = 0.99 * ROWCOUNT(), Candidate 6 is preferred, as partial
> > aggregate can reduce little data. So it is not wasteful to build all
> > those candidates.
> >
> > Most of the above work is done by the Calcite framework. Users only need to:
> > 1. Fire both implementations during aggregation builds.
> > 2. Override the passThroughTraits method.
> >
> > Thanks,
> > Jinpeng Wu
> >
> >
> > On Thu, May 27, 2021 at 8:19 AM Haisheng Yuan <hy...@apache.org> wrote:
> >
> > > Another point I would like to mention is that it is not recommended to
> > > override the methods "passThrough" and "derive" directly; override
> > > "passThroughTraits" and "deriveTraits" instead, so that we can make sure
> > > only the same type of physical node is created and no nested relnodes or
> > > additional RelSets are created, unless you know you have to create a
> > > different type of node. For example, if the table foo has a btree index
> > > on column a, and the parent relnode is requesting ordering on column a,
> > > then we may consider overriding "passThrough" of TableScan to return an
> > > IndexScan instead of a TableScan.
> > >
> > > Regards,
> > > Haisheng Yuan
> > > On 2021/05/26 22:45:20, Haisheng Yuan <hy...@apache.org> wrote:
> > > > Hi Vladimir,
> > > >
> > > > 1. You need a logical rule to split the aggregate into a local aggregate
> > > > and a global aggregate, for example:
> > > > https://github.com/greenplum-db/gporca/blob/master/libgpopt/src/xforms/CXformSplitGbAgg.cpp
> > > > Only implementation rules can convert a logical node to a physical node
> > > > or multiple physical nodes.
> > > > After physical implementation, you have 2 physical alternatives:
> > > > 1) single phase global physical aggregate,
> > > > 2) 2 phase physical aggregate with local and global aggregate.
> > > > It should be up to the cost to decide which one to choose.
> > > >
> > > > 2. Given a desired traitset from the parent node, the current relnode
> > > > only needs to generate a single relnode after passing down the traitset.
> > > > Given a traitset delivered by a child node, the current relnode only
> > > > derives a single relnode. Quite unlike other optimizers, in Calcite's
> > > > top-down optimizer you don't need to worry about issuing multiple
> > > > optimization requests to inputs, which is handled by the Calcite
> > > > framework behind the scenes, e.g.
> > > > SELECT a, b, min(c) from foo group by a, b;
> > > > In many other optimizers, we probably need to ask the aggregate to issue
> > > > 3 distribution requests for the tablescan on foo, which are
> > > > 1) hash distributed by a,
> > > > 2) hash distributed by b,
> > > > 3) hash distributed by a, b
> > > > However, in Calcite's top-down optimizer, your physical implementation
> > > > rule for the global aggregate only needs to generate a single physical
> > > > node with hash distribution by a, b. In case the table foo happens to be
> > > > distributed by a, or b, the derive() method will tell you there is an
> > > > opportunity. This is the feature in which Calcite's top-down optimizer
> > > > excels over other optimizers, because it can dramatically reduce the
> > > > search space while keeping the optimal optimization opportunity.
> > > >
> > > > 3. This is by design. Nodes produced from "passThrough" and "derive" are
> > > > just sibling physical nodes with different traitsets; we only need the
> > > > initial physical nodes after implementation to avoid unnecessary
> > > > operations. The fundamental reason is that, unlike the Orca optimizer,
> > > > where a physical node and its physical properties are separate things,
> > > > Calcite's logical/physical nodes contain the traitset. With regard to
> > > > the latter question, can you give an example?
> > > >
> > > > Regards,
> > > > Haisheng Yuan
> > > >
> > > >
> > > > On 2021/05/26 20:11:57, Vladimir Ozerov <ppoze...@gmail.com> wrote:
> > > > > Hi,
> > > > >
> > > > > I tried to optimize a certain combination of operators for the
> > > > > distributed engine and got stuck with the trait propagation in the
> > > > > top-down engine. I want to ask the community for advice on whether the
> > > > > problem is solvable with the current Apache Calcite implementation or
> > > > > not.
> > > > >
> > > > > Consider the following logical tree:
> > > > > 3: LogicalAggregate[group=[a], F2(c)]
> > > > > 2:  LogicalAggregate[group=[a,b], F1(c)]
> > > > > 1:    LogicalScan[t]
> > > > >
> > > > > Consider that these two aggregates cannot be merged or simplified for
> > > > > whatever reason. We have only a set of physical rules to translate this
> > > > > logical tree to a physical tree. Also, there could be any number of
> > > > > other operators between these two aggregates. We omit them for clarity,
> > > > > assuming that the distribution is not destroyed.
> > > > >
> > > > > In the distributed environment, non-collocated aggregates are often
> > > > > implemented in two phases: local pre-aggregation and final aggregation,
> > > > > with an exchange in between. Consider that the Scan operator is hash
> > > > > distributed by some key other than [a] or [b]. If we optimize operators
> > > > > without considering the whole plan, we may optimize each operator
> > > > > independently, which would give us the following plan:
> > > > > 3: PhysicalAggregate[group=[a], F2_phase2(c)]             // HASH_DISTRIBUTED [a]
> > > > > 3:   Exchange[a]                                          // HASH_DISTRIBUTED [a]
> > > > > 3:     PhysicalAggregate[group=[a], F2_phase1(c)]         // HASH_DISTRIBUTED [a,b]
> > > > > 2:       PhysicalAggregate[group=[a,b], F1_phase2(c)]     // HASH_DISTRIBUTED [a,b]
> > > > > 2:         Exchange[a, b]                                 // HASH_DISTRIBUTED [a,b]
> > > > > 2:           PhysicalAggregate[group=[a,b], F1_phase1(c)] // HASH_DISTRIBUTED [d]
> > > > > 1:             PhysicalScan[t]                            // HASH_DISTRIBUTED [d]
> > > > >
> > > > > This plan is not optimal, because we re-hash inputs twice. A better
> > > > > plan that we want to get:
> > > > > 3: PhysicalAggregate[group=[a], F2(c)]                // HASH_DISTRIBUTED [a]
> > > > > 2:   PhysicalAggregate[group=[a,b], F1_phase2(c)]     // HASH_DISTRIBUTED [a]
> > > > > 2:     Exchange[a]                                    // HASH_DISTRIBUTED [a]
> > > > > 2:       PhysicalAggregate[group=[a,b], F1_phase1(c)] // HASH_DISTRIBUTED [d]
> > > > > 1:         PhysicalScan[t]                            // HASH_DISTRIBUTED [d]
> > > > >
> > > > > In this case, we take advantage of the fact that the distribution [a]
> > > > > is compatible with [a,b]. Therefore we may enforce only [a], instead of
> > > > > doing [a,b] and then [a]. Since exchange operators are very expensive,
> > > > > this optimization may bring a significant boost to the query engine.
> > > > > Now the question - how do we reach that state? Intuitively, a
> > > > > pass-through is exactly what we need. We may pass the optimization
> > > > > request from top aggregate to bottom aggregate to find physical
> > > > > implementations shared by [a]. But the devil is in the details - when
> > > > > and how exactly to pass this request?
> > > > >
> > > > > Typically, we have a conversion rule that converts a logical aggregate
> > > > > to a physical aggregate. We may invoke "convert" on the input to
> > > > > initiate the pass-through:
> > > > >
> > > > > RelNode convert(...) {
> > > > >     return new PhysicalAggregate(
> > > > >         convert(input, HASH_DISTRIBUTED[a])
> > > > >     )
> > > > > }
> > > > >
> > > > > The first problem - we cannot create the normal physical aggregate here
> > > > > because we do not know input traits yet. The final decision whether to
> > > > > do a one-phase or two-phase aggregate can be made only in the
> > > > > "PhysicalNode.derive" method when concrete input traits are resolved.
> > > > > Therefore the converter rule should create a kind of "template"
> > > > > physical operator, which would be used to construct the final
> > > > > operator(s) when input traits are resolved. AFAIU Enumerable works
> > > > > similarly: we create operators with virtually arbitrary traits taken
> > > > > from logical nodes in the conversion rules. We only later do create
> > > > > normal nodes in the derive() methods.
> > > > >
> > > > > The second problem - our top aggregate doesn't actually need the
> > > > > HASH_DISTRIBUTED[a] input. Instead, it may accept inputs with any
> > > > > distribution. What we really need is to inform the input (bottom
> > > > > aggregate) that it should look for additional implementations that
> > > > > satisfy HASH_DISTRIBUTED[a]. Therefore, enforcing a specific
> > > > > distribution on the input using the "convert" method is not what we
> > > > > need because this conversion might enforce unnecessary exchanges.
> > > > >
> > > > > The third problem - derivation. Consider that we delivered the
> > > > > optimization request to the bottom aggregate. As an implementor, what
> > > > > am I supposed to do in this method? I cannot return the final aggregate
> > > > > from here because the real input traits are not derived yet. Therefore,
> > > > > I can only return another template, hoping that the "derive" method
> > > > > will be called on it. However, this will not happen because trait
> > > > > derivation is skipped on the nodes emitted from pass-through. See
> > > > > "DeriveTrait.perform" [1].
> > > > >
> > > > > BottomAggregate {
> > > > >     RelNode passThrough(distribution=HASH_DISTRIBUTED[a]) {
> > > > >         // ???
> > > > >     }
> > > > > }
> > > > >
> > > > > I feel that I am either going in the wrong direction, or some gaps in
> > > > > the product disallow such optimization. So I would like to ask the
> > > > > community to assist with the following questions:
> > > > > 1. In the top-down optimizer, how should we convert a logical node to a
> > > > > physical node, provided that "derive" is not called yet? I have a gut
> > > > > feeling that the trait propagation is currently not implemented to the
> > > > > full extent because based on Cascades paper I would expect that parent
> > > > > physical nodes are produced after the child physical nodes. But in our
> > > > > rules, this is not the case - some physical nodes are produced before
> > > > > the trait derivation.
> > > > > 2. How to propagate several optimization requests to inputs? We need
> > > > > either inputs with a specific distribution or inputs with an arbitrary
> > > > > distribution in the example above. It seems that to achieve that, I
> > > > > need to emit several alternative nodes with different requirements to
> > > > > inputs. Does it make sense?
> > > > > 3. Why are nodes produced from the "passThrough" method excluded from
> > > > > trait derivation? If this is by design, how can I preserve the
> > > > > optimization request to satisfy it on the derivation stage when input
> > > > > traits are available?
> > > > >
> > > > > Regards,
> > > > > Vladimir.
> > > > >
> > > > > [1] https://github.com/apache/calcite/blob/calcite-1.26.0/core/src/main/java/org/apache/calcite/plan/volcano/TopDownRuleDriver.java#L828
