给kafka吐数据时每条记录之间休眠1秒试试.

文档位置
https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/event_timestamps_watermarks.html
https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/stream/operators/joining.html


AssignerWithPeriodicWatermarks assigns timestamps and generates watermarks 
periodically (possibly depending on the stream elements, or purely based on 
processing time).

The interval (every n milliseconds) in which the watermark will be generated is 
defined via ExecutionConfig.setAutoWatermarkInterval(...). The assigner’s 
getCurrentWatermark() method will be called each time, and a new watermark will 
be emitted if the returned watermark is non-null and larger than the previous 
watermark.

Here we show two simple examples of timestamp assigners that use periodic 
watermark generation. Note that Flink ships with a 
BoundedOutOfOrdernessTimestampExtractor similar to the 
BoundedOutOfOrdernessGenerator shown below, which you can read about here.

在 2020/4/5 上午10:21,“忝忝向仧”<153488...@qq.com> 写入:

    Hi,
    
    
    感觉有点不对,我试着用stream1输入:
    1 tom1 1553503185000
    1 tom2 1553503186000
    1 tom3 1553503187000
    1 tom4 1553503188000
    1 tom_late 1553503185000
    
    
    
    stream2输入:
    1 jerry2 1553503186000
    1 jerry3 1553503187000
    1 jerry4 1553503188000
    
    
    
    但是结果打印的是:
    currentTimeStamp: 1553503185000,Key:1,EventTime:1553503185000,前一条数据的水位线:0
    4&gt; (1,tom1,1553503185000)
    currentTimeStamp: 
1553503186000,Key:1,EventTime:1553503186000,前一条数据的水位线:1553503185000
    4&gt; (1,tom2,1553503186000)
    currentTimeStamp: 
1553503187000,Key:1,EventTime:1553503187000,前一条数据的水位线:1553503186000
    4&gt; (1,tom3,1553503187000)
    currentTimeStamp: 
1553503188000,Key:1,EventTime:1553503188000,前一条数据的水位线:1553503187000
    4&gt; (1,tom4,1553503188000)
    currentTimeStamp: 
1553503188000,Key:1,EventTime:1553503185000,前一条数据的水位线:1553503188000
    4&gt; (1,tom_late,1553503185000)
    currentTimeStamp: 1553503186000,Key:1,EventTime:1553503186000,前一条数据的水位线:0
    3&gt; (1,jerry2,1553503186000)
    currentTimeStamp: 
1553503187000,Key:1,EventTime:1553503187000,前一条数据的水位线:1553503186000
    3&gt; (1,jerry3,1553503187000)
    currentTimeStamp: 
1553503188000,Key:1,EventTime:1553503188000,前一条数据的水位线:1553503187000
    3&gt; (1,jerry4,1553503188000)
    2&gt; tom1=jerry2
    2&gt; tom1=jerry3
    2&gt; tom2=jerry2
    2&gt; tom2=jerry3
    2&gt; tom3=jerry2
    2&gt; tom3=jerry3
    2&gt; tom_late=jerry2
    2&gt; tom_late=jerry3
    
    
    
    我第二个流的窗口是[1553503186000,1553503188000),怎么判断出stream1中那条迟到的是没过期的?
    谢谢.
    
    
    ------------------&nbsp;原始邮件&nbsp;------------------
    发件人:&nbsp;"libenchao"<libenc...@gmail.com&gt;;
    发送时间:&nbsp;2020年4月5日(星期天) 上午10:04
    收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
    
    主题:&nbsp;Re: 回复: 回复: Flink双流Join问题
    
    
    
    嗯,可以这么理解。
    
    忝忝向仧 <153488...@qq.com&gt; 于2020年4月4日周六 下午11:20写道:
    
    &gt; 额,明白了,意思是说两个流情况下
    &gt; 比如,stream1里面晚来的那条
    &gt; 1 tom_late 1553503185000的水印是1553503188000
    &gt; 但是stream2里面,这条1,jerry1,1553503185000的水印是1553503185000
    &gt; 所以取最小的,因此还是会被打印?
    &gt; 是这么理解么?
    &gt;
    &gt;
    &gt;
    &gt;
    &gt;
    &gt;
    &gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
    &gt; 发件人:&amp;nbsp;"libenchao"<libenc...@gmail.com&amp;gt;;
    &gt; 发送时间:&amp;nbsp;2020年4月4日(星期六) 晚上11:04
    &gt; 收件人:&amp;nbsp;"user-zh"<user-zh@flink.apache.org&amp;gt;;
    &gt;
    &gt; 主题:&amp;nbsp;Re: 回复: 回复: Flink双流Join问题
    &gt;
    &gt;
    &gt;
    &gt; 两个stream输入的场景,operator的watermark是取两者的最小值。
    &gt; 所以虽然这条数据在第一个流里面看起来已经是肯定迟到了,但是有可能看第二个流的watermark它还没有过期。
    &gt;
    &gt;
    &gt; 忝忝向仧 <153488...@qq.com&amp;gt; 于2020年4月4日周六 下午10:42写道:
    &gt;
    &gt; &amp;gt; Hi:
    &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; 还有个疑问,我特意构造了个乱序的消息,还是3秒一个窗口
    &gt; &amp;gt; stream1:
    &gt; &amp;gt; 1 tom1 1553503185000
    &gt; &amp;gt; 1 tom2 1553503186000
    &gt; &amp;gt; 1 tom3 1553503187000
    &gt; &amp;gt; 1 tom4 1553503188000
    &gt; &amp;gt; 1 tom_late 1553503185000
    &gt; &amp;gt;
    &gt; &amp;gt;
    &gt; &amp;gt;
    &gt; &amp;gt; stream2:
    &gt; &amp;gt; 1 jerry1 1553503185000
    &gt; &amp;gt; 1 jerry2 1553503186000
    &gt; &amp;gt; 1 jerry3 1553503187000
    &gt; &amp;gt; 1 jerry4 1553503188000
    &gt; &amp;gt;
    &gt; &amp;gt;
    &gt; &amp;gt; 我代码还是之前的代码,定义水印都是maxDelayAllowed = 0L;也就是没有设置最大延时时间.
    &gt; &amp;gt; 那么,tom_late这条乱序,在第一个窗口[1553503185000,1553503188000)内不应该被输出吧?
    &gt; &amp;gt; 但是结果还是输出了,这个是为什么?
    &gt; &amp;gt;
    &gt; &amp;gt;
    &gt; &amp;gt; currentTimeStamp:
    &gt; 1553503185000,Key:1,EventTime:1553503185000,前一条数据的水位线:0
    &gt; &amp;gt; 4&amp;amp;gt; (1,tom1,1553503185000)
    &gt; &amp;gt; currentTimeStamp:
    &gt; &amp;gt; 
1553503186000,Key:1,EventTime:1553503186000,前一条数据的水位线:1553503185000
    &gt; &amp;gt; 4&amp;amp;gt; (1,tom2,1553503186000)
    &gt; &amp;gt; currentTimeStamp:
    &gt; &amp;gt; 
1553503187000,Key:1,EventTime:1553503187000,前一条数据的水位线:1553503186000
    &gt; &amp;gt; 4&amp;amp;gt; (1,tom3,1553503187000)
    &gt; &amp;gt; currentTimeStamp:
    &gt; &amp;gt; 
1553503188000,Key:1,EventTime:1553503188000,前一条数据的水位线:1553503187000
    &gt; &amp;gt; 4&amp;amp;gt; (1,tom4,1553503188000)
    &gt; &amp;gt; currentTimeStamp:
    &gt; &amp;gt; 
1553503188000,Key:1,EventTime:1553503185000,前一条数据的水位线:1553503188000
    &gt; &amp;gt; 4&amp;amp;gt; (1,tom_late,1553503185000)
    &gt; &amp;gt; currentTimeStamp:
    &gt; &amp;gt; 
1553503189000,Key:2,EventTime:1553503189000,前一条数据的水位线:1553503188000
    &gt; &amp;gt; 4&amp;amp;gt; (2,tom5,1553503189000)
    &gt; &amp;gt; currentTimeStamp:
    &gt; &amp;gt; 
1553503191000,Key:2,EventTime:1553503191000,前一条数据的水位线:1553503189000
    &gt; &amp;gt; 4&amp;amp;gt; (2,tom6,1553503191000)
    &gt; &amp;gt; currentTimeStamp:
    &gt; &amp;gt; 
1553503192000,Key:2,EventTime:1553503192000,前一条数据的水位线:1553503191000
    &gt; &amp;gt; 4&amp;amp;gt; (2,tom7,1553503192000)
    &gt; &amp;gt; currentTimeStamp:
    &gt; &amp;gt; 
1553503193000,Key:2,EventTime:1553503193000,前一条数据的水位线:1553503192000
    &gt; &amp;gt; 4&amp;amp;gt; (2,tom8,1553503193000)
    &gt; &amp;gt; currentTimeStamp:
    &gt; 1553503185000,Key:1,EventTime:1553503185000,前一条数据的水位线:0
    &gt; &amp;gt; 3&amp;amp;gt; (1,jerry1,1553503185000)
    &gt; &amp;gt; currentTimeStamp:
    &gt; &amp;gt; 
1553503186000,Key:1,EventTime:1553503186000,前一条数据的水位线:1553503185000
    &gt; &amp;gt; 3&amp;amp;gt; (1,jerry2,1553503186000)
    &gt; &amp;gt; currentTimeStamp:
    &gt; &amp;gt; 
1553503187000,Key:1,EventTime:1553503187000,前一条数据的水位线:1553503186000
    &gt; &amp;gt; 3&amp;amp;gt; (1,jerry3,1553503187000)
    &gt; &amp;gt; currentTimeStamp:
    &gt; &amp;gt; 
1553503188000,Key:1,EventTime:1553503188000,前一条数据的水位线:1553503187000
    &gt; &amp;gt; 3&amp;amp;gt; (1,jerry4,1553503188000)
    &gt; &amp;gt; 2&amp;amp;gt; tom1=jerry1
    &gt; &amp;gt; 2&amp;amp;gt; tom1=jerry2
    &gt; &amp;gt; 2&amp;amp;gt; tom1=jerry3
    &gt; &amp;gt; 2&amp;amp;gt; tom2=jerry1
    &gt; &amp;gt; 2&amp;amp;gt; tom2=jerry2
    &gt; &amp;gt; 2&amp;amp;gt; tom2=jerry3
    &gt; &amp;gt; 2&amp;amp;gt; tom3=jerry1
    &gt; &amp;gt; 2&amp;amp;gt; tom3=jerry2
    &gt; &amp;gt; 2&amp;amp;gt; tom3=jerry3
    &gt; &amp;gt; 2&amp;amp;gt; tom_late=jerry1
    &gt; &amp;gt; 2&amp;amp;gt; tom_late=jerry2
    &gt; &amp;gt; 2&amp;amp;gt; tom_late=jerry3
    &gt; &amp;gt;
    &gt; &amp;gt;
    &gt; &amp;gt;
    &gt; &amp;gt;
    &gt; &amp;gt;
    &gt; &amp;gt;
    &gt; &amp;gt;
    &gt; &amp;gt;
    &gt; &amp;gt;
    &gt; &amp;gt;
    &gt; &amp;gt;
    &gt; &amp;gt;
    &gt; &amp;gt; 
------------------&amp;amp;nbsp;原始邮件&amp;amp;nbsp;------------------
    &gt; &amp;gt; 发件人:&amp;amp;nbsp;"Djeng Lee"<lee.ro...@gmail.com&amp;amp;gt;;
    &gt; &amp;gt; 发送时间:&amp;amp;nbsp;2020年4月4日(星期六) 晚上6:35
    &gt; &amp;gt; 
收件人:&amp;amp;nbsp;"user-zh@flink.apache.org"<user-zh@flink.apache.org
    &gt; &amp;amp;gt;;
    &gt; &amp;gt;
    &gt; &amp;gt; 主题:&amp;amp;nbsp;Re: 回复: 回复: Flink双流Join问题
    &gt; &amp;gt;
    &gt; &amp;gt;
    &gt; &amp;gt;
    &gt; &amp;gt; 刚刚我说的不严谨
    &gt; &amp;gt;
    &gt; &amp;gt; Start = 1000000055000 - (1000000055000 - 0 + 3000) % 3000 =
    &gt; 1000000053000
    &gt; &amp;gt; End = 1000000053000 + 3000
    &gt; &amp;gt;
    &gt; &amp;gt; //源码位置,所以窗口开端并不是你传入首条记录的作为开端。窗口划分是从0时间戳切过来的。
    &gt; &amp;gt;
    &gt; &amp;gt; 文档说明:
    &gt; &amp;gt;
    &gt; 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/event_timestamps_watermarks.html
    &gt; &amp;gt;
    &gt; &amp;gt; 相关代码
    &gt; &amp;gt; public static long getWindowStartWithOffset(long timestamp, 
long
    &gt; offset,
    &gt; &amp;gt; long windowSize) {
    &gt; &amp;gt; &amp;amp;nbsp;&amp;nbsp; return timestamp - (timestamp - 
offset + windowSize)
    &gt; % windowSize;
    &gt; &amp;gt; }
    &gt; &amp;gt;
    &gt; &amp;gt; 在 2020/4/4 下午6:30,“忝忝向仧”<153488...@qq.com&amp;amp;gt; 写入:
    &gt; &amp;gt;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; 
额,你的意思是滚动3秒的窗口开始和结束应该是
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; 1000000055000 % 3 
得出结果再拿到[start,end).
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; 比如1000000055000 % 3
    &gt; &amp;gt; 的结果是1000000053000,那么窗口是[1000000053000,1000000056000)
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; 是这么理解吧
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;gt; 
------------------&amp;amp;amp;nbsp;原始邮件&amp;amp;amp;nbsp;------------------
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; 
发件人:&amp;amp;amp;nbsp;"lee.roval"<
    &gt; lee.ro...@gmail.com&amp;amp;amp;gt;;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; 
发送时间:&amp;amp;amp;nbsp;2020年4月4日(星期六)
    &gt; 晚上6:25
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; 
收件人:&amp;amp;amp;nbsp;"
    &gt; user-zh@flink.apache.org"<
    &gt; &amp;gt; user-zh@flink.apache.org&amp;amp;amp;gt;;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; 
主题:&amp;amp;amp;nbsp;Re: 回复: Flink双流Join问题
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; 
55000的窗口分配,是对windowSize 求模然后拿到start 和
    &gt; end。 不是从你首条记录开始算。
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; 在 2020/4/4 
下午6:23,“忝忝向仧”<
    &gt; 153488...@qq.com&amp;amp;amp;gt; 写入:
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
对,我只是回复把前面那串省略了,没写.
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
    &gt; &amp;gt; [1000000053000,1000000056000),为什么是1000000053000开始?
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 我第一条输入的是
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
1,tom1,1000000055000
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
    &gt; &amp;gt;
    &gt; 
------------------&amp;amp;amp;amp;nbsp;原始邮件&amp;amp;amp;amp;nbsp;------------------
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
    &gt; &amp;gt; 发件人:&amp;amp;amp;amp;nbsp;"libenchao"<libenc...@gmail.com
    &gt; &amp;amp;amp;amp;gt;;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
    &gt; &amp;gt; 发送时间:&amp;amp;amp;amp;nbsp;2020年4月4日(星期六) 晚上6:20
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
    &gt; &amp;gt; 收件人:&amp;amp;amp;amp;nbsp;"user-zh"<user-zh@flink.apache.org
    &gt; &amp;amp;amp;amp;gt;;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
主题:&amp;amp;amp;amp;nbsp;Re:
    &gt; &amp;gt; Flink双流Join问题
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
    &gt; &amp;gt; 你的watermark不是56000,而是1000000056000吧。所以应该是[1000000053000,
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
1000000056000)是一个窗口吧。
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 忝忝向仧 
<153488...@qq.com
    &gt; &amp;amp;amp;amp;gt;
    &gt; &amp;gt; 于2020年4月4日周六 下午6:16写道:
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt; 下发新的?
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 4是一个kafka的source,3是另外一个kafka的source.
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt; 如果按照3秒的一个窗口
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 
watermark触发窗口的条件是watermark_time&amp;amp;amp;amp;amp;gt;=window_endtime
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 也就是说[55000,57000)应该是一个窗口的.
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 我是这么理解的,但是结果56000后就输出了
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
currentTimeStamp:&amp;amp;amp;amp;amp;nbsp;1000000055000,Key:1,EventTime:1000000055000,前一条数据的水位线:0
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 4&amp;amp;amp;amp;amp;gt; (1,tom1,1000000055000)
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
currentTimeStamp:&amp;amp;amp;amp;amp;nbsp;1000000056000,Key:1,EventTime:1000000056000,前一条数据的水位线:1000000055000
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 4&amp;amp;amp;amp;amp;gt; (1,tom2,1000000056000)
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
currentTimeStamp:&amp;amp;amp;amp;amp;nbsp;1000000055000,Key:1,EventTime:1000000055000,前一条数据的水位线:0
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 3&amp;amp;amp;amp;amp;gt; (1,jerry1,1000000055000)
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
currentTimeStamp:&amp;amp;amp;amp;amp;nbsp;1000000056000,Key:1,EventTime:1000000056000,前一条数据的水位线:1000000055000
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 3&amp;amp;amp;amp;amp;gt; (1,jerry2,1000000056000)
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 2&amp;amp;amp;amp;amp;gt; tom1=jerry1
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
------------------&amp;amp;amp;amp;amp;nbsp;原始邮件&amp;amp;amp;amp;amp;nbsp;------------------
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 发件人:&amp;amp;amp;amp;amp;nbsp;"lee.roval"<lee.ro...@gmail.com
    &gt; &amp;amp;amp;amp;amp;gt;;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 发送时间:&amp;amp;amp;amp;amp;nbsp;2020年4月4日(星期六) 晚上6:10
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 收件人:&amp;amp;amp;amp;amp;nbsp;"user-zh@flink.apache.org"<
    &gt; user-zh@flink.apache.org
    &gt; &amp;gt; &amp;amp;amp;amp;amp;gt;;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 主题:&amp;amp;amp;amp;amp;nbsp;Re: Flink双流Join问题
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 56000后不是下发新的watermark了嘛
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt; 在 2020/4/4
    &gt; &amp;gt; 下午5:57,“忝忝向仧”<153488...@qq.com&amp;amp;amp;amp;amp;gt; 写入:
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; 各位好:&amp;amp;amp;amp;amp;amp;nbsp; 
&amp;amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; Flink双流Join遇到一个问题,能否解释下,谢谢.
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;amp;amp;amp;amp;nbsp; &amp;amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; ds1和ds2分别读取kafka两个流数据,使用event time和watermark特性,3s的一个翻滚窗口,定义如下:
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;amp;amp;amp;amp;nbsp; &amp;amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 最后,join输出的时候,为什么触发窗口的数据第二条就触发了?
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;amp;amp;amp;amp;nbsp; &amp;amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
按照水印的触发条件应该是watermark_time&amp;amp;amp;amp;amp;amp;gt;=window_endtime.那么,这里应该是1000000057000这条数据来了后才会触发,但是结果却是56000就触发了.为什么?
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; 定义的代码如下:
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; DataStream<String&amp;amp;amp;amp;amp;amp;gt; stream1 = env
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; .addSource(new
    &gt; FlinkKafkaConsumer09<String&amp;amp;amp;amp;amp;amp;gt;("stream1",
    &gt; &amp;gt; new
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; SimpleStringSchema(), properties).setStartFromLatest())
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; .assignTimestampsAndWatermarks(
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt; new
    &gt; &amp;gt; 
AssignerWithPeriodicWatermarks<String&amp;amp;amp;amp;amp;amp;gt;() {
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt; long
    &gt; &amp;gt; currentTimeStamp = 0L;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt; long
    &gt; &amp;gt; maxDelayAllowed = 0L;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt; long
    &gt; &amp;gt; currentWaterMark;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt; @Nullable
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt; @Override
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt; public
    &gt; &amp;gt; Watermark getCurrentWatermark() {
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; currentWaterMark = currentTimeStamp-maxDelayAllowed;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt; return new
    &gt; &amp;gt; Watermark(currentWaterMark);
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt; }
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt; @Override
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt; public long
    &gt; &amp;gt; extractTimestamp(String s, long l) {
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt; String[]
    &gt; &amp;gt; arr= s.split(" ");
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt; long
    &gt; &amp;gt; timeStamp = Long.parseLong(arr[2]);
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; currentTimeStamp = Math.max(timeStamp, currentTimeStamp);
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; System.out.println("currentTimeStamp: " 
+&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; currentTimeStamp +",Key:" +
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt; arr[0] +
    &gt; &amp;gt; ",EventTime:" + timeStamp + ",前一条数据的水位线:" + currentWaterMark);
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt; return
    &gt; &amp;gt; timeStamp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt; }
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt; }
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; );
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt;
    &gt; 
DataStream<Tuple3<String,String,String&amp;amp;amp;amp;amp;amp;gt;&amp;amp;amp;amp;amp;amp;gt;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt; ds1 =
    &gt; &amp;gt; stream1.map(new MapFunction<String,
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
Tuple3<String,String,String&amp;amp;amp;amp;amp;amp;gt;&amp;amp;amp;amp;amp;amp;gt;()
    &gt; {
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; @Override
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; public Tuple3<String,
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; String,String&amp;amp;amp;amp;amp;amp;gt; map(String s1) 
throws Exception
    &gt; {
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt; String[]
    &gt; &amp;gt; arr = s1.split(" ");
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; return
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; Tuple3.of(arr[0],arr[1],arr[2]);
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; }
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp; 
});
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; ds1.print();
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; DataStream<String&amp;amp;amp;amp;amp;amp;gt; stream2 = env
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; .addSource(new
    &gt; FlinkKafkaConsumer09<String&amp;amp;amp;amp;amp;amp;gt;("stream2",
    &gt; &amp;gt; new
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; SimpleStringSchema(), properties).setStartFromLatest())
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; .assignTimestampsAndWatermarks(
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt; new
    &gt; &amp;gt; 
AssignerWithPeriodicWatermarks<String&amp;amp;amp;amp;amp;amp;gt;() {
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt; long
    &gt; &amp;gt; currentTimeStamp = 0L;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt; long
    &gt; &amp;gt; maxDelayAllowed = 0L;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt; long
    &gt; &amp;gt; currentWaterMark;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt; @Nullable
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt; @Override
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt; public
    &gt; &amp;gt; Watermark getCurrentWatermark() {
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; currentWaterMark = currentTimeStamp-maxDelayAllowed;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt; return new
    &gt; &amp;gt; Watermark(currentWaterMark);
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt; }
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt; @Override
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt; public long
    &gt; &amp;gt; extractTimestamp(String s, long l) {
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt; String[]
    &gt; &amp;gt; arr= s.split(" ");
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt; long
    &gt; &amp;gt; timeStamp = Long.parseLong(arr[2]);
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; currentTimeStamp = Math.max(timeStamp, currentTimeStamp);
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; System.out.println("currentTimeStamp: " 
+&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; currentTimeStamp +",Key:" +
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt; arr[0] +
    &gt; &amp;gt; ",EventTime:" + timeStamp + ",前一条数据的水位线:" + currentWaterMark);
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt; return
    &gt; &amp;gt; timeStamp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt; }
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt; }
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; );
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt;
    &gt; 
DataStream<Tuple3<String,String,String&amp;amp;amp;amp;amp;amp;gt;&amp;amp;amp;amp;amp;amp;gt;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt; ds2
    &gt; &amp;gt; =stream2.map(new MapFunction<String,
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
Tuple3<String,String,String&amp;amp;amp;amp;amp;amp;gt;&amp;amp;amp;amp;amp;amp;gt;()
    &gt; {
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; @Override
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; public
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; Tuple3<String,String,String&amp;amp;amp;amp;amp;amp;gt; 
map(String s2)
    &gt; throws
    &gt; &amp;gt; Exception {
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; String
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt; [] arr =
    &gt; &amp;gt; s2.split(" ");
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; return
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; Tuple3.of(arr[0],arr[1],arr[2]);
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; }
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp; 
});
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; ds2.print();
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; ds1.join(ds2).where(new KeySelector<Tuple3<String,
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
String,String&amp;amp;amp;amp;amp;amp;gt;,String&amp;amp;amp;amp;amp;amp;gt;() {
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; @Override
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; public String
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; getKey(Tuple3<String, 
String,String&amp;amp;amp;amp;amp;amp;gt; value)
    &gt; throws
    &gt; &amp;gt; Exception {
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; return
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt; value.f0;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; }
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; }).equalTo(new
    &gt; &amp;gt; KeySelector<Tuple3<String, String
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; ,String&amp;amp;amp;amp;amp;amp;gt;, 
String&amp;amp;amp;amp;amp;amp;gt;() {
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; @Override
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; public String
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; getKey(Tuple3<String, 
String,String&amp;amp;amp;amp;amp;amp;gt; value)
    &gt; throws
    &gt; &amp;gt; Exception {
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; return
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt; value.f0;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; }
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp; 
})
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; .apply(new
    &gt; &amp;gt; JoinFunction<Tuple3<String,
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; String,String&amp;amp;amp;amp;amp;amp;gt;, Tuple3<String,
    &gt; &amp;gt; String,String&amp;amp;amp;amp;amp;amp;gt;,
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; String&amp;amp;amp;amp;amp;amp;gt;() {
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; @Override
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; public String
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; join(Tuple3<String, String,String&amp;amp;amp;amp;amp;amp;gt; 
value1,
    &gt; Tuple3<String,
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; String,String&amp;amp;amp;amp;amp;amp;gt; value2) throws 
Exception {
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; return
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt; value1.f1 +
    &gt; &amp;gt; "=" + value2.f1;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt;
    &gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; }
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; }).print();
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; 结果如下:
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; currentTimeStamp:
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 1000000055000,Key:1,EventTime:1000000055000,前一条数据的水位线:0
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; 4&amp;amp;amp;amp;amp;amp;gt; (1,tom1,1000000055000)
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; currentTimeStamp:
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 
1000000056000,Key:1,EventTime:1000000056000,前一条数据的水位线:1000000055000
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; 4&amp;amp;amp;amp;amp;amp;gt; (1,tom2,1000000056000)
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; currentTimeStamp:
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 1000000055000,Key:1,EventTime:1000000055000,前一条数据的水位线:0
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; 3&amp;amp;amp;amp;amp;amp;gt; (1,jerry1,1000000055000)
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; currentTimeStamp:
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 
1000000056000,Key:1,EventTime:1000000056000,前一条数据的水位线:1000000055000
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; 3&amp;amp;amp;amp;amp;amp;gt; (1,jerry2,1000000056000)
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
&amp;amp;amp;amp;gt;
    &gt; &amp;gt; 
&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;&amp;amp;amp;amp;amp;nbsp;
    &gt; &amp;gt; 2&amp;amp;amp;amp;amp;amp;gt; tom1=jerry1
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; --
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; Benchao Li
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; School of 
Electronics
    &gt; &amp;gt; Engineering and Computer Science, Peking University
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 
Tel:+86-15650713730
    &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
    &gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; Email:
    &gt; &amp;gt; libenc...@gmail.com; libenc...@pku.edu.cn
    &gt;
    &gt;
    &gt;
    &gt; --
    &gt;
    &gt; Benchao Li
    &gt; School of Electronics Engineering and Computer Science, Peking 
University
    &gt; Tel:+86-15650713730
    &gt; Email: libenc...@gmail.com; libenc...@pku.edu.cn
    
    
    
    -- 
    
    Benchao Li
    School of Electronics Engineering and Computer Science, Peking University
    Tel:+86-15650713730
    Email: libenc...@gmail.com; libenc...@pku.edu.cn

回复