Re: How to make onTimer() trigger on a CoProcessFunction after a failure?

2021-06-25 Thread Piotr Nowojski
Sorry for the delayed response but I'm glad to hear you have solved the
problem.

Piotrek




Re: How to make onTimer() trigger on a CoProcessFunction after a failure?

2021-06-24 Thread Felipe Gutierrez
So, just an update.

When I used this code (my stateful watermark operator) in the original
application, it seems that I can recover the latest watermark and then
process the join with the events that were stuck.
I don't even have to create MyCoProcessFunction to implement a low-level
join. The available .coGroup(MyCoGroupFunction) works like a charm.

Thank you again for the clarifications!
Felipe


Re: How to make onTimer() trigger on a CoProcessFunction after a failure?

2021-06-21 Thread Felipe Gutierrez
Hello Piotr,

Could you please help me make sure that I am implementing it the correct
way?

I created the WatermarkFunction [1], based on Flink's FilterFunction, and
the WatermarkStreamOperator [2], and I am running a unit test [3]. There
are still things that I am not sure how to do.

How do I make the ListState a singleton list on all parallel operators?

When my job restarts I don't even have to call "processWatermark(new
Watermark(maxWatermark));" at the end of "initializeState()". I can see
that the job processes the previous watermarks, from before it failed. Is
it because the source is one that I created at the end of the unit test
("MySource")? Or is it because I don't have a join in the stream pipeline?
I have included the output of my unit test below in case you are not able
to run the test.
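
On the singleton question, a minimal sketch in plain Java, assuming the
state handle offers an operation that replaces the whole list (Flink's
`ListState` has both `add` and `update`). `ToyListState` and
`LatestWatermarkHolder` are hypothetical names, and the in-memory class
only stands in for real operator state. Each parallel subtask keeps its own
list, so "singleton" here means one element per subtask, not one value
shared by all of them.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** In-memory stand-in for a list state with add/update; illustration only. */
final class ToyListState {
    private final List<Long> values = new ArrayList<>();

    /** Appends one value, so the list grows on every call. */
    void add(long value) {
        values.add(value);
    }

    /** Replaces the whole content of the list. */
    void update(List<Long> newValues) {
        values.clear();
        values.addAll(newValues);
    }

    List<Long> get() {
        return Collections.unmodifiableList(values);
    }
}

/** Hypothetical holder that keeps only the latest watermark in the state. */
final class LatestWatermarkHolder {
    private final ToyListState state = new ToyListState();

    /** Called for every watermark; overwrites instead of appending. */
    void onWatermark(long timestamp) {
        state.update(Collections.singletonList(timestamp));
    }

    /** Copy of what a checkpoint would capture at this moment. */
    List<Long> snapshot() {
        return new ArrayList<>(state.get());
    }
}
```

If the operator appended with `add` instead, the list would grow with every
watermark, which may be one reason a restored list contains 0, 1 and 2.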

[1]
https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/main/java/org/sense/flink/examples/stream/operator/watermark/WatermarkFunction.java
[2]
https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/main/java/org/sense/flink/examples/stream/operator/watermark/WatermarkStreamOperator.java
[3]
https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/test/java/org/sense/flink/examples/stream/operator/watermark/WatermarkStreamOperatorTest.java#L113

$ cd explore-flink/docker/ops-playground-image/java/explore-flink/
$ mvn -Dtest=WatermarkStreamOperatorTest#testRestartWithLatestWatermark test

WatermarkStreamOperator.initializeState
WatermarkStreamOperator.initializeState
WatermarkStreamOperator.initializeState
WatermarkStreamOperator.initializeState
initializeState... 0
initializeState... 0
initializeState... 0
initializeState... 0
maxWatermark: 0
maxWatermark: 0
maxWatermark: 0
maxWatermark: 0
processing watermark: 0
processing watermark: 0
processing watermark: 0
processing watermark: 0
processing watermark: 0
processing watermark: 0
processing watermark: 0
processing watermark: 0
Attempts restart: 0
processing watermark: 1
processing watermark: 1
processing watermark: 1
processing watermark: 1
Attempts restart: 0
processing watermark: 2
processing watermark: 2
processing watermark: 2
processing watermark: 2
Attempts restart: 0
processing watermark: 3
processing watermark: 3
processing watermark: 3
processing watermark: 3
Attempts restart: 0
processing watermark: 9223372036854775807
processing watermark: 9223372036854775807
processing watermark: 9223372036854775807
processing watermark: 9223372036854775807
This exception will trigger until the reference time [2021-06-21
16:57:19.531] reaches the trigger time [2021-06-21 16:57:21.672] // HERE
THE JOB IS RESTARTING
initializeState... 1
initializeState... 1
initializeState... 1
WatermarkStreamOperator.initializeState
WatermarkStreamOperator.initializeState
WatermarkStreamOperator.initializeState
WatermarkStreamOperator.initializeState
watermarkList recovered: 0
watermarkList recovered: 0
watermarkList recovered: 0
watermarkList recovered: 0
watermarkList recovered: 0
watermarkList recovered: 1
watermarkList recovered: 2
initializeState... 1
maxWatermark: 2 // HERE IS THE LATEST WATERMARK
processing watermark: 2 // I PROCESS IT HERE
watermarkList recovered: 0
watermarkList recovered: 1
watermarkList recovered: 0
watermarkList recovered: 0
watermarkList recovered: 1
watermarkList recovered: 1
watermarkList recovered: 2
watermarkList recovered: 2
watermarkList recovered: 2
maxWatermark: 2
maxWatermark: 2
processing watermark: 2
processing watermark: 2
maxWatermark: 2
processing watermark: 2
processing watermark: 0 // IT IS ALSO PROCESSING THE OTHER WATERMARKS. WHY?
processing watermark: 0
processing watermark: 0
processing watermark: 0
Attempts restart: 1
processing watermark: 1
processing watermark: 1
processing watermark: 1
processing watermark: 1
Attempts restart: 1
processing watermark: 2
processing watermark: 2
processing watermark: 2
processing watermark: 2
Attempts restart: 1
processing watermark: 3
processing watermark: 3
processing watermark: 3
processing watermark: 3
Attempts restart: 1
processing watermark: 9223372036854775807
processing watermark: 9223372036854775807
processing watermark: 9223372036854775807
processing watermark: 9223372036854775807
This is a poison but we do NOT throw an exception because the reference
time passed :) [2021-06-21 16:57:22.849] >= [2021-06-21 16:57:21.672]
Attempts restart: 1
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 6.836 sec



Re: How to make onTimer() trigger on a CoProcessFunction after a failure?

2021-06-18 Thread Piotr Nowojski
I'm glad I could help, I hope it will solve your problem :)

Best,
Piotrek


Re: How to make onTimer() trigger on a CoProcessFunction after a failure?

2021-06-18 Thread Felipe Gutierrez
On Fri, Jun 18, 2021 at 1:41 PM Piotr Nowojski  wrote:

> Hi,
>
> Keep in mind that this is a quite low level approach to this problem. It
> would be much better to make sure that after recovery watermarks are still
> being emitted.
>

Yes, it does indeed look very low level. I did a small test that emits one
watermark for the stream that was recovered, and then the job can process
the join. The behavior is the same with a CoGroupFunction and a
CoProcessFunction. So in the end I don't need to implement
MyCoProcessFunction with checkpointing. I just need to emit a new watermark
after the job recovers.

In my case, I am using a Kafka source, so if I make Kafka keep emitting
watermarks I solve the problem. Otherwise, I have to implement this custom
operator.

Thanks for your answer!
Felipe



Re: How to make onTimer() trigger on a CoProcessFunction after a failure?

2021-06-18 Thread Piotr Nowojski
Hi,

Keep in mind that this is quite a low-level approach to this problem. It
would be much better to make sure that watermarks are still being emitted
after recovery.

If you are using a built-in source, it's probably easier to do it in a
custom operator. I would try to implement a custom one based on
AbstractStreamOperator. Your class would also need to implement the
OneInputStreamOperator interface. You could implement `processElement` as
an identity function (just pass the stream element down unchanged). In
`processWatermark` you would need to store the latest watermark in a
`ListState` field (you can declare it inside
`AbstractStreamOperator#initializeState` via
`context.getOperatorStateStore().getListState(new
ListStateDescriptor<>("your-field-name", Long.class));`). During normal
processing (`processWatermark`) make sure it's a singleton list. During
recovery (`AbstractStreamOperator#initializeState()`) without rescaling,
you would just access this state field and re-emit the only element on that
list. However, during recovery with rescaling, depending on whether you are
scaling up (a) or down (b), you could sometimes have either (a) an empty
list (in that case you cannot emit anything), or (b) many elements on the
list (in that case you would need to calculate the minimum of all elements).
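
The restore-time decision described above can be sketched in plain Java,
without any Flink dependency. `RestoredWatermarks` and `watermarkToReEmit`
are hypothetical names used only for illustration. In a real operator the
list would come from the restored `ListState` inside `initializeState`, and
a present result would be the watermark to re-emit.

```java
import java.util.List;
import java.util.OptionalLong;

/** Hypothetical helper: picks the watermark to re-emit after a restore. */
final class RestoredWatermarks {
    private RestoredWatermarks() {}

    /**
     * Empty list (possible after scaling up): nothing to emit.
     * One element (no rescaling): that element.
     * Many elements (possible after scaling down): the minimum of them.
     */
    static OptionalLong watermarkToReEmit(List<Long> restored) {
        return restored.stream().mapToLong(Long::longValue).min();
    }
}
```

Taking the minimum is the conservative choice: a re-emitted watermark that
is too low only delays timers, while one that is too high could make
elements count as late.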

As the operator API is not a very official one, it's not well documented.
For an example, you would need to look in the Flink code itself and find
existing implementations of `AbstractStreamOperator` or
`OneInputStreamOperator`.

Best,
Piotrek



Re: How to make onTimer() trigger on a CoProcessFunction after a failure?

2021-06-18 Thread Felipe Gutierrez
Hello Piotrek,

On Fri, Jun 18, 2021 at 11:48 AM Piotr Nowojski 
wrote:

> Hi,
>
> As far as I can tell, timers should be checkpointed and recovered. What may
> be happening is that the last watermarks seen by operators on different
> inputs, and on different channels inside an input, are not persisted.
> Flink assumes that watermark assigners will emit newer watermarks after
> the recovery. However, if one of your inputs is dormant and already
> emitted some very high watermark a long time before the failure, and no
> new watermark is emitted after recovery, this input/input channel might
> be preventing timers from firing. Can you check if that's what's
> happening in your case?
>

I think you are correct. At least when I reproduce the bug, it behaves as
you said.
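
A toy model of the situation, in plain Java rather than Flink code, may
make this concrete. It assumes the operator's event-time clock is the
minimum of the last watermark seen on each input and that both per-input
values start from `Long.MIN_VALUE` again after a restart. `TwoInputClock`
and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Toy model of a two-input operator's event-time clock; not Flink code. */
final class TwoInputClock {
    // After a restart both per-input watermarks start from the minimum again.
    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;
    private long combinedWatermark = Long.MIN_VALUE;
    private final PriorityQueue<Long> timers = new PriorityQueue<>();

    void registerEventTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    /** Feeds a watermark from input 1 or 2 and returns the timers that fire. */
    List<Long> onWatermark(int input, long watermark) {
        if (input == 1) {
            input1Watermark = Math.max(input1Watermark, watermark);
        } else {
            input2Watermark = Math.max(input2Watermark, watermark);
        }
        // The clock only advances to the minimum over both inputs.
        combinedWatermark =
                Math.max(combinedWatermark, Math.min(input1Watermark, input2Watermark));
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.peek() <= combinedWatermark) {
            fired.add(timers.poll());
        }
        return fired;
    }
}
```

In this model a timer registered at 100 stays pending while only input 1
advances, however far, and fires as soon as input 2 delivers a single
watermark at or beyond 100.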


> If so, you would have to make sure one way or another that some watermarks
> will be emitted after recovery. As a last resort, you could manually store
> the watermarks in the operators/sources state and re-emit the last seen
> watermark during recovery.
>

Could you please point out how I can checkpoint the watermarks in a source
operator? Is it done by the code below, taken from here (
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
)?

FlinkKafkaConsumer kafkaSource = new
    FlinkKafkaConsumer<>("myTopic", schema, props);
kafkaSource.assignTimestampsAndWatermarks(
    WatermarkStrategy
        .forBoundedOutOfOrderness(Duration.ofSeconds(20)));

Thanks,
Felipe


>
> Best,
> Piotrek
>
> On Thu, Jun 17, 2021 at 13:46, Felipe Gutierrez
> wrote:
>
>> Hi community,
>>
>> I have implemented a join function using CoProcessFunction with
>> CheckpointedFunction to recover from failures. I added some debug lines to
>> check if it is restoring and it does. Before the crash, I process events
>> that fall at processElement2. I create snapshots at snapshotState(), the
>> application comes back and restores the events. That is fine.
>>
>> After the restore, I process events that fall on processElement1. I
>> register event timers for them as I did on processElement2 before the
>> crash. But the onTimer() is never called. The point is that I don't have
>> any events to send to processElement2() to make the CoProcessFunction
>> register a timer for them. They were sent before the crash.
>>
>> I suppose that onTimer() is called only when
>> "timerService.registerEventTimeTimer(endOfWindow);" has been called from both
>> processElement1 and processElement2, because when I test the same application
>> without a crash, the CoProcessFunction does trigger the onTimer() method.
>>
>> But if there is a crash in the middle, the CoProcessFunction does not call
>> onTimer(). Why is that? Is that normal? What do I have to do to make the
>> CoProcessFunction trigger the onTimer() method even if only one stream is
>> processed, let's say at the processElement2() method, and the other stream is
>> restored from a snapshot? I imagine that I have to register a timer during
>> the recovery (initializeState()). But how?
>>
>> thanks,
>> Felipe
>>
>


Re: How to make onTimer() trigger on a CoProcessFunction after a failure?

2021-06-18 Thread Piotr Nowojski
Hi,

As far as I can tell, timers should be checkpointed and recovered. What may
be happening is that the last watermarks seen by operators on different
inputs, and on different channels inside an input, are not persisted.
Flink assumes that watermark assigners will emit newer watermarks after
the recovery. However, if one of your inputs is dormant and emitted a
very high watermark long before the failure, and no new watermark is
emitted after recovery, this input/input channel might be preventing
timers from firing. Can you check if that's what's happening in your case?
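
The effect described above can be reproduced with a toy model. The sketch
below is plain Java, not Flink code: the class TwoInputWatermarkModel and its
methods are hypothetical names. It assumes that the operator's event-time
clock is the minimum of the last watermark seen on each input, and that a
restore brings back the timers but not those per-input watermarks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Toy model (NOT Flink code) of a two-input operator's event-time clock. */
class TwoInputWatermarkModel {
    // Last watermark seen per input; Long.MIN_VALUE means "nothing seen yet".
    private final long[] inputWatermarks = {Long.MIN_VALUE, Long.MIN_VALUE};
    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private long currentWatermark = Long.MIN_VALUE;

    void registerEventTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    /** Records a watermark from input 0 or 1 and returns the timers that fire. */
    List<Long> onWatermark(int input, long watermark) {
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
        // The operator clock only advances to the slower of the two inputs.
        long combined = Math.min(inputWatermarks[0], inputWatermarks[1]);
        currentWatermark = Math.max(currentWatermark, combined);
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.peek() <= currentWatermark) {
            fired.add(timers.poll());
        }
        return fired;
    }

    /** Simulates recovery: timers survive, per-input watermarks do not (assumption). */
    TwoInputWatermarkModel restore() {
        TwoInputWatermarkModel restored = new TwoInputWatermarkModel();
        restored.timers.addAll(timers);
        return restored;
    }

    public static void main(String[] args) {
        TwoInputWatermarkModel op = new TwoInputWatermarkModel();
        op.onWatermark(1, 10_000L);        // second input races ahead, then goes dormant
        op.registerEventTimeTimer(5_000L);

        TwoInputWatermarkModel restored = op.restore();
        // After the restore only the first input emits; the clock stays at MIN_VALUE.
        System.out.println(restored.onWatermark(0, 6_000L)); // prints []
        // Without the crash, the same watermark fires the timer.
        System.out.println(op.onWatermark(0, 6_000L));       // prints [5000]
    }
}
```

In this model the restored operator never fires the timer until the dormant
input emits a watermark again, which matches the symptom in the question.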

If so, you would have to make sure, one way or another, that some watermarks
are emitted after recovery. As a last resort, you could manually store
the watermarks in the operators'/sources' state and re-emit the last seen
watermark during recovery.
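
A minimal sketch of that last-resort idea, again as a plain-Java toy model
rather than Flink code. StatefulWatermarkModel, snapshotState() and
restoreFrom() are hypothetical names chosen for illustration; in a real job
the stored value would live in operator state and the re-emission would
happen during state initialization.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (NOT Flink code) of an operator that checkpoints the last watermark it saw. */
class StatefulWatermarkModel {
    private long maxWatermark = Long.MIN_VALUE;
    // Watermarks forwarded downstream, kept only so the effect can be observed.
    private final List<Long> emitted = new ArrayList<>();

    /** Forwards a watermark and remembers the highest one seen so far. */
    void processWatermark(long watermark) {
        maxWatermark = Math.max(maxWatermark, watermark);
        emitted.add(watermark);
    }

    /** The value that would be written into the checkpoint. */
    long snapshotState() {
        return maxWatermark;
    }

    /** Rebuilds the operator from a checkpoint and re-emits the stored watermark. */
    static StatefulWatermarkModel restoreFrom(long snapshot) {
        StatefulWatermarkModel op = new StatefulWatermarkModel();
        if (snapshot != Long.MIN_VALUE) {
            op.processWatermark(snapshot); // downstream sees it again right after recovery
        }
        return op;
    }

    List<Long> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        StatefulWatermarkModel op = new StatefulWatermarkModel();
        op.processWatermark(3_000L);
        op.processWatermark(10_000L);

        StatefulWatermarkModel restored = StatefulWatermarkModel.restoreFrom(op.snapshotState());
        System.out.println(restored.emitted()); // prints [10000]
    }
}
```

The point of the design is that a dormant input no longer holds downstream
timers back after a restart, because its last watermark is replayed even
though the source itself emits nothing new.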

Best,
Piotrek

On Thu, Jun 17, 2021 at 1:46 PM Felipe Gutierrez
wrote:

> Hi community,
>
> I have implemented a join function using CoProcessFunction with
> CheckpointedFunction to recover from failures. I added some debug lines to
> check if it is restoring and it does. Before the crash, I process events
> that fall at processElement2. I create snapshots at snapshotState(), the
> application comes back and restores the events. That is fine.
>
> After the restore, I process events that fall on processElement1. I
> register event timers for them as I did on processElement2 before the
> crash. But the onTimer() is never called. The point is that I don't have
> any events to send to processElement2() to make the CoProcessFunction
> register a timer for them. They were sent before the crash.
>
> I suppose that onTimer() is called only when
> "timerService.registerEventTimeTimer(endOfWindow);" has been called from both
> processElement1 and processElement2, because when I test the same application
> without a crash, the CoProcessFunction does trigger the onTimer() method.
>
> But if there is a crash in the middle, the CoProcessFunction does not call
> onTimer(). Why is that? Is that normal? What do I have to do to make the
> CoProcessFunction trigger the onTimer() method even if only one stream is
> processed, let's say at the processElement2() method, and the other stream is
> restored from a snapshot? I imagine that I have to register a timer during
> the recovery (initializeState()). But how?
>
> thanks,
> Felipe
>


How to make onTimer() trigger on a CoProcessFunction after a failure?

2021-06-17 Thread Felipe Gutierrez
Hi community,

I have implemented a join function using CoProcessFunction with
CheckpointedFunction to recover from failures. I added some debug lines to
check if it is restoring and it does. Before the crash, I process events
that fall at processElement2. I create snapshots at snapshotState(), the
application comes back and restores the events. That is fine.

After the restore, I process events that fall on processElement1. I
register event timers for them as I did on processElement2 before the
crash. But the onTimer() is never called. The point is that I don't have
any events to send to processElement2() to make the CoProcessFunction
register a timer for them. They were sent before the crash.

I suppose that onTimer() is called only when
"timerService.registerEventTimeTimer(endOfWindow);" has been called from both
processElement1 and processElement2, because when I test the same application
without a crash, the CoProcessFunction does trigger the onTimer() method.

But if there is a crash in the middle, the CoProcessFunction does not call
onTimer(). Why is that? Is that normal? What do I have to do to make the
CoProcessFunction trigger the onTimer() method even if only one stream is
processed, let's say at the processElement2() method, and the other stream is
restored from a snapshot? I imagine that I have to register a timer during
the recovery (initializeState()). But how?

thanks,
Felipe