Good then, I'll try to analyze the savepoints with Bravo. Thanks!

> How would you assume that backpressure would influence your updates?
Updates to each local state still happen event-by-event, in a single
reader/writing thread.

Sure, just an ignorant guess by me. I'm not familiar with most of Flink's
internals. Any way high backpressure is not a seen on this job after it has
caught up the lag, so at I thought it would be worth mentioning.

On Thu, Oct 4, 2018 at 6:24 PM Stefan Richter <s.rich...@data-artisans.com>
wrote:

> Hi,
>
> Am 04.10.2018 um 16:08 schrieb Juho Autio <juho.au...@rovio.com>:
>
> > you could take a look at Bravo [1] to query your savepoints and to check
> if the state in the savepoint complete w.r.t your expectations
>
> Thanks. I'm not 100% if this is the case, but to me it seemed like the
> missed ids were being logged by the reducer soon after the job had started
> (after restoring a savepoint). But on the other hand, after that I also
> made another savepoint & restored that, so what I could check is: does that
> next savepoint have the missed ids that were logged (a couple of minutes
> before the savepoint was created, so there should've been more than enough
> time to add them to the state before the savepoint was triggered) or not.
> Any way, if I would be able to verify with Bravo that the ids are missing
> from the savepoint (even though reduced logged that it saw them), would
> that help in figuring out where they are lost? Is there some major
> difference compared to just looking at the final output after window has
> been triggered?
>
>
>
> I think that makes a difference. For example, you can investigate if there
> is a state loss or a problem with the windowing. In the savepoint you could
> see which keys exists and to which windows they are assigned. Also just to
> make sure there is no misunderstanding: only elements that are in the state
> at the start of a savepoint are expected to be part of the savepoint; all
> elements between start and completion of the savepoint are not expected to
> be part of the savepoint.
>
>
> > I also doubt that the problem is about backpressure after restore,
> because the job will only continue running after the state restore is
> already completed.
>
> Yes, I'm not suspecting that the state restoring would be the problem
> either. My concern was about backpressure possibly messing with the updates
> of reducing state? I would tend to suspect that updating the state
> consistently is what fails, where heavy load / backpressure might be a
> factor.
>
>
>
> How would you assume that backpressure would influence your updates?
> Updates to each local state still happen event-by-event, in a single
> reader/writing thread.
>
>
> On Thu, Oct 4, 2018 at 4:18 PM Stefan Richter <s.rich...@data-artisans.com>
> wrote:
>
>> Hi,
>>
>> you could take a look at Bravo [1] to query your savepoints and to check
>> if the state in the savepoint complete w.r.t your expectations. I somewhat
>> doubt that there is a general problem with the state/savepoints because
>> many users are successfully running it on a large state and I am not aware
>> of any data loss problems, but nothing is impossible. What the savepoint
>> does is also straight forward: iterate a db snapshot and write all
>> key/value pairs to disk, so all data that was in the db at the time of the
>> savepoint, should show up. I also doubt that the problem is about
>> backpressure after restore, because the job will only continue running
>> after the state restore is already completed. Did you check if you are
>> using exactly-one-semantics or at-least-once semantics? Also did you check
>> that the kafka consumer start position is configured properly [2]? Are
>> watermarks generated as expected after restore?
>>
>> One more unrelated high-level comment that I have: for a granularity of
>> 24h windows, I wonder if it would not make sense to use a batch job instead?
>>
>> Best,
>> Stefan
>>
>> [1] https://github.com/king/bravo
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
>>
>> Am 04.10.2018 um 14:53 schrieb Juho Autio <juho.au...@rovio.com>:
>>
>> Thanks for the suggestions!
>>
>> > In general, it would be tremendously helpful to have a minimal working
>> example which allows to reproduce the problem.
>>
>> Definitely. The problem with reproducing has been that this only seems to
>> happen in the bigger production data volumes.
>>
>> That's why I'm hoping to find a way to debug this with the production
>> data. With that it seems to consistently cause some misses every time the
>> job is killed/restored.
>>
>> > check if it happens for shorter windows, like 1h etc
>>
>> What would be the benefit of that compared to 24h window?
>>
>> > simplify the job to not use a reduce window but simply a time window
>> which outputs the window events. Then counting the input and output events
>> should allow you to verify the results. If you are not seeing missing
>> events, then it could have something to do with the reducing state used in
>> the reduce function.
>>
>> Hm, maybe, but not sure how useful that would be, because it wouldn't yet
>> prove that it's related to reducing, because not having a reduce function
>> could also mean smaller load on the job, which might alone be enough to
>> make the problem not manifest.
>>
>> Is there a way to debug what goes into the reducing state (including what
>> gets removed or overwritten and what restored), if that makes sense..?
>> Maybe some suitable logging could be used to prove that the lost data is
>> written to the reducing state (or at least asked to be written), but not
>> found any more when the window closes and state is flushed?
>>
>> On configuration once more, we're using RocksDB state backend with
>> asynchronous incremental checkpointing. The state is restored from
>> savepoints though, we haven't been using those checkpoints in these tests
>> (although they could be used in case of crashes – but we haven't had those
>> now).
>>
>> On Thu, Oct 4, 2018 at 3:25 PM Till Rohrmann <trohrm...@apache.org>
>> wrote:
>>
>>> Hi Juho,
>>>
>>> another idea to further narrow down the problem could be to simplify the
>>> job to not use a reduce window but simply a time window which outputs the
>>> window events. Then counting the input and output events should allow you
>>> to verify the results. If you are not seeing missing events, then it could
>>> have something to do with the reducing state used in the reduce function.
>>>
>>> In general, it would be tremendously helpful to have a minimal working
>>> example which allows to reproduce the problem.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Oct 4, 2018 at 2:02 PM Andrey Zagrebin <and...@data-artisans.com>
>>> wrote:
>>>
>>>> Hi Juho,
>>>>
>>>> can you try to reduce the job to minimal reproducible example and share
>>>> the job and input?
>>>>
>>>> For example:
>>>> - some simple records as input, e.g. tuples of primitive types saved as
>>>> cvs
>>>> - minimal deduplication job which processes them and misses records
>>>> - check if it happens for shorter windows, like 1h etc
>>>> - setup which you use for the job, ideally locally reproducible or cloud
>>>>
>>>> Best,
>>>> Andrey
>>>>
>>>> On 4 Oct 2018, at 11:13, Juho Autio <juho.au...@rovio.com> wrote:
>>>>
>>>> Sorry to insist, but we seem to be blocked for any serious usage of
>>>> state in Flink if we can't rely on it to not miss data in case of restore.
>>>>
>>>> Would anyone have suggestions for how to troubleshoot this? So far I
>>>> have verified with DEBUG logs that our reduce function gets to process also
>>>> the data that is missing from window output.
>>>>
>>>> On Mon, Oct 1, 2018 at 11:56 AM Juho Autio <juho.au...@rovio.com>
>>>> wrote:
>>>>
>>>>> Hi Andrey,
>>>>>
>>>>> To rule out for good any questions about sink behaviour, the job was
>>>>> killed and started with an additional Kafka sink.
>>>>>
>>>>> The same number of ids were missed in both outputs: KafkaSink &
>>>>> BucketingSink.
>>>>>
>>>>> I wonder what would be the next steps in debugging?
>>>>>
>>>>> On Fri, Sep 21, 2018 at 3:49 PM Juho Autio <juho.au...@rovio.com>
>>>>> wrote:
>>>>>
>>>>>> Thanks, Andrey.
>>>>>>
>>>>>> > so it means that the savepoint does not loose at least some dropped
>>>>>> records.
>>>>>>
>>>>>> I'm not sure what you mean by that? I mean, it was known from the
>>>>>> beginning, that not everything is lost before/after restoring a 
>>>>>> savepoint,
>>>>>> just some records around the time of restoration. It's not 100% clear
>>>>>> whether records are lost before making a savepoint or after restoring it.
>>>>>> Although, based on the new DEBUG logs it seems more like losing some
>>>>>> records that are seen ~soon after restoring. It seems like Flink would be
>>>>>> somehow confused either about the restored state vs. new inserts to 
>>>>>> state.
>>>>>> This could also be somehow linked to the high back pressure on the kafka
>>>>>> source while the stream is catching up.
>>>>>>
>>>>>> > If it is feasible for your setup, I suggest to insert one more map
>>>>>> function after reduce and before sink.
>>>>>> > etc.
>>>>>>
>>>>>> Isn't that the same thing that we discussed before? Nothing is sent
>>>>>> to BucketingSink before the window closes, so I don't see how it would 
>>>>>> make
>>>>>> any difference if we replace the BucketingSink with a map function or
>>>>>> another sink type. We don't create or restore savepoints during the time
>>>>>> when BucketingSink gets input or has open buckets – that happens at a 
>>>>>> much
>>>>>> later time of day. I would focus on figuring out why the records are lost
>>>>>> while the window is open. But I don't know how to do that. Would you have
>>>>>> any additional suggestions?
>>>>>>
>>>>>> On Fri, Sep 21, 2018 at 3:30 PM Andrey Zagrebin <
>>>>>> and...@data-artisans.com> wrote:
>>>>>>
>>>>>>> Hi Juho,
>>>>>>>
>>>>>>> so it means that the savepoint does not loose at least some dropped
>>>>>>> records.
>>>>>>>
>>>>>>> If it is feasible for your setup, I suggest to insert one more map
>>>>>>> function after reduce and before sink.
>>>>>>> The map function should be called right after window is triggered
>>>>>>> but before flushing to s3.
>>>>>>> The result of reduce (deduped record) could be logged there.
>>>>>>> This should allow to check whether the processed distinct records
>>>>>>> were buffered in the state after the restoration from the savepoint or 
>>>>>>> not.
>>>>>>> If they were buffered we should see that there was an attempt to write 
>>>>>>> them
>>>>>>> to the sink from the state.
>>>>>>>
>>>>>>> Another suggestion is to try to write records to some other sink or
>>>>>>> to both.
>>>>>>> E.g. if you can access file system of workers, maybe just into local
>>>>>>> files and check whether the records are also dropped there.
>>>>>>>
>>>>>>> Best,
>>>>>>> Andrey
>>>>>>>
>>>>>>> On 20 Sep 2018, at 15:37, Juho Autio <juho.au...@rovio.com> wrote:
>>>>>>>
>>>>>>> Hi Andrey!
>>>>>>>
>>>>>>> I was finally able to gather the DEBUG logs that you suggested. In
>>>>>>> short, the reducer logged that it processed at least some of the ids 
>>>>>>> that
>>>>>>> were missing from the output.
>>>>>>>
>>>>>>> "At least some", because I didn't have the job running with DEBUG
>>>>>>> logs for the full 24-hour window period. So I was only able to look up 
>>>>>>> if I
>>>>>>> can find *some* of the missing ids in the DEBUG logs. Which I did
>>>>>>> indeed.
>>>>>>>
>>>>>>> I changed the DistinctFunction.java to do this:
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public Map<String, String> reduce(Map<String, String> value1,
>>>>>>> Map<String, String> value2) {
>>>>>>>         LOG.debug("DistinctFunction.reduce returns: {}={}",
>>>>>>> value1.get("field"), value1.get("id"));
>>>>>>>         return value1;
>>>>>>>     }
>>>>>>>
>>>>>>> Then:
>>>>>>>
>>>>>>> vi flink-1.6.0/conf/log4j.properties
>>>>>>>
>>>>>>> log4j.logger.org.apache.flink.streaming.runtime.tasks.StreamTask=DEBUG
>>>>>>> log4j.logger.com.rovio.ds.flink.uniqueid.DistinctFunction=DEBUG
>>>>>>>
>>>>>>> Then I ran the following kind of test:
>>>>>>>
>>>>>>> - Cancelled the on-going job with savepoint created at ~Sep 18 08:35
>>>>>>> UTC 2018
>>>>>>> - Started a new cluster & job with DEBUG enabled at ~09:13, restored
>>>>>>> from that previous cluster's savepoint
>>>>>>> - Ran until caught up offsets
>>>>>>> - Cancelled the job with a new savepoint
>>>>>>> - Started a new job _without_ DEBUG, which restored the new
>>>>>>> savepoint, let it keep running so that it will eventually write the 
>>>>>>> output
>>>>>>>
>>>>>>> Then on the next day, after results had been flushed when the
>>>>>>> 24-hour window closed, I compared the results again with a batch 
>>>>>>> version's
>>>>>>> output. And found some missing ids as usual.
>>>>>>>
>>>>>>> I drilled down to one specific missing id (I'm replacing the actual
>>>>>>> value with AN12345 below), which was not found in the stream output, but
>>>>>>> was found in batch output & flink DEBUG logs.
>>>>>>>
>>>>>>> Related to that id, I gathered the following information:
>>>>>>>
>>>>>>> 2018-09-18~09:13:21,000 job started & savepoint is restored
>>>>>>>
>>>>>>> 2018-09-18 09:14:29,085 missing id is processed for the first time,
>>>>>>> proved by this log line:
>>>>>>> 2018-09-18 09:14:29,085 DEBUG
>>>>>>> com.rovio.ds.flink.uniqueid.DistinctFunction                  -
>>>>>>> DistinctFunction.reduce returns: s.aid1=AN12345
>>>>>>>
>>>>>>> 2018-09-18 09:15:14,264 first synchronous part of checkpoint
>>>>>>> 2018-09-18 09:15:16,544 first asynchronous part of checkpoint
>>>>>>>
>>>>>>> (
>>>>>>> more occurrences of checkpoints (~1 min checkpointing time + ~1 min
>>>>>>> delay before next)
>>>>>>> /
>>>>>>> more occurrences of DistinctFunction.reduce
>>>>>>> )
>>>>>>>
>>>>>>> 2018-09-18 09:23:45,053 missing id is processed for the last time
>>>>>>>
>>>>>>> 2018-09-18~10:20:00,000 savepoint created & job cancelled
>>>>>>>
>>>>>>> To be noted, there was high backpressure after restoring from
>>>>>>> savepoint until the stream caught up with the kafka offsets. Although, 
>>>>>>> our
>>>>>>> job uses assign timestamps & watermarks on the flink kafka consumer 
>>>>>>> itself,
>>>>>>> so event time of all partitions is synchronized. As expected, we don't 
>>>>>>> get
>>>>>>> any late data in the late data side output.
>>>>>>>
>>>>>>> From this we can see that the missing ids are processed by the
>>>>>>> reducer, but they must get lost somewhere before the 24-hour window is
>>>>>>> triggered.
>>>>>>>
>>>>>>> I think it's worth mentioning once more that the stream doesn't miss
>>>>>>> any ids if we let it's running without interruptions / state restoring.
>>>>>>>
>>>>>>> What's next?
>>>>>>>
>>>>>>> On Wed, Aug 29, 2018 at 3:49 PM Andrey Zagrebin <
>>>>>>> and...@data-artisans.com> wrote:
>>>>>>>
>>>>>>>> Hi Juho,
>>>>>>>>
>>>>>>>> > only when the 24-hour window triggers, BucketingSink gets a burst
>>>>>>>> of input
>>>>>>>>
>>>>>>>> This is of course totally true, my understanding is the same. We
>>>>>>>> cannot exclude problem there for sure, just savepoints are used a lot 
>>>>>>>> w/o
>>>>>>>> problem reports and BucketingSink is known to be problematic with s3. 
>>>>>>>> That
>>>>>>>> is why, I asked you:
>>>>>>>>
>>>>>>>> > You also wrote that the timestamps of lost event are 'probably'
>>>>>>>> around the time of the savepoint, if it is not yet for sure I would 
>>>>>>>> also
>>>>>>>> check it.
>>>>>>>>
>>>>>>>> Although, bucketing sink might loose any data at the end of the day
>>>>>>>> (also from the middle). The fact, that it is always around the time of
>>>>>>>> taking a savepoint and not random, is surely suspicious and possible
>>>>>>>> savepoint failures need to be investigated.
>>>>>>>>
>>>>>>>> Regarding the s3 problem, s3 doc says:
>>>>>>>>
>>>>>>>> > The caveat is that if you make a HEAD or GET request to the key
>>>>>>>> name (to find if the object exists) before creating the object, Amazon 
>>>>>>>> S3
>>>>>>>> provides 'eventual consistency' for read-after-write.
>>>>>>>>
>>>>>>>> The algorithm you suggest is how it is roughly implemented now
>>>>>>>> (BucketingSink.openNewPartFile). My understanding is that
>>>>>>>> 'eventual consistency’ means that even if you just created file (its 
>>>>>>>> name
>>>>>>>> is key) it can be that you do not get it in the list or exists (HEAD)
>>>>>>>> returns false and you risk to rewrite the previous part.
>>>>>>>>
>>>>>>>> The BucketingSink was designed for a standard file system. s3 is
>>>>>>>> used over a file system wrapper atm but does not always provide normal 
>>>>>>>> file
>>>>>>>> system guarantees. See also last example in [1].
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Andrey
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://codeburst.io/quick-explanation-of-the-s3-consistency-model-6c9f325e3f82
>>>>>>>>
>>>>>>>> On 29 Aug 2018, at 12:11, Juho Autio <juho.au...@rovio.com> wrote:
>>>>>>>>
>>>>>>>> Andrey, thank you very much for the debugging suggestions, I'll try
>>>>>>>> them.
>>>>>>>>
>>>>>>>> In the meanwhile two more questions, please:
>>>>>>>>
>>>>>>>> > Just to keep in mind this problem with s3 and exclude it for
>>>>>>>> sure. I would also check whether the size of missing events is around 
>>>>>>>> the
>>>>>>>> batch size of BucketingSink or not.
>>>>>>>>
>>>>>>>> Fair enough, but I also want to focus on debugging the most
>>>>>>>> probable subject first. So what do you think about this – true or 
>>>>>>>> false:
>>>>>>>> only when the 24-hour window triggers, BucketinSink gets a burst of 
>>>>>>>> input.
>>>>>>>> Around the state restoring point (middle of the day) it doesn't get any
>>>>>>>> input, so it can't lose anything either. Isn't this true, or have I 
>>>>>>>> totally
>>>>>>>> missed how Flink works in triggering window results? I would not expect
>>>>>>>> there to be any optimization that speculatively triggers early results 
>>>>>>>> of a
>>>>>>>> regular time window to the downstream operators.
>>>>>>>>
>>>>>>>> > The old BucketingSink has in general problem with s3. Internally
>>>>>>>> BucketingSink queries s3 as a file system to list already written file
>>>>>>>> parts (batches) and determine index of the next part to start. Due to
>>>>>>>> eventual consistency of checking file existence in s3 [1], the
>>>>>>>> BucketingSink can rewrite the previously written part and basically 
>>>>>>>> loose
>>>>>>>> it.
>>>>>>>>
>>>>>>>> I was wondering, what does S3's "read-after-write consistency"
>>>>>>>> (mentioned on the page you linked) actually mean. It seems that this 
>>>>>>>> might
>>>>>>>> be possible:
>>>>>>>> - LIST keys, find current max index
>>>>>>>> - choose next index = max + 1
>>>>>>>> - HEAD next index: if it exists, keep adding + 1 until key doesn't
>>>>>>>> exist on S3
>>>>>>>>
>>>>>>>> But definitely sounds easier if a sink keeps track of files in a
>>>>>>>> way that's guaranteed to be consistent.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Juho
>>>>>>>>
>>>>>>>> On Mon, Aug 27, 2018 at 2:04 PM Andrey Zagrebin <
>>>>>>>> and...@data-artisans.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> true, StreamingFileSink does not support s3 in 1.6.0, it is
>>>>>>>>> planned for the next 1.7 release, sorry for confusion.
>>>>>>>>> The old BucketingSink has in general problem with s3.
>>>>>>>>> Internally BucketingSink queries s3 as a file system
>>>>>>>>> to list already written file parts (batches) and determine index
>>>>>>>>> of the next part to start. Due to eventual consistency of checking 
>>>>>>>>> file
>>>>>>>>> existence in s3 [1], the BucketingSink can rewrite the previously 
>>>>>>>>> written
>>>>>>>>> part and basically loose it. It should be fixed for StreamingFileSink 
>>>>>>>>> in
>>>>>>>>> 1.7 where Flink keeps its own track of written parts and does not 
>>>>>>>>> rely on
>>>>>>>>> s3 as a file system.
>>>>>>>>> I also include Kostas, he might add more details.
>>>>>>>>>
>>>>>>>>> Just to keep in mind this problem with s3 and exclude it for sure
>>>>>>>>> I would also check whether the size of missing events is around the 
>>>>>>>>> batch
>>>>>>>>> size of BucketingSink or not. You also wrote that the timestamps of 
>>>>>>>>> lost
>>>>>>>>> event are 'probably' around the time of the savepoint, if it is not 
>>>>>>>>> yet for
>>>>>>>>> sure I would also check it.
>>>>>>>>>
>>>>>>>>> Have you already checked the log files of job manager and task
>>>>>>>>> managers for the job running before and after the restore from the 
>>>>>>>>> check
>>>>>>>>> point? Is everything successful there, no errors, relevant warnings or
>>>>>>>>> exceptions?
>>>>>>>>>
>>>>>>>>> As the next step, I would suggest to log all encountered events in
>>>>>>>>> DistinctFunction.reduce if possible for production data and check 
>>>>>>>>> whether
>>>>>>>>> the missed events are eventually processed before or after the 
>>>>>>>>> savepoint.
>>>>>>>>> The following log message indicates a border between the events that 
>>>>>>>>> should
>>>>>>>>> be included into the savepoint (logged before) or not:
>>>>>>>>> “{} ({}, synchronous part) in thread {} took {} ms” (template)
>>>>>>>>> Also check if the savepoint has been overall completed:
>>>>>>>>> "{} ({}, asynchronous part) in thread {} took {} ms."
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Andrey
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html
>>>>>>>>>
>>>>>>>>> On 24 Aug 2018, at 20:41, Juho Autio <juho.au...@rovio.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Using StreamingFileSink is not a convenient option for production
>>>>>>>>> use for us as it doesn't support s3*. I could use StreamingFileSink 
>>>>>>>>> just to
>>>>>>>>> verify, but I don't see much point in doing so. Please consider my 
>>>>>>>>> previous
>>>>>>>>> comment:
>>>>>>>>>
>>>>>>>>> > I realized that BucketingSink must not play any role in this
>>>>>>>>> problem. This is because only when the 24-hour window triggers,
>>>>>>>>> BucketingSink gets a burst of input. Around the state restoring point
>>>>>>>>> (middle of the day) it doesn't get any input, so it can't lose 
>>>>>>>>> anything
>>>>>>>>> either (right?).
>>>>>>>>>
>>>>>>>>> I could also use a kafka sink instead, but I can't imagine how
>>>>>>>>> there could be any difference. It's very real that the sink doesn't 
>>>>>>>>> get any
>>>>>>>>> input for a long time until the 24-hour window closes, and then it 
>>>>>>>>> quickly
>>>>>>>>> writes out everything because it's not that much data eventually for 
>>>>>>>>> the
>>>>>>>>> distinct values.
>>>>>>>>>
>>>>>>>>> Any ideas for debugging what's happening around the savepoint &
>>>>>>>>> restoration time?
>>>>>>>>>
>>>>>>>>> *) I actually implemented StreamingFileSink as an alternative
>>>>>>>>> sink. This was before I came to realize that most likely the sink 
>>>>>>>>> component
>>>>>>>>> has nothing to do with the data loss problem. I tried it with s3n:// 
>>>>>>>>> path
>>>>>>>>> just to see an exception being thrown. In the source code I indeed 
>>>>>>>>> then
>>>>>>>>> found an explicit check for the target path scheme to be "hdfs://".
>>>>>>>>>
>>>>>>>>> On Fri, Aug 24, 2018 at 7:49 PM Andrey Zagrebin <
>>>>>>>>> and...@data-artisans.com> wrote:
>>>>>>>>>
>>>>>>>>>> Ok, I think before further debugging the window reduced state,
>>>>>>>>>> could you try the new ‘StreamingFileSink’ [1] introduced in Flink
>>>>>>>>>> 1.6.0 instead of the previous 'BucketingSink’?
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Andrey
>>>>>>>>>>
>>>>>>>>>> [1]
>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
>>>>>>>>>>
>>>>>>>>>> On 24 Aug 2018, at 18:03, Juho Autio <juho.au...@rovio.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> Yes, sorry for my confusing comment. I just meant that it seems
>>>>>>>>>> like there's a bug somewhere now that the output is missing some 
>>>>>>>>>> data.
>>>>>>>>>>
>>>>>>>>>> > I would wait and check the actual output in s3 because it is
>>>>>>>>>> the main result of the job
>>>>>>>>>>
>>>>>>>>>> Yes, and that's what I have already done. There seems to be
>>>>>>>>>> always some data loss with the production data volumes, if the job 
>>>>>>>>>> has been
>>>>>>>>>> restarted on that day.
>>>>>>>>>>
>>>>>>>>>> Would you have any suggestions for how to debug this further?
>>>>>>>>>>
>>>>>>>>>> Many thanks for stepping in.
>>>>>>>>>>
>>>>>>>>>> On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin <
>>>>>>>>>> and...@data-artisans.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>
>>>>>>>>>>> So it is a per key deduplication job.
>>>>>>>>>>>
>>>>>>>>>>> Yes, I would wait and check the actual output in s3 because it
>>>>>>>>>>> is the main result of the job and
>>>>>>>>>>>
>>>>>>>>>>> > The late data around the time of taking savepoint might be not
>>>>>>>>>>> included into the savepoint but it should be behind the snapshotted 
>>>>>>>>>>> offset
>>>>>>>>>>> in Kafka.
>>>>>>>>>>>
>>>>>>>>>>> is not a bug, it is a possible behaviour.
>>>>>>>>>>>
>>>>>>>>>>> The savepoint is a snapshot of the data in transient which is
>>>>>>>>>>> already consumed from Kafka.
>>>>>>>>>>> Basically the full contents of the window result is split
>>>>>>>>>>> between the savepoint and what can come after the savepoint'ed 
>>>>>>>>>>> offset in
>>>>>>>>>>> Kafka but before the window result is written into s3.
>>>>>>>>>>>
>>>>>>>>>>> Allowed lateness should not affect it, I am just saying that the
>>>>>>>>>>> final result in s3 should include all records after it.
>>>>>>>>>>> This is what should be guaranteed but not the contents of the
>>>>>>>>>>> intermediate savepoint.
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Andrey
>>>>>>>>>>>
>>>>>>>>>>> On 24 Aug 2018, at 16:52, Juho Autio <juho.au...@rovio.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Thanks for your answer!
>>>>>>>>>>>
>>>>>>>>>>> I check for the missed data from the final output on s3. So I
>>>>>>>>>>> wait until the next day, then run the same thing re-implemented in 
>>>>>>>>>>> batch,
>>>>>>>>>>> and compare the output.
>>>>>>>>>>>
>>>>>>>>>>> > The late data around the time of taking savepoint might be not
>>>>>>>>>>> included into the savepoint but it should be behind the snapshotted 
>>>>>>>>>>> offset
>>>>>>>>>>> in Kafka.
>>>>>>>>>>>
>>>>>>>>>>> Yes, I would definitely expect that. It seems like there's a bug
>>>>>>>>>>> somewhere.
>>>>>>>>>>>
>>>>>>>>>>> > Then it should just come later after the restore and should be
>>>>>>>>>>> reduced within the allowed lateness into the final result which is 
>>>>>>>>>>> saved
>>>>>>>>>>> into s3.
>>>>>>>>>>>
>>>>>>>>>>> Well, as far as I know, allowed lateness doesn't play any role
>>>>>>>>>>> here, because I started running the job with allowedLateness=0, and 
>>>>>>>>>>> still
>>>>>>>>>>> get the data loss, while my late data output doesn't receive 
>>>>>>>>>>> anything.
>>>>>>>>>>>
>>>>>>>>>>> > Also, is this `DistinctFunction.reduce` just an example or the
>>>>>>>>>>> actual implementation, basically saving just one of records inside 
>>>>>>>>>>> the 24h
>>>>>>>>>>> window in s3? then what is missing there?
>>>>>>>>>>>
>>>>>>>>>>> Yes, it's the actual implementation. Note that there's a keyBy
>>>>>>>>>>> before the DistinctFunction. So there's one record for each key 
>>>>>>>>>>> (which is
>>>>>>>>>>> the combination of a couple of fields). In practice I've seen that 
>>>>>>>>>>> we're
>>>>>>>>>>> missing ~2000-4000 elements on each restore, and the total output is
>>>>>>>>>>> obviously much more than that.
>>>>>>>>>>>
>>>>>>>>>>> Here's the full code for the key selector:
>>>>>>>>>>>
>>>>>>>>>>> public class MapKeySelector implements
>>>>>>>>>>> KeySelector<Map<String,String>, Object> {
>>>>>>>>>>>
>>>>>>>>>>>     private final String[] fields;
>>>>>>>>>>>
>>>>>>>>>>>     public MapKeySelector(String... fields) {
>>>>>>>>>>>         this.fields = fields;
>>>>>>>>>>>     }
>>>>>>>>>>>
>>>>>>>>>>>     @Override
>>>>>>>>>>>     public Object getKey(Map<String, String> event) throws
>>>>>>>>>>> Exception {
>>>>>>>>>>>         Tuple key =
>>>>>>>>>>> Tuple.getTupleClass(fields.length).newInstance();
>>>>>>>>>>>         for (int i = 0; i < fields.length; i++) {
>>>>>>>>>>>             key.setField(event.getOrDefault(fields[i], ""), i);
>>>>>>>>>>>         }
>>>>>>>>>>>         return key;
>>>>>>>>>>>     }
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> And a more exact example on how it's used:
>>>>>>>>>>>
>>>>>>>>>>>                 .keyBy(new MapKeySelector("ID", "PLAYER_ID",
>>>>>>>>>>> "FIELD", "KEY_NAME", "KEY_VALUE"))
>>>>>>>>>>>                 .timeWindow(Time.days(1))
>>>>>>>>>>>                 .reduce(new DistinctFunction())
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin <
>>>>>>>>>>> and...@data-artisans.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>
>>>>>>>>>>>> Where exactly does the data miss? When do you notice that?
>>>>>>>>>>>> Do you check it:
>>>>>>>>>>>> - debugging `DistinctFunction.reduce` right after resume in the
>>>>>>>>>>>> middle of the day
>>>>>>>>>>>> or
>>>>>>>>>>>> - some distinct records miss in the final output
>>>>>>>>>>>> of BucketingSink in s3 after window result is actually triggered 
>>>>>>>>>>>> and saved
>>>>>>>>>>>> into s3 at the end of the day? is this the main output?
>>>>>>>>>>>>
>>>>>>>>>>>> The late data around the time of taking savepoint might be not
>>>>>>>>>>>> included into the savepoint but it should be behind the 
>>>>>>>>>>>> snapshotted offset
>>>>>>>>>>>> in Kafka. Then it should just come later after the restore and 
>>>>>>>>>>>> should be
>>>>>>>>>>>> reduced within the allowed lateness into the final result which is 
>>>>>>>>>>>> saved
>>>>>>>>>>>> into s3.
>>>>>>>>>>>>
>>>>>>>>>>>> Also, is this `DistinctFunction.reduce` just an example or the
>>>>>>>>>>>> actual implementation, basically saving just one of records inside 
>>>>>>>>>>>> the 24h
>>>>>>>>>>>> window in s3? then what is missing there?
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>> Andrey
>>>>>>>>>>>>
>>>>>>>>>>>> On 23 Aug 2018, at 15:42, Juho Autio <juho.au...@rovio.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> I changed to allowedLateness=0, no change, still missing data
>>>>>>>>>>>> when restoring from savepoint.
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Aug 21, 2018 at 10:43 AM Juho Autio <
>>>>>>>>>>>> juho.au...@rovio.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I realized that BucketingSink must not play any role in this
>>>>>>>>>>>>> problem. This is because only when the 24-hour window triggers,
>>>>>>>>>>>>> BucketinSink gets a burst of input. Around the state restoring 
>>>>>>>>>>>>> point
>>>>>>>>>>>>> (middle of the day) it doesn't get any input, so it can't lose 
>>>>>>>>>>>>> anything
>>>>>>>>>>>>> either (right?).
>>>>>>>>>>>>>
>>>>>>>>>>>>> I will next try removing the allowedLateness entirely from the
>>>>>>>>>>>>> equation.
>>>>>>>>>>>>>
>>>>>>>>>>>>> In the meanwhile, please let me know if you have any
>>>>>>>>>>>>> suggestions for debugging the lost data, for example what logs to 
>>>>>>>>>>>>> enable.
>>>>>>>>>>>>>
>>>>>>>>>>>>> We use FlinkKafkaConsumer010 btw. Are there any known issues
>>>>>>>>>>>>> with that, that could contribute to lost data when restoring a 
>>>>>>>>>>>>> savepoint?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 4:23 PM Juho Autio <
>>>>>>>>>>>>> juho.au...@rovio.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Some data is silently lost on my Flink stream job when state
>>>>>>>>>>>>>> is restored from a savepoint.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Do you have any debugging hints to find out where exactly the
>>>>>>>>>>>>>> data gets dropped?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> My job gathers distinct values using a 24-hour window. It
>>>>>>>>>>>>>> doesn't have any custom state management.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> When I cancel the job with savepoint and restore from that
>>>>>>>>>>>>>> savepoint, some data is missed. It seems to be losing just a 
>>>>>>>>>>>>>> small amount
>>>>>>>>>>>>>> of data. The event time of lost data is probably around the time 
>>>>>>>>>>>>>> of
>>>>>>>>>>>>>> savepoint. In other words the rest of the time window is not 
>>>>>>>>>>>>>> entirely
>>>>>>>>>>>>>> missed – collection works correctly also for (most of the) 
>>>>>>>>>>>>>> events that come
>>>>>>>>>>>>>> in after restoring.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> When the job processes a full 24-hour window without
>>>>>>>>>>>>>> interruptions it doesn't miss anything.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Usually the problem doesn't happen in test environments that
>>>>>>>>>>>>>> have smaller parallelism and smaller data volumes. But in 
>>>>>>>>>>>>>> production
>>>>>>>>>>>>>> volumes the job seems to be consistently missing at least 
>>>>>>>>>>>>>> something on
>>>>>>>>>>>>>> every restore.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This issue has consistently happened since the job was
>>>>>>>>>>>>>> initially created. It was at first run on an older version of 
>>>>>>>>>>>>>> Flink
>>>>>>>>>>>>>> 1.5-SNAPSHOT and it still happens on both Flink 1.5.2 & 1.6.0.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm wondering if this could be for example some
>>>>>>>>>>>>>> synchronization issue between the kafka consumer offsets vs. 
>>>>>>>>>>>>>> what's been
>>>>>>>>>>>>>> written by BucketingSink?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 1. Job content, simplified
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>         kafkaStream
>>>>>>>>>>>>>>                 .flatMap(new ExtractFieldsFunction())
>>>>>>>>>>>>>>                 .keyBy(new MapKeySelector(1, 2, 3, 4))
>>>>>>>>>>>>>>                 .timeWindow(Time.days(1))
>>>>>>>>>>>>>>                 .allowedLateness(allowedLateness)
>>>>>>>>>>>>>>                 .sideOutputLateData(lateDataTag)
>>>>>>>>>>>>>>                 .reduce(new DistinctFunction())
>>>>>>>>>>>>>>                 .addSink(sink)
>>>>>>>>>>>>>>                 // use a fixed number of output partitions
>>>>>>>>>>>>>>                 .setParallelism(8))
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> /**
>>>>>>>>>>>>>>  * Usage: .keyBy("the", "distinct", "fields").reduce(new
>>>>>>>>>>>>>> DistinctFunction())
>>>>>>>>>>>>>>  */
>>>>>>>>>>>>>> public class DistinctFunction implements
>>>>>>>>>>>>>> ReduceFunction<java.util.Map<String, String>> {
>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>     public Map<String, String> reduce(Map<String, String>
>>>>>>>>>>>>>> value1, Map<String, String> value2) {
>>>>>>>>>>>>>>         return value1;
>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2. State configuration
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> boolean enableIncrementalCheckpointing = true;
>>>>>>>>>>>>>> String statePath = "s3n://bucket/savepoints";
>>>>>>>>>>>>>> new RocksDBStateBackend(statePath,
>>>>>>>>>>>>>> enableIncrementalCheckpointing);
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Checkpointing Mode Exactly Once
>>>>>>>>>>>>>> Interval 1m 0s
>>>>>>>>>>>>>> Timeout 10m 0s
>>>>>>>>>>>>>> Minimum Pause Between Checkpoints 1m 0s
>>>>>>>>>>>>>> Maximum Concurrent Checkpoints 1
>>>>>>>>>>>>>> Persist Checkpoints Externally Enabled (retain on
>>>>>>>>>>>>>> cancellation)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 3. BucketingSink configuration
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> We use BucketingSink, I don't think there's anything special
>>>>>>>>>>>>>> here, if not the fact that we're writing to S3.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>         String outputPath = "s3://bucket/output";
>>>>>>>>>>>>>>         BucketingSink<Map<String, String>> sink = new
>>>>>>>>>>>>>> BucketingSink<Map<String, String>>(outputPath)
>>>>>>>>>>>>>>                 .setBucketer(new ProcessdateBucketer())
>>>>>>>>>>>>>>                 .setBatchSize(batchSize)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> .setInactiveBucketThreshold(inactiveBucketThreshold)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> .setInactiveBucketCheckInterval(inactiveBucketCheckInterval);
>>>>>>>>>>>>>>         sink.setWriter(new IdJsonWriter());
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 4. Kafka & event time
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> My flink job reads the data from Kafka, using a
>>>>>>>>>>>>>> BoundedOutOfOrdernessTimestampExtractor on the kafka consumer to
>>>>>>>>>>>>>> synchronize watermarks accross all kafka partitions. We also 
>>>>>>>>>>>>>> write late
>>>>>>>>>>>>>> data to side output, but nothing is written there – if it would, 
>>>>>>>>>>>>>> it could
>>>>>>>>>>>>>> explain missed data in the main output (I'm also sure that our 
>>>>>>>>>>>>>> late data
>>>>>>>>>>>>>> writing works, because we previously had some actual late data 
>>>>>>>>>>>>>> which ended
>>>>>>>>>>>>>> up there).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 5. allowedLateness
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> It may be or may not be relevant that I have also enabled
>>>>>>>>>>>>>> allowedLateness with 1 minute lateness on the 24-hour window:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> If that makes sense, I could try removing allowedLateness
>>>>>>>>>>>>>> entirely? That would be just to rule out that Flink doesn't have 
>>>>>>>>>>>>>> a bug
>>>>>>>>>>>>>> that's related to restoring state in combination with the 
>>>>>>>>>>>>>> allowedLateness
>>>>>>>>>>>>>> feature. After all, all of our data should be in a good enough 
>>>>>>>>>>>>>> order to not
>>>>>>>>>>>>>> be late, given the max out of orderness used on kafka consumer 
>>>>>>>>>>>>>> timestamp
>>>>>>>>>>>>>> extractor.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thank you in advance!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>
>>
>
>

Reply via email to