This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 51010a100ce7c1403c99bf639ea8fc1aac7ba135
Author: Zhu Zhu <reed...@gmail.com>
AuthorDate: Fri Jul 8 20:52:04 2022 +0800

    [hotfix][tests] Merge DefaultSchedulerBuilder and 
AdaptiveBatchSchedulerBuilder
    
    The extending AdaptiveBatchSchedulerBuilder is not in a good shape, because 
it requires hacky logic to create an AdaptiveBatchScheduler. Merging the two 
builders makes it simpler to create different schedulers.
---
 .../adaptivebatch/AdaptiveBatchScheduler.java      |   2 +-
 .../DefaultSchedulerCheckpointCoordinatorTest.java |   4 +-
 .../DefaultExecutionGraphDeploymentTest.java       |   7 +-
 .../ExecutionGraphCoLocationRestartTest.java       |   3 +-
 .../executiongraph/ExecutionGraphFinishTest.java   |   4 +-
 .../ExecutionGraphPartitionReleaseTest.java        |   3 +-
 .../executiongraph/ExecutionGraphRestartTest.java  |  15 +-
 .../executiongraph/ExecutionGraphSuspendTest.java  |   5 +-
 .../executiongraph/ExecutionGraphTestUtils.java    |   4 +-
 .../ExecutionPartitionLifecycleTest.java           |   3 +-
 .../runtime/executiongraph/ExecutionTest.java      |   9 +-
 .../executiongraph/ExecutionVertexTest.java        |   5 +-
 .../IntermediateResultPartitionTest.java           |   4 +-
 .../RemoveCachedShuffleDescriptorTest.java         |   4 +-
 .../scheduler/UpdatePartitionConsumersTest.java    |   4 +-
 .../OperatorCoordinatorSchedulerTest.java          |  69 ++++-
 .../DefaultSchedulerBatchSchedulingTest.java       |   2 +-
 .../runtime/scheduler/DefaultSchedulerBuilder.java | 319 +++++++++++++++++++++
 .../runtime/scheduler/DefaultSchedulerTest.java    |   9 +-
 .../runtime/scheduler/SchedulerTestingUtils.java   | 316 --------------------
 .../adaptivebatch/AdaptiveBatchSchedulerTest.java  |  16 +-
 .../AdaptiveBatchSchedulerTestUtils.java           | 106 -------
 .../benchmark/SchedulerBenchmarkUtils.java         |   5 +-
 .../e2e/SchedulerEndToEndBenchmarkBase.java        |   4 +-
 .../ExecutionTimeBasedSlowTaskDetectorTest.java    |   6 +-
 25 files changed, 445 insertions(+), 483 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
index 6507774b3e6..f744d68f28b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
@@ -80,7 +80,7 @@ public class AdaptiveBatchScheduler extends DefaultScheduler 
implements Schedule
 
     private final Map<JobVertexID, ForwardGroup> forwardGroupsByJobVertexId;
 
-    AdaptiveBatchScheduler(
+    public AdaptiveBatchScheduler(
             final Logger log,
             final JobGraph jobGraph,
             final Executor ioExecutor,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultSchedulerCheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultSchedulerCheckpointCoordinatorTest.java
index 1fe91122849..5b746502ca7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultSchedulerCheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultSchedulerCheckpointCoordinatorTest.java
@@ -32,8 +32,8 @@ import 
org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.scheduler.DefaultScheduler;
+import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
 import org.apache.flink.runtime.scheduler.SchedulerBase;
-import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.testutils.TestingUtils;
 import org.apache.flink.testutils.executor.TestExecutorResource;
@@ -208,7 +208,7 @@ public class DefaultSchedulerCheckpointCoordinatorTest 
extends TestLogger {
                         .setJobCheckpointingSettings(checkpointingSettings)
                         .build();
 
-        return new SchedulerTestingUtils.DefaultSchedulerBuilder(
+        return new DefaultSchedulerBuilder(
                         jobGraph,
                         
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                         EXECUTOR_RESOURCE.getExecutor())
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java
index a2abb0b5901..05237422a71 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java
@@ -50,6 +50,7 @@ import 
org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.operators.BatchTask;
+import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
 import org.apache.flink.runtime.scheduler.SchedulerBase;
 import org.apache.flink.runtime.scheduler.SchedulerNG;
 import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
@@ -477,7 +478,7 @@ public class DefaultExecutionGraphDeploymentTest extends 
TestLogger {
 
         // execution graph that executes actions synchronously
         final SchedulerBase scheduler =
-                new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                new DefaultSchedulerBuilder(
                                 graph,
                                 
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                                 EXECUTOR_RESOURCE.getExecutor())
@@ -538,7 +539,7 @@ public class DefaultExecutionGraphDeploymentTest extends 
TestLogger {
 
         // execution graph that executes actions synchronously
         final SchedulerBase scheduler =
-                new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                new DefaultSchedulerBuilder(
                                 JobGraphTestUtils.streamingJobGraph(v1, v2),
                                 
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                                 EXECUTOR_RESOURCE.getExecutor())
@@ -624,7 +625,7 @@ public class DefaultExecutionGraphDeploymentTest extends 
TestLogger {
         final TestingPhysicalSlotProvider physicalSlotProvider =
                 
TestingPhysicalSlotProvider.createWithoutImmediatePhysicalSlotCreation();
         final SchedulerBase scheduler =
-                new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                new DefaultSchedulerBuilder(
                                 jobGraph,
                                 
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                                 EXECUTOR_RESOURCE.getExecutor())
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java
index d7a9d89557b..96b23b8dba5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
 import org.apache.flink.runtime.scheduler.SchedulerBase;
 import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
 import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
@@ -80,7 +81,7 @@ public class ExecutionGraphCoLocationRestartTest {
         final ManuallyTriggeredScheduledExecutorService delayExecutor =
                 new ManuallyTriggeredScheduledExecutorService();
         final SchedulerBase scheduler =
-                new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                new DefaultSchedulerBuilder(
                                 jobGraph,
                                 
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                                 EXECUTOR_RESOURCE.getExecutor())
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphFinishTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphFinishTest.java
index d23b13cc59f..f8959481fd3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphFinishTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphFinishTest.java
@@ -22,8 +22,8 @@ import org.apache.flink.api.common.JobStatus;
 import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
 import org.apache.flink.runtime.scheduler.SchedulerBase;
-import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.testutils.TestingUtils;
 import org.apache.flink.testutils.executor.TestExecutorResource;
@@ -55,7 +55,7 @@ public class ExecutionGraphFinishTest extends TestLogger {
                         ExecutionGraphTestUtils.createJobVertex("Task2", 2, 
NoOpInvokable.class));
 
         SchedulerBase scheduler =
-                new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                new DefaultSchedulerBuilder(
                                 jobGraph,
                                 
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                                 EXECUTOR_RESOURCE.getExecutor())
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
index 0ca4f106f95..29711bc9956 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
 import org.apache.flink.runtime.scheduler.SchedulerBase;
 import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
@@ -251,7 +252,7 @@ public class ExecutionGraphPartitionReleaseTest extends 
TestLogger {
 
         final JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(vertices);
         final SchedulerBase scheduler =
-                new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                new DefaultSchedulerBuilder(
                                 jobGraph,
                                 mainThreadExecutor.getMainThreadExecutor(),
                                 EXECUTOR_RESOURCE.getExecutor())
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index facd4be2fbe..a08d5444083 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolUtils;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
 import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
 import org.apache.flink.runtime.scheduler.SchedulerBase;
 import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
@@ -107,7 +108,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
                             "Task", NUM_TASKS + numTasksExceedSlotPool, 
NoOpInvokable.class);
             JobGraph graph = JobGraphTestUtils.streamingJobGraph(sender);
             SchedulerBase scheduler =
-                    new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                    new DefaultSchedulerBuilder(
                                     graph, mainThreadExecutor, 
EXECUTOR_RESOURCE.getExecutor())
                             .setExecutionSlotAllocatorFactory(
                                     
createExecutionSlotAllocatorFactory(slotPool))
@@ -136,7 +137,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
                             "Task", NUM_TASKS + numTasksExceedSlotPool, 
NoOpInvokable.class);
             JobGraph graph = JobGraphTestUtils.streamingJobGraph(sender);
             SchedulerBase scheduler =
-                    new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                    new DefaultSchedulerBuilder(
                                     graph, mainThreadExecutor, 
EXECUTOR_RESOURCE.getExecutor())
                             .setExecutionSlotAllocatorFactory(
                                     
createExecutionSlotAllocatorFactory(slotPool))
@@ -159,7 +160,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
         // We want to manually control the restart and delay
         try (SlotPool slotPool = 
SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
             SchedulerBase scheduler =
-                    new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                    new DefaultSchedulerBuilder(
                                     createJobGraph(),
                                     mainThreadExecutor,
                                     EXECUTOR_RESOURCE.getExecutor())
@@ -202,7 +203,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
     public void testCancelWhileFailing() throws Exception {
         try (SlotPool slotPool = 
SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
             SchedulerBase scheduler =
-                    new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                    new DefaultSchedulerBuilder(
                                     createJobGraph(),
                                     mainThreadExecutor,
                                     EXECUTOR_RESOURCE.getExecutor())
@@ -240,7 +241,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
     public void testFailWhileCanceling() throws Exception {
         try (SlotPool slotPool = 
SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
             SchedulerBase scheduler =
-                    new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                    new DefaultSchedulerBuilder(
                                     createJobGraph(),
                                     mainThreadExecutor,
                                     EXECUTOR_RESOURCE.getExecutor())
@@ -291,7 +292,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
         try (SlotPool slotPool = 
SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
             SchedulerBase scheduler =
-                    new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                    new DefaultSchedulerBuilder(
                                     jobGraph, mainThreadExecutor, 
EXECUTOR_RESOURCE.getExecutor())
                             .setExecutionSlotAllocatorFactory(
                                     
createExecutionSlotAllocatorFactory(slotPool))
@@ -351,7 +352,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
     public void testFailExecutionAfterCancel() throws Exception {
         try (SlotPool slotPool = 
SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
             SchedulerBase scheduler =
-                    new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                    new DefaultSchedulerBuilder(
                                     createJobGraphToCancel(),
                                     mainThreadExecutor,
                                     EXECUTOR_RESOURCE.getExecutor())
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
index 5804f782ff9..3277cf3dbf5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoff
 import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
 import org.apache.flink.runtime.scheduler.SchedulerBase;
 import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
 import org.apache.flink.runtime.scheduler.TestingPhysicalSlotProvider;
@@ -237,7 +238,7 @@ public class ExecutionGraphSuspendTest extends TestLogger {
         final ManuallyTriggeredScheduledExecutor taskRestartExecutor =
                 new ManuallyTriggeredScheduledExecutor();
         final SchedulerBase scheduler =
-                new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                new DefaultSchedulerBuilder(
                                 JobGraphTestUtils.emptyJobGraph(),
                                 
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                                 EXECUTOR_RESOURCE.getExecutor())
@@ -318,7 +319,7 @@ public class ExecutionGraphSuspendTest extends TestLogger {
         vertex.setParallelism(parallelism);
 
         final SchedulerBase scheduler =
-                new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                new DefaultSchedulerBuilder(
                                 JobGraphTestUtils.streamingJobGraph(vertex),
                                 
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                                 EXECUTOR_RESOURCE.getExecutor())
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 10ff1baa915..ff2baf807e1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -31,8 +31,8 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
 import org.apache.flink.runtime.scheduler.SchedulerBase;
-import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
 import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
@@ -391,7 +391,7 @@ public class ExecutionGraphTestUtils {
         JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(jobVertex);
 
         SchedulerBase scheduler =
-                new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                new DefaultSchedulerBuilder(
                                 jobGraph,
                                 
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                                 executor)
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java
index df765c69cd7..6ad9d321b96 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
 import org.apache.flink.runtime.scheduler.SchedulerBase;
 import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
 import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
@@ -279,7 +280,7 @@ public class ExecutionPartitionLifecycleTest extends 
TestLogger {
 
         final JobGraph jobGraph = 
JobGraphTestUtils.batchJobGraph(producerVertex, consumerVertex);
         final SchedulerBase scheduler =
-                new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                new DefaultSchedulerBuilder(
                                 jobGraph,
                                 
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                                 EXECUTOR_RESOURCE.getExecutor())
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
index 9f11c727632..0c991f55ea3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
 import org.apache.flink.runtime.scheduler.SchedulerBase;
 import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
 import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
@@ -79,7 +80,7 @@ public class ExecutionTest extends TestLogger {
         final TestingPhysicalSlotProvider physicalSlotProvider =
                 
TestingPhysicalSlotProvider.createWithLimitedAmountOfPhysicalSlots(1);
         final SchedulerBase scheduler =
-                new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                new DefaultSchedulerBuilder(
                                 JobGraphTestUtils.streamingJobGraph(jobVertex),
                                 
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                                 EXECUTOR_RESOURCE.getExecutor())
@@ -123,7 +124,7 @@ public class ExecutionTest extends TestLogger {
         final JobVertexID jobVertexId = jobVertex.getID();
 
         final SchedulerBase scheduler =
-                new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                new DefaultSchedulerBuilder(
                                 JobGraphTestUtils.streamingJobGraph(jobVertex),
                                 
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                                 EXECUTOR_RESOURCE.getExecutor())
@@ -168,7 +169,7 @@ public class ExecutionTest extends TestLogger {
                                                 
.withTaskManagerGateway(taskManagerGateway)
                                                 .build()));
         final SchedulerBase scheduler =
-                new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                new DefaultSchedulerBuilder(
                                 JobGraphTestUtils.streamingJobGraph(jobVertex),
                                 testMainThreadUtil.getMainThreadExecutor(),
                                 EXECUTOR_RESOURCE.getExecutor())
@@ -208,7 +209,7 @@ public class ExecutionTest extends TestLogger {
         final TestingPhysicalSlotProvider physicalSlotProvider =
                 
TestingPhysicalSlotProvider.createWithLimitedAmountOfPhysicalSlots(1);
         final SchedulerBase scheduler =
-                new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                new DefaultSchedulerBuilder(
                                 JobGraphTestUtils.streamingJobGraph(jobVertex),
                                 testMainThreadUtil.getMainThreadExecutor(),
                                 EXECUTOR_RESOURCE.getExecutor())
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexTest.java
index dd87d04abcb..d5f6f4f13e9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexTest.java
@@ -30,6 +30,7 @@ import 
org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
 import org.apache.flink.runtime.scheduler.SchedulerBase;
 import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
 import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
@@ -74,7 +75,7 @@ public class ExecutionVertexTest extends TestLogger {
         final JobGraph jobGraph =
                 JobGraphTestUtils.streamingJobGraph(producerJobVertex, 
consumerJobVertex);
         final SchedulerBase scheduler =
-                new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                new DefaultSchedulerBuilder(
                                 jobGraph,
                                 
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                                 EXECUTOR_RESOURCE.getExecutor())
@@ -122,7 +123,7 @@ public class ExecutionVertexTest extends TestLogger {
         // make sure that retrieving the last (al)location is independent from 
the history size
         configuration.set(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE, 1);
         final SchedulerBase scheduler =
-                new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                new DefaultSchedulerBuilder(
                                 jobGraph,
                                 
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                                 EXECUTOR_RESOURCE.getExecutor())
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java
index e5470cb99cd..456de025668 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java
@@ -26,8 +26,8 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
 import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
 import org.apache.flink.runtime.scheduler.SchedulerBase;
-import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
 import org.apache.flink.runtime.scheduler.VertexParallelismStore;
 import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler;
 import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
@@ -304,7 +304,7 @@ public class IntermediateResultPartitionTest extends 
TestLogger {
         JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(source, sink);
 
         SchedulerBase scheduler =
-                new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                new DefaultSchedulerBuilder(
                                 jobGraph,
                                 
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                                 executorService)
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RemoveCachedShuffleDescriptorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RemoveCachedShuffleDescriptorTest.java
index 2ba942dcecc..73dd9cdeb3a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RemoveCachedShuffleDescriptorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RemoveCachedShuffleDescriptorTest.java
@@ -39,7 +39,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
 import org.apache.flink.runtime.scheduler.DefaultScheduler;
-import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
+import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.testutils.TestingUtils;
@@ -393,7 +393,7 @@ public class RemoveCachedShuffleDescriptorTest extends 
TestLogger {
                         .addJobVertices(jobVertices)
                         .build();
 
-        return new SchedulerTestingUtils.DefaultSchedulerBuilder(
+        return new DefaultSchedulerBuilder(
                         jobGraph, mainThreadExecutor, 
EXECUTOR_RESOURCE.getExecutor())
                 .setRestartBackoffTimeStrategy(new 
TestRestartBackoffTimeStrategy(true, 0))
                 .setBlobWriter(blobWriter)
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/UpdatePartitionConsumersTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/UpdatePartitionConsumersTest.java
index b1b46dee1a0..65e7975105d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/UpdatePartitionConsumersTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/UpdatePartitionConsumersTest.java
@@ -31,8 +31,8 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
 import org.apache.flink.runtime.scheduler.SchedulerBase;
-import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
 import org.apache.flink.runtime.scheduler.TestExecutionSlotAllocatorFactory;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
@@ -132,7 +132,7 @@ public class UpdatePartitionConsumersTest extends TestLogger {
                 new SimpleAckingTaskManagerGateway();
 
         final SchedulerBase scheduler =
-                new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                new DefaultSchedulerBuilder(
                                 jobGraph,
                                 
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                                 EXECUTOR_RESOURCE.getExecutor())
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
index e8022c97ada..0bc4aba7187 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
@@ -37,17 +37,23 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartAllFailoverStrategy;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
+import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.scheduler.DefaultScheduler;
+import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
 import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
+import org.apache.flink.runtime.scheduler.TestExecutionSlotAllocatorFactory;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import 
org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;
 import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
@@ -694,11 +700,11 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
                 new ComponentMainThreadExecutorServiceAdapter(
                         (ScheduledExecutorService) executor, 
Thread.currentThread());
 
-        final SchedulerTestingUtils.DefaultSchedulerBuilder schedulerBuilder =
+        final DefaultSchedulerBuilder schedulerBuilder =
                 taskExecutorOperatorEventGateway == null
-                        ? SchedulerTestingUtils.createSchedulerBuilder(
+                        ? createSchedulerBuilder(
                                 jobGraph, mainThreadExecutor, 
EXECUTOR_RESOURCE.getExecutor())
-                        : SchedulerTestingUtils.createSchedulerBuilder(
+                        : createSchedulerBuilder(
                                 jobGraph,
                                 mainThreadExecutor,
                                 taskExecutorOperatorEventGateway,
@@ -714,6 +720,46 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
         return scheduler;
     }
 
+    private static DefaultSchedulerBuilder createSchedulerBuilder(
+            JobGraph jobGraph,
+            ComponentMainThreadExecutor mainThreadExecutor,
+            ScheduledExecutorService scheduledExecutorService) {
+
+        return createSchedulerBuilder(
+                jobGraph,
+                mainThreadExecutor,
+                new SimpleAckingTaskManagerGateway(),
+                scheduledExecutorService);
+    }
+
+    private static DefaultSchedulerBuilder createSchedulerBuilder(
+            JobGraph jobGraph,
+            ComponentMainThreadExecutor mainThreadExecutor,
+            TaskExecutorOperatorEventGateway operatorEventGateway,
+            ScheduledExecutorService scheduledExecutorService) {
+
+        final TaskManagerGateway gateway =
+                operatorEventGateway instanceof TaskManagerGateway
+                        ? (TaskManagerGateway) operatorEventGateway
+                        : new 
TaskExecutorOperatorEventGatewayAdapter(operatorEventGateway);
+
+        return createSchedulerBuilder(
+                jobGraph, mainThreadExecutor, gateway, 
scheduledExecutorService);
+    }
+
+    private static DefaultSchedulerBuilder createSchedulerBuilder(
+            JobGraph jobGraph,
+            ComponentMainThreadExecutor mainThreadExecutor,
+            TaskManagerGateway taskManagerGateway,
+            ScheduledExecutorService executorService) {
+
+        return new DefaultSchedulerBuilder(jobGraph, mainThreadExecutor, 
executorService)
+                .setSchedulingStrategyFactory(new 
PipelinedRegionSchedulingStrategy.Factory())
+                .setRestartBackoffTimeStrategy(new 
TestRestartBackoffTimeStrategy(true, 0))
+                .setExecutionSlotAllocatorFactory(
+                        new 
TestExecutionSlotAllocatorFactory(taskManagerGateway));
+    }
+
     private void scheduleAllTasksToRunning(DefaultScheduler scheduler) {
         scheduler.startScheduling();
         executor.triggerAll();
@@ -1014,4 +1060,21 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
             throw new UnsupportedOperationException();
         }
     }
+
+    private static final class TaskExecutorOperatorEventGatewayAdapter
+            extends SimpleAckingTaskManagerGateway {
+
+        private final TaskExecutorOperatorEventGateway operatorGateway;
+
+        private TaskExecutorOperatorEventGatewayAdapter(
+                TaskExecutorOperatorEventGateway operatorGateway) {
+            this.operatorGateway = operatorGateway;
+        }
+
+        @Override
+        public CompletableFuture<Acknowledge> sendOperatorEventToTask(
+                ExecutionAttemptID task, OperatorID operator, 
SerializedValue<OperatorEvent> evt) {
+            return operatorGateway.sendOperatorEventToTask(task, operator, 
evt);
+        }
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest.java
index 2b72068abc7..442b0adb9b1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest.java
@@ -205,7 +205,7 @@ public class DefaultSchedulerBatchSchedulingTest extends TestLogger {
             Time slotRequestTimeout,
             JobStatusListener jobStatusListener)
             throws Exception {
-        return new SchedulerTestingUtils.DefaultSchedulerBuilder(
+        return new DefaultSchedulerBuilder(
                         jobGraph, mainThreadExecutor, 
EXECUTOR_RESOURCE.getExecutor())
                 .setExecutionSlotAllocatorFactory(
                         
SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java
new file mode 100644
index 00000000000..4c99e8ce5ec
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.NoRestartBackoffTimeStrategy;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy;
+import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
+import 
org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.DefaultExecutionDeploymentTracker;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler;
+import 
org.apache.flink.runtime.scheduler.adaptivebatch.VertexParallelismDecider;
+import 
org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
+import 
org.apache.flink.runtime.scheduler.strategy.VertexwiseSchedulingStrategy;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.shuffle.ShuffleTestUtils;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
+import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static 
org.apache.flink.runtime.scheduler.SchedulerBase.computeVertexParallelismStore;
+
+/** A test builder for {@link DefaultScheduler} and {@link AdaptiveBatchScheduler} instances. */
+public class DefaultSchedulerBuilder {
+    private static final Logger LOG = 
LoggerFactory.getLogger(DefaultSchedulerBuilder.class);
+
+    private final JobGraph jobGraph;
+    private final ComponentMainThreadExecutor mainThreadExecutor;
+
+    private Executor ioExecutor;
+    private ScheduledExecutorService futureExecutor;
+    private ScheduledExecutor delayExecutor;
+    private Logger log = LOG;
+    private Configuration jobMasterConfiguration = new Configuration();
+    private ClassLoader userCodeLoader = ClassLoader.getSystemClassLoader();
+    private CheckpointsCleaner checkpointCleaner = new CheckpointsCleaner();
+    private CheckpointRecoveryFactory checkpointRecoveryFactory =
+            new StandaloneCheckpointRecoveryFactory();
+    private Time rpcTimeout = Time.seconds(300);
+    private BlobWriter blobWriter = VoidBlobWriter.getInstance();
+    private JobManagerJobMetricGroup jobManagerJobMetricGroup =
+            
UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup();
+    private ShuffleMaster<?> shuffleMaster = 
ShuffleTestUtils.DEFAULT_SHUFFLE_MASTER;
+    private JobMasterPartitionTracker partitionTracker = 
NoOpJobMasterPartitionTracker.INSTANCE;
+    private SchedulingStrategyFactory schedulingStrategyFactory =
+            new PipelinedRegionSchedulingStrategy.Factory();
+    private FailoverStrategy.Factory failoverStrategyFactory =
+            new RestartPipelinedRegionFailoverStrategy.Factory();
+    private RestartBackoffTimeStrategy restartBackoffTimeStrategy =
+            NoRestartBackoffTimeStrategy.INSTANCE;
+    private ExecutionOperations executionOperations = new 
DefaultExecutionOperations();
+    private ExecutionVertexVersioner executionVertexVersioner = new 
ExecutionVertexVersioner();
+    private ExecutionSlotAllocatorFactory executionSlotAllocatorFactory =
+            new TestExecutionSlotAllocatorFactory();
+    private JobStatusListener jobStatusListener = (ignoredA, ignoredB, 
ignoredC) -> {};
+    private ExecutionDeployer.Factory executionDeployerFactory =
+            new DefaultExecutionDeployer.Factory();
+    private VertexParallelismDecider vertexParallelismDecider = (ignored) -> 0;
+    private int defaultMaxParallelism =
+            
JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM.defaultValue();
+
+    public DefaultSchedulerBuilder(
+            JobGraph jobGraph,
+            ComponentMainThreadExecutor mainThreadExecutor,
+            ScheduledExecutorService generalExecutorService) {
+        this(
+                jobGraph,
+                mainThreadExecutor,
+                generalExecutorService,
+                generalExecutorService,
+                new ScheduledExecutorServiceAdapter(generalExecutorService));
+    }
+
+    public DefaultSchedulerBuilder(
+            JobGraph jobGraph,
+            ComponentMainThreadExecutor mainThreadExecutor,
+            Executor ioExecutor,
+            ScheduledExecutorService futureExecutor,
+            ScheduledExecutor delayExecutor) {
+        this.jobGraph = jobGraph;
+        this.mainThreadExecutor = mainThreadExecutor;
+        this.ioExecutor = ioExecutor;
+        this.futureExecutor = futureExecutor;
+        this.delayExecutor = delayExecutor;
+    }
+
+    public DefaultSchedulerBuilder setIoExecutor(Executor ioExecutor) {
+        this.ioExecutor = ioExecutor;
+        return this;
+    }
+
+    public DefaultSchedulerBuilder setFutureExecutor(ScheduledExecutorService 
futureExecutor) {
+        this.futureExecutor = futureExecutor;
+        return this;
+    }
+
+    public DefaultSchedulerBuilder setDelayExecutor(ScheduledExecutor 
delayExecutor) {
+        this.delayExecutor = delayExecutor;
+        return this;
+    }
+
+    public DefaultSchedulerBuilder setLogger(Logger log) {
+        this.log = log;
+        return this;
+    }
+
+    public DefaultSchedulerBuilder setJobMasterConfiguration(Configuration 
jobMasterConfiguration) {
+        this.jobMasterConfiguration = jobMasterConfiguration;
+        return this;
+    }
+
+    public DefaultSchedulerBuilder setUserCodeLoader(ClassLoader 
userCodeLoader) {
+        this.userCodeLoader = userCodeLoader;
+        return this;
+    }
+
+    public DefaultSchedulerBuilder setCheckpointCleaner(CheckpointsCleaner 
checkpointsCleaner) {
+        this.checkpointCleaner = checkpointsCleaner;
+        return this;
+    }
+
+    public DefaultSchedulerBuilder setCheckpointRecoveryFactory(
+            CheckpointRecoveryFactory checkpointRecoveryFactory) {
+        this.checkpointRecoveryFactory = checkpointRecoveryFactory;
+        return this;
+    }
+
+    public DefaultSchedulerBuilder setRpcTimeout(Time rpcTimeout) {
+        this.rpcTimeout = rpcTimeout;
+        return this;
+    }
+
+    public DefaultSchedulerBuilder setBlobWriter(BlobWriter blobWriter) {
+        this.blobWriter = blobWriter;
+        return this;
+    }
+
+    public DefaultSchedulerBuilder setJobManagerJobMetricGroup(
+            JobManagerJobMetricGroup jobManagerJobMetricGroup) {
+        this.jobManagerJobMetricGroup = jobManagerJobMetricGroup;
+        return this;
+    }
+
+    public DefaultSchedulerBuilder setShuffleMaster(ShuffleMaster<?> 
shuffleMaster) {
+        this.shuffleMaster = shuffleMaster;
+        return this;
+    }
+
+    public DefaultSchedulerBuilder 
setPartitionTracker(JobMasterPartitionTracker partitionTracker) {
+        this.partitionTracker = partitionTracker;
+        return this;
+    }
+
+    public DefaultSchedulerBuilder setSchedulingStrategyFactory(
+            SchedulingStrategyFactory schedulingStrategyFactory) {
+        this.schedulingStrategyFactory = schedulingStrategyFactory;
+        return this;
+    }
+
+    public DefaultSchedulerBuilder setFailoverStrategyFactory(
+            FailoverStrategy.Factory failoverStrategyFactory) {
+        this.failoverStrategyFactory = failoverStrategyFactory;
+        return this;
+    }
+
+    public DefaultSchedulerBuilder setRestartBackoffTimeStrategy(
+            RestartBackoffTimeStrategy restartBackoffTimeStrategy) {
+        this.restartBackoffTimeStrategy = restartBackoffTimeStrategy;
+        return this;
+    }
+
+    public DefaultSchedulerBuilder setExecutionOperations(ExecutionOperations 
executionOperations) {
+        this.executionOperations = executionOperations;
+        return this;
+    }
+
+    public DefaultSchedulerBuilder setExecutionVertexVersioner(
+            ExecutionVertexVersioner executionVertexVersioner) {
+        this.executionVertexVersioner = executionVertexVersioner;
+        return this;
+    }
+
+    public DefaultSchedulerBuilder setExecutionSlotAllocatorFactory(
+            ExecutionSlotAllocatorFactory executionSlotAllocatorFactory) {
+        this.executionSlotAllocatorFactory = executionSlotAllocatorFactory;
+        return this;
+    }
+
+    public DefaultSchedulerBuilder setJobStatusListener(JobStatusListener 
jobStatusListener) {
+        this.jobStatusListener = jobStatusListener;
+        return this;
+    }
+
+    public DefaultSchedulerBuilder setExecutionDeployerFactory(
+            ExecutionDeployer.Factory executionDeployerFactory) {
+        this.executionDeployerFactory = executionDeployerFactory;
+        return this;
+    }
+
+    public DefaultSchedulerBuilder setVertexParallelismDecider(
+            VertexParallelismDecider vertexParallelismDecider) {
+        this.vertexParallelismDecider = vertexParallelismDecider;
+        return this;
+    }
+
+    public DefaultSchedulerBuilder setDefaultMaxParallelism(int 
defaultMaxParallelism) {
+        this.defaultMaxParallelism = defaultMaxParallelism;
+        return this;
+    }
+
+    public DefaultScheduler build() throws Exception {
+        return new DefaultScheduler(
+                log,
+                jobGraph,
+                ioExecutor,
+                jobMasterConfiguration,
+                componentMainThreadExecutor -> {},
+                delayExecutor,
+                userCodeLoader,
+                checkpointCleaner,
+                checkpointRecoveryFactory,
+                jobManagerJobMetricGroup,
+                schedulingStrategyFactory,
+                failoverStrategyFactory,
+                restartBackoffTimeStrategy,
+                executionOperations,
+                executionVertexVersioner,
+                executionSlotAllocatorFactory,
+                System.currentTimeMillis(),
+                mainThreadExecutor,
+                jobStatusListener,
+                createExecutionGraphFactory(false),
+                shuffleMaster,
+                rpcTimeout,
+                computeVertexParallelismStore(jobGraph),
+                executionDeployerFactory);
+    }
+
+    public AdaptiveBatchScheduler buildAdaptiveBatchJobScheduler() throws 
Exception {
+        return new AdaptiveBatchScheduler(
+                log,
+                jobGraph,
+                ioExecutor,
+                jobMasterConfiguration,
+                componentMainThreadExecutor -> {},
+                delayExecutor,
+                userCodeLoader,
+                checkpointCleaner,
+                checkpointRecoveryFactory,
+                jobManagerJobMetricGroup,
+                new VertexwiseSchedulingStrategy.Factory(),
+                failoverStrategyFactory,
+                restartBackoffTimeStrategy,
+                executionOperations,
+                executionVertexVersioner,
+                executionSlotAllocatorFactory,
+                System.currentTimeMillis(),
+                mainThreadExecutor,
+                jobStatusListener,
+                createExecutionGraphFactory(true),
+                shuffleMaster,
+                rpcTimeout,
+                vertexParallelismDecider,
+                defaultMaxParallelism);
+    }
+
+    private ExecutionGraphFactory createExecutionGraphFactory(boolean 
isDynamicGraph) {
+        return new DefaultExecutionGraphFactory(
+                jobMasterConfiguration,
+                userCodeLoader,
+                new DefaultExecutionDeploymentTracker(),
+                futureExecutor,
+                ioExecutor,
+                rpcTimeout,
+                jobManagerJobMetricGroup,
+                blobWriter,
+                shuffleMaster,
+                partitionTracker,
+                isDynamicGraph,
+                new ExecutionJobVertex.Factory());
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index 733dbbc3d6e..d78610c25aa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -1599,7 +1599,7 @@ public class DefaultSchedulerTest extends TestLogger {
                         final JobGraph jobGraph = singleJobVertexJobGraph(1);
                         enableCheckpointing(jobGraph);
                         try {
-                            return new 
SchedulerTestingUtils.DefaultSchedulerBuilder(
+                            return new DefaultSchedulerBuilder(
                                             jobGraph,
                                             
ComponentMainThreadExecutorServiceAdapter
                                                     
.forSingleThreadExecutor(executorService),
@@ -1844,10 +1844,9 @@ public class DefaultSchedulerTest extends TestLogger {
                 .build();
     }
 
-    private SchedulerTestingUtils.DefaultSchedulerBuilder 
createSchedulerBuilder(
-            final JobGraph jobGraph, final ComponentMainThreadExecutor 
mainThreadExecutor)
-            throws Exception {
-        return new SchedulerTestingUtils.DefaultSchedulerBuilder(
+    private DefaultSchedulerBuilder createSchedulerBuilder(
+            final JobGraph jobGraph, final ComponentMainThreadExecutor 
mainThreadExecutor) {
+        return new DefaultSchedulerBuilder(
                         jobGraph,
                         mainThreadExecutor,
                         executor,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
index ba6b7a0d880..eb09a7e139c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
@@ -20,72 +20,38 @@ package org.apache.flink.runtime.scheduler;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobWriter;
-import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
-import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
-import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
-import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.JobStatusListener;
-import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
-import 
org.apache.flink.runtime.executiongraph.failover.flip1.NoRestartBackoffTimeStrategy;
-import 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
-import 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy;
-import 
org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
-import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
-import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
-import 
org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.OperatorID;
 import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.jobmaster.DefaultExecutionDeploymentTracker;
 import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
-import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
-import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
-import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
-import 
org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;
-import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
-import org.apache.flink.runtime.shuffle.ShuffleMaster;
-import org.apache.flink.runtime.shuffle.ShuffleTestUtils;
 import org.apache.flink.runtime.state.CheckpointStorage;
 import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.runtime.taskexecutor.TaskExecutorOperatorEventGateway;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TernaryBoolean;
-import org.apache.flink.util.concurrent.ScheduledExecutor;
-import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
-import static 
org.apache.flink.runtime.scheduler.SchedulerBase.computeVertexParallelismStore;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -94,8 +60,6 @@ import static org.junit.Assert.fail;
 /** A utility class to create {@link DefaultScheduler} instances for testing. 
*/
 public class SchedulerTestingUtils {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(SchedulerTestingUtils.class);
-
     private static final long DEFAULT_CHECKPOINT_TIMEOUT_MS = 10 * 60 * 1000;
 
     private static final Time DEFAULT_TIMEOUT = Time.seconds(300);
@@ -110,47 +74,6 @@ public class SchedulerTestingUtils {
         return new DefaultSchedulerBuilder(jobGraph, mainThreadExecutor, 
executorService).build();
     }
 
-    public static DefaultSchedulerBuilder createSchedulerBuilder(
-            JobGraph jobGraph,
-            ComponentMainThreadExecutor mainThreadExecutor,
-            ScheduledExecutorService scheduledExecutorService) {
-
-        return createSchedulerBuilder(
-                jobGraph,
-                mainThreadExecutor,
-                new SimpleAckingTaskManagerGateway(),
-                scheduledExecutorService);
-    }
-
-    public static DefaultSchedulerBuilder createSchedulerBuilder(
-            JobGraph jobGraph,
-            ComponentMainThreadExecutor mainThreadExecutor,
-            TaskExecutorOperatorEventGateway operatorEventGateway,
-            ScheduledExecutorService scheduledExecutorService) {
-
-        final TaskManagerGateway gateway =
-                operatorEventGateway instanceof TaskManagerGateway
-                        ? (TaskManagerGateway) operatorEventGateway
-                        : new 
TaskExecutorOperatorEventGatewayAdapter(operatorEventGateway);
-
-        return createSchedulerBuilder(
-                jobGraph, mainThreadExecutor, gateway, 
scheduledExecutorService);
-    }
-
-    private static DefaultSchedulerBuilder createSchedulerBuilder(
-            JobGraph jobGraph,
-            ComponentMainThreadExecutor mainThreadExecutor,
-            TaskManagerGateway taskManagerGateway,
-            ScheduledExecutorService executorService) {
-
-        return new SchedulerTestingUtils.DefaultSchedulerBuilder(
-                        jobGraph, mainThreadExecutor, executorService)
-                .setSchedulingStrategyFactory(new 
PipelinedRegionSchedulingStrategy.Factory())
-                .setRestartBackoffTimeStrategy(new 
TestRestartBackoffTimeStrategy(true, 0))
-                .setExecutionSlotAllocatorFactory(
-                        new 
TestExecutionSlotAllocatorFactory(taskManagerGateway));
-    }
-
     public static void enableCheckpointing(final JobGraph jobGraph) {
         enableCheckpointing(jobGraph, null, null);
     }
@@ -353,22 +276,6 @@ public class SchedulerTestingUtils {
 
     // ------------------------------------------------------------------------
 
-    private static final class TaskExecutorOperatorEventGatewayAdapter
-            extends SimpleAckingTaskManagerGateway {
-
-        private final TaskExecutorOperatorEventGateway operatorGateway;
-
-        TaskExecutorOperatorEventGatewayAdapter(TaskExecutorOperatorEventGateway operatorGateway) {
-            this.operatorGateway = operatorGateway;
-        }
-
-        @Override
-        public CompletableFuture<Acknowledge> sendOperatorEventToTask(
-                ExecutionAttemptID task, OperatorID operator, SerializedValue<OperatorEvent> evt) {
-            return operatorGateway.sendOperatorEventToTask(task, operator, evt);
-        }
-    }
-
     public static SlotSharingExecutionSlotAllocatorFactory
             newSlotSharingExecutionSlotAllocatorFactory() {
         return newSlotSharingExecutionSlotAllocatorFactory(
@@ -390,227 +297,4 @@ public class SchedulerTestingUtils {
                 allocationTimeout,
                 new LocalInputPreferredSlotSharingStrategy.Factory());
     }
-
-    /** Builder for {@link DefaultScheduler}. */
-    public static class DefaultSchedulerBuilder {
-        protected final JobGraph jobGraph;
-
-        protected final ComponentMainThreadExecutor mainThreadExecutor;
-
-        protected SchedulingStrategyFactory schedulingStrategyFactory =
-                new PipelinedRegionSchedulingStrategy.Factory();
-
-        protected Logger log = LOG;
-        protected Executor ioExecutor;
-        protected Configuration jobMasterConfiguration = new Configuration();
-        protected ScheduledExecutorService futureExecutor;
-        protected ScheduledExecutor delayExecutor;
-        protected ClassLoader userCodeLoader = ClassLoader.getSystemClassLoader();
-        protected CheckpointsCleaner checkpointCleaner = new CheckpointsCleaner();
-        protected CheckpointRecoveryFactory checkpointRecoveryFactory =
-                new StandaloneCheckpointRecoveryFactory();
-        protected Time rpcTimeout = DEFAULT_TIMEOUT;
-        protected BlobWriter blobWriter = VoidBlobWriter.getInstance();
-        protected JobManagerJobMetricGroup jobManagerJobMetricGroup =
-                UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup();
-        protected ShuffleMaster<?> shuffleMaster = ShuffleTestUtils.DEFAULT_SHUFFLE_MASTER;
-        protected JobMasterPartitionTracker partitionTracker =
-                NoOpJobMasterPartitionTracker.INSTANCE;
-        protected FailoverStrategy.Factory failoverStrategyFactory =
-                new RestartPipelinedRegionFailoverStrategy.Factory();
-        protected RestartBackoffTimeStrategy restartBackoffTimeStrategy =
-                NoRestartBackoffTimeStrategy.INSTANCE;
-        protected ExecutionOperations executionOperations = new DefaultExecutionOperations();
-        protected ExecutionVertexVersioner executionVertexVersioner =
-                new ExecutionVertexVersioner();
-        protected ExecutionSlotAllocatorFactory executionSlotAllocatorFactory =
-                new TestExecutionSlotAllocatorFactory();
-        protected JobStatusListener jobStatusListener = (ignoredA, ignoredB, ignoredC) -> {};
-        protected ExecutionDeployer.Factory executionDeployerFactory =
-                new DefaultExecutionDeployer.Factory();
-
-        public DefaultSchedulerBuilder(
-                final JobGraph jobGraph,
-                ComponentMainThreadExecutor mainThreadExecutor,
-                ScheduledExecutorService generalExecutorService) {
-            this(
-                    jobGraph,
-                    mainThreadExecutor,
-                    generalExecutorService,
-                    generalExecutorService,
-                    new ScheduledExecutorServiceAdapter(generalExecutorService));
-        }
-
-        public DefaultSchedulerBuilder(
-                final JobGraph jobGraph,
-                ComponentMainThreadExecutor mainThreadExecutor,
-                Executor ioExecutor,
-                ScheduledExecutorService futureExecutor,
-                ScheduledExecutor delayExecuto) {
-            this.jobGraph = jobGraph;
-            this.mainThreadExecutor = mainThreadExecutor;
-            this.ioExecutor = ioExecutor;
-            this.futureExecutor = futureExecutor;
-            this.delayExecutor = delayExecuto;
-        }
-
-        public DefaultSchedulerBuilder setLogger(final Logger log) {
-            this.log = log;
-            return this;
-        }
-
-        public DefaultSchedulerBuilder setIoExecutor(final Executor ioExecutor) {
-            this.ioExecutor = ioExecutor;
-            return this;
-        }
-
-        public DefaultSchedulerBuilder setJobMasterConfiguration(
-                final Configuration jobMasterConfiguration) {
-            this.jobMasterConfiguration = jobMasterConfiguration;
-            return this;
-        }
-
-        public DefaultSchedulerBuilder setFutureExecutor(
-                final ScheduledExecutorService futureExecutor) {
-            this.futureExecutor = futureExecutor;
-            return this;
-        }
-
-        public DefaultSchedulerBuilder setDelayExecutor(final ScheduledExecutor delayExecutor) {
-            this.delayExecutor = delayExecutor;
-            return this;
-        }
-
-        public DefaultSchedulerBuilder setUserCodeLoader(final ClassLoader userCodeLoader) {
-            this.userCodeLoader = userCodeLoader;
-            return this;
-        }
-
-        public DefaultSchedulerBuilder setCheckpointCleaner(
-                final CheckpointsCleaner checkpointsCleaner) {
-            this.checkpointCleaner = checkpointsCleaner;
-            return this;
-        }
-
-        public DefaultSchedulerBuilder setCheckpointRecoveryFactory(
-                final CheckpointRecoveryFactory checkpointRecoveryFactory) {
-            this.checkpointRecoveryFactory = checkpointRecoveryFactory;
-            return this;
-        }
-
-        public DefaultSchedulerBuilder setRpcTimeout(final Time rpcTimeout) {
-            this.rpcTimeout = rpcTimeout;
-            return this;
-        }
-
-        public DefaultSchedulerBuilder setBlobWriter(final BlobWriter blobWriter) {
-            this.blobWriter = blobWriter;
-            return this;
-        }
-
-        public DefaultSchedulerBuilder setJobManagerJobMetricGroup(
-                final JobManagerJobMetricGroup jobManagerJobMetricGroup) {
-            this.jobManagerJobMetricGroup = jobManagerJobMetricGroup;
-            return this;
-        }
-
-        public DefaultSchedulerBuilder setShuffleMaster(final ShuffleMaster<?> shuffleMaster) {
-            this.shuffleMaster = shuffleMaster;
-            return this;
-        }
-
-        public DefaultSchedulerBuilder setPartitionTracker(
-                final JobMasterPartitionTracker partitionTracker) {
-            this.partitionTracker = partitionTracker;
-            return this;
-        }
-
-        public DefaultSchedulerBuilder setSchedulingStrategyFactory(
-                final SchedulingStrategyFactory schedulingStrategyFactory) {
-            this.schedulingStrategyFactory = schedulingStrategyFactory;
-            return this;
-        }
-
-        public DefaultSchedulerBuilder setFailoverStrategyFactory(
-                final FailoverStrategy.Factory failoverStrategyFactory) {
-            this.failoverStrategyFactory = failoverStrategyFactory;
-            return this;
-        }
-
-        public DefaultSchedulerBuilder setRestartBackoffTimeStrategy(
-                final RestartBackoffTimeStrategy restartBackoffTimeStrategy) {
-            this.restartBackoffTimeStrategy = restartBackoffTimeStrategy;
-            return this;
-        }
-
-        public DefaultSchedulerBuilder setExecutionOperations(
-                final ExecutionOperations executionOperations) {
-            this.executionOperations = executionOperations;
-            return this;
-        }
-
-        public DefaultSchedulerBuilder setExecutionVertexVersioner(
-                final ExecutionVertexVersioner executionVertexVersioner) {
-            this.executionVertexVersioner = executionVertexVersioner;
-            return this;
-        }
-
-        public DefaultSchedulerBuilder setExecutionSlotAllocatorFactory(
-                final ExecutionSlotAllocatorFactory executionSlotAllocatorFactory) {
-            this.executionSlotAllocatorFactory = executionSlotAllocatorFactory;
-            return this;
-        }
-
-        public DefaultSchedulerBuilder setJobStatusListener(JobStatusListener jobStatusListener) {
-            this.jobStatusListener = jobStatusListener;
-            return this;
-        }
-
-        public DefaultSchedulerBuilder setExecutionDeployerFactory(
-                ExecutionDeployer.Factory executionDeployerFactory) {
-            this.executionDeployerFactory = executionDeployerFactory;
-            return this;
-        }
-
-        public DefaultScheduler build() throws Exception {
-            final ExecutionGraphFactory executionGraphFactory =
-                    new DefaultExecutionGraphFactory(
-                            jobMasterConfiguration,
-                            userCodeLoader,
-                            new DefaultExecutionDeploymentTracker(),
-                            futureExecutor,
-                            ioExecutor,
-                            rpcTimeout,
-                            jobManagerJobMetricGroup,
-                            blobWriter,
-                            shuffleMaster,
-                            partitionTracker);
-
-            return new DefaultScheduler(
-                    log,
-                    jobGraph,
-                    ioExecutor,
-                    jobMasterConfiguration,
-                    componentMainThreadExecutor -> {},
-                    delayExecutor,
-                    userCodeLoader,
-                    checkpointCleaner,
-                    checkpointRecoveryFactory,
-                    jobManagerJobMetricGroup,
-                    schedulingStrategyFactory,
-                    failoverStrategyFactory,
-                    restartBackoffTimeStrategy,
-                    executionOperations,
-                    executionVertexVersioner,
-                    executionSlotAllocatorFactory,
-                    System.currentTimeMillis(),
-                    mainThreadExecutor,
-                    jobStatusListener,
-                    executionGraphFactory,
-                    shuffleMaster,
-                    rpcTimeout,
-                    computeVertexParallelismStore(jobGraph),
-                    executionDeployerFactory);
-        }
-    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java
index e8636ea8435..d6508d8b66a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
 import org.apache.flink.runtime.scheduler.SchedulerBase;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
@@ -177,15 +178,10 @@ public class AdaptiveBatchSchedulerTest extends TestLogger {
         configuration.set(
                 JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.AdaptiveBatch);
 
-        final AdaptiveBatchSchedulerTestUtils.AdaptiveBatchSchedulerBuilder schedulerBuilder =
-                (AdaptiveBatchSchedulerTestUtils.AdaptiveBatchSchedulerBuilder)
-                        new AdaptiveBatchSchedulerTestUtils.AdaptiveBatchSchedulerBuilder(
-                                        jobGraph,
-                                        mainThreadExecutor,
-                                        EXECUTOR_RESOURCE.getExecutor())
-                                .setJobMasterConfiguration(configuration);
-        schedulerBuilder.setVertexParallelismDecider((ignored) -> 10);
-
-        return schedulerBuilder.build();
+        return new DefaultSchedulerBuilder(
+                        jobGraph, mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor())
+                .setJobMasterConfiguration(configuration)
+                .setVertexParallelismDecider((ignored) -> 10)
+                .buildAdaptiveBatchJobScheduler();
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTestUtils.java
deleted file mode 100644
index d462c726bc4..00000000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTestUtils.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.flink.runtime.scheduler.adaptivebatch;
-
-import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobmaster.DefaultExecutionDeploymentTracker;
-import org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory;
-import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
-import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
-import org.apache.flink.runtime.scheduler.strategy.VertexwiseSchedulingStrategy;
-
-import java.util.concurrent.ScheduledExecutorService;
-
-/** A utility class to create {@link AdaptiveBatchScheduler} instances for testing. */
-public class AdaptiveBatchSchedulerTestUtils {
-
-    /** Builder for {@link AdaptiveBatchScheduler}. */
-    public static class AdaptiveBatchSchedulerBuilder
-            extends SchedulerTestingUtils.DefaultSchedulerBuilder {
-
-        private VertexParallelismDecider vertexParallelismDecider = (ignored) -> 0;
-
-        private int defaultMaxParallelism =
-                JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM.defaultValue();
-
-        public AdaptiveBatchSchedulerBuilder(
-                JobGraph jobGraph,
-                ComponentMainThreadExecutor mainThreadExecutor,
-                ScheduledExecutorService executorService) {
-            super(jobGraph, mainThreadExecutor, executorService);
-            setSchedulingStrategyFactory(new VertexwiseSchedulingStrategy.Factory());
-        }
-
-        public void setVertexParallelismDecider(VertexParallelismDecider vertexParallelismDecider) {
-            this.vertexParallelismDecider = vertexParallelismDecider;
-        }
-
-        public void setDefaultMaxParallelism(int defaultMaxParallelism) {
-            this.defaultMaxParallelism = defaultMaxParallelism;
-        }
-
-        @Override
-        public AdaptiveBatchScheduler build() throws Exception {
-            final ExecutionGraphFactory executionGraphFactory =
-                    new DefaultExecutionGraphFactory(
-                            jobMasterConfiguration,
-                            userCodeLoader,
-                            new DefaultExecutionDeploymentTracker(),
-                            futureExecutor,
-                            ioExecutor,
-                            rpcTimeout,
-                            jobManagerJobMetricGroup,
-                            blobWriter,
-                            shuffleMaster,
-                            partitionTracker,
-                            true,
-                            new ExecutionJobVertex.Factory());
-
-            return new AdaptiveBatchScheduler(
-                    log,
-                    jobGraph,
-                    ioExecutor,
-                    jobMasterConfiguration,
-                    componentMainThreadExecutor -> {},
-                    delayExecutor,
-                    userCodeLoader,
-                    checkpointCleaner,
-                    checkpointRecoveryFactory,
-                    jobManagerJobMetricGroup,
-                    schedulingStrategyFactory,
-                    failoverStrategyFactory,
-                    restartBackoffTimeStrategy,
-                    executionOperations,
-                    executionVertexVersioner,
-                    executionSlotAllocatorFactory,
-                    System.currentTimeMillis(),
-                    mainThreadExecutor,
-                    jobStatusListener,
-                    executionGraphFactory,
-                    shuffleMaster,
-                    rpcTimeout,
-                    vertexParallelismDecider,
-                    defaultMaxParallelism);
-        }
-    }
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkUtils.java
index ed5ef5f7eea..887634cd974 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkUtils.java
@@ -36,7 +36,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
 import org.apache.flink.runtime.scheduler.DefaultScheduler;
-import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
+import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
@@ -104,8 +104,7 @@ public class SchedulerBenchmarkUtils {
                 ComponentMainThreadExecutorServiceAdapter.forMainThread();
 
         final DefaultScheduler scheduler =
-                SchedulerTestingUtils.createSchedulerBuilder(
-                                jobGraph, mainThreadExecutor, scheduledExecutorService)
+                new DefaultSchedulerBuilder(jobGraph, mainThreadExecutor, scheduledExecutorService)
                         .setIoExecutor(scheduledExecutorService)
                         .setFutureExecutor(scheduledExecutorService)
                         .setDelayExecutor(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/e2e/SchedulerEndToEndBenchmarkBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/e2e/SchedulerEndToEndBenchmarkBase.java
index 42fcf4cf317..318375d8bc0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/e2e/SchedulerEndToEndBenchmarkBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/e2e/SchedulerEndToEndBenchmarkBase.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
 import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
 import org.apache.flink.runtime.scheduler.DefaultScheduler;
+import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
 import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
 import org.apache.flink.runtime.scheduler.benchmark.JobConfiguration;
 import org.apache.flink.runtime.scheduler.benchmark.SchedulerBenchmarkBase;
@@ -76,8 +77,7 @@ public class SchedulerEndToEndBenchmarkBase extends SchedulerBenchmarkBase {
             ComponentMainThreadExecutor mainThreadExecutor,
             ScheduledExecutorService executorService)
             throws Exception {
-        return new SchedulerTestingUtils.DefaultSchedulerBuilder(
-                        jobGraph, mainThreadExecutor, executorService)
+        return new DefaultSchedulerBuilder(jobGraph, mainThreadExecutor, executorService)
                 .setExecutionSlotAllocatorFactory(
                         SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(
                                 physicalSlotProvider))
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetectorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetectorTest.java
index af2624a55be..620fcbe67f7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetectorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetectorTest.java
@@ -31,9 +31,9 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
 import org.apache.flink.runtime.scheduler.SchedulerBase;
 import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
-import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchSchedulerTestUtils;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.testutils.TestingUtils;
@@ -221,11 +221,11 @@ class ExecutionTimeBasedSlowTaskDetectorTest {
         final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(jobVertices);
 
         final SchedulerBase scheduler =
-                new AdaptiveBatchSchedulerTestUtils.AdaptiveBatchSchedulerBuilder(
+                new DefaultSchedulerBuilder(
                                 jobGraph,
                                 ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                                 EXECUTOR_RESOURCE.getExecutor())
-                        .build();
+                        .buildAdaptiveBatchJobScheduler();
 
         final ExecutionGraph executionGraph = scheduler.getExecutionGraph();
 

Reply via email to