Repository: spark
Updated Branches:
  refs/heads/master 4d93b653f -> 0abee534f
[SPARK-14069][SQL] Improve SparkStatusTracker to also track executor information

## What changes were proposed in this pull request?

Track executor information like host and port, cache size, running tasks.

TODO: tests

## How was this patch tested?

N/A

Author: Wenchen Fan <wenc...@databricks.com>

Closes #11888 from cloud-fan/status-tracker.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0abee534
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0abee534
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0abee534

Branch: refs/heads/master
Commit: 0abee534f0ad9bbe84d8d3d3478ecaa594f1e0f4
Parents: 4d93b65
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Thu Mar 31 12:07:19 2016 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Thu Mar 31 12:07:19 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/SparkExecutorInfo.java     | 33 ++++++++++++++++++++
 .../scala/org/apache/spark/SparkContext.scala   |  3 +-
 .../org/apache/spark/SparkStatusTracker.scala   | 20 ++++++++++++
 .../scala/org/apache/spark/StatusAPIImpl.scala  | 33 ++++++++++++--------
 .../spark/scheduler/TaskSchedulerImpl.scala     |  2 ++
 .../org/apache/spark/storage/StorageUtils.scala |  5 ++-
 6 files changed, 80 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0abee534/core/src/main/java/org/apache/spark/SparkExecutorInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/SparkExecutorInfo.java b/core/src/main/java/org/apache/spark/SparkExecutorInfo.java
new file mode 100644
index 0000000..dc3e826
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/SparkExecutorInfo.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import java.io.Serializable;
+
+/**
+ * Exposes information about Spark Executors.
+ *
+ * This interface is not designed to be implemented outside of Spark.  We may add additional methods
+ * which may break binary compatibility with outside implementations.
+ */
+public interface SparkExecutorInfo extends Serializable {
+  String host();
+  int port();
+  long cacheSize();
+  int numRunningTasks();
+}
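
For context, a minimal driver-side sketch of how the new interface can be consumed
(illustrative only, not part of the patch; assumes a build that includes this commit):

    import org.apache.spark.{SparkConf, SparkContext}

    object ExecutorInfoDemo {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setMaster("local[2]").setAppName("executor-info-demo"))
        // Each SparkExecutorInfo exposes host, port, cacheSize and numRunningTasks.
        sc.statusTracker.getExecutorInfos.foreach { exec =>
          println(s"${exec.host}:${exec.port} cacheSize=${exec.cacheSize}B " +
            s"runningTasks=${exec.numRunningTasks}")
        }
        sc.stop()
      }
    }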

http://git-wip-us.apache.org/repos/asf/spark/blob/0abee534/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index dcb41f3..d7cb253 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -147,8 +147,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       appName: String,
       sparkHome: String = null,
       jars: Seq[String] = Nil,
-      environment: Map[String, String] = Map()) =
-  {
+      environment: Map[String, String] = Map()) = {
     this(SparkContext.updatedConf(new SparkConf(), master, appName, sparkHome, jars, environment))
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0abee534/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
index 34ee3a4..52c4656 100644
--- a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
+++ b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark
 
+import org.apache.spark.scheduler.TaskSchedulerImpl
+
 /**
  * Low-level status reporting APIs for monitoring job and stage progress.
  *
@@ -104,4 +106,22 @@ class SparkStatusTracker private[spark] (sc: SparkContext) {
       }
     }
   }
+
+  /**
+   * Returns information of all known executors, including host, port, cacheSize, numRunningTasks.
+   */
+  def getExecutorInfos: Array[SparkExecutorInfo] = {
+    val executorIdToRunningTasks: Map[String, Int] =
+      sc.taskScheduler.asInstanceOf[TaskSchedulerImpl].runningTasksByExecutors()
+
+    sc.getExecutorStorageStatus.map { status =>
+      val bmId = status.blockManagerId
+      new SparkExecutorInfoImpl(
+        bmId.host,
+        bmId.port,
+        status.cacheSize,
+        executorIdToRunningTasks.getOrElse(bmId.executorId, 0)
+      )
+    }
+  }
 }
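
A short aggregation over the new API, for illustration (assumes a live SparkContext
named sc; as with the tracker's other methods, the numbers are a best-effort snapshot
and may lag the scheduler slightly):

    val infos = sc.statusTracker.getExecutorInfos
    val totalCached  = infos.map(_.cacheSize).sum        // bytes held by cached RDD blocks
    val totalRunning = infos.map(_.numRunningTasks).sum  // tasks currently executing
    println(s"${infos.length} executor(s), $totalCached bytes cached, $totalRunning tasks running")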

http://git-wip-us.apache.org/repos/asf/spark/blob/0abee534/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala b/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala
index e5c7c8d..c1f24a6 100644
--- a/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala
+++ b/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala
@@ -18,18 +18,25 @@
 package org.apache.spark
 
 private class SparkJobInfoImpl (
-  val jobId: Int,
-  val stageIds: Array[Int],
-  val status: JobExecutionStatus)
-  extends SparkJobInfo
+    val jobId: Int,
+    val stageIds: Array[Int],
+    val status: JobExecutionStatus)
+  extends SparkJobInfo
 
 private class SparkStageInfoImpl(
-  val stageId: Int,
-  val currentAttemptId: Int,
-  val submissionTime: Long,
-  val name: String,
-  val numTasks: Int,
-  val numActiveTasks: Int,
-  val numCompletedTasks: Int,
-  val numFailedTasks: Int)
-  extends SparkStageInfo
+    val stageId: Int,
+    val currentAttemptId: Int,
+    val submissionTime: Long,
+    val name: String,
+    val numTasks: Int,
+    val numActiveTasks: Int,
+    val numCompletedTasks: Int,
+    val numFailedTasks: Int)
+  extends SparkStageInfo
+
+private class SparkExecutorInfoImpl(
+    val host: String,
+    val port: Int,
+    val cacheSize: Long,
+    val numRunningTasks: Int)
+  extends SparkExecutorInfo

http://git-wip-us.apache.org/repos/asf/spark/blob/0abee534/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index f7790fc..daed2ff 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -90,6 +90,8 @@ private[spark] class TaskSchedulerImpl(
   // Number of tasks running on each executor
   private val executorIdToTaskCount = new HashMap[String, Int]
 
+  def runningTasksByExecutors(): Map[String, Int] = executorIdToTaskCount.toMap
+
   // The set of executors we have on each host; this is used to compute hostsAlive, which
   // in turn is used to decide when we can attain data locality on a given host
   protected val executorsByHost = new HashMap[String, HashSet[String]]
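
runningTasksByExecutors() returns executorIdToTaskCount.toMap, i.e. an immutable
snapshot, so callers on other threads never observe the scheduler's in-flight
mutations. A self-contained sketch of that semantics (names are local to the example):

    import scala.collection.mutable.HashMap

    val executorIdToTaskCount = new HashMap[String, Int]
    executorIdToTaskCount("exec-1") = 3

    val snapshot: Map[String, Int] = executorIdToTaskCount.toMap  // immutable copy
    executorIdToTaskCount("exec-1") = 7                           // later scheduler update

    assert(snapshot("exec-1") == 3)  // the snapshot is unaffected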

http://git-wip-us.apache.org/repos/asf/spark/blob/0abee534/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
index 199a5fc..fb9941b 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -175,7 +175,10 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
   def memRemaining: Long = maxMem - memUsed
 
   /** Return the memory used by this block manager. */
-  def memUsed: Long = _nonRddStorageInfo._1 + _rddBlocks.keys.toSeq.map(memUsedByRdd).sum
+  def memUsed: Long = _nonRddStorageInfo._1 + cacheSize
+
+  /** Return the memory used by caching RDDs */
+  def cacheSize: Long = _rddBlocks.keys.toSeq.map(memUsedByRdd).sum
 
   /** Return the disk space used by this block manager. */
   def diskUsed: Long = _nonRddStorageInfo._2 + _rddBlocks.keys.toSeq.map(diskUsedByRdd).sum
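
With the refactoring above, the accounting identity is explicit: memUsed equals the
non-RDD storage term plus cacheSize, and cacheSize is the figure getExecutorInfos
reports. An illustrative driver-side cross-check (assumes a live SparkContext named
sc with this patch applied):

    for (status <- sc.getExecutorStorageStatus) {
      val nonRdd = status.memUsed - status.cacheSize  // non-RDD blocks, e.g. broadcast data
      println(s"${status.blockManagerId.host}: memUsed=${status.memUsed} " +
        s"cacheSize=${status.cacheSize} nonRdd=$nonRdd")
    }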