Re: OOM error for heap state backend.

2020-08-27 Thread Congxian Qiu
Hi
   The stack trace shows that the job failed while restoring from a
checkpoint/savepoint. If you hit this during failover, it may help to first
find the root cause that made the job fail over.
   As for the stack trace: when restoring a `HeapPriorityQueue`, Flink
ensures the backing array is large enough via resizeQueueArray [1] (which
uses Arrays.copyOf), so that allocation may be the problem. Could you take a
heap dump when the process exits with the OOM?

[1]
https://github.com/apache/flink/blob/5e0b7970a9aea74aba4ebffaa75c37e960799b93/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapPriorityQueue.java#L151
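Congxian's point about the resize step can be illustrated with a minimal, standalone sketch (this is not the actual Flink class; the doubling growth policy is an assumption for illustration):

```java
import java.util.Arrays;

// Simplified sketch of what an array-backed heap does on restore: grow the
// backing array with Arrays.copyOf. During the copy, the old and the new
// array are both live, so the peak heap demand is roughly old size plus new
// size, which is why the OOM surfaces inside Arrays.copyOf.
public class ResizeSketch {
    static Object[] resizeQueueArray(Object[] queue, int minRequiredSize) {
        // Grow at least to the required size, typically by doubling.
        int newSize = Math.max(minRequiredSize, queue.length * 2);
        return Arrays.copyOf(queue, newSize); // allocates the new array first
    }

    public static void main(String[] args) {
        Object[] q = new Object[4];
        System.out.println(resizeQueueArray(q, 5).length); // prints 8
    }
}
```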

Best,
Congxian


On Thu, Aug 27, 2020 at 10:59 PM Robert Metzger wrote:


Re: OOM error for heap state backend.

2020-08-27 Thread Robert Metzger
Hi Vishwas,

Your scenario sounds like one where RocksDB would actually be recommended. I
would always suggest starting with RocksDB, unless your state is really small
compared to the available memory, or you need to optimize for performance.
If your job runs fine with RocksDB performance-wise, there is no need to go
into the details of heap memory management in Flink.
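For reference, switching the cluster default to RocksDB is a configuration change; a sketch using Flink's standard keys (the checkpoint URI is an example, and the RocksDB state backend dependency may need to be on the classpath depending on the distribution):

```yaml
# flink-conf.yaml
state.backend: rocksdb
state.checkpoints.dir: hdfs:///flink/checkpoints
state.backend.incremental: true
```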



On Wed, Aug 26, 2020 at 7:21 PM Vishwas Siravara 
wrote:


Re: OOM error for heap state backend.

2020-08-26 Thread Vishwas Siravara
Thanks Andrey,
My question is about this part of the docs:

The FsStateBackend is encouraged for:

   - Jobs with large state, long windows, large key/value states.
   - All high-availability setups.

How large is "large" state, excluding any overhead added by the framework?

Best,
Vishwas

On Wed, Aug 26, 2020 at 12:10 PM Andrey Zagrebin 
wrote:


Re: OOM error for heap state backend.

2020-08-26 Thread Andrey Zagrebin
Hi Vishwas,

> is this quantifiable with respect to JVM heap size on a single node
> without the node being used for other tasks?


I don't quite understand this question. I believe the recommendation in the
docs follows the same reasoning: use larger state objects so that the
per-object Java overhead pays off.
RocksDB keeps state in memory and on disk in serialized form, so it usually
has a smaller footprint.
Other jobs in the same task manager can use a different state backend,
depending on their state requirements.
All tasks in the same task manager share the JVM heap, since the task manager
runs as a single JVM process on the machine where it is deployed.

Best,
Andrey

On Wed, Aug 26, 2020 at 6:52 PM Vishwas Siravara 
wrote:

> Hi Andrey,
> Thanks for getting back to me so quickly. The screenshots are for a 1 GB
> heap; the keys for the state are 20-character strings (20 bytes, we don't
> have multi-byte characters). So the overhead seems quite large (4x) even
> in comparison to the checkpoint size (which already adds overhead).
> This document [1] says to use the FS/heap backend for large state; is this
> quantifiable with respect to JVM heap size on a single node, without the
> node being used for other tasks?
> I have attached the GC logs for the TM and JM.
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#the-fsstatebackend
>
> Best,
> Vishwas
>
> On Wed, Aug 26, 2020 at 11:29 AM Andrey Zagrebin 
> wrote:

Re: OOM error for heap state backend.

2020-08-26 Thread Andrey Zagrebin
Hi Vishwas,

I believe the screenshots are from a heap size of 1 GB?

There are indeed many internal Flink state objects. They are overhead that
Flink needs in order to organise and track the state on-heap.
Depending on the actual size of your state objects, the overhead may be
relatively large compared to the actual state size.
For example, if you just keep integers in your state, the overhead is
probably a couple of times larger than the state itself.
It is not easy to estimate the exact on-heap size without thorough analysis.

The checkpoint has little overhead and includes only the actual state data:
your serialized state objects, which are probably smaller than their heap
representation.

So my guess is that the heap representation of the state is much bigger than
the checkpoint size.
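The gap between serialized and on-heap size can be made concrete with a back-of-envelope calculation. The per-object sizes below are rough assumptions for a 64-bit HotSpot JVM with compressed oops, chosen for illustration; they are not Flink internals or measured values:

```java
// Rough illustration of why on-heap state can be several times larger than
// its serialized checkpoint form, for a map entry with a 20-char ASCII key
// and a long value.
public class StateOverheadSketch {

    // Serialized form: small length header + 20 ASCII bytes + 8-byte long.
    static long serializedBytesPerEntry() {
        return 2 + 20 + 8; // 30 bytes
    }

    // On-heap form in a HashMap<String, Long> (assumed typical sizes):
    // String object incl. backing array header (~40 B) + 2 B per char,
    // boxed Long (~16 B), HashMap.Node (~32 B), share of bucket array (~8 B).
    static long heapBytesPerEntry() {
        return 40 + 2 * 20 + 16 + 32 + 8; // 136 bytes
    }

    public static void main(String[] args) {
        double ratio = (double) heapBytesPerEntry() / serializedBytesPerEntry();
        // About 4.5x under these assumptions, in line with the ~4x observed
        // earlier in this thread.
        System.out.printf("on-heap is roughly %.1fx the serialized size%n", ratio);
    }
}
```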

I also cc other people who might add more thoughts about on-heap state size.

You could also provide GC logs as Xintong suggested.

Best,
Andrey

On Wed, Aug 26, 2020 at 4:21 PM Vishwas Siravara 
wrote:

> Hi Andrey and Xintong. The 2.5 GB is from the Flink web UI (checkpoint
> size). I took a heap dump and could not find any memory leak from user
> code. I see similar behaviour with a smaller heap: on a 1 GB heap, the
> state size from the checkpoint UI is 180 MB. Attaching some screenshots of
> heap profiles if it helps. When the state grows, GC takes a long time, and
> sometimes the job manager removes the TM slot because of a 1ms timeout and
> tries to restore the task in another task manager; this creates a cascading
> effect and affects other jobs running on the cluster. My tests were run in
> a single-node cluster with 1 TM and 4 task slots with a parallelism of 4.
>
> Best,
> Vishwas
>
> On Tue, Aug 25, 2020 at 10:02 AM Andrey Zagrebin 
> wrote:

Re: OOM error for heap state backend.

2020-08-25 Thread Andrey Zagrebin
Hi Vishwas,

If you use Flink 1.7, check the older memory model docs [1], because your
reference 2 points to the new memory model of Flink 1.10.
Could you also share a screenshot of where you see the state size of 2.5 GB?
Do you mean the Flink WebUI?
Generally, it is quite hard to estimate the on-heap size of state Java
objects; I have never heard of such a Flink metric.

Best,
Andrey

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/mem_setup.html

On Mon, Aug 24, 2020 at 4:05 AM Xintong Song  wrote:


Re: OOM error for heap state backend.

2020-08-23 Thread Xintong Song
Hi Vishwas,

According to the log, the heap space is 13+ GB, which looks fine.

Several reasons might lead to the heap space OOM:

   - Memory leak
   - Not enough GC threads
   - Concurrent GC starts too late
   - ...

I would suggest taking a look at the GC logs.
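On the Java 8 JVMs used with Flink 1.7, GC logging can be enabled with the usual HotSpot flags passed through Flink's configuration; a sketch (the log path is an example, and flink-conf.yaml values must stay on a single line):

```yaml
# flink-conf.yaml: enable GC logging on the TaskManagers (Java 8 HotSpot flags)
env.java.opts.taskmanager: -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -Xloggc:/tmp/taskmanager-gc.log
```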

Thank you~

Xintong Song



On Fri, Aug 21, 2020 at 10:34 PM Vishwas Siravara 
wrote:

> Hi guys,
> I use Flink version 1.7.2.
> I have a stateful streaming job which uses a keyed process function and the
> heap state backend. Although I set the TM heap size to 16 GB, I get an OOM
> error when the state size is around 2.5 GB (I get the state size from the
> dashboard). I have set taskmanager.memory.fraction: 0.01 (which I believe
> is for native calls off-heap) [1]. For an 8 GB TM heap setting, the OOM
> errors start showing up when the state size reaches 1 GB. I find this
> puzzling, because I would expect a lot more heap space for state when I
> change the size to 16 GB; what fraction of the heap is used by the
> framework? [2]. Below is the stack trace for the exception. How can I
> increase my state size on the heap?
>
> 2020-08-21 02:05:54,443 INFO

OOM error for heap state backend.

2020-08-21 Thread Vishwas Siravara
Hi guys,
I use Flink version 1.7.2.
I have a stateful streaming job that uses a keyed process function with the
heap state backend. Although I set the TM heap size to 16 GB, I get an OOM
error when the state size is around 2.5 GB (state size taken from the
dashboard). I have set taskmanager.memory.fraction: 0.01 (which I believe
controls off-heap memory for native calls) [1]. For an 8 GB TM heap, the
OOM errors start showing up when the state size reaches 1 GB. This puzzles
me, because I would expect much more space on the heap for state when I
double the heap to 16 GB. What fraction of the heap is used by the
framework? [2]
Below is the stack trace for the exception. How can I increase my state
size on the heap?

2020-08-21 02:05:54,443 INFO
 org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - Memory
usage stats: [HEAP: 11920/13653/13653 MB, NON HEAP: 130/154/-1 MB
(used/committed/max)]
2020-08-21 02:05:54,444 INFO
 org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - Direct
memory stats: Count: 32796, Total Capacity: 1074692520, Used Memory:
1074692521
2020-08-21 02:05:54,444 INFO
 org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - Off-heap
pool stats: [Code Cache: 51/55/240 MB (used/committed/max)], [Metaspace:
70/88/-1 MB (used/committed/max)], [Compressed Class Space: 8/11/1024 MB
(used/committed/max)]
2020-08-21 02:05:54,444 INFO
 org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - Garbage
collector stats: [PS Scavenge, GC TIME (ms): 481035, GC COUNT: 1770], [PS
MarkSweep, GC TIME (ms): 8720945, GC COUNT: 265]
2020-08-21 02:05:54,446 INFO  org.apache.flink.runtime.taskmanager.Task
- KeyedProcess (1/4) (23946753549293edc23e88f257980cb4)
switched from RUNNING to FAILED.
java.lang.OutOfMemoryError: Java heap space
at java.lang.reflect.Array.newInstance(Array.java:75)
at java.util.Arrays.copyOf(Arrays.java:3212)
at java.util.Arrays.copyOf(Arrays.java:3181)
at
org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.resizeQueueArray(AbstractHeapPriorityQueue.java:153)
at
org.apache.flink.runtime.state.heap.HeapPriorityQueue.increaseSizeByOne(HeapPriorityQueue.java:172)
at
org.apache.flink.runtime.state.heap.HeapPriorityQueue.addInternal(HeapPriorityQueue.java:83)
at
org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.add(AbstractHeapPriorityQueue.java:73)
at
org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:121)
at
org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper.lambda$keyGroupReader$0(HeapPriorityQueueSnapshotRestoreWrapper.java:85)
at
org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper$$Lambda$229/674995813.consume(Unknown
Source)
at
org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:298)
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readKeyGroupStateData(HeapKeyedStateBackend.java:492)
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateHandleStateData(HeapKeyedStateBackend.java:453)
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:410)
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:358)
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:104)
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:284)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)
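For context on the top frames of the trace: the restore path grows the heap priority queue by array doubling via Arrays.copyOf. The sketch below is a hypothetical illustration (class name, initial capacity, and element type are invented for this example, not Flink's actual code) of why the copy itself can be the allocation that fails:

```java
import java.util.Arrays;

// Hypothetical sketch (not the actual Flink class) of the doubling growth
// strategy behind AbstractHeapPriorityQueue.resizeQueueArray: when the
// backing array is full, Arrays.copyOf allocates an array of twice the
// size and copies the old contents into it.
class DoublingQueueSketch {
    private Object[] queue = new Object[4]; // small arbitrary initial capacity
    private int size = 0;

    void add(Object element) {
        if (size >= queue.length) {
            // This allocation is the one that fails with OutOfMemoryError
            // in the reported trace (Arrays.copyOf -> Array.newInstance):
            // the old array and the doubled copy are live at the same time.
            queue = Arrays.copyOf(queue, queue.length * 2);
        }
        queue[size++] = element;
    }

    int size() { return size; }
    int capacity() { return queue.length; }
}
```

During each resize the old array (n slots) and the new one (2n slots) coexist, so a restore that replays many timers can transiently need noticeably more heap than the steady-state queue occupies.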



[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#the-fsstatebackend
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_detail.html#overview

Best,
Vishwas
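For reference, the heap-centric settings discussed in this thread might look like the following flink-conf.yaml sketch. The key names are assumed from the Flink 1.7 configuration docs and should be verified against the pages linked as [1] and [2]:

```yaml
# flink-conf.yaml -- a sketch, assuming Flink 1.7 configuration keys.

# Large TaskManager JVM heap, since the heap state backend keeps all
# keyed state as objects on the heap.
taskmanager.heap.size: 16g

# Fraction of free memory reserved for Flink's managed memory; keeping
# it small leaves most of the heap available for keyed state.
taskmanager.memory.fraction: 0.01
```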