This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 81cfe465c9e4a17e563e1b4c02cd60a63b984de5
Author: Arvid Heise <[email protected]>
AuthorDate: Tue Feb 23 09:40:31 2021 +0100

    [FLINK-21452][connector/common] Stop snapshotting registered readers in 
source coordinator.
    
    Sources used to store their registered readers into the snapshot. However, 
when downscaling, they have unmatched readers that violate a couple of 
invariants.
    The solution is to not store registered readers - they are re-registered on 
restart anyways.
    To keep it backward compatible, the best option is to always store an empty 
list of readers while writing the snapshot and discard any recovered readers 
from the snapshot.
---
 .../reader/CoordinatedSourceRescaleITCase.java     | 162 +++++++++++++++++++++
 .../coordinator/SourceCoordinatorContext.java      |  16 +-
 .../coordinator/SourceCoordinatorSerdeUtils.java   |  27 ----
 .../coordinator/SourceCoordinatorContextTest.java  |   3 +-
 .../coordinator/SourceCoordinatorProviderTest.java |   9 +-
 5 files changed, 177 insertions(+), 40 deletions(-)

diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase.java
new file mode 100644
index 0000000..8ada12b
--- /dev/null
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.reader;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Comparator;
+
+import static org.junit.Assert.assertEquals;
+
+/** Tests if the coordinator handles up and downscaling. */
+public class CoordinatedSourceRescaleITCase {
+    public static final String CREATED_CHECKPOINT = "successfully created 
checkpoint";
+    public static final String RESTORED_CHECKPOINT = "successfully restored 
checkpoint";
+    @Rule public final TemporaryFolder temp = new TemporaryFolder();
+
+    @Test
+    public void testDownscaling() throws Exception {
+        final File checkpointDir = temp.newFolder();
+        final File lastCheckpoint = generateCheckpoint(checkpointDir, 7);
+        resumeCheckpoint(checkpointDir, lastCheckpoint, 3);
+    }
+
+    @Test
+    public void testUpscaling() throws Exception {
+        final File checkpointDir = temp.newFolder();
+        final File lastCheckpoint = generateCheckpoint(checkpointDir, 3);
+        resumeCheckpoint(checkpointDir, lastCheckpoint, 7);
+    }
+
+    private File generateCheckpoint(File checkpointDir, int p) throws 
IOException {
+        final StreamExecutionEnvironment env = createEnv(checkpointDir, null, 
p);
+
+        try {
+            env.execute("create checkpoint");
+            throw new AssertionError("No checkpoint");
+        } catch (Exception e) {
+            assertEquals(CREATED_CHECKPOINT, 
ExceptionUtils.getRootCause(e).getMessage());
+            return Files.find(
+                            checkpointDir.toPath(),
+                            2,
+                            (file, attr) ->
+                                    attr.isDirectory()
+                                            && 
file.getFileName().toString().startsWith("chk"))
+                    .min(Comparator.comparing(Path::toString))
+                    .map(Path::toFile)
+                    .orElseThrow(() -> new IllegalStateException("Cannot 
generate checkpoint", e));
+        }
+    }
+
+    private void resumeCheckpoint(File checkpointDir, File restoreCheckpoint, 
int p) {
+        final StreamExecutionEnvironment env = createEnv(checkpointDir, 
restoreCheckpoint, p);
+
+        try {
+            env.execute("resume checkpoint");
+            throw new AssertionError("No success error");
+        } catch (Exception e) { // compare the message by value, not by reference
+            if (!RESTORED_CHECKPOINT.equals(ExceptionUtils.getRootCause(e).getMessage())) {
+                throw new AssertionError("Cannot resume", e);
+            }
+        }
+    }
+
+    private StreamExecutionEnvironment createEnv(
+            File checkpointDir, @Nullable File restoreCheckpoint, int p) {
+        Configuration conf = new Configuration();
+        conf.setString(
+                CheckpointingOptions.CHECKPOINTS_DIRECTORY, 
checkpointDir.toURI().toString());
+        conf.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 
MemorySize.parse("4kb"));
+        if (restoreCheckpoint != null) {
+            conf.set(SavepointConfigOptions.SAVEPOINT_PATH, 
restoreCheckpoint.toURI().toString());
+        }
+        conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, p);
+
+        final StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.createLocalEnvironment(p, conf);
+        env.enableCheckpointing(100);
+        env.getCheckpointConfig()
+                .enableExternalizedCheckpoints(
+                        
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+
+        DataStream<Long> stream = env.fromSequence(0, Long.MAX_VALUE);
+        stream.map(new FailingMapFunction(restoreCheckpoint == 
null)).addSink(new SleepySink());
+
+        return env;
+    }
+
+    private static class FailingMapFunction extends RichMapFunction<Long, Long>
+            implements CheckpointListener {
+        private static final long serialVersionUID = 699621912578369378L;
+        private boolean generateCheckpoint;
+
+        FailingMapFunction(boolean generateCheckpoint) {
+            this.generateCheckpoint = generateCheckpoint;
+        }
+
+        @Override
+        public Long map(Long value) throws Exception {
+            // run a bit before failing
+            if (!generateCheckpoint && value % 100 == 42) {
+                throw new Exception(RESTORED_CHECKPOINT);
+            }
+            return value;
+        }
+
+        @Override
+        public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
+            if (generateCheckpoint && checkpointId > 5) {
+                throw new Exception(CREATED_CHECKPOINT);
+            }
+        }
+    }
+
+    private static class SleepySink implements SinkFunction<Long> {
+        private static final long serialVersionUID = -3542950841846119765L;
+
+        @Override
+        public void invoke(Long value, Context context) throws Exception {
+            if (value % 1000 == 0) {
+                Thread.sleep(1);
+            }
+        }
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
index 493ce69..a262807 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
@@ -58,7 +58,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
 
 import static 
org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerdeUtils.readRegisteredReaders;
-import static 
org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerdeUtils.writeRegisteredReaders;
 
 /**
  * A context class for the {@link OperatorCoordinator}. Compared with {@link 
SplitEnumeratorContext}
@@ -288,7 +287,8 @@ public class SourceCoordinatorContext<SplitT extends 
SourceSplit>
             SimpleVersionedSerializer<SplitT> splitSerializer,
             DataOutputStream out)
             throws Exception {
-        writeRegisteredReaders(registeredReaders, out);
+        // FLINK-21452: backwards compatible change to drop writing registered 
readers (empty list)
+        out.writeInt(0);
         assignmentTracker.snapshotState(checkpointId, splitSerializer, out);
     }
 
@@ -301,9 +301,8 @@ public class SourceCoordinatorContext<SplitT extends 
SourceSplit>
      */
     void restoreState(SimpleVersionedSerializer<SplitT> splitSerializer, 
DataInputStream in)
             throws Exception {
-        Map<Integer, ReaderInfo> readers = readRegisteredReaders(in);
-        registeredReaders.clear();
-        registeredReaders.putAll(readers);
+        // FLINK-21452: discard readers as they will be re-registering 
themselves
+        readRegisteredReaders(in);
         assignmentTracker.restoreState(splitSerializer, in);
     }
 
@@ -313,7 +312,12 @@ public class SourceCoordinatorContext<SplitT extends 
SourceSplit>
      * @param readerInfo the reader information of the source reader.
      */
     void registerSourceReader(ReaderInfo readerInfo) {
-        registeredReaders.put(readerInfo.getSubtaskId(), readerInfo);
+        final ReaderInfo previousReader =
+                registeredReaders.put(readerInfo.getSubtaskId(), readerInfo);
+        if (previousReader != null) {
+            throw new IllegalStateException(
+                    "Overwriting " + previousReader + " with " + readerInfo);
+        }
     }
 
     /**
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorSerdeUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorSerdeUtils.java
index 1295d4a..51c4ed7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorSerdeUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorSerdeUtils.java
@@ -49,20 +49,6 @@ public class SourceCoordinatorSerdeUtils {
         }
     }
 
-    /**
-     * Get serialized size of the registered readers map.
-     *
-     * <p>The binary format is following: 4 Bytes - num entries. N Bytes - 
entries 4 Bytes - subtask
-     * id N Bytes - reader info, see {@link #writeReaderInfo(ReaderInfo, 
DataOutputStream)}.
-     */
-    static void writeRegisteredReaders(
-            Map<Integer, ReaderInfo> registeredReaders, DataOutputStream out) 
throws IOException {
-        out.writeInt(registeredReaders.size());
-        for (ReaderInfo info : registeredReaders.values()) {
-            writeReaderInfo(info, out);
-        }
-    }
-
     static Map<Integer, ReaderInfo> readRegisteredReaders(DataInputStream in) 
throws IOException {
         int numReaders = in.readInt();
         Map<Integer, ReaderInfo> registeredReaders = new HashMap<>();
@@ -144,19 +130,6 @@ public class SourceCoordinatorSerdeUtils {
 
     // ----- private helper methods -----
 
-    /**
-     * Serialize {@link ReaderInfo}.
-     *
-     * <p>The binary format is following: 4 Bytes - subtask id N Bytes - 
location string
-     *
-     * @param readerInfo the given reader information to serialize.
-     */
-    private static void writeReaderInfo(ReaderInfo readerInfo, 
DataOutputStream out)
-            throws IOException {
-        out.writeInt(readerInfo.getSubtaskId());
-        out.writeUTF(readerInfo.getLocation());
-    }
-
     private static ReaderInfo readReaderInfo(DataInputStream in) throws 
IOException {
         int subtaskId = in.readInt();
         String location = in.readUTF();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
index 3fdcdae..389a6b9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
@@ -172,7 +172,8 @@ public class SourceCoordinatorContextTest extends 
SourceCoordinatorTestBase {
                             restoredTracker);
             restoredContext.restoreState(new MockSourceSplitSerializer(), in);
         }
-        assertEquals(context.registeredReaders(), 
restoredContext.registeredReaders());
+        // FLINK-21452: do not (re)store registered readers
+        assertEquals(0, restoredContext.registeredReaders().size());
         assertEquals(
                 splitSplitAssignmentTracker.uncheckpointedAssignments(),
                 restoredTracker.uncheckpointedAssignments());
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java
index 966fc0d..33b085c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java
@@ -36,7 +36,6 @@ import java.util.concurrent.CompletableFuture;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 /** Unit tests for {@link SourceCoordinatorProvider}. */
@@ -96,13 +95,11 @@ public class SourceCoordinatorProviderTest {
                 "The restored source coordinator should be a different 
instance",
                 restoredSourceCoordinator,
                 sourceCoordinator);
+        // FLINK-21452: do not (re)store registered readers
         assertEquals(
-                "There should only be one registered reader.",
-                1,
+                "There should be no registered reader.",
+                0,
                 
restoredSourceCoordinator.getContext().registeredReaders().size());
-        assertNotNull(
-                "The only registered reader should be reader 0",
-                
restoredSourceCoordinator.getContext().registeredReaders().get(0));
     }
 
     @Test

Reply via email to