Re: How to make onTimer() trigger on a CoProcessFunction after a failure?
Sorry for the delayed response, but I'm glad to hear you have solved the problem.

Piotrek
Re: How to make onTimer() trigger on a CoProcessFunction after a failure?
So, just an update.

When I used this code (my stateful watermark operator) in the original application, it seems that I can recover the latest watermark and then continue processing the join with the events that were stuck. I don't even have to create MyCoProcessFunction to implement a low-level join. The built-in .coGroup(MyCoGroupFunction) works like a charm.

Thank you again for the clarifications!
Felipe
Re: How to make onTimer() trigger on a CoProcessFunction after a failure?
Hello Piotr,

Could you please help me make sure that I am implementing it correctly?

I created the WatermarkFunction [1], based on Flink's FilterFunction, and the WatermarkStreamOperator [2], and I am writing a unit test [3]. There are some things I am not sure how to do.

How do I make the ListState a singleton across all parallel operator instances?

When my job restarts, I don't even have to call "processWatermark(new Watermark(maxWatermark));" at the end of "initializeState()". I can see that the job processes again the watermarks it had already processed before it failed. Is that because of the source I created at the end of the unit test ("MySource")? Or is it because I don't have a join in the stream pipeline? I have included the output of my unit test below in case you are not able to run the test.

[1] https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/main/java/org/sense/flink/examples/stream/operator/watermark/WatermarkFunction.java
[2] https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/main/java/org/sense/flink/examples/stream/operator/watermark/WatermarkStreamOperator.java
[3] https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/test/java/org/sense/flink/examples/stream/operator/watermark/WatermarkStreamOperatorTest.java#L113

$ cd explore-flink/docker/ops-playground-image/java/explore-flink/
$ mvn -Dtest=WatermarkStreamOperatorTest#testRestartWithLatestWatermark test

WatermarkStreamOperator.initializeState
WatermarkStreamOperator.initializeState
WatermarkStreamOperator.initializeState
WatermarkStreamOperator.initializeState
initializeState... 0
initializeState... 0
initializeState... 0
initializeState... 0
maxWatermark: 0
maxWatermark: 0
maxWatermark: 0
maxWatermark: 0
processing watermark: 0
processing watermark: 0
processing watermark: 0
processing watermark: 0
processing watermark: 0
processing watermark: 0
processing watermark: 0
processing watermark: 0
Attempts restart: 0
processing watermark: 1
processing watermark: 1
processing watermark: 1
processing watermark: 1
Attempts restart: 0
processing watermark: 2
processing watermark: 2
processing watermark: 2
processing watermark: 2
Attempts restart: 0
processing watermark: 3
processing watermark: 3
processing watermark: 3
processing watermark: 3
Attempts restart: 0
processing watermark: 9223372036854775807
processing watermark: 9223372036854775807
processing watermark: 9223372036854775807
processing watermark: 9223372036854775807
This exception will trigger until the reference time [2021-06-21 16:57:19.531] reaches the trigger time [2021-06-21 16:57:21.672] // HERE THE JOB IS RESTARTING
initializeState... 1
initializeState... 1
initializeState... 1
WatermarkStreamOperator.initializeState
WatermarkStreamOperator.initializeState
WatermarkStreamOperator.initializeState
WatermarkStreamOperator.initializeState
watermarkList recovered: 0
watermarkList recovered: 0
watermarkList recovered: 0
watermarkList recovered: 0
watermarkList recovered: 0
watermarkList recovered: 1
watermarkList recovered: 2
initializeState... 1
maxWatermark: 2 // HERE IS THE LATEST WATERMARK
processing watermark: 2 // I PROCESS IT HERE
watermarkList recovered: 0
watermarkList recovered: 1
watermarkList recovered: 0
watermarkList recovered: 0
watermarkList recovered: 1
watermarkList recovered: 1
watermarkList recovered: 2
watermarkList recovered: 2
watermarkList recovered: 2
maxWatermark: 2
maxWatermark: 2
processing watermark: 2
processing watermark: 2
maxWatermark: 2
processing watermark: 2
processing watermark: 0 // IT IS ALSO PROCESSING THE OTHER WATERMARKS. WHY?
processing watermark: 0
processing watermark: 0
processing watermark: 0
Attempts restart: 1
processing watermark: 1
processing watermark: 1
processing watermark: 1
processing watermark: 1
Attempts restart: 1
processing watermark: 2
processing watermark: 2
processing watermark: 2
processing watermark: 2
Attempts restart: 1
processing watermark: 3
processing watermark: 3
processing watermark: 3
processing watermark: 3
Attempts restart: 1
processing watermark: 9223372036854775807
processing watermark: 9223372036854775807
processing watermark: 9223372036854775807
processing watermark: 9223372036854775807
This is a poison but we do NOT throw an exception because the reference time passed :) [2021-06-21 16:57:22.849] >= [2021-06-21 16:57:21.672]
Attempts restart: 1
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 6.836 sec
Re: How to make onTimer() trigger on a CoProcessFunction after a failure?
I'm glad I could help. I hope it solves your problem :)

Best,
Piotrek
Re: How to make onTimer() trigger on a CoProcessFunction after a failure?
On Fri, Jun 18, 2021 at 1:41 PM Piotr Nowojski wrote:

> Hi,
>
> Keep in mind that this is a quite low level approach to this problem. It
> would be much better to make sure that after recovery watermarks are still
> being emitted.

Yes, indeed it looks very low level. I did a small test that emits one watermark for the stream that was recovered, and then the join can proceed. It behaves the same with a CoGroupFunction and a CoProcessFunction. So in the end I don't need to implement MyCoProcessFunction with checkpointing; I just need to emit a new watermark after the job recovers.

In my case I am using a Kafka source, so if I make Kafka keep emitting watermarks, the problem is solved. Otherwise, I have to implement this custom operator.

Thanks for your answer!
Felipe
Re: How to make onTimer() trigger on a CoProcessFunction after a failure?
Hi,

Keep in mind that this is quite a low-level approach to this problem. It would be much better to make sure that watermarks are still being emitted after recovery.

If you are using a built-in source, it's probably easier to do it in a custom operator. I would try to implement one based on AbstractStreamOperator. Your class would also need to implement the OneInputStreamOperator interface. You could implement `processElement` as an identity function (just pass the stream element downstream unchanged). In `processWatermark` you would need to store the latest watermark in a `ListState` field (you can declare it inside `AbstractStreamOperator#initializeState` via `context.getOperatorStateStore().getListState(new ListStateDescriptor<>("your-field-name", Long.class));`). During normal processing (`processWatermark`), make sure it's a singleton list. During recovery (`AbstractStreamOperator#initializeState()`) without rescaling, you would just access this state field and re-emit the only element on that list. However, during recovery with rescaling, depending on whether you are scaling up (a) or down (b), a subtask could get either (a) an empty list (in that case you cannot emit anything), or (b) many elements on the list (in that case you would need to calculate the minimum of all elements and emit that).

As the operator API is not a very official one, it's not well documented. For examples, you would need to look at the Flink code itself and find existing implementations of `AbstractStreamOperator` or `OneInputStreamOperator`.

Best,
Piotrek
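Piotrek's recovery rule (keep a singleton list while running; on restore emit nothing, the single element, or the minimum) can be checked without a cluster. The sketch below is a plain-Java model, not Flink code: `WatermarkRecoveryModel` and its methods are hypothetical names, and `redistribute` is a simplified even split that only stands in for how Flink reassigns split-distributed operator list state on rescaling.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.OptionalLong;

/** Plain-Java model of the rule above; hypothetical names, no Flink dependency. */
final class WatermarkRecoveryModel {

    private WatermarkRecoveryModel() {}

    /** Normal processing: the list state always holds exactly one element, the latest watermark. */
    static List<Long> snapshot(long latestWatermark) {
        return Collections.singletonList(latestWatermark);
    }

    /**
     * Recovery: an empty list (scale-up) gives nothing to re-emit, a single element
     * (no rescaling) is re-emitted as-is, and several elements (scale-down) give their minimum.
     */
    static OptionalLong watermarkToReEmit(List<Long> restored) {
        return restored.stream().mapToLong(Long::longValue).min();
    }

    /**
     * Simplified round-robin split of all snapshotted elements over the new parallelism.
     * An approximation for illustration only, not Flink's actual repartitioning code.
     */
    static List<List<Long>> redistribute(List<List<Long>> oldSubtaskStates, int newParallelism) {
        List<Long> all = new ArrayList<>();
        for (List<Long> state : oldSubtaskStates) {
            all.addAll(state);
        }
        List<List<Long>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>());
        }
        for (int i = 0; i < all.size(); i++) {
            result.get(i % newParallelism).add(all.get(i));
        }
        return result;
    }
}
```

On scale-down the minimum is the safe choice: a watermark asserts that no older events will follow, so re-emitting the larger of two merged values could mark the events still pending behind the smaller one as late.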
Re: How to make onTimer() trigger on a CoProcessFunction after a failure?
Hello Piotrek,

On Fri, Jun 18, 2021 at 11:48 AM Piotr Nowojski wrote:
> Hi,
>
> As far as I can tell, timers should be checkpointed and recovered. What may be happening is that the last watermarks seen by operators on different inputs, and on different channels inside an input, are not persisted. Flink assumes that after the restart, watermark assigners will emit newer watermarks. However, if one of your inputs is dormant and had already emitted some very high watermark a long time before the failure, then after recovery, if no new watermark is emitted, this input/input channel might be preventing timers from firing. Can you check if that's what's happening in your case?
>

I think you are correct. At least when I reproduce the bug, it behaves as you described.

> If so, you would have to make sure one way or another that some watermarks will be emitted after recovery. As a last resort, you could manually store the watermarks in the operators'/sources' state and re-emit the last seen watermark during recovery.
>

Could you please point out how I can checkpoint the watermarks on a source operator? Is it done by the code below, from https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector ?

    import java.time.Duration;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    // MyType, schema and props are placeholders
    FlinkKafkaConsumer<MyType> kafkaSource = new FlinkKafkaConsumer<>("myTopic", schema, props);
    kafkaSource.assignTimestampsAndWatermarks(
            WatermarkStrategy.<MyType>forBoundedOutOfOrderness(Duration.ofSeconds(20)));

Thanks,
Felipe

>
> Best,
> Piotrek
>
> On Thu, 17 Jun 2021 at 13:46, Felipe Gutierrez wrote:
>
>> Hi community,
>>
>> I have implemented a join function using CoProcessFunction with CheckpointedFunction to recover from failures. I added some debug lines to check whether it restores, and it does. Before the crash, I process events that go to processElement2(). I create snapshots in snapshotState(); the application comes back and restores the events. That is fine.
>>
>> After the restore, I process events that go to processElement1(). I register event-time timers for them, as I did in processElement2() before the crash. But onTimer() is never called. The point is that I don't have any events to send to processElement2() to make the CoProcessFunction register a timer for them. They were sent before the crash.
>>
>> I suppose that onTimer() is called only when "timerService.registerEventTimeTimer(endOfWindow);" has been called from both processElement1 and processElement2, because when I test the same application without a crash, the CoProcessFunction does trigger the onTimer() method.
>>
>> But if there is a crash in the middle, the CoProcessFunction does not call onTimer(). Why is that? Is that normal? What do I have to do to make the CoProcessFunction trigger the onTimer() method even if only one stream is processed, let's say at the processElement2() method, and the other stream is restored from a snapshot? I imagine that I have to register a timer during the recovery (initializeState()). But how?
>>
>> Thanks,
>> Felipe
>>
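Piotr's last-resort suggestion (keep the last seen watermark in state and re-emit it on recovery) can be sketched without any Flink dependency. This is a simplified model under stated assumptions, not Flink's API: the class `WatermarkRememberingOperator` and its methods are hypothetical stand-ins for an operator's watermark, snapshot and restore hooks, and the checkpoint is reduced to a single `long`. In a real job the value would live in operator state (for example a `ListState<Long>` registered through `CheckpointedFunction`) and be re-emitted through the operator's output.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the "store and re-emit the last watermark" workaround.
// WatermarkRememberingOperator and its methods are hypothetical, not Flink classes.
class WatermarkRememberingOperator {
    private long lastWatermark = Long.MIN_VALUE;   // highest watermark forwarded so far
    final List<Long> emitted = new ArrayList<>();  // watermarks sent downstream

    // Forward a watermark only if it moves event time forward.
    void processWatermark(long watermark) {
        if (watermark > lastWatermark) {
            lastWatermark = watermark;
            emitted.add(watermark);
        }
    }

    // Value that would be written into the checkpoint.
    long snapshotState() {
        return lastWatermark;
    }

    // Called on recovery with the checkpointed value: re-emit it downstream.
    void initializeState(long restoredWatermark) {
        if (restoredWatermark != Long.MIN_VALUE) {
            processWatermark(restoredWatermark);
        }
    }

    public static void main(String[] args) {
        WatermarkRememberingOperator beforeFailure = new WatermarkRememberingOperator();
        beforeFailure.processWatermark(5_000L);
        beforeFailure.processWatermark(12_000L);
        long checkpoint = beforeFailure.snapshotState();

        // After a restart the operator is a fresh instance with no watermark history.
        WatermarkRememberingOperator afterRestart = new WatermarkRememberingOperator();
        afterRestart.initializeState(checkpoint);
        System.out.println(afterRestart.emitted);  // [12000]
    }
}
```

The guard in `processWatermark` keeps event time monotonic, so replaying an older value after recovery cannot move the clock backwards.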
Re: How to make onTimer() trigger on a CoProcessFunction after a failure?
Hi,

As far as I can tell, timers should be checkpointed and recovered. What may be happening is that the last watermarks seen by operators on different inputs, and on different channels inside an input, are not persisted. Flink assumes that after the restart, watermark assigners will emit newer watermarks. However, if one of your inputs is dormant and had already emitted some very high watermark a long time before the failure, then after recovery, if no new watermark is emitted, this input/input channel might be preventing timers from firing. Can you check if that's what's happening in your case?

If so, you would have to make sure one way or another that some watermarks will be emitted after recovery. As a last resort, you could manually store the watermarks in the operators'/sources' state and re-emit the last seen watermark during recovery.

Best,
Piotrek

On Thu, 17 Jun 2021 at 13:46, Felipe Gutierrez wrote:

> Hi community,
>
> I have implemented a join function using CoProcessFunction with CheckpointedFunction to recover from failures. I added some debug lines to check whether it restores, and it does. Before the crash, I process events that go to processElement2(). I create snapshots in snapshotState(); the application comes back and restores the events. That is fine.
>
> After the restore, I process events that go to processElement1(). I register event-time timers for them, as I did in processElement2() before the crash. But onTimer() is never called. The point is that I don't have any events to send to processElement2() to make the CoProcessFunction register a timer for them. They were sent before the crash.
>
> I suppose that onTimer() is called only when "timerService.registerEventTimeTimer(endOfWindow);" has been called from both processElement1 and processElement2, because when I test the same application without a crash, the CoProcessFunction does trigger the onTimer() method.
>
> But if there is a crash in the middle, the CoProcessFunction does not call onTimer(). Why is that? Is that normal? What do I have to do to make the CoProcessFunction trigger the onTimer() method even if only one stream is processed, let's say at the processElement2() method, and the other stream is restored from a snapshot? I imagine that I have to register a timer during the recovery (initializeState()). But how?
>
> Thanks,
> Felipe
>
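The effect described above can be reproduced with a small, Flink-free model. It rests on a simplifying assumption: an operator's event-time clock is the minimum of the last watermark seen on each input channel (both inputs of a CoProcessFunction, and the parallel channels within each input), and those per-channel values start from `Long.MIN_VALUE` after a restart because they are not restored. The class `MinWatermarkModel` is hypothetical; it only illustrates the rule, not Flink's internal implementation.

```java
import java.util.Arrays;

// Hypothetical model: the operator's watermark is the minimum over all channels.
class MinWatermarkModel {
    private final long[] channelWatermarks;
    private long currentWatermark = Long.MIN_VALUE;

    MinWatermarkModel(int channels) {
        channelWatermarks = new long[channels];
        // Fresh start or restart: no channel has reported a watermark yet.
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // Record a watermark from one channel and return the operator's watermark.
    long onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        currentWatermark = Math.max(currentWatermark, min);  // never moves backwards
        return currentWatermark;
    }

    // An event-time timer fires once the operator's watermark reaches its timestamp.
    boolean timerWouldFire(long timerTimestamp) {
        return currentWatermark >= timerTimestamp;
    }

    public static void main(String[] args) {
        // Channel 0 feeds processElement1, channel 1 feeds processElement2.
        MinWatermarkModel restarted = new MinWatermarkModel(2);
        long endOfWindow = 10_000L;  // timer registered after recovery

        restarted.onWatermark(0, 15_000L);  // the active input keeps advancing
        System.out.println(restarted.timerWouldFire(endOfWindow));  // false: channel 1 is silent

        restarted.onWatermark(1, 20_000L);  // e.g. the dormant input's last watermark re-emitted
        System.out.println(restarted.timerWouldFire(endOfWindow));  // true
    }
}
```

In this model a single silent channel is enough to pin the operator's watermark at `Long.MIN_VALUE`, no matter how far the other input has advanced.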
How to make onTimer() trigger on a CoProcessFunction after a failure?
Hi community,

I have implemented a join function using CoProcessFunction with CheckpointedFunction to recover from failures. I added some debug lines to check whether it restores, and it does. Before the crash, I process events that go to processElement2(). I create snapshots in snapshotState(); the application comes back and restores the events. That is fine.

After the restore, I process events that go to processElement1(). I register event-time timers for them, as I did in processElement2() before the crash. But onTimer() is never called. The point is that I don't have any events to send to processElement2() to make the CoProcessFunction register a timer for them. They were sent before the crash.

I suppose that onTimer() is called only when "timerService.registerEventTimeTimer(endOfWindow);" has been called from both processElement1 and processElement2, because when I test the same application without a crash, the CoProcessFunction does trigger the onTimer() method.

But if there is a crash in the middle, the CoProcessFunction does not call onTimer(). Why is that? Is that normal? What do I have to do to make the CoProcessFunction trigger the onTimer() method even if only one stream is processed, let's say at the processElement2() method, and the other stream is restored from a snapshot? I imagine that I have to register a timer during the recovery (initializeState()). But how?

Thanks,
Felipe
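On the assumption that onTimer() needs timers registered from both processElement1 and processElement2: in a simplified model of an event-time timer service for a single key, timers registered from either input share one queue and fire purely when the watermark passes their timestamp. The class `EventTimeTimerModel` is hypothetical and only approximates Flink's behaviour; for example, registering the same timestamp twice for the same key yields one timer, which the model mimics with a `TreeSet`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical model of an event-time timer queue for one key (not Flink's implementation).
class EventTimeTimerModel {
    private final TreeSet<Long> timers = new TreeSet<>();  // same timestamp twice = one timer
    private long currentWatermark = Long.MIN_VALUE;

    // Called from either processElement1 or processElement2; both share this queue.
    void registerEventTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    // Advance event time and return the timers that fire, in timestamp order.
    List<Long> advanceWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= currentWatermark) {
            fired.add(timers.pollFirst());  // this is where onTimer() would be invoked
        }
        return fired;
    }

    public static void main(String[] args) {
        EventTimeTimerModel service = new EventTimeTimerModel();
        service.registerEventTimeTimer(10_000L);  // registered only from processElement1
        System.out.println(service.advanceWatermark(Long.MIN_VALUE));  // [] : watermark stuck
        System.out.println(service.advanceWatermark(10_000L));         // [10000]
    }
}
```

In this model a timer registered from only one input still fires; what keeps it pending is a watermark that does not advance.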