[spark] branch master updated: [SPARK-7721][INFRA] Run and generate test coverage report from Python via Jenkins
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new cdd694c  [SPARK-7721][INFRA] Run and generate test coverage report from Python via Jenkins
cdd694c is described below

commit cdd694c52b53165acba6faabaf3a1fbaa925ac2e
Author: Hyukjin Kwon
AuthorDate: Fri Feb 1 10:18:08 2019 +0800

    [SPARK-7721][INFRA] Run and generate test coverage report from Python via Jenkins

## What changes were proposed in this pull request?

### Background

For the current status, the test script that generates coverage information was merged into Spark: https://github.com/apache/spark/pull/20204

So we can generate the coverage report and site by, for example:

```
run-tests-with-coverage --python-executables=python3 --modules=pyspark-sql
```

like the `run-tests` script in `./python`.

### Proposed change

The next step is to host this coverage report via `github.io` automatically by Jenkins (see https://spark-test.github.io/pyspark-coverage-site/).

This uses my testing account for Spark, spark-test, which was shared with Felix and Shivaram a long time ago for testing purposes, including AppVeyor.

To cut this short, this PR targets running the coverage in [spark-master-test-sbt-hadoop-2.7](https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.7/). In that specific job, it will clone the page and rebase the up-to-date PySpark test coverage from the latest commit. For instance, as below:

```bash
# Clone PySpark coverage site.
git clone https://github.com/spark-test/pyspark-coverage-site.git

# Remove existing HTMLs.
rm -fr pyspark-coverage-site/*

# Copy generated coverage HTMLs.
cp -r .../python/test_coverage/htmlcov/* pyspark-coverage-site/

# Check out to a temporary branch.
git symbolic-ref HEAD refs/heads/latest_branch

# Add all the files.
git add -A

# Commit current HTMLs.
git commit -am "Coverage report at latest commit in Apache Spark"

# Delete the old branch.
git branch -D gh-pages

# Rename the temporary branch to gh-pages.
git branch -m gh-pages

# Finally, force update to our repository.
git push -f origin gh-pages
```

So a single, up-to-date coverage report can be shown on the `github.io` page. The commands above were manually tested.

### TODOs

- [x] Write a draft - HyukjinKwon
- [x] `pip install coverage` for all Python implementations (pypy, python2, python3) in Jenkins workers - shaneknapp
- [x] Set hidden `SPARK_TEST_KEY` for spark-test's password in Jenkins via Jenkins's feature. This should be set in both the PR builder and `spark-master-test-sbt-hadoop-2.7` so that later other PRs can test and fix the bugs - shaneknapp
- [x] Set an environment variable that indicates `spark-master-test-sbt-hadoop-2.7` so that that specific build can report and update the coverage site - shaneknapp
- [x] Make the PR builder's tests pass - HyukjinKwon
- [x] Fix a flaky test related to coverage - HyukjinKwon
  - 6 consecutive passes out of 7 runs

This PR is co-authored by me and shaneknapp.

## How was this patch tested?

It will be tested via Jenkins.

Closes #23117 from HyukjinKwon/SPARK-7721.
Lead-authored-by: Hyukjin Kwon Co-authored-by: hyukjinkwon Co-authored-by: shane knapp Signed-off-by: Hyukjin Kwon --- README.md | 1 + dev/run-tests.py | 63 -- python/pyspark/streaming/tests/test_dstream.py | 10 3 files changed, 71 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index f3b90ce..271f2f5 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,7 @@ [![Jenkins Build](https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.7/badge/icon)](https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.7) [![AppVeyor Build](https://img.shields.io/appveyor/ci/ApacheSoftwareFoundation/spark/master.svg?style=plastic=appveyor)](https://ci.appveyor.com/project/ApacheSoftwareFoundation/spark) +[![PySpark Coverage](https://img.shields.io/badge/dynamic/xml.svg?label=pyspark%20coverage=https%3A%2F%2Fspark-test.github.io%2Fpyspark-coverage-site=%2Fhtml%2Fbody%2Fdiv%5B1%5D%2Fdiv%2Fh1%2Fspan=brightgreen=plastic)](https://spark-test.github.io/pyspark-coverage-site) Spark is a fast and general cluster computing system for Big Data. It provides high-level APIs in Scala, Java, Python, and R, and an optimized engine that diff --git a/dev/run-tests.py b/dev/run-tests.py index e1ed274..edd89c9 100755 --- a/dev/run-tests.py +++ b/dev/ru
[spark] branch master updated: [SPARK-26651][SQL][DOC] Collapse notes related to java.time API
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new b85974d [SPARK-26651][SQL][DOC] Collapse notes related to java.time API b85974d is described below commit b85974db85a881a2e8aebc31cb4f578008648ab9 Author: Maxim Gekk AuthorDate: Sat Feb 2 11:17:33 2019 +0800 [SPARK-26651][SQL][DOC] Collapse notes related to java.time API ## What changes were proposed in this pull request? Collapsed notes about using Java 8 API for date/timestamp manipulations and Proleptic Gregorian calendar in the SQL migration guide. Closes #23722 from MaxGekk/collapse-notes. Authored-by: Maxim Gekk Signed-off-by: Hyukjin Kwon --- docs/sql-migration-guide-upgrade.md | 14 +++--- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md index 41f27a3..dbf9df0 100644 --- a/docs/sql-migration-guide-upgrade.md +++ b/docs/sql-migration-guide-upgrade.md @@ -31,14 +31,10 @@ displayTitle: Spark SQL Upgrading Guide - In Spark version 2.4 and earlier, the `SET` command works without any warnings even if the specified key is for `SparkConf` entries and it has no effect because the command does not update `SparkConf`, but the behavior might confuse users. Since 3.0, the command fails if a `SparkConf` key is used. You can disable such a check by setting `spark.sql.legacy.setCommandRejectsSparkCoreConfs` to `false`. - - Since Spark 3.0, CSV/JSON datasources use java.time API for parsing and generating CSV/JSON content. In Spark version 2.4 and earlier, java.text.SimpleDateFormat is used for the same purpose with fallbacks to the parsing mechanisms of Spark 2.0 and 1.x. 
For example, `2018-12-08 10:39:21.123` with the pattern `yyyy-MM-dd'T'HH:mm:ss.SSS` cannot be parsed since Spark 3.0 because the timestamp does not match the pattern, but it can be parsed by earlier Spark versions due to a fallback [...]
- - In Spark version 2.4 and earlier, CSV datasource converts a malformed CSV string to a row with all `null`s in the PERMISSIVE mode. Since Spark 3.0, the returned row can contain non-`null` fields if some of CSV column values were parsed and converted to desired types successfully.
- In Spark version 2.4 and earlier, JSON datasource and JSON functions like `from_json` convert a bad JSON record to a row with all `null`s in the PERMISSIVE mode when specified schema is `StructType`. Since Spark 3.0, the returned row can contain non-`null` fields if some of JSON column values were parsed and converted to desired types successfully.
- - Since Spark 3.0, the `unix_timestamp`, `date_format`, `to_unix_timestamp`, `from_unixtime`, `to_date`, `to_timestamp` functions use java.time API for parsing and formatting dates/timestamps from/to strings by using ISO chronology (https://docs.oracle.com/javase/8/docs/api/java/time/chrono/IsoChronology.html) based on Proleptic Gregorian calendar. In Spark version 2.4 and earlier, java.text.SimpleDateFormat and java.util.GregorianCalendar (hybrid calendar that supports both the Julian [...]
The default setting is false. PySpark's behavior for Arrow versions is illustrated in the table below: @@ -91,11 +87,15 @@ displayTitle: Spark SQL Upgrading Guide - In Spark version 2.4 and earlier, if `org.apache.spark.sql.functions.udf(Any, DataType)` gets a Scala closure with primitive-type argument, the returned UDF will return null if the input values is null. Since Spark 3.0, the UDF will return the default value of the Java type if the input value is null. For example, `val f = udf((x: Int) => x, IntegerType)`, `f($"x")` will return null in Spark 2.4 and earlier if column `x` is null, and return 0 in Spark 3.0. This behavior change is int [...] - - Since Spark 3.0, the `weekofyear`, `weekday` and `dayofweek` functions use java.time API for calculation week number of year and day number of week based on Proleptic Gregorian calendar. In Spark version 2.4 and earlier, the hybrid calendar (Julian + Gregorian) is used for the same purpose. Results of the functions returned by Spark 3.0 and previous
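The strict pattern-matching behavior described above can be illustrated with a plain-Python analogy. This is not Spark's java.time parser; `datetime.strptime` is used here only to show what "no lenient fallback" means when a timestamp does not match the pattern:

```python
from datetime import datetime

ts = "2018-12-08 10:39:21.123"

# Strict pattern with a 'T' separator, analogous to yyyy-MM-dd'T'HH:mm:ss.SSS.
pattern_with_t = "%Y-%m-%dT%H:%M:%S.%f"
# Pattern that actually matches the space-separated timestamp.
pattern_with_space = "%Y-%m-%d %H:%M:%S.%f"

# A strict parser rejects input that does not match the pattern exactly,
# instead of silently falling back to a more lenient parse.
try:
    datetime.strptime(ts, pattern_with_t)
    parsed_strict = True
except ValueError:
    parsed_strict = False

print(parsed_strict)  # False: the pattern expects 'T', the input has a space
print(datetime.strptime(ts, pattern_with_space))
```

Spark 2.4's `SimpleDateFormat`-based path had fallbacks that could still parse such input, which is why the same string behaves differently across versions.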
[spark] branch master updated: [SPARK-18161][PYTHON] Update cloudpickle to v0.6.1
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 75ea89a  [SPARK-18161][PYTHON] Update cloudpickle to v0.6.1
75ea89a is described below

commit 75ea89ad94ca76646e4697cf98c78d14c6e2695f
Author: Boris Shminke
AuthorDate: Sat Feb 2 10:49:45 2019 +0800

    [SPARK-18161][PYTHON] Update cloudpickle to v0.6.1

## What changes were proposed in this pull request?

In this PR we've done two things:

1) Updated Spark's copy of cloudpickle to 0.6.1 (the current stable version). The main reason Spark stayed with cloudpickle 0.4.x was that the default pickle protocol was changed in later versions.

2) Started using `pickle.HIGHEST_PROTOCOL` for both Python 2 and Python 3 for serializers and broadcast. [Pyrolite](https://github.com/irmen/Pyrolite) has such Pickle protocol version support: reading: 0, 1, 2, 3, 4; writing: 2.

## How was this patch tested?

Jenkins tests.

Authors: Sloane Simmons, Boris Shminke

This contribution is the original work of Sloane Simmons and Boris Shminke, and they licensed it to the project under the project's open source license.

Closes #20691 from inpefess/pickle_protocol_4.
Lead-authored-by: Boris Shminke Co-authored-by: singularperturbation Signed-off-by: Hyukjin Kwon --- python/pyspark/broadcast.py | 4 +- python/pyspark/cloudpickle.py| 259 --- python/pyspark/serializers.py| 7 +- python/pyspark/tests/test_rdd.py | 2 +- 4 files changed, 194 insertions(+), 78 deletions(-) diff --git a/python/pyspark/broadcast.py b/python/pyspark/broadcast.py index 43a5ead..cca64b5 100644 --- a/python/pyspark/broadcast.py +++ b/python/pyspark/broadcast.py @@ -23,7 +23,7 @@ import threading from pyspark.cloudpickle import print_exec from pyspark.java_gateway import local_connect_and_auth -from pyspark.serializers import ChunkedStream +from pyspark.serializers import ChunkedStream, pickle_protocol from pyspark.util import _exception_message if sys.version < '3': @@ -109,7 +109,7 @@ class Broadcast(object): def dump(self, value, f): try: -pickle.dump(value, f, 2) +pickle.dump(value, f, pickle_protocol) except pickle.PickleError: raise except Exception as e: diff --git a/python/pyspark/cloudpickle.py b/python/pyspark/cloudpickle.py index 88519d7..bf92569 100644 --- a/python/pyspark/cloudpickle.py +++ b/python/pyspark/cloudpickle.py @@ -42,20 +42,26 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
""" from __future__ import print_function -import dis -from functools import partial -import imp import io -import itertools -import logging +import dis +import sys +import types import opcode -import operator import pickle import struct -import sys -import traceback -import types +import logging import weakref +import operator +import importlib +import itertools +import traceback +from functools import partial + + +# cloudpickle is meant for inter process communication: we expect all +# communicating processes to run the same Python version hence we favor +# communication speed over compatibility: +DEFAULT_PROTOCOL = pickle.HIGHEST_PROTOCOL if sys.version < '3': @@ -72,6 +78,22 @@ else: PY3 = True +# Container for the global namespace to ensure consistent unpickling of +# functions defined in dynamic modules (modules not registed in sys.modules). +_dynamic_modules_globals = weakref.WeakValueDictionary() + + +class _DynamicModuleFuncGlobals(dict): +"""Global variables referenced by a function defined in a dynamic module + +To avoid leaking references we store such context in a WeakValueDictionary +instance. However instances of python builtin types such as dict cannot +be used directly as values in such a construct, hence the need for a +derived class. 
+""" +pass + + def _make_cell_set_template_code(): """Get the Python compiler to emit LOAD_FAST(arg); STORE_DEREF @@ -157,7 +179,7 @@ def cell_set(cell, value): )(value) -#relevant opcodes +# relevant opcodes STORE_GLOBAL = opcode.opmap['STORE_GLOBAL'] DELETE_GLOBAL = opcode.opmap['DELETE_GLOBAL'] LOAD_GLOBAL = opcode.opmap['LOAD_GLOBAL'] @@ -167,7 +189,7 @@ EXTENDED_ARG = dis.EXTENDED_ARG def islambda(func): -return getattr(func,'__name__') == '' +return getattr(func, '__name__') == '' _BUILTIN_TYPE_NAMES = {} @@ -248,7 +270,9 @@ class CloudPickler(Pickler): dispatch = Pickler.dispatch.copy() def __init__(self, file, protocol=None): -Pickler.__init__(self, file, protocol) +if protocol is None: +protocol = DEFAULT_PROTOCOL +Pickler
[spark] branch master updated: [MINOR][SQL][DOCS] Reformat the tables in SQL migration guide
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new a5427a0  [MINOR][SQL][DOCS] Reformat the tables in SQL migration guide
a5427a0 is described below

commit a5427a0067504484b0eb5e5f87b2658566aee324
Author: Hyukjin Kwon
AuthorDate: Sat Feb 2 23:45:46 2019 +0800

    [MINOR][SQL][DOCS] Reformat the tables in SQL migration guide

## What changes were proposed in this pull request?

1. Reformat the tables so they sit with proper indentation under the corresponding items, for consistency.
2. Fix **Table 2**'s contents to be more readable with code blocks.

### Table 1

**Before:**

![screen shot 2019-02-02 at 11 37 30 am](https://user-images.githubusercontent.com/6477701/52159396-f1a18380-26de-11e9-9dca-f56b19f22bb4.png)

**After:**

![screen shot 2019-02-02 at 11 32 39 am](https://user-images.githubusercontent.com/6477701/52159370-7d66e000-26de-11e9-9e6d-81cf73691c05.png)

### Table 2

**Before:**

![screen shot 2019-02-02 at 11 35 51 am](https://user-images.githubusercontent.com/6477701/52159401-0ed65200-26df-11e9-8b0e-86d005c233b5.png)

**After:**

![screen shot 2019-02-02 at 11 32 44 am](https://user-images.githubusercontent.com/6477701/52159372-7f30a380-26de-11e9-8c04-a88c74b78cff.png)

## How was this patch tested?

Manually built the doc.

Closes #23723 from HyukjinKwon/minor-doc-fix.
Authored-by: Hyukjin Kwon
Signed-off-by: Hyukjin Kwon
---
 docs/sql-migration-guide-upgrade.md | 138 ++--
 1 file changed, 69 insertions(+), 69 deletions(-)

diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md
index dbf9df0..1ae26e6 100644
--- a/docs/sql-migration-guide-upgrade.md
+++ b/docs/sql-migration-guide-upgrade.md
@@ -38,7 +38,7 @@ displayTitle: Spark SQL Upgrading Guide
 - Since Spark 3.0, JSON datasource and JSON function `schema_of_json` infer TimestampType from string values if they match to the pattern defined by the JSON option `timestampFormat`. Set JSON option `inferTimestamp` to `false` to disable such type inferring.
 - In PySpark, when Arrow optimization is enabled, if Arrow version is higher than 0.11.0, Arrow can perform safe type conversion when converting Pandas.Series to Arrow array during serialization. Arrow will raise errors when detecting unsafe type conversion like overflow. Setting `spark.sql.execution.pandas.arrowSafeTypeConversion` to true can enable it. The default setting is false. PySpark's behavior for Arrow versions is illustrated in the table below:
@@ -51,39 +51,39 @@ displayTitle: Spark SQL Upgrading Guide

(The hunks only re-indent the HTML table rows; the table markup was stripped in this extract. The recovered cell content is:)

| PyArrow version | Integer overflow | Floating-point truncation |
|---|---|---|
| version < 0.11.0 | Raise error | Silently allows |
| version > 0.11.0, arrowSafeTypeConversion=false | Silent overflow | Silently allows |
| version > 0.11.0, arrowSafeTypeConversion=true | Raise error | Raise error |

- In Spark version 2.4 and earlier, if `org.apache.spark.sql.functions.udf(Any, DataType)` gets a Scala closure with primitive-type argument, the returned UDF will return null if the input values is null.
Since Spark 3.0, the UDF will return the default value of the Java type if the input value is null. For example, `val f = udf((x: Int) => x, IntegerType)`, `f($"x")` will return null in Spark 2.4 and earlier if column `x` is null, and return 0 in Spark 3.0. This behavior change is int [...] @@ -100,64 +100,64 @@ displayTitle: Spark SQL Upgrading Guide ## Upgrading From Spark SQL 2.3 to 2.4 - In Spark version 2.3 and earlier, the second parameter to array_contains function is implicitly promoted to the element type of first array type parameter. This type promotion can be lossy and may cause `array_contains`
[spark] branch master updated: [SPARK-26818][ML] Make MLEvents JSON ser/de safe
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new dfb8809  [SPARK-26818][ML] Make MLEvents JSON ser/de safe
dfb8809 is described below

commit dfb880951a8de55c587c1bf8b696df50eae6e68a
Author: Hyukjin Kwon
AuthorDate: Sun Feb 3 21:19:35 2019 +0800

    [SPARK-26818][ML] Make MLEvents JSON ser/de safe

## What changes were proposed in this pull request?

Currently, it does not look like this causes any practical problem (if I did not misread the code). I see one place where JSON-formatted events are used:

https://github.com/apache/spark/blob/ec506bd30c2ca324c12c9ec811764081c2eb8c42/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala#L148

It's okay because it just logs when the exception is ignorable:

https://github.com/apache/spark/blob/9690eba16efe6d25261934d8b73a221972b684f3/core/src/main/scala/org/apache/spark/util/ListenerBus.scala#L111

Still, it is best to stay safe - I don't want this unstable, experimental feature to break anything in any case. This PR also disables `logEvent` in `SparkListenerEvent` for the same reason, and matches the SQL execution events side:

https://github.com/apache/spark/blob/ca545f79410a464ef24e3986fac225f53bb2ef02/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala#L41-L57

to make ML events JSON ser/de safe.

## How was this patch tested?

Manually tested, and unit tests were added.

Closes #23728 from HyukjinKwon/SPARK-26818.
Authored-by: Hyukjin Kwon Signed-off-by: Hyukjin Kwon --- .../main/scala/org/apache/spark/ml/events.scala| 81 +++ .../scala/org/apache/spark/ml/MLEventsSuite.scala | 112 + 2 files changed, 155 insertions(+), 38 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/events.scala b/mllib/src/main/scala/org/apache/spark/ml/events.scala index c51600f..dc4be4d 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/events.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/events.scala @@ -17,6 +17,8 @@ package org.apache.spark.ml +import com.fasterxml.jackson.annotation.JsonIgnore + import org.apache.spark.SparkContext import org.apache.spark.annotation.Unstable import org.apache.spark.internal.Logging @@ -29,53 +31,84 @@ import org.apache.spark.sql.{DataFrame, Dataset} * after each operation (the event should document this). * * @note This is supported via [[Pipeline]] and [[PipelineModel]]. + * @note This is experimental and unstable. Do not use this unless you fully + * understand what `Unstable` means. */ @Unstable -sealed trait MLEvent extends SparkListenerEvent +sealed trait MLEvent extends SparkListenerEvent { + // Do not log ML events in event log. It should be revisited to see + // how it works with history server. + protected[spark] override def logEvent: Boolean = false +} /** * Event fired before `Transformer.transform`. */ @Unstable -case class TransformStart(transformer: Transformer, input: Dataset[_]) extends MLEvent +case class TransformStart() extends MLEvent { + @JsonIgnore var transformer: Transformer = _ + @JsonIgnore var input: Dataset[_] = _ +} + /** * Event fired after `Transformer.transform`. */ @Unstable -case class TransformEnd(transformer: Transformer, output: Dataset[_]) extends MLEvent +case class TransformEnd() extends MLEvent { + @JsonIgnore var transformer: Transformer = _ + @JsonIgnore var output: Dataset[_] = _ +} /** * Event fired before `Estimator.fit`. 
*/ @Unstable -case class FitStart[M <: Model[M]](estimator: Estimator[M], dataset: Dataset[_]) extends MLEvent +case class FitStart[M <: Model[M]]() extends MLEvent { + @JsonIgnore var estimator: Estimator[M] = _ + @JsonIgnore var dataset: Dataset[_] = _ +} + /** * Event fired after `Estimator.fit`. */ @Unstable -case class FitEnd[M <: Model[M]](estimator: Estimator[M], model: M) extends MLEvent +case class FitEnd[M <: Model[M]]() extends MLEvent { + @JsonIgnore var estimator: Estimator[M] = _ + @JsonIgnore var model: M = _ +} /** * Event fired before `MLReader.load`. */ @Unstable -case class LoadInstanceStart[T](reader: MLReader[T], path: String) extends MLEvent +case class LoadInstanceStart[T](path: String) extends MLEvent { + @JsonIgnore var reader: MLReader[T] = _ +} + /** * Event fired after `MLReader.load`. */ @Unstable -case class LoadInstanceEnd[T](reader: MLReader[T], instance: T) extends MLEvent +case class LoadInstanceEnd[T]() extends MLEvent { + @JsonIgnore var reader: MLReader[T] = _ + @JsonIgnore var instance: T = _ +} /** * Event fired before `MLWriter.save`. */ @Unstable -case class SaveInstance
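The pattern the diff applies - keep heavyweight, non-JSON-serializable payloads out of an event's serialized form - can be sketched with a Python analogy. This is not Spark's Scala code; `TransformStartEvent` and `_json_ignore` are hypothetical stand-ins for the `@JsonIgnore`-annotated fields in `TransformStart`:

```python
import json

class TransformStartEvent:
    """Analogy of the Scala `TransformStart` event after this change:
    non-serializable payloads (the transformer and the input dataset)
    are attached as plain attributes and excluded from the JSON form."""

    _json_ignore = {"transformer", "input"}

    def __init__(self):
        self.transformer = object()  # stand-in for a Transformer
        self.input = object()        # stand-in for a Dataset
        self.event_type = "TransformStart"

    def to_json(self):
        # Serialize only the fields that are safe to render as JSON.
        payload = {k: v for k, v in self.__dict__.items()
                   if k not in self._json_ignore}
        return json.dumps(payload)

event = TransformStartEvent()
print(event.to_json())  # {"event_type": "TransformStart"}
```

Listeners that receive the event in-process can still read the ignored fields directly; only the JSON representation omits them.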
[spark] branch master updated: [MINOR][DOCS] Add a note that 'spark.executor.pyspark.memory' is dependent on 'resource'
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 0d77d57  [MINOR][DOCS] Add a note that 'spark.executor.pyspark.memory' is dependent on 'resource'
0d77d57 is described below

commit 0d77d575e14e535fbe29b42e5612f3ddc64d42f4
Author: Hyukjin Kwon
AuthorDate: Thu Jan 31 15:51:40 2019 +0800

    [MINOR][DOCS] Add a note that 'spark.executor.pyspark.memory' is dependent on 'resource'

## What changes were proposed in this pull request?

This PR adds a note making explicit that `spark.executor.pyspark.memory` depends on the `resource` module's behaviour for Python memory usage. For instance, I at least see some difference at https://github.com/apache/spark/pull/21977#discussion_r251220966

## How was this patch tested?

Manually built the doc.

Closes #23664 from HyukjinKwon/note-resource-dependent.

Authored-by: Hyukjin Kwon
Signed-off-by: Hyukjin Kwon
---
 docs/configuration.md | 9 ++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/docs/configuration.md b/docs/configuration.md
index 806e16a..5b5891e 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -190,8 +190,10 @@ of the most common options to set are:
 and it is up to the application to avoid exceeding the overhead memory space
 shared with other non-JVM processes. When PySpark is run in YARN or Kubernetes, this memory
 is added to executor resource requests.
-
-NOTE: Python memory usage may not be limited on platforms that do not support resource limiting, such as Windows.
+
+Note: This feature is dependent on Python's `resource` module; therefore, the behaviors and
+limitations are inherited. For instance, Windows does not support resource limiting and actual
+resource is not limited on MacOS.
@@ -223,7 +225,8 @@ of the most common options to set are:
 stored on disk.
This should be on a fast, local disk in your system. It can also be a comma-separated list of multiple directories on different disks. -NOTE: In Spark 1.0 and later this will be overridden by SPARK_LOCAL_DIRS (Standalone), MESOS_SANDBOX (Mesos) or + +Note: This will be overridden by SPARK_LOCAL_DIRS (Standalone), MESOS_SANDBOX (Mesos) or LOCAL_DIRS (YARN) environment variables set by the cluster manager. - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
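The platform dependence the note describes can be probed from Python. A minimal sketch, assuming nothing about PySpark's internals: `pyspark_memory_cap_supported` is a hypothetical helper name; PySpark's actual enforcement lives in its worker startup code, not here:

```python
import platform

def pyspark_memory_cap_supported():
    """Report whether this platform can express a per-process memory cap
    via the `resource` module (hypothetical helper for illustration)."""
    try:
        import resource  # POSIX-only; Windows ships no `resource` module
    except ImportError:
        return False
    # Having RLIMIT_AS only means the limit can be *set*; macOS accepts
    # the call but does not actually enforce the address-space limit,
    # which is the "inherited limitation" the docs note refers to.
    return hasattr(resource, "RLIMIT_AS")

supported = pyspark_memory_cap_supported()
print(platform.system(), supported)
```

On Linux this returns `True` and the limit is enforced; on Windows the import fails; on macOS the call succeeds but the cap is not actually applied.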
[spark] branch master updated: [SPARK-26745][SQL] Revert count optimization in JSON datasource by SPARK-24959
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new d4d6df2  [SPARK-26745][SQL] Revert count optimization in JSON datasource by SPARK-24959
d4d6df2 is described below

commit d4d6df2f7d97168f0f3073aa42608294030ece55
Author: Hyukjin Kwon
AuthorDate: Thu Jan 31 14:32:31 2019 +0800

    [SPARK-26745][SQL] Revert count optimization in JSON datasource by SPARK-24959

## What changes were proposed in this pull request?

This PR reverts the JSON count optimization part of #21909. When we `count()`, we cannot distinguish the cases below without parsing:

```
[{...}, {...}]
```

```
[]
```

```
{...}
```

```bash
# empty string
```

One line of input can contain 0 records, 1 record, or multiple records, and this is dependent on each input. See also https://github.com/apache/spark/pull/23665#discussion_r251276720.

## How was this patch tested?

Manually tested.

Closes #23667 from HyukjinKwon/revert-SPARK-24959.
Authored-by: Hyukjin Kwon Signed-off-by: Hyukjin Kwon --- .../apache/spark/sql/catalyst/csv/UnivocityParser.scala | 16 +++- .../spark/sql/catalyst/expressions/csvExpressions.scala | 3 +-- .../spark/sql/catalyst/expressions/jsonExpressions.scala | 3 +-- .../spark/sql/catalyst/util/FailureSafeParser.scala | 11 ++- sql/core/benchmarks/JSONBenchmark-results.txt| 1 - .../scala/org/apache/spark/sql/DataFrameReader.scala | 6 ++ .../sql/execution/datasources/json/JsonDataSource.scala | 6 ++ .../sql/execution/datasources/json/JsonBenchmark.scala | 3 --- 8 files changed, 19 insertions(+), 30 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index 82a5b3c..79dff6f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -188,11 +188,19 @@ class UnivocityParser( } } + private val doParse = if (requiredSchema.nonEmpty) { +(input: String) => convert(tokenizer.parseLine(input)) + } else { +// If `columnPruning` enabled and partition attributes scanned only, +// `schema` gets empty. +(_: String) => InternalRow.empty + } + /** * Parses a single CSV string and turns it into either one resulting row or no row (if the * the record is malformed). 
*/ - def parse(input: String): InternalRow = convert(tokenizer.parseLine(input)) + def parse(input: String): InternalRow = doParse(input) private val getToken = if (options.columnPruning) { (tokens: Array[String], index: Int) => tokens(index) @@ -282,8 +290,7 @@ private[sql] object UnivocityParser { input => Seq(parser.convert(input)), parser.options.parseMode, schema, - parser.options.columnNameOfCorruptRecord, - parser.options.multiLine) + parser.options.columnNameOfCorruptRecord) val handleHeader: () => Unit = () => headerChecker.checkHeaderColumnNames(tokenizer) @@ -336,8 +343,7 @@ private[sql] object UnivocityParser { input => Seq(parser.parse(input)), parser.options.parseMode, schema, - parser.options.columnNameOfCorruptRecord, - parser.options.multiLine) + parser.options.columnNameOfCorruptRecord) filteredLines.flatMap(safeParser.parse) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala index 83b0299..65b10f3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala @@ -117,8 +117,7 @@ case class CsvToStructs( input => Seq(rawParser.parse(input)), mode, nullableSchema, - parsedOptions.columnNameOfCorruptRecord, - parsedOptions.multiLine) + parsedOptions.columnNameOfCorruptRecord) } override def dataType: DataType = nullableSchema diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index 3403349..655e44e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -582,8 +582,7 @@ case 
class Jso
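The ambiguity that motivated the revert can be shown with the standard `json` module. This is a sketch of the counting problem, not Spark's parser; the point is that the record count of a line is only known after parsing it:

```python
import json

def records_in_line(line):
    """Count the rows a single JSON input line yields: a sketch of why
    Spark cannot count JSON records without actually parsing them."""
    line = line.strip()
    if not line:
        return 0                  # empty string -> 0 records
    value = json.loads(line)
    if isinstance(value, list):
        return len(value)         # [] -> 0, [{...}, {...}] -> 2
    return 1                      # a single object -> 1 record

lines = ['[{"a": 1}, {"a": 2}]', '[]', '{"a": 1}', '']
print([records_in_line(l) for l in lines])  # [2, 0, 1, 0]
```

Because the same byte-length line can yield zero, one, or many rows, a `count()` that skips conversion cannot be correct for JSON, which is what the optimization assumed.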
[spark] branch master updated: [SPARK-24779][R] Add map_concat / map_from_entries / an option in months_between UDF to disable rounding-off
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new bc6f191 [SPARK-24779][R] Add map_concat / map_from_entries / an option in months_between UDF to disable rounding-off bc6f191 is described below commit bc6f19145192835cdfa4fc263b1c35b294c1e0ac Author: Huaxin Gao AuthorDate: Thu Jan 31 19:38:32 2019 +0800 [SPARK-24779][R] Add map_concat / map_from_entries / an option in months_between UDF to disable rounding-off ## What changes were proposed in this pull request? Add the R version of map_concat / map_from_entries / an option in months_between UDF to disable rounding-off ## How was this patch tested? Add test in test_sparkSQL.R Closes #21835 from huaxingao/spark-24779. Authored-by: Huaxin Gao Signed-off-by: Hyukjin Kwon --- R/pkg/NAMESPACE | 2 ++ R/pkg/R/functions.R | 60 +++ R/pkg/R/generics.R| 10 +- R/pkg/tests/fulltests/test_sparkSQL.R | 22 + 4 files changed, 87 insertions(+), 7 deletions(-) diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index cfad20d..1dcad16 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -312,8 +312,10 @@ exportMethods("%<=>%", "lower", "lpad", "ltrim", + "map_concat", "map_entries", "map_from_arrays", + "map_from_entries", "map_keys", "map_values", "max", diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index 58fc410..8f425b1 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -80,6 +80,11 @@ NULL #' \item \code{from_utc_timestamp}, \code{to_utc_timestamp}: time zone to use. #' \item \code{next_day}: day of the week string. #' } +#' @param ... additional argument(s). +#' \itemize{ +#' \item \code{months_between}, this contains an optional parameter to specify the +#' the result is rounded off to 8 digits. 
+#' } #' #' @name column_datetime_diff_functions #' @rdname column_datetime_diff_functions @@ -217,6 +222,7 @@ NULL #' additional named properties to control how it is converted and accepts the #' same options as the CSV data source. #' \item \code{arrays_zip}, this contains additional Columns of arrays to be merged. +#' \item \code{map_concat}, this contains additional Columns of maps to be unioned. #' } #' @name column_collection_functions #' @rdname column_collection_functions @@ -229,7 +235,7 @@ NULL #' head(select(tmp, array_contains(tmp$v1, 21), size(tmp$v1), shuffle(tmp$v1))) #' head(select(tmp, array_max(tmp$v1), array_min(tmp$v1), array_distinct(tmp$v1))) #' head(select(tmp, array_position(tmp$v1, 21), array_repeat(df$mpg, 3), array_sort(tmp$v1))) -#' head(select(tmp, flatten(tmp$v1), reverse(tmp$v1), array_remove(tmp$v1, 21))) +#' head(select(tmp, reverse(tmp$v1), array_remove(tmp$v1, 21))) #' tmp2 <- mutate(tmp, v2 = explode(tmp$v1)) #' head(tmp2) #' head(select(tmp, posexplode(tmp$v1))) @@ -238,15 +244,21 @@ NULL #' head(select(tmp, sort_array(tmp$v1, asc = FALSE))) #' tmp3 <- mutate(df, v3 = create_map(df$model, df$cyl)) #' head(select(tmp3, map_entries(tmp3$v3), map_keys(tmp3$v3), map_values(tmp3$v3))) -#' head(select(tmp3, element_at(tmp3$v3, "Valiant"))) +#' head(select(tmp3, element_at(tmp3$v3, "Valiant"), map_concat(tmp3$v3, tmp3$v3))) #' tmp4 <- mutate(df, v4 = create_array(df$mpg, df$cyl), v5 = create_array(df$cyl, df$hp)) #' head(select(tmp4, concat(tmp4$v4, tmp4$v5), arrays_overlap(tmp4$v4, tmp4$v5))) #' head(select(tmp4, array_except(tmp4$v4, tmp4$v5), array_intersect(tmp4$v4, tmp4$v5))) #' head(select(tmp4, array_union(tmp4$v4, tmp4$v5))) -#' head(select(tmp4, arrays_zip(tmp4$v4, tmp4$v5), map_from_arrays(tmp4$v4, tmp4$v5))) +#' head(select(tmp4, arrays_zip(tmp4$v4, tmp4$v5))) #' head(select(tmp, concat(df$mpg, df$cyl, df$hp))) #' tmp5 <- mutate(df, v6 = create_array(df$model, df$model)) -#' head(select(tmp5, array_join(tmp5$v6, "#"), 
array_join(tmp5$v6, "#", "NULL")))} +#' head(select(tmp5, array_join(tmp5$v6, "#"), array_join(tmp5$v6, "#", "NULL"))) +#' tmp6 <- mutate(df, v7 = create_array(create_array(df$model, df$model))) +#' head(select(tmp6, flatten(tmp6$v7))) +#' tmp7 <- mutate(df, v8 = create_array(df$model, df$cyl), v9 = create_array(df$model, df$hp)) +#' head(select(tmp7, map_from_arrays(tmp7$v8, tmp7$v9))) +#' tmp8 <- mutate(df, v1
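To make the two new collection functions concrete, their per-row semantics can be sketched in plain Python (an illustration only, not Spark's implementation; in particular, resolving duplicate keys by letting later maps win is an assumption of this sketch):

```python
def map_from_entries(entries):
    # Spark's map_from_entries builds a map column from an array of
    # (key, value) structs; per row that is just dict construction.
    return dict(entries)

def map_concat(*maps):
    # Union several maps into one; duplicate-key handling (later maps
    # override earlier ones) is assumed for this sketch.
    out = {}
    for m in maps:
        out.update(m)
    return out

assert map_from_entries([("a", 1), ("b", 2)]) == {"a": 1, "b": 2}
assert map_concat({"a": 1}, {"b": 2}, {"a": 3}) == {"a": 3, "b": 2}
```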
[spark] branch master updated: [MINOR][SQL] Unnecessary access to externalCatalog
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 181d190 [MINOR][SQL] Unnecessary access to externalCatalog 181d190 is described below commit 181d190c606ec6cbd09f6d618347b60eaa4ea828 Author: ocaballero AuthorDate: Thu Apr 11 10:00:09 2019 +0900 [MINOR][SQL] Unnecessary access to externalCatalog Avoid accessing the external catalog when it is not necessary. ## What changes were proposed in this pull request? The `functionExists` method has been changed because it unnecessarily accessed the externalCatalog to check whether the database exists even in cases where the function is already in the functionRegistry. ## How was this patch tested? It has been tested through spark-shell by inspecting the Hive metastore logs. Inside spark-shell we run `spark.table("%tableA%").selectExpr("trim(%columnA%)")`; in the current version the audit entry org.apache.hadoop.hive.metastore.HiveMetaStore.audit: cmd=get_database: default appears every time. Once the change is made, no such record appears. Closes #24312 from OCaballero/master.
Authored-by: ocaballero Signed-off-by: HyukjinKwon --- .../org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala | 7 --- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 4b862a5..c05f777 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -1105,10 +1105,11 @@ class SessionCatalog( * Check if the function with the specified name exists */ def functionExists(name: FunctionIdentifier): Boolean = { -val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase)) -requireDbExists(db) -functionRegistry.functionExists(name) || +functionRegistry.functionExists(name) || { + val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase)) + requireDbExists(db) externalCatalog.functionExists(db, name.funcName) +} } // - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
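The shape of this fix is a classic short-circuit: check the cheap in-memory registry first, and touch the external catalog only on a miss. A hypothetical Python stand-in (the real code is the Scala `functionExists` above):

```python
# Hypothetical Python stand-in for the patched functionExists: the
# cheap in-memory registry check runs first, and the expensive
# external-catalog lookup happens only on a registry miss.
class Catalog:
    def __init__(self, registry, external):
        self.registry = registry          # in-memory function registry
        self.external = external          # external (Hive) catalog
        self.external_calls = 0

    def function_exists(self, name):
        if name in self.registry:         # cheap path, no metastore I/O
            return True
        self.external_calls += 1          # metastore round trip
        return name in self.external

catalog = Catalog(registry={"trim"}, external={"my_udf"})
assert catalog.function_exists("trim")
assert catalog.external_calls == 0        # built-ins never hit the metastore
assert catalog.function_exists("my_udf")
assert catalog.external_calls == 1
```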
[spark] branch master updated: [SPARK-27088][SQL] Add a configuration to set log level for each batch at RuleExecutor
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 0745333 [SPARK-27088][SQL] Add a configuration to set log level for each batch at RuleExecutor 0745333 is described below commit 07454d01afdd7862a1ac6c5a7a672bcce3f8 Author: chakravarthiT <45845595+chakravart...@users.noreply.github.com> AuthorDate: Thu Apr 11 10:02:27 2019 +0900 [SPARK-27088][SQL] Add a configuration to set log level for each batch at RuleExecutor ## What changes were proposed in this pull request? Similar to #22406, which made the log level for plan changes by each rule configurable, this PR makes the log level for plan changes by each batch configurable, reusing the same configuration: "spark.sql.optimizer.planChangeLog.level". The config proposed in this PR, spark.sql.optimizer.planChangeLog.batches, enables plan change logging only for a set of specified batches, separated by commas. ## How was this patch tested? Added UT; also tested manually and attached screenshots below. 1) Setting spark.sql.optimizer.planChangeLog.level to warn. ![settingLogLevelToWarn](https://user-images.githubusercontent.com/45845595/54556730-8803dd00-49df-11e9-95ab-ebb0c8d735ef.png) 2) Setting spark.sql.optimizer.planChangeLog.batches to Resolution and Subquery. ![settingBatchestoLog](https://user-images.githubusercontent.com/45845595/54556740-8cc89100-49df-11e9-80ab-fbbbe1ff2cdf.png) 3) Plan change logging enabled only for a set of specified batches (Resolution and Subquery). ![batchloggingOp](https://user-images.githubusercontent.com/45845595/54556788-ab2e8c80-49df-11e9-9ae0-57815f552896.png) Closes #24136 from chakravarthiT/logBatches.
Lead-authored-by: chakravarthiT <45845595+chakravart...@users.noreply.github.com> Co-authored-by: chakravarthiT Signed-off-by: HyukjinKwon --- .../spark/sql/catalyst/rules/RuleExecutor.scala| 55 ++ .../org/apache/spark/sql/internal/SQLConf.scala| 18 +-- .../catalyst/optimizer/OptimizerLoggingSuite.scala | 45 +++--- 3 files changed, 87 insertions(+), 31 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala index 088f1fe..3e8a6e0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala @@ -113,7 +113,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging { if (effective) { queryExecutionMetrics.incNumEffectiveExecution(rule.ruleName) queryExecutionMetrics.incTimeEffectiveExecutionBy(rule.ruleName, runTime) - planChangeLogger.log(rule.ruleName, plan, result) + planChangeLogger.logRule(rule.ruleName, plan, result) } queryExecutionMetrics.incExecutionTimeBy(rule.ruleName, runTime) queryExecutionMetrics.incNumExecution(rule.ruleName) @@ -152,15 +152,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging { lastPlan = curPlan } - if (!batchStartPlan.fastEquals(curPlan)) { -logDebug( - s""" -|=== Result of Batch ${batch.name} === -|${sideBySide(batchStartPlan.treeString, curPlan.treeString).mkString("\n")} - """.stripMargin) - } else { -logTrace(s"Batch ${batch.name} has no effect.") - } + planChangeLogger.logBatch(batch.name, batchStartPlan, curPlan) } curPlan @@ -172,21 +164,46 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging { private val logRules = SQLConf.get.optimizerPlanChangeRules.map(Utils.stringToSeq) -def log(ruleName: String, oldPlan: TreeType, newPlan: TreeType): Unit = { +private val logBatches = 
SQLConf.get.optimizerPlanChangeBatches.map(Utils.stringToSeq) + +def logRule(ruleName: String, oldPlan: TreeType, newPlan: TreeType): Unit = { if (logRules.isEmpty || logRules.get.contains(ruleName)) { -lazy val message = +def message(): String = { s""" |=== Applying Rule ${ruleName} === |${sideBySide(oldPlan.treeString, newPlan.treeString).mkString("\n")} """.stripMargin -logLevel match { - case "TRACE" => logTrace(message) - case "DEBUG" => logDebug(message) - case "INFO" => logInfo(message) -
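The batch filter mirrors the existing per-rule filter: split the configured value on commas and log a batch only when its name is listed, logging everything when the config is unset. A small Python sketch of that logic (hypothetical helper names; `Utils.stringToSeq` is the Scala counterpart):

```python
def string_to_seq(s):
    # mirrors Utils.stringToSeq: split on commas, trim, drop blanks
    return [p.strip() for p in s.split(",") if p.strip()]

def should_log(batch_name, configured):
    # log every batch when the config is unset, otherwise only the
    # batches the user listed
    return configured is None or batch_name in string_to_seq(configured)

assert should_log("Resolution", None)
assert should_log("Resolution", "Resolution, Subquery")
assert not should_log("Finish Analysis", "Resolution, Subquery")
```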
[spark] branch master updated: [SPARK-27387][PYTHON][TESTS] Replace sqlutils.assertPandasEqual with Pandas assert_frame_equals
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new f62f44f [SPARK-27387][PYTHON][TESTS] Replace sqlutils.assertPandasEqual with Pandas assert_frame_equals f62f44f is described below commit f62f44f2a277c38d3d5b5524b287340991523236 Author: Bryan Cutler AuthorDate: Wed Apr 10 07:50:25 2019 +0900 [SPARK-27387][PYTHON][TESTS] Replace sqlutils.assertPandasEqual with Pandas assert_frame_equals ## What changes were proposed in this pull request? Running PySpark tests with Pandas 0.24.x causes a failure in `test_pandas_udf_grouped_map` test_supported_types: `ValueError: The truth value of an array with more than one element is ambiguous. Use a.any() or a.all()` This is because a column is an ArrayType and the method `sqlutils ReusedSQLTestCase.assertPandasEqual ` does not properly check this. This PR removes `assertPandasEqual` and replaces it with the built-in `pandas.util.testing.assert_frame_equal` which can properly handle columns of ArrayType and also prints out better diff between the DataFrames when an error occurs. Additionally, imports of pandas and pyarrow were moved to the top of related test files to avoid duplicating the same import many times. ## How was this patch tested? Existing tests Closes #24306 from BryanCutler/python-pandas-assert_frame_equal-SPARK-27387. 
Authored-by: Bryan Cutler Signed-off-by: HyukjinKwon --- python/pyspark/sql/tests/test_arrow.py | 44 +++- python/pyspark/sql/tests/test_dataframe.py | 7 +-- .../sql/tests/test_pandas_udf_grouped_agg.py | 60 +++-- .../sql/tests/test_pandas_udf_grouped_map.py | 61 +- python/pyspark/sql/tests/test_pandas_udf_scalar.py | 33 +++- python/pyspark/sql/tests/test_pandas_udf_window.py | 41 --- python/pyspark/testing/sqlutils.py | 6 --- 7 files changed, 115 insertions(+), 137 deletions(-) diff --git a/python/pyspark/sql/tests/test_arrow.py b/python/pyspark/sql/tests/test_arrow.py index 38a6402..a45c3fb 100644 --- a/python/pyspark/sql/tests/test_arrow.py +++ b/python/pyspark/sql/tests/test_arrow.py @@ -29,6 +29,13 @@ from pyspark.testing.sqlutils import ReusedSQLTestCase, have_pandas, have_pyarro from pyspark.testing.utils import QuietTest from pyspark.util import _exception_message +if have_pandas: +import pandas as pd +from pandas.util.testing import assert_frame_equal + +if have_pyarrow: +import pyarrow as pa + @unittest.skipIf( not have_pandas or not have_pyarrow, @@ -40,7 +47,6 @@ class ArrowTests(ReusedSQLTestCase): from datetime import date, datetime from decimal import Decimal from distutils.version import LooseVersion -import pyarrow as pa super(ArrowTests, cls).setUpClass() cls.warnings_lock = threading.Lock() @@ -89,7 +95,6 @@ class ArrowTests(ReusedSQLTestCase): super(ArrowTests, cls).tearDownClass() def create_pandas_data_frame(self): -import pandas as pd import numpy as np data_dict = {} for j, name in enumerate(self.schema.names): @@ -100,8 +105,6 @@ class ArrowTests(ReusedSQLTestCase): return pd.DataFrame(data=data_dict) def test_toPandas_fallback_enabled(self): -import pandas as pd - with self.sql_conf({"spark.sql.execution.arrow.fallback.enabled": True}): schema = StructType([StructField("map", MapType(StringType(), IntegerType()), True)]) df = self.spark.createDataFrame([({u'a': 1},)], schema=schema) @@ -117,11 +120,10 @@ class ArrowTests(ReusedSQLTestCase): 
self.assertTrue(len(user_warns) > 0) self.assertTrue( "Attempting non-optimization" in _exception_message(user_warns[-1])) -self.assertPandasEqual(pdf, pd.DataFrame({u'map': [{u'a': 1}]})) +assert_frame_equal(pdf, pd.DataFrame({u'map': [{u'a': 1}]})) def test_toPandas_fallback_disabled(self): from distutils.version import LooseVersion -import pyarrow as pa schema = StructType([StructField("map", MapType(StringType(), IntegerType()), True)]) df = self.spark.createDataFrame([(None,)], schema=schema) @@ -157,8 +159,8 @@ class ArrowTests(ReusedSQLTestCase): df = self.spark.createDataFrame(self.data, schema=self.schema) pdf, pdf_arrow = self._toPandas_arrow_toggle(df) expected = self.create_pandas_data_frame() -self.assertPandasEqual(expected, pdf) -self.assertPandasEqual(expected, pdf_arrow
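The failure mode being fixed is easy to reproduce outside Spark: truth-testing an element-wise comparison of arrays raises the ambiguity error quoted above, while `assert_frame_equal` compares nested values properly. A minimal sketch, assuming pandas and NumPy are installed (`pandas.testing` is the stable import path; `pandas.util.testing` was the one used at the time):

```python
import numpy as np
import pandas as pd
from pandas.testing import assert_frame_equal

# Array-valued cells are where the old element-wise check broke:
# truth-testing an array comparison is ambiguous.
try:
    bool(np.array([1, 2]) == np.array([1, 2]))
    raise AssertionError("expected a ValueError")
except ValueError as e:
    assert "ambiguous" in str(e)

# assert_frame_equal handles nested values in object columns and
# prints a readable diff on mismatch.
left = pd.DataFrame({"arr": [[1, 2], [3, 4]], "x": [1.0, 2.0]})
right = pd.DataFrame({"arr": [[1, 2], [3, 4]], "x": [1.0, 2.0]})
assert_frame_equal(left, right)
```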
[spark-website] branch asf-site updated: Update committer page
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/spark-website.git The following commit(s) were added to refs/heads/asf-site by this push: new aa95d4c Update committer page aa95d4c is described below commit aa95d4c0fc2bd7786e2e8325a592790fd431ccae Author: HyukjinKwon AuthorDate: Sun Apr 7 18:21:55 2019 +0900 Update committer page I moved to Databricks. Author: HyukjinKwon Closes #193 from HyukjinKwon/update-hyukjin. --- committers.md| 2 +- site/committers.html | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/committers.md b/committers.md index 97b21a4..bd48bd8 100644 --- a/committers.md +++ b/committers.md @@ -40,7 +40,7 @@ navigation: |Shane Knapp|UC Berkeley| |Cody Koeninger|Nexstar Digital| |Andy Konwinski|Databricks| -|Hyukjin Kwon|Hortonworks| +|Hyukjin Kwon|Databricks| |Ryan LeCompte|Quantifind| |Haoyuan Li|Alluxio| |Xiao Li|Databricks| diff --git a/site/committers.html b/site/committers.html index 264f3ce..fde415d 100644 --- a/site/committers.html +++ b/site/committers.html @@ -332,7 +332,7 @@ Hyukjin Kwon - Hortonworks + Databricks Ryan LeCompte - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-27253][SQL][FOLLOW-UP] Update doc about parent-session configuration priority
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 08858f6 [SPARK-27253][SQL][FOLLOW-UP] Update doc about parent-session configuration priority 08858f6 is described below commit 08858f6abcb7a0e31ee1d4d2c439112e33730bb4 Author: Liang-Chi Hsieh AuthorDate: Wed Apr 10 13:21:21 2019 +0900 [SPARK-27253][SQL][FOLLOW-UP] Update doc about parent-session configuration priority ## What changes were proposed in this pull request? The PR #24189 changes the behavior of merging SparkConf. The existing doc is not updated for it. This is a followup of it to update the doc. ## How was this patch tested? Doc only change. Closes #24326 from viirya/SPARK-27253-followup. Authored-by: Liang-Chi Hsieh Signed-off-by: HyukjinKwon --- .../scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala index 8f2f8e8..3d29ff3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala @@ -81,8 +81,8 @@ abstract class BaseSessionStateBuilder( /** * SQL-specific key-value configurations. * - * These either get cloned from a pre-existing instance or newly created. The conf is always - * merged with its [[SparkConf]]. + * These either get cloned from a pre-existing instance or newly created. The conf is merged + * with its [[SparkConf]] only when there is no parent session. 
*/ protected lazy val conf: SQLConf = { parentState.map(_.conf.clone()).getOrElse { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
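The documented rule reduces to a small decision: clone the parent session's conf when a parent exists, and seed from the `SparkConf` only for a brand-new session. A Python sketch with plain dicts standing in for `SQLConf`/`SparkConf` (hypothetical names):

```python
# Clone the parent session's conf when there is one; only a brand-new
# session starts from (i.e. merges with) the SparkConf.
def build_conf(parent_conf, spark_conf):
    if parent_conf is not None:
        return dict(parent_conf)   # cloned; SparkConf is NOT merged in
    return dict(spark_conf)        # new session: seeded from SparkConf

# a child session keeps the parent's value even if SparkConf differs
assert build_conf({"k": "parent"}, {"k": "spark"}) == {"k": "parent"}
# with no parent, the SparkConf value is picked up
assert build_conf(None, {"k": "spark"}) == {"k": "spark"}
```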
[spark] branch master updated: [SPARK-27493][BUILD] Upgrade ASM to 7.1
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new f93460d [SPARK-27493][BUILD] Upgrade ASM to 7.1 f93460d is described below commit f93460dae96d3668919b31d10d409c7b5033e267 Author: Dongjoon Hyun AuthorDate: Thu Apr 18 13:36:52 2019 +0900 [SPARK-27493][BUILD] Upgrade ASM to 7.1 ## What changes were proposed in this pull request? [SPARK-25946](https://issues.apache.org/jira/browse/SPARK-25946) upgraded ASM to 7.0 to support JDK11. This PR aims to update ASM to 7.1 to bring the bug fixes. - https://asm.ow2.io/versions.html - https://issues.apache.org/jira/browse/XBEAN-316 ## How was this patch tested? Pass the Jenkins. Closes #24395 from dongjoon-hyun/SPARK-27493. Authored-by: Dongjoon Hyun Signed-off-by: HyukjinKwon --- dev/deps/spark-deps-hadoop-2.7 | 2 +- dev/deps/spark-deps-hadoop-3.2 | 2 +- pom.xml| 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7 index 1386349..ece8c64 100644 --- a/dev/deps/spark-deps-hadoop-2.7 +++ b/dev/deps/spark-deps-hadoop-2.7 @@ -192,7 +192,7 @@ stringtemplate-3.2.1.jar super-csv-2.2.0.jar univocity-parsers-2.7.3.jar validation-api-1.1.0.Final.jar -xbean-asm7-shaded-4.12.jar +xbean-asm7-shaded-4.13.jar xercesImpl-2.9.1.jar xmlenc-0.52.jar xz-1.5.jar diff --git a/dev/deps/spark-deps-hadoop-3.2 b/dev/deps/spark-deps-hadoop-3.2 index 961f65d..a45f02d 100644 --- a/dev/deps/spark-deps-hadoop-3.2 +++ b/dev/deps/spark-deps-hadoop-3.2 @@ -216,7 +216,7 @@ token-provider-1.0.1.jar univocity-parsers-2.7.3.jar validation-api-1.1.0.Final.jar woodstox-core-5.0.3.jar -xbean-asm7-shaded-4.12.jar +xbean-asm7-shaded-4.13.jar xz-1.5.jar zjsonpatch-0.3.0.jar zookeeper-3.4.13.jar diff --git a/pom.xml b/pom.xml index 55f9e56..fce4cbd 100644 --- a/pom.xml +++ b/pom.xml @@ -325,7 +325,7 @@ 
org.apache.xbean xbean-asm7-shaded -4.12 +4.13
[spark] branch master updated: [MINOR][DOCS] Fix some broken links in docs
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 38fc8e2 [MINOR][DOCS] Fix some broken links in docs 38fc8e2 is described below commit 38fc8e2484aa4971d1f2c115da61fc96f36e7868 Author: Sean Owen AuthorDate: Sat Apr 13 22:27:25 2019 +0900 [MINOR][DOCS] Fix some broken links in docs ## What changes were proposed in this pull request? Fix some broken links in docs ## How was this patch tested? N/A Closes #24361 from srowen/BrokenLinks. Authored-by: Sean Owen Signed-off-by: HyukjinKwon --- docs/hardware-provisioning.md | 2 +- docs/ml-advanced.md | 2 +- docs/mllib-clustering.md | 2 +- docs/rdd-programming-guide.md | 10 +- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/docs/hardware-provisioning.md b/docs/hardware-provisioning.md index dab65e2..4e5d681 100644 --- a/docs/hardware-provisioning.md +++ b/docs/hardware-provisioning.md @@ -45,7 +45,7 @@ nodes than the storage system to avoid interference. While Spark can perform a lot of its computation in memory, it still uses local disks to store data that doesn't fit in RAM, as well as to preserve intermediate output between stages. We recommend having **4-8 disks** per node, configured _without_ RAID (just as separate mount points). -In Linux, mount the disks with the [`noatime` option](http://www.centos.org/docs/5/html/Global_File_System/s2-manage-mountnoatime.html) +In Linux, mount the disks with the `noatime` option to reduce unnecessary writes. In Spark, [configure](configuration.html) the `spark.local.dir` variable to be a comma-separated list of the local disks. If you are running HDFS, it's fine to use the same disks as HDFS. diff --git a/docs/ml-advanced.md b/docs/ml-advanced.md index 8e8c701..5787fe9 100644 --- a/docs/ml-advanced.md +++ b/docs/ml-advanced.md @@ -52,7 +52,7 @@ explicitly in Newton's method. 
As a result, L-BFGS often achieves faster convergence compared with other first-order optimizations. [Orthant-Wise Limited-memory -Quasi-Newton](http://research-srv.microsoft.com/en-us/um/people/jfgao/paper/icml07scalable.pdf) +Quasi-Newton](https://www.microsoft.com/en-us/research/wp-content/uploads/2007/01/andrew07scalable.pdf) (OWL-QN) is an extension of L-BFGS that can effectively handle L1 and elastic net regularization. L-BFGS is used as a solver for [LinearRegression](api/scala/index.html#org.apache.spark.ml.regression.LinearRegression), diff --git a/docs/mllib-clustering.md b/docs/mllib-clustering.md index 18bab11..12c33a5 100644 --- a/docs/mllib-clustering.md +++ b/docs/mllib-clustering.md @@ -237,7 +237,7 @@ LDA supports different inference algorithms via `setOptimizer` function. on the likelihood function and yields comprehensive results, while `OnlineLDAOptimizer` uses iterative mini-batch sampling for [online variational -inference](https://www.cs.princeton.edu/~blei/papers/HoffmanBleiBach2010b.pdf) +inference](https://mimno.infosci.cornell.edu/info6150/readings/HoffmanBleiBach2010b.pdf) and is generally memory friendly. LDA takes in a collection of documents as vectors of word counts and the diff --git a/docs/rdd-programming-guide.md b/docs/rdd-programming-guide.md index b568e94..c937740 100644 --- a/docs/rdd-programming-guide.md +++ b/docs/rdd-programming-guide.md @@ -345,7 +345,7 @@ One important parameter for parallel collections is the number of *partitions* t -Spark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, [Amazon S3](http://wiki.apache.org/hadoop/AmazonS3), etc. Spark supports text files, [SequenceFiles](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/mapred/SequenceFileInputFormat.html), and any other Hadoop [InputFormat](http://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapred/InputFormat.html).
+Spark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, [Amazon S3](http://wiki.apache.org/hadoop/AmazonS3), etc. Spark supports text files, [SequenceFiles](https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapred/SequenceFileInputFormat.html), and any other Hadoop [InputFormat](http://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapred/InputFormat.html). Text file RDDs can be created using `SparkContext`'s `textFile` method. This method takes a URI for the file (either a local path on the machine, or a `hdfs://`, `s3a://`, etc URI) and reads it as a collection of lines. Here is an example invocation: @@ -368,7 +368,7 @@ Apart from text files, Spark's Scala API also supports several other data format * `SparkContext.wholeTextFiles` lets you read a directory
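The `textFile` behavior described in the quoted guide — read a URI and return a collection of lines — can be sketched locally with the standard library (no Spark involved; plain local files only):

```python
import os
import tempfile

def text_file(path):
    # local stand-in for SparkContext.textFile: one element per line
    with open(path) as f:
        return [line.rstrip("\n") for line in f]

with tempfile.TemporaryDirectory() as d:
    p = os.path.join(d, "data.txt")
    with open(p, "w") as f:
        f.write("first line\nsecond line\n")
    assert text_file(p) == ["first line", "second line"]
```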
[spark] branch master updated: [SPARK-27446][R] Use existing spark conf if available.
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new eea3f55 [SPARK-27446][R] Use existing spark conf if available. eea3f55 is described below commit eea3f55a316f6ebab0f91f265ae101b41a187096 Author: Bago Amirbekian AuthorDate: Sun Apr 14 17:09:12 2019 +0900 [SPARK-27446][R] Use existing spark conf if available. ## What changes were proposed in this pull request? The RBackend and RBackendHandler create new conf objects that don't pick up conf values from the existing SparkSession and therefore always use the default conf values instead of values specified by the user. In this fix we check to see if the spark env already exists, and get the conf from there. We fall back to creating a new conf. This follows the pattern used in other places including this: https://github.com/apache/spark/blob/3725b1324f731d57dc776c256bc1a100ec9e6cd0/core/src/main/scala/org/apache/spark/api/r/BaseRRunner.scala#L261 ## How was this patch tested? Closes #24353 from MrBago/r-backend-use-existing-conf.
Authored-by: Bago Amirbekian Signed-off-by: HyukjinKwon --- core/src/main/scala/org/apache/spark/api/r/RBackend.scala| 6 +++--- core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/r/RBackend.scala b/core/src/main/scala/org/apache/spark/api/r/RBackend.scala index 36b4132..c755dcb 100644 --- a/core/src/main/scala/org/apache/spark/api/r/RBackend.scala +++ b/core/src/main/scala/org/apache/spark/api/r/RBackend.scala @@ -30,7 +30,7 @@ import io.netty.handler.codec.LengthFieldBasedFrameDecoder import io.netty.handler.codec.bytes.{ByteArrayDecoder, ByteArrayEncoder} import io.netty.handler.timeout.ReadTimeoutHandler -import org.apache.spark.SparkConf +import org.apache.spark.{SparkConf, SparkEnv} import org.apache.spark.internal.Logging import org.apache.spark.internal.config.R._ @@ -47,7 +47,7 @@ private[spark] class RBackend { private[r] val jvmObjectTracker = new JVMObjectTracker def init(): (Int, RAuthHelper) = { -val conf = new SparkConf() +val conf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf()) val backendConnectionTimeout = conf.get(R_BACKEND_CONNECTION_TIMEOUT) bossGroup = new NioEventLoopGroup(conf.get(R_NUM_BACKEND_THREADS)) val workerGroup = bossGroup @@ -124,7 +124,7 @@ private[spark] object RBackend extends Logging { val listenPort = serverSocket.getLocalPort() // Connection timeout is set by socket client. 
To make it configurable we will pass the // timeout value to client inside the temp file - val conf = new SparkConf() + val conf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf()) val backendConnectionTimeout = conf.get(R_BACKEND_CONNECTION_TIMEOUT) // tell the R process via temporary file diff --git a/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala b/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala index 7b74efa..aaa432d 100644 --- a/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala +++ b/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala @@ -26,7 +26,7 @@ import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler} import io.netty.channel.ChannelHandler.Sharable import io.netty.handler.timeout.ReadTimeoutException -import org.apache.spark.SparkConf +import org.apache.spark.{SparkConf, SparkEnv} import org.apache.spark.api.r.SerDe._ import org.apache.spark.internal.Logging import org.apache.spark.internal.config.R._ @@ -98,7 +98,7 @@ private[r] class RBackendHandler(server: RBackend) ctx.write(pingBaos.toByteArray) } } - val conf = new SparkConf() + val conf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf()) val heartBeatInterval = conf.get(R_HEARTBEAT_INTERVAL) val backendConnectionTimeout = conf.get(R_BACKEND_CONNECTION_TIMEOUT) val interval = Math.min(heartBeatInterval, backendConnectionTimeout - 1) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
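The fix follows one pattern in all three places: prefer the conf of a live `SparkEnv`, and fall back to a freshly constructed default conf only when no environment exists. A Python sketch of that fallback (hypothetical names; the timeout values are illustrative, not authoritative defaults):

```python
# Prefer the conf from a live environment; fall back to defaults only
# when no environment exists -- mirrors
# Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf()).
_env = None  # stands in for SparkEnv.get

def default_conf():
    # default-only conf, like `new SparkConf()` with no user settings
    return {"spark.r.backendConnectionTimeout": "6000"}

def current_conf():
    return _env["conf"] if _env is not None else default_conf()

assert current_conf()["spark.r.backendConnectionTimeout"] == "6000"

# once an environment with user-specified settings exists, it wins
_env = {"conf": {"spark.r.backendConnectionTimeout": "100"}}
assert current_conf()["spark.r.backendConnectionTimeout"] == "100"
```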
[spark] branch master updated: [SPARK-27803][SQL][PYTHON] Fix column pruning for Python UDF
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 6506616 [SPARK-27803][SQL][PYTHON] Fix column pruning for Python UDF 6506616 is described below commit 6506616b978066a078ad866b3251cdf8fb95af42 Author: Wenchen Fan AuthorDate: Mon May 27 21:39:59 2019 +0900 [SPARK-27803][SQL][PYTHON] Fix column pruning for Python UDF ## What changes were proposed in this pull request? In https://github.com/apache/spark/pull/22104 , we create the python-eval nodes at the end of the optimization phase, which causes a problem. After the main optimization batch, Filter and Project nodes are usually pushed to the bottom, near the scan node. However, if we extract Python UDFs from Filter/Project, and create a python-eval node under Filter/Project, it will break column pruning/filter pushdown of the scan node. There are some hacks in the `ExtractPythonUDFs` rule, to duplicate the column pruning and filter pushdown logic. However, it has some bugs as demonstrated in the new test case(only column pruning is broken). This PR removes the hacks and re-apply the column pruning and filter pushdown rules explicitly. **Before:** ``` ... 
== Analyzed Logical Plan == a: bigint Project [a#168L] +- Filter dummyUDF(a#168L) +- Relation[a#168L,b#169L] parquet == Optimized Logical Plan == Project [a#168L] +- Project [a#168L, b#169L] +- Filter pythonUDF0#174: boolean +- BatchEvalPython [dummyUDF(a#168L)], [a#168L, b#169L, pythonUDF0#174] +- Relation[a#168L,b#169L] parquet == Physical Plan == *(2) Project [a#168L] +- *(2) Project [a#168L, b#169L] +- *(2) Filter pythonUDF0#174: boolean +- BatchEvalPython [dummyUDF(a#168L)], [a#168L, b#169L, pythonUDF0#174] +- *(1) FileScan parquet [a#168L,b#169L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/private/var/folders/_1/bzcp960d0hlb988k90654z2wgp/T/spark-798bae3c-a2..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct ``` **After:** ``` ... == Analyzed Logical Plan == a: bigint Project [a#168L] +- Filter dummyUDF(a#168L) +- Relation[a#168L,b#169L] parquet == Optimized Logical Plan == Project [a#168L] +- Filter pythonUDF0#174: boolean +- BatchEvalPython [dummyUDF(a#168L)], [pythonUDF0#174] +- Project [a#168L] +- Relation[a#168L,b#169L] parquet == Physical Plan == *(2) Project [a#168L] +- *(2) Filter pythonUDF0#174: boolean +- BatchEvalPython [dummyUDF(a#168L)], [pythonUDF0#174] +- *(1) FileScan parquet [a#168L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/private/var/folders/_1/bzcp960d0hlb988k90654z2wgp/T/spark-9500cafb-78..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct ``` ## How was this patch tested? new test Closes #24675 from cloud-fan/python. 
Authored-by: Wenchen Fan Signed-off-by: HyukjinKwon --- .../spark/sql/catalyst/expressions/PythonUDF.scala | 3 +- .../spark/sql/catalyst/optimizer/Optimizer.scala | 2 ++ .../plans/logical/pythonLogicalOperators.scala | 29 +++- .../spark/sql/execution/SparkOptimizer.scala | 15 +--- .../sql/execution/python/ArrowEvalPythonExec.scala | 15 ++-- .../sql/execution/python/BatchEvalPythonExec.scala | 15 ++-- .../sql/execution/python/EvalPythonExec.scala | 6 ++-- .../sql/execution/python/ExtractPythonUDFs.scala | 38 +++- .../execution/python/ExtractPythonUDFsSuite.scala | 40 -- 9 files changed, 92 insertions(+), 71 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala index 6530b17..2d82355 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala @@ -45,7 +45,8 @@ object PythonUDF { } /** - * A serialized version of a Python lambda function. + * A serialized version of a Python lambda function. This is a special expression, which needs a + * dedicated physical operator to execute it, and thus can't be pushed down to data sources. */ case class PythonUDF( name: String, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index f32f2c7..5b59ac7 100644 --- a/sql/catalyst/src/main
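A pure-Python toy illustrates why re-applying column pruning matters (a hypothetical mini-planner, not Spark's optimizer): the Python UDF is opaque to the optimizer, so once the python-eval node is created under the Filter/Project, the scan must be told again which columns are actually needed.

```python
rows = [{"a": 1, "b": 10}, {"a": 2, "b": 20}]  # stands in for the parquet scan

def columns_needed(projection, udf_inputs):
    # after inserting the python-eval node, recompute the union of the
    # columns required by the final projection and the UDF inputs
    return sorted(set(projection) | set(udf_inputs))

# SELECT a FROM t WHERE dummyUDF(a): only column "a" is ever needed
needed = columns_needed(projection=["a"], udf_inputs=["a"])
assert needed == ["a"]

pruned = [{c: r[c] for c in needed} for r in rows]   # pruned scan
keep = lambda r: r["a"] % 2 == 1                     # stand-in for dummyUDF
result = [r["a"] for r in pruned if keep(r)]
assert result == [1]
```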
[spark] branch master updated (35952cb -> 90b6cda)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 35952cb [SPARK-27859][SS] Use efficient sorting instead of `.sorted.reverse` sequence add 90b6cda [SPARK-25944][R][BUILD] AppVeyor change to latest R version (3.6.0) No new revisions were added by this update. Summary of changes: appveyor.yml | 4 dev/appveyor-install-dependencies.ps1 | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (3806887 -> db48da8)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 3806887 [SPARK-27907][SQL] HiveUDAF should return NULL in case of 0 rows add db48da8 [SPARK-27834][SQL][R][PYTHON] Make separate PySpark/SparkR vectorization configurations No new revisions were added by this update. Summary of changes: R/pkg/R/DataFrame.R| 8 +-- R/pkg/R/SQLContext.R | 4 +- R/pkg/R/group.R| 2 +- R/pkg/tests/fulltests/test_sparkSQL_arrow.R| 84 +++--- docs/sparkr.md | 6 +- docs/sql-pyspark-pandas-with-arrow.md | 6 +- examples/src/main/python/sql/arrow.py | 2 +- python/pyspark/sql/dataframe.py| 27 +++ python/pyspark/sql/session.py | 18 ++--- python/pyspark/sql/tests/test_arrow.py | 24 +-- python/pyspark/sql/tests/test_dataframe.py | 2 +- .../spark/sql/catalyst/plans/logical/object.scala | 4 +- .../org/apache/spark/sql/internal/SQLConf.scala| 34 ++--- 13 files changed, 127 insertions(+), 94 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-27893][SQL][PYTHON] Create an integrated test base for Python, Scalar Pandas, Scala UDF by sql files
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new f5317f1  [SPARK-27893][SQL][PYTHON] Create an integrated test base for Python, Scalar Pandas, Scala UDF by sql files

f5317f1 is described below

commit f5317f10b25bd193cf5026a8f4fd1cd1ded8f5b4
Author: HyukjinKwon
AuthorDate: Mon Jun 3 10:03:36 2019 +0900

[SPARK-27893][SQL][PYTHON] Create an integrated test base for Python, Scalar Pandas, Scala UDF by sql files

## What changes were proposed in this pull request?

This PR targets to add an integrated test base for various UDF test cases so that Scalar UDFs, Python UDFs and Scalar Pandas UDFs can be tested in SBT & Maven tests.

### Problem

One of the problems we face is that `ExtractPythonUDFs` (for Python UDF and Scalar Pandas UDF) has unevaluable expressions that always have to be wrapped with special plans. This special rule seems to produce many issues, for instance, SPARK-27803, SPARK-26147, SPARK-26864, SPARK-26293, SPARK-25314 and SPARK-24721.

### Why do we have so few test cases dedicated to SQL and plans with Python UDFs?

We have virtually no such SQL (or plan) dedicated tests in PySpark to catch such issues because:

- A developer would have to know the analyzer, the optimizer, SQL, PySpark, Py4J and the version differences in Python to write such good test cases
- To test plans, we would need to access plans in the JVM via Py4J, which is tricky, messy and duplicates Scala test cases
- Usually we just add end-to-end test cases in PySpark, so there are few dedicated examples to refer to when writing such tests in PySpark

It is also a non-trivial overhead to switch the test base and method (IMHO).

### How does this PR fix it?

This PR adds Python UDFs and Scalar Pandas UDFs into our `*.sql` file based test base at runtime of SBT / Maven test cases.
It generates a Python-pickled instance (consisting of the return type and the Python native function) that is used in a Python or Scalar Pandas UDF, and brings it directly into the JVM. After that, the tests run directly in the JVM without interacting via Py4J — we can just register and run the Python UDF and Scalar Pandas UDF in the JVM. Currently, I only integrated this change into SQL file based testing. This is how it works with test files under the `udf` directory:

After the test files under the 'inputs/udf' directory are detected, it creates three test cases:

- Scala UDF test case with a Scalar UDF registered named 'udf'.
- Python UDF test case with a Python UDF registered named 'udf' iff Python executable and pyspark are available.
- Scalar Pandas UDF test case with a Scalar Pandas UDF registered named 'udf' iff Python executable, pandas, pyspark and pyarrow are available.

Therefore, UDF test cases should have a single input and output file but are executed by three different types of UDFs. For instance,

```sql
CREATE TEMPORARY VIEW ta AS
SELECT udf(a) AS a, udf('a') AS tag FROM t1
UNION ALL
SELECT udf(a) AS a, udf('b') AS tag FROM t2;

CREATE TEMPORARY VIEW tb AS
SELECT udf(a) AS a, udf('a') AS tag FROM t3
UNION ALL
SELECT udf(a) AS a, udf('b') AS tag FROM t4;

SELECT tb.* FROM ta INNER JOIN tb ON ta.a = tb.a AND ta.tag = tb.tag;
```

will be run 3 times, with a Scalar UDF, a Python UDF and a Scalar Pandas UDF each.
### Appendix

Plus, this PR adds `IntegratedUDFTestUtils`, which makes it possible to test and execute Python UDFs and Scalar Pandas UDFs as below:

To register a Python UDF in SQL:

```scala
IntegratedUDFTestUtils.registerTestUDF(TestPythonUDF(name = "udf"), spark)
```

To register a Scalar Pandas UDF in SQL:

```scala
IntegratedUDFTestUtils.registerTestUDF(TestScalarPandasUDF(name = "udf"), spark)
```

To use it in the Scala API:

```scala
spark.range(1).select(expr("udf(1)")).show()
```

To use it in SQL:

```scala
sql("SELECT udf(1)").show()
```

This util could be used in the future for better coverage with Scala API combinations as well.

## How was this patch tested?

Tested via the command below:

```bash
build/sbt "sql/test-only *SQLQueryTestSuite -- -z udf/udf-inner-join.sql"
```

```
[info] SQLQueryTestSuite:
[info] - udf/udf-inner-join.sql - Scala UDF (5 seconds, 47 milliseconds)
[info] - udf/udf-inner-join.sql - Python UDF (4 seconds, 335 milliseconds)
[info] - udf/udf-inner-join.sql - Scalar Pandas UDF (5 seconds, 423 milliseconds)
```

[python] unavailable:

```
[info] SQLQueryTestSuite:
[info] - udf/udf-inner-join.sql - Scala
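The "one input file, three UDF flavours" idea above can be mimicked in plain Python (illustrative only — the real utility is `IntegratedUDFTestUtils` in Scala, and all names below are made up for the sketch): interchangeable implementations are registered under the same name and a single test body runs against each, so one expected-output file serves every variant.

```python
# Sketch of running one shared test case against interchangeable UDF
# implementations, as the PR does for Scala / Python / Scalar Pandas UDFs.

def scalar_udf(x):          # stand-in for a Scala UDF (row at a time)
    return x * 2

def python_udf(x):          # stand-in for a Python UDF (row at a time)
    return int(x) * 2

def vectorized_udf(xs):     # stand-in for a Scalar Pandas UDF (batch at a time)
    return [x * 2 for x in xs]

def run_case(udf, data, vectorized=False):
    """The shared 'test file': apply udf to every row, return the result."""
    return list(udf(data)) if vectorized else [udf(x) for x in data]

data = [1, 2, 3]
results = [
    run_case(scalar_udf, data),
    run_case(python_udf, data),
    run_case(vectorized_udf, data, vectorized=True),
]
# All three flavours must agree on the same input, which is exactly why a
# single expected output file can be reused across the three test cases.
```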
[spark] branch master updated (f5317f1 -> 2a88fff)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from f5317f1 [SPARK-27893][SQL][PYTHON] Create an integrated test base for Python, Scalar Pandas, Scala UDF by sql files add 2a88fff [SPARK-27873][SQL] columnNameOfCorruptRecord should not be checked with column names in CSV header when disabling enforceSchema No new revisions were added by this update. Summary of changes: .../execution/datasources/csv/CSVFileFormat.scala | 11 - .../v2/csv/CSVPartitionReaderFactory.scala | 10 ++--- .../sql/execution/datasources/csv/CSVSuite.scala | 26 ++ 3 files changed, 39 insertions(+), 8 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (4d770db -> eee3467)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 4d770db [SPARK-27968] ArrowEvalPythonExec.evaluate shouldn't eagerly read the first row add eee3467 [SPARK-27938][SQL] Remove feature flag LEGACY_PASS_PARTITION_BY_AS_OPTIONS No new revisions were added by this update. Summary of changes: .../org/apache/spark/sql/internal/SQLConf.scala| 9 - .../org/apache/spark/sql/DataFrameWriter.scala | 9 +++-- .../sql/test/DataFrameReaderWriterSuite.scala | 22 +++--- 3 files changed, 10 insertions(+), 30 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-27875][CORE][SQL][ML][K8S] Wrap all PrintWriter with Utils.tryWithResource
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new db3e746  [SPARK-27875][CORE][SQL][ML][K8S] Wrap all PrintWriter with Utils.tryWithResource

db3e746 is described below

commit db3e746b64a3f78ce60bcfd6f372735f574da95a
Author: Yuming Wang
AuthorDate: Thu May 30 19:54:32 2019 +0900

[SPARK-27875][CORE][SQL][ML][K8S] Wrap all PrintWriter with Utils.tryWithResource

## What changes were proposed in this pull request?

This PR wraps all `PrintWriter`s with `Utils.tryWithResource` to prevent resource leaks.

## How was this patch tested?

Existing tests.

Closes #24739 from wangyum/SPARK-27875.

Authored-by: Yuming Wang
Signed-off-by: HyukjinKwon
---
 .../spark/api/python/PythonBroadcastSuite.scala | 6 ++---
 .../org/apache/spark/deploy/SparkSubmitSuite.scala | 28 +--
 .../deploy/history/FsHistoryProviderSuite.scala | 10 +++
 .../spark/metrics/InputOutputMetricsSuite.scala | 12 -
 .../spark/scheduler/ReplayListenerSuite.scala | 26 +-
 .../ml/param/shared/SharedParamsCodeGen.scala | 8 +++---
 .../features/PodTemplateConfigMapStepSuite.scala | 7 ++---
 .../apache/spark/sql/catalyst/util/package.scala | 13 -
 .../spark/sql/execution/command/DDLSuite.scala | 12 -
 .../apache/spark/sql/hive/StatisticsSuite.scala | 31 +++---
 .../spark/sql/hive/client/VersionsSuite.scala | 6 ++---
 .../spark/sql/hive/execution/HiveUDFSuite.scala | 12 -
 12 files changed, 88 insertions(+), 83 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/api/python/PythonBroadcastSuite.scala b/core/src/test/scala/org/apache/spark/api/python/PythonBroadcastSuite.scala
index 24004de..dffdd96 100644
--- a/core/src/test/scala/org/apache/spark/api/python/PythonBroadcastSuite.scala
+++ b/core/src/test/scala/org/apache/spark/api/python/PythonBroadcastSuite.scala
@@ -42,9 +42,9 @@ class PythonBroadcastSuite extends SparkFunSuite with Matchers with SharedSparkC
     withTempDir { tempDir =>
       val broadcastDataFile: File = {
         val file = new File(tempDir, "broadcastData")
-        val printWriter = new PrintWriter(file)
-        printWriter.write(broadcastedString)
-        printWriter.close()
+        Utils.tryWithResource(new PrintWriter(file)) { printWriter =>
+          printWriter.write(broadcastedString)
+        }
         file
       }
       val broadcast = new PythonBroadcast(broadcastDataFile.getAbsolutePath)
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 65c9cb9..385f549 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -749,10 +749,10 @@ class SparkSubmitSuite
     withTempDir { tmpDir =>
       // Test jars and files
       val f1 = File.createTempFile("test-submit-jars-files", "", tmpDir)
-      val writer1 = new PrintWriter(f1)
-      writer1.println("spark.jars " + jars)
-      writer1.println("spark.files " + files)
-      writer1.close()
+      Utils.tryWithResource(new PrintWriter(f1)) { writer =>
+        writer.println("spark.jars " + jars)
+        writer.println("spark.files " + files)
+      }
       val clArgs = Seq(
         "--master", "local",
         "--class", "org.SomeClass",
@@ -766,10 +766,10 @@ class SparkSubmitSuite
       // Test files and archives (Yarn)
       val f2 = File.createTempFile("test-submit-files-archives", "", tmpDir)
-      val writer2 = new PrintWriter(f2)
-      writer2.println("spark.yarn.dist.files " + files)
-      writer2.println("spark.yarn.dist.archives " + archives)
-      writer2.close()
+      Utils.tryWithResource(new PrintWriter(f2)) { writer =>
+        writer.println("spark.yarn.dist.files " + files)
+        writer.println("spark.yarn.dist.archives " + archives)
+      }
       val clArgs2 = Seq(
         "--master", "yarn",
         "--class", "org.SomeClass",
@@ -783,9 +783,9 @@ class SparkSubmitSuite
       // Test python files
       val f3 = File.createTempFile("test-submit-python-files", "", tmpDir)
-      val writer3 = new PrintWriter(f3)
-      writer3.println("spark.submit.pyFiles " + pyFiles)
-      writer3.close()
+      Utils.tryWithResource(new PrintWriter(f3)) { writer =>
+        writer.println("spark.submit.pyFiles " + pyFiles)
+      }
       val clArgs3 = Seq(
         "-
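The Scala change above replaces manual `printWriter.close()` calls with `Utils.tryWithResource`, which closes the writer even if the body throws. Python's `with` statement gives the same guarantee; a rough analogue of the before/after pattern (file names here are just for illustration):

```python
# Rough Python analogue of the Utils.tryWithResource change above: the
# with-statement closes the file even when the body raises, so no leak.
import os
import tempfile

path = os.path.join(tempfile.mkdtemp(), "broadcastData")

# Old pattern: manual close() is skipped if a write in between raises.
f = open(path, "w")
f.write("spark.jars a.jar\n")
f.close()

# New pattern: the context manager guarantees close() on success and failure.
with open(path, "w") as f:
    f.write("spark.jars a.jar\n")
    f.write("spark.files b.txt\n")

try:
    with open(path, "a") as f:
        raise RuntimeError("simulated failure while writing")
except RuntimeError:
    pass
closed_despite_error = f.closed   # True: closed even though the body raised

with open(path) as check:
    content = check.read()
```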
[spark] branch branch-2.4 updated: [SPARK-27873][SQL][BRANCH-2.4] columnNameOfCorruptRecord should not be checked with column names in CSV header when disabling enforceSchema
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 880cb7b  [SPARK-27873][SQL][BRANCH-2.4] columnNameOfCorruptRecord should not be checked with column names in CSV header when disabling enforceSchema

880cb7b is described below

commit 880cb7bee08172e2b82ac6fe57e4b75f8f495c1b
Author: Liang-Chi Hsieh
AuthorDate: Tue Jun 4 16:27:44 2019 +0900

[SPARK-27873][SQL][BRANCH-2.4] columnNameOfCorruptRecord should not be checked with column names in CSV header when disabling enforceSchema

## What changes were proposed in this pull request?

If we want to keep corrupt records when reading CSV, we add a new column, `columnNameOfCorruptRecord`, to the schema. But this new column isn't actually a column in the CSV header. So if `enforceSchema` is disabled, `CSVHeaderChecker` throws an exception complaining that the number of columns in the CSV header isn't equal to that in the schema. This backports the fix into branch-2.4.

## How was this patch tested?

Added test.

Closes #24771 from viirya/SPARK-27873-2.4.
Authored-by: Liang-Chi Hsieh Signed-off-by: HyukjinKwon --- .../execution/datasources/csv/CSVDataSource.scala | 12 +-- .../sql/execution/datasources/csv/CSVSuite.scala | 25 ++ 2 files changed, 35 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala index e840ff1..62c3c16 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala @@ -210,7 +210,11 @@ object TextInputCSVDataSource extends CSVDataSource { // Note: if there are only comments in the first block, the header would probably // be not extracted. CSVUtils.extractHeader(lines, parser.options).foreach { header => -val schema = if (columnPruning) requiredSchema else dataSchema +val actualRequiredSchema = + StructType(requiredSchema.filterNot(_.name == parser.options.columnNameOfCorruptRecord)) +val actualDataSchema = + StructType(dataSchema.filterNot(_.name == parser.options.columnNameOfCorruptRecord)) +val schema = if (columnPruning) actualRequiredSchema else actualDataSchema val columnNames = parser.tokenizer.parseLine(header) CSVDataSource.checkHeaderColumnNames( schema, @@ -297,7 +301,11 @@ object MultiLineCSVDataSource extends CSVDataSource { caseSensitive: Boolean, columnPruning: Boolean): Iterator[InternalRow] = { def checkHeader(header: Array[String]): Unit = { - val schema = if (columnPruning) requiredSchema else dataSchema + val actualRequiredSchema = +StructType(requiredSchema.filterNot(_.name == parser.options.columnNameOfCorruptRecord)) + val actualDataSchema = +StructType(dataSchema.filterNot(_.name == parser.options.columnNameOfCorruptRecord)) + val schema = if (columnPruning) actualRequiredSchema else actualDataSchema CSVDataSource.checkHeaderColumnNames( schema, header, diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index d59035b..df9d154 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -1836,4 +1836,29 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te val schema = new StructType().add("a", StringType).add("b", IntegerType) checkAnswer(spark.read.schema(schema).option("delimiter", delimiter).csv(input), Row("abc", 1)) } + + test("SPARK-27873: disabling enforceSchema should not fail columnNameOfCorruptRecord") { +Seq(false, true).foreach { multiLine => + withTempPath { path => +val df = Seq(("0", "2013-abc-11")).toDF("a", "b") +df.write + .option("header", "true") + .csv(path.getAbsolutePath) + +val schema = StructType.fromDDL("a int, b date") +val columnNameOfCorruptRecord = "_unparsed" +val schemaWithCorrField = schema.add(columnNameOfCorruptRecord, StringType) +val readDF = spark + .read + .option("mode", "Permissive") +
[spark] branch master updated: [MINOR][DOCS] Add a clarifying note to str_to_map documentation
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 3ddc26d  [MINOR][DOCS] Add a clarifying note to str_to_map documentation

3ddc26d is described below

commit 3ddc26ddd8bfd706204c5b20dbfb7749ab55a6ca
Author: Michael Chirico
AuthorDate: Tue Jun 4 16:58:25 2019 +0900

[MINOR][DOCS] Add a clarifying note to str_to_map documentation

I was quite surprised by the following behavior: `SELECT str_to_map('1:2|3:4', '|')` vs `SELECT str_to_map(replace('1:2|3:4', '|', ','))`. The documentation does not make it at all clear what's going on here, but a [dive into the source code shows](https://github.com/apache/spark/blob/fa0d4bf69929c5acd676d602e758a969713d19d8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala#L461-L466) that `split` is being used, and the interpretation of `split`'s arguments as regular expressions is in turn clearly documented.

## What changes were proposed in this pull request?

Documentation clarification

## How was this patch tested?

N/A

Closes #23888 from MichaelChirico/patch-2.
Authored-by: Michael Chirico Signed-off-by: HyukjinKwon --- .../org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala index 8d3a641..319a7fc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala @@ -418,7 +418,7 @@ case class CreateNamedStructUnsafe(children: Seq[Expression]) extends CreateName */ // scalastyle:off line.size.limit @ExpressionDescription( - usage = "_FUNC_(text[, pairDelim[, keyValueDelim]]) - Creates a map after splitting the text into key/value pairs using delimiters. Default delimiters are ',' for `pairDelim` and ':' for `keyValueDelim`.", + usage = "_FUNC_(text[, pairDelim[, keyValueDelim]]) - Creates a map after splitting the text into key/value pairs using delimiters. Default delimiters are ',' for `pairDelim` and ':' for `keyValueDelim`. Both `pairDelim` and `keyValueDelim` are treated as regular expressions.", examples = """ Examples: > SELECT _FUNC_('a:1,b:2,c:3', ',', ':'); - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
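The surprise documented above comes from the delimiters being regular expressions: `|` is the regex alternation operator, so `str_to_map('1:2|3:4', '|')` does not split on a literal pipe. A pure-Python analogue of the splitting behaviour (illustrative only — Spark uses Java's `String.split`, whose regex dialect differs slightly):

```python
# Pure-Python analogue of str_to_map's regex-based splitting, illustrating
# why the pair delimiter must be escaped when it is a regex metacharacter.
import re

def str_to_map(text, pair_delim=",", kv_delim=":"):
    # Both delimiters are treated as regular expressions, as the doc note says.
    pairs = re.split(pair_delim, text)
    return dict(re.split(kv_delim, p, maxsplit=1) for p in pairs)

# The pipe must be escaped because the delimiter is a regex:
ok = str_to_map("1:2|3:4", r"\|")
# An unescaped "|" is regex alternation and splits at every position,
# which is exactly the kind of surprise the doc note warns about.
```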
[spark] branch master updated (98708de -> 216eb36)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 98708de [MINOR][ML] add missing since annotation of meanAveragePrecision add 216eb36 [SPARK-27887][PYTHON] Add deprecation warning for Python 2 No new revisions were added by this update. Summary of changes: python/pyspark/context.py | 9 + 1 file changed, 9 insertions(+) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
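The SPARK-27887 change adds a deprecation warning for Python 2 in `python/pyspark/context.py`. A minimal sketch of that kind of version-gated warning (illustrative only; the wording and placement below are assumptions, not the exact Spark code):

```python
# Minimal sketch of a version-gated deprecation warning, in the spirit of
# the SPARK-27887 change (the real code lives in python/pyspark/context.py).
import sys
import warnings

def warn_if_python2():
    """Emit a DeprecationWarning when running under Python 2."""
    if sys.version_info < (3,):
        warnings.warn(
            "Support for Python 2 is deprecated; please migrate to Python 3.",
            DeprecationWarning)
        return True
    return False

emitted = warn_if_python2()   # False on a Python 3 interpreter
```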
[spark] branch master updated (c79f471 -> 23ebd38)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from c79f471 [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL add 23ebd38 [SPARK-27418][SQL] Migrate Parquet to File Data Source V2 No new revisions were added by this update. Summary of changes: .../org/apache/spark/sql/internal/SQLConf.scala| 2 +- ...org.apache.spark.sql.sources.DataSourceRegister | 2 +- .../spark/sql/execution/command/tables.scala | 5 +- .../sql/execution/datasources/SchemaPruning.scala | 4 +- .../datasources/parquet/ParquetFileFormat.scala| 100 + .../datasources/parquet/ParquetFilters.scala | 2 +- .../datasources/parquet/ParquetOutputWriter.scala | 2 +- .../datasources/parquet/ParquetReadSupport.scala | 4 +- .../datasources/parquet/ParquetUtils.scala | 130 +++ .../datasources/parquet/ParquetWriteSupport.scala | 4 +- .../datasources/v2/FilePartitionReader.scala | 17 + .../ParquetDataSourceV2.scala} | 14 +- .../v2/parquet/ParquetPartitionReaderFactory.scala | 227 +++ .../OrcScan.scala => parquet/ParquetScan.scala}| 58 ++- .../ParquetScanBuilder.scala} | 48 ++- .../OrcTable.scala => parquet/ParquetTable.scala} | 18 +- .../v2/parquet/ParquetWriteBuilder.scala | 116 ++ .../spark/sql/FileBasedDataSourceSuite.scala | 4 +- .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 111 +++--- .../apache/spark/sql/execution/PlannerSuite.scala | 20 +- .../spark/sql/execution/SameResultSuite.scala | 32 +- .../spark/sql/execution/SparkPlanSuite.scala | 57 ++- .../datasources/parquet/ParquetFilterSuite.scala | 179 ++--- .../parquet/ParquetPartitionDiscoverySuite.scala | 419 ++--- .../datasources/parquet/ParquetQuerySuite.scala| 99 +++-- .../parquet/ParquetSchemaPruningSuite.scala| 40 +- .../datasources/parquet/ParquetSchemaSuite.scala | 2 +- .../sql/execution/metric/SQLMetricsSuite.scala | 47 +-- .../execution/python/ExtractPythonUDFsSuite.scala | 90 +++-- 
.../sources/v2/FileDataSourceV2FallBackSuite.scala | 8 +- .../spark/sql/streaming/FileStreamSinkSuite.scala | 132 +++ .../apache/spark/sql/streaming/StreamSuite.scala | 9 +- .../streaming/StreamingDeduplicationSuite.scala| 50 +-- .../spark/sql/streaming/StreamingQuerySuite.scala | 86 +++-- .../spark/sql/hive/MetastoreDataSourcesSuite.scala | 10 +- .../spark/sql/hive/execution/SQLQuerySuite.scala | 5 +- .../spark/sql/sources/HadoopFsRelationTest.scala | 3 +- 37 files changed, 1513 insertions(+), 643 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala copy sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/{orc/OrcDataSourceV2.scala => parquet/ParquetDataSourceV2.scala} (76%) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala copy sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/{orc/OrcScan.scala => parquet/ParquetScan.scala} (50%) copy sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/{orc/OrcScanBuilder.scala => parquet/ParquetScanBuilder.scala} (53%) copy sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/{orc/OrcTable.scala => parquet/ParquetTable.scala} (81%) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetWriteBuilder.scala - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (23ebd38 -> 26998b8)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 23ebd38 [SPARK-27418][SQL] Migrate Parquet to File Data Source V2 add 26998b8 [SPARK-27870][SQL][PYTHON] Add a runtime buffer size configuration for Pandas UDFs No new revisions were added by this update. Summary of changes: .../main/scala/org/apache/spark/api/python/PythonRunner.scala | 3 ++- .../main/scala/org/apache/spark/internal/config/package.scala | 1 + python/pyspark/daemon.py | 5 +++-- python/pyspark/java_gateway.py| 2 +- .../main/scala/org/apache/spark/sql/internal/SQLConf.scala| 11 +++ .../apache/spark/sql/execution/python/ArrowPythonRunner.scala | 7 +++ 6 files changed, 25 insertions(+), 4 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-27971][SQL][R] MapPartitionsInRWithArrowExec.evaluate shouldn't eagerly read the first batch
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 6dcf09b [SPARK-27971][SQL][R] MapPartitionsInRWithArrowExec.evaluate shouldn't eagerly read the first batch 6dcf09b is described below commit 6dcf09becc24f47fc6487a57692bd4537983e1b1 Author: HyukjinKwon AuthorDate: Sun Jun 9 11:40:20 2019 +0900 [SPARK-27971][SQL][R] MapPartitionsInRWithArrowExec.evaluate shouldn't eagerly read the first batch ## What changes were proposed in this pull request? This PR is the same fix as https://github.com/apache/spark/pull/24816 but in vectorized `dapply` in SparkR. ## How was this patch tested? Manually tested. Closes #24818 from HyukjinKwon/SPARK-27971. Authored-by: HyukjinKwon Signed-off-by: HyukjinKwon --- .../org/apache/spark/sql/execution/objects.scala | 27 -- 1 file changed, 5 insertions(+), 22 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala index bedfa9c..202cbd0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala @@ -243,28 +243,11 @@ case class MapPartitionsInRWithArrowExec( // binary in a batch due to the limitation of R API. See also ARROW-4512. 
     val columnarBatchIter = runner.compute(batchIter, -1)
     val outputProject = UnsafeProjection.create(output, output)
-    new Iterator[InternalRow] {
-
-      private var currentIter = if (columnarBatchIter.hasNext) {
-        val batch = columnarBatchIter.next()
-        val actualDataTypes = (0 until batch.numCols()).map(i => batch.column(i).dataType())
-        assert(outputTypes == actualDataTypes, "Invalid schema from dapply(): " +
-          s"expected ${outputTypes.mkString(", ")}, got ${actualDataTypes.mkString(", ")}")
-        batch.rowIterator.asScala
-      } else {
-        Iterator.empty
-      }
-
-      override def hasNext: Boolean = currentIter.hasNext || {
-        if (columnarBatchIter.hasNext) {
-          currentIter = columnarBatchIter.next().rowIterator.asScala
-          hasNext
-        } else {
-          false
-        }
-      }
-
-      override def next(): InternalRow = currentIter.next()
+    columnarBatchIter.flatMap { batch =>
+      val actualDataTypes = (0 until batch.numCols()).map(i => batch.column(i).dataType())
+      assert(outputTypes == actualDataTypes, "Invalid schema from dapply(): " +
+        s"expected ${outputTypes.mkString(", ")}, got ${actualDataTypes.mkString(", ")}")
+      batch.rowIterator.asScala
     }.map(outputProject)
   }
 }
- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
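The fix above replaces a hand-rolled iterator that called `columnarBatchIter.next()` in its constructor with `flatMap`, so the first batch is no longer read eagerly. The difference is easy to show with plain Python generators (a sketch of the iterator behaviour only, not of Arrow or SparkR):

```python
# Sketch of eager vs lazy first-batch reads, mirroring the flatMap fix above.
from itertools import chain

reads = []

def batches():
    for i in range(3):
        reads.append(i)          # side effect: a batch was pulled upstream
        yield [i * 10, i * 10 + 1]

# Eager (old behaviour): the first batch is pulled at construction time.
eager_src = batches()
first = next(eager_src)          # analogous to .next() in the constructor
assert reads == [0]

# Lazy (new behaviour): nothing is read until the consumer asks for a row.
reads.clear()
lazy_rows = chain.from_iterable(batches())   # analogous to flatMap
assert reads == []               # no batch pulled yet
assert next(lazy_rows) == 0
assert reads == [0]              # first batch pulled only on demand
```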
[spark] branch master updated: [SPARK-27995][PYTHON] Note the difference between str of Python 2 and 3 at Arrow optimized
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 1217996  [SPARK-27995][PYTHON] Note the difference between str of Python 2 and 3 at Arrow optimized

1217996 is described below

commit 1217996f1574f758d81c4e3846452d24b35b
Author: HyukjinKwon
AuthorDate: Tue Jun 11 18:43:59 2019 +0900

[SPARK-27995][PYTHON] Note the difference between str of Python 2 and 3 at Arrow optimized

## What changes were proposed in this pull request?

When Arrow optimization is enabled in Python 2.7,

```python
import pandas
pdf = pandas.DataFrame(["test1", "test2"])
df = spark.createDataFrame(pdf)
df.show()
```

I got the following output:

```
+----------------+
|               0|
+----------------+
|[74 65 73 74 31]|
|[74 65 73 74 32]|
+----------------+
```

This happens because Python 2's `str` and `bytes` are the same type, so from Python 2's point of view the output is actually consistent:

```python
>>> str == bytes
True
>>> isinstance("a", bytes)
True
```

To cut it short:

1. Python 2 treats `str` as `bytes`.
2. PySpark added some special codes and hacks to recognize `str` as string types.
3. PyArrow / Pandas follow Python 2's behaviour.

To fix, we have two options:

1. Fix it to match PySpark's behaviour
2. Note the difference, since Python 2 is deprecated anyway

I think it's better to just note it and go for option 2.

## How was this patch tested?

Manually tested. Doc was checked too:

![Screen Shot 2019-06-11 at 6 40 07 PM](https://user-images.githubusercontent.com/6477701/59261402-59ad3b00-8c78-11e9-94a6-3236a2c338d4.png)

Closes #24838 from HyukjinKwon/SPARK-27995.

Authored-by: HyukjinKwon
Signed-off-by: HyukjinKwon
---
 python/pyspark/sql/session.py | 5 +
 1 file changed, 5 insertions(+)

diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index 72d2d99..cdab840 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -653,6 +653,11 @@ class SparkSession(object):
 ..
note:: Usage with spark.sql.execution.arrow.pyspark.enabled=True is experimental. +.. note:: When Arrow optimization is enabled, strings inside Pandas DataFrame in Python +2 are converted into bytes as they are bytes in Python 2 whereas regular strings are +left as strings. When using strings in Python 2, use unicode `u""` as Python standard +practice. + >>> l = [('Alice', 1)] >>> spark.createDataFrame(l).collect() [Row(_1=u'Alice', _2=1)] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
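The root cause is easy to see from a Python 3 interpreter, where `str` and `bytes` are distinct types (in Python 2 they were the same type, which is why Arrow serialized the strings as raw bytes). The `[74 65 73 74 31]` cells in the output above are simply the byte values of `"test1"`:

```python
# In Python 3, str and bytes are distinct; in Python 2, str == bytes, which
# is why Arrow showed "test1" as raw bytes in the example above.
assert str is not bytes
assert isinstance("test1", str) and not isinstance("test1", bytes)

# The puzzling [74 65 73 74 31] cells are the UTF-8 byte values of "test1":
hex_cells = " ".join("%X" % b for b in "test1".encode("utf-8"))
assert hex_cells == "74 65 73 74 31"

# The documented workaround for Python 2 is to use unicode literals:
assert u"test1" == "test1"   # in Python 3 these are already the same type
```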
[spark] branch master updated (773cfde -> c1bb331)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 773cfde [SPARK-27917][SQL] canonical form of CaseWhen object is incorrect add c1bb331 [SPARK-27425][SQL] Add count_if function No new revisions were added by this update. Summary of changes: .../sql/catalyst/analysis/FunctionRegistry.scala | 1 + .../catalyst/expressions/aggregate/CountIf.scala | 55 ++ .../sql/catalyst/optimizer/finishAnalysis.scala| 5 +- .../apache/spark/sql/DataFrameAggregateSuite.scala | 40 4 files changed, 99 insertions(+), 2 deletions(-) create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CountIf.scala - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
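SQL's `count_if(predicate)` counts the rows for which the predicate evaluates to true. A quick pure-Python model of the aggregate's semantics (illustrative only; the actual change is the new `CountIf` Catalyst expression listed in the diffstat above):

```python
# Pure-Python model of the count_if aggregate added in SPARK-27425:
# count the rows where the predicate holds; NULL (None) and False results
# contribute nothing to the count.
def count_if(rows, predicate):
    count = 0
    for row in rows:
        if predicate(row):
            count += 1
    return count

rows = [1, 2, None, 4, 5]
n_even = count_if(rows, lambda x: x is not None and x % 2 == 0)  # counts 2 and 4
```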
[spark] branch master updated: [SPARK-28058][DOC] Add a note to doc of mode of CSV for column pruning
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new b7bdc31 [SPARK-28058][DOC] Add a note to doc of mode of CSV for column pruning b7bdc31 is described below commit b7bdc3111ec2778d7d54d09ba339d893250aa65d Author: Liang-Chi Hsieh AuthorDate: Tue Jun 18 13:48:32 2019 +0900 [SPARK-28058][DOC] Add a note to doc of mode of CSV for column pruning ## What changes were proposed in this pull request? When using `DROPMALFORMED` mode, corrupted records aren't dropped if malformed columns aren't read. This behavior is due to CSV parser column pruning. Current doc of `DROPMALFORMED` doesn't mention the effect of column pruning. Users will be confused by the fact that `DROPMALFORMED` mode doesn't work as expected. Column pruning also affects other modes. This is a doc improvement to add a note to doc of `mode` to explain it. ## How was this patch tested? N/A. This is just doc change. Closes #24894 from viirya/SPARK-28058. Authored-by: Liang-Chi Hsieh Signed-off-by: HyukjinKwon --- python/pyspark/sql/readwriter.py | 6 +- sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala | 5 - 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 6413d88..aa5bf63 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -430,7 +430,11 @@ class DataFrameReader(OptionUtils): :param maxMalformedLogPerPartition: this parameter is no longer used since Spark 2.2.0. If specified, it is ignored. :param mode: allows a mode for dealing with corrupt records during parsing. If None is - set, it uses the default value, ``PERMISSIVE``. + set, it uses the default value, ``PERMISSIVE``. Note that Spark tries to + parse only required columns in CSV under column pruning. 
Therefore, corrupt + records can be different based on required set of fields. This behavior can + be controlled by ``spark.sql.csv.parser.columnPruning.enabled`` + (enabled by default). * ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \ into a field configured by ``columnNameOfCorruptRecord``, and sets malformed \ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index dfc6d8c..2bf9024 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -627,7 +627,10 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * `maxCharsPerColumn` (default `-1`): defines the maximum number of characters allowed * for any given value being read. By default, it is -1 meaning unlimited length * `mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records - *during parsing. It supports the following case-insensitive modes. + *during parsing. It supports the following case-insensitive modes. Note that Spark tries + *to parse only required columns in CSV under column pruning. Therefore, corrupt records + *can be different based on required set of fields. This behavior can be controlled by + *`spark.sql.csv.parser.columnPruning.enabled` (enabled by default). * * `PERMISSIVE` : when it meets a corrupted record, puts the malformed string into a * field configured by `columnNameOfCorruptRecord`, and sets malformed fields to `null`. - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
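The column-pruning note added above is easier to see with a tiny model of the parser. Below is a pure-Python sketch (no Spark required; the `parse` helper, column names, and sample rows are invented for illustration) of why a record that is only malformed in an *unread* column survives `DROPMALFORMED` when the parser touches just the required columns:

```python
# Sketch: why CSV column pruning changes which records count as "malformed".
# Third field should be an int; the second row is bad only in that field.
rows = ["1,a,10", "2,b,oops", "3,c,30"]

def parse(line, needed):
    # A pruning parser only attempts to cast the columns it actually needs.
    fields = line.split(",")
    out = {}
    for name, idx, cast in [("id", 0, int), ("name", 1, str), ("value", 2, int)]:
        if name in needed:
            try:
                out[name] = cast(fields[idx])
            except ValueError:
                return None  # malformed -> dropped under DROPMALFORMED
    return out

all_cols = [r for r in (parse(l, {"id", "name", "value"}) for l in rows) if r]
pruned = [r for r in (parse(l, {"id", "name"}) for l in rows) if r]
print(len(all_cols), len(pruned))  # 2 3 -- the "malformed" row survives pruning
```

In real Spark this behavior is controlled by `spark.sql.csv.parser.columnPruning.enabled` (enabled by default); with pruning disabled, the `all_cols` branch above is the closer analogy.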
[spark] branch branch-2.4 updated: [SPARK-28058][DOC] Add a note to doc of mode of CSV for column pruning
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-2.4 by this push: new f4efcbf [SPARK-28058][DOC] Add a note to doc of mode of CSV for column pruning f4efcbf is described below commit f4efcbf367b23e0e3e85cd3e094641c70eb17463 Author: Liang-Chi Hsieh AuthorDate: Tue Jun 18 13:48:32 2019 +0900 [SPARK-28058][DOC] Add a note to doc of mode of CSV for column pruning ## What changes were proposed in this pull request? When using `DROPMALFORMED` mode, corrupted records aren't dropped if malformed columns aren't read. This behavior is due to CSV parser column pruning. Current doc of `DROPMALFORMED` doesn't mention the effect of column pruning. Users will be confused by the fact that `DROPMALFORMED` mode doesn't work as expected. Column pruning also affects other modes. This is a doc improvement to add a note to doc of `mode` to explain it. ## How was this patch tested? N/A. This is just doc change. Closes #24894 from viirya/SPARK-28058. Authored-by: Liang-Chi Hsieh Signed-off-by: HyukjinKwon (cherry picked from commit b7bdc3111ec2778d7d54d09ba339d893250aa65d) Signed-off-by: HyukjinKwon --- python/pyspark/sql/readwriter.py | 6 +- sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala | 5 - 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index c25426c..ea7cc80 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -417,7 +417,11 @@ class DataFrameReader(OptionUtils): :param maxMalformedLogPerPartition: this parameter is no longer used since Spark 2.2.0. If specified, it is ignored. :param mode: allows a mode for dealing with corrupt records during parsing. If None is - set, it uses the default value, ``PERMISSIVE``. + set, it uses the default value, ``PERMISSIVE``. 
Note that Spark tries to + parse only required columns in CSV under column pruning. Therefore, corrupt + records can be different based on required set of fields. This behavior can + be controlled by ``spark.sql.csv.parser.columnPruning.enabled`` + (enabled by default). * ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \ into a field configured by ``columnNameOfCorruptRecord``, and sets other \ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 666a97d..85cd3f0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -589,7 +589,10 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * `maxCharsPerColumn` (default `-1`): defines the maximum number of characters allowed * for any given value being read. By default, it is -1 meaning unlimited length * `mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records - *during parsing. It supports the following case-insensitive modes. + *during parsing. It supports the following case-insensitive modes. Note that Spark tries + *to parse only required columns in CSV under column pruning. Therefore, corrupt records + *can be different based on required set of fields. This behavior can be controlled by + *`spark.sql.csv.parser.columnPruning.enabled` (enabled by default). * * `PERMISSIVE` : when it meets a corrupted record, puts the malformed string into a * field configured by `columnNameOfCorruptRecord`, and sets other fields to `null`. To keep - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-28041][PYTHON] Increase minimum supported Pandas to 0.23.2
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 90f8039 [SPARK-28041][PYTHON] Increase minimum supported Pandas to 0.23.2 90f8039 is described below commit 90f80395af629f5c19d3f552c54b2cc63eb7a76a Author: Bryan Cutler AuthorDate: Tue Jun 18 09:10:58 2019 +0900 [SPARK-28041][PYTHON] Increase minimum supported Pandas to 0.23.2 ## What changes were proposed in this pull request? This increases the minimum supported version of Pandas to 0.23.2. Using a lower version will raise an error `Pandas >= 0.23.2 must be installed; however, your version was 0.XX`. Also, a workaround for using pyarrow with Pandas 0.19.2 was removed. ## How was this patch tested? Existing Tests Closes #24867 from BryanCutler/pyspark-increase-min-pandas-SPARK-28041. Authored-by: Bryan Cutler Signed-off-by: HyukjinKwon --- docs/sql-migration-guide-upgrade.md| 4 python/pyspark/serializers.py | 2 -- python/pyspark/sql/tests/test_arrow.py | 4 ++-- python/pyspark/sql/utils.py| 2 +- python/setup.py| 2 +- 5 files changed, 8 insertions(+), 6 deletions(-) diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md index 37be86f..b062a04 100644 --- a/docs/sql-migration-guide-upgrade.md +++ b/docs/sql-migration-guide-upgrade.md @@ -23,6 +23,10 @@ license: | {:toc} ## Upgrading From Spark SQL 2.4 to 3.0 + - Since Spark 3.0, PySpark requires a Pandas version of 0.23.2 or higher to use Pandas related functionality, such as `toPandas`, `createDataFrame` from Pandas DataFrame, etc. + + - Since Spark 3.0, PySpark requires a PyArrow version of 0.12.1 or higher to use PyArrow related functionality, such as `pandas_udf`, `toPandas` and `createDataFrame` with "spark.sql.execution.arrow.enabled=true", etc. 
+ - In Spark version 2.4 and earlier, SQL queries such as `FROM ` or `FROM UNION ALL FROM ` are supported by accident. In hive-style `FROM SELECT `, the `SELECT` clause is not negligible. Neither Hive nor Presto support this syntax. Therefore we will treat these queries as invalid since Spark 3.0. - Since Spark 3.0, the Dataset and DataFrame API `unionAll` is not deprecated any more. It is an alias for `union`. diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py index 516ee7e..fc0828b 100644 --- a/python/pyspark/serializers.py +++ b/python/pyspark/serializers.py @@ -297,8 +297,6 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer): # Ensure timestamp series are in expected form for Spark internal representation if t is not None and pa.types.is_timestamp(t): s = _check_series_convert_timestamps_internal(s.fillna(0), self._timezone) -# TODO: need cast after Arrow conversion, ns values cause error with pandas 0.19.2 -return pa.Array.from_pandas(s, mask=mask).cast(t, safe=False) try: array = pa.Array.from_pandas(s, mask=mask, type=t, safe=self._safecheck) diff --git a/python/pyspark/sql/tests/test_arrow.py b/python/pyspark/sql/tests/test_arrow.py index cb51241..0671137 100644 --- a/python/pyspark/sql/tests/test_arrow.py +++ b/python/pyspark/sql/tests/test_arrow.py @@ -268,10 +268,10 @@ class ArrowTests(ReusedSQLTestCase): def test_createDataFrame_with_incorrect_schema(self): pdf = self.create_pandas_data_frame() fields = list(self.schema) -fields[0], fields[7] = fields[7], fields[0] # swap str with timestamp +fields[0], fields[1] = fields[1], fields[0] # swap str with int wrong_schema = StructType(fields) with QuietTest(self.sc): -with self.assertRaisesRegexp(Exception, ".*cast.*[s|S]tring.*timestamp.*"): +with self.assertRaisesRegexp(Exception, "integer.*required.*got.*str"): self.spark.createDataFrame(pdf, schema=wrong_schema) def test_createDataFrame_with_names(self): diff --git a/python/pyspark/sql/utils.py 
b/python/pyspark/sql/utils.py index 1c96e33..ca5e85b 100644 --- a/python/pyspark/sql/utils.py +++ b/python/pyspark/sql/utils.py @@ -131,7 +131,7 @@ def require_minimum_pandas_version(): """ Raise ImportError if minimum version of Pandas is not installed """ # TODO(HyukjinKwon): Relocate and deduplicate the version specification. -minimum_pandas_version = "0.19.2" +minimum_pandas_version = "0.23.2" from distutils.version import LooseVersion try: diff --git a/python/setup.py b/python/setup.py index e769bf5..ee5c326 100644 --- a/python/setup.py +++ b/python/setup.py @@ -105,7 +105,7 @@ if (in_spark): # If yo
[spark] branch master updated (67042e9 -> 1d36b89)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 67042e9 [MINOR][BUILD] Exclude pyspark-coverage-site/ dir from RAT add 1d36b89 [SPARK-7721][INFRA][FOLLOW-UP] Remove cloned coverage repo after posting HTMLs No new revisions were added by this update. Summary of changes: dev/run-tests.py | 2 ++ 1 file changed, 2 insertions(+)
[spark] branch master updated (1d36b89 -> ac61f7d)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 1d36b89 [SPARK-7721][INFRA][FOLLOW-UP] Remove cloned coverage repo after posting HTMLs add ac61f7d [SPARK-27893][SQL][PYTHON][FOLLOW-UP] Allow Scalar Pandas and Python UDFs can be tested with Scala test base No new revisions were added by this update. Summary of changes: .../apache/spark/sql/IntegratedUDFTestUtils.scala | 41 -- .../org/apache/spark/sql/SQLQueryTestSuite.scala | 19 +++--- 2 files changed, 35 insertions(+), 25 deletions(-)
[spark] branch master updated (ac61f7d -> 731a60c)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from ac61f7d [SPARK-27893][SQL][PYTHON][FOLLOW-UP] Allow Scalar Pandas and Python UDFs can be tested with Scala test base add 731a60c [SPARK-27823][CORE][DOC][FOLLOWUP] Update doc of config `spark.driver.resourcesFile` No new revisions were added by this update. Summary of changes: core/src/main/scala/org/apache/spark/internal/config/package.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
[spark] branch master updated: [MINOR][PYSPARK][SQL][DOC] Fix rowsBetween doc in Window
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new c0297de [MINOR][PYSPARK][SQL][DOC] Fix rowsBetween doc in Window c0297de is described below commit c0297dedd829a92cca920ab8983dab399f8f32d5 Author: Liang-Chi Hsieh AuthorDate: Fri Jun 14 09:56:37 2019 +0900 [MINOR][PYSPARK][SQL][DOC] Fix rowsBetween doc in Window ## What changes were proposed in this pull request? I suspect that the doc of `rowsBetween` methods in Scala and PySpark looks wrong. Because: ```scala scala> val df = Seq((1, "a"), (2, "a"), (3, "a"), (4, "a"), (5, "a"), (6, "a")).toDF("id", "category") df: org.apache.spark.sql.DataFrame = [id: int, category: string] scala> val byCategoryOrderedById = Window.partitionBy('category).orderBy('id).rowsBetween(-1, 2) byCategoryOrderedById: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec7f04de97 scala> df.withColumn("sum", sum('id) over byCategoryOrderedById).show() +---++---+ | id|category|sum| +---++---+ | 1| a| 6| # sum from index 0 to (0 + 2): 1 + 2 + 3 = 6 | 2| a| 10| # sum from index (1 - 1) to (1 + 2): 1 + 2 + 3 + 4 = 10 | 3| a| 14| | 4| a| 18| | 5| a| 15| | 6| a| 11| +---++---+ ``` So the frame (-1, 2) for row with index 5, as described in the doc, should range from index 4 to index 7. ## How was this patch tested? N/A, just doc change. Closes #24864 from viirya/window-spec-doc. 
Authored-by: Liang-Chi Hsieh Signed-off-by: HyukjinKwon --- python/pyspark/sql/window.py | 2 +- sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/window.py b/python/pyspark/sql/window.py index 65c3ff5..9e02758a 100644 --- a/python/pyspark/sql/window.py +++ b/python/pyspark/sql/window.py @@ -101,7 +101,7 @@ class Window(object): An offset indicates the number of rows above or below the current row, the frame for the current row starts or ends. For instance, given a row based sliding frame with a lower bound offset of -1 and a upper bound offset of +2. The frame for row with index 5 would range from -index 4 to index 6. +index 4 to index 7. >>> from pyspark.sql import Window >>> from pyspark.sql import functions as func diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala index 9a4ad44..cd1c198 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala @@ -129,7 +129,7 @@ object Window { * An offset indicates the number of rows above or below the current row, the frame for the * current row starts or ends. For instance, given a row based sliding frame with a lower bound * offset of -1 and a upper bound offset of +2. The frame for row with index 5 would range from - * index 4 to index 6. + * index 4 to index 7. * * {{{ * import org.apache.spark.sql.expressions.Window - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
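The corrected frame arithmetic can be checked with a few lines of plain Python. `frame_sum` below (an invented helper, not PySpark API) models a row-based frame `rowsBetween(-1, 2)` over the same `id` values as the commit's example: for the row at index `i`, the frame covers indices `max(0, i-1)` through `min(n-1, i+2)` inclusive, so the nominal frame for index 5 spans indices 4 to 7 and is clipped at the partition end:

```python
ids = [1, 2, 3, 4, 5, 6]

def frame_sum(values, lower=-1, upper=2):
    # Row-based sliding frame: indices i+lower .. i+upper, clipped to the
    # partition bounds, summed for every row.
    n = len(values)
    return [sum(values[max(0, i + lower):min(n - 1, i + upper) + 1])
            for i in range(n)]

print(frame_sum(ids))  # [6, 10, 14, 18, 15, 11] -- matches the commit's table
```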
[spark] branch master updated: [SPARK-23263][TEST] CTAS should update stat if autoUpdate statistics is enabled
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 4968f87 [SPARK-23263][TEST] CTAS should update stat if autoUpdate statistics is enabled 4968f87 is described below commit 4968f871685c23b83690a9a1490ed886a3417d93 Author: Yuming Wang AuthorDate: Thu Jun 20 14:19:10 2019 +0900 [SPARK-23263][TEST] CTAS should update stat if autoUpdate statistics is enabled ## What changes were proposed in this pull request? The [SPARK-27403](https://issues.apache.org/jira/browse/SPARK-27403) fixed CTAS cannot update statistics even if `spark.sql.statistics.size.autoUpdate.enabled` is enabled, as mentioned in [SPARK-23263](https://issues.apache.org/jira/browse/SPARK-23263). This pr adds tests for that fix. ## How was this patch tested? N/A Closes #20430 from wangyum/SPARK-23263. Authored-by: Yuming Wang Signed-off-by: HyukjinKwon --- .../apache/spark/sql/hive/StatisticsSuite.scala| 22 ++ 1 file changed, 22 insertions(+) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 483bd37..7a8e257 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -1431,4 +1431,26 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto assert(catalogStats.rowCount.isEmpty) } } + + test(s"CTAS should update statistics if ${SQLConf.AUTO_SIZE_UPDATE_ENABLED.key} is enabled") { +val tableName = "SPARK_23263" +Seq(false, true).foreach { isConverted => + Seq(false, true).foreach { updateEnabled => +withSQLConf( + SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> updateEnabled.toString, + HiveUtils.CONVERT_METASTORE_PARQUET.key -> isConverted.toString) { + withTable(tableName) { 
+sql(s"CREATE TABLE $tableName STORED AS parquet AS SELECT 'a', 'b'") +val catalogTable = getCatalogTable(tableName) +// Hive serde tables always update statistics by Hive metastore +if (!isConverted || updateEnabled) { + assert(catalogTable.stats.nonEmpty) +} else { + assert(catalogTable.stats.isEmpty) +} + } +} + } +} + } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
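The four `withSQLConf` combinations exercised by the test reduce to a small truth table. A sketch (the function name is invented; it just restates the test's assertion): Hive-serde tables (not converted) always get statistics from the Hive metastore, while converted data-source tables get them only when auto size update is enabled.

```python
def stats_expected(is_converted, update_enabled):
    # CTAS leaves catalog stats populated iff the table stays a Hive serde
    # table, or spark.sql.statistics.size.autoUpdate.enabled is on.
    return (not is_converted) or update_enabled

cases = [(c, u, stats_expected(c, u))
         for c in (False, True) for u in (False, True)]
print(cases)
```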
[spark] branch master updated: [SPARK-28012][SQL] Hive UDF supports struct type foldable expression
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new d9697fe [SPARK-28012][SQL] Hive UDF supports struct type foldable expression d9697fe is described below commit d9697fedf5a2fa56e25849b0715d48ac8e5345f5 Author: sychen AuthorDate: Thu Jun 20 14:36:01 2019 +0900 [SPARK-28012][SQL] Hive UDF supports struct type foldable expression ## What changes were proposed in this pull request? Currently using hive udf, the parameter is struct type, there will be an exception thrown. No handler for Hive UDF 'xxxUDF': java.lang.RuntimeException: Hive doesn't support the constant type [StructType(StructField(name,StringType,true), StructField(value,DecimalType(3,1),true))] ## How was this patch tested? added new UT Closes #24846 from cxzl25/hive_udf_literal_struct_type. Authored-by: sychen Signed-off-by: HyukjinKwon --- .../main/scala/org/apache/spark/sql/hive/HiveInspectors.scala | 2 ++ .../org/apache/spark/sql/hive/execution/HiveUDFSuite.scala| 11 +++ 2 files changed, 13 insertions(+) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala index 178fced..33b5bce 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala @@ -847,6 +847,8 @@ private[hive] trait HiveInspectors { ObjectInspectorFactory.getStandardConstantMapObjectInspector(keyOI, valueOI, jmap) } +case Literal(_, dt: StructType) => + toInspector(dt) // We will enumerate all of the possible constant expressions, throw exception if we missed case Literal(_, dt) => sys.error(s"Hive doesn't support the constant type [$dt].") // ideally, we don't test the foldable here(but in optimizer), however, some of the diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala index 446267d..587eab4 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala @@ -652,6 +652,17 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { } } } + test("SPARK-28012 Hive UDF supports struct type foldable expression") { +withUserDefinedFunction("testUDFStructType" -> false) { + // Simulate a hive udf that supports struct parameters + sql("CREATE FUNCTION testUDFStructType AS '" + +s"${classOf[GenericUDFArray].getName}'") + checkAnswer( +sql("SELECT testUDFStructType(named_struct('name', 'xx', 'value', 1))[0].value"), +Seq(Row(1))) +} + } + } class TestPair(x: Int, y: Int) extends Writable with Serializable { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
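The shape of the fix is a new case in an exhaustive match over constant types. A pure-Python caricature (the real `toInspector` works on Catalyst `Literal`s and Hive object inspectors, not strings; everything here is invented to show the control flow): struct literals, which previously fell through to the error branch, now fall back to the type-based inspector.

```python
def to_inspector(value, dtype):
    # Previously-handled constant types get a constant inspector.
    if dtype in ("int", "string", "map", "array"):
        return ("constant", dtype, value)
    # The case added by this commit: inspect struct literals by type.
    if dtype == "struct":
        return ("by-type", dtype)
    # Unhandled constant types still raise, as before.
    raise RuntimeError("Hive doesn't support the constant type [%s]." % dtype)

print(to_inspector({"name": "xx", "value": 1}, "struct"))  # ('by-type', 'struct')
```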
[spark] branch master updated: [SPARK-28128][PYTHON][SQL] Pandas Grouped UDFs skip empty partitions
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 5ad1053 [SPARK-28128][PYTHON][SQL] Pandas Grouped UDFs skip empty partitions 5ad1053 is described below commit 5ad1053f3e8b7acab58e07e7548e7f14e192e5b4 Author: Bryan Cutler AuthorDate: Sat Jun 22 11:20:35 2019 +0900 [SPARK-28128][PYTHON][SQL] Pandas Grouped UDFs skip empty partitions ## What changes were proposed in this pull request? When running FlatMapGroupsInPandasExec or AggregateInPandasExec the shuffle uses a default number of partitions of 200 in "spark.sql.shuffle.partitions". If the data is small, e.g. in testing, many of the partitions will be empty but are treated just the same. This PR checks the `mapPartitionsInternal` iterator to be non-empty before calling `ArrowPythonRunner` to start computation on the iterator. ## How was this patch tested? Existing tests. Ran the following benchmarks a simple example where most partitions are empty: ```python from pyspark.sql.functions import pandas_udf, PandasUDFType from pyspark.sql.types import * df = spark.createDataFrame( [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")) pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP) def normalize(pdf): v = pdf.v return pdf.assign(v=(v - v.mean()) / v.std()) df.groupby("id").apply(normalize).count() ``` **Before** ``` In [4]: %timeit df.groupby("id").apply(normalize).count() 1.58 s ± 62.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each) In [5]: %timeit df.groupby("id").apply(normalize).count() 1.52 s ± 29.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each) In [6]: %timeit df.groupby("id").apply(normalize).count() 1.52 s ± 37.8 ms per loop (mean ± std. dev. 
of 7 runs, 1 loop each) ``` **After this Change** ``` In [2]: %timeit df.groupby("id").apply(normalize).count() 646 ms ± 89.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each) In [3]: %timeit df.groupby("id").apply(normalize).count() 408 ms ± 84.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each) In [4]: %timeit df.groupby("id").apply(normalize).count() 381 ms ± 29.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each) ``` Closes #24926 from BryanCutler/pyspark-pandas_udf-map-agg-skip-empty-parts-SPARK-28128. Authored-by: Bryan Cutler Signed-off-by: HyukjinKwon --- python/pyspark/sql/tests/test_pandas_udf_grouped_agg.py | 13 + python/pyspark/sql/tests/test_pandas_udf_grouped_map.py | 12 .../spark/sql/execution/python/AggregateInPandasExec.scala | 5 +++-- .../sql/execution/python/FlatMapGroupsInPandasExec.scala| 5 +++-- 4 files changed, 31 insertions(+), 4 deletions(-) diff --git a/python/pyspark/sql/tests/test_pandas_udf_grouped_agg.py b/python/pyspark/sql/tests/test_pandas_udf_grouped_agg.py index 9eda1aa..f5fd725 100644 --- a/python/pyspark/sql/tests/test_pandas_udf_grouped_agg.py +++ b/python/pyspark/sql/tests/test_pandas_udf_grouped_agg.py @@ -18,6 +18,7 @@ import unittest from pyspark.rdd import PythonEvalType +from pyspark.sql import Row from pyspark.sql.functions import array, explode, col, lit, mean, sum, \ udf, pandas_udf, PandasUDFType from pyspark.sql.types import * @@ -461,6 +462,18 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase): expected = [1, 5] self.assertEqual(actual, expected) +def test_grouped_with_empty_partition(self): +data = [Row(id=1, x=2), Row(id=1, x=3), Row(id=2, x=4)] +expected = [Row(id=1, sum=5), Row(id=2, x=4)] +num_parts = len(data) + 1 +df = self.spark.createDataFrame(self.sc.parallelize(data, numSlices=num_parts)) + +f = pandas_udf(lambda x: x.sum(), + 'int', PandasUDFType.GROUPED_AGG) + +result = df.groupBy('id').agg(f(df['x']).alias('sum')).collect() +self.assertEqual(result, expected) + if __name__ == "__main__": 
from pyspark.sql.tests.test_pandas_udf_grouped_agg import * diff --git a/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py b/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py index 1d87c63..32d6720 100644 --- a/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py +++ b/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py @@ -504,6 +504,18 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase): self.assertEquals(result.collect()[0]['sum'], 165) +def test_grouped_with_empty_partition(self): +data = [Row(id=1, x=2), Row(id=1, x=3),
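The optimization in this commit has a simple pure-Python analogue (the `run_worker` helper and sample data are invented for illustration): with 200 default shuffle partitions and little data, most partitions are empty, so the per-partition worker launch is guarded by a hasNext-style check and skipped when the iterator has no rows.

```python
partitions = [[], [(1, 2.0)], [], [(1, 3.0), (2, 5.0)], []]
launches = []

def run_worker(part):
    # Stands in for starting an ArrowPythonRunner for one partition.
    launches.append(len(part))
    return [(k, v * 2) for k, v in part]

# The guard: only non-empty partitions ever start a worker.
results = [row for p in partitions if p for row in run_worker(p)]
print(len(launches), len(results))  # 2 3 -- three empty partitions skipped
```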
[spark] branch master updated (a00774a -> 7c05f61)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from a00774a [SPARK-28054][SQL] Fix error when insert Hive partitioned table dynamically where partition name is upper case add 7c05f61 [SPARK-28130][PYTHON] Print pretty messages for skipped tests when xmlrunner is available in PySpark No new revisions were added by this update. Summary of changes: python/pyspark/ml/tests/test_algorithms.py | 2 +- python/pyspark/ml/tests/test_base.py| 2 +- python/pyspark/ml/tests/test_evaluation.py | 2 +- python/pyspark/ml/tests/test_feature.py | 2 +- python/pyspark/ml/tests/test_image.py | 2 +- python/pyspark/ml/tests/test_linalg.py | 2 +- python/pyspark/ml/tests/test_param.py | 2 +- python/pyspark/ml/tests/test_persistence.py | 2 +- python/pyspark/ml/tests/test_pipeline.py| 2 +- python/pyspark/ml/tests/test_stat.py| 2 +- python/pyspark/ml/tests/test_training_summary.py| 2 +- python/pyspark/ml/tests/test_tuning.py | 2 +- python/pyspark/ml/tests/test_wrapper.py | 2 +- python/pyspark/mllib/tests/test_algorithms.py | 2 +- python/pyspark/mllib/tests/test_feature.py | 2 +- python/pyspark/mllib/tests/test_linalg.py | 2 +- python/pyspark/mllib/tests/test_stat.py | 2 +- python/pyspark/mllib/tests/test_streaming_algorithms.py | 2 +- python/pyspark/mllib/tests/test_util.py | 2 +- python/pyspark/sql/tests/test_appsubmit.py | 2 +- python/pyspark/sql/tests/test_arrow.py | 2 +- python/pyspark/sql/tests/test_catalog.py| 2 +- python/pyspark/sql/tests/test_column.py | 2 +- python/pyspark/sql/tests/test_conf.py | 2 +- python/pyspark/sql/tests/test_context.py| 2 +- python/pyspark/sql/tests/test_dataframe.py | 2 +- python/pyspark/sql/tests/test_datasources.py| 2 +- python/pyspark/sql/tests/test_functions.py | 2 +- python/pyspark/sql/tests/test_group.py | 2 +- python/pyspark/sql/tests/test_pandas_udf.py | 2 +- python/pyspark/sql/tests/test_pandas_udf_grouped_agg.py | 2 +- 
python/pyspark/sql/tests/test_pandas_udf_grouped_map.py | 2 +- python/pyspark/sql/tests/test_pandas_udf_scalar.py | 2 +- python/pyspark/sql/tests/test_pandas_udf_window.py | 2 +- python/pyspark/sql/tests/test_readwriter.py | 2 +- python/pyspark/sql/tests/test_serde.py | 2 +- python/pyspark/sql/tests/test_session.py| 2 +- python/pyspark/sql/tests/test_streaming.py | 2 +- python/pyspark/sql/tests/test_types.py | 2 +- python/pyspark/sql/tests/test_udf.py| 2 +- python/pyspark/sql/tests/test_utils.py | 2 +- python/pyspark/streaming/tests/test_context.py | 5 +++-- python/pyspark/streaming/tests/test_dstream.py | 5 +++-- python/pyspark/streaming/tests/test_kinesis.py | 5 +++-- python/pyspark/streaming/tests/test_listener.py | 5 +++-- python/pyspark/tests/test_appsubmit.py | 2 +- python/pyspark/tests/test_broadcast.py | 2 +- python/pyspark/tests/test_conf.py | 2 +- python/pyspark/tests/test_context.py| 2 +- python/pyspark/tests/test_daemon.py | 2 +- python/pyspark/tests/test_join.py | 2 +- python/pyspark/tests/test_profiler.py | 2 +- python/pyspark/tests/test_rdd.py| 2 +- python/pyspark/tests/test_readwrite.py | 2 +- python/pyspark/tests/test_serializers.py| 2 +- python/pyspark/tests/test_shuffle.py| 2 +- python/pyspark/tests/test_taskcontext.py| 2 +- python/pyspark/tests/test_util.py | 2 +- python/pyspark/tests/test_worker.py | 2 +- python/run-tests.py | 2 +- 60 files changed, 68 insertions(+), 64 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (1a915bf -> a00774a)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 1a915bf [MINOR][SQL][DOCS] failOnDataLoss has effect on batch queries so fix the doc add a00774a [SPARK-28054][SQL] Fix error when insert Hive partitioned table dynamically where partition name is upper case No new revisions were added by this update. Summary of changes: .../spark/sql/hive/execution/SaveAsHiveFile.scala | 12 +++- .../spark/sql/hive/execution/HiveQuerySuite.scala | 18 ++ 2 files changed, 29 insertions(+), 1 deletion(-)
[spark] branch master updated (b5e183c -> 85e95b7)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from b5e183c [SPARK-28108][SQL][test-hadoop3.2] Simplify OrcFilters add 85e95b7 [SPARK-28142][SS] Use CaseInsensitiveStringMap for KafkaContinuousStream No new revisions were added by this update. Summary of changes: .../org/apache/spark/sql/kafka010/KafkaContinuousStream.scala | 7 --- .../scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-)
[spark] branch master updated: [SPARK-28031][PYSPARK][TEST] Improve doctest on over function of Column
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ddf4a50  [SPARK-28031][PYSPARK][TEST] Improve doctest on over function of Column
ddf4a50 is described below

commit ddf4a5031287c0c26ea462dd89ea99d769473213
Author: Liang-Chi Hsieh
AuthorDate: Thu Jun 13 11:04:41 2019 +0900

    [SPARK-28031][PYSPARK][TEST] Improve doctest on over function of Column

    ## What changes were proposed in this pull request?

    The doctest on the `over` function of `Column` is commented out, and the window spec there does not match the window function used. We should either remove the doctest or improve it. Since the other functions of `Column` generally have doctests, this PR improves it.

    ## How was this patch tested?

    Added doctest.

    Closes #24854 from viirya/column-test-minor.

    Authored-by: Liang-Chi Hsieh
    Signed-off-by: HyukjinKwon
---
 python/pyspark/sql/column.py | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/sql/column.py b/python/pyspark/sql/column.py
index e7dec11..7f12d23 100644
--- a/python/pyspark/sql/column.py
+++ b/python/pyspark/sql/column.py
@@ -668,9 +668,17 @@ class Column(object):
         :return: a Column

         >>> from pyspark.sql import Window
-        >>> window = Window.partitionBy("name").orderBy("age").rowsBetween(-1, 1)
+        >>> window = Window.partitionBy("name").orderBy("age") \
+        ...     .rowsBetween(Window.unboundedPreceding, Window.currentRow)
         >>> from pyspark.sql.functions import rank, min
-        >>> # df.select(rank().over(window), min('age').over(window))
+        >>> df.withColumn("rank", rank().over(window)) \
+        ...     .withColumn("min", min('age').over(window)).show()
+        +---+-----+----+---+
+        |age| name|rank|min|
+        +---+-----+----+---+
+        |  5|  Bob|   1|  5|
+        |  2|Alice|   1|  2|
+        +---+-----+----+---+
         """
         from pyspark.sql.window import WindowSpec
         if not isinstance(window, WindowSpec):

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
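The improved doctest partitions by `name`, orders by `age`, and computes `rank()` plus a running `min('age')` over `unboundedPreceding..currentRow`. As a rough, Spark-free sketch of those window semantics (the helper name below is made up for illustration; no PySpark required):

```python
from itertools import groupby

def window_over(rows):
    """rows: (age, name) tuples. Per partition (name), ordered by age,
    compute rank() and a running min(age) over the frame
    unboundedPreceding..currentRow -- mirroring the doctest's window."""
    out = []
    # Sort by partition key first, then by the window's order key.
    ordered = sorted(rows, key=lambda r: (r[1], r[0]))
    for name, group in groupby(ordered, key=lambda r: r[1]):
        ages = [age for age, _ in group]
        running_min = None
        prev_age, rank = None, 0
        for i, age in enumerate(ages, start=1):
            if age != prev_age:        # rank() repeats on ties, then skips
                rank, prev_age = i, age
            running_min = age if running_min is None else min(running_min, age)
            out.append((age, name, rank, running_min))
    return out

print(window_over([(2, "Alice"), (5, "Bob")]))
# [(2, 'Alice', 1, 2), (5, 'Bob', 1, 5)]
```

Each `name` forms its own partition here, so both rows get rank 1 and their own age as the running minimum, matching the doctest's expected table.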
[spark] branch branch-2.4 updated: [SPARK-28031][PYSPARK][TEST] Improve doctest on over function of Column
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 29a39e8  [SPARK-28031][PYSPARK][TEST] Improve doctest on over function of Column
29a39e8 is described below

commit 29a39e8e58d99762594f3cf6854810cfb529a251
Author: Liang-Chi Hsieh
AuthorDate: Thu Jun 13 11:04:41 2019 +0900

    [SPARK-28031][PYSPARK][TEST] Improve doctest on over function of Column

    ## What changes were proposed in this pull request?

    The doctest on the `over` function of `Column` is commented out, and the window spec there does not match the window function used. We should either remove the doctest or improve it. Since the other functions of `Column` generally have doctests, this PR improves it.

    ## How was this patch tested?

    Added doctest.

    Closes #24854 from viirya/column-test-minor.

    Authored-by: Liang-Chi Hsieh
    Signed-off-by: HyukjinKwon
    (cherry picked from commit ddf4a5031287c0c26ea462dd89ea99d769473213)
    Signed-off-by: HyukjinKwon
---
 python/pyspark/sql/column.py | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/sql/column.py b/python/pyspark/sql/column.py
index e7dec11..7f12d23 100644
--- a/python/pyspark/sql/column.py
+++ b/python/pyspark/sql/column.py
@@ -668,9 +668,17 @@ class Column(object):
         :return: a Column

         >>> from pyspark.sql import Window
-        >>> window = Window.partitionBy("name").orderBy("age").rowsBetween(-1, 1)
+        >>> window = Window.partitionBy("name").orderBy("age") \
+        ...     .rowsBetween(Window.unboundedPreceding, Window.currentRow)
         >>> from pyspark.sql.functions import rank, min
-        >>> # df.select(rank().over(window), min('age').over(window))
+        >>> df.withColumn("rank", rank().over(window)) \
+        ...     .withColumn("min", min('age').over(window)).show()
+        +---+-----+----+---+
+        |age| name|rank|min|
+        +---+-----+----+---+
+        |  5|  Bob|   1|  5|
+        |  2|Alice|   1|  2|
+        +---+-----+----+---+
         """
         from pyspark.sql.window import WindowSpec
         if not isinstance(window, WindowSpec):
[spark] branch master updated (2bc42ad -> 126310c)
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


    from 2bc42ad  [MINOR][REPL] Remove dead code of Spark Repl in Scala 2.11
     add 126310c  [SPARK-26601][SQL] Make broadcast-exchange thread pool configurable

No new revisions were added by this update.

Summary of changes:
 .../scala/org/apache/spark/sql/internal/StaticSQLConf.scala | 13 +++++++++++++
 .../sql/execution/exchange/BroadcastExchangeExec.scala      |  5 +++--
 2 files changed, 16 insertions(+), 2 deletions(-)
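SPARK-26601 turns the size of the thread pool used by the broadcast exchange into a static SQL configuration instead of a hardcoded constant. A minimal Python sketch of that pattern, sized from a config map with a validated default (the key name, default, and bound below are purely illustrative, not Spark's actual values):

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical default standing in for the value that used to be hardcoded.
DEFAULT_POOL_SIZE = 128

def make_broadcast_pool(conf):
    """Build the broadcast-exchange thread pool from configuration,
    falling back to the former hardcoded value when unset."""
    size = int(conf.get("broadcastExchangeMaxThreadThreshold", DEFAULT_POOL_SIZE))
    if not 0 < size <= 1024:
        raise ValueError("thread pool size must be in (0, 1024]")
    return ThreadPoolExecutor(max_workers=size,
                              thread_name_prefix="broadcast-exchange")

pool = make_broadcast_pool({"broadcastExchangeMaxThreadThreshold": 8})
print(pool._max_workers)  # 8
pool.shutdown()
```

Making the bound configurable lets deployments with many concurrent broadcast joins raise it, while small drivers can keep it low.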
[spark] branch branch-2.4 updated: [MINOR][SS] Remove duplicate 'add' in comment of `StructuredSessionization`.
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new fbd2eac  [MINOR][SS] Remove duplicate 'add' in comment of `StructuredSessionization`.
fbd2eac is described below

commit fbd2eac142cd429f7e98e44fbd12c663946b1dad
Author: gengjiaan
AuthorDate: Wed May 15 16:01:43 2019 +0900

    [MINOR][SS] Remove duplicate 'add' in comment of `StructuredSessionization`.

    ## What changes were proposed in this pull request?

    A comment in `StructuredSessionization` contains a duplicated word; it should be fixed.

    ## How was this patch tested?

    Existing UTs.

    Closes #24589 from beliefer/remove-duplicate-add-in-comment.

    Lead-authored-by: gengjiaan
    Co-authored-by: Jiaan Geng
    Signed-off-by: HyukjinKwon
    (cherry picked from commit 7dd2dd5dc5d4210bef88b75b7f5b06266dc4ce1c)
    Signed-off-by: HyukjinKwon
---
 .../apache/spark/examples/sql/streaming/StructuredSessionization.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala
index ed63fb6..29dbb0d 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala
@@ -70,7 +70,7 @@ object StructuredSessionization {
       line.split(" ").map(word => Event(sessionId = word, timestamp))
     }

-    // Sessionize the events. Track number of events, start and end timestamps of session, and
+    // Sessionize the events. Track number of events, start and end timestamps of session,
     // and report session updates.
     val sessionUpdates = events
       .groupByKey(event => event.sessionId)
[spark] branch branch-2.3 updated: [MINOR][SS] Remove duplicate 'add' in comment of `StructuredSessionization`.
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.3 by this push:
     new 306ebb2  [MINOR][SS] Remove duplicate 'add' in comment of `StructuredSessionization`.
306ebb2 is described below

commit 306ebb2975b0346a557a0e0f23e6f897d3912eef
Author: gengjiaan
AuthorDate: Wed May 15 16:01:43 2019 +0900

    [MINOR][SS] Remove duplicate 'add' in comment of `StructuredSessionization`.

    ## What changes were proposed in this pull request?

    A comment in `StructuredSessionization` contains a duplicated word; it should be fixed.

    ## How was this patch tested?

    Existing UTs.

    Closes #24589 from beliefer/remove-duplicate-add-in-comment.

    Lead-authored-by: gengjiaan
    Co-authored-by: Jiaan Geng
    Signed-off-by: HyukjinKwon
    (cherry picked from commit 7dd2dd5dc5d4210bef88b75b7f5b06266dc4ce1c)
    Signed-off-by: HyukjinKwon
---
 .../apache/spark/examples/sql/streaming/StructuredSessionization.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala
index ed63fb6..29dbb0d 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala
@@ -70,7 +70,7 @@ object StructuredSessionization {
       line.split(" ").map(word => Event(sessionId = word, timestamp))
     }

-    // Sessionize the events. Track number of events, start and end timestamps of session, and
+    // Sessionize the events. Track number of events, start and end timestamps of session,
     // and report session updates.
     val sessionUpdates = events
       .groupByKey(event => event.sessionId)
[spark] branch master updated (2da5b21 -> 7dd2dd5)
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


    from 2da5b21  [SPARK-24923][SQL] Implement v2 CreateTableAsSelect
     add 7dd2dd5  [MINOR][SS] Remove duplicate 'add' in comment of `StructuredSessionization`.

No new revisions were added by this update.

Summary of changes:
 .../apache/spark/examples/sql/streaming/StructuredSessionization.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
[spark] branch master updated: [SPARK-27778][PYTHON] Fix toPandas conversion of empty DataFrame with Arrow enabled
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 034cb13  [SPARK-27778][PYTHON] Fix toPandas conversion of empty DataFrame with Arrow enabled
034cb13 is described below

commit 034cb139a1eadb455cb6909a3ac3e73a509d324e
Author: David Vogelbacher
AuthorDate: Wed May 22 13:21:26 2019 +0900

    [SPARK-27778][PYTHON] Fix toPandas conversion of empty DataFrame with Arrow enabled

    ## What changes were proposed in this pull request?

    https://github.com/apache/spark/pull/22275 introduced a performance improvement where we send partitions to Python out of order and then, as a last step, send the partition order as well. However, if there are no partitions we never send the partition order, and we get an "EofError" on the Python side. This PR fixes that by also sending the partition order when no partitions are present.

    ## How was this patch tested?

    New unit test added.

    Closes #24650 from dvogelbacher/dv/fixNoPartitionArrowConversion.

    Authored-by: David Vogelbacher
    Signed-off-by: HyukjinKwon
---
 python/pyspark/sql/tests/test_arrow.py             |  8 ++++++++
 .../main/scala/org/apache/spark/sql/Dataset.scala  | 34 +++++++++++---------------------
 2 files changed, 21 insertions(+), 21 deletions(-)

diff --git a/python/pyspark/sql/tests/test_arrow.py b/python/pyspark/sql/tests/test_arrow.py
index 22578cb..f5b5ad9 100644
--- a/python/pyspark/sql/tests/test_arrow.py
+++ b/python/pyspark/sql/tests/test_arrow.py
@@ -183,6 +183,14 @@ class ArrowTests(ReusedSQLTestCase):
         self.assertEqual(pdf.columns[0], "i")
         self.assertTrue(pdf.empty)

+    def test_no_partition_frame(self):
+        schema = StructType([StructField("field1", StringType(), True)])
+        df = self.spark.createDataFrame(self.sc.emptyRDD(), schema)
+        pdf = df.toPandas()
+        self.assertEqual(len(pdf.columns), 1)
+        self.assertEqual(pdf.columns[0], "field1")
+        self.assertTrue(pdf.empty)
+
     def _createDataFrame_toggle(self, pdf, schema=None):
         with self.sql_conf({"spark.sql.execution.arrow.enabled": False}):
             df_no_arrow = self.spark.createDataFrame(pdf, schema=schema)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 05436ca..d5f1edb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -3299,15 +3299,12 @@ class Dataset[T] private[sql](
     PythonRDD.serveToStream("serve-Arrow") { outputStream =>
       val out = new DataOutputStream(outputStream)
       val batchWriter = new ArrowBatchStreamWriter(schema, out, timeZoneId)
-      val arrowBatchRdd = toArrowBatchRdd(plan)
-      val numPartitions = arrowBatchRdd.partitions.length

       // Batches ordered by (index of partition, batch index in that partition) tuple
       val batchOrder = ArrayBuffer.empty[(Int, Int)]
-      var partitionCount = 0

       // Handler to eagerly write batches to Python as they arrive, un-ordered
-      def handlePartitionBatches(index: Int, arrowBatches: Array[Array[Byte]]): Unit = {
+      val handlePartitionBatches = (index: Int, arrowBatches: Array[Array[Byte]]) =>
         if (arrowBatches.nonEmpty) {
           // Write all batches (can be more than 1) in the partition, store the batch order tuple
           batchWriter.writeBatches(arrowBatches.iterator)
@@ -3315,27 +3312,22 @@ class Dataset[T] private[sql](
             partitionBatchIndex => batchOrder.append((index, partitionBatchIndex))
           }
         }
-        partitionCount += 1
-
-        // After last batch, end the stream and write batch order indices
-        if (partitionCount == numPartitions) {
-          batchWriter.end()
-          out.writeInt(batchOrder.length)
-          // Sort by (index of partition, batch index in that partition) tuple to get the
-          // overall_batch_index from 0 to N-1 batches, which can be used to put the
-          // transferred batches in the correct order
-          batchOrder.zipWithIndex.sortBy(_._1).foreach { case (_, overallBatchIndex) =>
-            out.writeInt(overallBatchIndex)
-          }
-          out.flush()
-        }
-      }

+      val arrowBatchRdd = toArrowBatchRdd(plan)
       sparkSession.sparkContext.runJob(
         arrowBatchRdd,
-        (ctx: TaskContext, it: Iterator[Array[Byte]]) => it.toArray,
-        0 until numPartitions,
+        (it: Iterator[Array[Byte]]) => it.toArray,
         handlePartitionBatches)

+      // After processing all part
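The essence of the fix above is moving the "write the batch order" step out of the per-partition handler so it always runs, even when the job has zero partitions. A small pure-Python model of the protocol (function names are invented for illustration):

```python
def serve_batches(partitions):
    """Server side of the Arrow toPandas transfer, simplified: write each
    partition's batches in arrival order, record (partition, batch) tuples,
    then ALWAYS emit the ordering trailer -- even for zero partitions,
    which is exactly the case SPARK-27778 fixes."""
    stream, batch_order = [], []
    for part_index, batches in partitions:   # arrival order, not partition order
        for batch_index, batch in enumerate(batches):
            stream.append(batch)
            batch_order.append((part_index, batch_index))
    # Sort by (partition, batch-in-partition) and emit each batch's arrival
    # position, so the client can put batches back into logical order.
    trailer = [arrival for _, arrival in
               sorted(zip(batch_order, range(len(stream))))]
    return stream, trailer

def reassemble(stream, trailer):
    """Client side: place arrived batches back into logical order."""
    return [stream[i] for i in trailer]

# Partition 1 happens to finish before partition 0.
stream, trailer = serve_batches([(1, ["b1a", "b1b"]), (0, ["b0a"])])
print(reassemble(stream, trailer))  # ['b0a', 'b1a', 'b1b']
```

With no partitions, `serve_batches([])` still returns an (empty) trailer, so the client reads a well-formed end of stream instead of hitting EOF while waiting for the order.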
[spark] branch master updated: [SPARK-27774][CORE][MLLIB] Avoid hardcoded configs
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e7443d6  [SPARK-27774][CORE][MLLIB] Avoid hardcoded configs
e7443d6 is described below

commit e7443d6412582aa16769e298c31d889a5ba0143c
Author: wenxuanguan
AuthorDate: Wed May 22 10:45:11 2019 +0900

    [SPARK-27774][CORE][MLLIB] Avoid hardcoded configs

    ## What changes were proposed in this pull request?

    Avoid hardcoded configs in `SparkConf` and `SparkSubmit` and tests.

    ## How was this patch tested?

    N/A

    Closes #24631 from wenxuanguan/minor-fix.

    Authored-by: wenxuanguan
    Signed-off-by: HyukjinKwon
---
 .../main/scala/org/apache/spark/SparkConf.scala    |  8 ++++----
 .../main/scala/org/apache/spark/api/r/RUtils.scala |  2 +-
 .../scala/org/apache/spark/deploy/RRunner.scala    |  3 ++-
 .../org/apache/spark/deploy/SparkSubmit.scala      |  6 +++---
 .../apache/spark/deploy/worker/DriverWrapper.scala |  2 +-
 .../scala/org/apache/spark/SparkConfSuite.scala    |  2 +-
 .../scala/org/apache/spark/SparkContextSuite.scala |  2 +-
 .../org/apache/spark/deploy/SparkSubmitSuite.scala |  4 ++--
 .../deploy/StandaloneDynamicAllocationSuite.scala  |  6 +++---
 .../deploy/rest/SubmitRestProtocolSuite.scala      | 24 ++++++++++------------
 .../storage/BlockManagerReplicationSuite.scala     |  6 +++---
 .../apache/spark/storage/BlockManagerSuite.scala   |  5 +++--
 .../apache/spark/storage/MemoryStoreSuite.scala    |  5 +++--
 .../org/apache/spark/ml/feature/Word2Vec.scala     |  3 ++-
 .../org/apache/spark/mllib/feature/Word2Vec.scala  |  3 ++-
 .../cluster/mesos/MesosClusterScheduler.scala      |  2 +-
 .../spark/deploy/yarn/ApplicationMaster.scala      |  2 +-
 17 files changed, 45 insertions(+), 40 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index aa93f42..bd2ef5b 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -595,7 +595,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
     // it will almost always cause ExecutorLostFailure. See SPARK-22754.
     require(executorTimeoutThresholdMs > executorHeartbeatIntervalMs, "The value of " +
       s"${networkTimeout}=${executorTimeoutThresholdMs}ms must be no less than the value of " +
-      s"spark.executor.heartbeatInterval=${executorHeartbeatIntervalMs}ms.")
+      s"${EXECUTOR_HEARTBEAT_INTERVAL.key}=${executorHeartbeatIntervalMs}ms.")
   }

   /**
@@ -667,12 +667,12 @@ private[spark] object SparkConf extends Logging {
         translation = s => s"${s.toLong * 10}s")),
     REDUCER_MAX_SIZE_IN_FLIGHT.key -> Seq(
       AlternateConfig("spark.reducer.maxMbInFlight", "1.4")),
-    "spark.kryoserializer.buffer" -> Seq(
+    KRYO_SERIALIZER_BUFFER_SIZE.key -> Seq(
       AlternateConfig("spark.kryoserializer.buffer.mb", "1.4",
         translation = s => s"${(s.toDouble * 1000).toInt}k")),
-    "spark.kryoserializer.buffer.max" -> Seq(
+    KRYO_SERIALIZER_MAX_BUFFER_SIZE.key -> Seq(
       AlternateConfig("spark.kryoserializer.buffer.max.mb", "1.4")),
-    "spark.shuffle.file.buffer" -> Seq(
+    SHUFFLE_FILE_BUFFER_SIZE.key -> Seq(
       AlternateConfig("spark.shuffle.file.buffer.kb", "1.4")),
     EXECUTOR_LOGS_ROLLING_MAX_SIZE.key -> Seq(
       AlternateConfig("spark.executor.logs.rolling.size.maxBytes", "1.4")),

diff --git a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
index 5a43302..311fade 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
@@ -60,7 +60,7 @@ private[spark] object RUtils {
   def sparkRPackagePath(isDriver: Boolean): Seq[String] = {
     val (master, deployMode) =
       if (isDriver) {
-        (sys.props("spark.master"), sys.props("spark.submit.deployMode"))
+        (sys.props("spark.master"), sys.props(SUBMIT_DEPLOY_MODE.key))
       } else {
         val sparkConf = SparkEnv.get.conf
         (sparkConf.get("spark.master"), sparkConf.get(SUBMIT_DEPLOY_MODE))

diff --git a/core/src/main/scala/org/apache/spark/deploy/RRunner.scala b/core/src/main/scala/org/apache/spark/deploy/RRunner.scala
index 6284e6a..60ba047 100644
--- a/core/src/main/scala/org/apache/spark/deploy/RRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/RRunner.scala
@@ -27,6 +27,7 @@
 import org.apache.hadoop.fs.Path

 import org.apache.spar
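The diff above replaces raw key strings like `"spark.kryoserializer.buffer"` with references to named config entries (`KRYO_SERIALIZER_BUFFER_SIZE.key`), so each key is defined in exactly one place. A toy Python version of that config-entry pattern (the class and default here are illustrative, not Spark's actual internals):

```python
class ConfigEntry:
    """Minimal stand-in for Spark's internal ConfigEntry pattern: the key
    string lives in one place, and every call site references the constant
    instead of repeating the raw string -- the cleanup SPARK-27774 applies."""
    def __init__(self, key, default):
        self.key = key
        self.default = default

    def read(self, conf):
        # conf is any mapping of key -> value, e.g. a loaded properties file.
        return conf.get(self.key, self.default)

# Defined once; "spark.kryoserializer.buffer" never appears at call sites.
KRYO_SERIALIZER_BUFFER_SIZE = ConfigEntry("spark.kryoserializer.buffer", "64k")

print(KRYO_SERIALIZER_BUFFER_SIZE.read({"spark.kryoserializer.buffer": "128k"}))  # 128k
print(KRYO_SERIALIZER_BUFFER_SIZE.read({}))  # 64k
```

Centralizing the key means a rename, deprecation, or default change touches one definition instead of every hardcoded string scattered through the codebase and its tests.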
[spark] branch master updated: [SPARK-27737][SQL] Upgrade to Hive 2.3.5 for Hive Metastore Client and Hadoop-3.2 profile
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6cd1efd  [SPARK-27737][SQL] Upgrade to Hive 2.3.5 for Hive Metastore Client and Hadoop-3.2 profile
6cd1efd is described below

commit 6cd1efd0ae75ca780047f16e2d88759e04a78e31
Author: Yuming Wang
AuthorDate: Wed May 22 10:24:17 2019 +0900

    [SPARK-27737][SQL] Upgrade to Hive 2.3.5 for Hive Metastore Client and Hadoop-3.2 profile

    ## What changes were proposed in this pull request?

    This PR aims to upgrade to Hive 2.3.5 for Hive Metastore Client and the Hadoop-3.2 profile.

    Release Notes - Hive - Version 2.3.5

    - [HIVE-21536] - Backport HIVE-17764 to branch-2.3
    - [HIVE-21585] - Upgrade branch-2.3 to ORC 1.3.4
    - [HIVE-21639] - Spark test failed since HIVE-10632
    - [HIVE-21680] - Backport HIVE-17644 to branch-2 and branch-2.3

    https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12345394&styleName=Text&projectId=12310843

    ## How was this patch tested?

    This PR is tested in two ways:

    - Pass Jenkins with the default configuration for `Hive Metastore Client` testing.
    - Pass Jenkins with the `test-hadoop3.2` configuration for `Hadoop 3.2` testing.

    Closes #24620 from wangyum/SPARK-27737.

    Authored-by: Yuming Wang
    Signed-off-by: HyukjinKwon
---
 docs/sql-data-sources-hive-tables.md                                   | 2 +-
 docs/sql-migration-guide-hive-compatibility.md                         | 2 +-
 pom.xml                                                                | 4 ++--
 .../apache/spark/sql/execution/datasources/orc/OrcColumnVector.java    | 0
 .../org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala    | 0
 .../org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala  | 0
 .../apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala    | 0
 sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala      | 2 +-
 .../scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala  | 2 +-
 sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala | 2 +-
 10 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/docs/sql-data-sources-hive-tables.md b/docs/sql-data-sources-hive-tables.md
index 0c51c11..3d58e94 100644
--- a/docs/sql-data-sources-hive-tables.md
+++ b/docs/sql-data-sources-hive-tables.md
@@ -130,7 +130,7 @@ The following options can be used to configure the version of Hive that is used
     1.2.1
     Version of the Hive metastore. Available
-    options are 0.12.0 through 2.3.4 and 3.1.0 through 3.1.1.
+    options are 0.12.0 through 2.3.5 and 3.1.0 through 3.1.1.

diff --git a/docs/sql-migration-guide-hive-compatibility.md b/docs/sql-migration-guide-hive-compatibility.md
index 5602f53..4a8076d 100644
--- a/docs/sql-migration-guide-hive-compatibility.md
+++ b/docs/sql-migration-guide-hive-compatibility.md
@@ -25,7 +25,7 @@ license: |
 Spark SQL is designed to be compatible with the Hive Metastore, SerDes and UDFs.
 Currently, Hive SerDes and UDFs are based on Hive 1.2.1,
 and Spark SQL can be connected to different versions of Hive Metastore
-(from 0.12.0 to 2.3.4 and 3.1.0 to 3.1.1. Also see [Interacting with Different Versions of Hive Metastore](sql-data-sources-hive-tables.html#interacting-with-different-versions-of-hive-metastore)).
+(from 0.12.0 to 2.3.5 and 3.1.0 to 3.1.1. Also see [Interacting with Different Versions of Hive Metastore](sql-data-sources-hive-tables.html#interacting-with-different-versions-of-hive-metastore)).

 Deploying in Existing Hive Warehouses

diff --git a/pom.xml b/pom.xml
index 2e88e19..ff891ba 100644
--- a/pom.xml
+++ b/pom.xml
@@ -128,7 +128,7 @@
     1.2.1.spark2
-    2.3.4
+    2.3.5
     1.2.1
@@ -2833,7 +2833,7 @@
       org.apache.hive
       core
       ${hive23.version}
-      2.3.4
+      2.3.5
       provided

diff --git a/sql/core/v2.3.4/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java b/sql/core/v2.3.5/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
similarity index 100%
rename from sql/core/v2.3.4/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
rename to sql/core/v2.3.5/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
diff --git a/sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/v2.3.5/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
similarity
[spark] 02/02: Revert "[SPARK-27351][SQL] Wrong outputRows estimation after AggregateEstimation wit…"
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

commit 516b0fb537b4bae3b4d9550ee2989eb66939d5da
Author: HyukjinKwon
AuthorDate: Fri May 24 03:17:06 2019 +0900

    Revert "[SPARK-27351][SQL] Wrong outputRows estimation after AggregateEstimation wit…"

    This reverts commit 40668c53ed799881db1f316ceaf2f978b294d8ed.
---
 .../plans/logical/statsEstimation/AggregateEstimation.scala | 12 ++----------
 .../catalyst/statsEstimation/AggregateEstimationSuite.scala | 12 +-----------
 2 files changed, 3 insertions(+), 21 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala
index 1198d3f..0606d0d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala
@@ -39,16 +39,8 @@ object AggregateEstimation {
       // Multiply distinct counts of group-by columns. This is an upper bound, which assumes
       // the data contains all combinations of distinct values of group-by columns.
       var outputRows: BigInt = agg.groupingExpressions.foldLeft(BigInt(1))(
-        (res, expr) => {
-          val columnStat = childStats.attributeStats(expr.asInstanceOf[Attribute])
-          val distinctCount = columnStat.distinctCount.get
-          val distinctValue: BigInt = if (distinctCount == 0 && columnStat.nullCount.get > 0) {
-            1
-          } else {
-            distinctCount
-          }
-          res * distinctValue
-        })
+        (res, expr) => res *
+          childStats.attributeStats(expr.asInstanceOf[Attribute]).distinctCount.get)

       outputRows = if (agg.groupingExpressions.isEmpty) {
         // If there's no group-by columns, the output is a single row containing values of aggregate

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala
index c247050..dfa6e46 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala
@@ -38,9 +38,7 @@ class AggregateEstimationSuite extends StatsEstimationTestBase with PlanTest {
     attr("key22") -> ColumnStat(distinctCount = Some(2), min = Some(10), max = Some(20),
       nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
     attr("key31") -> ColumnStat(distinctCount = Some(0), min = None, max = None,
-      nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
-    attr("key32") -> ColumnStat(distinctCount = Some(0), min = None, max = None,
-      nullCount = Some(4), avgLen = Some(4), maxLen = Some(4))
+      nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))
   ))

   private val nameToAttr: Map[String, Attribute] = columnInfo.map(kv => kv._1.name -> kv._1)

@@ -118,14 +116,6 @@ class AggregateEstimationSuite extends StatsEstimationTestBase with PlanTest {
       expectedOutputRowCount = 0)
   }

-  test("group-by column with only null value") {
-    checkAggStats(
-      tableColumns = Seq("key22", "key32"),
-      tableRowCount = 6,
-      groupByColumns = Seq("key22", "key32"),
-      expectedOutputRowCount = nameToColInfo("key22")._2.distinctCount.get)
-  }
-
   test("non-cbo estimation") {
     val attributes = Seq("key12").map(nameToAttr)
     val child = StatsTestPlan(
[spark] 01/02: Revert "[SPARK-27539][SQL] Fix inaccurate aggregate outputRows estimation with column containing null values"
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

commit 855399bbad7706bfd75cc640e3d289392dfd648a
Author: HyukjinKwon
AuthorDate: Fri May 24 03:16:24 2019 +0900

    Revert "[SPARK-27539][SQL] Fix inaccurate aggregate outputRows estimation with column containing null values"

    This reverts commit 42cb4a2ccdb5ca6216677dc4285c3e74cfb7e707.
---
 .../plans/logical/statsEstimation/AggregateEstimation.scala |  4 ++--
 .../catalyst/statsEstimation/AggregateEstimationSuite.scala | 13 +------------
 2 files changed, 3 insertions(+), 14 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala
index ffe071e..1198d3f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala
@@ -42,8 +42,8 @@ object AggregateEstimation {
         (res, expr) => {
           val columnStat = childStats.attributeStats(expr.asInstanceOf[Attribute])
           val distinctCount = columnStat.distinctCount.get
-          val distinctValue: BigInt = if (columnStat.nullCount.get > 0) {
-            distinctCount + 1
+          val distinctValue: BigInt = if (distinctCount == 0 && columnStat.nullCount.get > 0) {
+            1
           } else {
             distinctCount
           }

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala
index 32bf20b..c247050 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala
@@ -40,9 +40,7 @@ class AggregateEstimationSuite extends StatsEstimationTestBase with PlanTest {
     attr("key31") -> ColumnStat(distinctCount = Some(0), min = None, max = None,
       nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
     attr("key32") -> ColumnStat(distinctCount = Some(0), min = None, max = None,
-      nullCount = Some(4), avgLen = Some(4), maxLen = Some(4)),
-    attr("key33") -> ColumnStat(distinctCount = Some(2), min = None, max = None,
-      nullCount = Some(2), avgLen = Some(4), maxLen = Some(4))
+      nullCount = Some(4), avgLen = Some(4), maxLen = Some(4))
   ))

   private val nameToAttr: Map[String, Attribute] = columnInfo.map(kv => kv._1.name -> kv._1)

@@ -128,15 +126,6 @@ class AggregateEstimationSuite extends StatsEstimationTestBase with PlanTest {
       expectedOutputRowCount = nameToColInfo("key22")._2.distinctCount.get)
   }

-  test("group-by column with null value") {
-    checkAggStats(
-      tableColumns = Seq("key21", "key33"),
-      tableRowCount = 6,
-      groupByColumns = Seq("key21", "key33"),
-      expectedOutputRowCount = nameToColInfo("key21")._2.distinctCount.get *
-        (nameToColInfo("key33")._2.distinctCount.get + 1))
-  }
-
   test("non-cbo estimation") {
     val attributes = Seq("key12").map(nameToAttr)
     val child = StatsTestPlan(
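Both reverted patches adjust the same cardinality formula: the estimated output row count of a group-by is the product of the grouping columns' distinct counts. SPARK-27351 made an all-null column (distinct count 0, nulls present) contribute 1 instead of zeroing the product; SPARK-27539 went further and counted null as one extra distinct value. A short sketch comparing the three behaviors (function and variant names are invented for illustration):

```python
def estimate_output_rows(col_stats, variant="reverted"):
    """Group-by output-row estimate as a product over grouping columns.
    col_stats: list of (distinct_count, null_count) per group-by column.
    variant:
      "reverted"   -- plain product of distinct counts (restored by the reverts)
      "spark27351" -- an all-null column contributes 1, not 0
      "spark27539" -- any column with nulls counts null as one extra value
    """
    rows = 1
    for distinct, nulls in col_stats:
        if variant == "spark27539" and nulls > 0:
            rows *= distinct + 1
        elif variant == "spark27351" and distinct == 0 and nulls > 0:
            rows *= 1
        else:
            rows *= distinct
    return rows

# e.g. key22: 2 distinct values, no nulls; key32: all null (0 distinct, 4 nulls)
stats = [(2, 0), (0, 4)]
print(estimate_output_rows(stats, "reverted"))    # 0
print(estimate_output_rows(stats, "spark27351"))  # 2
```

The all-null column illustrates the tension: the plain product collapses to 0 rows, while treating the null group as one value yields 2, matching the deleted test's expectation.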
[spark] branch master updated (a590a93 -> 516b0fb)
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


    from a590a93  [SPARK-27806][SQL] byName/byPosition should apply to struct fields as well
     new 855399b  Revert "[SPARK-27539][SQL] Fix inaccurate aggregate outputRows estimation with column containing null values"
     new 516b0fb  Revert "[SPARK-27351][SQL] Wrong outputRows estimation after AggregateEstimation wit…"

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.

Summary of changes:
 .../statsEstimation/AggregateEstimation.scala      | 12 ++----------
 .../statsEstimation/AggregateEstimationSuite.scala | 23 +---------------------
 2 files changed, 3 insertions(+), 32 deletions(-)
[spark] branch branch-2.4 updated (fa7c319 -> e69ad46)
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git.


    from fa7c319  [SPARK-27800][SQL][HOTFIX][FOLLOWUP] Fix wrong answer on BitwiseXor test cases
     new e0e8a6d  Revert "[SPARK-27539][SQL] Fix inaccurate aggregate outputRows estimation with column containing null values"
     new e69ad46  Revert "[SPARK-27351][SQL] Wrong outputRows estimation after AggregateEstimation wit…"

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.

Summary of changes:
 .../statsEstimation/AggregateEstimation.scala      | 12 ++----------
 .../statsEstimation/AggregateEstimationSuite.scala | 23 +---------------------
 2 files changed, 3 insertions(+), 32 deletions(-)
[spark] 02/02: Revert "[SPARK-27351][SQL] Wrong outputRows estimation after AggregateEstimation wit…"
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git commit e69ad46c72ed26c8293da95dc19b6f31445c0df5 Author: HyukjinKwon AuthorDate: Fri May 24 03:19:48 2019 +0900 Revert "[SPARK-27351][SQL] Wrong outputRows estimation after AggregateEstimation wit…" This reverts commit 40668c53ed799881db1f316ceaf2f978b294d8ed. --- .../plans/logical/statsEstimation/AggregateEstimation.scala | 12 ++-- .../catalyst/statsEstimation/AggregateEstimationSuite.scala | 12 +--- 2 files changed, 3 insertions(+), 21 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala index 7ef22fa..111c594 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala @@ -39,16 +39,8 @@ object AggregateEstimation { // Multiply distinct counts of group-by columns. This is an upper bound, which assumes // the data contains all combinations of distinct values of group-by columns. 
    var outputRows: BigInt = agg.groupingExpressions.foldLeft(BigInt(1))(
-      (res, expr) => {
-        val columnStat = childStats.attributeStats(expr.asInstanceOf[Attribute])
-        val distinctCount = columnStat.distinctCount.get
-        val distinctValue: BigInt = if (distinctCount == 0 && columnStat.nullCount.get > 0) {
-          1
-        } else {
-          distinctCount
-        }
-        res * distinctValue
-      })
+      (res, expr) => res *
+        childStats.attributeStats(expr.asInstanceOf[Attribute]).distinctCount.get)

     outputRows = if (agg.groupingExpressions.isEmpty) {
       // If there's no group-by columns, the output is a single row containing values of aggregate
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala
index 6bdf8cd..8213d56 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala
@@ -38,9 +38,7 @@ class AggregateEstimationSuite extends StatsEstimationTestBase with PlanTest {
     attr("key22") -> ColumnStat(distinctCount = Some(2), min = Some(10), max = Some(20),
       nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
     attr("key31") -> ColumnStat(distinctCount = Some(0), min = None, max = None,
-      nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
-    attr("key32") -> ColumnStat(distinctCount = Some(0), min = None, max = None,
-      nullCount = Some(4), avgLen = Some(4), maxLen = Some(4))
+      nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))
   ))

   private val nameToAttr: Map[String, Attribute] = columnInfo.map(kv => kv._1.name -> kv._1)

@@ -94,14 +92,6 @@ class AggregateEstimationSuite extends StatsEstimationTestBase with PlanTest {
       expectedOutputRowCount = 0)
   }

-  test("group-by column with only null value") {
-    checkAggStats(
-      tableColumns = Seq("key22", "key32"),
-      tableRowCount = 6,
-      groupByColumns = Seq("key22", "key32"),
-      expectedOutputRowCount = nameToColInfo("key22")._2.distinctCount.get)
-  }
-
   test("non-cbo estimation") {
     val attributes = Seq("key12").map(nameToAttr)
     val child = StatsTestPlan(
[spark] 01/02: Revert "[SPARK-27539][SQL] Fix inaccurate aggregate outputRows estimation with column containing null values"
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git commit e0e8a6de1345e6e716bb8c6e35a98e981feb3bab Author: HyukjinKwon AuthorDate: Fri May 24 03:19:40 2019 +0900 Revert "[SPARK-27539][SQL] Fix inaccurate aggregate outputRows estimation with column containing null values" This reverts commit 42cb4a2ccdb5ca6216677dc4285c3e74cfb7e707.
---
 .../plans/logical/statsEstimation/AggregateEstimation.scala | 4 ++--
 .../catalyst/statsEstimation/AggregateEstimationSuite.scala | 13 +
 2 files changed, 3 insertions(+), 14 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala
index b9e72c2..7ef22fa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala
@@ -42,8 +42,8 @@ object AggregateEstimation {
       (res, expr) => {
         val columnStat = childStats.attributeStats(expr.asInstanceOf[Attribute])
         val distinctCount = columnStat.distinctCount.get
-        val distinctValue: BigInt = if (columnStat.nullCount.get > 0) {
-          distinctCount + 1
+        val distinctValue: BigInt = if (distinctCount == 0 && columnStat.nullCount.get > 0) {
+          1
         } else {
           distinctCount
         }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala
index d89b9df..6bdf8cd 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala
@@ -40,9 +40,7 @@ class AggregateEstimationSuite extends StatsEstimationTestBase with PlanTest {
     attr("key31") -> ColumnStat(distinctCount = Some(0), min = None, max = None,
       nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
     attr("key32") -> ColumnStat(distinctCount = Some(0), min = None, max = None,
-      nullCount = Some(4), avgLen = Some(4), maxLen = Some(4)),
-    attr("key33") -> ColumnStat(distinctCount = Some(2), min = None, max = None,
-      nullCount = Some(2), avgLen = Some(4), maxLen = Some(4))
+      nullCount = Some(4), avgLen = Some(4), maxLen = Some(4))
   ))

   private val nameToAttr: Map[String, Attribute] = columnInfo.map(kv => kv._1.name -> kv._1)

@@ -104,15 +102,6 @@ class AggregateEstimationSuite extends StatsEstimationTestBase with PlanTest {
       expectedOutputRowCount = nameToColInfo("key22")._2.distinctCount.get)
   }

-  test("group-by column with null value") {
-    checkAggStats(
-      tableColumns = Seq("key21", "key33"),
-      tableRowCount = 6,
-      groupByColumns = Seq("key21", "key33"),
-      expectedOutputRowCount = nameToColInfo("key21")._2.distinctCount.get *
-        (nameToColInfo("key33")._2.distinctCount.get + 1))
-  }
-
   test("non-cbo estimation") {
     val attributes = Seq("key12").map(nameToAttr)
     val child = StatsTestPlan(
[spark] 01/02: Revert "Revert "[SPARK-27351][SQL] Wrong outputRows estimation after AggregateEstimation wit…""
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git commit 1ba4011a7f91616e88df1d141a7057500309e14c Author: HyukjinKwon AuthorDate: Fri May 24 05:36:08 2019 +0900 Revert "Revert "[SPARK-27351][SQL] Wrong outputRows estimation after AggregateEstimation wit…"" This reverts commit 516b0fb537b4bae3b4d9550ee2989eb66939d5da. --- .../plans/logical/statsEstimation/AggregateEstimation.scala | 12 ++-- .../catalyst/statsEstimation/AggregateEstimationSuite.scala | 12 +++- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala index 0606d0d..1198d3f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala @@ -39,8 +39,16 @@ object AggregateEstimation { // Multiply distinct counts of group-by columns. This is an upper bound, which assumes // the data contains all combinations of distinct values of group-by columns. 
    var outputRows: BigInt = agg.groupingExpressions.foldLeft(BigInt(1))(
-      (res, expr) => res *
-        childStats.attributeStats(expr.asInstanceOf[Attribute]).distinctCount.get)
+      (res, expr) => {
+        val columnStat = childStats.attributeStats(expr.asInstanceOf[Attribute])
+        val distinctCount = columnStat.distinctCount.get
+        val distinctValue: BigInt = if (distinctCount == 0 && columnStat.nullCount.get > 0) {
+          1
+        } else {
+          distinctCount
+        }
+        res * distinctValue
+      })

     outputRows = if (agg.groupingExpressions.isEmpty) {
       // If there's no group-by columns, the output is a single row containing values of aggregate
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala
index dfa6e46..c247050 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala
@@ -38,7 +38,9 @@ class AggregateEstimationSuite extends StatsEstimationTestBase with PlanTest {
     attr("key22") -> ColumnStat(distinctCount = Some(2), min = Some(10), max = Some(20),
       nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
     attr("key31") -> ColumnStat(distinctCount = Some(0), min = None, max = None,
-      nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))
+      nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+    attr("key32") -> ColumnStat(distinctCount = Some(0), min = None, max = None,
+      nullCount = Some(4), avgLen = Some(4), maxLen = Some(4))
   ))

   private val nameToAttr: Map[String, Attribute] = columnInfo.map(kv => kv._1.name -> kv._1)

@@ -116,6 +118,14 @@ class AggregateEstimationSuite extends StatsEstimationTestBase with PlanTest {
       expectedOutputRowCount = 0)
   }

+  test("group-by column with only null value") {
+    checkAggStats(
+      tableColumns = Seq("key22", "key32"),
+      tableRowCount = 6,
+      groupByColumns = Seq("key22", "key32"),
+      expectedOutputRowCount = nameToColInfo("key22")._2.distinctCount.get)
+  }
+
   test("non-cbo estimation") {
     val attributes = Seq("key12").map(nameToAttr)
     val child = StatsTestPlan(
[spark] 02/02: Revert "Revert "[SPARK-27539][SQL] Fix inaccurate aggregate outputRows estimation with column containing null values""
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git commit c1e555711b359065876a82d5f4fc3e229716a6d4 Author: HyukjinKwon AuthorDate: Fri May 24 05:36:17 2019 +0900 Revert "Revert "[SPARK-27539][SQL] Fix inaccurate aggregate outputRows estimation with column containing null values"" This reverts commit 855399bbad7706bfd75cc640e3d289392dfd648a.
---
 .../plans/logical/statsEstimation/AggregateEstimation.scala | 4 ++--
 .../catalyst/statsEstimation/AggregateEstimationSuite.scala | 13 -
 2 files changed, 14 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala
index 1198d3f..ffe071e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala
@@ -42,8 +42,8 @@ object AggregateEstimation {
       (res, expr) => {
         val columnStat = childStats.attributeStats(expr.asInstanceOf[Attribute])
         val distinctCount = columnStat.distinctCount.get
-        val distinctValue: BigInt = if (distinctCount == 0 && columnStat.nullCount.get > 0) {
-          1
+        val distinctValue: BigInt = if (columnStat.nullCount.get > 0) {
+          distinctCount + 1
         } else {
           distinctCount
         }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala
index c247050..32bf20b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala
@@ -40,7 +40,9 @@ class AggregateEstimationSuite extends StatsEstimationTestBase with PlanTest {
     attr("key31") -> ColumnStat(distinctCount = Some(0), min = None, max = None,
       nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
     attr("key32") -> ColumnStat(distinctCount = Some(0), min = None, max = None,
-      nullCount = Some(4), avgLen = Some(4), maxLen = Some(4))
+      nullCount = Some(4), avgLen = Some(4), maxLen = Some(4)),
+    attr("key33") -> ColumnStat(distinctCount = Some(2), min = None, max = None,
+      nullCount = Some(2), avgLen = Some(4), maxLen = Some(4))
   ))

   private val nameToAttr: Map[String, Attribute] = columnInfo.map(kv => kv._1.name -> kv._1)

@@ -126,6 +128,15 @@ class AggregateEstimationSuite extends StatsEstimationTestBase with PlanTest {
       expectedOutputRowCount = nameToColInfo("key22")._2.distinctCount.get)
   }

+  test("group-by column with null value") {
+    checkAggStats(
+      tableColumns = Seq("key21", "key33"),
+      tableRowCount = 6,
+      groupByColumns = Seq("key21", "key33"),
+      expectedOutputRowCount = nameToColInfo("key21")._2.distinctCount.get *
+        (nameToColInfo("key33")._2.distinctCount.get + 1))
+  }
+
   test("non-cbo estimation") {
     val attributes = Seq("key12").map(nameToAttr)
     val child = StatsTestPlan(
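Restoring these two patches makes the CBO group-by cardinality estimate null-aware: a group-by column that contains nulls contributes `distinctCount + 1` candidate groups (the extra one for NULL), and a column with only nulls still contributes one group rather than zero. A minimal Python model of that rule, for illustration only — the function name and the cap at the child row count are this sketch's assumptions, not Spark's actual Scala code:

```python
def estimate_group_by_rows(child_row_count, column_stats, group_by_columns):
    """Toy model of the null-aware group-by cardinality estimate.

    column_stats maps a column name to (distinct_count, null_count).
    The product is an upper bound: it assumes the data contains every
    combination of the group-by columns' distinct values.
    """
    if not group_by_columns:
        return 1  # a global aggregate produces a single output row
    rows = 1
    for col in group_by_columns:
        distinct_count, null_count = column_stats[col]
        # Nulls form one extra group; a column with only nulls
        # (distinct_count == 0, null_count > 0) still yields 0 + 1 = 1.
        rows *= (distinct_count + 1) if null_count > 0 else distinct_count
    # Assumed cap for this sketch: never estimate more groups than rows.
    return min(rows, child_row_count)

# Stats taken from AggregateEstimationSuite: key22 -> (2, 0), key32 -> (0, 4).
stats = {"key22": (2, 0), "key32": (0, 4)}
print(estimate_group_by_rows(6, stats, ["key22", "key32"]))  # 2
```

With these stats the estimate is `2 * (0 + 1) = 2`, matching the suite's `expectedOutputRowCount = nameToColInfo("key22")._2.distinctCount.get`.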
[spark] branch master updated (32f310b -> c1e5557)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 32f310b [SPARK-26045][BUILD] Leave avro, avro-ipc dependendencies as compile scope even for hadoop-provided usages new 1ba4011 Revert "Revert "[SPARK-27351][SQL] Wrong outputRows estimation after AggregateEstimation wit…"" new c1e5557 Revert "Revert "[SPARK-27539][SQL] Fix inaccurate aggregate outputRows estimation with column containing null values"" The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../statsEstimation/AggregateEstimation.scala | 12 +-- .../statsEstimation/AggregateEstimationSuite.scala | 23 +- 2 files changed, 32 insertions(+), 3 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] 02/02: Revert "Revert "[SPARK-27539][SQL] Fix inaccurate aggregate outputRows estimation with column containing null values""
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git commit fb60066027b4ce4281acfd2755538566da2664aa Author: HyukjinKwon AuthorDate: Fri May 24 05:39:13 2019 +0900 Revert "Revert "[SPARK-27539][SQL] Fix inaccurate aggregate outputRows estimation with column containing null values"" This reverts commit e0e8a6de1345e6e716bb8c6e35a98e981feb3bab.
---
 .../plans/logical/statsEstimation/AggregateEstimation.scala | 4 ++--
 .../catalyst/statsEstimation/AggregateEstimationSuite.scala | 13 -
 2 files changed, 14 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala
index 7ef22fa..b9e72c2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala
@@ -42,8 +42,8 @@ object AggregateEstimation {
       (res, expr) => {
         val columnStat = childStats.attributeStats(expr.asInstanceOf[Attribute])
         val distinctCount = columnStat.distinctCount.get
-        val distinctValue: BigInt = if (distinctCount == 0 && columnStat.nullCount.get > 0) {
-          1
+        val distinctValue: BigInt = if (columnStat.nullCount.get > 0) {
+          distinctCount + 1
         } else {
           distinctCount
         }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala
index 6bdf8cd..d89b9df 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala
@@ -40,7 +40,9 @@ class AggregateEstimationSuite extends StatsEstimationTestBase with PlanTest {
     attr("key31") -> ColumnStat(distinctCount = Some(0), min = None, max = None,
       nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
     attr("key32") -> ColumnStat(distinctCount = Some(0), min = None, max = None,
-      nullCount = Some(4), avgLen = Some(4), maxLen = Some(4))
+      nullCount = Some(4), avgLen = Some(4), maxLen = Some(4)),
+    attr("key33") -> ColumnStat(distinctCount = Some(2), min = None, max = None,
+      nullCount = Some(2), avgLen = Some(4), maxLen = Some(4))
   ))

   private val nameToAttr: Map[String, Attribute] = columnInfo.map(kv => kv._1.name -> kv._1)

@@ -102,6 +104,15 @@ class AggregateEstimationSuite extends StatsEstimationTestBase with PlanTest {
       expectedOutputRowCount = nameToColInfo("key22")._2.distinctCount.get)
   }

+  test("group-by column with null value") {
+    checkAggStats(
+      tableColumns = Seq("key21", "key33"),
+      tableRowCount = 6,
+      groupByColumns = Seq("key21", "key33"),
+      expectedOutputRowCount = nameToColInfo("key21")._2.distinctCount.get *
+        (nameToColInfo("key33")._2.distinctCount.get + 1))
+  }
+
   test("non-cbo estimation") {
     val attributes = Seq("key12").map(nameToAttr)
     val child = StatsTestPlan(
[spark] branch branch-2.4 updated: [MINOR][DOC] ForeachBatch doc fix.
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-2.4 by this push: new 80fe1ed [MINOR][DOC] ForeachBatch doc fix. 80fe1ed is described below commit 80fe1ed4a6974ed5083e5602fe364bc8955d2f8c Author: Gabor Somogyi AuthorDate: Sat May 25 00:03:59 2019 +0900 [MINOR][DOC] ForeachBatch doc fix.

## What changes were proposed in this pull request?

ForeachBatch doc is wrongly formatted. This PR formats it.

## How was this patch tested?

```
cd docs
SKIP_API=1 jekyll build
```

Manual webpage check.

Closes #24698 from gaborgsomogyi/foreachbatchdoc.

Authored-by: Gabor Somogyi
Signed-off-by: HyukjinKwon
---
 docs/structured-streaming-programming-guide.md | 20 ++--
 1 file changed, 14 insertions(+), 6 deletions(-)

diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index f0971ab..a93f65b 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -2086,12 +2086,20 @@ With `foreachBatch`, you can do the following.
   cause the output data to be recomputed (including possible re-reading of the input data). To avoid recomputations, you should cache the output DataFrame/Dataset, write it to multiple locations, and then uncache it. Here is an outline.

-streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
-  batchDF.persist()
-  batchDF.write.format(...).save(...) // location 1
-  batchDF.write.format(...).save(...) // location 2
-  batchDF.unpersist()
-}
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
+  batchDF.persist()
+  batchDF.write.format(...).save(...) // location 1
+  batchDF.write.format(...).save(...) // location 2
+  batchDF.unpersist()
+}
+{% endhighlight %}
+
+
+

 - **Apply additional DataFrame operations** - Many DataFrame and Dataset operations are not supported in streaming DataFrames because Spark does not support generating incremental plans in those cases.
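The outline in the doc boils down to: materialize the micro-batch once, write it to each location, then release it. A language-neutral sketch of that pattern in Python — plain lists stand in for the sinks, and this is not Spark's API:

```python
def write_batch_to_sinks(batch, sinks):
    """Fan a (possibly one-shot) batch out to several sinks.

    `batch` may be a generator that can only be consumed once, so it is
    materialized first -- the analogue of batchDF.persist() -- to avoid
    recomputing or re-reading the source once per sink.
    """
    cached = list(batch)      # "persist": materialize the batch once
    for sink in sinks:        # write the cached copy to each location
        sink.extend(cached)
    del cached                # "unpersist": release the cached copy

loc1, loc2 = [], []
write_batch_to_sinks((x * x for x in range(3)), [loc1, loc2])
print(loc1, loc2)  # [0, 1, 4] [0, 1, 4]
```

Without the `list(batch)` step the second sink would see an exhausted generator and receive nothing, which mirrors why the doc caches the DataFrame before writing it twice.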
[spark] branch master updated (6b28497 -> 4e7908f)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 6b28497 [SPARK-27732][SQL] Add v2 CreateTable implementation. add 4e7908f [MINOR][DOC] ForeachBatch doc fix. No new revisions were added by this update. Summary of changes: docs/structured-streaming-programming-guide.md | 20 ++-- 1 file changed, 14 insertions(+), 6 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [MINOR][DOCS] Make Spark's description consistent in docs with websites
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new cfe236f [MINOR][DOCS] Make Spark's description consistent in docs with websites cfe236f is described below commit cfe236f6957dd9ae799831d47914de34a03716aa Author: Reynold Xin AuthorDate: Fri May 10 17:55:23 2019 +0900 [MINOR][DOCS] Make Spark's description consistent in docs with websites We updated our website a long time ago to describe Spark as the unified analytics engine, which is also how Spark is described in the community now. But our README and docs page still use the same description from 2011 ... This patch updates them. The patch also updates the README example to use more modern APIs, and refer to Structured Streaming rather than Spark Streaming. Closes #24573 from rxin/consistent-message. Authored-by: Reynold Xin Signed-off-by: HyukjinKwon --- README.md| 20 ++-- docs/index.md| 4 ++-- python/README.md | 4 ++-- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index 271f2f5..482c007 100644 --- a/README.md +++ b/README.md @@ -1,18 +1,18 @@ # Apache Spark -[![Jenkins Build](https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.7/badge/icon)](https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.7) -[![AppVeyor Build](https://img.shields.io/appveyor/ci/ApacheSoftwareFoundation/spark/master.svg?style=plastic=appveyor)](https://ci.appveyor.com/project/ApacheSoftwareFoundation/spark) -[![PySpark Coverage](https://img.shields.io/badge/dynamic/xml.svg?label=pyspark%20coverage=https%3A%2F%2Fspark-test.github.io%2Fpyspark-coverage-site=%2Fhtml%2Fbody%2Fdiv%5B1%5D%2Fdiv%2Fh1%2Fspan=brightgreen=plastic)](https://spark-test.github.io/pyspark-coverage-site) - -Spark is a fast and general cluster computing system for Big Data. 
It provides +Spark is a unified analytics engine for large-scale data processing. It provides high-level APIs in Scala, Java, Python, and R, and an optimized engine that supports general computation graphs for data analysis. It also supports a rich set of higher-level tools including Spark SQL for SQL and DataFrames, MLlib for machine learning, GraphX for graph processing, -and Spark Streaming for stream processing. +and Structured Streaming for stream processing. <http://spark.apache.org/> +[![Jenkins Build](https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.7/badge/icon)](https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.7) +[![AppVeyor Build](https://img.shields.io/appveyor/ci/ApacheSoftwareFoundation/spark/master.svg?style=plastic=appveyor)](https://ci.appveyor.com/project/ApacheSoftwareFoundation/spark) +[![PySpark Coverage](https://img.shields.io/badge/dynamic/xml.svg?label=pyspark%20coverage=https%3A%2F%2Fspark-test.github.io%2Fpyspark-coverage-site=%2Fhtml%2Fbody%2Fdiv%5B1%5D%2Fdiv%2Fh1%2Fspan=brightgreen=plastic)](https://spark-test.github.io/pyspark-coverage-site) + ## Online Documentation @@ -41,9 +41,9 @@ The easiest way to start using Spark is through the Scala shell: ./bin/spark-shell -Try the following command, which should return 1000: +Try the following command, which should return 1,000,000,000: -scala> sc.parallelize(1 to 1000).count() +scala> spark.range(1000 * 1000 * 1000).count() ## Interactive Python Shell @@ -51,9 +51,9 @@ Alternatively, if you prefer Python, you can use the Python shell: ./bin/pyspark -And run the following command, which should also return 1000: +And run the following command, which should also return 1,000,000,000: ->>> sc.parallelize(range(1000)).count() +>>> spark.range(1000 * 1000 * 1000).count() ## Example Programs diff --git a/docs/index.md b/docs/index.md index 5adbebd..bec0fee 100644 --- a/docs/index.md +++ b/docs/index.md @@ -20,10 +20,10 @@ license: | limitations 
under the License. --- -Apache Spark is a fast and general-purpose cluster computing system. +Apache Spark is a unified analytics engine for large-scale data processing. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. -It also supports a rich set of higher-level tools including [Spark SQL](sql-programming-guide.html) for SQL and structured data processing, [MLlib](ml-guide.html) for machine learning, [GraphX](graphx-programming-guide.html) for graph processing, and [Spark Streaming](streaming-programming-guide.html). +It also supports a rich set of higher-level tools including [Spark SQL](sql-programming-guide.html) for SQL and structured data processing, [MLlib](ml-guide.html) for machine learning, [GraphX](graphx-pro
[spark] branch master updated (57450ed -> 78a403f)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 57450ed [MINOR][SS] Rename `secondLatestBatchId` to `secondLatestOffsets` add 78a403f [SPARK-27627][SQL] Make option "pathGlobFilter" as a general option for all file sources No new revisions were added by this update. Summary of changes: docs/sql-data-sources-binaryFile.md| 16 + docs/sql-data-sources-load-save-functions.md | 21 +++ .../examples/sql/JavaSQLDataSourceExample.java | 5 ++ examples/src/main/python/sql/datasource.py | 5 ++ examples/src/main/r/RSparkSQLExample.R | 4 ++ .../partitioned_users.orc/do_not_read_this.txt | 1 + .../users.orc | Bin 0 -> 448 bytes .../favorite_color=red/users.orc | Bin 0 -> 402 bytes .../spark/examples/sql/SQLDataSourceExample.scala | 5 ++ .../org/apache/spark/sql/avro/AvroFileFormat.scala | 4 ++ .../org/apache/spark/sql/avro/AvroOptions.scala| 5 +- python/pyspark/sql/readwriter.py | 6 ++ python/pyspark/sql/streaming.py| 6 ++ .../org/apache/spark/sql/DataFrameReader.scala | 9 +++ .../sql/execution/datasources/DataSource.scala | 3 +- .../datasources/PartitioningAwareFileIndex.scala | 11 +++- .../datasources/binaryfile/BinaryFileFormat.scala | 70 - .../sql/execution/streaming/FileStreamSource.scala | 4 +- .../execution/streaming/MetadataLogFileIndex.scala | 3 +- .../spark/sql/streaming/DataStreamReader.scala | 9 +++ .../spark/sql/FileBasedDataSourceSuite.scala | 32 ++ .../sql/streaming/FileStreamSourceSuite.scala | 19 ++ 22 files changed, 173 insertions(+), 65 deletions(-) create mode 100644 examples/src/main/resources/partitioned_users.orc/do_not_read_this.txt create mode 100644 examples/src/main/resources/partitioned_users.orc/favorite_color=__HIVE_DEFAULT_PARTITION__/users.orc create mode 100644 examples/src/main/resources/partitioned_users.orc/favorite_color=red/users.orc - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional 
commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-2.4 updated: [MINOR][DOCS] Fix Spark hive example.
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-2.4 by this push: new 694ebb4 [MINOR][DOCS] Fix Spark hive example. 694ebb4 is described below commit 694ebb493fa8c7d3a7ad1b6927af5d1b617999b7 Author: Prashant Sharma AuthorDate: Tue May 21 18:23:38 2019 +0900 [MINOR][DOCS] Fix Spark hive example.

## What changes were proposed in this pull request?

Documentation has an error, https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html#hive-tables. The example:

```scala
scala> val dataDir = "/tmp/parquet_data"
dataDir: String = /tmp/parquet_data

scala> spark.range(10).write.parquet(dataDir)

scala> sql(s"CREATE EXTERNAL TABLE hive_ints(key int) STORED AS PARQUET LOCATION '$dataDir'")
res6: org.apache.spark.sql.DataFrame = []

scala> sql("SELECT * FROM hive_ints").show()
+----+
| key|
+----+
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
+----+
```

Range does not emit `key`, but `id` instead.

Closes #24657 from ScrapCodes/fix_hive_example.

Lead-authored-by: Prashant Sharma
Co-authored-by: Prashant Sharma
Signed-off-by: HyukjinKwon
(cherry picked from commit 5f4b50513cd34cd3dcf7f72972bfcd1f51031723)
Signed-off-by: HyukjinKwon
---
 .../org/apache/spark/examples/sql/hive/SparkHiveExample.scala | 8
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala b/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala
index 70fb5b2..a832276 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala
@@ -122,16 +122,16 @@ object SparkHiveExample {
     val dataDir = "/tmp/parquet_data"
     spark.range(10).write.parquet(dataDir)
     // Create a Hive external Parquet table
-    sql(s"CREATE EXTERNAL TABLE hive_ints(key int) STORED AS PARQUET LOCATION '$dataDir'")
+    sql(s"CREATE EXTERNAL TABLE hive_bigints(id bigint) STORED AS PARQUET LOCATION '$dataDir'")
     // The Hive external table should already have data
-    sql("SELECT * FROM hive_ints").show()
+    sql("SELECT * FROM hive_bigints").show()
     // +---+
-    // |key|
+    // | id|
     // +---+
     // | 0|
     // | 1|
     // | 2|
-    // ...
+    // ... Order may vary, as spark processes the partitions in parallel.
     // Turn on flag for Hive Dynamic Partitioning
     spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
[spark] branch branch-2.3 updated: [MINOR][DOCS] Fix Spark hive example.
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-2.3 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-2.3 by this push: new 533d603 [MINOR][DOCS] Fix Spark hive example. 533d603 is described below commit 533d603cebd2fe5197f4b4d40e1f54fa94c74f36 Author: Prashant Sharma AuthorDate: Tue May 21 18:23:38 2019 +0900 [MINOR][DOCS] Fix Spark hive example.

## What changes were proposed in this pull request?

Documentation has an error, https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html#hive-tables. The example:

```scala
scala> val dataDir = "/tmp/parquet_data"
dataDir: String = /tmp/parquet_data

scala> spark.range(10).write.parquet(dataDir)

scala> sql(s"CREATE EXTERNAL TABLE hive_ints(key int) STORED AS PARQUET LOCATION '$dataDir'")
res6: org.apache.spark.sql.DataFrame = []

scala> sql("SELECT * FROM hive_ints").show()
+----+
| key|
+----+
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
+----+
```

Range does not emit `key`, but `id` instead.

Closes #24657 from ScrapCodes/fix_hive_example.

Lead-authored-by: Prashant Sharma
Co-authored-by: Prashant Sharma
Signed-off-by: HyukjinKwon
(cherry picked from commit 5f4b50513cd34cd3dcf7f72972bfcd1f51031723)
Signed-off-by: HyukjinKwon
---
 .../org/apache/spark/examples/sql/hive/SparkHiveExample.scala | 8
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala b/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala
index 70fb5b2..a832276 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala
@@ -122,16 +122,16 @@ object SparkHiveExample {
     val dataDir = "/tmp/parquet_data"
     spark.range(10).write.parquet(dataDir)
     // Create a Hive external Parquet table
-    sql(s"CREATE EXTERNAL TABLE hive_ints(key int) STORED AS PARQUET LOCATION '$dataDir'")
+    sql(s"CREATE EXTERNAL TABLE hive_bigints(id bigint) STORED AS PARQUET LOCATION '$dataDir'")
     // The Hive external table should already have data
-    sql("SELECT * FROM hive_ints").show()
+    sql("SELECT * FROM hive_bigints").show()
     // +---+
-    // |key|
+    // | id|
     // +---+
     // | 0|
     // | 1|
     // | 2|
-    // ...
+    // ... Order may vary, as spark processes the partitions in parallel.
     // Turn on flag for Hive Dynamic Partitioning
     spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
[spark] branch master updated: [MINOR][DOCS] Fix Spark hive example.
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 5f4b505 [MINOR][DOCS] Fix Spark hive example. 5f4b505 is described below commit 5f4b50513cd34cd3dcf7f72972bfcd1f51031723 Author: Prashant Sharma AuthorDate: Tue May 21 18:23:38 2019 +0900 [MINOR][DOCS] Fix Spark hive example. ## What changes were proposed in this pull request? Documentation has an error, https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html#hive-tables. The example: ```scala scala> val dataDir = "/tmp/parquet_data" dataDir: String = /tmp/parquet_data scala> spark.range(10).write.parquet(dataDir) scala> sql(s"CREATE EXTERNAL TABLE hive_ints(key int) STORED AS PARQUET LOCATION '$dataDir'") res6: org.apache.spark.sql.DataFrame = [] scala> sql("SELECT * FROM hive_ints").show() ++ | key| ++ |null| |null| |null| |null| |null| |null| |null| |null| |null| |null| ++ ``` Range does not emit `key`, but `id` instead. Closes #24657 from ScrapCodes/fix_hive_example. 
Lead-authored-by: Prashant Sharma Co-authored-by: Prashant Sharma Signed-off-by: HyukjinKwon --- .../org/apache/spark/examples/sql/hive/SparkHiveExample.scala | 8 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala b/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala index 70fb5b2..a832276 100644 --- a/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala @@ -122,16 +122,16 @@ object SparkHiveExample { val dataDir = "/tmp/parquet_data" spark.range(10).write.parquet(dataDir) // Create a Hive external Parquet table -sql(s"CREATE EXTERNAL TABLE hive_ints(key int) STORED AS PARQUET LOCATION '$dataDir'") +sql(s"CREATE EXTERNAL TABLE hive_bigints(id bigint) STORED AS PARQUET LOCATION '$dataDir'") // The Hive external table should already have data -sql("SELECT * FROM hive_ints").show() +sql("SELECT * FROM hive_bigints").show() // +---+ -// |key| +// | id| // +---+ // | 0| // | 1| // | 2| -// ... +// ... Order may vary, as spark processes the partitions in parallel. // Turn on flag for Hive Dynamic Partitioning spark.sqlContext.setConf("hive.exec.dynamic.partition", "true") - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
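The null-column behaviour this commit fixes in the docs can be illustrated without Spark at all: `spark.range(10)` writes a Parquet file whose only column is `id` (bigint), while the old example declared a table column `key`, and Parquet readers match columns by name. The sketch below is a hypothetical pure-Python model of that name-based resolution (`parquet_rows` and `read_column` are illustrative names, not Spark APIs):

```python
# spark.range(10) emits a single column named `id`; model the written
# Parquet file as one dict per row.
parquet_rows = [{"id": i} for i in range(10)]

def read_column(rows, column):
    # Name-based lookup, as a Parquet reader performs it: a column name
    # absent from the file yields null (None) for every row.
    return [row.get(column) for row in rows]

print(read_column(parquet_rows, "key"))  # all None: the buggy doc example
print(read_column(parquet_rows, "id"))   # 0..9: the corrected example
```

This is why the original `hive_ints(key int)` table showed ten nulls, and why the fix renames both the table column and the example table.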
[spark] branch master updated (604aa1b -> 20fb01b)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 604aa1b [SPARK-27786][SQL] Fix Sha1, Md5, and Base64 codegen when commons-codec is shaded add 20fb01b [MINOR][PYTHON] Remove explain(True) in test_udf.py No new revisions were added by this update. Summary of changes: python/pyspark/sql/tests/test_udf.py | 1 - 1 file changed, 1 deletion(-)
[spark] branch master updated: [SPARK-27591][SQL] Fix UnivocityParser for UserDefinedType
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new a35043c [SPARK-27591][SQL] Fix UnivocityParser for UserDefinedType a35043c is described below commit a35043c9e22a9bd9e372246c8d337e016736536c Author: Artem Kalchenko AuthorDate: Wed May 1 08:27:51 2019 +0900 [SPARK-27591][SQL] Fix UnivocityParser for UserDefinedType ## What changes were proposed in this pull request? Fixes a bug in UnivocityParser: the makeConverter method did not work correctly for UserDefinedType. ## How was this patch tested? A test suite for UnivocityParser has been extended. Closes #24496 from kalkolab/spark-27591. Authored-by: Artem Kalchenko Signed-off-by: HyukjinKwon --- .../spark/sql/catalyst/csv/UnivocityParser.scala | 2 +- .../sql/catalyst/csv/UnivocityParserSuite.scala| 35 ++ 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index b26044e..8456b7d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -166,7 +166,7 @@ class UnivocityParser( case _: StringType => (d: String) => nullSafeDatum(d, name, nullable, options)(UTF8String.fromString) -case udt: UserDefinedType[_] => (datum: String) => +case udt: UserDefinedType[_] => makeConverter(name, udt.sqlType, nullable) // We don't actually hit this exception though, we keep it for understandability diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala index 986de12..933c576 100644 ---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala @@ -231,4 +231,39 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper { Seq("en-US", "ko-KR", "ru-RU", "de-DE").foreach(checkDecimalParsing) } + + test("SPARK-27591 UserDefinedType can be read") { + +@SQLUserDefinedType(udt = classOf[StringBasedUDT]) +case class NameId(name: String, id: Int) + +class StringBasedUDT extends UserDefinedType[NameId] { + override def sqlType: DataType = StringType + + override def serialize(obj: NameId): Any = s"${obj.name}\t${obj.id}" + + override def deserialize(datum: Any): NameId = datum match { +case s: String => + val split = s.split("\t") + if (split.length != 2) throw new RuntimeException(s"Can't parse $s into NameId"); + NameId(split(0), Integer.parseInt(split(1))) +case _ => throw new RuntimeException(s"Can't parse $datum into NameId"); + } + + override def userClass: Class[NameId] = classOf[NameId] +} + +object StringBasedUDT extends StringBasedUDT + +val input = "name\t42" +val expected = UTF8String.fromString(input) + +val options = new CSVOptions(Map.empty[String, String], false, "GMT") +val parser = new UnivocityParser(StructType(Seq.empty), options) + +val convertedValue = parser.makeConverter("_1", StringBasedUDT, nullable = false).apply(input) + +assert(convertedValue.isInstanceOf[UTF8String]) +assert(convertedValue == expected) + } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
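The one-line fix above, delegating a UDT to the converter of its underlying `sqlType` instead of wrapping it in an extra per-datum closure, can be sketched in plain Python. Everything here is a hypothetical analogue for illustration, not PySpark API:

```python
class UserDefinedType:
    """Stand-in for Spark's UserDefinedType: wraps an underlying SQL type."""
    def __init__(self, sql_type):
        self.sql_type = sql_type  # e.g. "string", as in the StringBasedUDT test

def make_converter(data_type):
    # Analogue of UnivocityParser.makeConverter: build a datum -> value function.
    if data_type == "string":
        return lambda datum: datum  # stand-in for UTF8String.fromString
    if isinstance(data_type, UserDefinedType):
        # The fix: recurse on the UDT's underlying SQL type, so a UDT
        # backed by StringType parses exactly like a plain string column.
        return make_converter(data_type.sql_type)
    raise TypeError(f"unsupported type: {data_type!r}")

convert = make_converter(UserDefinedType("string"))
print(convert("name\t42"))  # converted through the sqlType ("string") path
```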
[spark] branch master updated: [SPARK-26921][R][DOCS] Document Arrow optimization and vectorized R APIs
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 3670826 [SPARK-26921][R][DOCS] Document Arrow optimization and vectorized R APIs 3670826 is described below commit 3670826af6f40bf8cd6c6c850515d6c2f0a83519 Author: HyukjinKwon AuthorDate: Thu May 2 10:02:14 2019 +0900 [SPARK-26921][R][DOCS] Document Arrow optimization and vectorized R APIs ## What changes were proposed in this pull request? This PR adds documentation for Arrow optimization in SparkR. Note that the CRAN issue on the Arrow side does not look likely to be fixed soon, IMHO, even after Spark 3.0. If it happens to be fixed, I will update this doc later. Another note is that the Arrow R package itself requires R 3.5+, so I intentionally did not note this. ## How was this patch tested? Manually built and checked. Closes #24506 from HyukjinKwon/SPARK-26924. Authored-by: HyukjinKwon Signed-off-by: HyukjinKwon --- docs/sparkr.md | 59 ++ .../org/apache/spark/sql/internal/SQLConf.scala| 18 +-- 2 files changed, 72 insertions(+), 5 deletions(-) diff --git a/docs/sparkr.md b/docs/sparkr.md index 26eeae6..d6b5179 100644 --- a/docs/sparkr.md +++ b/docs/sparkr.md @@ -657,6 +657,65 @@ The following example shows how to save/load a MLlib model by SparkR. SparkR supports the Structured Streaming API. Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. For more information see the R API on the [Structured Streaming Programming Guide](structured-streaming-programming-guide.html) +# Apache Arrow in SparkR + +Apache Arrow is an in-memory columnar data format that is used in Spark to efficiently transfer data between JVM and R processes. See also the corresponding PySpark optimization, [PySpark Usage Guide for Pandas with Apache Arrow](sql-pyspark-pandas-with-arrow.html).
This guide explains how to use Arrow optimization in SparkR, with some key points. + +## Ensure Arrow Installed + +Currently, Arrow R library is not on CRAN yet [ARROW-3204](https://issues.apache.org/jira/browse/ARROW-3204). Therefore, it should be installed directly from Github. You can use `remotes::install_github` as below. + +```bash +Rscript -e 'remotes::install_github("apache/arrow@TAG", subdir = "r")' +``` + +`TAG` is a version tag that can be checked in [Arrow at Github](https://github.com/apache/arrow/releases). You must ensure that the Arrow R package is installed and available on all cluster nodes. The current supported version is 0.12.1. + +## Enabling for Conversion to/from R DataFrame, `dapply` and `gapply` + +Arrow optimization is available when creating a Spark DataFrame from an R DataFrame using the call `createDataFrame(r_df)`, +when collecting a Spark DataFrame to an R DataFrame with `collect(spark_df)`, when applying an R native function to each partition +via `dapply(...)` and when applying an R native function to grouped data via `gapply(...)`. +To use Arrow when executing these calls, users need to first set the Spark configuration ‘spark.sql.execution.arrow.enabled’ +to ‘true’. This is disabled by default. + +In addition, optimizations enabled by ‘spark.sql.execution.arrow.enabled’ can fall back automatically to the non-Arrow +implementation if an error occurs before the actual computation within Spark while converting a Spark DataFrame to/from an R +DataFrame. + + +{% highlight r %} +# Start up spark session with Arrow optimization enabled +sparkR.session(master = "local[*]", + sparkConfig = list(spark.sql.execution.arrow.enabled = "true")) + +# Creates a Spark DataFrame from an R DataFrame +spark_df <- createDataFrame(mtcars) + +# Collects a Spark DataFrame to an R DataFrame +collect(spark_df) + +# Apply an R native function to each partition.
+collect(dapply(spark_df, function(rdf) { data.frame(rdf$gear + 1) }, structType("gear double"))) + +# Apply an R native function to grouped data. +collect(gapply(spark_df, + "gear", + function(key, group) { + data.frame(gear = key[[1]], disp = mean(group$disp) > group$disp) + }, + structType("gear double, disp boolean"))) +{% endhighlight %} + + +Using the above optimizations with Arrow will produce the same results as when Arrow is not enabled. Note that even with Arrow, +`collect(spark_df)` results in the collection of all records in the DataFrame to the driver program and should be done on a +small subset of the data. + +## Supported SQL Types + +Currently, all Spark SQL data types are supported by Arrow-based conversion except `FloatType`,
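The automatic fallback described in the added guide (optimizations "fall back automatically to the non-Arrow implementation if an error occurs before the actual computation") follows a simple try-then-fallback shape. A minimal language-agnostic sketch in Python, assuming hypothetical `arrow_collect`/`plain_collect` helpers (not Spark APIs):

```python
def arrow_collect(df):
    # Simulate the optimized path failing before computation starts,
    # e.g. because the Arrow R package is not installed on a node.
    raise RuntimeError("Arrow unavailable")

def plain_collect(df):
    return list(df)  # stand-in for the ordinary, non-Arrow conversion

def collect(df, arrow_enabled=True):
    """If the Arrow path errors out before computation, silently fall
    back; per the guide, the result is the same either way."""
    if arrow_enabled:
        try:
            return arrow_collect(df)
        except Exception:
            pass  # fall through to the non-Arrow implementation
    return plain_collect(df)

print(collect(range(3)))  # [0, 1, 2], produced via the fallback path
```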
[spark] branch master updated: [SPARK-23619][DOCS] Add output description for some generator expressions / functions
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 90085a1 [SPARK-23619][DOCS] Add output description for some generator expressions / functions 90085a1 is described below commit 90085a184797f8bddbff8ca6ec7a60f3899c1a86 Author: Jash Gala AuthorDate: Sat Apr 27 10:30:12 2019 +0900 [SPARK-23619][DOCS] Add output description for some generator expressions / functions ## What changes were proposed in this pull request? This PR addresses SPARK-23619: https://issues.apache.org/jira/browse/SPARK-23619 It adds additional comments indicating the default column names for the `explode` and `posexplode` functions in Spark SQL. Functions for which comments have been updated so far: * stack * inline * explode * posexplode * explode_outer * posexplode_outer ## How was this patch tested? This is just a change in the comments. The package builds and tests successfully after the change. Closes #23748 from jashgala/SPARK-23619. Authored-by: Jash Gala Signed-off-by: HyukjinKwon --- R/pkg/R/functions.R | 12 ++-- python/pyspark/sql/functions.py | 20 .../spark/sql/catalyst/expressions/generators.scala | 12 .../main/scala/org/apache/spark/sql/functions.scala | 8 4 files changed, 42 insertions(+), 10 deletions(-) diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index 0566a47..3bd1f54 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -3589,6 +3589,8 @@ setMethod("element_at", #' @details #' \code{explode}: Creates a new row for each element in the given array or map column. +#' Uses the default column name \code{col} for elements in the array and +#' \code{key} and \code{value} for elements in the map unless specified otherwise.
#' #' @rdname column_collection_functions #' @aliases explode explode,Column-method @@ -3649,7 +3651,9 @@ setMethod("sort_array", #' @details #' \code{posexplode}: Creates a new row for each element with position in the given array -#' or map column. +#' or map column. Uses the default column name \code{pos} for position, and \code{col} +#' for elements in the array and \code{key} and \code{value} for elements in the map +#' unless specified otherwise. #' #' @rdname column_collection_functions #' @aliases posexplode posexplode,Column-method @@ -3790,7 +3794,8 @@ setMethod("repeat_string", #' \code{explode}: Creates a new row for each element in the given array or map column. #' Unlike \code{explode}, if the array/map is \code{null} or empty #' then \code{null} is produced. -#' +#' Uses the default column name \code{col} for elements in the array and +#' \code{key} and \code{value} for elements in the map unless specified otherwise. #' #' @rdname column_collection_functions #' @aliases explode_outer explode_outer,Column-method @@ -3815,6 +3820,9 @@ setMethod("explode_outer", #' \code{posexplode_outer}: Creates a new row for each element with position in the given #' array or map column. Unlike \code{posexplode}, if the array/map is \code{null} or empty #' then the row (\code{null}, \code{null}) is produced. +#' Uses the default column name \code{pos} for position, and \code{col} +#' for elements in the array and \code{key} and \code{value} for elements in the map +#' unless specified otherwise. #' #' @rdname column_collection_functions #' @aliases posexplode_outer posexplode_outer,Column-method diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 22163f5..613822b 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -2142,7 +2142,10 @@ def array_except(col1, col2): @since(1.4) def explode(col): -"""Returns a new row for each element in the given array or map. 
+""" +Returns a new row for each element in the given array or map. +Uses the default column name `col` for elements in the array and +`key` and `value` for elements in the map unless specified otherwise. >>> from pyspark.sql import Row >>> eDF = spark.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})]) @@ -2163,7 +2166,10 @@ def explode(col): @since(2.1) def posexplode(col): -"""Returns a new row for each element with position in the given array or map. +""" +Returns a new row for each element with position in the given array or map. +Uses the default column name `pos` for position, and `col` for elements in the +array and `key` and `value` for elements in the map unless specified otherwise.
[spark] branch master updated: [SPARK-25139][SPARK-18406][CORE] Avoid NonFatals to kill the Executor in PythonRunner
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new e63fbfc [SPARK-25139][SPARK-18406][CORE] Avoid NonFatals to kill the Executor in PythonRunner e63fbfc is described below commit e63fbfcf206b8c98396668736fd880fb6787a4f2 Author: Xingbo Jiang AuthorDate: Wed May 8 13:47:26 2019 +0900 [SPARK-25139][SPARK-18406][CORE] Avoid NonFatals to kill the Executor in PythonRunner ## What changes were proposed in this pull request? Python uses a prefetch approach to read results from upstream and serve them in another thread, so if the child operator does not consume all the data, task cleanup may happen before the Python-side read process finishes. This creates a race condition: the block read locks are freed during task cleanup, and when the reader then tries to release the read lock it holds, it finds the lock already released and hits an AssertionError. We catch the AssertionError in PythonRunner and prevent it from killing the Executor. ## How was this patch tested? Hard to write a unit test for this case; manually verified with a failed job. Closes #24542 from jiangxb1987/pyError.
Authored-by: Xingbo Jiang Signed-off-by: HyukjinKwon --- .../org/apache/spark/api/python/PythonRDD.scala| 2 +- .../org/apache/spark/api/python/PythonRunner.scala | 34 -- 2 files changed, 20 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index bc84382..fe25c3a 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -86,7 +86,7 @@ private[spark] case class PythonFunction( private[spark] case class ChainedPythonFunctions(funcs: Seq[PythonFunction]) /** Thrown for exceptions in user Python code. */ -private[spark] class PythonException(msg: String, cause: Exception) +private[spark] class PythonException(msg: String, cause: Throwable) extends RuntimeException(msg, cause) /** diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala index fabd6fd..dca8704 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala @@ -24,6 +24,7 @@ import java.nio.charset.StandardCharsets.UTF_8 import java.util.concurrent.atomic.AtomicBoolean import scala.collection.JavaConverters._ +import scala.util.control.NonFatal import org.apache.spark._ import org.apache.spark.internal.Logging @@ -165,15 +166,15 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( context: TaskContext) extends Thread(s"stdout writer for $pythonExec") { -@volatile private var _exception: Exception = null +@volatile private var _exception: Throwable = null private val pythonIncludes = funcs.flatMap(_.funcs.flatMap(_.pythonIncludes.asScala)).toSet private val broadcastVars = funcs.flatMap(_.funcs.flatMap(_.broadcastVars.asScala)) setDaemon(true) -/** Contains the exception thrown while writing the parent iterator 
to the Python process. */ -def exception: Option[Exception] = Option(_exception) +/** Contains the throwable thrown while writing the parent iterator to the Python process. */ +def exception: Option[Throwable] = Option(_exception) /** Terminates the writer thread, ignoring any exceptions that may occur due to cleanup. */ def shutdownOnTaskCompletion() { @@ -347,18 +348,21 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( dataOut.writeInt(SpecialLengths.END_OF_STREAM) dataOut.flush() } catch { -case e: Exception if context.isCompleted || context.isInterrupted => - logDebug("Exception thrown after task completion (likely due to cleanup)", e) - if (!worker.isClosed) { -Utils.tryLog(worker.shutdownOutput()) - } - -case e: Exception => - // We must avoid throwing exceptions here, because the thread uncaught exception handler - // will kill the whole executor (see org.apache.spark.executor.Executor). - _exception = e - if (!worker.isClosed) { -Utils.tryLog(worker.shutdownOutput()) +case t: Throwable if (NonFatal(t) || t.isInstanceOf[Exception]) => + if (context.isCompleted || context.isInterrupted) { +logDebug("Exception/NonFatal Error thrown after task completion (likely due to " + + "cle
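The pattern in this patch, a daemon writer thread that records any throwable instead of letting it reach the thread's uncaught-exception handler (which would kill the whole executor), can be sketched with Python's threading module. All names here are illustrative analogues of the Scala code, not Spark APIs:

```python
import threading

class WriterThread(threading.Thread):
    """Analogue of BasePythonRunner's stdout writer thread."""
    def __init__(self, write):
        super().__init__(daemon=True)
        self._write = write
        self.exception = None  # analogue of the @volatile _exception field

    def run(self):
        try:
            self._write()
        except BaseException as e:
            # Catch broadly, as the fix widens Exception to NonFatal
            # throwables: store the error for the main thread to inspect
            # rather than propagating and killing the process.
            self.exception = e

def failing_write():
    raise AssertionError("block read lock already released")

t = WriterThread(failing_write)
t.start()
t.join()
print(type(t.exception).__name__)  # AssertionError, captured rather than fatal
```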
[spark] branch master updated: [SPARK-25079][PYTHON] update python3 executable to 3.6.x
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new e1ece6a [SPARK-25079][PYTHON] update python3 executable to 3.6.x e1ece6a is described below commit e1ece6a3192f2637c47de99ec8958e909c507824 Author: shane knapp AuthorDate: Fri Apr 19 10:03:50 2019 +0900 [SPARK-25079][PYTHON] update python3 executable to 3.6.x ## What changes were proposed in this pull request? have jenkins test against python3.6 (instead of 3.4). ## How was this patch tested? extensive testing on both the centos and ubuntu jenkins workers. NOTE: this will need to be backported to all active branches. Closes #24266 from shaneknapp/updating-python3-executable. Authored-by: shane knapp Signed-off-by: HyukjinKwon --- dev/run-tests.py| 4 +++- python/run-tests.py | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/dev/run-tests.py b/dev/run-tests.py index dfad299..70dcb4a 100755 --- a/dev/run-tests.py +++ b/dev/run-tests.py @@ -546,7 +546,9 @@ def main(): hadoop_version = os.environ.get("AMPLAB_JENKINS_BUILD_PROFILE", "hadoop2.7") test_env = "amplab_jenkins" # add path for Python3 in Jenkins if we're calling from a Jenkins machine -os.environ["PATH"] = "/home/anaconda/envs/py3k/bin:" + os.environ.get("PATH") +# TODO(sknapp): after all builds are ported to the ubuntu workers, change this to be: +# /home/jenkins/anaconda2/envs/py36/bin +os.environ["PATH"] = "/home/anaconda/envs/py36/bin:" + os.environ.get("PATH") else: # else we're running locally and can use local settings build_tool = "sbt" diff --git a/python/run-tests.py b/python/run-tests.py index 793035f..027069d 100755 --- a/python/run-tests.py +++ b/python/run-tests.py @@ -160,7 +160,7 @@ def run_individual_python_test(target_dir, test_name, pyspark_python): def get_default_python_executables(): -python_execs = [x for x in ["python2.7", 
"python3.4", "pypy"] if which(x)] +python_execs = [x for x in ["python2.7", "python3.6", "pypy"] if which(x)] if "python2.7" not in python_execs: LOGGER.warning("Not testing against `python2.7` because it could not be found; falling" " back to `python` instead") - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-27501][SQL][TEST] Add test for HIVE-13083: Writing HiveDecimal to ORC can wrongly suppress present stream
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 8f82237 [SPARK-27501][SQL][TEST] Add test for HIVE-13083: Writing HiveDecimal to ORC can wrongly suppress present stream 8f82237 is described below commit 8f82237a5b1865fc78778231c310bc7f3d66cf42 Author: Yuming Wang AuthorDate: Fri Apr 19 10:12:21 2019 +0900 [SPARK-27501][SQL][TEST] Add test for HIVE-13083: Writing HiveDecimal to ORC can wrongly suppress present stream ## What changes were proposed in this pull request? This PR adds a test for [HIVE-13083](https://issues.apache.org/jira/browse/HIVE-13083): Writing HiveDecimal to ORC can wrongly suppress present stream. ## How was this patch tested? Manual tests: ``` build/sbt "hive/testOnly *HiveOrcQuerySuite" -Phive -Phadoop-3.2 ``` Closes #24397 from wangyum/SPARK-26437. Authored-by: Yuming Wang Signed-off-by: HyukjinKwon --- .../org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala | 10 ++ 1 file changed, 10 insertions(+) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala index 597b0f5..94f35b0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala @@ -218,4 +218,14 @@ class HiveOrcQuerySuite extends OrcQueryTest with TestHiveSingleton { } } } + + test("SPARK-26437 Can not query decimal type when value is 0") { +assume(HiveUtils.isHive23, "bad test: This bug fixed by HIVE-13083(Hive 2.0.1)") +withSQLConf(HiveUtils.CONVERT_METASTORE_ORC.key -> "false") { + withTable("spark_26437") { +sql("CREATE TABLE spark_26437 STORED AS ORCFILE AS SELECT 0.00 AS c1") +checkAnswer(spark.table("spark_26437"), Seq(Row(0.00))) + } +} + } }
[spark] branch master updated (b73744a -> 253a879)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from b73744a [SPARK-27611][BUILD] Exclude jakarta.activation:jakarta.activation-api from org.glassfish.jaxb:jaxb-runtime:2.3.2 add 253a879 [SPARK-26921][R][DOCS][FOLLOWUP] Document Arrow optimization and vectorized R APIs No new revisions were added by this update. Summary of changes: docs/sparkr.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-)
[spark] branch master updated: [SPARK-27606][SQL] Deprecate 'extended' field in ExpressionDescription/ExpressionInfo
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new df8aa7b [SPARK-27606][SQL] Deprecate 'extended' field in ExpressionDescription/ExpressionInfo df8aa7b is described below commit df8aa7ba8a99cbc172f09155cbf98157fbfdf323 Author: HyukjinKwon AuthorDate: Thu May 2 21:10:00 2019 +0900 [SPARK-27606][SQL] Deprecate 'extended' field in ExpressionDescription/ExpressionInfo ## What changes were proposed in this pull request? After we added other fields, `arguments`, `examples`, `note` and `since` at SPARK-21485 and `deprecated` at SPARK-27328, we have a nicer way to describe extended usage separately. The `extended` field and method in `ExpressionDescription`/`ExpressionInfo` are now pretty useless - not used on the Spark side and existing only to keep backward compatibility. This PR proposes to deprecate them. ## How was this patch tested? Manually checked that the deprecation warning is properly shown. Closes #24500 from HyukjinKwon/SPARK-27606.
Authored-by: HyukjinKwon Signed-off-by: HyukjinKwon --- .../spark/sql/catalyst/expressions/ExpressionDescription.java | 6 ++ .../org/apache/spark/sql/catalyst/expressions/ExpressionInfo.java | 6 +- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/ExpressionDescription.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/ExpressionDescription.java index 6a64cb1..acdf6af 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/ExpressionDescription.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/ExpressionDescription.java @@ -88,6 +88,12 @@ import java.lang.annotation.RetentionPolicy; @Retention(RetentionPolicy.RUNTIME) public @interface ExpressionDescription { String usage() default ""; +/** + * @deprecated This field is deprecated as of Spark 3.0. Use {@link #arguments}, + * {@link #examples}, {@link #note}, {@link #since} and {@link #deprecated} instead + * to document the extended usage. + */ +@Deprecated String extended() default ""; String arguments() default ""; String examples() default ""; diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/ExpressionInfo.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/ExpressionInfo.java index 0c69942..0d1f6c2 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/ExpressionInfo.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/ExpressionInfo.java @@ -140,7 +140,11 @@ public class ExpressionInfo { this(className, db, name, null, "", "", "", "", ""); } -// This is to keep the original constructor just in case. +/** + * @deprecated This constructor is deprecated as of Spark 3.0. Use other constructors to fully + * specify each argument for extended usage. 
+ */ +@Deprecated public ExpressionInfo(String className, String db, String name, String usage, String extended) { // `arguments` and `examples` are concatenated for the extended description. So, here // simply pass the `extended` as `arguments` and an empty string for `examples`.
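A hypothetical Python analogue of this deprecation strategy, keeping the old `extended` parameter working for backward compatibility while warning and mapping it onto the newer fields, might look like the following (class and field names mirror the Java code but are purely illustrative):

```python
import warnings

class ExpressionInfo:
    def __init__(self, usage="", extended=""):
        if extended:
            # Mirror the @Deprecated annotation: old callers still work,
            # but are steered toward the structured replacement fields.
            warnings.warn(
                "extended is deprecated; use arguments/examples/note/since",
                DeprecationWarning,
                stacklevel=2,
            )
        self.usage = usage
        # As in the Java constructor, the legacy `extended` text is simply
        # carried in `arguments`, with `examples` left empty.
        self.arguments = extended
        self.examples = ""
```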
[spark] branch master updated: [SPARK-27555][SQL] HiveSerDe should fall back to hadoopconf if hive.default.fileformat is not found in SQLConf
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new c66ec43 [SPARK-27555][SQL] HiveSerDe should fall back to hadoopconf if hive.default.fileformat is not found in SQLConf c66ec43 is described below commit c66ec439456c5a160e3849e23c2ce3970d4c6ec7 Author: sandeep katta AuthorDate: Sat May 4 09:02:12 2019 +0900 [SPARK-27555][SQL] HiveSerDe should fall back to hadoopconf if hive.default.fileformat is not found in SQLConf ## What changes were proposed in this pull request? SQLConf does not load hive-site.xml, so HiveSerDe should fall back to the Hadoop configuration if hive.default.fileformat is not found in SQLConf. ## How was this patch tested? Tested manually; added a unit test. Closes #24489 from sandeep-katta/spark-27555. Authored-by: sandeep katta Signed-off-by: HyukjinKwon --- docs/sql-migration-guide-upgrade.md| 2 ++ .../org/apache/spark/sql/internal/HiveSerDe.scala | 12 +++- .../spark/sql/hive/execution/HiveSerDeSuite.scala | 22 +- 3 files changed, 34 insertions(+), 2 deletions(-) diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md index 54512ae..5fe7c7c 100644 --- a/docs/sql-migration-guide-upgrade.md +++ b/docs/sql-migration-guide-upgrade.md @@ -126,6 +126,8 @@ license: | - Since Spark 3.0, parquet logical type `TIMESTAMP_MICROS` is used by default while saving `TIMESTAMP` columns. In Spark version 2.4 and earlier, `TIMESTAMP` columns are saved as `INT96` in parquet files. To set `INT96` to `spark.sql.parquet.outputTimestampType` restores the previous behavior. + - Since Spark 3.0, if `hive.default.fileformat` is not found in `Spark SQL configuration` then it will fallback to hive-site.xml present in the `Hadoop configuration` of `SparkContext`.
+ ## Upgrading from Spark SQL 2.4 to 2.4.1 - The value of `spark.executor.heartbeatInterval`, when specified without units like "30" rather than "30s", was diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala index bd25a64..4921e3c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.internal import java.util.Locale +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat case class HiveSerDe( @@ -88,7 +89,16 @@ object HiveSerDe { } def getDefaultStorage(conf: SQLConf): CatalogStorageFormat = { -val defaultStorageType = conf.getConfString("hive.default.fileformat", "textfile") +// To respect hive-site.xml, it peeks Hadoop configuration from existing Spark session, +// as an easy workaround. See SPARK-27555. 
+val defaultFormatKey = "hive.default.fileformat" +val defaultValue = { + val defaultFormatValue = "textfile" + SparkSession.getActiveSession.map { session => +session.sessionState.newHadoopConf().get(defaultFormatKey, defaultFormatValue) + }.getOrElse(defaultFormatValue) +} +val defaultStorageType = conf.getConfString("hive.default.fileformat", defaultValue) val defaultHiveSerde = sourceToSerDe(defaultStorageType) CatalogStorageFormat.empty.copy( inputFormat = defaultHiveSerde.flatMap(_.inputFormat) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeSuite.scala index d7752e9..ed4304b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeSuite.scala @@ -21,13 +21,14 @@ import java.net.URI import org.scalatest.BeforeAndAfterAll -import org.apache.spark.sql.{AnalysisException, SaveMode} +import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession} import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils} import org.apache.spark.sql.execution.datasources.CreateTable import org.apache.spark.sql.execution.metric.InputOutputMetricsHelper import org.apache.spark.sql.hive.test.TestHive +import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} import org.apache.spark.sql.types.StructType /** @@ -210,4 +211,23 @@ class HiveSerDeSuite extends HiveComparisonTest with PlanTest with BeforeAndAfte val e8 = intercept[IllegalArgumentException](analyzeCreate
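The lookup order the patch introduces can be sketched outside Spark. This is a minimal illustration of the fallback logic only, not Spark's actual API: plain dicts stand in for `SQLConf` and the Hadoop `Configuration` (which is what actually loads hive-site.xml).

```python
def default_storage_format(sql_conf, hadoop_conf):
    """Resolve hive.default.fileformat with the post-SPARK-27555 lookup order.

    SQLConf wins if it holds the key; otherwise the Hadoop configuration
    (which loads hive-site.xml) is consulted; "textfile" is the last resort.
    """
    key = "hive.default.fileformat"
    hadoop_default = hadoop_conf.get(key, "textfile")
    return sql_conf.get(key, hadoop_default)

# Neither conf has the key: fall back to "textfile".
print(default_storage_format({}, {}))                                  # textfile
# Only hive-site.xml (surfaced through the Hadoop conf) sets it.
print(default_storage_format({}, {"hive.default.fileformat": "orc"}))  # orc
# An explicit SQLConf value always takes precedence.
print(default_storage_format({"hive.default.fileformat": "parquet"},
                             {"hive.default.fileformat": "orc"}))      # parquet
```

The ordering mirrors the Scala change: the Hadoop-conf value is computed only as the default handed to `conf.getConfString`, so a SQLConf setting still wins.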
[spark] branch master updated: [MINOR][DOCS] Correct date_trunc docs
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 5182aa2 [MINOR][DOCS] Correct date_trunc docs 5182aa2 is described below commit 5182aa25f017fe7ef3616e8f4459296d13dbb716 Author: Seth Fitzsimmons AuthorDate: Sat May 4 09:13:23 2019 +0900 [MINOR][DOCS] Correct date_trunc docs ## What changes were proposed in this pull request? `date_trunc` argument order was flipped, phrasing was awkward. ## How was this patch tested? Documentation-only. Closes #24522 from mojodna/patch-2. Authored-by: Seth Fitzsimmons Signed-off-by: HyukjinKwon --- sql/core/src/main/scala/org/apache/spark/sql/functions.scala | 12 ++-- 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index f92bf79..1abda54 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -2943,8 +2943,8 @@ object functions { * * @param date A date, timestamp or string. If a string, the data must be in a format that can be * cast to a date, such as `yyyy-MM-dd` or `yyyy-MM-dd HH:mm:ss.SSSS` - * @param format: 'year', 'yyyy', 'yy' for truncate by year, - * or 'month', 'mon', 'mm' for truncate by month + * @param format: 'year', 'yyyy', 'yy' to truncate by year, + * or 'month', 'mon', 'mm' to truncate by month * * @return A date, or null if `date` was a string that could not be cast to a date or `format` * was an invalid value @@ -2958,11 +2958,11 @@ object functions { /** * Returns timestamp truncated to the unit specified by the format. 
* - * For example, `date_tunc("2018-11-19 12:01:19", "year")` returns 2018-01-01 00:00:00 + * For example, `date_trunc("year", "2018-11-19 12:01:19")` returns 2018-01-01 00:00:00 * - * @param format: 'year', 'yyyy', 'yy' for truncate by year, - *'month', 'mon', 'mm' for truncate by month, - *'day', 'dd' for truncate by day, + * @param format: 'year', 'yyyy', 'yy' to truncate by year, + *'month', 'mon', 'mm' to truncate by month, + *'day', 'dd' to truncate by day, *Other options are: 'second', 'minute', 'hour', 'week', 'month', 'quarter' * @param timestamp A date, timestamp or string. If a string, the data must be in a format that * can be cast to a timestamp, such as `yyyy-MM-dd` or `yyyy-MM-dd HH:mm:ss.SSSS` - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
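The corrected argument order (format first, then timestamp) can be mimicked with the standard library. This is a hypothetical stand-in covering just two of the units, not Spark's implementation:

```python
from datetime import datetime

def date_trunc(fmt, ts):
    # Format comes first, as in Spark's date_trunc(format, timestamp).
    if fmt in ("year", "yyyy", "yy"):
        return ts.replace(month=1, day=1, hour=0, minute=0, second=0, microsecond=0)
    if fmt in ("month", "mon", "mm"):
        return ts.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
    raise ValueError("unsupported format: %s" % fmt)

# Matches the corrected docstring example: truncating to the year.
print(date_trunc("year", datetime(2018, 11, 19, 12, 1, 19)))  # 2018-01-01 00:00:00
```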
[spark] branch master updated: [SPARK-27629][PYSPARK] Prevent Unpickler from intervening each unpickling
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new d9bcacf [SPARK-27629][PYSPARK] Prevent Unpickler from intervening each unpickling d9bcacf is described below commit d9bcacf94b93fe76542b5c1fd852559075ef6faa Author: Liang-Chi Hsieh AuthorDate: Sat May 4 13:21:08 2019 +0900 [SPARK-27629][PYSPARK] Prevent Unpickler from intervening each unpickling ## What changes were proposed in this pull request? In SPARK-27612, one correctness issue was reported: when protocol 4 is used to pickle Python objects, we found that unpickled objects were wrong. A temporary fix was proposed by not using the highest protocol. It was found that Opcodes.MEMOIZE appeared among the opcodes in protocol 4, and it is suspected to be the cause of this issue. A deeper dive found that Opcodes.MEMOIZE stores objects into the internal map of the Unpickler object. We use a single Unpickler object to unpickle serialized Python bytes, and the stored objects interfere with the next round of unpickling if the map is not cleared. We have two options: 1. Continue to reuse the Unpickler, but call its close after each unpickling. 2. Do not reuse the Unpickler, and create a new Unpickler object for each unpickling. This patch takes option 1. ## How was this patch tested? Passing the test added in SPARK-27612 (#24519). Closes #24521 from viirya/SPARK-27629. 
Authored-by: Liang-Chi Hsieh Signed-off-by: HyukjinKwon --- core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala | 3 +++ .../main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala | 4 python/pyspark/serializers.py | 3 +-- python/pyspark/sql/tests/test_serde.py| 4 .../org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala | 4 5 files changed, 16 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala index 01e64b6..9462dfd 100644 --- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala +++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala @@ -186,6 +186,9 @@ private[spark] object SerDeUtil extends Logging { val unpickle = new Unpickler iter.flatMap { row => val obj = unpickle.loads(row) +// `Opcodes.MEMOIZE` of Protocol 4 (Python 3.4+) will store objects in internal map +// of `Unpickler`. This map is cleared when calling `Unpickler.close()`. +unpickle.close() if (batched) { obj match { case array: Array[Any] => array.toSeq diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 3e1bbba..322ef93 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -1347,6 +1347,10 @@ private[spark] abstract class SerDeBase { val unpickle = new Unpickler iter.flatMap { row => val obj = unpickle.loads(row) +// `Opcodes.MEMOIZE` of Protocol 4 (Python 3.4+) will store objects in internal map +// of `Unpickler`. This map is cleared when calling `Unpickler.close()`. Pyrolite +// doesn't clear it up, so we manually clear it. 
+unpickle.close() if (batched) { obj match { case list: JArrayList[_] => list.asScala diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py index 5311087..6058e94 100644 --- a/python/pyspark/serializers.py +++ b/python/pyspark/serializers.py @@ -62,12 +62,11 @@ import itertools if sys.version < '3': import cPickle as pickle from itertools import izip as zip, imap as map -pickle_protocol = 2 else: import pickle basestring = unicode = str xrange = range -pickle_protocol = 3 +pickle_protocol = pickle.HIGHEST_PROTOCOL from pyspark import cloudpickle from pyspark.util import _exception_message diff --git a/python/pyspark/sql/tests/test_serde.py b/python/pyspark/sql/tests/test_serde.py index 1c18e93..ed4b9a7 100644 --- a/python/pyspark/sql/tests/test_serde.py +++ b/python/pyspark/sql/tests/test_serde.py @@ -128,6 +128,10 @@ class SerdeTests(ReusedSQLTestCase): def test_int_array_serialization(self): # Note that this test seems dependent on parallelism. +# This issue is because internal object map in Pyrolite is not cleared after op code +# STOP. If we use protocol 4 to pickle Python objects, op code MEMOIZE will store +# objects in the map. We n
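The role of `MEMOIZE` is easy to see with CPython's own `pickletools` (the fix above is in Pyrolite on the JVM, but the opcode stream is the same): protocol 4 emits `MEMOIZE` opcodes that populate the unpickler's internal memo map, while older protocols memoize via `BINPUT` instead.

```python
import pickle
import pickletools

def opcode_names(payload):
    # Collect the names of all pickle opcodes in a serialized payload.
    return {op.name for op, arg, pos in pickletools.genops(payload)}

data = [{"a": 1}, [1, 2, 3]]
proto4 = opcode_names(pickle.dumps(data, protocol=4))
proto2 = opcode_names(pickle.dumps(data, protocol=2))

print("MEMOIZE" in proto4)  # True: protocol 4 stores objects in the memo map
print("MEMOIZE" in proto2)  # False: the MEMOIZE opcode only exists in protocol 4+
```

This is why the memo map fills up only once the higher protocol is used, and why clearing it between rounds (option 1 above) resolves the cross-contamination.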
[spark] branch master updated: [SPARK-27620][BUILD] Upgrade jetty to 9.4.18.v20190429
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 875e7e1 [SPARK-27620][BUILD] Upgrade jetty to 9.4.18.v20190429 875e7e1 is described below commit 875e7e1d972b0fcdc948fe906d0da68efbd497b2 Author: Yuming Wang AuthorDate: Fri May 3 09:25:54 2019 +0900 [SPARK-27620][BUILD] Upgrade jetty to 9.4.18.v20190429 ## What changes were proposed in this pull request? This PR upgrades jetty to [9.4.18.v20190429](https://github.com/eclipse/jetty.project/releases/tag/jetty-9.4.18.v20190429) because of [CVE-2019-10247](https://nvd.nist.gov/vuln/detail/CVE-2019-10247). ## How was this patch tested? Existing tests. Closes #24513 from wangyum/SPARK-27620. Authored-by: Yuming Wang Signed-off-by: HyukjinKwon --- dev/deps/spark-deps-hadoop-3.2 | 4 ++-- pom.xml| 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/dev/deps/spark-deps-hadoop-3.2 b/dev/deps/spark-deps-hadoop-3.2 index fd09e45..643ba2f 100644 --- a/dev/deps/spark-deps-hadoop-3.2 +++ b/dev/deps/spark-deps-hadoop-3.2 @@ -117,8 +117,8 @@ jersey-container-servlet-core-2.22.2.jar jersey-guava-2.22.2.jar jersey-media-jaxb-2.22.2.jar jersey-server-2.22.2.jar -jetty-webapp-9.4.12.v20180830.jar -jetty-xml-9.4.12.v20180830.jar +jetty-webapp-9.4.18.v20190429.jar +jetty-xml-9.4.18.v20190429.jar jline-2.14.6.jar joda-time-2.9.3.jar jodd-core-3.5.2.jar diff --git a/pom.xml b/pom.xml index 62be6d8..f3ba896 100644 --- a/pom.xml +++ b/pom.xml @@ -139,7 +139,7 @@ nohive com.twitter 1.6.0 -9.4.12.v20180830 +9.4.18.v20190429 3.1.0 0.9.3 2.4.0 - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-27612][PYTHON] Use Python's default protocol instead of highest protocol
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 5c47924 [SPARK-27612][PYTHON] Use Python's default protocol instead of highest protocol 5c47924 is described below commit 5c479243de8d88dbe6ac7ab8580826511af74ff5 Author: HyukjinKwon AuthorDate: Fri May 3 14:40:13 2019 +0900 [SPARK-27612][PYTHON] Use Python's default protocol instead of highest protocol ## What changes were proposed in this pull request? This PR partially reverts https://github.com/apache/spark/pull/20691. After we changed the Python pickle protocol to the highest one, it seems to have introduced a correctness bug. This potentially affects all Python related code paths. I suspect a bug related to Pyrolite (maybe opcodes `MEMOIZE`, `FRAME` and/or our `RowPickler`). I would like to stick to the default protocol for now and investigate the issue separately; I will investigate later to bring the highest protocol back. ## How was this patch tested? A unit test was added. ```bash ./run-tests --python-executables=python3.7 --testname "pyspark.sql.tests.test_serde SerdeTests.test_int_array_serialization" ``` Closes #24519 from HyukjinKwon/SPARK-27612. 
Authored-by: HyukjinKwon Signed-off-by: HyukjinKwon --- python/pyspark/serializers.py | 3 ++- python/pyspark/sql/tests/test_serde.py | 6 ++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py index 6058e94..5311087 100644 --- a/python/pyspark/serializers.py +++ b/python/pyspark/serializers.py @@ -62,11 +62,12 @@ import itertools if sys.version < '3': import cPickle as pickle from itertools import izip as zip, imap as map +pickle_protocol = 2 else: import pickle basestring = unicode = str xrange = range -pickle_protocol = pickle.HIGHEST_PROTOCOL +pickle_protocol = 3 from pyspark import cloudpickle from pyspark.util import _exception_message diff --git a/python/pyspark/sql/tests/test_serde.py b/python/pyspark/sql/tests/test_serde.py index 8707f46..1c18e93 100644 --- a/python/pyspark/sql/tests/test_serde.py +++ b/python/pyspark/sql/tests/test_serde.py @@ -126,6 +126,12 @@ class SerdeTests(ReusedSQLTestCase): df = self.spark.createDataFrame(data, schema=schema) df.collect() +def test_int_array_serialization(self): +# Note that this test seems dependent on parallelism. +data = self.spark.sparkContext.parallelize([[1, 2, 3, 4]] * 100, numSlices=12) +df = self.spark.createDataFrame(data, "array") +self.assertEqual(len(list(filter(lambda r: None in r.value, df.collect(, 0) + if __name__ == "__main__": import unittest - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (bde30bc -> d8db7db)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from bde30bc [SPARK-27467][FOLLOW-UP][BUILD] Upgrade Maven to 3.6.1 in AppVeyor and Doc new 447d018 Revert "[SPARK-27467][BUILD][TEST-MAVEN] Upgrade Maven to 3.6.1" new d8db7db Revert "[SPARK-27467][FOLLOW-UP][BUILD] Upgrade Maven to 3.6.1 in AppVeyor and Doc" The 24234 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: dev/appveyor-install-dependencies.ps1 | 2 +- docs/building-spark.md| 2 +- pom.xml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-2.3 updated: [MINOR][DOCS] Fix invalid documentation for StreamingQueryManager Class
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-2.3 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-2.3 by this push: new 564dbf6 [MINOR][DOCS] Fix invalid documentation for StreamingQueryManager Class 564dbf6 is described below commit 564dbf61b9e3febd623a08bec9506505fd337bc3 Author: Asaf Levy AuthorDate: Wed May 8 23:45:05 2019 +0900 [MINOR][DOCS] Fix invalid documentation for StreamingQueryManager Class ## What changes were proposed in this pull request? When following the example for using `spark.streams().awaitAnyTermination()`, valid PySpark code will output the following error: ``` Traceback (most recent call last): File "pyspark_app.py", line 182, in spark.streams().awaitAnyTermination() TypeError: 'StreamingQueryManager' object is not callable ``` Docs URL: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#managing-streaming-queries This changes the documentation line to properly call the method under the StreamingQueryManager class: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.streaming.StreamingQueryManager ## How was this patch tested? After changing the syntax, the error no longer occurs and the PySpark application works. This is a docs-only change. Closes #24547 from asaf400/patch-1. 
Authored-by: Asaf Levy Signed-off-by: HyukjinKwon (cherry picked from commit 09422f5139cc13abaf506453819c2bb91e174ae3) Signed-off-by: HyukjinKwon --- docs/structured-streaming-programming-guide.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 9a83f15..ab229a6 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -2310,11 +2310,11 @@ spark.streams().awaitAnyTermination(); // block until any one of them terminat {% highlight python %} spark = ... # spark session -spark.streams().active # get the list of currently active streaming queries +spark.streams.active # get the list of currently active streaming queries -spark.streams().get(id) # get a query object by its unique id +spark.streams.get(id) # get a query object by its unique id -spark.streams().awaitAnyTermination() # block until any one of them terminates +spark.streams.awaitAnyTermination() # block until any one of them terminates {% endhighlight %} - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [MINOR][DOCS] Fix invalid documentation for StreamingQueryManager Class
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 09422f5 [MINOR][DOCS] Fix invalid documentation for StreamingQueryManager Class 09422f5 is described below commit 09422f5139cc13abaf506453819c2bb91e174ae3 Author: Asaf Levy AuthorDate: Wed May 8 23:45:05 2019 +0900 [MINOR][DOCS] Fix invalid documentation for StreamingQueryManager Class ## What changes were proposed in this pull request? When following the example for using `spark.streams().awaitAnyTermination()`, valid PySpark code will output the following error: ``` Traceback (most recent call last): File "pyspark_app.py", line 182, in spark.streams().awaitAnyTermination() TypeError: 'StreamingQueryManager' object is not callable ``` Docs URL: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#managing-streaming-queries This changes the documentation line to properly call the method under the StreamingQueryManager class: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.streaming.StreamingQueryManager ## How was this patch tested? After changing the syntax, the error no longer occurs and the PySpark application works. This is a docs-only change. Closes #24547 from asaf400/patch-1. Authored-by: Asaf Levy Signed-off-by: HyukjinKwon --- docs/structured-streaming-programming-guide.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 2c4169d..be30f6e 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -2554,11 +2554,11 @@ spark.streams().awaitAnyTermination(); // block until any one of them terminat {% highlight python %} spark = ... 
# spark session -spark.streams().active # get the list of currently active streaming queries +spark.streams.active # get the list of currently active streaming queries -spark.streams().get(id) # get a query object by its unique id +spark.streams.get(id) # get a query object by its unique id -spark.streams().awaitAnyTermination() # block until any one of them terminates +spark.streams.awaitAnyTermination() # block until any one of them terminates {% endhighlight %} - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
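The `TypeError` in the report stems from `spark.streams` being a property in PySpark, not a method: writing `spark.streams()` retrieves the `StreamingQueryManager` and then tries to call it. A tiny stand-alone model (hypothetical classes, not PySpark's real ones) reproduces the failure mode:

```python
class StreamingQueryManager(object):
    def awaitAnyTermination(self):
        return "terminated"

class SparkSession(object):
    @property
    def streams(self):  # a property in PySpark, unlike Scala's streams() method
        return StreamingQueryManager()

spark = SparkSession()

try:
    spark.streams()  # wrong: this calls the manager object itself
except TypeError as e:
    print(e)  # 'StreamingQueryManager' object is not callable

print(spark.streams.awaitAnyTermination())  # correct: plain attribute access
```

This is why the docs fix simply drops the parentheses after `streams` in the Python snippets while leaving the Java/Scala `streams()` calls untouched.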
[spark] branch branch-2.4 updated: [MINOR][DOCS] Fix invalid documentation for StreamingQueryManager Class
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-2.4 by this push: new b15866c [MINOR][DOCS] Fix invalid documentation for StreamingQueryManager Class b15866c is described below commit b15866cec48a3f61927283ba2afdc5616b702de9 Author: Asaf Levy AuthorDate: Wed May 8 23:45:05 2019 +0900 [MINOR][DOCS] Fix invalid documentation for StreamingQueryManager Class ## What changes were proposed in this pull request? When following the example for using `spark.streams().awaitAnyTermination()`, valid PySpark code will output the following error: ``` Traceback (most recent call last): File "pyspark_app.py", line 182, in spark.streams().awaitAnyTermination() TypeError: 'StreamingQueryManager' object is not callable ``` Docs URL: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#managing-streaming-queries This changes the documentation line to properly call the method under the StreamingQueryManager class: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.streaming.StreamingQueryManager ## How was this patch tested? After changing the syntax, the error no longer occurs and the PySpark application works. This is a docs-only change. Closes #24547 from asaf400/patch-1. 
Authored-by: Asaf Levy Signed-off-by: HyukjinKwon (cherry picked from commit 09422f5139cc13abaf506453819c2bb91e174ae3) Signed-off-by: HyukjinKwon --- docs/structured-streaming-programming-guide.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 3d91223..f0971ab 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -2539,11 +2539,11 @@ spark.streams().awaitAnyTermination(); // block until any one of them terminat {% highlight python %} spark = ... # spark session -spark.streams().active # get the list of currently active streaming queries +spark.streams.active # get the list of currently active streaming queries -spark.streams().get(id) # get a query object by its unique id +spark.streams.get(id) # get a query object by its unique id -spark.streams().awaitAnyTermination() # block until any one of them terminates +spark.streams.awaitAnyTermination() # block until any one of them terminates {% endhighlight %} - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-28240][PYTHON] Fix Arrow tests to pass with Python 2.7 and latest PyArrow and Pandas in PySpark
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 5f7aceb [SPARK-28240][PYTHON] Fix Arrow tests to pass with Python 2.7 and latest PyArrow and Pandas in PySpark 5f7aceb is described below commit 5f7aceb9df472709ffcd3b06d1132be1b077291b Author: HyukjinKwon AuthorDate: Wed Jul 3 17:46:31 2019 +0900 [SPARK-28240][PYTHON] Fix Arrow tests to pass with Python 2.7 and latest PyArrow and Pandas in PySpark ## What changes were proposed in this pull request? In Python 2.7 with latest PyArrow and Pandas, the error message seems a bit different from that in Python 3. This PR simply fixes the test. ``` == FAIL: test_createDataFrame_with_incorrect_schema (pyspark.sql.tests.test_arrow.ArrowTests) -- Traceback (most recent call last): File "/.../spark/python/pyspark/sql/tests/test_arrow.py", line 275, in test_createDataFrame_with_incorrect_schema self.spark.createDataFrame(pdf, schema=wrong_schema) AssertionError: "integer.*required.*got.*str" does not match "('Exception thrown when converting pandas.Series (object) to Arrow Array (int32). It can be caused by overflows or other unsafe conversions warned by Arrow. Arrow safe type check can be disabled by using SQL config `spark.sql.execution.pandas.arrowSafeTypeConversion`.', ArrowTypeError('an integer is required',))" == FAIL: test_createDataFrame_with_incorrect_schema (pyspark.sql.tests.test_arrow.EncryptionArrowTests) -- Traceback (most recent call last): File "/.../spark/python/pyspark/sql/tests/test_arrow.py", line 275, in test_createDataFrame_with_incorrect_schema self.spark.createDataFrame(pdf, schema=wrong_schema) AssertionError: "integer.*required.*got.*str" does not match "('Exception thrown when converting pandas.Series (object) to Arrow Array (int32). It can be caused by overflows or other unsafe conversions warned by Arrow. 
Arrow safe type check can be disabled by using SQL config `spark.sql.execution.pandas.arrowSafeTypeConversion`.', ArrowTypeError('an integer is required',))" ``` ## How was this patch tested? Manually tested. ``` cd python ./run-tests --python-executables=python --modules pyspark-sql ``` Closes #25042 from HyukjinKwon/SPARK-28240. Authored-by: HyukjinKwon Signed-off-by: HyukjinKwon --- python/pyspark/sql/tests/test_arrow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/sql/tests/test_arrow.py b/python/pyspark/sql/tests/test_arrow.py index 1f96d2c..f533083 100644 --- a/python/pyspark/sql/tests/test_arrow.py +++ b/python/pyspark/sql/tests/test_arrow.py @@ -271,7 +271,7 @@ class ArrowTests(ReusedSQLTestCase): fields[0], fields[1] = fields[1], fields[0] # swap str with int wrong_schema = StructType(fields) with QuietTest(self.sc): -with self.assertRaisesRegexp(Exception, "integer.*required.*got.*str"): +with self.assertRaisesRegexp(Exception, "integer.*required"): self.spark.createDataFrame(pdf, schema=wrong_schema) def test_createDataFrame_with_names(self): - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
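The loosened pattern can be checked directly against the two wordings. The messages below are abbreviated forms of what is quoted in the traceback above (the Python 3 wording is an assumption based on CPython's usual `TypeError` text): `"integer.*required"` matches both, while the old `"integer.*required.*got.*str"` fails on the Python 2.7 one, which contains no "got ... str".

```python
import re

# Abbreviated versions of the two error messages.
py3_msg = "ArrowTypeError: an integer is required (got type str)"
py27_msg = "ArrowTypeError('an integer is required',)"

old_pattern = "integer.*required.*got.*str"
new_pattern = "integer.*required"

# assertRaisesRegexp uses re.search semantics, matched here.
print(bool(re.search(old_pattern, py3_msg)))   # True
print(bool(re.search(old_pattern, py27_msg)))  # False: no "got ... str" on 2.7
print(bool(re.search(new_pattern, py3_msg)))   # True
print(bool(re.search(new_pattern, py27_msg)))  # True: the loosened regex passes both
```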
[spark] branch master updated: [SPARK-28198][PYTHON][FOLLOW-UP] Rename mapPartitionsInPandas to mapInPandas with a separate evaluation type
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 5c55812 [SPARK-28198][PYTHON][FOLLOW-UP] Rename mapPartitionsInPandas to mapInPandas with a separate evaluation type 5c55812 is described below commit 5c55812400e1e0a8aaeb50a50be106e80c916c86 Author: HyukjinKwon AuthorDate: Fri Jul 5 09:22:41 2019 +0900 [SPARK-28198][PYTHON][FOLLOW-UP] Rename mapPartitionsInPandas to mapInPandas with a separate evaluation type ## What changes were proposed in this pull request? This PR proposes to rename `mapPartitionsInPandas` to `mapInPandas` with a separate evaluation type. Had an offline discussion with rxin, mengxr and cloud-fan. The reasons are basically: 1. `SCALAR_ITER` doesn't make sense with `mapPartitionsInPandas`. 2. It cannot share the same Pandas UDF, for instance, at `select` and `mapPartitionsInPandas`, unlike `GROUPED_AGG`, because the iterator's return type is different. 3. `mapPartitionsInPandas` -> `mapInPandas` - see https://github.com/apache/spark/pull/25044#issuecomment-508298552 and https://github.com/apache/spark/pull/25044#issuecomment-508299764 Renaming `SCALAR_ITER` to `MAP_ITER` was abandoned due to reason 2. For `XXX_ITER`, it might have to have a different interface in the future if we happen to add other versions of them. But this is an orthogonal topic to `mapPartitionsInPandas`. ## How was this patch tested? Existing tests should cover. Closes #25044 from HyukjinKwon/SPARK-28198. 
Authored-by: HyukjinKwon Signed-off-by: HyukjinKwon --- python/pyspark/sql/dataframe.py| 13 +--- python/pyspark/sql/functions.py| 5 - python/pyspark/sql/tests/test_pandas_udf_iter.py | 24 +++--- python/pyspark/sql/udf.py | 20 +++--- python/pyspark/worker.py | 7 +-- .../plans/logical/pythonLogicalOperators.scala | 4 ++-- .../main/scala/org/apache/spark/sql/Dataset.scala | 10 - .../spark/sql/execution/SparkStrategies.scala | 4 ++-- ...onsInPandasExec.scala => MapInPandasExec.scala} | 2 +- 9 files changed, 52 insertions(+), 37 deletions(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 3f5d1ff..e666973 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -2193,7 +2193,7 @@ class DataFrame(object): _check_series_convert_timestamps_local_tz(pdf[field.name], timezone) return pdf -def mapPartitionsInPandas(self, udf): +def mapInPandas(self, udf): """ Maps each partition of the current :class:`DataFrame` using a pandas udf and returns the result as a `DataFrame`. @@ -2215,7 +2215,7 @@ class DataFrame(object): ... def filter_func(iterator): ... for pdf in iterator: ... 
yield pdf[pdf.id == 1] ->>> df.mapPartitionsInPandas(filter_func).show() # doctest: +SKIP +>>> df.mapInPandas(filter_func).show() # doctest: +SKIP +---+---+ | id|age| +---+---+ @@ -2227,15 +2227,12 @@ class DataFrame(object): """ # Columns are special because hasattr always return True if isinstance(udf, Column) or not hasattr(udf, 'func') \ -or udf.evalType != PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF: +or udf.evalType != PythonEvalType.SQL_MAP_PANDAS_ITER_UDF: raise ValueError("Invalid udf: the udf argument must be a pandas_udf of type " - "SCALAR_ITER.") - -if not isinstance(udf.returnType, StructType): -raise ValueError("The returnType of the pandas_udf must be a StructType") + "MAP_ITER.") udf_column = udf(*[self[col] for col in self.columns]) -jdf = self._jdf.mapPartitionsInPandas(udf_column._jc.expr()) +jdf = self._jdf.mapInPandas(udf_column._jc.expr()) return DataFrame(jdf, self.sql_ctx) def _collectAsArrow(self): diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 5d1e69e..bf33b9a 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -2802,6 +2802,8 @@ class PandasUDFType(object): GROUPED_AGG = PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF +MAP_ITER = PythonEvalType.SQL_MAP_PANDAS_ITER_UDF + @since(1.3) def udf(f=None, returnType=StringType()): @@ -3278,7 +3280,8 @@ def pandas_udf(f=None, returnType=None, functionType=None): if eval_type not in [PythonEvalT
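The `filter_func` pattern in the docstring — take an iterator of partitions, yield filtered partitions — can be sketched without Spark or pandas; here plain lists of dicts stand in for the pandas DataFrames that `mapInPandas` feeds to the function:

```python
def filter_func(iterator):
    # Mirrors the docstring example: keep only rows with id == 1 per partition.
    for pdf in iterator:
        yield [row for row in pdf if row["id"] == 1]

partitions = [
    [{"id": 1, "age": 21}, {"id": 2, "age": 30}],
    [{"id": 1, "age": 40}],
]

# Flatten the filtered partitions, as collecting the resulting DataFrame would.
result = [row for pdf in filter_func(iter(partitions)) for row in pdf]
print(result)  # [{'id': 1, 'age': 21}, {'id': 1, 'age': 40}]
```

Because the function consumes and yields whole partitions rather than individual values, it cannot share an evaluation type with scalar Pandas UDFs — which is reason 2 in the commit message above.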
[spark] branch master updated: [SPARK-28179][SQL] Avoid hard-coded config: spark.sql.globalTempDatabase
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 410a898 [SPARK-28179][SQL] Avoid hard-coded config: spark.sql.globalTempDatabase 410a898 is described below commit 410a898cf979da7f6c78fbed420468eae8d74251 Author: Yuming Wang AuthorDate: Fri Jun 28 10:42:35 2019 +0900 [SPARK-28179][SQL] Avoid hard-coded config: spark.sql.globalTempDatabase ## What changes were proposed in this pull request? Avoid hard-coded config: `spark.sql.globalTempDatabase`. ## How was this patch tested? N/A Closes #24979 from wangyum/SPARK-28179. Authored-by: Yuming Wang Signed-off-by: HyukjinKwon --- .../scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala | 3 ++- .../src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala | 3 +++ .../src/main/scala/org/apache/spark/sql/internal/SharedState.scala | 2 +- .../scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala| 5 +++-- 4 files changed, 9 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index e49e54f..dcc6298 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParserInterface} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias, View} import org.apache.spark.sql.catalyst.util.StringUtils import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.StaticSQLConf.GLOBAL_TEMP_DATABASE import org.apache.spark.sql.types.StructType import org.apache.spark.util.Utils @@ -71,7 +72,7 @@ class 
SessionCatalog( conf: SQLConf) { this( () => externalCatalog, - () => new GlobalTempViewManager("global_temp"), + () => new GlobalTempViewManager(conf.getConf(GLOBAL_TEMP_DATABASE)), functionRegistry, conf, new Configuration(), diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala index 3fda4c8..d665d16 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.internal +import java.util.Locale + import org.apache.spark.util.Utils @@ -42,6 +44,7 @@ object StaticSQLConf { val GLOBAL_TEMP_DATABASE = buildStaticConf("spark.sql.globalTempDatabase") .internal() .stringConf +.transform(_.toLowerCase(Locale.ROOT)) .createWithDefault("global_temp") // This is used to control when we will split a schema's JSON string to multiple pieces diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index e6c40bd..f1a6481 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -158,7 +158,7 @@ private[sql] class SharedState( // System preserved database should not exists in metastore. However it's hard to guarantee it // for every session, because case-sensitivity differs. Here we always lowercase it to make our // life easier. 
-val globalTempDB = conf.get(GLOBAL_TEMP_DATABASE).toLowerCase(Locale.ROOT) +val globalTempDB = conf.get(GLOBAL_TEMP_DATABASE) if (externalCatalog.databaseExists(globalTempDB)) { throw new SparkException( s"$globalTempDB is a system preserved database, please rename your existing database " + diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index cc1ead9..acfb84e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -39,6 +39,7 @@ import org.apache.spark.sql.functions._ import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils} import org.apache.spark.sql.hive.test.{HiveTestUtils, TestHiveSingleton} import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.StaticSQLConf.GLOBAL_TEMP_DATABASE import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.types._ impo
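The key piece of this change is the `.transform(_.toLowerCase(Locale.ROOT))` added to the config builder: the value is normalized once at read time, which is why the manual `toLowerCase` in `SharedState` could be dropped. A minimal Python sketch of the same idea (the `ConfEntry` class here is hypothetical, not Spark's API):

```python
class ConfEntry:
    """A config entry that normalizes its value with a transform,
    loosely mirroring StaticSQLConf's builder pattern (illustrative only)."""

    def __init__(self, key, default, transform=None):
        self.key = key
        self.default = default
        self.transform = transform or (lambda v: v)

    def read(self, settings):
        # The transform runs on every read, so callers never need to
        # re-normalize (e.g. lowercase) the value themselves.
        raw = settings.get(self.key, self.default)
        return self.transform(raw)


GLOBAL_TEMP_DATABASE = ConfEntry(
    "spark.sql.globalTempDatabase", "global_temp", str.lower)

default_db = GLOBAL_TEMP_DATABASE.read({})
custom_db = GLOBAL_TEMP_DATABASE.read({"spark.sql.globalTempDatabase": "MyTemp"})
```

With the transform attached, every consumer (`SessionCatalog`, `SharedState`, tests) sees the already-lowercased name, removing the hard-coded string and the scattered normalization.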
[spark] branch master updated: [SPARK-28185][PYTHON][SQL] Closes the generator when Python UDFs stop early
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 31e7c37 [SPARK-28185][PYTHON][SQL] Closes the generator when Python UDFs stop early 31e7c37 is described below

commit 31e7c37354132545da59bff176af1613bd09447c
Author: WeichenXu
AuthorDate: Fri Jun 28 17:10:25 2019 +0900

[SPARK-28185][PYTHON][SQL] Closes the generator when Python UDFs stop early

## What changes were proposed in this pull request?

Closes the generator when Python UDFs stop early.

### Manual verification on pandas iterator UDF and mapPartitions

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.functions import col, udf
from pyspark.taskcontext import TaskContext
import time
import os

spark.conf.set('spark.sql.execution.arrow.maxRecordsPerBatch', '1')
spark.conf.set('spark.sql.pandas.udf.buffer.size', '4')

@pandas_udf("int", PandasUDFType.SCALAR_ITER)
def fi1(it):
    try:
        for batch in it:
            yield batch + 100
            time.sleep(1.0)
    except BaseException as be:
        print("Debug: exception raised: " + str(type(be)))
        raise be
    finally:
        open("/tmp/01.tmp", "a").close()

df1 = spark.range(10).select(col('id').alias('a')).repartition(1)

# will see log "Debug: exception raised:" and file "/tmp/01.tmp" generated.
df1.select(col('a'), fi1('a')).limit(2).collect()

def mapper(it):
    try:
        for batch in it:
            yield batch
    except BaseException as be:
        print("Debug: exception raised: " + str(type(be)))
        raise be
    finally:
        open("/tmp/02.tmp", "a").close()

df2 = spark.range(1000).repartition(1)

# will see log "Debug: exception raised:" and file "/tmp/02.tmp" generated.
df2.rdd.mapPartitions(mapper).take(2)
```

## How was this patch tested?

Unit test added.

Please review https://spark.apache.org/contributing.html before opening a pull request.
Closes #24986 from WeichenXu123/pandas_iter_udf_limit. Authored-by: WeichenXu Signed-off-by: HyukjinKwon --- python/pyspark/sql/tests/test_pandas_udf_scalar.py | 37 ++ python/pyspark/worker.py | 7 +++- 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/python/pyspark/sql/tests/test_pandas_udf_scalar.py b/python/pyspark/sql/tests/test_pandas_udf_scalar.py index c291d42..d254508 100644 --- a/python/pyspark/sql/tests/test_pandas_udf_scalar.py +++ b/python/pyspark/sql/tests/test_pandas_udf_scalar.py @@ -850,6 +850,43 @@ class ScalarPandasUDFTests(ReusedSQLTestCase): with self.assertRaisesRegexp(Exception, "reached finally block"): self.spark.range(1).select(test_close(col("id"))).collect() +def test_scalar_iter_udf_close_early(self): +tmp_dir = tempfile.mkdtemp() +try: +tmp_file = tmp_dir + '/reach_finally_block' + +@pandas_udf('int', PandasUDFType.SCALAR_ITER) +def test_close(batch_iter): +generator_exit_caught = False +try: +for batch in batch_iter: +yield batch +time.sleep(1.0) # avoid the function finish too fast. +except GeneratorExit as ge: +generator_exit_caught = True +raise ge +finally: +assert generator_exit_caught, "Generator exit exception was not caught." +open(tmp_file, 'a').close() + +with QuietTest(self.sc): +with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": 1, +"spark.sql.pandas.udf.buffer.size": 4}): +self.spark.range(10).repartition(1) \ +.select(test_close(col("id"))).limit(2).collect() +# wait here because python udf worker will take some time to detect +# jvm side socket closed and then will trigger `GenerateExit` raised. +# wait timeout is 10s. +for i in range(100): +time.sleep(0.1) +if os.path.exists(tmp_file): +break + +assert os.path.exists(tmp_file), "finally block not reached." + +finally: +
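The worker-side fix relies on standard Python generator semantics: calling `close()` on a suspended generator raises `GeneratorExit` at the paused `yield`, which lets the UDF's `except`/`finally` blocks run even when the consumer stops early. A self-contained sketch of that mechanism (no Spark needed):

```python
events = []

def batches():
    # A stand-in for a SCALAR_ITER pandas UDF body: yields batches until
    # the consumer stops pulling.
    try:
        for i in range(1000):
            yield i
    except GeneratorExit:
        # Raised by .close() while the generator is suspended at a yield.
        events.append("GeneratorExit")
        raise
    finally:
        events.append("finally")

it = batches()
next(it)    # consume only part of the output, like .limit(2)
it.close()  # what the worker now does when the JVM side stops early
```

Without the explicit `close()`, an abandoned generator is only finalized whenever it happens to be garbage-collected, so cleanup in `finally` could run arbitrarily late or not at all before the worker is reused.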
[spark] branch master updated (06ac7d5 -> b598dfd)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 06ac7d5 [SPARK-27922][SQL][PYTHON][TESTS] Convert and port 'natural-join.sql' into UDF test base add b598dfd [SPARK-28275][SQL][PYTHON][TESTS] Convert and port 'count.sql' into UDF test base No new revisions were added by this update. Summary of changes: .../resources/sql-tests/inputs/udf/udf-count.sql | 28 +++ .../sql-tests/results/udf/udf-count.sql.out| 55 ++ 2 files changed, 83 insertions(+) create mode 100644 sql/core/src/test/resources/sql-tests/inputs/udf/udf-count.sql create mode 100644 sql/core/src/test/resources/sql-tests/results/udf/udf-count.sql.out - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (6532153 -> 92e051c)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 6532153 [SPARK-28015][SQL] Check stringToDate() consumes entire input for the and -[m]m formats add 92e051c [SPARK-28270][SQL][PYTHON] Convert and port 'pgSQL/aggregates_part1.sql' into UDF test base No new revisions were added by this update. Summary of changes: .../pgSQL/udf-aggregates_part1.sql}| 99 +++--- .../results/udf/pgSQL/udf-aggregates_part1.sql.out | 387 + .../apache/spark/sql/IntegratedUDFTestUtils.scala | 2 +- 3 files changed, 440 insertions(+), 48 deletions(-) copy sql/core/src/test/resources/sql-tests/inputs/{pgSQL/aggregates_part1.sql => udf/pgSQL/udf-aggregates_part1.sql} (55%) create mode 100644 sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-aggregates_part1.sql.out - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-28234][CORE][PYTHON] Add python and JavaSparkContext support to get resources
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new f84cca2 [SPARK-28234][CORE][PYTHON] Add python and JavaSparkContext support to get resources f84cca2 is described below commit f84cca2d84d67cee2877092d0354cf111c95eb8e Author: Thomas Graves AuthorDate: Thu Jul 11 09:32:58 2019 +0900 [SPARK-28234][CORE][PYTHON] Add python and JavaSparkContext support to get resources ## What changes were proposed in this pull request? Add python api support and JavaSparkContext support for resources(). I needed the JavaSparkContext support for it to properly translate into python with the py4j stuff. ## How was this patch tested? Unit tests added and manually tested in local cluster mode and on yarn. Closes #25087 from tgravescs/SPARK-28234-python. Authored-by: Thomas Graves Signed-off-by: HyukjinKwon --- .../apache/spark/api/java/JavaSparkContext.scala | 3 ++ .../org/apache/spark/api/python/PythonRunner.scala | 10 + python/pyspark/__init__.py | 3 +- python/pyspark/context.py | 12 ++ python/pyspark/resourceinformation.py | 43 ++ python/pyspark/taskcontext.py | 8 python/pyspark/tests/test_context.py | 35 +- python/pyspark/tests/test_taskcontext.py | 38 +++ python/pyspark/worker.py | 11 ++ 9 files changed, 161 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala index c5ef190..330c2f6 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala @@ -35,6 +35,7 @@ import org.apache.spark.api.java.JavaSparkContext.fakeClassTag import org.apache.spark.broadcast.Broadcast import org.apache.spark.input.PortableDataStream import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, 
NewHadoopRDD} +import org.apache.spark.resource.ResourceInformation /** * A Java-friendly version of [[org.apache.spark.SparkContext]] that returns @@ -114,6 +115,8 @@ class JavaSparkContext(val sc: SparkContext) extends Closeable { def appName: String = sc.appName + def resources: JMap[String, ResourceInformation] = sc.resources.asJava + def jars: util.List[String] = sc.jars.asJava def startTime: java.lang.Long = sc.startTime diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala index 414d208..dc6c596 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala @@ -281,6 +281,16 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( dataOut.writeInt(context.partitionId()) dataOut.writeInt(context.attemptNumber()) dataOut.writeLong(context.taskAttemptId()) +val resources = context.resources() +dataOut.writeInt(resources.size) +resources.foreach { case (k, v) => + PythonRDD.writeUTF(k, dataOut) + PythonRDD.writeUTF(v.name, dataOut) + dataOut.writeInt(v.addresses.size) + v.addresses.foreach { case addr => +PythonRDD.writeUTF(addr, dataOut) + } +} val localProps = context.getLocalProperties.asScala dataOut.writeInt(localProps.size) localProps.foreach { case (k, v) => diff --git a/python/pyspark/__init__.py b/python/pyspark/__init__.py index ee153af..70c0b27 100644 --- a/python/pyspark/__init__.py +++ b/python/pyspark/__init__.py @@ -54,6 +54,7 @@ from pyspark.files import SparkFiles from pyspark.storagelevel import StorageLevel from pyspark.accumulators import Accumulator, AccumulatorParam from pyspark.broadcast import Broadcast +from pyspark.resourceinformation import ResourceInformation from pyspark.serializers import MarshalSerializer, PickleSerializer from pyspark.status import * from pyspark.taskcontext import TaskContext, BarrierTaskContext, BarrierTaskInfo @@ -118,5 +119,5 
@@ __all__ = [ "SparkConf", "SparkContext", "SparkFiles", "RDD", "StorageLevel", "Broadcast", "Accumulator", "AccumulatorParam", "MarshalSerializer", "PickleSerializer", "StatusTracker", "SparkJobInfo", "SparkStageInfo", "Profiler", "BasicProfiler", "TaskContext", -"RDDBarrier", "BarrierTaskContext", "Barrier
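The `BasePythonRunner` change above streams the resources to the Python worker as a count followed by `(key, name, addresses)` records. A rough Python sketch of that framing — using 4-byte big-endian lengths throughout for illustration, which is not exactly Java's `writeUTF` encoding:

```python
import io
import struct

def _write_str(out, s):
    data = s.encode("utf-8")
    out.write(struct.pack(">i", len(data)))
    out.write(data)

def _read_str(inp):
    (n,) = struct.unpack(">i", inp.read(4))
    return inp.read(n).decode("utf-8")

def write_resources(out, resources):
    # resources: {key: (name, [addresses])}, mirroring the JVM-side loop
    out.write(struct.pack(">i", len(resources)))
    for key, (name, addresses) in resources.items():
        _write_str(out, key)
        _write_str(out, name)
        out.write(struct.pack(">i", len(addresses)))
        for addr in addresses:
            _write_str(out, addr)

def read_resources(inp):
    # What the worker side would do to rebuild the resources dict
    (count,) = struct.unpack(">i", inp.read(4))
    result = {}
    for _ in range(count):
        key = _read_str(inp)
        name = _read_str(inp)
        (n_addr,) = struct.unpack(">i", inp.read(4))
        result[key] = (name, [_read_str(inp) for _ in range(n_addr)])
    return result

buf = io.BytesIO()
write_resources(buf, {"gpu": ("gpu", ["0", "1"])})
buf.seek(0)
parsed = read_resources(buf)
```

A length-prefixed protocol like this lets the worker read exactly as many records as were written without any delimiter parsing, which is why both sides of the py4j/socket boundary can stay simple.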
[spark] branch master updated (b598dfd -> 8d686f3)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from b598dfd [SPARK-28275][SQL][PYTHON][TESTS] Convert and port 'count.sql' into UDF test base add 8d686f3 [SPARK-28271][SQL][PYTHON][TESTS] Convert and port 'pgSQL/aggregates_part2.sql' into UDF test base No new revisions were added by this update. Summary of changes: .../pgSQL/udf-aggregates_part2.sql}| 30 + .../pgSQL/udf-aggregates_part2.sql.out}| 38 +++--- 2 files changed, 36 insertions(+), 32 deletions(-) copy sql/core/src/test/resources/sql-tests/inputs/{pgSQL/aggregates_part2.sql => udf/pgSQL/udf-aggregates_part2.sql} (85%) copy sql/core/src/test/resources/sql-tests/results/{pgSQL/aggregates_part2.sql.out => udf/pgSQL/udf-aggregates_part2.sql.out} (70%) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-28281][SQL][PYTHON][TESTS] Convert and port 'having.sql' into UDF test base
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 3a94fb3 [SPARK-28281][SQL][PYTHON][TESTS] Convert and port 'having.sql' into UDF test base 3a94fb3 is described below commit 3a94fb3dd92a05676a3c11cbcea314dd296ec059 Author: Huaxin Gao AuthorDate: Thu Jul 11 09:57:34 2019 +0900 [SPARK-28281][SQL][PYTHON][TESTS] Convert and port 'having.sql' into UDF test base ## What changes were proposed in this pull request? This PR adds some tests converted from having.sql to test UDFs following the combination guide in [SPARK-27921](url) Diff comparing to 'having.sql' ```diff diff --git a/sql/core/src/test/resources/sql-tests/results/having.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/udf-having.sql.out index d87ee52216..7cea2e5128 100644 --- a/sql/core/src/test/resources/sql-tests/results/having.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/udf/udf-having.sql.out -16,34 +16,34 struct<> -- !query 1 -SELECT k, sum(v) FROM hav GROUP BY k HAVING sum(v) > 2 +SELECT udf(k) AS k, udf(sum(v)) FROM hav GROUP BY k HAVING udf(sum(v)) > 2 -- !query 1 schema -struct +struct -- !query 1 output one6 three 3 -- !query 2 -SELECT count(k) FROM hav GROUP BY v + 1 HAVING v + 1 = 2 +SELECT udf(count(udf(k))) FROM hav GROUP BY v + 1 HAVING v + 1 = udf(2) -- !query 2 schema -struct +struct -- !query 2 output 1 -- !query 3 -SELECT MIN(t.v) FROM (SELECT * FROM hav WHERE v > 0) t HAVING(COUNT(1) > 0) +SELECT udf(MIN(t.v)) FROM (SELECT * FROM hav WHERE v > 0) t HAVING(udf(COUNT(udf(1))) > 0) -- !query 3 schema -struct +struct -- !query 3 output 1 -- !query 4 -SELECT a + b FROM VALUES (1L, 2), (3L, 4) AS T(a, b) GROUP BY a + b HAVING a + b > 1 +SELECT udf(a + b) FROM VALUES (1L, 2), (3L, 4) AS T(a, b) GROUP BY a + b HAVING a + b > udf(1) -- !query 4 schema -struct<(a + CAST(b AS 
BIGINT)):bigint> +struct -- !query 4 output 3 7 ``` ## How was this patch tested? Tested as guided in SPARK-27921. Closes #25093 from huaxingao/spark-28281. Authored-by: Huaxin Gao Signed-off-by: HyukjinKwon --- .../resources/sql-tests/inputs/udf/udf-having.sql | 22 ++ .../sql-tests/results/udf/udf-having.sql.out | 49 ++ 2 files changed, 71 insertions(+) diff --git a/sql/core/src/test/resources/sql-tests/inputs/udf/udf-having.sql b/sql/core/src/test/resources/sql-tests/inputs/udf/udf-having.sql new file mode 100644 index 000..6ae34ae --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/inputs/udf/udf-having.sql @@ -0,0 +1,22 @@ +-- This test file was converted from having.sql. +-- Note that currently registered UDF returns a string. So there are some differences, for instance +-- in string cast within UDF in Scala and Python. + +create temporary view hav as select * from values + ("one", 1), + ("two", 2), + ("three", 3), + ("one", 5) + as hav(k, v); + +-- having clause +SELECT udf(k) AS k, udf(sum(v)) FROM hav GROUP BY k HAVING udf(sum(v)) > 2; + +-- having condition contains grouping column +SELECT udf(count(udf(k))) FROM hav GROUP BY v + 1 HAVING v + 1 = udf(2); + +-- SPARK-11032: resolve having correctly +SELECT udf(MIN(t.v)) FROM (SELECT * FROM hav WHERE v > 0) t HAVING(udf(COUNT(udf(1))) > 0); + +-- SPARK-20329: make sure we handle timezones correctly +SELECT udf(a + b) FROM VALUES (1L, 2), (3L, 4) AS T(a, b) GROUP BY a + b HAVING a + b > udf(1); diff --git a/sql/core/src/test/resources/sql-tests/results/udf/udf-having.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/udf-having.sql.out new file mode 100644 index 000..7cea2e5 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/results/udf/udf-having.sql.out @@ -0,0 +1,49 @@ +-- Automatically generated by SQLQueryTestSuite +-- Number of queries: 5 + + +-- !query 0 +create temporary view hav as select * from values + ("one", 1), + ("two", 2), + ("three", 3), + ("one", 5) + as hav(k, v) +-- 
!query 0 schema +struct<> +-- !query 0 output + + + +-- !query 1 +SELECT udf(k) AS k, udf(sum(v)) FROM hav GROUP BY k HAVING udf(sum(v)) > 2 +-- !query 1 schema +struct +-- !query 1 output +one6 +three 3 + + +-- !query 2 +SELECT udf(count(udf(k))) FROM hav GROUP BY v + 1 HAVING v + 1 = udf(2) +-- !query 2 schema +struct +-- !query 2 output +1 + + +-- !query 3 +SELECT udf(MIN(t.v)) FROM (SELECT * FROM hav WHERE v > 0) t HAVING(udf(COUNT(udf(1))) > 0) +-- !query 3
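The ported queries wrap grouped expressions in a registered UDF inside `HAVING`. The same query shape can be reproduced outside Spark with sqlite3 and an identity UDF — an analogy only, not Spark's `IntegratedUDFTestUtils`:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Register an identity UDF, playing the role of the test UDF in udf-having.sql.
conn.create_function("udf", 1, lambda x: x)

conn.execute("CREATE TABLE hav (k TEXT, v INTEGER)")
conn.executemany("INSERT INTO hav VALUES (?, ?)",
                 [("one", 1), ("two", 2), ("three", 3), ("one", 5)])

# UDF in both the select list and the HAVING clause, as in the ported test.
rows = conn.execute(
    "SELECT udf(k), udf(SUM(v)) FROM hav GROUP BY k HAVING udf(SUM(v)) > 2"
).fetchall()
```

The groups are `one -> 6`, `two -> 2`, `three -> 3`; the `HAVING udf(SUM(v)) > 2` filter keeps `one` and `three`, matching the expected output in the `.sql.out` file above.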
[spark] branch master updated: [SPARK-27922][SQL][PYTHON][TESTS] Convert and port 'natural-join.sql' into UDF test base
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 06ac7d5 [SPARK-27922][SQL][PYTHON][TESTS] Convert and port 'natural-join.sql' into UDF test base 06ac7d5 is described below commit 06ac7d5966a2f072d523de8f798e601473a49871 Author: manu.zhang AuthorDate: Thu Jul 11 09:37:25 2019 +0900 [SPARK-27922][SQL][PYTHON][TESTS] Convert and port 'natural-join.sql' into UDF test base ## What changes were proposed in this pull request? This PR adds some tests converted from `natural-join.sql` to test UDFs following the combination guide in [SPARK-27921](https://issues.apache.org/jira/browse/SPARK-27921). Diff results comparing to `natural-join.sql` ```diff diff --git a/sql/core/src/test/resources/sql-tests/results/udf/udf-natural-join.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/udf-natural-join. sql.out index 43f2f9a..53ef177 100644 --- a/sql/core/src/test/resources/sql-tests/results/udf/udf-natural-join.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/udf/udf-natural-join.sql.out -27,7 +27,7 struct<> -- !query 2 -SELECT * FROM nt1 natural join nt2 where k = "one" +SELECT * FROM nt1 natural join nt2 where udf(k) = "one" -- !query 2 schema struct -- !query 2 output -36,7 +36,7 one 1 5 -- !query 3 -SELECT * FROM nt1 natural left join nt2 order by v1, v2 +SELECT * FROM nt1 natural left join nt2 where k <> udf("") order by v1, v2 -- !query 3 schema diff --git a/sql/core/src/test/resources/sql-tests/results/udf/udf-natural-join.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/udf-natural-join. 
sql.out index 43f2f9a..53ef177 100644 --- a/sql/core/src/test/resources/sql-tests/results/udf/udf-natural-join.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/udf/udf-natural-join.sql.out -27,7 +27,7 struct<> -- !query 2 -SELECT * FROM nt1 natural join nt2 where k = "one" +SELECT * FROM nt1 natural join nt2 where udf(k) = "one" -- !query 2 schema struct -- !query 2 output -36,7 +36,7 one 1 5 -- !query 3 -SELECT * FROM nt1 natural left join nt2 order by v1, v2 +SELECT * FROM nt1 natural left join nt2 where k <> udf("") order by v1, v2 -- !query 3 schema struct ``` ## How was this patch tested? Tested as guided in [SPARK-27921](https://issues.apache.org/jira/browse/SPARK-27921). Closes #25088 from manuzhang/SPARK-27922. Authored-by: manu.zhang Signed-off-by: HyukjinKwon --- .../sql-tests/inputs/udf/udf-natural-join.sql | 29 ++ .../sql-tests/results/udf/udf-natural-join.sql.out | 64 ++ 2 files changed, 93 insertions(+) diff --git a/sql/core/src/test/resources/sql-tests/inputs/udf/udf-natural-join.sql b/sql/core/src/test/resources/sql-tests/inputs/udf/udf-natural-join.sql new file mode 100644 index 000..6862683 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/inputs/udf/udf-natural-join.sql @@ -0,0 +1,29 @@ +-- List of configuration the test suite is run against: +--SET spark.sql.autoBroadcastJoinThreshold=10485760 +--SET spark.sql.autoBroadcastJoinThreshold=-1,spark.sql.join.preferSortMergeJoin=true +--SET spark.sql.autoBroadcastJoinThreshold=-1,spark.sql.join.preferSortMergeJoin=false + +-- This test file was converted from natural-join.sql. +-- Note that currently registered UDF returns a string. So there are some differences, for instance +-- in string cast within UDF in Scala and Python. 
+ +create temporary view nt1 as select * from values + ("one", 1), + ("two", 2), + ("three", 3) + as nt1(k, v1); + +create temporary view nt2 as select * from values + ("one", 1), + ("two", 22), + ("one", 5) + as nt2(k, v2); + + +SELECT * FROM nt1 natural join nt2 where udf(k) = "one"; + +SELECT * FROM nt1 natural left join nt2 where k <> udf("") order by v1, v2; + +SELECT * FROM nt1 natural right join nt2 where udf(k) <> udf("") order by v1, v2; + +SELECT udf(count(*)) FROM nt1 natural full outer join nt2; diff --git a/sql/core/src/test/resources/sql-tests/results/udf/udf-natural-join.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/udf-natural-join.sql.out new file mode 100644 index 000..53ef177 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/results/udf/udf-natural-join.sql.out @@ -0,0 +1,64 @@ +-- Automatically generated by SQLQueryTestSuite +-- Number of queries: 6 + + +-- !query 0 +create temporary view
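As with the other ported files, these tests sprinkle a registered UDF into join predicates. The natural-join shape can likewise be sketched with sqlite3 and an identity UDF (illustrative, not Spark's test harness):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.create_function("udf", 1, lambda x: x)  # identity UDF stand-in

conn.execute("CREATE TABLE nt1 (k TEXT, v1 INTEGER)")
conn.execute("CREATE TABLE nt2 (k TEXT, v2 INTEGER)")
conn.executemany("INSERT INTO nt1 VALUES (?, ?)",
                 [("one", 1), ("two", 2), ("three", 3)])
conn.executemany("INSERT INTO nt2 VALUES (?, ?)",
                 [("one", 1), ("two", 22), ("one", 5)])

# NATURAL JOIN matches on the shared column k; the UDF wraps the filter,
# as in the first ported query.
rows = conn.execute(
    "SELECT * FROM nt1 NATURAL JOIN nt2 WHERE udf(k) = 'one'"
).fetchall()
```

`one` in nt1 matches two nt2 rows, so the result has two rows with `k = 'one'`, `v1 = 1`, and `v2` in `{1, 5}` — the same shape the `.sql.out` expects.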
[spark] branch master updated (7021588 -> 19bcce1)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 7021588 [SPARK-28306][SQL] Make NormalizeFloatingNumbers rule idempotent add 19bcce1 [SPARK-28270][SQL][FOLLOW-UP] Explicitly cast into int/long/decimal in udf-aggregates_part1.sql to avoid Python float limitation No new revisions were added by this update. Summary of changes: .../inputs/udf/pgSQL/udf-aggregates_part1.sql | 42 +++ .../results/udf/pgSQL/udf-aggregates_part1.sql.out | 140 ++--- 2 files changed, 91 insertions(+), 91 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-2.4 updated: [SPARK-28357][CORE][TEST] Fix Flaky Test - FileAppenderSuite.rolling file appender - size-based rolling compressed
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-2.4 by this push: new 094a20c [SPARK-28357][CORE][TEST] Fix Flaky Test - FileAppenderSuite.rolling file appender - size-based rolling compressed 094a20c is described below

commit 094a20ce9c110084f427897f95860981d1fc8212
Author: Dongjoon Hyun
AuthorDate: Fri Jul 12 18:40:07 2019 +0900

[SPARK-28357][CORE][TEST] Fix Flaky Test - FileAppenderSuite.rolling file appender - size-based rolling compressed

## What changes were proposed in this pull request?

`SizeBasedRollingPolicy.shouldRollover` returns false when the size is equal to `rolloverSizeBytes`.

```scala
/** Should rollover if the next set of bytes is going to exceed the size limit */
def shouldRollover(bytesToBeWritten: Long): Boolean = {
  logDebug(s"$bytesToBeWritten + $bytesWrittenSinceRollover > $rolloverSizeBytes")
  bytesToBeWritten + bytesWrittenSinceRollover > rolloverSizeBytes
}
```

- https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/107553/testReport/org.apache.spark.util/FileAppenderSuite/rolling_file_appender___size_based_rolling__compressed_/

```
org.scalatest.exceptions.TestFailedException: 1000 was not less than 1000
```

## How was this patch tested?

Pass the Jenkins with the updated test.

Closes #25125 from dongjoon-hyun/SPARK-28357.
Authored-by: Dongjoon Hyun
Signed-off-by: HyukjinKwon
(cherry picked from commit 1c29212394adcbde2de4f4dfdc43a1cf32671ae1)
Signed-off-by: HyukjinKwon
---
 core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
index 52cd537..1a3e880 100644
--- a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
@@ -128,7 +128,7 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
     val files = testRolling(appender, testOutputStream, textToAppend, 0, isCompressed = true)
     files.foreach { file =>
       logInfo(file.toString + ": " + file.length + " bytes")
-      assert(file.length < rolloverSize)
+      assert(file.length <= rolloverSize)
     }
   }

- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
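The flakiness came down to a boundary condition: `shouldRollover` uses a strict `>`, so a file may legally grow to exactly the rollover size. A tiny Python sketch of the predicate and the edge case the test assertion had to allow for:

```python
def should_rollover(bytes_to_be_written, bytes_written_since_rollover,
                    rollover_size_bytes):
    # Mirrors SizeBasedRollingPolicy.shouldRollover: roll over only when the
    # next write would *exceed* the limit, so a file can end up at exactly
    # rollover_size_bytes without ever triggering a rollover.
    return bytes_to_be_written + bytes_written_since_rollover > rollover_size_bytes

# 900 bytes already written, 100 more incoming: the file reaches exactly
# 1000 bytes with no rollover, which is why the test must assert
# file.length <= rolloverSize rather than a strict <.
no_roll = should_rollover(100, 900, 1000)   # False: 1000 is not > 1000
roll = should_rollover(101, 900, 1000)      # True: 1001 > 1000
```

Keeping the production `>` and relaxing the test to `<=` is the right direction here: the policy's contract is "never exceed the limit", which a file of exactly the limit still satisfies.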