Thank you @Chesnay, let me try this change.
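For reference, the streams variant assembles into something like this (a sketch only, assuming Java 8+ and the createInputStream helper quoted below; note that reduce yields an Optional here, so ifPresent attaches the sink only when the directory list is non-empty):

    // Sketch: one file source per directory, unioned, then sunk to Kafka.
    // Assumes directories, environment, kafka, and createInputStream(...)
    // exist exactly as in the snippets quoted below.
    directories.stream()
            .map(directory -> createInputStream(environment, directory))
            .reduce(DataStream::union)                   // Optional<DataStream<String>>
            .ifPresent(joined -> joined.addSink(kafka)); // no-op if the list was empty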
On Thu, Oct 1, 2020 at 1:21 PM Chesnay Schepler <ches...@apache.org> wrote:

> You could also try using streams to make it a little more concise:
>
>     directories.stream()
>             .map(directory -> createInputStream(environment, directory))
>             .reduce(DataStream::union)
>             .map(joinedStream -> joinedStream.addSink(kafka));
>
> On 10/1/2020 9:48 AM, Chesnay Schepler wrote:
>
>> Do you know the list of directories when you submit the job? If so, then
>> you can iterate over them, create a source for each directory, union them,
>> and apply the sink to the union.
>>
>>     private static DataStream<String> createInputStream(
>>             StreamExecutionEnvironment environment, String directory) {
>>         TextInputFormat format = new TextInputFormat(
>>                 new org.apache.flink.core.fs.Path(directory));
>>         format.setNestedFileEnumeration(true);
>>         return environment.readFile(format, directory,
>>                 FileProcessingMode.PROCESS_ONCE, -1,
>>                 FilePathFilter.createDefaultFilter());
>>     }
>>
>>     public static void runJob() throws Exception {
>>         StreamExecutionEnvironment environment =
>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>         List<String> directories = getDirectories();
>>         DataStream<String> joinedStreams = null;
>>         for (String directory : directories) {
>>             DataStream<String> inputStream =
>>                     createInputStream(environment, directory);
>>             if (joinedStreams == null) {
>>                 joinedStreams = inputStream;
>>             } else {
>>                 // union returns a new stream; reassign it, or the input is lost
>>                 joinedStreams = joinedStreams.union(inputStream);
>>             }
>>         }
>>         // add a sanity check that there was at least 1 directory
>>         joinedStreams.addSink(kafka);
>>     }
>>
>> On 10/1/2020 9:08 AM, Satyaa Dixit wrote:
>>
>>> Hi Guys,
>>>
>>> Got stuck with it; please help me here.
>>>
>>> Regards,
>>> Satya
>>>
>>> On Wed, 30 Sep 2020 at 11:09 AM, Satyaa Dixit <satyaadi...@gmail.com> wrote:
>>>
>>>> Hi Guys,
>>>>
>>>> Sorry to bother you again, but could someone help me here? Any help in
>>>> this regard will be much appreciated.
>>>>
>>>> Regards,
>>>> Satya
>>>>
>>>> On Tue, Sep 29, 2020 at 2:57 PM Satyaa Dixit <satyaadi...@gmail.com> wrote:
>>>>
>>>>> Hi Guys,
>>>>>
>>>>> I need one help; any leads will be highly appreciated. I have written a
>>>>> Flink streaming job to read data from an S3 bucket and push it to Kafka.
>>>>> Below is the working source that deals with a single S3 path:
>>>>>
>>>>>     TextInputFormat format = new TextInputFormat(
>>>>>             new org.apache.flink.core.fs.Path("s3a://directory/2020-09-03/"));
>>>>>     format.setNestedFileEnumeration(true);
>>>>>
>>>>>     DataStream<String> inputStream = environment.readFile(format,
>>>>>             "s3a://directory/2020-09-03/", FileProcessingMode.PROCESS_ONCE, -1,
>>>>>             FilePathFilter.createDefaultFilter());
>>>>>
>>>>>     inputStream.addSink(kafka);
>>>>>
>>>>> But my requirement is to get a list of paths and pass them one by one to
>>>>> this environment.readFile() method. How can we achieve this?
>>>>>
>>>>> Thanks,
>>>>> Satya
>>>>>
>>>>> --
>>>>> --------------------------
>>>>> Best Regards
>>>>> Satya Prakash
>>>>> (M)+91-9845111913
>>>>
>>>> --
>>>> --------------------------
>>>> Best Regards
>>>> Satya Prakash
>>>> (M)+91-9845111913

--
--------------------------
Best Regards
Satya Prakash
(M)+91-9845111913
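For readers finding this thread later, the pieces above combine into a complete job along these lines. This is a sketch only: it assumes a Flink 1.x DataStream setup with the flink-connector-kafka dependency, and getDirectories(), the topic name, and the broker address are placeholders to adapt.

    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;

    import org.apache.flink.api.common.io.FilePathFilter;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.api.java.io.TextInputFormat;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

    public class S3ToKafkaJob {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment environment =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            List<String> directories = getDirectories();
            // Sanity check, as suggested above: fail fast on an empty list.
            if (directories.isEmpty()) {
                throw new IllegalStateException("No input directories configured");
            }

            // One file source per directory, unioned into a single stream.
            DataStream<String> joined = null;
            for (String directory : directories) {
                DataStream<String> input = createInputStream(environment, directory);
                joined = (joined == null) ? input : joined.union(input);
            }

            Properties kafkaProps = new Properties();
            kafkaProps.setProperty("bootstrap.servers", "localhost:9092"); // placeholder

            // "output-topic" is a placeholder topic name.
            joined.addSink(new FlinkKafkaProducer<>(
                    "output-topic", new SimpleStringSchema(), kafkaProps));

            environment.execute("s3-to-kafka");
        }

        private static DataStream<String> createInputStream(
                StreamExecutionEnvironment environment, String directory) {
            TextInputFormat format = new TextInputFormat(new Path(directory));
            format.setNestedFileEnumeration(true);
            // PROCESS_ONCE reads the current contents once; interval -1 means no re-scan.
            return environment.readFile(format, directory,
                    FileProcessingMode.PROCESS_ONCE, -1,
                    FilePathFilter.createDefaultFilter());
        }

        // Placeholder: in a real job this list would come from config or arguments.
        private static List<String> getDirectories() {
            return Arrays.asList(
                    "s3a://directory/2020-09-03/",
                    "s3a://directory/2020-09-04/");
        }
    }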