pnowojski commented on code in PR #20233:
URL: https://github.com/apache/flink/pull/20233#discussion_r989031922


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java:
##########
@@ -569,6 +571,58 @@ public void snapshotState(
         }
     }
 
+    @Test
+    public void testChannelStateWriteResultLeakAndNotFailAfterCheckpointAborted() throws Exception {
+        String taskName = "test";
+        try (MockEnvironment mockEnvironment = MockEnvironment.builder().build();
+                ChannelStateWriterImpl writer =
+                        new ChannelStateWriterImpl(taskName, 0, getStreamFactoryFactory());
+                SubtaskCheckpointCoordinator coordinator =
+                        new SubtaskCheckpointCoordinatorImpl(
+                                new TestCheckpointStorageWorkerView(100),
+                                taskName,
+                                StreamTaskActionExecutor.IMMEDIATE,
+                                newDirectExecutorService(),
+                                new DummyEnvironment(),
+                                (unused1, unused2) -> {},
+                                (unused1, unused2) -> CompletableFuture.completedFuture(null),
+                                128,
+                                writer,
+                                true,
+                                (callable, duration) -> () -> {})) {
+            writer.open();
+            final OperatorChain<?, ?> operatorChain = getOperatorChain(mockEnvironment);
+            int checkpointId = 1;
+            // Abort checkpoint 1
+            coordinator.notifyCheckpointAborted(checkpointId, operatorChain, () -> true);
+
+            coordinator.initInputsCheckpoint(
+                    checkpointId,
+                    CheckpointOptions.unaligned(
+                            CheckpointType.CHECKPOINT,
+                            CheckpointStorageLocationReference.getDefault()));
+            ChannelStateWriter.ChannelStateWriteResult writeResult =
+                    writer.getWriteResult(checkpointId);
+            assertNotNull(writeResult);
+            assertFalse(writeResult.isDone());
+            assertFalse(writeResult.getInputChannelStateHandles().isCompletedExceptionally());
+            assertFalse(writeResult.getResultSubpartitionStateHandles().isCompletedExceptionally());
+
+            coordinator.checkpointState(
+                    new CheckpointMetaData(checkpointId, System.currentTimeMillis()),
+                    CheckpointOptions.forCheckpointWithDefaultLocation(),
+                    new CheckpointMetricsBuilder(),
+                    operatorChain,
+                    false,
+                    () -> true);
+            assertNull(writer.getWriteResult(checkpointId));
+            TimeUnit.MILLISECONDS.sleep(10);

Review Comment:
   One more thing: can we get rid of this sleep? A fixed 10 ms wait will very
   likely fail intermittently in the Azure CI.
   
   It looks like we can replace it with
   ```
   writeResult.get();
   ```
   where
   
   ```
   @VisibleForTesting
   public void ChannelStateWriteResult#get() throws Exception {
     inputChannelStateHandles.get();
     resultSubpartitionStateHandles.get();
   }
   ```
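
As a rough illustration of that idea, here is a minimal, self-contained sketch. It does not use the Flink classes: `WriteResultSketch` is a hypothetical stand-in for `ChannelStateWriteResult`, and `waitForDone` with a timeout is an assumed helper name, not an existing API. It blocks on the two futures instead of sleeping, and treats exceptional completion as "done", on the assumption that an aborted checkpoint may complete the futures exceptionally.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical stand-in for the write result; NOT the Flink class. */
class WriteResultSketch {
    final CompletableFuture<Void> inputChannelStateHandles = new CompletableFuture<>();
    final CompletableFuture<Void> resultSubpartitionStateHandles = new CompletableFuture<>();

    /** True once both futures have completed, normally or exceptionally. */
    boolean isDone() {
        return inputChannelStateHandles.isDone() && resultSubpartitionStateHandles.isDone();
    }

    /**
     * Blocks until both futures are complete or the timeout expires.
     * Returns true if both completed in time, false otherwise.
     */
    boolean waitForDone(long timeout, TimeUnit unit) {
        try {
            // allOf only completes after every given future has completed.
            CompletableFuture.allOf(inputChannelStateHandles, resultSubpartitionStateHandles)
                    .get(timeout, unit);
        } catch (ExecutionException e) {
            // A failed (e.g. aborted) write still counts as done.
        } catch (TimeoutException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return true;
    }
}
```

In the test, the `TimeUnit.MILLISECONDS.sleep(10)` call would then become something like `assertTrue(writeResult.waitForDone(10, TimeUnit.SECONDS))`. The generous timeout only bounds a hung test; in the normal case the call returns as soon as both futures complete.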


