spark git commit: [SPARK-13503][SQL] Support to specify the (writing) option for compression codec for TEXT
Repository: spark Updated Branches: refs/heads/master 26ac60806 -> 9812a24aa [SPARK-13503][SQL] Support to specify the (writing) option for compression codec for TEXT ## What changes were proposed in this pull request? https://issues.apache.org/jira/browse/SPARK-13503 This PR enables the TEXT datasource to compress its output via an option instead of requiring Hadoop configurations to be set manually. For resolving codecs by name, it is similar to https://github.com/apache/spark/pull/10805 and https://github.com/apache/spark/pull/10858. ## How was this patch tested? This was tested with unit tests and with `dev/run_tests` for coding style. Author: hyukjinkwon Closes #11384 from HyukjinKwon/SPARK-13503. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9812a24a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9812a24a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9812a24a Branch: refs/heads/master Commit: 9812a24aa80713cd7a8d89f6abe5aadf1e567cc2 Parents: 26ac608 Author: hyukjinkwon Authored: Thu Feb 25 23:57:29 2016 -0800 Committer: Reynold Xin Committed: Thu Feb 25 23:57:29 2016 -0800 -- .../datasources/CompressionCodecs.scala | 14 .../execution/datasources/csv/CSVRelation.scala | 36 +--- .../datasources/json/JSONRelation.scala | 6 +--- .../datasources/text/DefaultSource.scala| 13 +-- .../execution/datasources/text/TextSuite.scala | 15 5 files changed, 57 insertions(+), 27 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9812a24a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala index e683a95..bc8ef4a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.datasources +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.io.SequenceFile.CompressionType import org.apache.hadoop.io.compress.{BZip2Codec, GzipCodec, Lz4Codec, SnappyCodec} import org.apache.spark.util.Utils @@ -44,4 +46,16 @@ private[datasources] object CompressionCodecs { s"is not available. Known codecs are ${shortCompressionCodecNames.keys.mkString(", ")}.") } } + + /** + * Set compression configurations to Hadoop `Configuration`. 
+ * `codec` should be a full class path + */ + def setCodecConfiguration(conf: Configuration, codec: String): Unit = { +conf.set("mapreduce.output.fileoutputformat.compress", "true") +conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString) +conf.set("mapreduce.output.fileoutputformat.compress.codec", codec) +conf.set("mapreduce.map.output.compress", "true") +conf.set("mapreduce.map.output.compress.codec", codec) + } } http://git-wip-us.apache.org/repos/asf/spark/blob/9812a24a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala index da945c4..e9afee1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala @@ -24,7 +24,6 @@ import scala.util.control.NonFatal import com.google.common.base.Objects import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.io.{LongWritable, NullWritable, Text} -import org.apache.hadoop.io.SequenceFile.CompressionType import org.apache.hadoop.mapred.TextInputFormat import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} import org.apache.hadoop.mapreduce.RecordWriter @@ -34,6 +33,7 @@ import org.apache.spark.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.CompressionCodecs import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ @@ -50,16 +50,16 @@ private[sql] class CSVRelation( case None => inferSchema(paths) } - private val params = new CSVOptions(parameters) + private val options
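For readers who want to see the new behavior from the user side, here is a minimal usage sketch. The writer option name ("compression") and the short codec name ("gzip") are assumptions based on the codec registry above, not quoted from the (truncated) diff:

```scala
// Minimal sketch, assuming the text writer accepts a "compression" option that maps the
// short codec names registered in CompressionCodecs onto the Hadoop settings shown above.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("text-compression-demo").setMaster("local[2]"))
val sqlContext = new SQLContext(sc)

val df = sqlContext.range(0, 100).selectExpr("CAST(id AS STRING) AS value")

df.write
  .option("compression", "gzip")      // previously required manual Hadoop configuration
  .text("/tmp/text-gzip-output")
```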
spark git commit: [SPARK-13487][SQL] User-facing RuntimeConfig interface
Repository: spark Updated Branches: refs/heads/master 8afe49141 -> 26ac60806 [SPARK-13487][SQL] User-facing RuntimeConfig interface ## What changes were proposed in this pull request? This patch creates the public API for runtime configuration and an implementation for it. The public runtime configuration includes configs for existing SQL, as well as Hadoop Configuration. This new interface is currently dead code. It will be added to SQLContext and a session entry point to Spark when we add that. ## How was this patch tested? a new unit test suite Author: Reynold XinCloses #11378 from rxin/SPARK-13487. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/26ac6080 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/26ac6080 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/26ac6080 Branch: refs/heads/master Commit: 26ac60806cc23527ea8a75986c1eab83d312a15d Parents: 8afe491 Author: Reynold Xin Authored: Thu Feb 25 23:10:40 2016 -0800 Committer: Yin Huai Committed: Thu Feb 25 23:10:40 2016 -0800 -- .../org/apache/spark/sql/RuntimeConfig.scala| 100 +++ .../spark/sql/internal/RuntimeConfigImpl.scala | 73 ++ .../org/apache/spark/sql/internal/SQLConf.scala | 3 +- .../spark/sql/internal/RuntimeConfigSuite.scala | 86 4 files changed, 261 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/26ac6080/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala b/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala new file mode 100644 index 000..e90a042 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +/** + * Runtime configuration interface for Spark. To access this, use `SparkSession.conf`. + * + * @since 2.0.0 + */ +abstract class RuntimeConfig { + + /** + * Sets the given Spark runtime configuration property. + * + * @since 2.0.0 + */ + def set(key: String, value: String): RuntimeConfig + + /** + * Sets the given Spark runtime configuration property. + * + * @since 2.0.0 + */ + def set(key: String, value: Boolean): RuntimeConfig + + /** + * Sets the given Spark runtime configuration property. + * + * @since 2.0.0 + */ + def set(key: String, value: Long): RuntimeConfig + + /** + * Returns the value of Spark runtime configuration property for the given key. 
+ * + * @throws NoSuchElementException if the key is not set and does not have a default value + * @since 2.0.0 + */ + @throws[NoSuchElementException]("if the key is not set") + def get(key: String): String + + /** + * Returns the value of Spark runtime configuration property for the given key. + * + * @since 2.0.0 + */ + def getOption(key: String): Option[String] + + /** + * Resets the configuration property for the given key. + * + * @since 2.0.0 + */ + def unset(key: String): Unit + + /** + * Sets the given Hadoop configuration property. This is passed directly to Hadoop during I/O. + * + * @since 2.0.0 + */ + def setHadoop(key: String, value: String): RuntimeConfig + + /** + * Returns the value of the Hadoop configuration property. + * + * @throws NoSuchElementException if the key is not set + * @since 2.0.0 + */ + @throws[NoSuchElementException]("if the key is not set") + def getHadoop(key: String): String + + /** + * Returns the value of the Hadoop configuration property. + * + * @since 2.0.0 + */ + def getHadoopOption(key: String): Option[String] + + /** + * Resets the Hadoop configuration property for the given key. + * + * @since 2.0.0 + */ + def unsetHadoop(key: String): Unit +}
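Because the new interface is still dead code in this patch, the following is only an illustrative sketch of how a concrete implementation (such as the RuntimeConfigImpl added here) might be driven once a session entry point exposes it; the configuration keys are arbitrary examples rather than required settings:

```scala
// Sketch against the abstract RuntimeConfig API above; `conf` stands for any implementation.
import org.apache.spark.sql.RuntimeConfig

def demo(conf: RuntimeConfig): Unit = {
  conf.set("spark.sql.shuffle.partitions", 200L)      // typed setters return the config, so they chain
    .set("spark.sql.caseSensitive", true)

  val partitions: String = conf.get("spark.sql.shuffle.partitions")   // throws if unset
  val missing: Option[String] = conf.getOption("some.unset.key")      // None instead of throwing

  conf.setHadoop("fs.defaultFS", "hdfs://namenode:8020")              // passed straight to Hadoop I/O
  val fs: Option[String] = conf.getHadoopOption("fs.defaultFS")

  conf.unset("spark.sql.caseSensitive")
  conf.unsetHadoop("fs.defaultFS")
}
```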
spark git commit: [SPARK-12941][SQL][MASTER] Spark-SQL JDBC Oracle dialect fails to map string datatypes to Oracle VARCHAR datatype
Repository: spark Updated Branches: refs/heads/master 50e60e36f -> 8afe49141 [SPARK-12941][SQL][MASTER] Spark-SQL JDBC Oracle dialect fails to map string datatypes to Oracle VARCHAR datatype ## What changes were proposed in this pull request? This pull request fixes SPARK-12941 by creating a data type mapping to Oracle for the DataFrame data type StringType. This PR is for the master branch fix, whereas another PR has already been tested with branch 1.4. ## How was this patch tested? This patch was tested using the Oracle Docker image. A new integration suite was created for it. The oracle.jdbc jar had to be obtained from the Maven repository; since no JDBC jar was available there, the jar was downloaded manually from the Oracle site and installed into the local repository, and the tests were run against it. So, for a SparkQA test-case run, the ojdbc jar might need to be placed manually in the local Maven repository (com/oracle/ojdbc6/11.2.0.2.0) while the Spark QA tests run. Author: thomastechs Closes #11306 from thomastechs/master. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8afe4914 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8afe4914 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8afe4914 Branch: refs/heads/master Commit: 8afe49141d9b6a603eb3907f32dce802a3d05172 Parents: 50e60e3 Author: thomastechs Authored: Thu Feb 25 22:52:25 2016 -0800 Committer: Yin Huai Committed: Thu Feb 25 22:52:25 2016 -0800 -- docker-integration-tests/pom.xml| 13 .../spark/sql/jdbc/OracleIntegrationSuite.scala | 80 .../apache/spark/sql/jdbc/OracleDialect.scala | 5 ++ 3 files changed, 98 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8afe4914/docker-integration-tests/pom.xml -- diff --git a/docker-integration-tests/pom.xml b/docker-integration-tests/pom.xml index 833ca29..048e58d 100644 --- a/docker-integration-tests/pom.xml +++ b/docker-integration-tests/pom.xml @@ -131,6 +131,19 @@ postgresql test + +
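For context, the dialect hook this change touches looks roughly like the sketch below. It is not the verbatim patch (the OracleDialect hunk is truncated above); the VARCHAR2 length and the object name are assumptions, and only the StringType case is shown:

```scala
// Hedged sketch of a write-side JDBC type mapping: Catalyst StringType is emitted as an
// Oracle VARCHAR2 column instead of falling back to a generic default that Oracle rejects.
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
import org.apache.spark.sql.types.{DataType, StringType}

object OracleStringTypeDialectSketch extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle")

  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case StringType => Some(JdbcType("VARCHAR2(255)", java.sql.Types.VARCHAR))
    case _ => None          // defer to the default mappings for everything else
  }
}

// Custom dialects take effect once registered:
JdbcDialects.registerDialect(OracleStringTypeDialectSketch)
```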
spark git commit: [SPARK-13504] [SPARKR] Add approxQuantile for SparkR
Repository: spark Updated Branches: refs/heads/master f3be369ef -> 50e60e36f [SPARK-13504] [SPARKR] Add approxQuantile for SparkR ## What changes were proposed in this pull request? Add ```approxQuantile``` for SparkR. ## How was this patch tested? unit tests Author: Yanbo LiangCloses #11383 from yanboliang/spark-13504 and squashes the following commits: 4f17adb [Yanbo Liang] Add approxQuantile for SparkR Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/50e60e36 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/50e60e36 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/50e60e36 Branch: refs/heads/master Commit: 50e60e36f7775a10cf39338e7c5716578a24d89f Parents: f3be369 Author: Yanbo Liang Authored: Thu Feb 25 21:23:41 2016 -0800 Committer: Xiangrui Meng Committed: Thu Feb 25 21:23:41 2016 -0800 -- R/pkg/NAMESPACE | 1 + R/pkg/R/generics.R| 7 + R/pkg/R/stats.R | 39 ++ R/pkg/inst/tests/testthat/test_sparkSQL.R | 8 ++ 4 files changed, 55 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/50e60e36/R/pkg/NAMESPACE -- diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index 6a3d63f..636d39e 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -111,6 +111,7 @@ exportMethods("%in%", "add_months", "alias", "approxCountDistinct", + "approxQuantile", "array_contains", "asc", "ascii", http://git-wip-us.apache.org/repos/asf/spark/blob/50e60e36/R/pkg/R/generics.R -- diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index ab61bce..3db72b5 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -67,6 +67,13 @@ setGeneric("crosstab", function(x, col1, col2) { standardGeneric("crosstab") }) # @export setGeneric("freqItems", function(x, cols, support = 0.01) { standardGeneric("freqItems") }) +# @rdname statfunctions +# @export +setGeneric("approxQuantile", + function(x, col, probabilities, relativeError) { + standardGeneric("approxQuantile") + }) + # @rdname distinct # @export setGeneric("distinct", function(x, numPartitions = 1) { standardGeneric("distinct") }) http://git-wip-us.apache.org/repos/asf/spark/blob/50e60e36/R/pkg/R/stats.R -- diff --git a/R/pkg/R/stats.R b/R/pkg/R/stats.R index 2e80768..edf7293 100644 --- a/R/pkg/R/stats.R +++ b/R/pkg/R/stats.R @@ -130,6 +130,45 @@ setMethod("freqItems", signature(x = "DataFrame", cols = "character"), collect(dataFrame(sct)) }) +#' approxQuantile +#' +#' Calculates the approximate quantiles of a numerical column of a DataFrame. +#' +#' The result of this algorithm has the following deterministic bound: +#' If the DataFrame has N elements and if we request the quantile at probability `p` up to error +#' `err`, then the algorithm will return a sample `x` from the DataFrame so that the *exact* rank +#' of `x` is close to (p * N). More precisely, +#' floor((p - err) * N) <= rank(x) <= ceil((p + err) * N). +#' This method implements a variation of the Greenwald-Khanna algorithm (with some speed +#' optimizations). The algorithm was first present in [[http://dx.doi.org/10.1145/375663.375670 +#' Space-efficient Online Computation of Quantile Summaries]] by Greenwald and Khanna. +#' +#' @param x A SparkSQL DataFrame. +#' @param col The name of the numerical column. +#' @param probabilities A list of quantile probabilities. Each number must belong to [0, 1]. +#' For example 0 is the minimum, 0.5 is the median, 1 is the maximum. +#' @param relativeError The relative target precision to achieve (>= 0). 
If set to zero, +#' the exact quantiles are computed, which could be very expensive. +#' Note that values greater than 1 are accepted but give the same result as 1. +#' @return The approximate quantiles at the given probabilities. +#' +#' @rdname statfunctions +#' @name approxQuantile +#' @export +#' @examples +#' \dontrun{ +#' df <- jsonFile(sqlContext, "/path/to/file.json") +#' quantiles <- approxQuantile(df, "key", c(0.5, 0.8), 0.0) +#' } +setMethod("approxQuantile", + signature(x = "DataFrame", col = "character", +probabilities = "numeric", relativeError = "numeric"), + function(x, col, probabilities, relativeError) { +statFunctions <- callJMethod(x@sdf, "stat") +
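The R method is a thin wrapper over the JVM-side DataFrameStatFunctions (note the callJMethod(x@sdf, "stat") delegation above). For readers following along in Scala, the equivalent call is sketched below; the file path and column name mirror the R example, and `sqlContext` is assumed to be available:

```scala
// Sketch of the underlying Scala API the SparkR wrapper delegates to.
val df = sqlContext.read.json("/path/to/file.json")

// 50th and 80th percentiles of column "key"; relativeError = 0.0 requests exact quantiles,
// which can be expensive on large inputs.
val quantiles: Array[Double] = df.stat.approxQuantile("key", Array(0.5, 0.8), 0.0)
```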
spark git commit: [SPARK-12363] [MLLIB] [BACKPORT-1.3] Remove setRun and fix PowerIterationClustering failed test
Repository: spark Updated Branches: refs/heads/branch-1.3 6ddde8eda -> 65cc451c8 [SPARK-12363] [MLLIB] [BACKPORT-1.3] Remove setRun and fix PowerIterationClustering failed test ## What changes were proposed in this pull request? Backport JIRA-SPARK-12363 to branch-1.3. ## How was the this patch tested? Unit test. cc mengxr Author: Liang-Chi HsiehAuthor: Xiangrui Meng Closes #11265 from viirya/backport-12363-1.3 and squashes the following commits: ec076dd [Liang-Chi Hsieh] Fix scala style. 7a3ef5f [Xiangrui Meng] use Graph instead of GraphImpl and update tests and example based on PIC paper b86018d [Liang-Chi Hsieh] Remove setRun and fix PowerIterationClustering failed test. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/65cc451c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/65cc451c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/65cc451c Branch: refs/heads/branch-1.3 Commit: 65cc451c89fd03daed8c315a91d93067ccdc3a5c Parents: 6ddde8e Author: Liang-Chi Hsieh Authored: Thu Feb 25 21:15:59 2016 -0800 Committer: Xiangrui Meng Committed: Thu Feb 25 21:15:59 2016 -0800 -- .../mllib/PowerIterationClusteringExample.scala | 53 .../clustering/PowerIterationClustering.scala | 18 --- .../PowerIterationClusteringSuite.scala | 48 +++--- 3 files changed, 60 insertions(+), 59 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/65cc451c/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala -- diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala index 3f98ddd..c5b171d 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala @@ -39,27 +39,23 @@ import org.apache.spark.{SparkConf, SparkContext} * n: Number of sampled points on innermost circle.. There are proportionally more points * within the outer/larger circles * maxIterations: Number of Power Iterations - * outerRadius: radius of the outermost of the concentric circles * }}} * * Here is a sample run and output: * - * ./bin/run-example mllib.PowerIterationClusteringExample -k 3 --n 30 --maxIterations 15 - * - * Cluster assignments: 1 -> [0,1,2,3,4],2 -> [5,6,7,8,9,10,11,12,13,14], - * 0 -> [15,16,17,18,19,20,21,22,23,24,25,26,27,28,29] + * ./bin/run-example mllib.PowerIterationClusteringExample -k 2 --n 10 --maxIterations 15 * + * Cluster assignments: 1 -> [0,1,2,3,4,5,6,7,8,9], + * 0 -> [10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29] * * If you use it as a template to create your own app, please use `spark-submit` to submit your app. 
*/ object PowerIterationClusteringExample { case class Params( - input: String = null, - k: Int = 3, - numPoints: Int = 5, - maxIterations: Int = 10, - outerRadius: Double = 3.0 + k: Int = 2, + numPoints: Int = 10, + maxIterations: Int = 15 ) extends AbstractParams[Params] def main(args: Array[String]) { @@ -68,7 +64,7 @@ object PowerIterationClusteringExample { val parser = new OptionParser[Params]("PowerIterationClusteringExample") { head("PowerIterationClusteringExample: an example PIC app using concentric circles.") opt[Int]('k', "k") -.text(s"number of circles (/clusters), default: ${defaultParams.k}") +.text(s"number of circles (clusters), default: ${defaultParams.k}") .action((x, c) => c.copy(k = x)) opt[Int]('n', "n") .text(s"number of points in smallest circle, default: ${defaultParams.numPoints}") @@ -76,9 +72,6 @@ object PowerIterationClusteringExample { opt[Int]("maxIterations") .text(s"number of iterations, default: ${defaultParams.maxIterations}") .action((x, c) => c.copy(maxIterations = x)) - opt[Double]('r', "r") -.text(s"radius of outermost circle, default: ${defaultParams.outerRadius}") -.action((x, c) => c.copy(outerRadius = x)) } parser.parse(args, defaultParams).map { params => @@ -96,20 +89,21 @@ object PowerIterationClusteringExample { Logger.getRootLogger.setLevel(Level.WARN) -val circlesRdd = generateCirclesRdd(sc, params.k, params.numPoints, params.outerRadius) +val circlesRdd = generateCirclesRdd(sc, params.k, params.numPoints) val model = new PowerIterationClustering()
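To make the revised example's flow concrete, here is a condensed sketch of the PIC call using the new defaults (k = 2 clusters, 15 iterations); `similarities` stands in for the (srcId, dstId, affinity) RDD that generateCirclesRdd produces, and the "degree" initialization mode is an assumption carried over from the example rather than part of this backport:

```scala
// Hedged sketch: cluster a pairwise-similarity graph with PowerIterationClustering.
import org.apache.spark.mllib.clustering.PowerIterationClustering
import org.apache.spark.rdd.RDD

def runPic(similarities: RDD[(Long, Long, Double)]): Unit = {
  val model = new PowerIterationClustering()
    .setK(2)                            // number of circles (clusters)
    .setMaxIterations(15)
    .setInitializationMode("degree")
    .run(similarities)

  val byCluster = model.assignments.collect().groupBy(_.cluster)
  byCluster.foreach { case (cluster, points) =>
    println(s"$cluster -> [${points.map(_.id).sorted.mkString(",")}]")
  }
}
```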
spark git commit: [SPARK-13028] [ML] Add MaxAbsScaler to ML.feature as a transformer
Repository: spark Updated Branches: refs/heads/master 1b39fafa7 -> 90d07154c [SPARK-13028] [ML] Add MaxAbsScaler to ML.feature as a transformer jira: https://issues.apache.org/jira/browse/SPARK-13028 MaxAbsScaler works in a very similar way as MinMaxScaler, but scales in a way that the training data lies within the range [-1, 1] by dividing through the largest maximum value in each feature. The motivation to use this scaling includes robustness to very small standard deviations of features and preserving zero entries in sparse data. Unlike StandardScaler and MinMaxScaler, MaxAbsScaler does not shift/center the data, and thus does not destroy any sparsity. Something similar from sklearn: http://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.MaxAbsScaler.html#sklearn.preprocessing.MaxAbsScaler Author: Yuhao YangCloses #10939 from hhbyyh/maxabs and squashes the following commits: fd8bdcd [Yuhao Yang] add tag and some optimization on fit 648fced [Yuhao Yang] Merge remote-tracking branch 'upstream/master' into maxabs 75bebc2 [Yuhao Yang] Merge remote-tracking branch 'upstream/master' into maxabs cb10bb6 [Yuhao Yang] remove minmax 91ef8f3 [Yuhao Yang] ut added 8ab0747 [Yuhao Yang] Merge remote-tracking branch 'upstream/master' into maxabs a9215b5 [Yuhao Yang] max abs scaler Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/90d07154 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/90d07154 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/90d07154 Branch: refs/heads/master Commit: 90d07154c2cef3d1095cb3caeafa7003218a3e49 Parents: 1b39faf Author: Yuhao Yang Authored: Thu Feb 25 21:04:35 2016 -0800 Committer: Xiangrui Meng Committed: Thu Feb 25 21:04:35 2016 -0800 -- .../apache/spark/ml/feature/MaxAbsScaler.scala | 176 +++ .../spark/ml/feature/MaxAbsScalerSuite.scala| 70 2 files changed, 246 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/90d07154/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala new file mode 100644 index 000..15c308b --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.ml.feature + +import org.apache.hadoop.fs.Path + +import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.ml.{Estimator, Model} +import org.apache.spark.ml.param.{ParamMap, Params} +import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol} +import org.apache.spark.ml.util._ +import org.apache.spark.mllib.linalg.{Vector, Vectors, VectorUDT} +import org.apache.spark.mllib.stat.Statistics +import org.apache.spark.sql._ +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types.{StructField, StructType} + +/** + * Params for [[MaxAbsScaler]] and [[MaxAbsScalerModel]]. + */ +private[feature] trait MaxAbsScalerParams extends Params with HasInputCol with HasOutputCol { + + /** Validates and transforms the input schema. */ + protected def validateAndTransformSchema(schema: StructType): StructType = { +validateParams() +val inputType = schema($(inputCol)).dataType +require(inputType.isInstanceOf[VectorUDT], + s"Input column ${$(inputCol)} must be a vector column") +require(!schema.fieldNames.contains($(outputCol)), + s"Output column ${$(outputCol)} already exists.") +val outputFields = schema.fields :+ StructField($(outputCol), new VectorUDT, false) +StructType(outputFields) + } +} + +/** + * :: Experimental :: + * Rescale each feature individually to range [-1, 1] by dividing through the largest maximum + * absolute value in each feature. It does not
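A short usage sketch for the new estimator; the DataFrame, column names, and example vectors are assumptions for illustration (any Vector-typed input column works the same way):

```scala
// Hedged sketch: fit MaxAbsScaler and rescale a features column into [-1, 1]. No shifting
// or centering is applied, so sparse vectors stay sparse.
import org.apache.spark.ml.feature.MaxAbsScaler
import org.apache.spark.mllib.linalg.Vectors

val data = sqlContext.createDataFrame(Seq(
  (0, Vectors.dense(1.0, -8.0, 0.0)),
  (1, Vectors.dense(2.0, 4.0, -5.0)),
  (2, Vectors.sparse(3, Array(0), Array(-4.0)))
)).toDF("id", "features")

val scaler = new MaxAbsScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")

val scalerModel = scaler.fit(data)        // learns each feature's maximum absolute value
scalerModel.transform(data).show(false)   // every feature divided by its max |value|
```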
spark git commit: [SPARK-13361][SQL] Add benchmark codes for Encoder#compress() in CompressionSchemeBenchmark
Repository: spark Updated Branches: refs/heads/master 633d63a48 -> 1b39fafa7 [SPARK-13361][SQL] Add benchmark codes for Encoder#compress() in CompressionSchemeBenchmark This pr added benchmark codes for Encoder#compress(). Also, it replaced the benchmark results with new ones because the output format of `Benchmark` changed. Author: Takeshi YAMAMUROCloses #11236 from maropu/CompressionSpike. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1b39fafa Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1b39fafa Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1b39fafa Branch: refs/heads/master Commit: 1b39fafa75a162f183824ff2daa61d73b05ebc83 Parents: 633d63a Author: Takeshi YAMAMURO Authored: Thu Feb 25 20:17:48 2016 -0800 Committer: Reynold Xin Committed: Thu Feb 25 20:17:48 2016 -0800 -- .../CompressionSchemeBenchmark.scala| 282 +-- 1 file changed, 193 insertions(+), 89 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1b39fafa/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala index 95eb5cf..a5d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala @@ -17,19 +17,13 @@ package org.apache.spark.sql.execution.columnar.compression -import java.nio.ByteBuffer -import java.nio.ByteOrder +import java.nio.{ByteBuffer, ByteOrder} import org.apache.commons.lang3.RandomStringUtils import org.apache.commons.math3.distribution.LogNormalDistribution import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, GenericMutableRow} -import org.apache.spark.sql.execution.columnar.BOOLEAN -import org.apache.spark.sql.execution.columnar.INT -import org.apache.spark.sql.execution.columnar.LONG -import org.apache.spark.sql.execution.columnar.NativeColumnType -import org.apache.spark.sql.execution.columnar.SHORT -import org.apache.spark.sql.execution.columnar.STRING +import org.apache.spark.sql.execution.columnar.{BOOLEAN, INT, LONG, NativeColumnType, SHORT, STRING} import org.apache.spark.sql.types.AtomicType import org.apache.spark.util.Benchmark import org.apache.spark.util.Utils._ @@ -53,35 +47,70 @@ object CompressionSchemeBenchmark extends AllCompressionSchemes { () => rng.sample } - private[this] def runBenchmark[T <: AtomicType]( + private[this] def prepareEncodeInternal[T <: AtomicType]( +count: Int, +tpe: NativeColumnType[T], +supportedScheme: CompressionScheme, +input: ByteBuffer): ((ByteBuffer, ByteBuffer) => ByteBuffer, Double, ByteBuffer) = { +assert(supportedScheme.supports(tpe)) + +def toRow(d: Any) = new GenericInternalRow(Array[Any](d)) +val encoder = supportedScheme.encoder(tpe) +for (i <- 0 until count) { + encoder.gatherCompressibilityStats(toRow(tpe.extract(input)), 0) +} +input.rewind() + +val compressedSize = if (encoder.compressedSize == 0) { + input.remaining() +} else { + encoder.compressedSize +} + +(encoder.compress, encoder.compressionRatio, allocateLocal(4 + compressedSize)) + } + + private[this] def runEncodeBenchmark[T <: AtomicType]( name: String, iters: Int, count: Int, tpe: NativeColumnType[T], input: 
ByteBuffer): Unit = { - val benchmark = new Benchmark(name, iters * count) schemes.filter(_.supports(tpe)).map { scheme => - def toRow(d: Any) = new GenericInternalRow(Array[Any](d)) - val encoder = scheme.encoder(tpe) - for (i <- 0 until count) { -encoder.gatherCompressibilityStats(toRow(tpe.extract(input)), 0) - } - input.rewind() + val (compressFunc, compressionRatio, buf) = prepareEncodeInternal(count, tpe, scheme, input) + val label = s"${getFormattedClassName(scheme)}(${compressionRatio.formatted("%.3f")})" - val label = s"${getFormattedClassName(scheme)}(${encoder.compressionRatio.formatted("%.3f")})" benchmark.addCase(label)({ i: Int => -val compressedSize = if (encoder.compressedSize == 0) { - input.remaining() -} else { - encoder.compressedSize +for (n <- 0L until iters) { + compressFunc(input, buf) + input.rewind() +
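For readers unfamiliar with the helper driving these numbers, here is a rough sketch of the org.apache.spark.util.Benchmark usage pattern the refactored code follows; the constructor and addCase shapes are inferred from the diff above, while run() and the case label are assumptions for illustration:

```scala
// Hedged sketch of the addCase/run pattern used by CompressionSchemeBenchmark.
import org.apache.spark.util.Benchmark

val iters = 5
val count = 65536
val benchmark = new Benchmark("INT Encode", iters.toLong * count)

benchmark.addCase("PassThrough(1.000)") { _: Int =>
  // each invocation would call the prepared compress function once, e.g. compressFunc(input, buf)
  ()
}

benchmark.run()   // executes every registered case and prints per-case timings/throughput
```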
[2/2] spark git commit: [SPARK-12757] Add block-level read/write locks to BlockManager
[SPARK-12757] Add block-level read/write locks to BlockManager ## Motivation As a pre-requisite to off-heap caching of blocks, we need a mechanism to prevent pages / blocks from being evicted while they are being read. With on-heap objects, evicting a block while it is being read merely leads to memory-accounting problems (because we assume that an evicted block is a candidate for garbage-collection, which will not be true during a read), but with off-heap memory this will lead to either data corruption or segmentation faults. ## Changes ### BlockInfoManager and reader/writer locks This patch adds block-level read/write locks to the BlockManager. It introduces a new `BlockInfoManager` component, which is contained within the `BlockManager`, holds the `BlockInfo` objects that the `BlockManager` uses for tracking block metadata, and exposes APIs for locking blocks in either shared read or exclusive write modes. `BlockManager`'s `get*()` and `put*()` methods now implicitly acquire the necessary locks. After a `get()` call successfully retrieves a block, that block is locked in a shared read mode. A `put()` call will block until it acquires an exclusive write lock. If the write succeeds, the write lock will be downgraded to a shared read lock before returning to the caller. This `put()` locking behavior allows us store a block and then immediately turn around and read it without having to worry about it having been evicted between the write and the read, which will allow us to significantly simplify `CacheManager` in the future (see #10748). See `BlockInfoManagerSuite`'s test cases for a more detailed specification of the locking semantics. ### Auto-release of locks at the end of tasks Our locking APIs support explicit release of locks (by calling `unlock()`), but it's not always possible to guarantee that locks will be released prior to the end of the task. One reason for this is our iterator interface: since our iterators don't support an explicit `close()` operator to signal that no more records will be consumed, operations like `take()` or `limit()` don't have a good means to release locks on their input iterators' blocks. Another example is broadcast variables, whose block locks can only be released at the end of the task. To address this, `BlockInfoManager` uses a pair of maps to track the set of locks acquired by each task. Lock acquisitions automatically record the current task attempt id by obtaining it from `TaskContext`. When a task finishes, code in `Executor` calls `BlockInfoManager.unlockAllLocksForTask(taskAttemptId)` to free locks. ### Locking and the MemoryStore In order to prevent in-memory blocks from being evicted while they are being read, the `MemoryStore`'s `evictBlocksToFreeSpace()` method acquires write locks on blocks which it is considering as candidates for eviction. These lock acquisitions are non-blocking, so a block which is being read will not be evicted. By holding write locks until the eviction is performed or skipped (in case evicting the blocks would not free enough memory), we avoid a race where a new reader starts to read a block after the block has been marked as an eviction candidate but before it has been removed. ### Locking and remote block transfer This patch makes small changes to to block transfer and network layer code so that locks acquired by the BlockTransferService are released as soon as block transfer messages are consumed and released by Netty. This builds on top of #11193, a bug fix related to freeing of network layer ManagedBuffers. 
## FAQ - **Why not use Java's built-in [`ReadWriteLock`](https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/ReadWriteLock.html)?** Our locks operate on a per-task rather than per-thread level. Under certain circumstances a task may consist of multiple threads, so using `ReadWriteLock` would mean that we might call `unlock()` from a thread which didn't hold the lock in question, an operation which has undefined semantics. If we could rely on Java 8 classes, we might be able to use [`StampedLock`](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/StampedLock.html) to work around this issue. - **Why not detect "leaked" locks in tests?**: See above notes about `take()` and `limit`. Author: Josh RosenCloses #10705 from JoshRosen/pin-pages. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/633d63a4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/633d63a4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/633d63a4 Branch: refs/heads/master Commit: 633d63a48ad98754dc7c56f9ac150fc2aa4e42c5 Parents: 7129957 Author: Josh Rosen Authored: Thu Feb 25 17:17:56 2016 -0800 Committer: Andrew Or Committed: Thu Feb 25 17:17:56 2016 -0800
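To make the per-task bookkeeping described above concrete, here is a deliberately simplified, generic sketch. It is not Spark's actual BlockInfoManager API (which also blocks writers, downgrades write locks after a put, and reads the task attempt id from TaskContext); it only illustrates recording lock acquisitions per task so that everything still held can be released when the task finishes:

```scala
// Hedged, generic sketch of per-task read/write lock registration (class name and methods
// are illustrative, not Spark's API).
import scala.collection.mutable

class TaskScopedLockRegistry[BlockId] {
  // taskAttemptId -> blocks this task currently holds read locks / write locks on
  private val readLocks = mutable.HashMap.empty[Long, mutable.Buffer[BlockId]]
  private val writeLocks = mutable.HashMap.empty[Long, mutable.Set[BlockId]]

  def registerRead(taskAttemptId: Long, block: BlockId): Unit = synchronized {
    readLocks.getOrElseUpdate(taskAttemptId, mutable.Buffer.empty[BlockId]) += block
  }

  def registerWrite(taskAttemptId: Long, block: BlockId): Unit = synchronized {
    writeLocks.getOrElseUpdate(taskAttemptId, mutable.Set.empty[BlockId]) += block
  }

  /** Analogous to unlockAllLocksForTask: called at task end to release everything still held. */
  def releaseAllForTask(taskAttemptId: Long): Seq[BlockId] = synchronized {
    val reads = readLocks.remove(taskAttemptId).getOrElse(mutable.Buffer.empty[BlockId])
    val writes = writeLocks.remove(taskAttemptId).getOrElse(mutable.Set.empty[BlockId])
    (reads ++ writes).toSeq
  }
}
```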
[1/2] spark git commit: [SPARK-12757] Add block-level read/write locks to BlockManager
Repository: spark Updated Branches: refs/heads/master 712995757 -> 633d63a48 http://git-wip-us.apache.org/repos/asf/spark/blob/633d63a4/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index e1b2c96..e4ab9ee 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -45,6 +45,8 @@ import org.apache.spark.util._ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach with PrivateMethodTester with ResetSystemProperties { + import BlockManagerSuite._ + var conf: SparkConf = null var store: BlockManager = null var store2: BlockManager = null @@ -66,6 +68,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE maxMem: Long, name: String = SparkContext.DRIVER_IDENTIFIER, master: BlockManagerMaster = this.master): BlockManager = { +val serializer = new KryoSerializer(conf) val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1) val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1) val blockManager = new BlockManager(name, rpcEnv, master, serializer, conf, @@ -169,14 +172,14 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val a3 = new Array[Byte](4000) // Putting a1, a2 and a3 in memory and telling master only about a1 and a2 -store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) -store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY) -store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY, tellMaster = false) +store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY) +store.putSingleAndReleaseLock("a2", a2, StorageLevel.MEMORY_ONLY) +store.putSingleAndReleaseLock("a3", a3, StorageLevel.MEMORY_ONLY, tellMaster = false) // Checking whether blocks are in memory -assert(store.getSingle("a1").isDefined, "a1 was not in store") -assert(store.getSingle("a2").isDefined, "a2 was not in store") -assert(store.getSingle("a3").isDefined, "a3 was not in store") +assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store") +assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store") +assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was not in store") // Checking whether master knows about the blocks or not assert(master.getLocations("a1").size > 0, "master was not told about a1") @@ -184,10 +187,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(master.getLocations("a3").size === 0, "master was told about a3") // Drop a1 and a2 from memory; this should be reported back to the master -store.dropFromMemory("a1", () => null: Either[Array[Any], ByteBuffer]) -store.dropFromMemory("a2", () => null: Either[Array[Any], ByteBuffer]) -assert(store.getSingle("a1") === None, "a1 not removed from store") -assert(store.getSingle("a2") === None, "a2 not removed from store") +store.dropFromMemoryIfExists("a1", () => null: Either[Array[Any], ByteBuffer]) +store.dropFromMemoryIfExists("a2", () => null: Either[Array[Any], ByteBuffer]) +assert(store.getSingleAndReleaseLock("a1") === None, "a1 not removed from store") +assert(store.getSingleAndReleaseLock("a2") === None, "a2 not removed from store") assert(master.getLocations("a1").size === 0, "master did not remove a1") assert(master.getLocations("a2").size === 0, "master did not 
remove a2") } @@ -202,7 +205,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) -store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_2) +store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY_2) store2.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_2) assert(master.getLocations("a1").size === 2, "master did not report 2 locations for a1") assert(master.getLocations("a2").size === 2, "master did not report 2 locations for a2") @@ -215,17 +218,17 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val a3 = new Array[Byte](4000) // Putting a1, a2 and a3 in memory and telling master only about a1 and a2 -store.putSingle("a1-to-remove", a1, StorageLevel.MEMORY_ONLY) -store.putSingle("a2-to-remove", a2, StorageLevel.MEMORY_ONLY) -store.putSingle("a3-to-remove", a3, StorageLevel.MEMORY_ONLY, tellMaster = false) +store.putSingleAndReleaseLock("a1-to-remove", a1, StorageLevel.MEMORY_ONLY) +store.putSingleAndReleaseLock("a2-to-remove", a2,
spark git commit: [SPARK-13387][MESOS] Add support for SPARK_DAEMON_JAVA_OPTS with MesosClusterDispatcher.
Repository: spark Updated Branches: refs/heads/master f2cfafdfe -> 712995757 [SPARK-13387][MESOS] Add support for SPARK_DAEMON_JAVA_OPTS with MesosClusterDispatcher. ## What changes were proposed in this pull request? Add support for SPARK_DAEMON_JAVA_OPTS with MesosClusterDispatcher. ## How was the this patch tested? Manual testing by launching dispatcher with SPARK_DAEMON_JAVA_OPTS Author: Timothy ChenCloses #11277 from tnachen/cluster_dispatcher_opts. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/71299575 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/71299575 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/71299575 Branch: refs/heads/master Commit: 712995757c22a0bd76e4ccb552446372acf2cc2e Parents: f2cfafd Author: Timothy Chen Authored: Thu Feb 25 17:07:58 2016 -0800 Committer: Andrew Or Committed: Thu Feb 25 17:07:58 2016 -0800 -- .../org/apache/spark/launcher/SparkClassCommandBuilder.java| 6 -- 1 file changed, 4 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/71299575/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java -- diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java index 931a24c..e575fd3 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java +++ b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java @@ -48,8 +48,8 @@ class SparkClassCommandBuilder extends AbstractCommandBuilder { String memKey = null; String extraClassPath = null; -// Master, Worker, and HistoryServer use SPARK_DAEMON_JAVA_OPTS (and specific opts) + -// SPARK_DAEMON_MEMORY. +// Master, Worker, HistoryServer, ExternalShuffleService, MesosClusterDispatcher use +// SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY. if (className.equals("org.apache.spark.deploy.master.Master")) { javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS"); javaOptsKeys.add("SPARK_MASTER_OPTS"); @@ -69,6 +69,8 @@ class SparkClassCommandBuilder extends AbstractCommandBuilder { } else if (className.equals("org.apache.spark.executor.MesosExecutorBackend")) { javaOptsKeys.add("SPARK_EXECUTOR_OPTS"); memKey = "SPARK_EXECUTOR_MEMORY"; +} else if (className.equals("org.apache.spark.deploy.mesos.MesosClusterDispatcher")) { + javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS"); } else if (className.equals("org.apache.spark.deploy.ExternalShuffleService") || className.equals("org.apache.spark.deploy.mesos.MesosExternalShuffleService")) { javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS"); - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-13501] Remove use of Guava Stopwatch
Repository: spark Updated Branches: refs/heads/master 7a6ee8a8f -> f2cfafdfe [SPARK-13501] Remove use of Guava Stopwatch Our nightly doc snapshot builds are failing due to some issue involving the Guava Stopwatch constructor: ``` [error] /home/jenkins/workspace/spark-master-docs/spark/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala:496: constructor Stopwatch in class Stopwatch cannot be accessed in class CoarseMesosSchedulerBackend [error] val stopwatch = new Stopwatch() [error] ^ ``` This Stopwatch constructor was deprecated in newer versions of Guava (https://github.com/google/guava/commit/fd0cbc2c5c90e85fb22c8e86ea19630032090943) and it's possible that some classpath issues affecting Unidoc could be causing this to trigger compilation failures. In order to work around this issue, this patch removes this use of Stopwatch since we don't use it anywhere else in the Spark codebase. Author: Josh RosenCloses #11376 from JoshRosen/remove-stopwatch. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f2cfafdf Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f2cfafdf Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f2cfafdf Branch: refs/heads/master Commit: f2cfafdfe0f4b18f31bc63969e2abced1a66e896 Parents: 7a6ee8a Author: Josh Rosen Authored: Thu Feb 25 17:04:43 2016 -0800 Committer: Andrew Or Committed: Thu Feb 25 17:04:43 2016 -0800 -- .../scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala | 7 ++- 1 file changed, 2 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f2cfafdf/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index f803cc7..622f361 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -19,14 +19,12 @@ package org.apache.spark.scheduler.cluster.mesos import java.io.File import java.util.{Collections, List => JList} -import java.util.concurrent.TimeUnit import java.util.concurrent.locks.ReentrantLock import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.mutable.{Buffer, HashMap, HashSet} -import com.google.common.base.Stopwatch import org.apache.mesos.{Scheduler => MScheduler, SchedulerDriver} import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _} @@ -493,12 +491,11 @@ private[spark] class CoarseMesosSchedulerBackend( // Wait for executors to report done, or else mesosDriver.stop() will forcefully kill them. // See SPARK-12330 -val stopwatch = new Stopwatch() -stopwatch.start() +val startTime = System.nanoTime() // slaveIdsWithExecutors has no memory barrier, so this is eventually consistent while (numExecutors() > 0 && - stopwatch.elapsed(TimeUnit.MILLISECONDS) < shutdownTimeoutMS) { + System.nanoTime() - startTime < shutdownTimeoutMS * 1000L * 1000L) { Thread.sleep(100) } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-12009][YARN] Avoid to re-allocating yarn container while driver want to stop all Executors
Repository: spark Updated Branches: refs/heads/master dc6c5ea4c -> 7a6ee8a8f [SPARK-12009][YARN] Avoid to re-allocating yarn container while driver want to stop all Executors Author: hushanCloses #9992 from suyanNone/tricky. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7a6ee8a8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7a6ee8a8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7a6ee8a8 Branch: refs/heads/master Commit: 7a6ee8a8fe0fad78416ed7e1ac694959de5c5314 Parents: dc6c5ea Author: hushan Authored: Thu Feb 25 16:57:41 2016 -0800 Committer: Andrew Or Committed: Thu Feb 25 16:57:41 2016 -0800 -- .../org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala | 3 +++ 1 file changed, 3 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7a6ee8a8/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala -- diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index 1431bce..ca26277 100644 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -83,6 +83,9 @@ private[spark] abstract class YarnSchedulerBackend( override def stop(): Unit = { try { + // SPARK-12009: To prevent Yarn allocator from requesting backup for the executors which + // was Stopped by SchedulerBackend. + requestTotalExecutors(0, 0, Map.empty) super.stop() } finally { services.stop() - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-13468][WEB UI] Fix a corner case where the Stage UI page should show DAG but it doesn't show
Repository: spark Updated Branches: refs/heads/master 35316cb0b -> dc6c5ea4c [SPARK-13468][WEB UI] Fix a corner case where the Stage UI page should show DAG but it doesn't show When a user clicks more than once on any stage in the DAG graph on the *Job* web UI page, many new *Stage* web UI pages are opened, but only half of their DAG graphs are expanded. After this PR's fix, every newly opened *Stage* page's DAG graph is expanded. Before: ![](https://cloud.githubusercontent.com/assets/15843379/13279144/74808e86-db10-11e5-8514-cecf31af8908.png) After: ![](https://cloud.githubusercontent.com/assets/15843379/13279145/77ca5dec-db10-11e5-9457-8e1985461328.png) ## What changes were proposed in this pull request? - Removed the `expandDagViz` parameter for _Stage_ page and related codes - Added an `onclick` function setting `expandDagVizArrowKey(false)` as `true` ## How was this patch tested? Manual tests (with this fix) to verify this fix works: - clicked many times on _Job_ Page's DAG Graph → each newly opened Stage page's DAG graph is expanded Manual tests (with this fix) to verify this fix does not break features we already had: - refreshed many times for the same _Stage_ page (whose DAG already expanded) → DAG remained expanded upon every refresh - refreshed many times for the same _Stage_ page (whose DAG unexpanded) → DAG remained unexpanded upon every refresh - refreshed many times for the same _Job_ page (whose DAG already expanded) → DAG remained expanded upon every refresh - refreshed many times for the same _Job_ page (whose DAG unexpanded) → DAG remained unexpanded upon every refresh Author: Liwei Lin Closes #11368 from proflin/SPARK-13468. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dc6c5ea4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dc6c5ea4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dc6c5ea4 Branch: refs/heads/master Commit: dc6c5ea4c91c387deb87764c86c4f40ea71657b7 Parents: 35316cb Author: Liwei Lin Authored: Thu Feb 25 15:36:25 2016 -0800 Committer: Shixiong Zhu Committed: Thu Feb 25 15:36:25 2016 -0800 -- .../org/apache/spark/ui/static/spark-dag-viz.js | 3 ++- core/src/main/scala/org/apache/spark/ui/UIUtils.scala | 7 --- .../main/scala/org/apache/spark/ui/jobs/StagePage.scala | 12 3 files changed, 2 insertions(+), 20 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/dc6c5ea4/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js -- diff --git a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js index 4337c42..1b0d469 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js +++ b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js @@ -222,10 +222,11 @@ function renderDagVizForJob(svgContainer) { var attemptId = 0 var stageLink = d3.select("#stage-" + stageId + "-" + attemptId) .select("a.name-link") -.attr("href") + "=true"; +.attr("href"); container = svgContainer .append("a") .attr("xlink:href", stageLink) +.attr("onclick", "window.localStorage.setItem(expandDagVizArrowKey(false), true)") .append("g") .attr("id", containerId); } http://git-wip-us.apache.org/repos/asf/spark/blob/dc6c5ea4/core/src/main/scala/org/apache/spark/ui/UIUtils.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index ddd7f71..0493513 100644 --- 
a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -408,13 +408,6 @@ private[spark] object UIUtils extends Logging { } - /** Return a script element that automatically expands the DAG visualization on page load. */ - def expandDagVizOnLoad(forJob: Boolean): Seq[Node] = { - - {Unparsed("$(document).ready(function() { toggleDagViz(" + forJob + ") });")} - - } - /** * Returns HTML rendering of a job or stage description. It will try to parse the string as HTML * and make sure that it only contains anchors with root-relative links. Otherwise, http://git-wip-us.apache.org/repos/asf/spark/blob/dc6c5ea4/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
spark git commit: [SPARK-13292] [ML] [PYTHON] QuantileDiscretizer should take random seed in PySpark
Repository: spark Updated Branches: refs/heads/master 14e2700de -> 35316cb0b [SPARK-13292] [ML] [PYTHON] QuantileDiscretizer should take random seed in PySpark ## What changes were proposed in this pull request? QuantileDiscretizer in Python should also specify a random seed. ## How was this patch tested? unit tests Author: Yu ISHIKAWACloses #11362 from yu-iskw/SPARK-13292 and squashes the following commits: 02ffa76 [Yu ISHIKAWA] [SPARK-13292][ML][PYTHON] QuantileDiscretizer should take random seed in PySpark Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/35316cb0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/35316cb0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/35316cb0 Branch: refs/heads/master Commit: 35316cb0b744bef9bcb390411ddc321167f953be Parents: 14e2700 Author: Yu ISHIKAWA Authored: Thu Feb 25 13:29:10 2016 -0800 Committer: Xiangrui Meng Committed: Thu Feb 25 13:29:10 2016 -0800 -- python/pyspark/ml/feature.py | 14 -- 1 file changed, 8 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/35316cb0/python/pyspark/ml/feature.py -- diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py index 464c944..67bccfa 100644 --- a/python/pyspark/ml/feature.py +++ b/python/pyspark/ml/feature.py @@ -939,7 +939,7 @@ class PolynomialExpansion(JavaTransformer, HasInputCol, HasOutputCol): @inherit_doc -class QuantileDiscretizer(JavaEstimator, HasInputCol, HasOutputCol): +class QuantileDiscretizer(JavaEstimator, HasInputCol, HasOutputCol, HasSeed): """ .. note:: Experimental @@ -951,7 +951,9 @@ class QuantileDiscretizer(JavaEstimator, HasInputCol, HasOutputCol): >>> df = sqlContext.createDataFrame([(0.1,), (0.4,), (1.2,), (1.5,)], ["values"]) >>> qds = QuantileDiscretizer(numBuckets=2, -... inputCol="values", outputCol="buckets") +... inputCol="values", outputCol="buckets", seed=123) +>>> qds.getSeed() +123 >>> bucketizer = qds.fit(df) >>> splits = bucketizer.getSplits() >>> splits[0] @@ -971,9 +973,9 @@ class QuantileDiscretizer(JavaEstimator, HasInputCol, HasOutputCol): "categories) into which data points are grouped. Must be >= 2. Default 2.") @keyword_only -def __init__(self, numBuckets=2, inputCol=None, outputCol=None): +def __init__(self, numBuckets=2, inputCol=None, outputCol=None, seed=None): """ -__init__(self, numBuckets=2, inputCol=None, outputCol=None) +__init__(self, numBuckets=2, inputCol=None, outputCol=None, seed=None) """ super(QuantileDiscretizer, self).__init__() self._java_obj = self._new_java_obj("org.apache.spark.ml.feature.QuantileDiscretizer", @@ -987,9 +989,9 @@ class QuantileDiscretizer(JavaEstimator, HasInputCol, HasOutputCol): @keyword_only @since("2.0.0") -def setParams(self, numBuckets=2, inputCol=None, outputCol=None): +def setParams(self, numBuckets=2, inputCol=None, outputCol=None, seed=None): """ -setParams(self, numBuckets=2, inputCol=None, outputCol=None) +setParams(self, numBuckets=2, inputCol=None, outputCol=None, seed=None) Set the params for the QuantileDiscretizer """ kwargs = self.setParams._input_kwargs - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-12874][ML] ML StringIndexer does not protect itself from column name duplication
Repository: spark Updated Branches: refs/heads/branch-1.6 d59a08f7c -> abe8f991a [SPARK-12874][ML] ML StringIndexer does not protect itself from column name duplication ## What changes were proposed in this pull request? ML StringIndexer does not protect itself from column name duplication. We should still improve a way to validate a schema of `StringIndexer` and `StringIndexerModel`. However, it would be great to fix at another issue. ## How was this patch tested? unit test Author: Yu ISHIKAWACloses #11370 from yu-iskw/SPARK-12874. (cherry picked from commit 14e2700de29d06460179a94cc9816bcd37344cf7) Signed-off-by: Xiangrui Meng Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/abe8f991 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/abe8f991 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/abe8f991 Branch: refs/heads/branch-1.6 Commit: abe8f991a32bef92fbbcd2911836bb7d8e61ca97 Parents: d59a08f Author: Yu ISHIKAWA Authored: Thu Feb 25 13:21:33 2016 -0800 Committer: Xiangrui Meng Committed: Thu Feb 25 13:23:44 2016 -0800 -- .../org/apache/spark/ml/feature/StringIndexer.scala | 1 + .../org/apache/spark/ml/feature/StringIndexerSuite.scala | 11 +++ 2 files changed, 12 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/abe8f991/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala index 5c40c35..b3413a1 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala @@ -149,6 +149,7 @@ class StringIndexerModel ( "Skip StringIndexerModel.") return dataset } +validateAndTransformSchema(dataset.schema) val indexer = udf { label: String => if (labelToIndex.contains(label)) { http://git-wip-us.apache.org/repos/asf/spark/blob/abe8f991/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala index 749bfac..26f4613 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala @@ -118,6 +118,17 @@ class StringIndexerSuite assert(indexerModel.transform(df).eq(df)) } + test("StringIndexerModel can't overwrite output column") { +val df = sqlContext.createDataFrame(Seq((1, 2), (3, 4))).toDF("input", "output") +val indexer = new StringIndexer() + .setInputCol("input") + .setOutputCol("output") + .fit(df) +intercept[IllegalArgumentException] { + indexer.transform(df) +} + } + test("StringIndexer read/write") { val t = new StringIndexer() .setInputCol("myInputCol") - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-12874][ML] ML StringIndexer does not protect itself from column name duplication
Repository: spark Updated Branches: refs/heads/master fb8bb0476 -> 14e2700de [SPARK-12874][ML] ML StringIndexer does not protect itself from column name duplication ## What changes were proposed in this pull request? ML StringIndexer does not protect itself from column name duplication. We should still improve a way to validate a schema of `StringIndexer` and `StringIndexerModel`. However, it would be great to fix at another issue. ## How was this patch tested? unit test Author: Yu ISHIKAWACloses #11370 from yu-iskw/SPARK-12874. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/14e2700d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/14e2700d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/14e2700d Branch: refs/heads/master Commit: 14e2700de29d06460179a94cc9816bcd37344cf7 Parents: fb8bb04 Author: Yu ISHIKAWA Authored: Thu Feb 25 13:21:33 2016 -0800 Committer: Xiangrui Meng Committed: Thu Feb 25 13:21:33 2016 -0800 -- .../org/apache/spark/ml/feature/StringIndexer.scala | 1 + .../org/apache/spark/ml/feature/StringIndexerSuite.scala | 11 +++ 2 files changed, 12 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/14e2700d/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala index 912bd95..555f113 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala @@ -150,6 +150,7 @@ class StringIndexerModel ( "Skip StringIndexerModel.") return dataset } +validateAndTransformSchema(dataset.schema) val indexer = udf { label: String => if (labelToIndex.contains(label)) { http://git-wip-us.apache.org/repos/asf/spark/blob/14e2700d/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala index 5d199ca..0dbaed2 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala @@ -118,6 +118,17 @@ class StringIndexerSuite assert(indexerModel.transform(df).eq(df)) } + test("StringIndexerModel can't overwrite output column") { +val df = sqlContext.createDataFrame(Seq((1, 2), (3, 4))).toDF("input", "output") +val indexer = new StringIndexer() + .setInputCol("input") + .setOutputCol("output") + .fit(df) +intercept[IllegalArgumentException] { + indexer.transform(df) +} + } + test("StringIndexer read/write") { val t = new StringIndexer() .setInputCol("myInputCol") - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
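Seen from the DataFrame API, the guarded case looks like the hedged sketch below, which mirrors the new unit test (data and column names come from that test, the surrounding object is illustrative): fitting still succeeds, but `transform` now fails fast instead of trying to overwrite the pre-existing `output` column.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.ml.feature.StringIndexer

// Hedged sketch mirroring the added test: the input DataFrame already owns a
// column named "output"; transform() is now rejected up front.
object StringIndexerDuplicateColumnSketch {
  def example(sc: SparkContext): Unit = {
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val df = sc.parallelize(Seq((1, 2), (3, 4))).toDF("input", "output")
    val model = new StringIndexer()
      .setInputCol("input")
      .setOutputCol("output") // clashes with the existing "output" column
      .fit(df)

    try {
      model.transform(df) // rejected by the added validateAndTransformSchema call
    } catch {
      case e: IllegalArgumentException =>
        println(s"rejected as expected: ${e.getMessage}")
    }
  }
}
```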
spark git commit: [SPARK-13069][STREAMING] Add "ask" style store() to ActorReceiver
Repository: spark Updated Branches: refs/heads/master 751724b13 -> fb8bb0476 [SPARK-13069][STREAMING] Add "ask" style store() to ActorReciever Introduces a "ask" style ```store``` in ```ActorReceiver``` as a way to allow actor receiver blocked by back pressure or maxRate. Author: Lin ZhaoCloses #11176 from lin-zhao/SPARK-13069. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fb8bb047 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fb8bb047 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fb8bb047 Branch: refs/heads/master Commit: fb8bb04766005e8935607069c0155d639f407e8a Parents: 751724b Author: Lin Zhao Authored: Thu Feb 25 12:32:17 2016 -0800 Committer: Shixiong Zhu Committed: Thu Feb 25 12:32:24 2016 -0800 -- .../spark/streaming/akka/ActorReceiver.scala| 39 +++- .../streaming/akka/JavaAkkaUtilsSuite.java | 2 + .../spark/streaming/akka/AkkaUtilsSuite.scala | 3 ++ 3 files changed, 43 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fb8bb047/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala -- diff --git a/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala b/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala index c75dc92..33415c1 100644 --- a/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala +++ b/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala @@ -20,12 +20,15 @@ package org.apache.spark.streaming.akka import java.nio.ByteBuffer import java.util.concurrent.atomic.AtomicInteger +import scala.concurrent.Future import scala.concurrent.duration._ import scala.language.postfixOps import scala.reflect.ClassTag import akka.actor._ import akka.actor.SupervisorStrategy.{Escalate, Restart} +import akka.pattern.ask +import akka.util.Timeout import com.typesafe.config.ConfigFactory import org.apache.spark.{Logging, TaskContext} @@ -105,13 +108,26 @@ abstract class ActorReceiver extends Actor { } /** - * Store a single item of received data to Spark's memory. + * Store a single item of received data to Spark's memory asynchronously. * These single items will be aggregated together into data blocks before * being pushed into Spark's memory. */ def store[T](item: T) { context.parent ! SingleItemData(item) } + + /** + * Store a single item of received data to Spark's memory and returns a `Future`. + * The `Future` will be completed when the operator finishes, or with an + * `akka.pattern.AskTimeoutException` after the given timeout has expired. + * These single items will be aggregated together into data blocks before + * being pushed into Spark's memory. + * + * This method allows the user to control the flow speed using `Future` + */ + def store[T](item: T, timeout: Timeout): Future[Unit] = { +context.parent.ask(AskStoreSingleItemData(item))(timeout).map(_ => ())(context.dispatcher) + } } /** @@ -162,6 +178,19 @@ abstract class JavaActorReceiver extends UntypedActor { def store[T](item: T) { context.parent ! SingleItemData(item) } + + /** + * Store a single item of received data to Spark's memory and returns a `Future`. + * The `Future` will be completed when the operator finishes, or with an + * `akka.pattern.AskTimeoutException` after the given timeout has expired. + * These single items will be aggregated together into data blocks before + * being pushed into Spark's memory. 
+ * + * This method allows the user to control the flow speed using `Future` + */ + def store[T](item: T, timeout: Timeout): Future[Unit] = { +context.parent.ask(AskStoreSingleItemData(item))(timeout).map(_ => ())(context.dispatcher) + } } /** @@ -179,8 +208,10 @@ case class Statistics(numberOfMsgs: Int, /** Case class to receive data sent by child actors */ private[akka] sealed trait ActorReceiverData private[akka] case class SingleItemData[T](item: T) extends ActorReceiverData +private[akka] case class AskStoreSingleItemData[T](item: T) extends ActorReceiverData private[akka] case class IteratorData[T](iterator: Iterator[T]) extends ActorReceiverData private[akka] case class ByteBufferData(bytes: ByteBuffer) extends ActorReceiverData +private[akka] object Ack extends ActorReceiverData /** * Provides Actors as receivers for receiving stream. @@ -233,6 +264,12 @@ private[akka] class ActorReceiverSupervisor[T: ClassTag]( store(msg.asInstanceOf[T]) n.incrementAndGet + case AskStoreSingleItemData(msg) => +
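For context, a minimal sketch of how a custom receiver might use the new ask-style call. The actor and message names below are illustrative, not part of the patch; the point is that `store(item, timeout)` returns a `Future[Unit]` that only completes once the supervisor has accepted the item, so a receiver blocked by back pressure or maxRate naturally slows its producer down instead of overrunning it.

```scala
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.util.{Failure, Success}

import akka.util.Timeout

import org.apache.spark.streaming.akka.ActorReceiver

// Illustrative payload type for messages arriving from an upstream publisher.
case class Record(value: String)

class ThrottledReceiver extends ActorReceiver {
  // Give Spark up to 10 seconds to accept each item before the ask fails.
  private val askTimeout: Timeout = Timeout(10.seconds)
  private implicit val ec: ExecutionContext = context.dispatcher

  def receive: Receive = {
    case Record(value) =>
      // The Future completes only once the supervisor has stored the item,
      // so a back-pressured receiver slows this actor down.
      store(value, askTimeout).onComplete {
        case Success(_) => // item accepted; safe to ack upstream here
        case Failure(e) => // e.g. AskTimeoutException: retry or drop the item
      }
  }
}
```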
spark git commit: [SPARK-13464][STREAMING][PYSPARK] Fix failed streaming in pyspark in branch 1.3
Repository: spark Updated Branches: refs/heads/branch-1.3 387d81891 -> 6ddde8eda [SPARK-13464][STREAMING][PYSPARK] Fix failed streaming in pyspark in branch 1.3 JIRA: https://issues.apache.org/jira/browse/SPARK-13464 ## What changes were proposed in this pull request? During backport a mllib feature, I found that the clearly checkouted branch-1.3 codebase would fail at the test `test_reduce_by_key_and_window_with_none_invFunc` in pyspark/streaming. We should fix it. ## How was the this patch tested? Unit test `test_reduce_by_key_and_window_with_none_invFunc` is fixed. Author: Liang-Chi HsiehCloses #11339 from viirya/fix-streaming-test-branch-1.3. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6ddde8ed Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6ddde8ed Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6ddde8ed Branch: refs/heads/branch-1.3 Commit: 6ddde8eda28538e8f899912477a821f8193262ba Parents: 387d818 Author: Liang-Chi Hsieh Authored: Thu Feb 25 12:28:50 2016 -0800 Committer: Shixiong Zhu Committed: Thu Feb 25 12:28:50 2016 -0800 -- python/pyspark/streaming/tests.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6ddde8ed/python/pyspark/streaming/tests.py -- diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index c136bf1..12293a0 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -423,7 +423,7 @@ class WindowFunctionTests(PySparkStreamingTestCase): .reduceByKeyAndWindow(operator.add, None, 5, 1)\ .filter(lambda kv: kv[1] > 0).count() -expected = [[2], [4], [6], [6], [6], [6]] +expected = [[1], [2], [3], [4], [5], [6]] self._test_func(input, func, expected) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: Revert "[SPARK-13444][MLLIB] QuantileDiscretizer chooses bad splits on large DataFrames"
Repository: spark Updated Branches: refs/heads/branch-1.6 5f7440b25 -> d59a08f7c Revert "[SPARK-13444][MLLIB] QuantileDiscretizer chooses bad splits on large DataFrames" This reverts commit cb869a143d338985c3d99ef388dd78b1e3d90a73. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d59a08f7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d59a08f7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d59a08f7 Branch: refs/heads/branch-1.6 Commit: d59a08f7c1c455d86e7ee3d6522a3e9c55f9ee02 Parents: 5f7440b Author: Xiangrui MengAuthored: Thu Feb 25 12:28:03 2016 -0800 Committer: Xiangrui Meng Committed: Thu Feb 25 12:28:03 2016 -0800 -- .../spark/ml/feature/QuantileDiscretizer.scala | 11 ++- .../ml/feature/QuantileDiscretizerSuite.scala | 20 2 files changed, 2 insertions(+), 29 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d59a08f7/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala index cd5085a..7bf67c6 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala @@ -97,13 +97,6 @@ final class QuantileDiscretizer(override val uid: String) @Since("1.6.0") object QuantileDiscretizer extends DefaultParamsReadable[QuantileDiscretizer] with Logging { - - /** - * Minimum number of samples required for finding splits, regardless of number of bins. If - * the dataset has fewer rows than this value, the entire dataset will be used. - */ - private[spark] val minSamplesRequired: Int = 1 - /** * Sampling from the given dataset to collect quantile statistics. 
*/ @@ -111,8 +104,8 @@ object QuantileDiscretizer extends DefaultParamsReadable[QuantileDiscretizer] wi val totalSamples = dataset.count() require(totalSamples > 0, "QuantileDiscretizer requires non-empty input dataset but was given an empty input.") -val requiredSamples = math.max(numBins * numBins, minSamplesRequired) -val fraction = math.min(requiredSamples.toDouble / dataset.count(), 1.0) +val requiredSamples = math.max(numBins * numBins, 1) +val fraction = math.min(requiredSamples / dataset.count(), 1.0) dataset.sample(withReplacement = false, fraction, new XORShiftRandom().nextInt()).collect() } http://git-wip-us.apache.org/repos/asf/spark/blob/d59a08f7/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala index 32bfa43..3a4f6d2 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala @@ -71,26 +71,6 @@ class QuantileDiscretizerSuite } } - test("Test splits on dataset larger than minSamplesRequired") { -val sqlCtx = SQLContext.getOrCreate(sc) -import sqlCtx.implicits._ - -val datasetSize = QuantileDiscretizer.minSamplesRequired + 1 -val numBuckets = 5 -val df = sc.parallelize((1.0 to datasetSize by 1.0).map(Tuple1.apply)).toDF("input") -val discretizer = new QuantileDiscretizer() - .setInputCol("input") - .setOutputCol("result") - .setNumBuckets(numBuckets) - .setSeed(1) - -val result = discretizer.fit(df).transform(df) -val observedNumBuckets = result.select("result").distinct.count - -assert(observedNumBuckets === numBuckets, - "Observed number of buckets does not equal expected number of buckets.") - } - test("read/write") { val t = new QuantileDiscretizer() .setInputCol("myInputCol") - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: Revert "[SPARK-13457][SQL] Removes DataFrame RDD operations"
Repository: spark Updated Branches: refs/heads/master 46f6e7931 -> 751724b13 Revert "[SPARK-13457][SQL] Removes DataFrame RDD operations" This reverts commit 157fe64f3ecbd13b7286560286e50235eecfe30e. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/751724b1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/751724b1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/751724b1 Branch: refs/heads/master Commit: 751724b1320d38fd94186df3d8f1ca887f21947a Parents: 46f6e79 Author: Davies LiuAuthored: Thu Feb 25 11:53:48 2016 -0800 Committer: Davies Liu Committed: Thu Feb 25 11:53:48 2016 -0800 -- .../spark/examples/ml/DataFrameExample.scala| 2 +- .../spark/examples/ml/DecisionTreeExample.scala | 8 ++-- .../spark/examples/ml/OneVsRestExample.scala| 2 +- .../spark/examples/mllib/LDAExample.scala | 1 - .../apache/spark/examples/sql/RDDRelation.scala | 2 +- .../spark/examples/sql/hive/HiveFromSpark.scala | 2 +- .../scala/org/apache/spark/ml/Predictor.scala | 6 +-- .../ml/classification/LogisticRegression.scala | 13 +++--- .../spark/ml/clustering/BisectingKMeans.scala | 4 +- .../org/apache/spark/ml/clustering/KMeans.scala | 6 +-- .../org/apache/spark/ml/clustering/LDA.scala| 1 - .../BinaryClassificationEvaluator.scala | 9 +++-- .../MulticlassClassificationEvaluator.scala | 6 +-- .../ml/evaluation/RegressionEvaluator.scala | 3 +- .../apache/spark/ml/feature/ChiSqSelector.scala | 2 +- .../spark/ml/feature/CountVectorizer.scala | 2 +- .../scala/org/apache/spark/ml/feature/IDF.scala | 2 +- .../apache/spark/ml/feature/MinMaxScaler.scala | 2 +- .../apache/spark/ml/feature/OneHotEncoder.scala | 2 +- .../scala/org/apache/spark/ml/feature/PCA.scala | 2 +- .../spark/ml/feature/StandardScaler.scala | 2 +- .../apache/spark/ml/feature/StringIndexer.scala | 1 - .../apache/spark/ml/feature/VectorIndexer.scala | 2 +- .../org/apache/spark/ml/feature/Word2Vec.scala | 2 +- .../apache/spark/ml/recommendation/ALS.scala| 1 - .../ml/regression/AFTSurvivalRegression.scala | 2 +- .../ml/regression/IsotonicRegression.scala | 6 +-- .../spark/ml/regression/LinearRegression.scala | 16 +++- .../spark/mllib/api/python/PythonMLLibAPI.scala | 8 ++-- .../spark/mllib/clustering/KMeansModel.scala| 2 +- .../spark/mllib/clustering/LDAModel.scala | 4 +- .../clustering/PowerIterationClustering.scala | 2 +- .../BinaryClassificationMetrics.scala | 2 +- .../mllib/evaluation/MulticlassMetrics.scala| 2 +- .../mllib/evaluation/MultilabelMetrics.scala| 4 +- .../mllib/evaluation/RegressionMetrics.scala| 2 +- .../spark/mllib/feature/ChiSqSelector.scala | 2 +- .../org/apache/spark/mllib/fpm/FPGrowth.scala | 2 +- .../MatrixFactorizationModel.scala | 12 +++--- .../mllib/tree/model/DecisionTreeModel.scala| 2 +- .../mllib/tree/model/treeEnsembleModels.scala | 2 +- .../LogisticRegressionSuite.scala | 2 +- .../MultilayerPerceptronClassifierSuite.scala | 5 +-- .../ml/classification/OneVsRestSuite.scala | 6 +-- .../ml/clustering/BisectingKMeansSuite.scala| 3 +- .../spark/ml/clustering/KMeansSuite.scala | 3 +- .../apache/spark/ml/clustering/LDASuite.scala | 2 +- .../spark/ml/feature/OneHotEncoderSuite.scala | 4 +- .../spark/ml/feature/StringIndexerSuite.scala | 6 +-- .../spark/ml/feature/VectorIndexerSuite.scala | 5 +-- .../apache/spark/ml/feature/Word2VecSuite.scala | 8 ++-- .../spark/ml/recommendation/ALSSuite.scala | 7 ++-- .../spark/ml/regression/GBTRegressorSuite.scala | 2 +- .../ml/regression/IsotonicRegressionSuite.scala | 6 +-- .../ml/regression/LinearRegressionSuite.scala 
| 17 .../scala/org/apache/spark/sql/DataFrame.scala | 42 .../org/apache/spark/sql/GroupedData.scala | 1 - .../org/apache/spark/sql/api/r/SQLUtils.scala | 2 +- .../execution/datasources/jdbc/JdbcUtils.scala | 2 +- .../spark/sql/DataFrameAggregateSuite.scala | 2 +- .../parquet/ParquetFilterSuite.scala| 2 +- .../datasources/parquet/ParquetIOSuite.scala| 2 +- .../sql/execution/ui/SQLListenerSuite.scala | 4 +- .../spark/sql/hive/HiveSparkSubmitSuite.scala | 2 +- .../sql/hive/execution/HiveQuerySuite.scala | 2 - .../spark/sql/hive/orc/OrcQuerySuite.scala | 5 +-- .../apache/spark/sql/hive/parquetSuites.scala | 2 +- 67 files changed, 159 insertions(+), 140 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/751724b1/examples/src/main/scala/org/apache/spark/examples/ml/DataFrameExample.scala
spark git commit: Revert "[SPARK-13117][WEB UI] WebUI should use the local ip not 0.0.0.0"
Repository: spark Updated Branches: refs/heads/master 5fcf4c2bf -> 46f6e7931 Revert "[SPARK-13117][WEB UI] WebUI should use the local ip not 0.0.0.0" This reverts commit 2e44031fafdb8cf486573b98e4faa6b31ffb90a4. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/46f6e793 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/46f6e793 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/46f6e793 Branch: refs/heads/master Commit: 46f6e79316b72afea0c9b1559ea662dd3e95e57b Parents: 5fcf4c2 Author: Shixiong ZhuAuthored: Thu Feb 25 11:39:26 2016 -0800 Committer: Shixiong Zhu Committed: Thu Feb 25 11:39:26 2016 -0800 -- core/src/main/scala/org/apache/spark/ui/WebUI.scala | 2 +- .../scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala| 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/46f6e793/core/src/main/scala/org/apache/spark/ui/WebUI.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index e515916..fe4949b 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -134,7 +134,7 @@ private[spark] abstract class WebUI( def bind() { assert(!serverInfo.isDefined, "Attempted to bind %s more than once!".format(className)) try { - serverInfo = Some(startJettyServer(publicHostName, port, sslOptions, handlers, conf, name)) + serverInfo = Some(startJettyServer("0.0.0.0", port, sslOptions, handlers, conf, name)) logInfo("Started %s at http://%s:%d".format(className, publicHostName, boundPort)) } catch { case e: Exception => http://git-wip-us.apache.org/repos/asf/spark/blob/46f6e793/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala index 972b552..f416ace 100644 --- a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala @@ -26,7 +26,6 @@ import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSui import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded} import org.apache.spark.scheduler.cluster.ExecutorInfo import org.apache.spark.util.SparkConfWithEnv -import org.apache.spark.util.Utils class LogUrlsStandaloneSuite extends SparkFunSuite with LocalSparkContext { @@ -54,7 +53,7 @@ class LogUrlsStandaloneSuite extends SparkFunSuite with LocalSparkContext { } test("verify that log urls reflect SPARK_PUBLIC_DNS (SPARK-6175)") { -val SPARK_PUBLIC_DNS = Utils.localHostNameForURI() +val SPARK_PUBLIC_DNS = "public_dns" val conf = new SparkConfWithEnv(Map("SPARK_PUBLIC_DNS" -> SPARK_PUBLIC_DNS)).set( "spark.extraListeners", classOf[SaveExecutorInfo].getName) sc = new SparkContext("local-cluster[2,1,1024]", "test", conf) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-12316] Wait a minutes to avoid cycle calling.
Repository: spark Updated Branches: refs/heads/branch-1.6 e3802a752 -> 5f7440b25 [SPARK-12316] Wait a minutes to avoid cycle calling. When application end, AM will clean the staging dir. But if the driver trigger to update the delegation token, it will can't find the right token file and then it will endless cycle call the method 'updateCredentialsIfRequired'. Then it lead driver StackOverflowError. https://issues.apache.org/jira/browse/SPARK-12316 Author: huangzhaoweiCloses #10475 from SaintBacchus/SPARK-12316. (cherry picked from commit 5fcf4c2bfce4b7e3543815c8e49ffdec8072c9a2) Signed-off-by: Tom Graves Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5f7440b2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5f7440b2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5f7440b2 Branch: refs/heads/branch-1.6 Commit: 5f7440b2529a0f6edfed5038756c004acecbce39 Parents: e3802a7 Author: huangzhaowei Authored: Thu Feb 25 09:14:19 2016 -0600 Committer: Tom Graves Committed: Thu Feb 25 09:14:36 2016 -0600 -- .../spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala | 5 - 1 file changed, 4 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5f7440b2/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala -- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala index 94feb63..6febc70 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala @@ -76,7 +76,10 @@ private[spark] class ExecutorDelegationTokenUpdater( SparkHadoopUtil.get.getTimeFromNowToRenewal( sparkConf, 0.8, UserGroupInformation.getCurrentUser.getCredentials) if (timeFromNowToRenewal <= 0) { -executorUpdaterRunnable.run() +// We just checked for new credentials but none were there, wait a minute and retry. +// This handles the shutdown case where the staging directory may have been removed(see +// SPARK-12316 for more details). +delegationTokenRenewer.schedule(executorUpdaterRunnable, 1, TimeUnit.MINUTES) } else { logInfo(s"Scheduling token refresh from HDFS in $timeFromNowToRenewal millis.") delegationTokenRenewer.schedule( - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-12316] Wait a minutes to avoid cycle calling.
Repository: spark Updated Branches: refs/heads/master 157fe64f3 -> 5fcf4c2bf [SPARK-12316] Wait a minutes to avoid cycle calling. When application end, AM will clean the staging dir. But if the driver trigger to update the delegation token, it will can't find the right token file and then it will endless cycle call the method 'updateCredentialsIfRequired'. Then it lead driver StackOverflowError. https://issues.apache.org/jira/browse/SPARK-12316 Author: huangzhaoweiCloses #10475 from SaintBacchus/SPARK-12316. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5fcf4c2b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5fcf4c2b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5fcf4c2b Branch: refs/heads/master Commit: 5fcf4c2bfce4b7e3543815c8e49ffdec8072c9a2 Parents: 157fe64 Author: huangzhaowei Authored: Thu Feb 25 09:14:19 2016 -0600 Committer: Tom Graves Committed: Thu Feb 25 09:14:19 2016 -0600 -- .../spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala | 5 - 1 file changed, 4 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5fcf4c2b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala -- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala index 9d99c0d..6474acc 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala @@ -76,7 +76,10 @@ private[spark] class ExecutorDelegationTokenUpdater( SparkHadoopUtil.get.getTimeFromNowToRenewal( sparkConf, 0.8, UserGroupInformation.getCurrentUser.getCredentials) if (timeFromNowToRenewal <= 0) { -executorUpdaterRunnable.run() +// We just checked for new credentials but none were there, wait a minute and retry. +// This handles the shutdown case where the staging directory may have been removed(see +// SPARK-12316 for more details). +delegationTokenRenewer.schedule(executorUpdaterRunnable, 1, TimeUnit.MINUTES) } else { logInfo(s"Scheduling token refresh from HDFS in $timeFromNowToRenewal millis.") delegationTokenRenewer.schedule( - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
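The essence of this fix is a scheduling pattern rather than anything Spark-specific: never re-enter the runnable on the current stack (that recursion is what overflowed), always hand it back to the scheduler with a delay. A minimal, self-contained sketch of that pattern follows; all names and the renewal check are illustrative, not the real `ExecutorDelegationTokenUpdater`.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

object RetryPatternSketch {
  private val scheduler: ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor()

  private val updaterRunnable: Runnable = new Runnable {
    override def run(): Unit = {
      val millisUntilNextRenewal = computeTimeToNextRenewal()
      if (millisUntilNextRenewal <= 0) {
        // Nothing new yet (e.g. staging dir already removed at shutdown):
        // back off for a minute instead of calling run() on this stack.
        scheduler.schedule(this, 1, TimeUnit.MINUTES)
      } else {
        scheduler.schedule(this, millisUntilNextRenewal, TimeUnit.MILLISECONDS)
      }
    }
  }

  // Placeholder for the real "when do credentials need renewing?" check.
  private def computeTimeToNextRenewal(): Long = -1L

  def start(): Unit = scheduler.schedule(updaterRunnable, 0, TimeUnit.SECONDS)
}
```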
spark git commit: [SPARK-13457][SQL] Removes DataFrame RDD operations
Repository: spark Updated Branches: refs/heads/master 4460113d4 -> 157fe64f3 [SPARK-13457][SQL] Removes DataFrame RDD operations ## What changes were proposed in this pull request? This PR removes DataFrame RDD operations. Original calls are now replaced by calls to methods of `DataFrame.rdd`. ## How was the this patch tested? No extra tests are added. Existing tests should do the work. Author: Cheng LianCloses #11323 from liancheng/remove-df-rdd-ops. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/157fe64f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/157fe64f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/157fe64f Branch: refs/heads/master Commit: 157fe64f3ecbd13b7286560286e50235eecfe30e Parents: 4460113 Author: Cheng Lian Authored: Thu Feb 25 23:07:59 2016 +0800 Committer: Cheng Lian Committed: Thu Feb 25 23:07:59 2016 +0800 -- .../spark/examples/ml/DataFrameExample.scala| 2 +- .../spark/examples/ml/DecisionTreeExample.scala | 8 ++-- .../spark/examples/ml/OneVsRestExample.scala| 2 +- .../spark/examples/mllib/LDAExample.scala | 1 + .../apache/spark/examples/sql/RDDRelation.scala | 2 +- .../spark/examples/sql/hive/HiveFromSpark.scala | 2 +- .../scala/org/apache/spark/ml/Predictor.scala | 6 ++- .../ml/classification/LogisticRegression.scala | 13 +++--- .../spark/ml/clustering/BisectingKMeans.scala | 4 +- .../org/apache/spark/ml/clustering/KMeans.scala | 6 +-- .../org/apache/spark/ml/clustering/LDA.scala| 1 + .../BinaryClassificationEvaluator.scala | 9 ++--- .../MulticlassClassificationEvaluator.scala | 6 +-- .../ml/evaluation/RegressionEvaluator.scala | 3 +- .../apache/spark/ml/feature/ChiSqSelector.scala | 2 +- .../spark/ml/feature/CountVectorizer.scala | 2 +- .../scala/org/apache/spark/ml/feature/IDF.scala | 2 +- .../apache/spark/ml/feature/MinMaxScaler.scala | 2 +- .../apache/spark/ml/feature/OneHotEncoder.scala | 2 +- .../scala/org/apache/spark/ml/feature/PCA.scala | 2 +- .../spark/ml/feature/StandardScaler.scala | 2 +- .../apache/spark/ml/feature/StringIndexer.scala | 1 + .../apache/spark/ml/feature/VectorIndexer.scala | 2 +- .../org/apache/spark/ml/feature/Word2Vec.scala | 2 +- .../apache/spark/ml/recommendation/ALS.scala| 1 + .../ml/regression/AFTSurvivalRegression.scala | 2 +- .../ml/regression/IsotonicRegression.scala | 6 +-- .../spark/ml/regression/LinearRegression.scala | 16 +--- .../spark/mllib/api/python/PythonMLLibAPI.scala | 8 ++-- .../spark/mllib/clustering/KMeansModel.scala| 2 +- .../spark/mllib/clustering/LDAModel.scala | 4 +- .../clustering/PowerIterationClustering.scala | 2 +- .../BinaryClassificationMetrics.scala | 2 +- .../mllib/evaluation/MulticlassMetrics.scala| 2 +- .../mllib/evaluation/MultilabelMetrics.scala| 4 +- .../mllib/evaluation/RegressionMetrics.scala| 2 +- .../spark/mllib/feature/ChiSqSelector.scala | 2 +- .../org/apache/spark/mllib/fpm/FPGrowth.scala | 2 +- .../MatrixFactorizationModel.scala | 12 +++--- .../mllib/tree/model/DecisionTreeModel.scala| 2 +- .../mllib/tree/model/treeEnsembleModels.scala | 2 +- .../LogisticRegressionSuite.scala | 2 +- .../MultilayerPerceptronClassifierSuite.scala | 5 ++- .../ml/classification/OneVsRestSuite.scala | 6 +-- .../ml/clustering/BisectingKMeansSuite.scala| 3 +- .../spark/ml/clustering/KMeansSuite.scala | 3 +- .../apache/spark/ml/clustering/LDASuite.scala | 2 +- .../spark/ml/feature/OneHotEncoderSuite.scala | 4 +- .../spark/ml/feature/StringIndexerSuite.scala | 6 +-- .../spark/ml/feature/VectorIndexerSuite.scala | 5 ++- 
.../apache/spark/ml/feature/Word2VecSuite.scala | 8 ++-- .../spark/ml/recommendation/ALSSuite.scala | 7 ++-- .../spark/ml/regression/GBTRegressorSuite.scala | 2 +- .../ml/regression/IsotonicRegressionSuite.scala | 6 +-- .../ml/regression/LinearRegressionSuite.scala | 17 .../scala/org/apache/spark/sql/DataFrame.scala | 42 .../org/apache/spark/sql/GroupedData.scala | 1 + .../org/apache/spark/sql/api/r/SQLUtils.scala | 2 +- .../execution/datasources/jdbc/JdbcUtils.scala | 2 +- .../spark/sql/DataFrameAggregateSuite.scala | 2 +- .../parquet/ParquetFilterSuite.scala| 2 +- .../datasources/parquet/ParquetIOSuite.scala| 2 +- .../sql/execution/ui/SQLListenerSuite.scala | 4 +- .../spark/sql/hive/HiveSparkSubmitSuite.scala | 2 +- .../sql/hive/execution/HiveQuerySuite.scala | 2 + .../spark/sql/hive/orc/OrcQuerySuite.scala | 5 ++- .../apache/spark/sql/hive/parquetSuites.scala | 2 +- 67 files changed, 140 insertions(+), 159
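As a rough illustration of what call sites look like after this change (column names and data are made up), RDD-style transformations go through `DataFrame.rdd` explicitly instead of being invoked on the DataFrame itself:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

object DataFrameRddSketch {
  def example(sc: SparkContext): Unit = {
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val df = sc.parallelize(Seq(("a", 1), ("b", 2))).toDF("key", "value")

    // Before: df.map(...) / df.flatMap(...) were available directly on DataFrame.
    // After: reach the RDD[Row] explicitly and operate on it.
    val keys = df.rdd.map(row => row.getString(0)).collect()
    println(keys.mkString(", "))
  }
}
```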
spark git commit: [SPARK-13490][ML] ML LinearRegression should cache standardization param value
Repository: spark Updated Branches: refs/heads/master c98a93ded -> 4460113d4 [SPARK-13490][ML] ML LinearRegression should cache standardization param value ## What changes were proposed in this pull request? Like #11027 for ```LogisticRegression```, ```LinearRegression``` with L1 regularization should also cache the value of the ```standardization``` rather than re-fetching it from the ```ParamMap``` for every OWLQN iteration. cc srowen ## How was this patch tested? No extra tests are added. It should pass all existing tests. Author: Yanbo LiangCloses #11367 from yanboliang/spark-13490. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4460113d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4460113d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4460113d Branch: refs/heads/master Commit: 4460113d419b5da47ba3c956b8430fd00eb03217 Parents: c98a93d Author: Yanbo Liang Authored: Thu Feb 25 13:34:29 2016 + Committer: Sean Owen Committed: Thu Feb 25 13:34:29 2016 + -- .../scala/org/apache/spark/ml/regression/LinearRegression.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4460113d/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala index e253f25..ccfb5c4 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala @@ -277,8 +277,9 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String val optimizer = if ($(elasticNetParam) == 0.0 || effectiveRegParam == 0.0) { new BreezeLBFGS[BDV[Double]]($(maxIter), 10, $(tol)) } else { + val standardizationParam = $(standardization) def effectiveL1RegFun = (index: Int) => { -if ($(standardization)) { +if (standardizationParam) { effectiveL1RegParam } else { // If `standardization` is false, we still standardize the data - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
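The underlying pattern is simply hoisting the param lookup out of the closure that the optimizer invokes repeatedly: read it once per fit, not once per feature index per OWLQN iteration. A minimal sketch with illustrative names and data, not the real `LinearRegression` internals:

```scala
// Stand-in for a model whose params live in a map-like structure.
class ParamCachingSketch {
  private val params = Map("standardization" -> true)
  private val featureStd = Array(1.0, 2.0, 0.5)

  def effectiveL1RegFun(regParam: Double): Int => Double = {
    // Read the param once, outside the function the optimizer calls per index.
    val standardization = params("standardization")
    (index: Int) =>
      if (standardization) regParam
      else if (featureStd(index) != 0.0) regParam / featureStd(index)
      else 0.0
  }
}
```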
spark git commit: [SPARK-13439][MESOS] Document that spark.mesos.uris is comma-separated
Repository: spark Updated Branches: refs/heads/branch-1.6 1f031635f -> e3802a752 [SPARK-13439][MESOS] Document that spark.mesos.uris is comma-separated Author: Michael GummeltCloses #11311 from mgummelt/document_csv. (cherry picked from commit c98a93ded36db5da2f3ebd519aa391de90927688) Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e3802a75 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e3802a75 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e3802a75 Branch: refs/heads/branch-1.6 Commit: e3802a7522a83b91c84d0ee6f721a768a485774b Parents: 1f03163 Author: Michael Gummelt Authored: Thu Feb 25 13:32:09 2016 + Committer: Sean Owen Committed: Thu Feb 25 13:32:50 2016 + -- docs/running-on-mesos.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e3802a75/docs/running-on-mesos.md -- diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md index ed720f1..13ad535 100644 --- a/docs/running-on-mesos.md +++ b/docs/running-on-mesos.md @@ -347,8 +347,9 @@ See the [configuration page](configuration.html) for information on Spark config spark.mesos.uris (none) -A list of URIs to be downloaded to the sandbox when driver or executor is launched by Mesos. -This applies to both coarse-grain and fine-grain mode. +A comma-separated list of URIs to be downloaded to the sandbox +when driver or executor is launched by Mesos. This applies to +both coarse-grained and fine-grained mode. - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-13439][MESOS] Document that spark.mesos.uris is comma-separated
Repository: spark Updated Branches: refs/heads/master fae88af18 -> c98a93ded [SPARK-13439][MESOS] Document that spark.mesos.uris is comma-separated Author: Michael GummeltCloses #11311 from mgummelt/document_csv. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c98a93de Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c98a93de Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c98a93de Branch: refs/heads/master Commit: c98a93ded36db5da2f3ebd519aa391de90927688 Parents: fae88af Author: Michael Gummelt Authored: Thu Feb 25 13:32:09 2016 + Committer: Sean Owen Committed: Thu Feb 25 13:32:09 2016 + -- docs/running-on-mesos.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c98a93de/docs/running-on-mesos.md -- diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md index b9f64c7..9816d03 100644 --- a/docs/running-on-mesos.md +++ b/docs/running-on-mesos.md @@ -349,8 +349,9 @@ See the [configuration page](configuration.html) for information on Spark config spark.mesos.uris (none) -A list of URIs to be downloaded to the sandbox when driver or executor is launched by Mesos. -This applies to both coarse-grain and fine-grain mode. +A comma-separated list of URIs to be downloaded to the sandbox +when driver or executor is launched by Mesos. This applies to +both coarse-grained and fine-grained mode. - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
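For example, setting the option programmatically might look like the snippet below. The URIs are placeholders; the documented point is that multiple entries are separated by commas, not whitespace or a path separator.

```scala
import org.apache.spark.SparkConf

// Illustrative values only: each comma-separated URI is downloaded into the
// Mesos sandbox when the driver or executor is launched.
val conf = new SparkConf()
  .setAppName("mesos-uris-example")
  .set("spark.mesos.uris",
    "hdfs://namenode:8020/conf/app.conf,http://repo.example.com/libs/extra.jar")
```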
spark git commit: [SPARK-13441][YARN] Fix NPE in yarn Client.createConfArchive method
Repository: spark Updated Branches: refs/heads/branch-1.6 cb869a143 -> 1f031635f [SPARK-13441][YARN] Fix NPE in yarn Client.createConfArchive method ## What changes were proposed in this pull request? Instead of using result of File.listFiles() directly, which may throw NPE, check for null first. If it is null, log a warning instead ## How was the this patch tested? Ran the ./dev/run-tests locally Tested manually on a cluster Author: Terence YimCloses #11337 from chtyim/fixes/SPARK-13441-null-check. (cherry picked from commit fae88af18445c5a88212b4644e121de4b30ce027) Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1f031635 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1f031635 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1f031635 Branch: refs/heads/branch-1.6 Commit: 1f031635ffb4df472ad0d9c00bc82ebb601ebbb5 Parents: cb869a1 Author: Terence Yim Authored: Thu Feb 25 13:29:30 2016 + Committer: Sean Owen Committed: Thu Feb 25 13:29:41 2016 + -- .../main/scala/org/apache/spark/deploy/yarn/Client.scala | 11 --- 1 file changed, 8 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1f031635/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala -- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index f0590d2..7631aa3 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -539,9 +539,14 @@ private[spark] class Client( sys.env.get(envKey).foreach { path => val dir = new File(path) if (dir.isDirectory()) { - dir.listFiles().foreach { file => -if (file.isFile && !hadoopConfFiles.contains(file.getName())) { - hadoopConfFiles(file.getName()) = file + val files = dir.listFiles() + if (files == null) { +logWarning("Failed to list files under directory " + dir) + } else { +files.foreach { file => + if (file.isFile && !hadoopConfFiles.contains(file.getName())) { +hadoopConfFiles(file.getName()) = file + } } } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-13441][YARN] Fix NPE in yarn Client.createConfArchive method
Repository: spark Updated Branches: refs/heads/master 6f8e835c6 -> fae88af18 [SPARK-13441][YARN] Fix NPE in yarn Client.createConfArchive method ## What changes were proposed in this pull request? Instead of using result of File.listFiles() directly, which may throw NPE, check for null first. If it is null, log a warning instead ## How was the this patch tested? Ran the ./dev/run-tests locally Tested manually on a cluster Author: Terence YimCloses #11337 from chtyim/fixes/SPARK-13441-null-check. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fae88af1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fae88af1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fae88af1 Branch: refs/heads/master Commit: fae88af18445c5a88212b4644e121de4b30ce027 Parents: 6f8e835 Author: Terence Yim Authored: Thu Feb 25 13:29:30 2016 + Committer: Sean Owen Committed: Thu Feb 25 13:29:30 2016 + -- .../main/scala/org/apache/spark/deploy/yarn/Client.scala | 11 --- 1 file changed, 8 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fae88af1/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala -- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index d4ca255..530f1d7 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -537,9 +537,14 @@ private[spark] class Client( sys.env.get(envKey).foreach { path => val dir = new File(path) if (dir.isDirectory()) { - dir.listFiles().foreach { file => -if (file.isFile && !hadoopConfFiles.contains(file.getName())) { - hadoopConfFiles(file.getName()) = file + val files = dir.listFiles() + if (files == null) { +logWarning("Failed to list files under directory " + dir) + } else { +files.foreach { file => + if (file.isFile && !hadoopConfFiles.contains(file.getName())) { +hadoopConfFiles(file.getName()) = file + } } } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
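The defensive pattern here is general JVM file handling: `File.listFiles()` returns `null`, not an empty array, when the path is missing, unreadable, or not a directory. A hedged sketch of the same idea written with `Option` (names are illustrative, not the patched `Client` code):

```scala
import java.io.File

// Wrap the possibly-null listing before iterating over it.
def configFilesUnder(dir: File): Seq[File] = {
  Option(dir.listFiles()) match {
    case Some(files) => files.filter(_.isFile).toSeq
    case None =>
      println(s"Failed to list files under directory $dir")
      Seq.empty
  }
}
```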
spark git commit: [SPARK-13444][MLLIB] QuantileDiscretizer chooses bad splits on large DataFrames
Repository: spark Updated Branches: refs/heads/branch-1.6 3cc938ac8 -> cb869a143 [SPARK-13444][MLLIB] QuantileDiscretizer chooses bad splits on large DataFrames Change line 113 of QuantileDiscretizer.scala to `val requiredSamples = math.max(numBins * numBins, 1.0)` so that `requiredSamples` is a `Double`. This will fix the division in line 114 which currently results in zero if `requiredSamples < dataset.count` Manual tests. I was having a problems using QuantileDiscretizer with my a dataset and after making this change QuantileDiscretizer behaves as expected. Author: Oliver PiersonAuthor: Oliver Pierson Closes #11319 from oliverpierson/SPARK-13444. (cherry picked from commit 6f8e835c68dff6fcf97326dc617132a41ff9d043) Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cb869a14 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cb869a14 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cb869a14 Branch: refs/heads/branch-1.6 Commit: cb869a143d338985c3d99ef388dd78b1e3d90a73 Parents: 3cc938a Author: Oliver Pierson Authored: Thu Feb 25 13:24:46 2016 + Committer: Sean Owen Committed: Thu Feb 25 13:27:10 2016 + -- .../spark/ml/feature/QuantileDiscretizer.scala | 11 +-- .../ml/feature/QuantileDiscretizerSuite.scala | 20 2 files changed, 29 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/cb869a14/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala index 7bf67c6..cd5085a 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala @@ -97,6 +97,13 @@ final class QuantileDiscretizer(override val uid: String) @Since("1.6.0") object QuantileDiscretizer extends DefaultParamsReadable[QuantileDiscretizer] with Logging { + + /** + * Minimum number of samples required for finding splits, regardless of number of bins. If + * the dataset has fewer rows than this value, the entire dataset will be used. + */ + private[spark] val minSamplesRequired: Int = 1 + /** * Sampling from the given dataset to collect quantile statistics. 
*/ @@ -104,8 +111,8 @@ object QuantileDiscretizer extends DefaultParamsReadable[QuantileDiscretizer] wi val totalSamples = dataset.count() require(totalSamples > 0, "QuantileDiscretizer requires non-empty input dataset but was given an empty input.") -val requiredSamples = math.max(numBins * numBins, 1) -val fraction = math.min(requiredSamples / dataset.count(), 1.0) +val requiredSamples = math.max(numBins * numBins, minSamplesRequired) +val fraction = math.min(requiredSamples.toDouble / dataset.count(), 1.0) dataset.sample(withReplacement = false, fraction, new XORShiftRandom().nextInt()).collect() } http://git-wip-us.apache.org/repos/asf/spark/blob/cb869a14/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala index 3a4f6d2..32bfa43 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala @@ -71,6 +71,26 @@ class QuantileDiscretizerSuite } } + test("Test splits on dataset larger than minSamplesRequired") { +val sqlCtx = SQLContext.getOrCreate(sc) +import sqlCtx.implicits._ + +val datasetSize = QuantileDiscretizer.minSamplesRequired + 1 +val numBuckets = 5 +val df = sc.parallelize((1.0 to datasetSize by 1.0).map(Tuple1.apply)).toDF("input") +val discretizer = new QuantileDiscretizer() + .setInputCol("input") + .setOutputCol("result") + .setNumBuckets(numBuckets) + .setSeed(1) + +val result = discretizer.fit(df).transform(df) +val observedNumBuckets = result.select("result").distinct.count + +assert(observedNumBuckets === numBuckets, + "Observed number of buckets does not equal expected number of buckets.") + } + test("read/write") { val t = new QuantileDiscretizer() .setInputCol("myInputCol")
spark git commit: [SPARK-13444][MLLIB] QuantileDiscretizer chooses bad splits on large DataFrames
Repository: spark Updated Branches: refs/heads/master 3fa6491be -> 6f8e835c6 [SPARK-13444][MLLIB] QuantileDiscretizer chooses bad splits on large DataFrames ## What changes were proposed in this pull request? Change line 113 of QuantileDiscretizer.scala to `val requiredSamples = math.max(numBins * numBins, 1.0)` so that `requiredSamples` is a `Double`. This will fix the division in line 114 which currently results in zero if `requiredSamples < dataset.count` ## How was the this patch tested? Manual tests. I was having a problems using QuantileDiscretizer with my a dataset and after making this change QuantileDiscretizer behaves as expected. Author: Oliver PiersonAuthor: Oliver Pierson Closes #11319 from oliverpierson/SPARK-13444. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6f8e835c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6f8e835c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6f8e835c Branch: refs/heads/master Commit: 6f8e835c68dff6fcf97326dc617132a41ff9d043 Parents: 3fa6491 Author: Oliver Pierson Authored: Thu Feb 25 13:24:46 2016 + Committer: Sean Owen Committed: Thu Feb 25 13:24:46 2016 + -- .../spark/ml/feature/QuantileDiscretizer.scala | 11 +-- .../ml/feature/QuantileDiscretizerSuite.scala | 20 2 files changed, 29 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6f8e835c/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala index 1f4cca1..769f440 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala @@ -103,6 +103,13 @@ final class QuantileDiscretizer(override val uid: String) @Since("1.6.0") object QuantileDiscretizer extends DefaultParamsReadable[QuantileDiscretizer] with Logging { + + /** + * Minimum number of samples required for finding splits, regardless of number of bins. If + * the dataset has fewer rows than this value, the entire dataset will be used. + */ + private[spark] val minSamplesRequired: Int = 1 + /** * Sampling from the given dataset to collect quantile statistics. 
*/ @@ -110,8 +117,8 @@ object QuantileDiscretizer extends DefaultParamsReadable[QuantileDiscretizer] wi val totalSamples = dataset.count() require(totalSamples > 0, "QuantileDiscretizer requires non-empty input dataset but was given an empty input.") -val requiredSamples = math.max(numBins * numBins, 1) -val fraction = math.min(requiredSamples / dataset.count(), 1.0) +val requiredSamples = math.max(numBins * numBins, minSamplesRequired) +val fraction = math.min(requiredSamples.toDouble / dataset.count(), 1.0) dataset.sample(withReplacement = false, fraction, new XORShiftRandom(seed).nextInt()).collect() } http://git-wip-us.apache.org/repos/asf/spark/blob/6f8e835c/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala index 6a2c601..25fabf6 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala @@ -71,6 +71,26 @@ class QuantileDiscretizerSuite } } + test("Test splits on dataset larger than minSamplesRequired") { +val sqlCtx = SQLContext.getOrCreate(sc) +import sqlCtx.implicits._ + +val datasetSize = QuantileDiscretizer.minSamplesRequired + 1 +val numBuckets = 5 +val df = sc.parallelize((1.0 to datasetSize by 1.0).map(Tuple1.apply)).toDF("input") +val discretizer = new QuantileDiscretizer() + .setInputCol("input") + .setOutputCol("result") + .setNumBuckets(numBuckets) + .setSeed(1) + +val result = discretizer.fit(df).transform(df) +val observedNumBuckets = result.select("result").distinct.count + +assert(observedNumBuckets === numBuckets, + "Observed number of buckets does not equal expected number of buckets.") + } + test("read/write") { val t = new QuantileDiscretizer() .setInputCol("myInputCol") - To unsubscribe,
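A quick way to see the original bug: with both operands integral, the division truncates to zero whenever the required sample count is smaller than the row count, so the sampling fraction, and with it the collected quantile statistics, ends up empty. Illustrative numbers below:

```scala
val numBins = 5
val count = 1000000L
val requiredSamples = math.max(numBins * numBins, 1) // 25

// Int/Long division truncates to 0 before math.min ever sees a fraction.
val brokenFraction = math.min(requiredSamples / count, 1.0)          // 0.0
// Promoting to Double first keeps the intended sampling fraction.
val fixedFraction  = math.min(requiredSamples.toDouble / count, 1.0) // 2.5e-5

println(s"broken = $brokenFraction, fixed = $fixedFraction")
```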
spark git commit: [SPARK-13473][SQL] Don't push predicate through project with nondeterministic field(s)
Repository: spark Updated Branches: refs/heads/branch-1.5 d2c1c67cf -> 0e920411f [SPARK-13473][SQL] Don't push predicate through project with nondeterministic field(s) ## What changes were proposed in this pull request? Predicates shouldn't be pushed through project with nondeterministic field(s). See https://github.com/graphframes/graphframes/pull/23 and SPARK-13473 for more details. This PR targets master, branch-1.6, and branch-1.5. ## How was this patch tested? A test case is added in `FilterPushdownSuite`. It constructs a query plan where a filter is over a project with a nondeterministic field. Optimized query plan shouldn't change in this case. Author: Cheng LianCloses #11348 from liancheng/spark-13473-no-ppd-through-nondeterministic-project-field. (cherry picked from commit 3fa6491be66dad690ca5329dd32e7c82037ae8c1) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0e920411 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0e920411 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0e920411 Branch: refs/heads/branch-1.5 Commit: 0e920411f69178b0c1aac80797c4af067a2ba0c6 Parents: d2c1c67 Author: Cheng Lian Authored: Thu Feb 25 20:43:03 2016 +0800 Committer: Wenchen Fan Committed: Thu Feb 25 20:46:32 2016 +0800 -- .../sql/catalyst/optimizer/Optimizer.scala | 9 ++- .../optimizer/FilterPushdownSuite.scala | 27 +++- 2 files changed, 11 insertions(+), 25 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0e920411/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index d2b5c4f..c51de49 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -586,7 +586,14 @@ object SimplifyFilters extends Rule[LogicalPlan] { */ object PushPredicateThroughProject extends Rule[LogicalPlan] with PredicateHelper { def apply(plan: LogicalPlan): LogicalPlan = plan transform { -case filter @ Filter(condition, project @ Project(fields, grandChild)) => +// SPARK-13473: We can't push the predicate down when the underlying projection output non- +// deterministic field(s). Non-deterministic expressions are essentially stateful. This +// implies that, for a given input row, the output are determined by the expression's initial +// state and all the input rows processed before. In another word, the order of input rows +// matters for non-deterministic expressions, while pushing down predicates changes the order. +case filter @ Filter(condition, project @ Project(fields, grandChild)) + if fields.forall(_.deterministic) => + // Create a map of Aliases to their values from the child projection. // e.g., 'SELECT a + b AS c, d ...' produces Map(c -> a + b). 
val aliasMap = AttributeMap(fields.collect { http://git-wip-us.apache.org/repos/asf/spark/blob/0e920411/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 0f1fde2..e6d651b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -146,7 +146,7 @@ class FilterPushdownSuite extends PlanTest { comparePlans(optimized, correctAnswer) } - test("nondeterministic: can't push down filter through project") { + test("nondeterministic: can't push down filter with nondeterministic condition through project") { val originalQuery = testRelation .select(Rand(10).as('rand), 'a) .where('rand > 5 || 'a > 5) @@ -157,36 +157,15 @@ class FilterPushdownSuite extends PlanTest { comparePlans(optimized, originalQuery) } - test("nondeterministic: push down part of filter through project") { + test("nondeterministic: can't push down filter through project with nondeterministic field") { val originalQuery = testRelation .select(Rand(10).as('rand), 'a) - .where('rand > 5 && 'a > 5) -
spark git commit: [SPARK-13473][SQL] Don't push predicate through project with nondeterministic field(s)
Repository: spark Updated Branches: refs/heads/branch-1.6 897599601 -> 3cc938ac8 [SPARK-13473][SQL] Don't push predicate through project with nondeterministic field(s) ## What changes were proposed in this pull request? Predicates shouldn't be pushed through project with nondeterministic field(s). See https://github.com/graphframes/graphframes/pull/23 and SPARK-13473 for more details. This PR targets master, branch-1.6, and branch-1.5. ## How was this patch tested? A test case is added in `FilterPushdownSuite`. It constructs a query plan where a filter is over a project with a nondeterministic field. Optimized query plan shouldn't change in this case. Author: Cheng LianCloses #11348 from liancheng/spark-13473-no-ppd-through-nondeterministic-project-field. (cherry picked from commit 3fa6491be66dad690ca5329dd32e7c82037ae8c1) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3cc938ac Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3cc938ac Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3cc938ac Branch: refs/heads/branch-1.6 Commit: 3cc938ac8124b8445f171baa365fa44a47962cc9 Parents: 8975996 Author: Cheng Lian Authored: Thu Feb 25 20:43:03 2016 +0800 Committer: Wenchen Fan Committed: Thu Feb 25 20:45:18 2016 +0800 -- .../sql/catalyst/optimizer/Optimizer.scala | 9 ++- .../optimizer/FilterPushdownSuite.scala | 27 +++- 2 files changed, 11 insertions(+), 25 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3cc938ac/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 06d14fc..682b860 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -618,7 +618,14 @@ object SimplifyFilters extends Rule[LogicalPlan] { */ object PushPredicateThroughProject extends Rule[LogicalPlan] with PredicateHelper { def apply(plan: LogicalPlan): LogicalPlan = plan transform { -case filter @ Filter(condition, project @ Project(fields, grandChild)) => +// SPARK-13473: We can't push the predicate down when the underlying projection output non- +// deterministic field(s). Non-deterministic expressions are essentially stateful. This +// implies that, for a given input row, the output are determined by the expression's initial +// state and all the input rows processed before. In another word, the order of input rows +// matters for non-deterministic expressions, while pushing down predicates changes the order. +case filter @ Filter(condition, project @ Project(fields, grandChild)) + if fields.forall(_.deterministic) => + // Create a map of Aliases to their values from the child projection. // e.g., 'SELECT a + b AS c, d ...' produces Map(c -> a + b). 
val aliasMap = AttributeMap(fields.collect { http://git-wip-us.apache.org/repos/asf/spark/blob/3cc938ac/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index fba4c5c..6978807 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -147,7 +147,7 @@ class FilterPushdownSuite extends PlanTest { comparePlans(optimized, correctAnswer) } - test("nondeterministic: can't push down filter through project") { + test("nondeterministic: can't push down filter with nondeterministic condition through project") { val originalQuery = testRelation .select(Rand(10).as('rand), 'a) .where('rand > 5 || 'a > 5) @@ -158,36 +158,15 @@ class FilterPushdownSuite extends PlanTest { comparePlans(optimized, originalQuery) } - test("nondeterministic: push down part of filter through project") { + test("nondeterministic: can't push down filter through project with nondeterministic field") { val originalQuery = testRelation .select(Rand(10).as('rand), 'a) - .where('rand > 5 && 'a > 5) -
spark git commit: [SPARK-13473][SQL] Don't push predicate through project with nondeterministic field(s)
Repository: spark Updated Branches: refs/heads/master 2e44031fa -> 3fa6491be [SPARK-13473][SQL] Don't push predicate through project with nondeterministic field(s) ## What changes were proposed in this pull request? Predicates shouldn't be pushed through project with nondeterministic field(s). See https://github.com/graphframes/graphframes/pull/23 and SPARK-13473 for more details. This PR targets master, branch-1.6, and branch-1.5. ## How was this patch tested? A test case is added in `FilterPushdownSuite`. It constructs a query plan where a filter is over a project with a nondeterministic field. Optimized query plan shouldn't change in this case. Author: Cheng LianCloses #11348 from liancheng/spark-13473-no-ppd-through-nondeterministic-project-field. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3fa6491b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3fa6491b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3fa6491b Branch: refs/heads/master Commit: 3fa6491be66dad690ca5329dd32e7c82037ae8c1 Parents: 2e44031 Author: Cheng Lian Authored: Thu Feb 25 20:43:03 2016 +0800 Committer: Wenchen Fan Committed: Thu Feb 25 20:43:03 2016 +0800 -- .../sql/catalyst/optimizer/Optimizer.scala | 9 ++- .../optimizer/FilterPushdownSuite.scala | 27 +++- 2 files changed, 11 insertions(+), 25 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3fa6491b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 2b80497..2aeb957 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -792,7 +792,14 @@ object SimplifyFilters extends Rule[LogicalPlan] { */ object PushPredicateThroughProject extends Rule[LogicalPlan] with PredicateHelper { def apply(plan: LogicalPlan): LogicalPlan = plan transform { -case filter @ Filter(condition, project @ Project(fields, grandChild)) => +// SPARK-13473: We can't push the predicate down when the underlying projection output non- +// deterministic field(s). Non-deterministic expressions are essentially stateful. This +// implies that, for a given input row, the output are determined by the expression's initial +// state and all the input rows processed before. In another word, the order of input rows +// matters for non-deterministic expressions, while pushing down predicates changes the order. +case filter @ Filter(condition, project @ Project(fields, grandChild)) + if fields.forall(_.deterministic) => + // Create a map of Aliases to their values from the child projection. // e.g., 'SELECT a + b AS c, d ...' produces Map(c -> a + b). 
val aliasMap = AttributeMap(fields.collect { http://git-wip-us.apache.org/repos/asf/spark/blob/3fa6491b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 7d60862..1292aa0 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -98,7 +98,7 @@ class FilterPushdownSuite extends PlanTest { comparePlans(optimized, correctAnswer) } - test("nondeterministic: can't push down filter through project") { + test("nondeterministic: can't push down filter with nondeterministic condition through project") { val originalQuery = testRelation .select(Rand(10).as('rand), 'a) .where('rand > 5 || 'a > 5) @@ -109,36 +109,15 @@ class FilterPushdownSuite extends PlanTest { comparePlans(optimized, originalQuery) } - test("nondeterministic: push down part of filter through project") { + test("nondeterministic: can't push down filter through project with nondeterministic field") { val originalQuery = testRelation .select(Rand(10).as('rand), 'a) - .where('rand > 5 && 'a > 5) - .analyze - -val optimized = Optimize.execute(originalQuery) - -val correctAnswer = testRelation .where('a > 5) -
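To make the behavior guarded by the change above concrete, here is a minimal, self-contained sketch (not part of the patch; the object, app name, and port-free local setup are made up). Because `rand()` is nondeterministic, its value for a given row depends on how many rows were processed before it, so the filter must not be evaluated against a different row stream than the one the projection produced:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.rand

object NondeterministicPushdownSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("spark-13473-sketch"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Pushing `WHERE r > 0.5` below the projection would feed rand() a different
    // row stream and could change which rows survive the filter.
    val df = sqlContext.range(0, 100)
      .select(rand(10).as("r"), $"id")
      .where($"r" > 0.5)

    // With this fix, the optimized plan printed by explain(true) still shows the Filter
    // above the Project; PushPredicateThroughProject leaves the plan unchanged.
    df.explain(true)
    sc.stop()
  }
}
```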
spark git commit: [SPARK-13117][WEB UI] WebUI should use the local ip not 0.0.0.0
Repository: spark Updated Branches: refs/heads/master 2b2c8c332 -> 2e44031fa [SPARK-13117][WEB UI] WebUI should use the local ip not 0.0.0.0 Fixed the HTTP server host name/IP issue, i.e. made the HTTP server bind to the configured host name/IP instead of always using '0.0.0.0'. Author: Devaraj K Closes #11133 from devaraj-kavali/SPARK-13117. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2e44031f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2e44031f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2e44031f Branch: refs/heads/master Commit: 2e44031fafdb8cf486573b98e4faa6b31ffb90a4 Parents: 2b2c8c3 Author: Devaraj K Authored: Thu Feb 25 12:18:43 2016 + Committer: Sean Owen Committed: Thu Feb 25 12:18:43 2016 + -- core/src/main/scala/org/apache/spark/ui/WebUI.scala | 2 +- .../scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala| 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2e44031f/core/src/main/scala/org/apache/spark/ui/WebUI.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index fe4949b..e515916 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -134,7 +134,7 @@ private[spark] abstract class WebUI( def bind() { assert(!serverInfo.isDefined, "Attempted to bind %s more than once!".format(className)) try { - serverInfo = Some(startJettyServer("0.0.0.0", port, sslOptions, handlers, conf, name)) + serverInfo = Some(startJettyServer(publicHostName, port, sslOptions, handlers, conf, name)) logInfo("Started %s at http://%s:%d".format(className, publicHostName, boundPort)) } catch { case e: Exception => http://git-wip-us.apache.org/repos/asf/spark/blob/2e44031f/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala index f416ace..972b552 100644 --- a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala @@ -26,6 +26,7 @@ import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSui import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded} import org.apache.spark.scheduler.cluster.ExecutorInfo import org.apache.spark.util.SparkConfWithEnv +import org.apache.spark.util.Utils class LogUrlsStandaloneSuite extends SparkFunSuite with LocalSparkContext { @@ -53,7 +54,7 @@ class LogUrlsStandaloneSuite extends SparkFunSuite with LocalSparkContext { } test("verify that log urls reflect SPARK_PUBLIC_DNS (SPARK-6175)") { -val SPARK_PUBLIC_DNS = "public_dns" +val SPARK_PUBLIC_DNS = Utils.localHostNameForURI() val conf = new SparkConfWithEnv(Map("SPARK_PUBLIC_DNS" -> SPARK_PUBLIC_DNS)).set( "spark.extraListeners", classOf[SaveExecutorInfo].getName) sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
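A rough way to observe this fix (a hypothetical verification sketch, not part of the patch; the object name and port are made up): start a local driver with a fixed UI port and check which address the Jetty listener is bound to. Before the change it shows up on 0.0.0.0; afterwards it should appear on the host's own address, for example whatever SPARK_PUBLIC_DNS or the local host name resolves to:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object UiBindAddressSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("ui-bind-sketch")
      .set("spark.ui.port", "4050") // fixed port so `netstat -an | grep 4050` is easy to check
    val sc = new SparkContext(conf)
    Thread.sleep(60000) // keep the UI up long enough to inspect the bound address
    sc.stop()
  }
}
```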
[3/3] spark git commit: [SPARK-13486][SQL] Move SQLConf into an internal package
[SPARK-13486][SQL] Move SQLConf into an internal package ## What changes were proposed in this pull request? This patch moves SQLConf into org.apache.spark.sql.internal package to make it very explicit that it is internal. Soon I will also submit more API work that creates implementations of interfaces in this internal package. ## How was this patch tested? If it compiles, then the refactoring should work. Author: Reynold XinCloses #11363 from rxin/SPARK-13486. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2b2c8c33 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2b2c8c33 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2b2c8c33 Branch: refs/heads/master Commit: 2b2c8c33236677c916541f956f7b94bba014a9ce Parents: 07f92ef Author: Reynold Xin Authored: Thu Feb 25 17:49:50 2016 +0800 Committer: Cheng Lian Committed: Thu Feb 25 17:49:50 2016 +0800 -- project/MimaExcludes.scala | 6 + .../scala/org/apache/spark/sql/DataFrame.scala | 1 + .../org/apache/spark/sql/GroupedData.scala | 1 + .../scala/org/apache/spark/sql/SQLConf.scala| 730 --- .../scala/org/apache/spark/sql/SQLContext.scala | 3 +- .../spark/sql/execution/ExistingRDD.scala | 3 +- .../spark/sql/execution/SparkStrategies.scala | 3 +- .../apache/spark/sql/execution/commands.scala | 3 +- .../InsertIntoHadoopFsRelation.scala| 1 + .../execution/datasources/SqlNewHadoopRDD.scala | 3 +- .../execution/datasources/WriterContainer.scala | 1 + .../parquet/CatalystSchemaConverter.scala | 4 +- .../parquet/CatalystWriteSupport.scala | 2 +- .../datasources/parquet/ParquetRelation.scala | 1 + .../spark/sql/execution/debug/package.scala | 1 + .../execution/local/BinaryHashJoinNode.scala| 2 +- .../execution/local/BroadcastHashJoinNode.scala | 2 +- .../sql/execution/local/ConvertToSafeNode.scala | 2 +- .../execution/local/ConvertToUnsafeNode.scala | 2 +- .../spark/sql/execution/local/ExpandNode.scala | 2 +- .../spark/sql/execution/local/FilterNode.scala | 2 +- .../sql/execution/local/IntersectNode.scala | 4 +- .../spark/sql/execution/local/LimitNode.scala | 2 +- .../spark/sql/execution/local/LocalNode.scala | 3 +- .../execution/local/NestedLoopJoinNode.scala| 2 +- .../spark/sql/execution/local/ProjectNode.scala | 2 +- .../spark/sql/execution/local/SampleNode.scala | 2 +- .../spark/sql/execution/local/SeqScanNode.scala | 2 +- .../local/TakeOrderedAndProjectNode.scala | 2 +- .../spark/sql/execution/local/UnionNode.scala | 2 +- .../org/apache/spark/sql/internal/SQLConf.scala | 730 +++ .../apache/spark/sql/internal/package-info.java | 22 + .../org/apache/spark/sql/internal/package.scala | 24 + .../spark/sql/DataFrameAggregateSuite.scala | 1 + .../apache/spark/sql/DataFramePivotSuite.scala | 1 + .../org/apache/spark/sql/DataFrameSuite.scala | 1 + .../scala/org/apache/spark/sql/JoinSuite.scala | 1 + .../spark/sql/MultiSQLContextsSuite.scala | 1 + .../scala/org/apache/spark/sql/QueryTest.scala | 1 + .../apache/spark/sql/SQLConfEntrySuite.scala| 150 .../org/apache/spark/sql/SQLConfSuite.scala | 132 .../org/apache/spark/sql/SQLContextSuite.scala | 1 + .../org/apache/spark/sql/SQLQuerySuite.scala| 1 + .../execution/ExchangeCoordinatorSuite.scala| 1 + .../spark/sql/execution/PlannerSuite.scala | 3 +- .../columnar/PartitionBatchPruningSuite.scala | 1 + .../execution/datasources/json/JsonSuite.scala | 1 + .../parquet/ParquetFilterSuite.scala| 1 + .../datasources/parquet/ParquetIOSuite.scala| 1 + .../ParquetPartitionDiscoverySuite.scala| 1 + 
.../datasources/parquet/ParquetQuerySuite.scala | 1 + .../parquet/ParquetReadBenchmark.scala | 3 +- .../datasources/parquet/ParquetTest.scala | 3 +- .../sql/execution/joins/InnerJoinSuite.scala| 3 +- .../sql/execution/joins/OuterJoinSuite.scala| 3 +- .../sql/execution/joins/SemiJoinSuite.scala | 3 +- .../spark/sql/execution/local/DummyNode.scala | 2 +- .../sql/execution/local/HashJoinNodeSuite.scala | 2 +- .../sql/execution/local/LocalNodeTest.scala | 2 +- .../local/NestedLoopJoinNodeSuite.scala | 2 +- .../sql/execution/metric/SQLMetricsSuite.scala | 1 + .../spark/sql/internal/SQLConfEntrySuite.scala | 150 .../spark/sql/internal/SQLConfSuite.scala | 133 .../spark/sql/sources/DataSourceTest.scala | 1 + .../spark/sql/sources/FilteredScanSuite.scala | 1 + .../spark/sql/sources/PrunedScanSuite.scala | 1 + .../spark/sql/sources/SaveLoadSuite.scala | 3 +-
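The practical impact of the move is small, as the diffs in the following message show: only code that imported `SQLConf` directly, which is Spark's own modules and tests, has to switch from `org.apache.spark.sql.SQLConf` to `org.apache.spark.sql.internal.SQLConf`. A hedged sketch (object name made up) of why typical end-user programs are unaffected, since they configure SQL through the public, string-keyed `SQLContext` API rather than through `SQLConf` itself:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object SqlConfMoveSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("sqlconf-move-sketch"))
    val sqlContext = new SQLContext(sc)

    // Public, string-keyed configuration surface: unchanged by the package move.
    sqlContext.setConf("spark.sql.shuffle.partitions", "8")
    println(sqlContext.getConf("spark.sql.shuffle.partitions"))

    // Spark-internal code, by contrast, changes only its import:
    //   before: import org.apache.spark.sql.SQLConf
    //   after:  import org.apache.spark.sql.internal.SQLConf
    sc.stop()
  }
}
```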
[1/3] spark git commit: [SPARK-13486][SQL] Move SQLConf into an internal package
Repository: spark Updated Branches: refs/heads/master 07f92ef1f -> 2b2c8c332 http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c8c33/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala index c89a151..28ad7ae 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala @@ -18,8 +18,8 @@ package org.apache.spark.sql.test import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.sql.{SQLConf, SQLContext} - +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.internal.SQLConf /** * A special [[SQLContext]] prepared for testing. @@ -39,7 +39,7 @@ private[sql] class TestSQLContext(sc: SparkContext) extends SQLContext(sc) { sel super.clear() // Make sure we start with the default test configs even after clear - TestSQLContext.overrideConfs.map { + TestSQLContext.overrideConfs.foreach { case (key, value) => setConfString(key, value) } } http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c8c33/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala -- diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index 66eaa3e..f32ba5f 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -32,10 +32,10 @@ import org.apache.hive.service.server.{HiveServer2, HiveServerServerOptionsProce import org.apache.spark.{Logging, SparkContext} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerJobStart} -import org.apache.spark.sql.SQLConf import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerTab +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.util.{ShutdownHookManager, Utils} /** http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c8c33/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala -- diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index 8fef22c..458d4f2 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -33,9 +33,10 @@ import org.apache.hive.service.cli.operation.ExecuteStatementOperation import org.apache.hive.service.cli.session.HiveSession import org.apache.spark.Logging -import org.apache.spark.sql.{DataFrame, Row => SparkRow, SQLConf} +import org.apache.spark.sql.{DataFrame, Row => SparkRow} import org.apache.spark.sql.execution.SetCommand import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes} +import org.apache.spark.sql.internal.SQLConf import 
org.apache.spark.sql.types._ import org.apache.spark.util.{Utils => SparkUtils} http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c8c33/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala -- diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala index 5f9952a..c05527b 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala @@ -202,7 +202,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { } test("test multiple session") { -import org.apache.spark.sql.SQLConf +import org.apache.spark.sql.internal.SQLConf var defaultV1: String = null var defaultV2: String = null var data: ArrayBuffer[Int] = null
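One small cleanup in the `TestSQLContext` hunk above is the switch from `map` to `foreach` when applying the override configs. A stand-alone illustration (hypothetical names and keys) of the idiom: when a traversal is performed only for its side effects, `foreach` returns `Unit` and avoids building a transformed collection that `map` would immediately discard:

```scala
object ForeachVsMapSketch {
  def main(args: Array[String]): Unit = {
    val overrideConfs = Map("k1" -> "v1", "k2" -> "v2")
    val applied = scala.collection.mutable.Map.empty[String, String]

    // Side-effecting traversal: returns Unit, allocates nothing beyond the updates themselves.
    overrideConfs.foreach { case (key, value) => applied += key -> value }

    println(applied)
  }
}
```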
spark git commit: [SPARK-13376] [SPARK-13476] [SQL] improve column pruning
Repository: spark Updated Branches: refs/heads/master 264533b55 -> 07f92ef1f [SPARK-13376] [SPARK-13476] [SQL] improve column pruning ## What changes were proposed in this pull request? This PR mostly rewrites the ColumnPruning rule to support most of the SQL logical plans (except those for Dataset). This PR also fixes a bug in Generate (it should always output UnsafeRow) and adds a regression test for that. ## How was this patch tested? This is tested by unit tests, and also manually tested with TPCDS Q78, which now prunes all unused columns successfully, improving performance by 78% (from 22s to 12s). Author: Davies Liu Closes #11354 from davies/fix_column_pruning. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/07f92ef1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/07f92ef1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/07f92ef1 Branch: refs/heads/master Commit: 07f92ef1fa090821bef9c60689bf41909d781ee7 Parents: 264533b Author: Davies Liu Authored: Thu Feb 25 00:13:07 2016 -0800 Committer: Davies Liu Committed: Thu Feb 25 00:13:07 2016 -0800 -- .../sql/catalyst/optimizer/Optimizer.scala | 128 +-- .../catalyst/optimizer/ColumnPruningSuite.scala | 128 ++- .../optimizer/FilterPushdownSuite.scala | 80 .../optimizer/JoinOptimizationSuite.scala | 2 +- .../apache/spark/sql/execution/Generate.scala | 28 ++-- .../columnar/InMemoryColumnarTableScan.scala| 7 +- .../org/apache/spark/sql/DataFrameSuite.scala | 8 ++ 7 files changed, 215 insertions(+), 166 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/07f92ef1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 1f05f20..2b80497 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -313,97 +313,85 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper { */ object ColumnPruning extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { -case a @ Aggregate(_, _, e @ Expand(projects, output, child)) - if (e.outputSet -- a.references).nonEmpty => - val newOutput = output.filter(a.references.contains(_)) - val newProjects = projects.map { proj => -proj.zip(output).filter { case (e, a) => +// Prunes the unused columns from project list of Project/Aggregate/Window/Expand +case p @ Project(_, p2: Project) if (p2.outputSet -- p.references).nonEmpty => + p.copy(child = p2.copy(projectList = p2.projectList.filter(p.references.contains))) +case p @ Project(_, a: Aggregate) if (a.outputSet -- p.references).nonEmpty => + p.copy( +child = a.copy(aggregateExpressions = a.aggregateExpressions.filter(p.references.contains))) +case p @ Project(_, w: Window) if (w.outputSet -- p.references).nonEmpty => + p.copy(child = w.copy( +projectList = w.projectList.filter(p.references.contains), +windowExpressions = w.windowExpressions.filter(p.references.contains))) +case a @ Project(_, e @ Expand(_, _, grandChild)) if (e.outputSet -- a.references).nonEmpty => + val newOutput = e.output.filter(a.references.contains(_)) + val newProjects = e.projections.map { proj => +proj.zip(e.output).filter { case (e, a) => newOutput.contains(a) }.unzip._1 } - a.copy(child = 
Expand(newProjects, newOutput, child)) + a.copy(child = Expand(newProjects, newOutput, grandChild)) +// TODO: support some logical plan for Dataset -case a @ Aggregate(_, _, e @ Expand(_, _, child)) - if (child.outputSet -- e.references -- a.references).nonEmpty => - a.copy(child = e.copy(child = prunedChild(child, e.references ++ a.references))) - -// Eliminate attributes that are not needed to calculate the specified aggregates. +// Prunes the unused columns from child of Aggregate/Window/Expand/Generate case a @ Aggregate(_, _, child) if (child.outputSet -- a.references).nonEmpty => - a.copy(child = Project(a.references.toSeq, child)) - -// Eliminate attributes that are not needed to calculate the Generate. + a.copy(child = prunedChild(child, a.references)) +case w @ Window(_, _, _, _, child) if (child.outputSet -- w.references).nonEmpty => +
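To illustrate what the rewritten `ColumnPruning` rule is meant to achieve (a sketch, not taken from the patch; object and column names are made up): only the columns a query actually references should be carried below each operator, so a narrow `Project` is inserted under, say, an aggregate that touches a single column:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object ColumnPruningSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("column-pruning-sketch"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // A "wide" relation with three columns, of which the query below uses only one.
    val wide = sc.parallelize(1 to 100).map(i => (i, i * 2, s"name-$i")).toDF("a", "b", "c")

    // Only column "a" feeds the aggregate; with effective pruning the optimized plan
    // (visible via explain(true)) projects away "b" and "c" before they reach it.
    val q = wide.groupBy().sum("a")
    q.explain(true)
    sc.stop()
  }
}
```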