Hi Shubham,

Java heap memory cannot cause the container to exceed its memory limit. Heap
memory is strictly bounded by the JVM `-Xmx` parameter. If the program needs
more memory than that limit, it will run into a heap space OOM rather than
implicitly using more memory than the limit.
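
To illustrate (a minimal standalone sketch, not part of your job; the class
name and the 256m cap are made up for the example), a hard-capped heap fails
fast with an OutOfMemoryError instead of silently growing the process:

    // Run with e.g. -Xmx256m: once the heap is full the JVM throws
    // java.lang.OutOfMemoryError: Java heap space, instead of letting the
    // process footprint creep past the cap.
    import java.util.ArrayList;
    import java.util.List;

    public class HeapLimitDemo {
        public static void main(String[] args) {
            List<byte[]> chunks = new ArrayList<>();
            while (true) {
                chunks.add(new byte[1024 * 1024]); // allocate 1 MB per loop
            }
        }
    }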

There are several reasons that might lead to the container exceeding its
memory limit:
- RocksDB, whose memory control is based on estimation rather than a hard
limit. This is one of the most common causes of such memory overruns.
However, the extra memory used by RocksDB, if any, usually should not be too
large. Given that your container size is 12GB and Flink only plans to use
10GB, I'm not sure whether RocksDB is the cause in your case. I've CC'ed @Yun
Tang, who is an expert on Flink's RocksDB state backend.
- Does your job use mmap memory? MMap memory, if used, is controlled by the
operating system rather than Flink. Depending on your Yarn cgroup
configuration, some clusters also count it as part of the container's memory
consumption. (See the sketch after this list for one way to check the JVM's
direct and mapped buffer usage.)
- Native memory leaks in user code dependencies and libraries could also
lead to the container exceeding its memory limit.
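
If it helps, here is a small sketch using plain JDK MXBeans (nothing
Flink-specific; the class name is made up) that reports how much direct and
mapped (mmap) buffer memory the JVM has currently allocated:

    import java.lang.management.BufferPoolMXBean;
    import java.lang.management.ManagementFactory;

    public class BufferPoolReport {
        public static void main(String[] args) {
            // "direct" covers ByteBuffer.allocateDirect allocations; "mapped"
            // covers FileChannel.map (mmap) regions created by this JVM.
            // Memory mapped by native libraries will not show up here.
            for (BufferPoolMXBean pool :
                    ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
                System.out.printf("%s: count=%d, used=%d bytes, capacity=%d bytes%n",
                        pool.getName(), pool.getCount(),
                        pool.getMemoryUsed(), pool.getTotalCapacity());
            }
        }
    }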

Another suggestion: do not trust Flink's "Non-Heap" metrics. They are
practically unhelpful and misleading. "Non-Heap" accounts for SOME of the
non-heap memory usage, but NOT ALL of it. The community is working on a new
set of metrics and a new Web UI for task manager memory tuning.
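
To make that concrete, my understanding is that this metric only covers the
JVM-managed non-heap pools (metaspace, code cache, compressed class space);
native allocations such as RocksDB's block cache or mmap regions never show
up there. A small sketch (class name made up) that lists exactly what the JVM
itself tracks:

    import java.lang.management.ManagementFactory;
    import java.lang.management.MemoryPoolMXBean;
    import java.lang.management.MemoryType;

    public class NonHeapPools {
        public static void main(String[] args) {
            // Lists the JVM-managed non-heap pools. RocksDB's native memory,
            // direct buffers and mmap regions do not appear in this view.
            for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
                if (pool.getType() == MemoryType.NON_HEAP) {
                    System.out.printf("%s: used=%d bytes%n",
                            pool.getName(), pool.getUsage().getUsed());
                }
            }
        }
    }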

Thank you~

Xintong Song



On Sun, Sep 20, 2020 at 12:10 AM Shubham Kumar <shubhamkumar1...@gmail.com>
wrote:

> Hey everyone,
>
> We had deployed a streaming job using Flink 1.10.1 one month back and now
> we are encountering a Yarn container killed due to memory issues very
> frequently. I am trying to figure out the root cause of this issue in order
> to fix it.
>
> We have a streaming job whose basic structure looks like this:
> - Read 6 kafka streams and combine stats from them (union) to form a
> single stream
> - stream.keyBy(MyKey)
>              .window(TumblingEventTimeWindows.of(Time.minutes(1)))
>              .reduce(MyReduceFunction)
>              .addSink(new FlinkKafkaProducer011<>...);
>
> We are using RocksDB as state backend. In flink-conf.yaml, we used
> taskmanager.memory.process.size = 10GB with a parallelism of 12 and only
> one slot per task manager.
>
> So, a taskmanager process gets started with the following memory
> components as indicated in logs:
>
> TaskExecutor container... will be started on ... with
>> TaskExecutorProcessSpec {cpuCores=1.0, frameworkHeapSize=128.000mb (
>> 134217728 bytes), frameworkOffHeapSize=128.000mb (134217728 bytes),
>> taskHeapSize=4.125gb (4429184954 bytes), taskOffHeapSize=0 bytes,
>> networkMemSize=896.000mb (939524110 bytes), managedMemorySize=3.500gb (
>> 3758096440 bytes), jvmMetaspaceSize=256.000mb (268435456 bytes),
>> jvmOverheadSize=1024.000mb (1073741824 bytes)}.
>
>  which are as per defaults.
>
> Now, after 25 days we started encountering the following yarn container
> kill error:
>
>> Association with remote system [akka.tcp://flink@...] has failed,
>> address is now gated for [50] ms. Reason: [Association failed with
>> [akka.tcp://flink@...]] Caused by: [java.net.ConnectException: Connection
>> refused: .../...:37679]
>> 2020-09-09 00:53:24 INFO Closing TaskExecutor connection
>> container_e193_1592804717489_149347_01_000011 because: [2020-09-09 00:53:
>> 21.417]Container 
>> [pid=44371,containerID=container_e193_1592804717489_149347_01_000011]
>> is running beyond physical memory limits. Current usage: 12.0 GB of 12 GB
>> physical memory used; 14.4 GB of 25.2 GB virtual memory used. Killing
>> container.
>>
>
> Yarn container size is 12GB as it is only allowed as a multiple of 3 GB
> (as per our settings).
>
> Now, when YARN reallocates a new container, the program starts again
> (without any issues), and after a few hours another container is killed
> with the same error, and the cycle repeats.
> At this point, I want to debug it as a running process rather than changing
> or playing around with various memory config options, because I don't want
> to wait ~1 month just to reproduce the error.
>
> I have tried to figure out something from Graphite metrics (see
> attachments):
> [1]: JVM Heap Memory (First 25 days) -> The memory goes up and, after
> reaching a point, goes down and starts going up again. (No container kills
> were encountered until 09/09/2020; the program started on 14/08/2020)
> [2]: JVM Heap Memory (Recent) -> The memory is still going up, but it seems
> it doesn't even reach its peak; instead, the container is killed before
> that (within a few hours)
>
> From [1] and [2], I think the JVM heap memory should not keep rising, but
> if JVM heap memory were the issue causing the kills, that doesn't explain
> the container kill in case [2].
>
> [3]: Direct Memory and Off-heap Memory -> I don't think this is causing
> the issue, as most of the network buffers are free and off-heap memory is
> well below the threshold.
>
> At this point I thought RocksDB might be the culprit. I am aware that it
> uses the managed memory limit (I haven't changed any default config), which
> is completely off-heap. But when I check the RocksDB size maintained at
> this location:
>
>
>> /data_4/yarn-nm-local-dir/usercache/root/appcache/application_.../flink-io-a48d1127-58a1-41c5-a5f0-32c5180fe74d/job_0bff1881431b5774c3b496a98febed1a_op_WindowOperator_4061fbe16fb95459a1a8d207644e2e63__4_12__uuid_9fe0b2ff-24bc-4301-8044-3fe8e1b3a3a0/db/
>
>
> It is only 17MB, which doesn't seem like much. I also took a heap dump
> of the org.apache.flink.yarn.YarnTaskExecutorRunner process, but it shows
> only 30MB of data being used (not sure what I am missing here, as it
> doesn't match the metrics shown by Flink).
>
> However, top -p 'pid' (for the task manager process) does show RES = 10-12
> GB for every container, constantly going up until the container eventually
> dies.
>
> Has anyone encountered a similar situation, or do you have guidelines I can
> follow to figure out and debug the issue? Let me know if there is anything
> else you want to know.
>
> --
> Thanks & Regards
>
> Shubham Kumar
>
>
