pnowojski commented on a change in pull request #18112: URL: https://github.com/apache/flink/pull/18112#discussion_r772370496
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/FinishedOperatorChain.java
##########
@@ -99,5 +101,29 @@ public void snapshotState(
             Supplier<Boolean> isRunning,
             ChannelStateWriter.ChannelStateWriteResult channelStateWriteResult,
             CheckpointStreamFactory storage)
-            throws Exception {}
+            throws Exception {
+        for (StreamOperatorWrapper<?, ?> operatorWrapper : getAllOperators(true)) {
+            StreamOperator<?> operator = operatorWrapper.getStreamOperator();
+
+            if (operator == getMainOperator() || operator == getTailOperator()) {
+                OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures();
+                if (operator == getMainOperator()) {
+                    snapshotInProgress.setInputChannelStateFuture(
+                            channelStateWriteResult
+                                    .getInputChannelStateHandles()
+                                    .thenApply(StateObjectCollection::new)
+                                    .thenApply(SnapshotResult::of));
+                }
+                if (operator == getTailOperator()) {
+                    snapshotInProgress.setResultSubpartitionStateFuture(
+                            channelStateWriteResult
+                                    .getResultSubpartitionStateHandles()
+                                    .thenApply(StateObjectCollection::new)
+                                    .thenApply(SnapshotResult::of));
+                }

Review comment:
Could you refactor the code and deduplicate this with `RegularOperatorChain#buildOperatorSnapshotFutures`? For example this highlighted code could be extracted to a helper method in `OperatorChain` and used both in the `Regular` and `Finished` versions?

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskFinalCheckpointsTest.java
##########
@@ -840,6 +846,72 @@ public void testOperatorSkipLifeCycleIfFinishedOnRestore() throws Exception {
         }
     }
+    /**
+     * This test verifies for tasks that finished on restore, when taking unaligned checkpoint the
+     * asynchronous part would wait for the channel states futures get completed, which means the
+     * barriers are aligned.
+     */
+    @Test
+    public void testWaitingForUnalignedChannelStatesIfFinishedOnRestore() throws Exception {
+        OperatorID operatorId = new OperatorID();
+        try (StreamTaskMailboxTestHarness<String> harness =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
+                        .modifyStreamConfig(
+                                streamConfig -> streamConfig.setUnalignedCheckpointsEnabled(true))
+                        .addInput(BasicTypeInfo.STRING_TYPE_INFO, 3)
+                        .setCollectNetworkEvents()
+                        .setTaskStateSnapshot(1, TaskStateSnapshot.FINISHED_ON_RESTORE)
+                        .setupOperatorChain(new TestFinishedOnRestoreStreamOperator())
+                        .chain(
+                                operatorId,
+                                new TestFinishedOnRestoreStreamOperator(operatorId),
+                                StringSerializer.INSTANCE)
+                        .finish()
+                        .build()) {
+            // Finish the restore, including state initialization and open.
+            harness.processAll();
+
+            TestCheckpointResponder checkpointResponder = harness.getCheckpointResponder();
+            checkpointResponder.setAcknowledgeLatch(new OneShotLatch());
+            checkpointResponder.setDeclinedLatch(new OneShotLatch());
+
+            CheckpointBarrier unalignedBarrier =
+                    new CheckpointBarrier(2, 2, CheckpointOptions.unaligned(getDefault()));
+
+            // On first unaligned barrier, the task would take snapshot and start the asynchronous
+            // part. We slightly extend the process to make the asynchronous part start executing
+            // before the other barriers arrived.
+            harness.processEvent(unalignedBarrier, 0, 0);
+            Thread.sleep(1000);
+
+            // Finish the unaligned checkpoint.
+            harness.processEvent(unalignedBarrier, 0, 1);
+            harness.processEvent(unalignedBarrier, 0, 2);
+
+            // Wait till the asynchronous part finished either normally or exceptionally.
+            Deadline deadline = Deadline.fromNow(Duration.ofSeconds(10));
+            while (deadline.hasTimeLeft()) {
+                if (checkpointResponder.getAcknowledgeLatch().isTriggered()
+                        || checkpointResponder.getDeclinedLatch().isTriggered()) {
+                    break;
+                }
+                Thread.sleep(1000);
+            }

Review comment:
nit: maybe use `CommonTestUtils#waitUntilCondition`?
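To illustrate the nit, here is a rough, self-contained sketch of what a polling helper of this kind does. The `waitUntil` method and the `WaitUntilSketch` class are hypothetical stand-ins written for this comment, not Flink's API; the exact signature of `CommonTestUtils#waitUntilCondition` should be checked against the Flink version in use.

```java
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

public class WaitUntilSketch {

    /**
     * Hypothetical helper (not Flink's API): polls the condition until it holds,
     * and fails loudly with a TimeoutException instead of silently falling through.
     */
    public static void waitUntil(
            BooleanSupplier condition, Duration timeout, Duration retryInterval)
            throws InterruptedException, TimeoutException {
        long deadlineNanos = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadlineNanos >= 0) {
                throw new TimeoutException("Condition was not met within " + timeout);
            }
            Thread.sleep(retryInterval.toMillis());
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for "acknowledge latch or declined latch is triggered".
        AtomicInteger polls = new AtomicInteger();
        waitUntil(
                () -> polls.incrementAndGet() >= 3,
                Duration.ofSeconds(10),
                Duration.ofMillis(10));
        System.out.println("condition met after " + polls.get() + " polls");
    }
}
```

In the test above the condition would be `checkpointResponder.getAcknowledgeLatch().isTriggered() || checkpointResponder.getDeclinedLatch().isTriggered()`. Compared with the hand-written loop, a helper like this also makes the timeout case an explicit failure.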
##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskFinalCheckpointsTest.java
##########
@@ -840,6 +846,72 @@ public void testOperatorSkipLifeCycleIfFinishedOnRestore() throws Exception {
         }
     }
+    /**
+     * This test verifies for tasks that finished on restore, when taking unaligned checkpoint the
+     * asynchronous part would wait for the channel states futures get completed, which means the
+     * barriers are aligned.
+     */
+    @Test
+    public void testWaitingForUnalignedChannelStatesIfFinishedOnRestore() throws Exception {
+        OperatorID operatorId = new OperatorID();
+        try (StreamTaskMailboxTestHarness<String> harness =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
+                        .modifyStreamConfig(
+                                streamConfig -> streamConfig.setUnalignedCheckpointsEnabled(true))
+                        .addInput(BasicTypeInfo.STRING_TYPE_INFO, 3)
+                        .setCollectNetworkEvents()
+                        .setTaskStateSnapshot(1, TaskStateSnapshot.FINISHED_ON_RESTORE)
+                        .setupOperatorChain(new TestFinishedOnRestoreStreamOperator())
+                        .chain(
+                                operatorId,
+                                new TestFinishedOnRestoreStreamOperator(operatorId),
+                                StringSerializer.INSTANCE)
+                        .finish()
+                        .build()) {
+            // Finish the restore, including state initialization and open.
+            harness.processAll();
+
+            TestCheckpointResponder checkpointResponder = harness.getCheckpointResponder();
+            checkpointResponder.setAcknowledgeLatch(new OneShotLatch());
+            checkpointResponder.setDeclinedLatch(new OneShotLatch());
+
+            CheckpointBarrier unalignedBarrier =
+                    new CheckpointBarrier(2, 2, CheckpointOptions.unaligned(getDefault()));
+
+            // On first unaligned barrier, the task would take snapshot and start the asynchronous
+            // part. We slightly extend the process to make the asynchronous part start executing
+            // before the other barriers arrived.
+            harness.processEvent(unalignedBarrier, 0, 0);
+            Thread.sleep(1000);

Review comment:
maybe extract the `500` constant used in other unit tests in this file and use the same constant here as well?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
##########
@@ -119,10 +119,7 @@ public void run() {

Review comment:
Can you copy paste the PR description into the commit message?
> This PR fixes the issue that when a task finished on restore taking unaligned checkpoints, currently the asynchronous part does not wait till the alignment of barriers get finished, thus throws exception due to some metrics are not set yet.

+ maybe copy/paste from the JIRA ticket
```
1. The task received the first barrier.
2. With the process of unaligned checkpoint, the task would snapshot the state of the operators.
3. The checkpoint would start the asynchronous part.
4. Normally in the asynchronous part, it would wait till all the state futures get done, including the channel states and result partition states. With this method, it ensures the asynchronous part would wait till the last barrier arrived. But if the task has been fully finished before, these states are ignored and the assumption is broken.
5. Then the asynchronous part would fail since when it try to build the CheckpointMetrics, the alignment for this checkpoint is in fact not done yet.
```
?
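The ordering described in the JIRA steps can be sketched with plain `CompletableFuture`s. This is only a toy model under stated assumptions: `AsyncPartSketch`, `runAsyncPart`, and the `-1` "alignment not finished" marker are hypothetical names invented for this comment, and none of it is the actual `AsyncCheckpointRunnable` code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class AsyncPartSketch {

    /**
     * Hypothetical stand-in for the asynchronous checkpoint part: it first waits
     * for the channel state future and only then reads the alignment metric.
     * A negative value models "alignment not finished yet".
     */
    public static CompletableFuture<Long> runAsyncPart(
            CompletableFuture<Void> channelStateFuture,
            AtomicLong alignmentDurationNanos,
            ExecutorService executor) {
        return CompletableFuture.supplyAsync(
                () -> {
                    // Completes only once the last barrier has arrived.
                    channelStateFuture.join();
                    long alignment = alignmentDurationNanos.get();
                    if (alignment < 0) {
                        throw new IllegalStateException("alignment is not finished yet");
                    }
                    return alignment;
                },
                executor);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<Void> channelState = new CompletableFuture<>();
            AtomicLong alignment = new AtomicLong(-1);

            // First barrier: the asynchronous part starts but blocks on the future.
            CompletableFuture<Long> asyncPart = runAsyncPart(channelState, alignment, executor);

            // Remaining barriers: alignment finishes, then the channel state completes.
            alignment.set(42L);
            channelState.complete(null);

            System.out.println("reported alignment: " + asyncPart.get(5, TimeUnit.SECONDS));
        } finally {
            executor.shutdownNow();
        }
    }
}
```

If the channel state future is already complete when the asynchronous part starts (the "states are ignored" case in step 4), the same method fails with the `IllegalStateException`, which mirrors the failure described in step 5.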