Repository: spark
Updated Branches:
  refs/heads/master d18276cb1 -> 6ca990fb3


[SPARK-13294][PROJECT INFRA] Remove MiMa's dependency on spark-class / Spark assembly

This patch removes the need to build a full Spark assembly before running the `dev/mima` script.

- I modified the `tools` project to remove its direct dependency on Spark, so `build/sbt "export tools/fullClasspath"` now returns the classpath for the `GenerateMIMAIgnore` class itself plus its own dependencies.
   - This required deleting two classes full of dead code that we no longer use.
- `GenerateMIMAIgnore` now uses [ClassUtil](http://software.clapper.org/classutil/) to find all of the Spark classes instead of our homemade JAR traversal code. The old code did not handle directories of classes properly, and handling them is necessary to generate excludes from an assembly-free Spark build (see the sketch after this list).
- `./dev/mima` no longer runs through `spark-class`, eliminating the need to reason about classpath ordering between `SPARK_CLASSPATH` and the assembly.
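
For reference, here is a minimal, hypothetical sketch of the ClassUtil-based discovery described above. The object name `ListSparkClasses` and the package prefix are illustrative only; the real logic lives in `GenerateMIMAIgnore.getClasses` (see the diff below), and it assumes the Spark classes are already on the JVM classpath, which the new `dev/mima` arranges via the exported sbt classpaths.

```scala
import org.clapper.classutil.ClassFinder

// Hypothetical standalone example; GenerateMIMAIgnore does the equivalent internally.
object ListSparkClasses {
  def main(args: Array[String]): Unit = {
    // ClassFinder() with no arguments scans java.class.path, picking up both
    // directories of .class files and jars -- the case that the old
    // hand-rolled JAR traversal missed.
    val sparkClasses = ClassFinder()
      .getClasses
      .map(_.name)
      .filter(_.startsWith("org.apache.spark"))
      .toSet
    sparkClasses.take(10).foreach(println)  // print a small sample
  }
}
```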

Author: Josh Rosen <joshro...@databricks.com>

Closes #11178 from JoshRosen/remove-assembly-in-run-tests.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6ca990fb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6ca990fb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6ca990fb

Branch: refs/heads/master
Commit: 6ca990fb366cf68cd9d5afb433725d28f07e51a0
Parents: d18276c
Author: Josh Rosen <joshro...@databricks.com>
Authored: Thu Mar 10 23:28:34 2016 -0800
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Thu Mar 10 23:28:34 2016 -0800

----------------------------------------------------------------------
 dev/mima                                        |  23 +-
 dev/run-tests.py                                |  24 +-
 .../launcher/SparkClassCommandBuilder.java      |  22 --
 project/SparkBuild.scala                        |  19 +-
 tools/pom.xml                                   |  15 +-
 .../apache/spark/tools/GenerateMIMAIgnore.scala |  53 +--
 .../tools/JavaAPICompletenessChecker.scala      | 367 -------------------
 .../apache/spark/tools/StoragePerfTester.scala  | 111 ------
 8 files changed, 58 insertions(+), 576 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6ca990fb/dev/mima
----------------------------------------------------------------------
diff --git a/dev/mima b/dev/mima
index d5baffc..b7f8d62 100755
--- a/dev/mima
+++ b/dev/mima
@@ -24,24 +24,21 @@ set -e
 FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
 cd "$FWDIR"
 
-echo -e "q\n" | build/sbt oldDeps/update
+TOOLS_CLASSPATH="$(build/sbt "export tools/fullClasspath" | tail -n1)"
+
 rm -f .generated-mima*
 
 generate_mima_ignore() {
-  SPARK_JAVA_OPTS="-XX:MaxPermSize=1g -Xmx2g" \
-    ./bin/spark-class org.apache.spark.tools.GenerateMIMAIgnore
+    java \
+      -XX:MaxPermSize=1g \
+      -Xmx2g \
+      -cp "$TOOLS_CLASSPATH:$1" \
+      org.apache.spark.tools.GenerateMIMAIgnore
 }
 
-# Generate Mima Ignore is called twice, first with latest built jars
-# on the classpath and then again with previous version jars on the classpath.
-# Because of a bug in GenerateMIMAIgnore that when old jars are ahead on classpath
-# it did not process the new classes (which are in assembly jar).
-generate_mima_ignore
-
-export SPARK_CLASSPATH="$(build/sbt "export oldDeps/fullClasspath" | tail -n1)"
-echo "SPARK_CLASSPATH=$SPARK_CLASSPATH"
-
-generate_mima_ignore
+SPARK_PROFILES="-Pyarn -Pspark-ganglia-lgpl -Pkinesis-asl -Phive-thriftserver 
-Phive"
+generate_mima_ignore "$(build/sbt $SPARK_PROFILES "export 
assembly/fullClasspath" | tail -n1)"
+generate_mima_ignore "$(build/sbt $SPARK_PROFILES "export 
oldDeps/fullClasspath" | tail -n1)"
 
 echo -e "q\n" | build/sbt mima-report-binary-issues | grep -v -e 
"info.*Resolving"
 ret_val=$?

http://git-wip-us.apache.org/repos/asf/spark/blob/6ca990fb/dev/run-tests.py
----------------------------------------------------------------------
diff --git a/dev/run-tests.py b/dev/run-tests.py
index 6e45113..ebeede5 100755
--- a/dev/run-tests.py
+++ b/dev/run-tests.py
@@ -336,7 +336,6 @@ def build_spark_sbt(hadoop_version):
     # Enable all of the profiles for the build:
     build_profiles = get_hadoop_profiles(hadoop_version) + modules.root.build_profile_flags
     sbt_goals = ["package",
-                 "assembly/assembly",
                  "streaming-kafka-assembly/assembly",
                  "streaming-flume-assembly/assembly",
                  "streaming-mqtt-assembly/assembly",
@@ -350,6 +349,16 @@ def build_spark_sbt(hadoop_version):
     exec_sbt(profiles_and_goals)
 
 
+def build_spark_assembly_sbt(hadoop_version):
+    # Enable all of the profiles for the build:
+    build_profiles = get_hadoop_profiles(hadoop_version) + modules.root.build_profile_flags
+    sbt_goals = ["assembly/assembly"]
+    profiles_and_goals = build_profiles + sbt_goals
+    print("[info] Building Spark assembly (w/Hive 1.2.1) using SBT with these 
arguments: ",
+          " ".join(profiles_and_goals))
+    exec_sbt(profiles_and_goals)
+
+
 def build_apache_spark(build_tool, hadoop_version):
     """Will build Spark against Hive v1.2.1 given the passed in build tool 
(either `sbt` or
     `maven`). Defaults to using `sbt`."""
@@ -561,11 +570,14 @@ def main():
     # spark build
     build_apache_spark(build_tool, hadoop_version)
 
-    # TODO Temporarily disable MiMA check for DF-to-DS migration prototyping
-    # # backwards compatibility checks
-    # if build_tool == "sbt":
-    #     # Note: compatiblity tests only supported in sbt for now
-    #     detect_binary_inop_with_mima()
+    # backwards compatibility checks
+    if build_tool == "sbt":
+        # Note: compatibility tests only supported in sbt for now
+        # TODO Temporarily disable MiMA check for DF-to-DS migration prototyping
+        # detect_binary_inop_with_mima()
+        # Since we did not build assembly/assembly before running dev/mima, we need to
+        # do it here because the tests still rely on it; see SPARK-13294 for details.
+        build_spark_assembly_sbt(hadoop_version)
 
     # run the test suites
     run_scala_tests(build_tool, hadoop_version, test_modules, excluded_tags)

http://git-wip-us.apache.org/repos/asf/spark/blob/6ca990fb/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
index 4018723..6b9d36c 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
@@ -17,12 +17,10 @@
 
 package org.apache.spark.launcher;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.regex.Pattern;
 
 import static org.apache.spark.launcher.CommandBuilderUtils.*;
 
@@ -76,26 +74,6 @@ class SparkClassCommandBuilder extends AbstractCommandBuilder {
       javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
       javaOptsKeys.add("SPARK_SHUFFLE_OPTS");
       memKey = "SPARK_DAEMON_MEMORY";
-    } else if (className.startsWith("org.apache.spark.tools.")) {
-      String sparkHome = getSparkHome();
-      File toolsDir = new File(join(File.separator, sparkHome, "tools", "target",
-        "scala-" + getScalaVersion()));
-      checkState(toolsDir.isDirectory(), "Cannot find tools build directory.");
-
-      Pattern re = Pattern.compile("spark-tools_.*\\.jar");
-      for (File f : toolsDir.listFiles()) {
-        if (re.matcher(f.getName()).matches()) {
-          extraClassPath = f.getAbsolutePath();
-          break;
-        }
-      }
-
-      checkState(extraClassPath != null,
-        "Failed to find Spark Tools Jar in %s.\n" +
-        "You need to run \"build/sbt tools/package\" before running %s.",
-        toolsDir.getAbsolutePath(), className);
-
-      javaOptsKeys.add("SPARK_JAVA_OPTS");
     } else {
       javaOptsKeys.add("SPARK_JAVA_OPTS");
       memKey = "SPARK_DRIVER_MEMORY";

http://git-wip-us.apache.org/repos/asf/spark/blob/6ca990fb/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index a380c4c..e74fb17 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -384,18 +384,19 @@ object OldDeps {
 
   lazy val project = Project("oldDeps", file("dev"), settings = 
oldDepsSettings)
 
-  def versionArtifact(id: String): Option[sbt.ModuleID] = {
-    val fullId = id + "_2.11"
-    Some("org.apache.spark" % fullId % "1.2.0")
-  }
-
   def oldDepsSettings() = Defaults.coreDefaultSettings ++ Seq(
     name := "old-deps",
     scalaVersion := "2.10.5",
-    libraryDependencies := Seq("spark-streaming-mqtt", 
"spark-streaming-zeromq",
-      "spark-streaming-flume", "spark-streaming-twitter",
-      "spark-streaming", "spark-mllib", "spark-graphx",
-      "spark-core").map(versionArtifact(_).get intransitive())
+    libraryDependencies := Seq(
+      "spark-streaming-mqtt",
+      "spark-streaming-zeromq",
+      "spark-streaming-flume",
+      "spark-streaming-twitter",
+      "spark-streaming",
+      "spark-mllib",
+      "spark-graphx",
+      "spark-core"
+    ).map(id => "org.apache.spark" % (id + "_2.11") % "1.2.0")
   )
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6ca990fb/tools/pom.xml
----------------------------------------------------------------------
diff --git a/tools/pom.xml b/tools/pom.xml
index b3a5ae2..9bb20e1 100644
--- a/tools/pom.xml
+++ b/tools/pom.xml
@@ -35,16 +35,6 @@
 
   <dependencies>
     <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-core_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
       <groupId>org.scala-lang</groupId>
       <artifactId>scala-reflect</artifactId>
     </dependency>
@@ -52,6 +42,11 @@
       <groupId>org.scala-lang</groupId>
       <artifactId>scala-compiler</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.clapper</groupId>
+      <artifactId>classutil_${scala.binary.version}</artifactId>
+      <version>1.0.6</version>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/spark/blob/6ca990fb/tools/src/main/scala/org/apache/spark/tools/GenerateMIMAIgnore.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/spark/tools/GenerateMIMAIgnore.scala b/tools/src/main/scala/org/apache/spark/tools/GenerateMIMAIgnore.scala
index a947fac..738bd21 100644
--- a/tools/src/main/scala/org/apache/spark/tools/GenerateMIMAIgnore.scala
+++ b/tools/src/main/scala/org/apache/spark/tools/GenerateMIMAIgnore.scala
@@ -18,15 +18,13 @@
 // scalastyle:off classforname
 package org.apache.spark.tools
 
-import java.io.File
-import java.util.jar.JarFile
-
 import scala.collection.mutable
-import scala.collection.JavaConverters._
 import scala.reflect.runtime.{universe => unv}
 import scala.reflect.runtime.universe.runtimeMirror
 import scala.util.Try
 
+import org.clapper.classutil.ClassFinder
+
 /**
  * A tool for generating classes to be excluded during binary checking with MIMA. It is expected
  * that this tool is run with ./spark-class.
@@ -42,12 +40,13 @@ object GenerateMIMAIgnore {
   private val classLoader = Thread.currentThread().getContextClassLoader
   private val mirror = runtimeMirror(classLoader)
 
+  private def isDeveloperApi(sym: unv.Symbol) = sym.annotations.exists {
+    _.tpe =:= mirror.staticClass("org.apache.spark.annotation.DeveloperApi").toType
+  }
 
-  private def isDeveloperApi(sym: unv.Symbol) =
-    sym.annotations.exists(_.tpe =:= unv.typeOf[org.apache.spark.annotation.DeveloperApi])
-
-  private def isExperimental(sym: unv.Symbol) =
-    sym.annotations.exists(_.tpe =:= unv.typeOf[org.apache.spark.annotation.Experimental])
+  private def isExperimental(sym: unv.Symbol) = sym.annotations.exists {
+    _.tpe =:= mirror.staticClass("org.apache.spark.annotation.Experimental").toType
+  }
 
 
   private def isPackagePrivate(sym: unv.Symbol) =
@@ -160,35 +159,13 @@ object GenerateMIMAIgnore {
    * and subpackages both from directories and jars present on the classpath.
    */
   private def getClasses(packageName: String): Set[String] = {
-    val path = packageName.replace('.', '/')
-    val resources = classLoader.getResources(path)
-
-    val jars = resources.asScala.filter(_.getProtocol == "jar")
-      .map(_.getFile.split(":")(1).split("!")(0)).toSeq
-
-    jars.flatMap(getClassesFromJar(_, path))
-      .map(_.getName)
-      .filterNot(shouldExclude).toSet
-  }
-
-  /**
-   * Get all classes in a package from a jar file.
-   */
-  private def getClassesFromJar(jarPath: String, packageName: String) = {
-    import scala.collection.mutable
-    val jar = new JarFile(new File(jarPath))
-    val enums = jar.entries().asScala.map(_.getName).filter(_.startsWith(packageName))
-    val classes = mutable.HashSet[Class[_]]()
-    for (entry <- enums if entry.endsWith(".class")) {
-      try {
-        classes += Class.forName(entry.replace('/', '.').stripSuffix(".class"), false, classLoader)
-      } catch {
-        // scalastyle:off println
-        case _: Throwable => println("Unable to load:" + entry)
-        // scalastyle:on println
-      }
-    }
-    classes
+    val finder = ClassFinder()
+    finder
+      .getClasses
+      .map(_.name)
+      .filter(_.startsWith(packageName))
+      .filterNot(shouldExclude)
+      .toSet
   }
 }
 // scalastyle:on classforname

http://git-wip-us.apache.org/repos/asf/spark/blob/6ca990fb/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala b/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala
deleted file mode 100644
index ccd8fd3..0000000
--- a/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala
+++ /dev/null
@@ -1,367 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.tools
-
-import java.lang.reflect.{Method, Type}
-
-import scala.collection.mutable.ArrayBuffer
-import scala.language.existentials
-
-import org.apache.spark._
-import org.apache.spark.api.java._
-import org.apache.spark.rdd.{DoubleRDDFunctions, OrderedRDDFunctions, 
PairRDDFunctions, RDD}
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaDStream, JavaPairDStream, 
JavaStreamingContext}
-import org.apache.spark.streaming.dstream.{DStream, PairDStreamFunctions}
-
-
-private[spark] abstract class SparkType(val name: String)
-
-private[spark] case class BaseType(override val name: String) extends 
SparkType(name) {
-  override def toString: String = {
-    name
-  }
-}
-
-private[spark]
-case class ParameterizedType(override val name: String,
-                             parameters: Seq[SparkType],
-                             typebounds: String = "") extends SparkType(name) {
-  override def toString: String = {
-    if (typebounds != "") {
-      typebounds + " " + name + "<" + parameters.mkString(", ") + ">"
-    } else {
-      name + "<" + parameters.mkString(", ") + ">"
-    }
-  }
-}
-
-private[spark]
-case class SparkMethod(name: String, returnType: SparkType, parameters: 
Seq[SparkType]) {
-  override def toString: String = {
-    returnType + " " + name + "(" + parameters.mkString(", ") + ")"
-  }
-}
-
-/**
- * A tool for identifying methods that need to be ported from Scala to the 
Java API.
- *
- * It uses reflection to find methods in the Scala API and rewrites those 
methods' signatures
- * into appropriate Java equivalents.  If those equivalent methods have not 
been implemented in
- * the Java API, they are printed.
- */
-object JavaAPICompletenessChecker {
-
-  private def parseType(typeStr: String): SparkType = {
-    if (!typeStr.contains("<")) {
-      // Base types might begin with "class" or "interface", so we have to 
strip that off:
-      BaseType(typeStr.trim.split(" ").last)
-    } else if (typeStr.endsWith("[]")) {
-      ParameterizedType("Array", Seq(parseType(typeStr.stripSuffix("[]"))))
-    } else {
-      val parts = typeStr.split("<", 2)
-      val name = parts(0).trim
-      assert (parts(1).last == '>')
-      val parameters = parts(1).dropRight(1)
-      ParameterizedType(name, parseTypeList(parameters))
-    }
-  }
-
-  private def parseTypeList(typeStr: String): Seq[SparkType] = {
-    val types: ArrayBuffer[SparkType] = new ArrayBuffer[SparkType]
-    var stack = 0
-    var token: StringBuffer = new StringBuffer()
-    for (c <- typeStr.trim) {
-      if (c == ',' && stack == 0) {
-        types += parseType(token.toString)
-        token = new StringBuffer()
-      } else if (c == ' ' && stack != 0) {
-        // continue
-      } else {
-        if (c == '<') {
-          stack += 1
-        } else if (c == '>') {
-          stack -= 1
-        }
-        token.append(c)
-      }
-    }
-    assert (stack == 0)
-    if (token.toString != "") {
-      types += parseType(token.toString)
-    }
-    types.toSeq
-  }
-
-  private def parseReturnType(typeStr: String): SparkType = {
-    if (typeStr(0) == '<') {
-      val parts = typeStr.drop(0).split(">", 2)
-      val parsed = parseType(parts(1)).asInstanceOf[ParameterizedType]
-      ParameterizedType(parsed.name, parsed.parameters, parts(0))
-    } else {
-      parseType(typeStr)
-    }
-  }
-
-  private def toSparkMethod(method: Method): SparkMethod = {
-    val returnType = parseReturnType(method.getGenericReturnType.toString)
-    val name = method.getName
-    val parameters = method.getGenericParameterTypes.map(t => 
parseType(t.toString))
-    SparkMethod(name, returnType, parameters)
-  }
-
-  private def toJavaType(scalaType: SparkType, isReturnType: Boolean): 
SparkType = {
-    val renameSubstitutions = Map(
-      "scala.collection.Map" -> "java.util.Map",
-      // TODO: the JavaStreamingContext API accepts Array arguments
-      // instead of Lists, so this isn't a trivial translation / sub:
-      "scala.collection.Seq" -> "java.util.List",
-      "scala.Function2" -> "org.apache.spark.api.java.function.Function2",
-      "scala.collection.Iterator" -> "java.util.Iterator",
-      "scala.collection.mutable.Queue" -> "java.util.Queue",
-      "double" -> "java.lang.Double"
-    )
-    // Keep applying the substitutions until we've reached a fixedpoint.
-    def applySubs(scalaType: SparkType): SparkType = {
-      scalaType match {
-        case ParameterizedType(name, parameters, typebounds) =>
-          name match {
-            case "org.apache.spark.rdd.RDD" =>
-              if (parameters(0).name == classOf[Tuple2[_, _]].getName) {
-                val tupleParams =
-                  
parameters(0).asInstanceOf[ParameterizedType].parameters.map(applySubs)
-                ParameterizedType(classOf[JavaPairRDD[_, _]].getName, 
tupleParams)
-              } else {
-                ParameterizedType(classOf[JavaRDD[_]].getName, 
parameters.map(applySubs))
-              }
-            case "org.apache.spark.streaming.dstream.DStream" =>
-              if (parameters(0).name == classOf[Tuple2[_, _]].getName) {
-                val tupleParams =
-                  
parameters(0).asInstanceOf[ParameterizedType].parameters.map(applySubs)
-                
ParameterizedType("org.apache.spark.streaming.api.java.JavaPairDStream",
-                  tupleParams)
-              } else {
-                
ParameterizedType("org.apache.spark.streaming.api.java.JavaDStream",
-                  parameters.map(applySubs))
-              }
-            case "scala.Option" => {
-              if (isReturnType) {
-                ParameterizedType("org.apache.spark.api.java.Optional", 
parameters.map(applySubs))
-              } else {
-                applySubs(parameters(0))
-              }
-            }
-            case "scala.Function1" =>
-              val firstParamName = parameters.last.name
-              if (firstParamName.startsWith("scala.collection.Traversable") ||
-                firstParamName.startsWith("scala.collection.Iterator")) {
-                
ParameterizedType("org.apache.spark.api.java.function.FlatMapFunction",
-                  Seq(parameters(0),
-                    
parameters.last.asInstanceOf[ParameterizedType].parameters(0)).map(applySubs))
-              } else if (firstParamName == "scala.runtime.BoxedUnit") {
-                
ParameterizedType("org.apache.spark.api.java.function.VoidFunction",
-                  parameters.dropRight(1).map(applySubs))
-              } else {
-                
ParameterizedType("org.apache.spark.api.java.function.Function",
-                  parameters.map(applySubs))
-              }
-            case _ =>
-              ParameterizedType(renameSubstitutions.getOrElse(name, name),
-                parameters.map(applySubs))
-          }
-        case BaseType(name) =>
-          if (renameSubstitutions.contains(name)) {
-            BaseType(renameSubstitutions(name))
-          } else {
-            scalaType
-          }
-      }
-    }
-    var oldType = scalaType
-    var newType = applySubs(scalaType)
-    while (oldType != newType) {
-      oldType = newType
-      newType = applySubs(scalaType)
-    }
-    newType
-  }
-
-  private def toJavaMethod(method: SparkMethod): SparkMethod = {
-    val params = method.parameters
-      .filterNot(_.name == "scala.reflect.ClassTag")
-      .map(toJavaType(_, isReturnType = false))
-    SparkMethod(method.name, toJavaType(method.returnType, isReturnType = 
true), params)
-  }
-
-  private def isExcludedByName(method: Method): Boolean = {
-    val name = method.getDeclaringClass.getName + "." + method.getName
-    // Scala methods that are declared as private[mypackage] become public in 
the resulting
-    // Java bytecode.  As a result, we need to manually exclude those methods 
here.
-    // This list also includes a few methods that are only used by the web UI 
or other
-    // internal Spark components.
-    val excludedNames = Seq(
-      "org.apache.spark.rdd.RDD.origin",
-      "org.apache.spark.rdd.RDD.elementClassTag",
-      "org.apache.spark.rdd.RDD.checkpointData",
-      "org.apache.spark.rdd.RDD.partitioner",
-      "org.apache.spark.rdd.RDD.partitions",
-      "org.apache.spark.rdd.RDD.firstParent",
-      "org.apache.spark.rdd.RDD.doCheckpoint",
-      "org.apache.spark.rdd.RDD.markCheckpointed",
-      "org.apache.spark.rdd.RDD.clearDependencies",
-      "org.apache.spark.rdd.RDD.getDependencies",
-      "org.apache.spark.rdd.RDD.getPartitions",
-      "org.apache.spark.rdd.RDD.dependencies",
-      "org.apache.spark.rdd.RDD.getPreferredLocations",
-      "org.apache.spark.rdd.RDD.collectPartitions",
-      "org.apache.spark.rdd.RDD.computeOrReadCheckpoint",
-      "org.apache.spark.rdd.PairRDDFunctions.getKeyClass",
-      "org.apache.spark.rdd.PairRDDFunctions.getValueClass",
-      "org.apache.spark.SparkContext.stringToText",
-      "org.apache.spark.SparkContext.makeRDD",
-      "org.apache.spark.SparkContext.runJob",
-      "org.apache.spark.SparkContext.runApproximateJob",
-      "org.apache.spark.SparkContext.clean",
-      "org.apache.spark.SparkContext.metadataCleaner",
-      "org.apache.spark.SparkContext.ui",
-      "org.apache.spark.SparkContext.newShuffleId",
-      "org.apache.spark.SparkContext.newRddId",
-      "org.apache.spark.SparkContext.cleanup",
-      "org.apache.spark.SparkContext.receiverJobThread",
-      "org.apache.spark.SparkContext.getRDDStorageInfo",
-      "org.apache.spark.SparkContext.addedFiles",
-      "org.apache.spark.SparkContext.addedJars",
-      "org.apache.spark.SparkContext.persistentRdds",
-      "org.apache.spark.SparkContext.executorEnvs",
-      "org.apache.spark.SparkContext.checkpointDir",
-      "org.apache.spark.SparkContext.getSparkHome",
-      "org.apache.spark.SparkContext.executorMemoryRequested",
-      "org.apache.spark.SparkContext.getExecutorStorageStatus",
-      "org.apache.spark.streaming.dstream.DStream.generatedRDDs",
-      "org.apache.spark.streaming.dstream.DStream.zeroTime",
-      "org.apache.spark.streaming.dstream.DStream.rememberDuration",
-      "org.apache.spark.streaming.dstream.DStream.storageLevel",
-      "org.apache.spark.streaming.dstream.DStream.mustCheckpoint",
-      "org.apache.spark.streaming.dstream.DStream.checkpointDuration",
-      "org.apache.spark.streaming.dstream.DStream.checkpointData",
-      "org.apache.spark.streaming.dstream.DStream.graph",
-      "org.apache.spark.streaming.dstream.DStream.isInitialized",
-      "org.apache.spark.streaming.dstream.DStream.parentRememberDuration",
-      "org.apache.spark.streaming.dstream.DStream.initialize",
-      "org.apache.spark.streaming.dstream.DStream.validate",
-      "org.apache.spark.streaming.dstream.DStream.setContext",
-      "org.apache.spark.streaming.dstream.DStream.setGraph",
-      "org.apache.spark.streaming.dstream.DStream.remember",
-      "org.apache.spark.streaming.dstream.DStream.getOrCompute",
-      "org.apache.spark.streaming.dstream.DStream.generateJob",
-      "org.apache.spark.streaming.dstream.DStream.clearOldMetadata",
-      "org.apache.spark.streaming.dstream.DStream.addMetadata",
-      "org.apache.spark.streaming.dstream.DStream.updateCheckpointData",
-      "org.apache.spark.streaming.dstream.DStream.restoreCheckpointData",
-      "org.apache.spark.streaming.dstream.DStream.isTimeValid",
-      "org.apache.spark.streaming.StreamingContext.nextNetworkInputStreamId",
-      "org.apache.spark.streaming.StreamingContext.checkpointDir",
-      "org.apache.spark.streaming.StreamingContext.checkpointDuration",
-      "org.apache.spark.streaming.StreamingContext.receiverJobThread",
-      "org.apache.spark.streaming.StreamingContext.scheduler",
-      "org.apache.spark.streaming.StreamingContext.initialCheckpoint",
-      "org.apache.spark.streaming.StreamingContext.getNewNetworkStreamId",
-      "org.apache.spark.streaming.StreamingContext.validate",
-      "org.apache.spark.streaming.StreamingContext.createNewSparkContext",
-      "org.apache.spark.streaming.StreamingContext.rddToFileName",
-      "org.apache.spark.streaming.StreamingContext.getSparkCheckpointDir",
-      "org.apache.spark.streaming.StreamingContext.env",
-      "org.apache.spark.streaming.StreamingContext.graph",
-      "org.apache.spark.streaming.StreamingContext.isCheckpointPresent"
-    )
-    val excludedPatterns = Seq(
-      """^org\.apache\.spark\.SparkContext\..*To.*Functions""",
-      """^org\.apache\.spark\.SparkContext\..*WritableConverter""",
-      """^org\.apache\.spark\.SparkContext\..*To.*Writable"""
-    ).map(_.r)
-    lazy val excludedByPattern =
-      !excludedPatterns.map(_.findFirstIn(name)).filter(_.isDefined).isEmpty
-    name.contains("$") || excludedNames.contains(name) || excludedByPattern
-  }
-
-  private def isExcludedByInterface(method: Method): Boolean = {
-    val excludedInterfaces =
-      Set("org.apache.spark.Logging", 
"org.apache.hadoop.mapreduce.HadoopMapReduceUtil")
-    def toComparisionKey(method: Method): (Class[_], String, Type) =
-      (method.getReturnType, method.getName, method.getGenericReturnType)
-    val interfaces = method.getDeclaringClass.getInterfaces.filter { i =>
-      excludedInterfaces.contains(i.getName)
-    }
-    val excludedMethods = 
interfaces.flatMap(_.getMethods.map(toComparisionKey))
-    excludedMethods.contains(toComparisionKey(method))
-  }
-
-  private def printMissingMethods(scalaClass: Class[_], javaClass: Class[_]) {
-    val methods = scalaClass.getMethods
-      .filterNot(_.isAccessible)
-      .filterNot(isExcludedByName)
-      .filterNot(isExcludedByInterface)
-    val javaEquivalents = methods.map(m => 
toJavaMethod(toSparkMethod(m))).toSet
-
-    val javaMethods = javaClass.getMethods.map(toSparkMethod).toSet
-
-    val missingMethods = javaEquivalents -- javaMethods
-
-    for (method <- missingMethods) {
-      // scalastyle:off println
-      println(method)
-      // scalastyle:on println
-    }
-  }
-
-  def main(args: Array[String]) {
-    // scalastyle:off println
-    println("Missing RDD methods")
-    printMissingMethods(classOf[RDD[_]], classOf[JavaRDD[_]])
-    println()
-
-    println("Missing PairRDD methods")
-    printMissingMethods(classOf[PairRDDFunctions[_, _]], 
classOf[JavaPairRDD[_, _]])
-    println()
-
-    println("Missing DoubleRDD methods")
-    printMissingMethods(classOf[DoubleRDDFunctions], classOf[JavaDoubleRDD])
-    println()
-
-    println("Missing OrderedRDD methods")
-    printMissingMethods(classOf[OrderedRDDFunctions[_, _, _]], 
classOf[JavaPairRDD[_, _]])
-    println()
-
-    println("Missing SparkContext methods")
-    printMissingMethods(classOf[SparkContext], classOf[JavaSparkContext])
-    println()
-
-    println("Missing StreamingContext methods")
-    printMissingMethods(classOf[StreamingContext], 
classOf[JavaStreamingContext])
-    println()
-
-    println("Missing DStream methods")
-    printMissingMethods(classOf[DStream[_]], classOf[JavaDStream[_]])
-    println()
-
-    println("Missing PairDStream methods")
-    printMissingMethods(classOf[PairDStreamFunctions[_, _]], 
classOf[JavaPairDStream[_, _]])
-    println()
-    // scalastyle:on println
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/6ca990fb/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
deleted file mode 100644
index 8a5c7c0..0000000
--- a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.tools
-
-import java.util.concurrent.{CountDownLatch, Executors}
-import java.util.concurrent.atomic.AtomicLong
-
-import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.executor.ShuffleWriteMetrics
-import org.apache.spark.serializer.KryoSerializer
-import org.apache.spark.shuffle.hash.HashShuffleManager
-import org.apache.spark.util.Utils
-
-/**
- * Internal utility for micro-benchmarking shuffle write performance.
- *
- * Writes simulated shuffle output from several threads and records the 
observed throughput.
- */
-object StoragePerfTester {
-  def main(args: Array[String]): Unit = {
-    /** Total amount of data to generate. Distributed evenly amongst maps and 
reduce splits. */
-    val dataSizeMb = Utils.memoryStringToMb(sys.env.getOrElse("OUTPUT_DATA", 
"1g"))
-
-    /** Number of map tasks. All tasks execute concurrently. */
-    val numMaps = sys.env.get("NUM_MAPS").map(_.toInt).getOrElse(8)
-
-    /** Number of reduce splits for each map task. */
-    val numOutputSplits = 
sys.env.get("NUM_REDUCERS").map(_.toInt).getOrElse(500)
-
-    val recordLength = 1000 // ~1KB records
-    val totalRecords = dataSizeMb * 1000
-    val recordsPerMap = totalRecords / numMaps
-
-    val writeKey = "1" * (recordLength / 2)
-    val writeValue = "1" * (recordLength / 2)
-    val executor = Executors.newFixedThreadPool(numMaps)
-
-    val conf = new SparkConf()
-      .set("spark.shuffle.compress", "false")
-      .set("spark.shuffle.sync", "true")
-      .set("spark.shuffle.manager", 
"org.apache.spark.shuffle.hash.HashShuffleManager")
-
-    // This is only used to instantiate a BlockManager. All thread scheduling 
is done manually.
-    val sc = new SparkContext("local[4]", "Write Tester", conf)
-    val hashShuffleManager = 
sc.env.shuffleManager.asInstanceOf[HashShuffleManager]
-
-    def writeOutputBytes(mapId: Int, total: AtomicLong): Unit = {
-      val shuffle = hashShuffleManager.shuffleBlockResolver.forMapTask(1, 
mapId, numOutputSplits,
-        new KryoSerializer(sc.conf), new ShuffleWriteMetrics())
-      val writers = shuffle.writers
-      for (i <- 1 to recordsPerMap) {
-        writers(i % numOutputSplits).write(writeKey, writeValue)
-      }
-      writers.map { w =>
-        w.commitAndClose()
-        total.addAndGet(w.fileSegment().length)
-      }
-
-      shuffle.releaseWriters(true)
-    }
-
-    val start = System.currentTimeMillis()
-    val latch = new CountDownLatch(numMaps)
-    val totalBytes = new AtomicLong()
-    for (task <- 1 to numMaps) {
-      executor.submit(new Runnable() {
-        override def run(): Unit = {
-          try {
-            writeOutputBytes(task, totalBytes)
-            latch.countDown()
-          } catch {
-            case e: Exception =>
-              // scalastyle:off println
-              println("Exception in child thread: " + e + " " + e.getMessage)
-              // scalastyle:on println
-              System.exit(1)
-          }
-        }
-      })
-    }
-    latch.await()
-    val end = System.currentTimeMillis()
-    val time = (end - start) / 1000.0
-    val bytesPerSecond = totalBytes.get() / time
-    val bytesPerFile = (totalBytes.get() / (numOutputSplits * 
numMaps.toDouble)).toLong
-
-    // scalastyle:off println
-    System.err.println("files_total\t\t%s".format(numMaps * numOutputSplits))
-    
System.err.println("bytes_per_file\t\t%s".format(Utils.bytesToString(bytesPerFile)))
-    
System.err.println("agg_throughput\t\t%s/s".format(Utils.bytesToString(bytesPerSecond.toLong)))
-    // scalastyle:on println
-
-    executor.shutdown()
-    sc.stop()
-  }
-}

