I have a requirement to read a file continously from a specific path. Means flink job should continously poll the specified location and read a file will arrive at this location at certains intervals .
Example: my location on windows machine is C:/inputfiles get a file file_1.txt at 2:00PM, file_2.txt at 2:30PM, file_3.txt at 3:00PM. To experimented this with below code . import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.common.io.FilePathFilter;import org.apache.flink.api.java.io.TextInputFormat;import org.apache.flink.core.fs.FileSystem;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.source.FileProcessingMode;import org.apache.flink.util.Collector; import java.util.Arrays;import java.util.List; public class ContinuousFileProcessingTest { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(10); String localFsURI = "D:\\FLink\\2021_01_01\\"; TextInputFormat format = new TextInputFormat(new org.apache.flink.core.fs.Path(localFsURI)); format.setFilesFilter(FilePathFilter.createDefaultFilter()); DataStream<String> inputStream = env.readFile(format, localFsURI, FileProcessingMode.PROCESS_CONTINUOUSLY, 100); SingleOutputStreamOperator<String> soso = inputStream.map(String::toUpperCase); soso.print(); soso.writeAsText("D:\\FLink\\completed", FileSystem.WriteMode.OVERWRITE); env.execute("read and write"); } } I brought up flink cluster using flink's 1.9.2 and i was able to achieve my goal of readin file continously at some intervals. Flink's 1.9.2 version can bring up cluster on windows. But now i have to upgrade the flink's version from 1.9.2 to 1.12 .And we used docker to bring cluster up on 1.12 (unlike 1.9.2). Unlike windows path i changed the file location as per docker location but the same above program in not running there. Need help to find the solution. Thanks in advance. Thanks & Regards, Samir Vasani