[ 
https://issues.apache.org/jira/browse/FLINK-39966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18090771#comment-18090771
 ] 

lincoln lee edited comment on FLINK-39966 at 6/24/26 3:34 AM:
--------------------------------------------------------------

Fixed in master: e7084a6d634a33ec48e74a4bd1bc3c5f43759a03

2.2: 70f75a7ac37ed4c4620a88c72fe2d917fcb6346e

2.3: ac21574e20cf7ff660ba9a5e414916132c24375b


was (Author: lincoln.86xy):
Fixed in master: e7084a6d634a33ec48e74a4bd1bc3c5f43759a03

> FlinkRelMdModifiedMonotonicity wrongly reports a non-time-attribute Top-1 
> Rank as insert-only
> ---------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39966
>                 URL: https://issues.apache.org/jira/browse/FLINK-39966
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 2.3.0, 2.2.1, 2.1.3
>            Reporter: lincoln lee
>            Assignee: lincoln lee
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 2.2.2, 2.3.1, 2.4.0
>
>
> FLINK-34702 removed the dedicated StreamPhysicalDeduplicate handler from 
> FlinkRelMdModifiedMonotonicity and re-routed deduplication monotonicity 
> derivation through StreamPhysicalRank. The new dispatch guard is:
> case physicalRank: StreamPhysicalRank if RankUtil.isDeduplication(rel) =>
> RankUtil.isDeduplication only checks "Top-1 ROW_NUMBER without rank-number 
> output". It is weaker than the condition the old StreamPhysicalDeduplicate 
> node type implicitly guaranteed — that node only existed when 
> RankUtil.canConvertToDeduplicate held, which additionally requires sorting on 
> a single time attribute (sortOnTimeAttributeOnly).
> As a result, a Top-1 Rank whose ORDER BY is not a single time attribute (a 
> regular column, or multiple columns) is mistakenly handled as an append-only 
> FirstRow deduplication and reported as all-CONSTANT (insert-only) modified 
> monotonicity. In reality such a Rank retracts and re-emits the kept row 
> whenever a new winner arrives, so it produces updates.
> Downstream operators that consume this metadata then make wrong decisions — 
> e.g. a MIN/MAX aggregation picks the non-retract variant instead of 
> MIN_RETRACT/MAX_RETRACT, yielding incorrect query results.
>  
> Reproduce:
> SELECT b, MIN(c) AS min_c
> FROM (
>   SELECT a, b, c,
>          -- ORDER BY a non-time column
>          ROW_NUMBER() OVER (PARTITION BY a ORDER BY b) AS rn
>   FROM MyTable
> ) WHERE rn = 1
> GROUP BY b
> The inner query is a Top-1 Rank that updates. The downstream MIN(c) is 
> planned with the non-retract MIN (because the Rank output is treated as 
> insert-only) instead of MIN_RETRACT, so retractions from the Rank are not 
> handled and the aggregate result is wrong.
> Root cause: the dispatch guard dropped the sortOnTimeAttributeOnly invariant 
> previously carried by the StreamPhysicalDeduplicate node type.
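
For illustration only, the failure mode described above can be modeled with a small standalone Python simulation. This is not Flink code: the function names, the sample rows, and the simplification that a non-retract MIN simply ignores retraction messages are all assumptions made for this sketch. It replays three rows of a hypothetical MyTable (a, b, c) through a Top-1 ROW_NUMBER partitioned by a and ordered by b, then feeds the resulting changelog into a MIN(c) GROUP BY b with and without retraction handling.

```python
from collections import Counter, defaultdict


def top1_rank_changelog(rows):
    """Model of a Top-1 ROW_NUMBER (PARTITION BY a ORDER BY b ASC).

    Hypothetical helper: emits '+I' for the first row of a partition and
    '-U'/'+U' whenever a row with a smaller b replaces the current winner.
    """
    best = {}  # partition key a -> current winning row
    out = []
    for row in rows:
        a, b, _ = row
        current = best.get(a)
        if current is None:
            best[a] = row
            out.append(("+I", row))
        elif b < current[1]:
            best[a] = row
            out.append(("-U", current))  # retract the old winner
            out.append(("+U", row))      # emit the new winner
    return out


def min_without_retract(changelog):
    """Assumed model of a non-retract MIN(c) GROUP BY b: retractions are ignored."""
    result = {}
    for op, (_, b, c) in changelog:
        if op in ("+I", "+U"):
            result[b] = min(c, result.get(b, c))
    return result


def min_with_retract(changelog):
    """Assumed model of a retract-aware MIN(c) GROUP BY b: keeps a multiset per group."""
    values = defaultdict(Counter)
    for op, (_, b, c) in changelog:
        if op in ("+I", "+U"):
            values[b][c] += 1
        else:
            values[b][c] -= 1
            if values[b][c] == 0:
                del values[b][c]
    return {b: min(cs) for b, cs in values.items() if cs}


# Sample rows (a, b, c); the second row displaces the first as winner for a=1.
rows = [(1, 5, 10), (1, 3, 20), (2, 5, 30)]
changelog = top1_rank_changelog(rows)
wrong = min_without_retract(changelog)
right = min_with_retract(changelog)
```

In this sketch the Rank output contains an update pair for partition a=1, so it is not insert-only. The non-retract model keeps the stale value c=10 in group b=5, while the retract-aware model drops it and reports 30 for that group.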



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
