Hi Vishwas,

I believe the screenshots are from a heap size of 1GB?

There are indeed many internal Flink state objects. They are the overhead
Flink needs to organise and track the state on-heap.
Depending on the actual size of your state objects, the overhead may be
relatively large compared to the actual state size.
For example, if you only keep integers in your state, the overhead is
probably a couple of times larger than the data itself.
It is not easy to estimate the exact on-heap size without a thorough analysis.

The checkpoint has little overhead and includes only the actual state data:
your serialized state objects, which are probably smaller than their heap
representation.

So my guess is that the heap representation of the state is much bigger
than the checkpoint size.

I am also cc'ing other people who might add more thoughts about on-heap state size.

You could also provide GC logs as Xintong suggested.
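Even before full GC logs are available, the counters in the quoted "Garbage collector stats" log line say a lot. The minimal Java sketch below uses only the standard java.lang.management API to read cumulative time and count per collector (presumably the same kind of counters the TaskManager's memory logging prints) and derives the average time per collection. The class and helper names are hypothetical. Applied to the quoted log, PS MarkSweep spent 8720945 ms over 265 collections, roughly 32.9 s per full collection on average; that is an average, not a per-pause maximum, but pauses of that order could plausibly explain the 10000 ms timeout mentioned in the thread.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

// Hypothetical helper; reads standard JVM GC counters via JMX.
public class GcStats {
    /** Average milliseconds per collection, or 0 if the counters are unavailable or zero. */
    static double averagePauseMs(long totalTimeMs, long count) {
        if (count <= 0 || totalTimeMs < 0) {
            return 0.0;
        }
        return (double) totalTimeMs / count;
    }

    public static void main(String[] args) {
        // Cumulative counters for each collector in this JVM (e.g. PS Scavenge / PS MarkSweep).
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long time = gc.getCollectionTime();   // cumulative ms, -1 if unsupported
            long count = gc.getCollectionCount(); // cumulative count, -1 if unsupported
            System.out.printf("%s: count=%d, time=%d ms, avg=%.1f ms%n",
                    gc.getName(), count, time, averagePauseMs(time, count));
        }
        // Numbers copied from the quoted TaskManager log: PS MarkSweep, 8720945 ms over 265 collections.
        System.out.printf("PS MarkSweep avg from log: %.0f ms%n", averagePauseMs(8_720_945L, 265L));
    }
}
```

For the detailed picture, the usual Java 8 HotSpot flags such as -XX:+PrintGCDetails and -Xloggc:<file> can be passed to the TaskManager JVM, for example via Flink's env.java.opts setting.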

Best,
Andrey

On Wed, Aug 26, 2020 at 4:21 PM Vishwas Siravara <vsirav...@gmail.com>
wrote:

> Hi Andrey and Xintong. The 2.5 GB figure is from the Flink web UI (checkpoint
> size). I took a heap dump and could not find any memory leak in user code. I
> see similar behaviour with a smaller heap size: on a 1 GB heap, the state
> size from the checkpoint UI is 180 MB. I am attaching some screenshots of heap
> profiles in case they help. When the state grows, GC takes a long time and
> sometimes the job manager removes the TM slot because of the 10000 ms timeout
> and tries to restore the task on another task manager. This creates a cascading
> effect and affects other jobs running on the cluster. My tests were run on
> a single-node cluster with 1 TM and 4 task slots with a parallelism of 4.
>
> Best,
> Vishwas
>
> On Tue, Aug 25, 2020 at 10:02 AM Andrey Zagrebin <azagre...@apache.org>
> wrote:
>
>> Hi Vishwas,
>>
>> If you use Flink 1.7, check the older memory model docs [1] because you
>> referred to the new memory model of Flink 1.10 in your reference 2.
>> Could you also share a screenshot where you get the state size of 2.5 GB?
>> Do you mean the Flink WebUI?
>> Generally, it is quite hard to estimate the on-heap size of state Java
>> objects. I have never heard of such a Flink metric.
>>
>> Best,
>> Andrey
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/mem_setup.html
>>
>> On Mon, Aug 24, 2020 at 4:05 AM Xintong Song <tonysong...@gmail.com>
>> wrote:
>>
>>> Hi Vishwas,
>>>
>>> According to the log, heap space is 13+GB, which looks fine.
>>>
>>> Several reasons might lead to the heap space OOM:
>>>
>>>    - Memory leak
>>>    - Not enough GC threads
>>>    - Concurrent GC starts too late
>>>    - ...
>>>
>>> I would suggest taking a look at the GC logs.
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>> On Fri, Aug 21, 2020 at 10:34 PM Vishwas Siravara <vsirav...@gmail.com>
>>> wrote:
>>>
>>>> Hi guys,
>>>> I use Flink version 1.7.2.
>>>> I have a stateful streaming job which uses a keyed process function. I
>>>> use the heap state backend. Although I set the TM heap size to 16 GB, I get an
>>>> OOM error when the state size is around 2.5 GB (I get the state size from the
>>>> dashboard). I have set taskmanager.memory.fraction: 0.01 (which I believe is for
>>>> native calls off heap) [1]. For an 8 GB TM heap setting, the OOM errors
>>>> start showing up when the state size reaches 1 GB. I find this puzzling
>>>> because I would expect to get a lot more space on the heap for state when I
>>>> change the size to 16 GB. What fraction of the heap is used by the
>>>> framework [2]? Below is the stack trace for the exception. How can I
>>>> increase my state size on the heap?
>>>>
>>>> 2020-08-21 02:05:54,443 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       - Memory usage stats: [HEAP: 11920/13653/13653 MB, NON HEAP: 130/154/-1 MB (used/committed/max)]
>>>> 2020-08-21 02:05:54,444 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       - Direct memory stats: Count: 32796, Total Capacity: 1074692520, Used Memory: 1074692521
>>>> 2020-08-21 02:05:54,444 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       - Off-heap pool stats: [Code Cache: 51/55/240 MB (used/committed/max)], [Metaspace: 70/88/-1 MB (used/committed/max)], [Compressed Class Space: 8/11/1024 MB (used/committed/max)]
>>>> 2020-08-21 02:05:54,444 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       - Garbage collector stats: [PS Scavenge, GC TIME (ms): 481035, GC COUNT: 1770], [PS MarkSweep, GC TIME (ms): 8720945, GC COUNT: 265]
>>>> 2020-08-21 02:05:54,446 INFO  org.apache.flink.runtime.taskmanager.Task                     - KeyedProcess (1/4) (23946753549293edc23e88f257980cb4) switched from RUNNING to FAILED.
>>>> java.lang.OutOfMemoryError: Java heap space
>>>>         at java.lang.reflect.Array.newInstance(Array.java:75)
>>>>         at java.util.Arrays.copyOf(Arrays.java:3212)
>>>>         at java.util.Arrays.copyOf(Arrays.java:3181)
>>>>         at org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.resizeQueueArray(AbstractHeapPriorityQueue.java:153)
>>>>         at org.apache.flink.runtime.state.heap.HeapPriorityQueue.increaseSizeByOne(HeapPriorityQueue.java:172)
>>>>         at org.apache.flink.runtime.state.heap.HeapPriorityQueue.addInternal(HeapPriorityQueue.java:83)
>>>>         at org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.add(AbstractHeapPriorityQueue.java:73)
>>>>         at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:121)
>>>>         at org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper.lambda$keyGroupReader$0(HeapPriorityQueueSnapshotRestoreWrapper.java:85)
>>>>         at org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper$$Lambda$229/674995813.consume(Unknown Source)
>>>>         at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:298)
>>>>         at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readKeyGroupStateData(HeapKeyedStateBackend.java:492)
>>>>         at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateHandleStateData(HeapKeyedStateBackend.java:453)
>>>>         at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:410)
>>>>         at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:358)
>>>>         at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:104)
>>>>         at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
>>>>         at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
>>>>         at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:284)
>>>>         at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
>>>>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>>>         at java.lang.Thread.run(Thread.java:748)
>>>>
>>>>
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#the-fsstatebackend
>>>> [2]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_detail.html#overview
>>>>
>>>> Best,
>>>> Vishwas
>>>>
>>>