This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch revert-21371-30166 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 93c7505d3133848686c3704d7bfc2f59af83e92e Author: Matthias Pohl <matthias.p...@aiven.io> AuthorDate: Mon Jan 9 13:26:23 2023 +0100 Revert "[FLINK-30166][Tests] Remove no longer necessary test" This reverts commit 21c44c0863b52090a49da46ed228d3de18799506. --- .../f5e3e868-8d92-4258-9654-a605dc9c550f | 6 + .../file/sink/writer/FileSinkMigrationITCase.java | 291 +++++++++++++++++++++ 2 files changed, 297 insertions(+) diff --git a/flink-connectors/flink-connector-files/archunit-violations/f5e3e868-8d92-4258-9654-a605dc9c550f b/flink-connectors/flink-connector-files/archunit-violations/f5e3e868-8d92-4258-9654-a605dc9c550f index a8d59151016..ff88812bba2 100644 --- a/flink-connectors/flink-connector-files/archunit-violations/f5e3e868-8d92-4258-9654-a605dc9c550f +++ b/flink-connectors/flink-connector-files/archunit-violations/f5e3e868-8d92-4258-9654-a605dc9c550f @@ -10,3 +10,9 @@ org.apache.flink.connector.file.sink.StreamingExecutionFileSinkITCase does not s * reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\ * reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule +org.apache.flink.connector.file.sink.writer.FileSinkMigrationITCase does not satisfy: only one of the following predicates match:\ +* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\ +* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\ +* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\ +* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\ + or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule \ No newline at end of file diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileSinkMigrationITCase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileSinkMigrationITCase.java new file mode 100644 index 00000000000..90dde5462c4 --- /dev/null +++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileSinkMigrationITCase.java @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.file.sink.writer; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobSubmissionResult; +import org.apache.flink.api.common.state.CheckpointListener; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.connector.file.sink.FileSink; +import org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils; +import org.apache.flink.core.execution.SavepointFormatType; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; +import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.testutils.junit.SharedObjectsExtension; +import org.apache.flink.testutils.junit.SharedReference; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.stream.Collectors; +import java.util.stream.LongStream; + +import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests migrating from {@link StreamingFileSink} to {@link FileSink}. It trigger a savepoint for + * the {@link StreamingFileSink} job and restore the {@link FileSink} job from the savepoint taken. + */ +class FileSinkMigrationITCase { + + @RegisterExtension + private final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create(); + + private static final String SOURCE_UID = "source"; + + private static final String SINK_UID = "sink"; + + private static final int NUM_SOURCES = 4; + + private static final int NUM_SINKS = 3; + + private static final int NUM_RECORDS = 10000; + + private static final int NUM_BUCKETS = 4; + + private SharedReference<CountDownLatch> finalCheckpointLatch; + + @BeforeEach + void setup() { + // We wait for two successful checkpoints in sources before shutting down. This ensures that + // the sink can commit its data. + // We need to keep a "static" latch here because all sources need to be kept running + // while we're waiting for the required number of checkpoints. Otherwise, we would lock up + // because we can only do checkpoints while all operators are running. + finalCheckpointLatch = sharedObjects.add(new CountDownLatch(NUM_SOURCES * 2)); + } + + @Test + void test() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + SharedReference<Collection<Long>> list = sharedObjects.add(new ArrayList<>()); + int n = 10000; + env.setParallelism(100); + env.fromSequence(0, n).map(i -> list.applySync(l -> l.add(i))); + env.execute(); + assertThat(list.get()).hasSize(n + 1); + assertThat(LongStream.rangeClosed(0, n).boxed().collect(Collectors.toList())) + .isEqualTo(list.get().stream().sorted().collect(Collectors.toList())); + } + + @Test + void testMigration( + @TempDir java.nio.file.Path tmpOutputDir, @TempDir java.nio.file.Path tmpSavepointDir) + throws Exception { + String outputPath = tmpOutputDir.toString(); + String savepointBasePath = tmpSavepointDir.toString(); + + final MiniClusterConfiguration cfg = + new MiniClusterConfiguration.Builder() + .withRandomPorts() + .setNumTaskManagers(1) + .setNumSlotsPerTaskManager(4) + .build(); + + JobGraph streamingFileSinkJobGraph = createStreamingFileSinkJobGraph(outputPath); + String savepointPath = + executeAndTakeSavepoint(cfg, streamingFileSinkJobGraph, savepointBasePath); + + JobGraph fileSinkJobGraph = createFileSinkJobGraph(outputPath); + loadSavepointAndExecute(cfg, fileSinkJobGraph, savepointPath); + + IntegerFileSinkTestDataUtils.checkIntegerSequenceSinkOutput( + outputPath, NUM_RECORDS, NUM_BUCKETS, NUM_SOURCES); + } + + private JobGraph createStreamingFileSinkJobGraph(String outputPath) { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE); + + StreamingFileSink<Integer> sink = + StreamingFileSink.forRowFormat( + new Path(outputPath), new IntegerFileSinkTestDataUtils.IntEncoder()) + .withBucketAssigner( + new IntegerFileSinkTestDataUtils.ModuloBucketAssigner(NUM_BUCKETS)) + .withRollingPolicy(OnCheckpointRollingPolicy.build()) + .build(); + + env.addSource(new StatefulSource(true, finalCheckpointLatch)) + .uid(SOURCE_UID) + .setParallelism(NUM_SOURCES) + .addSink(sink) + .setParallelism(NUM_SINKS) + .uid(SINK_UID); + return env.getStreamGraph().getJobGraph(); + } + + private JobGraph createFileSinkJobGraph(String outputPath) { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE); + + FileSink<Integer> sink = + FileSink.forRowFormat( + new Path(outputPath), new IntegerFileSinkTestDataUtils.IntEncoder()) + .withBucketAssigner( + new IntegerFileSinkTestDataUtils.ModuloBucketAssigner(NUM_BUCKETS)) + .withRollingPolicy(OnCheckpointRollingPolicy.build()) + .build(); + + env.addSource(new StatefulSource(false, finalCheckpointLatch)) + .uid(SOURCE_UID) + .setParallelism(NUM_SOURCES) + .sinkTo(sink) + .setParallelism(NUM_SINKS) + .uid(SINK_UID); + return env.getStreamGraph().getJobGraph(); + } + + private String executeAndTakeSavepoint( + MiniClusterConfiguration cfg, JobGraph jobGraph, String savepointBasePath) + throws Exception { + try (MiniCluster miniCluster = new MiniCluster(cfg)) { + miniCluster.start(); + CompletableFuture<JobSubmissionResult> jobSubmissionResultFuture = + miniCluster.submitJob(jobGraph); + JobID jobId = jobSubmissionResultFuture.get().getJobID(); + + waitForAllTaskRunning(miniCluster, jobId, false); + + CompletableFuture<String> savepointResultFuture = + miniCluster.triggerSavepoint( + jobId, savepointBasePath, true, SavepointFormatType.CANONICAL); + return savepointResultFuture.get(); + } + } + + private void loadSavepointAndExecute( + MiniClusterConfiguration cfg, JobGraph jobGraph, String savepointPath) + throws Exception { + jobGraph.setSavepointRestoreSettings( + SavepointRestoreSettings.forPath(savepointPath, false)); + + try (MiniCluster miniCluster = new MiniCluster(cfg)) { + miniCluster.start(); + miniCluster.executeJobBlocking(jobGraph); + } + } + + private static class StatefulSource extends RichParallelSourceFunction<Integer> + implements CheckpointedFunction, CheckpointListener { + + private final boolean takingSavepointMode; + + private SharedReference<CountDownLatch> finalCheckpointLatch; + + private ListState<Integer> nextValueState; + + private int nextValue; + + private volatile boolean snapshottedAfterAllRecordsOutput; + + private volatile boolean isWaitingCheckpointComplete; + + private volatile boolean isCanceled; + + public StatefulSource( + boolean takingSavepointMode, SharedReference<CountDownLatch> finalCheckpointLatch) { + this.takingSavepointMode = takingSavepointMode; + this.finalCheckpointLatch = finalCheckpointLatch; + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + nextValueState = + context.getOperatorStateStore() + .getListState(new ListStateDescriptor<>("nextValue", Integer.class)); + + if (nextValueState.get() != null && nextValueState.get().iterator().hasNext()) { + nextValue = nextValueState.get().iterator().next(); + } + } + + @Override + public void run(SourceContext<Integer> ctx) throws Exception { + if (takingSavepointMode) { + sendRecordsUntil(NUM_RECORDS / 3, 0, ctx); + sendRecordsUntil(NUM_RECORDS / 2, 100, ctx); + + while (true) { + Thread.sleep(5000); + } + } else { + sendRecordsUntil(NUM_RECORDS, 0, ctx); + + // Wait the last checkpoint to commit all the pending records. + isWaitingCheckpointComplete = true; + finalCheckpointLatch.get().await(); + } + } + + private void sendRecordsUntil( + int targetNumber, int sleepInMillis, SourceContext<Integer> ctx) + throws InterruptedException { + while (!isCanceled && nextValue < targetNumber) { + synchronized (ctx.getCheckpointLock()) { + ctx.collect(nextValue++); + } + + if (sleepInMillis > 0) { + Thread.sleep(sleepInMillis); + } + } + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + nextValueState.update(Collections.singletonList(nextValue)); + + if (isWaitingCheckpointComplete) { + snapshottedAfterAllRecordsOutput = true; + } + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + if (isWaitingCheckpointComplete && snapshottedAfterAllRecordsOutput) { + finalCheckpointLatch.get().countDown(); + } + } + + @Override + public void cancel() { + isCanceled = true; + } + } +}