Repository: spark
Updated Branches:
  refs/heads/master 78981efc2 -> 1cfda4482


[SPARK-25021][K8S] Add spark.executor.pyspark.memory limit for K8S

## What changes were proposed in this pull request?

Add the `spark.executor.pyspark.memory` limit for K8S: when the submitted application is PySpark, the configured Python worker memory is added to the executor pod's memory request, mirroring the existing YARN behavior.
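
For illustration only, a minimal sketch of a configuration that exercises the new limit (the image name and memory value below are hypothetical):

    // Hypothetical values; any SparkConf / --conf mechanism works equally well.
    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.kubernetes.container.image", "my-repo/spark-py:latest") // hypothetical image
      .set("spark.executor.pyspark.memory", "512m") // added on top of executor memory + overhead

With this set, an executor pod's memory request becomes executor memory + memory overhead + 512Mi.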

## How was this patch tested?

Unit tests (a new executor pyspark memory case in BasicExecutorFeatureStepSuite) and Kubernetes integration tests (a new memory customization case in PythonTestsSuite).

Closes #22298 from ifilonenko/SPARK-25021.

Authored-by: Ilan Filonenko <i...@cornell.edu>
Signed-off-by: Holden Karau <hol...@pigscanfly.ca>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1cfda448
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1cfda448
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1cfda448

Branch: refs/heads/master
Commit: 1cfda448255d5b4a0df88148e0f6acd88aa6e318
Parents: 78981ef
Author: Ilan Filonenko <i...@cornell.edu>
Authored: Sat Sep 8 22:18:06 2018 -0700
Committer: Holden Karau <hol...@pigscanfly.ca>
Committed: Sat Sep 8 22:18:06 2018 -0700

----------------------------------------------------------------------
 dev/make-distribution.sh                        |  1 +
 docs/configuration.md                           |  2 +-
 examples/src/main/python/py_container_checks.py | 32 -------------
 examples/src/main/python/pyfiles.py             | 38 ----------------
 .../org/apache/spark/deploy/k8s/Config.scala    |  7 +++
 .../k8s/features/BasicExecutorFeatureStep.scala | 14 +++++-
 .../bindings/JavaDriverFeatureStep.scala        |  4 +-
 .../bindings/PythonDriverFeatureStep.scala      |  4 +-
 .../features/bindings/RDriverFeatureStep.scala  |  4 +-
 .../features/BasicDriverFeatureStepSuite.scala  |  1 -
 .../BasicExecutorFeatureStepSuite.scala         | 24 ++++++++++
 .../bindings/JavaDriverFeatureStepSuite.scala   |  1 -
 .../src/main/dockerfiles/spark/Dockerfile       |  1 +
 .../dockerfiles/spark/bindings/R/Dockerfile     |  2 +-
 .../spark/bindings/python/Dockerfile            |  2 +-
 .../k8s/integrationtest/KubernetesSuite.scala   | 33 ++++++++++++++
 .../k8s/integrationtest/PythonTestsSuite.scala  | 34 +++++++++++---
 .../k8s/integrationtest/SecretsTestsSuite.scala |  1 +
 .../tests/py_container_checks.py                | 32 +++++++++++++
 .../integration-tests/tests/pyfiles.py          | 38 ++++++++++++++++
 .../tests/worker_memory_check.py                | 47 ++++++++++++++++++++
 21 files changed, 236 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1cfda448/dev/make-distribution.sh
----------------------------------------------------------------------
diff --git a/dev/make-distribution.sh b/dev/make-distribution.sh
index ad99ce5..778d376 100755
--- a/dev/make-distribution.sh
+++ b/dev/make-distribution.sh
@@ -192,6 +192,7 @@ fi
 if [ -d "$SPARK_HOME"/resource-managers/kubernetes/core/target/ ]; then
   mkdir -p "$DISTDIR/kubernetes/"
   cp -a "$SPARK_HOME"/resource-managers/kubernetes/docker/src/main/dockerfiles 
"$DISTDIR/kubernetes/"
+  cp -a "$SPARK_HOME"/resource-managers/kubernetes/integration-tests/tests 
"$DISTDIR/kubernetes/"
 fi
 
 # Copy examples and dependencies

http://git-wip-us.apache.org/repos/asf/spark/blob/1cfda448/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index f344bcd..3a8d567 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -188,7 +188,7 @@ of the most common options to set are:
     unless otherwise specified.  If set, PySpark memory for an executor will be
    limited to this amount. If not set, Spark will not limit Python's memory use
    and it is up to the application to avoid exceeding the overhead memory space
-    shared with other non-JVM processes. When PySpark is run in YARN, this memory
+    shared with other non-JVM processes. When PySpark is run in YARN or Kubernetes, this memory
     is added to executor resource requests.
   </td>
 </tr>

http://git-wip-us.apache.org/repos/asf/spark/blob/1cfda448/examples/src/main/python/py_container_checks.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/py_container_checks.py b/examples/src/main/python/py_container_checks.py
deleted file mode 100644
index f6b3be2..0000000
--- a/examples/src/main/python/py_container_checks.py
+++ /dev/null
@@ -1,32 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-import os
-import sys
-
-
-def version_check(python_env, major_python_version):
-    """
-        These are various tests to test the Python container image.
-        This file will be distributed via --py-files in the e2e tests.
-    """
-    env_version = os.environ.get('PYSPARK_PYTHON')
-    print("Python runtime version check is: " +
-          str(sys.version_info[0] == major_python_version))
-
-    print("Python environment version check is: " +
-          str(env_version == python_env))

http://git-wip-us.apache.org/repos/asf/spark/blob/1cfda448/examples/src/main/python/pyfiles.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/pyfiles.py b/examples/src/main/python/pyfiles.py
deleted file mode 100644
index 4193654..0000000
--- a/examples/src/main/python/pyfiles.py
+++ /dev/null
@@ -1,38 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import print_function
-
-import sys
-
-from pyspark.sql import SparkSession
-
-
-if __name__ == "__main__":
-    """
-        Usage: pyfiles [major_python_version]
-    """
-    spark = SparkSession \
-        .builder \
-        .appName("PyFilesTest") \
-        .getOrCreate()
-
-    from py_container_checks import version_check
-    # Begin of Python container checks
-    version_check(sys.argv[1], 2 if sys.argv[1] == "python" else 3)
-
-    spark.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/1cfda448/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index c5f4d6c..71e4d32 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -225,6 +225,13 @@ private[spark] object Config extends Logging {
         "Ensure that major Python version is either Python2 or Python3")
       .createWithDefault("2")
 
+  val APP_RESOURCE_TYPE =
+    ConfigBuilder("spark.kubernetes.resource.type")
+      .doc("This sets the resource type internally")
+      .internal()
+      .stringConf
+      .createOptional
+
   val KUBERNETES_LOCAL_DIRS_TMPFS =
     ConfigBuilder("spark.kubernetes.local.dirs.tmpfs")
       .doc("If set to true then emptyDir volumes created to back 
SPARK_LOCAL_DIRS will have " +

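The new internal flag is written by each language-binding driver step (see below) and read back when executor memory is sized. A minimal consumption sketch, assuming a KubernetesConf in scope:

    // Sketch only: APP_RESOURCE_TYPE is an optional internal string conf
    // whose value ends up as "java", "python" or "r".
    val resourceType: Option[String] =
      kubernetesConf.sparkConf.getOption(APP_RESOURCE_TYPE.key)
    val isPySpark = resourceType.contains("python")
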
http://git-wip-us.apache.org/repos/asf/spark/blob/1cfda448/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
index c37f713..d89995b 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -24,7 +24,7 @@ import org.apache.spark.SparkException
 import org.apache.spark.deploy.k8s._
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.internal.config.{EXECUTOR_CLASS_PATH, EXECUTOR_JAVA_OPTIONS, EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD}
+import org.apache.spark.internal.config.{EXECUTOR_CLASS_PATH, EXECUTOR_JAVA_OPTIONS, EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD, PYSPARK_EXECUTOR_MEMORY}
 import org.apache.spark.rpc.RpcEndpointAddress
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.Utils
@@ -58,6 +58,16 @@ private[spark] class BasicExecutorFeatureStep(
       (kubernetesConf.get(MEMORY_OVERHEAD_FACTOR) * executorMemoryMiB).toInt,
       MEMORY_OVERHEAD_MIN_MIB))
   private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB
+  private val executorMemoryTotal = kubernetesConf.sparkConf
+    .getOption(APP_RESOURCE_TYPE.key).map{ res =>
+      val additionalPySparkMemory = res match {
+        case "python" =>
+          kubernetesConf.sparkConf
+            .get(PYSPARK_EXECUTOR_MEMORY).map(_.toInt).getOrElse(0)
+        case _ => 0
+      }
+    executorMemoryWithOverhead + additionalPySparkMemory
+  }.getOrElse(executorMemoryWithOverhead)
 
   private val executorCores = kubernetesConf.sparkConf.getInt("spark.executor.cores", 1)
   private val executorCoresRequest =
@@ -76,7 +86,7 @@ private[spark] class BasicExecutorFeatureStep(
     // executorId
     val hostname = name.substring(Math.max(0, name.length - 63))
     val executorMemoryQuantity = new QuantityBuilder(false)
-      .withAmount(s"${executorMemoryWithOverhead}Mi")
+      .withAmount(s"${executorMemoryTotal}Mi")
       .build()
     val executorCpuQuantity = new QuantityBuilder(false)
       .withAmount(executorCoresRequest)

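To make the arithmetic concrete: with a 1g executor, the default 10% overhead factor bounded below by the 384MiB minimum, and spark.executor.pyspark.memory=42m (the values the unit test below uses), the request works out as follows. Illustrative constants only:

    // Mirrors BasicExecutorFeatureStep's computation under default settings.
    val executorMemoryMiB = 1024
    val memoryOverheadMiB = math.max((0.1 * executorMemoryMiB).toInt, 384) // = 384
    val pysparkMemoryMiB  = 42 // spark.executor.pyspark.memory
    val executorMemoryTotal =
      executorMemoryMiB + memoryOverheadMiB + pysparkMemoryMiB // = 1450 => "1450Mi"
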
http://git-wip-us.apache.org/repos/asf/spark/blob/1cfda448/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStep.scala
index f52ec9f..6f063b2 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStep.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.k8s.features.bindings
 import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata}
 
 import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.Config.APP_RESOURCE_TYPE
 import org.apache.spark.deploy.k8s.Constants.SPARK_CONF_PATH
 import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep
 import org.apache.spark.launcher.SparkLauncher
@@ -38,7 +39,8 @@ private[spark] class JavaDriverFeatureStep(
       .build()
     SparkPod(pod.pod, withDriverArgs)
   }
-  override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty
+  override def getAdditionalPodSystemProperties(): Map[String, String] =
+    Map(APP_RESOURCE_TYPE.key -> "java")
 
   override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1cfda448/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala
index 406944a..cf0c03b 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala
@@ -21,6 +21,7 @@ import scala.collection.JavaConverters._
 import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, HasMetadata}
 
 import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesUtils, SparkPod}
+import org.apache.spark.deploy.k8s.Config.APP_RESOURCE_TYPE
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep
 
@@ -68,7 +69,8 @@ private[spark] class PythonDriverFeatureStep(
 
     SparkPod(pod.pod, withPythonPrimaryContainer)
   }
-  override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty
+  override def getAdditionalPodSystemProperties(): Map[String, String] =
+    Map(APP_RESOURCE_TYPE.key -> "python")
 
   override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1cfda448/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStep.scala
index 11b09b3..1a7ef52 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStep.scala
@@ -21,6 +21,7 @@ import scala.collection.JavaConverters._
 import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, HasMetadata}
 
 import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesUtils, SparkPod}
+import org.apache.spark.deploy.k8s.Config.APP_RESOURCE_TYPE
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep
 
@@ -54,7 +55,8 @@ private[spark] class RDriverFeatureStep(
 
     SparkPod(pod.pod, withRPrimaryContainer)
   }
-  override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty
+  override def getAdditionalPodSystemProperties(): Map[String, String] =
+    Map(APP_RESOURCE_TYPE.key -> "r")
 
   override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1cfda448/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
index d98e113..0968cce 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
@@ -57,7 +57,6 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
     MAIN_CLASS,
     APP_ARGS)
 
-
   test("Check the pod respects all configurations from the user.") {
     val sparkConf = new SparkConf()
       .set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod")

http://git-wip-us.apache.org/repos/asf/spark/blob/1cfda448/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
index 95d373f..63b237b 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
@@ -75,6 +75,7 @@ class BasicExecutorFeatureStepSuite
       .set("spark.driver.host", DRIVER_HOSTNAME)
       .set("spark.driver.port", DRIVER_PORT.toString)
       .set(IMAGE_PULL_SECRETS, TEST_IMAGE_PULL_SECRETS.mkString(","))
+      .set("spark.kubernetes.resource.type", "java")
   }
 
   test("basic executor pod has reasonable defaults") {
@@ -161,6 +162,29 @@ class BasicExecutorFeatureStepSuite
     checkOwnerReferences(executor.pod, DRIVER_POD_UID)
   }
 
+  test("test executor pyspark memory") {
+    val conf = baseConf.clone()
+    conf.set("spark.kubernetes.resource.type", "python")
+    conf.set(org.apache.spark.internal.config.PYSPARK_EXECUTOR_MEMORY, 42L)
+
+    val step = new BasicExecutorFeatureStep(
+      KubernetesConf(
+        conf,
+        KubernetesExecutorSpecificConf("1", Some(DRIVER_POD)),
+        RESOURCE_NAME_PREFIX,
+        APP_ID,
+        LABELS,
+        ANNOTATIONS,
+        Map.empty,
+        Map.empty,
+        Map.empty,
+        Nil,
+        Seq.empty[String]))
+    val executor = step.configurePod(SparkPod.initialPod())
+    // This is checking that basic executor + executorMemory = 1408 + 42 = 1450
+    assert(executor.container.getResources.getRequests.get("memory").getAmount === "1450Mi")
+  }
+
  // There is always exactly one controller reference, and it points to the driver pod.
  private def checkOwnerReferences(executor: Pod, driverPodUid: String): Unit = {
     assert(executor.getMetadata.getOwnerReferences.size() === 1)

http://git-wip-us.apache.org/repos/asf/spark/blob/1cfda448/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala
index 18874af..bf552ae 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala
@@ -56,6 +56,5 @@ class JavaDriverFeatureStepSuite extends SparkFunSuite {
       "--properties-file", SPARK_CONF_PATH,
       "--class", "test-class",
       "spark-internal", "5 7"))
-
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1cfda448/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile
index 071aa20..7ae57bf 100644
--- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile
+++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile
@@ -43,6 +43,7 @@ COPY bin /opt/spark/bin
 COPY sbin /opt/spark/sbin
 COPY ${img_path}/spark/entrypoint.sh /opt/
 COPY examples /opt/spark/examples
+COPY kubernetes/tests /opt/spark/tests
 COPY data /opt/spark/data
 
 ENV SPARK_HOME /opt/spark

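This COPY ships the integration-test scripts inside the image under /opt/spark/tests, which is what the new TEST_LOCAL_PYSPARK prefix (local:///opt/spark/tests/) in the PythonTestsSuite changes further down resolves to.
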
http://git-wip-us.apache.org/repos/asf/spark/blob/1cfda448/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/R/Dockerfile
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/R/Dockerfile b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/R/Dockerfile
index e627883..9f67422 100644
--- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/R/Dockerfile
+++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/R/Dockerfile
@@ -19,10 +19,10 @@ ARG base_img
 FROM $base_img
 WORKDIR /
 RUN mkdir ${SPARK_HOME}/R
-COPY R ${SPARK_HOME}/R
 
 RUN apk add --no-cache R R-dev
 
+COPY R ${SPARK_HOME}/R
 ENV R_HOME /usr/lib/R
 
 WORKDIR /opt/spark/work-dir

http://git-wip-us.apache.org/repos/asf/spark/blob/1cfda448/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/python/Dockerfile
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/python/Dockerfile b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/python/Dockerfile
index 72bb962..69b6efa 100644
--- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/python/Dockerfile
+++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/python/Dockerfile
@@ -19,7 +19,6 @@ ARG base_img
 FROM $base_img
 WORKDIR /
 RUN mkdir ${SPARK_HOME}/python
-COPY python/lib ${SPARK_HOME}/python/lib
 # TODO: Investigate running both pip and pip3 via virtualenvs
 RUN apk add --no-cache python && \
     apk add --no-cache python3 && \
@@ -33,6 +32,7 @@ RUN apk add --no-cache python && \
     # Removed the .cache to save space
     rm -r /root/.cache
 
+COPY python/lib ${SPARK_HOME}/python/lib
 ENV PYTHONPATH ${SPARK_HOME}/python/lib/pyspark.zip:${SPARK_HOME}/python/lib/py4j-*.zip
 
 WORKDIR /opt/spark/work-dir

http://git-wip-us.apache.org/repos/asf/spark/blob/1cfda448/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index 896a83a..82e6efa 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -50,6 +50,17 @@ private[spark] class KubernetesSuite extends SparkFunSuite
   protected var containerLocalSparkDistroExamplesJar: String = _
   protected var appLocator: String = _
 
+  // Default memory limit is 1024M + 384M (minimum overhead constant)
+  private val baseMemory = s"${1024 + 384}Mi"
+  protected val memOverheadConstant = 0.8
+  private val standardNonJVMMemory = s"${(1024 + 0.4*1024).toInt}Mi"
+  protected val additionalMemory = 200
+  // 209715200 is 200Mi
+  protected val additionalMemoryInBytes = 209715200
+  private val extraDriverTotalMemory = s"${(1024 + memOverheadConstant*1024).toInt}Mi"
+  private val extraExecTotalMemory =
+    s"${(1024 + memOverheadConstant*1024 + additionalMemory).toInt}Mi"
+
   override def beforeAll(): Unit = {
    // The scalatest-maven-plugin gives system properties that are referenced but not set null
    // values. We need to remove the null-value properties before initializing the test backend.
@@ -233,6 +244,8 @@ private[spark] class KubernetesSuite extends SparkFunSuite
     assert(driverPod.getMetadata.getName === driverPodName)
     assert(driverPod.getSpec.getContainers.get(0).getImage === image)
    assert(driverPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-driver")
+    assert(driverPod.getSpec.getContainers.get(0).getResources.getRequests.get("memory").getAmount
+      === baseMemory)
   }
 
 
@@ -240,28 +253,48 @@ private[spark] class KubernetesSuite extends SparkFunSuite
     assert(driverPod.getMetadata.getName === driverPodName)
     assert(driverPod.getSpec.getContainers.get(0).getImage === pyImage)
    assert(driverPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-driver")
+    assert(driverPod.getSpec.getContainers.get(0).getResources.getRequests.get("memory").getAmount
+      === standardNonJVMMemory)
   }
 
   protected def doBasicDriverRPodCheck(driverPod: Pod): Unit = {
     assert(driverPod.getMetadata.getName === driverPodName)
     assert(driverPod.getSpec.getContainers.get(0).getImage === rImage)
    assert(driverPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-driver")
+    assert(driverPod.getSpec.getContainers.get(0).getResources.getRequests.get("memory").getAmount
+      === standardNonJVMMemory)
   }
 
 
   protected def doBasicExecutorPodCheck(executorPod: Pod): Unit = {
     assert(executorPod.getSpec.getContainers.get(0).getImage === image)
     assert(executorPod.getSpec.getContainers.get(0).getName === "executor")
+    assert(executorPod.getSpec.getContainers.get(0).getResources.getRequests.get("memory").getAmount
+      === baseMemory)
   }
 
   protected def doBasicExecutorPyPodCheck(executorPod: Pod): Unit = {
     assert(executorPod.getSpec.getContainers.get(0).getImage === pyImage)
     assert(executorPod.getSpec.getContainers.get(0).getName === "executor")
+    assert(executorPod.getSpec.getContainers.get(0).getResources.getRequests.get("memory").getAmount
+      === standardNonJVMMemory)
   }
 
   protected def doBasicExecutorRPodCheck(executorPod: Pod): Unit = {
     assert(executorPod.getSpec.getContainers.get(0).getImage === rImage)
     assert(executorPod.getSpec.getContainers.get(0).getName === "executor")
+    assert(executorPod.getSpec.getContainers.get(0).getResources.getRequests.get("memory").getAmount
+      === standardNonJVMMemory)
+  }
+
+  protected def doDriverMemoryCheck(driverPod: Pod): Unit = {
+    assert(driverPod.getSpec.getContainers.get(0).getResources.getRequests.get("memory").getAmount
+      === extraDriverTotalMemory)
+  }
+
+  protected def doExecutorMemoryCheck(executorPod: Pod): Unit = {
+    assert(executorPod.getSpec.getContainers.get(0).getResources.getRequests.get("memory").getAmount
+      === extraExecTotalMemory)
   }
 
   protected def checkCustomSettings(pod: Pod): Unit = {

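The expected strings above follow from the overhead rules: JVM pods get max(0.1 * 1024, 384) = 384Mi of overhead, non-JVM pods default to a 0.4 overhead factor, and the memory-customization test raises the factor to 0.8 and adds 200Mi of PySpark memory to executors. A sketch restating the constants defined above:

    // Illustrative only; reproduces the suite's expected memory requests.
    val baseMemory           = s"${1024 + 384}Mi"                      // "1408Mi" (JVM pods)
    val standardNonJVMMemory = s"${(1024 + 0.4 * 1024).toInt}Mi"       // "1433Mi"
    val extraDriverTotal     = s"${(1024 + 0.8 * 1024).toInt}Mi"       // "1843Mi"
    val extraExecTotal       = s"${(1024 + 0.8 * 1024 + 200).toInt}Mi" // "2043Mi"
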
http://git-wip-us.apache.org/repos/asf/spark/blob/1cfda448/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala
index 1ebb300..06b7310 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala
@@ -23,9 +23,11 @@ private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite =>
   import PythonTestsSuite._
   import KubernetesSuite.k8sTestTag
 
+  private val pySparkDockerImage =
+    s"${getTestImageRepo}/spark-py:${getTestImageTag}"
   test("Run PySpark on simple pi.py example", k8sTestTag) {
     sparkAppConf
-      .set("spark.kubernetes.container.image", 
s"${getTestImageRepo}/spark-py:${getTestImageTag}")
+      .set("spark.kubernetes.container.image", pySparkDockerImage)
     runSparkApplicationAndVerifyCompletion(
       appResource = PYSPARK_PI,
       mainClass = "",
@@ -39,7 +41,7 @@ private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite =>
 
   test("Run PySpark with Python2 to test a pyfiles example", k8sTestTag) {
     sparkAppConf
-      .set("spark.kubernetes.container.image", 
s"${getTestImageRepo}/spark-py:${getTestImageTag}")
+      .set("spark.kubernetes.container.image", pySparkDockerImage)
       .set("spark.kubernetes.pyspark.pythonVersion", "2")
     runSparkApplicationAndVerifyCompletion(
       appResource = PYSPARK_FILES,
@@ -57,7 +59,7 @@ private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite =>
 
   test("Run PySpark with Python3 to test a pyfiles example", k8sTestTag) {
     sparkAppConf
-      .set("spark.kubernetes.container.image", 
s"${getTestImageRepo}/spark-py:${getTestImageTag}")
+      .set("spark.kubernetes.container.image", pySparkDockerImage)
       .set("spark.kubernetes.pyspark.pythonVersion", "3")
     runSparkApplicationAndVerifyCompletion(
       appResource = PYSPARK_FILES,
@@ -72,12 +74,32 @@ private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite =>
       isJVM = false,
       pyFiles = Some(PYSPARK_CONTAINER_TESTS))
   }
+
+  test("Run PySpark with memory customization", k8sTestTag) {
+    sparkAppConf
+      .set("spark.kubernetes.container.image", pySparkDockerImage)
+      .set("spark.kubernetes.pyspark.pythonVersion", "3")
+      .set("spark.kubernetes.memoryOverheadFactor", s"$memOverheadConstant")
+      .set("spark.executor.pyspark.memory", s"${additionalMemory}m")
+    runSparkApplicationAndVerifyCompletion(
+      appResource = PYSPARK_MEMORY_CHECK,
+      mainClass = "",
+      expectedLogOnCompletion = Seq(
+        "PySpark Worker Memory Check is: True"),
+      appArgs = Array(s"$additionalMemoryInBytes"),
+      driverPodChecker = doDriverMemoryCheck,
+      executorPodChecker = doExecutorMemoryCheck,
+      appLocator = appLocator,
+      isJVM = false,
+      pyFiles = Some(PYSPARK_CONTAINER_TESTS))
+  }
 }
 
 private[spark] object PythonTestsSuite {
  val CONTAINER_LOCAL_PYSPARK: String = "local:///opt/spark/examples/src/main/python/"
   val PYSPARK_PI: String = CONTAINER_LOCAL_PYSPARK + "pi.py"
-  val PYSPARK_FILES: String = CONTAINER_LOCAL_PYSPARK + "pyfiles.py"
-  val PYSPARK_CONTAINER_TESTS: String = CONTAINER_LOCAL_PYSPARK + "py_container_checks.py"
+  val TEST_LOCAL_PYSPARK: String = "local:///opt/spark/tests/"
+  val PYSPARK_FILES: String = TEST_LOCAL_PYSPARK + "pyfiles.py"
+  val PYSPARK_CONTAINER_TESTS: String = TEST_LOCAL_PYSPARK + "py_container_checks.py"
+  val PYSPARK_MEMORY_CHECK: String = TEST_LOCAL_PYSPARK + "worker_memory_check.py"
 }
-

http://git-wip-us.apache.org/repos/asf/spark/blob/1cfda448/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala
index 7b05c13..9b039bb 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala
@@ -53,6 +53,7 @@ private[spark] trait SecretsTestsSuite { k8sSuite: KubernetesSuite =>
       .delete()
   }
 
+  // TODO: [SPARK-25291] This test is flaky with regards to memory of executors
   test("Run SparkPi with env and mount secrets.", k8sTestTag) {
     createTestSecret()
     sparkAppConf

http://git-wip-us.apache.org/repos/asf/spark/blob/1cfda448/resource-managers/kubernetes/integration-tests/tests/py_container_checks.py
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/integration-tests/tests/py_container_checks.py b/resource-managers/kubernetes/integration-tests/tests/py_container_checks.py
new file mode 100644
index 0000000..f6b3be2
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/tests/py_container_checks.py
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+
+
+def version_check(python_env, major_python_version):
+    """
+        These are various tests to test the Python container image.
+        This file will be distributed via --py-files in the e2e tests.
+    """
+    env_version = os.environ.get('PYSPARK_PYTHON')
+    print("Python runtime version check is: " +
+          str(sys.version_info[0] == major_python_version))
+
+    print("Python environment version check is: " +
+          str(env_version == python_env))

http://git-wip-us.apache.org/repos/asf/spark/blob/1cfda448/resource-managers/kubernetes/integration-tests/tests/pyfiles.py
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/integration-tests/tests/pyfiles.py b/resource-managers/kubernetes/integration-tests/tests/pyfiles.py
new file mode 100644
index 0000000..4193654
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/tests/pyfiles.py
@@ -0,0 +1,38 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import print_function
+
+import sys
+
+from pyspark.sql import SparkSession
+
+
+if __name__ == "__main__":
+    """
+        Usage: pyfiles [major_python_version]
+    """
+    spark = SparkSession \
+        .builder \
+        .appName("PyFilesTest") \
+        .getOrCreate()
+
+    from py_container_checks import version_check
+    # Begin of Python container checks
+    version_check(sys.argv[1], 2 if sys.argv[1] == "python" else 3)
+
+    spark.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/1cfda448/resource-managers/kubernetes/integration-tests/tests/worker_memory_check.py
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/integration-tests/tests/worker_memory_check.py b/resource-managers/kubernetes/integration-tests/tests/worker_memory_check.py
new file mode 100644
index 0000000..d312a29
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/tests/worker_memory_check.py
@@ -0,0 +1,47 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import print_function
+
+import resource
+import sys
+
+from pyspark.sql import SparkSession
+
+
+if __name__ == "__main__":
+    """
+        Usage: worker_memory_check [Memory_in_Mi]
+    """
+    spark = SparkSession \
+        .builder \
+        .appName("PyMemoryTest") \
+        .getOrCreate()
+    sc = spark.sparkContext
+    if len(sys.argv) < 2:
+        print("Usage: worker_memory_check [Memory_in_Mi]", file=sys.stderr)
+        sys.exit(-1)
+
+    def f(x):
+        rLimit = resource.getrlimit(resource.RLIMIT_AS)
+        print("RLimit is " + str(rLimit))
+        return rLimit
+    resourceValue = sc.parallelize([1]).map(f).collect()[0][0]
+    print("Resource Value is " + str(resourceValue))
+    truthCheck = (resourceValue == int(sys.argv[1]))
+    print("PySpark Worker Memory Check is: " + str(truthCheck))
+    spark.stop()

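For reference, resource.getrlimit(resource.RLIMIT_AS) returns a (soft, hard) pair in bytes, so collect()[0][0] above is the worker's soft address-space limit; the integration test passes 200 * 1024 * 1024 = 209715200, i.e. the 200Mi configured through spark.executor.pyspark.memory.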
