Repository: flink Updated Branches: refs/heads/release-1.1 62c666f57 -> caa0fbb21
[FLINK-4672] [taskmanager] Do not decorate Actor Kill messages Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/caa0fbb2 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/caa0fbb2 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/caa0fbb2 Branch: refs/heads/release-1.1 Commit: caa0fbb2157de56c9bdc4bbf8aedb73df90edede Parents: 62c666f Author: Stephan Ewen <se...@apache.org> Authored: Fri Sep 23 18:42:47 2016 +0200 Committer: Stephan Ewen <se...@apache.org> Committed: Mon Sep 26 11:25:01 2016 +0200 ---------------------------------------------------------------------- .../flink/runtime/taskmanager/TaskManager.scala | 2 +- .../runtime/taskmanager/TaskManagerTest.java | 33 ++++++++++++++++++-- 2 files changed, 31 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/caa0fbb2/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index a7dd789..8e787bb 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -1333,7 +1333,7 @@ class TaskManager( "\n" + "A fatal error occurred, forcing the TaskManager to shut down: " + message, cause) - self ! decorateMessage(Kill) + self ! Kill } override def notifyLeaderAddress(leaderAddress: String, leaderSessionID: UUID): Unit = { http://git-wip-us.apache.org/repos/asf/flink/blob/caa0fbb2/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java index ce88c09..1c50265 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskmanager; import akka.actor.ActorRef; import akka.actor.ActorSystem; +import akka.actor.Kill; import akka.actor.Props; import akka.japi.Creator; import akka.testkit.JavaTestKit; @@ -55,6 +56,7 @@ import org.apache.flink.runtime.messages.StackTraceSampleMessages.ResponseStackT import org.apache.flink.runtime.messages.StackTraceSampleMessages.ResponseStackTraceSampleSuccess; import org.apache.flink.runtime.messages.StackTraceSampleMessages.TriggerStackTraceSample; import org.apache.flink.runtime.messages.TaskManagerMessages; +import org.apache.flink.runtime.messages.TaskManagerMessages.FatalError; import org.apache.flink.runtime.messages.TaskMessages; import org.apache.flink.runtime.messages.TaskMessages.CancelTask; import org.apache.flink.runtime.messages.TaskMessages.PartitionState; @@ -1369,6 +1371,28 @@ public class TaskManagerTest extends TestLogger { }}; } + @Test + public void testTerminationOnFatalError() { + new JavaTestKit(system){{ + + final ActorGateway taskManager = TestingUtils.createTaskManager( + system, + system.deadLetters(), // no jobmanager + new Configuration(), + true, + false); + + try { + watch(taskManager.actor()); + taskManager.tell(new FatalError("test fatal error", new Exception("something super bad"))); + expectTerminated(d, taskManager.actor()); + } + finally { + taskManager.tell(Kill.getInstance()); + } + }}; + } + // -------------------------------------------------------------------------------------------- public static class SimpleJobManager extends FlinkUntypedActor { @@ -1549,11 +1573,14 @@ public class TaskManagerTest extends TestLogger { @Override public void invoke() throws Exception { - Object o = new Object(); + final Object o = new Object(); + //noinspection SynchronizationOnLocalVariableOrMethodParameter synchronized (o) { - o.wait(); + //noinspection InfiniteLoopStatement + while (true) { + o.wait(); + } } } } - }