Re: Flink s3 streaming performance

2020-06-06 Thread venkata sateesh` kolluru
Thanks Arvid!

Will try increasing the property you recommended and will post an update.

Re: Flink s3 streaming performance

2020-06-06 Thread Arvid Heise
Hi Venkata,

you can find them on the Hadoop AWS page (we are just using it as a
library) [1].

[1]
https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html#General_S3A_Client_configuration
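
(For what it's worth: with the flink-s3-fs-hadoop filesystem, such S3A options can be set in flink-conf.yaml, which mirrors `s3.*` keys onto the Hadoop `fs.s3a.*` config. The keys below are documented S3A options, but the values are illustrative assumptions, not tuned recommendations.)

```yaml
# flink-conf.yaml (illustrative values, not tuned recommendations)
s3.connection.maximum: 96   # mirrored to fs.s3a.connection.maximum
s3.threads.max: 64          # mirrored to fs.s3a.threads.max
```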

Re: Flink s3 streaming performance

2020-06-05 Thread venkata sateesh` kolluru
Hi Kostas and Arvid,

Thanks for your suggestions.

The small files were already created, and I am trying to roll a few of them
into a big file while sinking. But due to the custom bucket assigner, it is
hard to get more files within the same prefix within the specified
checkpointing time.

For example:
/prefix1/prefix2/YY/MM/DD/HH  is our structure in s3.
The checkpointing interval is 5 minutes. prefix1 has 40 different values and
prefix2 has 1+ values.
Within the 5-minute interval, we get no more than 5-10 part files in these
prefixes.

Regarding PrintStream, I will figure out how to use SimpleStringEncoder on a
Tuple, as I only need to write tuple.f2 to the file. If you can guide me on
how to do it, I would appreciate it.

I will try Arvid's suggestion of increasing fs.s3a.connection.maximum. I was
trying to find information about these parameters and couldn't find any. Is
there a place where I can look at the list of config params?

Also, I am using s3:// as the prefix; would fs.s3a.connection.maximum affect
that too, or is there a separate param like fs.s3.connection.maximum?
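
(For reference, a custom Encoder in the spirit of SimpleStringEncoder that writes only the f2 field could look like the sketch below. The String element types and the class name are assumptions, since the actual tuple type parameters were stripped by the list archive.)

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.java.tuple.Tuple3;

/** Writes only element.f2, one record per line (UTF-8), analogous to SimpleStringEncoder. */
public class Tuple3F2Encoder implements Encoder<Tuple3<String, String, String>> {
    private static final long serialVersionUID = 1L;

    @Override
    public void encode(Tuple3<String, String, String> element, OutputStream stream)
            throws IOException {
        stream.write(element.f2.getBytes(StandardCharsets.UTF_8));
        stream.write('\n');
    }
}
```

It would then be passed to StreamingFileSink.forRowFormat(...) in place of the lambda, avoiding the per-record PrintStream allocation.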


Re: Flink s3 streaming performance

2020-06-05 Thread Kostas Kloudas
Hi all,

@Venkata, do you have many small files being created, as Arvid suggested? If
so, then I tend to agree that S3 is probably not the best sink, although I
did not get that from your description.
In addition, instead of PrintStream you can have a look at the code of the
SimpleStringEncoder in Flink [1] for a slightly more efficient implementation.

Cheers,
Kostas

[1]
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/serialization/SimpleStringEncoder.java



Re: Flink s3 streaming performance

2020-06-05 Thread Arvid Heise
Hi Venkata,

are the many small files intended, or are they rather an issue of how we
commit on checkpointing? If so, then FLINK-11499 [1] should help you. The
design is close to done; unfortunately, the implementation will not make it
into 1.11.

In any case, I'd look at the parameter fs.s3a.connection.maximum, as you
store both state and data on S3. I'd probably go with slots * 3 or even higher.

Lastly, the way you output elements also looks a bit suspicious.
PrintStream is not known for great performance. I'm also surprised that it
works without manual flushing.

[1] https://issues.apache.org/jira/browse/FLINK-11499

-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Re: Flink s3 streaming performance

2020-06-01 Thread Jörn Franke
I think S3 is the wrong storage backend for this volume of small messages.
Try to use a NoSQL database, or write multiple messages into one file in S3
(1 or 10)

If you still want to go with your scenario, then try a network-optimized
instance, use s3a in Flink, and configure s3 entropy.
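
(For reference, the S3 entropy injection mentioned above is configured in flink-conf.yaml roughly as follows. The key name and length are the documented examples; the bucket path is a placeholder. Note that the Flink docs state entropy injection applies to checkpoint data files written through Flink's file systems.)

```yaml
# flink-conf.yaml -- entropy injection for S3 paths (illustrative)
s3.entropy.key: _entropy_    # marker that gets replaced by random characters
s3.entropy.length: 4         # number of random characters injected

# The marker must appear in the checkpoint path, e.g.:
# state.checkpoints.dir: s3://checkpoint_bucket/_entropy_/checkpoints
```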


Re: Flink s3 streaming performance

2020-05-31 Thread venkata sateesh` kolluru
Hi David,

The avg size of each file is around 30KB, and I have a checkpoint interval of
5 minutes. Some files are even 1KB; because of checkpointing, some files are
merged into one big file of around 300MB.

With 120 million files and 4TB, if the rate of transfer is 300 per minute,
it will take weeks to write to S3.

I have tried to increase the parallelism of the sink, but I don't see any
improvement.

The sink record is a Tuple3; the actual content of the file
is f2. This content is written to /f0/f1/part*-*

I guess the prefix determination in the CustomBucketAssigner won't be causing
this delay?

Could you please shed some light on writing a custom S3 sink?

Thanks



Re: Flink s3 streaming performance

2020-05-31 Thread David Magalhães
Hi Venkata.

300 requests per minute looks like about 200ms per request, which should be a
normal response time to send a file if there isn't any speed limitation
(how big are the files?).

Have you changed the parallelism to be higher than 1? I also recommend
limiting the source parallelism, because the source can consume pretty fast
from Kafka and create some kind of backpressure.

I don't have much experience with StreamingFileSink, because I've ended up
using a custom S3 sink, but I did have some issues writing to S3 because the
requests weren't parallelised. Check this thread,
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Parallisation-of-S3-write-sink-td34064.html#a34070
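
(A sketch of the parallelism advice above in the DataStream API: cap the source, give the sink more subtasks. The source and sink here are stand-ins, since the original job's Kafka consumer and StreamingFileSink setup are not shown; the parallelism values are illustrative.)

```java
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;

public class ParallelismSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the Kafka source; keep its parallelism low so it
        // cannot outrun the sink and build up backpressure.
        DataStream<Tuple3<String, String, String>> records = env
                .fromElements(Tuple3.of("prefix1", "prefix2", "payload"))
                .setParallelism(1);

        // Stand-in for the StreamingFileSink; give the sink more subtasks
        // so several part files can be written concurrently.
        records.addSink(new PrintSinkFunction<>())
                .setParallelism(4);

        env.execute("parallelism sketch");
    }
}
```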


Flink s3 streaming performance

2020-05-30 Thread venkata sateesh` kolluru
Hello,

I have posted the same question on Stack Overflow but didn't get any
response, so posting it here for help.

https://stackoverflow.com/questions/62068787/flink-s3-write-performance-optimization?noredirect=1#comment109814428_62068787

Details:

I am working on a Flink application on Kubernetes (EKS) which consumes data
from Kafka and writes it to S3.

We have around 120 million XML messages, 4TB in total, in Kafka. Consuming
from Kafka is super fast.

These are just string messages from Kafka.

There is high back pressure while writing to S3. We are not even hitting
the S3 PUT request limit, which is around 3500 requests/sec. I am seeing
only 300 writes per minute to S3, which is very slow.

I am using StreamingFileSink to write to S3 with the rolling policy set to
OnCheckpointRollingPolicy.

I am using flink-s3-fs-hadoop-*.jar and s3:// as the path (not s3a or s3p).

Other than this I don't have any config related to S3:

StreamingFileSink<Tuple3<String, String, String>> sink = StreamingFileSink
        .forRowFormat(new Path("s3://BUCKET"),
                (Tuple3<String, String, String> element,
                        OutputStream stream) -> {
                    PrintStream out = new PrintStream(stream);
                    out.println(element.f2);
                })
        // Determine component type for each record
        .withBucketAssigner(new CustomBucketAssigner())
        .withRollingPolicy(OnCheckpointRollingPolicy.build())
        .withBucketCheckInterval(TimeUnit.MINUTES.toMillis(1))
        .build();

Is there anything that we can optimize for S3 in StreamingFileSink or in
flink-conf.yaml?

Like using a bulk format, or any config params like fs.s3.maxThreads etc.

For checkpointing too I am using s3:// instead of s3p:// or s3a://

env.setStateBackend((StateBackend) new
        RocksDBStateBackend("s3://checkpoint_bucket", true));
env.enableCheckpointing(30);