[
https://issues.apache.org/jira/browse/FLINK-34702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17883911#comment-17883911
]
lincoln lee commented on FLINK-34702:
-------------------------------------
Thank you for bringing this up again!
First of all, I think we can all agree that deduplicate is a special case of
rank.
Second, the options we've discussed so far are not ideal (whether it's
physical dedup -> exec or logical -> physical dedup, each has to do something
special to change the conversion).
Going back to the problem itself, something new came to mind: maybe we
should consider removing the physical dedup node and keeping only the
physical rank node (because dedup is just one specialized execution/operator
that applies when several conditions are satisfied: the input changelog mode,
rank range, sort spec, and other traits).
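As a rough sketch of that idea (all names below are hypothetical and are not
the planner's actual classes, and the exact set of conditions is an assumption
for illustration), a single rank node could pick its operator from its traits
at translation time:

```java
// Hypothetical sketch: none of these names are Flink planner APIs.
public class RankOperatorChoice {

    /** The operator a single physical rank node could be translated to. */
    enum Operator { DEDUPLICATE, RANK }

    /**
     * Picks the operator for a rank node from its traits.
     * Assumed conditions for the specialized dedup operator: insert-only
     * input, rank range exactly [1, 1], and a sort on a time attribute.
     */
    static Operator choose(boolean inputInsertOnly, long rankStart, long rankEnd,
                           boolean sortsOnTimeAttribute) {
        boolean keepsOnlyFirstRow = rankStart == 1 && rankEnd == 1;
        if (inputInsertOnly && keepsOnlyFirstRow && sortsOnTimeAttribute) {
            return Operator.DEDUPLICATE;
        }
        // Everything else, including inputs with update changes, stays a general rank.
        return Operator.RANK;
    }

    public static void main(String[] args) {
        // The query from the issue: rowNum = 1 ordered by PROCTIME(), but the
        // input is a GroupAggregate, which produces update changes.
        System.out.println(choose(false, 1, 1, true)); // prints RANK
    }
}
```

With this shape the failing query from the issue would simply fall through to
the general rank operator instead of hitting the unsupported dedup path.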
I'm preparing a draft PR based on this idea. It may take some time (probably a
day or two), but I think it can completely solve the problems.
> Rank should not convert to StreamExecDuplicate when the input is not insert
> only
> --------------------------------------------------------------------------------
>
> Key: FLINK-34702
> URL: https://issues.apache.org/jira/browse/FLINK-34702
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.20.0
> Reporter: Jacky Lau
> Priority: Major
> Labels: pull-request-available
> Fix For: 2.0.0
>
>
> {code:java}
> @Test
> def testSimpleFirstRowOnBuiltinProctime1(): Unit = {
>   val sqlQuery =
>     """
>       |SELECT *
>       |FROM (
>       |  SELECT *,
>       |    ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum
>       |  FROM (select a, count(b) as b from MyTable group by a)
>       |)
>       |WHERE rowNum = 1
>     """.stripMargin
>   util.verifyExecPlan(sqlQuery)
> }
> {code}
> Exception:
> org.apache.flink.table.api.TableException: StreamPhysicalDeduplicate doesn't
> support consuming update changes which is produced by node
> GroupAggregate(groupBy=[a], select=[a, COUNT(b) AS b])
> This happens because StreamPhysicalDeduplicate cannot consume update changes
> yet, while StreamExecRank can.
> So we should not convert the FlinkLogicalRank to StreamPhysicalDeduplicate in
> this case. We can defer checking whether the input contains update changes
> to the "optimize the physical plan" phase.
> We can add an option to solve it, and once StreamPhysicalDeduplicate
> supports consuming update changes, we can deprecate that option.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)