Hi @Yu Yang: Time-based operations such as windows in both the Table API and SQL require information about the notion of time and its origin. Therefore, tables can offer logical time attributes for indicating time and accessing corresponding timestamps in table programs.[1] This mean Window can only be defined over a time attribute column. You need define a rowtime in your source just like (UserActionTime is a long field, you don't need convert it to Timestamp): Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.rowtime"); See more information in below document: [1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/time_attributes.html#event-time
Best, JingsongLee ------------------------------------------------------------------ From:Yu Yang <yuyan...@gmail.com> Send Time:2019年6月5日(星期三) 14:57 To:user <user@flink.apache.org> Subject:can flink sql handle udf-generated timestamp field Hi, I am trying to use Flink SQL to do aggregation on a hopping window. In the data stream, we store the timestamp in long type. So I wrote a UDF 'FROM_UNIXTIME' to convert long to Timestamp type. public static class TimestampModifier extends ScalarFunction { public Timestamp eval(long t) { return new Timestamp(t); } public TypeInformation<?> getResultType(Class<?>[] signature) { return Types.SQL_TIMESTAMP; } } With the above UDF, I wrote the following query, and ran into "ProgramInvocationException: The main method caused an error: Window can only be defined over a time attribute column". Any suggestions on how to resolve this issue? I am using Flink 1.8 for this experiment. my sql query: select keyid, sum(value) from ( select FROM_UNIXTIME(timestampMs) as ts, key.id as keyid, value from orders) group by HOP(ts, INTERVAL ‘10’ second, INTERVAL ‘30’ second), keyid flink exception: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Window can only be defined over a time attribute column. at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813) at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050) at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126) at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126) Caused by: org.apache.flink.table.api.ValidationException: Window can only be defined over a time attribute column. at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:85) at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:99) at org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule.onMatch(LogicalWindowAggregateRule.scala:66) at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319) at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:559) at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:418) Regards, -Yu