This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 10f9f620e5980a5345f5f2fe0e29bf76c7ba73a8 Author: Till Rohrmann <[email protected]> AuthorDate: Mon Jan 4 14:33:30 2021 +0100 [hotfix][tests] Replace explicit ExecutionGraphBuilder.buildGraph calls with TestingExecutionGraphBuilder --- .../CheckpointSettingsSerializableTest.java | 33 ++------ .../ExecutionGraphDeploymentTest.java | 35 ++------- .../ExecutionGraphRescalingTest.java | 88 +--------------------- .../ExecutionVertexLocalityTest.java | 30 +------- 4 files changed, 15 insertions(+), 171 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java index 7367299..54ce529 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java @@ -19,25 +19,18 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; -import org.apache.flink.runtime.blob.VoidBlobWriter; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.executiongraph.ExecutionGraph; -import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder; -import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker; +import org.apache.flink.runtime.executiongraph.TestingExecutionGraphBuilder; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; -import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; import org.apache.flink.runtime.query.TaskKvStateRegistry; -import org.apache.flink.runtime.shuffle.NettyShuffleMaster; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.CheckpointStorageAccess; import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation; @@ -47,7 +40,6 @@ import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; -import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.testutils.ClassLoaderUtils; import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TestLogger; @@ -108,26 +100,11 @@ public class CheckpointSettingsSerializableTest extends TestLogger { // distributed execution final JobGraph copy = CommonTestUtils.createCopySerializable(jobGraph); - final Time timeout = Time.seconds(10L); final ExecutionGraph eg = - ExecutionGraphBuilder.buildGraph( - null, - copy, - new Configuration(), - TestingUtils.defaultExecutor(), - TestingUtils.defaultExecutor(), - mock(SlotProvider.class), - classLoader, - new StandaloneCheckpointRecoveryFactory(), - new StandaloneCheckpointIDCounter(), - timeout, - new UnregisteredMetricsGroup(), - VoidBlobWriter.getInstance(), - timeout, - log, - NettyShuffleMaster.INSTANCE, - NoOpJobMasterPartitionTracker.INSTANCE, - System.currentTimeMillis()); + TestingExecutionGraphBuilder.newBuilder() + .setJobGraph(copy) + .setUserClassLoader(classLoader) + .build(); assertEquals(1, eg.getCheckpointCoordinator().getNumberOfRegisteredMasterHooks()); assertTrue( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java index d83305c..3ff9e5b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java @@ -22,17 +22,13 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.accumulators.IntCounter; -import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; -import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; import org.apache.flink.runtime.blob.BlobWriter; import org.apache.flink.runtime.blob.PermanentBlobService; import org.apache.flink.runtime.blob.VoidBlobWriter; import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy; -import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; -import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; @@ -40,7 +36,6 @@ import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; -import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -61,13 +56,11 @@ import org.apache.flink.runtime.scheduler.SchedulerNG; import org.apache.flink.runtime.scheduler.SchedulerTestingUtils; import org.apache.flink.runtime.scheduler.TestingPhysicalSlot; import org.apache.flink.runtime.scheduler.TestingPhysicalSlotProvider; -import org.apache.flink.runtime.shuffle.NettyShuffleMaster; import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway; import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; -import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; import org.apache.flink.util.FlinkException; @@ -77,7 +70,6 @@ import org.apache.flink.util.function.FunctionUtils; import org.hamcrest.Description; import org.hamcrest.TypeSafeMatcher; import org.junit.Test; -import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Arrays; @@ -89,7 +81,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ScheduledExecutorService; import java.util.function.Function; import static junit.framework.TestCase.assertTrue; @@ -721,8 +712,6 @@ public class ExecutionGraphDeploymentTest extends TestLogger { } private ExecutionGraph createExecutionGraph(Configuration configuration) throws Exception { - final ScheduledExecutorService executor = TestingUtils.defaultExecutor(); - final JobID jobId = new JobID(); final JobGraph jobGraph = new JobGraph(jobId, "test"); jobGraph.setSnapshotSettings( @@ -742,25 +731,11 @@ public class ExecutionGraphDeploymentTest extends TestLogger { 0), null)); - final Time timeout = Time.seconds(10L); - return ExecutionGraphBuilder.buildGraph( - null, - jobGraph, - configuration, - executor, - executor, - new ProgrammedSlotProvider(1), - getClass().getClassLoader(), - new StandaloneCheckpointRecoveryFactory(), - new StandaloneCheckpointIDCounter(), - timeout, - new UnregisteredMetricsGroup(), - blobWriter, - timeout, - LoggerFactory.getLogger(getClass()), - NettyShuffleMaster.INSTANCE, - NoOpJobMasterPartitionTracker.INSTANCE, - System.currentTimeMillis()); + return TestingExecutionGraphBuilder.newBuilder() + .setJobGraph(jobGraph) + .setJobMasterConfig(configuration) + .setBlobWriter(blobWriter) + .build(); } private static final class ExecutionStageMatcher diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java index af704db..7c2aa2e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java @@ -19,20 +19,12 @@ package org.apache.flink.runtime.executiongraph; import org.apache.flink.configuration.Configuration; -import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.JobException; -import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.blob.VoidBlobWriter; -import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; -import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; -import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.shuffle.NettyShuffleMaster; -import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -41,7 +33,6 @@ import org.slf4j.LoggerFactory; import java.util.Arrays; import java.util.Collections; -import java.util.concurrent.CompletableFuture; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertThat; @@ -67,25 +58,7 @@ public class ExecutionGraphRescalingTest extends TestLogger { createVerticesForSimpleBipartiteJobGraph(initialParallelism, maxParallelism); final JobGraph jobGraph = new JobGraph(jobVertices); - ExecutionGraph eg = - ExecutionGraphBuilder.buildGraph( - null, - jobGraph, - config, - TestingUtils.defaultExecutor(), - TestingUtils.defaultExecutor(), - new TestingSlotProvider(ignore -> new CompletableFuture<>()), - Thread.currentThread().getContextClassLoader(), - new StandaloneCheckpointRecoveryFactory(), - new StandaloneCheckpointIDCounter(), - AkkaUtils.getDefaultTimeout(), - new UnregisteredMetricsGroup(), - VoidBlobWriter.getInstance(), - AkkaUtils.getDefaultTimeout(), - TEST_LOGGER, - NettyShuffleMaster.INSTANCE, - NoOpJobMasterPartitionTracker.INSTANCE, - System.currentTimeMillis()); + ExecutionGraph eg = TestingExecutionGraphBuilder.newBuilder().setJobGraph(jobGraph).build(); for (JobVertex jv : jobVertices) { assertThat(jv.getParallelism(), is(initialParallelism)); @@ -100,25 +73,7 @@ public class ExecutionGraphRescalingTest extends TestLogger { jv.setParallelism(scaleDownParallelism); } - eg = - ExecutionGraphBuilder.buildGraph( - null, - jobGraph, - config, - TestingUtils.defaultExecutor(), - TestingUtils.defaultExecutor(), - new TestingSlotProvider(ignore -> new CompletableFuture<>()), - Thread.currentThread().getContextClassLoader(), - new StandaloneCheckpointRecoveryFactory(), - new StandaloneCheckpointIDCounter(), - AkkaUtils.getDefaultTimeout(), - new UnregisteredMetricsGroup(), - VoidBlobWriter.getInstance(), - AkkaUtils.getDefaultTimeout(), - TEST_LOGGER, - NettyShuffleMaster.INSTANCE, - NoOpJobMasterPartitionTracker.INSTANCE, - System.currentTimeMillis()); + eg = TestingExecutionGraphBuilder.newBuilder().setJobGraph(jobGraph).build(); for (JobVertex jv : jobVertices) { assertThat(jv.getParallelism(), is(1)); @@ -133,25 +88,7 @@ public class ExecutionGraphRescalingTest extends TestLogger { jv.setParallelism(scaleUpParallelism); } - eg = - ExecutionGraphBuilder.buildGraph( - null, - jobGraph, - config, - TestingUtils.defaultExecutor(), - TestingUtils.defaultExecutor(), - new TestingSlotProvider(ignore -> new CompletableFuture<>()), - Thread.currentThread().getContextClassLoader(), - new StandaloneCheckpointRecoveryFactory(), - new StandaloneCheckpointIDCounter(), - AkkaUtils.getDefaultTimeout(), - new UnregisteredMetricsGroup(), - VoidBlobWriter.getInstance(), - AkkaUtils.getDefaultTimeout(), - TEST_LOGGER, - NettyShuffleMaster.INSTANCE, - NoOpJobMasterPartitionTracker.INSTANCE, - System.currentTimeMillis()); + eg = TestingExecutionGraphBuilder.newBuilder().setJobGraph(jobGraph).build(); for (JobVertex jv : jobVertices) { assertThat(jv.getParallelism(), is(scaleUpParallelism)); @@ -181,24 +118,7 @@ public class ExecutionGraphRescalingTest extends TestLogger { try { // this should fail since we set the parallelism to maxParallelism + 1 - ExecutionGraphBuilder.buildGraph( - null, - jobGraph, - config, - TestingUtils.defaultExecutor(), - TestingUtils.defaultExecutor(), - new TestingSlotProvider(ignore -> new CompletableFuture<>()), - Thread.currentThread().getContextClassLoader(), - new StandaloneCheckpointRecoveryFactory(), - new StandaloneCheckpointIDCounter(), - AkkaUtils.getDefaultTimeout(), - new UnregisteredMetricsGroup(), - VoidBlobWriter.getInstance(), - AkkaUtils.getDefaultTimeout(), - TEST_LOGGER, - NettyShuffleMaster.INSTANCE, - NoOpJobMasterPartitionTracker.INSTANCE, - System.currentTimeMillis()); + TestingExecutionGraphBuilder.newBuilder().setJobGraph(jobGraph).build(); fail( "Building the ExecutionGraph with a parallelism higher than the max parallelism should fail."); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java index 2b1700f..0d9bf77 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java @@ -19,18 +19,11 @@ package org.apache.flink.runtime.executiongraph; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; -import org.apache.flink.runtime.blob.VoidBlobWriter; import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore; -import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; -import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.instance.SimpleSlotContext; -import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -43,10 +36,7 @@ import org.apache.flink.runtime.jobmaster.SlotContext; import org.apache.flink.runtime.jobmaster.SlotOwner; import org.apache.flink.runtime.jobmaster.SlotRequestId; import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot; -import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; -import org.apache.flink.runtime.shuffle.NettyShuffleMaster; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; -import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.util.FlinkException; import org.apache.flink.util.TestLogger; @@ -228,25 +218,7 @@ public class ExecutionVertexLocalityTest extends TestLogger { JobGraph testJob = new JobGraph(jobId, "test job", source, target); - final Time timeout = Time.seconds(10L); - return ExecutionGraphBuilder.buildGraph( - null, - testJob, - new Configuration(), - TestingUtils.defaultExecutor(), - TestingUtils.defaultExecutor(), - mock(SlotProvider.class), - getClass().getClassLoader(), - new StandaloneCheckpointRecoveryFactory(), - new StandaloneCheckpointIDCounter(), - timeout, - new UnregisteredMetricsGroup(), - VoidBlobWriter.getInstance(), - timeout, - log, - NettyShuffleMaster.INSTANCE, - NoOpJobMasterPartitionTracker.INSTANCE, - System.currentTimeMillis()); + return TestingExecutionGraphBuilder.newBuilder().setJobGraph(testJob).build(); } private void initializeLocation(ExecutionVertex vertex, TaskManagerLocation location)
