pnowojski commented on code in PR #20233: URL: https://github.com/apache/flink/pull/20233#discussion_r989031922
########## flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java: ########## @@ -569,6 +571,58 @@ public void snapshotState( } } + @Test + public void testChannelStateWriteResultLeakAndNotFailAfterCheckpointAborted() throws Exception { + String taskName = "test"; + try (MockEnvironment mockEnvironment = MockEnvironment.builder().build(); + ChannelStateWriterImpl writer = + new ChannelStateWriterImpl(taskName, 0, getStreamFactoryFactory()); + SubtaskCheckpointCoordinator coordinator = + new SubtaskCheckpointCoordinatorImpl( + new TestCheckpointStorageWorkerView(100), + taskName, + StreamTaskActionExecutor.IMMEDIATE, + newDirectExecutorService(), + new DummyEnvironment(), + (unused1, unused2) -> {}, + (unused1, unused2) -> CompletableFuture.completedFuture(null), + 128, + writer, + true, + (callable, duration) -> () -> {})) { + writer.open(); + final OperatorChain<?, ?> operatorChain = getOperatorChain(mockEnvironment); + int checkpointId = 1; + // Abort checkpoint 1 + coordinator.notifyCheckpointAborted(checkpointId, operatorChain, () -> true); + + coordinator.initInputsCheckpoint( + checkpointId, + CheckpointOptions.unaligned( + CheckpointType.CHECKPOINT, + CheckpointStorageLocationReference.getDefault())); + ChannelStateWriter.ChannelStateWriteResult writeResult = + writer.getWriteResult(checkpointId); + assertNotNull(writeResult); + assertFalse(writeResult.isDone()); + assertFalse(writeResult.getInputChannelStateHandles().isCompletedExceptionally()); + assertFalse(writeResult.getResultSubpartitionStateHandles().isCompletedExceptionally()); + + coordinator.checkpointState( + new CheckpointMetaData(checkpointId, System.currentTimeMillis()), + CheckpointOptions.forCheckpointWithDefaultLocation(), + new CheckpointMetricsBuilder(), + operatorChain, + false, + () -> true); + assertNull(writer.getWriteResult(checkpointId)); + TimeUnit.MILLISECONDS.sleep(10); Review Comment: One more thing. 
Can we get rid of this sleep? A fixed 10 ms sleep is likely to make this test fail intermittently on Azure CI. It looks like we can replace it with

```
writeResult.get();
```

where

```
@VisibleForTesting
public void ChannelStateWriteResult#get() throws Exception {
    inputChannelStateHandles.get();
    resultSubpartitionStateHandles.get();
}
```

(`CompletableFuture#get()` throws checked `InterruptedException`/`ExecutionException`, so the helper needs a `throws` clause.)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org