This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8bf1fbd  [FLINK-13973][runtime][checkpoint] Fix the state assignments 
if uidHash is set
8bf1fbd is described below

commit 8bf1fbd4d4bb0c0ce664aeb36395238612341a44
Author: Yun Gao <gaoyunhen...@gmail.com>
AuthorDate: Mon Aug 16 15:56:45 2021 +0800

    [FLINK-13973][runtime][checkpoint] Fix the state assignments if uidHash is 
set
    
    This closes #16836.
---
 .../checkpoint/StateAssignmentOperation.java       |   1 +
 .../CheckpointRestoreWithUidHashITCase.java        | 317 +++++++++++++++++++++
 2 files changed, 318 insertions(+)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
index 409d1a02..c409340 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
@@ -108,6 +108,7 @@ public class StateAssignmentOperation {
                 OperatorID operatorID =
                         operatorIDPair
                                 .getUserDefinedOperatorID()
+                                .filter(localOperators::containsKey)
                                 
.orElse(operatorIDPair.getGeneratedOperatorID());
 
                 OperatorState operatorState = 
localOperators.remove(operatorID);
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointRestoreWithUidHashITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointRestoreWithUidHashITCase.java
new file mode 100644
index 0000000..e470a3b
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointRestoreWithUidHashITCase.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.OperatorIDPair;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.testutils.junit.SharedObjects;
+import org.apache.flink.testutils.junit.SharedReference;
+
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.stream.IntStream;
+
+import static 
org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+
+/**
+ * Verifies that with {@code uidHash} a job could restore state from an 
existing savepoint, and the
+ * job would still be able to be restored from the checkpoints taken 
after the restart, without
+ * losing state due to mismatched operator ids.
+ */
+public class CheckpointRestoreWithUidHashITCase {
+
+    @ClassRule public static final TemporaryFolder TMP_FOLDER = new 
TemporaryFolder();
+
+    @Rule public final SharedObjects sharedObjects = SharedObjects.create();
+
+    private SharedReference<CountDownLatch> startWaitingForCheckpointLatch;
+
+    private SharedReference<List<Integer>> result;
+
+    @Before
+    public void setup() {
+        startWaitingForCheckpointLatch = sharedObjects.add(new 
CountDownLatch(1));
+        result = sharedObjects.add(new ArrayList<>());
+    }
+
+    @Test
+    public void testRestoreFromSavepointBySetUidHash() throws Exception {
+        final int maxNumber = 100;
+
+        try (MiniCluster miniCluster = new 
MiniCluster(createMiniClusterConfig())) {
+            miniCluster.start();
+
+            StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+            JobGraph firstJob =
+                    createJobGraph(
+                            env,
+                            
StatefulSourceBehavior.HOLD_AFTER_CHECKPOINT_ON_FIRST_RUN,
+                            maxNumber,
+                            "test-uid",
+                            null,
+                            null);
+            JobID jobId = miniCluster.submitJob(firstJob).get().getJobID();
+            waitForAllTaskRunning(miniCluster, jobId, false);
+
+            // The source would emit some records and start waiting for the 
checkpoint to happen.
+            // With this latch we ensure the savepoint happens at a fixed 
position and no following
+            // records are emitted after the savepoint is triggered.
+            startWaitingForCheckpointLatch.get().await();
+            String savepointPath =
+                    miniCluster
+                            .triggerSavepoint(jobId, 
TMP_FOLDER.newFolder().getAbsolutePath(), true)
+                            .get();
+
+            // Get the operator id
+            List<OperatorIDPair> operatorIds =
+                    
firstJob.getVerticesSortedTopologicallyFromSources().get(0).getOperatorIDs();
+            OperatorIDPair sourceOperatorIds = 
operatorIds.get(operatorIds.size() - 1);
+
+            JobGraph secondJob =
+                    createJobGraph(
+                            env,
+                            StatefulSourceBehavior.PROCESS_ONLY,
+                            maxNumber,
+                            null,
+                            
sourceOperatorIds.getGeneratedOperatorID().toHexString(),
+                            savepointPath);
+            miniCluster.executeJobBlocking(secondJob);
+        }
+        assertThat(result.get(), contains(IntStream.range(0, 
maxNumber).boxed().toArray()));
+    }
+
+    @Test
+    public void testRestoreCheckpointAfterFailoverWithUidHashSet() throws 
Exception {
+        final int maxNumber = 100;
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 500));
+        env.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE);
+
+        JobGraph jobGraph =
+                createJobGraph(
+                        env,
+                        
StatefulSourceBehavior.FAIL_AFTER_CHECKPOINT_ON_FIRST_RUN,
+                        maxNumber,
+                        null,
+                        new OperatorID().toHexString(),
+                        null);
+
+        try (MiniCluster miniCluster = new 
MiniCluster(createMiniClusterConfig())) {
+            miniCluster.start();
+            miniCluster.executeJobBlocking(jobGraph);
+        }
+        assertThat(result.get(), contains(IntStream.range(0, 
maxNumber).boxed().toArray()));
+    }
+
+    private MiniClusterConfiguration createMiniClusterConfig() {
+        final Configuration config = new Configuration();
+        config.setString(RestOptions.BIND_PORT, "18081-19000");
+        return new MiniClusterConfiguration.Builder()
+                .setNumTaskManagers(1)
+                .setNumSlotsPerTaskManager(1)
+                .setConfiguration(config)
+                .build();
+    }
+
+    private JobGraph createJobGraph(
+            StreamExecutionEnvironment env,
+            StatefulSourceBehavior behavior,
+            int maxNumber,
+            @Nullable String uid,
+            @Nullable String uidHash,
+            @Nullable String savepointPath) {
+
+        SingleOutputStreamOperator<Integer> source =
+                env.addSource(
+                                new StatefulSource(
+                                        behavior, maxNumber, 
startWaitingForCheckpointLatch))
+                        .setParallelism(1);
+        if (uid != null) {
+            source = source.uid(uid);
+        }
+
+        if (uidHash != null) {
+            source = source.setUidHash(uidHash);
+        }
+
+        source.addSink(new CollectSink(result)).setParallelism(1);
+        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+        if (savepointPath != null) {
+            jobGraph.setSavepointRestoreSettings(
+                    SavepointRestoreSettings.forPath(savepointPath, false));
+        }
+
+        return jobGraph;
+    }
+
+    private enum StatefulSourceBehavior {
+        PROCESS_ONLY(false),
+        HOLD_AFTER_CHECKPOINT_ON_FIRST_RUN(true),
+        FAIL_AFTER_CHECKPOINT_ON_FIRST_RUN(true);
+
+        boolean waitForCheckpointOnFirstRun;
+
+        StatefulSourceBehavior(boolean waitForCheckpointOnFirstRun) {
+            this.waitForCheckpointOnFirstRun = waitForCheckpointOnFirstRun;
+        }
+    }
+
+    private static class StatefulSource extends RichSourceFunction<Integer>
+            implements CheckpointedFunction, CheckpointListener {
+
+        private final StatefulSourceBehavior behavior;
+
+        private final int maxNumber;
+
+        private final SharedReference<CountDownLatch> 
startWaitingForCheckpointLatch;
+
+        private ListState<Integer> nextNumberState;
+
+        private int nextNumber;
+
+        private volatile boolean isCanceled;
+
+        private volatile boolean isWaiting;
+
+        private volatile long firstCheckpointIdAfterWaiting;
+
+        private volatile boolean checkpointCompletedAfterWaiting;
+
+        public StatefulSource(
+                StatefulSourceBehavior behavior,
+                int maxNumber,
+                SharedReference<CountDownLatch> 
startWaitingForCheckpointLatch) {
+            this.behavior = behavior;
+            this.maxNumber = maxNumber;
+            this.startWaitingForCheckpointLatch = 
startWaitingForCheckpointLatch;
+        }
+
+        @Override
+        public void initializeState(FunctionInitializationContext context) 
throws Exception {
+            nextNumberState =
+                    context.getOperatorStateStore()
+                            .getListState(new ListStateDescriptor<>("next", 
Integer.class));
+            if (nextNumberState.get().iterator().hasNext()) {
+                nextNumber = nextNumberState.get().iterator().next();
+            }
+        }
+
+        @Override
+        public void run(SourceContext<Integer> ctx) throws Exception {
+            emitRecordsTill(maxNumber / 3, ctx);
+
+            if (behavior.waitForCheckpointOnFirstRun
+                    && getRuntimeContext().getAttemptNumber() == 0) {
+                // Wait till one checkpoint is triggered and completed
+                isWaiting = true;
+                startWaitingForCheckpointLatch.get().countDown();
+                while (!checkpointCompletedAfterWaiting) {
+                    Thread.sleep(200);
+                }
+
+                if (behavior == 
StatefulSourceBehavior.FAIL_AFTER_CHECKPOINT_ON_FIRST_RUN) {
+                    throw new RuntimeException("Artificial Exception");
+                } else if (behavior == 
StatefulSourceBehavior.HOLD_AFTER_CHECKPOINT_ON_FIRST_RUN) {
+                    while (!isCanceled) {
+                        Thread.sleep(200);
+                    }
+                }
+            } else {
+                emitRecordsTill(maxNumber, ctx);
+            }
+        }
+
+        private void emitRecordsTill(int endExclusive, SourceContext<Integer> 
ctx) {
+            while (!isCanceled && nextNumber < endExclusive) {
+                synchronized (ctx.getCheckpointLock()) {
+                    ctx.collect(nextNumber);
+                    nextNumber++;
+                }
+            }
+        }
+
+        @Override
+        public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
+            nextNumberState.update(Collections.singletonList(nextNumber));
+
+            if (isWaiting && firstCheckpointIdAfterWaiting <= 0) {
+                firstCheckpointIdAfterWaiting = context.getCheckpointId();
+            }
+        }
+
+        @Override
+        public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
+            if (firstCheckpointIdAfterWaiting > 0
+                    && checkpointId >= firstCheckpointIdAfterWaiting) {
+                checkpointCompletedAfterWaiting = true;
+            }
+        }
+
+        @Override
+        public void cancel() {
+            isCanceled = true;
+        }
+    }
+
+    private static class CollectSink implements SinkFunction<Integer> {
+
+        private final SharedReference<List<Integer>> result;
+
+        public CollectSink(SharedReference<List<Integer>> result) {
+            this.result = result;
+        }
+
+        @Override
+        public void invoke(Integer value, Context context) throws Exception {
+            result.get().add(value);
+        }
+    }
+}

Reply via email to