This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit c1ffb7204a60a9b2aad9b73881a0c4d82e5f9324 Author: Till Rohrmann <trohrm...@apache.org> AuthorDate: Fri Aug 2 19:01:18 2019 +0200 [hotfix] Introduce TestingLogicalSlotBuilder --- .../ExecutionGraphDeploymentTest.java | 10 +-- .../executiongraph/ExecutionGraphMetricsTest.java | 6 +- .../ExecutionGraphPartitionReleaseTest.java | 4 +- .../ExecutionGraphSchedulingTest.java | 24 ++---- .../executiongraph/ExecutionGraphTestUtils.java | 4 +- .../runtime/executiongraph/ExecutionTest.java | 4 +- .../executiongraph/ExecutionVertexCancelTest.java | 16 ++-- .../ExecutionVertexDeploymentTest.java | 13 +-- .../ExecutionVertexSchedulingTest.java | 8 +- .../executiongraph/utils/SimpleSlotProvider.java | 18 ++--- .../runtime/jobmaster/TestingLogicalSlot.java | 69 ++++------------ .../jobmaster/TestingLogicalSlotBuilder.java | 93 ++++++++++++++++++++++ .../DefaultExecutionSlotAllocatorTest.java | 4 +- ...GraphToInputsLocationsRetrieverAdapterTest.java | 3 +- 14 files changed, 161 insertions(+), 115 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java index d10fcbd..13f74f4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java @@ -58,7 +58,7 @@ import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway; import org.apache.flink.runtime.jobmaster.SlotOwner; import org.apache.flink.runtime.jobmaster.SlotRequestId; -import org.apache.flink.runtime.jobmaster.TestingLogicalSlot; +import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; import org.apache.flink.runtime.messages.Acknowledge; import 
org.apache.flink.runtime.operators.BatchTask; @@ -199,7 +199,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger { tdd.complete(taskDeploymentDescriptor); })); - final LogicalSlot slot = new TestingLogicalSlot(taskManagerGateway); + final LogicalSlot slot = new TestingLogicalSlotBuilder().setTaskManagerGateway(taskManagerGateway).createTestingLogicalSlot(); assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); @@ -443,7 +443,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger { final ArrayDeque<CompletableFuture<LogicalSlot>> slotFutures = new ArrayDeque<>(); for (int i = 0; i < dop1; i++) { - slotFutures.addLast(CompletableFuture.completedFuture(new TestingLogicalSlot())); + slotFutures.addLast(CompletableFuture.completedFuture(new TestingLogicalSlotBuilder().createTestingLogicalSlot())); } final SlotProvider slotProvider = new TestingSlotProvider(ignore -> slotFutures.removeFirst()); @@ -507,7 +507,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger { final ArrayDeque<CompletableFuture<LogicalSlot>> slotFutures = new ArrayDeque<>(); for (int i = 0; i < dop1 + dop2; i++) { - slotFutures.addLast(CompletableFuture.completedFuture(new TestingLogicalSlot())); + slotFutures.addLast(CompletableFuture.completedFuture(new TestingLogicalSlotBuilder().createTestingLogicalSlot())); } final SlotProvider slotProvider = new TestingSlotProvider(ignore -> slotFutures.removeFirst()); @@ -706,7 +706,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger { Collections.shuffle(shuffledFutures); for (CompletableFuture<LogicalSlot> slotFuture : shuffledFutures) { - slotFuture.complete(new TestingLogicalSlot(taskManagerGateway)); + slotFuture.complete(new TestingLogicalSlotBuilder().setTaskManagerGateway(taskManagerGateway).createTestingLogicalSlot()); } final List<ExecutionAttemptID> submittedTasks = new ArrayList<>(numberTasks); diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java index 72d3416..7389296 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java @@ -30,7 +30,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobmaster.LogicalSlot; -import org.apache.flink.runtime.jobmaster.TestingLogicalSlot; +import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.util.SerializedValue; @@ -71,8 +71,8 @@ public class ExecutionGraphMetricsTest extends TestLogger { Configuration jobConfig = new Configuration(); Time timeout = Time.seconds(10L); - CompletableFuture<LogicalSlot> slotFuture1 = CompletableFuture.completedFuture(new TestingLogicalSlot()); - CompletableFuture<LogicalSlot> slotFuture2 = CompletableFuture.completedFuture(new TestingLogicalSlot()); + CompletableFuture<LogicalSlot> slotFuture1 = CompletableFuture.completedFuture(new TestingLogicalSlotBuilder().createTestingLogicalSlot()); + CompletableFuture<LogicalSlot> slotFuture2 = CompletableFuture.completedFuture(new TestingLogicalSlotBuilder().createTestingLogicalSlot()); ArrayDeque<CompletableFuture<LogicalSlot>> slotFutures = new ArrayDeque<>(); slotFutures.addLast(slotFuture1); slotFutures.addLast(slotFuture2); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java index 7d2369f..5c48b56 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java @@ -35,7 +35,7 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; -import org.apache.flink.runtime.jobmaster.TestingLogicalSlot; +import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder; import org.apache.flink.runtime.shuffle.NettyShuffleMaster; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.util.TestLogger; @@ -183,7 +183,7 @@ public class ExecutionGraphPartitionReleaseTest extends TestLogger { new Configuration(), scheduledExecutorService, mainThreadExecutor.getMainThreadExecutor(), - new TestingSlotProvider(ignored -> CompletableFuture.completedFuture(new TestingLogicalSlot())), + new TestingSlotProvider(ignored -> CompletableFuture.completedFuture(new TestingLogicalSlotBuilder().createTestingLogicalSlot())), ExecutionGraphPartitionReleaseTest.class.getClassLoader(), new StandaloneCheckpointRecoveryFactory(), AkkaUtils.getDefaultTimeout(), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java index 4c59b52..3850a70 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java @@ -32,7 +32,6 @@ import 
org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.instance.SimpleSlotContext; -import org.apache.flink.runtime.instance.SlotSharingGroupId; import org.apache.flink.runtime.io.network.partition.NoOpPartitionTracker; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.DistributionPattern; @@ -48,10 +47,10 @@ import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.jobmaster.SlotOwner; import org.apache.flink.runtime.jobmaster.SlotRequestId; import org.apache.flink.runtime.jobmaster.TestingLogicalSlot; +import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder; import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; import org.apache.flink.runtime.shuffle.NettyShuffleMaster; -import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; @@ -62,7 +61,6 @@ import org.junit.After; import org.junit.Test; import javax.annotation.Nonnull; -import javax.annotation.Nullable; import java.net.InetAddress; import java.util.Set; @@ -444,16 +442,15 @@ public class ExecutionGraphSchedulingTest extends TestLogger { executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread()); executionGraph.scheduleForExecution(); - final CompletableFuture<?> releaseFuture = new CompletableFuture<>(); - - final TestingLogicalSlot slot = createTestingSlot(releaseFuture); + final TestingLogicalSlot slot = createTestingSlot(); + final CompletableFuture<?> releaseFuture = slot.getReleaseFuture(); 
slotFuture1.complete(slot); // cancel should change the state of all executions to CANCELLED executionGraph.cancel(); // complete the now CANCELLED execution --> this should cause a failure - slotFuture2.complete(new TestingLogicalSlot()); + slotFuture2.complete(new TestingLogicalSlotBuilder().createTestingLogicalSlot()); Thread.sleep(1L); // release the first slot to finish the cancellation @@ -650,14 +647,9 @@ public class ExecutionGraphSchedulingTest extends TestLogger { } @Nonnull - private static TestingLogicalSlot createTestingSlot(@Nullable CompletableFuture<?> releaseFuture) { - return new TestingLogicalSlot( - new LocalTaskManagerLocation(), - new SimpleAckingTaskManagerGateway(), - 0, - new AllocationID(), - new SlotRequestId(), - new SlotSharingGroupId(), - releaseFuture); + private static TestingLogicalSlot createTestingSlot() { + return new TestingLogicalSlotBuilder() + .setAutomaticallyCompleteReleaseFuture(false) + .createTestingLogicalSlot(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java index 6c2e02c..25d169c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java @@ -50,7 +50,7 @@ import org.apache.flink.runtime.jobgraph.ScheduleMode; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.jobmaster.LogicalSlot; -import org.apache.flink.runtime.jobmaster.TestingLogicalSlot; +import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; import org.apache.flink.runtime.shuffle.NettyShuffleMaster; import org.apache.flink.runtime.shuffle.ShuffleMaster; 
@@ -555,7 +555,7 @@ public class ExecutionGraphTestUtils { private Time rpcTimeout = AkkaUtils.getDefaultTimeout(); private CheckpointRecoveryFactory checkpointRecoveryFactory = new StandaloneCheckpointRecoveryFactory(); private ClassLoader classLoader = getClass().getClassLoader(); - private SlotProvider slotProvider = new TestingSlotProvider(slotRequestId -> CompletableFuture.completedFuture(new TestingLogicalSlot())); + private SlotProvider slotProvider = new TestingSlotProvider(slotRequestId -> CompletableFuture.completedFuture(new TestingLogicalSlotBuilder().createTestingLogicalSlot())); private Executor ioExecutor = TestingUtils.defaultExecutor(); private ScheduledExecutorService futureExecutor = TestingUtils.defaultExecutor(); private Configuration jobMasterConfig = new Configuration(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java index 433e3f6..6df8366 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java @@ -33,7 +33,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils; import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.jobmaster.SlotOwner; import org.apache.flink.runtime.jobmaster.SlotRequestId; -import org.apache.flink.runtime.jobmaster.TestingLogicalSlot; +import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder; import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; @@ -112,7 +112,7 @@ public class ExecutionTest extends TestLogger { 0, new SimpleAckingTaskManagerGateway()); - final LogicalSlot otherSlot = new TestingLogicalSlot(); + final LogicalSlot 
otherSlot = new TestingLogicalSlotBuilder().createTestingLogicalSlot(); CompletableFuture<Execution> allocationFuture = execution.allocateResourcesForExecution( executionGraph.getSlotProviderStrategy(), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java index c99684e..9172348 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java @@ -27,7 +27,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint; import org.apache.flink.runtime.jobmaster.LogicalSlot; -import org.apache.flink.runtime.jobmaster.TestingLogicalSlot; +import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; import org.apache.flink.util.TestLogger; @@ -120,7 +120,7 @@ public class ExecutionVertexCancelTest extends TestLogger { final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout()); - LogicalSlot slot = new TestingLogicalSlot(new CancelSequenceSimpleAckingTaskManagerGateway(1)); + LogicalSlot slot = new TestingLogicalSlotBuilder().setTaskManagerGateway(new CancelSequenceSimpleAckingTaskManagerGateway(1)).createTestingLogicalSlot(); setVertexResource(vertex, slot); setVertexState(vertex, ExecutionState.RUNNING); @@ -155,7 +155,7 @@ public class ExecutionVertexCancelTest extends TestLogger { final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout()); - LogicalSlot slot = new TestingLogicalSlot(new 
CancelSequenceSimpleAckingTaskManagerGateway(1)); + LogicalSlot slot = new TestingLogicalSlotBuilder().setTaskManagerGateway(new CancelSequenceSimpleAckingTaskManagerGateway(1)).createTestingLogicalSlot(); setVertexResource(vertex, slot); setVertexState(vertex, ExecutionState.RUNNING); @@ -198,7 +198,7 @@ public class ExecutionVertexCancelTest extends TestLogger { final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout()); - LogicalSlot slot = new TestingLogicalSlot(new CancelSequenceSimpleAckingTaskManagerGateway(1)); + LogicalSlot slot = new TestingLogicalSlotBuilder().setTaskManagerGateway(new CancelSequenceSimpleAckingTaskManagerGateway(1)).createTestingLogicalSlot(); setVertexResource(vertex, slot); setVertexState(vertex, ExecutionState.RUNNING); @@ -228,7 +228,7 @@ public class ExecutionVertexCancelTest extends TestLogger { final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout()); - LogicalSlot slot = new TestingLogicalSlot(new CancelSequenceSimpleAckingTaskManagerGateway(0)); + LogicalSlot slot = new TestingLogicalSlotBuilder().setTaskManagerGateway(new CancelSequenceSimpleAckingTaskManagerGateway(0)).createTestingLogicalSlot(); setVertexResource(vertex, slot); setVertexState(vertex, ExecutionState.RUNNING); @@ -305,7 +305,7 @@ public class ExecutionVertexCancelTest extends TestLogger { // the scheduler (or any caller) needs to know that the slot should be released try { - final LogicalSlot slot = new TestingLogicalSlot(); + final LogicalSlot slot = new TestingLogicalSlotBuilder().createTestingLogicalSlot(); vertex.deployToSlot(slot); fail("Method should throw an exception"); @@ -347,7 +347,7 @@ public class ExecutionVertexCancelTest extends TestLogger { AkkaUtils.getDefaultTimeout()); setVertexState(vertex, ExecutionState.CANCELING); - final LogicalSlot slot = new TestingLogicalSlot(); + final LogicalSlot slot = new 
TestingLogicalSlotBuilder().createTestingLogicalSlot(); vertex.deployToSlot(slot); fail("Method should throw an exception"); @@ -361,7 +361,7 @@ public class ExecutionVertexCancelTest extends TestLogger { ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout()); - final LogicalSlot slot = new TestingLogicalSlot(); + final LogicalSlot slot = new TestingLogicalSlotBuilder().createTestingLogicalSlot(); setVertexResource(vertex, slot); setVertexState(vertex, ExecutionState.CANCELING); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java index 34eabe8..6093000 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java @@ -33,6 +33,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.ScheduleMode; import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.jobmaster.TestingLogicalSlot; +import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; @@ -64,7 +65,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger { final ExecutionJobVertex ejv = getExecutionVertex(jid); - final LogicalSlot slot = new TestingLogicalSlot(); + final LogicalSlot slot = new TestingLogicalSlotBuilder().createTestingLogicalSlot(); final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout()); @@ -98,7 +99,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger { final 
ExecutionJobVertex ejv = getExecutionVertex(jid, new DirectScheduledExecutorService()); - final LogicalSlot slot = new TestingLogicalSlot(); + final LogicalSlot slot = new TestingLogicalSlotBuilder().createTestingLogicalSlot(); final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout()); @@ -137,7 +138,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger { final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout()); - final LogicalSlot slot = new TestingLogicalSlot(); + final LogicalSlot slot = new TestingLogicalSlotBuilder().createTestingLogicalSlot(); assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); @@ -179,7 +180,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger { final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout()); - final LogicalSlot slot = new TestingLogicalSlot(new SubmitFailingSimpleAckingTaskManagerGateway()); + final LogicalSlot slot = new TestingLogicalSlotBuilder().setTaskManagerGateway(new SubmitFailingSimpleAckingTaskManagerGateway()).createTestingLogicalSlot(); assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); @@ -206,7 +207,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger { final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout()); - final LogicalSlot slot = new TestingLogicalSlot(new SubmitFailingSimpleAckingTaskManagerGateway()); + final LogicalSlot slot = new TestingLogicalSlotBuilder().setTaskManagerGateway(new SubmitFailingSimpleAckingTaskManagerGateway()).createTestingLogicalSlot(); assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); @@ -244,7 +245,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger { final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], 
AkkaUtils.getDefaultTimeout()); - TestingLogicalSlot testingLogicalSlot = new TestingLogicalSlot(new SubmitBlockingSimpleAckingTaskManagerGateway()); + TestingLogicalSlot testingLogicalSlot = new TestingLogicalSlotBuilder().setTaskManagerGateway(new SubmitBlockingSimpleAckingTaskManagerGateway()).createTestingLogicalSlot(); assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); vertex.deployToSlot(testingLogicalSlot); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java index f7a2600..cf55f70 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java @@ -23,7 +23,7 @@ import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint; import org.apache.flink.runtime.jobmaster.LogicalSlot; -import org.apache.flink.runtime.jobmaster.TestingLogicalSlot; +import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder; import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -46,7 +46,7 @@ public class ExecutionVertexSchedulingTest extends TestLogger { AkkaUtils.getDefaultTimeout()); // a slot than cannot be deployed to - final LogicalSlot slot = new TestingLogicalSlot(); + final LogicalSlot slot = new TestingLogicalSlotBuilder().createTestingLogicalSlot(); slot.releaseSlot(new Exception("Test Exception")); assertFalse(slot.isAlive()); @@ -78,7 +78,7 @@ public class ExecutionVertexSchedulingTest extends TestLogger { AkkaUtils.getDefaultTimeout()); // a slot than cannot be deployed to - final LogicalSlot slot = new TestingLogicalSlot(); + final LogicalSlot slot = new 
TestingLogicalSlotBuilder().createTestingLogicalSlot(); slot.releaseSlot(new Exception("Test Exception")); assertFalse(slot.isAlive()); @@ -113,7 +113,7 @@ public class ExecutionVertexSchedulingTest extends TestLogger { final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout()); - final LogicalSlot slot = new TestingLogicalSlot(); + final LogicalSlot slot = new TestingLogicalSlotBuilder().createTestingLogicalSlot(); CompletableFuture<LogicalSlot> future = CompletableFuture.completedFuture(slot); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java index 76f7daa..15ec343 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.jobmaster.SlotContext; import org.apache.flink.runtime.jobmaster.SlotOwner; import org.apache.flink.runtime.jobmaster.SlotRequestId; import org.apache.flink.runtime.jobmaster.TestingLogicalSlot; +import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.FlinkRuntimeException; @@ -99,15 +100,14 @@ public class SimpleSlotProvider implements SlotProvider, SlotOwner { slot = slots.removeFirst(); } if (slot != null) { - TestingLogicalSlot result = new TestingLogicalSlot( - slot.getTaskManagerLocation(), - slot.getTaskManagerGateway(), - slot.getPhysicalSlotNumber(), - slot.getAllocationId(), - slotRequestId, - new SlotSharingGroupId(), - null, - this); + TestingLogicalSlot result = new TestingLogicalSlotBuilder() + 
.setTaskManagerLocation(slot.getTaskManagerLocation()) + .setTaskManagerGateway(slot.getTaskManagerGateway()) + .setSlotNumber(slot.getPhysicalSlotNumber()) + .setAllocationId(slot.getAllocationId()) + .setSlotRequestId(slotRequestId) + .setSlotOwner(this) + .createTestingLogicalSlot(); allocatedSlots.put(slotRequestId, slot); return CompletableFuture.completedFuture(result); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java index 5060478..3304c96 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java @@ -19,11 +19,9 @@ package org.apache.flink.runtime.jobmaster; import org.apache.flink.runtime.clusterframework.types.AllocationID; -import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; import org.apache.flink.runtime.instance.SlotSharingGroupId; import org.apache.flink.runtime.jobmanager.scheduler.Locality; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; -import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.Preconditions; @@ -47,10 +45,8 @@ public class TestingLogicalSlot implements LogicalSlot { private final CompletableFuture<?> releaseFuture; - @Nullable - private final CompletableFuture<?> customReleaseFuture; + private final boolean automaticallyCompleteReleaseFuture; - @Nullable private final SlotOwner slotOwner; private final AllocationID allocationId; @@ -59,49 +55,15 @@ public class TestingLogicalSlot implements LogicalSlot { private final SlotSharingGroupId slotSharingGroupId; - public TestingLogicalSlot() { - this(new SimpleAckingTaskManagerGateway()); - } - - public TestingLogicalSlot(TaskManagerGateway 
taskManagerGateway) { - this( - new LocalTaskManagerLocation(), - taskManagerGateway, - 0, - new AllocationID(), - new SlotRequestId(), - new SlotSharingGroupId(), - null); - } - - public TestingLogicalSlot( + TestingLogicalSlot( TaskManagerLocation taskManagerLocation, TaskManagerGateway taskManagerGateway, int slotNumber, AllocationID allocationId, SlotRequestId slotRequestId, SlotSharingGroupId slotSharingGroupId, - @Nullable CompletableFuture<?> customReleaseFuture) { - this( - taskManagerLocation, - taskManagerGateway, - slotNumber, - allocationId, - slotRequestId, - slotSharingGroupId, - customReleaseFuture, - null); - } - - public TestingLogicalSlot( - TaskManagerLocation taskManagerLocation, - TaskManagerGateway taskManagerGateway, - int slotNumber, - AllocationID allocationId, - SlotRequestId slotRequestId, - SlotSharingGroupId slotSharingGroupId, - @Nullable CompletableFuture<?> customReleaseFuture, - @Nullable SlotOwner slotOwner) { + boolean automaticallyCompleteReleaseFuture, + SlotOwner slotOwner) { this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation); this.taskManagerGateway = Preconditions.checkNotNull(taskManagerGateway); @@ -111,7 +73,7 @@ public class TestingLogicalSlot implements LogicalSlot { this.slotRequestId = Preconditions.checkNotNull(slotRequestId); this.slotSharingGroupId = Preconditions.checkNotNull(slotSharingGroupId); this.releaseFuture = new CompletableFuture<>(); - this.customReleaseFuture = customReleaseFuture; + this.automaticallyCompleteReleaseFuture = automaticallyCompleteReleaseFuture; this.slotOwner = slotOwner; } @@ -132,11 +94,7 @@ public class TestingLogicalSlot implements LogicalSlot { @Override public boolean isAlive() { - if (customReleaseFuture != null) { - return !customReleaseFuture.isDone(); - } else { - return !releaseFuture.isDone(); - } + return !releaseFuture.isDone(); } @Override @@ -152,16 +110,13 @@ public class TestingLogicalSlot implements LogicalSlot { @Override public 
CompletableFuture<?> releaseSlot(@Nullable Throwable cause) { - if (slotOwner != null) { - slotOwner.returnLogicalSlot(this); - } + slotOwner.returnLogicalSlot(this); - if (customReleaseFuture != null) { - return customReleaseFuture; - } else { + if (automaticallyCompleteReleaseFuture) { releaseFuture.complete(null); - return releaseFuture; } + + return releaseFuture; } @Override @@ -184,4 +139,8 @@ public class TestingLogicalSlot implements LogicalSlot { public SlotSharingGroupId getSlotSharingGroupId() { return slotSharingGroupId; } + + public CompletableFuture<?> getReleaseFuture() { + return releaseFuture; + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlotBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlotBuilder.java new file mode 100644 index 0000000..f703c91 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlotBuilder.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; +import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.jobmanager.slots.DummySlotOwner; +import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; +import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; + +/** + * Builder for the {@link TestingLogicalSlot}. + * + * <p>Every field starts with a default that is created when the builder is instantiated, so + * {@link #createTestingLogicalSlot()} can be called without invoking any setter. All setters + * return this builder to allow call chaining. + * + * <p>The default ids, gateway, location and owner are instantiated once per builder, not once + * per created slot: calling {@link #createTestingLogicalSlot()} repeatedly on the same builder + * passes the same instances to every slot unless a setter is called in between. + */ +public class TestingLogicalSlotBuilder { + /* Defaults used for every value that is not overridden through a setter. */ private TaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway(); + private TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); + private int slotNumber = 0; + private AllocationID allocationId = new AllocationID(); + private SlotRequestId slotRequestId = new SlotRequestId(); + private SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId(); + private SlotOwner slotOwner = new DummySlotOwner(); + private boolean automaticallyCompleteReleaseFuture = true; + + /** Replaces the default {@link SimpleAckingTaskManagerGateway} with the given gateway and returns this builder. */ public TestingLogicalSlotBuilder setTaskManagerGateway(TaskManagerGateway taskManagerGateway) { + this.taskManagerGateway = taskManagerGateway; + return this; + } + + /** Replaces the default {@link LocalTaskManagerLocation} with the given location and returns this builder. */ public TestingLogicalSlotBuilder setTaskManagerLocation(TaskManagerLocation taskManagerLocation) { + this.taskManagerLocation = taskManagerLocation; + return this; + } + + /** Sets the slot number passed to the created slot (default 0) and returns this builder. */ public TestingLogicalSlotBuilder setSlotNumber(int slotNumber) { + this.slotNumber = slotNumber; + return this; + } + + /** Replaces the {@link AllocationID} generated at builder construction and returns this builder. */ public TestingLogicalSlotBuilder setAllocationId(AllocationID allocationId) { + this.allocationId = allocationId; + return this; + } + + /** Replaces the {@link SlotRequestId} generated at builder construction and returns this builder. */ public TestingLogicalSlotBuilder setSlotRequestId(SlotRequestId slotRequestId) { + this.slotRequestId = slotRequestId; + return this; + } + + /** Replaces the {@link SlotSharingGroupId} generated at builder construction and returns this builder. */ public TestingLogicalSlotBuilder setSlotSharingGroupId(SlotSharingGroupId slotSharingGroupId) { +
this.slotSharingGroupId = slotSharingGroupId; + return this; + } + + /** Sets the flag forwarded to the {@link TestingLogicalSlot} constructor (default true) and returns this builder. NOTE(review): the effect of the flag is defined by TestingLogicalSlot, which is not visible here; confirm there. */ public TestingLogicalSlotBuilder setAutomaticallyCompleteReleaseFuture(boolean automaticallyCompleteReleaseFuture) { + this.automaticallyCompleteReleaseFuture = automaticallyCompleteReleaseFuture; + return this; + } + + /** Replaces the default {@link DummySlotOwner} with the given owner and returns this builder. */ public TestingLogicalSlotBuilder setSlotOwner(SlotOwner slotOwner) { + this.slotOwner = slotOwner; + return this; + } + + /** Creates a new {@link TestingLogicalSlot} from the values currently held by this builder. */ public TestingLogicalSlot createTestingLogicalSlot() { + return new TestingLogicalSlot( + taskManagerLocation, + taskManagerGateway, + slotNumber, + allocationId, + slotRequestId, + slotSharingGroupId, + automaticallyCompleteReleaseFuture, + slotOwner); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java index 4c0dd01..4088f33 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java @@ -30,7 +30,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.jobmaster.SlotRequestId; -import org.apache.flink.runtime.jobmaster.TestingLogicalSlot; +import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; @@ -303,7 +303,7 @@ public class DefaultExecutionSlotAllocatorTest extends TestLogger { if (slotAllocationDisabled) { return new CompletableFuture<>(); } else { - return CompletableFuture.completedFuture(new TestingLogicalSlot()); + return 
CompletableFuture.completedFuture(new TestingLogicalSlotBuilder().createTestingLogicalSlot()); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExecutionGraphToInputsLocationsRetrieverAdapterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExecutionGraphToInputsLocationsRetrieverAdapterTest.java index a92ff2c..d9eae6b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExecutionGraphToInputsLocationsRetrieverAdapterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExecutionGraphToInputsLocationsRetrieverAdapterTest.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmaster.TestingLogicalSlot; +import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.TestLogger; @@ -110,7 +111,7 @@ public class ExecutionGraphToInputsLocationsRetrieverAdapterTest extends TestLog public void testGetTaskManagerLocationWhenScheduled() throws Exception { final JobVertex jobVertex = ExecutionGraphTestUtils.createNoOpVertex(1); - final TestingLogicalSlot testingLogicalSlot = new TestingLogicalSlot(); + final TestingLogicalSlot testingLogicalSlot = new TestingLogicalSlotBuilder().createTestingLogicalSlot(); final ExecutionGraph eg = ExecutionGraphTestUtils.createSimpleTestGraph(new JobID(), jobVertex); final ExecutionGraphToInputsLocationsRetrieverAdapter inputsLocationsRetriever = new ExecutionGraphToInputsLocationsRetrieverAdapter(eg);