[
https://issues.apache.org/jira/browse/FLINK-1019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14250810#comment-14250810
]
ASF GitHub Bot commented on FLINK-1019:
---------------------------------------
Github user uce commented on a diff in the pull request:
https://github.com/apache/incubator-flink/pull/149#discussion_r22014136
--- Diff:
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
---
@@ -0,0 +1,714 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager
+
+import java.io.{IOException, File}
+import java.lang.management.{GarbageCollectorMXBean, MemoryMXBean,
ManagementFactory}
+import java.net.{InetAddress, InetSocketAddress}
+import java.util
+import java.util.concurrent.{FutureTask, TimeUnit}
+
+import akka.actor._
+import akka.pattern.ask
+import org.apache.flink.api.common.cache.DistributedCache
+import org.apache.flink.configuration.{GlobalConfiguration,
ConfigConstants, Configuration}
+import org.apache.flink.core.fs.Path
+import org.apache.flink.runtime.ActorLogMessages
+import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.blob.BlobCache
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager
+import org.apache.flink.runtime.execution.{ExecutionState,
RuntimeEnvironment}
+import
org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager,
+FallbackLibraryCacheManager, LibraryCacheManager}
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
+import org.apache.flink.runtime.filecache.FileCache
+import org.apache.flink.runtime.instance.{InstanceConnectionInfo,
HardwareDescription, InstanceID}
+import org.apache.flink.runtime.io.disk.iomanager.{IOManagerAsync}
+import org.apache.flink.runtime.io.network.netty.NettyConnectionManager
+import org.apache.flink.runtime.io.network.{NetworkConnectionManager,
LocalConnectionManager,
+ChannelManager}
+import org.apache.flink.runtime.jobgraph.JobID
+import org.apache.flink.runtime.jobmanager.JobManager
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager
+import
org.apache.flink.runtime.messages.JobManagerMessages.UpdateTaskExecutionState
+import
org.apache.flink.runtime.messages.RegistrationMessages.{RegisterTaskManager,
+AcknowledgeRegistration}
+import org.apache.flink.runtime.messages.TaskManagerMessages._
+import
org.apache.flink.runtime.messages.TaskManagerProfilerMessages.{UnmonitorTask,
MonitorTask,
+RegisterProfilingListener}
+import org.apache.flink.runtime.net.NetUtils
+import org.apache.flink.runtime.profiling.ProfilingUtils
+import org.apache.flink.runtime.util.EnvironmentInformation
+import org.apache.flink.util.ExceptionUtils
+import org.slf4j.LoggerFactory
+
+import scala.collection.convert.{WrapAsScala, DecorateAsScala}
+import scala.concurrent.Future
+import scala.concurrent.duration._
+import scala.util.Failure
+import scala.util.Success
+
+class TaskManager(val connectionInfo: InstanceConnectionInfo, val
jobManagerAkkaURL: String,
+ val taskManagerConfig: TaskManagerConfiguration,
+ val networkConnectionConfig:
NetworkConnectionConfiguration)
+ extends Actor with ActorLogMessages with ActorLogging with
DecorateAsScala with WrapAsScala {
+
+ import context._
+ import taskManagerConfig.{timeout => tmTimeout, _}
+ implicit val timeout = tmTimeout
+
+ log.info(s"Starting task manager at ${self.path}.")
+
+ val REGISTRATION_DELAY = 0 seconds
+ val REGISTRATION_INTERVAL = 10 seconds
+ val MAX_REGISTRATION_ATTEMPTS = 10
+ val HEARTBEAT_INTERVAL = 5000 millisecond
+
+ TaskManager.checkTempDirs(tmpDirPaths)
+ val ioManager = new IOManagerAsync(tmpDirPaths)
+ val memoryManager = new DefaultMemoryManager(memorySize, numberOfSlots,
pageSize)
+ val bcVarManager = new BroadcastVariableManager();
+ val hardwareDescription =
HardwareDescription.extractFromSystem(memoryManager.getMemorySize)
+ val fileCache = new FileCache()
+ val runningTasks = scala.collection.mutable.HashMap[ExecutionAttemptID,
Task]()
+
+ // Actors which want to be notified once this task manager has been
registered at the job manager
+ val waitForRegistration = scala.collection.mutable.Set[ActorRef]();
+
+ val profiler = profilingInterval match {
+ case Some(interval) =>
Some(TaskManager.startProfiler(self.path.toSerializationFormat,
+ interval))
+ case None => None
+ }
+
+ var libraryCacheManager: LibraryCacheManager = null
+ var channelManager: Option[ChannelManager] = None
+ var registrationScheduler: Option[Cancellable] = None
+ var registrationAttempts: Int = 0
+ var registered: Boolean = false
+ var currentJobManager = ActorRef.noSender
+ var instanceID: InstanceID = null;
+ var memoryMXBean: Option[MemoryMXBean] = None
+ var gcMXBeans: Option[Iterable[GarbageCollectorMXBean]] = None
+ var heartbeatScheduler: Option[Cancellable] = None
+
+ if (log.isDebugEnabled) {
+ memoryLogggingIntervalMs.foreach {
+ interval =>
+ val d = FiniteDuration(interval, TimeUnit.MILLISECONDS)
+ memoryMXBean = Some(ManagementFactory.getMemoryMXBean)
+ gcMXBeans =
Some(ManagementFactory.getGarbageCollectorMXBeans.asScala)
+
+ context.system.scheduler.schedule(d, d, self, LogMemoryUsage)
+ }
+ }
+
+ override def preStart(): Unit = {
+ tryJobManagerRegistration()
+ }
+
+ override def postStop(): Unit = {
+ log.info(s"Stopping task manager ${self.path}.")
+
+ cancelAndClearEverything(new Exception("Task Manager is shutting
down."))
+
+ heartbeatScheduler foreach {
+ _.cancel()
+ }
+
+ channelManager foreach {
+ channelManager =>
+ try {
+ channelManager.shutdown()
+ } catch {
+ case t: Throwable =>
+ log.error(t, "ChannelManager did not shutdown properly.")
+ }
+ }
+
+ ioManager.shutdown()
+ memoryManager.shutdown()
+ fileCache.shutdown()
+
+ if(libraryCacheManager != null){
+ libraryCacheManager.shutdown()
+ }
+ }
+
+ def tryJobManagerRegistration(): Unit = {
+ registrationAttempts = 0
+ import context.dispatcher
+ registrationScheduler =
Some(context.system.scheduler.schedule(REGISTRATION_DELAY,
+ REGISTRATION_INTERVAL, self, RegisterAtJobManager))
+ }
+
+
+ override def receiveWithLogMessages: Receive = {
+ case RegisterAtJobManager => {
+ registrationAttempts += 1
+
+ if (registered) {
+ registrationScheduler.foreach(_.cancel())
+ } else if (registrationAttempts <= MAX_REGISTRATION_ATTEMPTS) {
+
+ log.info(s"Try to register at master ${jobManagerAkkaURL}.
${registrationAttempts}. " +
+ s"Attempt")
+ val jobManager = context.actorSelection(jobManagerAkkaURL)
+
+ jobManager ! RegisterTaskManager(connectionInfo,
hardwareDescription, numberOfSlots)
+ } else {
+ log.error("TaskManager could not register at JobManager.");
+ self ! PoisonPill
+ }
+ }
+
+ case AcknowledgeRegistration(id, blobPort) => {
+ if (!registered) {
+ registered = true
+ currentJobManager = sender
+ instanceID = id
+
+ context.watch(currentJobManager)
+
+ log.info(s"TaskManager successfully registered at JobManager ${
+ currentJobManager.path
+ .toString
+ }.")
+
+ setupChannelManager()
+ setupLibraryCacheManager(blobPort)
+
+ heartbeatScheduler =
Some(context.system.scheduler.schedule(HEARTBEAT_INTERVAL,
+ HEARTBEAT_INTERVAL, self,
+ SendHeartbeat))
+
+ profiler foreach {
+ _.tell(RegisterProfilingListener,
JobManager.getProfiler(currentJobManager))
+ }
+
+ for (listener <- waitForRegistration) {
+ listener ! RegisteredAtJobManager
+ }
+
+ waitForRegistration.clear()
+ }
+ }
+
+ case CancelTask(executionID) => {
+ runningTasks.get(executionID) match {
+ case Some(task) =>
+ Future {
+ task.cancelExecution()
+ }
+ sender ! new TaskOperationResult(executionID, true)
+ case None =>
+ sender ! new TaskOperationResult(executionID, false, "No task
with that execution ID " +
+ "was " +
+ "found.")
+ }
+ }
+
+ case SubmitTask(tdd) => {
+ val jobID = tdd.getJobID
+ val vertexID = tdd.getVertexID
+ val executionID = tdd.getExecutionId
+ val taskIndex = tdd.getIndexInSubtaskGroup
+ val numSubtasks = tdd.getCurrentNumberOfSubtasks
+ var jarsRegistered = false
+ var startRegisteringTask = 0L
+
+ try {
+ if(log.isDebugEnabled){
+ startRegisteringTask = System.currentTimeMillis()
+ }
+ libraryCacheManager.registerTask(jobID, executionID,
tdd.getRequiredJarFiles());
+
+ if(log.isDebugEnabled){
+ log.debug(s"Register task ${executionID} took
${(System.currentTimeMillis() -
+ startRegisteringTask)/1000.0}s")
+ }
+ jarsRegistered = true
+
+ val userCodeClassLoader = libraryCacheManager.getClassLoader(jobID)
+
+ if (userCodeClassLoader == null) {
+ throw new RuntimeException("No user code Classloader available.")
+ }
+
+ val task = new Task(jobID, vertexID, taskIndex, numSubtasks,
executionID,
+ tdd.getTaskName, this)
+
+ runningTasks.put(executionID, task) match {
+ case Some(_) => throw new RuntimeException(s"TaskManager
contains already a task with " +
+ s"executionID ${executionID}.")
+ case None =>
+ }
+
+ val splitProvider = new TaskInputSplitProvider(currentJobManager,
jobID, vertexID,
+ executionID, timeout)
+ val env = new RuntimeEnvironment(task, tdd, userCodeClassLoader,
memoryManager,
+ ioManager, splitProvider, currentJobManager, bcVarManager)
+
+ task.setEnvironment(env)
+
+ // register the task with the network stack and profilers
+ channelManager match {
+ case Some(cm) => cm.register(task)
+ case None => throw new RuntimeException("ChannelManager has not
been properly " +
+ "instantiated.")
+ }
+
+ val jobConfig = tdd.getJobConfiguration
+
+ if (jobConfig.getBoolean(ProfilingUtils.PROFILE_JOB_KEY, true)) {
+ profiler match {
+ case Some(profiler) => profiler ! MonitorTask(task)
+ case None => log.info("There is no profiling enabled for the
task manager.")
+ }
+ }
+
+ val cpTasks = new util.HashMap[String, FutureTask[Path]]()
+
+ for (entry <-
DistributedCache.readFileInfoFromConfig(tdd.getJobConfiguration)) {
+ val cp = fileCache.createTmpFile(entry.getKey, entry.getValue,
jobID)
+ cpTasks.put(entry.getKey, cp)
+ }
+ env.addCopyTasksForCacheFile(cpTasks)
+
+ if (!task.startExecution()) {
+ throw new RuntimeException("Cannot start task. Task was canceled
or failed.")
+ }
+
+ sender ! TaskOperationResult(executionID, true)
+ } catch {
+ case t: Throwable =>
--- End diff --
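I think you forgot to unregister the task from the channel manager again in this failure path? The task is registered above via `cm.register(task)`, so it should be unregistered if the submission fails.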
> Rework RPC service
> ------------------
>
> Key: FLINK-1019
> URL: https://issues.apache.org/jira/browse/FLINK-1019
> Project: Flink
> Issue Type: Improvement
> Components: Distributed Runtime
> Reporter: Ufuk Celebi
>
> There is work going on to improve the RPC service by using [Akka|akka.io]. I
> couldn't find an issue for it.
> Could one of the two people working on it ([~StephanEwen] and [~asteriosk])
> please give an overview of the changes and a status update?
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)