This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit f0529de9a35009c190863060c8bda1328b3bfd83 Author: Till Rohrmann <trohrm...@apache.org> AuthorDate: Thu Apr 9 10:26:27 2020 +0200 [FLINK-15347] Add SupervisorActor which monitors the proper termination of AkkaRpcActors. In order to properly complete the termination future of an RpcEndpoint, we need to monitor when the underlying AkkaRpcActor has been removed from the ActorSystem. If this is not done, another RpcEndpoint using the same name may fail to start because the old RpcEndpoint is still registered. We achieve this monitoring by introducing a helper actor which is responsible for starting the AkkaRpcActors for all RpcEndpoints. Since the SupervisorActor is the parent of all RpcEndpoints, it can tell through the SupervisorStrategy when they are being removed from the ActorSystem. A consequence of the new actor is that the Akka URLs change from akka://flink@actorsystem:port/user/xyz to akka://flink@actorsystem:port/user/rpc/xyz. The respective method AkkaRpcServiceUtils.getRpcUrl has been updated to reflect this change. This hierarchy change also warrants bumping AkkaRpcService.VERSION. The failure behaviour of the underlying ActorSystem has been changed so that it terminates all running actors if an exception is thrown from the SupervisorActor. The assumption is that such an exception always indicates a programming error and is unrecoverable. The same applies to failures originating from an AkkaRpcActor (a child of the SupervisorActor). If such an exception is thrown, then we assume that the system is in an illegal state and shut it down. This works by recording the failure cause for the respective AkkaRpcActor and then terminating the ActorSystem. 
--- ...tegy.java => EscalatingSupervisorStrategy.java} | 21 +- .../flink/runtime/rpc/akka/AkkaRpcService.java | 77 +++-- .../runtime/rpc/akka/AkkaRpcServiceUtils.java | 4 +- .../flink/runtime/rpc/akka/SupervisorActor.java | 360 +++++++++++++++++++++ .../org/apache/flink/runtime/akka/AkkaUtils.scala | 2 +- .../runtime/rpc/akka/AkkaActorSystemTest.java | 89 +++++ .../flink/runtime/rpc/akka/AkkaRpcActorTest.java | 22 ++ .../runtime/rpc/akka/SupervisorActorTest.java | 249 ++++++++++++++ 8 files changed, 777 insertions(+), 47 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/EscalatingSupervisorStrategy.java similarity index 61% rename from flink-runtime/src/main/java/org/apache/flink/runtime/akka/StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/akka/EscalatingSupervisorStrategy.java index e2d8fd6..dc0832f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/EscalatingSupervisorStrategy.java @@ -18,36 +18,23 @@ package org.apache.flink.runtime.akka; -import akka.actor.ActorKilledException; import akka.actor.OneForOneStrategy; import akka.actor.SupervisorStrategy; import akka.actor.SupervisorStrategyConfigurator; import akka.japi.pf.PFBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** - * Stopping supervisor strategy which logs {@link ActorKilledException} only on debug log level. + * Escalating supervisor strategy. 
*/ -public class StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy implements SupervisorStrategyConfigurator { - - private static final Logger LOG = LoggerFactory.getLogger(StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy.class); +public class EscalatingSupervisorStrategy implements SupervisorStrategyConfigurator { @Override public SupervisorStrategy create() { return new OneForOneStrategy( false, new PFBuilder<Throwable, SupervisorStrategy.Directive>() - .match( - Exception.class, - (Exception e) -> { - if (e instanceof ActorKilledException) { - LOG.debug("Actor was killed. Stopping it now.", e); - } else { - LOG.error("Actor failed with exception. Stopping it now.", e); - } - return SupervisorStrategy.Stop$.MODULE$; - }) + .matchAny( + (ignored) -> SupervisorStrategy.escalate()) .build()); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java index 87a7682..f3604de 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java @@ -32,10 +32,12 @@ import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcServer; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcRuntimeException; import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException; import org.apache.flink.runtime.rpc.messages.HandshakeSuccessMessage; import org.apache.flink.runtime.rpc.messages.RemoteHandshakeMessage; +import akka.actor.AbstractActor; import akka.actor.ActorIdentity; import akka.actor.ActorRef; import akka.actor.ActorSelection; @@ -105,6 +107,8 @@ public class AkkaRpcService implements RpcService { private final CompletableFuture<Void> terminationFuture; + private final ActorRef 
supervisor; + private volatile boolean stopped; @VisibleForTesting @@ -133,6 +137,12 @@ public class AkkaRpcService implements RpcService { terminationFuture = new CompletableFuture<>(); stopped = false; + + supervisor = startSupervisorActor(); + } + + private ActorRef startSupervisorActor() { + return SupervisorActor.startSupervisorActor(actorSystem); } public ActorSystem getActorSystem() { @@ -201,32 +211,9 @@ public class AkkaRpcService implements RpcService { public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint) { checkNotNull(rpcEndpoint, "rpc endpoint"); - CompletableFuture<Void> terminationFuture = new CompletableFuture<>(); - final Props akkaRpcActorProps; - - if (rpcEndpoint instanceof FencedRpcEndpoint) { - akkaRpcActorProps = Props.create( - FencedAkkaRpcActor.class, - rpcEndpoint, - terminationFuture, - getVersion(), - configuration.getMaximumFramesize()); - } else { - akkaRpcActorProps = Props.create( - AkkaRpcActor.class, - rpcEndpoint, - terminationFuture, - getVersion(), - configuration.getMaximumFramesize()); - } - - ActorRef actorRef; - - synchronized (lock) { - checkState(!stopped, "RpcService is stopped"); - actorRef = actorSystem.actorOf(akkaRpcActorProps, rpcEndpoint.getEndpointId()); - actors.put(actorRef, rpcEndpoint); - } + final SupervisorActor.ActorRegistration actorRegistration = registerAkkaRpcActor(rpcEndpoint); + final ActorRef actorRef = actorRegistration.getActorRef(); + final CompletableFuture<Void> actorTerminationFuture = actorRegistration.getTerminationFuture(); LOG.info("Starting RPC endpoint for {} at {} .", rpcEndpoint.getClass().getName(), actorRef.path()); @@ -254,7 +241,7 @@ public class AkkaRpcService implements RpcService { actorRef, configuration.getTimeout(), configuration.getMaximumFramesize(), - terminationFuture, + actorTerminationFuture, ((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken, captureAskCallstacks); @@ -266,7 +253,7 @@ public class AkkaRpcService implements RpcService { 
actorRef, configuration.getTimeout(), configuration.getMaximumFramesize(), - terminationFuture, + actorTerminationFuture, captureAskCallstacks); } @@ -284,6 +271,40 @@ public class AkkaRpcService implements RpcService { return server; } + private <C extends RpcEndpoint & RpcGateway> SupervisorActor.ActorRegistration registerAkkaRpcActor(C rpcEndpoint) { + final Class<? extends AbstractActor> akkaRpcActorType; + + if (rpcEndpoint instanceof FencedRpcEndpoint) { + akkaRpcActorType = FencedAkkaRpcActor.class; + } else { + akkaRpcActorType = AkkaRpcActor.class; + } + + synchronized (lock) { + checkState(!stopped, "RpcService is stopped"); + + final SupervisorActor.StartAkkaRpcActorResponse startAkkaRpcActorResponse = SupervisorActor.startAkkaRpcActor( + supervisor, + actorTerminationFuture -> Props.create( + akkaRpcActorType, + rpcEndpoint, + actorTerminationFuture, + getVersion(), + configuration.getMaximumFramesize()), + rpcEndpoint.getEndpointId()); + + final SupervisorActor.ActorRegistration actorRegistration = startAkkaRpcActorResponse.orElseThrow(cause -> new AkkaRpcRuntimeException( + String.format("Could not create the %s for %s.", + AkkaRpcActor.class.getSimpleName(), + rpcEndpoint.getEndpointId()), + cause)); + + actors.put(actorRegistration.getActorRef(), rpcEndpoint); + + return actorRegistration; + } + } + @Override public <F extends Serializable> RpcServer fenceRpcServer(RpcServer rpcServer, F fencingToken) { if (rpcServer instanceof AkkaBasedEndpoint) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java index 1b50e45..5b464f4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java @@ -57,6 +57,8 @@ public class AkkaRpcServiceUtils { private static final String AKKA_TCP = "akka.tcp"; 
private static final String AKKA_SSL_TCP = "akka.ssl.tcp"; + static final String SUPERVISOR_NAME = "rpc"; + private static final String SIMPLE_AKKA_CONFIG_TEMPLATE = "akka {remote {netty.tcp {maximum-frame-size = %s}}}"; @@ -166,7 +168,7 @@ public class AkkaRpcServiceUtils { final String hostPort = NetUtils.unresolvedHostAndPortToNormalizedString(hostname, port); - return String.format("%s://flink@%s/user/%s", protocolPrefix, hostPort, endpointName); + return String.format("%s://flink@%s/user/%s/%s", protocolPrefix, hostPort, SUPERVISOR_NAME, endpointName); } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/SupervisorActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/SupervisorActor.java new file mode 100644 index 0000000..e74ad67 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/SupervisorActor.java @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rpc.akka; + +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException; +import org.apache.flink.runtime.rpc.akka.exceptions.AkkaUnknownMessageException; +import org.apache.flink.util.Preconditions; + +import akka.AkkaException; +import akka.actor.AbstractActor; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.ChildRestartStats; +import akka.actor.Props; +import akka.actor.SupervisorStrategy; +import akka.japi.pf.DeciderBuilder; +import akka.pattern.Patterns; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.function.Function; + +import scala.PartialFunction; +import scala.collection.Iterable; + +/** + * Supervisor actor which is responsible for starting {@link AkkaRpcActor} instances and monitoring + * when the actors have terminated. 
+ */ +class SupervisorActor extends AbstractActor { + + private static final Logger LOG = LoggerFactory.getLogger(SupervisorActor.class); + + private final Map<ActorRef, AkkaRpcActorRegistration> registeredAkkaRpcActors; + + SupervisorActor() { + this.registeredAkkaRpcActors = new HashMap<>(); + } + + @Override + public Receive createReceive() { + return receiveBuilder() + .match(StartAkkaRpcActor.class, this::createStartAkkaRpcActorMessage) + .matchAny(this::handleUnknownMessage) + .build(); + } + + @Override + public void postStop() throws Exception { + LOG.debug("Stopping supervisor actor."); + + super.postStop(); + + for (AkkaRpcActorRegistration actorRegistration : registeredAkkaRpcActors.values()) { + terminateAkkaRpcActorOnStop(actorRegistration); + } + + registeredAkkaRpcActors.clear(); + } + + @Override + public SupervisorActorSupervisorStrategy supervisorStrategy() { + return new SupervisorActorSupervisorStrategy(); + } + + private void terminateAkkaRpcActorOnStop(AkkaRpcActorRegistration akkaRpcActorRegistration) { + akkaRpcActorRegistration.terminateExceptionally(new AkkaRpcException( + String.format("Unexpected closing of %s with name %s.", getClass().getSimpleName(), akkaRpcActorRegistration.getEndpointId()))); + } + + private void createStartAkkaRpcActorMessage(StartAkkaRpcActor startAkkaRpcActor) { + final String endpointId = startAkkaRpcActor.getEndpointId(); + final AkkaRpcActorRegistration akkaRpcActorRegistration = new AkkaRpcActorRegistration(endpointId); + + final Props akkaRpcActorProps = startAkkaRpcActor.getPropsFactory().create(akkaRpcActorRegistration.getInternalTerminationFuture()); + + LOG.debug("Starting {} with name {}.", akkaRpcActorProps.actorClass().getSimpleName(), endpointId); + + try { + final ActorRef actorRef = getContext().actorOf(akkaRpcActorProps, endpointId); + + registeredAkkaRpcActors.put(actorRef, akkaRpcActorRegistration); + + getSender().tell(StartAkkaRpcActorResponse.success( + ActorRegistration.create( + actorRef, + 
akkaRpcActorRegistration.getExternalTerminationFuture())), + getSelf()); + } catch (AkkaException akkaException) { + getSender().tell(StartAkkaRpcActorResponse.failure(akkaException), getSelf()); + } + } + + private void akkaRpcActorTerminated(ActorRef actorRef) { + final AkkaRpcActorRegistration actorRegistration = removeAkkaRpcActor(actorRef); + + LOG.debug("AkkaRpcActor {} has terminated.", actorRef.path()); + actorRegistration.terminate(); + } + + private void akkaRpcActorFailed(ActorRef actorRef, Throwable cause) { + LOG.warn("AkkaRpcActor {} has failed. Shutting it down now.", actorRef.path(), cause); + + for (Map.Entry<ActorRef, AkkaRpcActorRegistration> registeredAkkaRpcActor : registeredAkkaRpcActors.entrySet()) { + final ActorRef otherActorRef = registeredAkkaRpcActor.getKey(); + if (otherActorRef.equals(actorRef)) { + final AkkaRpcException error = new AkkaRpcException(String.format("Stopping actor %s because it failed.", actorRef.path()), cause); + registeredAkkaRpcActor.getValue().markFailed(error); + } else { + final AkkaRpcException siblingException = new AkkaRpcException(String.format("Stopping actor %s because its sibling %s has failed.", otherActorRef.path(), actorRef.path())); + registeredAkkaRpcActor.getValue().markFailed(siblingException); + } + } + + getContext().getSystem().terminate(); + } + + private AkkaRpcActorRegistration removeAkkaRpcActor(ActorRef actorRef) { + return Optional.ofNullable(registeredAkkaRpcActors.remove(actorRef)) + .orElseThrow(() -> new IllegalStateException(String.format("Could not find actor %s.", actorRef.path()))); + } + + private void handleUnknownMessage(Object msg) { + final AkkaUnknownMessageException cause = new AkkaUnknownMessageException(String.format("Cannot handle unknown message %s.", msg)); + getSender().tell(new akka.actor.Status.Failure(cause), getSelf()); + throw cause; + } + + public static String getActorName() { + return AkkaRpcServiceUtils.SUPERVISOR_NAME; + } + + public static ActorRef 
startSupervisorActor(ActorSystem actorSystem) { + final Props supervisorProps = Props.create(SupervisorActor.class); + return actorSystem.actorOf(supervisorProps, getActorName()); + } + + public static StartAkkaRpcActorResponse startAkkaRpcActor( + ActorRef supervisor, + StartAkkaRpcActor.PropsFactory propsFactory, + String endpointId) { + return Patterns.ask( + supervisor, + createStartAkkaRpcActorMessage( + propsFactory, + endpointId), + RpcUtils.INF_DURATION).toCompletableFuture() + .thenApply(SupervisorActor.StartAkkaRpcActorResponse.class::cast) + .join(); + } + + public static StartAkkaRpcActor createStartAkkaRpcActorMessage( + StartAkkaRpcActor.PropsFactory propsFactory, + String endpointId) { + return StartAkkaRpcActor.create(propsFactory, endpointId); + } + + // ----------------------------------------------------------------------------- + // Internal classes + // ----------------------------------------------------------------------------- + + private final class SupervisorActorSupervisorStrategy extends SupervisorStrategy { + + @Override + public PartialFunction<Throwable, Directive> decider() { + return DeciderBuilder.match( + Exception.class, e -> SupervisorStrategy.stop() + ).build(); + } + + @Override + public boolean loggingEnabled() { + return false; + } + + @Override + public void handleChildTerminated(akka.actor.ActorContext context, ActorRef child, Iterable<ActorRef> children) { + akkaRpcActorTerminated(child); + } + + @Override + public void processFailure(akka.actor.ActorContext context, boolean restart, ActorRef child, Throwable cause, ChildRestartStats stats, Iterable<ChildRestartStats> children) { + Preconditions.checkArgument(!restart, "The supervisor strategy should never restart an actor."); + + akkaRpcActorFailed(child, cause); + } + } + + private static final class AkkaRpcActorRegistration { + private final String endpointId; + + private final CompletableFuture<Void> internalTerminationFuture; + + private final CompletableFuture<Void> 
externalTerminationFuture; + + @Nullable + private Throwable errorCause; + + private AkkaRpcActorRegistration(String endpointId) { + this.endpointId = endpointId; + internalTerminationFuture = new CompletableFuture<>(); + externalTerminationFuture = new CompletableFuture<>(); + errorCause = null; + } + + private CompletableFuture<Void> getInternalTerminationFuture() { + return internalTerminationFuture; + } + + private CompletableFuture<Void> getExternalTerminationFuture() { + return externalTerminationFuture; + } + + private String getEndpointId() { + return endpointId; + } + + private void terminate() { + CompletableFuture<Void> terminationFuture = internalTerminationFuture; + + if (errorCause != null) { + if (!internalTerminationFuture.completeExceptionally(errorCause)) { + // we have another failure reason -> let's add it + terminationFuture = internalTerminationFuture.handle( + (ignored, throwable) -> { + if (throwable != null) { + errorCause.addSuppressed(throwable); + } + + throw new CompletionException(errorCause); + }); + } + } else { + internalTerminationFuture.completeExceptionally( + new AkkaRpcException( + String.format("RpcEndpoint %s did not complete the internal termination future.", endpointId))); + } + + FutureUtils.forward(terminationFuture, externalTerminationFuture); + } + + private void terminateExceptionally(Throwable cause) { + externalTerminationFuture.completeExceptionally(cause); + } + + public void markFailed(Throwable cause) { + if (errorCause == null) { + errorCause = cause; + } else { + errorCause.addSuppressed(cause); + } + } + } + + // ----------------------------------------------------------------------------- + // Messages + // ----------------------------------------------------------------------------- + + static final class StartAkkaRpcActor { + private final PropsFactory propsFactory; + private final String endpointId; + + private StartAkkaRpcActor(PropsFactory propsFactory, String endpointId) { + this.propsFactory = 
propsFactory; + this.endpointId = endpointId; + } + + public String getEndpointId() { + return endpointId; + } + + public PropsFactory getPropsFactory() { + return propsFactory; + } + + private static StartAkkaRpcActor create(PropsFactory propsFactory, String endpointId) { + return new StartAkkaRpcActor(propsFactory, endpointId); + } + + interface PropsFactory { + Props create(CompletableFuture<Void> terminationFuture); + } + } + + static final class ActorRegistration { + private final ActorRef actorRef; + private final CompletableFuture<Void> terminationFuture; + + private ActorRegistration(ActorRef actorRef, CompletableFuture<Void> terminationFuture) { + this.actorRef = actorRef; + this.terminationFuture = terminationFuture; + } + + public ActorRef getActorRef() { + return actorRef; + } + + public CompletableFuture<Void> getTerminationFuture() { + return terminationFuture; + } + + public static ActorRegistration create(ActorRef actorRef, CompletableFuture<Void> terminationFuture) { + return new ActorRegistration(actorRef, terminationFuture); + } + } + + static final class StartAkkaRpcActorResponse { + @Nullable + private final ActorRegistration actorRegistration; + + @Nullable + private final Throwable error; + + private StartAkkaRpcActorResponse(@Nullable ActorRegistration actorRegistration, @Nullable Throwable error) { + this.actorRegistration = actorRegistration; + this.error = error; + } + + public <X extends Throwable> ActorRegistration orElseThrow(Function<? super Throwable, ? 
extends X> throwableFunction) throws X { + if (actorRegistration != null) { + return actorRegistration; + } else { + throw throwableFunction.apply(error); + } + } + + public static StartAkkaRpcActorResponse success(ActorRegistration actorRegistration) { + return new StartAkkaRpcActorResponse(actorRegistration, null); + } + + public static StartAkkaRpcActorResponse failure(Throwable error) { + return new StartAkkaRpcActorResponse(null, error); + } + } +} diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala index efa15c2..adf0ebd 100755 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala @@ -273,7 +273,7 @@ object AkkaUtils { val logLevel = getLogLevel - val supervisorStrategy = classOf[StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy] + val supervisorStrategy = classOf[EscalatingSupervisorStrategy] .getCanonicalName val config = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaActorSystemTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaActorSystemTest.java new file mode 100644 index 0000000..6d3830a --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaActorSystemTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.akka; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; + +import akka.actor.AbstractActor; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.actor.Terminated; +import akka.japi.pf.ReceiveBuilder; +import org.junit.Test; + +import java.util.concurrent.CompletableFuture; + +/** + * Tests for the {@link akka.actor.ActorSystem} instantiated through {@link AkkaUtils}. 
+ */ +public class AkkaActorSystemTest extends TestLogger { + + @Test + public void shutsDownOnActorFailure() { + final ActorSystem actorSystem = AkkaUtils.createLocalActorSystem(new Configuration()); + + try { + final CompletableFuture<Terminated> terminationFuture = actorSystem.getWhenTerminated().toCompletableFuture(); + final ActorRef actorRef = actorSystem.actorOf(Props.create(SimpleActor.class)); + + final FlinkException cause = new FlinkException("Flink test exception"); + + actorRef.tell(Fail.exceptionally(cause), ActorRef.noSender()); + + // make sure that the ActorSystem shuts down + terminationFuture.join(); + } finally { + AkkaUtils.terminateActorSystem(actorSystem).join(); + } + } + + private static final class SimpleActor extends AbstractActor { + + @Override + public Receive createReceive() { + return ReceiveBuilder.create() + .match(Fail.class, this::handleFail) + .build(); + } + + private void handleFail(Fail fail) { + throw new RuntimeException(fail.getErrorCause()); + } + } + + private static final class Fail { + private final Throwable errorCause; + + private Fail(Throwable errorCause) { + this.errorCause = errorCause; + } + + private Throwable getErrorCause() { + return errorCause; + } + + private static Fail exceptionally(Throwable errorCause) { + return new Fail(errorCause); + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java index 64270c4..404f5c1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java @@ -42,6 +42,8 @@ import org.hamcrest.core.Is; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.annotation.Nullable; @@ -52,6 +54,7 @@ import 
java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -64,6 +67,8 @@ import static org.junit.Assert.fail; */ public class AkkaRpcActorTest extends TestLogger { + private static final Logger LOG = LoggerFactory.getLogger(AkkaRpcActorTest.class); + // ------------------------------------------------------------------------ // shared test members // ------------------------------------------------------------------------ @@ -428,6 +433,23 @@ public class AkkaRpcActorTest extends TestLogger { } } + @Test + public void canReuseEndpointNameAfterTermination() throws Exception { + final String endpointName = "not_unique"; + try (SimpleRpcEndpoint simpleRpcEndpoint1 = new SimpleRpcEndpoint(akkaRpcService, endpointName)) { + + simpleRpcEndpoint1.start(); + + simpleRpcEndpoint1.closeAsync().join(); + + try (SimpleRpcEndpoint simpleRpcEndpoint2 = new SimpleRpcEndpoint(akkaRpcService, endpointName)) { + simpleRpcEndpoint2.start(); + + assertThat(simpleRpcEndpoint2.getAddress(), is(equalTo(simpleRpcEndpoint1.getAddress()))); + } + } + } + // ------------------------------------------------------------------------ // Test Actors and Interfaces // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/SupervisorActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/SupervisorActorTest.java new file mode 100644 index 0000000..ecf68ee --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/SupervisorActorTest.java @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.akka; + +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; + +import akka.actor.AbstractActor; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.actor.Terminated; +import akka.japi.pf.ReceiveBuilder; +import org.junit.Rule; +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +/** + * Tests for the {@link SupervisorActor}. 
+ */ +public class SupervisorActorTest extends TestLogger { + + @Rule + public final ActorSystemResource actorSystemResource = ActorSystemResource.defaultConfiguration(); + + @Test + public void completesTerminationFutureIfActorStops() { + final ActorSystem actorSystem = actorSystemResource.getActorSystem(); + + final ActorRef supervisor = SupervisorActor.startSupervisorActor(actorSystem); + + final SupervisorActor.ActorRegistration actorRegistration = startAkkaRpcActor(supervisor, "foobar"); + + final CompletableFuture<Void> terminationFuture = actorRegistration.getTerminationFuture(); + assertThat(terminationFuture.isDone(), is(false)); + + actorRegistration.getActorRef().tell(TerminateWithFutureCompletion.normal(), ActorRef.noSender()); + + terminationFuture.join(); + } + + @Test + public void completesTerminationFutureExceptionallyIfActorStopsExceptionally() throws Exception { + final ActorSystem actorSystem = actorSystemResource.getActorSystem(); + + final ActorRef supervisor = SupervisorActor.startSupervisorActor(actorSystem); + + final SupervisorActor.ActorRegistration actorRegistration = startAkkaRpcActor(supervisor, "foobar"); + + final CompletableFuture<Void> terminationFuture = actorRegistration.getTerminationFuture(); + assertThat(terminationFuture.isDone(), is(false)); + + final FlinkException cause = new FlinkException("Test cause."); + actorRegistration.getActorRef().tell(TerminateWithFutureCompletion.exceptionally(cause), ActorRef.noSender()); + + try { + terminationFuture.get(); + fail("Expected the termination future being completed exceptionally"); + } catch (ExecutionException expected) { + ExceptionUtils.findThrowable(expected, e -> e.equals(cause)) + .orElseThrow(() -> new FlinkException("Unexpected exception", expected)); + } + } + + @Test + public void completesTerminationFutureExceptionallyIfActorStopsWithoutReason() throws InterruptedException { + final ActorSystem actorSystem = actorSystemResource.getActorSystem(); + + final ActorRef 
supervisor = SupervisorActor.startSupervisorActor(actorSystem); + + final SupervisorActor.ActorRegistration actorRegistration = startAkkaRpcActor(supervisor, "foobar"); + + final CompletableFuture<Void> terminationFuture = actorRegistration.getTerminationFuture(); + assertThat(terminationFuture.isDone(), is(false)); + + actorRegistration.getActorRef().tell(Terminate.INSTANCE, ActorRef.noSender()); + + try { + terminationFuture.get(); + fail("Expected the termination future being completed exceptionally"); + } catch (ExecutionException expected) {} + } + + @Test + public void completesTerminationFutureExceptionallyIfActorFails() throws Exception { + final ActorSystem actorSystem = actorSystemResource.getActorSystem(); + + final ActorRef supervisor = SupervisorActor.startSupervisorActor(actorSystem); + + final SupervisorActor.ActorRegistration actorRegistration = startAkkaRpcActor(supervisor, "foobar"); + + final CompletableFuture<Void> terminationFuture = actorRegistration.getTerminationFuture(); + assertThat(terminationFuture.isDone(), is(false)); + + final CompletableFuture<Terminated> actorSystemTerminationFuture = actorSystem.getWhenTerminated().toCompletableFuture(); + + final FlinkException cause = new FlinkException("Test cause."); + actorRegistration.getActorRef().tell(Fail.exceptionally(cause), ActorRef.noSender()); + + try { + terminationFuture.get(); + fail("Expected the termination future being completed exceptionally"); + } catch (ExecutionException expected) { + ExceptionUtils.findThrowable(expected, e -> e.equals(cause)) + .orElseThrow(() -> new FlinkException("Unexpected exception", expected)); + } + + // make sure that the supervisor actor has stopped --> terminating the actor system + actorSystemTerminationFuture.join(); + } + + @Test + public void completesTerminationFutureOfSiblingsIfActorFails() throws Exception { + final ActorSystem actorSystem = actorSystemResource.getActorSystem(); + + final ActorRef supervisor = 
SupervisorActor.startSupervisorActor(actorSystem); + + final SupervisorActor.ActorRegistration actorRegistration1 = startAkkaRpcActor(supervisor, "foobar1"); + final SupervisorActor.ActorRegistration actorRegistration2 = startAkkaRpcActor(supervisor, "foobar2"); + + final CompletableFuture<Void> terminationFuture = actorRegistration2.getTerminationFuture(); + assertThat(terminationFuture.isDone(), is(false)); + + final FlinkException cause = new FlinkException("Test cause."); + actorRegistration1.getActorRef().tell(Fail.exceptionally(cause), ActorRef.noSender()); + + try { + terminationFuture.get(); + fail("Expected the termination future being completed exceptionally"); + } catch (ExecutionException expected) {} + } + + private SupervisorActor.ActorRegistration startAkkaRpcActor(ActorRef supervisor, String endpointId) { + final SupervisorActor.StartAkkaRpcActorResponse startResponse = SupervisorActor.startAkkaRpcActor( + supervisor, + terminationFuture -> Props.create(SimpleActor.class, terminationFuture), + endpointId); + + return startResponse.orElseThrow(cause -> new AssertionError("Expected the start to succeed.", cause)); + } + + private static final class SimpleActor extends AbstractActor { + + private final CompletableFuture<Void> terminationFuture; + + private SimpleActor(CompletableFuture<Void> terminationFuture) { + this.terminationFuture = terminationFuture; + } + + @Override + public Receive createReceive() { + return ReceiveBuilder.create() + .match(Terminate.class, this::terminate) + .match(TerminateWithFutureCompletion.class, this::terminateActorWithFutureCompletion) + .match(Fail.class, this::fail) + .build(); + } + + private void fail(Fail fail) { + throw new RuntimeException(fail.getCause()); + } + + private void terminate(Terminate terminate) { + terminateActor(); + } + + private void terminateActor() { + getContext().stop(getSelf()); + } + + private void terminateActorWithFutureCompletion(TerminateWithFutureCompletion 
terminateWithFutureCompletion) { + final Throwable terminationError = terminateWithFutureCompletion.getTerminationError(); + if (terminationError == null) { + terminationFuture.complete(null); + } else { + terminationFuture.completeExceptionally(terminationError); + } + + terminateActor(); + } + } + + private static final class Terminate { + private static final Terminate INSTANCE = new Terminate(); + } + + private static final class TerminateWithFutureCompletion { + @Nullable + private final Throwable terminationError; + + private TerminateWithFutureCompletion(@Nullable Throwable terminationError) { + this.terminationError = terminationError; + } + + @Nullable + private Throwable getTerminationError() { + return terminationError; + } + + private static TerminateWithFutureCompletion normal() { + return new TerminateWithFutureCompletion(null); + } + + private static TerminateWithFutureCompletion exceptionally(Throwable cause) { + return new TerminateWithFutureCompletion(cause); + } + } + + private static final class Fail { + private final Throwable cause; + + private Fail(Throwable cause) { + this.cause = cause; + } + + private Throwable getCause() { + return cause; + } + + private static Fail exceptionally(Throwable cause) { + return new Fail(cause); + } + } +}