Repository: spark
Updated Branches:
  refs/heads/master da2dc6929 -> ba84bcb2c


[SPARK-24433][K8S] Initial R Bindings for SparkR on K8s

## What changes were proposed in this pull request?

This PR introduces R bindings for SparkR on K8s:

- [x] Running SparkR Job

## How was this patch tested?

This patch was tested with:

- [x] Unit Tests
- [x] Integration Tests

## Example:

Commands to run an example SparkR job:
1. `dev/make-distribution.sh --pip --r --tgz -Psparkr -Phadoop-2.7 -Pkubernetes`
2. `bin/docker-image-tool.sh -m -t testing build`
3.
```
bin/spark-submit \
    --master k8s://https://192.168.64.33:8443 \
    --deploy-mode cluster \
    --name spark-r \
    --conf spark.executor.instances=1 \
    --conf spark.kubernetes.container.image=spark-r:testing \
    local:///opt/spark/examples/src/main/r/dataframe.R
```

The spark-submit command above works against the distribution built in step 1.
(The corresponding integration test will be included in this PR once the PRB is ready.)
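
For reference, `docker-image-tool.sh` also gains a `-R` flag for pointing at a custom R Dockerfile. A hypothetical invocation (the path shown is simply the default Dockerfile added by this patch):

```
bin/docker-image-tool.sh -m -t testing \
    -R resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/R/Dockerfile \
    build
```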

Author: Ilan Filonenko <i...@cornell.edu>

Closes #21584 from ifilonenko/spark-r.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ba84bcb2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ba84bcb2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ba84bcb2

Branch: refs/heads/master
Commit: ba84bcb2c4f73baf63782ff6fad5a607008c7cd2
Parents: da2dc69
Author: Ilan Filonenko <i...@cornell.edu>
Authored: Fri Aug 17 16:04:02 2018 -0700
Committer: mcheah <mch...@palantir.com>
Committed: Fri Aug 17 16:04:02 2018 -0700

----------------------------------------------------------------------
 bin/docker-image-tool.sh                        | 23 ++++---
 .../org/apache/spark/deploy/SparkSubmit.scala   |  8 ++-
 .../org/apache/spark/deploy/k8s/Config.scala    | 13 ++++
 .../org/apache/spark/deploy/k8s/Constants.scala |  2 +
 .../spark/deploy/k8s/KubernetesConf.scala       |  8 ++-
 .../features/bindings/RDriverFeatureStep.scala  | 59 ++++++++++++++++++
 .../submit/KubernetesClientApplication.scala    |  2 +
 .../k8s/submit/KubernetesDriverBuilder.scala    | 22 ++++---
 .../deploy/k8s/submit/MainAppResource.scala     |  3 +
 .../spark/deploy/k8s/KubernetesConfSuite.scala  | 22 +++++++
 .../bindings/RDriverFeatureStepSuite.scala      | 63 ++++++++++++++++++++
 .../submit/KubernetesDriverBuilderSuite.scala   | 36 ++++++++++-
 .../dockerfiles/spark/bindings/R/Dockerfile     | 29 +++++++++
 .../src/main/dockerfiles/spark/entrypoint.sh    | 14 ++++-
 .../integrationtest/ClientModeTestsSuite.scala  |  2 +-
 .../k8s/integrationtest/KubernetesSuite.scala   | 21 ++++++-
 .../k8s/integrationtest/RTestsSuite.scala       | 44 ++++++++++++++
 17 files changed, 344 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ba84bcb2/bin/docker-image-tool.sh
----------------------------------------------------------------------
diff --git a/bin/docker-image-tool.sh b/bin/docker-image-tool.sh
index cd22e75..d637105 100755
--- a/bin/docker-image-tool.sh
+++ b/bin/docker-image-tool.sh
@@ -71,6 +71,7 @@ function build {
   )
   local BASEDOCKERFILE=${BASEDOCKERFILE:-"$IMG_PATH/spark/Dockerfile"}
  local PYDOCKERFILE=${PYDOCKERFILE:-"$IMG_PATH/spark/bindings/python/Dockerfile"}
+  local RDOCKERFILE=${RDOCKERFILE:-"$IMG_PATH/spark/bindings/R/Dockerfile"}
 
   docker build $NOCACHEARG "${BUILD_ARGS[@]}" \
     -t $(image_ref spark) \
@@ -79,11 +80,16 @@ function build {
   docker build $NOCACHEARG "${BINDING_BUILD_ARGS[@]}" \
     -t $(image_ref spark-py) \
     -f "$PYDOCKERFILE" .
+
+  docker build $NOCACHEARG "${BINDING_BUILD_ARGS[@]}" \
+    -t $(image_ref spark-r) \
+    -f "$RDOCKERFILE" .
 }
 
 function push {
   docker push "$(image_ref spark)"
   docker push "$(image_ref spark-py)"
+  docker push "$(image_ref spark-r)"
 }
 
 function usage {
@@ -97,12 +103,13 @@ Commands:
   push        Push a pre-built image to a registry. Requires a repository address to be provided.
 
 Options:
-  -f file     Dockerfile to build for JVM based Jobs. By default builds the Dockerfile shipped with Spark.
-  -p file     Dockerfile with Python baked in. By default builds the Dockerfile shipped with Spark.
-  -r repo     Repository address.
-  -t tag      Tag to apply to the built image, or to identify the image to be pushed.
-  -m          Use minikube's Docker daemon.
-  -n          Build docker image with --no-cache
+  -f file               Dockerfile to build for JVM based Jobs. By default builds the Dockerfile shipped with Spark.
+  -p file               Dockerfile to build for PySpark Jobs. Builds Python dependencies and ships with Spark.
+  -R file               Dockerfile to build for SparkR Jobs. Builds R dependencies and ships with Spark.
+  -r repo               Repository address.
+  -t tag                Tag to apply to the built image, or to identify the image to be pushed.
+  -m                    Use minikube's Docker daemon.
+  -n                    Build docker image with --no-cache
   -b arg      Build arg to build or push the image. For multiple build args, this option needs to
               be used separately for each build arg.
 
@@ -133,14 +140,16 @@ REPO=
 TAG=
 BASEDOCKERFILE=
 PYDOCKERFILE=
+RDOCKERFILE=
 NOCACHEARG=
 BUILD_PARAMS=
-while getopts f:p:mr:t:n:b: option
+while getopts f:p:R:mr:t:n:b: option
 do
  case "${option}"
  in
  f) BASEDOCKERFILE=${OPTARG};;
  p) PYDOCKERFILE=${OPTARG};;
+ R) RDOCKERFILE=${OPTARG};;
  r) REPO=${OPTARG};;
  t) TAG=${OPTARG};;
  n) NOCACHEARG="--no-cache";;
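
As a usage sketch of the updated tool (the repository address and tag below are placeholders), all three images (spark, spark-py, spark-r) now get built and pushed together:

```
bin/docker-image-tool.sh -r docker.io/myrepo -t v2.4.0 build
bin/docker-image-tool.sh -r docker.io/myrepo -t v2.4.0 push
```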

http://git-wip-us.apache.org/repos/asf/spark/blob/ba84bcb2/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 6e70bcd..cf902db 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -286,8 +286,6 @@ private[spark] class SparkSubmit extends Logging {
       case (STANDALONE, CLUSTER) if args.isR =>
         error("Cluster deploy mode is currently not supported for R " +
           "applications on standalone clusters.")
-      case (KUBERNETES, _) if args.isR =>
-        error("R applications are currently not supported for Kubernetes.")
       case (LOCAL, CLUSTER) =>
         error("Cluster deploy mode is not compatible with master \"local\"")
       case (_, CLUSTER) if isShell(args.primaryResource) =>
@@ -700,7 +698,11 @@ private[spark] class SparkSubmit extends Logging {
           if (args.pyFiles != null) {
             childArgs ++= Array("--other-py-files", args.pyFiles)
           }
-        } else {
+        } else if (args.isR) {
+          childArgs ++= Array("--primary-r-file", args.primaryResource)
+          childArgs ++= Array("--main-class", "org.apache.spark.deploy.RRunner")
+        }
+        else {
           childArgs ++= Array("--primary-java-resource", args.primaryResource)
           childArgs ++= Array("--main-class", args.mainClass)
         }
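
With this change, cluster-mode submission of an R primary resource hands the Kubernetes client application child arguments along these lines (a sketch based on the dataframe.R example above):

```
--primary-r-file local:///opt/spark/examples/src/main/r/dataframe.R \
--main-class org.apache.spark.deploy.RRunner
```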

http://git-wip-us.apache.org/repos/asf/spark/blob/ba84bcb2/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 4442333..1b582fe 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -139,6 +139,19 @@ private[spark] object Config extends Logging {
       .stringConf
       .createOptional
 
+  val KUBERNETES_R_MAIN_APP_RESOURCE =
+    ConfigBuilder("spark.kubernetes.r.mainAppResource")
+      .doc("The main app resource for SparkR jobs")
+      .internal()
+      .stringConf
+      .createOptional
+
+  val KUBERNETES_R_APP_ARGS =
+    ConfigBuilder("spark.kubernetes.r.appArgs")
+      .doc("The app arguments for SparkR Jobs")
+      .internal()
+      .stringConf
+      .createOptional
 
   val KUBERNETES_ALLOCATION_BATCH_SIZE =
     ConfigBuilder("spark.kubernetes.allocation.batch.size")

http://git-wip-us.apache.org/repos/asf/spark/blob/ba84bcb2/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
index f82cd7f..8202d87 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
@@ -71,6 +71,8 @@ private[spark] object Constants {
   val ENV_PYSPARK_FILES = "PYSPARK_FILES"
   val ENV_PYSPARK_ARGS = "PYSPARK_APP_ARGS"
   val ENV_PYSPARK_MAJOR_PYTHON_VERSION = "PYSPARK_MAJOR_PYTHON_VERSION"
+  val ENV_R_PRIMARY = "R_PRIMARY"
+  val ENV_R_ARGS = "R_APP_ARGS"
 
   // Miscellaneous
   val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc";

http://git-wip-us.apache.org/repos/asf/spark/blob/ba84bcb2/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
index 866ba3c..3aa35d4 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -78,6 +78,9 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
   def pySparkPythonVersion(): String = sparkConf
       .get(PYSPARK_MAJOR_PYTHON_VERSION)
 
+  def sparkRMainResource(): Option[String] = sparkConf
+    .get(KUBERNETES_R_MAIN_APP_RESOURCE)
+
   def imagePullPolicy(): String = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)
 
   def imagePullSecrets(): Seq[LocalObjectReference] = {
@@ -125,7 +128,7 @@ private[spark] object KubernetesConf {
             sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res))
           }
         // The function of this outer match is to account for multiple nonJVM
-        // bindings that will all have increased MEMORY_OVERHEAD_FACTOR to 0.4
+        // bindings that will all have increased default MEMORY_OVERHEAD_FACTOR to 0.4
         case nonJVM: NonJVMResource =>
           nonJVM match {
             case PythonMainAppResource(res) =>
@@ -133,6 +136,9 @@ private[spark] object KubernetesConf {
               maybePyFiles.foreach{maybePyFiles =>
                additionalFiles.appendAll(maybePyFiles.split(","))}
              sparkConfWithMainAppJar.set(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, res)
+            case RMainAppResource(res) =>
+              additionalFiles += res
+              sparkConfWithMainAppJar.set(KUBERNETES_R_MAIN_APP_RESOURCE, res)
           }
           sparkConfWithMainAppJar.setIfMissing(MEMORY_OVERHEAD_FACTOR, 0.4)
     }
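
The net effect on the driver-side Spark conf for an R submission is roughly the following (a sketch: it assumes MEMORY_OVERHEAD_FACTOR is keyed as spark.kubernetes.memoryOverheadFactor, and the 0.4 default only applies when the factor is not set explicitly):

```
spark.kubernetes.r.mainAppResource=local:///opt/spark/examples/src/main/r/dataframe.R
spark.kubernetes.memoryOverheadFactor=0.4
```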

http://git-wip-us.apache.org/repos/asf/spark/blob/ba84bcb2/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStep.scala
new file mode 100644
index 0000000..b33b86e
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStep.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features.bindings
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, HasMetadata}
+
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesUtils, SparkPod}
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep
+
+private[spark] class RDriverFeatureStep(
+  kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf])
+  extends KubernetesFeatureConfigStep {
+  override def configurePod(pod: SparkPod): SparkPod = {
+    val roleConf = kubernetesConf.roleSpecificConf
+    require(roleConf.mainAppResource.isDefined, "R Main Resource must be defined")
+    val maybeRArgs = Option(roleConf.appArgs).filter(_.nonEmpty).map(
+      rArgs =>
+        new EnvVarBuilder()
+          .withName(ENV_R_ARGS)
+          .withValue(rArgs.mkString(","))
+          .build())
+    val envSeq =
+      Seq(new EnvVarBuilder()
+            .withName(ENV_R_PRIMARY)
+            .withValue(KubernetesUtils.resolveFileUri(kubernetesConf.sparkRMainResource().get))
+          .build())
+    val rEnvs = envSeq ++
+      maybeRArgs.toSeq
+
+    val withRPrimaryContainer = new ContainerBuilder(pod.container)
+        .addAllToEnv(rEnvs.asJava)
+        .addToArgs("driver-r")
+        .addToArgs("--properties-file", SPARK_CONF_PATH)
+        .addToArgs("--class", roleConf.mainClass)
+      .build()
+
+    SparkPod(pod.pod, withRPrimaryContainer)
+  }
+  override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty
+
+  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
+}
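
For the dataframe.R example, this step would inject driver-container environment variables along these lines (a sketch: arg1,arg2 are placeholder app arguments, R_APP_ARGS is only set when arguments are present, and resolveFileUri strips the local:// scheme):

```
R_PRIMARY=/opt/spark/examples/src/main/r/dataframe.R
R_APP_ARGS=arg1,arg2
```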

http://git-wip-us.apache.org/repos/asf/spark/blob/ba84bcb2/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
index 9398fae..986c950 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
@@ -60,6 +60,8 @@ private[spark] object ClientArguments {
         mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
       case Array("--primary-py-file", primaryPythonResource: String) =>
         mainAppResource = Some(PythonMainAppResource(primaryPythonResource))
+      case Array("--primary-r-file", primaryRFile: String) =>
+        mainAppResource = Some(RMainAppResource(primaryRFile))
       case Array("--other-py-files", pyFiles: String) =>
         maybePyFiles = Some(pyFiles)
       case Array("--main-class", clazz: String) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/ba84bcb2/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
index 7208e3d..8f3f18f 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
@@ -17,8 +17,8 @@
 package org.apache.spark.deploy.k8s.submit
 
 import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf}
-import org.apache.spark.deploy.k8s.features._
-import org.apache.spark.deploy.k8s.features.bindings.{JavaDriverFeatureStep, PythonDriverFeatureStep}
+import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, EnvSecretsFeatureStep, LocalDirsFeatureStep, MountSecretsFeatureStep, MountVolumesFeatureStep}
+import org.apache.spark.deploy.k8s.features.bindings.{JavaDriverFeatureStep, PythonDriverFeatureStep, RDriverFeatureStep}
 
 private[spark] class KubernetesDriverBuilder(
    provideBasicStep: (KubernetesConf[KubernetesDriverSpecificConf]) => BasicDriverFeatureStep =
@@ -40,14 +40,18 @@ private[spark] class KubernetesDriverBuilder(
     provideVolumesStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
       => MountVolumesFeatureStep) =
       new MountVolumesFeatureStep(_),
-    provideJavaStep: (
-      KubernetesConf[KubernetesDriverSpecificConf]
-        => JavaDriverFeatureStep) =
-      new JavaDriverFeatureStep(_),
     providePythonStep: (
       KubernetesConf[KubernetesDriverSpecificConf]
       => PythonDriverFeatureStep) =
-      new PythonDriverFeatureStep(_)) {
+      new PythonDriverFeatureStep(_),
+    provideRStep: (
+      KubernetesConf[KubernetesDriverSpecificConf]
+        => RDriverFeatureStep) =
+    new RDriverFeatureStep(_),
+    provideJavaStep: (
+      KubernetesConf[KubernetesDriverSpecificConf]
+        => JavaDriverFeatureStep) =
+    new JavaDriverFeatureStep(_)) {
 
   def buildFromFeatures(
    kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf]): KubernetesDriverSpec = {
@@ -71,7 +75,9 @@ private[spark] class KubernetesDriverBuilder(
         case JavaMainAppResource(_) =>
           provideJavaStep(kubernetesConf)
         case PythonMainAppResource(_) =>
-          providePythonStep(kubernetesConf)}
+          providePythonStep(kubernetesConf)
+        case RMainAppResource(_) =>
+          provideRStep(kubernetesConf)}
       .getOrElse(provideJavaStep(kubernetesConf))
 
     val allFeatures = (baseFeatures :+ bindingsStep) ++

http://git-wip-us.apache.org/repos/asf/spark/blob/ba84bcb2/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala
index cbe081a..dd5a454 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala
@@ -24,3 +24,6 @@ private[spark] case class JavaMainAppResource(primaryResource: String) extends M
 
 private[spark] case class PythonMainAppResource(primaryResource: String)
   extends MainAppResource with NonJVMResource
+
+private[spark] case class RMainAppResource(primaryResource: String)
+  extends MainAppResource with NonJVMResource

http://git-wip-us.apache.org/repos/asf/spark/blob/ba84bcb2/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
index ecdb713..e3c19cd 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
@@ -122,6 +122,28 @@ class KubernetesConfSuite extends SparkFunSuite {
      === Array("local:///opt/spark/example4.py", mainResourceFile) ++ inputPyFiles)
   }
 
+  test("Creating driver conf with a r primary file") {
+    val mainResourceFile = "local:///opt/spark/main.R"
+    val sparkConf = new SparkConf(false)
+      .setJars(Seq("local:///opt/spark/jar1.jar"))
+      .set("spark.files", "local:///opt/spark/example2.R")
+    val mainAppResource = Some(RMainAppResource(mainResourceFile))
+    val kubernetesConfWithMainResource = KubernetesConf.createDriverConf(
+      sparkConf,
+      APP_NAME,
+      RESOURCE_NAME_PREFIX,
+      APP_ID,
+      mainAppResource,
+      MAIN_CLASS,
+      APP_ARGS,
+      maybePyFiles = None)
+    assert(kubernetesConfWithMainResource.sparkConf.get("spark.jars").split(",")
+      === Array("local:///opt/spark/jar1.jar"))
+    assert(kubernetesConfWithMainResource.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.4)
+    assert(kubernetesConfWithMainResource.sparkFiles
+      === Array("local:///opt/spark/example2.R", mainResourceFile))
+  }
+
   test("Testing explicit setting of memory overhead on non-JVM tasks") {
     val sparkConf = new SparkConf(false)
       .set(MEMORY_OVERHEAD_FACTOR, 0.3)

http://git-wip-us.apache.org/repos/asf/spark/blob/ba84bcb2/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStepSuite.scala
new file mode 100644
index 0000000..8fdf91e
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStepSuite.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features.bindings
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.RMainAppResource
+
+class RDriverFeatureStepSuite extends SparkFunSuite {
+
+  test("R Step modifies container correctly") {
+    val expectedMainResource = "/main.R"
+    val mainResource = "local:///main.R"
+    val baseDriverPod = SparkPod.initialPod()
+    val sparkConf = new SparkConf(false)
+      .set(KUBERNETES_R_MAIN_APP_RESOURCE, mainResource)
+    val kubernetesConf = KubernetesConf(
+      sparkConf,
+      KubernetesDriverSpecificConf(
+        Some(RMainAppResource(mainResource)),
+        "test-app",
+        "r-runner",
+        Seq("5 7")),
+      appResourceNamePrefix = "",
+      appId = "",
+      roleLabels = Map.empty,
+      roleAnnotations = Map.empty,
+      roleSecretNamesToMountPaths = Map.empty,
+      roleSecretEnvNamesToKeyRefs = Map.empty,
+      roleEnvs = Map.empty,
+      roleVolumes = Seq.empty,
+      sparkFiles = Seq.empty[String])
+
+    val step = new RDriverFeatureStep(kubernetesConf)
+    val driverContainerwithR = step.configurePod(baseDriverPod).container
+    assert(driverContainerwithR.getEnv.size === 2)
+    val envs = driverContainerwithR
+      .getEnv
+      .asScala
+      .map(env => (env.getName, env.getValue))
+      .toMap
+    assert(envs(ENV_R_PRIMARY) === expectedMainResource)
+    assert(envs(ENV_R_ARGS) === "5 7")
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ba84bcb2/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
index 046e578..4117c54 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
@@ -20,7 +20,7 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s._
 import org.apache.spark.deploy.k8s.features._
 import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, EnvSecretsFeatureStep, KubernetesFeaturesTestUtils, LocalDirsFeatureStep, MountSecretsFeatureStep}
-import org.apache.spark.deploy.k8s.features.bindings.{JavaDriverFeatureStep, PythonDriverFeatureStep}
+import org.apache.spark.deploy.k8s.features.bindings.{JavaDriverFeatureStep, PythonDriverFeatureStep, RDriverFeatureStep}
 
 class KubernetesDriverBuilderSuite extends SparkFunSuite {
 
@@ -31,6 +31,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
   private val SECRETS_STEP_TYPE = "mount-secrets"
   private val JAVA_STEP_TYPE = "java-bindings"
   private val PYSPARK_STEP_TYPE = "pyspark-bindings"
+  private val R_STEP_TYPE = "r-bindings"
   private val ENV_SECRETS_STEP_TYPE = "env-secrets"
   private val MOUNT_VOLUMES_STEP_TYPE = "mount-volumes"
 
@@ -55,6 +56,9 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
  private val pythonStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
     PYSPARK_STEP_TYPE, classOf[PythonDriverFeatureStep])
 
+  private val rStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
+    R_STEP_TYPE, classOf[RDriverFeatureStep])
+
  private val envSecretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
     ENV_SECRETS_STEP_TYPE, classOf[EnvSecretsFeatureStep])
 
@@ -70,8 +74,9 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
       _ => envSecretsStep,
       _ => localDirsStep,
       _ => mountVolumesStep,
-      _ => javaStep,
-      _ => pythonStep)
+      _ => pythonStep,
+      _ => rStep,
+      _ => javaStep)
 
   test("Apply fundamental steps all the time.") {
     val conf = KubernetesConf(
@@ -211,6 +216,31 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
       JAVA_STEP_TYPE)
   }
 
+  test("Apply R step if main resource is R.") {
+    val conf = KubernetesConf(
+      new SparkConf(false),
+      KubernetesDriverSpecificConf(
+        Some(RMainAppResource("example.R")),
+        "test-app",
+        "main",
+        Seq.empty),
+      "prefix",
+      "appId",
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Nil,
+      Seq.empty[String])
+    validateStepTypesApplied(
+      builderUnderTest.buildFromFeatures(conf),
+      BASIC_STEP_TYPE,
+      CREDENTIALS_STEP_TYPE,
+      SERVICE_STEP_TYPE,
+      LOCAL_DIRS_STEP_TYPE,
+      R_STEP_TYPE)
+  }
 
  private def validateStepTypesApplied(resolvedSpec: KubernetesDriverSpec, stepTypes: String*)
     : Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/ba84bcb2/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/R/Dockerfile
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/R/Dockerfile b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/R/Dockerfile
new file mode 100644
index 0000000..e627883
--- /dev/null
+++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/R/Dockerfile
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+ARG base_img
+FROM $base_img
+WORKDIR /
+RUN mkdir ${SPARK_HOME}/R
+COPY R ${SPARK_HOME}/R
+
+RUN apk add --no-cache R R-dev
+
+ENV R_HOME /usr/lib/R
+
+WORKDIR /opt/spark/work-dir
+ENTRYPOINT [ "/opt/entrypoint.sh" ]
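
A hypothetical standalone build of just this image, mirroring the base_img build arg that docker-image-tool.sh passes (tags are placeholders, and an R directory must exist in the build context for the COPY step, as the tool arranges):

```
docker build \
    --build-arg base_img=spark:testing \
    -t spark-r:testing \
    -f resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/R/Dockerfile .
```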

http://git-wip-us.apache.org/repos/asf/spark/blob/ba84bcb2/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
index 8bdb0f7..216e8fe 100755
--- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
+++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
@@ -38,7 +38,7 @@ fi
 
 SPARK_K8S_CMD="$1"
 case "$SPARK_K8S_CMD" in
-    driver | driver-py | executor)
+    driver | driver-py | driver-r | executor)
       shift 1
       ;;
     "")
@@ -66,6 +66,10 @@ if [ -n "$PYSPARK_APP_ARGS" ]; then
     PYSPARK_ARGS="$PYSPARK_APP_ARGS"
 fi
 
+R_ARGS=""
+if [ -n "$R_APP_ARGS" ]; then
+    R_ARGS="$R_APP_ARGS"
+fi
 
 if [ "$PYSPARK_MAJOR_PYTHON_VERSION" == "2" ]; then
     pyv="$(python -V 2>&1)"
@@ -96,6 +100,14 @@ case "$SPARK_K8S_CMD" in
       "$@" $PYSPARK_PRIMARY $PYSPARK_ARGS
     )
     ;;
+    driver-r)
+    CMD=(
+      "$SPARK_HOME/bin/spark-submit"
+      --conf "spark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS"
+      --deploy-mode client
+      "$@" $R_PRIMARY $R_ARGS
+    )
+    ;;
   executor)
     CMD=(
       ${JAVA_HOME}/bin/java
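
Putting it together, the driver-r branch ends up invoking roughly the following (a sketch that assumes SPARK_CONF_PATH resolves to /opt/spark/conf/spark.properties and reuses the dataframe.R example; the --properties-file and --class arguments arrive via "$@" from RDriverFeatureStep):

```
/opt/spark/bin/spark-submit \
    --conf spark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS \
    --deploy-mode client \
    --properties-file /opt/spark/conf/spark.properties \
    --class org.apache.spark.deploy.RRunner \
    /opt/spark/examples/src/main/r/dataframe.R
```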

http://git-wip-us.apache.org/repos/asf/spark/blob/ba84bcb2/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ClientModeTestsSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ClientModeTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ClientModeTestsSuite.scala
index 159cfd9..c8bd584 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ClientModeTestsSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ClientModeTestsSuite.scala
@@ -21,7 +21,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.{k8sTestTag, INTERVAL, TIMEOUT}
 
-trait ClientModeTestsSuite { k8sSuite: KubernetesSuite =>
+private[spark] trait ClientModeTestsSuite { k8sSuite: KubernetesSuite =>
 
   test("Run in client mode.", k8sTestTag) {
     val labels = Map("spark-app-selector" -> driverPodName)

http://git-wip-us.apache.org/repos/asf/spark/blob/ba84bcb2/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index 13ce2ef..896a83a 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -38,10 +38,12 @@ private[spark] class KubernetesSuite extends SparkFunSuite
 
   import KubernetesSuite._
 
-  protected var testBackend: IntegrationTestBackend = _
-  protected var sparkHomeDir: Path = _
+  private var sparkHomeDir: Path = _
+  private var pyImage: String = _
+  private var rImage: String = _
+
   protected var image: String = _
-  protected var pyImage: String = _
+  protected var testBackend: IntegrationTestBackend = _
   protected var driverPodName: String = _
   protected var kubernetesTestComponents: KubernetesTestComponents = _
   protected var sparkAppConf: SparkAppConf = _
@@ -67,6 +69,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite
     val imageRepo = getTestImageRepo
     image = s"$imageRepo/spark:$imageTag"
     pyImage = s"$imageRepo/spark-py:$imageTag"
+    rImage = s"$imageRepo/spark-r:$imageTag"
 
    val sparkDistroExamplesJarFile: File = sparkHomeDir.resolve(Paths.get("examples", "jars"))
       .toFile
@@ -239,6 +242,13 @@ private[spark] class KubernetesSuite extends SparkFunSuite
    assert(driverPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-driver")
   }
 
+  protected def doBasicDriverRPodCheck(driverPod: Pod): Unit = {
+    assert(driverPod.getMetadata.getName === driverPodName)
+    assert(driverPod.getSpec.getContainers.get(0).getImage === rImage)
+    assert(driverPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-driver")
+  }
+
+
   protected def doBasicExecutorPodCheck(executorPod: Pod): Unit = {
     assert(executorPod.getSpec.getContainers.get(0).getImage === image)
     assert(executorPod.getSpec.getContainers.get(0).getName === "executor")
@@ -249,6 +259,11 @@ private[spark] class KubernetesSuite extends SparkFunSuite
     assert(executorPod.getSpec.getContainers.get(0).getName === "executor")
   }
 
+  protected def doBasicExecutorRPodCheck(executorPod: Pod): Unit = {
+    assert(executorPod.getSpec.getContainers.get(0).getImage === rImage)
+    assert(executorPod.getSpec.getContainers.get(0).getName === "executor")
+  }
+
   protected def checkCustomSettings(pod: Pod): Unit = {
     assert(pod.getMetadata.getLabels.get("label1") === "label1-value")
     assert(pod.getMetadata.getLabels.get("label2") === "label2-value")

http://git-wip-us.apache.org/repos/asf/spark/blob/ba84bcb2/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/RTestsSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/RTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/RTestsSuite.scala
new file mode 100644
index 0000000..885a23c
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/RTestsSuite.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest
+
+import org.apache.spark.deploy.k8s.integrationtest.TestConfig.{getTestImageRepo, getTestImageTag}
+
+private[spark] trait RTestsSuite { k8sSuite: KubernetesSuite =>
+
+  import RTestsSuite._
+  import KubernetesSuite.k8sTestTag
+
+  test("Run SparkR on simple dataframe.R example", k8sTestTag) {
+    sparkAppConf
+      .set("spark.kubernetes.container.image", s"${getTestImageRepo}/spark-r:${getTestImageTag}")
+    runSparkApplicationAndVerifyCompletion(
+      appResource = SPARK_R_DATAFRAME_TEST,
+      mainClass = "",
+      expectedLogOnCompletion = Seq("name: string (nullable = true)", "1 Justin"),
+      appArgs = Array.empty[String],
+      driverPodChecker = doBasicDriverRPodCheck,
+      executorPodChecker = doBasicExecutorRPodCheck,
+      appLocator = appLocator,
+      isJVM = false)
+  }
+}
+
+private[spark] object RTestsSuite {
+  val CONTAINER_LOCAL_SPARKR: String = "local:///opt/spark/examples/src/main/r/"
+  val SPARK_R_DATAFRAME_TEST: String = CONTAINER_LOCAL_SPARKR + "dataframe.R"
+}

