pnowojski commented on a change in pull request #18112:
URL: https://github.com/apache/flink/pull/18112#discussion_r772370496



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/FinishedOperatorChain.java
##########
@@ -99,5 +101,29 @@ public void snapshotState(
             Supplier<Boolean> isRunning,
             ChannelStateWriter.ChannelStateWriteResult channelStateWriteResult,
             CheckpointStreamFactory storage)
-            throws Exception {}
+            throws Exception {
+        for (StreamOperatorWrapper<?, ?> operatorWrapper : getAllOperators(true)) {
+            StreamOperator<?> operator = operatorWrapper.getStreamOperator();
+
+            if (operator == getMainOperator() || operator == getTailOperator()) {
+                OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures();
+                if (operator == getMainOperator()) {
+                    snapshotInProgress.setInputChannelStateFuture(
+                            channelStateWriteResult
+                                    .getInputChannelStateHandles()
+                                    .thenApply(StateObjectCollection::new)
+                                    .thenApply(SnapshotResult::of));
+                }
+                if (operator == getTailOperator()) {
+                    snapshotInProgress.setResultSubpartitionStateFuture(
+                            channelStateWriteResult
+                                    .getResultSubpartitionStateHandles()
+                                    .thenApply(StateObjectCollection::new)
+                                    .thenApply(SnapshotResult::of));
+                }

Review comment:
       Could you refactor the code and deduplicate this with `RegularOperatorChain#buildOperatorSnapshotFutures`?
   
   For example, the highlighted code could be extracted into a helper method in `OperatorChain` and used in both the `Regular` and `Finished` versions.
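
A minimal sketch of what such a shared helper might look like. Every type and name below (`SnapshotFutures`, `ChannelWriteResult`, `attachChannelStates`) is a hypothetical stand-in invented for illustration, not the real Flink classes or the method the PR ended up with; the point is only that the main/tail branching can live in one place.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Stand-in types only; none of these are the real Flink classes. */
final class SnapshotFuturesSketch {

    /** Hypothetical holder mirroring the two setters used in the diff. */
    static final class SnapshotFutures {
        CompletableFuture<List<String>> inputChannelState;
        CompletableFuture<List<String>> resultSubpartitionState;
    }

    /** Hypothetical stand-in for the channel state write result. */
    static final class ChannelWriteResult {
        final CompletableFuture<Collection<String>> inputHandles = new CompletableFuture<>();
        final CompletableFuture<Collection<String>> outputHandles = new CompletableFuture<>();
    }

    /**
     * Shared helper (hypothetical name): attaches the input channel state to the
     * main operator and the result subpartition state to the tail operator.
     * Both the regular and the finished chain could call this.
     */
    static SnapshotFutures attachChannelStates(
            boolean isMainOperator, boolean isTailOperator, ChannelWriteResult result) {
        SnapshotFutures futures = new SnapshotFutures();
        if (isMainOperator) {
            futures.inputChannelState =
                    result.inputHandles.thenApply(handles -> new ArrayList<String>(handles));
        }
        if (isTailOperator) {
            futures.resultSubpartitionState =
                    result.outputHandles.thenApply(handles -> new ArrayList<String>(handles));
        }
        return futures;
    }

    public static void main(String[] args) {
        ChannelWriteResult result = new ChannelWriteResult();
        // A single-operator chain: the operator is both main and tail.
        SnapshotFutures futures = attachChannelStates(true, true, result);
        System.out.println("input done before write: " + futures.inputChannelState.isDone());
        result.inputHandles.complete(Arrays.asList("handle-1"));
        System.out.println("input done after write: " + futures.inputChannelState.isDone());
    }
}
```

With the branching in one helper, the `Finished` chain would only differ in skipping the operator's own state snapshot.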

##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskFinalCheckpointsTest.java
##########
@@ -840,6 +846,72 @@ public void testOperatorSkipLifeCycleIfFinishedOnRestore() throws Exception {
         }
     }
 
+    /**
+     * This test verifies for tasks that finished on restore, when taking unaligned checkpoint the
+     * asynchronous part would wait for the channel states futures get completed, which means the
+     * barriers are aligned.
+     */
+    @Test
+    public void testWaitingForUnalignedChannelStatesIfFinishedOnRestore() throws Exception {
+        OperatorID operatorId = new OperatorID();
+        try (StreamTaskMailboxTestHarness<String> harness =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
+                        .modifyStreamConfig(
+                                streamConfig -> streamConfig.setUnalignedCheckpointsEnabled(true))
+                        .addInput(BasicTypeInfo.STRING_TYPE_INFO, 3)
+                        .setCollectNetworkEvents()
+                        .setTaskStateSnapshot(1, TaskStateSnapshot.FINISHED_ON_RESTORE)
+                        .setupOperatorChain(new TestFinishedOnRestoreStreamOperator())
+                        .chain(
+                                operatorId,
+                                new TestFinishedOnRestoreStreamOperator(operatorId),
+                                StringSerializer.INSTANCE)
+                        .finish()
+                        .build()) {
+            // Finish the restore, including state initialization and open.
+            harness.processAll();
+
+            TestCheckpointResponder checkpointResponder = harness.getCheckpointResponder();
+            checkpointResponder.setAcknowledgeLatch(new OneShotLatch());
+            checkpointResponder.setDeclinedLatch(new OneShotLatch());
+
+            CheckpointBarrier unalignedBarrier =
+                    new CheckpointBarrier(2, 2, CheckpointOptions.unaligned(getDefault()));
+
+            // On first unaligned barrier, the task would take snapshot and start the asynchronous
+            // part. We slightly extend the process to make the asynchronous part start executing
+            // before the other barriers arrived.
+            harness.processEvent(unalignedBarrier, 0, 0);
+            Thread.sleep(1000);
+
+            // Finish the unaligned checkpoint.
+            harness.processEvent(unalignedBarrier, 0, 1);
+            harness.processEvent(unalignedBarrier, 0, 2);
+
+            // Wait till the asynchronous part finished either normally or exceptionally.
+            Deadline deadline = Deadline.fromNow(Duration.ofSeconds(10));
+            while (deadline.hasTimeLeft()) {
+                if (checkpointResponder.getAcknowledgeLatch().isTriggered()
+                        || checkpointResponder.getDeclinedLatch().isTriggered()) {
+                    break;
+                }
+                Thread.sleep(1000);
+            }

Review comment:
       nit: maybe use `CommonTestUtils#waitUntilCondition`?
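
For illustration, a self-contained sketch of the polling pattern such a utility encapsulates. The `waitUntil` helper below is a hypothetical stand-in written for this example; it is not Flink's `CommonTestUtils#waitUntilCondition`, whose exact signature should be checked in the codebase. Unlike the hand-written loop in the diff, which falls through silently once the deadline passes, this version fails loudly on timeout.

```java
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

final class WaitUtilSketch {

    /**
     * Polls the condition until it holds, sleeping pollMillis between checks.
     * Throws TimeoutException if the condition is still false after the timeout.
     */
    static void waitUntil(BooleanSupplier condition, Duration timeout, long pollMillis)
            throws InterruptedException, TimeoutException {
        long deadlineNanos = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadlineNanos >= 0) {
                throw new TimeoutException("Condition not met within " + timeout);
            }
            Thread.sleep(pollMillis);
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-ins for the acknowledge/declined latches in the test.
        AtomicBoolean acknowledged = new AtomicBoolean(false);
        AtomicBoolean declined = new AtomicBoolean(false);

        Thread asyncPart = new Thread(() -> acknowledged.set(true));
        asyncPart.start();

        waitUntil(() -> acknowledged.get() || declined.get(), Duration.ofSeconds(10), 10);
        asyncPart.join();
        System.out.println("checkpoint acknowledged or declined");
    }
}
```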

##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskFinalCheckpointsTest.java
##########
@@ -840,6 +846,72 @@ public void testOperatorSkipLifeCycleIfFinishedOnRestore() throws Exception {
         }
     }
 
+    /**
+     * This test verifies for tasks that finished on restore, when taking unaligned checkpoint the
+     * asynchronous part would wait for the channel states futures get completed, which means the
+     * barriers are aligned.
+     */
+    @Test
+    public void testWaitingForUnalignedChannelStatesIfFinishedOnRestore() throws Exception {
+        OperatorID operatorId = new OperatorID();
+        try (StreamTaskMailboxTestHarness<String> harness =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
+                        .modifyStreamConfig(
+                                streamConfig -> streamConfig.setUnalignedCheckpointsEnabled(true))
+                        .addInput(BasicTypeInfo.STRING_TYPE_INFO, 3)
+                        .setCollectNetworkEvents()
+                        .setTaskStateSnapshot(1, TaskStateSnapshot.FINISHED_ON_RESTORE)
+                        .setupOperatorChain(new TestFinishedOnRestoreStreamOperator())
+                        .chain(
+                                operatorId,
+                                new TestFinishedOnRestoreStreamOperator(operatorId),
+                                StringSerializer.INSTANCE)
+                        .finish()
+                        .build()) {
+            // Finish the restore, including state initialization and open.
+            harness.processAll();
+
+            TestCheckpointResponder checkpointResponder = harness.getCheckpointResponder();
+            checkpointResponder.setAcknowledgeLatch(new OneShotLatch());
+            checkpointResponder.setDeclinedLatch(new OneShotLatch());
+
+            CheckpointBarrier unalignedBarrier =
+                    new CheckpointBarrier(2, 2, CheckpointOptions.unaligned(getDefault()));
+
+            // On first unaligned barrier, the task would take snapshot and start the asynchronous
+            // part. We slightly extend the process to make the asynchronous part start executing
+            // before the other barriers arrived.
+            harness.processEvent(unalignedBarrier, 0, 0);
+            Thread.sleep(1000);

Review comment:
       Maybe extract the `500` constant used in other unit tests in this file and use the same constant here as well?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
##########
@@ -119,10 +119,7 @@ public void run() {
 

Review comment:
       Can you copy-paste the PR description into the commit message?
   > This PR fixes the issue that when a task finished on restore taking unaligned checkpoints, currently the asynchronous part does not wait till the alignment of barriers get finished, thus throws exception due to some metrics are not set yet.
   
   + maybe copy/paste from the JIRA ticket
   
   ```
   1. The task received the first barrier.
   2. With the process of unaligned checkpoint, the task would snapshot the state of the operators.
   3. The checkpoint would start the asynchronous part.
   4. Normally in the asynchronous part, it would wait till all the state futures get done, including the channel states and result partition states. With this method, it ensures the asynchronous part would wait till the last barrier arrived. But if the task has been fully finished before, these states are ignored and the assumption is broken.
   5. Then the asynchronous part would fail since when it try to build the CheckpointMetrics, the alignment for this checkpoint is in fact not done yet.
   ```
   ?
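
The race in steps 4 and 5 can be sketched with plain `CompletableFuture`s. The class and method names below are hypothetical and only model the described behaviour under simplified assumptions; they are not the `AsyncCheckpointRunnable` code. The async part waits on whatever state futures it is handed, so an empty set of futures (the finished-on-restore case before the fix) lets it proceed before the last barrier has arrived.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class AsyncPartSketch {

    /**
     * Models the asynchronous part: it completes only after every state
     * future it was given has completed, then "builds the metrics".
     */
    static CompletableFuture<String> runAsyncPart(List<CompletableFuture<?>> stateFutures) {
        return CompletableFuture
                .allOf(stateFutures.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> "metrics built");
    }

    public static void main(String[] args) {
        // Regular task: the channel state future completes with the last barrier.
        CompletableFuture<Void> channelState = new CompletableFuture<>();
        CompletableFuture<String> regular =
                runAsyncPart(Collections.singletonList(channelState));
        System.out.println("regular, before last barrier: done=" + regular.isDone());
        channelState.complete(null); // last barrier arrives
        System.out.println("regular, after last barrier: done=" + regular.isDone());

        // Finished task without channel state futures: nothing to wait for,
        // so the async part runs ahead of the barrier alignment.
        CompletableFuture<String> finished = runAsyncPart(Collections.emptyList());
        System.out.println("finished, before last barrier: done=" + finished.isDone());
    }
}
```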



