This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch revert-21371-30166
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 93c7505d3133848686c3704d7bfc2f59af83e92e
Author: Matthias Pohl <matthias.p...@aiven.io>
AuthorDate: Mon Jan 9 13:26:23 2023 +0100

    Revert "[FLINK-30166][Tests] Remove no longer necessary test"
    
    This reverts commit 21c44c0863b52090a49da46ed228d3de18799506.
---
 .../f5e3e868-8d92-4258-9654-a605dc9c550f           |   6 +
 .../file/sink/writer/FileSinkMigrationITCase.java  | 291 +++++++++++++++++++++
 2 files changed, 297 insertions(+)

diff --git a/flink-connectors/flink-connector-files/archunit-violations/f5e3e868-8d92-4258-9654-a605dc9c550f b/flink-connectors/flink-connector-files/archunit-violations/f5e3e868-8d92-4258-9654-a605dc9c550f
index a8d59151016..ff88812bba2 100644
--- a/flink-connectors/flink-connector-files/archunit-violations/f5e3e868-8d92-4258-9654-a605dc9c550f
+++ b/flink-connectors/flink-connector-files/archunit-violations/f5e3e868-8d92-4258-9654-a605dc9c550f
@@ -10,3 +10,9 @@ org.apache.flink.connector.file.sink.StreamingExecutionFileSinkITCase does not s
 * reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
 * reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
  or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.connector.file.sink.writer.FileSinkMigrationITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
\ No newline at end of file
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileSinkMigrationITCase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileSinkMigrationITCase.java
new file mode 100644
index 00000000000..90dde5462c4
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileSinkMigrationITCase.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.writer;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.testutils.junit.SharedObjectsExtension;
+import org.apache.flink.testutils.junit.SharedReference;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests migrating from {@link StreamingFileSink} to {@link FileSink}. It triggers a savepoint for
+ * the {@link StreamingFileSink} job and restores the {@link FileSink} job from the savepoint taken.
+ */
+class FileSinkMigrationITCase {
+
+    @RegisterExtension
+    private final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create();
+
+    private static final String SOURCE_UID = "source";
+
+    private static final String SINK_UID = "sink";
+
+    private static final int NUM_SOURCES = 4;
+
+    private static final int NUM_SINKS = 3;
+
+    private static final int NUM_RECORDS = 10000;
+
+    private static final int NUM_BUCKETS = 4;
+
+    private SharedReference<CountDownLatch> finalCheckpointLatch;
+
+    @BeforeEach
+    void setup() {
+        // We wait for two successful checkpoints in sources before shutting down. This ensures that
+        // the sink can commit its data.
+        // We need to keep a "static" latch here because all sources need to be kept running
+        // while we're waiting for the required number of checkpoints. Otherwise, we would lock up
+        // because we can only do checkpoints while all operators are running.
+        finalCheckpointLatch = sharedObjects.add(new CountDownLatch(NUM_SOURCES * 2));
+    }
+
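+    // Smoke test for the shared-objects plumbing: every element of the generated
+    // sequence must end up in the shared list exactly once.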
+    @Test
+    void test() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        SharedReference<Collection<Long>> list = sharedObjects.add(new ArrayList<>());
+        int n = 10000;
+        env.setParallelism(100);
+        env.fromSequence(0, n).map(i -> list.applySync(l -> l.add(i)));
+        env.execute();
+        assertThat(list.get()).hasSize(n + 1);
+        assertThat(LongStream.rangeClosed(0, n).boxed().collect(Collectors.toList()))
+                .isEqualTo(list.get().stream().sorted().collect(Collectors.toList()));
+    }
+
+    @Test
+    void testMigration(
+            @TempDir java.nio.file.Path tmpOutputDir, @TempDir java.nio.file.Path tmpSavepointDir)
+            throws Exception {
+        String outputPath = tmpOutputDir.toString();
+        String savepointBasePath = tmpSavepointDir.toString();
+
+        final MiniClusterConfiguration cfg =
+                new MiniClusterConfiguration.Builder()
+                        .withRandomPorts()
+                        .setNumTaskManagers(1)
+                        .setNumSlotsPerTaskManager(4)
+                        .build();
+
+        JobGraph streamingFileSinkJobGraph = createStreamingFileSinkJobGraph(outputPath);
+        String savepointPath =
+                executeAndTakeSavepoint(cfg, streamingFileSinkJobGraph, savepointBasePath);
+
+        JobGraph fileSinkJobGraph = createFileSinkJobGraph(outputPath);
+        loadSavepointAndExecute(cfg, fileSinkJobGraph, savepointPath);
+
+        IntegerFileSinkTestDataUtils.checkIntegerSequenceSinkOutput(
+                outputPath, NUM_RECORDS, NUM_BUCKETS, NUM_SOURCES);
+    }
+
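+    // Job #1: the pipeline writing through the legacy StreamingFileSink, from which
+    // the savepoint is taken.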
+    private JobGraph createStreamingFileSinkJobGraph(String outputPath) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE);
+
+        StreamingFileSink<Integer> sink =
+                StreamingFileSink.forRowFormat(
+                                new Path(outputPath), new IntegerFileSinkTestDataUtils.IntEncoder())
+                        .withBucketAssigner(
+                                new IntegerFileSinkTestDataUtils.ModuloBucketAssigner(NUM_BUCKETS))
+                        .withRollingPolicy(OnCheckpointRollingPolicy.build())
+                        .build();
+
+        env.addSource(new StatefulSource(true, finalCheckpointLatch))
+                .uid(SOURCE_UID)
+                .setParallelism(NUM_SOURCES)
+                .addSink(sink)
+                .setParallelism(NUM_SINKS)
+                .uid(SINK_UID);
+        return env.getStreamGraph().getJobGraph();
+    }
+
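+    // Job #2: the same pipeline writing through the new FileSink; the matching
+    // SOURCE_UID/SINK_UID values let it restore the state captured by job #1.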
+    private JobGraph createFileSinkJobGraph(String outputPath) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE);
+
+        FileSink<Integer> sink =
+                FileSink.forRowFormat(
+                                new Path(outputPath), new IntegerFileSinkTestDataUtils.IntEncoder())
+                        .withBucketAssigner(
+                                new IntegerFileSinkTestDataUtils.ModuloBucketAssigner(NUM_BUCKETS))
+                        .withRollingPolicy(OnCheckpointRollingPolicy.build())
+                        .build();
+
+        env.addSource(new StatefulSource(false, finalCheckpointLatch))
+                .uid(SOURCE_UID)
+                .setParallelism(NUM_SOURCES)
+                .sinkTo(sink)
+                .setParallelism(NUM_SINKS)
+                .uid(SINK_UID);
+        return env.getStreamGraph().getJobGraph();
+    }
+
+    private String executeAndTakeSavepoint(
+            MiniClusterConfiguration cfg, JobGraph jobGraph, String savepointBasePath)
+            throws Exception {
+        try (MiniCluster miniCluster = new MiniCluster(cfg)) {
+            miniCluster.start();
+            CompletableFuture<JobSubmissionResult> jobSubmissionResultFuture =
+                    miniCluster.submitJob(jobGraph);
+            JobID jobId = jobSubmissionResultFuture.get().getJobID();
+
+            waitForAllTaskRunning(miniCluster, jobId, false);
+
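+            // Take a canonical-format savepoint; the 'true' flag cancels the job
+            // once the savepoint has completed.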
+            CompletableFuture<String> savepointResultFuture =
+                    miniCluster.triggerSavepoint(
+                            jobId, savepointBasePath, true, SavepointFormatType.CANONICAL);
+            return savepointResultFuture.get();
+        }
+    }
+
+    private void loadSavepointAndExecute(
+            MiniClusterConfiguration cfg, JobGraph jobGraph, String savepointPath)
+            throws Exception {
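+        // allowNonRestoredState = false: every piece of state in the savepoint must be
+        // mapped onto an operator of the new job (matched via the operator uids).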
+        jobGraph.setSavepointRestoreSettings(
+                SavepointRestoreSettings.forPath(savepointPath, false));
+
+        try (MiniCluster miniCluster = new MiniCluster(cfg)) {
+            miniCluster.start();
+            miniCluster.executeJobBlocking(jobGraph);
+        }
+    }
+
+    private static class StatefulSource extends RichParallelSourceFunction<Integer>
+            implements CheckpointedFunction, CheckpointListener {
+
+        private final boolean takingSavepointMode;
+
+        private SharedReference<CountDownLatch> finalCheckpointLatch;
+
+        private ListState<Integer> nextValueState;
+
+        private int nextValue;
+
+        private volatile boolean snapshottedAfterAllRecordsOutput;
+
+        private volatile boolean isWaitingCheckpointComplete;
+
+        private volatile boolean isCanceled;
+
+        public StatefulSource(
+                boolean takingSavepointMode, SharedReference<CountDownLatch> finalCheckpointLatch) {
+            this.takingSavepointMode = takingSavepointMode;
+            this.finalCheckpointLatch = finalCheckpointLatch;
+        }
+
+        @Override
+        public void initializeState(FunctionInitializationContext context) throws Exception {
+            nextValueState =
+                    context.getOperatorStateStore()
+                            .getListState(new ListStateDescriptor<>("nextValue", Integer.class));
+
+            if (nextValueState.get() != null && nextValueState.get().iterator().hasNext()) {
+                nextValue = nextValueState.get().iterator().next();
+            }
+        }
+
+        @Override
+        public void run(SourceContext<Integer> ctx) throws Exception {
+            if (takingSavepointMode) {
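+                // Emit the first third of the records quickly and the next chunk with a
+                // 100 ms delay, then idle so the job keeps running while the test
+                // triggers the savepoint.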
+                sendRecordsUntil(NUM_RECORDS / 3, 0, ctx);
+                sendRecordsUntil(NUM_RECORDS / 2, 100, ctx);
+
+                while (true) {
+                    Thread.sleep(5000);
+                }
+            } else {
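+                // Restored run: nextValue comes back from the savepoint state, so the
+                // sequence resumes where the first job stopped and runs up to NUM_RECORDS.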
+                sendRecordsUntil(NUM_RECORDS, 0, ctx);
+
+                // Wait for the last checkpoint to commit all the pending records.
+                isWaitingCheckpointComplete = true;
+                finalCheckpointLatch.get().await();
+            }
+        }
+
+        private void sendRecordsUntil(
+                int targetNumber, int sleepInMillis, SourceContext<Integer> ctx)
+                throws InterruptedException {
+            while (!isCanceled && nextValue < targetNumber) {
+                synchronized (ctx.getCheckpointLock()) {
+                    ctx.collect(nextValue++);
+                }
+
+                if (sleepInMillis > 0) {
+                    Thread.sleep(sleepInMillis);
+                }
+            }
+        }
+
+        @Override
+        public void snapshotState(FunctionSnapshotContext context) throws Exception {
+            nextValueState.update(Collections.singletonList(nextValue));
+
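+            // Record that a snapshot happened after the last record was emitted; only
+            // checkpoints taken from this point on may count down the final latch.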
+            if (isWaitingCheckpointComplete) {
+                snapshottedAfterAllRecordsOutput = true;
+            }
+        }
+
+        @Override
+        public void notifyCheckpointComplete(long checkpointId) throws Exception {
+            if (isWaitingCheckpointComplete && snapshottedAfterAllRecordsOutput) {
+                finalCheckpointLatch.get().countDown();
+            }
+        }
+
+        @Override
+        public void cancel() {
+            isCanceled = true;
+        }
+    }
+}
