Hi, > On 04.10.2018 at 16:08, Juho Autio <juho.au...@rovio.com> wrote: > > > you could take a look at Bravo [1] to query your savepoints and to check if > > the state in the savepoint is complete w.r.t. your expectations > > Thanks. I'm not 100% sure if this is the case, but to me it seemed like the missed > ids were being logged by the reducer soon after the job had started (after > restoring a savepoint). But on the other hand, after that I also made another > savepoint & restored that, so what I could check is: does that next savepoint > have the missed ids that were logged (a couple of minutes before the > savepoint was created, so there should've been more than enough time to add > them to the state before the savepoint was triggered) or not. Anyway, if I > were able to verify with Bravo that the ids are missing from the > savepoint (even though the reducer logged that it saw them), would that help in > figuring out where they are lost? Is there some major difference compared to > just looking at the final output after the window has been triggered?
I think that makes a difference. For example, you can investigate if there is a state loss or a problem with the windowing. In the savepoint you could see which keys exist and to which windows they are assigned. Also, just to make sure there is no misunderstanding: only elements that are already in the state when a savepoint starts are expected to be part of the savepoint; elements that arrive between the start and completion of the savepoint are not. > > > I also doubt that the problem is about backpressure after restore, because > > the job will only continue running after the state restore is already > > completed. > > Yes, I'm not suspecting that the state restoring would be the problem either. > My concern was about backpressure possibly messing with the updates of > reducing state? I would tend to suspect that updating the state consistently > is what fails, where heavy load / backpressure might be a factor. Why do you assume that backpressure would influence your updates? Updates to each local state still happen event-by-event, in a single reading/writing thread. > > On Thu, Oct 4, 2018 at 4:18 PM Stefan Richter <s.rich...@data-artisans.com > <mailto:s.rich...@data-artisans.com>> wrote: > Hi, > > you could take a look at Bravo [1] to query your savepoints and to check if > the state in the savepoint is complete w.r.t. your expectations. I somewhat doubt > that there is a general problem with the state/savepoints because many users > are successfully running it with large state and I am not aware of any data > loss problems, but nothing is impossible. What the savepoint does is also > straightforward: iterate a db snapshot and write all key/value pairs to > disk, so all data that was in the db at the time of the savepoint should > show up. I also doubt that the problem is about backpressure after restore, > because the job will only continue running after the state restore is already > completed. Did you check if you are using exactly-once semantics or > at-least-once semantics? Also, did you check that the kafka consumer start > position is configured properly [2]? Are watermarks generated as expected > after restore? > > One more unrelated high-level comment that I have: for a granularity of 24h > windows, I wonder if it would not make sense to use a batch job instead? > > Best, > Stefan > > [1] https://github.com/king/bravo <https://github.com/king/bravo> > [2] > https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration > > <https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration> > >> On 04.10.2018 at 14:53, Juho Autio <juho.au...@rovio.com >> <mailto:juho.au...@rovio.com>> wrote: >> >> Thanks for the suggestions! >> >> > In general, it would be tremendously helpful to have a minimal working >> > example which allows to reproduce the problem. >> >> Definitely. The problem with reproducing has been that this only seems to >> happen with the bigger production data volumes. >> >> That's why I'm hoping to find a way to debug this with the production data. >> With that data it seems to consistently cause some misses every time the job is >> killed/restored. >> >> > check if it happens for shorter windows, like 1h etc >> >> What would be the benefit of that compared to the 24h window? >> >> > simplify the job to not use a reduce window but simply a time window which >> > outputs the window events. 
Then counting the input and output events >> > should allow you to verify the results. If you are not seeing missing >> > events, then it could have something to do with the reducing state used in >> > the reduce function. >> >> Hm, maybe, but not sure how useful that would be, because it wouldn't yet >> prove that it's related to reducing, because not having a reduce function >> could also mean smaller load on the job, which might alone be enough to make >> the problem not manifest. >> >> Is there a way to debug what goes into the reducing state (including what >> gets removed or overwritten and what restored), if that makes sense..? Maybe >> some suitable logging could be used to prove that the lost data is written >> to the reducing state (or at least asked to be written), but not found any >> more when the window closes and state is flushed? >> >> On configuration once more, we're using RocksDB state backend with >> asynchronous incremental checkpointing. The state is restored from >> savepoints though, we haven't been using those checkpoints in these tests >> (although they could be used in case of crashes – but we haven't had those >> now). >> >> On Thu, Oct 4, 2018 at 3:25 PM Till Rohrmann <trohrm...@apache.org >> <mailto:trohrm...@apache.org>> wrote: >> Hi Juho, >> >> another idea to further narrow down the problem could be to simplify the job >> to not use a reduce window but simply a time window which outputs the window >> events. Then counting the input and output events should allow you to verify >> the results. If you are not seeing missing events, then it could have >> something to do with the reducing state used in the reduce function. >> >> In general, it would be tremendously helpful to have a minimal working >> example which allows to reproduce the problem. >> >> Cheers, >> Till >> >> On Thu, Oct 4, 2018 at 2:02 PM Andrey Zagrebin <and...@data-artisans.com >> <mailto:and...@data-artisans.com>> wrote: >> Hi Juho, >> >> can you try to reduce the job to minimal reproducible example and share the >> job and input? >> >> For example: >> - some simple records as input, e.g. tuples of primitive types saved as cvs >> - minimal deduplication job which processes them and misses records >> - check if it happens for shorter windows, like 1h etc >> - setup which you use for the job, ideally locally reproducible or cloud >> >> Best, >> Andrey >> >>> On 4 Oct 2018, at 11:13, Juho Autio <juho.au...@rovio.com >>> <mailto:juho.au...@rovio.com>> wrote: >>> >>> Sorry to insist, but we seem to be blocked for any serious usage of state >>> in Flink if we can't rely on it to not miss data in case of restore. >>> >>> Would anyone have suggestions for how to troubleshoot this? So far I have >>> verified with DEBUG logs that our reduce function gets to process also the >>> data that is missing from window output. >>> >>> On Mon, Oct 1, 2018 at 11:56 AM Juho Autio <juho.au...@rovio.com >>> <mailto:juho.au...@rovio.com>> wrote: >>> Hi Andrey, >>> >>> To rule out for good any questions about sink behaviour, the job was killed >>> and started with an additional Kafka sink. >>> >>> The same number of ids were missed in both outputs: KafkaSink & >>> BucketingSink. >>> >>> I wonder what would be the next steps in debugging? >>> >>> On Fri, Sep 21, 2018 at 3:49 PM Juho Autio <juho.au...@rovio.com >>> <mailto:juho.au...@rovio.com>> wrote: >>> Thanks, Andrey. >>> >>> > so it means that the savepoint does not loose at least some dropped >>> > records. >>> >>> I'm not sure what you mean by that? 
I mean, it was known from the >>> beginning, that not everything is lost before/after restoring a savepoint, >>> just some records around the time of restoration. It's not 100% clear >>> whether records are lost before making a savepoint or after restoring it. >>> Although, based on the new DEBUG logs it seems more like losing some >>> records that are seen ~soon after restoring. It seems like Flink would be >>> somehow confused either about the restored state vs. new inserts to state. >>> This could also be somehow linked to the high back pressure on the kafka >>> source while the stream is catching up. >>> >>> > If it is feasible for your setup, I suggest to insert one more map >>> > function after reduce and before sink. >>> > etc. >>> >>> Isn't that the same thing that we discussed before? Nothing is sent to >>> BucketingSink before the window closes, so I don't see how it would make >>> any difference if we replace the BucketingSink with a map function or >>> another sink type. We don't create or restore savepoints during the time >>> when BucketingSink gets input or has open buckets – that happens at a much >>> later time of day. I would focus on figuring out why the records are lost >>> while the window is open. But I don't know how to do that. Would you have >>> any additional suggestions? >>> >>> On Fri, Sep 21, 2018 at 3:30 PM Andrey Zagrebin <and...@data-artisans.com >>> <mailto:and...@data-artisans.com>> wrote: >>> Hi Juho, >>> >>> so it means that the savepoint does not loose at least some dropped records. >>> >>> If it is feasible for your setup, I suggest to insert one more map function >>> after reduce and before sink. >>> The map function should be called right after window is triggered but >>> before flushing to s3. >>> The result of reduce (deduped record) could be logged there. >>> This should allow to check whether the processed distinct records were >>> buffered in the state after the restoration from the savepoint or not. If >>> they were buffered we should see that there was an attempt to write them to >>> the sink from the state. >>> >>> Another suggestion is to try to write records to some other sink or to >>> both. >>> E.g. if you can access file system of workers, maybe just into local files >>> and check whether the records are also dropped there. >>> >>> Best, >>> Andrey >>> >>>> On 20 Sep 2018, at 15:37, Juho Autio <juho.au...@rovio.com >>>> <mailto:juho.au...@rovio.com>> wrote: >>>> >>>> Hi Andrey! >>>> >>>> I was finally able to gather the DEBUG logs that you suggested. In short, >>>> the reducer logged that it processed at least some of the ids that were >>>> missing from the output. >>>> >>>> "At least some", because I didn't have the job running with DEBUG logs for >>>> the full 24-hour window period. So I was only able to look up if I can >>>> find some of the missing ids in the DEBUG logs. Which I did indeed. 
>>>> >>>> I changed the DistinctFunction.java to do this: >>>> >>>> @Override >>>> public Map<String, String> reduce(Map<String, String> value1, >>>> Map<String, String> value2) { >>>> LOG.debug("DistinctFunction.reduce returns: {}={}", >>>> value1.get("field"), value1.get("id")); >>>> return value1; >>>> } >>>> >>>> Then: >>>> >>>> vi flink-1.6.0/conf/log4j.properties >>>> log4j.logger.org.apache.flink.streaming.runtime.tasks.StreamTask=DEBUG >>>> log4j.logger.com.rovio.ds.flink.uniqueid.DistinctFunction=DEBUG >>>> >>>> Then I ran the following kind of test: >>>> >>>> - Cancelled the on-going job with savepoint created at ~Sep 18 08:35 UTC >>>> 2018 >>>> - Started a new cluster & job with DEBUG enabled at ~09:13, restored from >>>> that previous cluster's savepoint >>>> - Ran until caught up offsets >>>> - Cancelled the job with a new savepoint >>>> - Started a new job _without_ DEBUG, which restored the new savepoint, let >>>> it keep running so that it will eventually write the output >>>> >>>> Then on the next day, after results had been flushed when the 24-hour >>>> window closed, I compared the results again with a batch version's output. >>>> And found some missing ids as usual. >>>> >>>> I drilled down to one specific missing id (I'm replacing the actual value >>>> with AN12345 below), which was not found in the stream output, but was >>>> found in batch output & flink DEBUG logs. >>>> >>>> Related to that id, I gathered the following information: >>>> >>>> 2018-09-18~09:13:21,000 job started & savepoint is restored >>>> >>>> 2018-09-18 09:14:29,085 missing id is processed for the first time, proved >>>> by this log line: >>>> 2018-09-18 09:14:29,085 DEBUG com.rovio.ds.flink.uniqueid.DistinctFunction >>>> - DistinctFunction.reduce returns: s.aid1=AN12345 >>>> >>>> 2018-09-18 09:15:14,264 first synchronous part of checkpoint >>>> 2018-09-18 09:15:16,544 first asynchronous part of checkpoint >>>> >>>> ( >>>> more occurrences of checkpoints (~1 min checkpointing time + ~1 min >>>> delay before next) >>>> / >>>> more occurrences of DistinctFunction.reduce >>>> ) >>>> >>>> 2018-09-18 09:23:45,053 missing id is processed for the last time >>>> >>>> 2018-09-18~10:20:00,000 savepoint created & job cancelled >>>> >>>> To be noted, there was high backpressure after restoring from savepoint >>>> until the stream caught up with the kafka offsets. Although, our job uses >>>> assign timestamps & watermarks on the flink kafka consumer itself, so >>>> event time of all partitions is synchronized. As expected, we don't get >>>> any late data in the late data side output. >>>> >>>> From this we can see that the missing ids are processed by the reducer, >>>> but they must get lost somewhere before the 24-hour window is triggered. >>>> >>>> I think it's worth mentioning once more that the stream doesn't miss any >>>> ids if we let it's running without interruptions / state restoring. >>>> >>>> What's next? >>>> >>>> On Wed, Aug 29, 2018 at 3:49 PM Andrey Zagrebin <and...@data-artisans.com >>>> <mailto:and...@data-artisans.com>> wrote: >>>> Hi Juho, >>>> >>>> > only when the 24-hour window triggers, BucketingSink gets a burst of >>>> > input >>>> >>>> This is of course totally true, my understanding is the same. We cannot >>>> exclude problem there for sure, just savepoints are used a lot w/o problem >>>> reports and BucketingSink is known to be problematic with s3. 
That is why, >>>> I asked you: >>>> >>>> > You also wrote that the timestamps of lost event are 'probably' around >>>> > the time of the savepoint, if it is not yet for sure I would also check >>>> > it. >>>> >>>> Although, bucketing sink might loose any data at the end of the day (also >>>> from the middle). The fact, that it is always around the time of taking a >>>> savepoint and not random, is surely suspicious and possible savepoint >>>> failures need to be investigated. >>>> >>>> Regarding the s3 problem, s3 doc says: >>>> >>>> > The caveat is that if you make a HEAD or GET request to the key name (to >>>> > find if the object exists) before creating the object, Amazon S3 >>>> > provides 'eventual consistency' for read-after-write. >>>> >>>> The algorithm you suggest is how it is roughly implemented now >>>> (BucketingSink.openNewPartFile). My understanding is that 'eventual >>>> consistency’ means that even if you just created file (its name is key) it >>>> can be that you do not get it in the list or exists (HEAD) returns false >>>> and you risk to rewrite the previous part. >>>> >>>> The BucketingSink was designed for a standard file system. s3 is used over >>>> a file system wrapper atm but does not always provide normal file system >>>> guarantees. See also last example in [1]. >>>> >>>> Cheers, >>>> Andrey >>>> >>>> [1] >>>> https://codeburst.io/quick-explanation-of-the-s3-consistency-model-6c9f325e3f82 >>>> >>>> <https://codeburst.io/quick-explanation-of-the-s3-consistency-model-6c9f325e3f82> >>>> >>>>> On 29 Aug 2018, at 12:11, Juho Autio <juho.au...@rovio.com >>>>> <mailto:juho.au...@rovio.com>> wrote: >>>>> >>>>> Andrey, thank you very much for the debugging suggestions, I'll try them. >>>>> >>>>> In the meanwhile two more questions, please: >>>>> >>>>> > Just to keep in mind this problem with s3 and exclude it for sure. I >>>>> > would also check whether the size of missing events is around the batch >>>>> > size of BucketingSink or not. >>>>> >>>>> Fair enough, but I also want to focus on debugging the most probable >>>>> subject first. So what do you think about this – true or false: only when >>>>> the 24-hour window triggers, BucketinSink gets a burst of input. Around >>>>> the state restoring point (middle of the day) it doesn't get any input, >>>>> so it can't lose anything either. Isn't this true, or have I totally >>>>> missed how Flink works in triggering window results? I would not expect >>>>> there to be any optimization that speculatively triggers early results of >>>>> a regular time window to the downstream operators. >>>>> >>>>> > The old BucketingSink has in general problem with s3. Internally >>>>> > BucketingSink queries s3 as a file system to list already written file >>>>> > parts (batches) and determine index of the next part to start. Due to >>>>> > eventual consistency of checking file existence in s3 [1], the >>>>> > BucketingSink can rewrite the previously written part and basically >>>>> > loose it. >>>>> >>>>> I was wondering, what does S3's "read-after-write consistency" (mentioned >>>>> on the page you linked) actually mean. It seems that this might be >>>>> possible: >>>>> - LIST keys, find current max index >>>>> - choose next index = max + 1 >>>>> - HEAD next index: if it exists, keep adding + 1 until key doesn't exist >>>>> on S3 >>>>> >>>>> But definitely sounds easier if a sink keeps track of files in a way >>>>> that's guaranteed to be consistent. 
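In code, the part-index probing described above looks roughly like the following. This is only a minimal sketch under assumed names (PartIndexProbe, partPrefix) and Flink's FileSystem abstraction; it is not the actual BucketingSink.openNewPartFile, which also has to account for .pending and .in-progress suffixes. The point it illustrates: under S3's eventual consistency, exists() can return false for a part that was just written, so the probe can hand back an index that is already in use.

import java.io.IOException;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class PartIndexProbe {

    // Probe for the next "free" part index by checking file existence.
    public static Path nextPartFile(Path bucketPath, String partPrefix) throws IOException {
        FileSystem fs = bucketPath.getFileSystem();
        int partCounter = 0;
        Path candidate = new Path(bucketPath, partPrefix + "-" + partCounter);
        while (fs.exists(candidate)) {
            partCounter++;
            candidate = new Path(bucketPath, partPrefix + "-" + partCounter);
        }
        // On an eventually consistent store this candidate may in fact already
        // exist, in which case the previously written part gets overwritten.
        return candidate;
    }
}
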
>>>>> >>>>> Cheers, >>>>> Juho >>>>> >>>>> On Mon, Aug 27, 2018 at 2:04 PM Andrey Zagrebin <and...@data-artisans.com >>>>> <mailto:and...@data-artisans.com>> wrote: >>>>> Hi, >>>>> >>>>> true, StreamingFileSink does not support s3 in 1.6.0, it is planned for >>>>> the next 1.7 release, sorry for confusion. >>>>> The old BucketingSink has in general problem with s3. Internally >>>>> BucketingSink queries s3 as a file system >>>>> to list already written file parts (batches) and determine index of the >>>>> next part to start. Due to eventual consistency of checking file >>>>> existence in s3 [1], the BucketingSink can rewrite the previously written >>>>> part and basically loose it. It should be fixed for StreamingFileSink in >>>>> 1.7 where Flink keeps its own track of written parts and does not rely on >>>>> s3 as a file system. >>>>> I also include Kostas, he might add more details. >>>>> >>>>> Just to keep in mind this problem with s3 and exclude it for sure I >>>>> would also check whether the size of missing events is around the batch >>>>> size of BucketingSink or not. You also wrote that the timestamps of lost >>>>> event are 'probably' around the time of the savepoint, if it is not yet >>>>> for sure I would also check it. >>>>> >>>>> Have you already checked the log files of job manager and task managers >>>>> for the job running before and after the restore from the check point? Is >>>>> everything successful there, no errors, relevant warnings or exceptions? >>>>> >>>>> As the next step, I would suggest to log all encountered events in >>>>> DistinctFunction.reduce if possible for production data and check whether >>>>> the missed events are eventually processed before or after the savepoint. >>>>> The following log message indicates a border between the events that >>>>> should be included into the savepoint (logged before) or not: >>>>> “{} ({}, synchronous part) in thread {} took {} ms” (template) >>>>> Also check if the savepoint has been overall completed: >>>>> "{} ({}, asynchronous part) in thread {} took {} ms." >>>>> >>>>> Best, >>>>> Andrey >>>>> >>>>> [1] https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html >>>>> <https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html> >>>>> >>>>>> On 24 Aug 2018, at 20:41, Juho Autio <juho.au...@rovio.com >>>>>> <mailto:juho.au...@rovio.com>> wrote: >>>>>> >>>>>> Hi, >>>>>> >>>>>> Using StreamingFileSink is not a convenient option for production use >>>>>> for us as it doesn't support s3*. I could use StreamingFileSink just to >>>>>> verify, but I don't see much point in doing so. Please consider my >>>>>> previous comment: >>>>>> >>>>>> > I realized that BucketingSink must not play any role in this problem. >>>>>> > This is because only when the 24-hour window triggers, BucketingSink >>>>>> > gets a burst of input. Around the state restoring point (middle of the >>>>>> > day) it doesn't get any input, so it can't lose anything either >>>>>> > (right?). >>>>>> >>>>>> I could also use a kafka sink instead, but I can't imagine how there >>>>>> could be any difference. It's very real that the sink doesn't get any >>>>>> input for a long time until the 24-hour window closes, and then it >>>>>> quickly writes out everything because it's not that much data eventually >>>>>> for the distinct values. >>>>>> >>>>>> Any ideas for debugging what's happening around the savepoint & >>>>>> restoration time? >>>>>> >>>>>> *) I actually implemented StreamingFileSink as an alternative sink. 
This >>>>>> was before I came to realize that most likely the sink component has >>>>>> nothing to do with the data loss problem. I tried it with s3n:// path >>>>>> just to see an exception being thrown. In the source code I indeed then >>>>>> found an explicit check for the target path scheme to be "hdfs://". >>>>>> >>>>>> On Fri, Aug 24, 2018 at 7:49 PM Andrey Zagrebin >>>>>> <and...@data-artisans.com <mailto:and...@data-artisans.com>> wrote: >>>>>> Ok, I think before further debugging the window reduced state, >>>>>> could you try the new ‘StreamingFileSink’ [1] introduced in Flink 1.6.0 >>>>>> instead of the previous 'BucketingSink’? >>>>>> >>>>>> Cheers, >>>>>> Andrey >>>>>> >>>>>> [1] >>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html >>>>>> >>>>>> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html> >>>>>> >>>>>>> On 24 Aug 2018, at 18:03, Juho Autio <juho.au...@rovio.com >>>>>>> <mailto:juho.au...@rovio.com>> wrote: >>>>>>> >>>>>>> Yes, sorry for my confusing comment. I just meant that it seems like >>>>>>> there's a bug somewhere now that the output is missing some data. >>>>>>> >>>>>>> > I would wait and check the actual output in s3 because it is the main >>>>>>> > result of the job >>>>>>> >>>>>>> Yes, and that's what I have already done. There seems to be always some >>>>>>> data loss with the production data volumes, if the job has been >>>>>>> restarted on that day. >>>>>>> >>>>>>> Would you have any suggestions for how to debug this further? >>>>>>> >>>>>>> Many thanks for stepping in. >>>>>>> >>>>>>> On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin >>>>>>> <and...@data-artisans.com <mailto:and...@data-artisans.com>> wrote: >>>>>>> Hi Juho, >>>>>>> >>>>>>> So it is a per key deduplication job. >>>>>>> >>>>>>> Yes, I would wait and check the actual output in s3 because it is the >>>>>>> main result of the job and >>>>>>> >>>>>>> > The late data around the time of taking savepoint might be not >>>>>>> > included into the savepoint but it should be behind the snapshotted >>>>>>> > offset in Kafka. >>>>>>> >>>>>>> is not a bug, it is a possible behaviour. >>>>>>> >>>>>>> The savepoint is a snapshot of the data in transient which is already >>>>>>> consumed from Kafka. >>>>>>> Basically the full contents of the window result is split between the >>>>>>> savepoint and what can come after the savepoint'ed offset in Kafka but >>>>>>> before the window result is written into s3. >>>>>>> >>>>>>> Allowed lateness should not affect it, I am just saying that the final >>>>>>> result in s3 should include all records after it. >>>>>>> This is what should be guaranteed but not the contents of the >>>>>>> intermediate savepoint. >>>>>>> >>>>>>> Cheers, >>>>>>> Andrey >>>>>>> >>>>>>>> On 24 Aug 2018, at 16:52, Juho Autio <juho.au...@rovio.com >>>>>>>> <mailto:juho.au...@rovio.com>> wrote: >>>>>>>> >>>>>>>> Thanks for your answer! >>>>>>>> >>>>>>>> I check for the missed data from the final output on s3. So I wait >>>>>>>> until the next day, then run the same thing re-implemented in batch, >>>>>>>> and compare the output. >>>>>>>> >>>>>>>> > The late data around the time of taking savepoint might be not >>>>>>>> > included into the savepoint but it should be behind the snapshotted >>>>>>>> > offset in Kafka. >>>>>>>> >>>>>>>> Yes, I would definitely expect that. It seems like there's a bug >>>>>>>> somewhere. 
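The "kafka sink instead" option mentioned above can also be wired as a second sink next to the existing BucketingSink, so that the two outputs can be diffed after the 24-hour window fires. A minimal sketch, assuming a 0.10 producer to match the FlinkKafkaConsumer010 used by the job; the topic name, properties and IdJsonSerializationSchema are illustrative placeholders, while MapKeySelector and DistinctFunction are the job's own classes:

import java.util.Map;
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;

public class DualSinkWiring {

    public static void attachSinks(DataStream<Map<String, String>> events,
                                   BucketingSink<Map<String, String>> s3Sink,
                                   Properties kafkaProps) {
        DataStream<Map<String, String>> deduped = events
                .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE"))
                .timeWindow(Time.days(1))
                .reduce(new DistinctFunction());

        // Existing output to S3.
        deduped.addSink(s3Sink);

        // Secondary output to Kafka, only for comparing the two results.
        deduped.addSink(new FlinkKafkaProducer010<>(
                "distinct-ids-debug",            // placeholder topic
                new IdJsonSerializationSchema(), // placeholder schema
                kafkaProps));
    }
}

If both outputs miss the same ids, the sink can be ruled out and the loss has to happen in or before the window state.
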
>>>>>>>> >>>>>>>> > Then it should just come later after the restore and should be >>>>>>>> > reduced within the allowed lateness into the final result which is >>>>>>>> > saved into s3. >>>>>>>> >>>>>>>> Well, as far as I know, allowed lateness doesn't play any role here, >>>>>>>> because I started running the job with allowedLateness=0, and still >>>>>>>> get the data loss, while my late data output doesn't receive anything. >>>>>>>> >>>>>>>> > Also, is this `DistinctFunction.reduce` just an example or the >>>>>>>> > actual implementation, basically saving just one of records inside >>>>>>>> > the 24h window in s3? then what is missing there? >>>>>>>> >>>>>>>> Yes, it's the actual implementation. Note that there's a keyBy before >>>>>>>> the DistinctFunction. So there's one record for each key (which is the >>>>>>>> combination of a couple of fields). In practice I've seen that we're >>>>>>>> missing ~2000-4000 elements on each restore, and the total output is >>>>>>>> obviously much more than that. >>>>>>>> >>>>>>>> Here's the full code for the key selector: >>>>>>>> >>>>>>>> public class MapKeySelector implements KeySelector<Map<String,String>, >>>>>>>> Object> { >>>>>>>> >>>>>>>> private final String[] fields; >>>>>>>> >>>>>>>> public MapKeySelector(String... fields) { >>>>>>>> this.fields = fields; >>>>>>>> } >>>>>>>> >>>>>>>> @Override >>>>>>>> public Object getKey(Map<String, String> event) throws Exception { >>>>>>>> Tuple key = Tuple.getTupleClass(fields.length).newInstance(); >>>>>>>> for (int i = 0; i < fields.length; i++) { >>>>>>>> key.setField(event.getOrDefault(fields[i], ""), i); >>>>>>>> } >>>>>>>> return key; >>>>>>>> } >>>>>>>> } >>>>>>>> >>>>>>>> And a more exact example on how it's used: >>>>>>>> >>>>>>>> .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD", >>>>>>>> "KEY_NAME", "KEY_VALUE")) >>>>>>>> .timeWindow(Time.days(1)) >>>>>>>> .reduce(new DistinctFunction()) >>>>>>>> >>>>>>>> On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin >>>>>>>> <and...@data-artisans.com <mailto:and...@data-artisans.com>> wrote: >>>>>>>> Hi Juho, >>>>>>>> >>>>>>>> Where exactly does the data miss? When do you notice that? >>>>>>>> Do you check it: >>>>>>>> - debugging `DistinctFunction.reduce` right after resume in the middle >>>>>>>> of the day >>>>>>>> or >>>>>>>> - some distinct records miss in the final output of BucketingSink in >>>>>>>> s3 after window result is actually triggered and saved into s3 at the >>>>>>>> end of the day? is this the main output? >>>>>>>> >>>>>>>> The late data around the time of taking savepoint might be not >>>>>>>> included into the savepoint but it should be behind the snapshotted >>>>>>>> offset in Kafka. Then it should just come later after the restore and >>>>>>>> should be reduced within the allowed lateness into the final result >>>>>>>> which is saved into s3. >>>>>>>> >>>>>>>> Also, is this `DistinctFunction.reduce` just an example or the actual >>>>>>>> implementation, basically saving just one of records inside the 24h >>>>>>>> window in s3? then what is missing there? >>>>>>>> >>>>>>>> Cheers, >>>>>>>> Andrey >>>>>>>> >>>>>>>>> On 23 Aug 2018, at 15:42, Juho Autio <juho.au...@rovio.com >>>>>>>>> <mailto:juho.au...@rovio.com>> wrote: >>>>>>>>> >>>>>>>>> I changed to allowedLateness=0, no change, still missing data when >>>>>>>>> restoring from savepoint. 
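The "one more map function after reduce and before sink" idea suggested elsewhere in this thread would slot into exactly this chain. A minimal sketch, with an assumed class name and log format (it logs the same field/id pair as the DistinctFunction debug logging shown above):

import java.util.Map;

import org.apache.flink.api.common.functions.MapFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Pass-through operator that logs each deduplicated record when the window fires. */
public class WindowOutputLogger implements MapFunction<Map<String, String>, Map<String, String>> {

    private static final Logger LOG = LoggerFactory.getLogger(WindowOutputLogger.class);

    @Override
    public Map<String, String> map(Map<String, String> record) {
        LOG.debug("window fired: {}={}", record.get("field"), record.get("id"));
        return record;
    }
}

Used as .reduce(new DistinctFunction()).map(new WindowOutputLogger()).addSink(sink), an id that shows up in these log lines but not in the sink output would point at the sink, while an id that never shows up here would point at the window / reducing state.
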
>>>>>>>>> >>>>>>>>> On Tue, Aug 21, 2018 at 10:43 AM Juho Autio <juho.au...@rovio.com >>>>>>>>> <mailto:juho.au...@rovio.com>> wrote: >>>>>>>>> I realized that BucketingSink must not play any role in this problem. >>>>>>>>> This is because only when the 24-hour window triggers, BucketinSink >>>>>>>>> gets a burst of input. Around the state restoring point (middle of >>>>>>>>> the day) it doesn't get any input, so it can't lose anything either >>>>>>>>> (right?). >>>>>>>>> >>>>>>>>> I will next try removing the allowedLateness entirely from the >>>>>>>>> equation. >>>>>>>>> >>>>>>>>> In the meanwhile, please let me know if you have any suggestions for >>>>>>>>> debugging the lost data, for example what logs to enable. >>>>>>>>> >>>>>>>>> We use FlinkKafkaConsumer010 btw. Are there any known issues with >>>>>>>>> that, that could contribute to lost data when restoring a savepoint? >>>>>>>>> >>>>>>>>> On Fri, Aug 17, 2018 at 4:23 PM Juho Autio <juho.au...@rovio.com >>>>>>>>> <mailto:juho.au...@rovio.com>> wrote: >>>>>>>>> Some data is silently lost on my Flink stream job when state is >>>>>>>>> restored from a savepoint. >>>>>>>>> >>>>>>>>> Do you have any debugging hints to find out where exactly the data >>>>>>>>> gets dropped? >>>>>>>>> >>>>>>>>> My job gathers distinct values using a 24-hour window. It doesn't >>>>>>>>> have any custom state management. >>>>>>>>> >>>>>>>>> When I cancel the job with savepoint and restore from that savepoint, >>>>>>>>> some data is missed. It seems to be losing just a small amount of >>>>>>>>> data. The event time of lost data is probably around the time of >>>>>>>>> savepoint. In other words the rest of the time window is not entirely >>>>>>>>> missed – collection works correctly also for (most of the) events >>>>>>>>> that come in after restoring. >>>>>>>>> >>>>>>>>> When the job processes a full 24-hour window without interruptions it >>>>>>>>> doesn't miss anything. >>>>>>>>> >>>>>>>>> Usually the problem doesn't happen in test environments that have >>>>>>>>> smaller parallelism and smaller data volumes. But in production >>>>>>>>> volumes the job seems to be consistently missing at least something >>>>>>>>> on every restore. >>>>>>>>> >>>>>>>>> This issue has consistently happened since the job was initially >>>>>>>>> created. It was at first run on an older version of Flink >>>>>>>>> 1.5-SNAPSHOT and it still happens on both Flink 1.5.2 & 1.6.0. >>>>>>>>> >>>>>>>>> I'm wondering if this could be for example some synchronization issue >>>>>>>>> between the kafka consumer offsets vs. what's been written by >>>>>>>>> BucketingSink? >>>>>>>>> >>>>>>>>> 1. Job content, simplified >>>>>>>>> >>>>>>>>> kafkaStream >>>>>>>>> .flatMap(new ExtractFieldsFunction()) >>>>>>>>> .keyBy(new MapKeySelector(1, 2, 3, 4)) >>>>>>>>> .timeWindow(Time.days(1)) >>>>>>>>> .allowedLateness(allowedLateness) >>>>>>>>> .sideOutputLateData(lateDataTag) >>>>>>>>> .reduce(new DistinctFunction()) >>>>>>>>> .addSink(sink) >>>>>>>>> // use a fixed number of output partitions >>>>>>>>> .setParallelism(8)) >>>>>>>>> >>>>>>>>> /** >>>>>>>>> * Usage: .keyBy("the", "distinct", "fields").reduce(new >>>>>>>>> DistinctFunction()) >>>>>>>>> */ >>>>>>>>> public class DistinctFunction implements >>>>>>>>> ReduceFunction<java.util.Map<String, String>> { >>>>>>>>> @Override >>>>>>>>> public Map<String, String> reduce(Map<String, String> value1, >>>>>>>>> Map<String, String> value2) { >>>>>>>>> return value1; >>>>>>>>> } >>>>>>>>> } >>>>>>>>> >>>>>>>>> 2. 
State configuration >>>>>>>>> >>>>>>>>> boolean enableIncrementalCheckpointing = true; >>>>>>>>> String statePath = "s3n://bucket/savepoints"; >>>>>>>>> new RocksDBStateBackend(statePath, enableIncrementalCheckpointing); >>>>>>>>> >>>>>>>>> Checkpointing Mode Exactly Once >>>>>>>>> Interval 1m 0s >>>>>>>>> Timeout 10m 0s >>>>>>>>> Minimum Pause Between Checkpoints 1m 0s >>>>>>>>> Maximum Concurrent Checkpoints 1 >>>>>>>>> Persist Checkpoints Externally Enabled (retain on cancellation) >>>>>>>>> >>>>>>>>> 3. BucketingSink configuration >>>>>>>>> >>>>>>>>> We use BucketingSink; I don't think there's anything special here, except >>>>>>>>> the fact that we're writing to S3. >>>>>>>>> >>>>>>>>> String outputPath = "s3://bucket/output"; >>>>>>>>> BucketingSink<Map<String, String>> sink = new >>>>>>>>> BucketingSink<Map<String, String>>(outputPath) >>>>>>>>> .setBucketer(new ProcessdateBucketer()) >>>>>>>>> .setBatchSize(batchSize) >>>>>>>>> .setInactiveBucketThreshold(inactiveBucketThreshold) >>>>>>>>> .setInactiveBucketCheckInterval(inactiveBucketCheckInterval); >>>>>>>>> sink.setWriter(new IdJsonWriter()); >>>>>>>>> >>>>>>>>> 4. Kafka & event time >>>>>>>>> >>>>>>>>> My flink job reads the data from Kafka, using a >>>>>>>>> BoundedOutOfOrdernessTimestampExtractor on the kafka consumer to >>>>>>>>> synchronize watermarks across all kafka partitions. We also write >>>>>>>>> late data to a side output, but nothing is written there – if anything were, >>>>>>>>> it could explain missed data in the main output (I'm also sure that >>>>>>>>> our late data writing works, because we previously had some actual >>>>>>>>> late data which ended up there). >>>>>>>>> >>>>>>>>> 5. allowedLateness >>>>>>>>> >>>>>>>>> It may or may not be relevant that I have also enabled >>>>>>>>> allowedLateness with 1 minute lateness on the 24-hour window. >>>>>>>>> >>>>>>>>> If that makes sense, I could try removing allowedLateness entirely? >>>>>>>>> That would be just to rule out a bug in Flink related to restoring state >>>>>>>>> in combination with the allowedLateness feature. After all, all of our data >>>>>>>>> should be in a good enough order to not be late, given the max out of >>>>>>>>> orderness used on the kafka consumer timestamp extractor. >>>>>>>>> >>>>>>>>> Thank you in advance!
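For completeness, the watermarking setup described in section 4 – assigning timestamps and watermarks on the Kafka consumer itself so that the emitted watermark is held back to the slowest partition – looks roughly like the following. The topic name, deserialization schema, timestamp field and the 1-minute out-of-orderness bound are illustrative assumptions, not the job's actual values:

import java.util.Map;
import java.util.Properties;

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

public class ConsumerWatermarking {

    public static FlinkKafkaConsumer010<Map<String, String>> createConsumer(Properties kafkaProps) {
        FlinkKafkaConsumer010<Map<String, String>> consumer =
                new FlinkKafkaConsumer010<>("events", new MapDeserializationSchema(), kafkaProps);

        // Assigning the extractor on the consumer makes Flink track a watermark
        // per Kafka partition and emit the minimum across partitions.
        consumer.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Map<String, String>>(Time.minutes(1)) {
                    @Override
                    public long extractTimestamp(Map<String, String> event) {
                        return Long.parseLong(event.get("timestamp")); // assumed field name
                    }
                });
        return consumer;
    }
}
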