[ https://issues.apache.org/jira/browse/FLINK-11136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Fabian Hueske closed FLINK-11136. --------------------------------- Resolution: Fixed Fix Version/s: 1.7.1 1.8.0 1.6.3 Fixed for 1.8.0 with ed0aefa6775f655591b4c0fe46382446921b7155 Fixed for 1.7.1 with ff1821a6d2f8317d0c344719b14350ac362143d9 Fixed for 1.6.3 with ff9b7f1b60a4aeb1d925b236b9818002aad830de > Fix the logical of merge for DISTINCT aggregates > ------------------------------------------------ > > Key: FLINK-11136 > URL: https://issues.apache.org/jira/browse/FLINK-11136 > Project: Flink > Issue Type: Bug > Components: Table API & SQL > Reporter: Dian Fu > Assignee: Dian Fu > Priority: Major > Labels: pull-request-available > Fix For: 1.6.3, 1.8.0, 1.7.1 > > Time Spent: 10m > Remaining Estimate: 0h > > The logic of merge for DISTINCT aggregates has bug. For the following query: > {code:java} > SELECT > c, > COUNT(DISTINCT b), > SUM(DISTINCT b), > SESSION_END(rowtime, INTERVAL '0.005' SECOND) > FROM MyTable > GROUP BY SESSION(rowtime, INTERVAL '0.005' SECOND), c{code} > the following exception will be thrown: > {code:java} > Caused by: java.lang.ClassCastException: org.apache.flink.types.Row cannot be > cast to java.lang.Integer > at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101) > at scala.math.Numeric$IntIsIntegral$.plus(Numeric.scala:58) > at > org.apache.flink.table.functions.aggfunctions.SumAggFunction.accumulate(SumAggFunction.scala:50) > at GroupingWindowAggregateHelper$18.mergeAccumulatorsPair(Unknown Source) > at > org.apache.flink.table.runtime.aggregate.AggregateAggFunction.merge(AggregateAggFunction.scala:66) > at > org.apache.flink.table.runtime.aggregate.AggregateAggFunction.merge(AggregateAggFunction.scala:33) > at > org.apache.flink.runtime.state.heap.HeapAggregatingState.mergeState(HeapAggregatingState.java:117) > at > org.apache.flink.runtime.state.heap.AbstractHeapMergingState$MergeTransformation.apply(AbstractHeapMergingState.java:102) > at > org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:463) > at > org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:341) > at > org.apache.flink.runtime.state.heap.AbstractHeapMergingState.mergeNamespaces(AbstractHeapMergingState.java:91) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:341) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:311) > at > org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:212) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:311) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) > at java.lang.Thread.run(Thread.java:745) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)