This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c1ffb7204a60a9b2aad9b73881a0c4d82e5f9324
Author: Till Rohrmann <trohrm...@apache.org>
AuthorDate: Fri Aug 2 19:01:18 2019 +0200

    [hotfix] Introduce TestingLogicalSlotBuilder
---
 .../ExecutionGraphDeploymentTest.java              | 10 +--
 .../executiongraph/ExecutionGraphMetricsTest.java  |  6 +-
 .../ExecutionGraphPartitionReleaseTest.java        |  4 +-
 .../ExecutionGraphSchedulingTest.java              | 24 ++----
 .../executiongraph/ExecutionGraphTestUtils.java    |  4 +-
 .../runtime/executiongraph/ExecutionTest.java      |  4 +-
 .../executiongraph/ExecutionVertexCancelTest.java  | 16 ++--
 .../ExecutionVertexDeploymentTest.java             | 13 +--
 .../ExecutionVertexSchedulingTest.java             |  8 +-
 .../executiongraph/utils/SimpleSlotProvider.java   | 18 ++---
 .../runtime/jobmaster/TestingLogicalSlot.java      | 69 ++++------------
 .../jobmaster/TestingLogicalSlotBuilder.java       | 93 ++++++++++++++++++++++
 .../DefaultExecutionSlotAllocatorTest.java         |  4 +-
 ...GraphToInputsLocationsRetrieverAdapterTest.java |  3 +-
 14 files changed, 161 insertions(+), 115 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index d10fcbd..13f74f4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -58,7 +58,7 @@ import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.SlotOwner;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
-import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.operators.BatchTask;
@@ -199,7 +199,7 @@ public class ExecutionGraphDeploymentTest extends 
TestLogger {
                                tdd.complete(taskDeploymentDescriptor);
                        }));
 
-                       final LogicalSlot slot = new 
TestingLogicalSlot(taskManagerGateway);
+                       final LogicalSlot slot = new 
TestingLogicalSlotBuilder().setTaskManagerGateway(taskManagerGateway).createTestingLogicalSlot();
 
                        assertEquals(ExecutionState.CREATED, 
vertex.getExecutionState());
 
@@ -443,7 +443,7 @@ public class ExecutionGraphDeploymentTest extends 
TestLogger {
 
                final ArrayDeque<CompletableFuture<LogicalSlot>> slotFutures = 
new ArrayDeque<>();
                for (int i = 0; i < dop1; i++) {
-                       
slotFutures.addLast(CompletableFuture.completedFuture(new 
TestingLogicalSlot()));
+                       
slotFutures.addLast(CompletableFuture.completedFuture(new 
TestingLogicalSlotBuilder().createTestingLogicalSlot()));
                }
 
                final SlotProvider slotProvider = new 
TestingSlotProvider(ignore -> slotFutures.removeFirst());
@@ -507,7 +507,7 @@ public class ExecutionGraphDeploymentTest extends 
TestLogger {
 
                final ArrayDeque<CompletableFuture<LogicalSlot>> slotFutures = 
new ArrayDeque<>();
                for (int i = 0; i < dop1 + dop2; i++) {
-                       
slotFutures.addLast(CompletableFuture.completedFuture(new 
TestingLogicalSlot()));
+                       
slotFutures.addLast(CompletableFuture.completedFuture(new 
TestingLogicalSlotBuilder().createTestingLogicalSlot()));
                }
 
                final SlotProvider slotProvider = new 
TestingSlotProvider(ignore -> slotFutures.removeFirst());
@@ -706,7 +706,7 @@ public class ExecutionGraphDeploymentTest extends 
TestLogger {
                Collections.shuffle(shuffledFutures);
 
                for (CompletableFuture<LogicalSlot> slotFuture : 
shuffledFutures) {
-                       slotFuture.complete(new 
TestingLogicalSlot(taskManagerGateway));
+                       slotFuture.complete(new 
TestingLogicalSlotBuilder().setTaskManagerGateway(taskManagerGateway).createTestingLogicalSlot());
                }
 
                final List<ExecutionAttemptID> submittedTasks = new 
ArrayList<>(numberTasks);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index 72d3416..7389296 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
-import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.util.SerializedValue;
@@ -71,8 +71,8 @@ public class ExecutionGraphMetricsTest extends TestLogger {
                        Configuration jobConfig = new Configuration();
                        Time timeout = Time.seconds(10L);
 
-                       CompletableFuture<LogicalSlot> slotFuture1 = 
CompletableFuture.completedFuture(new TestingLogicalSlot());
-                       CompletableFuture<LogicalSlot> slotFuture2 = 
CompletableFuture.completedFuture(new TestingLogicalSlot());
+                       CompletableFuture<LogicalSlot> slotFuture1 = 
CompletableFuture.completedFuture(new 
TestingLogicalSlotBuilder().createTestingLogicalSlot());
+                       CompletableFuture<LogicalSlot> slotFuture2 = 
CompletableFuture.completedFuture(new 
TestingLogicalSlotBuilder().createTestingLogicalSlot());
                        ArrayDeque<CompletableFuture<LogicalSlot>> slotFutures 
= new ArrayDeque<>();
                        slotFutures.addLast(slotFuture1);
                        slotFutures.addLast(slotFuture2);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
index 7d2369f..5c48b56 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
@@ -35,7 +35,7 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
 import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.util.TestLogger;
@@ -183,7 +183,7 @@ public class ExecutionGraphPartitionReleaseTest extends 
TestLogger {
                        new Configuration(),
                        scheduledExecutorService,
                        mainThreadExecutor.getMainThreadExecutor(),
-                       new TestingSlotProvider(ignored -> 
CompletableFuture.completedFuture(new TestingLogicalSlot())),
+                       new TestingSlotProvider(ignored -> 
CompletableFuture.completedFuture(new 
TestingLogicalSlotBuilder().createTestingLogicalSlot())),
                        
ExecutionGraphPartitionReleaseTest.class.getClassLoader(),
                        new StandaloneCheckpointRecoveryFactory(),
                        AkkaUtils.getDefaultTimeout(),
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
index 4c59b52..3850a70 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
@@ -32,7 +32,6 @@ import 
org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.SimpleSlotContext;
-import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.io.network.partition.NoOpPartitionTracker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -48,10 +47,10 @@ import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotOwner;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
 import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
-import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
@@ -62,7 +61,6 @@ import org.junit.After;
 import org.junit.Test;
 
 import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
 
 import java.net.InetAddress;
 import java.util.Set;
@@ -444,16 +442,15 @@ public class ExecutionGraphSchedulingTest extends 
TestLogger {
                
executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
                executionGraph.scheduleForExecution();
 
-               final CompletableFuture<?> releaseFuture = new 
CompletableFuture<>();
-
-               final TestingLogicalSlot slot = 
createTestingSlot(releaseFuture);
+               final TestingLogicalSlot slot = createTestingSlot();
+               final CompletableFuture<?> releaseFuture = 
slot.getReleaseFuture();
                slotFuture1.complete(slot);
 
                // cancel should change the state of all executions to CANCELLED
                executionGraph.cancel();
 
                // complete the now CANCELLED execution --> this should cause a 
failure
-               slotFuture2.complete(new TestingLogicalSlot());
+               slotFuture2.complete(new 
TestingLogicalSlotBuilder().createTestingLogicalSlot());
 
                Thread.sleep(1L);
                // release the first slot to finish the cancellation
@@ -650,14 +647,9 @@ public class ExecutionGraphSchedulingTest extends 
TestLogger {
        }
 
        @Nonnull
-       private static TestingLogicalSlot createTestingSlot(@Nullable 
CompletableFuture<?> releaseFuture) {
-               return new TestingLogicalSlot(
-                       new LocalTaskManagerLocation(),
-                       new SimpleAckingTaskManagerGateway(),
-                       0,
-                       new AllocationID(),
-                       new SlotRequestId(),
-                       new SlotSharingGroupId(),
-                       releaseFuture);
+       private static TestingLogicalSlot createTestingSlot() {
+               return new TestingLogicalSlotBuilder()
+                       .setAutomaticallyCompleteReleaseFuture(false)
+                       .createTestingLogicalSlot();
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 6c2e02c..25d169c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -50,7 +50,7 @@ import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
-import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
@@ -555,7 +555,7 @@ public class ExecutionGraphTestUtils {
                private Time rpcTimeout = AkkaUtils.getDefaultTimeout();
                private CheckpointRecoveryFactory checkpointRecoveryFactory = 
new StandaloneCheckpointRecoveryFactory();
                private ClassLoader classLoader = getClass().getClassLoader();
-               private SlotProvider slotProvider = new 
TestingSlotProvider(slotRequestId -> CompletableFuture.completedFuture(new 
TestingLogicalSlot()));
+               private SlotProvider slotProvider = new 
TestingSlotProvider(slotRequestId -> CompletableFuture.completedFuture(new 
TestingLogicalSlotBuilder().createTestingLogicalSlot()));
                private Executor ioExecutor = TestingUtils.defaultExecutor();
                private ScheduledExecutorService futureExecutor = 
TestingUtils.defaultExecutor();
                private Configuration jobMasterConfig = new Configuration();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
index 433e3f6..6df8366 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
@@ -33,7 +33,7 @@ import 
org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotOwner;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
-import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
 import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
@@ -112,7 +112,7 @@ public class ExecutionTest extends TestLogger {
                        0,
                        new SimpleAckingTaskManagerGateway());
 
-               final LogicalSlot otherSlot = new TestingLogicalSlot();
+               final LogicalSlot otherSlot = new 
TestingLogicalSlotBuilder().createTestingLogicalSlot();
 
                CompletableFuture<Execution> allocationFuture = 
execution.allocateResourcesForExecution(
                        executionGraph.getSlotProviderStrategy(),
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index c99684e..9172348 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import 
org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
-import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
 import org.apache.flink.util.TestLogger;
@@ -120,7 +120,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
                        final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
                                        AkkaUtils.getDefaultTimeout());
 
-                       LogicalSlot slot = new TestingLogicalSlot(new 
CancelSequenceSimpleAckingTaskManagerGateway(1));
+                       LogicalSlot slot = new 
TestingLogicalSlotBuilder().setTaskManagerGateway(new 
CancelSequenceSimpleAckingTaskManagerGateway(1)).createTestingLogicalSlot();
 
                        setVertexResource(vertex, slot);
                        setVertexState(vertex, ExecutionState.RUNNING);
@@ -155,7 +155,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
                        final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
                                        AkkaUtils.getDefaultTimeout());
 
-                       LogicalSlot slot = new TestingLogicalSlot(new 
CancelSequenceSimpleAckingTaskManagerGateway(1));
+                       LogicalSlot slot = new 
TestingLogicalSlotBuilder().setTaskManagerGateway(new 
CancelSequenceSimpleAckingTaskManagerGateway(1)).createTestingLogicalSlot();
 
                        setVertexResource(vertex, slot);
                        setVertexState(vertex, ExecutionState.RUNNING);
@@ -198,7 +198,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
                        final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
                                        AkkaUtils.getDefaultTimeout());
 
-                       LogicalSlot slot = new TestingLogicalSlot(new 
CancelSequenceSimpleAckingTaskManagerGateway(1));
+                       LogicalSlot slot = new 
TestingLogicalSlotBuilder().setTaskManagerGateway(new 
CancelSequenceSimpleAckingTaskManagerGateway(1)).createTestingLogicalSlot();
 
                        setVertexResource(vertex, slot);
                        setVertexState(vertex, ExecutionState.RUNNING);
@@ -228,7 +228,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
                        final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
                                        AkkaUtils.getDefaultTimeout());
 
-                       LogicalSlot slot = new TestingLogicalSlot(new 
CancelSequenceSimpleAckingTaskManagerGateway(0));
+                       LogicalSlot slot = new 
TestingLogicalSlotBuilder().setTaskManagerGateway(new 
CancelSequenceSimpleAckingTaskManagerGateway(0)).createTestingLogicalSlot();
 
                        setVertexResource(vertex, slot);
                        setVertexState(vertex, ExecutionState.RUNNING);
@@ -305,7 +305,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
                        // the scheduler (or any caller) needs to know that the 
slot should be released
                        try {
 
-                               final LogicalSlot slot = new 
TestingLogicalSlot();
+                               final LogicalSlot slot = new 
TestingLogicalSlotBuilder().createTestingLogicalSlot();
 
                                vertex.deployToSlot(slot);
                                fail("Method should throw an exception");
@@ -347,7 +347,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
                                                AkkaUtils.getDefaultTimeout());
                                setVertexState(vertex, 
ExecutionState.CANCELING);
 
-                               final LogicalSlot slot = new 
TestingLogicalSlot();
+                               final LogicalSlot slot = new 
TestingLogicalSlotBuilder().createTestingLogicalSlot();
 
                                vertex.deployToSlot(slot);
                                fail("Method should throw an exception");
@@ -361,7 +361,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
                                ExecutionVertex vertex = new 
ExecutionVertex(ejv, 0, new IntermediateResult[0],
                                                AkkaUtils.getDefaultTimeout());
 
-                               final LogicalSlot slot = new 
TestingLogicalSlot();
+                               final LogicalSlot slot = new 
TestingLogicalSlotBuilder().createTestingLogicalSlot();
 
                                setVertexResource(vertex, slot);
                                setVertexState(vertex, 
ExecutionState.CANCELING);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index 34eabe8..6093000 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
@@ -64,7 +65,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger 
{
 
                        final ExecutionJobVertex ejv = getExecutionVertex(jid);
 
-                       final LogicalSlot slot = new TestingLogicalSlot();
+                       final LogicalSlot slot = new 
TestingLogicalSlotBuilder().createTestingLogicalSlot();
 
                        final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
                                AkkaUtils.getDefaultTimeout());
@@ -98,7 +99,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger 
{
 
                        final ExecutionJobVertex ejv = getExecutionVertex(jid, 
new DirectScheduledExecutorService());
 
-                       final LogicalSlot slot = new TestingLogicalSlot();
+                       final LogicalSlot slot = new 
TestingLogicalSlotBuilder().createTestingLogicalSlot();
 
                        final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
                                AkkaUtils.getDefaultTimeout());
@@ -137,7 +138,7 @@ public class ExecutionVertexDeploymentTest extends 
TestLogger {
                        final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
                                AkkaUtils.getDefaultTimeout());
 
-                       final LogicalSlot slot = new TestingLogicalSlot();
+                       final LogicalSlot slot = new 
TestingLogicalSlotBuilder().createTestingLogicalSlot();
 
                        assertEquals(ExecutionState.CREATED, 
vertex.getExecutionState());
 
@@ -179,7 +180,7 @@ public class ExecutionVertexDeploymentTest extends 
TestLogger {
                        final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
                                AkkaUtils.getDefaultTimeout());
 
-                       final LogicalSlot slot = new TestingLogicalSlot(new 
SubmitFailingSimpleAckingTaskManagerGateway());
+                       final LogicalSlot slot = new 
TestingLogicalSlotBuilder().setTaskManagerGateway(new 
SubmitFailingSimpleAckingTaskManagerGateway()).createTestingLogicalSlot();
 
                        assertEquals(ExecutionState.CREATED, 
vertex.getExecutionState());
 
@@ -206,7 +207,7 @@ public class ExecutionVertexDeploymentTest extends 
TestLogger {
                        final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
                                AkkaUtils.getDefaultTimeout());
 
-                       final LogicalSlot slot = new TestingLogicalSlot(new 
SubmitFailingSimpleAckingTaskManagerGateway());
+                       final LogicalSlot slot = new 
TestingLogicalSlotBuilder().setTaskManagerGateway(new 
SubmitFailingSimpleAckingTaskManagerGateway()).createTestingLogicalSlot();
 
                        assertEquals(ExecutionState.CREATED, 
vertex.getExecutionState());
 
@@ -244,7 +245,7 @@ public class ExecutionVertexDeploymentTest extends 
TestLogger {
                        final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
                                AkkaUtils.getDefaultTimeout());
 
-                       TestingLogicalSlot testingLogicalSlot = new 
TestingLogicalSlot(new SubmitBlockingSimpleAckingTaskManagerGateway());
+                       TestingLogicalSlot testingLogicalSlot = new 
TestingLogicalSlotBuilder().setTaskManagerGateway(new 
SubmitBlockingSimpleAckingTaskManagerGateway()).createTestingLogicalSlot();
 
                        assertEquals(ExecutionState.CREATED, 
vertex.getExecutionState());
                        vertex.deployToSlot(testingLogicalSlot);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
index f7a2600..cf55f70 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
@@ -23,7 +23,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import 
org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
-import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
@@ -46,7 +46,7 @@ public class ExecutionVertexSchedulingTest extends TestLogger 
{
                                        AkkaUtils.getDefaultTimeout());
 
                        // a slot than cannot be deployed to
-                       final LogicalSlot slot = new TestingLogicalSlot();
+                       final LogicalSlot slot = new 
TestingLogicalSlotBuilder().createTestingLogicalSlot();
                        slot.releaseSlot(new Exception("Test Exception"));
 
                        assertFalse(slot.isAlive());
@@ -78,7 +78,7 @@ public class ExecutionVertexSchedulingTest extends TestLogger 
{
                                        AkkaUtils.getDefaultTimeout());
 
                        // a slot than cannot be deployed to
-                       final LogicalSlot slot = new TestingLogicalSlot();
+                       final LogicalSlot slot = new 
TestingLogicalSlotBuilder().createTestingLogicalSlot();
                        slot.releaseSlot(new Exception("Test Exception"));
 
                        assertFalse(slot.isAlive());
@@ -113,7 +113,7 @@ public class ExecutionVertexSchedulingTest extends 
TestLogger {
                        final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
                                        AkkaUtils.getDefaultTimeout());
 
-                       final LogicalSlot slot = new TestingLogicalSlot();
+                       final LogicalSlot slot = new 
TestingLogicalSlotBuilder().createTestingLogicalSlot();
 
                        CompletableFuture<LogicalSlot> future = 
CompletableFuture.completedFuture(slot);
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
index 76f7daa..15ec343 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.jobmaster.SlotContext;
 import org.apache.flink.runtime.jobmaster.SlotOwner;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.FlinkRuntimeException;
@@ -99,15 +100,14 @@ public class SimpleSlotProvider implements SlotProvider, 
SlotOwner {
                                slot = slots.removeFirst();
                        }
                        if (slot != null) {
-                               TestingLogicalSlot result = new 
TestingLogicalSlot(
-                                       slot.getTaskManagerLocation(),
-                                       slot.getTaskManagerGateway(),
-                                       slot.getPhysicalSlotNumber(),
-                                       slot.getAllocationId(),
-                                       slotRequestId,
-                                       new SlotSharingGroupId(),
-                                       null,
-                                       this);
+                               TestingLogicalSlot result = new 
TestingLogicalSlotBuilder()
+                                       
.setTaskManagerLocation(slot.getTaskManagerLocation())
+                                       
.setTaskManagerGateway(slot.getTaskManagerGateway())
+                                       
.setSlotNumber(slot.getPhysicalSlotNumber())
+                                       .setAllocationId(slot.getAllocationId())
+                                       .setSlotRequestId(slotRequestId)
+                                       .setSlotOwner(this)
+                                       .createTestingLogicalSlot();
                                allocatedSlots.put(slotRequestId, slot);
                                return 
CompletableFuture.completedFuture(result);
                        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java
index 5060478..3304c96 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java
@@ -19,11 +19,9 @@
 package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.Preconditions;
 
@@ -47,10 +45,8 @@ public class TestingLogicalSlot implements LogicalSlot {
 
        private final CompletableFuture<?> releaseFuture;
 
-       @Nullable
-       private final CompletableFuture<?> customReleaseFuture;
+       private final boolean automaticallyCompleteReleaseFuture;
 
-       @Nullable
        private final SlotOwner slotOwner;
 
        private final AllocationID allocationId;
@@ -59,49 +55,15 @@ public class TestingLogicalSlot implements LogicalSlot {
 
        private final SlotSharingGroupId slotSharingGroupId;
 
-       public TestingLogicalSlot() {
-               this(new SimpleAckingTaskManagerGateway());
-       }
-
-       public TestingLogicalSlot(TaskManagerGateway taskManagerGateway) {
-               this(
-                       new LocalTaskManagerLocation(),
-                       taskManagerGateway,
-                       0,
-                       new AllocationID(),
-                       new SlotRequestId(),
-                       new SlotSharingGroupId(),
-                       null);
-       }
-
-       public TestingLogicalSlot(
+       TestingLogicalSlot(
                        TaskManagerLocation taskManagerLocation,
                        TaskManagerGateway taskManagerGateway,
                        int slotNumber,
                        AllocationID allocationId,
                        SlotRequestId slotRequestId,
                        SlotSharingGroupId slotSharingGroupId,
-                       @Nullable CompletableFuture<?> customReleaseFuture) {
-               this(
-                       taskManagerLocation,
-                       taskManagerGateway,
-                       slotNumber,
-                       allocationId,
-                       slotRequestId,
-                       slotSharingGroupId,
-                       customReleaseFuture,
-                       null);
-       }
-
-       public TestingLogicalSlot(
-                       TaskManagerLocation taskManagerLocation,
-                       TaskManagerGateway taskManagerGateway,
-                       int slotNumber,
-                       AllocationID allocationId,
-                       SlotRequestId slotRequestId,
-                       SlotSharingGroupId slotSharingGroupId,
-                       @Nullable CompletableFuture<?> customReleaseFuture,
-                       @Nullable SlotOwner slotOwner) {
+                       boolean automaticallyCompleteReleaseFuture,
+                       SlotOwner slotOwner) {
 
                this.taskManagerLocation = 
Preconditions.checkNotNull(taskManagerLocation);
                this.taskManagerGateway = 
Preconditions.checkNotNull(taskManagerGateway);
@@ -111,7 +73,7 @@ public class TestingLogicalSlot implements LogicalSlot {
                this.slotRequestId = Preconditions.checkNotNull(slotRequestId);
                this.slotSharingGroupId = 
Preconditions.checkNotNull(slotSharingGroupId);
                this.releaseFuture = new CompletableFuture<>();
-               this.customReleaseFuture = customReleaseFuture;
+               this.automaticallyCompleteReleaseFuture = 
automaticallyCompleteReleaseFuture;
                this.slotOwner = slotOwner;
        }
 
@@ -132,11 +94,7 @@ public class TestingLogicalSlot implements LogicalSlot {
 
        @Override
        public boolean isAlive() {
-               if (customReleaseFuture != null) {
-                       return !customReleaseFuture.isDone();
-               } else {
-                       return !releaseFuture.isDone();
-               }
+               return !releaseFuture.isDone();
        }
 
        @Override
@@ -152,16 +110,13 @@ public class TestingLogicalSlot implements LogicalSlot {
 
        @Override
        public CompletableFuture<?> releaseSlot(@Nullable Throwable cause) {
-               if (slotOwner != null) {
-                       slotOwner.returnLogicalSlot(this);
-               }
+               slotOwner.returnLogicalSlot(this);
 
-               if (customReleaseFuture != null) {
-                       return customReleaseFuture;
-               } else {
+               if (automaticallyCompleteReleaseFuture) {
                        releaseFuture.complete(null);
-                       return releaseFuture;
                }
+
+               return releaseFuture;
        }
 
        @Override
@@ -184,4 +139,8 @@ public class TestingLogicalSlot implements LogicalSlot {
        public SlotSharingGroupId getSlotSharingGroupId() {
                return slotSharingGroupId;
        }
+
+       public CompletableFuture<?> getReleaseFuture() {
+               return releaseFuture;
+       }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlotBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlotBuilder.java
new file mode 100644
index 0000000..f703c91
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlotBuilder.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.slots.DummySlotOwner;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+/**
+ * Builder for the {@link TestingLogicalSlot}.
+ */
+public class TestingLogicalSlotBuilder {
+       private TaskManagerGateway taskManagerGateway = new 
SimpleAckingTaskManagerGateway();
+       private TaskManagerLocation taskManagerLocation = new 
LocalTaskManagerLocation();
+       private int slotNumber = 0;
+       private AllocationID allocationId = new AllocationID();
+       private SlotRequestId slotRequestId = new SlotRequestId();
+       private SlotSharingGroupId slotSharingGroupId = new 
SlotSharingGroupId();
+       private SlotOwner slotOwner = new DummySlotOwner();
+       private boolean automaticallyCompleteReleaseFuture = true;
+
+       public TestingLogicalSlotBuilder 
setTaskManagerGateway(TaskManagerGateway taskManagerGateway) {
+               this.taskManagerGateway = taskManagerGateway;
+               return this;
+       }
+
+       public TestingLogicalSlotBuilder 
setTaskManagerLocation(TaskManagerLocation taskManagerLocation) {
+               this.taskManagerLocation = taskManagerLocation;
+               return this;
+       }
+
+       public TestingLogicalSlotBuilder setSlotNumber(int slotNumber) {
+               this.slotNumber = slotNumber;
+               return this;
+       }
+
+       public TestingLogicalSlotBuilder setAllocationId(AllocationID 
allocationId) {
+               this.allocationId = allocationId;
+               return this;
+       }
+
+       public TestingLogicalSlotBuilder setSlotRequestId(SlotRequestId 
slotRequestId) {
+               this.slotRequestId = slotRequestId;
+               return this;
+       }
+
+       public TestingLogicalSlotBuilder 
setSlotSharingGroupId(SlotSharingGroupId slotSharingGroupId) {
+               this.slotSharingGroupId = slotSharingGroupId;
+               return this;
+       }
+
+       public TestingLogicalSlotBuilder 
setAutomaticallyCompleteReleaseFuture(boolean 
automaticallyCompleteReleaseFuture) {
+               this.automaticallyCompleteReleaseFuture = 
automaticallyCompleteReleaseFuture;
+               return this;
+       }
+
+       public TestingLogicalSlotBuilder setSlotOwner(SlotOwner slotOwner) {
+               this.slotOwner = slotOwner;
+               return this;
+       }
+
+       public TestingLogicalSlot createTestingLogicalSlot() {
+               return new TestingLogicalSlot(
+                       taskManagerLocation,
+                       taskManagerGateway,
+                       slotNumber,
+                       allocationId,
+                       slotRequestId,
+                       slotSharingGroupId,
+                       automaticallyCompleteReleaseFuture,
+                       slotOwner);
+       }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java
index 4c0dd01..4088f33 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java
@@ -30,7 +30,7 @@ import 
org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
-import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
@@ -303,7 +303,7 @@ public class DefaultExecutionSlotAllocatorTest extends 
TestLogger {
                        if (slotAllocationDisabled) {
                                return new CompletableFuture<>();
                        } else {
-                               return CompletableFuture.completedFuture(new 
TestingLogicalSlot());
+                               return CompletableFuture.completedFuture(new 
TestingLogicalSlotBuilder().createTestingLogicalSlot());
                        }
                }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExecutionGraphToInputsLocationsRetrieverAdapterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExecutionGraphToInputsLocationsRetrieverAdapterTest.java
index a92ff2c..d9eae6b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExecutionGraphToInputsLocationsRetrieverAdapterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExecutionGraphToInputsLocationsRetrieverAdapterTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.TestLogger;
@@ -110,7 +111,7 @@ public class 
ExecutionGraphToInputsLocationsRetrieverAdapterTest extends TestLog
        public void testGetTaskManagerLocationWhenScheduled() throws Exception {
                final JobVertex jobVertex = 
ExecutionGraphTestUtils.createNoOpVertex(1);
 
-               final TestingLogicalSlot testingLogicalSlot = new 
TestingLogicalSlot();
+               final TestingLogicalSlot testingLogicalSlot = new 
TestingLogicalSlotBuilder().createTestingLogicalSlot();
                final ExecutionGraph eg = 
ExecutionGraphTestUtils.createSimpleTestGraph(new JobID(), jobVertex);
                final ExecutionGraphToInputsLocationsRetrieverAdapter 
inputsLocationsRetriever =
                                new 
ExecutionGraphToInputsLocationsRetrieverAdapter(eg);

Reply via email to