??????: Flink????Join??????????????????????????????. ds1??ds2????????kafka????????????????event time??watermark??????3s????????????????????????: ??????join??????????????????????????????????????????????? ????????????????????????watermark_time>=window_endtime.????????????????1000000057000????????????????????????????????????56000????????.???????
??????????????: DataStream<String> stream1 = env .addSource(new FlinkKafkaConsumer09<String>("stream1", new SimpleStringSchema(), properties).setStartFromLatest()) .assignTimestampsAndWatermarks( new AssignerWithPeriodicWatermarks<String>() { long currentTimeStamp = 0L; long maxDelayAllowed = 0L; long currentWaterMark; @Nullable @Override public Watermark getCurrentWatermark() { currentWaterMark = currentTimeStamp-maxDelayAllowed; return new Watermark(currentWaterMark); } @Override public long extractTimestamp(String s, long l) { String[] arr= s.split(" "); long timeStamp = Long.parseLong(arr[2]); currentTimeStamp = Math.max(timeStamp, currentTimeStamp); System.out.println("currentTimeStamp: " + currentTimeStamp +",Key:" + arr[0] + ",EventTime:" + timeStamp + ",??????????????????:" + currentWaterMark); return timeStamp; } } ); DataStream<Tuple3<String,String,String>> ds1 = stream1.map(new MapFunction<String, Tuple3<String,String,String>>() { @Override public Tuple3<String, String,String> map(String s1) throws Exception { String[] arr = s1.split(" "); return Tuple3.of(arr[0],arr[1],arr[2]); } }); ds1.print(); DataStream<String> stream2 = env .addSource(new FlinkKafkaConsumer09<String>("stream2", new SimpleStringSchema(), properties).setStartFromLatest()) .assignTimestampsAndWatermarks( new AssignerWithPeriodicWatermarks<String>() { long currentTimeStamp = 0L; long maxDelayAllowed = 0L; long currentWaterMark; @Nullable @Override public Watermark getCurrentWatermark() { currentWaterMark = currentTimeStamp-maxDelayAllowed; return new Watermark(currentWaterMark); } @Override public long extractTimestamp(String s, long l) { String[] arr= s.split(" "); long timeStamp = Long.parseLong(arr[2]); currentTimeStamp = Math.max(timeStamp, currentTimeStamp); System.out.println("currentTimeStamp: " + currentTimeStamp +",Key:" + arr[0] + ",EventTime:" + timeStamp + ",??????????????????:" + currentWaterMark); return timeStamp; } } ); DataStream<Tuple3<String,String,String>> ds2 =stream2.map(new MapFunction<String, Tuple3<String,String,String>>() { @Override public Tuple3<String,String,String> map(String s2) throws Exception { String [] arr = s2.split(" "); return Tuple3.of(arr[0],arr[1],arr[2]); } }); ds2.print(); ds1.join(ds2).where(new KeySelector<Tuple3<String, String,String>,String>() { @Override public String getKey(Tuple3<String, String,String> value) throws Exception { return value.f0; } }).equalTo(new KeySelector<Tuple3<String, String ,String>, String>() { @Override public String getKey(Tuple3<String, String,String> value) throws Exception { return value.f0; } }) .window(TumblingEventTimeWindows.of(Time.seconds(3))) .apply(new JoinFunction<Tuple3<String, String,String>, Tuple3<String, String,String>, String>() { @Override public String join(Tuple3<String, String,String> value1, Tuple3<String, String,String> value2) throws Exception { return value1.f1 + "=" + value2.f1; } }).print(); ????????: currentTimeStamp: 1000000055000,Key:1,EventTime:1000000055000,??????????????????:0 4> (1,tom1,1000000055000) currentTimeStamp: 1000000056000,Key:1,EventTime:1000000056000,??????????????????:1000000055000 4> (1,tom2,1000000056000) currentTimeStamp: 1000000055000,Key:1,EventTime:1000000055000,??????????????????:0 3> (1,jerry1,1000000055000) currentTimeStamp: 1000000056000,Key:1,EventTime:1000000056000,??????????????????:1000000055000 3> (1,jerry2,1000000056000) 2> tom1=jerry1