Do you know the list of directories when you submit the job?

If so, then you can iterate over them, create a source for each directory, union them, and apply the sink to the union.

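import java.util.List;
import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

// Creates one PROCESS_ONCE file source for the given directory.
private static DataStream<String> createInputStream(
      StreamExecutionEnvironment environment, String directory) {
   TextInputFormat format =
         new TextInputFormat(new org.apache.flink.core.fs.Path(directory));
   format.setNestedFileEnumeration(true);
   return environment.readFile(format, directory,
         FileProcessingMode.PROCESS_ONCE, -1,
         FilePathFilter.createDefaultFilter());
}

public static void runJob() throws Exception {
   StreamExecutionEnvironment environment =
         StreamExecutionEnvironment.getExecutionEnvironment();
   // getDirectories() and kafka are yours: the path list and your Kafka sink.
   List<String> directories = getDirectories();
   DataStream<String> joinedStreams = null;
   for (String directory : directories) {
      DataStream<String> inputStream = createInputStream(environment, directory);
      if (joinedStreams == null) {
         joinedStreams = inputStream;
      } else {
         // union() returns a new stream, so the result must be reassigned.
         joinedStreams = joinedStreams.union(inputStream);
      }
   }
   // Sanity check: there must be at least 1 directory.
   if (joinedStreams == null) {
      throw new IllegalStateException("No input directories were provided.");
   }

   joinedStreams.addSink(kafka);
}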
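
The loop above can also be written as a small fold, which makes two points explicit: the result of each union has to be kept (union returns a new stream rather than modifying the one it is called on), and an empty list has to be rejected. Below is a minimal sketch of that idea. The class UnionFold and the helper unionAll are hypothetical names of mine, not part of Flink, and plain strings stand in for streams so that the snippet runs without any Flink dependency.

```java
import java.util.List;
import java.util.function.BinaryOperator;

final class UnionFold {

    // Hypothetical helper (not a Flink API): folds a non-empty list into a
    // single value by repeatedly applying `union` and keeping what it returns.
    static <T> T unionAll(List<T> inputs, BinaryOperator<T> union) {
        if (inputs.isEmpty()) {
            // Sanity check: there must be at least one input.
            throw new IllegalArgumentException("at least one input is required");
        }
        T joined = inputs.get(0);
        for (T next : inputs.subList(1, inputs.size())) {
            // Reassign: the combining function returns a new value and
            // leaves its receiver unchanged.
            joined = union.apply(joined, next);
        }
        return joined;
    }

    public static void main(String[] args) {
        // Strings stand in for streams; String.concat also returns a new object.
        System.out.println(unionAll(List.of("a", "b", "c"), String::concat)); // prints "abc"
    }
}
```

In the job itself you would presumably collect the per-directory streams into a List<DataStream<String>> and call unionAll(streams, (a, b) -> a.union(b)) before adding the sink.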



On 10/1/2020 9:08 AM, Satyaa Dixit wrote:
Hi Guys,

I am stuck on this; please help me here.

Regards,
Satya

On Wed, 30 Sep 2020 at 11:09 AM, Satyaa Dixit <satyaadi...@gmail.com> wrote:

Hi Guys,

Sorry to bother you again, but could someone help me here? Any help in
this regard will be much appreciated.

Regards,
Satya

On Tue, Sep 29, 2020 at 2:57 PM Satyaa Dixit <satyaadi...@gmail.com>
wrote:

Hi Guys,

I need some help; any leads will be highly appreciated. I have written a
Flink streaming job to read data from an S3 bucket and push it to Kafka.
Below is the working source that deals with a single S3 path:

TextInputFormat format = new TextInputFormat(new
org.apache.flink.core.fs.Path("s3a://directory/2020-09-03/"));
format.setNestedFileEnumeration(true);
DataStream<String> inputStream = environment.readFile(format,
"s3a://directory/2020-09-03/", FileProcessingMode.PROCESS_ONCE, -1,
FilePathFilter.createDefaultFilter());
inputStream.addSink(kafka);

But my requirement is to get a list of paths and pass them one by one to
the environment.readFile() method. How can we achieve this?

Thanks,
Satya



--
--------------------------
Best Regards
Satya Prakash
(M)+91-9845111913

