Hello everyone,

I am confused about the window of the join/coGroup operator in batch mode.
Here is my demo code, which works fine for me at present. Is it appropriate to use a processing-time window in batch mode, and does this approach have any known problems? I want to use it to solve my problem (joining two streams in batch mode).

```java
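import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.util.stream.Collectors;
import java.util.stream.Stream;

// Class name is a placeholder; the original snippet only contained main().
public class Demo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Two bounded inputs; 7 exists only in s1.
        DataStream<Integer> s1 = env.fromCollection(Stream.of(1, 2, 3, 4, 5, 6, 7).collect(Collectors.toList()));
        DataStream<Integer> s2 = env.fromCollection(Stream.of(6, 5, 4, 3, 2, 1).collect(Collectors.toList()));

        s1.coGroup(s2)
                // Both sides are keyed by the value itself.
                .where(new KeySelector<Integer, Integer>() {
                    @Override
                    public Integer getKey(Integer value) throws Exception {
                        return value;
                    }
                })
                .equalTo(new KeySelector<Integer, Integer>() {
                    @Override
                    public Integer getKey(Integer value) throws Exception {
                        return value;
                    }
                })
                // One-day processing-time window, intended to cover the whole bounded input.
                .window(TumblingProcessingTimeWindows.of(Time.days(1)))
                .apply(new CoGroupFunction<Integer, Integer, Tuple2<Integer, Integer>>() {
                    @Override
                    public void coGroup(Iterable<Integer> first, Iterable<Integer> second,
                                        Collector<Tuple2<Integer, Integer>> out) throws Exception {
                        if (!second.iterator().hasNext()) {
                            // No match on the right side: emit (left, null), like a left outer join.
                            for (Integer left : first) {
                                out.collect(new Tuple2<>(left, null));
                            }
                        } else {
                            // Emit every (left, right) combination for this key.
                            for (Integer left : first) {
                                for (Integer right : second) {
                                    out.collect(new Tuple2<>(left, right));
                                }
                            }
                        }
                    }
                })
                .printToErr();

        env.setParallelism(1);
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        env.execute();
    }
}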
```
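
To make the intended result concrete, here is a plain-Java sketch (no Flink involved) of the left-outer-join semantics I expect from the coGroup function above. The names `CoGroupSketch` and `leftOuterJoin` are made up for illustration. It only approximates what the job does within a single window and does not try to reproduce Flink's output ordering.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper class, not part of Flink.
class CoGroupSketch {

    // Pairs every left value with each equal right value;
    // a left value without any match is paired with null.
    public static List<List<Integer>> leftOuterJoin(List<Integer> first, List<Integer> second) {
        // Group the right side by key (the key is the value itself, as in the job above).
        Map<Integer, List<Integer>> rightByKey = new HashMap<>();
        for (Integer right : second) {
            rightByKey.computeIfAbsent(right, k -> new ArrayList<>()).add(right);
        }

        List<List<Integer>> out = new ArrayList<>();
        for (Integer left : first) {
            List<Integer> matches = rightByKey.getOrDefault(left, Collections.emptyList());
            if (matches.isEmpty()) {
                // Same as the (left, null) branch of the coGroup function.
                out.add(Arrays.asList(left, (Integer) null));
            } else {
                for (Integer right : matches) {
                    out.add(Arrays.asList(left, right));
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> s1 = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        List<Integer> s2 = Arrays.asList(6, 5, 4, 3, 2, 1);
        // Values 1..6 are paired with themselves; 7 has no match and is paired with null.
        for (List<Integer> pair : leftOuterJoin(s1, s2)) {
            System.out.println(pair);
        }
    }
}
```

In other words, with the two inputs from my demo I expect six matched pairs plus one (7, null) record.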

Thanks in advance.

--
Jiadong Lu
