akalash commented on a change in pull request #16844:
URL: https://github.com/apache/flink/pull/16844#discussion_r690361139

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
##########
@@ -972,24 +972,28 @@ private void releaseResources() {
         for (ResultPartitionWriter partitionWriter : consumableNotifyingPartitionWriters) {
             taskEventDispatcher.unregisterPartition(partitionWriter.getPartitionId());
-            if (isCanceledOrFailed()) {
-                partitionWriter.fail(getFailureCause());
-            }
         }

-        closeNetworkResources();
+        if (isCanceledOrFailed()) {
+            failAllResultPartitions();
+        }
+        closeAllResultPartitions();

Review comment:
       According to this code, we should always call `close`, even after `fail` has been called. But in the lambda that we pass to `TaskCanceler`, we call `fail` without calling `close`. Which way is correct?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
##########
@@ -250,15 +250,20 @@ public void release(Throwable cause) {
     /** Releases all produced data including both those stored in memory and persisted on disk. */
     protected abstract void releaseInternal();

-    @Override
-    public void close() {
+    private void closeBufferPool() {
         if (bufferPool != null) {
             bufferPool.lazyDestroy();
         }
     }

+    @Override
+    public void close() {
+        closeBufferPool();
+    }
+
     @Override
     public void fail(@Nullable Throwable throwable) {
+        closeBufferPool();

Review comment:
       I don't fully understand the contract here. When a ResultPartition fails, should we close it or not? According to your code, we only close the buffer pool but don't close the result partition in general. I am afraid this can lead to another leak: if tomorrow we add something to `close`, then `fail` will still close the buffer pool but won't do the other things that `close` does.
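       To make the concern concrete, here is a minimal sketch of one possible contract, in which `fail` records the cause and then goes through the same cleanup path as `close`. `SketchPartition` and `SketchBufferPool` are hypothetical stand-ins, not the real Flink classes, and making `close` idempotent is an assumption of this sketch, not something this PR does:

```java
/** Hypothetical stand-in for a buffer pool; NOT Flink's BufferPool. */
final class SketchBufferPool {
    private int destroyCalls;

    void lazyDestroy() {
        destroyCalls++;
    }

    int getDestroyCalls() {
        return destroyCalls;
    }
}

/** Hypothetical partition whose fail() reuses the close() path. */
final class SketchPartition implements AutoCloseable {
    private final SketchBufferPool bufferPool;
    private Throwable failureCause;
    private boolean closed;

    SketchPartition(SketchBufferPool bufferPool) {
        this.bufferPool = bufferPool;
    }

    /** Single cleanup path; idempotent, so close() after fail() is harmless. */
    @Override
    public void close() {
        if (closed) {
            return;
        }
        closed = true;
        if (bufferPool != null) {
            bufferPool.lazyDestroy();
        }
        // Any resource released here in the future is also released on failure.
    }

    /** Remembers the first failure cause, then runs the same cleanup as close(). */
    public void fail(Throwable cause) {
        if (failureCause == null) {
            failureCause = cause;
        }
        close();
    }

    boolean isClosed() {
        return closed;
    }

    Throwable getFailureCause() {
        return failureCause;
    }
}

/** Small demo: fail() followed by close() releases the pool only once. */
final class FailCloseContractSketch {
    public static void main(String[] args) {
        SketchBufferPool pool = new SketchBufferPool();
        SketchPartition partition = new SketchPartition(pool);
        partition.fail(new IllegalStateException("task canceled"));
        partition.close();
        System.out.println("destroy calls: " + pool.getDestroyCalls());
    }
}
```

       In this sketch, calling `fail` and then `close` (as `releaseResources` does) and calling only `fail` (as the `TaskCanceler` lambda does) both end with the pool released exactly once.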
       Perhaps this is not a discussion for now, but maybe it would be better to rename `fail` to `close`; then we would have a clearer contract: `close` and `close(Throwable)`.

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
##########
@@ -345,48 +345,39 @@ private void testAddOnPartition(final ResultPartitionType partitionType) throws
     @Test
     public void testReleaseMemoryOnPipelinedPartition() throws Exception {
-        testReleaseMemory(ResultPartitionType.PIPELINED);
+        testReleaseMemory();

Review comment:
       Inline this method?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org