[
https://issues.apache.org/jira/browse/FLINK-1019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14237636#comment-14237636
]
ASF GitHub Bot commented on FLINK-1019:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/incubator-flink/pull/149#discussion_r21440673
--- Diff:
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
---
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager
+
+import java.io.File
+import java.net.{InetSocketAddress}
+import java.util.concurrent.TimeUnit
+
+import akka.actor._
+import akka.pattern.Patterns
+import akka.pattern.{ask, pipe}
+import org.apache.flink.configuration.{ConfigConstants,
GlobalConfiguration, Configuration}
+import org.apache.flink.core.io.InputSplitAssigner
+import org.apache.flink.runtime.blob.BlobServer
+import org.apache.flink.runtime.executiongraph.{Execution,
ExecutionJobVertex, ExecutionGraph}
+import org.apache.flink.runtime.io.network.ConnectionInfoLookupResponse
+import
org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph
+import
org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
+import org.apache.flink.runtime.taskmanager.TaskManager
+import org.apache.flink.runtime.{JobException, ActorLogMessages}
+import org.apache.flink.runtime.akka.AkkaUtils
+import
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
+import org.apache.flink.runtime.instance.{InstanceManager}
+import org.apache.flink.runtime.jobgraph.{JobStatus, JobID}
+import org.apache.flink.runtime.jobmanager.accumulators.AccumulatorManager
+import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler =>
FlinkScheduler}
+import org.apache.flink.runtime.messages.JobManagerMessages._
+import org.apache.flink.runtime.messages.RegistrationMessages._
+import
org.apache.flink.runtime.messages.TaskManagerMessages.{NextInputSplit,
Heartbeat}
+import org.apache.flink.runtime.profiling.ProfilingUtils
+import org.slf4j.LoggerFactory
+
+import scala.collection.convert.WrapAsScala
+import scala.concurrent.{Future}
+import scala.concurrent.duration._
+
+class JobManager(val configuration: Configuration) extends
+Actor with ActorLogMessages with ActorLogging with WrapAsScala {
+ import context._
+ implicit val timeout =
FiniteDuration(configuration.getInteger(ConfigConstants.AKKA_ASK_TIMEOUT,
+ ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS)
+
+ Execution.timeout = timeout;
+
+ log.info("Starting job manager.")
+
+ val (archiveCount,
+ profiling,
+ cleanupInterval,
+ defaultExecutionRetries,
+ delayBetweenRetries) = JobManager.parseConfiguration(configuration)
+
+ // Props for the profiler actor
+ def profilerProps: Props = Props(classOf[JobManagerProfiler])
+
+ // Props for the archive actor
+ def archiveProps: Props = Props(classOf[MemoryArchivist], archiveCount)
+
+ val profiler = profiling match {
+ case true => Some(context.actorOf(profilerProps,
JobManager.PROFILER_NAME))
+ case false => None
+ }
+
+ val archive = context.actorOf(archiveProps, JobManager.ARCHIVE_NAME)
+
+ val accumulatorManager = new AccumulatorManager(Math.min(1,
archiveCount))
+ val instanceManager = new InstanceManager()
+ val scheduler = new FlinkScheduler()
+ val libraryCacheManager = new BlobLibraryCacheManager(new BlobServer(),
cleanupInterval)
+
+ // List of current jobs running
+ val currentJobs = scala.collection.mutable.HashMap[JobID,
(ExecutionGraph, JobInfo)]()
+
+ // Map of actors which want to be notified once a specific job terminates
+ val finalJobStatusListener = scala.collection.mutable.HashMap[JobID,
Set[ActorRef]]()
+
+ instanceManager.addInstanceListener(scheduler)
+
+ log.info(s"Started job manager. Waiting for incoming messages.")
+
+ override def postStop(): Unit = {
+ log.info(s"Stopping job manager ${self.path}.")
+ instanceManager.shutdown()
+ scheduler.shutdown()
+ libraryCacheManager.shutdown()
+ }
+
+ override def receiveWithLogMessages: Receive = {
+ case RegisterTaskManager(connectionInfo, hardwareInformation,
numberOfSlots) => {
+ val taskManager = sender()
+ val instanceID = instanceManager.registerTaskManager(taskManager,
connectionInfo,
+ hardwareInformation, numberOfSlots)
+
+ // to be notified when the taskManager is no longer reachable
+// context.watch(taskManager);
+
+ taskManager ! AcknowledgeRegistration(instanceID,
libraryCacheManager.getBlobServerPort)
+ }
+
+ case RequestNumberRegisteredTaskManager => {
+ sender() ! instanceManager.getNumberOfRegisteredTaskManagers
+ }
+
+ case RequestTotalNumberOfSlots => {
+ sender() ! instanceManager.getTotalNumberOfSlots
+ }
+
+ case SubmitJob(jobGraph, listenToEvents, detach) => {
+ try {
+ if (jobGraph == null) {
+ sender() ! akka.actor.Status.Failure(new
IllegalArgumentException("JobGraph must not be" +
+ " null."))
+ } else {
+
+ log.info(s"Received job ${jobGraph.getJobID}
(${jobGraph.getName}}).")
+
+ // Create the user code class loader
+ libraryCacheManager.registerJob(jobGraph.getJobID,
jobGraph.getUserJarBlobKeys)
+
+ val (executionGraph, jobInfo) =
currentJobs.getOrElseUpdate(jobGraph.getJobID(),
+ (new ExecutionGraph(jobGraph.getJobID, jobGraph.getName,
+ jobGraph.getJobConfiguration, jobGraph.getUserJarBlobKeys),
JobInfo(sender(),
+ System.currentTimeMillis())))
+
+ val jobNumberRetries = if(jobGraph.getNumberOfExecutionRetries
>= 0){
+ jobGraph.getNumberOfExecutionRetries
+ }else{
+ defaultExecutionRetries
+ }
+
+ executionGraph.setNumberOfRetriesLeft(jobNumberRetries)
+ executionGraph.setDelayBeforeRetrying(delayBetweenRetries)
+
+ val userCodeLoader =
libraryCacheManager.getClassLoader(jobGraph.getJobID)
+
+ if (userCodeLoader == null) {
+ throw new JobException("The user code class loader could not
be initialized.")
+ }
+
+ log.debug(s"Running master initialization of job
${jobGraph.getJobID} (${jobGraph
+ .getName}).")
+
+ for (vertex <- jobGraph.getVertices) {
+ val executableClass = vertex.getInvokableClassName
+ if (executableClass == null || executableClass.length == 0) {
+ throw new JobException(s"The vertex ${vertex.getID}
(${vertex.getName}) has no " +
+ s"invokable class.")
+ }
+
+ vertex.initializeOnMaster(userCodeLoader)
+ }
+
+ // topological sorting of the job vertices
+ val sortedTopology =
jobGraph.getVerticesSortedTopologicallyFromSources
+
+ log.debug(s"Adding ${sortedTopology.size()} vertices from job
graph ${jobGraph
+ .getJobID} (${jobGraph.getName}).")
+
+ executionGraph.attachJobGraph(sortedTopology)
+
+ log.debug(s"Successfully created execution graph from job graph
${jobGraph.getJobID} " +
+ s"(${jobGraph.getName}).")
+
+
executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling)
+
+ // get notified about job status changes
+ executionGraph.registerJobStatusListener(self)
+
+ if(listenToEvents){
+ // the sender will be notified about state changes
+ executionGraph.registerExecutionListener(sender())
+ executionGraph.registerJobStatusListener(sender())
+ }
+
+ jobInfo.detach = detach
+
+ log.info(s"Scheduling job ${jobGraph.getName}.")
+
+ executionGraph.scheduleForExecution(scheduler)
+
+ sender() ! SubmissionSuccess(jobGraph.getJobID)
+ }
+ } catch {
+ case t: Throwable =>
+ log.error(t, "Job submission failed.")
+
+ currentJobs.get(jobGraph.getJobID) match {
+ case Some((executionGraph, jobInfo)) =>
+ executionGraph.fail(t)
+
+ // don't send the client the final job status because we
already send him
+ // SubmissionFailure
+ jobInfo.detach = true
+
+ val status = Patterns.ask(self,
RequestFinalJobStatus(jobGraph.getJobID), 10 second)
+ status.onFailure{
+ case _: Throwable => self !
JobStatusChanged(executionGraph.getJobID,
+ JobStatus.FAILED, System.currentTimeMillis(),
+ s"Cleanup job ${jobGraph.getJobID}.")
+ }
+ case None =>
+ libraryCacheManager.unregisterJob(jobGraph.getJobID)
+ currentJobs.remove(jobGraph.getJobID)
+
+ }
+
+ sender() ! SubmissionFailure(jobGraph.getJobID, t)
+ }
+ }
+
+ case CancelJob(jobID) => {
+ log.info(s"Trying to cancel job with ID ${jobID}.")
+
+ currentJobs.get(jobID) match {
+ case Some((executionGraph, _)) =>
+ Future {
+ executionGraph.cancel()
+ }
+ sender() ! CancellationSuccess(jobID)
+ case None =>
+ log.info(s"No job found with ID ${jobID}.")
+ sender() ! CancellationFailure(jobID, new
IllegalArgumentException(s"No job found with " +
+ s"ID ${jobID}."))
+ }
+ }
+
+ case UpdateTaskExecutionState(taskExecutionState) => {
+ if(taskExecutionState == null){
+ sender() ! false
+ }else {
+ currentJobs.get(taskExecutionState.getJobID) match {
+ case Some((executionGraph, _)) =>
+ sender() ! executionGraph.updateState(taskExecutionState)
+ case None => log.error(s"Cannot find execution graph for ID
${taskExecutionState
+ .getJobID} to change state to
${taskExecutionState.getExecutionState}.")
+ sender() ! false
+ }
+ }
+ }
+
+ case RequestNextInputSplit(jobID, vertexID) => {
+ val nextInputSplit = currentJobs.get(jobID) match {
+ case Some((executionGraph,_)) =>
executionGraph.getJobVertex(vertexID) match {
+ case vertex: ExecutionJobVertex => vertex.getSplitAssigner match
{
+ case splitAssigner: InputSplitAssigner =>
splitAssigner.getNextInputSplit(null)
--- End diff --
Good point, I probably just copied the old bug.
> Rework RPC service
> ------------------
>
> Key: FLINK-1019
> URL: https://issues.apache.org/jira/browse/FLINK-1019
> Project: Flink
> Issue Type: Improvement
> Components: Distributed Runtime
> Reporter: Ufuk Celebi
>
> There is work going on to improve the RPC service by using [Akka|akka.io]. I
> couldn't find an issue for it.
> Could one of the two people working on it ([~StephanEwen] and [~asteriosk])
> please give an overview of the changes and a status update?
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)