Thanks for looking into this, Jürgen!

I opened a Jira issue: https://issues.apache.org/jira/browse/FLINK-6427 

> On 29. Apr 2017, at 09:15, Jürgen Thomann <juergen.thom...@innogames.com> wrote:
> 
> Hi Aljoscha,
> 
> In my case the valid-length file contains a value that says, for example,
> that 100 MB are valid to read for exactly-once, but the data file is only
> 95 MB large. As I understand it, the valid-length file contains the length of
> the file at the checkpoint. This does not happen for all files (3 HDFS sinks,
> each with a parallelism of 2). For some parts the file size and the value in
> the valid-length file match exactly.
> 
> After looking over the checkpoint code in BucketingSink, I looked into the
> hsync behavior again and found the following page:
> http://stackoverflow.com/questions/32231105/why-is-hsync-not-flushing-my-hdfs-file
> After this I downloaded the file with the hdfs dfs tool, and the file is
> actually even larger than the length recorded in the valid-length file. I
> checked this against the things I did before (Impala and Hive select count
> queries, and a Hue download of the files followed by wc -l): these 3 ways
> result in the same number of lines, but hdfs dfs -cat <filename> | wc -l
> gives a much larger value.
> 
> So my conclusion is that the data is written and not actually lost, as I had
> thought, but it is not visible for my use case because the files are not
> properly closed during cancel and the namenode is not aware of the flushed
> data. I can imagine 2 ways out of this: 1. implement the hsync as described
> on the Stack Overflow page, or 2. ensure that files are properly closed
> during cancel.
> 
> Best,
> Jürgen
> 
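The mismatch described above (a valid-length value of 100 MB against a visible file of 95 MB) can be checked mechanically. The following is only a minimal sketch under stated assumptions: ValidLengthCheck and its methods are hypothetical names, not Flink or Hadoop API. It reads a local file through java.nio rather than HDFS, and the valid length is passed in as a number instead of being parsed from the valid-length file.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;

// Hypothetical helper for illustration only; not part of Flink or Hadoop.
final class ValidLengthCheck {

    // Bytes that the valid-length value promises but the visible file lacks.
    // 0 means every valid byte is visible to readers.
    static long missingBytes(long validLength, long visibleLength) {
        return Math.max(0L, validLength - visibleLength);
    }

    // Returns only the first validLength bytes of a (local) data file and
    // fails loudly if the file is shorter than the recorded valid length.
    static byte[] readValidPrefix(Path dataFile, long validLength) {
        try {
            byte[] all = Files.readAllBytes(dataFile);
            long missing = missingBytes(validLength, all.length);
            if (missing > 0) {
                throw new IllegalStateException(
                        "file is " + missing + " bytes shorter than its valid length");
            }
            return Arrays.copyOf(all, Math.toIntExact(validLength));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Demo on a temporary local file: 5 bytes written, 3 declared valid.
    static int demoPrefixLength() {
        try {
            Path tmp = Files.createTempFile("part-0-0", ".data");
            try {
                Files.write(tmp, new byte[] {1, 2, 3, 4, 5});
                return readValidPrefix(tmp, 3).length;
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("missing bytes: " + missingBytes(100, 95));
        System.out.println("valid prefix length: " + demoPrefixLength());
    }
}
```

A non-zero result from missingBytes corresponds to the case reported here, where readers see less data than the checkpoint recorded. A file that is longer than its valid length is the normal case and is simply truncated on read.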
> On 28.04.2017 17:38, Aljoscha Krettek wrote:
>> Hi Jürgen,
>> Is the data missing with respect to what should have been written at the
>> time of the cancel, or at the time the last checkpoint (in this case, the
>> savepoint) was performed? I’m asking because the cancel command is only sent
>> out once the savepoint has been completed, as can be seen at [1]. If the
>> savepoint is complete, this also means that the snapshot method of the
>> BucketingSink must have done its work, i.e. that it also flushed all files,
>> which is done in [2]. There’s always the possibility of a bug, however, so
>> we’ll have to look into this together.
>> 
>> Best,
>> Aljoscha
>> 
>> [1]
>> https://github.com/apache/flink/blob/c22efce098c14e8f08bad1e0065dbd02df6e4dbb/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala#L607-L607
>>
>> [2]
>> https://github.com/apache/flink/blob/b4c60a942fe07e355dd49ed2aab3c0a7ae94285d/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
>> 
>>> On 27. Apr 2017, at 10:27, Jürgen Thomann <juergen.thom...@innogames.com> wrote:
>>> 
>>> Hi,
>>> 
>>> Some time ago I had problems writing data to Hadoop with the
>>> BucketingSink: data was lost on cancel with savepoint because the
>>> flush/sync command was interrupted. I tried changing the Hadoop settings as
>>> suggested but had no luck in the end, so I looked into the Flink code. If I
>>> understand the code correctly, it behaves in the following way:
>>> 
>>> 1. start a watchdog thread if a cancellation timeout is set
>>> 2. invoke cancel on the sink/task, but do not wait for it to finish
>>> 3. destroy the buffer pool and release resources
>>> 4. send the initial interrupt to the sink/task
>>> 5. call join on the sink/task and ignore InterruptedException
>>> 6. let the watchdog send more interrupts if needed and throw a fatal error
>>> if the timeout is reached
>>> 
>>> In my case the BucketingSink does not have enough time to flush everything
>>> before the initial interrupt is sent, and some files are not closed
>>> properly, which in my understanding causes the missing data in Hadoop.
>>> 
>>> Is my understanding correct and if yes, do you know a way to get around 
>>> this behavior to let the close function finish the sync for all files?
>>> 
>>> Best,
>>> Jürgen
>> 
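The cancellation sequence listed in the first message of the thread can be simulated with plain Java threads. This is only a sketch under stated assumptions, not Flink's actual task code: CancelSequenceSketch, FILES, FLUSH_MILLIS and the graceMillis parameter are hypothetical, Thread.sleep stands in for a blocking flush/sync, and the watchdog (steps 1 and 6) and the resource release (step 3) are left out. A grace period of 0 mirrors the described behavior, where the interrupt follows the cancel call immediately. A positive grace period is an assumed variation that lets the close logic finish first.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical simulation for illustration only; not Flink's task code.
final class CancelSequenceSketch {
    static final int FILES = 5;            // files the simulated sink must close
    static final long FLUSH_MILLIS = 100;  // stand-in for one blocking flush/sync

    // Runs one simulated cancellation and returns how many files were closed.
    static int run(long graceMillis) {
        AtomicInteger closed = new AtomicInteger();
        CountDownLatch cancelRequested = new CountDownLatch(1);

        Thread task = new Thread(() -> {
            try {
                cancelRequested.await();         // "running" until cancel arrives
                for (int i = 0; i < FILES; i++) {
                    Thread.sleep(FLUSH_MILLIS);  // interruptible flush of one file
                    closed.incrementAndGet();
                }
            } catch (InterruptedException e) {
                // The interrupt aborts the remaining flushes; those files
                // are never closed properly.
            }
        }, "sink-task");
        task.start();

        try {
            cancelRequested.countDown();  // step 2: invoke cancel, do not wait
            if (graceMillis > 0) {
                task.join(graceMillis);   // assumed variation: wait before interrupting
            }
            task.interrupt();             // step 4: initial interrupt
            task.join();                  // step 5: wait for the task to end
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return closed.get();
    }

    public static void main(String[] args) {
        System.out.println("immediate interrupt: " + run(0) + " of " + FILES + " files closed");
        System.out.println("with grace period:   " + run(2000) + " of " + FILES + " files closed");
    }
}
```

With an immediate interrupt the first simulated flush is cut short, so fewer than FILES files are closed. With a grace period longer than the total flush time the task finishes closing every file before the interrupt arrives.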
