Re: Data loss when restoring from savepoint

2019-02-14 Thread Konstantin Knauf
Hi Juho,


* does the output of the streaming job contain any data, which is not
>> contained in the batch
>
>
> No.
>
> * do you know if all lost records are contained in the last savepoint you
>> took before the window fired? This would mean that no records are lost
>> after the last restore.
>
>
> I haven't built the tooling required to check all IDs like that, but yes,
> that's my understanding currently. To check that I would need to:
> - kill the stream only once on a given day (so that there's only one
> savepoint creation & restore)
> - next day or later: save all missing ids from batch output comparison
> - next day or later: read the savepoint with bravo & check that it
> contains all of those missing IDs
>
> However I haven't built the tooling for that yet. Do you think it's
> necessary to verify that this assumption holds?
>

It would be another data point and might help us to track down the problem.
Whether it is worth doing depends on the result, i.e. whether the current
assumption would be falsified or not, but we only know that in retrospect ;)


> * could you please check the numRecordsOut metric for the WindowOperator
>> (FlinkUI -> TaskMetrics -> Select TaskChain containing WindowOperator ->
>> find metric)? Is the count reported there correct (no missing data)?
>
>
> Is that metric the result of the window trigger? If yes, you must mean that I
> check the value of that metric on the next day after restore, so that it
> only contains the count for the output of the previous day's window? The
> counter is reset to 0 when the job starts (even when state is restored), right?
>

Yes, this metric would be incremented when the window is triggered. Yes,
please check this metric after the window during which the restore happened
has fired.

If you don't have a MetricsReporter configured so far, I recommend quickly
registering an Slf4jReporter to log all metrics every X seconds (maybe even
minutes for your use case):
https://ci.apache.org/projects/flink/flink-docs-release-1.7/monitoring/metrics.html#slf4j-orgapacheflinkmetricsslf4jslf4jreporter.
Then you don't need to go through the WebUI and can keep a history of the
metrics.
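
For reference, a minimal Slf4jReporter setup in flink-conf.yaml could look
roughly like this (the 60 second interval is only an example):

metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
metrics.reporter.slf4j.interval: 60 SECONDS

Once the cluster is restarted with this configuration, all registered metrics
(including the WindowOperator's numRecordsOut) are logged periodically, so a
history of the values ends up in the TaskManager/JobManager logs.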


> Otherwise, do you have any suggestions for how to instrument the code to
> narrow down further where the data gets lost? To me it would make sense to
> proceed with this, because the problem seems hard to reproduce outside of
> our environment.
>

Let's focus on checking this metric above, to make sure that the
WindowOperator is actually emitting fewer records than the overall number of
keys in the state, as your experiments suggest, and on sharing the code.


> On Thu, Feb 14, 2019 at 10:57 AM Konstantin Knauf <
> konstan...@ververica.com> wrote:
>
>> Hi Juho,
>>
>> you are right the problem has actually been narrowed down quite a bit
>> over time. Nevertheless, sharing the code (incl. flink-conf.yaml) might be
>> a good idea. Maybe something strikes the eye, that we have not thought
>> about so far. If you don't feel comfortable sharing the code on the ML,
>> feel free to send me a PM.
>>
>> Besides that, three more questions:
>>
>> * does the output of the streaming job contain any data, which is not
>> contained in the batch output?
>> * do you know if all lost records are contained in the last savepoint you
>> took before the window fired? This would mean that no records are lost
>> after the last restore.
>> * could you please check the numRecordsOut metric for the WindowOperator
>> (FlinkUI -> TaskMetrics -> Select TaskChain containing WindowOperator ->
>> find metric)? Is the count reported there correct (no missing data)?
>>
>> Cheers,
>>
>> Konstantin
>>
>>
>>
>>
>> On Wed, Feb 13, 2019 at 3:19 PM Gyula Fóra  wrote:
>>
>>> Sorry not posting on the mail list was my mistake :/
>>>
>>>
>>> On Wed, 13 Feb 2019 at 15:01, Juho Autio  wrote:
>>>
 Thanks for stepping in, did you post outside of the mailing list on
 purpose btw?

 This I did long time ago:

 To rule out for good any questions about sink behaviour, the job was
> killed and started with an additional Kafka sink.
> The same number of ids were missed in both outputs: KafkaSink &
> BucketingSink.


 (I wrote about that On Oct 1, 2018 in this email thread)

 After that I did the savepoint analysis with Bravo.

 Currently I'm indeed trying to get suggestions how to debug further,
 for example, where to add additional kafka output, to catch where the data
 gets lost. That would probably be somewhere in Flink's internals.

 I could try to share the full code also, but IMHO the problem has been
 quite well narrowed down, considering that data can be found in savepoint,
 savepoint is successfully restored, and after restoring the data doesn't go
 to "user code" (like the reducer) any more.

 On Wed, Feb 13, 2019 at 3:47 PM Gyula Fóra 
 wrote:

> Hi Juho!
> I think the reason you are not getting many answers 

Re: Data loss when restoring from savepoint

2019-02-14 Thread Juho Autio
Thanks Konstantin!

I'll try to see if I can prepare code & conf to be shared as fully as
possible.

In the meantime:

* does the output of the streaming job contain any data, which is not
> contained in the batch


No.

* do you know if all lost records are contained in the last savepoint you
> took before the window fired? This would mean that no records are lost
> after the last restore.


I haven't built the tooling required to check all IDs like that, but yes,
that's my understanding currently. To check that I would need to:
- kill the stream only once on a given day (so that there's only one
savepoint creation & restore)
- next day or later: save all missing ids from batch output comparison
- next day or later: read the savepoint with bravo & check that it contains
all of those missing IDs

However I haven't built the tooling for that yet. Do you think it's
necessary to verify that this assumption holds?
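
(Side note on the ID comparison step above: once the ids are dumped somewhere,
the check itself can be a trivial set difference. A rough sketch, assuming one
id per line and placeholder file names:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashSet;
import java.util.Set;

public class MissingIdCheck {
    public static void main(String[] args) throws IOException {
        // Ids written by the streaming job and by the batch comparison job.
        Set<String> streamIds = new HashSet<>(Files.readAllLines(Paths.get("stream_ids.txt")));
        Set<String> batchIds = new HashSet<>(Files.readAllLines(Paths.get("batch_ids.txt")));

        batchIds.removeAll(streamIds); // ids present in the batch output but missing from the stream output
        batchIds.forEach(System.out::println);
        System.err.println("missing ids: " + batchIds.size());
    }
}

The harder part is of course extracting the id sets from the savepoint and the
two outputs in the first place.)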

* could you please check the numRecordsOut metric for the WindowOperator
> (FlinkUI -> TaskMetrics -> Select TaskChain containing WindowOperator ->
> find metric)? Is the count reported there correct (no missing data)?


Is that metric the result of the window trigger? If yes, you must mean that I
check the value of that metric on the next day after restore, so that it
only contains the count for the output of the previous day's window? The
counter is reset to 0 when the job starts (even when state is restored), right?

Otherwise, do you have any suggestions for how to instrument the code to
narrow down further where the data gets lost? To me it would make sense to
proceed with this, because the problem seems hard to reproduce outside of
our environment.

On Thu, Feb 14, 2019 at 10:57 AM Konstantin Knauf 
wrote:

> Hi Juho,
>
> you are right the problem has actually been narrowed down quite a bit over
> time. Nevertheless, sharing the code (incl. flink-conf.yaml) might be a
> good idea. Maybe something strikes the eye, that we have not thought about
> so far. If you don't feel comfortable sharing the code on the ML, feel free
> to send me a PM.
>
> Besides that, three more questions:
>
> * does the output of the streaming job contain any data, which is not
> contained in the batch output?
> * do you know if all lost records are contained in the last savepoint you
> took before the window fired? This would mean that no records are lost
> after the last restore.
> * could you please check the numRecordsOut metric for the WindowOperator
> (FlinkUI -> TaskMetrics -> Select TaskChain containing WindowOperator ->
> find metric)? Is the count reported there correct (no missing data)?
>
> Cheers,
>
> Konstantin
>
>
>
>
> On Wed, Feb 13, 2019 at 3:19 PM Gyula Fóra  wrote:
>
>> Sorry not posting on the mail list was my mistake :/
>>
>>
>> On Wed, 13 Feb 2019 at 15:01, Juho Autio  wrote:
>>
>>> Thanks for stepping in, did you post outside of the mailing list on
>>> purpose btw?
>>>
>>> This I did long time ago:
>>>
>>> To rule out for good any questions about sink behaviour, the job was
 killed and started with an additional Kafka sink.
 The same number of ids were missed in both outputs: KafkaSink &
 BucketingSink.
>>>
>>>
>>> (I wrote about that On Oct 1, 2018 in this email thread)
>>>
>>> After that I did the savepoint analysis with Bravo.
>>>
>>> Currently I'm indeed trying to get suggestions how to debug further, for
>>> example, where to add additional kafka output, to catch where the data gets
>>> lost. That would probably be somewhere in Flink's internals.
>>>
>>> I could try to share the full code also, but IMHO the problem has been
>>> quite well narrowed down, considering that data can be found in savepoint,
>>> savepoint is successfully restored, and after restoring the data doesn't go
>>> to "user code" (like the reducer) any more.
>>>
>>> On Wed, Feb 13, 2019 at 3:47 PM Gyula Fóra  wrote:
>>>
 Hi Juho!
 I think the reason you are not getting many answers here is that it
 is very hard to debug this problem remotely.
 Seemingly you do very normal operations, the state contains all the
 required data and nobody else has hit a similar problem for ages.

 My best guess would be some bug with the deduplication or output
 writing logic, but without a complete code example it's very hard to say
 anything useful.
 Did you try writing it to Kafka to see if the output is there? (that
 way we could rule out the dedup problem)

 Cheers,
 Gyula

 On Wed, Feb 13, 2019 at 2:37 PM Juho Autio 
 wrote:

> Stefan (or anyone!), please, could I have some feedback on the
> findings that I reported on Dec 21, 2018? This is still a major blocker..
>
> On Thu, Jan 31, 2019 at 11:46 AM Juho Autio 
> wrote:
>
>> Hello, is there anyone that could help with this?
>>
>> On Fri, Jan 11, 2019 at 8:14 AM Juho Autio 
>> wrote:
>>
>>> Stefan, would you have time to comment?
>>>
>>> On Wednesday, January 

Re: Data loss when restoring from savepoint

2019-02-14 Thread Konstantin Knauf
Hi Juho,

you are right, the problem has actually been narrowed down quite a bit over
time. Nevertheless, sharing the code (incl. flink-conf.yaml) might be a
good idea. Maybe something strikes the eye that we have not thought about
so far. If you don't feel comfortable sharing the code on the ML, feel free
to send me a PM.

Besides that, three more questions:

* does the output of the streaming job contain any data, which is not
contained in the batch output?
* do you know if all lost records are contained in the last savepoint you
took before the window fired? This would mean that no records are lost
after the last restore.
* could you please check the numRecordsOut metric for the WindowOperator
(FlinkUI -> TaskMetrics -> Select TaskChain containing WindowOperator ->
find metric)? Is the count reported there correct (no missing data)?

Cheers,

Konstantin




On Wed, Feb 13, 2019 at 3:19 PM Gyula Fóra  wrote:

> Sorry not posting on the mail list was my mistake :/
>
>
> On Wed, 13 Feb 2019 at 15:01, Juho Autio  wrote:
>
>> Thanks for stepping in, did you post outside of the mailing list on
>> purpose btw?
>>
>> This I did long time ago:
>>
>> To rule out for good any questions about sink behaviour, the job was
>>> killed and started with an additional Kafka sink.
>>> The same number of ids were missed in both outputs: KafkaSink &
>>> BucketingSink.
>>
>>
>> (I wrote about that On Oct 1, 2018 in this email thread)
>>
>> After that I did the savepoint analysis with Bravo.
>>
>> Currently I'm indeed trying to get suggestions how to debug further, for
>> example, where to add additional kafka output, to catch where the data gets
>> lost. That would probably be somewhere in Flink's internals.
>>
>> I could try to share the full code also, but IMHO the problem has been
>> quite well narrowed down, considering that data can be found in savepoint,
>> savepoint is successfully restored, and after restoring the data doesn't go
>> to "user code" (like the reducer) any more.
>>
>> On Wed, Feb 13, 2019 at 3:47 PM Gyula Fóra  wrote:
>>
>>> Hi Juho!
>>> I think the reason you are not getting many answers here is that it
>>> is very hard to debug this problem remotely.
>>> Seemingly you do very normal operations, the state contains all the
>>> required data and nobody else has hit a similar problem for ages.
>>>
>>> My best guess would be some bug with the deduplication or output writing
>>> logic, but without a complete code example it's very hard to say anything
>>> useful.
>>> Did you try writing it to Kafka to see if the output is there? (that way
>>> we could rule out the dedup problem)
>>>
>>> Cheers,
>>> Gyula
>>>
>>> On Wed, Feb 13, 2019 at 2:37 PM Juho Autio  wrote:
>>>
 Stefan (or anyone!), please, could I have some feedback on the findings
 that I reported on Dec 21, 2018? This is still a major blocker..

 On Thu, Jan 31, 2019 at 11:46 AM Juho Autio 
 wrote:

> Hello, is there anyone that could help with this?
>
> On Fri, Jan 11, 2019 at 8:14 AM Juho Autio 
> wrote:
>
>> Stefan, would you have time to comment?
>>
>> On Wednesday, January 2, 2019, Juho Autio 
>> wrote:
>>
>>> Bump – does anyone know if Stefan will be available to comment the
>>> latest findings? Thanks.
>>>
>>> On Fri, Dec 21, 2018 at 2:33 PM Juho Autio 
>>> wrote:
>>>
 Stefan, I managed to analyze savepoint with bravo. It seems that
 the data that's missing from output *is* found in savepoint.

 I simplified my test case to the following:

 - job 1 has been running for ~10 days
 - savepoint X created & job 1 cancelled
 - job 2 started with restore from savepoint X

 Then I waited until the next day so that job 2 has triggered the 24
 hour window.

 Then I analyzed the output & savepoint:

 - compare job 2 output with the output of a batch pyspark script =>
 find 4223 missing rows
 - pick one of the missing rows (say, id Z)
 - read savepoint X with bravo, filter for id Z => Z was found in
 the savepoint!

 How can it be possible that the value is in state but doesn't end
 up in output after state has been restored & window is eventually 
 triggered?

 I also did similar analysis on the previous case where I
 savepointed & restored the job multiple times (5) within the same 
 24-hour
 window. A missing id that I drilled down to, was found in all of those
 savepoints, yet missing from the output that gets written at the end 
 of the
 day. This is even more surprising: that the missing ID was written to 
 the
 new savepoints also after restoring. Is the reducer state somehow 
 decoupled
 from the window contents?

 Big thanks to bravo-developer Gyula for guiding me 

Re: Data loss when restoring from savepoint

2019-02-13 Thread Gyula Fóra
Sorry not posting on the mail list was my mistake :/


On Wed, 13 Feb 2019 at 15:01, Juho Autio  wrote:

> Thanks for stepping in, did you post outside of the mailing list on
> purpose btw?
>
> This I did long time ago:
>
> To rule out for good any questions about sink behaviour, the job was
>> killed and started with an additional Kafka sink.
>> The same number of ids were missed in both outputs: KafkaSink &
>> BucketingSink.
>
>
> (I wrote about that On Oct 1, 2018 in this email thread)
>
> After that I did the savepoint analysis with Bravo.
>
> Currently I'm indeed trying to get suggestions how to debug further, for
> example, where to add additional kafka output, to catch where the data gets
> lost. That would probably be somewhere in Flink's internals.
>
> I could try to share the full code also, but IMHO the problem has been
> quite well narrowed down, considering that data can be found in savepoint,
> savepoint is successfully restored, and after restoring the data doesn't go
> to "user code" (like the reducer) any more.
>
> On Wed, Feb 13, 2019 at 3:47 PM Gyula Fóra  wrote:
>
>> Hi Juho!
>> I think the reason you are not getting many answers here is that it is
>> very hard to debug this problem remotely.
>> Seemingly you do very normal operations, the state contains all the
>> required data and nobody else has hit a similar problem for ages.
>>
>> My best guess would be some bug with the deduplication or output writing
>> logic, but without a complete code example it's very hard to say anything
>> useful.
>> Did you try writing it to Kafka to see if the output is there? (that way
>> we could rule out the dedup problem)
>>
>> Cheers,
>> Gyula
>>
>> On Wed, Feb 13, 2019 at 2:37 PM Juho Autio  wrote:
>>
>>> Stefan (or anyone!), please, could I have some feedback on the findings
>>> that I reported on Dec 21, 2018? This is still a major blocker..
>>>
>>> On Thu, Jan 31, 2019 at 11:46 AM Juho Autio 
>>> wrote:
>>>
 Hello, is there anyone that could help with this?

 On Fri, Jan 11, 2019 at 8:14 AM Juho Autio 
 wrote:

> Stefan, would you have time to comment?
>
> On Wednesday, January 2, 2019, Juho Autio 
> wrote:
>
>> Bump – does anyone know if Stefan will be available to comment the
>> latest findings? Thanks.
>>
>> On Fri, Dec 21, 2018 at 2:33 PM Juho Autio 
>> wrote:
>>
>>> Stefan, I managed to analyze savepoint with bravo. It seems that the
>>> data that's missing from output *is* found in savepoint.
>>>
>>> I simplified my test case to the following:
>>>
>>> - job 1 has been running for ~10 days
>>> - savepoint X created & job 1 cancelled
>>> - job 2 started with restore from savepoint X
>>>
>>> Then I waited until the next day so that job 2 has triggered the 24
>>> hour window.
>>>
>>> Then I analyzed the output & savepoint:
>>>
>>> - compare job 2 output with the output of a batch pyspark script =>
>>> find 4223 missing rows
>>> - pick one of the missing rows (say, id Z)
>>> - read savepoint X with bravo, filter for id Z => Z was found in the
>>> savepoint!
>>>
>>> How can it be possible that the value is in state but doesn't end up
>>> in output after state has been restored & window is eventually 
>>> triggered?
>>>
>>> I also did similar analysis on the previous case where I savepointed
>>> & restored the job multiple times (5) within the same 24-hour window. A
>>> missing id that I drilled down to, was found in all of those savepoints,
>>> yet missing from the output that gets written at the end of the day. 
>>> This
>>> is even more surprising: that the missing ID was written to the new
>>> savepoints also after restoring. Is the reducer state somehow decoupled
>>> from the window contents?
>>>
>>> Big thanks to bravo-developer Gyula for guiding me through to be
>>> able to read the reducer state! https://github.com/king/bravo/pull/11
>>>
>>> Gyula also had an idea for how to troubleshoot the missing data in a
>>> scalable way: I could add some "side effect kafka output" on individual
>>> operators. This should allow tracking more closely at which point the 
>>> data
>>> gets lost. However, maybe this would have to be in some Flink's internal
>>> components, and I'm not sure which those would be.
>>>
>>> Cheers,
>>> Juho
>>>
>>> On Mon, Nov 19, 2018 at 11:52 AM Juho Autio 
>>> wrote:
>>>

 Hi Stefan,

 Bravo doesn't currently support reading a reducer state. I gave it
 a try but couldn't get to a working implementation yet. If anyone can
 provide some insight on how to make this work, please share at github:
 https://github.com/king/bravo/pull/11

 Thanks.

 On Tue, Oct 23, 2018 at 3:32 PM Juho Autio 
 wrote:

> I was glad to 

Re: Data loss when restoring from savepoint

2019-02-13 Thread Juho Autio
Stefan (or anyone!), please, could I have some feedback on the findings
that I reported on Dec 21, 2018? This is still a major blocker..

On Thu, Jan 31, 2019 at 11:46 AM Juho Autio  wrote:

> Hello, is there anyone that could help with this?
>
> On Fri, Jan 11, 2019 at 8:14 AM Juho Autio  wrote:
>
>> Stefan, would you have time to comment?
>>
>> On Wednesday, January 2, 2019, Juho Autio  wrote:
>>
>>> Bump – does anyone know if Stefan will be available to comment the
>>> latest findings? Thanks.
>>>
>>> On Fri, Dec 21, 2018 at 2:33 PM Juho Autio  wrote:
>>>
 Stefan, I managed to analyze savepoint with bravo. It seems that the
 data that's missing from output *is* found in savepoint.

 I simplified my test case to the following:

 - job 1 has been running for ~10 days
 - savepoint X created & job 1 cancelled
 - job 2 started with restore from savepoint X

 Then I waited until the next day so that job 2 has triggered the 24
 hour window.

 Then I analyzed the output & savepoint:

 - compare job 2 output with the output of a batch pyspark script =>
 find 4223 missing rows
 - pick one of the missing rows (say, id Z)
 - read savepoint X with bravo, filter for id Z => Z was found in the
 savepoint!

 How can it be possible that the value is in state but doesn't end up in
 output after state has been restored & window is eventually triggered?

 I also did similar analysis on the previous case where I savepointed &
 restored the job multiple times (5) within the same 24-hour window. A
 missing id that I drilled down to, was found in all of those savepoints,
 yet missing from the output that gets written at the end of the day. This
 is even more surprising: that the missing ID was written to the new
 savepoints also after restoring. Is the reducer state somehow decoupled
 from the window contents?

 Big thanks to bravo-developer Gyula for guiding me through to be able
 to read the reducer state! https://github.com/king/bravo/pull/11

 Gyula also had an idea for how to troubleshoot the missing data in a
 scalable way: I could add some "side effect kafka output" on individual
 operators. This should allow tracking more closely at which point the data
 gets lost. However, maybe this would have to be in some Flink's internal
 components, and I'm not sure which those would be.

 Cheers,
 Juho

 On Mon, Nov 19, 2018 at 11:52 AM Juho Autio 
 wrote:

>
> Hi Stefan,
>
> Bravo doesn't currently support reading a reducer state. I gave it a
> try but couldn't get to a working implementation yet. If anyone can 
> provide
> some insight on how to make this work, please share at github:
> https://github.com/king/bravo/pull/11
>
> Thanks.
>
> On Tue, Oct 23, 2018 at 3:32 PM Juho Autio 
> wrote:
>
>> I was glad to find that bravo had now been updated to support
>> installing bravo to a local maven repo.
>>
>> I was able to load a checkpoint created by my job, thanks to the
>> example provided in bravo README, but I'm still missing the essential 
>> piece.
>>
>> My code was:
>>
>> OperatorStateReader reader = new OperatorStateReader(env2,
>> savepoint, "DistinctFunction");
>> DontKnowWhatTypeThisIs reducingState =
>> reader.readKeyedStates(what should I put here?);
>>
>> I don't know how to read the values collected from reduce() calls in
>> the state. Is there a way to access the reducing state of the window with
>> bravo? I'm a bit confused how this works, because when I check with
>> debugger, flink internally uses a ReducingStateDescriptor
>> with name=window-contents, but still reading operator state for
>> "DistinctFunction" didn't at least throw an exception ("window-contents"
>> threw – obviously there's no operator by that name).
>>
>> Cheers,
>> Juho
>>
>> On Mon, Oct 15, 2018 at 2:25 PM Juho Autio 
>> wrote:
>>
>>> Hi Stefan,
>>>
>>> Sorry but it doesn't seem immediately clear to me what's a good way
>>> to use https://github.com/king/bravo.
>>>
>>> How are people using it? Would you for example modify build.gradle
>>> somehow to publish the bravo as a library locally/internally? Or add 
>>> code
>>> directly in the bravo project (locally) and run it from there (using an
>>> IDE, for example)? Also it doesn't seem like the bravo gradle project
>>> supports building a flink job jar, but if it does, how do I do it?
>>>
>>> Thanks.
>>>
>>> On Thu, Oct 4, 2018 at 9:30 PM Juho Autio 
>>> wrote:
>>>
 Good then, I'll try to analyze the savepoints with Bravo. Thanks!

 > How would you assume that backpressure would influence your
 updates? Updates to each local state still 

Re: Data loss when restoring from savepoint

2019-01-31 Thread Juho Autio
Hello, is there anyone that could help with this?

On Fri, Jan 11, 2019 at 8:14 AM Juho Autio  wrote:

> Stefan, would you have time to comment?
>
> On Wednesday, January 2, 2019, Juho Autio  wrote:
>
>> Bump – does anyone know if Stefan will be available to comment the latest
>> findings? Thanks.
>>
>> On Fri, Dec 21, 2018 at 2:33 PM Juho Autio  wrote:
>>
>>> Stefan, I managed to analyze savepoint with bravo. It seems that the
>>> data that's missing from output *is* found in savepoint.
>>>
>>> I simplified my test case to the following:
>>>
>>> - job 1 has been running for ~10 days
>>> - savepoint X created & job 1 cancelled
>>> - job 2 started with restore from savepoint X
>>>
>>> Then I waited until the next day so that job 2 has triggered the 24 hour
>>> window.
>>>
>>> Then I analyzed the output & savepoint:
>>>
>>> - compare job 2 output with the output of a batch pyspark script => find
>>> 4223 missing rows
>>> - pick one of the missing rows (say, id Z)
>>> - read savepoint X with bravo, filter for id Z => Z was found in the
>>> savepoint!
>>>
>>> How can it be possible that the value is in state but doesn't end up in
>>> output after state has been restored & window is eventually triggered?
>>>
>>> I also did similar analysis on the previous case where I savepointed &
>>> restored the job multiple times (5) within the same 24-hour window. A
>>> missing id that I drilled down to, was found in all of those savepoints,
>>> yet missing from the output that gets written at the end of the day. This
>>> is even more surprising: that the missing ID was written to the new
>>> savepoints also after restoring. Is the reducer state somehow decoupled
>>> from the window contents?
>>>
>>> Big thanks to bravo-developer Gyula for guiding me through to be able
>>> to read the reducer state! https://github.com/king/bravo/pull/11
>>>
>>> Gyula also had an idea for how to troubleshoot the missing data in a
>>> scalable way: I could add some "side effect kafka output" on individual
>>> operators. This should allow tracking more closely at which point the data
>>> gets lost. However, maybe this would have to be in some Flink's internal
>>> components, and I'm not sure which those would be.
>>>
>>> Cheers,
>>> Juho
>>>
>>> On Mon, Nov 19, 2018 at 11:52 AM Juho Autio 
>>> wrote:
>>>

 Hi Stefan,

 Bravo doesn't currently support reading a reducer state. I gave it a
 try but couldn't get to a working implementation yet. If anyone can provide
 some insight on how to make this work, please share at github:
 https://github.com/king/bravo/pull/11

 Thanks.

 On Tue, Oct 23, 2018 at 3:32 PM Juho Autio 
 wrote:

> I was glad to find that bravo had now been updated to support
> installing bravo to a local maven repo.
>
> I was able to load a checkpoint created by my job, thanks to the
> example provided in bravo README, but I'm still missing the essential 
> piece.
>
> My code was:
>
> OperatorStateReader reader = new OperatorStateReader(env2,
> savepoint, "DistinctFunction");
> DontKnowWhatTypeThisIs reducingState =
> reader.readKeyedStates(what should I put here?);
>
> I don't know how to read the values collected from reduce() calls in
> the state. Is there a way to access the reducing state of the window with
> bravo? I'm a bit confused how this works, because when I check with
> debugger, flink internally uses a ReducingStateDescriptor
> with name=window-contents, but still reading operator state for
> "DistinctFunction" didn't at least throw an exception ("window-contents"
> threw – obviously there's no operator by that name).
>
> Cheers,
> Juho
>
> On Mon, Oct 15, 2018 at 2:25 PM Juho Autio 
> wrote:
>
>> Hi Stefan,
>>
>> Sorry but it doesn't seem immediately clear to me what's a good way
>> to use https://github.com/king/bravo.
>>
>> How are people using it? Would you for example modify build.gradle
>> somehow to publish the bravo as a library locally/internally? Or add code
>> directly in the bravo project (locally) and run it from there (using an
>> IDE, for example)? Also it doesn't seem like the bravo gradle project
>> supports building a flink job jar, but if it does, how do I do it?
>>
>> Thanks.
>>
>> On Thu, Oct 4, 2018 at 9:30 PM Juho Autio 
>> wrote:
>>
>>> Good then, I'll try to analyze the savepoints with Bravo. Thanks!
>>>
>>> > How would you assume that backpressure would influence your
>>> updates? Updates to each local state still happen event-by-event, in a
>>> single reader/writing thread.
>>>
>>> Sure, just an ignorant guess by me. I'm not familiar with most of
>>> Flink's internals. Anyway, high backpressure is not seen on this job
>>> after it has caught up the lag, so I thought it would be worth
>>> mentioning.
>>>

Re: Data loss when restoring from savepoint

2019-01-10 Thread Juho Autio
Stefan, would you have time to comment?

On Wednesday, January 2, 2019, Juho Autio  wrote:

> Bump – does anyone know if Stefan will be available to comment the latest
> findings? Thanks.
>
> On Fri, Dec 21, 2018 at 2:33 PM Juho Autio  wrote:
>
>> Stefan, I managed to analyze savepoint with bravo. It seems that the data
>> that's missing from output *is* found in savepoint.
>>
>> I simplified my test case to the following:
>>
>> - job 1 has been running for ~10 days
>> - savepoint X created & job 1 cancelled
>> - job 2 started with restore from savepoint X
>>
>> Then I waited until the next day so that job 2 has triggered the 24 hour
>> window.
>>
>> Then I analyzed the output & savepoint:
>>
>> - compare job 2 output with the output of a batch pyspark script => find
>> 4223 missing rows
>> - pick one of the missing rows (say, id Z)
>> - read savepoint X with bravo, filter for id Z => Z was found in the
>> savepoint!
>>
>> How can it be possible that the value is in state but doesn't end up in
>> output after state has been restored & window is eventually triggered?
>>
>> I also did similar analysis on the previous case where I savepointed &
>> restored the job multiple times (5) within the same 24-hour window. A
>> missing id that I drilled down to, was found in all of those savepoints,
>> yet missing from the output that gets written at the end of the day. This
>> is even more surprising: that the missing ID was written to the new
>> savepoints also after restoring. Is the reducer state somehow decoupled
>> from the window contents?
>>
>> Big thanks to bravo-developer Gyula for guiding me through to be able
>> to read the reducer state! https://github.com/king/bravo/pull/11
>>
>> Gyula also had an idea for how to troubleshoot the missing data in a
>> scalable way: I could add some "side effect kafka output" on individual
>> operators. This should allow tracking more closely at which point the data
>> gets lost. However, maybe this would have to be in some Flink's internal
>> components, and I'm not sure which those would be.
>>
>> Cheers,
>> Juho
>>
>> On Mon, Nov 19, 2018 at 11:52 AM Juho Autio  wrote:
>>
>>>
>>> Hi Stefan,
>>>
>>> Bravo doesn't currently support reading a reducer state. I gave it a try
>>> but couldn't get to a working implementation yet. If anyone can provide
>>> some insight on how to make this work, please share at github:
>>> https://github.com/king/bravo/pull/11
>>>
>>> Thanks.
>>>
>>> On Tue, Oct 23, 2018 at 3:32 PM Juho Autio  wrote:
>>>
 I was glad to find that bravo had now been updated to support
 installing bravo to a local maven repo.

 I was able to load a checkpoint created by my job, thanks to the
 example provided in bravo README, but I'm still missing the essential 
 piece.

 My code was:

 OperatorStateReader reader = new OperatorStateReader(env2,
 savepoint, "DistinctFunction");
 DontKnowWhatTypeThisIs reducingState =
 reader.readKeyedStates(what should I put here?);

 I don't know how to read the values collected from reduce() calls in
 the state. Is there a way to access the reducing state of the window with
 bravo? I'm a bit confused how this works, because when I check with
 debugger, flink internally uses a ReducingStateDescriptor
 with name=window-contents, but still reading operator state for
 "DistinctFunction" didn't at least throw an exception ("window-contents"
 threw – obviously there's no operator by that name).

 Cheers,
 Juho

 On Mon, Oct 15, 2018 at 2:25 PM Juho Autio 
 wrote:

> Hi Stefan,
>
> Sorry but it doesn't seem immediately clear to me what's a good way to
> use https://github.com/king/bravo.
>
> How are people using it? Would you for example modify build.gradle
> somehow to publish the bravo as a library locally/internally? Or add code
> directly in the bravo project (locally) and run it from there (using an
> IDE, for example)? Also it doesn't seem like the bravo gradle project
> supports building a flink job jar, but if it does, how do I do it?
>
> Thanks.
>
> On Thu, Oct 4, 2018 at 9:30 PM Juho Autio 
> wrote:
>
>> Good then, I'll try to analyze the savepoints with Bravo. Thanks!
>>
>> > How would you assume that backpressure would influence your
>> updates? Updates to each local state still happen event-by-event, in a
>> single reader/writing thread.
>>
>> Sure, just an ignorant guess by me. I'm not familiar with most of
>> Flink's internals. Anyway, high backpressure is not seen on this job
>> after it has caught up the lag, so I thought it would be worth
>> mentioning.
>>
>> On Thu, Oct 4, 2018 at 6:24 PM Stefan Richter <
>> s.rich...@data-artisans.com> wrote:
>>
>>> Hi,
>>>
 On 04.10.2018 at 16:08, Juho Autio wrote:
>>>
>>> > you could take a look at Bravo [1] to 

Re: Data loss when restoring from savepoint

2019-01-02 Thread Juho Autio
Bump – does anyone know if Stefan will be available to comment the latest
findings? Thanks.

On Fri, Dec 21, 2018 at 2:33 PM Juho Autio  wrote:

> Stefan, I managed to analyze savepoint with bravo. It seems that the data
> that's missing from output *is* found in savepoint.
>
> I simplified my test case to the following:
>
> - job 1 has been running for ~10 days
> - savepoint X created & job 1 cancelled
> - job 2 started with restore from savepoint X
>
> Then I waited until the next day so that job 2 has triggered the 24 hour
> window.
>
> Then I analyzed the output & savepoint:
>
> - compare job 2 output with the output of a batch pyspark script => find
> 4223 missing rows
> - pick one of the missing rows (say, id Z)
> - read savepoint X with bravo, filter for id Z => Z was found in the
> savepoint!
>
> How can it be possible that the value is in state but doesn't end up in
> output after state has been restored & window is eventually triggered?
>
> I also did similar analysis on the previous case where I savepointed &
> restored the job multiple times (5) within the same 24-hour window. A
> missing id that I drilled down to, was found in all of those savepoints,
> yet missing from the output that gets written at the end of the day. This
> is even more surprising: that the missing ID was written to the new
> savepoints also after restoring. Is the reducer state somehow decoupled
> from the window contents?
>
> Big thanks to bravo-developer Gyula for guiding me through to be able to read
> the reducer state! https://github.com/king/bravo/pull/11
>
> Gyula also had an idea for how to troubleshoot the missing data in a
> scalable way: I could add some "side effect kafka output" on individual
> operators. This should allow tracking more closely at which point the data
> gets lost. However, maybe this would have to be in some Flink's internal
> components, and I'm not sure which those would be.
>
> Cheers,
> Juho
>
> On Mon, Nov 19, 2018 at 11:52 AM Juho Autio  wrote:
>
>>
>> Hi Stefan,
>>
>> Bravo doesn't currently support reading a reducer state. I gave it a try
>> but couldn't get to a working implementation yet. If anyone can provide
>> some insight on how to make this work, please share at github:
>> https://github.com/king/bravo/pull/11
>>
>> Thanks.
>>
>> On Tue, Oct 23, 2018 at 3:32 PM Juho Autio  wrote:
>>
>>> I was glad to find that bravo had now been updated to support installing
>>> bravo to a local maven repo.
>>>
>>> I was able to load a checkpoint created by my job, thanks to the example
>>> provided in bravo README, but I'm still missing the essential piece.
>>>
>>> My code was:
>>>
>>> OperatorStateReader reader = new OperatorStateReader(env2,
>>> savepoint, "DistinctFunction");
>>> DontKnowWhatTypeThisIs reducingState =
>>> reader.readKeyedStates(what should I put here?);
>>>
>>> I don't know how to read the values collected from reduce() calls in the
>>> state. Is there a way to access the reducing state of the window with
>>> bravo? I'm a bit confused how this works, because when I check with
>>> debugger, flink internally uses a ReducingStateDescriptor
>>> with name=window-contents, but still reading operator state for
>>> "DistinctFunction" didn't at least throw an exception ("window-contents"
>>> threw – obviously there's no operator by that name).
>>>
>>> Cheers,
>>> Juho
>>>
>>> On Mon, Oct 15, 2018 at 2:25 PM Juho Autio  wrote:
>>>
 Hi Stefan,

 Sorry but it doesn't seem immediately clear to me what's a good way to
 use https://github.com/king/bravo.

 How are people using it? Would you for example modify build.gradle
 somehow to publish the bravo as a library locally/internally? Or add code
 directly in the bravo project (locally) and run it from there (using an
 IDE, for example)? Also it doesn't seem like the bravo gradle project
 supports building a flink job jar, but if it does, how do I do it?

 Thanks.

 On Thu, Oct 4, 2018 at 9:30 PM Juho Autio  wrote:

> Good then, I'll try to analyze the savepoints with Bravo. Thanks!
>
> > How would you assume that backpressure would influence your updates?
> Updates to each local state still happen event-by-event, in a single
> reader/writing thread.
>
> Sure, just an ignorant guess by me. I'm not familiar with most of
> Flink's internals. Anyway, high backpressure is not seen on this job
> after it has caught up the lag, so I thought it would be worth
> mentioning.
>
> On Thu, Oct 4, 2018 at 6:24 PM Stefan Richter <
> s.rich...@data-artisans.com> wrote:
>
>> Hi,
>>
>> Am 04.10.2018 um 16:08 schrieb Juho Autio :
>>
>> > you could take a look at Bravo [1] to query your savepoints and to
>> check if the state in the savepoint complete w.r.t your expectations
>>
>> Thanks. I'm not 100% sure if this is the case, but to me it seemed like
>> the missed ids were being logged by the 

Re: Data loss when restoring from savepoint

2018-12-21 Thread Juho Autio
Stefan, I managed to analyze savepoint with bravo. It seems that the data
that's missing from output *is* found in savepoint.

I simplified my test case to the following:

- job 1 has been running for ~10 days
- savepoint X created & job 1 cancelled
- job 2 started with restore from savepoint X

Then I waited until the next day so that job 2 has triggered the 24 hour
window.

Then I analyzed the output & savepoint:

- compare job 2 output with the output of a batch pyspark script => find
4223 missing rows
- pick one of the missing rows (say, id Z)
- read savepoint X with bravo, filter for id Z => Z was found in the
savepoint!

How can it be possible that the value is in state but doesn't end up in
output after state has been restored & window is eventually triggered?
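
For context, the job under discussion appears to be, essentially, a keyed
24-hour event-time window whose contents are reduced by a DistinctFunction
before being written to the sinks. A minimal sketch of that shape (class,
types and the source are simplified placeholders, not the actual job code):

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class DistinctWindowSketch {

    // Keeps the first record seen for each id within the window; later
    // duplicates are dropped, which gives the "distinct" behaviour.
    public static class DistinctFunction implements ReduceFunction<Tuple2<String, String>> {
        @Override
        public Tuple2<String, String> reduce(Tuple2<String, String> first, Tuple2<String, String> second) {
            return first;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.enableCheckpointing(60_000);

        // Placeholder source; the real job reads from Kafka with
        // timestamps/watermarks assigned.
        DataStream<Tuple2<String, String>> events =
                env.fromElements(Tuple2.of("id-1", "payload"), Tuple2.of("id-2", "payload"));

        events.keyBy(0)
              .window(TumblingEventTimeWindows.of(Time.days(1)))
              .reduce(new DistinctFunction())
              // The real job writes to a BucketingSink (plus a Kafka sink).
              .print();

        env.execute("distinct-window-sketch");
    }
}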

I also did similar analysis on the previous case where I savepointed &
restored the job multiple times (5) within the same 24-hour window. A
missing id that I drilled down to, was found in all of those savepoints,
yet missing from the output that gets written at the end of the day. This
is even more surprising: that the missing ID was written to the new
savepoints also after restoring. Is the reducer state somehow decoupled
from the window contents?

Big thanks to bravo-developer Gyula for guiding me through to be able to read
the reducer state! https://github.com/king/bravo/pull/11

Gyula also had an idea for how to troubleshoot the missing data in a
scalable way: I could add some "side effect kafka output" on individual
operators. This should allow tracking more closely at which point the data
gets lost. However, maybe this would have to be in some Flink's internal
components, and I'm not sure which those would be.
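
As a rough illustration of that idea, such a "side effect" output could be as
small as tapping a stream of ids at a chosen point and writing them to an
extra topic. A sketch only; the topic name, broker address and the Kafka 0.11
producer are assumptions, not part of the actual job:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

public final class DebugTap {

    /** Attaches an extra debug Kafka sink to a stream of ids without
     *  changing the main pipeline. */
    public static void tap(DataStream<String> ids, String topic) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder

        ids.addSink(new FlinkKafkaProducer011<>(topic, new SimpleStringSchema(), props))
           .name("debug-tap-" + topic);
    }
}

Attaching one tap just before the window and one on the window output, and
then grepping both topics for a known-missing id, would at least show on which
side of the WindowOperator the id disappears.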

Cheers,
Juho

On Mon, Nov 19, 2018 at 11:52 AM Juho Autio  wrote:

>
> Hi Stefan,
>
> Bravo doesn't currently support reading a reducer state. I gave it a try
> but couldn't get to a working implementation yet. If anyone can provide
> some insight on how to make this work, please share at github:
> https://github.com/king/bravo/pull/11
>
> Thanks.
>
> On Tue, Oct 23, 2018 at 3:32 PM Juho Autio  wrote:
>
>> I was glad to find that bravo had now been updated to support installing
>> bravo to a local maven repo.
>>
>> I was able to load a checkpoint created by my job, thanks to the example
>> provided in bravo README, but I'm still missing the essential piece.
>>
>> My code was:
>>
>> OperatorStateReader reader = new OperatorStateReader(env2,
>> savepoint, "DistinctFunction");
>> DontKnowWhatTypeThisIs reducingState =
>> reader.readKeyedStates(what should I put here?);
>>
>> I don't know how to read the values collected from reduce() calls in the
>> state. Is there a way to access the reducing state of the window with
>> bravo? I'm a bit confused how this works, because when I check with
>> debugger, flink internally uses a ReducingStateDescriptor
>> with name=window-contents, but still reading operator state for
>> "DistinctFunction" didn't at least throw an exception ("window-contents"
>> threw – obviously there's no operator by that name).
>>
>> Cheers,
>> Juho
>>
>> On Mon, Oct 15, 2018 at 2:25 PM Juho Autio  wrote:
>>
>>> Hi Stefan,
>>>
>>> Sorry but it doesn't seem immediately clear to me what's a good way to
>>> use https://github.com/king/bravo.
>>>
>>> How are people using it? Would you for example modify build.gradle
>>> somehow to publish the bravo as a library locally/internally? Or add code
>>> directly in the bravo project (locally) and run it from there (using an
>>> IDE, for example)? Also it doesn't seem like the bravo gradle project
>>> supports building a flink job jar, but if it does, how do I do it?
>>>
>>> Thanks.
>>>
>>> On Thu, Oct 4, 2018 at 9:30 PM Juho Autio  wrote:
>>>
 Good then, I'll try to analyze the savepoints with Bravo. Thanks!

 > How would you assume that backpressure would influence your updates?
 Updates to each local state still happen event-by-event, in a single
 reader/writing thread.

 Sure, just an ignorant guess by me. I'm not familiar with most of
 Flink's internals. Anyway, high backpressure is not seen on this job
 after it has caught up the lag, so I thought it would be worth
 mentioning.

 On Thu, Oct 4, 2018 at 6:24 PM Stefan Richter <
 s.rich...@data-artisans.com> wrote:

> Hi,
>
> On 04.10.2018 at 16:08, Juho Autio wrote:
>
> > you could take a look at Bravo [1] to query your savepoints and to
> check if the state in the savepoint is complete w.r.t. your expectations
>
> Thanks. I'm not 100% sure if this is the case, but to me it seemed like the
> missed ids were being logged by the reducer soon after the job had started
> (after restoring a savepoint). But on the other hand, after that I also
> made another savepoint & restored that, so what I could check is: does 
> that
> next savepoint have the missed ids that were logged (a couple of minutes
> before the 

Re: Data loss when restoring from savepoint

2018-11-19 Thread Juho Autio
Hi Stefan,

Bravo doesn't currently support reading a reducer state. I gave it a try
but couldn't get to a working implementation yet. If anyone can provide
some insight on how to make this work, please share at github:
https://github.com/king/bravo/pull/11

Thanks.

On Tue, Oct 23, 2018 at 3:32 PM Juho Autio  wrote:

> I was glad to find that bravo had now been updated to support installing
> bravo to a local maven repo.
>
> I was able to load a checkpoint created by my job, thanks to the example
> provided in bravo README, but I'm still missing the essential piece.
>
> My code was:
>
> OperatorStateReader reader = new OperatorStateReader(env2,
> savepoint, "DistinctFunction");
> DontKnowWhatTypeThisIs reducingState = reader.readKeyedStates(what
> should I put here?);
>
> I don't know how to read the values collected from reduce() calls in the
> state. Is there a way to access the reducing state of the window with
> bravo? I'm a bit confused how this works, because when I check with
> debugger, flink internally uses a ReducingStateDescriptor
> with name=window-contents, but still reading operator state for
> "DistinctFunction" didn't at least throw an exception ("window-contents"
> threw – obviously there's no operator by that name).
>
> Cheers,
> Juho
>
> On Mon, Oct 15, 2018 at 2:25 PM Juho Autio  wrote:
>
>> Hi Stefan,
>>
>> Sorry but it doesn't seem immediately clear to me what's a good way to
>> use https://github.com/king/bravo.
>>
>> How are people using it? Would you for example modify build.gradle
>> somehow to publish the bravo as a library locally/internally? Or add code
>> directly in the bravo project (locally) and run it from there (using an
>> IDE, for example)? Also it doesn't seem like the bravo gradle project
>> supports building a flink job jar, but if it does, how do I do it?
>>
>> Thanks.
>>
>> On Thu, Oct 4, 2018 at 9:30 PM Juho Autio  wrote:
>>
>>> Good then, I'll try to analyze the savepoints with Bravo. Thanks!
>>>
>>> > How would you assume that backpressure would influence your updates?
>>> Updates to each local state still happen event-by-event, in a single
>>> reader/writing thread.
>>>
>>> Sure, just an ignorant guess by me. I'm not familiar with most of
>>> Flink's internals. Anyway, high backpressure is not seen on this job
>>> after it has caught up the lag, so I thought it would be worth
>>> mentioning.
>>>
>>> On Thu, Oct 4, 2018 at 6:24 PM Stefan Richter <
>>> s.rich...@data-artisans.com> wrote:
>>>
 Hi,

 On 04.10.2018 at 16:08, Juho Autio wrote:

 > you could take a look at Bravo [1] to query your savepoints and to
 check if the state in the savepoint is complete w.r.t. your expectations

 Thanks. I'm not 100% sure if this is the case, but to me it seemed like the
 missed ids were being logged by the reducer soon after the job had started
 (after restoring a savepoint). But on the other hand, after that I also
 made another savepoint & restored that, so what I could check is: does that
 next savepoint have the missed ids that were logged (a couple of minutes
 before the savepoint was created, so there should've been more than enough
 time to add them to the state before the savepoint was triggered) or not.
 Anyway, if I were able to verify with Bravo that the ids are missing
 from the savepoint (even though the reducer logged that it saw them), would
 that help in figuring out where they are lost? Is there some major
 difference compared to just looking at the final output after the window
 has been triggered?



 I think that makes a difference. For example, you can investigate if
 there is a state loss or a problem with the windowing. In the savepoint you
 could see which keys exists and to which windows they are assigned. Also
 just to make sure there is no misunderstanding: only elements that are in
 the state at the start of a savepoint are expected to be part of the
 savepoint; all elements between start and completion of the savepoint are
 not expected to be part of the savepoint.


 > I also doubt that the problem is about backpressure after restore,
 because the job will only continue running after the state restore is
 already completed.

 Yes, I'm not suspecting that the state restoring would be the problem
 either. My concern was about backpressure possibly messing with the updates
 of reducing state? I would tend to suspect that updating the state
 consistently is what fails, where heavy load / backpressure might be a
 factor.



 How would you assume that backpressure would influence your updates?
 Updates to each local state still happen event-by-event, in a single
 reader/writing thread.


 On Thu, Oct 4, 2018 at 4:18 PM Stefan Richter <
 s.rich...@data-artisans.com> wrote:

> Hi,
>
> you could take a look at Bravo [1] to query 

Re: Data loss when restoring from savepoint

2018-10-23 Thread Juho Autio
I was glad to find that bravo had now been updated to support installing
bravo to a local maven repo.

I was able to load a checkpoint created by my job, thanks to the example
provided in bravo README, but I'm still missing the essential piece.

My code was:

OperatorStateReader reader = new OperatorStateReader(env2,
savepoint, "DistinctFunction");
DontKnowWhatTypeThisIs reducingState = reader.readKeyedStates(what
should I put here?);

I don't know how to read the values collected from reduce() calls in the
state. Is there a way to access the reducing state of the window with
bravo? I'm a bit confused how this works, because when I check with
debugger, flink internally uses a ReducingStateDescriptor
with name=window-contents, but still reading operator state for
"DistinctFunction" didn't at least throw an exception ("window-contents"
threw – obviously there's no operator by that name).
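
For what it's worth, "window-contents" is the name of the internal state
descriptor that Flink registers when .reduce() is used on a window, whereas
"DistinctFunction" is presumably the uid/name given to the operator itself,
which would explain why one works as an operator identifier and the other
doesn't. A rough sketch of what such a descriptor looks like, with simplified
placeholder types:

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;

public class WindowContentsSketch {
    public static void main(String[] args) {
        // Simplified stand-in for the job's ReduceFunction.
        ReduceFunction<String> keepFirst = (first, second) -> first;

        // Roughly what WindowedStream.reduce() registers as per-window state:
        // the state name is "window-contents", not the operator's name.
        ReducingStateDescriptor<String> windowContents =
                new ReducingStateDescriptor<>(
                        "window-contents",
                        keepFirst,
                        Types.STRING.createSerializer(new ExecutionConfig()));

        System.out.println(windowContents.getName());
    }
}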

Cheers,
Juho

On Mon, Oct 15, 2018 at 2:25 PM Juho Autio  wrote:

> Hi Stefan,
>
> Sorry but it doesn't seem immediately clear to me what's a good way to use
> https://github.com/king/bravo.
>
> How are people using it? Would you for example modify build.gradle somehow
> to publish the bravo as a library locally/internally? Or add code directly
> in the bravo project (locally) and run it from there (using an IDE, for
> example)? Also it doesn't seem like the bravo gradle project supports
> building a flink job jar, but if it does, how do I do it?
>
> Thanks.
>
> On Thu, Oct 4, 2018 at 9:30 PM Juho Autio  wrote:
>
>> Good then, I'll try to analyze the savepoints with Bravo. Thanks!
>>
>> > How would you assume that backpressure would influence your updates?
>> Updates to each local state still happen event-by-event, in a single
>> reader/writing thread.
>>
>> Sure, just an ignorant guess by me. I'm not familiar with most of Flink's
>> internals. Anyway, high backpressure is not seen on this job after it has
>> caught up the lag, so I thought it would be worth mentioning.
>>
>> On Thu, Oct 4, 2018 at 6:24 PM Stefan Richter <
>> s.rich...@data-artisans.com> wrote:
>>
>>> Hi,
>>>
>>> On 04.10.2018 at 16:08, Juho Autio wrote:
>>>
>>> > you could take a look at Bravo [1] to query your savepoints and to
>>> check if the state in the savepoint is complete w.r.t. your expectations
>>>
>>> Thanks. I'm not 100% sure if this is the case, but to me it seemed like the
>>> missed ids were being logged by the reducer soon after the job had started
>>> (after restoring a savepoint). But on the other hand, after that I also
>>> made another savepoint & restored that, so what I could check is: does that
>>> next savepoint have the missed ids that were logged (a couple of minutes
>>> before the savepoint was created, so there should've been more than enough
>>> time to add them to the state before the savepoint was triggered) or not.
>>> Anyway, if I were able to verify with Bravo that the ids are missing
>>> from the savepoint (even though the reducer logged that it saw them), would
>>> that help in figuring out where they are lost? Is there some major
>>> difference compared to just looking at the final output after the window
>>> has been triggered?
>>>
>>>
>>>
>>> I think that makes a difference. For example, you can investigate if
>>> there is a state loss or a problem with the windowing. In the savepoint you
>>> could see which keys exists and to which windows they are assigned. Also
>>> just to make sure there is no misunderstanding: only elements that are in
>>> the state at the start of a savepoint are expected to be part of the
>>> savepoint; all elements between start and completion of the savepoint are
>>> not expected to be part of the savepoint.
>>>
>>>
>>> > I also doubt that the problem is about backpressure after restore,
>>> because the job will only continue running after the state restore is
>>> already completed.
>>>
>>> Yes, I'm not suspecting that the state restoring would be the problem
>>> either. My concern was about backpressure possibly messing with the updates
>>> of reducing state? I would tend to suspect that updating the state
>>> consistently is what fails, where heavy load / backpressure might be a
>>> factor.
>>>
>>>
>>>
>>> How would you assume that backpressure would influence your updates?
>>> Updates to each local state still happen event-by-event, in a single
>>> reader/writing thread.
>>>
>>>
>>> On Thu, Oct 4, 2018 at 4:18 PM Stefan Richter <
>>> s.rich...@data-artisans.com> wrote:
>>>
 Hi,

 you could take a look at Bravo [1] to query your savepoints and to
 check if the state in the savepoint is complete w.r.t. your expectations. I
 somewhat doubt that there is a general problem with the state/savepoints
 because many users are successfully running it on a large state and I am
 not aware of any data loss problems, but nothing is impossible. What the
 savepoint does is also straight forward: iterate a db snapshot and write
 all key/value pairs to disk, so 

Re: Data loss when restoring from savepoint

2018-10-15 Thread Juho Autio
Hi Stefan,

Sorry but it doesn't seem immediately clear to me what's a good way to use
https://github.com/king/bravo.

How are people using it? Would you for example modify build.gradle somehow
to publish the bravo as a library locally/internally? Or add code directly
in the bravo project (locally) and run it from there (using an IDE, for
example)? Also it doesn't seem like the bravo gradle project supports
building a flink job jar, but if it does, how do I do it?

Thanks.

On Thu, Oct 4, 2018 at 9:30 PM Juho Autio  wrote:

> Good then, I'll try to analyze the savepoints with Bravo. Thanks!
>
> > How would you assume that backpressure would influence your updates?
> Updates to each local state still happen event-by-event, in a single
> reader/writing thread.
>
> Sure, just an ignorant guess by me. I'm not familiar with most of Flink's
> internals. Anyway, high backpressure is not seen on this job after it has
> caught up the lag, so I thought it would be worth mentioning.
>
> On Thu, Oct 4, 2018 at 6:24 PM Stefan Richter 
> wrote:
>
>> Hi,
>>
>> On 04.10.2018 at 16:08, Juho Autio wrote:
>>
>> > you could take a look at Bravo [1] to query your savepoints and to
>> check if the state in the savepoint is complete w.r.t. your expectations
>>
>> Thanks. I'm not 100% sure if this is the case, but to me it seemed like the
>> missed ids were being logged by the reducer soon after the job had started
>> (after restoring a savepoint). But on the other hand, after that I also
>> made another savepoint & restored that, so what I could check is: does that
>> next savepoint have the missed ids that were logged (a couple of minutes
>> before the savepoint was created, so there should've been more than enough
>> time to add them to the state before the savepoint was triggered) or not.
>> Anyway, if I were able to verify with Bravo that the ids are missing
>> from the savepoint (even though the reducer logged that it saw them), would
>> that help in figuring out where they are lost? Is there some major
>> difference compared to just looking at the final output after the window
>> has been triggered?
>>
>>
>>
>> I think that makes a difference. For example, you can investigate if
>> there is a state loss or a problem with the windowing. In the savepoint you
>> could see which keys exists and to which windows they are assigned. Also
>> just to make sure there is no misunderstanding: only elements that are in
>> the state at the start of a savepoint are expected to be part of the
>> savepoint; all elements between start and completion of the savepoint are
>> not expected to be part of the savepoint.
>>
>>
>> > I also doubt that the problem is about backpressure after restore,
>> because the job will only continue running after the state restore is
>> already completed.
>>
>> Yes, I'm not suspecting that the state restoring would be the problem
>> either. My concern was about backpressure possibly messing with the updates
>> of reducing state? I would tend to suspect that updating the state
>> consistently is what fails, where heavy load / backpressure might be a
>> factor.
>>
>>
>>
>> How would you assume that backpressure would influence your updates?
>> Updates to each local state still happen event-by-event, in a single
>> reader/writing thread.
>>
>>
>> On Thu, Oct 4, 2018 at 4:18 PM Stefan Richter <
>> s.rich...@data-artisans.com> wrote:
>>
>>> Hi,
>>>
>>> you could take a look at Bravo [1] to query your savepoints and to check
>>> if the state in the savepoint is complete w.r.t. your expectations. I somewhat
>>> doubt that there is a general problem with the state/savepoints because
>>> many users are successfully running it on a large state and I am not aware
>>> of any data loss problems, but nothing is impossible. What the savepoint
>>> does is also straight forward: iterate a db snapshot and write all
>>> key/value pairs to disk, so all data that was in the db at the time of the
>>> savepoint, should show up. I also doubt that the problem is about
>>> backpressure after restore, because the job will only continue running
>>> after the state restore is already completed. Did you check if you are
>>> using exactly-once or at-least-once semantics? Also did you check
>>> that the kafka consumer start position is configured properly [2]? Are
>>> watermarks generated as expected after restore?
>>>
>>> One more unrelated high-level comment that I have: for a granularity of
>>> 24h windows, I wonder if it would not make sense to use a batch job instead?
>>>
>>> Best,
>>> Stefan
>>>
>>> [1] https://github.com/king/bravo
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
>>>
>>> On 04.10.2018 at 14:53, Juho Autio wrote:
>>>
>>> Thanks for the suggestions!
>>>
>>> > In general, it would be tremendously helpful to have a minimal working
>>> example which allows to reproduce the problem.
>>>
>>> Definitely. The problem with reproducing has 

Re: Data loss when restoring from savepoint

2018-10-04 Thread Stefan Richter
Hi,

> On 04.10.2018 at 16:08, Juho Autio wrote:
> 
> > you could take a look at Bravo [1] to query your savepoints and to check if 
> > the state in the savepoint is complete w.r.t. your expectations
> 
> Thanks. I'm not 100% sure this is the case, but to me it seemed like the missed 
> ids were being logged by the reducer soon after the job had started (after 
> restoring a savepoint). But on the other hand, after that I also made another 
> savepoint & restored that, so what I could check is: does that next savepoint 
> have the missed ids that were logged (a couple of minutes before the 
> savepoint was created, so there should've been more than enough time to add 
> them to the state before the savepoint was triggered) or not. Anyway, if I 
> were able to verify with Bravo that the ids are missing from the 
> savepoint (even though the reducer logged that it saw them), would that help in 
> figuring out where they are lost? Is there some major difference compared to 
> just looking at the final output after window has been triggered?


I think that makes a difference. For example, you can investigate if there is a 
state loss or a problem with the windowing. In the savepoint you could see 
which keys exist and to which windows they are assigned. Also, just to make 
sure there is no misunderstanding: only elements that are in the state at the 
start of a savepoint are expected to be part of the savepoint; all elements 
between start and completion of the savepoint are not expected to be part of 
the savepoint.

> 
> > I also doubt that the problem is about backpressure after restore, because 
> > the job will only continue running after the state restore is already 
> > completed.
> 
> Yes, I'm not suspecting that the state restoring would be the problem either. 
> My concern was about backpressure possibly messing with the updates of 
> reducing state? I would tend to suspect that updating the state consistently 
> is what fails, where heavy load / backpressure might be a factor.


How would you assume that backpressure would influence your updates? Updates to 
each local state still happen event-by-event, in a single reader/writing thread.

> 
> On Thu, Oct 4, 2018 at 4:18 PM Stefan Richter wrote:
> Hi,
> 
> you could take a look at Bravo [1] to query your savepoints and to check if 
> the state in the savepoint is complete w.r.t. your expectations. I somewhat doubt 
> that there is a general problem with the state/savepoints because many users 
> are successfully running it on a large state and I am not aware of any data 
> loss problems, but nothing is impossible. What the savepoint does is also 
> straight forward: iterate a db snapshot and write all key/value pairs to 
> disk, so all data that was in the db at the time of the savepoint, should 
> show up. I also doubt that the problem is about backpressure after restore, 
> because the job will only continue running after the state restore is already 
> completed. Did you check if you are using exactly-once semantics or 
> at-least-once semantics? Also did you check that the kafka consumer start 
> position is configured properly [2]? Are watermarks generated as expected 
> after restore?
> 
> One more unrelated high-level comment that I have: for a granularity of 24h 
> windows, I wonder if it would not make sense to use a batch job instead?
> 
> Best,
> Stefan
> 
> [1] https://github.com/king/bravo 
> [2] 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
>  
> 
> 
>> On 04.10.2018 at 14:53, Juho Autio wrote:
>> 
>> Thanks for the suggestions!
>> 
>> > In general, it would be tremendously helpful to have a minimal working 
>> > example which allows to reproduce the problem.
>> 
>> Definitely. The problem with reproducing has been that this only seems to 
>> happen in the bigger production data volumes.
>> 
>> That's why I'm hoping to find a way to debug this with the production data. 
>> With that it seems to consistently cause some misses every time the job is 
>> killed/restored.
>> 
>> > check if it happens for shorter windows, like 1h etc
>> 
>> What would be the benefit of that compared to 24h window?
>> 
>> > simplify the job to not use a reduce window but simply a time window which 
>> > outputs the window events. Then counting the input and output events 
>> > should allow you to verify the results. If you are not seeing missing 
>> > events, then it could have something to do with the reducing state used in 
>> > the reduce function.
>> 
>> Hm, maybe, but not sure how useful that would be, because it wouldn't yet 
>> prove that it's related to reducing, because not having a reduce function 
>> could also mean smaller load on the job, which 

Re: Data loss when restoring from savepoint

2018-10-04 Thread Juho Autio
> you could take a look at Bravo [1] to query your savepoints and to check
if the state in the savepoint is complete w.r.t. your expectations

Thanks. I'm not 100% sure this is the case, but to me it seemed like the
missed ids were being logged by the reducer soon after the job had started
(after restoring a savepoint). But on the other hand, after that I also
made another savepoint & restored that, so what I could check is: does that
next savepoint have the missed ids that were logged (a couple of minutes
before the savepoint was created, so there should've been more than enough
time to add them to the state before the savepoint was triggered) or not.
Anyway, if I were able to verify with Bravo that the ids are missing
from the savepoint (even though the reducer logged that it saw them), would
that help in figuring out where they are lost? Is there some major
difference compared to just looking at the final output after window has
been triggered?

> I also doubt that the problem is about backpressure after restore,
because the job will only continue running after the state restore is
already completed.

Yes, I'm not suspecting that the state restoring would be the problem
either. My concern was about backpressure possibly messing with the updates
of reducing state? I would tend to suspect that updating the state
consistently is what fails, where heavy load / backpressure might be a
factor.

> One more unrelated high-level comment that I have: for a granularity of
24h windows, I wonder if it would not make sense to use a batch job instead?

We already have a batch version that's in use currently. The reason for
trying to have this as a stream job is to minimize the delay of the
results. But we can't afford missing anything in this case, and Flink
should be able to handle that – please correct me if this is a wrong
assumption.

> Did you check if you are using exactly-once semantics or at-least-once
semantics?

I couldn't even find how at-least-once would be set in Flink. We must be
using exactly-once, as it's the default. At least once would be ok for this
particular job, but for some other jobs maybe not, so I'd rather get to the
bottom of this problem without switching to at-least-once configuration.
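
For reference, here is a minimal sketch of how the mode could be set explicitly
(the 60-second interval is just a placeholder, not our actual config):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointingModeExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Exactly-once is the default checkpointing mode;
        // at-least-once would have to be requested explicitly like this:
        env.enableCheckpointing(60_000, CheckpointingMode.AT_LEAST_ONCE);
    }
}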

> Also did you check that the kafka consumer start position is configured
properly [2]?

I wonder what you mean by configuring it properly? When restoring, we
don't set it at all. Well, the default is "no reset", obviously. We just
create FlinkKafkaConsumer and let it restore offsets from the savepoint
state. Originally the job has been started with
consumer.setStartFromLatest(), but that shouldn't really matter now.
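
To illustrate what I mean, a rough sketch of how the consumer is created
(the connector version, topic name, schema and properties below are placeholders,
not our exact setup):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

public class ConsumerStartPositionExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");
        props.setProperty("group.id", "distinct-job");

        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<>("events", new SimpleStringSchema(), props);
        // Only applies when the job starts without restored state; when restoring
        // from a savepoint the offsets come from the restored state, regardless
        // of this setting.
        consumer.setStartFromLatest();
    }
}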

> Are watermarks generated as expected after restore?

Watermarks are generated normally as far as I can see. If they were
stuck, for example, the window would never end up being triggered. Maybe I'm
missing what the question is about(?)

Thank you once more for helping with this!

On Thu, Oct 4, 2018 at 4:18 PM Stefan Richter 
wrote:

> Hi,
>
> you could take a look at Bravo [1] to query your savepoints and to check
> if the state in the savepoint is complete w.r.t. your expectations. I somewhat
> doubt that there is a general problem with the state/savepoints because
> many users are successfully running it on a large state and I am not aware
> of any data loss problems, but nothing is impossible. What the savepoint
> does is also straight forward: iterate a db snapshot and write all
> key/value pairs to disk, so all data that was in the db at the time of the
> savepoint, should show up. I also doubt that the problem is about
> backpressure after restore, because the job will only continue running
> after the state restore is already completed. Did you check if you are
> using exactly-once or at-least-once semantics? Also did you check
> that the kafka consumer start position is configured properly [2]? Are
> watermarks generated as expected after restore?
>
> One more unrelated high-level comment that I have: for a granularity of
> 24h windows, I wonder if it would not make sense to use a batch job instead?
>
> Best,
> Stefan
>
> [1] https://github.com/king/bravo
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
>
> On 04.10.2018 at 14:53, Juho Autio wrote:
>
> Thanks for the suggestions!
>
> > In general, it would be tremendously helpful to have a minimal working
> example which allows to reproduce the problem.
>
> Definitely. The problem with reproducing has been that this only seems to
> happen in the bigger production data volumes.
>
> That's why I'm hoping to find a way to debug this with the production
> data. With that it seems to consistently cause some misses every time the
> job is killed/restored.
>
> > check if it happens for shorter windows, like 1h etc
>
> What would be the benefit of that compared to 24h window?
>
> > simplify the job to not use a reduce window but simply a time 

Re: Data loss when restoring from savepoint

2018-10-04 Thread Stefan Richter
Hi,

you could take a look at Bravo [1] to query your savepoints and to check if the
state in the savepoint is complete w.r.t. your expectations. I somewhat doubt that
there is a general problem with the state/savepoints, because many users are
successfully running it on a large state and I am not aware of any data loss
problems, but nothing is impossible. What the savepoint does is also
straightforward: iterate a db snapshot and write all key/value pairs to disk, so
all data that was in the db at the time of the savepoint should show up. I also
doubt that the problem is about backpressure after restore, because the job will
only continue running after the state restore is already completed. Did you check
whether you are using exactly-once or at-least-once semantics? Also, did you check
that the kafka consumer start position is configured properly [2]? Are watermarks
generated as expected after restore?

One more unrelated high-level comment that I have: for a granularity of 24h 
windows, I wonder if it would not make sense to use a batch job instead?

Best,
Stefan

[1] https://github.com/king/bravo
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
 


> On 04.10.2018 at 14:53, Juho Autio wrote:
> 
> Thanks for the suggestions!
> 
> > In general, it would be tremendously helpful to have a minimal working 
> > example which allows to reproduce the problem.
> 
> Definitely. The problem with reproducing has been that this only seems to 
> happen in the bigger production data volumes.
> 
> That's why I'm hoping to find a way to debug this with the production data. 
> With that it seems to consistently cause some misses every time the job is 
> killed/restored.
> 
> > check if it happens for shorter windows, like 1h etc
> 
> What would be the benefit of that compared to 24h window?
> 
> > simplify the job to not use a reduce window but simply a time window which 
> > outputs the window events. Then counting the input and output events should 
> > allow you to verify the results. If you are not seeing missing events, then 
> > it could have something to do with the reducing state used in the reduce 
> > function.
> 
> Hm, maybe, but not sure how useful that would be, because it wouldn't yet 
> prove that it's related to reducing, because not having a reduce function 
> could also mean smaller load on the job, which might alone be enough to make 
> the problem not manifest.
> 
> Is there a way to debug what goes into the reducing state (including what 
> gets removed or overwritten and what restored), if that makes sense..? Maybe 
> some suitable logging could be used to prove that the lost data is written to 
> the reducing state (or at least asked to be written), but not found any more 
> when the window closes and state is flushed?
> 
> On configuration once more, we're using RocksDB state backend with 
> asynchronous incremental checkpointing. The state is restored from savepoints 
> though, we haven't been using those checkpoints in these tests (although they 
> could be used in case of crashes – but we haven't had those now).
> 
> On Thu, Oct 4, 2018 at 3:25 PM Till Rohrmann wrote:
> Hi Juho,
> 
> another idea to further narrow down the problem could be to simplify the job 
> to not use a reduce window but simply a time window which outputs the window 
> events. Then counting the input and output events should allow you to verify 
> the results. If you are not seeing missing events, then it could have 
> something to do with the reducing state used in the reduce function.
> 
> In general, it would be tremendously helpful to have a minimal working 
> example which allows to reproduce the problem.
> 
> Cheers,
> Till
> 
> On Thu, Oct 4, 2018 at 2:02 PM Andrey Zagrebin wrote:
> Hi Juho,
> 
> can you try to reduce the job to minimal reproducible example and share the 
> job and input?
> 
> For example:
> - some simple records as input, e.g. tuples of primitive types saved as csv
> - minimal deduplication job which processes them and misses records
> - check if it happens for shorter windows, like 1h etc
> - setup which you use for the job, ideally locally reproducible or cloud
> 
> Best,
> Andrey
> 
>> On 4 Oct 2018, at 11:13, Juho Autio wrote:
>> 
>> Sorry to insist, but we seem to be blocked for any serious usage of state in 
>> Flink if we can't rely on it to not miss data in case of restore.
>> 
>> Would anyone have suggestions for how to troubleshoot this? So far I have 
>> verified with DEBUG logs that our reduce function gets to process also the 
>> data that is missing from window output.
>> 
>> On Mon, Oct 1, 2018 at 11:56 AM Juho Autio wrote:
>> Hi 

Re: Data loss when restoring from savepoint

2018-10-04 Thread Juho Autio
Thanks for the suggestions!

> In general, it would be tremendously helpful to have a minimal working
example which allows to reproduce the problem.

Definitely. The problem with reproducing has been that this only seems to
happen in the bigger production data volumes.

That's why I'm hoping to find a way to debug this with the production data.
With that it seems to consistently cause some misses every time the job is
killed/restored.

> check if it happens for shorter windows, like 1h etc

What would be the benefit of that compared to 24h window?

> simplify the job to not use a reduce window but simply a time window
which outputs the window events. Then counting the input and output events
should allow you to verify the results. If you are not seeing missing
events, then it could have something to do with the reducing state used in
the reduce function.

Hm, maybe, but I'm not sure how useful that would be: it wouldn't yet
prove that the problem is related to reducing, because not having a reduce
function also means a smaller load on the job, which might alone be enough to
keep the problem from manifesting.

Is there a way to debug what goes into the reducing state (including what
gets removed or overwritten and what is restored), if that makes sense...?
Maybe some suitable logging could be used to prove that the lost data is
written to the reducing state (or at least asked to be written), but not
found any more when the window closes and the state is flushed?

On configuration once more: we're using the RocksDB state backend with
asynchronous incremental checkpointing. The state is restored from
savepoints, though; we haven't been using those checkpoints in these tests
(although they could be used in case of crashes – but we haven't had any
so far).
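
For completeness, the relevant part of the setup looks roughly like this
(the checkpoint path is a placeholder, not our actual bucket):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // the second constructor argument enables incremental checkpoints;
        // RocksDB snapshots are taken asynchronously by default
        env.setStateBackend(new RocksDBStateBackend("s3://bucket/flink/checkpoints", true));
    }
}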

On Thu, Oct 4, 2018 at 3:25 PM Till Rohrmann  wrote:

> Hi Juho,
>
> another idea to further narrow down the problem could be to simplify the
> job to not use a reduce window but simply a time window which outputs the
> window events. Then counting the input and output events should allow you
> to verify the results. If you are not seeing missing events, then it could
> have something to do with the reducing state used in the reduce function.
>
> In general, it would be tremendously helpful to have a minimal working
> example which allows to reproduce the problem.
>
> Cheers,
> Till
>
> On Thu, Oct 4, 2018 at 2:02 PM Andrey Zagrebin 
> wrote:
>
>> Hi Juho,
>>
>> can you try to reduce the job to minimal reproducible example and share
>> the job and input?
>>
>> For example:
>> - some simple records as input, e.g. tuples of primitive types saved as
>> csv
>> - minimal deduplication job which processes them and misses records
>> - check if it happens for shorter windows, like 1h etc
>> - setup which you use for the job, ideally locally reproducible or cloud
>>
>> Best,
>> Andrey
>>
>> On 4 Oct 2018, at 11:13, Juho Autio  wrote:
>>
>> Sorry to insist, but we seem to be blocked for any serious usage of state
>> in Flink if we can't rely on it to not miss data in case of restore.
>>
>> Would anyone have suggestions for how to troubleshoot this? So far I have
>> verified with DEBUG logs that our reduce function gets to process also the
>> data that is missing from window output.
>>
>> On Mon, Oct 1, 2018 at 11:56 AM Juho Autio  wrote:
>>
>>> Hi Andrey,
>>>
>>> To rule out for good any questions about sink behaviour, the job was
>>> killed and started with an additional Kafka sink.
>>>
>>> The same number of ids were missed in both outputs: KafkaSink &
>>> BucketingSink.
>>>
>>> I wonder what would be the next steps in debugging?
>>>
>>> On Fri, Sep 21, 2018 at 3:49 PM Juho Autio  wrote:
>>>
 Thanks, Andrey.

 > so it means that the savepoint does not loose at least some dropped
 records.

 I'm not sure what you mean by that? I mean, it was known from the
 beginning, that not everything is lost before/after restoring a savepoint,
 just some records around the time of restoration. It's not 100% clear
 whether records are lost before making a savepoint or after restoring it.
 Although, based on the new DEBUG logs it seems more like losing some
 records that are seen ~soon after restoring. It seems like Flink would be
 somehow confused either about the restored state vs. new inserts to state.
 This could also be somehow linked to the high back pressure on the kafka
 source while the stream is catching up.

 > If it is feasible for your setup, I suggest to insert one more map
 function after reduce and before sink.
 > etc.

 Isn't that the same thing that we discussed before? Nothing is sent to
 BucketingSink before the window closes, so I don't see how it would make
 any difference if we replace the BucketingSink with a map function or
 another sink type. We don't create or restore savepoints during the time
 when BucketingSink gets input or has open buckets – that happens at a much
 later 

Re: Data loss when restoring from savepoint

2018-10-04 Thread Till Rohrmann
Hi Juho,

another idea to further narrow down the problem could be to simplify the
job to not use a reduce window but simply a time window which outputs the
window events. Then counting the input and output events should allow you
to verify the results. If you are not seeing missing events, then it could
have something to do with the reducing state used in the reduce function.
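
As a rough sketch of what that simplification could look like (the Map-based
record type and the "id" field follow the DistinctFunction snippet quoted later
in this thread; everything else is illustrative):

import java.util.Map;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class ForwardingWindowSketch {
    public static DataStream<Map<String, String>> forwardAll(DataStream<Map<String, String>> events) {
        return events
            .keyBy(new KeySelector<Map<String, String>, String>() {
                @Override
                public String getKey(Map<String, String> event) {
                    return event.get("id"); // illustrative key field
                }
            })
            .window(TumblingEventTimeWindows.of(Time.hours(24)))
            .apply(new WindowFunction<Map<String, String>, Map<String, String>, String, TimeWindow>() {
                @Override
                public void apply(String key, TimeWindow window,
                                  Iterable<Map<String, String>> elements,
                                  Collector<Map<String, String>> out) {
                    // forward every buffered element unchanged, so numRecordsIn and
                    // numRecordsOut of this operator can be compared to spot losses
                    for (Map<String, String> element : elements) {
                        out.collect(element);
                    }
                }
            });
    }
}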

In general, it would be tremendously helpful to have a minimal working
example which allows to reproduce the problem.

Cheers,
Till

On Thu, Oct 4, 2018 at 2:02 PM Andrey Zagrebin 
wrote:

> Hi Juho,
>
> can you try to reduce the job to minimal reproducible example and share
> the job and input?
>
> For example:
> - some simple records as input, e.g. tuples of primitive types saved as csv
> - minimal deduplication job which processes them and misses records
> - check if it happens for shorter windows, like 1h etc
> - setup which you use for the job, ideally locally reproducible or cloud
>
> Best,
> Andrey
>
> On 4 Oct 2018, at 11:13, Juho Autio  wrote:
>
> Sorry to insist, but we seem to be blocked for any serious usage of state
> in Flink if we can't rely on it to not miss data in case of restore.
>
> Would anyone have suggestions for how to troubleshoot this? So far I have
> verified with DEBUG logs that our reduce function gets to process also the
> data that is missing from window output.
>
> On Mon, Oct 1, 2018 at 11:56 AM Juho Autio  wrote:
>
>> Hi Andrey,
>>
>> To rule out for good any questions about sink behaviour, the job was
>> killed and started with an additional Kafka sink.
>>
>> The same number of ids were missed in both outputs: KafkaSink &
>> BucketingSink.
>>
>> I wonder what would be the next steps in debugging?
>>
>> On Fri, Sep 21, 2018 at 3:49 PM Juho Autio  wrote:
>>
>>> Thanks, Andrey.
>>>
>>> > so it means that the savepoint does not loose at least some dropped
>>> records.
>>>
>>> I'm not sure what you mean by that? I mean, it was known from the
>>> beginning, that not everything is lost before/after restoring a savepoint,
>>> just some records around the time of restoration. It's not 100% clear
>>> whether records are lost before making a savepoint or after restoring it.
>>> Although, based on the new DEBUG logs it seems more like losing some
>>> records that are seen ~soon after restoring. It seems like Flink would be
>>> somehow confused either about the restored state vs. new inserts to state.
>>> This could also be somehow linked to the high back pressure on the kafka
>>> source while the stream is catching up.
>>>
>>> > If it is feasible for your setup, I suggest to insert one more map
>>> function after reduce and before sink.
>>> > etc.
>>>
>>> Isn't that the same thing that we discussed before? Nothing is sent to
>>> BucketingSink before the window closes, so I don't see how it would make
>>> any difference if we replace the BucketingSink with a map function or
>>> another sink type. We don't create or restore savepoints during the time
>>> when BucketingSink gets input or has open buckets – that happens at a much
>>> later time of day. I would focus on figuring out why the records are lost
>>> while the window is open. But I don't know how to do that. Would you have
>>> any additional suggestions?
>>>
>>> On Fri, Sep 21, 2018 at 3:30 PM Andrey Zagrebin <
>>> and...@data-artisans.com> wrote:
>>>
 Hi Juho,

 so it means that the savepoint does not loose at least some dropped
 records.

 If it is feasible for your setup, I suggest to insert one more map
 function after reduce and before sink.
 The map function should be called right after window is triggered but
 before flushing to s3.
 The result of reduce (deduped record) could be logged there.
 This should allow to check whether the processed distinct records were
 buffered in the state after the restoration from the savepoint or not. If
 they were buffered we should see that there was an attempt to write them to
 the sink from the state.

 Another suggestion is to try to write records to some other sink or to
 both.
 E.g. if you can access file system of workers, maybe just into local
 files and check whether the records are also dropped there.

 Best,
 Andrey

 On 20 Sep 2018, at 15:37, Juho Autio  wrote:

 Hi Andrey!

 I was finally able to gather the DEBUG logs that you suggested. In
 short, the reducer logged that it processed at least some of the ids that
 were missing from the output.

 "At least some", because I didn't have the job running with DEBUG logs
 for the full 24-hour window period. So I was only able to look up if I can
 find *some* of the missing ids in the DEBUG logs. Which I did indeed.

 I changed the DistinctFunction.java to do this:

 @Override
 public Map<String, String> reduce(Map<String, String> value1,
 Map<String, String> value2) {
 LOG.debug("DistinctFunction.reduce returns: {}={}",
 

Re: Data loss when restoring from savepoint

2018-10-04 Thread Andrey Zagrebin
Hi Juho,

can you try to reduce the job to a minimal reproducible example and share the job
and input?

For example:
- some simple records as input, e.g. tuples of primitive types saved as csv
- a minimal deduplication job which processes them and misses records (a rough
sketch is included below)
- check if it happens for shorter windows, like 1h etc
- setup which you use for the job, ideally locally reproducible or cloud
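
A rough sketch of such a minimal job, using placeholder tuple records and a
"keep the first record per key" reduce in place of the real DistinctFunction:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class MinimalDedupJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        env.fromElements(
                Tuple2.of("id-1", 1000L), Tuple2.of("id-1", 2000L), Tuple2.of("id-2", 3000L))
            .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<String, Long>>() {
                @Override
                public long extractAscendingTimestamp(Tuple2<String, Long> element) {
                    return element.f1; // second tuple field is the event timestamp
                }
            })
            .keyBy(0)
            .window(TumblingEventTimeWindows.of(Time.hours(1)))
            .reduce((a, b) -> a) // keep the first record per key, i.e. deduplicate
            .print();

        env.execute("minimal-dedup-example");
    }
}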

Best,
Andrey

> On 4 Oct 2018, at 11:13, Juho Autio  wrote:
> 
> Sorry to insist, but we seem to be blocked for any serious usage of state in 
> Flink if we can't rely on it to not miss data in case of restore.
> 
> Would anyone have suggestions for how to troubleshoot this? So far I have 
> verified with DEBUG logs that our reduce function gets to process also the 
> data that is missing from window output.
> 
> On Mon, Oct 1, 2018 at 11:56 AM Juho Autio wrote:
> Hi Andrey,
> 
> To rule out for good any questions about sink behaviour, the job was killed 
> and started with an additional Kafka sink.
> 
> The same number of ids were missed in both outputs: KafkaSink & BucketingSink.
> 
> I wonder what would be the next steps in debugging?
> 
> On Fri, Sep 21, 2018 at 3:49 PM Juho Autio wrote:
> Thanks, Andrey.
> 
> > so it means that the savepoint does not loose at least some dropped records.
> 
> I'm not sure what you mean by that? I mean, it was known from the beginning, 
> that not everything is lost before/after restoring a savepoint, just some 
> records around the time of restoration. It's not 100% clear whether records 
> are lost before making a savepoint or after restoring it. Although, based on 
> the new DEBUG logs it seems more like losing some records that are seen ~soon 
> after restoring. It seems like Flink would be somehow confused either about 
> the restored state vs. new inserts to state. This could also be somehow 
> linked to the high back pressure on the kafka source while the stream is 
> catching up.
> 
> > If it is feasible for your setup, I suggest to insert one more map function 
> > after reduce and before sink.
> > etc.
> 
> Isn't that the same thing that we discussed before? Nothing is sent to 
> BucketingSink before the window closes, so I don't see how it would make any 
> difference if we replace the BucketingSink with a map function or another 
> sink type. We don't create or restore savepoints during the time when 
> BucketingSink gets input or has open buckets – that happens at a much later 
> time of day. I would focus on figuring out why the records are lost while the 
> window is open. But I don't know how to do that. Would you have any 
> additional suggestions?
> 
> On Fri, Sep 21, 2018 at 3:30 PM Andrey Zagrebin wrote:
> Hi Juho,
> 
> so it means that the savepoint does not loose at least some dropped records.
> 
> If it is feasible for your setup, I suggest to insert one more map function 
> after reduce and before sink. 
> The map function should be called right after window is triggered but before 
> flushing to s3.
> The result of reduce (deduped record) could be logged there.
> This should allow to check whether the processed distinct records were 
> buffered in the state after the restoration from the savepoint or not. If 
> they were buffered we should see that there was an attempt to write them to 
> the sink from the state.
> 
> Another suggestion is to try to write records to some other sink or to both. 
> E.g. if you can access file system of workers, maybe just into local files 
> and check whether the records are also dropped there.
> 
> Best,
> Andrey
> 
>> On 20 Sep 2018, at 15:37, Juho Autio wrote:
>> 
>> Hi Andrey!
>> 
>> I was finally able to gather the DEBUG logs that you suggested. In short, 
>> the reducer logged that it processed at least some of the ids that were 
>> missing from the output.
>> 
>> "At least some", because I didn't have the job running with DEBUG logs for 
>> the full 24-hour window period. So I was only able to look up if I can find 
>> some of the missing ids in the DEBUG logs. Which I did indeed.
>> 
>> I changed the DistinctFunction.java to do this:
>> 
>> @Override
>> public Map<String, String> reduce(Map<String, String> value1,
>> Map<String, String> value2) {
>> LOG.debug("DistinctFunction.reduce returns: {}={}", 
>> value1.get("field"), value1.get("id"));
>> return value1;
>> }
>> 
>> Then:
>> 
>> vi flink-1.6.0/conf/log4j.properties
>> log4j.logger.org.apache.flink.streaming.runtime.tasks.StreamTask=DEBUG
>> log4j.logger.com.rovio.ds.flink.uniqueid.DistinctFunction=DEBUG
>> 
>> Then I ran the following kind of test:
>> 
>> - Cancelled the on-going job with savepoint created at ~Sep 18 08:35 UTC 2018
>> - Started a new cluster & job with DEBUG enabled at ~09:13, restored from 
>> that previous cluster's savepoint
>> - Ran until caught up offsets
>> - Cancelled the job with a new savepoint
>> - Started a new job _without_ 

Re: Data loss when restoring from savepoint

2018-10-04 Thread Juho Autio
Sorry to insist, but we seem to be blocked for any serious usage of state
in Flink if we can't rely on it to not miss data in case of restore.

Would anyone have suggestions for how to troubleshoot this? So far I have
verified with DEBUG logs that our reduce function gets to process also the
data that is missing from window output.

On Mon, Oct 1, 2018 at 11:56 AM Juho Autio  wrote:

> Hi Andrey,
>
> To rule out for good any questions about sink behaviour, the job was
> killed and started with an additional Kafka sink.
>
> The same number of ids were missed in both outputs: KafkaSink &
> BucketingSink.
>
> I wonder what would be the next steps in debugging?
>
> On Fri, Sep 21, 2018 at 3:49 PM Juho Autio  wrote:
>
>> Thanks, Andrey.
>>
>> > so it means that the savepoint does not loose at least some dropped
>> records.
>>
>> I'm not sure what you mean by that? I mean, it was known from the
>> beginning, that not everything is lost before/after restoring a savepoint,
>> just some records around the time of restoration. It's not 100% clear
>> whether records are lost before making a savepoint or after restoring it.
>> Although, based on the new DEBUG logs it seems more like losing some
>> records that are seen ~soon after restoring. It seems like Flink would be
>> somehow confused either about the restored state vs. new inserts to state.
>> This could also be somehow linked to the high back pressure on the kafka
>> source while the stream is catching up.
>>
>> > If it is feasible for your setup, I suggest to insert one more map
>> function after reduce and before sink.
>> > etc.
>>
>> Isn't that the same thing that we discussed before? Nothing is sent to
>> BucketingSink before the window closes, so I don't see how it would make
>> any difference if we replace the BucketingSink with a map function or
>> another sink type. We don't create or restore savepoints during the time
>> when BucketingSink gets input or has open buckets – that happens at a much
>> later time of day. I would focus on figuring out why the records are lost
>> while the window is open. But I don't know how to do that. Would you have
>> any additional suggestions?
>>
>> On Fri, Sep 21, 2018 at 3:30 PM Andrey Zagrebin 
>> wrote:
>>
>>> Hi Juho,
>>>
>>> so it means that the savepoint does not loose at least some dropped
>>> records.
>>>
>>> If it is feasible for your setup, I suggest to insert one more map
>>> function after reduce and before sink.
>>> The map function should be called right after window is triggered but
>>> before flushing to s3.
>>> The result of reduce (deduped record) could be logged there.
>>> This should allow to check whether the processed distinct records were
>>> buffered in the state after the restoration from the savepoint or not. If
>>> they were buffered we should see that there was an attempt to write them to
>>> the sink from the state.
>>>
>>> Another suggestion is to try to write records to some other sink or to
>>> both.
>>> E.g. if you can access file system of workers, maybe just into local
>>> files and check whether the records are also dropped there.
>>>
>>> Best,
>>> Andrey
>>>
>>> On 20 Sep 2018, at 15:37, Juho Autio  wrote:
>>>
>>> Hi Andrey!
>>>
>>> I was finally able to gather the DEBUG logs that you suggested. In
>>> short, the reducer logged that it processed at least some of the ids that
>>> were missing from the output.
>>>
>>> "At least some", because I didn't have the job running with DEBUG logs
>>> for the full 24-hour window period. So I was only able to look up if I can
>>> find *some* of the missing ids in the DEBUG logs. Which I did indeed.
>>>
>>> I changed the DistinctFunction.java to do this:
>>>
>>> @Override
>>> public Map<String, String> reduce(Map<String, String> value1,
>>> Map<String, String> value2) {
>>> LOG.debug("DistinctFunction.reduce returns: {}={}",
>>> value1.get("field"), value1.get("id"));
>>> return value1;
>>> }
>>>
>>> Then:
>>>
>>> vi flink-1.6.0/conf/log4j.properties
>>> log4j.logger.org.apache.flink.streaming.runtime.tasks.StreamTask=DEBUG
>>> log4j.logger.com.rovio.ds.flink.uniqueid.DistinctFunction=DEBUG
>>>
>>> Then I ran the following kind of test:
>>>
>>> - Cancelled the on-going job with savepoint created at ~Sep 18 08:35 UTC
>>> 2018
>>> - Started a new cluster & job with DEBUG enabled at ~09:13, restored
>>> from that previous cluster's savepoint
>>> - Ran until caught up offsets
>>> - Cancelled the job with a new savepoint
>>> - Started a new job _without_ DEBUG, which restored the new savepoint,
>>> let it keep running so that it will eventually write the output
>>>
>>> Then on the next day, after results had been flushed when the 24-hour
>>> window closed, I compared the results again with a batch version's output.
>>> And found some missing ids as usual.
>>>
>>> I drilled down to one specific missing id (I'm replacing the actual
>>> value with AN12345 below), which was not found in the stream output, but
>>> was found in batch output & flink DEBUG logs.
>>>
>>> Related 

Re: Data loss when restoring from savepoint

2018-10-01 Thread Juho Autio
Hi Andrey,

To rule out for good any questions about sink behaviour, the job was killed
and started with an additional Kafka sink.

The same number of ids were missed in both outputs: KafkaSink &
BucketingSink.

I wonder what would be the next steps in debugging?

On Fri, Sep 21, 2018 at 3:49 PM Juho Autio  wrote:

> Thanks, Andrey.
>
> > so it means that the savepoint does not loose at least some dropped
> records.
>
> I'm not sure what you mean by that? I mean, it was known from the
> beginning, that not everything is lost before/after restoring a savepoint,
> just some records around the time of restoration. It's not 100% clear
> whether records are lost before making a savepoint or after restoring it.
> Although, based on the new DEBUG logs it seems more like losing some
> records that are seen ~soon after restoring. It seems like Flink would be
> somehow confused either about the restored state vs. new inserts to state.
> This could also be somehow linked to the high back pressure on the kafka
> source while the stream is catching up.
>
> > If it is feasible for your setup, I suggest to insert one more map
> function after reduce and before sink.
> > etc.
>
> Isn't that the same thing that we discussed before? Nothing is sent to
> BucketingSink before the window closes, so I don't see how it would make
> any difference if we replace the BucketingSink with a map function or
> another sink type. We don't create or restore savepoints during the time
> when BucketingSink gets input or has open buckets – that happens at a much
> later time of day. I would focus on figuring out why the records are lost
> while the window is open. But I don't know how to do that. Would you have
> any additional suggestions?
>
> On Fri, Sep 21, 2018 at 3:30 PM Andrey Zagrebin 
> wrote:
>
>> Hi Juho,
>>
>> so it means that the savepoint does not loose at least some dropped
>> records.
>>
>> If it is feasible for your setup, I suggest to insert one more map
>> function after reduce and before sink.
>> The map function should be called right after window is triggered but
>> before flushing to s3.
>> The result of reduce (deduped record) could be logged there.
>> This should allow to check whether the processed distinct records were
>> buffered in the state after the restoration from the savepoint or not. If
>> they were buffered we should see that there was an attempt to write them to
>> the sink from the state.
>>
>> Another suggestion is to try to write records to some other sink or to
>> both.
>> E.g. if you can access file system of workers, maybe just into local
>> files and check whether the records are also dropped there.
>>
>> Best,
>> Andrey
>>
>> On 20 Sep 2018, at 15:37, Juho Autio  wrote:
>>
>> Hi Andrey!
>>
>> I was finally able to gather the DEBUG logs that you suggested. In short,
>> the reducer logged that it processed at least some of the ids that were
>> missing from the output.
>>
>> "At least some", because I didn't have the job running with DEBUG logs
>> for the full 24-hour window period. So I was only able to look up if I can
>> find *some* of the missing ids in the DEBUG logs. Which I did indeed.
>>
>> I changed the DistinctFunction.java to do this:
>>
>> @Override
>> public Map<String, String> reduce(Map<String, String> value1,
>> Map<String, String> value2) {
>> LOG.debug("DistinctFunction.reduce returns: {}={}",
>> value1.get("field"), value1.get("id"));
>> return value1;
>> }
>>
>> Then:
>>
>> vi flink-1.6.0/conf/log4j.properties
>> log4j.logger.org.apache.flink.streaming.runtime.tasks.StreamTask=DEBUG
>> log4j.logger.com.rovio.ds.flink.uniqueid.DistinctFunction=DEBUG
>>
>> Then I ran the following kind of test:
>>
>> - Cancelled the on-going job with savepoint created at ~Sep 18 08:35 UTC
>> 2018
>> - Started a new cluster & job with DEBUG enabled at ~09:13, restored from
>> that previous cluster's savepoint
>> - Ran until caught up offsets
>> - Cancelled the job with a new savepoint
>> - Started a new job _without_ DEBUG, which restored the new savepoint,
>> let it keep running so that it will eventually write the output
>>
>> Then on the next day, after results had been flushed when the 24-hour
>> window closed, I compared the results again with a batch version's output.
>> And found some missing ids as usual.
>>
>> I drilled down to one specific missing id (I'm replacing the actual value
>> with AN12345 below), which was not found in the stream output, but was
>> found in batch output & flink DEBUG logs.
>>
>> Related to that id, I gathered the following information:
>>
>> 2018-09-18~09:13:21,000 job started & savepoint is restored
>>
>> 2018-09-18 09:14:29,085 missing id is processed for the first time,
>> proved by this log line:
>> 2018-09-18 09:14:29,085 DEBUG
>> com.rovio.ds.flink.uniqueid.DistinctFunction  -
>> DistinctFunction.reduce returns: s.aid1=AN12345
>>
>> 2018-09-18 09:15:14,264 first synchronous part of checkpoint
>> 2018-09-18 09:15:16,544 first asynchronous part of checkpoint
>>
>> 

Re: Data loss when restoring from savepoint

2018-09-21 Thread Juho Autio
Thanks, Andrey.

> so it means that the savepoint does not lose at least some dropped
records.

I'm not sure what you mean by that? I mean, it was known from the
beginning that not everything is lost before/after restoring a savepoint,
just some records around the time of restoration. It's not 100% clear
whether records are lost before making a savepoint or after restoring it.
Although, based on the new DEBUG logs, it seems more like we lose some
records that are seen ~soon after restoring. It seems like Flink somehow gets
confused about the restored state vs. new inserts to the state.
This could also be somehow linked to the high back pressure on the kafka
source while the stream is catching up.

> If it is feasible for your setup, I suggest to insert one more map
function after reduce and before sink.
> etc.

Isn't that the same thing that we discussed before? Nothing is sent to
BucketingSink before the window closes, so I don't see how it would make
any difference if we replace the BucketingSink with a map function or
another sink type. We don't create or restore savepoints during the time
when BucketingSink gets input or has open buckets – that happens at a much
later time of day. I would focus on figuring out why the records are lost
while the window is open. But I don't know how to do that. Would you have
any additional suggestions?

On Fri, Sep 21, 2018 at 3:30 PM Andrey Zagrebin 
wrote:

> Hi Juho,
>
> so it means that the savepoint does not loose at least some dropped
> records.
>
> If it is feasible for your setup, I suggest to insert one more map
> function after reduce and before sink.
> The map function should be called right after window is triggered but
> before flushing to s3.
> The result of reduce (deduped record) could be logged there.
> This should allow to check whether the processed distinct records were
> buffered in the state after the restoration from the savepoint or not. If
> they were buffered we should see that there was an attempt to write them to
> the sink from the state.
>
> Another suggestion is to try to write records to some other sink or to
> both.
> E.g. if you can access file system of workers, maybe just into local files
> and check whether the records are also dropped there.
>
> Best,
> Andrey
>
> On 20 Sep 2018, at 15:37, Juho Autio  wrote:
>
> Hi Andrey!
>
> I was finally able to gather the DEBUG logs that you suggested. In short,
> the reducer logged that it processed at least some of the ids that were
> missing from the output.
>
> "At least some", because I didn't have the job running with DEBUG logs for
> the full 24-hour window period. So I was only able to look up if I can find
> *some* of the missing ids in the DEBUG logs. Which I did indeed.
>
> I changed the DistinctFunction.java to do this:
>
> @Override
> public Map<String, String> reduce(Map<String, String> value1,
> Map<String, String> value2) {
> LOG.debug("DistinctFunction.reduce returns: {}={}",
> value1.get("field"), value1.get("id"));
> return value1;
> }
>
> Then:
>
> vi flink-1.6.0/conf/log4j.properties
> log4j.logger.org.apache.flink.streaming.runtime.tasks.StreamTask=DEBUG
> log4j.logger.com.rovio.ds.flink.uniqueid.DistinctFunction=DEBUG
>
> Then I ran the following kind of test:
>
> - Cancelled the on-going job with savepoint created at ~Sep 18 08:35 UTC
> 2018
> - Started a new cluster & job with DEBUG enabled at ~09:13, restored from
> that previous cluster's savepoint
> - Ran until caught up offsets
> - Cancelled the job with a new savepoint
> - Started a new job _without_ DEBUG, which restored the new savepoint, let
> it keep running so that it will eventually write the output
>
> Then on the next day, after results had been flushed when the 24-hour
> window closed, I compared the results again with a batch version's output.
> And found some missing ids as usual.
>
> I drilled down to one specific missing id (I'm replacing the actual value
> with AN12345 below), which was not found in the stream output, but was
> found in batch output & flink DEBUG logs.
>
> Related to that id, I gathered the following information:
>
> 2018-09-18~09:13:21,000 job started & savepoint is restored
>
> 2018-09-18 09:14:29,085 missing id is processed for the first time, proved
> by this log line:
> 2018-09-18 09:14:29,085 DEBUG
> com.rovio.ds.flink.uniqueid.DistinctFunction  -
> DistinctFunction.reduce returns: s.aid1=AN12345
>
> 2018-09-18 09:15:14,264 first synchronous part of checkpoint
> 2018-09-18 09:15:16,544 first asynchronous part of checkpoint
>
> (
> more occurrences of checkpoints (~1 min checkpointing time + ~1 min delay
> before next)
> /
> more occurrences of DistinctFunction.reduce
> )
>
> 2018-09-18 09:23:45,053 missing id is processed for the last time
>
> 2018-09-18~10:20:00,000 savepoint created & job cancelled
>
> To be noted, there was high backpressure after restoring from savepoint
> until the stream caught up with the kafka offsets. Although, our job uses
> assign timestamps & 

Re: Data loss when restoring from savepoint

2018-09-21 Thread Andrey Zagrebin
Hi Juho,

so it means that the savepoint does not lose at least some dropped records.

If it is feasible for your setup, I suggest inserting one more map function
after the reduce and before the sink.
The map function would be called right after the window is triggered but before
flushing to s3.
The result of the reduce (the deduped record) could be logged there.
This should allow checking whether the processed distinct records were buffered
in the state after the restoration from the savepoint or not. If they were
buffered, we should see that there was an attempt to write them to the sink from
the state.
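
A minimal sketch of such a map function (the "field"/"id" names follow the
DistinctFunction snippet quoted later in this thread; the class name is just
illustrative). It would be chained as reduce(...).map(new LogWindowOutput()).addSink(...):

import java.util.Map;
import org.apache.flink.api.common.functions.MapFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LogWindowOutput
        implements MapFunction<Map<String, String>, Map<String, String>> {

    private static final Logger LOG = LoggerFactory.getLogger(LogWindowOutput.class);

    @Override
    public Map<String, String> map(Map<String, String> value) {
        // invoked only for records emitted by the fired window,
        // i.e. right before they reach the sink
        LOG.info("Window emitted: {}={}", value.get("field"), value.get("id"));
        return value;
    }
}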

Another suggestion is to try to write the records to some other sink, or to both.
E.g. if you can access the file system of the workers, write them into local files
and check whether the records are also dropped there.

Best,
Andrey

> On 20 Sep 2018, at 15:37, Juho Autio  wrote:
> 
> Hi Andrey!
> 
> I was finally able to gather the DEBUG logs that you suggested. In short, the 
> reducer logged that it processed at least some of the ids that were missing 
> from the output.
> 
> "At least some", because I didn't have the job running with DEBUG logs for 
> the full 24-hour window period. So I was only able to look up if I can find 
> some of the missing ids in the DEBUG logs. Which I did indeed.
> 
> I changed the DistinctFunction.java to do this:
> 
> @Override
> public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
> LOG.debug("DistinctFunction.reduce returns: {}={}", 
> value1.get("field"), value1.get("id"));
> return value1;
> }
> 
> Then:
> 
> vi flink-1.6.0/conf/log4j.properties
> log4j.logger.org.apache.flink.streaming.runtime.tasks.StreamTask=DEBUG
> log4j.logger.com.rovio.ds.flink.uniqueid.DistinctFunction=DEBUG
> 
> Then I ran the following kind of test:
> 
> - Cancelled the on-going job with savepoint created at ~Sep 18 08:35 UTC 2018
> - Started a new cluster & job with DEBUG enabled at ~09:13, restored from 
> that previous cluster's savepoint
> - Ran until caught up offsets
> - Cancelled the job with a new savepoint
> - Started a new job _without_ DEBUG, which restored the new savepoint, let it 
> keep running so that it will eventually write the output
> 
> Then on the next day, after results had been flushed when the 24-hour window 
> closed, I compared the results again with a batch version's output. And found 
> some missing ids as usual.
> 
> I drilled down to one specific missing id (I'm replacing the actual value 
> with AN12345 below), which was not found in the stream output, but was found 
> in batch output & flink DEBUG logs.
> 
> Related to that id, I gathered the following information:
> 
> 2018-09-18~09:13:21,000 job started & savepoint is restored
> 
> 2018-09-18 09:14:29,085 missing id is processed for the first time, proved by 
> this log line:
> 2018-09-18 09:14:29,085 DEBUG com.rovio.ds.flink.uniqueid.DistinctFunction
>   - DistinctFunction.reduce returns: s.aid1=AN12345
> 
> 2018-09-18 09:15:14,264 first synchronous part of checkpoint
> 2018-09-18 09:15:16,544 first asynchronous part of checkpoint
> 
> (
>   more occurrences of checkpoints (~1 min checkpointing time + ~1 min 
> delay before next)
>   /
>   more occurrences of DistinctFunction.reduce
> )
> 
> 2018-09-18 09:23:45,053 missing id is processed for the last time
> 
> 2018-09-18~10:20:00,000 savepoint created & job cancelled
> 
> To be noted, there was high backpressure after restoring from savepoint until 
> the stream caught up with the kafka offsets. Although, our job uses assign 
> timestamps & watermarks on the flink kafka consumer itself, so event time of 
> all partitions is synchronized. As expected, we don't get any late data in 
> the late data side output.
> 
> From this we can see that the missing ids are processed by the reducer, but 
> they must get lost somewhere before the 24-hour window is triggered.
> 
> I think it's worth mentioning once more that the stream doesn't miss any ids 
> if we let it keep running without interruptions / state restoring.
> 
> What's next?
> 
> On Wed, Aug 29, 2018 at 3:49 PM Andrey Zagrebin wrote:
> Hi Juho,
> 
> > only when the 24-hour window triggers, BucketingSink gets a burst of input
> 
> This is of course totally true, my understanding is the same. We cannot 
> exclude problem there for sure, just savepoints are used a lot w/o problem 
> reports and BucketingSink is known to be problematic with s3. That is why, I 
> asked you:
> 
> > You also wrote that the timestamps of lost event are 'probably' around the 
> > time of the savepoint, if it is not yet for sure I would also check it.
> 
> Although, bucketing sink might loose any data at the end of the day (also 
> from the middle). The fact, that it is always around the time of taking a 
> savepoint and not random, is surely suspicious and possible savepoint 
> failures need to be investigated.
> 
> Regarding the s3 problem, s3 doc says:
> 
> > The 

Re: Data loss when restoring from savepoint

2018-09-20 Thread Juho Autio
Hi Andrey!

I was finally able to gather the DEBUG logs that you suggested. In short,
the reducer logged that it processed at least some of the ids that were
missing from the output.

"At least some", because I didn't have the job running with DEBUG logs for
the full 24-hour window period. So I was only able to look up if I can find
*some* of the missing ids in the DEBUG logs. Which I did indeed.

I changed the DistinctFunction.java to do this:

@Override
public Map<String, String> reduce(Map<String, String> value1,
Map<String, String> value2) {
LOG.debug("DistinctFunction.reduce returns: {}={}",
value1.get("field"), value1.get("id"));
return value1;
}

Then:

vi flink-1.6.0/conf/log4j.properties
log4j.logger.org.apache.flink.streaming.runtime.tasks.StreamTask=DEBUG
log4j.logger.com.rovio.ds.flink.uniqueid.DistinctFunction=DEBUG

Then I ran the following kind of test:

- Cancelled the on-going job with savepoint created at ~Sep 18 08:35 UTC
2018
- Started a new cluster & job with DEBUG enabled at ~09:13, restored from
that previous cluster's savepoint
- Ran until caught up offsets
- Cancelled the job with a new savepoint
- Started a new job _without_ DEBUG, which restored the new savepoint, let
it keep running so that it will eventually write the output

Then on the next day, after results had been flushed when the 24-hour
window closed, I compared the results again with a batch version's output.
And found some missing ids as usual.

I drilled down to one specific missing id (I'm replacing the actual value
with AN12345 below), which was not found in the stream output, but was
found in batch output & flink DEBUG logs.

Related to that id, I gathered the following information:

2018-09-18~09:13:21,000 job started & savepoint is restored

2018-09-18 09:14:29,085 missing id is processed for the first time, proved
by this log line:
2018-09-18 09:14:29,085 DEBUG com.rovio.ds.flink.uniqueid.DistinctFunction
- DistinctFunction.reduce returns: s.aid1=AN12345

2018-09-18 09:15:14,264 first synchronous part of checkpoint
2018-09-18 09:15:16,544 first asynchronous part of checkpoint

(
more occurrences of checkpoints (~1 min checkpointing time + ~1 min delay
before next)
/
more occurrences of DistinctFunction.reduce
)

2018-09-18 09:23:45,053 missing id is processed for the last time

2018-09-18~10:20:00,000 savepoint created & job cancelled

To be noted, there was high backpressure after restoring from the savepoint
until the stream caught up with the kafka offsets. However, our job assigns
timestamps & watermarks on the flink kafka consumer itself, so the event
time of all partitions is synchronized. As expected, we don't get any late
data in the late data side output.
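
For reference, the watermark assignment is done on the consumer roughly like
this (the connector version, timestamp field and out-of-orderness bound are
placeholders, not our exact code):

import java.util.Map;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

public class ConsumerWatermarksSketch {
    public static void assignWatermarks(FlinkKafkaConsumer010<Map<String, String>> consumer) {
        // assigning timestamps & watermarks on the consumer itself gives
        // per-partition watermarking, so overall event time only advances
        // once all partitions have advanced
        consumer.assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor<Map<String, String>>(Time.minutes(1)) {
                @Override
                public long extractTimestamp(Map<String, String> event) {
                    return Long.parseLong(event.get("timestamp")); // illustrative field name
                }
            });
    }
}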

From this we can see that the missing ids are processed by the reducer, but
they must get lost somewhere before the 24-hour window is triggered.

I think it's worth mentioning once more that the stream doesn't miss any
ids if we let it keep running without interruptions / state restoring.

What's next?

On Wed, Aug 29, 2018 at 3:49 PM Andrey Zagrebin 
wrote:

> Hi Juho,
>
> > only when the 24-hour window triggers, BucketingSink gets a burst of
> input
>
> This is of course totally true, my understanding is the same. We cannot
> exclude problem there for sure, just savepoints are used a lot w/o problem
> reports and BucketingSink is known to be problematic with s3. That is why,
> I asked you:
>
> > You also wrote that the timestamps of lost event are 'probably' around
> the time of the savepoint, if it is not yet for sure I would also check it.
>
> Although, bucketing sink might loose any data at the end of the day (also
> from the middle). The fact, that it is always around the time of taking a
> savepoint and not random, is surely suspicious and possible savepoint
> failures need to be investigated.
>
> Regarding the s3 problem, s3 doc says:
>
> > The caveat is that if you make a HEAD or GET request to the key name (to
> find if the object exists) before creating the object, Amazon S3 provides
> 'eventual consistency' for read-after-write.
>
> The algorithm you suggest is how it is roughly implemented now
> (BucketingSink.openNewPartFile). My understanding is that
> 'eventual consistency’ means that even if you just created file (its name
> is key) it can be that you do not get it in the list or exists (HEAD)
> returns false and you risk to rewrite the previous part.
>
> The BucketingSink was designed for a standard file system. s3 is used over
> a file system wrapper atm but does not always provide normal file system
> guarantees. See also last example in [1].
>
> Cheers,
> Andrey
>
> [1]
> https://codeburst.io/quick-explanation-of-the-s3-consistency-model-6c9f325e3f82
>
> On 29 Aug 2018, at 12:11, Juho Autio  wrote:
>
> Andrey, thank you very much for the debugging suggestions, I'll try them.
>
> In the meanwhile two more questions, please:
>
> > Just to keep in mind this problem with s3 and exclude it for sure. I
> would also check whether the size of 

Re: Data loss when restoring from savepoint

2018-08-29 Thread Andrey Zagrebin
Hi Juho,

> only when the 24-hour window triggers, BucketingSink gets a burst of input

This is of course totally true, my understanding is the same. We cannot exclude a
problem there for sure; it's just that savepoints are used a lot w/o problem reports
and BucketingSink is known to be problematic with s3. That is why I asked you:

> You also wrote that the timestamps of lost event are 'probably' around the 
> time of the savepoint, if it is not yet for sure I would also check it.

That said, the bucketing sink might lose data at the end of the day (also from
the middle). The fact that it is always around the time of taking a savepoint,
and not random, is surely suspicious, and possible savepoint failures need to be
investigated.

Regarding the s3 problem, s3 doc says:

> The caveat is that if you make a HEAD or GET request to the key name (to find 
> if the object exists) before creating the object, Amazon S3 provides 
> 'eventual consistency' for read-after-write.

The algorithm you suggest is roughly how it is implemented now 
(BucketingSink.openNewPartFile). My understanding is that 'eventual 
consistency' means that even if you just created a file (its name is the key), it 
can be that you do not get it in the listing, or exists (HEAD) returns false, and 
you risk rewriting the previous part.

The BucketingSink was designed for a standard file system. s3 is used over a 
file system wrapper atm but does not always provide normal file system 
guarantees. See also last example in [1].

Cheers,
Andrey

[1] 
https://codeburst.io/quick-explanation-of-the-s3-consistency-model-6c9f325e3f82 


> On 29 Aug 2018, at 12:11, Juho Autio  wrote:
> 
> Andrey, thank you very much for the debugging suggestions, I'll try them.
> 
> In the meanwhile two more questions, please:
> 
> > Just to keep in mind this problem with s3 and exclude it for sure. I would 
> > also check whether the size of missing events is around the batch size of 
> > BucketingSink or not.
> 
> Fair enough, but I also want to focus on debugging the most probable subject 
> first. So what do you think about this – true or false: only when the 24-hour 
> window triggers, BucketinSink gets a burst of input. Around the state 
> restoring point (middle of the day) it doesn't get any input, so it can't 
> lose anything either. Isn't this true, or have I totally missed how Flink 
> works in triggering window results? I would not expect there to be any 
> optimization that speculatively triggers early results of a regular time 
> window to the downstream operators.
> 
> > The old BucketingSink has in general a problem with s3. Internally 
> > BucketingSink queries s3 as a file system to list already written file 
> > parts (batches) and determine the index of the next part to start. Due to 
> > eventual consistency of checking file existence in s3 [1], the 
> > BucketingSink can rewrite the previously written part and basically lose 
> > it.
> 
> I was wondering, what does S3's "read-after-write consistency" (mentioned on 
> the page you linked) actually mean. It seems that this might be possible:
> - LIST keys, find current max index
> - choose next index = max + 1
> - HEAD next index: if it exists, keep adding + 1 until key doesn't exist on S3
> 
> But definitely sounds easier if a sink keeps track of files in a way that's 
> guaranteed to be consistent.
> 
> Cheers,
> Juho
> 
> On Mon, Aug 27, 2018 at 2:04 PM Andrey Zagrebin  > wrote:
> Hi,
> 
> true, StreamingFileSink does not support s3 in 1.6.0, it is planned for the 
> next 1.7 release, sorry for confusion.
> The old BucketingSink has in general a problem with s3. Internally 
> BucketingSink queries s3 as a file system 
> to list already written file parts (batches) and determine the index of the next 
> part to start. Due to eventual consistency of checking file existence in s3 
> [1], the BucketingSink can rewrite the previously written part and basically 
> lose it. It should be fixed for StreamingFileSink in 1.7 where Flink keeps 
> its own track of written parts and does not rely on s3 as a file system. 
> I also include Kostas, he might add more details. 
> 
> Just to keep in mind this problem with s3 and exclude it for sure, I would 
> also check whether the size of missing events is around the batch size of 
> BucketingSink or not. You also wrote that the timestamps of lost events are 
> 'probably' around the time of the savepoint; if that is not yet certain, I 
> would also check it.
> 
> Have you already checked the log files of job manager and task managers for 
> the job running before and after the restore from the check point? Is 
> everything successful there, no errors, relevant warnings or exceptions?
> 
> As the next step, I would suggest to log all encountered events in 
> DistinctFunction.reduce if possible for production data and check whether the 
> missed events are eventually processed before 

Re: Data loss when restoring from savepoint

2018-08-29 Thread Juho Autio
Andrey, thank you very much for the debugging suggestions, I'll try them.

In the meanwhile two more questions, please:

> Just to keep in mind this problem with s3 and exclude it for sure. I
would also check whether the size of missing events is around the batch
size of BucketingSink or not.

Fair enough, but I also want to focus on debugging the most probable
subject first. So what do you think about this – true or false: only when
the 24-hour window triggers, BucketingSink gets a burst of input. Around the
state restoring point (middle of the day) it doesn't get any input, so it
can't lose anything either. Isn't this true, or have I totally missed how
Flink works in triggering window results? I would not expect there to be
any optimization that speculatively triggers early results of a regular
time window to the downstream operators.

> The old BucketingSink has in general a problem with s3. Internally
BucketingSink queries s3 as a file system to list already written file
parts (batches) and determine the index of the next part to start. Due to
eventual consistency of checking file existence in s3 [1], the
BucketingSink can rewrite the previously written part and basically lose
it.

I was wondering, what does S3's "read-after-write consistency" (mentioned
on the page you linked) actually mean. It seems that this might be possible:
- LIST keys, find current max index
- choose next index = max + 1
- HEAD next index: if it exists, keep adding + 1 until key doesn't exist on
S3

But definitely sounds easier if a sink keeps track of files in a way that's
guaranteed to be consistent.
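
In pseudo-code, the probing I mean would be roughly the following (just a sketch
against a Hadoop-style FileSystem, not Flink's actual openNewPartFile code; the
comments note where S3's eventual consistency breaks the assumptions):

// Sketch: choose the next part index by listing and then probing with exists().
// On a consistent file system this is safe; on S3 both listStatus() and exists()
// may not yet see a part file that was just written, so the chosen index can
// collide and the previous part can be overwritten.
int nextPartIndex(FileSystem fs, Path bucketDir, String partPrefix) throws IOException {
    int maxIndex = -1;
    for (FileStatus status : fs.listStatus(bucketDir)) {              // LIST keys
        String name = status.getPath().getName();
        if (name.startsWith(partPrefix)) {
            maxIndex = Math.max(maxIndex,
                    Integer.parseInt(name.substring(partPrefix.length())));
        }
    }
    int candidate = maxIndex + 1;
    while (fs.exists(new Path(bucketDir, partPrefix + candidate))) {  // HEAD next index
        candidate++;
    }
    return candidate;
}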

Cheers,
Juho

On Mon, Aug 27, 2018 at 2:04 PM Andrey Zagrebin 
wrote:

> Hi,
>
> true, StreamingFileSink does not support s3 in 1.6.0, it is planned for
> the next 1.7 release, sorry for confusion.
> The old BucketingSink has in general a problem with s3.
> Internally BucketingSink queries s3 as a file system
> to list already written file parts (batches) and determine the index of the
> next part to start. Due to eventual consistency of checking file existence
> in s3 [1], the BucketingSink can rewrite the previously written part and
> basically lose it. It should be fixed for StreamingFileSink in 1.7 where
> Flink keeps its own track of written parts and does not rely on s3 as a
> file system.
> I also include Kostas, he might add more details.
>
> Just to keep in mind this problem with s3 and exclude it for sure, I would
> also check whether the size of missing events is around the batch size of
> BucketingSink or not. You also wrote that the timestamps of lost events are
> 'probably' around the time of the savepoint; if that is not yet certain, I
> would also check it.
>
> Have you already checked the log files of job manager and task managers
> for the job running before and after the restore from the check point? Is
> everything successful there, no errors, relevant warnings or exceptions?
>
> As the next step, I would suggest to log all encountered events in
> DistinctFunction.reduce if possible for production data and check whether
> the missed events are eventually processed before or after the savepoint.
> The following log message indicates a border between the events that should
> be included into the savepoint (logged before) or not:
> “{} ({}, synchronous part) in thread {} took {} ms” (template)
> Also check if the savepoint has been overall completed:
> "{} ({}, asynchronous part) in thread {} took {} ms."
>
> Best,
> Andrey
>
> [1] https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html
>
> On 24 Aug 2018, at 20:41, Juho Autio  wrote:
>
> Hi,
>
> Using StreamingFileSink is not a convenient option for production use for
> us as it doesn't support s3*. I could use StreamingFileSink just to verify,
> but I don't see much point in doing so. Please consider my previous comment:
>
> > I realized that BucketingSink must not play any role in this problem.
> This is because only when the 24-hour window triggers, BucketingSink gets a
> burst of input. Around the state restoring point (middle of the day) it
> doesn't get any input, so it can't lose anything either (right?).
>
> I could also use a kafka sink instead, but I can't imagine how there could
> be any difference. It's very real that the sink doesn't get any input for a
> long time until the 24-hour window closes, and then it quickly writes out
> everything because it's not that much data eventually for the distinct
> values.
>
> Any ideas for debugging what's happening around the savepoint &
> restoration time?
>
> *) I actually implemented StreamingFileSink as an alternative sink. This
> was before I came to realize that most likely the sink component has
> nothing to do with the data loss problem. I tried it with s3n:// path just
> to see an exception being thrown. In the source code I indeed then found an
> explicit check for the target path scheme to be "hdfs://".
>
> On Fri, Aug 24, 2018 at 7:49 PM Andrey Zagrebin 
> wrote:
>
>> Ok, I think before further 

Re: Data loss when restoring from savepoint

2018-08-27 Thread Andrey Zagrebin
Hi,

true, StreamingFileSink does not support s3 in 1.6.0, it is planned for the 
next 1.7 release, sorry for confusion.
The old BucketingSink has in general a problem with s3. Internally BucketingSink 
queries s3 as a file system 
to list already written file parts (batches) and determine the index of the next 
part to start. Due to eventual consistency of checking file existence in s3 
[1], the BucketingSink can rewrite the previously written part and basically 
lose it. It should be fixed for StreamingFileSink in 1.7 where Flink keeps its 
own track of written parts and does not rely on s3 as a file system. 
I also include Kostas, he might add more details. 

Just to keep in mind this problem with s3 and exclude it for sure, I would also 
check whether the size of missing events is around the batch size of 
BucketingSink or not. You also wrote that the timestamps of lost events are 
'probably' around the time of the savepoint; if that is not yet certain, I would 
also check it.

Have you already checked the log files of job manager and task managers for the 
job running before and after the restore from the check point? Is everything 
successful there, no errors, relevant warnings or exceptions?

As the next step, I would suggest to log all encountered events in 
DistinctFunction.reduce if possible for production data and check whether the 
missed events are eventually processed before or after the savepoint. The 
following log message indicates a border between the events that should be 
included into the savepoint (logged before) or not:
“{} ({}, synchronous part) in thread {} took {} ms” (template)
Also check if the savepoint has been overall completed:
"{} ({}, asynchronous part) in thread {} took {} ms."

Best,
Andrey

[1] https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html

> On 24 Aug 2018, at 20:41, Juho Autio  wrote:
> 
> Hi,
> 
> Using StreamingFileSink is not a convenient option for production use for us 
> as it doesn't support s3*. I could use StreamingFileSink just to verify, but 
> I don't see much point in doing so. Please consider my previous comment:
> 
> > I realized that BucketingSink must not play any role in this problem. This 
> > is because only when the 24-hour window triggers, BucketingSink gets a 
> > burst of input. Around the state restoring point (middle of the day) it 
> > doesn't get any input, so it can't lose anything either (right?).
> 
> I could also use a kafka sink instead, but I can't imagine how there could be 
> any difference. It's very real that the sink doesn't get any input for a long 
> time until the 24-hour window closes, and then it quickly writes out 
> everything because it's not that much data eventually for the distinct values.
> 
> Any ideas for debugging what's happening around the savepoint & restoration 
> time?
> 
> *) I actually implemented StreamingFileSink as an alternative sink. This was 
> before I came to realize that most likely the sink component has nothing to 
> do with the data loss problem. I tried it with s3n:// path just to see an 
> exception being thrown. In the source code I indeed then found an explicit 
> check for the target path scheme to be "hdfs://". 
> 
> On Fri, Aug 24, 2018 at 7:49 PM Andrey Zagrebin  > wrote:
> Ok, I think before further debugging the window reduced state, 
> could you try the new ‘StreamingFileSink’ [1] introduced in Flink 1.6.0 
> instead of the previous 'BucketingSink’?
> 
> Cheers,
> Andrey
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
>  
> 
> 
>> On 24 Aug 2018, at 18:03, Juho Autio > > wrote:
>> 
>> Yes, sorry for my confusing comment. I just meant that it seems like there's 
>> a bug somewhere now that the output is missing some data.
>> 
>> > I would wait and check the actual output in s3 because it is the main 
>> > result of the job
>> 
>> Yes, and that's what I have already done. There seems to be always some data 
>> loss with the production data volumes, if the job has been restarted on that 
>> day.
>> 
>> Would you have any suggestions for how to debug this further?
>> 
>> Many thanks for stepping in.
>> 
>> On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin > > wrote:
>> Hi Juho,
>> 
>> So it is a per key deduplication job.
>> 
>> Yes, I would wait and check the actual output in s3 because it is the main 
>> result of the job and
>> 
>> > The late data around the time of taking savepoint might be not included 
>> > into the savepoint but it should be behind the snapshotted offset in Kafka.
>> 
>> is not a bug, it is a possible behaviour.
>> 
>> The savepoint is a snapshot of the data in transient which is already 
>> consumed from Kafka.
>> Basically the full contents of the window result is split between the 
>> savepoint and what can come 

Re: Data loss when restoring from savepoint

2018-08-24 Thread Juho Autio
Hi,

Using StreamingFileSink is not a convenient option for production use for
us as it doesn't support s3*. I could use StreamingFileSink just to verify,
but I don't see much point in doing so. Please consider my previous comment:

> I realized that BucketingSink must not play any role in this problem.
This is because only when the 24-hour window triggers, BucketingSink gets a
burst of input. Around the state restoring point (middle of the day) it
doesn't get any input, so it can't lose anything either (right?).

I could also use a kafka sink instead, but I can't imagine how there could
be any difference. It's very real that the sink doesn't get any input for a
long time until the 24-hour window closes, and then it quickly writes out
everything because it's not that much data eventually for the distinct
values.

Any ideas for debugging what's happening around the savepoint & restoration
time?

*) I actually implemented StreamingFileSink as an alternative sink. This
was before I came to realize that most likely the sink component has
nothing to do with the data loss problem. I tried it with s3n:// path just
to see an exception being thrown. In the source code I indeed then found an
explicit check for the target path scheme to be "hdfs://".

On Fri, Aug 24, 2018 at 7:49 PM Andrey Zagrebin 
wrote:

> Ok, I think before further debugging the window reduced state,
> could you try the new ‘StreamingFileSink’ [1] introduced in Flink 1.6.0
> instead of the previous 'BucketingSink’?
>
> Cheers,
> Andrey
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
>
> On 24 Aug 2018, at 18:03, Juho Autio  wrote:
>
> Yes, sorry for my confusing comment. I just meant that it seems like
> there's a bug somewhere now that the output is missing some data.
>
> > I would wait and check the actual output in s3 because it is the main
> result of the job
>
> Yes, and that's what I have already done. There seems to be always some
> data loss with the production data volumes, if the job has been restarted
> on that day.
>
> Would you have any suggestions for how to debug this further?
>
> Many thanks for stepping in.
>
> On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin 
> wrote:
>
>> Hi Juho,
>>
>> So it is a per key deduplication job.
>>
>> Yes, I would wait and check the actual output in s3 because it is the
>> main result of the job and
>>
>> > The late data around the time of taking savepoint might be not included
>> into the savepoint but it should be behind the snapshotted offset in Kafka.
>>
>> is not a bug, it is a possible behaviour.
>>
>> The savepoint is a snapshot of the data in transient which is already
>> consumed from Kafka.
>> Basically the full contents of the window result is split between the
>> savepoint and what can come after the savepoint'ed offset in Kafka but
>> before the window result is written into s3.
>>
>> Allowed lateness should not affect it, I am just saying that the final
>> result in s3 should include all records after it.
>> This is what should be guaranteed but not the contents of the
>> intermediate savepoint.
>>
>> Cheers,
>> Andrey
>>
>> On 24 Aug 2018, at 16:52, Juho Autio  wrote:
>>
>> Thanks for your answer!
>>
>> I check for the missed data from the final output on s3. So I wait until
>> the next day, then run the same thing re-implemented in batch, and compare
>> the output.
>>
>> > The late data around the time of taking savepoint might be not included
>> into the savepoint but it should be behind the snapshotted offset in Kafka.
>>
>> Yes, I would definitely expect that. It seems like there's a bug
>> somewhere.
>>
>> > Then it should just come later after the restore and should be reduced
>> within the allowed lateness into the final result which is saved into s3.
>>
>> Well, as far as I know, allowed lateness doesn't play any role here,
>> because I started running the job with allowedLateness=0, and still get the
>> data loss, while my late data output doesn't receive anything.
>>
>> > Also, is this `DistinctFunction.reduce` just an example or the actual
>> implementation, basically saving just one of records inside the 24h window
>> in s3? then what is missing there?
>>
>> Yes, it's the actual implementation. Note that there's a keyBy before
>> the DistinctFunction. So there's one record for each key (which is the
>> combination of a couple of fields). In practice I've seen that we're
>> missing ~2000-4000 elements on each restore, and the total output is
>> obviously much more than that.
>>
>> Here's the full code for the key selector:
>>
>> public class MapKeySelector implements KeySelector<Map<String, String>,
>> Object> {
>>
>> private final String[] fields;
>>
>> public MapKeySelector(String... fields) {
>> this.fields = fields;
>> }
>>
>> @Override
>> public Object getKey(Map<String, String> event) throws Exception {
>> Tuple key = Tuple.getTupleClass(fields.length).newInstance();
>> for (int i = 0; i < fields.length; 

Re: Data loss when restoring from savepoint

2018-08-24 Thread Andrey Zagrebin
Ok, I think before further debugging the window reduced state, 
could you try the new ‘StreamingFileSink’ [1] introduced in Flink 1.6.0 instead 
of the previous 'BucketingSink’?
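
Something along these lines could work as a drop-in test (only a sketch; the
encoder and the bucket assignment are placeholders and would need to match your
IdJsonWriter / ProcessdateBucketer):

// Sketch: row-format StreamingFileSink writing one line per record.
// SimpleStringEncoder just calls toString(), so replace it with a proper
// JSON encoder for real output.
StreamingFileSink<Map<String, String>> sink = StreamingFileSink
        .forRowFormat(new Path(outputPath), new SimpleStringEncoder<Map<String, String>>())
        .build();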

Cheers,
Andrey

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
 


> On 24 Aug 2018, at 18:03, Juho Autio  wrote:
> 
> Yes, sorry for my confusing comment. I just meant that it seems like there's 
> a bug somewhere now that the output is missing some data.
> 
> > I would wait and check the actual output in s3 because it is the main 
> > result of the job
> 
> Yes, and that's what I have already done. There seems to be always some data 
> loss with the production data volumes, if the job has been restarted on that 
> day.
> 
> Would you have any suggestions for how to debug this further?
> 
> Many thanks for stepping in.
> 
> On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin  > wrote:
> Hi Juho,
> 
> So it is a per key deduplication job.
> 
> Yes, I would wait and check the actual output in s3 because it is the main 
> result of the job and
> 
> > The late data around the time of taking savepoint might be not included 
> > into the savepoint but it should be behind the snapshotted offset in Kafka.
> 
> is not a bug, it is a possible behaviour.
> 
> The savepoint is a snapshot of the data in transient which is already 
> consumed from Kafka.
> Basically the full contents of the window result is split between the 
> savepoint and what can come after the savepoint'ed offset in Kafka but before 
> the window result is written into s3. 
> 
> Allowed lateness should not affect it, I am just saying that the final result 
> in s3 should include all records after it. 
> This is what should be guaranteed but not the contents of the intermediate 
> savepoint.
> 
> Cheers,
> Andrey
> 
>> On 24 Aug 2018, at 16:52, Juho Autio > > wrote:
>> 
>> Thanks for your answer!
>> 
>> I check for the missed data from the final output on s3. So I wait until the 
>> next day, then run the same thing re-implemented in batch, and compare the 
>> output.
>> 
>> > The late data around the time of taking savepoint might be not included 
>> > into the savepoint but it should be behind the snapshotted offset in Kafka.
>> 
>> Yes, I would definitely expect that. It seems like there's a bug somewhere.
>> 
>> > Then it should just come later after the restore and should be reduced 
>> > within the allowed lateness into the final result which is saved into s3.
>> 
>> Well, as far as I know, allowed lateness doesn't play any role here, because 
>> I started running the job with allowedLateness=0, and still get the data 
>> loss, while my late data output doesn't receive anything.
>> 
>> > Also, is this `DistinctFunction.reduce` just an example or the actual 
>> > implementation, basically saving just one of records inside the 24h window 
>> > in s3? then what is missing there?
>> 
>> Yes, it's the actual implementation. Note that there's a keyBy before the 
>> DistinctFunction. So there's one record for each key (which is the 
>> combination of a couple of fields). In practice I've seen that we're missing 
>> ~2000-4000 elements on each restore, and the total output is obviously much 
>> more than that.
>> 
>> Here's the full code for the key selector:
>> 
>> public class MapKeySelector implements KeySelector<Map<String, String>, 
>> Object> {
>> 
>> private final String[] fields;
>> 
>> public MapKeySelector(String... fields) {
>> this.fields = fields;
>> }
>> 
>> @Override
>> public Object getKey(Map<String, String> event) throws Exception {
>> Tuple key = Tuple.getTupleClass(fields.length).newInstance();
>> for (int i = 0; i < fields.length; i++) {
>> key.setField(event.getOrDefault(fields[i], ""), i);
>> }
>> return key;
>> }
>> }
>> 
>> And a more exact example on how it's used:
>> 
>> .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD", 
>> "KEY_NAME", "KEY_VALUE"))
>> .timeWindow(Time.days(1))
>> .reduce(new DistinctFunction())
>> 
>> On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin > > wrote:
>> Hi Juho,
>> 
>> Where exactly does the data miss? When do you notice that? 
>> Do you check it:
>> - debugging `DistinctFunction.reduce` right after resume in the middle of 
>> the day 
>> or 
>> - some distinct records miss in the final output of BucketingSink in s3 
>> after window result is actually triggered and saved into s3 at the end of 
>> the day? is this the main output?
>> 
>> The late data around the time of taking savepoint might be not included into 
>> the savepoint but it should be behind the snapshotted offset in Kafka. Then 
>> it should just come later after the restore and should be reduced within the 
>> allowed lateness into the 

Re: Data loss when restoring from savepoint

2018-08-24 Thread Juho Autio
Yes, sorry for my confusing comment. I just meant that it seems like
there's a bug somewhere now that the output is missing some data.

> I would wait and check the actual output in s3 because it is the main
result of the job

Yes, and that's what I have already done. There seems to be always some
data loss with the production data volumes, if the job has been restarted
on that day.

Would you have any suggestions for how to debug this further?

Many thanks for stepping in.

On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin 
wrote:

> Hi Juho,
>
> So it is a per key deduplication job.
>
> Yes, I would wait and check the actual output in s3 because it is the main
> result of the job and
>
> > The late data around the time of taking savepoint might be not included
> into the savepoint but it should be behind the snapshotted offset in Kafka.
>
> is not a bug, it is a possible behaviour.
>
> The savepoint is a snapshot of the data in transient which is already
> consumed from Kafka.
> Basically the full contents of the window result is split between the
> savepoint and what can come after the savepoint'ed offset in Kafka but
> before the window result is written into s3.
>
> Allowed lateness should not affect it, I am just saying that the final
> result in s3 should include all records after it.
> This is what should be guaranteed but not the contents of the intermediate
> savepoint.
>
> Cheers,
> Andrey
>
> On 24 Aug 2018, at 16:52, Juho Autio  wrote:
>
> Thanks for your answer!
>
> I check for the missed data from the final output on s3. So I wait until
> the next day, then run the same thing re-implemented in batch, and compare
> the output.
>
> > The late data around the time of taking savepoint might be not included
> into the savepoint but it should be behind the snapshotted offset in Kafka.
>
> Yes, I would definitely expect that. It seems like there's a bug somewhere.
>
> > Then it should just come later after the restore and should be reduced
> within the allowed lateness into the final result which is saved into s3.
>
> Well, as far as I know, allowed lateness doesn't play any role here,
> because I started running the job with allowedLateness=0, and still get the
> data loss, while my late data output doesn't receive anything.
>
> > Also, is this `DistinctFunction.reduce` just an example or the actual
> implementation, basically saving just one of records inside the 24h window
> in s3? then what is missing there?
>
> Yes, it's the actual implementation. Note that there's a keyBy before
> the DistinctFunction. So there's one record for each key (which is the
> combination of a couple of fields). In practice I've seen that we're
> missing ~2000-4000 elements on each restore, and the total output is
> obviously much more than that.
>
> Here's the full code for the key selector:
>
> public class MapKeySelector implements KeySelector<Map<String, String>,
> Object> {
>
> private final String[] fields;
>
> public MapKeySelector(String... fields) {
> this.fields = fields;
> }
>
> @Override
> public Object getKey(Map<String, String> event) throws Exception {
> Tuple key = Tuple.getTupleClass(fields.length).newInstance();
> for (int i = 0; i < fields.length; i++) {
> key.setField(event.getOrDefault(fields[i], ""), i);
> }
> return key;
> }
> }
>
> And a more exact example on how it's used:
>
> .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD",
> "KEY_NAME", "KEY_VALUE"))
> .timeWindow(Time.days(1))
> .reduce(new DistinctFunction())
>
> On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin 
> wrote:
>
>> Hi Juho,
>>
>> Where exactly does the data miss? When do you notice that?
>> Do you check it:
>> - debugging `DistinctFunction.reduce` right after resume in the middle of
>> the day
>> or
>> - some distinct records miss in the final output of BucketingSink in s3
>> after window result is actually triggered and saved into s3 at the end of
>> the day? is this the main output?
>>
>> The late data around the time of taking savepoint might be not included
>> into the savepoint but it should be behind the snapshotted offset in Kafka.
>> Then it should just come later after the restore and should be reduced
>> within the allowed lateness into the final result which is saved into s3.
>>
>> Also, is this `DistinctFunction.reduce` just an example or the actual
>> implementation, basically saving just one of records inside the 24h window
>> in s3? then what is missing there?
>>
>> Cheers,
>> Andrey
>>
>> On 23 Aug 2018, at 15:42, Juho Autio  wrote:
>>
>> I changed to allowedLateness=0, no change, still missing data when
>> restoring from savepoint.
>>
>> On Tue, Aug 21, 2018 at 10:43 AM Juho Autio  wrote:
>>
>>> I realized that BucketingSink must not play any role in this problem.
>>> This is because only when the 24-hour window triggers, BucketingSink gets a
>>> burst of input. Around the state restoring point (middle of the day) it
>>> 

Re: Data loss when restoring from savepoint

2018-08-24 Thread Andrey Zagrebin
Hi Juho,

So it is a per key deduplication job.

Yes, I would wait and check the actual output in s3 because it is the main 
result of the job and

> The late data around the time of taking savepoint might be not included into 
> the savepoint but it should be behind the snapshotted offset in Kafka.

is not a bug, it is a possible behaviour.

The savepoint is a snapshot of the data in transient which is already consumed 
from Kafka.
Basically the full contents of the window result is split between the savepoint 
and what can come after the savepoint'ed offset in Kafka but before the window 
result is written into s3. 

Allowed lateness should not affect it, I am just saying that the final result 
in s3 should include all records after it. 
This is what should be guaranteed but not the contents of the intermediate 
savepoint.

Cheers,
Andrey

> On 24 Aug 2018, at 16:52, Juho Autio  wrote:
> 
> Thanks for your answer!
> 
> I check for the missed data from the final output on s3. So I wait until the 
> next day, then run the same thing re-implemented in batch, and compare the 
> output.
> 
> > The late data around the time of taking savepoint might be not included 
> > into the savepoint but it should be behind the snapshotted offset in Kafka.
> 
> Yes, I would definitely expect that. It seems like there's a bug somewhere.
> 
> > Then it should just come later after the restore and should be reduced 
> > within the allowed lateness into the final result which is saved into s3.
> 
> Well, as far as I know, allowed lateness doesn't play any role here, because 
> I started running the job with allowedLateness=0, and still get the data 
> loss, while my late data output doesn't receive anything.
> 
> > Also, is this `DistinctFunction.reduce` just an example or the actual 
> > implementation, basically saving just one of records inside the 24h window 
> > in s3? then what is missing there?
> 
> Yes, it's the actual implementation. Note that there's a keyBy before the 
> DistinctFunction. So there's one record for each key (which is the 
> combination of a couple of fields). In practice I've seen that we're missing 
> ~2000-4000 elements on each restore, and the total output is obviously much 
> more than that.
> 
> Here's the full code for the key selector:
> 
> public class MapKeySelector implements KeySelector<Map<String, String>, 
> Object> {
> 
> private final String[] fields;
> 
> public MapKeySelector(String... fields) {
> this.fields = fields;
> }
> 
> @Override
> public Object getKey(Map<String, String> event) throws Exception {
> Tuple key = Tuple.getTupleClass(fields.length).newInstance();
> for (int i = 0; i < fields.length; i++) {
> key.setField(event.getOrDefault(fields[i], ""), i);
> }
> return key;
> }
> }
> 
> And a more exact example on how it's used:
> 
> .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD", 
> "KEY_NAME", "KEY_VALUE"))
> .timeWindow(Time.days(1))
> .reduce(new DistinctFunction())
> 
> On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin  > wrote:
> Hi Juho,
> 
> Where exactly does the data miss? When do you notice that? 
> Do you check it:
> - debugging `DistinctFunction.reduce` right after resume in the middle of the 
> day 
> or 
> - some distinct records miss in the final output of BucketingSink in s3 after 
> window result is actually triggered and saved into s3 at the end of the day? 
> is this the main output?
> 
> The late data around the time of taking savepoint might be not included into 
> the savepoint but it should be behind the snapshotted offset in Kafka. Then 
> it should just come later after the restore and should be reduced within the 
> allowed lateness into the final result which is saved into s3.
> 
> Also, is this `DistinctFunction.reduce` just an example or the actual 
> implementation, basically saving just one of records inside the 24h window in 
> s3? then what is missing there?
> 
> Cheers,
> Andrey
> 
>> On 23 Aug 2018, at 15:42, Juho Autio > > wrote:
>> 
>> I changed to allowedLateness=0, no change, still missing data when restoring 
>> from savepoint.
>> 
>> On Tue, Aug 21, 2018 at 10:43 AM Juho Autio > > wrote:
>> I realized that BucketingSink must not play any role in this problem. This 
>> is because only when the 24-hour window triggers, BucketingSink gets a burst 
>> of input. Around the state restoring point (middle of the day) it doesn't 
>> get any input, so it can't lose anything either (right?).
>> 
>> I will next try removing the allowedLateness entirely from the equation.
>> 
>> In the meanwhile, please let me know if you have any suggestions for 
>> debugging the lost data, for example what logs to enable.
>> 
>> We use FlinkKafkaConsumer010 btw. Are there any known issues with that, that 
>> could contribute to lost data when restoring a savepoint?
>> 
>> On Fri, 

Re: Data loss when restoring from savepoint

2018-08-24 Thread Juho Autio
Thanks for your answer!

I check for the missed data from the final output on s3. So I wait until
the next day, then run the same thing re-implemented in batch, and compare
the output.

> The late data around the time of taking savepoint might be not included
into the savepoint but it should be behind the snapshotted offset in Kafka.

Yes, I would definitely expect that. It seems like there's a bug somewhere.

> Then it should just come later after the restore and should be reduced
within the allowed lateness into the final result which is saved into s3.

Well, as far as I know, allowed lateness doesn't play any role here,
because I started running the job with allowedLateness=0, and still get the
data loss, while my late data output doesn't receive anything.

> Also, is this `DistinctFunction.reduce` just an example or the actual
implementation, basically saving just one of records inside the 24h window
in s3? then what is missing there?

Yes, it's the actual implementation. Note that there's a keyBy before
the DistinctFunction. So there's one record for each key (which is the
combination of a couple of fields). In practice I've seen that we're
missing ~2000-4000 elements on each restore, and the total output is
obviously much more than that.

Here's the full code for the key selector:

public class MapKeySelector implements KeySelector<Map<String, String>,
Object> {

private final String[] fields;

public MapKeySelector(String... fields) {
this.fields = fields;
}

@Override
public Object getKey(Map<String, String> event) throws Exception {
Tuple key = Tuple.getTupleClass(fields.length).newInstance();
for (int i = 0; i < fields.length; i++) {
key.setField(event.getOrDefault(fields[i], ""), i);
}
return key;
}
}

And a more exact example on how it's used:

.keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD",
"KEY_NAME", "KEY_VALUE"))
.timeWindow(Time.days(1))
.reduce(new DistinctFunction())
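
And for completeness, DistinctFunction itself (as shown in my first mail) just
keeps the first record it sees per key:

public class DistinctFunction implements ReduceFunction<Map<String, String>> {
    @Override
    public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
        // both records share the same key, so keeping either one is enough for "distinct"
        return value1;
    }
}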

On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin 
wrote:

> Hi Juho,
>
> Where exactly does the data miss? When do you notice that?
> Do you check it:
> - debugging `DistinctFunction.reduce` right after resume in the middle of
> the day
> or
> - some distinct records miss in the final output of BucketingSink in s3
> after window result is actually triggered and saved into s3 at the end of
> the day? is this the main output?
>
> The late data around the time of taking savepoint might be not included
> into the savepoint but it should be behind the snapshotted offset in Kafka.
> Then it should just come later after the restore and should be reduced
> within the allowed lateness into the final result which is saved into s3.
>
> Also, is this `DistinctFunction.reduce` just an example or the actual
> implementation, basically saving just one of records inside the 24h window
> in s3? then what is missing there?
>
> Cheers,
> Andrey
>
> On 23 Aug 2018, at 15:42, Juho Autio  wrote:
>
> I changed to allowedLateness=0, no change, still missing data when
> restoring from savepoint.
>
> On Tue, Aug 21, 2018 at 10:43 AM Juho Autio  wrote:
>
>> I realized that BucketingSink must not play any role in this problem.
>> This is because only when the 24-hour window triggers, BucketingSink gets a
>> burst of input. Around the state restoring point (middle of the day) it
>> doesn't get any input, so it can't lose anything either (right?).
>>
>> I will next try removing the allowedLateness entirely from the equation.
>>
>> In the meanwhile, please let me know if you have any suggestions for
>> debugging the lost data, for example what logs to enable.
>>
>> We use FlinkKafkaConsumer010 btw. Are there any known issues with that,
>> that could contribute to lost data when restoring a savepoint?
>>
>> On Fri, Aug 17, 2018 at 4:23 PM Juho Autio  wrote:
>>
>>> Some data is silently lost on my Flink stream job when state is restored
>>> from a savepoint.
>>>
>>> Do you have any debugging hints to find out where exactly the data gets
>>> dropped?
>>>
>>> My job gathers distinct values using a 24-hour window. It doesn't have
>>> any custom state management.
>>>
>>> When I cancel the job with savepoint and restore from that savepoint,
>>> some data is missed. It seems to be losing just a small amount of data. The
>>> event time of lost data is probably around the time of savepoint. In other
>>> words the rest of the time window is not entirely missed – collection works
>>> correctly also for (most of the) events that come in after restoring.
>>>
>>> When the job processes a full 24-hour window without interruptions it
>>> doesn't miss anything.
>>>
>>> Usually the problem doesn't happen in test environments that have
>>> smaller parallelism and smaller data volumes. But in production volumes the
>>> job seems to be consistently missing at least something on every restore.
>>>
>>> This issue has consistently happened since the job was initially
>>> created. It was 

Re: Data loss when restoring from savepoint

2018-08-24 Thread Andrey Zagrebin
Hi Juho,

Where exactly does the data miss? When do you notice that? 
Do you check it:
- debugging `DistinctFunction.reduce` right after resume in the middle of the 
day 
or 
- some distinct records miss in the final output of BucketingSink in s3 after 
window result is actually triggered and saved into s3 at the end of the day? is 
this the main output?

The late data around the time of taking savepoint might be not included into 
the savepoint but it should be behind the snapshotted offset in Kafka. Then it 
should just come later after the restore and should be reduced within the 
allowed lateness into the final result which is saved into s3.

Also, is this `DistinctFunction.reduce` just an example or the actual 
implementation, basically saving just one of records inside the 24h window in 
s3? then what is missing there?

Cheers,
Andrey

> On 23 Aug 2018, at 15:42, Juho Autio  wrote:
> 
> I changed to allowedLateness=0, no change, still missing data when restoring 
> from savepoint.
> 
> On Tue, Aug 21, 2018 at 10:43 AM Juho Autio  > wrote:
> I realized that BucketingSink must not play any role in this problem. This is 
> because only when the 24-hour window triggers, BucketinSink gets a burst of 
> input. Around the state restoring point (middle of the day) it doesn't get 
> any input, so it can't lose anything either (right?).
> 
> I will next try removing the allowedLateness entirely from the equation.
> 
> In the meanwhile, please let me know if you have any suggestions for 
> debugging the lost data, for example what logs to enable.
> 
> We use FlinkKafkaConsumer010 btw. Are there any known issues with that, that 
> could contribute to lost data when restoring a savepoint?
> 
> On Fri, Aug 17, 2018 at 4:23 PM Juho Autio  > wrote:
> Some data is silently lost on my Flink stream job when state is restored from 
> a savepoint.
> 
> Do you have any debugging hints to find out where exactly the data gets 
> dropped?
> 
> My job gathers distinct values using a 24-hour window. It doesn't have any 
> custom state management.
> 
> When I cancel the job with savepoint and restore from that savepoint, some 
> data is missed. It seems to be losing just a small amount of data. The event 
> time of lost data is probably around the time of savepoint. In other words 
> the rest of the time window is not entirely missed – collection works 
> correctly also for (most of the) events that come in after restoring.
> 
> When the job processes a full 24-hour window without interruptions it doesn't 
> miss anything.
> 
> Usually the problem doesn't happen in test environments that have smaller 
> parallelism and smaller data volumes. But in production volumes the job seems 
> to be consistently missing at least something on every restore.
> 
> This issue has consistently happened since the job was initially created. It 
> was at first run on an older version of Flink 1.5-SNAPSHOT and it still 
> happens on both Flink 1.5.2 & 1.6.0.
> 
> I'm wondering if this could be for example some synchronization issue between 
> the kafka consumer offsets vs. what's been written by BucketingSink?
> 
> 1. Job content, simplified
> 
> kafkaStream
> .flatMap(new ExtractFieldsFunction())
> .keyBy(new MapKeySelector(1, 2, 3, 4))
> .timeWindow(Time.days(1))
> .allowedLateness(allowedLateness)
> .sideOutputLateData(lateDataTag)
> .reduce(new DistinctFunction())
> .addSink(sink)
> // use a fixed number of output partitions
> .setParallelism(8))
> 
> /**
>  * Usage: .keyBy("the", "distinct", "fields").reduce(new DistinctFunction())
>  */
> public class DistinctFunction implements ReduceFunction<Map<String, String>> {
> @Override
> public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
> return value1;
> }
> }
> 
> 2. State configuration
> 
> boolean enableIncrementalCheckpointing = true;
> String statePath = "s3n://bucket/savepoints";
> new RocksDBStateBackend(statePath, enableIncrementalCheckpointing);
> 
> Checkpointing Mode Exactly Once
> Interval 1m 0s
> Timeout 10m 0s
> Minimum Pause Between Checkpoints 1m 0s
> Maximum Concurrent Checkpoints 1
> Persist Checkpoints Externally Enabled (retain on cancellation)
> 
> 3. BucketingSink configuration
> 
> We use BucketingSink, I don't think there's anything special here, if not the 
> fact that we're writing to S3.
> 
> String outputPath = "s3://bucket/output";
> BucketingSink<Map<String, String>> sink = new 
> BucketingSink<Map<String, String>>(outputPath)
> .setBucketer(new ProcessdateBucketer())
> .setBatchSize(batchSize)
> .setInactiveBucketThreshold(inactiveBucketThreshold)
> .setInactiveBucketCheckInterval(inactiveBucketCheckInterval);
> sink.setWriter(new IdJsonWriter());
> 
> 4. Kafka & event 

Re: Data loss when restoring from savepoint

2018-08-23 Thread Juho Autio
I changed to allowedLateness=0, no change, still missing data when
restoring from savepoint.

On Tue, Aug 21, 2018 at 10:43 AM Juho Autio  wrote:

> I realized that BucketingSink must not play any role in this problem. This
> is because only when the 24-hour window triggers, BucketingSink gets a burst
> of input. Around the state restoring point (middle of the day) it doesn't
> get any input, so it can't lose anything either (right?).
>
> I will next try removing the allowedLateness entirely from the equation.
>
> In the meanwhile, please let me know if you have any suggestions for
> debugging the lost data, for example what logs to enable.
>
> We use FlinkKafkaConsumer010 btw. Are there any known issues with that,
> that could contribute to lost data when restoring a savepoint?
>
> On Fri, Aug 17, 2018 at 4:23 PM Juho Autio  wrote:
>
>> Some data is silently lost on my Flink stream job when state is restored
>> from a savepoint.
>>
>> Do you have any debugging hints to find out where exactly the data gets
>> dropped?
>>
>> My job gathers distinct values using a 24-hour window. It doesn't have
>> any custom state management.
>>
>> When I cancel the job with savepoint and restore from that savepoint,
>> some data is missed. It seems to be losing just a small amount of data. The
>> event time of lost data is probably around the time of savepoint. In other
>> words the rest of the time window is not entirely missed – collection works
>> correctly also for (most of the) events that come in after restoring.
>>
>> When the job processes a full 24-hour window without interruptions it
>> doesn't miss anything.
>>
>> Usually the problem doesn't happen in test environments that have smaller
>> parallelism and smaller data volumes. But in production volumes the job
>> seems to be consistently missing at least something on every restore.
>>
>> This issue has consistently happened since the job was initially created.
>> It was at first run on an older version of Flink 1.5-SNAPSHOT and it still
>> happens on both Flink 1.5.2 & 1.6.0.
>>
>> I'm wondering if this could be for example some synchronization issue
>> between the kafka consumer offsets vs. what's been written by BucketingSink?
>>
>> 1. Job content, simplified
>>
>> kafkaStream
>> .flatMap(new ExtractFieldsFunction())
>> .keyBy(new MapKeySelector(1, 2, 3, 4))
>> .timeWindow(Time.days(1))
>> .allowedLateness(allowedLateness)
>> .sideOutputLateData(lateDataTag)
>> .reduce(new DistinctFunction())
>> .addSink(sink)
>> // use a fixed number of output partitions
>> .setParallelism(8))
>>
>> /**
>>  * Usage: .keyBy("the", "distinct", "fields").reduce(new
>> DistinctFunction())
>>  */
>> public class DistinctFunction implements
>> ReduceFunction<Map<String, String>> {
>> @Override
>> public Map<String, String> reduce(Map<String, String> value1,
>> Map<String, String> value2) {
>> return value1;
>> }
>> }
>>
>> 2. State configuration
>>
>> boolean enableIncrementalCheckpointing = true;
>> String statePath = "s3n://bucket/savepoints";
>> new RocksDBStateBackend(statePath, enableIncrementalCheckpointing);
>>
>> Checkpointing Mode Exactly Once
>> Interval 1m 0s
>> Timeout 10m 0s
>> Minimum Pause Between Checkpoints 1m 0s
>> Maximum Concurrent Checkpoints 1
>> Persist Checkpoints Externally Enabled (retain on cancellation)
>>
>> 3. BucketingSink configuration
>>
>> We use BucketingSink, I don't think there's anything special here, if not
>> the fact that we're writing to S3.
>>
>> String outputPath = "s3://bucket/output";
>> BucketingSink<Map<String, String>> sink = new
>> BucketingSink<Map<String, String>>(outputPath)
>> .setBucketer(new ProcessdateBucketer())
>> .setBatchSize(batchSize)
>> .setInactiveBucketThreshold(inactiveBucketThreshold)
>>
>> .setInactiveBucketCheckInterval(inactiveBucketCheckInterval);
>> sink.setWriter(new IdJsonWriter());
>>
>> 4. Kafka & event time
>>
>> My flink job reads the data from Kafka, using a
>> BoundedOutOfOrdernessTimestampExtractor on the kafka consumer to
>> synchronize watermarks across all kafka partitions. We also write late
>> data to side output, but nothing is written there – if it would, it could
>> explain missed data in the main output (I'm also sure that our late data
>> writing works, because we previously had some actual late data which ended
>> up there).
>>
>> 5. allowedLateness
>>
>> It may be or may not be relevant that I have also enabled allowedLateness
>> with 1 minute lateness on the 24-hour window:
>>
>> If that makes sense, I could try removing allowedLateness entirely? That
>> would be just to rule out that Flink doesn't have a bug that's related to
>> restoring state in combination with the allowedLateness feature. After all,
>> all of our data should be in a good enough order to not be late, given the
>> max out of orderness used on kafka consumer timestamp extractor.
>>
>> 

Re: Data loss when restoring from savepoint

2018-08-21 Thread Juho Autio
I realized that BucketingSink must not play any role in this problem. This
is because only when the 24-hour window triggers, BucketingSink gets a burst
of input. Around the state restoring point (middle of the day) it doesn't
get any input, so it can't lose anything either (right?).

I will next try removing the allowedLateness entirely from the equation.

In the meanwhile, please let me know if you have any suggestions for
debugging the lost data, for example what logs to enable.

We use FlinkKafkaConsumer010 btw. Are there any known issues with that,
that could contribute to lost data when restoring a savepoint?

On Fri, Aug 17, 2018 at 4:23 PM Juho Autio  wrote:

> Some data is silently lost on my Flink stream job when state is restored
> from a savepoint.
>
> Do you have any debugging hints to find out where exactly the data gets
> dropped?
>
> My job gathers distinct values using a 24-hour window. It doesn't have any
> custom state management.
>
> When I cancel the job with savepoint and restore from that savepoint, some
> data is missed. It seems to be losing just a small amount of data. The
> event time of lost data is probably around the time of savepoint. In other
> words the rest of the time window is not entirely missed – collection works
> correctly also for (most of the) events that come in after restoring.
>
> When the job processes a full 24-hour window without interruptions it
> doesn't miss anything.
>
> Usually the problem doesn't happen in test environments that have smaller
> parallelism and smaller data volumes. But in production volumes the job
> seems to be consistently missing at least something on every restore.
>
> This issue has consistently happened since the job was initially created.
> It was at first run on an older version of Flink 1.5-SNAPSHOT and it still
> happens on both Flink 1.5.2 & 1.6.0.
>
> I'm wondering if this could be for example some synchronization issue
> between the kafka consumer offsets vs. what's been written by BucketingSink?
>
> 1. Job content, simplified
>
> kafkaStream
> .flatMap(new ExtractFieldsFunction())
> .keyBy(new MapKeySelector(1, 2, 3, 4))
> .timeWindow(Time.days(1))
> .allowedLateness(allowedLateness)
> .sideOutputLateData(lateDataTag)
> .reduce(new DistinctFunction())
> .addSink(sink)
> // use a fixed number of output partitions
> .setParallelism(8))
>
> /**
>  * Usage: .keyBy("the", "distinct", "fields").reduce(new
> DistinctFunction())
>  */
> public class DistinctFunction implements
> ReduceFunction<Map<String, String>> {
> @Override
> public Map<String, String> reduce(Map<String, String> value1,
> Map<String, String> value2) {
> return value1;
> }
> }
>
> 2. State configuration
>
> boolean enableIncrementalCheckpointing = true;
> String statePath = "s3n://bucket/savepoints";
> new RocksDBStateBackend(statePath, enableIncrementalCheckpointing);
>
> Checkpointing Mode Exactly Once
> Interval 1m 0s
> Timeout 10m 0s
> Minimum Pause Between Checkpoints 1m 0s
> Maximum Concurrent Checkpoints 1
> Persist Checkpoints Externally Enabled (retain on cancellation)
>
> 3. BucketingSink configuration
>
> We use BucketingSink, I don't think there's anything special here, if not
> the fact that we're writing to S3.
>
> String outputPath = "s3://bucket/output";
> BucketingSink<Map<String, String>> sink = new
> BucketingSink<Map<String, String>>(outputPath)
> .setBucketer(new ProcessdateBucketer())
> .setBatchSize(batchSize)
> .setInactiveBucketThreshold(inactiveBucketThreshold)
>
> .setInactiveBucketCheckInterval(inactiveBucketCheckInterval);
> sink.setWriter(new IdJsonWriter());
>
> 4. Kafka & event time
>
> My flink job reads the data from Kafka, using a
> BoundedOutOfOrdernessTimestampExtractor on the kafka consumer to
> synchronize watermarks across all kafka partitions. We also write late
> data to side output, but nothing is written there – if it would, it could
> explain missed data in the main output (I'm also sure that our late data
> writing works, because we previously had some actual late data which ended
> up there).
>
> 5. allowedLateness
>
> It may be or may not be relevant that I have also enabled allowedLateness
> with 1 minute lateness on the 24-hour window:
>
> If that makes sense, I could try removing allowedLateness entirely? That
> would be just to rule out that Flink doesn't have a bug that's related to
> restoring state in combination with the allowedLateness feature. After all,
> all of our data should be in a good enough order to not be late, given the
> max out of orderness used on kafka consumer timestamp extractor.
>
> Thank you in advance!
>