Hey Xintong, thanks for the explanations.

For the first part, can I confirm whether some of my understandings are correct 
here:

For Akka direct memory: it's part of framework.off-heap? And since we use 
FlinkKafkaConsumers and FlinkKafkaProducers in our pipeline, which use Netty 
internally, we need to set a non-zero task.off-heap?

Thanks!
________________________________
From: Xintong Song <tonysong...@gmail.com>
Sent: Wednesday, April 29, 2020 10:53 PM
To: Jiahui Jiang <qzhzm173...@hotmail.com>
Cc: Steven Wu <stevenz...@gmail.com>; user@flink.apache.org 
<user@flink.apache.org>
Subject: Re: Configuring taskmanager.memory.task.off-heap.size in Flink 1.10

Hi Jiahui,

I'd like to clarify a bit more.

> I think in our case, it was actually the network buffer size that was too
> large (we were seeing an Akka exception), which happened to be fixed by
> increasing task.off-heap.size since that just makes the direct memory larger.
Please be aware that 'taskmanager.memory.network.*' only accounts for the 
network buffer pool used for Flink's data exchange between tasks. There are 
other direct memory footprints accounted for by task/framework off-heap memory, 
including some Akka/Netty direct memory. The network buffer pool has a fixed 
size: it allocates all of the configured memory at initialization and is 
guaranteed not to exceed the configured value. In your case, increasing the 
configured network memory size should not help, because all of the increased 
direct memory limit would be taken by the network buffer pool, leaving the same 
amount for the other direct memory consumers.
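In other words, a configuration along these lines (values are illustrative, not 
recommendations) only grows the buffer pool itself, not the headroom left for 
other direct memory users:

```yaml
# flink-conf.yaml (sketch): these options bound the network buffer pool
# only. The pool is allocated in full at TaskManager startup and never
# exceeds its configured size, so raising it does not leave more direct
# memory for Akka/Netty or user code.
taskmanager.memory.network.fraction: 0.1
taskmanager.memory.network.min: 64mb
taskmanager.memory.network.max: 1gb
```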

> tweaking jvm-overhead should be a much more common thing to do!
This is not always true. When talking about "off-heap memory", there are 
actually three categories: direct memory, metaspace, and native memory. Direct 
memory is controlled by the JVM parameter '-XX:MaxDirectMemorySize', which is 
set to the sum of Flink's framework/task off-heap memory and network memory. 
Metaspace is controlled by the JVM parameter '-XX:MaxMetaspaceSize', which is 
set to 'taskmanager.memory.metaspace.size'. Native memory is not controlled by 
the JVM. In Flink, managed memory and jvm-overhead use native memory. That 
means if you see a JVM OOM, increasing jvm-overhead should not help.
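To make the direct memory category concrete, here is a minimal standalone Java 
sketch (not Flink code; the class name is made up for illustration). Direct 
buffers allocated this way are exactly the kind of allocation bounded by 
'-XX:MaxDirectMemorySize', which Flink derives from framework/task off-heap 
plus network memory:

```java
import java.nio.ByteBuffer;

public class DirectMemoryDemo {
    // Allocates a buffer outside the JVM heap. The total size of live
    // direct buffers is bounded by -XX:MaxDirectMemorySize; exceeding it
    // fails with "java.lang.OutOfMemoryError: Direct buffer memory" --
    // a JVM OOM that more jvm-overhead cannot fix, but more
    // task.off-heap.size can.
    public static ByteBuffer allocate(int bytes) {
        return ByteBuffer.allocateDirect(bytes);
    }

    public static void main(String[] args) {
        ByteBuffer buf = allocate(16 * 1024 * 1024);
        // isDirect() confirms the buffer lives in off-heap direct memory.
        System.out.println(buf.isDirect()); // prints "true"
    }
}
```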


Thank you~

Xintong Song


On Thu, Apr 30, 2020 at 11:06 AM Jiahui Jiang <qzhzm173...@hotmail.com> wrote:
Hey Xintong, Steven, thanks for replies!

@Steven Wu thanks for the link! I didn't realize that even though the different 
direct memory components can be configured separately, it's only their sum that 
is used to set the JVM parameter. I think in our case, it was actually the 
network buffer size that was too large (we were seeing an Akka exception), 
which happened to be fixed by increasing task.off-heap.size since that just 
makes the direct memory larger. But I agree that most of the time we shouldn't 
need to change this value at all, and tweaking jvm-overhead should be a much 
more common thing to do!

I will test out some more pipelines with different resource requirements to 
understand the memory profiles better! Thank you guys again!

________________________________
From: Steven Wu <stevenz...@gmail.com>
Sent: Wednesday, April 29, 2020 10:12 AM
To: Xintong Song <tonysong...@gmail.com>
Cc: Jiahui Jiang <qzhzm173...@hotmail.com>; user@flink.apache.org 
<user@flink.apache.org>
Subject: Re: Configuring taskmanager.memory.task.off-heap.size in Flink 1.10

Jiahui,

Based on my reading of the docs, for a containerized environment it is probably 
better to set `taskmanager.memory.process.size` to the container memory limit.
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#taskmanager-memory-process-size

Then I typically set `taskmanager.memory.jvm-overhead.max` to allocate some 
overhead to non-Flink memory. I think that matches the intention better than 
'taskmanager.memory.task.off-heap.size', which is used for calculating the JVM 
direct memory size.
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#taskmanager-memory-jvm-overhead-max
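A container-oriented setup along those lines might look like this in 
flink-conf.yaml (the sizes are illustrative assumptions, not recommendations):

```yaml
# Match the container memory limit and let Flink derive all the
# internal components (heap, managed, network, metaspace, ...) from it.
taskmanager.memory.process.size: 4g

# Headroom for non-Flink native memory: JVM thread stacks, GC
# structures, native libraries loaded by user code, etc.
taskmanager.memory.jvm-overhead.min: 256m
taskmanager.memory.jvm-overhead.max: 512m
```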

I also found this Flink doc pretty helpful
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_detail.html

Hope that helps.

Thanks,
Steven

On Tue, Apr 28, 2020 at 8:56 PM Xintong Song <tonysong...@gmail.com> wrote:
Hi Jiahui,

'taskmanager.memory.task.off-heap.size' accounts for the off-heap memory 
reserved for your job / operators. There are other configuration options 
accounting for off-heap memory used for other purposes, e.g., 
'taskmanager.memory.framework.off-heap.size'. The default 'task.off-heap.size' 
of 0 only reflects that in most cases user code / operators do not use off-heap 
memory. Users need to explicitly increase this configuration if the job's UDFs 
or libraries use off-heap memory.
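For example, if user code or a connector library allocates direct memory, one 
might reserve some in flink-conf.yaml (the size here is an illustrative 
assumption):

```yaml
# Direct/native memory reserved for user code (UDFs, client libraries).
# This counts toward -XX:MaxDirectMemorySize together with framework
# off-heap and network memory.
taskmanager.memory.task.off-heap.size: 128m
```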


Thank you~

Xintong Song


On Wed, Apr 29, 2020 at 11:07 AM Jiahui Jiang <qzhzm173...@hotmail.com> wrote:
Hello! We are migrating our pipeline from Flink 1.8 to 1.10 in Kubernetes.

On the first try, we simply copied the old 'taskmanager.heap.size' value over 
to 'taskmanager.memory.flink.size'. This caused the cluster to OOM. Eventually 
we had to allocate a small amount of memory to 
'taskmanager.memory.task.off-heap.size' for it to stop failing. But we don't 
quite understand why this needs to be overridden.

I saw that the default for 'taskmanager.memory.task.off-heap.size' is 0. Does 
that mean in most cases task managers won't need off-heap memory? What are some 
cases where off-heap memory needs to be non-zero?

Thank you!
