This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new b56041b  [FLINK-24160][tests] Use FS checkpoint storage in PartiallyFinishedSourcesITCase
b56041b is described below

commit b56041b9acc8fafa2ab1500e9162bed7bbcb333d
Author: Roman Khachatryan <khachatryan.ro...@gmail.com>
AuthorDate: Mon Sep 6 22:13:28 2021 +0200

    [FLINK-24160][tests] Use FS checkpoint storage in PartiallyFinishedSourcesITCase
    
    This closes #17165.
---
 .../operators/lifecycle/BoundedSourceITCase.java   |  5 ++++-
 .../lifecycle/PartiallyFinishedSourcesITCase.java  | 10 +++++++++-
 .../lifecycle/StopWithSavepointITCase.java         |  9 ++++++++-
 .../operators/lifecycle/graph/TestJobBuilders.java | 22 +++++++++++++---------
 4 files changed, 34 insertions(+), 12 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/BoundedSourceITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/BoundedSourceITCase.java
index 6234ba2..ffeb08d 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/BoundedSourceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/BoundedSourceITCase.java
@@ -64,7 +64,10 @@ public class BoundedSourceITCase extends AbstractTestBase {
                 graphBuilder.build(
                         sharedObjects,
                         cfg -> cfg.setBoolean(ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true),
-                        env -> {});
+                        env ->
+                                env.getCheckpointConfig()
+                                        .setCheckpointStorage(
+                                                TEMPORARY_FOLDER.newFolder().toURI()));
 
         TestJobExecutor.execute(testJob, miniClusterResource)
                 .waitForEvent(CheckpointCompletedEvent.class)
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/PartiallyFinishedSourcesITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/PartiallyFinishedSourcesITCase.java
index 3fdea20..ef4ac66 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/PartiallyFinishedSourcesITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/PartiallyFinishedSourcesITCase.java
@@ -32,8 +32,10 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
 import org.junit.Before;
+import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameter;
@@ -66,6 +68,8 @@ import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingO
 @RunWith(Parameterized.class)
 public class PartiallyFinishedSourcesITCase extends TestLogger {
 
+    @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
     @Rule public final SharedObjects sharedObjects = SharedObjects.create();
 
     private MiniClusterWithClientResource miniClusterResource;
@@ -135,7 +139,7 @@ public class PartiallyFinishedSourcesITCase extends TestLogger {
         checkDataFlow(testJob);
     }
 
-    private TestJobWithDescription buildJob() {
+    private TestJobWithDescription buildJob() throws Exception {
         return graphBuilder.build(
                 sharedObjects,
                 cfg -> cfg.setBoolean(ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true),
@@ -150,6 +154,10 @@ public class PartiallyFinishedSourcesITCase extends TestLogger {
                             .setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);
                     // explicitly set to one to ease avoiding race conditions
                     env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+                    env.getCheckpointConfig()
+                            // with unaligned checkpoints state size can grow beyond the default
+                            // limits of in-memory storage
+                            .setCheckpointStorage(TEMPORARY_FOLDER.newFolder().toURI());
                 });
     }
 
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/StopWithSavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/StopWithSavepointITCase.java
index ea100b1..a787eed 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/StopWithSavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/StopWithSavepointITCase.java
@@ -108,7 +108,14 @@ public class StopWithSavepointITCase extends AbstractTestBase {
 
     @Test
     public void test() throws Exception {
-        TestJobWithDescription testJob = graphBuilder.build(sharedObjects, cfg -> {}, env -> {});
+        TestJobWithDescription testJob =
+                graphBuilder.build(
+                        sharedObjects,
+                        cfg -> {},
+                        env ->
+                                env.getCheckpointConfig()
+                                        .setCheckpointStorage(
+                                                TEMPORARY_FOLDER.newFolder().toURI()));
 
         TestJobExecutor.execute(testJob, miniClusterResource)
                 .waitForEvent(WatermarkReceivedEvent.class)
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/graph/TestJobBuilders.java b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/graph/TestJobBuilders.java
index 4afeec7..7bb9216 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/graph/TestJobBuilders.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/graph/TestJobBuilders.java
@@ -36,11 +36,11 @@ import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.transformations.MultipleInputTransformation;
 import org.apache.flink.testutils.junit.SharedObjects;
+import org.apache.flink.util.function.ThrowingConsumer;
 
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
-import java.util.function.Consumer;
 
 import static java.util.Arrays.asList;
 import static java.util.Collections.singleton;
@@ -56,8 +56,9 @@ public class TestJobBuilders {
     public interface TestingGraphBuilder {
         TestJobWithDescription build(
                 SharedObjects shared,
-                Consumer<Configuration> modifyConfig,
-                Consumer<StreamExecutionEnvironment> modifyEnvironment);
+                ThrowingConsumer<Configuration, Exception> modifyConfig,
+                ThrowingConsumer<StreamExecutionEnvironment, Exception> modifyEnvironment)
+                throws Exception;
     }
 
     private TestJobBuilders() {}
@@ -67,8 +68,9 @@ public class TestJobBuilders {
                 @Override
                 public TestJobWithDescription build(
                         SharedObjects shared,
-                        Consumer<Configuration> confConsumer,
-                        Consumer<StreamExecutionEnvironment> envConsumer) {
+                        ThrowingConsumer<Configuration, Exception> confConsumer,
+                        ThrowingConsumer<StreamExecutionEnvironment, Exception> envConsumer)
+                        throws Exception {
 
                     TestEventQueue eventQueue = TestEventQueue.createShared(shared);
                     TestCommandDispatcher commandQueue = TestCommandDispatcher.createShared(shared);
@@ -119,8 +121,9 @@ public class TestJobBuilders {
                 @Override
                 public TestJobWithDescription build(
                         SharedObjects shared,
-                        Consumer<Configuration> confConsumer,
-                        Consumer<StreamExecutionEnvironment> envConsumer) {
+                        ThrowingConsumer<Configuration, Exception> confConsumer,
+                        ThrowingConsumer<StreamExecutionEnvironment, Exception> envConsumer)
+                        throws Exception {
 
                     TestEventQueue eventQueue = TestEventQueue.createShared(shared);
                     TestCommandDispatcher commandQueue = TestCommandDispatcher.createShared(shared);
@@ -251,8 +254,9 @@ public class TestJobBuilders {
             };
 
     private static StreamExecutionEnvironment prepareEnv(
-            Consumer<Configuration> confConsumer,
-            Consumer<StreamExecutionEnvironment> envConsumer) {
+            ThrowingConsumer<Configuration, Exception> confConsumer,
+            ThrowingConsumer<StreamExecutionEnvironment, Exception> envConsumer)
+            throws Exception {
         Configuration configuration = new Configuration();
         configuration.set(EXECUTION_FAILOVER_STRATEGY, "full");
         confConsumer.accept(configuration);

Reply via email to