Hi devs,
I want to discuss a new way to optimize multi-sink queries in Flink.
Currently, Flink uses the RelNodeBlock mechanism to optimize
multi-sink queries.
It has the following steps:
1. The multi-sink query is parsed and validated into multiple RelNode trees.
2. If table.optimizer.reuse-optimize-block-with-digest-enabled is enabled,
the common nodes of these RelNode trees are merged into a single node.
3. The RelNode trees are split into multiple RelNodeBlocks.
4. Each RelNodeBlock is fed to the Calcite planner for optimization.
5. The original structure is reconstructed from the optimized RelNodeBlocks.
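To illustrate steps 3 to 5, here is a toy Python model of how a DAG can be cut into tree-shaped blocks. It is a simplified sketch, not Flink's actual RelNodeBlock builder: `Node`, `count_parents` and `split_into_blocks` are hypothetical names, and the only cutting rule assumed is "a node with more than one parent becomes the root of its own block", which is roughly where the current mechanism cuts the plan so each piece is a tree Calcite can handle.

```python
from dataclasses import dataclass, field


@dataclass(eq=False)  # eq=False: nodes compare by identity, like shared plan nodes
class Node:
    """Hypothetical plan node: a name plus its input nodes."""
    name: str
    inputs: list = field(default_factory=list)


def count_parents(sinks):
    """Count how many parent edges point at each node in the DAG."""
    parents, seen, stack = {}, set(), list(sinks)
    while stack:
        node = stack.pop()
        if id(node) in seen:
            continue
        seen.add(id(node))
        for child in node.inputs:
            parents[id(child)] = parents.get(id(child), 0) + 1
            stack.append(child)
    return parents


def split_into_blocks(sinks):
    """Cut the DAG at shared nodes; each block is a tree rooted at a sink
    or at a shared node. Returns {block root name: node names in the block}."""
    parents = count_parents(sinks)
    blocks, done, roots = {}, set(), list(sinks)
    while roots:
        root = roots.pop()
        if id(root) in done:
            continue
        done.add(id(root))
        members, stack = [], [root]
        while stack:
            node = stack.pop()
            members.append(node.name)
            for child in node.inputs:
                if parents.get(id(child), 0) > 1:
                    roots.append(child)  # shared node: it starts its own block
                else:
                    stack.append(child)  # private input: stays in this block
        blocks[root.name] = members
    return blocks


# The example plan from this mail, after the two Scans were merged into one node.
scan = Node("Scan")
sink1 = Node("Sink1", [Node("Project(a,b)", [scan])])
sink2 = Node("Sink2", [Node("Project(a,b,c)", [scan])])
blocks = split_into_blocks([sink1, sink2])
print(blocks)
```

The shared Scan ends up alone in its block. Calcite then optimizes it without seeing either Project, which is the locality problem discussed below.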
As far as I know (please correct me if I'm wrong), RelNodeBlock serves two
main purposes:
- Calcite does not support DAG optimization, so RelNodeBlock splits the
multi-tree DAG into several single trees, which lets us leverage Calcite for
the optimization.
- In a multi-sink query, we need to avoid computing the same node
repeatedly. If table.optimizer.reuse-optimize-block-with-digest-enabled is
enabled, a common node is kept from being optimized into different results
in different blocks, which would otherwise lead to repeated computation.
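The digest-based reuse described above is essentially hash-consing: structurally equal sub-trees are recognized by a digest and collapsed into one shared node. The Python sketch below assumes a digest made of the operator description plus the digests of its inputs. That mirrors the idea behind Calcite's RelNode digest but not its exact format, and `Node`, `digest` and `merge_by_digest` are hypothetical names.

```python
from dataclasses import dataclass, field


@dataclass(eq=False)  # identity semantics: a merged node is one shared object
class Node:
    """Hypothetical plan node: an operator description plus its inputs."""
    desc: str
    inputs: list = field(default_factory=list)


def digest(node):
    """Structural digest: the operator description plus its inputs' digests."""
    return node.desc + "(" + ",".join(digest(i) for i in node.inputs) + ")"


def merge_by_digest(sinks):
    """Rebuild the trees so that sub-trees with equal digests share one node."""
    canonical = {}

    def visit(node):
        key = digest(node)
        if key not in canonical:
            canonical[key] = Node(node.desc, [visit(i) for i in node.inputs])
        return canonical[key]

    return [visit(s) for s in sinks]


# Two separately parsed trees that both read table T.
tree1 = Node("Sink1", [Node("Project[a,b]", [Node("Scan[T]")])])
tree2 = Node("Sink2", [Node("Project[a,b,c]", [Node("Scan[T]")])])
merged = merge_by_digest([tree1, tree2])
```

After merging, both Projects point at the same Scan object, so that Scan is computed once. The catch, as the rest of this mail explains, is that the shared node then has to be optimized on its own.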
However, in our production we found that RelNodeBlock optimization is not
powerful enough. As the comments of CommonSubGraphBasedOptimizer point out,
the optimization of a RelNodeBlock is local: there is no optimization across
RelNodeBlocks. Take a simple example:
    Sink                 Sink
      |                    |
    Project(a,b)         Project(a,b,c)
      |                    |
    Scan(a,b,c,d,e)      Scan(a,b,c,d,e)
Both branches scan the same table. With the current optimization, we can
only choose, before optimization, whether or not to merge the two Scans into
one shared RelNodeBlock.
If merged, the Scan cannot benefit from ProjectPushDown and similar rules.
If not merged, the two RelNodeBlocks are optimized independently and
generate two different scans, {a,b} and {a,b,c}, so the same table is read
twice.
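To make the trade-off in this example concrete, here is a toy Python comparison of the scans each strategy produces. It is not Flink code. The "cost" is simply the total number of columns read across all scans, which is a simplifying assumption, and the third function shows what an optimizer that could see across blocks might produce.

```python
SCAN_FIELDS = ["a", "b", "c", "d", "e"]
# Fields each sink's Project reads from the shared Scan (the example above).
PROJECT_FIELDS = [["a", "b"], ["a", "b", "c"]]


def merged_scan_reads(scan_fields, project_fields):
    """Scans merged into one shared block: that block is optimized without
    seeing its parents, so no projection is pushed in and every field is read."""
    return [list(scan_fields)]


def separate_scan_reads(scan_fields, project_fields):
    """Scans kept per block: each block pushes its own projection down,
    which yields one narrower scan per sink."""
    return [[f for f in scan_fields if f in cols] for cols in project_fields]


def shared_pruned_scan_reads(scan_fields, project_fields):
    """Cross-block optimization: one scan of the union of the needed fields."""
    needed = {f for cols in project_fields for f in cols}
    return [[f for f in scan_fields if f in needed]]


for strategy in (merged_scan_reads, separate_scan_reads, shared_pruned_scan_reads):
    scans = strategy(SCAN_FIELDS, PROJECT_FIELDS)
    # Toy cost: total number of columns read across all scans.
    print(strategy.__name__, scans, sum(len(s) for s in scans))
```

Under this toy measure, both current options read 5 columns in total, and the second one also scans the table twice. A single shared scan pruned to {a,b,c} reads only 3.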
So I'm proposing a new way to improve CTE optimization for multi-sink
queries (and single queries):
1. Insert a VirtualMultiSink to pack the sink nodes together, as described
in [2]. This is inspired by [3].
2. Insert a new Spool node (meaning "produced once, consumed multiple
times") above each RelNode that has multiple outputs.
3. Implement several rules around the Spool node:
   1. PushProjectToSpool, to prune the fields that none of the Spool
   node's parents use.
   2. PushFilterToSpool, to push down the disjunction (DNF) of the
   conditions of all the Spool node's parents.
   3. ...
4. Furthermore, we can implement a rule to discard the Spool, and then let
the planner decide whether or not to reuse it based on cost.
5. After the physical rewrite, we can remove the
PhysicalVirtualMultiSink and the Spool nodes.
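Steps 1 to 3 above can be sketched on the example plan with a toy Python model. Everything here is hypothetical: `Node`, `virtual_multi_sink`, `insert_spools`, `push_project_to_spool` and `push_filter_to_spool` are illustrative names, not Flink or Calcite APIs, and predicates are plain strings rather than RexNodes. The filter rule assumes a Spool may only drop rows that every parent would drop. It therefore pushes the OR of the parents' conditions, and each parent keeps its own filter on top.

```python
from dataclasses import dataclass, field


@dataclass(eq=False)  # identity semantics: a shared node is one object
class Node:
    """Hypothetical plan node used only for this sketch."""
    kind: str
    inputs: list = field(default_factory=list)


def virtual_multi_sink(sinks):
    """Step 1: pack all sinks under one root so the planner sees one plan."""
    return Node("VirtualMultiSink", list(sinks))


def _walk(root):
    """Return every node reachable from root exactly once."""
    seen, stack, out = set(), [root], []
    while stack:
        node = stack.pop()
        if id(node) not in seen:
            seen.add(id(node))
            out.append(node)
            stack.extend(node.inputs)
    return out


def insert_spools(root):
    """Step 2: put one Spool between each multi-parent node and its parents."""
    nodes = _walk(root)  # snapshot taken before any rewriting
    parent_count = {}
    for node in nodes:
        for child in node.inputs:
            parent_count[id(child)] = parent_count.get(id(child), 0) + 1
    spools = {}
    for node in nodes:
        for i, child in enumerate(node.inputs):
            if parent_count.get(id(child), 0) > 1:
                if id(child) not in spools:
                    spools[id(child)] = Node("Spool", [child])
                node.inputs[i] = spools[id(child)]
    return list(spools.values())


def push_project_to_spool(spool_fields, parent_needs):
    """Step 3.1 (sketch): keep only the fields that at least one parent reads."""
    needed = set().union(*parent_needs)
    return [f for f in spool_fields if f in needed]


def push_filter_to_spool(parent_conditions):
    """Step 3.2 (sketch): push the OR of the parents' predicates. A parent
    without a filter (None) needs every row, so nothing can be pushed."""
    if any(c is None for c in parent_conditions):
        return None
    return " OR ".join(f"({c})" for c in parent_conditions)


# The example plan from this mail: one Scan consumed by two Projects.
scan = Node("Scan(a,b,c,d,e)")
root = virtual_multi_sink([
    Node("Sink", [Node("Project(a,b)", [scan])]),
    Node("Sink", [Node("Project(a,b,c)", [scan])]),
])
spools = insert_spools(root)
print(push_project_to_spool(["a", "b", "c", "d", "e"], [{"a", "b"}, {"a", "b", "c"}]))
print(push_filter_to_spool(["a > 1", "b < 5"]))
```

Because the whole DAG now hangs off one root, a rule firing on the Spool can see all of its parents at once. That is what allows the shared Scan to be pruned to {a,b,c} instead of choosing between a wide shared scan and two separate narrow ones.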
The benefits of the new approach are:
1. Optimization happens on a single tree, so the limitation of local
optimization is avoided.
2. Cost-based CTE optimization becomes available.
3. The new solution can optimize CTEs in both multi-sink and single queries,
so the problem in [1] can also be resolved.
4. It avoids propagating traits between RelNodeBlocks, which the current
solution requires, as described in [4].
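To illustrate what "cost-based" could mean for the rule that discards the Spool, here is a toy Python decision between reusing a Spool and inlining one copy of the sub-plan per consumer. The cost model is entirely an assumption made for illustration: scan cost is rows times columns read, and reuse adds a hypothetical per-consumer `spool_overhead`. In the real proposal the planner's own cost model would make this choice.

```python
def reuse_cost(rows, spool_fields, consumers, spool_overhead):
    """Keep the Spool: compute the shared sub-plan once with the pruned fields,
    plus a hypothetical per-consumer overhead for fanning out its output."""
    return rows * len(spool_fields) + consumers * spool_overhead


def inline_cost(rows, per_consumer_fields):
    """Discard the Spool: each consumer gets its own copy of the sub-plan,
    optimized for that consumer alone (e.g. its own projection pushdown)."""
    return sum(rows * len(fields) for fields in per_consumer_fields)


def choose(rows, per_consumer_fields, spool_overhead):
    """Pick the cheaper plan under this toy model; ties favour reuse."""
    needed = []
    for fields in per_consumer_fields:
        for f in fields:
            if f not in needed:
                needed.append(f)
    reuse = reuse_cost(rows, needed, len(per_consumer_fields), spool_overhead)
    inline = inline_cost(rows, per_consumer_fields)
    return ("reuse" if reuse <= inline else "inline"), reuse, inline


# Overlapping fields: reuse wins (3200 vs 5000 under this model).
print(choose(1000, [["a", "b"], ["a", "b", "c"]], spool_overhead=100))
# Disjoint fields and an expensive fan-out: inlining wins (4000 vs 2000).
print(choose(1000, [["a"], ["b"]], spool_overhead=1000))
```

The point is only that neither choice is always right. Reuse pays off when consumers overlap heavily, and duplication can be cheaper when they share little. Today this has to be fixed before optimization, whereas with a Spool it can be decided by cost.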
Looking forward to your inputs.
Best,
Aitozi.
[1]: https://issues.apache.org/jira/projects/FLINK/issues/FLINK-29088
[2]: https://issues.apache.org/jira/projects/FLINK/issues/FLINK-31205
[3]: Efficient and Extensible Algorithms for Multi Query Optimization, https://arxiv.org/pdf/cs/9910021.pdf
[4]: https://issues.apache.org/jira/browse/FLINK-24048