[ https://issues.apache.org/jira/browse/FLINK-7338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236687#comment-16236687 ]
Fabian Hueske commented on FLINK-7338: -------------------------------------- Thank you [~stefano.bortoli] for reporting the bug and providing the fix! I'll fix this for the 1.4.0 release. Thanks, Fabian > User Defined aggregation with constants causes error under in lowerbound over > window extraction > ----------------------------------------------------------------------------------------------- > > Key: FLINK-7338 > URL: https://issues.apache.org/jira/browse/FLINK-7338 > Project: Flink > Issue Type: Bug > Components: Table API & SQL > Affects Versions: 1.3.1 > Reporter: Stefano Bortoli > Assignee: Fabian Hueske > Priority: Critical > > A user defined aggregation that passes a constant among the arguments causes > a RuntimeException extracting the lower boundary over window. > {code} > val sqlQuery = "SELECT a, " + > " myAgg(a, CAST('1' as BIGINT)) "+ > " OVER (ORDER BY proctime RANGE BETWEEN INTERVAL '30' SECOND > PRECEDING AND CURRENT ROW) " + > "FROM MyTable" > {code} > The error is in the org.apache.flink.table.plan.nodes.OverAggregate.scala > we do : field count - lower bound index > -- which causes a -1 get, and subsequent RuntimeException. > We should do: lower bound offset - field count to find the value in the > constant array. > The code below should fix the problem. > {code} > private[flink] def getLowerBoundary( > logicWindow: Window, > overWindow: Group, > input: RelNode): Long = { > val ref: RexInputRef = > overWindow.lowerBound.getOffset.asInstanceOf[RexInputRef] > val lowerBoundIndex = ref.getIndex - input.getRowType.getFieldCount > val lowerBound = logicWindow.constants.get(lowerBoundIndex).getValue2 > lowerBound match { > case x: java.math.BigDecimal => > x.asInstanceOf[java.math.BigDecimal].longValue() > case _ => lowerBound.asInstanceOf[Long] > } > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)