Hi Dominique,

In Flink 1.1 we've reworked the reading of static files in the DataStream
API. There is now a method that accepts any FileInputFormat:
readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo).
I guess you can pass a FileInputFormat with recursive enumeration
enabled there.
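
To illustrate, here is a minimal, untested sketch of that idea: create a
TextInputFormat (which extends FileInputFormat), enable nested file
enumeration on it (the streaming-side equivalent of the batch
"recursive.file.enumeration" option), and hand it to readFile with the
signature above. The HDFS path and job name are placeholders, not taken
from your setup:

```java
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FilePathFilter;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class RecursiveReadSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder base directory; substitute your own HDFS path.
        String basePath = "hdfs:///user/flink/data/";

        // TextInputFormat is a FileInputFormat; enabling nested file
        // enumeration makes it descend into subfolders.
        TextInputFormat format = new TextInputFormat(new Path(basePath));
        format.setNestedFileEnumeration(true);

        // Read the directory tree once (like readTextFile); the interval
        // argument only matters for PROCESS_CONTINUOUSLY.
        DataStream<String> lines = env.readFile(
                format,
                basePath,
                FileProcessingMode.PROCESS_ONCE,
                -1L,
                FilePathFilter.createDefaultFilter(),
                BasicTypeInfo.STRING_TYPE_INFO);

        lines.print();
        env.execute("recursive-read-sketch");
    }
}
```

From there the stream can be sent to Kafka with the same
FlinkKafkaProducer08 sink as in the original snippet.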


Regards,
Robert


On Tue, Jul 12, 2016 at 6:30 PM, Dominique Rondé <
dominique.ro...@allsecur.de> wrote:

> Hi folks,
>
> at first glance my problem looks very simple. I'd like to get datasets out
> of some text files in HDFS and send them to a Kafka topic. I use the
> following code to do that:
>
> DataStream<String> hdfsDatasource = env.readTextFile("hdfs://"
>         + parameterTool.getRequired("hdfs_env") + "/user/flink/"
>         + parameterTool.getRequired("hdfs_path") + "/");
> hdfsDatasource.addSink(new FlinkKafkaProducer08<String>(
>         parameterTool.getRequired("brokerlist"),
>         parameterTool.getRequired("topic"),
>         new SimpleStringSchema()));
>
> Everything works fine. But I need a way to go through the source folder
> recursively and find text files in subfolders. For my batch routines this
> works fine with "recursive.file.enumeration", but in the streaming
> environment there is no way to pass this configuration to the
> readTextFile method.
>
> Can someone give me a hint?
>
> Cheers
>
> Dominique
>
>
