OK. Before debugging the reduced window state any further, 
could you try the new 'StreamingFileSink' [1], introduced in Flink 1.6.0, instead 
of the previous 'BucketingSink'?

Cheers,
Andrey

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html

> On 24 Aug 2018, at 18:03, Juho Autio <juho.au...@rovio.com> wrote:
> 
> Yes, sorry for my confusing comment. I just meant that it seems like there's 
> a bug somewhere now that the output is missing some data.
> 
> > I would wait and check the actual output in s3 because it is the main 
> > result of the job
> 
> Yes, and that's what I have already done. There seems to be always some data 
> loss with the production data volumes, if the job has been restarted on that 
> day.
> 
> Would you have any suggestions for how to debug this further?
> 
> Many thanks for stepping in.
> 
> On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin <and...@data-artisans.com> wrote:
> Hi Juho,
> 
> So it is a per key deduplication job.
> 
> Yes, I would wait and check the actual output in s3 because it is the main 
> result of the job and
> 
> > The late data around the time of taking savepoint might be not included 
> > into the savepoint but it should be behind the snapshotted offset in Kafka.
> 
> is not a bug, it is a possible behaviour.
> 
> The savepoint is a snapshot of the data in transit which has already been 
> consumed from Kafka.
> Basically, the full contents of the window result are split between the 
> savepoint and what can come after the savepoint'ed offset in Kafka but before 
> the window result is written into s3. 
> 
> Allowed lateness should not affect it, I am just saying that the final result 
> in s3 should include all records after it. 
> This is what should be guaranteed but not the contents of the intermediate 
> savepoint.
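
To make the point above concrete, here is a toy model in plain Java (no Flink or Kafka involved). It is only a sketch under stated assumptions: the "savepoint" is modelled as a copy of the per-key state plus the log offset it corresponds to, and every name in it (`SavepointReplaySketch`, `Snapshot`, `runUntil`, `restoreAndFinish`) is made up for illustration. It shows why records after the snapshotted offset do not need to be in the savepoint for the final result to be complete.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: the log is an in-memory list of {key, value} pairs and the
// "savepoint" is a hypothetical Snapshot object, not a real Flink savepoint.
class SavepointReplaySketch {

    // Hypothetical stand-in for a savepoint: reduced state + next offset to read.
    static final class Snapshot {
        final Map<String, String> state;
        final int nextOffset;

        Snapshot(Map<String, String> state, int nextOffset) {
            this.state = new LinkedHashMap<>(state); // defensive copy
            this.nextOffset = nextOffset;
        }
    }

    // Keep the first value seen per key, like a reduce that returns value1.
    static void consume(List<String[]> log, int from, int to, Map<String, String> state) {
        for (int i = from; i < to; i++) {
            state.putIfAbsent(log.get(i)[0], log.get(i)[1]);
        }
    }

    // Process records [0, offset) and take a snapshot at that offset.
    static Snapshot runUntil(List<String[]> log, int offset) {
        Map<String, String> state = new LinkedHashMap<>();
        consume(log, 0, offset, state);
        return new Snapshot(state, offset);
    }

    // Restore the snapshot, replay everything from its offset, return the final result.
    static Map<String, String> restoreAndFinish(List<String[]> log, Snapshot snapshot) {
        Map<String, String> state = new LinkedHashMap<>(snapshot.state);
        consume(log, snapshot.nextOffset, log.size(), state);
        return state;
    }

    static List<String[]> sampleLog() {
        List<String[]> log = new ArrayList<>();
        log.add(new String[] {"a", "1"});
        log.add(new String[] {"b", "2"});
        log.add(new String[] {"c", "3"}); // arrives after the snapshot below
        log.add(new String[] {"a", "4"}); // duplicate key, dropped by the reduce
        log.add(new String[] {"d", "5"});
        return log;
    }

    public static void main(String[] args) {
        List<String[]> log = sampleLog();
        Snapshot midDay = runUntil(log, 2);
        System.out.println("in snapshot:  " + midDay.state);
        System.out.println("final result: " + restoreAndFinish(log, midDay));
    }
}
```

In this model the snapshot taken at offset 2 does not contain key "c", yet the result after restore and replay matches an uninterrupted run. If the real job behaves differently, the loss has to come from somewhere this model leaves out.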
> 
> Cheers,
> Andrey
> 
>> On 24 Aug 2018, at 16:52, Juho Autio <juho.au...@rovio.com> wrote:
>> 
>> Thanks for your answer!
>> 
>> I check for the missed data from the final output on s3. So I wait until the 
>> next day, then run the same thing re-implemented in batch, and compare the 
>> output.
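
The comparison described above could look roughly like the following. This is a minimal sketch, assuming both outputs have already been loaded and reduced to one string key per record; the class and method names (`OutputDiffSketch`, `missingFromStreaming`) and the sample keys are hypothetical, and how the keys are read from s3 is left out entirely.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

// Sketch only: compares two in-memory key sets. Loading the real batch and
// streaming outputs is not shown and would depend on the file format.
class OutputDiffSketch {

    // Keys present in the batch (reference) output but absent from the streaming output.
    static Set<String> missingFromStreaming(Set<String> batchKeys, Set<String> streamingKeys) {
        Set<String> missing = new TreeSet<>(batchKeys);
        missing.removeAll(streamingKeys);
        return missing;
    }

    public static void main(String[] args) {
        // Made-up keys, purely for illustration.
        Set<String> batch = new TreeSet<>(Arrays.asList("k1", "k2", "k3"));
        Set<String> streaming = new TreeSet<>(Arrays.asList("k1", "k3"));
        System.out.println("missing: " + missingFromStreaming(batch, streaming));
    }
}
```

Having the concrete missing keys (rather than only a count) makes it possible to look up their event timestamps and check how they relate to the savepoint time.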
>> 
>> > The late data around the time of taking savepoint might be not included 
>> > into the savepoint but it should be behind the snapshotted offset in Kafka.
>> 
>> Yes, I would definitely expect that. It seems like there's a bug somewhere.
>> 
>> > Then it should just come later after the restore and should be reduced 
>> > within the allowed lateness into the final result which is saved into s3.
>> 
>> Well, as far as I know, allowed lateness doesn't play any role here, because 
>> I started running the job with allowedLateness=0, and still get the data 
>> loss, while my late data output doesn't receive anything.
>> 
>> > Also, is this `DistinctFunction.reduce` just an example or the actual 
>> > implementation, basically saving just one of records inside the 24h window 
>> > in s3? then what is missing there?
>> 
>> Yes, it's the actual implementation. Note that there's a keyBy before the 
>> DistinctFunction. So there's one record for each key (which is the 
>> combination of a couple of fields). In practice I've seen that we're missing 
>> ~2000-4000 elements on each restore, and the total output is obviously much 
>> more than that.
>> 
>> Here's the full code for the key selector:
>> 
>> public class MapKeySelector implements KeySelector<Map<String,String>, Object> {
>> 
>>     private final String[] fields;
>> 
>>     public MapKeySelector(String... fields) {
>>         this.fields = fields;
>>     }
>> 
>>     @Override
>>     public Object getKey(Map<String, String> event) throws Exception {
>>         Tuple key = Tuple.getTupleClass(fields.length).newInstance();
>>         for (int i = 0; i < fields.length; i++) {
>>             key.setField(event.getOrDefault(fields[i], ""), i);
>>         }
>>         return key;
>>     }
>> }
>> 
>> And a more exact example on how it's used:
>> 
>>                 .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE"))
>>                 .timeWindow(Time.days(1))
>>                 .reduce(new DistinctFunction())
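
For readers without a Flink setup, the effect of the key selector plus the reduce can be imitated in plain Java. This is only a rough sketch: it ignores windows, time and parallelism, uses a `List<String>` in place of Flink's `Tuple` as the composite key, and the names `DistinctByFieldsSketch`, `keyOf`, `distinct`, `event` and the `NOTE` field are invented for the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java imitation of keyBy(fields) followed by a reduce that keeps value1.
class DistinctByFieldsSketch {

    // Composite key: the values of the chosen fields, "" when a field is absent
    // (the same default that MapKeySelector uses).
    static List<String> keyOf(Map<String, String> event, String... fields) {
        List<String> key = new ArrayList<>(fields.length);
        for (String field : fields) {
            key.add(event.getOrDefault(field, ""));
        }
        return key;
    }

    // One record per composite key; the first record seen for a key wins.
    static List<Map<String, String>> distinct(List<Map<String, String>> events, String... fields) {
        Map<List<String>, Map<String, String>> firstPerKey = new LinkedHashMap<>();
        for (Map<String, String> event : events) {
            firstPerKey.putIfAbsent(keyOf(event, fields), event);
        }
        return new ArrayList<>(firstPerKey.values());
    }

    // Small helper to build an event from alternating field names and values.
    static Map<String, String> event(String... namesAndValues) {
        Map<String, String> event = new HashMap<>();
        for (int i = 0; i + 1 < namesAndValues.length; i += 2) {
            event.put(namesAndValues[i], namesAndValues[i + 1]);
        }
        return event;
    }

    public static void main(String[] args) {
        List<Map<String, String>> events = new ArrayList<>();
        events.add(event("ID", "1", "PLAYER_ID", "p", "NOTE", "first"));
        events.add(event("ID", "1", "PLAYER_ID", "p", "NOTE", "second"));
        events.add(event("ID", "2"));
        System.out.println(distinct(events, "ID", "PLAYER_ID"));
    }
}
```

Because the output has exactly one record per key, a missing record means that no event for that key reached the window at all, rather than that some duplicates were dropped.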
>> 
>> On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin <and...@data-artisans.com> wrote:
>> Hi Juho,
>> 
>> Where exactly is the data missing? When do you notice that? 
>> Do you check it by:
>> - debugging `DistinctFunction.reduce` right after resuming in the middle of 
>> the day, 
>> or 
>> - finding that some distinct records are missing in the final output of 
>> BucketingSink in s3, after the window result is actually triggered and saved 
>> into s3 at the end of the day? Is this the main output?
>> 
>> The late data around the time of taking savepoint might be not included into 
>> the savepoint but it should be behind the snapshotted offset in Kafka. Then 
>> it should just come later after the restore and should be reduced within the 
>> allowed lateness into the final result which is saved into s3.
>> 
>> Also, is this `DistinctFunction.reduce` just an example or the actual 
>> implementation, basically saving just one of records inside the 24h window 
>> in s3? then what is missing there?
>> 
>> Cheers,
>> Andrey
>> 
>>> On 23 Aug 2018, at 15:42, Juho Autio <juho.au...@rovio.com> wrote:
>>> 
>>> I changed to allowedLateness=0, no change, still missing data when 
>>> restoring from savepoint.
>>> 
>>> On Tue, Aug 21, 2018 at 10:43 AM Juho Autio <juho.au...@rovio.com> wrote:
>>> I realized that BucketingSink must not play any role in this problem. This 
>>> is because BucketingSink gets a burst of input only when the 24-hour window 
>>> triggers. Around the state restoring point (middle of the day) it doesn't 
>>> get any input, so it can't lose anything either (right?).
>>> 
>>> I will next try removing the allowedLateness entirely from the equation.
>>> 
>>> In the meanwhile, please let me know if you have any suggestions for 
>>> debugging the lost data, for example what logs to enable.
>>> 
>>> We use FlinkKafkaConsumer010 btw. Are there any known issues with that, 
>>> that could contribute to lost data when restoring a savepoint?
>>> 
>>> On Fri, Aug 17, 2018 at 4:23 PM Juho Autio <juho.au...@rovio.com> wrote:
>>> Some data is silently lost on my Flink stream job when state is restored 
>>> from a savepoint.
>>> 
>>> Do you have any debugging hints to find out where exactly the data gets 
>>> dropped?
>>> 
>>> My job gathers distinct values using a 24-hour window. It doesn't have any 
>>> custom state management.
>>> 
>>> When I cancel the job with savepoint and restore from that savepoint, some 
>>> data is missed. It seems to be losing just a small amount of data. The 
>>> event time of lost data is probably around the time of savepoint. In other 
>>> words the rest of the time window is not entirely missed – collection works 
>>> correctly also for (most of the) events that come in after restoring.
>>> 
>>> When the job processes a full 24-hour window without interruptions it 
>>> doesn't miss anything.
>>> 
>>> Usually the problem doesn't happen in test environments that have smaller 
>>> parallelism and smaller data volumes. But in production volumes the job 
>>> seems to be consistently missing at least something on every restore.
>>> 
>>> This issue has consistently happened since the job was initially created. 
>>> It was at first run on an older version of Flink 1.5-SNAPSHOT and it still 
>>> happens on both Flink 1.5.2 & 1.6.0.
>>> 
>>> I'm wondering if this could be for example some synchronization issue 
>>> between the kafka consumer offsets vs. what's been written by BucketingSink?
>>> 
>>> 1. Job content, simplified
>>> 
>>>         kafkaStream
>>>                 .flatMap(new ExtractFieldsFunction())
>>>                 .keyBy(new MapKeySelector(1, 2, 3, 4))
>>>                 .timeWindow(Time.days(1))
>>>                 .allowedLateness(allowedLateness)
>>>                 .sideOutputLateData(lateDataTag)
>>>                 .reduce(new DistinctFunction())
>>>                 .addSink(sink)
>>>                 // use a fixed number of output partitions
>>>                 .setParallelism(8);
>>> 
>>> /**
>>>  * Usage: .keyBy("the", "distinct", "fields").reduce(new DistinctFunction())
>>>  */
>>> public class DistinctFunction implements ReduceFunction<java.util.Map<String, String>> {
>>>     @Override
>>>     public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
>>>         return value1;
>>>     }
>>> }
>>> 
>>> 2. State configuration
>>> 
>>> boolean enableIncrementalCheckpointing = true;
>>> String statePath = "s3n://bucket/savepoints";
>>> new RocksDBStateBackend(statePath, enableIncrementalCheckpointing);
>>> 
>>> Checkpointing Mode:                 Exactly Once
>>> Interval:                           1m 0s
>>> Timeout:                            10m 0s
>>> Minimum Pause Between Checkpoints:  1m 0s
>>> Maximum Concurrent Checkpoints:     1
>>> Persist Checkpoints Externally:     Enabled (retain on cancellation)
>>> 
>>> 3. BucketingSink configuration
>>> 
>>> We use BucketingSink. I don't think there's anything special here, other 
>>> than the fact that we're writing to S3.
>>> 
>>>         String outputPath = "s3://bucket/output";
>>>         BucketingSink<Map<String, String>> sink = new BucketingSink<Map<String, String>>(outputPath)
>>>                 .setBucketer(new ProcessdateBucketer())
>>>                 .setBatchSize(batchSize)
>>>                 .setInactiveBucketThreshold(inactiveBucketThreshold)
>>>                 .setInactiveBucketCheckInterval(inactiveBucketCheckInterval);
>>>         sink.setWriter(new IdJsonWriter());
>>> 
>>> 4. Kafka & event time
>>> 
>>> My Flink job reads the data from Kafka, using a 
>>> BoundedOutOfOrdernessTimestampExtractor on the Kafka consumer to 
>>> synchronize watermarks across all Kafka partitions. We also write late 
>>> data to a side output, but nothing is written there. If something were, it 
>>> could explain the missed data in the main output. (I'm also sure that our 
>>> late data writing works, because we previously had some actual late data 
>>> which ended up there.)
>>> 
>>> 5. allowedLateness
>>> 
>>> It may or may not be relevant that I have also enabled allowedLateness 
>>> with 1 minute lateness on the 24-hour window.
>>> 
>>> If that makes sense, I could try removing allowedLateness entirely? That 
>>> would be just to rule out a Flink bug related to restoring state in 
>>> combination with the allowedLateness feature. After all, all of our data 
>>> should be in a good enough order to not be late, given the max 
>>> out-of-orderness used on the Kafka consumer timestamp extractor.
>>> 
>>> Thank you in advance!
>>> 
>>> 
>> 
>> 
> 
> 
