This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-ml.git

commit acbf4b953c22c5294894a0b753c6d3a1211cf643
Author: Yun Gao <gaoyunhen...@gmail.com>
AuthorDate: Wed Nov 10 22:27:49 2021 +0800

    [FLINK-24807][iteration] Do not start logging at the head operator if the barrier feeds back first
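
    If the feedback barrier is processed before the head operator snapshots
    its state, the checkpoint has already been committed by the time
    startLogging is called, so logging must be skipped for that checkpoint;
    otherwise the task would deadlock (see the new test
    testCheckpointsWithBarrierFeedbackFirst).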
    
    This closes #25.
---
 .../flink/iteration/checkpoint/Checkpoints.java    | 22 +++++++++++++++
 .../flink/iteration/operator/HeadOperatorTest.java | 32 ++++++++++++++++++++++
 2 files changed, 54 insertions(+)

diff --git a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/checkpoint/Checkpoints.java b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/checkpoint/Checkpoints.java
index edbfeba..9a16432 100644
--- a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/checkpoint/Checkpoints.java
+++ b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/checkpoint/Checkpoints.java
@@ -48,11 +48,25 @@ public class Checkpoints<T> implements AutoCloseable {
     private final FileSystem fileSystem;
     private final SupplierWithException<Path, IOException> pathSupplier;
 
+    /**
+     * Stores the pending checkpoints and whether they are canceled. This field is shared
+     * between the head and tail operators for aborting the checkpoints.
+     */
     private final ConcurrentHashMap<Long, Tuple2<PendingCheckpoint, Boolean>>
             uncompletedCheckpoints = new ConcurrentHashMap<>();
 
+    /**
+     * Stores the list of pending checkpoints ordered by the checkpoint id. This field is only
+     * accessed by the head operator.
+     */
     private final TreeMap<Long, PendingCheckpoint> sortedUncompletedCheckpoints = new TreeMap<>();
 
+    /**
+     * Stores the id of the latest completed checkpoint. It guards against the feedback
+     * barrier being processed before the head operator actually snapshots the state.
+     */
+    private long latestCompletedCheckpointId;
+
     public Checkpoints(
             TypeSerializer<T> typeSerializer,
             FileSystem fileSystem,
@@ -77,6 +91,10 @@ public class Checkpoints<T> implements AutoCloseable {
 
     public void startLogging(long checkpointId, OperatorStateCheckpointOutputStream outputStream)
             throws IOException {
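+        // The feedback barrier may already have been processed, committing this
+        // checkpoint, before the head operator takes its snapshot; in that case
+        // logging must not be started for the stale checkpoint id.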
+        if (checkpointId <= latestCompletedCheckpointId) {
+            return;
+        }
+
         Tuple2<PendingCheckpoint, Boolean> possibleCheckpoint =
                 uncompletedCheckpoints.computeIfAbsent(
                         checkpointId,
@@ -123,6 +141,10 @@ public class Checkpoints<T> implements AutoCloseable {
     }
 
     public void commitCheckpointsUntil(long checkpointId) {
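+        // Record the newest committed checkpoint id so that a later startLogging
+        // call for this or an older checkpoint becomes a no-op.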
+        if (latestCompletedCheckpointId < checkpointId) {
+            latestCompletedCheckpointId = checkpointId;
+        }
+
         SortedMap<Long, PendingCheckpoint> completedCheckpoints =
                 sortedUncompletedCheckpoints.headMap(checkpointId, true);
         completedCheckpoints
diff --git a/flink-ml-iteration/src/test/java/org/apache/flink/iteration/operator/HeadOperatorTest.java b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/operator/HeadOperatorTest.java
index 3a407a0..c1fe602 100644
--- a/flink-ml-iteration/src/test/java/org/apache/flink/iteration/operator/HeadOperatorTest.java
+++ b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/operator/HeadOperatorTest.java
@@ -767,6 +767,38 @@ public class HeadOperatorTest extends TestLogger {
                 });
     }
 
+    @Test(timeout = 20000)
+    public void testCheckpointsWithBarrierFeedbackFirst() throws Exception {
+        IterationID iterationId = new IterationID();
+        OperatorID operatorId = new OperatorID();
+
+        createHarnessAndRun(
+                iterationId,
+                operatorId,
+                null,
+                harness -> {
+                    harness.getTaskStateManager().getWaitForReportLatch().reset();
+                    harness.processElement(new StreamRecord<>(IterationRecord.newRecord(100, 0)));
+                    harness.processAll();
+
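+                    // Triggers checkpoint 2 on the task side before the
+                    // corresponding coordinator event is dispatched.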
+                    harness.getStreamTask()
+                            .triggerCheckpointAsync(
+                                    new CheckpointMetaData(2, 1000),
+                                    CheckpointOptions.alignedNoTimeout(
+                                            CheckpointType.CHECKPOINT,
+                                            CheckpointStorageLocationReference.getDefault()));
+
+                    // Simulates the barrier getting fed back before the
+                    // CoordinatorCheckpointEvent is dispatched. If this case
+                    // were not handled, there would be a deadlock.
+                    putFeedbackRecords(iterationId, IterationRecord.newBarrier(2), null);
+                    dispatchOperatorEvent(harness, operatorId, new CoordinatorCheckpointEvent(2));
+                    harness.processAll();
+                    harness.getTaskStateManager().getWaitForReportLatch().await();
+                    return null;
+                });
+    }
+
     private <T> T createHarnessAndRun(
             IterationID iterationId,
             OperatorID operatorId,
