[ 
https://issues.apache.org/jira/browse/FLINK-10367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16643673#comment-16643673
 ] 

Nico Kruber commented on FLINK-10367:
-------------------------------------

As a rough sketch for option 3 (maybe this can be optimised):
{code}
        public enum NotificationResult {
                NONE(false, false),
                BUFFER_USED_FINISHED(true, false),
                BUFFER_USED_NEED_MORE(true, true);

                private final boolean bufferUsed;
                private final boolean needsMoreBuffers;

                NotificationResult(boolean bufferUsed, boolean 
needsMoreBuffers) {
                        this.bufferUsed = bufferUsed;
                        this.needsMoreBuffers = needsMoreBuffers;
                }

                public boolean bufferUsed() {
                        return bufferUsed;
                }

                public boolean needsMoreBuffers() {
                        return needsMoreBuffers;
                }
        }

        @Override
        public void recycle(MemorySegment segment) {
                BufferListener listener;
                NetworkBuffer buffer;
                NotificationResult notificationResult = 
NotificationResult.NONE; // some enum
                while (notificationResult.bufferUsed()) {
                        synchronized (availableMemorySegments) {
                                if (isDestroyed || 
numberOfRequestedMemorySegments > currentPoolSize) {
                                        returnMemorySegment(segment);
                                        return;
                                } else {
                                        listener = registeredListeners.poll();

                                        if (listener == null) {
                                                
availableMemorySegments.add(segment);
                                                
availableMemorySegments.notify();
                                                return;
                                        }
                                }
                        }

                        // We do not know which locks have been acquired before 
the recycle() or are needed in the
                        // notification and which other threads also access 
them.
                        // -> call notifyBufferAvailable() outside of the 
synchronized block to avoid a deadlock (FLINK-9676)
                        // Note that in case of any exceptions 
notifyBufferAvailable() should recycle the buffer
                        // (either directly or later during error handling) and 
therefore eventually end up in this
                        // method again.
                        if (buffer == null) {
                                buffer = new NetworkBuffer(segment, this);
                        }
                        notificationResult = 
listener.notifyBufferAvailable(buffer);

                        if (notificationResult.needsMoreBuffers()) {
                                synchronized (availableMemorySegments) {
                                        if (isDestroyed) {
                                                // cleanup tasks how they would 
have been done if we only had one synchronized block
                                                
listener.notifyBufferDestroyed();
                                        } else {
                                                
registeredListeners.add(listener);
                                        }
                                }
                        }
                }
        }
{code}

Not really nice, but maybe better than adding code that ignores exceptions in 
particular cases...

> Avoid recursion stack overflow during releasing SingleInputGate
> ---------------------------------------------------------------
>
>                 Key: FLINK-10367
>                 URL: https://issues.apache.org/jira/browse/FLINK-10367
>             Project: Flink
>          Issue Type: Improvement
>          Components: Network
>    Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0
>            Reporter: zhijiang
>            Assignee: zhijiang
>            Priority: Minor
>
> For task failure or canceling, the {{SingleInputGate#releaseAllResources}} 
> will be invoked before task exits.
> In the process of {{SingleInputGate#releaseAllResources}}, we first loop to 
> release all the input channels, then destroy the {{BufferPool}}.  For 
> {{RemoteInputChannel#releaseAllResources}}, it will return floating buffers 
> to the {{BufferPool}} {{which assigns this recycled buffer to the other 
> listeners(RemoteInputChannel}}). 
> It may exist recursive call in this process. If the listener is already 
> released before, it will directly recycle this buffer to the {{BufferPool}} 
> which takes another listener to notify available buffer. The above process 
> may be invoked repeatedly in recursive way.
> If there are many input channels as listeners in the {{BufferPool}}, it will 
> cause {{StackOverflow}} error because of recursion. And in our testing job, 
> the scale of 10,000 input channels ever caused this error.
> I think of two ways for solving this potential problem:
>  # When the input channel is released, it should notify the {{BufferPool}} of 
> unregistering this listener, otherwise it is inconsistent between them.
>  # {{SingleInputGate}} should destroy the {{BufferPool}} first, then loop to 
> release all the internal input channels. To do so, all the listeners in 
> {{BufferPool}} will be removed during destroying, and the input channel will 
> not have further interactions during 
> {{RemoteInputChannel#releaseAllResources}}.
> I prefer the second way to solve this problem, because we do not want to 
> expand another interface method for removing buffer listener, further 
> currently the internal data structure in {{BufferPool}} can not support 
> remove a listener directly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to