This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 64ca88ac989ee7525cb821670f293404b7b30d2d
Author: Zhijiang <wangzhijiang...@aliyun.com>
AuthorDate: Fri May 29 11:57:33 2020 +0200

    [FLINK-17994][checkpointing] Fix the race condition between 
CheckpointBarrierUnaligner#processBarrier and #notifyBarrierReceived
    
    The race condition happens as follows:
    1. CheckpointBarrierUnaligner#notifyBarrierReceived, called by the netty thread,
    enqueues an async checkpoint (ch1) into the mailbox.
    2. CheckpointBarrierUnaligner#processBarrier, called by the task thread, triggers
    a sync checkpoint (ch2) and executes it immediately.
    3. When the task thread later takes ch1 from the mailbox and executes it, an
    illegal argument exception is thrown because ch1 is smaller than the previously
    executed ch2.
    
    For an async checkpoint action, right before it actually executes, we compare its
    id with the current checkpoint id tracked by ThreadSafeUnaligner. If its id is
    smaller (i.e. a newer checkpoint has already been triggered), the action is
    ignored and returns directly.
    
    This closes #12406.
---
 .../runtime/io/CheckpointBarrierUnaligner.java     |  26 ++++--
 .../flink/streaming/runtime/tasks/StreamTask.java  |   3 +-
 .../runtime/io/CheckpointBarrierUnalignerTest.java | 100 ++++++++++++++++++++-
 3 files changed, 119 insertions(+), 10 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
index 01b1219..1d4bf82 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
@@ -137,8 +137,9 @@ public class CheckpointBarrierUnaligner extends 
CheckpointBarrierHandler {
        }
 
        /**
-        * We still need to trigger checkpoint while reading the first barrier 
from one channel, because
-        * this might happen earlier than the previous async trigger via 
mailbox by netty thread.
+        * We still need to trigger checkpoint via {@link 
ThreadSafeUnaligner#notifyBarrierReceived(CheckpointBarrier, InputChannelInfo)}
+        * while reading the first barrier from one channel, because this might 
happen
+        * earlier than the previous async trigger via mailbox by netty thread.
         *
         * <p>Note this is also suitable for the trigger case of local input 
channel.
         */
@@ -256,8 +257,20 @@ public class CheckpointBarrierUnaligner extends 
CheckpointBarrierHandler {
                return threadSafeUnaligner.getNumOpenChannels();
        }
 
+       @VisibleForTesting
+       ThreadSafeUnaligner getThreadSafeUnaligner() {
+               return threadSafeUnaligner;
+       }
+
+       private void notifyCheckpoint(CheckpointBarrier barrier) throws 
IOException {
+               // ignore the previous triggered checkpoint by netty thread if 
it was already canceled or aborted before.
+               if (barrier.getId() >= 
threadSafeUnaligner.getCurrentCheckpointId()) {
+                       super.notifyCheckpoint(barrier, 0);
+               }
+       }
+
        @ThreadSafe
-       private static class ThreadSafeUnaligner implements 
BufferReceivedListener, Closeable {
+       static class ThreadSafeUnaligner implements BufferReceivedListener, 
Closeable {
 
                /**
                 * Tag the state of which input channel has not received the 
barrier, such that newly arriving buffers need
@@ -280,7 +293,6 @@ public class CheckpointBarrierUnaligner extends 
CheckpointBarrierHandler {
                 */
                private long currentReceivedCheckpointId = -1L;
 
-               /** The number of open channels. */
                private int numOpenChannels;
 
                private final ChannelStateWriter channelStateWriter;
@@ -300,7 +312,7 @@ public class CheckpointBarrierUnaligner extends 
CheckpointBarrierHandler {
 
                        if (currentReceivedCheckpointId < barrierId) {
                                handleNewCheckpoint(barrier);
-                               handler.executeInTaskThread(() -> 
handler.notifyCheckpoint(barrier, 0), "notifyCheckpoint");
+                               handler.executeInTaskThread(() -> 
handler.notifyCheckpoint(barrier), "notifyCheckpoint");
                        }
 
                        int channelIndex = 
handler.getFlattenedChannelIndex(channelInfo);
@@ -396,5 +408,9 @@ public class CheckpointBarrierUnaligner extends 
CheckpointBarrierHandler {
                synchronized int getNumOpenChannels() {
                        return numOpenChannels;
                }
+
+               synchronized long getCurrentCheckpointId() {
+                       return currentReceivedCheckpointId;
+               }
        }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 7c08712..7f0011f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -545,7 +545,8 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                cleanUpInvoke();
        }
 
-       protected boolean runMailboxStep() throws Exception {
+       @VisibleForTesting
+       public boolean runMailboxStep() throws Exception {
                return mailboxProcessor.runMailboxStep();
        }
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnalignerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnalignerTest.java
index 7244539..18a60ec 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnalignerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnalignerTest.java
@@ -20,6 +20,8 @@ package org.apache.flink.streaming.runtime.io;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
@@ -37,6 +39,9 @@ import 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBui
 import org.apache.flink.runtime.io.network.util.TestBufferFactory;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import 
org.apache.flink.streaming.runtime.io.CheckpointBarrierUnaligner.ThreadSafeUnaligner;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
 import org.apache.flink.util.function.ThrowingRunnable;
 
 import org.junit.After;
@@ -48,6 +53,9 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -79,11 +87,15 @@ public class CheckpointBarrierUnalignerTest {
 
        @After
        public void ensureEmpty() throws Exception {
-               assertFalse(inputGate.pollNext().isPresent());
-               assertTrue(inputGate.isFinished());
+               if (inputGate != null) {
+                       assertFalse(inputGate.pollNext().isPresent());
+                       assertTrue(inputGate.isFinished());
+                       inputGate.close();
+               }
 
-               channelStateWriter.close();
-               inputGate.close();
+               if (channelStateWriter != null) {
+                       channelStateWriter.close();
+               }
        }
 
        // 
------------------------------------------------------------------------
@@ -463,6 +475,43 @@ public class CheckpointBarrierUnalignerTest {
                assertInflightData();
        }
 
+       /**
+        * Tests the race condition between {@link 
CheckpointBarrierUnaligner#processBarrier(CheckpointBarrier, int)}
+        * and {@link 
ThreadSafeUnaligner#notifyBarrierReceived(CheckpointBarrier, 
InputChannelInfo)}. The barrier
+        * notification will trigger an async checkpoint (ch1) via mailbox, and 
meanwhile the barrier processing will
+        * execute the next checkpoint (ch2) directly in advance. When the ch1 
action is taken from mailbox to execute,
+        * it should exit directly because it is smaller than the already executed ch2.
+        */
+       @Test
+       public void testConcurrentProcessBarrierAndNotifyBarrierReceived() 
throws Exception {
+               final ValidatingCheckpointInvokable invokable = new 
ValidatingCheckpointInvokable();
+               final CheckpointBarrierUnaligner handler = new 
CheckpointBarrierUnaligner(new int[] { 1 }, ChannelStateWriter.NO_OP, "test", 
invokable);
+               final InputChannelInfo channelInfo = new InputChannelInfo(0, 0);
+               final ExecutorService executor = 
Executors.newFixedThreadPool(1);
+
+               try {
+                       // Enqueue the checkpoint (ch0) action into the mailbox 
of invokable because it is triggered by other thread.
+                       Callable<Void> notifyTask = () -> {
+                               
handler.getThreadSafeUnaligner().notifyBarrierReceived(buildCheckpointBarrier(0),
 channelInfo);
+                               return null;
+                       };
+                       Future<Void> result = executor.submit(notifyTask);
+                       result.get();
+
+                       // Execute the checkpoint (ch1) directly because it is 
triggered by main thread.
+                       handler.processBarrier(buildCheckpointBarrier(1), 0);
+
+                       // Run the previous queued mailbox action to execute 
ch0.
+                       invokable.runMailboxStep();
+
+                       // ch0 will not be executed finally because it is 
smaller than the previously executed ch1.
+                       assertEquals(1, invokable.getTriggeredCheckpointId());
+                       assertEquals(1, 
invokable.getTotalTriggeredCheckpoints());
+               } finally {
+                       executor.shutdown();
+               }
+       }
+
        // 
------------------------------------------------------------------------
        //  Utils
        // 
------------------------------------------------------------------------
@@ -571,6 +620,10 @@ public class CheckpointBarrierUnalignerTest {
                        .collect(Collectors.toList());
        }
 
+       private CheckpointBarrier buildCheckpointBarrier(int id) {
+               return new CheckpointBarrier(id, 0, 
CheckpointOptions.forCheckpointWithDefaultLocation());
+       }
+
        // 
------------------------------------------------------------------------
        //  Testing Mocks
        // 
------------------------------------------------------------------------
@@ -639,4 +692,43 @@ public class CheckpointBarrierUnalignerTest {
                        return lastCanceledCheckpointId;
                }
        }
+
+       /**
+        * Specific {@link AbstractInvokable} implementation to record and 
validate which checkpoint
+        * id is executed and how many checkpoints are executed.
+        */
+       private static final class ValidatingCheckpointInvokable extends 
StreamTask {
+
+               private long expectedCheckpointId;
+
+               private int totalNumCheckpoints;
+
+               ValidatingCheckpointInvokable() throws Exception {
+                       super(new DummyEnvironment("test", 1, 0));
+               }
+
+               @Override
+               public void init() {
+               }
+
+               @Override
+               protected void processInput(MailboxDefaultAction.Controller 
controller) {
+               }
+
+               public void triggerCheckpointOnBarrier(
+                               CheckpointMetaData checkpointMetaData,
+                               CheckpointOptions checkpointOptions,
+                               CheckpointMetrics checkpointMetrics) {
+                       expectedCheckpointId = 
checkpointMetaData.getCheckpointId();
+                       totalNumCheckpoints++;
+               }
+
+               long getTriggeredCheckpointId() {
+                       return expectedCheckpointId;
+               }
+
+               int getTotalTriggeredCheckpoints() {
+                       return totalNumCheckpoints;
+               }
+       }
 }

Reply via email to