[ 
https://issues.apache.org/jira/browse/FLINK-20663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17273247#comment-17273247
 ] 

Kurt Young commented on FLINK-20663:
------------------------------------

IIRC in the past, users of managed memory (operators, data structures) are 
required to know the contract when using memory segment, which is when they 
release the memory segments, they can just return the object back to memory 
manager, and don't access it ever again. 

And {{HybridMemorySegment}} and {{MemorySegment also provided some protection 
after being released in their free() method. The only exception is wrap() 
method, which caused FLINK-14894. But wrap() method is not that popular, and 
half of them are used by network buffer which is not even managed (they are 
unpooled segments). Other than this, all other methods are safe to use and will 
raise exception if being misused. }}

So my thought is, instead of relying on such inefficient and complex way (Full 
GC) to protect only one usage about memory segment, we can change this to much 
simpler way (like the old behaviors) and we pay enough attention of the wrap() 
method, e.g. adding some comments or find some way to protect the wrapped 
ByteBuffer. 

> Managed memory may not be released in time when operators use managed memory 
> frequently
> ---------------------------------------------------------------------------------------
>
>                 Key: FLINK-20663
>                 URL: https://issues.apache.org/jira/browse/FLINK-20663
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.12.0
>            Reporter: Caizhi Weng
>            Priority: Major
>             Fix For: 1.12.2
>
>
> Some batch operators (like sort merge join or hash aggregate) use managed 
> memory frequently. When these operators are chained together and the cluster 
> load is a bit heavy, it is very likely that the following exception occurs:
> {code:java}
> 2020-12-18 10:04:32
> java.lang.RuntimeException: 
> org.apache.flink.runtime.memory.MemoryAllocationException: Could not allocate 
> 512 pages
>       at 
> org.apache.flink.table.runtime.util.LazyMemorySegmentPool.nextSegment(LazyMemorySegmentPool.java:85)
>       at 
> org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.<init>(SimpleCollectingOutputView.java:49)
>       at 
> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap$RecordArea.<init>(BytesHashMap.java:297)
>       at 
> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:103)
>       at 
> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:90)
>       at LocalHashAggregateWithKeys$209161.open(Unknown Source)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:401)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:506)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>       at java.lang.Thread.run(Thread.java:834)
>       Suppressed: java.lang.NullPointerException
>               at LocalHashAggregateWithKeys$209161.close(Unknown Source)
>               at 
> org.apache.flink.table.runtime.operators.TableStreamOperator.dispose(TableStreamOperator.java:46)
>               at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:739)
>               at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:719)
>               at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:642)
>               at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:551)
>               ... 3 more
>               Suppressed: java.lang.NullPointerException
>                       at LocalHashAggregateWithKeys$209766.close(Unknown 
> Source)
>                       ... 8 more
> Caused by: org.apache.flink.runtime.memory.MemoryAllocationException: Could 
> not allocate 512 pages
>       at 
> org.apache.flink.runtime.memory.MemoryManager.allocatePages(MemoryManager.java:231)
>       at 
> org.apache.flink.table.runtime.util.LazyMemorySegmentPool.nextSegment(LazyMemorySegmentPool.java:83)
>       ... 13 more
> Caused by: org.apache.flink.runtime.memory.MemoryReservationException: Could 
> not allocate 16777216 bytes, only 9961487 bytes are remaining. This usually 
> indicates that you are requesting more memory than you have reserved. 
> However, when running an old JVM version it can also be caused by slow 
> garbage collection. Try to upgrade to Java 8u72 or higher if running on an 
> old Java version.
>       at 
> org.apache.flink.runtime.memory.UnsafeMemoryBudget.reserveMemory(UnsafeMemoryBudget.java:164)
>       at 
> org.apache.flink.runtime.memory.UnsafeMemoryBudget.reserveMemory(UnsafeMemoryBudget.java:80)
>       at 
> org.apache.flink.runtime.memory.MemoryManager.allocatePages(MemoryManager.java:229)
>       ... 14 more
> {code}
> It seems that this is caused by relying on GC to release managed memory, as 
> {{System.gc()}} may not trigger GC in time. See {{UnsafeMemoryBudget.java}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to