Hi Chesnay/Team,
Thank you so much for the reply. In continuation of the previous email, below
is the block diagram where I am reading files from S3 and pushing them to
Kafka. With the current setup I have a total of 4 directories; based on the
readFile method from the Flink environment, we create 4 readers that process
the data from S3 in parallel.
Below are my questions:
1. Can we restrict the number of readers that process the data in parallel?
For example, if we have a thousand directories, I want to restrict the number
of readers to 10, so that each of the ten parallel threads sequentially reads
100 directories to consume the data. (A sketch of what I have in mind follows
below.)
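Something like this sketch is what I am picturing (only a sketch; it assumes
that calling setParallelism on the stream returned by environment.readFile
caps the number of reader tasks, so the remaining input splits queue up and
are read sequentially by those tasks; maxReaders is a hypothetical knob, not
something from our code):

int maxReaders = 10; // hypothetical cap on concurrent readers
TextInputFormat format =
        new TextInputFormat(new org.apache.flink.core.fs.Path(directory));
format.setNestedFileEnumeration(true);
DataStream<String> stream = environment
        .readFile(format, directory, FileProcessingMode.PROCESS_ONCE, -1,
                FilePathFilter.createDefaultFilter())
        .setParallelism(maxReaders); // bound the readers for this source

Is that the right way to bound the readers, or is there a recommended pattern
for this?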
2. Between the two Flink operators, i.e. the S3 reader and the Kafka sink, I
want to add one more operator to transform the data that I am reading from
the S3 bucket before pushing it into the Kafka sink. Below is my working
code. I am finding it difficult to implement a map operator that transforms
the union of datastreams (built by applying the union method over each
directory's reader) before pushing to Kafka.
List<String> s3PathList = S3Service.getListOfS3Paths(finalParameters);
s3PathList.stream()
    .map(directory -> S3Service.customInputStream(environment, directory,
            readerParallelism))
    .reduce(DataStream::union)
    .map(joinedStream -> joinedStream.addSink(kafkaProducer)
            .name("Publish to " + kafkaTopicName));
Something like this is what I am trying, in order to achieve the above use
case by applying a FlatMap (it could be a map as well):

s3PathList.stream()
    .map(directory -> S3Service.customInputStream(environment, directory,
            readerParallelism))
    .reduce(DataStream::union)
    .flatMap(new FlatMapFunction<DataStream, String>() {
        @Override
        public void flatMap(String value, Collector<String> out) throws Exception {
            FinalJsonMessage m = objectMapper.readValue(value, FinalJsonMessage.class);
            System.out.println("Json string:: ------" + m);
            // transformation logic
            out.collect(value);
        }
    })
    .map(joinedStream -> joinedStream.addSink(kafkaProducer)
            .name("Publish to " + kafkaTopicName));
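From what I can tell, the difficulty is that reduce(DataStream::union) yields
a java.util.Optional<DataStream<String>>, so the flatMap has to be applied to
the DataStream inside the Optional, and the FlatMapFunction's input type has
to be the element type (String), not DataStream. The sketch below is what I
would try (assuming objectMapper is serializable here; otherwise I suppose a
RichFlatMapFunction that creates the mapper in open() would be needed):

s3PathList.stream()
    .map(directory -> S3Service.customInputStream(environment, directory,
            readerParallelism))
    .reduce(DataStream::union)
    .map(joinedStream -> joinedStream
        .flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                // parse each record, apply the transformation, and re-emit it
                FinalJsonMessage m = objectMapper.readValue(value, FinalJsonMessage.class);
                // transformation logic on m goes here
                out.collect(value);
            }
        })
        .addSink(kafkaProducer)
        .name("Publish to " + kafkaTopicName));

Is that the idiomatic way to put a transformation between the union and the
sink?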
[image: image.png]
I would appreciate your support on this.
Regards,
Satya
On Mon, Oct 5, 2020 at 12:16 PM Satyaa Dixit <[email protected]> wrote:
> Hi @[email protected] <[email protected]> ,
>
> Thanks for your support, it was really helpful.
> Do you know the list of directories when you submit the job? [Yes, we do.]
> The implementation is in progress, and we will get back to you if we face
> any further challenges.
> Appreciate your support in this regard.
>
> Regards,
> Satya
>
> On Thu, Oct 1, 2020 at 7:46 PM Satyaa Dixit <[email protected]> wrote:
>
>> Thank you @Chesnay, let me try this change.
>>
>> On Thu, Oct 1, 2020 at 1:21 PM Chesnay Schepler <[email protected]>
>> wrote:
>>
>>> You could also try using streams to make it a little more concise:
>>>
>>> directories.stream()
>>>     .map(directory -> createInputStream(environment, directory))
>>>     .reduce(DataStream::union)
>>>     .map(joinedStream -> joinedStream.addSink(kafka));
>>>
>>>
>>> On 10/1/2020 9:48 AM, Chesnay Schepler wrote:
>>>
>>> Do you know the list of directories when you submit the job?
>>>
>>> If so, then you can iterate over them, create a source for each
>>> directory, union them, and apply the sink to the union.
>>>
>>> private static DataStream<String> createInputStream(
>>>         StreamExecutionEnvironment environment, String directory) {
>>>     TextInputFormat format = new TextInputFormat(
>>>             new org.apache.flink.core.fs.Path(directory));
>>>     format.setNestedFileEnumeration(true);
>>>     return environment.readFile(format, directory,
>>>             FileProcessingMode.PROCESS_ONCE, -1,
>>>             FilePathFilter.createDefaultFilter());
>>> }
>>>
>>> public static void runJob() throws Exception {
>>>     StreamExecutionEnvironment environment =
>>>             StreamExecutionEnvironment.getExecutionEnvironment();
>>>     List<String> directories = getDirectories();
>>>     DataStream<String> joinedStreams = null;
>>>     for (String directory : directories) {
>>>         DataStream<String> inputStream =
>>>                 createInputStream(environment, directory);
>>>         if (joinedStreams == null) {
>>>             joinedStreams = inputStream;
>>>         } else {
>>>             // union returns a new stream; the result must be kept
>>>             joinedStreams = joinedStreams.union(inputStream);
>>>         }
>>>     }
>>>     // add a sanity check that there was at least 1 directory
>>>
>>>     joinedStreams.addSink(kafka);
>>> }
>>>
>>>
>>>
>>> On 10/1/2020 9:08 AM, Satyaa Dixit wrote:
>>>
>>> Hi Guys,
>>>
>>> Got stuck with it, please help me here.
>>> Regards,
>>> Satya
>>>
>>> On Wed, 30 Sep 2020 at 11:09 AM, Satyaa Dixit <[email protected]>
>>> wrote:
>>>
>>> Hi Guys,
>>>
>>> Sorry to bother you again, but could someone help me here? Any help in
>>> this regard will be much appreciated.
>>>
>>> Regards,
>>> Satya
>>>
>>> On Tue, Sep 29, 2020 at 2:57 PM Satyaa Dixit <[email protected]>
>>> wrote:
>>>
>>> Hi Guys,
>>>
>>> I need some help; any leads will be highly appreciated. I have written a
>>> Flink streaming job to read data from an S3 bucket and push it to Kafka.
>>> Below is the working source that deals with a single S3 path:
>>>
>>> TextInputFormat format = new TextInputFormat(
>>>         new org.apache.flink.core.fs.Path("s3a://directory/2020-09-03/"));
>>> format.setNestedFileEnumeration(true);
>>>
>>> DataStream<String> inputStream = environment.readFile(format,
>>>         "s3a://directory/2020-09-03/", FileProcessingMode.PROCESS_ONCE, -1,
>>>         FilePathFilter.createDefaultFilter());
>>>
>>> inputStream.addSink(kafka);
>>>
>>> But my requirement is to get a list of paths and pass each of them to
>>> this environment.readFile() method. How can we achieve this?
>>>
>>> Thanks,
>>> Satya
--
--------------------------
Best Regards
Satya Prakash
(M)+91-9845111913