Repository: incubator-flink Updated Branches: refs/heads/master 24c47362e -> 2b86e9536
[FLINK-1349] [runtime] Various cleanups to make scala runtime code interact smoother with java Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/cec30ff4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/cec30ff4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/cec30ff4 Branch: refs/heads/master Commit: cec30ff4f1b75a9a4642819ac28c1ee1939d4343 Parents: 972a7b0 Author: Stephan Ewen <[email protected]> Authored: Tue Jan 6 00:40:38 2015 +0100 Committer: Stephan Ewen <[email protected]> Committed: Tue Jan 6 11:07:02 2015 +0100 ---------------------------------------------------------------------- .../flink/runtime/executiongraph/Execution.java | 11 ++++---- .../runtime/messages/TaskManagerMessages.scala | 27 ++++++++++++++++++- .../flink/runtime/taskmanager/TaskManager.scala | 17 +++++++++--- .../runtime/taskmanager/TaskManagerTest.java | 28 +++++++++++--------- .../testingUtils/TestingTaskManager.scala | 8 +++++- .../TestingTaskManagerMessages.scala | 28 +++++++++++++++++--- 6 files changed, 91 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cec30ff4/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 9100000..e7f4333 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -86,8 +86,8 @@ public class Execution implements Serializable { private static final int NUM_CANCEL_CALL_TRIES = 3; - public static FiniteDuration 
timeout = new FiniteDuration(ConfigConstants - .DEFAULT_AKKA_ASK_TIMEOUT, TimeUnit.SECONDS); + public static FiniteDuration timeout = new FiniteDuration( + ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT, TimeUnit.SECONDS); // -------------------------------------------------------------------------------------------- @@ -289,9 +289,9 @@ public class Execution implements Serializable { @Override public void onComplete(Throwable failure, Object success) throws Throwable { - if(failure != null){ + if (failure != null) { markFailed(failure); - }else{ + } else { TaskOperationResult result = (TaskOperationResult) success; if (success == null) { markFailed(new Exception("Failed to deploy the task to slot " + slot + ": TaskOperationResult was null")); @@ -305,8 +305,7 @@ public class Execution implements Serializable { else { // deployment failed :( markFailed(new Exception("Failed to deploy the task " + - getVertexWithAttempt() + " to slot " + slot + ": " + result - .description())); + getVertexWithAttempt() + " to slot " + slot + ": " + result.description())); } } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cec30ff4/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala index aad9efe..f69b629 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala @@ -21,10 +21,11 @@ package org.apache.flink.runtime.messages import org.apache.flink.core.io.InputSplit import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor import org.apache.flink.runtime.executiongraph.ExecutionAttemptID -import 
org.apache.flink.runtime.instance.InstanceID +import org.apache.flink.runtime.instance.InstanceID object TaskManagerMessages { + /** * Cancels the task associated with [[attemptID]]. The result is sent back to the sender as a * [[TaskOperationResult]] message. @@ -113,4 +114,28 @@ object TaskManagerMessages { * @param cause reason for the external failure */ case class FailTask(executionID: ExecutionAttemptID, cause: Throwable) + + // -------------------------------------------------------------------------- + // Utility methods to allow simpler case object access from Java + // -------------------------------------------------------------------------- + + def getNotifyWhenRegisteredAtJobManagerMessage() : AnyRef = { + NotifyWhenRegisteredAtJobManager + } + + def getRegisteredAtJobManagerMessage() : AnyRef = { + RegisteredAtJobManager + } + + def getRegisterAtJobManagerMessage() : AnyRef = { + RegisterAtJobManager + } + + def getSendHeartbeatMessage() : AnyRef = { + SendHeartbeat + } + + def getLogMemoryUsageMessage() : AnyRef = { + LogMemoryUsage + } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cec30ff4/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 5b6ee86..4676ae9 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -171,14 +171,16 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka if (registered) { registrationScheduler.foreach(_.cancel()) - } else if (registrationAttempts <= TaskManager.MAX_REGISTRATION_ATTEMPTS) { + } + else if (registrationAttempts <= 
TaskManager.MAX_REGISTRATION_ATTEMPTS) { log.info(s"Try to register at master ${jobManagerAkkaURL}. ${registrationAttempts}. " + s"Attempt") val jobManager = context.actorSelection(jobManagerAkkaURL) jobManager ! RegisterTaskManager(connectionInfo, hardwareDescription, numberOfSlots) - } else { + } + else { log.error("TaskManager could not register at JobManager."); self ! PoisonPill } @@ -212,6 +214,10 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka waitForRegistration.clear() } } + + case SubmitTask(tdd) => { + submitTask(tdd) + } case CancelTask(executionID) => { runningTasks.get(executionID) match { @@ -502,7 +508,8 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka } /** - * TaskManager companion object. Contains TaskManager executable entry point, command line parsing, and constants. + * TaskManager companion object. Contains TaskManager executable entry point, command + * line parsing, and constants. */ object TaskManager { @@ -749,7 +756,9 @@ object TaskManager { s"NON HEAP: $nonHeapUsed/$nonHeapCommitted/$nonHeapMax MB (used/committed/max)]" } - private def getGarbageCollectorStatsAsString(gcMXBeans: Iterable[GarbageCollectorMXBean]): String = { + private def getGarbageCollectorStatsAsString(gcMXBeans: Iterable[GarbageCollectorMXBean]) + : String = + { val beans = gcMXBeans map { bean => s"[${bean.getName}, GC TIME (ms): ${bean.getCollectionTime}, " + http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cec30ff4/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java index 500532d..4355298 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java @@ -26,6 +26,7 @@ import akka.japi.Creator; import akka.pattern.Patterns; import akka.testkit.JavaTestKit; import akka.util.Timeout; + import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; @@ -46,15 +47,16 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.messages.RegistrationMessages; import org.apache.flink.runtime.messages.TaskManagerMessages.CancelTask; -import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager$; import org.apache.flink.runtime.messages.TaskManagerMessages.SubmitTask; import org.apache.flink.runtime.messages.TaskManagerMessages.TaskOperationResult; +import org.apache.flink.runtime.messages.TaskManagerMessages; import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.types.IntegerRecord; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; + import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; @@ -162,8 +164,8 @@ public class TaskManagerTest { expectMsgEquals(new TaskOperationResult(eid1, true)); expectMsgEquals(new TaskOperationResult(eid2, true)); - - tm.tell(TestingTaskManagerMessages.RequestRunningTasks$.MODULE$, getRef()); + + tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef()); Map<ExecutionAttemptID, Task> runningTasks = expectMsgClass(TestingTaskManagerMessages .ResponseRunningTasks.class).asJava(); @@ -187,7 +189,7 @@ public class TaskManagerTest { assertEquals(ExecutionState.CANCELED, t1.getExecutionState()); - tm.tell(TestingTaskManagerMessages.RequestRunningTasks$.MODULE$, 
getRef()); + tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef()); runningTasks = expectMsgClass(TestingTaskManagerMessages .ResponseRunningTasks.class).asJava(); @@ -206,7 +208,7 @@ public class TaskManagerTest { assertEquals(ExecutionState.CANCELED, t2.getExecutionState()); - tm.tell(TestingTaskManagerMessages.RequestRunningTasks$.MODULE$, getRef()); + tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef()); runningTasks = expectMsgClass(TestingTaskManagerMessages .ResponseRunningTasks.class).asJava(); @@ -276,7 +278,7 @@ public class TaskManagerTest { expectMsgEquals(true); expectMsgEquals(true); - tm.tell(TestingTaskManagerMessages.RequestRunningTasks$.MODULE$, getRef()); + tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef()); Map<ExecutionAttemptID, Task> tasks = expectMsgClass(TestingTaskManagerMessages .ResponseRunningTasks.class).asJava(); @@ -337,7 +339,7 @@ public class TaskManagerTest { tm.tell(new SubmitTask(tdd1), getRef()); expectMsgEquals(new TaskOperationResult(eid1, true)); - tm.tell(TestingTaskManagerMessages.RequestRunningTasks$.MODULE$, getRef()); + tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef()); Map<ExecutionAttemptID, Task> tasks = expectMsgClass(TestingTaskManagerMessages.ResponseRunningTasks .class).asJava(); @@ -359,7 +361,7 @@ public class TaskManagerTest { assertEquals(ExecutionState.FINISHED, t2.getExecutionState()); } - tm.tell(TestingTaskManagerMessages.RequestRunningTasks$.MODULE$, getRef()); + tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef()); tasks = expectMsgClass(TestingTaskManagerMessages.ResponseRunningTasks .class).asJava(); @@ -424,7 +426,7 @@ public class TaskManagerTest { expectMsgEquals(new TaskOperationResult(eid2, true)); expectMsgEquals(new TaskOperationResult(eid1, true)); - tm.tell(TestingTaskManagerMessages.RequestRunningTasks$.MODULE$, getRef()); + 
tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef()); Map<ExecutionAttemptID, Task> tasks = expectMsgClass(TestingTaskManagerMessages .ResponseRunningTasks.class).asJava(); @@ -450,7 +452,7 @@ public class TaskManagerTest { Await.ready(response, d); } - tm.tell(TestingTaskManagerMessages.RequestRunningTasks$.MODULE$, getRef()); + tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef()); tasks = expectMsgClass(TestingTaskManagerMessages .ResponseRunningTasks.class).asJava(); @@ -514,6 +516,7 @@ public class TaskManagerTest { } } + @SuppressWarnings("serial") public static class SimpleLookupJobManagerCreator implements Creator<SimpleLookupJobManager>{ private final ChannelID receiverID; @@ -527,6 +530,7 @@ public class TaskManagerTest { } } + @SuppressWarnings("serial") public static class SimpleLookupFailingUpdateJobManagerCreator implements Creator<SimpleLookupFailingUpdateJobManager>{ private final ChannelID receiverID; @@ -550,8 +554,8 @@ public class TaskManagerTest { ActorRef taskManager = TestingUtils.startTestingTaskManagerWithConfiguration("localhost", cfg, system); - Future<Object> response = Patterns.ask(taskManager, NotifyWhenRegisteredAtJobManager$.MODULE$, - timeout); + Future<Object> response = Patterns.ask(taskManager, + TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(), timeout); try { FiniteDuration d = new FiniteDuration(20, TimeUnit.SECONDS); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cec30ff4/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala index 080af11..b1aa437 100644 --- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala @@ -39,8 +39,10 @@ trait TestingTaskManager extends ActorLogMessages { } def receiveTestMessages: Receive = { + case RequestRunningTasks => sender ! ResponseRunningTasks(runningTasks.toMap) + case NotifyWhenTaskRemoved(executionID) => runningTasks.get(executionID) match { case Some(_) => @@ -48,16 +50,19 @@ trait TestingTaskManager extends ActorLogMessages { waitForRemoval += (executionID -> (set + sender)) case None => sender ! true } + case UnregisterTask(executionID) => super.receiveWithLogMessages(UnregisterTask(executionID)) waitForRemoval.get(executionID) match { case Some(actors) => for(actor <- actors) actor ! true case None => } + case RequestBroadcastVariablesWithReferences => { sender ! ResponseBroadcastVariablesWithReferences( bcVarManager.getNumberOfVariablesWithReferences) } + case NotifyWhenJobRemoved(jobID) => { if(runningTasks.values.exists(_.getJobID == jobID)){ val set = waitForJobRemoval.getOrElse(jobID, Set()) @@ -71,13 +76,14 @@ trait TestingTaskManager extends ActorLogMessages { } } } + case CheckIfJobRemoved(jobID) => { if(runningTasks.values.forall(_.getJobID != jobID)){ waitForJobRemoval.get(jobID) match { case Some(listeners) => listeners foreach (_ ! 
true) case None => } - }else{ + } else { import context.dispatcher context.system.scheduler.scheduleOnce(200 milliseconds, this.self, CheckIfJobRemoved(jobID)) } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cec30ff4/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala index cb5282e..38cc829 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala @@ -22,16 +22,36 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID import org.apache.flink.runtime.jobgraph.JobID import org.apache.flink.runtime.taskmanager.Task -object TestingTaskManagerMessages{ +/** + * Additional messages that the [[TestingTaskManager]] understands. 
+ */ +object TestingTaskManagerMessages { + case class NotifyWhenTaskRemoved(executionID: ExecutionAttemptID) - - case object RequestRunningTasks + case class ResponseRunningTasks(tasks: Map[ExecutionAttemptID, Task]){ import collection.JavaConverters._ def asJava: java.util.Map[ExecutionAttemptID, Task] = tasks.asJava } - case object RequestBroadcastVariablesWithReferences + case class ResponseBroadcastVariablesWithReferences(number: Int) case class CheckIfJobRemoved(jobID: JobID) + + case object RequestRunningTasks + + case object RequestBroadcastVariablesWithReferences + + // -------------------------------------------------------------------------- + // Utility methods to allow simpler case object access from Java + // -------------------------------------------------------------------------- + + def getRequestRunningTasksMessage() : AnyRef = { + RequestRunningTasks + } + + def getRequestBroadcastVariablesWithReferencesMessage() : AnyRef = { + RequestBroadcastVariablesWithReferences + } } +
