Hi Chesnay, Thanks for your prompt response. Sink : Kafka After the first checkpointing of the reader job failed, that's where duplication happened.
Can we use the external checkpointing by using below code snipped? as part of reader source. *environment.setStateBackend(new FsStateBackend("s3://flink-test-directory/flink/checkpoints")); * Regards, Satya On Tue, Oct 27, 2020 at 5:36 PM Chesnay Schepler <ches...@apache.org> wrote: > To fix the OutOfMemory error you will have to provide Flink with more > memory, use more task executors or possibly reduce the parallelism. > > Did the job fail before the first checkpoint has occurred? > > What sink are you using? > > On 10/27/2020 12:45 PM, Satyaa Dixit wrote: > > In continuation of the above email, I have tried below code also but it is > restarting the job from the beginning. > > environment.enableCheckpointing(30000L); > environment.disableOperatorChaining(); > environment.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000); > > environment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); > environment.getCheckpointConfig().setMaxConcurrentCheckpoints(1); > environment.getCheckpointConfig().setPreferCheckpointForRecovery(true); > > environment.setRestartStrategy(RestartStrategies.failureRateRestart(5, // > max failures per interval > Time.of(30, TimeUnit.MINUTES), // time interval for measuring failure rate > Time.of(60, TimeUnit.SECONDS) // delay > )); > > Please have a look into this as well. > > Regards, > Satya > > > On Tue, Oct 27, 2020 at 4:46 PM Satyaa Dixit <satyaadi...@gmail.com> > wrote: > >> Hi Chesnay & Team, >> >> I'm using already using "ContinuousFileMonitoringFunction" but still >> I'm not able to achieve the below use case. For example once job started >> and it process half of the data and in between job got failed because of >> below exception. how to avoid this exception? could you please help us on >> this too. >> >> >> >> >> >> *org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught >> exception when processing split: [110] >> s3://messages/qa/test-data/DEMO/0077/data/2020-09-03/10/2020-09-03_10:01:51_53807.jsonmod@ >> 1599127313000 : 0 + 345 at >> org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958) >> at >> org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932) >> at >> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:357) >> Caused by: java.lang.OutOfMemoryError: Java heap space* >> >> If I start the job back again, it is causing the data duplication. How to >> fix this negative use case with flink s3 reader source using checkpointing? >> >> Regards, >> >> Satya >> >> On Thu, Oct 22, 2020 at 7:05 PM Chesnay Schepler <ches...@apache.org> >> wrote: >> >>> The existing ContinuousFileMonitoringFunction and >>> ContinuousFileReaderOperator already take care of that. >>> Unless you aren't re-implementing them from scratch you shouldn't have >>> to do anything. >>> >>> On 10/22/2020 1:47 PM, Satyaa Dixit wrote: >>> > Hi Chesnay, >>> > Thanks for your support.It helped a lot. I need one more help on how >>> to do >>> > checkpointing as part of the s3 reader source in case if some failure >>> > happens due to OutOfMemoryError exception or it could be any other >>> failure, >>> > and want to recover the data from last reader splitted offset during >>> > restart the job in continuation of the previous job in order to avoid >>> > duplicate data. >>> > >>> > Thanks, >>> > Satya >>> > >>> > On Thu, Oct 15, 2020 at 3:29 PM Chesnay Schepler <ches...@apache.org> >>> wrote: >>> > >>> >> hmm...I don't see an easy way. >>> >> You may have to replicated StreamExecutionEnvironment#createFileInput >>> and >>> >> create a custom ContinuousFileMonitoringFunction that ignores missing >>> >> files in it's run() method. >>> >> >>> >> Alternatively, use some library to check the existence of the S3 >>> >> directories before creating the sources. >>> >> >>> >> On 10/15/2020 11:49 AM, Satyaa Dixit wrote: >>> >>> Hi Chesnay/Team, >>> >>> >>> >>> Thanks, we got the fix for our problem but got stuck with the below >>> >> issue, >>> >>> request your support. >>> >>> >>> >>> >>> >>> How to catch FileNotFoundException during runtime,if any directory is >>> >>> missing in s3 as part of the below source code to avoid job failure. >>> >>> >>> >>> >>> >>> s3PathList.stream().map(directory -> >>> >>> S3Service.createInputStream(environment, directory, >>> readerParallelism)) >>> >>> .reduce(DataStream::union).map(joinedStream -> >>> >>> joinedStream.addSink(kafkaProducer)); >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> Regards, >>> >>> >>> >>> Satya >>> >>> >>> >>> On Wed, Oct 14, 2020 at 8:57 PM Satyaa Dixit <satyaadi...@gmail.com> >>> >> wrote: >>> >>>> Hi Chesnay/Team >>> >>>> >>> >>>> Thank you so much.I have tried with the solution but it is not >>> working >>> >> as >>> >>>> expected showing compilation issues and tried all the ways .Please >>> find >>> >>>> below code snippet : >>> >>>> >>> >>>> s3PathList.stream() >>> >>>> .map(directory -> S3Service.customCreateInputStream(environment, >>> >>>> directory, readerParallelism)) >>> >>>> .reduce(DataStream::union).map(joinedStream -> stream.flatMap(new >>> >>>> IntermidiateOperator()).map(joinedStream -> >>> >>>> joinedStream.addSink(kafkaProducer).name("Publish to " + >>> >> kafkaTopicName)); >>> >>>> public static class IntermidiateOperator implements >>> >>>> FlatMapFunction<String, String> { >>> >>>> private static final ObjectMapper objectMapper1 = new >>> ObjectMapper(); >>> >>>> >>> >>>> @Override >>> >>>> public void flatMap(String value, Collector<String> out) throws >>> >> Exception { >>> >>>> Test m = objectMapper1.readValue(value, Test.class); >>> >>>> System.out.println("Json string:: ------" + m); >>> >>>> // logger.info("Json string:: ------"+m); >>> >>>> out.collect(value); >>> >>>> } >>> >>>> } >>> >>>> >>> >>>> Also just to clarify one doubt , How to handle >>> *FileNotFoundException* >>> >> as >>> >>>> part of flink reader during runtime if in case directory is not >>> >> available >>> >>>> in s3. How to avoid job failure in that use case. >>> >>>> >>> >>>> Regards, >>> >>>> Satya >>> >>>> >>> >>>> On Tue, Oct 13, 2020 at 11:15 AM Satyaa Dixit < >>> satyaadi...@gmail.com> >>> >>>> wrote: >>> >>>> >>> >>>>> Thanks, I'll check it out. >>> >>>>> >>> >>>>> On Mon, Oct 12, 2020 at 1:29 PM Chesnay Schepler < >>> ches...@apache.org> >>> >>>>> wrote: >>> >>>>> >>> >>>>>> 1) There's no mechanism in the API to restrict the number of >>> number >>> >> of >>> >>>>>> readers across several sources. I can't quite think of a way to >>> >> achieve >>> >>>>>> this; maybe Kostas has an idea. >>> >>>>>> >>> >>>>>> 2) You're mixing up the Java Streams and Finks DataStream API. >>> >>>>>> >>> >>>>>> Try this: >>> >>>>>> >>> >>>>>> s3PathList.stream() >>> >>>>>> .map(...) >>> >>>>>> .reduce(...) >>> >>>>>> .map(joinedStream -> stream.map(new FlatMapFunction...)) >>> >>>>>> .map(joinedStream-> joinedStream.addSink...) >>> >>>>>> >>> >>>>>> On 10/12/2020 6:05 AM, Satyaa Dixit wrote: >>> >>>>>> >>> >>>>>> Hi Team, >>> >>>>>> >>> >>>>>> Could you please help me here. I’m sorry for asking on such short >>> >> notice >>> >>>>>> but my work has stopped due to this. >>> >>>>>> >>> >>>>>> >>> >>>>>> Regards, >>> >>>>>> Satya >>> >>>>>> >>> >>>>>> On Fri, 9 Oct 2020 at 8:53 PM, Satyaa Dixit < >>> satyaadi...@gmail.com> >>> >>>>>> wrote: >>> >>>>>> >>> >>>>>>> Hi Shesnay/Team, >>> >>>>>>> >>> >>>>>>> Thank you so much for the reply.In the continuation of the >>> previous >>> >>>>>>> email, below is the block diagram where I am reading the file >>> from >>> >> s3 and >>> >>>>>>> pushing it to kafka.Now with the current setup, I have total 4 >>> >> directory >>> >>>>>>> based on the readfile method from flink environment ,we are >>> >> creating 4 >>> >>>>>>> readers parallely to process the data from s3 . >>> >>>>>>> >>> >>>>>>> Below are my Questions: >>> >>>>>>> 1. Can we restrict the no. of readers to process the data >>> parallely. >>> >>>>>>> e.g let's say if we have a thousand of directory , in that case >>> i >>> >> want to >>> >>>>>>> restrict the no. of readers to 10 and ten parallel threads will >>> >> continue >>> >>>>>>> with 100 sequential reading of the directory per thread to >>> consume >>> >> the data >>> >>>>>>> . >>> >>>>>>> >>> >>>>>>> 2.In between the two flink operators i.e s3 reader and kafka >>> sink , i >>> >>>>>>> just want to implement one more operator in order to transform >>> the >>> >> data >>> >>>>>>> which i am reading from s3 bucket and then want to push into the >>> >> kafka >>> >>>>>>> sink. Below is my working code.Here i am finding difficulties to >>> >>>>>>> implement map operator in order to transform the union of >>> >> datastreams by >>> >>>>>>> applying union method over each directory's reader before >>> pushing to >>> >> kafka. >>> >>>>>>> List<String> s3PathList = >>> >> S3Service.getListOfS3Paths(finalParameters); >>> >>>>>>> s3PathList.stream() >>> >>>>>>> .map(directory -> S3Service.customInputStream(environment, >>> directory, >>> >>>>>>> readerParallelism)) >>> >>>>>>> .reduce(DataStream::union) >>> >>>>>>> .map(joinedStream -> >>> >> joinedStream.addSink(kafkaProducer).name("Publish >>> >>>>>>> to " + kafkaTopicName)); >>> >>>>>>> >>> >>>>>>> >>> >>>>>>> *Something like this I'm trying to do in order to achieve the >>> above >>> >> use >>> >>>>>>> case by applying FlatMap, it could be map as well:* >>> >>>>>>> s3PathList.stream() >>> >>>>>>> .map(directory -> S3Service.customInputStream(environment, >>> directory, >>> >>>>>>> readerParallelism)) >>> >>>>>>> .reduce(DataStream::union).flatMap(new >>> FlatMapFunction<DataStream, >>> >>>>>>> String>() { >>> >>>>>>> @Override >>> >>>>>>> public void flatMap(String value, Collector<String> out) >>> throws >>> >>>>>>> Exception { >>> >>>>>>> FinalJsonMessage m=objectMapper.readValue(value, >>> >>>>>>> FinalJsonMessage.class); >>> >>>>>>> System.out.println("Json string:: ------"+m); >>> >>>>>>> //transformation logic >>> >>>>>>> out.collect(value); >>> >>>>>>> } >>> >>>>>>> }) >>> >>>>>>> .map(joinedStream -> >>> >> joinedStream.addSink(kafkaProducer).name("Publish >>> >>>>>>> to " + kafkaTopicName)); >>> >>>>>>> [image: image.png] >>> >>>>>>> Request your support on the same. >>> >>>>>>> Regards, >>> >>>>>>> Satya >>> >>>>>>> >>> >>>>>>> On Mon, Oct 5, 2020 at 12:16 PM Satyaa Dixit < >>> satyaadi...@gmail.com> >>> >>>>>>> wrote: >>> >>>>>>> >>> >>>>>>>> Hi @ches...@apache.org <ches...@apache.org> , >>> >>>>>>>> >>> >>>>>>>> Thanks for your support, it was really helpful. >>> >>>>>>>> Do you know the list of directories when you submit the job? >>> [Yes we >>> >>>>>>>> do have] >>> >>>>>>>> The impletemation is progress and will get back to you if any >>> >> further >>> >>>>>>>> challenges we may face. >>> >>>>>>>> Appreciate your support in this regard. >>> >>>>>>>> >>> >>>>>>>> Regards, >>> >>>>>>>> Satya >>> >>>>>>>> >>> >>>>>>>> On Thu, Oct 1, 2020 at 7:46 PM Satyaa Dixit < >>> satyaadi...@gmail.com> >>> >>>>>>>> wrote: >>> >>>>>>>> >>> >>>>>>>>> Thank you @Chesnay let me try this change . >>> >>>>>>>>> >>> >>>>>>>>> On Thu, Oct 1, 2020 at 1:21 PM Chesnay Schepler < >>> >> ches...@apache.org> >>> >>>>>>>>> wrote: >>> >>>>>>>>> >>> >>>>>>>>>> You could also try using streams to make it a little more >>> concise: >>> >>>>>>>>>> >>> >>>>>>>>>> directories.stream() >>> >>>>>>>>>> .map(directory -> createInputStream(environment, >>> directory)) >>> >>>>>>>>>> .reduce(DataStream::union) >>> >>>>>>>>>> .map(joinedStream -> joinedStream.addSink(kafka)); >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> On 10/1/2020 9:48 AM, Chesnay Schepler wrote: >>> >>>>>>>>>> >>> >>>>>>>>>> Do you know the list of directories when you submit the job? >>> >>>>>>>>>> >>> >>>>>>>>>> If so, then you can iterate over them, create a source for >>> each >>> >>>>>>>>>> directory, union them, and apply the sink to the union. >>> >>>>>>>>>> >>> >>>>>>>>>> private static >>> >>>>>>>>>> DataStream<String>createInputStream(StreamExecutionEnvironment >>> >> environment, >>> >>>>>>>>>> String directory) { >>> >>>>>>>>>> TextInputFormat format =new TextInputFormat(new >>> >>>>>>>>>> org.apache.flink.core.fs.Path(directory)); >>> >>>>>>>>>> format.setNestedFileEnumeration(true); return >>> >> environment.readFile(format, >>> >>>>>>>>>> directory, FileProcessingMode.PROCESS_ONCE, -1, >>> >>>>>>>>>> FilePathFilter.createDefaultFilter()); } >>> >>>>>>>>>> >>> >>>>>>>>>> public static void runJob()throws Exception { >>> >>>>>>>>>> StreamExecutionEnvironment environment = >>> >>>>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment(); >>> List<String> >>> >>>>>>>>>> directories =getDirectories(); DataStream<String> >>> joinedStreams >>> >> =null; for >>> >>>>>>>>>> (String directory : directories) { >>> >>>>>>>>>> DataStream<String> inputStream >>> >> =createInputStream(environment, >>> >>>>>>>>>> directory); if (joinedStreams ==null) { >>> >>>>>>>>>> joinedStreams = inputStream; }else { >>> >>>>>>>>>> joinedStreams.union(inputStream); } >>> >>>>>>>>>> } >>> >>>>>>>>>> // add a sanity check that there was at least 1 directory >>> >>>>>>>>>> >>> >>>>>>>>>> joinedStreams.addSink(kafka); } >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> On 10/1/2020 9:08 AM, Satyaa Dixit wrote: >>> >>>>>>>>>> >>> >>>>>>>>>> Hi Guys, >>> >>>>>>>>>> >>> >>>>>>>>>> Got stuck with it please help me here >>> >>>>>>>>>> Regards, >>> >>>>>>>>>> Satya >>> >>>>>>>>>> >>> >>>>>>>>>> On Wed, 30 Sep 2020 at 11:09 AM, Satyaa Dixit >>> >>>>>>>>>> <satyaadi...@gmail.com> <satyaadi...@gmail.com> wrote: >>> >>>>>>>>>> >>> >>>>>>>>>> Hi Guys, >>> >>>>>>>>>> >>> >>>>>>>>>> Sorry to bother you again, but someone could help me here? Any >>> >> help >>> >>>>>>>>>> in >>> >>>>>>>>>> this regard will be much appreciated. >>> >>>>>>>>>> >>> >>>>>>>>>> Regards, >>> >>>>>>>>>> Satya >>> >>>>>>>>>> >>> >>>>>>>>>> On Tue, Sep 29, 2020 at 2:57 PM Satyaa Dixit < >>> >> satyaadi...@gmail.com> >>> >>>>>>>>>> <satyaadi...@gmail.com> >>> >>>>>>>>>> wrote: >>> >>>>>>>>>> >>> >>>>>>>>>> Hi Guys, >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> I need one help, any leads will be highly appreciated.I have >>> >> written >>> >>>>>>>>>> a >>> >>>>>>>>>> flink streaming job to read the data from s3 bucket and push >>> to >>> >>>>>>>>>> kafka. >>> >>>>>>>>>> Below is the working source that deal with single s3 path: >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> TextInputFormat format = new TextInputFormat(new >>> >>>>>>>>>> org.apache.flink.core.fs.Path("s3a://directory/2020-09-03/")); >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> format.setNestedFileEnumeration(true); >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> DataStream<String> inputStream = environment.readFile(format, >>> >>>>>>>>>> "s3a://directory/2020-09-03/", >>> FileProcessingMode.PROCESS_ONCE, >>> >> -1, >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> FilePathFilter.createDefaultFilter()); >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> inputStream.addSink(kafka); >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> But my requirement is get the list of paths and pass them one >>> by >>> >> one >>> >>>>>>>>>> to >>> >>>>>>>>>> this environment.readFile() method.How we can achieve this. >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> Thanks, >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> Satya >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> -- >>> >>>>>>>>>> -------------------------- >>> >>>>>>>>>> Best Regards >>> >>>>>>>>>> Satya Prakash >>> >>>>>>>>>> (M)+91-9845111913 >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> -- >>> >>>>>>>>>> >>> >>>>>>>>>> -------------------------- >>> >>>>>>>>>> Best Regards >>> >>>>>>>>>> Satya Prakash >>> >>>>>>>>>> (M)+91-9845111913 >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>>> >>> >>>>>>>>> -- >>> >>>>>>>>> -------------------------- >>> >>>>>>>>> Best Regards >>> >>>>>>>>> Satya Prakash >>> >>>>>>>>> (M)+91-9845111913 >>> >>>>>>>>> >>> >>>>>>>> -- >>> >>>>>>>> -------------------------- >>> >>>>>>>> Best Regards >>> >>>>>>>> Satya Prakash >>> >>>>>>>> (M)+91-9845111913 >>> >>>>>>>> >>> >>>>>>> -- >>> >>>>>>> -------------------------- >>> >>>>>>> Best Regards >>> >>>>>>> Satya Prakash >>> >>>>>>> (M)+91-9845111913 >>> >>>>>>> >>> >>>>>> -- >>> >>>>>> -------------------------- >>> >>>>>> Best Regards >>> >>>>>> Satya Prakash >>> >>>>>> (M)+91-9845111913 >>> >>>>>> >>> >>>>>> >>> >>>>>> >>> >>>>> -- >>> >>>>> -------------------------- >>> >>>>> Best Regards >>> >>>>> Satya Prakash >>> >>>>> (M)+91-9845111913 >>> >>>>> >>> >>>> -- >>> >>>> -------------------------- >>> >>>> Best Regards >>> >>>> Satya Prakash >>> >>>> (M)+91-9845111913 >>> >>>> >>> >> >>> >>> >> >> -- >> -------------------------- >> Best Regards >> Satya Prakash >> (M)+91-9845111913 >> > > > -- > -------------------------- > Best Regards > Satya Prakash > (M)+91-9845111913 > > > -- -------------------------- Best Regards Satya Prakash (M)+91-9845111913