Repository: incubator-flink
Updated Branches:
  refs/heads/master 24c47362e -> 2b86e9536


[FLINK-1349] [runtime] Various cleanups to make scala runtime code interact 
smoother with java


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/cec30ff4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/cec30ff4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/cec30ff4

Branch: refs/heads/master
Commit: cec30ff4f1b75a9a4642819ac28c1ee1939d4343
Parents: 972a7b0
Author: Stephan Ewen <[email protected]>
Authored: Tue Jan 6 00:40:38 2015 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Tue Jan 6 11:07:02 2015 +0100

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/Execution.java | 11 ++++----
 .../runtime/messages/TaskManagerMessages.scala  | 27 ++++++++++++++++++-
 .../flink/runtime/taskmanager/TaskManager.scala | 17 +++++++++---
 .../runtime/taskmanager/TaskManagerTest.java    | 28 +++++++++++---------
 .../testingUtils/TestingTaskManager.scala       |  8 +++++-
 .../TestingTaskManagerMessages.scala            | 28 +++++++++++++++++---
 6 files changed, 91 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cec30ff4/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 9100000..e7f4333 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -86,8 +86,8 @@ public class Execution implements Serializable {
        
        private static final int NUM_CANCEL_CALL_TRIES = 3;
 
-       public static FiniteDuration timeout = new 
FiniteDuration(ConfigConstants
-                       .DEFAULT_AKKA_ASK_TIMEOUT, TimeUnit.SECONDS);
+       public static FiniteDuration timeout = new FiniteDuration(
+                       ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT, 
TimeUnit.SECONDS);
        
        // 
--------------------------------------------------------------------------------------------
        
@@ -289,9 +289,9 @@ public class Execution implements Serializable {
 
                                @Override
                                public void onComplete(Throwable failure, 
Object success) throws Throwable {
-                                       if(failure != null){
+                                       if (failure != null) {
                                                markFailed(failure);
-                                       }else{
+                                       } else {
                                                TaskOperationResult result = 
(TaskOperationResult) success;
                                                if (success == null) {
                                                        markFailed(new 
Exception("Failed to deploy the task to slot " + slot + ": TaskOperationResult 
was null"));
@@ -305,8 +305,7 @@ public class Execution implements Serializable {
                                                else {
                                                        // deployment failed :(
                                                        markFailed(new 
Exception("Failed to deploy the task " +
-                                                                       
getVertexWithAttempt() + " to slot " + slot + ": " + result
-                                                                       
.description()));
+                                                                       
getVertexWithAttempt() + " to slot " + slot + ": " + result.description()));
                                                }
                                        }
                                }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cec30ff4/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
index aad9efe..f69b629 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
@@ -21,10 +21,11 @@ package org.apache.flink.runtime.messages
 import org.apache.flink.core.io.InputSplit
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
-import org.apache.flink.runtime.instance.InstanceID
+import org.apache.flink.runtime.instance.InstanceID 
 
 object TaskManagerMessages {
 
+  
   /**
    * Cancels the task associated with [[attemptID]]. The result is sent back 
to the sender as a
    * [[TaskOperationResult]] message.
@@ -113,4 +114,28 @@ object TaskManagerMessages {
    * @param cause reason for the external failure
    */
   case class FailTask(executionID: ExecutionAttemptID, cause: Throwable)
+  
+  // --------------------------------------------------------------------------
+  // Utility methods to allow simpler case object access from Java
+  // --------------------------------------------------------------------------
+  
+  def getNotifyWhenRegisteredAtJobManagerMessage() : AnyRef = {
+    NotifyWhenRegisteredAtJobManager
+  }
+  
+  def getRegisteredAtJobManagerMessage() : AnyRef = {
+    RegisteredAtJobManager
+  }
+  
+  def getRegisterAtJobManagerMessage() : AnyRef = {
+    RegisterAtJobManager
+  }
+
+  def getSendHeartbeatMessage() : AnyRef = {
+    SendHeartbeat
+  }
+
+  def getLogMemoryUsageMessage() : AnyRef = {
+    LogMemoryUsage // the case object this accessor is named after, not RegisteredAtJobManager
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cec30ff4/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 5b6ee86..4676ae9 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -171,14 +171,16 @@ class TaskManager(val connectionInfo: 
InstanceConnectionInfo, val jobManagerAkka
 
       if (registered) {
         registrationScheduler.foreach(_.cancel())
-      } else if (registrationAttempts <= 
TaskManager.MAX_REGISTRATION_ATTEMPTS) {
+      }
+      else if (registrationAttempts <= TaskManager.MAX_REGISTRATION_ATTEMPTS) {
 
         log.info(s"Try to register at master ${jobManagerAkkaURL}. 
${registrationAttempts}. " +
           s"Attempt")
         val jobManager = context.actorSelection(jobManagerAkkaURL)
 
         jobManager ! RegisterTaskManager(connectionInfo, hardwareDescription, 
numberOfSlots)
-      } else {
+      }
+      else {
         log.error("TaskManager could not register at JobManager.");
         self ! PoisonPill
       }
@@ -212,6 +214,10 @@ class TaskManager(val connectionInfo: 
InstanceConnectionInfo, val jobManagerAkka
         waitForRegistration.clear()
       }
     }
+    
+    case SubmitTask(tdd) => {
+      submitTask(tdd)
+    }
 
     case CancelTask(executionID) => {
       runningTasks.get(executionID) match {
@@ -502,7 +508,8 @@ class TaskManager(val connectionInfo: 
InstanceConnectionInfo, val jobManagerAkka
 }
 
 /**
- * TaskManager companion object. Contains TaskManager executable entry point, 
command line parsing, and constants.
+ * TaskManager companion object. Contains TaskManager executable entry point, command
+ * line parsing, and constants.
  */
 object TaskManager {
 
@@ -749,7 +756,9 @@ object TaskManager {
       s"NON HEAP: $nonHeapUsed/$nonHeapCommitted/$nonHeapMax MB 
(used/committed/max)]"
   }
 
-  private def getGarbageCollectorStatsAsString(gcMXBeans: 
Iterable[GarbageCollectorMXBean]): String = {
+  private def getGarbageCollectorStatsAsString(gcMXBeans: 
Iterable[GarbageCollectorMXBean]) 
+    : String =
+  {
     val beans = gcMXBeans map {
       bean =>
         s"[${bean.getName}, GC TIME (ms): ${bean.getCollectionTime}, " +

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cec30ff4/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 500532d..4355298 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -26,6 +26,7 @@ import akka.japi.Creator;
 import akka.pattern.Patterns;
 import akka.testkit.JavaTestKit;
 import akka.util.Timeout;
+
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -46,15 +47,16 @@ import 
org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.RegistrationMessages;
 import org.apache.flink.runtime.messages.TaskManagerMessages.CancelTask;
-import 
org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager$;
 import org.apache.flink.runtime.messages.TaskManagerMessages.SubmitTask;
 import 
org.apache.flink.runtime.messages.TaskManagerMessages.TaskOperationResult;
+import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.types.IntegerRecord;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -162,8 +164,8 @@ public class TaskManagerTest {
 
                                                        expectMsgEquals(new 
TaskOperationResult(eid1, true));
                                                        expectMsgEquals(new 
TaskOperationResult(eid2, true));
-
-                                                       
tm.tell(TestingTaskManagerMessages.RequestRunningTasks$.MODULE$, getRef());
+                                                       
+                                                       
tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
 
                                                        Map<ExecutionAttemptID, 
Task> runningTasks = expectMsgClass(TestingTaskManagerMessages
                                                                        
.ResponseRunningTasks.class).asJava();
@@ -187,7 +189,7 @@ public class TaskManagerTest {
 
                                                        
assertEquals(ExecutionState.CANCELED, t1.getExecutionState());
 
-                                                       
tm.tell(TestingTaskManagerMessages.RequestRunningTasks$.MODULE$, getRef());
+                                                       
tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
                                                        runningTasks = 
expectMsgClass(TestingTaskManagerMessages
                                                                        
.ResponseRunningTasks.class).asJava();
 
@@ -206,7 +208,7 @@ public class TaskManagerTest {
 
                                                        
assertEquals(ExecutionState.CANCELED, t2.getExecutionState());
 
-                                                       
tm.tell(TestingTaskManagerMessages.RequestRunningTasks$.MODULE$, getRef());
+                                                       
tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
                                                        runningTasks = 
expectMsgClass(TestingTaskManagerMessages
                                                                        
.ResponseRunningTasks.class).asJava();
 
@@ -276,7 +278,7 @@ public class TaskManagerTest {
                                                        expectMsgEquals(true);
                                                        expectMsgEquals(true);
 
-                                                       
tm.tell(TestingTaskManagerMessages.RequestRunningTasks$.MODULE$, getRef());
+                                                       
tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
                                                        Map<ExecutionAttemptID, 
Task> tasks = expectMsgClass(TestingTaskManagerMessages
                                                                        
.ResponseRunningTasks.class).asJava();
 
@@ -337,7 +339,7 @@ public class TaskManagerTest {
                                                tm.tell(new SubmitTask(tdd1), 
getRef());
                                                expectMsgEquals(new 
TaskOperationResult(eid1, true));
 
-                                               
tm.tell(TestingTaskManagerMessages.RequestRunningTasks$.MODULE$, getRef());
+                                               
tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
                                                Map<ExecutionAttemptID, Task> 
tasks = expectMsgClass(TestingTaskManagerMessages.ResponseRunningTasks
                                                                
.class).asJava();
 
@@ -359,7 +361,7 @@ public class TaskManagerTest {
                                assertEquals(ExecutionState.FINISHED, 
t2.getExecutionState());
                        }
 
-                                               
tm.tell(TestingTaskManagerMessages.RequestRunningTasks$.MODULE$, getRef());
+                                               
tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
                                                tasks = 
expectMsgClass(TestingTaskManagerMessages.ResponseRunningTasks
                                                                
.class).asJava();
 
@@ -424,7 +426,7 @@ public class TaskManagerTest {
                                                expectMsgEquals(new 
TaskOperationResult(eid2, true));
                                                expectMsgEquals(new 
TaskOperationResult(eid1, true));
 
-                                               
tm.tell(TestingTaskManagerMessages.RequestRunningTasks$.MODULE$, getRef());
+                                               
tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
                                                Map<ExecutionAttemptID, Task> 
tasks = expectMsgClass(TestingTaskManagerMessages
                                                                
.ResponseRunningTasks.class).asJava();
 
@@ -450,7 +452,7 @@ public class TaskManagerTest {
                                                        Await.ready(response, 
d);
                                                }
 
-                                               
tm.tell(TestingTaskManagerMessages.RequestRunningTasks$.MODULE$, getRef());
+                                               
tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
                                                tasks = 
expectMsgClass(TestingTaskManagerMessages
                                                                
.ResponseRunningTasks.class).asJava();
 
@@ -514,6 +516,7 @@ public class TaskManagerTest {
                }
        }
 
+       @SuppressWarnings("serial")
        public static class SimpleLookupJobManagerCreator implements 
Creator<SimpleLookupJobManager>{
                private final ChannelID receiverID;
 
@@ -527,6 +530,7 @@ public class TaskManagerTest {
                }
        }
 
+       @SuppressWarnings("serial")
        public static class SimpleLookupFailingUpdateJobManagerCreator 
implements
                        Creator<SimpleLookupFailingUpdateJobManager>{
                private final ChannelID receiverID;
@@ -550,8 +554,8 @@ public class TaskManagerTest {
 
                ActorRef taskManager = 
TestingUtils.startTestingTaskManagerWithConfiguration("localhost", cfg, system);
 
-               Future<Object> response = Patterns.ask(taskManager, 
NotifyWhenRegisteredAtJobManager$.MODULE$,
-                               timeout);
+               Future<Object> response = Patterns.ask(taskManager, 
+                               
TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(), timeout);
 
                try {
                        FiniteDuration d = new FiniteDuration(20, 
TimeUnit.SECONDS);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cec30ff4/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
index 080af11..b1aa437 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
@@ -39,8 +39,10 @@ trait TestingTaskManager extends ActorLogMessages {
   }
 
   def receiveTestMessages: Receive = {
+    
     case RequestRunningTasks =>
       sender ! ResponseRunningTasks(runningTasks.toMap)
+      
     case NotifyWhenTaskRemoved(executionID) =>
       runningTasks.get(executionID) match {
         case Some(_) =>
@@ -48,16 +50,19 @@ trait TestingTaskManager extends ActorLogMessages {
           waitForRemoval += (executionID -> (set + sender))
         case None => sender ! true
       }
+      
     case UnregisterTask(executionID) =>
       super.receiveWithLogMessages(UnregisterTask(executionID))
       waitForRemoval.get(executionID) match {
         case Some(actors) => for(actor <- actors) actor ! true
         case None =>
       }
+      
     case RequestBroadcastVariablesWithReferences => {
       sender ! ResponseBroadcastVariablesWithReferences(
         bcVarManager.getNumberOfVariablesWithReferences)
     }
+    
     case NotifyWhenJobRemoved(jobID) => {
       if(runningTasks.values.exists(_.getJobID == jobID)){
         val set = waitForJobRemoval.getOrElse(jobID, Set())
@@ -71,13 +76,14 @@ trait TestingTaskManager extends ActorLogMessages {
         }
       }
     }
+    
     case CheckIfJobRemoved(jobID) => {
       if(runningTasks.values.forall(_.getJobID != jobID)){
         waitForJobRemoval.get(jobID) match {
           case Some(listeners) => listeners foreach (_ ! true)
           case None =>
         }
-      }else{
+      } else {
         import context.dispatcher
         context.system.scheduler.scheduleOnce(200 milliseconds, this.self, 
CheckIfJobRemoved(jobID))
       }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cec30ff4/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
index cb5282e..38cc829 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
@@ -22,16 +22,36 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionAttemptID
 import org.apache.flink.runtime.jobgraph.JobID
 import org.apache.flink.runtime.taskmanager.Task
 
-object TestingTaskManagerMessages{
+/**
+ * Additional messages that the [[TestingTaskManager]] understands.
+ */
+object TestingTaskManagerMessages {
+  
   case class NotifyWhenTaskRemoved(executionID: ExecutionAttemptID)
-
-  case object RequestRunningTasks
+  
   case class ResponseRunningTasks(tasks: Map[ExecutionAttemptID, Task]){
     import collection.JavaConverters._
     def asJava: java.util.Map[ExecutionAttemptID, Task] = tasks.asJava
   }
-  case object RequestBroadcastVariablesWithReferences
+  
   case class ResponseBroadcastVariablesWithReferences(number: Int)
 
   case class CheckIfJobRemoved(jobID: JobID)
+  
+  case object RequestRunningTasks
+  
+  case object RequestBroadcastVariablesWithReferences
+  
+  // --------------------------------------------------------------------------
+  // Utility methods to allow simpler case object access from Java
+  // --------------------------------------------------------------------------
+  
+  def getRequestRunningTasksMessage() : AnyRef = {
+    RequestRunningTasks
+  }
+  
+  def getRequestBroadcastVariablesWithReferencesMessage() : AnyRef = {
+    RequestBroadcastVariablesWithReferences
+  }
 }
+

Reply via email to