Re: OOM error for heap state backend.
Hi,

The stack trace shows that the job failed while restoring from a checkpoint/savepoint. If you hit this during a failover, it may be worth finding the root cause that made the job fail over in the first place. As for the stack trace itself: when restoring a `HeapPriorityQueue`, Flink ensures the backing array is large enough via resizeQueueArray [1] (which uses Arrays.copyOf), and that allocation may be the problem. Could you please take a heap dump when the process exits with the OOM?

[1] https://github.com/apache/flink/blob/5e0b7970a9aea74aba4ebffaa75c37e960799b93/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapPriorityQueue.java#L151

Best,
Congxian
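[Editor's note: the allocation Congxian points at is visible in the Arrays.copyOf frames of the stack trace below. A minimal Java sketch of that growth pattern, simplified for illustration; the class name, initial capacity, and doubling factor are assumptions, not Flink's actual implementation:

import java.util.Arrays;

// Simplified sketch of the resize behind AbstractHeapPriorityQueue#resizeQueueArray:
// before an element is inserted during restore, the backing array is grown with
// Arrays.copyOf, which briefly holds both the old and the new array on the heap.
public class GrowableQueueSketch {
    private Object[] queue = new Object[128];
    private int size;

    public void add(Object element) {
        if (size >= queue.length) {
            // The copy is where the reported OutOfMemoryError surfaces
            // when the heap is already nearly full.
            queue = Arrays.copyOf(queue, queue.length * 2);
        }
        queue[size++] = element;
    }
}
]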
Re: OOM error for heap state backend.
Hi Vishwas,

Your scenario sounds like RocksDB would actually be recommended. I would always suggest starting with RocksDB, unless your state is really small compared to the available memory, or you need to optimize for performance. But maybe your job runs fine with RocksDB (performance-wise); then there's no need to go into the details of heap memory management with Flink.
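[Editor's note: a minimal sketch of trying Robert's suggestion on the Flink 1.7.x line mentioned later in the thread. The checkpoint URI is a placeholder, and the job needs the flink-statebackend-rocksdb dependency on its classpath:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDBBackendExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // State lives in RocksDB (memory plus local disk, in serialized form), so the
        // JVM heap no longer has to hold the full state as Java objects.
        // 'true' enables incremental checkpoints, which helps with large state.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
        // ... define the keyed process function pipeline here, then env.execute(...)
    }
}
]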
Re: OOM error for heap state backend.
Thanks Andrey,
My question is related to:

The FsStateBackend is encouraged for:

- Jobs with large state, long windows, large key/value states.
- All high-availability setups.

How large is "large state", without any overhead added by the framework?

Best,
Vishwas
Re: OOM error for heap state backend.
Hi Vishwas,

> is this quantifiable with respect to JVM heap size on a single node
> without the node being used for other tasks ?

I don't quite understand this question. I believe the recommendation in the docs has the same reason: use larger state objects so that the Java object overhead pays off.
RocksDB keeps state in memory and on disk in serialized form, so it usually has a smaller footprint.
Other jobs in the same task manager can potentially use other state backends, depending on their state requirements.
All tasks in the same task manager share the JVM heap, as the task manager runs as one JVM system process on the machine it is deployed to.

Best,
Andrey

On Wed, Aug 26, 2020 at 6:52 PM Vishwas Siravara wrote:

> Hi Andrey,
> Thanks for getting back to me so quickly. The screenshots are for a 1GB
> heap; the keys for the state are 20-character strings (20 bytes, we don't
> have multi-byte characters). So the overhead seems to be quite large (4x),
> even in comparison to the checkpoint size (which already adds an overhead).
> In this document [1] it says to use the FS/heap backend for large states; is this
> quantifiable with respect to JVM heap size on a single node without the
> node being used for other tasks?
> I have attached the GC logs for the TM and JM.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#the-fsstatebackend
>
> Best,
> Vishwas
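[Editor's note: for intuition on the roughly 4x figure Vishwas observed, a back-of-envelope estimate for one 20-character key stored as a Java 8 String, assuming a 64-bit JVM with compressed oops. All numbers are estimates and ignore the per-entry overhead of the state map itself:

String object:  12 (header) + 4 (hash) + 4 (char[] ref) + 4 (padding) = 24 bytes
char[20] value: 12 (header) + 4 (length) + 20 * 2 (chars)             = 56 bytes
on-heap total:  ~80 bytes vs ~20 bytes serialized  ->  roughly 4x, before map-entry overhead
]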
Re: OOM error for heap state backend.
Hi Vishwas,

I believe the screenshots are from a heap size of 1GB?

There are indeed many internal Flink state objects. They are overhead which is required for Flink to organise and track the state on-heap.
Depending on the actual size of your state objects, the overhead may be relatively large or small compared to the actual state size.
For example, if you just keep integers in your state, then the overhead is probably a couple of times larger than the state itself.
It is not easy to estimate the exact on-heap size without thorough analysis.

The checkpoint has little overhead and includes only the actual state data: your serialized state objects, which are probably smaller than their heap representation.

So my guess is that the heap representation of the state is much bigger compared to the checkpoint size.

I also cc other people who might add more thoughts about on-heap state size.

You could also provide GC logs, as Xintong suggested.

Best,
Andrey

On Wed, Aug 26, 2020 at 4:21 PM Vishwas Siravara wrote:

> Hi Andrey and Xintong. The 2.5 GB is from the Flink web UI (checkpoint size).
> I took a heap dump and I could not find any memory leak from user code. I
> see similar behaviour with a smaller heap: on a 1GB heap, the state size
> from the checkpoint UI is 180 MB. Attaching some screenshots of heap
> profiles in case they help. So when the state grows, GC takes a long time, and
> sometimes the job manager removes the TM slot because of a 1ms timeout and
> tries to restore the task in another task manager; this creates a cascading
> effect and affects other jobs running on the cluster. My tests were run in
> a single-node cluster with 1 TM and 4 task slots with a parallelism of 4.
>
> Best,
> Vishwas
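[Editor's note: on the slot-removal cascade Vishwas describes: if the releases are driven by missed heartbeats during long GC pauses, lengthening the heartbeat tolerance can buy some headroom. A hedged flink-conf.yaml sketch; the values are illustrative rather than a recommendation, and full GCs lasting minutes may exceed any sane timeout:

heartbeat.interval: 10000    # ms; how often JM/TM heartbeats are sent
heartbeat.timeout: 120000    # ms; raised above the default so the JM does not declare a TM lost during a long GC pause
]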
Re: OOM error for heap state backend.
Hi Vishwas,

If you use Flink 1.7, check the older memory model docs [1], because you referred to the new memory model of Flink 1.10 in your reference [2].
Could you also share a screenshot of where you get the state size of 2.5 GB? Do you mean the Flink WebUI?
Generally, it is quite hard to estimate the on-heap size of state Java objects. I have never heard of such a Flink metric.

Best,
Andrey

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/mem_setup.html
Re: OOM error for heap state backend.
Hi Vishwas,

According to the log, the heap space is 13+GB, which looks fine.

Several reasons might lead to the heap space OOM:

- Memory leak
- Not enough GC threads
- Concurrent GC starts too late
- ...

I would suggest taking a look at the GC logs.

Thank you~

Xintong Song
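[Editor's note: one way to produce the GC logs Xintong asks for is to pass JVM flags to the task managers via flink-conf.yaml. A hedged sketch for a JDK 8 TM (the PS Scavenge / PS MarkSweep collectors in the log imply Java 8); the log path is a placeholder:

env.java.opts.taskmanager: -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -Xloggc:/var/log/flink/tm-gc.log
]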
OOM error for heap state backend.
Hi guys,
I use Flink version 1.7.2.
I have a stateful streaming job which uses a keyed process function, with the heap state backend. Although I set the TM heap size to 16 GB, I get an OOM error when the state size is around 2.5 GB (I get the state size from the dashboard). I have set taskmanager.memory.fraction: 0.01 (which I believe is for native calls off-heap) [1]. For an 8 GB TM heap setting, the OOM errors start showing up when the state size reaches 1 GB. I find this puzzling, because I would expect to get a lot more space on the heap for state when I change the size to 16 GB. What fraction of the heap is used by the framework [2]? Below is the stack trace for the exception. How can I increase my state size on the heap?

2020-08-21 02:05:54,443 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Memory usage stats: [HEAP: 11920/13653/13653 MB, NON HEAP: 130/154/-1 MB (used/committed/max)]
2020-08-21 02:05:54,444 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Direct memory stats: Count: 32796, Total Capacity: 1074692520, Used Memory: 1074692521
2020-08-21 02:05:54,444 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Off-heap pool stats: [Code Cache: 51/55/240 MB (used/committed/max)], [Metaspace: 70/88/-1 MB (used/committed/max)], [Compressed Class Space: 8/11/1024 MB (used/committed/max)]
2020-08-21 02:05:54,444 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner - Garbage collector stats: [PS Scavenge, GC TIME (ms): 481035, GC COUNT: 1770], [PS MarkSweep, GC TIME (ms): 8720945, GC COUNT: 265]
2020-08-21 02:05:54,446 INFO  org.apache.flink.runtime.taskmanager.Task - KeyedProcess (1/4) (23946753549293edc23e88f257980cb4) switched from RUNNING to FAILED.
java.lang.OutOfMemoryError: Java heap space
    at java.lang.reflect.Array.newInstance(Array.java:75)
    at java.util.Arrays.copyOf(Arrays.java:3212)
    at java.util.Arrays.copyOf(Arrays.java:3181)
    at org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.resizeQueueArray(AbstractHeapPriorityQueue.java:153)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueue.increaseSizeByOne(HeapPriorityQueue.java:172)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueue.addInternal(HeapPriorityQueue.java:83)
    at org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.add(AbstractHeapPriorityQueue.java:73)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:121)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper.lambda$keyGroupReader$0(HeapPriorityQueueSnapshotRestoreWrapper.java:85)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper$$Lambda$229/674995813.consume(Unknown Source)
    at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:298)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readKeyGroupStateData(HeapKeyedStateBackend.java:492)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateHandleStateData(HeapKeyedStateBackend.java:453)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:410)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:358)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:104)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:284)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    at java.lang.Thread.run(Thread.java:748)

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#the-fsstatebackend
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_detail.html#overview

Best,
Vishwas
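[Editor's note: for reference, the settings discussed in this message would look roughly like the following in flink-conf.yaml on Flink 1.7.x (a hedged sketch; these keys changed in Flink 1.10's new memory model). In 1.7, taskmanager.memory.fraction sizes the managed memory carved out of the heap for batch operators (unless off-heap managed memory is enabled), so setting it near zero leaves almost the whole heap available for keyed state:

taskmanager.heap.size: 16g          # total TM heap, as in the 16 GB test above
taskmanager.memory.fraction: 0.01   # shrink managed memory so keyed state gets the heap
]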