This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0bfe4666c95b41f9f3aa25d4dff703e5e72aae1a Author: Till Rohrmann <trohrm...@apache.org> AuthorDate: Wed Apr 24 18:00:53 2019 +0200 [FLINK-12324] Remove legacy ActorGateway interface This commit removes the legacy ActorGateway and all its implementations. This closes #8257. --- .../flink/runtime/instance/ActorGateway.java | 107 ------------- .../flink/runtime/instance/AkkaActorGateway.java | 165 --------------------- .../executiongraph/ExecutionGraphTestUtils.java | 30 ---- .../runtime/instance/BaseTestingActorGateway.java | 119 --------------- .../taskmanager/ForwardingActorGateway.java | 48 ------ .../flink/runtime/testingUtils/TestingUtils.scala | 15 +- 6 files changed, 3 insertions(+), 481 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/ActorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/ActorGateway.java deleted file mode 100644 index a82debb..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/ActorGateway.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.instance; - -import akka.actor.ActorRef; -import scala.concurrent.ExecutionContext; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; - -import java.io.Serializable; -import java.util.UUID; - -/** - * Interface to abstract the communication with an actor. - * - * It allows to avoid direct interaction with an ActorRef. - */ -public interface ActorGateway extends Serializable { - - /** - * Sends a message asynchronously and returns its response. The response to the message is - * returned as a future. - * - * @param message Message to be sent - * @param timeout Timeout until the Future is completed with an AskTimeoutException - * @return Future which contains the response to the sent message - */ - Future<Object> ask(Object message, FiniteDuration timeout); - - /** - * Sends a message asynchronously without a result. - * - * @param message Message to be sent - */ - void tell(Object message); - - /** - * Sends a message asynchronously without a result with sender being the sender. - * - * @param message Message to be sent - * @param sender Sender of the message - */ - void tell(Object message, ActorGateway sender); - - /** - * Forwards a message. For the receiver of this message it looks as if sender has sent the - * message. - * - * @param message Message to be sent - * @param sender Sender of the forwarded message - */ - void forward(Object message, ActorGateway sender); - - /** - * Retries to send asynchronously a message up to numberRetries times. The response to this - * message is returned as a future. The message is re-sent if the number of retries is not yet - * exceeded and if an exception occurred while sending it. - * - * @param message Message to be sent - * @param numberRetries Number of times to retry sending the message - * @param timeout Timeout for each sending attempt - * @param executionContext ExecutionContext which is used to send the message multiple times - * @return Future of the response to the sent message - */ - Future<Object> retry( - Object message, - int numberRetries, - FiniteDuration timeout, - ExecutionContext executionContext); - - /** - * Returns the path of the remote instance. - * - * @return Path of the remote instance. - */ - String path(); - - /** - * Returns the underlying actor with which is communicated - * - * @return ActorRef of the target actor - */ - ActorRef actor(); - - /** - * Returns the leaderSessionID associated with the remote actor or null. - * - * @return Leader session ID if its associated with this gateway, otherwise null - */ - UUID leaderSessionID(); -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaActorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaActorGateway.java deleted file mode 100644 index 26b6176..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaActorGateway.java +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.instance; - -import akka.actor.ActorRef; -import akka.pattern.Patterns; -import akka.util.Timeout; -import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.messages.LeaderSessionMessageDecorator; -import org.apache.flink.runtime.messages.MessageDecorator; -import org.apache.flink.util.Preconditions; -import scala.concurrent.ExecutionContext; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; - -import java.io.Serializable; -import java.util.UUID; - -/** - * Concrete {@link ActorGateway} implementation which uses Akka to communicate with remote actors. - */ -public class AkkaActorGateway implements ActorGateway, Serializable { - - private static final long serialVersionUID = 42L; - - // ActorRef of the remote instance - private final ActorRef actor; - - // Associated leader session ID, which is used for RequiresLeaderSessionID messages - private final UUID leaderSessionID; - - // Decorator for messages - private final MessageDecorator decorator; - - public AkkaActorGateway(ActorRef actor, UUID leaderSessionID) { - this.actor = Preconditions.checkNotNull(actor); - this.leaderSessionID = Preconditions.checkNotNull(leaderSessionID); - // we want to wrap RequiresLeaderSessionID messages in a LeaderSessionMessage - this.decorator = new LeaderSessionMessageDecorator(leaderSessionID); - } - - /** - * Sends a message asynchronously and returns its response. The response to the message is - * returned as a future. - * - * @param message Message to be sent - * @param timeout Timeout until the Future is completed with an AskTimeoutException - * @return Future which contains the response to the sent message - */ - @Override - public Future<Object> ask(Object message, FiniteDuration timeout) { - Object newMessage = decorator.decorate(message); - return Patterns.ask(actor, newMessage, new Timeout(timeout)); - } - - /** - * Sends a message asynchronously without a result. - * - * @param message Message to be sent - */ - @Override - public void tell(Object message) { - Object newMessage = decorator.decorate(message); - actor.tell(newMessage, ActorRef.noSender()); - } - - /** - * Sends a message asynchronously without a result with sender being the sender. - * - * @param message Message to be sent - * @param sender Sender of the message - */ - @Override - public void tell(Object message, ActorGateway sender) { - Object newMessage = decorator.decorate(message); - actor.tell(newMessage, sender.actor()); - } - - /** - * Forwards a message. For the receiver of this message it looks as if sender has sent the - * message. - * - * @param message Message to be sent - * @param sender Sender of the forwarded message - */ - @Override - public void forward(Object message, ActorGateway sender) { - Object newMessage = decorator.decorate(message); - actor.tell(newMessage, sender.actor()); - } - - /** - * Retries to send asynchronously a message up to numberRetries times. The response to this - * message is returned as a future. The message is re-sent if the number of retries is not yet - * exceeded and if an exception occurred while sending it. - * - * @param message Message to be sent - * @param numberRetries Number of times to retry sending the message - * @param timeout Timeout for each sending attempt - * @param executionContext ExecutionContext which is used to send the message multiple times - * @return Future of the response to the sent message - */ - @Override - public Future<Object> retry( - Object message, - int numberRetries, - FiniteDuration timeout, - ExecutionContext executionContext) { - - Object newMessage = decorator.decorate(message); - - return AkkaUtils.retry( - actor, - newMessage, - numberRetries, - executionContext, - timeout); - } - - /** - * Returns the ActorPath of the remote instance. - * - * @return ActorPath of the remote instance. - */ - @Override - public String path() { - return actor.path().toString(); - } - - /** - * Returns {@link ActorRef} of the target actor - * - * @return ActorRef of the target actor - */ - @Override - public ActorRef actor() { - return actor; - } - - @Override - public UUID leaderSessionID() { - return leaderSessionID; - } - - @Override - public String toString() { - return String.format("AkkaActorGateway(%s, %s)", actor.path(), leaderSessionID); - } -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java index 90fc6d2..99bb7b0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java @@ -35,7 +35,6 @@ import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; -import org.apache.flink.runtime.instance.BaseTestingActorGateway; import org.apache.flink.runtime.instance.HardwareDescription; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.InstanceID; @@ -50,10 +49,6 @@ import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.jobmaster.SlotOwner; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; -import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.messages.TaskMessages.CancelTask; -import org.apache.flink.runtime.messages.TaskMessages.FailIntermediateResultPartitions; -import org.apache.flink.runtime.messages.TaskMessages.SubmitTask; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testtasks.NoOpInvokable; @@ -74,8 +69,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeoutException; import java.util.function.Predicate; -import scala.concurrent.ExecutionContext; - import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.junit.Assert.assertEquals; @@ -496,29 +489,6 @@ public class ExecutionGraphTestUtils { return groupVertex; } - @SuppressWarnings("serial") - public static class SimpleActorGateway extends BaseTestingActorGateway { - - - public SimpleActorGateway(ExecutionContext executionContext){ - super(executionContext); - } - - @Override - public Object handleMessage(Object message) { - if (message instanceof SubmitTask) { - SubmitTask submitTask = (SubmitTask) message; - return Acknowledge.get(); - } else if(message instanceof CancelTask) { - return Acknowledge.get(); - } else if(message instanceof FailIntermediateResultPartitions) { - return new Object(); - } else { - return null; - } - } - } - public static final String ERROR_MESSAGE = "test_failure_error_message"; public static ExecutionJobVertex getExecutionVertex( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingActorGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingActorGateway.java deleted file mode 100644 index 62bc96b..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingActorGateway.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.instance; - -import akka.actor.ActorRef; -import akka.dispatch.Futures; -import scala.concurrent.ExecutionContext; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; - -import java.util.UUID; -import java.util.concurrent.Callable; - -/** - * Abstract base class for testing {@link ActorGateway} instances. The implementing subclass - * only has to provide an implementation for handleMessage which contains the logic to treat - * different messages. - */ -abstract public class BaseTestingActorGateway implements ActorGateway { - /** - * {@link ExecutionContext} which is used to execute the futures. - */ - private final ExecutionContext executionContext; - - public BaseTestingActorGateway(ExecutionContext executionContext) { - this.executionContext = executionContext; - } - - @Override - public Future<Object> ask(Object message, FiniteDuration timeout) { - try { - final Object result = handleMessage(message); - - return Futures.future(new Callable<Object>() { - @Override - public Object call() throws Exception { - return result; - } - }, executionContext); - - } catch (final Exception e) { - // if an exception occurred in the handleMessage method then return it as part of the future - return Futures.future(new Callable<Object>() { - @Override - public Object call() throws Exception { - throw e; - } - }, executionContext); - } - } - - /** - * Handles the supported messages by this InstanceGateway - * - * @param message Message to handle - * @return Result - * @throws Exception - */ - abstract public Object handleMessage(Object message) throws Exception; - - @Override - public void tell(Object message) { - try { - handleMessage(message); - } catch (Exception e) { - // discard exception because it happens on the "remote" instance - } - } - - @Override - public void tell(Object message, ActorGateway sender) { - try{ - handleMessage(message); - } catch (Exception e) { - // discard exception because it happens on the "remote" instance - } - } - - @Override - public void forward(Object message, ActorGateway sender) { - throw new UnsupportedOperationException(); - } - - @Override - public Future<Object> retry(Object message, int numberRetries, FiniteDuration timeout, ExecutionContext executionContext) { - return ask(message, timeout); - } - - @Override - public String path() { - return "BaseTestingInstanceGateway"; - } - - @Override - public ActorRef actor() { - return ActorRef.noSender(); - } - - @Override - public UUID leaderSessionID() { - return null; - } -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/ForwardingActorGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/ForwardingActorGateway.java deleted file mode 100644 index 18a3c3b..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/ForwardingActorGateway.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.taskmanager; - -import org.apache.flink.runtime.instance.BaseTestingActorGateway; -import org.apache.flink.runtime.testingUtils.TestingUtils; - -import java.util.concurrent.BlockingQueue; - -/** - * Testing {@link org.apache.flink.runtime.instance.ActorGateway} which stores all handled messages - * in a {@link BlockingQueue}. - */ -public class ForwardingActorGateway extends BaseTestingActorGateway { - private static final long serialVersionUID = 7001973884263802603L; - - private final transient BlockingQueue<Object> queue; - - public ForwardingActorGateway(BlockingQueue<Object> queue) { - super(TestingUtils.directExecutionContext()); - - this.queue = queue; - } - - @Override - public Object handleMessage(Object message) throws Exception { - // don't do anything with the message, but storing it in queue - queue.add(message); - - return null; - } -} diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala index 59709d1..fdc2c07 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala @@ -19,26 +19,17 @@ package org.apache.flink.runtime.testingUtils import java.util +import java.util.Collections import java.util.concurrent._ -import java.util.{Collections, UUID} -import akka.actor.{ActorRef, ActorSystem, Kill, Props} -import akka.pattern.{Patterns, ask} +import akka.actor.{ActorRef, Kill} import com.typesafe.config.ConfigFactory -import grizzled.slf4j.Logger import org.apache.flink.api.common.time.Time -import org.apache.flink.configuration._ import org.apache.flink.runtime.akka.AkkaUtils -import org.apache.flink.runtime.clusterframework.types.ResourceID import org.apache.flink.runtime.concurrent.{ScheduledExecutor, ScheduledExecutorServiceAdapter} -import org.apache.flink.runtime.highavailability.HighAvailabilityServices -import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway} -import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService -import org.apache.flink.runtime.messages.TaskManagerMessages.{NotifyWhenRegisteredAtJobManager, RegisteredAtJobManager} -import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegistryImpl} import scala.concurrent.duration.{TimeUnit, _} -import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor} +import scala.concurrent.{ExecutionContext, ExecutionContextExecutor} import scala.language.postfixOps /**