> 2) Optimization requests are basically sent to RelSet-s, not RelSubset-s,
> as we make pairwise comparisons between the requested RelSubset and other
> subsets in the set [5][6].
I agree with you. There could be some waste when the newly delivered / required traitset is generated by "passThrough" / "derive", in which case we only need an enforcer between the pair of subsets, instead of pairing with all other required / delivered subsets in the RelSet. E.g., suppose in the MEMO group we have 2 required traitsets:
1) Hash[a] Sort[b]
2) Hash[b] Sort[c]
When we try to pass Hash[a] Sort[b] down to one of the physical operators, say Project, and find that we can pass Hash[a] down to its child, we get a new physical Project with traitset Hash[a]. We only need an enforcer between Hash[a] and Hash[a] Sort[b], but currently the method "addConverters" also generates an enforcer between Hash[a] and Hash[b] Sort[c], which is not actually what we want. I think it is definitely worth trying to optimize.

Regards,
Haisheng Yuan

On 2021/05/28 19:15:03, Haisheng Yuan <hy...@apache.org> wrote:
> Hi Vladimir,
>
> The top-down optimizer does NOT require an implementation rule to generate a 1-to-1 physical operator for a logical operator. As you can see, if you generate a 2-phase physical aggregate for the logical aggregate in the implementation rule, it still works. Window is special because we can reshuffle the execution order of window functions, and that order makes a difference depending on the parent's physical property request. A single converged physical Window operator caters for this speciality. However, as I said, I don't think it is a common scenario.
>
> > the whole decision of whether to go with 1-phase or 2-phase
> > aggregate is a physical decision that should be made based on available (or
> > assumed) input traits.
> What is the problem with generating both 1-phase and 2-phase aggregates and choosing the best one based on the cost?
>
> Let's see the following query:
> select a, min(b) from (select * from foo, bar where foo.a=bar.a) t group by a;
> Suppose foo is a randomly distributed fact table, and bar is a randomly distributed dimension table.
> Consider the 2 following plans:
>
> 1)
> PhysicalAggregate
> +-- HashJoin
>     +-- HashDistribute by a
>     |   +-- TableScan on foo
>     +-- HashDistribute by a
>         +-- TableScan on bar
>
> 2)
> PhysicalAggregate(global)
> +-- HashDistribute by a
>     +-- PhysicalAggregate(local)
>         +-- HashJoin
>             +-- TableScan on foo
>             +-- Broadcast
>                 +-- TableScan on bar
>
> Can you tell that the single-phase aggregate plan is always better than the 2-phase aggregate plan?
>
> > Therefore, the typical way to optimize
> > LogicalAggregate is to split in the physical phase (implementation rule,
> > pass-through, derive). Practical systems like Dremio [1] and Flink [2]
> > work this way.
> That Dremio and Flink work this way doesn't mean it is a good way. Greenplum Orca and the Alibaba MaxCompute optimizer work in another way. In Flink and Dremio, they have HashAggPRule to generate 1-phase and 2-phase HashAgg, and SortAggPRule to generate 1-phase and 2-phase SortAgg. However, do you think there is a possibility that a global SortAgg combined with a local HashAgg, or a global HashAgg combined with a local SortAgg, may perform better in different cases? Are you going to generate all 4 combinations in the implementation rule? There are cases where we found it better to split the aggregate into a 3-phase aggregate [1]; in such a case, will the implementation rule generate 3 HashAggs or 3 SortAggs, or all 6 combinations?
>
> In our system, we have 1-phase, 2-phase, and 3-phase logical aggregate rules to transform the LogicalAggregate into another kind of logical aggregate(s) with phase info, say LogicalXXXAggregate; then our physical aggregate rules match this kind of node to generate HashAgg or StreamAgg. Of course, in the logical rules, we can add business logic to guess the possible traits delivered by child nodes, to determine whether the rule definitely won't generate a better alternative, and may decide to abort the transformation early.
> But I would rather let the cost model decide.
>
> Admittedly, the current top-down optimization is not purely on-demand, request-oriented, because it will always generate a physical request regardless of the parent nodes' trait request. For example, take the following query in a non-distributed environment:
> select a, b, c, max(d) from foo group by a, b, c order by a desc;
>
> It will first generate a StreamAgg[a ASC, b ASC, c ASC] no matter what the parent node requires; then "passThrough" tells StreamAgg that the parent requires [a DESC], and we get a StreamAgg[a DESC, b ASC, c ASC]. It would be ideal if we only generated StreamAgg[a DESC, b ASC, c ASC] by request, but I don't think that will make much difference; the bottleneck lies in join order enumeration and Project-related operations.
>
> Regards,
> Haisheng Yuan
>
> [1] https://github.com/greenplum-db/gporca/blob/master/libgpopt/src/xforms/CXformSplitDQA.cpp
>
> On 2021/05/28 09:17:45, Vladimir Ozerov <ppoze...@gmail.com> wrote:
> > Hi Jinpeng, Haisheng,
> >
> > Thank you for your inputs. I really appreciate that. Let me try to address some of your comments and share some experience with the implementation of optimizers for a distributed engine I am currently working with.
> >
> > First of all, I would argue that multiple logical operators do not have a 1-1 mapping to physical operators, and Window is not special here. For instance, LogicalAggregate doesn't have a 1-1 mapping to physical aggregates because the physical implementation can be either 1-phase or 2-phase. It doesn't matter that the 2-phase aggregate is a composition of two 1-phase aggregates: the whole decision of whether to go with a 1-phase or 2-phase aggregate is a physical decision that should be made based on available (or assumed) input traits.
> >
> > Consider the following logical tree:
> > LogicalAggregate[group=$0, agg=SUM($1)]
> >   Input
> >
> > If I do the split in the logical phase with a separate transformation rule, I will get the following tree:
> > LogicalAggregate[group=$0, agg=SUM($1)]
> >   LogicalAggregate[group=$0, agg=SUM($1)]
> >     Input
> >
> > Now we have an infinite loop because the rule takes one aggregate and produces two aggregates. To fix that, we may extend the LogicalAggregate with some flag or so. But this (1) potentially breaks other LogicalAggregate optimizations (e.g., transpose with other operators), and (2) breaks the whole idea of logical operators, because the execution phase (pre-aggregate or final aggregate) is a property of the concrete backend, not a property of relational algebra. Therefore, the typical way to optimize LogicalAggregate is to split in the physical phase (implementation rule, pass-through, derive). Practical systems like Dremio [1] and Flink [2] work this way.
> >
> > That said, as an optimizer developer, I need the flexibility to emit any physical tree for a given logical operator, and a 1-1 mapping cannot be assumed. Calcite's API allows for that, and I am not aware of formal documentation or guidelines that discourage it.
> >
> > Now the question is when exactly to emit the operators. Normally, we produce operators from rules. As discussed above, if a logical operator may produce different physical trees depending on input traits, the recommendation is to emit all combinations, even though we do not know whether there would be good inputs for those alternatives. This contradicts the idea of guided top-down search, where we explore the search space in response to a concrete optimization request, rather than with a pessimistic assumption that a certain plan might be required in the future.
> >
> > I found a way to mitigate this problem partially.
> > Funny, my solution is almost similar to what Haisheng proposed for the Window operator.
> > 1. For every logical operator, I emit a single physical operator from the implementation rule, maintaining the exact 1-1 mapping. The emitted operators (1) have a special flag "template", which makes their cost infinite, (2) never expose or demand non-default traits except for convention, (3) have OMAKASE derivation mode.
> > 2. When the input is optimized, "derive" is called on the template, which produces the concrete physical tree, which is not necessarily 1-1 to the original logical node.
> >
> > Before rule:
> > LogicalAggregate[group=$0, agg=SUM($1)]
> >   LogicalInput
> >
> > After rule:
> > PhysicalAggregate[group=$0, agg=SUM($1), template=true, cost=infinite]
> >   LogicalInput
> >
> > After "derive" if the input is not sharded on $0:
> > PhysicalAggregate[group=$0, agg=SUM($1)]
> >   Exchange
> >     PhysicalAggregate[group=$0, agg=SUM($1)]
> >       PhysicalInputNotSharded
> >
> > After "derive" if the input is sharded on $0:
> > PhysicalAggregate[group=$0, agg=SUM($1)]
> >   PhysicalInputSharded
> >
> > This approach allows me to avoid the generation of unnecessary alternatives by delaying the optimization to the derive phase. The aggregate split is implemented in rules in Dremio/Flink, but in my case this logic migrates to "derive".
> >
> > This solution worked well for the whole TPC-DS suite until we wanted to optimize combinations of operators rather than individual operators. A good example is TPC-DS query 1 [3]. During the logical optimization, we get the following logical tree, which is exactly the case that I demonstrated at the beginning of this mail thread:
> > G1: Aggregate(groupBy=[ctr_store_sk])
> > G2:   Aggregate(groupBy=[ctr_customer_sk, ctr_store_sk])
> >
> > And this is where I got stuck.
> > I need to do a simple thing - propagate an optimization request from G1 to G2, informing G2 that it should consider the distribution [ctr_store_sk]. I can deliver that request to my physical template in G2 through "convert". But the problem is that the current Calcite implementation doesn't allow me to satisfy this request later, at the derivation stage. Instead, I am forced to emit the final execution tree from the "passThrough" method, which will not be notified at the derivation stage. I prepared a scheme [4] that demonstrates the problem.
> >
> > It feels that I have almost achieved what I need. The last step is to ensure that "derive" is called on the newly created template. And this is where I think I reach the inflexibility of the current top-down optimizer implementation. The current design forces us to define all possible structures of physical operators in advance, but I want to delay the decision to the derive stage, when input traits are known, because these traits are essential to make the proper physical decisions.
> >
> > There are some similarities with Haisheng's proposal for the Window operator. We also maintain the 1-1 correspondence between the logical operator and a physical template. However, Haisheng's proposal is basically heuristic, as we split optimization into two phases (implementation, post-processing). It is impossible to properly calculate the cost of the Window operator because we do not know which exchanges would be needed before the post-processing. In my case, we do the proper cost estimation within a single expanded MEMO.
> >
> > Now switching to theoretical considerations. We may make several observations from the previous discussion:
> > 1) Our ideas converge to a solution where every logical operator has a single corresponding physical operator, which is later expanded into more alternatives.
> > 2) Optimization requests are basically sent to RelSet-s, not RelSubset-s, as we make pairwise comparisons between the requested RelSubset and other subsets in the set [5][6].
> > 3) Irrespective of the design, the complete exploration requires multiple invocations of some implementation logic for different combinations of required traits and available input traits.
> >
> > These observations led me to think that maybe trait propagation through dedicated nodes (templates in my case and in Haisheng's Window proposal, or pessimistically emitted physical nodes in the previous Jinpeng/Haisheng proposal) is not the ideal design, at least for some cases.
> >
> > From the design standpoint, we propagate traits top-down and bottom-up across equivalence groups, not individual RelSubset-s or RelNode-s. Currently, we ignore the optimization context when optimizing a group (except for cost pruning). Rules emit partially constructed nodes, since neither parent requirements nor child traits are available to the rule.
> >
> > Instead, there could exist a truly guided top-down optimization flow where the "guided" term applies to rules as well:
> > 1. Pass-through: a RelSet receives an optimization request and chooses appropriate implementation rules to fire. A rule receives optimization requests, constructs optimization requests for children (adjusting traits, optimization budget, etc.), then sends these requests down. The process is repeated recursively until we either reach the bottom node or some set that is already optimized for this request.
> > 2. Derive: given the now known input traits, emit appropriate physical nodes from the rule. Then notify the parent. Repeat the process recursively.
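[Editor's note: the proposed two-step flow can be sketched in plain Java for the aggregate case. All types and method names below (Dist, childRequest, derive) are illustrative assumptions, not Calcite's actual API; the point is only that the physical shape is chosen after the child's delivered traits are known.]

```java
import java.util.Set;

/**
 * Minimal sketch of the two-step flow described above for an aggregate:
 * pass-through first translates the parent's request into a child request,
 * and only "derive" - once the child's delivered traits are known - picks
 * the physical shape. Hypothetical types, NOT Calcite's API.
 */
final class GuidedAggregateRule {
    /** A hash distribution, identified solely by its key set. */
    record Dist(Set<String> keys) {}

    private final Set<String> groupKeys;

    GuidedAggregateRule(Set<String> groupKeys) {
        this.groupKeys = groupKeys;
    }

    /** Step 1 (pass-through): the aggregate can forward the parent's hash
     *  request only if the requested keys are a subset of its group keys;
     *  otherwise it asks for a distribution on its own group keys. */
    Dist childRequest(Dist required) {
        return groupKeys.containsAll(required.keys())
                ? required
                : new Dist(groupKeys);
    }

    /** Step 2 (derive): with the child's delivered traits known, choose
     *  between a one-phase plan and a two-phase (local + exchange + global)
     *  plan - exactly the decision that cannot be made when the rule fires. */
    String derive(Dist delivered) {
        boolean collocated = !delivered.keys().isEmpty()
                && groupKeys.containsAll(delivered.keys());
        return collocated
                ? "OnePhaseAggregate"
                : "GlobalAggregate(Exchange(LocalAggregate))";
    }
}
```

For an aggregate grouped by [a, b], a parent request for HASH[a] is forwarded unchanged, and derive emits the one-phase form only when the input is already collocated on a subset of the group keys.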
> >
> > For common use cases, this design would require the same logic that is currently split between rules, "derive", and "passThrough"; just the code location would be different, as everything now converges to the rule. But for the advanced use cases, that approach may allow for more flexible optimization patterns, like the one for these two chained aggregates.
> >
> > I'll try to implement both solutions - (1) emit multiple nodes from a physical rule, and (2) enable derivation for some nodes emitted from "passThrough" - and share the results here.
> >
> > Regards,
> > Vladimir.
> >
> > [1] https://github.com/dremio/dremio-oss/blob/8e85901e7222c81ccad3436ba9b63485503472ac/sabot/kernel/src/main/java/com/dremio/exec/planner/physical/HashAggPrule.java#L123-L146
> > [2] https://github.com/apache/flink/blob/release-1.13.0/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalHashAggRule.scala#L101-L147
> > [3] https://github.com/Agirish/tpcds/blob/master/query1.sql
> > [4] https://docs.google.com/document/d/1u7V4bNp51OjytKnS2kzD_ikHLiNUPbhjZfRjkTfdsDA/edit
> > [5] https://github.com/apache/calcite/blob/calcite-1.26.0/core/src/main/java/org/apache/calcite/plan/volcano/RelSet.java#L196
> > [6] https://github.com/apache/calcite/blob/calcite-1.26.0/core/src/main/java/org/apache/calcite/plan/volcano/TopDownRuleDriver.java#L203
> >
> > Fri, 28 May 2021 at 00:10, Haisheng Yuan <hy...@apache.org>:
> >
> > > Getting back to your window query example:
> > >
> > > > Consider the Window function:
> > > > SELECT
> > > >   AGG1 over (partition by a),
> > > >   AGG2 over (partition by b),
> > > >   AGG3 over (partition by c),
> > > >   ...
> > > > FROM input
> > >
> > > Window is quite special because the logical-to-physical operator count is not 1 to 1; generally we generate a physical window operator for each window function with a different partition column.
> > > That determines that once the physical operators are created, their order can't be changed. Hence your proposal of passing required traits to the physical rule can mitigate the problem.
> > >
> > > But things would be much easier if we defined a different physical window operator. For the above query, we can generate a *single* physical window operator like this:
> > > PhysicalWindow[AGG1 over (partition by a), AGG2 over (partition by b), AGG3 over (partition by c)]
> > > or PhysicalWindow(a, b, c) for brevity.
> > > How do we define the physical properties for it? The operator delivers hash distribution on the first window partition column a, but requires its child input to be hash distributed by its last window partition column c.
> > >
> > > If the parent operator requests hash distribution on b or c, the window operator's "passThrough" method will be called and generate PhysicalWindow(b, a, c) or PhysicalWindow(c, a, b). After the final plan is generated, during post-processing, we can replace the window operator with multiple nested window operators, and insert Exchange operators if necessary. But frankly speaking, I haven't seen any use cases of this kind in production.
> > >
> > > Regarding the rule alternative you proposed:
> > > > class PhysicalAggregateRule extends PhysicalRule {
> > > >   void onMatch(RelOptRuleCall call, *RelTraitSet requiredTraits*) {...
> > >
> > > Consider the following plan:
> > > InnerJoin (on a)
> > > +-- Agg (on b)
> > >     +-- Scan
> > >
> > > For the inner join, we can generate a sort merge join and a hash join.
> > > The sort merge join can request the following traits from Agg:
> > > 1) Singleton
> > > 2) hash distribution on a, sorted by a
> > > The hash join can request the following traits from Agg:
> > > 1) Singleton
> > > 2) hash distribution on a
> > > 3) any distribution
> > > 4) broadcast distribution
> > >
> > > The PhysicalAggregateRule would be called and executed 5 times, while generating the same physical aggregate candidates, unless we pass a whole list of required traits to the physical rule, which I prototyped some time ago with exactly that idea.
> > >
> > > Regards,
> > > Haisheng Yuan
> > >
> > > On 2021/05/27 17:44:23, Haisheng Yuan <hy...@apache.org> wrote:
> > > > > In distributed systems, an implementation rule may produce different
> > > > > physical operators depending on the input traits. Examples are Aggregate,
> > > > > Sort, Window.
> > > >
> > > > No, in most cases physical operators are generated regardless of the input, because the input traits are not known yet. Window might be an exception.
> > > >
> > > > > Since input traits are not known when the rule is fired, we must
> > > > > generate *all possible combinations* of physical operators that we may
> > > > > need. For LogicalAggregate, we must generate 1-phase and 2-phase
> > > > > alternatives. For LogicalSort, we also have 1-phase and 2-phase
> > > > > alternatives. Etc.
> > > >
> > > > IMHO, 1-phase and 2-phase are just different logical alternatives; that is also why I call it a logical rule to split the aggregate into a 2-phase aggregate. But HashAggregate and StreamAggregate are indeed different physical alternatives for a LogicalAggregate.
> > > >
> > > > > Unlike Aggregate or Sort, which may have only 1 or 2 phases, certain
> > > > > logical operators may have many physical alternatives. Consider the Window
> > > > > function:......
> > > >
> > > > In the window implementation rule, when building a physical operator for a Window that has multiple window functions with different partition columns, we can infer the possible traits that can be delivered by input operators by creating our own RelMetadata - hence multiple window combinations with a certain order, but not an exhaustive enumeration. In fact, the window ordering problem exists in every kind of optimizer.
> > > >
> > > > > As input traits are not known when the rule is fired, the nodes emitted
> > > > > from the implementation rules most likely would not be used in the final
> > > > > plan.
> > > >
> > > > That is quite normal; any operator generated by an implementation rule might not be used in the final plan, because there may be tens of thousands of alternatives and we only choose the one with the lowest cost.
> > > >
> > > > > For example, I can create a physical aggregate that demands
> > > > > non-strict distribution {a,b} from its input, meaning that both [a,b] and
> > > > > [b,a] are ok. However, in the final plan, we are obligated to have a strict
> > > > > distribution - either [a, b] in that order, or [b, a] in that order -
> > > > > otherwise, physical operators like Join and Union will not work.
> > > >
> > > > It depends on your own satisfaction model and how you coordinate property requirements among child operators. Unlike the Orca optimizer, where there are exact match, partial satisfying, orderless match, etc., Calcite's default implementation always requires exact satisfying. But we can still make use of "passThrough" and "derive" to achieve our goal, i.e., the aggregate generated by the implementation rule requires itself and its child to deliver distribution on [a,b], but if the "derive" method tells the Aggregate that [b,a] is available, it can generate another option that requires [b,a] instead.
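[Editor's note: the contrast between exact and orderless satisfaction, and the "derive" workaround just described, can be sketched in plain Java. The lists of key names stand in for Calcite's trait classes; the method names are illustrative assumptions.]

```java
import java.util.List;
import java.util.Set;

/**
 * Sketch of the satisfaction models mentioned above, and the "derive"
 * workaround: when the child delivers [b, a] while the rule required [a, b],
 * emit a sibling alternative that requires [b, a] instead of adding an
 * enforcer. Plain Java stand-ins, not Calcite's RelTrait classes.
 */
final class SatisfactionSketch {
    /** Calcite's default model: the delivered keys must match exactly. */
    static boolean exact(List<String> delivered, List<String> required) {
        return delivered.equals(required);
    }

    /** Orca-style orderless match: same key set, any order. */
    static boolean orderless(List<String> delivered, List<String> required) {
        return Set.copyOf(delivered).equals(Set.copyOf(required));
    }

    /** If the delivered traits satisfy the requirement only orderlessly,
     *  generate another option that requires exactly what was delivered. */
    static List<String> requirementAfterDerive(List<String> required,
                                               List<String> delivered) {
        if (!exact(delivered, required) && orderless(delivered, required)) {
            return delivered; // sibling alternative requiring e.g. [b, a]
        }
        return required;
    }
}
```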
> > > >
> > > > > In distributed engines, the nodes emitted from rules are basically "templates"
> > > > > that must be replaced with normal nodes.
> > > >
> > > > There is no difference between distributed and non-distributed engines when dealing with this. In Orca and the CockroachDB optimizer, the nodes emitted from rules are operators without physical properties; the optimizer then requests physical properties in a top-down manner, either recursively, via a stack, or via a state machine. Calcite is quite different: when a physical operator is generated by an implementation rule, the physical operator must have its own traits and, at the same time, the traits that it expects its child operators to deliver. So in Calcite they are not "templates". The difference has been there since Calcite's inception.
> > > >
> > > > Regards,
> > > > Haisheng Yuan
> > > >
> > > > On 2021/05/27 08:59:33, Vladimir Ozerov <ppoze...@gmail.com> wrote:
> > > > > Hi Haisheng,
> > > > >
> > > > > Thank you for your inputs. They are really helpful. Let me summarize your feedback in my own words to verify that I understand it correctly.
> > > > >
> > > > > 1. In distributed systems, an implementation rule may produce different physical operators depending on the input traits. Examples are Aggregate, Sort, Window.
> > > > > 2. Since input traits are not known when the rule is fired, we must generate *all possible combinations* of physical operators that we may need. For LogicalAggregate, we must generate 1-phase and 2-phase alternatives. For LogicalSort, we also have 1-phase and 2-phase alternatives. Etc.
> > > > > 3. If all combinations are generated, it is expected that "passThrough" and "derive" would be just trivial replacements of traits for most cases. This is why "passThroughTraits" and "deriveTraits" are recommended.
> > > > > A notable exception is TableScan, which may emit alternative indexes in response to pass-through requests.
> > > > >
> > > > > If my understanding is correct, then there are still several issues with this approach.
> > > > >
> > > > > 1. Unlike Aggregate or Sort, which may have only 1 or 2 phases, certain logical operators may have many physical alternatives. Consider the Window function:
> > > > > SELECT
> > > > >   AGG1 over (partition by a),
> > > > >   AGG2 over (partition by b),
> > > > >   AGG3 over (partition by c),
> > > > >   ...
> > > > > FROM input
> > > > >
> > > > > To calculate each aggregate, we need to re-shuffle the input based on the partition key. The key question is the order of reshuffling. If the input is sharded by [a], I want to calculate AGG1 locally and then re-shuffle the input to calculate the other aggregates. For the remaining AGG2 and AGG3, the order is also important. If the parent demands sharding by [b], then the proper sequence is b-c-a:
> > > > > 1: Window[AGG2 over (partition by b)]       // SHARDED[b]
> > > > > 2:   Window[AGG3 over (partition by c)]     // SHARDED[c]
> > > > > 3:     Window[AGG1 over (partition by a)]   // SHARDED[a]
> > > > > 4:       Input                              // SHARDED[a]
> > > > >
> > > > > But if the parent demands [c], the proper sequence is c-b-a. Since we do not know the real distributions when the rule is fired, we must emit all the permutations to ensure that no optimization opportunity is missed. But with a complex window aggregate, this might be impractical because we will emit lots of unnecessary nodes.
> > > > >
> > > > > 2. As input traits are not known when the rule is fired, the nodes emitted from the implementation rules most likely would not be used in the final plan.
> > > > > For example, I can create a physical aggregate that demands a non-strict distribution {a,b} from its input, meaning that both [a,b] and [b,a] are ok. However, in the final plan, we are obligated to have a strict distribution - either [a, b] in that order, or [b, a] in that order - otherwise, physical operators like Join and Union will not work. In distributed engines, the nodes emitted from rules are basically "templates" that must be replaced with normal nodes.
> > > > >
> > > > > Does this reasoning make any sense? If yes, it means that the current approach forces us to produce many unnecessary nodes to explore the full search space. The question is whether alternative approaches could better fit the requirements of a distributed engine. This is a purely theoretical question. I am currently looking deeper at CockroachDB. They have a very different architecture: no separation between logical and physical nodes, physical properties completely decoupled from nodes, usage of recursion instead of a stack, etc.
> > > > >
> > > > > Regards,
> > > > > Vladimir.
> > > > >
> > > > > Thu, 27 May 2021 at 03:19, Haisheng Yuan <hy...@apache.org>:
> > > > >
> > > > > > Another point I would like to mention is that it is not recommended to override the methods "passThrough" and "derive" directly; override "passThroughTraits" and "deriveTraits" instead, so that we can make sure only the same type of physical node is created and no nested relnodes or additional RelSets are created, unless you know you have to create a different type of node.
> > > > > > For example, if the table foo has a btree index on column a, and the parent relnode is requesting ordering on column a, then we may consider overriding "passThrough" of TableScan to return an IndexScan instead of a TableScan.
> > > > > >
> > > > > > Regards,
> > > > > > Haisheng Yuan
> > > > > >
> > > > > > On 2021/05/26 22:45:20, Haisheng Yuan <hy...@apache.org> wrote:
> > > > > > > Hi Vladimir,
> > > > > > >
> > > > > > > 1. You need a logical rule to split the aggregate into a local aggregate and a global aggregate, for example:
> > > > > > > https://github.com/greenplum-db/gporca/blob/master/libgpopt/src/xforms/CXformSplitGbAgg.cpp
> > > > > > > Only implementation rules can convert a logical node to a physical node or multiple physical nodes. After physical implementation, you have 2 physical alternatives:
> > > > > > > 1) a single-phase global physical aggregate,
> > > > > > > 2) a 2-phase physical aggregate with local and global aggregates.
> > > > > > > It should be up to the cost to decide which one to choose.
> > > > > > >
> > > > > > > 2. Given a desired traitset from the parent node, the current relnode only needs to generate a single relnode after passing down the traitset. Given a traitset delivered by a child node, the current relnode only derives a single relnode. Quite unlike other optimizers, in Calcite's top-down optimizer you don't need to worry about issuing multiple optimization requests to inputs; that is handled by the Calcite framework secretly. i.e.:
> > > > > > > SELECT a, b, min(c) from foo group by a, b;
> > > > > > > In many other optimizers, we would probably need the aggregate to issue 3 distribution requests for the tablescan on foo, which are:
> > > > > > > 1) hash distributed by a,
> > > > > > > 2) hash distributed by b,
> > > > > > > 3) hash distributed by a, b
> > > > > > > However, in the Calcite top-down optimizer, your physical implementation rule for the global aggregate only needs to generate a single physical node with hash distribution by a, b. In case the table foo happens to be distributed by a or by b, the derive() method will tell you there is an opportunity. This is a feature where Calcite's top-down optimizer excels over other optimizers, because it can dramatically reduce the search space while keeping the optimal optimization opportunity.
> > > > > > >
> > > > > > > 3. This is by design. Nodes produced from "passThrough" and "derive" are just sibling physical nodes with different traitsets; we only need the initial physical nodes after implementation to avoid unnecessary operations. The fundamental reason is that, unlike the Orca optimizer, where a physical node and a physical property are separate things, Calcite's logical/physical nodes contain traitsets. With regard to the latter question, can you give an example?
> > > > > > >
> > > > > > > Regards,
> > > > > > > Haisheng Yuan
> > > > > > >
> > > > > > > On 2021/05/26 20:11:57, Vladimir Ozerov <ppoze...@gmail.com> wrote:
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > I tried to optimize a certain combination of operators for a distributed engine and got stuck with trait propagation in the top-down engine.
> > > > > > > > I want to ask the community for advice on whether the problem is solvable with the current Apache Calcite implementation or not.
> > > > > > > >
> > > > > > > > Consider the following logical tree:
> > > > > > > > 3: LogicalAggregate[group=[a], F2(c)]
> > > > > > > > 2:   LogicalAggregate[group=[a,b], F1(c)]
> > > > > > > > 1:     LogicalScan[t]
> > > > > > > >
> > > > > > > > Consider that these two aggregates cannot be merged or simplified for whatever reason. We have only a set of physical rules to translate this logical tree into a physical tree. Also, there could be any number of other operators between these two aggregates; we omit them for clarity, assuming that the distribution is not destroyed.
> > > > > > > >
> > > > > > > > In a distributed environment, non-collocated aggregates are often implemented in two phases: local pre-aggregation and final aggregation, with an exchange in between. Consider that the Scan operator is hash distributed by some key other than [a] or [b].
> > > > > > > > If we optimize operators without considering the whole plan, we may optimize each operator independently, which would give us the following plan:
> > > > > > > > 3: PhysicalAggregate[group=[a], F2_phase2(c)]             // HASH_DISTRIBUTED [a]
> > > > > > > > 3:   Exchange[a]                                          // HASH_DISTRIBUTED [a]
> > > > > > > > 3:     PhysicalAggregate[group=[a], F2_phase1(c)]         // HASH_DISTRIBUTED [a,b]
> > > > > > > > 2:       PhysicalAggregate[group=[a,b], F1_phase2(c)]     // HASH_DISTRIBUTED [a,b]
> > > > > > > > 2:         Exchange[a, b]                                 // HASH_DISTRIBUTED [a,b]
> > > > > > > > 2:           PhysicalAggregate[group=[a,b], F1_phase1(c)] // HASH_DISTRIBUTED [d]
> > > > > > > > 1:             PhysicalScan[t]                            // HASH_DISTRIBUTED [d]
> > > > > > > >
> > > > > > > > This plan is not optimal, because we re-hash the input twice. A better plan that we want to get:
> > > > > > > > 3: PhysicalAggregate[group=[a], F2(c)]                    // HASH_DISTRIBUTED [a]
> > > > > > > > 2:   PhysicalAggregate[group=[a,b], F1_phase2(c)]         // HASH_DISTRIBUTED [a]
> > > > > > > > 2:     Exchange[a]                                        // HASH_DISTRIBUTED [a]
> > > > > > > > 2:       PhysicalAggregate[group=[a,b], F1_phase1(c)]     // HASH_DISTRIBUTED [d]
> > > > > > > > 1:         PhysicalScan[t]                                // HASH_DISTRIBUTED [d]
> > > > > > > >
> > > > > > > > In this case, we take advantage of the fact that the distribution [a] is compatible with [a,b]. Therefore, we may enforce only [a], instead of doing [a,b] and then [a]. Since exchange operators are very expensive, this optimization may bring a significant boost to the query engine. Now the question - how do we reach that state?
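[Editor's note: the compatibility fact used in the better plan - an exchange hashing by [a] also satisfies the aggregate grouped by [a,b] - follows from a simple subset rule: rows that agree on all of [a,b] necessarily agree on [a], so they land on the same node. A minimal sketch in plain Java, with illustrative names rather than Calcite code:]

```java
import java.util.Set;

/**
 * Sketch of the compatibility rule exploited above: a hash distribution on
 * keys D keeps an operator grouped by keys G collocated whenever D is a
 * non-empty subset of G, because rows equal on every key in G are
 * necessarily equal on every key in D. Illustrative names, not Calcite code.
 */
final class DistributionCompat {
    static boolean satisfiesGrouping(Set<String> distKeys, Set<String> groupKeys) {
        return !distKeys.isEmpty() && groupKeys.containsAll(distKeys);
    }
}
```

Under this rule, HASH[a] serves both the [a,b] aggregate and the [a] aggregate, so a single Exchange[a] suffices, while HASH[d] satisfies neither grouping and forces a re-hash.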
Intuitively, a pass-through is exactly what we need. We may pass the optimization request from the top aggregate to the bottom aggregate to find physical implementations shared by [a]. But the devil is in the details: when and how exactly to pass this request?

Typically, we have a conversion rule that converts a logical aggregate to a physical aggregate. We may invoke "convert" on the input to initiate the pass-through:

RelNode convert(...) {
  return new PhysicalAggregate(
      convert(input, HASH_DISTRIBUTED[a]));
}

The first problem: we cannot create the normal physical aggregate here, because we do not know the input traits yet. The final decision whether to do a one-phase or two-phase aggregate can be made only in the "PhysicalNode.derive" method, when concrete input traits are resolved. Therefore the converter rule should create a kind of "template" physical operator, which would be used to construct the final operator(s) when input traits are resolved. AFAIU, Enumerable works similarly: we create operators with virtually arbitrary traits taken from logical nodes in the conversion rules, and only later create the normal nodes in the derive() methods.

The second problem: our top aggregate doesn't actually need the HASH_DISTRIBUTED[a] input. Instead, it may accept inputs with any distribution.
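The deferred one-phase/two-phase decision can be modeled in a few lines of standalone Java. This is a sketch of the idea only, not Calcite's PhysicalNode interface; the class, the String return values, and the key-subset check are all invented for this illustration:

```java
import java.util.List;

// Hypothetical mini-model of the "template" operator: the conversion
// rule emits a placeholder, and derive() picks the real shape once the
// input's delivered hash distribution is known.
final class AggregateTemplate {
  final List<String> groupKeys;

  AggregateTemplate(List<String> groupKeys) {
    this.groupKeys = groupKeys;
  }

  // derive(): if the input is already hash-distributed on a non-empty
  // subset of the group keys, a single-phase aggregate suffices;
  // otherwise plan local pre-aggregate + Exchange + final aggregate.
  String derive(List<String> inputHashKeys) {
    if (!inputHashKeys.isEmpty() && groupKeys.containsAll(inputHashKeys)) {
      return "OnePhaseAggregate";
    }
    return "TwoPhaseAggregate";
  }
}
```

In the running example, the bottom aggregate grouping on [a,b] would resolve to a one-phase plan over an input hashed by [a], but to a two-phase plan over the scan hashed by [d].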
What we really need is to inform the input (the bottom aggregate) that it should look for additional implementations that satisfy HASH_DISTRIBUTED[a]. Therefore, enforcing a specific distribution on the input using the "convert" method is not what we need, because this conversion might enforce unnecessary exchanges.

The third problem: derivation. Consider that we delivered the optimization request to the bottom aggregate. As an implementor, what am I supposed to do in this method? I cannot return the final aggregate from here, because the real input traits are not derived yet. Therefore, I can only return another template, hoping that the "derive" method will be called on it. However, this will not happen, because trait derivation is skipped on the nodes emitted from pass-through. See "DeriveTrait.perform" [1].

BottomAggregate {
  RelNode passThrough(distribution=HASH_DISTRIBUTED[a]) {
    // ???
  }
}

I feel that I am either going in the wrong direction, or some gaps in the product disallow such optimization. So I would like to ask the community to assist with the following questions:
1. In the top-down optimizer, how should we convert a logical node to a physical node, provided that "derive" is not called yet?
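One conceivable shape for that passThrough body is to record the parent's request instead of enforcing it, so that the request could still be satisfied later if derivation were run on the result. The following is a hypothetical standalone sketch of that idea, not Calcite code, and it does not claim that the planner would actually call derive() on the returned copy (per the problem described above, it currently does not):

```java
import java.util.List;

// Hypothetical sketch: passThrough returns a copy of the operator that
// remembers the requested hash keys rather than inserting an Exchange.
// The copy is another "template" awaiting trait derivation.
final class BottomAggregateTemplate {
  final List<String> groupKeys;
  final List<String> requestedHashKeys; // null = no pending request

  BottomAggregateTemplate(List<String> groupKeys, List<String> requestedHashKeys) {
    this.groupKeys = groupKeys;
    this.requestedHashKeys = requestedHashKeys;
  }

  // Record the parent's distribution request instead of enforcing it.
  BottomAggregateTemplate passThrough(List<String> hashKeys) {
    return new BottomAggregateTemplate(groupKeys, hashKeys);
  }
}
```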
I have a gut feeling that trait propagation is currently not implemented to the full extent, because based on the Cascades paper I would expect that parent physical nodes are produced after the child physical nodes. But in our rules this is not the case: some physical nodes are produced before the trait derivation.
2. How to propagate several optimization requests to inputs? In the example above, we need either inputs with a specific distribution or inputs with an arbitrary distribution. It seems that to achieve that, I need to emit several alternative nodes with different requirements for inputs. Does it make sense?
3. Why are nodes produced from the "passThrough" method excluded from trait derivation? If this is by design, how can I preserve the optimization request so that it can be satisfied at the derivation stage, when input traits are available?

Regards,
Vladimir.

[1] https://github.com/apache/calcite/blob/calcite-1.26.0/core/src/main/java/org/apache/calcite/plan/volcano/TopDownRuleDriver.java#L828