spark git commit: Fix missing close-parens for In filter's toString

2017-01-12 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 6b34e745b -> b040cef2e


Fix missing close-parens for In filter's toString

Otherwise the opening parenthesis isn't closed in query plan descriptions of batch 
scans.

PushedFilters: [In(COL_A, [1,2,4,6,10,16,219,815], IsNotNull(COL_B), ...
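
A minimal standalone sketch of the fix (the case class below is a trimmed stand-in
for `org.apache.spark.sql.sources.In`, not the Spark source itself): the interpolated
string was missing its trailing `)`, so a filter rendered as `In(COL_A, [1,2,4]`
instead of `In(COL_A, [1,2,4])`.

```
case class In(attribute: String, values: Array[Any]) {
  // After the fix the closing ')' matches the opening one.
  override def toString: String = s"In($attribute, [${values.mkString(",")}])"
}

object InToStringDemo extends App {
  println(In("COL_A", Array(1, 2, 4)))  // prints: In(COL_A, [1,2,4])
}
```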

Author: Andrew Ash 

Closes #16558 from ash211/patch-9.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b040cef2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b040cef2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b040cef2

Branch: refs/heads/master
Commit: b040cef2ed0ed46c3dfb483a117200c9dac074ca
Parents: 6b34e74
Author: Andrew Ash 
Authored: Thu Jan 12 23:14:07 2017 -0800
Committer: Reynold Xin 
Committed: Thu Jan 12 23:14:07 2017 -0800

--
 sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b040cef2/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
index e0494df..2499e9b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
@@ -130,7 +130,7 @@ case class In(attribute: String, values: Array[Any]) 
extends Filter {
 case _ => false
   }
   override def toString: String = {
-s"In($attribute, [${values.mkString(",")}]"
+s"In($attribute, [${values.mkString(",")}])"
   }
 
   override def references: Array[String] = Array(attribute) ++ 
values.flatMap(findReferences)





spark git commit: Fix missing close-parens for In filter's toString

2017-01-12 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 55d2a1178 -> be527ddc0


Fix missing close-parens for In filter's toString

Otherwise the opening parenthesis isn't closed in query plan descriptions of batch 
scans.

PushedFilters: [In(COL_A, [1,2,4,6,10,16,219,815], IsNotNull(COL_B), ...

Author: Andrew Ash 

Closes #16558 from ash211/patch-9.

(cherry picked from commit b040cef2ed0ed46c3dfb483a117200c9dac074ca)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/be527ddc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/be527ddc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/be527ddc

Branch: refs/heads/branch-2.0
Commit: be527ddc07be009ad6aa4f8561c7f8406468a8dc
Parents: 55d2a11
Author: Andrew Ash 
Authored: Thu Jan 12 23:14:07 2017 -0800
Committer: Reynold Xin 
Committed: Thu Jan 12 23:14:25 2017 -0800

--
 sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/be527ddc/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
index 9130e77..f7e4f93 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
@@ -97,7 +97,7 @@ case class In(attribute: String, values: Array[Any]) extends 
Filter {
 case _ => false
   }
   override def toString: String = {
-s"In($attribute, [${values.mkString(",")}]"
+s"In($attribute, [${values.mkString(",")}])"
   }
 }
 





spark git commit: Fix missing close-parens for In filter's toString

2017-01-12 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 23944d0d6 -> 0668e061b


Fix missing close-parens for In filter's toString

Otherwise the opening parenthesis isn't closed in query plan descriptions of batch 
scans.

PushedFilters: [In(COL_A, [1,2,4,6,10,16,219,815], IsNotNull(COL_B), ...

Author: Andrew Ash 

Closes #16558 from ash211/patch-9.

(cherry picked from commit b040cef2ed0ed46c3dfb483a117200c9dac074ca)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0668e061
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0668e061
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0668e061

Branch: refs/heads/branch-2.1
Commit: 0668e061beba683d026a2d48011ff74faf8a38ab
Parents: 23944d0
Author: Andrew Ash 
Authored: Thu Jan 12 23:14:07 2017 -0800
Committer: Reynold Xin 
Committed: Thu Jan 12 23:14:15 2017 -0800

--
 sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0668e061/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
index e0494df..2499e9b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
@@ -130,7 +130,7 @@ case class In(attribute: String, values: Array[Any]) 
extends Filter {
 case _ => false
   }
   override def toString: String = {
-s"In($attribute, [${values.mkString(",")}]"
+s"In($attribute, [${values.mkString(",")}])"
   }
 
   override def references: Array[String] = Array(attribute) ++ 
values.flatMap(findReferences)





spark git commit: [SPARK-19178][SQL] convert string of large numbers to int should return null

2017-01-12 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 7f24a0b6c -> 6b34e745b


[SPARK-19178][SQL] convert string of large numbers to int should return null

## What changes were proposed in this pull request?

When we convert a string to an integral type, we first convert that string to 
`decimal(20, 0)`, so that a string in decimal format can be turned into a truncated 
integral, e.g. `CAST('1.2' AS int)` returns `1`.

However, this causes problems when we convert a string holding a very large number to 
an integral type, e.g. `CAST('1234567890123' AS int)` returns `1912276171`, while Hive 
returns null as expected.

This is a long-standing bug (it seems to have been there since the first day of Spark 
SQL). This PR fixes it by adding native support for converting a `UTF8String` to an 
integral type.
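
For reference, a spark-shell sketch of the casts discussed above (assuming the shell's
`spark` session is available):

```
spark.sql("SELECT CAST('1.2' AS int)").show()
// -> 1: a decimal-formatted string is truncated to an integral value.

spark.sql("SELECT CAST('1234567890123' AS int)").show()
// Before this patch: an overflowed value such as 1912276171.
// After this patch:  null, matching Hive's behaviour.
```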

## How was this patch tested?

new regression tests

Author: Wenchen Fan 

Closes #16550 from cloud-fan/string-to-int.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6b34e745
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6b34e745
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6b34e745

Branch: refs/heads/master
Commit: 6b34e745bb8bdcf5a8bb78359fa39bbe8c6563cc
Parents: 7f24a0b
Author: Wenchen Fan 
Authored: Thu Jan 12 22:52:34 2017 -0800
Committer: gatorsmile 
Committed: Thu Jan 12 22:52:34 2017 -0800

--
 .../apache/spark/unsafe/types/UTF8String.java   | 184 +++
 .../sql/catalyst/analysis/TypeCoercion.scala|  16 --
 .../spark/sql/catalyst/expressions/Cast.scala   |  18 +-
 .../test/resources/sql-tests/inputs/cast.sql|  43 +
 .../resources/sql-tests/results/cast.sql.out| 178 ++
 5 files changed, 414 insertions(+), 25 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6b34e745/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
--
diff --git 
a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java 
b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
index 0255f53..3800d53 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
@@ -835,6 +835,190 @@ public final class UTF8String implements 
Comparable, Externalizable,
 return fromString(sb.toString());
   }
 
+  private int getDigit(byte b) {
+if (b >= '0' && b <= '9') {
+  return b - '0';
+}
+throw new NumberFormatException(toString());
+  }
+
+  /**
+   * Parses this UTF8String to long.
+   *
+   * Note that, in this method we accumulate the result in negative format, and convert it to
+   * positive format at the end, if this string is not started with '-'. This is because min value
+   * is bigger than max value in digits, e.g. Integer.MAX_VALUE is '2147483647' and
+   * Integer.MIN_VALUE is '-2147483648'.
+   *
+   * This code is mostly copied from LazyLong.parseLong in Hive.
+   */
+  public long toLong() {
+if (numBytes == 0) {
+  throw new NumberFormatException("Empty string");
+}
+
+byte b = getByte(0);
+final boolean negative = b == '-';
+int offset = 0;
+if (negative || b == '+') {
+  offset++;
+  if (numBytes == 1) {
+throw new NumberFormatException(toString());
+  }
+}
+
+final byte separator = '.';
+final int radix = 10;
+final long stopValue = Long.MIN_VALUE / radix;
+long result = 0;
+
+while (offset < numBytes) {
+  b = getByte(offset);
+  offset++;
+  if (b == separator) {
+// We allow decimals and will return a truncated integral in that case.
+// Therefore we won't throw an exception here (checking the fractional
+// part happens below.)
+break;
+  }
+
+  int digit = getDigit(b);
+  // We are going to process the new digit and accumulate the result. However, before doing
+  // this, if the result is already smaller than the stopValue(Long.MIN_VALUE / radix), then
+  // result * 10 will definitely be smaller than minValue, and we can stop and throw exception.
+  if (result < stopValue) {
+throw new NumberFormatException(toString());
+  }
+
+  result = result * radix - digit;
+  // Since the previous result is less than or equal to stopValue(Long.MIN_VALUE / radix), we
+  // can just use `result > 0` to check overflow. If result overflows, we should stop and throw
+  // exception.
+  if (result > 0) {
+throw new NumberFormatException(toString());
+  }
+}
+
+// This is the case when we've encountered a decimal separator. The fractional
+// part 

spark git commit: [SPARK-19142][SPARKR] spark.kmeans should take seed, initSteps, and tol as parameters

2017-01-12 Thread yliang
Repository: spark
Updated Branches:
  refs/heads/master 3356b8b6a -> 7f24a0b6c


[SPARK-19142][SPARKR] spark.kmeans should take seed, initSteps, and tol as 
parameters

## What changes were proposed in this pull request?
spark.kmeans doesn't have an interface to set initSteps, seed, and tol. Since Spark's 
k-means algorithm doesn't take the same set of parameters as R's kmeans, we should 
maintain a different interface in spark.kmeans.

Add the missing parameters and the corresponding documentation.

Modified existing unit tests to take additional parameters.
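
For orientation, a hedged Scala sketch (an illustration only, not part of this patch)
of the equivalent configuration on `org.apache.spark.ml.clustering.KMeans`, which is
the estimator `KMeansWrapper` drives with the new `seed`, `initSteps`, and `tol`
arguments:

```
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("kmeans-sketch").getOrCreate()
val training = spark.createDataFrame(Seq(
  (0, Vectors.dense(0.0, 0.0)),
  (1, Vectors.dense(1.0, 1.0)),
  (2, Vectors.dense(9.0, 9.0))
)).toDF("id", "features")

val kmeans = new KMeans()
  .setK(2)
  .setMaxIter(20)
  .setInitMode("k-means||")
  .setSeed(1L)      // exposed to R as `seed`
  .setInitSteps(2)  // exposed to R as `initSteps`
  .setTol(1e-4)     // exposed to R as `tol`
val model = kmeans.fit(training)
```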

Author: wm...@hotmail.com 

Closes #16523 from wangmiao1981/kmeans.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7f24a0b6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7f24a0b6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7f24a0b6

Branch: refs/heads/master
Commit: 7f24a0b6c32c56a38cf879d953bbd523922ab9c9
Parents: 3356b8b
Author: wm...@hotmail.com 
Authored: Thu Jan 12 22:27:57 2017 -0800
Committer: Yanbo Liang 
Committed: Thu Jan 12 22:27:57 2017 -0800

--
 R/pkg/R/mllib_clustering.R  | 13 +++--
 .../inst/tests/testthat/test_mllib_clustering.R | 20 
 .../org/apache/spark/ml/r/KMeansWrapper.scala   |  9 -
 3 files changed, 39 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7f24a0b6/R/pkg/R/mllib_clustering.R
--
diff --git a/R/pkg/R/mllib_clustering.R b/R/pkg/R/mllib_clustering.R
index c443588..ca5182d 100644
--- a/R/pkg/R/mllib_clustering.R
+++ b/R/pkg/R/mllib_clustering.R
@@ -175,6 +175,10 @@ setMethod("write.ml", signature(object = 
"GaussianMixtureModel", path = "charact
 #' @param k number of centers.
 #' @param maxIter maximum iteration number.
 #' @param initMode the initialization algorithm choosen to fit the model.
+#' @param seed the random seed for cluster initialization.
+#' @param initSteps the number of steps for the k-means|| initialization mode.
+#'  This is an advanced setting, the default of 2 is almost always enough. Must be > 0.
+#' @param tol convergence tolerance of iterations.
 #' @param ... additional argument(s) passed to the method.
 #' @return \code{spark.kmeans} returns a fitted k-means model.
 #' @rdname spark.kmeans
@@ -204,11 +208,16 @@ setMethod("write.ml", signature(object = 
"GaussianMixtureModel", path = "charact
 #' @note spark.kmeans since 2.0.0
 #' @seealso \link{predict}, \link{read.ml}, \link{write.ml}
 setMethod("spark.kmeans", signature(data = "SparkDataFrame", formula = 
"formula"),
-  function(data, formula, k = 2, maxIter = 20, initMode = 
c("k-means||", "random")) {
+  function(data, formula, k = 2, maxIter = 20, initMode = 
c("k-means||", "random"),
+   seed = NULL, initSteps = 2, tol = 1E-4) {
 formula <- paste(deparse(formula), collapse = "")
 initMode <- match.arg(initMode)
+if (!is.null(seed)) {
+  seed <- as.character(as.integer(seed))
+}
 jobj <- callJStatic("org.apache.spark.ml.r.KMeansWrapper", "fit", 
data@sdf, formula,
-as.integer(k), as.integer(maxIter), initMode)
+as.integer(k), as.integer(maxIter), initMode, 
seed,
+as.integer(initSteps), as.numeric(tol))
 new("KMeansModel", jobj = jobj)
   })
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7f24a0b6/R/pkg/inst/tests/testthat/test_mllib_clustering.R
--
diff --git a/R/pkg/inst/tests/testthat/test_mllib_clustering.R 
b/R/pkg/inst/tests/testthat/test_mllib_clustering.R
index 1980fff..f013991 100644
--- a/R/pkg/inst/tests/testthat/test_mllib_clustering.R
+++ b/R/pkg/inst/tests/testthat/test_mllib_clustering.R
@@ -132,6 +132,26 @@ test_that("spark.kmeans", {
   expect_true(summary2$is.loaded)
 
   unlink(modelPath)
+
+  # Test Kmeans on dataset that is sensitive to seed value
+  col1 <- c(1, 2, 3, 4, 0, 1, 2, 3, 4, 0)
+  col2 <- c(1, 2, 3, 4, 0, 1, 2, 3, 4, 0)
+  col3 <- c(1, 2, 3, 4, 0, 1, 2, 3, 4, 0)
+  cols <- as.data.frame(cbind(col1, col2, col3))
+  df <- createDataFrame(cols)
+
+  model1 <- spark.kmeans(data = df, ~ ., k = 5, maxIter = 10,
+ initMode = "random", seed = 1, tol = 1E-5)
+  model2 <- spark.kmeans(data = df, ~ ., k = 5, maxIter = 10,
+ initMode = "random", seed = 2, tol = 1E-5)
+
+  fitted.model1 <- fitted(model1)
+  fitted.model2 <- fitted(model2)
+  # The predicted clusters are different
+  expect_equal(sort(collect(distinct(select(fitted.model1, 

spark git commit: [SPARK-19092][SQL] Save() API of DataFrameWriter should not scan all the saved files

2017-01-12 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master c983267b0 -> 3356b8b6a


[SPARK-19092][SQL] Save() API of DataFrameWriter should not scan all the saved 
files

### What changes were proposed in this pull request?
`DataFrameWriter`'s [save() 
API](https://github.com/gatorsmile/spark/blob/5d38f09f47a767a342a0a8219c63efa2943b5d1f/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala#L207)
performs an unnecessary full filesystem scan of the files it has just saved. The save() 
API is the most basic/core API in `DataFrameWriter`, so we should avoid this scan.

The related PR: https://github.com/apache/spark/pull/16090
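
A minimal sketch of the code path in question (the local session and throw-away output
path are assumptions for illustration); after this patch the call below only writes the
files instead of also re-listing everything it just wrote:

```
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().master("local[*]").appName("save-sketch").getOrCreate()
import spark.implicits._

Seq((1, "a"), (2, "b")).toDF("id", "value")
  .write
  .mode(SaveMode.Overwrite)
  .save("/tmp/save-api-sketch")  // exercises DataFrameWriter.save(), the API this patch optimizes
```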

### How was this patch tested?
Updated the existing test cases.

Author: gatorsmile 

Closes #16481 from gatorsmile/saveFileScan.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3356b8b6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3356b8b6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3356b8b6

Branch: refs/heads/master
Commit: 3356b8b6a9184fcab8d0fe993f3545c3beaa4d99
Parents: c983267
Author: gatorsmile 
Authored: Fri Jan 13 13:05:53 2017 +0800
Committer: Wenchen Fan 
Committed: Fri Jan 13 13:05:53 2017 +0800

--
 .../command/createDataSourceTables.scala|   2 +-
 .../sql/execution/datasources/DataSource.scala  | 172 +++
 .../hive/PartitionedTablePerfStatsSuite.scala   |  29 +---
 3 files changed, 106 insertions(+), 97 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3356b8b6/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 73b2153..90aeebd 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -199,7 +199,7 @@ case class CreateDataSourceTableAsSelectCommand(
   catalogTable = if (tableExists) Some(table) else None)
 
 try {
-  dataSource.write(mode, Dataset.ofRows(session, query))
+  dataSource.writeAndRead(mode, Dataset.ofRows(session, query))
 } catch {
   case ex: AnalysisException =>
 logError(s"Failed to write to table 
${table.identifier.unquotedString}", ex)

http://git-wip-us.apache.org/repos/asf/spark/blob/3356b8b6/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index b7f3559..29afe57 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -413,10 +413,85 @@ case class DataSource(
 relation
   }
 
-  /** Writes the given [[DataFrame]] out to this [[DataSource]]. */
-  def write(
-  mode: SaveMode,
-  data: DataFrame): BaseRelation = {
+  /**
+   * Writes the given [[DataFrame]] out in this [[FileFormat]].
+   */
+  private def writeInFileFormat(format: FileFormat, mode: SaveMode, data: DataFrame): Unit = {
+// Don't glob path for the write path.  The contracts here are:
+//  1. Only one output path can be specified on the write path;
+//  2. Output path must be a legal HDFS style file system path;
+//  3. It's OK that the output path doesn't exist yet;
+val allPaths = paths ++ caseInsensitiveOptions.get("path")
+val outputPath = if (allPaths.length == 1) {
+  val path = new Path(allPaths.head)
+  val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
+  path.makeQualified(fs.getUri, fs.getWorkingDirectory)
+} else {
+  throw new IllegalArgumentException("Expected exactly one path to be specified, but " +
+s"got: ${allPaths.mkString(", ")}")
+}
+
+val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
+PartitioningUtils.validatePartitionColumn(data.schema, partitionColumns, caseSensitive)
+
+// If we are appending to a table that already exists, make sure the partitioning matches
+// up.  If we fail to load the table for whatever reason, ignore the check.
+if (mode == SaveMode.Append) {
+  val existingPartitionColumns = Try {
+getOrInferFileFormatSchema(format, justPartitioning = 

spark git commit: [SPARK-19110][MLLIB][FOLLOWUP] Add a unit test for testing logPrior and logLikelihood of DistributedLDAModel in MLLIB

2017-01-12 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master 5585ed93b -> c983267b0


[SPARK-19110][MLLIB][FOLLOWUP] Add a unit test for testing logPrior and 
logLikelihood of DistributedLDAModel in MLLIB

## What changes were proposed in this pull request?
#16491 added the fix to mllib and a unit test to ml. This followup PR, add unit 
tests to mllib suite.

## How was this patch tested?
Unit tests.

Author: wm...@hotmail.com 

Closes #16524 from wangmiao1981/ldabug.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c983267b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c983267b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c983267b

Branch: refs/heads/master
Commit: c983267b0853f908d1c671cedd18b159e6993df1
Parents: 5585ed9
Author: wm...@hotmail.com 
Authored: Thu Jan 12 18:31:57 2017 -0800
Committer: Joseph K. Bradley 
Committed: Thu Jan 12 18:31:57 2017 -0800

--
 .../test/scala/org/apache/spark/mllib/clustering/LDASuite.scala| 2 ++
 1 file changed, 2 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c983267b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
--
diff --git 
a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala 
b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
index 211e2bc..086bb21 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
@@ -505,6 +505,8 @@ class LDASuite extends SparkFunSuite with 
MLlibTestSparkContext {
   assert(distributedModel.topicConcentration === 
sameDistributedModel.topicConcentration)
   assert(distributedModel.gammaShape === sameDistributedModel.gammaShape)
   assert(distributedModel.globalTopicTotals === 
sameDistributedModel.globalTopicTotals)
+  assert(distributedModel.logLikelihood ~== 
sameDistributedModel.logLikelihood absTol 1e-6)
+  assert(distributedModel.logPrior ~== sameDistributedModel.logPrior 
absTol 1e-6)
 
   val graph = distributedModel.graph
   val sameGraph = sameDistributedModel.graph





spark git commit: [SPARK-17237][SQL] Remove backticks in a pivot result schema

2017-01-12 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 2bc4d4e28 -> 5585ed93b


[SPARK-17237][SQL] Remove backticks in a pivot result schema

## What changes were proposed in this pull request?
Pivoting adds backticks (e.g. 3_count(\`c\`)) to column names and, in some cases, 
this causes analysis exceptions like:
```
scala> val df = Seq((2, 3, 4), (3, 4, 5)).toDF("a", "x", "y")
scala> df.groupBy("a").pivot("x").agg(count("y"), avg("y")).na.fill(0)
org.apache.spark.sql.AnalysisException: syntax error in attribute name: 
`3_count(`y`)`;
  at 
org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute$.e$1(unresolved.scala:134)
  at 
org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute$.parseAttributeName(unresolved.scala:144)
...
```
So this PR proposes to remove these backticks from column names.
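
With the backticks gone, follow-up operations can resolve the generated pivot columns.
A spark-shell sketch (the printed column names are illustrative):

```
import org.apache.spark.sql.functions.{avg, count}
import spark.implicits._

val df = Seq((2, 3, 4), (3, 4, 5)).toDF("a", "x", "y")
val pivoted = df.groupBy("a").pivot("x").agg(count("y"), avg("y"))

pivoted.columns.foreach(println)  // names such as 3_count(y) -- no backticks anymore
pivoted.na.fill(0).show()         // previously failed with the AnalysisException above
```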

## How was this patch tested?
Added a test in `DataFrameAggregateSuite`.

Author: Takeshi YAMAMURO 

Closes #14812 from maropu/SPARK-17237.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5585ed93
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5585ed93
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5585ed93

Branch: refs/heads/master
Commit: 5585ed93b09bc05cdd7a731650eca50d43d7159b
Parents: 2bc4d4e
Author: Takeshi YAMAMURO 
Authored: Thu Jan 12 09:46:53 2017 -0800
Committer: gatorsmile 
Committed: Thu Jan 12 09:46:53 2017 -0800

--
 .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala| 2 +-
 .../scala/org/apache/spark/sql/DataFrameAggregateSuite.scala | 8 
 .../scala/org/apache/spark/sql/DataFramePivotSuite.scala | 2 +-
 3 files changed, 10 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5585ed93/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 3c58832..1957df8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -477,7 +477,7 @@ class Analyzer(
   } else {
 val suffix = aggregate match {
   case n: NamedExpression => n.name
-  case _ => aggregate.sql
+  case _ => toPrettySQL(aggregate)
 }
 value + "_" + suffix
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/5585ed93/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index 7853b22fe..e707912 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -530,4 +530,12 @@ class DataFrameAggregateSuite extends QueryTest with 
SharedSQLContext {
   limit2Df.groupBy("id").count().select($"id"),
   limit2Df.select($"id"))
   }
+
+  test("SPARK-17237 remove backticks in a pivot result schema") {
+val df = Seq((2, 3, 4), (3, 4, 5)).toDF("a", "x", "y")
+checkAnswer(
+  df.groupBy("a").pivot("x").agg(count("y"), avg("y")).na.fill(0),
+  Seq(Row(3, 0, 0.0, 1, 5.0), Row(2, 1, 4.0, 0, 0.0))
+)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/5585ed93/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala
index a8d854c..51ffe34 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala
@@ -200,7 +200,7 @@ class DataFramePivotSuite extends QueryTest with 
SharedSQLContext{
 
   test("pivot preserves aliases if given") {
 assertResult(
-  Array("year", "dotNET_foo", "dotNET_avg(`earnings`)", "Java_foo", 
"Java_avg(`earnings`)")
+  Array("year", "dotNET_foo", "dotNET_avg(earnings)", "Java_foo", 
"Java_avg(earnings)")
 )(
   courseSales.groupBy($"year")
 .pivot("course", Seq("dotNET", "Java"))



spark git commit: [SPARK-17237][SQL] Remove backticks in a pivot result schema

2017-01-12 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 042e32d18 -> 23944d0d6


[SPARK-17237][SQL] Remove backticks in a pivot result schema

## What changes were proposed in this pull request?
Pivoting adds backticks (e.g. 3_count(\`c\`)) to column names and, in some cases, 
this causes analysis exceptions like:
```
scala> val df = Seq((2, 3, 4), (3, 4, 5)).toDF("a", "x", "y")
scala> df.groupBy("a").pivot("x").agg(count("y"), avg("y")).na.fill(0)
org.apache.spark.sql.AnalysisException: syntax error in attribute name: 
`3_count(`y`)`;
  at 
org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute$.e$1(unresolved.scala:134)
  at 
org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute$.parseAttributeName(unresolved.scala:144)
...
```
So this PR proposes to remove these backticks from column names.

## How was this patch tested?
Added a test in `DataFrameAggregateSuite`.

Author: Takeshi YAMAMURO 

Closes #14812 from maropu/SPARK-17237.

(cherry picked from commit 5585ed93b09bc05cdd7a731650eca50d43d7159b)
Signed-off-by: gatorsmile 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/23944d0d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/23944d0d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/23944d0d

Branch: refs/heads/branch-2.1
Commit: 23944d0d64a07d29e9bfcb8f8d6d22858ec02aef
Parents: 042e32d
Author: Takeshi YAMAMURO 
Authored: Thu Jan 12 09:46:53 2017 -0800
Committer: gatorsmile 
Committed: Thu Jan 12 09:47:09 2017 -0800

--
 .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala| 2 +-
 .../scala/org/apache/spark/sql/DataFrameAggregateSuite.scala | 8 
 .../scala/org/apache/spark/sql/DataFramePivotSuite.scala | 2 +-
 3 files changed, 10 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/23944d0d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index ab9de02..f873996 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -383,7 +383,7 @@ class Analyzer(
   } else {
 val suffix = aggregate match {
   case n: NamedExpression => n.name
-  case _ => aggregate.sql
+  case _ => toPrettySQL(aggregate)
 }
 value + "_" + suffix
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/23944d0d/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index 7853b22fe..e707912 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -530,4 +530,12 @@ class DataFrameAggregateSuite extends QueryTest with 
SharedSQLContext {
   limit2Df.groupBy("id").count().select($"id"),
   limit2Df.select($"id"))
   }
+
+  test("SPARK-17237 remove backticks in a pivot result schema") {
+val df = Seq((2, 3, 4), (3, 4, 5)).toDF("a", "x", "y")
+checkAnswer(
+  df.groupBy("a").pivot("x").agg(count("y"), avg("y")).na.fill(0),
+  Seq(Row(3, 0, 0.0, 1, 5.0), Row(2, 1, 4.0, 0, 0.0))
+)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/23944d0d/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala
index a8d854c..51ffe34 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala
@@ -200,7 +200,7 @@ class DataFramePivotSuite extends QueryTest with 
SharedSQLContext{
 
   test("pivot preserves aliases if given") {
 assertResult(
-  Array("year", "dotNET_foo", "dotNET_avg(`earnings`)", "Java_foo", 
"Java_avg(`earnings`)")
+  Array("year", "dotNET_foo", "dotNET_avg(earnings)", "Java_foo", 
"Java_avg(earnings)")
 )(
   courseSales.groupBy($"year")
 .pivot("course", Seq("dotNET", "Java"))



spark git commit: [SPARK-12757][CORE] lower "block locks were not released" log to info level

2017-01-12 Thread felixcheung
Repository: spark
Updated Branches:
  refs/heads/master c6c37b8af -> 2bc4d4e28


[SPARK-12757][CORE] lower "block locks were not released" log to info level

## What changes were proposed in this pull request?

lower "block locks were not released" log to info level, as it is generating a 
lot of warnings in running ML, graph calls, as pointed out in the JIRA.

Author: Felix Cheung 

Closes #16513 from felixcheung/blocklockswarn.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2bc4d4e2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2bc4d4e2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2bc4d4e2

Branch: refs/heads/master
Commit: 2bc4d4e286e65f8b4e9ee21bccd913b62e6061f2
Parents: c6c37b8
Author: Felix Cheung 
Authored: Thu Jan 12 09:45:16 2017 -0800
Committer: Felix Cheung 
Committed: Thu Jan 12 09:45:16 2017 -0800

--
 core/src/main/scala/org/apache/spark/executor/Executor.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2bc4d4e2/core/src/main/scala/org/apache/spark/executor/Executor.scala
--
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala 
b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 789198f..b6c0f0c 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -336,7 +336,7 @@ private[spark] class Executor(
 if (conf.getBoolean("spark.storage.exceptionOnPinLeak", false)) {
   throw new SparkException(errMsg)
 } else {
-  logWarning(errMsg)
+  logInfo(errMsg)
 }
   }
 }





spark git commit: [SPARK-19055][SQL][PYSPARK] Fix SparkSession initialization when SparkContext is stopped

2017-01-12 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 3566e40a4 -> 55d2a1178


[SPARK-19055][SQL][PYSPARK] Fix SparkSession initialization when SparkContext 
is stopped

## What changes were proposed in this pull request?

In SparkSession initialization, we store the created SparkSession instance in the 
class variable _instantiatedContext. The next time SparkSession.builder.getOrCreate() 
is called, we can retrieve that existing SparkSession instance.

However, when the active SparkContext is stopped and we create another new 
SparkContext to use, the existing SparkSession is still associated with the stopped 
SparkContext, so operations on this existing SparkSession will fail.

We need to detect such a case in SparkSession and renew the class variable 
_instantiatedContext if needed.

## How was this patch tested?

New test added in PySpark.

Please review http://spark.apache.org/contributing.html before opening a pull 
request.

Author: Liang-Chi Hsieh 

Closes #16454 from viirya/fix-pyspark-sparksession.

(cherry picked from commit c6c37b8af714c8ddc8c77ac943a379f703558f27)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/55d2a117
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/55d2a117
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/55d2a117

Branch: refs/heads/branch-2.0
Commit: 55d2a117805d76fd27e2960f92ece88238488231
Parents: 3566e40
Author: Liang-Chi Hsieh 
Authored: Thu Jan 12 20:53:31 2017 +0800
Committer: Wenchen Fan 
Committed: Thu Jan 12 20:54:41 2017 +0800

--
 python/pyspark/sql/session.py | 16 ++--
 python/pyspark/sql/tests.py   | 23 +++
 2 files changed, 33 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/55d2a117/python/pyspark/sql/session.py
--
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index d25823d..79017c6 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -161,8 +161,8 @@ class SparkSession(object):
 with self._lock:
 from pyspark.context import SparkContext
 from pyspark.conf import SparkConf
-session = SparkSession._instantiatedContext
-if session is None:
+session = SparkSession._instantiatedSession
+if session is None or session._sc._jsc is None:
 sparkConf = SparkConf()
 for key, value in self._options.items():
 sparkConf.set(key, value)
@@ -183,7 +183,7 @@ class SparkSession(object):
 
 builder = Builder()
 
-_instantiatedContext = None
+_instantiatedSession = None
 
 @ignore_unicode_prefix
 def __init__(self, sparkContext, jsparkSession=None):
@@ -214,8 +214,12 @@ class SparkSession(object):
 self._wrapped = SQLContext(self._sc, self, self._jwrapped)
 _monkey_patch_RDD(self)
 install_exception_handler()
-if SparkSession._instantiatedContext is None:
-SparkSession._instantiatedContext = self
+# If we had an instantiated SparkSession attached with a SparkContext
+# which is stopped now, we need to renew the instantiated SparkSession.
+# Otherwise, we will use invalid SparkSession when we call Builder.getOrCreate.
+if SparkSession._instantiatedSession is None \
+or SparkSession._instantiatedSession._sc._jsc is None:
+SparkSession._instantiatedSession = self
 
 @since(2.0)
 def newSession(self):
@@ -597,7 +601,7 @@ class SparkSession(object):
 """Stop the underlying :class:`SparkContext`.
 """
 self._sc.stop()
-SparkSession._instantiatedContext = None
+SparkSession._instantiatedSession = None
 
 @since(2.0)
 def __enter__(self):

http://git-wip-us.apache.org/repos/asf/spark/blob/55d2a117/python/pyspark/sql/tests.py
--
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index b3cf72b..796b964 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -46,6 +46,7 @@ if sys.version_info[:2] <= (2, 6):
 else:
 import unittest
 
+from pyspark import SparkContext
 from pyspark.sql import SparkSession, HiveContext, Column, Row
 from pyspark.sql.types import *
 from pyspark.sql.types import UserDefinedType, _infer_type
@@ -1772,6 +1773,28 @@ class HiveSparkSubmitTests(SparkSubmitTests):
 self.assertTrue(os.path.exists(metastore_path))
 
 
+class SQLTests2(ReusedPySparkTestCase):
+
+

spark git commit: [SPARK-19055][SQL][PYSPARK] Fix SparkSession initialization when SparkContext is stopped

2017-01-12 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 616a78a56 -> 042e32d18


[SPARK-19055][SQL][PYSPARK] Fix SparkSession initialization when SparkContext 
is stopped

## What changes were proposed in this pull request?

In SparkSession initialization, we store the created SparkSession instance in the 
class variable _instantiatedContext. The next time SparkSession.builder.getOrCreate() 
is called, we can retrieve that existing SparkSession instance.

However, when the active SparkContext is stopped and we create another new 
SparkContext to use, the existing SparkSession is still associated with the stopped 
SparkContext, so operations on this existing SparkSession will fail.

We need to detect such a case in SparkSession and renew the class variable 
_instantiatedContext if needed.

## How was this patch tested?

New test added in PySpark.

Please review http://spark.apache.org/contributing.html before opening a pull 
request.

Author: Liang-Chi Hsieh 

Closes #16454 from viirya/fix-pyspark-sparksession.

(cherry picked from commit c6c37b8af714c8ddc8c77ac943a379f703558f27)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/042e32d1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/042e32d1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/042e32d1

Branch: refs/heads/branch-2.1
Commit: 042e32d18ad10be5c60907959e55b0324df5b2c0
Parents: 616a78a
Author: Liang-Chi Hsieh 
Authored: Thu Jan 12 20:53:31 2017 +0800
Committer: Wenchen Fan 
Committed: Thu Jan 12 20:54:16 2017 +0800

--
 python/pyspark/sql/session.py | 16 ++--
 python/pyspark/sql/tests.py   | 23 +++
 2 files changed, 33 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/042e32d1/python/pyspark/sql/session.py
--
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index 1e40b9c..9f4772e 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -161,8 +161,8 @@ class SparkSession(object):
 with self._lock:
 from pyspark.context import SparkContext
 from pyspark.conf import SparkConf
-session = SparkSession._instantiatedContext
-if session is None:
+session = SparkSession._instantiatedSession
+if session is None or session._sc._jsc is None:
 sparkConf = SparkConf()
 for key, value in self._options.items():
 sparkConf.set(key, value)
@@ -183,7 +183,7 @@ class SparkSession(object):
 
 builder = Builder()
 
-_instantiatedContext = None
+_instantiatedSession = None
 
 @ignore_unicode_prefix
 def __init__(self, sparkContext, jsparkSession=None):
@@ -214,8 +214,12 @@ class SparkSession(object):
 self._wrapped = SQLContext(self._sc, self, self._jwrapped)
 _monkey_patch_RDD(self)
 install_exception_handler()
-if SparkSession._instantiatedContext is None:
-SparkSession._instantiatedContext = self
+# If we had an instantiated SparkSession attached with a SparkContext
+# which is stopped now, we need to renew the instantiated SparkSession.
+# Otherwise, we will use invalid SparkSession when we call Builder.getOrCreate.
+if SparkSession._instantiatedSession is None \
+or SparkSession._instantiatedSession._sc._jsc is None:
+SparkSession._instantiatedSession = self
 
 @since(2.0)
 def newSession(self):
@@ -595,7 +599,7 @@ class SparkSession(object):
 """Stop the underlying :class:`SparkContext`.
 """
 self._sc.stop()
-SparkSession._instantiatedContext = None
+SparkSession._instantiatedSession = None
 
 @since(2.0)
 def __enter__(self):

http://git-wip-us.apache.org/repos/asf/spark/blob/042e32d1/python/pyspark/sql/tests.py
--
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 6de63e6..fe034bc 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -46,6 +46,7 @@ if sys.version_info[:2] <= (2, 6):
 else:
 import unittest
 
+from pyspark import SparkContext
 from pyspark.sql import SparkSession, HiveContext, Column, Row
 from pyspark.sql.types import *
 from pyspark.sql.types import UserDefinedType, _infer_type
@@ -1877,6 +1878,28 @@ class HiveSparkSubmitTests(SparkSubmitTests):
 self.assertTrue(os.path.exists(metastore_path))
 
 
+class SQLTests2(ReusedPySparkTestCase):
+
+

spark git commit: [SPARK-19055][SQL][PYSPARK] Fix SparkSession initialization when SparkContext is stopped

2017-01-12 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 871d26664 -> c6c37b8af


[SPARK-19055][SQL][PYSPARK] Fix SparkSession initialization when SparkContext 
is stopped

## What changes were proposed in this pull request?

In SparkSession initialization, we store the created SparkSession instance in the 
class variable _instantiatedContext. The next time SparkSession.builder.getOrCreate() 
is called, we can retrieve that existing SparkSession instance.

However, when the active SparkContext is stopped and we create another new 
SparkContext to use, the existing SparkSession is still associated with the stopped 
SparkContext, so operations on this existing SparkSession will fail.

We need to detect such a case in SparkSession and renew the class variable 
_instantiatedContext if needed.

## How was this patch tested?

New test added in PySpark.

Please review http://spark.apache.org/contributing.html before opening a pull 
request.

Author: Liang-Chi Hsieh 

Closes #16454 from viirya/fix-pyspark-sparksession.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c6c37b8a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c6c37b8a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c6c37b8a

Branch: refs/heads/master
Commit: c6c37b8af714c8ddc8c77ac943a379f703558f27
Parents: 871d266
Author: Liang-Chi Hsieh 
Authored: Thu Jan 12 20:53:31 2017 +0800
Committer: Wenchen Fan 
Committed: Thu Jan 12 20:53:31 2017 +0800

--
 python/pyspark/sql/session.py | 16 ++--
 python/pyspark/sql/tests.py   | 23 +++
 2 files changed, 33 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c6c37b8a/python/pyspark/sql/session.py
--
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index 1e40b9c..9f4772e 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -161,8 +161,8 @@ class SparkSession(object):
 with self._lock:
 from pyspark.context import SparkContext
 from pyspark.conf import SparkConf
-session = SparkSession._instantiatedContext
-if session is None:
+session = SparkSession._instantiatedSession
+if session is None or session._sc._jsc is None:
 sparkConf = SparkConf()
 for key, value in self._options.items():
 sparkConf.set(key, value)
@@ -183,7 +183,7 @@ class SparkSession(object):
 
 builder = Builder()
 
-_instantiatedContext = None
+_instantiatedSession = None
 
 @ignore_unicode_prefix
 def __init__(self, sparkContext, jsparkSession=None):
@@ -214,8 +214,12 @@ class SparkSession(object):
 self._wrapped = SQLContext(self._sc, self, self._jwrapped)
 _monkey_patch_RDD(self)
 install_exception_handler()
-if SparkSession._instantiatedContext is None:
-SparkSession._instantiatedContext = self
+# If we had an instantiated SparkSession attached with a SparkContext
+# which is stopped now, we need to renew the instantiated SparkSession.
+# Otherwise, we will use invalid SparkSession when we call Builder.getOrCreate.
+if SparkSession._instantiatedSession is None \
+or SparkSession._instantiatedSession._sc._jsc is None:
+SparkSession._instantiatedSession = self
 
 @since(2.0)
 def newSession(self):
@@ -595,7 +599,7 @@ class SparkSession(object):
 """Stop the underlying :class:`SparkContext`.
 """
 self._sc.stop()
-SparkSession._instantiatedContext = None
+SparkSession._instantiatedSession = None
 
 @since(2.0)
 def __enter__(self):

http://git-wip-us.apache.org/repos/asf/spark/blob/c6c37b8a/python/pyspark/sql/tests.py
--
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 18fd68e..d178285 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -46,6 +46,7 @@ if sys.version_info[:2] <= (2, 6):
 else:
 import unittest
 
+from pyspark import SparkContext
 from pyspark.sql import SparkSession, HiveContext, Column, Row
 from pyspark.sql.types import *
 from pyspark.sql.types import UserDefinedType, _infer_type
@@ -1886,6 +1887,28 @@ class HiveSparkSubmitTests(SparkSubmitTests):
 self.assertTrue(os.path.exists(metastore_path))
 
 
+class SQLTests2(ReusedPySparkTestCase):
+
+@classmethod
+def setUpClass(cls):
+ReusedPySparkTestCase.setUpClass()
+cls.spark = SparkSession(cls.sc)
+
+

spark git commit: [SPARK-18969][SQL] Support grouping by nondeterministic expressions

2017-01-12 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 c94288b57 -> 3566e40a4


[SPARK-18969][SQL] Support grouping by nondeterministic expressions

## What changes were proposed in this pull request?

Currently nondeterministic expressions are allowed in `Aggregate` (see the 
[comment](https://github.com/apache/spark/blob/v2.0.2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala#L249-L251)),
but the `PullOutNondeterministic` analyzer rule failed to handle `Aggregate`; this PR 
fixes it.

close https://github.com/apache/spark/pull/16379

There is still one remaining issue: `SELECT a + rand() FROM t GROUP BY a + rand()` is 
not allowed, because the two `rand()` expressions are different (we generate a random 
seed as the default seed for `rand()`). 
https://issues.apache.org/jira/browse/SPARK-19035 is tracking this issue.
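
The rule now pulls the nondeterministic grouping expression into a `Project` beneath
the `Aggregate` and references it by attribute. A rough spark-shell sketch of a query
shape that exercises this (an illustration, not a test from this patch):

```
import org.apache.spark.sql.functions.{floor, rand}

val df = spark.range(0, 100).toDF("id")

// Grouping by a nondeterministic expression; the fixed seed only keeps the sketch stable.
df.groupBy(floor(rand(42) * 3).as("bucket"))
  .count()
  .show()
```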

## How was this patch tested?

a new test suite

Author: Wenchen Fan 

Closes #16404 from cloud-fan/groupby.

(cherry picked from commit 871d266649ddfed38c64dfda7158d8bb58d4b979)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3566e40a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3566e40a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3566e40a

Branch: refs/heads/branch-2.0
Commit: 3566e40a4ce319e095780062abf94154b4aba334
Parents: c94288b
Author: Wenchen Fan 
Authored: Thu Jan 12 20:21:04 2017 +0800
Committer: Wenchen Fan 
Committed: Thu Jan 12 20:25:44 2017 +0800

--
 .../spark/sql/catalyst/analysis/Analyzer.scala  | 37 -
 .../analysis/PullOutNondeterministicSuite.scala | 56 
 .../sql-tests/results/group-by-ordinal.sql.out  | 10 ++--
 3 files changed, 86 insertions(+), 17 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3566e40a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 32dc70a..9040ced 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1789,28 +1789,37 @@ class Analyzer(
   case p: Project => p
   case f: Filter => f
 
+  case a: Aggregate if a.groupingExpressions.exists(!_.deterministic) =>
+val nondeterToAttr = getNondeterToAttr(a.groupingExpressions)
+val newChild = Project(a.child.output ++ nondeterToAttr.values, 
a.child)
+a.transformExpressions { case e =>
+  nondeterToAttr.get(e).map(_.toAttribute).getOrElse(e)
+}.copy(child = newChild)
+
   // todo: It's hard to write a general rule to pull out nondeterministic 
expressions
   // from LogicalPlan, currently we only do it for UnaryNode which has 
same output
   // schema with its child.
   case p: UnaryNode if p.output == p.child.output && 
p.expressions.exists(!_.deterministic) =>
-val nondeterministicExprs = 
p.expressions.filterNot(_.deterministic).flatMap { expr =>
-  val leafNondeterministic = expr.collect {
-case n: Nondeterministic => n
-  }
-  leafNondeterministic.map { e =>
-val ne = e match {
-  case n: NamedExpression => n
-  case _ => Alias(e, "_nondeterministic")(isGenerated = true)
-}
-new TreeNodeRef(e) -> ne
-  }
-}.toMap
+val nondeterToAttr = getNondeterToAttr(p.expressions)
 val newPlan = p.transformExpressions { case e =>
-  nondeterministicExprs.get(new 
TreeNodeRef(e)).map(_.toAttribute).getOrElse(e)
+  nondeterToAttr.get(e).map(_.toAttribute).getOrElse(e)
 }
-val newChild = Project(p.child.output ++ nondeterministicExprs.values, 
p.child)
+val newChild = Project(p.child.output ++ nondeterToAttr.values, 
p.child)
 Project(p.output, newPlan.withNewChildren(newChild :: Nil))
 }
+
+private def getNondeterToAttr(exprs: Seq[Expression]): Map[Expression, 
NamedExpression] = {
+  exprs.filterNot(_.deterministic).flatMap { expr =>
+val leafNondeterministic = expr.collect { case n: Nondeterministic => 
n }
+leafNondeterministic.distinct.map { e =>
+  val ne = e match {
+case n: NamedExpression => n
+case _ => Alias(e, "_nondeterministic")(isGenerated = true)
+  }
+  e -> ne
+}
+  }.toMap
+}
   }
 
   /**


spark git commit: [SPARK-18969][SQL] Support grouping by nondeterministic expressions

2017-01-12 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 9b9867ef5 -> 616a78a56


[SPARK-18969][SQL] Support grouping by nondeterministic expressions

## What changes were proposed in this pull request?

Currently nondeterministic expressions are allowed in `Aggregate` (see the 
[comment](https://github.com/apache/spark/blob/v2.0.2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala#L249-L251)),
but the `PullOutNondeterministic` analyzer rule failed to handle `Aggregate`; this PR 
fixes it.

close https://github.com/apache/spark/pull/16379

There is still one remaining issue: `SELECT a + rand() FROM t GROUP BY a + rand()` is 
not allowed, because the two `rand()` expressions are different (we generate a random 
seed as the default seed for `rand()`). 
https://issues.apache.org/jira/browse/SPARK-19035 is tracking this issue.

## How was this patch tested?

a new test suite

Author: Wenchen Fan 

Closes #16404 from cloud-fan/groupby.

(cherry picked from commit 871d266649ddfed38c64dfda7158d8bb58d4b979)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/616a78a5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/616a78a5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/616a78a5

Branch: refs/heads/branch-2.1
Commit: 616a78a56cc911953e3133e60ab8c5a4fc287539
Parents: 9b9867e
Author: Wenchen Fan 
Authored: Thu Jan 12 20:21:04 2017 +0800
Committer: Wenchen Fan 
Committed: Thu Jan 12 20:24:23 2017 +0800

--
 .../spark/sql/catalyst/analysis/Analyzer.scala  | 37 -
 .../analysis/PullOutNondeterministicSuite.scala | 56 
 .../sql-tests/results/group-by-ordinal.sql.out  | 10 ++--
 3 files changed, 86 insertions(+), 17 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/616a78a5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index f17c372..ab9de02 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1859,28 +1859,37 @@ class Analyzer(
   case p: Project => p
   case f: Filter => f
 
+  case a: Aggregate if a.groupingExpressions.exists(!_.deterministic) =>
+val nondeterToAttr = getNondeterToAttr(a.groupingExpressions)
+val newChild = Project(a.child.output ++ nondeterToAttr.values, 
a.child)
+a.transformExpressions { case e =>
+  nondeterToAttr.get(e).map(_.toAttribute).getOrElse(e)
+}.copy(child = newChild)
+
   // todo: It's hard to write a general rule to pull out nondeterministic 
expressions
   // from LogicalPlan, currently we only do it for UnaryNode which has 
same output
   // schema with its child.
   case p: UnaryNode if p.output == p.child.output && 
p.expressions.exists(!_.deterministic) =>
-val nondeterministicExprs = 
p.expressions.filterNot(_.deterministic).flatMap { expr =>
-  val leafNondeterministic = expr.collect {
-case n: Nondeterministic => n
-  }
-  leafNondeterministic.map { e =>
-val ne = e match {
-  case n: NamedExpression => n
-  case _ => Alias(e, "_nondeterministic")(isGenerated = true)
-}
-new TreeNodeRef(e) -> ne
-  }
-}.toMap
+val nondeterToAttr = getNondeterToAttr(p.expressions)
 val newPlan = p.transformExpressions { case e =>
-  nondeterministicExprs.get(new 
TreeNodeRef(e)).map(_.toAttribute).getOrElse(e)
+  nondeterToAttr.get(e).map(_.toAttribute).getOrElse(e)
 }
-val newChild = Project(p.child.output ++ nondeterministicExprs.values, 
p.child)
+val newChild = Project(p.child.output ++ nondeterToAttr.values, 
p.child)
 Project(p.output, newPlan.withNewChildren(newChild :: Nil))
 }
+
+private def getNondeterToAttr(exprs: Seq[Expression]): Map[Expression, 
NamedExpression] = {
+  exprs.filterNot(_.deterministic).flatMap { expr =>
+val leafNondeterministic = expr.collect { case n: Nondeterministic => 
n }
+leafNondeterministic.distinct.map { e =>
+  val ne = e match {
+case n: NamedExpression => n
+case _ => Alias(e, "_nondeterministic")(isGenerated = true)
+  }
+  e -> ne
+}
+  }.toMap
+}
   }
 
   /**


spark git commit: [SPARK-18969][SQL] Support grouping by nondeterministic expressions

2017-01-12 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master c71b25481 -> 871d26664


[SPARK-18969][SQL] Support grouping by nondeterministic expressions

## What changes were proposed in this pull request?

Currently nondeterministic expressions are allowed in `Aggregate` (see the 
[comment](https://github.com/apache/spark/blob/v2.0.2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala#L249-L251)),
but the `PullOutNondeterministic` analyzer rule failed to handle `Aggregate`; this PR 
fixes it.

close https://github.com/apache/spark/pull/16379

There is still one remaining issue: `SELECT a + rand() FROM t GROUP BY a + rand()` is 
not allowed, because the two `rand()` expressions are different (we generate a random 
seed as the default seed for `rand()`). 
https://issues.apache.org/jira/browse/SPARK-19035 is tracking this issue.

## How was this patch tested?

a new test suite

Author: Wenchen Fan 

Closes #16404 from cloud-fan/groupby.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/871d2666
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/871d2666
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/871d2666

Branch: refs/heads/master
Commit: 871d266649ddfed38c64dfda7158d8bb58d4b979
Parents: c71b254
Author: Wenchen Fan 
Authored: Thu Jan 12 20:21:04 2017 +0800
Committer: Wenchen Fan 
Committed: Thu Jan 12 20:21:04 2017 +0800

--
 .../spark/sql/catalyst/analysis/Analyzer.scala  | 37 -
 .../analysis/PullOutNondeterministicSuite.scala | 56 
 .../sql-tests/results/group-by-ordinal.sql.out  | 10 ++--
 3 files changed, 86 insertions(+), 17 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/871d2666/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index d461531..3c58832 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -2008,28 +2008,37 @@ class Analyzer(
   case p: Project => p
   case f: Filter => f
 
+  case a: Aggregate if a.groupingExpressions.exists(!_.deterministic) =>
+val nondeterToAttr = getNondeterToAttr(a.groupingExpressions)
+val newChild = Project(a.child.output ++ nondeterToAttr.values, a.child)
+a.transformExpressions { case e =>
+  nondeterToAttr.get(e).map(_.toAttribute).getOrElse(e)
+}.copy(child = newChild)
+
   // todo: It's hard to write a general rule to pull out nondeterministic expressions
   // from LogicalPlan, currently we only do it for UnaryNode which has same output
   // schema with its child.
   case p: UnaryNode if p.output == p.child.output && p.expressions.exists(!_.deterministic) =>
-val nondeterministicExprs = p.expressions.filterNot(_.deterministic).flatMap { expr =>
-  val leafNondeterministic = expr.collect {
-case n: Nondeterministic => n
-  }
-  leafNondeterministic.map { e =>
-val ne = e match {
-  case n: NamedExpression => n
-  case _ => Alias(e, "_nondeterministic")(isGenerated = true)
-}
-new TreeNodeRef(e) -> ne
-  }
-}.toMap
+val nondeterToAttr = getNondeterToAttr(p.expressions)
 val newPlan = p.transformExpressions { case e =>
-  nondeterministicExprs.get(new TreeNodeRef(e)).map(_.toAttribute).getOrElse(e)
+  nondeterToAttr.get(e).map(_.toAttribute).getOrElse(e)
 }
-val newChild = Project(p.child.output ++ nondeterministicExprs.values, p.child)
+val newChild = Project(p.child.output ++ nondeterToAttr.values, p.child)
 Project(p.output, newPlan.withNewChildren(newChild :: Nil))
 }
+
+private def getNondeterToAttr(exprs: Seq[Expression]): Map[Expression, NamedExpression] = {
+  exprs.filterNot(_.deterministic).flatMap { expr =>
+val leafNondeterministic = expr.collect { case n: Nondeterministic => n }
+leafNondeterministic.distinct.map { e =>
+  val ne = e match {
+case n: NamedExpression => n
+case _ => Alias(e, "_nondeterministic")(isGenerated = true)
+  }
+  e -> ne
+}
+  }.toMap
+}
   }
 
   /**


spark git commit: [SPARK-18857][SQL] Don't use `Iterator.duplicate` for `incrementalCollect` in Thrift Server

2017-01-12 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 ec2fe925c -> c94288b57


[SPARK-18857][SQL] Don't use `Iterator.duplicate` for `incrementalCollect` in 
Thrift Server

## What changes were proposed in this pull request?

To support `FETCH_FIRST`, SPARK-16563 used Scala `Iterator.duplicate`. However,
Scala `Iterator.duplicate` uses a **queue to buffer all items between both iterators**,
which causes GC pressure and hangs for queries with a large number of rows. We should
not use it, especially for `spark.sql.thriftServer.incrementalCollect`.

https://github.com/scala/scala/blob/2.12.x/src/library/scala/collection/Iterator.scala#L1262-L1300
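
For context, here is a small standalone sketch (not from the patch) of why
`Iterator.duplicate` is problematic here: whenever one copy runs ahead, every element
in between is buffered so the other copy can replay it.

```scala
// Minimal sketch: duplicate a large iterator and drain one copy first.
val (fast, slow) = Iterator.range(0, 10000000).duplicate

// Draining `fast` forces all ten million elements into the shared buffer so
// that `slow` can still replay them -- the whole result set ends up held in
// memory, which is what this change avoids for FETCH_FIRST.
fast.foreach(_ => ())
println(slow.length)   // 10000000, replayed from the buffer
```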

## How was this patch tested?

Pass the existing tests.

Author: Dongjoon Hyun 

Closes #16440 from dongjoon-hyun/SPARK-18857.

(cherry picked from commit a2c6adcc5d2702d2f0e9b239517353335e5f911e)
Signed-off-by: Sean Owen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c94288b5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c94288b5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c94288b5

Branch: refs/heads/branch-2.0
Commit: c94288b57b5ce2232e58e35cada558d8d5b8ec6e
Parents: ec2fe92
Author: Dongjoon Hyun 
Authored: Tue Jan 10 13:27:55 2017 +
Committer: Sean Owen 
Committed: Thu Jan 12 10:45:26 2017 +

--
 .../org/apache/spark/sql/internal/SQLConf.scala |  7 +
 .../SparkExecuteStatementOperation.scala| 30 +---
 2 files changed, 26 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c94288b5/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 7598d47..edec6f0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -295,6 +295,13 @@ object SQLConf {
 .stringConf
 .createOptional
 
+  val THRIFTSERVER_INCREMENTAL_COLLECT =
+SQLConfigBuilder("spark.sql.thriftServer.incrementalCollect")
+  .internal()
+  .doc("When true, enable incremental collection for execution in Thrift Server.")
+  .booleanConf
+  .createWithDefault(false)
+
   val THRIFTSERVER_UI_STATEMENT_LIMIT =
 SQLConfigBuilder("spark.sql.thriftserver.ui.retainedStatements")
   .doc("The number of SQL statements kept in the JDBC/ODBC web UI history.")

http://git-wip-us.apache.org/repos/asf/spark/blob/c94288b5/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
--
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 8a78523..a95170e 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -50,8 +50,13 @@ private[hive] class SparkExecuteStatementOperation(
   with Logging {
 
   private var result: DataFrame = _
+
+  // We cache the returned rows to get iterators again in case the user wants to use FETCH_FIRST.
+  // This is only used when `spark.sql.thriftServer.incrementalCollect` is set to `false`.
+  // In case of `true`, this will be `None` and FETCH_FIRST will trigger re-execution.
+  private var resultList: Option[Array[SparkRow]] = _
+
   private var iter: Iterator[SparkRow] = _
-  private var iterHeader: Iterator[SparkRow] = _
   private var dataTypes: Array[DataType] = _
   private var statementId: String = _
 
@@ -111,9 +116,15 @@ private[hive] class SparkExecuteStatementOperation(
 
 // Reset iter to header when fetching start from first row
 if (order.equals(FetchOrientation.FETCH_FIRST)) {
-  val (ita, itb) = iterHeader.duplicate
-  iter = ita
-  iterHeader = itb
+  iter = if (sqlContext.getConf(SQLConf.THRIFTSERVER_INCREMENTAL_COLLECT.key).toBoolean) {
+resultList = None
+result.toLocalIterator.asScala
+  } else {
+if (resultList.isEmpty) {
+  resultList = Some(result.collect())
+}
+resultList.get.iterator
+  }
 }
 
 if (!iter.hasNext) {
@@ -226,17 +237,14 @@ private[hive] class SparkExecuteStatementOperation(
   }
   

spark git commit: [SPARK-18857][SQL] Don't use `Iterator.duplicate` for `incrementalCollect` in Thrift Server

2017-01-12 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 0b07634b5 -> 9b9867ef5


[SPARK-18857][SQL] Don't use `Iterator.duplicate` for `incrementalCollect` in 
Thrift Server

## What changes were proposed in this pull request?

To support `FETCH_FIRST`, SPARK-16563 used Scala `Iterator.duplicate`. However,
Scala `Iterator.duplicate` uses a **queue to buffer all items between both iterators**,
which causes GC pressure and hangs for queries with a large number of rows. We should
not use it, especially for `spark.sql.thriftServer.incrementalCollect`.

https://github.com/scala/scala/blob/2.12.x/src/library/scala/collection/Iterator.scala#L1262-L1300
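
As a hedged usage sketch (not part of the patch): the new flag defaults to `false`, and
the session-builder call below is only one assumed way to set it; for the Thrift server
it would more typically go into `spark-defaults.conf` or a `--conf` flag at launch.

```scala
// Minimal sketch, assuming a Spark 2.x build that includes this flag.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("incremental-collect-sketch")
  // With incremental collect enabled, FETCH_FIRST re-executes the query via
  // toLocalIterator instead of caching all rows on the driver.
  .config("spark.sql.thriftServer.incrementalCollect", "true")
  .getOrCreate()
```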

## How was this patch tested?

Pass the existing tests.

Author: Dongjoon Hyun 

Closes #16440 from dongjoon-hyun/SPARK-18857.

(cherry picked from commit a2c6adcc5d2702d2f0e9b239517353335e5f911e)
Signed-off-by: Sean Owen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9b9867ef
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9b9867ef
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9b9867ef

Branch: refs/heads/branch-2.1
Commit: 9b9867ef5b64b05f1e968de1fc0bfc1fcc64a707
Parents: 0b07634
Author: Dongjoon Hyun 
Authored: Tue Jan 10 13:27:55 2017 +
Committer: Sean Owen 
Committed: Thu Jan 12 10:45:10 2017 +

--
 .../org/apache/spark/sql/internal/SQLConf.scala |  7 +
 .../SparkExecuteStatementOperation.scala| 30 +---
 2 files changed, 26 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9b9867ef/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 8fbad60..8d493e0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -309,6 +309,13 @@ object SQLConf {
 .stringConf
 .createOptional
 
+  val THRIFTSERVER_INCREMENTAL_COLLECT =
+SQLConfigBuilder("spark.sql.thriftServer.incrementalCollect")
+  .internal()
+  .doc("When true, enable incremental collection for execution in Thrift Server.")
+  .booleanConf
+  .createWithDefault(false)
+
   val THRIFTSERVER_UI_STATEMENT_LIMIT =
 SQLConfigBuilder("spark.sql.thriftserver.ui.retainedStatements")
   .doc("The number of SQL statements kept in the JDBC/ODBC web UI history.")

http://git-wip-us.apache.org/repos/asf/spark/blob/9b9867ef/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
--
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index aeabd6a..517b01f 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -50,8 +50,13 @@ private[hive] class SparkExecuteStatementOperation(
   with Logging {
 
   private var result: DataFrame = _
+
+  // We cache the returned rows to get iterators again in case the user wants to use FETCH_FIRST.
+  // This is only used when `spark.sql.thriftServer.incrementalCollect` is set to `false`.
+  // In case of `true`, this will be `None` and FETCH_FIRST will trigger re-execution.
+  private var resultList: Option[Array[SparkRow]] = _
+
   private var iter: Iterator[SparkRow] = _
-  private var iterHeader: Iterator[SparkRow] = _
   private var dataTypes: Array[DataType] = _
   private var statementId: String = _
 
@@ -111,9 +116,15 @@ private[hive] class SparkExecuteStatementOperation(
 
 // Reset iter to header when fetching start from first row
 if (order.equals(FetchOrientation.FETCH_FIRST)) {
-  val (ita, itb) = iterHeader.duplicate
-  iter = ita
-  iterHeader = itb
+  iter = if (sqlContext.getConf(SQLConf.THRIFTSERVER_INCREMENTAL_COLLECT.key).toBoolean) {
+resultList = None
+result.toLocalIterator.asScala
+  } else {
+if (resultList.isEmpty) {
+  resultList = Some(result.collect())
+}
+resultList.get.iterator
+  }
 }
 
 if (!iter.hasNext) {
@@ -227,17 +238,14 @@ private[hive] class SparkExecuteStatementOperation(
   }
   

spark git commit: [SPARK-19183][SQL] Add deleteWithJob hook to internal commit protocol API

2017-01-12 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 5db35b312 -> c71b25481


[SPARK-19183][SQL] Add deleteWithJob hook to internal commit protocol API

## What changes were proposed in this pull request?

Currently in SQL we implement overwrites by calling fs.delete() directly on the
original data. This is not ideal since the original files end up deleted even if the
job aborts. We should extend the commit protocol to allow file overwrites to be managed
as well.
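
As a hedged sketch of how the new hook could be used (this subclass is hypothetical and
not part of the patch, and the `HadoopMapReduceCommitProtocol(jobId, path)` constructor
is assumed to match the 2.x signature): record the paths passed to `deleteWithJob` and
only delete them once the job actually commits.

```scala
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapreduce.JobContext

import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol

// Hypothetical committer: deletes are deferred until commitJob, so an aborted
// overwrite job leaves the original files untouched.
class DeferredDeleteCommitProtocol(jobId: String, path: String)
  extends HadoopMapReduceCommitProtocol(jobId, path) {

  // Only touched on the driver, where deleteWithJob and commitJob are called.
  @transient private var pendingDeletes: Seq[(FileSystem, Path, Boolean)] = Nil

  override def deleteWithJob(fs: FileSystem, path: Path, recursive: Boolean): Boolean = {
    pendingDeletes :+= ((fs, path, recursive))
    true
  }

  override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
    super.commitJob(jobContext, taskCommits)
    pendingDeletes.foreach { case (fs, p, recursive) => fs.delete(p, recursive) }
  }
}
```

Such a committer would then be wired in through the commit-protocol class setting that
the diff below reads (`fileCommitProtocolClass`).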

## How was this patch tested?

Existing tests. I also fixed a bunch of tests that were depending on the commit 
protocol implementation being set to the legacy mapreduce one.

cc rxin cloud-fan

Author: Eric Liang 
Author: Eric Liang 

Closes #16554 from ericl/add-delete-protocol.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c71b2548
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c71b2548
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c71b2548

Branch: refs/heads/master
Commit: c71b25481aa5f7bc27d5c979e66bed54cd46b97e
Parents: 5db35b3
Author: Eric Liang 
Authored: Thu Jan 12 17:45:55 2017 +0800
Committer: Wenchen Fan 
Committed: Thu Jan 12 17:45:55 2017 +0800

--
 .../spark/internal/io/FileCommitProtocol.scala  |   9 ++
 .../InsertIntoHadoopFsRelationCommand.scala |  25 ++--
 .../datasources/HadoopFsRelationSuite.scala |   2 +-
 .../datasources/parquet/ParquetIOSuite.scala| 122 ++-
 .../ParquetPartitionDiscoverySuite.scala|   9 +-
 .../datasources/parquet/ParquetQuerySuite.scala |   5 +
 .../sql/hive/orc/OrcHadoopFsRelationSuite.scala |   4 +-
 .../sql/sources/HadoopFsRelationTest.scala  |  77 ++--
 .../sources/ParquetHadoopFsRelationSuite.scala  |   6 +-
 9 files changed, 149 insertions(+), 110 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c71b2548/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala 
b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
index afd2250..2394cf3 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.internal.io
 
+import org.apache.hadoop.fs._
 import org.apache.hadoop.mapreduce._
 
 import org.apache.spark.util.Utils
@@ -112,6 +113,14 @@ abstract class FileCommitProtocol {
* just crashes (or killed) before it can call abort.
*/
   def abortTask(taskContext: TaskAttemptContext): Unit
+
+  /**
+   * Specifies that a file should be deleted with the commit of this job. The default
+   * implementation deletes the file immediately.
+   */
+  def deleteWithJob(fs: FileSystem, path: Path, recursive: Boolean): Boolean = {
+fs.delete(path, recursive)
+  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c71b2548/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 423009e..652bcc8 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -88,11 +88,20 @@ case class InsertIntoHadoopFsRelationCommand(
 }
 
 val pathExists = fs.exists(qualifiedOutputPath)
+// If we are appending data to an existing dir.
+val isAppend = pathExists && (mode == SaveMode.Append)
+
+val committer = FileCommitProtocol.instantiate(
+  sparkSession.sessionState.conf.fileCommitProtocolClass,
+  jobId = java.util.UUID.randomUUID().toString,
+  outputPath = outputPath.toString,
+  isAppend = isAppend)
+
 val doInsertion = (mode, pathExists) match {
   case (SaveMode.ErrorIfExists, true) =>
 throw new AnalysisException(s"path $qualifiedOutputPath already exists.")
   case (SaveMode.Overwrite, true) =>
-deleteMatchingPartitions(fs, qualifiedOutputPath, customPartitionLocations)
+deleteMatchingPartitions(fs, qualifiedOutputPath, customPartitionLocations, committer)
 true
   case (SaveMode.Append, _) | (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) =>
 

spark git commit: [SPARK-19164][PYTHON][SQL] Remove unused UserDefinedFunction._broadcast

2017-01-12 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 2c586f506 -> 5db35b312


[SPARK-19164][PYTHON][SQL] Remove unused UserDefinedFunction._broadcast

## What changes were proposed in this pull request?

Removes the unused `UserDefinedFunction._broadcast` attribute and the
`UserDefinedFunction.__del__` method.

## How was this patch tested?

Existing unit tests.

Author: zero323 

Closes #16538 from zero323/SPARK-19164.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5db35b31
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5db35b31
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5db35b31

Branch: refs/heads/master
Commit: 5db35b312e96dea07f03100c64b58723c2430cd7
Parents: 2c586f5
Author: zero323 
Authored: Thu Jan 12 01:05:02 2017 -0800
Committer: Reynold Xin 
Committed: Thu Jan 12 01:05:02 2017 -0800

--
 python/pyspark/sql/functions.py | 6 --
 1 file changed, 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5db35b31/python/pyspark/sql/functions.py
--
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 7fe901a..66d993a 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -1826,7 +1826,6 @@ class UserDefinedFunction(object):
 def __init__(self, func, returnType, name=None):
 self.func = func
 self.returnType = returnType
-self._broadcast = None
 self._judf = self._create_judf(name)
 
 def _create_judf(self, name):
@@ -1842,11 +1841,6 @@ class UserDefinedFunction(object):
 name, wrapped_func, jdt)
 return judf
 
-def __del__(self):
-if self._broadcast is not None:
-self._broadcast.unpersist()
-self._broadcast = None
-
 def __call__(self, *cols):
 sc = SparkContext._active_spark_context
 jc = self._judf.apply(_to_seq(sc, cols, _to_java_column))





spark git commit: [SPARK-19158][SPARKR][EXAMPLES] Fix ml.R example fails due to lack of e1071 package.

2017-01-12 Thread yliang
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 82fcc1330 -> 0b07634b5


[SPARK-19158][SPARKR][EXAMPLES] Fix ml.R example fails due to lack of e1071 
package.

## What changes were proposed in this pull request?
The ```ml.R``` example depends on the ```e1071``` package; if it is not available in the
user's environment, the example fails. I think the example should not depend on
third-party packages, so I updated it to remove the dependency.

## How was this patch tested?
Manual test.

Author: Yanbo Liang 

Closes #16548 from yanboliang/spark-19158.

(cherry picked from commit 2c586f506de9e2ba592afae1f0c73b6ae631bb96)
Signed-off-by: Yanbo Liang 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0b07634b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0b07634b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0b07634b

Branch: refs/heads/branch-2.1
Commit: 0b07634b5e06cc9030f20e277ec5956efff6c3fa
Parents: 82fcc13
Author: Yanbo Liang 
Authored: Thu Jan 12 00:58:30 2017 -0800
Committer: Yanbo Liang 
Committed: Thu Jan 12 00:58:49 2017 -0800

--
 examples/src/main/r/ml/ml.R | 15 +++
 1 file changed, 7 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0b07634b/examples/src/main/r/ml/ml.R
--
diff --git a/examples/src/main/r/ml/ml.R b/examples/src/main/r/ml/ml.R
index d601590..05f5199 100644
--- a/examples/src/main/r/ml/ml.R
+++ b/examples/src/main/r/ml/ml.R
@@ -49,17 +49,16 @@ unlink(modelPath)
 
  fit models with spark.lapply 
#
 # Perform distributed training of multiple models with spark.lapply
-costs <- exp(seq(from = log(1), to = log(1000), length.out = 5))
-train <- function(cost) {
-  stopifnot(requireNamespace("e1071", quietly = TRUE))
-  model <- e1071::svm(Species ~ ., data = iris, cost = cost)
-  summary(model)
+algorithms <- c("Hartigan-Wong", "Lloyd", "MacQueen")
+train <- function(algorithm) {
+  model <- kmeans(x = iris[1:4], centers = 3, algorithm = algorithm)
+  model$withinss
 }
 
-model.summaries <- spark.lapply(costs, train)
+model.withinss <- spark.lapply(algorithms, train)
 
-# Print the summary of each model
-print(model.summaries)
+# Print the within-cluster sum of squares for each model
+print(model.withinss)
 
 # Stop the SparkSession now
 sparkR.session.stop()





spark git commit: [SPARK-19158][SPARKR][EXAMPLES] Fix ml.R example fails due to lack of e1071 package.

2017-01-12 Thread yliang
Repository: spark
Updated Branches:
  refs/heads/master 24100f162 -> 2c586f506


[SPARK-19158][SPARKR][EXAMPLES] Fix ml.R example fails due to lack of e1071 
package.

## What changes were proposed in this pull request?
The ```ml.R``` example depends on the ```e1071``` package; if it is not available in the
user's environment, the example fails. I think the example should not depend on
third-party packages, so I updated it to remove the dependency.

## How was this patch tested?
Manual test.

Author: Yanbo Liang 

Closes #16548 from yanboliang/spark-19158.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2c586f50
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2c586f50
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2c586f50

Branch: refs/heads/master
Commit: 2c586f506de9e2ba592afae1f0c73b6ae631bb96
Parents: 24100f1
Author: Yanbo Liang 
Authored: Thu Jan 12 00:58:30 2017 -0800
Committer: Yanbo Liang 
Committed: Thu Jan 12 00:58:30 2017 -0800

--
 examples/src/main/r/ml/ml.R | 15 +++
 1 file changed, 7 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2c586f50/examples/src/main/r/ml/ml.R
--
diff --git a/examples/src/main/r/ml/ml.R b/examples/src/main/r/ml/ml.R
index d601590..05f5199 100644
--- a/examples/src/main/r/ml/ml.R
+++ b/examples/src/main/r/ml/ml.R
@@ -49,17 +49,16 @@ unlink(modelPath)
 
  fit models with spark.lapply 
#
 # Perform distributed training of multiple models with spark.lapply
-costs <- exp(seq(from = log(1), to = log(1000), length.out = 5))
-train <- function(cost) {
-  stopifnot(requireNamespace("e1071", quietly = TRUE))
-  model <- e1071::svm(Species ~ ., data = iris, cost = cost)
-  summary(model)
+algorithms <- c("Hartigan-Wong", "Lloyd", "MacQueen")
+train <- function(algorithm) {
+  model <- kmeans(x = iris[1:4], centers = 3, algorithm = algorithm)
+  model$withinss
 }
 
-model.summaries <- spark.lapply(costs, train)
+model.withinss <- spark.lapply(algorithms, train)
 
-# Print the summary of each model
-print(model.summaries)
+# Print the within-cluster sum of squares for each model
+print(model.withinss)
 
 # Stop the SparkSession now
 sparkR.session.stop()

