Hi,

I tried to optimize a certain combination of operators for the distributed
engine and got stuck with trait propagation in the top-down engine. I want
to ask the community for advice on whether the problem is solvable with the
current Apache Calcite implementation.

Consider the following logical tree:
3: LogicalAggregate[group=[a], F2(c)]
2:  LogicalAggregate[group=[a,b], F1(c)]
1:    LogicalScan[t]

Consider that these two aggregates cannot be merged or simplified for
whatever reason. We have only a set of physical rules to translate this
logical tree to a physical tree. Also, there could be any number of
other operators between these two aggregates. We omit them for clarity,
assuming that the distribution is not destroyed.

In a distributed environment, non-collocated aggregates are often
implemented in two phases: local pre-aggregation and final aggregation,
with an exchange in between. Consider that the Scan operator is
hash-distributed by some key other than [a] or [b]. If we optimize
operators without considering the whole plan, we may optimize each operator
independently, which would give us the following plan:
3: PhysicalAggregate[group=[a], F2_phase2(c)]             // HASH_DISTRIBUTED [a]
3:   Exchange[a]                                          // HASH_DISTRIBUTED [a]
3:     PhysicalAggregate[group=[a], F2_phase1(c)]         // HASH_DISTRIBUTED [a,b]
2:       PhysicalAggregate[group=[a,b], F1_phase2(c)]     // HASH_DISTRIBUTED [a,b]
2:         Exchange[a, b]                                 // HASH_DISTRIBUTED [a,b]
2:           PhysicalAggregate[group=[a,b], F1_phase1(c)] // HASH_DISTRIBUTED [d]
1:             PhysicalScan[t]                            // HASH_DISTRIBUTED [d]

This plan is not optimal because we re-hash the data twice. The better plan
we want to get is:
3: PhysicalAggregate[group=[a], F2(c)]                // HASH_DISTRIBUTED [a]
2:   PhysicalAggregate[group=[a,b], F1_phase2(c)]     // HASH_DISTRIBUTED [a]
2:     Exchange[a]                                    // HASH_DISTRIBUTED [a]
2:       PhysicalAggregate[group=[a,b], F1_phase1(c)] // HASH_DISTRIBUTED [d]
1:         PhysicalScan[t]                            // HASH_DISTRIBUTED [d]

In this case, we take advantage of the fact that the distribution [a] is
compatible with [a,b]: rows that are equal on [a,b] are also equal on [a],
so they are already collocated. Therefore, we may enforce only [a] instead
of doing [a,b] and then [a]. Since exchange operators are very expensive,
this optimization may bring a significant boost to the query engine. Now
the question is: how do we reach that state? Intuitively, a pass-through is
exactly what we need. We may pass the optimization request from the top
aggregate to the bottom aggregate to find physical implementations
hash-distributed by [a]. But the devil is in the details - when and how
exactly should we pass this request?
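
For reference, the compatibility check I have in mind looks roughly like
this (a sketch; "satisfiesGrouping" is a hypothetical helper, not a Calcite
API):

import java.util.List;

import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.util.ImmutableBitSet;

final class DistributionCompat {
    private DistributionCompat() {}

    // An input hash-distributed by "dist" is already collocated for a
    // GROUP BY on "groupSet" when the distribution keys are a non-empty
    // subset of the group keys: HASH_DISTRIBUTED[a] satisfies GROUP BY
    // [a, b], but not vice versa.
    static boolean satisfiesGrouping(RelDistribution dist, ImmutableBitSet groupSet) {
        if (dist.getType() != RelDistribution.Type.HASH_DISTRIBUTED) {
            return false;
        }
        List<Integer> keys = dist.getKeys();
        return !keys.isEmpty() && groupSet.contains(ImmutableBitSet.of(keys));
    }
}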

Typically, we have a conversion rule that converts a logical aggregate to a
physical aggregate. We may invoke "convert" on the input to initiate the
pass-through:

RelNode convert(...) {
    return new PhysicalAggregate(
        convert(input, HASH_DISTRIBUTED[a])
    );
}
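
For concreteness, a fleshed-out version of this rule could look roughly as
follows (a sketch only: PHYSICAL and PhysicalAggregate stand for our
engine-specific convention and operator, they are not Calcite classes):

import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelDistributions;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.convert.ConverterRule;
import org.apache.calcite.rel.logical.LogicalAggregate;

public class PhysicalAggregateRule extends ConverterRule {
    protected PhysicalAggregateRule(Config config) {
        super(config);
    }

    @Override public RelNode convert(RelNode rel) {
        LogicalAggregate agg = (LogicalAggregate) rel;
        // Eagerly request a hash distribution on the group keys from the
        // input. PHYSICAL and PhysicalAggregate are our engine's classes.
        RelTraitSet required = agg.getInput().getTraitSet()
            .replace(PHYSICAL)
            .replace(RelDistributions.hash(agg.getGroupSet().asList()));
        return new PhysicalAggregate(
            agg.getCluster(),
            agg.getTraitSet().replace(PHYSICAL),
            convert(agg.getInput(), required),
            agg.getGroupSet(),
            agg.getGroupSets(),
            agg.getAggCallList());
    }
}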

The first problem - we cannot create the normal physical aggregate here
because we do not know the input traits yet. The final decision on whether
to do a one-phase or a two-phase aggregate can be made only in the
"PhysicalNode.derive" method, when the concrete input traits are resolved.
Therefore, the converter rule should create a kind of "template" physical
operator, which would be used to construct the final operator(s) when the
input traits are resolved. AFAIU, Enumerable works similarly: in the
conversion rules we create operators with virtually arbitrary traits taken
from the logical nodes, and only later do we create the normal nodes in the
derive() methods.
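
To make the "template" idea concrete, here is roughly what I have in mind on
top of the PhysicalNode contract (a sketch: the class itself and the
onePhase/twoPhase builders are made up; PhysicalNode, Aggregate and the
trait classes are Calcite API):

import java.util.List;

import com.google.common.collect.ImmutableList;

import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.PhysicalNode;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelDistributionTraitDef;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.util.ImmutableBitSet;

// "Template" aggregate emitted by the conversion rule: it only records the
// logical properties; the real one-phase or two-phase plan is built in
// derive(), once the input traits are known.
public abstract class PhysicalAggregateTemplate extends Aggregate implements PhysicalNode {
    protected PhysicalAggregateTemplate(RelOptCluster cluster, RelTraitSet traitSet,
            RelNode input, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets,
            List<AggregateCall> aggCalls) {
        super(cluster, traitSet, ImmutableList.of(), input, groupSet, groupSets, aggCalls);
    }

    @Override public RelNode derive(RelTraitSet childTraits, int childId) {
        RelDistribution dist = childTraits.getTrait(RelDistributionTraitDef.INSTANCE);
        if (dist != null
                && dist.getType() == RelDistribution.Type.HASH_DISTRIBUTED
                && getGroupSet().contains(ImmutableBitSet.of(dist.getKeys()))) {
            // The input is already distributed by a subset of the group keys:
            // a single collocated aggregate is enough.
            return onePhase(childTraits);
        }
        // Otherwise: local pre-aggregation + exchange + final aggregation.
        return twoPhase(childTraits);
    }

    // Hypothetical builders of the final (non-template) operators.
    protected abstract RelNode onePhase(RelTraitSet childTraits);
    protected abstract RelNode twoPhase(RelTraitSet childTraits);
}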

The second problem - our top aggregate does not actually need a
HASH_DISTRIBUTED[a] input. Instead, it may accept inputs with any
distribution. What we really need is to inform the input (the bottom
aggregate) that it should look for additional implementations that satisfy
HASH_DISTRIBUTED[a]. Therefore, enforcing a specific distribution on the
input using the "convert" method is not what we need, because this
conversion might introduce unnecessary exchanges.
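
The difference in code is roughly the following (a sketch; field ordinal 0
stands for column "a", and it assumes the distribution trait def is
registered with the planner):

import com.google.common.collect.ImmutableList;

import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelDistributions;
import org.apache.calcite.rel.RelNode;

final class InputRequests {
    private InputRequests() {}

    // What the rule above does: demand HASH_DISTRIBUTED[a], possibly forcing
    // an Exchange even though the parent works with any distribution.
    static RelNode demandHashed(RelNode input) {
        RelTraitSet required = input.getTraitSet()
            .replace(RelDistributions.hash(ImmutableList.of(0)));
        return RelOptRule.convert(input, required);
    }

    // What the parent actually needs for itself: any distribution at all ...
    static RelNode acceptAny(RelNode input) {
        return RelOptRule.convert(input,
            input.getTraitSet().replace(RelDistributions.ANY));
    }

    // ... plus, separately, a *hint* for the input to also explore
    // HASH_DISTRIBUTED[a] implementations. Expressing that hint is exactly
    // the open question of this mail.
}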

The third problem - derivation. Consider that we delivered the optimization
request to the bottom aggregate. As an implementor, what am I supposed to do
in its passThrough method? I cannot return the final aggregate from here,
because the real input traits are not derived yet. Therefore, I can only
return another template, hoping that the "derive" method will be called on
it. However, this will not happen, because trait derivation is skipped for
nodes emitted from pass-through. See "DeriveTrait.perform" [1].

BottomAggregate {
    // required = HASH_DISTRIBUTED[a]
    RelNode passThrough(RelTraitSet required) {
        // ???
    }
}
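
The best I can come up with is to return yet another template that merely
carries the requested distribution, e.g. (a sketch of the method body; it
only works if derive() is invoked on the returned node afterwards, which is
exactly what does not happen today):

@Override public RelNode passThrough(RelTraitSet required) {
    RelDistribution dist = required.getTrait(RelDistributionTraitDef.INSTANCE);
    if (dist == null || dist.getType() != RelDistribution.Type.HASH_DISTRIBUTED) {
        return null; // nothing useful to offer for this request
    }
    // Hand back another "template" that satisfies the requested distribution,
    // hoping that derive() will later be called on it with the real input
    // traits.
    return copy(getTraitSet().replace(dist), getInputs());
}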

I feel that I am either going in the wrong direction, or there are gaps in
the product that prevent such an optimization. So I would like to ask the
community to assist with the following questions:
1. In the top-down optimizer, how should we convert a logical node to a
physical node, given that "derive" has not been called yet? I have a gut
feeling that trait propagation is currently not implemented to the full
extent, because based on the Cascades paper I would expect parent physical
nodes to be produced after the child physical nodes. But in our rules this
is not the case - some physical nodes are produced before trait derivation.
2. How do we propagate several optimization requests to inputs? In the
example above, we need either an input with a specific distribution or an
input with an arbitrary distribution. It seems that to achieve this, I need
to emit several alternative nodes with different requirements on their
inputs (see the sketch after these questions). Does that make sense?
3. Why are nodes produced from the "passThrough" method excluded from trait
derivation? If this is by design, how can I preserve the optimization
request so that it can be satisfied at the derivation stage, when the input
traits are available?
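
For question 2, what I have in mind is roughly a rule that registers two
alternatives with different input requests (a sketch; createTemplate would
build the hypothetical template aggregate from above):

import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelRule;
import org.apache.calcite.rel.RelDistributions;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.logical.LogicalAggregate;

public class AggregateConversionRule extends RelRule<RelRule.Config> {
    protected AggregateConversionRule(Config config) {
        super(config);
    }

    @Override public void onMatch(RelOptRuleCall call) {
        LogicalAggregate agg = call.rel(0);
        RelNode input = agg.getInput();

        // Alternative 1: accept an input with an arbitrary distribution
        // (falls back to the two-phase aggregate).
        call.transformTo(createTemplate(agg,
            convert(input, input.getTraitSet().replace(RelDistributions.ANY))));

        // Alternative 2: ask for an input hash-distributed by the group keys
        // (enables the collocated one-phase aggregate).
        call.transformTo(createTemplate(agg,
            convert(input, input.getTraitSet()
                .replace(RelDistributions.hash(agg.getGroupSet().asList())))));
    }

    private RelNode createTemplate(LogicalAggregate agg, RelNode input) {
        // Would create the "template" physical aggregate described earlier.
        throw new UnsupportedOperationException("sketch");
    }
}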

Regards,
Vladimir.

[1]
https://github.com/apache/calcite/blob/calcite-1.26.0/core/src/main/java/org/apache/calcite/plan/volcano/TopDownRuleDriver.java#L828
