This is an automated email from the ASF dual-hosted git repository. gaoyunhaii pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-ml.git
commit acbf4b953c22c5294894a0b753c6d3a1211cf643 Author: Yun Gao <gaoyunhen...@gmail.com> AuthorDate: Wed Nov 10 22:27:49 2021 +0800 [FLINK-24807][iteration] Not start logging at the head operator if the barrier feed back first This closes #25. --- .../flink/iteration/checkpoint/Checkpoints.java | 22 +++++++++++++++ .../flink/iteration/operator/HeadOperatorTest.java | 32 ++++++++++++++++++++++ 2 files changed, 54 insertions(+) diff --git a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/checkpoint/Checkpoints.java b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/checkpoint/Checkpoints.java index edbfeba..9a16432 100644 --- a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/checkpoint/Checkpoints.java +++ b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/checkpoint/Checkpoints.java @@ -48,11 +48,25 @@ public class Checkpoints<T> implements AutoCloseable { private final FileSystem fileSystem; private final SupplierWithException<Path, IOException> pathSupplier; + /** + * Stores the pending checkpoints and whether they are canceled. This field would be shared + * between the head and tail operators for aborting the checkpoints. + */ private final ConcurrentHashMap<Long, Tuple2<PendingCheckpoint, Boolean>> uncompletedCheckpoints = new ConcurrentHashMap<>(); + /** + * Stores the list of pending checkpoints ordered by the checkpoint id. This field is only + * accessed by the head operator. + */ private final TreeMap<Long, PendingCheckpoint> sortedUncompletedCheckpoints = new TreeMap<>(); + /** + * Stores the checkpoint id of the latest completed one. It is used to avoid the feedback barrier getting + * processed before the head operator actually snapshots the state. 
+ */ + private long latestCompletedCheckpointId; + public Checkpoints( TypeSerializer<T> typeSerializer, FileSystem fileSystem, @@ -77,6 +91,10 @@ public class Checkpoints<T> implements AutoCloseable { public void startLogging(long checkpointId, OperatorStateCheckpointOutputStream outputStream) throws IOException { + if (checkpointId <= latestCompletedCheckpointId) { + return; + } + Tuple2<PendingCheckpoint, Boolean> possibleCheckpoint = uncompletedCheckpoints.computeIfAbsent( checkpointId, @@ -123,6 +141,10 @@ public class Checkpoints<T> implements AutoCloseable { } public void commitCheckpointsUntil(long checkpointId) { + if (latestCompletedCheckpointId < checkpointId) { + latestCompletedCheckpointId = checkpointId; + } + SortedMap<Long, PendingCheckpoint> completedCheckpoints = sortedUncompletedCheckpoints.headMap(checkpointId, true); completedCheckpoints diff --git a/flink-ml-iteration/src/test/java/org/apache/flink/iteration/operator/HeadOperatorTest.java b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/operator/HeadOperatorTest.java index 3a407a0..c1fe602 100644 --- a/flink-ml-iteration/src/test/java/org/apache/flink/iteration/operator/HeadOperatorTest.java +++ b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/operator/HeadOperatorTest.java @@ -767,6 +767,38 @@ public class HeadOperatorTest extends TestLogger { }); } + @Test(timeout = 20000) + public void testCheckpointsWithBarrierFeedbackFirst() throws Exception { + IterationID iterationId = new IterationID(); + OperatorID operatorId = new OperatorID(); + + createHarnessAndRun( + iterationId, + operatorId, + null, + harness -> { + harness.getTaskStateManager().getWaitForReportLatch().reset(); + harness.processElement(new StreamRecord<>(IterationRecord.newRecord(100, 0))); + harness.processAll(); + + harness.getStreamTask() + .triggerCheckpointAsync( + new CheckpointMetaData(2, 1000), + CheckpointOptions.alignedNoTimeout( + CheckpointType.CHECKPOINT, + 
CheckpointStorageLocationReference.getDefault())); + + // Simulates that the barrier gets fed back before the + // CoordinatorCheckpointEvent is dispatched. If we do not handle this case, + // there would be a deadlock. + putFeedbackRecords(iterationId, IterationRecord.newBarrier(2), null); + dispatchOperatorEvent(harness, operatorId, new CoordinatorCheckpointEvent(2)); + harness.processAll(); + harness.getTaskStateManager().getWaitForReportLatch().await(); + return null; + }); + } + private <T> T createHarnessAndRun( IterationID iterationId, OperatorID operatorId,