[jira] [Updated] (FLINK-20440) `LAST_VALUE` aggregate function can not be used in Hop window

2020-12-01 Thread zouyunhe (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zouyunhe updated FLINK-20440:
-
Description: 
Hi, I ran a SQL job that uses the `last_value` aggregate function in a hop 
window. The SQL is shown below:
{code:java}
create table test_in(
  id BIGINT,
  `name` VARCHAR,
  cost INT,
  proctime as PROCTIME()
) with (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'test_in',
  'connector.startup-mode' = 'latest-offset',
  'connector.properties.bootstrap.servers' = '',
  'connector.properties.group.id' = 'cdbddd',
  'connector.properties.zookeeper.connect' = '',
  'format.type' = 'csv'
);
create table test_mysql(
  id BIGINT,
  `name` VARCHAR,
  COST DOUBLE
) with (
  'connector.type' = 'jdbc',
  'connector.url' = '',
  'connector.table' = 'abc',
  'connector.username' = 'abcdd',
  'connector.write.flush.interval' = '2s'
);
insert into
  `test_mysql`
select
  a.id,
  last_value(a.`name`),
  last_value(a.cost)
from
  test_in as a
group by
  id,
  HOP(PROCTIME(), interval '10' second, interval '30' second);
{code}
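For context, the `HOP(PROCTIME(), interval '10' second, interval '30' second)` clause in the query declares windows that are 30 seconds long and start every 10 seconds, so each row belongs to three overlapping windows. The sketch below only illustrates that arithmetic. `HopWindowSketch` and `windowStarts` are hypothetical names, not Flink API, and the code assumes the window size is an exact multiple of the slide.

```java
final class HopWindowSketch {

    /**
     * Returns the start timestamps (in ms) of every hop window containing ts.
     * Assumption: sizeMs is an exact multiple of slideMs.
     */
    static long[] windowStarts(long ts, long slideMs, long sizeMs) {
        // Start of the most recent window that begins at or before ts.
        long lastStart = ts - Math.floorMod(ts, slideMs);
        int count = (int) (sizeMs / slideMs);
        long[] starts = new long[count];
        for (int i = 0; i < count; i++) {
            // Each earlier window starts one slide before the previous one.
            starts[i] = lastStart - (long) i * slideMs;
        }
        return starts;
    }

    public static void main(String[] args) {
        // slide = 10 s, size = 30 s, as in the query above.
        for (long start : windowStarts(25_000L, 10_000L, 30_000L)) {
            System.out.println("[" + start + ", " + (start + 30_000L) + ")");
        }
    }
}
```

With a timestamp of 25000 ms, the sketch yields the windows starting at 20000, 10000 and 0 ms.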
When the job is submitted, the following exception is thrown:
{code:java}
org.apache.flink.table.api.ValidationException: Function class 'org.apache.flink.table.planner.functions.aggfunctions.LastValueAggFunction.StringLastValueAggFunction' does not implement at least one method named 'merge' which is public, not abstract and (in case of table functions) not static.
at org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.checkAndExtractMethods(UserDefinedFunctionUtils.scala:442)
at org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.getUserDefinedMethod(UserDefinedFunctionUtils.scala:318)
at org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.getUserDefinedMethod(UserDefinedFunctionUtils.scala:273)
at org.apache.flink.table.planner.codegen.agg.ImperativeAggCodeGen.checkNeededMethods(ImperativeAggCodeGen.scala:474)
at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.$anonfun$checkNeededMethods$1(AggsHandlerCodeGenerator.scala:1116)
at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.$anonfun$checkNeededMethods$1$adapted(AggsHandlerCodeGenerator.scala:1116)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.checkNeededMethods(AggsHandlerCodeGenerator.scala:1116)
at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.genMerge(AggsHandlerCodeGenerator.scala:929)
at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.generateNamespaceAggsHandler(AggsHandlerCodeGenerator.scala:578)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.createAggsHandler(StreamExecGroupWindowAggregateBase.scala:248)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:162)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:54)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlan(StreamExecGroupWindowAggregateBase.scala:54)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:165)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:105)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
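
The `genMerge` frame in the trace suggests that the planner generates code to merge partial accumulators for this window, which would explain why it looks for a public `merge` method on the function class. As a rough illustration only, here is what merging could look like for a last-value style accumulator. This is plain Java, not the Flink class and not a proposed patch: `LastValueSketch`, `Acc` and the `order` field are hypothetical, and only the `merge(accumulator, Iterable)` shape follows the contract for user-defined aggregate functions.

```java
final class LastValueSketch {

    /** Hypothetical accumulator: the latest value seen and the order key it arrived with. */
    static final class Acc {
        String value;                  // null until the first row is accumulated
        long order = Long.MIN_VALUE;   // assumed monotonically increasing order key
    }

    /** Records a row; on equal order keys the row that arrives later wins. */
    static void accumulate(Acc acc, String value, long order) {
        if (order >= acc.order) {
            acc.value = value;
            acc.order = order;
        }
    }

    /** Folds partial accumulators into target, keeping the value with the highest order key. */
    static void merge(Acc target, Iterable<Acc> others) {
        for (Acc other : others) {
            if (other.order > target.order) {
                target.value = other.value;
                target.order = other.order;
            }
        }
    }

    public static void main(String[] args) {
        Acc first = new Acc();
        accumulate(first, "a", 1L);
        Acc second = new Acc();
        accumulate(second, "b", 2L);

        Acc window = new Acc();
        merge(window, java.util.Arrays.asList(first, second));
        System.out.println(window.value); // prints "b"
    }
}
```

The sketch relies on an explicit order key so that the result does not depend on the order in which partial accumulators are merged. Whether the built-in function can rely on such a key is an assumption here.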
   

[jira] [Updated] (FLINK-20440) `LAST_VALUE` aggregate function can not be used in hop window

2020-12-01 Thread zouyunhe (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zouyunhe updated FLINK-20440:
-
Summary: `LAST_VALUE` aggregate function can not be used in hop window  
(was: `LAST_VALUE` aggregate function can not be used in Hop window)

> `LAST_VALUE` aggregate function can not be used in hop window
> -
>
> Key: FLINK-20440
> URL: https://issues.apache.org/jira/browse/FLINK-20440
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.11.0
> Reporter: zouyunhe
> Priority: Major