Hi,
补充下昨天线下提供的答疑,在从datastream 转换到Table时,如果希望转换后的Table上能够继续使用watermark,
需要(1)让datastream中包含watermark (2)在table上声明event time 属性. 文档可以参考[1]

给出文档中省略的watermark生成部分code:

        // 老版本
//        Table orders = 
tEnv.fromDataStream(orderA.assignTimestampsAndWatermarks(new 
AscendingTimestampExtractor<Order>() {
//                @Override
//                public long extractAscendingTimestamp(Order element) {
//                    return element.rowtime;
//                }
//            }), $("user"), $("product"), $("amount"),$("rowtime").rowtime());
        // 新版本
        Table orders = 
tEnv.fromDataStream(orderA.assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator((ctx)
 -> new WatermarkGenerator<Order>() {
                @Override
                public void onEvent(Order order, long eventTimestamp, 
WatermarkOutput watermarkOutput) {
                    watermarkOutput.emitWatermark(new 
Watermark(eventTimestamp));
                }
                @Override
                public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
                }
            }))
            , $("user"), $("product"), $("amount"),$("rowtime").rowtime());
如果代码不多,可以直接贴在邮件中哈。


祝好,
Leonard
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/streaming/time_attributes.html#%E5%9C%A8-datastream-%E5%88%B0-table-%E8%BD%AC%E6%8D%A2%E6%97%B6%E5%AE%9A%E4%B9%89-1

> 在 2020年12月10日,11:10,Jark Wu <imj...@gmail.com> 写道:
> 
> 链接错了。重发下。
> 
> 1) 所谓时间属性,不是指 timestamp 类型的字段,而是一个特殊概念。可以看下文档如果声明时间属性:
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html
> <https://ci.apache.org/projects/flink/flink-docs-release-1).12/dev/table/streaming/time_attributes.html>
> 2) 你的代码好像也不对。 L45: Table orders = tEnv.from("Orders"); 没看到你有注册过 "Orders"
> 表。这一行应该执行不成功把。
> 
> Best,
> Jark
> 
> On Thu, 10 Dec 2020 at 11:09, Jark Wu <imj...@gmail.com> wrote:
> 
>> 1). 所谓时间属性,不是指 timestamp 类型的字段,而是一个特殊概念。可以看下文档如果声明时间属性:
>> https://ci.apache.org/projects/flink/flink-docs-release-1).12/dev/table/streaming/time_attributes.html
>> 2. 你的代码好像也不对。 L45: Table orders = tEnv.from("Orders"); 没看到你有注册过 "Orders"
>> 表。这一行应该执行不成功把。
>> 
>> Best,
>> Jark
>> 
>> On Wed, 9 Dec 2020 at 15:44, Appleyuchi <appleyu...@163.com> wrote:
>> 
>>> 代码是:
>>> https://paste.ubuntu.com/p/gVGrj2V7ZF/
>>> 报错:
>>> A group window expects a time attribute for grouping in a stream
>>> environment.
>>> 但是代码的数据源中已经有时间属性了.
>>> 请问应该怎么修改代码?
>>> 谢谢
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>> 
>> 

回复