http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 79b9a74..c9a60b4 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -20,11 +20,9 @@ package org.apache.flink.runtime.jobmanager import java.io.{IOException, File} import java.net.InetSocketAddress -import java.util.concurrent.TimeUnit import akka.actor._ -import akka.pattern.Patterns -import akka.pattern.{ask, pipe} +import akka.pattern.ask import org.apache.flink.configuration.{ConfigConstants, GlobalConfiguration, Configuration} import org.apache.flink.core.io.InputSplitAssigner import org.apache.flink.runtime.blob.BlobServer @@ -37,7 +35,7 @@ import org.apache.flink.runtime.{JobException, ActorLogMessages} import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager import org.apache.flink.runtime.instance.InstanceManager -import org.apache.flink.runtime.jobgraph.{JobStatus, JobID} +import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobID} import org.apache.flink.runtime.jobmanager.accumulators.AccumulatorManager import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler} import org.apache.flink.runtime.messages.JobManagerMessages._ @@ -48,14 +46,43 @@ import org.slf4j.LoggerFactory import scala.concurrent.Future import scala.concurrent.duration._ +import scala.language.postfixOps -class JobManager(val configuration: Configuration) - extends Actor with ActorLogMessages with ActorLogging { +/** + * The job manager is responsible for receiving Flink jobs, scheduling the tasks, gathering the + * job status and managing the task managers. It is realized as an actor and receives, amongst + * others, the following messages: + * + * - [[RegisterTaskManager]] is sent by a TaskManager which wants to register at the job manager. + * A successful registration at the instance manager is acknowledged by [[AcknowledgeRegistration]] + * + * - [[SubmitJob]] is sent by a client which wants to submit a job to the system. The submit + * message contains the job description in the form of the JobGraph. The JobGraph is appended to + * the ExecutionGraph and the corresponding JobExecutionVertices are scheduled for execution on + * the TaskManagers. + * + * - [[CancelJob]] requests to cancel the job with the specified jobID. A successful cancellation + * is indicated by [[CancellationSuccess]] and a failure by [[CancellationFailure]] + * + * - [[UpdateTaskExecutionState]] is sent by a TaskManager to update the state of an + * [[org.apache.flink.runtime.executiongraph.ExecutionVertex]] contained in the [[ExecutionGraph]]. + * A successful update is acknowledged by true and otherwise false. + * + * - [[RequestNextInputSplit]] requests the next input split for a running task on a + * [[TaskManager]]. The assigned input split or null is sent to the sender in the form of the + * message [[NextInputSplit]]. + * + * - [[JobStatusChanged]] indicates that the status of a job (RUNNING, CANCELING, FINISHED, etc.) + * has changed. This message is sent by the ExecutionGraph.
+ * + * @param configuration object with user provided configuration values + */ +class JobManager(val configuration: Configuration) extends +Actor with ActorLogMessages with ActorLogging { import context._ import scala.collection.JavaConverters._ - implicit val timeout = FiniteDuration(configuration.getInteger(ConfigConstants.AKKA_ASK_TIMEOUT, - ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS) + implicit val timeout = AkkaUtils.getTimeout(configuration) log.info(s"Starting job manager at ${self.path}.") @@ -117,7 +144,7 @@ class JobManager(val configuration: Configuration) } override def receiveWithLogMessages: Receive = { - case RegisterTaskManager(connectionInfo, hardwareInformation, numberOfSlots) => { + case RegisterTaskManager(connectionInfo, hardwareInformation, numberOfSlots) => val taskManager = sender val instanceID = instanceManager.registerTaskManager(taskManager, connectionInfo, hardwareInformation, numberOfSlots) @@ -132,146 +159,35 @@ class JobManager(val configuration: Configuration) taskManager ! AcknowledgeRegistration(instanceID, libraryCacheManager.getBlobServerPort) } - } - case RequestNumberRegisteredTaskManager => { + + case RequestNumberRegisteredTaskManager => sender ! instanceManager.getNumberOfRegisteredTaskManagers - } - case RequestTotalNumberOfSlots => { + case RequestTotalNumberOfSlots => sender ! instanceManager.getTotalNumberOfSlots - } - - case SubmitJob(jobGraph, listenToEvents, detach) => { - try { - if (jobGraph == null) { - sender ! akka.actor.Status.Failure(new IllegalArgumentException("JobGraph must not be" + - " null.")) - } else { - log.info("Received job {} ({}).", jobGraph.getJobID, jobGraph.getName) - - if (jobGraph.getNumberOfVertices == 0) { - sender ! SubmissionFailure(jobGraph.getJobID, new IllegalArgumentException("Job is " + - "empty.")) - } else { - // Create the user code class loader - libraryCacheManager.registerJob(jobGraph.getJobID, jobGraph.getUserJarBlobKeys) - - val userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID) - - val (executionGraph, jobInfo) = currentJobs.getOrElseUpdate(jobGraph.getJobID(), - (new ExecutionGraph(jobGraph.getJobID, jobGraph.getName, - jobGraph.getJobConfiguration, timeout, jobGraph.getUserJarBlobKeys, userCodeLoader), - JobInfo(sender, System.currentTimeMillis()))) - - val jobNumberRetries = if (jobGraph.getNumberOfExecutionRetries >= 0) { - jobGraph.getNumberOfExecutionRetries - } else { - defaultExecutionRetries - } - - executionGraph.setNumberOfRetriesLeft(jobNumberRetries) - executionGraph.setDelayBeforeRetrying(delayBetweenRetries) - - if (userCodeLoader == null) { - throw new JobException("The user code class loader could not be initialized.") - } - - if (log.isDebugEnabled) { - log.debug("Running master initialization of job {} ({}).", - jobGraph.getJobID, jobGraph.getName) - } - - for (vertex <- jobGraph.getVertices.asScala) { - val executableClass = vertex.getInvokableClassName - if (executableClass == null || executableClass.length == 0) { - throw new JobException(s"The vertex ${vertex.getID} (${vertex.getName}) has no " + - s"invokable class.") - } - - vertex.initializeOnMaster(userCodeLoader) - } - - // topological sorting of the job vertices - val sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources - - if (log.isDebugEnabled) { - log.debug("Adding {} vertices from job graph {} ({}).", - sortedTopology.size(), jobGraph.getJobID, jobGraph.getName) - } - executionGraph.attachJobGraph(sortedTopology) + case SubmitJob(jobGraph, listen, d) => + 
submitJob(jobGraph, listenToEvents = listen, detached = d) - if (log.isDebugEnabled) { - log.debug("Successfully created execution graph from job graph {} ({}).", - jobGraph.getJobID, jobGraph.getName) - } - - executionGraph.setScheduleMode(jobGraph.getScheduleMode) - executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling) - - // get notified about job status changes - executionGraph.registerJobStatusListener(self) - - if (listenToEvents) { - // the sender will be notified about state changes - executionGraph.registerExecutionListener(sender) - executionGraph.registerJobStatusListener(sender) - } - - jobInfo.detach = detach - - log.info("Scheduling job {}.", jobGraph.getName) - - executionGraph.scheduleForExecution(scheduler) - - sender ! SubmissionSuccess(jobGraph.getJobID) - } - } - } catch { - case t: Throwable => - log.error(t, "Job submission failed.") - - currentJobs.get(jobGraph.getJobID) match { - case Some((executionGraph, jobInfo)) => - executionGraph.fail(t) - - // don't send the client the final job status because we will send him - // SubmissionFailure - jobInfo.detach = true - - val status = Patterns.ask(self, RequestFinalJobStatus(jobGraph.getJobID), 10 second) - status.onFailure{ - case _: Throwable => self ! JobStatusChanged(executionGraph.getJobID, - JobStatus.FAILED, System.currentTimeMillis(), - s"Cleanup job ${jobGraph.getJobID}.") - } - case None => - libraryCacheManager.unregisterJob(jobGraph.getJobID) - currentJobs.remove(jobGraph.getJobID) - } - - sender ! SubmissionFailure(jobGraph.getJobID, t) - } - } - - case CancelJob(jobID) => { + case CancelJob(jobID) => log.info("Trying to cancel job with ID {}.", jobID) currentJobs.get(jobID) match { case Some((executionGraph, _)) => + // execute the cancellation asynchronously Future { executionGraph.cancel() } + sender ! CancellationSuccess(jobID) case None => log.info("No job found with ID {}.", jobID) sender ! CancellationFailure(jobID, new IllegalArgumentException("No job found with " + - s"ID ${jobID}.")) + s"ID $jobID.")) } - } - case UpdateTaskExecutionState(taskExecutionState) => { + case UpdateTaskExecutionState(taskExecutionState) => if(taskExecutionState == null){ sender ! false }else { @@ -286,12 +202,11 @@ class JobManager(val configuration: Configuration) sender ! false } } - } - case RequestNextInputSplit(jobID, vertexID, executionAttempt) => { + case RequestNextInputSplit(jobID, vertexID, executionAttempt) => val nextInputSplit = currentJobs.get(jobID) match { case Some((executionGraph,_)) => - val execution = executionGraph.getRegisteredExecutions().get(executionAttempt) + val execution = executionGraph.getRegisteredExecutions.get(executionAttempt) if(execution == null){ log.error("Can not find Execution for attempt {}.", executionAttempt) @@ -328,9 +243,8 @@ class JobManager(val configuration: Configuration) log.debug("Send next input split {}.", nextInputSplit) } sender ! NextInputSplit(nextInputSplit) - } - case JobStatusChanged(jobID, newJobStatus, timeStamp, optionalMessage) => { + case JobStatusChanged(jobID, newJobStatus, timeStamp, optionalMessage) => currentJobs.get(jobID) match { case Some((executionGraph, jobInfo)) => executionGraph.getJobName log.info("Status of job {} ({}) changed to {}{}.", @@ -340,7 +254,8 @@ class JobManager(val configuration: Configuration) if(newJobStatus.isTerminalState) { jobInfo.end = timeStamp - if(!jobInfo.detach) { + // is the client waiting for the job result? 
+ if(!jobInfo.detached) { newJobStatus match { case JobStatus.FINISHED => val accumulatorResults = accumulatorManager.getJobAccumulatorResults(jobID) @@ -364,86 +279,220 @@ class JobManager(val configuration: Configuration) case None => removeJob(jobID) } - } - case RequestFinalJobStatus(jobID) => { + case RequestFinalJobStatus(jobID) => currentJobs.get(jobID) match { case Some(_) => val listeners = finalJobStatusListener.getOrElse(jobID, Set()) finalJobStatusListener += jobID -> (listeners + sender) case None => - archive ! RequestJobStatus(jobID) + // There is no job running with this job ID. Check the archive. + archive forward RequestJobStatus(jobID) } - } - case ScheduleOrUpdateConsumers(jobId, executionId, partitionIndex) => { + case ScheduleOrUpdateConsumers(jobId, executionId, partitionIndex) => currentJobs.get(jobId) match { case Some((executionGraph, _)) => - sender ! ConsumerNotificationResult(executionGraph - .scheduleOrUpdateConsumers(executionId, partitionIndex)) + sender ! ConsumerNotificationResult( + executionGraph.scheduleOrUpdateConsumers(executionId, partitionIndex) + ) case None => log.error("Cannot find execution graph for job ID {}.", jobId) - sender ! ConsumerNotificationResult(false, Some( + sender ! ConsumerNotificationResult(success = false, Some( new IllegalStateException("Cannot find execution graph for job ID " + jobId))) } - } - case ReportAccumulatorResult(accumulatorEvent) => { + case ReportAccumulatorResult(accumulatorEvent) => accumulatorManager.processIncomingAccumulators(accumulatorEvent.getJobID, - accumulatorEvent.getAccumulators - (libraryCacheManager.getClassLoader(accumulatorEvent.getJobID))) - } + accumulatorEvent.getAccumulators( + libraryCacheManager.getClassLoader(accumulatorEvent.getJobID) + ) + ) - case RequestAccumulatorResults(jobID) => { + case RequestAccumulatorResults(jobID) => import scala.collection.JavaConverters._ sender ! AccumulatorResultsFound(jobID, accumulatorManager.getJobAccumulatorResults (jobID).asScala.toMap) - } - case RequestJobStatus(jobID) => { + case RequestJobStatus(jobID) => currentJobs.get(jobID) match { case Some((executionGraph,_)) => sender ! CurrentJobStatus(jobID, executionGraph.getState) - case None => (archive ? RequestJobStatus(jobID))(timeout) pipeTo sender + case None => + // check the archive + archive forward RequestJobStatus(jobID) } - } - case RequestRunningJobs => { + case RequestRunningJobs => val executionGraphs = currentJobs map { case (_, (eg, jobInfo)) => eg } sender ! RunningJobs(executionGraphs) - } - case RequestJob(jobID) => { + case RequestJob(jobID) => currentJobs.get(jobID) match { case Some((eg, _)) => sender ! JobFound(jobID, eg) - case None => (archive ? RequestJob(jobID))(timeout) pipeTo sender + case None => + // check the archive + archive forward RequestJob(jobID) } - } - case RequestBlobManagerPort => { + case RequestBlobManagerPort => sender ! libraryCacheManager.getBlobServerPort - } - case RequestRegisteredTaskManagers => { + case RequestRegisteredTaskManagers => import scala.collection.JavaConverters._ sender ! 
RegisteredTaskManagers(instanceManager.getAllRegisteredInstances.asScala) - } - case Heartbeat(instanceID) => { + case Heartbeat(instanceID) => instanceManager.reportHeartBeat(instanceID) - } - case Terminated(taskManager) => { + case Terminated(taskManager) => log.info("Task manager {} terminated.", taskManager.path) JobManager.LOG.warn(s"Task manager ${taskManager.path} terminated.") instanceManager.unregisterTaskManager(taskManager) context.unwatch(taskManager) - } - case RequestJobManagerStatus => { + case RequestJobManagerStatus => sender ! JobManagerStatusAlive + } + + /** + * Submits a job to the job manager. The job is registered at the libraryCacheManager which + * creates the job's class loader. The job graph is appended to the corresponding execution + * graph and the execution vertices are queued for scheduling. + * + * @param jobGraph representing the Flink job + * @param listenToEvents true if the sender wants to listen to job status and execution state + * change notifications. false if not. + * @param detached true if the job runs in detached mode, meaning that the sender does not wait + * for the result of the job. false otherwise. + */ + private def submitJob(jobGraph: JobGraph, listenToEvents: Boolean, detached: Boolean): Unit = { + try { + if (jobGraph == null) { + sender ! akka.actor.Status.Failure(new IllegalArgumentException("JobGraph must not be" + + " null.")) + } else { + log.info(s"Received job ${jobGraph.getJobID} (${jobGraph.getName}).") + + if (jobGraph.getNumberOfVertices == 0) { + sender ! SubmissionFailure(jobGraph.getJobID, new IllegalArgumentException("Job is " + + "empty.")) + } else { + // Create the user code class loader + libraryCacheManager.registerJob(jobGraph.getJobID, jobGraph.getUserJarBlobKeys) + + val userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID) + + // see if there already exists an ExecutionGraph for the corresponding job ID + val (executionGraph, jobInfo) = currentJobs.getOrElseUpdate(jobGraph.getJobID, + (new ExecutionGraph(jobGraph.getJobID, jobGraph.getName, + jobGraph.getJobConfiguration, timeout, jobGraph.getUserJarBlobKeys, userCodeLoader), + JobInfo(sender, System.currentTimeMillis()))) + + val jobNumberRetries = if (jobGraph.getNumberOfExecutionRetries >= 0) { + jobGraph.getNumberOfExecutionRetries + } else { + defaultExecutionRetries + } + + executionGraph.setNumberOfRetriesLeft(jobNumberRetries) + executionGraph.setDelayBeforeRetrying(delayBetweenRetries) + + if (userCodeLoader == null) { + throw new JobException("The user code class loader could not be initialized.") + } + + if (log.isDebugEnabled) { + log.debug(s"Running master initialization of job ${jobGraph.getJobID} " + + s"(${jobGraph.getName}).") + } + + for (vertex <- jobGraph.getVertices.asScala) { + val executableClass = vertex.getInvokableClassName + if (executableClass == null || executableClass.length == 0) { + throw new JobException(s"The vertex ${vertex.getID} (${vertex.getName}) has no " + + s"invokable class.") + } + + vertex.initializeOnMaster(userCodeLoader) + } + + // topological sorting of the job vertices + val sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources + + if (log.isDebugEnabled) { + log.debug(s"Adding ${sortedTopology.size()} vertices from job graph " + + s"${jobGraph.getJobID} (${jobGraph.getName}).") + } + + executionGraph.attachJobGraph(sortedTopology) + + if (log.isDebugEnabled) { + log.debug(s"Successfully created execution graph from job graph " + + s"${jobGraph.getJobID} (${jobGraph.getName}).") + 
} + + executionGraph.setScheduleMode(jobGraph.getScheduleMode) + executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling) + + // get notified about job status changes + executionGraph.registerJobStatusListener(self) + + if (listenToEvents) { + // the sender wants to be notified about state changes + executionGraph.registerExecutionListener(sender) + executionGraph.registerJobStatusListener(sender) + } + + jobInfo.detached = detached + + log.info(s"Scheduling job ${jobGraph.getName}.") + + executionGraph.scheduleForExecution(scheduler) + + sender ! SubmissionSuccess(jobGraph.getJobID) + } + } + } catch { + case t: Throwable => + log.error(t, "Job submission failed.") + + currentJobs.get(jobGraph.getJobID) match { + case Some((executionGraph, jobInfo)) => + /* + * Register self to be notified about job status changes in case it has not happened + * before. That way the proper cleanup of the job is triggered in the JobStatusChanged + * handler. + */ + val status = (self ? RequestFinalJobStatus(jobGraph.getJobID))(10 second) + + /* + * If we cannot register as a final job status listener, then manually send a + * JobStatusChanged message with JobStatus.FAILED. + */ + val selfActorRef = self + status.onFailure{ + case _: Throwable => selfActorRef ! JobStatusChanged(executionGraph.getJobID, + JobStatus.FAILED, System.currentTimeMillis(), s"Cleanup job ${jobGraph.getJobID}.") + } + + /* + * Don't send the client the final job status because we will send it a + * SubmissionFailure. + */ + jobInfo.detached = true + + executionGraph.fail(t) + case None => + libraryCacheManager.unregisterJob(jobGraph.getJobID) + currentJobs.remove(jobGraph.getJobID) + } + + sender ! SubmissionFailure(jobGraph.getJobID, t) } } @@ -460,10 +509,10 @@ class JobManager(val configuration: Configuration) */ private def removeJob(jobID: JobID): Unit = { currentJobs.remove(jobID) match { - case Some((eg, _)) => { + case Some((eg, _)) => eg.prepareForArchiving() archive ! ArchiveExecutionGraph(jobID, eg) - } + case None => } @@ -475,7 +524,7 @@ class JobManager(val configuration: Configuration) } } - private def checkJavaVersion { + private def checkJavaVersion(): Unit = { if (System.getProperty("java.version").substring(0, 3).toDouble < 1.7) { log.warning("Warning: Flink is running with Java 6. " + "Java 6 is not maintained any more by Oracle or the OpenJDK community. " + @@ -496,21 +545,29 @@ object JobManager { def main(args: Array[String]): Unit = { EnvironmentInformation.logEnvironmentInfo(LOG, "JobManager") + val (configuration, executionMode, listeningAddress) = parseArgs(args) - val (hostname, port, configuration, executionMode) = parseArgs(args) - val jobManagerSystem = AkkaUtils.createActorSystem(hostname, port, configuration) + val jobManagerSystem = AkkaUtils.createActorSystem(configuration, listeningAddress) startActor(Props(new JobManager(configuration) with WithWebServer))(jobManagerSystem) if(executionMode.equals(LOCAL)){ - TaskManager.startActorWithConfiguration(hostname, configuration, true)(jobManagerSystem) + TaskManager.startActorWithConfiguration("", configuration, + localAkkaCommunication = true, localTaskManagerCommunication = true)(jobManagerSystem) } jobManagerSystem.awaitTermination() } - def parseArgs(args: Array[String]): (String, Int, Configuration, ExecutionMode) = { + /** + * Loads the configuration, execution mode and the listening address from the provided command + * line arguments.
+ * + * @param args command line arguments + * @return triple of configuration, execution mode and an optional listening address + */ + def parseArgs(args: Array[String]): (Configuration, ExecutionMode, Option[(String, Int)]) = { val parser = new scopt.OptionParser[JobManagerCLIConfiguration]("jobmanager") { head("flink jobmanager") opt[String]("configDir") action { (x, c) => c.copy(configDir = x) } text ("Specify " + @@ -530,28 +587,38 @@ object JobManager { config => GlobalConfiguration.loadConfiguration(config.configDir) - val configuration = GlobalConfiguration.getConfiguration() + val configuration = GlobalConfiguration.getConfiguration + if (config.configDir != null && new File(config.configDir).isDirectory) { configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, config.configDir + "/..") } - val hostname = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null) - val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, - ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT) + val listeningAddress = if(config.executionMode.equals(LOCAL)){ + // All communication happens within the same actor system + None + }else{ + val hostname = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null) + val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, + ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT) - (hostname, port, configuration, config.executionMode) + // Listening address on which the actor system listens for remote messages + Some((hostname, port)) + } + + (configuration, config.executionMode, listeningAddress) } getOrElse { LOG.error("CLI Parsing failed. Usage: " + parser.usage) sys.exit(FAILURE_RETURN_CODE) } } - def startActorSystemAndActor(hostname: String, port: Int, configuration: Configuration): - (ActorSystem, ActorRef) = { - implicit val actorSystem = AkkaUtils.createActorSystem(hostname, port, configuration) - (actorSystem, startActor(configuration)) - } - + /** + * Extracts the job manager configuration values from a configuration instance. + * + * @param configuration Object with the user provided configuration values + * @return Tuple of (number of archived jobs, profiling enabled, cleanup interval of the library + * cache manager, default number of execution retries, delay between retries) + */ def parseConfiguration(configuration: Configuration): (Int, Boolean, Long, Int, Long) = { val archiveCount = configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_ARCHIVE_COUNT, ConfigConstants.DEFAULT_JOB_MANAGER_WEB_ARCHIVE_COUNT) @@ -580,7 +647,11 @@ object JobManager { } def getRemoteAkkaURL(address: String): String = { - s"akka.tcp://flink@${address}/user/${JOB_MANAGER_NAME}" + s"akka.tcp://flink@$address/user/$JOB_MANAGER_NAME" + } + + def getLocalAkkaURL: String = { + s"akka://flink/user/$JOB_MANAGER_NAME" } def getProfiler(jobManager: ActorRef)(implicit system: ActorSystem, timeout: FiniteDuration):
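To make the reworked submission protocol concrete, the following is a minimal client-side sketch against the messages documented above. It is illustrative only and not part of this change: the JobManager address and port are assumptions (6123 is Flink's default JobManager IPC port), the JobGraph construction is left as a placeholder, and it relies on the Akka version Flink uses here (ask support for ActorSelection, system.shutdown()).

import akka.actor.ActorSystem
import akka.pattern.ask
import akka.util.Timeout

import scala.concurrent.Await
import scala.concurrent.duration._

import org.apache.flink.runtime.jobgraph.JobGraph
import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob

object SubmitJobSketch {
  def main(args: Array[String]): Unit = {
    implicit val timeout: Timeout = Timeout(100.seconds)

    // separate client actor system; the URL format matches getRemoteAkkaURL above
    val system = ActorSystem("client")
    val jobManager = system.actorSelection("akka.tcp://flink@localhost:6123/user/jobmanager")

    val jobGraph: JobGraph = ??? // placeholder: assemble the job to submit here

    // SubmitJob(jobGraph, registerForEvents, detached) as defined in JobManagerMessages;
    // the JobManager answers with SubmissionSuccess or SubmissionFailure
    val response = jobManager ? SubmitJob(jobGraph, registerForEvents = false, detached = true)
    println(Await.result(response, timeout.duration))

    system.shutdown()
  }
}

Submitting with detached = true means the client only waits for the submission acknowledgement and not for the job result, matching the jobInfo.detached handling in the JobStatusChanged case above.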
http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerProfiler.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerProfiler.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerProfiler.scala index 89e8b54..e44f7e9 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerProfiler.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerProfiler.scala @@ -26,19 +26,19 @@ import org.apache.flink.runtime.profiling.types.ThreadProfilingEvent import scala.collection.convert.WrapAsScala +/** + * Basic skeleton for the JobManager profiler. Currently, it simply logs the received messages. + */ class JobManagerProfiler extends Actor with ActorLogMessages with ActorLogging with WrapAsScala { override def receiveWithLogMessages: Receive = { - case ReportProfilingData(profilingContainer) => { - + case ReportProfilingData(profilingContainer) => profilingContainer.getIterator foreach { case x: InternalExecutionVertexThreadProfilingData => - log.info(s"Received InternalExecutionVertexThreadProfilingData ${x}.") + log.info(s"Received InternalExecutionVertexThreadProfilingData $x.") case x: InternalInstanceProfilingData => - log.info(s"Received InternalInstanceProfilingData ${x}.") - + log.info(s"Received InternalInstanceProfilingData $x.") case x => log.error(s"Received unknown profiling data: ${x.getClass.getName}" ) } - } } } http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala index 88dc927..2d055ed 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala @@ -25,52 +25,67 @@ import org.apache.flink.runtime.jobgraph.JobID import org.apache.flink.runtime.messages.ArchiveMessages._ import org.apache.flink.runtime.messages.JobManagerMessages._ -import scala.collection.mutable.LinkedHashMap +import scala.collection.mutable import scala.ref.SoftReference -class MemoryArchivist(private val max_entries: Int) extends Actor - with ActorLogMessages with ActorLogging { - - /** +/** + * Actor which stores terminated Flink jobs. The number of stored Flink jobs is set by max_entries. + * If this number is exceeded, the oldest job will be discarded. One can interact with the actor via + * the following messages: + * + * - [[ArchiveExecutionGraph]] archives the attached [[ExecutionGraph]] + * + * - [[RequestArchivedJobs]] returns all currently stored [[ExecutionGraph]]s to the sender + * encapsulated in an [[ArchivedJobs]] message. + * + * - [[RequestJob]] returns the corresponding [[ExecutionGraph]] + * encapsulated in [[JobFound]] if the job is stored by the MemoryArchivist. If not, then + * [[JobNotFound]] is returned. + * + * - [[RequestJobStatus]] returns the last state of the corresponding job.
If the job can be found, + * then a [[CurrentJobStatus]] message with the last state is returned to the sender, otherwise + * a [[JobNotFound]] message is returned + * + * @param max_entries Maximum number of stored Flink jobs + */ +class MemoryArchivist(private val max_entries: Int) extends Actor with ActorLogMessages with +ActorLogging { + /* * Map of execution graphs belonging to recently started jobs with the time stamp of the last * received job event. The insert order is preserved through a LinkedHashMap. */ - val graphs = LinkedHashMap[JobID, SoftReference[ExecutionGraph]]() + val graphs = mutable.LinkedHashMap[JobID, SoftReference[ExecutionGraph]]() override def receiveWithLogMessages: Receive = { /* Receive Execution Graph to archive */ - case ArchiveExecutionGraph(jobID, graph) => { + case ArchiveExecutionGraph(jobID, graph) => // wrap graph inside a soft reference graphs.update(jobID, new SoftReference(graph)) trimHistory() - } - case RequestArchivedJobs => { - sender ! ArchivedJobs(getAllGraphs()) - } + case RequestArchivedJobs => + sender ! ArchivedJobs(getAllGraphs) - case RequestJob(jobID) => { + case RequestJob(jobID) => getGraph(jobID) match { case Some(graph) => sender ! JobFound(jobID, graph) case None => sender ! JobNotFound(jobID) } - } - case RequestJobStatus(jobID) => { + case RequestJobStatus(jobID) => getGraph(jobID) match { case Some(graph) => sender ! CurrentJobStatus(jobID, graph.getState) case None => sender ! JobNotFound(jobID) } - } } /** * Gets all graphs that have not been garbage collected. * @return An iterable with all valid ExecutionGraphs */ - protected def getAllGraphs(): Iterable[ExecutionGraph] = graphs.values.flatMap(_.get) + protected def getAllGraphs: Iterable[ExecutionGraph] = graphs.values.flatMap(_.get) /** * Gets a graph with a jobID if it has not been garbage collected. http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/WithWebServer.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/WithWebServer.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/WithWebServer.scala index 715fc0c..62e56fe 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/WithWebServer.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/WithWebServer.scala @@ -21,6 +21,9 @@ package org.apache.flink.runtime.jobmanager import akka.actor.Actor import org.apache.flink.runtime.jobmanager.web.WebInfoServer +/** + * Mixin for the [[JobManager]] which starts a [[WebInfoServer]] for the JobManager. + */ trait WithWebServer extends Actor { that: JobManager => http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala index 884bc2a..704bf86 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala @@ -37,7 +37,7 @@ object ArchiveMessages { * Response to [[RequestArchivedJobs]] message. The response contains the archived jobs. 
* @param jobs */ - case class ArchivedJobs(val jobs: Iterable[ExecutionGraph]){ + case class ArchivedJobs(jobs: Iterable[ExecutionGraph]){ def asJavaIterable: java.lang.Iterable[ExecutionGraph] = { import scala.collection.JavaConverters._ jobs.asJava @@ -53,7 +53,7 @@ object ArchiveMessages { // Utility methods to allow simpler case object access from Java // -------------------------------------------------------------------------- - def getRequestArchivedJobs() : AnyRef = { + def getRequestArchivedJobs : AnyRef = { RequestArchivedJobs } } http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala index 51767e4..ef5b99c 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala @@ -21,7 +21,7 @@ package org.apache.flink.runtime.messages import java.text.SimpleDateFormat import java.util.Date -import org.apache.flink.runtime.execution.{ExecutionState} +import org.apache.flink.runtime.execution.ExecutionState import org.apache.flink.runtime.executiongraph.ExecutionAttemptID import org.apache.flink.runtime.jobgraph.{JobStatus, JobVertexID, JobID} @@ -56,7 +56,7 @@ object ExecutionGraphMessages { "" } s"${timestampToString(timestamp)}\t$taskName(${subtaskIndex + - 1}/${totalNumberOfSubTasks}) switched to $newExecutionState $oMsg" + 1}/$totalNumberOfSubTasks) switched to $newExecutionState $oMsg" } } @@ -71,7 +71,7 @@ object ExecutionGraphMessages { case class JobStatusChanged(jobID: JobID, newJobStatus: JobStatus, timestamp: Long, optionalMessage: String){ override def toString: String = { - s"${timestampToString(timestamp)}\tJob execution switched to status ${newJobStatus}." + s"${timestampToString(timestamp)}\tJob execution switched to status $newJobStatus." } } http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala index 7ce013b..5189a02 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala @@ -31,7 +31,7 @@ object JobManagerMessages { /** * Submits a job to the job manager. If [[registerForEvents]] is true, - * then the sender will be registered as listener for the state change messages. If [[detach]] + * then the sender will be registered as listener for the state change messages. If [[detached]] * is set to true, then the sender will detach from the job execution. Consequently, * he will not receive the job execution result [[JobResult]]. 
The submission result will be sent * back to the @@ -39,10 +39,10 @@ object JobManagerMessages { * * @param jobGraph * @param registerForEvents if true, then register for state change events - * @param detach if true, then detach from the job execution + * @param detached if true, then detach from the job execution */ case class SubmitJob(jobGraph: JobGraph, registerForEvents: Boolean = false, - detach: Boolean = false) + detached: Boolean = false) /** * Cancels a job with the given [[jobID]] at the JobManager. The result of the cancellation is @@ -330,31 +330,31 @@ object JobManagerMessages { // Utility methods to allow simpler case object access from Java // -------------------------------------------------------------------------- - def getRequestNumberRegisteredTaskManager() : AnyRef = { + def getRequestNumberRegisteredTaskManager : AnyRef = { RequestNumberRegisteredTaskManager } - def getRequestTotalNumberOfSlots() : AnyRef = { + def getRequestTotalNumberOfSlots : AnyRef = { RequestTotalNumberOfSlots } - def getRequestBlobManagerPort() : AnyRef = { + def getRequestBlobManagerPort : AnyRef = { RequestBlobManagerPort } - def getRequestRunningJobs() : AnyRef = { + def getRequestRunningJobs : AnyRef = { RequestRunningJobs } - def getRequestRegisteredTaskManagers() : AnyRef = { + def getRequestRegisteredTaskManagers : AnyRef = { RequestRegisteredTaskManagers } - def getRequestJobManagerStatus() : AnyRef = { + def getRequestJobManagerStatus : AnyRef = { RequestJobManagerStatus } - def getJobManagerStatusAlive() : AnyRef = { + def getJobManagerStatusAlive : AnyRef = { JobManagerStatusAlive } } http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala index d5e7e9b..968dc46 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala @@ -134,23 +134,23 @@ object TaskManagerMessages { // Utility methods to allow simpler case object access from Java // -------------------------------------------------------------------------- - def getNotifyWhenRegisteredAtJobManagerMessage() : AnyRef = { + def getNotifyWhenRegisteredAtJobManagerMessage : AnyRef = { NotifyWhenRegisteredAtJobManager } - def getRegisteredAtJobManagerMessage() : AnyRef = { + def getRegisteredAtJobManagerMessage : AnyRef = { RegisteredAtJobManager } - def getRegisterAtJobManagerMessage() : AnyRef = { + def getRegisterAtJobManagerMessage : AnyRef = { RegisterAtJobManager } - def getSendHeartbeatMessage() : AnyRef = { + def getSendHeartbeatMessage : AnyRef = { SendHeartbeat } - def getLogMemoryUsageMessage() : AnyRef = { + def getLogMemoryUsageMessage : AnyRef = { RegisteredAtJobManager } } http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala index 16884ca..dd158cb 100644 --- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala @@ -18,36 +18,38 @@ package org.apache.flink.runtime.minicluster -import java.util.concurrent.TimeUnit - import akka.pattern.ask import akka.actor.{ActorRef, ActorSystem} -import com.typesafe.config.{ConfigFactory} +import com.typesafe.config.Config import org.apache.flink.configuration.{ConfigConstants, Configuration} import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager import org.slf4j.LoggerFactory -import scala.concurrent.duration.FiniteDuration import scala.concurrent.{Future, Await} +/** + * Abstract base class for Flink's mini cluster. The mini cluster starts a + * [[org.apache.flink.runtime.jobmanager.JobManager]] and one or multiple + * [[org.apache.flink.runtime.taskmanager.TaskManager]]. Depending on the settings, the different + * actors can all be run in the same [[ActorSystem]] or each one in its own. + * + * @param userConfiguration Configuration object with the user provided configuration values + * @param singleActorSystem true if all actors (JobManager and TaskManager) shall be run in the same + * [[ActorSystem]], otherwise false + */ abstract class FlinkMiniCluster(userConfiguration: Configuration, val singleActorSystem: Boolean) { import FlinkMiniCluster._ val HOSTNAME = "localhost" - implicit val timeout = FiniteDuration(userConfiguration.getInteger(ConfigConstants - .AKKA_ASK_TIMEOUT, ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS) + implicit val timeout = AkkaUtils.getTimeout(userConfiguration) val configuration = generateConfiguration(userConfiguration) - if(singleActorSystem){ - configuration.setString(ConfigConstants.JOB_MANAGER_AKKA_URL, "akka://flink/user/jobmanager") - } - - val jobManagerActorSystem = startJobManagerActorSystem() - val jobManagerActor = startJobManager(jobManagerActorSystem) + var jobManagerActorSystem = startJobManagerActorSystem() + var jobManagerActor = startJobManager(jobManagerActorSystem) val numTaskManagers = configuration.getInteger(ConfigConstants .LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1) @@ -63,46 +65,43 @@ abstract class FlinkMiniCluster(userConfiguration: Configuration, (actorSystem, startTaskManager(i)(actorSystem)) } - val (taskManagerActorSystems, taskManagerActors) = actorSystemsTaskManagers.unzip + var (taskManagerActorSystems, taskManagerActors) = actorSystemsTaskManagers.unzip waitForTaskManagersToBeRegistered() def generateConfiguration(userConfiguration: Configuration): Configuration def startJobManager(implicit system: ActorSystem): ActorRef - def startTaskManager(index: Int)(implicit system: ActorSystem): - ActorRef + def startTaskManager(index: Int)(implicit system: ActorSystem): ActorRef - def getJobManagerAkkaConfigString(): String = { - val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants - .DEFAULT_JOB_MANAGER_IPC_PORT) + def getJobManagerAkkaConfig: Config = { + val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, + ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT) if(singleActorSystem){ - AkkaUtils.getLocalConfigString(configuration) + AkkaUtils.getAkkaConfig(configuration, None) }else{ - AkkaUtils.getConfigString(HOSTNAME, port, configuration) + AkkaUtils.getAkkaConfig(configuration, Some((HOSTNAME, port))) } - } def startJobManagerActorSystem(): ActorSystem = { - 
val configString = getJobManagerAkkaConfigString() - - val config = ConfigFactory.parseString(getJobManagerAkkaConfigString()) + val config = getJobManagerAkkaConfig AkkaUtils.createActorSystem(config) } - def getTaskManagerAkkaConfigString(index: Int): String = { + def getTaskManagerAkkaConfig(index: Int): Config = { val port = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT) - AkkaUtils.getConfigString(HOSTNAME, if(port != 0) port + index else port, - configuration) + val resolvedPort = if(port != 0) port + index else port + + AkkaUtils.getAkkaConfig(configuration, Some((HOSTNAME, resolvedPort))) } def startTaskManagerActorSystem(index: Int): ActorSystem = { - val config = ConfigFactory.parseString(getTaskManagerAkkaConfigString(index)) + val config = getTaskManagerAkkaConfig(index) AkkaUtils.createActorSystem(config) } http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala index 59174f6..06d611a 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala @@ -29,6 +29,16 @@ import org.apache.flink.runtime.taskmanager.TaskManager import org.apache.flink.runtime.util.EnvironmentInformation import org.slf4j.LoggerFactory +/** + * Local Flink mini cluster which executes all [[TaskManager]]s and the [[JobManager]] in the same + * JVM. It extends the [[FlinkMiniCluster]] by providing a [[JobClient]], convenience functions + * to set up Flink's configuration, and implementations to create the [[JobManager]] and + * [[TaskManager]].
+ * + * @param userConfiguration Configuration object with the user provided configuration values + * @param singleActorSystem true if all actors (JobManager and TaskManager) shall be run in the same + * [[ActorSystem]], otherwise false + */ class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem: Boolean = true) extends FlinkMiniCluster(userConfiguration, singleActorSystem){ import LocalFlinkMiniCluster._ @@ -36,7 +46,8 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem: val jobClientActorSystem = if(singleActorSystem){ jobManagerActorSystem }else{ - AkkaUtils.createActorSystem() + // create an actor system listening on a random port + AkkaUtils.createDefaultActorSystem() } var jobClient: Option[ActorRef] = None @@ -81,7 +92,10 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem: false } - TaskManager.startActorWithConfiguration(HOSTNAME, config, localExecution)(system) + TaskManager.startActorWithConfiguration(HOSTNAME, + config, + singleActorSystem, + localExecution)(system) } def getJobClient(): ActorRef ={ @@ -93,11 +107,8 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem: config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, HOSTNAME) config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, getJobManagerRPCPort) - if(singleActorSystem){ - config.setString(ConfigConstants.JOB_MANAGER_AKKA_URL, "akka://flink/user/jobmanager") - } - - val jc = JobClient.startActorWithConfiguration(config)(jobClientActorSystem) + val jc = JobClient.startActorWithConfiguration(config, + singleActorSystem)(jobClientActorSystem) jobClient = Some(jc) jc } http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/MemoryUsageLogging.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/MemoryUsageLogging.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/MemoryUsageLogging.scala deleted file mode 100644 index 05f0f9a..0000000 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/MemoryUsageLogging.scala +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.taskmanager - -case class MemoryUsageLogging(logIntervalMs: Option[Int]= None) http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index ed1da80..91eac35 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -42,14 +42,15 @@ import org.apache.flink.runtime.instance.{HardwareDescription, InstanceConnectio import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync import org.apache.flink.runtime.io.network.NetworkEnvironment import org.apache.flink.runtime.io.network.netty.NettyConfig -import org.apache.flink.runtime.jobgraph.{IntermediateDataSetID, JobID} +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID import org.apache.flink.runtime.jobmanager.JobManager import org.apache.flink.runtime.memorymanager.DefaultMemoryManager import org.apache.flink.runtime.messages.JobManagerMessages.UpdateTaskExecutionState import org.apache.flink.runtime.messages.RegistrationMessages.{AlreadyRegistered, RefuseRegistration, AcknowledgeRegistration, RegisterTaskManager} import org.apache.flink.runtime.messages.TaskManagerMessages._ -import org.apache.flink.runtime.messages.TaskManagerProfilerMessages.{MonitorTask, RegisterProfilingListener, UnmonitorTask} +import org.apache.flink.runtime.messages.TaskManagerProfilerMessages +.{UnregisterProfilingListener, UnmonitorTask, MonitorTask, RegisterProfilingListener} import org.apache.flink.runtime.net.NetUtils import org.apache.flink.runtime.profiling.ProfilingUtils import org.apache.flink.runtime.util.EnvironmentInformation @@ -59,11 +60,11 @@ import org.slf4j.LoggerFactory import scala.concurrent.Future import scala.concurrent.duration._ import scala.util.{Failure, Success} +import scala.language.postfixOps /** - * - * - * The TaskManager has the following phases: + * The TaskManager is responsible for executing the individual tasks of a Flink job. It is + * implemented as an actor. The TaskManager has the following phases: * * - Waiting to be registered with its JobManager. In that phase, it periodically sends * [[RegisterAtJobManager]] messages to itself, which trigger the sending of @@ -73,6 +74,13 @@ import scala.util.{Failure, Success} * message. This stops the registration messages and initializes all fields * that require the JobManager's actor reference * + * - [[SubmitTask]] is sent from the JobManager and contains the next Task to be executed on this + * TaskManager + * + * - [[CancelTask]] requests to cancel the corresponding task + * + * - [[FailTask]] requests to fail the corresponding task + * * - ... 
*/ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkkaURL: String, @@ -117,7 +125,7 @@ import scala.collection.JavaConverters._ } if (log.isInfoEnabled) { - log.info(TaskManager.getMemoryUsageStatsAsString(ManagementFactory.getMemoryMXBean())) + log.info(TaskManager.getMemoryUsageStatsAsString(ManagementFactory.getMemoryMXBean)) } var libraryCacheManager: LibraryCacheManager = null @@ -184,7 +192,7 @@ import scala.collection.JavaConverters._ } override def receiveWithLogMessages: Receive = { - case RegisterAtJobManager => { + case RegisterAtJobManager => if(!registered) { registrationDuration += registrationDelay // double delay for exponential backoff @@ -196,9 +204,9 @@ import scala.collection.JavaConverters._ maxRegistrationDuration) self ! PoisonPill - } else if (!registered) { - log.info(s"Try to register at master ${jobManagerAkkaURL}. ${registrationAttempts}. " + - s"Attempt") + } else { + log.info("Try to register at master {}. {}. Attempt", jobManagerAkkaURL, + registrationAttempts) val jobManager = context.actorSelection(jobManagerAkkaURL) jobManager ! RegisterTaskManager(connectionInfo, hardwareDescription, numberOfSlots) @@ -206,25 +214,21 @@ import scala.collection.JavaConverters._ context.system.scheduler.scheduleOnce(registrationDelay, self, RegisterAtJobManager) } } - } - case AcknowledgeRegistration(id, blobPort) => { + case AcknowledgeRegistration(id, blobPort) => if(!registered) { - finishRegistration(id, blobPort) - registered = true + finishRegistration(sender, id, blobPort) } else { log.info("The TaskManager {} is already registered at the JobManager {}, but received " + "another AcknowledgeRegistration message.", self.path, currentJobManager.path) } - } case AlreadyRegistered(id, blobPort) => if(!registered) { log.warning("The TaskManager {} seems to be already registered at the JobManager {} even" + "though it has not yet finished the registration process.", self.path, sender.path) - finishRegistration(id, blobPort) - registered = true + finishRegistration(sender, id, blobPort) } else { // ignore AlreadyRegistered messages which arrived after AcknowledgeRegistration log.info("The TaskManager {} has already been registered at the JobManager {}.", @@ -244,17 +248,16 @@ import scala.collection.JavaConverters._ "registered") } - case SubmitTask(tdd) => { + case SubmitTask(tdd) => submitTask(tdd) - } - case UpdateTask(executionId, resultId, partitionInfo) => { + case UpdateTask(executionId, resultId, partitionInfo) => updateTask(executionId, resultId, partitionInfo) - } - case CancelTask(executionID) => { + case CancelTask(executionID) => runningTasks.get(executionID) match { case Some(task) => + // execute cancel operation concurrently Future { task.cancelExecution() } @@ -263,73 +266,74 @@ import scala.collection.JavaConverters._ sender ! new TaskOperationResult(executionID, false, "No task with that execution ID was found.") } - } - case UnregisterTask(executionID) => { + case UnregisterTask(executionID) => unregisterTask(executionID) - } - case SendHeartbeat => { + case updateMsg:UpdateTaskExecutionState => + val futureResponse = (currentJobManager ? updateMsg)(timeout) + + val jobID = updateMsg.taskExecutionState.getJobID + val executionID = updateMsg.taskExecutionState.getID + val executionState = updateMsg.taskExecutionState.getExecutionState + + futureResponse.mapTo[Boolean].onComplete{ + case Success(result) => + if(!result){ + self ! 
FailTask(executionID, + new IllegalStateException("Task has been disposed on JobManager.")) + } + + if (!result || executionState == ExecutionState.FINISHED || executionState == + ExecutionState.CANCELED || executionState == ExecutionState.FAILED) { + self ! UnregisterTask(executionID) + } + case Failure(t) => + log.warning(s"Execution state change notification failed for task $executionID " + + s"of job $jobID. Cause ${t.getMessage}.") + self ! UnregisterTask(executionID) + } + + case SendHeartbeat => currentJobManager ! Heartbeat(instanceID) - } - case LogMemoryUsage => { + case LogMemoryUsage => logMemoryStats() - } - case NotifyWhenRegisteredAtJobManager => { + case NotifyWhenRegisteredAtJobManager => if (registered) { sender ! RegisteredAtJobManager } else { waitForRegistration += sender } - } - case FailTask(executionID, cause) => { + case FailTask(executionID, cause) => runningTasks.get(executionID) match { case Some(task) => + // execute failing operation concurrently Future { task.failExternally(cause) } case None => } - } - case Terminated(jobManager) => { + case Terminated(jobManager) => log.info("Job manager {} is no longer reachable. Cancelling all tasks and trying to " + "reregister.", jobManager.path) cancelAndClearEverything(new Throwable("Lost connection to JobManager")) - tryJobManagerRegistration() - } - } - def notifyExecutionStateChange(jobID: JobID, executionID: ExecutionAttemptID, - executionState: ExecutionState, - optionalError: Throwable): Unit = { - log.info("Update execution state to {}.", executionState) - val futureResponse = (currentJobManager ? UpdateTaskExecutionState(new TaskExecutionState - (jobID, executionID, executionState, optionalError)))(timeout) - - futureResponse.mapTo[Boolean].onComplete { - case Success(result) => - if (!result) { - self ! FailTask(executionID, new IllegalStateException("Task has been disposed on " + - "JobManager.")) - } + cleanupTaskManager() - if (!result || executionState == ExecutionState.FINISHED || executionState == - ExecutionState.CANCELED || executionState == ExecutionState.FAILED) { - self ! UnregisterTask(executionID) - } - case Failure(t) => { - log.warning("Execution state change notification failed for task {} of job {}. Cause {}.", - executionID, jobID, t.getMessage) - self ! UnregisterTask(executionID) - } - } + tryJobManagerRegistration() } + /** + * Receives a [[TaskDeploymentDescriptor]] describing the task to be executed. Sets up a + * [[RuntimeEnvironment]] for the task and starts its execution in a separate thread. 
+ * + * @param tdd TaskDeploymentDescriptor describing the task to be executed on this [[TaskManager]] + */ private def submitTask(tdd: TaskDeploymentDescriptor): Unit = { val jobID = tdd.getJobID val vertexID = tdd.getVertexID @@ -343,7 +347,9 @@ import scala.collection.JavaConverters._ if (log.isDebugEnabled) { startRegisteringTask = System.currentTimeMillis() } - libraryCacheManager.registerTask(jobID, executionID, tdd.getRequiredJarFiles()) + + // triggers the download of all missing jar files from the job manager + libraryCacheManager.registerTask(jobID, executionID, tdd.getRequiredJarFiles) if (log.isDebugEnabled) { log.debug("Register task {} took {}s", executionID, @@ -357,11 +363,11 @@ import scala.collection.JavaConverters._ } task = new Task(jobID, vertexID, taskIndex, numSubtasks, executionID, - tdd.getTaskName, this) + tdd.getTaskName, self) runningTasks.put(executionID, task) match { case Some(_) => throw new RuntimeException( - s"TaskManager contains already a task with executionID ${executionID}.") + s"TaskManager contains already a task with executionID $executionID.") case None => } @@ -384,7 +390,7 @@ import scala.collection.JavaConverters._ if (jobConfig.getBoolean(ProfilingUtils.PROFILE_JOB_KEY, true)) { profiler match { - case Some(profiler) => profiler ! MonitorTask(task) + case Some(profilerActorRef) => profilerActorRef ! MonitorTask(task) case None => log.info("There is no profiling enabled for the task manager.") } } @@ -401,7 +407,7 @@ import scala.collection.JavaConverters._ throw new RuntimeException("Cannot start task. Task was canceled or failed.") } - sender ! TaskOperationResult(executionID, true) + sender ! TaskOperationResult(executionID, success = true) } catch { case t: Throwable => val message = if (t.isInstanceOf[CancelTaskException]) { @@ -426,12 +432,42 @@ import scala.collection.JavaConverters._ } } + private def cleanupTaskManager(): Unit = { + context.unwatch(currentJobManager) + + networkEnvironment foreach { + _.shutdown() + } + + networkEnvironment = None + + if(libraryCacheManager != null){ + libraryCacheManager.shutdown() + } + + libraryCacheManager = null + + heartbeatScheduler foreach { + _.cancel() + } + + heartbeatScheduler = None + + profiler foreach { + _.tell(UnregisterProfilingListener, JobManager.getProfiler(currentJobManager)) + } + + currentJobManager = ActorRef.noSender + instanceID = null + registered = false + } + private def updateTask(executionId: ExecutionAttemptID, resultId: IntermediateDataSetID, partitionInfo: PartitionInfo): Unit = { partitionInfo.getProducerLocation match { case PartitionInfo.PartitionLocation.UNKNOWN => - sender ! TaskOperationResult(executionId, false, + sender ! TaskOperationResult(executionId, success = false, "Tried to update task with UNKNOWN channel.") case _ => @@ -455,42 +491,45 @@ import scala.collection.JavaConverters._ } } } - sender ! TaskOperationResult(executionId, true) - case None => sender ! TaskOperationResult(executionId, false, "No reader with ID " + - resultId + " was found.") + sender ! TaskOperationResult(executionId, success = true) + case None => sender ! TaskOperationResult(executionId, success = false, + s"No reader with ID $resultId was found.") } - case None => sender ! TaskOperationResult(executionId, false, "No task with execution" + - "ID " + executionId + " was found.") + case None => sender !
+ case None => sender ! TaskOperationResult(executionId, success = false,
+ s"No task with execution ID $executionId was found.")
}
}
}
- private def finishRegistration(id: InstanceID, blobPort: Int): Unit = {
- currentJobManager = sender
+ private def finishRegistration(jobManager: ActorRef, id: InstanceID, blobPort: Int): Unit = {
+ setupTaskManager(jobManager, id, blobPort)
+
+ for (listener <- waitForRegistration) {
+ listener ! RegisteredAtJobManager
+ }
+
+ waitForRegistration.clear()
+ }
+
+ private def setupTaskManager(jobManager: ActorRef, id: InstanceID, blobPort: Int): Unit = {
+ registered = true
+ currentJobManager = jobManager
instanceID = id
+ // watch job manager to detect when it dies
context.watch(currentJobManager)
- log.info(s"TaskManager successfully registered at JobManager ${
- currentJobManager.path.toString
- }.")
-
setupNetworkEnvironment()
setupLibraryCacheManager(blobPort)
+ // schedule regular heartbeat message for oneself
heartbeatScheduler = Some(context.system.scheduler.schedule(
TaskManager.HEARTBEAT_INTERVAL, TaskManager.HEARTBEAT_INTERVAL, self, SendHeartbeat))
profiler foreach {
_.tell(RegisterProfilingListener, JobManager.getProfiler(currentJobManager))
}
-
- for (listener <- waitForRegistration) {
- listener ! RegisteredAtJobManager
- }
-
- waitForRegistration.clear()
}
private def setupNetworkEnvironment(): Unit = {
@@ -514,7 +553,7 @@ import scala.collection.JavaConverters._
}
private def setupLibraryCacheManager(blobPort: Int): Unit = {
- // shutdown existing library cache manager
+ // shutdown existing library cache manager first
if (libraryCacheManager != null) {
try {
libraryCacheManager.shutdown()
@@ -525,6 +564,7 @@ import scala.collection.JavaConverters._
libraryCacheManager = null
}
+ // Check if a blob server is specified
if (blobPort > 0) {
val address = new InetSocketAddress(currentJobManager.path.address.host.getOrElse
("localhost"), blobPort)
@@ -543,6 +583,7 @@ import scala.collection.JavaConverters._
private def cancelAndClearEverything(cause: Throwable) {
if (runningTasks.size > 0) {
log.info("Cancelling all computations and discarding all cached data.")
+
for (t <- runningTasks.values) {
t.failExternally(cause)
runningTasks.remove(t.getExecutionId)
@@ -552,6 +593,7 @@ import scala.collection.JavaConverters._
private def unregisterTask(executionID: ExecutionAttemptID): Unit = {
log.info("Unregister task with execution ID {}.", executionID)
+
runningTasks.remove(executionID) match {
case Some(task) =>
removeAllTaskResources(task)
@@ -566,8 +608,8 @@ import scala.collection.JavaConverters._
private def removeAllTaskResources(task: Task): Unit = {
if (task.getEnvironment != null) {
try {
- for (entry <- DistributedCache.readFileInfoFromConfig(task.getEnvironment
- .getJobConfiguration).asScala) {
+ for (entry <- DistributedCache.readFileInfoFromConfig(
+ task.getEnvironment.getJobConfiguration).asScala) {
fileCache.deleteTmpFile(entry.getKey, entry.getValue, task.getJobID)
}
} catch {
@@ -589,8 +631,8 @@ import scala.collection.JavaConverters._
private def logMemoryStats(): Unit = {
if (log.isInfoEnabled) {
- val memoryMXBean = ManagementFactory.getMemoryMXBean()
- val gcMXBeans = ManagementFactory.getGarbageCollectorMXBeans().asScala
+ val memoryMXBean = ManagementFactory.getMemoryMXBean
+ val gcMXBeans = ManagementFactory.getGarbageCollectorMXBeans.asScala
log.info(TaskManager.getMemoryUsageStatsAsString(memoryMXBean))
log.info(TaskManager.getGarbageCollectorStatsAsString(gcMXBeans))
@@ -620,21 +662,29 @@ object TaskManager {
val (hostname, port, configuration) = parseArgs(args)
- val (taskManagerSystem, _) = startActorSystemAndActor(hostname, port, configuration)
+ val (taskManagerSystem, _) = startActorSystemAndActor(hostname, port, configuration,
+ localAkkaCommunication = false, localTaskManagerCommunication = false)
taskManagerSystem.awaitTermination()
}
+ /**
+ * Parses the command line arguments of the [[TaskManager]]. The method loads the configuration
+ * and extracts the hostname and port on which the actor system shall listen.
+ *
+ * @param args Command line arguments
+ * @return Tuple of (hostname, port, configuration)
+ */
def parseArgs(args: Array[String]): (String, Int, Configuration) = {
val parser = new scopt.OptionParser[TaskManagerCLIConfiguration]("taskmanager") {
head("flink task manager")
opt[String]("configDir") action { (x, c) =>
c.copy(configDir = x)
- } text ("Specify configuration directory.")
+ } text "Specify configuration directory."
opt[String]("tempDir") optional() action { (x, c) =>
c.copy(tmpDir = x)
- } text ("Specify temporary directory.")
+ } text "Specify temporary directory."
}
@@ -642,7 +692,7 @@ object TaskManager {
config =>
GlobalConfiguration.loadConfiguration(config.configDir)
- val configuration = GlobalConfiguration.getConfiguration()
+ val configuration = GlobalConfiguration.getConfiguration
if (config.tmpDir != null && GlobalConfiguration.getString(ConfigConstants
.TASK_MANAGER_TMP_DIR_KEY,
@@ -650,14 +700,16 @@ object TaskManager {
configuration.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, config.tmpDir)
}
- val jobManagerHostname = configuration.getString(ConfigConstants
- .JOB_MANAGER_IPC_ADDRESS_KEY, null)
+ val jobManagerHostname = configuration.getString(
+ ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
+
val jobManagerPort = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
val jobManagerAddress = new InetSocketAddress(jobManagerHostname, jobManagerPort)
val port = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0)
+ // try to find out the TaskManager's own hostname by connecting to jobManagerAddress
val hostname = NetUtils.resolveAddress(jobManagerAddress).getHostName
(hostname, port, configuration)
@@ -669,18 +721,35 @@ object TaskManager {
}
def startActorSystemAndActor(hostname: String, port: Int, configuration: Configuration,
- localExecution: Boolean = false): (ActorSystem, ActorRef) = {
- implicit val actorSystem = AkkaUtils.createActorSystem(hostname, port, configuration)
+ localAkkaCommunication: Boolean,
+ localTaskManagerCommunication: Boolean): (ActorSystem, ActorRef) = {
+ implicit val actorSystem = AkkaUtils.createActorSystem(configuration, Some((hostname, port)))
val (connectionInfo, jobManagerURL, taskManagerConfig, networkConfig) =
- parseConfiguration(hostname, configuration, localExecution)
+ parseConfiguration(hostname, configuration, localAkkaCommunication,
+ localTaskManagerCommunication)
(actorSystem, startActor(connectionInfo, jobManagerURL, taskManagerConfig, networkConfig))
}
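parseArgs above builds on scopt's OptionParser DSL. The following compact sketch shows how such a parser is typically driven end to end; CliConf and CliDemo are illustrative stand-ins, not Flink classes:

    import scopt.OptionParser

    // stand-in for TaskManagerCLIConfiguration
    case class CliConf(configDir: String = null, tmpDir: String = null)

    object CliDemo extends App {
      val parser = new OptionParser[CliConf]("taskmanager") {
        head("flink task manager")
        opt[String]("configDir") action { (x, c) =>
          c.copy(configDir = x)
        } text "Specify configuration directory."
        opt[String]("tempDir") optional() action { (x, c) =>
          c.copy(tmpDir = x)
        } text "Specify temporary directory."
      }

      // parse folds the arguments into the initial config;
      // None means scopt already printed a usage message
      parser.parse(args, CliConf()) match {
        case Some(conf) => println(s"configDir=${conf.configDir}, tmpDir=${conf.tmpDir}")
        case None => sys.exit(1)
      }
    }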
+ /**
+ * Extracts the TaskManager's settings from the configuration. Returns the TaskManager's
+ * connection information, the JobManager's Akka URL, the task manager configuration and the
+ * network connection configuration.
+ *
+ * @param hostname Hostname of the instance on which the TaskManager runs
+ * @param configuration Configuration instance containing the user provided configuration values
+ * @param localAkkaCommunication true if the TaskManager runs in the same [[ActorSystem]] as the
+ * JobManager, otherwise false
+ * @param localTaskManagerCommunication true if all TaskManagers run in the same JVM, otherwise
+ * false
+ * @return Tuple of (TaskManager's connection information, JobManager's Akka URL, TaskManager's
+ * configuration, network connection configuration)
+ */
def parseConfiguration(hostname: String, configuration: Configuration,
- localExecution: Boolean = false):
+ localAkkaCommunication: Boolean,
+ localTaskManagerCommunication: Boolean):
(InstanceConnectionInfo, String, TaskManagerConfiguration, NetworkEnvironmentConfiguration) = {
val dataport = configuration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT) match {
@@ -690,20 +759,21 @@ object TaskManager {
val connectionInfo = new InstanceConnectionInfo(InetAddress.getByName(hostname), dataport)
- val jobManagerURL = configuration.getString(ConfigConstants.JOB_MANAGER_AKKA_URL, null) match {
- case url: String => url
- case _ =>
- val jobManagerAddress = configuration.getString(ConfigConstants
+ val jobManagerURL = if (localAkkaCommunication) {
+ // JobManager and TaskManager are in the same ActorSystem -> Use local Akka URL
+ JobManager.getLocalAkkaURL
+ } else {
+ val jobManagerAddress = configuration.getString(ConfigConstants
.JOB_MANAGER_IPC_ADDRESS_KEY, null)
- val jobManagerRPCPort = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+ val jobManagerRPCPort = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
- if (jobManagerAddress == null) {
- throw new RuntimeException("JobManager address has not been specified in the " +
- "configuration.")
- }
+ if (jobManagerAddress == null) {
+ throw new RuntimeException("JobManager address has not been specified in the " +
+ "configuration.")
+ }
- JobManager.getRemoteAkkaURL(jobManagerAddress + ":" + jobManagerRPCPort)
+ JobManager.getRemoteAkkaURL(jobManagerAddress + ":" + jobManagerRPCPort)
}
val slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
@@ -720,7 +790,7 @@ object TaskManager {
ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS)
- val nettyConfig = localExecution match {
+ val nettyConfig = localTaskManagerCommunication match {
case true => None
case false => Some(new NettyConfig(
connectionInfo.address(), connectionInfo.dataPort(), pageSize, configuration))
@@ -728,10 +798,11 @@ object TaskManager {
val networkConfig = NetworkEnvironmentConfiguration(numNetworkBuffers, pageSize, nettyConfig)
- val networkBufferMem = if (localExecution) 0 else numNetworkBuffers * pageSize
- val configuredMemory: Long = configuration.getInteger(ConfigConstants
- .TASK_MANAGER_MEMORY_SIZE_KEY, -1)
+ val networkBufferMem = if (localTaskManagerCommunication) 0 else numNetworkBuffers * pageSize
+ val configuredMemory: Long = configuration.getInteger(
+ ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1
+ )
val memorySize = if (configuredMemory > 0) {
configuredMemory << 20
} else {
@@ -745,9 +816,10 @@ object TaskManager {
.toLong
}
- val memoryLoggingIntervalMs = configuration.getBoolean(ConfigConstants
- .TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD,
- ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD) match {
+ val memoryLoggingIntervalMs = configuration.getBoolean(
+ ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD,
+ ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD
+ ) match {
case true => Some(
configuration.getLong(ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS,
ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS)
@@ -755,19 +827,19 @@ object TaskManager {
case false => None
}
- val profilingInterval = configuration.getBoolean(ProfilingUtils.ENABLE_PROFILING_KEY,
- false) match {
+ val profilingInterval = configuration.getBoolean(
+ ProfilingUtils.ENABLE_PROFILING_KEY, false
+ ) match {
case true => Some(configuration.getInteger(ProfilingUtils.TASKMANAGER_REPORTINTERVAL_KEY,
ProfilingUtils.DEFAULT_TASKMANAGER_REPORTINTERVAL).toLong)
case false => None
}
- val cleanupInterval = configuration.getLong(ConfigConstants
- .LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
+ val cleanupInterval = configuration.getLong(
+ ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000
- val timeout = FiniteDuration(configuration.getInteger(ConfigConstants.AKKA_ASK_TIMEOUT,
- ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS)
+ val timeout = AkkaUtils.getTimeout(configuration)
val maxRegistrationDuration = Duration(configuration.getString(
ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION,
@@ -793,10 +865,12 @@ object TaskManager {
}
def startActorWithConfiguration(hostname: String, configuration: Configuration,
- localExecution: Boolean = false)
+ localAkkaCommunication: Boolean,
+ localTaskManagerCommunication: Boolean)
(implicit system: ActorSystem) = {
val (connectionInfo, jobManagerURL, taskManagerConfig, networkConnectionConfiguration) =
- parseConfiguration(hostname, configuration, localExecution)
+ parseConfiguration(hostname, configuration, localAkkaCommunication,
+ localTaskManagerCommunication)
startActor(connectionInfo, jobManagerURL, taskManagerConfig, networkConnectionConfiguration)
}
@@ -832,7 +906,7 @@ object TaskManager {
LOG.info(f"Temporary file directory '$path': total $totalSpaceGb GB, " +
f"usable $usableSpaceGb GB ($usablePercentage%.2f%% usable)")
}
- case (_, id) => throw new Exception(s"Temporary file directory #${id} is null.")
+ case (_, id) => throw new Exception(s"Temporary file directory #$id is null.")
}
}
@@ -859,7 +933,7 @@ object TaskManager {
bean =>
s"[${bean.getName}, GC TIME (ms): ${bean.getCollectionTime}, " +
s"GC COUNT: ${bean.getCollectionCount}]"
- } mkString (", ")
+ } mkString ", "
"Garbage collector stats: " + beans
}

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerCLIConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerCLIConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerCLIConfiguration.scala
index 9d061d7..d2d9cf4 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerCLIConfiguration.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerCLIConfiguration.scala
@@ -18,4 +18,10 @@ package org.apache.flink.runtime.taskmanager
+/**
+ * Command line configuration object for the [[TaskManager]]
+ *
+ * @param configDir Path to configuration directory
+ * @param tmpDir Path to temporary directory
+ */
case class TaskManagerCLIConfiguration(configDir: String = null, tmpDir: String = null)

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerProfiler.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerProfiler.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerProfiler.scala
index 616fb61..1a7c31d 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerProfiler.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerProfiler.scala
@@ -35,6 +35,14 @@ import org.apache.flink.runtime.profiling.impl.{EnvironmentThreadSet, InstancePr
import scala.concurrent.duration.FiniteDuration
+/**
+ * Actor which is responsible for profiling task threads on the [[TaskManager]]. The monitoring
+ * is triggered by the self-addressed message [[ProfileTasks]] which is scheduled to be sent
+ * repeatedly.
+ *
+ * @param instancePath Akka URL to [[TaskManager]] instance
+ * @param reportInterval Interval of profiling action
+ */
class TaskManagerProfiler(val instancePath: String, val reportInterval: Int) extends Actor with
ActorLogMessages with ActorLogging {
@@ -57,30 +65,26 @@ ActorLogMessages with ActorLogging {
override def receiveWithLogMessages: Receive = {
- case MonitorTask(task) => {
+ case MonitorTask(task) =>
task.registerExecutionListener(self)
environments += task.getExecutionId -> task.getEnvironment
- }
- case UnmonitorTask(executionAttemptID) => {
+ case UnmonitorTask(executionAttemptID) =>
environments.remove(executionAttemptID)
- }
- case RegisterProfilingListener => {
+ case RegisterProfilingListener =>
listeners += sender
if (monitoringScheduler.isEmpty) {
- startMonitoring
+ startMonitoring()
}
- }
- case UnregisterProfilingListener => {
+ case UnregisterProfilingListener =>
listeners -= sender
if (listeners.isEmpty) {
- stopMonitoring
+ stopMonitoring()
}
- }
- case ProfileTasks => {
+ case ProfileTasks =>
val timestamp = System.currentTimeMillis()
val profilingDataContainer = new ProfilingDataContainer()
@@ -96,10 +100,9 @@ ActorLogMessages with ActorLogging {
val instanceProfilingData = try {
Some(instanceProfiler.generateProfilingData(timestamp))
} catch {
- case e: ProfilingException => {
+ case e: ProfilingException =>
log.error(e, "Error while retrieving instance profiling data.")
None
- }
}
instanceProfilingData foreach {
@@ -115,10 +118,9 @@ ActorLogMessages with ActorLogging {
profilingDataContainer.clear()
}
}
- }
case ExecutionStateChanged(_, vertexID, _, _, subtaskIndex, executionID, newExecutionState,
- _, _) => {
+ _, _) =>
import ExecutionState._
environments.get(executionID) match {
@@ -131,14 +133,15 @@ ActorLogMessages with ActorLogging {
case _ =>
}
case None =>
- log.warning(s"Could not find environment for execution id ${executionID}.")
+ log.warning(s"Could not find environment for execution id $executionID.")
- }
}
def startMonitoring(): Unit = {
val interval = new FiniteDuration(reportInterval, TimeUnit.MILLISECONDS)
val delay = new FiniteDuration((reportInterval * Math.random()).toLong, TimeUnit.MILLISECONDS)
+
+ // schedule ProfileTasks message to be sent repeatedly to oneself
monitoringScheduler = Some(context.system.scheduler.schedule(delay, interval, self,
ProfileTasks))
}

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
index dc81cbe..4d10585 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
@@ -45,13 +45,13 @@ public class AllVerticesIteratorTest {
ExecutionGraph eg = Mockito.mock(ExecutionGraph.class);
ExecutionJobVertex ejv1 = new ExecutionJobVertex(eg, v1, 1,
- AkkaUtils.DEFAULT_TIMEOUT());
+ AkkaUtils.getDefaultTimeout());
ExecutionJobVertex ejv2 = new ExecutionJobVertex(eg, v2, 1,
- AkkaUtils.DEFAULT_TIMEOUT());
+ AkkaUtils.getDefaultTimeout());
ExecutionJobVertex ejv3 = new ExecutionJobVertex(eg, v3, 1,
- AkkaUtils.DEFAULT_TIMEOUT());
+ AkkaUtils.getDefaultTimeout());
ExecutionJobVertex ejv4 = new ExecutionJobVertex(eg, v4, 1,
- AkkaUtils.DEFAULT_TIMEOUT());
+ AkkaUtils.getDefaultTimeout());
AllVerticesIterator iter = new AllVerticesIterator(Arrays.asList(ejv1, ejv2, ejv3,
ejv4).iterator());
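Stepping back from the diffs: the TaskManager changes combine two standard Akka idioms, watching the JobManager so that a Terminated message signals a lost connection, and scheduling a heartbeat message to oneself. A minimal sketch under assumed names (HeartbeatingWorker and the string payload are not Flink code):

    import akka.actor._
    import scala.concurrent.duration._

    case object SendHeartbeat

    class HeartbeatingWorker(jobManager: ActorRef, interval: FiniteDuration) extends Actor {
      import context.dispatcher

      // DeathWatch: a Terminated(jobManager) message arrives if the watched actor dies
      context.watch(jobManager)

      // schedule SendHeartbeat to be sent repeatedly to oneself
      private val heartbeats =
        context.system.scheduler.schedule(interval, interval, self, SendHeartbeat)

      def receive = {
        case SendHeartbeat =>
          jobManager ! "heartbeat" // stand-in for the real Heartbeat(instanceID) message
        case Terminated(`jobManager`) =>
          heartbeats.cancel() // stop heartbeating, then re-register as in the diff
      }

      override def postStop(): Unit = heartbeats.cancel()
    }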
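parseConfiguration likewise turns several enable flags into Option values by matching on a Boolean, as seen for memoryLoggingIntervalMs and profilingInterval. The same shape with a plain Map standing in for Flink's Configuration and made-up key names:

    // stand-in for Configuration; key names are made up
    def intervalIfEnabled(conf: Map[String, String],
                          enableKey: String,
                          intervalKey: String,
                          defaultMs: Long): Option[Long] =
      conf.get(enableKey).exists(_.toBoolean) match {
        case true => Some(conf.get(intervalKey).map(_.toLong).getOrElse(defaultMs))
        case false => None
      }

    // intervalIfEnabled(Map("debug.memory.log" -> "true"), "debug.memory.log",
    //                   "debug.memory.intervalMs", 5000L) == Some(5000L)

Downstream code can then simply iterate over the Option, starting a logging thread only when an interval is present.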
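Finally, the new localAkkaCommunication flag selects between a local and a remote JobManager actor path via JobManager.getLocalAkkaURL and JobManager.getRemoteAkkaURL. The helper below only illustrates what such Akka actor paths conventionally look like; the actor system and actor names are assumptions, not the actual Flink URLs:

    // illustrative only; system name "flink" and actor name "jobmanager" are assumptions
    def jobManagerUrl(localAkka: Boolean, hostAndPort: String): String =
      if (localAkka) {
        // same ActorSystem: a local path is enough
        "akka://flink/user/jobmanager"
      } else {
        // different JVM: fully qualified remote path
        s"akka.tcp://flink@$hostAndPort/user/jobmanager"
      }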
