[ https://issues.apache.org/jira/browse/FLINK-4604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15568664#comment-15568664 ]
Anton Mushin commented on FLINK-4604: ------------------------------------- I tried to check the function in {{org.apache.flink.api.table.plan.rules.dataSet.DataSetAggregateRule#matches}}, but something went wrong :) I did it like this: {code:title=org.apache.flink.api.table.plan.rules.dataSet.DataSetAggregateRule} override def matches(call: RelOptRuleCall): Boolean = { val agg: LogicalAggregate = call.rel(0).asInstanceOf[LogicalAggregate] // check if we have distinct aggregates val distinctAggs = agg.getAggCallList.exists(_.isDistinct) if (distinctAggs) { throw new TableException("DISTINCT aggregates are currently not supported.") } // check if we have grouping sets val groupSets = agg.getGroupSets.size() != 1 || agg.getGroupSets.get(0) != agg.getGroupSet if (groupSets || agg.indicator) { throw new TableException("GROUPING SETS are currently not supported.") } (!distinctAggs && !groupSets && !agg.indicator) && !AggregateReduceFunctionsRule.INSTANCE.matches(call) } {code} And I got the following plan and exception: {noformat} DataSetCalc(select=[CAST(/(-(CASE(=($f1, 0), null, $f0), /(*(CASE(=($f3, 0), null, $f2), CASE(=($f3, 0), null, $f2)), $f3)), CASE(=($f3, 1), null, -($f3, 1)))) AS $f0, CAST(/(-(CASE(=($f5, 0), null, $f4), /(*(CASE(=($f7, 0), null, $f6), CASE(=($f7, 0), null, $f6)), $f7)), CASE(=($f7, 1), null, -($f7, 1)))) AS $f1, CAST(/(-(CASE(=($f9, 0), null, $f8), /(*(CASE(=($f11, 0), null, $f10), CASE(=($f11, 0), null, $f10)), $f11)), CASE(=($f11, 1), null, -($f11, 1)))) AS $f2, CAST(/(-(CASE(=($f13, 0), null, $f12), /(*(CASE(=($f15, 0), null, $f14), CASE(=($f15, 0), null, $f14)), $f15)), CASE(=($f15, 1), null, -($f15, 1)))) AS $f3, CAST(/(-(CASE(=($f17, 0), null, $f16), /(*(CASE(=($f19, 0), null, $f18), CASE(=($f19, 0), null, $f18)), $f19)), CASE(=($f19, 1), null, -($f19, 1)))) AS $f4, CAST(/(-(CASE(=($f21, 0), null, $f20), /(*(CASE(=($f23, 0), null, $f22), CASE(=($f23, 0), null, $f22)), $f23)), CASE(=($f23, 1), null, -($f23, 1)))) AS $f5]) DataSetAggregate(select=[$SUM0($f6) AS 
$f0, COUNT($f6) AS $f1, $SUM0(_1) AS $f2, COUNT(_1) AS $f3, $SUM0($f7) AS $f4, COUNT($f7) AS $f5, $SUM0(_2) AS $f6, COUNT(_2) AS $f7, $SUM0($f8) AS $f8, COUNT($f8) AS $f9, $SUM0(_3) AS $f10, COUNT(_3) AS $f11, $SUM0($f9) AS $f12, COUNT($f9) AS $f13, $SUM0(_4) AS $f14, COUNT(_4) AS $f15, $SUM0($f10) AS $f16, COUNT($f10) AS $f17, $SUM0(_5) AS $f18, COUNT(_5) AS $f19, $SUM0($f11) AS $f20, COUNT($f11) AS $f21, $SUM0(_6) AS $f22, COUNT(_6) AS $f23]) DataSetCalc(select=[_1, _2, _3, _4, _5, _6]) DataSetScan(table=[[_DataSetTable_0]]) {noformat} {noformat} org.apache.flink.api.table.TableException: Type NULL is not supported. Null values must have a supported type. at org.apache.flink.api.table.FlinkTypeFactory$.toTypeInfo(FlinkTypeFactory.scala:128) at org.apache.flink.api.table.codegen.CodeGenerator.visitLiteral(CodeGenerator.scala:553) at org.apache.flink.api.table.codegen.CodeGenerator.visitLiteral(CodeGenerator.scala:56) at org.apache.calcite.rex.RexLiteral.accept(RexLiteral.java:658) at org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$14.apply(CodeGenerator.scala:675) at org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$14.apply(CodeGenerator.scala:675) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:675) at org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:56) at org.apache.calcite.rex.RexCall.accept(RexCall.java:108) at 
org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$14.apply(CodeGenerator.scala:675) at org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$14.apply(CodeGenerator.scala:675) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:675) at org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:56) at org.apache.calcite.rex.RexCall.accept(RexCall.java:108) at org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$14.apply(CodeGenerator.scala:675) at org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$14.apply(CodeGenerator.scala:675) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:675) at org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:56) at org.apache.calcite.rex.RexCall.accept(RexCall.java:108) at 
org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$14.apply(CodeGenerator.scala:675) at org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$14.apply(CodeGenerator.scala:675) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:675) at org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:56) at org.apache.calcite.rex.RexCall.accept(RexCall.java:108) at org.apache.flink.api.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:181) at org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$5.apply(CodeGenerator.scala:300) at org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$5.apply(CodeGenerator.scala:300) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.apache.flink.api.table.codegen.CodeGenerator.generateResultExpression(CodeGenerator.scala:300) at org.apache.flink.api.table.plan.nodes.FlinkCalc$class.functionBody(FlinkCalc.scala:52) at 
org.apache.flink.api.table.plan.nodes.dataset.DataSetCalc.functionBody(DataSetCalc.scala:39) at org.apache.flink.api.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:108) at org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnvironment.scala:274) at org.apache.flink.api.scala.table.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:139) at org.apache.flink.api.scala.table.TableConversions.toDataSet(TableConversions.scala:41) at org.apache.flink.api.scala.batch.sql.AggregationsITCase.testVarSampAggregate(AggregationsITCase.scala:369) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) at org.junit.rules.RunRules.evaluate(RunRules.java:20) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229) at org.junit.runners.ParentRunner.run(ParentRunner.java:309) at 
org.junit.runners.Suite.runChild(Suite.java:127) at org.junit.runners.Suite.runChild(Suite.java:26) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.runners.ParentRunner.run(ParentRunner.java:309) at org.junit.runner.JUnitCore.run(JUnitCore.java:160) {noformat} Then I removed {{!AggregateReduceFunctionsRule.INSTANCE.matches(call)}} and returned to [this code|https://issues.apache.org/jira/browse/FLINK-4604?focusedCommentId=15554768&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15554768], and the tests passed. I am searching for a resolution to this problem; do you have any ideas about it? > Add support for standard deviation/variance > ------------------------------------------- > > Key: FLINK-4604 > URL: https://issues.apache.org/jira/browse/FLINK-4604 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL > Reporter: Timo Walther > Assignee: Anton Mushin > Attachments: 1.jpg > > > Calcite's {{AggregateReduceFunctionsRule}} can convert SQL {{AVG, STDDEV_POP, > STDDEV_SAMP, VAR_POP, VAR_SAMP}} to sum/count functions. We should add, test > and document this rule. > Whether we also want to add these aggregates to the Table API is up for discussion. -- This message was sent by Atlassian JIRA (v6.3.4#6332)