Hi Jinpeng, Haisheng,

Thank you for your inputs, I really appreciate them. Let me try to address
some of your comments and share some experience from implementing the
optimizer of a distributed engine I am currently working on.

First of all, I would argue that multiple logical operators do not have a
1-1 mapping to physical operators, and Window is not special here. For
instance, LogicalAggregate doesn't have a 1-1 mapping to physical aggregates
because the physical implementation can be either 1-phase or 2-phase. It
doesn't matter that the 2-phase aggregate is a composition of two 1-phase
aggregates: the whole decision of whether to go with 1-phase or 2-phase
aggregate is a physical decision that should be made based on available (or
assumed) input traits.

Consider the following logical tree:
LogicalAggregate[group=$0, agg=SUM($1)]
  Input

If I do the split in the logical phase with a separate transformation rule,
I will get the following tree:
LogicalAggregate[group=$0, agg=SUM($1)]
  LogicalAggregate[group=$0, agg=SUM($1)]
    Input

Now we have an infinite loop because the rule takes one aggregate and
produces two aggregates. To fix that, we may extend the LogicalAggregate
with some flag or similar. But this (1) potentially breaks other
LogicalAggregate optimizations (e.g., transposes with other operators), and
(2) breaks the whole idea of logical operators, because the execution phase
(pre-aggregate or final aggregate) is a property of a concrete backend, not
a property of relational algebra. Therefore, the typical way to optimize
LogicalAggregate is to do the split in the physical phase (implementation
rule, pass-through, derive). Practical systems like Dremio [1] and Flink [2]
work this way, as the sketch below illustrates.
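
For illustration, here is a minimal sketch of such an implementation rule.
The rule plumbing is real Calcite API, but PhysicalAggregate,
PhysicalExchange, AggPhase, and PhysicalConvention are hypothetical
engine-specific classes, so treat this as a sketch under those assumptions
rather than working code:

import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.logical.LogicalAggregate;

public class AggregateImplementationRule extends RelOptRule {
  public AggregateImplementationRule() {
    super(operand(LogicalAggregate.class, any()), "AggregateImplementationRule");
  }

  @Override public void onMatch(RelOptRuleCall call) {
    LogicalAggregate agg = call.rel(0);
    RelNode input = convert(agg.getInput(),
        agg.getInput().getTraitSet().replace(PhysicalConvention.INSTANCE));

    // Alternative 1: single-phase aggregate. If the input is not already
    // distributed by the group key, the planner adds an Exchange enforcer.
    call.transformTo(PhysicalAggregate.create(
        input, agg.getGroupSet(), agg.getAggCallList(), AggPhase.COMPLETE));

    // Alternative 2: two-phase aggregate. Pre-aggregate local data first,
    // re-shuffle the (smaller) intermediate result, then finalize.
    RelNode local = PhysicalAggregate.create(
        input, agg.getGroupSet(), agg.getAggCallList(), AggPhase.LOCAL);
    RelNode exchange = PhysicalExchange.hashOn(local, agg.getGroupSet());
    call.transformTo(PhysicalAggregate.create(
        exchange, agg.getGroupSet(), agg.getAggCallList(), AggPhase.FINAL));

    // Both alternatives land in the same RelSet; the cost model decides.
  }
}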

That said, as an optimizer developer, I need the flexibility to emit
arbitrary physical trees for a given logical operator, and a 1-1 mapping
cannot be assumed. Calcite's API allows for that, and I am not aware of any
formal documentation or guidelines that discourage it.

Now to the question of when exactly to emit the operators. Normally, we
produce operators from rules. As discussed above, if a logical operator may
produce different physical trees depending on input traits, the
recommendation is to emit all combinations, even though we do not know
whether there will be suitable inputs for those alternatives. This
contradicts the idea of the guided top-down search, where we explore the
search space in response to a concrete optimization request, rather than
with a pessimistic assumption that a certain plan might be required in the
future.

I found a way to mitigate this problem partially. Funnily enough, my
solution is quite similar to what Haisheng proposed for the Window operator.
1. For every logical operator, I emit a single physical operator from the
implementation rule, maintaining an exact 1-1 mapping. The emitted
operators (1) have a special "template" flag which makes their cost
infinite, (2) never expose or demand non-default traits except for the
convention, (3) use the OMAKASE derivation mode.
2. When the input is optimized, "derive" is called on the template, which
produces the concrete physical tree that is not necessarily 1-1 with the
original logical node.

Before rule:
LogicalAggregate[group=$0, agg=SUM($1)]
  LogicalInput

After rule:
PhysicalAggregate[group=$0, agg=SUM($1), template=true, cost=infinite]
  LogicalInput

After "derive" if the input is not shared on $0:
PhysicalAggregate[group=$0, agg=SUM($1)]
  Exchange
    PhysicalAggregate[group=$0, agg=SUM($1)]
      PhysicalInputNotSharded

After "derive" if the input is shared on $0:
PhysicalAggregate[group=$0, agg=SUM($1)]
  PhysicalInputSharded

This approach allows me to avoid the generation of unnecessary alternatives
by delaying the optimization to the derive phase. The aggregate split is
implemented in rules in Dremio/Flink, but in my case this logic migrates to
"derive", as the sketch below shows.

This solution worked well for the whole TPC-DS suite until we wanted to
optimize combinations of operators rather than individual operators. A good
example is TPC-DS query 1 [3]. During the logical optimization, we get the
following logical tree, which is exactly the case that I demonstrated at
the beginning of this mail thread:
G1: Aggregate(groupBy=[ctr_store_sk])
G2:  Aggregate(groupBy=[ctr_customer_sk, ctr_store_sk])

And this is where I got stuck. I need to do a simple thing - propagate an
optimization request from G1 to G2, informing G2 that it should consider
the distribution [ctr_store_sk]. I can deliver that request to my physical
template in G2 through "convert". But the problem is that the current
Calcite implementation doesn't allow me to satisfy this request later on in
the derivation stage. Instead, I am forced to emit the final execution tree
from the "passThrough" method, which will not be notified at the derivation
stage. I prepared a diagram [4] that demonstrates the problem; the sketch
below shows the exact spot where the flow breaks.
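
Roughly, the passThrough of the template sketched earlier would look like
this (requestedDistribution is a hypothetical field on the template):

// Inside PhysicalAggregateTemplate:
@Override public RelNode passThrough(RelTraitSet required) {
  // What I WANT: return a copy of the template that remembers the parent's
  // request and makes the 1-phase/2-phase decision later, in derive(),
  // once the input traits are known.
  PhysicalAggregateTemplate t =
      (PhysicalAggregateTemplate) copy(required, getInputs());
  t.requestedDistribution = required.getTrait(RelDistributionTraitDef.INSTANCE);
  return t;
  // What HAPPENS instead: TopDownRuleDriver skips trait derivation for
  // nodes emitted from passThrough (see DeriveTrait.perform), so derive()
  // is never called on this copy, and I have to emit the final execution
  // tree right here, before the input traits are available.
}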

It feels like I have almost achieved what I need. The last step is to
ensure that "derive" is called on the newly created template. And this is
where I think I run into the inflexibility of the current top-down
optimizer implementation. The current design forces us to define all
possible structures of physical operators in advance, but I want to delay
the decision to the derive stage, when input traits are known, because
these traits are essential for making the proper physical decisions.

There are some similarities with Haisheng's proposal for the Window
operator. We also maintain a 1-1 correspondence between the logical
operator and a physical template. However, Haisheng's proposal is basically
a heuristic, as it splits optimization into two phases (implementation,
post-processing). It is impossible to properly calculate the cost of the
Window operator because we do not know which exchanges will be needed
before the post-processing. In my case, we do the proper cost estimation
within a single expanded MEMO.

Now switching to theoretical considerations. We may make several
observations from the previous discussion:
1) Our ideas converge to the solution where every logical operator has a
single corresponding physical operator, which is later expanded into more
alternatives.
2) Optimization requests are basically sent to RelSet-s, not RelSubset-s,
as we make pairwise comparisons between the requested RelSubset and other
subsets in the set [5][6].
3) Irrespective of the design, the complete exploration requires multiple
invocations of some implementation logic for different combinations of
required traits and available input traits.

These observations led me to think that maybe trait propagation through
some dedicated nodes (templates in my case and Haisheng's Window proposal,
or pessimistically emitted physical nodes in the previous Jinpeng/Haisheng
proposal) is not the ideal design, at least for some cases.

From the design standpoint, we propagate traits top-down and bottom-up
across equivalence groups, not individual RelSubset-s or RelNode-s.
Currently, we ignore the optimization context when optimizing the group
(except for the cost pruning). Rules emit partially constructed nodes since
neither parent requirements nor child traits are available to the rule.

Instead, there could be a truly guided top-down optimization flow, where
the "guided" term applies to rules as well (a hypothetical sketch of such a
rule follows the two steps below):
1. Pass-through: a RelSet receives an optimization request and chooses the
appropriate implementation rules to fire. A rule receives the optimization
request, constructs optimization requests for its children (adjusting
traits, optimization budget, etc.), then sends these requests down. The
process is repeated recursively until we either reach a bottom node or some
set that is already optimized for this request.
2. Derive: given the now-known input traits, emit the appropriate physical
nodes from the rule. Then notify the parent. Repeat the process recursively.
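
The contract of such a rule could look like this (nothing below exists in
Calcite today; it is purely illustrative):

// Purely hypothetical API, for discussion only.
public interface GuidedPhysicalRule {
  // Pass-through phase: given the request received by the RelSet, build
  // the optimization requests to send to the child groups (adjusting
  // traits, optimization budget, etc.).
  List<RelTraitSet> passThrough(RelNode logical, RelTraitSet required);

  // Derive phase: the input traits are now known; emit the physical nodes
  // that satisfy (or give up on) the original request and notify the parent.
  List<RelNode> derive(RelNode logical, RelTraitSet required,
      List<List<RelTraitSet>> inputTraits);
}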

For common use cases, this design would require the same logic that is
currently split between rules, "derive", and "passThrough"; just the code
location would be different, as everything now converges to the rule. But
for advanced use cases, this approach may allow for more flexible
optimization patterns, such as the two chained aggregates above.

I'll try to implement both solutions - (1) emit multiple nodes from a
physical rule, and (2) enable derivation for some nodes emitted from
"passThrough" - and share the results here.

Regards,
Vladimir.

[1]
https://github.com/dremio/dremio-oss/blob/8e85901e7222c81ccad3436ba9b63485503472ac/sabot/kernel/src/main/java/com/dremio/exec/planner/physical/HashAggPrule.java#L123-L146
[2]
https://github.com/apache/flink/blob/release-1.13.0/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalHashAggRule.scala#L101-L147
[3] https://github.com/Agirish/tpcds/blob/master/query1.sql
[4]
https://docs.google.com/document/d/1u7V4bNp51OjytKnS2kzD_ikHLiNUPbhjZfRjkTfdsDA/edit
[5]
https://github.com/apache/calcite/blob/calcite-1.26.0/core/src/main/java/org/apache/calcite/plan/volcano/RelSet.java#L196
[6]
https://github.com/apache/calcite/blob/calcite-1.26.0/core/src/main/java/org/apache/calcite/plan/volcano/TopDownRuleDriver.java#L203

Fri, 28 May 2021 at 00:10, Haisheng Yuan <hy...@apache.org>:

> Getting back to your window query example:
>
> > Consider the Window function:
> > SELECT
> >   AGG1 over (partition by a),
> >   AGG2 over (partition by b),
> >   AGG3 over (partition by c),
> >   ...
> > FROM input
>
> Window is quite special because the logical vs physical operator count is
> not 1 to 1, generally we generate a physical window operator for each
> window function with different partition column. That determines that once
> the physical operators are created, their order can't be changed. Hence
> your proposal of passing required traits to physical rule can mitigate the
> problem.
>
> But things would be much easier if we define a different physical window
> operator.
> For the above query, we can generate the *Single* physical window operator
> like this:
> PhysicalWindow[AGG1 over (partition by a), AGG2 over (partition by b),
> AGG3 over (partition by c)]
> or PhysicalWindow(a, b, c) for brevity.
> How do we define the physical properties for it?
> The operator delivers hash distribution on first window partition column
> a, but requires its child input to be hash distributed by its last window
> partition column c.
>
> If the parent operator request hash distribution on b, or c, the window
> operator will be called on "passthrough" method and generate
> PhysicalWindow(b, a, c), or PhysicalWindow(c, a, b). After final plan is
> generated, during post processing, we can replace the window operator with
> multiple layer nested window operators, and insert Exchange operators if
> necessary. But frankly speaking, I haven't seen any use cases of this kind
> in production.
>
> Regarding the rule alternative you proposed;
> > class PhysicalAggregateRule extends PhysicalRule {
> >  void onMatch(RelOptRuleCall call, *RelTraitSet requiredTraits*) {...
>
> Consider the following plan:
> InnerJoin (on a)
>   +-- Agg (on b)
>   +-- Scan
>
> For the inner join, we can generate sort merge join and hash join.
> The sort merge join can request the following traits to Agg:
> 1) Singleton
> 2) hash distribution on a, sorted by a
> The hash join can request the following traits to Agg:
> 1) Singleton
> 2) hash distribution on a
> 3) any distribution
> 4) broadcast distribution
>
> The PhysicalAggregateRule will be called and executed 5 times, while
> generating the same physical aggregate candidates, unless we pass a whole
> list of required traits to the physical rule, which I have prototyped some
> time ago with the exact idea.
>
> Regards,
> Haisheng Yuan
>
> On 2021/05/27 17:44:23, Haisheng Yuan <hy...@apache.org> wrote:
> > >    In distributed systems, an implementation rule may produce different
> > >    physical operators depending on the input traits. Examples are
> Aggregate,
> > >    Sort, Window.
> >
> > No, in most cases, physical operators are generated regardless the
> input, because the input traits are not know yet. Window might be an
> exception.
> >
> > >    Since input traits are not known when the rule is fired, we must
> > >    generate *all possible combinations* of physical operators that we
> may
> > >    need. For LogicalAggregate, we must generate 1-phase and 2-phase
> > >    alternatives. For LogicalSort, we also have 1-phase and 2-phase
> > >    alternatives. Etc.
> >
> > IMHO, 1 phase and 2 phase are just different logical alternatives, that
> is also why I call it a logical rule to split the aggregate into a 2 phase
> aggregate. But HashAggregate and StreamAggregate are indeed the different
> physical alternatives for a LogicalAggregate.
> >
> >
> > >   Unlike Aggregate or Sort, which may have only 1 or 2 phases, certain
> > >   logical operators may have many physical alternatives. Consider the
> Window
> > >   function:......
> >
> > In window implementation rule, when building physical operator for
> Window that has multiple window functions but with different partition
> columns, we can infer the possible traits that can be delivered by input
> operators by creating your own RelMetaData, hence multiple window
> combination with certain order, but not exhausted enumeration. In fact, the
> window ordering problem exists in every different kind of optimizer.
> >
> > > As input traits are not known when the rule is fired, the nodes emitted
> > > from the implementation rules most likely would not be used in the
> final
> > > plan.
> >
> > That is quite normal, any operator generated by implementation rule
> might not be used in the final plan, because there may be tens of thousands
> of alternatives, we only choose the one with lowest cost.
> >
> > > For example, I can create a physical aggregate that demands
> > > non-strict distribution {a,b} from its input, meaning that both [a,b]
> and
> > > [b,a] is ok. However, in the final plan, we are obligated to have a
> strict
> > > distribution - either [a, b] in that order, or [b, a] in that order -
> > > otherwise, physical operators like Join and Union will not work.
> >
> > It depends on your own satisfaction model and how do you coordinate
> property requirement among child operators. Unlike Orca optimizer, where
> there is exact match, partial satisfying, orderless match etc, Calcite's
> default implementation always require exact satisfying. But we can still
> make use of "passThrough" and "derive" to achieve our goal. i.e. the
> aggregate generated by implementation rule requires itself and its child to
> delivered distribution on [a,b], but the "derive" method tells Aggregate
> that [b,a] is available, it can generate another option to require [b,a]
> instead.
> >
> > > In distributed engines, the nodes emitted from rules are basically
> "templates"
> > > that must be replaced with normal nodes.
> >
> > There is no difference between distributed and non-distributed engines
> when dealing with this. In Orca and CockroachDB optimizer, the nodes
> emitted from rules are operators without physical properties, the optimizer
> then request physical properties in top-down manner, either recursively or
> stack, or state machine. Calcite is quite different. when the physical
> operator is generated by implementation rule, the physical operator must
> has its own traits, at the same time, the traits that it expects its child
> operators to deliver. So in Calcite, they are not "templates". The
> difference is there since Calcite's inception.
> >
> > Regards,
> > Haisheng Yuan
> >
> > On 2021/05/27 08:59:33, Vladimir Ozerov <ppoze...@gmail.com> wrote:
> > > Hi Haisheng,
> > >
> > > Thank you for your inputs. They are really helpful. Let me summarize
> your
> > > feedback in my own words to verify that I understand it correctly.
> > >
> > >    1. In distributed systems, an implementation rule may produce
> different
> > >    physical operators depending on the input traits. Examples are
> Aggregate,
> > >    Sort, Window.
> > >    2. Since input traits are not known when the rule is fired, we must
> > >    generate *all possible combinations* of physical operators that we
> may
> > >    need. For LogicalAggregate, we must generate 1-phase and 2-phase
> > >    alternatives. For LogicalSort, we also have 1-phase and 2-phase
> > >    alternatives. Etc.
> > >    3. If all combinations are generated, it is expected that
> "passThrough"
> > >    and "derive" would be just trivial replacements of traits for most
> cases.
> > >    This is why "passThroughTraits" and "deriveTraits" are recommended.
> A
> > >    notable exception is TableScan that may emit alternative indexes in
> > >    response to the pass-through requests.
> > >
> > > If my understanding is correct, then there are several issues with this
> > > approach still.
> > >
> > > 1. Unlike Aggregate or Sort, which may have only 1 or 2 phases, certain
> > > logical operators may have many physical alternatives. Consider the
> Window
> > > function:
> > > SELECT
> > >   AGG1 over (partition by a),
> > >   AGG2 over (partition by b),
> > >   AGG3 over (partition by c),
> > >   ...
> > > FROM input
> > >
> > > To calculate each aggregate, we need to re-shuffle the input based on
> the
> > > partition key. The key question is the order of reshuffling. If the
> input
> > > is shared by [a], I want to calculate AGG1 locally and then re-shuffle
> the
> > > input to calculate other aggregates. For the remaining AGG2 and AGG3,
> the
> > > order is also important. If the parent demands sharding by [b], then
> the
> > > proper sequence is b-c-a:
> > > 1: Window[AGG2 over (partition by b)]     // SHARDED[b]
> > > 2:   Window[AGG3 over (partition by c)]   // SHARDED[c]
> > > 3:     Window[AGG1 over (partition by a)] // SHARDED[a]
> > > 4:       Input                            // SHARDED[a]
> > >
> > > But if the parent demands [c], the proper sequence is c-b-a. Since we
> do
> > > not know real distributions when the rule is fired, we must emit all
> the
> > > permutations to ensure that no optimization opportunity is missed. But
> with
> > > complex window aggregate, this might be impractical because we will
> emit
> > > lots of unnecessary nodes.
> > >
> > > 2. As input traits are not known when the rule is fired, the nodes
> emitted
> > > from the implementation rules most likely would not be used in the
> final
> > > plan. For example, I can create a physical aggregate that demands
> > > non-strict distribution {a,b} from its input, meaning that both [a,b]
> and
> > > [b,a] is ok. However, in the final plan, we are obligated to have a
> strict
> > > distribution - either [a, b] in that order, or [b, a] in that order -
> > > otherwise, physical operators like Join and Union will not work. In
> > > distributed engines, the nodes emitted from rules are basically
> "templates"
> > > that must be replaced with normal nodes.
> > >
> > > Does this reasoning make any sense? If yes, it means that the current
> > > approach forces us to produce many unnecessary nodes to explore the
> full
> > > search space. The question is whether alternative approaches could
> better
> > > fit the requirements of the distributed engine? This is a purely
> > > theoretical question. I am currently looking deeper at CockroachDB.
> They
> > > have very different architecture: no separation between logical and
> > > physical nodes, physical properties are completely decoupled from
> nodes,
> > > usage of recursion instead of the stack, etc.
> > >
> > > Regards,
> > > Vladimir.
> > >
> > > Thu, 27 May 2021 at 03:19, Haisheng Yuan <hy...@apache.org>:
> > >
> > > > Another point I would like to mention is that it is not recommended
> to
> > > > override method "passThrough" and "derive" directly, override
> > > > "passThroughTraits" and "deriveTraits" instead, so that we can make
> sure
> > > > only the same type of physical node is created and no nested
> relnodes or
> > > > additional RelSets are created, unless you know you have to create
> > > > different type of nodes. For example, if the table foo has an btree
> index
> > > > on column a, and the parent relnode is requesting ordering on column
> a,
> > > > then we may consider to override "passThrough" of TableScan to
> return an
> > > > IndexScan instead of a TableScan.
> > > >
> > > > Regards,
> > > > Haisheng Yuan
> > > > On 2021/05/26 22:45:20, Haisheng Yuan <hy...@apache.org> wrote:
> > > > > Hi Vladimir,
> > > > >
> > > > > 1. You need a logical rule to split the aggregate into a local
> aggregate
> > > > and global aggregate, for example:
> > > > >
> > > >
> https://github.com/greenplum-db/gporca/blob/master/libgpopt/src/xforms/CXformSplitGbAgg.cpp
> > > > > Only implementation rules can convert a logical node to a physical
> node
> > > > or multiple physical nodes.
> > > > > After physical implementation, you have 2 physical alternatives:
> > > > > 1) single phase global physical aggregate,
> > > > > 2) 2 phase physical aggregate with local and global aggregate.
> > > > > It should be up to the cost to decide which one to choose.
> > > > >
> > > > > 2. Given a desired traitset from parent node, the current relnode
> only
> > > > needs to generate a single relnode after passing down the traitset.
> Given a
> > > > traitset delivered by child node, the current relnode only derive a
> single
> > > > relnode. Quite unlike other optimizer, in Calcite's top-down
> optimizer, you
> > > > don't need to worry about issuing multiple optimization requests to
> inputs,
> > > > which is handled by Calcite framework secretly. i.e.
> > > > > SELECT a, b, min(c) from foo group by a, b;
> > > > > In many other optimizer, we probably need ask the aggregate to
> issue 3
> > > > distribution requests for tablescan on foo, which are
> > > > > 1) hash distributed by a,
> > > > > 2) hash distributed by b,
> > > > > 3) hash distributed by a, b
> > > > > However in Calcite top-down optimizer, your physical
> implementation rule
> > > > for global aggregate only need generate a single physical node with
> hash
> > > > distribution by a, b. In case the table foo happens to be
> distributed by a,
> > > > or b, the derive() method will tell you there is an opportunity.
> This is
> > > > the feature that Calcite's top-down optimizer excels over other
> optimizers,
> > > > because this can dramatically reduce the search space while keeping
> the
> > > > optimal optimization opportunity.
> > > > >
> > > > > 3. This is by design. Nodes produced from "passThrough" and
> "derive" and
> > > > just sibling physical node with different traitset, we only need the
> > > > initial physical nodes after implementation to avoid unnecessary
> > > > operations. The fundamental reason is, unlike Orca optimizer where
> physical
> > > > node and physical property are separate things, Calcite's
> logical/physical
> > > > nodes contains traitset. With regard to the latter question, can you
> give
> > > > an example?
> > > > >
> > > > > Regards,
> > > > > Haisheng Yuan
> > > > >
> > > > >
> > > > > On 2021/05/26 20:11:57, Vladimir Ozerov <ppoze...@gmail.com>
> wrote:
> > > > > > Hi,
> > > > > >
> > > > > > I tried to optimize a certain combination of operators for the
> > > > distributed
> > > > > > engine and got stuck with the trait propagation in the top-down
> > > > engine. I
> > > > > > want to ask the community for advice on whether the problem is
> solvable
> > > > > > with the current Apache Calcite implementation or not.
> > > > > >
> > > > > > Consider the following logical tree:
> > > > > > 3: LogicalAggregate[group=[a], F2(c)]
> > > > > > 2:  LogicalAggregate[group=[a,b], F1(c)]
> > > > > > 1:    LogicalScan[t]
> > > > > >
> > > > > > Consider that these two aggregates cannot be merged or
> simplified for
> > > > > > whatever reason. We have only a set of physical rules to
> translate this
> > > > > > logical tree to a physical tree. Also, there could be any number
> of
> > > > > > other operators between these two aggregates. We omit them for
> clarity,
> > > > > > assuming that the distribution is not destroyed.
> > > > > >
> > > > > > In the distributed environment, non-collocated aggregates are
> often
> > > > > > implemented in two phases: local pre-aggregation and final
> aggregation,
> > > > > > with an exchange in between. Consider that the Scan operator is
> hash
> > > > > > distributed by some key other than [a] or [b]. If we optimize
> operators
> > > > > > without considering the whole plan, we may optimize each operator
> > > > > > independently, which would give us the following plan:
> > > > > > 3: PhysicalAggregate[group=[a], F2_phase2(c)]             //
> > > > > > HASH_DISTRIBUTED [a]
> > > > > > 3:   Exchange[a]                                          //
> > > > > > HASH_DISTRIBUTED [a]
> > > > > > 3:     PhysicalAggregate[group=[a], F2_phase1(c)]         //
> > > > > > HASH_DISTRIBUTED [a,b]
> > > > > > 2:       PhysicalAggregate[group=[a,b], F1_phase2(c)]     //
> > > > > > HASH_DISTRIBUTED [a,b]
> > > > > > 2:         Exchange[a, b]                                 //
> > > > > > HASH_DISTRIBUTED [a,b]
> > > > > > 2:           PhysicalAggregate[group=[a,b], F1_phase1(c)] //
> > > > > > HASH_DISTRIBUTED [d]
> > > > > > 1:             PhysicalScan[t]                            //
> > > > > > HASH_DISTRIBUTED [d]
> > > > > >
> > > > > > This plan is not optimal, because we re-hash inputs twice. A
> better
> > > > plan
> > > > > > that we want to get:
> > > > > > 3: PhysicalAggregate[group=[a], F2(c)]                //
> > > > HASH_DISTRIBUTED
> > > > > > [a]
> > > > > > 2:   PhysicalAggregate[group=[a,b], F1_phase2(c)]     //
> > > > HASH_DISTRIBUTED
> > > > > > [a]
> > > > > > 2:     Exchange[a]                                    //
> > > > HASH_DISTRIBUTED
> > > > > > [a]
> > > > > > 2:       PhysicalAggregate[group=[a,b], F1_phase1(c)] //
> > > > HASH_DISTRIBUTED
> > > > > > [d]
> > > > > > 1:         PhysicalScan[t]                            //
> > > > HASH_DISTRIBUTED
> > > > > > [d]
> > > > > >
> > > > > > In this case, we take advantage of the fact that the
> distribution [a]
> > > > is
> > > > > > compatible with [a,b]. Therefore we may enforce only [a],
> instead of
> > > > doing
> > > > > > [a,b] and then [a]. Since exchange operators are very expensive,
> this
> > > > > > optimization may bring a significant boost to the query engine.
> Now the
> > > > > > question - how do we reach that state? Intuitively, a
> pass-through is
> > > > > > exactly what we need. We may pass the optimization request from
> top
> > > > > > aggregate to bottom aggregate to find physical implementations
> shared
> > > > by
> > > > > > [a]. But the devil is in the details - when and how exactly to
> pass
> > > > this
> > > > > > request?
> > > > > >
> > > > > > Typically, we have a conversion rule that converts a logical
> aggregate
> > > > to a
> > > > > > physical aggregate. We may invoke "convert" on the input to
> initiate
> > > > the
> > > > > > pass-through:
> > > > > >
> > > > > > RelNode convert(...) {
> > > > > >     return new PhysicalAggregate(
> > > > > >         convert(input, HASH_DISTRIBUTED[a])
> > > > > >     )
> > > > > > }
> > > > > >
> > > > > > The first problem - we cannot create the normal physical
> aggregate here
> > > > > > because we do not know input traits yet. The final decision
> whether to
> > > > do a
> > > > > > one-phase or two-phase aggregate can be made only in the
> > > > > > "PhysicalNode.derive" method when concrete input traits are
> resolved.
> > > > > > Therefore the converter rule should create a kind of "template"
> > > > physical
> > > > > > operator, which would be used to construct the final operator(s)
> when
> > > > input
> > > > > > traits are resolved. AFAIU Enumerable works similarly: we create
> > > > operators
> > > > > > with virtually arbitrary traits taken from logical nodes in the
> > > > conversion
> > > > > > rules. We only later do create normal nodes in the derive()
> methods.
> > > > > >
> > > > > > The second problem - our top aggregate doesn't actually need the
> > > > > > HASH_DISTRIBUTED[a] input. Instead, it may accept inputs with any
> > > > > > distribution. What we really need is to inform the input (bottom
> > > > aggregate)
> > > > > > that it should look for additional implementations that satisfy
> > > > > > HASH_DISTRIBUTED[a]. Therefore, enforcing a specific
> distribution on
> > > > the
> > > > > > input using the "convert" method is not what we need because this
> > > > > > conversion might enforce unnecessary exchanges.
> > > > > >
> > > > > > The third problem - derivation. Consider that we delivered the
> > > > optimization
> > > > > > request to the bottom aggregate. As an implementor, what am I
> supposed
> > > > to
> > > > > > do in this method? I cannot return the final aggregate from here
> > > > because
> > > > > > the real input traits are not derived yet. Therefore, I can only
> return
> > > > > > another template, hoping that the "derive" method will be called
> on it.
> > > > > > However, this will not happen because trait derivation is
> skipped on
> > > > the
> > > > > > nodes emitted from pass-through. See "DeriveTrait.perform" [1].
> > > > > >
> > > > > > BottomAggregate {
> > > > > >     RelNode passThrough(distribution=HASH_DISTRIBUTED[a]) {
> > > > > >         // ???
> > > > > >     }
> > > > > > }
> > > > > >
> > > > > > I feel that I am either going in the wrong direction, or some
> gaps in
> > > > the
> > > > > > product disallow such optimization. So I would like to ask the
> > > > community to
> > > > > > assist with the following questions:
> > > > > > 1. In the top-down optimizer, how should we convert a logical
> node to a
> > > > > > physical node, provided that "derive" is not called yet? I have
> a gut
> > > > > > feeling that the trait propagation is currently not implemented
> to the
> > > > full
> > > > > > extent because based on Cascades paper I would expect that parent
> > > > physical
> > > > > > nodes are produced after the child physical nodes. But in our
> rules,
> > > > this
> > > > > > is not the case - some physical nodes are produced before the
> trait
> > > > > > derivation.
> > > > > > 2. How to propagate several optimization requests to inputs? We
> need
> > > > either
> > > > > > inputs with a specific distribution or inputs with an arbitrary
> > > > > > distribution in the example above. It seems that to achieve
> that, I
> > > > need to
> > > > > > emit several alternative nodes with different requirements to
> inputs.
> > > > Does
> > > > > > it make sense?
> > > > > > 3. Why are nodes produced from the "passThrough" method excluded
> from
> > > > trait
> > > > > > derivation? If this is by design, how can I preserve the
> optimization
> > > > > > request to satisfy it on the derivation stage when input traits
> are
> > > > > > available?
> > > > > >
> > > > > > Regards,
> > > > > > Vladimir.
> > > > > >
> > > > > > [1]
> > > > > >
> > > >
> https://github.com/apache/calcite/blob/calcite-1.26.0/core/src/main/java/org/apache/calcite/plan/volcano/TopDownRuleDriver.java#L828
> > > > > >
> > > > >
> > > >
> > >
> >
>
