Thanks Arvid!

Will try increasing the property you recommended and will post an update.

On Sat, Jun 6, 2020, 7:33 AM Arvid Heise <ar...@ververica.com> wrote:

> Hi Venkata,
>
> you can find them on the Hadoop AWS page (we are just using it as a
> library) [1].
>
> [1]
> https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html#General_S3A_Client_configuration
>
> On Sat, Jun 6, 2020 at 1:26 AM venkata sateesh` kolluru <
> vkollur...@gmail.com> wrote:
>
>> Hi Kostas and Arvid,
>>
>> Thanks for your suggestions.
>>
>> The small files were already created, and I am trying to roll a few of them
>> into a big file while sinking. But due to the custom bucket assigner, it is hard
>> to get more files within the same prefix in the specified checkpointing
>> time.
>>
>> For example:
>> For example, <bucket>/prefix1/prefix2/YY/MM/DD/HH is our structure in S3.
>> The checkpointing interval is 5 minutes. prefix1 has 40 different values and
>> prefix2 has 10000+ values.
>> Within the 5-minute interval, we get no more than 5-10 part files in these
>> prefixes.
>>
>> Regarding PrintStream, I will figure out how to use SimpleStringEncoder on
>> a Tuple, as I only need to write the tuple.f2 element to the file. If you can
>> guide me on how to do it, I would appreciate it.
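>>
>> In the meantime, something along these lines is what I have in mind (an
>> untested sketch; the class name F2OnlyEncoder is just illustrative). It
>> implements Flink's Encoder interface and writes only the f2 field, so no
>> PrintStream is needed:
>>
>>     import java.io.IOException;
>>     import java.io.OutputStream;
>>     import java.nio.charset.StandardCharsets;
>>
>>     import org.apache.flink.api.common.serialization.Encoder;
>>     import org.apache.flink.api.java.tuple.Tuple3;
>>
>>     // Illustrative sketch: write only the payload field (f2), one record per line
>>     public class F2OnlyEncoder implements Encoder<Tuple3<String, String, String>> {
>>         @Override
>>         public void encode(Tuple3<String, String, String> element, OutputStream stream) throws IOException {
>>             stream.write(element.f2.getBytes(StandardCharsets.UTF_8));
>>             stream.write('\n');
>>         }
>>     }
>>
>> This would then replace the PrintStream lambda currently passed to forRowFormat.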
>>
>> Will try Arvid's suggestion of increasing fs.s3a.connection.maximum. I
>> was trying to find out about these parameters but could not find anything. Is there
>> a place where I can look at the list of these config params?
>>
>> Also, I am using s3:// as the prefix; would fs.s3a.connection.maximum affect
>> that too, or is there a separate param like fs.s3.connection.maximum?
>>
>> On Fri, Jun 5, 2020 at 2:13 PM Kostas Kloudas <kklou...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> @Venkata, do you have many small files being created, as Arvid suggested?
>>> If yes, then I tend to agree that S3 is probably not the best sink,
>>> although I did not get that from your description.
>>> In addition, instead of PrintStream you can have a look at the code of
>>> the SimpleStringEncoder in Flink [1] for a slightly more efficient
>>> implementation.
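>>>
>>> For illustration only (untested, and the bucket name is a placeholder), it
>>> plugs straight into forRowFormat; note that SimpleStringEncoder writes
>>> element.toString(), so for a Tuple3 you would still want a small custom
>>> Encoder if only f2 should end up in the file:
>>>
>>>     StreamingFileSink<Tuple3<String, String, String>> sink = StreamingFileSink
>>>             .forRowFormat(new Path("s3://your-bucket"),
>>>                     new SimpleStringEncoder<Tuple3<String, String, String>>("UTF-8"))
>>>             .withRollingPolicy(OnCheckpointRollingPolicy.build())
>>>             .build();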
>>>
>>> Cheers,
>>> Kostas
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/serialization/SimpleStringEncoder.java
>>>
>>>
>>> On Fri, Jun 5, 2020 at 7:56 PM Arvid Heise <ar...@ververica.com> wrote:
>>>
>>>> Hi Venkata,
>>>>
>>>> are the many small files intended, or are they rather an issue of our
>>>> committing on checkpointing? If so, then FLINK-11499 [1] should help you. The design
>>>> is close to done; unfortunately, the implementation will not make it into 1.11.
>>>>
>>>> In any case, I'd look at the parameter fs.s3a.connection.maximum, as
>>>> you store both state and data on S3. I'd probably go with slot*3 or even
>>>> higher.
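>>>>
>>>> For example, assuming 32 slots (adjust to your setup), something like this
>>>> in flink-conf.yaml; as far as I know, fs.s3a.* keys set there are forwarded
>>>> to the bundled Hadoop S3A client:
>>>>
>>>>     # flink-conf.yaml (illustrative value, roughly slots * 3)
>>>>     fs.s3a.connection.maximum: 96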
>>>>
>>>> Lastly, the way you output elements also looks a bit suspicious.
>>>> PrintStream is not known for great performance. I'm also surprised that it
>>>> works without manual flushing.
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-11499
>>>>
>>>> On Mon, Jun 1, 2020 at 9:52 AM Jörn Franke <jornfra...@gmail.com>
>>>> wrote:
>>>>
>>>>> I think S3 is the wrong storage backend for this volume of small
>>>>> messages.
>>>>> Try to use a NoSQL database, or write multiple messages into one file
>>>>> in S3 (10000 or 100000).
>>>>>
>>>>> If you still want to go with your scenario, then try a network-optimized
>>>>> instance, use s3a in Flink, and configure S3 entropy.
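>>>>>
>>>>> Entropy would look roughly like this in flink-conf.yaml (values are
>>>>> illustrative; the checkpoint path must then contain the entropy marker):
>>>>>
>>>>>     s3.entropy.key: _entropy_
>>>>>     s3.entropy.length: 4
>>>>>     # e.g. state.checkpoints.dir: s3://checkpoint_bucket/_entropy_/checkpoints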
>>>>>
>>>>> On 31.05.2020 at 15:30, venkata sateesh` kolluru <
>>>>> vkollur...@gmail.com> wrote:
>>>>>
>>>>> Hi David,
>>>>>
>>>>> The average size of each file is around 30 KB, and I have a checkpoint
>>>>> interval of 5 minutes. Some files are even 1 KB; because of checkpointing,
>>>>> some files are merged into one big file of around 300 MB.
>>>>>
>>>>> With 120 million files and 4 TB, if the transfer rate is 300 per
>>>>> minute, it will take weeks to write to S3.
>>>>>
>>>>> I have tried to increase the parallelism of the sink, but I don't see any
>>>>> improvement.
>>>>>
>>>>> The sink record is a Tuple3<String,String,String>; the actual content of the
>>>>> file is f2. This content is written to <s3 bucket>/f0/f1/part*-*
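>>>>>
>>>>> Roughly, the bucket assigner does something like this (a simplified sketch,
>>>>> not the exact code):
>>>>>
>>>>>     import org.apache.flink.api.java.tuple.Tuple3;
>>>>>     import org.apache.flink.core.io.SimpleVersionedSerializer;
>>>>>     import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
>>>>>     import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
>>>>>
>>>>>     // Simplified sketch: the bucket id becomes the sub-path under the base path,
>>>>>     // so part files land under <s3 bucket>/f0/f1/part-*
>>>>>     public class CustomBucketAssigner implements BucketAssigner<Tuple3<String, String, String>, String> {
>>>>>         @Override
>>>>>         public String getBucketId(Tuple3<String, String, String> element, Context context) {
>>>>>             return element.f0 + "/" + element.f1;
>>>>>         }
>>>>>
>>>>>         @Override
>>>>>         public SimpleVersionedSerializer<String> getSerializer() {
>>>>>             return SimpleVersionedStringSerializer.INSTANCE;
>>>>>         }
>>>>>     }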
>>>>>
>>>>> I guess the prefix determination in the CustomBucketAssigner won't be
>>>>> causing this delay?
>>>>>
>>>>> Could you please shed some light on writing a custom S3 sink?
>>>>>
>>>>> Thanks
>>>>>
>>>>>
>>>>> On Sun, May 31, 2020, 6:34 AM David Magalhães <speeddra...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Venkata.
>>>>>>
>>>>>> 300 requests per minute looks like about 200 ms per request (60 s / 300), which
>>>>>> should be a normal response time to send a file if there isn't any speed
>>>>>> limitation (how big are the files?).
>>>>>>
>>>>>> Have you changed the parallelism to be higher than 1? I also
>>>>>> recommend limiting the source parallelism, because it can consume
>>>>>> pretty fast from Kafka and create some kind of backpressure.
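>>>>>>
>>>>>> Something along these lines, for illustration (the values and the
>>>>>> kafkaSource / s3Sink names are placeholders for whatever you already have):
>>>>>>
>>>>>>     // Cap the Kafka source and give the S3 sink more parallel writers
>>>>>>     DataStream<Tuple3<String, String, String>> stream = env
>>>>>>             .addSource(kafkaSource)
>>>>>>             .setParallelism(4);
>>>>>>
>>>>>>     stream.addSink(s3Sink)
>>>>>>             .setParallelism(16);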
>>>>>>
>>>>>> I don't have much experience with StreamingFileSink, because I've
>>>>>> ended up using a custom S3Sink, but I did have some issues writing to S3
>>>>>> because the requests weren't parallelised. Check this thread:
>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Parallisation-of-S3-write-sink-td34064.html#a34070
>>>>>>
>>>>>> On Sun, May 31, 2020 at 1:32 AM venkata sateesh` kolluru <
>>>>>> vkollur...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> I have posted the same question on Stack Overflow but didn't get any
>>>>>>> response, so I am posting it here for help.
>>>>>>>
>>>>>>>
>>>>>>> https://stackoverflow.com/questions/62068787/flink-s3-write-performance-optimization?noredirect=1#comment109814428_62068787
>>>>>>>
>>>>>>> Details:
>>>>>>>
>>>>>>> I am working on a Flink application on Kubernetes (EKS) which
>>>>>>> consumes data from Kafka and writes it to S3.
>>>>>>>
>>>>>>> We have around 120 million XML messages, totaling 4 TB, in Kafka.
>>>>>>> Consuming from Kafka is super fast.
>>>>>>>
>>>>>>> These are just string messages from Kafka.
>>>>>>>
>>>>>>> There is high backpressure while writing to S3. We are not even
>>>>>>> hitting the S3 PUT request limit, which is around 3500 requests/sec. I am
>>>>>>> seeing only 300 writes per minute to S3, which is very slow.
>>>>>>>
>>>>>>> I am using StreamingFileSink to write to S3 with the rolling policy
>>>>>>> OnCheckpointRollingPolicy.
>>>>>>>
>>>>>>> Using flink-s3-fs-hadoop-*.jar and s3:// as the path scheme (not using
>>>>>>> s3a:// or s3p://).
>>>>>>>
>>>>>>> Other than this, I don't have any config related to S3:
>>>>>>>
>>>>>>>     StreamingFileSink<Tuple3<String, String, String>> sink = StreamingFileSink
>>>>>>>             .forRowFormat(new Path("s3://BUCKET"),
>>>>>>>                     (Tuple3<String, String, String> element, OutputStream stream) -> {
>>>>>>>                         PrintStream out = new PrintStream(stream);
>>>>>>>                         out.println(element.f2);
>>>>>>>                     })
>>>>>>>             // Determine the target prefix (bucket id) for each record
>>>>>>>             .withBucketAssigner(new CustomBucketAssigner())
>>>>>>>             .withRollingPolicy(OnCheckpointRollingPolicy.build())
>>>>>>>             .withBucketCheckInterval(TimeUnit.MINUTES.toMillis(1))
>>>>>>>             .build();
>>>>>>>
>>>>>>> Is there anything that we can optimize for S3, either in the
>>>>>>> StreamingFileSink or in flink-conf.yaml?
>>>>>>>
>>>>>>> For example, using a bulk format, or config params like fs.s3.maxThreads, etc.
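>>>>>>>
>>>>>>> For instance, would setting the s3a counterparts of such options in
>>>>>>> flink-conf.yaml help (values below are made up)?
>>>>>>>
>>>>>>>     fs.s3a.connection.maximum: 64   # max parallel S3 connections
>>>>>>>     fs.s3a.threads.max: 64          # upload thread pool size
>>>>>>>     fs.s3a.fast.upload: true        # stream uploads instead of buffering whole files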
>>>>>>>
>>>>>>> For checkpointing too, I am using s3:// instead of s3p:// or s3a://:
>>>>>>>
>>>>>>>     env.setStateBackend((StateBackend) new RocksDBStateBackend("s3://checkpoint_bucket", true));
>>>>>>>     env.enableCheckpointing(300000);
>>>>>>>
>>>>>>>
>>>>>>>
>>>>
>>>>
>>>
>
>
