This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 10f9f620e5980a5345f5f2fe0e29bf76c7ba73a8
Author: Till Rohrmann <[email protected]>
AuthorDate: Mon Jan 4 14:33:30 2021 +0100

    [hotfix][tests] Replace explicit ExecutionGraphBuilder.buildGraph calls with TestingExecutionGraphBuilder
---
 .../CheckpointSettingsSerializableTest.java        | 33 ++------
 .../ExecutionGraphDeploymentTest.java              | 35 ++-------
 .../ExecutionGraphRescalingTest.java               | 88 +---------------------
 .../ExecutionVertexLocalityTest.java               | 30 +-------
 4 files changed, 15 insertions(+), 171 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
index 7367299..54ce529 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
@@ -19,25 +19,18 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
-import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
-import 
org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
+import org.apache.flink.runtime.executiongraph.TestingExecutionGraphBuilder;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
-import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStorageAccess;
 import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
@@ -47,7 +40,6 @@ import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.testutils.ClassLoaderUtils;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
@@ -108,26 +100,11 @@ public class CheckpointSettingsSerializableTest extends 
TestLogger {
         // distributed execution
         final JobGraph copy = CommonTestUtils.createCopySerializable(jobGraph);
 
-        final Time timeout = Time.seconds(10L);
         final ExecutionGraph eg =
-                ExecutionGraphBuilder.buildGraph(
-                        null,
-                        copy,
-                        new Configuration(),
-                        TestingUtils.defaultExecutor(),
-                        TestingUtils.defaultExecutor(),
-                        mock(SlotProvider.class),
-                        classLoader,
-                        new StandaloneCheckpointRecoveryFactory(),
-                        new StandaloneCheckpointIDCounter(),
-                        timeout,
-                        new UnregisteredMetricsGroup(),
-                        VoidBlobWriter.getInstance(),
-                        timeout,
-                        log,
-                        NettyShuffleMaster.INSTANCE,
-                        NoOpJobMasterPartitionTracker.INSTANCE,
-                        System.currentTimeMillis());
+                TestingExecutionGraphBuilder.newBuilder()
+                        .setJobGraph(copy)
+                        .setUserClassLoader(classLoader)
+                        .build();
 
         assertEquals(1, 
eg.getCheckpointCoordinator().getNumberOfRegisteredMasterHooks());
         assertTrue(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index d83305c..3ff9e5b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -22,17 +22,13 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.IntCounter;
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blob.PermanentBlobService;
 import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
-import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
-import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -40,7 +36,6 @@ import 
org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
-import 
org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -61,13 +56,11 @@ import org.apache.flink.runtime.scheduler.SchedulerNG;
 import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
 import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
 import org.apache.flink.runtime.scheduler.TestingPhysicalSlotProvider;
-import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
 import org.apache.flink.util.FlinkException;
@@ -77,7 +70,6 @@ import org.apache.flink.util.function.FunctionUtils;
 import org.hamcrest.Description;
 import org.hamcrest.TypeSafeMatcher;
 import org.junit.Test;
-import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -89,7 +81,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.function.Function;
 
 import static junit.framework.TestCase.assertTrue;
@@ -721,8 +712,6 @@ public class ExecutionGraphDeploymentTest extends 
TestLogger {
     }
 
     private ExecutionGraph createExecutionGraph(Configuration configuration) 
throws Exception {
-        final ScheduledExecutorService executor = 
TestingUtils.defaultExecutor();
-
         final JobID jobId = new JobID();
         final JobGraph jobGraph = new JobGraph(jobId, "test");
         jobGraph.setSnapshotSettings(
@@ -742,25 +731,11 @@ public class ExecutionGraphDeploymentTest extends 
TestLogger {
                                 0),
                         null));
 
-        final Time timeout = Time.seconds(10L);
-        return ExecutionGraphBuilder.buildGraph(
-                null,
-                jobGraph,
-                configuration,
-                executor,
-                executor,
-                new ProgrammedSlotProvider(1),
-                getClass().getClassLoader(),
-                new StandaloneCheckpointRecoveryFactory(),
-                new StandaloneCheckpointIDCounter(),
-                timeout,
-                new UnregisteredMetricsGroup(),
-                blobWriter,
-                timeout,
-                LoggerFactory.getLogger(getClass()),
-                NettyShuffleMaster.INSTANCE,
-                NoOpJobMasterPartitionTracker.INSTANCE,
-                System.currentTimeMillis());
+        return TestingExecutionGraphBuilder.newBuilder()
+                .setJobGraph(jobGraph)
+                .setJobMasterConfig(configuration)
+                .setBlobWriter(blobWriter)
+                .build();
     }
 
     private static final class ExecutionStageMatcher
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java
index af704db..7c2aa2e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java
@@ -19,20 +19,12 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.JobException;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.blob.VoidBlobWriter;
-import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
-import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
-import 
org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
@@ -41,7 +33,6 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.concurrent.CompletableFuture;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
@@ -67,25 +58,7 @@ public class ExecutionGraphRescalingTest extends TestLogger {
                 createVerticesForSimpleBipartiteJobGraph(initialParallelism, 
maxParallelism);
         final JobGraph jobGraph = new JobGraph(jobVertices);
 
-        ExecutionGraph eg =
-                ExecutionGraphBuilder.buildGraph(
-                        null,
-                        jobGraph,
-                        config,
-                        TestingUtils.defaultExecutor(),
-                        TestingUtils.defaultExecutor(),
-                        new TestingSlotProvider(ignore -> new 
CompletableFuture<>()),
-                        Thread.currentThread().getContextClassLoader(),
-                        new StandaloneCheckpointRecoveryFactory(),
-                        new StandaloneCheckpointIDCounter(),
-                        AkkaUtils.getDefaultTimeout(),
-                        new UnregisteredMetricsGroup(),
-                        VoidBlobWriter.getInstance(),
-                        AkkaUtils.getDefaultTimeout(),
-                        TEST_LOGGER,
-                        NettyShuffleMaster.INSTANCE,
-                        NoOpJobMasterPartitionTracker.INSTANCE,
-                        System.currentTimeMillis());
+        ExecutionGraph eg = 
TestingExecutionGraphBuilder.newBuilder().setJobGraph(jobGraph).build();
 
         for (JobVertex jv : jobVertices) {
             assertThat(jv.getParallelism(), is(initialParallelism));
@@ -100,25 +73,7 @@ public class ExecutionGraphRescalingTest extends TestLogger 
{
             jv.setParallelism(scaleDownParallelism);
         }
 
-        eg =
-                ExecutionGraphBuilder.buildGraph(
-                        null,
-                        jobGraph,
-                        config,
-                        TestingUtils.defaultExecutor(),
-                        TestingUtils.defaultExecutor(),
-                        new TestingSlotProvider(ignore -> new 
CompletableFuture<>()),
-                        Thread.currentThread().getContextClassLoader(),
-                        new StandaloneCheckpointRecoveryFactory(),
-                        new StandaloneCheckpointIDCounter(),
-                        AkkaUtils.getDefaultTimeout(),
-                        new UnregisteredMetricsGroup(),
-                        VoidBlobWriter.getInstance(),
-                        AkkaUtils.getDefaultTimeout(),
-                        TEST_LOGGER,
-                        NettyShuffleMaster.INSTANCE,
-                        NoOpJobMasterPartitionTracker.INSTANCE,
-                        System.currentTimeMillis());
+        eg = 
TestingExecutionGraphBuilder.newBuilder().setJobGraph(jobGraph).build();
 
         for (JobVertex jv : jobVertices) {
             assertThat(jv.getParallelism(), is(1));
@@ -133,25 +88,7 @@ public class ExecutionGraphRescalingTest extends TestLogger 
{
             jv.setParallelism(scaleUpParallelism);
         }
 
-        eg =
-                ExecutionGraphBuilder.buildGraph(
-                        null,
-                        jobGraph,
-                        config,
-                        TestingUtils.defaultExecutor(),
-                        TestingUtils.defaultExecutor(),
-                        new TestingSlotProvider(ignore -> new 
CompletableFuture<>()),
-                        Thread.currentThread().getContextClassLoader(),
-                        new StandaloneCheckpointRecoveryFactory(),
-                        new StandaloneCheckpointIDCounter(),
-                        AkkaUtils.getDefaultTimeout(),
-                        new UnregisteredMetricsGroup(),
-                        VoidBlobWriter.getInstance(),
-                        AkkaUtils.getDefaultTimeout(),
-                        TEST_LOGGER,
-                        NettyShuffleMaster.INSTANCE,
-                        NoOpJobMasterPartitionTracker.INSTANCE,
-                        System.currentTimeMillis());
+        eg = 
TestingExecutionGraphBuilder.newBuilder().setJobGraph(jobGraph).build();
 
         for (JobVertex jv : jobVertices) {
             assertThat(jv.getParallelism(), is(scaleUpParallelism));
@@ -181,24 +118,7 @@ public class ExecutionGraphRescalingTest extends 
TestLogger {
 
         try {
             // this should fail since we set the parallelism to maxParallelism 
+ 1
-            ExecutionGraphBuilder.buildGraph(
-                    null,
-                    jobGraph,
-                    config,
-                    TestingUtils.defaultExecutor(),
-                    TestingUtils.defaultExecutor(),
-                    new TestingSlotProvider(ignore -> new 
CompletableFuture<>()),
-                    Thread.currentThread().getContextClassLoader(),
-                    new StandaloneCheckpointRecoveryFactory(),
-                    new StandaloneCheckpointIDCounter(),
-                    AkkaUtils.getDefaultTimeout(),
-                    new UnregisteredMetricsGroup(),
-                    VoidBlobWriter.getInstance(),
-                    AkkaUtils.getDefaultTimeout(),
-                    TEST_LOGGER,
-                    NettyShuffleMaster.INSTANCE,
-                    NoOpJobMasterPartitionTracker.INSTANCE,
-                    System.currentTimeMillis());
+            
TestingExecutionGraphBuilder.newBuilder().setJobGraph(jobGraph).build();
 
             fail(
                     "Building the ExecutionGraph with a parallelism higher 
than the max parallelism should fail.");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
index 2b1700f..0d9bf77 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
@@ -19,18 +19,11 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
-import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
-import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
-import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.SimpleSlotContext;
-import 
org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -43,10 +36,7 @@ import org.apache.flink.runtime.jobmaster.SlotContext;
 import org.apache.flink.runtime.jobmaster.SlotOwner;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
-import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
-import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
@@ -228,25 +218,7 @@ public class ExecutionVertexLocalityTest extends 
TestLogger {
 
         JobGraph testJob = new JobGraph(jobId, "test job", source, target);
 
-        final Time timeout = Time.seconds(10L);
-        return ExecutionGraphBuilder.buildGraph(
-                null,
-                testJob,
-                new Configuration(),
-                TestingUtils.defaultExecutor(),
-                TestingUtils.defaultExecutor(),
-                mock(SlotProvider.class),
-                getClass().getClassLoader(),
-                new StandaloneCheckpointRecoveryFactory(),
-                new StandaloneCheckpointIDCounter(),
-                timeout,
-                new UnregisteredMetricsGroup(),
-                VoidBlobWriter.getInstance(),
-                timeout,
-                log,
-                NettyShuffleMaster.INSTANCE,
-                NoOpJobMasterPartitionTracker.INSTANCE,
-                System.currentTimeMillis());
+        return 
TestingExecutionGraphBuilder.newBuilder().setJobGraph(testJob).build();
     }
 
     private void initializeLocation(ExecutionVertex vertex, 
TaskManagerLocation location)

Reply via email to