Ok, I think before further debugging the reduced window state, could you try the new 'StreamingFileSink' [1] introduced in Flink 1.6.0 instead of the previous 'BucketingSink'?
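For illustration, a minimal row-format setup could look roughly like this (just a sketch, not a drop-in replacement: the output path is taken from your current config, SimpleStringEncoder is only a placeholder, and your IdJsonWriter logic would need to be ported to the new Encoder interface):

    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

    import java.util.Map;

    // Minimal row-format StreamingFileSink; it writes one encoded record per line
    // and relies on Flink's checkpointing to finalize part files.
    StreamingFileSink<Map<String, String>> sink = StreamingFileSink
            .forRowFormat(
                    new Path("s3://bucket/output"),
                    // placeholder: SimpleStringEncoder just writes Map.toString()
                    new SimpleStringEncoder<Map<String, String>>("UTF-8"))
            .build();

The rest of the pipeline would stay the same, you would only swap the sink passed to .addSink(...).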
Cheers,
Andrey

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html

> On 24 Aug 2018, at 18:03, Juho Autio <juho.au...@rovio.com> wrote:
> 
> Yes, sorry for my confusing comment. I just meant that it seems like there's a bug somewhere now that the output is missing some data.
> 
> > I would wait and check the actual output in s3 because it is the main result of the job
> 
> Yes, and that's what I have already done. There always seems to be some data loss with the production data volumes, if the job has been restarted on that day.
> 
> Would you have any suggestions for how to debug this further?
> 
> Many thanks for stepping in.
> 
> On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin <and...@data-artisans.com> wrote:
> Hi Juho,
> 
> So it is a per-key deduplication job.
> 
> Yes, I would wait and check the actual output in s3 because it is the main result of the job, and
> 
> > The late data around the time of taking savepoint might not be included into the savepoint but it should be behind the snapshotted offset in Kafka.
> 
> is not a bug, it is a possible behaviour.
> 
> The savepoint is a snapshot of the data in transit that has already been consumed from Kafka.
> Basically the full contents of the window result are split between the savepoint and what can come after the savepointed offset in Kafka but before the window result is written into s3.
> 
> Allowed lateness should not affect it, I am just saying that the final result in s3 should include all records after it.
> This is what should be guaranteed, but not the contents of the intermediate savepoint.
> 
> Cheers,
> Andrey
> 
>> On 24 Aug 2018, at 16:52, Juho Autio <juho.au...@rovio.com> wrote:
>> 
>> Thanks for your answer!
>> 
>> I check for the missed data from the final output on s3. So I wait until the next day, then run the same thing re-implemented in batch, and compare the output.
>> 
>> > The late data around the time of taking savepoint might not be included into the savepoint but it should be behind the snapshotted offset in Kafka.
>> 
>> Yes, I would definitely expect that. It seems like there's a bug somewhere.
>> 
>> > Then it should just come later after the restore and should be reduced within the allowed lateness into the final result which is saved into s3.
>> 
>> Well, as far as I know, allowed lateness doesn't play any role here, because I started running the job with allowedLateness=0, and still get the data loss, while my late data output doesn't receive anything.
>> 
>> > Also, is this `DistinctFunction.reduce` just an example or the actual implementation, basically saving just one of the records inside the 24h window in s3? Then what is missing there?
>> 
>> Yes, it's the actual implementation. Note that there's a keyBy before the DistinctFunction. So there's one record for each key (which is the combination of a couple of fields). In practice I've seen that we're missing ~2000-4000 elements on each restore, and the total output is obviously much more than that.
>> 
>> Here's the full code for the key selector:
>>
>>     public class MapKeySelector implements KeySelector<Map<String, String>, Object> {
>> 
>>         private final String[] fields;
>> 
>>         public MapKeySelector(String... fields) {
>>             this.fields = fields;
>>         }
>> 
>>         @Override
>>         public Object getKey(Map<String, String> event) throws Exception {
>>             Tuple key = Tuple.getTupleClass(fields.length).newInstance();
>>             for (int i = 0; i < fields.length; i++) {
>>                 key.setField(event.getOrDefault(fields[i], ""), i);
>>             }
>>             return key;
>>         }
>>     }
>> 
>> And a more exact example of how it's used:
>> 
>>     .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE"))
>>     .timeWindow(Time.days(1))
>>     .reduce(new DistinctFunction())
>> 
>> On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin <and...@data-artisans.com> wrote:
>> Hi Juho,
>> 
>> Where exactly is the data missing? When do you notice that?
>> Do you check it:
>> - by debugging `DistinctFunction.reduce` right after resume in the middle of the day
>> or
>> - because some distinct records are missing in the final output of BucketingSink in s3 after the window result is actually triggered and saved into s3 at the end of the day? Is this the main output?
>> 
>> The late data around the time of taking savepoint might not be included into the savepoint but it should be behind the snapshotted offset in Kafka. Then it should just come later after the restore and should be reduced within the allowed lateness into the final result which is saved into s3.
>> 
>> Also, is this `DistinctFunction.reduce` just an example or the actual implementation, basically saving just one of the records inside the 24h window in s3? Then what is missing there?
>> 
>> Cheers,
>> Andrey
>> 
>>> On 23 Aug 2018, at 15:42, Juho Autio <juho.au...@rovio.com> wrote:
>>> 
>>> I changed to allowedLateness=0, no change, still missing data when restoring from savepoint.
>>> 
>>> On Tue, Aug 21, 2018 at 10:43 AM Juho Autio <juho.au...@rovio.com> wrote:
>>> I realized that BucketingSink must not play any role in this problem. This is because only when the 24-hour window triggers does BucketingSink get a burst of input. Around the state restoring point (middle of the day) it doesn't get any input, so it can't lose anything either (right?).
>>> 
>>> I will next try removing the allowedLateness entirely from the equation.
>>> 
>>> In the meanwhile, please let me know if you have any suggestions for debugging the lost data, for example what logs to enable.
>>> 
>>> We use FlinkKafkaConsumer010 btw. Are there any known issues with that, that could contribute to lost data when restoring a savepoint?
>>> 
>>> On Fri, Aug 17, 2018 at 4:23 PM Juho Autio <juho.au...@rovio.com> wrote:
>>> Some data is silently lost on my Flink stream job when state is restored from a savepoint.
>>> 
>>> Do you have any debugging hints to find out where exactly the data gets dropped?
>>> 
>>> My job gathers distinct values using a 24-hour window. It doesn't have any custom state management.
>>> 
>>> When I cancel the job with a savepoint and restore from that savepoint, some data is missed. It seems to be losing just a small amount of data. The event time of the lost data is probably around the time of the savepoint. In other words the rest of the time window is not entirely missed – collection works correctly also for (most of the) events that come in after restoring.
>>> 
>>> When the job processes a full 24-hour window without interruptions it doesn't miss anything.
>>> 
>>> Usually the problem doesn't happen in test environments that have smaller parallelism and smaller data volumes. But with production volumes the job seems to be consistently missing at least something on every restore.
>>> 
>>> This issue has happened consistently since the job was initially created. It was at first run on an older Flink 1.5-SNAPSHOT version and it still happens on both Flink 1.5.2 & 1.6.0.
>>> 
>>> I'm wondering if this could be, for example, some synchronization issue between the kafka consumer offsets vs. what's been written by BucketingSink?
>>> 
>>> 1. Job content, simplified
>>> 
>>>     kafkaStream
>>>             .flatMap(new ExtractFieldsFunction())
>>>             .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE"))
>>>             .timeWindow(Time.days(1))
>>>             .allowedLateness(allowedLateness)
>>>             .sideOutputLateData(lateDataTag)
>>>             .reduce(new DistinctFunction())
>>>             .addSink(sink)
>>>             // use a fixed number of output partitions
>>>             .setParallelism(8);
>>> 
>>>     /**
>>>      * Usage: .keyBy("the", "distinct", "fields").reduce(new DistinctFunction())
>>>      */
>>>     public class DistinctFunction implements ReduceFunction<java.util.Map<String, String>> {
>>>         @Override
>>>         public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
>>>             return value1;
>>>         }
>>>     }
>>> 
>>> 2. State configuration
>>> 
>>>     boolean enableIncrementalCheckpointing = true;
>>>     String statePath = "s3n://bucket/savepoints";
>>>     new RocksDBStateBackend(statePath, enableIncrementalCheckpointing);
>>> 
>>>     Checkpointing Mode: Exactly Once
>>>     Interval: 1m 0s
>>>     Timeout: 10m 0s
>>>     Minimum Pause Between Checkpoints: 1m 0s
>>>     Maximum Concurrent Checkpoints: 1
>>>     Persist Checkpoints Externally: Enabled (retain on cancellation)
>>> 
>>> 3. BucketingSink configuration
>>> 
>>> We use BucketingSink; I don't think there's anything special here, other than the fact that we're writing to S3.
>>> 
>>>     String outputPath = "s3://bucket/output";
>>>     BucketingSink<Map<String, String>> sink = new BucketingSink<Map<String, String>>(outputPath)
>>>             .setBucketer(new ProcessdateBucketer())
>>>             .setBatchSize(batchSize)
>>>             .setInactiveBucketThreshold(inactiveBucketThreshold)
>>>             .setInactiveBucketCheckInterval(inactiveBucketCheckInterval);
>>>     sink.setWriter(new IdJsonWriter());
>>> 
>>> 4. Kafka & event time
>>> 
>>> My flink job reads the data from Kafka, using a BoundedOutOfOrdernessTimestampExtractor on the kafka consumer to synchronize watermarks across all kafka partitions. We also write late data to a side output, but nothing is written there – if it were, it could explain missed data in the main output (I'm also sure that our late data writing works, because we previously had some actual late data which ended up there).
>>> 
>>> 5. allowedLateness
>>> 
>>> It may or may not be relevant that I have also enabled allowedLateness with 1 minute lateness on the 24-hour window.
>>> 
>>> If that makes sense, I could try removing allowedLateness entirely? That would be just to rule out that Flink has a bug related to restoring state in combination with the allowedLateness feature. After all, all of our data should be in a good enough order to not be late, given the max out-of-orderness used on the kafka consumer timestamp extractor.
>>> 
>>> Thank you in advance!