spark git commit: [SPARK-17717][SQL] Add exist/find methods to Catalog.

2016-09-29 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 2f7395670 -> 74ac1c438


[SPARK-17717][SQL] Add exist/find methods to Catalog.

## What changes were proposed in this pull request?
The current user-facing catalog does not implement methods for checking object 
existence or finding objects. You could theoretically do this using the `list*` 
commands, but this is rather cumbersome and can actually be costly when there 
are many objects. This PR adds `exists*` and `find*` methods for Databases, 
Tables and Functions.
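
A minimal usage sketch (not part of this patch) of the new checks, written
against the signatures shown in the MimaExcludes entries and the Catalog.scala
excerpt below; the single-argument overloads are an assumption, and `mydb` and
`logs` are placeholder names:

```scala
// Sketch only: exists* returns a Boolean, find* throws AnalysisException when missing.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("catalog-example").getOrCreate()

if (spark.catalog.databaseExists("mydb")) {
  spark.catalog.setCurrentDatabase("mydb")
}
if (spark.catalog.tableExists("logs")) {
  // findTable resolves temporary tables and tables in the current database.
  val table = spark.catalog.findTable("logs")
  println(s"${table.name} isTemporary=${table.isTemporary}")
}
```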

## How was this patch tested?
Added tests to `org.apache.spark.sql.internal.CatalogSuite`

Author: Herman van Hovell 

Closes #15301 from hvanhovell/SPARK-17717.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/74ac1c43
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/74ac1c43
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/74ac1c43

Branch: refs/heads/master
Commit: 74ac1c43817c0b8da70342e540ec7638dd7d01bd
Parents: 2f73956
Author: Herman van Hovell 
Authored: Thu Sep 29 17:56:32 2016 -0700
Committer: Reynold Xin 
Committed: Thu Sep 29 17:56:32 2016 -0700

--
 project/MimaExcludes.scala  |  11 +-
 .../org/apache/spark/sql/catalog/Catalog.scala  |  83 ++
 .../apache/spark/sql/internal/CatalogImpl.scala | 152 ---
 .../spark/sql/internal/CatalogSuite.scala   | 118 ++
 4 files changed, 339 insertions(+), 25 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/74ac1c43/project/MimaExcludes.scala
--
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 4db3edb..2ffe0ac 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -46,7 +46,16 @@ object MimaExcludes {
   // [SPARK-16967] Move Mesos to Module
   
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkMasterRegex.MESOS_REGEX"),
   // [SPARK-16240] ML persistence backward compatibility for LDA
-  
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ml.clustering.LDA$")
+  
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ml.clustering.LDA$"),
+  // [SPARK-17717] Add Find and Exists method to Catalog.
+  
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.findDatabase"),
+  
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.findTable"),
+  
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.findFunction"),
+  
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.findColumn"),
+  
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.databaseExists"),
+  
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.tableExists"),
+  
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.functionExists"),
+  
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.columnExists")
 )
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/74ac1c43/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
index 1aed245..b439022 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
@@ -102,6 +102,89 @@ abstract class Catalog {
   def listColumns(dbName: String, tableName: String): Dataset[Column]
 
   /**
+   * Find the database with the specified name. This throws an 
AnalysisException when the database
+   * cannot be found.
+   *
+   * @since 2.1.0
+   */
+  @throws[AnalysisException]("database does not exist")
+  def findDatabase(dbName: String): Database
+
+  /**
+   * Find the table with the specified name. This table can be a temporary 
table or a table in the
+   * current database. This throws an AnalysisException when the table cannot 
be found.
+   *
+   * @since 2.1.0
+   */
+  @throws[AnalysisException]("table does not exist")
+  def findTable(tableName: String): Table
+
+  /**
+   * Find the table with the specified name in the specified database. This 
throws an
+   * AnalysisException when the table cannot be found.
+   *
+   * @since 2.1.0
+   */
+  @throws[AnalysisException]("database or table does not exist")
+  def findTable(dbName: 

spark git commit: Updated the following PR with minor changes to allow cherry-pick to branch-2.0

2016-09-29 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 0cdd7370a -> a99ea4c9e


Updated the following PR with minor changes to allow cherry-pick to branch-2.0

[SPARK-17697][ML] Fixed bug in summary calculations that pattern match against 
label without casting

When calling LogisticRegression.evaluate or GeneralizedLinearRegression.evaluate 
on a Dataset whose label column is not of double type, the summary calculations 
pattern match against Double and throw a MatchError. This fix casts the label 
column to DoubleType to ensure there is no MatchError.

Added unit tests to call evaluate with a dataset that has Label as other 
numeric types.

Author: Bryan Cutler 

Closes #15288 from BryanCutler/binaryLOR-numericCheck-SPARK-17697.

(cherry picked from commit 2f739567080d804a942cfcca0e22f91ab7cbea36)
Signed-off-by: Joseph K. Bradley 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a99ea4c9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a99ea4c9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a99ea4c9

Branch: refs/heads/branch-2.0
Commit: a99ea4c9e0e2f91e4b524987788f0acee88e564d
Parents: 0cdd737
Author: Bryan Cutler 
Authored: Thu Sep 29 16:31:30 2016 -0700
Committer: Joseph K. Bradley 
Committed: Thu Sep 29 16:56:34 2016 -0700

--
 .../ml/classification/LogisticRegression.scala  |  2 +-
 .../GeneralizedLinearRegression.scala   | 11 +
 .../LogisticRegressionSuite.scala   | 18 +-
 .../GeneralizedLinearRegressionSuite.scala  | 25 
 4 files changed, 49 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a99ea4c9/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
 
b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index cca3374..c50ee5d 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -860,7 +860,7 @@ class BinaryLogisticRegressionSummary 
private[classification] (
   // TODO: Allow the user to vary the number of bins using a setBins method in
   // BinaryClassificationMetrics. For now the default is set to 100.
   @transient private val binaryMetrics = new BinaryClassificationMetrics(
-predictions.select(probabilityCol, labelCol).rdd.map {
+predictions.select(col(probabilityCol), 
col(labelCol).cast(DoubleType)).rdd.map {
   case Row(score: Vector, label: Double) => (score(1), label)
 }, 100
   )

http://git-wip-us.apache.org/repos/asf/spark/blob/a99ea4c9/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
 
b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
index 2bdc09e..7f88c12 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
@@ -988,7 +988,7 @@ class GeneralizedLinearRegressionSummary 
private[regression] (
 } else {
   link.unlink(0.0)
 }
-predictions.select(col(model.getLabelCol), w).rdd.map {
+predictions.select(col(model.getLabelCol).cast(DoubleType), w).rdd.map {
   case Row(y: Double, weight: Double) =>
 family.deviance(y, wtdmu, weight)
 }.sum()
@@ -1000,7 +1000,7 @@ class GeneralizedLinearRegressionSummary 
private[regression] (
   @Since("2.0.0")
   lazy val deviance: Double = {
 val w = weightCol
-predictions.select(col(model.getLabelCol), col(predictionCol), w).rdd.map {
+predictions.select(col(model.getLabelCol).cast(DoubleType), 
col(predictionCol), w).rdd.map {
   case Row(label: Double, pred: Double, weight: Double) =>
 family.deviance(label, pred, weight)
 }.sum()
@@ -1026,9 +1026,10 @@ class GeneralizedLinearRegressionSummary 
private[regression] (
   lazy val aic: Double = {
 val w = weightCol
 val weightSum = predictions.select(w).agg(sum(w)).first().getDouble(0)
-val t = predictions.select(col(model.getLabelCol), col(predictionCol), 
w).rdd.map {
-  case Row(label: Double, pred: Double, weight: Double) =>
-(label, pred, weight)
+val t = predictions.select(
+  col(model.getLabelCol).cast(DoubleType), col(predictionCol), w).rdd.map {
+

spark git commit: [SPARK-17697][ML] Fixed bug in summary calculations that pattern match against label without casting

2016-09-29 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master 39eb3bb1e -> 2f7395670


[SPARK-17697][ML] Fixed bug in summary calculations that pattern match against 
label without casting

## What changes were proposed in this pull request?
When calling LogisticRegression.evaluate or GeneralizedLinearRegression.evaluate 
on a Dataset whose label column is not of double type, the summary calculations 
pattern match against Double and throw a MatchError. This fix casts the label 
column to DoubleType to ensure there is no MatchError.
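
As a quick illustration (a sketch, not taken from the patch), the same failure
mode and fix can be reproduced with plain DataFrame code; the column names below
are placeholders:

```scala
// Sketch: cast a non-double label column before pattern matching on Double.
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.DoubleType

val spark = SparkSession.builder().appName("label-cast-example").getOrCreate()
import spark.implicits._

val predictions = Seq((1, 0.3), (0, 0.9)).toDF("label", "prediction") // label is IntegerType

// Without the cast, `case Row(label: Double, ...)` hits a MatchError for integer labels.
val pairs = predictions
  .select(col("label").cast(DoubleType), col("prediction"))
  .rdd.map { case Row(label: Double, pred: Double) => (label, pred) }
  .collect()
```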

## How was this patch tested?
Added unit tests to call evaluate with a dataset that has Label as other 
numeric types.

Author: Bryan Cutler 

Closes #15288 from BryanCutler/binaryLOR-numericCheck-SPARK-17697.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2f739567
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2f739567
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2f739567

Branch: refs/heads/master
Commit: 2f739567080d804a942cfcca0e22f91ab7cbea36
Parents: 39eb3bb
Author: Bryan Cutler 
Authored: Thu Sep 29 16:31:30 2016 -0700
Committer: Joseph K. Bradley 
Committed: Thu Sep 29 16:31:30 2016 -0700

--
 .../ml/classification/LogisticRegression.scala  |  2 +-
 .../GeneralizedLinearRegression.scala   | 11 +
 .../LogisticRegressionSuite.scala   | 18 +-
 .../GeneralizedLinearRegressionSuite.scala  | 25 
 4 files changed, 49 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2f739567/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
 
b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index 5ab63d1..329961a 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -1169,7 +1169,7 @@ class BinaryLogisticRegressionSummary 
private[classification] (
   // TODO: Allow the user to vary the number of bins using a setBins method in
   // BinaryClassificationMetrics. For now the default is set to 100.
   @transient private val binaryMetrics = new BinaryClassificationMetrics(
-predictions.select(probabilityCol, labelCol).rdd.map {
+predictions.select(col(probabilityCol), 
col(labelCol).cast(DoubleType)).rdd.map {
   case Row(score: Vector, label: Double) => (score(1), label)
 }, 100
   )

http://git-wip-us.apache.org/repos/asf/spark/blob/2f739567/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
 
b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
index 02b27fb..bb9e150 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
@@ -992,7 +992,7 @@ class GeneralizedLinearRegressionSummary 
private[regression] (
 } else {
   link.unlink(0.0)
 }
-predictions.select(col(model.getLabelCol), w).rdd.map {
+predictions.select(col(model.getLabelCol).cast(DoubleType), w).rdd.map {
   case Row(y: Double, weight: Double) =>
 family.deviance(y, wtdmu, weight)
 }.sum()
@@ -1004,7 +1004,7 @@ class GeneralizedLinearRegressionSummary 
private[regression] (
   @Since("2.0.0")
   lazy val deviance: Double = {
 val w = weightCol
-predictions.select(col(model.getLabelCol), col(predictionCol), w).rdd.map {
+predictions.select(col(model.getLabelCol).cast(DoubleType), 
col(predictionCol), w).rdd.map {
   case Row(label: Double, pred: Double, weight: Double) =>
 family.deviance(label, pred, weight)
 }.sum()
@@ -1030,9 +1030,10 @@ class GeneralizedLinearRegressionSummary 
private[regression] (
   lazy val aic: Double = {
 val w = weightCol
 val weightSum = predictions.select(w).agg(sum(w)).first().getDouble(0)
-val t = predictions.select(col(model.getLabelCol), col(predictionCol), 
w).rdd.map {
-  case Row(label: Double, pred: Double, weight: Double) =>
-(label, pred, weight)
+val t = predictions.select(
+  col(model.getLabelCol).cast(DoubleType), col(predictionCol), w).rdd.map {
+case Row(label: Double, pred: Double, weight: Double) =>
+  (label, pred, weight)
 }
 family.aic(t, deviance, 

spark git commit: [SPARK-17412][DOC] All test should not be run by `root` or any admin user

2016-09-29 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/master 3993ebca2 -> 39eb3bb1e


[SPARK-17412][DOC] All test should not be run by `root` or any admin user

## What changes were proposed in this pull request?

`FsHistoryProviderSuite` fails when the `root` user runs it. The test case 
**SPARK-3697: ignore directories that cannot be read** relies on 
`setReadable(false, false)` to make test data files unreadable and expects the 
number of accessible files to be 1. However, `root` can access all files, so it 
returns 2.

This PR documents this assumption explicitly in `building-spark.md`.
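
For background (a sketch, not from the patch): the test's setup relies on
`java.io.File#setReadable`, whose permission change does not restrict the
superuser, so the "unreadable" file stays readable when the tests run as root:

```scala
// Sketch: why an unreadable-file assertion breaks under root.
import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.Files

val f = File.createTempFile("spark-test", ".log")
Files.write(f.toPath, "dummy event log".getBytes(StandardCharsets.UTF_8))

// The suite flips the readable bit and expects the provider to skip this file.
f.setReadable(false, false)

// As a regular user this prints false; as root the OS still grants access,
// so the file is counted and the expected count of 1 becomes 2.
println(s"canRead = ${f.canRead}")
```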

## How was this patch tested?

This is a documentation change.

Author: Dongjoon Hyun 

Closes #15291 from dongjoon-hyun/SPARK-17412.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/39eb3bb1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/39eb3bb1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/39eb3bb1

Branch: refs/heads/master
Commit: 39eb3bb1ec29aa993de13a6eba3ab27db6fc5371
Parents: 3993ebc
Author: Dongjoon Hyun 
Authored: Thu Sep 29 16:01:45 2016 -0700
Committer: Marcelo Vanzin 
Committed: Thu Sep 29 16:01:45 2016 -0700

--
 docs/building-spark.md | 1 +
 1 file changed, 1 insertion(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/39eb3bb1/docs/building-spark.md
--
diff --git a/docs/building-spark.md b/docs/building-spark.md
index 75c304a3..da7eeb8 100644
--- a/docs/building-spark.md
+++ b/docs/building-spark.md
@@ -215,6 +215,7 @@ For help in setting up IntelliJ IDEA or Eclipse for Spark 
development, and troub
 # Running Tests
 
 Tests are run by default via the [ScalaTest Maven 
plugin](http://www.scalatest.org/user_guide/using_the_scalatest_maven_plugin).
+Note that tests should not be run as root or an admin user.
 
 Some of the tests require Spark to be packaged first, so always run `mvn 
package` with `-DskipTests` the first time.  The following is an example of a 
correct (build, test) sequence:
 





spark git commit: [SPARK-17676][CORE] FsHistoryProvider should ignore hidden files

2016-09-29 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/master 29396e7d1 -> 3993ebca2


[SPARK-17676][CORE] FsHistoryProvider should ignore hidden files

## What changes were proposed in this pull request?

FsHistoryProvider was writing a hidden file (to check the filesystem's clock).
Even though it deleted the file immediately, sometimes another thread would try
to scan the files on the filesystem in between, and then a very misleading error
message would be logged for the end user. (The logged error was harmless,
though.)
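
The fix in the diff below simply skips entries whose name starts with a dot. A
self-contained sketch of the same check (plain `java.io.File`, placeholder path,
not the actual provider code):

```scala
// Sketch: ignore hidden files when scanning a directory of event logs.
import java.io.File

def visibleEventLogs(dir: File): Seq[File] =
  Option(dir.listFiles()).toSeq.flatten
    .filter(f => f.isFile && !f.getName.startsWith("."))  // drop hidden files

// Usage: only non-hidden files are considered for replay.
visibleEventLogs(new File("/tmp/spark-events")).foreach(f => println(f.getName))
```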

## How was this patch tested?

I added one unit test, but to be clear, that test was passing before.  The 
actual change in behavior in that test is just logging (after the change, there 
is no more logged error), which I just manually verified.

Author: Imran Rashid 

Closes #15250 from squito/SPARK-17676.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3993ebca
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3993ebca
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3993ebca

Branch: refs/heads/master
Commit: 3993ebca23afa4b8770695051635933a6c9d2c11
Parents: 29396e7
Author: Imran Rashid 
Authored: Thu Sep 29 15:40:35 2016 -0700
Committer: Marcelo Vanzin 
Committed: Thu Sep 29 15:40:35 2016 -0700

--
 .../deploy/history/FsHistoryProvider.scala  |  7 +++-
 .../deploy/history/FsHistoryProviderSuite.scala | 36 ++--
 2 files changed, 40 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3993ebca/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index d494ff0..c5740e4 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -294,7 +294,12 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
 .filter { entry =>
   try {
 val prevFileSize = 
fileToAppInfo.get(entry.getPath()).map{_.fileSize}.getOrElse(0L)
-!entry.isDirectory() && prevFileSize < entry.getLen()
+!entry.isDirectory() &&
+  // FsHistoryProvider generates a hidden file which can't be 
read.  Accidentally
+  // reading a garbage file is safe, but we would log an error 
which can be scary to
+  // the end-user.
+  !entry.getPath().getName().startsWith(".") &&
+  prevFileSize < entry.getLen()
   } catch {
 case e: AccessControlException =>
   // Do not use "logInfo" since these messages can get pretty 
noisy if printed on

http://git-wip-us.apache.org/repos/asf/spark/blob/3993ebca/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
--
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
 
b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 39c5857..01bef0a 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -17,8 +17,7 @@
 
 package org.apache.spark.deploy.history
 
-import java.io.{BufferedOutputStream, ByteArrayInputStream, 
ByteArrayOutputStream, File,
-  FileOutputStream, OutputStreamWriter}
+import java.io._
 import java.net.URI
 import java.nio.charset.StandardCharsets
 import java.util.concurrent.TimeUnit
@@ -394,6 +393,39 @@ class FsHistoryProviderSuite extends SparkFunSuite with 
BeforeAndAfter with Matc
 }
   }
 
+  test("ignore hidden files") {
+
+// FsHistoryProvider should ignore hidden files.  (It even writes out a 
hidden file itself
+// that should be ignored).
+
+// write out one totally bogus hidden file
+val hiddenGarbageFile = new File(testDir, ".garbage")
+val out = new PrintWriter(hiddenGarbageFile)
+// scalastyle:off println
+out.println("GARBAGE")
+// scalastyle:on println
+out.close()
+
+// also write out one real event log file, but since its a hidden file, we 
shouldn't read it
+val tmpNewAppFile = newLogFile("hidden", None, inProgress = false)
+val hiddenNewAppFile = new File(tmpNewAppFile.getParentFile, "." + 
tmpNewAppFile.getName)
+tmpNewAppFile.renameTo(hiddenNewAppFile)
+
+// and write one real file, which should still get picked up just fine
+val newAppComplete = 

spark git commit: [SPARK-17721][MLLIB][ML] Fix for multiplying transposed SparseMatrix with SparseVector

2016-09-29 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master 4ecc648ad -> 29396e7d1


[SPARK-17721][MLLIB][ML] Fix for multiplying transposed SparseMatrix with 
SparseVector

## What changes were proposed in this pull request?

* changes the implementation of gemv with transposed SparseMatrix and 
SparseVector both in mllib-local and mllib (identical)
* adds a test that was failing before this change, but succeeds with these 
changes.

The problem in the previous implementation was that `i`, which enumerates the 
column indices of a row of the SparseMatrix, was only incremented when a row 
index of the vector matched the column index of the SparseMatrix. In cases 
where a particular row of the SparseMatrix has non-zero values at column 
indices lower than the corresponding non-zero row indices of the SparseVector, 
the non-zero values of the SparseVector are exhausted without ever matching the 
column index at position `i`, and the remaining column indices i+1,...,indEnd-1 
are never visited. The test cases in this PR illustrate this issue.
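
The corrected loop is essentially a merge over two sorted index arrays. A
standalone sketch of that two-pointer dot product (plain arrays, a simplified
model of the mllib loop rather than the actual BLAS code):

```scala
// Dot product of one sparse row (Acols/Avals over [iStart, iEnd)) with a sparse
// vector (xIndices/xValues): advance whichever side has the smaller index.
def sparseRowDot(
    Acols: Array[Int], Avals: Array[Double], iStart: Int, iEnd: Int,
    xIndices: Array[Int], xValues: Array[Double]): Double = {
  var i = iStart
  var k = 0
  var sum = 0.0
  while (i < iEnd && k < xIndices.length) {
    if (Acols(i) == xIndices(k)) { sum += Avals(i) * xValues(k); i += 1; k += 1 }
    else if (xIndices(k) < Acols(i)) { k += 1 }  // vector entry with no matching column
    else { i += 1 }                              // matrix entry with no matching vector index
  }
  sum
}

// Consistent with the added test: row (2.0 at col 1, 1.0 at col 2) dotted with
// x (2.0 at index 1, 1.0 at index 2) gives 2.0*2.0 + 1.0*1.0 = 5.0.
assert(sparseRowDot(Array(1, 2), Array(2.0, 1.0), 0, 2, Array(1, 2), Array(2.0, 1.0)) == 5.0)
```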

## How was this patch tested?

I have run the specific `gemv` tests in both mllib-local and mllib. I am 
currently still running `./dev/run-tests`.

## ___
As per instructions, I hereby state that this is my original work and that I 
license the work to the project (Apache Spark) under the project's open source 
license.

Mentioning dbtsai, viirya and brkyvz whom I can see have worked/authored on 
these parts before.

Author: Bjarne Fruergaard 

Closes #15296 from bwahlgreen/bugfix-spark-17721.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/29396e7d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/29396e7d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/29396e7d

Branch: refs/heads/master
Commit: 29396e7d1483d027960b9a1bed47008775c4253e
Parents: 4ecc648
Author: Bjarne Fruergaard 
Authored: Thu Sep 29 15:39:57 2016 -0700
Committer: Joseph K. Bradley 
Committed: Thu Sep 29 15:39:57 2016 -0700

--
 .../scala/org/apache/spark/ml/linalg/BLAS.scala|  8 ++--
 .../org/apache/spark/ml/linalg/BLASSuite.scala | 17 +
 .../scala/org/apache/spark/mllib/linalg/BLAS.scala |  8 ++--
 .../org/apache/spark/mllib/linalg/BLASSuite.scala  | 17 +
 4 files changed, 46 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/29396e7d/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala
--
diff --git a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala 
b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala
index 41b0c6c..4ca19f3 100644
--- a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala
+++ b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala
@@ -638,12 +638,16 @@ private[spark] object BLAS extends Serializable {
 val indEnd = Arows(rowCounter + 1)
 var sum = 0.0
 var k = 0
-while (k < xNnz && i < indEnd) {
+while (i < indEnd && k < xNnz) {
   if (xIndices(k) == Acols(i)) {
 sum += Avals(i) * xValues(k)
+k += 1
+i += 1
+  } else if (xIndices(k) < Acols(i)) {
+k += 1
+  } else {
 i += 1
   }
-  k += 1
 }
 yValues(rowCounter) = sum * alpha + beta * yValues(rowCounter)
 rowCounter += 1

http://git-wip-us.apache.org/repos/asf/spark/blob/29396e7d/mllib-local/src/test/scala/org/apache/spark/ml/linalg/BLASSuite.scala
--
diff --git 
a/mllib-local/src/test/scala/org/apache/spark/ml/linalg/BLASSuite.scala 
b/mllib-local/src/test/scala/org/apache/spark/ml/linalg/BLASSuite.scala
index 8a9f497..6e72a5f 100644
--- a/mllib-local/src/test/scala/org/apache/spark/ml/linalg/BLASSuite.scala
+++ b/mllib-local/src/test/scala/org/apache/spark/ml/linalg/BLASSuite.scala
@@ -392,6 +392,23 @@ class BLASSuite extends SparkMLFunSuite {
   }
 }
 
+val y17 = new DenseVector(Array(0.0, 0.0))
+val y18 = y17.copy
+
+val sA3 = new SparseMatrix(3, 2, Array(0, 2, 4), Array(1, 2, 0, 1), 
Array(2.0, 1.0, 1.0, 2.0))
+  .transpose
+val sA4 =
+  new SparseMatrix(2, 3, Array(0, 1, 3, 4), Array(1, 0, 1, 0), Array(1.0, 
2.0, 2.0, 1.0))
+val sx3 = new SparseVector(3, Array(1, 2), Array(2.0, 1.0))
+
+val expected4 = new DenseVector(Array(5.0, 4.0))
+
+gemv(1.0, sA3, sx3, 0.0, y17)
+gemv(1.0, sA4, sx3, 0.0, y18)
+
+assert(y17 ~== expected4 absTol 1e-15)
+assert(y18 ~== expected4 absTol 1e-15)
+
 val dAT =
   new DenseMatrix(3, 4, Array(0.0, 2.0, 0.0, 1.0, 0.0, 0.0, 

spark git commit: [SPARK-17721][MLLIB][ML] Fix for multiplying transposed SparseMatrix with SparseVector

2016-09-29 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 7c9450b00 -> 0cdd7370a


[SPARK-17721][MLLIB][ML] Fix for multiplying transposed SparseMatrix with 
SparseVector

## What changes were proposed in this pull request?

* changes the implementation of gemv with transposed SparseMatrix and 
SparseVector both in mllib-local and mllib (identical)
* adds a test that was failing before this change, but succeeds with these 
changes.

The problem in the previous implementation was that `i`, which enumerates the 
column indices of a row of the SparseMatrix, was only incremented when a row 
index of the vector matched the column index of the SparseMatrix. In cases 
where a particular row of the SparseMatrix has non-zero values at column 
indices lower than the corresponding non-zero row indices of the SparseVector, 
the non-zero values of the SparseVector are exhausted without ever matching the 
column index at position `i`, and the remaining column indices i+1,...,indEnd-1 
are never visited. The test cases in this PR illustrate this issue.

## How was this patch tested?

I have run the specific `gemv` tests in both mllib-local and mllib. I am 
currently still running `./dev/run-tests`.

## ___
As per instructions, I hereby state that this is my original work and that I 
license the work to the project (Apache Spark) under the project's open source 
license.

Mentioning dbtsai, viirya and brkyvz whom I can see have worked/authored on 
these parts before.

Author: Bjarne Fruergaard 

Closes #15296 from bwahlgreen/bugfix-spark-17721.

(cherry picked from commit 29396e7d1483d027960b9a1bed47008775c4253e)
Signed-off-by: Joseph K. Bradley 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0cdd7370
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0cdd7370
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0cdd7370

Branch: refs/heads/branch-2.0
Commit: 0cdd7370a61618d042417ee387a3c32ee5c924e6
Parents: 7c9450b
Author: Bjarne Fruergaard 
Authored: Thu Sep 29 15:39:57 2016 -0700
Committer: Joseph K. Bradley 
Committed: Thu Sep 29 15:40:08 2016 -0700

--
 .../scala/org/apache/spark/ml/linalg/BLAS.scala|  8 ++--
 .../org/apache/spark/ml/linalg/BLASSuite.scala | 17 +
 .../scala/org/apache/spark/mllib/linalg/BLAS.scala |  8 ++--
 .../org/apache/spark/mllib/linalg/BLASSuite.scala  | 17 +
 4 files changed, 46 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0cdd7370/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala
--
diff --git a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala 
b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala
index 41b0c6c..4ca19f3 100644
--- a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala
+++ b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala
@@ -638,12 +638,16 @@ private[spark] object BLAS extends Serializable {
 val indEnd = Arows(rowCounter + 1)
 var sum = 0.0
 var k = 0
-while (k < xNnz && i < indEnd) {
+while (i < indEnd && k < xNnz) {
   if (xIndices(k) == Acols(i)) {
 sum += Avals(i) * xValues(k)
+k += 1
+i += 1
+  } else if (xIndices(k) < Acols(i)) {
+k += 1
+  } else {
 i += 1
   }
-  k += 1
 }
 yValues(rowCounter) = sum * alpha + beta * yValues(rowCounter)
 rowCounter += 1

http://git-wip-us.apache.org/repos/asf/spark/blob/0cdd7370/mllib-local/src/test/scala/org/apache/spark/ml/linalg/BLASSuite.scala
--
diff --git 
a/mllib-local/src/test/scala/org/apache/spark/ml/linalg/BLASSuite.scala 
b/mllib-local/src/test/scala/org/apache/spark/ml/linalg/BLASSuite.scala
index 8a9f497..6e72a5f 100644
--- a/mllib-local/src/test/scala/org/apache/spark/ml/linalg/BLASSuite.scala
+++ b/mllib-local/src/test/scala/org/apache/spark/ml/linalg/BLASSuite.scala
@@ -392,6 +392,23 @@ class BLASSuite extends SparkMLFunSuite {
   }
 }
 
+val y17 = new DenseVector(Array(0.0, 0.0))
+val y18 = y17.copy
+
+val sA3 = new SparseMatrix(3, 2, Array(0, 2, 4), Array(1, 2, 0, 1), 
Array(2.0, 1.0, 1.0, 2.0))
+  .transpose
+val sA4 =
+  new SparseMatrix(2, 3, Array(0, 1, 3, 4), Array(1, 0, 1, 0), Array(1.0, 
2.0, 2.0, 1.0))
+val sx3 = new SparseVector(3, Array(1, 2), Array(2.0, 1.0))
+
+val expected4 = new DenseVector(Array(5.0, 4.0))
+
+gemv(1.0, sA3, sx3, 0.0, y17)
+gemv(1.0, sA4, sx3, 0.0, y18)
+
+assert(y17 ~== expected4 

spark git commit: [SPARK-17612][SQL] Support `DESCRIBE table PARTITION` SQL syntax

2016-09-29 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/master 566d7f282 -> 4ecc648ad


[SPARK-17612][SQL] Support `DESCRIBE table PARTITION` SQL syntax

## What changes were proposed in this pull request?

This PR implements `DESCRIBE table PARTITION` SQL Syntax again. It was 
supported until Spark 1.6.2, but was dropped since 2.0.0.

**Spark 1.6.2**
```scala
scala> sql("CREATE TABLE partitioned_table (a STRING, b INT) PARTITIONED BY (c 
STRING, d STRING)")
res1: org.apache.spark.sql.DataFrame = [result: string]

scala> sql("ALTER TABLE partitioned_table ADD PARTITION (c='Us', d=1)")
res2: org.apache.spark.sql.DataFrame = [result: string]

scala> sql("DESC partitioned_table PARTITION (c='Us', d=1)").show(false)
++
|result  |
++
|a  string   |
|b  int  |
|c  string   |
|d  string   |
||
|# Partition Information |
|# col_name data_type   comment  |
||
|c  string   |
|d  string   |
++
```

**Spark 2.0**
- **Before**
```scala
scala> sql("CREATE TABLE partitioned_table (a STRING, b INT) PARTITIONED BY (c 
STRING, d STRING)")
res0: org.apache.spark.sql.DataFrame = []

scala> sql("ALTER TABLE partitioned_table ADD PARTITION (c='Us', d=1)")
res1: org.apache.spark.sql.DataFrame = []

scala> sql("DESC partitioned_table PARTITION (c='Us', d=1)").show(false)
org.apache.spark.sql.catalyst.parser.ParseException:
Unsupported SQL statement
```

- **After**
```scala
scala> sql("CREATE TABLE partitioned_table (a STRING, b INT) PARTITIONED BY (c 
STRING, d STRING)")
res0: org.apache.spark.sql.DataFrame = []

scala> sql("ALTER TABLE partitioned_table ADD PARTITION (c='Us', d=1)")
res1: org.apache.spark.sql.DataFrame = []

scala> sql("DESC partitioned_table PARTITION (c='Us', d=1)").show(false)
+---+-+---+
|col_name   |data_type|comment|
+---+-+---+
|a  |string   |null   |
|b  |int  |null   |
|c  |string   |null   |
|d  |string   |null   |
|# Partition Information| |   |
|# col_name |data_type|comment|
|c  |string   |null   |
|d  |string   |null   |
+---+-+---+

scala> sql("DESC EXTENDED partitioned_table PARTITION (c='Us', 
d=1)").show(100,false)
+---+-+---+
|col_name   |data_type|comment|
+---+-+---+
|a  |string   |null   |
...
```

spark git commit: [SPARK-17653][SQL] Remove unnecessary distincts in multiple unions

2016-09-29 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/master fe33121a5 -> 566d7f282


[SPARK-17653][SQL] Remove unnecessary distincts in multiple unions

## What changes were proposed in this pull request?

Currently, for `Union [Distinct]`, a `Distinct` operator must sit on top of the 
`Union`. When there are adjacent `Union [Distinct]` operators, there will be 
multiple `Distinct` operators in the query plan.

For example, for a query like: select 1 a union select 2 b union select 3 c

Before this patch, its physical plan looks like:

*HashAggregate(keys=[a#13], functions=[])
+- Exchange hashpartitioning(a#13, 200)
   +- *HashAggregate(keys=[a#13], functions=[])
  +- Union
 :- *HashAggregate(keys=[a#13], functions=[])
 :  +- Exchange hashpartitioning(a#13, 200)
 : +- *HashAggregate(keys=[a#13], functions=[])
 :+- Union
 :   :- *Project [1 AS a#13]
 :   :  +- Scan OneRowRelation[]
 :   +- *Project [2 AS b#14]
 :  +- Scan OneRowRelation[]
 +- *Project [3 AS c#15]
+- Scan OneRowRelation[]

Only the top distinct should be necessary.

After this patch, the physical plan looks like:

*HashAggregate(keys=[a#221], functions=[], output=[a#221])
+- Exchange hashpartitioning(a#221, 5)
   +- *HashAggregate(keys=[a#221], functions=[], output=[a#221])
  +- Union
 :- *Project [1 AS a#221]
 :  +- Scan OneRowRelation[]
 :- *Project [2 AS b#222]
 :  +- Scan OneRowRelation[]
 +- *Project [3 AS c#223]
+- Scan OneRowRelation[]
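
A simplified model of the flattening idea (illustration only, not the Catalyst
rule itself): walk adjacent Union nodes with an explicit stack and, when the
whole subtree is wrapped in a single Distinct, also flatten Distinct(Union(...))
children so that only the top Distinct remains:

```scala
// Toy plan ADT standing in for Catalyst logical plans.
sealed trait Plan
case class Leaf(name: String) extends Plan
case class Union(children: Seq[Plan]) extends Plan
case class Distinct(child: Plan) extends Plan

def flattenUnion(u: Union, flattenDistinct: Boolean): Union = {
  val stack = scala.collection.mutable.Stack[Plan](u)
  val flattened = scala.collection.mutable.ArrayBuffer.empty[Plan]
  while (stack.nonEmpty) {
    stack.pop() match {
      case Union(children) => children.reverse.foreach(c => stack.push(c))
      case Distinct(Union(children)) if flattenDistinct =>
        children.reverse.foreach(c => stack.push(c))
      case other => flattened += other
    }
  }
  Union(flattened.toList)
}

// Distinct(Union(Distinct(Union(a, b)), c)) collapses to Distinct(Union(a, b, c)).
val plan = Distinct(Union(Seq(Distinct(Union(Seq(Leaf("a"), Leaf("b")))), Leaf("c"))))
println(Distinct(flattenUnion(plan.child.asInstanceOf[Union], flattenDistinct = true)))
```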

## How was this patch tested?

Jenkins tests.

Author: Liang-Chi Hsieh 

Closes #15238 from viirya/remove-extra-distinct-union.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/566d7f28
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/566d7f28
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/566d7f28

Branch: refs/heads/master
Commit: 566d7f28275f90f7b9bed6a75e90989ad0c59931
Parents: fe33121
Author: Liang-Chi Hsieh 
Authored: Thu Sep 29 14:30:23 2016 -0700
Committer: Herman van Hovell 
Committed: Thu Sep 29 14:30:23 2016 -0700

--
 .../sql/catalyst/optimizer/Optimizer.scala  | 24 ++-
 .../spark/sql/catalyst/planning/patterns.scala  | 27 
 .../catalyst/optimizer/SetOperationSuite.scala  | 68 
 3 files changed, 89 insertions(+), 30 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/566d7f28/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 4952ba3..9df8ce1 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer
 
 import scala.annotation.tailrec
 import scala.collection.immutable.HashSet
+import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.api.java.function.FilterFunction
@@ -29,7 +30,7 @@ import 
org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, 
TrueLiteral}
-import org.apache.spark.sql.catalyst.planning.{ExtractFiltersAndInnerJoins, 
Unions}
+import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
@@ -579,8 +580,25 @@ object InferFiltersFromConstraints extends 
Rule[LogicalPlan] with PredicateHelpe
  * Combines all adjacent [[Union]] operators into a single [[Union]].
  */
 object CombineUnions extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-case Unions(children) => Union(children)
+  def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
+case u: Union => flattenUnion(u, false)
+case Distinct(u: Union) => Distinct(flattenUnion(u, true))
+  }
+
+  private def flattenUnion(union: Union, flattenDistinct: Boolean): Union = {
+val stack = mutable.Stack[LogicalPlan](union)
+val flattened = mutable.ArrayBuffer.empty[LogicalPlan]
+

spark git commit: [SPARK-17699] Support for parsing JSON string columns

2016-09-29 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/master 027dea8f2 -> fe33121a5


[SPARK-17699] Support for parsing JSON string columns

Spark SQL has great support for reading text files that contain JSON data. 
However, in many cases the JSON data is just one column amongst others. This 
is particularly true when reading from sources such as Kafka. This PR adds a 
new function, `from_json`, that converts a string column into a nested 
`StructType` with a user-specified schema.

Example usage:
```scala
val df = Seq("""{"a": 1}""").toDS()
val schema = new StructType().add("a", IntegerType)

df.select(from_json($"value", schema) as 'json) // => [json: ]
```

This PR adds support for Java, Scala and Python. I leveraged our existing JSON 
parsing support by moving it into catalyst (so that we could define expressions 
using it). I left SQL out for now, because I'm not sure how users would 
specify a schema.
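
A slightly more complete, self-contained sketch of the Scala usage above
(assuming the `from_json(column, schema)` overload added here):

```scala
// Sketch: parse a JSON string column into a struct column with from_json.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{IntegerType, StructType}

val spark = SparkSession.builder().appName("from-json-example").getOrCreate()
import spark.implicits._

val df = Seq("""{"a": 1}""", """not json""").toDF("value")
val schema = new StructType().add("a", IntegerType)

// Unparseable strings yield null instead of failing the query.
df.select(from_json($"value", schema).as("json"))
  .selectExpr("json.a")
  .show()
```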

Author: Michael Armbrust 

Closes #15274 from marmbrus/jsonParser.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fe33121a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fe33121a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fe33121a

Branch: refs/heads/master
Commit: fe33121a53384811a8e094ab6c05dc85b7c7ca87
Parents: 027dea8
Author: Michael Armbrust 
Authored: Thu Sep 29 13:01:10 2016 -0700
Committer: Yin Huai 
Committed: Thu Sep 29 13:01:10 2016 -0700

--
 python/pyspark/sql/functions.py |  23 +
 .../catalyst/expressions/jsonExpressions.scala  |  31 +-
 .../spark/sql/catalyst/json/JSONOptions.scala   |  84 
 .../spark/sql/catalyst/json/JacksonParser.scala | 443 +++
 .../spark/sql/catalyst/json/JacksonUtils.scala  |  32 ++
 .../sql/catalyst/util/CompressionCodecs.scala   |  72 +++
 .../spark/sql/catalyst/util/ParseModes.scala|  41 ++
 .../expressions/JsonExpressionsSuite.scala  |  26 ++
 .../org/apache/spark/sql/DataFrameReader.scala  |   5 +-
 .../datasources/CompressionCodecs.scala |  72 ---
 .../sql/execution/datasources/ParseModes.scala  |  41 --
 .../datasources/csv/CSVFileFormat.scala |   1 +
 .../execution/datasources/csv/CSVOptions.scala  |   2 +-
 .../datasources/json/InferSchema.scala  |   3 +-
 .../datasources/json/JSONOptions.scala  |  84 
 .../datasources/json/JacksonGenerator.scala |   3 +-
 .../datasources/json/JacksonParser.scala| 440 --
 .../datasources/json/JacksonUtils.scala |  32 --
 .../datasources/json/JsonFileFormat.scala   |   2 +
 .../datasources/text/TextFileFormat.scala   |   1 +
 .../scala/org/apache/spark/sql/functions.scala  |  58 +++
 .../apache/spark/sql/JsonFunctionsSuite.scala   |  29 ++
 .../json/JsonParsingOptionsSuite.scala  |   1 +
 .../execution/datasources/json/JsonSuite.scala  |   3 +-
 24 files changed, 852 insertions(+), 677 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fe33121a/python/pyspark/sql/functions.py
--
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 89b3c07..45d6bf9 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -1706,6 +1706,29 @@ def json_tuple(col, *fields):
 return Column(jc)
 
 
+@since(2.1)
+def from_json(col, schema, options={}):
+"""
+Parses a column containing a JSON string into a [[StructType]] with the
+specified schema. Returns `null`, in the case of an unparseable string.
+
+:param col: string column in json format
+:param schema: a StructType to use when parsing the json column
+:param options: options to control parsing. accepts the same options as 
the json datasource
+
+>>> from pyspark.sql.types import *
+>>> data = [(1, '''{"a": 1}''')]
+>>> schema = StructType([StructField("a", IntegerType())])
+>>> df = spark.createDataFrame(data, ("key", "value"))
+>>> df.select(from_json(df.value, schema).alias("json")).collect()
+[Row(json=Row(a=1))]
+"""
+
+sc = SparkContext._active_spark_context
+jc = sc._jvm.functions.from_json(_to_java_column(col), schema.json(), 
options)
+return Column(jc)
+
+
 @since(1.5)
 def size(col):
 """

http://git-wip-us.apache.org/repos/asf/spark/blob/fe33121a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index c14a2fb..65dbd6a 100644
--- 

spark git commit: [SPARK-17715][SCHEDULER] Make task launch logs DEBUG

2016-09-29 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master cb87b3ced -> 027dea8f2


[SPARK-17715][SCHEDULER] Make task launch logs DEBUG

## What changes were proposed in this pull request?

Ramp down the task launch logs from INFO to DEBUG. Task launches can happen 
orders of magnitude more often than executor registrations, so keeping them at 
a different log level makes the logs easier to handle. For larger jobs there 
can be hundreds of thousands of task launches, which makes the driver log huge.

## How was this patch tested?

No tests, as this is a trivial change.

Author: Brian Cho 

Closes #15290 from dafrista/ramp-down-task-logging.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/027dea8f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/027dea8f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/027dea8f

Branch: refs/heads/master
Commit: 027dea8f294504bc5cd8bfedde546d171cb78657
Parents: cb87b3c
Author: Brian Cho 
Authored: Thu Sep 29 15:59:17 2016 -0400
Committer: Andrew Or 
Committed: Thu Sep 29 15:59:17 2016 -0400

--
 .../spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala| 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/027dea8f/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 2d09863..0dae0e6 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -265,7 +265,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
   val executorData = executorDataMap(task.executorId)
   executorData.freeCores -= scheduler.CPUS_PER_TASK
 
-  logInfo(s"Launching task ${task.taskId} on executor id: 
${task.executorId} hostname: " +
+  logDebug(s"Launching task ${task.taskId} on executor id: 
${task.executorId} hostname: " +
 s"${executorData.executorHost}.")
 
   executorData.executorEndpoint.send(LaunchTask(new 
SerializableBuffer(serializedTask)))





spark git commit: [SPARK-17672] Spark 2.0 history server web UI takes too long for a single application

2016-09-29 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 f7839e47c -> 7c9450b00


[SPARK-17672] Spark 2.0 history server web UI takes too long for a single 
application

Added a new API, getApplicationInfo(appId: String), to the 
ApplicationHistoryProvider and SparkUI classes to fetch application info. With 
this change, FsHistoryProvider can fetch a single application's info in O(1) 
time, compared to the previous O(n) lookup that used the Iterator.find() 
interface.

Both the ApplicationCache and OneApplicationResource classes adopt this new API.
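
The difference in a nutshell (a sketch with stand-in types, not the Spark
classes; `applications` below plays the role of FsHistoryProvider's internal
map of listed applications):

```scala
// O(n) scan versus O(1) keyed lookup for a single application's info.
case class AppHistoryInfo(id: String, name: String)

val applications: Map[String, AppHistoryInfo] = Map(
  "app-1" -> AppHistoryInfo("app-1", "etl"),
  "app-2" -> AppHistoryInfo("app-2", "reporting"))

// Before: walk the listing until the id matches.
def findLinear(appId: String): Option[AppHistoryInfo] =
  applications.valuesIterator.find(_.id == appId)

// After: getApplicationInfo-style direct lookup by key.
def getApplicationInfo(appId: String): Option[AppHistoryInfo] =
  applications.get(appId)

assert(findLinear("app-2") == getApplicationInfo("app-2"))
```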

 manual tests

Author: Gang Wu 

Closes #15247 from wgtmac/SPARK-17671.

(cherry picked from commit cb87b3ced9453b5717fa8e8637b97a2f3f25fdd7)
Signed-off-by: Andrew Or 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7c9450b0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7c9450b0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7c9450b0

Branch: refs/heads/branch-2.0
Commit: 7c9450b007205958984f39a881415cdbe75e0c34
Parents: f7839e4
Author: Gang Wu 
Authored: Thu Sep 29 15:51:05 2016 -0400
Committer: Andrew Or 
Committed: Thu Sep 29 15:51:57 2016 -0400

--
 .../spark/deploy/history/ApplicationHistoryProvider.scala   | 5 +
 .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 4 
 .../scala/org/apache/spark/deploy/history/HistoryServer.scala   | 4 
 .../scala/org/apache/spark/status/api/v1/ApiRootResource.scala  | 1 +
 .../org/apache/spark/status/api/v1/OneApplicationResource.scala | 2 +-
 core/src/main/scala/org/apache/spark/ui/SparkUI.scala   | 4 
 6 files changed, 19 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7c9450b0/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
 
b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
index 44661ed..ba42b48 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -109,4 +109,9 @@ private[history] abstract class ApplicationHistoryProvider {
   @throws(classOf[SparkException])
   def writeEventLogs(appId: String, attemptId: Option[String], zipStream: 
ZipOutputStream): Unit
 
+  /**
+   * @return the [[ApplicationHistoryInfo]] for the appId if it exists.
+   */
+  def getApplicationInfo(appId: String): Option[ApplicationHistoryInfo]
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/7c9450b0/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 110d882..cf4a401 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -222,6 +222,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
 
   override def getListing(): Iterable[FsApplicationHistoryInfo] = 
applications.values
 
+  override def getApplicationInfo(appId: String): 
Option[FsApplicationHistoryInfo] = {
+applications.get(appId)
+  }
+
   override def getAppUI(appId: String, attemptId: Option[String]): 
Option[LoadedAppUI] = {
 try {
   applications.get(appId).flatMap { appInfo =>

http://git-wip-us.apache.org/repos/asf/spark/blob/7c9450b0/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala 
b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index c178917..735aa43 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -182,6 +182,10 @@ class HistoryServer(
 
getApplicationList().iterator.map(ApplicationsListResource.appHistoryInfoToPublicAppInfo)
   }
 
+  def getApplicationInfo(appId: String): Option[ApplicationInfo] = {
+
provider.getApplicationInfo(appId).map(ApplicationsListResource.appHistoryInfoToPublicAppInfo)
+  }
+
   override def writeEventLogs(
   appId: String,
   attemptId: Option[String],


spark git commit: [SPARK-17672] Spark 2.0 history server web UI takes too long for a single application

2016-09-29 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master 7f779e743 -> cb87b3ced


[SPARK-17672] Spark 2.0 history server web UI takes too long for a single 
application

Added a new API, getApplicationInfo(appId: String), to the 
ApplicationHistoryProvider and SparkUI classes to fetch application info. With 
this change, FsHistoryProvider can fetch a single application's info in O(1) 
time, compared to the previous O(n) lookup that used the Iterator.find() 
interface.

Both the ApplicationCache and OneApplicationResource classes adopt this new API.

 manual tests

Author: Gang Wu 

Closes #15247 from wgtmac/SPARK-17671.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cb87b3ce
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cb87b3ce
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cb87b3ce

Branch: refs/heads/master
Commit: cb87b3ced9453b5717fa8e8637b97a2f3f25fdd7
Parents: 7f779e7
Author: Gang Wu 
Authored: Thu Sep 29 15:51:05 2016 -0400
Committer: Andrew Or 
Committed: Thu Sep 29 15:51:38 2016 -0400

--
 .../spark/deploy/history/ApplicationHistoryProvider.scala   | 5 +
 .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 4 
 .../scala/org/apache/spark/deploy/history/HistoryServer.scala   | 4 
 .../scala/org/apache/spark/status/api/v1/ApiRootResource.scala  | 1 +
 .../org/apache/spark/status/api/v1/OneApplicationResource.scala | 2 +-
 core/src/main/scala/org/apache/spark/ui/SparkUI.scala   | 4 
 6 files changed, 19 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/cb87b3ce/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
 
b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
index 44661ed..ba42b48 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -109,4 +109,9 @@ private[history] abstract class ApplicationHistoryProvider {
   @throws(classOf[SparkException])
   def writeEventLogs(appId: String, attemptId: Option[String], zipStream: 
ZipOutputStream): Unit
 
+  /**
+   * @return the [[ApplicationHistoryInfo]] for the appId if it exists.
+   */
+  def getApplicationInfo(appId: String): Option[ApplicationHistoryInfo]
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/cb87b3ce/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 6874aa5..d494ff0 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -224,6 +224,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
 
   override def getListing(): Iterable[FsApplicationHistoryInfo] = 
applications.values
 
+  override def getApplicationInfo(appId: String): 
Option[FsApplicationHistoryInfo] = {
+applications.get(appId)
+  }
+
   override def getAppUI(appId: String, attemptId: Option[String]): 
Option[LoadedAppUI] = {
 try {
   applications.get(appId).flatMap { appInfo =>

http://git-wip-us.apache.org/repos/asf/spark/blob/cb87b3ce/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala 
b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index c178917..735aa43 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -182,6 +182,10 @@ class HistoryServer(
 
getApplicationList().iterator.map(ApplicationsListResource.appHistoryInfoToPublicAppInfo)
   }
 
+  def getApplicationInfo(appId: String): Option[ApplicationInfo] = {
+
provider.getApplicationInfo(appId).map(ApplicationsListResource.appHistoryInfoToPublicAppInfo)
+  }
+
   override def writeEventLogs(
   appId: String,
   attemptId: Option[String],

http://git-wip-us.apache.org/repos/asf/spark/blob/cb87b3ce/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala
--
diff --git 

spark git commit: [SPARK-17648][CORE] TaskScheduler really needs offers to be an IndexedSeq

2016-09-29 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master 958200497 -> 7f779e743


[SPARK-17648][CORE] TaskScheduler really needs offers to be an IndexedSeq

## What changes were proposed in this pull request?

The Seq[WorkerOffer] is accessed by index, so it really should be an
IndexedSeq; otherwise an O(n) operation becomes O(n^2). In practice this hasn't
been an issue because, where these offers are generated, the call to `.toSeq`
just happens to create an IndexedSeq anyway. I got bitten by this in
performance tests I was doing, and it's better for the types to be more precise
so that, for example, a change in Scala doesn't destroy performance.
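
To make the complexity point concrete (a standalone sketch, not the scheduler
code): indexing into a linear Seq such as List walks the list on every access,
while an IndexedSeq answers apply(i) in effectively constant time:

```scala
// Why indexed access over a linear Seq turns an O(n) loop into O(n^2).
def sumByIndex(offers: Seq[Int]): Long = {
  var total = 0L
  var i = 0
  while (i < offers.size) {  // offers(i) is O(i) for List, O(1) for Vector
    total += offers(i)
    i += 1
  }
  total
}

val asList: Seq[Int]   = List.fill(100000)(1)    // linear Seq: quadratic overall
val asVector: Seq[Int] = Vector.fill(100000)(1)  // IndexedSeq: linear overall
// Same result, very different cost profile.
assert(sumByIndex(asList) == sumByIndex(asVector))
```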

## How was this patch tested?

Unit tests via jenkins.

Author: Imran Rashid 

Closes #15221 from squito/SPARK-17648.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7f779e74
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7f779e74
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7f779e74

Branch: refs/heads/master
Commit: 7f779e7439127efa0e3611f7745e1c8423845198
Parents: 9582004
Author: Imran Rashid 
Authored: Thu Sep 29 15:36:40 2016 -0400
Committer: Andrew Or 
Committed: Thu Sep 29 15:36:40 2016 -0400

--
 .../spark/scheduler/TaskSchedulerImpl.scala |  4 +--
 .../cluster/CoarseGrainedSchedulerBackend.scala |  4 +--
 .../scheduler/local/LocalSchedulerBackend.scala |  2 +-
 .../scheduler/SchedulerIntegrationSuite.scala   |  7 ++---
 .../scheduler/TaskSchedulerImplSuite.scala  | 32 ++--
 .../MesosFineGrainedSchedulerBackend.scala  |  2 +-
 .../MesosFineGrainedSchedulerBackendSuite.scala |  2 +-
 7 files changed, 26 insertions(+), 27 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7f779e74/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 52a7186..0ad4730 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -252,7 +252,7 @@ private[spark] class TaskSchedulerImpl(
   maxLocality: TaskLocality,
   shuffledOffers: Seq[WorkerOffer],
   availableCpus: Array[Int],
-  tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {
+  tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
 var launchedTask = false
 for (i <- 0 until shuffledOffers.size) {
   val execId = shuffledOffers(i).executorId
@@ -286,7 +286,7 @@ private[spark] class TaskSchedulerImpl(
* sets for tasks in order of priority. We fill each node with tasks in a 
round-robin manner so
* that tasks are balanced across the cluster.
*/
-  def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = 
synchronized {
+  def resourceOffers(offers: IndexedSeq[WorkerOffer]): 
Seq[Seq[TaskDescription]] = synchronized {
 // Mark each slave as alive and remember its hostname
 // Also track if new executor is added
 var newExecAvail = false

http://git-wip-us.apache.org/repos/asf/spark/blob/7f779e74/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index edc3c19..2d09863 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -216,7 +216,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
   val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
   val workOffers = activeExecutors.map { case (id, executorData) =>
 new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
-  }.toSeq
+  }.toIndexedSeq
   launchTasks(scheduler.resourceOffers(workOffers))
 }
 
@@ -233,7 +233,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
   // Filter out executors under killing
   if (executorIsAlive(executorId)) {
 val executorData = executorDataMap(executorId)
-val workOffers = Seq(
+val workOffers = IndexedSeq(
   new WorkerOffer(executorId, executorData.executorHost, 
executorData.freeCores))
 launchTasks(scheduler.resourceOffers(workOffers))
   }


spark git commit: [SPARK-17712][SQL] Fix invalid pushdown of data-independent filters beneath aggregates

2016-09-29 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 7ffafa3bf -> f7839e47c


[SPARK-17712][SQL] Fix invalid pushdown of data-independent filters beneath 
aggregates

## What changes were proposed in this pull request?

This patch fixes a minor correctness issue impacting the pushdown of filters 
beneath aggregates. Specifically, if a filter condition references no grouping 
or aggregate columns (e.g. `WHERE false`) then it would be incorrectly pushed 
beneath an aggregate.

Intuitively, the only case where you can push a filter beneath an aggregate is 
when that filter is deterministic and is defined over the grouping columns / 
expressions, since in that case the filter is acting to exclude entire groups 
from the query (like a `HAVING` clause). The existing code would only push 
deterministic filters beneath aggregates when all of the filter's references 
were grouping columns, but this logic missed the case where a filter has no 
references. For example, `WHERE false` is deterministic but is independent of 
the actual data.

This patch fixes this minor bug by adding a new check to ensure that we don't 
push filters beneath aggregates when those filters don't reference any columns.
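
As a minimal, hypothetical illustration (the view name, data, subquery alias `agg`, and queries below are assumptions, not part of the patch), a data-independent filter above a global aggregate is not equivalent to the same filter pushed beneath it:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("SPARK-17712-sketch").getOrCreate()
import spark.implicits._

Seq(1, 2, 3).toDF("a").createOrReplaceTempView("t")

// Filter evaluated above the aggregate: the single count row is filtered out -> 0 rows.
spark.sql("SELECT * FROM (SELECT count(*) AS c FROM t) agg WHERE false").show()

// If `false` were (incorrectly) pushed beneath the aggregate, the plan would be
// equivalent to this query, which returns one row with c = 0.
spark.sql("SELECT count(*) AS c FROM t WHERE false").show()

spark.stop()
```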

## How was this patch tested?

New regression test in FilterPushdownSuite.

Author: Josh Rosen 

Closes #15289 from JoshRosen/SPARK-17712.

(cherry picked from commit 37eb9184f1e9f1c07142c66936671f4711ef407d)
Signed-off-by: Josh Rosen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f7839e47
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f7839e47
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f7839e47

Branch: refs/heads/branch-2.0
Commit: f7839e47c3bda86d61c3b2be72c168aab4a5674f
Parents: 7ffafa3
Author: Josh Rosen 
Authored: Wed Sep 28 19:03:05 2016 -0700
Committer: Josh Rosen 
Committed: Thu Sep 29 12:05:46 2016 -0700

--
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  2 +-
 .../catalyst/optimizer/FilterPushdownSuite.scala   | 17 +
 2 files changed, 18 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f7839e47/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 35b122d..4c06038 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1071,7 +1071,7 @@ object PushDownPredicate extends Rule[LogicalPlan] with 
PredicateHelper {
 
   val (pushDown, rest) = candidates.partition { cond =>
 val replaced = replaceAlias(cond, aliasMap)
-replaced.references.subsetOf(aggregate.child.outputSet)
+cond.references.nonEmpty && 
replaced.references.subsetOf(aggregate.child.outputSet)
   }
 
   val stayUp = rest ++ containingNonDeterministic

http://git-wip-us.apache.org/repos/asf/spark/blob/f7839e47/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
--
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 55836f9..019f132 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -687,6 +687,23 @@ class FilterPushdownSuite extends PlanTest {
 comparePlans(optimized, correctAnswer)
   }
 
+  test("SPARK-17712: aggregate: don't push down filters that are 
data-independent") {
+val originalQuery = LocalRelation.apply(testRelation.output, Seq.empty)
+  .select('a, 'b)
+  .groupBy('a)(count('a))
+  .where(false)
+
+val optimized = Optimize.execute(originalQuery.analyze)
+
+val correctAnswer = testRelation
+  .select('a, 'b)
+  .groupBy('a)(count('a))
+  .where(false)
+  .analyze
+
+comparePlans(optimized, correctAnswer)
+  }
+
   test("broadcast hint") {
 val originalQuery = BroadcastHint(testRelation)
   .where('a === 2L && 'b + Rand(10).as("rnd") === 3)



spark git commit: [SPARK-16343][SQL] Improve the PushDownPredicate rule to pushdown predicates correctly in non-deterministic condition.

2016-09-29 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 ca8130050 -> 7ffafa3bf


[SPARK-16343][SQL] Improve the PushDownPredicate rule to pushdown predicates 
correctly in non-deterministic condition.

## What changes were proposed in this pull request?

Currently our Optimizer may reorder predicates to run them more efficiently,
but under a non-deterministic condition, changing the order between the
deterministic parts and the non-deterministic parts may change the number of
input rows. For example:
```SELECT a FROM t WHERE rand() < 0.1 AND a = 1```
and
```SELECT a FROM t WHERE a = 1 AND rand() < 0.1```
may call rand() a different number of times and therefore produce different
output rows.

This PR improves this by only pushing down predicates that are placed before
any non-deterministic predicate.
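
A minimal sketch of the span-based split the rule now uses (simplified, plain-Scala stand-ins; `Pred` is an assumption, not the actual Catalyst `Expression` type):

```scala
// Simplified stand-in for a Catalyst predicate.
case class Pred(sql: String, deterministic: Boolean)

val conjuncts = Seq(
  Pred("a = 1", deterministic = true),
  Pred("rand() < 0.1", deterministic = false),
  Pred("b = 2", deterministic = true))

// Only the prefix of deterministic predicates may be considered for pushdown;
// everything at and after the first non-deterministic predicate must stay up.
val (candidates, containingNonDeterministic) = conjuncts.span(_.deterministic)

println(candidates)                  // List(Pred(a = 1,true))
println(containingNonDeterministic)  // List(Pred(rand() < 0.1,false), Pred(b = 2,true))
```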

## How was this patch tested?

Expanded related testcases in FilterPushdownSuite.

Author: 蒋星博 

Closes #14012 from jiangxb1987/ppd.

(cherry picked from commit f376c37268848dbb4b2fb57677e22ef2bf207b49)
Signed-off-by: Josh Rosen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7ffafa3b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7ffafa3b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7ffafa3b

Branch: refs/heads/branch-2.0
Commit: 7ffafa3bfecb8bc92b79eddea1ca18166efd3385
Parents: ca81300
Author: 蒋星博 
Authored: Thu Jul 14 00:21:27 2016 +0800
Committer: Josh Rosen 
Committed: Thu Sep 29 11:44:00 2016 -0700

--
 .../sql/catalyst/optimizer/Optimizer.scala  | 44 +---
 .../optimizer/FilterPushdownSuite.scala |  8 ++--
 2 files changed, 33 insertions(+), 19 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7ffafa3b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index d824c2e..35b122d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1031,19 +1031,23 @@ object PushDownPredicate extends Rule[LogicalPlan] with 
PredicateHelper {
   project.copy(child = Filter(replaceAlias(condition, aliasMap), 
grandChild))
 
 // Push [[Filter]] operators through [[Window]] operators. Parts of the 
predicate that can be
-// pushed beneath must satisfy the following two conditions:
+// pushed beneath must satisfy the following conditions:
 // 1. All the expressions are part of window partitioning key. The 
expressions can be compound.
-// 2. Deterministic
+// 2. Deterministic.
+// 3. Placed before any non-deterministic predicates.
 case filter @ Filter(condition, w: Window)
 if w.partitionSpec.forall(_.isInstanceOf[AttributeReference]) =>
   val partitionAttrs = AttributeSet(w.partitionSpec.flatMap(_.references))
-  val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition 
{ cond =>
-cond.references.subsetOf(partitionAttrs) && cond.deterministic &&
-  // This is for ensuring all the partitioning expressions have been 
converted to alias
-  // in Analyzer. Thus, we do not need to check if the expressions in 
conditions are
-  // the same as the expressions used in partitioning columns.
-  partitionAttrs.forall(_.isInstanceOf[Attribute])
+
+  val (candidates, containingNonDeterministic) =
+splitConjunctivePredicates(condition).span(_.deterministic)
+
+  val (pushDown, rest) = candidates.partition { cond =>
+cond.references.subsetOf(partitionAttrs)
   }
+
+  val stayUp = rest ++ containingNonDeterministic
+
   if (pushDown.nonEmpty) {
 val pushDownPredicate = pushDown.reduce(And)
 val newWindow = w.copy(child = Filter(pushDownPredicate, w.child))
@@ -1062,11 +1066,16 @@ object PushDownPredicate extends Rule[LogicalPlan] with 
PredicateHelper {
 
   // For each filter, expand the alias and check if the filter can be 
evaluated using
   // attributes produced by the aggregate operator's child operator.
-  val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition 
{ cond =>
+  val (candidates, containingNonDeterministic) =
+splitConjunctivePredicates(condition).span(_.deterministic)
+
+  val (pushDown, rest) = candidates.partition { cond =>
 val replaced = replaceAlias(cond, aliasMap)
-

spark git commit: [DOCS] Reorganize explanation of Accumulators and Broadcast Variables

2016-09-29 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/master b2e9731ca -> 958200497


[DOCS] Reorganize explanation of Accumulators and Broadcast Variables

## What changes were proposed in this pull request?

The discussion of the interaction of Accumulators and Broadcast Variables 
should logically follow the discussion on Checkpointing. As currently written, 
this section discusses Checkpointing before it is formally introduced. To 
remedy this:

 - Rename this section to "Accumulators, Broadcast Variables, and Checkpoints", 
and
 - Move this section after "Checkpointing".

## How was this patch tested?

Testing: ran

$ SKIP_API=1 jekyll build

and verified the changes in a web browser pointed at docs/_site/index.html.

Author: José Hiram Soltren 

Closes #15281 from jsoltren/doc-changes.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/95820049
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/95820049
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/95820049

Branch: refs/heads/master
Commit: 958200497affb40f05e321c2b0e252d365ae02f4
Parents: b2e9731
Author: José Hiram Soltren 
Authored: Thu Sep 29 10:18:56 2016 -0700
Committer: Marcelo Vanzin 
Committed: Thu Sep 29 10:18:56 2016 -0700

--
 docs/streaming-programming-guide.md | 328 +++
 1 file changed, 164 insertions(+), 164 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/95820049/docs/streaming-programming-guide.md
--
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 43f1cf3..0b0315b 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -1368,170 +1368,6 @@ Note that the connections in the pool should be lazily 
created on demand and tim
 
 ***
 
-## Accumulators and Broadcast Variables
-
-[Accumulators](programming-guide.html#accumulators) and [Broadcast 
variables](programming-guide.html#broadcast-variables) cannot be recovered from 
checkpoint in Spark Streaming. If you enable checkpointing and use 
[Accumulators](programming-guide.html#accumulators) or [Broadcast 
variables](programming-guide.html#broadcast-variables) as well, you'll have to 
create lazily instantiated singleton instances for 
[Accumulators](programming-guide.html#accumulators) and [Broadcast 
variables](programming-guide.html#broadcast-variables) so that they can be 
re-instantiated after the driver restarts on failure. This is shown in the 
following example.
-
-
-
-{% highlight scala %}
-
-object WordBlacklist {
-
-  @volatile private var instance: Broadcast[Seq[String]] = null
-
-  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
-if (instance == null) {
-  synchronized {
-if (instance == null) {
-  val wordBlacklist = Seq("a", "b", "c")
-  instance = sc.broadcast(wordBlacklist)
-}
-  }
-}
-instance
-  }
-}
-
-object DroppedWordsCounter {
-
-  @volatile private var instance: LongAccumulator = null
-
-  def getInstance(sc: SparkContext): LongAccumulator = {
-if (instance == null) {
-  synchronized {
-if (instance == null) {
-  instance = sc.longAccumulator("WordsInBlacklistCounter")
-}
-  }
-}
-instance
-  }
-}
-
-wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
-  // Get or register the blacklist Broadcast
-  val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
-  // Get or register the droppedWordsCounter Accumulator
-  val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
-  // Use blacklist to drop words and use droppedWordsCounter to count them
-  val counts = rdd.filter { case (word, count) =>
-if (blacklist.value.contains(word)) {
-  droppedWordsCounter.add(count)
-  false
-} else {
-  true
-}
-  }.collect().mkString("[", ", ", "]")
-  val output = "Counts at time " + time + " " + counts
-})
-
-{% endhighlight %}
-
-See the full [source 
code]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala).
-
-
-{% highlight java %}
-
-class JavaWordBlacklist {
-
-  private static volatile Broadcast instance = null;
-
-  public static Broadcast getInstance(JavaSparkContext jsc) {
-if (instance == null) {
-  synchronized (JavaWordBlacklist.class) {
-if (instance == null) {
-  List wordBlacklist = Arrays.asList("a", "b", "c");
-  instance = jsc.broadcast(wordBlacklist);
-}
-  }
-}
-return instance;
-  }
-}
-
-class JavaDroppedWordsCounter {
-
-  private static volatile 

spark git commit: [MINOR][DOCS] Fix the doc. of spark-streaming with kinesis

2016-09-29 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 7d612a7d5 -> ca8130050


[MINOR][DOCS] Fix the doc. of spark-streaming with kinesis

## What changes were proposed in this pull request?
This PR just fixes the documentation of `spark-kinesis-integration`.
Since `SPARK-17418` prevented all the Kinesis artifacts (including the Kinesis
example code) from being published, `bin/run-example streaming.KinesisWordCountASL`
and `bin/run-example streaming.JavaKinesisWordCountASL` no longer work.
Instead, the documented commands now fetch the Kinesis jar from Spark Packages
via `--packages`.

Author: Takeshi YAMAMURO 

Closes #15260 from maropu/DocFixKinesis.

(cherry picked from commit b2e9731ca494c0c60d571499f68bb8306a3c9fe5)
Signed-off-by: Sean Owen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ca813005
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ca813005
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ca813005

Branch: refs/heads/branch-2.0
Commit: ca8130050964fac8baa568918f0b67c44a7a2518
Parents: 7d612a7
Author: Takeshi YAMAMURO 
Authored: Thu Sep 29 08:26:03 2016 -0400
Committer: Sean Owen 
Committed: Thu Sep 29 08:26:14 2016 -0400

--
 docs/streaming-kinesis-integration.md | 9 +++--
 1 file changed, 3 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ca813005/docs/streaming-kinesis-integration.md
--
diff --git a/docs/streaming-kinesis-integration.md b/docs/streaming-kinesis-integration.md
index 96198dd..6be0b54 100644
--- a/docs/streaming-kinesis-integration.md
+++ b/docs/streaming-kinesis-integration.md
@@ -166,10 +166,7 @@ A Kinesis stream can be set up at one of the valid Kinesis 
endpoints with 1 or m
  Running the Example
 To run the example,
 
-- Download Spark source and follow the [instructions](building-spark.html) to 
build Spark with profile *-Pkinesis-asl*.
-
-mvn -Pkinesis-asl -DskipTests clean package
-
+- Download a Spark binary from the [download 
site](http://spark.apache.org/downloads.html).
 
 - Set up Kinesis stream (see earlier section) within AWS. Note the name of the 
Kinesis stream and the endpoint URL corresponding to the region where the 
stream was created.
 
@@ -180,12 +177,12 @@ To run the example,


 
-bin/run-example streaming.KinesisWordCountASL [Kinesis app name] 
[Kinesis stream name] [endpoint URL]
+bin/run-example --packages 
org.apache.spark:spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}}
 streaming.KinesisWordCountASL [Kinesis app name] [Kinesis stream name] 
[endpoint URL]
 


 
-bin/run-example streaming.JavaKinesisWordCountASL [Kinesis app name] 
[Kinesis stream name] [endpoint URL]
+bin/run-example --packages 
org.apache.spark:spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}}
 streaming.JavaKinesisWordCountASL [Kinesis app name] [Kinesis stream name] 
[endpoint URL]
 







spark git commit: [MINOR][DOCS] Fix the doc. of spark-streaming with kinesis

2016-09-29 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master b35b0dbbf -> b2e9731ca


[MINOR][DOCS] Fix the doc. of spark-streaming with kinesis

## What changes were proposed in this pull request?
This PR just fixes the documentation of `spark-kinesis-integration`.
Since `SPARK-17418` prevented all the Kinesis artifacts (including the Kinesis
example code) from being published, `bin/run-example streaming.KinesisWordCountASL`
and `bin/run-example streaming.JavaKinesisWordCountASL` no longer work.
Instead, the documented commands now fetch the Kinesis jar from Spark Packages
via `--packages`.

Author: Takeshi YAMAMURO 

Closes #15260 from maropu/DocFixKinesis.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b2e9731c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b2e9731c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b2e9731c

Branch: refs/heads/master
Commit: b2e9731ca494c0c60d571499f68bb8306a3c9fe5
Parents: b35b0db
Author: Takeshi YAMAMURO 
Authored: Thu Sep 29 08:26:03 2016 -0400
Committer: Sean Owen 
Committed: Thu Sep 29 08:26:03 2016 -0400

--
 docs/streaming-kinesis-integration.md | 9 +++--
 1 file changed, 3 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b2e9731c/docs/streaming-kinesis-integration.md
--
diff --git a/docs/streaming-kinesis-integration.md b/docs/streaming-kinesis-integration.md
index 96198dd..6be0b54 100644
--- a/docs/streaming-kinesis-integration.md
+++ b/docs/streaming-kinesis-integration.md
@@ -166,10 +166,7 @@ A Kinesis stream can be set up at one of the valid Kinesis 
endpoints with 1 or m
  Running the Example
 To run the example,
 
-- Download Spark source and follow the [instructions](building-spark.html) to 
build Spark with profile *-Pkinesis-asl*.
-
-mvn -Pkinesis-asl -DskipTests clean package
-
+- Download a Spark binary from the [download 
site](http://spark.apache.org/downloads.html).
 
 - Set up Kinesis stream (see earlier section) within AWS. Note the name of the 
Kinesis stream and the endpoint URL corresponding to the region where the 
stream was created.
 
@@ -180,12 +177,12 @@ To run the example,


 
-bin/run-example streaming.KinesisWordCountASL [Kinesis app name] 
[Kinesis stream name] [endpoint URL]
+bin/run-example --packages 
org.apache.spark:spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}}
 streaming.KinesisWordCountASL [Kinesis app name] [Kinesis stream name] 
[endpoint URL]
 


 
-bin/run-example streaming.JavaKinesisWordCountASL [Kinesis app name] 
[Kinesis stream name] [endpoint URL]
+bin/run-example --packages 
org.apache.spark:spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}}
 streaming.JavaKinesisWordCountASL [Kinesis app name] [Kinesis stream name] 
[endpoint URL]
 







spark git commit: [SPARK-17614][SQL] sparkSession.read().jdbc(***) uses the SQL syntax "where 1=0", which Cassandra does not support

2016-09-29 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master f7082ac12 -> b35b0dbbf


[SPARK-17614][SQL] sparkSession.read().jdbc(***) uses the SQL syntax "where 1=0", which Cassandra does not support

## What changes were proposed in this pull request?

Use the dialect's schema-discovery query (the new `getSchemaQuery` hook) rather than a hard-coded `WHERE 1=0` query.
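
For example, a custom dialect can now override the new `getSchemaQuery` hook. The class name, URL prefix, and `LIMIT 0` probe below are illustrative assumptions, not part of this patch:

```scala
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}

// Hypothetical dialect for a store that rejects "WHERE 1=0" (illustrative only).
case object CassandraLikeDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:cassandra")

  // Same schema as the table, zero rows, without relying on WHERE 1=0 support.
  override def getSchemaQuery(table: String): String = s"SELECT * FROM $table LIMIT 0"
}

// Once registered, schema resolution in JDBCRDD uses the overridden probe.
JdbcDialects.registerDialect(CassandraLikeDialect)
```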

## How was this patch tested?

Existing tests.

Author: Sean Owen 

Closes #15196 from srowen/SPARK-17614.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b35b0dbb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b35b0dbb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b35b0dbb

Branch: refs/heads/master
Commit: b35b0dbbfa3dc1bdf5e2fa1e9677d06635142b22
Parents: f7082ac
Author: Sean Owen 
Authored: Thu Sep 29 08:24:34 2016 -0400
Committer: Sean Owen 
Committed: Thu Sep 29 08:24:34 2016 -0400

--
 .../sql/execution/datasources/jdbc/JDBCRDD.scala |  6 ++
 .../org/apache/spark/sql/jdbc/JdbcDialects.scala | 15 ++-
 2 files changed, 16 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b35b0dbb/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index a7da29f..f10615e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -58,11 +58,11 @@ object JDBCRDD extends Logging {
 val dialect = JdbcDialects.get(url)
 val conn: Connection = JdbcUtils.createConnectionFactory(url, properties)()
 try {
-  val statement = conn.prepareStatement(s"SELECT * FROM $table WHERE 1=0")
+  val statement = conn.prepareStatement(dialect.getSchemaQuery(table))
   try {
 val rs = statement.executeQuery()
 try {
-  return JdbcUtils.getSchema(rs, dialect)
+  JdbcUtils.getSchema(rs, dialect)
 } finally {
   rs.close()
 }
@@ -72,8 +72,6 @@ object JDBCRDD extends Logging {
 } finally {
   conn.close()
 }
-
-throw new RuntimeException("This line is unreachable.")
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/b35b0dbb/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index 3a6d5b7..8dd4b8f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.jdbc
 
 import java.sql.Connection
 
-import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.annotation.{DeveloperApi, Since}
 import org.apache.spark.sql.types._
 
 /**
@@ -100,6 +100,19 @@ abstract class JdbcDialect extends Serializable {
   }
 
   /**
+   * The SQL query that should be used to discover the schema of a table. It 
only needs to
+   * ensure that the result set has the same schema as the table, such as by 
calling
+   * "SELECT * ...". Dialects can override this method to return a query that 
works best in a
+   * particular database.
+   * @param table The name of the table.
+   * @return The SQL query to use for discovering the schema.
+   */
+  @Since("2.1.0")
+  def getSchemaQuery(table: String): String = {
+s"SELECT * FROM $table WHERE 1=0"
+  }
+
+  /**
* Override connection specific properties to run before a select is made.  
This is in place to
* allow dialects that need special treatment to optimize behavior.
* @param connection The connection object





spark git commit: [SPARK-17704][ML][MLLIB] ChiSqSelector performance improvement.

2016-09-29 Thread yliang
Repository: spark
Updated Branches:
  refs/heads/master a19a1bb59 -> f7082ac12


[SPARK-17704][ML][MLLIB] ChiSqSelector performance improvement.

## What changes were proposed in this pull request?
Several performance improvements for ```ChiSqSelector```:
1. Keep ```selectedFeatures``` ordered ascending.
```ChiSqSelectorModel.transform``` needs ```selectedFeatures``` to be ordered to make
predictions. We should sort it once when training the model rather than on every
prediction, since users usually train a model once and then use it for prediction
many times (see the sketch after this list).
2. When training an ```fpr```-type ```ChiSqSelectorModel```, it's not necessary to
sort the ChiSq test results by statistic.
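
A minimal sketch of the first point (assuming only spark-mllib on the classpath; the indices and vector below are made up):

```scala
import org.apache.spark.mllib.feature.ChiSqSelectorModel
import org.apache.spark.mllib.linalg.Vectors

// After this patch the constructor requires indices that are already sorted ascending,
// so transform() can filter with a single merge pass instead of re-sorting per call.
val model = new ChiSqSelectorModel(Array(0, 2, 5))

val v = Vectors.dense(1.0, 2.0, 3.0, 4.0, 5.0, 6.0)
println(model.transform(v))  // keeps features 0, 2 and 5: [1.0,3.0,6.0]
```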

## How was this patch tested?
Existing unit tests.

Author: Yanbo Liang 

Closes #15277 from yanboliang/spark-17704.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f7082ac1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f7082ac1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f7082ac1

Branch: refs/heads/master
Commit: f7082ac12518ae84d6d1d4b7330a9f12cf95e7c1
Parents: a19a1bb
Author: Yanbo Liang 
Authored: Thu Sep 29 04:30:42 2016 -0700
Committer: Yanbo Liang 
Committed: Thu Sep 29 04:30:42 2016 -0700

--
 .../spark/mllib/feature/ChiSqSelector.scala | 45 +---
 project/MimaExcludes.scala  |  3 --
 2 files changed, 30 insertions(+), 18 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f7082ac1/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala
index 0f7c6e8..706ce78 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala
@@ -35,12 +35,24 @@ import org.apache.spark.sql.{Row, SparkSession}
 /**
  * Chi Squared selector model.
  *
- * @param selectedFeatures list of indices to select (filter).
+ * @param selectedFeatures list of indices to select (filter). Must be ordered 
asc
  */
 @Since("1.3.0")
 class ChiSqSelectorModel @Since("1.3.0") (
   @Since("1.3.0") val selectedFeatures: Array[Int]) extends VectorTransformer 
with Saveable {
 
+  require(isSorted(selectedFeatures), "Array has to be sorted asc")
+
+  protected def isSorted(array: Array[Int]): Boolean = {
+var i = 1
+val len = array.length
+while (i < len) {
+  if (array(i) < array(i-1)) return false
+  i += 1
+}
+true
+  }
+
   /**
* Applies transformation on a vector.
*
@@ -57,22 +69,21 @@ class ChiSqSelectorModel @Since("1.3.0") (
* Preserves the order of filtered features the same as their indices are 
stored.
* Might be moved to Vector as .slice
* @param features vector
-   * @param filterIndices indices of features to filter
+   * @param filterIndices indices of features to filter, must be ordered asc
*/
   private def compress(features: Vector, filterIndices: Array[Int]): Vector = {
-val orderedIndices = filterIndices.sorted
 features match {
   case SparseVector(size, indices, values) =>
-val newSize = orderedIndices.length
+val newSize = filterIndices.length
 val newValues = new ArrayBuilder.ofDouble
 val newIndices = new ArrayBuilder.ofInt
 var i = 0
 var j = 0
 var indicesIdx = 0
 var filterIndicesIdx = 0
-while (i < indices.length && j < orderedIndices.length) {
+while (i < indices.length && j < filterIndices.length) {
   indicesIdx = indices(i)
-  filterIndicesIdx = orderedIndices(j)
+  filterIndicesIdx = filterIndices(j)
   if (indicesIdx == filterIndicesIdx) {
 newIndices += j
 newValues += values(i)
@@ -90,7 +101,7 @@ class ChiSqSelectorModel @Since("1.3.0") (
 Vectors.sparse(newSize, newIndices.result(), newValues.result())
   case DenseVector(values) =>
 val values = features.toArray
-Vectors.dense(orderedIndices.map(i => values(i)))
+Vectors.dense(filterIndices.map(i => values(i)))
   case other =>
 throw new UnsupportedOperationException(
   s"Only sparse and dense vectors are supported but got 
${other.getClass}.")
@@ -220,18 +231,22 @@ class ChiSqSelector @Since("2.1.0") () extends 
Serializable {
   @Since("1.3.0")
   def fit(data: RDD[LabeledPoint]): ChiSqSelectorModel = {
 val chiSqTestResult = Statistics.chiSqTest(data)
-  .zipWithIndex.sortBy { case (res, _) => -res.statistic }
 val features = 

spark git commit: [SPARK-16356][FOLLOW-UP][ML] Enforce ML test of exception for local/distributed Dataset.

2016-09-29 Thread yliang
Repository: spark
Updated Branches:
  refs/heads/master 37eb9184f -> a19a1bb59


[SPARK-16356][FOLLOW-UP][ML] Enforce ML test of exception for local/distributed 
Dataset.

## What changes were proposed in this pull request?
#14035 added ```testImplicits``` to the ML unit tests and promoted ```toDF()```,
but left one minor issue in ```VectorIndexerSuite```. If we create the
DataFrame with ```Seq(...).toDF()```, it throws a different error/exception
than ```sc.parallelize(Seq(...)).toDF()``` for one of the test cases.
After an in-depth study, I found it was caused by the different behavior of local
and distributed Datasets when a UDF fails at an ```assert```. If the data is a
local Dataset, it throws ```AssertionError``` directly; if the data is a
distributed Dataset, it throws a ```SparkException``` that wraps the
```AssertionError```. I think we should enforce this test to cover both cases.
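
An illustrative, stand-alone sketch of the difference described above (the session setup and failing UDF are assumptions, not the suite's actual code); per the description, the local case is expected to surface the raw `AssertionError`, while the distributed case wraps it in a `SparkException`:

```scala
import scala.util.Try

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

val spark = SparkSession.builder().master("local[2]").appName("local-vs-distributed").getOrCreate()
import spark.implicits._

// A UDF that fails its assertion for every input row.
val failing = udf { i: Int => assert(i < 0, "boom"); i }

val localDF = Seq(1, 2, 3).toDF("i")                                       // backed by a LocalRelation
val distributedDF = spark.sparkContext.parallelize(Seq(1, 2, 3)).toDF("i") // backed by an RDD

// Print whichever exception type actually escapes collect() for each case.
def reportFailure(label: String)(body: => Unit): Unit =
  Try(body).failed.foreach(e => println(s"$label failed with ${e.getClass.getName}"))

reportFailure("local")(localDF.select(failing($"i")).collect())
reportFailure("distributed")(distributedDF.select(failing($"i")).collect())

spark.stop()
```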

## How was this patch tested?
Unit test.

Author: Yanbo Liang 

Closes #15261 from yanboliang/spark-16356.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a19a1bb5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a19a1bb5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a19a1bb5

Branch: refs/heads/master
Commit: a19a1bb59411177caaf99581e89098826b7d0c7b
Parents: 37eb918
Author: Yanbo Liang 
Authored: Thu Sep 29 00:54:26 2016 -0700
Committer: Yanbo Liang 
Committed: Thu Sep 29 00:54:26 2016 -0700

--
 .../apache/spark/ml/feature/VectorIndexerSuite.scala   | 13 +
 1 file changed, 9 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a19a1bb5/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala
--
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala
index 4da1b13..b28ce2a 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala
@@ -88,9 +88,7 @@ class VectorIndexerSuite extends SparkFunSuite with 
MLlibTestSparkContext
 
 densePoints1 = densePoints1Seq.map(FeatureData).toDF()
 sparsePoints1 = sparsePoints1Seq.map(FeatureData).toDF()
-// TODO: If we directly use `toDF` without parallelize, the test in
-// "Throws error when given RDDs with different size vectors" is failed 
for an unknown reason.
-densePoints2 = sc.parallelize(densePoints2Seq, 2).map(FeatureData).toDF()
+densePoints2 = densePoints2Seq.map(FeatureData).toDF()
 sparsePoints2 = sparsePoints2Seq.map(FeatureData).toDF()
 badPoints = badPointsSeq.map(FeatureData).toDF()
   }
@@ -121,10 +119,17 @@ class VectorIndexerSuite extends SparkFunSuite with 
MLlibTestSparkContext
 
 model.transform(densePoints1) // should work
 model.transform(sparsePoints1) // should work
-intercept[SparkException] {
+// If the data is local Dataset, it throws AssertionError directly.
+intercept[AssertionError] {
   model.transform(densePoints2).collect()
   logInfo("Did not throw error when fit, transform were called on vectors 
of different lengths")
 }
+// If the data is distributed Dataset, it throws SparkException
+// which is the wrapper of AssertionError.
+intercept[SparkException] {
+  model.transform(densePoints2.repartition(2)).collect()
+  logInfo("Did not throw error when fit, transform were called on vectors 
of different lengths")
+}
 intercept[SparkException] {
   vectorIndexer.fit(badPoints)
   logInfo("Did not throw error when fitting vectors of different lengths 
in same RDD.")

