Hi, Shammon,
Thank you for your reply.

Yes, the window configured with `Time.days(1)` has no special meaning;
it is just used to group all data into the same global window.
I tried using `GlobalWindow` for this scenario, but `GlobalWindow` also needs a `Trigger`, such as `org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger`, to trigger window processing for all of the data.

So I think `ProcessingTimeWindow` with `Time.days(10)` may be a good solution for this scenario. What do you think?

As for what you mentioned
> use join directly
I am not sure how to use join without a window. Would you mind writing a demo of it?

Thank you in advance for your help.

Best,
Jiadong Lu

On 2023/4/26 09:53, Shammon FY wrote:
Hi Jiadong,

I think it depends on the specific role the window plays for you. If this window has no specific business meaning and is only used for performance optimization, maybe you can consider using join directly.

Best,
Shammon FY

On Tue, Apr 25, 2023 at 5:42 PM Jiadong Lu <archzi...@gmail.com> wrote:

    Hello, everyone,

    I am confused about the window of the join/coGroup operator in batch mode.
    Here is my demo code, which works fine for me at present. Is it
    appropriate to use a processing-time window in batch mode like this,
    and does this approach have any problems? I want to use this solution
    to solve my problem (joining two streams in batch mode).

    ```java
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Integer> s1 =
                env.fromCollection(Stream.of(1, 2, 3, 4, 5, 6, 7).collect(Collectors.toList()));
        DataStream<Integer> s2 =
                env.fromCollection(Stream.of(6, 5, 4, 3, 2, 1).collect(Collectors.toList()));

        s1.coGroup(s2)
                .where(new KeySelector<Integer, Integer>() {
                    @Override
                    public Integer getKey(Integer value) throws Exception {
                        return value;
                    }
                })
                .equalTo(new KeySelector<Integer, Integer>() {
                    @Override
                    public Integer getKey(Integer value) throws Exception {
                        return value;
                    }
                })
                .window(TumblingProcessingTimeWindows.of(Time.days(1)))
                .apply(new CoGroupFunction<Integer, Integer, Tuple2<Integer, Integer>>() {
                    @Override
                    public void coGroup(Iterable<Integer> first, Iterable<Integer> second,
                            Collector<Tuple2<Integer, Integer>> out) throws Exception {
                        if (!second.iterator().hasNext()) {
                            // No match on the right side: emit the left element with null.
                            for (Integer integer : first) {
                                out.collect(new Tuple2<>(integer, null));
                            }
                        } else {
                            // Emit every left/right pair that shares the key.
                            for (Integer integer : first) {
                                for (Integer integer1 : second) {
                                    out.collect(new Tuple2<>(integer, integer1));
                                }
                            }
                        }
                    }
                }).printToErr();
        env.setParallelism(1);
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        env.execute();
    }
    ```

    Thanks in advance.

-- Jiadong Lu
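
For reference, the `CoGroupFunction` in the demo above behaves like a left outer join on the integer value, provided all elements land in the same window. The following is only a minimal plain-Java sketch of that logic, with no Flink involved. The class `CoGroupSketch` and the method `leftOuterCoGroup` are hypothetical names introduced for illustration, and the output order shown here is not guaranteed to match what Flink prints.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Flink: mimics the demo's coGroup logic on in-memory lists.
class CoGroupSketch {

    static List<String> leftOuterCoGroup(List<Integer> first, List<Integer> second) {
        // Group each input by key; the key is the value itself, as in the demo's KeySelector.
        Map<Integer, List<Integer>> left = new LinkedHashMap<>();
        Map<Integer, List<Integer>> right = new LinkedHashMap<>();
        for (Integer v : first) {
            left.computeIfAbsent(v, k -> new ArrayList<>()).add(v);
        }
        for (Integer v : second) {
            right.computeIfAbsent(v, k -> new ArrayList<>()).add(v);
        }

        // Keys present only on the right side produce nothing, because the demo's
        // coGroup iterates over an empty `first` in that case.
        List<String> out = new ArrayList<>();
        for (Map.Entry<Integer, List<Integer>> e : left.entrySet()) {
            List<Integer> matches = right.get(e.getKey());
            for (Integer l : e.getValue()) {
                if (matches == null) {
                    // No right-side match: pair the left element with null.
                    out.add("(" + l + ",null)");
                } else {
                    // Emit every left/right pair that shares the key.
                    for (Integer r : matches) {
                        out.add("(" + l + "," + r + ")");
                    }
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> s1 = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        List<Integer> s2 = Arrays.asList(6, 5, 4, 3, 2, 1);
        // prints [(1,1), (2,2), (3,3), (4,4), (5,5), (6,6), (7,null)]
        System.out.println(leftOuterCoGroup(s1, s2));
    }
}
```

With the demo's inputs, only the key 7 has no partner in the second stream, so it is the only element paired with `null`.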
