spark git commit: [SPARK-18774][CORE][SQL] Ignore non-existing files when ignoreCorruptFiles is enabled

2016-12-07 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 330fda8aa -> b47b892e4


[SPARK-18774][CORE][SQL] Ignore non-existing files when ignoreCorruptFiles is 
enabled

## What changes were proposed in this pull request?

When `ignoreCorruptFiles` is enabled, it's better to also ignore non-existing 
files.
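
For illustration only (not part of this patch), a minimal sketch of opting in to this behavior. The input path is hypothetical, and the `spark.sql.files.ignoreCorruptFiles` key is assumed to be the SQL-side counterpart touched in `SQLConf.scala` below:

```scala
import org.apache.spark.sql.SparkSession

object IgnoreMissingFilesExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("IgnoreMissingFilesExample")
      // Core RDD inputs (HadoopRDD/NewHadoopRDD) honor this flag.
      .config("spark.files.ignoreCorruptFiles", "true")
      // DataFrame/SQL scans (FileScanRDD) use the SQL variant of the flag.
      .config("spark.sql.files.ignoreCorruptFiles", "true")
      .getOrCreate()

    // Hypothetical input directory: if some listed files were deleted after
    // planning (or are corrupted), the scan logs a warning and keeps going
    // instead of failing the job.
    val df = spark.read.text("/tmp/logs/*.txt")
    println(s"Readable lines: ${df.count()}")

    spark.stop()
  }
}
```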

## How was this patch tested?

Jenkins

Author: Shixiong Zhu 

Closes #16203 from zsxwing/ignore-file-not-found.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b47b892e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b47b892e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b47b892e

Branch: refs/heads/master
Commit: b47b892e4579b7b06b4b2837ee4b614e517789f9
Parents: 330fda8
Author: Shixiong Zhu 
Authored: Wed Dec 7 22:37:04 2016 -0800
Committer: Reynold Xin 
Committed: Wed Dec 7 22:37:04 2016 -0800

--
 .../apache/spark/internal/config/package.scala   |  3 ++-
 .../scala/org/apache/spark/rdd/HadoopRDD.scala   | 14 +++---
 .../org/apache/spark/rdd/NewHadoopRDD.scala  | 19 +++
 .../sql/execution/datasources/FileScanRDD.scala  |  3 +++
 .../org/apache/spark/sql/internal/SQLConf.scala  |  3 ++-
 5 files changed, 33 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b47b892e/core/src/main/scala/org/apache/spark/internal/config/package.scala
--
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index a69a2b5..78aed4f 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -203,7 +203,8 @@ package object config {
 
   private[spark] val IGNORE_CORRUPT_FILES = ConfigBuilder("spark.files.ignoreCorruptFiles")
     .doc("Whether to ignore corrupt files. If true, the Spark jobs will continue to run when " +
-      "encountering corrupt files and contents that have been read will still be returned.")
+      "encountering corrupted or non-existing files and contents that have been read will still " +
+      "be returned.")
     .booleanConf
     .createWithDefault(false)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b47b892e/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 6e87233..a83e139 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -248,12 +248,20 @@ class HadoopRDD[K, V](
       HadoopRDD.addLocalConfiguration(
         new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(createTime),
         context.stageId, theSplit.index, context.attemptNumber, jobConf)
-      reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
 
+      reader =
+        try {
+          inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
+        } catch {
+          case e: IOException if ignoreCorruptFiles =>
+            logWarning(s"Skipped the rest content in the corrupted file: ${split.inputSplit}", e)
+            finished = true
+            null
+        }
       // Register an on-task-completion callback to close the input stream.
       context.addTaskCompletionListener{ context => closeIfNeeded() }
-      private val key: K = reader.createKey()
-      private val value: V = reader.createValue()
+      private val key: K = if (reader == null) null.asInstanceOf[K] else reader.createKey()
+      private val value: V = if (reader == null) null.asInstanceOf[V] else reader.createValue()
 
   override def getNext(): (K, V) = {
 try {

http://git-wip-us.apache.org/repos/asf/spark/blob/b47b892e/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index e805192..733e85f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -174,14 +174,25 @@ class NewHadoopRDD[K, V](
   }
       private val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
       private val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
-      private var reader = format.createRecordReader(
-

spark git commit: Close stale pull requests.

2016-12-07 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 97255497d -> 330fda8aa


Close stale pull requests.

Closes #15689
Closes #14640
Closes #15917
Closes #16188
Closes #16206


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/330fda8a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/330fda8a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/330fda8a

Branch: refs/heads/master
Commit: 330fda8aa289e0142e174ed6f03b8fa28d08470f
Parents: 9725549
Author: Reynold Xin 
Authored: Wed Dec 7 22:29:57 2016 -0800
Committer: Reynold Xin 
Committed: Wed Dec 7 22:29:57 2016 -0800

--

--



-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[1/2] spark git commit: Preparing Spark release v2.1.0-rc2

2016-12-07 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 1c3f1da82 -> 48aa6775d


Preparing Spark release v2.1.0-rc2


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/08071749
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/08071749
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/08071749

Branch: refs/heads/branch-2.1
Commit: 080717497365b83bc202ab16812ced93eb1ea7bd
Parents: 1c3f1da
Author: Patrick Wendell 
Authored: Wed Dec 7 22:29:49 2016 -0800
Committer: Patrick Wendell 
Committed: Wed Dec 7 22:29:49 2016 -0800

--
 R/pkg/DESCRIPTION | 2 +-
 assembly/pom.xml  | 2 +-
 common/network-common/pom.xml | 2 +-
 common/network-shuffle/pom.xml| 2 +-
 common/network-yarn/pom.xml   | 2 +-
 common/sketch/pom.xml | 2 +-
 common/tags/pom.xml   | 2 +-
 common/unsafe/pom.xml | 2 +-
 core/pom.xml  | 2 +-
 docs/_config.yml  | 4 ++--
 examples/pom.xml  | 2 +-
 external/docker-integration-tests/pom.xml | 2 +-
 external/flume-assembly/pom.xml   | 2 +-
 external/flume-sink/pom.xml   | 2 +-
 external/flume/pom.xml| 2 +-
 external/java8-tests/pom.xml  | 2 +-
 external/kafka-0-10-assembly/pom.xml  | 2 +-
 external/kafka-0-10-sql/pom.xml   | 2 +-
 external/kafka-0-10/pom.xml   | 2 +-
 external/kafka-0-8-assembly/pom.xml   | 2 +-
 external/kafka-0-8/pom.xml| 2 +-
 external/kinesis-asl-assembly/pom.xml | 2 +-
 external/kinesis-asl/pom.xml  | 2 +-
 external/spark-ganglia-lgpl/pom.xml   | 2 +-
 graphx/pom.xml| 2 +-
 launcher/pom.xml  | 2 +-
 mesos/pom.xml | 2 +-
 mllib-local/pom.xml   | 2 +-
 mllib/pom.xml | 2 +-
 pom.xml   | 2 +-
 python/pyspark/version.py | 2 +-
 repl/pom.xml  | 2 +-
 sql/catalyst/pom.xml  | 2 +-
 sql/core/pom.xml  | 2 +-
 sql/hive-thriftserver/pom.xml | 2 +-
 sql/hive/pom.xml  | 2 +-
 streaming/pom.xml | 2 +-
 tools/pom.xml | 2 +-
 yarn/pom.xml  | 2 +-
 39 files changed, 40 insertions(+), 40 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/08071749/R/pkg/DESCRIPTION
--
diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION
index 46fb178..981ae12 100644
--- a/R/pkg/DESCRIPTION
+++ b/R/pkg/DESCRIPTION
@@ -1,7 +1,7 @@
 Package: SparkR
 Type: Package
 Title: R Frontend for Apache Spark
-Version: 2.1.1
+Version: 2.1.0
 Date: 2016-11-06
 Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"),
 email = "shiva...@cs.berkeley.edu"),

http://git-wip-us.apache.org/repos/asf/spark/blob/08071749/assembly/pom.xml
--
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 29522fd..aebfd12 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
    <artifactId>spark-parent_2.11</artifactId>
-    <version>2.1.1-SNAPSHOT</version>
+    <version>2.1.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/08071749/common/network-common/pom.xml
--
diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml
index 85644c4..67d78d5 100644
--- a/common/network-common/pom.xml
+++ b/common/network-common/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
    <artifactId>spark-parent_2.11</artifactId>
-    <version>2.1.1-SNAPSHOT</version>
+    <version>2.1.0</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/08071749/common/network-shuffle/pom.xml
--
diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml
index e15ede9..9379097 100644
--- a/common/network-shuffle/pom.xml
+++ b/common/network-shuffle/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
    <artifactId>spark-parent_2.11</artifactId>
-    <version>2.1.1-SNAPSHOT</version>
+    <version>2.1.0</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/08071749/common/network-yarn/pom.xml
--
diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml
index c93a355..53cb8dd 100644
--- a/common/network-yarn/pom.xml
+++ b/common/network-yarn/pom.xml
@@ 

[2/2] spark git commit: Preparing development version 2.1.1-SNAPSHOT

2016-12-07 Thread pwendell
Preparing development version 2.1.1-SNAPSHOT


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/48aa6775
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/48aa6775
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/48aa6775

Branch: refs/heads/branch-2.1
Commit: 48aa6775d6b54ccecdbe2287ae75d99c00b02d18
Parents: 0807174
Author: Patrick Wendell 
Authored: Wed Dec 7 22:29:55 2016 -0800
Committer: Patrick Wendell 
Committed: Wed Dec 7 22:29:55 2016 -0800

--
 R/pkg/DESCRIPTION | 2 +-
 assembly/pom.xml  | 2 +-
 common/network-common/pom.xml | 2 +-
 common/network-shuffle/pom.xml| 2 +-
 common/network-yarn/pom.xml   | 2 +-
 common/sketch/pom.xml | 2 +-
 common/tags/pom.xml   | 2 +-
 common/unsafe/pom.xml | 2 +-
 core/pom.xml  | 2 +-
 docs/_config.yml  | 4 ++--
 examples/pom.xml  | 2 +-
 external/docker-integration-tests/pom.xml | 2 +-
 external/flume-assembly/pom.xml   | 2 +-
 external/flume-sink/pom.xml   | 2 +-
 external/flume/pom.xml| 2 +-
 external/java8-tests/pom.xml  | 2 +-
 external/kafka-0-10-assembly/pom.xml  | 2 +-
 external/kafka-0-10-sql/pom.xml   | 2 +-
 external/kafka-0-10/pom.xml   | 2 +-
 external/kafka-0-8-assembly/pom.xml   | 2 +-
 external/kafka-0-8/pom.xml| 2 +-
 external/kinesis-asl-assembly/pom.xml | 2 +-
 external/kinesis-asl/pom.xml  | 2 +-
 external/spark-ganglia-lgpl/pom.xml   | 2 +-
 graphx/pom.xml| 2 +-
 launcher/pom.xml  | 2 +-
 mesos/pom.xml | 2 +-
 mllib-local/pom.xml   | 2 +-
 mllib/pom.xml | 2 +-
 pom.xml   | 2 +-
 python/pyspark/version.py | 2 +-
 repl/pom.xml  | 2 +-
 sql/catalyst/pom.xml  | 2 +-
 sql/core/pom.xml  | 2 +-
 sql/hive-thriftserver/pom.xml | 2 +-
 sql/hive/pom.xml  | 2 +-
 streaming/pom.xml | 2 +-
 tools/pom.xml | 2 +-
 yarn/pom.xml  | 2 +-
 39 files changed, 40 insertions(+), 40 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/48aa6775/R/pkg/DESCRIPTION
--
diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION
index 981ae12..46fb178 100644
--- a/R/pkg/DESCRIPTION
+++ b/R/pkg/DESCRIPTION
@@ -1,7 +1,7 @@
 Package: SparkR
 Type: Package
 Title: R Frontend for Apache Spark
-Version: 2.1.0
+Version: 2.1.1
 Date: 2016-11-06
 Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"),
 email = "shiva...@cs.berkeley.edu"),

http://git-wip-us.apache.org/repos/asf/spark/blob/48aa6775/assembly/pom.xml
--
diff --git a/assembly/pom.xml b/assembly/pom.xml
index aebfd12..29522fd 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
    <artifactId>spark-parent_2.11</artifactId>
-    <version>2.1.0</version>
+    <version>2.1.1-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/48aa6775/common/network-common/pom.xml
--
diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml
index 67d78d5..85644c4 100644
--- a/common/network-common/pom.xml
+++ b/common/network-common/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
    <artifactId>spark-parent_2.11</artifactId>
-    <version>2.1.0</version>
+    <version>2.1.1-SNAPSHOT</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/48aa6775/common/network-shuffle/pom.xml
--
diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml
index 9379097..e15ede9 100644
--- a/common/network-shuffle/pom.xml
+++ b/common/network-shuffle/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
    <artifactId>spark-parent_2.11</artifactId>
-    <version>2.1.0</version>
+    <version>2.1.1-SNAPSHOT</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/48aa6775/common/network-yarn/pom.xml
--
diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml
index 53cb8dd..c93a355 100644
--- a/common/network-yarn/pom.xml
+++ b/common/network-yarn/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
    <artifactId>spark-parent_2.11</artifactId>
-    <version>2.1.0</version>
+

[spark] Git Push Summary

2016-12-07 Thread pwendell
Repository: spark
Updated Tags:  refs/tags/v2.1.0-rc2 [created] 080717497

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-18326][SPARKR][ML] Review SparkR ML wrappers API for 2.1

2016-12-07 Thread yliang
Repository: spark
Updated Branches:
  refs/heads/master 82253617f -> 97255497d


[SPARK-18326][SPARKR][ML] Review SparkR ML wrappers API for 2.1

## What changes were proposed in this pull request?
Reviewing SparkR ML wrappers API for 2.1 release, mainly two issues:
* Remove ```probabilityCol``` from the argument list of ```spark.logit``` and
```spark.randomForest```. It is used when making predictions and should be an
argument of ```predict```; we will work on this in
[SPARK-18618](https://issues.apache.org/jira/browse/SPARK-18618) in the next
release cycle.
* Fix the ```spark.als``` params to make them consistent with MLlib.

## How was this patch tested?
Existing tests.

Author: Yanbo Liang 

Closes #16169 from yanboliang/spark-18326.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/97255497
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/97255497
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/97255497

Branch: refs/heads/master
Commit: 97255497d885f0f8ccfc808e868bc8aa5e4d1063
Parents: 8225361
Author: Yanbo Liang 
Authored: Wed Dec 7 20:23:28 2016 -0800
Committer: Yanbo Liang 
Committed: Wed Dec 7 20:23:28 2016 -0800

--
 R/pkg/R/mllib.R | 23 +---
 R/pkg/inst/tests/testthat/test_mllib.R  |  4 ++--
 .../spark/ml/r/LogisticRegressionWrapper.scala  |  4 +---
 .../r/RandomForestClassificationWrapper.scala   |  2 --
 4 files changed, 13 insertions(+), 20 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/97255497/R/pkg/R/mllib.R
--
diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R
index 074e9cb..632e4ad 100644
--- a/R/pkg/R/mllib.R
+++ b/R/pkg/R/mllib.R
@@ -733,7 +733,6 @@ setMethod("predict", signature(object = "KMeansModel"),
 #'                   excepting that at most one value may be 0. The class with largest value p/t is predicted, where p
 #'                   is the original probability of that class and t is the class's threshold.
 #' @param weightCol The weight column name.
-#' @param probabilityCol column name for predicted class conditional probabilities.
 #' @param ... additional arguments passed to the method.
 #' @return \code{spark.logit} returns a fitted logistic regression model
 #' @rdname spark.logit
@@ -772,7 +771,7 @@ setMethod("predict", signature(object = "KMeansModel"),
 setMethod("spark.logit", signature(data = "SparkDataFrame", formula = "formula"),
           function(data, formula, regParam = 0.0, elasticNetParam = 0.0, maxIter = 100,
                    tol = 1E-6, family = "auto", standardization = TRUE,
-                   thresholds = 0.5, weightCol = NULL, probabilityCol = "probability") {
+                   thresholds = 0.5, weightCol = NULL) {
             formula <- paste(deparse(formula), collapse = "")
 
             if (is.null(weightCol)) {
@@ -784,7 +783,7 @@ setMethod("spark.logit", signature(data = "SparkDataFrame", formula = "formula")
                                 as.numeric(elasticNetParam), as.integer(maxIter),
                                 as.numeric(tol), as.character(family),
                                 as.logical(standardization), as.array(thresholds),
-                                as.character(weightCol), as.character(probabilityCol))
+                                as.character(weightCol))
             new("LogisticRegressionModel", jobj = jobj)
           })
 
@@ -1425,7 +1424,7 @@ setMethod("predict", signature(object = "GaussianMixtureModel"),
 #' @param userCol column name for user ids. Ids must be (or can be coerced into) integers.
 #' @param itemCol column name for item ids. Ids must be (or can be coerced into) integers.
 #' @param rank rank of the matrix factorization (> 0).
-#' @param reg regularization parameter (>= 0).
+#' @param regParam regularization parameter (>= 0).
 #' @param maxIter maximum number of iterations (>= 0).
 #' @param nonnegative logical value indicating whether to apply nonnegativity constraints.
 #' @param implicitPrefs logical value indicating whether to use implicit preference.
@@ -1464,21 +1463,21 @@ setMethod("predict", signature(object = "GaussianMixtureModel"),
 #'
 #' # set other arguments
 #' modelS <- spark.als(df, "rating", "user", "item", rank = 20,
-#'                     reg = 0.1, nonnegative = TRUE)
+#'                     regParam = 0.1, nonnegative = TRUE)
 #' statsS <- summary(modelS)
 #' }
 #' @note spark.als since 2.1.0
 setMethod("spark.als", signature(data = "SparkDataFrame"),
           function(data, ratingCol = "rating", userCol = "user", itemCol = "item",
-                   rank = 10, reg = 0.1, maxIter = 10, nonnegative = FALSE,
+

spark git commit: [SPARK-18326][SPARKR][ML] Review SparkR ML wrappers API for 2.1

2016-12-07 Thread yliang
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 ab865cfd9 -> 1c3f1da82


[SPARK-18326][SPARKR][ML] Review SparkR ML wrappers API for 2.1

## What changes were proposed in this pull request?
Reviewing SparkR ML wrappers API for 2.1 release, mainly two issues:
* Remove ```probabilityCol``` from the argument list of ```spark.logit``` and
```spark.randomForest```. It is used when making predictions and should be an
argument of ```predict```; we will work on this in
[SPARK-18618](https://issues.apache.org/jira/browse/SPARK-18618) in the next
release cycle.
* Fix the ```spark.als``` params to make them consistent with MLlib.

## How was this patch tested?
Existing tests.

Author: Yanbo Liang 

Closes #16169 from yanboliang/spark-18326.

(cherry picked from commit 97255497d885f0f8ccfc808e868bc8aa5e4d1063)
Signed-off-by: Yanbo Liang 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1c3f1da8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1c3f1da8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1c3f1da8

Branch: refs/heads/branch-2.1
Commit: 1c3f1da82356426b6b550fee67e66dc82eaf1c85
Parents: ab865cf
Author: Yanbo Liang 
Authored: Wed Dec 7 20:23:28 2016 -0800
Committer: Yanbo Liang 
Committed: Wed Dec 7 20:23:45 2016 -0800

--
 R/pkg/R/mllib.R | 23 +---
 R/pkg/inst/tests/testthat/test_mllib.R  |  4 ++--
 .../spark/ml/r/LogisticRegressionWrapper.scala  |  4 +---
 .../r/RandomForestClassificationWrapper.scala   |  2 --
 4 files changed, 13 insertions(+), 20 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1c3f1da8/R/pkg/R/mllib.R
--
diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R
index 074e9cb..632e4ad 100644
--- a/R/pkg/R/mllib.R
+++ b/R/pkg/R/mllib.R
@@ -733,7 +733,6 @@ setMethod("predict", signature(object = "KMeansModel"),
 #'                   excepting that at most one value may be 0. The class with largest value p/t is predicted, where p
 #'                   is the original probability of that class and t is the class's threshold.
 #' @param weightCol The weight column name.
-#' @param probabilityCol column name for predicted class conditional probabilities.
 #' @param ... additional arguments passed to the method.
 #' @return \code{spark.logit} returns a fitted logistic regression model
 #' @rdname spark.logit
@@ -772,7 +771,7 @@ setMethod("predict", signature(object = "KMeansModel"),
 setMethod("spark.logit", signature(data = "SparkDataFrame", formula = "formula"),
           function(data, formula, regParam = 0.0, elasticNetParam = 0.0, maxIter = 100,
                    tol = 1E-6, family = "auto", standardization = TRUE,
-                   thresholds = 0.5, weightCol = NULL, probabilityCol = "probability") {
+                   thresholds = 0.5, weightCol = NULL) {
             formula <- paste(deparse(formula), collapse = "")
 
             if (is.null(weightCol)) {
@@ -784,7 +783,7 @@ setMethod("spark.logit", signature(data = "SparkDataFrame", formula = "formula")
                                 as.numeric(elasticNetParam), as.integer(maxIter),
                                 as.numeric(tol), as.character(family),
                                 as.logical(standardization), as.array(thresholds),
-                                as.character(weightCol), as.character(probabilityCol))
+                                as.character(weightCol))
             new("LogisticRegressionModel", jobj = jobj)
           })
 
@@ -1425,7 +1424,7 @@ setMethod("predict", signature(object = "GaussianMixtureModel"),
 #' @param userCol column name for user ids. Ids must be (or can be coerced into) integers.
 #' @param itemCol column name for item ids. Ids must be (or can be coerced into) integers.
 #' @param rank rank of the matrix factorization (> 0).
-#' @param reg regularization parameter (>= 0).
+#' @param regParam regularization parameter (>= 0).
 #' @param maxIter maximum number of iterations (>= 0).
 #' @param nonnegative logical value indicating whether to apply nonnegativity constraints.
 #' @param implicitPrefs logical value indicating whether to use implicit preference.
@@ -1464,21 +1463,21 @@ setMethod("predict", signature(object = "GaussianMixtureModel"),
 #'
 #' # set other arguments
 #' modelS <- spark.als(df, "rating", "user", "item", rank = 20,
-#'                     reg = 0.1, nonnegative = TRUE)
+#'                     regParam = 0.1, nonnegative = TRUE)
 #' statsS <- summary(modelS)
 #' }
 #' @note spark.als since 2.1.0
 setMethod("spark.als", signature(data = "SparkDataFrame"),
           function(data, ratingCol = "rating", 

spark git commit: [SPARK-18705][ML][DOC] Update user guide to reflect one pass solver for L1 and elastic-net

2016-12-07 Thread yliang
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 617ce3ba7 -> ab865cfd9


[SPARK-18705][ML][DOC] Update user guide to reflect one pass solver for L1 and 
elastic-net

## What changes were proposed in this pull request?

WeightedLeastSquares now supports L1 and elastic net penalties and has an 
additional solver option: QuasiNewton. The docs are updated to reflect this 
change.
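
For illustration (not part of this patch), a rough sketch of exercising the normal-equation solver with an elastic-net penalty; the dataset path and parameter values are arbitrary assumptions:

```scala
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("WLSExample").getOrCreate()

// Hypothetical LIBSVM-formatted training data.
val training = spark.read.format("libsvm")
  .load("data/mllib/sample_linear_regression_data.txt")

val lr = new LinearRegression()
  .setSolver("normal")      // request the WeightedLeastSquares (normal equation) path
  .setRegParam(0.3)         // lambda
  .setElasticNetParam(0.5)  // alpha > 0: the Quasi-Newton (OWL-QN) solver is used

val model = lr.fit(training)
println(model.coefficients)
```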

## How was this patch tested?

Docs only. Generated documentation to make sure Latex looks ok.

Author: sethah 

Closes #16139 from sethah/SPARK-18705.

(cherry picked from commit 82253617f5b3cdbd418c48f94e748651ee80077e)
Signed-off-by: Yanbo Liang 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ab865cfd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ab865cfd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ab865cfd

Branch: refs/heads/branch-2.1
Commit: ab865cfd9dc87154e7d4fc5d09168868c88db6b0
Parents: 617ce3b
Author: sethah 
Authored: Wed Dec 7 19:41:32 2016 -0800
Committer: Yanbo Liang 
Committed: Wed Dec 7 19:42:06 2016 -0800

--
 docs/ml-advanced.md | 24 
 1 file changed, 16 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ab865cfd/docs/ml-advanced.md
--
diff --git a/docs/ml-advanced.md b/docs/ml-advanced.md
index 12a03d3..2747f2d 100644
--- a/docs/ml-advanced.md
+++ b/docs/ml-advanced.md
@@ -59,17 +59,25 @@ Given $n$ weighted observations $(w_i, a_i, b_i)$:
 
 The number of features for each observation is $m$. We use the following weighted least squares formulation:
 `\[
-minimize_{x}\frac{1}{2} \sum_{i=1}^n \frac{w_i(a_i^T x -b_i)^2}{\sum_{k=1}^n w_k} + \frac{1}{2}\frac{\lambda}{\delta}\sum_{j=1}^m(\sigma_{j} x_{j})^2
+\min_{\mathbf{x}}\frac{1}{2} \sum_{i=1}^n \frac{w_i(\mathbf{a}_i^T \mathbf{x} -b_i)^2}{\sum_{k=1}^n w_k} + \frac{\lambda}{\delta}\left[\frac{1}{2}(1 - \alpha)\sum_{j=1}^m(\sigma_j x_j)^2 + \alpha\sum_{j=1}^m |\sigma_j x_j|\right]
 \]`
-where $\lambda$ is the regularization parameter, $\delta$ is the population standard deviation of the label
+where $\lambda$ is the regularization parameter, $\alpha$ is the elastic-net mixing parameter, $\delta$ is the population standard deviation of the label
 and $\sigma_j$ is the population standard deviation of the j-th feature column.
 
-This objective function has an analytic solution and it requires only one pass over the data to collect necessary statistics to solve.
-Unlike the original dataset which can only be stored in a distributed system,
-these statistics can be loaded into memory on a single machine if the number of features is relatively small, and then we can solve the objective function through Cholesky factorization on the driver.
+This objective function requires only one pass over the data to collect the statistics necessary to solve it. For an
+$n \times m$ data matrix, these statistics require only $O(m^2)$ storage and so can be stored on a single machine when $m$ (the number of features) is
+relatively small. We can then solve the normal equations on a single machine using local methods like direct Cholesky factorization or iterative optimization programs.
 
-WeightedLeastSquares only supports L2 regularization and provides options to enable or disable regularization and standardization.
-In order to make the normal equation approach efficient, WeightedLeastSquares requires that the number of features be no more than 4096. For larger problems, use L-BFGS instead.
+Spark MLlib currently supports two types of solvers for the normal equations: Cholesky factorization and Quasi-Newton methods (L-BFGS/OWL-QN). Cholesky factorization
+depends on a positive definite covariance matrix (i.e. columns of the data matrix must be linearly independent) and will fail if this condition is violated. Quasi-Newton methods
+are still capable of providing a reasonable solution even when the covariance matrix is not positive definite, so the normal equation solver can also fall back to
+Quasi-Newton methods in this case. This fallback is currently always enabled for the `LinearRegression` and `GeneralizedLinearRegression` estimators.
+
+`WeightedLeastSquares` supports L1, L2, and elastic-net regularization and provides options to enable or disable regularization and standardization. In the case where no
+L1 regularization is applied (i.e. $\alpha = 0$), there exists an analytical solution and either Cholesky or Quasi-Newton solver may be used. When $\alpha > 0$ no analytical
+solution exists and we instead use the Quasi-Newton solver to find the coefficients iteratively.
+
+In order 

spark git commit: [SPARK-18705][ML][DOC] Update user guide to reflect one pass solver for L1 and elastic-net

2016-12-07 Thread yliang
Repository: spark
Updated Branches:
  refs/heads/master 9ab725eab -> 82253617f


[SPARK-18705][ML][DOC] Update user guide to reflect one pass solver for L1 and 
elastic-net

## What changes were proposed in this pull request?

WeightedLeastSquares now supports L1 and elastic net penalties and has an 
additional solver option: QuasiNewton. The docs are updated to reflect this 
change.

## How was this patch tested?

Docs only. Generated documentation to make sure Latex looks ok.

Author: sethah 

Closes #16139 from sethah/SPARK-18705.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/82253617
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/82253617
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/82253617

Branch: refs/heads/master
Commit: 82253617f5b3cdbd418c48f94e748651ee80077e
Parents: 9ab725e
Author: sethah 
Authored: Wed Dec 7 19:41:32 2016 -0800
Committer: Yanbo Liang 
Committed: Wed Dec 7 19:41:32 2016 -0800

--
 docs/ml-advanced.md | 24 
 1 file changed, 16 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/82253617/docs/ml-advanced.md
--
diff --git a/docs/ml-advanced.md b/docs/ml-advanced.md
index 12a03d3..2747f2d 100644
--- a/docs/ml-advanced.md
+++ b/docs/ml-advanced.md
@@ -59,17 +59,25 @@ Given $n$ weighted observations $(w_i, a_i, b_i)$:
 
 The number of features for each observation is $m$. We use the following weighted least squares formulation:
 `\[
-minimize_{x}\frac{1}{2} \sum_{i=1}^n \frac{w_i(a_i^T x -b_i)^2}{\sum_{k=1}^n w_k} + \frac{1}{2}\frac{\lambda}{\delta}\sum_{j=1}^m(\sigma_{j} x_{j})^2
+\min_{\mathbf{x}}\frac{1}{2} \sum_{i=1}^n \frac{w_i(\mathbf{a}_i^T \mathbf{x} -b_i)^2}{\sum_{k=1}^n w_k} + \frac{\lambda}{\delta}\left[\frac{1}{2}(1 - \alpha)\sum_{j=1}^m(\sigma_j x_j)^2 + \alpha\sum_{j=1}^m |\sigma_j x_j|\right]
 \]`
-where $\lambda$ is the regularization parameter, $\delta$ is the population standard deviation of the label
+where $\lambda$ is the regularization parameter, $\alpha$ is the elastic-net mixing parameter, $\delta$ is the population standard deviation of the label
 and $\sigma_j$ is the population standard deviation of the j-th feature column.
 
-This objective function has an analytic solution and it requires only one pass over the data to collect necessary statistics to solve.
-Unlike the original dataset which can only be stored in a distributed system,
-these statistics can be loaded into memory on a single machine if the number of features is relatively small, and then we can solve the objective function through Cholesky factorization on the driver.
+This objective function requires only one pass over the data to collect the statistics necessary to solve it. For an
+$n \times m$ data matrix, these statistics require only $O(m^2)$ storage and so can be stored on a single machine when $m$ (the number of features) is
+relatively small. We can then solve the normal equations on a single machine using local methods like direct Cholesky factorization or iterative optimization programs.
 
-WeightedLeastSquares only supports L2 regularization and provides options to enable or disable regularization and standardization.
-In order to make the normal equation approach efficient, WeightedLeastSquares requires that the number of features be no more than 4096. For larger problems, use L-BFGS instead.
+Spark MLlib currently supports two types of solvers for the normal equations: Cholesky factorization and Quasi-Newton methods (L-BFGS/OWL-QN). Cholesky factorization
+depends on a positive definite covariance matrix (i.e. columns of the data matrix must be linearly independent) and will fail if this condition is violated. Quasi-Newton methods
+are still capable of providing a reasonable solution even when the covariance matrix is not positive definite, so the normal equation solver can also fall back to
+Quasi-Newton methods in this case. This fallback is currently always enabled for the `LinearRegression` and `GeneralizedLinearRegression` estimators.
+
+`WeightedLeastSquares` supports L1, L2, and elastic-net regularization and provides options to enable or disable regularization and standardization. In the case where no
+L1 regularization is applied (i.e. $\alpha = 0$), there exists an analytical solution and either Cholesky or Quasi-Newton solver may be used. When $\alpha > 0$ no analytical
+solution exists and we instead use the Quasi-Newton solver to find the coefficients iteratively.
+
+In order to make the normal equation approach efficient, `WeightedLeastSquares` requires that the number of features be no more than 

spark git commit: [SPARK-18758][SS] StreamingQueryListener events from a StreamingQuery should be sent only to the listeners in the same session as the query

2016-12-07 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 839c2eb97 -> 617ce3ba7


[SPARK-18758][SS] StreamingQueryListener events from a StreamingQuery should be 
sent only to the listeners in the same session as the query

## What changes were proposed in this pull request?

Listeners added with `sparkSession.streams.addListener(l)` are added to a 
SparkSession, so only events from queries in the same session as a listener 
should be posted to that listener. Currently, all events get rerouted through 
Spark's main listener bus, that is,
- A StreamingQuery posts its events to the StreamingQueryListenerBus. Only 
queries associated with the same session as the bus post events to it.
- The StreamingQueryListenerBus posts each event to Spark's main 
LiveListenerBus as a SparkEvent.
- The StreamingQueryListenerBus also subscribes to LiveListenerBus events, thus 
getting back the posted event in a different thread.
- The received event is then posted to the registered listeners.

The problem is that *all StreamingQueryListenerBuses in all sessions* get the 
events and post them to their listeners. This is wrong.

In this PR, I solve it by making StreamingQueryListenerBus track active queries 
(by their runIds) when a query posts the QueryStarted event to the bus. This 
allows the rerouted events to be filtered using the tracked queries.

Note that this list needs to be maintained separately from 
`StreamingQueryManager.activeQueries`: a terminated query is cleared from 
`StreamingQueryManager.activeQueries` as soon as it is stopped, but this 
ListenerBus must clear a query only after the termination event of that query 
has been posted, which happens lazily, well after the query has terminated.

Credit goes to zsxwing for coming up with the initial idea.
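
For illustration (not part of this patch), a sketch of the per-session listener registration that this change scopes correctly; the printed fields are just examples:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val spark = SparkSession.builder().appName("ListenerScopeExample").getOrCreate()

// The listener is registered on *this* session; after SPARK-18758 it only
// receives events from queries started through the same SparkSession.
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit =
    println(s"Query started: ${event.id}")
  override def onQueryProgress(event: QueryProgressEvent): Unit =
    println(s"Rows processed: ${event.progress.numInputRows}")
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    println(s"Query terminated: ${event.id}")
})
```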

## How was this patch tested?
Updated test harness code to use the correct session, and added new unit test.

Author: Tathagata Das 

Closes #16186 from tdas/SPARK-18758.

(cherry picked from commit 9ab725eabbb4ad515a663b395bd2f91bb5853a23)
Signed-off-by: Tathagata Das 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/617ce3ba
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/617ce3ba
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/617ce3ba

Branch: refs/heads/branch-2.1
Commit: 617ce3ba765e13e354eaa9b7e13851aef40c9ceb
Parents: 839c2eb
Author: Tathagata Das 
Authored: Wed Dec 7 19:23:27 2016 -0800
Committer: Tathagata Das 
Committed: Wed Dec 7 19:23:41 2016 -0800

--
 .../streaming/StreamingQueryListenerBus.scala   | 54 +--
 .../spark/sql/execution/streaming/memory.scala  |  4 +-
 .../apache/spark/sql/streaming/StreamTest.scala | 15 +++--
 .../streaming/StreamingQueryListenerSuite.scala | 69 ++--
 4 files changed, 119 insertions(+), 23 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/617ce3ba/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
index 22e4c63..a2153d2 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
@@ -17,6 +17,10 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.util.UUID
+
+import scala.collection.mutable
+
 import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerEvent}
 import org.apache.spark.sql.streaming.StreamingQueryListener
 import org.apache.spark.util.ListenerBus
@@ -25,7 +29,11 @@ import org.apache.spark.util.ListenerBus
  * A bus to forward events to [[StreamingQueryListener]]s. This one will send received
  * [[StreamingQueryListener.Event]]s to the Spark listener bus. It also registers itself with
  * Spark listener bus, so that it can receive [[StreamingQueryListener.Event]]s and dispatch them
- * to StreamingQueryListener.
+ * to StreamingQueryListeners.
+ *
+ * Note that each bus and its registered listeners are associated with a single SparkSession
+ * and StreamingQueryManager. So this bus will dispatch events to registered listeners for only
+ * those queries that were started in the associated SparkSession.
  */
 class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
   extends SparkListener with ListenerBus[StreamingQueryListener, StreamingQueryListener.Event] {
@@ -35,12 +43,30 @@ class 

spark git commit: [SPARK-18758][SS] StreamingQueryListener events from a StreamingQuery should be sent only to the listeners in the same session as the query

2016-12-07 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master aad11209e -> 9ab725eab


[SPARK-18758][SS] StreamingQueryListener events from a StreamingQuery should be 
sent only to the listeners in the same session as the query

## What changes were proposed in this pull request?

Listeners added with `sparkSession.streams.addListener(l)` are added to a 
SparkSession, so only events from queries in the same session as a listener 
should be posted to that listener. Currently, all events get rerouted through 
Spark's main listener bus, that is,
- A StreamingQuery posts its events to the StreamingQueryListenerBus. Only 
queries associated with the same session as the bus post events to it.
- The StreamingQueryListenerBus posts each event to Spark's main 
LiveListenerBus as a SparkEvent.
- The StreamingQueryListenerBus also subscribes to LiveListenerBus events, thus 
getting back the posted event in a different thread.
- The received event is then posted to the registered listeners.

The problem is that *all StreamingQueryListenerBuses in all sessions* get the 
events and post them to their listeners. This is wrong.

In this PR, I solve it by making StreamingQueryListenerBus track active queries 
(by their runIds) when a query posts the QueryStarted event to the bus. This 
allows the rerouted events to be filtered using the tracked queries.

Note that this list needs to be maintained separately from 
`StreamingQueryManager.activeQueries`: a terminated query is cleared from 
`StreamingQueryManager.activeQueries` as soon as it is stopped, but this 
ListenerBus must clear a query only after the termination event of that query 
has been posted, which happens lazily, well after the query has terminated.

Credit goes to zsxwing for coming up with the initial idea.

## How was this patch tested?
Updated test harness code to use the correct session, and added new unit test.

Author: Tathagata Das 

Closes #16186 from tdas/SPARK-18758.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9ab725ea
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9ab725ea
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9ab725ea

Branch: refs/heads/master
Commit: 9ab725eabbb4ad515a663b395bd2f91bb5853a23
Parents: aad1120
Author: Tathagata Das 
Authored: Wed Dec 7 19:23:27 2016 -0800
Committer: Tathagata Das 
Committed: Wed Dec 7 19:23:27 2016 -0800

--
 .../streaming/StreamingQueryListenerBus.scala   | 54 +--
 .../spark/sql/execution/streaming/memory.scala  |  4 +-
 .../apache/spark/sql/streaming/StreamTest.scala | 15 +++--
 .../streaming/StreamingQueryListenerSuite.scala | 69 ++--
 4 files changed, 119 insertions(+), 23 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9ab725ea/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
index 22e4c63..a2153d2 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
@@ -17,6 +17,10 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.util.UUID
+
+import scala.collection.mutable
+
 import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerEvent}
 import org.apache.spark.sql.streaming.StreamingQueryListener
 import org.apache.spark.util.ListenerBus
@@ -25,7 +29,11 @@ import org.apache.spark.util.ListenerBus
  * A bus to forward events to [[StreamingQueryListener]]s. This one will send received
  * [[StreamingQueryListener.Event]]s to the Spark listener bus. It also registers itself with
  * Spark listener bus, so that it can receive [[StreamingQueryListener.Event]]s and dispatch them
- * to StreamingQueryListener.
+ * to StreamingQueryListeners.
+ *
+ * Note that each bus and its registered listeners are associated with a single SparkSession
+ * and StreamingQueryManager. So this bus will dispatch events to registered listeners for only
+ * those queries that were started in the associated SparkSession.
  */
 class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
   extends SparkListener with ListenerBus[StreamingQueryListener, StreamingQueryListener.Event] {
@@ -35,12 +43,30 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
   sparkListenerBus.addListener(this)
 
   /**
-   * Post a StreamingQueryListener event to the 

spark git commit: [SPARK-18633][ML][EXAMPLE] Add multiclass logistic regression summary python example and document

2016-12-07 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 1c6419718 -> 839c2eb97


[SPARK-18633][ML][EXAMPLE] Add multiclass logistic regression summary python 
example and document

## What changes were proposed in this pull request?
Logistic regression summary is now available in the Python API, so we need to 
add an example and documentation for the summary.

The newly added example is consistent with the Scala and Java examples.
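
For comparison (not part of this patch), accessing the same training summary from Scala looks roughly like the sketch below, using the stock sample dataset as in the existing examples:

```scala
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("LogisticRegressionSummary").getOrCreate()
val training = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

val lrModel = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)
  .fit(training)

// Training summary: objective value per iteration, mirroring the new Python example.
val trainingSummary = lrModel.summary
trainingSummary.objectiveHistory.foreach(println)
```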

## How was this patch tested?

Manual tests: ran the example with spark-submit, copied & pasted the code into 
pyspark, and built the documentation to check it.

Author: wm...@hotmail.com 

Closes #16064 from wangmiao1981/py.

(cherry picked from commit aad11209eb4db585f991ba09d08d90576f315bb4)
Signed-off-by: Joseph K. Bradley 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/839c2eb9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/839c2eb9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/839c2eb9

Branch: refs/heads/branch-2.1
Commit: 839c2eb9723ba51baf6022fea8c29caecf7c0612
Parents: 1c64197
Author: wm...@hotmail.com 
Authored: Wed Dec 7 18:12:49 2016 -0800
Committer: Joseph K. Bradley 
Committed: Wed Dec 7 18:13:04 2016 -0800

--
 docs/ml-classification-regression.md| 10 ++-
 .../ml/logistic_regression_summary_example.py   | 68 
 2 files changed, 76 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/839c2eb9/docs/ml-classification-regression.md
--
diff --git a/docs/ml-classification-regression.md 
b/docs/ml-classification-regression.md
index 5148ad0..557a53c 100644
--- a/docs/ml-classification-regression.md
+++ b/docs/ml-classification-regression.md
@@ -114,9 +114,15 @@ Continuing the earlier example:
 {% include_example 
java/org/apache/spark/examples/ml/JavaLogisticRegressionSummaryExample.java %}
 
 
-
 
-Logistic regression model summary is not yet supported in Python.
+[`LogisticRegressionTrainingSummary`](api/python/pyspark.ml.html#pyspark.ml.classification.LogisticRegressionSummary)
+provides a summary for a
+[`LogisticRegressionModel`](api/python/pyspark.ml.html#pyspark.ml.classification.LogisticRegressionModel).
+Currently, only binary classification is supported. Support for multiclass 
model summaries will be added in the future.
+
+Continuing the earlier example:
+
+{% include_example python/ml/logistic_regression_summary_example.py %}
 
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/839c2eb9/examples/src/main/python/ml/logistic_regression_summary_example.py
--
diff --git a/examples/src/main/python/ml/logistic_regression_summary_example.py 
b/examples/src/main/python/ml/logistic_regression_summary_example.py
new file mode 100644
index 000..bd440a1
--- /dev/null
+++ b/examples/src/main/python/ml/logistic_regression_summary_example.py
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import print_function
+
+# $example on$
+from pyspark.ml.classification import LogisticRegression
+# $example off$
+from pyspark.sql import SparkSession
+
+"""
+An example demonstrating Logistic Regression Summary.
+Run with:
+  bin/spark-submit examples/src/main/python/ml/logistic_regression_summary_example.py
+"""
+
+if __name__ == "__main__":
+    spark = SparkSession \
+        .builder \
+        .appName("LogisticRegressionSummary") \
+        .getOrCreate()
+
+    # Load training data
+    training = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
+
+    lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
+
+    # Fit the model
+    lrModel = lr.fit(training)
+
+    # $example on$
+    # Extract the summary from the returned LogisticRegressionModel instance trained
+    # in the earlier example
+    trainingSummary = lrModel.summary
+
+    # Obtain the objective per iteration
+

spark git commit: [SPARK-18633][ML][EXAMPLE] Add multiclass logistic regression summary python example and document

2016-12-07 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master bec0a9217 -> aad11209e


[SPARK-18633][ML][EXAMPLE] Add multiclass logistic regression summary python 
example and document

## What changes were proposed in this pull request?
Logistic regression summary is now available in the Python API, so we need to 
add an example and documentation for the summary.

The newly added example is consistent with the Scala and Java examples.

## How was this patch tested?

Manual tests: ran the example with spark-submit, copied & pasted the code into 
pyspark, and built the documentation to check it.

Author: wm...@hotmail.com 

Closes #16064 from wangmiao1981/py.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aad11209
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aad11209
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aad11209

Branch: refs/heads/master
Commit: aad11209eb4db585f991ba09d08d90576f315bb4
Parents: bec0a92
Author: wm...@hotmail.com 
Authored: Wed Dec 7 18:12:49 2016 -0800
Committer: Joseph K. Bradley 
Committed: Wed Dec 7 18:12:49 2016 -0800

--
 docs/ml-classification-regression.md| 10 ++-
 .../ml/logistic_regression_summary_example.py   | 68 
 2 files changed, 76 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/aad11209/docs/ml-classification-regression.md
--
diff --git a/docs/ml-classification-regression.md 
b/docs/ml-classification-regression.md
index 5759593..bb9390f 100644
--- a/docs/ml-classification-regression.md
+++ b/docs/ml-classification-regression.md
@@ -114,9 +114,15 @@ Continuing the earlier example:
 {% include_example 
java/org/apache/spark/examples/ml/JavaLogisticRegressionSummaryExample.java %}
 
 
-
 
-Logistic regression model summary is not yet supported in Python.
+[`LogisticRegressionTrainingSummary`](api/python/pyspark.ml.html#pyspark.ml.classification.LogisticRegressionSummary)
+provides a summary for a
+[`LogisticRegressionModel`](api/python/pyspark.ml.html#pyspark.ml.classification.LogisticRegressionModel).
+Currently, only binary classification is supported. Support for multiclass 
model summaries will be added in the future.
+
+Continuing the earlier example:
+
+{% include_example python/ml/logistic_regression_summary_example.py %}
 
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/aad11209/examples/src/main/python/ml/logistic_regression_summary_example.py
--
diff --git a/examples/src/main/python/ml/logistic_regression_summary_example.py 
b/examples/src/main/python/ml/logistic_regression_summary_example.py
new file mode 100644
index 000..bd440a1
--- /dev/null
+++ b/examples/src/main/python/ml/logistic_regression_summary_example.py
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import print_function
+
+# $example on$
+from pyspark.ml.classification import LogisticRegression
+# $example off$
+from pyspark.sql import SparkSession
+
+"""
+An example demonstrating Logistic Regression Summary.
+Run with:
+  bin/spark-submit examples/src/main/python/ml/logistic_regression_summary_example.py
+"""
+
+if __name__ == "__main__":
+    spark = SparkSession \
+        .builder \
+        .appName("LogisticRegressionSummary") \
+        .getOrCreate()
+
+    # Load training data
+    training = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
+
+    lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
+
+    # Fit the model
+    lrModel = lr.fit(training)
+
+    # $example on$
+    # Extract the summary from the returned LogisticRegressionModel instance trained
+    # in the earlier example
+    trainingSummary = lrModel.summary
+
+    # Obtain the objective per iteration
+    objectiveHistory = trainingSummary.objectiveHistory
+    print("objectiveHistory:")
+    for objective in objectiveHistory:
+

spark git commit: [SPARK-18654][SQL] Remove unreachable patterns in makeRootConverter

2016-12-07 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 70b2bf717 -> bec0a9217


[SPARK-18654][SQL] Remove unreachable patterns in makeRootConverter

## What changes were proposed in this pull request?

`makeRootConverter` is only called with a `StructType` value. By making this 
method less general we can remove pattern matches, which are never actually hit 
outside of the test suite.
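
For context (not part of this patch), a rough sketch of the top-level-array behavior (SPARK-3308) that the retained patterns still cover; the input path is made up:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("JsonRootExample").getOrCreate()

// Hypothetical input file where each line is a top-level JSON array:
//   [{"a":"str_a_1"}]
//   [{"a":"str_a_2"}, {"b":"str_b_3"}]
// Each array element becomes a row, so the read should yield roughly:
//   (str_a_1, null), (str_a_2, null), (null, str_b_3)
val df = spark.read.json("/tmp/top_level_arrays.json")
df.show()
```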

## How was this patch tested?

The existing tests.

Author: Nathan Howell 

Closes #16084 from NathanHowell/SPARK-18654.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bec0a921
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bec0a921
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bec0a921

Branch: refs/heads/master
Commit: bec0a9217b896a596c0a0919888da34589fc720d
Parents: 70b2bf7
Author: Nathan Howell 
Authored: Wed Dec 7 16:52:05 2016 -0800
Committer: Reynold Xin 
Committed: Wed Dec 7 16:52:05 2016 -0800

--
 .../spark/sql/catalyst/json/JacksonParser.scala | 56 
 .../execution/datasources/json/JsonSuite.scala  |  2 +-
 2 files changed, 22 insertions(+), 36 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/bec0a921/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index e476cb1..03e27ba 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -119,47 +119,33 @@ class JacksonParser(
   * to a value according to a desired schema. This is a wrapper for the method
   * `makeConverter()` to handle a row wrapped with an array.
   */
-  def makeRootConverter(dataType: DataType): ValueConverter = dataType match {
-    case st: StructType =>
-      val elementConverter = makeConverter(st)
-      val fieldConverters = st.map(_.dataType).map(makeConverter)
-      (parser: JsonParser) => parseJsonToken(parser, dataType) {
-        case START_OBJECT => convertObject(parser, st, fieldConverters)
-          // SPARK-3308: support reading top level JSON arrays and take every element
-          // in such an array as a row
-          //
-          // For example, we support, the JSON data as below:
-          //
-          // [{"a":"str_a_1"}]
-          // [{"a":"str_a_2"}, {"b":"str_b_3"}]
-          //
-          // resulting in:
-          //
-          // List([str_a_1,null])
-          // List([str_a_2,null], [null,str_b_3])
-          //
-        case START_ARRAY => convertArray(parser, elementConverter)
-      }
-
-    case ArrayType(st: StructType, _) =>
-      val elementConverter = makeConverter(st)
-      val fieldConverters = st.map(_.dataType).map(makeConverter)
-      (parser: JsonParser) => parseJsonToken(parser, dataType) {
-        // the business end of SPARK-3308:
-        // when an object is found but an array is requested just wrap it in a list.
-        // This is being wrapped in `JacksonParser.parse`.
-        case START_OBJECT => convertObject(parser, st, fieldConverters)
-        case START_ARRAY => convertArray(parser, elementConverter)
-      }
-
-    case _ => makeConverter(dataType)
+  private def makeRootConverter(st: StructType): ValueConverter = {
+    val elementConverter = makeConverter(st)
+    val fieldConverters = st.map(_.dataType).map(makeConverter)
+    (parser: JsonParser) => parseJsonToken(parser, st) {
+      case START_OBJECT => convertObject(parser, st, fieldConverters)
+        // SPARK-3308: support reading top level JSON arrays and take every element
+        // in such an array as a row
+        //
+        // For example, we support, the JSON data as below:
+        //
+        // [{"a":"str_a_1"}]
+        // [{"a":"str_a_2"}, {"b":"str_b_3"}]
+        //
+        // resulting in:
+        //
+        // List([str_a_1,null])
+        // List([str_a_2,null], [null,str_b_3])
+        //
+      case START_ARRAY => convertArray(parser, elementConverter)
+    }
   }
 
   /**
   * Create a converter which converts the JSON documents held by the `JsonParser`
   * to a value according to a desired schema.
   */
-  private def makeConverter(dataType: DataType): ValueConverter = dataType match {
+  private[sql] def makeConverter(dataType: DataType): ValueConverter = dataType match {
     case BooleanType =>
       (parser: JsonParser) => parseJsonToken(parser, dataType) {
         case VALUE_TRUE => true


spark git commit: [SPARK-18754][SS] Rename recentProgresses to recentProgress

2016-12-07 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 e9b3afac9 -> 1c6419718


[SPARK-18754][SS] Rename recentProgresses to recentProgress

Based on an informal survey, users find this option easier to understand / 
remember.
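
A rough sketch of the renamed API from the Scala side (the `summarize` helper is hypothetical, not part of this patch):

```scala
import org.apache.spark.sql.streaming.StreamingQuery

// Given a running query (however it was started in your application):
def summarize(query: StreamingQuery): Unit = {
  // Formerly `query.recentProgresses`; the number of updates retained is
  // controlled by `spark.sql.streaming.numRecentProgressUpdates`.
  val rowsSeen = query.recentProgress.map(_.numInputRows).sum
  println(s"Input rows across recent progress updates: $rowsSeen")
  println(query.lastProgress) // most recent StreamingQueryProgress
}
```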

Author: Michael Armbrust 

Closes #16182 from marmbrus/renameRecentProgress.

(cherry picked from commit 70b2bf717d367d598c5a238d569d62c777e63fde)
Signed-off-by: Tathagata Das 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1c641971
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1c641971
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1c641971

Branch: refs/heads/branch-2.1
Commit: 1c6419718aadf0bdc200f9b328242062a07f2277
Parents: e9b3afa
Author: Michael Armbrust 
Authored: Wed Dec 7 15:36:29 2016 -0800
Committer: Tathagata Das 
Committed: Wed Dec 7 15:36:39 2016 -0800

--
 .../spark/sql/kafka010/KafkaSourceSuite.scala |  2 +-
 project/MimaExcludes.scala|  2 +-
 python/pyspark/sql/streaming.py   |  6 +++---
 python/pyspark/sql/tests.py   |  4 ++--
 .../execution/streaming/ProgressReporter.scala|  2 +-
 .../org/apache/spark/sql/internal/SQLConf.scala   |  2 +-
 .../spark/sql/streaming/StreamingQuery.scala  |  4 ++--
 .../execution/streaming/ForeachSinkSuite.scala|  4 ++--
 .../sql/streaming/FileStreamSourceSuite.scala |  2 +-
 .../streaming/StreamingQueryListenerSuite.scala   |  4 ++--
 .../spark/sql/streaming/StreamingQuerySuite.scala | 18 +-
 11 files changed, 25 insertions(+), 25 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1c641971/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
--
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index 0e40aba..544fbc5 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -448,7 +448,7 @@ class KafkaSourceSuite extends KafkaSourceTest {
   AddKafkaData(Set(topic), 1, 2, 3),
   CheckAnswer(2, 3, 4),
   AssertOnQuery { query =>
-val recordsRead = query.recentProgresses.map(_.numInputRows).sum
+val recordsRead = query.recentProgress.map(_.numInputRows).sum
 recordsRead == 3
   }
 )

http://git-wip-us.apache.org/repos/asf/spark/blob/1c641971/project/MimaExcludes.scala
--
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 6650aad..978a328 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -85,7 +85,7 @@ object MimaExcludes {
   
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.sourceStatuses"),
   
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQuery.id"),
   
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.lastProgress"),
-  
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.recentProgresses"),
+  
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.recentProgress"),
   
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.id"),
   
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryManager.get"),
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1c641971/python/pyspark/sql/streaming.py
--
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index ee7a26d..9cfb3fe 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -114,12 +114,12 @@ class StreamingQuery(object):
 
 @property
 @since(2.1)
-def recentProgresses(self):
+def recentProgress(self):
 """Returns an array of the most recent [[StreamingQueryProgress]] 
updates for this query.
 The number of progress updates retained for each stream is configured 
by Spark session
-configuration `spark.sql.streaming.numRecentProgresses`.
+configuration `spark.sql.streaming.numRecentProgressUpdates`.
 """
-return [json.loads(p.json()) for p 

spark git commit: [SPARK-18754][SS] Rename recentProgresses to recentProgress

2016-12-07 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master edc87e189 -> 70b2bf717


[SPARK-18754][SS] Rename recentProgresses to recentProgress

Based on an informal survey, users find this option easier to understand / 
remember.
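
A minimal sketch of the renamed accessor in use on the Scala side (illustration only, not part of the patch; the empty temp directory is just a stand-in for a real streaming source):

```
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructType}

object RecentProgressSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("recent-progress").getOrCreate()

    // Watch an (empty) temp directory just to get a running streaming query.
    val inputDir = Files.createTempDirectory("stream-input").toString
    val schema = new StructType().add("value", StringType)
    val query = spark.readStream.schema(schema).json(inputDir)
      .writeStream.format("console").start()

    Thread.sleep(5000)  // let a few triggers fire

    // recentProgress (formerly recentProgresses) holds the latest progress updates,
    // bounded by spark.sql.streaming.numRecentProgressUpdates.
    val rowsSeen = query.recentProgress.map(_.numInputRows).sum
    println(s"input rows observed so far: $rowsSeen")

    query.stop()
    spark.stop()
  }
}
```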

Author: Michael Armbrust 

Closes #16182 from marmbrus/renameRecentProgress.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/70b2bf71
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/70b2bf71
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/70b2bf71

Branch: refs/heads/master
Commit: 70b2bf717d367d598c5a238d569d62c777e63fde
Parents: edc87e1
Author: Michael Armbrust 
Authored: Wed Dec 7 15:36:29 2016 -0800
Committer: Tathagata Das 
Committed: Wed Dec 7 15:36:29 2016 -0800

--
 .../spark/sql/kafka010/KafkaSourceSuite.scala |  2 +-
 project/MimaExcludes.scala|  2 +-
 python/pyspark/sql/streaming.py   |  6 +++---
 python/pyspark/sql/tests.py   |  4 ++--
 .../execution/streaming/ProgressReporter.scala|  2 +-
 .../org/apache/spark/sql/internal/SQLConf.scala   |  2 +-
 .../spark/sql/streaming/StreamingQuery.scala  |  4 ++--
 .../execution/streaming/ForeachSinkSuite.scala|  4 ++--
 .../sql/streaming/FileStreamSourceSuite.scala |  2 +-
 .../streaming/StreamingQueryListenerSuite.scala   |  4 ++--
 .../spark/sql/streaming/StreamingQuerySuite.scala | 18 +-
 11 files changed, 25 insertions(+), 25 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/70b2bf71/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
--
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index 0e40aba..544fbc5 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -448,7 +448,7 @@ class KafkaSourceSuite extends KafkaSourceTest {
   AddKafkaData(Set(topic), 1, 2, 3),
   CheckAnswer(2, 3, 4),
   AssertOnQuery { query =>
-val recordsRead = query.recentProgresses.map(_.numInputRows).sum
+val recordsRead = query.recentProgress.map(_.numInputRows).sum
 recordsRead == 3
   }
 )

http://git-wip-us.apache.org/repos/asf/spark/blob/70b2bf71/project/MimaExcludes.scala
--
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 82d50f9..b215d88 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -91,7 +91,7 @@ object MimaExcludes {
   
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.sourceStatuses"),
   
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQuery.id"),
   
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.lastProgress"),
-  
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.recentProgresses"),
+  
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.recentProgress"),
   
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.id"),
   
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryManager.get"),
 

http://git-wip-us.apache.org/repos/asf/spark/blob/70b2bf71/python/pyspark/sql/streaming.py
--
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index ee7a26d..9cfb3fe 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -114,12 +114,12 @@ class StreamingQuery(object):
 
 @property
 @since(2.1)
-def recentProgresses(self):
+def recentProgress(self):
 """Returns an array of the most recent [[StreamingQueryProgress]] 
updates for this query.
 The number of progress updates retained for each stream is configured 
by Spark session
-configuration `spark.sql.streaming.numRecentProgresses`.
+configuration `spark.sql.streaming.numRecentProgressUpdates`.
 """
-return [json.loads(p.json()) for p in self._jsq.recentProgresses()]
+return [json.loads(p.json()) for p in self._jsq.recentProgress()]
 
 @property
 

spark git commit: [SPARK-18588][TESTS] Fix flaky test: KafkaSourceStressForDontFailOnDataLossSuite

2016-12-07 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master bb94f61a7 -> edc87e189


[SPARK-18588][TESTS] Fix flaky test: KafkaSourceStressForDontFailOnDataLossSuite

## What changes were proposed in this pull request?

Fixed the following failures:

```
org.scalatest.exceptions.TestFailedDueToTimeoutException: The code passed to 
eventually never returned normally. Attempted 3745 times over 
1.79085165 minutes. Last failure message: assertion failed: 
failOnDataLoss-0 not deleted after timeout.
```

```
sbt.ForkMain$ForkError: org.apache.spark.sql.streaming.StreamingQueryException: 
Query query-66 terminated with exception: null
at 
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:252)
at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:146)
Caused by: sbt.ForkMain$ForkError: java.lang.NullPointerException: null
at java.util.ArrayList.addAll(ArrayList.java:577)
at 
org.apache.kafka.clients.Metadata.getClusterForCurrentTopics(Metadata.java:257)
at org.apache.kafka.clients.Metadata.update(Metadata.java:177)
at 
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleResponse(NetworkClient.java:605)
at 
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeHandleCompletedReceive(NetworkClient.java:582)
at 
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:450)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitPendingRequests(ConsumerNetworkClient.java:260)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:222)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:366)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:978)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
at
...
```

## How was this patch tested?

Tested in #16048 by running it many times.

Author: Shixiong Zhu 

Closes #16109 from zsxwing/fix-kafka-flaky-test.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/edc87e18
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/edc87e18
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/edc87e18

Branch: refs/heads/master
Commit: edc87e18922b98be47c298cdc3daa2b049a737e9
Parents: bb94f61
Author: Shixiong Zhu 
Authored: Wed Dec 7 13:47:44 2016 -0800
Committer: Tathagata Das 
Committed: Wed Dec 7 13:47:44 2016 -0800

--
 .../sql/kafka010/CachedKafkaConsumer.scala  | 39 --
 .../apache/spark/sql/kafka010/KafkaSource.scala |  2 +-
 .../spark/sql/kafka010/KafkaSourceSuite.scala   | 11 ++-
 .../spark/sql/kafka010/KafkaTestUtils.scala | 75 +---
 .../spark/sql/test/SharedSQLContext.scala   |  8 ++-
 5 files changed, 96 insertions(+), 39 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/edc87e18/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
--
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
index 3f438e9..3f396a7 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
@@ -86,7 +86,7 @@ private[kafka010] case class CachedKafkaConsumer private(
 var toFetchOffset = offset
 while (toFetchOffset != UNKNOWN_OFFSET) {
   try {
-return fetchData(toFetchOffset, pollTimeoutMs)
+return fetchData(toFetchOffset, untilOffset, pollTimeoutMs, 
failOnDataLoss)
   } catch {
 case e: OffsetOutOfRangeException =>
   // When there is some error thrown, it's better to use a new 
consumer to drop all cached
@@ -159,14 +159,18 @@ private[kafka010] case class CachedKafkaConsumer 

spark git commit: [SPARK-18588][TESTS] Fix flaky test: KafkaSourceStressForDontFailOnDataLossSuite

2016-12-07 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 76e1f1651 -> e9b3afac9


[SPARK-18588][TESTS] Fix flaky test: KafkaSourceStressForDontFailOnDataLossSuite

## What changes were proposed in this pull request?

Fixed the following failures:

```
org.scalatest.exceptions.TestFailedDueToTimeoutException: The code passed to 
eventually never returned normally. Attempted 3745 times over 
1.79085165 minutes. Last failure message: assertion failed: 
failOnDataLoss-0 not deleted after timeout.
```

```
sbt.ForkMain$ForkError: org.apache.spark.sql.streaming.StreamingQueryException: 
Query query-66 terminated with exception: null
at 
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:252)
at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:146)
Caused by: sbt.ForkMain$ForkError: java.lang.NullPointerException: null
at java.util.ArrayList.addAll(ArrayList.java:577)
at 
org.apache.kafka.clients.Metadata.getClusterForCurrentTopics(Metadata.java:257)
at org.apache.kafka.clients.Metadata.update(Metadata.java:177)
at 
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleResponse(NetworkClient.java:605)
at 
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeHandleCompletedReceive(NetworkClient.java:582)
at 
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:450)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitPendingRequests(ConsumerNetworkClient.java:260)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:222)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:366)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:978)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
at
...
```

## How was this patch tested?

Tested in #16048 by running it many times.

Author: Shixiong Zhu 

Closes #16109 from zsxwing/fix-kafka-flaky-test.

(cherry picked from commit edc87e18922b98be47c298cdc3daa2b049a737e9)
Signed-off-by: Tathagata Das 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e9b3afac
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e9b3afac
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e9b3afac

Branch: refs/heads/branch-2.1
Commit: e9b3afac9ce5ea4bffb8201a58856598c521a3a9
Parents: 76e1f16
Author: Shixiong Zhu 
Authored: Wed Dec 7 13:47:44 2016 -0800
Committer: Tathagata Das 
Committed: Wed Dec 7 13:47:54 2016 -0800

--
 .../sql/kafka010/CachedKafkaConsumer.scala  | 39 --
 .../apache/spark/sql/kafka010/KafkaSource.scala |  2 +-
 .../spark/sql/kafka010/KafkaSourceSuite.scala   | 11 ++-
 .../spark/sql/kafka010/KafkaTestUtils.scala | 75 +---
 .../spark/sql/test/SharedSQLContext.scala   |  8 ++-
 5 files changed, 96 insertions(+), 39 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e9b3afac/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
--
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
index 3f438e9..3f396a7 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
@@ -86,7 +86,7 @@ private[kafka010] case class CachedKafkaConsumer private(
 var toFetchOffset = offset
 while (toFetchOffset != UNKNOWN_OFFSET) {
   try {
-return fetchData(toFetchOffset, pollTimeoutMs)
+return fetchData(toFetchOffset, untilOffset, pollTimeoutMs, 
failOnDataLoss)
   } catch {
 case e: OffsetOutOfRangeException =>
   // When there is some error 

spark git commit: [SPARK-18762][WEBUI] Web UI should be http:4040 instead of https:4040

2016-12-07 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 7fbb07372 -> 44df6d2ce


[SPARK-18762][WEBUI] Web UI should be http:4040 instead of https:4040

## What changes were proposed in this pull request?

When SSL is enabled, the Spark shell shows:
```
Spark context Web UI available at https://192.168.99.1:4040
```
This is wrong because 4040 is http, not https. It redirects to the https port.
More importantly, this introduces several broken links in the UI. For example, 
in the master UI, the worker link is https:8081 instead of http:8081 or 
https:8481.
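
The rule after this change, as a tiny sketch (not the actual Spark code; it just mirrors the simplified webUrl/workerWebUiUrl construction in the diff below): the advertised URL always uses the http scheme with the bound http port, and the server redirects to the SSL port when SSL is enabled.

```
// Illustration only.
def advertisedWebUrl(publicHostName: String, boundPort: Int): String =
  s"http://$publicHostName:$boundPort"

// advertisedWebUrl("192.168.99.1", 4040) == "http://192.168.99.1:4040"
```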

CC: mengxr liancheng

I manually tested by accessing MasterPage, WorkerPage and 
HistoryServer with SSL enabled.

Author: sarutak 

Closes #16190 from sarutak/SPARK-18761.

(cherry picked from commit bb94f61a7ac97bf904ec0e8d5a4ab69a4142443f)
Signed-off-by: Marcelo Vanzin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/44df6d2c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/44df6d2c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/44df6d2c

Branch: refs/heads/branch-2.0
Commit: 44df6d2ce9b9fee087efd18f9f72b2bed89b4223
Parents: 7fbb073
Author: sarutak 
Authored: Wed Dec 7 11:41:23 2016 -0800
Committer: Marcelo Vanzin 
Committed: Wed Dec 7 11:44:20 2016 -0800

--
 core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala | 3 +--
 core/src/main/scala/org/apache/spark/ui/WebUI.scala | 5 +
 2 files changed, 2 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/44df6d2c/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 724206b..5bd32b2 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -187,8 +187,7 @@ private[deploy] class Worker(
 webUi = new WorkerWebUI(this, workDir, webUiPort)
 webUi.bind()
 
-val scheme = if (webUi.sslOptions.enabled) "https" else "http"
-workerWebUiUrl = s"$scheme://$publicAddress:${webUi.boundPort}"
+workerWebUiUrl = s"http://$publicAddress:${webUi.boundPort};
 registerWithMaster()
 
 metricsSystem.registerSource(workerSource)

http://git-wip-us.apache.org/repos/asf/spark/blob/44df6d2c/core/src/main/scala/org/apache/spark/ui/WebUI.scala
--
diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala 
b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
index adc4a4f..2c40e72 100644
--- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -146,10 +146,7 @@ private[spark] abstract class WebUI(
   }
 
   /** Return the url of web interface. Only valid after bind(). */
-  def webUrl: String = {
-val protocol = if (sslOptions.enabled) "https" else "http"
-s"$protocol://$publicHostName:$boundPort"
-  }
+  def webUrl: String = s"http://$publicHostName:$boundPort"
 
   /** Return the actual port to which this server is bound. Only valid after 
bind(). */
   def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1)





spark git commit: [SPARK-18762][WEBUI] Web UI should be http:4040 instead of https:4040

2016-12-07 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 acb6ac5da -> 76e1f1651


[SPARK-18762][WEBUI] Web UI should be http:4040 instead of https:4040

## What changes were proposed in this pull request?

When SSL is enabled, the Spark shell shows:
```
Spark context Web UI available at https://192.168.99.1:4040
```
This is wrong because 4040 is http, not https. It redirects to the https port.
More importantly, this introduces several broken links in the UI. For example, 
in the master UI, the worker link is https:8081 instead of http:8081 or 
https:8481.

CC: mengxr liancheng

I manually tested by accessing MasterPage, WorkerPage and 
HistoryServer with SSL enabled.

Author: sarutak 

Closes #16190 from sarutak/SPARK-18761.

(cherry picked from commit bb94f61a7ac97bf904ec0e8d5a4ab69a4142443f)
Signed-off-by: Marcelo Vanzin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/76e1f165
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/76e1f165
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/76e1f165

Branch: refs/heads/branch-2.1
Commit: 76e1f1651f5a7207c9c66686616709b62b798fa3
Parents: acb6ac5
Author: sarutak 
Authored: Wed Dec 7 11:41:23 2016 -0800
Committer: Marcelo Vanzin 
Committed: Wed Dec 7 11:41:46 2016 -0800

--
 core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala | 3 +--
 core/src/main/scala/org/apache/spark/ui/WebUI.scala | 5 +
 2 files changed, 2 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/76e1f165/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 8b1c6bf..0940f3c 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -187,8 +187,7 @@ private[deploy] class Worker(
 webUi = new WorkerWebUI(this, workDir, webUiPort)
 webUi.bind()
 
-val scheme = if (webUi.sslOptions.enabled) "https" else "http"
-workerWebUiUrl = s"$scheme://$publicAddress:${webUi.boundPort}"
+workerWebUiUrl = s"http://$publicAddress:${webUi.boundPort};
 registerWithMaster()
 
 metricsSystem.registerSource(workerSource)

http://git-wip-us.apache.org/repos/asf/spark/blob/76e1f165/core/src/main/scala/org/apache/spark/ui/WebUI.scala
--
diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala 
b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
index a05e0ef..4118fcf 100644
--- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -147,10 +147,7 @@ private[spark] abstract class WebUI(
   }
 
   /** Return the url of web interface. Only valid after bind(). */
-  def webUrl: String = {
-val protocol = if (sslOptions.enabled) "https" else "http"
-s"$protocol://$publicHostName:$boundPort"
-  }
+  def webUrl: String = s"http://$publicHostName:$boundPort"
 
   /** Return the actual port to which this server is bound. Only valid after 
bind(). */
   def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1)





spark git commit: [SPARK-18762][WEBUI] Web UI should be http:4040 instead of https:4040

2016-12-07 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/master dbf3e298a -> bb94f61a7


[SPARK-18762][WEBUI] Web UI should be http:4040 instead of https:4040

## What changes were proposed in this pull request?

When SSL is enabled, the Spark shell shows:
```
Spark context Web UI available at https://192.168.99.1:4040
```
This is wrong because 4040 is http, not https. It redirects to the https port.
More importantly, this introduces several broken links in the UI. For example, 
in the master UI, the worker link is https:8081 instead of http:8081 or 
https:8481.

CC: mengxr liancheng

I manually tested by accessing MasterPage, WorkerPage and 
HistoryServer with SSL enabled.

Author: sarutak 

Closes #16190 from sarutak/SPARK-18761.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bb94f61a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bb94f61a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bb94f61a

Branch: refs/heads/master
Commit: bb94f61a7ac97bf904ec0e8d5a4ab69a4142443f
Parents: dbf3e29
Author: sarutak 
Authored: Wed Dec 7 11:41:23 2016 -0800
Committer: Marcelo Vanzin 
Committed: Wed Dec 7 11:41:23 2016 -0800

--
 core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala | 3 +--
 core/src/main/scala/org/apache/spark/ui/WebUI.scala | 5 +
 2 files changed, 2 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/bb94f61a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 8b1c6bf..0940f3c 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -187,8 +187,7 @@ private[deploy] class Worker(
 webUi = new WorkerWebUI(this, workDir, webUiPort)
 webUi.bind()
 
-val scheme = if (webUi.sslOptions.enabled) "https" else "http"
-workerWebUiUrl = s"$scheme://$publicAddress:${webUi.boundPort}"
+workerWebUiUrl = s"http://$publicAddress:${webUi.boundPort};
 registerWithMaster()
 
 metricsSystem.registerSource(workerSource)

http://git-wip-us.apache.org/repos/asf/spark/blob/bb94f61a/core/src/main/scala/org/apache/spark/ui/WebUI.scala
--
diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala 
b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
index 8c80155..b8604c5 100644
--- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -147,10 +147,7 @@ private[spark] abstract class WebUI(
   }
 
   /** Return the url of web interface. Only valid after bind(). */
-  def webUrl: String = {
-val protocol = if (sslOptions.enabled) "https" else "http"
-s"$protocol://$publicHostName:$boundPort"
-  }
+  def webUrl: String = s"http://$publicHostName:$boundPort"
 
   /** Return the actual port to which this server is bound. Only valid after 
bind(). */
   def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1)





spark git commit: [SPARK-18764][CORE] Add a warning log when skipping a corrupted file

2016-12-07 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 5dbcd4fcf -> acb6ac5da


[SPARK-18764][CORE] Add a warning log when skipping a corrupted file

## What changes were proposed in this pull request?

It's better to add a warning log when skipping a corrupted file. It will be 
helpful when we want to finish the job first, then find the skipped files in the 
log and fix them.
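
A minimal way to see the behaviour end to end (a sketch, not part of the patch; the bogus ".gz" file simply cannot be decompressed, so reading it raises an IOException that is skipped and, with this patch, logged):

```
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

object IgnoreCorruptFilesSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("ignore-corrupt-files")
      .config("spark.files.ignoreCorruptFiles", "true")
      .getOrCreate()
    val sc = spark.sparkContext

    // One readable file plus one bogus ".gz" that cannot be decompressed.
    val dir = Files.createTempDirectory("corrupt-demo")
    Files.write(dir.resolve("good.txt"), "line1\nline2\n".getBytes)
    Files.write(dir.resolve("broken.gz"), Array[Byte](1, 2, 3, 4))

    // The broken file no longer fails the job; with this patch the executor log also
    // gets a "Skipped the rest content in the corrupted file" warning pointing at it.
    println(sc.textFile(dir.toString).count())  // counts only the readable lines
    spark.stop()
  }
}
```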

## How was this patch tested?

Jenkins

Author: Shixiong Zhu 

Closes #16192 from zsxwing/SPARK-18764.

(cherry picked from commit dbf3e298a1a35c0243f087814ddf88034ff96d66)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/acb6ac5d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/acb6ac5d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/acb6ac5d

Branch: refs/heads/branch-2.1
Commit: acb6ac5da7a5694cc3270772c6d68933b7d761dc
Parents: 5dbcd4f
Author: Shixiong Zhu 
Authored: Wed Dec 7 10:30:05 2016 -0800
Committer: Shixiong Zhu 
Committed: Wed Dec 7 10:30:15 2016 -0800

--
 core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala   | 4 +++-
 core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala| 6 +-
 .../apache/spark/sql/execution/datasources/FileScanRDD.scala   | 1 +
 3 files changed, 9 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/acb6ac5d/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index ae4320d..3133a28 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -257,7 +257,9 @@ class HadoopRDD[K, V](
 try {
   finished = !reader.next(key, value)
 } catch {
-  case e: IOException if ignoreCorruptFiles => finished = true
+  case e: IOException if ignoreCorruptFiles =>
+logWarning(s"Skipped the rest content in the corrupted file: 
${split.inputSplit}", e)
+finished = true
 }
 if (!finished) {
   inputMetrics.incRecordsRead(1)

http://git-wip-us.apache.org/repos/asf/spark/blob/acb6ac5d/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index c783e13..c6ddb4b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -186,7 +186,11 @@ class NewHadoopRDD[K, V](
   try {
 finished = !reader.nextKeyValue
   } catch {
-case e: IOException if ignoreCorruptFiles => finished = true
+case e: IOException if ignoreCorruptFiles =>
+  logWarning(
+s"Skipped the rest content in the corrupted file: 
${split.serializableHadoopSplit}",
+e)
+  finished = true
   }
   if (finished) {
 // Close and release the reader here; close() will also be called 
when the task

http://git-wip-us.apache.org/repos/asf/spark/blob/acb6ac5d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 8994457..237cdab 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -138,6 +138,7 @@ class FileScanRDD(
 }
   } catch {
 case e: IOException =>
+  logWarning(s"Skipped the rest content in the corrupted 
file: $currentFile", e)
   finished = true
   null
   }





spark git commit: [SPARK-18764][CORE] Add a warning log when skipping a corrupted file

2016-12-07 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master f1fca81b1 -> dbf3e298a


[SPARK-18764][CORE] Add a warning log when skipping a corrupted file

## What changes were proposed in this pull request?

It's better to add a warning log when skipping a corrupted file. It will be 
helpful when we want to finish the job first, then find the skipped files in the 
log and fix them.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu 

Closes #16192 from zsxwing/SPARK-18764.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dbf3e298
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dbf3e298
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dbf3e298

Branch: refs/heads/master
Commit: dbf3e298a1a35c0243f087814ddf88034ff96d66
Parents: f1fca81
Author: Shixiong Zhu 
Authored: Wed Dec 7 10:30:05 2016 -0800
Committer: Shixiong Zhu 
Committed: Wed Dec 7 10:30:05 2016 -0800

--
 core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala   | 4 +++-
 core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala| 6 +-
 .../apache/spark/sql/execution/datasources/FileScanRDD.scala   | 1 +
 3 files changed, 9 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/dbf3e298/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index e3d81a6..6e87233 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -259,7 +259,9 @@ class HadoopRDD[K, V](
 try {
   finished = !reader.next(key, value)
 } catch {
-  case e: IOException if ignoreCorruptFiles => finished = true
+  case e: IOException if ignoreCorruptFiles =>
+logWarning(s"Skipped the rest content in the corrupted file: 
${split.inputSplit}", e)
+finished = true
 }
 if (!finished) {
   inputMetrics.incRecordsRead(1)

http://git-wip-us.apache.org/repos/asf/spark/blob/dbf3e298/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index e90e84c..e805192 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -189,7 +189,11 @@ class NewHadoopRDD[K, V](
   try {
 finished = !reader.nextKeyValue
   } catch {
-case e: IOException if ignoreCorruptFiles => finished = true
+case e: IOException if ignoreCorruptFiles =>
+  logWarning(
+s"Skipped the rest content in the corrupted file: 
${split.serializableHadoopSplit}",
+e)
+  finished = true
   }
   if (finished) {
 // Close and release the reader here; close() will also be called 
when the task

http://git-wip-us.apache.org/repos/asf/spark/blob/dbf3e298/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 306dc65..6d8cd81 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -139,6 +139,7 @@ class FileScanRDD(
 }
   } catch {
 case e: IOException =>
+  logWarning(s"Skipped the rest content in the corrupted 
file: $currentFile", e)
   finished = true
   null
   }





spark git commit: [SPARK-17760][SQL][BACKPORT] AnalysisException with dataframe pivot when groupBy column is not attribute

2016-12-07 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 e05ad8830 -> 7fbb07372


[SPARK-17760][SQL][BACKPORT] AnalysisException with dataframe pivot when 
groupBy column is not attribute

## What changes were proposed in this pull request?

Backport of #16177 to branch-2.0

## How was this patch tested?

existing and additional unit tests

Author: Andrew Ray 

Closes #16197 from aray/SPARK-17760-2.0.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7fbb0737
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7fbb0737
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7fbb0737

Branch: refs/heads/branch-2.0
Commit: 7fbb073728e984ce11e8f64f324878a399078e14
Parents: e05ad88
Author: Andrew Ray 
Authored: Wed Dec 7 18:30:34 2016 +0100
Committer: Herman van Hovell 
Committed: Wed Dec 7 18:30:34 2016 +0100

--
 .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala   | 5 +++--
 .../scala/org/apache/spark/sql/DataFramePivotSuite.scala| 9 +
 2 files changed, 12 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7fbb0737/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 05a2d18..32dc70a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -402,14 +402,15 @@ class Analyzer(
   .toAggregateExpression()
 , "__pivot_" + a.sql)()
   }
-  val secondAgg = Aggregate(groupByExprs, groupByExprs ++ pivotAggs, 
firstAgg)
+  val groupByExprsAttr = groupByExprs.map(_.toAttribute)
+  val secondAgg = Aggregate(groupByExprsAttr, groupByExprsAttr ++ 
pivotAggs, firstAgg)
   val pivotAggAttribute = pivotAggs.map(_.toAttribute)
   val pivotOutputs = pivotValues.zipWithIndex.flatMap { case (value, 
i) =>
 aggregates.zip(pivotAggAttribute).map { case (aggregate, pivotAtt) 
=>
   Alias(ExtractValue(pivotAtt, Literal(i), resolver), 
outputName(value, aggregate))()
 }
   }
-  Project(groupByExprs ++ pivotOutputs, secondAgg)
+  Project(groupByExprsAttr ++ pivotOutputs, secondAgg)
 } else {
   val pivotAggregates: Seq[NamedExpression] = pivotValues.flatMap { 
value =>
 def ifExpr(expr: Expression) = {

http://git-wip-us.apache.org/repos/asf/spark/blob/7fbb0737/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala
index d5cb5e1..41d3525 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala
@@ -197,4 +197,13 @@ class DataFramePivotSuite extends QueryTest with 
SharedSQLContext{
 Row(2013, Seq(48000.0, 7.0), Seq(3.0, 7.0)) :: Nil
 )
   }
+
+  test("pivot with column definition in groupby") {
+checkAnswer(
+  courseSales.groupBy(substring(col("course"), 0, 1).as("foo"))
+.pivot("year", Seq(2012, 2013))
+.sum("earnings"),
+  Row("d", 15000.0, 48000.0) :: Row("J", 2.0, 3.0) :: Nil
+)
+  }
 }





spark git commit: [SPARK-17760][SQL] AnalysisException with dataframe pivot when groupBy column is not attribute

2016-12-07 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 4432a2a83 -> 5dbcd4fcf


[SPARK-17760][SQL] AnalysisException with dataframe pivot when groupBy column 
is not attribute

## What changes were proposed in this pull request?

Fixes AnalysisException for pivot queries that have group by columns that are 
expressions and not attributes, by substituting the expression's output attribute 
in the second aggregation and final projection.
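
A sketch of the kind of query this unblocks, mirroring the new unit test in the diff below (the tiny DataFrame here is only a stand-in for the courseSales test fixture):

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, substring}

object PivotOnExpressionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("pivot-expr").getOrCreate()
    import spark.implicits._

    // Stand-in for the courseSales fixture: (course, year, earnings).
    val courseSales = Seq(
      ("dotNET", 2012, 10000.0), ("dotNET", 2012, 5000.0), ("dotNET", 2013, 48000.0),
      ("Java", 2012, 20000.0), ("Java", 2013, 30000.0)
    ).toDF("course", "year", "earnings")

    // Grouping by an expression (not a plain column) and then pivoting used to fail
    // with AnalysisException; after this fix it resolves and runs.
    courseSales
      .groupBy(substring(col("course"), 0, 1).as("foo"))
      .pivot("year", Seq(2012, 2013))
      .sum("earnings")
      .show()

    spark.stop()
  }
}
```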

## How was this patch tested?

existing and additional unit tests

Author: Andrew Ray 

Closes #16177 from aray/SPARK-17760.

(cherry picked from commit f1fca81b165c5a673f7d86b268e04ea42a6c267e)
Signed-off-by: Herman van Hovell 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5dbcd4fc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5dbcd4fc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5dbcd4fc

Branch: refs/heads/branch-2.1
Commit: 5dbcd4fcfbc14ba8c17e1cb364ca45b99aa90708
Parents: 4432a2a
Author: Andrew Ray 
Authored: Wed Dec 7 04:44:14 2016 -0800
Committer: Herman van Hovell 
Committed: Wed Dec 7 04:44:25 2016 -0800

--
 .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala| 5 +++--
 .../scala/org/apache/spark/sql/DataFramePivotSuite.scala | 8 
 2 files changed, 11 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5dbcd4fc/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index f738ae8..9ca9901 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -404,14 +404,15 @@ class Analyzer(
   .toAggregateExpression()
 , "__pivot_" + a.sql)()
   }
-  val secondAgg = Aggregate(groupByExprs, groupByExprs ++ pivotAggs, 
firstAgg)
+  val groupByExprsAttr = groupByExprs.map(_.toAttribute)
+  val secondAgg = Aggregate(groupByExprsAttr, groupByExprsAttr ++ 
pivotAggs, firstAgg)
   val pivotAggAttribute = pivotAggs.map(_.toAttribute)
   val pivotOutputs = pivotValues.zipWithIndex.flatMap { case (value, 
i) =>
 aggregates.zip(pivotAggAttribute).map { case (aggregate, pivotAtt) 
=>
   Alias(ExtractValue(pivotAtt, Literal(i), resolver), 
outputName(value, aggregate))()
 }
   }
-  Project(groupByExprs ++ pivotOutputs, secondAgg)
+  Project(groupByExprsAttr ++ pivotOutputs, secondAgg)
 } else {
   val pivotAggregates: Seq[NamedExpression] = pivotValues.flatMap { 
value =>
 def ifExpr(expr: Expression) = {

http://git-wip-us.apache.org/repos/asf/spark/blob/5dbcd4fc/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala
index 1bbe135..a8d854c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala
@@ -208,4 +208,12 @@ class DataFramePivotSuite extends QueryTest with 
SharedSQLContext{
 )
   }
 
+  test("pivot with column definition in groupby") {
+checkAnswer(
+  courseSales.groupBy(substring(col("course"), 0, 1).as("foo"))
+.pivot("year", Seq(2012, 2013))
+.sum("earnings"),
+  Row("d", 15000.0, 48000.0) :: Row("J", 2.0, 3.0) :: Nil
+)
+  }
 }





spark git commit: [SPARK-17760][SQL] AnalysisException with dataframe pivot when groupBy column is not attribute

2016-12-07 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/master c496d03b5 -> f1fca81b1


[SPARK-17760][SQL] AnalysisException with dataframe pivot when groupBy column 
is not attribute

## What changes were proposed in this pull request?

Fixes AnalysisException for pivot queries that have group by columns that are 
expressions and not attributes, by substituting the expression's output attribute 
in the second aggregation and final projection.

## How was this patch tested?

existing and additional unit tests

Author: Andrew Ray 

Closes #16177 from aray/SPARK-17760.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f1fca81b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f1fca81b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f1fca81b

Branch: refs/heads/master
Commit: f1fca81b165c5a673f7d86b268e04ea42a6c267e
Parents: c496d03
Author: Andrew Ray 
Authored: Wed Dec 7 04:44:14 2016 -0800
Committer: Herman van Hovell 
Committed: Wed Dec 7 04:44:14 2016 -0800

--
 .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala| 5 +++--
 .../scala/org/apache/spark/sql/DataFramePivotSuite.scala | 8 
 2 files changed, 11 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f1fca81b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index ed6e17a..58f98d5 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -463,14 +463,15 @@ class Analyzer(
   .toAggregateExpression()
 , "__pivot_" + a.sql)()
   }
-  val secondAgg = Aggregate(groupByExprs, groupByExprs ++ pivotAggs, 
firstAgg)
+  val groupByExprsAttr = groupByExprs.map(_.toAttribute)
+  val secondAgg = Aggregate(groupByExprsAttr, groupByExprsAttr ++ 
pivotAggs, firstAgg)
   val pivotAggAttribute = pivotAggs.map(_.toAttribute)
   val pivotOutputs = pivotValues.zipWithIndex.flatMap { case (value, 
i) =>
 aggregates.zip(pivotAggAttribute).map { case (aggregate, pivotAtt) 
=>
   Alias(ExtractValue(pivotAtt, Literal(i), resolver), 
outputName(value, aggregate))()
 }
   }
-  Project(groupByExprs ++ pivotOutputs, secondAgg)
+  Project(groupByExprsAttr ++ pivotOutputs, secondAgg)
 } else {
   val pivotAggregates: Seq[NamedExpression] = pivotValues.flatMap { 
value =>
 def ifExpr(expr: Expression) = {

http://git-wip-us.apache.org/repos/asf/spark/blob/f1fca81b/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala
index 1bbe135..a8d854c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala
@@ -208,4 +208,12 @@ class DataFramePivotSuite extends QueryTest with 
SharedSQLContext{
 )
   }
 
+  test("pivot with column definition in groupby") {
+checkAnswer(
+  courseSales.groupBy(substring(col("course"), 0, 1).as("foo"))
+.pivot("year", Seq(2012, 2013))
+.sum("earnings"),
+  Row("d", 15000.0, 48000.0) :: Row("J", 2.0, 3.0) :: Nil
+)
+  }
 }





spark git commit: [SPARK-18208][SHUFFLE] Executor OOM due to a growing LongArray in BytesToBytesMap

2016-12-07 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 f5c5a07bd -> e05ad8830


[SPARK-18208][SHUFFLE] Executor OOM due to a growing LongArray in 
BytesToBytesMap

## What changes were proposed in this pull request?

BytesToBytesMap currently does not release the in-memory storage (the longArray 
variable) after it spills to disk. This is typically not a problem during 
aggregation because the longArray should be much smaller than the pages, and 
because we grow the longArray at a conservative rate.

However, this can lead to an OOM when an already running task is allocated more 
than its fair share, which can happen because of a scheduling delay. In this 
case the longArray can grow beyond the fair share of memory for the task. This 
becomes problematic when the task spills and the long array is not freed, which 
causes subsequent memory allocation requests to be denied by the memory manager, 
resulting in an OOM.

This PR fixes this issue by freeing the longArray when the BytesToBytesMap 
spills.
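
The idea in one conceptual Scala sketch (not the Spark code, which lives in Java inside BytesToBytesMap): on reset after a spill, free the possibly-grown index array and re-allocate it at the initial capacity instead of merely zeroing it out, so the memory it grew into goes back to the memory manager.

```
final class SpillableIndexSketch(initialCapacity: Int) {
  private var longArray: Array[Long] = new Array[Long](initialCapacity)

  // Simplified doubling strategy used while aggregating.
  def grow(): Unit = {
    val bigger = new Array[Long](longArray.length * 2)
    System.arraycopy(longArray, 0, bigger, 0, longArray.length)
    longArray = bigger
  }

  // Old behaviour: the grown array survives the spill (only its contents are cleared).
  def resetOld(): Unit = java.util.Arrays.fill(longArray, 0L)

  // New behaviour: drop the grown array and start over at the initial capacity,
  // analogous to freeArray(longArray) followed by allocate(initialCapacity).
  def resetNew(): Unit = {
    longArray = new Array[Long](initialCapacity)
  }
}
```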

## How was this patch tested?

Existing tests, plus testing on real-world workloads.

Author: Jie Xiong 
Author: jiexiong 

Closes #15722 from jiexiong/jie_oom_fix.

(cherry picked from commit c496d03b5289f7c604661a12af86f6accddcf125)
Signed-off-by: Herman van Hovell 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e05ad883
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e05ad883
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e05ad883

Branch: refs/heads/branch-2.0
Commit: e05ad8830e204acaf7cee4daef0ed44db9a158f3
Parents: f5c5a07
Author: Jie Xiong 
Authored: Wed Dec 7 04:33:30 2016 -0800
Committer: Herman van Hovell 
Committed: Wed Dec 7 04:34:04 2016 -0800

--
 .../java/org/apache/spark/unsafe/map/BytesToBytesMap.java | 7 +--
 1 file changed, 5 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e05ad883/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
--
diff --git 
a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java 
b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index dc04025..947db7f 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -169,6 +169,8 @@ public final class BytesToBytesMap extends MemoryConsumer {
 
   private long peakMemoryUsedBytes = 0L;
 
+  private final int initialCapacity;
+
   private final BlockManager blockManager;
   private final SerializerManager serializerManager;
   private volatile MapIterator destructiveIterator = null;
@@ -201,6 +203,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
   throw new IllegalArgumentException("Page size " + pageSizeBytes + " 
cannot exceed " +
 TaskMemoryManager.MAXIMUM_PAGE_SIZE_BYTES);
 }
+this.initialCapacity = initialCapacity;
 allocate(initialCapacity);
   }
 
@@ -897,12 +900,12 @@ public final class BytesToBytesMap extends MemoryConsumer 
{
   public void reset() {
 numKeys = 0;
 numValues = 0;
-longArray.zeroOut();
-
+freeArray(longArray);
 while (dataPages.size() > 0) {
   MemoryBlock dataPage = dataPages.removeLast();
   freePage(dataPage);
 }
+allocate(initialCapacity);
 currentPage = null;
 pageCursor = 0;
   }





spark git commit: [SPARK-18208][SHUFFLE] Executor OOM due to a growing LongArray in BytesToBytesMap

2016-12-07 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 51754d6df -> 4432a2a83


[SPARK-18208][SHUFFLE] Executor OOM due to a growing LongArray in 
BytesToBytesMap

## What changes were proposed in this pull request?

BytesToBytesMap currently does not release the in-memory storage (the longArray 
variable) after it spills to disk. This is typically not a problem during 
aggregation because the longArray should be much smaller than the pages, and 
because we grow the longArray at a conservative rate.

However, this can lead to an OOM when an already running task is allocated more 
than its fair share, which can happen because of a scheduling delay. In this 
case the longArray can grow beyond the fair share of memory for the task. This 
becomes problematic when the task spills and the long array is not freed, which 
causes subsequent memory allocation requests to be denied by the memory manager, 
resulting in an OOM.

This PR fixes this issue by freeing the longArray when the BytesToBytesMap 
spills.

## How was this patch tested?

Existing tests, plus testing on real-world workloads.

Author: Jie Xiong 
Author: jiexiong 

Closes #15722 from jiexiong/jie_oom_fix.

(cherry picked from commit c496d03b5289f7c604661a12af86f6accddcf125)
Signed-off-by: Herman van Hovell 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4432a2a8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4432a2a8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4432a2a8

Branch: refs/heads/branch-2.1
Commit: 4432a2a8386f951775957f352e4ba223c6ce4fa3
Parents: 51754d6
Author: Jie Xiong 
Authored: Wed Dec 7 04:33:30 2016 -0800
Committer: Herman van Hovell 
Committed: Wed Dec 7 04:33:50 2016 -0800

--
 .../java/org/apache/spark/unsafe/map/BytesToBytesMap.java | 7 +--
 1 file changed, 5 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4432a2a8/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
--
diff --git 
a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java 
b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index d2fcdea..44120e5 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -170,6 +170,8 @@ public final class BytesToBytesMap extends MemoryConsumer {
 
   private long peakMemoryUsedBytes = 0L;
 
+  private final int initialCapacity;
+
   private final BlockManager blockManager;
   private final SerializerManager serializerManager;
   private volatile MapIterator destructiveIterator = null;
@@ -202,6 +204,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
   throw new IllegalArgumentException("Page size " + pageSizeBytes + " 
cannot exceed " +
 TaskMemoryManager.MAXIMUM_PAGE_SIZE_BYTES);
 }
+this.initialCapacity = initialCapacity;
 allocate(initialCapacity);
   }
 
@@ -902,12 +905,12 @@ public final class BytesToBytesMap extends MemoryConsumer 
{
   public void reset() {
 numKeys = 0;
 numValues = 0;
-longArray.zeroOut();
-
+freeArray(longArray);
 while (dataPages.size() > 0) {
   MemoryBlock dataPage = dataPages.removeLast();
   freePage(dataPage);
 }
+allocate(initialCapacity);
 currentPage = null;
 pageCursor = 0;
   }





spark git commit: [SPARK-18208][SHUFFLE] Executor OOM due to a growing LongArray in BytesToBytesMap

2016-12-07 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/master 79f5f281b -> c496d03b5


[SPARK-18208][SHUFFLE] Executor OOM due to a growing LongArray in 
BytesToBytesMap

## What changes were proposed in this pull request?

BytesToBytesMap currently does not release the in-memory storage (the longArray 
variable) after it spills to disk. This is typically not a problem during 
aggregation because the longArray should be much smaller than the pages, and 
because we grow the longArray at a conservative rate.

However, this can lead to an OOM when an already running task is allocated more 
than its fair share, which can happen because of a scheduling delay. In this 
case the longArray can grow beyond the fair share of memory for the task. This 
becomes problematic when the task spills and the long array is not freed, which 
causes subsequent memory allocation requests to be denied by the memory manager, 
resulting in an OOM.

This PR fixes this issue by freeing the longArray when the BytesToBytesMap 
spills.

## How was this patch tested?

Existing tests, plus testing on real-world workloads.

Author: Jie Xiong 
Author: jiexiong 

Closes #15722 from jiexiong/jie_oom_fix.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c496d03b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c496d03b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c496d03b

Branch: refs/heads/master
Commit: c496d03b5289f7c604661a12af86f6accddcf125
Parents: 79f5f28
Author: Jie Xiong 
Authored: Wed Dec 7 04:33:30 2016 -0800
Committer: Herman van Hovell 
Committed: Wed Dec 7 04:33:30 2016 -0800

--
 .../java/org/apache/spark/unsafe/map/BytesToBytesMap.java | 7 +--
 1 file changed, 5 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c496d03b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
--
diff --git 
a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java 
b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index d2fcdea..44120e5 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -170,6 +170,8 @@ public final class BytesToBytesMap extends MemoryConsumer {
 
   private long peakMemoryUsedBytes = 0L;
 
+  private final int initialCapacity;
+
   private final BlockManager blockManager;
   private final SerializerManager serializerManager;
   private volatile MapIterator destructiveIterator = null;
@@ -202,6 +204,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
   throw new IllegalArgumentException("Page size " + pageSizeBytes + " 
cannot exceed " +
 TaskMemoryManager.MAXIMUM_PAGE_SIZE_BYTES);
 }
+this.initialCapacity = initialCapacity;
 allocate(initialCapacity);
   }
 
@@ -902,12 +905,12 @@ public final class BytesToBytesMap extends MemoryConsumer 
{
   public void reset() {
 numKeys = 0;
 numValues = 0;
-longArray.zeroOut();
-
+freeArray(longArray);
 while (dataPages.size() > 0) {
   MemoryBlock dataPage = dataPages.removeLast();
   freePage(dataPage);
 }
+allocate(initialCapacity);
 currentPage = null;
 pageCursor = 0;
   }





spark git commit: [SPARK-18678][ML] Skewed reservoir sampling in SamplingUtils

2016-12-07 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 99c293eea -> 51754d6df


[SPARK-18678][ML] Skewed reservoir sampling in SamplingUtils

## What changes were proposed in this pull request?

Fix reservoir sampling bias for small k. An off-by-one error meant that the 
probability of replacement was slightly too high: k/(l-1) after the l-th element 
instead of k/l, which matters for small k.
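
A hedged re-implementation of the corrected sampling step (illustration only; unlike the real SamplingUtils it does not trim the reservoir when the input has fewer than k elements):

```
import scala.reflect.ClassTag
import scala.util.Random

object ReservoirSampleSketch {
  // Returns (sample, number of elements consumed).
  def reservoirSampleAndCount[T: ClassTag](input: Iterator[T], k: Int, seed: Long): (Array[T], Long) = {
    val reservoir = new Array[T](k)
    var i = 0
    while (i < k && input.hasNext) {   // fill the reservoir with the first k elements
      reservoir(i) = input.next()
      i += 1
    }
    var l = i.toLong
    val rand = new Random(seed)
    while (input.hasNext) {
      val item = input.next()
      l += 1                           // the fix: count this element *before* drawing
      // keep the l-th element with probability k / l
      val replacementIndex = (rand.nextDouble() * l).toLong
      if (replacementIndex < k) {
        reservoir(replacementIndex.toInt) = item
      }
    }
    (reservoir, l)
  }

  def main(args: Array[String]): Unit = {
    // With the tiny input from the new test case, each element should now be kept
    // roughly half the time across many seeds.
    val counts = Array(0, 0)
    for (seed <- 0 until 1000) {
      val (sample, _) = reservoirSampleAndCount(Iterator(0, 1), 1, seed)
      counts(sample(0)) += 1
    }
    println(counts.mkString(", "))     // expect two counts near 500
  }
}
```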

## How was this patch tested?

Existing test plus new test case.

Author: Sean Owen 

Closes #16129 from srowen/SPARK-18678.

(cherry picked from commit 79f5f281bb69cb2de9f64006180abd753e8ae427)
Signed-off-by: Sean Owen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/51754d6d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/51754d6d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/51754d6d

Branch: refs/heads/branch-2.1
Commit: 51754d6df703c02ecb23ec1779889602ff8fb038
Parents: 99c293e
Author: Sean Owen 
Authored: Wed Dec 7 17:34:45 2016 +0800
Committer: Sean Owen 
Committed: Wed Dec 7 17:34:57 2016 +0800

--
 R/pkg/inst/tests/testthat/test_mllib.R |  9 +
 .../org/apache/spark/util/random/SamplingUtils.scala   |  5 -
 .../apache/spark/util/random/SamplingUtilsSuite.scala  | 13 +
 3 files changed, 22 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/51754d6d/R/pkg/inst/tests/testthat/test_mllib.R
--
diff --git a/R/pkg/inst/tests/testthat/test_mllib.R 
b/R/pkg/inst/tests/testthat/test_mllib.R
index d7aa965..9f810be 100644
--- a/R/pkg/inst/tests/testthat/test_mllib.R
+++ b/R/pkg/inst/tests/testthat/test_mllib.R
@@ -1007,10 +1007,11 @@ test_that("spark.randomForest", {
   model <- spark.randomForest(data, Employed ~ ., "regression", maxDepth = 5, 
maxBins = 16,
   numTrees = 20, seed = 123)
   predictions <- collect(predict(model, data))
-  expect_equal(predictions$prediction, c(60.379, 61.096, 60.636, 62.258,
- 63.736, 64.296, 64.868, 64.300,
- 66.709, 67.697, 67.966, 67.252,
- 68.866, 69.593, 69.195, 69.658),
+  expect_equal(predictions$prediction, c(60.32820, 61.22315, 60.69025, 62.11070,
+ 63.53160, 64.05470, 65.12710, 64.30450,
+ 66.70910, 67.86125, 68.08700, 67.21865,
+ 68.89275, 69.53180, 69.39640, 69.68250),
+ tolerance = 1e-4)
   stats <- summary(model)
   expect_equal(stats$numTrees, 20)

http://git-wip-us.apache.org/repos/asf/spark/blob/51754d6d/core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala 
b/core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala
index 297524c..a7e0075 100644
--- a/core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala
@@ -56,11 +56,14 @@ private[spark] object SamplingUtils {
   val rand = new XORShiftRandom(seed)
   while (input.hasNext) {
 val item = input.next()
+l += 1
+// There are k elements in the reservoir, and the l-th element has been
+// consumed. It should be chosen with probability k/l. The expression
+// below is a random long chosen uniformly from [0,l)
 val replacementIndex = (rand.nextDouble() * l).toLong
 if (replacementIndex < k) {
   reservoir(replacementIndex.toInt) = item
 }
-l += 1
   }
   (reservoir, l)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/51754d6d/core/src/test/scala/org/apache/spark/util/random/SamplingUtilsSuite.scala
--
diff --git 
a/core/src/test/scala/org/apache/spark/util/random/SamplingUtilsSuite.scala 
b/core/src/test/scala/org/apache/spark/util/random/SamplingUtilsSuite.scala
index 667a4db..55c5dd5 100644
--- a/core/src/test/scala/org/apache/spark/util/random/SamplingUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/random/SamplingUtilsSuite.scala
@@ -44,6 +44,19 @@ class SamplingUtilsSuite extends SparkFunSuite {
 assert(sample3.length === 10)
   }
 
+  test("SPARK-18678 reservoirSampleAndCount with tiny input") {
+val input = Seq(0, 1)
+val counts = new Array[Int](input.size)
+for (i <- 0 until 500) {
+  val (samples, inputSize) = 

spark git commit: [SPARK-18678][ML] Skewed reservoir sampling in SamplingUtils

2016-12-07 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master b82802713 -> 79f5f281b


[SPARK-18678][ML] Skewed reservoir sampling in SamplingUtils

## What changes were proposed in this pull request?

Fix reservoir sampling bias for small k. An off-by-one error meant that the 
probability of replacement was slightly too high -- k/(l-1) after the l-th element 
instead of k/l, which matters for small k.

## How was this patch tested?

Existing tests plus a new test case.

Author: Sean Owen 

Closes #16129 from srowen/SPARK-18678.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/79f5f281
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/79f5f281
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/79f5f281

Branch: refs/heads/master
Commit: 79f5f281bb69cb2de9f64006180abd753e8ae427
Parents: b828027
Author: Sean Owen 
Authored: Wed Dec 7 17:34:45 2016 +0800
Committer: Sean Owen 
Committed: Wed Dec 7 17:34:45 2016 +0800

--
 R/pkg/inst/tests/testthat/test_mllib.R |  9 +
 .../org/apache/spark/util/random/SamplingUtils.scala   |  5 -
 .../apache/spark/util/random/SamplingUtilsSuite.scala  | 13 +
 3 files changed, 22 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/79f5f281/R/pkg/inst/tests/testthat/test_mllib.R
--
diff --git a/R/pkg/inst/tests/testthat/test_mllib.R 
b/R/pkg/inst/tests/testthat/test_mllib.R
index 0802a2a..4758e40 100644
--- a/R/pkg/inst/tests/testthat/test_mllib.R
+++ b/R/pkg/inst/tests/testthat/test_mllib.R
@@ -1007,10 +1007,11 @@ test_that("spark.randomForest", {
   model <- spark.randomForest(data, Employed ~ ., "regression", maxDepth = 5, 
maxBins = 16,
   numTrees = 20, seed = 123)
   predictions <- collect(predict(model, data))
-  expect_equal(predictions$prediction, c(60.379, 61.096, 60.636, 62.258,
- 63.736, 64.296, 64.868, 64.300,
- 66.709, 67.697, 67.966, 67.252,
- 68.866, 69.593, 69.195, 69.658),
+  expect_equal(predictions$prediction, c(60.32820, 61.22315, 60.69025, 62.11070,
+ 63.53160, 64.05470, 65.12710, 64.30450,
+ 66.70910, 67.86125, 68.08700, 67.21865,
+ 68.89275, 69.53180, 69.39640, 69.68250),
+ tolerance = 1e-4)
   stats <- summary(model)
   expect_equal(stats$numTrees, 20)

http://git-wip-us.apache.org/repos/asf/spark/blob/79f5f281/core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala 
b/core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala
index 297524c..a7e0075 100644
--- a/core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala
@@ -56,11 +56,14 @@ private[spark] object SamplingUtils {
   val rand = new XORShiftRandom(seed)
   while (input.hasNext) {
 val item = input.next()
+l += 1
+// There are k elements in the reservoir, and the l-th element has been
+// consumed. It should be chosen with probability k/l. The expression
+// below is a random long chosen uniformly from [0,l)
 val replacementIndex = (rand.nextDouble() * l).toLong
 if (replacementIndex < k) {
   reservoir(replacementIndex.toInt) = item
 }
-l += 1
   }
   (reservoir, l)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/79f5f281/core/src/test/scala/org/apache/spark/util/random/SamplingUtilsSuite.scala
--
diff --git 
a/core/src/test/scala/org/apache/spark/util/random/SamplingUtilsSuite.scala 
b/core/src/test/scala/org/apache/spark/util/random/SamplingUtilsSuite.scala
index 667a4db..55c5dd5 100644
--- a/core/src/test/scala/org/apache/spark/util/random/SamplingUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/random/SamplingUtilsSuite.scala
@@ -44,6 +44,19 @@ class SamplingUtilsSuite extends SparkFunSuite {
 assert(sample3.length === 10)
   }
 
+  test("SPARK-18678 reservoirSampleAndCount with tiny input") {
+val input = Seq(0, 1)
+val counts = new Array[Int](input.size)
+for (i <- 0 until 500) {
+  val (samples, inputSize) = 
SamplingUtils.reservoirSampleAndCount(input.iterator, 1)
+  assert(inputSize === 2)
+  assert(samples.length === 1)
+  

spark git commit: [SPARK-18701][ML] Fix Poisson GLM failure due to wrong initialization

2016-12-07 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 90b59d1bf -> b82802713


[SPARK-18701][ML] Fix Poisson GLM failure due to wrong initialization

Poisson GLM fails for many standard data sets (see the example in the test or the JIRA). 
The issue is incorrect initialization, which leads to almost-zero probabilities and 
weights. Specifically, the mean is initialized to the response, which can be 
zero. Applying the log link then yields very negative numbers (protected against 
-Inf), which again produces near-zero probabilities and weights in the 
weighted least squares. The fix and a test are included in the commits.
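
A small illustrative sketch of the failure mode (standalone Scala, hypothetical helper names): with the mean initialized to the raw response, a zero response pushes log(0) = -Inf into IRLS, whereas clamping the initial mean away from zero, as the fix does with math.max(y, 0.1), gives a usable starting point.

```
object PoissonInitSketch extends App {
  // Sketch only: the effect of the log link on the old vs. new initial mean.
  def initializeOld(y: Double): Double = y                 // buggy: mean can be exactly zero
  def initializeNew(y: Double): Double = math.max(y, 0.1)  // the fix; R's poisson()$initialize uses y + 0.1

  def logLink(mu: Double): Double = math.log(mu)

  val y = 0.0                              // a perfectly valid Poisson response
  println(logLink(initializeOld(y)))       // -Infinity -> near-zero weights in IRLS
  println(logLink(initializeNew(y)))       // ~ -2.303  -> well-behaved starting value
}
```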

## What changes were proposed in this pull request?
Update initialization in Poisson GLM

## How was this patch tested?
Add test in GeneralizedLinearRegressionSuite

srowen sethah yanboliang HyukjinKwon mengxr

Author: actuaryzhang 

Closes #16131 from actuaryzhang/master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b8280271
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b8280271
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b8280271

Branch: refs/heads/master
Commit: b8280271396eb74638da6546d76bbb2d06c7011b
Parents: 90b59d1
Author: actuaryzhang 
Authored: Wed Dec 7 16:37:25 2016 +0800
Committer: Sean Owen 
Committed: Wed Dec 7 16:37:25 2016 +0800

--
 .../GeneralizedLinearRegression.scala   |  6 +-
 .../GeneralizedLinearRegressionSuite.scala  | 21 +++-
 2 files changed, 17 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b8280271/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
 
b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
index 770a257..f137c8c 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
@@ -505,7 +505,11 @@ object GeneralizedLinearRegression extends 
DefaultParamsReadable[GeneralizedLine
 override def initialize(y: Double, weight: Double): Double = {
   require(y >= 0.0, "The response variable of Poisson family " +
 s"should be non-negative, but got $y")
-  y
+  /*
+Force Poisson mean > 0 to avoid numerical instability in IRLS.
+R uses y + 0.1 for initialization. See poisson()$initialize.
+   */
+  math.max(y, 0.1)
 }
 
 override def variance(mu: Double): Double = mu

http://git-wip-us.apache.org/repos/asf/spark/blob/b8280271/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala
--
diff --git 
a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala
 
b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala
index 4fab216..3e9e1fc 100644
--- 
a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala
+++ 
b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala
@@ -89,11 +89,14 @@ class GeneralizedLinearRegressionSuite
   xVariance = Array(0.7, 1.2), nPoints = 1, seed, noiseLevel = 0.01,
   family = "poisson", link = "log").toDF()
 
-datasetPoissonLogWithZero = generateGeneralizedLinearRegressionInput(
-  intercept = -1.5, coefficients = Array(0.22, 0.06), xMean = Array(2.9, 
10.5),
-  xVariance = Array(0.7, 1.2), nPoints = 100, seed, noiseLevel = 0.01,
-  family = "poisson", link = "log")
-  .map{x => LabeledPoint(if (x.label < 0.7) 0.0 else x.label, 
x.features)}.toDF()
+datasetPoissonLogWithZero = Seq(
+  LabeledPoint(0.0, Vectors.dense(18, 1.0)),
+  LabeledPoint(1.0, Vectors.dense(12, 0.0)),
+  LabeledPoint(0.0, Vectors.dense(15, 0.0)),
+  LabeledPoint(0.0, Vectors.dense(13, 2.0)),
+  LabeledPoint(0.0, Vectors.dense(15, 1.0)),
+  LabeledPoint(1.0, Vectors.dense(16, 1.0))
+).toDF()
 
 datasetPoissonIdentity = generateGeneralizedLinearRegressionInput(
   intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 
10.5),
@@ -480,12 +483,12 @@ class GeneralizedLinearRegressionSuite
  model <- glm(formula, family="poisson", data=data)
  print(as.vector(coef(model)))
}
-   [1]  0.4272661 -0.1565423
-   [1] -3.6911354  0.6214301  0.1295814
+   [1] -0.0457441 -0.6833928
+   [1] 1.8121235  -0.1747493  -0.5815417
  */
 val 

spark git commit: [SPARK-18701][ML] Fix Poisson GLM failure due to wrong initialization

2016-12-07 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 340e9aea4 -> 99c293eea


[SPARK-18701][ML] Fix Poisson GLM failure due to wrong initialization

Poisson GLM fails for many standard data sets (see the example in the test or the JIRA). 
The issue is incorrect initialization, which leads to almost-zero probabilities and 
weights. Specifically, the mean is initialized to the response, which can be 
zero. Applying the log link then yields very negative numbers (protected against 
-Inf), which again produces near-zero probabilities and weights in the 
weighted least squares. The fix and a test are included in the commits.

## What changes were proposed in this pull request?
Update initialization in Poisson GLM

## How was this patch tested?
Add test in GeneralizedLinearRegressionSuite

srowen sethah yanboliang HyukjinKwon mengxr

Author: actuaryzhang 

Closes #16131 from actuaryzhang/master.

(cherry picked from commit b8280271396eb74638da6546d76bbb2d06c7011b)
Signed-off-by: Sean Owen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/99c293ee
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/99c293ee
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/99c293ee

Branch: refs/heads/branch-2.1
Commit: 99c293eeaa9733fc424404d04a9671e9525a1e36
Parents: 340e9ae
Author: actuaryzhang 
Authored: Wed Dec 7 16:37:25 2016 +0800
Committer: Sean Owen 
Committed: Wed Dec 7 16:37:37 2016 +0800

--
 .../GeneralizedLinearRegression.scala   |  6 +-
 .../GeneralizedLinearRegressionSuite.scala  | 21 +++-
 2 files changed, 17 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/99c293ee/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
 
b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
index 770a257..f137c8c 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
@@ -505,7 +505,11 @@ object GeneralizedLinearRegression extends 
DefaultParamsReadable[GeneralizedLine
 override def initialize(y: Double, weight: Double): Double = {
   require(y >= 0.0, "The response variable of Poisson family " +
 s"should be non-negative, but got $y")
-  y
+  /*
+Force Poisson mean > 0 to avoid numerical instability in IRLS.
+R uses y + 0.1 for initialization. See poisson()$initialize.
+   */
+  math.max(y, 0.1)
 }
 
 override def variance(mu: Double): Double = mu

http://git-wip-us.apache.org/repos/asf/spark/blob/99c293ee/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala
--
diff --git 
a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala
 
b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala
index 4fab216..3e9e1fc 100644
--- 
a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala
+++ 
b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala
@@ -89,11 +89,14 @@ class GeneralizedLinearRegressionSuite
   xVariance = Array(0.7, 1.2), nPoints = 1, seed, noiseLevel = 0.01,
   family = "poisson", link = "log").toDF()
 
-datasetPoissonLogWithZero = generateGeneralizedLinearRegressionInput(
-  intercept = -1.5, coefficients = Array(0.22, 0.06), xMean = Array(2.9, 
10.5),
-  xVariance = Array(0.7, 1.2), nPoints = 100, seed, noiseLevel = 0.01,
-  family = "poisson", link = "log")
-  .map{x => LabeledPoint(if (x.label < 0.7) 0.0 else x.label, 
x.features)}.toDF()
+datasetPoissonLogWithZero = Seq(
+  LabeledPoint(0.0, Vectors.dense(18, 1.0)),
+  LabeledPoint(1.0, Vectors.dense(12, 0.0)),
+  LabeledPoint(0.0, Vectors.dense(15, 0.0)),
+  LabeledPoint(0.0, Vectors.dense(13, 2.0)),
+  LabeledPoint(0.0, Vectors.dense(15, 1.0)),
+  LabeledPoint(1.0, Vectors.dense(16, 1.0))
+).toDF()
 
 datasetPoissonIdentity = generateGeneralizedLinearRegressionInput(
   intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 
10.5),
@@ -480,12 +483,12 @@ class GeneralizedLinearRegressionSuite
  model <- glm(formula, family="poisson", data=data)
  print(as.vector(coef(model)))
}
-   [1]  0.4272661 -0.1565423
-   [1] -3.6911354  

spark git commit: [SPARK-18686][SPARKR][ML] Several cleanup and improvements for spark.logit.

2016-12-07 Thread yliang
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 3750c6e9b -> 340e9aea4


[SPARK-18686][SPARKR][ML] Several cleanup and improvements for spark.logit.

## What changes were proposed in this pull request?
Several cleanups and improvements for ```spark.logit```:
* ```summary``` should return the coefficients matrix, and should output labels for 
each class if the model is a multinomial logistic regression model.
* ```summary``` should not return ```areaUnderROC, roc, pr, ...```, since most 
of them are DataFrames that are less important for R users. Moreover, these 
metrics ignore instance weights (setting them all to 1.0), which will change in a 
later Spark version. To avoid introducing breaking changes, we do not 
expose them for now.
* SparkR test improvement: compare the training result with native R glmnet.
* Remove the argument ```aggregationDepth``` from ```spark.logit```, since it is an 
expert Param (related to Spark architecture and job execution) that would rarely 
be used by R users.

## How was this patch tested?
Unit tests.

The ```summary``` output after this change:
multinomial logistic regression:
```
> df <- suppressWarnings(createDataFrame(iris))
> model <- spark.logit(df, Species ~ ., regParam = 0.5)
> summary(model)
$coefficients
 versicolor  virginica   setosa
(Intercept)  1.514031-2.609108   1.095077
Sepal_Length 0.02511006  0.2649821   -0.2900921
Sepal_Width  -0.5291215  -0.02016446 0.549286
Petal_Length 0.03647411  0.1544119   -0.190886
Petal_Width  0.000236092 0.4195804   -0.4198165
```
binomial logistic regression:
```
> df <- suppressWarnings(createDataFrame(iris))
> training <- df[df$Species %in% c("versicolor", "virginica"), ]
> model <- spark.logit(training, Species ~ ., regParam = 0.5)
> summary(model)
$coefficients
 Estimate
(Intercept)  -6.053815
Sepal_Length 0.2449379
Sepal_Width  0.1648321
Petal_Length 0.4730718
Petal_Width  1.031947
```

Author: Yanbo Liang 

Closes #16117 from yanboliang/spark-18686.

(cherry picked from commit 90b59d1bf262b41c3a5f780697f504030f9d079c)
Signed-off-by: Yanbo Liang 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/340e9aea
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/340e9aea
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/340e9aea

Branch: refs/heads/branch-2.1
Commit: 340e9aea4853805c42b8739004d93efe8fe16ba4
Parents: 3750c6e
Author: Yanbo Liang 
Authored: Wed Dec 7 00:31:11 2016 -0800
Committer: Yanbo Liang 
Committed: Wed Dec 7 00:32:32 2016 -0800

--
 R/pkg/R/mllib.R |  86 +++--
 R/pkg/inst/tests/testthat/test_mllib.R  | 183 +--
 .../spark/ml/r/LogisticRegressionWrapper.scala  |  81 
 3 files changed, 203 insertions(+), 147 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/340e9aea/R/pkg/R/mllib.R
--
diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R
index eed8293..074e9cb 100644
--- a/R/pkg/R/mllib.R
+++ b/R/pkg/R/mllib.R
@@ -733,8 +733,6 @@ setMethod("predict", signature(object = "KMeansModel"),
 #'  excepting that at most one value may be 0. The class with 
largest value p/t is predicted, where p
 #'  is the original probability of that class and t is the 
class's threshold.
 #' @param weightCol The weight column name.
-#' @param aggregationDepth depth for treeAggregate (>= 2). If the dimensions 
of features or the number of partitions
-#' are large, this param could be adjusted to a larger 
size.
 #' @param probabilityCol column name for predicted class conditional 
probabilities.
 #' @param ... additional arguments passed to the method.
 #' @return \code{spark.logit} returns a fitted logistic regression model
@@ -746,45 +744,35 @@ setMethod("predict", signature(object = "KMeansModel"),
 #' \dontrun{
 #' sparkR.session()
 #' # binary logistic regression
-#' label <- c(0.0, 0.0, 0.0, 1.0, 1.0)
-#' features <- c(1.1419053, 0.9194079, -0.9498666, -1.1069903, 0.2809776)
-#' binary_data <- as.data.frame(cbind(label, features))
-#' binary_df <- createDataFrame(binary_data)
-#' blr_model <- spark.logit(binary_df, label ~ features, thresholds = 1.0)
-#' blr_predict <- collect(select(predict(blr_model, binary_df), "prediction"))
-#'
-#' # summary of binary logistic regression
-#' blr_summary <- summary(blr_model)
-#' blr_fmeasure <- collect(select(blr_summary$fMeasureByThreshold, 
"threshold", "F-Measure"))
+#' df <- createDataFrame(iris)
+#' training <- df[df$Species %in% c("versicolor", "virginica"), ]
+#' model <- spark.logit(training, Species ~ ., regParam = 0.5)
+#' summary <- summary(model)
+#'
+#' 

spark git commit: [SPARK-18686][SPARKR][ML] Several cleanup and improvements for spark.logit.

2016-12-07 Thread yliang
Repository: spark
Updated Branches:
  refs/heads/master 5c6bcdbda -> 90b59d1bf


[SPARK-18686][SPARKR][ML] Several cleanup and improvements for spark.logit.

## What changes were proposed in this pull request?
Several cleanups and improvements for ```spark.logit```:
* ```summary``` should return the coefficients matrix, and should output labels for 
each class if the model is a multinomial logistic regression model.
* ```summary``` should not return ```areaUnderROC, roc, pr, ...```, since most 
of them are DataFrames that are less important for R users. Moreover, these 
metrics ignore instance weights (setting them all to 1.0), which will change in a 
later Spark version. To avoid introducing breaking changes, we do not 
expose them for now.
* SparkR test improvement: compare the training result with native R glmnet.
* Remove the argument ```aggregationDepth``` from ```spark.logit```, since it is an 
expert Param (related to Spark architecture and job execution) that would rarely 
be used by R users.

## How was this patch tested?
Unit tests.

The ```summary``` output after this change:
multinomial logistic regression:
```
> df <- suppressWarnings(createDataFrame(iris))
> model <- spark.logit(df, Species ~ ., regParam = 0.5)
> summary(model)
$coefficients
 versicolor  virginica   setosa
(Intercept)  1.514031-2.609108   1.095077
Sepal_Length 0.02511006  0.2649821   -0.2900921
Sepal_Width  -0.5291215  -0.02016446 0.549286
Petal_Length 0.03647411  0.1544119   -0.190886
Petal_Width  0.000236092 0.4195804   -0.4198165
```
binomial logistic regression:
```
> df <- suppressWarnings(createDataFrame(iris))
> training <- df[df$Species %in% c("versicolor", "virginica"), ]
> model <- spark.logit(training, Species ~ ., regParam = 0.5)
> summary(model)
$coefficients
 Estimate
(Intercept)  -6.053815
Sepal_Length 0.2449379
Sepal_Width  0.1648321
Petal_Length 0.4730718
Petal_Width  1.031947
```

Author: Yanbo Liang 

Closes #16117 from yanboliang/spark-18686.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/90b59d1b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/90b59d1b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/90b59d1b

Branch: refs/heads/master
Commit: 90b59d1bf262b41c3a5f780697f504030f9d079c
Parents: 5c6bcdb
Author: Yanbo Liang 
Authored: Wed Dec 7 00:31:11 2016 -0800
Committer: Yanbo Liang 
Committed: Wed Dec 7 00:31:11 2016 -0800

--
 R/pkg/R/mllib.R |  86 +++--
 R/pkg/inst/tests/testthat/test_mllib.R  | 183 +--
 .../spark/ml/r/LogisticRegressionWrapper.scala  |  81 
 3 files changed, 203 insertions(+), 147 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/90b59d1b/R/pkg/R/mllib.R
--
diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R
index eed8293..074e9cb 100644
--- a/R/pkg/R/mllib.R
+++ b/R/pkg/R/mllib.R
@@ -733,8 +733,6 @@ setMethod("predict", signature(object = "KMeansModel"),
 #'  excepting that at most one value may be 0. The class with 
largest value p/t is predicted, where p
 #'  is the original probability of that class and t is the 
class's threshold.
 #' @param weightCol The weight column name.
-#' @param aggregationDepth depth for treeAggregate (>= 2). If the dimensions 
of features or the number of partitions
-#' are large, this param could be adjusted to a larger 
size.
 #' @param probabilityCol column name for predicted class conditional 
probabilities.
 #' @param ... additional arguments passed to the method.
 #' @return \code{spark.logit} returns a fitted logistic regression model
@@ -746,45 +744,35 @@ setMethod("predict", signature(object = "KMeansModel"),
 #' \dontrun{
 #' sparkR.session()
 #' # binary logistic regression
-#' label <- c(0.0, 0.0, 0.0, 1.0, 1.0)
-#' features <- c(1.1419053, 0.9194079, -0.9498666, -1.1069903, 0.2809776)
-#' binary_data <- as.data.frame(cbind(label, features))
-#' binary_df <- createDataFrame(binary_data)
-#' blr_model <- spark.logit(binary_df, label ~ features, thresholds = 1.0)
-#' blr_predict <- collect(select(predict(blr_model, binary_df), "prediction"))
-#'
-#' # summary of binary logistic regression
-#' blr_summary <- summary(blr_model)
-#' blr_fmeasure <- collect(select(blr_summary$fMeasureByThreshold, 
"threshold", "F-Measure"))
+#' df <- createDataFrame(iris)
+#' training <- df[df$Species %in% c("versicolor", "virginica"), ]
+#' model <- spark.logit(training, Species ~ ., regParam = 0.5)
+#' summary <- summary(model)
+#'
+#' # fitted values on training data
+#' fitted <- predict(model, training)
+#'
 #' # save fitted model to input path
 #' path <-