can flink sql handle udf-generated timestamp field

2019-06-04 Thread Yu Yang
Hi,

I am trying to use Flink SQL to do aggregation on a hopping window. In the
data stream, we store the timestamp in long type. So I wrote a UDF
'FROM_UNIXTIME' to convert long to Timestamp type.

  public static class TimestampModifier extends ScalarFunction {
    public Timestamp eval(long t) {
      return new Timestamp(t);
    }

    public TypeInformation getResultType(Class[] signature) {
      return Types.SQL_TIMESTAMP;
    }
  }

With the above UDF, I wrote the following query, and ran into
 "ProgramInvocationException: The main method caused an error: Window can
only be defined over a time attribute column".
Any suggestions on how to resolve this issue? I am using Flink 1.8 for this
experiment.

my sql query:

select keyid, sum(value)
from (
   select FROM_UNIXTIME(timestampMs) as ts, key.id as keyid, value
   from orders)
group by HOP(ts, INTERVAL '10' second, INTERVAL '30' second), keyid

flink exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Window can only be defined over a time attribute column.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
    at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
    at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: org.apache.flink.table.api.ValidationException: Window can only be defined over a time attribute column.
    at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:85)
    at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:99)
    at org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule.onMatch(LogicalWindowAggregateRule.scala:66)
    at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
    at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:559)
    at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:418)

Regards,
-Yu
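
For reference, the HOP(ts, INTERVAL '10' second, INTERVAL '30' second) clause in the query above asks for 30-second windows that advance every 10 seconds, so each row falls into three overlapping windows. The following is only a plain-Java sketch of that assignment, not Flink code: the class name HopWindows and the method windowStarts are made up for illustration, and a window offset of 0 is assumed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: lists the hopping windows a timestamp belongs to.
public class HopWindows {

    /** Returns the start times (epoch millis) of every window [start, start + size) containing the timestamp. */
    public static List<Long> windowStarts(long timestampMs, long slideMs, long sizeMs) {
        List<Long> starts = new ArrayList<>();
        // Start of the latest window that still contains the timestamp (offset assumed to be 0).
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        // Step back one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // 10-second slide, 30-second size, event at t = 25 s
        System.out.println(windowStarts(25_000L, 10_000L, 30_000L)); // prints [20000, 10000, 0]
    }
}
```

The sum in the query is therefore computed once per (window, keyid) pair, and a single order contributes to three results.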


Re: can flink sql handle udf-generated timestamp field

2019-06-05 Thread JingsongLee
Hi @Yu Yang:
Time-based operations such as windows in both the Table API and SQL require
information about the notion of time and its origin. Therefore, tables can
offer logical time attributes for indicating time and accessing the
corresponding timestamps in table programs [1].
This means a window can only be defined over a time attribute column.
You need to define a rowtime attribute in your source, like this
(UserActionTime is a long field; you don't need to convert it to Timestamp):

Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.rowtime");

For more information, see the documentation:
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/time_attributes.html#event-time

Best, JingsongLee





Re: can flink sql handle udf-generated timestamp field

2019-06-05 Thread Yu Yang
+flink-user

On Wed, Jun 5, 2019 at 9:58 AM Yu Yang  wrote:

> Thanks for the reply! In flink-table-planner, TimeIndicatorTypeInfo is an
> internal class that cannot be referenced from application code. I got a
> "cannot find symbol" error when I tried to use it. I have also tried to use
> "SqlTimeTypeInfo.getInfoFor(Timestamp.class)" as the return type info for my
> UDF. With that, I got the same "Window can only be defined over a time
> attribute column" error as before.
>
> On Wed, Jun 5, 2019 at 4:41 AM Lee tinker 
> wrote:
>
>> Hi Yu Yang:
>> To use a time field in a window, its type must be one that Flink accepts
>> as a time attribute. Your UDF returns Types.SQL_TIMESTAMP. The type should
>> instead be TimeIndicatorTypeInfo.PROCTIME_INDICATOR or
>> TimeIndicatorTypeInfo.ROWTIME_INDICATOR, depending on your time type
>> (proctime or rowtime). Please try again with that.


Re: can flink sql handle udf-generated timestamp field

2019-06-05 Thread Yu Yang
Hi Jingsong,

Thanks for the reply! The following is our code snippet for creating the
log stream. Our messages are in Thrift format. We use a customized
serializer for serializing/deserializing messages (see
https://github.com/apache/flink/pull/8067 for the implementation). Given
that, how shall we define a time attribute column? We'd like to leverage
the customized serializer to figure out column names as much as possible.

ThriftDeserializationSchema deserializationSchema =
    new ThriftDeserializationSchema(CustomerOrders.class,
        ThriftCodeGenerator.SCROOGE);

FlinkKafkaConsumer kafkaConsumer =
    new FlinkKafkaConsumer("customer_orders", deserializationSchema,
        m10n05Properties);

tableEnv.registerDataStream("orders", kafkaConsumer);

Regards,
-Yu



Re: can flink sql handle udf-generated timestamp field

2019-06-06 Thread Fabian Hueske
Hi Yu,

When you register a DataStream as a Table, you can create a new attribute
that contains the event timestamp of the DataStream records.
For that, you would need to assign timestamps and generate watermarks
before registering the stream:

FlinkKafkaConsumer kafkaConsumer =
    new FlinkKafkaConsumer("customer_orders", deserializationSchema,
        m10n05Properties);

// create DataStream from Kafka consumer
DataStream orders = env.addSource(kafkaConsumer);
// assign timestamps with a custom timestamp assigner & watermark generator
DataStream ordersWithTS =
    orders.assignTimestampsAndWatermarks(new YourTimestampAssigner());

// register DataStream as Table with ts as timestamp, which is automatically
// extracted (see [1] for how to map POJO fields and [2] for timestamps)
tableEnv.registerDataStream("custom_orders", ordersWithTS,
    "userName, ..., ts.rowtime");

Hope this helps,
Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/common.html#mapping-of-data-types-to-table-schema
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/time_attributes.html#during-datastream-to-table-conversion-1
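
As a rough illustration of what a timestamp assigner and watermark generator such as the YourTimestampAssigner placeholder above has to do, here is a plain-Java sketch with no Flink dependency. The class name BoundedLatenessWatermark and the 5-second bound are assumptions made for the example. Flink's own BoundedOutOfOrdernessTimestampExtractor follows the same idea: the event time is read from the record's long field, and the watermark trails the largest timestamp seen so far by a fixed bound.

```java
// Hypothetical, Flink-free model of a bounded-out-of-orderness timestamp assigner.
public class BoundedLatenessWatermark {
    private final long maxOutOfOrdernessMs;
    private long maxTimestampMs = Long.MIN_VALUE;

    public BoundedLatenessWatermark(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Takes the epoch-millis value carried in a record and returns it as the record's event time. */
    public long extractTimestamp(long eventTimestampMs) {
        // Remember the largest timestamp seen so far; late records do not move it backwards.
        maxTimestampMs = Math.max(maxTimestampMs, eventTimestampMs);
        return eventTimestampMs;
    }

    /** Current watermark: records with an earlier timestamp are no longer expected. */
    public long currentWatermark() {
        if (maxTimestampMs == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // nothing seen yet
        }
        return maxTimestampMs - maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        BoundedLatenessWatermark wm = new BoundedLatenessWatermark(5_000L);
        wm.extractTimestamp(12_000L);
        wm.extractTimestamp(10_000L); // arrives out of order
        System.out.println(wm.currentWatermark()); // prints 7000
    }
}
```

With timestamps and watermarks assigned this way on the DataStream, the ts.rowtime field in the registration call can serve as the time attribute that HOP requires, so the long value does not need to go through a UDF first.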


Re: can flink sql handle udf-generated timestamp field

2019-06-06 Thread Yu Yang
Thank you Fabian!  We will try the approach that you suggest.
