[jira] [Updated] (FLINK-11136) Fix the logical of merge for DISTINCT aggregates
[ https://issues.apache.org/jira/browse/FLINK-11136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Fabian Hueske updated FLINK-11136:
----------------------------------
    Issue Type: Bug  (was: Test)

> Fix the logical of merge for DISTINCT aggregates
> ------------------------------------------------
>
>                 Key: FLINK-11136
>                 URL: https://issues.apache.org/jira/browse/FLINK-11136
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: Dian Fu
>            Assignee: Dian Fu
>            Priority: Major
>              Labels: pull-request-available
>
> The merge logic for DISTINCT aggregates has a bug. For the following query:
> {code:java}
> SELECT
>   c,
>   COUNT(DISTINCT b),
>   SUM(DISTINCT b),
>   SESSION_END(rowtime, INTERVAL '0.005' SECOND)
> FROM MyTable
> GROUP BY SESSION(rowtime, INTERVAL '0.005' SECOND), c{code}
> the following exception is thrown:
> {code:java}
> Caused by: java.lang.ClassCastException: org.apache.flink.types.Row cannot be cast to java.lang.Integer
>   at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101)
>   at scala.math.Numeric$IntIsIntegral$.plus(Numeric.scala:58)
>   at org.apache.flink.table.functions.aggfunctions.SumAggFunction.accumulate(SumAggFunction.scala:50)
>   at GroupingWindowAggregateHelper$18.mergeAccumulatorsPair(Unknown Source)
>   at org.apache.flink.table.runtime.aggregate.AggregateAggFunction.merge(AggregateAggFunction.scala:66)
>   at org.apache.flink.table.runtime.aggregate.AggregateAggFunction.merge(AggregateAggFunction.scala:33)
>   at org.apache.flink.runtime.state.heap.HeapAggregatingState.mergeState(HeapAggregatingState.java:117)
>   at org.apache.flink.runtime.state.heap.AbstractHeapMergingState$MergeTransformation.apply(AbstractHeapMergingState.java:102)
>   at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:463)
>   at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:341)
>   at org.apache.flink.runtime.state.heap.AbstractHeapMergingState.mergeNamespaces(AbstractHeapMergingState.java:91)
>   at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:341)
>   at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:311)
>   at org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:212)
>   at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:311)
>   at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>   at java.lang.Thread.run(Thread.java:745)
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
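For readers following the trace: it shows SumAggFunction.accumulate receiving an org.apache.flink.types.Row while two session windows were being merged. One possible reading, not confirmed by the report itself, is that the generated merge code handed the wrapper under which distinct values are stored to accumulate instead of the wrapped value. The sketch below illustrates that idea only. DistinctSumAcc, its fields, and the use of a one-element List as a stand-in for Row are all hypothetical and are not Flink classes.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DistinctSumMergeSketch {

    /** Hypothetical accumulator for SUM(DISTINCT b); not Flink's actual class. */
    static final class DistinctSumAcc {
        // Distinct values seen so far, keyed by a one-field wrapper.
        // The List is an assumed stand-in for org.apache.flink.types.Row.
        final Map<List<Object>, Long> seen = new HashMap<>();
        int sum = 0;

        void accumulate(int b) {
            List<Object> key = Collections.<Object>singletonList(b);
            // Only the first occurrence of a value contributes to the sum.
            if (seen.merge(key, 1L, Long::sum) == 1L) {
                sum += b;
            }
        }

        void merge(DistinctSumAcc other) {
            for (Map.Entry<List<Object>, Long> e : other.seen.entrySet()) {
                boolean isNew = !seen.containsKey(e.getKey());
                seen.merge(e.getKey(), e.getValue(), Long::sum);
                if (isNew) {
                    // Unwrap the field before adding. Casting the wrapper
                    // itself to Integer would throw a ClassCastException.
                    int b = (Integer) e.getKey().get(0);
                    sum += b;
                }
            }
        }
    }

    public static void main(String[] args) {
        DistinctSumAcc left = new DistinctSumAcc();
        left.accumulate(1);
        left.accumulate(2);
        left.accumulate(2);

        DistinctSumAcc right = new DistinctSumAcc();
        right.accumulate(2);
        right.accumulate(3);

        // Merging two session windows: distinct values are {1, 2, 3}.
        left.merge(right);
        System.out.println(left.sum); // prints 6
    }
}
```

The per-value occurrence counts are kept so that the same structure could also support retraction; that choice is an assumption of this sketch rather than something stated in the issue.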
[jira] [Updated] (FLINK-11136) Fix the logical of merge for DISTINCT aggregates
[ https://issues.apache.org/jira/browse/FLINK-11136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-11136:
-----------------------------------
    Labels: pull-request-available  (was: )

> Fix the logical of merge for DISTINCT aggregates
> ------------------------------------------------
>
>                 Key: FLINK-11136
>                 URL: https://issues.apache.org/jira/browse/FLINK-11136
>             Project: Flink
>          Issue Type: Test
>          Components: Table API & SQL
>            Reporter: Dian Fu
>            Assignee: Dian Fu
>            Priority: Major
>              Labels: pull-request-available