I was glad to find that bravo has now been updated to support installing it to a local Maven repo.

I was able to load a checkpoint created by my job, thanks to the example provided in the bravo README, but I'm still missing the essential piece. My code was:

    OperatorStateReader reader = new OperatorStateReader(env2, savepoint, "DistinctFunction");
    DontKnowWhatTypeThisIs reducingState = reader.readKeyedStates(what should I put here?);

I don't know how to read the values collected from reduce() calls in the state. Is there a way to access the reducing state of the window with bravo? I'm a bit confused about how this works, because when I check with a debugger, Flink internally uses a ReducingStateDescriptor with name=window-contents, yet reading operator state for "DistinctFunction" at least didn't throw an exception ("window-contents" threw – obviously there's no operator by that name).
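For context, here is my (possibly wrong) understanding of how Flink registers that state, paraphrased from stepping through WindowedStream.reduce with the debugger – so the snippet below is only a sketch of the Flink internals, not the exact code, and inputType / executionConfig are just placeholders for the stream's TypeInformation and the job's ExecutionConfig:

    // Paraphrased from org.apache.flink.streaming.api.datastream.WindowedStream#reduce
    // (my reading of the sources, not the exact Flink code):
    // the window contents are kept in keyed state under the fixed descriptor name
    // "window-contents", inside the window operator ("DistinctFunction" in my job).
    ReducingStateDescriptor<Map<String, String>> stateDesc =
            new ReducingStateDescriptor<>(
                    "window-contents",          // fixed state name used by the window operator
                    new DistinctFunction(),     // the user-supplied ReduceFunction
                    inputType.createSerializer(executionConfig)); // placeholders: the stream's TypeInformation<Map<String, String>> and ExecutionConfig

So I would expect the state I'm after to be the keyed ReducingState registered as "window-contents" inside the operator "DistinctFunction" – I just can't figure out what reader I should pass to readKeyedStates() to get those values out, or whether bravo supports reducing window state at all.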
Cheers,
Juho

On Mon, Oct 15, 2018 at 2:25 PM Juho Autio <juho.au...@rovio.com> wrote: > Hi Stefan, > > Sorry but it doesn't seem immediately clear to me what's a good way to use > https://github.com/king/bravo. > > How are people using it? Would you for example modify build.gradle somehow > to publish the bravo as a library locally/internally? Or add code directly > in the bravo project (locally) and run it from there (using an IDE, for > example)? Also it doesn't seem like the bravo gradle project supports > building a flink job jar, but if it does, how do I do it? > > Thanks. > > On Thu, Oct 4, 2018 at 9:30 PM Juho Autio <juho.au...@rovio.com> wrote: > >> Good then, I'll try to analyze the savepoints with Bravo. Thanks! >> >> > How would you assume that backpressure would influence your updates? >> Updates to each local state still happen event-by-event, in a single >> reader/writing thread. >> >> Sure, just an ignorant guess by me. I'm not familiar with most of Flink's >> internals. Anyway, high backpressure is not seen on this job after it has >> caught up with the lag, so I thought it would be worth mentioning. >> >> On Thu, Oct 4, 2018 at 6:24 PM Stefan Richter < >> s.rich...@data-artisans.com> wrote: >> >>> Hi, >>> >>> Am 04.10.2018 um 16:08 schrieb Juho Autio <juho.au...@rovio.com>: >>> >>> > you could take a look at Bravo [1] to query your savepoints and to >>> check if the state in the savepoint complete w.r.t your expectations >>> >>> Thanks. I'm not 100% sure if this is the case, but to me it seemed like the >>> missed ids were being logged by the reducer soon after the job had started >>> (after restoring a savepoint). But on the other hand, after that I also >>> made another savepoint & restored that, so what I could check is: does that >>> next savepoint have the missed ids that were logged (a couple of minutes >>> before the savepoint was created, so there should've been more than enough >>> time to add them to the state before the savepoint was triggered) or not. >>> Anyway, if I were able to verify with Bravo that the ids are missing >>> from the savepoint (even though the reducer logged that it saw them), would >>> that help in figuring out where they are lost? Is there some major >>> difference compared to just looking at the final output after the window has >>> been triggered? >>> >>> >>> >>> I think that makes a difference. For example, you can investigate if >>> there is a state loss or a problem with the windowing. In the savepoint you >>> could see which keys exist and to which windows they are assigned. 
Also >>> just to make sure there is no misunderstanding: only elements that are in >>> the state at the start of a savepoint are expected to be part of the >>> savepoint; all elements between start and completion of the savepoint are >>> not expected to be part of the savepoint. >>> >>> >>> > I also doubt that the problem is about backpressure after restore, >>> because the job will only continue running after the state restore is >>> already completed. >>> >>> Yes, I'm not suspecting that the state restoring would be the problem >>> either. My concern was about backpressure possibly messing with the updates >>> of reducing state? I would tend to suspect that updating the state >>> consistently is what fails, where heavy load / backpressure might be a >>> factor. >>> >>> >>> >>> How would you assume that backpressure would influence your updates? >>> Updates to each local state still happen event-by-event, in a single >>> reader/writing thread. >>> >>> >>> On Thu, Oct 4, 2018 at 4:18 PM Stefan Richter < >>> s.rich...@data-artisans.com> wrote: >>> >>>> Hi, >>>> >>>> you could take a look at Bravo [1] to query your savepoints and to >>>> check if the state in the savepoint complete w.r.t your expectations. I >>>> somewhat doubt that there is a general problem with the state/savepoints >>>> because many users are successfully running it on a large state and I am >>>> not aware of any data loss problems, but nothing is impossible. What the >>>> savepoint does is also straight forward: iterate a db snapshot and write >>>> all key/value pairs to disk, so all data that was in the db at the time of >>>> the savepoint, should show up. I also doubt that the problem is about >>>> backpressure after restore, because the job will only continue running >>>> after the state restore is already completed. Did you check if you are >>>> using exactly-one-semantics or at-least-once semantics? Also did you check >>>> that the kafka consumer start position is configured properly [2]? Are >>>> watermarks generated as expected after restore? >>>> >>>> One more unrelated high-level comment that I have: for a granularity of >>>> 24h windows, I wonder if it would not make sense to use a batch job >>>> instead? >>>> >>>> Best, >>>> Stefan >>>> >>>> [1] https://github.com/king/bravo >>>> [2] >>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration >>>> >>>> Am 04.10.2018 um 14:53 schrieb Juho Autio <juho.au...@rovio.com>: >>>> >>>> Thanks for the suggestions! >>>> >>>> > In general, it would be tremendously helpful to have a minimal >>>> working example which allows to reproduce the problem. >>>> >>>> Definitely. The problem with reproducing has been that this only seems >>>> to happen in the bigger production data volumes. >>>> >>>> That's why I'm hoping to find a way to debug this with the production >>>> data. With that it seems to consistently cause some misses every time the >>>> job is killed/restored. >>>> >>>> > check if it happens for shorter windows, like 1h etc >>>> >>>> What would be the benefit of that compared to 24h window? >>>> >>>> > simplify the job to not use a reduce window but simply a time window >>>> which outputs the window events. Then counting the input and output events >>>> should allow you to verify the results. If you are not seeing missing >>>> events, then it could have something to do with the reducing state used in >>>> the reduce function. 
>>>> >>>> Hm, maybe, but not sure how useful that would be, because it wouldn't >>>> yet prove that it's related to reducing, because not having a reduce >>>> function could also mean smaller load on the job, which might alone be >>>> enough to make the problem not manifest. >>>> >>>> Is there a way to debug what goes into the reducing state (including >>>> what gets removed or overwritten and what restored), if that makes sense..? >>>> Maybe some suitable logging could be used to prove that the lost data is >>>> written to the reducing state (or at least asked to be written), but not >>>> found any more when the window closes and state is flushed? >>>> >>>> On configuration once more, we're using RocksDB state backend with >>>> asynchronous incremental checkpointing. The state is restored from >>>> savepoints though, we haven't been using those checkpoints in these tests >>>> (although they could be used in case of crashes – but we haven't had those >>>> now). >>>> >>>> On Thu, Oct 4, 2018 at 3:25 PM Till Rohrmann <trohrm...@apache.org> >>>> wrote: >>>> >>>>> Hi Juho, >>>>> >>>>> another idea to further narrow down the problem could be to simplify >>>>> the job to not use a reduce window but simply a time window which outputs >>>>> the window events. Then counting the input and output events should allow >>>>> you to verify the results. If you are not seeing missing events, then it >>>>> could have something to do with the reducing state used in the reduce >>>>> function. >>>>> >>>>> In general, it would be tremendously helpful to have a minimal working >>>>> example which allows to reproduce the problem. >>>>> >>>>> Cheers, >>>>> Till >>>>> >>>>> On Thu, Oct 4, 2018 at 2:02 PM Andrey Zagrebin < >>>>> and...@data-artisans.com> wrote: >>>>> >>>>>> Hi Juho, >>>>>> >>>>>> can you try to reduce the job to minimal reproducible example and >>>>>> share the job and input? >>>>>> >>>>>> For example: >>>>>> - some simple records as input, e.g. tuples of primitive types saved >>>>>> as cvs >>>>>> - minimal deduplication job which processes them and misses records >>>>>> - check if it happens for shorter windows, like 1h etc >>>>>> - setup which you use for the job, ideally locally reproducible or >>>>>> cloud >>>>>> >>>>>> Best, >>>>>> Andrey >>>>>> >>>>>> On 4 Oct 2018, at 11:13, Juho Autio <juho.au...@rovio.com> wrote: >>>>>> >>>>>> Sorry to insist, but we seem to be blocked for any serious usage of >>>>>> state in Flink if we can't rely on it to not miss data in case of >>>>>> restore. >>>>>> >>>>>> Would anyone have suggestions for how to troubleshoot this? So far I >>>>>> have verified with DEBUG logs that our reduce function gets to process >>>>>> also >>>>>> the data that is missing from window output. >>>>>> >>>>>> On Mon, Oct 1, 2018 at 11:56 AM Juho Autio <juho.au...@rovio.com> >>>>>> wrote: >>>>>> >>>>>>> Hi Andrey, >>>>>>> >>>>>>> To rule out for good any questions about sink behaviour, the job was >>>>>>> killed and started with an additional Kafka sink. >>>>>>> >>>>>>> The same number of ids were missed in both outputs: KafkaSink & >>>>>>> BucketingSink. >>>>>>> >>>>>>> I wonder what would be the next steps in debugging? >>>>>>> >>>>>>> On Fri, Sep 21, 2018 at 3:49 PM Juho Autio <juho.au...@rovio.com> >>>>>>> wrote: >>>>>>> >>>>>>>> Thanks, Andrey. >>>>>>>> >>>>>>>> > so it means that the savepoint does not loose at least some >>>>>>>> dropped records. >>>>>>>> >>>>>>>> I'm not sure what you mean by that? 
I mean, it was known from the >>>>>>>> beginning, that not everything is lost before/after restoring a >>>>>>>> savepoint, >>>>>>>> just some records around the time of restoration. It's not 100% clear >>>>>>>> whether records are lost before making a savepoint or after restoring >>>>>>>> it. >>>>>>>> Although, based on the new DEBUG logs it seems more like losing some >>>>>>>> records that are seen ~soon after restoring. It seems like Flink would >>>>>>>> be >>>>>>>> somehow confused either about the restored state vs. new inserts to >>>>>>>> state. >>>>>>>> This could also be somehow linked to the high back pressure on the >>>>>>>> kafka >>>>>>>> source while the stream is catching up. >>>>>>>> >>>>>>>> > If it is feasible for your setup, I suggest to insert one more >>>>>>>> map function after reduce and before sink. >>>>>>>> > etc. >>>>>>>> >>>>>>>> Isn't that the same thing that we discussed before? Nothing is sent >>>>>>>> to BucketingSink before the window closes, so I don't see how it would >>>>>>>> make >>>>>>>> any difference if we replace the BucketingSink with a map function or >>>>>>>> another sink type. We don't create or restore savepoints during the >>>>>>>> time >>>>>>>> when BucketingSink gets input or has open buckets – that happens at a >>>>>>>> much >>>>>>>> later time of day. I would focus on figuring out why the records are >>>>>>>> lost >>>>>>>> while the window is open. But I don't know how to do that. Would you >>>>>>>> have >>>>>>>> any additional suggestions? >>>>>>>> >>>>>>>> On Fri, Sep 21, 2018 at 3:30 PM Andrey Zagrebin < >>>>>>>> and...@data-artisans.com> wrote: >>>>>>>> >>>>>>>>> Hi Juho, >>>>>>>>> >>>>>>>>> so it means that the savepoint does not loose at least some >>>>>>>>> dropped records. >>>>>>>>> >>>>>>>>> If it is feasible for your setup, I suggest to insert one more map >>>>>>>>> function after reduce and before sink. >>>>>>>>> The map function should be called right after window is triggered >>>>>>>>> but before flushing to s3. >>>>>>>>> The result of reduce (deduped record) could be logged there. >>>>>>>>> This should allow to check whether the processed distinct records >>>>>>>>> were buffered in the state after the restoration from the savepoint >>>>>>>>> or not. >>>>>>>>> If they were buffered we should see that there was an attempt to >>>>>>>>> write them >>>>>>>>> to the sink from the state. >>>>>>>>> >>>>>>>>> Another suggestion is to try to write records to some other sink >>>>>>>>> or to both. >>>>>>>>> E.g. if you can access file system of workers, maybe just into >>>>>>>>> local files and check whether the records are also dropped there. >>>>>>>>> >>>>>>>>> Best, >>>>>>>>> Andrey >>>>>>>>> >>>>>>>>> On 20 Sep 2018, at 15:37, Juho Autio <juho.au...@rovio.com> wrote: >>>>>>>>> >>>>>>>>> Hi Andrey! >>>>>>>>> >>>>>>>>> I was finally able to gather the DEBUG logs that you suggested. In >>>>>>>>> short, the reducer logged that it processed at least some of the ids >>>>>>>>> that >>>>>>>>> were missing from the output. >>>>>>>>> >>>>>>>>> "At least some", because I didn't have the job running with DEBUG >>>>>>>>> logs for the full 24-hour window period. So I was only able to look >>>>>>>>> up if I >>>>>>>>> can find *some* of the missing ids in the DEBUG logs. Which I did >>>>>>>>> indeed. 
>>>>>>>>> >>>>>>>>> I changed the DistinctFunction.java to do this: >>>>>>>>> >>>>>>>>> @Override >>>>>>>>> public Map<String, String> reduce(Map<String, String> value1, >>>>>>>>> Map<String, String> value2) { >>>>>>>>> LOG.debug("DistinctFunction.reduce returns: {}={}", >>>>>>>>> value1.get("field"), value1.get("id")); >>>>>>>>> return value1; >>>>>>>>> } >>>>>>>>> >>>>>>>>> Then: >>>>>>>>> >>>>>>>>> vi flink-1.6.0/conf/log4j.properties >>>>>>>>> >>>>>>>>> log4j.logger.org.apache.flink.streaming.runtime.tasks.StreamTask=DEBUG >>>>>>>>> log4j.logger.com.rovio.ds.flink.uniqueid.DistinctFunction=DEBUG >>>>>>>>> >>>>>>>>> Then I ran the following kind of test: >>>>>>>>> >>>>>>>>> - Cancelled the on-going job with savepoint created at ~Sep 18 >>>>>>>>> 08:35 UTC 2018 >>>>>>>>> - Started a new cluster & job with DEBUG enabled at ~09:13, >>>>>>>>> restored from that previous cluster's savepoint >>>>>>>>> - Ran until caught up offsets >>>>>>>>> - Cancelled the job with a new savepoint >>>>>>>>> - Started a new job _without_ DEBUG, which restored the new >>>>>>>>> savepoint, let it keep running so that it will eventually write the >>>>>>>>> output >>>>>>>>> >>>>>>>>> Then on the next day, after results had been flushed when the >>>>>>>>> 24-hour window closed, I compared the results again with a batch >>>>>>>>> version's >>>>>>>>> output. And found some missing ids as usual. >>>>>>>>> >>>>>>>>> I drilled down to one specific missing id (I'm replacing the >>>>>>>>> actual value with AN12345 below), which was not found in the stream >>>>>>>>> output, >>>>>>>>> but was found in batch output & flink DEBUG logs. >>>>>>>>> >>>>>>>>> Related to that id, I gathered the following information: >>>>>>>>> >>>>>>>>> 2018-09-18~09:13:21,000 job started & savepoint is restored >>>>>>>>> >>>>>>>>> 2018-09-18 09:14:29,085 missing id is processed for the first >>>>>>>>> time, proved by this log line: >>>>>>>>> 2018-09-18 09:14:29,085 DEBUG >>>>>>>>> com.rovio.ds.flink.uniqueid.DistinctFunction - >>>>>>>>> DistinctFunction.reduce returns: s.aid1=AN12345 >>>>>>>>> >>>>>>>>> 2018-09-18 09:15:14,264 first synchronous part of checkpoint >>>>>>>>> 2018-09-18 09:15:16,544 first asynchronous part of checkpoint >>>>>>>>> >>>>>>>>> ( >>>>>>>>> more occurrences of checkpoints (~1 min checkpointing time + ~1 >>>>>>>>> min delay before next) >>>>>>>>> / >>>>>>>>> more occurrences of DistinctFunction.reduce >>>>>>>>> ) >>>>>>>>> >>>>>>>>> 2018-09-18 09:23:45,053 missing id is processed for the last time >>>>>>>>> >>>>>>>>> 2018-09-18~10:20:00,000 savepoint created & job cancelled >>>>>>>>> >>>>>>>>> To be noted, there was high backpressure after restoring from >>>>>>>>> savepoint until the stream caught up with the kafka offsets. >>>>>>>>> Although, our >>>>>>>>> job uses assign timestamps & watermarks on the flink kafka consumer >>>>>>>>> itself, >>>>>>>>> so event time of all partitions is synchronized. As expected, we >>>>>>>>> don't get >>>>>>>>> any late data in the late data side output. >>>>>>>>> >>>>>>>>> From this we can see that the missing ids are processed by the >>>>>>>>> reducer, but they must get lost somewhere before the 24-hour window is >>>>>>>>> triggered. >>>>>>>>> >>>>>>>>> I think it's worth mentioning once more that the stream doesn't >>>>>>>>> miss any ids if we let it's running without interruptions / state >>>>>>>>> restoring. >>>>>>>>> >>>>>>>>> What's next? 
>>>>>>>>> >>>>>>>>> On Wed, Aug 29, 2018 at 3:49 PM Andrey Zagrebin < >>>>>>>>> and...@data-artisans.com> wrote: >>>>>>>>> >>>>>>>>>> Hi Juho, >>>>>>>>>> >>>>>>>>>> > only when the 24-hour window triggers, BucketingSink gets a >>>>>>>>>> burst of input >>>>>>>>>> >>>>>>>>>> This is of course totally true, my understanding is the same. We >>>>>>>>>> cannot exclude problem there for sure, just savepoints are used a >>>>>>>>>> lot w/o >>>>>>>>>> problem reports and BucketingSink is known to be problematic with >>>>>>>>>> s3. That >>>>>>>>>> is why, I asked you: >>>>>>>>>> >>>>>>>>>> > You also wrote that the timestamps of lost event are 'probably' >>>>>>>>>> around the time of the savepoint, if it is not yet for sure I would >>>>>>>>>> also >>>>>>>>>> check it. >>>>>>>>>> >>>>>>>>>> Although, bucketing sink might loose any data at the end of the >>>>>>>>>> day (also from the middle). The fact, that it is always around the >>>>>>>>>> time of >>>>>>>>>> taking a savepoint and not random, is surely suspicious and possible >>>>>>>>>> savepoint failures need to be investigated. >>>>>>>>>> >>>>>>>>>> Regarding the s3 problem, s3 doc says: >>>>>>>>>> >>>>>>>>>> > The caveat is that if you make a HEAD or GET request to the key >>>>>>>>>> name (to find if the object exists) before creating the object, >>>>>>>>>> Amazon S3 >>>>>>>>>> provides 'eventual consistency' for read-after-write. >>>>>>>>>> >>>>>>>>>> The algorithm you suggest is how it is roughly implemented now >>>>>>>>>> (BucketingSink.openNewPartFile). My understanding is that >>>>>>>>>> 'eventual consistency’ means that even if you just created file (its >>>>>>>>>> name >>>>>>>>>> is key) it can be that you do not get it in the list or exists (HEAD) >>>>>>>>>> returns false and you risk to rewrite the previous part. >>>>>>>>>> >>>>>>>>>> The BucketingSink was designed for a standard file system. s3 is >>>>>>>>>> used over a file system wrapper atm but does not always provide >>>>>>>>>> normal file >>>>>>>>>> system guarantees. See also last example in [1]. >>>>>>>>>> >>>>>>>>>> Cheers, >>>>>>>>>> Andrey >>>>>>>>>> >>>>>>>>>> [1] >>>>>>>>>> https://codeburst.io/quick-explanation-of-the-s3-consistency-model-6c9f325e3f82 >>>>>>>>>> >>>>>>>>>> On 29 Aug 2018, at 12:11, Juho Autio <juho.au...@rovio.com> >>>>>>>>>> wrote: >>>>>>>>>> >>>>>>>>>> Andrey, thank you very much for the debugging suggestions, I'll >>>>>>>>>> try them. >>>>>>>>>> >>>>>>>>>> In the meanwhile two more questions, please: >>>>>>>>>> >>>>>>>>>> > Just to keep in mind this problem with s3 and exclude it for >>>>>>>>>> sure. I would also check whether the size of missing events is >>>>>>>>>> around the >>>>>>>>>> batch size of BucketingSink or not. >>>>>>>>>> >>>>>>>>>> Fair enough, but I also want to focus on debugging the most >>>>>>>>>> probable subject first. So what do you think about this – true or >>>>>>>>>> false: >>>>>>>>>> only when the 24-hour window triggers, BucketinSink gets a burst of >>>>>>>>>> input. >>>>>>>>>> Around the state restoring point (middle of the day) it doesn't get >>>>>>>>>> any >>>>>>>>>> input, so it can't lose anything either. Isn't this true, or have I >>>>>>>>>> totally >>>>>>>>>> missed how Flink works in triggering window results? I would not >>>>>>>>>> expect >>>>>>>>>> there to be any optimization that speculatively triggers early >>>>>>>>>> results of a >>>>>>>>>> regular time window to the downstream operators. >>>>>>>>>> >>>>>>>>>> > The old BucketingSink has in general problem with s3. 
>>>>>>>>>> Internally BucketingSink queries s3 as a file system to list already >>>>>>>>>> written file parts (batches) and determine index of the next part to >>>>>>>>>> start. >>>>>>>>>> Due to eventual consistency of checking file existence in s3 [1], the >>>>>>>>>> BucketingSink can rewrite the previously written part and basically >>>>>>>>>> loose >>>>>>>>>> it. >>>>>>>>>> >>>>>>>>>> I was wondering, what does S3's "read-after-write consistency" >>>>>>>>>> (mentioned on the page you linked) actually mean. It seems that this >>>>>>>>>> might >>>>>>>>>> be possible: >>>>>>>>>> - LIST keys, find current max index >>>>>>>>>> - choose next index = max + 1 >>>>>>>>>> - HEAD next index: if it exists, keep adding + 1 until key >>>>>>>>>> doesn't exist on S3 >>>>>>>>>> >>>>>>>>>> But definitely sounds easier if a sink keeps track of files in a >>>>>>>>>> way that's guaranteed to be consistent. >>>>>>>>>> >>>>>>>>>> Cheers, >>>>>>>>>> Juho >>>>>>>>>> >>>>>>>>>> On Mon, Aug 27, 2018 at 2:04 PM Andrey Zagrebin < >>>>>>>>>> and...@data-artisans.com> wrote: >>>>>>>>>> >>>>>>>>>>> Hi, >>>>>>>>>>> >>>>>>>>>>> true, StreamingFileSink does not support s3 in 1.6.0, it is >>>>>>>>>>> planned for the next 1.7 release, sorry for confusion. >>>>>>>>>>> The old BucketingSink has in general problem with s3. >>>>>>>>>>> Internally BucketingSink queries s3 as a file system >>>>>>>>>>> to list already written file parts (batches) and determine index >>>>>>>>>>> of the next part to start. Due to eventual consistency of checking >>>>>>>>>>> file >>>>>>>>>>> existence in s3 [1], the BucketingSink can rewrite the previously >>>>>>>>>>> written >>>>>>>>>>> part and basically loose it. It should be fixed for >>>>>>>>>>> StreamingFileSink in >>>>>>>>>>> 1.7 where Flink keeps its own track of written parts and does not >>>>>>>>>>> rely on >>>>>>>>>>> s3 as a file system. >>>>>>>>>>> I also include Kostas, he might add more details. >>>>>>>>>>> >>>>>>>>>>> Just to keep in mind this problem with s3 and exclude it for >>>>>>>>>>> sure I would also check whether the size of missing events is >>>>>>>>>>> around the >>>>>>>>>>> batch size of BucketingSink or not. You also wrote that the >>>>>>>>>>> timestamps of >>>>>>>>>>> lost event are 'probably' around the time of the savepoint, if it >>>>>>>>>>> is not >>>>>>>>>>> yet for sure I would also check it. >>>>>>>>>>> >>>>>>>>>>> Have you already checked the log files of job manager and task >>>>>>>>>>> managers for the job running before and after the restore from the >>>>>>>>>>> check >>>>>>>>>>> point? Is everything successful there, no errors, relevant warnings >>>>>>>>>>> or >>>>>>>>>>> exceptions? >>>>>>>>>>> >>>>>>>>>>> As the next step, I would suggest to log all encountered events >>>>>>>>>>> in DistinctFunction.reduce if possible for production data and check >>>>>>>>>>> whether the missed events are eventually processed before or after >>>>>>>>>>> the >>>>>>>>>>> savepoint. The following log message indicates a border between the >>>>>>>>>>> events >>>>>>>>>>> that should be included into the savepoint (logged before) or not: >>>>>>>>>>> “{} ({}, synchronous part) in thread {} took {} ms” (template) >>>>>>>>>>> Also check if the savepoint has been overall completed: >>>>>>>>>>> "{} ({}, asynchronous part) in thread {} took {} ms." 
>>>>>>>>>>> >>>>>>>>>>> Best, >>>>>>>>>>> Andrey >>>>>>>>>>> >>>>>>>>>>> [1] >>>>>>>>>>> https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html >>>>>>>>>>> >>>>>>>>>>> On 24 Aug 2018, at 20:41, Juho Autio <juho.au...@rovio.com> >>>>>>>>>>> wrote: >>>>>>>>>>> >>>>>>>>>>> Hi, >>>>>>>>>>> >>>>>>>>>>> Using StreamingFileSink is not a convenient option for >>>>>>>>>>> production use for us as it doesn't support s3*. I could use >>>>>>>>>>> StreamingFileSink just to verify, but I don't see much point in >>>>>>>>>>> doing so. >>>>>>>>>>> Please consider my previous comment: >>>>>>>>>>> >>>>>>>>>>> > I realized that BucketingSink must not play any role in this >>>>>>>>>>> problem. This is because only when the 24-hour window triggers, >>>>>>>>>>> BucketingSink gets a burst of input. Around the state restoring >>>>>>>>>>> point >>>>>>>>>>> (middle of the day) it doesn't get any input, so it can't lose >>>>>>>>>>> anything >>>>>>>>>>> either (right?). >>>>>>>>>>> >>>>>>>>>>> I could also use a kafka sink instead, but I can't imagine how >>>>>>>>>>> there could be any difference. It's very real that the sink doesn't >>>>>>>>>>> get any >>>>>>>>>>> input for a long time until the 24-hour window closes, and then it >>>>>>>>>>> quickly >>>>>>>>>>> writes out everything because it's not that much data eventually >>>>>>>>>>> for the >>>>>>>>>>> distinct values. >>>>>>>>>>> >>>>>>>>>>> Any ideas for debugging what's happening around the savepoint & >>>>>>>>>>> restoration time? >>>>>>>>>>> >>>>>>>>>>> *) I actually implemented StreamingFileSink as an alternative >>>>>>>>>>> sink. This was before I came to realize that most likely the sink >>>>>>>>>>> component >>>>>>>>>>> has nothing to do with the data loss problem. I tried it with >>>>>>>>>>> s3n:// path >>>>>>>>>>> just to see an exception being thrown. In the source code I indeed >>>>>>>>>>> then >>>>>>>>>>> found an explicit check for the target path scheme to be "hdfs://". >>>>>>>>>>> >>>>>>>>>>> On Fri, Aug 24, 2018 at 7:49 PM Andrey Zagrebin < >>>>>>>>>>> and...@data-artisans.com> wrote: >>>>>>>>>>> >>>>>>>>>>>> Ok, I think before further debugging the window reduced state, >>>>>>>>>>>> could you try the new ‘StreamingFileSink’ [1] introduced in >>>>>>>>>>>> Flink 1.6.0 instead of the previous 'BucketingSink’? >>>>>>>>>>>> >>>>>>>>>>>> Cheers, >>>>>>>>>>>> Andrey >>>>>>>>>>>> >>>>>>>>>>>> [1] >>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html >>>>>>>>>>>> >>>>>>>>>>>> On 24 Aug 2018, at 18:03, Juho Autio <juho.au...@rovio.com> >>>>>>>>>>>> wrote: >>>>>>>>>>>> >>>>>>>>>>>> Yes, sorry for my confusing comment. I just meant that it seems >>>>>>>>>>>> like there's a bug somewhere now that the output is missing some >>>>>>>>>>>> data. >>>>>>>>>>>> >>>>>>>>>>>> > I would wait and check the actual output in s3 because it is >>>>>>>>>>>> the main result of the job >>>>>>>>>>>> >>>>>>>>>>>> Yes, and that's what I have already done. There seems to be >>>>>>>>>>>> always some data loss with the production data volumes, if the job >>>>>>>>>>>> has been >>>>>>>>>>>> restarted on that day. >>>>>>>>>>>> >>>>>>>>>>>> Would you have any suggestions for how to debug this further? >>>>>>>>>>>> >>>>>>>>>>>> Many thanks for stepping in. >>>>>>>>>>>> >>>>>>>>>>>> On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin < >>>>>>>>>>>> and...@data-artisans.com> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Hi Juho, >>>>>>>>>>>>> >>>>>>>>>>>>> So it is a per key deduplication job. 
>>>>>>>>>>>>> >>>>>>>>>>>>> Yes, I would wait and check the actual output in s3 because it >>>>>>>>>>>>> is the main result of the job and >>>>>>>>>>>>> >>>>>>>>>>>>> > The late data around the time of taking savepoint might be >>>>>>>>>>>>> not included into the savepoint but it should be behind the >>>>>>>>>>>>> snapshotted >>>>>>>>>>>>> offset in Kafka. >>>>>>>>>>>>> >>>>>>>>>>>>> is not a bug, it is a possible behaviour. >>>>>>>>>>>>> >>>>>>>>>>>>> The savepoint is a snapshot of the data in transient which is >>>>>>>>>>>>> already consumed from Kafka. >>>>>>>>>>>>> Basically the full contents of the window result is split >>>>>>>>>>>>> between the savepoint and what can come after the savepoint'ed >>>>>>>>>>>>> offset in >>>>>>>>>>>>> Kafka but before the window result is written into s3. >>>>>>>>>>>>> >>>>>>>>>>>>> Allowed lateness should not affect it, I am just saying that >>>>>>>>>>>>> the final result in s3 should include all records after it. >>>>>>>>>>>>> This is what should be guaranteed but not the contents of the >>>>>>>>>>>>> intermediate savepoint. >>>>>>>>>>>>> >>>>>>>>>>>>> Cheers, >>>>>>>>>>>>> Andrey >>>>>>>>>>>>> >>>>>>>>>>>>> On 24 Aug 2018, at 16:52, Juho Autio <juho.au...@rovio.com> >>>>>>>>>>>>> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>> Thanks for your answer! >>>>>>>>>>>>> >>>>>>>>>>>>> I check for the missed data from the final output on s3. So I >>>>>>>>>>>>> wait until the next day, then run the same thing re-implemented >>>>>>>>>>>>> in batch, >>>>>>>>>>>>> and compare the output. >>>>>>>>>>>>> >>>>>>>>>>>>> > The late data around the time of taking savepoint might be >>>>>>>>>>>>> not included into the savepoint but it should be behind the >>>>>>>>>>>>> snapshotted >>>>>>>>>>>>> offset in Kafka. >>>>>>>>>>>>> >>>>>>>>>>>>> Yes, I would definitely expect that. It seems like there's a >>>>>>>>>>>>> bug somewhere. >>>>>>>>>>>>> >>>>>>>>>>>>> > Then it should just come later after the restore and should >>>>>>>>>>>>> be reduced within the allowed lateness into the final result >>>>>>>>>>>>> which is saved >>>>>>>>>>>>> into s3. >>>>>>>>>>>>> >>>>>>>>>>>>> Well, as far as I know, allowed lateness doesn't play any role >>>>>>>>>>>>> here, because I started running the job with allowedLateness=0, >>>>>>>>>>>>> and still >>>>>>>>>>>>> get the data loss, while my late data output doesn't receive >>>>>>>>>>>>> anything. >>>>>>>>>>>>> >>>>>>>>>>>>> > Also, is this `DistinctFunction.reduce` just an example or >>>>>>>>>>>>> the actual implementation, basically saving just one of records >>>>>>>>>>>>> inside the >>>>>>>>>>>>> 24h window in s3? then what is missing there? >>>>>>>>>>>>> >>>>>>>>>>>>> Yes, it's the actual implementation. Note that there's a keyBy >>>>>>>>>>>>> before the DistinctFunction. So there's one record for each key >>>>>>>>>>>>> (which is >>>>>>>>>>>>> the combination of a couple of fields). In practice I've seen >>>>>>>>>>>>> that we're >>>>>>>>>>>>> missing ~2000-4000 elements on each restore, and the total output >>>>>>>>>>>>> is >>>>>>>>>>>>> obviously much more than that. >>>>>>>>>>>>> >>>>>>>>>>>>> Here's the full code for the key selector: >>>>>>>>>>>>> >>>>>>>>>>>>> public class MapKeySelector implements >>>>>>>>>>>>> KeySelector<Map<String,String>, Object> { >>>>>>>>>>>>> >>>>>>>>>>>>> private final String[] fields; >>>>>>>>>>>>> >>>>>>>>>>>>> public MapKeySelector(String... 
fields) { >>>>>>>>>>>>> this.fields = fields; >>>>>>>>>>>>> } >>>>>>>>>>>>> >>>>>>>>>>>>> @Override >>>>>>>>>>>>> public Object getKey(Map<String, String> event) throws >>>>>>>>>>>>> Exception { >>>>>>>>>>>>> Tuple key = >>>>>>>>>>>>> Tuple.getTupleClass(fields.length).newInstance(); >>>>>>>>>>>>> for (int i = 0; i < fields.length; i++) { >>>>>>>>>>>>> key.setField(event.getOrDefault(fields[i], ""), i); >>>>>>>>>>>>> } >>>>>>>>>>>>> return key; >>>>>>>>>>>>> } >>>>>>>>>>>>> } >>>>>>>>>>>>> >>>>>>>>>>>>> And a more exact example on how it's used: >>>>>>>>>>>>> >>>>>>>>>>>>> .keyBy(new MapKeySelector("ID", "PLAYER_ID", >>>>>>>>>>>>> "FIELD", "KEY_NAME", "KEY_VALUE")) >>>>>>>>>>>>> .timeWindow(Time.days(1)) >>>>>>>>>>>>> .reduce(new DistinctFunction()) >>>>>>>>>>>>> >>>>>>>>>>>>> On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin < >>>>>>>>>>>>> and...@data-artisans.com> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> Hi Juho, >>>>>>>>>>>>>> >>>>>>>>>>>>>> Where exactly does the data miss? When do you notice that? >>>>>>>>>>>>>> Do you check it: >>>>>>>>>>>>>> - debugging `DistinctFunction.reduce` right after resume in >>>>>>>>>>>>>> the middle of the day >>>>>>>>>>>>>> or >>>>>>>>>>>>>> - some distinct records miss in the final output >>>>>>>>>>>>>> of BucketingSink in s3 after window result is actually triggered >>>>>>>>>>>>>> and saved >>>>>>>>>>>>>> into s3 at the end of the day? is this the main output? >>>>>>>>>>>>>> >>>>>>>>>>>>>> The late data around the time of taking savepoint might be >>>>>>>>>>>>>> not included into the savepoint but it should be behind the >>>>>>>>>>>>>> snapshotted >>>>>>>>>>>>>> offset in Kafka. Then it should just come later after the >>>>>>>>>>>>>> restore and >>>>>>>>>>>>>> should be reduced within the allowed lateness into the final >>>>>>>>>>>>>> result which >>>>>>>>>>>>>> is saved into s3. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Also, is this `DistinctFunction.reduce` just an example or >>>>>>>>>>>>>> the actual implementation, basically saving just one of records >>>>>>>>>>>>>> inside the >>>>>>>>>>>>>> 24h window in s3? then what is missing there? >>>>>>>>>>>>>> >>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>> Andrey >>>>>>>>>>>>>> >>>>>>>>>>>>>> On 23 Aug 2018, at 15:42, Juho Autio <juho.au...@rovio.com> >>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>> I changed to allowedLateness=0, no change, still missing data >>>>>>>>>>>>>> when restoring from savepoint. >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Tue, Aug 21, 2018 at 10:43 AM Juho Autio < >>>>>>>>>>>>>> juho.au...@rovio.com> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>>> I realized that BucketingSink must not play any role in this >>>>>>>>>>>>>>> problem. This is because only when the 24-hour window triggers, >>>>>>>>>>>>>>> BucketinSink gets a burst of input. Around the state restoring >>>>>>>>>>>>>>> point >>>>>>>>>>>>>>> (middle of the day) it doesn't get any input, so it can't lose >>>>>>>>>>>>>>> anything >>>>>>>>>>>>>>> either (right?). >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I will next try removing the allowedLateness entirely from >>>>>>>>>>>>>>> the equation. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> In the meanwhile, please let me know if you have any >>>>>>>>>>>>>>> suggestions for debugging the lost data, for example what logs >>>>>>>>>>>>>>> to enable. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> We use FlinkKafkaConsumer010 btw. Are there any known issues >>>>>>>>>>>>>>> with that, that could contribute to lost data when restoring a >>>>>>>>>>>>>>> savepoint? 
>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 4:23 PM Juho Autio < >>>>>>>>>>>>>>> juho.au...@rovio.com> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Some data is silently lost on my Flink stream job when >>>>>>>>>>>>>>>> state is restored from a savepoint. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Do you have any debugging hints to find out where exactly >>>>>>>>>>>>>>>> the data gets dropped? >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> My job gathers distinct values using a 24-hour window. It >>>>>>>>>>>>>>>> doesn't have any custom state management. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> When I cancel the job with savepoint and restore from that >>>>>>>>>>>>>>>> savepoint, some data is missed. It seems to be losing just a >>>>>>>>>>>>>>>> small amount >>>>>>>>>>>>>>>> of data. The event time of lost data is probably around the >>>>>>>>>>>>>>>> time of >>>>>>>>>>>>>>>> savepoint. In other words the rest of the time window is not >>>>>>>>>>>>>>>> entirely >>>>>>>>>>>>>>>> missed – collection works correctly also for (most of the) >>>>>>>>>>>>>>>> events that come >>>>>>>>>>>>>>>> in after restoring. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> When the job processes a full 24-hour window without >>>>>>>>>>>>>>>> interruptions it doesn't miss anything. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Usually the problem doesn't happen in test environments >>>>>>>>>>>>>>>> that have smaller parallelism and smaller data volumes. But in >>>>>>>>>>>>>>>> production >>>>>>>>>>>>>>>> volumes the job seems to be consistently missing at least >>>>>>>>>>>>>>>> something on >>>>>>>>>>>>>>>> every restore. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> This issue has consistently happened since the job was >>>>>>>>>>>>>>>> initially created. It was at first run on an older version of >>>>>>>>>>>>>>>> Flink >>>>>>>>>>>>>>>> 1.5-SNAPSHOT and it still happens on both Flink 1.5.2 & 1.6.0. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> I'm wondering if this could be for example some >>>>>>>>>>>>>>>> synchronization issue between the kafka consumer offsets vs. >>>>>>>>>>>>>>>> what's been >>>>>>>>>>>>>>>> written by BucketingSink? >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> 1. Job content, simplified >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> kafkaStream >>>>>>>>>>>>>>>> .flatMap(new ExtractFieldsFunction()) >>>>>>>>>>>>>>>> .keyBy(new MapKeySelector(1, 2, 3, 4)) >>>>>>>>>>>>>>>> .timeWindow(Time.days(1)) >>>>>>>>>>>>>>>> .allowedLateness(allowedLateness) >>>>>>>>>>>>>>>> .sideOutputLateData(lateDataTag) >>>>>>>>>>>>>>>> .reduce(new DistinctFunction()) >>>>>>>>>>>>>>>> .addSink(sink) >>>>>>>>>>>>>>>> // use a fixed number of output partitions >>>>>>>>>>>>>>>> .setParallelism(8)) >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> /** >>>>>>>>>>>>>>>> * Usage: .keyBy("the", "distinct", "fields").reduce(new >>>>>>>>>>>>>>>> DistinctFunction()) >>>>>>>>>>>>>>>> */ >>>>>>>>>>>>>>>> public class DistinctFunction implements >>>>>>>>>>>>>>>> ReduceFunction<java.util.Map<String, String>> { >>>>>>>>>>>>>>>> @Override >>>>>>>>>>>>>>>> public Map<String, String> reduce(Map<String, String> >>>>>>>>>>>>>>>> value1, Map<String, String> value2) { >>>>>>>>>>>>>>>> return value1; >>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> 2. 
State configuration >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> boolean enableIncrementalCheckpointing = true; >>>>>>>>>>>>>>>> String statePath = "s3n://bucket/savepoints"; >>>>>>>>>>>>>>>> new RocksDBStateBackend(statePath, >>>>>>>>>>>>>>>> enableIncrementalCheckpointing); >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Checkpointing Mode Exactly Once >>>>>>>>>>>>>>>> Interval 1m 0s >>>>>>>>>>>>>>>> Timeout 10m 0s >>>>>>>>>>>>>>>> Minimum Pause Between Checkpoints 1m 0s >>>>>>>>>>>>>>>> Maximum Concurrent Checkpoints 1 >>>>>>>>>>>>>>>> Persist Checkpoints Externally Enabled (retain on >>>>>>>>>>>>>>>> cancellation) >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> 3. BucketingSink configuration >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> We use BucketingSink, I don't think there's anything >>>>>>>>>>>>>>>> special here, if not the fact that we're writing to S3. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> String outputPath = "s3://bucket/output"; >>>>>>>>>>>>>>>> BucketingSink<Map<String, String>> sink = new >>>>>>>>>>>>>>>> BucketingSink<Map<String, String>>(outputPath) >>>>>>>>>>>>>>>> .setBucketer(new ProcessdateBucketer()) >>>>>>>>>>>>>>>> .setBatchSize(batchSize) >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> .setInactiveBucketThreshold(inactiveBucketThreshold) >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> .setInactiveBucketCheckInterval(inactiveBucketCheckInterval); >>>>>>>>>>>>>>>> sink.setWriter(new IdJsonWriter()); >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> 4. Kafka & event time >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> My flink job reads the data from Kafka, using a >>>>>>>>>>>>>>>> BoundedOutOfOrdernessTimestampExtractor on the kafka consumer >>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>> synchronize watermarks accross all kafka partitions. We also >>>>>>>>>>>>>>>> write late >>>>>>>>>>>>>>>> data to side output, but nothing is written there – if it >>>>>>>>>>>>>>>> would, it could >>>>>>>>>>>>>>>> explain missed data in the main output (I'm also sure that our >>>>>>>>>>>>>>>> late data >>>>>>>>>>>>>>>> writing works, because we previously had some actual late data >>>>>>>>>>>>>>>> which ended >>>>>>>>>>>>>>>> up there). >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> 5. allowedLateness >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> It may be or may not be relevant that I have also enabled >>>>>>>>>>>>>>>> allowedLateness with 1 minute lateness on the 24-hour window: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> If that makes sense, I could try removing allowedLateness >>>>>>>>>>>>>>>> entirely? That would be just to rule out that Flink doesn't >>>>>>>>>>>>>>>> have a bug >>>>>>>>>>>>>>>> that's related to restoring state in combination with the >>>>>>>>>>>>>>>> allowedLateness >>>>>>>>>>>>>>>> feature. After all, all of our data should be in a good enough >>>>>>>>>>>>>>>> order to not >>>>>>>>>>>>>>>> be late, given the max out of orderness used on kafka consumer >>>>>>>>>>>>>>>> timestamp >>>>>>>>>>>>>>>> extractor. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Thank you in advance! >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>> >>>>>> >>>>>> >>>> >>>> >>> >>> >> >