Hi, I am trying to use Flink SQL to do aggregation on a hopping window. In the data stream, we store the timestamp in long type. So I wrote a UDF 'FROM_UNIXTIME' to convert long to Timestamp type.
public static class TimestampModifier extends ScalarFunction { public Timestamp eval(long t) { return new Timestamp(t); } public TypeInformation<?> getResultType(Class<?>[] signature) { return Types.SQL_TIMESTAMP; } } With the above UDF, I wrote the following query, and ran into "ProgramInvocationException: The main method caused an error: Window can only be defined over a time attribute column". Any suggestions on how to resolve this issue? I am using Flink 1.8 for this experiment. my sql query: select keyid, sum(value) from ( select FROM_UNIXTIME(timestampMs) as ts, key.id as keyid, value from orders) group by HOP(ts, INTERVAL ‘10’ second, INTERVAL ‘30’ second), keyid flink exception: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Window can only be defined over a time attribute column. at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813) at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050) at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126) at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126) Caused by: org.apache.flink.table.api.ValidationException: Window can only be defined over a time attribute column. at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:85) at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:99) at org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule.onMatch(LogicalWindowAggregateRule.scala:66) at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319) at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:559) at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:418) Regards, -Yu