akalash commented on a change in pull request #16844:
URL: https://github.com/apache/flink/pull/16844#discussion_r690361139



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
##########
@@ -972,24 +972,28 @@ private void releaseResources() {
 
         for (ResultPartitionWriter partitionWriter : consumableNotifyingPartitionWriters) {
             taskEventDispatcher.unregisterPartition(partitionWriter.getPartitionId());
-            if (isCanceledOrFailed()) {
-                partitionWriter.fail(getFailureCause());
-            }
         }
 
-        closeNetworkResources();
+        if (isCanceledOrFailed()) {
+            failAllResultPartitions();
+        }
+        closeAllResultPartitions();

Review comment:
       According to this code, we should always call `close`, even after `fail` 
has been called. But in the lambda that we pass to `TaskCanceler`, we call 
`fail` without calling `close`. Which way is correct?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
##########
@@ -250,15 +250,20 @@ public void release(Throwable cause) {
     /** Releases all produced data including both those stored in memory and persisted on disk. */
     protected abstract void releaseInternal();
 
-    @Override
-    public void close() {
+    private void closeBufferPool() {
         if (bufferPool != null) {
             bufferPool.lazyDestroy();
         }
     }
 
+    @Override
+    public void close() {
+        closeBufferPool();
+    }
+
     @Override
     public void fail(@Nullable Throwable throwable) {
+        closeBufferPool();

Review comment:
       I don't fully understand the contract here. When a ResultPartition fails, 
should we close it or not? According to your code, `fail` only closes the buffer 
pool but doesn't close the result partition in general. I am afraid this can 
lead to another leak: if tomorrow we add something to `close`, `fail` will still 
close the buffer pool but won't do the other things that `close` does. 
   Perhaps this is not a discussion for now, but maybe it is better to rename 
`fail` to `close`. Then we would have a clearer contract: `close` and 
`close(Throwable)`.
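
       To make the concern concrete, here is a minimal sketch of one way the 
contract could look, assuming `fail` simply records the cause and then delegates 
to an idempotent `close`. The class `SketchPartition` and its members are 
hypothetical and only stand in for `ResultPartition`; this is not the actual 
Flink code, and the counter stands in for `bufferPool.lazyDestroy()`.

```java
/** Hypothetical stand-in for a result partition; not Flink's actual class. */
final class SketchPartition implements AutoCloseable {

    private boolean closed;          // guards against releasing resources twice
    private int releaseCount;        // stands in for bufferPool.lazyDestroy() and later cleanup
    private Throwable failureCause;  // stays null when the partition is closed normally

    /** Normal shutdown path. Calling it again after the first call is a no-op. */
    @Override
    public synchronized void close() {
        if (closed) {
            return;
        }
        closed = true;
        releaseCount++; // all cleanup lives here, so fail() cannot miss future additions
    }

    /** Failure path: remember the first cause, then reuse close() for the cleanup. */
    public synchronized void fail(Throwable cause) {
        if (failureCause == null) {
            failureCause = cause;
        }
        close();
    }

    public synchronized int releaseCount() {
        return releaseCount;
    }

    public synchronized Throwable failureCause() {
        return failureCause;
    }
}
```

       With this shape, both call orders discussed in this review behave the 
same: calling `fail` alone (as in the `TaskCanceler` lambda) releases 
everything, and calling `fail` followed by `close` (as in `releaseResources`) 
releases everything exactly once.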

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
##########
@@ -345,48 +345,39 @@ private void testAddOnPartition(final ResultPartitionType partitionType) throws
 
     @Test
     public void testReleaseMemoryOnPipelinedPartition() throws Exception {
-        testReleaseMemory(ResultPartitionType.PIPELINED);
+        testReleaseMemory();

Review comment:
       inline this method?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

