Hi Venkata,

you can find them on the Hadoop AWS page (we are just using it as a
library) [1].

[1]
https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html#General_S3A_Client_configuration

On Sat, Jun 6, 2020 at 1:26 AM venkata sateesh` kolluru <
vkollur...@gmail.com> wrote:

> Hi Kostas and Arvid,
>
> Thanks for your suggestions.
>
> The small files were already created and I am trying to roll few into a
> big file while sinking. But due to the custom bucket assigner, it is hard
> getting more files with in the same prefix in specified checkinpointing
> time.
>
> For example:
> <bucket>/prefix1/prefix2/YY/MM/DD/HH  is our structure in s3.
> checkpointing interval is 5 minutes. prefix1 has 40 different values and
> prefix 2 has 10000+ values
> With in the 5 minute interval, we are able to get part file size in these
> prefixes not more than 5-10 files.
>
> Regarding printstream, will figure out how to use SimpleStringEncoder on a
> Tuple as I only need to write tuple.f2 element in the file. If you can
> guide me on how to do it, I would appreciate it.
>
> Will try Arvid suggestion on increasing fs.s3a.connection.maximum . I was
> trying to find about these parameters and could find anywhere. Is there a
> place that I could look at these config params list ?
>
> Also I am using s3:// as prefix, would fs.s3a.connection.maximum affect
> that too or is there separate param like fs.s3.connection.maximum.
>
> On Fri, Jun 5, 2020 at 2:13 PM Kostas Kloudas <kklou...@gmail.com> wrote:
>
>> Hi all,
>>
>> @Venkata, Do you have many small files being created as Arvid suggested?
>> If yes, then I tend to agree that S3 is probably not the best sink.
>> Although I did not get that from your description.
>> In addition, instead of PrintStream you can have a look at the code of
>> the SimpleStringEncoder in Flink [1] for a bit more efficient
>> implementation.
>>
>> Cheers,
>> Kostas
>>
>> [1]
>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/serialization/SimpleStringEncoder.java
>>
>>
>> On Fri, Jun 5, 2020 at 7:56 PM Arvid Heise <ar...@ververica.com> wrote:
>>
>>> Hi Venkata,
>>>
>>> are the many small files intended or is it rather an issue of our commit
>>> on checkpointing? If so then FLINK-11499 [1] should help you. Design is
>>> close to done, unfortunately implementation will not make it into 1.11.
>>>
>>> In any case, I'd look at the parameter fs.s3a.connection.maximum, as
>>> you store both state and data on S3. I'd probably go with slot*3 or even
>>> higher.
>>>
>>> Lastly, the way you output elements looks also a bit suspicious.
>>> PrintStream is not known for great performance. I'm also surprised that it
>>> works without manual flushing.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-11499
>>>
>>> On Mon, Jun 1, 2020 at 9:52 AM Jörn Franke <jornfra...@gmail.com> wrote:
>>>
>>>> I think S3 is a wrong storage backend for this volumes of small
>>>> messages.
>>>> Try to use a NoSQL database or write multiple messages into one file in
>>>> S3 (10000 or 100000)
>>>>
>>>> If you still want to go with your scenario then try a network optimized
>>>> instance and use s3a in Flink and configure s3 entropy.
>>>>
>>>> Am 31.05.2020 um 15:30 schrieb venkata sateesh` kolluru <
>>>> vkollur...@gmail.com>:
>>>>
>>>> 
>>>> Hi David,
>>>>
>>>> The avg size of each file is around 30KB and I have checkpoint interval
>>>> of 5 minutes. Some files are even 1 kb, because of checkpoint some files
>>>> are merged into 1 big file around 300MB.
>>>>
>>>> With 120 million files and 4Tb, if the rate of transfer is 300 per
>>>> minute, it is taking weeks to write to s3.
>>>>
>>>> I have tried to increase parallelism of sink but I dont see any
>>>> improvement.
>>>>
>>>> The sink record is Tuple3<String,String,String>, the actual content of
>>>> file is f2. This is content is written to <s3 bucket>/f0/f1/part*-*
>>>>
>>>> I guess the prefix determination in custombucketassigner wont be
>>>> causing this delay?
>>>>
>>>> Could you please shed some light on writing custom s3 sink ?
>>>>
>>>> Thanks
>>>>
>>>>
>>>> On Sun, May 31, 2020, 6:34 AM David Magalhães <speeddra...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Venkata.
>>>>>
>>>>> 300 requests per minute look like a 200ms per request, which should be
>>>>> a normal response time to send a file if there isn't any speed limitation
>>>>> (how big are the files?).
>>>>>
>>>>> Have you changed the parallelization to be higher than 1? I also
>>>>> recommend to limit the source parallelization, because it can consume
>>>>> pretty fast from Kafka and create some kind of backpressure.
>>>>>
>>>>> I don't any much experience with StreamingFileSink, because I've ended
>>>>> up using a custom S3Sink, but I did have some issues writing to S3 because
>>>>> the request wasn't parallelised. Check this thread,
>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Parallisation-of-S3-write-sink-td34064.html#a34070
>>>>>
>>>>> On Sun, May 31, 2020 at 1:32 AM venkata sateesh` kolluru <
>>>>> vkollur...@gmail.com> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I have posted the same in stackoverflow but didnt get any response.
>>>>>> So posting it here for help.
>>>>>>
>>>>>>
>>>>>> https://stackoverflow.com/questions/62068787/flink-s3-write-performance-optimization?noredirect=1#comment109814428_62068787
>>>>>>
>>>>>> Details:
>>>>>>
>>>>>> I am working on a flink application on kubernetes(eks) which consumes
>>>>>> data from kafka and write it to s3.
>>>>>>
>>>>>> We have around 120 million xml messages of size 4TB in kafka.
>>>>>> Consuming from kafka is super fast.
>>>>>>
>>>>>> These are just string messages from kafka.
>>>>>>
>>>>>> There is a high back pressure while writing to s3. We are not even
>>>>>> hitting the s3 PUT request limit which is arounf 3500 requests/sec. I am
>>>>>> seeing only 300 writes per minute to S3 which is very slow.
>>>>>>
>>>>>> I am using StreamFileSink to write to s3 with Rolling policy as
>>>>>> OnCheckpointPolicy.
>>>>>>
>>>>>> Using flink-s3-fs-hadoop-*.jar and s3:// as path (not using any s3a
>>>>>> or s3p)
>>>>>>
>>>>>> Other than this I dont have any config related to s3
>>>>>>
>>>>>>     StreamingFileSink<Tuple3<String,String, String>> sink = 
>>>>>> StreamingFileSink
>>>>>>             .forRowFormat(new Path(s3://BUCKET),
>>>>>>                     (Tuple3<String,String, String> element, OutputStream 
>>>>>> stream) -> {
>>>>>>                         PrintStream out = new PrintStream(stream);
>>>>>>                         out.println(element.f2);
>>>>>>                     })
>>>>>>             // Determine component type for each record
>>>>>>             .withBucketAssigner(new CustomBucketAssigner())
>>>>>>             .withRollingPolicy(OnCheckpointRollingPolicy.build())
>>>>>>             .withBucketCheckInterval((TimeUnit.MINUTES.toMillis(1)))
>>>>>>             .build();
>>>>>>
>>>>>> Is there anything that we can optimize on s3 from streamfilesink or
>>>>>> in flink-conf.xml ?
>>>>>>
>>>>>> Like using bulkformat or any config params like fs.s3.maxThreads etc.
>>>>>>
>>>>>> For checkpointing too I am using s3:// instead of s3p or s3a
>>>>>>
>>>>>> env.setStateBackend((StateBackend) new 
>>>>>> RocksDBStateBackend(s3://checkpoint_bucket, true));
>>>>>>         env.enableCheckpointing(300000);
>>>>>>
>>>>>>
>>>>>>
>>>
>>> --
>>>
>>> Arvid Heise | Senior Java Developer
>>>
>>> <https://www.ververica.com/>
>>>
>>> Follow us @VervericaData
>>>
>>> --
>>>
>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>> Conference
>>>
>>> Stream Processing | Event Driven | Real Time
>>>
>>> --
>>>
>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>
>>> --
>>> Ververica GmbH
>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>>> (Toni) Cheng
>>>
>>

-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng

Reply via email to