Hi Chesnay,

Thanks for your prompt response.
Sink : Kafka
After the first checkpointing of the reader job failed, that's where
duplication happened.

Can we use the external checkpointing by using below code snipped? as part
of reader source.
*environment.setStateBackend(new
FsStateBackend("s3://flink-test-directory/flink/checkpoints")); *

Regards,
Satya


On Tue, Oct 27, 2020 at 5:36 PM Chesnay Schepler <ches...@apache.org> wrote:

> To fix the OutOfMemory error you will have to provide Flink with more
> memory, use more task executors or possibly reduce the parallelism.
>
> Did the job fail before the first checkpoint has occurred?
>
> What sink are you using?
>
> On 10/27/2020 12:45 PM, Satyaa Dixit wrote:
>
> In continuation of the above email, I have tried below code also but it is
> restarting the job from the beginning.
>
> environment.enableCheckpointing(30000L);
> environment.disableOperatorChaining();
> environment.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
>
> environment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> environment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
> environment.getCheckpointConfig().setPreferCheckpointForRecovery(true);
>
> environment.setRestartStrategy(RestartStrategies.failureRateRestart(5, //
> max failures per interval
> Time.of(30, TimeUnit.MINUTES), // time interval for measuring failure rate
> Time.of(60, TimeUnit.SECONDS) // delay
> ));
>
> Please have a look into this as well.
>
> Regards,
> Satya
>
>
> On Tue, Oct 27, 2020 at 4:46 PM Satyaa Dixit <satyaadi...@gmail.com>
> wrote:
>
>> Hi Chesnay & Team,
>>
>> I'm using already using "ContinuousFileMonitoringFunction"  but still
>> I'm not able to achieve the below use case. For example once job started
>> and it process half of the data and in between job got failed because of
>> below exception. how to avoid this exception? could you please help us on
>> this too.
>>
>>
>>
>>
>>
>> *org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught
>> exception when processing split: [110]
>> s3://messages/qa/test-data/DEMO/0077/data/2020-09-03/10/2020-09-03_10:01:51_53807.jsonmod@
>> 1599127313000 : 0 + 345 at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
>> at
>> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:357)
>> Caused by: java.lang.OutOfMemoryError: Java heap space*
>>
>> If I start the job back again, it is causing the data duplication. How to
>> fix this negative use case with flink s3 reader source using checkpointing?
>>
>> Regards,
>>
>> Satya
>>
>> On Thu, Oct 22, 2020 at 7:05 PM Chesnay Schepler <ches...@apache.org>
>> wrote:
>>
>>> The existing ContinuousFileMonitoringFunction and
>>> ContinuousFileReaderOperator already take care of that.
>>> Unless you aren't re-implementing them from scratch you shouldn't have
>>> to do anything.
>>>
>>> On 10/22/2020 1:47 PM, Satyaa Dixit wrote:
>>> > Hi Chesnay,
>>> > Thanks for your support.It helped a lot. I need one more help on how
>>> to do
>>> > checkpointing as part of the s3 reader source in case if some failure
>>> > happens due to OutOfMemoryError exception or it could be any other
>>> failure,
>>> > and want to recover the data from last reader splitted offset during
>>> > restart the job in continuation of the previous job in order to avoid
>>> > duplicate data.
>>> >
>>> > Thanks,
>>> > Satya
>>> >
>>> > On Thu, Oct 15, 2020 at 3:29 PM Chesnay Schepler <ches...@apache.org>
>>> wrote:
>>> >
>>> >> hmm...I don't see an easy way.
>>> >> You may have to replicated StreamExecutionEnvironment#createFileInput
>>> and
>>> >> create a custom ContinuousFileMonitoringFunction that ignores missing
>>> >> files in it's run() method.
>>> >>
>>> >> Alternatively, use some library to check the existence of the S3
>>> >> directories before creating the sources.
>>> >>
>>> >> On 10/15/2020 11:49 AM, Satyaa Dixit wrote:
>>> >>> Hi Chesnay/Team,
>>> >>>
>>> >>> Thanks, we got the fix for our problem but got stuck with the below
>>> >> issue,
>>> >>> request your support.
>>> >>>
>>> >>>
>>> >>> How to catch FileNotFoundException during runtime,if any directory is
>>> >>> missing in s3 as part of the below source code to avoid job failure.
>>> >>>
>>> >>>
>>> >>> s3PathList.stream().map(directory ->
>>> >>> S3Service.createInputStream(environment, directory,
>>> readerParallelism))
>>> >>> .reduce(DataStream::union).map(joinedStream ->
>>> >>> joinedStream.addSink(kafkaProducer));
>>> >>>
>>> >>>
>>> >>>
>>> >>>
>>> >>>
>>> >>> Regards,
>>> >>>
>>> >>> Satya
>>> >>>
>>> >>> On Wed, Oct 14, 2020 at 8:57 PM Satyaa Dixit <satyaadi...@gmail.com>
>>> >> wrote:
>>> >>>> Hi Chesnay/Team
>>> >>>>
>>> >>>> Thank you so much.I have tried with the solution but it is not
>>> working
>>> >> as
>>> >>>> expected showing compilation issues and tried all the ways .Please
>>> find
>>> >>>> below code snippet :
>>> >>>>
>>> >>>> s3PathList.stream()
>>> >>>> .map(directory -> S3Service.customCreateInputStream(environment,
>>> >>>> directory, readerParallelism))
>>> >>>> .reduce(DataStream::union).map(joinedStream -> stream.flatMap(new
>>> >>>> IntermidiateOperator()).map(joinedStream ->
>>> >>>> joinedStream.addSink(kafkaProducer).name("Publish to " +
>>> >> kafkaTopicName));
>>> >>>> public static class IntermidiateOperator implements
>>> >>>> FlatMapFunction<String, String> {
>>> >>>> private static final ObjectMapper objectMapper1 = new
>>> ObjectMapper();
>>> >>>>
>>> >>>> @Override
>>> >>>> public void flatMap(String value, Collector<String> out) throws
>>> >> Exception {
>>> >>>> Test m = objectMapper1.readValue(value, Test.class);
>>> >>>> System.out.println("Json string:: ------" + m);
>>> >>>> // logger.info("Json string:: ------"+m);
>>> >>>> out.collect(value);
>>> >>>> }
>>> >>>> }
>>> >>>>
>>> >>>> Also just to clarify one doubt , How to handle
>>> *FileNotFoundException*
>>> >> as
>>> >>>> part of flink reader during runtime if in case directory is not
>>> >> available
>>> >>>> in s3. How to avoid job failure in that use case.
>>> >>>>
>>> >>>> Regards,
>>> >>>> Satya
>>> >>>>
>>> >>>> On Tue, Oct 13, 2020 at 11:15 AM Satyaa Dixit <
>>> satyaadi...@gmail.com>
>>> >>>> wrote:
>>> >>>>
>>> >>>>> Thanks, I'll check it out.
>>> >>>>>
>>> >>>>> On Mon, Oct 12, 2020 at 1:29 PM Chesnay Schepler <
>>> ches...@apache.org>
>>> >>>>> wrote:
>>> >>>>>
>>> >>>>>> 1) There's no mechanism in the API to restrict the number of
>>> number
>>> >> of
>>> >>>>>> readers across several sources. I can't quite think of a way to
>>> >> achieve
>>> >>>>>> this; maybe Kostas has an idea.
>>> >>>>>>
>>> >>>>>> 2) You're mixing  up the Java Streams and Finks DataStream API.
>>> >>>>>>
>>> >>>>>> Try this:
>>> >>>>>>
>>> >>>>>> s3PathList.stream()
>>> >>>>>> .map(...)
>>> >>>>>> .reduce(...)
>>> >>>>>> .map(joinedStream -> stream.map(new FlatMapFunction...))
>>> >>>>>> .map(joinedStream->  joinedStream.addSink...)
>>> >>>>>>
>>> >>>>>> On 10/12/2020 6:05 AM, Satyaa Dixit wrote:
>>> >>>>>>
>>> >>>>>> Hi Team,
>>> >>>>>>
>>> >>>>>> Could you please help me here. I’m sorry for asking on such short
>>> >> notice
>>> >>>>>> but my work has stopped due to this.
>>> >>>>>>
>>> >>>>>>
>>> >>>>>> Regards,
>>> >>>>>> Satya
>>> >>>>>>
>>> >>>>>> On Fri, 9 Oct 2020 at 8:53 PM, Satyaa Dixit <
>>> satyaadi...@gmail.com>
>>> >>>>>> wrote:
>>> >>>>>>
>>> >>>>>>> Hi  Shesnay/Team,
>>> >>>>>>>
>>> >>>>>>> Thank you so much for the reply.In the continuation of the
>>> previous
>>> >>>>>>> email, below is the block diagram where I am reading the file
>>> from
>>> >> s3 and
>>> >>>>>>> pushing it to kafka.Now with the current setup, I have total 4
>>> >> directory
>>> >>>>>>> based on the readfile method  from flink environment ,we are
>>> >> creating 4
>>> >>>>>>> readers parallely to process the data from s3 .
>>> >>>>>>>
>>> >>>>>>> Below are my Questions:
>>> >>>>>>> 1. Can we restrict the no. of readers to process the  data
>>> parallely.
>>> >>>>>>> e.g let's say if  we have a thousand of directory , in that case
>>> i
>>> >> want to
>>> >>>>>>> restrict the no. of readers to 10 and ten parallel threads will
>>> >> continue
>>> >>>>>>> with 100 sequential reading of the directory per thread to
>>> consume
>>> >> the data
>>> >>>>>>> .
>>> >>>>>>>
>>> >>>>>>> 2.In between the two flink operators i.e s3 reader and kafka
>>> sink , i
>>> >>>>>>> just want to implement one more operator in order to transform
>>> the
>>> >> data
>>> >>>>>>> which i am reading from s3 bucket and then want to push into the
>>> >> kafka
>>> >>>>>>> sink. Below is my working code.Here i am finding  difficulties to
>>> >>>>>>> implement  map operator in order to transform the union of
>>> >> datastreams  by
>>> >>>>>>> applying union method over each directory's reader before
>>> pushing to
>>> >> kafka.
>>> >>>>>>> List<String> s3PathList =
>>> >> S3Service.getListOfS3Paths(finalParameters);
>>> >>>>>>> s3PathList.stream()
>>> >>>>>>> .map(directory -> S3Service.customInputStream(environment,
>>> directory,
>>> >>>>>>> readerParallelism))
>>> >>>>>>> .reduce(DataStream::union)
>>> >>>>>>> .map(joinedStream ->
>>> >> joinedStream.addSink(kafkaProducer).name("Publish
>>> >>>>>>> to " + kafkaTopicName));
>>> >>>>>>>
>>> >>>>>>>
>>> >>>>>>> *Something like this I'm trying to do in order to achieve the
>>> above
>>> >> use
>>> >>>>>>> case by applying FlatMap, it could be map as well:*
>>> >>>>>>> s3PathList.stream()
>>> >>>>>>> .map(directory -> S3Service.customInputStream(environment,
>>> directory,
>>> >>>>>>> readerParallelism))
>>> >>>>>>> .reduce(DataStream::union).flatMap(new
>>> FlatMapFunction<DataStream,
>>> >>>>>>> String>() {
>>> >>>>>>>      @Override
>>> >>>>>>>      public void flatMap(String value, Collector<String> out)
>>> throws
>>> >>>>>>> Exception {
>>> >>>>>>>       FinalJsonMessage m=objectMapper.readValue(value,
>>> >>>>>>> FinalJsonMessage.class);
>>> >>>>>>>       System.out.println("Json string:: ------"+m);
>>> >>>>>>>        //transformation logic
>>> >>>>>>>        out.collect(value);
>>> >>>>>>>      }
>>> >>>>>>>    })
>>> >>>>>>> .map(joinedStream ->
>>> >> joinedStream.addSink(kafkaProducer).name("Publish
>>> >>>>>>> to " + kafkaTopicName));
>>> >>>>>>> [image: image.png]
>>> >>>>>>> Request your support on the same.
>>> >>>>>>> Regards,
>>> >>>>>>> Satya
>>> >>>>>>>
>>> >>>>>>> On Mon, Oct 5, 2020 at 12:16 PM Satyaa Dixit <
>>> satyaadi...@gmail.com>
>>> >>>>>>> wrote:
>>> >>>>>>>
>>> >>>>>>>> Hi @ches...@apache.org <ches...@apache.org> ,
>>> >>>>>>>>
>>> >>>>>>>> Thanks for your support, it was really helpful.
>>> >>>>>>>> Do you know the list of directories when you submit the job?
>>> [Yes we
>>> >>>>>>>> do have]
>>> >>>>>>>> The impletemation is progress and will get back to you if any
>>> >> further
>>> >>>>>>>> challenges we may face.
>>> >>>>>>>> Appreciate your support in this regard.
>>> >>>>>>>>
>>> >>>>>>>> Regards,
>>> >>>>>>>> Satya
>>> >>>>>>>>
>>> >>>>>>>> On Thu, Oct 1, 2020 at 7:46 PM Satyaa Dixit <
>>> satyaadi...@gmail.com>
>>> >>>>>>>> wrote:
>>> >>>>>>>>
>>> >>>>>>>>> Thank you @Chesnay let me try this change .
>>> >>>>>>>>>
>>> >>>>>>>>> On Thu, Oct 1, 2020 at 1:21 PM Chesnay Schepler <
>>> >> ches...@apache.org>
>>> >>>>>>>>> wrote:
>>> >>>>>>>>>
>>> >>>>>>>>>> You could also try using streams to make it a little more
>>> concise:
>>> >>>>>>>>>>
>>> >>>>>>>>>> directories.stream()
>>> >>>>>>>>>>      .map(directory -> createInputStream(environment,
>>> directory))
>>> >>>>>>>>>>      .reduce(DataStream::union)
>>> >>>>>>>>>>      .map(joinedStream -> joinedStream.addSink(kafka));
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>> On 10/1/2020 9:48 AM, Chesnay Schepler wrote:
>>> >>>>>>>>>>
>>> >>>>>>>>>> Do you know the list of directories when you submit the job?
>>> >>>>>>>>>>
>>> >>>>>>>>>> If so, then you can iterate over them, create a source for
>>> each
>>> >>>>>>>>>> directory, union them, and apply the sink to the union.
>>> >>>>>>>>>>
>>> >>>>>>>>>> private static
>>> >>>>>>>>>> DataStream<String>createInputStream(StreamExecutionEnvironment
>>> >> environment,
>>> >>>>>>>>>> String directory) {
>>> >>>>>>>>>>      TextInputFormat format =new TextInputFormat(new
>>> >>>>>>>>>> org.apache.flink.core.fs.Path(directory));
>>> >>>>>>>>>> format.setNestedFileEnumeration(true); return
>>> >> environment.readFile(format,
>>> >>>>>>>>>> directory, FileProcessingMode.PROCESS_ONCE, -1,
>>> >>>>>>>>>> FilePathFilter.createDefaultFilter()); }
>>> >>>>>>>>>>
>>> >>>>>>>>>> public static void runJob()throws Exception {
>>> >>>>>>>>>>      StreamExecutionEnvironment environment =
>>> >>>>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>> List<String>
>>> >>>>>>>>>> directories =getDirectories(); DataStream<String>
>>> joinedStreams
>>> >> =null; for
>>> >>>>>>>>>> (String directory : directories) {
>>> >>>>>>>>>>         DataStream<String> inputStream
>>> >> =createInputStream(environment,
>>> >>>>>>>>>> directory); if (joinedStreams ==null) {
>>> >>>>>>>>>>            joinedStreams = inputStream; }else {
>>> >>>>>>>>>>            joinedStreams.union(inputStream); }
>>> >>>>>>>>>>      }
>>> >>>>>>>>>>      // add a sanity check that there was at least 1 directory
>>> >>>>>>>>>>
>>> >>>>>>>>>>      joinedStreams.addSink(kafka); }
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>> On 10/1/2020 9:08 AM, Satyaa Dixit wrote:
>>> >>>>>>>>>>
>>> >>>>>>>>>> Hi Guys,
>>> >>>>>>>>>>
>>> >>>>>>>>>> Got stuck with it please help me here
>>> >>>>>>>>>> Regards,
>>> >>>>>>>>>> Satya
>>> >>>>>>>>>>
>>> >>>>>>>>>> On Wed, 30 Sep 2020 at 11:09 AM, Satyaa Dixit
>>> >>>>>>>>>> <satyaadi...@gmail.com> <satyaadi...@gmail.com> wrote:
>>> >>>>>>>>>>
>>> >>>>>>>>>> Hi Guys,
>>> >>>>>>>>>>
>>> >>>>>>>>>> Sorry to bother you again, but someone could help me here? Any
>>> >> help
>>> >>>>>>>>>> in
>>> >>>>>>>>>> this regard will be much appreciated.
>>> >>>>>>>>>>
>>> >>>>>>>>>> Regards,
>>> >>>>>>>>>> Satya
>>> >>>>>>>>>>
>>> >>>>>>>>>> On Tue, Sep 29, 2020 at 2:57 PM Satyaa Dixit <
>>> >> satyaadi...@gmail.com>
>>> >>>>>>>>>> <satyaadi...@gmail.com>
>>> >>>>>>>>>> wrote:
>>> >>>>>>>>>>
>>> >>>>>>>>>> Hi Guys,
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>> I need one help, any leads will be highly appreciated.I have
>>> >> written
>>> >>>>>>>>>> a
>>> >>>>>>>>>> flink streaming job to read the data from s3 bucket and push
>>> to
>>> >>>>>>>>>> kafka.
>>> >>>>>>>>>> Below is the working source that deal with single s3 path:
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>> TextInputFormat format = new TextInputFormat(new
>>> >>>>>>>>>> org.apache.flink.core.fs.Path("s3a://directory/2020-09-03/"));
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>> format.setNestedFileEnumeration(true);
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>> DataStream<String> inputStream = environment.readFile(format,
>>> >>>>>>>>>> "s3a://directory/2020-09-03/",
>>> FileProcessingMode.PROCESS_ONCE,
>>> >> -1,
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>> FilePathFilter.createDefaultFilter());
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>> inputStream.addSink(kafka);
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>> But my requirement is get the list of paths and pass them one
>>> by
>>> >> one
>>> >>>>>>>>>> to
>>> >>>>>>>>>> this environment.readFile() method.How we can achieve this.
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>> Thanks,
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>> Satya
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>> --
>>> >>>>>>>>>> --------------------------
>>> >>>>>>>>>> Best Regards
>>> >>>>>>>>>> Satya Prakash
>>> >>>>>>>>>> (M)+91-9845111913
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>> --
>>> >>>>>>>>>>
>>> >>>>>>>>>> --------------------------
>>> >>>>>>>>>> Best Regards
>>> >>>>>>>>>> Satya Prakash
>>> >>>>>>>>>> (M)+91-9845111913
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>> --
>>> >>>>>>>>> --------------------------
>>> >>>>>>>>> Best Regards
>>> >>>>>>>>> Satya Prakash
>>> >>>>>>>>> (M)+91-9845111913
>>> >>>>>>>>>
>>> >>>>>>>> --
>>> >>>>>>>> --------------------------
>>> >>>>>>>> Best Regards
>>> >>>>>>>> Satya Prakash
>>> >>>>>>>> (M)+91-9845111913
>>> >>>>>>>>
>>> >>>>>>> --
>>> >>>>>>> --------------------------
>>> >>>>>>> Best Regards
>>> >>>>>>> Satya Prakash
>>> >>>>>>> (M)+91-9845111913
>>> >>>>>>>
>>> >>>>>> --
>>> >>>>>> --------------------------
>>> >>>>>> Best Regards
>>> >>>>>> Satya Prakash
>>> >>>>>> (M)+91-9845111913
>>> >>>>>>
>>> >>>>>>
>>> >>>>>>
>>> >>>>> --
>>> >>>>> --------------------------
>>> >>>>> Best Regards
>>> >>>>> Satya Prakash
>>> >>>>> (M)+91-9845111913
>>> >>>>>
>>> >>>> --
>>> >>>> --------------------------
>>> >>>> Best Regards
>>> >>>> Satya Prakash
>>> >>>> (M)+91-9845111913
>>> >>>>
>>> >>
>>>
>>>
>>
>> --
>> --------------------------
>> Best Regards
>> Satya Prakash
>> (M)+91-9845111913
>>
>
>
> --
> --------------------------
> Best Regards
> Satya Prakash
> (M)+91-9845111913
>
>
>

-- 
--------------------------
Best Regards
Satya Prakash
(M)+91-9845111913

Reply via email to