lincoln lee created FLINK-24704:
-----------------------------------

             Summary: Exception occurs when the input record loses monotonicity 
on the sort key field of UpdatableTopNFunction
                 Key: FLINK-24704
                 URL: https://issues.apache.org/jira/browse/FLINK-24704
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Runtime
    Affects Versions: 1.14.0
            Reporter: lincoln lee
             Fix For: 1.15.0


An IllegalArgumentException is thrown when the sort key of an input retract 
record is lower than the old sort key, because this breaks the monotonicity 
of the sort key field that the SQL semantics guarantee. The most likely cause 
is that an upstream stateful operator has a state TTL shorter than the 
lifetime of the stream's records, so stale state is cleared by the state TTL. 
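
To make the suspected cause concrete, here is a toy model of a running SUM 
whose per-key state can expire. It is not Flink code: the ExpiringSum class, 
the TTL values and the timestamps are all assumptions chosen for illustration. 
It only shows how an aggregate that should never decrease can emit a lower 
value once its state has been dropped.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model (not Flink code) of a running SUM whose per-key state can expire. */
public class TtlMonotonicityDemo {

    /** Hypothetical stand-in for an aggregate operator with state TTL. */
    static final class ExpiringSum {
        private final long ttlMillis;
        // key -> {running sum, last access time in millis}
        private final Map<String, long[]> state = new HashMap<>();

        ExpiringSum(long ttlMillis) {
            this.ttlMillis = ttlMillis;
        }

        /** Adds a positive count and returns the new running sum for the key. */
        long add(String key, long cnt, long nowMillis) {
            long[] entry = state.get(key);
            if (entry != null && nowMillis - entry[1] >= ttlMillis) {
                entry = null; // state expired: the accumulated sum is lost
            }
            if (entry == null) {
                entry = new long[] {0L, nowMillis};
                state.put(key, entry);
            }
            entry[0] += cnt;
            entry[1] = nowMillis;
            return entry[0];
        }
    }

    /** Returns true if every value is greater than or equal to its predecessor. */
    static boolean isNonDecreasing(List<Long> values) {
        for (int i = 1; i < values.size(); i++) {
            if (values.get(i) < values.get(i - 1)) {
                return false;
            }
        }
        return true;
    }

    /** Feeds three positive counts for one key and collects the emitted sums. */
    static List<Long> run(long ttlMillis) {
        ExpiringSum agg = new ExpiringSum(ttlMillis);
        List<Long> emitted = new ArrayList<>();
        emitted.add(agg.add("a", 5, 0));
        emitted.add(agg.add("a", 3, 100));
        emitted.add(agg.add("a", 2, 5_000)); // arrives 4.9 s after the previous record
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(run(10_000)); // TTL longer than the gap: [5, 8, 10]
        System.out.println(run(1_000));  // TTL shorter than the gap: [5, 8, 2]
    }
}
```

In the second run the downstream operator would see the sort key drop from 8 
to 2, which is the kind of update that trips the argument check.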

A case that reproduces the problem:

{code:title=RankHarnessTest.java|borderStyle=solid}

val sql =
 """
 |SELECT word, cnt, rank_num
 |FROM (
 | SELECT word, cnt,
 | ROW_NUMBER() OVER (PARTITION BY type ORDER BY cnt DESC) as rank_num
 | FROM (
 | select word, type, sum(cnt) filter (where cnt > 0) cnt from T group by word, type
 | )
 | )
 |WHERE rank_num <= 6
 """.stripMargin

{code}

When the aggregated result column `cnt` becomes lower for a key, the 
downstream retract rank operator fails with the following exception:

 
{quote}java.lang.IllegalArgumentException
 at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:122)
 at org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.emitRecordsWithRowNumber(UpdatableTopNFunction.java:399)
 at org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.processElementWithRowNumber(UpdatableTopNFunction.java:274)
 at org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.processElement(UpdatableTopNFunction.java:167)
 at org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.processElement(UpdatableTopNFunction.java:69)
 at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
 at org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.processElement(OneInputStreamOperatorTestHarness.java:209)
{quote}
Here we should align with RetractableTopNFunction: continue processing by 
default (although the result may be incorrect) or, once FLINK-24666 is 
addressed, fail over when so configured.
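
A minimal sketch of that proposal follows. It is not the Flink implementation: 
the class SortKeyCheckSketch, the ViolationHandling enum and the checkUpdate 
method are hypothetical names, and the real option introduced by FLINK-24666 
may look different. It assumes a sort key that should never decrease and 
either records a warning and carries on, or throws.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch only: hypothetical names, not the Flink implementation. */
public class SortKeyCheckSketch {

    /** Hypothetical switch between tolerating and rejecting a violation. */
    enum ViolationHandling { CONTINUE, FAIL }

    private final ViolationHandling handling;
    private final List<String> warnings = new ArrayList<>();

    SortKeyCheckSketch(ViolationHandling handling) {
        this.handling = handling;
    }

    /**
     * Checks an update of the sort key for one record.
     * Returns true when the new key is not lower than the old one.
     * On a violation it either throws (FAIL) or records a warning and
     * returns false (CONTINUE), so the caller can keep processing.
     */
    boolean checkUpdate(long oldCnt, long newCnt) {
        if (newCnt >= oldCnt) {
            return true;
        }
        String msg = "sort key decreased from " + oldCnt + " to " + newCnt;
        if (handling == ViolationHandling.FAIL) {
            throw new IllegalStateException(msg);
        }
        warnings.add(msg);
        return false;
    }

    List<String> warnings() {
        return warnings;
    }

    public static void main(String[] args) {
        SortKeyCheckSketch lenient = new SortKeyCheckSketch(ViolationHandling.CONTINUE);
        System.out.println(lenient.checkUpdate(5, 8)); // monotonic update
        System.out.println(lenient.checkUpdate(8, 2)); // violation, tolerated
        System.out.println(lenient.warnings());
    }
}
```

With CONTINUE the job keeps running at the cost of a possibly wrong ranking; 
with FAIL the violation surfaces as an exception carrying a descriptive 
message rather than a bare IllegalArgumentException.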

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
