Repository: spark
Updated Branches:
  refs/heads/master 173fe450d -> 1a644afba


[SPARK-23984][K8S] Initial Python Bindings for PySpark on K8s

## What changes were proposed in this pull request?

This introduces Python bindings for running PySpark applications on Kubernetes.

- [x] Running PySpark Jobs
- [x] Increased Default Memory Overhead value
- [ ] Dependency Management for virtualenv/conda

## How was this patch tested?

This patch was tested with

- [x] Unit Tests
- [x] Integration tests with [this addition](https://github.com/apache-spark-on-k8s/spark-integration/pull/46)
```
KubernetesSuite:
- Run SparkPi with no resources
- Run SparkPi with a very long application name.
- Run SparkPi with a master URL without a scheme.
- Run SparkPi with an argument.
- Run SparkPi with custom labels, annotations, and environment variables.
- Run SparkPi with a test secret mounted into the driver and executor pods
- Run extraJVMOptions check on driver
- Run SparkRemoteFileTest using a remote data file
- Run PySpark on simple pi.py example
- Run PySpark with Python2 to test a pyfiles example
- Run PySpark with Python3 to test a pyfiles example
Run completed in 4 minutes, 28 seconds.
Total number of tests run: 11
Suites: completed 2, aborted 0
Tests: succeeded 11, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
```
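
For context, a hypothetical end-to-end submission enabled by this change (the master URL, image repository/tag, and in-container paths are placeholders; the config keys are the ones added in this diff):

```bash
# Run the pyfiles example in cluster mode on Kubernetes, using the spark-py
# image produced by the updated docker-image-tool.sh further below.
bin/spark-submit \
  --master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
  --deploy-mode cluster \
  --name pyfiles-test \
  --conf spark.executor.instances=2 \
  --conf spark.kubernetes.container.image=<repo>/spark-py:<tag> \
  --conf spark.kubernetes.pyspark.pythonversion=3 \
  --py-files local:///opt/spark/examples/src/main/python/py_container_checks.py \
  local:///opt/spark/examples/src/main/python/pyfiles.py python3
```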

Author: Ilan Filonenko <i...@cornell.edu>
Author: Ilan Filonenko <ifilo...@gmail.com>

Closes #21092 from ifilonenko/master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1a644afb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1a644afb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1a644afb

Branch: refs/heads/master
Commit: 1a644afbac35c204f9ad55f86999319a9ab458c6
Parents: 173fe45
Author: Ilan Filonenko <i...@cornell.edu>
Authored: Fri Jun 8 11:18:34 2018 -0700
Committer: mcheah <mch...@palantir.com>
Committed: Fri Jun 8 11:18:34 2018 -0700

----------------------------------------------------------------------
 bin/docker-image-tool.sh                        |  23 ++--
 .../org/apache/spark/deploy/SparkSubmit.scala   |  14 ++-
 docs/running-on-kubernetes.md                   |  16 ++-
 examples/src/main/python/py_container_checks.py |  32 ++++++
 examples/src/main/python/pyfiles.py             |  38 +++++++
 .../org/apache/spark/deploy/k8s/Config.scala    |  40 +++++++
 .../org/apache/spark/deploy/k8s/Constants.scala |   7 +-
 .../spark/deploy/k8s/KubernetesConf.scala       |  62 ++++++++---
 .../spark/deploy/k8s/KubernetesUtils.scala      |   2 +-
 .../k8s/features/BasicDriverFeatureStep.scala   |  14 +--
 .../k8s/features/BasicExecutorFeatureStep.scala |   3 +-
 .../bindings/JavaDriverFeatureStep.scala        |  44 ++++++++
 .../bindings/PythonDriverFeatureStep.scala      |  73 +++++++++++++
 .../submit/KubernetesClientApplication.scala    |  16 ++-
 .../k8s/submit/KubernetesDriverBuilder.scala    |  39 +++++--
 .../deploy/k8s/submit/MainAppResource.scala     |   5 +
 .../cluster/k8s/KubernetesExecutorBuilder.scala |  22 ++--
 .../spark/deploy/k8s/KubernetesConfSuite.scala  |  66 ++++++++++--
 .../features/BasicDriverFeatureStepSuite.scala  |  58 +++++++++-
 .../BasicExecutorFeatureStepSuite.scala         |   9 +-
 ...rKubernetesCredentialsFeatureStepSuite.scala |   9 +-
 .../DriverServiceFeatureStepSuite.scala         |  18 ++--
 .../features/EnvSecretsFeatureStepSuite.scala   |   3 +-
 .../features/LocalDirsFeatureStepSuite.scala    |   3 +-
 .../features/MountSecretsFeatureStepSuite.scala |   3 +-
 .../bindings/JavaDriverFeatureStepSuite.scala   |  60 +++++++++++
 .../bindings/PythonDriverFeatureStepSuite.scala | 108 +++++++++++++++++++
 .../spark/deploy/k8s/submit/ClientSuite.scala   |   3 +-
 .../submit/KubernetesDriverBuilderSuite.scala   |  78 ++++++++++++--
 .../k8s/KubernetesExecutorBuilderSuite.scala    |   6 +-
 .../spark/bindings/python/Dockerfile            |  39 +++++++
 .../src/main/dockerfiles/spark/entrypoint.sh    |  30 ++++++
 32 files changed, 842 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1a644afb/bin/docker-image-tool.sh
----------------------------------------------------------------------
diff --git a/bin/docker-image-tool.sh b/bin/docker-image-tool.sh
index f090240..a871ab5 100755
--- a/bin/docker-image-tool.sh
+++ b/bin/docker-image-tool.sh
@@ -63,12 +63,20 @@ function build {
   if [ ! -d "$IMG_PATH" ]; then
     error "Cannot find docker image. This script must be run from a runnable 
distribution of Apache Spark."
   fi
-
-  local DOCKERFILE=${DOCKERFILE:-"$IMG_PATH/spark/Dockerfile"}
+  local BINDING_BUILD_ARGS=(
+    --build-arg
+    base_img=$(image_ref spark)
+  )
+  local BASEDOCKERFILE=${BASEDOCKERFILE:-"$IMG_PATH/spark/Dockerfile"}
+  local PYDOCKERFILE=${PYDOCKERFILE:-"$IMG_PATH/spark/bindings/python/Dockerfile"}
 
   docker build "${BUILD_ARGS[@]}" \
     -t $(image_ref spark) \
-    -f "$DOCKERFILE" .
+    -f "$BASEDOCKERFILE" .
+
+  docker build "${BINDING_BUILD_ARGS[@]}" \
+    -t $(image_ref spark-py) \
+    -f "$PYDOCKERFILE" .
 }
 
 function push {
@@ -86,7 +94,8 @@ Commands:
   push        Push a pre-built image to a registry. Requires a repository address to be provided.
 
 Options:
-  -f file     Dockerfile to build. By default builds the Dockerfile shipped with Spark.
+  -f file     Dockerfile to build for JVM-based jobs. By default builds the Dockerfile shipped with Spark.
+  -p file     Dockerfile to build for PySpark jobs, with Python baked in. By default builds the Dockerfile shipped with Spark.
   -r repo     Repository address.
   -t tag      Tag to apply to the built image, or to identify the image to be pushed.
   -m          Use minikube's Docker daemon.
@@ -116,12 +125,14 @@ fi
 
 REPO=
 TAG=
-DOCKERFILE=
+BASEDOCKERFILE=
+PYDOCKERFILE=
-while getopts f:mr:t: option
+while getopts f:p:mr:t: option
 do
  case "${option}"
  in
- f) DOCKERFILE=${OPTARG};;
+ f) BASEDOCKERFILE=${OPTARG};;
+ p) PYDOCKERFILE=${OPTARG};;
  r) REPO=${OPTARG};;
  t) TAG=${OPTARG};;
  m)
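
A sketch of the resulting image workflow (repository and tag are placeholders; the flags are those documented in the usage text above):

```bash
# Build both the JVM base image (spark) and the Python binding image
# (spark-py), then push them to the registry.
./bin/docker-image-tool.sh -r <repo> -t <tag> build
./bin/docker-image-tool.sh -r <repo> -t <tag> push

# A custom Python Dockerfile can be supplied with -p.
./bin/docker-image-tool.sh -r <repo> -t <tag> -p path/to/python/Dockerfile build
```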

http://git-wip-us.apache.org/repos/asf/spark/blob/1a644afb/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index a46af26..e83d82f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -285,8 +285,6 @@ private[spark] class SparkSubmit extends Logging {
       case (STANDALONE, CLUSTER) if args.isR =>
         error("Cluster deploy mode is currently not supported for R " +
           "applications on standalone clusters.")
-      case (KUBERNETES, _) if args.isPython =>
-        error("Python applications are currently not supported for 
Kubernetes.")
       case (KUBERNETES, _) if args.isR =>
         error("R applications are currently not supported for Kubernetes.")
       case (LOCAL, CLUSTER) =>
@@ -694,9 +692,17 @@ private[spark] class SparkSubmit extends Logging {
     if (isKubernetesCluster) {
       childMainClass = KUBERNETES_CLUSTER_SUBMIT_CLASS
       if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
-        childArgs ++= Array("--primary-java-resource", args.primaryResource)
+        if (args.isPython) {
+          childArgs ++= Array("--primary-py-file", args.primaryResource)
+          childArgs ++= Array("--main-class", 
"org.apache.spark.deploy.PythonRunner")
+          if (args.pyFiles != null) {
+            childArgs ++= Array("--other-py-files", args.pyFiles)
+          }
+        } else {
+          childArgs ++= Array("--primary-java-resource", args.primaryResource)
+          childArgs ++= Array("--main-class", args.mainClass)
+        }
       }
-      childArgs ++= Array("--main-class", args.mainClass)
       if (args.childArgs != null) {
         args.childArgs.foreach { arg =>
           childArgs += ("--arg", arg)
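
Illustratively, the branch above routes a Python submission into child arguments of this shape (values depend on the actual submission):

```
--primary-py-file  <primary resource>.py
--main-class       org.apache.spark.deploy.PythonRunner
--other-py-files   <comma-separated list from --py-files>
--arg              <each application argument, repeated>
```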

http://git-wip-us.apache.org/repos/asf/spark/blob/1a644afb/docs/running-on-kubernetes.md
----------------------------------------------------------------------
diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 4eac9bd..408e446 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -270,7 +270,6 @@ future versions of the spark-kubernetes integration.
 
 Some of these include:
 
-* PySpark
 * R
 * Dynamic Executor Scaling
 * Local File Dependency Management
@@ -631,4 +630,19 @@ specific to Spark on Kubernetes.
    <code>spark.kubernetes.executor.secrets.ENV_VAR=spark-secret:key</code>.
   </td>
 </tr>
+<tr>
+  <td><code>spark.kubernetes.memoryOverheadFactor</code></td>
+  <td><code>0.1</code></td>
+  <td>
+    This sets the memory overhead factor used to allocate memory to non-JVM memory, which includes off-heap memory allocations, non-JVM tasks, and various system processes. It defaults to 0.10 for JVM-based jobs and to 0.40 for non-JVM jobs.
+    Non-JVM tasks need more non-JVM heap space and commonly fail with "Memory Overhead Exceeded" errors, so the higher non-JVM default preempts those failures.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.pyspark.pythonversion</code></td>
+  <td><code>"2"</code></td>
+  <td>
+   This sets the major Python version of the docker image used to run the driver and executor containers. Can be either 2 or 3.
+  </td>
+</tr>
 </table>
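
For example, both new options can be set at submit time (hypothetical values):

```bash
bin/spark-submit \
  --conf spark.kubernetes.memoryOverheadFactor=0.25 \
  --conf spark.kubernetes.pyspark.pythonversion=3 \
  ...
```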

http://git-wip-us.apache.org/repos/asf/spark/blob/1a644afb/examples/src/main/python/py_container_checks.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/py_container_checks.py b/examples/src/main/python/py_container_checks.py
new file mode 100644
index 0000000..f6b3be2
--- /dev/null
+++ b/examples/src/main/python/py_container_checks.py
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+
+
+def version_check(python_env, major_python_version):
+    """
+        Various checks that exercise the Python container image.
+        This file will be distributed via --py-files in the e2e tests.
+    """
+    env_version = os.environ.get('PYSPARK_PYTHON')
+    print("Python runtime version check is: " +
+          str(sys.version_info[0] == major_python_version))
+
+    print("Python environment version check is: " +
+          str(env_version == python_env))

http://git-wip-us.apache.org/repos/asf/spark/blob/1a644afb/examples/src/main/python/pyfiles.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/pyfiles.py b/examples/src/main/python/pyfiles.py
new file mode 100644
index 0000000..4193654
--- /dev/null
+++ b/examples/src/main/python/pyfiles.py
@@ -0,0 +1,38 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import print_function
+
+import sys
+
+from pyspark.sql import SparkSession
+
+
+if __name__ == "__main__":
+    """
+        Usage: pyfiles [major_python_version]
+    """
+    spark = SparkSession \
+        .builder \
+        .appName("PyFilesTest") \
+        .getOrCreate()
+
+    from py_container_checks import version_check
+    # Begin the Python container checks
+    version_check(sys.argv[1], 2 if sys.argv[1] == "python" else 3)
+
+    spark.stop()
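
A sketch of how these two example files fit together (paths assume a local Spark checkout; the e2e tests run the same pair inside the spark-py container):

```bash
# py_container_checks.py is shipped via --py-files so that pyfiles.py can
# import version_check at runtime; the "python3" argument selects the
# Python 3 checks.
bin/spark-submit \
  --py-files examples/src/main/python/py_container_checks.py \
  examples/src/main/python/pyfiles.py python3
```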

http://git-wip-us.apache.org/repos/asf/spark/blob/1a644afb/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 560dedf..590deaa 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -117,6 +117,28 @@ private[spark] object Config extends Logging {
       .stringConf
       .createWithDefault("spark")
 
+  val KUBERNETES_PYSPARK_PY_FILES =
+    ConfigBuilder("spark.kubernetes.python.pyFiles")
+      .doc("The PyFiles that are distributed via client arguments")
+      .internal()
+      .stringConf
+      .createOptional
+
+  val KUBERNETES_PYSPARK_MAIN_APP_RESOURCE =
+    ConfigBuilder("spark.kubernetes.python.mainAppResource")
+      .doc("The main app resource for pyspark jobs")
+      .internal()
+      .stringConf
+      .createOptional
+
+  val KUBERNETES_PYSPARK_APP_ARGS =
+    ConfigBuilder("spark.kubernetes.python.appArgs")
+      .doc("The app arguments for PySpark Jobs")
+      .internal()
+      .stringConf
+      .createOptional
+
   val KUBERNETES_ALLOCATION_BATCH_SIZE =
     ConfigBuilder("spark.kubernetes.allocation.batch.size")
       .doc("Number of pods to launch at once in each round of executor 
allocation.")
@@ -154,6 +176,24 @@ private[spark] object Config extends Logging {
      .checkValue(interval => interval > 0, s"Logging interval must be a positive time value.")
       .createWithDefaultString("1s")
 
+  val MEMORY_OVERHEAD_FACTOR =
+    ConfigBuilder("spark.kubernetes.memoryOverheadFactor")
+      .doc("This sets the Memory Overhead Factor that will allocate memory to 
non-JVM jobs " +
+        "which in the case of JVM tasks will default to 0.10 and 0.40 for 
non-JVM jobs")
+      .doubleConf
+      .checkValue(mem_overhead => mem_overhead >= 0 && mem_overhead < 1,
+        "Ensure that memory overhead is a double between 0 --> 1.0")
+      .createWithDefault(0.1)
+
+  val PYSPARK_MAJOR_PYTHON_VERSION =
+    ConfigBuilder("spark.kubernetes.pyspark.pythonversion")
+      .doc("This sets the major Python version. Either 2 or 3. (Python2 or 
Python3)")
+      .stringConf
+      .checkValue(pv => List("2", "3").contains(pv),
+        "Ensure that major Python version is either Python2 or Python3")
+      .createWithDefault("2")
+
   val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX =
     "spark.kubernetes.authenticate.submission"
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1a644afb/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
index 8da5f24..69bd03d 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
@@ -71,9 +71,14 @@ private[spark] object Constants {
   val SPARK_CONF_FILE_NAME = "spark.properties"
   val SPARK_CONF_PATH = s"$SPARK_CONF_DIR_INTERNAL/$SPARK_CONF_FILE_NAME"
 
+  // BINDINGS
+  val ENV_PYSPARK_PRIMARY = "PYSPARK_PRIMARY"
+  val ENV_PYSPARK_FILES = "PYSPARK_FILES"
+  val ENV_PYSPARK_ARGS = "PYSPARK_APP_ARGS"
+  val ENV_PYSPARK_MAJOR_PYTHON_VERSION = "PYSPARK_MAJOR_PYTHON_VERSION"
+
   // Miscellaneous
  val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
   val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
-  val MEMORY_OVERHEAD_FACTOR = 0.10
   val MEMORY_OVERHEAD_MIN_MIB = 384L
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1a644afb/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
index 5a94418..b0ccaa3 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -16,14 +16,17 @@
  */
 package org.apache.spark.deploy.k8s
 
+import scala.collection.mutable
+
 import io.fabric8.kubernetes.api.model.{LocalObjectReference, LocalObjectReferenceBuilder, Pod}
 
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.submit.{JavaMainAppResource, MainAppResource}
+import org.apache.spark.deploy.k8s.submit._
 import org.apache.spark.internal.config.ConfigEntry
 
+
 private[spark] sealed trait KubernetesRoleSpecificConf
 
 /*
@@ -55,7 +58,8 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
     roleAnnotations: Map[String, String],
     roleSecretNamesToMountPaths: Map[String, String],
     roleSecretEnvNamesToKeyRefs: Map[String, String],
-    roleEnvs: Map[String, String]) {
+    roleEnvs: Map[String, String],
+    sparkFiles: Seq[String]) {
 
   def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE)
 
@@ -64,10 +68,14 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
     .map(str => str.split(",").toSeq)
     .getOrElse(Seq.empty[String])
 
-  def sparkFiles(): Seq[String] = sparkConf
-    .getOption("spark.files")
-    .map(str => str.split(",").toSeq)
-    .getOrElse(Seq.empty[String])
+  def pyFiles(): Option[String] = sparkConf
+    .get(KUBERNETES_PYSPARK_PY_FILES)
+
+  def pySparkMainResource(): Option[String] = sparkConf
+    .get(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE)
+
+  def pySparkPythonVersion(): String = sparkConf
+    .get(PYSPARK_MAJOR_PYTHON_VERSION)
 
   def imagePullPolicy(): String = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)
 
@@ -102,17 +110,30 @@ private[spark] object KubernetesConf {
       appId: String,
       mainAppResource: Option[MainAppResource],
       mainClass: String,
-      appArgs: Array[String]): KubernetesConf[KubernetesDriverSpecificConf] = {
+      appArgs: Array[String],
+      maybePyFiles: Option[String]): KubernetesConf[KubernetesDriverSpecificConf] = {
     val sparkConfWithMainAppJar = sparkConf.clone()
+    val additionalFiles = mutable.ArrayBuffer.empty[String]
     mainAppResource.foreach {
-      case JavaMainAppResource(res) =>
-        val previousJars = sparkConf
-          .getOption("spark.jars")
-          .map(_.split(","))
-          .getOrElse(Array.empty)
-        if (!previousJars.contains(res)) {
-          sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res))
-        }
+        case JavaMainAppResource(res) =>
+          val previousJars = sparkConf
+            .getOption("spark.jars")
+            .map(_.split(","))
+            .getOrElse(Array.empty)
+          if (!previousJars.contains(res)) {
+            sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res))
+          }
+        // This outer match accounts for multiple non-JVM bindings,
+        // all of which raise MEMORY_OVERHEAD_FACTOR to 0.4.
+        case nonJVM: NonJVMResource =>
+          nonJVM match {
+            case PythonMainAppResource(res) =>
+              additionalFiles += res
+              maybePyFiles.foreach { files =>
+                additionalFiles.appendAll(files.split(","))
+              }
+              sparkConfWithMainAppJar.set(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, res)
+          }
+          sparkConfWithMainAppJar.setIfMissing(MEMORY_OVERHEAD_FACTOR, 0.4)
     }
 
     val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
@@ -135,6 +156,11 @@ private[spark] object KubernetesConf {
     val driverEnvs = KubernetesUtils.parsePrefixedKeyValuePairs(
       sparkConf, KUBERNETES_DRIVER_ENV_PREFIX)
 
+    val sparkFiles = sparkConf
+      .getOption("spark.files")
+      .map(str => str.split(",").toSeq)
+      .getOrElse(Seq.empty[String]) ++ additionalFiles
+
     KubernetesConf(
       sparkConfWithMainAppJar,
       KubernetesDriverSpecificConf(mainAppResource, mainClass, appName, appArgs),
@@ -144,7 +170,8 @@ private[spark] object KubernetesConf {
       driverAnnotations,
       driverSecretNamesToMountPaths,
       driverSecretEnvNamesToKeyRefs,
-      driverEnvs)
+      driverEnvs,
+      sparkFiles)
   }
 
   def createExecutorConf(
@@ -186,6 +213,7 @@ private[spark] object KubernetesConf {
       executorAnnotations,
       executorMountSecrets,
       executorEnvSecrets,
-      executorEnv)
+      executorEnv,
+      Seq.empty[String])
   }
 }
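
Net effect of the driver-conf construction above for a Python submission (illustrative, not literal output): the primary .py file and any --py-files entries are folded into spark.files, the main resource is recorded, and the overhead factor is raised only if the user has not set one:

```
spark.files                              += <primary>.py,<each --py-files entry>
spark.kubernetes.python.mainAppResource   = <primary>.py
spark.kubernetes.memoryOverheadFactor     = 0.4   (via setIfMissing)
```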

http://git-wip-us.apache.org/repos/asf/spark/blob/1a644afb/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
index ee62906..593fb53 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
@@ -52,7 +52,7 @@ private[spark] object KubernetesUtils {
     }
   }
 
-  private def resolveFileUri(uri: String): String = {
+  def resolveFileUri(uri: String): String = {
     val fileUri = Utils.resolveURI(uri)
     val fileScheme = Option(fileUri.getScheme).getOrElse("file")
     fileScheme match {

http://git-wip-us.apache.org/repos/asf/spark/blob/1a644afb/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
index 07bdccb..143dc8a 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
@@ -25,8 +25,8 @@ import org.apache.spark.SparkException
 import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesUtils, SparkPod}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit._
 import org.apache.spark.internal.config._
-import org.apache.spark.launcher.SparkLauncher
 
 private[spark] class BasicDriverFeatureStep(
     conf: KubernetesConf[KubernetesDriverSpecificConf])
@@ -48,7 +48,8 @@ private[spark] class BasicDriverFeatureStep(
   private val driverMemoryMiB = conf.get(DRIVER_MEMORY)
   private val memoryOverheadMiB = conf
     .get(DRIVER_MEMORY_OVERHEAD)
-    .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverMemoryMiB).toInt, MEMORY_OVERHEAD_MIN_MIB))
+    .getOrElse(math.max((conf.get(MEMORY_OVERHEAD_FACTOR) * driverMemoryMiB).toInt,
+      MEMORY_OVERHEAD_MIN_MIB))
   private val driverMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB
 
   override def configurePod(pod: SparkPod): SparkPod = {
@@ -88,13 +89,6 @@ private[spark] class BasicDriverFeatureStep(
         .addToRequests("memory", driverMemoryQuantity)
         .addToLimits("memory", driverMemoryQuantity)
         .endResources()
-      .addToArgs("driver")
-      .addToArgs("--properties-file", SPARK_CONF_PATH)
-      .addToArgs("--class", conf.roleSpecificConf.mainClass)
-      // The user application jar is merged into the spark.jars list and managed through that
-      // property, so there is no need to reference it explicitly here.
-      .addToArgs(SparkLauncher.NO_RESOURCE)
-      .addToArgs(conf.roleSpecificConf.appArgs: _*)
       .build()
 
     val driverPod = new PodBuilder(pod.pod)
@@ -122,7 +116,7 @@ private[spark] class BasicDriverFeatureStep(
     val resolvedSparkJars = KubernetesUtils.resolveFileUrisAndPath(
       conf.sparkJars())
     val resolvedSparkFiles = KubernetesUtils.resolveFileUrisAndPath(
-      conf.sparkFiles())
+      conf.sparkFiles)
     if (resolvedSparkJars.nonEmpty) {
       additionalProps.put("spark.jars", resolvedSparkJars.mkString(","))
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/1a644afb/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
index 529069d..91c54a9 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -54,7 +54,8 @@ private[spark] class BasicExecutorFeatureStep(
 
   private val memoryOverheadMiB = kubernetesConf
     .get(EXECUTOR_MEMORY_OVERHEAD)
-    .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt,
+    .getOrElse(math.max(
+      (kubernetesConf.get(MEMORY_OVERHEAD_FACTOR) * executorMemoryMiB).toInt,
       MEMORY_OVERHEAD_MIN_MIB))
  private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1a644afb/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStep.scala
new file mode 100644
index 0000000..f52ec9f
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStep.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features.bindings
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata}
+
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.Constants.SPARK_CONF_PATH
+import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep
+import org.apache.spark.launcher.SparkLauncher
+
+private[spark] class JavaDriverFeatureStep(
+  kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf])
+  extends KubernetesFeatureConfigStep {
+  override def configurePod(pod: SparkPod): SparkPod = {
+    val withDriverArgs = new ContainerBuilder(pod.container)
+      .addToArgs("driver")
+      .addToArgs("--properties-file", SPARK_CONF_PATH)
+      .addToArgs("--class", kubernetesConf.roleSpecificConf.mainClass)
+      // The user application jar is merged into the spark.jars list and managed through that
+      // property, so there is no need to reference it explicitly here.
+      .addToArgs(SparkLauncher.NO_RESOURCE)
+      .addToArgs(kubernetesConf.roleSpecificConf.appArgs: _*)
+      .build()
+    SparkPod(pod.pod, withDriverArgs)
+  }
+  override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty
+
+  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/1a644afb/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala
new file mode 100644
index 0000000..c20bcac
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features.bindings
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, HasMetadata}
+
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesUtils, SparkPod}
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep
+
+private[spark] class PythonDriverFeatureStep(
+  kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf])
+  extends KubernetesFeatureConfigStep {
+  override def configurePod(pod: SparkPod): SparkPod = {
+    val roleConf = kubernetesConf.roleSpecificConf
+    require(roleConf.mainAppResource.isDefined, "PySpark Main Resource must be defined")
+    val maybePythonArgs = Option(roleConf.appArgs).filter(_.nonEmpty).map(
+      pyArgs =>
+        new EnvVarBuilder()
+          .withName(ENV_PYSPARK_ARGS)
+          .withValue(pyArgs.mkString(","))
+          .build())
+    val maybePythonFiles = kubernetesConf.pyFiles().map(
+      // Delimiting with ":" appends the PySpark files to the PYTHONPATH
+      // of the respective PySpark pod
+      pyFiles =>
+        new EnvVarBuilder()
+          .withName(ENV_PYSPARK_FILES)
+          .withValue(KubernetesUtils.resolveFileUrisAndPath(pyFiles.split(","))
+            .mkString(":"))
+          .build())
+    val envSeq =
+      Seq(new EnvVarBuilder()
+          .withName(ENV_PYSPARK_PRIMARY)
+          .withValue(KubernetesUtils.resolveFileUri(kubernetesConf.pySparkMainResource().get))
+        .build(),
+          new EnvVarBuilder()
+          .withName(ENV_PYSPARK_MAJOR_PYTHON_VERSION)
+          .withValue(kubernetesConf.pySparkPythonVersion())
+        .build())
+    val pythonEnvs = envSeq ++
+      maybePythonArgs.toSeq ++
+      maybePythonFiles.toSeq
+
+    val withPythonPrimaryContainer = new ContainerBuilder(pod.container)
+        .addAllToEnv(pythonEnvs.asJava)
+        .addToArgs("driver-py")
+        .addToArgs("--properties-file", SPARK_CONF_PATH)
+        .addToArgs("--class", roleConf.mainClass)
+      .build()
+
+    SparkPod(pod.pod, withPythonPrimaryContainer)
+  }
+  override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty
+
+  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
+}
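
Illustratively, the step above leaves the Python driver container with environment variables along these lines (hypothetical values; file URIs are resolved by KubernetesUtils, and PYSPARK_FILES is ":"-delimited so it can be appended to PYTHONPATH):

```
PYSPARK_PRIMARY=/opt/spark/examples/src/main/python/pyfiles.py
PYSPARK_FILES=/opt/spark/examples/src/main/python/py_container_checks.py
PYSPARK_APP_ARGS=python3
PYSPARK_MAJOR_PYTHON_VERSION=3
```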

http://git-wip-us.apache.org/repos/asf/spark/blob/1a644afb/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
index a97f565..eaff472 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
@@ -39,11 +39,13 @@ import org.apache.spark.util.Utils
  * @param mainAppResource the main application resource if any
  * @param mainClass the main class of the application to run
  * @param driverArgs arguments to the driver
+ * @param maybePyFiles additional Python files via --py-files
  */
 private[spark] case class ClientArguments(
     mainAppResource: Option[MainAppResource],
     mainClass: String,
-    driverArgs: Array[String])
+    driverArgs: Array[String],
+    maybePyFiles: Option[String])
 
 private[spark] object ClientArguments {
 
@@ -51,10 +53,15 @@ private[spark] object ClientArguments {
     var mainAppResource: Option[MainAppResource] = None
     var mainClass: Option[String] = None
     val driverArgs = mutable.ArrayBuffer.empty[String]
+    var maybePyFiles: Option[String] = None
 
     args.sliding(2, 2).toList.foreach {
       case Array("--primary-java-resource", primaryJavaResource: String) =>
         mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
+      case Array("--primary-py-file", primaryPythonResource: String) =>
+        mainAppResource = Some(PythonMainAppResource(primaryPythonResource))
+      case Array("--other-py-files", pyFiles: String) =>
+        maybePyFiles = Some(pyFiles)
       case Array("--main-class", clazz: String) =>
         mainClass = Some(clazz)
       case Array("--arg", arg: String) =>
@@ -69,7 +76,8 @@ private[spark] object ClientArguments {
     ClientArguments(
       mainAppResource,
       mainClass.get,
-      driverArgs.toArray)
+      driverArgs.toArray,
+      maybePyFiles)
   }
 }
 
@@ -206,6 +214,7 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
     val kubernetesResourceNamePrefix = {
       s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")
     }
+    sparkConf.set(KUBERNETES_PYSPARK_PY_FILES, clientArguments.maybePyFiles.getOrElse(""))
     val kubernetesConf = KubernetesConf.createDriverConf(
       sparkConf,
       appName,
@@ -213,7 +222,8 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
       kubernetesAppId,
       clientArguments.mainAppResource,
       clientArguments.mainClass,
-      clientArguments.driverArgs)
+      clientArguments.driverArgs,
+      clientArguments.maybePyFiles)
     val builder = new KubernetesDriverBuilder
     val namespace = kubernetesConf.namespace()
     // The master URL has been checked for validity already in SparkSubmit.

http://git-wip-us.apache.org/repos/asf/spark/blob/1a644afb/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
index fdc5eb0..5762d82 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
@@ -17,7 +17,8 @@
 package org.apache.spark.deploy.k8s.submit
 
 import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf}
-import org.apache.spark.deploy.k8s.features._
+import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, EnvSecretsFeatureStep, KubernetesFeatureConfigStep, LocalDirsFeatureStep, MountSecretsFeatureStep}
+import org.apache.spark.deploy.k8s.features.bindings.{JavaDriverFeatureStep, PythonDriverFeatureStep}
 
 private[spark] class KubernetesDriverBuilder(
    provideBasicStep: (KubernetesConf[KubernetesDriverSpecificConf]) => BasicDriverFeatureStep =
@@ -33,9 +34,17 @@ private[spark] class KubernetesDriverBuilder(
     provideEnvSecretsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
       => EnvSecretsFeatureStep) =
     new EnvSecretsFeatureStep(_),
-    provideLocalDirsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf])
-      => LocalDirsFeatureStep =
-      new LocalDirsFeatureStep(_)) {
+    provideLocalDirsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
+      => LocalDirsFeatureStep) =
+      new LocalDirsFeatureStep(_),
+    provideJavaStep: (
+      KubernetesConf[KubernetesDriverSpecificConf]
+        => JavaDriverFeatureStep) =
+      new JavaDriverFeatureStep(_),
+    providePythonStep: (
+      KubernetesConf[KubernetesDriverSpecificConf]
+      => PythonDriverFeatureStep) =
+      new PythonDriverFeatureStep(_)) {
 
   def buildFromFeatures(
    kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf]): KubernetesDriverSpec = {
@@ -44,13 +53,23 @@ private[spark] class KubernetesDriverBuilder(
       provideCredentialsStep(kubernetesConf),
       provideServiceStep(kubernetesConf),
       provideLocalDirsStep(kubernetesConf))
-    var allFeatures = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
-      baseFeatures ++ Seq(provideSecretsStep(kubernetesConf))
-    } else baseFeatures
 
-    allFeatures = if (kubernetesConf.roleSecretEnvNamesToKeyRefs.nonEmpty) {
-      allFeatures ++ Seq(provideEnvSecretsStep(kubernetesConf))
-    } else allFeatures
+    val maybeRoleSecretNamesStep = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
+      Some(provideSecretsStep(kubernetesConf))
+    } else None
+
+    val maybeProvideSecretsStep = if (kubernetesConf.roleSecretEnvNamesToKeyRefs.nonEmpty) {
+      Some(provideEnvSecretsStep(kubernetesConf))
+    } else None
+
+    val bindingsStep = kubernetesConf.roleSpecificConf.mainAppResource.map {
+        case JavaMainAppResource(_) =>
+          provideJavaStep(kubernetesConf)
+        case PythonMainAppResource(_) =>
+          providePythonStep(kubernetesConf)
+      }.getOrElse(provideJavaStep(kubernetesConf))
+
+    val allFeatures: Seq[KubernetesFeatureConfigStep] =
+      (baseFeatures :+ bindingsStep) ++
+        maybeRoleSecretNamesStep.toSeq ++
+        maybeProvideSecretsStep.toSeq
 
     var spec = KubernetesDriverSpec.initialSpec(kubernetesConf.sparkConf.getAll.toMap)
     for (feature <- allFeatures) {

http://git-wip-us.apache.org/repos/asf/spark/blob/1a644afb/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala
index cca9f46..cbe081a 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala
@@ -18,4 +18,9 @@ package org.apache.spark.deploy.k8s.submit
 
 private[spark] sealed trait MainAppResource
 
+private[spark] sealed trait NonJVMResource
+
 private[spark] case class JavaMainAppResource(primaryResource: String) extends MainAppResource
+
+private[spark] case class PythonMainAppResource(primaryResource: String)
+  extends MainAppResource with NonJVMResource

http://git-wip-us.apache.org/repos/asf/spark/blob/1a644afb/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
index d5e1de3..769a0a5 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.scheduler.cluster.k8s
 
 import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, KubernetesRoleSpecificConf, SparkPod}
-import org.apache.spark.deploy.k8s.features.{BasicExecutorFeatureStep, EnvSecretsFeatureStep, LocalDirsFeatureStep, MountSecretsFeatureStep}
+import org.apache.spark.deploy.k8s.features.{BasicExecutorFeatureStep, EnvSecretsFeatureStep, KubernetesFeatureConfigStep, LocalDirsFeatureStep, MountSecretsFeatureStep}
 
 private[spark] class KubernetesExecutorBuilder(
    provideBasicStep: (KubernetesConf[KubernetesExecutorSpecificConf]) => BasicExecutorFeatureStep =
@@ -34,14 +34,20 @@ private[spark] class KubernetesExecutorBuilder(
 
   def buildFromFeatures(
    kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf]): SparkPod = {
-    val baseFeatures = Seq(provideBasicStep(kubernetesConf), provideLocalDirsStep(kubernetesConf))
-    var allFeatures = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
-      baseFeatures ++ Seq(provideSecretsStep(kubernetesConf))
-    } else baseFeatures
+    val baseFeatures = Seq(
+      provideBasicStep(kubernetesConf),
+      provideLocalDirsStep(kubernetesConf))
 
-    allFeatures = if (kubernetesConf.roleSecretEnvNamesToKeyRefs.nonEmpty) {
-      allFeatures ++ Seq(provideEnvSecretsStep(kubernetesConf))
-    } else allFeatures
+    val maybeRoleSecretNamesStep = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
+      Some(provideSecretsStep(kubernetesConf))
+    } else None
+
+    val maybeProvideSecretsStep = if (kubernetesConf.roleSecretEnvNamesToKeyRefs.nonEmpty) {
+      Some(provideEnvSecretsStep(kubernetesConf))
+    } else None
+
+    val allFeatures: Seq[KubernetesFeatureConfigStep] =
+      baseFeatures ++
+      maybeRoleSecretNamesStep.toSeq ++
+      maybeProvideSecretsStep.toSeq
 
     var executorPod = SparkPod.initialPod()
     for (feature <- allFeatures) {

http://git-wip-us.apache.org/repos/asf/spark/blob/1a644afb/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
index 3d23e1c..661f942 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
@@ -22,7 +22,7 @@ import io.fabric8.kubernetes.api.model.{LocalObjectReferenceBuilder, PodBuilder}
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.submit.JavaMainAppResource
+import org.apache.spark.deploy.k8s.submit._
 
 class KubernetesConfSuite extends SparkFunSuite {
 
@@ -56,9 +56,10 @@ class KubernetesConfSuite extends SparkFunSuite {
       APP_NAME,
       RESOURCE_NAME_PREFIX,
       APP_ID,
-      None,
+      mainAppResource = None,
       MAIN_CLASS,
-      APP_ARGS)
+      APP_ARGS,
+      maybePyFiles = None)
     assert(conf.appId === APP_ID)
     assert(conf.sparkConf.getAll.toMap === sparkConf.getAll.toMap)
     assert(conf.appResourceNamePrefix === RESOURCE_NAME_PREFIX)
@@ -79,7 +80,8 @@ class KubernetesConfSuite extends SparkFunSuite {
       APP_ID,
       mainAppJar,
       MAIN_CLASS,
-      APP_ARGS)
+      APP_ARGS,
+      maybePyFiles = None)
     assert(kubernetesConfWithMainJar.sparkConf.get("spark.jars")
       .split(",")
       === Array("local:///opt/spark/jar1.jar", "local:///opt/spark/main.jar"))
@@ -88,15 +90,59 @@ class KubernetesConfSuite extends SparkFunSuite {
       APP_NAME,
       RESOURCE_NAME_PREFIX,
       APP_ID,
-      None,
+      mainAppResource = None,
       MAIN_CLASS,
-      APP_ARGS)
+      APP_ARGS,
+      maybePyFiles = None)
     assert(kubernetesConfWithoutMainJar.sparkConf.get("spark.jars").split(",")
       === Array("local:///opt/spark/jar1.jar"))
+    assert(kubernetesConfWithoutMainJar.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.1)
   }
 
-  test("Resolve driver labels, annotations, secret mount paths, and envs.") {
+  test("Creating driver conf with a python primary file") {
+    val mainResourceFile = "local:///opt/spark/main.py"
+    val inputPyFiles = Array("local:///opt/spark/example2.py", "local:///example3.py")
     val sparkConf = new SparkConf(false)
+      .setJars(Seq("local:///opt/spark/jar1.jar"))
+      .set("spark.files", "local:///opt/spark/example4.py")
+    val mainAppResource = Some(PythonMainAppResource(mainResourceFile))
+    val kubernetesConfWithMainResource = KubernetesConf.createDriverConf(
+      sparkConf,
+      APP_NAME,
+      RESOURCE_NAME_PREFIX,
+      APP_ID,
+      mainAppResource,
+      MAIN_CLASS,
+      APP_ARGS,
+      Some(inputPyFiles.mkString(",")))
+    assert(kubernetesConfWithMainResource.sparkConf.get("spark.jars").split(",")
+      === Array("local:///opt/spark/jar1.jar"))
+    assert(kubernetesConfWithMainResource.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.4)
+    assert(kubernetesConfWithMainResource.sparkFiles
+      === Array("local:///opt/spark/example4.py", mainResourceFile) ++ inputPyFiles)
+  }
+
+  test("Testing explicit setting of memory overhead on non-JVM tasks") {
+    val sparkConf = new SparkConf(false)
+      .set(MEMORY_OVERHEAD_FACTOR, 0.3)
+
+    val mainResourceFile = "local:///opt/spark/main.py"
+    val mainAppResource = Some(PythonMainAppResource(mainResourceFile))
+    val conf = KubernetesConf.createDriverConf(
+      sparkConf,
+      APP_NAME,
+      RESOURCE_NAME_PREFIX,
+      APP_ID,
+      mainAppResource,
+      MAIN_CLASS,
+      APP_ARGS,
+      None)
+    assert(conf.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.3)
+  }
+
+  test("Resolve driver labels, annotations, secret mount paths, envs, and 
memory overhead") {
+    val sparkConf = new SparkConf(false)
+      .set(MEMORY_OVERHEAD_FACTOR, 0.3)
     CUSTOM_LABELS.foreach { case (key, value) =>
       sparkConf.set(s"$KUBERNETES_DRIVER_LABEL_PREFIX$key", value)
     }
@@ -118,9 +164,10 @@ class KubernetesConfSuite extends SparkFunSuite {
       APP_NAME,
       RESOURCE_NAME_PREFIX,
       APP_ID,
-      None,
+      mainAppResource = None,
       MAIN_CLASS,
-      APP_ARGS)
+      APP_ARGS,
+      maybePyFiles = None)
     assert(conf.roleLabels === Map(
       SPARK_APP_ID_LABEL -> APP_ID,
       SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE) ++
@@ -129,6 +176,7 @@ class KubernetesConfSuite extends SparkFunSuite {
     assert(conf.roleSecretNamesToMountPaths === SECRET_NAMES_TO_MOUNT_PATHS)
     assert(conf.roleSecretEnvNamesToKeyRefs === SECRET_ENV_VARS)
     assert(conf.roleEnvs === CUSTOM_ENVS)
+    assert(conf.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.3)
   }
 
   test("Basic executor translated fields.") {

http://git-wip-us.apache.org/repos/asf/spark/blob/1a644afb/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
index b2813d8..04b909d 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
@@ -24,6 +24,8 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.JavaMainAppResource
+import org.apache.spark.deploy.k8s.submit.PythonMainAppResource
 
 class BasicDriverFeatureStepSuite extends SparkFunSuite {
 
@@ -33,6 +35,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
   private val CONTAINER_IMAGE_PULL_POLICY = "IfNotPresent"
   private val APP_NAME = "spark-test"
   private val MAIN_CLASS = "org.apache.spark.examples.SparkPi"
+  private val PY_MAIN_CLASS = "org.apache.spark.deploy.PythonRunner"
   private val APP_ARGS = Array("arg1", "arg2", "\"arg 3\"")
   private val CUSTOM_ANNOTATION_KEY = "customAnnotation"
   private val CUSTOM_ANNOTATION_VALUE = "customAnnotationValue"
@@ -60,7 +63,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
     val kubernetesConf = KubernetesConf(
       sparkConf,
       KubernetesDriverSpecificConf(
-        None,
+        Some(JavaMainAppResource("")),
         APP_NAME,
         MAIN_CLASS,
         APP_ARGS),
@@ -70,7 +73,8 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
       DRIVER_ANNOTATIONS,
       Map.empty,
       Map.empty,
-      DRIVER_ENVS)
+      DRIVER_ENVS,
+      Seq.empty[String])
 
     val featureStep = new BasicDriverFeatureStep(kubernetesConf)
     val basePod = SparkPod.initialPod()
@@ -110,7 +114,6 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
     assert(driverPodMetadata.getLabels.asScala === DRIVER_LABELS)
     assert(driverPodMetadata.getAnnotations.asScala === DRIVER_ANNOTATIONS)
     assert(configuredPod.pod.getSpec.getRestartPolicy === "Never")
-
     val expectedSparkConf = Map(
       KUBERNETES_DRIVER_POD_NAME.key -> "spark-driver-pod",
       "spark.app.id" -> APP_ID,
@@ -119,6 +122,50 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
     assert(featureStep.getAdditionalPodSystemProperties() === 
expectedSparkConf)
   }
 
+  test("Check appropriate entrypoint rerouting for various bindings") {
+    val javaSparkConf = new SparkConf()
+      .set(org.apache.spark.internal.config.DRIVER_MEMORY.key, "4g")
+      .set(CONTAINER_IMAGE, "spark-driver:latest")
+    val pythonSparkConf = new SparkConf()
+      .set(org.apache.spark.internal.config.DRIVER_MEMORY.key, "4g")
+      .set(CONTAINER_IMAGE, "spark-driver:latest")
+    val javaKubernetesConf = KubernetesConf(
+      javaSparkConf,
+      KubernetesDriverSpecificConf(
+        Some(JavaMainAppResource("")),
+        APP_NAME,
+        PY_MAIN_CLASS,
+        APP_ARGS),
+      RESOURCE_NAME_PREFIX,
+      APP_ID,
+      DRIVER_LABELS,
+      DRIVER_ANNOTATIONS,
+      Map.empty,
+      Map.empty,
+      DRIVER_ENVS,
+      Seq.empty[String])
+    val pythonKubernetesConf = KubernetesConf(
+      pythonSparkConf,
+      KubernetesDriverSpecificConf(
+        Some(PythonMainAppResource("")),
+        APP_NAME,
+        PY_MAIN_CLASS,
+        APP_ARGS),
+      RESOURCE_NAME_PREFIX,
+      APP_ID,
+      DRIVER_LABELS,
+      DRIVER_ANNOTATIONS,
+      Map.empty,
+      Map.empty,
+      DRIVER_ENVS,
+      Seq.empty[String])
+    val javaFeatureStep = new BasicDriverFeatureStep(javaKubernetesConf)
+    val pythonFeatureStep = new BasicDriverFeatureStep(pythonKubernetesConf)
+    val basePod = SparkPod.initialPod()
+    val configuredJavaPod = javaFeatureStep.configurePod(basePod)
+    val configuredPythonPod = pythonFeatureStep.configurePod(basePod)
+    // Smoke check: both bindings configure the same base driver container.
+    assert(configuredJavaPod.container.getName === DRIVER_CONTAINER_NAME)
+    assert(configuredPythonPod.container.getName === DRIVER_CONTAINER_NAME)
+  }
+
   test("Additional system properties resolve jars and set cluster-mode 
confs.") {
     val allJars = Seq("local:///opt/spark/jar1.jar", "hdfs:///opt/spark/jar2.jar")
     val allFiles = Seq("https://localhost:9000/file1.txt", "local:///opt/spark/file2.txt")
@@ -130,7 +177,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
     val kubernetesConf = KubernetesConf(
       sparkConf,
       KubernetesDriverSpecificConf(
-        None,
+        Some(JavaMainAppResource("")),
         APP_NAME,
         MAIN_CLASS,
         APP_ARGS),
@@ -140,7 +187,8 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
       DRIVER_ANNOTATIONS,
       Map.empty,
       Map.empty,
-      Map.empty)
+      DRIVER_ENVS,
+      allFiles)
     val step = new BasicDriverFeatureStep(kubernetesConf)
     val additionalProperties = step.getAdditionalPodSystemProperties()
     val expectedSparkConf = Map(

http://git-wip-us.apache.org/repos/asf/spark/blob/1a644afb/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
index 9182134..f06030a 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
@@ -88,7 +88,8 @@ class BasicExecutorFeatureStepSuite
         ANNOTATIONS,
         Map.empty,
         Map.empty,
-        Map.empty))
+        Map.empty,
+        Seq.empty[String]))
     val executor = step.configurePod(SparkPod.initialPod())
 
     // The executor pod name and default labels.
@@ -126,7 +127,8 @@ class BasicExecutorFeatureStepSuite
         ANNOTATIONS,
         Map.empty,
         Map.empty,
-        Map.empty))
+        Map.empty,
+        Seq.empty[String]))
     assert(step.configurePod(SparkPod.initialPod()).pod.getSpec.getHostname.length === 63)
   }
 
@@ -145,7 +147,8 @@ class BasicExecutorFeatureStepSuite
         ANNOTATIONS,
         Map.empty,
         Map.empty,
-        Map("qux" -> "quux")))
+        Map("qux" -> "quux"),
+        Seq.empty[String]))
     val executor = step.configurePod(SparkPod.initialPod())
 
     checkEnv(executor,

http://git-wip-us.apache.org/repos/asf/spark/blob/1a644afb/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala
index f81894f..7cea835 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala
@@ -60,7 +60,8 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef
       Map.empty,
       Map.empty,
       Map.empty,
-      Map.empty)
+      Map.empty,
+      Seq.empty[String])
    val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf)
    assert(kubernetesCredentialsStep.configurePod(BASE_DRIVER_POD) === BASE_DRIVER_POD)
    assert(kubernetesCredentialsStep.getAdditionalPodSystemProperties().isEmpty)
@@ -90,7 +91,8 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef
       Map.empty,
       Map.empty,
       Map.empty,
-      Map.empty)
+      Map.empty,
+      Seq.empty[String])
 
    val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf)
    assert(kubernetesCredentialsStep.configurePod(BASE_DRIVER_POD) === BASE_DRIVER_POD)
@@ -127,7 +129,8 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef
       Map.empty,
       Map.empty,
       Map.empty,
-      Map.empty)
+      Map.empty,
+      Seq.empty[String])
    val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf)
    val resolvedProperties = kubernetesCredentialsStep.getAdditionalPodSystemProperties()
     val expectedSparkConf = Map(

http://git-wip-us.apache.org/repos/asf/spark/blob/1a644afb/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
index f265522..77d38bf 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
@@ -66,7 +66,8 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
         Map.empty,
         Map.empty,
         Map.empty,
-        Map.empty))
+        Map.empty,
+        Seq.empty[String]))
    assert(configurationStep.configurePod(SparkPod.initialPod()) === SparkPod.initialPod())
    assert(configurationStep.getAdditionalKubernetesResources().size === 1)
    assert(configurationStep.getAdditionalKubernetesResources().head.isInstanceOf[Service])
@@ -96,7 +97,8 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
         Map.empty,
         Map.empty,
         Map.empty,
-        Map.empty))
+        Map.empty,
+        Seq.empty[String]))
     val expectedServiceName = SHORT_RESOURCE_NAME_PREFIX +
       DriverServiceFeatureStep.DRIVER_SVC_POSTFIX
     val expectedHostName = s"$expectedServiceName.my-namespace.svc"
@@ -116,7 +118,8 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
         Map.empty,
         Map.empty,
         Map.empty,
-        Map.empty))
+        Map.empty,
+        Seq.empty[String]))
     val resolvedService = configurationStep
       .getAdditionalKubernetesResources()
       .head
@@ -145,7 +148,8 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
         Map.empty,
         Map.empty,
         Map.empty,
-        Map.empty),
+        Map.empty,
+        Seq.empty[String]),
       clock)
     val driverService = configurationStep
       .getAdditionalKubernetesResources()
@@ -171,7 +175,8 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
           Map.empty,
           Map.empty,
           Map.empty,
-          Map.empty),
+          Map.empty,
+          Seq.empty[String]),
         clock)
       fail("The driver bind address should not be allowed.")
     } catch {
@@ -195,7 +200,8 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
           Map.empty,
           Map.empty,
           Map.empty,
-          Map.empty),
+          Map.empty,
+          Seq.empty[String]),
         clock)
       fail("The driver host address should not be allowed.")
     } catch {

http://git-wip-us.apache.org/repos/asf/spark/blob/1a644afb/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala
index 8b0b2d0..af6b35e 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala
@@ -44,7 +44,8 @@ class EnvSecretsFeatureStepSuite extends SparkFunSuite{
       Map.empty,
       Map.empty,
       envVarsToKeys,
-      Map.empty)
+      Map.empty,
+      Seq.empty[String])
 
     val step = new EnvSecretsFeatureStep(kubernetesConf)
    val driverContainerWithEnvSecrets = step.configurePod(baseDriverPod).container

http://git-wip-us.apache.org/repos/asf/spark/blob/1a644afb/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala
index 2542a02..bd6ce4b 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala
@@ -44,7 +44,8 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
       Map.empty,
       Map.empty,
       Map.empty,
-      Map.empty)
+      Map.empty,
+      Seq.empty[String])
   }
 
   test("Resolve to default local dir if neither env nor configuration are 
set") {

http://git-wip-us.apache.org/repos/asf/spark/blob/1a644afb/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala
index 9155793..eff75b8 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala
@@ -42,7 +42,8 @@ class MountSecretsFeatureStepSuite extends SparkFunSuite {
       Map.empty,
       secretNamesToMountPaths,
       Map.empty,
-      Map.empty)
+      Map.empty,
+      Seq.empty[String])
 
     val step = new MountSecretsFeatureStep(kubernetesConf)
     val driverPodWithSecretsMounted = step.configurePod(baseDriverPod).pod

http://git-wip-us.apache.org/repos/asf/spark/blob/1a644afb/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala
new file mode 100644
index 0000000..0f2bf2f
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features.bindings
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.PythonMainAppResource
+
+class JavaDriverFeatureStepSuite extends SparkFunSuite {
+
+  test("Java Step modifies container correctly") {
+    val baseDriverPod = SparkPod.initialPod()
+    val sparkConf = new SparkConf(false)
+    val kubernetesConf = KubernetesConf(
+      sparkConf,
+      KubernetesDriverSpecificConf(
+        Some(PythonMainAppResource("local:///main.jar")),
+        "test-class",
+        "java-runner",
+        Seq("5 7")),
+      appResourceNamePrefix = "",
+      appId = "",
+      roleLabels = Map.empty,
+      roleAnnotations = Map.empty,
+      roleSecretNamesToMountPaths = Map.empty,
+      roleSecretEnvNamesToKeyRefs = Map.empty,
+      roleEnvs = Map.empty,
+      sparkFiles = Seq.empty[String])
+
+    val step = new JavaDriverFeatureStep(kubernetesConf)
+    val driverPod = step.configurePod(baseDriverPod).pod
+    val driverContainerwithJavaStep = step.configurePod(baseDriverPod).container
+    assert(driverContainerwithJavaStep.getArgs.size === 7)
+    val args = driverContainerwithJavaStep
+      .getArgs.asScala
+    assert(args === List(
+      "driver",
+      "--properties-file", SPARK_CONF_PATH,
+      "--class", "test-class",
+      "spark-internal", "5 7"))
+
+  }
+}
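
A minimal sketch of how the Java binding step can assemble the seven container arguments asserted above; `javaDriverArgs` and its parameters are illustrative names rather than this patch's API, and only the argument layout is taken from the test:

```
// Hypothetical helper reproducing the argument layout the test asserts.
def javaDriverArgs(confPath: String, mainClass: String, appArgs: Seq[String]): Seq[String] =
  Seq("driver",
    "--properties-file", confPath,
    "--class", mainClass,
    "spark-internal") ++ appArgs

// javaDriverArgs(SPARK_CONF_PATH, "test-class", Seq("5 7"))
// == List("driver", "--properties-file", SPARK_CONF_PATH,
//         "--class", "test-class", "spark-internal", "5 7")
```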

http://git-wip-us.apache.org/repos/asf/spark/blob/1a644afb/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala
new file mode 100644
index 0000000..a1f9a5d
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features.bindings
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.PythonMainAppResource
+
+class PythonDriverFeatureStepSuite extends SparkFunSuite {
+
+  test("Python Step modifies container correctly") {
+    val expectedMainResource = "/main.py"
+    val mainResource = "local:///main.py"
+    val pyFiles = Seq("local:///example2.py", "local:///example3.py")
+    val expectedPySparkFiles =
+      "/example2.py:/example3.py"
+    val baseDriverPod = SparkPod.initialPod()
+    val sparkConf = new SparkConf(false)
+      .set(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, mainResource)
+      .set(KUBERNETES_PYSPARK_PY_FILES, pyFiles.mkString(","))
+      .set("spark.files", "local:///example.py")
+      .set(PYSPARK_MAJOR_PYTHON_VERSION, "2")
+    val kubernetesConf = KubernetesConf(
+      sparkConf,
+      KubernetesDriverSpecificConf(
+        Some(PythonMainAppResource("local:///main.py")),
+        "test-app",
+        "python-runner",
+        Seq("5 7")),
+      appResourceNamePrefix = "",
+      appId = "",
+      roleLabels = Map.empty,
+      roleAnnotations = Map.empty,
+      roleSecretNamesToMountPaths = Map.empty,
+      roleSecretEnvNamesToKeyRefs = Map.empty,
+      roleEnvs = Map.empty,
+      sparkFiles = Seq.empty[String])
+
+    val step = new PythonDriverFeatureStep(kubernetesConf)
+    val driverPod = step.configurePod(baseDriverPod).pod
+    val driverContainerwithPySpark = step.configurePod(baseDriverPod).container
+    assert(driverContainerwithPySpark.getEnv.size === 4)
+    val envs = driverContainerwithPySpark
+      .getEnv
+      .asScala
+      .map(env => (env.getName, env.getValue))
+      .toMap
+    assert(envs(ENV_PYSPARK_PRIMARY) === expectedMainResource)
+    assert(envs(ENV_PYSPARK_FILES) === expectedPySparkFiles)
+    assert(envs(ENV_PYSPARK_ARGS) === "5 7")
+    assert(envs(ENV_PYSPARK_MAJOR_PYTHON_VERSION) === "2")
+  }
+  test("Python Step testing empty pyfiles") {
+    val mainResource = "local:///main.py"
+    val baseDriverPod = SparkPod.initialPod()
+    val sparkConf = new SparkConf(false)
+      .set(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, mainResource)
+      .set(PYSPARK_MAJOR_PYTHON_VERSION, "3")
+    val kubernetesConf = KubernetesConf(
+      sparkConf,
+      KubernetesDriverSpecificConf(
+        Some(PythonMainAppResource("local:///main.py")),
+        "test-class-py",
+        "python-runner",
+        Seq.empty[String]),
+      appResourceNamePrefix = "",
+      appId = "",
+      roleLabels = Map.empty,
+      roleAnnotations = Map.empty,
+      roleSecretNamesToMountPaths = Map.empty,
+      roleSecretEnvNamesToKeyRefs = Map.empty,
+      roleEnvs = Map.empty,
+      sparkFiles = Seq.empty[String])
+    val step = new PythonDriverFeatureStep(kubernetesConf)
+    val driverContainerwithPySpark = step.configurePod(baseDriverPod).container
+    val args = driverContainerwithPySpark
+      .getArgs.asScala
+    assert(driverContainerwithPySpark.getArgs.size === 5)
+    assert(args === List(
+      "driver-py",
+      "--properties-file", SPARK_CONF_PATH,
+      "--class", "test-class-py"))
+    val envs = driverContainerwithPySpark
+      .getEnv
+      .asScala
+      .map(env => (env.getName, env.getValue))
+      .toMap
+    assert(envs(ENV_PYSPARK_MAJOR_PYTHON_VERSION) === "3")
+  }
+}
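
The expected values in these two tests rely on `local://` URIs being rewritten to in-container paths and joined with `:` for ENV_PYSPARK_FILES. A self-contained sketch of that resolution, written to match the assertions above (the patch itself routes this through KubernetesUtils):

```
// Standalone illustration; not the patch's implementation.
object PyFileResolutionSketch {
  // "local:///example2.py" -> "/example2.py"; other URIs pass through untouched.
  private def resolveFileUri(uri: String): String =
    if (uri.startsWith("local://")) uri.stripPrefix("local://") else uri

  def toPySparkFiles(pyFiles: Seq[String]): String =
    pyFiles.map(resolveFileUri).mkString(":")

  def main(args: Array[String]): Unit = {
    // Prints "/example2.py:/example3.py", matching expectedPySparkFiles above.
    println(toPySparkFiles(Seq("local:///example2.py", "local:///example3.py")))
  }
}
```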

http://git-wip-us.apache.org/repos/asf/spark/blob/1a644afb/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
index 0775338..a8a8218 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
@@ -143,7 +143,8 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
       Map.empty,
       Map.empty,
       Map.empty,
-      Map.empty)
+      Map.empty,
+      Seq.empty[String])
    when(driverBuilder.buildFromFeatures(kubernetesConf)).thenReturn(BUILT_KUBERNETES_SPEC)
     when(kubernetesClient.pods()).thenReturn(podOperations)
     when(podOperations.withName(POD_NAME)).thenReturn(namedPods)

http://git-wip-us.apache.org/repos/asf/spark/blob/1a644afb/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
index cb72406..4e8c300 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.k8s.submit
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf}
 import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, EnvSecretsFeatureStep, KubernetesFeaturesTestUtils, LocalDirsFeatureStep, MountSecretsFeatureStep}
+import org.apache.spark.deploy.k8s.features.bindings.{JavaDriverFeatureStep, PythonDriverFeatureStep}
 
 class KubernetesDriverBuilderSuite extends SparkFunSuite {
 
@@ -27,6 +28,8 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
   private val SERVICE_STEP_TYPE = "service"
   private val LOCAL_DIRS_STEP_TYPE = "local-dirs"
   private val SECRETS_STEP_TYPE = "mount-secrets"
+  private val JAVA_STEP_TYPE = "java-bindings"
+  private val PYSPARK_STEP_TYPE = "pyspark-bindings"
   private val ENV_SECRETS_STEP_TYPE = "env-secrets"
 
  private val basicFeatureStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
@@ -44,6 +47,12 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
  private val secretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
     SECRETS_STEP_TYPE, classOf[MountSecretsFeatureStep])
 
+  private val javaStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
+    JAVA_STEP_TYPE, classOf[JavaDriverFeatureStep])
+
+  private val pythonStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
+    PYSPARK_STEP_TYPE, classOf[PythonDriverFeatureStep])
+
  private val envSecretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
     ENV_SECRETS_STEP_TYPE, classOf[EnvSecretsFeatureStep])
 
@@ -54,13 +63,15 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
       _ => serviceStep,
       _ => secretsStep,
       _ => envSecretsStep,
-      _ => localDirsStep)
+      _ => localDirsStep,
+      _ => javaStep,
+      _ => pythonStep)
 
   test("Apply fundamental steps all the time.") {
     val conf = KubernetesConf(
       new SparkConf(false),
       KubernetesDriverSpecificConf(
-        None,
+        Some(JavaMainAppResource("example.jar")),
         "test-app",
         "main",
         Seq.empty),
@@ -70,13 +81,15 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
       Map.empty,
       Map.empty,
       Map.empty,
-      Map.empty)
+      Map.empty,
+      Seq.empty[String])
     validateStepTypesApplied(
       builderUnderTest.buildFromFeatures(conf),
       BASIC_STEP_TYPE,
       CREDENTIALS_STEP_TYPE,
       SERVICE_STEP_TYPE,
-      LOCAL_DIRS_STEP_TYPE)
+      LOCAL_DIRS_STEP_TYPE,
+      JAVA_STEP_TYPE)
   }
 
   test("Apply secrets step if secrets are present.") {
@@ -93,7 +106,8 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
       Map.empty,
       Map("secret" -> "secretMountPath"),
       Map("EnvName" -> "SecretName:secretKey"),
-      Map.empty)
+      Map.empty,
+      Seq.empty[String])
     validateStepTypesApplied(
       builderUnderTest.buildFromFeatures(conf),
       BASIC_STEP_TYPE,
@@ -101,8 +115,58 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
       SERVICE_STEP_TYPE,
       LOCAL_DIRS_STEP_TYPE,
       SECRETS_STEP_TYPE,
-      ENV_SECRETS_STEP_TYPE
-    )
+      ENV_SECRETS_STEP_TYPE,
+      JAVA_STEP_TYPE)
+  }
+
+  test("Apply Java step if main resource is none.") {
+    val conf = KubernetesConf(
+      new SparkConf(false),
+      KubernetesDriverSpecificConf(
+        None,
+        "test-app",
+        "main",
+        Seq.empty),
+      "prefix",
+      "appId",
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Seq.empty[String])
+    validateStepTypesApplied(
+      builderUnderTest.buildFromFeatures(conf),
+      BASIC_STEP_TYPE,
+      CREDENTIALS_STEP_TYPE,
+      SERVICE_STEP_TYPE,
+      LOCAL_DIRS_STEP_TYPE,
+      JAVA_STEP_TYPE)
+  }
+
+  test("Apply Python step if main resource is python.") {
+    val conf = KubernetesConf(
+      new SparkConf(false),
+      KubernetesDriverSpecificConf(
+        Some(PythonMainAppResource("example.py")),
+        "test-app",
+        "main",
+        Seq.empty),
+      "prefix",
+      "appId",
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Seq.empty[String])
+    validateStepTypesApplied(
+      builderUnderTest.buildFromFeatures(conf),
+      BASIC_STEP_TYPE,
+      CREDENTIALS_STEP_TYPE,
+      SERVICE_STEP_TYPE,
+      LOCAL_DIRS_STEP_TYPE,
+      PYSPARK_STEP_TYPE)
   }
 
  private def validateStepTypesApplied(resolvedSpec: KubernetesDriverSpec, stepTypes: String*)
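
Taken together, the tests above pin down the binding dispatch: a Python main resource selects the PySpark step, while a Java main resource or `None` falls back to the Java step. A sketch of that selection using the suite's own step-type constants (the function name is illustrative; the real wiring lives in KubernetesDriverBuilder):

```
// Assumed shape of the dispatch these tests verify.
def selectBindingStepType(mainAppResource: Option[MainAppResource]): String =
  mainAppResource match {
    case Some(PythonMainAppResource(_)) => PYSPARK_STEP_TYPE
    case _ => JAVA_STEP_TYPE // JavaMainAppResource or None
  }
```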

http://git-wip-us.apache.org/repos/asf/spark/blob/1a644afb/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
index 753cd30..a6bc8bc 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
@@ -54,7 +54,8 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
       Map.empty,
       Map.empty,
       Map.empty,
-      Map.empty)
+      Map.empty,
+      Seq.empty[String])
     validateStepTypesApplied(
      builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, LOCAL_DIRS_STEP_TYPE)
   }
@@ -70,7 +71,8 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
       Map.empty,
       Map("secret" -> "secretMountPath"),
       Map("secret-name" -> "secret-key"),
-      Map.empty)
+      Map.empty,
+      Seq.empty[String])
     validateStepTypesApplied(
       builderUnderTest.buildFromFeatures(conf),
       BASIC_STEP_TYPE,

http://git-wip-us.apache.org/repos/asf/spark/blob/1a644afb/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/python/Dockerfile
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/python/Dockerfile b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/python/Dockerfile
new file mode 100644
index 0000000..72bb962
--- /dev/null
+++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/python/Dockerfile
@@ -0,0 +1,39 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+ARG base_img
+FROM $base_img
+WORKDIR /
+RUN mkdir ${SPARK_HOME}/python
+COPY python/lib ${SPARK_HOME}/python/lib
+# TODO: Investigate running both pip and pip3 via virtualenvs
+RUN apk add --no-cache python && \
+    apk add --no-cache python3 && \
+    python -m ensurepip && \
+    python3 -m ensurepip && \
+    # We remove ensurepip since it adds no functionality since pip is
+    # installed on the image and it just takes up 1.6MB on the image
+    rm -r /usr/lib/python*/ensurepip && \
+    pip install --upgrade pip setuptools && \
+    # You may install with python3 packages by using pip3.6
+    # Removed the .cache to save space
+    rm -r /root/.cache
+
+ENV PYTHONPATH ${SPARK_HOME}/python/lib/pyspark.zip:${SPARK_HOME}/python/lib/py4j-*.zip
+
+WORKDIR /opt/spark/work-dir
+ENTRYPOINT [ "/opt/entrypoint.sh" ]

http://git-wip-us.apache.org/repos/asf/spark/blob/1a644afb/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
index 3e16611..acdb4b1 100755
--- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
+++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
@@ -53,6 +53,28 @@ if [ -n "$SPARK_MOUNTED_FILES_DIR" ]; then
   cp -R "$SPARK_MOUNTED_FILES_DIR/." .
 fi
 
+if [ -n "$PYSPARK_FILES" ]; then
+    PYTHONPATH="$PYTHONPATH:$PYSPARK_FILES"
+fi
+
+PYSPARK_ARGS=""
+if [ -n "$PYSPARK_APP_ARGS" ]; then
+    PYSPARK_ARGS="$PYSPARK_APP_ARGS"
+fi
+
+
+if [ "$PYSPARK_MAJOR_PYTHON_VERSION" == "2" ]; then
+    pyv="$(python -V 2>&1)"
+    export PYTHON_VERSION="${pyv:7}"
+    export PYSPARK_PYTHON="python"
+    export PYSPARK_DRIVER_PYTHON="python"
+elif [ "$PYSPARK_MAJOR_PYTHON_VERSION" == "3" ]; then
+    pyv3="$(python3 -V 2>&1)"
+    export PYTHON_VERSION="${pyv3:7}"
+    export PYSPARK_PYTHON="python3"
+    export PYSPARK_DRIVER_PYTHON="python3"
+fi
+
 case "$SPARK_K8S_CMD" in
   driver)
     CMD=(
@@ -62,6 +84,14 @@ case "$SPARK_K8S_CMD" in
       "$@"
     )
     ;;
+  driver-py)
+    CMD=(
+      "$SPARK_HOME/bin/spark-submit"
+      --conf "spark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS"
+      --deploy-mode client
+      "$@" $PYSPARK_PRIMARY $PYSPARK_ARGS
+    )
+    ;;
 
   executor)
     CMD=(


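
To connect the container-side changes back to the submission side, here is a hedged example of the Kubernetes-specific settings a PySpark job would carry after this patch. The config constants come from Config.scala in this diff and are used the same way as in the suites above; the master URL and image name are placeholders:

```
import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.Config._

// Illustrative only; "3" routes entrypoint.sh into the python3 branch added above.
val conf = new SparkConf()
  .setMaster("k8s://https://<api-server-host>:6443")
  .set(CONTAINER_IMAGE, "<repo>/spark-py:latest")
  .set(PYSPARK_MAJOR_PYTHON_VERSION, "3")
```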