spark git commit: [SPARK-20886][CORE] HadoopMapReduceCommitProtocol to handle FileOutputCommitter.getWorkPath==null
Repository: spark Updated Branches: refs/heads/master 3d0e17424 -> e47f48c73 [SPARK-20886][CORE] HadoopMapReduceCommitProtocol to handle FileOutputCommitter.getWorkPath==null ## What changes were proposed in this pull request? Handles the situation where a `FileOutputCommitter.getWorkPath()` returns `null` by downgrading to the supplied `path` argument. The existing code does an `Option(workPath.toString).getOrElse(path)`, which triggers an NPE in the `toString()` operation if `workPath` is null. The code apparently was meant to handle this (hence the getOrElse() clause), but as the NPE has already occurred by that point, the else clause never gets invoked. ## How was this patch tested? Manually, with some later code review. Author: Steve Loughran Closes #18111 from steveloughran/cloud/SPARK-20886-committer-NPE. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e47f48c7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e47f48c7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e47f48c7 Branch: refs/heads/master Commit: e47f48c737052564e92903de16ff16707fae32c3 Parents: 3d0e174 Author: Steve Loughran Authored: Wed Aug 30 13:03:30 2017 +0900 Committer: hyukjinkwon Committed: Wed Aug 30 13:03:30 2017 +0900 -- .../apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e47f48c7/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala -- diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala index 22e2679..b1d07ab 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala @@ -73,7 +73,8 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String) val stagingDir: String = committer match { // For FileOutputCommitter it has its own staging path called "work path". - case f: FileOutputCommitter => Option(f.getWorkPath.toString).getOrElse(path) + case f: FileOutputCommitter => +Option(f.getWorkPath).map(_.toString).getOrElse(path) case _ => path }
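For context, the failure mode is easy to reproduce outside Spark. A minimal standalone Scala sketch (the java.io.File stand-in and the fallback path are made up for illustration, not taken from the committer code):

// `workPath` stands in for FileOutputCommitter.getWorkPath() returning null.
val workPath: java.io.File = null
val path = "/user/spark/output"            // hypothetical fallback path

// Old pattern: workPath.toString is evaluated before Option wraps anything, so it NPEs.
// Option(workPath.toString).getOrElse(path)

// Patched pattern: wrap the possibly-null reference first, then map it to its string form.
val stagingDir = Option(workPath).map(_.toString).getOrElse(path)   // yields "/user/spark/output"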
spark git commit: [SPARK-21845][SQL] Make codegen fallback of expressions configurable
Repository: spark Updated Branches: refs/heads/master fba9cc846 -> 3d0e17424 [SPARK-21845][SQL] Make codegen fallback of expressions configurable ## What changes were proposed in this pull request? We should make codegen fallback of expressions configurable. So far, it is always on. We might hide it when our codegen have compilation bugs. Thus, we should also disable the codegen fallback when running test cases. ## How was this patch tested? Added test cases Author: gatorsmileCloses #19062 from gatorsmile/fallbackCodegen. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3d0e1742 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3d0e1742 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3d0e1742 Branch: refs/heads/master Commit: 3d0e174244bc293f11dff0f11ef705ba6cd5fe3a Parents: fba9cc8 Author: gatorsmile Authored: Tue Aug 29 20:59:01 2017 -0700 Committer: gatorsmile Committed: Tue Aug 29 20:59:01 2017 -0700 -- .../org/apache/spark/sql/internal/SQLConf.scala | 6 +++--- .../org/apache/spark/sql/execution/SparkPlan.scala | 15 +-- .../spark/sql/execution/WholeStageCodegenExec.scala | 2 +- .../apache/spark/sql/DataFrameFunctionsSuite.scala | 2 +- .../scala/org/apache/spark/sql/DataFrameSuite.scala | 12 +++- .../org/apache/spark/sql/test/SharedSQLContext.scala | 2 ++ .../org/apache/spark/sql/hive/test/TestHive.scala| 1 + 7 files changed, 24 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3d0e1742/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index a685099..24f51ef 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -551,9 +551,9 @@ object SQLConf { .intConf .createWithDefault(100) - val WHOLESTAGE_FALLBACK = buildConf("spark.sql.codegen.fallback") + val CODEGEN_FALLBACK = buildConf("spark.sql.codegen.fallback") .internal() -.doc("When true, whole stage codegen could be temporary disabled for the part of query that" + +.doc("When true, (whole stage) codegen could be temporary disabled for the part of query that" + " fail to compile generated code") .booleanConf .createWithDefault(true) @@ -1041,7 +1041,7 @@ class SQLConf extends Serializable with Logging { def wholeStageMaxNumFields: Int = getConf(WHOLESTAGE_MAX_NUM_FIELDS) - def wholeStageFallback: Boolean = getConf(WHOLESTAGE_FALLBACK) + def codegenFallback: Boolean = getConf(CODEGEN_FALLBACK) def maxCaseBranchesForCodegen: Int = getConf(MAX_CASES_BRANCHES) http://git-wip-us.apache.org/repos/asf/spark/blob/3d0e1742/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index c7277c2..b1db9dd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -56,14 +56,10 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ protected def sparkContext = sqlContext.sparkContext - // sqlContext will be null when we are being deserialized on the slaves. 
In this instance - // the value of subexpressionEliminationEnabled will be set by the deserializer after the - // constructor has run. - val subexpressionEliminationEnabled: Boolean = if (sqlContext != null) { -sqlContext.conf.subexpressionEliminationEnabled - } else { -false - } + // whether we should fallback when hitting compilation errors caused by codegen + private val codeGenFallBack = sqlContext.conf.codegenFallback + + protected val subexpressionEliminationEnabled = sqlContext.conf.subexpressionEliminationEnabled /** Overridden make copy also propagates sqlContext to copied plan. */ override def makeCopy(newArgs: Array[AnyRef]): SparkPlan = { @@ -370,8 +366,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ try { GeneratePredicate.generate(expression, inputSchema) } catch { - case e @ (_: JaninoRuntimeException | _: CompileException) - if sqlContext == null || sqlContext.conf.wholeStageFallback => + case _ @ (_:
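The user-visible piece of the change is the renamed flag spark.sql.codegen.fallback. A hedged sketch of turning it off in a session so codegen compilation failures surface instead of being silently interpreted (the local-mode SparkSession here is an assumption for illustration, not from the patch):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")                       // assumed setup
  .appName("codegen-fallback-demo")
  .getOrCreate()

// With the flag off, a generated-code compilation error fails the query
// instead of quietly falling back to interpreted evaluation.
spark.conf.set("spark.sql.codegen.fallback", "false")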
spark git commit: [SPARK-21813][CORE] Modify TaskMemoryManager.MAXIMUM_PAGE_SIZE_BYTES comments
Repository: spark Updated Branches: refs/heads/master d7b1fcf8f -> fba9cc846 [SPARK-21813][CORE] Modify TaskMemoryManager.MAXIMUM_PAGE_SIZE_BYTES comments ## What changes were proposed in this pull request? The comment on the variable "TaskMemoryManager.MAXIMUM_PAGE_SIZE_BYTES" is wrong: the limit should be 2^31-1, the maximum value of an int, not 2^32-1. ## How was this patch tested? Existing test cases Author: he.qiao Closes #19025 from Geek-He/08_23_comments. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fba9cc84 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fba9cc84 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fba9cc84 Branch: refs/heads/master Commit: fba9cc8466dccdcd1f6f372ea7962e7ae9e09be1 Parents: d7b1fcf Author: he.qiao Authored: Tue Aug 29 23:44:27 2017 +0100 Committer: Sean Owen Committed: Tue Aug 29 23:44:27 2017 +0100 -- .../main/java/org/apache/spark/memory/TaskMemoryManager.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fba9cc84/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java -- diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java index 761ba9d..0f1e902 100644 --- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java +++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java @@ -53,8 +53,8 @@ import org.apache.spark.util.Utils; * retrieve the base object. * * This allows us to address 8192 pages. In on-heap mode, the maximum page size is limited by the - * maximum size of a long[] array, allowing us to address 8192 * 2^32 * 8 bytes, which is - * approximately 35 terabytes of memory. + * maximum size of a long[] array, allowing us to address 8192 * (2^31 - 1) * 8 bytes, which is + * approximately 140 terabytes of memory. */ public class TaskMemoryManager { @@ -74,7 +74,7 @@ public class TaskMemoryManager { * Maximum supported data page size (in bytes). In principle, the maximum addressable page size is * (1L << OFFSET_BITS) bytes, which is 2+ petabytes. However, the on-heap allocator's * maximum page size is limited by the maximum amount of data that can be stored in a long[] - * array, which is (2^32 - 1) * 8 bytes (or 16 gigabytes). Therefore, we cap this at 16 gigabytes. + * array, which is (2^31 - 1) * 8 bytes (or about 17 gigabytes). Therefore, we cap this at 17 gigabytes. */ public static final long MAXIMUM_PAGE_SIZE_BYTES = ((1L << 31) - 1) * 8L;
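The corrected figures are straightforward to verify; a small standalone check (plain Scala, not part of the patch):

val maxLongArrayLen = (1L << 31) - 1          // 2^31 - 1, the largest possible long[] length
val maxPageBytes    = maxLongArrayLen * 8L    // 17,179,869,176 bytes, i.e. about 17 GB per page
val maxAddressable  = 8192L * maxPageBytes    // 140,737,488,289,792 bytes, i.e. about 140 TB total
println(f"page: $maxPageBytes%,d bytes, total: $maxAddressable%,d bytes")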
spark git commit: [SPARK-21728][CORE] Allow SparkSubmit to use Logging.
Repository: spark Updated Branches: refs/heads/master 840ba053b -> d7b1fcf8f [SPARK-21728][CORE] Allow SparkSubmit to use Logging. This change initializes logging when SparkSubmit runs, using a configuration that should avoid printing log messages as much as possible with most configurations, and adds code to restore the Spark logging system to as close as possible to its initial state, so the Spark app being run can re-initialize logging with its own configuration. With that feature, some duplicate code in SparkSubmit can now be replaced with the existing methods in the Utils class, which could not be used before because they initialized logging. As part of that I also did some minor refactoring, moving methods that should really belong in DependencyUtils. The change also shuffles some code in SparkHadoopUtil so that SparkSubmit can create a Hadoop config like the rest of Spark code, respecting the user's Spark configuration. The behavior was verified running spark-shell, pyspark and normal applications, then verifying the logging behavior, with and without dependency downloads. Author: Marcelo VanzinCloses #19013 from vanzin/SPARK-21728. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d7b1fcf8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d7b1fcf8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d7b1fcf8 Branch: refs/heads/master Commit: d7b1fcf8f0a267322af0592b2cb31f1c8970fb16 Parents: 840ba05 Author: Marcelo Vanzin Authored: Tue Aug 29 14:42:24 2017 -0700 Committer: Marcelo Vanzin Committed: Tue Aug 29 14:42:24 2017 -0700 -- .../apache/spark/deploy/DependencyUtils.scala | 112 .../apache/spark/deploy/SparkHadoopUtil.scala | 79 .../org/apache/spark/deploy/SparkSubmit.scala | 179 --- .../spark/deploy/worker/DriverWrapper.scala | 9 +- .../org/apache/spark/internal/Logging.scala | 61 +-- .../scala/org/apache/spark/util/Utils.scala | 10 +- .../apache/spark/deploy/SparkSubmitSuite.scala | 60 +-- 7 files changed, 263 insertions(+), 247 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d7b1fcf8/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala b/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala index 97f3803..db92a8f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala @@ -18,15 +18,15 @@ package org.apache.spark.deploy import java.io.File -import java.nio.file.Files import scala.collection.mutable.HashMap -import org.apache.commons.io.FileUtils import org.apache.commons.lang3.StringUtils import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.spark.util.MutableURLClassLoader +import org.apache.spark.{SecurityManager, SparkConf} +import org.apache.spark.util.{MutableURLClassLoader, Utils} private[deploy] object DependencyUtils { @@ -51,41 +51,22 @@ private[deploy] object DependencyUtils { SparkSubmitUtils.resolveMavenCoordinates(packages, ivySettings, exclusions = exclusions) } - def createTempDir(): File = { -val targetDir = Files.createTempDirectory("tmp").toFile -// scalastyle:off runtimeaddshutdownhook -Runtime.getRuntime.addShutdownHook(new Thread() { - override def run(): Unit = { -FileUtils.deleteQuietly(targetDir) - } -}) -// scalastyle:on runtimeaddshutdownhook -targetDir - } - - def resolveAndDownloadJars(jars: String, 
userJar: String): String = { -val targetDir = DependencyUtils.createTempDir() -val hadoopConf = new Configuration() -val sparkProperties = new HashMap[String, String]() -val securityProperties = List("spark.ssl.fs.trustStore", "spark.ssl.trustStore", - "spark.ssl.fs.trustStorePassword", "spark.ssl.trustStorePassword", - "spark.ssl.fs.protocol", "spark.ssl.protocol") - -securityProperties.foreach { pName => - sys.props.get(pName).foreach { pValue => -sparkProperties.put(pName, pValue) - } -} - + def resolveAndDownloadJars( + jars: String, + userJar: String, + sparkConf: SparkConf, + hadoopConf: Configuration, + secMgr: SecurityManager): String = { +val targetDir = Utils.createTempDir() Option(jars) .map { -SparkSubmit.resolveGlobPaths(_, hadoopConf) +resolveGlobPaths(_, hadoopConf) .split(",") .filterNot(_.contains(userJar.split("/").last))
spark git commit: [MINOR][ML] Document treatment of instance weights in logreg summary
Repository: spark Updated Branches: refs/heads/master 6077e3ef3 -> 840ba053b [MINOR][ML] Document treatment of instance weights in logreg summary ## What changes were proposed in this pull request? Add Scaladoc noting that instance weights are currently ignored in the logistic regression summary traits. Author: Joseph K. Bradley Closes #19071 from jkbradley/lr-summary-minor. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/840ba053 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/840ba053 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/840ba053 Branch: refs/heads/master Commit: 840ba053b982362dfe84c6faa59d2237994d591c Parents: 6077e3e Author: Joseph K. Bradley Authored: Tue Aug 29 13:01:37 2017 -0700 Committer: Joseph K. Bradley Committed: Tue Aug 29 13:01:37 2017 -0700 -- .../org/apache/spark/ml/classification/LogisticRegression.scala | 4 1 file changed, 4 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/840ba053/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index ffe4b52..1869d51 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -1356,6 +1356,8 @@ private[ml] class MultiClassSummarizer extends Serializable { /** * :: Experimental :: * Abstraction for logistic regression results for a given model. + * + * Currently, the summary ignores the instance weights. */ @Experimental sealed trait LogisticRegressionSummary extends Serializable { @@ -1495,6 +1497,8 @@ sealed trait LogisticRegressionTrainingSummary extends LogisticRegressionSummary /** * :: Experimental :: * Abstraction for binary logistic regression results for a given model. + * + * Currently, the summary ignores the instance weights. */ @Experimental sealed trait BinaryLogisticRegressionSummary extends LogisticRegressionSummary {
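A hedged sketch of where the new Scaladoc note matters in practice: a model fitted with a weight column still reports unweighted summary metrics (the column names and the training DataFrame below are assumptions for illustration):

import org.apache.spark.ml.classification.LogisticRegression

val lr = new LogisticRegression()
  .setLabelCol("label")
  .setFeaturesCol("features")
  .setWeightCol("weight")          // the weights influence fitting...

// val model = lr.fit(trainingDf)  // trainingDf is an assumed DataFrame of (label, features, weight)
// model.summary                   // ...but the summary's metrics ignore them, per the note above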
spark git commit: Revert "[SPARK-21714][CORE][BACKPORT-2.2] Avoiding re-uploading remote resources in yarn client mode"
Repository: spark Updated Branches: refs/heads/branch-2.2 59529b21a -> 917fe6635 Revert "[SPARK-21714][CORE][BACKPORT-2.2] Avoiding re-uploading remote resources in yarn client mode" This reverts commit 59529b21a99f3c4db16b31da9dc7fce62349cf11. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/917fe663 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/917fe663 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/917fe663 Branch: refs/heads/branch-2.2 Commit: 917fe6635891ea76b22a3bcba282040afd14651d Parents: 59529b2 Author: Marcelo VanzinAuthored: Tue Aug 29 12:51:27 2017 -0700 Committer: Marcelo Vanzin Committed: Tue Aug 29 12:51:27 2017 -0700 -- .../org/apache/spark/deploy/SparkSubmit.scala | 66 +++ .../apache/spark/internal/config/package.scala | 2 +- .../scala/org/apache/spark/util/Utils.scala | 25 +++ .../apache/spark/deploy/SparkSubmitSuite.scala | 68 .../main/scala/org/apache/spark/repl/Main.scala | 2 +- 5 files changed, 48 insertions(+), 115 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/917fe663/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 86d578e..c60a2a1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -208,20 +208,14 @@ object SparkSubmit extends CommandLineUtils { /** * Prepare the environment for submitting an application. - * - * @param args the parsed SparkSubmitArguments used for environment preparation. - * @param conf the Hadoop Configuration, this argument will only be set in unit test. - * @return a 4-tuple: - *(1) the arguments for the child process, - *(2) a list of classpath entries for the child, - *(3) a map of system properties, and - *(4) the main class for the child - * + * This returns a 4-tuple: + * (1) the arguments for the child process, + * (2) a list of classpath entries for the child, + * (3) a map of system properties, and + * (4) the main class for the child * Exposed for testing. */ - private[deploy] def prepareSubmitEnvironment( - args: SparkSubmitArguments, - conf: Option[HadoopConfiguration] = None) + private[deploy] def prepareSubmitEnvironment(args: SparkSubmitArguments) : (Seq[String], Seq[String], Map[String, String], String) = { // Return values val childArgs = new ArrayBuffer[String]() @@ -317,16 +311,12 @@ object SparkSubmit extends CommandLineUtils { } // In client mode, download remote files. 
-var localPrimaryResource: String = null -var localJars: String = null -var localPyFiles: String = null -var localFiles: String = null if (deployMode == CLIENT) { - val hadoopConf = conf.getOrElse(new HadoopConfiguration()) - localPrimaryResource = Option(args.primaryResource).map(downloadFile(_, hadoopConf)).orNull - localJars = Option(args.jars).map(downloadFileList(_, hadoopConf)).orNull - localPyFiles = Option(args.pyFiles).map(downloadFileList(_, hadoopConf)).orNull - localFiles = Option(args.files).map(downloadFileList(_, hadoopConf)).orNull + val hadoopConf = new HadoopConfiguration() + args.primaryResource = Option(args.primaryResource).map(downloadFile(_, hadoopConf)).orNull + args.jars = Option(args.jars).map(downloadFileList(_, hadoopConf)).orNull + args.pyFiles = Option(args.pyFiles).map(downloadFileList(_, hadoopConf)).orNull + args.files = Option(args.files).map(downloadFileList(_, hadoopConf)).orNull } // Require all python files to be local, so we can add them to the PYTHONPATH @@ -376,7 +366,7 @@ object SparkSubmit extends CommandLineUtils { // If a python file is provided, add it to the child arguments and list of files to deploy. // Usage: PythonAppRunner [app arguments] args.mainClass = "org.apache.spark.deploy.PythonRunner" -args.childArgs = ArrayBuffer(localPrimaryResource, localPyFiles) ++ args.childArgs +args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs if (clusterManager != YARN) { // The YARN backend distributes the primary file differently, so don't merge it. args.files = mergeFileLists(args.files, args.primaryResource) @@ -386,8 +376,8 @@ object SparkSubmit extends CommandLineUtils { // The YARN backend handles python files differently, so
spark git commit: [SPARK-21714][CORE][BACKPORT-2.2] Avoiding re-uploading remote resources in yarn client mode
Repository: spark Updated Branches: refs/heads/branch-2.2 59bb7ebfb -> 59529b21a [SPARK-21714][CORE][BACKPORT-2.2] Avoiding re-uploading remote resources in yarn client mode ## What changes were proposed in this pull request? This is a backport PR to fix issue of re-uploading remote resource in yarn client mode. The original PR is #18962. ## How was this patch tested? Tested in local UT. Author: jerryshaoCloses #19074 from jerryshao/SPARK-21714-2.2-backport. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/59529b21 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/59529b21 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/59529b21 Branch: refs/heads/branch-2.2 Commit: 59529b21a99f3c4db16b31da9dc7fce62349cf11 Parents: 59bb7eb Author: jerryshao Authored: Tue Aug 29 10:50:03 2017 -0700 Committer: Marcelo Vanzin Committed: Tue Aug 29 10:50:03 2017 -0700 -- .../org/apache/spark/deploy/SparkSubmit.scala | 66 --- .../apache/spark/internal/config/package.scala | 2 +- .../scala/org/apache/spark/util/Utils.scala | 25 --- .../apache/spark/deploy/SparkSubmitSuite.scala | 68 .../main/scala/org/apache/spark/repl/Main.scala | 2 +- 5 files changed, 115 insertions(+), 48 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/59529b21/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index c60a2a1..86d578e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -208,14 +208,20 @@ object SparkSubmit extends CommandLineUtils { /** * Prepare the environment for submitting an application. - * This returns a 4-tuple: - * (1) the arguments for the child process, - * (2) a list of classpath entries for the child, - * (3) a map of system properties, and - * (4) the main class for the child + * + * @param args the parsed SparkSubmitArguments used for environment preparation. + * @param conf the Hadoop Configuration, this argument will only be set in unit test. + * @return a 4-tuple: + *(1) the arguments for the child process, + *(2) a list of classpath entries for the child, + *(3) a map of system properties, and + *(4) the main class for the child + * * Exposed for testing. */ - private[deploy] def prepareSubmitEnvironment(args: SparkSubmitArguments) + private[deploy] def prepareSubmitEnvironment( + args: SparkSubmitArguments, + conf: Option[HadoopConfiguration] = None) : (Seq[String], Seq[String], Map[String, String], String) = { // Return values val childArgs = new ArrayBuffer[String]() @@ -311,12 +317,16 @@ object SparkSubmit extends CommandLineUtils { } // In client mode, download remote files. 
+var localPrimaryResource: String = null +var localJars: String = null +var localPyFiles: String = null +var localFiles: String = null if (deployMode == CLIENT) { - val hadoopConf = new HadoopConfiguration() - args.primaryResource = Option(args.primaryResource).map(downloadFile(_, hadoopConf)).orNull - args.jars = Option(args.jars).map(downloadFileList(_, hadoopConf)).orNull - args.pyFiles = Option(args.pyFiles).map(downloadFileList(_, hadoopConf)).orNull - args.files = Option(args.files).map(downloadFileList(_, hadoopConf)).orNull + val hadoopConf = conf.getOrElse(new HadoopConfiguration()) + localPrimaryResource = Option(args.primaryResource).map(downloadFile(_, hadoopConf)).orNull + localJars = Option(args.jars).map(downloadFileList(_, hadoopConf)).orNull + localPyFiles = Option(args.pyFiles).map(downloadFileList(_, hadoopConf)).orNull + localFiles = Option(args.files).map(downloadFileList(_, hadoopConf)).orNull } // Require all python files to be local, so we can add them to the PYTHONPATH @@ -366,7 +376,7 @@ object SparkSubmit extends CommandLineUtils { // If a python file is provided, add it to the child arguments and list of files to deploy. // Usage: PythonAppRunner [app arguments] args.mainClass = "org.apache.spark.deploy.PythonRunner" -args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs +args.childArgs = ArrayBuffer(localPrimaryResource, localPyFiles) ++ args.childArgs if (clusterManager != YARN) { // The YARN backend distributes the
spark git commit: [SPARK-21801][SPARKR][TEST] unit test randomly fail with randomforest
Repository: spark Updated Branches: refs/heads/master 6327ea570 -> 6077e3ef3 [SPARK-21801][SPARKR][TEST] unit test randomly fail with randomforest ## What changes were proposed in this pull request? fix the random seed to eliminate variability ## How was this patch tested? jenkins, appveyor, lots more jenkins Author: Felix CheungCloses #19018 from felixcheung/rrftest. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6077e3ef Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6077e3ef Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6077e3ef Branch: refs/heads/master Commit: 6077e3ef3cb39fab061cbf6a357423da030c47c4 Parents: 6327ea5 Author: Felix Cheung Authored: Tue Aug 29 10:09:41 2017 -0700 Committer: Felix Cheung Committed: Tue Aug 29 10:09:41 2017 -0700 -- R/pkg/tests/fulltests/test_mllib_tree.R | 36 +++- 1 file changed, 19 insertions(+), 17 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6077e3ef/R/pkg/tests/fulltests/test_mllib_tree.R -- diff --git a/R/pkg/tests/fulltests/test_mllib_tree.R b/R/pkg/tests/fulltests/test_mllib_tree.R index 799f944..facd3a9 100644 --- a/R/pkg/tests/fulltests/test_mllib_tree.R +++ b/R/pkg/tests/fulltests/test_mllib_tree.R @@ -66,7 +66,7 @@ test_that("spark.gbt", { # label must be binary - GBTClassifier currently only supports binary classification. iris2 <- iris[iris$Species != "virginica", ] data <- suppressWarnings(createDataFrame(iris2)) - model <- spark.gbt(data, Species ~ Petal_Length + Petal_Width, "classification") + model <- spark.gbt(data, Species ~ Petal_Length + Petal_Width, "classification", seed = 12) stats <- summary(model) expect_equal(stats$numFeatures, 2) expect_equal(stats$numTrees, 20) @@ -94,7 +94,7 @@ test_that("spark.gbt", { iris2$NumericSpecies <- ifelse(iris2$Species == "setosa", 0, 1) df <- suppressWarnings(createDataFrame(iris2)) - m <- spark.gbt(df, NumericSpecies ~ ., type = "classification") + m <- spark.gbt(df, NumericSpecies ~ ., type = "classification", seed = 12) s <- summary(m) # test numeric prediction values expect_equal(iris2$NumericSpecies, as.double(collect(predict(m, df))$prediction)) @@ -106,7 +106,7 @@ test_that("spark.gbt", { if (windows_with_hadoop()) { data <- read.df(absoluteSparkPath("data/mllib/sample_binary_classification_data.txt"), source = "libsvm") -model <- spark.gbt(data, label ~ features, "classification") +model <- spark.gbt(data, label ~ features, "classification", seed = 12) expect_equal(summary(model)$numFeatures, 692) } @@ -117,10 +117,11 @@ test_that("spark.gbt", { trainidxs <- base::sample(nrow(data), nrow(data) * 0.7) traindf <- as.DataFrame(data[trainidxs, ]) testdf <- as.DataFrame(rbind(data[-trainidxs, ], c(0, "the other"))) - model <- spark.gbt(traindf, clicked ~ ., type = "classification") + model <- spark.gbt(traindf, clicked ~ ., type = "classification", seed = 23) predictions <- predict(model, testdf) expect_error(collect(predictions)) - model <- spark.gbt(traindf, clicked ~ ., type = "classification", handleInvalid = "keep") + model <- spark.gbt(traindf, clicked ~ ., type = "classification", handleInvalid = "keep", +seed = 23) predictions <- predict(model, testdf) expect_equal(class(collect(predictions)$clicked[1]), "character") }) @@ -129,7 +130,7 @@ test_that("spark.randomForest", { # regression data <- suppressWarnings(createDataFrame(longley)) model <- spark.randomForest(data, Employed ~ ., "regression", maxDepth = 5, maxBins = 16, - numTrees = 1) + numTrees = 1, seed = 1) 
predictions <- collect(predict(model, data)) expect_equal(predictions$prediction, c(60.323, 61.122, 60.171, 61.187, @@ -177,7 +178,7 @@ test_that("spark.randomForest", { # classification data <- suppressWarnings(createDataFrame(iris)) model <- spark.randomForest(data, Species ~ Petal_Length + Petal_Width, "classification", - maxDepth = 5, maxBins = 16) + maxDepth = 5, maxBins = 16, seed = 123) stats <- summary(model) expect_equal(stats$numFeatures, 2) @@ -215,7 +216,7 @@ test_that("spark.randomForest", { iris$NumericSpecies <- lapply(iris$Species, labelToIndex) data <- suppressWarnings(createDataFrame(iris[-5])) model <- spark.randomForest(data, NumericSpecies ~ Petal_Length + Petal_Width, "classification", - maxDepth = 5, maxBins = 16) + maxDepth = 5,
spark git commit: [SPARK-21255][SQL] simplify encoder for java enum
Repository: spark Updated Branches: refs/heads/master 8fcbda9c9 -> 6327ea570 [SPARK-21255][SQL] simplify encoder for java enum ## What changes were proposed in this pull request? This is a follow-up for https://github.com/apache/spark/pull/18488, to simplify the code. The major change is, we should map java enum to string type, instead of a struct type with a single string field. ## How was this patch tested? existing tests Author: Wenchen FanCloses #19066 from cloud-fan/fix. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6327ea57 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6327ea57 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6327ea57 Branch: refs/heads/master Commit: 6327ea570bf542983081c5d1d3ee7e6123365c8f Parents: 8fcbda9 Author: Wenchen Fan Authored: Tue Aug 29 09:15:59 2017 -0700 Committer: gatorsmile Committed: Tue Aug 29 09:15:59 2017 -0700 -- .../spark/sql/catalyst/JavaTypeInference.scala | 46 ++-- .../catalyst/encoders/ExpressionEncoder.scala | 14 +- .../catalyst/expressions/objects/objects.scala | 4 +- .../org/apache/spark/sql/JavaDatasetSuite.java | 24 -- 4 files changed, 25 insertions(+), 63 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6327ea57/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala index 33f6ce0..3ecc137 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala @@ -32,7 +32,6 @@ import org.apache.spark.sql.catalyst.expressions.objects._ import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData} import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String -import org.apache.spark.util.Utils /** * Type-inference utilities for POJOs and Java collections. 
@@ -120,8 +119,7 @@ object JavaTypeInference { (MapType(keyDataType, valueDataType, nullable), true) case other if other.isEnum => -(StructType(Seq(StructField(typeToken.getRawType.getSimpleName, - StringType, nullable = false))), true) +(StringType, true) case other => if (seenTypeSet.contains(other)) { @@ -310,9 +308,12 @@ object JavaTypeInference { returnNullable = false) case other if other.isEnum => -StaticInvoke(JavaTypeInference.getClass, ObjectType(other), "deserializeEnumName", - expressions.Literal.create(other.getEnumConstants.apply(0), ObjectType(other)) -:: getPath :: Nil) +StaticInvoke( + other, + ObjectType(other), + "valueOf", + Invoke(getPath, "toString", ObjectType(classOf[String]), returnNullable = false) :: Nil, + returnNullable = false) case other => val properties = getJavaBeanReadableAndWritableProperties(other) @@ -356,30 +357,6 @@ object JavaTypeInference { } } - /** Returns a mapping from enum value to int for given enum type */ - def enumSerializer[T <: Enum[T]](enum: Class[T]): T => UTF8String = { -assert(enum.isEnum) -inputObject: T => - UTF8String.fromString(inputObject.name()) - } - - /** Returns value index for given enum type and value */ - def serializeEnumName[T <: Enum[T]](enum: UTF8String, inputObject: T): UTF8String = { - enumSerializer(Utils.classForName(enum.toString).asInstanceOf[Class[T]])(inputObject) - } - - /** Returns a mapping from int to enum value for given enum type */ - def enumDeserializer[T <: Enum[T]](enum: Class[T]): InternalRow => T = { -assert(enum.isEnum) -value: InternalRow => - Enum.valueOf(enum, value.getUTF8String(0).toString) - } - - /** Returns enum value for given enum type and value index */ - def deserializeEnumName[T <: Enum[T]](typeDummy: T, inputObject: InternalRow): T = { -enumDeserializer(typeDummy.getClass.asInstanceOf[Class[T]])(inputObject) - } - private def serializerFor(inputObject: Expression, typeToken: TypeToken[_]): Expression = { def toCatalystArray(input: Expression, elementType: TypeToken[_]): Expression = { @@ -465,9 +442,12 @@ object JavaTypeInference { ) case other if other.isEnum => - CreateNamedStruct(expressions.Literal("enum") :: - StaticInvoke(JavaTypeInference.getClass, StringType, "serializeEnumName", -
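A hedged sketch of the user-visible effect of the simplification: a bean field typed as a Java enum should now infer as a plain string column rather than a single-field struct (the bean class and the java.time.DayOfWeek enum are illustrative, not from the PR):

import org.apache.spark.sql.Encoders
import scala.beans.BeanProperty

class MeetingBean extends Serializable {
  @BeanProperty var day: java.time.DayOfWeek = _   // a standard Java enum
  @BeanProperty var room: String = _
}

// After this change the inferred schema is expected to show `day` as StringType.
println(Encoders.bean(classOf[MeetingBean]).schema.treeString)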
spark git commit: [SPARK-21848][SQL] Add trait UserDefinedExpression to identify user-defined functions
Repository: spark Updated Branches: refs/heads/master 32fa0b814 -> 8fcbda9c9 [SPARK-21848][SQL] Add trait UserDefinedExpression to identify user-defined functions ## What changes were proposed in this pull request? Add trait UserDefinedExpression to identify user-defined functions. UDF can be expensive. In optimizer we may need to avoid executing UDF multiple times. E.g. ```scala table.select(UDF as 'a).select('a, ('a + 1) as 'b) ``` If UDF is expensive in this case, optimizer should not collapse the project to ```scala table.select(UDF as 'a, (UDF+1) as 'b) ``` Currently UDF classes like PythonUDF, HiveGenericUDF are not defined in catalyst. This PR is to add a new trait to make it easier to identify user-defined functions. ## How was this patch tested? Unit test Author: Wang GengliangCloses #19064 from gengliangwang/UDFType. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8fcbda9c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8fcbda9c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8fcbda9c Branch: refs/heads/master Commit: 8fcbda9c93175c0d44b0e4deaf10df1a427e03ea Parents: 32fa0b8 Author: Wang Gengliang Authored: Tue Aug 29 09:08:59 2017 -0700 Committer: gatorsmile Committed: Tue Aug 29 09:08:59 2017 -0700 -- .../sql/catalyst/expressions/Expression.scala | 6 ++ .../spark/sql/catalyst/expressions/ScalaUDF.scala | 2 +- .../spark/sql/execution/aggregate/udaf.scala | 6 +- .../spark/sql/execution/python/PythonUDF.scala| 4 ++-- .../org/apache/spark/sql/hive/hiveUDFs.scala | 18 ++ 5 files changed, 28 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8fcbda9c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala index 74c4cdd..c058425 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala @@ -635,3 +635,9 @@ abstract class TernaryExpression extends Expression { } } } + +/** + * Common base trait for user-defined functions, including UDF/UDAF/UDTF of different languages + * and Hive function wrappers. 
+ */ +trait UserDefinedExpression http://git-wip-us.apache.org/repos/asf/spark/blob/8fcbda9c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala index 9df0e2e..527f167 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala @@ -47,7 +47,7 @@ case class ScalaUDF( udfName: Option[String] = None, nullable: Boolean = true, udfDeterministic: Boolean = true) - extends Expression with ImplicitCastInputTypes with NonSQLExpression { + extends Expression with ImplicitCastInputTypes with NonSQLExpression with UserDefinedExpression { override def deterministic: Boolean = udfDeterministic && children.forall(_.deterministic) http://git-wip-us.apache.org/repos/asf/spark/blob/8fcbda9c/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala index ae5e2c6..fec1add 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala @@ -324,7 +324,11 @@ case class ScalaUDAF( udaf: UserDefinedAggregateFunction, mutableAggBufferOffset: Int = 0, inputAggBufferOffset: Int = 0) - extends ImperativeAggregate with NonSQLExpression with Logging with ImplicitCastInputTypes { + extends ImperativeAggregate + with NonSQLExpression + with Logging + with ImplicitCastInputTypes + with UserDefinedExpression { override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): ImperativeAggregate = copy(mutableAggBufferOffset = newMutableAggBufferOffset)
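A hedged sketch (not from the PR) of the kind of check the marker trait enables inside an optimizer rule, using the existing TreeNode.find traversal:

import org.apache.spark.sql.catalyst.expressions.{Expression, UserDefinedExpression}

// True if any node in the expression tree is a user-defined function of some flavor.
def containsUserDefinedFunction(expr: Expression): Boolean =
  expr.find(_.isInstanceOf[UserDefinedExpression]).isDefined

// A rule such as CollapseProject could skip merging adjacent projects when
// projectList.exists(containsUserDefinedFunction), avoiding duplicated evaluation of expensive UDFs.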
spark-website git commit: Update committer page
Repository: spark-website Updated Branches: refs/heads/asf-site 8f64443a4 -> 434db70b4 Update committer page Project: http://git-wip-us.apache.org/repos/asf/spark-website/repo Commit: http://git-wip-us.apache.org/repos/asf/spark-website/commit/434db70b Tree: http://git-wip-us.apache.org/repos/asf/spark-website/tree/434db70b Diff: http://git-wip-us.apache.org/repos/asf/spark-website/diff/434db70b Branch: refs/heads/asf-site Commit: 434db70b4d0392b6e3c87e55640752ffde39544c Parents: 8f64443 Author: jerryshao Authored: Tue Aug 29 16:31:38 2017 +0800 Committer: jerryshao Committed: Tue Aug 29 21:26:54 2017 +0800 -- committers.md| 1 + site/committers.html | 4 2 files changed, 5 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark-website/blob/434db70b/committers.md -- diff --git a/committers.md b/committers.md index 040d419..54aac4d 100644 --- a/committers.md +++ b/committers.md @@ -50,6 +50,7 @@ navigation: |Josh Rosen|Databricks| |Sandy Ryza|Remix| |Kousuke Saruta|NTT Data| +|Saisai Shao|Hortonworks| |Prashant Sharma|IBM| |Ram Sriharsha|Databricks| |DB Tsai|Netflix| http://git-wip-us.apache.org/repos/asf/spark-website/blob/434db70b/site/committers.html -- diff --git a/site/committers.html b/site/committers.html index 4ca12ce..770487c 100644 --- a/site/committers.html +++ b/site/committers.html @@ -365,6 +365,10 @@ NTT Data + Saisai Shao + Hortonworks + + Prashant Sharma IBM
spark-website git commit: Update committer page
Repository: spark-website Updated Branches: refs/remotes/apache/asf-site [created] 1895d5cb0 Update committer page Project: http://git-wip-us.apache.org/repos/asf/spark-website/repo Commit: http://git-wip-us.apache.org/repos/asf/spark-website/commit/1895d5cb Tree: http://git-wip-us.apache.org/repos/asf/spark-website/tree/1895d5cb Diff: http://git-wip-us.apache.org/repos/asf/spark-website/diff/1895d5cb Branch: refs/remotes/apache/asf-site Commit: 1895d5cb0cf79a507e4f14f626de585aa7b2534b Parents: 35eb147 Author: jerryshao Authored: Tue Aug 29 16:31:38 2017 +0800 Committer: jerryshao Committed: Tue Aug 29 16:57:48 2017 +0800 -- committers.md| 1 + site/committers.html | 4 2 files changed, 5 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark-website/blob/1895d5cb/committers.md -- diff --git a/committers.md b/committers.md index 040d419..54aac4d 100644 --- a/committers.md +++ b/committers.md @@ -50,6 +50,7 @@ navigation: |Josh Rosen|Databricks| |Sandy Ryza|Remix| |Kousuke Saruta|NTT Data| +|Saisai Shao|Hortonworks| |Prashant Sharma|IBM| |Ram Sriharsha|Databricks| |DB Tsai|Netflix| http://git-wip-us.apache.org/repos/asf/spark-website/blob/1895d5cb/site/committers.html -- diff --git a/site/committers.html b/site/committers.html index 4ca12ce..770487c 100644 --- a/site/committers.html +++ b/site/committers.html @@ -365,6 +365,10 @@ NTT Data + Saisai Shao + Hortonworks + + Prashant Sharma IBM
spark git commit: [SPARK-21781][SQL] Modify DataSourceScanExec to use concrete ColumnVector type.
Repository: spark Updated Branches: refs/heads/master c7270a46f -> 32fa0b814 [SPARK-21781][SQL] Modify DataSourceScanExec to use concrete ColumnVector type. ## What changes were proposed in this pull request? As mentioned at https://github.com/apache/spark/pull/18680#issuecomment-316820409, when we have more `ColumnVector` implementations, it might (or might not) have huge performance implications because it might disable inlining, or force virtual dispatches. As for read path, one of the major paths is the one generated by `ColumnBatchScan`. Currently it refers `ColumnVector` so the penalty will be bigger as we have more classes, but we can know the concrete type from its usage, e.g. vectorized Parquet reader uses `OnHeapColumnVector`. We can use the concrete type in the generated code directly to avoid the penalty. ## How was this patch tested? Existing tests. Author: Takuya UESHINCloses #18989 from ueshin/issues/SPARK-21781. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/32fa0b81 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/32fa0b81 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/32fa0b81 Branch: refs/heads/master Commit: 32fa0b81411f781173e185f4b19b9fd6d118f9fe Parents: c7270a4 Author: Takuya UESHIN Authored: Tue Aug 29 20:16:45 2017 +0800 Committer: Wenchen Fan Committed: Tue Aug 29 20:16:45 2017 +0800 -- .../spark/sql/execution/ColumnarBatchScan.scala | 14 +- .../spark/sql/execution/DataSourceScanExec.scala | 5 + .../spark/sql/execution/datasources/FileFormat.scala | 10 ++ .../datasources/parquet/ParquetFileFormat.scala | 8 4 files changed, 32 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/32fa0b81/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala index 74a47da..1afe83e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala @@ -33,6 +33,8 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport { val inMemoryTableScan: InMemoryTableScanExec = null + def vectorTypes: Option[Seq[String]] = None + override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time")) @@ -79,17 +81,19 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport { val scanTimeTotalNs = ctx.freshName("scanTime") ctx.addMutableState("long", scanTimeTotalNs, s"$scanTimeTotalNs = 0;") -val columnarBatchClz = "org.apache.spark.sql.execution.vectorized.ColumnarBatch" +val columnarBatchClz = classOf[ColumnarBatch].getName val batch = ctx.freshName("batch") ctx.addMutableState(columnarBatchClz, batch, s"$batch = null;") -val columnVectorClz = "org.apache.spark.sql.execution.vectorized.ColumnVector" val idx = ctx.freshName("batchIdx") ctx.addMutableState("int", idx, s"$idx = 0;") val colVars = output.indices.map(i => ctx.freshName("colInstance" + i)) -val columnAssigns = colVars.zipWithIndex.map { case (name, i) => - ctx.addMutableState(columnVectorClz, name, s"$name = null;") - s"$name = $batch.column($i);" +val columnVectorClzs = vectorTypes.getOrElse( + Seq.fill(colVars.size)(classOf[ColumnVector].getName)) +val columnAssigns = 
colVars.zip(columnVectorClzs).zipWithIndex.map { + case ((name, columnVectorClz), i) => +ctx.addMutableState(columnVectorClz, name, s"$name = null;") +s"$name = ($columnVectorClz) $batch.column($i);" } val nextBatch = ctx.freshName("nextBatch") http://git-wip-us.apache.org/repos/asf/spark/blob/32fa0b81/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 588c937..77e6dbf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -174,6 +174,11 @@ case class FileSourceScanExec( false } + override def vectorTypes: Option[Seq[String]]
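A hedged sketch of the idea behind vectorTypes: a scan whose reader only ever produces OnHeapColumnVector can advertise that concrete class name, so the generated code casts to it directly and the JVM can devirtualize the column accessors (the helper signature below is illustrative, not the FileFormat API):

import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector

// Class names rather than classes, because they are spliced into generated Java source.
def vectorTypesFor(numColumns: Int): Option[Seq[String]] =
  Option(Seq.fill(numColumns)(classOf[OnHeapColumnVector].getName))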
spark-website git commit: identify SPIP jira tickets by summary as well as label
Repository: spark-website Updated Branches: refs/heads/asf-site 35eb14717 -> 8f64443a4 identify SPIP jira tickets by summary as well as label Project: http://git-wip-us.apache.org/repos/asf/spark-website/repo Commit: http://git-wip-us.apache.org/repos/asf/spark-website/commit/8f64443a Tree: http://git-wip-us.apache.org/repos/asf/spark-website/tree/8f64443a Diff: http://git-wip-us.apache.org/repos/asf/spark-website/diff/8f64443a Branch: refs/heads/asf-site Commit: 8f64443a44df71849856f740ad41ed88701008ba Parents: 35eb147 Author: cody koeninger Authored: Mon Aug 28 15:07:49 2017 -0500 Committer: Sean Owen Committed: Tue Aug 29 10:58:28 2017 +0100 -- improvement-proposals.md| 4 ++-- site/improvement-proposals.html | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark-website/blob/8f64443a/improvement-proposals.md -- diff --git a/improvement-proposals.md b/improvement-proposals.md index dca7918..8fab696 100644 --- a/improvement-proposals.md +++ b/improvement-proposals.md @@ -23,9 +23,9 @@ An SPIP: - Follows the template defined below - Includes discussions on the JIRA ticket and dev@ list about the proposal -https://issues.apache.org/jira/issues/?jql=project%20%3D%20SPARK%20AND%20status%20in%20(Open%2C%20Reopened%2C%20%22In%20Progress%22)%20AND%20labels%20%3D%20SPIP%20ORDER%20BY%20createdDate%20DESC">Current SPIPs +https://issues.apache.org/jira/issues/?jql=project%20%3D%20SPARK%20AND%20status%20in%20(Open%2C%20Reopened%2C%20%22In%20Progress%22)%20AND%20(labels%20%3D%20SPIP%20OR%20summary%20~%20%22SPIP%22)%20ORDER%20BY%20createdDate%20DESC">Current SPIPs -https://issues.apache.org/jira/issues/?jql=project%20%3D%20SPARK%20AND%20status%20in%20(Resolved)%20AND%20labels%20%3D%20SPIP%20ORDER%20BY%20createdDate%20DESC">Past SPIPs +https://issues.apache.org/jira/issues/?jql=project%20%3D%20SPARK%20AND%20status%20in%20(Resolved)%20AND%20(labels%20%3D%20SPIP%20OR%20summary%20~%20%22SPIP%22)%20ORDER%20BY%20createdDate%20DESC">Past SPIPs Who? http://git-wip-us.apache.org/repos/asf/spark-website/blob/8f64443a/site/improvement-proposals.html -- diff --git a/site/improvement-proposals.html b/site/improvement-proposals.html index c5d4ac5..92adf75 100644 --- a/site/improvement-proposals.html +++ b/site/improvement-proposals.html @@ -212,9 +212,9 @@ Includes discussions on the JIRA ticket and dev@ list about the proposal -https://issues.apache.org/jira/issues/?jql=project%20%3D%20SPARK%20AND%20status%20in%20(Open%2C%20Reopened%2C%20%22In%20Progress%22)%20AND%20labels%20%3D%20SPIP%20ORDER%20BY%20createdDate%20DESC">Current SPIPs +https://issues.apache.org/jira/issues/?jql=project%20%3D%20SPARK%20AND%20status%20in%20(Open%2C%20Reopened%2C%20%22In%20Progress%22)%20AND%20(labels%20%3D%20SPIP%20OR%20summary%20~%20%22SPIP%22)%20ORDER%20BY%20createdDate%20DESC">Current SPIPs -https://issues.apache.org/jira/issues/?jql=project%20%3D%20SPARK%20AND%20status%20in%20(Resolved)%20AND%20labels%20%3D%20SPIP%20ORDER%20BY%20createdDate%20DESC">Past SPIPs +https://issues.apache.org/jira/issues/?jql=project%20%3D%20SPARK%20AND%20status%20in%20(Resolved)%20AND%20(labels%20%3D%20SPIP%20OR%20summary%20~%20%22SPIP%22)%20ORDER%20BY%20createdDate%20DESC">Past SPIPs Who?