This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fdc32cd732fb356494879f60a9227c230ccbc01a
Author: Till Rohrmann <trohrm...@apache.org>
AuthorDate: Wed Apr 24 17:22:48 2019 +0200

    [FLINK-12322] Remove legacy ActorTaskManagerGateway
    
    Remove ActorTaskManagerGateway from InstanceManagerTest
    
    Remove ActorTaskManagerGateway from ExecutionGraphRestartTest
    
    Remove ActorTaskManagerGateway from ExecutionVertexCancelTest
    
    Remove ActorTaskManagerGateway from InstanceTest
    
    Remove ActorTaskManagerGateway from SimpleSlotTest
    
    Remove ActorTaskManagerGateway
    
    This closes #8255.
---
 .../jobmanager/slots/ActorTaskManagerGateway.java  | 192 ---------------------
 .../executiongraph/ExecutionGraphRestartTest.java  |  20 +--
 .../executiongraph/ExecutionVertexCancelTest.java  |   8 +-
 .../runtime/instance/InstanceManagerTest.java      |  60 ++-----
 .../flink/runtime/instance/InstanceTest.java       |   8 +-
 .../flink/runtime/instance/SimpleSlotTest.java     |   4 +-
 6 files changed, 28 insertions(+), 264 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
deleted file mode 100644
index 005ee04..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.slots;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.blob.TransientBlobKey;
-import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.PartitionInfo;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.messages.StackTraceSampleMessages;
-import org.apache.flink.runtime.messages.StackTraceSampleResponse;
-import org.apache.flink.runtime.messages.TaskManagerMessages;
-import org.apache.flink.runtime.messages.TaskMessages;
-import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
-import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
-import org.apache.flink.util.Preconditions;
-
-import java.util.concurrent.CompletableFuture;
-
-import scala.concurrent.duration.FiniteDuration;
-import scala.reflect.ClassTag$;
-
-/**
- * Implementation of the {@link TaskManagerGateway} for {@link ActorGateway}.
- */
-public class ActorTaskManagerGateway implements TaskManagerGateway {
-       private final ActorGateway actorGateway;
-
-       public ActorTaskManagerGateway(ActorGateway actorGateway) {
-               this.actorGateway = Preconditions.checkNotNull(actorGateway);
-       }
-
-       public ActorGateway getActorGateway() {
-               return actorGateway;
-       }
-
-       //-------------------------------------------------------------------------------
-       // Task manager rpc methods
-       //-------------------------------------------------------------------------------
-
-       @Override
-       public String getAddress() {
-               return actorGateway.path();
-       }
-
-       @Override
-       public CompletableFuture<StackTraceSampleResponse> 
requestStackTraceSample(
-                       ExecutionAttemptID executionAttemptID,
-                       int sampleId,
-                       int numSamples,
-                       Time delayBetweenSamples,
-                       int maxStackTraceDepth,
-                       Time timeout) {
-               Preconditions.checkNotNull(executionAttemptID);
-               Preconditions.checkArgument(numSamples > 0, "The number of samples must be greater than 0.");
-               Preconditions.checkNotNull(delayBetweenSamples);
-               Preconditions.checkArgument(maxStackTraceDepth >= 0, "The max stack trace depth must be greater or equal than 0.");
-               Preconditions.checkNotNull(timeout);
-
-               scala.concurrent.Future<StackTraceSampleResponse> 
stackTraceSampleResponseFuture = actorGateway.ask(
-                       new StackTraceSampleMessages.TriggerStackTraceSample(
-                               sampleId,
-                               executionAttemptID,
-                               numSamples,
-                               delayBetweenSamples,
-                               maxStackTraceDepth),
-                       new FiniteDuration(timeout.getSize(), 
timeout.getUnit()))
-                       
.mapTo(ClassTag$.MODULE$.<StackTraceSampleResponse>apply(StackTraceSampleResponse.class));
-
-               return FutureUtils.toJava(stackTraceSampleResponseFuture);
-       }
-
-       @Override
-       public CompletableFuture<Acknowledge> 
submitTask(TaskDeploymentDescriptor tdd, Time timeout) {
-               Preconditions.checkNotNull(tdd);
-               Preconditions.checkNotNull(timeout);
-
-               scala.concurrent.Future<Acknowledge> submitResult = 
actorGateway.ask(
-                       new TaskMessages.SubmitTask(tdd),
-                       new FiniteDuration(timeout.getSize(), 
timeout.getUnit()))
-                       
.mapTo(ClassTag$.MODULE$.<Acknowledge>apply(Acknowledge.class));
-
-               return FutureUtils.toJava(submitResult);
-       }
-
-       @Override
-       public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID 
executionAttemptID, Time timeout) {
-               Preconditions.checkNotNull(executionAttemptID);
-               Preconditions.checkNotNull(timeout);
-
-               scala.concurrent.Future<Acknowledge> cancelResult = 
actorGateway.ask(
-                       new TaskMessages.CancelTask(executionAttemptID),
-                       new FiniteDuration(timeout.getSize(), 
timeout.getUnit()))
-                       
.mapTo(ClassTag$.MODULE$.<Acknowledge>apply(Acknowledge.class));
-
-               return FutureUtils.toJava(cancelResult);
-       }
-
-       @Override
-       public CompletableFuture<Acknowledge> 
updatePartitions(ExecutionAttemptID executionAttemptID, Iterable<PartitionInfo> 
partitionInfos, Time timeout) {
-               Preconditions.checkNotNull(executionAttemptID);
-               Preconditions.checkNotNull(partitionInfos);
-
-               TaskMessages.UpdatePartitionInfo updatePartitionInfoMessage = 
new TaskMessages.UpdateTaskMultiplePartitionInfos(
-                       executionAttemptID,
-                       partitionInfos);
-
-               scala.concurrent.Future<Acknowledge> updatePartitionsResult = 
actorGateway.ask(
-                       updatePartitionInfoMessage,
-                       new FiniteDuration(timeout.getSize(), 
timeout.getUnit()))
-                       
.mapTo(ClassTag$.MODULE$.<Acknowledge>apply(Acknowledge.class));
-
-               return FutureUtils.toJava(updatePartitionsResult);
-       }
-
-       @Override
-       public void failPartition(ExecutionAttemptID executionAttemptID) {
-               Preconditions.checkNotNull(executionAttemptID);
-
-               actorGateway.tell(new 
TaskMessages.FailIntermediateResultPartitions(executionAttemptID));
-       }
-
-       @Override
-       public void notifyCheckpointComplete(
-                       ExecutionAttemptID executionAttemptID,
-                       JobID jobId,
-                       long checkpointId,
-                       long timestamp) {
-
-               Preconditions.checkNotNull(executionAttemptID);
-               Preconditions.checkNotNull(jobId);
-
-               actorGateway.tell(new NotifyCheckpointComplete(jobId, 
executionAttemptID, checkpointId, timestamp));
-       }
-
-       @Override
-       public void triggerCheckpoint(
-                       ExecutionAttemptID executionAttemptID,
-                       JobID jobId,
-                       long checkpointId,
-                       long timestamp,
-                       CheckpointOptions checkpointOptions,
-                       boolean advanceToEndOfEventTime) {
-
-               // we ignore the `advanceToEndOfEventTime` because this is dead code.
-
-               Preconditions.checkNotNull(executionAttemptID);
-               Preconditions.checkNotNull(jobId);
-
-               actorGateway.tell(new TriggerCheckpoint(jobId, 
executionAttemptID, checkpointId, timestamp, checkpointOptions));
-       }
-
-       @Override
-       public CompletableFuture<Acknowledge> freeSlot(AllocationID 
allocationId, Throwable cause, Time timeout) {
-               throw new UnsupportedOperationException("The old TaskManager does not support freeing slots");
-       }
-
-       private CompletableFuture<TransientBlobKey> 
requestTaskManagerLog(TaskManagerMessages.RequestTaskManagerLog request, Time 
timeout) {
-               Preconditions.checkNotNull(request);
-               Preconditions.checkNotNull(timeout);
-
-               scala.concurrent.Future<TransientBlobKey> blobKeyFuture = 
actorGateway
-                       .ask(
-                               request,
-                               new FiniteDuration(timeout.getSize(), 
timeout.getUnit()))
-                       
.mapTo(ClassTag$.MODULE$.<TransientBlobKey>apply(TransientBlobKey.class));
-
-               return FutureUtils.toJava(blobKeyFuture);
-       }
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index d03e2fe..1c5b650 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -47,7 +47,6 @@ import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -76,7 +75,6 @@ import java.util.function.Consumer;
 
 import scala.concurrent.duration.FiniteDuration;
 
-import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway;
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.completeCancellingForAllVertices;
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex;
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createSimpleTestGraph;
@@ -176,8 +174,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
                Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
 
                Instance instance = ExecutionGraphTestUtils.getInstance(
-                       new ActorTaskManagerGateway(
-                               new 
SimpleActorGateway(TestingUtils.directExecutionContext())),
+                       new SimpleAckingTaskManagerGateway(),
                        NUM_TASKS);
 
                scheduler.newInstanceAvailable(instance);
@@ -317,8 +314,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
        @Test
        public void testFailingExecutionAfterRestart() throws Exception {
                Instance instance = ExecutionGraphTestUtils.getInstance(
-                       new ActorTaskManagerGateway(
-                               new 
SimpleActorGateway(TestingUtils.directExecutionContext())),
+                       new SimpleAckingTaskManagerGateway(),
                        2);
 
                Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
@@ -377,8 +373,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
        @Test
        public void testFailExecutionAfterCancel() throws Exception {
                Instance instance = ExecutionGraphTestUtils.getInstance(
-                       new ActorTaskManagerGateway(
-                               new 
SimpleActorGateway(TestingUtils.directExecutionContext())),
+                       new SimpleAckingTaskManagerGateway(),
                        2);
 
                Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
@@ -423,8 +418,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
        @Test
        public void testFailExecutionGraphAfterCancel() throws Exception {
                Instance instance = ExecutionGraphTestUtils.getInstance(
-                       new ActorTaskManagerGateway(
-                               new 
SimpleActorGateway(TestingUtils.directExecutionContext())),
+                       new SimpleAckingTaskManagerGateway(),
                        2);
 
                Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
@@ -467,8 +461,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
        public void testSuspendWhileRestarting() throws Exception {
 
                Instance instance = ExecutionGraphTestUtils.getInstance(
-                       new ActorTaskManagerGateway(
-                               new 
SimpleActorGateway(TestingUtils.directExecutionContext())),
+                       new SimpleAckingTaskManagerGateway(),
                        NUM_TASKS);
 
                Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
@@ -763,8 +756,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
        private Tuple2<ExecutionGraph, Instance> 
createExecutionGraph(RestartStrategy restartStrategy) throws Exception {
                Instance instance = ExecutionGraphTestUtils.getInstance(
-                       new ActorTaskManagerGateway(
-                               new 
SimpleActorGateway(TestingUtils.directExecutionContext())),
+                       new SimpleAckingTaskManagerGateway(),
                        NUM_TASKS);
 
                Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index 3426901..8f961fc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -23,13 +23,11 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
 import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
-import org.apache.flink.runtime.instance.DummyActorGateway;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import 
org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
-import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -304,7 +302,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
                        // deploying after canceling from CREATED needs to 
raise an exception, because
                        // the scheduler (or any caller) needs to know that the 
slot should be released
                        try {
-                               Instance instance = getInstance(new 
ActorTaskManagerGateway(DummyActorGateway.INSTANCE));
+                               Instance instance = getInstance(new 
SimpleAckingTaskManagerGateway());
                                SimpleSlot slot = instance.allocateSimpleSlot();
 
                                vertex.deployToSlot(slot);
@@ -344,7 +342,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
                                                AkkaUtils.getDefaultTimeout());
                                setVertexState(vertex, 
ExecutionState.CANCELING);
 
-                               Instance instance = getInstance(new 
ActorTaskManagerGateway(DummyActorGateway.INSTANCE));
+                               Instance instance = getInstance(new 
SimpleAckingTaskManagerGateway());
                                SimpleSlot slot = instance.allocateSimpleSlot();
 
                                vertex.deployToSlot(slot);
@@ -360,7 +358,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
                                ExecutionVertex vertex = new 
ExecutionVertex(ejv, 0, new IntermediateResult[0],
                                                AkkaUtils.getDefaultTimeout());
 
-                               Instance instance = getInstance(new 
ActorTaskManagerGateway(DummyActorGateway.INSTANCE));
+                               Instance instance = getInstance(new 
SimpleAckingTaskManagerGateway());
                                SimpleSlot slot = instance.allocateSimpleSlot();
 
                                setVertexResource(vertex, slot);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
index 880be52..b5ece37 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
@@ -18,20 +18,13 @@
 
 package org.apache.flink.runtime.instance;
 
-import akka.actor.ActorSystem;
-import akka.actor.RobustActorSystem;
-import akka.testkit.JavaTestKit;
-
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
+import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.AfterClass;
 import org.junit.Assert;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.net.InetAddress;
@@ -39,7 +32,6 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Set;
-import java.util.UUID;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -51,21 +43,6 @@ import static org.junit.Assert.fail;
  */
 public class InstanceManagerTest extends TestLogger {
 
-       static ActorSystem system;
-
-       static UUID leaderSessionID = UUID.randomUUID();
-
-       @BeforeClass
-       public static void setup(){
-               system = RobustActorSystem.create("TestingActorSystem", 
TestingUtils.testConfig());
-       }
-
-       @AfterClass
-       public static void teardown(){
-               JavaTestKit.shutdownActorSystem(system);
-               system = null;
-       }
-
        @Test
        public void testInstanceRegistering() {
                try {
@@ -81,27 +58,23 @@ public class InstanceManagerTest extends TestLogger {
                        ResourceID resID1 = ResourceID.generate();
                        ResourceID resID2 = ResourceID.generate();
                        ResourceID resID3 = ResourceID.generate();
-                       
+
                        TaskManagerLocation ici1 = new 
TaskManagerLocation(resID1, address, dataPort);
                        TaskManagerLocation ici2 = new 
TaskManagerLocation(resID2, address, dataPort + 15);
                        TaskManagerLocation ici3 = new 
TaskManagerLocation(resID3, address, dataPort + 30);
 
-                       final JavaTestKit probe1 = new JavaTestKit(system);
-                       final JavaTestKit probe2 = new JavaTestKit(system);
-                       final JavaTestKit probe3 = new JavaTestKit(system);
-
                        cm.registerTaskManager(
-                               new ActorTaskManagerGateway(new 
AkkaActorGateway(probe1.getRef(), leaderSessionID)),
+                               new SimpleAckingTaskManagerGateway(),
                                ici1,
                                hardwareDescription,
                                1);
                        cm.registerTaskManager(
-                               new ActorTaskManagerGateway(new 
AkkaActorGateway(probe2.getRef(), leaderSessionID)),
+                               new SimpleAckingTaskManagerGateway(),
                                ici2,
                                hardwareDescription,
                                2);
                        cm.registerTaskManager(
-                               new ActorTaskManagerGateway(new 
AkkaActorGateway(probe3.getRef(), leaderSessionID)),
+                               new SimpleAckingTaskManagerGateway(),
                                ici3,
                                hardwareDescription,
                                5);
@@ -113,7 +86,7 @@ public class InstanceManagerTest extends TestLogger {
                        Set<TaskManagerLocation> taskManagerLocations = new
                                        HashSet<TaskManagerLocation>();
 
-                       for(Instance instance: instances){
+                       for (Instance instance: instances){
                                
taskManagerLocations.add(instance.getTaskManagerLocation());
                        }
 
@@ -138,15 +111,13 @@ public class InstanceManagerTest extends TestLogger {
                        final int dataPort = 20000;
 
                        ResourceID resID1 = ResourceID.generate();
-                       ResourceID resID2 = ResourceID.generate();
 
                        HardwareDescription resources = 
HardwareDescription.extractFromSystem(4096);
                        InetAddress address = 
InetAddress.getByName("127.0.0.1");
                        TaskManagerLocation ici = new 
TaskManagerLocation(resID1, address, dataPort);
 
-                       JavaTestKit probe = new JavaTestKit(system);
                        cm.registerTaskManager(
-                               new ActorTaskManagerGateway(new 
AkkaActorGateway(probe.getRef(), leaderSessionID)),
+                               new SimpleAckingTaskManagerGateway(),
                                ici,
                                resources,
                                1);
@@ -156,7 +127,7 @@ public class InstanceManagerTest extends TestLogger {
 
                        try {
                                cm.registerTaskManager(
-                                       new ActorTaskManagerGateway(new 
AkkaActorGateway(probe.getRef(), leaderSessionID)),
+                                       new SimpleAckingTaskManagerGateway(),
                                        ici,
                                        resources,
                                        1);
@@ -197,22 +168,18 @@ public class InstanceManagerTest extends TestLogger {
                        TaskManagerLocation ici2 = new 
TaskManagerLocation(resID2, address, dataPort + 1);
                        TaskManagerLocation ici3 = new 
TaskManagerLocation(resID3, address, dataPort + 2);
 
-                       JavaTestKit probe1 = new JavaTestKit(system);
-                       JavaTestKit probe2 = new JavaTestKit(system);
-                       JavaTestKit probe3 = new JavaTestKit(system);
-
                        InstanceID instanceID1 = cm.registerTaskManager(
-                               new ActorTaskManagerGateway(new 
AkkaActorGateway(probe1.getRef(), leaderSessionID)),
+                               new SimpleAckingTaskManagerGateway(),
                                ici1,
                                hardwareDescription,
                                1);
                        InstanceID instanceID2 = cm.registerTaskManager(
-                               new ActorTaskManagerGateway(new 
AkkaActorGateway(probe2.getRef(), leaderSessionID)),
+                               new SimpleAckingTaskManagerGateway(),
                                ici2,
                                hardwareDescription,
                                1);
                        InstanceID instanceID3 = cm.registerTaskManager(
-                               new ActorTaskManagerGateway(new 
AkkaActorGateway(probe3.getRef(), leaderSessionID)),
+                               new SimpleAckingTaskManagerGateway(),
                                ici3,
                                hardwareDescription,
                                1);
@@ -268,9 +235,8 @@ public class InstanceManagerTest extends TestLogger {
                                InetAddress address = 
InetAddress.getByName("127.0.0.1");
                                TaskManagerLocation ici = new 
TaskManagerLocation(resID, address, 20000);
 
-                               JavaTestKit probe = new JavaTestKit(system);
                                cm.registerTaskManager(
-                                       new ActorTaskManagerGateway(new 
AkkaActorGateway(probe.getRef(), leaderSessionID)),
+                                       new SimpleAckingTaskManagerGateway(),
                                        ici,
                                        resources,
                                        1);
@@ -279,7 +245,7 @@ public class InstanceManagerTest extends TestLogger {
                        catch (IllegalStateException e) {
                                // expected
                        }
-                       
+
                        assertFalse(cm.reportHeartBeat(new InstanceID()));
                }
                catch (Exception e) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
index 0c2b7c1..097cab5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.instance;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
+import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
 import org.junit.Test;
@@ -47,7 +47,7 @@ public class InstanceTest {
                        TaskManagerLocation connection = new 
TaskManagerLocation(resourceID, address, 10001);
 
                        Instance instance = new Instance(
-                               new 
ActorTaskManagerGateway(DummyActorGateway.INSTANCE),
+                               new SimpleAckingTaskManagerGateway(),
                                connection,
                                new InstanceID(),
                                hardwareDescription,
@@ -114,7 +114,7 @@ public class InstanceTest {
                        TaskManagerLocation connection = new 
TaskManagerLocation(resourceID, address, 10001);
 
                        Instance instance = new Instance(
-                               new 
ActorTaskManagerGateway(DummyActorGateway.INSTANCE),
+                               new SimpleAckingTaskManagerGateway(),
                                connection,
                                new InstanceID(),
                                hardwareDescription,
@@ -150,7 +150,7 @@ public class InstanceTest {
                        TaskManagerLocation connection = new 
TaskManagerLocation(resourceID, address, 10001);
 
                        Instance instance = new Instance(
-                               new 
ActorTaskManagerGateway(DummyActorGateway.INSTANCE),
+                               new SimpleAckingTaskManagerGateway(),
                                connection,
                                new InstanceID(),
                                hardwareDescription,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
index de2ae41..89fa90a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.instance;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
+import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.TestingPayload;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.TestLogger;
@@ -131,7 +131,7 @@ public class SimpleSlotTest extends  TestLogger {
                TaskManagerLocation connection = new 
TaskManagerLocation(resourceID, address, 10001);
 
                Instance instance = new Instance(
-                       new ActorTaskManagerGateway(DummyActorGateway.INSTANCE),
+                       new SimpleAckingTaskManagerGateway(),
                        connection,
                        new InstanceID(),
                        hardwareDescription,

Reply via email to