svn commit: r29697 - in /dev/spark/2.4.1-SNAPSHOT-2018_09_25_22_02-3f20305-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Wed Sep 26 05:17:17 2018 New Revision: 29697 Log: Apache Spark 2.4.1-SNAPSHOT-2018_09_25_22_02-3f20305 docs [This commit notification would consist of 1472 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[1/2] spark git commit: [MINOR][PYTHON] Use a helper in `PythonUtils` instead of directly accessing the Scala package
Repository: spark Updated Branches: refs/heads/branch-2.2 4f10aff40 -> ef3616825 [MINOR][PYTHON] Use a helper in `PythonUtils` instead of directly accessing the Scala package ## What changes were proposed in this pull request? This PR proposes to add a helper in `PythonUtils` instead of directly accessing the Scala package. ## How was this patch tested? Jenkins tests. Closes #22483 from HyukjinKwon/minor-refactoring. Authored-by: hyukjinkwon Signed-off-by: hyukjinkwon Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8ad6693b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8ad6693b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8ad6693b Branch: refs/heads/branch-2.2 Commit: 8ad6693bd27f3e130fbd51518de880997a1cdcc7 Parents: 4f10aff Author: hyukjinkwon Authored: Fri Sep 21 00:41:42 2018 +0800 Committer: hyukjinkwon Committed: Wed Sep 26 10:50:38 2018 +0800 -- .../src/main/scala/org/apache/spark/api/python/PythonUtils.scala | 4 python/pyspark/context.py| 4 +--- 2 files changed, 5 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8ad6693b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala -- diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala index 27a5e19..cdce371 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala @@ -74,4 +74,8 @@ private[spark] object PythonUtils { def toScalaMap[K, V](jm: java.util.Map[K, V]): Map[K, V] = { jm.asScala.toMap } + + def getEncryptionEnabled(sc: JavaSparkContext): Boolean = { +sc.conf.get(org.apache.spark.internal.config.IO_ENCRYPTION_ENABLED) + } } http://git-wip-us.apache.org/repos/asf/spark/blob/8ad6693b/python/pyspark/context.py -- diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 
68e4c17..171e143 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -192,9 +192,7 @@ class SparkContext(object): # If encryption is enabled, we need to setup a server in the jvm to read broadcast # data via a socket. # scala's mangled names w/ $ in them require special treatment. -encryption_conf = self._jvm.org.apache.spark.internal.config.__getattr__("package$")\
-.__getattr__("MODULE$").IO_ENCRYPTION_ENABLED() -self._encryption_enabled = self._jsc.sc().conf().get(encryption_conf) +self._encryption_enabled = self._jvm.PythonUtils.getEncryptionEnabled(self._jsc) self.pythonExec = os.environ.get("PYSPARK_PYTHON", 'python') self.pythonVer = "%d.%d" % sys.version_info[:2]
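For context on why the old pyspark code needed those `__getattr__` chains: Scala compiles a package object to a class literally named `package$`, with its singleton instance in a static `MODULE$` field, and `$` is not legal in Python attribute syntax. The following is a toy stand-in written in plain Python (no py4j or JVM involved; all class and member names here are illustrative, not Spark's actual code):

```python
class JvmView:
    """Toy stand-in for a py4j JVM view whose member names may contain '$'."""

    def __init__(self, members):
        self._members = members

    def __getattr__(self, name):
        # Called only when normal attribute lookup fails, e.g. for 'package$'.
        try:
            return self._members[name]
        except KeyError:
            raise AttributeError(name)


# Scala's `package object config` compiles to a class named `package$`
# holding its singleton instance in the static field `MODULE$`.
module = JvmView({"IO_ENCRYPTION_ENABLED": lambda: "spark.io.encryption.enabled"})
jvm = JvmView({"package$": JvmView({"MODULE$": module})})

# `jvm.package$` would be a SyntaxError in Python, hence the explicit
# __getattr__ chain that this patch replaces with a plain Scala-side helper:
entry = jvm.__getattr__("package$").__getattr__("MODULE$").IO_ENCRYPTION_ENABLED()
print(entry)  # spark.io.encryption.enabled
```

A helper method such as `getEncryptionEnabled` avoids the mangled-name lookup entirely, since py4j can call a plain method on a plain object.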
[2/2] spark git commit: [SPARKR] Match pyspark features in SparkR communication protocol
[SPARKR] Match pyspark features in SparkR communication protocol Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ef361682 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ef361682 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ef361682 Branch: refs/heads/branch-2.2 Commit: ef36168258b8ad15362312e0562794f4f07322d0 Parents: 8ad6693 Author: hyukjinkwon Authored: Mon Sep 24 19:25:02 2018 +0800 Committer: hyukjinkwon Committed: Wed Sep 26 10:50:46 2018 +0800 -- R/pkg/R/context.R | 43 ++-- R/pkg/tests/fulltests/test_Serde.R | 32 +++ R/pkg/tests/fulltests/test_sparkSQL.R | 12 -- .../scala/org/apache/spark/api/r/RRDD.scala | 33 ++- .../scala/org/apache/spark/api/r/RUtils.scala | 4 ++ 5 files changed, 98 insertions(+), 26 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ef361682/R/pkg/R/context.R -- diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R index 50856e3..c1a12f5 100644 --- a/R/pkg/R/context.R +++ b/R/pkg/R/context.R @@ -168,18 +168,30 @@ parallelize <- function(sc, coll, numSlices = 1) { # 2-tuples of raws serializedSlices <- lapply(slices, serialize, connection = NULL) - # The PRC backend cannot handle arguments larger than 2GB (INT_MAX) + # The RPC backend cannot handle arguments larger than 2GB (INT_MAX) # If serialized data is safely less than that threshold we send it over the PRC channel. 
# Otherwise, we write it to a file and send the file name if (objectSize < sizeLimit) { jrdd <- callJStatic("org.apache.spark.api.r.RRDD", "createRDDFromArray", sc, serializedSlices) } else { -fileName <- writeToTempFile(serializedSlices) -jrdd <- tryCatch(callJStatic( -"org.apache.spark.api.r.RRDD", "createRDDFromFile", sc, fileName, as.integer(numSlices)), - finally = { -file.remove(fileName) -}) +if (callJStatic("org.apache.spark.api.r.RUtils", "getEncryptionEnabled", sc)) { + # the length of slices here is the parallelism to use in the jvm's sc.parallelize() + parallelism <- as.integer(numSlices) + jserver <- newJObject("org.apache.spark.api.r.RParallelizeServer", sc, parallelism) + authSecret <- callJMethod(jserver, "secret") + port <- callJMethod(jserver, "port") + conn <- socketConnection(port = port, blocking = TRUE, open = "wb", timeout = 1500) + doServerAuth(conn, authSecret) + writeToConnection(serializedSlices, conn) + jrdd <- callJMethod(jserver, "getResult") +} else { + fileName <- writeToTempFile(serializedSlices) + jrdd <- tryCatch(callJStatic( + "org.apache.spark.api.r.RRDD", "createRDDFromFile", sc, fileName, as.integer(numSlices)), +finally = { + file.remove(fileName) + }) +} } RDD(jrdd, "byte") @@ -195,14 +207,21 @@ getMaxAllocationLimit <- function(sc) { )) } +writeToConnection <- function(serializedSlices, conn) { + tryCatch({ +for (slice in serializedSlices) { + writeBin(as.integer(length(slice)), conn, endian = "big") + writeBin(slice, conn, endian = "big") +} + }, finally = { +close(conn) + }) +} + writeToTempFile <- function(serializedSlices) { fileName <- tempfile() conn <- file(fileName, "wb") - for (slice in serializedSlices) { -writeBin(as.integer(length(slice)), conn, endian = "big") -writeBin(slice, conn, endian = "big") - } - close(conn) + writeToConnection(serializedSlices, conn) fileName } http://git-wip-us.apache.org/repos/asf/spark/blob/ef361682/R/pkg/tests/fulltests/test_Serde.R -- diff --git 
a/R/pkg/tests/fulltests/test_Serde.R b/R/pkg/tests/fulltests/test_Serde.R index 6bbd201..092f9b8 100644 --- a/R/pkg/tests/fulltests/test_Serde.R +++ b/R/pkg/tests/fulltests/test_Serde.R @@ -77,3 +77,35 @@ test_that("SerDe of list of lists", { }) sparkR.session.stop() + +# Note that this test should be at the end of tests since the configruations used here are not +# specific to sessions, and the Spark context is restarted. +test_that("createDataFrame large objects", { + for (encryptionEnabled in list("true", "false")) { +# To simulate a large object scenario, we set spark.r.maxAllocationLimit to a smaller value +conf <- list(spark.r.maxAllocationLimit = "100", + spark.io.encryption.enabled = encryptionEnabled) + +suppressWarnings(sparkR.session(master = sparkRTestMaster, +sparkConfig = conf, +enableHiveSupport = FALSE)) + +sc <- getSparkContext() +actual <-
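The new encrypted path frames each serialized slice on the socket the same way `writeToConnection` frames it for the temp file: a 4-byte big-endian length header followed by the payload. A minimal sketch of that framing in Python (illustrative only; the real reader on the other end is the JVM-side `RParallelizeServer`):

```python
import io
import struct


def write_slices(slices, stream):
    # Each slice is framed as a 4-byte big-endian length then the payload,
    # matching R's writeBin(..., endian = "big") in writeToConnection().
    for s in slices:
        stream.write(struct.pack(">i", len(s)))
        stream.write(s)


def read_slices(stream):
    # Read frames back until the stream is exhausted.
    out = []
    while True:
        header = stream.read(4)
        if len(header) < 4:
            break
        (n,) = struct.unpack(">i", header)
        out.append(stream.read(n))
    return out


buf = io.BytesIO()
write_slices([b"abc", b"de"], buf)
buf.seek(0)
print(read_slices(buf))  # [b'abc', b'de']
```

The same framing serves both transports, which is why the patch factors it into a single `writeToConnection` helper that the file path (`writeToTempFile`) now reuses.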
svn commit: r29683 - in /dev/spark/2.5.0-SNAPSHOT-2018_09_25_20_02-473d0d8-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Wed Sep 26 03:17:32 2018 New Revision: 29683 Log: Apache Spark 2.5.0-SNAPSHOT-2018_09_25_20_02-473d0d8 docs [This commit notification would consist of 1485 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-25514][SQL] Generating pretty JSON by to_json
Repository: spark Updated Branches: refs/heads/master cb77a6689 -> 473d0d862 [SPARK-25514][SQL] Generating pretty JSON by to_json ## What changes were proposed in this pull request? The PR introduces a new JSON option `pretty` which turns on Jackson's `DefaultPrettyPrinter` in the JSON generator. The new option is useful for exploring deeply nested columns and for converting JSON columns into a more readable representation (see the added test). ## How was this patch tested? Added a round-trip test which converts a JSON string to a pretty representation via `from_json()` and `to_json()`. Closes #22534 from MaxGekk/pretty-json. Lead-authored-by: Maxim Gekk Co-authored-by: Maxim Gekk Signed-off-by: hyukjinkwon Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/473d0d86 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/473d0d86 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/473d0d86 Branch: refs/heads/master Commit: 473d0d862de54ec1c7a8f0354fa5e06f3d66e455 Parents: cb77a66 Author: Maxim Gekk Authored: Wed Sep 26 09:52:15 2018 +0800 Committer: hyukjinkwon Committed: Wed Sep 26 09:52:15 2018 +0800 -- R/pkg/R/functions.R | 5 +++-- python/pyspark/sql/functions.py | 4 +++- .../spark/sql/catalyst/json/JSONOptions.scala | 5 + .../sql/catalyst/json/JacksonGenerator.scala| 5 - .../scala/org/apache/spark/sql/functions.scala | 4 .../apache/spark/sql/JsonFunctionsSuite.scala | 21 6 files changed, 40 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/473d0d86/R/pkg/R/functions.R -- diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index 572dee5..6425c9d 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -198,8 +198,9 @@ NULL #' } #' @param ... additional argument(s). In \code{to_json} and \code{from_json}, this contains #'additional named properties to control how it is converted, accepts the same -#'options as the JSON data source. 
In \code{arrays_zip}, this contains additional -#'Columns of arrays to be merged. +#'options as the JSON data source. Additionally \code{to_json} supports the "pretty" +#'option which enables pretty JSON generation. In \code{arrays_zip}, this contains +#'additional Columns of arrays to be merged. #' @name column_collection_functions #' @rdname column_collection_functions #' @family collection functions http://git-wip-us.apache.org/repos/asf/spark/blob/473d0d86/python/pyspark/sql/functions.py -- diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 6da5237..1c3d972 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -2295,7 +2295,9 @@ def to_json(col, options={}): into a JSON string. Throws an exception, in the case of an unsupported type. :param col: name of column containing a struct, an array or a map. -:param options: options to control converting. accepts the same options as the JSON datasource +:param options: options to control converting. accepts the same options as the JSON datasource. +Additionally the function supports the `pretty` option which enables +pretty JSON generation. >>> from pyspark.sql import Row >>> from pyspark.sql.types import * http://git-wip-us.apache.org/repos/asf/spark/blob/473d0d86/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala index 47eeb70..64152e0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala @@ -113,6 +113,11 @@ private[sql] class JSONOptions( } val lineSeparatorInWrite: String = lineSeparator.getOrElse("\n") + /** + * Generating JSON strings in pretty representation if the parameter is enabled. 
+ */ + val pretty: Boolean = parameters.get("pretty").map(_.toBoolean).getOrElse(false) + /** Sets config options on a Jackson [[JsonFactory]]. */ def setJacksonOptions(factory: JsonFactory): Unit = { factory.configure(JsonParser.Feature.ALLOW_COMMENTS, allowComments)
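The option plumbing in `JSONOptions` is simple: `pretty` is read from the options map, parsed as a boolean, defaults to `false`, and toggles an indenting printer on the generator. A rough Python analogue of that behavior (using `json.dumps` in place of Jackson's `DefaultPrettyPrinter`; this is a sketch of the semantics, not Spark's actual code path):

```python
import json


def to_json_sketch(value, options=None):
    # Mirrors JSONOptions: parameters.get("pretty").map(_.toBoolean).getOrElse(false)
    options = options or {}
    pretty = str(options.get("pretty", "false")).lower() == "true"
    if pretty:
        return json.dumps(value, indent=2)           # pretty-printer analogue
    return json.dumps(value, separators=(",", ":"))  # compact, the default


row = {"id": 1, "items": [{"name": "a"}]}
print(to_json_sketch(row))                      # {"id":1,"items":[{"name":"a"}]}
print(to_json_sketch(row, {"pretty": "true"}))  # spans multiple indented lines
```

In Spark itself the option is passed the same way as any other JSON datasource option, e.g. `to_json(col, {"pretty": "true"})` in pyspark.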
spark git commit: [SPARK-21291][R] add R partitionBy API in DataFrame
Repository: spark Updated Branches: refs/heads/master 8c2edf46d -> cb77a6689 [SPARK-21291][R] add R partitionBy API in DataFrame ## What changes were proposed in this pull request? add R partitionBy API in write.df I didn't add bucketBy in write.df. The last line of write.df is ``` write <- handledCallJMethod(write, "save") ``` save doesn't support bucketBy right now. ``` assertNotBucketed("save") ``` ## How was this patch tested? Add unit test in test_sparkSQL.R Closes #22537 from huaxingao/spark-21291. Authored-by: Huaxin Gao Signed-off-by: hyukjinkwon Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cb77a668 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cb77a668 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cb77a668 Branch: refs/heads/master Commit: cb77a6689137916e64bc5692b0c942e86ca1a0ea Parents: 8c2edf4 Author: Huaxin Gao Authored: Wed Sep 26 09:37:44 2018 +0800 Committer: hyukjinkwon Committed: Wed Sep 26 09:37:44 2018 +0800 -- R/pkg/R/DataFrame.R | 17 +++-- R/pkg/tests/fulltests/test_sparkSQL.R | 8 2 files changed, 23 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/cb77a668/R/pkg/R/DataFrame.R -- diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index a1cb478..3469188 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -2954,6 +2954,9 @@ setMethod("exceptAll", #' @param source a name for external data source. #' @param mode one of 'append', 'overwrite', 'error', 'errorifexists', 'ignore' #' save mode (it is 'error' by default) +#' @param partitionBy a name or a list of names of columns to partition the output by on the file +#'system. If specified, the output is laid out on the file system similar +#'to Hive's partitioning scheme. #' @param ... additional argument(s) passed to the method. 
#' #' @family SparkDataFrame functions @@ -2965,13 +2968,13 @@ setMethod("exceptAll", #' sparkR.session() #' path <- "path/to/file.json" #' df <- read.json(path) -#' write.df(df, "myfile", "parquet", "overwrite") +#' write.df(df, "myfile", "parquet", "overwrite", partitionBy = c("col1", "col2")) #' saveDF(df, parquetPath2, "parquet", mode = "append", mergeSchema = TRUE) #' } #' @note write.df since 1.4.0 setMethod("write.df", signature(df = "SparkDataFrame"), - function(df, path = NULL, source = NULL, mode = "error", ...) { + function(df, path = NULL, source = NULL, mode = "error", partitionBy = NULL, ...) { if (!is.null(path) && !is.character(path)) { stop("path should be character, NULL or omitted.") } @@ -2985,8 +2988,18 @@ setMethod("write.df", if (is.null(source)) { source <- getDefaultSqlSource() } +cols <- NULL +if (!is.null(partitionBy)) { + if (!all(sapply(partitionBy, function(c) is.character(c { +stop("All partitionBy column names should be characters.") + } + cols <- as.list(partitionBy) +} write <- callJMethod(df@sdf, "write") write <- callJMethod(write, "format", source) +if (!is.null(cols)) { + write <- callJMethod(write, "partitionBy", cols) +} write <- setWriteOptions(write, path = path, mode = mode, ...) 
write <- handledCallJMethod(write, "save") }) http://git-wip-us.apache.org/repos/asf/spark/blob/cb77a668/R/pkg/tests/fulltests/test_sparkSQL.R -- diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R index a874bfb..50eff37 100644 --- a/R/pkg/tests/fulltests/test_sparkSQL.R +++ b/R/pkg/tests/fulltests/test_sparkSQL.R @@ -2701,8 +2701,16 @@ test_that("read/write text files", { expect_equal(colnames(df2), c("value")) expect_equal(count(df2), count(df) * 2) + df3 <- createDataFrame(list(list(1L, "1"), list(2L, "2"), list(1L, "1"), list(2L, "2")), + schema = c("key", "value")) + textPath3 <- tempfile(pattern = "textPath3", fileext = ".txt") + write.df(df3, textPath3, "text", mode = "overwrite", partitionBy = "key") + df4 <- read.df(textPath3, "text") + expect_equal(count(df3), count(df4)) + unlink(textPath) unlink(textPath2) + unlink(textPath3) }) test_that("read/write text files - compression option", {
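What `partitionBy` buys you is Hive-style directory layout: one `col=value` subdirectory per distinct partition value, with the partition column itself dropped from the written rows. A self-contained sketch of that layout (plain Python, purely illustrative; in Spark the real work happens inside `DataFrameWriter.partitionBy`):

```python
import os
import tempfile


def write_partitioned(rows, partition_col, base_dir):
    # One `col=value` directory per distinct value, Hive-style; the
    # partition column is encoded in the path, not repeated in the files.
    for row in rows:
        part_dir = os.path.join(
            base_dir, "{}={}".format(partition_col, row[partition_col]))
        os.makedirs(part_dir, exist_ok=True)
        with open(os.path.join(part_dir, "part-00000"), "a") as f:
            f.write(",".join(str(v) for k, v in row.items()
                             if k != partition_col) + "\n")


base = tempfile.mkdtemp()
write_partitioned([{"key": 1, "value": "1"}, {"key": 2, "value": "2"}], "key", base)
print(sorted(os.listdir(base)))  # ['key=1', 'key=2']
```

Readers that understand this scheme (as `read.df` does in the new test) can then prune whole directories when filtering on the partition column.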
spark git commit: [SPARK-24324][PYTHON][FOLLOW-UP] Rename the Conf to spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName
Repository: spark Updated Branches: refs/heads/branch-2.4 f91247f81 -> 3f203050a [SPARK-24324][PYTHON][FOLLOW-UP] Rename the Conf to spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName ## What changes were proposed in this pull request? Add the legacy prefix for spark.sql.execution.pandas.groupedMap.assignColumnsByPosition and rename it to spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName ## How was this patch tested? The existing tests. Closes #22540 from gatorsmile/renameAssignColumnsByPosition. Lead-authored-by: gatorsmile Co-authored-by: Hyukjin Kwon Signed-off-by: hyukjinkwon (cherry picked from commit 8c2edf46d0f89e5ec54968218d89f30a3f8190bc) Signed-off-by: hyukjinkwon Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3f203050 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3f203050 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3f203050 Branch: refs/heads/branch-2.4 Commit: 3f203050ac764516e68fb43628bba0df5963e44d Parents: f91247f Author: gatorsmile Authored: Wed Sep 26 09:32:51 2018 +0800 Committer: hyukjinkwon Committed: Wed Sep 26 09:33:13 2018 +0800 -- python/pyspark/sql/tests.py | 3 ++- python/pyspark/worker.py | 7 --- .../org/apache/spark/sql/internal/SQLConf.scala | 18 +- .../spark/sql/execution/arrow/ArrowUtils.scala| 9 +++-- 4 files changed, 18 insertions(+), 19 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3f203050/python/pyspark/sql/tests.py -- diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 9fa1577..cb186de 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -5799,7 +5799,8 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase): import pandas as pd from pyspark.sql.functions import pandas_udf, PandasUDFType -with self.sql_conf({"spark.sql.execution.pandas.groupedMap.assignColumnsByPosition": True}): +with self.sql_conf({ + 
"spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName": False}): @pandas_udf("a string, b float", PandasUDFType.GROUPED_MAP) def foo(_): http://git-wip-us.apache.org/repos/asf/spark/blob/3f203050/python/pyspark/worker.py -- diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 974344f..8c59f1f 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -97,8 +97,9 @@ def wrap_scalar_pandas_udf(f, return_type): def wrap_grouped_map_pandas_udf(f, return_type, argspec, runner_conf): -assign_cols_by_pos = runner_conf.get( -"spark.sql.execution.pandas.groupedMap.assignColumnsByPosition", False) +assign_cols_by_name = runner_conf.get( +"spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName", "true") +assign_cols_by_name = assign_cols_by_name.lower() == "true" def wrapped(key_series, value_series): import pandas as pd @@ -119,7 +120,7 @@ def wrap_grouped_map_pandas_udf(f, return_type, argspec, runner_conf): "Expected: {} Actual: {}".format(len(return_type), len(result.columns))) # Assign result columns by schema name if user labeled with strings, else use position -if not assign_cols_by_pos and any(isinstance(name, basestring) for name in result.columns): +if assign_cols_by_name and any(isinstance(name, basestring) for name in result.columns): return [(result[field.name], to_arrow_type(field.dataType)) for field in return_type] else: return [(result[result.columns[i]], to_arrow_type(field.dataType)) http://git-wip-us.apache.org/repos/asf/spark/blob/3f203050/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 2788402..68daf9d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1275,15 +1275,15 @@ object SQLConf { 
.booleanConf .createWithDefault(true) - val PANDAS_GROUPED_MAP_ASSIGN_COLUMNS_BY_POSITION = -buildConf("spark.sql.execution.pandas.groupedMap.assignColumnsByPosition") + val PANDAS_GROUPED_MAP_ASSIGN_COLUMNS_BY_NAME = + buildConf("spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName") .internal() - .doc("When true, a grouped map
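The behavioral difference the renamed conf controls can be shown without Spark: given a declared schema and a result whose columns are string-labeled but out of order, by-name matching reorders them to the schema, while the legacy by-position mode takes them in the order returned. A plain-Python sketch (dicts standing in for pandas DataFrames; illustrative only):

```python
schema = ["a", "b"]                # declared as "a string, b float" in the UDF
result = {"b": [1.0], "a": ["x"]}  # the UDF returned columns out of order
returned_order = list(result)      # insertion order: ['b', 'a']


def select_columns(assign_cols_by_name):
    # Mirrors wrap_grouped_map_pandas_udf: match by NAME when the result's
    # columns are string-labeled, otherwise (or in legacy mode) by POSITION.
    if assign_cols_by_name and all(isinstance(n, str) for n in returned_order):
        return [result[name] for name in schema]
    return [result[returned_order[i]] for i in range(len(schema))]


print(select_columns(True))   # [['x'], [1.0]]  -> reordered to the schema
print(select_columns(False))  # [[1.0], ['x']]  -> legacy positional match
```

Note the rename also flips the sense of the flag: the old conf enabled positional matching when `true`, while the new `...assignColumnsByName` enables name matching and defaults to `"true"`.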
spark git commit: [SPARK-24324][PYTHON][FOLLOW-UP] Rename the Conf to spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName
Repository: spark Updated Branches: refs/heads/master 9bb3a0c67 -> 8c2edf46d [SPARK-24324][PYTHON][FOLLOW-UP] Rename the Conf to spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName ## What changes were proposed in this pull request? Add the legacy prefix for spark.sql.execution.pandas.groupedMap.assignColumnsByPosition and rename it to spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName ## How was this patch tested? The existing tests. Closes #22540 from gatorsmile/renameAssignColumnsByPosition. Lead-authored-by: gatorsmile Co-authored-by: Hyukjin Kwon Signed-off-by: hyukjinkwon Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8c2edf46 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8c2edf46 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8c2edf46 Branch: refs/heads/master Commit: 8c2edf46d0f89e5ec54968218d89f30a3f8190bc Parents: 9bb3a0c Author: gatorsmile Authored: Wed Sep 26 09:32:51 2018 +0800 Committer: hyukjinkwon Committed: Wed Sep 26 09:32:51 2018 +0800 -- python/pyspark/sql/tests.py | 3 ++- python/pyspark/worker.py | 7 --- .../org/apache/spark/sql/internal/SQLConf.scala | 18 +- .../spark/sql/execution/arrow/ArrowUtils.scala| 9 +++-- 4 files changed, 18 insertions(+), 19 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8c2edf46/python/pyspark/sql/tests.py -- diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index b829bae..74642d4 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -5802,7 +5802,8 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase): import pandas as pd from pyspark.sql.functions import pandas_udf, PandasUDFType -with self.sql_conf({"spark.sql.execution.pandas.groupedMap.assignColumnsByPosition": True}): +with self.sql_conf({ + "spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName": False}): @pandas_udf("a string, b float", PandasUDFType.GROUPED_MAP) def 
foo(_): http://git-wip-us.apache.org/repos/asf/spark/blob/8c2edf46/python/pyspark/worker.py -- diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 974344f..8c59f1f 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -97,8 +97,9 @@ def wrap_scalar_pandas_udf(f, return_type): def wrap_grouped_map_pandas_udf(f, return_type, argspec, runner_conf): -assign_cols_by_pos = runner_conf.get( -"spark.sql.execution.pandas.groupedMap.assignColumnsByPosition", False) +assign_cols_by_name = runner_conf.get( +"spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName", "true") +assign_cols_by_name = assign_cols_by_name.lower() == "true" def wrapped(key_series, value_series): import pandas as pd @@ -119,7 +120,7 @@ def wrap_grouped_map_pandas_udf(f, return_type, argspec, runner_conf): "Expected: {} Actual: {}".format(len(return_type), len(result.columns))) # Assign result columns by schema name if user labeled with strings, else use position -if not assign_cols_by_pos and any(isinstance(name, basestring) for name in result.columns): +if assign_cols_by_name and any(isinstance(name, basestring) for name in result.columns): return [(result[field.name], to_arrow_type(field.dataType)) for field in return_type] else: return [(result[result.columns[i]], to_arrow_type(field.dataType)) http://git-wip-us.apache.org/repos/asf/spark/blob/8c2edf46/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 0e0a01d..e7c9a83 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1295,15 +1295,15 @@ object SQLConf { .booleanConf .createWithDefault(true) - val PANDAS_GROUPED_MAP_ASSIGN_COLUMNS_BY_POSITION = 
-buildConf("spark.sql.execution.pandas.groupedMap.assignColumnsByPosition") + val PANDAS_GROUPED_MAP_ASSIGN_COLUMNS_BY_NAME = + buildConf("spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName") .internal() - .doc("When true, a grouped map Pandas UDF will assign columns from the returned " + -"Pandas DataFrame based on position,
svn commit: r29678 - in /dev/spark/2.4.1-SNAPSHOT-2018_09_25_18_02-f91247f-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Wed Sep 26 01:16:56 2018 New Revision: 29678 Log: Apache Spark 2.4.1-SNAPSHOT-2018_09_25_18_02-f91247f docs [This commit notification would consist of 1472 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-25422][CORE] Don't memory map blocks streamed to disk.
Repository: spark Updated Branches: refs/heads/branch-2.4 544f86a69 -> f91247f81 [SPARK-25422][CORE] Don't memory map blocks streamed to disk. After data has been streamed to disk, the buffers are inserted into the memory store in some cases (eg., with broadcast blocks). But broadcast code also disposes of those buffers when the data has been read, to ensure that we don't leave mapped buffers using up memory, which then leads to garbage data in the memory store. ## How was this patch tested? Ran the old failing test in a loop. Full tests on jenkins Closes #22546 from squito/SPARK-25422-master. Authored-by: Imran Rashid Signed-off-by: Wenchen Fan (cherry picked from commit 9bb3a0c67bd851b09ff4701ef1d280e2a77d791b) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f91247f8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f91247f8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f91247f8 Branch: refs/heads/branch-2.4 Commit: f91247f812f87daa9fe4ec23b100f2310254df22 Parents: 544f86a Author: Imran Rashid Authored: Wed Sep 26 08:45:27 2018 +0800 Committer: Wenchen Fan Committed: Wed Sep 26 08:45:56 2018 +0800 -- .../org/apache/spark/storage/BlockManager.scala | 13 +++--- .../spark/util/io/ChunkedByteBuffer.scala | 47 +++- 2 files changed, 31 insertions(+), 29 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f91247f8/core/src/main/scala/org/apache/spark/storage/BlockManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 2234146..0fe82ac 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -438,10 +438,8 @@ private[spark] class BlockManager( // stream. 
channel.close() // TODO SPARK-25035 Even if we're only going to write the data to disk after this, we end up -// using a lot of memory here. With encryption, we'll read the whole file into a regular -// byte buffer and OOM. Without encryption, we'll memory map the file and won't get a jvm -// OOM, but might get killed by the OS / cluster manager. We could at least read the tmp -// file as a stream in both cases. +// using a lot of memory here. We'll read the whole file into a regular +// byte buffer and OOM. We could at least read the tmp file as a stream. val buffer = securityManager.getIOEncryptionKey() match { case Some(key) => // we need to pass in the size of the unencrypted block @@ -453,7 +451,7 @@ private[spark] class BlockManager( new EncryptedBlockData(tmpFile, blockSize, conf, key).toChunkedByteBuffer(allocator) case None => -ChunkedByteBuffer.map(tmpFile, conf.get(config.MEMORY_MAP_LIMIT_FOR_TESTS).toInt) +ChunkedByteBuffer.fromFile(tmpFile, conf.get(config.MEMORY_MAP_LIMIT_FOR_TESTS).toInt) } putBytes(blockId, buffer, level)(classTag) tmpFile.delete() @@ -726,10 +724,9 @@ private[spark] class BlockManager( */ def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = { // TODO if we change this method to return the ManagedBuffer, then getRemoteValues -// could just use the inputStream on the temp file, rather than memory-mapping the file. +// could just use the inputStream on the temp file, rather than reading the file into memory. // Until then, replication can cause the process to use too much memory and get killed -// by the OS / cluster manager (not a java OOM, since it's a memory-mapped file) even though -// we've read the data to disk. +// even though we've read the data to disk. 
logDebug(s"Getting remote block $blockId") require(blockId != null, "BlockId is null") var runningFailureCount = 0 http://git-wip-us.apache.org/repos/asf/spark/blob/f91247f8/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala index 39f050f..4aa8d45 100644 --- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala +++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala @@ -19,17 +19,16 @@ package org.apache.spark.util.io import java.io.{File, FileInputStream, InputStream} import java.nio.ByteBuffer -import java.nio.channels.{FileChannel, WritableByteChannel}
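The replacement for `ChunkedByteBuffer.map` reads the temp file into a sequence of bounded heap buffers instead of memory-mapping it, so there is no large mapped region for the OS or cluster manager to account against the process. A minimal Python sketch of that `fromFile`-style chunked read (illustrative; the real Scala code goes through NIO channels and a configurable chunk-size limit):

```python
import os
import tempfile


def from_file(path, chunk_size):
    # Read the whole file as a list of chunks, each at most chunk_size bytes,
    # rather than mmap-ing it: memory use is explicit and bounded per chunk.
    chunks = []
    with open(path, "rb") as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            chunks.append(chunk)
    return chunks


fd, path = tempfile.mkstemp()
with os.fdopen(fd, "wb") as f:
    f.write(b"0123456789")
chunks = from_file(path, 4)
os.remove(path)
print(chunks)  # [b'0123', b'4567', b'89']
```

Keeping the data in ordinary buffers also means disposing of the block later cannot invalidate a mapping some reader still holds, which is the "garbage data in the memory store" problem the commit message describes.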
spark git commit: [SPARK-25422][CORE] Don't memory map blocks streamed to disk.
Repository: spark Updated Branches: refs/heads/master 66d29870c -> 9bb3a0c67 [SPARK-25422][CORE] Don't memory map blocks streamed to disk. After data has been streamed to disk, the buffers are inserted into the memory store in some cases (eg., with broadcast blocks). But broadcast code also disposes of those buffers when the data has been read, to ensure that we don't leave mapped buffers using up memory, which then leads to garbage data in the memory store. ## How was this patch tested? Ran the old failing test in a loop. Full tests on jenkins Closes #22546 from squito/SPARK-25422-master. Authored-by: Imran Rashid Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9bb3a0c6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9bb3a0c6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9bb3a0c6 Branch: refs/heads/master Commit: 9bb3a0c67bd851b09ff4701ef1d280e2a77d791b Parents: 66d2987 Author: Imran Rashid Authored: Wed Sep 26 08:45:27 2018 +0800 Committer: Wenchen Fan Committed: Wed Sep 26 08:45:27 2018 +0800 -- .../org/apache/spark/storage/BlockManager.scala | 13 +++--- .../spark/util/io/ChunkedByteBuffer.scala | 47 +++- 2 files changed, 31 insertions(+), 29 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9bb3a0c6/core/src/main/scala/org/apache/spark/storage/BlockManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 2234146..0fe82ac 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -438,10 +438,8 @@ private[spark] class BlockManager( // stream. channel.close() // TODO SPARK-25035 Even if we're only going to write the data to disk after this, we end up -// using a lot of memory here. 
With encryption, we'll read the whole file into a regular -// byte buffer and OOM. Without encryption, we'll memory map the file and won't get a jvm -// OOM, but might get killed by the OS / cluster manager. We could at least read the tmp -// file as a stream in both cases. +// using a lot of memory here. We'll read the whole file into a regular +// byte buffer and OOM. We could at least read the tmp file as a stream. val buffer = securityManager.getIOEncryptionKey() match { case Some(key) => // we need to pass in the size of the unencrypted block @@ -453,7 +451,7 @@ private[spark] class BlockManager( new EncryptedBlockData(tmpFile, blockSize, conf, key).toChunkedByteBuffer(allocator) case None => -ChunkedByteBuffer.map(tmpFile, conf.get(config.MEMORY_MAP_LIMIT_FOR_TESTS).toInt) +ChunkedByteBuffer.fromFile(tmpFile, conf.get(config.MEMORY_MAP_LIMIT_FOR_TESTS).toInt) } putBytes(blockId, buffer, level)(classTag) tmpFile.delete() @@ -726,10 +724,9 @@ private[spark] class BlockManager( */ def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = { // TODO if we change this method to return the ManagedBuffer, then getRemoteValues -// could just use the inputStream on the temp file, rather than memory-mapping the file. +// could just use the inputStream on the temp file, rather than reading the file into memory. // Until then, replication can cause the process to use too much memory and get killed -// by the OS / cluster manager (not a java OOM, since it's a memory-mapped file) even though -// we've read the data to disk. +// even though we've read the data to disk. 
logDebug(s"Getting remote block $blockId") require(blockId != null, "BlockId is null") var runningFailureCount = 0 http://git-wip-us.apache.org/repos/asf/spark/blob/9bb3a0c6/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala index 39f050f..4aa8d45 100644 --- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala +++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala @@ -19,17 +19,16 @@ package org.apache.spark.util.io import java.io.{File, FileInputStream, InputStream} import java.nio.ByteBuffer -import java.nio.channels.{FileChannel, WritableByteChannel} -import java.nio.file.StandardOpenOption - -import scala.collection.mutable.ListBuffer +import
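The core of the SPARK-25422 change is replacing `ChunkedByteBuffer.map` (which memory-mapped the temp file) with `ChunkedByteBuffer.fromFile` (which reads it into regular on-heap chunks, each capped by `MEMORY_MAP_LIMIT_FOR_TESTS`). As a rough illustration of the chunked-read idea — a Python sketch, not Spark's actual Scala implementation; the function name and signature here are made up:

```python
def read_file_chunked(path, max_chunk_size):
    """Read a file into a list of fixed-size chunks instead of one large
    (or memory-mapped) buffer -- the idea behind the fromFile replacement
    in this patch. No single allocation exceeds max_chunk_size."""
    chunks = []
    with open(path, "rb") as f:
        while True:
            chunk = f.read(max_chunk_size)
            if not chunk:
                break
            chunks.append(chunk)
    return chunks
```

Unlike a memory-mapped buffer, these chunks are ordinary heap data, so disposing of them later cannot leave garbage behind in the memory store.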
svn commit: r29675 - in /dev/spark/2.4.1-SNAPSHOT-2018_09_25_14_02-544f86a-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Tue Sep 25 21:17:02 2018 New Revision: 29675 Log: Apache Spark 2.4.1-SNAPSHOT-2018_09_25_14_02-544f86a docs [This commit notification would consist of 1472 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
svn commit: r29672 - in /dev/spark/2.5.0-SNAPSHOT-2018_09_25_12_02-66d2987-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Tue Sep 25 19:17:33 2018 New Revision: 29672 Log: Apache Spark 2.5.0-SNAPSHOT-2018_09_25_12_02-66d2987 docs [This commit notification would consist of 1485 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-25495][SS] FetchedData.reset should reset all fields
Repository: spark Updated Branches: refs/heads/branch-2.4 a709718da -> 544f86a69 [SPARK-25495][SS] FetchedData.reset should reset all fields ## What changes were proposed in this pull request? `FetchedData.reset` should reset `_nextOffsetInFetchedData` and `_offsetAfterPoll`. Otherwise it will cause inconsistent cached data and may make Kafka connector return wrong results. ## How was this patch tested? The new unit test. Closes #22507 from zsxwing/fix-kafka-reset. Lead-authored-by: Shixiong Zhu Co-authored-by: Shixiong Zhu Signed-off-by: Shixiong Zhu (cherry picked from commit 66d29870c09e6050dd846336e596faaa8b0d14ad) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/544f86a6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/544f86a6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/544f86a6 Branch: refs/heads/branch-2.4 Commit: 544f86a69bba94dfcb241e41c799ed63ef4210fc Parents: a709718 Author: Shixiong Zhu Authored: Tue Sep 25 11:42:27 2018 -0700 Committer: Shixiong Zhu Committed: Tue Sep 25 11:42:39 2018 -0700 -- .../spark/sql/kafka010/KafkaDataConsumer.scala | 5 +- .../kafka010/KafkaMicroBatchSourceSuite.scala | 52 2 files changed, 56 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/544f86a6/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala -- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala index ceb9e31..7b1314b 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala @@ -134,6 +134,8 @@ private[kafka010] case class InternalKafkaConsumer( /** Reset the internal pre-fetched data. 
*/ def reset(): Unit = { _records = ju.Collections.emptyListIterator() + _nextOffsetInFetchedData = UNKNOWN_OFFSET + _offsetAfterPoll = UNKNOWN_OFFSET } /** @@ -361,8 +363,9 @@ private[kafka010] case class InternalKafkaConsumer( if (offset < fetchedData.offsetAfterPoll) { // Offsets in [offset, fetchedData.offsetAfterPoll) are invisible. Return a record to ask // the next call to start from `fetchedData.offsetAfterPoll`. +val nextOffsetToFetch = fetchedData.offsetAfterPoll fetchedData.reset() -return fetchedRecord.withRecord(null, fetchedData.offsetAfterPoll) +return fetchedRecord.withRecord(null, nextOffsetToFetch) } else { // Fetch records from Kafka and update `fetchedData`. fetchData(offset, pollTimeoutMs) http://git-wip-us.apache.org/repos/asf/spark/blob/544f86a6/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala -- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index 65615fd..e0b6d8c 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -853,6 +853,58 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { ) } } + + test("SPARK-25495: FetchedData.reset should reset all fields") { +val topic = newTopic() +val topicPartition = new TopicPartition(topic, 0) +testUtils.createTopic(topic, partitions = 1) + +val ds = spark + .readStream + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.metadata.max.age.ms", "1") + .option("kafka.isolation.level", "read_committed") + .option("subscribe", topic) + .option("startingOffsets", "earliest") + .load() + .select($"value".as[String]) + 
+testUtils.withTranscationalProducer { producer => + producer.beginTransaction() + (0 to 3).foreach { i => +producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + producer.commitTransaction() +} +testUtils.waitUntilOffsetAppears(topicPartition, 5) + +val q = ds.writeStream.foreachBatch { (ds, epochId) => + if (epochId == 0) { +// Send more message before the tasks of the current batch start reading the current
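The patch above has two parts: `reset()` now clears *all* cached fields, and the caller captures `offsetAfterPoll` *before* calling `reset()` (previously it read the field afterwards, when it had just been wiped). A toy Python model of that interaction — illustrative only, not the connector's real Scala types:

```python
UNKNOWN_OFFSET = -2

class FetchedData:
    """Toy model of the Kafka connector's fetched-data cache. The bug:
    reset() cleared the record iterator but left the cached offsets,
    so stale offsets could leak into later fetches."""
    def __init__(self):
        self.records = []
        self.next_offset_in_fetched_data = UNKNOWN_OFFSET
        self.offset_after_poll = UNKNOWN_OFFSET

    def reset(self):
        # after the fix, every field returns to its unknown state
        self.records = []
        self.next_offset_in_fetched_data = UNKNOWN_OFFSET
        self.offset_after_poll = UNKNOWN_OFFSET

def skip_invisible_range(fetched):
    # Mirrors the second hunk: read offset_after_poll *before* reset(),
    # because reset() now wipes it too.
    next_offset_to_fetch = fetched.offset_after_poll
    fetched.reset()
    return next_offset_to_fetch
```

Without the reordering, the caller would return `UNKNOWN_OFFSET` instead of the real next offset — exactly the kind of wrong result the JIRA describes.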
spark git commit: [SPARK-25495][SS] FetchedData.reset should reset all fields
Repository: spark Updated Branches: refs/heads/master 04db03537 -> 66d29870c [SPARK-25495][SS] FetchedData.reset should reset all fields ## What changes were proposed in this pull request? `FetchedData.reset` should reset `_nextOffsetInFetchedData` and `_offsetAfterPoll`. Otherwise it will cause inconsistent cached data and may make Kafka connector return wrong results. ## How was this patch tested? The new unit test. Closes #22507 from zsxwing/fix-kafka-reset. Lead-authored-by: Shixiong Zhu Co-authored-by: Shixiong Zhu Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/66d29870 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/66d29870 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/66d29870 Branch: refs/heads/master Commit: 66d29870c09e6050dd846336e596faaa8b0d14ad Parents: 04db035 Author: Shixiong Zhu Authored: Tue Sep 25 11:42:27 2018 -0700 Committer: Shixiong Zhu Committed: Tue Sep 25 11:42:27 2018 -0700 -- .../spark/sql/kafka010/KafkaDataConsumer.scala | 5 +- .../kafka010/KafkaMicroBatchSourceSuite.scala | 52 2 files changed, 56 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/66d29870/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala -- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala index ceb9e31..7b1314b 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala @@ -134,6 +134,8 @@ private[kafka010] case class InternalKafkaConsumer( /** Reset the internal pre-fetched data. 
*/ def reset(): Unit = { _records = ju.Collections.emptyListIterator() + _nextOffsetInFetchedData = UNKNOWN_OFFSET + _offsetAfterPoll = UNKNOWN_OFFSET } /** @@ -361,8 +363,9 @@ private[kafka010] case class InternalKafkaConsumer( if (offset < fetchedData.offsetAfterPoll) { // Offsets in [offset, fetchedData.offsetAfterPoll) are invisible. Return a record to ask // the next call to start from `fetchedData.offsetAfterPoll`. +val nextOffsetToFetch = fetchedData.offsetAfterPoll fetchedData.reset() -return fetchedRecord.withRecord(null, fetchedData.offsetAfterPoll) +return fetchedRecord.withRecord(null, nextOffsetToFetch) } else { // Fetch records from Kafka and update `fetchedData`. fetchData(offset, pollTimeoutMs) http://git-wip-us.apache.org/repos/asf/spark/blob/66d29870/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala -- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index e5f0088..39c2cde 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -874,6 +874,58 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { ) } } + + test("SPARK-25495: FetchedData.reset should reset all fields") { +val topic = newTopic() +val topicPartition = new TopicPartition(topic, 0) +testUtils.createTopic(topic, partitions = 1) + +val ds = spark + .readStream + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.metadata.max.age.ms", "1") + .option("kafka.isolation.level", "read_committed") + .option("subscribe", topic) + .option("startingOffsets", "earliest") + .load() + .select($"value".as[String]) + 
+testUtils.withTranscationalProducer { producer => + producer.beginTransaction() + (0 to 3).foreach { i => +producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + producer.commitTransaction() +} +testUtils.waitUntilOffsetAppears(topicPartition, 5) + +val q = ds.writeStream.foreachBatch { (ds, epochId) => + if (epochId == 0) { +// Send more message before the tasks of the current batch start reading the current batch +// data, so that the executors will prefetch messages in the next batch and drop them. In +
spark git commit: [SPARK-25486][TEST] Refactor SortBenchmark to use main method
Repository: spark Updated Branches: refs/heads/master 9cbd001e2 -> 04db03537 [SPARK-25486][TEST] Refactor SortBenchmark to use main method ## What changes were proposed in this pull request? Refactor SortBenchmark to use main method. Generate benchmark result: ``` SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain org.apache.spark.sql.execution.benchmark.SortBenchmark" ``` ## How was this patch tested? manual tests Closes #22495 from yucai/SPARK-25486. Authored-by: yucai Signed-off-by: Dongjoon Hyun Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/04db0353 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/04db0353 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/04db0353 Branch: refs/heads/master Commit: 04db035378012907c93f6e5b4faa6ec11f1fc67b Parents: 9cbd001 Author: yucai Authored: Tue Sep 25 11:13:05 2018 -0700 Committer: Dongjoon Hyun Committed: Tue Sep 25 11:13:05 2018 -0700 -- sql/core/benchmarks/SortBenchmark-results.txt | 17 + .../sql/execution/benchmark/SortBenchmark.scala | 38 +--- 2 files changed, 33 insertions(+), 22 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/04db0353/sql/core/benchmarks/SortBenchmark-results.txt -- diff --git a/sql/core/benchmarks/SortBenchmark-results.txt b/sql/core/benchmarks/SortBenchmark-results.txt new file mode 100644 index 000..0d00a0c --- /dev/null +++ b/sql/core/benchmarks/SortBenchmark-results.txt @@ -0,0 +1,17 @@ + +radix sort + + +Java HotSpot(TM) 64-Bit Server VM 1.8.0_162-b12 on Mac OS X 10.13.6 +Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz + +radix sort 2500: Best/Avg Time(ms)Rate(M/s) Per Row(ns) Relative + +reference TimSort key prefix array 11770 / 11960 2.1 470.8 1.0X +reference Arrays.sort 2106 / 2128 11.9 84.3 5.6X +radix sort one byte 93 / 100269.7 3.7 126.9X +radix sort two bytes 171 / 179146.0 6.9 68.7X +radix sort eight bytes 659 / 664 37.9 26.4 17.9X +radix sort key prefix array 1024 / 1053 24.4 
41.0 11.5X + + http://git-wip-us.apache.org/repos/asf/spark/blob/04db0353/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SortBenchmark.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SortBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SortBenchmark.scala index 17619ec..958a064 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SortBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SortBenchmark.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.benchmark import java.util.{Arrays, Comparator} -import org.apache.spark.benchmark.Benchmark +import org.apache.spark.benchmark.{Benchmark, BenchmarkBase} import org.apache.spark.unsafe.array.LongArray import org.apache.spark.unsafe.memory.MemoryBlock import org.apache.spark.util.collection.Sorter @@ -28,12 +28,15 @@ import org.apache.spark.util.random.XORShiftRandom /** * Benchmark to measure performance for aggregate primitives. - * To run this: - * build/sbt "sql/test-only *benchmark.SortBenchmark" - * - * Benchmarks in this file are skipped in normal builds. + * {{{ + * To run this benchmark: + * 1. without sbt: bin/spark-submit --class + * 2. build/sbt "sql/test:runMain " + * 3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain " + * Results will be written to "benchmarks/-results.txt". + * }}} */ -class SortBenchmark extends BenchmarkWithCodegen { +object SortBenchmark extends BenchmarkBase { private def referenceKeyPrefixSort(buf: LongArray, lo: Int, hi: Int, refCmp: PrefixComparator) { val sortBuffer = new LongArray(MemoryBlock.fromLongArray(new Array[Long](buf.size().toInt))) @@ -54,10 +57,10 @@ class SortBenchmark extends BenchmarkWithCodegen { new LongArray(MemoryBlock.fromLongArray(extended))) } - ignore("sort") { + def sortBenchmark(): Unit = { val size = 2500 val rand = new XORShiftRandom(123) -val benchmark = new
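The refactor converts the benchmark from an `ignore`d test into an object with a `main` method, gated by `SPARK_GENERATE_BENCHMARK_FILES` for writing a results file. A minimal Python sketch of that harness pattern — the function name, output layout, and directory are illustrative, not Spark's actual `BenchmarkBase` API:

```python
import os
import time

def run_benchmark(name, fn, iters=3, out_dir="benchmarks"):
    """Time a function over several iterations, report best/avg, and only
    write a <name>-results.txt file when SPARK_GENERATE_BENCHMARK_FILES=1,
    mirroring the main-method benchmark pattern."""
    times_ms = []
    for _ in range(iters):
        start = time.perf_counter()
        fn()
        times_ms.append((time.perf_counter() - start) * 1000.0)
    lines = ["%s: best %.1f ms / avg %.1f ms"
             % (name, min(times_ms), sum(times_ms) / len(times_ms))]
    if os.environ.get("SPARK_GENERATE_BENCHMARK_FILES") == "1":
        os.makedirs(out_dir, exist_ok=True)
        with open(os.path.join(out_dir, "%s-results.txt" % name), "w") as f:
            f.write("\n".join(lines) + "\n")
    return lines
```

The design point is that results are reproducible on demand (`main` + env var) instead of living inside a skipped test that nobody runs.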
[3/3] spark git commit: [PYSPARK][SQL] Updates to RowQueue
[PYSPARK][SQL] Updates to RowQueue Tested with updates to RowQueueSuite (cherry picked from commit 6d742d1bd71aa3803dce91a830b37284cb18cf70) Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4f10aff4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4f10aff4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4f10aff4 Branch: refs/heads/branch-2.2 Commit: 4f10aff403ccc8287a816cb94ddf7f11e185907a Parents: dd0e7cf Author: Imran Rashid Authored: Thu Sep 6 12:11:47 2018 -0500 Committer: Imran Rashid Committed: Tue Sep 25 11:46:06 2018 -0500 -- .../spark/sql/execution/python/RowQueue.scala | 27 ++- .../sql/execution/python/RowQueueSuite.scala| 28 +++- 2 files changed, 41 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4f10aff4/sql/core/src/main/scala/org/apache/spark/sql/execution/python/RowQueue.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/RowQueue.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/RowQueue.scala index cd1e77f..4d6820c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/RowQueue.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/RowQueue.scala @@ -21,9 +21,10 @@ import java.io._ import com.google.common.io.Closeables -import org.apache.spark.SparkException +import org.apache.spark.{SparkEnv, SparkException} import org.apache.spark.io.NioBufferedFileInputStream import org.apache.spark.memory.{MemoryConsumer, TaskMemoryManager} +import org.apache.spark.serializer.SerializerManager import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.unsafe.Platform import org.apache.spark.unsafe.memory.MemoryBlock @@ -108,9 +109,13 @@ private[python] abstract class InMemoryRowQueue(val page: MemoryBlock, numFields * A RowQueue that is backed by a file on disk. 
This queue will stop accepting new rows once any * reader has begun reading from the queue. */ -private[python] case class DiskRowQueue(file: File, fields: Int) extends RowQueue { - private var out = new DataOutputStream( -new BufferedOutputStream(new FileOutputStream(file.toString))) +private[python] case class DiskRowQueue( +file: File, +fields: Int, +serMgr: SerializerManager) extends RowQueue { + + private var out = new DataOutputStream(serMgr.wrapForEncryption( +new BufferedOutputStream(new FileOutputStream(file.toString private var unreadBytes = 0L private var in: DataInputStream = _ @@ -131,7 +136,8 @@ private[python] case class DiskRowQueue(file: File, fields: Int) extends RowQueu if (out != null) { out.close() out = null - in = new DataInputStream(new NioBufferedFileInputStream(file)) + in = new DataInputStream(serMgr.wrapForEncryption( +new NioBufferedFileInputStream(file))) } if (unreadBytes > 0) { @@ -166,7 +172,8 @@ private[python] case class DiskRowQueue(file: File, fields: Int) extends RowQueu private[python] case class HybridRowQueue( memManager: TaskMemoryManager, tempDir: File, -numFields: Int) +numFields: Int, +serMgr: SerializerManager) extends MemoryConsumer(memManager) with RowQueue { // Each buffer should have at least one row @@ -212,7 +219,7 @@ private[python] case class HybridRowQueue( } private def createDiskQueue(): RowQueue = { -DiskRowQueue(File.createTempFile("buffer", "", tempDir), numFields) +DiskRowQueue(File.createTempFile("buffer", "", tempDir), numFields, serMgr) } private def createNewQueue(required: Long): RowQueue = { @@ -279,3 +286,9 @@ private[python] case class HybridRowQueue( } } } + +private[python] object HybridRowQueue { + def apply(taskMemoryMgr: TaskMemoryManager, file: File, fields: Int): HybridRowQueue = { +HybridRowQueue(taskMemoryMgr, file, fields, SparkEnv.get.serializerManager) + } +} 
http://git-wip-us.apache.org/repos/asf/spark/blob/4f10aff4/sql/core/src/test/scala/org/apache/spark/sql/execution/python/RowQueueSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/RowQueueSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/RowQueueSuite.scala index ffda33c..1ec9986 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/RowQueueSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/RowQueueSuite.scala @@ -20,12 +20,15 @@ package org.apache.spark.sql.execution.python import java.io.File import org.apache.spark.{SparkConf, SparkFunSuite} -import
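The key design change in this RowQueue patch is threading a `SerializerManager` into `DiskRowQueue` so that both the write path and the read path pass through `wrapForEncryption`, making spilled rows round-trip when I/O encryption is on. A Python sketch of that symmetric stream-wrapper idea — identity wrapper by default; a real wrapper would be a cipher stream; class and method names here only loosely mirror the Scala:

```python
import io

class DiskRowQueueSketch:
    """Length-prefixed row queue with a pluggable stream wrapper (a stand-in
    for SerializerManager.wrapForEncryption), applied identically on the
    write and read sides. Like the real queue, it stops accepting rows
    once a reader has started."""
    def __init__(self, wrap=lambda raw: raw):
        self._wrap = wrap
        self._buf = io.BytesIO()          # stands in for the spill file
        self._out = self._wrap(self._buf)
        self._in = None

    def add(self, row: bytes):
        assert self._out is not None, "queue already switched to reading"
        self._out.write(len(row).to_bytes(4, "big"))
        self._out.write(row)

    def remove(self) -> bytes:
        if self._in is None:
            # first read closes the write side, as in the Scala code
            self._out = None
            self._in = self._wrap(io.BytesIO(self._buf.getvalue()))
        n = int.from_bytes(self._in.read(4), "big")
        return self._in.read(n)
```

Because the same wrapper is applied on both sides, swapping the identity function for an encrypting/decrypting pair changes nothing else in the queue — which is exactly why the patch is a small diff.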
[1/3] spark git commit: [SPARK-25253][PYSPARK] Refactor local connection & auth code
Repository: spark Updated Branches: refs/heads/branch-2.2 bd12eb75d -> 4f10aff40 [SPARK-25253][PYSPARK] Refactor local connection & auth code This eliminates some duplication in the code to connect to a server on localhost to talk directly to the jvm. Also it gives consistent ipv6 and error handling. Two other incidental changes, that shouldn't matter: 1) python barrier tasks perform authentication immediately (rather than waiting for the BARRIER_FUNCTION indicator) 2) for `rdd._load_from_socket`, the timeout is only increased after authentication. Closes #22247 from squito/py_connection_refactor. Authored-by: Imran Rashid Signed-off-by: hyukjinkwon (cherry picked from commit 38391c9aa8a88fcebb337934f30298a32d91596b) (cherry picked from commit a2a54a5f49364a1825932c9f04eb0ff82dd7d465) Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fc1c4e7d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fc1c4e7d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fc1c4e7d Branch: refs/heads/branch-2.2 Commit: fc1c4e7d24f7d0afb3b79d66aa9812e7dddc2f38 Parents: bd12eb7 Author: Imran Rashid Authored: Wed Aug 29 09:47:38 2018 +0800 Committer: Imran Rashid Committed: Tue Sep 25 11:45:59 2018 -0500 -- python/pyspark/java_gateway.py | 32 +++- python/pyspark/rdd.py | 24 ++-- python/pyspark/worker.py | 7 ++- 3 files changed, 35 insertions(+), 28 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fc1c4e7d/python/pyspark/java_gateway.py -- diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index 7abf2c1..191dfce 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -133,7 +133,7 @@ def launch_gateway(conf=None): return gateway -def do_server_auth(conn, auth_secret): +def _do_server_auth(conn, auth_secret): """ Performs the authentication protocol defined by the SocketAuthHelper class on the given file-like object 'conn'. 
@@ -144,3 +144,33 @@ def do_server_auth(conn, auth_secret): if reply != "ok": conn.close() raise Exception("Unexpected reply from iterator server.") + + +def local_connect_and_auth(port, auth_secret): +""" +Connect to local host, authenticate with it, and return a (sockfile,sock) for that connection. +Handles IPV4 & IPV6, does some error handling. +:param port +:param auth_secret +:return: a tuple with (sockfile, sock) +""" +sock = None +errors = [] +# Support for both IPv4 and IPv6. +# On most of IPv6-ready systems, IPv6 will take precedence. +for res in socket.getaddrinfo("127.0.0.1", port, socket.AF_UNSPEC, socket.SOCK_STREAM): +af, socktype, proto, _, sa = res +try: +sock = socket.socket(af, socktype, proto) +sock.settimeout(15) +sock.connect(sa) +sockfile = sock.makefile("rwb", 65536) +_do_server_auth(sockfile, auth_secret) +return (sockfile, sock) +except socket.error as e: +emsg = _exception_message(e) +errors.append("tried to connect to %s, but an error occured: %s" % (sa, emsg)) +sock.close() +sock = None +else: +raise Exception("could not open socket: %s" % errors) http://git-wip-us.apache.org/repos/asf/spark/blob/fc1c4e7d/python/pyspark/rdd.py -- diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 864cebb..7d84cbd 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -39,7 +39,7 @@ if sys.version > '3': else: from itertools import imap as map, ifilter as filter -from pyspark.java_gateway import do_server_auth +from pyspark.java_gateway import local_connect_and_auth from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \ BatchedSerializer, CloudPickleSerializer, PairDeserializer, \ PickleSerializer, pack_long, AutoBatchedSerializer, write_with_length, \ @@ -122,30 +122,10 @@ def _parse_memory(s): def _load_from_socket(sock_info, serializer): -port, auth_secret = sock_info -sock = None -# Support for both IPv4 and IPv6. -# On most of IPv6-ready systems, IPv6 will take precedence. 
-for res in socket.getaddrinfo("localhost", port, socket.AF_UNSPEC, socket.SOCK_STREAM): -af, socktype, proto, canonname, sa = res -sock = socket.socket(af, socktype, proto) -try: -sock.settimeout(15) -sock.connect(sa) -except socket.error: -sock.close() -sock = None -continue -break -
[2/3] spark git commit: [PYSPARK] Updates to pyspark broadcast
[PYSPARK] Updates to pyspark broadcast (cherry picked from commit 09dd34cb1706f2477a89174d6a1a0f17ed5b0a65) Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dd0e7cf5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dd0e7cf5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dd0e7cf5 Branch: refs/heads/branch-2.2 Commit: dd0e7cf5287148618404593ca095dd900b6e993f Parents: fc1c4e7 Author: Imran Rashid Authored: Mon Aug 13 21:35:34 2018 -0500 Committer: Imran Rashid Committed: Tue Sep 25 11:46:03 2018 -0500 -- .../org/apache/spark/api/python/PythonRDD.scala | 349 --- .../spark/api/python/PythonRDDSuite.scala | 23 +- dev/sparktestsupport/modules.py | 2 + python/pyspark/broadcast.py | 58 ++- python/pyspark/context.py | 63 +++- python/pyspark/serializers.py | 58 +++ python/pyspark/test_broadcast.py| 126 +++ python/pyspark/test_serializers.py | 90 + python/pyspark/worker.py| 24 +- 9 files changed, 705 insertions(+), 88 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/dd0e7cf5/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 7b5a179..2f4e3bc 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -24,8 +24,10 @@ import java.util.{ArrayList => JArrayList, List => JList, Map => JMap} import scala.collection.JavaConverters._ import scala.collection.mutable +import scala.concurrent.Promise +import scala.concurrent.duration.Duration import scala.language.existentials -import scala.util.control.NonFatal +import scala.util.Try import org.apache.hadoop.conf.Configuration import org.apache.hadoop.io.compress.CompressionCodec @@ -37,6 +39,7 @@ import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext} import 
org.apache.spark.broadcast.Broadcast import org.apache.spark.input.PortableDataStream import org.apache.spark.internal.Logging +import org.apache.spark.network.util.JavaUtils import org.apache.spark.rdd.RDD import org.apache.spark.security.SocketAuthHelper import org.apache.spark.util._ @@ -293,19 +296,51 @@ private[spark] class PythonRunner( val newBids = broadcastVars.map(_.id).toSet // number of different broadcasts val toRemove = oldBids.diff(newBids) -val cnt = toRemove.size + newBids.diff(oldBids).size +val addedBids = newBids.diff(oldBids) +val cnt = toRemove.size + addedBids.size +val needsDecryptionServer = env.serializerManager.encryptionEnabled && addedBids.nonEmpty +dataOut.writeBoolean(needsDecryptionServer) dataOut.writeInt(cnt) -for (bid <- toRemove) { - // remove the broadcast from worker - dataOut.writeLong(- bid - 1) // bid >= 0 - oldBids.remove(bid) +def sendBidsToRemove(): Unit = { + for (bid <- toRemove) { +// remove the broadcast from worker +dataOut.writeLong(-bid - 1) // bid >= 0 +oldBids.remove(bid) + } } -for (broadcast <- broadcastVars) { - if (!oldBids.contains(broadcast.id)) { +if (needsDecryptionServer) { + // if there is encryption, we setup a server which reads the encrypted files, and sends +// the decrypted data to python + val idsAndFiles = broadcastVars.flatMap { broadcast => + if (oldBids.contains(broadcast.id)) { + None +} else { + Some((broadcast.id, broadcast.value.path)) +} +} + val server = new EncryptedPythonBroadcastServer(env, idsAndFiles) + dataOut.writeInt(server.port) + logTrace(s"broadcast decryption server setup on ${server.port}") + PythonRDD.writeUTF(server.secret, dataOut) + sendBidsToRemove() + idsAndFiles.foreach { case (id, _) => // send new broadcast -dataOut.writeLong(broadcast.id) -PythonRDD.writeUTF(broadcast.value.path, dataOut) -oldBids.add(broadcast.id) +dataOut.writeLong(id) +oldBids.add(id) + } + dataOut.flush() + logTrace("waiting for python to read decrypted broadcast data from server") + 
server.waitTillBroadcastDataSent() + logTrace("done sending decrypted data to python") +} else { +
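Underneath the encryption plumbing in this diff is simple set bookkeeping: the runner diffs the old and new broadcast ids, and only starts a decryption server when encryption is enabled *and* there is something new to send. That logic, extracted as a pure-Python sketch (the function name and return shape are illustrative):

```python
def plan_broadcast_sync(old_bids, new_bids, encryption_enabled):
    """Decide which broadcast ids to drop on the worker, which to send,
    whether the JVM must start a decryption server, and the total change
    count written to the protocol -- set arithmetic mirroring the diff."""
    to_remove = old_bids - new_bids
    added = new_bids - old_bids
    needs_decryption_server = encryption_enabled and bool(added)
    # the protocol writes the server flag first, then this count
    change_count = len(to_remove) + len(added)
    return to_remove, added, needs_decryption_server, change_count
```

Note the asymmetry: removals never require the server, since only newly *added* broadcast values carry encrypted payloads that Python must read back decrypted.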
svn commit: r29666 - in /dev/spark/2.5.0-SNAPSHOT-2018_09_25_08_02-9cbd001-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Tue Sep 25 15:17:19 2018 New Revision: 29666 Log: Apache Spark 2.5.0-SNAPSHOT-2018_09_25_08_02-9cbd001 docs [This commit notification would consist of 1485 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
svn commit: r29662 - in /dev/spark/2.4.1-SNAPSHOT-2018_09_25_06_02-a709718-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Tue Sep 25 13:17:02 2018 New Revision: 29662 Log: Apache Spark 2.4.1-SNAPSHOT-2018_09_25_06_02-a709718 docs [This commit notification would consist of 1472 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-23907][SQL] Revert regr_* functions entirely
Repository: spark
Updated Branches:
  refs/heads/branch-2.4 4ca4ef7b9 -> a709718da


[SPARK-23907][SQL] Revert regr_* functions entirely

## What changes were proposed in this pull request?

This patch entirely reverts all the regr_* functions added in SPARK-23907. They were added by mgaido91 (and proposed by gatorsmile) to improve compatibility with other database systems, but without any actual use cases. They are very rarely used, and Spark offers much better ways to compute these quantities, thanks to its flexibility in exposing real programming APIs. I'm going through all the APIs added in Spark 2.4, and I think we should revert these. If there is strong enough demand and more use cases, we can easily add them back in the future.

## How was this patch tested?

Reverted the test cases as well.

Closes #22541 from rxin/SPARK-23907.

Authored-by: Reynold Xin
Signed-off-by: hyukjinkwon
(cherry picked from commit 9cbd001e2476cd06aa0bcfcc77a21a9077d5797a)
Signed-off-by: hyukjinkwon

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a709718d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a709718d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a709718d

Branch: refs/heads/branch-2.4
Commit: a709718dae495725af4e961b1e0f85bce5d34368
Parents: 4ca4ef7
Author: Reynold Xin
Authored: Tue Sep 25 20:13:07 2018 +0800
Committer: hyukjinkwon
Committed: Tue Sep 25 20:13:22 2018 +0800

--
 .../catalyst/analysis/FunctionRegistry.scala |   9 -
 .../expressions/aggregate/regression.scala   | 190 ---
 .../sql-tests/inputs/udaf-regrfunctions.sql  |  56 --
 .../results/udaf-regrfunctions.sql.out       |  93 -
 4 files changed, 348 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/a709718d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 77860e1..695267a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -299,15 +299,6 @@ object FunctionRegistry {
     expression[CollectList]("collect_list"),
     expression[CollectSet]("collect_set"),
     expression[CountMinSketchAgg]("count_min_sketch"),
-    expression[RegrCount]("regr_count"),
-    expression[RegrSXX]("regr_sxx"),
-    expression[RegrSYY]("regr_syy"),
-    expression[RegrAvgX]("regr_avgx"),
-    expression[RegrAvgY]("regr_avgy"),
-    expression[RegrSXY]("regr_sxy"),
-    expression[RegrSlope]("regr_slope"),
-    expression[RegrR2]("regr_r2"),
-    expression[RegrIntercept]("regr_intercept"),
     // string functions
     expression[Ascii]("ascii"),

http://git-wip-us.apache.org/repos/asf/spark/blob/a709718d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/regression.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/regression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/regression.scala
deleted file mode 100644
index d8f4505..000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/regression.scala
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.expressions.aggregate
-
-import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types.{AbstractDataType, DoubleType}
-
-/**
- * Base trait for all regression functions.
- */
-trait RegrLike extends AggregateFunction with
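[Editor's note] The commit message argues that these aggregates are easy to compute without dedicated SQL functions. As a reference for what the removed functions mean, here is a minimal plain-Python sketch of the standard least-squares formulas they implement (`regr_aggregates` and the sample data are illustrative, not a Spark API): regr_count counts pairs where both values are non-null, regr_slope is covar_pop(y, x) / var_pop(x), and regr_intercept is avg(y) - slope * avg(x).

```python
def regr_aggregates(pairs):
    """Compute regr_count, regr_slope, and regr_intercept for (y, x) pairs.

    Uses population covariance/variance, matching the usual SQL definitions
    of these aggregates. Pairs with a NULL (None) on either side are skipped.
    """
    pts = [(y, x) for (y, x) in pairs if y is not None and x is not None]
    n = len(pts)
    avg_x = sum(x for _, x in pts) / n
    avg_y = sum(y for y, _ in pts) / n
    covar_pop = sum((x - avg_x) * (y - avg_y) for y, x in pts) / n
    var_pop_x = sum((x - avg_x) ** 2 for _, x in pts) / n
    slope = covar_pop / var_pop_x
    return {
        "regr_count": n,
        "regr_slope": slope,
        "regr_intercept": avg_y - slope * avg_x,
    }

# Hypothetical sample: y = 2x, with one NULL y that must be excluded.
stats = regr_aggregates([(2.0, 1.0), (4.0, 2.0), (6.0, 3.0), (None, 4.0)])
```

In Spark itself, the same numbers can be obtained from existing built-in aggregates (e.g. a population covariance divided by a population variance), which is the flexibility the commit message refers to.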
spark git commit: [SPARK-23907][SQL] Revert regr_* functions entirely
Repository: spark
Updated Branches:
  refs/heads/master 7d8f5b62c -> 9cbd001e2


[SPARK-23907][SQL] Revert regr_* functions entirely

## What changes were proposed in this pull request?

This patch entirely reverts all the regr_* functions added in SPARK-23907. They were added by mgaido91 (and proposed by gatorsmile) to improve compatibility with other database systems, but without any actual use cases. They are very rarely used, and Spark offers much better ways to compute these quantities, thanks to its flexibility in exposing real programming APIs. I'm going through all the APIs added in Spark 2.4, and I think we should revert these. If there is strong enough demand and more use cases, we can easily add them back in the future.

## How was this patch tested?

Reverted the test cases as well.

Closes #22541 from rxin/SPARK-23907.

Authored-by: Reynold Xin
Signed-off-by: hyukjinkwon

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9cbd001e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9cbd001e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9cbd001e

Branch: refs/heads/master
Commit: 9cbd001e2476cd06aa0bcfcc77a21a9077d5797a
Parents: 7d8f5b6
Author: Reynold Xin
Authored: Tue Sep 25 20:13:07 2018 +0800
Committer: hyukjinkwon
Committed: Tue Sep 25 20:13:07 2018 +0800

--
 .../catalyst/analysis/FunctionRegistry.scala |   9 -
 .../expressions/aggregate/regression.scala   | 190 ---
 .../sql-tests/inputs/udaf-regrfunctions.sql  |  56 --
 .../results/udaf-regrfunctions.sql.out       |  93 -
 4 files changed, 348 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/9cbd001e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 8b69a47..7dafebf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -300,15 +300,6 @@ object FunctionRegistry {
     expression[CollectList]("collect_list"),
     expression[CollectSet]("collect_set"),
     expression[CountMinSketchAgg]("count_min_sketch"),
-    expression[RegrCount]("regr_count"),
-    expression[RegrSXX]("regr_sxx"),
-    expression[RegrSYY]("regr_syy"),
-    expression[RegrAvgX]("regr_avgx"),
-    expression[RegrAvgY]("regr_avgy"),
-    expression[RegrSXY]("regr_sxy"),
-    expression[RegrSlope]("regr_slope"),
-    expression[RegrR2]("regr_r2"),
-    expression[RegrIntercept]("regr_intercept"),
     // string functions
     expression[Ascii]("ascii"),

http://git-wip-us.apache.org/repos/asf/spark/blob/9cbd001e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/regression.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/regression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/regression.scala
deleted file mode 100644
index d8f4505..000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/regression.scala
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.expressions.aggregate
-
-import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types.{AbstractDataType, DoubleType}
-
-/**
- * Base trait for all regression functions.
- */
-trait RegrLike extends AggregateFunction with ImplicitCastInputTypes {
-  def y: Expression
-  def x: Expression
-
-  override def children: Seq[Expression] =
svn commit: r29652 - in /dev/spark/2.5.0-SNAPSHOT-2018_09_25_00_02-7d8f5b6-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Tue Sep 25 07:17:33 2018
New Revision: 29652

Log:
Apache Spark 2.5.0-SNAPSHOT-2018_09_25_00_02-7d8f5b6 docs

[This commit notification would consist of 1485 parts, which exceeds the limit of 50, so it was shortened to this summary.]

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
[spark] Git Push Summary
Repository: spark
Updated Tags: refs/tags/v2.3.2 [created] 02b510728