Hi,
   The stack trace shows that the job failed while restoring from a
checkpoint/savepoint. If you hit this during a failover, try to find the
root cause that triggered the failover in the first place.
   As for the stack trace itself: when restoring a `HeapPriorityQueue`, Flink
makes sure the backing array is large enough by calling resizeQueueArray [1]
(which uses Arrays.copyOf). That may be the problem. Could you please take a
heap dump when the job exits with OOM?

[1]
https://github.com/apache/flink/blob/5e0b7970a9aea74aba4ebffaa75c37e960799b93/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapPriorityQueue.java#L151
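
To illustrate why a resize via Arrays.copyOf can fail on an almost full heap, here is a minimal sketch. It is not Flink's code: the class GrowableQueueSketch, its initial capacity of 4, and its growth rule (roughly 50% per resize) are assumptions made for illustration only. The point it shows is that Arrays.copyOf allocates the new array while the old one is still referenced, so both arrays have to fit on the heap at the same moment.

```java
import java.util.Arrays;

// Hypothetical, simplified stand-in for an array-backed queue.
// The growth policy below is an assumption, not Flink's actual policy.
class GrowableQueueSketch {
    private Object[] queue = new Object[4];
    private int size = 0;
    private int resizeCount = 0;
    // Largest number of array slots that had to be live at the same time.
    private long peakLiveSlots = queue.length;

    public void add(Object element) {
        if (size == queue.length) {
            int newLength = queue.length + Math.max(2, queue.length >> 1);
            // Arrays.copyOf allocates the new array before the old one can
            // be collected, so old and new arrays coexist during the copy.
            peakLiveSlots = Math.max(peakLiveSlots, (long) queue.length + newLength);
            queue = Arrays.copyOf(queue, newLength);
            resizeCount++;
        }
        queue[size++] = element;
    }

    public int size() { return size; }
    public int capacity() { return queue.length; }
    public int resizeCount() { return resizeCount; }
    public long peakLiveSlots() { return peakLiveSlots; }

    public static void main(String[] args) {
        GrowableQueueSketch q = new GrowableQueueSketch();
        for (int i = 0; i < 1_000_000; i++) {
            q.add(i);
        }
        System.out.println("elements:        " + q.size());
        System.out.println("final capacity:  " + q.capacity());
        System.out.println("resizes:         " + q.resizeCount());
        System.out.println("peak live slots: " + q.peakLiveSlots());
    }
}
```

In this sketch the peak number of live slots is always larger than the final capacity, which is why a restore that rebuilds a large queue element by element may need noticeably more free heap than the queue occupies once it is built.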

Best,
Congxian


On Thu, Aug 27, 2020 at 10:59 PM Robert Metzger <rmetz...@apache.org> wrote:

> Hi Vishwas,
>
> Your scenario sounds like RocksDB would actually be recommended. I would
> always suggest starting with RocksDB, unless your state is really small
> compared to the available memory, or you need to optimize for performance.
> If your job runs fine with RocksDB (performance-wise), there's no need to
> go into the details of heap memory management with Flink.
>
>
>
> On Wed, Aug 26, 2020 at 7:21 PM Vishwas Siravara <vsirav...@gmail.com>
> wrote:
>
>> Thanks Andrey,
>> My question is related to
>>
>> The FsStateBackend is encouraged for:
>>
>>    - Jobs with large state, long windows, large key/value states.
>>    - All high-availability setups.
>>
>> How large is large state without any overhead added by the framework?
>>
>> Best,
>> Vishwas
>>
>> On Wed, Aug 26, 2020 at 12:10 PM Andrey Zagrebin <azagre...@apache.org>
>> wrote:
>>
>>> Hi Vishwas,
>>>
>>>> is this quantifiable with respect to JVM heap size on a single node
>>>> without the node being used for other tasks?
>>>
>>>
>>> I don't quite understand this question. I believe the recommendation in
>>> docs has the same reason: use larger state objects so that the Java object
>>> overhead pays off.
>>> RocksDB keeps state in memory and on disk in the serialized form.
>>> Therefore it usually has a smaller footprint.
>>> Other jobs in the same task manager can potentially use other state
>>> backends depending on their state requirements.
>>> All tasks in the same task manager share the JVM heap, because the task
>>> manager runs as one JVM process on the machine where it is deployed.
>>>
>>> Best,
>>> Andrey
>>>
>>> On Wed, Aug 26, 2020 at 6:52 PM Vishwas Siravara <vsirav...@gmail.com>
>>> wrote:
>>>
>>>> Hi Andrey,
>>>> Thanks for getting back to me so quickly. The screenshots are for a 1GB
>>>> heap. The keys for the state are 20-character strings (20 bytes, we don't
>>>> have multi-byte characters). So the overhead seems to be quite large (4x)
>>>> even in comparison to the checkpoint size (which already adds an overhead).
>>>> In this document [1] it says to use the FS/Heap backend for large states. Is
>>>> this quantifiable with respect to JVM heap size on a single node without the
>>>> node being used for other tasks?
>>>> I have attached GC log for TM and JM
>>>>
>>>>
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#the-fsstatebackend
>>>>
>>>> Best,
>>>> Vishwas
>>>>
>>>> On Wed, Aug 26, 2020 at 11:29 AM Andrey Zagrebin <azagre...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Vishwas,
>>>>>
>>>>> I believe the screenshots are from a heap size of 1GB?
>>>>>
>>>>> There are indeed many internal Flink state objects. They are overhead
>>>>> which is required for Flink to organise and track the state on-heap.
>>>>> Depending on the actual size of your state objects, the overhead may
>>>>> be relatively large compared to the actual state size.
>>>>> For example, if you just keep integers in your state then the overhead is
>>>>> probably a couple of times larger.
>>>>> It is not easy to estimate the exact on-heap size without thorough
>>>>> analysis.
>>>>>
>>>>> The checkpoint has little overhead and includes only actual state data
>>>>> - your serialized state objects which are probably smaller than their heap
>>>>> representation.
>>>>>
>>>>> So my guess is that the heap representation of the state is much
>>>>> bigger compared to the checkpoint size.
>>>>>
>>>>> I also cc other people who might add more thoughts about on-heap state
>>>>> size.
>>>>>
>>>>> You could also provide GC logs as Xintong suggested.
>>>>>
>>>>> Best,
>>>>> Andrey
>>>>>
>>>>> On Wed, Aug 26, 2020 at 4:21 PM Vishwas Siravara <vsirav...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Andrey and Xintong. The 2.5 GB is from the Flink web UI
>>>>>> (checkpoint size). I took a heap dump and could not find any memory leak
>>>>>> from user code. I see similar behaviour with a smaller heap: on a 1GB
>>>>>> heap, the state size from the checkpoint UI is 180 MB. Attaching some
>>>>>> screenshots of heap profiles in case it helps. When the state grows, GC
>>>>>> takes a long time, and sometimes the job manager removes the TM slot
>>>>>> because of the 10000ms timeout and tries to restore the task in another
>>>>>> task manager. This creates a cascading effect and affects other jobs
>>>>>> running on the cluster. My tests were run in a single node cluster with
>>>>>> 1 TM and 4 task slots with a parallelism of 4.
>>>>>>
>>>>>> Best,
>>>>>> Vishwas
>>>>>>
>>>>>> On Tue, Aug 25, 2020 at 10:02 AM Andrey Zagrebin <
>>>>>> azagre...@apache.org> wrote:
>>>>>>
>>>>>>> Hi Vishwas,
>>>>>>>
>>>>>>> If you use Flink 1.7, check the older memory model docs [1] because
>>>>>>> you referred to the new memory model of Flink 1.10 in your reference 2.
>>>>>>> Could you also share a screenshot where you get the state size of
>>>>>>> 2.5 GB? Do you mean Flink WebUI?
>>>>>>> Generally, it is quite hard to estimate the on-heap size of state
>>>>>>> java objects. I never heard about such a Flink metric.
>>>>>>>
>>>>>>> Best,
>>>>>>> Andrey
>>>>>>>
>>>>>>> [1]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/mem_setup.html
>>>>>>>
>>>>>>> On Mon, Aug 24, 2020 at 4:05 AM Xintong Song <tonysong...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Vishwas,
>>>>>>>>
>>>>>>>> According to the log, heap space is 13+GB, which looks fine.
>>>>>>>>
>>>>>>>> Several reasons might lead to the heap space OOM:
>>>>>>>>
>>>>>>>>    - Memory leak
>>>>>>>>    - Not enough GC threads
>>>>>>>>    - Concurrent GC starts too late
>>>>>>>>    - ...
>>>>>>>>
>>>>>>>> I would suggest taking a look at the GC logs.
>>>>>>>>
>>>>>>>> Thank you~
>>>>>>>>
>>>>>>>> Xintong Song
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Aug 21, 2020 at 10:34 PM Vishwas Siravara <
>>>>>>>> vsirav...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi guys,
>>>>>>>>> I use flink version 1.7.2
>>>>>>>>> I have a stateful streaming job which uses a keyed process
>>>>>>>>> function. I use the heap state backend. Although I set the TM heap size
>>>>>>>>> to 16 GB, I get an OOM error when the state size is around 2.5 GB (I get
>>>>>>>>> the state size from the dashboard). I have set
>>>>>>>>> taskmanager.memory.fraction: 0.01 (which I believe is for native calls
>>>>>>>>> off heap) [1]. For an 8 GB TM heap setting, the OOM errors start showing
>>>>>>>>> up when the state size reaches 1 GB. I find this puzzling because I
>>>>>>>>> would expect to get a lot more space on the heap for state when I change
>>>>>>>>> the size to 16 GB. What fraction of the heap is used by the framework?
>>>>>>>>> [2] Below is the stack trace for the exception. How can I increase my
>>>>>>>>> state size on the heap?
>>>>>>>>>
>>>>>>>>> 2020-08-21 02:05:54,443 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       - Memory usage stats: [HEAP: 11920/13653/13653 MB, NON HEAP: 130/154/-1 MB (used/committed/max)]
>>>>>>>>> 2020-08-21 02:05:54,444 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       - Direct memory stats: Count: 32796, Total Capacity: 1074692520, Used Memory: 1074692521
>>>>>>>>> 2020-08-21 02:05:54,444 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       - Off-heap pool stats: [Code Cache: 51/55/240 MB (used/committed/max)], [Metaspace: 70/88/-1 MB (used/committed/max)], [Compressed Class Space: 8/11/1024 MB (used/committed/max)]
>>>>>>>>> 2020-08-21 02:05:54,444 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       - Garbage collector stats: [PS Scavenge, GC TIME (ms): 481035, GC COUNT: 1770], [PS MarkSweep, GC TIME (ms): 8720945, GC COUNT: 265]
>>>>>>>>> 2020-08-21 02:05:54,446 INFO  org.apache.flink.runtime.taskmanager.Task                     - KeyedProcess (1/4) (23946753549293edc23e88f257980cb4) switched from RUNNING to FAILED.
>>>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>>>>         at java.lang.reflect.Array.newInstance(Array.java:75)
>>>>>>>>>         at java.util.Arrays.copyOf(Arrays.java:3212)
>>>>>>>>>         at java.util.Arrays.copyOf(Arrays.java:3181)
>>>>>>>>>         at org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.resizeQueueArray(AbstractHeapPriorityQueue.java:153)
>>>>>>>>>         at org.apache.flink.runtime.state.heap.HeapPriorityQueue.increaseSizeByOne(HeapPriorityQueue.java:172)
>>>>>>>>>         at org.apache.flink.runtime.state.heap.HeapPriorityQueue.addInternal(HeapPriorityQueue.java:83)
>>>>>>>>>         at org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.add(AbstractHeapPriorityQueue.java:73)
>>>>>>>>>         at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:121)
>>>>>>>>>         at org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper.lambda$keyGroupReader$0(HeapPriorityQueueSnapshotRestoreWrapper.java:85)
>>>>>>>>>         at org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper$$Lambda$229/674995813.consume(Unknown Source)
>>>>>>>>>         at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:298)
>>>>>>>>>         at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readKeyGroupStateData(HeapKeyedStateBackend.java:492)
>>>>>>>>>         at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateHandleStateData(HeapKeyedStateBackend.java:453)
>>>>>>>>>         at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:410)
>>>>>>>>>         at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:358)
>>>>>>>>>         at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:104)
>>>>>>>>>         at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
>>>>>>>>>         at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
>>>>>>>>>         at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:284)
>>>>>>>>>         at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
>>>>>>>>>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
>>>>>>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>>>>>>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>>>>>>>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>>>>>>>>         at java.lang.Thread.run(Thread.java:748)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#the-fsstatebackend
>>>>>>>>> [2]
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_detail.html#overview
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Vishwas
>>>>>>>>>
>>>>>>>>
