[ https://issues.apache.org/jira/browse/FLINK-21552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17293418#comment-17293418 ]

Xintong Song commented on FLINK-21552:
--------------------------------------

Thanks [~dian.fu],

I think the problem is in the following snippet from {{MemoryManager#getSharedMemoryResourceForManagedMemory}}. We should probably unreserve the budget if {{initializer.apply}} fails with an error.

{code:java}
final LongFunctionWithException<T, Exception> reserveAndInitialize =
        (size) -> {
            try {
                reserveMemory(type, size);
            } catch (MemoryReservationException e) {
                throw new MemoryAllocationException(
                        "Could not created the shared memory resource of size "
                                + size
                                + ". Not enough memory left to reserve from the slot's managed memory.",
                        e);
            }

            return initializer.apply(size);
        };
{code}
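A minimal, self-contained sketch of the suggested fix, under stated assumptions: it does not use Flink's classes. {{SimpleBudget}} and {{reserveAndInitialize}} are hypothetical stand-ins for the slot's memory budget and the lambda above, and {{java.util.function.LongFunction}} replaces Flink's {{LongFunctionWithException}}, so only unchecked failures are modeled. The idea is to give the reservation back whenever the initializer fails and then rethrow, so the budget is intact for the next attempt.

{code:java}
import java.util.function.LongFunction;

/** Hypothetical model of "unreserve on failure"; not Flink's MemoryManager API. */
public class ReserveAndInitializeSketch {

    /** Hypothetical fixed-size memory budget. */
    static final class SimpleBudget {
        private long available;

        SimpleBudget(long total) {
            this.available = total;
        }

        synchronized void reserve(long size) {
            if (size > available) {
                throw new IllegalStateException(
                        "Could not allocate " + size + " bytes, only "
                                + available + " bytes are remaining.");
            }
            available -= size;
        }

        synchronized void release(long size) {
            available += size;
        }

        synchronized long available() {
            return available;
        }
    }

    /** Reserves size bytes, then runs the initializer; rolls back the reservation if it fails. */
    static <T> T reserveAndInitialize(SimpleBudget budget, long size, LongFunction<T> initializer) {
        budget.reserve(size);
        try {
            return initializer.apply(size);
        } catch (RuntimeException | Error e) {
            // Return the memory so a later attempt (e.g. after failover) can reserve it again.
            budget.release(size);
            throw e;
        }
    }

    public static void main(String[] args) {
        SimpleBudget budget = new SimpleBudget(1024);
        try {
            reserveAndInitialize(budget, 1024, size -> {
                throw new RuntimeException("initializer failed");
            });
        } catch (RuntimeException expected) {
            // The initializer's failure still propagates to the caller.
        }
        // The reservation was rolled back, so the whole budget is available again.
        System.out.println(budget.available()); // prints 1024
        String resource = reserveAndInitialize(budget, 1024, size -> "resource of " + size + " bytes");
        System.out.println(resource); // prints "resource of 1024 bytes"
    }
}
{code}

Keeping the reserve/release pair inside the same function means callers cannot forget the rollback; the actual change in Flink may be structured differently.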

cc [~sewen]

> The managed memory was not released if exception was thrown in createPythonExecutionEnvironment
> -----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-21552
>                 URL: https://issues.apache.org/jira/browse/FLINK-21552
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 1.12.0
>            Reporter: Dian Fu
>            Assignee: Dian Fu
>            Priority: Critical
>             Fix For: 1.13.0, 1.12.3
>
>
> If an exception is thrown in createPythonExecutionEnvironment, the job fails with the following exception:
> {code}
> org.apache.flink.runtime.memory.MemoryAllocationException: Could not created the shared memory resource of size 611948962. Not enough memory left to reserve from the slot's managed memory.
> at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$5(MemoryManager.java:536)
> at org.apache.flink.runtime.memory.SharedResources.createResource(SharedResources.java:126)
> at org.apache.flink.runtime.memory.SharedResources.getOrAllocateSharedResource(SharedResources.java:72)
> at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:555)
> at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:250)
> at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:113)
> at org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.open(AbstractStatelessFunctionOperator.java:116)
> at org.apache.flink.table.runtime.operators.python.scalar.AbstractPythonScalarFunctionOperator.open(AbstractPythonScalarFunctionOperator.java:88)
> at org.apache.flink.table.runtime.operators.python.scalar.AbstractRowDataPythonScalarFunctionOperator.open(AbstractRowDataPythonScalarFunctionOperator.java:70)
> at org.apache.flink.table.runtime.operators.python.scalar.RowDataPythonScalarFunctionOperator.open(RowDataPythonScalarFunctionOperator.java:59)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:428)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:543)
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:533)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> at java.lang.Thread.run(Thread.java:834)
> Caused by: org.apache.flink.runtime.memory.MemoryReservationException: Could not allocate 611948962 bytes, only 0 bytes are remaining. This usually indicates that you are requesting more memory than you have reserved. However, when running an old JVM version it can also be caused by slow garbage collection. Try to upgrade to Java 8u72 or higher if running on an old Java version.
> at org.apache.flink.runtime.memory.UnsafeMemoryBudget.reserveMemory(UnsafeMemoryBudget.java:170)
> at org.apache.flink.runtime.memory.UnsafeMemoryBudget.reserveMemory(UnsafeMemoryBudget.java:84)
> at org.apache.flink.runtime.memory.MemoryManager.reserveMemory(MemoryManager.java:423)
> at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$5(MemoryManager.java:534)
> ... 17 more
> {code}
> The reason is that the reserved managed memory was not returned to the MemoryManager when the job failed because of an exception thrown in createPythonExecutionEnvironment. As a result, there is no managed memory left to allocate during failover.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)