Thank you, @Chesnay. Let me try this change.

On Thu, Oct 1, 2020 at 1:21 PM Chesnay Schepler <ches...@apache.org> wrote:

> You could also try using streams to make it a little more concise:
>
> directories.stream()
>    .map(directory -> createInputStream(environment, directory))
>    .reduce(DataStream::union)
>    .map(joinedStream -> joinedStream.addSink(kafka));
>
>
> On 10/1/2020 9:48 AM, Chesnay Schepler wrote:
>
> Do you know the list of directories when you submit the job?
>
> If so, then you can iterate over them, create a source for each directory,
> union them, and apply the sink to the union.
>
> private static DataStream<String> createInputStream(
>       StreamExecutionEnvironment environment, String directory) {
>    TextInputFormat format =
>          new TextInputFormat(new org.apache.flink.core.fs.Path(directory));
>    format.setNestedFileEnumeration(true);
>    return environment.readFile(
>          format,
>          directory,
>          FileProcessingMode.PROCESS_ONCE,
>          -1,
>          FilePathFilter.createDefaultFilter());
> }
>
> public static void runJob() throws Exception {
>    StreamExecutionEnvironment environment =
>          StreamExecutionEnvironment.getExecutionEnvironment();
>    List<String> directories = getDirectories();
>    DataStream<String> joinedStreams = null;
>    for (String directory : directories) {
>       DataStream<String> inputStream = createInputStream(environment, directory);
>       if (joinedStreams == null) {
>          joinedStreams = inputStream;
>       } else {
>          // union() returns a new stream, so keep the result
>          joinedStreams = joinedStreams.union(inputStream);
>       }
>    }
>    // add a sanity check that there was at least 1 directory
>
>    joinedStreams.addSink(kafka);
> }
>
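One detail about the loop above: DataStream#union returns a new stream and does not modify the stream it is called on, so its result has to be assigned back. Below is a minimal sketch of the same fold, including the "at least 1 directory" check, in plain Java. `FakeStream` and `unionAll` are hypothetical stand-ins so the snippet runs without Flink on the classpath; they are not part of any Flink API, and the second path is made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class UnionSketch {

    /** Hypothetical immutable stand-in for DataStream<String>; not a Flink class. */
    static final class FakeStream {
        final List<String> directories;

        FakeStream(List<String> directories) {
            this.directories = List.copyOf(directories);
        }

        /** Returns a NEW stream covering both inputs; the receiver is left unchanged. */
        FakeStream union(FakeStream other) {
            List<String> merged = new ArrayList<>(directories);
            merged.addAll(other.directories);
            return new FakeStream(merged);
        }
    }

    /** Folds one stream per directory into a single stream. */
    static FakeStream unionAll(List<String> directories) {
        // Sanity check: without at least one directory there is nothing to attach a sink to.
        if (directories.isEmpty()) {
            throw new IllegalArgumentException("expected at least one directory");
        }
        FakeStream joined = null;
        for (String directory : directories) {
            FakeStream input = new FakeStream(List.of(directory));
            // Keep the return value: calling union() without assigning it would drop 'input'.
            joined = (joined == null) ? input : joined.union(input);
        }
        return joined;
    }

    public static void main(String[] args) {
        FakeStream joined = unionAll(
                List.of("s3a://directory/2020-09-03/", "s3a://directory/2020-09-04/"));
        System.out.println(joined.directories);
    }
}
```

In the real job the same shape would apply with DataStream<String> in place of FakeStream and createInputStream(environment, directory) in place of the constructor call.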
>
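getDirectories() is not defined in the code above. Here is a minimal sketch of one way to build the list at submission time. It assumes one directory per day under a common prefix, following the s3a://directory/2020-09-03/ layout from the original question; that assumption is mine and is not stated anywhere in this thread. The class name, the parameters, and the date range are hypothetical.

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;

final class DirectoryListSketch {

    /**
     * Hypothetical helper: one path per day from 'first' to 'last' (both inclusive),
     * formatted as <prefix><yyyy-MM-dd>/.
     */
    static List<String> getDirectories(String prefix, LocalDate first, LocalDate last) {
        List<String> directories = new ArrayList<>();
        for (LocalDate day = first; !day.isAfter(last); day = day.plusDays(1)) {
            // LocalDate.toString() yields ISO-8601 (yyyy-MM-dd), matching the path in the question.
            directories.add(prefix + day + "/");
        }
        return directories;
    }

    public static void main(String[] args) {
        List<String> directories = getDirectories(
                "s3a://directory/", LocalDate.of(2020, 9, 3), LocalDate.of(2020, 9, 5));
        directories.forEach(System.out::println);
    }
}
```

If the paths come from somewhere else (a config file, program arguments), only the body of this helper would change; the union loop stays the same.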
>
> On 10/1/2020 9:08 AM, Satyaa Dixit wrote:
>
> Hi Guys,
>
> I am stuck on this. Please help me here.
> Regards,
> Satya
>
> On Wed, 30 Sep 2020 at 11:09 AM, Satyaa Dixit <satyaadi...@gmail.com> wrote:
>
> Hi Guys,
>
> Sorry to bother you again, but could someone help me here? Any help in
> this regard would be much appreciated.
>
> Regards,
> Satya
>
> On Tue, Sep 29, 2020 at 2:57 PM Satyaa Dixit <satyaadi...@gmail.com> wrote:
>
> Hi Guys,
>
> I need some help; any leads would be highly appreciated. I have written a
> Flink streaming job that reads data from an S3 bucket and pushes it to Kafka.
> Below is the working source, which deals with a single S3 path:
>
> TextInputFormat format = new TextInputFormat(
>     new org.apache.flink.core.fs.Path("s3a://directory/2020-09-03/"));
> format.setNestedFileEnumeration(true);
>
> DataStream<String> inputStream = environment.readFile(
>     format,
>     "s3a://directory/2020-09-03/",
>     FileProcessingMode.PROCESS_ONCE,
>     -1,
>     FilePathFilter.createDefaultFilter());
>
> inputStream.addSink(kafka);
>
> But my requirement is to get a list of paths and pass them one by one to
> this environment.readFile() method. How can we achieve this?
>
> Thanks,
> Satya
>
> --
> --------------------------
> Best Regards
> Satya Prakash
> (M)+91-9845111913
>

-- 
--------------------------
Best Regards
Satya Prakash
(M)+91-9845111913
