Hi Prasanna,

From my experience, there is a ton of stuff which can slow down even a simple 
pipeline heavily. One thing that comes directly to my mind: "object reuse" is not 
enabled. Even if you have a very simple pipeline with just 2 map steps or so, 
this can lead to a ton of unnecessary deep copies and GC activity. 
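
A minimal sketch of what enabling it looks like in a DataStream job (the setup here is only illustrative):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Reuse objects between chained operators instead of deep-copying every record.
    // Only safe if your functions do not mutate or hold on to input records after emitting them.
    env.getConfig().enableObjectReuse();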

For a benchmark/idea of performance on a cloud VM setup, I would probably start 
with the highest-level API Flink offers, i.e. the Flink SQL CLI, since that is where 
the most automatic optimizations can be expected. With that baseline, you know 
you can tune your hand-written pipeline to be at least as fast. 

You probably also want to share the code of your POC so that others can check 
whether there are other potential problems. 

Best regards 
Theo 


Von: "Xintong Song" <tonysong...@gmail.com> 
An: "Prasanna kumar" <prasannakumarram...@gmail.com> 
CC: "user" <user@flink.apache.org> 
Gesendet: Freitag, 17. Juli 2020 05:41:23 
Betreff: Re: Performance test Flink vs Storm 



> From this exercise, I understand that increasing JVM memory would directly 
> support/increase throughput. Am I correct? 



It depends. Smaller heap space means more frequent GCs, which take up CPU 
processing time and also introduce more pauses in your program. If you already 
have a large enough heap, then you can hardly benefit from increasing it 
further. 

I'm not aware of any benchmark for Kafka connectors. You can check 
flink-benchmarks[1], and maybe fork the repository and develop your own Kafka 
connector benchmark based on it. 



Thank you~ 

Xintong Song 



[1] https://github.com/apache/flink-benchmarks 

On Fri, Jul 17, 2020 at 10:54 AM Prasanna kumar <prasannakumarram...@gmail.com> wrote: 


Hi, 

After setting taskmanager.memory.managed.fraction to 0, I see that the JVM heap 
memory increased from 512 MB to 1 GB. 

Earlier I was getting a maximum throughput of 4-6k per second on the Kafka source 
for an ingestion rate of 12k+/second. Now I see that improved to 11k per 
task (parallelism of 1) and 16.5k+ per second when run with parallelism of 2 (8.25k 
per task). 

The maximum memory used during the run was 500 MB of heap space. 

From this exercise, I understand that increasing JVM memory would directly 
support/increase throughput. Am I correct? 

Our goal is to test for 100k ingestion per second and then estimate the cost for 
1 million per second (hoping the relation is linear). 

I also saw CPU utilisation peak at 50% during the run. 

1) Let me know what you think, as I will continue testing. 

2) Is there a benchmark for the number of records handled per Kafka connector task 
for a given JVM heap size? 

Thanks, 
Prasanna 

On Fri 17 Jul, 2020, 06:18 Xintong Song, <tonysong...@gmail.com> wrote: 



> I had set checkpointing to use the JobManager backend. 

The JobManager state backend also runs in JVM heap space and does not use managed 
memory. Setting the managed memory fraction to 0 will give you a larger JVM heap, 
and thus less GC pressure. 
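
In a standalone setup that would be a flink-conf.yaml entry along these lines (a sketch, assuming a stateless job or a heap-based state backend):

    # give the otherwise unused managed memory back to the JVM heap
    taskmanager.memory.managed.fraction: 0.0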



Thank you~ 

Xintong Song 




On Thu, Jul 16, 2020 at 10:38 PM Prasanna kumar <prasannakumarram...@gmail.com> wrote: 



Xintong Song, 


    * Which version of Flink is used? 1.10 
    * Which deployment mode is used? Standalone 
    * Which cluster mode is used? Job 
    * Do you mean you have a 4core16gb node for each task manager, and each 
task manager has 4 slots? Yeah. There are 3 task managers in total in the 
cluster. 2 TMs are t2.medium machines (2 cores, 4 GB each), 1 slot per core. 
1 TM is a t2.large (4 cores, 16 GB) with 4 slots on the machine. There were other jobs 
running on the t2.medium TMs. The t2.large machine is where the performance-testing 
job was running. 
    * Sounds like you are running a streaming job without using any state. Have 
you tuned the managed memory fraction (`taskmanager.memory.managed.fraction`) 
to zero as suggested in the document[1]? No, I have not set 
taskmanager.memory.managed.fraction to 0. I had set checkpointing to use the 
JobManager backend. 
    * The maximum CPU spike I spotted was 40%. 

In between, I did a latest test only on the t2.medium machine with 2 slots per core: 
1 million records at a 10k/s ingestion rate. Parallelism was 1. 
I added a rebalance to the input stream, e.g. inputStream.rebalance().map(). 
I was able to get latency in the range of 130 ms - 2 sec. 
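
For reference, a sketch of where the rebalance sits in such a chain (the map function name is a placeholder):

    // rebalance() forces a round-robin network shuffle so the map subtasks
    // get an even share of records from the Kafka source subtasks
    DataStream<String> transformed = inputStream
        .rebalance()
        .map(new TransformFunction());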

Let me also know if there are more things to consider here. 

Thanks 
Prasanna. 

On Thu, Jul 16, 2020 at 4:04 PM Xintong Song <tonysong...@gmail.com> wrote: 


Hi Prasanna, 

Trying to understand how Flink is deployed. 


    * Which version of Flink is used? 
    * Which deployment mode is used? (Standalone/Kubernetes/Yarn/Mesos) 
    * Which cluster mode is used? (Job/Session) 
    * Do you mean you have a 4core16gb node for each task manager, and each 
task manager has 4 slots? 
    * Sounds like you are running a streaming job without using any state. Have 
you tuned the managed memory fraction (`taskmanager.memory.managed.fraction`) 
to zero as suggested in the document[1]? 

> When running a stateless job or using a heap state backend (MemoryStateBackend 
> or FsStateBackend), set managed memory to zero. 



I can see a few potential problems. 


    * Managed memory is probably not configured. That means a significant 
fraction of memory is unused. 
    * It sounds like CPU processing time is not the bottleneck. Thus 
increasing the parallelism will not give you better performance, but will instead 
increase the overhead on the task manager. 

I have also pulled in Becket Qin, who is the expert on the Kafka connectors, since 
you have observed a lack of performance in reading from Kafka compared to Storm. 



Thank you~ 

Xintong Song 



[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_tuning.html#heap-state-backend 

On Thu, Jul 16, 2020 at 10:35 AM Prasanna kumar <prasannakumarram...@gmail.com> wrote: 


Hi 

Sending this to you all separately as you answered one of my earlier queries. 

Thanks, 
Prasanna. 


---------- Forwarded message --------- 
From: Prasanna kumar <prasannakumarram...@gmail.com> 
Date: Wed 15 Jul, 2020, 23:27 
Subject: Performance test Flink vs Storm 
To: <d...@flink.apache.org>, user <user@flink.apache.org> 


Hi, 
We are testing Flink and Storm for our streaming pipelines on various features. 

In terms of latency, I see that Flink comes up short against Storm even when more CPU is 
given to it. I will explain in detail. 

Machine: a t2.large (4 cores, 16 GB) is used for the Flink task manager and the 
Storm supervisor node. 
Kafka partitions: 4 
Messages tested: 1 million 
Load: 50k/sec 

Scenario: 
Read from Kafka -> Transform (map to a different JSON format) -> Write to a 
Kafka topic. 
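
For reference, a minimal sketch of such a pipeline with the Flink 1.10-era Kafka connector API (broker address, topic names, group id, and the transform itself are placeholders, not our actual job):

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

    public class KafkaReadTransformWrite {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
            props.setProperty("group.id", "flink-vs-storm-test");     // placeholder group id

            env.addSource(new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props))
               // map each record to the target JSON format (the real mapping is job-specific)
               .map(KafkaReadTransformWrite::transform)
               .addSink(new FlinkKafkaProducer<>("output-topic", new SimpleStringSchema(), props));

            env.execute("kafka-read-transform-write");
        }

        // placeholder for the actual JSON re-mapping logic
        private static String transform(String json) {
            return json;
        }
    }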

Test 1 
Storm: parallelism is set to 1. There are four processes: 1 spout (read from 
Kafka) and 3 bolts (transformation and sink). 
Flink: operator-level parallelism not set, task parallelism set to 1, 1 task 
slot per core. 

Storm was 130 milliseconds faster for the 1st record. 
Storm was 20 seconds faster for the 1 millionth record. 

Test 2 
Storm: parallelism is set to 1. There are four processes: 1 spout (read from 
Kafka) and 3 bolts (transformation and sink). 
Flink: operator-level parallelism not set, task parallelism set to 4, 1 task 
slot per core, so all cores are used. 

Storm was 180 milliseconds faster for the 1st record. 
Storm was 25 seconds faster for the 1 millionth record. 

Observations: 
1) Increasing parallelism did not increase performance in Flink; rather, it 
became 50 ms to 5 s slower. 
2) Flink is slower at reading from Kafka compared to Storm. That is where the 
bulk of the latency is; for the millionth record it is 19-24 seconds slower. 
3) Once a message is read, Flink takes less time than Storm to transform it and 
write to Kafka. 

Other Flink config: 
jobmanager.heap.size: 1024m 
taskmanager.memory.process.size: 1568m 

How do we improve the latency? 
Why does latency become worse when parallelism is increased and matched to the 
number of partitions? 

Thanks, 
Prasanna. 
