Hi Juho,

could you try to reduce the job to a minimal reproducible example and share the job 
and its input?

For example:
- some simple records as input, e.g. tuples of primitive types saved as CSV
- a minimal deduplication job which processes them and misses records (a sketch follows below)
- a check whether it also happens for shorter windows, e.g. 1h
- the setup you use for the job, ideally locally reproducible, or the cloud setup
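
Something along these lines would already be enough (just a sketch of such a minimal 
job, assuming CSV lines like "timestampMillis,id"; the class name, path and window 
size are placeholders):

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class MinimalDedupJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

            env.readTextFile("file:///tmp/input.csv")
                // parse "timestampMillis,id" lines into (timestamp, id) tuples
                .map(line -> {
                    String[] parts = line.split(",");
                    return Tuple2.of(Long.parseLong(parts[0]), parts[1]);
                })
                .returns(Types.TUPLE(Types.LONG, Types.STRING))
                // event time taken from the first field, assumed ascending in the test file
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<Long, String>>() {
                    @Override
                    public long extractAscendingTimestamp(Tuple2<Long, String> element) {
                        return element.f0;
                    }
                })
                .keyBy(1)                          // key by the id field
                .timeWindow(Time.hours(1))
                .reduce((a, b) -> a)               // keep one record per id = dedup
                .print();

            env.execute("minimal-dedup");
        }
    }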

Best,
Andrey

> On 4 Oct 2018, at 11:13, Juho Autio <juho.au...@rovio.com> wrote:
> 
> Sorry to insist, but we seem to be blocked from any serious usage of state in 
> Flink if we can't rely on it not to miss data in case of a restore.
> 
> Would anyone have suggestions for how to troubleshoot this? So far I have 
> verified with DEBUG logs that our reduce function also gets to process the 
> data that is missing from the window output.
> 
> On Mon, Oct 1, 2018 at 11:56 AM Juho Autio <juho.au...@rovio.com 
> <mailto:juho.au...@rovio.com>> wrote:
> Hi Andrey,
> 
> To rule out for good any questions about sink behaviour, the job was killed 
> and started with an additional Kafka sink.
> 
> The same number of ids were missed in both outputs: KafkaSink & BucketingSink.
> 
> I wonder what would be the next steps in debugging?
> 
> On Fri, Sep 21, 2018 at 3:49 PM Juho Autio <juho.au...@rovio.com 
> <mailto:juho.au...@rovio.com>> wrote:
> Thanks, Andrey.
> 
> > so it means that the savepoint does not lose at least some dropped records.
> 
> I'm not sure what you mean by that. I mean, it was known from the beginning 
> that not everything is lost before/after restoring a savepoint, just some 
> records around the time of restoration. It's not 100% clear whether records 
> are lost before taking a savepoint or after restoring it. Although, based on 
> the new DEBUG logs, it looks more like we are losing records that are seen 
> soon after restoring. It seems as if Flink is somehow confused between the 
> restored state and new inserts into the state. This could also be somehow 
> linked to the high back pressure on the Kafka source while the stream is 
> catching up.
> 
> > If it is feasible for your setup, I suggest to insert one more map function 
> > after reduce and before sink.
> > etc.
> 
> Isn't that the same thing that we discussed before? Nothing is sent to 
> BucketingSink before the window closes, so I don't see how it would make any 
> difference if we replace the BucketingSink with a map function or another 
> sink type. We don't create or restore savepoints during the time when 
> BucketingSink gets input or has open buckets – that happens at a much later 
> time of day. I would focus on figuring out why the records are lost while the 
> window is open. But I don't know how to do that. Would you have any 
> additional suggestions?
> 
> On Fri, Sep 21, 2018 at 3:30 PM Andrey Zagrebin <and...@data-artisans.com 
> <mailto:and...@data-artisans.com>> wrote:
> Hi Juho,
> 
> so it means that the savepoint does not lose at least some dropped records.
> 
> If it is feasible for your setup, I suggest inserting one more map function 
> after the reduce and before the sink (sketched below). 
> The map function would be called right after the window is triggered but before 
> flushing to s3, and the result of the reduce (the deduped record) could be 
> logged there.
> This should allow you to check whether the processed distinct records were 
> still buffered in the state after the restoration from the savepoint or not. 
> If they were buffered, we should see an attempt to write them to the sink 
> from the state.
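> 
> A rough sketch of what I mean (just an illustration: LOG is assumed to be a 
> static SLF4J logger, and the map is a plain pass-through):
> 
>     .reduce(new DistinctFunction())
>     // pass-through map that only logs what the window emits towards the sink
>     .map(new MapFunction<Map<String, String>, Map<String, String>>() {
>         @Override
>         public Map<String, String> map(Map<String, String> value) {
>             LOG.info("Window emitted: {}={}", value.get("field"), value.get("id"));
>             return value;
>         }
>     })
>     .addSink(sink);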
> 
> Another suggestion is to try writing the records to some other sink, or to both. 
> E.g. if you can access the file system of the workers, maybe write just into 
> local files and check whether the records are also dropped there.
> 
> Best,
> Andrey
> 
>> On 20 Sep 2018, at 15:37, Juho Autio <juho.au...@rovio.com 
>> <mailto:juho.au...@rovio.com>> wrote:
>> 
>> Hi Andrey!
>> 
>> I was finally able to gather the DEBUG logs that you suggested. In short, 
>> the reducer logged that it processed at least some of the ids that were 
>> missing from the output.
>> 
>> "At least some", because I didn't have the job running with DEBUG logs for 
>> the full 24-hour window period. So I was only able to look up if I can find 
>> some of the missing ids in the DEBUG logs. Which I did indeed.
>> 
>> I changed the DistinctFunction.java to do this:
>> 
>>     @Override
>>     public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
>>         LOG.debug("DistinctFunction.reduce returns: {}={}", value1.get("field"), value1.get("id"));
>>         return value1;
>>     }
>> 
>> Then:
>> 
>> vi flink-1.6.0/conf/log4j.properties
>> log4j.logger.org.apache.flink.streaming.runtime.tasks.StreamTask=DEBUG
>> log4j.logger.com.rovio.ds.flink.uniqueid.DistinctFunction=DEBUG
>> 
>> Then I ran the following kind of test (the cancel/restore commands are sketched below):
>> 
>> - Cancelled the ongoing job with a savepoint created at ~Sep 18 08:35 UTC 2018
>> - Started a new cluster & job with DEBUG enabled at ~09:13, restored from 
>> that previous cluster's savepoint
>> - Ran until the consumer had caught up with the Kafka offsets
>> - Cancelled the job with a new savepoint
>> - Started a new job _without_ DEBUG, which restored the new savepoint, and let 
>> it keep running so that it would eventually write the output
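>> 
>> For reference, assuming the Flink CLI was used, the cancel-with-savepoint and 
>> restore steps above correspond to commands roughly like these (the savepoint 
>> directory, job id and jar path are placeholders):
>> 
>>     # cancel the running job and trigger a savepoint
>>     flink cancel -s s3://bucket/savepoints <job-id>
>>     # start the new job from the created savepoint
>>     flink run -d -s s3://bucket/savepoints/savepoint-<id> the-job.jar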
>> 
>> Then on the next day, after the results had been flushed when the 24-hour 
>> window closed, I compared the results again with the batch version's output, 
>> and found some missing ids as usual.
>> 
>> I drilled down to one specific missing id (I'm replacing the actual value 
>> with AN12345 below), which was not found in the stream output, but was found 
>> in batch output & flink DEBUG logs.
>> 
>> Related to that id, I gathered the following information:
>> 
>> 2018-09-18 ~09:13:21,000 job started & savepoint is restored
>> 
>> 2018-09-18 09:14:29,085 missing id is processed for the first time, proved 
>> by this log line:
>> 2018-09-18 09:14:29,085 DEBUG com.rovio.ds.flink.uniqueid.DistinctFunction   
>>                - DistinctFunction.reduce returns: s.aid1=AN12345
>> 
>> 2018-09-18 09:15:14,264 first synchronous part of checkpoint
>> 2018-09-18 09:15:16,544 first asynchronous part of checkpoint
>> 
>> (
>>      more occurrences of checkpoints (~1 min checkpointing time + ~1 min 
>> delay before next)
>>      /
>>      more occurrences of DistinctFunction.reduce
>> )
>> 
>> 2018-09-18 09:23:45,053 missing id is processed for the last time
>> 
>> 2018-09-18 ~10:20:00,000 savepoint created & job cancelled
>> 
>> To be noted, there was high backpressure after restoring from the savepoint 
>> until the stream caught up with the Kafka offsets. However, our job assigns 
>> timestamps & watermarks on the Flink Kafka consumer itself, so the event time 
>> of all partitions is synchronized. As expected, we don't get any late data in 
>> the late data side output.
>> 
>> From this we can see that the missing ids are processed by the reducer, but 
>> they must get lost somewhere before the 24-hour window is triggered.
>> 
>> I think it's worth mentioning once more that the stream doesn't miss any ids 
>> if we let it run without interruptions / state restoring.
>> 
>> What's next?
>> 
>> On Wed, Aug 29, 2018 at 3:49 PM Andrey Zagrebin <and...@data-artisans.com 
>> <mailto:and...@data-artisans.com>> wrote:
>> Hi Juho,
>> 
>> > only when the 24-hour window triggers, BucketingSink gets a burst of input
>> 
>> This is of course totally true, my understanding is the same. We cannot 
>> exclude a problem there for sure; it is just that savepoints are used a lot 
>> without problem reports, while BucketingSink is known to be problematic with 
>> s3. That is why I asked you:
>> 
>> > You also wrote that the timestamps of lost event are 'probably' around the 
>> > time of the savepoint, if it is not yet for sure I would also check it.
>> 
>> That said, the bucketing sink might lose data at the end of the day (also 
>> from the middle). The fact that it is always around the time of taking a 
>> savepoint and not random is surely suspicious, and possible savepoint 
>> failures need to be investigated.
>> 
>> Regarding the s3 problem, s3 doc says:
>> 
>> > The caveat is that if you make a HEAD or GET request to the key name (to 
>> > find if the object exists) before creating the object, Amazon S3 provides 
>> > 'eventual consistency' for read-after-write.
>> 
>> The algorithm you suggest is roughly how it is implemented now 
>> (BucketingSink.openNewPartFile, sketched below). My understanding is that 
>> 'eventual consistency' means that even if you have just created a file (its 
>> name being the key), it can happen that you do not get it in the listing, or 
>> exists (HEAD) returns false, and you risk rewriting the previous part.
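>> 
>> Roughly this pattern (a simplified sketch of the part-file probing, not the 
>> actual BucketingSink code; fs, bucketPath, partPrefix and subtaskIndex stand 
>> for the corresponding Hadoop FileSystem handle and sink fields):
>> 
>>     // find the next free part index by probing for existence
>>     int partCounter = 0;
>>     Path partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + partCounter);
>>     while (fs.exists(partPath)) {
>>         partCounter++;
>>         partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + partCounter);
>>     }
>>     // with eventual consistency, exists() may return false for a part that was
>>     // already written, so the new part can silently overwrite the previous one
>>     FSDataOutputStream outStream = fs.create(partPath, false);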
>> 
>> The BucketingSink was designed for a standard file system. s3 is used over a 
>> file system wrapper at the moment, but it does not always provide normal file 
>> system guarantees. See also the last example in [1].
>> 
>> Cheers,
>> Andrey
>> 
>> [1] 
>> https://codeburst.io/quick-explanation-of-the-s3-consistency-model-6c9f325e3f82
>> 
>>> On 29 Aug 2018, at 12:11, Juho Autio <juho.au...@rovio.com 
>>> <mailto:juho.au...@rovio.com>> wrote:
>>> 
>>> Andrey, thank you very much for the debugging suggestions, I'll try them.
>>> 
>>> In the meanwhile two more questions, please:
>>> 
>>> > Just to keep in mind this problem with s3 and exclude it for sure. I 
>>> > would also check whether the size of missing events is around the batch 
>>> > size of BucketingSink or not.
>>> 
>>> Fair enough, but I also want to focus on debugging the most probable 
>>> suspect first. So what do you think about this – true or false: only when 
>>> the 24-hour window triggers does BucketingSink get a burst of input. Around 
>>> the state restoring point (middle of the day) it doesn't get any input, so it 
>>> can't lose anything either. Isn't this true, or have I totally missed how 
>>> Flink works in triggering window results? I would not expect there to be 
>>> any optimization that speculatively triggers early results of a regular 
>>> time window to the downstream operators.
>>> 
>>> > The old BucketingSink has in general problem with s3. Internally 
>>> > BucketingSink queries s3 as a file system to list already written file 
>>> > parts (batches) and determine index of the next part to start. Due to 
>>> > eventual consistency of checking file existence in s3 [1], the 
>>> > BucketingSink can rewrite the previously written part and basically lose 
>>> > it.
>>> 
>>> I was wondering what S3's "read-after-write consistency" (mentioned on the 
>>> page you linked) actually means. It seems that this might be possible:
>>> - LIST keys, find the current max index
>>> - choose next index = max + 1
>>> - HEAD the next index: if it exists, keep adding +1 until the key doesn't 
>>> exist on S3
>>> 
>>> But it definitely sounds easier if a sink keeps track of files in a way 
>>> that's guaranteed to be consistent.
>>> 
>>> Cheers,
>>> Juho
>>> 
>>> On Mon, Aug 27, 2018 at 2:04 PM Andrey Zagrebin <and...@data-artisans.com 
>>> <mailto:and...@data-artisans.com>> wrote:
>>> Hi,
>>> 
>>> true, StreamingFileSink does not support s3 in 1.6.0; it is planned for the 
>>> next 1.7 release, sorry for the confusion.
>>> The old BucketingSink has a general problem with s3. Internally, 
>>> BucketingSink queries s3 as a file system 
>>> to list the already written file parts (batches) and to determine the index 
>>> of the next part to start. Due to the eventual consistency of checking file 
>>> existence in s3 [1], BucketingSink can rewrite a previously written part and 
>>> basically lose it. This should be fixed for StreamingFileSink in 1.7, where 
>>> Flink keeps its own track of written parts and does not rely on s3 as a 
>>> file system. 
>>> I am also including Kostas, he might add more details. 
>>> 
>>> Just to keep this s3 problem in mind and exclude it for sure, I would also 
>>> check whether the size of the missing events is around the batch size of 
>>> BucketingSink or not. You also wrote that the timestamps of the lost events 
>>> are 'probably' around the time of the savepoint; if that is not yet known 
>>> for sure, I would also check it.
>>> 
>>> Have you already checked the log files of the job manager and task managers 
>>> for the job running before and after the restore from the checkpoint? Is 
>>> everything successful there – no errors, relevant warnings or exceptions?
>>> 
>>> As the next step, I would suggest logging all encountered events in 
>>> DistinctFunction.reduce, if that is possible for production data, and 
>>> checking whether the missed events are eventually processed before or after 
>>> the savepoint. The following log message (a template) marks the border 
>>> between the events that should be included in the savepoint (logged before 
>>> it) and those that should not:
>>> "{} ({}, synchronous part) in thread {} took {} ms"
>>> Also check whether the savepoint has completed overall:
>>> "{} ({}, asynchronous part) in thread {} took {} ms."
>>> 
>>> Best,
>>> Andrey
>>> 
>>> [1] https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html 
>>> 
>>>> On 24 Aug 2018, at 20:41, Juho Autio <juho.au...@rovio.com 
>>>> <mailto:juho.au...@rovio.com>> wrote:
>>>> 
>>>> Hi,
>>>> 
>>>> Using StreamingFileSink is not a convenient option for production use for 
>>>> us as it doesn't support s3*. I could use StreamingFileSink just to 
>>>> verify, but I don't see much point in doing so. Please consider my 
>>>> previous comment:
>>>> 
>>>> > I realized that BucketingSink must not play any role in this problem. 
>>>> > This is because only when the 24-hour window triggers, BucketingSink 
>>>> > gets a burst of input. Around the state restoring point (middle of the 
>>>> > day) it doesn't get any input, so it can't lose anything either (right?).
>>>> 
>>>> I could also use a kafka sink instead, but I can't imagine how there could 
>>>> be any difference. It's very real that the sink doesn't get any input for 
>>>> a long time until the 24-hour window closes, and then it quickly writes 
>>>> out everything because it's not that much data eventually for the distinct 
>>>> values.
>>>> 
>>>> Any ideas for debugging what's happening around the savepoint & 
>>>> restoration time?
>>>> 
>>>> *) I actually implemented StreamingFileSink as an alternative sink. This 
>>>> was before I came to realize that most likely the sink component has 
>>>> nothing to do with the data loss problem. I tried it with s3n:// path just 
>>>> to see an exception being thrown. In the source code I indeed then found 
>>>> an explicit check for the target path scheme to be "hdfs://". 
>>>> 
>>>> On Fri, Aug 24, 2018 at 7:49 PM Andrey Zagrebin <and...@data-artisans.com 
>>>> <mailto:and...@data-artisans.com>> wrote:
>>>> Ok. Before further debugging the window's reduced state, 
>>>> could you try the new ‘StreamingFileSink’ [1] introduced in Flink 1.6.0 
>>>> instead of the previous 'BucketingSink’ (e.g. along the lines of the sketch 
>>>> below the link)?
>>>> 
>>>> Cheers,
>>>> Andrey
>>>> 
>>>> [1] 
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
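>>>> 
>>>> For example, something along these lines (just a sketch; the output path 
>>>> and the element type are placeholders, not a drop-in replacement for your 
>>>> exact sink setup):
>>>> 
>>>>     StreamingFileSink<Map<String, String>> fileSink = StreamingFileSink
>>>>             .forRowFormat(new Path("hdfs:///tmp/output"),
>>>>                     new SimpleStringEncoder<Map<String, String>>("UTF-8"))
>>>>             .build();
>>>> 
>>>>     stream.addSink(fileSink);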
>>>> 
>>>>> On 24 Aug 2018, at 18:03, Juho Autio <juho.au...@rovio.com 
>>>>> <mailto:juho.au...@rovio.com>> wrote:
>>>>> 
>>>>> Yes, sorry for my confusing comment. I just meant that it seems like 
>>>>> there's a bug somewhere now that the output is missing some data.
>>>>> 
>>>>> > I would wait and check the actual output in s3 because it is the main 
>>>>> > result of the job
>>>>> 
>>>>> Yes, and that's what I have already done. There always seems to be some 
>>>>> data loss with production data volumes if the job has been restarted 
>>>>> on that day.
>>>>> 
>>>>> Would you have any suggestions for how to debug this further?
>>>>> 
>>>>> Many thanks for stepping in.
>>>>> 
>>>>> On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin <and...@data-artisans.com 
>>>>> <mailto:and...@data-artisans.com>> wrote:
>>>>> Hi Juho,
>>>>> 
>>>>> So it is a per key deduplication job.
>>>>> 
>>>>> Yes, I would wait and check the actual output in s3 because it is the 
>>>>> main result of the job and
>>>>> 
>>>>> > The late data around the time of taking savepoint might be not included 
>>>>> > into the savepoint but it should be behind the snapshotted offset in 
>>>>> > Kafka.
>>>>> 
>>>>> is not a bug, it is a possible behaviour.
>>>>> 
>>>>> The savepoint is a snapshot of the in-flight data which has already been 
>>>>> consumed from Kafka.
>>>>> Basically, the full contents of the window result are split between the 
>>>>> savepoint and what comes after the savepointed offset in Kafka but 
>>>>> before the window result is written into s3. 
>>>>> 
>>>>> Allowed lateness should not affect it; I am just saying that the final 
>>>>> result in s3 should include all records after it. 
>>>>> That is what should be guaranteed, not the contents of the 
>>>>> intermediate savepoint.
>>>>> 
>>>>> Cheers,
>>>>> Andrey
>>>>> 
>>>>>> On 24 Aug 2018, at 16:52, Juho Autio <juho.au...@rovio.com 
>>>>>> <mailto:juho.au...@rovio.com>> wrote:
>>>>>> 
>>>>>> Thanks for your answer!
>>>>>> 
>>>>>> I check for the missed data from the final output on s3. So I wait until 
>>>>>> the next day, then run the same thing re-implemented in batch, and 
>>>>>> compare the output.
>>>>>> 
>>>>>> > The late data around the time of taking savepoint might be not 
>>>>>> > included into the savepoint but it should be behind the snapshotted 
>>>>>> > offset in Kafka.
>>>>>> 
>>>>>> Yes, I would definitely expect that. It seems like there's a bug 
>>>>>> somewhere.
>>>>>> 
>>>>>> > Then it should just come later after the restore and should be reduced 
>>>>>> > within the allowed lateness into the final result which is saved into 
>>>>>> > s3.
>>>>>> 
>>>>>> Well, as far as I know, allowed lateness doesn't play any role here, 
>>>>>> because I started running the job with allowedLateness=0, and still get 
>>>>>> the data loss, while my late data output doesn't receive anything.
>>>>>> 
>>>>>> > Also, is this `DistinctFunction.reduce` just an example or the actual 
>>>>>> > implementation, basically saving just one of records inside the 24h 
>>>>>> > window in s3? then what is missing there?
>>>>>> 
>>>>>> Yes, it's the actual implementation. Note that there's a keyBy before 
>>>>>> the DistinctFunction. So there's one record for each key (which is the 
>>>>>> combination of a couple of fields). In practice I've seen that we're 
>>>>>> missing ~2000-4000 elements on each restore, and the total output is 
>>>>>> obviously much more than that.
>>>>>> 
>>>>>> Here's the full code for the key selector:
>>>>>> 
>>>>>> public class MapKeySelector implements KeySelector<Map<String, String>, Object> {
>>>>>> 
>>>>>>     private final String[] fields;
>>>>>> 
>>>>>>     public MapKeySelector(String... fields) {
>>>>>>         this.fields = fields;
>>>>>>     }
>>>>>> 
>>>>>>     @Override
>>>>>>     public Object getKey(Map<String, String> event) throws Exception {
>>>>>>         Tuple key = Tuple.getTupleClass(fields.length).newInstance();
>>>>>>         for (int i = 0; i < fields.length; i++) {
>>>>>>             key.setField(event.getOrDefault(fields[i], ""), i);
>>>>>>         }
>>>>>>         return key;
>>>>>>     }
>>>>>> }
>>>>>> 
>>>>>> And a more exact example of how it's used:
>>>>>> 
>>>>>>                 .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE"))
>>>>>>                 .timeWindow(Time.days(1))
>>>>>>                 .reduce(new DistinctFunction())
>>>>>> 
>>>>>> On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin 
>>>>>> <and...@data-artisans.com <mailto:and...@data-artisans.com>> wrote:
>>>>>> Hi Juho,
>>>>>> 
>>>>>> Where exactly is the data missing? When do you notice that? 
>>>>>> Do you check it:
>>>>>> - by debugging `DistinctFunction.reduce` right after the resume in the 
>>>>>> middle of the day, 
>>>>>> or 
>>>>>> - by seeing that some distinct records are missing in the final output of 
>>>>>> BucketingSink in s3 after the window result is actually triggered and 
>>>>>> saved at the end of the day? Is that the main output?
>>>>>> 
>>>>>> The late data around the time of taking the savepoint might not be 
>>>>>> included in the savepoint, but it should be behind the snapshotted offset 
>>>>>> in Kafka. Then it should just come later, after the restore, and should 
>>>>>> be reduced within the allowed lateness into the final result which is 
>>>>>> saved into s3.
>>>>>> 
>>>>>> Also, is this `DistinctFunction.reduce` just an example or the actual 
>>>>>> implementation, basically saving just one of the records inside the 24h 
>>>>>> window into s3? Then what is missing there?
>>>>>> 
>>>>>> Cheers,
>>>>>> Andrey
>>>>>> 
>>>>>>> On 23 Aug 2018, at 15:42, Juho Autio <juho.au...@rovio.com 
>>>>>>> <mailto:juho.au...@rovio.com>> wrote:
>>>>>>> 
>>>>>>> I changed to allowedLateness=0, no change, still missing data when 
>>>>>>> restoring from savepoint.
>>>>>>> 
>>>>>>> On Tue, Aug 21, 2018 at 10:43 AM Juho Autio <juho.au...@rovio.com 
>>>>>>> <mailto:juho.au...@rovio.com>> wrote:
>>>>>>> I realized that BucketingSink must not play any role in this problem. 
>>>>>>> This is because BucketingSink only gets a burst of input when the 
>>>>>>> 24-hour window triggers. Around the state restoring point (middle of the 
>>>>>>> day) it doesn't get any input, so it can't lose anything either 
>>>>>>> (right?).
>>>>>>> 
>>>>>>> I will next try removing the allowedLateness entirely from the equation.
>>>>>>> 
>>>>>>> In the meanwhile, please let me know if you have any suggestions for 
>>>>>>> debugging the lost data, for example what logs to enable.
>>>>>>> 
>>>>>>> We use FlinkKafkaConsumer010, btw. Are there any known issues with it 
>>>>>>> that could contribute to lost data when restoring a savepoint?
>>>>>>> 
>>>>>>> On Fri, Aug 17, 2018 at 4:23 PM Juho Autio <juho.au...@rovio.com 
>>>>>>> <mailto:juho.au...@rovio.com>> wrote:
>>>>>>> Some data is silently lost on my Flink stream job when state is 
>>>>>>> restored from a savepoint.
>>>>>>> 
>>>>>>> Do you have any debugging hints to find out where exactly the data gets 
>>>>>>> dropped?
>>>>>>> 
>>>>>>> My job gathers distinct values using a 24-hour window. It doesn't have 
>>>>>>> any custom state management.
>>>>>>> 
>>>>>>> When I cancel the job with a savepoint and restore from that savepoint, 
>>>>>>> some data is missed. It seems to lose just a small amount of data. 
>>>>>>> The event time of the lost data is probably around the time of the 
>>>>>>> savepoint. In other words, the rest of the time window is not entirely 
>>>>>>> missed – collection also works correctly for (most of) the events that 
>>>>>>> come in after restoring.
>>>>>>> 
>>>>>>> When the job processes a full 24-hour window without interruptions it 
>>>>>>> doesn't miss anything.
>>>>>>> 
>>>>>>> Usually the problem doesn't happen in test environments that have 
>>>>>>> smaller parallelism and smaller data volumes. But with production 
>>>>>>> volumes the job seems to consistently miss at least something on every 
>>>>>>> restore.
>>>>>>> 
>>>>>>> This issue has happened consistently since the job was initially 
>>>>>>> created. It first ran on an older Flink 1.5-SNAPSHOT version, and it 
>>>>>>> still happens on both Flink 1.5.2 & 1.6.0.
>>>>>>> 
>>>>>>> I'm wondering if this could be, for example, some synchronization issue 
>>>>>>> between the Kafka consumer offsets and what's been written by 
>>>>>>> BucketingSink?
>>>>>>> 
>>>>>>> 1. Job content, simplified
>>>>>>> 
>>>>>>>         kafkaStream
>>>>>>>                 .flatMap(new ExtractFieldsFunction())
>>>>>>>                 .keyBy(new MapKeySelector(1, 2, 3, 4))
>>>>>>>                 .timeWindow(Time.days(1))
>>>>>>>                 .allowedLateness(allowedLateness)
>>>>>>>                 .sideOutputLateData(lateDataTag)
>>>>>>>                 .reduce(new DistinctFunction())
>>>>>>>                 .addSink(sink)
>>>>>>>                 // use a fixed number of output partitions
>>>>>>>                 .setParallelism(8);
>>>>>>> 
>>>>>>> /**
>>>>>>>  * Usage: .keyBy("the", "distinct", "fields").reduce(new DistinctFunction())
>>>>>>>  */
>>>>>>> public class DistinctFunction implements ReduceFunction<java.util.Map<String, String>> {
>>>>>>>     @Override
>>>>>>>     public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
>>>>>>>         return value1;
>>>>>>>     }
>>>>>>> }
>>>>>>> 
>>>>>>> 2. State configuration
>>>>>>> 
>>>>>>> boolean enableIncrementalCheckpointing = true;
>>>>>>> String statePath = "s3n://bucket/savepoints";
>>>>>>> new RocksDBStateBackend(statePath, enableIncrementalCheckpointing);
>>>>>>> 
>>>>>>> Checkpointing Mode: Exactly Once
>>>>>>> Interval: 1m 0s
>>>>>>> Timeout: 10m 0s
>>>>>>> Minimum Pause Between Checkpoints: 1m 0s
>>>>>>> Maximum Concurrent Checkpoints: 1
>>>>>>> Persist Checkpoints Externally: Enabled (retain on cancellation)
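>>>>>>> 
>>>>>>> In code, those settings would correspond roughly to the following (a 
>>>>>>> sketch, not copied verbatim from our job):
>>>>>>> 
>>>>>>>         env.enableCheckpointing(60_000); // 1 minute interval, exactly-once by default
>>>>>>>         env.getCheckpointConfig().setCheckpointTimeout(600_000);
>>>>>>>         env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60_000);
>>>>>>>         env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>>>>>>>         env.getCheckpointConfig().enableExternalizedCheckpoints(
>>>>>>>                 CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);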
>>>>>>> 
>>>>>>> 3. BucketingSink configuration
>>>>>>> 
>>>>>>> We use BucketingSink; I don't think there's anything special here, 
>>>>>>> except the fact that we're writing to S3.
>>>>>>> 
>>>>>>>         String outputPath = "s3://bucket/output";
>>>>>>>         BucketingSink<Map<String, String>> sink = new BucketingSink<Map<String, String>>(outputPath)
>>>>>>>                 .setBucketer(new ProcessdateBucketer())
>>>>>>>                 .setBatchSize(batchSize)
>>>>>>>                 .setInactiveBucketThreshold(inactiveBucketThreshold)
>>>>>>>                 .setInactiveBucketCheckInterval(inactiveBucketCheckInterval);
>>>>>>>         sink.setWriter(new IdJsonWriter());
>>>>>>> 
>>>>>>> 4. Kafka & event time
>>>>>>> 
>>>>>>> My Flink job reads the data from Kafka, using a 
>>>>>>> BoundedOutOfOrdernessTimestampExtractor on the Kafka consumer itself to 
>>>>>>> synchronize watermarks across all Kafka partitions (roughly as sketched 
>>>>>>> below). We also write late data to a side output, but nothing is written 
>>>>>>> there – if anything were, it could explain missed data in the main 
>>>>>>> output (I'm also sure that our late data writing works, because we 
>>>>>>> previously had some actual late data which ended up there).
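>>>>>>> 
>>>>>>> Roughly like this (a sketch; the topic name, deserialization schema, 
>>>>>>> properties and the out-of-orderness bound are placeholders, not our 
>>>>>>> exact values):
>>>>>>> 
>>>>>>>         FlinkKafkaConsumer010<Map<String, String>> consumer =
>>>>>>>                 new FlinkKafkaConsumer010<>("events", new EventDeserializationSchema(), kafkaProperties);
>>>>>>>         consumer.assignTimestampsAndWatermarks(
>>>>>>>                 new BoundedOutOfOrdernessTimestampExtractor<Map<String, String>>(Time.seconds(10)) {
>>>>>>>                     @Override
>>>>>>>                     public long extractTimestamp(Map<String, String> event) {
>>>>>>>                         return Long.parseLong(event.get("timestamp")); // assumed timestamp field
>>>>>>>                     }
>>>>>>>                 });
>>>>>>>         DataStream<Map<String, String>> kafkaStream = env.addSource(consumer);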
>>>>>>> 
>>>>>>> 5. allowedLateness
>>>>>>> 
>>>>>>> It may or may not be relevant that I have also enabled allowedLateness 
>>>>>>> with 1 minute of lateness on the 24-hour window.
>>>>>>> 
>>>>>>> If that makes sense, I could try removing allowedLateness entirely? 
>>>>>>> That would just be to rule out a Flink bug related to restoring state in 
>>>>>>> combination with the allowedLateness feature. After all, all of our data 
>>>>>>> should arrive in a good enough order not to be late, given the max 
>>>>>>> out-of-orderness used on the Kafka consumer timestamp extractor.
>>>>>>> 
>>>>>>> Thank you in advance!
>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>>> 
>>>>> 
>>>>> 
>>>> 
>>>> 
>>> 
>>> 
>> 
>> 
> 
> 
> 
> 
