spark git commit: [SPARK-13503][SQL] Support to specify the (writing) option for compression codec for TEXT

2016-02-25 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 26ac60806 -> 9812a24aa


[SPARK-13503][SQL] Support to specify the (writing) option for compression 
codec for TEXT

## What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-13503
This PR lets the TEXT datasource compress its output via an option instead of 
requiring Hadoop configurations to be set manually.
For resolving codecs by short name, it follows the same approach as 
https://github.com/apache/spark/pull/10805 and 
https://github.com/apache/spark/pull/10858.
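
For context, a minimal usage sketch of the new behavior (illustrative only and not part of this diff; the `compression` option name and the `gzip` short name follow the pattern of the CSV/JSON PRs linked above):

```scala
// Hedged sketch: write a single string column as gzip-compressed text via the
// write option, instead of setting mapreduce.output.fileoutputformat.* keys by hand.
import org.apache.spark.sql.SQLContext

def writeCompressedText(sqlContext: SQLContext): Unit = {
  import sqlContext.implicits._
  val df = Seq("a", "b", "c").toDF("value")
  df.write
    .option("compression", "gzip")        // short codec name resolved by CompressionCodecs
    .text("/tmp/compressed-text-output")  // hypothetical output path
}
```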

## How was this patch tested?

This was tested with unit tests and with `dev/run_tests` for coding style.

Author: hyukjinkwon 

Closes #11384 from HyukjinKwon/SPARK-13503.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9812a24a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9812a24a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9812a24a

Branch: refs/heads/master
Commit: 9812a24aa80713cd7a8d89f6abe5aadf1e567cc2
Parents: 26ac608
Author: hyukjinkwon 
Authored: Thu Feb 25 23:57:29 2016 -0800
Committer: Reynold Xin 
Committed: Thu Feb 25 23:57:29 2016 -0800

--
 .../datasources/CompressionCodecs.scala | 14 
 .../execution/datasources/csv/CSVRelation.scala | 36 +---
 .../datasources/json/JSONRelation.scala |  6 +---
 .../datasources/text/DefaultSource.scala| 13 +--
 .../execution/datasources/text/TextSuite.scala  | 15 
 5 files changed, 57 insertions(+), 27 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9812a24a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala
index e683a95..bc8ef4a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.{BZip2Codec, GzipCodec, Lz4Codec, 
SnappyCodec}
 
 import org.apache.spark.util.Utils
@@ -44,4 +46,16 @@ private[datasources] object CompressionCodecs {
   s"is not available. Known codecs are 
${shortCompressionCodecNames.keys.mkString(", ")}.")
 }
   }
+
+  /**
+   * Set compression configurations to Hadoop `Configuration`.
+   * `codec` should be a full class path
+   */
+  def setCodecConfiguration(conf: Configuration, codec: String): Unit = {
+    conf.set("mapreduce.output.fileoutputformat.compress", "true")
+    conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
+    conf.set("mapreduce.output.fileoutputformat.compress.codec", codec)
+    conf.set("mapreduce.map.output.compress", "true")
+    conf.set("mapreduce.map.output.compress.codec", codec)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9812a24a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
index da945c4..e9afee1 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
@@ -24,7 +24,6 @@ import scala.util.control.NonFatal
 import com.google.common.base.Objects
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
-import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.mapred.TextInputFormat
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
 import org.apache.hadoop.mapreduce.RecordWriter
@@ -34,6 +33,7 @@ import org.apache.spark.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.CompressionCodecs
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 
@@ -50,16 +50,16 @@ private[sql] class CSVRelation(
 case None => inferSchema(paths)
   }
 
-  private val params = new CSVOptions(parameters)
+  private val options 

spark git commit: [SPARK-13487][SQL] User-facing RuntimeConfig interface

2016-02-25 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/master 8afe49141 -> 26ac60806


[SPARK-13487][SQL] User-facing RuntimeConfig interface

## What changes were proposed in this pull request?
This patch creates the public API for runtime configuration and an 
implementation of it. The public runtime configuration covers the existing SQL 
configs as well as Hadoop Configuration settings.

This new interface is currently dead code. It will be wired into SQLContext and 
the new session entry point to Spark once that is added.
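
A small usage sketch against the interface added below (illustrative only; how the `RuntimeConfig` instance is obtained is left abstract here, since this patch deliberately leaves the interface unwired):

```scala
// Sketch exercising the RuntimeConfig API shown in the diff; obtaining `conf`
// is out of scope for this patch, so it is simply passed in as a parameter.
import org.apache.spark.sql.RuntimeConfig

def demo(conf: RuntimeConfig): Unit = {
  conf.set("spark.sql.shuffle.partitions", 10L)        // Spark/SQL runtime config
  conf.setHadoop("fs.s3a.connection.maximum", "100")   // passed directly to Hadoop during I/O
  val partitions = conf.get("spark.sql.shuffle.partitions")
  val codec: Option[String] = conf.getOption("spark.sql.parquet.compression.codec")
  println(s"partitions=$partitions, codec=$codec")
  conf.unset("spark.sql.shuffle.partitions")
}
```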

## How was this patch tested?
a new unit test suite

Author: Reynold Xin 

Closes #11378 from rxin/SPARK-13487.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/26ac6080
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/26ac6080
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/26ac6080

Branch: refs/heads/master
Commit: 26ac60806cc23527ea8a75986c1eab83d312a15d
Parents: 8afe491
Author: Reynold Xin 
Authored: Thu Feb 25 23:10:40 2016 -0800
Committer: Yin Huai 
Committed: Thu Feb 25 23:10:40 2016 -0800

--
 .../org/apache/spark/sql/RuntimeConfig.scala| 100 +++
 .../spark/sql/internal/RuntimeConfigImpl.scala  |  73 ++
 .../org/apache/spark/sql/internal/SQLConf.scala |   3 +-
 .../spark/sql/internal/RuntimeConfigSuite.scala |  86 
 4 files changed, 261 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/26ac6080/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
new file mode 100644
index 000..e90a042
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+/**
+ * Runtime configuration interface for Spark. To access this, use 
`SparkSession.conf`.
+ *
+ * @since 2.0.0
+ */
+abstract class RuntimeConfig {
+
+  /**
+   * Sets the given Spark runtime configuration property.
+   *
+   * @since 2.0.0
+   */
+  def set(key: String, value: String): RuntimeConfig
+
+  /**
+   * Sets the given Spark runtime configuration property.
+   *
+   * @since 2.0.0
+   */
+  def set(key: String, value: Boolean): RuntimeConfig
+
+  /**
+   * Sets the given Spark runtime configuration property.
+   *
+   * @since 2.0.0
+   */
+  def set(key: String, value: Long): RuntimeConfig
+
+  /**
+   * Returns the value of Spark runtime configuration property for the given 
key.
+   *
+   * @throws NoSuchElementException if the key is not set and does not have a 
default value
+   * @since 2.0.0
+   */
+  @throws[NoSuchElementException]("if the key is not set")
+  def get(key: String): String
+
+  /**
+   * Returns the value of Spark runtime configuration property for the given 
key.
+   *
+   * @since 2.0.0
+   */
+  def getOption(key: String): Option[String]
+
+  /**
+   * Resets the configuration property for the given key.
+   *
+   * @since 2.0.0
+   */
+  def unset(key: String): Unit
+
+  /**
+   * Sets the given Hadoop configuration property. This is passed directly to 
Hadoop during I/O.
+   *
+   * @since 2.0.0
+   */
+  def setHadoop(key: String, value: String): RuntimeConfig
+
+  /**
+   * Returns the value of the Hadoop configuration property.
+   *
+   * @throws NoSuchElementException if the key is not set
+   * @since 2.0.0
+   */
+  @throws[NoSuchElementException]("if the key is not set")
+  def getHadoop(key: String): String
+
+  /**
+   * Returns the value of the Hadoop configuration property.
+   *
+   * @since 2.0.0
+   */
+  def getHadoopOption(key: String): Option[String]
+
+  /**
+   * Resets the Hadoop configuration property for the given key.
+   *
+   * @since 2.0.0
+   */
+  def unsetHadoop(key: String): Unit
+}


spark git commit: [SPARK-12941][SQL][MASTER] Spark-SQL JDBC Oracle dialect fails to map string datatypes to Oracle VARCHAR datatype

2016-02-25 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/master 50e60e36f -> 8afe49141


[SPARK-12941][SQL][MASTER] Spark-SQL JDBC Oracle dialect fails to map string 
datatypes to Oracle VARCHAR datatype

## What changes were proposed in this pull request?

This pull request fixes SPARK-12941 by adding a data type mapping to Oracle for 
the DataFrame `StringType`. This PR targets the master branch; another PR has 
already been tested against branch 1.4.
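
The `OracleDialect` change itself is not reproduced in this excerpt; as a hedged sketch, a `StringType` mapping in the `JdbcDialects` API typically looks like the following (the exact `VARCHAR2` length used by the real patch is an assumption here):

```scala
// Hedged sketch of a StringType -> VARCHAR2 mapping for a JDBC dialect; the
// VARCHAR2(255) size is an assumed value, not taken from the patch.
import java.sql.Types
import org.apache.spark.sql.jdbc.JdbcType
import org.apache.spark.sql.types.{DataType, StringType}

def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
  case StringType => Some(JdbcType("VARCHAR2(255)", Types.VARCHAR))
  case _          => None
}
```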

## How was this patch tested?

This patch was tested using the Oracle Docker image; a new integration suite 
was created for it. Since no ojdbc JDBC jar is available in the Maven 
repository, the jar was downloaded manually from the Oracle site and installed 
into the local repository for testing. For SparkQA runs, the ojdbc jar may 
therefore need to be placed manually in the local Maven repository 
(com/oracle/ojdbc6/11.2.0.2.0).

Author: thomastechs 

Closes #11306 from thomastechs/master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8afe4914
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8afe4914
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8afe4914

Branch: refs/heads/master
Commit: 8afe49141d9b6a603eb3907f32dce802a3d05172
Parents: 50e60e3
Author: thomastechs 
Authored: Thu Feb 25 22:52:25 2016 -0800
Committer: Yin Huai 
Committed: Thu Feb 25 22:52:25 2016 -0800

--
 docker-integration-tests/pom.xml| 13 
 .../spark/sql/jdbc/OracleIntegrationSuite.scala | 80 
 .../apache/spark/sql/jdbc/OracleDialect.scala   |  5 ++
 3 files changed, 98 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8afe4914/docker-integration-tests/pom.xml
--
diff --git a/docker-integration-tests/pom.xml b/docker-integration-tests/pom.xml
index 833ca29..048e58d 100644
--- a/docker-integration-tests/pom.xml
+++ b/docker-integration-tests/pom.xml
@@ -131,6 +131,19 @@
   postgresql
   test
 
+
+
 

spark git commit: [SPARK-13504] [SPARKR] Add approxQuantile for SparkR

2016-02-25 Thread meng
Repository: spark
Updated Branches:
  refs/heads/master f3be369ef -> 50e60e36f


[SPARK-13504] [SPARKR] Add approxQuantile for SparkR

## What changes were proposed in this pull request?
Add ```approxQuantile``` for SparkR.

## How was this patch tested?
Unit tests.

Author: Yanbo Liang 

Closes #11383 from yanboliang/spark-13504 and squashes the following commits:

4f17adb [Yanbo Liang] Add approxQuantile for SparkR


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/50e60e36
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/50e60e36
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/50e60e36

Branch: refs/heads/master
Commit: 50e60e36f7775a10cf39338e7c5716578a24d89f
Parents: f3be369
Author: Yanbo Liang 
Authored: Thu Feb 25 21:23:41 2016 -0800
Committer: Xiangrui Meng 
Committed: Thu Feb 25 21:23:41 2016 -0800

--
 R/pkg/NAMESPACE   |  1 +
 R/pkg/R/generics.R|  7 +
 R/pkg/R/stats.R   | 39 ++
 R/pkg/inst/tests/testthat/test_sparkSQL.R |  8 ++
 4 files changed, 55 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/50e60e36/R/pkg/NAMESPACE
--
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 6a3d63f..636d39e 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -111,6 +111,7 @@ exportMethods("%in%",
   "add_months",
   "alias",
   "approxCountDistinct",
+  "approxQuantile",
   "array_contains",
   "asc",
   "ascii",

http://git-wip-us.apache.org/repos/asf/spark/blob/50e60e36/R/pkg/R/generics.R
--
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index ab61bce..3db72b5 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -67,6 +67,13 @@ setGeneric("crosstab", function(x, col1, col2) { 
standardGeneric("crosstab") })
 # @export
 setGeneric("freqItems", function(x, cols, support = 0.01) { 
standardGeneric("freqItems") })
 
+# @rdname statfunctions
+# @export
+setGeneric("approxQuantile",
+   function(x, col, probabilities, relativeError) {
+ standardGeneric("approxQuantile")
+   })
+
 # @rdname distinct
 # @export
 setGeneric("distinct", function(x, numPartitions = 1) { 
standardGeneric("distinct") })

http://git-wip-us.apache.org/repos/asf/spark/blob/50e60e36/R/pkg/R/stats.R
--
diff --git a/R/pkg/R/stats.R b/R/pkg/R/stats.R
index 2e80768..edf7293 100644
--- a/R/pkg/R/stats.R
+++ b/R/pkg/R/stats.R
@@ -130,6 +130,45 @@ setMethod("freqItems", signature(x = "DataFrame", cols = 
"character"),
 collect(dataFrame(sct))
   })
 
+#' approxQuantile
+#'
+#' Calculates the approximate quantiles of a numerical column of a DataFrame.
+#'
+#' The result of this algorithm has the following deterministic bound:
+#' If the DataFrame has N elements and if we request the quantile at 
probability `p` up to error
+#' `err`, then the algorithm will return a sample `x` from the DataFrame so 
that the *exact* rank
+#' of `x` is close to (p * N). More precisely,
+#'   floor((p - err) * N) <= rank(x) <= ceil((p + err) * N).
+#' This method implements a variation of the Greenwald-Khanna algorithm (with 
some speed
+#' optimizations). The algorithm was first present in 
[[http://dx.doi.org/10.1145/375663.375670
+#' Space-efficient Online Computation of Quantile Summaries]] by Greenwald and 
Khanna.
+#'
+#' @param x A SparkSQL DataFrame.
+#' @param col The name of the numerical column.
+#' @param probabilities A list of quantile probabilities. Each number must 
belong to [0, 1].
+#'  For example 0 is the minimum, 0.5 is the median, 1 is 
the maximum.
+#' @param relativeError The relative target precision to achieve (>= 0). If 
set to zero,
+#'  the exact quantiles are computed, which could be very 
expensive.
+#'  Note that values greater than 1 are accepted but give 
the same result as 1.
+#' @return The approximate quantiles at the given probabilities.
+#'
+#' @rdname statfunctions
+#' @name approxQuantile
+#' @export
+#' @examples
+#' \dontrun{
+#' df <- jsonFile(sqlContext, "/path/to/file.json")
+#' quantiles <- approxQuantile(df, "key", c(0.5, 0.8), 0.0)
+#' }
+setMethod("approxQuantile",
+  signature(x = "DataFrame", col = "character",
+probabilities = "numeric", relativeError = "numeric"),
+  function(x, col, probabilities, relativeError) {
+statFunctions <- callJMethod(x@sdf, "stat")
+

spark git commit: [SPARK-12363] [MLLIB] [BACKPORT-1.3] Remove setRun and fix PowerIterationClustering failed test

2016-02-25 Thread meng
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 6ddde8eda -> 65cc451c8


[SPARK-12363] [MLLIB] [BACKPORT-1.3] Remove setRun and fix 
PowerIterationClustering failed test

## What changes were proposed in this pull request?

Backport JIRA-SPARK-12363 to branch-1.3.

## How was this patch tested?

Unit test.

cc mengxr

Author: Liang-Chi Hsieh 
Author: Xiangrui Meng 

Closes #11265 from viirya/backport-12363-1.3 and squashes the following commits:

ec076dd [Liang-Chi Hsieh] Fix scala style.
7a3ef5f [Xiangrui Meng] use Graph instead of GraphImpl and update tests and 
example based on PIC paper
b86018d [Liang-Chi Hsieh] Remove setRun and fix PowerIterationClustering failed 
test.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/65cc451c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/65cc451c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/65cc451c

Branch: refs/heads/branch-1.3
Commit: 65cc451c89fd03daed8c315a91d93067ccdc3a5c
Parents: 6ddde8e
Author: Liang-Chi Hsieh 
Authored: Thu Feb 25 21:15:59 2016 -0800
Committer: Xiangrui Meng 
Committed: Thu Feb 25 21:15:59 2016 -0800

--
 .../mllib/PowerIterationClusteringExample.scala | 53 
 .../clustering/PowerIterationClustering.scala   | 18 ---
 .../PowerIterationClusteringSuite.scala | 48 +++---
 3 files changed, 60 insertions(+), 59 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/65cc451c/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala
--
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala
 
b/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala
index 3f98ddd..c5b171d 100644
--- 
a/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala
+++ 
b/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala
@@ -39,27 +39,23 @@ import org.apache.spark.{SparkConf, SparkContext}
  *   n:  Number of sampled points on innermost circle.. There are 
proportionally more points
  *  within the outer/larger circles
  *   maxIterations:   Number of Power Iterations
- *   outerRadius:  radius of the outermost of the concentric circles
  * }}}
  *
  * Here is a sample run and output:
  *
- * ./bin/run-example mllib.PowerIterationClusteringExample -k 3 --n 30 
--maxIterations 15
- *
- * Cluster assignments: 1 -> [0,1,2,3,4],2 -> [5,6,7,8,9,10,11,12,13,14],
- * 0 -> [15,16,17,18,19,20,21,22,23,24,25,26,27,28,29]
+ * ./bin/run-example mllib.PowerIterationClusteringExample -k 2 --n 10 
--maxIterations 15
  *
+ * Cluster assignments: 1 -> [0,1,2,3,4,5,6,7,8,9],
+ *   0 -> [10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29]
  *
  * If you use it as a template to create your own app, please use 
`spark-submit` to submit your app.
  */
 object PowerIterationClusteringExample {
 
   case class Params(
-  input: String = null,
-  k: Int = 3,
-  numPoints: Int = 5,
-  maxIterations: Int = 10,
-  outerRadius: Double = 3.0
+  k: Int = 2,
+  numPoints: Int = 10,
+  maxIterations: Int = 15
 ) extends AbstractParams[Params]
 
   def main(args: Array[String]) {
@@ -68,7 +64,7 @@ object PowerIterationClusteringExample {
 val parser = new OptionParser[Params]("PowerIterationClusteringExample") {
   head("PowerIterationClusteringExample: an example PIC app using 
concentric circles.")
   opt[Int]('k', "k")
-.text(s"number of circles (/clusters), default: ${defaultParams.k}")
+.text(s"number of circles (clusters), default: ${defaultParams.k}")
 .action((x, c) => c.copy(k = x))
   opt[Int]('n', "n")
 .text(s"number of points in smallest circle, default: 
${defaultParams.numPoints}")
@@ -76,9 +72,6 @@ object PowerIterationClusteringExample {
   opt[Int]("maxIterations")
 .text(s"number of iterations, default: ${defaultParams.maxIterations}")
 .action((x, c) => c.copy(maxIterations = x))
-  opt[Double]('r', "r")
-.text(s"radius of outermost circle, default: 
${defaultParams.outerRadius}")
-.action((x, c) => c.copy(outerRadius = x))
 }
 
 parser.parse(args, defaultParams).map { params =>
@@ -96,20 +89,21 @@ object PowerIterationClusteringExample {
 
 Logger.getRootLogger.setLevel(Level.WARN)
 
-val circlesRdd = generateCirclesRdd(sc, params.k, params.numPoints, 
params.outerRadius)
+val circlesRdd = generateCirclesRdd(sc, params.k, params.numPoints)
 val model = new PowerIterationClustering()
 

spark git commit: [SPARK-13028] [ML] Add MaxAbsScaler to ML.feature as a transformer

2016-02-25 Thread meng
Repository: spark
Updated Branches:
  refs/heads/master 1b39fafa7 -> 90d07154c


[SPARK-13028] [ML] Add MaxAbsScaler to ML.feature as a transformer

jira: https://issues.apache.org/jira/browse/SPARK-13028
MaxAbsScaler works much like MinMaxScaler, but rescales each feature so that the 
training data lies within the range [-1, 1], by dividing through the largest 
maximum absolute value of that feature. The motivation for this scaling includes 
robustness to very small standard deviations of features and preservation of 
zero entries in sparse data.

Unlike StandardScaler and MinMaxScaler, MaxAbsScaler does not shift/center the 
data, and thus does not destroy any sparsity.

Something similar from sklearn:
http://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.MaxAbsScaler.html#sklearn.preprocessing.MaxAbsScaler
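
A short usage sketch of the new estimator (assumed typical spark.ml usage; the column names and toy data are illustrative):

```scala
// Sketch: fit MaxAbsScaler on a vector column and rescale each feature into [-1, 1].
import org.apache.spark.ml.feature.MaxAbsScaler
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.SQLContext

def demo(sqlContext: SQLContext): Unit = {
  val df = sqlContext.createDataFrame(Seq(
    (0, Vectors.dense(1.0, -8.0)),
    (1, Vectors.dense(2.0, 4.0))
  )).toDF("id", "features")

  val scaler = new MaxAbsScaler()
    .setInputCol("features")
    .setOutputCol("scaledFeatures")

  // Each column is divided by its maximum absolute value; zeros stay zero,
  // so sparsity is preserved.
  scaler.fit(df).transform(df).show()
}
```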

Author: Yuhao Yang 

Closes #10939 from hhbyyh/maxabs and squashes the following commits:

fd8bdcd [Yuhao Yang] add tag and some optimization on fit
648fced [Yuhao Yang] Merge remote-tracking branch 'upstream/master' into maxabs
75bebc2 [Yuhao Yang] Merge remote-tracking branch 'upstream/master' into maxabs
cb10bb6 [Yuhao Yang] remove minmax
91ef8f3 [Yuhao Yang] ut added
8ab0747 [Yuhao Yang] Merge remote-tracking branch 'upstream/master' into maxabs
a9215b5 [Yuhao Yang] max abs scaler


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/90d07154
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/90d07154
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/90d07154

Branch: refs/heads/master
Commit: 90d07154c2cef3d1095cb3caeafa7003218a3e49
Parents: 1b39faf
Author: Yuhao Yang 
Authored: Thu Feb 25 21:04:35 2016 -0800
Committer: Xiangrui Meng 
Committed: Thu Feb 25 21:04:35 2016 -0800

--
 .../apache/spark/ml/feature/MaxAbsScaler.scala  | 176 +++
 .../spark/ml/feature/MaxAbsScalerSuite.scala|  70 
 2 files changed, 246 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/90d07154/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala 
b/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala
new file mode 100644
index 000..15c308b
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.feature
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.ml.{Estimator, Model}
+import org.apache.spark.ml.param.{ParamMap, Params}
+import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol}
+import org.apache.spark.ml.util._
+import org.apache.spark.mllib.linalg.{Vector, Vectors, VectorUDT}
+import org.apache.spark.mllib.stat.Statistics
+import org.apache.spark.sql._
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.types.{StructField, StructType}
+
+/**
+ * Params for [[MaxAbsScaler]] and [[MaxAbsScalerModel]].
+ */
+private[feature] trait MaxAbsScalerParams extends Params with HasInputCol with 
HasOutputCol {
+
+   /** Validates and transforms the input schema. */
+  protected def validateAndTransformSchema(schema: StructType): StructType = {
+validateParams()
+val inputType = schema($(inputCol)).dataType
+require(inputType.isInstanceOf[VectorUDT],
+  s"Input column ${$(inputCol)} must be a vector column")
+require(!schema.fieldNames.contains($(outputCol)),
+  s"Output column ${$(outputCol)} already exists.")
+val outputFields = schema.fields :+ StructField($(outputCol), new 
VectorUDT, false)
+StructType(outputFields)
+  }
+}
+
+/**
+ * :: Experimental ::
+ * Rescale each feature individually to range [-1, 1] by dividing through the 
largest maximum
+ * absolute value in each feature. It does not 

spark git commit: [SPARK-13361][SQL] Add benchmark codes for Encoder#compress() in CompressionSchemeBenchmark

2016-02-25 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 633d63a48 -> 1b39fafa7


[SPARK-13361][SQL] Add benchmark codes for Encoder#compress() in 
CompressionSchemeBenchmark

This PR adds benchmark code for Encoder#compress(); the `Benchmark` pattern it 
uses is sketched below.
It also replaces the recorded benchmark results with new ones, because the 
output format of `Benchmark` has changed.
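
For reference, a minimal sketch of that pattern (the case body is a placeholder for the prepared compress call):

```scala
// Sketch of the org.apache.spark.util.Benchmark usage mirrored from the diff:
// register one case per compression scheme, then run() prints a comparison table.
import org.apache.spark.util.Benchmark

def sketch(): Unit = {
  val iters = 1024
  val count = 65536
  val benchmark = new Benchmark("INT Encode", iters.toLong * count)
  benchmark.addCase("RunLengthEncoding") { _ =>
    // placeholder: the real suite invokes the prepared compressFunc(input, buf) here
  }
  benchmark.run()
}
```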

Author: Takeshi YAMAMURO 

Closes #11236 from maropu/CompressionSpike.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1b39fafa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1b39fafa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1b39fafa

Branch: refs/heads/master
Commit: 1b39fafa75a162f183824ff2daa61d73b05ebc83
Parents: 633d63a
Author: Takeshi YAMAMURO 
Authored: Thu Feb 25 20:17:48 2016 -0800
Committer: Reynold Xin 
Committed: Thu Feb 25 20:17:48 2016 -0800

--
 .../CompressionSchemeBenchmark.scala| 282 +--
 1 file changed, 193 insertions(+), 89 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1b39fafa/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala
index 95eb5cf..a5d 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala
@@ -17,19 +17,13 @@
 
 package org.apache.spark.sql.execution.columnar.compression
 
-import java.nio.ByteBuffer
-import java.nio.ByteOrder
+import java.nio.{ByteBuffer, ByteOrder}
 
 import org.apache.commons.lang3.RandomStringUtils
 import org.apache.commons.math3.distribution.LogNormalDistribution
 
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, 
GenericMutableRow}
-import org.apache.spark.sql.execution.columnar.BOOLEAN
-import org.apache.spark.sql.execution.columnar.INT
-import org.apache.spark.sql.execution.columnar.LONG
-import org.apache.spark.sql.execution.columnar.NativeColumnType
-import org.apache.spark.sql.execution.columnar.SHORT
-import org.apache.spark.sql.execution.columnar.STRING
+import org.apache.spark.sql.execution.columnar.{BOOLEAN, INT, LONG, 
NativeColumnType, SHORT, STRING}
 import org.apache.spark.sql.types.AtomicType
 import org.apache.spark.util.Benchmark
 import org.apache.spark.util.Utils._
@@ -53,35 +47,70 @@ object CompressionSchemeBenchmark extends 
AllCompressionSchemes {
 () => rng.sample
   }
 
-  private[this] def runBenchmark[T <: AtomicType](
+  private[this] def prepareEncodeInternal[T <: AtomicType](
+count: Int,
+tpe: NativeColumnType[T],
+supportedScheme: CompressionScheme,
+input: ByteBuffer): ((ByteBuffer, ByteBuffer) => ByteBuffer, Double, 
ByteBuffer) = {
+assert(supportedScheme.supports(tpe))
+
+def toRow(d: Any) = new GenericInternalRow(Array[Any](d))
+val encoder = supportedScheme.encoder(tpe)
+for (i <- 0 until count) {
+  encoder.gatherCompressibilityStats(toRow(tpe.extract(input)), 0)
+}
+input.rewind()
+
+val compressedSize = if (encoder.compressedSize == 0) {
+  input.remaining()
+} else {
+  encoder.compressedSize
+}
+
+(encoder.compress, encoder.compressionRatio, allocateLocal(4 + 
compressedSize))
+  }
+
+  private[this] def runEncodeBenchmark[T <: AtomicType](
   name: String,
   iters: Int,
   count: Int,
   tpe: NativeColumnType[T],
   input: ByteBuffer): Unit = {
-
 val benchmark = new Benchmark(name, iters * count)
 
 schemes.filter(_.supports(tpe)).map { scheme =>
-  def toRow(d: Any) = new GenericInternalRow(Array[Any](d))
-  val encoder = scheme.encoder(tpe)
-  for (i <- 0 until count) {
-encoder.gatherCompressibilityStats(toRow(tpe.extract(input)), 0)
-  }
-  input.rewind()
+  val (compressFunc, compressionRatio, buf) = prepareEncodeInternal(count, 
tpe, scheme, input)
+  val label = 
s"${getFormattedClassName(scheme)}(${compressionRatio.formatted("%.3f")})"
 
-  val label = 
s"${getFormattedClassName(scheme)}(${encoder.compressionRatio.formatted("%.3f")})"
   benchmark.addCase(label)({ i: Int =>
-val compressedSize = if (encoder.compressedSize == 0) {
-  input.remaining()
-} else {
-  encoder.compressedSize
+for (n <- 0L until iters) {
+  compressFunc(input, buf)
+  input.rewind()
+ 

[2/2] spark git commit: [SPARK-12757] Add block-level read/write locks to BlockManager

2016-02-25 Thread andrewor14
[SPARK-12757] Add block-level read/write locks to BlockManager

## Motivation

As a pre-requisite to off-heap caching of blocks, we need a mechanism to 
prevent pages / blocks from being evicted while they are being read. With 
on-heap objects, evicting a block while it is being read merely leads to 
memory-accounting problems (because we assume that an evicted block is a 
candidate for garbage-collection, which will not be true during a read), but 
with off-heap memory this will lead to either data corruption or segmentation 
faults.

## Changes

### BlockInfoManager and reader/writer locks

This patch adds block-level read/write locks to the BlockManager. It introduces 
a new `BlockInfoManager` component, which is contained within the 
`BlockManager`, holds the `BlockInfo` objects that the `BlockManager` uses for 
tracking block metadata, and exposes APIs for locking blocks in either shared 
read or exclusive write modes.

`BlockManager`'s `get*()` and `put*()` methods now implicitly acquire the 
necessary locks. After a `get()` call successfully retrieves a block, that 
block is locked in a shared read mode. A `put()` call will block until it 
acquires an exclusive write lock. If the write succeeds, the write lock will be 
downgraded to a shared read lock before returning to the caller. This `put()` 
locking behavior allows us to store a block and then immediately turn around and 
read it without having to worry about it having been evicted between the write 
and the read, which will allow us to significantly simplify `CacheManager` in 
the future (see #10748).

See `BlockInfoManagerSuite`'s test cases for a more detailed specification of 
the locking semantics.
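
To make the put()-then-read downgrade concrete, here is a thread-level illustration using `ReentrantReadWriteLock`; this is not Spark code, since `BlockInfoManager` implements the analogous semantics itself at the task level for the reasons given in the FAQ below:

```scala
import java.util.concurrent.locks.ReentrantReadWriteLock

// Illustration only: the classic write-to-read lock downgrade that put() mimics.
// Spark tracks locks per task rather than per thread, so it cannot reuse this class.
object LockDowngradeSketch {
  private val lock = new ReentrantReadWriteLock()
  private var cached: Option[String] = None

  def putAndRead(value: String): String = {
    lock.writeLock().lock()
    try {
      cached = Some(value)      // exclusive write
      lock.readLock().lock()    // downgrade: take the read lock before releasing the write lock
    } finally {
      lock.writeLock().unlock()
    }
    try {
      cached.get                // shared read; no writer can "evict" the value here
    } finally {
      lock.readLock().unlock()
    }
  }
}
```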

### Auto-release of locks at the end of tasks

Our locking APIs support explicit release of locks (by calling `unlock()`), but 
it's not always possible to guarantee that locks will be released prior to the 
end of the task. One reason for this is our iterator interface: since our 
iterators don't support an explicit `close()` operator to signal that no more 
records will be consumed, operations like `take()` or `limit()` don't have a 
good means to release locks on their input iterators' blocks. Another example 
is broadcast variables, whose block locks can only be released at the end of 
the task.

To address this, `BlockInfoManager` uses a pair of maps to track the set of 
locks acquired by each task. Lock acquisitions automatically record the current 
task attempt id by obtaining it from `TaskContext`. When a task finishes, code 
in `Executor` calls `BlockInfoManager.unlockAllLocksForTask(taskAttemptId)` to 
free locks.

### Locking and the MemoryStore

In order to prevent in-memory blocks from being evicted while they are being 
read, the `MemoryStore`'s `evictBlocksToFreeSpace()` method acquires write 
locks on blocks which it is considering as candidates for eviction. These lock 
acquisitions are non-blocking, so a block which is being read will not be 
evicted. By holding write locks until the eviction is performed or skipped (in 
case evicting the blocks would not free enough memory), we avoid a race where a 
new reader starts to read a block after the block has been marked as an 
eviction candidate but before it has been removed.

### Locking and remote block transfer

This patch makes small changes to block transfer and network layer code so 
that locks acquired by the BlockTransferService are released as soon as block 
transfer messages are consumed and released by Netty. This builds on top of 
#11193, a bug fix related to freeing of network layer ManagedBuffers.

## FAQ

- **Why not use Java's built-in 
[`ReadWriteLock`](https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/ReadWriteLock.html)?**

  Our locks operate on a per-task rather than per-thread level. Under certain 
circumstances a task may consist of multiple threads, so using `ReadWriteLock` 
would mean that we might call `unlock()` from a thread which didn't hold the 
lock in question, an operation which has undefined semantics. If we could rely 
on Java 8 classes, we might be able to use 
[`StampedLock`](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/StampedLock.html)
 to work around this issue.

- **Why not detect "leaked" locks in tests?**:

  See above notes about `take()` and `limit`.

Author: Josh Rosen 

Closes #10705 from JoshRosen/pin-pages.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/633d63a4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/633d63a4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/633d63a4

Branch: refs/heads/master
Commit: 633d63a48ad98754dc7c56f9ac150fc2aa4e42c5
Parents: 7129957
Author: Josh Rosen 
Authored: Thu Feb 25 17:17:56 2016 -0800
Committer: Andrew Or 
Committed: Thu Feb 25 17:17:56 2016 -0800


[1/2] spark git commit: [SPARK-12757] Add block-level read/write locks to BlockManager

2016-02-25 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master 712995757 -> 633d63a48


http://git-wip-us.apache.org/repos/asf/spark/blob/633d63a4/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
--
diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index e1b2c96..e4ab9ee 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -45,6 +45,8 @@ import org.apache.spark.util._
 class BlockManagerSuite extends SparkFunSuite with Matchers with 
BeforeAndAfterEach
   with PrivateMethodTester with ResetSystemProperties {
 
+  import BlockManagerSuite._
+
   var conf: SparkConf = null
   var store: BlockManager = null
   var store2: BlockManager = null
@@ -66,6 +68,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers 
with BeforeAndAfterE
   maxMem: Long,
   name: String = SparkContext.DRIVER_IDENTIFIER,
   master: BlockManagerMaster = this.master): BlockManager = {
+val serializer = new KryoSerializer(conf)
 val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 
1)
 val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, 
numCores = 1)
 val blockManager = new BlockManager(name, rpcEnv, master, serializer, conf,
@@ -169,14 +172,14 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
 val a3 = new Array[Byte](4000)
 
 // Putting a1, a2  and a3 in memory and telling master only about a1 and a2
-store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
-store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
-store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
+store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY)
+store.putSingleAndReleaseLock("a2", a2, StorageLevel.MEMORY_ONLY)
+store.putSingleAndReleaseLock("a3", a3, StorageLevel.MEMORY_ONLY, 
tellMaster = false)
 
 // Checking whether blocks are in memory
-assert(store.getSingle("a1").isDefined, "a1 was not in store")
-assert(store.getSingle("a2").isDefined, "a2 was not in store")
-assert(store.getSingle("a3").isDefined, "a3 was not in store")
+assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in 
store")
+assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in 
store")
+assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was not in 
store")
 
 // Checking whether master knows about the blocks or not
 assert(master.getLocations("a1").size > 0, "master was not told about a1")
@@ -184,10 +187,10 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
 assert(master.getLocations("a3").size === 0, "master was told about a3")
 
 // Drop a1 and a2 from memory; this should be reported back to the master
-store.dropFromMemory("a1", () => null: Either[Array[Any], ByteBuffer])
-store.dropFromMemory("a2", () => null: Either[Array[Any], ByteBuffer])
-assert(store.getSingle("a1") === None, "a1 not removed from store")
-assert(store.getSingle("a2") === None, "a2 not removed from store")
+store.dropFromMemoryIfExists("a1", () => null: Either[Array[Any], 
ByteBuffer])
+store.dropFromMemoryIfExists("a2", () => null: Either[Array[Any], 
ByteBuffer])
+assert(store.getSingleAndReleaseLock("a1") === None, "a1 not removed from 
store")
+assert(store.getSingleAndReleaseLock("a2") === None, "a2 not removed from 
store")
 assert(master.getLocations("a1").size === 0, "master did not remove a1")
 assert(master.getLocations("a2").size === 0, "master did not remove a2")
   }
@@ -202,7 +205,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers 
with BeforeAndAfterE
 
 val a1 = new Array[Byte](400)
 val a2 = new Array[Byte](400)
-store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_2)
+store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY_2)
 store2.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_2)
 assert(master.getLocations("a1").size === 2, "master did not report 2 
locations for a1")
 assert(master.getLocations("a2").size === 2, "master did not report 2 
locations for a2")
@@ -215,17 +218,17 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
 val a3 = new Array[Byte](4000)
 
 // Putting a1, a2 and a3 in memory and telling master only about a1 and a2
-store.putSingle("a1-to-remove", a1, StorageLevel.MEMORY_ONLY)
-store.putSingle("a2-to-remove", a2, StorageLevel.MEMORY_ONLY)
-store.putSingle("a3-to-remove", a3, StorageLevel.MEMORY_ONLY, tellMaster = 
false)
+store.putSingleAndReleaseLock("a1-to-remove", a1, StorageLevel.MEMORY_ONLY)
+store.putSingleAndReleaseLock("a2-to-remove", a2, 

spark git commit: [SPARK-13387][MESOS] Add support for SPARK_DAEMON_JAVA_OPTS with MesosClusterDispatcher.

2016-02-25 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master f2cfafdfe -> 712995757


[SPARK-13387][MESOS] Add support for SPARK_DAEMON_JAVA_OPTS with 
MesosClusterDispatcher.

## What changes were proposed in this pull request?

Add support for SPARK_DAEMON_JAVA_OPTS with MesosClusterDispatcher.

## How was this patch tested?

Manual testing by launching the dispatcher with SPARK_DAEMON_JAVA_OPTS.

Author: Timothy Chen 

Closes #11277 from tnachen/cluster_dispatcher_opts.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/71299575
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/71299575
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/71299575

Branch: refs/heads/master
Commit: 712995757c22a0bd76e4ccb552446372acf2cc2e
Parents: f2cfafd
Author: Timothy Chen 
Authored: Thu Feb 25 17:07:58 2016 -0800
Committer: Andrew Or 
Committed: Thu Feb 25 17:07:58 2016 -0800

--
 .../org/apache/spark/launcher/SparkClassCommandBuilder.java| 6 --
 1 file changed, 4 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/71299575/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
--
diff --git 
a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
 
b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
index 931a24c..e575fd3 100644
--- 
a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
+++ 
b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
@@ -48,8 +48,8 @@ class SparkClassCommandBuilder extends AbstractCommandBuilder 
{
 String memKey = null;
 String extraClassPath = null;
 
-// Master, Worker, and HistoryServer use SPARK_DAEMON_JAVA_OPTS (and 
specific opts) +
-// SPARK_DAEMON_MEMORY.
+// Master, Worker, HistoryServer, ExternalShuffleService, 
MesosClusterDispatcher use
+// SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
 if (className.equals("org.apache.spark.deploy.master.Master")) {
   javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
   javaOptsKeys.add("SPARK_MASTER_OPTS");
@@ -69,6 +69,8 @@ class SparkClassCommandBuilder extends AbstractCommandBuilder 
{
 } else if 
(className.equals("org.apache.spark.executor.MesosExecutorBackend")) {
   javaOptsKeys.add("SPARK_EXECUTOR_OPTS");
   memKey = "SPARK_EXECUTOR_MEMORY";
+} else if 
(className.equals("org.apache.spark.deploy.mesos.MesosClusterDispatcher")) {
+  javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
 } else if 
(className.equals("org.apache.spark.deploy.ExternalShuffleService") ||
 
className.equals("org.apache.spark.deploy.mesos.MesosExternalShuffleService")) {
   javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");





spark git commit: [SPARK-13501] Remove use of Guava Stopwatch

2016-02-25 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master 7a6ee8a8f -> f2cfafdfe


[SPARK-13501] Remove use of Guava Stopwatch

Our nightly doc snapshot builds are failing due to some issue involving the 
Guava Stopwatch constructor:

```
[error] 
/home/jenkins/workspace/spark-master-docs/spark/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala:496:
 constructor Stopwatch in class Stopwatch cannot be accessed in class 
CoarseMesosSchedulerBackend
[error] val stopwatch = new Stopwatch()
[error] ^
```

This Stopwatch constructor was deprecated in newer versions of Guava 
(https://github.com/google/guava/commit/fd0cbc2c5c90e85fb22c8e86ea19630032090943)
 and it's possible that some classpath issues affecting Unidoc could be causing 
this to trigger compilation failures.

In order to work around this issue, this patch removes this use of Stopwatch 
since we don't use it anywhere else in the Spark codebase.
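
The replacement pattern, in brief (mirrors the diff below; `stillWaiting` and the timeout value are hypothetical stand-ins for the scheduler backend's executor-count check and `shutdownTimeoutMS`):

```scala
// Sketch of the Stopwatch-free timeout loop adopted below: elapsed time is measured
// with System.nanoTime() and compared against a millisecond timeout.
def awaitShutdown(stillWaiting: () => Boolean, shutdownTimeoutMS: Long): Unit = {
  val startTime = System.nanoTime()
  while (stillWaiting() && System.nanoTime() - startTime < shutdownTimeoutMS * 1000L * 1000L) {
    Thread.sleep(100)
  }
}
```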

Author: Josh Rosen 

Closes #11376 from JoshRosen/remove-stopwatch.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f2cfafdf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f2cfafdf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f2cfafdf

Branch: refs/heads/master
Commit: f2cfafdfe0f4b18f31bc63969e2abced1a66e896
Parents: 7a6ee8a
Author: Josh Rosen 
Authored: Thu Feb 25 17:04:43 2016 -0800
Committer: Andrew Or 
Committed: Thu Feb 25 17:04:43 2016 -0800

--
 .../scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala | 7 ++-
 1 file changed, 2 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f2cfafdf/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index f803cc7..622f361 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -19,14 +19,12 @@ package org.apache.spark.scheduler.cluster.mesos
 
 import java.io.File
 import java.util.{Collections, List => JList}
-import java.util.concurrent.TimeUnit
 import java.util.concurrent.locks.ReentrantLock
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.{Buffer, HashMap, HashSet}
 
-import com.google.common.base.Stopwatch
 import org.apache.mesos.{Scheduler => MScheduler, SchedulerDriver}
 import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
 
@@ -493,12 +491,11 @@ private[spark] class CoarseMesosSchedulerBackend(
 
 // Wait for executors to report done, or else mesosDriver.stop() will 
forcefully kill them.
 // See SPARK-12330
-val stopwatch = new Stopwatch()
-stopwatch.start()
+val startTime = System.nanoTime()
 
 // slaveIdsWithExecutors has no memory barrier, so this is eventually 
consistent
 while (numExecutors() > 0 &&
-  stopwatch.elapsed(TimeUnit.MILLISECONDS) < shutdownTimeoutMS) {
+  System.nanoTime() - startTime < shutdownTimeoutMS * 1000L * 1000L) {
   Thread.sleep(100)
 }
 





spark git commit: [SPARK-12009][YARN] Avoid to re-allocating yarn container while driver want to stop all Executors

2016-02-25 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master dc6c5ea4c -> 7a6ee8a8f


[SPARK-12009][YARN] Avoid to re-allocating yarn container while driver want to 
stop all Executors

Author: hushan 

Closes #9992 from suyanNone/tricky.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7a6ee8a8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7a6ee8a8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7a6ee8a8

Branch: refs/heads/master
Commit: 7a6ee8a8fe0fad78416ed7e1ac694959de5c5314
Parents: dc6c5ea
Author: hushan 
Authored: Thu Feb 25 16:57:41 2016 -0800
Committer: Andrew Or 
Committed: Thu Feb 25 16:57:41 2016 -0800

--
 .../org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala | 3 +++
 1 file changed, 3 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7a6ee8a8/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
--
diff --git 
a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
 
b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index 1431bce..ca26277 100644
--- 
a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ 
b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -83,6 +83,9 @@ private[spark] abstract class YarnSchedulerBackend(
 
   override def stop(): Unit = {
 try {
+  // SPARK-12009: To prevent Yarn allocator from requesting backup for the 
executors which
+  // was Stopped by SchedulerBackend.
+  requestTotalExecutors(0, 0, Map.empty)
   super.stop()
 } finally {
   services.stop()





spark git commit: [SPARK-13468][WEB UI] Fix a corner case where the Stage UI page should show DAG but it doesn't show

2016-02-25 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 35316cb0b -> dc6c5ea4c


[SPARK-13468][WEB UI] Fix a corner case where the Stage UI page should show DAG 
but it doesn't show

When a user clicks more than once on any stage in the DAG graph on the *Job* 
web UI page, many new *Stage* web UI pages are opened, but only half of their 
DAG graphs are expanded.

After this PR's fix, every newly opened *Stage* page's DAG graph is expanded.

Before:
![](https://cloud.githubusercontent.com/assets/15843379/13279144/74808e86-db10-11e5-8514-cecf31af8908.png)

After:
![](https://cloud.githubusercontent.com/assets/15843379/13279145/77ca5dec-db10-11e5-9457-8e1985461328.png)

## What changes were proposed in this pull request?

- Removed the `expandDagViz` parameter for the _Stage_ page and the related code
- Added an `onclick` handler that sets the `expandDagVizArrowKey(false)` local-storage key to `true`

## How was this patch tested?

Manual tests (with this fix) verifying that the fix works:
- clicked many times on the _Job_ page's DAG graph → each newly opened _Stage_ 
page's DAG graph is expanded

Manual tests (with this fix) verifying that the fix does not break existing 
behavior:
- refreshed the same _Stage_ page (whose DAG was already expanded) many times → 
the DAG remained expanded on every refresh
- refreshed the same _Stage_ page (whose DAG was unexpanded) many times → the 
DAG remained unexpanded on every refresh
- refreshed the same _Job_ page (whose DAG was already expanded) many times → 
the DAG remained expanded on every refresh
- refreshed the same _Job_ page (whose DAG was unexpanded) many times → the DAG 
remained unexpanded on every refresh

Author: Liwei Lin 

Closes #11368 from proflin/SPARK-13468.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dc6c5ea4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dc6c5ea4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dc6c5ea4

Branch: refs/heads/master
Commit: dc6c5ea4c91c387deb87764c86c4f40ea71657b7
Parents: 35316cb
Author: Liwei Lin 
Authored: Thu Feb 25 15:36:25 2016 -0800
Committer: Shixiong Zhu 
Committed: Thu Feb 25 15:36:25 2016 -0800

--
 .../org/apache/spark/ui/static/spark-dag-viz.js |  3 ++-
 core/src/main/scala/org/apache/spark/ui/UIUtils.scala   |  7 ---
 .../main/scala/org/apache/spark/ui/jobs/StagePage.scala | 12 
 3 files changed, 2 insertions(+), 20 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/dc6c5ea4/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
--
diff --git 
a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js 
b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
index 4337c42..1b0d469 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
@@ -222,10 +222,11 @@ function renderDagVizForJob(svgContainer) {
   var attemptId = 0
   var stageLink = d3.select("#stage-" + stageId + "-" + attemptId)
 .select("a.name-link")
-.attr("href") + "=true";
+.attr("href");
   container = svgContainer
 .append("a")
 .attr("xlink:href", stageLink)
+.attr("onclick", 
"window.localStorage.setItem(expandDagVizArrowKey(false), true)")
 .append("g")
 .attr("id", containerId);
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/dc6c5ea4/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala 
b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index ddd7f71..0493513 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -408,13 +408,6 @@ private[spark] object UIUtils extends Logging {
 
   }
 
-  /** Return a script element that automatically expands the DAG visualization 
on page load. */
-  def expandDagVizOnLoad(forJob: Boolean): Seq[Node] = {
-
-  {Unparsed("$(document).ready(function() { toggleDagViz(" + forJob + ") 
});")}
-
-  }
-
   /**
* Returns HTML rendering of a job or stage description. It will try to 
parse the string as HTML
* and make sure that it only contains anchors with root-relative links. 
Otherwise,

http://git-wip-us.apache.org/repos/asf/spark/blob/dc6c5ea4/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
--
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala 

spark git commit: [SPARK-13292] [ML] [PYTHON] QuantileDiscretizer should take random seed in PySpark

2016-02-25 Thread meng
Repository: spark
Updated Branches:
  refs/heads/master 14e2700de -> 35316cb0b


[SPARK-13292] [ML] [PYTHON] QuantileDiscretizer should take random seed in 
PySpark

## What changes were proposed in this pull request?
QuantileDiscretizer in Python should also specify a random seed.

## How was this patch tested?
unit tests

Author: Yu ISHIKAWA 

Closes #11362 from yu-iskw/SPARK-13292 and squashes the following commits:

02ffa76 [Yu ISHIKAWA] [SPARK-13292][ML][PYTHON] QuantileDiscretizer should take 
random seed in PySpark


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/35316cb0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/35316cb0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/35316cb0

Branch: refs/heads/master
Commit: 35316cb0b744bef9bcb390411ddc321167f953be
Parents: 14e2700
Author: Yu ISHIKAWA 
Authored: Thu Feb 25 13:29:10 2016 -0800
Committer: Xiangrui Meng 
Committed: Thu Feb 25 13:29:10 2016 -0800

--
 python/pyspark/ml/feature.py | 14 --
 1 file changed, 8 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/35316cb0/python/pyspark/ml/feature.py
--
diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py
index 464c944..67bccfa 100644
--- a/python/pyspark/ml/feature.py
+++ b/python/pyspark/ml/feature.py
@@ -939,7 +939,7 @@ class PolynomialExpansion(JavaTransformer, HasInputCol, 
HasOutputCol):
 
 
 @inherit_doc
-class QuantileDiscretizer(JavaEstimator, HasInputCol, HasOutputCol):
+class QuantileDiscretizer(JavaEstimator, HasInputCol, HasOutputCol, HasSeed):
 """
 .. note:: Experimental
 
@@ -951,7 +951,9 @@ class QuantileDiscretizer(JavaEstimator, HasInputCol, 
HasOutputCol):
 
 >>> df = sqlContext.createDataFrame([(0.1,), (0.4,), (1.2,), (1.5,)], 
["values"])
 >>> qds = QuantileDiscretizer(numBuckets=2,
-... inputCol="values", outputCol="buckets")
+... inputCol="values", outputCol="buckets", seed=123)
+>>> qds.getSeed()
+123
 >>> bucketizer = qds.fit(df)
 >>> splits = bucketizer.getSplits()
 >>> splits[0]
@@ -971,9 +973,9 @@ class QuantileDiscretizer(JavaEstimator, HasInputCol, 
HasOutputCol):
"categories) into which data points are grouped. Must 
be >= 2. Default 2.")
 
 @keyword_only
-def __init__(self, numBuckets=2, inputCol=None, outputCol=None):
+def __init__(self, numBuckets=2, inputCol=None, outputCol=None, seed=None):
 """
-__init__(self, numBuckets=2, inputCol=None, outputCol=None)
+__init__(self, numBuckets=2, inputCol=None, outputCol=None, seed=None)
 """
 super(QuantileDiscretizer, self).__init__()
 self._java_obj = 
self._new_java_obj("org.apache.spark.ml.feature.QuantileDiscretizer",
@@ -987,9 +989,9 @@ class QuantileDiscretizer(JavaEstimator, HasInputCol, 
HasOutputCol):
 
 @keyword_only
 @since("2.0.0")
-def setParams(self, numBuckets=2, inputCol=None, outputCol=None):
+def setParams(self, numBuckets=2, inputCol=None, outputCol=None, 
seed=None):
 """
-setParams(self, numBuckets=2, inputCol=None, outputCol=None)
+setParams(self, numBuckets=2, inputCol=None, outputCol=None, seed=None)
 Set the params for the QuantileDiscretizer
 """
 kwargs = self.setParams._input_kwargs





spark git commit: [SPARK-12874][ML] ML StringIndexer does not protect itself from column name duplication

2016-02-25 Thread meng
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 d59a08f7c -> abe8f991a


[SPARK-12874][ML] ML StringIndexer does not protect itself from column name 
duplication

## What changes were proposed in this pull request?
ML StringIndexer does not protect itself from column name duplication.

We should still improve the way the schema of `StringIndexer` and `StringIndexerModel` is 
validated; however, that is better addressed in a separate issue.
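For context, a minimal user-facing sketch of the failure mode (the data and column names are 
illustrative, and a `sqlContext` is assumed to be in scope): fitting a `StringIndexer` whose 
output column name collides with an existing input column now fails schema validation instead 
of silently overwriting that column.

```scala
import org.apache.spark.ml.feature.StringIndexer

// "label_idx" already exists in the input, so the indexer's output column clashes.
val df = sqlContext.createDataFrame(Seq(("a", 0.0), ("b", 1.0))).toDF("label", "label_idx")

val model = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("label_idx")  // duplicates an existing column name
  .fit(df)

// With this patch, transform() runs validateAndTransformSchema first and throws
// IllegalArgumentException rather than overwriting "label_idx".
model.transform(df)
```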

## How was this patch tested?
unit test

Author: Yu ISHIKAWA 

Closes #11370 from yu-iskw/SPARK-12874.

(cherry picked from commit 14e2700de29d06460179a94cc9816bcd37344cf7)
Signed-off-by: Xiangrui Meng 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/abe8f991
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/abe8f991
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/abe8f991

Branch: refs/heads/branch-1.6
Commit: abe8f991a32bef92fbbcd2911836bb7d8e61ca97
Parents: d59a08f
Author: Yu ISHIKAWA 
Authored: Thu Feb 25 13:21:33 2016 -0800
Committer: Xiangrui Meng 
Committed: Thu Feb 25 13:23:44 2016 -0800

--
 .../org/apache/spark/ml/feature/StringIndexer.scala  |  1 +
 .../org/apache/spark/ml/feature/StringIndexerSuite.scala | 11 +++
 2 files changed, 12 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/abe8f991/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala 
b/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
index 5c40c35..b3413a1 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
@@ -149,6 +149,7 @@ class StringIndexerModel (
 "Skip StringIndexerModel.")
   return dataset
 }
+validateAndTransformSchema(dataset.schema)
 
 val indexer = udf { label: String =>
   if (labelToIndex.contains(label)) {

http://git-wip-us.apache.org/repos/asf/spark/blob/abe8f991/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala
--
diff --git 
a/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala 
b/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala
index 749bfac..26f4613 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala
@@ -118,6 +118,17 @@ class StringIndexerSuite
 assert(indexerModel.transform(df).eq(df))
   }
 
+  test("StringIndexerModel can't overwrite output column") {
+val df = sqlContext.createDataFrame(Seq((1, 2), (3, 4))).toDF("input", 
"output")
+val indexer = new StringIndexer()
+  .setInputCol("input")
+  .setOutputCol("output")
+  .fit(df)
+intercept[IllegalArgumentException] {
+  indexer.transform(df)
+}
+  }
+
   test("StringIndexer read/write") {
 val t = new StringIndexer()
   .setInputCol("myInputCol")


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-12874][ML] ML StringIndexer does not protect itself from column name duplication

2016-02-25 Thread meng
Repository: spark
Updated Branches:
  refs/heads/master fb8bb0476 -> 14e2700de


[SPARK-12874][ML] ML StringIndexer does not protect itself from column name 
duplication

## What changes were proposed in this pull request?
ML StringIndexer does not protect itself from column name duplication.

We should still improve the way the schema of `StringIndexer` and `StringIndexerModel` is 
validated; however, that is better addressed in a separate issue.

## How was this patch tested?
unit test

Author: Yu ISHIKAWA 

Closes #11370 from yu-iskw/SPARK-12874.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/14e2700d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/14e2700d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/14e2700d

Branch: refs/heads/master
Commit: 14e2700de29d06460179a94cc9816bcd37344cf7
Parents: fb8bb04
Author: Yu ISHIKAWA 
Authored: Thu Feb 25 13:21:33 2016 -0800
Committer: Xiangrui Meng 
Committed: Thu Feb 25 13:21:33 2016 -0800

--
 .../org/apache/spark/ml/feature/StringIndexer.scala  |  1 +
 .../org/apache/spark/ml/feature/StringIndexerSuite.scala | 11 +++
 2 files changed, 12 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/14e2700d/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala 
b/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
index 912bd95..555f113 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
@@ -150,6 +150,7 @@ class StringIndexerModel (
 "Skip StringIndexerModel.")
   return dataset
 }
+validateAndTransformSchema(dataset.schema)
 
 val indexer = udf { label: String =>
   if (labelToIndex.contains(label)) {

http://git-wip-us.apache.org/repos/asf/spark/blob/14e2700d/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala
--
diff --git 
a/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala 
b/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala
index 5d199ca..0dbaed2 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala
@@ -118,6 +118,17 @@ class StringIndexerSuite
 assert(indexerModel.transform(df).eq(df))
   }
 
+  test("StringIndexerModel can't overwrite output column") {
+val df = sqlContext.createDataFrame(Seq((1, 2), (3, 4))).toDF("input", 
"output")
+val indexer = new StringIndexer()
+  .setInputCol("input")
+  .setOutputCol("output")
+  .fit(df)
+intercept[IllegalArgumentException] {
+  indexer.transform(df)
+}
+  }
+
   test("StringIndexer read/write") {
 val t = new StringIndexer()
   .setInputCol("myInputCol")


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-13069][STREAMING] Add "ask" style store() to ActorReceiver

2016-02-25 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 751724b13 -> fb8bb0476


[SPARK-13069][STREAMING] Add "ask" style store() to ActorReceiver

Introduces a "ask" style ```store``` in ```ActorReceiver``` as a way to allow 
actor receiver blocked by back pressure or maxRate.
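As a rough usage sketch (the actor body and timeout are illustrative, not part of this patch), 
a custom receiver actor can await the returned `Future` before taking its next message, which 
lets back pressure or maxRate throttle the upstream flow:

```scala
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.util.Timeout
import org.apache.spark.streaming.akka.ActorReceiver

class ThrottledReceiver extends ActorReceiver {
  // Illustrative timeout; tune it for the workload.
  private val askTimeout = Timeout(10.seconds)

  def receive: Receive = {
    case record: String =>
      // store(item, timeout) returns a Future that completes once the supervisor
      // has accepted the item, so blocking here applies back pressure to this actor.
      Await.result(store(record, askTimeout), 10.seconds)
  }
}
```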

Author: Lin Zhao 

Closes #11176 from lin-zhao/SPARK-13069.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fb8bb047
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fb8bb047
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fb8bb047

Branch: refs/heads/master
Commit: fb8bb04766005e8935607069c0155d639f407e8a
Parents: 751724b
Author: Lin Zhao 
Authored: Thu Feb 25 12:32:17 2016 -0800
Committer: Shixiong Zhu 
Committed: Thu Feb 25 12:32:24 2016 -0800

--
 .../spark/streaming/akka/ActorReceiver.scala| 39 +++-
 .../streaming/akka/JavaAkkaUtilsSuite.java  |  2 +
 .../spark/streaming/akka/AkkaUtilsSuite.scala   |  3 ++
 3 files changed, 43 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fb8bb047/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
--
diff --git 
a/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
 
b/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
index c75dc92..33415c1 100644
--- 
a/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
+++ 
b/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
@@ -20,12 +20,15 @@ package org.apache.spark.streaming.akka
 import java.nio.ByteBuffer
 import java.util.concurrent.atomic.AtomicInteger
 
+import scala.concurrent.Future
 import scala.concurrent.duration._
 import scala.language.postfixOps
 import scala.reflect.ClassTag
 
 import akka.actor._
 import akka.actor.SupervisorStrategy.{Escalate, Restart}
+import akka.pattern.ask
+import akka.util.Timeout
 import com.typesafe.config.ConfigFactory
 
 import org.apache.spark.{Logging, TaskContext}
@@ -105,13 +108,26 @@ abstract class ActorReceiver extends Actor {
   }
 
   /**
-   * Store a single item of received data to Spark's memory.
+   * Store a single item of received data to Spark's memory asynchronously.
* These single items will be aggregated together into data blocks before
* being pushed into Spark's memory.
*/
   def store[T](item: T) {
 context.parent ! SingleItemData(item)
   }
+
+  /**
+   * Store a single item of received data to Spark's memory and returns a 
`Future`.
+   * The `Future` will be completed when the operator finishes, or with an
+   * `akka.pattern.AskTimeoutException` after the given timeout has expired.
+   * These single items will be aggregated together into data blocks before
+   * being pushed into Spark's memory.
+   *
+   * This method allows the user to control the flow speed using `Future`
+   */
+  def store[T](item: T, timeout: Timeout): Future[Unit] = {
+context.parent.ask(AskStoreSingleItemData(item))(timeout).map(_ => 
())(context.dispatcher)
+  }
 }
 
 /**
@@ -162,6 +178,19 @@ abstract class JavaActorReceiver extends UntypedActor {
   def store[T](item: T) {
 context.parent ! SingleItemData(item)
   }
+
+  /**
+   * Store a single item of received data to Spark's memory and returns a 
`Future`.
+   * The `Future` will be completed when the operator finishes, or with an
+   * `akka.pattern.AskTimeoutException` after the given timeout has expired.
+   * These single items will be aggregated together into data blocks before
+   * being pushed into Spark's memory.
+   *
+   * This method allows the user to control the flow speed using `Future`
+   */
+  def store[T](item: T, timeout: Timeout): Future[Unit] = {
+context.parent.ask(AskStoreSingleItemData(item))(timeout).map(_ => 
())(context.dispatcher)
+  }
 }
 
 /**
@@ -179,8 +208,10 @@ case class Statistics(numberOfMsgs: Int,
 /** Case class to receive data sent by child actors */
 private[akka] sealed trait ActorReceiverData
 private[akka] case class SingleItemData[T](item: T) extends ActorReceiverData
+private[akka] case class AskStoreSingleItemData[T](item: T) extends 
ActorReceiverData
 private[akka] case class IteratorData[T](iterator: Iterator[T]) extends 
ActorReceiverData
 private[akka] case class ByteBufferData(bytes: ByteBuffer) extends 
ActorReceiverData
+private[akka] object Ack extends ActorReceiverData
 
 /**
  * Provides Actors as receivers for receiving stream.
@@ -233,6 +264,12 @@ private[akka] class ActorReceiverSupervisor[T: ClassTag](
 store(msg.asInstanceOf[T])
 n.incrementAndGet
 
+  case AskStoreSingleItemData(msg) =>
+ 

spark git commit: [SPARK-13464][STREAMING][PYSPARK] Fix failed streaming in pyspark in branch 1.3

2016-02-25 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 387d81891 -> 6ddde8eda


[SPARK-13464][STREAMING][PYSPARK] Fix failed streaming in pyspark in branch 1.3

JIRA: https://issues.apache.org/jira/browse/SPARK-13464

## What changes were proposed in this pull request?

While backporting an MLlib feature, I found that a cleanly checked-out branch-1.3 codebase 
fails the test `test_reduce_by_key_and_window_with_none_invFunc` in pyspark/streaming. We 
should fix it.

## How was this patch tested?

Unit test `test_reduce_by_key_and_window_with_none_invFunc` is fixed.

Author: Liang-Chi Hsieh 

Closes #11339 from viirya/fix-streaming-test-branch-1.3.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6ddde8ed
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6ddde8ed
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6ddde8ed

Branch: refs/heads/branch-1.3
Commit: 6ddde8eda28538e8f899912477a821f8193262ba
Parents: 387d818
Author: Liang-Chi Hsieh 
Authored: Thu Feb 25 12:28:50 2016 -0800
Committer: Shixiong Zhu 
Committed: Thu Feb 25 12:28:50 2016 -0800

--
 python/pyspark/streaming/tests.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6ddde8ed/python/pyspark/streaming/tests.py
--
diff --git a/python/pyspark/streaming/tests.py 
b/python/pyspark/streaming/tests.py
index c136bf1..12293a0 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -423,7 +423,7 @@ class WindowFunctionTests(PySparkStreamingTestCase):
 .reduceByKeyAndWindow(operator.add, None, 5, 1)\
 .filter(lambda kv: kv[1] > 0).count()
 
-expected = [[2], [4], [6], [6], [6], [6]]
+expected = [[1], [2], [3], [4], [5], [6]]
 self._test_func(input, func, expected)
 
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: Revert "[SPARK-13444][MLLIB] QuantileDiscretizer chooses bad splits on large DataFrames"

2016-02-25 Thread meng
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 5f7440b25 -> d59a08f7c


Revert "[SPARK-13444][MLLIB] QuantileDiscretizer chooses bad splits on large 
DataFrames"

This reverts commit cb869a143d338985c3d99ef388dd78b1e3d90a73.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d59a08f7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d59a08f7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d59a08f7

Branch: refs/heads/branch-1.6
Commit: d59a08f7c1c455d86e7ee3d6522a3e9c55f9ee02
Parents: 5f7440b
Author: Xiangrui Meng 
Authored: Thu Feb 25 12:28:03 2016 -0800
Committer: Xiangrui Meng 
Committed: Thu Feb 25 12:28:03 2016 -0800

--
 .../spark/ml/feature/QuantileDiscretizer.scala  | 11 ++-
 .../ml/feature/QuantileDiscretizerSuite.scala   | 20 
 2 files changed, 2 insertions(+), 29 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d59a08f7/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala 
b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala
index cd5085a..7bf67c6 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala
@@ -97,13 +97,6 @@ final class QuantileDiscretizer(override val uid: String)
 
 @Since("1.6.0")
 object QuantileDiscretizer extends DefaultParamsReadable[QuantileDiscretizer] 
with Logging {
-
-  /**
-   * Minimum number of samples required for finding splits, regardless of 
number of bins.  If
-   * the dataset has fewer rows than this value, the entire dataset will be 
used.
-   */
-  private[spark] val minSamplesRequired: Int = 1
-
   /**
* Sampling from the given dataset to collect quantile statistics.
*/
@@ -111,8 +104,8 @@ object QuantileDiscretizer extends 
DefaultParamsReadable[QuantileDiscretizer] wi
 val totalSamples = dataset.count()
 require(totalSamples > 0,
   "QuantileDiscretizer requires non-empty input dataset but was given an 
empty input.")
-val requiredSamples = math.max(numBins * numBins, minSamplesRequired)
-val fraction = math.min(requiredSamples.toDouble / dataset.count(), 1.0)
+val requiredSamples = math.max(numBins * numBins, 1)
+val fraction = math.min(requiredSamples / dataset.count(), 1.0)
 dataset.sample(withReplacement = false, fraction, new 
XORShiftRandom().nextInt()).collect()
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d59a08f7/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala
--
diff --git 
a/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala
 
b/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala
index 32bfa43..3a4f6d2 100644
--- 
a/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala
+++ 
b/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala
@@ -71,26 +71,6 @@ class QuantileDiscretizerSuite
 }
   }
 
-  test("Test splits on dataset larger than minSamplesRequired") {
-val sqlCtx = SQLContext.getOrCreate(sc)
-import sqlCtx.implicits._
-
-val datasetSize = QuantileDiscretizer.minSamplesRequired + 1
-val numBuckets = 5
-val df = sc.parallelize((1.0 to datasetSize by 
1.0).map(Tuple1.apply)).toDF("input")
-val discretizer = new QuantileDiscretizer()
-  .setInputCol("input")
-  .setOutputCol("result")
-  .setNumBuckets(numBuckets)
-  .setSeed(1)
-
-val result = discretizer.fit(df).transform(df)
-val observedNumBuckets = result.select("result").distinct.count
-
-assert(observedNumBuckets === numBuckets,
-  "Observed number of buckets does not equal expected number of buckets.")
-  }
-
   test("read/write") {
 val t = new QuantileDiscretizer()
   .setInputCol("myInputCol")


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: Revert "[SPARK-13457][SQL] Removes DataFrame RDD operations"

2016-02-25 Thread davies
Repository: spark
Updated Branches:
  refs/heads/master 46f6e7931 -> 751724b13


Revert "[SPARK-13457][SQL] Removes DataFrame RDD operations"

This reverts commit 157fe64f3ecbd13b7286560286e50235eecfe30e.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/751724b1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/751724b1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/751724b1

Branch: refs/heads/master
Commit: 751724b1320d38fd94186df3d8f1ca887f21947a
Parents: 46f6e79
Author: Davies Liu 
Authored: Thu Feb 25 11:53:48 2016 -0800
Committer: Davies Liu 
Committed: Thu Feb 25 11:53:48 2016 -0800

--
 .../spark/examples/ml/DataFrameExample.scala|  2 +-
 .../spark/examples/ml/DecisionTreeExample.scala |  8 ++--
 .../spark/examples/ml/OneVsRestExample.scala|  2 +-
 .../spark/examples/mllib/LDAExample.scala   |  1 -
 .../apache/spark/examples/sql/RDDRelation.scala |  2 +-
 .../spark/examples/sql/hive/HiveFromSpark.scala |  2 +-
 .../scala/org/apache/spark/ml/Predictor.scala   |  6 +--
 .../ml/classification/LogisticRegression.scala  | 13 +++---
 .../spark/ml/clustering/BisectingKMeans.scala   |  4 +-
 .../org/apache/spark/ml/clustering/KMeans.scala |  6 +--
 .../org/apache/spark/ml/clustering/LDA.scala|  1 -
 .../BinaryClassificationEvaluator.scala |  9 +++--
 .../MulticlassClassificationEvaluator.scala |  6 +--
 .../ml/evaluation/RegressionEvaluator.scala |  3 +-
 .../apache/spark/ml/feature/ChiSqSelector.scala |  2 +-
 .../spark/ml/feature/CountVectorizer.scala  |  2 +-
 .../scala/org/apache/spark/ml/feature/IDF.scala |  2 +-
 .../apache/spark/ml/feature/MinMaxScaler.scala  |  2 +-
 .../apache/spark/ml/feature/OneHotEncoder.scala |  2 +-
 .../scala/org/apache/spark/ml/feature/PCA.scala |  2 +-
 .../spark/ml/feature/StandardScaler.scala   |  2 +-
 .../apache/spark/ml/feature/StringIndexer.scala |  1 -
 .../apache/spark/ml/feature/VectorIndexer.scala |  2 +-
 .../org/apache/spark/ml/feature/Word2Vec.scala  |  2 +-
 .../apache/spark/ml/recommendation/ALS.scala|  1 -
 .../ml/regression/AFTSurvivalRegression.scala   |  2 +-
 .../ml/regression/IsotonicRegression.scala  |  6 +--
 .../spark/ml/regression/LinearRegression.scala  | 16 +++-
 .../spark/mllib/api/python/PythonMLLibAPI.scala |  8 ++--
 .../spark/mllib/clustering/KMeansModel.scala|  2 +-
 .../spark/mllib/clustering/LDAModel.scala   |  4 +-
 .../clustering/PowerIterationClustering.scala   |  2 +-
 .../BinaryClassificationMetrics.scala   |  2 +-
 .../mllib/evaluation/MulticlassMetrics.scala|  2 +-
 .../mllib/evaluation/MultilabelMetrics.scala|  4 +-
 .../mllib/evaluation/RegressionMetrics.scala|  2 +-
 .../spark/mllib/feature/ChiSqSelector.scala |  2 +-
 .../org/apache/spark/mllib/fpm/FPGrowth.scala   |  2 +-
 .../MatrixFactorizationModel.scala  | 12 +++---
 .../mllib/tree/model/DecisionTreeModel.scala|  2 +-
 .../mllib/tree/model/treeEnsembleModels.scala   |  2 +-
 .../LogisticRegressionSuite.scala   |  2 +-
 .../MultilayerPerceptronClassifierSuite.scala   |  5 +--
 .../ml/classification/OneVsRestSuite.scala  |  6 +--
 .../ml/clustering/BisectingKMeansSuite.scala|  3 +-
 .../spark/ml/clustering/KMeansSuite.scala   |  3 +-
 .../apache/spark/ml/clustering/LDASuite.scala   |  2 +-
 .../spark/ml/feature/OneHotEncoderSuite.scala   |  4 +-
 .../spark/ml/feature/StringIndexerSuite.scala   |  6 +--
 .../spark/ml/feature/VectorIndexerSuite.scala   |  5 +--
 .../apache/spark/ml/feature/Word2VecSuite.scala |  8 ++--
 .../spark/ml/recommendation/ALSSuite.scala  |  7 ++--
 .../spark/ml/regression/GBTRegressorSuite.scala |  2 +-
 .../ml/regression/IsotonicRegressionSuite.scala |  6 +--
 .../ml/regression/LinearRegressionSuite.scala   | 17 
 .../scala/org/apache/spark/sql/DataFrame.scala  | 42 
 .../org/apache/spark/sql/GroupedData.scala  |  1 -
 .../org/apache/spark/sql/api/r/SQLUtils.scala   |  2 +-
 .../execution/datasources/jdbc/JdbcUtils.scala  |  2 +-
 .../spark/sql/DataFrameAggregateSuite.scala |  2 +-
 .../parquet/ParquetFilterSuite.scala|  2 +-
 .../datasources/parquet/ParquetIOSuite.scala|  2 +-
 .../sql/execution/ui/SQLListenerSuite.scala |  4 +-
 .../spark/sql/hive/HiveSparkSubmitSuite.scala   |  2 +-
 .../sql/hive/execution/HiveQuerySuite.scala |  2 -
 .../spark/sql/hive/orc/OrcQuerySuite.scala  |  5 +--
 .../apache/spark/sql/hive/parquetSuites.scala   |  2 +-
 67 files changed, 159 insertions(+), 140 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/751724b1/examples/src/main/scala/org/apache/spark/examples/ml/DataFrameExample.scala

spark git commit: Revert "[SPARK-13117][WEB UI] WebUI should use the local ip not 0.0.0.0"

2016-02-25 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 5fcf4c2bf -> 46f6e7931


Revert "[SPARK-13117][WEB UI] WebUI should use the local ip not 0.0.0.0"

This reverts commit 2e44031fafdb8cf486573b98e4faa6b31ffb90a4.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/46f6e793
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/46f6e793
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/46f6e793

Branch: refs/heads/master
Commit: 46f6e79316b72afea0c9b1559ea662dd3e95e57b
Parents: 5fcf4c2
Author: Shixiong Zhu 
Authored: Thu Feb 25 11:39:26 2016 -0800
Committer: Shixiong Zhu 
Committed: Thu Feb 25 11:39:26 2016 -0800

--
 core/src/main/scala/org/apache/spark/ui/WebUI.scala   | 2 +-
 .../scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala| 3 +--
 2 files changed, 2 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/46f6e793/core/src/main/scala/org/apache/spark/ui/WebUI.scala
--
diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala 
b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
index e515916..fe4949b 100644
--- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -134,7 +134,7 @@ private[spark] abstract class WebUI(
   def bind() {
 assert(!serverInfo.isDefined, "Attempted to bind %s more than 
once!".format(className))
 try {
-  serverInfo = Some(startJettyServer(publicHostName, port, sslOptions, 
handlers, conf, name))
+  serverInfo = Some(startJettyServer("0.0.0.0", port, sslOptions, 
handlers, conf, name))
   logInfo("Started %s at http://%s:%d".format(className, publicHostName, 
boundPort))
 } catch {
   case e: Exception =>

http://git-wip-us.apache.org/repos/asf/spark/blob/46f6e793/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
--
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
index 972b552..f416ace 100644
--- a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
@@ -26,7 +26,6 @@ import org.apache.spark.{LocalSparkContext, SparkConf, 
SparkContext, SparkFunSui
 import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded}
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.util.SparkConfWithEnv
-import org.apache.spark.util.Utils
 
 class LogUrlsStandaloneSuite extends SparkFunSuite with LocalSparkContext {
 
@@ -54,7 +53,7 @@ class LogUrlsStandaloneSuite extends SparkFunSuite with 
LocalSparkContext {
   }
 
   test("verify that log urls reflect SPARK_PUBLIC_DNS (SPARK-6175)") {
-val SPARK_PUBLIC_DNS = Utils.localHostNameForURI()
+val SPARK_PUBLIC_DNS = "public_dns"
 val conf = new SparkConfWithEnv(Map("SPARK_PUBLIC_DNS" -> 
SPARK_PUBLIC_DNS)).set(
   "spark.extraListeners", classOf[SaveExecutorInfo].getName)
 sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-12316] Wait a minute to avoid cyclic calling.

2016-02-25 Thread tgraves
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 e3802a752 -> 5f7440b25


[SPARK-12316] Wait a minute to avoid cyclic calling.

When the application ends, the AM cleans up the staging directory.
But if the driver then tries to update the delegation token, it cannot find the right token 
file and ends up calling the method 'updateCredentialsIfRequired' in an endless cycle.
This eventually leads to a StackOverflowError in the driver.
https://issues.apache.org/jira/browse/SPARK-12316

Author: huangzhaowei 

Closes #10475 from SaintBacchus/SPARK-12316.

(cherry picked from commit 5fcf4c2bfce4b7e3543815c8e49ffdec8072c9a2)
Signed-off-by: Tom Graves 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5f7440b2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5f7440b2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5f7440b2

Branch: refs/heads/branch-1.6
Commit: 5f7440b2529a0f6edfed5038756c004acecbce39
Parents: e3802a7
Author: huangzhaowei 
Authored: Thu Feb 25 09:14:19 2016 -0600
Committer: Tom Graves 
Committed: Thu Feb 25 09:14:36 2016 -0600

--
 .../spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala  | 5 -
 1 file changed, 4 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5f7440b2/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala
--
diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala
 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala
index 94feb63..6febc70 100644
--- 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala
+++ 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala
@@ -76,7 +76,10 @@ private[spark] class ExecutorDelegationTokenUpdater(
 SparkHadoopUtil.get.getTimeFromNowToRenewal(
   sparkConf, 0.8, UserGroupInformation.getCurrentUser.getCredentials)
   if (timeFromNowToRenewal <= 0) {
-executorUpdaterRunnable.run()
+// We just checked for new credentials but none were there, wait a 
minute and retry.
+// This handles the shutdown case where the staging directory may have 
been removed(see
+// SPARK-12316 for more details).
+delegationTokenRenewer.schedule(executorUpdaterRunnable, 1, 
TimeUnit.MINUTES)
   } else {
 logInfo(s"Scheduling token refresh from HDFS in $timeFromNowToRenewal 
millis.")
 delegationTokenRenewer.schedule(


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-12316] Wait a minute to avoid cyclic calling.

2016-02-25 Thread tgraves
Repository: spark
Updated Branches:
  refs/heads/master 157fe64f3 -> 5fcf4c2bf


[SPARK-12316] Wait a minute to avoid cyclic calling.

When the application ends, the AM cleans up the staging directory.
But if the driver then tries to update the delegation token, it cannot find the right token 
file and ends up calling the method 'updateCredentialsIfRequired' in an endless cycle.
This eventually leads to a StackOverflowError in the driver.
https://issues.apache.org/jira/browse/SPARK-12316
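A minimal sketch of the pattern the patch switches to (the names below are simplified 
stand-ins, not the actual Spark classes): instead of re-invoking the updater synchronously when 
no new credentials are found, which can recurse until the stack blows, the updater is 
rescheduled with a delay.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

// Simplified stand-ins for the real renewer thread pool and the HDFS token check.
val renewer: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor()
def credentialsReady(): Boolean = false  // placeholder: true once new tokens exist

lazy val updaterRunnable: Runnable = new Runnable {
  override def run(): Unit = {
    if (!credentialsReady()) {
      // Previously the runnable re-invoked itself directly here, recursing until
      // StackOverflowError when the staging dir had been removed.
      // Now it waits a minute and retries instead.
      renewer.schedule(updaterRunnable, 1, TimeUnit.MINUTES)
    } else {
      // ... refresh the delegation tokens ...
    }
  }
}
```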

Author: huangzhaowei 

Closes #10475 from SaintBacchus/SPARK-12316.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5fcf4c2b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5fcf4c2b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5fcf4c2b

Branch: refs/heads/master
Commit: 5fcf4c2bfce4b7e3543815c8e49ffdec8072c9a2
Parents: 157fe64
Author: huangzhaowei 
Authored: Thu Feb 25 09:14:19 2016 -0600
Committer: Tom Graves 
Committed: Thu Feb 25 09:14:19 2016 -0600

--
 .../spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala  | 5 -
 1 file changed, 4 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5fcf4c2b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala
--
diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala
 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala
index 9d99c0d..6474acc 100644
--- 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala
+++ 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala
@@ -76,7 +76,10 @@ private[spark] class ExecutorDelegationTokenUpdater(
 SparkHadoopUtil.get.getTimeFromNowToRenewal(
   sparkConf, 0.8, UserGroupInformation.getCurrentUser.getCredentials)
   if (timeFromNowToRenewal <= 0) {
-executorUpdaterRunnable.run()
+// We just checked for new credentials but none were there, wait a 
minute and retry.
+// This handles the shutdown case where the staging directory may have 
been removed(see
+// SPARK-12316 for more details).
+delegationTokenRenewer.schedule(executorUpdaterRunnable, 1, 
TimeUnit.MINUTES)
   } else {
 logInfo(s"Scheduling token refresh from HDFS in $timeFromNowToRenewal 
millis.")
 delegationTokenRenewer.schedule(


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-13457][SQL] Removes DataFrame RDD operations

2016-02-25 Thread lian
Repository: spark
Updated Branches:
  refs/heads/master 4460113d4 -> 157fe64f3


[SPARK-13457][SQL] Removes DataFrame RDD operations

## What changes were proposed in this pull request?

This PR removes DataFrame RDD operations. Original calls are now replaced by 
calls to methods of `DataFrame.rdd`.
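As a hypothetical before/after sketch of what this looks like at a call site (the function and 
column access are illustrative):

```scala
import org.apache.spark.sql.{DataFrame, Row}

def extractLabels(df: DataFrame): Array[Double] = {
  // Before this change: df.map(_.getDouble(0)).collect()
  // After: DataFrame no longer exposes the RDD-style operations directly,
  // so go through the underlying RDD explicitly.
  df.rdd.map((row: Row) => row.getDouble(0)).collect()
}
```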

## How was this patch tested?

No extra tests are added. Existing tests should do the work.

Author: Cheng Lian 

Closes #11323 from liancheng/remove-df-rdd-ops.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/157fe64f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/157fe64f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/157fe64f

Branch: refs/heads/master
Commit: 157fe64f3ecbd13b7286560286e50235eecfe30e
Parents: 4460113
Author: Cheng Lian 
Authored: Thu Feb 25 23:07:59 2016 +0800
Committer: Cheng Lian 
Committed: Thu Feb 25 23:07:59 2016 +0800

--
 .../spark/examples/ml/DataFrameExample.scala|  2 +-
 .../spark/examples/ml/DecisionTreeExample.scala |  8 ++--
 .../spark/examples/ml/OneVsRestExample.scala|  2 +-
 .../spark/examples/mllib/LDAExample.scala   |  1 +
 .../apache/spark/examples/sql/RDDRelation.scala |  2 +-
 .../spark/examples/sql/hive/HiveFromSpark.scala |  2 +-
 .../scala/org/apache/spark/ml/Predictor.scala   |  6 ++-
 .../ml/classification/LogisticRegression.scala  | 13 +++---
 .../spark/ml/clustering/BisectingKMeans.scala   |  4 +-
 .../org/apache/spark/ml/clustering/KMeans.scala |  6 +--
 .../org/apache/spark/ml/clustering/LDA.scala|  1 +
 .../BinaryClassificationEvaluator.scala |  9 ++---
 .../MulticlassClassificationEvaluator.scala |  6 +--
 .../ml/evaluation/RegressionEvaluator.scala |  3 +-
 .../apache/spark/ml/feature/ChiSqSelector.scala |  2 +-
 .../spark/ml/feature/CountVectorizer.scala  |  2 +-
 .../scala/org/apache/spark/ml/feature/IDF.scala |  2 +-
 .../apache/spark/ml/feature/MinMaxScaler.scala  |  2 +-
 .../apache/spark/ml/feature/OneHotEncoder.scala |  2 +-
 .../scala/org/apache/spark/ml/feature/PCA.scala |  2 +-
 .../spark/ml/feature/StandardScaler.scala   |  2 +-
 .../apache/spark/ml/feature/StringIndexer.scala |  1 +
 .../apache/spark/ml/feature/VectorIndexer.scala |  2 +-
 .../org/apache/spark/ml/feature/Word2Vec.scala  |  2 +-
 .../apache/spark/ml/recommendation/ALS.scala|  1 +
 .../ml/regression/AFTSurvivalRegression.scala   |  2 +-
 .../ml/regression/IsotonicRegression.scala  |  6 +--
 .../spark/ml/regression/LinearRegression.scala  | 16 +---
 .../spark/mllib/api/python/PythonMLLibAPI.scala |  8 ++--
 .../spark/mllib/clustering/KMeansModel.scala|  2 +-
 .../spark/mllib/clustering/LDAModel.scala   |  4 +-
 .../clustering/PowerIterationClustering.scala   |  2 +-
 .../BinaryClassificationMetrics.scala   |  2 +-
 .../mllib/evaluation/MulticlassMetrics.scala|  2 +-
 .../mllib/evaluation/MultilabelMetrics.scala|  4 +-
 .../mllib/evaluation/RegressionMetrics.scala|  2 +-
 .../spark/mllib/feature/ChiSqSelector.scala |  2 +-
 .../org/apache/spark/mllib/fpm/FPGrowth.scala   |  2 +-
 .../MatrixFactorizationModel.scala  | 12 +++---
 .../mllib/tree/model/DecisionTreeModel.scala|  2 +-
 .../mllib/tree/model/treeEnsembleModels.scala   |  2 +-
 .../LogisticRegressionSuite.scala   |  2 +-
 .../MultilayerPerceptronClassifierSuite.scala   |  5 ++-
 .../ml/classification/OneVsRestSuite.scala  |  6 +--
 .../ml/clustering/BisectingKMeansSuite.scala|  3 +-
 .../spark/ml/clustering/KMeansSuite.scala   |  3 +-
 .../apache/spark/ml/clustering/LDASuite.scala   |  2 +-
 .../spark/ml/feature/OneHotEncoderSuite.scala   |  4 +-
 .../spark/ml/feature/StringIndexerSuite.scala   |  6 +--
 .../spark/ml/feature/VectorIndexerSuite.scala   |  5 ++-
 .../apache/spark/ml/feature/Word2VecSuite.scala |  8 ++--
 .../spark/ml/recommendation/ALSSuite.scala  |  7 ++--
 .../spark/ml/regression/GBTRegressorSuite.scala |  2 +-
 .../ml/regression/IsotonicRegressionSuite.scala |  6 +--
 .../ml/regression/LinearRegressionSuite.scala   | 17 
 .../scala/org/apache/spark/sql/DataFrame.scala  | 42 
 .../org/apache/spark/sql/GroupedData.scala  |  1 +
 .../org/apache/spark/sql/api/r/SQLUtils.scala   |  2 +-
 .../execution/datasources/jdbc/JdbcUtils.scala  |  2 +-
 .../spark/sql/DataFrameAggregateSuite.scala |  2 +-
 .../parquet/ParquetFilterSuite.scala|  2 +-
 .../datasources/parquet/ParquetIOSuite.scala|  2 +-
 .../sql/execution/ui/SQLListenerSuite.scala |  4 +-
 .../spark/sql/hive/HiveSparkSubmitSuite.scala   |  2 +-
 .../sql/hive/execution/HiveQuerySuite.scala |  2 +
 .../spark/sql/hive/orc/OrcQuerySuite.scala  |  5 ++-
 .../apache/spark/sql/hive/parquetSuites.scala   |  2 +-
 67 files changed, 140 insertions(+), 159 deletions(-)

spark git commit: [SPARK-13490][ML] ML LinearRegression should cache standardization param value

2016-02-25 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master c98a93ded -> 4460113d4


[SPARK-13490][ML] ML LinearRegression should cache standardization param value

## What changes were proposed in this pull request?
Like #11027 for ```LogisticRegression```, ```LinearRegression``` with L1 regularization should 
also cache the value of the ```standardization``` param rather than re-fetch it from the 
```ParamMap``` on every OWLQN iteration.
cc srowen
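The underlying idiom, sketched minimally with made-up names (not the actual Spark code), is to 
resolve the param once outside the per-iteration closure and let the closure capture the plain 
value:

```scala
// Stand-in for a ParamMap lookup that is too expensive to repeat in a hot loop.
val params = Map("standardization" -> true)
def paramLookup(name: String): Boolean = params(name)

// Before: the lookup would run on every OWLQN iteration.
// val effectiveL1RegFun = (index: Int) => if (paramLookup("standardization")) 1.0 else 0.5

// After: read it once and capture the resolved value.
val standardization = paramLookup("standardization")
val effectiveL1RegFun = (index: Int) => if (standardization) 1.0 else 0.5
```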

## How was this patch tested?
No extra tests are added. It should pass all existing tests.

Author: Yanbo Liang 

Closes #11367 from yanboliang/spark-13490.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4460113d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4460113d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4460113d

Branch: refs/heads/master
Commit: 4460113d419b5da47ba3c956b8430fd00eb03217
Parents: c98a93d
Author: Yanbo Liang 
Authored: Thu Feb 25 13:34:29 2016 +
Committer: Sean Owen 
Committed: Thu Feb 25 13:34:29 2016 +

--
 .../scala/org/apache/spark/ml/regression/LinearRegression.scala   | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4460113d/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala 
b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
index e253f25..ccfb5c4 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
@@ -277,8 +277,9 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") 
override val uid: String
 val optimizer = if ($(elasticNetParam) == 0.0 || effectiveRegParam == 0.0) 
{
   new BreezeLBFGS[BDV[Double]]($(maxIter), 10, $(tol))
 } else {
+  val standardizationParam = $(standardization)
   def effectiveL1RegFun = (index: Int) => {
-if ($(standardization)) {
+if (standardizationParam) {
   effectiveL1RegParam
 } else {
   // If `standardization` is false, we still standardize the data


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-13439][MESOS] Document that spark.mesos.uris is comma-separated

2016-02-25 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 1f031635f -> e3802a752


[SPARK-13439][MESOS] Document that spark.mesos.uris is comma-separated

Author: Michael Gummelt 

Closes #11311 from mgummelt/document_csv.

(cherry picked from commit c98a93ded36db5da2f3ebd519aa391de90927688)
Signed-off-by: Sean Owen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e3802a75
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e3802a75
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e3802a75

Branch: refs/heads/branch-1.6
Commit: e3802a7522a83b91c84d0ee6f721a768a485774b
Parents: 1f03163
Author: Michael Gummelt 
Authored: Thu Feb 25 13:32:09 2016 +
Committer: Sean Owen 
Committed: Thu Feb 25 13:32:50 2016 +

--
 docs/running-on-mesos.md | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e3802a75/docs/running-on-mesos.md
--
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index ed720f1..13ad535 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -347,8 +347,9 @@ See the [configuration page](configuration.html) for 
information on Spark config
   spark.mesos.uris
   (none)
   
-A list of URIs to be downloaded to the sandbox when driver or executor is 
launched by Mesos.
-This applies to both coarse-grain and fine-grain mode.
+A comma-separated list of URIs to be downloaded to the sandbox
+when driver or executor is launched by Mesos.  This applies to
+both coarse-grained and fine-grained mode.
   
 
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-13439][MESOS] Document that spark.mesos.uris is comma-separated

2016-02-25 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master fae88af18 -> c98a93ded


[SPARK-13439][MESOS] Document that spark.mesos.uris is comma-separated

Author: Michael Gummelt 

Closes #11311 from mgummelt/document_csv.
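For illustration, a minimal sketch of setting this from application code (the URIs are 
placeholders, not real artifacts):

```scala
import org.apache.spark.SparkConf

// Each comma-separated URI is fetched into the Mesos sandbox when the
// driver or executor is launched.
val conf = new SparkConf()
  .set("spark.mesos.uris",
    "http://example.com/jobs/extra.jar,http://example.com/conf/app.properties")
```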


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c98a93de
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c98a93de
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c98a93de

Branch: refs/heads/master
Commit: c98a93ded36db5da2f3ebd519aa391de90927688
Parents: fae88af
Author: Michael Gummelt 
Authored: Thu Feb 25 13:32:09 2016 +
Committer: Sean Owen 
Committed: Thu Feb 25 13:32:09 2016 +

--
 docs/running-on-mesos.md | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c98a93de/docs/running-on-mesos.md
--
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index b9f64c7..9816d03 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -349,8 +349,9 @@ See the [configuration page](configuration.html) for 
information on Spark config
   spark.mesos.uris
   (none)
   
-A list of URIs to be downloaded to the sandbox when driver or executor is 
launched by Mesos.
-This applies to both coarse-grain and fine-grain mode.
+A comma-separated list of URIs to be downloaded to the sandbox
+when driver or executor is launched by Mesos.  This applies to
+both coarse-grained and fine-grained mode.
   
 
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-13441][YARN] Fix NPE in yarn Client.createConfArchive method

2016-02-25 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 cb869a143 -> 1f031635f


[SPARK-13441][YARN] Fix NPE in yarn Client.createConfArchive method

## What changes were proposed in this pull request?

Instead of using the result of File.listFiles() directly, which may be null and cause an NPE, 
check for null first. If it is null, log a warning instead.

## How was this patch tested?

Ran the ./dev/run-tests locally
Tested manually on a cluster

Author: Terence Yim 

Closes #11337 from chtyim/fixes/SPARK-13441-null-check.

(cherry picked from commit fae88af18445c5a88212b4644e121de4b30ce027)
Signed-off-by: Sean Owen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1f031635
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1f031635
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1f031635

Branch: refs/heads/branch-1.6
Commit: 1f031635ffb4df472ad0d9c00bc82ebb601ebbb5
Parents: cb869a1
Author: Terence Yim 
Authored: Thu Feb 25 13:29:30 2016 +
Committer: Sean Owen 
Committed: Thu Feb 25 13:29:41 2016 +

--
 .../main/scala/org/apache/spark/deploy/yarn/Client.scala | 11 ---
 1 file changed, 8 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1f031635/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
--
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index f0590d2..7631aa3 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -539,9 +539,14 @@ private[spark] class Client(
   sys.env.get(envKey).foreach { path =>
 val dir = new File(path)
 if (dir.isDirectory()) {
-  dir.listFiles().foreach { file =>
-if (file.isFile && !hadoopConfFiles.contains(file.getName())) {
-  hadoopConfFiles(file.getName()) = file
+  val files = dir.listFiles()
+  if (files == null) {
+logWarning("Failed to list files under directory " + dir)
+  } else {
+files.foreach { file =>
+  if (file.isFile && !hadoopConfFiles.contains(file.getName())) {
+hadoopConfFiles(file.getName()) = file
+  }
 }
   }
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-13441][YARN] Fix NPE in yarn Client.createConfArchive method

2016-02-25 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 6f8e835c6 -> fae88af18


[SPARK-13441][YARN] Fix NPE in yarn Client.createConfArchive method

## What changes were proposed in this pull request?

Instead of using the result of File.listFiles() directly, which may be null and cause an NPE, 
check for null first. If it is null, log a warning instead.
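A common alternative for the same defensive check, shown here only as a sketch (it is not what 
the patch itself does), is to wrap the result in an Option:

```scala
import java.io.File

// File.listFiles() returns null on I/O error or when the path is not a
// directory, so never iterate its result unchecked.
def safeListFiles(dir: File): Seq[File] =
  Option(dir.listFiles()).map(_.toSeq).getOrElse {
    // The patch logs a warning here instead of silently continuing.
    println(s"Failed to list files under directory $dir")
    Seq.empty[File]
  }
```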

## How was this patch tested?

Ran the ./dev/run-tests locally
Tested manually on a cluster

Author: Terence Yim 

Closes #11337 from chtyim/fixes/SPARK-13441-null-check.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fae88af1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fae88af1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fae88af1

Branch: refs/heads/master
Commit: fae88af18445c5a88212b4644e121de4b30ce027
Parents: 6f8e835
Author: Terence Yim 
Authored: Thu Feb 25 13:29:30 2016 +
Committer: Sean Owen 
Committed: Thu Feb 25 13:29:30 2016 +

--
 .../main/scala/org/apache/spark/deploy/yarn/Client.scala | 11 ---
 1 file changed, 8 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fae88af1/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
--
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index d4ca255..530f1d7 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -537,9 +537,14 @@ private[spark] class Client(
   sys.env.get(envKey).foreach { path =>
 val dir = new File(path)
 if (dir.isDirectory()) {
-  dir.listFiles().foreach { file =>
-if (file.isFile && !hadoopConfFiles.contains(file.getName())) {
-  hadoopConfFiles(file.getName()) = file
+  val files = dir.listFiles()
+  if (files == null) {
+logWarning("Failed to list files under directory " + dir)
+  } else {
+files.foreach { file =>
+  if (file.isFile && !hadoopConfFiles.contains(file.getName())) {
+hadoopConfFiles(file.getName()) = file
+  }
 }
   }
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-13444][MLLIB] QuantileDiscretizer chooses bad splits on large DataFrames

2016-02-25 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 3cc938ac8 -> cb869a143


[SPARK-13444][MLLIB] QuantileDiscretizer chooses bad splits on large DataFrames

Change line 113 of QuantileDiscretizer.scala to

`val requiredSamples = math.max(numBins * numBins, 1.0)`

so that `requiredSamples` is a `Double`. This fixes the integer division in line 114, which 
currently evaluates to zero whenever `requiredSamples < dataset.count`.

Manual tests. I was having problems using QuantileDiscretizer with a dataset of mine, and after 
making this change QuantileDiscretizer behaves as expected.

Author: Oliver Pierson 
Author: Oliver Pierson 

Closes #11319 from oliverpierson/SPARK-13444.

(cherry picked from commit 6f8e835c68dff6fcf97326dc617132a41ff9d043)
Signed-off-by: Sean Owen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cb869a14
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cb869a14
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cb869a14

Branch: refs/heads/branch-1.6
Commit: cb869a143d338985c3d99ef388dd78b1e3d90a73
Parents: 3cc938a
Author: Oliver Pierson 
Authored: Thu Feb 25 13:24:46 2016 +
Committer: Sean Owen 
Committed: Thu Feb 25 13:27:10 2016 +

--
 .../spark/ml/feature/QuantileDiscretizer.scala  | 11 +--
 .../ml/feature/QuantileDiscretizerSuite.scala   | 20 
 2 files changed, 29 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/cb869a14/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala 
b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala
index 7bf67c6..cd5085a 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala
@@ -97,6 +97,13 @@ final class QuantileDiscretizer(override val uid: String)
 
 @Since("1.6.0")
 object QuantileDiscretizer extends DefaultParamsReadable[QuantileDiscretizer] 
with Logging {
+
+  /**
+   * Minimum number of samples required for finding splits, regardless of 
number of bins.  If
+   * the dataset has fewer rows than this value, the entire dataset will be 
used.
+   */
+  private[spark] val minSamplesRequired: Int = 1
+
   /**
* Sampling from the given dataset to collect quantile statistics.
*/
@@ -104,8 +111,8 @@ object QuantileDiscretizer extends 
DefaultParamsReadable[QuantileDiscretizer] wi
 val totalSamples = dataset.count()
 require(totalSamples > 0,
   "QuantileDiscretizer requires non-empty input dataset but was given an 
empty input.")
-val requiredSamples = math.max(numBins * numBins, 1)
-val fraction = math.min(requiredSamples / dataset.count(), 1.0)
+val requiredSamples = math.max(numBins * numBins, minSamplesRequired)
+val fraction = math.min(requiredSamples.toDouble / dataset.count(), 1.0)
 dataset.sample(withReplacement = false, fraction, new 
XORShiftRandom().nextInt()).collect()
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/cb869a14/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala
--
diff --git 
a/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala
 
b/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala
index 3a4f6d2..32bfa43 100644
--- 
a/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala
+++ 
b/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala
@@ -71,6 +71,26 @@ class QuantileDiscretizerSuite
 }
   }
 
+  test("Test splits on dataset larger than minSamplesRequired") {
+val sqlCtx = SQLContext.getOrCreate(sc)
+import sqlCtx.implicits._
+
+val datasetSize = QuantileDiscretizer.minSamplesRequired + 1
+val numBuckets = 5
+val df = sc.parallelize((1.0 to datasetSize by 
1.0).map(Tuple1.apply)).toDF("input")
+val discretizer = new QuantileDiscretizer()
+  .setInputCol("input")
+  .setOutputCol("result")
+  .setNumBuckets(numBuckets)
+  .setSeed(1)
+
+val result = discretizer.fit(df).transform(df)
+val observedNumBuckets = result.select("result").distinct.count
+
+assert(observedNumBuckets === numBuckets,
+  "Observed number of buckets does not equal expected number of buckets.")
+  }
+
   test("read/write") {
 val t = new QuantileDiscretizer()
   .setInputCol("myInputCol")



spark git commit: [SPARK-13444][MLLIB] QuantileDiscretizer chooses bad splits on large DataFrames

2016-02-25 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 3fa6491be -> 6f8e835c6


[SPARK-13444][MLLIB] QuantileDiscretizer chooses bad splits on large DataFrames

## What changes were proposed in this pull request?

Change line 113 of QuantileDiscretizer.scala to

`val requiredSamples = math.max(numBins * numBins, 1.0)`

so that `requiredSamples` is a `Double`. This fixes the integer division in line 114, which 
currently evaluates to zero whenever `requiredSamples < dataset.count`.
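To see the underlying pitfall in isolation (the numbers are illustrative only):

```scala
// 5 buckets => 25 required samples, against a dataset of 1,000,000 rows.
val requiredSamples: Int = 5 * 5
val totalRows: Long = 1000000L

// Int / Long is integer division, so the sampling fraction collapses to zero.
val badFraction = math.min(requiredSamples / totalRows, 1.0)            // 0.0
// Promoting to Double first keeps the intended tiny fraction.
val goodFraction = math.min(requiredSamples.toDouble / totalRows, 1.0)  // 2.5e-5
```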

## How was this patch tested?
Manual tests. I was having problems using QuantileDiscretizer with a dataset of mine, and after 
making this change QuantileDiscretizer behaves as expected.

Author: Oliver Pierson 
Author: Oliver Pierson 

Closes #11319 from oliverpierson/SPARK-13444.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6f8e835c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6f8e835c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6f8e835c

Branch: refs/heads/master
Commit: 6f8e835c68dff6fcf97326dc617132a41ff9d043
Parents: 3fa6491
Author: Oliver Pierson 
Authored: Thu Feb 25 13:24:46 2016 +
Committer: Sean Owen 
Committed: Thu Feb 25 13:24:46 2016 +

--
 .../spark/ml/feature/QuantileDiscretizer.scala  | 11 +--
 .../ml/feature/QuantileDiscretizerSuite.scala   | 20 
 2 files changed, 29 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6f8e835c/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala 
b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala
index 1f4cca1..769f440 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala
@@ -103,6 +103,13 @@ final class QuantileDiscretizer(override val uid: String)
 
 @Since("1.6.0")
 object QuantileDiscretizer extends DefaultParamsReadable[QuantileDiscretizer] 
with Logging {
+
+  /**
+   * Minimum number of samples required for finding splits, regardless of 
number of bins.  If
+   * the dataset has fewer rows than this value, the entire dataset will be 
used.
+   */
+  private[spark] val minSamplesRequired: Int = 1
+
   /**
* Sampling from the given dataset to collect quantile statistics.
*/
@@ -110,8 +117,8 @@ object QuantileDiscretizer extends 
DefaultParamsReadable[QuantileDiscretizer] wi
 val totalSamples = dataset.count()
 require(totalSamples > 0,
   "QuantileDiscretizer requires non-empty input dataset but was given an 
empty input.")
-val requiredSamples = math.max(numBins * numBins, 1)
-val fraction = math.min(requiredSamples / dataset.count(), 1.0)
+val requiredSamples = math.max(numBins * numBins, minSamplesRequired)
+val fraction = math.min(requiredSamples.toDouble / dataset.count(), 1.0)
 dataset.sample(withReplacement = false, fraction, new 
XORShiftRandom(seed).nextInt()).collect()
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6f8e835c/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala
--
diff --git 
a/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala
 
b/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala
index 6a2c601..25fabf6 100644
--- 
a/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala
+++ 
b/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala
@@ -71,6 +71,26 @@ class QuantileDiscretizerSuite
 }
   }
 
+  test("Test splits on dataset larger than minSamplesRequired") {
+val sqlCtx = SQLContext.getOrCreate(sc)
+import sqlCtx.implicits._
+
+val datasetSize = QuantileDiscretizer.minSamplesRequired + 1
+val numBuckets = 5
+val df = sc.parallelize((1.0 to datasetSize by 
1.0).map(Tuple1.apply)).toDF("input")
+val discretizer = new QuantileDiscretizer()
+  .setInputCol("input")
+  .setOutputCol("result")
+  .setNumBuckets(numBuckets)
+  .setSeed(1)
+
+val result = discretizer.fit(df).transform(df)
+val observedNumBuckets = result.select("result").distinct.count
+
+assert(observedNumBuckets === numBuckets,
+  "Observed number of buckets does not equal expected number of buckets.")
+  }
+
   test("read/write") {
 val t = new QuantileDiscretizer()
   .setInputCol("myInputCol")


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

spark git commit: [SPARK-13473][SQL] Don't push predicate through project with nondeterministic field(s)

2016-02-25 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 d2c1c67cf -> 0e920411f


[SPARK-13473][SQL] Don't push predicate through project with nondeterministic 
field(s)

## What changes were proposed in this pull request?

Predicates shouldn't be pushed through project with nondeterministic field(s).

See https://github.com/graphframes/graphframes/pull/23 and SPARK-13473 for more 
details.

This PR targets master, branch-1.6, and branch-1.5.
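A tiny illustration of why the order matters (the DataFrame and column names are made up): when 
the projection carries a nondeterministic column such as `rand`, pushing even a deterministic 
filter below the project changes which input rows the nondeterministic expression sees.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.rand

// Hypothetical query: column "a" is assumed to exist in df.
def tagAndFilter(df: DataFrame): DataFrame =
  df.select(rand(10).as("rnd"), df("a"))
    .where(df("a") > 5)
// If the filter were pushed below the project, rand(10) would be evaluated over a
// different sequence of input rows, so surviving rows could get different "rnd" values.
```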

## How was this patch tested?

A test case is added in `FilterPushdownSuite`. It constructs a query plan where 
a filter sits on top of a project with a nondeterministic field. The optimized query plan 
shouldn't change in this case.

Author: Cheng Lian 

Closes #11348 from 
liancheng/spark-13473-no-ppd-through-nondeterministic-project-field.

(cherry picked from commit 3fa6491be66dad690ca5329dd32e7c82037ae8c1)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0e920411
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0e920411
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0e920411

Branch: refs/heads/branch-1.5
Commit: 0e920411f69178b0c1aac80797c4af067a2ba0c6
Parents: d2c1c67
Author: Cheng Lian 
Authored: Thu Feb 25 20:43:03 2016 +0800
Committer: Wenchen Fan 
Committed: Thu Feb 25 20:46:32 2016 +0800

--
 .../sql/catalyst/optimizer/Optimizer.scala  |  9 ++-
 .../optimizer/FilterPushdownSuite.scala | 27 +++-
 2 files changed, 11 insertions(+), 25 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0e920411/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index d2b5c4f..c51de49 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -586,7 +586,14 @@ object SimplifyFilters extends Rule[LogicalPlan] {
  */
 object PushPredicateThroughProject extends Rule[LogicalPlan] with 
PredicateHelper {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-case filter @ Filter(condition, project @ Project(fields, grandChild)) =>
+// SPARK-13473: We can't push the predicate down when the underlying 
projection outputs non-
+// deterministic field(s).  Non-deterministic expressions are essentially 
stateful. This
+// implies that, for a given input row, the output is determined by the 
expression's initial
+// state and all the input rows processed before. In other words, the 
order of input rows
+// matters for non-deterministic expressions, while pushing down 
predicates changes the order.
+case filter @ Filter(condition, project @ Project(fields, grandChild))
+  if fields.forall(_.deterministic) =>
+
   // Create a map of Aliases to their values from the child projection.
   // e.g., 'SELECT a + b AS c, d ...' produces Map(c -> a + b).
   val aliasMap = AttributeMap(fields.collect {

http://git-wip-us.apache.org/repos/asf/spark/blob/0e920411/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 0f1fde2..e6d651b 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -146,7 +146,7 @@ class FilterPushdownSuite extends PlanTest {
 comparePlans(optimized, correctAnswer)
   }
 
-  test("nondeterministic: can't push down filter through project") {
+  test("nondeterministic: can't push down filter with nondeterministic 
condition through project") {
 val originalQuery = testRelation
   .select(Rand(10).as('rand), 'a)
   .where('rand > 5 || 'a > 5)
@@ -157,36 +157,15 @@ class FilterPushdownSuite extends PlanTest {
 comparePlans(optimized, originalQuery)
   }
 
-  test("nondeterministic: push down part of filter through project") {
+  test("nondeterministic: can't push down filter through project with 
nondeterministic field") {
 val originalQuery = testRelation
   .select(Rand(10).as('rand), 'a)
-  .where('rand > 5 && 'a > 5)
-  

spark git commit: [SPARK-13473][SQL] Don't push predicate through project with nondeterministic field(s)

2016-02-25 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 897599601 -> 3cc938ac8


[SPARK-13473][SQL] Don't push predicate through project with nondeterministic 
field(s)

## What changes were proposed in this pull request?

Predicates shouldn't be pushed through project with nondeterministic field(s).

See https://github.com/graphframes/graphframes/pull/23 and SPARK-13473 for more 
details.

This PR targets master, branch-1.6, and branch-1.5.

## How was this patch tested?

A test case is added in `FilterPushdownSuite`. It constructs a query plan where 
a filter is over a project with a nondeterministic field. Optimized query plan 
shouldn't change in this case.

Author: Cheng Lian 

Closes #11348 from 
liancheng/spark-13473-no-ppd-through-nondeterministic-project-field.

(cherry picked from commit 3fa6491be66dad690ca5329dd32e7c82037ae8c1)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3cc938ac
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3cc938ac
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3cc938ac

Branch: refs/heads/branch-1.6
Commit: 3cc938ac8124b8445f171baa365fa44a47962cc9
Parents: 8975996
Author: Cheng Lian 
Authored: Thu Feb 25 20:43:03 2016 +0800
Committer: Wenchen Fan 
Committed: Thu Feb 25 20:45:18 2016 +0800

--
 .../sql/catalyst/optimizer/Optimizer.scala  |  9 ++-
 .../optimizer/FilterPushdownSuite.scala | 27 +++-
 2 files changed, 11 insertions(+), 25 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3cc938ac/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 06d14fc..682b860 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -618,7 +618,14 @@ object SimplifyFilters extends Rule[LogicalPlan] {
  */
 object PushPredicateThroughProject extends Rule[LogicalPlan] with 
PredicateHelper {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-case filter @ Filter(condition, project @ Project(fields, grandChild)) =>
+// SPARK-13473: We can't push the predicate down when the underlying 
projection outputs non-
+// deterministic field(s).  Non-deterministic expressions are essentially 
stateful. This
+// implies that, for a given input row, the output is determined by the 
expression's initial
+// state and all the input rows processed before. In other words, the 
order of input rows
+// matters for non-deterministic expressions, while pushing down 
predicates changes the order.
+case filter @ Filter(condition, project @ Project(fields, grandChild))
+  if fields.forall(_.deterministic) =>
+
   // Create a map of Aliases to their values from the child projection.
   // e.g., 'SELECT a + b AS c, d ...' produces Map(c -> a + b).
   val aliasMap = AttributeMap(fields.collect {

http://git-wip-us.apache.org/repos/asf/spark/blob/3cc938ac/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index fba4c5c..6978807 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -147,7 +147,7 @@ class FilterPushdownSuite extends PlanTest {
 comparePlans(optimized, correctAnswer)
   }
 
-  test("nondeterministic: can't push down filter through project") {
+  test("nondeterministic: can't push down filter with nondeterministic 
condition through project") {
 val originalQuery = testRelation
   .select(Rand(10).as('rand), 'a)
   .where('rand > 5 || 'a > 5)
@@ -158,36 +158,15 @@ class FilterPushdownSuite extends PlanTest {
 comparePlans(optimized, originalQuery)
   }
 
-  test("nondeterministic: push down part of filter through project") {
+  test("nondeterministic: can't push down filter through project with 
nondeterministic field") {
 val originalQuery = testRelation
   .select(Rand(10).as('rand), 'a)
-  .where('rand > 5 && 'a > 5)
-  

spark git commit: [SPARK-13473][SQL] Don't push predicate through project with nondeterministic field(s)

2016-02-25 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 2e44031fa -> 3fa6491be


[SPARK-13473][SQL] Don't push predicate through project with nondeterministic 
field(s)

## What changes were proposed in this pull request?

Predicates shouldn't be pushed through project with nondeterministic field(s).

See https://github.com/graphframes/graphframes/pull/23 and SPARK-13473 for more 
details.

This PR targets master, branch-1.6, and branch-1.5.

## How was this patch tested?

A test case is added in `FilterPushdownSuite`. It constructs a query plan where 
a filter is over a project with a nondeterministic field. Optimized query plan 
shouldn't change in this case.

Author: Cheng Lian 

Closes #11348 from 
liancheng/spark-13473-no-ppd-through-nondeterministic-project-field.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3fa6491b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3fa6491b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3fa6491b

Branch: refs/heads/master
Commit: 3fa6491be66dad690ca5329dd32e7c82037ae8c1
Parents: 2e44031
Author: Cheng Lian 
Authored: Thu Feb 25 20:43:03 2016 +0800
Committer: Wenchen Fan 
Committed: Thu Feb 25 20:43:03 2016 +0800

--
 .../sql/catalyst/optimizer/Optimizer.scala  |  9 ++-
 .../optimizer/FilterPushdownSuite.scala | 27 +++-
 2 files changed, 11 insertions(+), 25 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3fa6491b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 2b80497..2aeb957 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -792,7 +792,14 @@ object SimplifyFilters extends Rule[LogicalPlan] {
  */
 object PushPredicateThroughProject extends Rule[LogicalPlan] with 
PredicateHelper {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-case filter @ Filter(condition, project @ Project(fields, grandChild)) =>
+// SPARK-13473: We can't push the predicate down when the underlying 
projection outputs non-
+// deterministic field(s).  Non-deterministic expressions are essentially 
stateful. This
+// implies that, for a given input row, the output is determined by the 
expression's initial
+// state and all the input rows processed before. In other words, the 
order of input rows
+// matters for non-deterministic expressions, while pushing down 
predicates changes the order.
+case filter @ Filter(condition, project @ Project(fields, grandChild))
+  if fields.forall(_.deterministic) =>
+
   // Create a map of Aliases to their values from the child projection.
   // e.g., 'SELECT a + b AS c, d ...' produces Map(c -> a + b).
   val aliasMap = AttributeMap(fields.collect {

http://git-wip-us.apache.org/repos/asf/spark/blob/3fa6491b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 7d60862..1292aa0 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -98,7 +98,7 @@ class FilterPushdownSuite extends PlanTest {
 comparePlans(optimized, correctAnswer)
   }
 
-  test("nondeterministic: can't push down filter through project") {
+  test("nondeterministic: can't push down filter with nondeterministic 
condition through project") {
 val originalQuery = testRelation
   .select(Rand(10).as('rand), 'a)
   .where('rand > 5 || 'a > 5)
@@ -109,36 +109,15 @@ class FilterPushdownSuite extends PlanTest {
 comparePlans(optimized, originalQuery)
   }
 
-  test("nondeterministic: push down part of filter through project") {
+  test("nondeterministic: can't push down filter through project with 
nondeterministic field") {
 val originalQuery = testRelation
   .select(Rand(10).as('rand), 'a)
-  .where('rand > 5 && 'a > 5)
-  .analyze
-
-val optimized = Optimize.execute(originalQuery)
-
-val correctAnswer = testRelation
   .where('a > 5)
-  

spark git commit: [SPARK-13117][WEB UI] WebUI should use the local ip not 0.0.0.0

2016-02-25 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 2b2c8c332 -> 2e44031fa


[SPARK-13117][WEB UI] WebUI should use the local ip not 0.0.0.0

Fixed the HTTP server host name/IP handling: the HTTP server now binds to the
configured host name/IP instead of always using '0.0.0.0'.
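A conceptual sketch of the difference (plain JDK sockets, not Spark's Jetty code; `SPARK_PUBLIC_DNS` is used here only as an illustrative source of the host name):

```scala
import java.net.{InetAddress, InetSocketAddress, ServerSocket}

// Resolve a concrete host name instead of the wildcard address 0.0.0.0.
val publicHostName =
  sys.env.getOrElse("SPARK_PUBLIC_DNS", InetAddress.getLocalHost.getHostName)

val socket = new ServerSocket()
socket.bind(new InetSocketAddress(publicHostName, 0))  // port 0 = any free port
println(s"bound to ${socket.getInetAddress.getHostAddress}:${socket.getLocalPort}")
socket.close()
```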

Author: Devaraj K 

Closes #11133 from devaraj-kavali/SPARK-13117.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2e44031f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2e44031f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2e44031f

Branch: refs/heads/master
Commit: 2e44031fafdb8cf486573b98e4faa6b31ffb90a4
Parents: 2b2c8c3
Author: Devaraj K 
Authored: Thu Feb 25 12:18:43 2016 +
Committer: Sean Owen 
Committed: Thu Feb 25 12:18:43 2016 +

--
 core/src/main/scala/org/apache/spark/ui/WebUI.scala   | 2 +-
 .../scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala| 3 ++-
 2 files changed, 3 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2e44031f/core/src/main/scala/org/apache/spark/ui/WebUI.scala
--
diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala 
b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
index fe4949b..e515916 100644
--- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -134,7 +134,7 @@ private[spark] abstract class WebUI(
   def bind() {
 assert(!serverInfo.isDefined, "Attempted to bind %s more than 
once!".format(className))
 try {
-  serverInfo = Some(startJettyServer("0.0.0.0", port, sslOptions, 
handlers, conf, name))
+  serverInfo = Some(startJettyServer(publicHostName, port, sslOptions, 
handlers, conf, name))
   logInfo("Started %s at http://%s:%d".format(className, publicHostName, 
boundPort))
 } catch {
   case e: Exception =>

http://git-wip-us.apache.org/repos/asf/spark/blob/2e44031f/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
--
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
index f416ace..972b552 100644
--- a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.{LocalSparkContext, SparkConf, 
SparkContext, SparkFunSui
 import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded}
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.util.SparkConfWithEnv
+import org.apache.spark.util.Utils
 
 class LogUrlsStandaloneSuite extends SparkFunSuite with LocalSparkContext {
 
@@ -53,7 +54,7 @@ class LogUrlsStandaloneSuite extends SparkFunSuite with 
LocalSparkContext {
   }
 
   test("verify that log urls reflect SPARK_PUBLIC_DNS (SPARK-6175)") {
-val SPARK_PUBLIC_DNS = "public_dns"
+val SPARK_PUBLIC_DNS = Utils.localHostNameForURI()
 val conf = new SparkConfWithEnv(Map("SPARK_PUBLIC_DNS" -> 
SPARK_PUBLIC_DNS)).set(
   "spark.extraListeners", classOf[SaveExecutorInfo].getName)
 sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)





[3/3] spark git commit: [SPARK-13486][SQL] Move SQLConf into an internal package

2016-02-25 Thread lian
[SPARK-13486][SQL] Move SQLConf into an internal package

## What changes were proposed in this pull request?
This patch moves SQLConf into the org.apache.spark.sql.internal package to make it 
very explicit that it is internal. Soon I will also submit more API work that 
creates implementations of interfaces in this internal package.

## How was this patch tested?
If it compiles, then the refactoring should work.

Author: Reynold Xin 

Closes #11363 from rxin/SPARK-13486.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2b2c8c33
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2b2c8c33
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2b2c8c33

Branch: refs/heads/master
Commit: 2b2c8c33236677c916541f956f7b94bba014a9ce
Parents: 07f92ef
Author: Reynold Xin 
Authored: Thu Feb 25 17:49:50 2016 +0800
Committer: Cheng Lian 
Committed: Thu Feb 25 17:49:50 2016 +0800

--
 project/MimaExcludes.scala  |   6 +
 .../scala/org/apache/spark/sql/DataFrame.scala  |   1 +
 .../org/apache/spark/sql/GroupedData.scala  |   1 +
 .../scala/org/apache/spark/sql/SQLConf.scala| 730 ---
 .../scala/org/apache/spark/sql/SQLContext.scala |   3 +-
 .../spark/sql/execution/ExistingRDD.scala   |   3 +-
 .../spark/sql/execution/SparkStrategies.scala   |   3 +-
 .../apache/spark/sql/execution/commands.scala   |   3 +-
 .../InsertIntoHadoopFsRelation.scala|   1 +
 .../execution/datasources/SqlNewHadoopRDD.scala |   3 +-
 .../execution/datasources/WriterContainer.scala |   1 +
 .../parquet/CatalystSchemaConverter.scala   |   4 +-
 .../parquet/CatalystWriteSupport.scala  |   2 +-
 .../datasources/parquet/ParquetRelation.scala   |   1 +
 .../spark/sql/execution/debug/package.scala |   1 +
 .../execution/local/BinaryHashJoinNode.scala|   2 +-
 .../execution/local/BroadcastHashJoinNode.scala |   2 +-
 .../sql/execution/local/ConvertToSafeNode.scala |   2 +-
 .../execution/local/ConvertToUnsafeNode.scala   |   2 +-
 .../spark/sql/execution/local/ExpandNode.scala  |   2 +-
 .../spark/sql/execution/local/FilterNode.scala  |   2 +-
 .../sql/execution/local/IntersectNode.scala |   4 +-
 .../spark/sql/execution/local/LimitNode.scala   |   2 +-
 .../spark/sql/execution/local/LocalNode.scala   |   3 +-
 .../execution/local/NestedLoopJoinNode.scala|   2 +-
 .../spark/sql/execution/local/ProjectNode.scala |   2 +-
 .../spark/sql/execution/local/SampleNode.scala  |   2 +-
 .../spark/sql/execution/local/SeqScanNode.scala |   2 +-
 .../local/TakeOrderedAndProjectNode.scala   |   2 +-
 .../spark/sql/execution/local/UnionNode.scala   |   2 +-
 .../org/apache/spark/sql/internal/SQLConf.scala | 730 +++
 .../apache/spark/sql/internal/package-info.java |  22 +
 .../org/apache/spark/sql/internal/package.scala |  24 +
 .../spark/sql/DataFrameAggregateSuite.scala |   1 +
 .../apache/spark/sql/DataFramePivotSuite.scala  |   1 +
 .../org/apache/spark/sql/DataFrameSuite.scala   |   1 +
 .../scala/org/apache/spark/sql/JoinSuite.scala  |   1 +
 .../spark/sql/MultiSQLContextsSuite.scala   |   1 +
 .../scala/org/apache/spark/sql/QueryTest.scala  |   1 +
 .../apache/spark/sql/SQLConfEntrySuite.scala| 150 
 .../org/apache/spark/sql/SQLConfSuite.scala | 132 
 .../org/apache/spark/sql/SQLContextSuite.scala  |   1 +
 .../org/apache/spark/sql/SQLQuerySuite.scala|   1 +
 .../execution/ExchangeCoordinatorSuite.scala|   1 +
 .../spark/sql/execution/PlannerSuite.scala  |   3 +-
 .../columnar/PartitionBatchPruningSuite.scala   |   1 +
 .../execution/datasources/json/JsonSuite.scala  |   1 +
 .../parquet/ParquetFilterSuite.scala|   1 +
 .../datasources/parquet/ParquetIOSuite.scala|   1 +
 .../ParquetPartitionDiscoverySuite.scala|   1 +
 .../datasources/parquet/ParquetQuerySuite.scala |   1 +
 .../parquet/ParquetReadBenchmark.scala  |   3 +-
 .../datasources/parquet/ParquetTest.scala   |   3 +-
 .../sql/execution/joins/InnerJoinSuite.scala|   3 +-
 .../sql/execution/joins/OuterJoinSuite.scala|   3 +-
 .../sql/execution/joins/SemiJoinSuite.scala |   3 +-
 .../spark/sql/execution/local/DummyNode.scala   |   2 +-
 .../sql/execution/local/HashJoinNodeSuite.scala |   2 +-
 .../sql/execution/local/LocalNodeTest.scala |   2 +-
 .../local/NestedLoopJoinNodeSuite.scala |   2 +-
 .../sql/execution/metric/SQLMetricsSuite.scala  |   1 +
 .../spark/sql/internal/SQLConfEntrySuite.scala  | 150 
 .../spark/sql/internal/SQLConfSuite.scala   | 133 
 .../spark/sql/sources/DataSourceTest.scala  |   1 +
 .../spark/sql/sources/FilteredScanSuite.scala   |   1 +
 .../spark/sql/sources/PrunedScanSuite.scala |   1 +
 .../spark/sql/sources/SaveLoadSuite.scala   |   3 +-
 

[1/3] spark git commit: [SPARK-13486][SQL] Move SQLConf into an internal package

2016-02-25 Thread lian
Repository: spark
Updated Branches:
  refs/heads/master 07f92ef1f -> 2b2c8c332


http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c8c33/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
index c89a151..28ad7ae 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
@@ -18,8 +18,8 @@
 package org.apache.spark.sql.test
 
 import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.sql.{SQLConf, SQLContext}
-
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.internal.SQLConf
 
 /**
  * A special [[SQLContext]] prepared for testing.
@@ -39,7 +39,7 @@ private[sql] class TestSQLContext(sc: SparkContext) extends 
SQLContext(sc) { sel
   super.clear()
 
   // Make sure we start with the default test configs even after clear
-  TestSQLContext.overrideConfs.map {
+  TestSQLContext.overrideConfs.foreach {
 case (key, value) => setConfString(key, value)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c8c33/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
--
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
index 66eaa3e..f32ba5f 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
@@ -32,10 +32,10 @@ import org.apache.hive.service.server.{HiveServer2, 
HiveServerServerOptionsProce
 import org.apache.spark.{Logging, SparkContext}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, 
SparkListenerJobStart}
-import org.apache.spark.sql.SQLConf
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
 import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerTab
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.{ShutdownHookManager, Utils}
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c8c33/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
--
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 8fef22c..458d4f2 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -33,9 +33,10 @@ import 
org.apache.hive.service.cli.operation.ExecuteStatementOperation
 import org.apache.hive.service.cli.session.HiveSession
 
 import org.apache.spark.Logging
-import org.apache.spark.sql.{DataFrame, Row => SparkRow, SQLConf}
+import org.apache.spark.sql.{DataFrame, Row => SparkRow}
 import org.apache.spark.sql.execution.SetCommand
 import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.util.{Utils => SparkUtils}
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c8c33/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
--
diff --git 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index 5f9952a..c05527b 100644
--- 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -202,7 +202,7 @@ class HiveThriftBinaryServerSuite extends 
HiveThriftJdbcTest {
   }
 
   test("test multiple session") {
-import org.apache.spark.sql.SQLConf
+import org.apache.spark.sql.internal.SQLConf
 var defaultV1: String = null
 var defaultV2: String = null
 var data: ArrayBuffer[Int] = null


spark git commit: [SPARK-13376] [SPARK-13476] [SQL] improve column pruning

2016-02-25 Thread davies
Repository: spark
Updated Branches:
  refs/heads/master 264533b55 -> 07f92ef1f


[SPARK-13376] [SPARK-13476] [SQL] improve column pruning

## What changes were proposed in this pull request?

This PR mostly rewrites the ColumnPruning rule to support most of the SQL 
logical plans (except those for Dataset).

This PR also fixes a bug in Generate: it should always output UnsafeRow. A 
regression test is added for that.

## How was this patch tested?

This is tested by unit tests, and also manually tested with TPCDS Q78, where 
all unused columns were pruned successfully, improving the performance by 78% 
(from 22s to 12s).
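As a hedged illustration of what the rule buys (spark-shell style, assuming `sc` and `sqlContext` are in scope; the tiny DataFrame and column names are hypothetical):

```scala
import sqlContext.implicits._

val wide = sc.parallelize(Seq(
  (1, "a", 10.0),
  (2, "b", 20.0)
)).toDF("id", "name", "score")

// Only "id" is referenced, so "name" and "score" should be pruned away
// below the aggregate in the optimized plan.
val q = wide.groupBy($"id").count()

// Compare the analyzed and optimized plans to see the pruning.
q.explain(true)
```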

Author: Davies Liu 

Closes #11354 from davies/fix_column_pruning.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/07f92ef1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/07f92ef1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/07f92ef1

Branch: refs/heads/master
Commit: 07f92ef1fa090821bef9c60689bf41909d781ee7
Parents: 264533b
Author: Davies Liu 
Authored: Thu Feb 25 00:13:07 2016 -0800
Committer: Davies Liu 
Committed: Thu Feb 25 00:13:07 2016 -0800

--
 .../sql/catalyst/optimizer/Optimizer.scala  | 128 +--
 .../catalyst/optimizer/ColumnPruningSuite.scala | 128 ++-
 .../optimizer/FilterPushdownSuite.scala |  80 
 .../optimizer/JoinOptimizationSuite.scala   |   2 +-
 .../apache/spark/sql/execution/Generate.scala   |  28 ++--
 .../columnar/InMemoryColumnarTableScan.scala|   7 +-
 .../org/apache/spark/sql/DataFrameSuite.scala   |   8 ++
 7 files changed, 215 insertions(+), 166 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/07f92ef1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 1f05f20..2b80497 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -313,97 +313,85 @@ object SetOperationPushDown extends Rule[LogicalPlan] 
with PredicateHelper {
  */
 object ColumnPruning extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-case a @ Aggregate(_, _, e @ Expand(projects, output, child))
-  if (e.outputSet -- a.references).nonEmpty =>
-  val newOutput = output.filter(a.references.contains(_))
-  val newProjects = projects.map { proj =>
-proj.zip(output).filter { case (e, a) =>
+// Prunes the unused columns from project list of 
Project/Aggregate/Window/Expand
+case p @ Project(_, p2: Project) if (p2.outputSet -- 
p.references).nonEmpty =>
+  p.copy(child = p2.copy(projectList = 
p2.projectList.filter(p.references.contains)))
+case p @ Project(_, a: Aggregate) if (a.outputSet -- 
p.references).nonEmpty =>
+  p.copy(
+child = a.copy(aggregateExpressions = 
a.aggregateExpressions.filter(p.references.contains)))
+case p @ Project(_, w: Window) if (w.outputSet -- p.references).nonEmpty =>
+  p.copy(child = w.copy(
+projectList = w.projectList.filter(p.references.contains),
+windowExpressions = w.windowExpressions.filter(p.references.contains)))
+case a @ Project(_, e @ Expand(_, _, grandChild)) if (e.outputSet -- 
a.references).nonEmpty =>
+  val newOutput = e.output.filter(a.references.contains(_))
+  val newProjects = e.projections.map { proj =>
+proj.zip(e.output).filter { case (e, a) =>
   newOutput.contains(a)
 }.unzip._1
   }
-  a.copy(child = Expand(newProjects, newOutput, child))
+  a.copy(child = Expand(newProjects, newOutput, grandChild))
+// TODO: support some logical plan for Dataset
 
-case a @ Aggregate(_, _, e @ Expand(_, _, child))
-  if (child.outputSet -- e.references -- a.references).nonEmpty =>
-  a.copy(child = e.copy(child = prunedChild(child, e.references ++ 
a.references)))
-
-// Eliminate attributes that are not needed to calculate the specified 
aggregates.
+// Prunes the unused columns from child of Aggregate/Window/Expand/Generate
 case a @ Aggregate(_, _, child) if (child.outputSet -- 
a.references).nonEmpty =>
-  a.copy(child = Project(a.references.toSeq, child))
-
-// Eliminate attributes that are not needed to calculate the Generate.
+  a.copy(child = prunedChild(child, a.references))
+case w @ Window(_, _, _, _, child) if (child.outputSet -- 
w.references).nonEmpty =>
+