Hi Vishwas,

> is this quantifiable with respect to JVM heap size on a single node
> without the node being used for other tasks?


I don't quite understand this question. I believe the recommendation in
the docs has the same rationale: the heap backend pays off with larger state
objects, because the fixed Java object overhead then becomes small relative
to the actual payload.
RocksDB keeps state in serialized form, both in memory and on disk, and
therefore usually has a smaller footprint.
Other jobs in the same task manager can use a different state backend,
depending on their state requirements.
All tasks in the same task manager share the JVM heap, because the task
manager runs as a single JVM process on the machine it is deployed to.
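For illustration, here is a minimal sketch (not from the docs; the checkpoint
URI is a placeholder and the flink-statebackend-rocksdb dependency is assumed
to be on the classpath) of how one job can opt into RocksDB while other jobs
on the same task manager keep the heap backend:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbBackendSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // RocksDB stores keyed state serialized in native memory and on local
        // disk, so large state does not compete for the shared JVM heap.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
        // ... add sources, the keyed process function and sinks, then call
        // env.execute("job-name");
    }
}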

Best,
Andrey

On Wed, Aug 26, 2020 at 6:52 PM Vishwas Siravara <vsirav...@gmail.com>
wrote:

> Hi Andrey,
> Thanks for getting back to me so quickly. The screenshots are for a 1 GB
> heap; the keys for the state are 20-character strings (20 bytes, we don't
> have multi-byte characters). So the overhead seems to be quite large (4x)
> even in comparison to the checkpoint size (which already adds an overhead).
> In this document [1] it says to use the FS/Heap backend for large state; is this
> quantifiable with respect to JVM heap size on a single node without the
> node being used for other tasks?
> I have attached the GC logs for the TM and JM.
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#the-fsstatebackend
>
> Best,
> Vishwas
>
> On Wed, Aug 26, 2020 at 11:29 AM Andrey Zagrebin <azagre...@apache.org>
> wrote:
>
>> Hi Vishwas,
>>
>> I believe the screenshots are from a heap size of 1 GB?
>>
>> There are indeed many internal Flink state objects. They are overhead
>> that Flink needs in order to organise and track the state on-heap.
>> Depending on the actual size of your state objects, the overhead may be
>> relatively large or small compared to the actual state size.
>> For example, if you just keep integers in your state, the overhead is
>> probably a couple of times larger than the data itself.
>> It is not easy to estimate the exact on-heap size without thorough analysis.
>>
>> The checkpoint has little overhead and includes only actual state data -
>> your serialized state objects which are probably smaller than their heap
>> representation.
>>
>> So my guess is that the heap representation of the state is much bigger
>> compared to the checkpoint size.
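>> As a very rough, hypothetical illustration (assuming a 64-bit JVM with
>> compressed oops; the byte counts are typical estimates, not measurements):
>>
>> public class HeapOverheadEstimate {
>>     public static void main(String[] args) {
>>         long boxedInteger = 16;   // object header (~12 bytes) + 4-byte int, padded
>>         long mapEntryNode = 32;   // per-entry node: header + hash + key/value/next refs
>>         long serializedInt = 4;   // the same int in a checkpoint is just 4 bytes
>>         System.out.println("approx. heap bytes per entry: " + (boxedInteger + mapEntryNode));
>>         System.out.println("approx. checkpoint bytes per entry: " + serializedInt);
>>     }
>> }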
>>
>> I also cc other people who might add more thoughts about on-heap state
>> size.
>>
>> You could also provide GC logs as Xintong suggested.
>>
>> Best,
>> Andrey
>>
>> On Wed, Aug 26, 2020 at 4:21 PM Vishwas Siravara <vsirav...@gmail.com>
>> wrote:
>>
>>> Hi Andrey and Xintong. The 2.5 GB is from the Flink web UI
>>> (checkpoint size). I took a heap dump and could not find any memory leak
>>> in user code. I see similar behaviour with a smaller heap size: on a 1 GB
>>> heap, the state size from the checkpoint UI is 180 MB. Attaching some
>>> screenshots of heap profiles in case they help. When the state grows, GC takes
>>> a long time and sometimes the job manager removes the TM slot because of the
>>> 10000 ms timeout and tries to restore the task on another task manager, which
>>> creates a cascading effect and affects other jobs running on the cluster.
>>> My tests were run on a single-node cluster with 1 TM and 4 task slots, with
>>> a parallelism of 4.
>>>
>>> Best,
>>> Vishwas
>>>
>>> On Tue, Aug 25, 2020 at 10:02 AM Andrey Zagrebin <azagre...@apache.org>
>>> wrote:
>>>
>>>> Hi Vishwas,
>>>>
>>>> If you use Flink 1.7, check the older memory model docs [1], because you
>>>> referred to the new memory model of Flink 1.10 in your reference [2].
>>>> Could you also share a screenshot of where you get the state size of 2.5
>>>> GB? Do you mean the Flink Web UI?
>>>> Generally, it is quite hard to estimate the on-heap size of state Java
>>>> objects. I have never heard of such a Flink metric.
>>>>
>>>> Best,
>>>> Andrey
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/mem_setup.html
>>>>
>>>> On Mon, Aug 24, 2020 at 4:05 AM Xintong Song <tonysong...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Vishwas,
>>>>>
>>>>> According to the log, the heap space is 13+ GB, which looks fine.
>>>>>
>>>>> Several reasons might lead to the heap space OOM:
>>>>>
>>>>>    - Memory leak
>>>>>    - Not enough GC threads
>>>>>    - Concurrent GC starts too late
>>>>>    - ...
>>>>>
>>>>> I would suggest taking a look at the GC logs.
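>>>>> If GC logging is not enabled yet, one way to turn it on (a sketch; the
>>>>> log path is a placeholder) is via env.java.opts in flink-conf.yaml:
>>>>>
>>>>> env.java.opts: "-XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/tmp/flink-gc.log"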
>>>>>
>>>>> Thank you~
>>>>>
>>>>> Xintong Song
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Aug 21, 2020 at 10:34 PM Vishwas Siravara <vsirav...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi guys,
>>>>>> I use Flink version 1.7.2.
>>>>>> I have a stateful streaming job which uses a keyed process function.
>>>>>> I use the heap state backend. Although I set the TM heap size to 16 GB, I get an
>>>>>> OOM error when the state size is around 2.5 GB (I get the state size from the
>>>>>> dashboard). I have set taskmanager.memory.fraction: 0.01 (which I believe is
>>>>>> for native calls off heap) [1]. For an 8 GB TM heap setting, the OOM errors
>>>>>> start showing up when the state size reaches 1 GB. This I find puzzling
>>>>>> because I would expect to get a lot more space on the heap for state when I
>>>>>> change the size to 16 GB. What fraction of the heap is used by the
>>>>>> framework? [2]. Below is the stack trace for the exception. How can I
>>>>>> increase my state size on the heap?
>>>>>>
>>>>>> 2020-08-21 02:05:54,443 INFO
>>>>>>  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       - Memory
>>>>>> usage stats: [HEAP: 11920/13653/13653 MB, NON HEAP: 130/154/-1 MB
>>>>>> (used/committed/max)]
>>>>>> 2020-08-21 02:05:54,444 INFO
>>>>>>  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       - Direct
>>>>>> memory stats: Count: 32796, Total Capacity: 1074692520, Used Memory:
>>>>>> 1074692521
>>>>>> 2020-08-21 02:05:54,444 INFO
>>>>>>  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       - Off-heap
>>>>>> pool stats: [Code Cache: 51/55/240 MB (used/committed/max)], [Metaspace:
>>>>>> 70/88/-1 MB (used/committed/max)], [Compressed Class Space: 8/11/1024 MB
>>>>>> (used/committed/max)]
>>>>>> 2020-08-21 02:05:54,444 INFO
>>>>>>  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       - Garbage
>>>>>> collector stats: [PS Scavenge, GC TIME (ms): 481035, GC COUNT: 1770], [PS
>>>>>> MarkSweep, GC TIME (ms): 8720945, GC COUNT: 265]
>>>>>> 2020-08-21 02:05:54,446 INFO
>>>>>>  org.apache.flink.runtime.taskmanager.Task                     -
>>>>>> KeyedProcess (1/4) (23946753549293edc23e88f257980cb4) switched from 
>>>>>> RUNNING
>>>>>> to FAILED.
>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>         at java.lang.reflect.Array.newInstance(Array.java:75)
>>>>>>         at java.util.Arrays.copyOf(Arrays.java:3212)
>>>>>>         at java.util.Arrays.copyOf(Arrays.java:3181)
>>>>>>         at
>>>>>> org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.resizeQueueArray(AbstractHeapPriorityQueue.java:153)
>>>>>>         at
>>>>>> org.apache.flink.runtime.state.heap.HeapPriorityQueue.increaseSizeByOne(HeapPriorityQueue.java:172)
>>>>>>         at
>>>>>> org.apache.flink.runtime.state.heap.HeapPriorityQueue.addInternal(HeapPriorityQueue.java:83)
>>>>>>         at
>>>>>> org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.add(AbstractHeapPriorityQueue.java:73)
>>>>>>         at
>>>>>> org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:121)
>>>>>>         at
>>>>>> org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper.lambda$keyGroupReader$0(HeapPriorityQueueSnapshotRestoreWrapper.java:85)
>>>>>>         at
>>>>>> org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper$$Lambda$229/674995813.consume(Unknown
>>>>>> Source)
>>>>>>         at
>>>>>> org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:298)
>>>>>>         at
>>>>>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readKeyGroupStateData(HeapKeyedStateBackend.java:492)
>>>>>>         at
>>>>>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateHandleStateData(HeapKeyedStateBackend.java:453)
>>>>>>         at
>>>>>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:410)
>>>>>>         at
>>>>>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:358)
>>>>>>         at
>>>>>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:104)
>>>>>>         at
>>>>>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
>>>>>>         at
>>>>>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
>>>>>>         at
>>>>>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:284)
>>>>>>         at
>>>>>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
>>>>>>         at
>>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
>>>>>>         at
>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>>>>>>         at
>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>>>>>>         at
>>>>>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>>>>>         at java.lang.Thread.run(Thread.java:748)
>>>>>>
>>>>>>
>>>>>>
>>>>>> [1]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#the-fsstatebackend
>>>>>> [2]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_detail.html#overview
>>>>>>
>>>>>> Best,
>>>>>> Vishwas
>>>>>>
>>>>>
