[ https://issues.apache.org/jira/browse/FLINK-34665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17826981#comment-17826981 ]
Jacky Lau edited comment on FLINK-34665 at 3/14/24 7:46 AM:
------------------------------------------------------------

Hi [~jark] [~xuyangzhong], what do you think? If you think it is OK, I can do it.


was (Author: jackylau):
    hi [~jark] [~xuyangzhong] what do you think?

> Add streaming rule for union to Rand and it convert to StreamExecDeduplicate finally
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-34665
>                 URL: https://issues.apache.org/jira/browse/FLINK-34665
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.20.0
>            Reporter: Jacky Lau
>            Priority: Major
>             Fix For: 1.20.0
>
>
> The semantics of UNION in SQL include deduplication. In Calcite, when a SQL
> node is converted to a RelNode, a distinct Aggregate is inserted above the
> Union to perform this deduplication. In Flink, the distinct Aggregate is
> eventually converted into a StreamExecGroupAggregate operator. This operator
> accesses the state multiple times, and in the many jobs we have observed,
> the stack is often stuck at state access. The reason is that the key of the
> distinct aggregate is all the fields of the union, so the state key is
> relatively large, and repeated state access and key comparison are
> time-consuming.
> A potential optimization is to add a rule that converts the Union into a
> Rank on processing time, which is then ultimately converted into a
> StreamExecDeduplicate. Currently, we have users rewrite their SQL to use
> ROW_NUMBER for deduplication, and this approach works very well. It is
> therefore possible to add a rule at the engine level to support this
> optimization.
>
> However, this changes the plan, which can cause jobs to fail when users
> upgrade their Flink version. I therefore suggest adding a flag whose default
> value keeps the current behavior.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
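
For illustration, here is a minimal sketch of the manual rewrite the issue describes: a plain UNION is replaced by UNION ALL plus a ROW_NUMBER filter that keeps the first row per full-row key. It runs on SQLite through Python's `sqlite3` module purely as a batch stand-in, not on Flink. The tables `t1` and `t2` and the columns `a` and `b` are hypothetical, and it assumes the bundled SQLite supports window functions (3.25 or later). In a Flink streaming job the ORDER BY inside the window would be a processing-time attribute instead of a regular column.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE t1 (a INTEGER, b TEXT);
    CREATE TABLE t2 (a INTEGER, b TEXT);
    INSERT INTO t1 VALUES (1, 'x'), (2, 'y'), (2, 'y');
    INSERT INTO t2 VALUES (2, 'y'), (3, 'z');
""")

# Plain UNION: deduplication is implied by SQL semantics.
union_rows = conn.execute(
    "SELECT a, b FROM t1 UNION SELECT a, b FROM t2"
).fetchall()

# Manual rewrite: UNION ALL, then keep only the first row per key,
# where the key is every column of the union.
# ORDER BY a is a placeholder for the processing-time attribute used in Flink.
dedup_rows = conn.execute("""
    SELECT a, b FROM (
        SELECT a, b,
               ROW_NUMBER() OVER (PARTITION BY a, b ORDER BY a) AS rn
        FROM (SELECT a, b FROM t1 UNION ALL SELECT a, b FROM t2)
    ) WHERE rn = 1
""").fetchall()

# Both queries should return the same set of distinct rows.
same = sorted(union_rows) == sorted(dedup_rows)
print(same, sorted(dedup_rows))
```

The sketch only checks that the two queries return the same rows. It says nothing about the state-access cost discussed in the issue, which depends on the Flink operators involved.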