[ 
https://issues.apache.org/jira/browse/FLINK-1019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14250810#comment-14250810
 ] 

ASF GitHub Bot commented on FLINK-1019:
---------------------------------------

Github user uce commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/149#discussion_r22014136
  
    --- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
    @@ -0,0 +1,714 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.taskmanager
    +
    +import java.io.{IOException, File}
    +import java.lang.management.{GarbageCollectorMXBean, MemoryMXBean, 
ManagementFactory}
    +import java.net.{InetAddress, InetSocketAddress}
    +import java.util
    +import java.util.concurrent.{FutureTask, TimeUnit}
    +
    +import akka.actor._
    +import akka.pattern.ask
    +import org.apache.flink.api.common.cache.DistributedCache
    +import org.apache.flink.configuration.{GlobalConfiguration, 
ConfigConstants, Configuration}
    +import org.apache.flink.core.fs.Path
    +import org.apache.flink.runtime.ActorLogMessages
    +import org.apache.flink.runtime.akka.AkkaUtils
    +import org.apache.flink.runtime.blob.BlobCache
    +import org.apache.flink.runtime.broadcast.BroadcastVariableManager
    +import org.apache.flink.runtime.execution.{ExecutionState, 
RuntimeEnvironment}
    +import 
org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager,
    +FallbackLibraryCacheManager, LibraryCacheManager}
    +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
    +import org.apache.flink.runtime.filecache.FileCache
    +import org.apache.flink.runtime.instance.{InstanceConnectionInfo, 
HardwareDescription, InstanceID}
    +import org.apache.flink.runtime.io.disk.iomanager.{IOManagerAsync}
    +import org.apache.flink.runtime.io.network.netty.NettyConnectionManager
    +import org.apache.flink.runtime.io.network.{NetworkConnectionManager, 
LocalConnectionManager,
    +ChannelManager}
    +import org.apache.flink.runtime.jobgraph.JobID
    +import org.apache.flink.runtime.jobmanager.JobManager
    +import org.apache.flink.runtime.memorymanager.DefaultMemoryManager
    +import 
org.apache.flink.runtime.messages.JobManagerMessages.UpdateTaskExecutionState
    +import 
org.apache.flink.runtime.messages.RegistrationMessages.{RegisterTaskManager,
    +AcknowledgeRegistration}
    +import org.apache.flink.runtime.messages.TaskManagerMessages._
    +import 
org.apache.flink.runtime.messages.TaskManagerProfilerMessages.{UnmonitorTask, 
MonitorTask,
    +RegisterProfilingListener}
    +import org.apache.flink.runtime.net.NetUtils
    +import org.apache.flink.runtime.profiling.ProfilingUtils
    +import org.apache.flink.runtime.util.EnvironmentInformation
    +import org.apache.flink.util.ExceptionUtils
    +import org.slf4j.LoggerFactory
    +
    +import scala.collection.convert.{WrapAsScala, DecorateAsScala}
    +import scala.concurrent.Future
    +import scala.concurrent.duration._
    +import scala.util.Failure
    +import scala.util.Success
    +
    +class TaskManager(val connectionInfo: InstanceConnectionInfo, val 
jobManagerAkkaURL: String,
    +                  val taskManagerConfig: TaskManagerConfiguration,
    +                  val networkConnectionConfig: 
NetworkConnectionConfiguration)
    +  extends Actor with ActorLogMessages with ActorLogging with 
DecorateAsScala with WrapAsScala {
    +
    +  import context._
    +  import taskManagerConfig.{timeout => tmTimeout, _}
    +  implicit val timeout = tmTimeout
    +
    +  log.info(s"Starting task manager at ${self.path}.")
    +
    +  val REGISTRATION_DELAY = 0 seconds
    +  val REGISTRATION_INTERVAL = 10 seconds
    +  val MAX_REGISTRATION_ATTEMPTS = 10
    +  val HEARTBEAT_INTERVAL = 5000 millisecond
    +
    +  TaskManager.checkTempDirs(tmpDirPaths)
    +  val ioManager = new IOManagerAsync(tmpDirPaths)
    +  val memoryManager = new DefaultMemoryManager(memorySize, numberOfSlots, 
pageSize)
    +  val bcVarManager = new BroadcastVariableManager();
    +  val hardwareDescription = 
HardwareDescription.extractFromSystem(memoryManager.getMemorySize)
    +  val fileCache = new FileCache()
    +  val runningTasks = scala.collection.mutable.HashMap[ExecutionAttemptID, 
Task]()
    +
    +  // Actors which want to be notified once this task manager has been 
registered at the job manager
    +  val waitForRegistration = scala.collection.mutable.Set[ActorRef]();
    +
    +  val profiler = profilingInterval match {
    +    case Some(interval) => 
Some(TaskManager.startProfiler(self.path.toSerializationFormat,
    +      interval))
    +    case None => None
    +  }
    +
    +  var libraryCacheManager: LibraryCacheManager = null
    +  var channelManager: Option[ChannelManager] = None
    +  var registrationScheduler: Option[Cancellable] = None
    +  var registrationAttempts: Int = 0
    +  var registered: Boolean = false
    +  var currentJobManager = ActorRef.noSender
    +  var instanceID: InstanceID = null;
    +  var memoryMXBean: Option[MemoryMXBean] = None
    +  var gcMXBeans: Option[Iterable[GarbageCollectorMXBean]] = None
    +  var heartbeatScheduler: Option[Cancellable] = None
    +
    +  if (log.isDebugEnabled) {
    +    memoryLogggingIntervalMs.foreach {
    +      interval =>
    +        val d = FiniteDuration(interval, TimeUnit.MILLISECONDS)
    +        memoryMXBean = Some(ManagementFactory.getMemoryMXBean)
    +        gcMXBeans = 
Some(ManagementFactory.getGarbageCollectorMXBeans.asScala)
    +
    +        context.system.scheduler.schedule(d, d, self, LogMemoryUsage)
    +    }
    +  }
    +
    +  override def preStart(): Unit = {
    +    tryJobManagerRegistration()
    +  }
    +
    +  override def postStop(): Unit = {
    +    log.info(s"Stopping task manager ${self.path}.")
    +
    +    cancelAndClearEverything(new Exception("Task Manager is shutting 
down."))
    +
    +    heartbeatScheduler foreach {
    +      _.cancel()
    +    }
    +
    +    channelManager foreach {
    +      channelManager =>
    +        try {
    +          channelManager.shutdown()
    +        } catch {
    +          case t: Throwable =>
    +            log.error(t, "ChannelManager did not shutdown properly.")
    +        }
    +    }
    +
    +    ioManager.shutdown()
    +    memoryManager.shutdown()
    +    fileCache.shutdown()
    +
    +    if(libraryCacheManager != null){
    +      libraryCacheManager.shutdown()
    +    }
    +  }
    +
    +  def tryJobManagerRegistration(): Unit = {
    +    registrationAttempts = 0
    +    import context.dispatcher
    +    registrationScheduler = 
Some(context.system.scheduler.schedule(REGISTRATION_DELAY,
    +      REGISTRATION_INTERVAL, self, RegisterAtJobManager))
    +  }
    +
    +
    +  override def receiveWithLogMessages: Receive = {
    +    case RegisterAtJobManager => {
    +      registrationAttempts += 1
    +
    +      if (registered) {
    +        registrationScheduler.foreach(_.cancel())
    +      } else if (registrationAttempts <= MAX_REGISTRATION_ATTEMPTS) {
    +
    +        log.info(s"Try to register at master ${jobManagerAkkaURL}. 
${registrationAttempts}. " +
    +          s"Attempt")
    +        val jobManager = context.actorSelection(jobManagerAkkaURL)
    +
    +        jobManager ! RegisterTaskManager(connectionInfo, 
hardwareDescription, numberOfSlots)
    +      } else {
    +        log.error("TaskManager could not register at JobManager.");
    +        self ! PoisonPill
    +      }
    +    }
    +
    +    case AcknowledgeRegistration(id, blobPort) => {
    +      if (!registered) {
    +        registered = true
    +        currentJobManager = sender
    +        instanceID = id
    +
    +        context.watch(currentJobManager)
    +
    +        log.info(s"TaskManager successfully registered at JobManager ${
    +          currentJobManager.path
    +            .toString
    +        }.")
    +
    +        setupChannelManager()
    +        setupLibraryCacheManager(blobPort)
    +
    +        heartbeatScheduler = 
Some(context.system.scheduler.schedule(HEARTBEAT_INTERVAL,
    +          HEARTBEAT_INTERVAL, self,
    +          SendHeartbeat))
    +
    +        profiler foreach {
    +          _.tell(RegisterProfilingListener, 
JobManager.getProfiler(currentJobManager))
    +        }
    +
    +        for (listener <- waitForRegistration) {
    +          listener ! RegisteredAtJobManager
    +        }
    +
    +        waitForRegistration.clear()
    +      }
    +    }
    +
    +    case CancelTask(executionID) => {
    +      runningTasks.get(executionID) match {
    +        case Some(task) =>
    +          Future {
    +            task.cancelExecution()
    +          }
    +          sender ! new TaskOperationResult(executionID, true)
    +        case None =>
    +          sender ! new TaskOperationResult(executionID, false, "No task 
with that execution ID " +
    +            "was " +
    +            "found.")
    +      }
    +    }
    +
    +    case SubmitTask(tdd) => {
    +      val jobID = tdd.getJobID
    +      val vertexID = tdd.getVertexID
    +      val executionID = tdd.getExecutionId
    +      val taskIndex = tdd.getIndexInSubtaskGroup
    +      val numSubtasks = tdd.getCurrentNumberOfSubtasks
    +      var jarsRegistered = false
    +      var startRegisteringTask = 0L
    +
    +      try {
    +        if(log.isDebugEnabled){
    +          startRegisteringTask = System.currentTimeMillis()
    +        }
    +        libraryCacheManager.registerTask(jobID, executionID, 
tdd.getRequiredJarFiles());
    +
    +        if(log.isDebugEnabled){
    +          log.debug(s"Register task ${executionID} took 
${(System.currentTimeMillis() -
    +            startRegisteringTask)/1000.0}s")
    +        }
    +        jarsRegistered = true
    +
    +        val userCodeClassLoader = libraryCacheManager.getClassLoader(jobID)
    +
    +        if (userCodeClassLoader == null) {
    +          throw new RuntimeException("No user code Classloader available.")
    +        }
    +
    +        val task = new Task(jobID, vertexID, taskIndex, numSubtasks, 
executionID,
    +          tdd.getTaskName, this)
    +
    +        runningTasks.put(executionID, task) match {
    +          case Some(_) => throw new RuntimeException(s"TaskManager 
contains already a task with " +
    +            s"executionID ${executionID}.")
    +          case None =>
    +        }
    +
    +        val splitProvider = new TaskInputSplitProvider(currentJobManager, 
jobID, vertexID,
    +          executionID, timeout)
    +        val env = new RuntimeEnvironment(task, tdd, userCodeClassLoader, 
memoryManager,
    +          ioManager, splitProvider, currentJobManager, bcVarManager)
    +
    +        task.setEnvironment(env)
    +
    +        // register the task with the network stack and profilers
    +        channelManager match {
    +          case Some(cm) => cm.register(task)
    +          case None => throw new RuntimeException("ChannelManager has not 
been properly " +
    +            "instantiated.")
    +        }
    +
    +        val jobConfig = tdd.getJobConfiguration
    +
    +        if (jobConfig.getBoolean(ProfilingUtils.PROFILE_JOB_KEY, true)) {
    +          profiler match {
    +            case Some(profiler) => profiler ! MonitorTask(task)
    +            case None => log.info("There is no profiling enabled for the 
task manager.")
    +          }
    +        }
    +
    +        val cpTasks = new util.HashMap[String, FutureTask[Path]]()
    +
    +        for (entry <- 
DistributedCache.readFileInfoFromConfig(tdd.getJobConfiguration)) {
    +          val cp = fileCache.createTmpFile(entry.getKey, entry.getValue, 
jobID)
    +          cpTasks.put(entry.getKey, cp)
    +        }
    +        env.addCopyTasksForCacheFile(cpTasks)
    +
    +        if (!task.startExecution()) {
    +          throw new RuntimeException("Cannot start task. Task was canceled 
or failed.")
    +        }
    +
    +        sender ! TaskOperationResult(executionID, true)
    +      } catch {
    +        case t: Throwable =>
    --- End diff --
    
    I think you forgot to unregister the task from the channel manager again?


> Rework RPC service
> ------------------
>
>                 Key: FLINK-1019
>                 URL: https://issues.apache.org/jira/browse/FLINK-1019
>             Project: Flink
>          Issue Type: Improvement
>          Components: Distributed Runtime
>            Reporter: Ufuk Celebi
>
> There is work going on to improve the RPC service by using [Akka|akka.io]. I 
> couldn't find an issue for it.
> Could one of the two people working on it ([~StephanEwen] and [~asteriosk]) 
> please give an overview of the changes and a status update?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to