I don't understand how I can save the state of a window in a RichCoGroupFunction if the events reach RichCoGroupFunction.coGroup() only when the window closes. If the application fails before that, I will not recover the events that were in the window. This is why I think the right approach is to use a CoProcessFunction, where I can update the state as events arrive at CoProcessFunction.processElement1() and CoProcessFunction.processElement2().
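
A minimal plain-Java sketch of that idea, with no Flink dependency so it can be run on its own. It only models the logic: each event is added to a per-key buffer as soon as it arrives, the buffers can be snapshotted and restored, and the "window close" reads both sides. The names TwoInputWindowBuffer, Snapshot, snapshot(), restore() and closeWindow() are hypothetical and are not Flink API; in a real job the buffers would presumably be ListState handles in a keyed CoProcessFunction and the close would happen in onTimer().

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model (hypothetical, not Flink API) of buffering two inputs per key. */
class TwoInputWindowBuffer {

    /** Immutable copy of both buffers, standing in for a checkpoint. */
    static final class Snapshot {
        final Map<String, List<String>> left;
        final Map<String, List<String>> right;

        Snapshot(Map<String, List<String>> left, Map<String, List<String>> right) {
            this.left = left;
            this.right = right;
        }
    }

    // Per-key buffers; in Flink these would be two keyed ListState handles.
    private final Map<String, List<String>> left = new HashMap<>();
    private final Map<String, List<String>> right = new HashMap<>();

    // Analogue of processElement1: store the event as soon as it arrives.
    void processElement1(String key, String event) {
        left.computeIfAbsent(key, k -> new ArrayList<>()).add(event);
    }

    // Analogue of processElement2: same, for the second input.
    void processElement2(String key, String event) {
        right.computeIfAbsent(key, k -> new ArrayList<>()).add(event);
    }

    // Analogue of a checkpoint: deep-copy both buffers.
    Snapshot snapshot() {
        return new Snapshot(deepCopy(left), deepCopy(right));
    }

    // Analogue of recovery: build a fresh instance from the last snapshot.
    static TwoInputWindowBuffer restore(Snapshot s) {
        TwoInputWindowBuffer b = new TwoInputWindowBuffer();
        b.left.putAll(deepCopy(s.left));
        b.right.putAll(deepCopy(s.right));
        return b;
    }

    // Analogue of the window-end timer: emit both sides for the key, then clear them.
    String closeWindow(String key) {
        List<String> l = left.remove(key);
        List<String> r = right.remove(key);
        if (l == null) l = new ArrayList<>();
        if (r == null) r = new ArrayList<>();
        return key + ": left=" + l + " right=" + r;
    }

    private static Map<String, List<String>> deepCopy(Map<String, List<String>> src) {
        Map<String, List<String>> copy = new HashMap<>();
        for (Map.Entry<String, List<String>> e : src.entrySet()) {
            copy.put(e.getKey(), new ArrayList<>(e.getValue()));
        }
        return copy;
    }

    public static void main(String[] args) {
        TwoInputWindowBuffer buf = new TwoInputWindowBuffer();
        buf.processElement1("k", "a1");           // events on the first input
        buf.processElement1("k", "a2");
        Snapshot checkpoint = buf.snapshot();     // checkpoint before the crash
        TwoInputWindowBuffer recovered = restore(checkpoint); // after the crash
        recovered.processElement2("k", "b1");     // events on the second input
        System.out.println(recovered.closeWindow("k")); // prints "k: left=[a1, a2] right=[b1]"
    }
}
```

The main method follows the same order as the reproduction steps: events on the first input, a checkpoint, a crash and restore, then events on the second input. Because the first-input events were stored on arrival, they are still there when the window is closed after recovery.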
*--*
*-- Felipe Gutierrez*
*-- skype: felipe.o.gutierrez*

On Wed, Jun 16, 2021 at 4:28 PM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:

> Hi Robert,
>
> 1 - I am using Kafka010 as data source.
> 2 - No, I am not using any kind of ListState. That I think it must be used
> 3 - Good. I am going to use CheckpointedFunction.
>
> Just a follow-up question. I was reimplementing it using CoProcessFunction
> to save the state and trigger the window. So, based on your answer I think
> I am overcomplicating it. If I just use RichCoGroupFunction, save the
> states on a ListState, and implement CheckpointedFunction, it will do
> everything that I need. Is that correct? Then I don't have to implement the
> event window trigger at onTimer(). I just use the regular window from
> Flink. is that correct?
>
> Thanks
>
> *--*
> *-- Felipe Gutierrez*
> *-- skype: felipe.o.gutierrez*
>
>
> On Wed, Jun 16, 2021 at 2:16 PM Robert Metzger <rmetz...@apache.org>
> wrote:
>
>> Hi Felipe,
>>
>> Which data source are you using?
>>
>> > Then, in the MyCoGroupFunction there are only events of stream02
>>
>> Are you storing events in your state?
>>
>> > Is this the case where I have to use RichCoGroupFunction and save the
>> state by implementing the CheckpointedFunction?
>>
>> If you want your state to be persisted with each checkpoint, and
>> recovered after a failure, yes.
>>
>> On Tue, Jun 15, 2021 at 6:18 PM Felipe Gutierrez <
>> felipe.o.gutier...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have a problem on my stream pipeline where the events on a
>>> CoGroupFunction are not restored after the application crashes. The
>>> application is like this:
>>>
>>> stream01.coGroup(stream02)
>>> .where(...).equalTo(...)
>>> .window(TumblingEventTimeWindows.of(1 minute))
>>> .apply(new MyCoGroupFunction())
>>> .process(new MyProcessFunction())
>>> .sink(new MySinkFunction)
>>>
>>> The checkpoint is configured to 20 seconds and the window is of 1
>>> minute. I follow this sequence to reproduce the error:
>>> 1 - send 6 events to stream01
>>> 2 - after 25 seconds I send an event to make the application crash
>>> 3 - at this meantime the application recovers
>>> 4 - after 25 seconds I send 6 events to stream02
>>>
>>> Then, in the MyCoGroupFunction there are only events of stream02. Is
>>> this the case where I have to use RichCoGroupFunction and save the state by
>>> implementing the CheckpointedFunction? I am confused because
>>> the CoGroupFunction.coGroup() method is called only when the Window closes
>>> and then I see the output stream events of this operator. That is when
>>> the Collector.collect() is called.
>>>
>>> What I think is that the events are held in memory and when the window
>>> closes the CoGroupFunction.coGroup() is called. So I have to snapshot the
>>> state in an operator before the CoGroupFunction. Is that correct? In case
>>> anyone have a toy example of it (CoGroupFunction with Checkpoint and
>>> testing it in a unit test) could you please send me the link?
>>>
>>> Thanks,
>>> Felipe
>>>