spark git commit: Fix missing close-parens for In filter's toString
Repository: spark Updated Branches: refs/heads/master 6b34e745b -> b040cef2e Fix missing close-parens for In filter's toString Otherwise the open parenthesis isn't closed in query plan descriptions of batch scans. PushedFilters: [In(COL_A, [1,2,4,6,10,16,219,815], IsNotNull(COL_B), ... Author: Andrew Ash Closes #16558 from ash211/patch-9. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b040cef2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b040cef2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b040cef2 Branch: refs/heads/master Commit: b040cef2ed0ed46c3dfb483a117200c9dac074ca Parents: 6b34e74 Author: Andrew Ash Authored: Thu Jan 12 23:14:07 2017 -0800 Committer: Reynold Xin Committed: Thu Jan 12 23:14:07 2017 -0800 -- sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b040cef2/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala index e0494df..2499e9b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala @@ -130,7 +130,7 @@ case class In(attribute: String, values: Array[Any]) extends Filter { case _ => false } override def toString: String = { -s"In($attribute, [${values.mkString(",")}]" +s"In($attribute, [${values.mkString(",")}])" } override def references: Array[String] = Array(attribute) ++ values.flatMap(findReferences)
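A quick illustration of the fixed output, as a minimal Scala sketch (the column names and values mirror the PushedFilters example in the commit message and are purely illustrative):

```scala
import org.apache.spark.sql.sources.{Filter, In, IsNotNull}

// Hypothetical pushed filters, mirroring the example in the commit message.
val filters: Array[Filter] =
  Array(In("COL_A", Array[Any](1, 2, 4, 6, 10, 16, 219, 815)), IsNotNull("COL_B"))

// With the fix, each In(...) now closes its parenthesis:
// [In(COL_A, [1,2,4,6,10,16,219,815]), IsNotNull(COL_B)]
println(filters.mkString("[", ", ", "]"))
```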
spark git commit: Fix missing close-parens for In filter's toString
Repository: spark Updated Branches: refs/heads/branch-2.0 55d2a1178 -> be527ddc0 Fix missing close-parens for In filter's toString Otherwise the open parenthesis isn't closed in query plan descriptions of batch scans. PushedFilters: [In(COL_A, [1,2,4,6,10,16,219,815], IsNotNull(COL_B), ... Author: Andrew Ash Closes #16558 from ash211/patch-9. (cherry picked from commit b040cef2ed0ed46c3dfb483a117200c9dac074ca) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/be527ddc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/be527ddc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/be527ddc Branch: refs/heads/branch-2.0 Commit: be527ddc07be009ad6aa4f8561c7f8406468a8dc Parents: 55d2a11 Author: Andrew Ash Authored: Thu Jan 12 23:14:07 2017 -0800 Committer: Reynold Xin Committed: Thu Jan 12 23:14:25 2017 -0800 -- sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/be527ddc/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala index 9130e77..f7e4f93 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala @@ -97,7 +97,7 @@ case class In(attribute: String, values: Array[Any]) extends Filter { case _ => false } override def toString: String = { -s"In($attribute, [${values.mkString(",")}]" +s"In($attribute, [${values.mkString(",")}])" } }
spark git commit: Fix missing close-parens for In filter's toString
Repository: spark Updated Branches: refs/heads/branch-2.1 23944d0d6 -> 0668e061b Fix missing close-parens for In filter's toString Otherwise the open parenthesis isn't closed in query plan descriptions of batch scans. PushedFilters: [In(COL_A, [1,2,4,6,10,16,219,815], IsNotNull(COL_B), ... Author: Andrew Ash Closes #16558 from ash211/patch-9. (cherry picked from commit b040cef2ed0ed46c3dfb483a117200c9dac074ca) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0668e061 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0668e061 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0668e061 Branch: refs/heads/branch-2.1 Commit: 0668e061beba683d026a2d48011ff74faf8a38ab Parents: 23944d0 Author: Andrew Ash Authored: Thu Jan 12 23:14:07 2017 -0800 Committer: Reynold Xin Committed: Thu Jan 12 23:14:15 2017 -0800 -- sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0668e061/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala index e0494df..2499e9b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala @@ -130,7 +130,7 @@ case class In(attribute: String, values: Array[Any]) extends Filter { case _ => false } override def toString: String = { -s"In($attribute, [${values.mkString(",")}]" +s"In($attribute, [${values.mkString(",")}])" } override def references: Array[String] = Array(attribute) ++ values.flatMap(findReferences)
spark git commit: [SPARK-19178][SQL] convert string of large numbers to int should return null
Repository: spark Updated Branches: refs/heads/master 7f24a0b6c -> 6b34e745b [SPARK-19178][SQL] convert string of large numbers to int should return null ## What changes were proposed in this pull request? When we convert a string to integral, we will convert that string to `decimal(20, 0)` first, so that we can turn a string with decimal format to truncated integral, e.g. `CAST('1.2' AS int)` will return `1`. However, this brings problems when we convert a string with large numbers to integral, e.g. `CAST('1234567890123' AS int)` will return `1912276171`, while Hive returns null as we expected. This is a long standing bug(seems it was there the first day Spark SQL was created), this PR fixes this bug by adding the native support to convert `UTF8String` to integral. ## How was this patch tested? new regression tests Author: Wenchen FanCloses #16550 from cloud-fan/string-to-int. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6b34e745 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6b34e745 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6b34e745 Branch: refs/heads/master Commit: 6b34e745bb8bdcf5a8bb78359fa39bbe8c6563cc Parents: 7f24a0b Author: Wenchen Fan Authored: Thu Jan 12 22:52:34 2017 -0800 Committer: gatorsmile Committed: Thu Jan 12 22:52:34 2017 -0800 -- .../apache/spark/unsafe/types/UTF8String.java | 184 +++ .../sql/catalyst/analysis/TypeCoercion.scala| 16 -- .../spark/sql/catalyst/expressions/Cast.scala | 18 +- .../test/resources/sql-tests/inputs/cast.sql| 43 + .../resources/sql-tests/results/cast.sql.out| 178 ++ 5 files changed, 414 insertions(+), 25 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6b34e745/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java -- diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java index 0255f53..3800d53 100644 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java @@ -835,6 +835,190 @@ public final class UTF8String implements Comparable, Externalizable, return fromString(sb.toString()); } + private int getDigit(byte b) { +if (b >= '0' && b <= '9') { + return b - '0'; +} +throw new NumberFormatException(toString()); + } + + /** + * Parses this UTF8String to long. + * + * Note that, in this method we accumulate the result in negative format, and convert it to + * positive format at the end, if this string is not started with '-'. This is because min value + * is bigger than max value in digits, e.g. Integer.MAX_VALUE is '2147483647' and + * Integer.MIN_VALUE is '-2147483648'. + * + * This code is mostly copied from LazyLong.parseLong in Hive. + */ + public long toLong() { +if (numBytes == 0) { + throw new NumberFormatException("Empty string"); +} + +byte b = getByte(0); +final boolean negative = b == '-'; +int offset = 0; +if (negative || b == '+') { + offset++; + if (numBytes == 1) { +throw new NumberFormatException(toString()); + } +} + +final byte separator = '.'; +final int radix = 10; +final long stopValue = Long.MIN_VALUE / radix; +long result = 0; + +while (offset < numBytes) { + b = getByte(offset); + offset++; + if (b == separator) { +// We allow decimals and will return a truncated integral in that case. +// Therefore we won't throw an exception here (checking the fractional +// part happens below.) 
+break; + } + + int digit = getDigit(b); + // We are going to process the new digit and accumulate the result. However, before doing + // this, if the result is already smaller than the stopValue(Long.MIN_VALUE / radix), then + // result * 10 will definitely be smaller than minValue, and we can stop and throw exception. + if (result < stopValue) { +throw new NumberFormatException(toString()); + } + + result = result * radix - digit; + // Since the previous result is less than or equal to stopValue(Long.MIN_VALUE / radix), we + // can just use `result > 0` to check overflow. If result overflows, we should stop and throw + // exception. + if (result > 0) { +throw new NumberFormatException(toString()); + } +} + +// This is the case when we've encountered a decimal separator. The fractional +// part
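The casts described above can be checked directly from a SQL shell; a minimal sketch assuming a SparkSession named `spark` (the example values are taken from the commit message):

```scala
// CAST of a decimal-format string still truncates to an integral value.
spark.sql("SELECT CAST('1.2' AS int)").show()               // 1

// CAST of a string that overflows int now returns null instead of a wrapped value.
spark.sql("SELECT CAST('1234567890123' AS int)").show()     // null (previously 1912276171)

// The same string still fits in a bigint, so that cast succeeds.
spark.sql("SELECT CAST('1234567890123' AS bigint)").show()  // 1234567890123
```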
spark git commit: [SPARK-19142][SPARKR] spark.kmeans should take seed, initSteps, and tol as parameters
Repository: spark Updated Branches: refs/heads/master 3356b8b6a -> 7f24a0b6c [SPARK-19142][SPARKR] spark.kmeans should take seed, initSteps, and tol as parameters ## What changes were proposed in this pull request? spark.kmeans doesn't have interface to set initSteps, seed and tol. As Spark Kmeans algorithm doesn't take the same set of parameters as R kmeans, we should maintain a different interface in spark.kmeans. Add missing parameters and corresponding document. Modified existing unit tests to take additional parameters. Author: wm...@hotmail.comCloses #16523 from wangmiao1981/kmeans. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7f24a0b6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7f24a0b6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7f24a0b6 Branch: refs/heads/master Commit: 7f24a0b6c32c56a38cf879d953bbd523922ab9c9 Parents: 3356b8b Author: wm...@hotmail.com Authored: Thu Jan 12 22:27:57 2017 -0800 Committer: Yanbo Liang Committed: Thu Jan 12 22:27:57 2017 -0800 -- R/pkg/R/mllib_clustering.R | 13 +++-- .../inst/tests/testthat/test_mllib_clustering.R | 20 .../org/apache/spark/ml/r/KMeansWrapper.scala | 9 - 3 files changed, 39 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7f24a0b6/R/pkg/R/mllib_clustering.R -- diff --git a/R/pkg/R/mllib_clustering.R b/R/pkg/R/mllib_clustering.R index c443588..ca5182d 100644 --- a/R/pkg/R/mllib_clustering.R +++ b/R/pkg/R/mllib_clustering.R @@ -175,6 +175,10 @@ setMethod("write.ml", signature(object = "GaussianMixtureModel", path = "charact #' @param k number of centers. #' @param maxIter maximum iteration number. #' @param initMode the initialization algorithm choosen to fit the model. +#' @param seed the random seed for cluster initialization. +#' @param initSteps the number of steps for the k-means|| initialization mode. +#' This is an advanced setting, the default of 2 is almost always enough. Must be > 0. +#' @param tol convergence tolerance of iterations. #' @param ... additional argument(s) passed to the method. #' @return \code{spark.kmeans} returns a fitted k-means model. 
#' @rdname spark.kmeans @@ -204,11 +208,16 @@ setMethod("write.ml", signature(object = "GaussianMixtureModel", path = "charact #' @note spark.kmeans since 2.0.0 #' @seealso \link{predict}, \link{read.ml}, \link{write.ml} setMethod("spark.kmeans", signature(data = "SparkDataFrame", formula = "formula"), - function(data, formula, k = 2, maxIter = 20, initMode = c("k-means||", "random")) { + function(data, formula, k = 2, maxIter = 20, initMode = c("k-means||", "random"), + seed = NULL, initSteps = 2, tol = 1E-4) { formula <- paste(deparse(formula), collapse = "") initMode <- match.arg(initMode) +if (!is.null(seed)) { + seed <- as.character(as.integer(seed)) +} jobj <- callJStatic("org.apache.spark.ml.r.KMeansWrapper", "fit", data@sdf, formula, -as.integer(k), as.integer(maxIter), initMode) +as.integer(k), as.integer(maxIter), initMode, seed, +as.integer(initSteps), as.numeric(tol)) new("KMeansModel", jobj = jobj) }) http://git-wip-us.apache.org/repos/asf/spark/blob/7f24a0b6/R/pkg/inst/tests/testthat/test_mllib_clustering.R -- diff --git a/R/pkg/inst/tests/testthat/test_mllib_clustering.R b/R/pkg/inst/tests/testthat/test_mllib_clustering.R index 1980fff..f013991 100644 --- a/R/pkg/inst/tests/testthat/test_mllib_clustering.R +++ b/R/pkg/inst/tests/testthat/test_mllib_clustering.R @@ -132,6 +132,26 @@ test_that("spark.kmeans", { expect_true(summary2$is.loaded) unlink(modelPath) + + # Test Kmeans on dataset that is sensitive to seed value + col1 <- c(1, 2, 3, 4, 0, 1, 2, 3, 4, 0) + col2 <- c(1, 2, 3, 4, 0, 1, 2, 3, 4, 0) + col3 <- c(1, 2, 3, 4, 0, 1, 2, 3, 4, 0) + cols <- as.data.frame(cbind(col1, col2, col3)) + df <- createDataFrame(cols) + + model1 <- spark.kmeans(data = df, ~ ., k = 5, maxIter = 10, + initMode = "random", seed = 1, tol = 1E-5) + model2 <- spark.kmeans(data = df, ~ ., k = 5, maxIter = 10, + initMode = "random", seed = 2, tol = 1E-5) + + fitted.model1 <- fitted(model1) + fitted.model2 <- fitted(model2) + # The predicted clusters are different + expect_equal(sort(collect(distinct(select(fitted.model1,
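The new R arguments map onto parameters that already exist on the Scala-side estimator wrapped by `KMeansWrapper`; a minimal Scala sketch of the equivalent call, not part of the patch, assuming a DataFrame `training` with a features column:

```scala
import org.apache.spark.ml.clustering.KMeans

// Scala-side equivalent of spark.kmeans(data, ~ ., k = 5, maxIter = 10,
// initMode = "random", seed = 1, initSteps = 2, tol = 1E-4).
val kmeans = new KMeans()
  .setK(5)
  .setMaxIter(10)
  .setInitMode("random")
  .setSeed(1L)
  .setInitSteps(2)
  .setTol(1e-4)

val model = kmeans.fit(training)
```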
spark git commit: [SPARK-19092][SQL] Save() API of DataFrameWriter should not scan all the saved files
Repository: spark Updated Branches: refs/heads/master c983267b0 -> 3356b8b6a [SPARK-19092][SQL] Save() API of DataFrameWriter should not scan all the saved files ### What changes were proposed in this pull request? `DataFrameWriter`'s [save() API](https://github.com/gatorsmile/spark/blob/5d38f09f47a767a342a0a8219c63efa2943b5d1f/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala#L207) is performing a unnecessary full filesystem scan for the saved files. The save() API is the most basic/core API in `DataFrameWriter`. We should avoid it. The related PR: https://github.com/apache/spark/pull/16090 ### How was this patch tested? Updated the existing test cases. Author: gatorsmileCloses #16481 from gatorsmile/saveFileScan. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3356b8b6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3356b8b6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3356b8b6 Branch: refs/heads/master Commit: 3356b8b6a9184fcab8d0fe993f3545c3beaa4d99 Parents: c983267 Author: gatorsmile Authored: Fri Jan 13 13:05:53 2017 +0800 Committer: Wenchen Fan Committed: Fri Jan 13 13:05:53 2017 +0800 -- .../command/createDataSourceTables.scala| 2 +- .../sql/execution/datasources/DataSource.scala | 172 +++ .../hive/PartitionedTablePerfStatsSuite.scala | 29 +--- 3 files changed, 106 insertions(+), 97 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3356b8b6/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index 73b2153..90aeebd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -199,7 +199,7 @@ case class CreateDataSourceTableAsSelectCommand( catalogTable = if (tableExists) Some(table) else None) try { - dataSource.write(mode, Dataset.ofRows(session, query)) + dataSource.writeAndRead(mode, Dataset.ofRows(session, query)) } catch { case ex: AnalysisException => logError(s"Failed to write to table ${table.identifier.unquotedString}", ex) http://git-wip-us.apache.org/repos/asf/spark/blob/3356b8b6/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index b7f3559..29afe57 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -413,10 +413,85 @@ case class DataSource( relation } - /** Writes the given [[DataFrame]] out to this [[DataSource]]. */ - def write( - mode: SaveMode, - data: DataFrame): BaseRelation = { + /** + * Writes the given [[DataFrame]] out in this [[FileFormat]]. + */ + private def writeInFileFormat(format: FileFormat, mode: SaveMode, data: DataFrame): Unit = { +// Don't glob path for the write path. The contracts here are: +// 1. Only one output path can be specified on the write path; +// 2. Output path must be a legal HDFS style file system path; +// 3. 
It's OK that the output path doesn't exist yet; +val allPaths = paths ++ caseInsensitiveOptions.get("path") +val outputPath = if (allPaths.length == 1) { + val path = new Path(allPaths.head) + val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf()) + path.makeQualified(fs.getUri, fs.getWorkingDirectory) +} else { + throw new IllegalArgumentException("Expected exactly one path to be specified, but " + +s"got: ${allPaths.mkString(", ")}") +} + +val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis +PartitioningUtils.validatePartitionColumn(data.schema, partitionColumns, caseSensitive) + +// If we are appending to a table that already exists, make sure the partitioning matches +// up. If we fail to load the table for whatever reason, ignore the check. +if (mode == SaveMode.Append) { + val existingPartitionColumns = Try { +getOrInferFileFormatSchema(format, justPartitioning =
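The write path this patch optimizes is the plain `save()` call; a minimal sketch of that API (hypothetical DataFrame `df` and output directory), where, per the commit message, `save()` no longer performs a full filesystem scan of the saved files:

```scala
// Basic DataFrameWriter.save() path affected by this change.
df.write
  .format("parquet")
  .mode("append")
  .save("/tmp/spark-19092-output")   // hypothetical output directory
```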
spark git commit: [SPARK-19110][MLLIB][FOLLOWUP] Add a unit test for testing logPrior and logLikelihood of DistributedLDAModel in MLLIB
Repository: spark Updated Branches: refs/heads/master 5585ed93b -> c983267b0 [SPARK-19110][MLLIB][FOLLOWUP] Add a unit test for testing logPrior and logLikelihood of DistributedLDAModel in MLLIB ## What changes were proposed in this pull request? #16491 added the fix to mllib and a unit test to ml. This followup PR, add unit tests to mllib suite. ## How was this patch tested? Unit tests. Author: wm...@hotmail.comCloses #16524 from wangmiao1981/ldabug. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c983267b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c983267b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c983267b Branch: refs/heads/master Commit: c983267b0853f908d1c671cedd18b159e6993df1 Parents: 5585ed9 Author: wm...@hotmail.com Authored: Thu Jan 12 18:31:57 2017 -0800 Committer: Joseph K. Bradley Committed: Thu Jan 12 18:31:57 2017 -0800 -- .../test/scala/org/apache/spark/mllib/clustering/LDASuite.scala| 2 ++ 1 file changed, 2 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c983267b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala index 211e2bc..086bb21 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala @@ -505,6 +505,8 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext { assert(distributedModel.topicConcentration === sameDistributedModel.topicConcentration) assert(distributedModel.gammaShape === sameDistributedModel.gammaShape) assert(distributedModel.globalTopicTotals === sameDistributedModel.globalTopicTotals) + assert(distributedModel.logLikelihood ~== sameDistributedModel.logLikelihood absTol 1e-6) + assert(distributedModel.logPrior ~== sameDistributedModel.logPrior absTol 1e-6) val graph = distributedModel.graph val sameGraph = sameDistributedModel.graph - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
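The new assertions cover the save/load round trip of a DistributedLDAModel; a rough Scala sketch of that scenario in MLlib terms (hypothetical `corpus: RDD[(Long, Vector)]` and path; this is not the test code itself):

```scala
import org.apache.spark.mllib.clustering.{DistributedLDAModel, LDA}

// Train an EM-based LDA model, which yields a DistributedLDAModel.
val distributedModel = new LDA().setK(3).setOptimizer("em").run(corpus)
  .asInstanceOf[DistributedLDAModel]

// Save and reload; logLikelihood and logPrior should survive the round trip
// (the new test compares them with absTol 1e-6).
distributedModel.save(sc, "/tmp/lda-model")
val sameDistributedModel = DistributedLDAModel.load(sc, "/tmp/lda-model")
assert(math.abs(distributedModel.logLikelihood - sameDistributedModel.logLikelihood) < 1e-6)
assert(math.abs(distributedModel.logPrior - sameDistributedModel.logPrior) < 1e-6)
```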
spark git commit: [SPARK-17237][SQL] Remove backticks in a pivot result schema
Repository: spark Updated Branches: refs/heads/master 2bc4d4e28 -> 5585ed93b [SPARK-17237][SQL] Remove backticks in a pivot result schema ## What changes were proposed in this pull request? Pivoting adds backticks (e.g. 3_count(\`c\`)) in column names and, in some cases, thes causes analysis exceptions like; ``` scala> val df = Seq((2, 3, 4), (3, 4, 5)).toDF("a", "x", "y") scala> df.groupBy("a").pivot("x").agg(count("y"), avg("y")).na.fill(0) org.apache.spark.sql.AnalysisException: syntax error in attribute name: `3_count(`y`)`; at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute$.e$1(unresolved.scala:134) at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute$.parseAttributeName(unresolved.scala:144) ... ``` So, this pr proposes to remove these backticks from column names. ## How was this patch tested? Added a test in `DataFrameAggregateSuite`. Author: Takeshi YAMAMUROCloses #14812 from maropu/SPARK-17237. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5585ed93 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5585ed93 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5585ed93 Branch: refs/heads/master Commit: 5585ed93b09bc05cdd7a731650eca50d43d7159b Parents: 2bc4d4e Author: Takeshi YAMAMURO Authored: Thu Jan 12 09:46:53 2017 -0800 Committer: gatorsmile Committed: Thu Jan 12 09:46:53 2017 -0800 -- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala| 2 +- .../scala/org/apache/spark/sql/DataFrameAggregateSuite.scala | 8 .../scala/org/apache/spark/sql/DataFramePivotSuite.scala | 2 +- 3 files changed, 10 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5585ed93/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 3c58832..1957df8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -477,7 +477,7 @@ class Analyzer( } else { val suffix = aggregate match { case n: NamedExpression => n.name - case _ => aggregate.sql + case _ => toPrettySQL(aggregate) } value + "_" + suffix } http://git-wip-us.apache.org/repos/asf/spark/blob/5585ed93/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala index 7853b22fe..e707912 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala @@ -530,4 +530,12 @@ class DataFrameAggregateSuite extends QueryTest with SharedSQLContext { limit2Df.groupBy("id").count().select($"id"), limit2Df.select($"id")) } + + test("SPARK-17237 remove backticks in a pivot result schema") { +val df = Seq((2, 3, 4), (3, 4, 5)).toDF("a", "x", "y") +checkAnswer( + df.groupBy("a").pivot("x").agg(count("y"), avg("y")).na.fill(0), + Seq(Row(3, 0, 0.0, 1, 5.0), Row(2, 1, 4.0, 0, 0.0)) +) + } } http://git-wip-us.apache.org/repos/asf/spark/blob/5585ed93/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala index a8d854c..51ffe34 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala @@ -200,7 +200,7 @@ class DataFramePivotSuite extends QueryTest with SharedSQLContext{ test("pivot preserves aliases if given") { assertResult( - Array("year", "dotNET_foo", "dotNET_avg(`earnings`)", "Java_foo", "Java_avg(`earnings`)") + Array("year", "dotNET_foo", "dotNET_avg(earnings)", "Java_foo", "Java_avg(earnings)") )( courseSales.groupBy($"year") .pivot("course", Seq("dotNET", "Java")) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional
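After the fix, the pivoted column names carry no backticks, so downstream calls such as `na.fill` resolve them; a small sketch of the resulting schema, assuming a SparkSession named `spark` with its implicits imported:

```scala
import org.apache.spark.sql.functions.{avg, count}

val df = Seq((2, 3, 4), (3, 4, 5)).toDF("a", "x", "y")
val pivoted = df.groupBy("a").pivot("x").agg(count("y"), avg("y")).na.fill(0)

// Expected column names with this change: a, 3_count(y), 3_avg(y), 4_count(y), 4_avg(y)
pivoted.columns.foreach(println)
```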
spark git commit: [SPARK-17237][SQL] Remove backticks in a pivot result schema
Repository: spark Updated Branches: refs/heads/branch-2.1 042e32d18 -> 23944d0d6 [SPARK-17237][SQL] Remove backticks in a pivot result schema ## What changes were proposed in this pull request? Pivoting adds backticks (e.g. 3_count(\`c\`)) in column names and, in some cases, thes causes analysis exceptions like; ``` scala> val df = Seq((2, 3, 4), (3, 4, 5)).toDF("a", "x", "y") scala> df.groupBy("a").pivot("x").agg(count("y"), avg("y")).na.fill(0) org.apache.spark.sql.AnalysisException: syntax error in attribute name: `3_count(`y`)`; at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute$.e$1(unresolved.scala:134) at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute$.parseAttributeName(unresolved.scala:144) ... ``` So, this pr proposes to remove these backticks from column names. ## How was this patch tested? Added a test in `DataFrameAggregateSuite`. Author: Takeshi YAMAMUROCloses #14812 from maropu/SPARK-17237. (cherry picked from commit 5585ed93b09bc05cdd7a731650eca50d43d7159b) Signed-off-by: gatorsmile Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/23944d0d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/23944d0d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/23944d0d Branch: refs/heads/branch-2.1 Commit: 23944d0d64a07d29e9bfcb8f8d6d22858ec02aef Parents: 042e32d Author: Takeshi YAMAMURO Authored: Thu Jan 12 09:46:53 2017 -0800 Committer: gatorsmile Committed: Thu Jan 12 09:47:09 2017 -0800 -- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala| 2 +- .../scala/org/apache/spark/sql/DataFrameAggregateSuite.scala | 8 .../scala/org/apache/spark/sql/DataFramePivotSuite.scala | 2 +- 3 files changed, 10 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/23944d0d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index ab9de02..f873996 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -383,7 +383,7 @@ class Analyzer( } else { val suffix = aggregate match { case n: NamedExpression => n.name - case _ => aggregate.sql + case _ => toPrettySQL(aggregate) } value + "_" + suffix } http://git-wip-us.apache.org/repos/asf/spark/blob/23944d0d/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala index 7853b22fe..e707912 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala @@ -530,4 +530,12 @@ class DataFrameAggregateSuite extends QueryTest with SharedSQLContext { limit2Df.groupBy("id").count().select($"id"), limit2Df.select($"id")) } + + test("SPARK-17237 remove backticks in a pivot result schema") { +val df = Seq((2, 3, 4), (3, 4, 5)).toDF("a", "x", "y") +checkAnswer( + df.groupBy("a").pivot("x").agg(count("y"), avg("y")).na.fill(0), + Seq(Row(3, 0, 0.0, 1, 5.0), Row(2, 1, 4.0, 0, 0.0)) +) + } } http://git-wip-us.apache.org/repos/asf/spark/blob/23944d0d/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala 
-- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala index a8d854c..51ffe34 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala @@ -200,7 +200,7 @@ class DataFramePivotSuite extends QueryTest with SharedSQLContext{ test("pivot preserves aliases if given") { assertResult( - Array("year", "dotNET_foo", "dotNET_avg(`earnings`)", "Java_foo", "Java_avg(`earnings`)") + Array("year", "dotNET_foo", "dotNET_avg(earnings)", "Java_foo", "Java_avg(earnings)") )( courseSales.groupBy($"year") .pivot("course", Seq("dotNET", "Java"))
spark git commit: [SPARK-12757][CORE] lower "block locks were not released" log to info level
Repository: spark Updated Branches: refs/heads/master c6c37b8af -> 2bc4d4e28 [SPARK-12757][CORE] lower "block locks were not released" log to info level ## What changes were proposed in this pull request? Lower the "block locks were not released" log to info level, as it generates a lot of warnings when running ML and graph calls, as pointed out in the JIRA. Author: Felix Cheung Closes #16513 from felixcheung/blocklockswarn. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2bc4d4e2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2bc4d4e2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2bc4d4e2 Branch: refs/heads/master Commit: 2bc4d4e286e65f8b4e9ee21bccd913b62e6061f2 Parents: c6c37b8 Author: Felix Cheung Authored: Thu Jan 12 09:45:16 2017 -0800 Committer: Felix Cheung Committed: Thu Jan 12 09:45:16 2017 -0800 -- core/src/main/scala/org/apache/spark/executor/Executor.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2bc4d4e2/core/src/main/scala/org/apache/spark/executor/Executor.scala -- diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 789198f..b6c0f0c 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -336,7 +336,7 @@ private[spark] class Executor( if (conf.getBoolean("spark.storage.exceptionOnPinLeak", false)) { throw new SparkException(errMsg) } else { - logWarning(errMsg) + logInfo(errMsg) } } }
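For users who still want leaked block locks to fail the task rather than only be logged, the escalation knob read in the diff above remains available; a minimal sketch of setting it at application startup (the configuration key is taken from the diff, the app name is hypothetical):

```scala
import org.apache.spark.SparkConf

// Turn leaked block locks back into task failures instead of (now info-level) log messages.
val conf = new SparkConf()
  .setAppName("pin-leak-strict")
  .set("spark.storage.exceptionOnPinLeak", "true")
```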
spark git commit: [SPARK-19055][SQL][PYSPARK] Fix SparkSession initialization when SparkContext is stopped
Repository: spark Updated Branches: refs/heads/branch-2.0 3566e40a4 -> 55d2a1178 [SPARK-19055][SQL][PYSPARK] Fix SparkSession initialization when SparkContext is stopped ## What changes were proposed in this pull request? In SparkSession initialization, we store created the instance of SparkSession into a class variable _instantiatedContext. Next time we can use SparkSession.builder.getOrCreate() to retrieve the existing SparkSession instance. However, when the active SparkContext is stopped and we create another new SparkContext to use, the existing SparkSession is still associated with the stopped SparkContext. So the operations with this existing SparkSession will be failed. We need to detect such case in SparkSession and renew the class variable _instantiatedContext if needed. ## How was this patch tested? New test added in PySpark. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Liang-Chi HsiehCloses #16454 from viirya/fix-pyspark-sparksession. (cherry picked from commit c6c37b8af714c8ddc8c77ac943a379f703558f27) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/55d2a117 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/55d2a117 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/55d2a117 Branch: refs/heads/branch-2.0 Commit: 55d2a117805d76fd27e2960f92ece88238488231 Parents: 3566e40 Author: Liang-Chi Hsieh Authored: Thu Jan 12 20:53:31 2017 +0800 Committer: Wenchen Fan Committed: Thu Jan 12 20:54:41 2017 +0800 -- python/pyspark/sql/session.py | 16 ++-- python/pyspark/sql/tests.py | 23 +++ 2 files changed, 33 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/55d2a117/python/pyspark/sql/session.py -- diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py index d25823d..79017c6 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -161,8 +161,8 @@ class SparkSession(object): with self._lock: from pyspark.context import SparkContext from pyspark.conf import SparkConf -session = SparkSession._instantiatedContext -if session is None: +session = SparkSession._instantiatedSession +if session is None or session._sc._jsc is None: sparkConf = SparkConf() for key, value in self._options.items(): sparkConf.set(key, value) @@ -183,7 +183,7 @@ class SparkSession(object): builder = Builder() -_instantiatedContext = None +_instantiatedSession = None @ignore_unicode_prefix def __init__(self, sparkContext, jsparkSession=None): @@ -214,8 +214,12 @@ class SparkSession(object): self._wrapped = SQLContext(self._sc, self, self._jwrapped) _monkey_patch_RDD(self) install_exception_handler() -if SparkSession._instantiatedContext is None: -SparkSession._instantiatedContext = self +# If we had an instantiated SparkSession attached with a SparkContext +# which is stopped now, we need to renew the instantiated SparkSession. +# Otherwise, we will use invalid SparkSession when we call Builder.getOrCreate. +if SparkSession._instantiatedSession is None \ +or SparkSession._instantiatedSession._sc._jsc is None: +SparkSession._instantiatedSession = self @since(2.0) def newSession(self): @@ -597,7 +601,7 @@ class SparkSession(object): """Stop the underlying :class:`SparkContext`. 
""" self._sc.stop() -SparkSession._instantiatedContext = None +SparkSession._instantiatedSession = None @since(2.0) def __enter__(self): http://git-wip-us.apache.org/repos/asf/spark/blob/55d2a117/python/pyspark/sql/tests.py -- diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index b3cf72b..796b964 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -46,6 +46,7 @@ if sys.version_info[:2] <= (2, 6): else: import unittest +from pyspark import SparkContext from pyspark.sql import SparkSession, HiveContext, Column, Row from pyspark.sql.types import * from pyspark.sql.types import UserDefinedType, _infer_type @@ -1772,6 +1773,28 @@ class HiveSparkSubmitTests(SparkSubmitTests): self.assertTrue(os.path.exists(metastore_path)) +class SQLTests2(ReusedPySparkTestCase): + +
spark git commit: [SPARK-19055][SQL][PYSPARK] Fix SparkSession initialization when SparkContext is stopped
Repository: spark Updated Branches: refs/heads/branch-2.1 616a78a56 -> 042e32d18 [SPARK-19055][SQL][PYSPARK] Fix SparkSession initialization when SparkContext is stopped ## What changes were proposed in this pull request? In SparkSession initialization, we store created the instance of SparkSession into a class variable _instantiatedContext. Next time we can use SparkSession.builder.getOrCreate() to retrieve the existing SparkSession instance. However, when the active SparkContext is stopped and we create another new SparkContext to use, the existing SparkSession is still associated with the stopped SparkContext. So the operations with this existing SparkSession will be failed. We need to detect such case in SparkSession and renew the class variable _instantiatedContext if needed. ## How was this patch tested? New test added in PySpark. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Liang-Chi HsiehCloses #16454 from viirya/fix-pyspark-sparksession. (cherry picked from commit c6c37b8af714c8ddc8c77ac943a379f703558f27) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/042e32d1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/042e32d1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/042e32d1 Branch: refs/heads/branch-2.1 Commit: 042e32d18ad10be5c60907959e55b0324df5b2c0 Parents: 616a78a Author: Liang-Chi Hsieh Authored: Thu Jan 12 20:53:31 2017 +0800 Committer: Wenchen Fan Committed: Thu Jan 12 20:54:16 2017 +0800 -- python/pyspark/sql/session.py | 16 ++-- python/pyspark/sql/tests.py | 23 +++ 2 files changed, 33 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/042e32d1/python/pyspark/sql/session.py -- diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py index 1e40b9c..9f4772e 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -161,8 +161,8 @@ class SparkSession(object): with self._lock: from pyspark.context import SparkContext from pyspark.conf import SparkConf -session = SparkSession._instantiatedContext -if session is None: +session = SparkSession._instantiatedSession +if session is None or session._sc._jsc is None: sparkConf = SparkConf() for key, value in self._options.items(): sparkConf.set(key, value) @@ -183,7 +183,7 @@ class SparkSession(object): builder = Builder() -_instantiatedContext = None +_instantiatedSession = None @ignore_unicode_prefix def __init__(self, sparkContext, jsparkSession=None): @@ -214,8 +214,12 @@ class SparkSession(object): self._wrapped = SQLContext(self._sc, self, self._jwrapped) _monkey_patch_RDD(self) install_exception_handler() -if SparkSession._instantiatedContext is None: -SparkSession._instantiatedContext = self +# If we had an instantiated SparkSession attached with a SparkContext +# which is stopped now, we need to renew the instantiated SparkSession. +# Otherwise, we will use invalid SparkSession when we call Builder.getOrCreate. +if SparkSession._instantiatedSession is None \ +or SparkSession._instantiatedSession._sc._jsc is None: +SparkSession._instantiatedSession = self @since(2.0) def newSession(self): @@ -595,7 +599,7 @@ class SparkSession(object): """Stop the underlying :class:`SparkContext`. 
""" self._sc.stop() -SparkSession._instantiatedContext = None +SparkSession._instantiatedSession = None @since(2.0) def __enter__(self): http://git-wip-us.apache.org/repos/asf/spark/blob/042e32d1/python/pyspark/sql/tests.py -- diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 6de63e6..fe034bc 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -46,6 +46,7 @@ if sys.version_info[:2] <= (2, 6): else: import unittest +from pyspark import SparkContext from pyspark.sql import SparkSession, HiveContext, Column, Row from pyspark.sql.types import * from pyspark.sql.types import UserDefinedType, _infer_type @@ -1877,6 +1878,28 @@ class HiveSparkSubmitTests(SparkSubmitTests): self.assertTrue(os.path.exists(metastore_path)) +class SQLTests2(ReusedPySparkTestCase): + +
spark git commit: [SPARK-19055][SQL][PYSPARK] Fix SparkSession initialization when SparkContext is stopped
Repository: spark Updated Branches: refs/heads/master 871d26664 -> c6c37b8af [SPARK-19055][SQL][PYSPARK] Fix SparkSession initialization when SparkContext is stopped ## What changes were proposed in this pull request? In SparkSession initialization, we store created the instance of SparkSession into a class variable _instantiatedContext. Next time we can use SparkSession.builder.getOrCreate() to retrieve the existing SparkSession instance. However, when the active SparkContext is stopped and we create another new SparkContext to use, the existing SparkSession is still associated with the stopped SparkContext. So the operations with this existing SparkSession will be failed. We need to detect such case in SparkSession and renew the class variable _instantiatedContext if needed. ## How was this patch tested? New test added in PySpark. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Liang-Chi HsiehCloses #16454 from viirya/fix-pyspark-sparksession. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c6c37b8a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c6c37b8a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c6c37b8a Branch: refs/heads/master Commit: c6c37b8af714c8ddc8c77ac943a379f703558f27 Parents: 871d266 Author: Liang-Chi Hsieh Authored: Thu Jan 12 20:53:31 2017 +0800 Committer: Wenchen Fan Committed: Thu Jan 12 20:53:31 2017 +0800 -- python/pyspark/sql/session.py | 16 ++-- python/pyspark/sql/tests.py | 23 +++ 2 files changed, 33 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c6c37b8a/python/pyspark/sql/session.py -- diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py index 1e40b9c..9f4772e 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -161,8 +161,8 @@ class SparkSession(object): with self._lock: from pyspark.context import SparkContext from pyspark.conf import SparkConf -session = SparkSession._instantiatedContext -if session is None: +session = SparkSession._instantiatedSession +if session is None or session._sc._jsc is None: sparkConf = SparkConf() for key, value in self._options.items(): sparkConf.set(key, value) @@ -183,7 +183,7 @@ class SparkSession(object): builder = Builder() -_instantiatedContext = None +_instantiatedSession = None @ignore_unicode_prefix def __init__(self, sparkContext, jsparkSession=None): @@ -214,8 +214,12 @@ class SparkSession(object): self._wrapped = SQLContext(self._sc, self, self._jwrapped) _monkey_patch_RDD(self) install_exception_handler() -if SparkSession._instantiatedContext is None: -SparkSession._instantiatedContext = self +# If we had an instantiated SparkSession attached with a SparkContext +# which is stopped now, we need to renew the instantiated SparkSession. +# Otherwise, we will use invalid SparkSession when we call Builder.getOrCreate. +if SparkSession._instantiatedSession is None \ +or SparkSession._instantiatedSession._sc._jsc is None: +SparkSession._instantiatedSession = self @since(2.0) def newSession(self): @@ -595,7 +599,7 @@ class SparkSession(object): """Stop the underlying :class:`SparkContext`. 
""" self._sc.stop() -SparkSession._instantiatedContext = None +SparkSession._instantiatedSession = None @since(2.0) def __enter__(self): http://git-wip-us.apache.org/repos/asf/spark/blob/c6c37b8a/python/pyspark/sql/tests.py -- diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 18fd68e..d178285 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -46,6 +46,7 @@ if sys.version_info[:2] <= (2, 6): else: import unittest +from pyspark import SparkContext from pyspark.sql import SparkSession, HiveContext, Column, Row from pyspark.sql.types import * from pyspark.sql.types import UserDefinedType, _infer_type @@ -1886,6 +1887,28 @@ class HiveSparkSubmitTests(SparkSubmitTests): self.assertTrue(os.path.exists(metastore_path)) +class SQLTests2(ReusedPySparkTestCase): + +@classmethod +def setUpClass(cls): +ReusedPySparkTestCase.setUpClass() +cls.spark = SparkSession(cls.sc) + +
spark git commit: [SPARK-18969][SQL] Support grouping by nondeterministic expressions
Repository: spark Updated Branches: refs/heads/branch-2.0 c94288b57 -> 3566e40a4 [SPARK-18969][SQL] Support grouping by nondeterministic expressions ## What changes were proposed in this pull request? Currently nondeterministic expressions are allowed in `Aggregate`(see the [comment](https://github.com/apache/spark/blob/v2.0.2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala#L249-L251)), but the `PullOutNondeterministic` analyzer rule failed to handle `Aggregate`, this PR fixes it. close https://github.com/apache/spark/pull/16379 There is still one remaining issue: `SELECT a + rand() FROM t GROUP BY a + rand()` is not allowed, because the 2 `rand()` are different(we generate random seed as the default seed for `rand()`). https://issues.apache.org/jira/browse/SPARK-19035 is tracking this issue. ## How was this patch tested? a new test suite Author: Wenchen FanCloses #16404 from cloud-fan/groupby. (cherry picked from commit 871d266649ddfed38c64dfda7158d8bb58d4b979) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3566e40a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3566e40a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3566e40a Branch: refs/heads/branch-2.0 Commit: 3566e40a4ce319e095780062abf94154b4aba334 Parents: c94288b Author: Wenchen Fan Authored: Thu Jan 12 20:21:04 2017 +0800 Committer: Wenchen Fan Committed: Thu Jan 12 20:25:44 2017 +0800 -- .../spark/sql/catalyst/analysis/Analyzer.scala | 37 - .../analysis/PullOutNondeterministicSuite.scala | 56 .../sql-tests/results/group-by-ordinal.sql.out | 10 ++-- 3 files changed, 86 insertions(+), 17 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3566e40a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 32dc70a..9040ced 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1789,28 +1789,37 @@ class Analyzer( case p: Project => p case f: Filter => f + case a: Aggregate if a.groupingExpressions.exists(!_.deterministic) => +val nondeterToAttr = getNondeterToAttr(a.groupingExpressions) +val newChild = Project(a.child.output ++ nondeterToAttr.values, a.child) +a.transformExpressions { case e => + nondeterToAttr.get(e).map(_.toAttribute).getOrElse(e) +}.copy(child = newChild) + // todo: It's hard to write a general rule to pull out nondeterministic expressions // from LogicalPlan, currently we only do it for UnaryNode which has same output // schema with its child. 
case p: UnaryNode if p.output == p.child.output && p.expressions.exists(!_.deterministic) => -val nondeterministicExprs = p.expressions.filterNot(_.deterministic).flatMap { expr => - val leafNondeterministic = expr.collect { -case n: Nondeterministic => n - } - leafNondeterministic.map { e => -val ne = e match { - case n: NamedExpression => n - case _ => Alias(e, "_nondeterministic")(isGenerated = true) -} -new TreeNodeRef(e) -> ne - } -}.toMap +val nondeterToAttr = getNondeterToAttr(p.expressions) val newPlan = p.transformExpressions { case e => - nondeterministicExprs.get(new TreeNodeRef(e)).map(_.toAttribute).getOrElse(e) + nondeterToAttr.get(e).map(_.toAttribute).getOrElse(e) } -val newChild = Project(p.child.output ++ nondeterministicExprs.values, p.child) +val newChild = Project(p.child.output ++ nondeterToAttr.values, p.child) Project(p.output, newPlan.withNewChildren(newChild :: Nil)) } + +private def getNondeterToAttr(exprs: Seq[Expression]): Map[Expression, NamedExpression] = { + exprs.filterNot(_.deterministic).flatMap { expr => +val leafNondeterministic = expr.collect { case n: Nondeterministic => n } +leafNondeterministic.distinct.map { e => + val ne = e match { +case n: NamedExpression => n +case _ => Alias(e, "_nondeterministic")(isGenerated = true) + } + e -> ne +} + }.toMap +} } /**
spark git commit: [SPARK-18969][SQL] Support grouping by nondeterministic expressions
Repository: spark Updated Branches: refs/heads/branch-2.1 9b9867ef5 -> 616a78a56 [SPARK-18969][SQL] Support grouping by nondeterministic expressions ## What changes were proposed in this pull request? Currently nondeterministic expressions are allowed in `Aggregate`(see the [comment](https://github.com/apache/spark/blob/v2.0.2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala#L249-L251)), but the `PullOutNondeterministic` analyzer rule failed to handle `Aggregate`, this PR fixes it. close https://github.com/apache/spark/pull/16379 There is still one remaining issue: `SELECT a + rand() FROM t GROUP BY a + rand()` is not allowed, because the 2 `rand()` are different(we generate random seed as the default seed for `rand()`). https://issues.apache.org/jira/browse/SPARK-19035 is tracking this issue. ## How was this patch tested? a new test suite Author: Wenchen FanCloses #16404 from cloud-fan/groupby. (cherry picked from commit 871d266649ddfed38c64dfda7158d8bb58d4b979) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/616a78a5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/616a78a5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/616a78a5 Branch: refs/heads/branch-2.1 Commit: 616a78a56cc911953e3133e60ab8c5a4fc287539 Parents: 9b9867e Author: Wenchen Fan Authored: Thu Jan 12 20:21:04 2017 +0800 Committer: Wenchen Fan Committed: Thu Jan 12 20:24:23 2017 +0800 -- .../spark/sql/catalyst/analysis/Analyzer.scala | 37 - .../analysis/PullOutNondeterministicSuite.scala | 56 .../sql-tests/results/group-by-ordinal.sql.out | 10 ++-- 3 files changed, 86 insertions(+), 17 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/616a78a5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index f17c372..ab9de02 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1859,28 +1859,37 @@ class Analyzer( case p: Project => p case f: Filter => f + case a: Aggregate if a.groupingExpressions.exists(!_.deterministic) => +val nondeterToAttr = getNondeterToAttr(a.groupingExpressions) +val newChild = Project(a.child.output ++ nondeterToAttr.values, a.child) +a.transformExpressions { case e => + nondeterToAttr.get(e).map(_.toAttribute).getOrElse(e) +}.copy(child = newChild) + // todo: It's hard to write a general rule to pull out nondeterministic expressions // from LogicalPlan, currently we only do it for UnaryNode which has same output // schema with its child. 
case p: UnaryNode if p.output == p.child.output && p.expressions.exists(!_.deterministic) => -val nondeterministicExprs = p.expressions.filterNot(_.deterministic).flatMap { expr => - val leafNondeterministic = expr.collect { -case n: Nondeterministic => n - } - leafNondeterministic.map { e => -val ne = e match { - case n: NamedExpression => n - case _ => Alias(e, "_nondeterministic")(isGenerated = true) -} -new TreeNodeRef(e) -> ne - } -}.toMap +val nondeterToAttr = getNondeterToAttr(p.expressions) val newPlan = p.transformExpressions { case e => - nondeterministicExprs.get(new TreeNodeRef(e)).map(_.toAttribute).getOrElse(e) + nondeterToAttr.get(e).map(_.toAttribute).getOrElse(e) } -val newChild = Project(p.child.output ++ nondeterministicExprs.values, p.child) +val newChild = Project(p.child.output ++ nondeterToAttr.values, p.child) Project(p.output, newPlan.withNewChildren(newChild :: Nil)) } + +private def getNondeterToAttr(exprs: Seq[Expression]): Map[Expression, NamedExpression] = { + exprs.filterNot(_.deterministic).flatMap { expr => +val leafNondeterministic = expr.collect { case n: Nondeterministic => n } +leafNondeterministic.distinct.map { e => + val ne = e match { +case n: NamedExpression => n +case _ => Alias(e, "_nondeterministic")(isGenerated = true) + } + e -> ne +} + }.toMap +} } /**
spark git commit: [SPARK-18969][SQL] Support grouping by nondeterministic expressions
Repository: spark Updated Branches: refs/heads/master c71b25481 -> 871d26664 [SPARK-18969][SQL] Support grouping by nondeterministic expressions ## What changes were proposed in this pull request? Currently nondeterministic expressions are allowed in `Aggregate`(see the [comment](https://github.com/apache/spark/blob/v2.0.2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala#L249-L251)), but the `PullOutNondeterministic` analyzer rule failed to handle `Aggregate`, this PR fixes it. close https://github.com/apache/spark/pull/16379 There is still one remaining issue: `SELECT a + rand() FROM t GROUP BY a + rand()` is not allowed, because the 2 `rand()` are different(we generate random seed as the default seed for `rand()`). https://issues.apache.org/jira/browse/SPARK-19035 is tracking this issue. ## How was this patch tested? a new test suite Author: Wenchen FanCloses #16404 from cloud-fan/groupby. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/871d2666 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/871d2666 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/871d2666 Branch: refs/heads/master Commit: 871d266649ddfed38c64dfda7158d8bb58d4b979 Parents: c71b254 Author: Wenchen Fan Authored: Thu Jan 12 20:21:04 2017 +0800 Committer: Wenchen Fan Committed: Thu Jan 12 20:21:04 2017 +0800 -- .../spark/sql/catalyst/analysis/Analyzer.scala | 37 - .../analysis/PullOutNondeterministicSuite.scala | 56 .../sql-tests/results/group-by-ordinal.sql.out | 10 ++-- 3 files changed, 86 insertions(+), 17 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/871d2666/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index d461531..3c58832 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -2008,28 +2008,37 @@ class Analyzer( case p: Project => p case f: Filter => f + case a: Aggregate if a.groupingExpressions.exists(!_.deterministic) => +val nondeterToAttr = getNondeterToAttr(a.groupingExpressions) +val newChild = Project(a.child.output ++ nondeterToAttr.values, a.child) +a.transformExpressions { case e => + nondeterToAttr.get(e).map(_.toAttribute).getOrElse(e) +}.copy(child = newChild) + // todo: It's hard to write a general rule to pull out nondeterministic expressions // from LogicalPlan, currently we only do it for UnaryNode which has same output // schema with its child. 
case p: UnaryNode if p.output == p.child.output && p.expressions.exists(!_.deterministic) => -val nondeterministicExprs = p.expressions.filterNot(_.deterministic).flatMap { expr => - val leafNondeterministic = expr.collect { -case n: Nondeterministic => n - } - leafNondeterministic.map { e => -val ne = e match { - case n: NamedExpression => n - case _ => Alias(e, "_nondeterministic")(isGenerated = true) -} -new TreeNodeRef(e) -> ne - } -}.toMap +val nondeterToAttr = getNondeterToAttr(p.expressions) val newPlan = p.transformExpressions { case e => - nondeterministicExprs.get(new TreeNodeRef(e)).map(_.toAttribute).getOrElse(e) + nondeterToAttr.get(e).map(_.toAttribute).getOrElse(e) } -val newChild = Project(p.child.output ++ nondeterministicExprs.values, p.child) +val newChild = Project(p.child.output ++ nondeterToAttr.values, p.child) Project(p.output, newPlan.withNewChildren(newChild :: Nil)) } + +private def getNondeterToAttr(exprs: Seq[Expression]): Map[Expression, NamedExpression] = { + exprs.filterNot(_.deterministic).flatMap { expr => +val leafNondeterministic = expr.collect { case n: Nondeterministic => n } +leafNondeterministic.distinct.map { e => + val ne = e match { +case n: NamedExpression => n +case _ => Alias(e, "_nondeterministic")(isGenerated = true) + } + e -> ne +} + }.toMap +} } /**
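A minimal sketch of the kind of grouping the updated PullOutNondeterministic rule is meant to handle, using the DataFrame API and assuming a SparkSession named `spark` (not taken from the patch):

```scala
import org.apache.spark.sql.functions.rand

val df = spark.range(100).toDF("a")

// Grouping by a nondeterministic expression: the rule pulls rand(42) * 3 into a
// Project below the Aggregate so it is evaluated once per input row.
df.groupBy((rand(42) * 3).cast("int").as("bucket")).count().show()

// Still unsupported, as noted above: SELECT a + rand() FROM t GROUP BY a + rand()
// (the two rand() calls get different default seeds).
```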
spark git commit: [SPARK-18857][SQL] Don't use `Iterator.duplicate` for `incrementalCollect` in Thrift Server
Repository: spark Updated Branches: refs/heads/branch-2.0 ec2fe925c -> c94288b57 [SPARK-18857][SQL] Don't use `Iterator.duplicate` for `incrementalCollect` in Thrift Server ## What changes were proposed in this pull request? To support `FETCH_FIRST`, SPARK-16563 used Scala `Iterator.duplicate`. However, Scala `Iterator.duplicate` uses a **queue to buffer all items between both iterators**; this causes heavy GC and hangs for queries with a large number of rows. We should not use this, especially for `spark.sql.thriftServer.incrementalCollect`. https://github.com/scala/scala/blob/2.12.x/src/library/scala/collection/Iterator.scala#L1262-L1300 ## How was this patch tested? Passes the existing tests. Author: Dongjoon Hyun Closes #16440 from dongjoon-hyun/SPARK-18857. (cherry picked from commit a2c6adcc5d2702d2f0e9b239517353335e5f911e) Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c94288b5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c94288b5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c94288b5 Branch: refs/heads/branch-2.0 Commit: c94288b57b5ce2232e58e35cada558d8d5b8ec6e Parents: ec2fe92 Author: Dongjoon Hyun Authored: Tue Jan 10 13:27:55 2017 + Committer: Sean Owen Committed: Thu Jan 12 10:45:26 2017 + -- .../org/apache/spark/sql/internal/SQLConf.scala | 7 + .../SparkExecuteStatementOperation.scala| 30 +--- 2 files changed, 26 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c94288b5/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 7598d47..edec6f0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -295,6 +295,13 @@ object SQLConf { .stringConf .createOptional + val THRIFTSERVER_INCREMENTAL_COLLECT = +SQLConfigBuilder("spark.sql.thriftServer.incrementalCollect") + .internal() + .doc("When true, enable incremental collection for execution in Thrift Server.") + .booleanConf + .createWithDefault(false) + val THRIFTSERVER_UI_STATEMENT_LIMIT = SQLConfigBuilder("spark.sql.thriftserver.ui.retainedStatements") .doc("The number of SQL statements kept in the JDBC/ODBC web UI history.") http://git-wip-us.apache.org/repos/asf/spark/blob/c94288b5/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala -- diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index 8a78523..a95170e 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -50,8 +50,13 @@ private[hive] class SparkExecuteStatementOperation( with Logging { private var result: DataFrame = _ + + // We cache the returned rows to get iterators again in case the user wants to use FETCH_FIRST. + // This is only used when `spark.sql.thriftServer.incrementalCollect` is set to `false`. + // In case of `true`, this will be `None` and FETCH_FIRST will trigger re-execution.
+ private var resultList: Option[Array[SparkRow]] = _ + private var iter: Iterator[SparkRow] = _ - private var iterHeader: Iterator[SparkRow] = _ private var dataTypes: Array[DataType] = _ private var statementId: String = _ @@ -111,9 +116,15 @@ private[hive] class SparkExecuteStatementOperation( // Reset iter to header when fetching start from first row if (order.equals(FetchOrientation.FETCH_FIRST)) { - val (ita, itb) = iterHeader.duplicate - iter = ita - iterHeader = itb + iter = if (sqlContext.getConf(SQLConf.THRIFTSERVER_INCREMENTAL_COLLECT.key).toBoolean) { +resultList = None +result.toLocalIterator.asScala + } else { +if (resultList.isEmpty) { + resultList = Some(result.collect()) +} +resultList.get.iterator + } } if (!iter.hasNext) { @@ -226,17 +237,14 @@ private[hive] class SparkExecuteStatementOperation( }
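For context on why `Iterator.duplicate` had to go, the following standalone sketch (plain Scala, not Spark code) demonstrates the buffering behaviour described above: the duplicated iterators share a queue holding every element one side has consumed but the other has not, so keeping one copy around as a FETCH_FIRST "header" effectively materializes the whole result set.

```scala
// Plain Scala illustration of the Iterator.duplicate hazard; not Spark code.
object DuplicateDemo {
  def main(args: Array[String]): Unit = {
    val (ita, itb) = Iterator.range(0, 1000000).duplicate

    // Draining ita pushes all ~1M elements into the shared buffer, because itb
    // (the retained "header") must still be able to replay them from the start.
    val consumed = ita.size
    println(s"consumed=$consumed, header still replayable: ${itb.hasNext}")
  }
}
```

The patch avoids this by caching collected rows in `resultList` when `spark.sql.thriftServer.incrementalCollect` is `false`, and by re-executing the query through `result.toLocalIterator` when it is `true`.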
spark git commit: [SPARK-18857][SQL] Don't use `Iterator.duplicate` for `incrementalCollect` in Thrift Server
Repository: spark Updated Branches: refs/heads/branch-2.1 0b07634b5 -> 9b9867ef5 [SPARK-18857][SQL] Don't use `Iterator.duplicate` for `incrementalCollect` in Thrift Server ## What changes were proposed in this pull request? To support `FETCH_FIRST`, SPARK-16563 used Scala `Iterator.duplicate`. However, Scala `Iterator.duplicate` uses a **queue to buffer all items between both iterators**; this causes heavy GC and hangs for queries with a large number of rows. We should not use this, especially for `spark.sql.thriftServer.incrementalCollect`. https://github.com/scala/scala/blob/2.12.x/src/library/scala/collection/Iterator.scala#L1262-L1300 ## How was this patch tested? Passes the existing tests. Author: Dongjoon Hyun Closes #16440 from dongjoon-hyun/SPARK-18857. (cherry picked from commit a2c6adcc5d2702d2f0e9b239517353335e5f911e) Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9b9867ef Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9b9867ef Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9b9867ef Branch: refs/heads/branch-2.1 Commit: 9b9867ef5b64b05f1e968de1fc0bfc1fcc64a707 Parents: 0b07634 Author: Dongjoon Hyun Authored: Tue Jan 10 13:27:55 2017 + Committer: Sean Owen Committed: Thu Jan 12 10:45:10 2017 + -- .../org/apache/spark/sql/internal/SQLConf.scala | 7 + .../SparkExecuteStatementOperation.scala| 30 +--- 2 files changed, 26 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9b9867ef/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 8fbad60..8d493e0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -309,6 +309,13 @@ object SQLConf { .stringConf .createOptional + val THRIFTSERVER_INCREMENTAL_COLLECT = +SQLConfigBuilder("spark.sql.thriftServer.incrementalCollect") + .internal() + .doc("When true, enable incremental collection for execution in Thrift Server.") + .booleanConf + .createWithDefault(false) + val THRIFTSERVER_UI_STATEMENT_LIMIT = SQLConfigBuilder("spark.sql.thriftserver.ui.retainedStatements") .doc("The number of SQL statements kept in the JDBC/ODBC web UI history.") http://git-wip-us.apache.org/repos/asf/spark/blob/9b9867ef/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala -- diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index aeabd6a..517b01f 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -50,8 +50,13 @@ private[hive] class SparkExecuteStatementOperation( with Logging { private var result: DataFrame = _ + + // We cache the returned rows to get iterators again in case the user wants to use FETCH_FIRST. + // This is only used when `spark.sql.thriftServer.incrementalCollect` is set to `false`. + // In case of `true`, this will be `None` and FETCH_FIRST will trigger re-execution.
+ private var resultList: Option[Array[SparkRow]] = _ + private var iter: Iterator[SparkRow] = _ - private var iterHeader: Iterator[SparkRow] = _ private var dataTypes: Array[DataType] = _ private var statementId: String = _ @@ -111,9 +116,15 @@ private[hive] class SparkExecuteStatementOperation( // Reset iter to header when fetching start from first row if (order.equals(FetchOrientation.FETCH_FIRST)) { - val (ita, itb) = iterHeader.duplicate - iter = ita - iterHeader = itb + iter = if (sqlContext.getConf(SQLConf.THRIFTSERVER_INCREMENTAL_COLLECT.key).toBoolean) { +resultList = None +result.toLocalIterator.asScala + } else { +if (resultList.isEmpty) { + resultList = Some(result.collect()) +} +resultList.get.iterator + } } if (!iter.hasNext) { @@ -227,17 +238,14 @@ private[hive] class SparkExecuteStatementOperation( }
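The new code path is gated by the internal `spark.sql.thriftServer.incrementalCollect` flag added in the `SQLConf` hunk above. A minimal, hedged sketch of turning it on from a Spark session follows; only the conf key is taken from the diff, the builder settings are illustrative.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: with the flag on, FETCH_FIRST triggers re-execution instead of
// caching collected rows, trading repeated work for a smaller driver footprint.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("incremental-collect-demo")
  .config("spark.sql.thriftServer.incrementalCollect", "true")
  .getOrCreate()

println(spark.conf.get("spark.sql.thriftServer.incrementalCollect"))  // "true"
```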
spark git commit: [SPARK-19183][SQL] Add deleteWithJob hook to internal commit protocol API
Repository: spark Updated Branches: refs/heads/master 5db35b312 -> c71b25481 [SPARK-19183][SQL] Add deleteWithJob hook to internal commit protocol API ## What changes were proposed in this pull request? Currently in SQL we implement overwrites by calling fs.delete() directly on the original data. This is not ideal since the original files end up deleted even if the job aborts. We should extend the commit protocol to allow file overwrites to be managed as well. ## How was this patch tested? Existing tests. I also fixed a bunch of tests that depended on the commit protocol implementation being set to the legacy mapreduce one. cc rxin cloud-fan Author: Eric Liang Author: Eric Liang Closes #16554 from ericl/add-delete-protocol. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c71b2548 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c71b2548 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c71b2548 Branch: refs/heads/master Commit: c71b25481aa5f7bc27d5c979e66bed54cd46b97e Parents: 5db35b3 Author: Eric Liang Authored: Thu Jan 12 17:45:55 2017 +0800 Committer: Wenchen Fan Committed: Thu Jan 12 17:45:55 2017 +0800 -- .../spark/internal/io/FileCommitProtocol.scala | 9 ++ .../InsertIntoHadoopFsRelationCommand.scala | 25 ++-- .../datasources/HadoopFsRelationSuite.scala | 2 +- .../datasources/parquet/ParquetIOSuite.scala| 122 ++- .../ParquetPartitionDiscoverySuite.scala| 9 +- .../datasources/parquet/ParquetQuerySuite.scala | 5 + .../sql/hive/orc/OrcHadoopFsRelationSuite.scala | 4 +- .../sql/sources/HadoopFsRelationTest.scala | 77 ++-- .../sources/ParquetHadoopFsRelationSuite.scala | 6 +- 9 files changed, 149 insertions(+), 110 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c71b2548/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala -- diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala index afd2250..2394cf3 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala @@ -17,6 +17,7 @@ package org.apache.spark.internal.io +import org.apache.hadoop.fs._ import org.apache.hadoop.mapreduce._ import org.apache.spark.util.Utils @@ -112,6 +113,14 @@ abstract class FileCommitProtocol { * just crashes (or killed) before it can call abort. */ def abortTask(taskContext: TaskAttemptContext): Unit + + /** + * Specifies that a file should be deleted with the commit of this job. The default + * implementation deletes the file immediately.
+ */ + def deleteWithJob(fs: FileSystem, path: Path, recursive: Boolean): Boolean = { +fs.delete(path, recursive) + } } http://git-wip-us.apache.org/repos/asf/spark/blob/c71b2548/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala index 423009e..652bcc8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala @@ -88,11 +88,20 @@ case class InsertIntoHadoopFsRelationCommand( } val pathExists = fs.exists(qualifiedOutputPath) +// If we are appending data to an existing dir. +val isAppend = pathExists && (mode == SaveMode.Append) + +val committer = FileCommitProtocol.instantiate( + sparkSession.sessionState.conf.fileCommitProtocolClass, + jobId = java.util.UUID.randomUUID().toString, + outputPath = outputPath.toString, + isAppend = isAppend) + val doInsertion = (mode, pathExists) match { case (SaveMode.ErrorIfExists, true) => throw new AnalysisException(s"path $qualifiedOutputPath already exists.") case (SaveMode.Overwrite, true) => -deleteMatchingPartitions(fs, qualifiedOutputPath, customPartitionLocations) +deleteMatchingPartitions(fs, qualifiedOutputPath, customPartitionLocations, committer) true case (SaveMode.Append, _) | (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) =>
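To illustrate the new extension point, here is a hedged sketch of a commit protocol that audits deletions through `deleteWithJob` before delegating to the default behaviour. The class is hypothetical and not part of the patch, and it assumes the two-argument `HadoopMapReduceCommitProtocol(jobId, path)` constructor of this Spark version; such a class would be wired in through the commit-protocol class conf read via `fileCommitProtocolClass` in the command above.

```scala
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol

// Hypothetical example, not part of the patch: record every path scheduled for
// deletion, then fall back to the default hook (immediate deletion).
class AuditingCommitProtocol(jobId: String, path: String)
  extends HadoopMapReduceCommitProtocol(jobId, path) {

  private val scheduledDeletes = scala.collection.mutable.ArrayBuffer.empty[String]

  override def deleteWithJob(fs: FileSystem, path: Path, recursive: Boolean): Boolean = {
    scheduledDeletes += path.toString
    // A smarter protocol could defer the actual delete until commitJob/abortJob,
    // which is exactly the flexibility this hook is meant to enable.
    super.deleteWithJob(fs, path, recursive)
  }
}
```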
spark git commit: [SPARK-19164][PYTHON][SQL] Remove unused UserDefinedFunction._broadcast
Repository: spark Updated Branches: refs/heads/master 2c586f506 -> 5db35b312 [SPARK-19164][PYTHON][SQL] Remove unused UserDefinedFunction._broadcast ## What changes were proposed in this pull request? Removes the unused `UserDefinedFunction._broadcast` attribute and the `UserDefinedFunction.__del__` method. ## How was this patch tested? Existing unit tests. Author: zero323 Closes #16538 from zero323/SPARK-19164. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5db35b31 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5db35b31 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5db35b31 Branch: refs/heads/master Commit: 5db35b312e96dea07f03100c64b58723c2430cd7 Parents: 2c586f5 Author: zero323 Authored: Thu Jan 12 01:05:02 2017 -0800 Committer: Reynold Xin Committed: Thu Jan 12 01:05:02 2017 -0800 -- python/pyspark/sql/functions.py | 6 -- 1 file changed, 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5db35b31/python/pyspark/sql/functions.py -- diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 7fe901a..66d993a 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -1826,7 +1826,6 @@ class UserDefinedFunction(object): def __init__(self, func, returnType, name=None): self.func = func self.returnType = returnType -self._broadcast = None self._judf = self._create_judf(name) def _create_judf(self, name): @@ -1842,11 +1841,6 @@ class UserDefinedFunction(object): name, wrapped_func, jdt) return judf -def __del__(self): -if self._broadcast is not None: -self._broadcast.unpersist() -self._broadcast = None - def __call__(self, *cols): sc = SparkContext._active_spark_context jc = self._judf.apply(_to_seq(sc, cols, _to_java_column)) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-19158][SPARKR][EXAMPLES] Fix ml.R example fails due to lack of e1071 package.
Repository: spark Updated Branches: refs/heads/branch-2.1 82fcc1330 -> 0b07634b5 [SPARK-19158][SPARKR][EXAMPLES] Fix ml.R example fails due to lack of e1071 package. ## What changes were proposed in this pull request? The ```ml.R``` example depends on the ```e1071``` package; if it is not available in the user's environment, the example will fail. I think the example should not depend on third-party packages, so I updated it to remove the dependency. ## How was this patch tested? Manual test. Author: Yanbo Liang Closes #16548 from yanboliang/spark-19158. (cherry picked from commit 2c586f506de9e2ba592afae1f0c73b6ae631bb96) Signed-off-by: Yanbo Liang Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0b07634b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0b07634b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0b07634b Branch: refs/heads/branch-2.1 Commit: 0b07634b5e06cc9030f20e277ec5956efff6c3fa Parents: 82fcc13 Author: Yanbo Liang Authored: Thu Jan 12 00:58:30 2017 -0800 Committer: Yanbo Liang Committed: Thu Jan 12 00:58:49 2017 -0800 -- examples/src/main/r/ml/ml.R | 15 +++ 1 file changed, 7 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0b07634b/examples/src/main/r/ml/ml.R -- diff --git a/examples/src/main/r/ml/ml.R b/examples/src/main/r/ml/ml.R index d601590..05f5199 100644 --- a/examples/src/main/r/ml/ml.R +++ b/examples/src/main/r/ml/ml.R @@ -49,17 +49,16 @@ unlink(modelPath) fit models with spark.lapply # # Perform distributed training of multiple models with spark.lapply -costs <- exp(seq(from = log(1), to = log(1000), length.out = 5)) -train <- function(cost) { - stopifnot(requireNamespace("e1071", quietly = TRUE)) - model <- e1071::svm(Species ~ ., data = iris, cost = cost) - summary(model) +algorithms <- c("Hartigan-Wong", "Lloyd", "MacQueen") +train <- function(algorithm) { + model <- kmeans(x = iris[1:4], centers = 3, algorithm = algorithm) + model$withinss } -model.summaries <- spark.lapply(costs, train) +model.withinss <- spark.lapply(algorithms, train) -# Print the summary of each model -print(model.summaries) +# Print the within-cluster sum of squares for each model +print(model.withinss) # Stop the SparkSession now sparkR.session.stop() - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-19158][SPARKR][EXAMPLES] Fix ml.R example fails due to lack of e1071 package.
Repository: spark Updated Branches: refs/heads/master 24100f162 -> 2c586f506 [SPARK-19158][SPARKR][EXAMPLES] Fix ml.R example fails due to lack of e1071 package. ## What changes were proposed in this pull request? The ```ml.R``` example depends on the ```e1071``` package; if it is not available in the user's environment, the example will fail. I think the example should not depend on third-party packages, so I updated it to remove the dependency. ## How was this patch tested? Manual test. Author: Yanbo Liang Closes #16548 from yanboliang/spark-19158. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2c586f50 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2c586f50 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2c586f50 Branch: refs/heads/master Commit: 2c586f506de9e2ba592afae1f0c73b6ae631bb96 Parents: 24100f1 Author: Yanbo Liang Authored: Thu Jan 12 00:58:30 2017 -0800 Committer: Yanbo Liang Committed: Thu Jan 12 00:58:30 2017 -0800 -- examples/src/main/r/ml/ml.R | 15 +++ 1 file changed, 7 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2c586f50/examples/src/main/r/ml/ml.R -- diff --git a/examples/src/main/r/ml/ml.R b/examples/src/main/r/ml/ml.R index d601590..05f5199 100644 --- a/examples/src/main/r/ml/ml.R +++ b/examples/src/main/r/ml/ml.R @@ -49,17 +49,16 @@ unlink(modelPath) fit models with spark.lapply # # Perform distributed training of multiple models with spark.lapply -costs <- exp(seq(from = log(1), to = log(1000), length.out = 5)) -train <- function(cost) { - stopifnot(requireNamespace("e1071", quietly = TRUE)) - model <- e1071::svm(Species ~ ., data = iris, cost = cost) - summary(model) +algorithms <- c("Hartigan-Wong", "Lloyd", "MacQueen") +train <- function(algorithm) { + model <- kmeans(x = iris[1:4], centers = 3, algorithm = algorithm) + model$withinss } -model.summaries <- spark.lapply(costs, train) +model.withinss <- spark.lapply(algorithms, train) -# Print the summary of each model -print(model.summaries) +# Print the within-cluster sum of squares for each model +print(model.withinss) # Stop the SparkSession now sparkR.session.stop() - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org