文档说明: 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/event_timestamps_watermarks.html


这个网页我打不开,有其他的地址么?
谢谢




------------------ 原始邮件 ------------------
发件人:&nbsp;"Djeng Lee"<lee.ro...@gmail.com&gt;;
发送时间:&nbsp;2020年4月4日(星期六) 晚上6:35
收件人:&nbsp;"user-zh@flink.apache.org"<user-zh@flink.apache.org&gt;;

主题:&nbsp;Re: 回复: 回复: Flink双流Join问题



刚刚我说的不严谨

Start = 1000000055000 - (1000000055000 - 0 + 3000) % 3000 = 1000000053000
End = 1000000053000 + 3000

//源码位置,所以窗口开端并不是你传入首条记录的作为开端。窗口划分是从0时间戳切过来的。

文档说明: 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/event_timestamps_watermarks.html

相关代码
public static long getWindowStartWithOffset(long timestamp, long offset, long 
windowSize) {
&nbsp;  return timestamp - (timestamp - offset + windowSize) % windowSize;
}

在 2020/4/4 下午6:30,“忝忝向仧”<153488...@qq.com&gt; 写入:

&nbsp;&nbsp;&nbsp; 额,你的意思是滚动3秒的窗口开始和结束应该是
&nbsp;&nbsp;&nbsp; 1000000055000 % 3 得出结果再拿到[start,end).
&nbsp;&nbsp;&nbsp; 比如1000000055000 % 3 
的结果是1000000053000,那么窗口是[1000000053000,1000000056000)
&nbsp;&nbsp;&nbsp; 是这么理解吧
&nbsp;&nbsp;&nbsp; 
&nbsp;&nbsp;&nbsp; 
&nbsp;&nbsp;&nbsp; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
&nbsp;&nbsp;&nbsp; 发件人:&amp;nbsp;"lee.roval"<lee.ro...@gmail.com&amp;gt;;
&nbsp;&nbsp;&nbsp; 发送时间:&amp;nbsp;2020年4月4日(星期六) 晚上6:25
&nbsp;&nbsp;&nbsp; 
收件人:&amp;nbsp;"user-zh@flink.apache.org"<user-zh@flink.apache.org&amp;gt;;
&nbsp;&nbsp;&nbsp; 
&nbsp;&nbsp;&nbsp; 主题:&amp;nbsp;Re: 回复: Flink双流Join问题
&nbsp;&nbsp;&nbsp; 
&nbsp;&nbsp;&nbsp; 
&nbsp;&nbsp;&nbsp; 
&nbsp;&nbsp;&nbsp; 55000的窗口分配,是对windowSize 求模然后拿到start 和 end。 不是从你首条记录开始算。
&nbsp;&nbsp;&nbsp; 
&nbsp;&nbsp;&nbsp; 在 2020/4/4 下午6:23,“忝忝向仧”<153488...@qq.com&amp;gt; 写入:
&nbsp;&nbsp;&nbsp; 
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; 对,我只是回复把前面那串省略了,没写.
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; 
[1000000053000,1000000056000),为什么是1000000053000开始?
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; 我第一条输入的是
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; 1,tom1,1000000055000
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; 
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; 
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; 
------------------&amp;amp;nbsp;原始邮件&amp;amp;nbsp;------------------
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; 
发件人:&amp;amp;nbsp;"libenchao"<libenc...@gmail.com&amp;amp;gt;;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; 
发送时间:&amp;amp;nbsp;2020年4月4日(星期六) 晚上6:20
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; 
收件人:&amp;amp;nbsp;"user-zh"<user-zh@flink.apache.org&amp;amp;gt;;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; 
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; 主题:&amp;amp;nbsp;Re: 
Flink双流Join问题
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; 
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; 
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; 
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; 
你的watermark不是56000,而是1000000056000吧。所以应该是[1000000053000,
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; 1000000056000)是一个窗口吧。
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; 
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; 忝忝向仧 
<153488...@qq.com&amp;amp;gt; 于2020年4月4日周六 下午6:16写道:
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; 
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 下发新的?
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
4是一个kafka的source,3是另外一个kafka的source.
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 如果按照3秒的一个窗口
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
watermark触发窗口的条件是watermark_time&amp;amp;amp;gt;=window_endtime
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
也就是说[55000,57000)应该是一个窗口的.
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
我是这么理解的,但是结果56000后就输出了
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
currentTimeStamp:&amp;amp;amp;nbsp;1000000055000,Key:1,EventTime:1000000055000,前一条数据的水位线:0
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
4&amp;amp;amp;gt; (1,tom1,1000000055000)
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
currentTimeStamp:&amp;amp;amp;nbsp;1000000056000,Key:1,EventTime:1000000056000,前一条数据的水位线:1000000055000
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
4&amp;amp;amp;gt; (1,tom2,1000000056000)
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
currentTimeStamp:&amp;amp;amp;nbsp;1000000055000,Key:1,EventTime:1000000055000,前一条数据的水位线:0
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
3&amp;amp;amp;gt; (1,jerry1,1000000055000)
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
currentTimeStamp:&amp;amp;amp;nbsp;1000000056000,Key:1,EventTime:1000000056000,前一条数据的水位线:1000000055000
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
3&amp;amp;amp;gt; (1,jerry2,1000000056000)
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
2&amp;amp;amp;gt; tom1=jerry1
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
------------------&amp;amp;amp;nbsp;原始邮件&amp;amp;amp;nbsp;------------------
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
发件人:&amp;amp;amp;nbsp;"lee.roval"<lee.ro...@gmail.com&amp;amp;amp;gt;;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
发送时间:&amp;amp;amp;nbsp;2020年4月4日(星期六) 晚上6:10
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
收件人:&amp;amp;amp;nbsp;"user-zh@flink.apache.org"<user-zh@flink.apache.org&amp;amp;amp;gt;;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
主题:&amp;amp;amp;nbsp;Re: Flink双流Join问题
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
56000后不是下发新的watermark了嘛
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 在 2020/4/4 
下午5:57,“忝忝向仧”<153488...@qq.com&amp;amp;amp;gt; 写入:
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
各位好:&amp;amp;amp;amp;nbsp; &amp;amp;amp;amp;nbsp; &amp;amp;amp;amp;nbsp; 
&amp;amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
Flink双流Join遇到一个问题,能否解释下,谢谢.
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; &amp;amp;amp;amp;nbsp; 
&amp;amp;amp;amp;nbsp; &amp;amp;amp;amp;nbsp; &amp;amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
ds1和ds2分别读取kafka两个流数据,使用event time和watermark特性,3s的一个翻滚窗口,定义如下:
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; &amp;amp;amp;amp;nbsp; 
&amp;amp;amp;amp;nbsp; &amp;amp;amp;amp;nbsp; &amp;amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
最后,join输出的时候,为什么触发窗口的数据第二条就触发了?
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; &amp;amp;amp;amp;nbsp; 
&amp;amp;amp;amp;nbsp; &amp;amp;amp;amp;nbsp; &amp;amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
按照水印的触发条件应该是watermark_time&amp;amp;amp;amp;gt;=window_endtime.那么,这里应该是1000000057000这条数据来了后才会触发,但是结果却是56000就触发了.为什么?
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 定义的代码如下:
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
DataStream<String&amp;amp;amp;amp;gt; stream1 = env
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; .addSource(new 
FlinkKafkaConsumer09<String&amp;amp;amp;amp;gt;("stream1", new
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
SimpleStringSchema(), properties).setStartFromLatest())
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
.assignTimestampsAndWatermarks(
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; new 
AssignerWithPeriodicWatermarks<String&amp;amp;amp;amp;gt;() {
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; long 
currentTimeStamp = 0L;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; long 
maxDelayAllowed = 0L;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; long 
currentWaterMark;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; @Nullable
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; @Override
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; public Watermark 
getCurrentWatermark() {
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; currentWaterMark 
= currentTimeStamp-maxDelayAllowed;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; return new 
Watermark(currentWaterMark);
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; }
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; @Override
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; public long 
extractTimestamp(String s, long l) {
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; String[] arr= 
s.split(" ");
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; long timeStamp = 
Long.parseLong(arr[2]);
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; currentTimeStamp 
= Math.max(timeStamp, currentTimeStamp);
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
System.out.println("currentTimeStamp: " +&amp;amp;amp;nbsp; currentTimeStamp 
+",Key:" +
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; arr[0] + 
",EventTime:" + timeStamp + ",前一条数据的水位线:" + currentWaterMark);
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; return timeStamp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; }
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; }
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
 );
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
DataStream<Tuple3<String,String,String&amp;amp;amp;amp;gt;&amp;amp;amp;amp;gt;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; ds1 = 
stream1.map(new MapFunction<String,
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
Tuple3<String,String,String&amp;amp;amp;amp;gt;&amp;amp;amp;amp;gt;() {
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
 @Override
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
 public Tuple3<String,
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
String,String&amp;amp;amp;amp;gt; map(String s1) throws Exception {
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; String[] arr = 
s1.split(" ");
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
 return
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
Tuple3.of(arr[0],arr[1],arr[2]);
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
 }
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; });
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; ds1.print();
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
DataStream<String&amp;amp;amp;amp;gt; stream2 = env
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; .addSource(new 
FlinkKafkaConsumer09<String&amp;amp;amp;amp;gt;("stream2", new
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
SimpleStringSchema(), properties).setStartFromLatest())
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
.assignTimestampsAndWatermarks(
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; new 
AssignerWithPeriodicWatermarks<String&amp;amp;amp;amp;gt;() {
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; long 
currentTimeStamp = 0L;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; long 
maxDelayAllowed = 0L;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; long 
currentWaterMark;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; @Nullable
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; @Override
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; public Watermark 
getCurrentWatermark() {
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; currentWaterMark 
= currentTimeStamp-maxDelayAllowed;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; return new 
Watermark(currentWaterMark);
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; }
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; @Override
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; public long 
extractTimestamp(String s, long l) {
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; String[] arr= 
s.split(" ");
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; long timeStamp = 
Long.parseLong(arr[2]);
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; currentTimeStamp 
= Math.max(timeStamp, currentTimeStamp);
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
System.out.println("currentTimeStamp: " +&amp;amp;amp;nbsp; currentTimeStamp 
+",Key:" +
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; arr[0] + 
",EventTime:" + timeStamp + ",前一条数据的水位线:" + currentWaterMark);
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; return timeStamp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; }
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; }
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
 );
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
DataStream<Tuple3<String,String,String&amp;amp;amp;amp;gt;&amp;amp;amp;amp;gt;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; ds2 
=stream2.map(new MapFunction<String,
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
Tuple3<String,String,String&amp;amp;amp;amp;gt;&amp;amp;amp;amp;gt;() {
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
 @Override
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
 public
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
Tuple3<String,String,String&amp;amp;amp;amp;gt; map(String s2) throws Exception 
{
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
 String
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; [] arr = 
s2.split(" ");
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
 return
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
Tuple3.of(arr[0],arr[1],arr[2]);
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
 }
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; });
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; ds2.print();
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; ds1.join(ds2).where(new 
KeySelector<Tuple3<String,
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
String,String&amp;amp;amp;amp;gt;,String&amp;amp;amp;amp;gt;() {
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
 @Override
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
 public String
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
getKey(Tuple3<String, String,String&amp;amp;amp;amp;gt; value) throws Exception 
{
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
 return
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; value.f0;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
 }
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; }).equalTo(new 
KeySelector<Tuple3<String, String
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
,String&amp;amp;amp;amp;gt;, String&amp;amp;amp;amp;gt;() {
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
 @Override
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
 public String
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
getKey(Tuple3<String, String,String&amp;amp;amp;amp;gt; value) throws Exception 
{
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
 return
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; value.f0;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
 }
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; })
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; .apply(new 
JoinFunction<Tuple3<String,
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
String,String&amp;amp;amp;amp;gt;, Tuple3<String, 
String,String&amp;amp;amp;amp;gt;,
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
String&amp;amp;amp;amp;gt;() {
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
 @Override
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
 public String
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
join(Tuple3<String, String,String&amp;amp;amp;amp;gt; value1, Tuple3<String,
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
String,String&amp;amp;amp;amp;gt; value2) throws Exception {
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
 return
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; value1.f1 + "=" 
+ value2.f1;
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
 }
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; }).print();
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 结果如下:
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; currentTimeStamp:
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
1000000055000,Key:1,EventTime:1000000055000,前一条数据的水位线:0
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 4&amp;amp;amp;amp;gt; 
(1,tom1,1000000055000)
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; currentTimeStamp:
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
1000000056000,Key:1,EventTime:1000000056000,前一条数据的水位线:1000000055000
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 4&amp;amp;amp;amp;gt; 
(1,tom2,1000000056000)
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; currentTimeStamp:
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
1000000055000,Key:1,EventTime:1000000055000,前一条数据的水位线:0
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 3&amp;amp;amp;amp;gt; 
(1,jerry1,1000000055000)
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; currentTimeStamp:
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
1000000056000,Key:1,EventTime:1000000056000,前一条数据的水位线:1000000055000
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 3&amp;amp;amp;amp;gt; 
(1,jerry2,1000000056000)
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;amp;gt; 
&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 2&amp;amp;amp;amp;gt; 
tom1=jerry1
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; 
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; 
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; 
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; -- 
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; 
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; Benchao Li
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; School of Electronics 
Engineering and Computer Science, Peking University
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; Tel:+86-15650713730
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; Email: libenc...@gmail.com; 
libenc...@pku.edu.cn

回复