Hi Juho, can you try to reduce the job to a minimal reproducible example and share the job and input?
For example:
- some simple records as input, e.g. tuples of primitive types saved as CSV
- a minimal deduplication job which processes them and misses records
- a check whether it also happens for shorter windows, like 1h
- the setup which you use for the job, ideally locally reproducible or in the cloud

Best,
Andrey

> On 4 Oct 2018, at 11:13, Juho Autio <juho.au...@rovio.com> wrote:
>
> Sorry to insist, but we seem to be blocked for any serious usage of state in Flink if we can't rely on it to not miss data in case of restore.
>
> Would anyone have suggestions for how to troubleshoot this? So far I have verified with DEBUG logs that our reduce function also gets to process the data that is missing from the window output.
>
> On Mon, Oct 1, 2018 at 11:56 AM Juho Autio <juho.au...@rovio.com> wrote:
>
> Hi Andrey,
>
> To rule out for good any questions about sink behaviour, the job was killed and started with an additional Kafka sink.
>
> The same number of ids was missed in both outputs: KafkaSink & BucketingSink.
>
> I wonder what the next steps in debugging should be?
>
> On Fri, Sep 21, 2018 at 3:49 PM Juho Autio <juho.au...@rovio.com> wrote:
>
> Thanks, Andrey.
>
> > so it means that the savepoint does not lose at least some of the dropped records.
>
> I'm not sure what you mean by that? It was known from the beginning that not everything is lost before/after restoring a savepoint, just some records around the time of restoration. It's not 100% clear whether records are lost before making a savepoint or after restoring it. Although, based on the new DEBUG logs, it seems more like we lose some records that are seen soon after restoring. It seems like Flink is somehow confused about the restored state vs. new inserts into the state. This could also be somehow linked to the high back pressure on the kafka source while the stream is catching up.
>
> > If it is feasible for your setup, I suggest inserting one more map function after reduce and before the sink.
> > etc.
>
> Isn't that the same thing that we discussed before? Nothing is sent to BucketingSink before the window closes, so I don't see how it would make any difference if we replaced the BucketingSink with a map function or another sink type. We don't create or restore savepoints during the time when BucketingSink gets input or has open buckets – that happens at a much later time of day. I would focus on figuring out why the records are lost while the window is open. But I don't know how to do that. Would you have any additional suggestions?
>
> On Fri, Sep 21, 2018 at 3:30 PM Andrey Zagrebin <and...@data-artisans.com> wrote:
>
> Hi Juho,
>
> so it means that the savepoint does not lose at least some of the dropped records.
>
> If it is feasible for your setup, I suggest inserting one more map function after reduce and before the sink. The map function would be called right after the window is triggered but before flushing to s3. The result of reduce (the deduped record) could be logged there. This should allow checking whether the processed distinct records were buffered in the state after the restoration from the savepoint or not. If they were buffered, we should see an attempt to write them to the sink from the state.
>
> Another suggestion is to try writing the records to some other sink, or to both.
> E.g. if you can access the file system of the workers, maybe write just into local files and check whether the records are also dropped there.
>
> Best,
> Andrey
>
>> On 20 Sep 2018, at 15:37, Juho Autio <juho.au...@rovio.com> wrote:
>>
>> Hi Andrey!
>>
>> I was finally able to gather the DEBUG logs that you suggested. In short, the reducer logged that it processed at least some of the ids that were missing from the output.
>>
>> "At least some", because I didn't have the job running with DEBUG logs for the full 24-hour window period, so I could only check whether some of the missing ids appear in the DEBUG logs. They do indeed.
>>
>> I changed DistinctFunction.java to do this:
>>
>> @Override
>> public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
>>     LOG.debug("DistinctFunction.reduce returns: {}={}", value1.get("field"), value1.get("id"));
>>     return value1;
>> }
>>
>> Then:
>>
>> vi flink-1.6.0/conf/log4j.properties
>> log4j.logger.org.apache.flink.streaming.runtime.tasks.StreamTask=DEBUG
>> log4j.logger.com.rovio.ds.flink.uniqueid.DistinctFunction=DEBUG
>>
>> Then I ran the following kind of test:
>>
>> - Cancelled the on-going job with a savepoint, created at ~Sep 18 08:35 UTC 2018
>> - Started a new cluster & job with DEBUG enabled at ~09:13, restored from that previous cluster's savepoint
>> - Ran until the offsets had caught up
>> - Cancelled the job with a new savepoint
>> - Started a new job _without_ DEBUG, which restored the new savepoint, and let it keep running so that it would eventually write the output
>>
>> Then, on the next day, after the results had been flushed when the 24-hour window closed, I compared the results again with the batch version's output. And found some missing ids, as usual.
>>
>> I drilled down to one specific missing id (I'm replacing the actual value with AN12345 below), which was not found in the stream output but was found in the batch output & the flink DEBUG logs.
>>
>> Related to that id, I gathered the following information:
>>
>> 2018-09-18 ~09:13:21,000 job started & savepoint restored
>>
>> 2018-09-18 09:14:29,085 missing id is processed for the first time, proved by this log line:
>> 2018-09-18 09:14:29,085 DEBUG com.rovio.ds.flink.uniqueid.DistinctFunction - DistinctFunction.reduce returns: s.aid1=AN12345
>>
>> 2018-09-18 09:15:14,264 first synchronous part of checkpoint
>> 2018-09-18 09:15:16,544 first asynchronous part of checkpoint
>>
>> (more checkpoints follow (~1 min checkpointing time + ~1 min delay before the next), along with more occurrences of DistinctFunction.reduce)
>>
>> 2018-09-18 09:23:45,053 missing id is processed for the last time
>>
>> 2018-09-18 ~10:20:00,000 savepoint created & job cancelled
>>
>> To be noted: there was high backpressure after restoring from the savepoint until the stream caught up with the kafka offsets. However, our job assigns timestamps & watermarks on the flink kafka consumer itself, so the event time of all partitions is synchronized. As expected, we don't get any late data in the late data side output.
>>
>> From this we can see that the missing ids are processed by the reducer, but they must get lost somewhere before the 24-hour window is triggered.
>>
>> I think it's worth mentioning once more that the stream doesn't miss any ids if we let it run without interruptions / state restoring.
>>
>> What's next?
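>>
>> One thing I could try on the next run is an extra map between reduce and the sink, to also log each record at the moment the window result is actually flushed towards the sink. A minimal sketch of what I have in mind (the class name and log format are placeholders, not from our actual code):
>>
>> import java.util.Map;
>> import org.apache.flink.api.common.functions.MapFunction;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>>
>> public class WindowResultLogger implements MapFunction<Map<String, String>, Map<String, String>> {
>>
>>     private static final Logger LOG = LoggerFactory.getLogger(WindowResultLogger.class);
>>
>>     @Override
>>     public Map<String, String> map(Map<String, String> value) {
>>         // Runs only when the window fires, i.e. right before the record reaches the sink.
>>         LOG.info("window result: {}={}", value.get("field"), value.get("id"));
>>         return value;
>>     }
>> }
>>
>> It would be chained as .reduce(new DistinctFunction()).map(new WindowResultLogger()).addSink(sink), so an id logged there but missing from s3 would point at the sink, while an id already missing there would point at the window state.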
>>
>> On Wed, Aug 29, 2018 at 3:49 PM Andrey Zagrebin <and...@data-artisans.com> wrote:
>>
>> Hi Juho,
>>
>> > only when the 24-hour window triggers, BucketingSink gets a burst of input
>>
>> This is of course totally true; my understanding is the same. We cannot exclude a problem there for sure; it is just that savepoints are used a lot without problem reports, while BucketingSink is known to be problematic with s3. That is why I asked you:
>>
>> > You also wrote that the timestamps of the lost events are 'probably' around the time of the savepoint; if that is not yet known for sure, I would also check it.
>>
>> Although the bucketing sink might lose any data at the end of the day (also from the middle), the fact that it is always around the time of taking a savepoint, and not random, is surely suspicious, and possible savepoint failures need to be investigated.
>>
>> Regarding the s3 problem, the s3 doc says:
>>
>> > The caveat is that if you make a HEAD or GET request to the key name (to find if the object exists) before creating the object, Amazon S3 provides 'eventual consistency' for read-after-write.
>>
>> The algorithm you suggest is roughly how it is implemented now (BucketingSink.openNewPartFile). My understanding is that 'eventual consistency' means that even if you have just created a file (its name is the key), it can happen that you do not get it in the list, or exists (HEAD) returns false, and you risk rewriting the previous part.
>>
>> The BucketingSink was designed for a standard file system. s3 is used over a file system wrapper atm, but it does not always provide normal file system guarantees. See also the last example in [1].
>>
>> Cheers,
>> Andrey
>>
>> [1] https://codeburst.io/quick-explanation-of-the-s3-consistency-model-6c9f325e3f82
>>
>>> On 29 Aug 2018, at 12:11, Juho Autio <juho.au...@rovio.com> wrote:
>>>
>>> Andrey, thank you very much for the debugging suggestions, I'll try them.
>>>
>>> In the meantime, two more questions, please:
>>>
>>> > Just to keep this problem with s3 in mind and exclude it for sure, I would also check whether the size of the missing events is around the batch size of BucketingSink or not.
>>>
>>> Fair enough, but I also want to focus on debugging the most probable subject first. So what do you think about this – true or false: only when the 24-hour window triggers, BucketingSink gets a burst of input. Around the state restoring point (middle of the day) it doesn't get any input, so it can't lose anything either. Isn't this true, or have I totally missed how Flink triggers window results? I would not expect there to be any optimization that speculatively sends early results of a regular time window to the downstream operators.
>>>
>>> > The old BucketingSink has a general problem with s3. Internally, BucketingSink queries s3 as a file system to list the already written file parts (batches) and to determine the index of the next part to start. Due to the eventual consistency of checking file existence in s3 [1], BucketingSink can rewrite a previously written part and basically lose it.
>>>
>>> I was wondering what S3's "read-after-write consistency" (mentioned on the page you linked) actually means.
>>> It seems that this might be possible:
>>> - LIST the keys and find the current max index
>>> - choose next index = max + 1
>>> - HEAD the next index: if it exists, keep adding +1 until the key doesn't exist on S3
>>>
>>> But it definitely sounds easier if a sink keeps track of the files in a way that's guaranteed to be consistent.
>>>
>>> Cheers,
>>> Juho
>>>
>>> On Mon, Aug 27, 2018 at 2:04 PM Andrey Zagrebin <and...@data-artisans.com> wrote:
>>>
>>> Hi,
>>>
>>> True, StreamingFileSink does not support s3 in 1.6.0; it is planned for the next 1.7 release, sorry for the confusion.
>>>
>>> The old BucketingSink has a general problem with s3. Internally, BucketingSink queries s3 as a file system to list the already written file parts (batches) and to determine the index of the next part to start. Due to the eventual consistency of checking file existence in s3 [1], BucketingSink can rewrite a previously written part and basically lose it. This should be fixed for StreamingFileSink in 1.7, where Flink keeps its own track of the written parts and does not rely on s3 as a file system. I also include Kostas, he might add more details.
>>>
>>> Just to keep this problem with s3 in mind and exclude it for sure, I would also check whether the size of the missing events is around the batch size of BucketingSink or not. You also wrote that the timestamps of the lost events are 'probably' around the time of the savepoint; if that is not yet known for sure, I would also check it.
>>>
>>> Have you already checked the log files of the job manager and the task managers for the job running before and after the restore from the checkpoint? Is everything successful there: no errors, relevant warnings, or exceptions?
>>>
>>> As the next step, I would suggest logging all encountered events in DistinctFunction.reduce, if that is possible for production data, and checking whether the missed events are eventually processed before or after the savepoint. The following log message indicates the border between the events that should be included in the savepoint (logged before it) or not:
>>> "{} ({}, synchronous part) in thread {} took {} ms" (template)
>>> Also check whether the savepoint completed overall:
>>> "{} ({}, asynchronous part) in thread {} took {} ms."
>>>
>>> Best,
>>> Andrey
>>>
>>> [1] https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html
>>>
>>>> On 24 Aug 2018, at 20:41, Juho Autio <juho.au...@rovio.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> Using StreamingFileSink is not a convenient option for production use for us, as it doesn't support s3*. I could use StreamingFileSink just to verify, but I don't see much point in doing so. Please consider my previous comment:
>>>>
>>>> > I realized that BucketingSink must not play any role in this problem. This is because only when the 24-hour window triggers, BucketingSink gets a burst of input. Around the state restoring point (middle of the day) it doesn't get any input, so it can't lose anything either (right?).
>>>>
>>>> I could also use a kafka sink instead, but I can't imagine how there could be any difference. It's very real that the sink doesn't get any input for a long time until the 24-hour window closes, and then it quickly writes out everything, because it's not that much data eventually for the distinct values.
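>>>>
>>>> If I do try it, the kafka sink variant would be roughly this (a sketch only; the broker list, topic name, and serialization are placeholders):
>>>>
>>>> deduped // the DataStream coming out of .reduce(new DistinctFunction())
>>>>     .map(record -> record.toString()) // or a proper JSON writer
>>>>     .addSink(new FlinkKafkaProducer010<>("broker:9092", "distinct-ids-debug", new SimpleStringSchema()));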
>>>>
>>>> Any ideas for debugging what's happening around the savepoint & restoration time?
>>>>
>>>> *) I actually implemented StreamingFileSink as an alternative sink. This was before I came to realize that most likely the sink component has nothing to do with the data loss problem. I tried it with an s3n:// path just to see an exception being thrown. In the source code I indeed then found an explicit check for the target path scheme to be "hdfs://".
>>>>
>>>> On Fri, Aug 24, 2018 at 7:49 PM Andrey Zagrebin <and...@data-artisans.com> wrote:
>>>>
>>>> Ok. Before further debugging the window's reduced state, could you try the new 'StreamingFileSink' [1] introduced in Flink 1.6.0 instead of the previous 'BucketingSink'?
>>>>
>>>> Cheers,
>>>> Andrey
>>>>
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
>>>>
>>>>> On 24 Aug 2018, at 18:03, Juho Autio <juho.au...@rovio.com> wrote:
>>>>>
>>>>> Yes, sorry for my confusing comment. I just meant that it seems like there's a bug somewhere, now that the output is missing some data.
>>>>>
>>>>> > I would wait and check the actual output in s3 because it is the main result of the job
>>>>>
>>>>> Yes, and that's what I have already done. There always seems to be some data loss with the production data volumes if the job has been restarted on that day.
>>>>>
>>>>> Would you have any suggestions for how to debug this further?
>>>>>
>>>>> Many thanks for stepping in.
>>>>>
>>>>> On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin <and...@data-artisans.com> wrote:
>>>>>
>>>>> Hi Juho,
>>>>>
>>>>> So it is a per-key deduplication job.
>>>>>
>>>>> Yes, I would wait and check the actual output in s3, because it is the main result of the job, and
>>>>>
>>>>> > The late data around the time of taking the savepoint might not be included in the savepoint, but it should be behind the snapshotted offset in Kafka.
>>>>>
>>>>> is not a bug; it is possible behaviour.
>>>>>
>>>>> The savepoint is a snapshot of the in-flight data which has already been consumed from Kafka. Basically, the full content of the window result is split between the savepoint and whatever comes after the savepointed offset in Kafka but before the window result is written into s3.
>>>>>
>>>>> Allowed lateness should not affect it; I am just saying that the final result in s3 should include all records after it. This is what should be guaranteed, but not the contents of the intermediate savepoint.
>>>>>
>>>>> Cheers,
>>>>> Andrey
>>>>>
>>>>>> On 24 Aug 2018, at 16:52, Juho Autio <juho.au...@rovio.com> wrote:
>>>>>>
>>>>>> Thanks for your answer!
>>>>>>
>>>>>> I check for the missed data from the final output on s3. So I wait until the next day, then run the same thing re-implemented in batch, and compare the outputs.
>>>>>>
>>>>>> > The late data around the time of taking the savepoint might not be included in the savepoint, but it should be behind the snapshotted offset in Kafka.
>>>>>>
>>>>>> Yes, I would definitely expect that. It seems like there's a bug somewhere.
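>>>>>>
>>>>>> For reference, the batch-vs-stream comparison mentioned above is just a set difference over the produced ids, roughly like this (a sketch; the file paths are placeholders and assume one id per line):
>>>>>>
>>>>>> import java.nio.file.Files;
>>>>>> import java.nio.file.Paths;
>>>>>> import java.util.HashSet;
>>>>>> import java.util.Set;
>>>>>>
>>>>>> public class FindMissingIds {
>>>>>>     public static void main(String[] args) throws Exception {
>>>>>>         // ids that the streaming job wrote to s3 (downloaded locally)
>>>>>>         Set<String> streamIds = new HashSet<>(Files.readAllLines(Paths.get("stream_ids.txt")));
>>>>>>         // ids from the batch re-implementation; report anything the stream missed
>>>>>>         for (String id : Files.readAllLines(Paths.get("batch_ids.txt"))) {
>>>>>>             if (!streamIds.contains(id)) {
>>>>>>                 System.out.println("missing from stream output: " + id);
>>>>>>             }
>>>>>>         }
>>>>>>     }
>>>>>> }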
>>>>>>
>>>>>> > Then it should just come later after the restore and should be reduced within the allowed lateness into the final result, which is saved into s3.
>>>>>>
>>>>>> Well, as far as I know, allowed lateness doesn't play any role here, because I started running the job with allowedLateness=0 and still get the data loss, while my late data output doesn't receive anything.
>>>>>>
>>>>>> > Also, is this `DistinctFunction.reduce` just an example or the actual implementation, basically saving just one of the records inside the 24h window in s3? Then what is missing there?
>>>>>>
>>>>>> Yes, it's the actual implementation. Note that there's a keyBy before the DistinctFunction, so there's one record for each key (which is the combination of a couple of fields). In practice I've seen that we're missing ~2000-4000 elements on each restore, and the total output is obviously much more than that.
>>>>>>
>>>>>> Here's the full code for the key selector:
>>>>>>
>>>>>> public class MapKeySelector implements KeySelector<Map<String, String>, Object> {
>>>>>>
>>>>>>     private final String[] fields;
>>>>>>
>>>>>>     public MapKeySelector(String... fields) {
>>>>>>         this.fields = fields;
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public Object getKey(Map<String, String> event) throws Exception {
>>>>>>         Tuple key = Tuple.getTupleClass(fields.length).newInstance();
>>>>>>         for (int i = 0; i < fields.length; i++) {
>>>>>>             key.setField(event.getOrDefault(fields[i], ""), i);
>>>>>>         }
>>>>>>         return key;
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> And a more exact example of how it's used:
>>>>>>
>>>>>> .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE"))
>>>>>> .timeWindow(Time.days(1))
>>>>>> .reduce(new DistinctFunction())
>>>>>>
>>>>>> On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin <and...@data-artisans.com> wrote:
>>>>>>
>>>>>> Hi Juho,
>>>>>>
>>>>>> Where exactly does the data go missing, and when do you notice it? Do you check it:
>>>>>> - by debugging `DistinctFunction.reduce` right after the resume in the middle of the day, or
>>>>>> - by finding that some distinct records are missing from the final output of BucketingSink in s3, after the window result is actually triggered and saved into s3 at the end of the day? Is this the main output?
>>>>>>
>>>>>> The late data around the time of taking the savepoint might not be included in the savepoint, but it should be behind the snapshotted offset in Kafka. Then it should just come later after the restore and should be reduced within the allowed lateness into the final result, which is saved into s3.
>>>>>>
>>>>>> Also, is this `DistinctFunction.reduce` just an example or the actual implementation, basically saving just one of the records inside the 24h window in s3? Then what is missing there?
>>>>>>
>>>>>> Cheers,
>>>>>> Andrey
>>>>>>
>>>>>>> On 23 Aug 2018, at 15:42, Juho Autio <juho.au...@rovio.com> wrote:
>>>>>>>
>>>>>>> I changed to allowedLateness=0; no change, still missing data when restoring from a savepoint.
>>>>>>>
>>>>>>> On Tue, Aug 21, 2018 at 10:43 AM Juho Autio <juho.au...@rovio.com> wrote:
>>>>>>>
>>>>>>> I realized that BucketingSink must not play any role in this problem. This is because only when the 24-hour window triggers, BucketingSink gets a burst of input.
>>>>>>> Around the state restoring point (middle of the day) it doesn't get any input, so it can't lose anything either (right?).
>>>>>>>
>>>>>>> I will next try removing the allowedLateness entirely from the equation.
>>>>>>>
>>>>>>> In the meantime, please let me know if you have any suggestions for debugging the lost data, for example which logs to enable.
>>>>>>>
>>>>>>> We use FlinkKafkaConsumer010, btw. Are there any known issues with it that could contribute to lost data when restoring a savepoint?
>>>>>>>
>>>>>>> On Fri, Aug 17, 2018 at 4:23 PM Juho Autio <juho.au...@rovio.com> wrote:
>>>>>>>
>>>>>>> Some data is silently lost on my Flink stream job when state is restored from a savepoint.
>>>>>>>
>>>>>>> Do you have any debugging hints to find out where exactly the data gets dropped?
>>>>>>>
>>>>>>> My job gathers distinct values using a 24-hour window. It doesn't have any custom state management.
>>>>>>>
>>>>>>> When I cancel the job with a savepoint and restore from that savepoint, some data is missed. It seems to lose just a small amount of data. The event time of the lost data is probably around the time of the savepoint. In other words, the rest of the time window is not entirely missed – collection also works correctly for (most of the) events that come in after restoring.
>>>>>>>
>>>>>>> When the job processes a full 24-hour window without interruptions, it doesn't miss anything.
>>>>>>>
>>>>>>> Usually the problem doesn't happen in test environments, which have smaller parallelism and smaller data volumes. But with production volumes the job seems to consistently miss at least something on every restore.
>>>>>>>
>>>>>>> This issue has happened consistently since the job was initially created. It first ran on an older Flink 1.5-SNAPSHOT and it still happens on both Flink 1.5.2 & 1.6.0.
>>>>>>>
>>>>>>> I'm wondering if this could be, for example, some synchronization issue between the kafka consumer offsets vs. what's been written by BucketingSink?
>>>>>>>
>>>>>>> 1. Job content, simplified
>>>>>>>
>>>>>>> kafkaStream
>>>>>>>         .flatMap(new ExtractFieldsFunction())
>>>>>>>         .keyBy(new MapKeySelector(1, 2, 3, 4))
>>>>>>>         .timeWindow(Time.days(1))
>>>>>>>         .allowedLateness(allowedLateness)
>>>>>>>         .sideOutputLateData(lateDataTag)
>>>>>>>         .reduce(new DistinctFunction())
>>>>>>>         .addSink(sink)
>>>>>>>         // use a fixed number of output partitions
>>>>>>>         .setParallelism(8);
>>>>>>>
>>>>>>> /**
>>>>>>>  * Usage: .keyBy("the", "distinct", "fields").reduce(new DistinctFunction())
>>>>>>>  */
>>>>>>> public class DistinctFunction implements ReduceFunction<java.util.Map<String, String>> {
>>>>>>>     @Override
>>>>>>>     public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
>>>>>>>         return value1;
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>> 2. State configuration
>>>>>>>
>>>>>>> boolean enableIncrementalCheckpointing = true;
>>>>>>> String statePath = "s3n://bucket/savepoints";
>>>>>>> new RocksDBStateBackend(statePath, enableIncrementalCheckpointing);
>>>>>>>
>>>>>>> Checkpointing Mode: Exactly Once
>>>>>>> Interval: 1m 0s
>>>>>>> Timeout: 10m 0s
>>>>>>> Minimum Pause Between Checkpoints: 1m 0s
>>>>>>> Maximum Concurrent Checkpoints: 1
>>>>>>> Persist Checkpoints Externally: Enabled (retain on cancellation)
>>>>>>> 3. BucketingSink configuration
>>>>>>>
>>>>>>> We use BucketingSink; I don't think there's anything special here, except perhaps the fact that we're writing to S3.
>>>>>>>
>>>>>>> String outputPath = "s3://bucket/output";
>>>>>>> BucketingSink<Map<String, String>> sink = new BucketingSink<Map<String, String>>(outputPath)
>>>>>>>         .setBucketer(new ProcessdateBucketer())
>>>>>>>         .setBatchSize(batchSize)
>>>>>>>         .setInactiveBucketThreshold(inactiveBucketThreshold)
>>>>>>>         .setInactiveBucketCheckInterval(inactiveBucketCheckInterval);
>>>>>>> sink.setWriter(new IdJsonWriter());
>>>>>>>
>>>>>>> 4. Kafka & event time
>>>>>>>
>>>>>>> My flink job reads the data from Kafka, using a BoundedOutOfOrdernessTimestampExtractor on the kafka consumer to synchronize watermarks across all kafka partitions (a rough sketch is in the P.S. at the end of this mail). We also write late data to a side output, but nothing is written there – if it were, it could explain missed data in the main output (I'm also sure that our late data writing works, because we previously had some actual late data which ended up there).
>>>>>>>
>>>>>>> 5. allowedLateness
>>>>>>>
>>>>>>> It may or may not be relevant that I have also enabled allowedLateness with 1 minute lateness on the 24-hour window.
>>>>>>>
>>>>>>> If that makes sense, I could try removing allowedLateness entirely? That would be just to rule out a Flink bug related to restoring state in combination with the allowedLateness feature. After all, all of our data should be in a good enough order not to be late, given the max out-of-orderness used on the kafka consumer's timestamp extractor.
>>>>>>>
>>>>>>> Thank you in advance!
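>>>>>>>
>>>>>>> P.S. For completeness, the watermark assignment on the kafka consumer looks roughly like this (a sketch; the topic/schema/properties variables, the timestamp field name, and the max out-of-orderness are placeholders, not our actual values):
>>>>>>>
>>>>>>> FlinkKafkaConsumer010<Map<String, String>> consumer =
>>>>>>>         new FlinkKafkaConsumer010<>(topic, deserializationSchema, kafkaProperties);
>>>>>>> consumer.assignTimestampsAndWatermarks(
>>>>>>>         new BoundedOutOfOrdernessTimestampExtractor<Map<String, String>>(Time.minutes(1)) {
>>>>>>>             @Override
>>>>>>>             public long extractTimestamp(Map<String, String> event) {
>>>>>>>                 // placeholder field name; returns the event time in epoch millis
>>>>>>>                 return Long.parseLong(event.get("timestamp"));
>>>>>>>             }
>>>>>>>         });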