[spark] branch master updated: [SPARK-37133][SQL] Add a config to optionally enforce ANSI reserved keywords
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new cfb96eb [SPARK-37133][SQL] Add a config to optionally enforce ANSI reserved keywords cfb96eb is described below commit cfb96ebd3991c62fc737242aeeb9b5cdb4abe7ae Author: Wenchen Fan AuthorDate: Thu Oct 28 12:58:15 2021 +0800 [SPARK-37133][SQL] Add a config to optionally enforce ANSI reserved keywords ### What changes were proposed in this pull request? This PR adds a new config to optionally enforce the ANSI reserved keywords in the parser. The default value is true, so we by default still enforce it and there is no behavior change. ### Why are the changes needed? In Spark 3.2, the ANSI mode is GA. We want more people to try and use the ANSI mode, to find data issues as early as possible and get better data quality. However, the reserved keywords thing is a big stopper for many users that want to try ANSI mode. They have to update the SQL queries to pass the parser, which is nothing about data quality but just trouble. With a new config to allow users to not enforce reserved keywords, I think we can get better adoption of the ANSI mode. ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? updated tests. Closes #34403 from cloud-fan/parser. 
Lead-authored-by: Wenchen Fan Co-authored-by: Wenchen Fan Signed-off-by: Wenchen Fan --- docs/sql-ref-ansi-compliance.md | 4 +++- .../org/apache/spark/sql/catalyst/parser/AstBuilder.scala| 2 +- .../org/apache/spark/sql/catalyst/parser/ParseDriver.scala | 2 +- .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 10 ++ .../spark/sql/catalyst/parser/ExpressionParserSuite.scala| 12 +++- .../sql/catalyst/parser/TableIdentifierParserSuite.scala | 9 + 6 files changed, 35 insertions(+), 4 deletions(-) diff --git a/docs/sql-ref-ansi-compliance.md b/docs/sql-ref-ansi-compliance.md index c10e866..4527faa 100644 --- a/docs/sql-ref-ansi-compliance.md +++ b/docs/sql-ref-ansi-compliance.md @@ -299,7 +299,9 @@ When the ANSI mode is disabled, Spark SQL has two kinds of keywords: * Non-reserved keywords: Same definition as the one when the ANSI mode enabled. * Strict-non-reserved keywords: A strict version of non-reserved keywords, which can not be used as table alias. -By default `spark.sql.ansi.enabled` is false. +If you want to still use reserved keywords as identifiers with ANSI mode, you can set `spark.sql.ansi.enforceReservedKeywords` to false. + +By default `spark.sql.ansi.enabled` is false and `spark.sql.ansi.enforceReservedKeywords` is true. Below is a list of all the keywords in Spark SQL. 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index d36c7ac..768d406 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -1728,7 +1728,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg } override def visitCurrentLike(ctx: CurrentLikeContext): Expression = withOrigin(ctx) { -if (conf.ansiEnabled) { +if (conf.enforceReservedKeywords) { ctx.name.getType match { case SqlBaseParser.CURRENT_DATE => CurrentDate() diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala index 64216e6..b459a2d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala @@ -100,7 +100,7 @@ abstract class AbstractSqlParser extends ParserInterface with SQLConfHelper with parser.addErrorListener(ParseErrorListener) parser.legacy_setops_precedence_enabled = conf.setOpsPrecedenceEnforced parser.legacy_exponent_literal_as_decimal_enabled = conf.exponentLiteralAsDecimalEnabled -parser.SQL_standard_keyword_behavior = conf.ansiEnabled +parser.SQL_standard_keyword_behavior = conf.enforceReservedKeywords try { try { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 5023b4a..fe3204b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2590,6 +2590,14 @@ object SQLConf {
[spark] branch master updated (30e1261 -> 502a1ec)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 30e1261 [SPARK-37121][HIVE][TEST] Fix Python version detection bug in TestUtils used by HiveExternalCatalogVersionsSuite add 502a1ec [SPARK-37036][PYTHON] Add util function to raise advice warning for pandas API on Spark No new revisions were added by this update. Summary of changes: python/pyspark/pandas/accessors.py| 9 + python/pyspark/pandas/frame.py| 38 +++ python/pyspark/pandas/generic.py | 13 python/pyspark/pandas/groupby.py | 9 + python/pyspark/pandas/indexes/base.py | 13 python/pyspark/pandas/namespace.py| 25 +++ python/pyspark/pandas/series.py | 9 + python/pyspark/pandas/utils.py| 9 + 8 files changed, 125 insertions(+) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.2 updated: [SPARK-37121][HIVE][TEST] Fix Python version detection bug in TestUtils used by HiveExternalCatalogVersionsSuite
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 31af87c [SPARK-37121][HIVE][TEST] Fix Python version detection bug in TestUtils used by HiveExternalCatalogVersionsSuite 31af87c is described below commit 31af87c154881743ad85d0259d5d8769968a8b58 Author: Erik Krogen AuthorDate: Thu Oct 28 09:46:01 2021 +0900 [SPARK-37121][HIVE][TEST] Fix Python version detection bug in TestUtils used by HiveExternalCatalogVersionsSuite ### What changes were proposed in this pull request? Fix a bug in `TestUtils.isPythonVersionAtLeast38` to allow for `HiveExternalCatalogVersionsSuite` to test against Spark 2.x releases in environments with Python <= 3.7. ### Why are the changes needed? The logic in `TestUtils.isPythonVersionAtLeast38` was added in #30044 to prevent Spark 2.4 from being run in an environment where the Python3 version installed was >= Python 3.8, which is not compatible with Spark 2.4. However, this method always returns true, so only Spark 3.x versions will ever be included in the version set for `HiveExternalCatalogVersionsSuite`, regardless of the system-installed version of Python. The problem is here: https://github.com/apache/spark/blob/951efb80856e2a92ba3690886c95643567dae9d0/core/src/main/scala/org/apache/spark/TestUtils.scala#L280-L291 It's trying to evaluate the version of Python using a `ProcessLogger`, but the logger accepts a `String => Unit` function, i.e., it does not make use of the return value in any way (since it's meant for logging). So the result of the `startsWith` checks are thrown away, and `attempt.isSuccess && attempt.get == 0` will always be true as long as your system has a `python3` binary (of any version). ### Does this PR introduce _any_ user-facing change? No, test changes only. ### How was this patch tested? 
Confirmed by checking that `HiveExternalCatalogVersionsSuite` downloads binary distros for Spark 2.x lines as well as 3.x when I symlink my `python3` to Python 3.7, and only downloads distros for the 3.x lines when I symlink my `python3` to Python 3.9. ```bash brew link --force python3.7 # run HiveExternalCatalogVersionsSuite and validate that 2.x and 3.x tests get executed brew unlink python3.7 brew link --force python3.9 # run HiveExternalCatalogVersionsSuite and validate that only 3.x tests get executed ``` Closes #34395 from xkrogen/xkrogen-SPARK-37121-testutils-python38-fix. Authored-by: Erik Krogen Signed-off-by: Hyukjin Kwon (cherry picked from commit 30e126176a9e29f23d6a16407074200d377153d6) Signed-off-by: Hyukjin Kwon --- core/src/main/scala/org/apache/spark/TestUtils.scala | 13 +++-- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala index dcbb9ba..2cbd156 100644 --- a/core/src/main/scala/org/apache/spark/TestUtils.scala +++ b/core/src/main/scala/org/apache/spark/TestUtils.scala @@ -277,16 +277,9 @@ private[spark] object TestUtils { } def isPythonVersionAtLeast38(): Boolean = { -val attempt = if (Utils.isWindows) { - Try(Process(Seq("cmd.exe", "/C", "python3 --version")) -.run(ProcessLogger(s => s.startsWith("Python 3.8") || s.startsWith("Python 3.9"))) -.exitValue()) -} else { - Try(Process(Seq("sh", "-c", "python3 --version")) -.run(ProcessLogger(s => s.startsWith("Python 3.8") || s.startsWith("Python 3.9"))) -.exitValue()) -} -attempt.isSuccess && attempt.get == 0 +val cmdSeq = if (Utils.isWindows) Seq("cmd.exe", "/C") else Seq("sh", "-c") +val pythonSnippet = "import sys; sys.exit(sys.version_info < (3, 8, 0))" +Try(Process(cmdSeq :+ s"python3 -c '$pythonSnippet'").! == 0).getOrElse(false) } /** - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-37121][HIVE][TEST] Fix Python version detection bug in TestUtils used by HiveExternalCatalogVersionsSuite
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 30e1261 [SPARK-37121][HIVE][TEST] Fix Python version detection bug in TestUtils used by HiveExternalCatalogVersionsSuite 30e1261 is described below commit 30e126176a9e29f23d6a16407074200d377153d6 Author: Erik Krogen AuthorDate: Thu Oct 28 09:46:01 2021 +0900 [SPARK-37121][HIVE][TEST] Fix Python version detection bug in TestUtils used by HiveExternalCatalogVersionsSuite ### What changes were proposed in this pull request? Fix a bug in `TestUtils.isPythonVersionAtLeast38` to allow for `HiveExternalCatalogVersionsSuite` to test against Spark 2.x releases in environments with Python <= 3.7. ### Why are the changes needed? The logic in `TestUtils.isPythonVersionAtLeast38` was added in #30044 to prevent Spark 2.4 from being run in an environment where the Python3 version installed was >= Python 3.8, which is not compatible with Spark 2.4. However, this method always returns true, so only Spark 3.x versions will ever be included in the version set for `HiveExternalCatalogVersionsSuite`, regardless of the system-installed version of Python. The problem is here: https://github.com/apache/spark/blob/951efb80856e2a92ba3690886c95643567dae9d0/core/src/main/scala/org/apache/spark/TestUtils.scala#L280-L291 It's trying to evaluate the version of Python using a `ProcessLogger`, but the logger accepts a `String => Unit` function, i.e., it does not make use of the return value in any way (since it's meant for logging). So the result of the `startsWith` checks are thrown away, and `attempt.isSuccess && attempt.get == 0` will always be true as long as your system has a `python3` binary (of any version). ### Does this PR introduce _any_ user-facing change? No, test changes only. ### How was this patch tested? 
Confirmed by checking that `HiveExternalCatalogVersionsSuite` downloads binary distros for Spark 2.x lines as well as 3.x when I symlink my `python3` to Python 3.7, and only downloads distros for the 3.x lines when I symlink my `python3` to Python 3.9. ```bash brew link --force python3.7 # run HiveExternalCatalogVersionsSuite and validate that 2.x and 3.x tests get executed brew unlink python3.7 brew link --force python3.9 # run HiveExternalCatalogVersionsSuite and validate that only 3.x tests get executed ``` Closes #34395 from xkrogen/xkrogen-SPARK-37121-testutils-python38-fix. Authored-by: Erik Krogen Signed-off-by: Hyukjin Kwon --- core/src/main/scala/org/apache/spark/TestUtils.scala | 13 +++-- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala index 24e5534..65ef813 100644 --- a/core/src/main/scala/org/apache/spark/TestUtils.scala +++ b/core/src/main/scala/org/apache/spark/TestUtils.scala @@ -278,16 +278,9 @@ private[spark] object TestUtils { } def isPythonVersionAtLeast38(): Boolean = { -val attempt = if (Utils.isWindows) { - Try(Process(Seq("cmd.exe", "/C", "python3 --version")) -.run(ProcessLogger(s => s.startsWith("Python 3.8") || s.startsWith("Python 3.9"))) -.exitValue()) -} else { - Try(Process(Seq("sh", "-c", "python3 --version")) -.run(ProcessLogger(s => s.startsWith("Python 3.8") || s.startsWith("Python 3.9"))) -.exitValue()) -} -attempt.isSuccess && attempt.get == 0 +val cmdSeq = if (Utils.isWindows) Seq("cmd.exe", "/C") else Seq("sh", "-c") +val pythonSnippet = "import sys; sys.exit(sys.version_info < (3, 8, 0))" +Try(Process(cmdSeq :+ s"python3 -c '$pythonSnippet'").! == 0).getOrElse(false) } /** - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-37071][CORE] Make OpenHashMap serialize without reference tracking
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 3319361 [SPARK-37071][CORE] Make OpenHashMap serialize without reference tracking 3319361 is described below commit 3319361ca67212d2ae373bb46c5b6f2d80d792a4 Author: Emil Ejbyfeldt AuthorDate: Wed Oct 27 08:58:37 2021 -0500 [SPARK-37071][CORE] Make OpenHashMap serialize without reference tracking ### What changes were proposed in this pull request? Change the anonymous functions in OpenHashMap to member methods. This avoids having a member which captures the OpenHashMap object in its closure. This fixes it so that OpenHashMap instances can be serialized with Kryo with reference tracking turned off. I am not sure why the original implementation had the anonymous function members in the first place. But if it was implemented that way for performance reasons another possible fix is just to mark the `grow` and `move` members as transient. ### Why are the changes needed? User might want to turn off referenceTracking in kryo since it has performance benefits, but currently this will unnecessarily and unexpectedly prevent them from using some features of Spark that use OpenHashMap internally. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing tests and a new test in the `KryoSerializerSuite`. Closes #34351 from eejbyfeldt/SPARK-37071-make-open-hash-map-serialize-without-reference-tracking. 
Authored-by: Emil Ejbyfeldt Signed-off-by: Sean Owen --- .../scala/org/apache/spark/util/collection/OpenHashMap.scala | 9 ++--- .../spark/util/collection/PrimitiveKeyOpenHashMap.scala | 9 ++--- .../org/apache/spark/serializer/KryoSerializerSuite.scala | 11 +++ .../util/collection/GraphXPrimitiveKeyOpenHashMap.scala | 9 ++--- 4 files changed, 17 insertions(+), 21 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala index 1200ac0..79e1a35 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala @@ -149,17 +149,12 @@ class OpenHashMap[K : ClassTag, @specialized(Long, Int, Double) V: ClassTag]( } } - // The following member variables are declared as protected instead of private for the - // specialization to work (specialized class extends the non-specialized one and needs access - // to the "private" variables). - // They also should have been val's. We use var's because there is a Scala compiler bug that - // would throw illegal access error at runtime if they are declared as val's. 
- protected var grow = (newCapacity: Int) => { + private def grow(newCapacity: Int): Unit = { _oldValues = _values _values = new Array[V](newCapacity) } - protected var move = (oldPos: Int, newPos: Int) => { + private def move(oldPos: Int, newPos: Int): Unit = { _values(newPos) = _oldValues(oldPos) } } diff --git a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala index 7a50d85..69665aa 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala @@ -117,17 +117,12 @@ class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassTag, } } - // The following member variables are declared as protected instead of private for the - // specialization to work (specialized class extends the unspecialized one and needs access - // to the "private" variables). - // They also should have been val's. We use var's because there is a Scala compiler bug that - // would throw illegal access error at runtime if they are declared as val's. - protected var grow = (newCapacity: Int) => { + private def grow(newCapacity: Int): Unit = { _oldValues = _values _values = new Array[V](newCapacity) } - protected var move = (oldPos: Int, newPos: Int) => { + private def move(oldPos: Int, newPos: Int): Unit = { _values(newPos) = _oldValues(oldPos) } } diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index 229ef69..dd2340a 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -39,6 +39,7 @@ import org.apache.spark.scheduler.HighlyCompressedMapStatus import
[spark] branch master updated: [SPARK-16280][SPARK-37082][SQL] Implements histogram_numeric aggregation function which supports partial aggregation
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new de0d7fb [SPARK-16280][SPARK-37082][SQL] Implements histogram_numeric aggregation function which supports partial aggregation de0d7fb is described below commit de0d7fbb4f010bec8e457d0dc00b5618e7a43750 Author: Angerszh AuthorDate: Wed Oct 27 19:47:17 2021 +0800 [SPARK-16280][SPARK-37082][SQL] Implements histogram_numeric aggregation function which supports partial aggregation ### What changes were proposed in this pull request? This PR implements aggregation function `histogram_numeric`. Function `histogram_numeric` returns an approximate histogram of a numerical column using a user-specified number of bins. For example, the histogram of column `col` when split to 3 bins. Syntax: an approximate histogram of a numerical column using a user-specified number of bins. histogram_numeric(col, nBins) ## Returns an approximate histogram of a column `col` into 3 bins. SELECT histogram_numeric(col, 3) FROM table # Returns an approximate histogram of a column `col` into 5 bins. SELECT histogram_numeric(col, 5) FROM table ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? No change from user side ### How was this patch tested? Added UT Closes #34380 from AngersZh/SPARK-37082. 
Authored-by: Angerszh Signed-off-by: Wenchen Fan --- .../apache/spark/sql/util/NumericHistogram.java| 286 + .../sql/catalyst/analysis/FunctionRegistry.scala | 1 + .../sql/catalyst/catalog/SessionCatalog.scala | 2 +- .../expressions/aggregate/HistogramNumeric.scala | 207 +++ .../aggregate/HistogramNumericSuite.scala | 166 .../sql-functions/sql-expression-schema.md | 3 +- .../test/resources/sql-tests/inputs/group-by.sql | 12 + .../resources/sql-tests/results/group-by.sql.out | 20 +- .../apache/spark/sql/hive/HiveSessionCatalog.scala | 4 +- 9 files changed, 695 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/util/NumericHistogram.java b/sql/catalyst/src/main/java/org/apache/spark/sql/util/NumericHistogram.java new file mode 100644 index 000..987c18e --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/util/NumericHistogram.java @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.util; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Random; + + +/** + * A generic, re-usable histogram class that supports partial aggregations. 
+ * The algorithm is a heuristic adapted from the following paper: + * Yael Ben-Haim and Elad Tom-Tov, "A streaming parallel decision tree algorithm", + * J. Machine Learning Research 11 (2010), pp. 849--872. Although there are no approximation + * guarantees, it appears to work well with adequate data and a large (e.g., 20-80) number + * of histogram bins. + * + * Adapted from Hive's NumericHistogram. Can refer to + * https://github.com/apache/hive/blob/master/ql/src/ + * java/org/apache/hadoop/hive/ql/udf/generic/NumericHistogram.java + * + * Differences: + * 1. Declaring [[Coord]] and it's variables as public types for + * easy access in the HistogramNumeric class. + * 2. Add method [[getNumBins()]] for serialize [[NumericHistogram]] + * in [[NumericHistogramSerializer]]. + * 3. Add method [[addBin()]] for deserialize [[NumericHistogram]] + * in [[NumericHistogramSerializer]]. + * 4. In Hive's code, the method [[merge()] pass a serialized histogram, + * in Spark, this method pass a deserialized histogram. + * Here we change the code about merge bins. + */ +public class NumericHistogram { + /** + * The Coord class defines a histogram bin, which is just an (x,y) pair. + */ + public static class Coord implements Comparable { +public double x; +public double
[spark] branch master updated: [SPARK-37115][SQL] HiveClientImpl should use shim to wrap all hive client calls
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 1099fd3 [SPARK-37115][SQL] HiveClientImpl should use shim to wrap all hive client calls 1099fd3 is described below commit 1099fd342075b53ad9ddb2787911f2dabb340a3d Author: Angerszh AuthorDate: Wed Oct 27 15:32:47 2021 +0800 [SPARK-37115][SQL] HiveClientImpl should use shim to wrap all hive client calls ### What changes were proposed in this pull request? In this pr we use `shim` to wrap all hive client api to make it easier. ### Why are the changes needed? Use `shim` to wrap all hive client api to make it easier. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existed UT Closes #34388 from AngersZh/SPARK-37115. Authored-by: Angerszh Signed-off-by: Wenchen Fan --- .../spark/sql/hive/client/HiveClientImpl.scala | 64 .../apache/spark/sql/hive/client/HiveShim.scala| 176 - 2 files changed, 205 insertions(+), 35 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index e295e0f..25be8b5 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -343,14 +343,14 @@ private[hive] class HiveClientImpl( database: CatalogDatabase, ignoreIfExists: Boolean): Unit = withHiveState { val hiveDb = toHiveDatabase(database, Some(userName)) -client.createDatabase(hiveDb, ignoreIfExists) +shim.createDatabase(client, hiveDb, ignoreIfExists) } override def dropDatabase( name: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = withHiveState { -client.dropDatabase(name, true, ignoreIfNotExists, cascade) +shim.dropDatabase(client, name, true, 
ignoreIfNotExists, cascade) } override def alterDatabase(database: CatalogDatabase): Unit = withHiveState { @@ -361,7 +361,7 @@ private[hive] class HiveClientImpl( } } val hiveDb = toHiveDatabase(database) -client.alterDatabase(database.name, hiveDb) +shim.alterDatabase(client, database.name, hiveDb) } private def toHiveDatabase( @@ -379,7 +379,7 @@ private[hive] class HiveClientImpl( } override def getDatabase(dbName: String): CatalogDatabase = withHiveState { -Option(client.getDatabase(dbName)).map { d => +Option(shim.getDatabase(client, dbName)).map { d => val params = Option(d.getParameters).map(_.asScala.toMap).getOrElse(Map()) ++ Map(PROP_OWNER -> shim.getDatabaseOwnerName(d)) @@ -392,15 +392,15 @@ private[hive] class HiveClientImpl( } override def databaseExists(dbName: String): Boolean = withHiveState { -client.databaseExists(dbName) +shim.databaseExists(client, dbName) } override def listDatabases(pattern: String): Seq[String] = withHiveState { -client.getDatabasesByPattern(pattern).asScala.toSeq +shim.getDatabasesByPattern(client, pattern) } private def getRawTableOption(dbName: String, tableName: String): Option[HiveTable] = { -Option(client.getTable(dbName, tableName, false /* do not throw exception */)) +Option(shim.getTable(client, dbName, tableName, false /* do not throw exception */)) } private def getRawTablesByName(dbName: String, tableNames: Seq[String]): Seq[HiveTable] = { @@ -551,7 +551,7 @@ private[hive] class HiveClientImpl( override def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = withHiveState { verifyColumnDataType(table.dataSchema) -client.createTable(toHiveTable(table, Some(userName)), ignoreIfExists) +shim.createTable(client, toHiveTable(table, Some(userName)), ignoreIfExists) } override def dropTable( @@ -583,7 +583,7 @@ private[hive] class HiveClientImpl( tableName: String, newDataSchema: StructType, schemaProps: Map[String, String]): Unit = withHiveState { -val oldTable = client.getTable(dbName, tableName) +val 
oldTable = shim.getTable(client, dbName, tableName) verifyColumnDataType(newDataSchema) val hiveCols = newDataSchema.map(toHiveColumn) oldTable.setFields(hiveCols.asJava) @@ -630,7 +630,7 @@ private[hive] class HiveClientImpl( purge: Boolean, retainData: Boolean): Unit = withHiveState { // TODO: figure out how to drop multiple partitions in one call -val hiveTable = client.getTable(db, table, true /* throw exception */) +val hiveTable = shim.getTable(client, db, table, true /* throw exception */) // do the check at first and collect all the matching
[spark] branch master updated: [SPARK-37125][SQL] Support AnsiInterval radix sort
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 101dd6b [SPARK-37125][SQL] Support AnsiInterval radix sort 101dd6b is described below commit 101dd6bbff2491a608e1ab51541a120a1f08e942 Author: ulysses-you AuthorDate: Wed Oct 27 15:31:26 2021 +0800 [SPARK-37125][SQL] Support AnsiInterval radix sort ### What changes were proposed in this pull request? - Make `AnsiInterval` data type support radix sort in SQL. - Enhance the `SortSuite` by disabling radix. ### Why are the changes needed? The radix sort is faster than timsort; the benchmark result can be seen in `SortBenchmark`. Since the `AnsiInterval` data type is comparable: - `YearMonthIntervalType` -> int ordering - `DayTimeIntervalType` -> long ordering And we also support radix sort when the ordering column data type is int or long. So `AnsiInterval` radix sort can be supported. ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? - The data correctness should be ensured in `SortSuite` - Add a new benchmark Closes #34398 from ulysses-you/ansi-interval-sort. 
Authored-by: ulysses-you Signed-off-by: Wenchen Fan --- .../spark/sql/catalyst/expressions/SortOrder.scala | 6 +- .../AnsiIntervalSortBenchmark-jdk11-results.txt| 28 + .../AnsiIntervalSortBenchmark-results.txt | 28 + .../spark/sql/execution/SortPrefixUtils.scala | 5 +- .../org/apache/spark/sql/execution/SortSuite.scala | 17 +++-- .../benchmark/AnsiIntervalSortBenchmark.scala | 73 ++ 6 files changed, 146 insertions(+), 11 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala index 9aef25c..8e6f076 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala @@ -132,7 +132,7 @@ object SortOrder { case class SortPrefix(child: SortOrder) extends UnaryExpression { val nullValue = child.child.dataType match { -case BooleanType | DateType | TimestampType | _: IntegralType => +case BooleanType | DateType | TimestampType | _: IntegralType | _: AnsiIntervalType => if (nullAsSmallest) Long.MinValue else Long.MaxValue case dt: DecimalType if dt.precision - dt.scale <= Decimal.MAX_LONG_DIGITS => if (nullAsSmallest) Long.MinValue else Long.MaxValue @@ -154,7 +154,7 @@ case class SortPrefix(child: SortOrder) extends UnaryExpression { private lazy val calcPrefix: Any => Long = child.child.dataType match { case BooleanType => (raw) => if (raw.asInstanceOf[Boolean]) 1 else 0 -case DateType | TimestampType | _: IntegralType => (raw) => +case DateType | TimestampType | _: IntegralType | _: AnsiIntervalType => (raw) => raw.asInstanceOf[java.lang.Number].longValue() case FloatType | DoubleType => (raw) => { val dVal = raw.asInstanceOf[java.lang.Number].doubleValue() @@ -198,7 +198,7 @@ case class SortPrefix(child: SortOrder) extends UnaryExpression { s"$input ? 
1L : 0L" case _: IntegralType => s"(long) $input" - case DateType | TimestampType => + case DateType | TimestampType | _: AnsiIntervalType => s"(long) $input" case FloatType | DoubleType => s"$DoublePrefixCmp.computePrefix((double)$input)" diff --git a/sql/core/benchmarks/AnsiIntervalSortBenchmark-jdk11-results.txt b/sql/core/benchmarks/AnsiIntervalSortBenchmark-jdk11-results.txt new file mode 100644 index 000..004d9d8 --- /dev/null +++ b/sql/core/benchmarks/AnsiIntervalSortBenchmark-jdk11-results.txt @@ -0,0 +1,28 @@ +OpenJDK 64-Bit Server VM 11.0.13+8-LTS on Linux 5.8.0-1042-azure +Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz +year month interval one column: Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative + +year month interval one column enable radix 40092 40744 668 2.5 400.9 1.0X +year month interval one column disable radix 55178 55871 609 1.8 551.8 0.7X + +OpenJDK 64-Bit Server VM 11.0.13+8-LTS on Linux 5.8.0-1042-azure +Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz +year month interval two columns: Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative
[spark] branch master updated: [SPARK-37031][SQL][TESTS][FOLLOWUP] Add a missing test to DescribeNamespaceSuite
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 7954a91 [SPARK-37031][SQL][TESTS][FOLLOWUP] Add a missing test to DescribeNamespaceSuite 7954a91 is described below commit 7954a91d76b5f60234b2f8628c0330a65653297a Author: Terry Kim AuthorDate: Wed Oct 27 15:28:19 2021 +0800 [SPARK-37031][SQL][TESTS][FOLLOWUP] Add a missing test to DescribeNamespaceSuite ### What changes were proposed in this pull request? This PR proposes to add a missing test on "keeping the legacy output schema" to `DescribeNamespaceSuite`. (#31705 didn't seem to add it). ### Why are the changes needed? To increase the test coverage. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added a new test. Closes #34399 from imback82/SPARK-37031-followup. Authored-by: Terry Kim Signed-off-by: Wenchen Fan --- .../execution/command/v1/DescribeNamespaceSuite.scala | 18 ++ 1 file changed, 18 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/DescribeNamespaceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/DescribeNamespaceSuite.scala index a86e4a5..e97f0c5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/DescribeNamespaceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/DescribeNamespaceSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.command.v1 import org.apache.spark.sql.Row import org.apache.spark.sql.execution.command +import org.apache.spark.sql.internal.SQLConf /** * This base suite contains unified tests for the `DESCRIBE NAMESPACE` command that checks V1 @@ -50,6 +51,23 @@ trait DescribeNamespaceSuiteBase extends command.DescribeNamespaceSuiteBase { assert(result(3) === Row("Properties", "")) } } + + test("Keep the 
legacy output schema") +Seq(true, false).foreach { keepLegacySchema => + withSQLConf(SQLConf.LEGACY_KEEP_COMMAND_OUTPUT_SCHEMA.key -> keepLegacySchema.toString) { +val ns = "db1" +withNamespace(ns) { + sql(s"CREATE NAMESPACE $ns") + val schema = sql(s"DESCRIBE NAMESPACE $ns").schema.fieldNames.toSeq + if (keepLegacySchema) { +assert(schema === Seq("database_description_item", "database_description_value")) + } else { +assert(schema === Seq("info_name", "info_value")) + } +} + } +} + } } /** - To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org For additional commands, e-mail: commits-help@spark.apache.org
[spark] branch master updated: [SPARK-36647][SQL][TESTS] Push down Aggregate (Min/Max/Count) for Parquet if filter is on partition col
This is an automated email from the ASF dual-hosted git repository. viirya pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 4aec9d7 [SPARK-36647][SQL][TESTS] Push down Aggregate (Min/Max/Count) for Parquet if filter is on partition col 4aec9d7 is described below commit 4aec9d7daca7a1a146ff1fb1e7541c9443905725 Author: Huaxin Gao AuthorDate: Wed Oct 27 00:14:00 2021 -0700 [SPARK-36647][SQL][TESTS] Push down Aggregate (Min/Max/Count) for Parquet if filter is on partition col ### What changes were proposed in this pull request? I just realized that with the changes in https://github.com/apache/spark/pull/33650, the restriction for not pushing down Min/Max/Count for partition filter was already removed. This PR just added test to make sure Min/Max/Count in parquet are pushed down if filter is on partition col. ### Why are the changes needed? To complete the work for Aggregate (Min/Max/Count) push down for Parquet ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? new test Closes #34248 from huaxingao/partitionFilter. Authored-by: Huaxin Gao Signed-off-by: Liang-Chi Hsieh --- .../v2/parquet/ParquetScanBuilder.scala| 7 ++-- .../parquet/ParquetAggregatePushDownSuite.scala| 40 -- 2 files changed, 33 insertions(+), 14 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala index 113438a..da49381 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala @@ -134,10 +134,9 @@ case class ParquetScanBuilder( // are combined with filter or group by // e.g. 
SELECT COUNT(col1) FROM t WHERE col2 = 8 // SELECT COUNT(col1) FROM t GROUP BY col2 - // Todo: 1. add support if groupby column is partition col - // (https://issues.apache.org/jira/browse/SPARK-36646) - // 2. add support if filter col is partition col - // (https://issues.apache.org/jira/browse/SPARK-36647) + // However, if the filter is on partition column, max/min/count can still be pushed down + // Todo: add support if groupby column is partition col + //(https://issues.apache.org/jira/browse/SPARK-36646) return false } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAggregatePushDownSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAggregatePushDownSuite.scala index 0ae95db..77ecd28 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAggregatePushDownSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAggregatePushDownSuite.scala @@ -129,10 +129,9 @@ abstract class ParquetAggregatePushDownSuite .write.partitionBy("p").parquet(dir.getCanonicalPath) withTempView("tmp") { spark.read.parquet(dir.getCanonicalPath).createOrReplaceTempView("tmp"); -val enableVectorizedReader = Seq("false", "true") -for (testVectorizedReader <- enableVectorizedReader) { +Seq("false", "true").foreach { enableVectorizedReader => withSQLConf(SQLConf.PARQUET_AGGREGATE_PUSHDOWN_ENABLED.key -> "true", -vectorizedReaderEnabledKey -> testVectorizedReader) { +vectorizedReaderEnabledKey -> enableVectorizedReader) { val count = sql("SELECT COUNT(p) FROM tmp") count.queryExecution.optimizedPlan.collect { case _: DataSourceV2ScanRelation => @@ -221,7 +220,7 @@ abstract class ParquetAggregatePushDownSuite } } - test("aggregate push down - query with filter not push down") { + test("aggregate push down - aggregate with data filter cannot be pushed down") { val data = Seq((-2, "abc", 2), (3, "def", 4), (6, "ghi", 2), 
(0, null, 19), (9, "mno", 7), (2, null, 7)) withParquetTable(data, "t") { @@ -240,6 +239,29 @@ abstract class ParquetAggregatePushDownSuite } } + test("aggregate push down - aggregate with partition filter can be pushed down") { +withTempPath { dir => + spark.range(10).selectExpr("id", "id % 3 as p") +.write.partitionBy("p").parquet(dir.getCanonicalPath) + withTempView("tmp") { + spark.read.parquet(dir.getCanonicalPath).createOrReplaceTempView("tmp"); +Seq("false", "true").foreach { enableVectorizedReader => +