[ https://issues.apache.org/jira/browse/FLINK-9021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16404738#comment-16404738 ]
Xingcan Cui commented on FLINK-9021: ------------------------------------ Hi [~yizhou_liujiarui], yes, the distinct aggregate function has not been supported yet [1]. But I think it will be added in future versions. [1] [https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#unsupported-functions|https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#unsupported-functions] > org.apache.flink.table.codegen.CodeGenException: Unsupported call: TUMBLE > -------------------------------------------------------------------------- > > Key: FLINK-9021 > URL: https://issues.apache.org/jira/browse/FLINK-9021 > Project: Flink > Issue Type: Bug > Components: Table API & SQL > Affects Versions: 1.4.2 > Environment: java 8 > flink 1.4.2 > scala 2.11 > Reporter: yizhou > Priority: Major > > I have a stream like this: {{<_time(timestamp), uri(string), userId(int)>}}. > The {{_time}} attribute is rowtime and I register it as a table: > {{tableEnv.registerDataStream("userVisitPage", stream, "_time.rowtime, > uri,userId");}} > {{Then I query the table:}} > {code:java} > final String sql = "SELECT tumble_start(_time, interval '10' second) as > timestart, " > + " count(distinct userId) as uv, " > + " uri as uri, " > + " count(1) as pv " > + "FROM userVisitPage " > + "GROUP BY tumble(_time, interval '10' second), uri"; > final Table table = tableEnv.sqlQuery(sql); > tableEnv.toRetractStream(table, Row.class); > {code} > {{but occur exceptions:}}{{}} > > > {code:java} > 2018-03-19 19:30:53,881 ERROR > [com.qunhe.logcomplex.oceanus.util.TaskSubmitter] - main - submit task failed > org.apache.flink.table.codegen.CodeGenException: Unsupported call: TUMBLE If > you think this function should be supported, you can create an issue and > start a discussion for it. at > org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1006) > at > org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1006) > at scala.Option.getOrElse(Option.scala:121) at > org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:1006) > at > org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:67) > at org.apache.calcite.rex.RexCall.accept(RexCall.java:107) at > org.apache.flink.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:234) > at > org.apache.flink.table.codegen.CodeGenerator$$anonfun$7.apply(CodeGenerator.scala:321) > at > org.apache.flink.table.codegen.CodeGenerator$$anonfun$7.apply(CodeGenerator.scala:321) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at > scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at > scala.collection.AbstractTraversable.map(Traversable.scala:104) at > org.apache.flink.table.codegen.CodeGenerator.generateResultExpression(CodeGenerator.scala:321) > at > org.apache.flink.table.plan.nodes.CommonCalc$class.generateFunction(CommonCalc.scala:44) > at > org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.generateFunction(DataStreamCalc.scala:43) > at > org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:116) > at > org.apache.flink.table.plan.nodes.datastream.DataStreamGroupAggregate.translateToPlan(DataStreamGroupAggregate.scala:113) > at > org.apache.flink.table.plan.nodes.datastream.DataStreamGroupAggregate.translateToPlan(DataStreamGroupAggregate.scala:113) > at > org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:97) > at > org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:837) > at > org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:764) > at > org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:734) > at > org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:414) > at > org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:357 > {code} > > how can I implement this query -- This message was sent by Atlassian JIRA (v7.6.3#76005)