Aljoscha is right!

Any contribution is more than welcome.

Kostas

> On Jan 10, 2017, at 3:48 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
> 
> Yes, please go ahead with the fix! :-)
> 
> (If I'm not mistaken, Kostas is working on other stuff right now.)
> 
> On Mon, 9 Jan 2017 at 23:19 Yassine MARZOUGUI <y.marzou...@mindlytix.com> wrote:
> Hi,
> 
> I found the root cause of the problem: the listEligibleFiles method in
> ContinuousFileMonitoringFunction scans only the topmost files and ignores the
> nested files. By fixing that, I was able to get the expected output. I created
> a Jira issue for it: https://issues.apache.org/jira/browse/FLINK-5432
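A toy model of the difference described here, assuming a simplified in-memory directory tree: `Entry` is a hypothetical stand-in for Flink's `FileStatus`, and the eligibility rule (modification time strictly greater than the global one) is an assumption based on the DEBUG lines quoted further down in the thread. This is not the actual FLINK-5432 patch, only a sketch of flat versus recursive listing.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class NestedListingSketch {

    // Hypothetical stand-in for a file system entry (not Flink's FileStatus).
    static final class Entry {
        final String path;
        final boolean isDir;
        final long modTime;
        final List<Entry> children = new ArrayList<>();

        Entry(String path, boolean isDir, long modTime) {
            this.path = path;
            this.isDir = isDir;
            this.modTime = modTime;
        }

        // Adds a child and returns this entry, so trees can be built inline.
        Entry add(Entry child) {
            children.add(child);
            return this;
        }
    }

    // Assumed rule: an entry is eligible only if it is newer than the
    // global modification time that has already been processed.
    static boolean isNewer(Entry e, long globalModTime) {
        return e.modTime > globalModTime;
    }

    // Flat listing: only direct children are inspected, so a sub-directory
    // appears as one entry keyed by its own path and its files are never seen.
    static Map<String, Entry> listTopLevel(Entry dir, long globalModTime) {
        Map<String, Entry> result = new HashMap<>();
        for (Entry e : dir.children) {
            if (isNewer(e, globalModTime)) {
                result.put(e.path, e);
            }
        }
        return result;
    }

    // Recursive listing: descends into sub-directories when nested
    // enumeration is enabled and records regular files only.
    static Map<String, Entry> listRecursive(Entry dir, long globalModTime, boolean nested) {
        Map<String, Entry> result = new HashMap<>();
        for (Entry e : dir.children) {
            if (e.isDir) {
                if (nested) {
                    result.putAll(listRecursive(e, globalModTime, true));
                }
            } else if (isNewer(e, globalModTime)) {
                result.put(e.path, e);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Entry root = new Entry("mydir", true, 0L)
                .add(new Entry("mydir/2016-12-16", true, 200L)
                        .add(new Entry("mydir/2016-12-16/file.txt", false, 200L)));
        System.out.println(listTopLevel(root, 100L).keySet());        // [mydir/2016-12-16]
        System.out.println(listRecursive(root, 100L, true).keySet()); // [mydir/2016-12-16/file.txt]
    }
}
```

In this sketch directories are always descended into rather than filtered by their own modification time; only the files themselves are checked against the global value.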
> 
> @Kostas, if you haven't already started working on a fix for this, I would
> happily contribute one.
> 
> Best,
> Yassine
> 
> 2017-01-09 17:23 GMT+01:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
> Hi Kostas,
> 
> I debugged the code, and the nestedFileEnumeration parameter was always true
> during the execution. However, I noticed that in the following loop in
> ContinuousFileMonitoringFunction, the fileStatus was, for some reason, null
> for files in nested folders and non-null for files directly under the parent
> path, so no splits were forwarded in the case of nested folders.
> 
> // var3 is the array of input splits (name kept from the decompiled code)
> for (FileInputSplit split : var3) {
>     // fileStatus is null for files in nested folders, so their splits are skipped
>     FileStatus fileStatus = eligibleFiles.get(split.getPath());
>     if (fileStatus != null) {
>         Long modTime = fileStatus.getModificationTime();
>         List<TimestampedFileInputSplit> splitsToForward = splitsByModTime.get(modTime);
>         if (splitsToForward == null) {
>             // first split with this modification time: start a new group
>             splitsToForward = new ArrayList<>();
>             splitsByModTime.put(modTime, splitsToForward);
>         }
>         splitsToForward.add(new TimestampedFileInputSplit(
>                 modTime, split.getSplitNumber(), split.getPath(),
>                 split.getStart(), split.getLength(), split.getHostnames()));
>     }
> }
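The effect of that loop can be reproduced with a self-contained sketch, assuming plain `String` paths and `Long` modification times in place of Flink's `Path`, `FileStatus` and `FileInputSplit` (`Split` and `groupByModTime` are hypothetical names). A split whose path has no entry in the eligible-files map is silently dropped, which matches the null `fileStatus` observed for nested files.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class SplitGroupingSketch {

    // Hypothetical stand-in for FileInputSplit: just a path and a split number.
    static final class Split {
        final String path;
        final int splitNumber;

        Split(String path, int splitNumber) {
            this.path = path;
            this.splitNumber = splitNumber;
        }
    }

    // Groups splits by the modification time of their file, in ascending
    // order. Splits whose path is missing from eligibleModTimes are skipped,
    // like splits with a null fileStatus in the quoted loop.
    static TreeMap<Long, List<Split>> groupByModTime(List<Split> splits, Map<String, Long> eligibleModTimes) {
        TreeMap<Long, List<Split>> byModTime = new TreeMap<>();
        for (Split split : splits) {
            Long modTime = eligibleModTimes.get(split.path);
            if (modTime != null) {
                byModTime.computeIfAbsent(modTime, k -> new ArrayList<>()).add(split);
            }
        }
        return byModTime;
    }

    public static void main(String[] args) {
        List<Split> splits = new ArrayList<>();
        splits.add(new Split("mydir/2016-12-16/file.txt", 0)); // nested file
        splits.add(new Split("mydir/other.txt", 1));           // top-level file

        // A flat listing keys the sub-directory by its own path, not by its files.
        Map<String, Long> eligible = new HashMap<>();
        eligible.put("mydir/2016-12-16", 200L);
        eligible.put("mydir/other.txt", 300L);

        System.out.println(groupByModTime(splits, eligible).keySet()); // [300]
    }
}
```

A `TreeMap` is used here so that the groups come out in ascending modification-time order; `computeIfAbsent` replaces the explicit null check for creating a new group.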
> 
> Thanks,
> Yassine
> 
> 
> 2017-01-09 15:04 GMT+01:00 Kostas Kloudas <k.klou...@data-artisans.com>:
> Hi Yassine,
> 
> I suspect that the problem is in the way the input format (and not the
> reader) scans nested files, but could you check whether the
> nestedFileEnumeration parameter is still true in the code that is executed
> by the tasks?
> 
> I am asking in order to pin down whether the problem is in the way we ship
> the code to the tasks or in reading the nested files.
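One way to check the "shipping" hypothesis in isolation is a Java serialization round trip, since Flink ships user functions to the tasks in serialized form. `FormatConfig` below is a hypothetical stand-in, not Flink's `FileInputFormat`; the `transient` field only illustrates the kind of bug such a check would catch.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class ShippingCheckSketch {

    // Hypothetical configuration object standing in for an input format.
    static final class FormatConfig implements Serializable {
        private static final long serialVersionUID = 1L;

        boolean nestedFileEnumeration;         // serialized normally
        transient boolean transientNestedFlag; // not serialized: false after deserialization
    }

    // Serializes and deserializes a value, roughly simulating shipping it to a task.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        FormatConfig config = new FormatConfig();
        config.nestedFileEnumeration = true;
        config.transientNestedFlag = true;

        FormatConfig shipped = roundTrip(config);
        System.out.println(shipped.nestedFileEnumeration); // true
        System.out.println(shipped.transientNestedFlag);   // false
    }
}
```

If the flag survives the round trip, as `nestedFileEnumeration` does here, the problem is more likely in how the nested files are read than in how the code is shipped.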
> 
> Thanks,
> Kostas
> 
>> On Jan 9, 2017, at 12:56 PM, Yassine MARZOUGUI <y.marzou...@mindlytix.com> wrote:
>> 
>> Hi,
>> 
>> Any updates on this issue? Thank you.
>> 
>> Best,
>> Yassine
>> 
>> 
>> On Dec 20, 2016 6:15 PM, "Aljoscha Krettek" <aljos...@apache.org> wrote:
>> +kostas, who probably has the most experience with this by now. Do you have 
>> an idea what might be going on?
>> 
>> On Fri, 16 Dec 2016 at 15:45 Yassine MARZOUGUI <y.marzou...@mindlytix.com> wrote:
>> It looks like this is not specific to continuous file monitoring; I'm
>> having the same issue (files in nested directories are not read) when using:
>> 
>> env.readFile(fileInputFormat, "hdfs:///shared/mydir",
>>                 FileProcessingMode.PROCESS_ONCE, -1L);
>> 
>> 2016-12-16 11:12 GMT+01:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
>> Hi all,
>> 
>> I'm using the following code to continuously process files from a directory 
>> "mydir".
>> 
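>> import org.apache.flink.api.common.io.FileInputFormat;
>> import org.apache.flink.api.java.io.TextInputFormat;
>> import org.apache.flink.core.fs.Path;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
>> 
>> final StreamExecutionEnvironment env =
>>         StreamExecutionEnvironment.getExecutionEnvironment();
>> 
>> FileInputFormat<String> fileInputFormat = new TextInputFormat(new Path("hdfs:///shared/mydir"));
>> // also enumerate files in sub-directories of mydir
>> fileInputFormat.setNestedFileEnumeration(true);
>> 
>> // re-scan the directory every 10000 ms
>> env.readFile(fileInputFormat,
>>                 "hdfs:///shared/mydir",
>>                 FileProcessingMode.PROCESS_CONTINUOUSLY, 10000L)
>>                 .print();
>> 
>> env.execute();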
>> 
>> If I add a directory under mydir, say "2016-12-16", and then add a file
>> "2016-12-16/file.txt", its contents are not printed. If I add the same file
>> directly under "mydir", its contents are printed correctly. After that, the
>> logs show the following:
>> 
>> 10:55:44,928 DEBUG org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction  - Ignoring hdfs://mlxbackoffice/shared/mydir/2016-12-16, with mod time= 1481882041587 and global mod time= 1481882126122
>> 10:55:44,928 DEBUG org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction  - Ignoring hdfs://mlxbackoffice/shared/mydir/file.txt, with mod time= 1481881788704 and global mod time= 1481882126122
>> 
>> It looks like the ContinuousFileMonitoringFunction treated 2016-12-16 as a
>> file it had already read and excluded it, even though its contents were
>> never processed. Any idea why this happens?
>> Thank you.
>> 
>> Best,
>> Yassine
>> 
>> 
> 
> 
> 
