Hi, Vladimir. Here is a picture of how Calcite optimizes the two-aggregates problem:
Step 1: Without any hints for pruning, BOTH implementations of the aggregations are built and held in the memo. For the top aggregation, the one-pass implementation requests a HASH_DISTRIBUTED[a] distribution from its input and the two-pass implementation requests an ANY distribution from its input. When the bottom aggregation gets built, it also builds two implementations. So we get 4 valid candidates:

// Candidate 1: top agg is one-pass and bottom agg is one-pass
PhysicalAggregate[group=[a], F2_phase2(c)]
  Exchange[dist[a]]
    PhysicalAggregate[group=[a,b], F1_phase2(c)]
      Exchange[dist[a,b]]
        ...

// Candidate 2: top agg is one-pass and bottom agg is two-pass
PhysicalAggregate[group=[a], F2_phase2(c)]
  Exchange[dist[a]]
    PhysicalAggregate[group=[a,b], F1_phase2(c)]
      Exchange[dist[a,b]]
        PhysicalAggregate[group=[a,b], F1_phase1(c)]
          ...

// Candidate 3: top agg is two-pass, bottom agg is one-pass
PhysicalAggregate[group=[a], F2_phase2(c)]
  Exchange[dist[a]]
    PhysicalAggregate[group=[a], F2_phase1(c)]
      PhysicalAggregate[group=[a,b], F1_phase2(c)]
        Exchange[dist[a,b]]
          ...

// Candidate 4: top agg is two-pass, bottom agg is two-pass
PhysicalAggregate[group=[a], F2_phase2(c)]
  Exchange[dist[a]]
    PhysicalAggregate[group=[a], F2_phase1(c)]
      PhysicalAggregate[group=[a,b], F1_phase2(c)]
        Exchange[dist[a,b]]
          PhysicalAggregate[group=[a,b], F1_phase1(c)]
            ...

Step 2: No matter which aggregation is built first, the Calcite framework passes the HASH_DISTRIBUTED[a] trait requirement through the bottom aggregation, for both implementations. Note that a concrete physical node only needs to consider its own implementation. And we get two more valid candidates:

// Candidate 5: passThrough called on Candidate 1
PhysicalAggregate[group=[a], F2_phase2(c)]
  PhysicalAggregate[group=[a,b], F1_phase2(c)]  // delivers dist[a]
    Exchange[dist[a]]
      ...

// Candidate 6:
// passThrough called on Candidate 2
PhysicalAggregate[group=[a], F2_phase2(c)]
  PhysicalAggregate[group=[a,b], F1_phase2(c)]  // delivers dist[a]
    Exchange[dist[a]]
      PhysicalAggregate[group=[a,b], F1_phase1(c)]
        ...

Step 3: The cost model chooses the best candidate. Note that Candidate 5 is not always the best. For example, when it is detected, from stats or otherwise, that data is skewed on key [a], Candidate 2 may be better. When it is detected that NDV(a, b) = 0.99 * ROWCOUNT(), Candidate 6 is preferred, as the partial aggregate would reduce little data. So it is not wasteful to build all those candidates.

Most of the above work is done by the Calcite framework. Users only need to:
1. Fire both implementations during aggregation builds.
2. Override the passThroughTraits method.

Thanks,
Jinpeng Wu

On Thu, May 27, 2021 at 8:19 AM Haisheng Yuan <hy...@apache.org> wrote:

> Another point I would like to mention is that it is not recommended to
> override the methods "passThrough" and "derive" directly; override
> "passThroughTraits" and "deriveTraits" instead, so that we can make sure
> only the same type of physical node is created and no nested relnodes or
> additional RelSets are created, unless you know you have to create a
> different type of node. For example, if the table foo has a btree index
> on column a, and the parent relnode is requesting ordering on column a,
> then we may consider overriding "passThrough" of TableScan to return an
> IndexScan instead of a TableScan.
>
> Regards,
> Haisheng Yuan
>
> On 2021/05/26 22:45:20, Haisheng Yuan <hy...@apache.org> wrote:
> > Hi Vladimir,
> >
> > 1. You need a logical rule to split the aggregate into a local aggregate
> > and a global aggregate, for example:
> > https://github.com/greenplum-db/gporca/blob/master/libgpopt/src/xforms/CXformSplitGbAgg.cpp
> > Only implementation rules can convert a logical node to a physical node
> > or multiple physical nodes.
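The pass-through check that steps 1 and 2 rely on can be sketched concretely. The snippet below is a toy model, not Calcite's API (the class and method names are made up): it captures only the core rule that a required hash distribution may be passed through an aggregate when every required key is a group key, which is exactly why group=[a,b] can deliver dist[a] in Candidates 5 and 6.

```java
import java.util.List;

public class PassThroughSketch {
    // Toy model of a passThroughTraits-style check for a physical aggregate:
    // a required hash distribution can be passed through only when all
    // required keys are group keys, because only group keys survive the
    // aggregation. Returns the distribution to request from the child,
    // or null when the request cannot be passed through.
    static List<String> passThroughDistribution(List<String> groupKeys,
                                                List<String> requiredKeys) {
        if (!requiredKeys.isEmpty() && groupKeys.containsAll(requiredKeys)) {
            return requiredKeys; // e.g. group=[a,b] can deliver dist[a]
        }
        return null; // request on a non-group column: needs an Exchange instead
    }

    public static void main(String[] args) {
        // Candidates 5/6: HASH_DISTRIBUTED[a] passed through group=[a,b].
        System.out.println(passThroughDistribution(List.of("a", "b"), List.of("a"))); // prints [a]
        // A request on a non-group column cannot be passed through.
        System.out.println(passThroughDistribution(List.of("a", "b"), List.of("c"))); // prints null
    }
}
```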
> > After physical implementation, you have 2 physical alternatives:
> > 1) a single-phase global physical aggregate,
> > 2) a 2-phase physical aggregate with a local and a global aggregate.
> > It should be up to the cost model to decide which one to choose.
> >
> > 2. Given a desired traitset from the parent node, the current relnode only
> > needs to generate a single relnode after passing down the traitset. Given a
> > traitset delivered by a child node, the current relnode only derives a
> > single relnode. Quite unlike other optimizers, in Calcite's top-down
> > optimizer you don't need to worry about issuing multiple optimization
> > requests to inputs; that is handled by the Calcite framework secretly. E.g.:
> > SELECT a, b, min(c) from foo group by a, b;
> > In many other optimizers, we would probably need to ask the aggregate to
> > issue 3 distribution requests for the tablescan on foo, which are
> > 1) hash distributed by a,
> > 2) hash distributed by b,
> > 3) hash distributed by a, b.
> > However, in Calcite's top-down optimizer, your physical implementation rule
> > for the global aggregate only needs to generate a single physical node with
> > hash distribution by a, b. In case the table foo happens to be distributed
> > by a, or by b, the derive() method will tell you there is an opportunity.
> > This is a feature where Calcite's top-down optimizer excels over other
> > optimizers, because it can dramatically reduce the search space while
> > keeping the optimal optimization opportunity.
> >
> > 3. This is by design. Nodes produced from "passThrough" and "derive" are
> > just sibling physical nodes with different traitsets; we only need the
> > initial physical nodes after implementation to avoid unnecessary
> > operations. The fundamental reason is that, unlike the Orca optimizer,
> > where a physical node and a physical property are separate things,
> > Calcite's logical/physical nodes contain traitsets. With regard to the
> > latter question, can you give an example?
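The local/global split in point 1 works because aggregates like min are decomposable: the global min of the partition-local mins equals the min over all rows. A small plain-Java illustration (grouping is omitted for brevity; no Calcite types involved):

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class TwoPhaseMin {
    // Phase 1 (local aggregate): each node computes min(c) over its rows.
    static int localMin(List<Integer> partition) {
        return partition.stream().min(Comparator.naturalOrder()).orElseThrow();
    }

    // Phase 2 (global aggregate): combine the partial results.
    static int globalMin(List<Integer> localMins) {
        return localMins.stream().min(Comparator.naturalOrder()).orElseThrow();
    }

    public static void main(String[] args) {
        // Three partitions of column c, as if foo were distributed on some key.
        List<List<Integer>> partitions =
                List.of(List.of(7, 3), List.of(9, 4), List.of(5));

        int twoPhase = globalMin(partitions.stream()
                .map(TwoPhaseMin::localMin)
                .collect(Collectors.toList()));
        int onePhase = partitions.stream()
                .flatMap(List::stream)
                .min(Comparator.naturalOrder())
                .orElseThrow();

        // min is decomposable: both strategies agree.
        System.out.println(twoPhase == onePhase); // prints true
    }
}
```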
> >
> > Regards,
> > Haisheng Yuan
> >
> > On 2021/05/26 20:11:57, Vladimir Ozerov <ppoze...@gmail.com> wrote:
> > > Hi,
> > >
> > > I tried to optimize a certain combination of operators for the
> > > distributed engine and got stuck with the trait propagation in the
> > > top-down engine. I want to ask the community for advice on whether the
> > > problem is solvable with the current Apache Calcite implementation or not.
> > >
> > > Consider the following logical tree:
> > > 3: LogicalAggregate[group=[a], F2(c)]
> > > 2: LogicalAggregate[group=[a,b], F1(c)]
> > > 1: LogicalScan[t]
> > >
> > > Consider that these two aggregates cannot be merged or simplified for
> > > whatever reason. We have only a set of physical rules to translate this
> > > logical tree to a physical tree. Also, there could be any number of
> > > other operators between these two aggregates. We omit them for clarity,
> > > assuming that the distribution is not destroyed.
> > >
> > > In the distributed environment, non-collocated aggregates are often
> > > implemented in two phases: local pre-aggregation and final aggregation,
> > > with an exchange in between. Consider that the Scan operator is hash
> > > distributed by some key other than [a] or [b]. If we optimize operators
> > > without considering the whole plan, we may optimize each operator
> > > independently, which would give us the following plan:
> > > 3: PhysicalAggregate[group=[a], F2_phase2(c)]       // HASH_DISTRIBUTED [a]
> > > 3: Exchange[a]                                      // HASH_DISTRIBUTED [a]
> > > 3: PhysicalAggregate[group=[a], F2_phase1(c)]       // HASH_DISTRIBUTED [a,b]
> > > 2: PhysicalAggregate[group=[a,b], F1_phase2(c)]     // HASH_DISTRIBUTED [a,b]
> > > 2: Exchange[a, b]                                   // HASH_DISTRIBUTED [a,b]
> > > 2: PhysicalAggregate[group=[a,b], F1_phase1(c)]     // HASH_DISTRIBUTED [d]
> > > 1: PhysicalScan[t]                                  // HASH_DISTRIBUTED [d]
> > >
> > > This plan is not optimal, because we re-hash inputs twice.
> > > A better plan that we want to get:
> > > 3: PhysicalAggregate[group=[a], F2(c)]              // HASH_DISTRIBUTED [a]
> > > 2: PhysicalAggregate[group=[a,b], F1_phase2(c)]     // HASH_DISTRIBUTED [a]
> > > 2: Exchange[a]                                      // HASH_DISTRIBUTED [a]
> > > 2: PhysicalAggregate[group=[a,b], F1_phase1(c)]     // HASH_DISTRIBUTED [d]
> > > 1: PhysicalScan[t]                                  // HASH_DISTRIBUTED [d]
> > >
> > > In this case, we take advantage of the fact that the distribution [a] is
> > > compatible with [a,b]. Therefore we may enforce only [a], instead of
> > > doing [a,b] and then [a]. Since exchange operators are very expensive,
> > > this optimization may bring a significant boost to the query engine. Now
> > > the question: how do we reach that state? Intuitively, a pass-through is
> > > exactly what we need. We may pass the optimization request from the top
> > > aggregate to the bottom aggregate to find physical implementations shared
> > > by [a]. But the devil is in the details: when and how exactly to pass
> > > this request?
> > >
> > > Typically, we have a conversion rule that converts a logical aggregate
> > > to a physical aggregate. We may invoke "convert" on the input to
> > > initiate the pass-through:
> > >
> > > RelNode convert(...) {
> > >     return new PhysicalAggregate(
> > >         convert(input, HASH_DISTRIBUTED[a])
> > >     );
> > > }
> > >
> > > The first problem: we cannot create the normal physical aggregate here
> > > because we do not know the input traits yet. The final decision whether
> > > to do a one-phase or two-phase aggregate can be made only in the
> > > "PhysicalNode.derive" method, when concrete input traits are resolved.
> > > Therefore the converter rule should create a kind of "template" physical
> > > operator, which would be used to construct the final operator(s) when
> > > input traits are resolved.
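The compatibility fact the better plan depends on ("distribution [a] is compatible with [a,b]") can be verified with a toy simulation, not using Calcite's API: when rows are partitioned by hashing a alone, no (a, b) group can span two partitions, so a per-partition aggregation by (a, b) is already globally complete and no Exchange[a, b] is needed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DistributionCompat {
    record Row(String a, String b, int c) {}

    // Group rows by (a, b), summing c.
    static Map<List<String>, Integer> aggregate(List<Row> rows) {
        Map<List<String>, Integer> out = new HashMap<>();
        for (Row r : rows) {
            out.merge(List.of(r.a(), r.b()), r.c(), Integer::sum);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(
                new Row("x", "p", 1), new Row("x", "p", 2),
                new Row("x", "q", 3), new Row("y", "p", 4));

        // HASH_DISTRIBUTED[a]: assign each row to a node by hashing a only.
        int nodes = 2;
        List<List<Row>> partitions = new ArrayList<>();
        for (int i = 0; i < nodes; i++) partitions.add(new ArrayList<>());
        for (Row r : rows) partitions.get(Math.floorMod(r.a().hashCode(), nodes)).add(r);

        // Rows sharing (a, b) also share a, so no (a, b) group spans two
        // nodes and per-node aggregation by (a, b) is already complete.
        Map<List<String>, Integer> distributed = new HashMap<>();
        for (List<Row> p : partitions) distributed.putAll(aggregate(p));

        System.out.println(distributed.equals(aggregate(rows))); // prints true
    }
}
```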
> > > AFAIU, Enumerable works similarly: we create operators with virtually
> > > arbitrary traits taken from logical nodes in the conversion rules. Only
> > > later do we create normal nodes in the derive() methods.
> > >
> > > The second problem: our top aggregate doesn't actually need the
> > > HASH_DISTRIBUTED[a] input. Instead, it may accept inputs with any
> > > distribution. What we really need is to inform the input (the bottom
> > > aggregate) that it should look for additional implementations that
> > > satisfy HASH_DISTRIBUTED[a]. Therefore, enforcing a specific distribution
> > > on the input using the "convert" method is not what we need, because this
> > > conversion might enforce unnecessary exchanges.
> > >
> > > The third problem: derivation. Consider that we delivered the
> > > optimization request to the bottom aggregate. As an implementor, what am
> > > I supposed to do in this method? I cannot return the final aggregate from
> > > here because the real input traits are not derived yet. Therefore, I can
> > > only return another template, hoping that the "derive" method will be
> > > called on it. However, this will not happen because trait derivation is
> > > skipped on the nodes emitted from pass-through. See "DeriveTrait.perform" [1].
> > >
> > > BottomAggregate {
> > >     RelNode passThrough(distribution=HASH_DISTRIBUTED[a]) {
> > >         // ???
> > >     }
> > > }
> > >
> > > I feel that I am either going in the wrong direction, or some gaps in
> > > the product disallow such optimization. So I would like to ask the
> > > community to assist with the following questions:
> > > 1. In the top-down optimizer, how should we convert a logical node to a
> > > physical node, provided that "derive" is not called yet? I have a gut
> > > feeling that the trait propagation is currently not implemented to the
> > > full extent, because based on the Cascades paper I would expect that
> > > parent physical nodes are produced after the child physical nodes.
> > > But in our rules, this is not the case: some physical nodes are produced
> > > before the trait derivation.
> > > 2. How to propagate several optimization requests to inputs? We need
> > > either inputs with a specific distribution or inputs with an arbitrary
> > > distribution in the example above. It seems that to achieve that, I need
> > > to emit several alternative nodes with different requirements to inputs.
> > > Does it make sense?
> > > 3. Why are nodes produced from the "passThrough" method excluded from
> > > trait derivation? If this is by design, how can I preserve the
> > > optimization request to satisfy it at the derivation stage, when input
> > > traits are available?
> > >
> > > Regards,
> > > Vladimir.
> > >
> > > [1]
> > > https://github.com/apache/calcite/blob/calcite-1.26.0/core/src/main/java/org/apache/calcite/plan/volcano/TopDownRuleDriver.java#L828
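The decision Vladimir wants to defer (one-phase vs two-phase) is exactly what can be made once child traits are concrete. A toy sketch of that derive-time choice follows; the names and the Plan enum are made up for illustration and are not Calcite's API:

```java
import java.util.List;

public class DeriveSketch {
    enum Plan { ONE_PHASE, TWO_PHASE }

    // Toy model of the choice a "template" aggregate could make at
    // derivation time, once the child's delivered distribution is known:
    // a collocated input (its hash keys are a subset of the group keys)
    // needs no exchange, so a single phase suffices; any other input
    // falls back to partial + final aggregation around an exchange.
    static Plan derive(List<String> groupKeys, List<String> childDistKeys) {
        boolean collocated =
                !childDistKeys.isEmpty() && groupKeys.containsAll(childDistKeys);
        return collocated ? Plan.ONE_PHASE : Plan.TWO_PHASE;
    }

    public static void main(String[] args) {
        // Scan distributed by [d]: group by [a,b] requires two phases.
        System.out.println(derive(List.of("a", "b"), List.of("d"))); // prints TWO_PHASE
        // Input already distributed by [a]: one phase is enough.
        System.out.println(derive(List.of("a", "b"), List.of("a"))); // prints ONE_PHASE
    }
}
```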