Good day everyone,

I have about 100 thousand files to read, and a custom FilePathFilter with a
simple filterPath method, defined as below (the custom part only checks the
file size, so that files of size 0 are skipped):
        // Returns true for files that should be SKIPPED.
        override def filterPath(filePath: Path): Boolean = {
                filePath == null ||
                        filePath.getName.startsWith(".") ||
                        filePath.getName.startsWith("_") ||
                        filePath.getName.contains(FilePathFilter.HADOOP_COPYING) ||
                        {
                                // Also skip regular files of length 0.
                                // A val (not a def) so getFileStatus runs only once per file.
                                val fileStatus = filePath.getFileSystem.getFileStatus(filePath)
                                !fileStatus.isDir && fileStatus.getLen == 0
                        }
        }
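The filter's logic can be checked outside Flink with a small plain-Scala model. This is only a sketch under stated assumptions: `FileInfo`, `lookup`, and `FilterSketch` are hypothetical stand-ins for Flink's `Path`, `FileStatus`, and `getFileStatus`, and the `_COPYING_` marker is assumed to match the value of `FilePathFilter.HADOOP_COPYING`. It shows that `filterPath` returns `true` for files to skip, and that the (simulated) status lookup only happens for names that pass the cheap name checks.

```scala
// Hypothetical stand-in for Flink's Path/FileStatus so the sketch runs without Flink.
final case class FileInfo(name: String, isDir: Boolean, len: Long)

object FilterSketch {
  // Assumed to mirror FilePathFilter.HADOOP_COPYING.
  val HadoopCopying = "_COPYING_"

  // Returns true when the file should be SKIPPED, like filterPath above.
  // `lookup` simulates the remote getFileStatus call.
  def filterPath(f: FileInfo, lookup: FileInfo => FileInfo): Boolean =
    f == null ||
      f.name.startsWith(".") ||
      f.name.startsWith("_") ||
      f.name.contains(HadoopCopying) || {
        val status = lookup(f) // reached only if all cheap name checks fail
        !status.isDir && status.len == 0
      }

  // Applies the filter and returns the accepted names plus the number of lookups made.
  def run(files: Seq[FileInfo]): (List[String], Int) = {
    var lookups = 0
    val lookup: FileInfo => FileInfo = { f => lookups += 1; f }
    val accepted = files.filterNot(f => filterPath(f, lookup)).map(_.name).toList
    (accepted, lookups)
  }

  // Hypothetical sample listing.
  val sample: List[FileInfo] = List(
    FileInfo("data-1.csv", isDir = false, len = 120),
    FileInfo("empty.csv", isDir = false, len = 0),
    FileInfo(".hidden", isDir = false, len = 10),
    FileInfo("part.csv._COPYING_", isDir = false, len = 10),
    FileInfo("subdir", isDir = true, len = 0)
  )

  def main(args: Array[String]): Unit = {
    val (accepted, lookups) = run(sample)
    println(accepted) // List(data-1.csv, subdir)
    println(lookups)  // 3
  }
}
```

In this model the lookup count grows linearly with the number of files that reach the size check, so with roughly 100 thousand files almost every file costs one metadata call.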

It runs fine when I either disable checkpointing or use the default
FilePathFilter: it takes about 7 minutes to finish processing all files
(from source to sink).
However, when I use both the custom filter and checkpointing, it usually
takes 15-20 minutes for Flink to start reading my files (in the Flink GUI, the
CustomFileSource monitor emits 0 records during that 15-20 minute
period).

Could someone please help with this? 

Thank you very much for your time.
Best regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/