Repository: flink
Updated Branches:
  refs/heads/release-1.1 62c666f57 -> caa0fbb21


[FLINK-4672] [taskmanager] Do not decorate Actor Kill messages


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/caa0fbb2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/caa0fbb2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/caa0fbb2

Branch: refs/heads/release-1.1
Commit: caa0fbb2157de56c9bdc4bbf8aedb73df90edede
Parents: 62c666f
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Sep 23 18:42:47 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Sep 26 11:25:01 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/taskmanager/TaskManager.scala |  2 +-
 .../runtime/taskmanager/TaskManagerTest.java    | 33 ++++++++++++++++++--
 2 files changed, 31 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/caa0fbb2/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index a7dd789..8e787bb 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1333,7 +1333,7 @@ class TaskManager(
       "\n" +
       "A fatal error occurred, forcing the TaskManager to shut down: " + message, cause)
 
-    self ! decorateMessage(Kill)
+    self ! Kill
   }
 
   override def notifyLeaderAddress(leaderAddress: String, leaderSessionID: UUID): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/caa0fbb2/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index ce88c09..1c50265 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskmanager;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
+import akka.actor.Kill;
 import akka.actor.Props;
 import akka.japi.Creator;
 import akka.testkit.JavaTestKit;
@@ -55,6 +56,7 @@ import org.apache.flink.runtime.messages.StackTraceSampleMessages.ResponseStackT
 import org.apache.flink.runtime.messages.StackTraceSampleMessages.ResponseStackTraceSampleSuccess;
 import org.apache.flink.runtime.messages.StackTraceSampleMessages.TriggerStackTraceSample;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
+import org.apache.flink.runtime.messages.TaskManagerMessages.FatalError;
 import org.apache.flink.runtime.messages.TaskMessages;
 import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
 import org.apache.flink.runtime.messages.TaskMessages.PartitionState;
@@ -1369,6 +1371,28 @@ public class TaskManagerTest extends TestLogger {
                }};
        }
 
+       @Test
+       public void testTerminationOnFatalError() {
+               new JavaTestKit(system){{
+
+                       final ActorGateway taskManager = TestingUtils.createTaskManager(
+                                       system,
+                                       system.deadLetters(), // no jobmanager
+                                       new Configuration(),
+                                       true,
+                                       false);
+
+                       try {
+                               watch(taskManager.actor());
+                               taskManager.tell(new FatalError("test fatal error", new Exception("something super bad")));
+                               expectTerminated(d, taskManager.actor());
+                       }
+                       finally {
+                               taskManager.tell(Kill.getInstance());
+                       }
+               }};
+       }
+       
        // --------------------------------------------------------------------------------------------
 
        public static class SimpleJobManager extends FlinkUntypedActor {
@@ -1549,11 +1573,14 @@ public class TaskManagerTest extends TestLogger {
 
                @Override
                public void invoke() throws Exception {
-                       Object o = new Object();
+                       final Object o = new Object();
+                       //noinspection SynchronizationOnLocalVariableOrMethodParameter
                        synchronized (o) {
-                               o.wait();
+                               //noinspection InfiniteLoopStatement
+                               while (true) {
+                                       o.wait();
+                               }
                        }
                }
        }
-
 }

Reply via email to