Good then, I'll try to analyze the savepoints with Bravo. Thanks! > How would you assume that backpressure would influence your updates? Updates to each local state still happen event-by-event, in a single reader/writing thread.
Sure, just an ignorant guess by me. I'm not familiar with most of Flink's internals. Anyway, high backpressure is not seen on this job after it has caught up with the lag, so I thought it would be worth mentioning. On Thu, Oct 4, 2018 at 6:24 PM Stefan Richter <s.rich...@data-artisans.com> wrote: > Hi, > > Am 04.10.2018 um 16:08 schrieb Juho Autio <juho.au...@rovio.com>: > > > you could take a look at Bravo [1] to query your savepoints and to check > if the state in the savepoint is complete w.r.t your expectations > > Thanks. I'm not 100% sure if this is the case, but to me it seemed like the > missed ids were being logged by the reducer soon after the job had started > (after restoring a savepoint). But on the other hand, after that I also > made another savepoint & restored that, so what I could check is: does that > next savepoint have the missed ids that were logged (a couple of minutes > before the savepoint was created, so there should've been more than enough > time to add them to the state before the savepoint was triggered) or not. > Anyway, if I would be able to verify with Bravo that the ids are missing > from the savepoint (even though the reducer logged that it saw them), would > that help in figuring out where they are lost? Is there some major > difference compared to just looking at the final output after window has > been triggered? > > > > I think that makes a difference. For example, you can investigate if there > is a state loss or a problem with the windowing. In the savepoint you could > see which keys exist and to which windows they are assigned. Also just to > make sure there is no misunderstanding: only elements that are in the state > at the start of a savepoint are expected to be part of the savepoint; all > elements between start and completion of the savepoint are not expected to > be part of the savepoint. > > > > I also doubt that the problem is about backpressure after restore, > because the job will only continue running after the state restore is > already completed. > > Yes, I'm not suspecting that the state restoring would be the problem > either. My concern was about backpressure possibly messing with the updates > of reducing state? I would tend to suspect that updating the state > consistently is what fails, where heavy load / backpressure might be a > factor. > > > > How would you assume that backpressure would influence your updates? > Updates to each local state still happen event-by-event, in a single > reader/writing thread. > > > On Thu, Oct 4, 2018 at 4:18 PM Stefan Richter <s.rich...@data-artisans.com> > wrote: > >> Hi, >> >> you could take a look at Bravo [1] to query your savepoints and to check >> if the state in the savepoint is complete w.r.t your expectations. I somewhat >> doubt that there is a general problem with the state/savepoints because >> many users are successfully running it on a large state and I am not aware >> of any data loss problems, but nothing is impossible. What the savepoint >> does is also straight forward: iterate a db snapshot and write all >> key/value pairs to disk, so all data that was in the db at the time of the >> savepoint, should show up. I also doubt that the problem is about >> backpressure after restore, because the job will only continue running >> after the state restore is already completed. Did you check if you are >> using exactly-once semantics or at-least-once semantics? Also did you check >> that the kafka consumer start position is configured properly [2]?
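(Both of those are explicit settings. A minimal sketch of what they could look like, assuming a FlinkKafkaConsumer010 with a SimpleStringSchema and placeholder broker/topic/group names; the one-minute interval matches the checkpoint configuration quoted further down in the thread:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// exactly-once checkpointing every 60 s; at-least-once would use CheckpointingMode.AT_LEAST_ONCE
env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

Properties props = new Properties();
props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
props.setProperty("group.id", "distinct-ids");        // placeholder

FlinkKafkaConsumer010<String> consumer =
    new FlinkKafkaConsumer010<>("events", new SimpleStringSchema(), props); // "events" is a placeholder topic
// The start position only matters for a fresh start without restored state;
// offsets contained in a savepoint/checkpoint take precedence on restore.
consumer.setStartFromGroupOffsets();
)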
Are >> watermarks generated as expected after restore? >> >> One more unrelated high-level comment that I have: for a granularity of >> 24h windows, I wonder if it would not make sense to use a batch job instead? >> >> Best, >> Stefan >> >> [1] https://github.com/king/bravo >> [2] >> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration >> >> Am 04.10.2018 um 14:53 schrieb Juho Autio <juho.au...@rovio.com>: >> >> Thanks for the suggestions! >> >> > In general, it would be tremendously helpful to have a minimal working >> example which allows to reproduce the problem. >> >> Definitely. The problem with reproducing has been that this only seems to >> happen in the bigger production data volumes. >> >> That's why I'm hoping to find a way to debug this with the production >> data. With that it seems to consistently cause some misses every time the >> job is killed/restored. >> >> > check if it happens for shorter windows, like 1h etc >> >> What would be the benefit of that compared to 24h window? >> >> > simplify the job to not use a reduce window but simply a time window >> which outputs the window events. Then counting the input and output events >> should allow you to verify the results. If you are not seeing missing >> events, then it could have something to do with the reducing state used in >> the reduce function. >> >> Hm, maybe, but not sure how useful that would be, because it wouldn't yet >> prove that it's related to reducing, because not having a reduce function >> could also mean smaller load on the job, which might alone be enough to >> make the problem not manifest. >> >> Is there a way to debug what goes into the reducing state (including what >> gets removed or overwritten and what restored), if that makes sense..? >> Maybe some suitable logging could be used to prove that the lost data is >> written to the reducing state (or at least asked to be written), but not >> found any more when the window closes and state is flushed? >> >> On configuration once more, we're using RocksDB state backend with >> asynchronous incremental checkpointing. The state is restored from >> savepoints though, we haven't been using those checkpoints in these tests >> (although they could be used in case of crashes – but we haven't had those >> now). >> >> On Thu, Oct 4, 2018 at 3:25 PM Till Rohrmann <trohrm...@apache.org> >> wrote: >> >>> Hi Juho, >>> >>> another idea to further narrow down the problem could be to simplify the >>> job to not use a reduce window but simply a time window which outputs the >>> window events. Then counting the input and output events should allow you >>> to verify the results. If you are not seeing missing events, then it could >>> have something to do with the reducing state used in the reduce function. >>> >>> In general, it would be tremendously helpful to have a minimal working >>> example which allows to reproduce the problem. >>> >>> Cheers, >>> Till >>> >>> On Thu, Oct 4, 2018 at 2:02 PM Andrey Zagrebin <and...@data-artisans.com> >>> wrote: >>> >>>> Hi Juho, >>>> >>>> can you try to reduce the job to minimal reproducible example and share >>>> the job and input? >>>> >>>> For example: >>>> - some simple records as input, e.g. 
tuples of primitive types saved as >>>> cvs >>>> - minimal deduplication job which processes them and misses records >>>> - check if it happens for shorter windows, like 1h etc >>>> - setup which you use for the job, ideally locally reproducible or cloud >>>> >>>> Best, >>>> Andrey >>>> >>>> On 4 Oct 2018, at 11:13, Juho Autio <juho.au...@rovio.com> wrote: >>>> >>>> Sorry to insist, but we seem to be blocked for any serious usage of >>>> state in Flink if we can't rely on it to not miss data in case of restore. >>>> >>>> Would anyone have suggestions for how to troubleshoot this? So far I >>>> have verified with DEBUG logs that our reduce function gets to process also >>>> the data that is missing from window output. >>>> >>>> On Mon, Oct 1, 2018 at 11:56 AM Juho Autio <juho.au...@rovio.com> >>>> wrote: >>>> >>>>> Hi Andrey, >>>>> >>>>> To rule out for good any questions about sink behaviour, the job was >>>>> killed and started with an additional Kafka sink. >>>>> >>>>> The same number of ids were missed in both outputs: KafkaSink & >>>>> BucketingSink. >>>>> >>>>> I wonder what would be the next steps in debugging? >>>>> >>>>> On Fri, Sep 21, 2018 at 3:49 PM Juho Autio <juho.au...@rovio.com> >>>>> wrote: >>>>> >>>>>> Thanks, Andrey. >>>>>> >>>>>> > so it means that the savepoint does not loose at least some dropped >>>>>> records. >>>>>> >>>>>> I'm not sure what you mean by that? I mean, it was known from the >>>>>> beginning, that not everything is lost before/after restoring a >>>>>> savepoint, >>>>>> just some records around the time of restoration. It's not 100% clear >>>>>> whether records are lost before making a savepoint or after restoring it. >>>>>> Although, based on the new DEBUG logs it seems more like losing some >>>>>> records that are seen ~soon after restoring. It seems like Flink would be >>>>>> somehow confused either about the restored state vs. new inserts to >>>>>> state. >>>>>> This could also be somehow linked to the high back pressure on the kafka >>>>>> source while the stream is catching up. >>>>>> >>>>>> > If it is feasible for your setup, I suggest to insert one more map >>>>>> function after reduce and before sink. >>>>>> > etc. >>>>>> >>>>>> Isn't that the same thing that we discussed before? Nothing is sent >>>>>> to BucketingSink before the window closes, so I don't see how it would >>>>>> make >>>>>> any difference if we replace the BucketingSink with a map function or >>>>>> another sink type. We don't create or restore savepoints during the time >>>>>> when BucketingSink gets input or has open buckets – that happens at a >>>>>> much >>>>>> later time of day. I would focus on figuring out why the records are lost >>>>>> while the window is open. But I don't know how to do that. Would you have >>>>>> any additional suggestions? >>>>>> >>>>>> On Fri, Sep 21, 2018 at 3:30 PM Andrey Zagrebin < >>>>>> and...@data-artisans.com> wrote: >>>>>> >>>>>>> Hi Juho, >>>>>>> >>>>>>> so it means that the savepoint does not loose at least some dropped >>>>>>> records. >>>>>>> >>>>>>> If it is feasible for your setup, I suggest to insert one more map >>>>>>> function after reduce and before sink. >>>>>>> The map function should be called right after window is triggered >>>>>>> but before flushing to s3. >>>>>>> The result of reduce (deduped record) could be logged there. >>>>>>> This should allow to check whether the processed distinct records >>>>>>> were buffered in the state after the restoration from the savepoint or >>>>>>> not. 
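(A minimal sketch of that extra map, assuming an SLF4J LOG field and the keyBy/window/sink from the job shown further down the thread; the map is only invoked when the 24-hour window fires, so anything it logs is a record that the window state actually handed to the sink:

kafkaStream
    .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE"))
    .timeWindow(Time.days(1))
    .reduce(new DistinctFunction())
    // invoked only at window firing time, right before the record reaches the sink
    .map(new MapFunction<Map<String, String>, Map<String, String>>() {
        @Override
        public Map<String, String> map(Map<String, String> value) {
            LOG.info("window result about to be sunk: {}", value);
            return value;
        }
    })
    .addSink(sink);
)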
>>>>>>> If they were buffered we should see that there was an attempt to write >>>>>>> them >>>>>>> to the sink from the state. >>>>>>> >>>>>>> Another suggestion is to try to write records to some other sink or >>>>>>> to both. >>>>>>> E.g. if you can access file system of workers, maybe just into local >>>>>>> files and check whether the records are also dropped there. >>>>>>> >>>>>>> Best, >>>>>>> Andrey >>>>>>> >>>>>>> On 20 Sep 2018, at 15:37, Juho Autio <juho.au...@rovio.com> wrote: >>>>>>> >>>>>>> Hi Andrey! >>>>>>> >>>>>>> I was finally able to gather the DEBUG logs that you suggested. In >>>>>>> short, the reducer logged that it processed at least some of the ids >>>>>>> that >>>>>>> were missing from the output. >>>>>>> >>>>>>> "At least some", because I didn't have the job running with DEBUG >>>>>>> logs for the full 24-hour window period. So I was only able to look up >>>>>>> if I >>>>>>> can find *some* of the missing ids in the DEBUG logs. Which I did >>>>>>> indeed. >>>>>>> >>>>>>> I changed the DistinctFunction.java to do this: >>>>>>> >>>>>>> @Override >>>>>>> public Map<String, String> reduce(Map<String, String> value1, >>>>>>> Map<String, String> value2) { >>>>>>> LOG.debug("DistinctFunction.reduce returns: {}={}", >>>>>>> value1.get("field"), value1.get("id")); >>>>>>> return value1; >>>>>>> } >>>>>>> >>>>>>> Then: >>>>>>> >>>>>>> vi flink-1.6.0/conf/log4j.properties >>>>>>> >>>>>>> log4j.logger.org.apache.flink.streaming.runtime.tasks.StreamTask=DEBUG >>>>>>> log4j.logger.com.rovio.ds.flink.uniqueid.DistinctFunction=DEBUG >>>>>>> >>>>>>> Then I ran the following kind of test: >>>>>>> >>>>>>> - Cancelled the on-going job with savepoint created at ~Sep 18 08:35 >>>>>>> UTC 2018 >>>>>>> - Started a new cluster & job with DEBUG enabled at ~09:13, restored >>>>>>> from that previous cluster's savepoint >>>>>>> - Ran until caught up offsets >>>>>>> - Cancelled the job with a new savepoint >>>>>>> - Started a new job _without_ DEBUG, which restored the new >>>>>>> savepoint, let it keep running so that it will eventually write the >>>>>>> output >>>>>>> >>>>>>> Then on the next day, after results had been flushed when the >>>>>>> 24-hour window closed, I compared the results again with a batch >>>>>>> version's >>>>>>> output. And found some missing ids as usual. >>>>>>> >>>>>>> I drilled down to one specific missing id (I'm replacing the actual >>>>>>> value with AN12345 below), which was not found in the stream output, but >>>>>>> was found in batch output & flink DEBUG logs. 
>>>>>>> >>>>>>> Related to that id, I gathered the following information: >>>>>>> >>>>>>> 2018-09-18~09:13:21,000 job started & savepoint is restored >>>>>>> >>>>>>> 2018-09-18 09:14:29,085 missing id is processed for the first time, >>>>>>> proved by this log line: >>>>>>> 2018-09-18 09:14:29,085 DEBUG >>>>>>> com.rovio.ds.flink.uniqueid.DistinctFunction - >>>>>>> DistinctFunction.reduce returns: s.aid1=AN12345 >>>>>>> >>>>>>> 2018-09-18 09:15:14,264 first synchronous part of checkpoint >>>>>>> 2018-09-18 09:15:16,544 first asynchronous part of checkpoint >>>>>>> >>>>>>> ( >>>>>>> more occurrences of checkpoints (~1 min checkpointing time + ~1 min >>>>>>> delay before next) >>>>>>> / >>>>>>> more occurrences of DistinctFunction.reduce >>>>>>> ) >>>>>>> >>>>>>> 2018-09-18 09:23:45,053 missing id is processed for the last time >>>>>>> >>>>>>> 2018-09-18~10:20:00,000 savepoint created & job cancelled >>>>>>> >>>>>>> To be noted, there was high backpressure after restoring from >>>>>>> savepoint until the stream caught up with the kafka offsets. Although, >>>>>>> our >>>>>>> job uses assign timestamps & watermarks on the flink kafka consumer >>>>>>> itself, >>>>>>> so event time of all partitions is synchronized. As expected, we don't >>>>>>> get >>>>>>> any late data in the late data side output. >>>>>>> >>>>>>> From this we can see that the missing ids are processed by the >>>>>>> reducer, but they must get lost somewhere before the 24-hour window is >>>>>>> triggered. >>>>>>> >>>>>>> I think it's worth mentioning once more that the stream doesn't miss >>>>>>> any ids if we let it's running without interruptions / state restoring. >>>>>>> >>>>>>> What's next? >>>>>>> >>>>>>> On Wed, Aug 29, 2018 at 3:49 PM Andrey Zagrebin < >>>>>>> and...@data-artisans.com> wrote: >>>>>>> >>>>>>>> Hi Juho, >>>>>>>> >>>>>>>> > only when the 24-hour window triggers, BucketingSink gets a burst >>>>>>>> of input >>>>>>>> >>>>>>>> This is of course totally true, my understanding is the same. We >>>>>>>> cannot exclude problem there for sure, just savepoints are used a lot >>>>>>>> w/o >>>>>>>> problem reports and BucketingSink is known to be problematic with s3. >>>>>>>> That >>>>>>>> is why, I asked you: >>>>>>>> >>>>>>>> > You also wrote that the timestamps of lost event are 'probably' >>>>>>>> around the time of the savepoint, if it is not yet for sure I would >>>>>>>> also >>>>>>>> check it. >>>>>>>> >>>>>>>> Although, bucketing sink might loose any data at the end of the day >>>>>>>> (also from the middle). The fact, that it is always around the time of >>>>>>>> taking a savepoint and not random, is surely suspicious and possible >>>>>>>> savepoint failures need to be investigated. >>>>>>>> >>>>>>>> Regarding the s3 problem, s3 doc says: >>>>>>>> >>>>>>>> > The caveat is that if you make a HEAD or GET request to the key >>>>>>>> name (to find if the object exists) before creating the object, Amazon >>>>>>>> S3 >>>>>>>> provides 'eventual consistency' for read-after-write. >>>>>>>> >>>>>>>> The algorithm you suggest is how it is roughly implemented now >>>>>>>> (BucketingSink.openNewPartFile). My understanding is that >>>>>>>> 'eventual consistency’ means that even if you just created file (its >>>>>>>> name >>>>>>>> is key) it can be that you do not get it in the list or exists (HEAD) >>>>>>>> returns false and you risk to rewrite the previous part. >>>>>>>> >>>>>>>> The BucketingSink was designed for a standard file system. 
s3 is >>>>>>>> used over a file system wrapper atm but does not always provide normal >>>>>>>> file >>>>>>>> system guarantees. See also last example in [1]. >>>>>>>> >>>>>>>> Cheers, >>>>>>>> Andrey >>>>>>>> >>>>>>>> [1] >>>>>>>> https://codeburst.io/quick-explanation-of-the-s3-consistency-model-6c9f325e3f82 >>>>>>>> >>>>>>>> On 29 Aug 2018, at 12:11, Juho Autio <juho.au...@rovio.com> wrote: >>>>>>>> >>>>>>>> Andrey, thank you very much for the debugging suggestions, I'll try >>>>>>>> them. >>>>>>>> >>>>>>>> In the meanwhile two more questions, please: >>>>>>>> >>>>>>>> > Just to keep in mind this problem with s3 and exclude it for >>>>>>>> sure. I would also check whether the size of missing events is around >>>>>>>> the >>>>>>>> batch size of BucketingSink or not. >>>>>>>> >>>>>>>> Fair enough, but I also want to focus on debugging the most >>>>>>>> probable subject first. So what do you think about this – true or >>>>>>>> false: >>>>>>>> only when the 24-hour window triggers, BucketinSink gets a burst of >>>>>>>> input. >>>>>>>> Around the state restoring point (middle of the day) it doesn't get any >>>>>>>> input, so it can't lose anything either. Isn't this true, or have I >>>>>>>> totally >>>>>>>> missed how Flink works in triggering window results? I would not expect >>>>>>>> there to be any optimization that speculatively triggers early results >>>>>>>> of a >>>>>>>> regular time window to the downstream operators. >>>>>>>> >>>>>>>> > The old BucketingSink has in general problem with s3. Internally >>>>>>>> BucketingSink queries s3 as a file system to list already written file >>>>>>>> parts (batches) and determine index of the next part to start. Due to >>>>>>>> eventual consistency of checking file existence in s3 [1], the >>>>>>>> BucketingSink can rewrite the previously written part and basically >>>>>>>> loose >>>>>>>> it. >>>>>>>> >>>>>>>> I was wondering, what does S3's "read-after-write consistency" >>>>>>>> (mentioned on the page you linked) actually mean. It seems that this >>>>>>>> might >>>>>>>> be possible: >>>>>>>> - LIST keys, find current max index >>>>>>>> - choose next index = max + 1 >>>>>>>> - HEAD next index: if it exists, keep adding + 1 until key doesn't >>>>>>>> exist on S3 >>>>>>>> >>>>>>>> But definitely sounds easier if a sink keeps track of files in a >>>>>>>> way that's guaranteed to be consistent. >>>>>>>> >>>>>>>> Cheers, >>>>>>>> Juho >>>>>>>> >>>>>>>> On Mon, Aug 27, 2018 at 2:04 PM Andrey Zagrebin < >>>>>>>> and...@data-artisans.com> wrote: >>>>>>>> >>>>>>>>> Hi, >>>>>>>>> >>>>>>>>> true, StreamingFileSink does not support s3 in 1.6.0, it is >>>>>>>>> planned for the next 1.7 release, sorry for confusion. >>>>>>>>> The old BucketingSink has in general problem with s3. >>>>>>>>> Internally BucketingSink queries s3 as a file system >>>>>>>>> to list already written file parts (batches) and determine index >>>>>>>>> of the next part to start. Due to eventual consistency of checking >>>>>>>>> file >>>>>>>>> existence in s3 [1], the BucketingSink can rewrite the previously >>>>>>>>> written >>>>>>>>> part and basically loose it. It should be fixed for StreamingFileSink >>>>>>>>> in >>>>>>>>> 1.7 where Flink keeps its own track of written parts and does not >>>>>>>>> rely on >>>>>>>>> s3 as a file system. >>>>>>>>> I also include Kostas, he might add more details. 
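(Purely as an illustration of that race, not the actual BucketingSink code: a sink that derives the next part index by probing the target file system can silently reuse an index when a just-written part is not yet visible. Paths are placeholders; FileSystem/Path are Flink's org.apache.flink.core.fs classes:

FileSystem fs = FileSystem.get(new URI("s3://bucket/output"));
Path bucketPath = new Path("s3://bucket/output/2018-08-29");

int partIndex = 0;
// With eventual consistency, exists() may still return false for a part
// that was written only moments ago...
while (fs.exists(new Path(bucketPath, "part-0-" + partIndex))) {
    partIndex++;
}
// ...and opening "the next free" part here would then overwrite that part.
Path nextPart = new Path(bucketPath, "part-0-" + partIndex);
)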
>>>>>>>>> >>>>>>>>> Just to keep in mind this problem with s3 and exclude it for sure >>>>>>>>> I would also check whether the size of missing events is around the >>>>>>>>> batch >>>>>>>>> size of BucketingSink or not. You also wrote that the timestamps of >>>>>>>>> lost >>>>>>>>> event are 'probably' around the time of the savepoint, if it is not >>>>>>>>> yet for >>>>>>>>> sure I would also check it. >>>>>>>>> >>>>>>>>> Have you already checked the log files of job manager and task >>>>>>>>> managers for the job running before and after the restore from the >>>>>>>>> check >>>>>>>>> point? Is everything successful there, no errors, relevant warnings or >>>>>>>>> exceptions? >>>>>>>>> >>>>>>>>> As the next step, I would suggest to log all encountered events in >>>>>>>>> DistinctFunction.reduce if possible for production data and check >>>>>>>>> whether >>>>>>>>> the missed events are eventually processed before or after the >>>>>>>>> savepoint. >>>>>>>>> The following log message indicates a border between the events that >>>>>>>>> should >>>>>>>>> be included into the savepoint (logged before) or not: >>>>>>>>> “{} ({}, synchronous part) in thread {} took {} ms” (template) >>>>>>>>> Also check if the savepoint has been overall completed: >>>>>>>>> "{} ({}, asynchronous part) in thread {} took {} ms." >>>>>>>>> >>>>>>>>> Best, >>>>>>>>> Andrey >>>>>>>>> >>>>>>>>> [1] >>>>>>>>> https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html >>>>>>>>> >>>>>>>>> On 24 Aug 2018, at 20:41, Juho Autio <juho.au...@rovio.com> wrote: >>>>>>>>> >>>>>>>>> Hi, >>>>>>>>> >>>>>>>>> Using StreamingFileSink is not a convenient option for production >>>>>>>>> use for us as it doesn't support s3*. I could use StreamingFileSink >>>>>>>>> just to >>>>>>>>> verify, but I don't see much point in doing so. Please consider my >>>>>>>>> previous >>>>>>>>> comment: >>>>>>>>> >>>>>>>>> > I realized that BucketingSink must not play any role in this >>>>>>>>> problem. This is because only when the 24-hour window triggers, >>>>>>>>> BucketingSink gets a burst of input. Around the state restoring point >>>>>>>>> (middle of the day) it doesn't get any input, so it can't lose >>>>>>>>> anything >>>>>>>>> either (right?). >>>>>>>>> >>>>>>>>> I could also use a kafka sink instead, but I can't imagine how >>>>>>>>> there could be any difference. It's very real that the sink doesn't >>>>>>>>> get any >>>>>>>>> input for a long time until the 24-hour window closes, and then it >>>>>>>>> quickly >>>>>>>>> writes out everything because it's not that much data eventually for >>>>>>>>> the >>>>>>>>> distinct values. >>>>>>>>> >>>>>>>>> Any ideas for debugging what's happening around the savepoint & >>>>>>>>> restoration time? >>>>>>>>> >>>>>>>>> *) I actually implemented StreamingFileSink as an alternative >>>>>>>>> sink. This was before I came to realize that most likely the sink >>>>>>>>> component >>>>>>>>> has nothing to do with the data loss problem. I tried it with s3n:// >>>>>>>>> path >>>>>>>>> just to see an exception being thrown. In the source code I indeed >>>>>>>>> then >>>>>>>>> found an explicit check for the target path scheme to be "hdfs://". >>>>>>>>> >>>>>>>>> On Fri, Aug 24, 2018 at 7:49 PM Andrey Zagrebin < >>>>>>>>> and...@data-artisans.com> wrote: >>>>>>>>> >>>>>>>>>> Ok, I think before further debugging the window reduced state, >>>>>>>>>> could you try the new ‘StreamingFileSink’ [1] introduced in Flink >>>>>>>>>> 1.6.0 instead of the previous 'BucketingSink’? 
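(For reference, a minimal sketch of the 1.6+ row-format StreamingFileSink; the path is a placeholder, `stream` is assumed to be a DataStream<String>, and as noted elsewhere in this thread it would have to point at HDFS rather than s3 on 1.6.0:

StreamingFileSink<String> fileSink = StreamingFileSink
    .forRowFormat(new Path("hdfs:///data/output"), new SimpleStringEncoder<String>("UTF-8"))
    .build();

stream.addSink(fileSink);
)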
>>>>>>>>>> >>>>>>>>>> Cheers, >>>>>>>>>> Andrey >>>>>>>>>> >>>>>>>>>> [1] >>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html >>>>>>>>>> >>>>>>>>>> On 24 Aug 2018, at 18:03, Juho Autio <juho.au...@rovio.com> >>>>>>>>>> wrote: >>>>>>>>>> >>>>>>>>>> Yes, sorry for my confusing comment. I just meant that it seems >>>>>>>>>> like there's a bug somewhere now that the output is missing some >>>>>>>>>> data. >>>>>>>>>> >>>>>>>>>> > I would wait and check the actual output in s3 because it is >>>>>>>>>> the main result of the job >>>>>>>>>> >>>>>>>>>> Yes, and that's what I have already done. There seems to be >>>>>>>>>> always some data loss with the production data volumes, if the job >>>>>>>>>> has been >>>>>>>>>> restarted on that day. >>>>>>>>>> >>>>>>>>>> Would you have any suggestions for how to debug this further? >>>>>>>>>> >>>>>>>>>> Many thanks for stepping in. >>>>>>>>>> >>>>>>>>>> On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin < >>>>>>>>>> and...@data-artisans.com> wrote: >>>>>>>>>> >>>>>>>>>>> Hi Juho, >>>>>>>>>>> >>>>>>>>>>> So it is a per key deduplication job. >>>>>>>>>>> >>>>>>>>>>> Yes, I would wait and check the actual output in s3 because it >>>>>>>>>>> is the main result of the job and >>>>>>>>>>> >>>>>>>>>>> > The late data around the time of taking savepoint might be not >>>>>>>>>>> included into the savepoint but it should be behind the snapshotted >>>>>>>>>>> offset >>>>>>>>>>> in Kafka. >>>>>>>>>>> >>>>>>>>>>> is not a bug, it is a possible behaviour. >>>>>>>>>>> >>>>>>>>>>> The savepoint is a snapshot of the data in transient which is >>>>>>>>>>> already consumed from Kafka. >>>>>>>>>>> Basically the full contents of the window result is split >>>>>>>>>>> between the savepoint and what can come after the savepoint'ed >>>>>>>>>>> offset in >>>>>>>>>>> Kafka but before the window result is written into s3. >>>>>>>>>>> >>>>>>>>>>> Allowed lateness should not affect it, I am just saying that the >>>>>>>>>>> final result in s3 should include all records after it. >>>>>>>>>>> This is what should be guaranteed but not the contents of the >>>>>>>>>>> intermediate savepoint. >>>>>>>>>>> >>>>>>>>>>> Cheers, >>>>>>>>>>> Andrey >>>>>>>>>>> >>>>>>>>>>> On 24 Aug 2018, at 16:52, Juho Autio <juho.au...@rovio.com> >>>>>>>>>>> wrote: >>>>>>>>>>> >>>>>>>>>>> Thanks for your answer! >>>>>>>>>>> >>>>>>>>>>> I check for the missed data from the final output on s3. So I >>>>>>>>>>> wait until the next day, then run the same thing re-implemented in >>>>>>>>>>> batch, >>>>>>>>>>> and compare the output. >>>>>>>>>>> >>>>>>>>>>> > The late data around the time of taking savepoint might be not >>>>>>>>>>> included into the savepoint but it should be behind the snapshotted >>>>>>>>>>> offset >>>>>>>>>>> in Kafka. >>>>>>>>>>> >>>>>>>>>>> Yes, I would definitely expect that. It seems like there's a bug >>>>>>>>>>> somewhere. >>>>>>>>>>> >>>>>>>>>>> > Then it should just come later after the restore and should be >>>>>>>>>>> reduced within the allowed lateness into the final result which is >>>>>>>>>>> saved >>>>>>>>>>> into s3. >>>>>>>>>>> >>>>>>>>>>> Well, as far as I know, allowed lateness doesn't play any role >>>>>>>>>>> here, because I started running the job with allowedLateness=0, and >>>>>>>>>>> still >>>>>>>>>>> get the data loss, while my late data output doesn't receive >>>>>>>>>>> anything. 
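(For completeness, the late-data side output mentioned here is wired up roughly as below; the tag name, the lateDataSink and the one-minute lateness are illustrative. If nothing ever shows up in the side output, no records were classified as late:

final OutputTag<Map<String, String>> lateDataTag =
    new OutputTag<Map<String, String>>("late-data") {};

SingleOutputStreamOperator<Map<String, String>> result = kafkaStream
    .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE"))
    .timeWindow(Time.days(1))
    .allowedLateness(Time.minutes(1))
    .sideOutputLateData(lateDataTag)
    .reduce(new DistinctFunction());

// Records arriving after the allowed lateness end up here instead of the main output.
result.getSideOutput(lateDataTag).addSink(lateDataSink); // lateDataSink is a placeholder
)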
>>>>>>>>>>> >>>>>>>>>>> > Also, is this `DistinctFunction.reduce` just an example or the >>>>>>>>>>> actual implementation, basically saving just one of records inside >>>>>>>>>>> the 24h >>>>>>>>>>> window in s3? then what is missing there? >>>>>>>>>>> >>>>>>>>>>> Yes, it's the actual implementation. Note that there's a keyBy >>>>>>>>>>> before the DistinctFunction. So there's one record for each key >>>>>>>>>>> (which is >>>>>>>>>>> the combination of a couple of fields). In practice I've seen that >>>>>>>>>>> we're >>>>>>>>>>> missing ~2000-4000 elements on each restore, and the total output is >>>>>>>>>>> obviously much more than that. >>>>>>>>>>> >>>>>>>>>>> Here's the full code for the key selector: >>>>>>>>>>> >>>>>>>>>>> public class MapKeySelector implements >>>>>>>>>>> KeySelector<Map<String,String>, Object> { >>>>>>>>>>> >>>>>>>>>>> private final String[] fields; >>>>>>>>>>> >>>>>>>>>>> public MapKeySelector(String... fields) { >>>>>>>>>>> this.fields = fields; >>>>>>>>>>> } >>>>>>>>>>> >>>>>>>>>>> @Override >>>>>>>>>>> public Object getKey(Map<String, String> event) throws >>>>>>>>>>> Exception { >>>>>>>>>>> Tuple key = >>>>>>>>>>> Tuple.getTupleClass(fields.length).newInstance(); >>>>>>>>>>> for (int i = 0; i < fields.length; i++) { >>>>>>>>>>> key.setField(event.getOrDefault(fields[i], ""), i); >>>>>>>>>>> } >>>>>>>>>>> return key; >>>>>>>>>>> } >>>>>>>>>>> } >>>>>>>>>>> >>>>>>>>>>> And a more exact example on how it's used: >>>>>>>>>>> >>>>>>>>>>> .keyBy(new MapKeySelector("ID", "PLAYER_ID", >>>>>>>>>>> "FIELD", "KEY_NAME", "KEY_VALUE")) >>>>>>>>>>> .timeWindow(Time.days(1)) >>>>>>>>>>> .reduce(new DistinctFunction()) >>>>>>>>>>> >>>>>>>>>>> On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin < >>>>>>>>>>> and...@data-artisans.com> wrote: >>>>>>>>>>> >>>>>>>>>>>> Hi Juho, >>>>>>>>>>>> >>>>>>>>>>>> Where exactly does the data miss? When do you notice that? >>>>>>>>>>>> Do you check it: >>>>>>>>>>>> - debugging `DistinctFunction.reduce` right after resume in the >>>>>>>>>>>> middle of the day >>>>>>>>>>>> or >>>>>>>>>>>> - some distinct records miss in the final output >>>>>>>>>>>> of BucketingSink in s3 after window result is actually triggered >>>>>>>>>>>> and saved >>>>>>>>>>>> into s3 at the end of the day? is this the main output? >>>>>>>>>>>> >>>>>>>>>>>> The late data around the time of taking savepoint might be not >>>>>>>>>>>> included into the savepoint but it should be behind the >>>>>>>>>>>> snapshotted offset >>>>>>>>>>>> in Kafka. Then it should just come later after the restore and >>>>>>>>>>>> should be >>>>>>>>>>>> reduced within the allowed lateness into the final result which is >>>>>>>>>>>> saved >>>>>>>>>>>> into s3. >>>>>>>>>>>> >>>>>>>>>>>> Also, is this `DistinctFunction.reduce` just an example or the >>>>>>>>>>>> actual implementation, basically saving just one of records inside >>>>>>>>>>>> the 24h >>>>>>>>>>>> window in s3? then what is missing there? >>>>>>>>>>>> >>>>>>>>>>>> Cheers, >>>>>>>>>>>> Andrey >>>>>>>>>>>> >>>>>>>>>>>> On 23 Aug 2018, at 15:42, Juho Autio <juho.au...@rovio.com> >>>>>>>>>>>> wrote: >>>>>>>>>>>> >>>>>>>>>>>> I changed to allowedLateness=0, no change, still missing data >>>>>>>>>>>> when restoring from savepoint. >>>>>>>>>>>> >>>>>>>>>>>> On Tue, Aug 21, 2018 at 10:43 AM Juho Autio < >>>>>>>>>>>> juho.au...@rovio.com> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> I realized that BucketingSink must not play any role in this >>>>>>>>>>>>> problem. 
This is because only when the 24-hour window triggers, >>>>>>>>>>>>> BucketinSink gets a burst of input. Around the state restoring >>>>>>>>>>>>> point >>>>>>>>>>>>> (middle of the day) it doesn't get any input, so it can't lose >>>>>>>>>>>>> anything >>>>>>>>>>>>> either (right?). >>>>>>>>>>>>> >>>>>>>>>>>>> I will next try removing the allowedLateness entirely from the >>>>>>>>>>>>> equation. >>>>>>>>>>>>> >>>>>>>>>>>>> In the meanwhile, please let me know if you have any >>>>>>>>>>>>> suggestions for debugging the lost data, for example what logs to >>>>>>>>>>>>> enable. >>>>>>>>>>>>> >>>>>>>>>>>>> We use FlinkKafkaConsumer010 btw. Are there any known issues >>>>>>>>>>>>> with that, that could contribute to lost data when restoring a >>>>>>>>>>>>> savepoint? >>>>>>>>>>>>> >>>>>>>>>>>>> On Fri, Aug 17, 2018 at 4:23 PM Juho Autio < >>>>>>>>>>>>> juho.au...@rovio.com> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> Some data is silently lost on my Flink stream job when state >>>>>>>>>>>>>> is restored from a savepoint. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Do you have any debugging hints to find out where exactly the >>>>>>>>>>>>>> data gets dropped? >>>>>>>>>>>>>> >>>>>>>>>>>>>> My job gathers distinct values using a 24-hour window. It >>>>>>>>>>>>>> doesn't have any custom state management. >>>>>>>>>>>>>> >>>>>>>>>>>>>> When I cancel the job with savepoint and restore from that >>>>>>>>>>>>>> savepoint, some data is missed. It seems to be losing just a >>>>>>>>>>>>>> small amount >>>>>>>>>>>>>> of data. The event time of lost data is probably around the time >>>>>>>>>>>>>> of >>>>>>>>>>>>>> savepoint. In other words the rest of the time window is not >>>>>>>>>>>>>> entirely >>>>>>>>>>>>>> missed – collection works correctly also for (most of the) >>>>>>>>>>>>>> events that come >>>>>>>>>>>>>> in after restoring. >>>>>>>>>>>>>> >>>>>>>>>>>>>> When the job processes a full 24-hour window without >>>>>>>>>>>>>> interruptions it doesn't miss anything. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Usually the problem doesn't happen in test environments that >>>>>>>>>>>>>> have smaller parallelism and smaller data volumes. But in >>>>>>>>>>>>>> production >>>>>>>>>>>>>> volumes the job seems to be consistently missing at least >>>>>>>>>>>>>> something on >>>>>>>>>>>>>> every restore. >>>>>>>>>>>>>> >>>>>>>>>>>>>> This issue has consistently happened since the job was >>>>>>>>>>>>>> initially created. It was at first run on an older version of >>>>>>>>>>>>>> Flink >>>>>>>>>>>>>> 1.5-SNAPSHOT and it still happens on both Flink 1.5.2 & 1.6.0. >>>>>>>>>>>>>> >>>>>>>>>>>>>> I'm wondering if this could be for example some >>>>>>>>>>>>>> synchronization issue between the kafka consumer offsets vs. >>>>>>>>>>>>>> what's been >>>>>>>>>>>>>> written by BucketingSink? >>>>>>>>>>>>>> >>>>>>>>>>>>>> 1. 
Job content, simplified >>>>>>>>>>>>>> >>>>>>>>>>>>>> kafkaStream >>>>>>>>>>>>>> .flatMap(new ExtractFieldsFunction()) >>>>>>>>>>>>>> .keyBy(new MapKeySelector(1, 2, 3, 4)) >>>>>>>>>>>>>> .timeWindow(Time.days(1)) >>>>>>>>>>>>>> .allowedLateness(allowedLateness) >>>>>>>>>>>>>> .sideOutputLateData(lateDataTag) >>>>>>>>>>>>>> .reduce(new DistinctFunction()) >>>>>>>>>>>>>> .addSink(sink) >>>>>>>>>>>>>> // use a fixed number of output partitions >>>>>>>>>>>>>> .setParallelism(8)) >>>>>>>>>>>>>> >>>>>>>>>>>>>> /** >>>>>>>>>>>>>> * Usage: .keyBy("the", "distinct", "fields").reduce(new >>>>>>>>>>>>>> DistinctFunction()) >>>>>>>>>>>>>> */ >>>>>>>>>>>>>> public class DistinctFunction implements >>>>>>>>>>>>>> ReduceFunction<java.util.Map<String, String>> { >>>>>>>>>>>>>> @Override >>>>>>>>>>>>>> public Map<String, String> reduce(Map<String, String> >>>>>>>>>>>>>> value1, Map<String, String> value2) { >>>>>>>>>>>>>> return value1; >>>>>>>>>>>>>> } >>>>>>>>>>>>>> } >>>>>>>>>>>>>> >>>>>>>>>>>>>> 2. State configuration >>>>>>>>>>>>>> >>>>>>>>>>>>>> boolean enableIncrementalCheckpointing = true; >>>>>>>>>>>>>> String statePath = "s3n://bucket/savepoints"; >>>>>>>>>>>>>> new RocksDBStateBackend(statePath, >>>>>>>>>>>>>> enableIncrementalCheckpointing); >>>>>>>>>>>>>> >>>>>>>>>>>>>> Checkpointing Mode Exactly Once >>>>>>>>>>>>>> Interval 1m 0s >>>>>>>>>>>>>> Timeout 10m 0s >>>>>>>>>>>>>> Minimum Pause Between Checkpoints 1m 0s >>>>>>>>>>>>>> Maximum Concurrent Checkpoints 1 >>>>>>>>>>>>>> Persist Checkpoints Externally Enabled (retain on >>>>>>>>>>>>>> cancellation) >>>>>>>>>>>>>> >>>>>>>>>>>>>> 3. BucketingSink configuration >>>>>>>>>>>>>> >>>>>>>>>>>>>> We use BucketingSink, I don't think there's anything special >>>>>>>>>>>>>> here, if not the fact that we're writing to S3. >>>>>>>>>>>>>> >>>>>>>>>>>>>> String outputPath = "s3://bucket/output"; >>>>>>>>>>>>>> BucketingSink<Map<String, String>> sink = new >>>>>>>>>>>>>> BucketingSink<Map<String, String>>(outputPath) >>>>>>>>>>>>>> .setBucketer(new ProcessdateBucketer()) >>>>>>>>>>>>>> .setBatchSize(batchSize) >>>>>>>>>>>>>> >>>>>>>>>>>>>> .setInactiveBucketThreshold(inactiveBucketThreshold) >>>>>>>>>>>>>> >>>>>>>>>>>>>> .setInactiveBucketCheckInterval(inactiveBucketCheckInterval); >>>>>>>>>>>>>> sink.setWriter(new IdJsonWriter()); >>>>>>>>>>>>>> >>>>>>>>>>>>>> 4. Kafka & event time >>>>>>>>>>>>>> >>>>>>>>>>>>>> My flink job reads the data from Kafka, using a >>>>>>>>>>>>>> BoundedOutOfOrdernessTimestampExtractor on the kafka consumer to >>>>>>>>>>>>>> synchronize watermarks accross all kafka partitions. We also >>>>>>>>>>>>>> write late >>>>>>>>>>>>>> data to side output, but nothing is written there – if it would, >>>>>>>>>>>>>> it could >>>>>>>>>>>>>> explain missed data in the main output (I'm also sure that our >>>>>>>>>>>>>> late data >>>>>>>>>>>>>> writing works, because we previously had some actual late data >>>>>>>>>>>>>> which ended >>>>>>>>>>>>>> up there). >>>>>>>>>>>>>> >>>>>>>>>>>>>> 5. allowedLateness >>>>>>>>>>>>>> >>>>>>>>>>>>>> It may be or may not be relevant that I have also enabled >>>>>>>>>>>>>> allowedLateness with 1 minute lateness on the 24-hour window: >>>>>>>>>>>>>> >>>>>>>>>>>>>> If that makes sense, I could try removing allowedLateness >>>>>>>>>>>>>> entirely? That would be just to rule out that Flink doesn't have >>>>>>>>>>>>>> a bug >>>>>>>>>>>>>> that's related to restoring state in combination with the >>>>>>>>>>>>>> allowedLateness >>>>>>>>>>>>>> feature. 
After all, all of our data should be in a good enough >>>>>>>>>>>>>> order to not >>>>>>>>>>>>>> be late, given the max out of orderness used on kafka consumer >>>>>>>>>>>>>> timestamp >>>>>>>>>>>>>> extractor. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Thank you in advance!
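(As a reference for section 4 of the original message: assigning timestamps and watermarks directly on the Kafka consumer, so that the watermark is synchronized across partitions, looks roughly like this. The one-minute bound, the "timestamp" field, and the topic/schema/properties are illustrative:

FlinkKafkaConsumer010<Map<String, String>> consumer =
    new FlinkKafkaConsumer010<>("events", new EventMapSchema(), kafkaProps); // topic, schema and props are placeholders

consumer.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor<Map<String, String>>(Time.minutes(1)) {
        @Override
        public long extractTimestamp(Map<String, String> event) {
            // assumes the event carries an epoch-millis timestamp field
            return Long.parseLong(event.get("timestamp"));
        }
    });

DataStream<Map<String, String>> kafkaStream = env.addSource(consumer);
)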