Hi,

I am trying to use Flink SQL to aggregate over a hopping window. The data
stream stores the timestamp as a long, so I wrote a UDF, registered as
'FROM_UNIXTIME', to convert the long to a Timestamp:

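  // imports: java.sql.Timestamp, org.apache.flink.table.functions.ScalarFunction,
  //          org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
  public static class TimestampModifier extends ScalarFunction {
    // java.sql.Timestamp(long) reads t as milliseconds since the epoch.
    public Timestamp eval(long t) {
      return new Timestamp(t);
    }

    @Override
    public TypeInformation<?> getResultType(Class<?>[] signature) {
      return Types.SQL_TIMESTAMP;
    }
  }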
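
For reference, here is a minimal standalone sketch of the conversion the UDF
performs, runnable without Flink. It assumes the long holds epoch milliseconds
(as the column name timestampMs suggests). The class name EpochMillisCheck, the
helper fromEpochSeconds, and the sample value are made up for illustration and
are not part of the job.

```java
import java.sql.Timestamp;

// Hypothetical standalone check, not part of the Flink job.
class EpochMillisCheck {
    // Same conversion as the UDF's eval(): the long is read as epoch milliseconds.
    static Timestamp toTimestamp(long epochMillis) {
        return new Timestamp(epochMillis);
    }

    // If the source stored epoch seconds instead, the value would need scaling first.
    static Timestamp fromEpochSeconds(long epochSeconds) {
        return new Timestamp(Math.multiplyExact(epochSeconds, 1000L));
    }

    public static void main(String[] args) {
        long sampleMs = 1_556_000_000_000L; // made-up sample value
        // The millisecond value survives the round trip unchanged.
        System.out.println(toTimestamp(sampleMs).getTime() == sampleMs);
    }
}
```

The conversion itself only changes the Java type of the value.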

With the above UDF, I wrote the query below and ran into
"ProgramInvocationException: The main method caused an error: Window can
only be defined over a time attribute column".
Any suggestions on how to resolve this issue? I am using Flink 1.8 for this
experiment.

My SQL query:

select keyid, sum(value)
from (
   select FROM_UNIXTIME(timestampMs) as ts, key.id as keyid, value
   from orders)
group by HOP(ts, INTERVAL '10' second, INTERVAL '30' second), keyid
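
For reference, HOP(ts, INTERVAL '10' second, INTERVAL '30' second) asks for
30-second windows that advance every 10 seconds, so each row should fall into
three overlapping windows. Below is a minimal sketch of that assignment in
plain Java, assuming windows aligned to the epoch with no offset and
non-negative timestamps. The class HopWindowSketch and the method windowStarts
are hypothetical names used only for illustration; this is not Flink's own code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper illustrating hopping-window assignment; not a Flink API.
class HopWindowSketch {
    // Returns the start (epoch ms) of every window [start, start + sizeMs)
    // that contains ts, newest window first. Assumes ts >= 0 and epoch alignment.
    static List<Long> windowStarts(long ts, long slideMs, long sizeMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - (ts % slideMs); // latest window start at or before ts
        for (long start = lastStart; start > ts - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // A row at 35 s with a 10 s slide and a 30 s size.
        System.out.println(windowStarts(35_000L, 10_000L, 30_000L));
        // prints [30000, 20000, 10000]
    }
}
```

With these assumptions, a row at 35 s is counted in the windows starting at
10 s, 20 s, and 30 s, so sum(value) for its keyid is emitted once per window.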

Flink exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Window can only be defined over a time attribute column.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: org.apache.flink.table.api.ValidationException: Window can only be defined over a time attribute column.
at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:85)
at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:99)
at org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule.onMatch(LogicalWindowAggregateRule.scala:66)
at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:559)
at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:418)

Regards,
-Yu
