Thanks Andrey,
My question is related to this part of the docs:

The FsStateBackend is encouraged for:

   - Jobs with large state, long windows, large key/value states.
   - All high-availability setups.

How large is "large state" here, not counting any overhead added by the framework?
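One rough, job-specific way to put a number on "large" is to divide the configured TM heap by the checkpoint size at which problems appeared, using only the figures reported in this thread (1 GB heap / 180 MB, 8 GB / about 1 GB, 16 GB / about 2.5 GB). The sketch below is plain arithmetic. The resulting factor lumps together state-object overhead, framework memory and GC headroom, so it describes this job only and is not a Flink rule.

```java
/**
 * Worked arithmetic on the figures reported in this thread: configured TM heap
 * versus the checkpoint size (from the web UI) at which OOM / long GC appeared.
 * The ratio is an empirical inflation factor for this particular job only.
 */
final class HeapToCheckpointRatio {

    /** Configured heap divided by checkpointed state size, both in MB. */
    static double ratio(double heapMb, double checkpointMb) {
        if (checkpointMb <= 0) {
            throw new IllegalArgumentException("checkpoint size must be positive");
        }
        return heapMb / checkpointMb;
    }

    public static void main(String[] args) {
        // {configured TM heap MB, checkpoint size MB} as reported in the thread
        double[][] observations = {
            {1024, 180},   // 1 GB heap, 180 MB checkpoint
            {8192, 1024},  // 8 GB heap, ~1 GB checkpoint
            {16384, 2560}, // 16 GB heap, ~2.5 GB checkpoint
        };
        for (double[] o : observations) {
            System.out.printf("heap %6.0f MB / checkpoint %6.0f MB = %.1fx%n",
                    o[0], o[1], ratio(o[0], o[1]));
        }
    }
}
```

For these three observations the factor comes out at roughly 5.7x to 8x.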

Best,
Vishwas

On Wed, Aug 26, 2020 at 12:10 PM Andrey Zagrebin <azagre...@apache.org>
wrote:

> Hi Vishwas,
>
>> is this quantifiable with respect to JVM heap size on a single node
>> without the node being used for other tasks?
>
>
> I don't quite understand this question. I believe the recommendation in
> the docs has the same rationale: use larger state objects so that the
> Java object overhead is small relative to the data they hold.
> RocksDB keeps state in memory and on disk in serialized form.
> Therefore it usually has a smaller footprint.
> Other jobs in the same task manager can potentially use a different state
> backend, depending on their state requirements.
> All tasks in the same task manager share the JVM heap, because the task
> manager runs as a single JVM process on the machine where it is deployed.
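The point about larger state objects can be illustrated with a deliberately simplified model: each state entry costs its serialized payload plus a fixed per-entry overhead. The 64-byte constant is a hypothetical value chosen for illustration, not a measured Flink number, and the model ignores that the heap form of the payload itself can also be larger than its serialized form.

```java
/**
 * Simplified model: heap bytes per entry = serialized payload + fixed overhead.
 * FIXED_OVERHEAD_BYTES is a hypothetical illustration value, NOT a measured Flink constant.
 */
final class OverheadAmortization {

    static final int FIXED_OVERHEAD_BYTES = 64; // hypothetical per-entry overhead

    /** Ratio of (payload + overhead) to payload, i.e. heap size relative to serialized size. */
    static double inflation(int payloadBytes, int fixedOverheadBytes) {
        if (payloadBytes <= 0) {
            throw new IllegalArgumentException("payload must be positive");
        }
        return (double) (payloadBytes + fixedOverheadBytes) / payloadBytes;
    }

    public static void main(String[] args) {
        for (int payload : new int[] {4, 20, 1_000, 100_000}) {
            System.out.printf("payload %7d B -> heap/serialized = %.2fx%n",
                    payload, inflation(payload, FIXED_OVERHEAD_BYTES));
        }
    }
}
```

With this model the heap-to-serialized ratio falls from 17x for a 4-byte payload to about 1.06x for a 1,000-byte payload, which is the sense in which larger state objects let the fixed overhead pay off.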
>
> Best,
> Andrey
>
> On Wed, Aug 26, 2020 at 6:52 PM Vishwas Siravara <vsirav...@gmail.com>
> wrote:
>
>> Hi Andrey,
>> Thanks for getting back to me so quickly. The screenshots are for a 1 GB
>> heap; the keys for the state are 20-character strings (20 bytes, since we
>> don't have multi-byte characters). So the overhead seems to be quite large
>> (4x), even in comparison to the checkpoint size (which already adds some
>> overhead). This document [1] says to use the FS/heap backend for large
>> state. Is this quantifiable with respect to JVM heap size on a single node
>> without the node being used for other tasks?
>> I have attached GC logs for the TM and JM.
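The 4x figure can be sanity-checked with a back-of-the-envelope estimate for a single 20-character key. The sketch assumes Java 8 on a 64-bit HotSpot JVM with compressed oops (12-byte object headers, 4-byte references, 8-byte alignment, String backed by a UTF-16 char[]), and a serialized form assumed to resemble the variable-length encoding of Flink's StringValue (length + 1 as a varint, then one byte per ASCII char). Neither assumption is verified against this particular setup.

```java
/**
 * Rough per-key footprint of an ASCII String key.
 * Assumptions: Java 8, 64-bit HotSpot, compressed oops (12-byte headers,
 * 4-byte references, 8-byte alignment), String backed by a UTF-16 char[].
 */
final class StringKeyFootprint {

    static long align8(long bytes) {
        return (bytes + 7) & ~7L;
    }

    /** Heap bytes for one java.lang.String plus its char[] under the assumptions above. */
    static long heapBytes(int length) {
        long stringObject = align8(12 + 4 + 4);        // header + value ref + int hash
        long charArray = align8(12 + 4 + 2L * length); // header + length field + UTF-16 chars
        return stringObject + charArray;
    }

    /** Serialized bytes: (length + 1) as a 7-bit varint, then 1 byte per char below 0x80. */
    static long serializedAsciiBytes(int length) {
        long lenBytes = 1;
        for (long v = length + 1L; v >= 0x80; v >>>= 7) {
            lenBytes++; // each extra 7-bit group of the length costs one more byte
        }
        return lenBytes + length;
    }

    public static void main(String[] args) {
        int n = 20;
        System.out.printf("heap=%d B, serialized=%d B, ratio=%.1fx%n",
                heapBytes(n), serializedAsciiBytes(n),
                (double) heapBytes(n) / serializedAsciiBytes(n));
    }
}
```

Under these assumptions a 20-character key takes about 80 bytes on the heap versus about 21 bytes serialized (roughly 3.8x), before counting the per-entry table structures and the state values.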
>>
>>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#the-fsstatebackend
>>
>> Best,
>> Vishwas
>>
>> On Wed, Aug 26, 2020 at 11:29 AM Andrey Zagrebin <azagre...@apache.org>
>> wrote:
>>
>>> Hi Vishwas,
>>>
>>> I believe the screenshots are from a heap size of 1GB?
>>>
>>> There are indeed many internal Flink state objects. They are overhead
>>> which Flink needs to organise and track the state on-heap.
>>> Depending on the actual size of your state objects, the overhead may be
>>> relatively large compared to the actual state size.
>>> For example, if you just keep integers in your state, then the overhead
>>> is probably a couple of times larger than the data itself.
>>> It is not easy to estimate the on-heap size exactly without a thorough
>>> analysis.
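For the integer case, a similar rough estimate can be made with java.util.HashMap standing in for the on-heap state table. Flink's heap backend uses its own table classes, so the constants are only indicative. The sketch assumes a 64-bit HotSpot JVM with compressed oops (12-byte headers, 4-byte references, 8-byte alignment), ignores the bucket array and the JVM's cache of small Integer values, and assumes each int serializes to a fixed 4 bytes.

```java
/**
 * Rough heap cost of one Integer -> Integer mapping, with java.util.HashMap as a
 * stand-in for an on-heap state table (Flink's own table classes differ).
 * Assumes 64-bit HotSpot with compressed oops; ignores the bucket array.
 */
final class IntStateFootprint {

    static long align8(long bytes) {
        return (bytes + 7) & ~7L;
    }

    static long boxedIntegerBytes() {
        return align8(12 + 4); // header + int field = 16
    }

    static long hashMapNodeBytes() {
        return align8(12 + 4 + 3 * 4); // header + int hash + key/value/next refs: 28 -> 32
    }

    static long heapBytesPerEntry() {
        return hashMapNodeBytes() + 2 * boxedIntegerBytes(); // node + boxed key + boxed value
    }

    static long serializedBytesPerEntry() {
        return 4 + 4; // two fixed-width 4-byte ints (assumed encoding)
    }

    public static void main(String[] args) {
        System.out.printf("heap ~%d B vs serialized %d B per entry%n",
                heapBytesPerEntry(), serializedBytesPerEntry());
    }
}
```

This gives about 64 heap bytes against 8 serialized bytes per entry; real numbers depend on the JVM, the table implementation and the state types, so treat the ratio as an order-of-magnitude hint rather than a measurement.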
>>>
>>> The checkpoint has little overhead and includes only the actual state
>>> data: your serialized state objects, which are probably smaller than their
>>> heap representation.
>>>
>>> So my guess is that the heap representation of the state is much bigger
>>> than the checkpoint size.
>>>
>>> I am also cc'ing other people who might have more thoughts on on-heap
>>> state size.
>>>
>>> You could also provide GC logs, as Xintong suggested.
>>>
>>> Best,
>>> Andrey
>>>
>>> On Wed, Aug 26, 2020 at 4:21 PM Vishwas Siravara <vsirav...@gmail.com>
>>> wrote:
>>>
>>>> Hi Andrey and Xintong. The 2.5 GB figure is the checkpoint size from the
>>>> Flink web UI. I took a heap dump and could not find any memory leak in
>>>> user code. I see similar behaviour with a smaller heap: on a 1 GB heap,
>>>> the state size in the checkpoint UI is 180 MB. I am attaching some
>>>> screenshots of heap profiles in case they help. When the state grows, GC
>>>> takes a long time, and sometimes the job manager removes the TM slot
>>>> because of the 10000 ms timeout and tries to restore the task on another
>>>> task manager. This creates a cascading effect and affects other jobs
>>>> running on the cluster.
>>>> My tests were run on a single-node cluster with 1 TM and 4 task slots,
>>>> with a parallelism of 4.
>>>>
>>>> Best,
>>>> Vishwas
>>>>
>>>> On Tue, Aug 25, 2020 at 10:02 AM Andrey Zagrebin <azagre...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Vishwas,
>>>>>
>>>>> If you use Flink 1.7, check the older memory model docs [1], because
>>>>> you referred to the new memory model of Flink 1.10 in your reference [2].
>>>>> Could you also share a screenshot showing the state size of 2.5 GB?
>>>>> Do you mean the Flink web UI?
>>>>> Generally, it is quite hard to estimate the on-heap size of state Java
>>>>> objects. I have never heard of such a Flink metric.
>>>>>
>>>>> Best,
>>>>> Andrey
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/mem_setup.html
>>>>>
>>>>> On Mon, Aug 24, 2020 at 4:05 AM Xintong Song <tonysong...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Vishwas,
>>>>>>
>>>>>> According to the log, heap space is 13+GB, which looks fine.
>>>>>>
>>>>>> Several reasons might lead to heap space OOM errors:
>>>>>>
>>>>>>    - Memory leak
>>>>>>    - Not enough GC threads
>>>>>>    - Concurrent GC starts too late
>>>>>>    - ...
>>>>>>
>>>>>> I would suggest taking a look at the GC logs.
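The TaskManager log quoted further down already contains cumulative GC counters per collector. The sketch divides total GC time by collection count using the figures from that log line, and reads the same kind of counters for the running JVM through the standard java.lang.management API. An average hides individual pauses, so the GC log itself remains the better source.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

/**
 * Average time per collection from cumulative GC counters, like those in the
 * TaskManager's "Garbage collector stats" log line.
 */
final class GcStats {

    /** Average milliseconds per collection; 0 if the collector has not run (or count is undefined). */
    static double averageMillis(long totalTimeMs, long count) {
        return count > 0 ? (double) totalTimeMs / count : 0.0;
    }

    public static void main(String[] args) {
        // Figures copied from the TaskManager log in this thread.
        System.out.printf("PS Scavenge:  %.1f ms avg%n", averageMillis(481_035, 1_770));
        System.out.printf("PS MarkSweep: %.1f ms avg%n", averageMillis(8_720_945, 265));

        // Live counters for the current JVM; a collector may report -1 if undefined.
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            System.out.printf("%s: count=%d, time=%d ms, avg=%.1f ms%n",
                    gc.getName(), gc.getCollectionCount(), gc.getCollectionTime(),
                    averageMillis(gc.getCollectionTime(), gc.getCollectionCount()));
        }
    }
}
```

For the logged figures, the average PS MarkSweep collection takes about 32.9 s (8,720,945 ms / 265), against about 272 ms for PS Scavenge. Pauses of that length would be consistent with the 10000 ms timeout problem described elsewhere in this thread, although the log and those later tests come from different runs.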
>>>>>>
>>>>>> Thank you~
>>>>>>
>>>>>> Xintong Song
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Aug 21, 2020 at 10:34 PM Vishwas Siravara <
>>>>>> vsirav...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi guys,
>>>>>>> I use Flink version 1.7.2.
>>>>>>> I have a stateful streaming job which uses a keyed process function.
>>>>>>> I use the heap state backend. Although I set the TM heap size to 16 GB, I
>>>>>>> get an OOM error when the state size is around 2.5 GB (I get the state
>>>>>>> size from the dashboard). I have set taskmanager.memory.fraction: 0.01
>>>>>>> (which I believe is for native calls off heap) [1]. For an 8 GB TM heap
>>>>>>> setting, the OOM errors start showing up when the state size reaches
>>>>>>> 1 GB. I find this puzzling, because I would expect to get a lot more
>>>>>>> space on the heap for state when I change the size to 16 GB. What
>>>>>>> fraction of the heap is used by the framework? [2] Below is the stack
>>>>>>> trace for the exception. How can I increase my state size on the heap?
>>>>>>>
>>>>>>> 2020-08-21 02:05:54,443 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       - Memory usage stats: [HEAP: 11920/13653/13653 MB, NON HEAP: 130/154/-1 MB (used/committed/max)]
>>>>>>> 2020-08-21 02:05:54,444 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       - Direct memory stats: Count: 32796, Total Capacity: 1074692520, Used Memory: 1074692521
>>>>>>> 2020-08-21 02:05:54,444 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       - Off-heap pool stats: [Code Cache: 51/55/240 MB (used/committed/max)], [Metaspace: 70/88/-1 MB (used/committed/max)], [Compressed Class Space: 8/11/1024 MB (used/committed/max)]
>>>>>>> 2020-08-21 02:05:54,444 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       - Garbage collector stats: [PS Scavenge, GC TIME (ms): 481035, GC COUNT: 1770], [PS MarkSweep, GC TIME (ms): 8720945, GC COUNT: 265]
>>>>>>> 2020-08-21 02:05:54,446 INFO  org.apache.flink.runtime.taskmanager.Task                     - KeyedProcess (1/4) (23946753549293edc23e88f257980cb4) switched from RUNNING to FAILED.
>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>>         at java.lang.reflect.Array.newInstance(Array.java:75)
>>>>>>>         at java.util.Arrays.copyOf(Arrays.java:3212)
>>>>>>>         at java.util.Arrays.copyOf(Arrays.java:3181)
>>>>>>>         at org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.resizeQueueArray(AbstractHeapPriorityQueue.java:153)
>>>>>>>         at org.apache.flink.runtime.state.heap.HeapPriorityQueue.increaseSizeByOne(HeapPriorityQueue.java:172)
>>>>>>>         at org.apache.flink.runtime.state.heap.HeapPriorityQueue.addInternal(HeapPriorityQueue.java:83)
>>>>>>>         at org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.add(AbstractHeapPriorityQueue.java:73)
>>>>>>>         at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:121)
>>>>>>>         at org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper.lambda$keyGroupReader$0(HeapPriorityQueueSnapshotRestoreWrapper.java:85)
>>>>>>>         at org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper$$Lambda$229/674995813.consume(Unknown Source)
>>>>>>>         at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:298)
>>>>>>>         at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readKeyGroupStateData(HeapKeyedStateBackend.java:492)
>>>>>>>         at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateHandleStateData(HeapKeyedStateBackend.java:453)
>>>>>>>         at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:410)
>>>>>>>         at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:358)
>>>>>>>         at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:104)
>>>>>>>         at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
>>>>>>>         at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
>>>>>>>         at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:284)
>>>>>>>         at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
>>>>>>>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
>>>>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>>>>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>>>>>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>>>>>>         at java.lang.Thread.run(Thread.java:748)
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> [1]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#the-fsstatebackend
>>>>>>> [2]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_detail.html#overview
>>>>>>>
>>>>>>> Best,
>>>>>>> Vishwas
>>>>>>>
>>>>>>
