Repository: flink
Updated Branches:
  refs/heads/master 753763ad1 -> bdf5c486f


[FLINK-3347] [akka] Add QuarantineMonitor which shuts a quarantined actor 
system and JVM down

The QuarantineMonitor subscribes to the actor system's event bus and listens
for AssociationErrorEvents. These events are generated when the actor system
has quarantined another actor system or when it has been quarantined by another
actor system. In either case, the actor system is shut down, killing all
actors, and then the JVM is terminated.

Disable the quarantine monitor by default

This closes #2696.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bdf5c486
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bdf5c486
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bdf5c486

Branch: refs/heads/master
Commit: bdf5c486f50b262dc09678a6c15790c54aa85240
Parents: 753763a
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Thu Oct 27 00:24:12 2016 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Fri Apr 28 11:50:54 2017 +0200

----------------------------------------------------------------------
 .../flink/configuration/TaskManagerOptions.java |   9 +
 .../runtime/akka/DefaultQuarantineHandler.java  |  76 +++++
 .../flink/runtime/akka/QuarantineHandler.java   |  46 +++
 .../flink/runtime/akka/QuarantineMonitor.java   | 100 ++++++
 .../flink/runtime/taskmanager/TaskManager.scala |  20 +-
 .../runtime/akka/QuarantineMonitorTest.java     | 326 +++++++++++++++++++
 6 files changed, 572 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bdf5c486/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index 777ee21..e915c0b 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -156,6 +156,15 @@ public class TaskManagerOptions {
                        key("task.checkpoint.alignment.max-size")
                        .defaultValue(-1L);
 
+       /**
+        * Whether the quarantine monitor for task managers shall be started. 
The quarantine monitor
+        * shuts down the actor system if it detects that it has quarantined 
another actor system
+        * or if it has been quarantined by another actor system.
+        */
+       public static final ConfigOption<Boolean> EXIT_ON_FATAL_AKKA_ERROR =
+               key("taskmanager.exit-on-fatal-akka-error")
+               .defaultValue(false);
+
        // 
------------------------------------------------------------------------
 
        /** Not intended to be instantiated */

http://git-wip-us.apache.org/repos/asf/flink/blob/bdf5c486/flink-runtime/src/main/java/org/apache/flink/runtime/akka/DefaultQuarantineHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/DefaultQuarantineHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/DefaultQuarantineHandler.java
new file mode 100644
index 0000000..378cb25
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/DefaultQuarantineHandler.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.akka;
+
+import akka.actor.ActorSystem;
+import akka.actor.Address;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Default quarantine handler which logs the quarantining events, then shuts 
down the given
+ * actor system by sending Kill to all actors and then shutting the JVM down 
with the given
+ * exit code.
+ */
+public class DefaultQuarantineHandler implements QuarantineHandler {
+
+       private final FiniteDuration timeout;
+       private final int exitCode;
+       private final Logger log;
+
+       public DefaultQuarantineHandler(Time timeout, int exitCode, Logger log) 
{
+               Preconditions.checkNotNull(timeout);
+               this.timeout = new FiniteDuration(timeout.getSize(), 
timeout.getUnit());
+               this.exitCode = exitCode;
+               this.log = Preconditions.checkNotNull(log);
+       }
+
+       @Override
+       public void wasQuarantinedBy(String remoteSystem, ActorSystem 
actorSystem) {
+               Address actorSystemAddress = AkkaUtils.getAddress(actorSystem);
+               log.error("The actor system {} has been quarantined by {}. 
Shutting the actor system " +
+                       "down to be able to reestablish a connection!", 
actorSystemAddress, remoteSystem);
+
+               shutdownActorSystem(actorSystem);
+       }
+
+       @Override
+       public void hasQuarantined(String remoteSystem, ActorSystem 
actorSystem) {
+               Address actorSystemAddress = AkkaUtils.getAddress(actorSystem);
+               log.error("The actor system {} has quarantined the remote actor 
system {}. Shutting " +
+                       "the actor system down to be able to reestablish a 
connection!", actorSystemAddress, remoteSystem);
+
+               shutdownActorSystem(actorSystem);
+       }
+
+       private void shutdownActorSystem(ActorSystem actorSystem) {
+               // shut the actor system down
+               actorSystem.shutdown();
+
+               try {
+                       // give it some time to complete the shutdown
+                       actorSystem.awaitTermination(timeout);
+               } finally {
+                       // now let's crash the JVM
+                       System.exit(exitCode);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bdf5c486/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineHandler.java
new file mode 100644
index 0000000..21623e8
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineHandler.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.akka;
+
+import akka.actor.ActorSystem;
+
+/**
+ * Callback interface for the {@link QuarantineMonitor} which is called in 
case the actor system
+ * has been quarantined or has quarantined another system.
+ */
+public interface QuarantineHandler {
+
+       /**
+        * Callback when the given actor system was quarantined by the given 
remote actor system.
+        *
+        * @param remoteSystem is the address of the remote actor system which 
has quarantined this
+        *                     actor system
+        * @param actorSystem which has been quarantined
+        */
+       void wasQuarantinedBy(final String remoteSystem, final ActorSystem 
actorSystem);
+
+       /**
+        * Callback when the given actor system has quarantined the given 
remote actor system.
+        *
+        * @param remoteSystem is the address of the remote actor system which 
has been quarantined
+        *                     by our actor system
+        * @param actorSystem which has quarantined the other actor system
+        */
+       void hasQuarantined(final String remoteSystem, final ActorSystem 
actorSystem);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bdf5c486/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineMonitor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineMonitor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineMonitor.java
new file mode 100644
index 0000000..de82f29
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineMonitor.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.akka;
+
+import akka.actor.UntypedActor;
+import akka.remote.AssociationErrorEvent;
+import akka.remote.transport.Transport;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * The quarantine monitor subscribes to the event bus of the actor system in 
which it was started.
+ * It listens to {@link AssociationErrorEvent}s, which indicate whether we got 
quarantined
+ * or have quarantined another remote actor system. If the actor detects that the 
actor system has been
+ * quarantined or has quarantined another system, then the {@link 
QuarantineHandler} is called.
+ *
+ * IMPORTANT: The implementation is highly specific to Akka 2.3.7. With a 
different version the
+ * quarantine state might be detected differently.
+ */
+public class QuarantineMonitor extends UntypedActor {
+
+       private static final Pattern pattern = Pattern.compile("^Invalid 
address:\\s+(.*)$");
+
+       private static final String QUARANTINE_MSG = "The remote system has a 
UID that has been quarantined. Association aborted.";
+       private static final String QUARANTINED_MSG = "The remote system has 
quarantined this system. No further associations to the remote system are 
possible until this system is restarted.";
+
+       private final QuarantineHandler handler;
+       private final Logger log;
+
+       public QuarantineMonitor(QuarantineHandler handler, Logger log) {
+               this.handler = Preconditions.checkNotNull(handler);
+               this.log = Preconditions.checkNotNull(log);
+       }
+
+       @Override
+       public void preStart() {
+               getContext().system().eventStream().subscribe(getSelf(), 
AssociationErrorEvent.class);
+       }
+
+       @Override
+       public void onReceive(Object message) throws Exception {
+               if (message instanceof AssociationErrorEvent) {
+                       AssociationErrorEvent associationErrorEvent = 
(AssociationErrorEvent) message;
+
+                       // IMPORTANT: The check for the quarantining event is 
highly specific to Akka 2.3.7
+                       // and can change with a different Akka version.
+                       // It assumes the following:
+                       // 
AssociationErrorEvent(InvalidAssociation(InvalidAssociationException(QUARANTINE(D)_MSG))
+                       if (associationErrorEvent.getCause() != null) {
+                               Throwable invalidAssociation = 
associationErrorEvent.getCause();
+                               Matcher matcher = 
pattern.matcher(invalidAssociation.getMessage());
+
+                               final String remoteSystem;
+
+                               if (matcher.find()) {
+                                       remoteSystem = matcher.group(1);
+                               } else {
+                                       remoteSystem = "Unknown";
+                               }
+
+                               if (invalidAssociation.getCause() instanceof 
Transport.InvalidAssociationException) {
+                                       Transport.InvalidAssociationException 
invalidAssociationException = (Transport.InvalidAssociationException) 
invalidAssociation.getCause();
+
+                                       // don't hate the player, hate the 
game! That's the only way to find out if we
+                                       // got quarantined or quarantined 
another actor system in Akka 2.3.7
+                                       if 
(QUARANTINE_MSG.equals(invalidAssociationException.getMessage())) {
+                                               
handler.hasQuarantined(remoteSystem, getContext().system());
+                                       } else if 
(QUARANTINED_MSG.equals(invalidAssociationException.getMessage())) {
+                                               
handler.wasQuarantinedBy(remoteSystem, getContext().system());
+                                       } else {
+                                               log.debug("The invalid 
association exception's message could not be matched.", associationErrorEvent);
+                                       }
+                               } else {
+                                       log.debug("The association error 
event's root cause is not of type {}.", 
Transport.InvalidAssociationException.class.getSimpleName(), 
associationErrorEvent);
+                               }
+                       } else {
+                               log.debug("Received association error event 
which did not contain a cause.", associationErrorEvent);
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bdf5c486/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 2e8a6fa..97e55f0 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -28,15 +28,13 @@ import java.util.{Collections, UUID}
 import _root_.akka.actor._
 import _root_.akka.pattern.ask
 import _root_.akka.util.Timeout
-
 import grizzled.slf4j.Logger
-
 import org.apache.commons.lang3.exception.ExceptionUtils
-
+import org.apache.flink.api.common.time.Time
 import org.apache.flink.configuration._
 import org.apache.flink.core.fs.FileSystem
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
-import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.akka.{AkkaUtils, DefaultQuarantineHandler, 
QuarantineMonitor}
 import org.apache.flink.runtime.blob.{BlobCache, BlobClient, BlobService}
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager
 import org.apache.flink.runtime.clusterframework.messages.StopCluster
@@ -66,7 +64,7 @@ import org.apache.flink.runtime.metrics.{MetricRegistry => 
FlinkMetricRegistry}
 import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.security.SecurityUtils
 import org.apache.flink.runtime.security.SecurityUtils.SecurityConfiguration
-import org.apache.flink.runtime.taskexecutor.{TaskManagerServices, 
TaskManagerServicesConfiguration, TaskManagerConfiguration}
+import org.apache.flink.runtime.taskexecutor.{TaskManagerConfiguration, 
TaskManagerServices, TaskManagerServicesConfiguration}
 import org.apache.flink.runtime.util._
 import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, 
LogMessages}
 
@@ -1794,6 +1792,18 @@ object TaskManager {
         Props(classOf[ProcessReaper], taskManager, LOG.logger, 
RUNTIME_FAILURE_RETURN_CODE),
         "TaskManager_Process_Reaper")
 
+      if 
(configuration.getBoolean(TaskManagerOptions.EXIT_ON_FATAL_AKKA_ERROR)) {
+        val quarantineHandler = new DefaultQuarantineHandler(
+          Time.milliseconds(AkkaUtils.getTimeout(configuration).toMillis),
+          RUNTIME_FAILURE_RETURN_CODE,
+          LOG.logger)
+
+        LOG.debug("Starting TaskManager quarantine monitor")
+        taskManagerSystem.actorOf(
+          Props(classOf[QuarantineMonitor], quarantineHandler, LOG.logger)
+        )
+      }
+
       // if desired, start the logging daemon that periodically logs the
       // memory usage information
       if (LOG.isInfoEnabled && configuration.getBoolean(

http://git-wip-us.apache.org/repos/asf/flink/blob/bdf5c486/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
new file mode 100644
index 0000000..97309a4
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.akka;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Address;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.dispatch.OnComplete;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.flink.runtime.concurrent.CompletableFuture;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+public class QuarantineMonitorTest extends TestLogger {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(QuarantineMonitorTest.class);
+
+       private static final FiniteDuration zeroDelay = new FiniteDuration(0L, 
TimeUnit.SECONDS);
+
+       // we need two actor systems because we're quarantining one of them
+       private static ActorSystem actorSystem1;
+       private ActorSystem actorSystem2;
+
+       @BeforeClass
+       public static void setup() {
+               Properties properties = new Properties();
+               
properties.setProperty("akka.remote.watch-failure-detector.threshold", 
"0.00001");
+               
properties.setProperty("akka.remote.watch-failure-detector.heartbeat-interval", 
"1 ms");
+               
properties.setProperty("akka.remote.watch-failure-detector.acceptable-heartbeat-pause",
 "1 ms");
+               Config deathWatch = ConfigFactory.parseProperties(properties);
+               Config defaultConfig = AkkaUtils.getDefaultAkkaConfig();
+
+               actorSystem1 = 
AkkaUtils.createActorSystem(deathWatch.withFallback(defaultConfig));
+       }
+
+       @AfterClass
+       public static void tearDown() {
+               if (actorSystem1 != null) {
+                       actorSystem1.shutdown();
+                       actorSystem1.awaitTermination();
+               }
+       }
+
+       @Before
+       public void setupTest() {
+               actorSystem2 = AkkaUtils.createDefaultActorSystem();
+       }
+
+       @After
+       public void tearDownTest() {
+               if (actorSystem2 != null) {
+                       actorSystem2.shutdown();
+                       actorSystem2.awaitTermination();
+               }
+       }
+
+       /**
+        * Tests that the quarantine monitor detects if an actor system has 
been quarantined by another
+        * actor system.
+        */
+       @Test(timeout = 5000L)
+       public void testWatcheeQuarantined() throws ExecutionException, 
InterruptedException {
+               TestingQuarantineHandler quarantineHandler = new 
TestingQuarantineHandler();
+
+               ActorRef watchee = null;
+               ActorRef watcher = null;
+               ActorRef monitor = null;
+
+               FiniteDuration timeout = new FiniteDuration(5, 
TimeUnit.SECONDS);
+               FiniteDuration interval = new FiniteDuration(200, 
TimeUnit.MILLISECONDS);
+
+               try {
+                       // start the quarantine monitor in the watchee actor 
system
+                       monitor = 
actorSystem2.actorOf(getQuarantineMonitorProps(quarantineHandler), 
"quarantineMonitor");
+
+                       watchee = actorSystem2.actorOf(getWatcheeProps(timeout, 
interval, quarantineHandler), "watchee");
+                       watcher = actorSystem1.actorOf(getWatcherProps(timeout, 
interval, quarantineHandler), "watcher");
+
+                       final Address actorSystem1Address = 
AkkaUtils.getAddress(actorSystem1);
+                       final String watcheeAddress = 
AkkaUtils.getAkkaURL(actorSystem2, watchee);
+                       final String watcherAddress = 
AkkaUtils.getAkkaURL(actorSystem1, watcher);
+
+                       // ping the watcher continuously
+                       watchee.tell(new Ping(watcherAddress), 
ActorRef.noSender());
+                       // start watching the watchee
+                       watcher.tell(new Watch(watcheeAddress), 
ActorRef.noSender());
+
+                       Future<String> quarantineFuture = 
quarantineHandler.getWasQuarantinedByFuture();
+
+                       Assert.assertEquals(actorSystem1Address.toString(), 
quarantineFuture.get());
+               } finally {
+                       TestingUtils.stopActor(watchee);
+                       TestingUtils.stopActor(watcher);
+                       TestingUtils.stopActor(monitor);
+               }
+       }
+
+       /**
+        * Tests that the quarantine monitor detects if an actor system 
quarantines another actor
+        * system.
+        */
+       @Test(timeout = 5000L)
+       public void testWatcherQuarantining() throws ExecutionException, 
InterruptedException {
+               TestingQuarantineHandler quarantineHandler = new 
TestingQuarantineHandler();
+
+               ActorRef watchee = null;
+               ActorRef watcher = null;
+               ActorRef monitor = null;
+
+               FiniteDuration timeout = new FiniteDuration(5, 
TimeUnit.SECONDS);
+               FiniteDuration interval = new FiniteDuration(200, 
TimeUnit.MILLISECONDS);
+
+               try {
+                       // start the quarantine monitor in the watcher actor 
system
+                       monitor = 
actorSystem1.actorOf(getQuarantineMonitorProps(quarantineHandler), 
"quarantineMonitor");
+
+                       watchee = actorSystem2.actorOf(getWatcheeProps(timeout, 
interval, quarantineHandler), "watchee");
+                       watcher = actorSystem1.actorOf(getWatcherProps(timeout, 
interval, quarantineHandler), "watcher");
+
+                       final Address actorSystem1Address = 
AkkaUtils.getAddress(actorSystem2);
+                       final String watcheeAddress = 
AkkaUtils.getAkkaURL(actorSystem2, watchee);
+                       final String watcherAddress = 
AkkaUtils.getAkkaURL(actorSystem1, watcher);
+
+                       // ping the watcher continuously
+                       watchee.tell(new Ping(watcherAddress), 
ActorRef.noSender());
+                       // start watching the watchee
+                       watcher.tell(new Watch(watcheeAddress), 
ActorRef.noSender());
+
+                       Future<String> quarantineFuture = 
quarantineHandler.getHasQuarantinedFuture();
+
+                       Assert.assertEquals(actorSystem1Address.toString(), 
quarantineFuture.get());
+               } finally {
+                       TestingUtils.stopActor(watchee);
+                       TestingUtils.stopActor(watcher);
+                       TestingUtils.stopActor(monitor);
+               }
+       }
+
+       private static class TestingQuarantineHandler implements 
QuarantineHandler, ErrorHandler {
+
+               private final CompletableFuture<String> wasQuarantinedByFuture;
+               private final CompletableFuture<String> hasQuarantinedFuture;
+
+               public TestingQuarantineHandler() {
+                       this.wasQuarantinedByFuture = new 
FlinkCompletableFuture<>();
+                       this.hasQuarantinedFuture = new 
FlinkCompletableFuture<>();
+               }
+
+               @Override
+               public void wasQuarantinedBy(String remoteSystem, ActorSystem 
actorSystem) {
+                       wasQuarantinedByFuture.complete(remoteSystem);
+               }
+
+               @Override
+               public void hasQuarantined(String remoteSystem, ActorSystem 
actorSystem) {
+                       hasQuarantinedFuture.complete(remoteSystem);
+               }
+
+               public Future<String> getWasQuarantinedByFuture() {
+                       return wasQuarantinedByFuture;
+               }
+
+               public Future<String> getHasQuarantinedFuture() {
+                       return hasQuarantinedFuture;
+               }
+
+               @Override
+               public void handleError(Throwable failure) {
+                       wasQuarantinedByFuture.completeExceptionally(failure);
+                       hasQuarantinedFuture.completeExceptionally(failure);
+               }
+       }
+
+       private interface ErrorHandler {
+               void handleError(Throwable failure);
+       }
+
+       static class Watcher extends UntypedActor {
+
+               private final FiniteDuration timeout;
+               private final FiniteDuration interval;
+               private final ErrorHandler errorHandler;
+
+               Watcher(FiniteDuration timeout, FiniteDuration interval, 
ErrorHandler errorHandler) {
+                       this.timeout = Preconditions.checkNotNull(timeout);
+                       this.interval = Preconditions.checkNotNull(interval);
+                       this.errorHandler = 
Preconditions.checkNotNull(errorHandler);
+               }
+
+               @Override
+               public void onReceive(Object message) throws Exception {
+                       if (message instanceof Watch) {
+                               Watch watch = (Watch) message;
+
+                               
getContext().actorSelection(watch.getTarget()).resolveOne(timeout).onComplete(new
 OnComplete<ActorRef>() {
+                                       @Override
+                                       public void onComplete(Throwable 
failure, ActorRef success) throws Throwable {
+                                               if (success != null) {
+                                                       
getContext().watch(success);
+                                                       // constantly ping the 
watchee
+                                                       
getContext().system().scheduler().schedule(
+                                                               zeroDelay,
+                                                               interval,
+                                                               success,
+                                                               "Watcher 
message",
+                                                               
getContext().dispatcher(),
+                                                               getSelf());
+                                               } else {
+                                                       
errorHandler.handleError(failure);
+                                               }
+                                       }
+                               }, getContext().dispatcher());
+                       }
+               }
+       }
+
+       static class Watchee extends UntypedActor {
+
+               private final FiniteDuration timeout;
+               private final FiniteDuration interval;
+               private final ErrorHandler errorHandler;
+
+               Watchee(FiniteDuration timeout, FiniteDuration interval, 
ErrorHandler errorHandler) {
+                       this.timeout = Preconditions.checkNotNull(timeout);
+                       this.interval = Preconditions.checkNotNull(interval);
+                       this.errorHandler = 
Preconditions.checkNotNull(errorHandler);
+               }
+
+               @Override
+               public void onReceive(Object message) throws Exception {
+                       if (message instanceof Ping) {
+                               final Ping ping = (Ping) message;
+
+                               
getContext().actorSelection(ping.getTarget()).resolveOne(timeout).onComplete(new
 OnComplete<ActorRef>() {
+                                       @Override
+                                       public void onComplete(Throwable 
failure, ActorRef success) throws Throwable {
+                                               if (success != null) {
+                                                       // constantly ping the 
target
+                                                       
getContext().system().scheduler().schedule(
+                                                               zeroDelay,
+                                                               interval,
+                                                               success,
+                                                               "Watchee 
message",
+                                                               
getContext().dispatcher(),
+                                                               getSelf());
+                                               } else {
+                                                       
errorHandler.handleError(failure);
+                                               }
+                                       }
+                               }, getContext().dispatcher());
+                       }
+               }
+       }
+
+       static class Watch {
+               private final String target;
+
+               Watch(String target) {
+                       this.target = target;
+               }
+
+               public String getTarget() {
+                       return target;
+               }
+       }
+
+       static class Ping {
+               private final String target;
+
+               Ping(String target) {
+                       this.target = target;
+               }
+
+               public String getTarget() {
+                       return target;
+               }
+       }
+
+       static Props getWatcheeProps(FiniteDuration timeout, FiniteDuration 
interval, ErrorHandler errorHandler) {
+               return Props.create(Watchee.class, timeout, interval, 
errorHandler);
+       }
+
+       static Props getWatcherProps(FiniteDuration timeout, FiniteDuration 
interval, ErrorHandler errorHandler) {
+               return Props.create(Watcher.class, timeout, interval, 
errorHandler);
+       }
+
+       static Props getQuarantineMonitorProps(QuarantineHandler handler) {
+               return Props.create(QuarantineMonitor.class, handler, LOG);
+       }
+
+}

Reply via email to