Hi @Yu Yang:

Time-based operations such as windows in both the Table API and SQL require
information about the notion of time and its origin. Therefore, tables can
offer logical time attributes for indicating time and accessing corresponding
timestamps in table programs. [1]

This means a window can only be defined over a time attribute column. A
timestamp returned by a UDF is a regular column, not a time attribute.
You need to define a rowtime attribute in your source, like this
(UserActionTime is a long field; you don't need to convert it to Timestamp).
The stream must already have timestamps and watermarks assigned:

Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.rowtime");
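
To illustrate why a long epoch-millisecond value is enough for windowing, here is a minimal sketch in plain Java (no Flink dependency) of how a timestamp maps to the hopping windows used in the query from the original message: slide 10 seconds, size 30 seconds. The class and method names are hypothetical, and aligning windows to the epoch is an assumption; this is not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, for illustration only.
public class HopWindowSketch {

    /**
     * Returns the [start, end) bounds, in epoch milliseconds, of every
     * hopping window that contains the given timestamp.
     * Assumes windows are aligned to the epoch (offset 0).
     */
    public static List<long[]> assignWindows(long timestampMs, long slideMs, long sizeMs) {
        List<long[]> windows = new ArrayList<>();
        // Start of the latest window that begins at or before the timestamp.
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            windows.add(new long[] {start, start + sizeMs});
        }
        return windows;
    }

    public static void main(String[] args) {
        // An event 25 seconds after the epoch, slide 10 s, size 30 s.
        for (long[] w : assignWindows(25_000L, 10_000L, 30_000L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

With a 30-second size and a 10-second slide, each event falls into three overlapping windows, so it contributes to three separate sum(value) results per key.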
For more information, see the documentation:
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/time_attributes.html#event-time

Best, JingsongLee


------------------------------------------------------------------
From: Yu Yang <yuyan...@gmail.com>
Sent: Wednesday, June 5, 2019, 14:57
To: user <user@flink.apache.org>
Subject: can flink sql handle udf-generated timestamp field

Hi, 

I am trying to use Flink SQL to do aggregation on a hopping window. In the data 
stream, we store the timestamp in long type. So I wrote a UDF 'FROM_UNIXTIME' 
to convert long to Timestamp type. 

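  // Imports needed at the top of the enclosing file:
  import java.sql.Timestamp;
  import org.apache.flink.api.common.typeinfo.TypeInformation;
  import org.apache.flink.api.common.typeinfo.Types;
  import org.apache.flink.table.functions.ScalarFunction;

  // Converts epoch milliseconds to a SQL timestamp (a regular column, not a time attribute).
  public static class TimestampModifier extends ScalarFunction {
    public Timestamp eval(long t) {
      return new Timestamp(t);
    }
    @Override
    public TypeInformation<?> getResultType(Class<?>[] signature) {
      return Types.SQL_TIMESTAMP;
    }
  }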

With the above UDF, I wrote the following query, and ran into  
"ProgramInvocationException: The main method caused an error: Window can only 
be defined over a time attribute column". 
Any suggestions on how to resolve this issue? I am using Flink 1.8 for this 
experiment. 

my sql query: 

select  keyid, sum(value) 
from ( 
   select FROM_UNIXTIME(timestampMs) as ts, key.id as keyid, value 
   from orders) 
 group by HOP(ts, INTERVAL '10' second, INTERVAL '30' second), keyid 

flink exception: 

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Window can only be defined over a time attribute column.
 at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
 at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
 at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)
 at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
 at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
 at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
 at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
 at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
 at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
 at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: org.apache.flink.table.api.ValidationException: Window can only be defined over a time attribute column.
 at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:85)
 at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:99)
 at org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule.onMatch(LogicalWindowAggregateRule.scala:66)
 at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
 at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:559)
 at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:418)

Regards, 
-Yu
