lincoln lee created FLINK-24704:
-----------------------------------
Summary: Exception occurs when the input record loses monotonicity
on the sort key field of UpdatableTopNFunction
Key: FLINK-24704
URL: https://issues.apache.org/jira/browse/FLINK-24704
Project: Flink
Issue Type: Bug
Components: Table SQL / Runtime
Affects Versions: 1.14.0
Reporter: lincoln lee
Fix For: 1.15.0
An IllegalArgumentException occurs when the sort key of an input retract record
is lower than its old sort key. This happens because the record breaks the
monotonicity of the sort key field, which the SQL semantics guarantee. Most
likely, an upstream stateful operator has a state TTL shorter than the lifetime
of the stream records, so stale records are cleared by the state TTL.
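To illustrate the suspected cause, here is a minimal sketch in Scala. It assumes a simplified upstream SUM whose per-key state expires after a fixed TTL measured from the last update. `TtlSum` and its parameters are hypothetical names, not Flink API. Contributions are assumed non-negative, as with the FILTER (WHERE cnt > 0) in the query, so the sum would never decrease if the state were kept. Once the state expires, the sum restarts from zero and the emitted value drops.

{code:scala}
import scala.collection.mutable

// Hypothetical, simplified model of an upstream keyed SUM whose per-key state
// expires after `ttl` time units without an update (not Flink's actual API).
final class TtlSum(ttl: Long) {
  // key -> (running sum, time of last update)
  private val state = mutable.Map.empty[String, (Long, Long)]

  /** Adds `value` for `key` at time `now` and returns the new running sum. */
  def add(key: String, value: Long, now: Long): Long = {
    val previous = state.get(key) match {
      // State still alive: keep accumulating.
      case Some((sum, lastUpdate)) if now - lastUpdate <= ttl => sum
      // No state, or the state expired and was cleared: start again from zero.
      case _ => 0L
    }
    val updated = previous + value
    state(key) = (updated, now)
    updated
  }
}

object TtlSumDemo {
  def main(args: Array[String]): Unit = {
    val agg = new TtlSum(ttl = 100L)
    val emitted = Seq(
      agg.add("flink", 10L, now = 0L),
      agg.add("flink", 5L, now = 50L),
      agg.add("flink", 3L, now = 500L) // arrives after the state has expired
    )
    println(emitted.mkString(", ")) // 10, 15, 3
  }
}
{code}

The third value for key "flink" is lower than the second, which is the kind of input the downstream rank does not expect.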
A case that reproduces the problem:
{code:title=RankHarnessTest.java|borderStyle=solid}
val sql =
  """
    |SELECT word, cnt, rank_num
    |FROM (
    |  SELECT word, cnt,
    |    ROW_NUMBER() OVER (PARTITION BY type ORDER BY cnt DESC) as rank_num
    |  FROM (
    |    select word, type, sum(cnt) filter (where cnt > 0) cnt from T group by word, type
    |  )
    |)
    |WHERE rank_num <= 6
  """.stripMargin
{code}
When the aggregated result column `cnt` becomes lower for a key, the downstream
rank operator (UpdatableTopNFunction) fails with the following exception:
{quote}java.lang.IllegalArgumentException
at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:122)
at org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.emitRecordsWithRowNumber(UpdatableTopNFunction.java:399)
at org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.processElementWithRowNumber(UpdatableTopNFunction.java:274)
at org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.processElement(UpdatableTopNFunction.java:167)
at org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.processElement(UpdatableTopNFunction.java:69)
at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
at org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.processElement(OneInputStreamOperatorTestHarness.java:209)
{quote}
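A hypothetical, simplified model of the failing check follows. It is not the Flink source, and the exact condition checked at UpdatableTopNFunction.java:399 may differ. It assumes that, for ORDER BY cnt DESC, the operator only accepts an update whose sort key did not decrease. Scala's `require` throws IllegalArgumentException, like `Preconditions.checkArgument` in the trace. `MonotonicRankCheck` and `checkUpdate` are illustrative names.

{code:scala}
// Hypothetical model of the monotonicity assumption for ORDER BY cnt DESC:
// a row's sort key may only grow. This is NOT the Flink implementation.
object MonotonicRankCheck {
  /** Validates an update of a row's sort key; throws IllegalArgumentException on a drop. */
  def checkUpdate(oldCnt: Long, newCnt: Long): Unit =
    // `require` throws IllegalArgumentException, like Preconditions.checkArgument.
    require(newCnt >= oldCnt, s"sort key decreased from $oldCnt to $newCnt")

  def main(args: Array[String]): Unit = {
    checkUpdate(10L, 15L) // accepted: the key grows, monotonicity holds
    try {
      checkUpdate(15L, 3L) // the value after a TTL reset breaks monotonicity
    } catch {
      case e: IllegalArgumentException => println(s"rejected: ${e.getMessage}")
    }
  }
}
{code}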
Here we should align with RetractableTopNFunction: by default, continue
processing (although the result may be incorrect), or, once FLINK-24666 is
addressed, allow it to be configured to fail over.
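One possible shape of the proposed behavior, sketched under assumptions. `StaleRecordHandling`, its values and `TolerantRankCheck` are hypothetical placeholders for whatever option FLINK-24666 ends up defining, and the warning goes to stderr instead of a logger. In `Continue` mode the violation is reported and processing goes on (the result may be incorrect). In `Fail` mode an exception is thrown so that the job fails over.

{code:scala}
// Hypothetical handling modes; the real option is to be defined by FLINK-24666.
sealed trait StaleRecordHandling
object StaleRecordHandling {
  case object Continue extends StaleRecordHandling
  case object Fail extends StaleRecordHandling
}

object TolerantRankCheck {
  /**
   * Returns true if the update respected the monotonicity assumption.
   * Continue: report the violation and return false so processing goes on.
   * Fail: throw, so that the job fails over.
   */
  def onUpdate(oldCnt: Long, newCnt: Long, mode: StaleRecordHandling): Boolean =
    if (newCnt >= oldCnt) true
    else mode match {
      case StaleRecordHandling.Continue =>
        // Keep going, accepting that the emitted ranking may be incorrect.
        Console.err.println(s"WARN: sort key dropped from $oldCnt to $newCnt; result may be incorrect")
        false
      case StaleRecordHandling.Fail =>
        throw new IllegalStateException(s"sort key dropped from $oldCnt to $newCnt")
    }

  def main(args: Array[String]): Unit =
    println(onUpdate(15L, 3L, StaleRecordHandling.Continue)) // prints false after a warning on stderr
}
{code}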
--
This message was sent by Atlassian Jira
(v8.3.4#803005)