[spark] branch master updated: [SPARK-37133][SQL] Add a config to optionally enforce ANSI reserved keywords

2021-10-27 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new cfb96eb  [SPARK-37133][SQL] Add a config to optionally enforce ANSI 
reserved keywords
cfb96eb is described below

commit cfb96ebd3991c62fc737242aeeb9b5cdb4abe7ae
Author: Wenchen Fan 
AuthorDate: Thu Oct 28 12:58:15 2021 +0800

[SPARK-37133][SQL] Add a config to optionally enforce ANSI reserved keywords

### What changes were proposed in this pull request?

This PR adds a new config to optionally enforce the ANSI reserved keywords 
in the parser. The default value is true, so we by default still enforce it and 
there is no behavior change.

### Why are the changes needed?

In Spark 3.2, the ANSI mode is GA. We want more people to try and use the 
ANSI mode, to find data issues as early as possible and get better data 
quality. However, the reserved keywords thing is a big stopper for many users 
that want to try ANSI mode. They have to update the SQL queries to pass the 
parser, which is nothing about data quality but just trouble.

With a new config to allow users to not enforce reserved keywords, I think 
we can get better adoption of the ANSI mode.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

updated tests.

Closes #34403 from cloud-fan/parser.

Lead-authored-by: Wenchen Fan 
Co-authored-by: Wenchen Fan 
Signed-off-by: Wenchen Fan 
---
 docs/sql-ref-ansi-compliance.md  |  4 +++-
 .../org/apache/spark/sql/catalyst/parser/AstBuilder.scala|  2 +-
 .../org/apache/spark/sql/catalyst/parser/ParseDriver.scala   |  2 +-
 .../main/scala/org/apache/spark/sql/internal/SQLConf.scala   | 10 ++
 .../spark/sql/catalyst/parser/ExpressionParserSuite.scala| 12 +++-
 .../sql/catalyst/parser/TableIdentifierParserSuite.scala |  9 +
 6 files changed, 35 insertions(+), 4 deletions(-)

diff --git a/docs/sql-ref-ansi-compliance.md b/docs/sql-ref-ansi-compliance.md
index c10e866..4527faa 100644
--- a/docs/sql-ref-ansi-compliance.md
+++ b/docs/sql-ref-ansi-compliance.md
@@ -299,7 +299,9 @@ When the ANSI mode is disabled, Spark SQL has two kinds of 
keywords:
 * Non-reserved keywords: Same definition as the one when the ANSI mode enabled.
 * Strict-non-reserved keywords: A strict version of non-reserved keywords, 
which can not be used as table alias.
 
-By default `spark.sql.ansi.enabled` is false.
+If you want to still use reserved keywords as identifiers with ANSI mode, you 
can set `spark.sql.ansi.enforceReservedKeywords` to false.
+
+By default `spark.sql.ansi.enabled` is false and 
`spark.sql.ansi.enforceReservedKeywords` is true.
 
 Below is a list of all the keywords in Spark SQL.
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index d36c7ac..768d406 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -1728,7 +1728,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with 
SQLConfHelper with Logg
   }
 
   override def visitCurrentLike(ctx: CurrentLikeContext): Expression = 
withOrigin(ctx) {
-if (conf.ansiEnabled) {
+if (conf.enforceReservedKeywords) {
   ctx.name.getType match {
 case SqlBaseParser.CURRENT_DATE =>
   CurrentDate()
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala
index 64216e6..b459a2d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala
@@ -100,7 +100,7 @@ abstract class AbstractSqlParser extends ParserInterface 
with SQLConfHelper with
 parser.addErrorListener(ParseErrorListener)
 parser.legacy_setops_precedence_enabled = conf.setOpsPrecedenceEnforced
 parser.legacy_exponent_literal_as_decimal_enabled = 
conf.exponentLiteralAsDecimalEnabled
-parser.SQL_standard_keyword_behavior = conf.ansiEnabled
+parser.SQL_standard_keyword_behavior = conf.enforceReservedKeywords
 
 try {
   try {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 5023b4a..fe3204b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2590,6 +2590,14 @@ object SQLConf {

[spark] branch master updated (30e1261 -> 502a1ec)

2021-10-27 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 30e1261  [SPARK-37121][HIVE][TEST] Fix Python version detection bug in 
TestUtils used by HiveExternalCatalogVersionsSuite
 add 502a1ec  [SPARK-37036][PYTHON] Add util function to raise advice 
warning for pandas API on Spark

No new revisions were added by this update.

Summary of changes:
 python/pyspark/pandas/accessors.py|  9 +
 python/pyspark/pandas/frame.py| 38 +++
 python/pyspark/pandas/generic.py  | 13 
 python/pyspark/pandas/groupby.py  |  9 +
 python/pyspark/pandas/indexes/base.py | 13 
 python/pyspark/pandas/namespace.py| 25 +++
 python/pyspark/pandas/series.py   |  9 +
 python/pyspark/pandas/utils.py|  9 +
 8 files changed, 125 insertions(+)

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.2 updated: [SPARK-37121][HIVE][TEST] Fix Python version detection bug in TestUtils used by HiveExternalCatalogVersionsSuite

2021-10-27 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 31af87c  [SPARK-37121][HIVE][TEST] Fix Python version detection bug in 
TestUtils used by HiveExternalCatalogVersionsSuite
31af87c is described below

commit 31af87c154881743ad85d0259d5d8769968a8b58
Author: Erik Krogen 
AuthorDate: Thu Oct 28 09:46:01 2021 +0900

[SPARK-37121][HIVE][TEST] Fix Python version detection bug in TestUtils 
used by HiveExternalCatalogVersionsSuite

### What changes were proposed in this pull request?
Fix a bug in `TestUtils.isPythonVersionAtLeast38` to allow for 
`HiveExternalCatalogVersionsSuite` to test against Spark 2.x releases in 
environments with Python <= 3.7.

### Why are the changes needed?
The logic in `TestUtils.isPythonVersionAtLeast38` was added in #30044 to 
prevent Spark 2.4 from being run in an environment where the Python3 version 
installed was >= Python 3.8, which is not compatible with Spark 2.4. However, 
this method always returns true, so only Spark 3.x versions will ever be 
included in the version set for `HiveExternalCatalogVersionsSuite`, regardless 
of the system-installed version of Python.

The problem is here:

https://github.com/apache/spark/blob/951efb80856e2a92ba3690886c95643567dae9d0/core/src/main/scala/org/apache/spark/TestUtils.scala#L280-L291
It's trying to evaluate the version of Python using a `ProcessLogger`, but 
the logger accepts a `String => Unit` function, i.e., it does not make use of 
the return value in any way (since it's meant for logging). So the result of 
the `startsWith` checks are thrown away, and `attempt.isSuccess && attempt.get 
== 0` will always be true as long as your system has a `python3` binary (of any 
version).

### Does this PR introduce _any_ user-facing change?
No, test changes only.

### How was this patch tested?
Confirmed by checking that `HiveExternalCatalogVersionsSuite` downloads 
binary distros for Spark 2.x lines as well as 3.x when I symlink my `python3` 
to Python 3.7, and only downloads distros for the 3.x lines when I symlink my 
`python3` to Python 3.9.

```bash
brew link --force python3.7
# run HiveExternalCatalogVersionsSuite and validate that 2.x and 3.x tests 
get executed
brew unlink python3.7
brew link --force python3.9
# run HiveExternalCatalogVersionsSuite and validate that only 3.x tests get 
executed
```

Closes #34395 from xkrogen/xkrogen-SPARK-37121-testutils-python38-fix.

Authored-by: Erik Krogen 
Signed-off-by: Hyukjin Kwon 
(cherry picked from commit 30e126176a9e29f23d6a16407074200d377153d6)
Signed-off-by: Hyukjin Kwon 
---
 core/src/main/scala/org/apache/spark/TestUtils.scala | 13 +++--
 1 file changed, 3 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala 
b/core/src/main/scala/org/apache/spark/TestUtils.scala
index dcbb9ba..2cbd156 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -277,16 +277,9 @@ private[spark] object TestUtils {
   }
 
   def isPythonVersionAtLeast38(): Boolean = {
-val attempt = if (Utils.isWindows) {
-  Try(Process(Seq("cmd.exe", "/C", "python3 --version"))
-.run(ProcessLogger(s => s.startsWith("Python 3.8") || 
s.startsWith("Python 3.9")))
-.exitValue())
-} else {
-  Try(Process(Seq("sh", "-c", "python3 --version"))
-.run(ProcessLogger(s => s.startsWith("Python 3.8") || 
s.startsWith("Python 3.9")))
-.exitValue())
-}
-attempt.isSuccess && attempt.get == 0
+val cmdSeq = if (Utils.isWindows) Seq("cmd.exe", "/C") else Seq("sh", "-c")
+val pythonSnippet = "import sys; sys.exit(sys.version_info < (3, 8, 0))"
+Try(Process(cmdSeq :+ s"python3 -c '$pythonSnippet'").! == 
0).getOrElse(false)
   }
 
   /**

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-37121][HIVE][TEST] Fix Python version detection bug in TestUtils used by HiveExternalCatalogVersionsSuite

2021-10-27 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 30e1261  [SPARK-37121][HIVE][TEST] Fix Python version detection bug in 
TestUtils used by HiveExternalCatalogVersionsSuite
30e1261 is described below

commit 30e126176a9e29f23d6a16407074200d377153d6
Author: Erik Krogen 
AuthorDate: Thu Oct 28 09:46:01 2021 +0900

[SPARK-37121][HIVE][TEST] Fix Python version detection bug in TestUtils 
used by HiveExternalCatalogVersionsSuite

### What changes were proposed in this pull request?
Fix a bug in `TestUtils.isPythonVersionAtLeast38` to allow for 
`HiveExternalCatalogVersionsSuite` to test against Spark 2.x releases in 
environments with Python <= 3.7.

### Why are the changes needed?
The logic in `TestUtils.isPythonVersionAtLeast38` was added in #30044 to 
prevent Spark 2.4 from being run in an environment where the Python3 version 
installed was >= Python 3.8, which is not compatible with Spark 2.4. However, 
this method always returns true, so only Spark 3.x versions will ever be 
included in the version set for `HiveExternalCatalogVersionsSuite`, regardless 
of the system-installed version of Python.

The problem is here:

https://github.com/apache/spark/blob/951efb80856e2a92ba3690886c95643567dae9d0/core/src/main/scala/org/apache/spark/TestUtils.scala#L280-L291
It's trying to evaluate the version of Python using a `ProcessLogger`, but 
the logger accepts a `String => Unit` function, i.e., it does not make use of 
the return value in any way (since it's meant for logging). So the result of 
the `startsWith` checks are thrown away, and `attempt.isSuccess && attempt.get 
== 0` will always be true as long as your system has a `python3` binary (of any 
version).

### Does this PR introduce _any_ user-facing change?
No, test changes only.

### How was this patch tested?
Confirmed by checking that `HiveExternalCatalogVersionsSuite` downloads 
binary distros for Spark 2.x lines as well as 3.x when I symlink my `python3` 
to Python 3.7, and only downloads distros for the 3.x lines when I symlink my 
`python3` to Python 3.9.

```bash
brew link --force python3.7
# run HiveExternalCatalogVersionsSuite and validate that 2.x and 3.x tests 
get executed
brew unlink python3.7
brew link --force python3.9
# run HiveExternalCatalogVersionsSuite and validate that only 3.x tests get 
executed
```

Closes #34395 from xkrogen/xkrogen-SPARK-37121-testutils-python38-fix.

Authored-by: Erik Krogen 
Signed-off-by: Hyukjin Kwon 
---
 core/src/main/scala/org/apache/spark/TestUtils.scala | 13 +++--
 1 file changed, 3 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala 
b/core/src/main/scala/org/apache/spark/TestUtils.scala
index 24e5534..65ef813 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -278,16 +278,9 @@ private[spark] object TestUtils {
   }
 
   def isPythonVersionAtLeast38(): Boolean = {
-val attempt = if (Utils.isWindows) {
-  Try(Process(Seq("cmd.exe", "/C", "python3 --version"))
-.run(ProcessLogger(s => s.startsWith("Python 3.8") || 
s.startsWith("Python 3.9")))
-.exitValue())
-} else {
-  Try(Process(Seq("sh", "-c", "python3 --version"))
-.run(ProcessLogger(s => s.startsWith("Python 3.8") || 
s.startsWith("Python 3.9")))
-.exitValue())
-}
-attempt.isSuccess && attempt.get == 0
+val cmdSeq = if (Utils.isWindows) Seq("cmd.exe", "/C") else Seq("sh", "-c")
+val pythonSnippet = "import sys; sys.exit(sys.version_info < (3, 8, 0))"
+Try(Process(cmdSeq :+ s"python3 -c '$pythonSnippet'").! == 
0).getOrElse(false)
   }
 
   /**

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-37071][CORE] Make OpenHashMap serialize without reference tracking

2021-10-27 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 3319361  [SPARK-37071][CORE] Make OpenHashMap serialize without 
reference tracking
3319361 is described below

commit 3319361ca67212d2ae373bb46c5b6f2d80d792a4
Author: Emil Ejbyfeldt 
AuthorDate: Wed Oct 27 08:58:37 2021 -0500

[SPARK-37071][CORE] Make OpenHashMap serialize without reference tracking

### What changes were proposed in this pull request?
Change the anonymous functions in OpenHashMap to member methods. This avoid 
having a member which captures the OpenHashMap object in its closure. This 
fixes so that OpenHashMap instances can be serialized with Kryo with reference 
tracking turned off.

I am not sure why the original implementation had the anonymous function 
members in the first place. But if it was implemented that way for performance 
reason another possible fix is just to mark the `grow` and `move` members as 
transient.

### Why are the changes needed?
User might want to turn off referenceTracking in kryo since it has 
performance benefits, but currently this will unnecessary and unexpectedly 
prevent them from using some features of spark that uses OpenHashMap internally.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing tests and a new test in the `KryoSerializerSuite`.

Closes #34351 from 
eejbyfeldt/SPARK-37071-make-open-hash-map-serialize-without-reference-tracking.

Authored-by: Emil Ejbyfeldt 
Signed-off-by: Sean Owen 
---
 .../scala/org/apache/spark/util/collection/OpenHashMap.scala  |  9 ++---
 .../spark/util/collection/PrimitiveKeyOpenHashMap.scala   |  9 ++---
 .../org/apache/spark/serializer/KryoSerializerSuite.scala | 11 +++
 .../util/collection/GraphXPrimitiveKeyOpenHashMap.scala   |  9 ++---
 4 files changed, 17 insertions(+), 21 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala 
b/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
index 1200ac0..79e1a35 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
@@ -149,17 +149,12 @@ class OpenHashMap[K : ClassTag, @specialized(Long, Int, 
Double) V: ClassTag](
 }
   }
 
-  // The following member variables are declared as protected instead of 
private for the
-  // specialization to work (specialized class extends the non-specialized one 
and needs access
-  // to the "private" variables).
-  // They also should have been val's. We use var's because there is a Scala 
compiler bug that
-  // would throw illegal access error at runtime if they are declared as val's.
-  protected var grow = (newCapacity: Int) => {
+  private def grow(newCapacity: Int): Unit = {
 _oldValues = _values
 _values = new Array[V](newCapacity)
   }
 
-  protected var move = (oldPos: Int, newPos: Int) => {
+  private def move(oldPos: Int, newPos: Int): Unit = {
 _values(newPos) = _oldValues(oldPos)
   }
 }
diff --git 
a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala
 
b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala
index 7a50d85..69665aa 100644
--- 
a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala
+++ 
b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala
@@ -117,17 +117,12 @@ class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: 
ClassTag,
 }
   }
 
-  // The following member variables are declared as protected instead of 
private for the
-  // specialization to work (specialized class extends the unspecialized one 
and needs access
-  // to the "private" variables).
-  // They also should have been val's. We use var's because there is a Scala 
compiler bug that
-  // would throw illegal access error at runtime if they are declared as val's.
-  protected var grow = (newCapacity: Int) => {
+  private def grow(newCapacity: Int): Unit = {
 _oldValues = _values
 _values = new Array[V](newCapacity)
   }
 
-  protected var move = (oldPos: Int, newPos: Int) => {
+  private def move(oldPos: Int, newPos: Int): Unit = {
 _values(newPos) = _oldValues(oldPos)
   }
 }
diff --git 
a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala 
b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index 229ef69..dd2340a 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -39,6 +39,7 @@ import org.apache.spark.scheduler.HighlyCompressedMapStatus
 import 

[spark] branch master updated: [SPARK-16280][SPARK-37082][SQL] Implements histogram_numeric aggregation function which supports partial aggregation

2021-10-27 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new de0d7fb  [SPARK-16280][SPARK-37082][SQL] Implements histogram_numeric 
aggregation function which supports partial aggregation
de0d7fb is described below

commit de0d7fbb4f010bec8e457d0dc00b5618e7a43750
Author: Angerszh 
AuthorDate: Wed Oct 27 19:47:17 2021 +0800

[SPARK-16280][SPARK-37082][SQL] Implements histogram_numeric aggregation 
function which supports partial aggregation

### What changes were proposed in this pull request?
This PR implements aggregation function `histogram_numeric`. Function 
`histogram_numeric` returns an approximate histogram of a numerical column 
using a user-specified number of bins. For example, the histogram of column 
`col` when split to 3 bins.

Syntax:
 an approximate histogram of a numerical column using a user-specified 
number of bins.
histogram_numebric(col, nBins)

## Returns an approximate histogram of a column `col` into 3 bins.
SELECT histogram_numebric(col, 3) FROM table

# Returns an approximate histogram of a column `col` into 5 bins.
SELECT histogram_numebric(col, 5) FROM table

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?
No change from user side

### How was this patch tested?
Added UT

Closes #34380 from AngersZh/SPARK-37082.

Authored-by: Angerszh 
Signed-off-by: Wenchen Fan 
---
 .../apache/spark/sql/util/NumericHistogram.java| 286 +
 .../sql/catalyst/analysis/FunctionRegistry.scala   |   1 +
 .../sql/catalyst/catalog/SessionCatalog.scala  |   2 +-
 .../expressions/aggregate/HistogramNumeric.scala   | 207 +++
 .../aggregate/HistogramNumericSuite.scala  | 166 
 .../sql-functions/sql-expression-schema.md |   3 +-
 .../test/resources/sql-tests/inputs/group-by.sql   |  12 +
 .../resources/sql-tests/results/group-by.sql.out   |  20 +-
 .../apache/spark/sql/hive/HiveSessionCatalog.scala |   4 +-
 9 files changed, 695 insertions(+), 6 deletions(-)

diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/util/NumericHistogram.java 
b/sql/catalyst/src/main/java/org/apache/spark/sql/util/NumericHistogram.java
new file mode 100644
index 000..987c18e
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/util/NumericHistogram.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.util;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Random;
+
+
+/**
+ * A generic, re-usable histogram class that supports partial aggregations.
+ * The algorithm is a heuristic adapted from the following paper:
+ * Yael Ben-Haim and Elad Tom-Tov, "A streaming parallel decision tree 
algorithm",
+ * J. Machine Learning Research 11 (2010), pp. 849--872. Although there are no 
approximation
+ * guarantees, it appears to work well with adequate data and a large (e.g., 
20-80) number
+ * of histogram bins.
+ *
+ * Adapted from Hive's NumericHistogram. Can refer to
+ * https://github.com/apache/hive/blob/master/ql/src/
+ * java/org/apache/hadoop/hive/ql/udf/generic/NumericHistogram.java
+ *
+ * Differences:
+ *   1. Declaring [[Coord]] and it's variables as public types for
+ *  easy access in the HistogramNumeric class.
+ *   2. Add method [[getNumBins()]] for serialize [[NumericHistogram]]
+ *  in [[NumericHistogramSerializer]].
+ *   3. Add method [[addBin()]] for deserialize [[NumericHistogram]]
+ *  in [[NumericHistogramSerializer]].
+ *   4. In Hive's code, the method [[merge()] pass a serialized histogram,
+ *  in Spark, this method pass a deserialized histogram.
+ *  Here we change the code about merge bins.
+ */
+public class NumericHistogram {
+  /**
+   * The Coord class defines a histogram bin, which is just an (x,y) pair.
+   */
+  public static class Coord implements Comparable {
+public double x;
+public double 

[spark] branch master updated: [SPARK-37115][SQL] HiveClientImpl should use shim to wrap all hive client calls

2021-10-27 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 1099fd3  [SPARK-37115][SQL] HiveClientImpl should use shim to wrap all 
hive client calls
1099fd3 is described below

commit 1099fd342075b53ad9ddb2787911f2dabb340a3d
Author: Angerszh 
AuthorDate: Wed Oct 27 15:32:47 2021 +0800

[SPARK-37115][SQL] HiveClientImpl should use shim to wrap all hive client 
calls

### What changes were proposed in this pull request?
In this pr we use `shim` to wrap all hive client api to make it easier.

### Why are the changes needed?
Use `shim` to wrap all hive client api  to make it easier.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existed UT

Closes #34388 from AngersZh/SPARK-37115.

Authored-by: Angerszh 
Signed-off-by: Wenchen Fan 
---
 .../spark/sql/hive/client/HiveClientImpl.scala |  64 
 .../apache/spark/sql/hive/client/HiveShim.scala| 176 -
 2 files changed, 205 insertions(+), 35 deletions(-)

diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index e295e0f..25be8b5 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -343,14 +343,14 @@ private[hive] class HiveClientImpl(
   database: CatalogDatabase,
   ignoreIfExists: Boolean): Unit = withHiveState {
 val hiveDb = toHiveDatabase(database, Some(userName))
-client.createDatabase(hiveDb, ignoreIfExists)
+shim.createDatabase(client, hiveDb, ignoreIfExists)
   }
 
   override def dropDatabase(
   name: String,
   ignoreIfNotExists: Boolean,
   cascade: Boolean): Unit = withHiveState {
-client.dropDatabase(name, true, ignoreIfNotExists, cascade)
+shim.dropDatabase(client, name, true, ignoreIfNotExists, cascade)
   }
 
   override def alterDatabase(database: CatalogDatabase): Unit = withHiveState {
@@ -361,7 +361,7 @@ private[hive] class HiveClientImpl(
   }
 }
 val hiveDb = toHiveDatabase(database)
-client.alterDatabase(database.name, hiveDb)
+shim.alterDatabase(client, database.name, hiveDb)
   }
 
   private def toHiveDatabase(
@@ -379,7 +379,7 @@ private[hive] class HiveClientImpl(
   }
 
   override def getDatabase(dbName: String): CatalogDatabase = withHiveState {
-Option(client.getDatabase(dbName)).map { d =>
+Option(shim.getDatabase(client, dbName)).map { d =>
   val params = 
Option(d.getParameters).map(_.asScala.toMap).getOrElse(Map()) ++
 Map(PROP_OWNER -> shim.getDatabaseOwnerName(d))
 
@@ -392,15 +392,15 @@ private[hive] class HiveClientImpl(
   }
 
   override def databaseExists(dbName: String): Boolean = withHiveState {
-client.databaseExists(dbName)
+shim.databaseExists(client, dbName)
   }
 
   override def listDatabases(pattern: String): Seq[String] = withHiveState {
-client.getDatabasesByPattern(pattern).asScala.toSeq
+shim.getDatabasesByPattern(client, pattern)
   }
 
   private def getRawTableOption(dbName: String, tableName: String): 
Option[HiveTable] = {
-Option(client.getTable(dbName, tableName, false /* do not throw exception 
*/))
+Option(shim.getTable(client, dbName, tableName, false /* do not throw 
exception */))
   }
 
   private def getRawTablesByName(dbName: String, tableNames: Seq[String]): 
Seq[HiveTable] = {
@@ -551,7 +551,7 @@ private[hive] class HiveClientImpl(
 
   override def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit 
= withHiveState {
 verifyColumnDataType(table.dataSchema)
-client.createTable(toHiveTable(table, Some(userName)), ignoreIfExists)
+shim.createTable(client, toHiveTable(table, Some(userName)), 
ignoreIfExists)
   }
 
   override def dropTable(
@@ -583,7 +583,7 @@ private[hive] class HiveClientImpl(
   tableName: String,
   newDataSchema: StructType,
   schemaProps: Map[String, String]): Unit = withHiveState {
-val oldTable = client.getTable(dbName, tableName)
+val oldTable = shim.getTable(client, dbName, tableName)
 verifyColumnDataType(newDataSchema)
 val hiveCols = newDataSchema.map(toHiveColumn)
 oldTable.setFields(hiveCols.asJava)
@@ -630,7 +630,7 @@ private[hive] class HiveClientImpl(
   purge: Boolean,
   retainData: Boolean): Unit = withHiveState {
 // TODO: figure out how to drop multiple partitions in one call
-val hiveTable = client.getTable(db, table, true /* throw exception */)
+val hiveTable = shim.getTable(client, db, table, true /* throw exception 
*/)
 // do the check at first and collect all the matching 

[spark] branch master updated: [SPARK-37125][SQL] Support AnsiInterval radix sort

2021-10-27 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 101dd6b  [SPARK-37125][SQL] Support AnsiInterval radix sort
101dd6b is described below

commit 101dd6bbff2491a608e1ab51541a120a1f08e942
Author: ulysses-you 
AuthorDate: Wed Oct 27 15:31:26 2021 +0800

[SPARK-37125][SQL] Support AnsiInterval radix sort

### What changes were proposed in this pull request?

- Make `AnsiInterval` data type support radix sort in SQL.
- Enhance the `SortSuite` by disable radix.

### Why are the changes needed?

The radix sort is more faster than timsort, the benchmark result can see in 
`SortBenchmark`.

Since the `AnsiInterval` data type is comparable:

- `YearMonthIntervalType` -> int ordering
- `DayTimeIntervalType` -> long ordering

And we aslo support radix sort when the ordering column date type is int or 
long.

So `AnsiInterval` radix sort can be supported.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

- The data correctness should be ensured in `SortSuite`
- Add a new benchmark

Closes #34398 from ulysses-you/ansi-interval-sort.

Authored-by: ulysses-you 
Signed-off-by: Wenchen Fan 
---
 .../spark/sql/catalyst/expressions/SortOrder.scala |  6 +-
 .../AnsiIntervalSortBenchmark-jdk11-results.txt| 28 +
 .../AnsiIntervalSortBenchmark-results.txt  | 28 +
 .../spark/sql/execution/SortPrefixUtils.scala  |  5 +-
 .../org/apache/spark/sql/execution/SortSuite.scala | 17 +++--
 .../benchmark/AnsiIntervalSortBenchmark.scala  | 73 ++
 6 files changed, 146 insertions(+), 11 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
index 9aef25c..8e6f076 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
@@ -132,7 +132,7 @@ object SortOrder {
 case class SortPrefix(child: SortOrder) extends UnaryExpression {
 
   val nullValue = child.child.dataType match {
-case BooleanType | DateType | TimestampType | _: IntegralType =>
+case BooleanType | DateType | TimestampType | _: IntegralType | _: 
AnsiIntervalType =>
   if (nullAsSmallest) Long.MinValue else Long.MaxValue
 case dt: DecimalType if dt.precision - dt.scale <= Decimal.MAX_LONG_DIGITS 
=>
   if (nullAsSmallest) Long.MinValue else Long.MaxValue
@@ -154,7 +154,7 @@ case class SortPrefix(child: SortOrder) extends 
UnaryExpression {
   private lazy val calcPrefix: Any => Long = child.child.dataType match {
 case BooleanType => (raw) =>
   if (raw.asInstanceOf[Boolean]) 1 else 0
-case DateType | TimestampType | _: IntegralType => (raw) =>
+case DateType | TimestampType | _: IntegralType | _: AnsiIntervalType => 
(raw) =>
   raw.asInstanceOf[java.lang.Number].longValue()
 case FloatType | DoubleType => (raw) => {
   val dVal = raw.asInstanceOf[java.lang.Number].doubleValue()
@@ -198,7 +198,7 @@ case class SortPrefix(child: SortOrder) extends 
UnaryExpression {
 s"$input ? 1L : 0L"
   case _: IntegralType =>
 s"(long) $input"
-  case DateType | TimestampType =>
+  case DateType | TimestampType | _: AnsiIntervalType =>
 s"(long) $input"
   case FloatType | DoubleType =>
 s"$DoublePrefixCmp.computePrefix((double)$input)"
diff --git a/sql/core/benchmarks/AnsiIntervalSortBenchmark-jdk11-results.txt 
b/sql/core/benchmarks/AnsiIntervalSortBenchmark-jdk11-results.txt
new file mode 100644
index 000..004d9d8
--- /dev/null
+++ b/sql/core/benchmarks/AnsiIntervalSortBenchmark-jdk11-results.txt
@@ -0,0 +1,28 @@
+OpenJDK 64-Bit Server VM 11.0.13+8-LTS on Linux 5.8.0-1042-azure
+Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
+year month interval one column:   Best Time(ms)   Avg Time(ms)   
Stdev(ms)Rate(M/s)   Per Row(ns)   Relative
+
+year month interval one column enable radix   40092  40744 
668  2.5 400.9   1.0X
+year month interval one column disable radix  55178  55871 
609  1.8 551.8   0.7X
+
+OpenJDK 64-Bit Server VM 11.0.13+8-LTS on Linux 5.8.0-1042-azure
+Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
+year month interval two columns:   Best Time(ms)   Avg Time(ms)   
Stdev(ms)Rate(M/s)   Per Row(ns)   Relative

[spark] branch master updated: [SPARK-37031][SQL][TESTS][FOLLOWUP] Add a missing test to DescribeNamespaceSuite

2021-10-27 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 7954a91  [SPARK-37031][SQL][TESTS][FOLLOWUP] Add a missing test to 
DescribeNamespaceSuite
7954a91 is described below

commit 7954a91d76b5f60234b2f8628c0330a65653297a
Author: Terry Kim 
AuthorDate: Wed Oct 27 15:28:19 2021 +0800

[SPARK-37031][SQL][TESTS][FOLLOWUP] Add a missing test to 
DescribeNamespaceSuite

### What changes were proposed in this pull request?

This PR proposes to add a missing test on "keeping the legacy output 
schema" to `DescribeNamespaceSuite`. (#31705 didn't seem to add it).

### Why are the changes needed?

To increase the test coverage.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added a new test.

Closes #34399 from imback82/SPARK-37031-followup.

Authored-by: Terry Kim 
Signed-off-by: Wenchen Fan 
---
 .../execution/command/v1/DescribeNamespaceSuite.scala  | 18 ++
 1 file changed, 18 insertions(+)

diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/DescribeNamespaceSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/DescribeNamespaceSuite.scala
index a86e4a5..e97f0c5 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/DescribeNamespaceSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/DescribeNamespaceSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.command.v1
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.execution.command
+import org.apache.spark.sql.internal.SQLConf
 
 /**
  * This base suite contains unified tests for the `DESCRIBE NAMESPACE` command 
that checks V1
@@ -50,6 +51,23 @@ trait DescribeNamespaceSuiteBase extends 
command.DescribeNamespaceSuiteBase {
   assert(result(3) === Row("Properties", ""))
 }
   }
+
+  test("Keep the legacy output schema") {
+Seq(true, false).foreach { keepLegacySchema =>
+  withSQLConf(SQLConf.LEGACY_KEEP_COMMAND_OUTPUT_SCHEMA.key -> 
keepLegacySchema.toString) {
+val ns = "db1"
+withNamespace(ns) {
+  sql(s"CREATE NAMESPACE $ns")
+  val schema = sql(s"DESCRIBE NAMESPACE $ns").schema.fieldNames.toSeq
+  if (keepLegacySchema) {
+assert(schema === Seq("database_description_item", 
"database_description_value"))
+  } else {
+assert(schema === Seq("info_name", "info_value"))
+  }
+}
+  }
+}
+  }
 }
 
 /**

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-36647][SQL][TESTS] Push down Aggregate (Min/Max/Count) for Parquet if filter is on partition col

2021-10-27 Thread viirya
This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 4aec9d7  [SPARK-36647][SQL][TESTS] Push down Aggregate (Min/Max/Count) 
for Parquet if filter is on partition col
4aec9d7 is described below

commit 4aec9d7daca7a1a146ff1fb1e7541c9443905725
Author: Huaxin Gao 
AuthorDate: Wed Oct 27 00:14:00 2021 -0700

[SPARK-36647][SQL][TESTS] Push down Aggregate (Min/Max/Count) for Parquet 
if filter is on partition col

### What changes were proposed in this pull request?
I just realized that with the changes in 
https://github.com/apache/spark/pull/33650, the restriction for not pushing 
down Min/Max/Count for partition filter was already removed. This PR just added 
test to make sure Min/Max/Count in parquet are pushed down if filter is on 
partition col.

### Why are the changes needed?
To complete the work for Aggregate (Min/Max/Count) push down for Parquet

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
new test

Closes #34248 from huaxingao/partitionFilter.

Authored-by: Huaxin Gao 
Signed-off-by: Liang-Chi Hsieh 
---
 .../v2/parquet/ParquetScanBuilder.scala|  7 ++--
 .../parquet/ParquetAggregatePushDownSuite.scala| 40 --
 2 files changed, 33 insertions(+), 14 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
index 113438a..da49381 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
@@ -134,10 +134,9 @@ case class ParquetScanBuilder(
   // are combined with filter or group by
   // e.g. SELECT COUNT(col1) FROM t WHERE col2 = 8
   //  SELECT COUNT(col1) FROM t GROUP BY col2
-  // Todo: 1. add support if groupby column is partition col
-  //  (https://issues.apache.org/jira/browse/SPARK-36646)
-  //   2. add support if filter col is partition col
-  //  (https://issues.apache.org/jira/browse/SPARK-36647)
+  // However, if the filter is on partition column, max/min/count can 
still be pushed down
+  // Todo:  add support if groupby column is partition col
+  //(https://issues.apache.org/jira/browse/SPARK-36646)
   return false
 }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAggregatePushDownSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAggregatePushDownSuite.scala
index 0ae95db..77ecd28 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAggregatePushDownSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAggregatePushDownSuite.scala
@@ -129,10 +129,9 @@ abstract class ParquetAggregatePushDownSuite
 .write.partitionBy("p").parquet(dir.getCanonicalPath)
   withTempView("tmp") {
 
spark.read.parquet(dir.getCanonicalPath).createOrReplaceTempView("tmp");
-val enableVectorizedReader = Seq("false", "true")
-for (testVectorizedReader <- enableVectorizedReader) {
+Seq("false", "true").foreach { enableVectorizedReader =>
   withSQLConf(SQLConf.PARQUET_AGGREGATE_PUSHDOWN_ENABLED.key -> "true",
-vectorizedReaderEnabledKey -> testVectorizedReader) {
+vectorizedReaderEnabledKey -> enableVectorizedReader) {
 val count = sql("SELECT COUNT(p) FROM tmp")
 count.queryExecution.optimizedPlan.collect {
   case _: DataSourceV2ScanRelation =>
@@ -221,7 +220,7 @@ abstract class ParquetAggregatePushDownSuite
 }
   }
 
-  test("aggregate push down - query with filter not push down") {
+  test("aggregate push down - aggregate with data filter cannot be pushed 
down") {
 val data = Seq((-2, "abc", 2), (3, "def", 4), (6, "ghi", 2), (0, null, 19),
   (9, "mno", 7), (2, null, 7))
 withParquetTable(data, "t") {
@@ -240,6 +239,29 @@ abstract class ParquetAggregatePushDownSuite
 }
   }
 
+  test("aggregate push down - aggregate with partition filter can be pushed 
down") {
+withTempPath { dir =>
+  spark.range(10).selectExpr("id", "id % 3 as p")
+.write.partitionBy("p").parquet(dir.getCanonicalPath)
+  withTempView("tmp") {
+
spark.read.parquet(dir.getCanonicalPath).createOrReplaceTempView("tmp");
+Seq("false", "true").foreach { enableVectorizedReader =>
+