Thank you, Fabian! We will try the approach you suggested.

On Thu, Jun 6, 2019 at 1:03 AM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Yu,
>
> When you register a DataStream as a Table, you can create a new attribute
> that contains the event timestamp of the DataStream records.
> For that, you would need to assign timestamps and generate watermarks
> before registering the stream:
>
> FlinkKafkaConsumer<CustomerOrders> kafkaConsumer =
>         new FlinkKafkaConsumer<>("customer_orders", deserializationSchema,
>                 m10n05Properties);
>
> // create a DataStream from the Kafka consumer
> DataStream<CustomerOrders> orders = env.addSource(kafkaConsumer);
>
> // assign timestamps with a custom timestamp assigner & watermark generator
> DataStream<CustomerOrders> ordersWithTS =
>         orders.assignTimestampsAndWatermarks(new YourTimestampAssigner());
>
> // register the DataStream as a Table, declaring ts as an event-time
> // (rowtime) attribute that is extracted automatically (see [1] for how
> // POJO fields are mapped and [2] for time attributes)
> tableEnv.registerDataStream("customer_orders", ordersWithTS,
>         "userName, ..., ts.rowtime");
>
> Hope this helps,
> Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/common.html#mapping-of-data-types-to-table-schema
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/time_attributes.html#during-datastream-to-table-conversion-1
>
> On Thu, Jun 6, 2019 at 8:48 AM Yu Yang <yuyan...@gmail.com> wrote:
>
>> Hi Jingsong,
>>
>> Thanks for the reply! The following is our code snippet for creating the
>> log stream. Our messages are in Thrift format, and we use a customized
>> serializer for serializing/deserializing them (see
>> https://github.com/apache/flink/pull/8067 for the implementation).
>> Given that, how shall we define a time attribute column? We'd like to
>> leverage the customized serializer to infer column names as much as
>> possible.
>>
>>     ThriftDeserializationSchema deserializationSchema =
>>         new ThriftDeserializationSchema(CustomerOrders.class,
>>             ThriftCodeGenerator.SCROOGE);
>>
>>     FlinkKafkaConsumer<CustomerOrders> kafkaConsumer =
>>         new FlinkKafkaConsumer<>("customer_orders", deserializationSchema,
>>             m10n05Properties);
>>
>>     DataStream<CustomerOrders> orders = env.addSource(kafkaConsumer);
>>     tableEnv.registerDataStream("orders", orders);
>>
>> Regards,
>> -Yu
>>
>> On Wed, Jun 5, 2019 at 11:15 PM JingsongLee <lzljs3620...@aliyun.com>
>> wrote:
>>
>>> Hi @Yu Yang:
>>>
>>> Time-based operations such as windows in both the Table API and SQL
>>> require information about the notion of time and its origin. Therefore,
>>> tables can offer logical time attributes for indicating time and
>>> accessing corresponding timestamps in table programs. [1]
>>> This means a window can only be defined over a time attribute column.
>>> You need to define a rowtime attribute in your source, like this
>>> (UserActionTime is a long field; you don't need to convert it to
>>> Timestamp):
>>>
>>> Table table = tEnv.fromDataStream(stream,
>>>         "Username, Data, UserActionTime.rowtime");
>>>
>>> See the document below for more information:
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/time_attributes.html#event-time
>>>
>>> Best, JingsongLee
>>>
>>> ------------------------------------------------------------------
>>> From:Yu Yang <yuyan...@gmail.com>
>>> Send Time: June 5, 2019 (Wednesday) 14:57
>>> To:user <user@flink.apache.org>
>>> Subject:can flink sql handle udf-generated timestamp field
>>>
>>> Hi,
>>>
>>> I am trying to use Flink SQL to do aggregation on a hopping window. In
>>> the data stream, we store the timestamp as a long value, so I wrote a UDF
>>> 'FROM_UNIXTIME' to convert the long value to the Timestamp type.
>>>
>>>   public static class TimestampModifier extends ScalarFunction {
>>>     public Timestamp eval(long t) {
>>>       return new Timestamp(t);
>>>     }
>>>     public TypeInformation<?> getResultType(Class<?>[] signature) {
>>>       return Types.SQL_TIMESTAMP;
>>>     }
>>>   }
>>>
>>> With the above UDF, I wrote the following query, and ran into
>>>  "ProgramInvocationException: The main method caused an error: Window can
>>> only be defined over a time attribute column".
>>> Any suggestions on how to resolve this issue? I am using Flink 1.8 for
>>> this experiment.
>>>
>>> my SQL query:
>>>
>>> select  keyid, sum(value)
>>> from (
>>>    select FROM_UNIXTIME(timestampMs) as ts, key.id as keyid, value
>>>    from orders)
>>>  group by HOP(ts, INTERVAL '10' second, INTERVAL '30' second), keyid
>>>
>>> Flink exception:
>>>
>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>> method caused an error: Window can only be defined over a time attribute
>>> column.
>>> at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
>>> at
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
>>> at
>>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)
>>> at
>>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
>>> at
>>> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
>>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>>> at
>>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
>>> at
>>> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
>>> at
>>> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
>>> Caused by: org.apache.flink.table.api.ValidationException: Window can
>>> only be defined over a time attribute column.
>>> at
>>> org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:85)
>>> at
>>> org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:99)
>>> at
>>> org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule.onMatch(LogicalWindowAggregateRule.scala:66)
>>> at
>>> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
>>> at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:559)
>>> at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:418)
>>>
>>> Regards,
>>> -Yu
>>>
>>>
>>>
