[ https://issues.apache.org/jira/browse/FLINK-4604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15568664#comment-15568664 ]

Anton Mushin commented on FLINK-4604:
-------------------------------------

I tried to check the function in
{{org.apache.flink.api.table.plan.rules.dataSet.DataSetAggregateRule#matches}},
but something went wrong :)
I did the following:
{code:title=org.apache.flink.api.table.plan.rules.dataSet.DataSetAggregateRule}
override def matches(call: RelOptRuleCall): Boolean = {
    val agg: LogicalAggregate = call.rel(0).asInstanceOf[LogicalAggregate]

    // check if we have distinct aggregates
    val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
    if (distinctAggs) {
      throw new TableException("DISTINCT aggregates are currently not supported.")
    }

    // check if we have grouping sets
    val groupSets = agg.getGroupSets.size() != 1 || agg.getGroupSets.get(0) != agg.getGroupSet
    if (groupSets || agg.indicator) {
      throw new TableException("GROUPING SETS are currently not supported.")
    }

    // only match while AggregateReduceFunctionsRule can no longer rewrite the aggregate
    (!distinctAggs && !groupSets && !agg.indicator) &&
      !AggregateReduceFunctionsRule.INSTANCE.matches(call)
  }
{code}
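Just to spell out the intent of the added condition: it should keep {{DataSetAggregateRule}} from firing while the aggregate still contains calls that {{AggregateReduceFunctionsRule}} can rewrite into sum/count. A rough, name-based approximation of that check (a sketch for illustration only, not how the Calcite rule is actually implemented) would be:
{code}
// Sketch only: true if the LogicalAggregate still contains an aggregate call
// that AggregateReduceFunctionsRule can decompose into SUM/COUNT aggregates.
// The function names below are an illustration; the rule itself inspects the
// aggregate functions directly.
val reducibleAggs = Set("AVG", "STDDEV_POP", "STDDEV_SAMP", "VAR_POP", "VAR_SAMP")
val containsReducibleAgg =
  agg.getAggCallList.exists(c => reducibleAggs.contains(c.getAggregation.getName))
{code}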
And I got the following plan and exception:
{noformat}
DataSetCalc(select=[CAST(/(-(CASE(=($f1, 0), null, $f0), /(*(CASE(=($f3, 0), null, $f2), CASE(=($f3, 0), null, $f2)), $f3)), CASE(=($f3, 1), null, -($f3, 1)))) AS $f0, CAST(/(-(CASE(=($f5, 0), null, $f4), /(*(CASE(=($f7, 0), null, $f6), CASE(=($f7, 0), null, $f6)), $f7)), CASE(=($f7, 1), null, -($f7, 1)))) AS $f1, CAST(/(-(CASE(=($f9, 0), null, $f8), /(*(CASE(=($f11, 0), null, $f10), CASE(=($f11, 0), null, $f10)), $f11)), CASE(=($f11, 1), null, -($f11, 1)))) AS $f2, CAST(/(-(CASE(=($f13, 0), null, $f12), /(*(CASE(=($f15, 0), null, $f14), CASE(=($f15, 0), null, $f14)), $f15)), CASE(=($f15, 1), null, -($f15, 1)))) AS $f3, CAST(/(-(CASE(=($f17, 0), null, $f16), /(*(CASE(=($f19, 0), null, $f18), CASE(=($f19, 0), null, $f18)), $f19)), CASE(=($f19, 1), null, -($f19, 1)))) AS $f4, CAST(/(-(CASE(=($f21, 0), null, $f20), /(*(CASE(=($f23, 0), null, $f22), CASE(=($f23, 0), null, $f22)), $f23)), CASE(=($f23, 1), null, -($f23, 1)))) AS $f5])
  DataSetAggregate(select=[$SUM0($f6) AS $f0, COUNT($f6) AS $f1, $SUM0(_1) AS $f2, COUNT(_1) AS $f3, $SUM0($f7) AS $f4, COUNT($f7) AS $f5, $SUM0(_2) AS $f6, COUNT(_2) AS $f7, $SUM0($f8) AS $f8, COUNT($f8) AS $f9, $SUM0(_3) AS $f10, COUNT(_3) AS $f11, $SUM0($f9) AS $f12, COUNT($f9) AS $f13, $SUM0(_4) AS $f14, COUNT(_4) AS $f15, $SUM0($f10) AS $f16, COUNT($f10) AS $f17, $SUM0(_5) AS $f18, COUNT(_5) AS $f19, $SUM0($f11) AS $f20, COUNT($f11) AS $f21, $SUM0(_6) AS $f22, COUNT(_6) AS $f23])
    DataSetCalc(select=[_1, _2, _3, _4, _5, _6])
      DataSetScan(table=[[_DataSetTable_0]])
{noformat}
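For reference, the plan above is the usual sum/count rewrite of {{VAR_SAMP}}, roughly:
{noformat}
VAR_SAMP(x) = (SUM(x * x) - SUM(x) * SUM(x) / COUNT(x)) / (COUNT(x) - 1)
{noformat}
The {{CASE(=($fN, 0), null, ...)}} and {{CASE(=($fN, 1), null, ...)}} wrappers guard the rewrite for empty and single-element groups, and those untyped NULL literals appear to be exactly what the code generator rejects: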
{noformat}
org.apache.flink.api.table.TableException: Type NULL is not supported. Null values must have a supported type.

        at org.apache.flink.api.table.FlinkTypeFactory$.toTypeInfo(FlinkTypeFactory.scala:128)
        at org.apache.flink.api.table.codegen.CodeGenerator.visitLiteral(CodeGenerator.scala:553)
        at org.apache.flink.api.table.codegen.CodeGenerator.visitLiteral(CodeGenerator.scala:56)
        at org.apache.calcite.rex.RexLiteral.accept(RexLiteral.java:658)
        at org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$14.apply(CodeGenerator.scala:675)
        at org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$14.apply(CodeGenerator.scala:675)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:675)
        at org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:56)
        at org.apache.calcite.rex.RexCall.accept(RexCall.java:108)
        at org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$14.apply(CodeGenerator.scala:675)
        at org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$14.apply(CodeGenerator.scala:675)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:675)
        at org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:56)
        at org.apache.calcite.rex.RexCall.accept(RexCall.java:108)
        at org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$14.apply(CodeGenerator.scala:675)
        at org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$14.apply(CodeGenerator.scala:675)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:675)
        at org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:56)
        at org.apache.calcite.rex.RexCall.accept(RexCall.java:108)
        at org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$14.apply(CodeGenerator.scala:675)
        at org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$14.apply(CodeGenerator.scala:675)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:675)
        at org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:56)
        at org.apache.calcite.rex.RexCall.accept(RexCall.java:108)
        at org.apache.flink.api.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:181)
        at org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$5.apply(CodeGenerator.scala:300)
        at org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$5.apply(CodeGenerator.scala:300)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at org.apache.flink.api.table.codegen.CodeGenerator.generateResultExpression(CodeGenerator.scala:300)
        at org.apache.flink.api.table.plan.nodes.FlinkCalc$class.functionBody(FlinkCalc.scala:52)
        at org.apache.flink.api.table.plan.nodes.dataset.DataSetCalc.functionBody(DataSetCalc.scala:39)
        at org.apache.flink.api.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:108)
        at org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnvironment.scala:274)
        at org.apache.flink.api.scala.table.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:139)
        at org.apache.flink.api.scala.table.TableConversions.toDataSet(TableConversions.scala:41)
        at org.apache.flink.api.scala.batch.sql.AggregationsITCase.testVarSampAggregate(AggregationsITCase.scala:369)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
        at org.junit.rules.RunRules.evaluate(RunRules.java:20)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
        at org.junit.runners.Suite.runChild(Suite.java:127)
        at org.junit.runners.Suite.runChild(Suite.java:26)
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
        at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
        at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
        at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
{noformat}
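For context, the test referenced at the bottom of the trace ({{AggregationsITCase.testVarSampAggregate}}) is roughly of the following shape. This is only a sketch: the table name, fields and data are assumptions, not the actual test.
{code:title=Sketch of a VAR_SAMP test (assumed shape)}
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.table.{Row, TableEnvironment}

object VarSampSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // Hypothetical input; the real test aggregates six columns (_1 .. _6),
    // as the plan above shows.
    val ds = env.fromElements((1, 2L, 3.0), (4, 5L, 6.0), (7, 8L, 9.0))
    tEnv.registerDataSet("MyTable", ds)

    // Each VAR_SAMP call gets decomposed into $SUM0/COUNT pairs by
    // AggregateReduceFunctionsRule before the DataSet rules translate the plan.
    val result = tEnv
      .sql("SELECT VAR_SAMP(_1), VAR_SAMP(_2), VAR_SAMP(_3) FROM MyTable")
      .toDataSet[Row]
      .collect()

    result.foreach(println)
  }
}
{code}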
Then I removed {{!AggregateReduceFunctionsRule.INSTANCE.matches(call)}} and reverted to [this code|https://issues.apache.org/jira/browse/FLINK-4604?focusedCommentId=15554768&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15554768], and the tests passed.
I am still searching for a resolution to this problem. Do you have any ideas about it?

> Add support for standard deviation/variance
> -------------------------------------------
>
>                 Key: FLINK-4604
>                 URL: https://issues.apache.org/jira/browse/FLINK-4604
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Timo Walther
>            Assignee: Anton Mushin
>         Attachments: 1.jpg
>
>
> Calcite's {{AggregateReduceFunctionsRule}} can convert SQL {{AVG, STDDEV_POP, 
> STDDEV_SAMP, VAR_POP, VAR_SAMP}} to sum/count functions. We should add, test 
> and document this rule. 
> Whether we also want to add these aggregates to the Table API is up for discussion.


