Hi, Shammon,
Thank you for your reply.
Yes, the window configured with `Time.days(1)` has no special meaning;
it is just used to group all data into the same global window.
I tried using `GlobalWindow` for this scenario, but `GlobalWindow` also
needs a `Trigger`, such as
`org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger`,
to trigger processing of all the data in the window.
So I think `ProcessingTimeWindow` with `Time.days(10)` may be a good
solution for this scenario. What do you think?
As for what you mentioned
> use join directly
I have no idea how to use a join without a window. Would you mind writing a
demo of it?
Thank you in advance for your help.
Best,
Jiadong Lu
On 2023/4/26 09:53, Shammon FY wrote:
Hi Jiadong,
I think it depends on the specific role the window plays here for you. If
this window has no specific business meaning and is only used for
performance optimization, maybe you can consider using join directly.
Best,
Shammon FY
On Tue, Apr 25, 2023 at 5:42 PM Jiadong Lu <archzi...@gmail.com> wrote:
Hello, everyone,
I am confused about the window of the join/coGroup operator in batch mode.
Here is my demo code, which works fine for me at present. Is it
appropriate to use a processing-time window in batch mode like this, and
does this approach have any problems? I want to use this solution to
solve my problem (joining two streams in batch mode).
```java
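import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

// The class name is only a wrapper so that the demo compiles on its own.
public class CoGroupBatchDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Configure the environment before building the pipeline.
        env.setParallelism(1);
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        DataStream<Integer> s1 = env.fromCollection(
                Stream.of(1, 2, 3, 4, 5, 6, 7).collect(Collectors.toList()));
        DataStream<Integer> s2 = env.fromCollection(
                Stream.of(6, 5, 4, 3, 2, 1).collect(Collectors.toList()));

        s1.coGroup(s2)
                // Both streams are keyed by the element value itself.
                .where(new KeySelector<Integer, Integer>() {
                    @Override
                    public Integer getKey(Integer value) throws Exception {
                        return value;
                    }
                })
                .equalTo(new KeySelector<Integer, Integer>() {
                    @Override
                    public Integer getKey(Integer value) throws Exception {
                        return value;
                    }
                })
                // The window size has no business meaning; it only groups all data together.
                .window(TumblingProcessingTimeWindows.of(Time.days(1)))
                .apply(new CoGroupFunction<Integer, Integer, Tuple2<Integer, Integer>>() {
                    @Override
                    public void coGroup(Iterable<Integer> first, Iterable<Integer> second,
                            Collector<Tuple2<Integer, Integer>> out) throws Exception {
                        if (!second.iterator().hasNext()) {
                            // No match on the right side: emit the left element with null.
                            for (Integer integer : first) {
                                out.collect(new Tuple2<>(integer, null));
                            }
                        } else {
                            // Emit every (left, right) pair for this key.
                            for (Integer integer : first) {
                                for (Integer integer1 : second) {
                                    out.collect(new Tuple2<>(integer, integer1));
                                }
                            }
                        }
                    }
                })
                .printToErr();

        env.execute();
    }
}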
```
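To make the result I am after concrete, here is a minimal plain-Java sketch (no Flink involved) of the same left-outer-join logic that the `CoGroupFunction` above implements. The class name `LeftOuterJoinSketch` and the helpers `groupByKey` and `leftOuterJoin` are made up for illustration only, and the sorted output order comes from the `TreeMap` used here, not from anything Flink guarantees.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration class; not part of Flink or of the demo above.
class LeftOuterJoinSketch {

    // Groups values by key. As in the KeySelectors above, the key is the value itself.
    static Map<Integer, List<Integer>> groupByKey(List<Integer> values) {
        Map<Integer, List<Integer>> groups = new TreeMap<>();
        for (Integer v : values) {
            groups.computeIfAbsent(v, k -> new ArrayList<>()).add(v);
        }
        return groups;
    }

    // Mirrors the coGroup body: (left, null) when the right group is empty,
    // otherwise every (left, right) pair. Keys that exist only on the right
    // side produce nothing, because the loop over the left group is empty.
    static List<String> leftOuterJoin(List<Integer> left, List<Integer> right) {
        Map<Integer, List<Integer>> leftGroups = groupByKey(left);
        Map<Integer, List<Integer>> rightGroups = groupByKey(right);
        List<String> out = new ArrayList<>();
        for (Map.Entry<Integer, List<Integer>> entry : leftGroups.entrySet()) {
            List<Integer> second =
                    rightGroups.getOrDefault(entry.getKey(), Collections.emptyList());
            for (Integer l : entry.getValue()) {
                if (second.isEmpty()) {
                    out.add("(" + l + ",null)");
                } else {
                    for (Integer r : second) {
                        out.add("(" + l + "," + r + ")");
                    }
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> s1 = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        List<Integer> s2 = Arrays.asList(6, 5, 4, 3, 2, 1);
        // prints [(1,1), (2,2), (3,3), (4,4), (5,5), (6,6), (7,null)]
        System.out.println(leftOuterJoin(s1, s2));
    }
}
```

This only describes the set of pairs the join should produce; it says nothing about how windows or triggers behave in batch mode.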
Thanks in advance.
--
Jiadong Lu