Hi all,

    I came across this change
<https://issues.apache.org/jira/browse/FLINK-3655> that allows users to
specify multiple file paths to read from in Flink. However, I have a question
about how to use this feature with StreamExecutionEnvironment.readFile(). It
seems that in readFile, the input format's filePaths actually get overwritten here
<https://github.com/apache/flink/blob/6c7b195d57c3bad5bc1f2251de75ac744dbbe4a7/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L1322>.
So no matter what FileInputFormat I pass into it, its filePaths will just
get reset to the single path given to readFile. Am I missing something
here?
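
To make the concern concrete, here is a toy model of the behavior I think I am seeing. This is only a sketch: ToyInputFormat and ToyEnvironment are made-up names, not Flink classes, and the assumption that readFile sets the single path on the format is my reading of the linked line, not something I have confirmed.

```java
import java.util.Arrays;

// Toy stand-in for a file input format. NOT Flink's actual class.
final class ToyInputFormat {
    private String[] filePaths = new String[0];

    // Configure several input paths at once.
    void setFilePaths(String... paths) {
        this.filePaths = paths.clone();
    }

    // Single-path setter: replaces whatever list was configured before.
    void setFilePath(String path) {
        this.filePaths = new String[] {path};
    }

    String[] getFilePaths() {
        return filePaths.clone();
    }
}

// Toy stand-in for the environment. NOT Flink's actual class.
final class ToyEnvironment {
    // Assumed behavior: readFile sets its own path argument on the format.
    static ToyInputFormat readFile(ToyInputFormat format, String path) {
        format.setFilePath(path);
        return format;
    }

    public static void main(String[] args) {
        ToyInputFormat format = new ToyInputFormat();
        format.setFilePaths("s3://bucket/a", "s3://bucket/b");
        readFile(format, "some path");
        // The two configured paths are gone; prints [some path]
        System.out.println(Arrays.toString(format.getFilePaths()));
    }
}
```

If the real readFile works like this model, the multiple paths set on the format beforehand would never be read.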

Here is the sample code I have:

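import java.util.List;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;

// Read from S3 object to get the list of S3 paths.
final List<String> directoryList =
    getDirectoryList(
        someClient.getS3ObjectContentAsString(commonBucket, directory.getKey()));

TextInputFormat inputFormat = new TextInputFormat(new Path(inputBucketProperty));
// toArray(new String[0]) yields a String[]; casting the Object[] returned by
// the no-argument toArray() would throw a ClassCastException at runtime.
inputFormat.setFilePaths(directoryList.toArray(new String[0]));
inputFormat.setNestedFileEnumeration(true);

// readFile() takes a single path, which seems to replace the paths set above.
streamEnv
    .readFile(inputFormat, "some path")
    .addSink(createSink());

streamEnv.execute(getClass().getSimpleName());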
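
A side note on turning the List<String> of paths into the String[] that setFilePaths expects. The sketch below is standalone Java with no Flink dependency. PathArrays and its method names are hypothetical helpers for illustration only, and the cast failure shown assumes the list is a java.util.ArrayList.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, for illustration only.
final class PathArrays {
    // Safe conversion: the typed overload returns a real String[].
    static String[] toStringArray(List<String> paths) {
        return paths.toArray(new String[0]);
    }

    // Reports whether casting the result of the no-arg toArray() fails.
    static boolean plainCastFails(List<String> paths) {
        try {
            String[] ignored = (String[]) paths.toArray();
            return false;
        } catch (ClassCastException e) {
            // ArrayList.toArray() returns an Object[], which is not a String[].
            return true;
        }
    }

    public static void main(String[] args) {
        List<String> paths = new ArrayList<>();
        paths.add("s3://bucket/a");
        paths.add("s3://bucket/b");
        System.out.println(toStringArray(paths).length); // prints 2
        System.out.println(plainCastFails(paths));       // prints true
    }
}
```

So for an ArrayList the typed toArray(new String[0]) form is the one that works; the cast compiles but fails when it runs.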

This is going to run on Kinesis Data Analytics, if that makes any
difference.

Thanks for the help, if any :)
-Jason
