Hi,

Using StreamingFileSink is not a viable option for us in production, as it
doesn't support s3*. I could use StreamingFileSink just to verify, but I
don't see much point in doing so. Please consider my previous comment:

> I realized that BucketingSink must not play any role in this problem.
This is because only when the 24-hour window triggers, BucketingSink gets a
burst of input. Around the state restoring point (middle of the day) it
doesn't get any input, so it can't lose anything either (right?).

I could also use a Kafka sink instead, but I can't see how that would make
any difference. The point remains that the sink doesn't get any input for a
long time until the 24-hour window closes, and then it quickly writes
everything out, because the distinct values don't amount to that much data
in the end.

Any ideas for debugging what's happening around the savepoint & restoration
time?

*) I actually implemented StreamingFileSink as an alternative sink. This
was before I came to realize that the sink component most likely has
nothing to do with the data loss problem. I tried it with an s3n:// path,
only to see an exception being thrown. I then indeed found in the source
code an explicit check that the target path scheme must be "hdfs://".
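
For reference, the alternative sink was wired up roughly like the
following (a minimal sketch, not the exact code; the base path and the
SimpleStringEncoder are placeholders):

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

import java.util.Map;

// Row-format StreamingFileSink from Flink 1.6.0. With an s3n:// base path
// this throws, since the current implementation only accepts hdfs:// targets.
StreamingFileSink<Map<String, String>> fileSink = StreamingFileSink
        .forRowFormat(new Path("hdfs://namenode/output"),
                new SimpleStringEncoder<Map<String, String>>("UTF-8"))
        .build();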

On Fri, Aug 24, 2018 at 7:49 PM Andrey Zagrebin <and...@data-artisans.com>
wrote:

> Ok, I think before further debugging of the reduced window state, could
> you try the new 'StreamingFileSink' [1] introduced in Flink 1.6.0 instead
> of the previous 'BucketingSink'?
>
> Cheers,
> Andrey
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
>
> On 24 Aug 2018, at 18:03, Juho Autio <juho.au...@rovio.com> wrote:
>
> Yes, sorry for my confusing comment. I just meant that it seems like
> there's a bug somewhere now that the output is missing some data.
>
> > I would wait and check the actual output in s3 because it is the main
> result of the job
>
> Yes, and that's what I have already done. There always seems to be some
> data loss with the production data volumes if the job has been restarted
> on that day.
>
> Would you have any suggestions for how to debug this further?
>
> Many thanks for stepping in.
>
> On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin <and...@data-artisans.com>
> wrote:
>
>> Hi Juho,
>>
>> So it is a per-key deduplication job.
>>
>> Yes, I would wait and check the actual output in s3 because it is the
>> main result of the job and
>>
>> > The late data around the time of taking the savepoint might not be
>> included in the savepoint, but it should be behind the snapshotted offset
>> in Kafka.
>>
>> is not a bug, it is a possible behaviour.
>>
>> The savepoint is a snapshot of the data in transit that has already been
>> consumed from Kafka.
>> Basically, the full contents of the window result are split between the
>> savepoint and what can come in after the savepointed offset in Kafka but
>> before the window result is written to s3.
>>
>> Allowed lateness should not affect this; I am just saying that the final
>> result in s3 should include all records that arrive after it.
>> This is what should be guaranteed, but not the contents of the
>> intermediate savepoint.
>>
>> Cheers,
>> Andrey
>>
>> On 24 Aug 2018, at 16:52, Juho Autio <juho.au...@rovio.com> wrote:
>>
>> Thanks for your answer!
>>
>> I check for the missing data in the final output on s3. So I wait until
>> the next day, then run the same thing re-implemented in batch, and compare
>> the output.
>>
>> > The late data around the time of taking the savepoint might not be
>> included in the savepoint, but it should be behind the snapshotted offset
>> in Kafka.
>>
>> Yes, I would definitely expect that. It seems like there's a bug
>> somewhere.
>>
>> > Then it should just come later after the restore and should be reduced
>> within the allowed lateness into the final result which is saved into s3.
>>
>> Well, as far as I know, allowed lateness doesn't play any role here,
>> because I started running the job with allowedLateness=0, and still get the
>> data loss, while my late data output doesn't receive anything.
>>
>> > Also, is this `DistinctFunction.reduce` just an example or the actual
>> implementation, basically saving just one of the records inside the 24h
>> window in s3? Then what is missing there?
>>
>> Yes, it's the actual implementation. Note that there's a keyBy before
>> the DistinctFunction. So there's one record for each key (which is the
>> combination of a couple of fields). In practice I've seen that we're
>> missing ~2000-4000 elements on each restore, and the total output is
>> obviously much more than that.
>>
>> Here's the full code for the key selector:
>>
>> public class MapKeySelector implements KeySelector<Map<String, String>, Object> {
>>
>>     private final String[] fields;
>>
>>     public MapKeySelector(String... fields) {
>>         this.fields = fields;
>>     }
>>
>>     @Override
>>     public Object getKey(Map<String, String> event) throws Exception {
>>         // build a Tuple of the selected field values, defaulting missing fields to ""
>>         Tuple key = Tuple.getTupleClass(fields.length).newInstance();
>>         for (int i = 0; i < fields.length; i++) {
>>             key.setField(event.getOrDefault(fields[i], ""), i);
>>         }
>>         return key;
>>     }
>> }
>>
>> And a more exact example of how it's used:
>>
>>                 .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE"))
>>                 .timeWindow(Time.days(1))
>>                 .reduce(new DistinctFunction())
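>>
>> For a concrete idea of what the key ends up being, something like this
>> (just a sketch; the sample event values are made up):
>>
>>         Map<String, String> event = new HashMap<>();
>>         event.put("ID", "abc");
>>         event.put("PLAYER_ID", "123");
>>         // FIELD, KEY_NAME and KEY_VALUE are absent and default to ""
>>         Object key = new MapKeySelector("ID", "PLAYER_ID", "FIELD",
>>                 "KEY_NAME", "KEY_VALUE").getKey(event);
>>         // key is a Tuple5: (abc,123,,,)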
>>
>> On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin <and...@data-artisans.com>
>> wrote:
>>
>>> Hi Juho,
>>>
>>> Where exactly is the data missing? When do you notice that?
>>> Do you check it:
>>> - by debugging `DistinctFunction.reduce` right after the resume in the
>>> middle of the day, or
>>> - by seeing that some distinct records are missing in the final output of
>>> BucketingSink in s3 after the window result is actually triggered and
>>> saved into s3 at the end of the day? Is this the main output?
>>>
>>> The late data around the time of taking the savepoint might not be
>>> included in the savepoint, but it should be behind the snapshotted offset
>>> in Kafka.
>>> Then it should just come later after the restore and should be reduced
>>> within the allowed lateness into the final result which is saved into s3.
>>>
>>> Also, is this `DistinctFunction.reduce` just an example or the actual
>>> implementation, basically saving just one of the records inside the 24h
>>> window in s3? Then what is missing there?
>>>
>>> Cheers,
>>> Andrey
>>>
>>> On 23 Aug 2018, at 15:42, Juho Autio <juho.au...@rovio.com> wrote:
>>>
>>> I changed to allowedLateness=0, no change, still missing data when
>>> restoring from savepoint.
>>>
>>> On Tue, Aug 21, 2018 at 10:43 AM Juho Autio <juho.au...@rovio.com>
>>> wrote:
>>>
>>>> I realized that BucketingSink must not play any role in this problem.
>>>> This is because only when the 24-hour window triggers, BucketingSink gets a
>>>> burst of input. Around the state restoring point (middle of the day) it
>>>> doesn't get any input, so it can't lose anything either (right?).
>>>>
>>>> I will next try removing the allowedLateness entirely from the equation.
>>>>
>>>> In the meanwhile, please let me know if you have any suggestions for
>>>> debugging the lost data, for example what logs to enable.
>>>>
>>>> We use FlinkKafkaConsumer010, btw. Are there any known issues with it
>>>> that could contribute to lost data when restoring a savepoint?
>>>>
>>>> On Fri, Aug 17, 2018 at 4:23 PM Juho Autio <juho.au...@rovio.com>
>>>> wrote:
>>>>
>>>>> Some data is silently lost in my Flink streaming job when state is
>>>>> restored from a savepoint.
>>>>>
>>>>> Do you have any debugging hints to find out where exactly the data
>>>>> gets dropped?
>>>>>
>>>>> My job gathers distinct values using a 24-hour window. It doesn't have
>>>>> any custom state management.
>>>>>
>>>>> When I cancel the job with a savepoint and restore from that savepoint,
>>>>> some data is missed. It seems to be losing just a small amount of data.
>>>>> The event time of the lost data is probably around the time of the
>>>>> savepoint. In other words, the rest of the time window is not entirely
>>>>> missed – collection also works correctly for (most of the) events that
>>>>> come in after restoring.
>>>>>
>>>>> When the job processes a full 24-hour window without interruptions it
>>>>> doesn't miss anything.
>>>>>
>>>>> Usually the problem doesn't happen in test environments that have
>>>>> smaller parallelism and smaller data volumes. But with production data
>>>>> volumes the job seems to consistently miss at least something on every
>>>>> restore.
>>>>>
>>>>> This issue has happened consistently since the job was initially
>>>>> created. It was first run on an older Flink 1.5-SNAPSHOT build, and it
>>>>> still happens on both Flink 1.5.2 & 1.6.0.
>>>>>
>>>>> I'm wondering if this could be, for example, some synchronization issue
>>>>> between the Kafka consumer offsets and what's been written by
>>>>> BucketingSink?
>>>>>
>>>>> 1. Job content, simplified
>>>>>
>>>>>         kafkaStream
>>>>>                 .flatMap(new ExtractFieldsFunction())
>>>>>                 .keyBy(new MapKeySelector(1, 2, 3, 4))
>>>>>                 .timeWindow(Time.days(1))
>>>>>                 .allowedLateness(allowedLateness)
>>>>>                 .sideOutputLateData(lateDataTag)
>>>>>                 .reduce(new DistinctFunction())
>>>>>                 .addSink(sink)
>>>>>                 // use a fixed number of output partitions
>>>>>                 .setParallelism(8);
>>>>>
>>>>> /**
>>>>>  * Usage: .keyBy("the", "distinct", "fields").reduce(new DistinctFunction())
>>>>>  */
>>>>> public class DistinctFunction
>>>>>         implements ReduceFunction<java.util.Map<String, String>> {
>>>>>     @Override
>>>>>     public Map<String, String> reduce(Map<String, String> value1,
>>>>>             Map<String, String> value2) {
>>>>>         // both values have the same key, so keeping either one is enough
>>>>>         return value1;
>>>>>     }
>>>>> }
>>>>>
>>>>> 2. State configuration
>>>>>
>>>>> boolean enableIncrementalCheckpointing = true;
>>>>> String statePath = "s3n://bucket/savepoints";
>>>>> new RocksDBStateBackend(statePath, enableIncrementalCheckpointing);
>>>>>
>>>>> Checkpointing Mode: Exactly Once
>>>>> Interval: 1m 0s
>>>>> Timeout: 10m 0s
>>>>> Minimum Pause Between Checkpoints: 1m 0s
>>>>> Maximum Concurrent Checkpoints: 1
>>>>> Persist Checkpoints Externally: Enabled (retain on cancellation)
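>>>>>
>>>>> For completeness, the way these settings are wired into the job is
>>>>> roughly the following (a sketch, not the exact code; variable names and
>>>>> the literal values just mirror the configuration listed above):
>>>>>
>>>>> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
>>>>> import org.apache.flink.streaming.api.CheckpointingMode;
>>>>> import org.apache.flink.streaming.api.environment.CheckpointConfig;
>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>
>>>>> StreamExecutionEnvironment env =
>>>>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>
>>>>> // RocksDB backend with incremental checkpointing, as above
>>>>> env.setStateBackend(
>>>>>         new RocksDBStateBackend("s3n://bucket/savepoints", true));
>>>>>
>>>>> // checkpoint every minute, exactly-once
>>>>> env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);
>>>>> CheckpointConfig conf = env.getCheckpointConfig();
>>>>> conf.setCheckpointTimeout(10 * 60_000);
>>>>> conf.setMinPauseBetweenCheckpoints(60_000);
>>>>> conf.setMaxConcurrentCheckpoints(1);
>>>>> conf.enableExternalizedCheckpoints(
>>>>>         CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);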
>>>>>
>>>>> 3. BucketingSink configuration
>>>>>
>>>>> We use BucketingSink; I don't think there's anything special here,
>>>>> apart from the fact that we're writing to S3.
>>>>>
>>>>>         String outputPath = "s3://bucket/output";
>>>>>         BucketingSink<Map<String, String>> sink =
>>>>>                 new BucketingSink<Map<String, String>>(outputPath)
>>>>>                 .setBucketer(new ProcessdateBucketer())
>>>>>                 .setBatchSize(batchSize)
>>>>>                 .setInactiveBucketThreshold(inactiveBucketThreshold)
>>>>>                 .setInactiveBucketCheckInterval(inactiveBucketCheckInterval);
>>>>>         sink.setWriter(new IdJsonWriter());
>>>>>
>>>>> 4. Kafka & event time
>>>>>
>>>>> My Flink job reads the data from Kafka, using a
>>>>> BoundedOutOfOrdernessTimestampExtractor on the Kafka consumer to
>>>>> synchronize watermarks across all Kafka partitions. We also write late
>>>>> data to a side output, but nothing is written there – if it were, it
>>>>> could explain missing data in the main output (I'm also sure that our
>>>>> late data writing works, because we previously had some actual late data
>>>>> which ended up there).
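>>>>>
>>>>> The watermark and late-data wiring looks roughly like this (a sketch;
>>>>> the timestamp field name, the 10-second out-of-orderness and the
>>>>> consumer construction are placeholders, not the actual values):
>>>>>
>>>>> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
>>>>> import org.apache.flink.streaming.api.windowing.time.Time;
>>>>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
>>>>> import org.apache.flink.util.OutputTag;
>>>>>
>>>>> import java.util.Map;
>>>>>
>>>>> FlinkKafkaConsumer010<Map<String, String>> consumer = ...; // constructor args omitted
>>>>> consumer.assignTimestampsAndWatermarks(
>>>>>         new BoundedOutOfOrdernessTimestampExtractor<Map<String, String>>(Time.seconds(10)) {
>>>>>             @Override
>>>>>             public long extractTimestamp(Map<String, String> event) {
>>>>>                 // placeholder: the real job extracts its own timestamp field
>>>>>                 return Long.parseLong(event.get("timestamp"));
>>>>>             }
>>>>>         });
>>>>>
>>>>> // late records from the window end up in this side output, read back via
>>>>> // SingleOutputStreamOperator#getSideOutput(lateDataTag)
>>>>> OutputTag<Map<String, String>> lateDataTag =
>>>>>         new OutputTag<Map<String, String>>("late-data") {};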
>>>>>
>>>>> 5. allowedLateness
>>>>>
>>>>> It may or may not be relevant that I have also enabled allowedLateness
>>>>> with 1 minute of lateness on the 24-hour window:
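>>>>>
>>>>> (roughly; the actual value comes in as a parameter, as in the pipeline
>>>>> snippet above)
>>>>>
>>>>>                 .allowedLateness(Time.minutes(1))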
>>>>>
>>>>> If that makes sense, I could try removing allowedLateness entirely?
>>>>> That would just be to rule out a Flink bug related to restoring state in
>>>>> combination with the allowedLateness feature. After all, all of our data
>>>>> should arrive in a good enough order not to be late, given the max
>>>>> out-of-orderness used on the Kafka consumer timestamp extractor.
>>>>>
>>>>> Thank you in advance!
>>>>>
>>>>
>>>>
>>>
>>>
>>
>>
>
>
