Repository: spark
Updated Branches:
  refs/heads/master 6151f29f9 -> fe2b7a456


[SPARK-23285][K8S] Add a config property for specifying physical executor cores

## What changes were proposed in this pull request?

As mentioned in SPARK-23285, this PR introduces a new configuration property 
`spark.kubernetes.executor.request.cores` for specifying the physical CPU cores 
requested for each executor pod. This is to avoid changing the semantics of 
`spark.executor.cores` and `spark.task.cpus` and their role in task scheduling, 
task parallelism, dynamic resource allocation, etc. The new configuration 
property only determines the physical CPU cores available to an executor. An 
executor can still run multiple tasks simultaneously by using appropriate 
values for `spark.executor.cores` and `spark.task.cpus`.
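
For illustration only (not part of this patch), the two knobs might be combined as follows; the values are hypothetical:

```scala
import org.apache.spark.SparkConf

// Hypothetical example: each executor pod asks Kubernetes for 0.5 CPU ("500m")
// while Spark's scheduler still sees 2 cores, i.e. 2 concurrent task slots.
val conf = new SparkConf()
  .set("spark.executor.cores", "2")                        // task parallelism per executor
  .set("spark.task.cpus", "1")                             // cores claimed by each task
  .set("spark.kubernetes.executor.request.cores", "500m")  // physical CPU request on the pod
```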

## How was this patch tested?

Unit tests.

felixcheung srowen jiangxb1987 jerryshao mccheah foxish

Author: Yinan Li <y...@google.com>
Author: Yinan Li <liyinan...@gmail.com>

Closes #20553 from liyinan926/master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fe2b7a45
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fe2b7a45
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fe2b7a45

Branch: refs/heads/master
Commit: fe2b7a4568d65a62da6e6eb00fff05f248b4332c
Parents: 6151f29
Author: Yinan Li <y...@google.com>
Authored: Mon Apr 2 12:20:55 2018 -0700
Committer: Anirudh Ramanathan <ramanath...@google.com>
Committed: Mon Apr 2 12:20:55 2018 -0700

----------------------------------------------------------------------
 docs/running-on-kubernetes.md                   | 15 ++++++++---
 .../org/apache/spark/deploy/k8s/Config.scala    |  6 +++++
 .../cluster/k8s/ExecutorPodFactory.scala        | 12 ++++++---
 .../cluster/k8s/ExecutorPodFactorySuite.scala   | 27 ++++++++++++++++++++
 4 files changed, 53 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fe2b7a45/docs/running-on-kubernetes.md
----------------------------------------------------------------------
diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 975b28d..9c46449 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -549,14 +549,23 @@ specific to Spark on Kubernetes.
   <td><code>spark.kubernetes.driver.limit.cores</code></td>
   <td>(none)</td>
   <td>
-    Specify the hard CPU [limit](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#resource-requests-and-limits-of-pod-and-container) for the driver pod.
+    Specify a hard cpu [limit](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#resource-requests-and-limits-of-pod-and-container) for the driver pod.
   </td>
 </tr>
 <tr>
+  <td><code>spark.kubernetes.executor.request.cores</code></td>
+  <td>(none)</td>
+  <td>
+    Specify the cpu request for each executor pod. Values conform to the Kubernetes [convention](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#meaning-of-cpu).
+    Example values include 0.1, 500m, 1.5, 5, etc., with the definition of cpu units documented in [CPU units](https://kubernetes.io/docs/tasks/configure-pod-container/assign-cpu-resource/#cpu-units).
+    This is distinct from <code>spark.executor.cores</code>: it is only used and takes precedence over <code>spark.executor.cores</code> for specifying the executor pod cpu request if set. Task
+    parallelism, e.g., number of tasks an executor can run concurrently is not affected by this.
+</tr>
+<tr>
   <td><code>spark.kubernetes.executor.limit.cores</code></td>
   <td>(none)</td>
   <td>
-    Specify the hard CPU [limit](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#resource-requests-and-limits-of-pod-and-container) for each executor pod launched for the Spark Application.
+    Specify a hard cpu [limit](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#resource-requests-and-limits-of-pod-and-container) for each executor pod launched for the Spark Application.
   </td>
 </tr>
 <tr>
@@ -593,4 +602,4 @@ specific to Spark on Kubernetes.
    <code>spark.kubernetes.executor.secrets.spark-secret=/etc/secrets</code>.
   </td>
 </tr>
-</table>
\ No newline at end of file
+</table>
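
As a minimal sketch (not part of the diff), the precedence rule described in the docs entry above can be read as: the pod's CPU request uses `spark.kubernetes.executor.request.cores` when set, otherwise it falls back to `spark.executor.cores`. The helper name below is illustrative, not the patch's actual code.

```scala
// Sketch only, assuming the fallback behavior described in the docs above.
def podCpuRequest(requestCores: Option[String], executorCores: Int): String =
  requestCores.getOrElse(executorCores.toString)

podCpuRequest(Some("500m"), 2)  // "500m": explicit Kubernetes-style request wins
podCpuRequest(None, 2)          // "2": falls back to spark.executor.cores
```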

http://git-wip-us.apache.org/repos/asf/spark/blob/fe2b7a45/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index da34a7e..405ea47 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -91,6 +91,12 @@ private[spark] object Config extends Logging {
       .stringConf
       .createOptional
 
+  val KUBERNETES_EXECUTOR_REQUEST_CORES =
+    ConfigBuilder("spark.kubernetes.executor.request.cores")
+      .doc("Specify the cpu request for each executor pod")
+      .stringConf
+      .createOptional
+
   val KUBERNETES_DRIVER_POD_NAME =
     ConfigBuilder("spark.kubernetes.driver.pod.name")
       .doc("Name of the driver pod.")

http://git-wip-us.apache.org/repos/asf/spark/blob/fe2b7a45/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
index ac42385..7143f7a 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
@@ -83,7 +83,12 @@ private[spark] class ExecutorPodFactory(
       MEMORY_OVERHEAD_MIN_MIB))
   private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB
 
-  private val executorCores = sparkConf.getDouble("spark.executor.cores", 1)
+  private val executorCores = sparkConf.getInt("spark.executor.cores", 1)
+  private val executorCoresRequest = if (sparkConf.contains(KUBERNETES_EXECUTOR_REQUEST_CORES)) {
+    sparkConf.get(KUBERNETES_EXECUTOR_REQUEST_CORES).get
+  } else {
+    executorCores.toString
+  }
   private val executorLimitCores = sparkConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES)
 
   /**
@@ -111,7 +116,7 @@ private[spark] class ExecutorPodFactory(
       .withAmount(s"${executorMemoryWithOverhead}Mi")
       .build()
     val executorCpuQuantity = new QuantityBuilder(false)
-      .withAmount(executorCores.toString)
+      .withAmount(executorCoresRequest)
       .build()
     val executorExtraClasspathEnv = executorExtraClasspath.map { cp =>
       new EnvVarBuilder()
@@ -130,8 +135,7 @@ private[spark] class ExecutorPodFactory(
       }.getOrElse(Seq.empty[EnvVar])
     val executorEnv = (Seq(
       (ENV_DRIVER_URL, driverUrl),
-      // Executor backend expects integral value for executor cores, so round it up to an int.
-      (ENV_EXECUTOR_CORES, math.ceil(executorCores).toInt.toString),
+      (ENV_EXECUTOR_CORES, executorCores.toString),
       (ENV_EXECUTOR_MEMORY, executorMemoryString),
       (ENV_APPLICATION_ID, applicationId),
       // This is to set the SPARK_CONF_DIR to be /opt/spark/conf

http://git-wip-us.apache.org/repos/asf/spark/blob/fe2b7a45/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
index cee8fe2..a71a2a1 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
@@ -85,6 +85,33 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
     checkOwnerReferences(executor, driverPodUid)
   }
 
+  test("executor core request specification") {
+    var factory = new ExecutorPodFactory(baseConf, None)
+    var executor = factory.createExecutorPod(
+      "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, 
Int]())
+    assert(executor.getSpec.getContainers.size() === 1)
+    
assert(executor.getSpec.getContainers.get(0).getResources.getRequests.get("cpu").getAmount
+      === "1")
+
+    val conf = baseConf.clone()
+
+    conf.set(KUBERNETES_EXECUTOR_REQUEST_CORES, "0.1")
+    factory = new ExecutorPodFactory(conf, None)
+    executor = factory.createExecutorPod(
+      "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, 
Int]())
+    assert(executor.getSpec.getContainers.size() === 1)
+    
assert(executor.getSpec.getContainers.get(0).getResources.getRequests.get("cpu").getAmount
+      === "0.1")
+
+    conf.set(KUBERNETES_EXECUTOR_REQUEST_CORES, "100m")
+    factory = new ExecutorPodFactory(conf, None)
+    conf.set(KUBERNETES_EXECUTOR_REQUEST_CORES, "100m")
+    executor = factory.createExecutorPod(
+      "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, 
Int]())
+    
assert(executor.getSpec.getContainers.get(0).getResources.getRequests.get("cpu").getAmount
+      === "100m")
+  }
+
   test("executor pod hostnames get truncated to 63 characters") {
     val conf = baseConf.clone()
     conf.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX,

