A new watermark is emitted?

The 4> lines come from one Kafka source and the 3> lines from the other Kafka source. With a 3-second window, and given that a window fires when watermark_time >= window_endtime, [55000,57000) should be one window. That is my understanding, but the join result was already printed after the 56000 record:

currentTimeStamp: 1000000055000,Key:1,EventTime:1000000055000,watermark of previous record:0
4> (1,tom1,1000000055000)
currentTimeStamp: 1000000056000,Key:1,EventTime:1000000056000,watermark of previous record:1000000055000
4> (1,tom2,1000000056000)
currentTimeStamp: 1000000055000,Key:1,EventTime:1000000055000,watermark of previous record:0
3> (1,jerry1,1000000055000)
currentTimeStamp: 1000000056000,Key:1,EventTime:1000000056000,watermark of previous record:1000000055000
3> (1,jerry2,1000000056000)
2> tom1=jerry1

------------------ Original message ------------------
From: "lee.roval"<lee.ro...@gmail.com>;
Sent: Saturday, April 4, 2020, 6:10 PM
To: "user-zh@flink.apache.org"<user-zh@flink.apache.org>;
Subject: Re: Flink two-stream join question

Isn't a new watermark emitted after 56000?

On 2020/4/4 at 5:57 PM, "忝忝向仧"<153488...@qq.com> wrote:

Hi all,

I ran into a problem with a Flink two-stream join. Could someone explain it? Thanks.

ds1 and ds2 each read one of two Kafka streams and use event time and watermarks, with a 3 s tumbling window, defined as follows:

Finally, when the join emits its output, why is the window already triggered by the second record?

According to the watermark trigger condition watermark_time >= window_endtime, the window should only fire after the 1000000057000 record arrives, but it actually fired at 56000. Why?
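The window-boundary reasoning in the question can be checked with plain arithmetic. The sketch below assumes TumblingEventTimeWindows.of(Time.seconds(3)) with no offset. In that case Flink aligns windows to multiples of the window size counted from the epoch (TimeWindow.getWindowStartWithOffset computes timestamp - (timestamp - offset + size) % size), and EventTimeTrigger fires once the watermark reaches window.maxTimestamp(), which is end - 1. The class WindowBounds and its methods are hypothetical; only the arithmetic is meant to mirror Flink.

```java
// Hypothetical helper (not Flink code): reproduces the tumbling-window
// arithmetic for the timestamps in the logs, window size 3 s, offset 0.
public class WindowBounds {
    static final long SIZE = 3000L;   // Time.seconds(3) in milliseconds
    static final long OFFSET = 0L;    // no offset passed to TumblingEventTimeWindows.of

    // Same formula as Flink's TimeWindow.getWindowStartWithOffset
    static long windowStart(long ts) {
        return ts - (ts - OFFSET + SIZE) % SIZE;
    }

    // EventTimeTrigger fires when watermark >= window.maxTimestamp(), i.e. end - 1
    static boolean fires(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long[] timestamps = {1000000055000L, 1000000056000L};
        for (long ts : timestamps) {
            long start = windowStart(ts);
            System.out.println(ts + " -> [" + start + ", " + (start + SIZE) + ")");
        }
        // prints 1000000055000 -> [1000000053000, 1000000056000)
        // prints 1000000056000 -> [1000000056000, 1000000059000)

        // With maxDelayAllowed = 0 the periodic watermark becomes
        // 1000000056000 once the 56000 record has been seen.
        long watermark = 1000000056000L;
        long firstEnd = windowStart(1000000055000L) + SIZE;
        System.out.println("window ending at " + firstEnd + " fires: " + fires(firstEnd, watermark));
        // prints window ending at 1000000056000 fires: true
    }
}
```

So 1000000055000 and 1000000056000 do not share a window: the first belongs to [1000000053000, 1000000056000), and a watermark of 1000000056000 is already past that window's last timestamp.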
The code is defined as follows:

import javax.annotation.Nullable;

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;

// env (StreamExecutionEnvironment) and properties (Kafka consumer Properties)
// are created elsewhere; that part is not included in this mail.

DataStream<String> stream1 = env
        .addSource(new FlinkKafkaConsumer09<String>("stream1", new SimpleStringSchema(), properties)
                .setStartFromLatest())
        .assignTimestampsAndWatermarks(
                new AssignerWithPeriodicWatermarks<String>() {
                    long currentTimeStamp = 0L;   // largest event timestamp seen so far
                    long maxDelayAllowed = 0L;    // allowed out-of-orderness
                    long currentWaterMark;        // last watermark returned

                    @Nullable
                    @Override
                    public Watermark getCurrentWatermark() {
                        // Called periodically (auto-watermark interval), not once per record
                        currentWaterMark = currentTimeStamp - maxDelayAllowed;
                        return new Watermark(currentWaterMark);
                    }

                    @Override
                    public long extractTimestamp(String s, long l) {
                        // Record format: "<key> <name> <timestamp>"
                        String[] arr = s.split(" ");
                        long timeStamp = Long.parseLong(arr[2]);
                        currentTimeStamp = Math.max(timeStamp, currentTimeStamp);
                        System.out.println("currentTimeStamp: " + currentTimeStamp
                                + ",Key:" + arr[0] + ",EventTime:" + timeStamp
                                + ",watermark of previous record:" + currentWaterMark);
                        return timeStamp;
                    }
                });

DataStream<Tuple3<String, String, String>> ds1 = stream1.map(
        new MapFunction<String, Tuple3<String, String, String>>() {
            @Override
            public Tuple3<String, String, String> map(String s1) throws Exception {
                // "<key> <name> <timestamp>" -> (key, name, timestamp)
                String[] arr = s1.split(" ");
                return Tuple3.of(arr[0], arr[1], arr[2]);
            }
        });
ds1.print();

// Same watermark assigner as stream1
DataStream<String> stream2 = env
        .addSource(new FlinkKafkaConsumer09<String>("stream2", new SimpleStringSchema(), properties)
                .setStartFromLatest())
        .assignTimestampsAndWatermarks(
                new AssignerWithPeriodicWatermarks<String>() {
                    long currentTimeStamp = 0L;
                    long maxDelayAllowed = 0L;
                    long currentWaterMark;

                    @Nullable
                    @Override
                    public Watermark getCurrentWatermark() {
                        currentWaterMark = currentTimeStamp - maxDelayAllowed;
                        return new Watermark(currentWaterMark);
                    }

                    @Override
                    public long extractTimestamp(String s, long l) {
                        String[] arr = s.split(" ");
                        long timeStamp = Long.parseLong(arr[2]);
                        currentTimeStamp = Math.max(timeStamp, currentTimeStamp);
                        System.out.println("currentTimeStamp: " + currentTimeStamp
                                + ",Key:" + arr[0] + ",EventTime:" + timeStamp
                                + ",watermark of previous record:" + currentWaterMark);
                        return timeStamp;
                    }
                });

DataStream<Tuple3<String, String, String>> ds2 = stream2.map(
        new MapFunction<String, Tuple3<String, String, String>>() {
            @Override
            public Tuple3<String, String, String> map(String s2) throws Exception {
                String[] arr = s2.split(" ");
                return Tuple3.of(arr[0], arr[1], arr[2]);
            }
        });
ds2.print();

// Inner join on f0 (the key) within 3 s tumbling event-time windows
ds1.join(ds2)
        .where(new KeySelector<Tuple3<String, String, String>, String>() {
            @Override
            public String getKey(Tuple3<String, String, String> value) throws Exception {
                return value.f0;
            }
        })
        .equalTo(new KeySelector<Tuple3<String, String, String>, String>() {
            @Override
            public String getKey(Tuple3<String, String, String> value) throws Exception {
                return value.f0;
            }
        })
        .window(TumblingEventTimeWindows.of(Time.seconds(3)))
        .apply(new JoinFunction<Tuple3<String, String, String>, Tuple3<String, String, String>, String>() {
            @Override
            public String join(Tuple3<String, String, String> value1,
                               Tuple3<String, String, String> value2) throws Exception {
                return value1.f1 + "=" + value2.f1;
            }
        })
        .print();

The output is as follows:

currentTimeStamp: 1000000055000,Key:1,EventTime:1000000055000,watermark of previous record:0
4> (1,tom1,1000000055000)
currentTimeStamp: 1000000056000,Key:1,EventTime:1000000056000,watermark of previous record:1000000055000
4> (1,tom2,1000000056000)
currentTimeStamp: 1000000055000,Key:1,EventTime:1000000055000,watermark of previous record:0
3> (1,jerry1,1000000055000)
currentTimeStamp: 1000000056000,Key:1,EventTime:1000000056000,watermark of previous record:1000000055000
3> (1,jerry2,1000000056000)
2> tom1=jerry1
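To see why only tom1=jerry1 is printed, here is a small simulation of the window join over the logged records. It assumes the same epoch-aligned 3 s windows and the same trigger rule (a window fires once the watermark reaches end - 1), and that a window join emits every same-key pair from the two sides that fall into one window. It also reflects that a two-input operator's watermark is the minimum of its inputs' watermarks. The class WindowJoinSim, its Event type and its methods are hypothetical; this only imitates the observable behaviour and is not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation of a 3 s tumbling-window inner join (not Flink code).
public class WindowJoinSim {
    static final long SIZE = 3000L;

    // One parsed record: key, name, event timestamp (like the Tuple3 in the question)
    static final class Event {
        final String key;
        final String name;
        final long ts;

        Event(String key, String name, long ts) {
            this.key = key;
            this.name = name;
            this.ts = ts;
        }
    }

    static long windowStart(long ts) {
        return ts - (ts + SIZE) % SIZE;   // offset 0, aligned to the epoch
    }

    // Joins all windows whose last timestamp (end - 1) is <= watermark.
    static List<String> fire(List<Event> left, List<Event> right, long watermark) {
        Map<Long, List<List<Event>>> windows = new TreeMap<>();   // window start -> [left, right]
        for (Event e : left) bucket(windows, e, 0);
        for (Event e : right) bucket(windows, e, 1);
        List<String> out = new ArrayList<>();
        for (Map.Entry<Long, List<List<Event>>> w : windows.entrySet()) {
            long end = w.getKey() + SIZE;
            if (watermark < end - 1) continue;   // window not complete yet
            for (Event l : w.getValue().get(0))
                for (Event r : w.getValue().get(1))
                    if (l.key.equals(r.key)) out.add(l.name + "=" + r.name);
        }
        return out;
    }

    static void bucket(Map<Long, List<List<Event>>> windows, Event e, int side) {
        windows.computeIfAbsent(windowStart(e.ts), k -> {
            List<List<Event>> sides = new ArrayList<>();
            sides.add(new ArrayList<>());
            sides.add(new ArrayList<>());
            return sides;
        }).get(side).add(e);
    }

    public static void main(String[] args) {
        List<Event> ds1 = List.of(new Event("1", "tom1", 1000000055000L),
                                  new Event("1", "tom2", 1000000056000L));
        List<Event> ds2 = List.of(new Event("1", "jerry1", 1000000055000L),
                                  new Event("1", "jerry2", 1000000056000L));
        // Both sources saw the 56000 record, so with maxDelayAllowed = 0 each
        // input watermark is 1000000056000; the join uses the minimum.
        long wm1 = 1000000056000L;
        long wm2 = 1000000056000L;
        System.out.println(fire(ds1, ds2, Math.min(wm1, wm2)));   // prints [tom1=jerry1]
    }
}
```

With both input watermarks at 1000000056000, only [1000000053000, 1000000056000) is complete, so tom1=jerry1 is the only pair emitted; tom2 and jerry2 stay in [1000000056000, 1000000059000) until the join's watermark reaches 1000000058999.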