This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit fdc32cd732fb356494879f60a9227c230ccbc01a Author: Till Rohrmann <trohrm...@apache.org> AuthorDate: Wed Apr 24 17:22:48 2019 +0200 [FLINK-12322] Remove legacy ActorTaskManagerGateway Remove ActorTaskManagerGateway from InstanceManagerTest Remove ActorTaskManagerGateway from ExecutionGraphRestartTest Remove ActorTaskManagerGateway from ExecutionVertexCancelTest Remove ActorTaskManagerGateway from InstanceTest Remove ActorTaskManagerGateway from SimpleSlotTest Remove ActorTaskManagerGateway This closes #8255. --- .../jobmanager/slots/ActorTaskManagerGateway.java | 192 --------------------- .../executiongraph/ExecutionGraphRestartTest.java | 20 +-- .../executiongraph/ExecutionVertexCancelTest.java | 8 +- .../runtime/instance/InstanceManagerTest.java | 60 ++----- .../flink/runtime/instance/InstanceTest.java | 8 +- .../flink/runtime/instance/SimpleSlotTest.java | 4 +- 6 files changed, 28 insertions(+), 264 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java deleted file mode 100644 index 005ee04..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java +++ /dev/null @@ -1,192 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.jobmanager.slots; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.blob.TransientBlobKey; -import org.apache.flink.runtime.checkpoint.CheckpointOptions; -import org.apache.flink.runtime.clusterframework.types.AllocationID; -import org.apache.flink.runtime.concurrent.FutureUtils; -import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.executiongraph.PartitionInfo; -import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.messages.StackTraceSampleMessages; -import org.apache.flink.runtime.messages.StackTraceSampleResponse; -import org.apache.flink.runtime.messages.TaskManagerMessages; -import org.apache.flink.runtime.messages.TaskMessages; -import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete; -import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint; -import org.apache.flink.util.Preconditions; - -import java.util.concurrent.CompletableFuture; - -import scala.concurrent.duration.FiniteDuration; -import scala.reflect.ClassTag$; - -/** - * Implementation of the {@link TaskManagerGateway} for {@link ActorGateway}. - */ -public class ActorTaskManagerGateway implements TaskManagerGateway { - private final ActorGateway actorGateway; - - public ActorTaskManagerGateway(ActorGateway actorGateway) { - this.actorGateway = Preconditions.checkNotNull(actorGateway); - } - - public ActorGateway getActorGateway() { - return actorGateway; - } - - //------------------------------------------------------------------------------- - // Task manager rpc methods - //------------------------------------------------------------------------------- - - @Override - public String getAddress() { - return actorGateway.path(); - } - - @Override - public CompletableFuture<StackTraceSampleResponse> requestStackTraceSample( - ExecutionAttemptID executionAttemptID, - int sampleId, - int numSamples, - Time delayBetweenSamples, - int maxStackTraceDepth, - Time timeout) { - Preconditions.checkNotNull(executionAttemptID); - Preconditions.checkArgument(numSamples > 0, "The number of samples must be greater than 0."); - Preconditions.checkNotNull(delayBetweenSamples); - Preconditions.checkArgument(maxStackTraceDepth >= 0, "The max stack trace depth must be greater or equal than 0."); - Preconditions.checkNotNull(timeout); - - scala.concurrent.Future<StackTraceSampleResponse> stackTraceSampleResponseFuture = actorGateway.ask( - new StackTraceSampleMessages.TriggerStackTraceSample( - sampleId, - executionAttemptID, - numSamples, - delayBetweenSamples, - maxStackTraceDepth), - new FiniteDuration(timeout.getSize(), timeout.getUnit())) - .mapTo(ClassTag$.MODULE$.<StackTraceSampleResponse>apply(StackTraceSampleResponse.class)); - - return FutureUtils.toJava(stackTraceSampleResponseFuture); - } - - @Override - public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, Time timeout) { - Preconditions.checkNotNull(tdd); - Preconditions.checkNotNull(timeout); - - scala.concurrent.Future<Acknowledge> submitResult = actorGateway.ask( - new TaskMessages.SubmitTask(tdd), - new FiniteDuration(timeout.getSize(), timeout.getUnit())) - .mapTo(ClassTag$.MODULE$.<Acknowledge>apply(Acknowledge.class)); - - return FutureUtils.toJava(submitResult); - } - - @Override - public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) { - Preconditions.checkNotNull(executionAttemptID); - Preconditions.checkNotNull(timeout); - - scala.concurrent.Future<Acknowledge> cancelResult = actorGateway.ask( - new TaskMessages.CancelTask(executionAttemptID), - new FiniteDuration(timeout.getSize(), timeout.getUnit())) - .mapTo(ClassTag$.MODULE$.<Acknowledge>apply(Acknowledge.class)); - - return FutureUtils.toJava(cancelResult); - } - - @Override - public CompletableFuture<Acknowledge> updatePartitions(ExecutionAttemptID executionAttemptID, Iterable<PartitionInfo> partitionInfos, Time timeout) { - Preconditions.checkNotNull(executionAttemptID); - Preconditions.checkNotNull(partitionInfos); - - TaskMessages.UpdatePartitionInfo updatePartitionInfoMessage = new TaskMessages.UpdateTaskMultiplePartitionInfos( - executionAttemptID, - partitionInfos); - - scala.concurrent.Future<Acknowledge> updatePartitionsResult = actorGateway.ask( - updatePartitionInfoMessage, - new FiniteDuration(timeout.getSize(), timeout.getUnit())) - .mapTo(ClassTag$.MODULE$.<Acknowledge>apply(Acknowledge.class)); - - return FutureUtils.toJava(updatePartitionsResult); - } - - @Override - public void failPartition(ExecutionAttemptID executionAttemptID) { - Preconditions.checkNotNull(executionAttemptID); - - actorGateway.tell(new TaskMessages.FailIntermediateResultPartitions(executionAttemptID)); - } - - @Override - public void notifyCheckpointComplete( - ExecutionAttemptID executionAttemptID, - JobID jobId, - long checkpointId, - long timestamp) { - - Preconditions.checkNotNull(executionAttemptID); - Preconditions.checkNotNull(jobId); - - actorGateway.tell(new NotifyCheckpointComplete(jobId, executionAttemptID, checkpointId, timestamp)); - } - - @Override - public void triggerCheckpoint( - ExecutionAttemptID executionAttemptID, - JobID jobId, - long checkpointId, - long timestamp, - CheckpointOptions checkpointOptions, - boolean advanceToEndOfEventTime) { - - // we ignore the `advanceToEndOfEventTime` because this is dead code. - - Preconditions.checkNotNull(executionAttemptID); - Preconditions.checkNotNull(jobId); - - actorGateway.tell(new TriggerCheckpoint(jobId, executionAttemptID, checkpointId, timestamp, checkpointOptions)); - } - - @Override - public CompletableFuture<Acknowledge> freeSlot(AllocationID allocationId, Throwable cause, Time timeout) { - throw new UnsupportedOperationException("The old TaskManager does not support freeing slots"); - } - - private CompletableFuture<TransientBlobKey> requestTaskManagerLog(TaskManagerMessages.RequestTaskManagerLog request, Time timeout) { - Preconditions.checkNotNull(request); - Preconditions.checkNotNull(timeout); - - scala.concurrent.Future<TransientBlobKey> blobKeyFuture = actorGateway - .ask( - request, - new FiniteDuration(timeout.getSize(), timeout.getUnit())) - .mapTo(ClassTag$.MODULE$.<TransientBlobKey>apply(TransientBlobKey.class)); - - return FutureUtils.toJava(blobKeyFuture); - } -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java index d03e2fe..1c5b650 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java @@ -47,7 +47,6 @@ import org.apache.flink.runtime.jobgraph.ScheduleMode; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; -import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; @@ -76,7 +75,6 @@ import java.util.function.Consumer; import scala.concurrent.duration.FiniteDuration; -import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.completeCancellingForAllVertices; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createSimpleTestGraph; @@ -176,8 +174,7 @@ public class ExecutionGraphRestartTest extends TestLogger { Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); Instance instance = ExecutionGraphTestUtils.getInstance( - new ActorTaskManagerGateway( - new SimpleActorGateway(TestingUtils.directExecutionContext())), + new SimpleAckingTaskManagerGateway(), NUM_TASKS); scheduler.newInstanceAvailable(instance); @@ -317,8 +314,7 @@ public class ExecutionGraphRestartTest extends TestLogger { @Test public void testFailingExecutionAfterRestart() throws Exception { Instance instance = ExecutionGraphTestUtils.getInstance( - new ActorTaskManagerGateway( - new SimpleActorGateway(TestingUtils.directExecutionContext())), + new SimpleAckingTaskManagerGateway(), 2); Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); @@ -377,8 +373,7 @@ public class ExecutionGraphRestartTest extends TestLogger { @Test public void testFailExecutionAfterCancel() throws Exception { Instance instance = ExecutionGraphTestUtils.getInstance( - new ActorTaskManagerGateway( - new SimpleActorGateway(TestingUtils.directExecutionContext())), + new SimpleAckingTaskManagerGateway(), 2); Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); @@ -423,8 +418,7 @@ public class ExecutionGraphRestartTest extends TestLogger { @Test public void testFailExecutionGraphAfterCancel() throws Exception { Instance instance = ExecutionGraphTestUtils.getInstance( - new ActorTaskManagerGateway( - new SimpleActorGateway(TestingUtils.directExecutionContext())), + new SimpleAckingTaskManagerGateway(), 2); Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); @@ -467,8 +461,7 @@ public class ExecutionGraphRestartTest extends TestLogger { public void testSuspendWhileRestarting() throws Exception { Instance instance = ExecutionGraphTestUtils.getInstance( - new ActorTaskManagerGateway( - new SimpleActorGateway(TestingUtils.directExecutionContext())), + new SimpleAckingTaskManagerGateway(), NUM_TASKS); Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); @@ -763,8 +756,7 @@ public class ExecutionGraphRestartTest extends TestLogger { private Tuple2<ExecutionGraph, Instance> createExecutionGraph(RestartStrategy restartStrategy) throws Exception { Instance instance = ExecutionGraphTestUtils.getInstance( - new ActorTaskManagerGateway( - new SimpleActorGateway(TestingUtils.directExecutionContext())), + new SimpleAckingTaskManagerGateway(), NUM_TASKS); Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java index 3426901..8f961fc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java @@ -23,13 +23,11 @@ import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; -import org.apache.flink.runtime.instance.DummyActorGateway; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint; -import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway; import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.jobmaster.TestingLogicalSlot; import org.apache.flink.runtime.messages.Acknowledge; @@ -304,7 +302,7 @@ public class ExecutionVertexCancelTest extends TestLogger { // deploying after canceling from CREATED needs to raise an exception, because // the scheduler (or any caller) needs to know that the slot should be released try { - Instance instance = getInstance(new ActorTaskManagerGateway(DummyActorGateway.INSTANCE)); + Instance instance = getInstance(new SimpleAckingTaskManagerGateway()); SimpleSlot slot = instance.allocateSimpleSlot(); vertex.deployToSlot(slot); @@ -344,7 +342,7 @@ public class ExecutionVertexCancelTest extends TestLogger { AkkaUtils.getDefaultTimeout()); setVertexState(vertex, ExecutionState.CANCELING); - Instance instance = getInstance(new ActorTaskManagerGateway(DummyActorGateway.INSTANCE)); + Instance instance = getInstance(new SimpleAckingTaskManagerGateway()); SimpleSlot slot = instance.allocateSimpleSlot(); vertex.deployToSlot(slot); @@ -360,7 +358,7 @@ public class ExecutionVertexCancelTest extends TestLogger { ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout()); - Instance instance = getInstance(new ActorTaskManagerGateway(DummyActorGateway.INSTANCE)); + Instance instance = getInstance(new SimpleAckingTaskManagerGateway()); SimpleSlot slot = instance.allocateSimpleSlot(); setVertexResource(vertex, slot); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java index 880be52..b5ece37 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java @@ -18,20 +18,13 @@ package org.apache.flink.runtime.instance; -import akka.actor.ActorSystem; -import akka.actor.RobustActorSystem; -import akka.testkit.JavaTestKit; - import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway; +import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; -import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testutils.CommonTestUtils; import org.apache.flink.util.TestLogger; -import org.junit.AfterClass; import org.junit.Assert; -import org.junit.BeforeClass; import org.junit.Test; import java.net.InetAddress; @@ -39,7 +32,6 @@ import java.util.Collection; import java.util.HashSet; import java.util.Iterator; import java.util.Set; -import java.util.UUID; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -51,21 +43,6 @@ import static org.junit.Assert.fail; */ public class InstanceManagerTest extends TestLogger { - static ActorSystem system; - - static UUID leaderSessionID = UUID.randomUUID(); - - @BeforeClass - public static void setup(){ - system = RobustActorSystem.create("TestingActorSystem", TestingUtils.testConfig()); - } - - @AfterClass - public static void teardown(){ - JavaTestKit.shutdownActorSystem(system); - system = null; - } - @Test public void testInstanceRegistering() { try { @@ -81,27 +58,23 @@ public class InstanceManagerTest extends TestLogger { ResourceID resID1 = ResourceID.generate(); ResourceID resID2 = ResourceID.generate(); ResourceID resID3 = ResourceID.generate(); - + TaskManagerLocation ici1 = new TaskManagerLocation(resID1, address, dataPort); TaskManagerLocation ici2 = new TaskManagerLocation(resID2, address, dataPort + 15); TaskManagerLocation ici3 = new TaskManagerLocation(resID3, address, dataPort + 30); - final JavaTestKit probe1 = new JavaTestKit(system); - final JavaTestKit probe2 = new JavaTestKit(system); - final JavaTestKit probe3 = new JavaTestKit(system); - cm.registerTaskManager( - new ActorTaskManagerGateway(new AkkaActorGateway(probe1.getRef(), leaderSessionID)), + new SimpleAckingTaskManagerGateway(), ici1, hardwareDescription, 1); cm.registerTaskManager( - new ActorTaskManagerGateway(new AkkaActorGateway(probe2.getRef(), leaderSessionID)), + new SimpleAckingTaskManagerGateway(), ici2, hardwareDescription, 2); cm.registerTaskManager( - new ActorTaskManagerGateway(new AkkaActorGateway(probe3.getRef(), leaderSessionID)), + new SimpleAckingTaskManagerGateway(), ici3, hardwareDescription, 5); @@ -113,7 +86,7 @@ public class InstanceManagerTest extends TestLogger { Set<TaskManagerLocation> taskManagerLocations = new HashSet<TaskManagerLocation>(); - for(Instance instance: instances){ + for (Instance instance: instances){ taskManagerLocations.add(instance.getTaskManagerLocation()); } @@ -138,15 +111,13 @@ public class InstanceManagerTest extends TestLogger { final int dataPort = 20000; ResourceID resID1 = ResourceID.generate(); - ResourceID resID2 = ResourceID.generate(); HardwareDescription resources = HardwareDescription.extractFromSystem(4096); InetAddress address = InetAddress.getByName("127.0.0.1"); TaskManagerLocation ici = new TaskManagerLocation(resID1, address, dataPort); - JavaTestKit probe = new JavaTestKit(system); cm.registerTaskManager( - new ActorTaskManagerGateway(new AkkaActorGateway(probe.getRef(), leaderSessionID)), + new SimpleAckingTaskManagerGateway(), ici, resources, 1); @@ -156,7 +127,7 @@ public class InstanceManagerTest extends TestLogger { try { cm.registerTaskManager( - new ActorTaskManagerGateway(new AkkaActorGateway(probe.getRef(), leaderSessionID)), + new SimpleAckingTaskManagerGateway(), ici, resources, 1); @@ -197,22 +168,18 @@ public class InstanceManagerTest extends TestLogger { TaskManagerLocation ici2 = new TaskManagerLocation(resID2, address, dataPort + 1); TaskManagerLocation ici3 = new TaskManagerLocation(resID3, address, dataPort + 2); - JavaTestKit probe1 = new JavaTestKit(system); - JavaTestKit probe2 = new JavaTestKit(system); - JavaTestKit probe3 = new JavaTestKit(system); - InstanceID instanceID1 = cm.registerTaskManager( - new ActorTaskManagerGateway(new AkkaActorGateway(probe1.getRef(), leaderSessionID)), + new SimpleAckingTaskManagerGateway(), ici1, hardwareDescription, 1); InstanceID instanceID2 = cm.registerTaskManager( - new ActorTaskManagerGateway(new AkkaActorGateway(probe2.getRef(), leaderSessionID)), + new SimpleAckingTaskManagerGateway(), ici2, hardwareDescription, 1); InstanceID instanceID3 = cm.registerTaskManager( - new ActorTaskManagerGateway(new AkkaActorGateway(probe3.getRef(), leaderSessionID)), + new SimpleAckingTaskManagerGateway(), ici3, hardwareDescription, 1); @@ -268,9 +235,8 @@ public class InstanceManagerTest extends TestLogger { InetAddress address = InetAddress.getByName("127.0.0.1"); TaskManagerLocation ici = new TaskManagerLocation(resID, address, 20000); - JavaTestKit probe = new JavaTestKit(system); cm.registerTaskManager( - new ActorTaskManagerGateway(new AkkaActorGateway(probe.getRef(), leaderSessionID)), + new SimpleAckingTaskManagerGateway(), ici, resources, 1); @@ -279,7 +245,7 @@ public class InstanceManagerTest extends TestLogger { catch (IllegalStateException e) { // expected } - + assertFalse(cm.reportHeartBeat(new InstanceID())); } catch (Exception e) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java index 0c2b7c1..097cab5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.instance; import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway; +import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.junit.Test; @@ -47,7 +47,7 @@ public class InstanceTest { TaskManagerLocation connection = new TaskManagerLocation(resourceID, address, 10001); Instance instance = new Instance( - new ActorTaskManagerGateway(DummyActorGateway.INSTANCE), + new SimpleAckingTaskManagerGateway(), connection, new InstanceID(), hardwareDescription, @@ -114,7 +114,7 @@ public class InstanceTest { TaskManagerLocation connection = new TaskManagerLocation(resourceID, address, 10001); Instance instance = new Instance( - new ActorTaskManagerGateway(DummyActorGateway.INSTANCE), + new SimpleAckingTaskManagerGateway(), connection, new InstanceID(), hardwareDescription, @@ -150,7 +150,7 @@ public class InstanceTest { TaskManagerLocation connection = new TaskManagerLocation(resourceID, address, 10001); Instance instance = new Instance( - new ActorTaskManagerGateway(DummyActorGateway.INSTANCE), + new SimpleAckingTaskManagerGateway(), connection, new InstanceID(), hardwareDescription, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java index de2ae41..89fa90a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.instance; import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway; +import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; import org.apache.flink.runtime.jobmaster.TestingPayload; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.TestLogger; @@ -131,7 +131,7 @@ public class SimpleSlotTest extends TestLogger { TaskManagerLocation connection = new TaskManagerLocation(resourceID, address, 10001); Instance instance = new Instance( - new ActorTaskManagerGateway(DummyActorGateway.INSTANCE), + new SimpleAckingTaskManagerGateway(), connection, new InstanceID(), hardwareDescription,