This is an automated email from the ASF dual-hosted git repository. gaoyunhaii pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push: new b56041b [FLINK-24160][tests] Use FS checkpoint storage in PartiallyFinishedSourcesITCase b56041b is described below commit b56041b9acc8fafa2ab1500e9162bed7bbcb333d Author: Roman Khachatryan <khachatryan.ro...@gmail.com> AuthorDate: Mon Sep 6 22:13:28 2021 +0200 [FLINK-24160][tests] Use FS checkpoint storage in PartiallyFinishedSourcesITCase This closes #17165. --- .../operators/lifecycle/BoundedSourceITCase.java | 5 ++++- .../lifecycle/PartiallyFinishedSourcesITCase.java | 10 +++++++++- .../lifecycle/StopWithSavepointITCase.java | 9 ++++++++- .../operators/lifecycle/graph/TestJobBuilders.java | 22 +++++++++++++--------- 4 files changed, 34 insertions(+), 12 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/BoundedSourceITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/BoundedSourceITCase.java index 6234ba2..ffeb08d 100644 --- a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/BoundedSourceITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/BoundedSourceITCase.java @@ -64,7 +64,10 @@ public class BoundedSourceITCase extends AbstractTestBase { graphBuilder.build( sharedObjects, cfg -> cfg.setBoolean(ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true), - env -> {}); + env -> + env.getCheckpointConfig() + .setCheckpointStorage( + TEMPORARY_FOLDER.newFolder().toURI())); TestJobExecutor.execute(testJob, miniClusterResource) .waitForEvent(CheckpointCompletedEvent.class) diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/PartiallyFinishedSourcesITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/PartiallyFinishedSourcesITCase.java index 3fdea20..ef4ac66 100644 --- a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/PartiallyFinishedSourcesITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/PartiallyFinishedSourcesITCase.java @@ -32,8 +32,10 @@ import org.apache.flink.util.TestLogger; import org.junit.After; import org.junit.Before; +import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameter; @@ -66,6 +68,8 @@ import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingO @RunWith(Parameterized.class) public class PartiallyFinishedSourcesITCase extends TestLogger { + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + @Rule public final SharedObjects sharedObjects = SharedObjects.create(); private MiniClusterWithClientResource miniClusterResource; @@ -135,7 +139,7 @@ public class PartiallyFinishedSourcesITCase extends TestLogger { checkDataFlow(testJob); } - private TestJobWithDescription buildJob() { + private TestJobWithDescription buildJob() throws Exception { return graphBuilder.build( sharedObjects, cfg -> cfg.setBoolean(ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true), @@ -150,6 +154,10 @@ public class PartiallyFinishedSourcesITCase extends TestLogger { .setTolerableCheckpointFailureNumber(Integer.MAX_VALUE); // explicitly set to one to ease avoiding race conditions env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); + env.getCheckpointConfig() + // with unaligned checkpoints state size can grow beyond the default + // limits of in-memory storage + .setCheckpointStorage(TEMPORARY_FOLDER.newFolder().toURI()); }); } diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/StopWithSavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/StopWithSavepointITCase.java index ea100b1..a787eed 100644 --- a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/StopWithSavepointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/StopWithSavepointITCase.java @@ -108,7 +108,14 @@ public class StopWithSavepointITCase extends AbstractTestBase { @Test public void test() throws Exception { - TestJobWithDescription testJob = graphBuilder.build(sharedObjects, cfg -> {}, env -> {}); + TestJobWithDescription testJob = + graphBuilder.build( + sharedObjects, + cfg -> {}, + env -> + env.getCheckpointConfig() + .setCheckpointStorage( + TEMPORARY_FOLDER.newFolder().toURI())); TestJobExecutor.execute(testJob, miniClusterResource) .waitForEvent(WatermarkReceivedEvent.class) diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/graph/TestJobBuilders.java b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/graph/TestJobBuilders.java index 4afeec7..7bb9216 100644 --- a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/graph/TestJobBuilders.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/graph/TestJobBuilders.java @@ -36,11 +36,11 @@ import org.apache.flink.streaming.api.functions.sink.DiscardingSink; import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.transformations.MultipleInputTransformation; import org.apache.flink.testutils.junit.SharedObjects; +import org.apache.flink.util.function.ThrowingConsumer; import java.util.HashMap; import java.util.HashSet; import java.util.Map; -import java.util.function.Consumer; import static java.util.Arrays.asList; import static java.util.Collections.singleton; @@ -56,8 +56,9 @@ public class TestJobBuilders { public interface TestingGraphBuilder { TestJobWithDescription build( SharedObjects shared, - Consumer<Configuration> modifyConfig, - Consumer<StreamExecutionEnvironment> modifyEnvironment); + ThrowingConsumer<Configuration, Exception> modifyConfig, + ThrowingConsumer<StreamExecutionEnvironment, Exception> modifyEnvironment) + throws Exception; } private TestJobBuilders() {} @@ -67,8 +68,9 @@ public class TestJobBuilders { @Override public TestJobWithDescription build( SharedObjects shared, - Consumer<Configuration> confConsumer, - Consumer<StreamExecutionEnvironment> envConsumer) { + ThrowingConsumer<Configuration, Exception> confConsumer, + ThrowingConsumer<StreamExecutionEnvironment, Exception> envConsumer) + throws Exception { TestEventQueue eventQueue = TestEventQueue.createShared(shared); TestCommandDispatcher commandQueue = TestCommandDispatcher.createShared(shared); @@ -119,8 +121,9 @@ public class TestJobBuilders { @Override public TestJobWithDescription build( SharedObjects shared, - Consumer<Configuration> confConsumer, - Consumer<StreamExecutionEnvironment> envConsumer) { + ThrowingConsumer<Configuration, Exception> confConsumer, + ThrowingConsumer<StreamExecutionEnvironment, Exception> envConsumer) + throws Exception { TestEventQueue eventQueue = TestEventQueue.createShared(shared); TestCommandDispatcher commandQueue = TestCommandDispatcher.createShared(shared); @@ -251,8 +254,9 @@ public class TestJobBuilders { }; private static StreamExecutionEnvironment prepareEnv( - Consumer<Configuration> confConsumer, - Consumer<StreamExecutionEnvironment> envConsumer) { + ThrowingConsumer<Configuration, Exception> confConsumer, + ThrowingConsumer<StreamExecutionEnvironment, Exception> envConsumer) + throws Exception { Configuration configuration = new Configuration(); configuration.set(EXECUTION_FAILOVER_STRATEGY, "full"); confConsumer.accept(configuration);