Good day everyone,

I have a Flink job that uses an S3 folder as a source, and we keep putting
thousands of small (around 1KB each) gzip files into that folder, at a rate
of about 5,000 files per minute. Here is how I created that source in
Scala:

    val my_input_format = new TextInputFormat(
        new org.apache.flink.core.fs.Path(my_path))
    my_input_format.setFilesFilter(FilePathFilter.createDefaultFilter())
    my_input_format.setNestedFileEnumeration(true)

    val my_raw_stream = streamEnv
        .readFile(my_input_format,
            my_path,
            FileProcessingMode.PROCESS_CONTINUOUSLY,
            1000)
The problem is, with the monitoring interval of 1,000ms as above, about 20%
of the files are missed. In the Apache Flink Dashboard, the subsequent
operators show only ~80% of the total number of files ("Records sent"
column).

Increasing the monitoring interval reduces the number of missed files: at
5,000ms about 10% are missed, and at 30,000ms only about 2%.

No WARNINGs or ERRORs are logged, though.

I could not reproduce this on HDFS, as I could not reach such a high
file-writing rate in our cluster.
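For reference, a small local generator along these lines can approximate the
small-gzip-file write pattern (the object name, directory path, file names,
and payload here are illustrative only, not what we run in production):

    import java.io.FileOutputStream
    import java.nio.file.{Files, Paths}
    import java.util.zip.GZIPOutputStream

    object SmallGzipWriter {
      // Write `count` roughly-1KB gzip files into `dir`,
      // mimicking the many-small-files ingest pattern.
      def writeFiles(dir: String, count: Int): Unit = {
        Files.createDirectories(Paths.get(dir))
        // ~1KB of uncompressed data per file
        val payload = ("x" * 1024).getBytes("UTF-8")
        for (i <- 0 until count) {
          val out = new GZIPOutputStream(
            new FileOutputStream(s"$dir/part-$i.gz"))
          try out.write(payload) finally out.close()
        }
      }
    }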

Could someone please help? Thank you very much.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
