This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit f5193466c5c9b92b52e3b4e81d0dffe27d351b34 Author: Dawid Wysakowicz <dwysakow...@apache.org> AuthorDate: Tue Feb 9 19:57:30 2021 +0100 [FLINK-21344] Test legacy heap timers --- .../test/checkpointing/TimersSavepointITCase.java | 229 +++++++++++++++++++++ .../_metadata | Bin 0 -> 5391 bytes 2 files changed, 229 insertions(+) diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimersSavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimersSavepointITCase.java new file mode 100644 index 0000000..d1d9e83 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimersSavepointITCase.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.contrib.streaming.state.RocksDBOptions; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend.PriorityQueueStateType; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.streaming.api.functions.sink.DiscardingSink; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.util.Collector; + +import org.apache.commons.io.FileUtils; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URL; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +/** Tests for restoring {@link PriorityQueueStateType#HEAP} timers stored in raw operator state. */ +public class TimersSavepointITCase { + private static final int PARALLELISM = 4; + + private static final OneShotLatch savepointLatch = new OneShotLatch(); + private static final OneShotLatch resultLatch = new OneShotLatch(); + + @ClassRule public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder(); + + // We use a single past Flink version as we verify heap timers stored in raw state + // Starting from 1.13 we do not store heap timers in raw state, but we keep them in + // managed state + public static final String SAVEPOINT_FILE_NAME = "legacy-raw-state-heap-timers-rocks-db-1.12"; + + /** + * This test runs in either of two modes: 1) we want to generate the binary savepoint, i.e. we + * have to run the checkpointing functions 2) we want to verify restoring, so we have to run the + * checking functions. + */ + public enum ExecutionMode { + PERFORM_SAVEPOINT, + VERIFY_SAVEPOINT + } + + // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints + // TODO Note: You should generate the savepoint based on the release branch instead of the + // master. + private final ExecutionMode executionMode = ExecutionMode.VERIFY_SAVEPOINT; + + @Rule + public final MiniClusterWithClientResource miniClusterResource = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(PARALLELISM) + .build()); + + @Test(timeout = 60_000) + public void testSavepointWithTimers() throws Exception { + try (ClusterClient<?> client = miniClusterResource.getClusterClient()) { + if (executionMode == ExecutionMode.PERFORM_SAVEPOINT) { + takeSavepoint("src/test/resources/" + SAVEPOINT_FILE_NAME, client); + } else if (executionMode == ExecutionMode.VERIFY_SAVEPOINT) { + verifySavepoint(getResourceFilename(SAVEPOINT_FILE_NAME), client); + } else { + throw new IllegalStateException("Unknown ExecutionMode " + executionMode); + } + } + } + + private void verifySavepoint(String savepointPath, ClusterClient<?> client) + throws IOException, InterruptedException, java.util.concurrent.ExecutionException { + JobGraph jobGraph; + + jobGraph = getJobGraph(PriorityQueueStateType.HEAP); + jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath)); + client.submitJob(jobGraph).get(); + resultLatch.await(); + } + + private void takeSavepoint(String savepointPath, ClusterClient<?> client) throws Exception { + JobGraph jobGraph = getJobGraph(PriorityQueueStateType.ROCKSDB); + client.submitJob(jobGraph).get(); + savepointLatch.await(); + CompletableFuture<String> savepointPathFuture = + client.triggerSavepoint(jobGraph.getJobID(), null); + + String jobmanagerSavepointPath = savepointPathFuture.get(2, TimeUnit.SECONDS); + + File jobManagerSavepoint = new File(new URI(jobmanagerSavepointPath).getPath()); + // savepoints were changed to be directories in Flink 1.3 + FileUtils.moveDirectory(jobManagerSavepoint, new File(savepointPath)); + } + + public JobGraph getJobGraph(PriorityQueueStateType priorityQueueStateType) throws IOException { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(PARALLELISM); + env.addSource(new Source()) + .assignTimestampsAndWatermarks( + WatermarkStrategy.<Integer>forMonotonousTimestamps() + .withTimestampAssigner((i, p) -> i)) + .keyBy(i -> i) + .process(new TimersProcessFunction()) + .addSink(new DiscardingSink<>()); + + final Configuration config = new Configuration(); + config.set(CheckpointingOptions.STATE_BACKEND, "rocksdb"); + config.set( + CheckpointingOptions.CHECKPOINTS_DIRECTORY, + TMP_FOLDER.newFolder().toURI().toString()); + config.set( + CheckpointingOptions.SAVEPOINT_DIRECTORY, + TMP_FOLDER.newFolder().toURI().toString()); + config.set(RocksDBOptions.TIMER_SERVICE_FACTORY, priorityQueueStateType); + env.configure(config, this.getClass().getClassLoader()); + return env.getStreamGraph("Test", false).getJobGraph(); + } + + private static String getResourceFilename(String filename) { + ClassLoader cl = TimersSavepointITCase.class.getClassLoader(); + URL resource = cl.getResource(filename); + if (resource == null) { + throw new NullPointerException("Missing snapshot resource."); + } + return resource.getFile(); + } + + private static class Source implements SourceFunction<Integer>, CheckpointedFunction { + + private volatile boolean running = true; + private int emittedCount; + private ListState<Integer> state; + + @Override + public void run(SourceContext<Integer> ctx) throws Exception { + while (running) { + synchronized (ctx.getCheckpointLock()) { + if (emittedCount == 0) { + ctx.collect(0); + emittedCount = 1; + } else if (emittedCount == 1) { + ctx.collect(emittedCount); + } else { + ctx.collect(emittedCount++); + } + } + Thread.sleep(1); + } + } + + @Override + public void cancel() { + this.running = false; + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + state.add(emittedCount); + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + state = + context.getOperatorStateStore() + .getListState( + new ListStateDescriptor<>( + "emittedCount", IntSerializer.INSTANCE)); + if (context.isRestored()) { + this.emittedCount = 2; + } + } + } + + private static class TimersProcessFunction + extends KeyedProcessFunction<Integer, Integer, Integer> { + + @Override + public void processElement(Integer value, Context ctx, Collector<Integer> out) + throws Exception { + if (value == 0) { + ctx.timerService().registerEventTimeTimer(2L); + savepointLatch.trigger(); + } + } + + @Override + public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out) + throws Exception { + out.collect(1); + resultLatch.trigger(); + } + } +} diff --git a/flink-tests/src/test/resources/legacy-raw-state-heap-timers-rocks-db-1.12/_metadata b/flink-tests/src/test/resources/legacy-raw-state-heap-timers-rocks-db-1.12/_metadata new file mode 100644 index 0000000..007c518 Binary files /dev/null and b/flink-tests/src/test/resources/legacy-raw-state-heap-timers-rocks-db-1.12/_metadata differ