Repository: spark
Updated Branches:
  refs/heads/branch-1.5 02a6333d2 -> 11d231159


[SPARK-8313] R Spark packages support

shivaram cafreeman Could you please help me in testing this out? Exposing and 
running `rPackageBuilder` from inside the shell works, but for some reason, I 
can't get it to work during Spark Submit. It just starts relaunching Spark 
Submit.

For testing, you may use the R branch with 
[sbt-spark-package](https://github.com/databricks/sbt-spark-package). You can 
call spPackage, and then pass the jar using `--jars`.
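
For context, the new `RPackageUtils` (in the diff below) only builds R packages from jars whose manifest carries `Spark-HasRPackage: true` and whose R sources sit under `R/pkg/`. Here is a minimal, hand-rolled sketch of producing such a jar; the `mylib` name and file contents are illustrative only, and in practice the sbt-spark-package `spPackage` task would generate the jar for you:

```scala
// Hand-rolled sketch of a jar RPackageUtils would accept; "mylib" and the file
// contents are illustrative only.
import java.io.FileOutputStream
import java.util.jar.{Attributes, JarEntry, JarOutputStream, Manifest}

object BuildRPackageJarSketch {
  def main(args: Array[String]): Unit = {
    val manifest = new Manifest()
    val attrs = manifest.getMainAttributes
    attrs.put(Attributes.Name.MANIFEST_VERSION, "1.0")
    // The manifest key SparkSubmit looks for before extracting and installing the R package.
    attrs.put(new Attributes.Name("Spark-HasRPackage"), "true")

    val out = new JarOutputStream(new FileOutputStream("mylib.jar"), manifest)
    def add(path: String, contents: String): Unit = {
      out.putNextEntry(new JarEntry(path))
      out.write(contents.getBytes("UTF-8"))
      out.closeEntry()
    }
    // Standard R package layout, kept under R/pkg/ inside the jar.
    add("R/pkg/DESCRIPTION", "Package: mylib\nVersion: 0.1\n")
    add("R/pkg/NAMESPACE", "export(\"myfunc\")\n")
    add("R/pkg/R/mylib.R", "myfunc <- function(x) x + 1\n")
    out.close()
  }
}
```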

Author: Burak Yavuz <brk...@gmail.com>

Closes #7139 from brkyvz/r-submit and squashes the following commits:

0de384f [Burak Yavuz] remove unused imports 2
d253708 [Burak Yavuz] removed unused imports
6603d0d [Burak Yavuz] addressed comments
4258ffe [Burak Yavuz] merged master
ddfcc06 [Burak Yavuz] added zipping test
3a1be7d [Burak Yavuz] don't zip
77995df [Burak Yavuz] fix URI
ac45527 [Burak Yavuz] added zipping of all libs
e6bf7b0 [Burak Yavuz] add println ignores
1bc5554 [Burak Yavuz] add assumes for tests
9778e03 [Burak Yavuz] addressed comments
b42b300 [Burak Yavuz] merged master
ffd134e [Burak Yavuz] Merge branch 'master' of github.com:apache/spark into r-submit
d867756 [Burak Yavuz] add apache header
eff5ba1 [Burak Yavuz] ready for review
8838edb [Burak Yavuz] Merge branch 'master' of github.com:apache/spark into r-submit
e5b5a06 [Burak Yavuz] added doc
bb751ce [Burak Yavuz] fix null bug
0226768 [Burak Yavuz] fixed issues
8810beb [Burak Yavuz] R packages support

(cherry picked from commit c9a4c36d052456c2dd1f7e0a871c6b764b5064d2)
Signed-off-by: Shivaram Venkataraman <shiva...@cs.berkeley.edu>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/11d23115
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/11d23115
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/11d23115

Branch: refs/heads/branch-1.5
Commit: 11d2311593587a52ee5015fb0ffd6403ea1138b0
Parents: 02a6333
Author: Burak Yavuz <brk...@gmail.com>
Authored: Tue Aug 4 18:20:12 2015 -0700
Committer: Shivaram Venkataraman <shiva...@cs.berkeley.edu>
Committed: Tue Aug 4 18:20:20 2015 -0700

----------------------------------------------------------------------
 R/install-dev.sh                                |   4 -
 R/pkg/inst/tests/packageInAJarTest.R            |  30 +++
 .../scala/org/apache/spark/api/r/RUtils.scala   |  14 +-
 .../org/apache/spark/deploy/RPackageUtils.scala | 232 +++++++++++++++++++
 .../org/apache/spark/deploy/SparkSubmit.scala   |  11 +-
 .../spark/deploy/SparkSubmitArguments.scala     |   1 -
 .../org/apache/spark/deploy/IvyTestUtils.scala  | 101 ++++++--
 .../spark/deploy/RPackageUtilsSuite.scala       | 156 +++++++++++++
 .../apache/spark/deploy/SparkSubmitSuite.scala  |  24 ++
 9 files changed, 538 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/11d23115/R/install-dev.sh
----------------------------------------------------------------------
diff --git a/R/install-dev.sh b/R/install-dev.sh
index 4972bb9..59d98c9 100755
--- a/R/install-dev.sh
+++ b/R/install-dev.sh
@@ -42,8 +42,4 @@ Rscript -e ' if("devtools" %in% rownames(installed.packages())) { library(devtoo
 # Install SparkR to $LIB_DIR
 R CMD INSTALL --library=$LIB_DIR $FWDIR/pkg/
 
-# Zip the SparkR package so that it can be distributed to worker nodes on YARN
-cd $LIB_DIR
-jar cfM "$LIB_DIR/sparkr.zip" SparkR
-
 popd > /dev/null

http://git-wip-us.apache.org/repos/asf/spark/blob/11d23115/R/pkg/inst/tests/packageInAJarTest.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/packageInAJarTest.R b/R/pkg/inst/tests/packageInAJarTest.R
new file mode 100644
index 0000000..207a37a
--- /dev/null
+++ b/R/pkg/inst/tests/packageInAJarTest.R
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+library(SparkR)
+library(sparkPackageTest)
+
+sc <- sparkR.init()
+
+run1 <- myfunc(5L)
+
+run2 <- myfunc(-4L)
+
+sparkR.stop()
+
+if(run1 != 6) quit(save = "no", status = 1)
+
+if(run2 != -3) quit(save = "no", status = 1)

http://git-wip-us.apache.org/repos/asf/spark/blob/11d23115/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
index d53abd3..93b3bea 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
@@ -19,6 +19,8 @@ package org.apache.spark.api.r
 
 import java.io.File
 
+import scala.collection.JavaConversions._
+
 import org.apache.spark.{SparkEnv, SparkException}
 
 private[spark] object RUtils {
@@ -26,7 +28,7 @@ private[spark] object RUtils {
    * Get the SparkR package path in the local spark distribution.
    */
   def localSparkRPackagePath: Option[String] = {
-    val sparkHome = sys.env.get("SPARK_HOME")
+    val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.test.home"))
     sparkHome.map(
       Seq(_, "R", "lib").mkString(File.separator)
     )
@@ -46,8 +48,8 @@ private[spark] object RUtils {
         (sparkConf.get("spark.master"), sparkConf.get("spark.submit.deployMode"))
       }
 
-    val isYarnCluster = master.contains("yarn") && deployMode == "cluster"
-    val isYarnClient = master.contains("yarn") && deployMode == "client"
+    val isYarnCluster = master != null && master.contains("yarn") && deployMode == "cluster"
+    val isYarnClient = master != null && master.contains("yarn") && deployMode == "client"
 
     // In YARN mode, the SparkR package is distributed as an archive symbolically
     // linked to the "sparkr" file in the current directory. Note that this does not apply
@@ -62,4 +64,10 @@ private[spark] object RUtils {
       }
     }
   }
+
+  /** Check if R is installed before running tests that use R commands. */
+  def isRInstalled: Boolean = {
+    val builder = new ProcessBuilder(Seq("R", "--version"))
+    builder.start().waitFor() == 0
+  }
 }
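
Note that the new `isRInstalled` helper lets `ProcessBuilder.start()` throw an `IOException` when the `R` binary is not on the PATH, rather than returning false. A small sketch, not part of this patch, of a variant that folds that case into `false` (`RCheck` is an illustrative name):

```scala
// Sketch of a more defensive check (RCheck is an illustrative name, not in the patch).
import java.util.Arrays
import scala.util.Try

object RCheck {
  // Returns false both when `R --version` exits non-zero and when the R binary is
  // missing from the PATH (in which case ProcessBuilder.start() throws IOException).
  def isRInstalledSafely: Boolean = Try {
    new ProcessBuilder(Arrays.asList("R", "--version")).start().waitFor() == 0
  }.getOrElse(false)
}
```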

http://git-wip-us.apache.org/repos/asf/spark/blob/11d23115/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
new file mode 100644
index 0000000..ed1e972
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import java.io._
+import java.util.jar.JarFile
+import java.util.logging.Level
+import java.util.zip.{ZipEntry, ZipOutputStream}
+
+import scala.collection.JavaConversions._
+
+import com.google.common.io.{ByteStreams, Files}
+
+import org.apache.spark.{SparkException, Logging}
+import org.apache.spark.api.r.RUtils
+import org.apache.spark.util.{RedirectThread, Utils}
+
+private[deploy] object RPackageUtils extends Logging {
+
+  /** The key in the MANIFEST.mf that we look for, in case a jar contains R code. */
+  private final val hasRPackage = "Spark-HasRPackage"
+
+  /** Base of the shell command used in order to install R packages. */
+  private final val baseInstallCmd = Seq("R", "CMD", "INSTALL", "-l")
+
+  /** R source code should exist under R/pkg in a jar. */
+  private final val RJarEntries = "R/pkg"
+
+  /** Documentation on how the R source file layout should be in the jar. */
+  private[deploy] final val RJarDoc =
+    s"""In order for Spark to build R packages that are parts of Spark Packages, there are a few
+      |requirements. The R source code must be shipped in a jar, with additional Java/Scala
+      |classes. The jar must be in the following format:
+      |  1- The Manifest (META-INF/MANIFEST.mf) must contain the key-value: $hasRPackage: true
+      |  2- The standard R package layout must be preserved under R/pkg/ inside the jar. More
+      |  information on the standard R package layout can be found in:
+      |  http://cran.r-project.org/doc/contrib/Leisch-CreatingPackages.pdf
+      |  An example layout is given below. After running `jar tf $$JAR_FILE | sort`:
+      |
+      |META-INF/MANIFEST.MF
+      |R/
+      |R/pkg/
+      |R/pkg/DESCRIPTION
+      |R/pkg/NAMESPACE
+      |R/pkg/R/
+      |R/pkg/R/myRcode.R
+      |org/
+      |org/apache/
+      |...
+    """.stripMargin.trim
+
+  /** Internal method for logging. We log to a printStream in tests, for debugging purposes. */
+  private def print(
+      msg: String,
+      printStream: PrintStream,
+      level: Level = Level.FINE,
+      e: Throwable = null): Unit = {
+    if (printStream != null) {
+      // scalastyle:off println
+      printStream.println(msg)
+      // scalastyle:on println
+      if (e != null) {
+        e.printStackTrace(printStream)
+      }
+    } else {
+      level match {
+        case Level.INFO => logInfo(msg)
+        case Level.WARNING => logWarning(msg)
+        case Level.SEVERE => logError(msg, e)
+        case _ => logDebug(msg)
+      }
+    }
+  }
+
+  /**
+   * Checks the manifest of the Jar whether there is any R source code bundled with it.
+   * Exposed for testing.
+   */
+  private[deploy] def checkManifestForR(jar: JarFile): Boolean = {
+    val manifest = jar.getManifest.getMainAttributes
+    manifest.getValue(hasRPackage) != null && manifest.getValue(hasRPackage).trim == "true"
+  }
+
+  /**
+   * Runs the standard R package installation code to build the R package from source.
+   * Multiple runs don't cause problems.
+   */
+  private def rPackageBuilder(dir: File, printStream: PrintStream, verbose: Boolean): Boolean = {
+    // this code should be always running on the driver.
+    val pathToSparkR = RUtils.localSparkRPackagePath.getOrElse(
+      throw new SparkException("SPARK_HOME not set. Can't locate SparkR package."))
+    val pathToPkg = Seq(dir, "R", "pkg").mkString(File.separator)
+    val installCmd = baseInstallCmd ++ Seq(pathToSparkR, pathToPkg)
+    if (verbose) {
+      print(s"Building R package with the command: $installCmd", printStream)
+    }
+    try {
+      val builder = new ProcessBuilder(installCmd)
+      builder.redirectErrorStream(true)
+      val env = builder.environment()
+      env.clear()
+      val process = builder.start()
+      new RedirectThread(process.getInputStream, printStream, "redirect R packaging").start()
+      process.waitFor() == 0
+    } catch {
+      case e: Throwable =>
+        print("Failed to build R package.", printStream, Level.SEVERE, e)
+        false
+    }
+  }
+
+  /**
+   * Extracts the files under /R in the jar to a temporary directory for building.
+   */
+  private def extractRFolder(jar: JarFile, printStream: PrintStream, verbose: Boolean): File = {
+    val tempDir = Utils.createTempDir(null)
+    val jarEntries = jar.entries()
+    while (jarEntries.hasMoreElements) {
+      val entry = jarEntries.nextElement()
+      val entryRIndex = entry.getName.indexOf(RJarEntries)
+      if (entryRIndex > -1) {
+        val entryPath = entry.getName.substring(entryRIndex)
+        if (entry.isDirectory) {
+          val dir = new File(tempDir, entryPath)
+          if (verbose) {
+            print(s"Creating directory: $dir", printStream)
+          }
+          dir.mkdirs
+        } else {
+          val inStream = jar.getInputStream(entry)
+          val outPath = new File(tempDir, entryPath)
+          Files.createParentDirs(outPath)
+          val outStream = new FileOutputStream(outPath)
+          if (verbose) {
+            print(s"Extracting $entry to $outPath", printStream)
+          }
+          Utils.copyStream(inStream, outStream, closeStreams = true)
+        }
+      }
+    }
+    tempDir
+  }
+
+  /**
+   * Checks each jar for bundled R source code and, if found, builds and installs the R package.
+   */
+  private[deploy] def checkAndBuildRPackage(
+      jars: String,
+      printStream: PrintStream = null,
+      verbose: Boolean = false): Unit = {
+    jars.split(",").foreach { jarPath =>
+      val file = new File(Utils.resolveURI(jarPath))
+      if (file.exists()) {
+        val jar = new JarFile(file)
+        if (checkManifestForR(jar)) {
+          print(s"$file contains R source code. Now installing package.", printStream, Level.INFO)
+          val rSource = extractRFolder(jar, printStream, verbose)
+          try {
+            if (!rPackageBuilder(rSource, printStream, verbose)) {
+              print(s"ERROR: Failed to build R package in $file.", printStream)
+              print(RJarDoc, printStream)
+            }
+          } finally {
+            rSource.delete() // clean up
+          }
+        } else {
+          if (verbose) {
+            print(s"$file doesn't contain R source code, skipping...", printStream)
+          }
+        }
+      } else {
+        print(s"WARN: $file resolved as dependency, but not found.", printStream, Level.WARNING)
+      }
+    }
+  }
+
+  private def listFilesRecursively(dir: File, excludePatterns: Seq[String]): Set[File] = {
+    if (!dir.exists()) {
+      Set.empty[File]
+    } else {
+      if (dir.isDirectory) {
+        val subDir = dir.listFiles(new FilenameFilter {
+          override def accept(dir: File, name: String): Boolean = {
+            !excludePatterns.map(name.contains).reduce(_ || _) // exclude files with given pattern
+          }
+        })
+        subDir.flatMap(listFilesRecursively(_, excludePatterns)).toSet
+      } else {
+        Set(dir)
+      }
+    }
+  }
+
+  /** Zips all the libraries found with SparkR in the R/lib directory for distribution with Yarn. */
+  private[deploy] def zipRLibraries(dir: File, name: String): File = {
+    val filesToBundle = listFilesRecursively(dir, Seq(".zip"))
+    // create a zip file from scratch, do not append to existing file.
+    val zipFile = new File(dir, name)
+    zipFile.delete()
+    val zipOutputStream = new ZipOutputStream(new FileOutputStream(zipFile, false))
+    try {
+      filesToBundle.foreach { file =>
+        // get the relative paths for proper naming in the zip file
+        val relPath = file.getAbsolutePath.replaceFirst(dir.getAbsolutePath, "")
+        val fis = new FileInputStream(file)
+        val zipEntry = new ZipEntry(relPath)
+        zipOutputStream.putNextEntry(zipEntry)
+        ByteStreams.copy(fis, zipOutputStream)
+        zipOutputStream.closeEntry()
+        fis.close()
+      }
+    } finally {
+      zipOutputStream.close()
+    }
+    zipFile
+  }
+}
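
To make the driver-side mechanics concrete: when a jar with the manifest key is found, its `R/pkg` tree is extracted to a temporary directory and installed into SparkR's library directory via `R CMD INSTALL -l`. Below is a standalone sketch of just that install step, with illustrative paths (`/tmp/extracted`); the real `rPackageBuilder` above also clears the child environment and pipes output through `RedirectThread`:

```scala
// Standalone sketch of the install step (paths are illustrative, not from the patch).
import java.io.File
import java.util.Arrays

object InstallRPackageSketch {
  def main(args: Array[String]): Unit = {
    // Where SparkR itself is installed; rPackageBuilder derives this from SPARK_HOME.
    val sparkRLib = Seq(sys.env("SPARK_HOME"), "R", "lib").mkString(File.separator)
    // Directory the jar's R/pkg tree was extracted to (illustrative path).
    val extractedPkg = Seq("/tmp/extracted", "R", "pkg").mkString(File.separator)
    // Equivalent of: R CMD INSTALL -l $SPARK_HOME/R/lib /tmp/extracted/R/pkg
    val builder = new ProcessBuilder(
      Arrays.asList("R", "CMD", "INSTALL", "-l", sparkRLib, extractedPkg))
    builder.inheritIO() // stream R's output to this process's console
    val exitCode = builder.start().waitFor()
    if (exitCode != 0) sys.error(s"R CMD INSTALL failed with exit code $exitCode")
  }
}
```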

http://git-wip-us.apache.org/repos/asf/spark/blob/11d23115/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 31185c8..1186bed 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -292,6 +292,12 @@ object SparkSubmit {
       }
     }
 
+    // install any R packages that may have been passed through --jars or --packages.
+    // Spark Packages may contain R source code inside the jar.
+    if (args.isR && !StringUtils.isBlank(args.jars)) {
+      RPackageUtils.checkAndBuildRPackage(args.jars, printStream, args.verbose)
+    }
+
     // Require all python files to be local, so we can add them to the PYTHONPATH
     // In YARN cluster mode, python files are distributed as regular files, which can be non-local
     if (args.isPython && !isYarnCluster) {
@@ -361,7 +367,8 @@ object SparkSubmit {
       if (rPackagePath.isEmpty) {
        printErrorAndExit("SPARK_HOME does not exist for R application in YARN mode.")
       }
-      val rPackageFile = new File(rPackagePath.get, SPARKR_PACKAGE_ARCHIVE)
+      val rPackageFile =
+        RPackageUtils.zipRLibraries(new File(rPackagePath.get), SPARKR_PACKAGE_ARCHIVE)
       if (!rPackageFile.exists()) {
        printErrorAndExit(s"$SPARKR_PACKAGE_ARCHIVE does not exist for R application in YARN mode.")
       }
@@ -987,11 +994,9 @@ private[spark] object SparkSubmitUtils {
         addExclusionRules(ivySettings, ivyConfName, md)
         // add all supplied maven artifacts as dependencies
         addDependenciesToIvy(md, artifacts, ivyConfName)
-
         exclusions.foreach { e =>
          md.addExcludeRule(createExclusion(e + ":*", ivySettings, ivyConfName))
         }
-
         // resolve dependencies
         val rr: ResolveReport = ivy.resolve(md, resolveOptions)
         if (rr.hasError) {

http://git-wip-us.apache.org/repos/asf/spark/blob/11d23115/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 44852ce..3f3c662 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -611,5 +611,4 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       System.setErr(currentErr)
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/11d23115/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala b/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
index 823050b..d93febc 100644
--- a/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
@@ -19,6 +19,10 @@ package org.apache.spark.deploy
 
 import java.io.{File, FileInputStream, FileOutputStream}
 import java.util.jar.{JarEntry, JarOutputStream}
+import java.util.jar.Attributes.Name
+import java.util.jar.Manifest
+
+import scala.collection.mutable.ArrayBuffer
 
 import com.google.common.io.{Files, ByteStreams}
 
@@ -35,7 +39,7 @@ private[deploy] object IvyTestUtils {
   * Create the path for the jar and pom from the maven coordinate. Extension should be `jar`
    * or `pom`.
    */
-  private def pathFromCoordinate(
+  private[deploy] def pathFromCoordinate(
       artifact: MavenCoordinate,
       prefix: File,
       ext: String,
@@ -52,7 +56,7 @@ private[deploy] object IvyTestUtils {
   }
 
   /** Returns the artifact naming based on standard ivy or maven format. */
-  private def artifactName(
+  private[deploy] def artifactName(
       artifact: MavenCoordinate,
       useIvyLayout: Boolean,
       ext: String = ".jar"): String = {
@@ -73,7 +77,7 @@ private[deploy] object IvyTestUtils {
   }
 
   /** Write the contents to a file to the supplied directory. */
-  private def writeFile(dir: File, fileName: String, contents: String): File = {
+  private[deploy] def writeFile(dir: File, fileName: String, contents: String): File = {
     val outputFile = new File(dir, fileName)
     val outputStream = new FileOutputStream(outputFile)
     outputStream.write(contents.toCharArray.map(_.toByte))
@@ -90,6 +94,42 @@ private[deploy] object IvyTestUtils {
     writeFile(dir, "mylib.py", contents)
   }
 
+  /** Create an example R package that calls the given Java class. */
+  private def createRFiles(
+      dir: File,
+      className: String,
+      packageName: String): Seq[(String, File)] = {
+    val rFilesDir = new File(dir, "R" + File.separator + "pkg")
+    Files.createParentDirs(new File(rFilesDir, "R" + File.separator + "mylib.R"))
+    val contents =
+      s"""myfunc <- function(x) {
+        |  SparkR:::callJStatic("$packageName.$className", "myFunc", x)
+        |}
+      """.stripMargin
+    val source = writeFile(new File(rFilesDir, "R"), "mylib.R", contents)
+    val description =
+      """Package: sparkPackageTest
+        |Type: Package
+        |Title: Test for building an R package
+        |Version: 0.1
+        |Date: 2015-07-08
+        |Author: Burak Yavuz
+        |Imports: methods, SparkR
+        |Depends: R (>= 3.1), methods, SparkR
+        |Suggests: testthat
+        |Description: Test for building an R package within a jar
+        |License: Apache License (== 2.0)
+        |Collate: 'mylib.R'
+      """.stripMargin
+    val descFile = writeFile(rFilesDir, "DESCRIPTION", description)
+    val namespace =
+      """import(SparkR)
+        |export("myfunc")
+      """.stripMargin
+    val nameFile = writeFile(rFilesDir, "NAMESPACE", namespace)
+    Seq(("R/pkg/R/mylib.R", source), ("R/pkg/DESCRIPTION", descFile), ("R/pkg/NAMESPACE", nameFile))
+  }
+
   /** Create a simple testable Class. */
   private def createJavaClass(dir: File, className: String, packageName: String): File = {
     val contents =
@@ -97,17 +137,14 @@ private[deploy] object IvyTestUtils {
         |
         |import java.lang.Integer;
         |
-        |class $className implements java.io.Serializable {
-        |
-        | public $className() {}
-        |
-        | public Integer myFunc(Integer x) {
+        |public class $className implements java.io.Serializable {
+        | public static Integer myFunc(Integer x) {
         |   return x + 1;
         | }
         |}
       """.stripMargin
     val sourceFile =
-      new JavaSourceFromString(new File(dir, className + ".java").getAbsolutePath, contents)
+      new JavaSourceFromString(new File(dir, className).getAbsolutePath, contents)
     createCompiledClass(className, dir, sourceFile, Seq.empty)
   }
 
@@ -199,14 +236,25 @@ private[deploy] object IvyTestUtils {
   }
 
   /** Create the jar for the given maven coordinate, using the supplied files. */
-  private def packJar(
+  private[deploy] def packJar(
       dir: File,
       artifact: MavenCoordinate,
       files: Seq[(String, File)],
-      useIvyLayout: Boolean): File = {
+      useIvyLayout: Boolean,
+      withR: Boolean,
+      withManifest: Option[Manifest] = None): File = {
     val jarFile = new File(dir, artifactName(artifact, useIvyLayout))
     val jarFileStream = new FileOutputStream(jarFile)
-    val jarStream = new JarOutputStream(jarFileStream, new java.util.jar.Manifest())
+    val manifest = withManifest.getOrElse {
+      val mani = new Manifest()
+      if (withR) {
+        val attr = mani.getMainAttributes
+        attr.put(Name.MANIFEST_VERSION, "1.0")
+        attr.put(new Name("Spark-HasRPackage"), "true")
+      }
+      mani
+    }
+    val jarStream = new JarOutputStream(jarFileStream, manifest)
 
     for (file <- files) {
       val jarEntry = new JarEntry(file._1)
@@ -239,7 +287,8 @@ private[deploy] object IvyTestUtils {
       dependencies: Option[Seq[MavenCoordinate]] = None,
       tempDir: Option[File] = None,
       useIvyLayout: Boolean = false,
-      withPython: Boolean = false): File = {
+      withPython: Boolean = false,
+      withR: Boolean = false): File = {
     // Where the root of the repository exists, and what Ivy will search in
     val tempPath = tempDir.getOrElse(Files.createTempDir())
     // Create directory if it doesn't exist
@@ -255,14 +304,16 @@ private[deploy] object IvyTestUtils {
       val javaClass = createJavaClass(root, className, artifact.groupId)
       // A tuple of files representation in the jar, and the file
       val javaFile = (artifact.groupId.replace(".", "/") + "/" + javaClass.getName, javaClass)
-      val allFiles =
-        if (withPython) {
-          val pythonFile = createPythonFile(root)
-          Seq(javaFile, (pythonFile.getName, pythonFile))
-        } else {
-          Seq(javaFile)
-        }
-      val jarFile = packJar(jarPath, artifact, allFiles, useIvyLayout)
+      val allFiles = ArrayBuffer[(String, File)](javaFile)
+      if (withPython) {
+        val pythonFile = createPythonFile(root)
+        allFiles.append((pythonFile.getName, pythonFile))
+      }
+      if (withR) {
+        val rFiles = createRFiles(root, className, artifact.groupId)
+        allFiles.append(rFiles: _*)
+      }
+      val jarFile = packJar(jarPath, artifact, allFiles, useIvyLayout, withR)
       assert(jarFile.exists(), "Problem creating Jar file")
       val descriptor = createDescriptor(tempPath, artifact, dependencies, useIvyLayout)
       assert(descriptor.exists(), "Problem creating Pom file")
@@ -286,9 +337,10 @@ private[deploy] object IvyTestUtils {
       dependencies: Option[String],
       rootDir: Option[File],
       useIvyLayout: Boolean = false,
-      withPython: Boolean = false): File = {
+      withPython: Boolean = false,
+      withR: Boolean = false): File = {
     val deps = dependencies.map(SparkSubmitUtils.extractMavenCoordinates)
-    val mainRepo = createLocalRepository(artifact, deps, rootDir, useIvyLayout, withPython)
+    val mainRepo = createLocalRepository(artifact, deps, rootDir, useIvyLayout, withPython, withR)
     deps.foreach { seq => seq.foreach { dep =>
      createLocalRepository(dep, None, Some(mainRepo), useIvyLayout, withPython = false)
     }}
@@ -311,11 +363,12 @@ private[deploy] object IvyTestUtils {
       rootDir: Option[File],
       useIvyLayout: Boolean = false,
       withPython: Boolean = false,
+      withR: Boolean = false,
       ivySettings: IvySettings = new IvySettings)(f: String => Unit): Unit = {
     val deps = dependencies.map(SparkSubmitUtils.extractMavenCoordinates)
     purgeLocalIvyCache(artifact, deps, ivySettings)
     val repo = createLocalRepositoryForTests(artifact, dependencies, rootDir, useIvyLayout,
-      withPython)
+      withPython, withR)
     try {
       f(repo.toURI.toString)
     } finally {

http://git-wip-us.apache.org/repos/asf/spark/blob/11d23115/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
new file mode 100644
index 0000000..47a6408
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import java.io.{PrintStream, OutputStream, File}
+import java.net.URI
+import java.util.jar.Attributes.Name
+import java.util.jar.{JarFile, Manifest}
+import java.util.zip.{ZipEntry, ZipFile}
+
+import org.scalatest.BeforeAndAfterEach
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+import com.google.common.io.Files
+import org.apache.commons.io.FileUtils
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.api.r.RUtils
+import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate
+
+class RPackageUtilsSuite extends SparkFunSuite with BeforeAndAfterEach {
+
+  private val main = MavenCoordinate("a", "b", "c")
+  private val dep1 = MavenCoordinate("a", "dep1", "c")
+  private val dep2 = MavenCoordinate("a", "dep2", "d")
+
+  private def getJarPath(coord: MavenCoordinate, repo: File): File = {
+    new File(IvyTestUtils.pathFromCoordinate(coord, repo, "jar", useIvyLayout = false),
+      IvyTestUtils.artifactName(coord, useIvyLayout = false, ".jar"))
+  }
+
+  private val lineBuffer = ArrayBuffer[String]()
+
+  private val noOpOutputStream = new OutputStream {
+    def write(b: Int) = {}
+  }
+
+  /** Simple PrintStream that reads data into a buffer */
+  private class BufferPrintStream extends PrintStream(noOpOutputStream) {
+    // scalastyle:off println
+    override def println(line: String) {
+    // scalastyle:on println
+      lineBuffer += line
+    }
+  }
+
+  def beforeAll() {
+    System.setProperty("spark.testing", "true")
+  }
+
+  override def beforeEach(): Unit = {
+    lineBuffer.clear()
+  }
+
+  test("pick which jars to unpack using the manifest") {
+    val deps = Seq(dep1, dep2).mkString(",")
+    IvyTestUtils.withRepository(main, Some(deps), None, withR = true) { repo =>
+      val jars = Seq(main, dep1, dep2).map(c => new JarFile(getJarPath(c, new File(new URI(repo)))))
+      assert(RPackageUtils.checkManifestForR(jars(0)), "should have R code")
+      assert(!RPackageUtils.checkManifestForR(jars(1)), "should not have R code")
+      assert(!RPackageUtils.checkManifestForR(jars(2)), "should not have R code")
+    }
+  }
+
+  test("build an R package from a jar end to end") {
+    assume(RUtils.isRInstalled, "R isn't installed on this machine.")
+    val deps = Seq(dep1, dep2).mkString(",")
+    IvyTestUtils.withRepository(main, Some(deps), None, withR = true) { repo =>
+      val jars = Seq(main, dep1, dep2).map { c =>
+        getJarPath(c, new File(new URI(repo)))
+      }.mkString(",")
+      RPackageUtils.checkAndBuildRPackage(jars, new BufferPrintStream, verbose = true)
+      val firstJar = jars.substring(0, jars.indexOf(","))
+      val output = lineBuffer.mkString("\n")
+      assert(output.contains("Building R package"))
+      assert(output.contains("Extracting"))
+      assert(output.contains(s"$firstJar contains R source code. Now installing package."))
+      assert(output.contains("doesn't contain R source code, skipping..."))
+    }
+  }
+
+  test("jars that don't exist are skipped and print warning") {
+    assume(RUtils.isRInstalled, "R isn't installed on this machine.")
+    val deps = Seq(dep1, dep2).mkString(",")
+    IvyTestUtils.withRepository(main, Some(deps), None, withR = true) { repo =>
+      val jars = Seq(main, dep1, dep2).map { c =>
+        getJarPath(c, new File(new URI(repo))) + "dummy"
+      }.mkString(",")
+      RPackageUtils.checkAndBuildRPackage(jars, new BufferPrintStream, verbose = true)
+      val individualJars = jars.split(",")
+      val output = lineBuffer.mkString("\n")
+      individualJars.foreach { jarFile =>
+        assert(output.contains(s"$jarFile"))
+      }
+    }
+  }
+
+  test("faulty R package shows documentation") {
+    assume(RUtils.isRInstalled, "R isn't installed on this machine.")
+    IvyTestUtils.withRepository(main, None, None) { repo =>
+      val manifest = new Manifest
+      val attr = manifest.getMainAttributes
+      attr.put(Name.MANIFEST_VERSION, "1.0")
+      attr.put(new Name("Spark-HasRPackage"), "true")
+      val jar = IvyTestUtils.packJar(new File(new URI(repo)), dep1, Nil,
+        useIvyLayout = false, withR = false, Some(manifest))
+      RPackageUtils.checkAndBuildRPackage(jar.getAbsolutePath, new BufferPrintStream,
+        verbose = true)
+      val output = lineBuffer.mkString("\n")
+      assert(output.contains(RPackageUtils.RJarDoc))
+    }
+  }
+
+  test("SparkR zipping works properly") {
+    val tempDir = Files.createTempDir()
+    try {
+      IvyTestUtils.writeFile(tempDir, "test.R", "abc")
+      val fakeSparkRDir = new File(tempDir, "SparkR")
+      assert(fakeSparkRDir.mkdirs())
+      IvyTestUtils.writeFile(fakeSparkRDir, "abc.R", "abc")
+      IvyTestUtils.writeFile(fakeSparkRDir, "DESCRIPTION", "abc")
+      IvyTestUtils.writeFile(tempDir, "package.zip", "abc") // fake zip file :)
+      val fakePackageDir = new File(tempDir, "packageTest")
+      assert(fakePackageDir.mkdirs())
+      IvyTestUtils.writeFile(fakePackageDir, "def.R", "abc")
+      IvyTestUtils.writeFile(fakePackageDir, "DESCRIPTION", "abc")
+      val finalZip = RPackageUtils.zipRLibraries(tempDir, "sparkr.zip")
+      assert(finalZip.exists())
+      val entries = new ZipFile(finalZip).entries().toSeq.map(_.getName)
+      assert(entries.contains("/test.R"))
+      assert(entries.contains("/SparkR/abc.R"))
+      assert(entries.contains("/SparkR/DESCRIPTION"))
+      assert(!entries.contains("/package.zip"))
+      assert(entries.contains("/packageTest/def.R"))
+      assert(entries.contains("/packageTest/DESCRIPTION"))
+    } finally {
+      FileUtils.deleteDirectory(tempDir)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/11d23115/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index aa78bfe..757e0ce 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -362,6 +362,30 @@ class SparkSubmitSuite
     }
   }
 
+  test("correctly builds R packages included in a jar with --packages") {
+    // TODO(SPARK-9603): Building a package to $SPARK_HOME/R/lib is unavailable on Jenkins.
+    // It's hard to write the test in SparkR (because we can't create the repository dynamically)
+    /*
+    assume(RUtils.isRInstalled, "R isn't installed on this machine.")
+    val main = MavenCoordinate("my.great.lib", "mylib", "0.1")
+    val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
+    val rScriptDir =
+      Seq(sparkHome, "R", "pkg", "inst", "tests", "packageInAJarTest.R").mkString(File.separator)
+    assert(new File(rScriptDir).exists)
+    IvyTestUtils.withRepository(main, None, None, withR = true) { repo =>
+      val args = Seq(
+        "--name", "testApp",
+        "--master", "local-cluster[2,1,1024]",
+        "--packages", main.toString,
+        "--repositories", repo,
+        "--verbose",
+        "--conf", "spark.ui.enabled=false",
+        rScriptDir)
+      runSparkSubmit(args)
+    }
+    */
+  }
+
   test("resolves command line argument paths correctly") {
     val jars = "/jar1,/jar2"                 // --jars
     val files = "hdfs:/file1,file2"          // --files

