??????:        Flink????Join??????????????????????????????.
        ds1??ds2????????kafka????????????????event 
time??watermark??????3s????????????????????????:
        
??????join???????????????????????????????????????????????
        
????????????????????????watermark_time>=window_endtime.????????????????1000000057000????????????????????????????????????56000????????.???????



??????????????:
DataStream<String&gt; stream1 = env
        .addSource(new FlinkKafkaConsumer09<String&gt;("stream1", new 
SimpleStringSchema(), properties).setStartFromLatest())
        .assignTimestampsAndWatermarks(
                new AssignerWithPeriodicWatermarks<String&gt;() {
                    long currentTimeStamp = 0L;
                    long maxDelayAllowed = 0L;
                    long currentWaterMark;
                    @Nullable
                    @Override
                    public Watermark getCurrentWatermark() {
                        currentWaterMark = currentTimeStamp-maxDelayAllowed;
                        return new Watermark(currentWaterMark);
                    }
                    @Override
                    public long extractTimestamp(String s, long l) {
                        String[] arr= s.split(" ");
                        long timeStamp = Long.parseLong(arr[2]);
                        currentTimeStamp = Math.max(timeStamp, 
currentTimeStamp);
                        System.out.println("currentTimeStamp: " +  
currentTimeStamp +",Key:" + arr[0] + ",EventTime:" + timeStamp + 
",??????????????????:" + currentWaterMark);
                        return timeStamp;
                    }
                }
        );

DataStream<Tuple3<String,String,String&gt;&gt; ds1 = stream1.map(new 
MapFunction<String, Tuple3<String,String,String&gt;&gt;() {
    @Override
    public Tuple3<String, String,String&gt; map(String s1) throws Exception {
        String[] arr = s1.split(" ");
        return Tuple3.of(arr[0],arr[1],arr[2]);
    }
});

ds1.print();
DataStream<String&gt; stream2 = env
        .addSource(new FlinkKafkaConsumer09<String&gt;("stream2", new 
SimpleStringSchema(), properties).setStartFromLatest())
        .assignTimestampsAndWatermarks(
                new AssignerWithPeriodicWatermarks<String&gt;() {
                    long currentTimeStamp = 0L;
                    long maxDelayAllowed = 0L;
                    long currentWaterMark;
                    @Nullable
                    @Override
                    public Watermark getCurrentWatermark() {
                        currentWaterMark = currentTimeStamp-maxDelayAllowed;
                        return new Watermark(currentWaterMark);
                    }
                    @Override
                    public long extractTimestamp(String s, long l) {
                        String[] arr= s.split(" ");
                        long timeStamp = Long.parseLong(arr[2]);
                        currentTimeStamp = Math.max(timeStamp, 
currentTimeStamp);
                        System.out.println("currentTimeStamp: " +  
currentTimeStamp +",Key:" + arr[0] + ",EventTime:" + timeStamp + 
",??????????????????:" + currentWaterMark);
                        return timeStamp;
                    }
                }
        );

DataStream<Tuple3<String,String,String&gt;&gt; ds2 =stream2.map(new 
MapFunction<String, Tuple3<String,String,String&gt;&gt;() {
    @Override
    public Tuple3<String,String,String&gt; map(String s2) throws Exception {
        String [] arr = s2.split(" ");
        return Tuple3.of(arr[0],arr[1],arr[2]);
    }
});
ds2.print();
// Join ds1 with ds2 on the first tuple field (f0) inside 3-second tumbling
// event-time windows, and print "<ds1.f1>=<ds2.f1>" for every matching pair.
//
// NOTE(review): per the Flink windowing documentation, tumbling windows are
// aligned to the epoch, so a 3 s window covers [k*3000, (k+1)*3000) ms. With the
// sample timestamps, 1000000055000 falls into [1000000053000, 1000000056000) and
// 1000000056000 opens the next window, which would explain why only the
// ...55000 records are joined once the watermark reaches 1000000056000 - confirm
// against the Flink version in use.
ds1.join(ds2)
        .where(new KeySelector<Tuple3<String, String, String>, String>() {
            @Override
            public String getKey(Tuple3<String, String, String> value) throws Exception {
                return value.f0;
            }
        })
        .equalTo(new KeySelector<Tuple3<String, String, String>, String>() {
            @Override
            public String getKey(Tuple3<String, String, String> value) throws Exception {
                return value.f0;
            }
        })
        .window(TumblingEventTimeWindows.of(Time.seconds(3)))
        .apply(new JoinFunction<Tuple3<String, String, String>, Tuple3<String, String, String>, String>() {
            @Override
            public String join(Tuple3<String, String, String> value1,
                               Tuple3<String, String, String> value2) throws Exception {
                return value1.f1 + "=" + value2.f1;
            }
        })
        .print();
????????:
currentTimeStamp: 1000000055000,Key:1,EventTime:1000000055000,??????????????????:0
4> (1,tom1,1000000055000)
currentTimeStamp: 1000000056000,Key:1,EventTime:1000000056000,??????????????????:1000000055000
4> (1,tom2,1000000056000)
currentTimeStamp: 1000000055000,Key:1,EventTime:1000000055000,??????????????????:0
3> (1,jerry1,1000000055000)
currentTimeStamp: 1000000056000,Key:1,EventTime:1000000056000,??????????????????:1000000055000
3> (1,jerry2,1000000056000)
2> tom1=jerry1

回复