Xintong Song,

   - Which version of Flink is used? *1.10*
   - Which deployment mode is used? *Standalone*
   - Which cluster mode is used? *Job*
   - Do you mean you have a 4core16gb node for each task manager, and each
   task manager has 4 slots? *Yes. There are 3 task managers in total in the
   cluster. 2 TMs run on t2.medium machines (2 cores, 4 GB per machine) with
   1 slot per core. 1 TM runs on a t2.large machine (4 cores, 16 GB) with 4
   slots. Other jobs were running on the t2.medium TMs; the performance test
   job ran on the t2.large machine.*
   - Sounds like you are running a streaming job without using any state.
   Have you tuned the managed memory fraction
   (`taskmanager.memory.managed.fraction`) to zero as suggested in the
   document[1]? *No, I have not set `taskmanager.memory.managed.fraction`
   to 0. I had set checkpointing to use the JobManager state backend
   (MemoryStateBackend).*
   - *The maximum CPU spike I observed was 40%.*
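
If I understand the suggestion in [1] correctly, the change for my next run would be a flink-conf.yaml fragment roughly like the one below. This is a sketch I have not tested yet. The two memory sizes are copied from my original mail. Putting `state.backend: jobmanager` in the config file, instead of setting the backend in the job code, is my own assumption.

```yaml
# Values copied from the original test setup
jobmanager.heap.size: 1024m
taskmanager.memory.process.size: 1568m

# Assumption: JobManager (MemoryStateBackend) state backend configured here
# rather than in the job code
state.backend: jobmanager

# Stateless job with a heap-based state backend: reserve no managed memory.
# With only the process size configured, the memory no longer reserved for
# managed memory is left for the task heap.
taskmanager.memory.managed.fraction: 0
```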

*By the way, I ran another test using only the t2.medium machine, with 2 slots
per core: 1 million records at an ingestion rate of 10k/s, with parallelism 1.*
*I added a rebalance to the input stream, e.g.* `inputStream.rebalance().map(...)`
*With that, latency was in the range of 130 ms to 2 s.*

Please let me know if there is anything else I should consider.

Thanks
Prasanna.

On Thu, Jul 16, 2020 at 4:04 PM Xintong Song <tonysong...@gmail.com> wrote:

> Hi Prasanna,
>
> Trying to understand how Flink is deployed.
>
>    - Which version of Flink is used?
>    - Which deployment mode is used? (Standalone/Kubernetes/Yarn/Mesos)
>    - Which cluster mode is used? (Job/Session)
>    - Do you mean you have a 4core16gb node for each task manager, and
>    each task manager has 4 slots?
>    - Sounds like you are running a streaming job without using any state.
>    Have you tuned the managed memory fraction
>    (`taskmanager.memory.managed.fraction`) to zero as suggested in the
>    document[1]?
>
>> When running a stateless job or using a heap state backend
>> (MemoryStateBackend or FsStateBackend), set managed memory to zero.
>>
>
> I can see a few potential problems.
>
>    - Managed memory is probably not configured. That means a significant
>    fraction of memory is unused.
>    - It sounds like CPU processing time is not the bottleneck. Thus,
>    increasing the parallelism will not give you better performance;
>    instead, it will increase the overhead on the task manager.
>
> I have also pulled in Becket Qin, who is an expert on the Kafka connectors,
> since you have observed worse performance reading from Kafka compared to
> Storm.
>
> Thank you~
>
> Xintong Song
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_tuning.html#heap-state-backend
>
> On Thu, Jul 16, 2020 at 10:35 AM Prasanna kumar <
> prasannakumarram...@gmail.com> wrote:
>
>> Hi
>>
>> Sending this to you all separately, as you answered one of my earlier queries.
>>
>> Thanks,
>> Prasanna.
>>
>>
>> ---------- Forwarded message ---------
>> From: Prasanna kumar <prasannakumarram...@gmail.com>
>> Date: Wed 15 Jul, 2020, 23:27
>> Subject: Performance test Flink vs Storm
>> To: <d...@flink.apache.org>, user <user@flink.apache.org>
>>
>>
>> Hi,
>>
>> We are comparing Flink and Storm on various features for our streaming
>> pipelines.
>>
>> In terms of latency, I see Flink falling short of Storm even when more CPU
>> is given to it. I will explain in detail.
>>
>> *Machine:* t2.large (4 cores, 16 GB), used for both the Flink task
>> manager and the Storm supervisor node.
>> *Kafka partitions:* 4
>> *Messages tested:* 1 million
>> *Load:* 50k/sec
>>
>> *Scenario:*
>> Read from Kafka -> Transform (map to a different JSON format) -> Write
>> to a Kafka topic.
>>
>> *Test 1*
>> Storm: parallelism is set to 1. There are four processes: 1 spout (reading
>> from Kafka) and 3 bolts (transformation and sink).
>> Flink: operator-level parallelism is not set. Task parallelism is set to 1,
>> with 1 task slot per core.
>>
>> Storm was 130 milliseconds faster for the 1st record.
>> Storm was 20 seconds faster for the 1 millionth record.
>>
>> *Test 2*
>> Storm: parallelism is set to 1. There are four processes: 1 spout (reading
>> from Kafka) and 3 bolts (transformation and sink).
>> Flink: operator-level parallelism is not set. Task parallelism is set to 4,
>> with 1 task slot per core, so all cores are used.
>>
>> Storm was 180 milliseconds faster for the 1st record.
>> Storm was 25 seconds faster for the 1 millionth record.
>>
>> *Observations*
>> 1) Increasing parallelism did not improve Flink's performance; rather, it
>> became 50 ms to 5 s slower.
>> 2) Flink is slower than Storm at reading from Kafka. That is where the bulk
>> of the latency is: for the millionth record, Flink is 19-24 seconds slower.
>> 3) Once a message is read, Flink takes less time than Storm to transform it
>> and write it to Kafka.
>>
>> *Other Flink Config*
>> jobmanager.heap.size: 1024m
>>
>> taskmanager.memory.process.size: 1568m
>>
>> *How do we improve the latency?*
>> *Why does latency become worse when parallelism is increased to match the
>> number of partitions?*
>>
>> Thanks,
>> Prasanna.
>>
>
