Hi,

> On 04.10.2018 at 16:08, Juho Autio <juho.au...@rovio.com> wrote:
> 
> > you could take a look at Bravo [1] to query your savepoints and to check if 
> > the state in the savepoint is complete w.r.t. your expectations
> 
> Thanks. I'm not 100% sure if this is the case, but to me it seemed like the missed 
> ids were being logged by the reducer soon after the job had started (after 
> restoring a savepoint). But on the other hand, after that I also made another 
> savepoint & restored that, so what I could check is: does that next savepoint 
> have the missed ids that were logged (a couple of minutes before the 
> savepoint was created, so there should've been more than enough time to add 
> them to the state before the savepoint was triggered) or not. Anyway, if I 
> were able to verify with Bravo that the ids are missing from the 
> savepoint (even though the reducer logged that it saw them), would that help in 
> figuring out where they are lost? Is there some major difference compared to 
> just looking at the final output after window has been triggered?


I think that makes a difference. For example, you can investigate whether there 
is state loss or a problem with the windowing. In the savepoint you can see 
which keys exist and to which windows they are assigned. Also, just to make 
sure there is no misunderstanding: only elements that are in the state at the 
start of a savepoint are expected to be part of the savepoint; elements that 
arrive between the start and the completion of the savepoint are not expected 
to be part of it.
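
To make that distinction concrete, here is a minimal sketch in plain Java (no Flink or Bravo API involved) of how the comparison could look once you have extracted the keys from the savepoint. The class name `MissingIdTriage` and the three input sets are hypothetical; how you obtain the savepoint keys depends on the tool you use.

```java
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper: classifies ids that are missing from the final window output. */
final class MissingIdTriage {

    /** Ids the reducer logged before the savepoint started, but that are not in the savepoint. */
    final Set<String> lostBeforeSavepoint = new TreeSet<>();

    /** Ids that are in the savepoint, but never showed up in the window output. */
    final Set<String> lostAfterSavepoint = new TreeSet<>();

    MissingIdTriage(Set<String> loggedBeforeSavepointStart,
                    Set<String> keysInSavepoint,
                    Set<String> idsInWindowOutput) {
        for (String id : loggedBeforeSavepointStart) {
            if (idsInWindowOutput.contains(id)) {
                continue; // this id was emitted, so it is not missing
            }
            if (keysInSavepoint.contains(id)) {
                lostAfterSavepoint.add(id);  // points towards restore or windowing
            } else {
                lostBeforeSavepoint.add(id); // points towards state loss
            }
        }
    }
}
```

If `lostBeforeSavepoint` is non-empty, the state was already incomplete when the savepoint started; if only `lostAfterSavepoint` is non-empty, the savepoint was fine and the problem is in what happens after the restore.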

> 
> > I also doubt that the problem is about backpressure after restore, because 
> > the job will only continue running after the state restore is already 
> > completed.
> 
> Yes, I'm not suspecting that the state restoring would be the problem either. 
> My concern was about backpressure possibly messing with the updates of 
> reducing state? I would tend to suspect that updating the state consistently 
> is what fails, where heavy load / backpressure might be a factor.


How would you assume that backpressure would influence your updates? Updates to 
each local state still happen event-by-event, in a single reader/writer thread.
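
As a simplified model of what I mean (plain Java, not Flink's actual implementation; `KeyedReduceModel` and its method names are made up for illustration): each event is applied to the state of its key by one thread, as read, reduce, write, before the next event is looked at. Backpressure only delays when `process` is called; it does not interleave updates.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

/** Illustrative model of a keyed reducing state that is updated by a single thread. */
final class KeyedReduceModel<K, V> {

    private final Map<K, V> state = new HashMap<>();
    private final BinaryOperator<V> reduceFunction;

    KeyedReduceModel(BinaryOperator<V> reduceFunction) {
        this.reduceFunction = reduceFunction;
    }

    /** Applies one event: read the current value for the key, reduce, write back. */
    void process(K key, V value) {
        V current = state.get(key);
        state.put(key, current == null ? value : reduceFunction.apply(current, value));
    }

    V get(K key) {
        return state.get(key);
    }

    int size() {
        return state.size();
    }
}
```

With a reduce function like your DistinctFunction, i.e. `(first, second) -> first`, this ends up with exactly one entry per key holding the first value seen, no matter how fast or slow the events arrive.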

> 
> On Thu, Oct 4, 2018 at 4:18 PM Stefan Richter <s.rich...@data-artisans.com> wrote:
> Hi,
> 
> you could take a look at Bravo [1] to query your savepoints and to check if 
> the state in the savepoint is complete w.r.t. your expectations. I somewhat doubt 
> that there is a general problem with the state/savepoints because many users 
> are successfully running it with large state and I am not aware of any data 
> loss problems, but nothing is impossible. What the savepoint does is also 
> straightforward: iterate a db snapshot and write all key/value pairs to 
> disk, so all data that was in the db at the time of the savepoint should 
> show up. I also doubt that the problem is about backpressure after restore, 
> because the job will only continue running after the state restore is already 
> completed. Did you check whether you are using exactly-once semantics or 
> at-least-once semantics? Also, did you check that the Kafka consumer start 
> position is configured properly [2]? Are watermarks generated as expected 
> after restore?
> 
> One more unrelated high-level comment that I have: for a granularity of 24h 
> windows, I wonder if it would not make sense to use a batch job instead?
> 
> Best,
> Stefan
> 
> [1] https://github.com/king/bravo
> [2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
> 
>> On 04.10.2018 at 14:53, Juho Autio <juho.au...@rovio.com> wrote:
>> 
>> Thanks for the suggestions!
>> 
>> > In general, it would be tremendously helpful to have a minimal working 
>> > example which allows to reproduce the problem.
>> 
>> Definitely. The problem with reproducing has been that this only seems to 
>> happen in the bigger production data volumes.
>> 
>> That's why I'm hoping to find a way to debug this with the production data. 
>> With that it seems to consistently cause some misses every time the job is 
>> killed/restored.
>> 
>> > check if it happens for shorter windows, like 1h etc
>> 
>> What would be the benefit of that compared to 24h window?
>> 
>> > simplify the job to not use a reduce window but simply a time window which 
>> > outputs the window events. Then counting the input and output events 
>> > should allow you to verify the results. If you are not seeing missing 
>> > events, then it could have something to do with the reducing state used in 
>> > the reduce function.
>> 
>> Hm, maybe, but not sure how useful that would be, because it wouldn't yet 
>> prove that it's related to reducing, because not having a reduce function 
>> could also mean smaller load on the job, which might alone be enough to make 
>> the problem not manifest.
>> 
>> Is there a way to debug what goes into the reducing state (including what 
>> gets removed or overwritten and what restored), if that makes sense..? Maybe 
>> some suitable logging could be used to prove that the lost data is written 
>> to the reducing state (or at least asked to be written), but not found any 
>> more when the window closes and state is flushed?
>> 
>> On configuration once more, we're using RocksDB state backend with 
>> asynchronous incremental checkpointing. The state is restored from 
>> savepoints though, we haven't been using those checkpoints in these tests 
>> (although they could be used in case of crashes – but we haven't had those 
>> now).
>> 
>> On Thu, Oct 4, 2018 at 3:25 PM Till Rohrmann <trohrm...@apache.org> wrote:
>> Hi Juho,
>> 
>> another idea to further narrow down the problem could be to simplify the job 
>> to not use a reduce window but simply a time window which outputs the window 
>> events. Then counting the input and output events should allow you to verify 
>> the results. If you are not seeing missing events, then it could have 
>> something to do with the reducing state used in the reduce function.
>> 
>> In general, it would be tremendously helpful to have a minimal working 
>> example which allows to reproduce the problem.
>> 
>> Cheers,
>> Till
>> 
>> On Thu, Oct 4, 2018 at 2:02 PM Andrey Zagrebin <and...@data-artisans.com> wrote:
>> Hi Juho,
>> 
>> can you try to reduce the job to a minimal reproducible example and share the 
>> job and input?
>> 
>> For example:
>> - some simple records as input, e.g. tuples of primitive types saved as CSV
>> - a minimal deduplication job which processes them and misses records
>> - check if it happens for shorter windows, like 1h etc.
>> - the setup which you use for the job, ideally locally reproducible or cloud
>> 
>> Best,
>> Andrey
>> 
>>> On 4 Oct 2018, at 11:13, Juho Autio <juho.au...@rovio.com> wrote:
>>> 
>>> Sorry to insist, but we seem to be blocked for any serious usage of state 
>>> in Flink if we can't rely on it to not miss data in case of restore.
>>> 
>>> Would anyone have suggestions for how to troubleshoot this? So far I have 
>>> verified with DEBUG logs that our reduce function gets to process also the 
>>> data that is missing from window output.
>>> 
>>> On Mon, Oct 1, 2018 at 11:56 AM Juho Autio <juho.au...@rovio.com> wrote:
>>> Hi Andrey,
>>> 
>>> To rule out for good any questions about sink behaviour, the job was killed 
>>> and started with an additional Kafka sink.
>>> 
>>> The same number of ids were missed in both outputs: KafkaSink & 
>>> BucketingSink.
>>> 
>>> I wonder what would be the next steps in debugging?
>>> 
>>> On Fri, Sep 21, 2018 at 3:49 PM Juho Autio <juho.au...@rovio.com> wrote:
>>> Thanks, Andrey.
>>> 
>>> > so it means that the savepoint does not lose at least some dropped 
>>> > records.
>>> 
>>> I'm not sure what you mean by that? I mean, it was known from the 
>>> beginning, that not everything is lost before/after restoring a savepoint, 
>>> just some records around the time of restoration. It's not 100% clear 
>>> whether records are lost before making a savepoint or after restoring it. 
>>> Although, based on the new DEBUG logs it seems more like losing some 
>>> records that are seen soon after restoring. It seems as if Flink were 
>>> somehow confused about the restored state vs. new inserts to state. 
>>> This could also be somehow linked to the high backpressure on the Kafka 
>>> source while the stream is catching up.
>>> 
>>> > If it is feasible for your setup, I suggest to insert one more map 
>>> > function after reduce and before sink.
>>> > etc.
>>> 
>>> Isn't that the same thing that we discussed before? Nothing is sent to 
>>> BucketingSink before the window closes, so I don't see how it would make 
>>> any difference if we replace the BucketingSink with a map function or 
>>> another sink type. We don't create or restore savepoints during the time 
>>> when BucketingSink gets input or has open buckets – that happens at a much 
>>> later time of day. I would focus on figuring out why the records are lost 
>>> while the window is open. But I don't know how to do that. Would you have 
>>> any additional suggestions?
>>> 
>>> On Fri, Sep 21, 2018 at 3:30 PM Andrey Zagrebin <and...@data-artisans.com> wrote:
>>> Hi Juho,
>>> 
>>> so it means that the savepoint does not lose at least some dropped records.
>>> 
>>> If it is feasible for your setup, I suggest inserting one more map function 
>>> after reduce and before sink. 
>>> The map function should be called right after the window is triggered but 
>>> before flushing to s3.
>>> The result of reduce (deduped record) could be logged there.
>>> This should allow us to check whether the processed distinct records were 
>>> buffered in the state after the restoration from the savepoint or not. If 
>>> they were buffered, we should see that there was an attempt to write them to 
>>> the sink from the state.
>>> 
>>> Another suggestion is to try to write records to some other sink or to 
>>> both. 
>>> E.g. if you can access file system of workers, maybe just into local files 
>>> and check whether the records are also dropped there.
>>> 
>>> Best,
>>> Andrey
>>> 
>>>> On 20 Sep 2018, at 15:37, Juho Autio <juho.au...@rovio.com> wrote:
>>>> 
>>>> Hi Andrey!
>>>> 
>>>> I was finally able to gather the DEBUG logs that you suggested. In short, 
>>>> the reducer logged that it processed at least some of the ids that were 
>>>> missing from the output.
>>>> 
>>>> "At least some", because I didn't have the job running with DEBUG logs for 
>>>> the full 24-hour window period. So I was only able to look up if I can 
>>>> find some of the missing ids in the DEBUG logs. Which I did indeed.
>>>> 
>>>> I changed the DistinctFunction.java to do this:
>>>> 
>>>>     @Override
>>>>     public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
>>>>         LOG.debug("DistinctFunction.reduce returns: {}={}", value1.get("field"), value1.get("id"));
>>>>         return value1;
>>>>     }
>>>> 
>>>> Then:
>>>> 
>>>> vi flink-1.6.0/conf/log4j.properties
>>>> log4j.logger.org.apache.flink.streaming.runtime.tasks.StreamTask=DEBUG
>>>> log4j.logger.com.rovio.ds.flink.uniqueid.DistinctFunction=DEBUG
>>>> 
>>>> Then I ran the following kind of test:
>>>> 
>>>> - Cancelled the on-going job with savepoint created at ~Sep 18 08:35 UTC 
>>>> 2018
>>>> - Started a new cluster & job with DEBUG enabled at ~09:13, restored from 
>>>> that previous cluster's savepoint
>>>> - Ran until caught up offsets
>>>> - Cancelled the job with a new savepoint
>>>> - Started a new job _without_ DEBUG, which restored the new savepoint, let 
>>>> it keep running so that it will eventually write the output
>>>> 
>>>> Then on the next day, after results had been flushed when the 24-hour 
>>>> window closed, I compared the results again with a batch version's output. 
>>>> And found some missing ids as usual.
>>>> 
>>>> I drilled down to one specific missing id (I'm replacing the actual value 
>>>> with AN12345 below), which was not found in the stream output, but was 
>>>> found in batch output & flink DEBUG logs.
>>>> 
>>>> Related to that id, I gathered the following information:
>>>> 
>>>> 2018-09-18~09:13:21,000 job started & savepoint is restored
>>>> 
>>>> 2018-09-18 09:14:29,085 missing id is processed for the first time, proved 
>>>> by this log line:
>>>> 2018-09-18 09:14:29,085 DEBUG com.rovio.ds.flink.uniqueid.DistinctFunction - DistinctFunction.reduce returns: s.aid1=AN12345
>>>> 
>>>> 2018-09-18 09:15:14,264 first synchronous part of checkpoint
>>>> 2018-09-18 09:15:16,544 first asynchronous part of checkpoint
>>>> 
>>>> (
>>>>    more occurrences of checkpoints (~1 min checkpointing time + ~1 min 
>>>> delay before next)
>>>>    /
>>>>    more occurrences of DistinctFunction.reduce
>>>> )
>>>> 
>>>> 2018-09-18 09:23:45,053 missing id is processed for the last time
>>>> 
>>>> 2018-09-18~10:20:00,000 savepoint created & job cancelled
>>>> 
>>>> To be noted, there was high backpressure after restoring from savepoint 
>>>> until the stream caught up with the Kafka offsets. However, our job 
>>>> assigns timestamps & watermarks on the Flink Kafka consumer itself, so 
>>>> event time of all partitions is synchronized. As expected, we don't get 
>>>> any late data in the late data side output.
>>>> 
>>>> From this we can see that the missing ids are processed by the reducer, 
>>>> but they must get lost somewhere before the 24-hour window is triggered.
>>>> 
>>>> I think it's worth mentioning once more that the stream doesn't miss any 
>>>> ids if we let it run without interruptions / state restoring.
>>>> 
>>>> What's next?
>>>> 
>>>> On Wed, Aug 29, 2018 at 3:49 PM Andrey Zagrebin <and...@data-artisans.com> wrote:
>>>> Hi Juho,
>>>> 
>>>> > only when the 24-hour window triggers, BucketingSink gets a burst of 
>>>> > input
>>>> 
>>>> This is of course totally true, my understanding is the same. We cannot 
>>>> exclude a problem there for sure, it is just that savepoints are used a lot 
>>>> without problem reports and BucketingSink is known to be problematic with s3. 
>>>> That is why I asked you:
>>>> 
>>>> > You also wrote that the timestamps of lost event are 'probably' around 
>>>> > the time of the savepoint, if it is not yet for sure I would also check 
>>>> > it.
>>>> 
>>>> Although, bucketing sink might lose any data at the end of the day (also 
>>>> from the middle). The fact that it is always around the time of taking a 
>>>> savepoint and not random is surely suspicious, and possible savepoint 
>>>> failures need to be investigated.
>>>> 
>>>> Regarding the s3 problem, s3 doc says:
>>>> 
>>>> > The caveat is that if you make a HEAD or GET request to the key name (to 
>>>> > find if the object exists) before creating the object, Amazon S3 
>>>> > provides 'eventual consistency' for read-after-write.
>>>> 
>>>> The algorithm you suggest is roughly how it is implemented now 
>>>> (BucketingSink.openNewPartFile). My understanding is that 'eventual 
>>>> consistency' means that even if you just created a file (its name is the key) 
>>>> it can be that you do not get it in the list, or exists (HEAD) returns false, 
>>>> and you risk overwriting the previous part.
>>>> 
>>>> The BucketingSink was designed for a standard file system. s3 is used over 
>>>> a file system wrapper atm but does not always provide normal file system 
>>>> guarantees. See also last example in [1].
>>>> 
>>>> Cheers,
>>>> Andrey
>>>> 
>>>> [1] https://codeburst.io/quick-explanation-of-the-s3-consistency-model-6c9f325e3f82
>>>> 
>>>>> On 29 Aug 2018, at 12:11, Juho Autio <juho.au...@rovio.com> wrote:
>>>>> 
>>>>> Andrey, thank you very much for the debugging suggestions, I'll try them.
>>>>> 
>>>>> In the meanwhile two more questions, please:
>>>>> 
>>>>> > Just to keep in mind this problem with s3 and exclude it for sure. I 
>>>>> > would also check whether the size of missing events is around the batch 
>>>>> > size of BucketingSink or not.
>>>>> 
>>>>> Fair enough, but I also want to focus on debugging the most probable 
>>>>> subject first. So what do you think about this – true or false: only when 
>>>>> the 24-hour window triggers, BucketingSink gets a burst of input. Around 
>>>>> the state restoring point (middle of the day) it doesn't get any input, 
>>>>> so it can't lose anything either. Isn't this true, or have I totally 
>>>>> missed how Flink works in triggering window results? I would not expect 
>>>>> there to be any optimization that speculatively triggers early results of 
>>>>> a regular time window to the downstream operators.
>>>>> 
>>>>> > The old BucketingSink has a general problem with s3. Internally 
>>>>> > BucketingSink queries s3 as a file system to list already written file 
>>>>> > parts (batches) and determine the index of the next part to start. Due to 
>>>>> > eventual consistency of checking file existence in s3 [1], the 
>>>>> > BucketingSink can overwrite the previously written part and basically 
>>>>> > lose it.
>>>>> 
>>>>> I was wondering, what does S3's "read-after-write consistency" (mentioned 
>>>>> on the page you linked) actually mean. It seems that this might be 
>>>>> possible:
>>>>> - LIST keys, find current max index
>>>>> - choose next index = max + 1
>>>>> - HEAD next index: if it exists, keep adding + 1 until key doesn't exist 
>>>>> on S3
>>>>> 
>>>>> But definitely sounds easier if a sink keeps track of files in a way 
>>>>> that's guaranteed to be consistent.
>>>>> 
>>>>> Cheers,
>>>>> Juho
>>>>> 
>>>>> On Mon, Aug 27, 2018 at 2:04 PM Andrey Zagrebin <and...@data-artisans.com> wrote:
>>>>> Hi,
>>>>> 
>>>>> true, StreamingFileSink does not support s3 in 1.6.0, it is planned for 
>>>>> the next 1.7 release, sorry for the confusion.
>>>>> The old BucketingSink has a general problem with s3. Internally 
>>>>> BucketingSink queries s3 as a file system 
>>>>> to list already written file parts (batches) and determine the index of the 
>>>>> next part to start. Due to eventual consistency of checking file 
>>>>> existence in s3 [1], the BucketingSink can overwrite the previously written 
>>>>> part and basically lose it. It should be fixed for StreamingFileSink in 
>>>>> 1.7 where Flink keeps its own track of written parts and does not rely on 
>>>>> s3 as a file system. 
>>>>> I am also including Kostas, he might add more details. 
>>>>> 
>>>>> Just to keep in mind this problem with s3 and exclude it for sure. I 
>>>>> would also check whether the size of missing events is around the batch 
>>>>> size of BucketingSink or not. You also wrote that the timestamps of lost 
>>>>> events are 'probably' around the time of the savepoint; if that is not yet 
>>>>> certain, I would also check it.
>>>>> 
>>>>> Have you already checked the log files of the job manager and task managers 
>>>>> for the job running before and after the restore from the checkpoint? Is 
>>>>> everything successful there, no errors, relevant warnings or exceptions?
>>>>> 
>>>>> As the next step, I would suggest logging all encountered events in 
>>>>> DistinctFunction.reduce, if possible for production data, and checking whether 
>>>>> the missed events are eventually processed before or after the savepoint. 
>>>>> The following log message indicates a border between the events that 
>>>>> should be included into the savepoint (logged before) or not:
>>>>> "{} ({}, synchronous part) in thread {} took {} ms" (template)
>>>>> Also check if the savepoint has been overall completed:
>>>>> "{} ({}, asynchronous part) in thread {} took {} ms."
>>>>> 
>>>>> Best,
>>>>> Andrey
>>>>> 
>>>>> [1] https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html
>>>>> 
>>>>>> On 24 Aug 2018, at 20:41, Juho Autio <juho.au...@rovio.com> wrote:
>>>>>> 
>>>>>> Hi,
>>>>>> 
>>>>>> Using StreamingFileSink is not a convenient option for production use 
>>>>>> for us as it doesn't support s3*. I could use StreamingFileSink just to 
>>>>>> verify, but I don't see much point in doing so. Please consider my 
>>>>>> previous comment:
>>>>>> 
>>>>>> > I realized that BucketingSink must not play any role in this problem. 
>>>>>> > This is because only when the 24-hour window triggers, BucketingSink 
>>>>>> > gets a burst of input. Around the state restoring point (middle of the 
>>>>>> > day) it doesn't get any input, so it can't lose anything either 
>>>>>> > (right?).
>>>>>> 
>>>>>> I could also use a kafka sink instead, but I can't imagine how there 
>>>>>> could be any difference. It's very real that the sink doesn't get any 
>>>>>> input for a long time until the 24-hour window closes, and then it 
>>>>>> quickly writes out everything because it's not that much data eventually 
>>>>>> for the distinct values.
>>>>>> 
>>>>>> Any ideas for debugging what's happening around the savepoint & 
>>>>>> restoration time?
>>>>>> 
>>>>>> *) I actually implemented StreamingFileSink as an alternative sink. This 
>>>>>> was before I came to realize that most likely the sink component has 
>>>>>> nothing to do with the data loss problem. I tried it with s3n:// path 
>>>>>> just to see an exception being thrown. In the source code I indeed then 
>>>>>> found an explicit check for the target path scheme to be "hdfs://". 
>>>>>> 
>>>>>> On Fri, Aug 24, 2018 at 7:49 PM Andrey Zagrebin <and...@data-artisans.com> wrote:
>>>>>> Ok, I think before further debugging the window reduced state, 
>>>>>> could you try the new 'StreamingFileSink' [1] introduced in Flink 1.6.0 
>>>>>> instead of the previous 'BucketingSink'?
>>>>>> 
>>>>>> Cheers,
>>>>>> Andrey
>>>>>> 
>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
>>>>>> 
>>>>>>> On 24 Aug 2018, at 18:03, Juho Autio <juho.au...@rovio.com> wrote:
>>>>>>> 
>>>>>>> Yes, sorry for my confusing comment. I just meant that it seems like 
>>>>>>> there's a bug somewhere now that the output is missing some data.
>>>>>>> 
>>>>>>> > I would wait and check the actual output in s3 because it is the main 
>>>>>>> > result of the job
>>>>>>> 
>>>>>>> Yes, and that's what I have already done. There seems to be always some 
>>>>>>> data loss with the production data volumes, if the job has been 
>>>>>>> restarted on that day.
>>>>>>> 
>>>>>>> Would you have any suggestions for how to debug this further?
>>>>>>> 
>>>>>>> Many thanks for stepping in.
>>>>>>> 
>>>>>>> On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin <and...@data-artisans.com> wrote:
>>>>>>> Hi Juho,
>>>>>>> 
>>>>>>> So it is a per key deduplication job.
>>>>>>> 
>>>>>>> Yes, I would wait and check the actual output in s3 because it is the 
>>>>>>> main result of the job and
>>>>>>> 
>>>>>>> > The late data around the time of taking savepoint might be not 
>>>>>>> > included into the savepoint but it should be behind the snapshotted 
>>>>>>> > offset in Kafka.
>>>>>>> 
>>>>>>> is not a bug, it is a possible behaviour.
>>>>>>> 
>>>>>>> The savepoint is a snapshot of the data in transit which is already 
>>>>>>> consumed from Kafka.
>>>>>>> Basically the full contents of the window result are split between the 
>>>>>>> savepoint and what can come after the savepoint'ed offset in Kafka but 
>>>>>>> before the window result is written into s3. 
>>>>>>> 
>>>>>>> Allowed lateness should not affect it, I am just saying that the final 
>>>>>>> result in s3 should include all records after it. 
>>>>>>> This is what should be guaranteed but not the contents of the 
>>>>>>> intermediate savepoint.
>>>>>>> 
>>>>>>> Cheers,
>>>>>>> Andrey
>>>>>>> 
>>>>>>>> On 24 Aug 2018, at 16:52, Juho Autio <juho.au...@rovio.com> wrote:
>>>>>>>> 
>>>>>>>> Thanks for your answer!
>>>>>>>> 
>>>>>>>> I check for the missed data from the final output on s3. So I wait 
>>>>>>>> until the next day, then run the same thing re-implemented in batch, 
>>>>>>>> and compare the output.
>>>>>>>> 
>>>>>>>> > The late data around the time of taking savepoint might be not 
>>>>>>>> > included into the savepoint but it should be behind the snapshotted 
>>>>>>>> > offset in Kafka.
>>>>>>>> 
>>>>>>>> Yes, I would definitely expect that. It seems like there's a bug 
>>>>>>>> somewhere.
>>>>>>>> 
>>>>>>>> > Then it should just come later after the restore and should be 
>>>>>>>> > reduced within the allowed lateness into the final result which is 
>>>>>>>> > saved into s3.
>>>>>>>> 
>>>>>>>> Well, as far as I know, allowed lateness doesn't play any role here, 
>>>>>>>> because I started running the job with allowedLateness=0, and still 
>>>>>>>> get the data loss, while my late data output doesn't receive anything.
>>>>>>>> 
>>>>>>>> > Also, is this `DistinctFunction.reduce` just an example or the 
>>>>>>>> > actual implementation, basically saving just one of records inside 
>>>>>>>> > the 24h window in s3? then what is missing there?
>>>>>>>> 
>>>>>>>> Yes, it's the actual implementation. Note that there's a keyBy before 
>>>>>>>> the DistinctFunction. So there's one record for each key (which is the 
>>>>>>>> combination of a couple of fields). In practice I've seen that we're 
>>>>>>>> missing ~2000-4000 elements on each restore, and the total output is 
>>>>>>>> obviously much more than that.
>>>>>>>> 
>>>>>>>> Here's the full code for the key selector:
>>>>>>>> 
>>>>>>>> public class MapKeySelector implements KeySelector<Map<String,String>, Object> {
>>>>>>>> 
>>>>>>>>     private final String[] fields;
>>>>>>>> 
>>>>>>>>     public MapKeySelector(String... fields) {
>>>>>>>>         this.fields = fields;
>>>>>>>>     }
>>>>>>>> 
>>>>>>>>     @Override
>>>>>>>>     public Object getKey(Map<String, String> event) throws Exception {
>>>>>>>>         Tuple key = Tuple.getTupleClass(fields.length).newInstance();
>>>>>>>>         for (int i = 0; i < fields.length; i++) {
>>>>>>>>             key.setField(event.getOrDefault(fields[i], ""), i);
>>>>>>>>         }
>>>>>>>>         return key;
>>>>>>>>     }
>>>>>>>> }
>>>>>>>> 
>>>>>>>> And a more exact example on how it's used:
>>>>>>>> 
>>>>>>>>                 .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE"))
>>>>>>>>                 .timeWindow(Time.days(1))
>>>>>>>>                 .reduce(new DistinctFunction())
>>>>>>>> 
>>>>>>>> On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin <and...@data-artisans.com> wrote:
>>>>>>>> Hi Juho,
>>>>>>>> 
>>>>>>>> Where exactly does the data go missing? When do you notice that? 
>>>>>>>> Do you check it by:
>>>>>>>> - debugging `DistinctFunction.reduce` right after resume in the middle 
>>>>>>>> of the day 
>>>>>>>> or 
>>>>>>>> - finding that some distinct records are missing in the final output of 
>>>>>>>> BucketingSink in s3 after the window result is actually triggered and 
>>>>>>>> saved into s3 at the end of the day? Is this the main output?
>>>>>>>> 
>>>>>>>> The late data around the time of taking savepoint might be not 
>>>>>>>> included into the savepoint but it should be behind the snapshotted 
>>>>>>>> offset in Kafka. Then it should just come later after the restore and 
>>>>>>>> should be reduced within the allowed lateness into the final result 
>>>>>>>> which is saved into s3.
>>>>>>>> 
>>>>>>>> Also, is this `DistinctFunction.reduce` just an example or the actual 
>>>>>>>> implementation, basically saving just one of records inside the 24h 
>>>>>>>> window in s3? then what is missing there?
>>>>>>>> 
>>>>>>>> Cheers,
>>>>>>>> Andrey
>>>>>>>> 
>>>>>>>>> On 23 Aug 2018, at 15:42, Juho Autio <juho.au...@rovio.com> wrote:
>>>>>>>>> 
>>>>>>>>> I changed to allowedLateness=0, no change, still missing data when 
>>>>>>>>> restoring from savepoint.
>>>>>>>>> 
>>>>>>>>> On Tue, Aug 21, 2018 at 10:43 AM Juho Autio <juho.au...@rovio.com> wrote:
>>>>>>>>> I realized that BucketingSink must not play any role in this problem. 
>>>>>>>>> This is because only when the 24-hour window triggers, BucketingSink 
>>>>>>>>> gets a burst of input. Around the state restoring point (middle of 
>>>>>>>>> the day) it doesn't get any input, so it can't lose anything either 
>>>>>>>>> (right?).
>>>>>>>>> 
>>>>>>>>> I will next try removing the allowedLateness entirely from the 
>>>>>>>>> equation.
>>>>>>>>> 
>>>>>>>>> In the meanwhile, please let me know if you have any suggestions for 
>>>>>>>>> debugging the lost data, for example what logs to enable.
>>>>>>>>> 
>>>>>>>>> We use FlinkKafkaConsumer010 btw. Are there any known issues with 
>>>>>>>>> that, that could contribute to lost data when restoring a savepoint?
>>>>>>>>> 
>>>>>>>>> On Fri, Aug 17, 2018 at 4:23 PM Juho Autio <juho.au...@rovio.com> wrote:
>>>>>>>>> Some data is silently lost on my Flink stream job when state is 
>>>>>>>>> restored from a savepoint.
>>>>>>>>> 
>>>>>>>>> Do you have any debugging hints to find out where exactly the data 
>>>>>>>>> gets dropped?
>>>>>>>>> 
>>>>>>>>> My job gathers distinct values using a 24-hour window. It doesn't 
>>>>>>>>> have any custom state management.
>>>>>>>>> 
>>>>>>>>> When I cancel the job with savepoint and restore from that savepoint, 
>>>>>>>>> some data is missed. It seems to be losing just a small amount of 
>>>>>>>>> data. The event time of lost data is probably around the time of 
>>>>>>>>> savepoint. In other words the rest of the time window is not entirely 
>>>>>>>>> missed – collection works correctly also for (most of the) events 
>>>>>>>>> that come in after restoring.
>>>>>>>>> 
>>>>>>>>> When the job processes a full 24-hour window without interruptions it 
>>>>>>>>> doesn't miss anything.
>>>>>>>>> 
>>>>>>>>> Usually the problem doesn't happen in test environments that have 
>>>>>>>>> smaller parallelism and smaller data volumes. But in production 
>>>>>>>>> volumes the job seems to be consistently missing at least something 
>>>>>>>>> on every restore.
>>>>>>>>> 
>>>>>>>>> This issue has consistently happened since the job was initially 
>>>>>>>>> created. It was at first run on an older version of Flink 
>>>>>>>>> 1.5-SNAPSHOT and it still happens on both Flink 1.5.2 & 1.6.0.
>>>>>>>>> 
>>>>>>>>> I'm wondering if this could be for example some synchronization issue 
>>>>>>>>> between the kafka consumer offsets vs. what's been written by 
>>>>>>>>> BucketingSink?
>>>>>>>>> 
>>>>>>>>> 1. Job content, simplified
>>>>>>>>> 
>>>>>>>>>         kafkaStream
>>>>>>>>>                 .flatMap(new ExtractFieldsFunction())
>>>>>>>>>                 .keyBy(new MapKeySelector(1, 2, 3, 4))
>>>>>>>>>                 .timeWindow(Time.days(1))
>>>>>>>>>                 .allowedLateness(allowedLateness)
>>>>>>>>>                 .sideOutputLateData(lateDataTag)
>>>>>>>>>                 .reduce(new DistinctFunction())
>>>>>>>>>                 .addSink(sink)
>>>>>>>>>                 // use a fixed number of output partitions
>>>>>>>>>                 .setParallelism(8);
>>>>>>>>> 
>>>>>>>>> /**
>>>>>>>>>  * Usage: .keyBy("the", "distinct", "fields").reduce(new DistinctFunction())
>>>>>>>>>  */
>>>>>>>>> public class DistinctFunction implements ReduceFunction<java.util.Map<String, String>> {
>>>>>>>>>     @Override
>>>>>>>>>     public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
>>>>>>>>>         return value1;
>>>>>>>>>     }
>>>>>>>>> }
>>>>>>>>> 
>>>>>>>>> 2. State configuration
>>>>>>>>> 
>>>>>>>>> boolean enableIncrementalCheckpointing = true;
>>>>>>>>> String statePath = "s3n://bucket/savepoints";
>>>>>>>>> new RocksDBStateBackend(statePath, enableIncrementalCheckpointing);
>>>>>>>>> 
>>>>>>>>> Checkpointing Mode    Exactly Once
>>>>>>>>> Interval      1m 0s
>>>>>>>>> Timeout       10m 0s
>>>>>>>>> Minimum Pause Between Checkpoints     1m 0s
>>>>>>>>> Maximum Concurrent Checkpoints        1
>>>>>>>>> Persist Checkpoints Externally        Enabled (retain on cancellation)
>>>>>>>>> 
>>>>>>>>> 3. BucketingSink configuration
>>>>>>>>> 
>>>>>>>>> We use BucketingSink, I don't think there's anything special here, if 
>>>>>>>>> not the fact that we're writing to S3.
>>>>>>>>> 
>>>>>>>>>         String outputPath = "s3://bucket/output";
>>>>>>>>>         BucketingSink<Map<String, String>> sink = new BucketingSink<Map<String, String>>(outputPath)
>>>>>>>>>                 .setBucketer(new ProcessdateBucketer())
>>>>>>>>>                 .setBatchSize(batchSize)
>>>>>>>>>                 .setInactiveBucketThreshold(inactiveBucketThreshold)
>>>>>>>>>                 .setInactiveBucketCheckInterval(inactiveBucketCheckInterval);
>>>>>>>>>         sink.setWriter(new IdJsonWriter());
>>>>>>>>> 
>>>>>>>>> 4. Kafka & event time
>>>>>>>>> 
>>>>>>>>> My flink job reads the data from Kafka, using a 
>>>>>>>>> BoundedOutOfOrdernessTimestampExtractor on the kafka consumer to 
>>>>>>>>> synchronize watermarks across all Kafka partitions. We also write 
>>>>>>>>> late data to side output, but nothing is written there – if it were, 
>>>>>>>>> it could explain missed data in the main output (I'm also sure that 
>>>>>>>>> our late data writing works, because we previously had some actual 
>>>>>>>>> late data which ended up there).
>>>>>>>>> 
>>>>>>>>> 5. allowedLateness
>>>>>>>>> 
>>>>>>>>> It may be or may not be relevant that I have also enabled 
>>>>>>>>> allowedLateness with 1 minute lateness on the 24-hour window:
>>>>>>>>> 
>>>>>>>>> If that makes sense, I could try removing allowedLateness entirely? 
>>>>>>>>> That would be just to rule out that Flink has a bug that's 
>>>>>>>>> related to restoring state in combination with the allowedLateness 
>>>>>>>>> feature. After all, all of our data should be in a good enough order 
>>>>>>>>> to not be late, given the max out-of-orderness used on the Kafka consumer 
>>>>>>>>> timestamp extractor.
>>>>>>>>> 
>>>>>>>>> Thank you in advance!
>>>>>>>>> 
