Need help in creating Flink Streaming s3 Job for multiple path reader one by one

2020-09-29 Thread Satyaa Dixit
Hi Guys, I need some help; any leads will be highly appreciated. I have written a Flink streaming job to read data from an S3 bucket and push it to Kafka. Below is the working source that deals with a single S3 path: TextInputFormat format = new TextInputFormat(new org.apache.flink.core.fs.Path("s3a://dir
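For reference, a minimal self-contained sketch of such a single-path job; the bucket, topic, broker address, and the FlinkKafkaProducer sink are placeholders and assumptions, since the original message only shows the TextInputFormat source:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class SingleS3PathToKafka {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read every line of every file under one S3 prefix.
        TextInputFormat format = new TextInputFormat(new Path("s3a://my-bucket/dir1"));
        DataStream<String> lines = env.readFile(format, "s3a://my-bucket/dir1");

        // Forward each line unchanged to a Kafka topic.
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");
        lines.addSink(new FlinkKafkaProducer<>("output-topic", new SimpleStringSchema(), props));

        env.execute("s3-to-kafka");
    }
}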

Re: Need help in creating Flink Streaming s3 Job for multiple path reader one by one

2020-09-29 Thread Satyaa Dixit
Hi Guys, Sorry to bother you again, but could someone help me here? Any help in this regard will be much appreciated. Regards, Satya On Tue, Sep 29, 2020 at 2:57 PM Satyaa Dixit wrote: > Hi Guys, > I need some help; any leads will be highly appreciated. I have written a > flink streaming job to

Re: Need help in creating Flink Streaming s3 Job for multiple path reader one by one

2020-10-01 Thread Satyaa Dixit
Hi Guys, I am stuck with this; please help me here. Regards, Satya On Wed, 30 Sep 2020 at 11:09 AM, Satyaa Dixit wrote: > Hi Guys, > > Sorry to bother you again, but could someone help me here? Any help in > this regard will be much appreciated. > > Regards, > Satya > > On Tue, Sep 29, 2020 at 2:57

Re: Need help in creating Flink Streaming s3 Job for multiple path reader one by one

2020-10-01 Thread Chesnay Schepler
Do you know the list of directories when you submit the job? If so, then you can iterate over them, create a source for each directory, union them, and apply the sink to the union. private static DataStream<String> createInputStream(StreamExecutionEnvironment environment, String directory) { TextIn
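A sketch of that approach, assuming the sources read plain text lines and a generic SinkFunction stands in for the Kafka producer; the class and method names here are placeholders, not from the original reply:

import java.util.List;

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class MultiDirectoryJob {

    // One text source per S3 directory.
    private static DataStream<String> createInputStream(StreamExecutionEnvironment environment, String directory) {
        TextInputFormat format = new TextInputFormat(new org.apache.flink.core.fs.Path(directory));
        return environment.readFile(format, directory);
    }

    // Union all per-directory sources and attach a single sink to the result.
    public static void buildPipeline(StreamExecutionEnvironment environment,
                                     List<String> directories,
                                     SinkFunction<String> kafkaSink) {
        DataStream<String> joined = null;
        for (String directory : directories) {
            DataStream<String> source = createInputStream(environment, directory);
            joined = (joined == null) ? source : joined.union(source);
        }
        if (joined != null) {
            joined.addSink(kafkaSink);
        }
    }
}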

Re: Need help in creating Flink Streaming s3 Job for multiple path reader one by one

2020-10-01 Thread Chesnay Schepler
You could also try using streams to make it a little more concise: directories.stream() .map(directory -> createInputStream(environment, directory)) .reduce(DataStream::union) .map(joinedStream -> joinedStream.addSink(kafka)); On 10/1/2020 9:48 AM, Chesnay Schepler wrote: Do you know t
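Because Stream#reduce returns an Optional, a variant that terminates the chain with ifPresent is a touch safer when the directory list could be empty; createInputStream and kafkaSink are the same placeholders as in the earlier sketch:

directories.stream()
    .map(directory -> createInputStream(environment, directory))   // java.util.stream.Stream<DataStream<String>>
    .reduce(DataStream::union)                                      // Optional<DataStream<String>>
    .ifPresent(joined -> joined.addSink(kafkaSink));                // attach the sink once, to the union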

Re: Need help in creating Flink Streaming s3 Job for multiple path reader one by one

2020-10-01 Thread Satyaa Dixit
Thank you @Chesnay, let me try this change. On Thu, Oct 1, 2020 at 1:21 PM Chesnay Schepler wrote: > You could also try using streams to make it a little more concise: > > directories.stream() > .map(directory -> createInputStream(environment, directory)) > .reduce(DataStream::union) >

Re: Need help in creating Flink Streaming s3 Job for multiple path reader one by one

2020-10-04 Thread Satyaa Dixit
Hi @ches...@apache.org, Thanks for your support, it was really helpful. Do you know the list of directories when you submit the job? [Yes, we do.] The implementation is in progress and we will get back to you if we face any further challenges. Appreciate your support in this regard. Regards, Sa

Re: Need help in creating Flink Streaming s3 Job for multiple path reader one by one

2020-10-09 Thread Satyaa Dixit
Hi Chesnay/Team, Thank you so much for the reply. In continuation of the previous email, below is the block diagram where I am reading the files from S3 and pushing them to Kafka. Now with the current setup, I have a total of 4 directories; based on the readFile method from the Flink environment, we are cr

Re: Need help in creating Flink Streaming s3 Job for multiple path reader one by one

2020-10-11 Thread Satyaa Dixit
Hi Team, Could you please help me here? I'm sorry for asking on such short notice, but my work has stopped due to this. Regards, Satya On Fri, 9 Oct 2020 at 8:53 PM, Satyaa Dixit wrote: > Hi Chesnay/Team, > > Thank you so much for the reply. In continuation of the previous email, > below i

Re: Need help in creating Flink Streaming s3 Job for multiple path reader one by one

2020-10-12 Thread Chesnay Schepler
1) There's no mechanism in the API to restrict the number of readers across several sources. I can't quite think of a way to achieve this; maybe Kostas has an idea. 2) You're mixing up the Java Streams and Flink's DataStream API. Try this: s3PathList.stream() .map(...) .reduce(...)
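The distinction is that stream(), map(...) and reduce(...) here are java.util.stream operations on the list of paths; a Flink DataStream only exists once reduce has folded the per-directory sources together. One way the corrected pipeline might look, reusing the thread's own S3Service.customCreateInputStream helper (its exact signature is not shown in the archive, so it is assumed to return DataStream<String>):

s3PathList.stream()                                                                      // Java Stream of path strings
    .map(path -> S3Service.customCreateInputStream(environment, path, readerParallelism))
    .reduce(DataStream::union)                                                           // fold into one Flink DataStream
    .ifPresent(joined -> joined.addSink(kafkaSink));                                     // Flink API only from here on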

Re: Need help in creating Flink Streaming s3 Job for multiple path reader one by one

2020-10-12 Thread Satyaa Dixit
Thanks, I'll check it out. On Mon, Oct 12, 2020 at 1:29 PM Chesnay Schepler wrote: > 1) There's no mechanism in the API to restrict the number of > readers across several sources. I can't quite think of a way to achieve > this; maybe Kostas has an idea. > > 2) You're mixing up the Ja

Re: Need help in creating Flink Streaming s3 Job for multiple path reader one by one

2020-10-14 Thread Satyaa Dixit
Hi Chesnay/Team, Thank you so much. I have tried the solution, but it is not working as expected; it shows compilation issues and I have tried all the ways. Please find the code snippet below: s3PathList.stream() .map(directory -> S3Service.customCreateInputStream(environment, directory, readerParallelism)

Re: Need help in creating Flink Streaming s3 Job for multiple path reader one by one

2020-10-15 Thread Satyaa Dixit
Hi Chesnay/Team, Thanks, we got the fix for our problem but are stuck with the issue below; requesting your support. How do we catch a FileNotFoundException at runtime, if any directory is missing in S3, as part of the source code below, to avoid job failure? s3PathList.stream().map(directory -> S3Se

Re: Need help in creating Flink Streaming s3 Job for multiple path reader one by one

2020-10-15 Thread Chesnay Schepler
hmm... I don't see an easy way. You may have to replicate StreamExecutionEnvironment#createFileInput and create a custom ContinuousFileMonitoringFunction that ignores missing files in its run() method. Alternatively, use some library to check the existence of the S3 directories before creatin
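A sketch of that second option, checking each path with Flink's own FileSystem abstraction before any source is built; the class name is a placeholder, and it assumes the S3 filesystem plugin is on the classpath so that s3a:// paths resolve at submission time:

import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class ExistingDirectories {

    // Keep only the S3 directories that actually exist, so no file source is ever
    // created for a missing prefix and the job cannot fail with FileNotFoundException.
    public static List<String> filterExisting(List<String> directories) {
        return directories.stream()
                .filter(ExistingDirectories::exists)
                .collect(Collectors.toList());
    }

    private static boolean exists(String directory) {
        try {
            Path path = new Path(directory);
            FileSystem fs = path.getFileSystem();
            return fs.exists(path);
        } catch (IOException e) {
            // Treat an unreachable filesystem the same as a missing directory.
            return false;
        }
    }
}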

Re: Need help in creating Flink Streaming s3 Job for multiple path reader one by one

2020-10-22 Thread Satyaa Dixit
Hi Chesnay, Thanks for your support. It helped a lot. I need help with one more thing: how to do checkpointing as part of the S3 reader source, in case some failure happens due to an OutOfMemoryError exception or any other failure, and we want to recover the data from the last reader split offset duri

Re: Need help in creating Flink Streaming s3 Job for multiple path reader one by one

2020-10-22 Thread Chesnay Schepler
The existing ContinuousFileMonitoringFunction and ContinuousFileReaderOperator already take care of that. Unless you are re-implementing them from scratch, you shouldn't have to do anything. On 10/22/2020 1:47 PM, Satyaa Dixit wrote: Hi Chesnay, Thanks for your support. It helped a lot. I nee
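In other words, it should be enough to enable checkpointing on the environment; the file source created by readFile() snapshots its pending splits and current read offset on every checkpoint. A small fragment illustrating this, with a placeholder interval and path (neither value comes from the thread):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // the file source snapshots its split offsets on every checkpoint

TextInputFormat format = new TextInputFormat(new Path("s3a://my-bucket/dir1"));
DataStream<String> lines = env.readFile(
        format,
        "s3a://my-bucket/dir1",
        FileProcessingMode.PROCESS_ONCE, // each file is read exactly once
        -1L);                            // monitoring interval, only used with PROCESS_CONTINUOUSLY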

Re: Need help in creating Flink Streaming s3 Job for multiple path reader one by one

2020-10-27 Thread Satyaa Dixit
Hi Chesnay & Team, I'm already using "ContinuousFileMonitoringFunction" but still I'm not able to achieve the use case below. For example, once the job started and had processed half of the data, the job failed in between because of the exception below. How do we avoid this exception? Could you please

Re: Need help in creating Flink Streaming s3 Job for multiple path reader one by one

2020-10-27 Thread Satyaa Dixit
In continuation of the above email, I have also tried the code below, but it restarts the job from the beginning. environment.enableCheckpointing(3L); environment.disableOperatorChaining(); environment.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000); environment.getCheckpointConfig()

Re: Need help in creating Flink Streaming s3 Job for multiple path reader one by one

2020-10-27 Thread Chesnay Schepler
To fix the OutOfMemory error you will have to provide Flink with more memory, use more task executors, or possibly reduce the parallelism. Did the job fail before the first checkpoint occurred? What sink are you using? On 10/27/2020 12:45 PM, Satyaa Dixit wrote: In continuation of the above

Re: Need help in creating Flink Streaming s3 Job for multiple path reader one by one

2020-10-27 Thread Satyaa Dixit
Hi Chesnay, Thanks for your prompt response. Sink: Kafka. After the first checkpoint, the reader job failed; that's where the duplication happened. Can we use externalized checkpointing, using the code snippet below, as part of the reader source? environment.setStateBackend(new FsStateBackend("s3://f
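A plausible complete form of the configuration being quoted might look like the fragment below; the checkpoint path and interval are placeholders, and whether a retained checkpoint is the right recovery mechanism here is an assumption:

StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

// Keep checkpoint data on S3 instead of on the JobManager heap.
environment.setStateBackend(new FsStateBackend("s3://my-bucket/flink-checkpoints"));
environment.enableCheckpointing(60_000);

// Retain the latest completed checkpoint even if the job fails or is cancelled,
// so the job can be resubmitted from it instead of re-reading everything.
environment.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

Resuming then means resubmitting the job with the retained checkpoint's path, e.g. bin/flink run -s <path-to-retained-checkpoint> ..., rather than starting it fresh, which is what avoids re-reading the files from the beginning.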