Hi Andrey,

To rule out any questions about sink behaviour for good, the job was
cancelled and restarted with an additional Kafka sink.
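
For reference, roughly how the second sink was wired in (just a sketch – the
topic name, serialization schema and producer properties below are
placeholders, not the actual ones):

        DataStream<Map<String, String>> distinct = kafkaStream
                .flatMap(new ExtractFieldsFunction())
                .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE"))
                .timeWindow(Time.days(1))
                .allowedLateness(allowedLateness)
                .sideOutputLateData(lateDataTag)
                .reduce(new DistinctFunction());

        // original S3 output
        distinct.addSink(sink);
        // additional Kafka output, receiving exactly the same reduced stream
        distinct.addSink(new FlinkKafkaProducer010<Map<String, String>>(
                "distinct-ids-debug", new IdJsonSerializationSchema(), kafkaProducerProperties));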

The same number of ids was missed in both outputs: the Kafka sink & the
BucketingSink.

I wonder what the next steps in debugging would be?

On Fri, Sep 21, 2018 at 3:49 PM Juho Autio <juho.au...@rovio.com> wrote:

> Thanks, Andrey.
>
> > so it means that the savepoint does not lose at least some dropped
> records.
>
> I'm not sure what you mean by that? It was known from the beginning that
> not everything is lost before/after restoring a savepoint, just some
> records around the time of restoration. It's not 100% clear whether
> records are lost before making a savepoint or after restoring it.
> Although, based on the new DEBUG logs, it seems more like we lose some
> records that are seen ~soon after restoring. It seems like Flink is
> somehow confused about the restored state vs. new inserts to the state.
> This could also be linked to the high back pressure on the Kafka source
> while the stream is catching up.
>
> > If it is feasible for your setup, I suggest inserting one more map
> function after the reduce and before the sink.
> > etc.
>
> Isn't that the same thing that we discussed before? Nothing is sent to
> BucketingSink before the window closes, so I don't see how it would make
> any difference if we replace the BucketingSink with a map function or
> another sink type. We don't create or restore savepoints during the time
> when BucketingSink gets input or has open buckets – that happens at a much
> later time of day. I would focus on figuring out why the records are lost
> while the window is open. But I don't know how to do that. Would you have
> any additional suggestions?
>
> On Fri, Sep 21, 2018 at 3:30 PM Andrey Zagrebin <and...@data-artisans.com>
> wrote:
>
>> Hi Juho,
>>
>> so it means that the savepoint does not lose at least some dropped
>> records.
>>
>> If it is feasible for your setup, I suggest inserting one more map
>> function after the reduce and before the sink.
>> The map function would be called right after the window is triggered but
>> before flushing to S3.
>> The result of the reduce (the deduped record) could be logged there.
>> This should allow us to check whether the processed distinct records were
>> still buffered in the state after the restoration from the savepoint or
>> not. If they were buffered, we should see an attempt to write them to the
>> sink from the state.
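>>
>> A minimal sketch of such a pass-through logging map (the class and field
>> names here are illustrative, not from the actual job):
>>
>> import java.util.Map;
>> import org.apache.flink.api.common.functions.MapFunction;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>>
>> public class LogWindowResultFunction implements MapFunction<Map<String, String>, Map<String, String>> {
>>     private static final Logger LOG = LoggerFactory.getLogger(LogWindowResultFunction.class);
>>
>>     @Override
>>     public Map<String, String> map(Map<String, String> value) {
>>         // called only when the window fires, i.e. on the way to the sink
>>         LOG.info("Window result sent to sink: {}={}", value.get("field"), value.get("id"));
>>         return value;
>>     }
>> }
>>
>> // usage, between the reduce and the sink:
>> //     .reduce(new DistinctFunction())
>> //     .map(new LogWindowResultFunction())
>> //     .addSink(sink)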
>>
>> Another suggestion is to try writing the records to some other sink, or to
>> both.
>> E.g. if you can access the workers' file system, write them just into
>> local files and check whether the records are also dropped there.
>>
>> Best,
>> Andrey
>>
>> On 20 Sep 2018, at 15:37, Juho Autio <juho.au...@rovio.com> wrote:
>>
>> Hi Andrey!
>>
>> I was finally able to gather the DEBUG logs that you suggested. In short,
>> the reducer logged that it processed at least some of the ids that were
>> missing from the output.
>>
>> "At least some", because I didn't have the job running with DEBUG logs
>> for the full 24-hour window period. So I was only able to look up if I can
>> find *some* of the missing ids in the DEBUG logs. Which I did indeed.
>>
>> I changed the DistinctFunction.java to do this:
>>
>>     @Override
>>     public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
>>         LOG.debug("DistinctFunction.reduce returns: {}={}", value1.get("field"), value1.get("id"));
>>         return value1;
>>     }
>>
>> Then:
>>
>> vi flink-1.6.0/conf/log4j.properties
>> log4j.logger.org.apache.flink.streaming.runtime.tasks.StreamTask=DEBUG
>> log4j.logger.com.rovio.ds.flink.uniqueid.DistinctFunction=DEBUG
>>
>> Then I ran the following kind of test:
>>
>> - Cancelled the on-going job with savepoint created at ~Sep 18 08:35 UTC
>> 2018
>> - Started a new cluster & job with DEBUG enabled at ~09:13, restored from
>> that previous cluster's savepoint
>> - Ran until the job had caught up with the Kafka offsets
>> - Cancelled the job with a new savepoint
>> - Started a new job _without_ DEBUG, which restored the new savepoint, and
>> let it keep running so that it would eventually write the output
>>
>> Then on the next day, after results had been flushed when the 24-hour
>> window closed, I compared the results again with a batch version's output.
>> And found some missing ids as usual.
>>
>> I drilled down to one specific missing id (I'm replacing the actual value
>> with AN12345 below), which was not found in the stream output, but was
>> found in batch output & flink DEBUG logs.
>>
>> Related to that id, I gathered the following information:
>>
>> 2018-09-18 ~09:13:21,000 job started & savepoint is restored
>>
>> 2018-09-18 09:14:29,085 missing id is processed for the first time,
>> proved by this log line:
>> 2018-09-18 09:14:29,085 DEBUG
>> com.rovio.ds.flink.uniqueid.DistinctFunction                  -
>> DistinctFunction.reduce returns: s.aid1=AN12345
>>
>> 2018-09-18 09:15:14,264 first synchronous part of checkpoint
>> 2018-09-18 09:15:16,544 first asynchronous part of checkpoint
>>
>> (in between: more checkpoints (~1 min checkpointing time + ~1 min delay
>> before the next one), interleaved with more occurrences of
>> DistinctFunction.reduce)
>>
>> 2018-09-18 09:23:45,053 missing id is processed for the last time
>>
>> 2018-09-18 ~10:20:00,000 savepoint created & job cancelled
>>
>> To be noted, there was high backpressure after restoring from the
>> savepoint until the stream caught up with the Kafka offsets. However, our
>> job assigns timestamps & watermarks on the Flink Kafka consumer itself, so
>> the event time of all partitions is synchronized. As expected, we don't
>> get any late data in the late data side output.
>>
>> From this we can see that the missing ids are processed by the reducer,
>> but they must get lost somewhere before the 24-hour window is triggered.
>>
>> I think it's worth mentioning once more that the stream doesn't miss any
>> ids if we let it keep running without interruptions / state restoring.
>>
>> What's next?
>>
>> On Wed, Aug 29, 2018 at 3:49 PM Andrey Zagrebin <and...@data-artisans.com>
>> wrote:
>>
>>> Hi Juho,
>>>
>>> > only when the 24-hour window triggers, BucketingSink gets a burst of
>>> input
>>>
>>> This is of course totally true, my understanding is the same. We cannot
>>> exclude a problem there for sure; it's just that savepoints are used a lot
>>> without problem reports, while BucketingSink is known to be problematic
>>> with S3. That is why I asked you:
>>>
>>> > You also wrote that the timestamps of the lost events are 'probably'
>>> around the time of the savepoint; if that is not yet certain, I would also
>>> check it.
>>>
>>> That said, the bucketing sink might lose any data at the end of the day
>>> (also from the middle). The fact that it is always around the time of
>>> taking a savepoint, and not random, is surely suspicious, and possible
>>> savepoint failures need to be investigated.
>>>
>>> Regarding the S3 problem, the S3 documentation says:
>>>
>>> > The caveat is that if you make a HEAD or GET request to the key name
>>> (to find if the object exists) before creating the object, Amazon S3
>>> provides 'eventual consistency' for read-after-write.
>>>
>>> The algorithm you suggest is roughly how it is implemented now
>>> (BucketingSink.openNewPartFile). My understanding is that 'eventual
>>> consistency' means that even if you have just created a file (its name is
>>> the key), it can happen that you do not get it in the list, or the
>>> existence check (HEAD) returns false, and you risk rewriting the previous
>>> part.
>>>
>>> The BucketingSink was designed for a standard file system. S3 is
>>> currently used through a file system wrapper but does not always provide
>>> normal file system guarantees. See also the last example in [1].
>>>
>>> Cheers,
>>> Andrey
>>>
>>> [1]
>>> https://codeburst.io/quick-explanation-of-the-s3-consistency-model-6c9f325e3f82
>>>
>>> On 29 Aug 2018, at 12:11, Juho Autio <juho.au...@rovio.com> wrote:
>>>
>>> Andrey, thank you very much for the debugging suggestions, I'll try them.
>>>
>>> In the meanwhile two more questions, please:
>>>
>>> > Just to keep this problem with S3 in mind and exclude it for sure, I
>>> would also check whether the size of the missing events is around the
>>> batch size of BucketingSink or not.
>>>
>>> Fair enough, but I also want to focus on debugging the most probable
>>> suspect first. So what do you think about this – true or false: only when
>>> the 24-hour window triggers, BucketingSink gets a burst of input. Around
>>> the state restoring point (middle of the day) it doesn't get any input, so
>>> it can't lose anything either. Isn't this true, or have I totally missed
>>> how Flink triggers window results? I would not expect there to be any
>>> optimization that speculatively emits early results of a regular time
>>> window to the downstream operators.
>>>
>>> > The old BucketingSink has, in general, a problem with S3. Internally
>>> BucketingSink queries S3 as a file system to list the already written file
>>> parts (batches) and determine the index of the next part to start. Due to
>>> the eventual consistency of checking file existence in S3 [1], the
>>> BucketingSink can rewrite a previously written part and basically lose
>>> it.
>>>
>>> I was wondering what S3's "read-after-write consistency" (mentioned on
>>> the page you linked) actually means. It seems that something like this
>>> might be possible:
>>> - LIST keys, find current max index
>>> - choose next index = max + 1
>>> - HEAD next index: if it exists, keep adding + 1 until key doesn't exist
>>> on S3
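>>>
>>> Purely to illustrate the probing idea above (this is not BucketingSink
>>> code, and the key layout is made up):
>>>
>>> import com.amazonaws.services.s3.AmazonS3;
>>>
>>> class PartIndexProbe {
>>>     /** Probe upwards from the max index seen in a LIST until a key is free. */
>>>     static int nextFreePartIndex(AmazonS3 s3, String bucket, String prefix, int maxListedIndex) {
>>>         int candidate = maxListedIndex + 1;
>>>         // doesObjectExist issues a HEAD-style metadata request
>>>         while (s3.doesObjectExist(bucket, prefix + "part-" + candidate)) {
>>>             candidate++;
>>>         }
>>>         // Caveat: with eventual consistency a just-written part may not be
>>>         // visible to the LIST or the HEAD yet, so this can still pick an
>>>         // index that overwrites an existing part.
>>>         return candidate;
>>>     }
>>> }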
>>>
>>> But it definitely sounds easier if the sink keeps track of files in a way
>>> that's guaranteed to be consistent.
>>>
>>> Cheers,
>>> Juho
>>>
>>> On Mon, Aug 27, 2018 at 2:04 PM Andrey Zagrebin <
>>> and...@data-artisans.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> true, StreamingFileSink does not support S3 in 1.6.0; it is planned for
>>>> the next 1.7 release, sorry for the confusion.
>>>> The old BucketingSink has, in general, a problem with S3.
>>>> Internally BucketingSink queries S3 as a file system
>>>> to list the already written file parts (batches) and determine the index
>>>> of the next part to start. Due to the eventual consistency of checking
>>>> file existence in S3 [1], the BucketingSink can rewrite a previously
>>>> written part and basically lose it. This should be fixed for
>>>> StreamingFileSink in 1.7, where Flink keeps its own track of the written
>>>> parts and does not rely on S3 as a file system.
>>>> I am also including Kostas, he might add more details.
>>>>
>>>> Just to keep this problem with S3 in mind and exclude it for sure, I
>>>> would also check whether the size of the missing events is around the
>>>> batch size of BucketingSink or not. You also wrote that the timestamps of
>>>> the lost events are 'probably' around the time of the savepoint; if that
>>>> is not yet certain, I would also check it.
>>>>
>>>> Have you already checked the log files of the job manager and task
>>>> managers for the job running before and after the restore from the
>>>> checkpoint? Is everything successful there, with no errors, relevant
>>>> warnings or exceptions?
>>>>
>>>> As the next step, I would suggest logging all encountered events in
>>>> DistinctFunction.reduce, if that is possible for production data, and
>>>> checking whether the missed events are eventually processed before or
>>>> after the savepoint. The following log message (a template) marks the
>>>> border between the events that should be included in the savepoint
>>>> (logged before it) and those that should not:
>>>> “{} ({}, synchronous part) in thread {} took {} ms”
>>>> Also check whether the savepoint completed overall:
>>>> "{} ({}, asynchronous part) in thread {} took {} ms."
>>>>
>>>> Best,
>>>> Andrey
>>>>
>>>> [1] https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html
>>>>
>>>> On 24 Aug 2018, at 20:41, Juho Autio <juho.au...@rovio.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> Using StreamingFileSink is not a convenient option for us in production,
>>>> as it doesn't support s3*. I could use StreamingFileSink just to verify,
>>>> but I don't see much point in doing so. Please consider my previous
>>>> comment:
>>>>
>>>> > I realized that BucketingSink must not play any role in this problem.
>>>> This is because only when the 24-hour window triggers, BucketingSink gets a
>>>> burst of input. Around the state restoring point (middle of the day) it
>>>> doesn't get any input, so it can't lose anything either (right?).
>>>>
>>>> I could also use a Kafka sink instead, but I can't imagine how there
>>>> could be any difference. It really is the case that the sink doesn't get
>>>> any input for a long time until the 24-hour window closes, and then it
>>>> quickly writes out everything, because in the end it's not that much data
>>>> for the distinct values.
>>>>
>>>> Any ideas for debugging what's happening around the savepoint &
>>>> restoration time?
>>>>
>>>> *) I actually implemented StreamingFileSink as an alternative
>>>> sink. This was before I came to realize that most likely the sink component
>>>> has nothing to do with the data loss problem. I tried it with s3n:// path
>>>> just to see an exception being thrown. In the source code I indeed then
>>>> found an explicit check for the target path scheme to be "hdfs://".
>>>>
>>>> On Fri, Aug 24, 2018 at 7:49 PM Andrey Zagrebin <
>>>> and...@data-artisans.com> wrote:
>>>>
>>>>> Ok. Before debugging the window's reduced state any further, could you
>>>>> try the new 'StreamingFileSink' [1] introduced in Flink 1.6.0 instead of
>>>>> the previous 'BucketingSink'?
>>>>>
>>>>> Cheers,
>>>>> Andrey
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
>>>>>
>>>>> On 24 Aug 2018, at 18:03, Juho Autio <juho.au...@rovio.com> wrote:
>>>>>
>>>>> Yes, sorry for my confusing comment. I just meant that it seems like
>>>>> there's a bug somewhere now that the output is missing some data.
>>>>>
>>>>> > I would wait and check the actual output in s3 because it is the
>>>>> main result of the job
>>>>>
>>>>> Yes, and that's what I have already done. There always seems to be some
>>>>> data loss with the production data volumes if the job has been restarted
>>>>> on that day.
>>>>>
>>>>> Would you have any suggestions for how to debug this further?
>>>>>
>>>>> Many thanks for stepping in.
>>>>>
>>>>> On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin <
>>>>> and...@data-artisans.com> wrote:
>>>>>
>>>>>> Hi Juho,
>>>>>>
>>>>>> So it is a per-key deduplication job.
>>>>>>
>>>>>> Yes, I would wait and check the actual output in S3 because it is the
>>>>>> main result of the job, and
>>>>>>
>>>>>> > The late data around the time of taking the savepoint might not be
>>>>>> included in the savepoint, but it should be behind the snapshotted
>>>>>> offset in Kafka.
>>>>>>
>>>>>> is not a bug, it is possible behaviour.
>>>>>>
>>>>>> The savepoint is a snapshot of the data in flight which has already
>>>>>> been consumed from Kafka.
>>>>>> Basically, the full contents of the window result are split between the
>>>>>> savepoint and what can come after the savepointed offset in Kafka but
>>>>>> before the window result is written into S3.
>>>>>>
>>>>>> Allowed lateness should not affect it; I am just saying that the final
>>>>>> result in S3 should include all records in the end.
>>>>>> This is what should be guaranteed, but not the contents of the
>>>>>> intermediate savepoint.
>>>>>>
>>>>>> Cheers,
>>>>>> Andrey
>>>>>>
>>>>>> On 24 Aug 2018, at 16:52, Juho Autio <juho.au...@rovio.com> wrote:
>>>>>>
>>>>>> Thanks for your answer!
>>>>>>
>>>>>> I check for the missed data from the final output on s3. So I wait
>>>>>> until the next day, then run the same thing re-implemented in batch, and
>>>>>> compare the output.
>>>>>>
>>>>>> > The late data around the time of taking the savepoint might not be
>>>>>> included in the savepoint, but it should be behind the snapshotted
>>>>>> offset in Kafka.
>>>>>>
>>>>>> Yes, I would definitely expect that. It seems like there's a bug
>>>>>> somewhere.
>>>>>>
>>>>>> > Then it should just come later after the restore and should be
>>>>>> reduced within the allowed lateness into the final result which is saved
>>>>>> into s3.
>>>>>>
>>>>>> Well, as far as I know, allowed lateness doesn't play any role here,
>>>>>> because I started running the job with allowedLateness=0, and still get 
>>>>>> the
>>>>>> data loss, while my late data output doesn't receive anything.
>>>>>>
>>>>>> > Also, is this `DistinctFunction.reduce` just an example or the actual
>>>>>> implementation, basically saving just one of the records inside the 24h
>>>>>> window in S3? Then what is missing there?
>>>>>>
>>>>>> Yes, it's the actual implementation. Note that there's a keyBy before
>>>>>> the DistinctFunction. So there's one record for each key (which is the
>>>>>> combination of a couple of fields). In practice I've seen that we're
>>>>>> missing ~2000-4000 elements on each restore, and the total output is
>>>>>> obviously much more than that.
>>>>>>
>>>>>> Here's the full code for the key selector:
>>>>>>
>>>>>> public class MapKeySelector implements KeySelector<Map<String, String>, Object> {
>>>>>>
>>>>>>     private final String[] fields;
>>>>>>
>>>>>>     public MapKeySelector(String... fields) {
>>>>>>         this.fields = fields;
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public Object getKey(Map<String, String> event) throws Exception {
>>>>>>         Tuple key = Tuple.getTupleClass(fields.length).newInstance();
>>>>>>         for (int i = 0; i < fields.length; i++) {
>>>>>>             key.setField(event.getOrDefault(fields[i], ""), i);
>>>>>>         }
>>>>>>         return key;
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> And a more exact example on how it's used:
>>>>>>
>>>>>>                 .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE"))
>>>>>>                 .timeWindow(Time.days(1))
>>>>>>                 .reduce(new DistinctFunction())
>>>>>>
>>>>>> On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin <
>>>>>> and...@data-artisans.com> wrote:
>>>>>>
>>>>>>> Hi Juho,
>>>>>>>
>>>>>>> Where exactly is the data missing? When do you notice it?
>>>>>>> Do you check it:
>>>>>>> - by debugging `DistinctFunction.reduce` right after the resume in the
>>>>>>> middle of the day
>>>>>>> or
>>>>>>> - because some distinct records are missing from the final output of
>>>>>>> BucketingSink in S3 after the window result is actually triggered and
>>>>>>> saved into S3 at the end of the day? Is this the main output?
>>>>>>>
>>>>>>> The late data around the time of taking the savepoint might not be
>>>>>>> included in the savepoint, but it should be behind the snapshotted
>>>>>>> offset in Kafka. Then it should just come later after the restore and
>>>>>>> should be reduced, within the allowed lateness, into the final result
>>>>>>> which is saved into S3.
>>>>>>>
>>>>>>> Also, is this `DistinctFunction.reduce` just an example or the actual
>>>>>>> implementation, basically saving just one of the records inside the 24h
>>>>>>> window in S3? Then what is missing there?
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Andrey
>>>>>>>
>>>>>>> On 23 Aug 2018, at 15:42, Juho Autio <juho.au...@rovio.com> wrote:
>>>>>>>
>>>>>>> I changed to allowedLateness=0, no change, still missing data when
>>>>>>> restoring from savepoint.
>>>>>>>
>>>>>>> On Tue, Aug 21, 2018 at 10:43 AM Juho Autio <juho.au...@rovio.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I realized that BucketingSink must not play any role in this
>>>>>>>> problem. This is because only when the 24-hour window triggers,
>>>>>>>> BucketingSink gets a burst of input. Around the state restoring point
>>>>>>>> (middle of the day) it doesn't get any input, so it can't lose anything
>>>>>>>> either (right?).
>>>>>>>>
>>>>>>>> I will next try removing the allowedLateness entirely from the
>>>>>>>> equation.
>>>>>>>>
>>>>>>>> In the meanwhile, please let me know if you have any suggestions
>>>>>>>> for debugging the lost data, for example what logs to enable.
>>>>>>>>
>>>>>>>> We use FlinkKafkaConsumer010 btw. Are there any known issues with it
>>>>>>>> that could contribute to lost data when restoring a savepoint?
>>>>>>>>
>>>>>>>> On Fri, Aug 17, 2018 at 4:23 PM Juho Autio <juho.au...@rovio.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Some data is silently lost on my Flink stream job when state is
>>>>>>>>> restored from a savepoint.
>>>>>>>>>
>>>>>>>>> Do you have any debugging hints to find out where exactly the data
>>>>>>>>> gets dropped?
>>>>>>>>>
>>>>>>>>> My job gathers distinct values using a 24-hour window. It doesn't
>>>>>>>>> have any custom state management.
>>>>>>>>>
>>>>>>>>> When I cancel the job with a savepoint and restore from that
>>>>>>>>> savepoint, some data is missed. It seems to lose just a small amount
>>>>>>>>> of data. The event time of the lost data is probably around the time
>>>>>>>>> of the savepoint. In other words, the rest of the time window is not
>>>>>>>>> entirely missed – collection also works correctly for (most of the)
>>>>>>>>> events that come in after restoring.
>>>>>>>>>
>>>>>>>>> When the job processes a full 24-hour window without interruptions
>>>>>>>>> it doesn't miss anything.
>>>>>>>>>
>>>>>>>>> Usually the problem doesn't happen in test environments that have
>>>>>>>>> smaller parallelism and smaller data volumes. But with production
>>>>>>>>> volumes the job seems to consistently miss at least something on
>>>>>>>>> every restore.
>>>>>>>>>
>>>>>>>>> This issue has consistently happened since the job was initially
>>>>>>>>> created. It was at first run on an older Flink version, 1.5-SNAPSHOT,
>>>>>>>>> and it still happens on both Flink 1.5.2 & 1.6.0.
>>>>>>>>>
>>>>>>>>> I'm wondering if this could be, for example, some synchronization
>>>>>>>>> issue between the Kafka consumer offsets and what's been written by
>>>>>>>>> BucketingSink?
>>>>>>>>>
>>>>>>>>> 1. Job content, simplified
>>>>>>>>>
>>>>>>>>>         kafkaStream
>>>>>>>>>                 .flatMap(new ExtractFieldsFunction())
>>>>>>>>>                 .keyBy(new MapKeySelector(1, 2, 3, 4))
>>>>>>>>>                 .timeWindow(Time.days(1))
>>>>>>>>>                 .allowedLateness(allowedLateness)
>>>>>>>>>                 .sideOutputLateData(lateDataTag)
>>>>>>>>>                 .reduce(new DistinctFunction())
>>>>>>>>>                 .addSink(sink)
>>>>>>>>>                 // use a fixed number of output partitions
>>>>>>>>>                 .setParallelism(8);
>>>>>>>>>
>>>>>>>>> /**
>>>>>>>>>  * Usage: .keyBy("the", "distinct", "fields").reduce(new DistinctFunction())
>>>>>>>>>  */
>>>>>>>>> public class DistinctFunction implements ReduceFunction<java.util.Map<String, String>> {
>>>>>>>>>     @Override
>>>>>>>>>     public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
>>>>>>>>>         return value1;
>>>>>>>>>     }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> 2. State configuration
>>>>>>>>>
>>>>>>>>> boolean enableIncrementalCheckpointing = true;
>>>>>>>>> String statePath = "s3n://bucket/savepoints";
>>>>>>>>> new RocksDBStateBackend(statePath, enableIncrementalCheckpointing);
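>>>>>>>>>
>>>>>>>>> (Presumably the backend is then set on the environment – the exact
>>>>>>>>> wiring isn't shown here, but roughly:)
>>>>>>>>>
>>>>>>>>> env.setStateBackend(new RocksDBStateBackend(statePath, enableIncrementalCheckpointing));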
>>>>>>>>>
>>>>>>>>> Checkpointing Mode Exactly Once
>>>>>>>>> Interval 1m 0s
>>>>>>>>> Timeout 10m 0s
>>>>>>>>> Minimum Pause Between Checkpoints 1m 0s
>>>>>>>>> Maximum Concurrent Checkpoints 1
>>>>>>>>> Persist Checkpoints Externally Enabled (retain on cancellation)
>>>>>>>>>
>>>>>>>>> 3. BucketingSink configuration
>>>>>>>>>
>>>>>>>>> We use BucketingSink; I don't think there's anything special here,
>>>>>>>>> other than the fact that we're writing to S3.
>>>>>>>>>
>>>>>>>>>         String outputPath = "s3://bucket/output";
>>>>>>>>>         BucketingSink<Map<String, String>> sink = new BucketingSink<Map<String, String>>(outputPath)
>>>>>>>>>                 .setBucketer(new ProcessdateBucketer())
>>>>>>>>>                 .setBatchSize(batchSize)
>>>>>>>>>                 .setInactiveBucketThreshold(inactiveBucketThreshold)
>>>>>>>>>                 .setInactiveBucketCheckInterval(inactiveBucketCheckInterval);
>>>>>>>>>         sink.setWriter(new IdJsonWriter());
>>>>>>>>>
>>>>>>>>> 4. Kafka & event time
>>>>>>>>>
>>>>>>>>> My Flink job reads the data from Kafka, using a
>>>>>>>>> BoundedOutOfOrdernessTimestampExtractor on the Kafka consumer to
>>>>>>>>> synchronize watermarks across all Kafka partitions. We also write
>>>>>>>>> late data to a side output, but nothing is written there – if
>>>>>>>>> anything were, it could explain missed data in the main output. (I'm
>>>>>>>>> also sure that our late data writing works, because we previously had
>>>>>>>>> some actual late data which ended up there.)
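>>>>>>>>>
>>>>>>>>> Roughly like this (a sketch only – the topic name, deserialization
>>>>>>>>> schema, properties, timestamp field and out-of-orderness bound are
>>>>>>>>> placeholders, not our actual ones):
>>>>>>>>>
>>>>>>>>>         FlinkKafkaConsumer010<Map<String, String>> consumer = new FlinkKafkaConsumer010<>(
>>>>>>>>>                 "events", new IdJsonDeserializationSchema(), kafkaProperties);
>>>>>>>>>         consumer.assignTimestampsAndWatermarks(
>>>>>>>>>                 new BoundedOutOfOrdernessTimestampExtractor<Map<String, String>>(Time.minutes(1)) {
>>>>>>>>>                     @Override
>>>>>>>>>                     public long extractTimestamp(Map<String, String> event) {
>>>>>>>>>                         return Long.parseLong(event.get("timestamp"));
>>>>>>>>>                     }
>>>>>>>>>                 });
>>>>>>>>>
>>>>>>>>>         // the late-data side output tag referenced by sideOutputLateData(...)
>>>>>>>>>         OutputTag<Map<String, String>> lateDataTag = new OutputTag<Map<String, String>>("late-data") {};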
>>>>>>>>>
>>>>>>>>> 5. allowedLateness
>>>>>>>>>
>>>>>>>>> It may or may not be relevant that I have also enabled
>>>>>>>>> allowedLateness with 1 minute of lateness on the 24-hour window:
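>>>>>>>>> (roughly, on the windowed stream – a sketch, not the exact code)
>>>>>>>>>
>>>>>>>>>                 .timeWindow(Time.days(1))
>>>>>>>>>                 .allowedLateness(Time.minutes(1))
>>>>>>>>>                 .sideOutputLateData(lateDataTag)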
>>>>>>>>>
>>>>>>>>> If that makes sense, I could try removing allowedLateness entirely?
>>>>>>>>> That would just be to rule out a bug in Flink related to restoring
>>>>>>>>> state in combination with the allowedLateness feature. After all, all
>>>>>>>>> of our data should arrive in a good enough order not to be late,
>>>>>>>>> given the max out-of-orderness used on the Kafka consumer's timestamp
>>>>>>>>> extractor.
>>>>>>>>>
>>>>>>>>> Thank you in advance!
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>>
>>
>>
>
