I have a requirement to read files continuously from a specific path.

That is, the Flink job should continuously poll the specified location and read
each file that arrives there at certain intervals.

Example: my location on a Windows machine is C:/inputfiles, and files arrive
there over time: file_1.txt at 2:00PM, file_2.txt at 2:30PM, file_3.txt at 3:00PM.

I experimented with this using the code below.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.util.Collector;

import java.util.Arrays;
import java.util.List;

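public class ContinuousFileProcessingTest {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Take a checkpoint every 10 ms (a very short interval, fine for a test).
        env.enableCheckpointing(10);

        // Directory to monitor (a Windows path; for Docker it was changed to a container path).
        String localFsURI = "D:\\FLink\\2021_01_01\\";
        TextInputFormat format =
                new TextInputFormat(new org.apache.flink.core.fs.Path(localFsURI));
        // The default filter skips files whose names start with "." or "_".
        format.setFilesFilter(FilePathFilter.createDefaultFilter());

        // PROCESS_CONTINUOUSLY: re-scan the directory every 100 ms and read new files.
        DataStream<String> inputStream =
                env.readFile(format, localFsURI, FileProcessingMode.PROCESS_CONTINUOUSLY, 100);

        // Upper-case every line, print it, and also write it out as text.
        SingleOutputStreamOperator<String> soso = inputStream.map(String::toUpperCase);
        soso.print();
        soso.writeAsText("D:\\FLink\\completed", FileSystem.WriteMode.OVERWRITE);

        env.execute("read and write");
    }
}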
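In `env.readFile(format, localFsURI, FileProcessingMode.PROCESS_CONTINUOUSLY, 100)` the last argument is the re-scan interval in milliseconds. As I understand Flink's file monitoring source, each scan picks up files whose modification time is newer than the newest one already processed (which is also why a file that gets modified is re-read in full). The plain-Java sketch below imitates only that selection rule so it can be tried without a cluster; `DirectoryPoller` is a hypothetical name and this is not Flink's implementation.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical illustration, not Flink code: remembers the newest modification
// time it has seen and, on each poll, returns only regular files modified after it.
public class DirectoryPoller {

    private final Path dir;
    private long maxSeenModTime = Long.MIN_VALUE;

    public DirectoryPoller(Path dir) {
        this.dir = dir;
    }

    /** One scan of the directory; returns newly eligible files sorted by path. */
    public List<Path> poll() throws IOException {
        List<Path> entries;
        try (Stream<Path> listing = Files.list(dir)) {
            entries = listing.collect(Collectors.toList());
        }
        List<Path> fresh = new ArrayList<>();
        long newMax = maxSeenModTime;
        for (Path p : entries) {
            if (!Files.isRegularFile(p)) {
                continue; // skip sub-directories and other non-files
            }
            long modTime = Files.getLastModifiedTime(p).toMillis();
            if (modTime > maxSeenModTime) {
                fresh.add(p);
                newMax = Math.max(newMax, modTime);
            }
        }
        maxSeenModTime = newMax; // files at or before this time are ignored next scan
        fresh.sort(null);        // Path is Comparable, so this sorts by path
        return fresh;
    }
}
```

A caller could invoke `poll()` on a timer (for example every 100 ms, matching the interval in the job) and read each returned file once, which is roughly the behaviour expected from file_1.txt, file_2.txt and file_3.txt arriving at different times.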



I brought up a Flink 1.9.2 cluster and was able to achieve my goal of reading
files continuously as they arrived at intervals.

Flink 1.9.2 can bring up a cluster directly on Windows.

But now I have to upgrade Flink from 1.9.2 to 1.12, and (unlike with 1.9.2)
we use Docker to bring up the 1.12 cluster.

I changed the file location from the Windows path to the corresponding location
in Docker, but the same program is not working there.
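One thing worth checking, though the description alone can't confirm it is the cause: a local path passed to `readFile` is resolved on the machines that run the job, so with Docker the directory has to exist inside the containers (typically through a mounted volume), not only on the host. The plain-Java sketch below assumes the input directory is passed as the first program argument instead of being hard-coded; `InputDirCheck`, `resolve`, and a container path such as `/opt/flink/input` are hypothetical, not Flink APIs or defaults. It fails fast if the directory is not visible and returns a `file://` URI string that could then be given to `TextInputFormat` and `env.readFile`.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper (not a Flink API): picks the input directory from the
// first program argument, falling back to a default, and fails fast if the
// directory is not visible from the process that runs main().
public final class InputDirCheck {

    private InputDirCheck() {}

    public static String resolve(String[] args, String fallback) {
        String raw = (args != null && args.length > 0) ? args[0] : fallback;
        Path dir = Paths.get(raw).toAbsolutePath().normalize();
        if (!Files.isDirectory(dir) || !Files.isReadable(dir)) {
            throw new IllegalArgumentException(
                    "Input directory is not a readable directory here: " + dir);
        }
        // For an existing directory this yields something like "file:///some/dir/".
        return dir.toUri().toString();
    }

    public static void main(String[] args) {
        // Usage: java InputDirCheck /opt/flink/input   (path is only an example)
        System.out.println(resolve(args, System.getProperty("java.io.tmpdir")));
    }
}
```

Note that this check only runs where `main()` executes (the client or JobManager container); the TaskManager containers that actually read the files need the same directory mounted at the same path.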

I need help finding a solution.

Thanks in advance.


Thanks & Regards,
Samir Vasani
