This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f0529de9a35009c190863060c8bda1328b3bfd83
Author: Till Rohrmann <trohrm...@apache.org>
AuthorDate: Thu Apr 9 10:26:27 2020 +0200

    [FLINK-15347] Add SupervisorActor which monitors the proper termination of AkkaRpcActors
    
    In order to properly complete the termination future of an RpcEndpoint, we need to monitor
    when the underlying AkkaRpcActor has been removed from the ActorSystem. If this is not done,
    then it can happen that another RpcEndpoint using the same name cannot be started because
    the old RpcEndpoint is still registered.
    
    The way we achieve this monitoring is to introduce a helper actor which is responsible for
    starting the AkkaRpcActors for all RpcEndpoints. Since the SupervisorActor is the parent
    of all AkkaRpcActors, it can tell through the SupervisorStrategy when they have been removed
    from the ActorSystem.
    
    A consequence of the new actor is that the Akka URLs change from akka://flink@actorsystem:port/user/xyz
    to akka://flink@actorsystem:port/user/rpc/xyz. The respective method AkkaRpcServiceUtils.getRpcUrl
    has been updated to reflect this change. This hierarchy change also warrants a bump of
    AkkaRpcService.VERSION.
    
    The failure behaviour of the underlying ActorSystem has been changed so that it terminates
    all running actors if an exception is thrown from the SupervisorActor. The assumption is that
    such an exception always indicates a programming error and is unrecoverable.
    
    The same applies to failures originating from an AkkaRpcActor (the children of the SupervisorActor).
    If such an exception is thrown, then we assume that the system is in an illegal state and shut it
    down. This works by recording the failure cause for the failed AkkaRpcActor (and a corresponding
    error for each of its siblings) and then terminating the ActorSystem.
---
 ...tegy.java => EscalatingSupervisorStrategy.java} |  21 +-
 .../flink/runtime/rpc/akka/AkkaRpcService.java     |  77 +++--
 .../runtime/rpc/akka/AkkaRpcServiceUtils.java      |   4 +-
 .../flink/runtime/rpc/akka/SupervisorActor.java    | 360 +++++++++++++++++++++
 .../org/apache/flink/runtime/akka/AkkaUtils.scala  |   2 +-
 .../runtime/rpc/akka/AkkaActorSystemTest.java      |  89 +++++
 .../flink/runtime/rpc/akka/AkkaRpcActorTest.java   |  22 ++
 .../runtime/rpc/akka/SupervisorActorTest.java      | 249 ++++++++++++++
 8 files changed, 777 insertions(+), 47 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/EscalatingSupervisorStrategy.java
similarity index 61%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/akka/StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/akka/EscalatingSupervisorStrategy.java
index e2d8fd6..dc0832f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/EscalatingSupervisorStrategy.java
@@ -18,36 +18,23 @@
 
 package org.apache.flink.runtime.akka;
 
-import akka.actor.ActorKilledException;
 import akka.actor.OneForOneStrategy;
 import akka.actor.SupervisorStrategy;
 import akka.actor.SupervisorStrategyConfigurator;
 import akka.japi.pf.PFBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
- * Stopping supervisor strategy which logs {@link ActorKilledException} only 
on debug log level.
+ * Supervisor strategy which escalates every failure of a supervised actor to its parent.
  */
-public class StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy 
implements SupervisorStrategyConfigurator {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy.class);
+public class EscalatingSupervisorStrategy implements 
SupervisorStrategyConfigurator {
 
        @Override
        public SupervisorStrategy create() {
                return new OneForOneStrategy(
                        false,
                        new PFBuilder<Throwable, SupervisorStrategy.Directive>()
-                               .match(
-                                       Exception.class,
-                                       (Exception e) -> {
-                                               if (e instanceof 
ActorKilledException) {
-                                                       LOG.debug("Actor was 
killed. Stopping it now.", e);
-                                               } else {
-                                                       LOG.error("Actor failed 
with exception. Stopping it now.", e);
-                                               }
-                                               return 
SupervisorStrategy.Stop$.MODULE$;
-                                       })
+                               .matchAny(
+                                       (ignored) -> 
SupervisorStrategy.escalate())
                                .build());
        }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 87a7682..f3604de 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -32,10 +32,12 @@ import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcServer;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcRuntimeException;
 import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
 import org.apache.flink.runtime.rpc.messages.HandshakeSuccessMessage;
 import org.apache.flink.runtime.rpc.messages.RemoteHandshakeMessage;
 
+import akka.actor.AbstractActor;
 import akka.actor.ActorIdentity;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
@@ -105,6 +107,8 @@ public class AkkaRpcService implements RpcService {
 
        private final CompletableFuture<Void> terminationFuture;
 
+       private final ActorRef supervisor;
+
        private volatile boolean stopped;
 
        @VisibleForTesting
@@ -133,6 +137,12 @@ public class AkkaRpcService implements RpcService {
                terminationFuture = new CompletableFuture<>();
 
                stopped = false;
+
+               supervisor = startSupervisorActor();
+       }
+
+       private ActorRef startSupervisorActor() {
+               return SupervisorActor.startSupervisorActor(actorSystem);
        }
 
        public ActorSystem getActorSystem() {
@@ -201,32 +211,9 @@ public class AkkaRpcService implements RpcService {
        public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C 
rpcEndpoint) {
                checkNotNull(rpcEndpoint, "rpc endpoint");
 
-               CompletableFuture<Void> terminationFuture = new 
CompletableFuture<>();
-               final Props akkaRpcActorProps;
-
-               if (rpcEndpoint instanceof FencedRpcEndpoint) {
-                       akkaRpcActorProps = Props.create(
-                               FencedAkkaRpcActor.class,
-                               rpcEndpoint,
-                               terminationFuture,
-                               getVersion(),
-                               configuration.getMaximumFramesize());
-               } else {
-                       akkaRpcActorProps = Props.create(
-                               AkkaRpcActor.class,
-                               rpcEndpoint,
-                               terminationFuture,
-                               getVersion(),
-                               configuration.getMaximumFramesize());
-               }
-
-               ActorRef actorRef;
-
-               synchronized (lock) {
-                       checkState(!stopped, "RpcService is stopped");
-                       actorRef = actorSystem.actorOf(akkaRpcActorProps, 
rpcEndpoint.getEndpointId());
-                       actors.put(actorRef, rpcEndpoint);
-               }
+               final SupervisorActor.ActorRegistration actorRegistration = 
registerAkkaRpcActor(rpcEndpoint);
+               final ActorRef actorRef = actorRegistration.getActorRef();
+               final CompletableFuture<Void> actorTerminationFuture = 
actorRegistration.getTerminationFuture();
 
                LOG.info("Starting RPC endpoint for {} at {} .", 
rpcEndpoint.getClass().getName(), actorRef.path());
 
@@ -254,7 +241,7 @@ public class AkkaRpcService implements RpcService {
                                actorRef,
                                configuration.getTimeout(),
                                configuration.getMaximumFramesize(),
-                               terminationFuture,
+                               actorTerminationFuture,
                                ((FencedRpcEndpoint<?>) 
rpcEndpoint)::getFencingToken,
                                captureAskCallstacks);
 
@@ -266,7 +253,7 @@ public class AkkaRpcService implements RpcService {
                                actorRef,
                                configuration.getTimeout(),
                                configuration.getMaximumFramesize(),
-                               terminationFuture,
+                               actorTerminationFuture,
                                captureAskCallstacks);
                }
 
@@ -284,6 +271,40 @@ public class AkkaRpcService implements RpcService {
                return server;
        }
 
+       private <C extends RpcEndpoint & RpcGateway> SupervisorActor.ActorRegistration registerAkkaRpcActor(C rpcEndpoint) {
+               final Class<? extends AbstractActor> akkaRpcActorType;
+
+               if (rpcEndpoint instanceof FencedRpcEndpoint) {
+                       akkaRpcActorType = FencedAkkaRpcActor.class;
+               } else {
+                       akkaRpcActorType = AkkaRpcActor.class;
+               }
+               // the actor is started as a child of the supervisor actor, which monitors its termination
+               synchronized (lock) {
+                       checkState(!stopped, "RpcService is stopped");
+
+                       final SupervisorActor.StartAkkaRpcActorResponse startAkkaRpcActorResponse = SupervisorActor.startAkkaRpcActor(
+                               supervisor,
+                               actorTerminationFuture -> Props.create(
+                                       akkaRpcActorType,
+                                       rpcEndpoint,
+                                       actorTerminationFuture,
+                                       getVersion(),
+                                       configuration.getMaximumFramesize()),
+                               rpcEndpoint.getEndpointId());
+
+                       final SupervisorActor.ActorRegistration actorRegistration = startAkkaRpcActorResponse.orElseThrow(cause -> new AkkaRpcRuntimeException(
+                               String.format("Could not create the %s for %s.",
+                                       akkaRpcActorType.getSimpleName(),
+                                       rpcEndpoint.getEndpointId()),
+                               cause));
+
+                       actors.put(actorRegistration.getActorRef(), rpcEndpoint);
+
+                       return actorRegistration;
+               }
+       }
+
        @Override
        public <F extends Serializable> RpcServer fenceRpcServer(RpcServer 
rpcServer, F fencingToken) {
                if (rpcServer instanceof AkkaBasedEndpoint) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
index 1b50e45..5b464f4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
@@ -57,6 +57,8 @@ public class AkkaRpcServiceUtils {
        private static final String AKKA_TCP = "akka.tcp";
        private static final String AKKA_SSL_TCP = "akka.ssl.tcp";
 
+       static final String SUPERVISOR_NAME = "rpc";
+
        private static final String SIMPLE_AKKA_CONFIG_TEMPLATE =
                "akka {remote {netty.tcp {maximum-frame-size = %s}}}";
 
@@ -166,7 +168,7 @@ public class AkkaRpcServiceUtils {
 
                final String hostPort = 
NetUtils.unresolvedHostAndPortToNormalizedString(hostname, port);
 
-               return String.format("%s://flink@%s/user/%s", protocolPrefix, 
hostPort, endpointName);
+               return String.format("%s://flink@%s/user/%s/%s", 
protocolPrefix, hostPort, SUPERVISOR_NAME, endpointName);
        }
 
        /**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/SupervisorActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/SupervisorActor.java
new file mode 100644
index 0000000..e74ad67
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/SupervisorActor.java
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException;
+import 
org.apache.flink.runtime.rpc.akka.exceptions.AkkaUnknownMessageException;
+import org.apache.flink.util.Preconditions;
+
+import akka.AkkaException;
+import akka.actor.AbstractActor;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.ChildRestartStats;
+import akka.actor.Props;
+import akka.actor.SupervisorStrategy;
+import akka.japi.pf.DeciderBuilder;
+import akka.pattern.Patterns;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.function.Function;
+
+import scala.PartialFunction;
+import scala.collection.Iterable;
+
+/**
+ * Supervisor actor which is responsible for starting {@link AkkaRpcActor} instances and monitoring
+ * when the actors have terminated.
+ */
+class SupervisorActor extends AbstractActor {
+
+       private static final Logger LOG = LoggerFactory.getLogger(SupervisorActor.class);
+
+       private final Map<ActorRef, AkkaRpcActorRegistration> registeredAkkaRpcActors;
+
+       SupervisorActor() {
+               this.registeredAkkaRpcActors = new HashMap<>();
+       }
+
+       @Override
+       public Receive createReceive() {
+               return receiveBuilder()
+                       .match(StartAkkaRpcActor.class, this::handleStartAkkaRpcActor)
+                       .matchAny(this::handleUnknownMessage)
+                       .build();
+       }
+
+       @Override
+       public void postStop() throws Exception {
+               LOG.debug("Stopping supervisor actor.");
+
+               super.postStop();
+
+               for (AkkaRpcActorRegistration actorRegistration : registeredAkkaRpcActors.values()) {
+                       terminateAkkaRpcActorOnStop(actorRegistration);
+               }
+
+               registeredAkkaRpcActors.clear();
+       }
+
+       @Override
+       public SupervisorActorSupervisorStrategy supervisorStrategy() {
+               return new SupervisorActorSupervisorStrategy();
+       }
+
+       private void terminateAkkaRpcActorOnStop(AkkaRpcActorRegistration akkaRpcActorRegistration) {
+               akkaRpcActorRegistration.terminateExceptionally(new AkkaRpcException(
+                       String.format("Unexpected closing of %s with name %s.", getClass().getSimpleName(), akkaRpcActorRegistration.getEndpointId())));
+       }
+
+       private void handleStartAkkaRpcActor(StartAkkaRpcActor startAkkaRpcActor) {
+               final String endpointId = startAkkaRpcActor.getEndpointId();
+               final AkkaRpcActorRegistration akkaRpcActorRegistration = new AkkaRpcActorRegistration(endpointId);
+
+               try {
+                       final Props akkaRpcActorProps = startAkkaRpcActor.getPropsFactory().create(akkaRpcActorRegistration.getInternalTerminationFuture());
+                       LOG.debug("Starting {} with name {}.", akkaRpcActorProps.actorClass().getSimpleName(), endpointId);
+                       final ActorRef actorRef = getContext().actorOf(akkaRpcActorProps, endpointId);
+                       registeredAkkaRpcActors.put(actorRef, akkaRpcActorRegistration);
+                       getSender().tell(StartAkkaRpcActorResponse.success(
+                               ActorRegistration.create(
+                                       actorRef,
+                                       akkaRpcActorRegistration.getExternalTerminationFuture())),
+                               getSelf());
+               } catch (AkkaException akkaException) {
+                       getSender().tell(StartAkkaRpcActorResponse.failure(akkaException), getSelf());
+               } catch (RuntimeException runtimeException) {
+                       // reply before failing: startAkkaRpcActor waits for this answer with an infinite timeout
+                       getSender().tell(StartAkkaRpcActorResponse.failure(runtimeException), getSelf());
+                       throw runtimeException;
+               }
+       }
+
+       private void akkaRpcActorTerminated(ActorRef actorRef) {
+               final AkkaRpcActorRegistration actorRegistration = removeAkkaRpcActor(actorRef);
+
+               LOG.debug("AkkaRpcActor {} has terminated.", actorRef.path());
+               actorRegistration.terminate();
+       }
+
+       private void akkaRpcActorFailed(ActorRef actorRef, Throwable cause) {
+               LOG.warn("AkkaRpcActor {} has failed. Shutting it down now.", actorRef.path(), cause);
+
+               for (Map.Entry<ActorRef, AkkaRpcActorRegistration> registeredAkkaRpcActor : registeredAkkaRpcActors.entrySet()) {
+                       final ActorRef otherActorRef = registeredAkkaRpcActor.getKey();
+                       if (otherActorRef.equals(actorRef)) {
+                               final AkkaRpcException error = new AkkaRpcException(String.format("Stopping actor %s because it failed.", actorRef.path()), cause);
+                               registeredAkkaRpcActor.getValue().markFailed(error);
+                       } else {
+                               final AkkaRpcException siblingException = new AkkaRpcException(String.format("Stopping actor %s because its sibling %s has failed.", otherActorRef.path(), actorRef.path()));
+                               registeredAkkaRpcActor.getValue().markFailed(siblingException);
+                       }
+               }
+
+               getContext().getSystem().terminate();
+       }
+
+       private AkkaRpcActorRegistration removeAkkaRpcActor(ActorRef actorRef) {
+               return Optional.ofNullable(registeredAkkaRpcActors.remove(actorRef))
+                       .orElseThrow(() -> new IllegalStateException(String.format("Could not find actor %s.", actorRef.path())));
+       }
+
+       private void handleUnknownMessage(Object msg) {
+               final AkkaUnknownMessageException cause = new AkkaUnknownMessageException(String.format("Cannot handle unknown message %s.", msg));
+               getSender().tell(new akka.actor.Status.Failure(cause), getSelf());
+               throw cause;
+       }
+
+       public static String getActorName() {
+               return AkkaRpcServiceUtils.SUPERVISOR_NAME;
+       }
+
+       public static ActorRef startSupervisorActor(ActorSystem actorSystem) {
+               final Props supervisorProps = Props.create(SupervisorActor.class);
+               return actorSystem.actorOf(supervisorProps, getActorName());
+       }
+
+       public static StartAkkaRpcActorResponse startAkkaRpcActor(
+                       ActorRef supervisor,
+                       StartAkkaRpcActor.PropsFactory propsFactory,
+                       String endpointId) {
+               return Patterns.ask(
+                               supervisor,
+                               createStartAkkaRpcActorMessage(
+                                       propsFactory,
+                                       endpointId),
+                               RpcUtils.INF_DURATION).toCompletableFuture()
+                       .thenApply(SupervisorActor.StartAkkaRpcActorResponse.class::cast)
+                       .join();
+       }
+
+       public static StartAkkaRpcActor createStartAkkaRpcActorMessage(
+                       StartAkkaRpcActor.PropsFactory propsFactory,
+                       String endpointId) {
+               return StartAkkaRpcActor.create(propsFactory, endpointId);
+       }
+
+       // -----------------------------------------------------------------------------
+       // Internal classes
+       // -----------------------------------------------------------------------------
+
+       private final class SupervisorActorSupervisorStrategy extends SupervisorStrategy {
+
+               @Override
+               public PartialFunction<Throwable, Directive> decider() {
+                       return DeciderBuilder.match(
+                               Exception.class, e -> SupervisorStrategy.stop()
+                       ).build();
+               }
+
+               @Override
+               public boolean loggingEnabled() {
+                       return false;
+               }
+
+               @Override
+               public void handleChildTerminated(akka.actor.ActorContext context, ActorRef child, Iterable<ActorRef> children) {
+                       akkaRpcActorTerminated(child);
+               }
+
+               @Override
+               public void processFailure(akka.actor.ActorContext context, boolean restart, ActorRef child, Throwable cause, ChildRestartStats stats, Iterable<ChildRestartStats> children) {
+                       Preconditions.checkArgument(!restart, "The supervisor strategy should never restart an actor.");
+
+                       akkaRpcActorFailed(child, cause);
+               }
+       }
+
+       private static final class AkkaRpcActorRegistration {
+               private final String endpointId;
+
+               private final CompletableFuture<Void> internalTerminationFuture;
+
+               private final CompletableFuture<Void> externalTerminationFuture;
+
+               @Nullable
+               private Throwable errorCause;
+
+               private AkkaRpcActorRegistration(String endpointId) {
+                       this.endpointId = endpointId;
+                       internalTerminationFuture = new CompletableFuture<>();
+                       externalTerminationFuture = new CompletableFuture<>();
+                       errorCause = null;
+               }
+
+               private CompletableFuture<Void> getInternalTerminationFuture() {
+                       return internalTerminationFuture;
+               }
+
+               private CompletableFuture<Void> getExternalTerminationFuture() {
+                       return externalTerminationFuture;
+               }
+
+               private String getEndpointId() {
+                       return endpointId;
+               }
+
+               private void terminate() {
+                       CompletableFuture<Void> terminationFuture = internalTerminationFuture;
+
+                       if (errorCause != null) {
+                               if (!internalTerminationFuture.completeExceptionally(errorCause)) {
+                                       // we have another failure reason -> let's add it
+                                       terminationFuture = internalTerminationFuture.handle(
+                                               (ignored, throwable) -> {
+                                                       if (throwable != null) {
+                                                               errorCause.addSuppressed(throwable);
+                                                       }
+
+                                                       throw new CompletionException(errorCause);
+                                               });
+                               }
+                       } else {
+                               internalTerminationFuture.completeExceptionally(
+                                       new AkkaRpcException(
+                                               String.format("RpcEndpoint %s did not complete the internal termination future.", endpointId)));
+                       }
+
+                       FutureUtils.forward(terminationFuture, externalTerminationFuture);
+               }
+
+               private void terminateExceptionally(Throwable cause) {
+                       externalTerminationFuture.completeExceptionally(cause);
+               }
+
+               public void markFailed(Throwable cause) {
+                       if (errorCause == null) {
+                               errorCause = cause;
+                       } else {
+                               errorCause.addSuppressed(cause);
+                       }
+               }
+       }
+
+       // -----------------------------------------------------------------------------
+       // Messages
+       // -----------------------------------------------------------------------------
+
+       static final class StartAkkaRpcActor {
+               private final PropsFactory propsFactory;
+               private final String endpointId;
+
+               private StartAkkaRpcActor(PropsFactory propsFactory, String endpointId) {
+                       this.propsFactory = propsFactory;
+                       this.endpointId = endpointId;
+               }
+
+               public String getEndpointId() {
+                       return endpointId;
+               }
+
+               public PropsFactory getPropsFactory() {
+                       return propsFactory;
+               }
+
+               private static StartAkkaRpcActor create(PropsFactory propsFactory, String endpointId) {
+                       return new StartAkkaRpcActor(propsFactory, endpointId);
+               }
+
+               interface PropsFactory {
+                       Props create(CompletableFuture<Void> terminationFuture);
+               }
+       }
+
+       static final class ActorRegistration {
+               private final ActorRef actorRef;
+               private final CompletableFuture<Void> terminationFuture;
+
+               private ActorRegistration(ActorRef actorRef, CompletableFuture<Void> terminationFuture) {
+                       this.actorRef = actorRef;
+                       this.terminationFuture = terminationFuture;
+               }
+
+               public ActorRef getActorRef() {
+                       return actorRef;
+               }
+
+               public CompletableFuture<Void> getTerminationFuture() {
+                       return terminationFuture;
+               }
+
+               public static ActorRegistration create(ActorRef actorRef, CompletableFuture<Void> terminationFuture) {
+                       return new ActorRegistration(actorRef, terminationFuture);
+               }
+       }
+
+       static final class StartAkkaRpcActorResponse {
+               @Nullable
+               private final ActorRegistration actorRegistration;
+
+               @Nullable
+               private final Throwable error;
+
+               private StartAkkaRpcActorResponse(@Nullable ActorRegistration actorRegistration, @Nullable Throwable error) {
+                       this.actorRegistration = actorRegistration;
+                       this.error = error;
+               }
+
+               public <X extends Throwable> ActorRegistration orElseThrow(Function<? super Throwable, ? extends X> throwableFunction) throws X {
+                       if (actorRegistration != null) {
+                               return actorRegistration;
+                       } else {
+                               throw throwableFunction.apply(error);
+                       }
+               }
+
+               public static StartAkkaRpcActorResponse success(ActorRegistration actorRegistration) {
+                       return new StartAkkaRpcActorResponse(actorRegistration, null);
+               }
+
+               public static StartAkkaRpcActorResponse failure(Throwable error) {
+                       return new StartAkkaRpcActorResponse(null, error);
+               }
+       }
+}
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index efa15c2..adf0ebd 100755
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -273,7 +273,7 @@ object AkkaUtils {
 
     val logLevel = getLogLevel
 
-    val supervisorStrategy = 
classOf[StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy]
+    val supervisorStrategy = classOf[EscalatingSupervisorStrategy]
       .getCanonicalName
 
     val config =
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaActorSystemTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaActorSystemTest.java
new file mode 100644
index 0000000..6d3830a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaActorSystemTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import akka.actor.AbstractActor;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.Terminated;
+import akka.japi.pf.ReceiveBuilder;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Verifies that an {@link ActorSystem} created via {@link AkkaUtils} terminates itself when one of
+ * its actors fails.
+ */
+public class AkkaActorSystemTest extends TestLogger {
+
+       @Test
+       public void shutsDownOnActorFailure() {
+               final ActorSystem system = AkkaUtils.createLocalActorSystem(new Configuration());
+
+               try {
+                       // obtain the termination future before anything can fail
+                       final CompletableFuture<Terminated> systemTerminated = system.getWhenTerminated().toCompletableFuture();
+
+                       final FlinkException failureCause = new FlinkException("Flink test exception");
+                       final ActorRef failingActor = system.actorOf(Props.create(SimpleActor.class));
+
+                       failingActor.tell(Fail.exceptionally(failureCause), ActorRef.noSender());
+
+                       // returns only once the actor failure has shut the ActorSystem down
+                       systemTerminated.join();
+               } finally {
+                       AkkaUtils.terminateActorSystem(system).join();
+               }
+       }
+
+       /** Actor that throws a {@link RuntimeException} whenever it receives a {@link Fail} message. */
+       private static final class SimpleActor extends AbstractActor {
+
+               @Override
+               public Receive createReceive() {
+                       return ReceiveBuilder.create()
+                               .match(Fail.class, this::onFail)
+                               .build();
+               }
+
+               private void onFail(Fail message) {
+                       throw new RuntimeException(message.getErrorCause());
+               }
+       }
+
+       /** Message instructing {@link SimpleActor} to fail with the wrapped cause. */
+       private static final class Fail {
+               private final Throwable cause;
+
+               private Fail(Throwable cause) {
+                       this.cause = cause;
+               }
+
+               private Throwable getErrorCause() {
+                       return cause;
+               }
+
+               private static Fail exceptionally(Throwable cause) {
+                       return new Fail(cause);
+               }
+       }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index 64270c4..404f5c1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -42,6 +42,8 @@ import org.hamcrest.core.Is;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
@@ -52,6 +54,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -64,6 +67,8 @@ import static org.junit.Assert.fail;
  */
 public class AkkaRpcActorTest extends TestLogger {
 
+       private static final Logger LOG = 
LoggerFactory.getLogger(AkkaRpcActorTest.class);
+
        // 
------------------------------------------------------------------------
        //  shared test members
        // 
------------------------------------------------------------------------
@@ -428,6 +433,23 @@ public class AkkaRpcActorTest extends TestLogger {
                }
        }
 
+       /**
+        * Checks that once an endpoint has been closed, a new endpoint can be started under the same
+        * name and obtains the same address.
+        */
+       @Test
+       public void canReuseEndpointNameAfterTermination() throws Exception {
+               final String sharedName = "not_unique";
+
+               try (SimpleRpcEndpoint firstEndpoint = new SimpleRpcEndpoint(akkaRpcService, sharedName)) {
+                       firstEndpoint.start();
+
+                       // wait for the first endpoint's close future before reusing its name
+                       firstEndpoint.closeAsync().join();
+
+                       try (SimpleRpcEndpoint secondEndpoint = new SimpleRpcEndpoint(akkaRpcService, sharedName)) {
+                               secondEndpoint.start();
+
+                               final String actualAddress = secondEndpoint.getAddress();
+                               assertThat(actualAddress, is(equalTo(firstEndpoint.getAddress())));
+                       }
+               }
+       }
+
        // 
------------------------------------------------------------------------
        //  Test Actors and Interfaces
        // 
------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/SupervisorActorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/SupervisorActorTest.java
new file mode 100644
index 0000000..ecf68ee
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/SupervisorActorTest.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import akka.actor.AbstractActor;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.Terminated;
+import akka.japi.pf.ReceiveBuilder;
+import org.junit.Rule;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link SupervisorActor}, in particular how it completes the termination futures of
+ * the actors it starts.
+ */
+public class SupervisorActorTest extends TestLogger {
+
+       @Rule
+       public final ActorSystemResource actorSystemResource = ActorSystemResource.defaultConfiguration();
+
+       @Test
+       public void completesTerminationFutureIfActorStops() {
+               final ActorRef supervisor = SupervisorActor.startSupervisorActor(actorSystemResource.getActorSystem());
+               final SupervisorActor.ActorRegistration registration = startAkkaRpcActor(supervisor, "foobar");
+               final CompletableFuture<Void> terminationFuture = pendingTerminationFuture(registration);
+
+               registration.getActorRef().tell(TerminateWithFutureCompletion.normal(), ActorRef.noSender());
+
+               terminationFuture.join();
+       }
+
+       @Test
+       public void completesTerminationFutureExceptionallyIfActorStopsExceptionally() throws Exception {
+               final ActorRef supervisor = SupervisorActor.startSupervisorActor(actorSystemResource.getActorSystem());
+               final SupervisorActor.ActorRegistration registration = startAkkaRpcActor(supervisor, "foobar");
+               final CompletableFuture<Void> terminationFuture = pendingTerminationFuture(registration);
+
+               final FlinkException cause = new FlinkException("Test cause.");
+               registration.getActorRef().tell(TerminateWithFutureCompletion.exceptionally(cause), ActorRef.noSender());
+
+               assertContainsCause(expectExceptionalCompletion(terminationFuture), cause);
+       }
+
+       @Test
+       public void completesTerminationFutureExceptionallyIfActorStopsWithoutReason() throws InterruptedException {
+               final ActorRef supervisor = SupervisorActor.startSupervisorActor(actorSystemResource.getActorSystem());
+               final SupervisorActor.ActorRegistration registration = startAkkaRpcActor(supervisor, "foobar");
+               final CompletableFuture<Void> terminationFuture = pendingTerminationFuture(registration);
+
+               // the actor stops without completing the future it was handed
+               registration.getActorRef().tell(Terminate.INSTANCE, ActorRef.noSender());
+
+               expectExceptionalCompletion(terminationFuture);
+       }
+
+       @Test
+       public void completesTerminationFutureExceptionallyIfActorFails() throws Exception {
+               final ActorSystem actorSystem = actorSystemResource.getActorSystem();
+               final ActorRef supervisor = SupervisorActor.startSupervisorActor(actorSystem);
+               final SupervisorActor.ActorRegistration registration = startAkkaRpcActor(supervisor, "foobar");
+               final CompletableFuture<Void> terminationFuture = pendingTerminationFuture(registration);
+
+               final CompletableFuture<Terminated> actorSystemTerminationFuture = actorSystem.getWhenTerminated().toCompletableFuture();
+
+               final FlinkException cause = new FlinkException("Test cause.");
+               registration.getActorRef().tell(Fail.exceptionally(cause), ActorRef.noSender());
+
+               assertContainsCause(expectExceptionalCompletion(terminationFuture), cause);
+
+               // the failing actor is expected to take down the supervisor and thereby the actor system
+               actorSystemTerminationFuture.join();
+       }
+
+       @Test
+       public void completesTerminationFutureOfSiblingsIfActorFails() throws Exception {
+               final ActorRef supervisor = SupervisorActor.startSupervisorActor(actorSystemResource.getActorSystem());
+               final SupervisorActor.ActorRegistration failingRegistration = startAkkaRpcActor(supervisor, "foobar1");
+               final SupervisorActor.ActorRegistration siblingRegistration = startAkkaRpcActor(supervisor, "foobar2");
+               final CompletableFuture<Void> siblingTerminationFuture = pendingTerminationFuture(siblingRegistration);
+
+               final FlinkException cause = new FlinkException("Test cause.");
+               failingRegistration.getActorRef().tell(Fail.exceptionally(cause), ActorRef.noSender());
+
+               expectExceptionalCompletion(siblingTerminationFuture);
+       }
+
+       // ------------------------------------------------------------------------
+       //  Helpers
+       // ------------------------------------------------------------------------
+
+       /** Starts a {@link SimpleActor} under the given supervisor, failing the test if that is not possible. */
+       private SupervisorActor.ActorRegistration startAkkaRpcActor(ActorRef supervisor, String endpointId) {
+               return SupervisorActor.startAkkaRpcActor(
+                               supervisor,
+                               terminationFuture -> Props.create(SimpleActor.class, terminationFuture),
+                               endpointId)
+                       .orElseThrow(cause -> new AssertionError("Expected the start to succeed.", cause));
+       }
+
+       /** Returns the registration's termination future after asserting that it has not completed yet. */
+       private static CompletableFuture<Void> pendingTerminationFuture(SupervisorActor.ActorRegistration registration) {
+               final CompletableFuture<Void> terminationFuture = registration.getTerminationFuture();
+               assertThat(terminationFuture.isDone(), is(false));
+               return terminationFuture;
+       }
+
+       /**
+        * Blocks on {@code future} and returns the {@link ExecutionException} it completed with. Fails
+        * the test if the future completes normally.
+        */
+       private static ExecutionException expectExceptionalCompletion(CompletableFuture<Void> future) throws InterruptedException {
+               ExecutionException failure = null;
+               try {
+                       future.get();
+                       fail("Expected the termination future being completed exceptionally");
+               } catch (ExecutionException expected) {
+                       failure = expected;
+               }
+               return failure;
+       }
+
+       /**
+        * Throws a {@link FlinkException} unless {@link ExceptionUtils#findThrowable} locates
+        * {@code expectedCause} within {@code failure}.
+        */
+       private static void assertContainsCause(ExecutionException failure, Throwable expectedCause) throws FlinkException {
+               ExceptionUtils.findThrowable(failure, e -> e.equals(expectedCause))
+                       .orElseThrow(() -> new FlinkException("Unexpected exception", failure));
+       }
+
+       // ------------------------------------------------------------------------
+       //  Test actor and messages
+       // ------------------------------------------------------------------------
+
+       /** Actor that stops, completes its termination future, or fails depending on the received message. */
+       private static final class SimpleActor extends AbstractActor {
+
+               private final CompletableFuture<Void> terminationFuture;
+
+               private SimpleActor(CompletableFuture<Void> terminationFuture) {
+                       this.terminationFuture = terminationFuture;
+               }
+
+               @Override
+               public Receive createReceive() {
+                       return ReceiveBuilder.create()
+                               .match(Terminate.class, ignored -> stopSelf())
+                               .match(TerminateWithFutureCompletion.class, this::completeFutureAndStop)
+                               .match(Fail.class, this::failWith)
+                               .build();
+               }
+
+               private void failWith(Fail message) {
+                       throw new RuntimeException(message.getCause());
+               }
+
+               private void completeFutureAndStop(TerminateWithFutureCompletion message) {
+                       final Throwable terminationError = message.getTerminationError();
+
+                       if (terminationError != null) {
+                               terminationFuture.completeExceptionally(terminationError);
+                       } else {
+                               terminationFuture.complete(null);
+                       }
+
+                       stopSelf();
+               }
+
+               private void stopSelf() {
+                       getContext().stop(getSelf());
+               }
+       }
+
+       /** Message asking {@link SimpleActor} to stop without touching its termination future. */
+       private static final class Terminate {
+               private static final Terminate INSTANCE = new Terminate();
+
+               private Terminate() {}
+       }
+
+       /** Message asking {@link SimpleActor} to complete its termination future and then stop. */
+       private static final class TerminateWithFutureCompletion {
+               /** Error to complete the future with, or {@code null} for normal completion. */
+               @Nullable
+               private final Throwable terminationError;
+
+               private TerminateWithFutureCompletion(@Nullable Throwable terminationError) {
+                       this.terminationError = terminationError;
+               }
+
+               @Nullable
+               private Throwable getTerminationError() {
+                       return terminationError;
+               }
+
+               private static TerminateWithFutureCompletion normal() {
+                       return new TerminateWithFutureCompletion(null);
+               }
+
+               private static TerminateWithFutureCompletion exceptionally(Throwable cause) {
+                       return new TerminateWithFutureCompletion(cause);
+               }
+       }
+
+       /** Message asking {@link SimpleActor} to throw a {@link RuntimeException} wrapping the given cause. */
+       private static final class Fail {
+               private final Throwable cause;
+
+               private Fail(Throwable cause) {
+                       this.cause = cause;
+               }
+
+               private Throwable getCause() {
+                       return cause;
+               }
+
+               private static Fail exceptionally(Throwable cause) {
+                       return new Fail(cause);
+               }
+       }
+}

Reply via email to