[ https://issues.apache.org/jira/browse/FLINK-10367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16643673#comment-16643673 ]
Nico Kruber commented on FLINK-10367:
-------------------------------------

As a rough sketch for option 3 (maybe this can be optimised):

{code}
public enum NotificationResult {
    NONE(false, false),
    BUFFER_USED_FINISHED(true, false),
    BUFFER_USED_NEED_MORE(true, true);

    private final boolean bufferUsed;
    private final boolean needsMoreBuffers;

    NotificationResult(boolean bufferUsed, boolean needsMoreBuffers) {
        this.bufferUsed = bufferUsed;
        this.needsMoreBuffers = needsMoreBuffers;
    }

    public boolean bufferUsed() {
        return bufferUsed;
    }

    public boolean needsMoreBuffers() {
        return needsMoreBuffers;
    }
}

@Override
public void recycle(MemorySegment segment) {
    BufferListener listener;
    NetworkBuffer buffer = null;
    NotificationResult notificationResult = NotificationResult.NONE;
    // keep offering the segment until some listener actually uses it
    while (!notificationResult.bufferUsed()) {
        synchronized (availableMemorySegments) {
            if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
                returnMemorySegment(segment);
                return;
            } else {
                listener = registeredListeners.poll();
                if (listener == null) {
                    availableMemorySegments.add(segment);
                    availableMemorySegments.notify();
                    return;
                }
            }
        }
        // We do not know which locks have been acquired before the recycle() or are needed in the
        // notification and which other threads also access them.
        // -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
        // Note that in case of any exceptions notifyBufferAvailable() should recycle the buffer
        // (either directly or later during error handling) and therefore eventually end up in this
        // method again.
        if (buffer == null) {
            buffer = new NetworkBuffer(segment, this);
        }
        notificationResult = listener.notifyBufferAvailable(buffer);
        if (notificationResult.needsMoreBuffers()) {
            synchronized (availableMemorySegments) {
                if (isDestroyed) {
                    // clean up the same way it would have been done with only one synchronized block
                    listener.notifyBufferDestroyed();
                } else {
                    registeredListeners.add(listener);
                }
            }
        }
    }
}
{code}

Not really nice, but maybe better than adding code that ignores exceptions in particular cases...

> Avoid recursion stack overflow during releasing SingleInputGate
> ---------------------------------------------------------------
>
>                 Key: FLINK-10367
>                 URL: https://issues.apache.org/jira/browse/FLINK-10367
>             Project: Flink
>          Issue Type: Improvement
>          Components: Network
>    Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0
>            Reporter: zhijiang
>            Assignee: zhijiang
>            Priority: Minor
>
> For task failure or cancellation, {{SingleInputGate#releaseAllResources}}
> is invoked before the task exits.
> In {{SingleInputGate#releaseAllResources}}, we first loop to release all the
> input channels and then destroy the {{BufferPool}}. For
> {{RemoteInputChannel#releaseAllResources}}, floating buffers are returned to
> the {{BufferPool}}, which assigns each recycled buffer to the other listeners
> ({{RemoteInputChannel}}s).
> This process may recurse: if the listener has already been released, it
> directly recycles the buffer back to the {{BufferPool}}, which then takes
> another listener to notify of the available buffer, and so on.
> If there are many input channels registered as listeners in the
> {{BufferPool}}, this recursion causes a {{StackOverflowError}}. In our testing
> job, a scale of 10,000 input channels once caused this error.
>
> I can think of two ways to solve this potential problem:
> # When an input channel is released, it should notify the {{BufferPool}} to
> unregister it as a listener; otherwise the two are inconsistent.
> # {{SingleInputGate}} should destroy the {{BufferPool}} first and then loop to
> release all the internal input channels. This way, all the listeners in the
> {{BufferPool}} are removed during destruction, and the input channels have
> no further interactions with it during
> {{RemoteInputChannel#releaseAllResources}}.
>
> I prefer the second way, because we do not want to add another interface
> method for removing a buffer listener; furthermore, the current internal data
> structure in {{BufferPool}} cannot remove a listener directly.
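A minimal, single-threaded sketch of why the loop in the option 3 sketch removes the recursion, under heavily simplified assumptions: the Pool, Listener and maxRecycleDepth names are hypothetical stand-ins, not Flink's real LocalBufferPool/BufferListener API, and locking, pool sizing and NetworkBuffer wrapping are left out. If every registered listener has already been released and bounces the buffer straight back into recycle(), the nesting depth grows by one per listener; if the released listener instead answers NONE, the while loop simply moves on to the next listener and the depth stays at one.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class RecycleDepthDemo {

    // Mirrors the enum from the sketch above (only the bufferUsed flag matters here).
    enum NotificationResult {
        NONE(false),
        BUFFER_USED_FINISHED(true);

        private final boolean bufferUsed;

        NotificationResult(boolean bufferUsed) {
            this.bufferUsed = bufferUsed;
        }

        boolean bufferUsed() {
            return bufferUsed;
        }
    }

    // Hypothetical listener interface; Flink's real BufferListener differs.
    interface Listener {
        NotificationResult notifyBufferAvailable(Object segment);
    }

    /** Hypothetical single-threaded pool: no locks, no pool-size checks. */
    static final class Pool {
        final Queue<Listener> listeners = new ArrayDeque<>();
        final Queue<Object> available = new ArrayDeque<>();
        private int depth;
        int maxDepth;

        void recycle(Object segment) {
            maxDepth = Math.max(maxDepth, ++depth);
            try {
                NotificationResult result = NotificationResult.NONE;
                // Keep offering the segment until some listener actually uses it.
                while (!result.bufferUsed()) {
                    Listener listener = listeners.poll();
                    if (listener == null) {
                        available.add(segment);
                        return;
                    }
                    result = listener.notifyBufferAvailable(segment);
                }
            } finally {
                depth--;
            }
        }
    }

    /** Registers n already-released listeners and returns the deepest recycle() nesting seen. */
    static int maxRecycleDepth(int n, boolean releasedListenerRecyclesDirectly) {
        Pool pool = new Pool();
        for (int i = 0; i < n; i++) {
            pool.listeners.add(segment -> {
                if (releasedListenerRecyclesDirectly) {
                    // Pre-fix behaviour: hand the buffer straight back -> nested recycle() call.
                    pool.recycle(segment);
                    return NotificationResult.BUFFER_USED_FINISHED;
                }
                // Option 3 behaviour: decline, so the loop moves on to the next listener.
                return NotificationResult.NONE;
            });
        }
        pool.recycle(new Object());
        return pool.maxDepth;
    }

    public static void main(String[] args) {
        System.out.println("recycling directly: depth " + maxRecycleDepth(100, true));
        System.out.println("declining with NONE: depth " + maxRecycleDepth(100, false));
    }
}
```

With 100 released listeners this reports a depth of 101 for the direct-recycle variant and 1 for the NONE variant; at the issue's scale of 10,000 channels the first variant nests 10,000 times, which is what can exhaust the thread stack.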
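The second proposal in the issue (destroy the {{BufferPool}} before releasing the channels) can be illustrated with a similar toy model. Everything below is hypothetical and simplified: each Channel holds one floating buffer and is registered as a listener, listeners are never re-registered, and returnedToGlobalPool is a plain list standing in for returning segments to the global network buffer pool. Once destroy() has drained the listener queue, releasing the channels triggers no listener callbacks at all.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class ReleaseOrderDemo {

    // Hypothetical listener interface, not Flink's real BufferListener.
    interface BufferListener {
        void notifyBufferAvailable(Object segment);

        void notifyBufferDestroyed();
    }

    /** Hypothetical single-threaded stand-in for a local buffer pool. */
    static final class Pool {
        final Queue<BufferListener> registeredListeners = new ArrayDeque<>();
        final List<Object> returnedToGlobalPool = new ArrayList<>();
        boolean isDestroyed;
        int notifications;

        void recycle(Object segment) {
            BufferListener listener = isDestroyed ? null : registeredListeners.poll();
            if (listener == null) {
                // Destroyed pool or nobody waiting: hand the segment back without callbacks.
                returnedToGlobalPool.add(segment);
                return;
            }
            notifications++;
            listener.notifyBufferAvailable(segment);
        }

        void destroy() {
            isDestroyed = true;
            // Drop all listeners so later recycle() calls never call back into channels.
            BufferListener listener;
            while ((listener = registeredListeners.poll()) != null) {
                listener.notifyBufferDestroyed();
            }
        }
    }

    /** Hypothetical input channel holding floating buffers and registered as a listener. */
    static final class Channel implements BufferListener {
        private final Pool pool;
        private final List<Object> floatingBuffers = new ArrayList<>();
        private boolean released;

        Channel(Pool pool) {
            this.pool = pool;
            floatingBuffers.add(new Object());
            pool.registeredListeners.add(this);
        }

        @Override
        public void notifyBufferAvailable(Object segment) {
            if (released) {
                pool.recycle(segment); // a released channel bounces the buffer back: nested call
            } else {
                floatingBuffers.add(segment);
            }
        }

        @Override
        public void notifyBufferDestroyed() {}

        void releaseAllResources() {
            released = true;
            List<Object> toReturn = new ArrayList<>(floatingBuffers);
            floatingBuffers.clear();
            for (Object segment : toReturn) {
                pool.recycle(segment);
            }
        }
    }

    /** Releases n channels in the chosen order and returns how many listener callbacks fired. */
    static int callbacksDuringRelease(int n, boolean destroyPoolFirst) {
        Pool pool = new Pool();
        List<Channel> channels = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            channels.add(new Channel(pool));
        }
        if (destroyPoolFirst) {
            pool.destroy(); // option 2: destroy the pool before touching the channels
        }
        for (Channel channel : channels) {
            channel.releaseAllResources();
        }
        pool.destroy(); // finds no listeners left if the pool was already destroyed
        return pool.notifications;
    }

    public static void main(String[] args) {
        System.out.println("release channels first: " + callbacksDuringRelease(1_000, false));
        System.out.println("destroy pool first: " + callbacksDuringRelease(1_000, true));
    }
}
```

Releasing the channels first still fires callbacks (including a released channel bouncing a buffer back into recycle()), whereas destroying the pool first reports zero; the model only captures the call ordering, not Flink's locking or buffer accounting.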