Hi Yu,

When you register a DataStream as a Table, you can create a new attribute
that contains the event timestamp of the DataStream records.
For that, you would need to assign timestamps and generate watermarks
before registering the stream:

FlinkKafkaConsumer<CustomerOrders> kafkaConsumer =
        new FlinkKafkaConsumer<>("customer_orders", deserializationSchema,
                m10n05Properties);

// create DataStream from Kafka consumer
DataStream<CustomerOrders> orders = env.addSource(kafkaConsumer);
// assign timestamps with a custom timestamp assigner & watermark generator
DataStream<CustomerOrders> ordersWithTS =
        orders.assignTimestampsAndWatermarks(new YourTimestampAssigner());

// register DataStream as a Table with ts as an automatically extracted
// event-time attribute (see [1] for how POJO fields are mapped and [2]
// for timestamps)
tableEnv.registerDataStream("custom_orders", ordersWithTS,
        "userName, ..., ts.rowtime");

Hope this helps,
Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/common.html#mapping-of-data-types-to-table-schema
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/time_attributes.html#during-datastream-to-table-conversion-1

On Thu, Jun 6, 2019 at 08:48 Yu Yang <yuyan...@gmail.com> wrote:

> Hi Jingsong,
>
> Thanks for the reply! The following is our code snippet for creating the
> log stream. Our messages are in Thrift format. We use a customized
> serializer for serializing/deserializing messages (see
> https://github.com/apache/flink/pull/8067 for the implementation). Given
> that, how shall we define a time attribute column? We'd like to leverage
> the customized serializer to figure out column names as much as possible.
>
>     ThriftDeserializationSchema deserializationSchema =
>         new ThriftDeserializationSchema(CustomerOrders.class,
> ThriftCodeGenerator.SCROOGE);
>
>     FlinkKafkaConsumer kafkaConsumer =
>         new FlinkKafkaConsumer("customer_orders", deserializationSchema,
>             m10n05Properties);
>
>     tableEnv.registerDataStream("orders", kafkaConsumer);
>
> Regards,
> -Yu
>
> On Wed, Jun 5, 2019 at 11:15 PM JingsongLee <lzljs3620...@aliyun.com>
> wrote:
>
>> Hi @Yu Yang:
>>
>> Time-based operations such as windows in both the Table API and SQL
>> require information about the notion of time and its origin. Therefore,
>> tables can offer logical time attributes for indicating time and
>> accessing the corresponding timestamps in table programs. [1]
>> This means a window can only be defined over a time attribute column.
>> You need to define a rowtime attribute in your source, like this
>> (UserActionTime is a long field; you don't need to convert it to
>> Timestamp):
>>
>> Table table = tEnv.fromDataStream(stream,
>>     "Username, Data, UserActionTime.rowtime");
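>>
>> As a hypothetical usage sketch (the table and column names are made up),
>> once UserActionTime is declared as a rowtime attribute, a windowed query
>> can group by it directly:
>>
>> tEnv.registerTable("user_actions", table);
>> Table result = tEnv.sqlQuery(
>>     "SELECT Username, COUNT(*) " +
>>     "FROM user_actions " +
>>     "GROUP BY TUMBLE(UserActionTime, INTERVAL '10' MINUTE), Username");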
>>
>> See more information in the document below:
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/time_attributes.html#event-time
>>
>> Best, JingsongLee
>>
>> ------------------------------------------------------------------
>> From:Yu Yang <yuyan...@gmail.com>
>> Send Time: Wednesday, June 5, 2019 14:57
>> To:user <user@flink.apache.org>
>> Subject:can flink sql handle udf-generated timestamp field
>>
>> Hi,
>>
>> I am trying to use Flink SQL to do aggregation on a hopping window. In
>> the data stream, we store the timestamp in long type. So I wrote a UDF
>> 'FROM_UNIXTIME' to convert long to Timestamp type.
>>
>>   public static class TimestampModifier extends ScalarFunction {
>>     public Timestamp eval(long t) {
>>       return new Timestamp(t);
>>     }
>>     public TypeInformation<?> getResultType(Class<?>[] signature) {
>>       return Types.SQL_TIMESTAMP;
>>     }
>>   }
>>
>> With the above UDF, I wrote the following query, and ran into
>>  "ProgramInvocationException: The main method caused an error: Window can
>> only be defined over a time attribute column".
>> Any suggestions on how to resolve this issue? I am using Flink 1.8 for
>> this experiment.
>>
>> my sql query:
>>
>> select  keyid, sum(value)
>> from (
>>    select FROM_UNIXTIME(timestampMs) as ts, key.id as keyid, value
>>    from orders)
>>  group by HOP(ts, INTERVAL '10' second, INTERVAL '30' second), keyid
>>
>> flink exception:
>>
>> org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error: Window can only be defined over a time attribute
>> column.
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
>> at
>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)
>> at
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
>> at
>> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>> at
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
>> at
>> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
>> Caused by: org.apache.flink.table.api.ValidationException: Window can
>> only be defined over a time attribute column.
>> at
>> org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:85)
>> at
>> org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:99)
>> at
>> org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule.onMatch(LogicalWindowAggregateRule.scala:66)
>> at
>> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
>> at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:559)
>> at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:418)
>>
>> Regards,
>> -Yu
>>
>>
>>
