[jira] [Updated] (FLINK-20440) `LAST_VALUE` aggregate function can not be used in Hop window
[ https://issues.apache.org/jira/browse/FLINK-20440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zouyunhe updated FLINK-20440: - Description: Hi, I run a sql job which use `last_value` aggregate function in a hop window, the sql as shown below {code:java} create table test_in( id BIGINT, `name` VARCHAR, cost INT, proctime as PROCTIME() ) with ( 'connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'test_in', 'connector.startup-mode' = 'latest-offset', 'connector.properties.bootstrap.servers' = '', 'connector.properties.group.id' = 'cdbddd', 'connector.properties.zookeeper.connect' = '', 'format.type' = 'csv' ); create table test_mysql( id BIGINT, `name` VARCHAR, COST DOUBLE ) with ( 'connector.type' = 'jdbc', 'connector.url' = '', 'connector.table' = 'abc', 'connector.username' = 'abcdd', 'connector.write.flush.interval' = '2s' );insert into `test_mysql` select a.id, last_value(a.`name`), last_value(a.cost) from test_in as a group by id, HOP(PROCTIME(), interval '10' second, interval '30' second); {code} and when submit the job, the exception throws {code:java} org.apache.flink.table.api.ValidationException: Function class 'org.apache.flink.table.planner.functions.aggfunctions.LastValueAggFunction.StringLastValueAggFunction' does not implement at least one method named 'merge' which is public, not abstract and (in case of table functions) not static. at org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.checkAndExtractMethods(UserDefinedFunctionUtils.scala:442) at org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.getUserDefinedMethod(UserDefinedFunctionUtils.scala:318) at org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.getUserDefinedMethod(UserDefinedFunctionUtils.scala:273) at org.apache.flink.table.planner.codegen.agg.ImperativeAggCodeGen.checkNeededMethods(ImperativeAggCodeGen.scala:474) at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.$anonfun$checkNeededMethods$1(AggsHandlerCodeGenerator.scala:1116) at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.$anonfun$checkNeededMethods$1$adapted(AggsHandlerCodeGenerator.scala:1116) at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36) at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198) at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.checkNeededMethods(AggsHandlerCodeGenerator.scala:1116) at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.genMerge(AggsHandlerCodeGenerator.scala:929) at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.generateNamespaceAggsHandler(AggsHandlerCodeGenerator.scala:578) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.createAggsHandler(StreamExecGroupWindowAggregateBase.scala:248) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:162) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:54) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlan(StreamExecGroupWindowAggregateBase.scala:54) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:165) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:105) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
[jira] [Updated] (FLINK-20440) `LAST_VALUE` aggregate function can not be used in hop window
[ https://issues.apache.org/jira/browse/FLINK-20440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zouyunhe updated FLINK-20440: - Summary: `LAST_VALUE` aggregate function can not be used in hop window (was: `LAST_VALUE` aggregate function can not be used in Hop window) > `LAST_VALUE` aggregate function can not be used in hop window > - > > Key: FLINK-20440 > URL: https://issues.apache.org/jira/browse/FLINK-20440 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.11.0 >Reporter: zouyunhe >Priority: Major > > Hi, I run a sql job which use `last_value` aggregate function in a hop > window, the sql as shown below > {code:java} > create table test_in( > id BIGINT, > `name` VARCHAR, > cost INT, > proctime as PROCTIME() > ) with ( > 'connector.type' = 'kafka', > 'connector.version' = 'universal', > 'connector.topic' = 'test_in', > 'connector.startup-mode' = 'latest-offset', > 'connector.properties.bootstrap.servers' = '', > 'connector.properties.group.id' = 'cdbddd', > 'connector.properties.zookeeper.connect' = '', > 'format.type' = 'csv' > ); > create table test_mysql( > id BIGINT, > `name` VARCHAR, > COST DOUBLE > ) with ( > 'connector.type' = 'jdbc', > 'connector.url' = '', > 'connector.table' = 'abc', > 'connector.username' = 'abcdd', > 'connector.write.flush.interval' = '2s' > );insert into > `test_mysql` > select > a.id, > last_value(a.`name`), > last_value(a.cost) > from > test_in as a group by id, HOP(PROCTIME(), interval '10' second, interval > '30' second); > {code} > and when submit the job, the exception throws > {code:java} > org.apache.flink.table.api.ValidationException: Function class > 'org.apache.flink.table.planner.functions.aggfunctions.LastValueAggFunction.StringLastValueAggFunction' > does not implement at least one method named 'merge' which is public, not > abstract and (in case of table functions) not static. > at > org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.checkAndExtractMethods(UserDefinedFunctionUtils.scala:442) > at > org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.getUserDefinedMethod(UserDefinedFunctionUtils.scala:318) > at > org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.getUserDefinedMethod(UserDefinedFunctionUtils.scala:273) > at > org.apache.flink.table.planner.codegen.agg.ImperativeAggCodeGen.checkNeededMethods(ImperativeAggCodeGen.scala:474) > at > org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.$anonfun$checkNeededMethods$1(AggsHandlerCodeGenerator.scala:1116) > at > org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.$anonfun$checkNeededMethods$1$adapted(AggsHandlerCodeGenerator.scala:1116) > at > scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36) > at > scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33) > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198) > at > org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.checkNeededMethods(AggsHandlerCodeGenerator.scala:1116) > at > org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.genMerge(AggsHandlerCodeGenerator.scala:929) > at > org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.generateNamespaceAggsHandler(AggsHandlerCodeGenerator.scala:578) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.createAggsHandler(StreamExecGroupWindowAggregateBase.scala:248) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:162) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:54) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlan(StreamExecGroupWindowAggregateBase.scala:54) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39) > at >