Hi Jiadong,

Using a processing-time window in a batch job seems a little strange to
me. I would prefer to partition the data by day and have the batch job
read from the relevant partitions instead of using a window.
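
As a rough illustration of this idea, here is a minimal plain-Java sketch
(deliberately not using the Flink API) of what a windowless left outer join
over one day's partition computes on bounded input. The class name
`DayPartitionJoinSketch`, the method names, and the day key "2023-04-25" are
hypothetical and only serve the example; the sample values are the ones from
the demo quoted below.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: plain Java collections stand in for day partitions.
public class DayPartitionJoinSketch {

    // Left outer equi-join of two bounded inputs, keyed on the value itself.
    // Unmatched left values are paired with null, as in the coGroup demo.
    public static List<Map.Entry<Integer, Integer>> leftOuterJoin(
            List<Integer> left, List<Integer> right) {
        // Build side: group the right input by key.
        Map<Integer, List<Integer>> rightByKey = new HashMap<>();
        for (Integer r : right) {
            rightByKey.computeIfAbsent(r, k -> new ArrayList<>()).add(r);
        }
        // Probe side: look up every left value in the grouped right input.
        List<Map.Entry<Integer, Integer>> out = new ArrayList<>();
        for (Integer l : left) {
            List<Integer> matches = rightByKey.get(l);
            if (matches == null) {
                out.add(new SimpleEntry<Integer, Integer>(l, null));
            } else {
                for (Integer r : matches) {
                    out.add(new SimpleEntry<Integer, Integer>(l, r));
                }
            }
        }
        return out;
    }

    // Joins each day partition of the left input with the right partition
    // of the same day; a missing right partition is treated as empty.
    public static Map<String, List<Map.Entry<Integer, Integer>>> joinByDay(
            Map<String, List<Integer>> leftByDay,
            Map<String, List<Integer>> rightByDay) {
        Map<String, List<Map.Entry<Integer, Integer>>> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : leftByDay.entrySet()) {
            List<Integer> right =
                    rightByDay.getOrDefault(e.getKey(), Collections.emptyList());
            result.put(e.getKey(), leftOuterJoin(e.getValue(), right));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> leftByDay = new HashMap<>();
        leftByDay.put("2023-04-25", Arrays.asList(1, 2, 3, 4, 5, 6, 7));
        Map<String, List<Integer>> rightByDay = new HashMap<>();
        rightByDay.put("2023-04-25", Arrays.asList(6, 5, 4, 3, 2, 1));
        // prints: 2023-04-25 -> [1=1, 2=2, 3=3, 4=4, 5=5, 6=6, 7=null]
        joinByDay(leftByDay, rightByDay)
                .forEach((day, rows) -> System.out.println(day + " -> " + rows));
    }
}
```

Because each partition is bounded, the join needs no window or trigger: once
a partition has been read completely, every key can be matched.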

Best,
Shammon FY

On Wed, Apr 26, 2023 at 12:03 PM Jiadong Lu <archzi...@gmail.com> wrote:

> Hi, Shammon,
> Thank you for your reply.
>
> Yes, the window configured with `Time.days(1)` has no special meaning;
> it is only used to group all the data into the same global window.
> I tried using `GlobalWindow` for this scenario, but `GlobalWindow` also
> needs a `Trigger`, such as
> `org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger`,
> to trigger processing of all the data in the window.
>
> So I think `ProcessingTimeWindow` with `Time.days(10)` may be a good
> solution for this scenario. What do you think?
>
> As for what you mentioned
>  > use join directly
> I don't know how to use join without a window. Would you mind writing a
> demo of it?
>
> Thank you in advance for your help.
>
> Best,
> Jiadong Lu
>
> On 2023/4/26 09:53, Shammon FY wrote:
> > Hi Jiadong,
> >
> > I think it depends on the specific role the window plays for you. If
> > the window has no specific business meaning and is only used for
> > performance optimization, you could consider using join directly.
> >
> > Best,
> > Shammon FY
> >
> > On Tue, Apr 25, 2023 at 5:42 PM Jiadong Lu <archzi...@gmail.com> wrote:
> >
> >     Hello, everyone,
> >
> >     I am confused about the window of the join/coGroup operator in Batch
> >     mode. Here is my demo code, and it works fine for me at present. Is
> >     using a processing-time window in batch mode appropriate, and does
> >     this approach have any problems? I want to use this solution to
> >     solve my problem (joining two streams in batch mode).
> >
> >     ```java
> >     public static void main(String[] args) throws Exception {
> >         StreamExecutionEnvironment env =
> >                 StreamExecutionEnvironment.getExecutionEnvironment();
> >
> >         DataStream<Integer> s1 = env.fromCollection(
> >                 Stream.of(1, 2, 3, 4, 5, 6, 7).collect(Collectors.toList()));
> >         DataStream<Integer> s2 = env.fromCollection(
> >                 Stream.of(6, 5, 4, 3, 2, 1).collect(Collectors.toList()));
> >
> >         s1.coGroup(s2)
> >                 .where(new KeySelector<Integer, Integer>() {
> >                     @Override
> >                     public Integer getKey(Integer value) throws Exception {
> >                         return value;
> >                     }
> >                 })
> >                 .equalTo(new KeySelector<Integer, Integer>() {
> >                     @Override
> >                     public Integer getKey(Integer value) throws Exception {
> >                         return value;
> >                     }
> >                 })
> >                 .window(TumblingProcessingTimeWindows.of(Time.days(1)))
> >                 .apply(new CoGroupFunction<Integer, Integer, Tuple2<Integer, Integer>>() {
> >                     @Override
> >                     public void coGroup(Iterable<Integer> first,
> >                                         Iterable<Integer> second,
> >                                         Collector<Tuple2<Integer, Integer>> out)
> >                             throws Exception {
> >                         if (!second.iterator().hasNext()) {
> >                             for (Integer integer : first) {
> >                                 out.collect(new Tuple2<>(integer, null));
> >                             }
> >                         } else {
> >                             for (Integer integer : first) {
> >                                 for (Integer integer1 : second) {
> >                                     out.collect(new Tuple2<>(integer, integer1));
> >                                 }
> >                             }
> >                         }
> >                     }
> >                 })
> >                 .printToErr();
> >         env.setParallelism(1);
> >         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
> >         env.execute();
> >     }
> >     ```
> >
> >     Thanks in advance.
> >
> >     --
> >     Jiadong Lu
> >
>
