Hello everyone,
I am confused about how windows work for the join/coGroup operators in
BATCH execution mode.
Here is my demo code, which currently works fine for me. Is it
appropriate to use a processing-time window in batch mode like this,
and does this approach have any problems? I want to use it to solve my
problem: joining two streams in batch mode.
```java
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<Integer> s1 = env.fromCollection(
            Stream.of(1, 2, 3, 4, 5, 6, 7).collect(Collectors.toList()));
    DataStream<Integer> s2 = env.fromCollection(
            Stream.of(6, 5, 4, 3, 2, 1).collect(Collectors.toList()));
    s1.coGroup(s2)
            .where(new KeySelector<Integer, Integer>() {
                @Override
                public Integer getKey(Integer value) throws Exception {
                    return value;
                }
            })
            .equalTo(new KeySelector<Integer, Integer>() {
                @Override
                public Integer getKey(Integer value) throws Exception {
                    return value;
                }
            })
            .window(TumblingProcessingTimeWindows.of(Time.days(1)))
            .apply(new CoGroupFunction<Integer, Integer, Tuple2<Integer, Integer>>() {
                @Override
                public void coGroup(Iterable<Integer> first,
                                    Iterable<Integer> second,
                                    Collector<Tuple2<Integer, Integer>> out)
                        throws Exception {
                    if (!second.iterator().hasNext()) {
                        for (Integer integer : first) {
                            out.collect(new Tuple2<>(integer, null));
                        }
                    } else {
                        for (Integer integer : first) {
                            for (Integer integer1 : second) {
                                out.collect(new Tuple2<>(integer, integer1));
                            }
                        }
                    }
                }
            })
            .printToErr();
    env.setParallelism(1);
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    env.execute();
}
```
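To make the intended result concrete: the coGroup function above is meant to behave like a left outer join of s1 with s2 on the value itself. The following is only a minimal plain-Java sketch of those semantics, with no Flink involved; the class name `LeftOuterJoinSketch` and the method `leftOuterJoin` are hypothetical names used for illustration, and the output order of the real job may differ.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LeftOuterJoinSketch {

    // Hypothetical helper (not a Flink API): joins two lists on the value
    // itself, mirroring the branches of the coGroup function in the demo.
    public static List<Map.Entry<Integer, Integer>> leftOuterJoin(
            List<Integer> first, List<Integer> second) {
        // Group the right-hand side by key, as coGroup would do per key.
        Map<Integer, List<Integer>> rightByKey = new HashMap<>();
        for (Integer value : second) {
            rightByKey.computeIfAbsent(value, k -> new ArrayList<>()).add(value);
        }

        List<Map.Entry<Integer, Integer>> out = new ArrayList<>();
        for (Integer left : first) {
            List<Integer> matches = rightByKey.get(left);
            if (matches == null) {
                // No partner on the right side: emit (left, null).
                out.add(new SimpleEntry<>(left, null));
            } else {
                // Otherwise emit one pair per matching right element.
                for (Integer right : matches) {
                    out.add(new SimpleEntry<>(left, right));
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> s1 = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        List<Integer> s2 = Arrays.asList(6, 5, 4, 3, 2, 1);
        for (Map.Entry<Integer, Integer> pair : leftOuterJoin(s1, s2)) {
            System.out.println("(" + pair.getKey() + "," + pair.getValue() + ")");
        }
    }
}
```

With the demo inputs, keys 1 to 6 each find a partner, and 7 is paired with null because it has no match in s2.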
Thanks in advance.
--
Jiadong Lu