This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0bfe4666c95b41f9f3aa25d4dff703e5e72aae1a
Author: Till Rohrmann <trohrm...@apache.org>
AuthorDate: Wed Apr 24 18:00:53 2019 +0200

    [FLINK-12324] Remove legacy ActorGateway interface
    
    This commit removes the legacy ActorGateway and all its implementations.
    
    This closes #8257.
---
 .../flink/runtime/instance/ActorGateway.java       | 107 -------------
 .../flink/runtime/instance/AkkaActorGateway.java   | 165 ---------------------
 .../executiongraph/ExecutionGraphTestUtils.java    |  30 ----
 .../runtime/instance/BaseTestingActorGateway.java  | 119 ---------------
 .../taskmanager/ForwardingActorGateway.java        |  48 ------
 .../flink/runtime/testingUtils/TestingUtils.scala  |  15 +-
 6 files changed, 3 insertions(+), 481 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/ActorGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/ActorGateway.java
deleted file mode 100644
index a82debb..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/ActorGateway.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import akka.actor.ActorRef;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.io.Serializable;
-import java.util.UUID;
-
-/**
- * Interface to abstract the communication with an actor.
- *
- * It allows to avoid direct interaction with an ActorRef.
- */
-public interface ActorGateway extends Serializable {
-
-       /**
-        * Sends a message asynchronously and returns its response. The 
response to the message is
-        * returned as a future.
-        *
-        * @param message Message to be sent
-        * @param timeout Timeout until the Future is completed with an 
AskTimeoutException
-        * @return Future which contains the response to the sent message
-        */
-       Future<Object> ask(Object message, FiniteDuration timeout);
-
-       /**
-        * Sends a message asynchronously without a result.
-        *
-        * @param message Message to be sent
-        */
-       void tell(Object message);
-
-       /**
-        * Sends a message asynchronously without a result with sender being 
the sender.
-        *
-        * @param message Message to be sent
-        * @param sender Sender of the message
-        */
-       void tell(Object message, ActorGateway sender);
-
-       /**
-        * Forwards a message. For the receiver of this message it looks as if 
sender has sent the
-        * message.
-        *
-        * @param message Message to be sent
-        * @param sender Sender of the forwarded message
-        */
-       void forward(Object message, ActorGateway sender);
-
-       /**
-        * Retries to send asynchronously a message up to numberRetries times. 
The response to this
-        * message is returned as a future. The message is re-sent if the 
number of retries is not yet
-        * exceeded and if an exception occurred while sending it.
-        *
-        * @param message Message to be sent
-        * @param numberRetries Number of times to retry sending the message
-        * @param timeout Timeout for each sending attempt
-        * @param executionContext ExecutionContext which is used to send the 
message multiple times
-        * @return Future of the response to the sent message
-        */
-       Future<Object> retry(
-                       Object message,
-                       int numberRetries,
-                       FiniteDuration timeout,
-                       ExecutionContext executionContext);
-
-       /**
-        * Returns the path of the remote instance.
-        *
-        * @return Path of the remote instance.
-        */
-       String path();
-
-       /**
-        * Returns the underlying actor with which is communicated
-        *
-        * @return ActorRef of the target actor
-        */
-       ActorRef actor();
-
-       /**
-        * Returns the leaderSessionID associated with the remote actor or null.
-        *
-        * @return Leader session ID if its associated with this gateway, 
otherwise null
-        */
-       UUID leaderSessionID();
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaActorGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaActorGateway.java
deleted file mode 100644
index 26b6176..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaActorGateway.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import akka.actor.ActorRef;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.messages.LeaderSessionMessageDecorator;
-import org.apache.flink.runtime.messages.MessageDecorator;
-import org.apache.flink.util.Preconditions;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.io.Serializable;
-import java.util.UUID;
-
-/**
- * Concrete {@link ActorGateway} implementation which uses Akka to communicate 
with remote actors.
- */
-public class AkkaActorGateway implements ActorGateway, Serializable {
-
-       private static final long serialVersionUID = 42L;
-
-       // ActorRef of the remote instance
-       private final ActorRef actor;
-
-       // Associated leader session ID, which is used for 
RequiresLeaderSessionID messages
-       private final UUID leaderSessionID;
-
-       // Decorator for messages
-       private final MessageDecorator decorator;
-
-       public AkkaActorGateway(ActorRef actor, UUID leaderSessionID) {
-               this.actor = Preconditions.checkNotNull(actor);
-               this.leaderSessionID = 
Preconditions.checkNotNull(leaderSessionID);
-               // we want to wrap RequiresLeaderSessionID messages in a 
LeaderSessionMessage
-               this.decorator = new 
LeaderSessionMessageDecorator(leaderSessionID);
-       }
-
-       /**
-        * Sends a message asynchronously and returns its response. The 
response to the message is
-        * returned as a future.
-        *
-        * @param message Message to be sent
-        * @param timeout Timeout until the Future is completed with an 
AskTimeoutException
-        * @return Future which contains the response to the sent message
-        */
-       @Override
-       public Future<Object> ask(Object message, FiniteDuration timeout) {
-               Object newMessage = decorator.decorate(message);
-               return Patterns.ask(actor, newMessage, new Timeout(timeout));
-       }
-
-       /**
-        * Sends a message asynchronously without a result.
-        *
-        * @param message Message to be sent
-        */
-       @Override
-       public void tell(Object message) {
-               Object newMessage = decorator.decorate(message);
-               actor.tell(newMessage, ActorRef.noSender());
-       }
-
-       /**
-        * Sends a message asynchronously without a result with sender being 
the sender.
-        *
-        * @param message Message to be sent
-        * @param sender Sender of the message
-        */
-       @Override
-       public void tell(Object message, ActorGateway sender) {
-               Object newMessage = decorator.decorate(message);
-               actor.tell(newMessage, sender.actor());
-       }
-
-       /**
-        * Forwards a message. For the receiver of this message it looks as if 
sender has sent the
-        * message.
-        *
-        * @param message Message to be sent
-        * @param sender Sender of the forwarded message
-        */
-       @Override
-       public void forward(Object message, ActorGateway sender) {
-               Object newMessage = decorator.decorate(message);
-               actor.tell(newMessage, sender.actor());
-       }
-
-       /**
-        * Retries to send asynchronously a message up to numberRetries times. 
The response to this
-        * message is returned as a future. The message is re-sent if the 
number of retries is not yet
-        * exceeded and if an exception occurred while sending it.
-        *
-        * @param message Message to be sent
-        * @param numberRetries Number of times to retry sending the message
-        * @param timeout Timeout for each sending attempt
-        * @param executionContext ExecutionContext which is used to send the 
message multiple times
-        * @return Future of the response to the sent message
-        */
-       @Override
-       public Future<Object> retry(
-                       Object message,
-                       int numberRetries,
-                       FiniteDuration timeout,
-                       ExecutionContext executionContext) {
-
-               Object newMessage = decorator.decorate(message);
-
-               return AkkaUtils.retry(
-                       actor,
-                       newMessage,
-                       numberRetries,
-                       executionContext,
-                       timeout);
-       }
-
-       /**
-        * Returns the ActorPath of the remote instance.
-        *
-        * @return ActorPath of the remote instance.
-        */
-       @Override
-       public String path() {
-               return actor.path().toString();
-       }
-
-       /**
-        * Returns {@link ActorRef} of the target actor
-        *
-        * @return ActorRef of the target actor
-        */
-       @Override
-       public ActorRef actor() {
-               return actor;
-       }
-
-       @Override
-       public UUID leaderSessionID() {
-               return leaderSessionID;
-       }
-
-       @Override
-       public String toString() {
-               return String.format("AkkaActorGateway(%s, %s)", actor.path(), 
leaderSessionID);
-       }
-}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 90fc6d2..99bb7b0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -35,7 +35,6 @@ import 
org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
-import org.apache.flink.runtime.instance.BaseTestingActorGateway;
 import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceID;
@@ -50,10 +49,6 @@ import 
org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotOwner;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
-import 
org.apache.flink.runtime.messages.TaskMessages.FailIntermediateResultPartitions;
-import org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
@@ -74,8 +69,6 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Predicate;
 
-import scala.concurrent.ExecutionContext;
-
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.junit.Assert.assertEquals;
@@ -496,29 +489,6 @@ public class ExecutionGraphTestUtils {
                return groupVertex;
        }
 
-       @SuppressWarnings("serial")
-       public static class SimpleActorGateway extends BaseTestingActorGateway {
-
-
-               public SimpleActorGateway(ExecutionContext executionContext){
-                       super(executionContext);
-               }
-
-               @Override
-               public Object handleMessage(Object message) {
-                       if (message instanceof SubmitTask) {
-                               SubmitTask submitTask = (SubmitTask) message;
-                               return Acknowledge.get();
-                       } else if(message instanceof CancelTask) {
-                               return Acknowledge.get();
-                       } else if(message instanceof 
FailIntermediateResultPartitions) {
-                               return new Object();
-                       } else {
-                               return null;
-                       }
-               }
-       }
-
        public static final String ERROR_MESSAGE = "test_failure_error_message";
 
        public static ExecutionJobVertex getExecutionVertex(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingActorGateway.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingActorGateway.java
deleted file mode 100644
index 62bc96b..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingActorGateway.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import akka.actor.ActorRef;
-import akka.dispatch.Futures;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.UUID;
-import java.util.concurrent.Callable;
-
-/**
- * Abstract base class for testing {@link ActorGateway} instances. The 
implementing subclass
- * only has to provide an implementation for handleMessage which contains the 
logic to treat
- * different messages.
- */
-abstract public class BaseTestingActorGateway implements ActorGateway {
-       /**
-        * {@link ExecutionContext} which is used to execute the futures.
-        */
-       private final ExecutionContext executionContext;
-
-       public BaseTestingActorGateway(ExecutionContext executionContext) {
-               this.executionContext = executionContext;
-       }
-
-       @Override
-       public Future<Object> ask(Object message, FiniteDuration timeout) {
-               try {
-                       final Object result = handleMessage(message);
-
-                       return Futures.future(new Callable<Object>() {
-                               @Override
-                               public Object call() throws Exception {
-                                       return result;
-                               }
-                       }, executionContext);
-
-               } catch (final Exception e) {
-                       // if an exception occurred in the handleMessage method 
then return it as part of the future
-                       return Futures.future(new Callable<Object>() {
-                               @Override
-                               public Object call() throws Exception {
-                                       throw e;
-                               }
-                       }, executionContext);
-               }
-       }
-
-       /**
-        * Handles the supported messages by this InstanceGateway
-        *
-        * @param message Message to handle
-        * @return Result
-        * @throws Exception
-        */
-       abstract public Object handleMessage(Object message) throws Exception;
-
-       @Override
-       public void tell(Object message) {
-               try {
-                       handleMessage(message);
-               } catch (Exception e) {
-                       // discard exception because it happens on the "remote" 
instance
-               }
-       }
-
-       @Override
-       public void tell(Object message, ActorGateway sender) {
-               try{
-                       handleMessage(message);
-               } catch (Exception e) {
-                       // discard exception because it happens on the "remote" 
instance
-               }
-       }
-
-       @Override
-       public void forward(Object message, ActorGateway sender) {
-               throw new UnsupportedOperationException();
-       }
-
-       @Override
-       public Future<Object> retry(Object message, int numberRetries, 
FiniteDuration timeout, ExecutionContext executionContext) {
-               return ask(message, timeout);
-       }
-
-       @Override
-       public String path() {
-               return "BaseTestingInstanceGateway";
-       }
-
-       @Override
-       public ActorRef actor() {
-               return ActorRef.noSender();
-       }
-
-       @Override
-       public UUID leaderSessionID() {
-               return null;
-       }
-}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/ForwardingActorGateway.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/ForwardingActorGateway.java
deleted file mode 100644
index 18a3c3b..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/ForwardingActorGateway.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager;
-
-import org.apache.flink.runtime.instance.BaseTestingActorGateway;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-
-import java.util.concurrent.BlockingQueue;
-
-/**
- * Testing {@link org.apache.flink.runtime.instance.ActorGateway} which stores 
all handled messages
- * in a {@link BlockingQueue}.
- */
-public class ForwardingActorGateway extends BaseTestingActorGateway {
-       private static final long serialVersionUID = 7001973884263802603L;
-
-       private final transient BlockingQueue<Object> queue;
-
-       public ForwardingActorGateway(BlockingQueue<Object> queue) {
-               super(TestingUtils.directExecutionContext());
-
-               this.queue = queue;
-       }
-
-       @Override
-       public Object handleMessage(Object message) throws Exception {
-               // don't do anything with the message, but storing it in queue
-               queue.add(message);
-
-               return null;
-       }
-}
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 59709d1..fdc2c07 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -19,26 +19,17 @@
 package org.apache.flink.runtime.testingUtils
 
 import java.util
+import java.util.Collections
 import java.util.concurrent._
-import java.util.{Collections, UUID}
 
-import akka.actor.{ActorRef, ActorSystem, Kill, Props}
-import akka.pattern.{Patterns, ask}
+import akka.actor.{ActorRef, Kill}
 import com.typesafe.config.ConfigFactory
-import grizzled.slf4j.Logger
 import org.apache.flink.api.common.time.Time
-import org.apache.flink.configuration._
 import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.clusterframework.types.ResourceID
 import org.apache.flink.runtime.concurrent.{ScheduledExecutor, 
ScheduledExecutorServiceAdapter}
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices
-import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway}
-import 
org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService
-import 
org.apache.flink.runtime.messages.TaskManagerMessages.{NotifyWhenRegisteredAtJobManager,
 RegisteredAtJobManager}
-import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, 
MetricRegistryImpl}
 
 import scala.concurrent.duration.{TimeUnit, _}
-import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor}
+import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
 import scala.language.postfixOps
 
 /**

Reply via email to