This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch release-1.15 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push: new 537b871 [FLINK-26106][runtime] Used 'filesystem' for state change log storage in BoundedSourceITCase 537b871 is described below commit 537b871962b806fc1e40a5c987e046e482a509c5 Author: Anton Kalashnikov <kaa....@yandex.ru> AuthorDate: Mon Mar 14 18:23:25 2022 +0100 [FLINK-26106][runtime] Used 'filesystem' for state change log storage in BoundedSourceITCase --- .../operators/lifecycle/BoundedSourceITCase.java | 37 ++++++++++++++++++++-- 1 file changed, 34 insertions(+), 3 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/BoundedSourceITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/BoundedSourceITCase.java index d03e9cb..6933fe6 100644 --- a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/BoundedSourceITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/BoundedSourceITCase.java @@ -17,19 +17,27 @@ package org.apache.flink.runtime.operators.lifecycle; +import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory; +import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.operators.lifecycle.event.CheckpointCompletedEvent; import org.apache.flink.runtime.operators.lifecycle.graph.TestJobBuilders.TestingGraphBuilder; import org.apache.flink.runtime.operators.lifecycle.validation.DrainingValidator; import org.apache.flink.runtime.operators.lifecycle.validation.FinishingValidator; -import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.testutils.junit.SharedObjects; +import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameter; +import java.io.IOException; + import static org.apache.flink.runtime.operators.lifecycle.command.TestCommand.FINISH_SOURCES; import static org.apache.flink.runtime.operators.lifecycle.command.TestCommandDispatcher.TestCommandScope.ALL_SUBTASKS; import static org.apache.flink.runtime.operators.lifecycle.graph.TestJobBuilders.COMPLEX_GRAPH_BUILDER; @@ -46,10 +54,33 @@ import static org.apache.flink.runtime.operators.lifecycle.validation.TestOperat * same. */ @RunWith(Parameterized.class) -public class BoundedSourceITCase extends AbstractTestBase { +public class BoundedSourceITCase extends TestBaseUtils { + + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + @Rule + public final MiniClusterWithClientResource miniClusterResource = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(configuration()) + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(4) + .build()); @Rule public final SharedObjects sharedObjects = SharedObjects.create(); + private static Configuration configuration() { + Configuration conf = new Configuration(); + + try { + FsStateChangelogStorageFactory.configure(conf, TEMPORARY_FOLDER.newFolder()); + } catch (IOException e) { + throw new RuntimeException(e); + } + + return conf; + } + @Parameter public TestingGraphBuilder graphBuilder; @Parameterized.Parameters(name = "{0}") @@ -68,7 +99,7 @@ public class BoundedSourceITCase extends AbstractTestBase { .setCheckpointStorage( TEMPORARY_FOLDER.newFolder().toURI())); - TestJobExecutor.execute(testJob, MINI_CLUSTER_RESOURCE) + TestJobExecutor.execute(testJob, miniClusterResource) .waitForEvent(CheckpointCompletedEvent.class) .sendBroadcastCommand(FINISH_SOURCES, ALL_SUBTASKS) .waitForTermination()