[spark] branch master updated: [SPARK-40998][SQL] Rename the error class `_LEGACY_ERROR_TEMP_0040` to `INVALID_IDENTIFIER`

2022-11-02 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new a3c6cd64512 [SPARK-40998][SQL] Rename the error class 
`_LEGACY_ERROR_TEMP_0040` to `INVALID_IDENTIFIER`
a3c6cd64512 is described below

commit a3c6cd64512f2a1f5ad312f4a06db1935e514a07
Author: Max Gekk 
AuthorDate: Thu Nov 3 08:44:43 2022 +0300

[SPARK-40998][SQL] Rename the error class `_LEGACY_ERROR_TEMP_0040` to 
`INVALID_IDENTIFIER`

### What changes were proposed in this pull request?
In the PR, I propose to assign the proper name `INVALID_IDENTIFIER` to the
legacy error class `_LEGACY_ERROR_TEMP_0040`, and to modify the test suite to use
`checkError()`, which checks the error class name, context, and so on.

### Why are the changes needed?
A proper name improves the user experience with Spark SQL.

### Does this PR introduce _any_ user-facing change?
Yes, the PR changes a user-facing error message.
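
For illustration only (assuming a running `SparkSession` named `spark`), the renamed error class surfaces like this:

```
// An unquoted identifier containing a hyphen now fails with the error class
// INVALID_IDENTIFIER instead of _LEGACY_ERROR_TEMP_0040.
spark.sql("USE test-test")     // throws ParseException with errorClass = INVALID_IDENTIFIER
spark.sql("USE `test-test`")   // back-quoting the identifier lets it parse
```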

### How was this patch tested?
By running the modified test suites:
```
$ build/sbt "core/testOnly *SparkThrowableSuite"
$ build/sbt "test:testOnly *ErrorParserSuite"
```

Closes #38484 from MaxGekk/invalid-identifier-error-class.

Authored-by: Max Gekk 
Signed-off-by: Max Gekk 
---
 core/src/main/resources/error/error-classes.json   | 10 ++--
 .../spark/sql/catalyst/parser/ParseDriver.scala|  2 +-
 .../spark/sql/errors/QueryParsingErrors.scala  |  4 +-
 .../sql/catalyst/parser/ErrorParserSuite.scala | 66 +++---
 4 files changed, 41 insertions(+), 41 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json 
b/core/src/main/resources/error/error-classes.json
index 7ec5e11a206..d1b4c4f030c 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -569,6 +569,11 @@
 ],
 "sqlState" : "22023"
   },
+  "INVALID_IDENTIFIER" : {
+"message" : [
+  "The identifier  is invalid. Please, consider quoting it with 
back-quotes as ``."
+]
+  },
   "INVALID_JSON_SCHEMA_MAP_TYPE" : {
 "message" : [
   "Input schema  can only contain STRING as a key type for a 
MAP."
@@ -1376,11 +1381,6 @@
   "Unsupported SQL statement."
 ]
   },
-  "_LEGACY_ERROR_TEMP_0040" : {
-"message" : [
-  "Possibly unquoted identifier  detected. Please consider quoting 
it with back-quotes as ``."
-]
-  },
   "_LEGACY_ERROR_TEMP_0041" : {
 "message" : [
   "Found duplicate clauses: ."
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala
index 498d2d9ee13..10a213373ad 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala
@@ -325,7 +325,7 @@ case object PostProcessor extends SqlBaseParserBaseListener 
{
   override def exitErrorIdent(ctx: SqlBaseParser.ErrorIdentContext): Unit = {
 val ident = ctx.getParent.getText
 
-throw QueryParsingErrors.unquotedIdentifierError(ident, ctx)
+throw QueryParsingErrors.invalidIdentifierError(ident, ctx)
   }
 
   /** Remove the back ticks from an Identifier. */
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
index 204b28f3725..1fce265bece 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
@@ -448,9 +448,9 @@ private[sql] object QueryParsingErrors extends 
QueryErrorsBase {
   Some("_LEGACY_ERROR_TEMP_0039"))
   }
 
-  def unquotedIdentifierError(ident: String, ctx: ErrorIdentContext): 
Throwable = {
+  def invalidIdentifierError(ident: String, ctx: ErrorIdentContext): Throwable 
= {
 new ParseException(
-  errorClass = "_LEGACY_ERROR_TEMP_0040",
+  errorClass = "INVALID_IDENTIFIER",
   messageParameters = Map("ident" -> ident),
   ctx)
   }
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ErrorParserSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ErrorParserSuite.scala
index b48a950d9d5..e88ccae2ac5 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ErrorParserSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ErrorParserSuite.scala
@@ -42,22 +42,22 @@ class ErrorParserSuite extends AnalysisTest {
   test("hyphen in identifier - DDL tests") {
 checkError(
   exception = parseException("USE test-test"),
-  errorClass = "_LEGACY_ERROR_TEMP_0040",
+

[spark] branch master updated: [SPARK-40977][CONNECT][PYTHON] Complete Support for Union in Python client

2022-11-02 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 10722044f42 [SPARK-40977][CONNECT][PYTHON] Complete Support for Union 
in Python client
10722044f42 is described below

commit 10722044f429b1a825018673ca139d698559f6fa
Author: Rui Wang 
AuthorDate: Thu Nov 3 13:53:23 2022 +0900

[SPARK-40977][CONNECT][PYTHON] Complete Support for Union in Python client

### What changes were proposed in this pull request?

1. Improve testing coverage for `Union` and `UnionAll` (they are actually both `UnionAll`).
2. Add the API which does `UnionByName` (see the sketch after this list).
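
For reference, a rough sketch of the existing Scala DataFrame API that the Python client mirrors here (assuming a `SparkSession` named `spark`):

```
import org.apache.spark.sql.functions.{col, lit}

val df1 = spark.range(2).select(col("id").as("a"), lit(1).as("b"))
val df2 = spark.range(2).select(col("id").as("a"))

// union/unionAll resolve columns by position; unionByName resolves them by name,
// and with allowMissingColumns = true a column absent on one side is null-filled.
df1.unionByName(df2, allowMissingColumns = true).show()
```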

### Why are the changes needed?

Improve API Coverage.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

UT

Closes #38453 from amaliujia/python_union.

Authored-by: Rui Wang 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/connect/dataframe.py| 27 ++
 python/pyspark/sql/connect/plan.py |  6 -
 .../sql/tests/connect/test_connect_plan_only.py| 10 
 3 files changed, 42 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/connect/dataframe.py 
b/python/pyspark/sql/connect/dataframe.py
index b9ddb0db300..b9ba4b99ba0 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -293,6 +293,33 @@ class DataFrame(object):
 raise ValueError("Argument to Union does not contain a valid 
plan.")
 return DataFrame.withPlan(plan.UnionAll(self._plan, other._plan), 
session=self._session)
 
+def unionByName(self, other: "DataFrame", allowMissingColumns: bool = 
False) -> "DataFrame":
+"""Returns a new :class:`DataFrame` containing union of rows in this 
and another
+:class:`DataFrame`.
+
+This is different from both `UNION ALL` and `UNION DISTINCT` in SQL. 
To do a SQL-style set
+union (that does deduplication of elements), use this function 
followed by :func:`distinct`.
+
+.. versionadded:: 3.4.0
+
+Parameters
+--
+other : :class:`DataFrame`
+Another :class:`DataFrame` that needs to be combined.
+allowMissingColumns : bool, optional, default False
+   Specify whether to allow missing columns.
+
+Returns
+---
+:class:`DataFrame`
+Combined DataFrame.
+"""
+if other._plan is None:
+raise ValueError("Argument to UnionByName does not contain a valid 
plan.")
+return DataFrame.withPlan(
+plan.UnionAll(self._plan, other._plan, allowMissingColumns), 
session=self._session
+)
+
 def where(self, condition: Expression) -> "DataFrame":
 return self.filter(condition)
 
diff --git a/python/pyspark/sql/connect/plan.py 
b/python/pyspark/sql/connect/plan.py
index 2f1f70ec1a9..cc59a493d5a 100644
--- a/python/pyspark/sql/connect/plan.py
+++ b/python/pyspark/sql/connect/plan.py
@@ -606,9 +606,12 @@ class Join(LogicalPlan):
 
 
 class UnionAll(LogicalPlan):
-def __init__(self, child: Optional["LogicalPlan"], other: "LogicalPlan") 
-> None:
+def __init__(
+self, child: Optional["LogicalPlan"], other: "LogicalPlan", by_name: 
bool = False
+) -> None:
 super().__init__(child)
 self.other = other
+self.by_name = by_name
 
 def plan(self, session: Optional["RemoteSparkSession"]) -> proto.Relation:
 assert self._child is not None
@@ -617,6 +620,7 @@ class UnionAll(LogicalPlan):
 rel.set_op.right_input.CopyFrom(self.other.plan(session))
 rel.set_op.set_op_type = proto.SetOperation.SET_OP_TYPE_UNION
 rel.set_op.is_all = True
+rel.set_op.by_name = self.by_name
 return rel
 
 def print(self, indent: int = 0) -> str:
diff --git a/python/pyspark/sql/tests/connect/test_connect_plan_only.py 
b/python/pyspark/sql/tests/connect/test_connect_plan_only.py
index e40a54b7d0c..8a9b98e73fd 100644
--- a/python/pyspark/sql/tests/connect/test_connect_plan_only.py
+++ b/python/pyspark/sql/tests/connect/test_connect_plan_only.py
@@ -190,6 +190,16 @@ class SparkConnectTestsPlanOnly(PlanOnlyTestFixture):
 self.assertIsNotNone(plan.root, "Root relation must be set")
 self.assertIsNotNone(plan.root.read)
 
+def test_union(self):
+df1 = self.connect.readTable(table_name=self.tbl_name)
+df2 = self.connect.readTable(table_name=self.tbl_name)
+plan1 = df1.union(df2)._plan.to_proto(self.connect)
+self.assertTrue(plan1.root.set_op.is_all)
+plan2 = df1.union(df2)._plan.to_proto(self.connect)
+self.assertTrue(plan2.root.set_op.is_all)
+plan3 = df1.unionByName(df2, True)._plan.to_proto(self.connect)

[spark] branch master updated: [SPARK-40995][CONNECT][DOC][FOLLOW-UP] Fix the type in the doc name

2022-11-02 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 39824f1dae2 [SPARK-40995][CONNECT][DOC][FOLLOW-UP] Fix the type in the 
doc name
39824f1dae2 is described below

commit 39824f1dae2caa62292f75c818c0d28d281bc415
Author: Rui Wang 
AuthorDate: Thu Nov 3 11:09:11 2022 +0900

[SPARK-40995][CONNECT][DOC][FOLLOW-UP] Fix the type in the doc name

### What changes were proposed in this pull request?

Fix the typo in the doc filename: `coient` -> `client`.

### Why are the changes needed?

Fix typo.

### Does this PR introduce _any_ user-facing change?

NO

### How was this patch tested?

UT

Closes #38487 from amaliujia/follow_up_docs.

Authored-by: Rui Wang 
Signed-off-by: Hyukjin Kwon 
---
 .../docs/{coient-connection-string.md => client-connection-string.md} | 0
 1 file changed, 0 insertions(+), 0 deletions(-)

diff --git a/connector/connect/docs/coient-connection-string.md 
b/connector/connect/docs/client-connection-string.md
similarity index 100%
rename from connector/connect/docs/coient-connection-string.md
rename to connector/connect/docs/client-connection-string.md





[spark] branch master updated (e63b7da85e4 -> adb41ca8480)

2022-11-02 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from e63b7da85e4 [SPARK-40995][CONNECT][DOC] Defining Spark Connect Client 
Connection String
 add adb41ca8480 [SPARK-40989][CONNECT][PYTHON][TESTS] Improve 
`session.sql` testing coverage in Python client

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/tests/connect/test_connect_basic.py | 4 
 python/pyspark/sql/tests/connect/test_connect_plan_only.py | 4 
 python/pyspark/testing/connectutils.py | 8 +++-
 3 files changed, 15 insertions(+), 1 deletion(-)





[spark] branch master updated (c4e6b2cecee -> e63b7da85e4)

2022-11-02 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from c4e6b2cecee [SPARK-40985][BUILD] Upgrade RoaringBitmap to 0.9.35
 add e63b7da85e4 [SPARK-40995][CONNECT][DOC] Defining Spark Connect Client 
Connection String

No new revisions were added by this update.

Summary of changes:
 .../pyspark/sql => connector}/connect/README.md|  26 -
 connector/connect/docs/coient-connection-string.md | 116 +
 2 files changed, 137 insertions(+), 5 deletions(-)
 rename {python/pyspark/sql => connector}/connect/README.md (72%)
 create mode 100644 connector/connect/docs/coient-connection-string.md





[spark] branch master updated: [SPARK-40985][BUILD] Upgrade RoaringBitmap to 0.9.35

2022-11-02 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new c4e6b2cecee [SPARK-40985][BUILD] Upgrade RoaringBitmap to 0.9.35
c4e6b2cecee is described below

commit c4e6b2cecee612035651c32ff5aba3bd2a17a283
Author: yangjie01 
AuthorDate: Wed Nov 2 10:46:55 2022 -0500

[SPARK-40985][BUILD] Upgrade RoaringBitmap to 0.9.35

### What changes were proposed in this pull request?
This PR aims to upgrade RoaringBitmap to 0.9.35.

### Why are the changes needed?
This version brings some bug fixes:

- https://github.com/RoaringBitmap/RoaringBitmap/pull/587
- https://github.com/RoaringBitmap/RoaringBitmap/issues/588

Other changes are listed here:

https://github.com/RoaringBitmap/RoaringBitmap/compare/0.9.32...0.9.35

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GitHub Actions

Closes #38465 from LuciferYang/rbitmap-0935.

Authored-by: yangjie01 
Signed-off-by: Sean Owen 
---
 core/benchmarks/MapStatusesConvertBenchmark-jdk11-results.txt |  8 
 core/benchmarks/MapStatusesConvertBenchmark-jdk17-results.txt | 10 +-
 core/benchmarks/MapStatusesConvertBenchmark-results.txt   | 10 +-
 dev/deps/spark-deps-hadoop-2-hive-2.3 |  4 ++--
 dev/deps/spark-deps-hadoop-3-hive-2.3 |  4 ++--
 pom.xml   |  2 +-
 6 files changed, 19 insertions(+), 19 deletions(-)

diff --git a/core/benchmarks/MapStatusesConvertBenchmark-jdk11-results.txt 
b/core/benchmarks/MapStatusesConvertBenchmark-jdk11-results.txt
index adac80834e4..06f7cc7c92c 100644
--- a/core/benchmarks/MapStatusesConvertBenchmark-jdk11-results.txt
+++ b/core/benchmarks/MapStatusesConvertBenchmark-jdk11-results.txt
@@ -2,12 +2,12 @@
 MapStatuses Convert Benchmark
 

 
-OpenJDK 64-Bit Server VM 11.0.16+8-LTS on Linux 5.15.0-1019-azure
+OpenJDK 64-Bit Server VM 11.0.16.1+1 on Linux 5.15.0-1022-azure
 Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
 MapStatuses Convert:  Best Time(ms)   Avg Time(ms)   
Stdev(ms)Rate(M/s)   Per Row(ns)   Relative
 

-Num Maps: 5 Fetch partitions:500   1269   1276 
  8  0.0  1268666001.0   1.0X
-Num Maps: 5 Fetch partitions:1000  2672   2695 
 39  0.0  2671542753.0   0.5X
-Num Maps: 5 Fetch partitions:1500  4034   4069 
 50  0.0  4033696987.0   0.3X
+Num Maps: 5 Fetch partitions:500   1227   1262 
 47  0.0  1226744907.0   1.0X
+Num Maps: 5 Fetch partitions:1000  2620   2637 
 15  0.0  2620288061.0   0.5X
+Num Maps: 5 Fetch partitions:1500  3975   3990 
 17  0.0  3974979610.0   0.3X
 
 
diff --git a/core/benchmarks/MapStatusesConvertBenchmark-jdk17-results.txt 
b/core/benchmarks/MapStatusesConvertBenchmark-jdk17-results.txt
index 9911ae3326f..3b6f5c6695e 100644
--- a/core/benchmarks/MapStatusesConvertBenchmark-jdk17-results.txt
+++ b/core/benchmarks/MapStatusesConvertBenchmark-jdk17-results.txt
@@ -2,12 +2,12 @@
 MapStatuses Convert Benchmark
 

 
-OpenJDK 64-Bit Server VM 17.0.4+8-LTS on Linux 5.15.0-1019-azure
-Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz
+OpenJDK 64-Bit Server VM 17.0.4.1+1 on Linux 5.15.0-1022-azure
+Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
 MapStatuses Convert:  Best Time(ms)   Avg Time(ms)   
Stdev(ms)Rate(M/s)   Per Row(ns)   Relative
 

-Num Maps: 5 Fetch partitions:500   1228   1238 
 17  0.0  1228191051.0   1.0X
-Num Maps: 5 Fetch partitions:1000  2380   2393 
 16  0.0  2379601524.0   0.5X
-Num Maps: 5 Fetch partitions:1500  3803   3857 
 55  0.0  3802550172.0   0.3X
+Num Maps: 5 Fetch partitions:500   1159   1184 
 38  0.0  1159155979.0   1.0X
+Num Maps: 5 Fetch partitions:1000  2329   2387 
 57  0.0  2328833805.0   0.5X
+Num Maps: 5 Fetch partitions:1500  3608   3712 
 92  0.0  3607631972.0 

[spark] branch master updated: [SPARK-40957] Add in memory cache in HDFSMetadataLog

2022-11-02 Thread kabhwan
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 5fa2c13cbf8 [SPARK-40957] Add in memory cache in HDFSMetadataLog
5fa2c13cbf8 is described below

commit 5fa2c13cbf83c6c4c040f15bbbf66dbe49581bdc
Author: Jerry Peng 
AuthorDate: Wed Nov 2 22:24:16 2022 +0900

[SPARK-40957] Add in memory cache in HDFSMetadataLog

### What changes were proposed in this pull request?

Every time an entry in the offset log or commit log needs to be accessed, we read
it from disk, which is slow. A cache of recent entries can speed up reads.

There is already a caching mechanism implemented in OffsetSeqLog. Let's replace it
with an implementation in HDFSMetadataLog (the parent class) so that reads from an
in-memory cache are supported for both the offset log and the commit log.
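
A minimal, self-contained sketch of the caching pattern applied in this patch (the fixed capacity of two batches matches the diff below; the wrapper function name is illustrative):

```
import java.util.{Collections, LinkedHashMap => JLinkedHashMap}

// A synchronized, size-bounded LinkedHashMap: once more than two batch entries are
// present, the eldest one is evicted, so only the latest two batches stay in memory.
def boundedBatchCache[T <: AnyRef](): java.util.Map[java.lang.Long, T] =
  Collections.synchronizedMap(new JLinkedHashMap[java.lang.Long, T](2) {
    override def removeEldestEntry(e: java.util.Map.Entry[java.lang.Long, T]): Boolean =
      size > 2
  })
```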

### Why are the changes needed?

Improve read speeds for entries in offset log and commit log

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing unit tests should suffice

Closes #38430 from jerrypeng/SPARK-40957.

Authored-by: Jerry Peng 
Signed-off-by: Jungtaek Lim 
---
 .../org/apache/spark/sql/internal/SQLConf.scala|   8 ++
 .../sql/execution/streaming/HDFSMetadataLog.scala  | 112 ++---
 .../sql/execution/streaming/OffsetSeqLog.scala |  18 
 3 files changed, 85 insertions(+), 53 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index abe9df8dd87..0f3dc3cf44c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2007,6 +2007,14 @@ object SQLConf {
   .booleanConf
   .createWithDefault(true)
 
+  val STREAMING_METADATA_CACHE_ENABLED =
+buildConf("spark.sql.streaming.metadataCache.enabled")
+  .internal()
+  .doc("Whether the streaming HDFSMetadataLog caches the metadata of the 
latest two batches.")
+  .booleanConf
+  .createWithDefault(true)
+
+
   val VARIABLE_SUBSTITUTE_ENABLED =
 buildConf("spark.sql.variable.substitute")
   .doc("This enables substitution using syntax like `${var}`, 
`${system:var}`, " +
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 8a037b55168..1d444655548 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -19,7 +19,9 @@ package org.apache.spark.sql.execution.streaming
 
 import java.io._
 import java.nio.charset.StandardCharsets
+import java.util.{Collections, LinkedHashMap => JLinkedHashMap}
 
+import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
 import org.apache.commons.io.IOUtils
@@ -30,6 +32,7 @@ import org.json4s.jackson.Serialization
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.internal.SQLConf
 
 
 /**
@@ -64,6 +67,17 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: 
SparkSession, path:
 fileManager.mkdirs(metadataPath)
   }
 
+  protected val metadataCacheEnabled: Boolean
+  = 
sparkSession.sessionState.conf.getConf(SQLConf.STREAMING_METADATA_CACHE_ENABLED)
+
+  /**
+   * Cache the latest two batches. [[StreamExecution]] usually just accesses 
the latest two batches
+   * when committing offsets, this cache will save some file system operations.
+   */
+  protected[sql] val batchCache = Collections.synchronizedMap(new 
JLinkedHashMap[Long, T](2) {
+override def removeEldestEntry(e: java.util.Map.Entry[Long, T]): Boolean = 
size > 2
+  })
+
   /**
* A `PathFilter` to filter only batch files
*/
@@ -113,10 +127,18 @@ class HDFSMetadataLog[T <: AnyRef : 
ClassTag](sparkSession: SparkSession, path:
*/
   override def add(batchId: Long, metadata: T): Boolean = {
 require(metadata != null, "'null' metadata cannot written to a metadata 
log")
-addNewBatchByStream(batchId) { output => serialize(metadata, output) }
+val res = addNewBatchByStream(batchId) { output => serialize(metadata, 
output) }
+if (metadataCacheEnabled && res) batchCache.put(batchId, metadata)
+res
   }
 
   override def get(batchId: Long): Option[T] = {
+if (metadataCacheEnabled && batchCache.containsKey(batchId)) {
+  val metadata = batchCache.get(batchId)
+  assert(metadata != null)
+  return Some(metadata)
+}

[spark] branch master updated: [SPARK-40374][SQL] Migrate type check failures of type creators onto error classes

2022-11-02 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 68531ada34d [SPARK-40374][SQL] Migrate type check failures of type 
creators onto error classes
68531ada34d is described below

commit 68531ada34db72d352c39396f85458a8370af812
Author: panbingkun 
AuthorDate: Wed Nov 2 14:51:36 2022 +0300

[SPARK-40374][SQL] Migrate type check failures of type creators onto error 
classes

### What changes were proposed in this pull request?
This PR replaces TypeCheckFailure with DataTypeMismatch in the type checks of the
complex type creator expressions, including:

1. CreateMap (3): 
https://github.com/apache/spark/blob/1431975723d8df30a25b2333eddcfd0bb6c57677/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala#L205-L214
2. CreateNamedStruct (3): 
https://github.com/apache/spark/blob/1431975723d8df30a25b2333eddcfd0bb6c57677/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala#L445-L457
3. UpdateFields (2): 
https://github.com/apache/spark/blob/1431975723d8df30a25b2333eddcfd0bb6c57677/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala#L670-L673

### Why are the changes needed?
Migration onto error classes unifies Spark SQL error messages.

### Does this PR introduce _any_ user-facing change?
Yes. The PR changes user-facing error messages.
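
For example (illustrative only, assuming a running `SparkSession` named `spark`), a query that previously failed with a free-form `TypeCheckFailure` now reports one of the structured error classes added to `error-classes.json`:

```
// named_struct expects foldable STRING expressions at odd positions; the integer
// literal 1 below is the kind of input now reported via the
// CREATE_NAMED_STRUCT_WITHOUT_FOLDABLE_STRING error class.
spark.sql("SELECT named_struct(1, 'value')")
```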

### How was this patch tested?
1. Add new UT
2. Update existing UTs
3. Pass GA

Closes #38463 from panbingkun/SPARK-40374.

Authored-by: panbingkun 
Signed-off-by: Max Gekk 
---
 core/src/main/resources/error/error-classes.json   | 20 ++
 .../catalyst/expressions/complexTypeCreator.scala  | 72 ++-
 .../analysis/ExpressionTypeCheckingSuite.scala | 83 --
 .../catalyst/expressions/ComplexTypeSuite.scala| 47 
 .../main/scala/org/apache/spark/sql/Column.scala   |  2 +-
 .../apache/spark/sql/ColumnExpressionSuite.scala   | 82 -
 6 files changed, 250 insertions(+), 56 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json 
b/core/src/main/resources/error/error-classes.json
index fe2cd3a44bb..7ec5e11a206 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -138,6 +138,11 @@
   "Unable to convert column  of type  to JSON."
 ]
   },
+  "CANNOT_DROP_ALL_FIELDS" : {
+"message" : [
+  "Cannot drop all fields in struct."
+]
+  },
   "CAST_WITHOUT_SUGGESTION" : {
 "message" : [
   "cannot cast  to ."
@@ -155,6 +160,21 @@
   "To convert values from  to , you can use the 
functions  instead."
 ]
   },
+  "CREATE_MAP_KEY_DIFF_TYPES" : {
+"message" : [
+  "The given keys of function  should all be the same 
type, but they are ."
+]
+  },
+  "CREATE_MAP_VALUE_DIFF_TYPES" : {
+"message" : [
+  "The given values of function  should all be the same 
type, but they are ."
+]
+  },
+  "CREATE_NAMED_STRUCT_WITHOUT_FOLDABLE_STRING" : {
+"message" : [
+  "Only foldable `STRING` expressions are allowed to appear at odd 
position, but they are ."
+]
+  },
   "DATA_DIFF_TYPES" : {
 "message" : [
   "Input to  should all be the same type, but it's 
."
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
index 27d4f506ac8..97c882fd176 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
@@ -22,6 +22,8 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{Resolver, TypeCheckResult, 
TypeCoercion, UnresolvedAttribute, UnresolvedExtractValue}
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.{FUNC_ALIAS, 
FunctionBuilder}
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
+import org.apache.spark.sql.catalyst.expressions.Cast._
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.expressions.codegen.Block._
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
@@ -202,16 +204,30 @@ case class CreateMap(children: Seq[Expression], 
useStringTypeWhenEmpty: Boolean)
 
   override def checkInputDataTypes(): TypeCheckResult = {
 if (children.size % 2 != 

[spark] branch master updated: [SPARK-40248][SQL] Use larger number of bits to build Bloom filter

2022-11-02 Thread yumwang
This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new d627d8e4f48 [SPARK-40248][SQL] Use larger number of bits to build 
Bloom filter
d627d8e4f48 is described below

commit d627d8e4f4802b8200574a1a73c4bebe5d813a5a
Author: Yuming Wang 
AuthorDate: Wed Nov 2 18:05:54 2022 +0800

[SPARK-40248][SQL] Use larger number of bits to build Bloom filter

### What changes were proposed in this pull request?

This PR makes the Bloom filter join use a larger number of bits to build the Bloom
filter when the row count is available.
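
A rough illustration of why the row count matters, using the public `org.apache.spark.util.sketch.BloomFilter` factory (the exact wiring of the statistics into the aggregate is in the diff below; the numbers here are arbitrary):

```
import org.apache.spark.util.sketch.BloomFilter

// More expected items means more bits at the same false-positive probability,
// so sizing the filter from the CBO row-count estimate instead of a fixed
// default yields a larger, more selective filter.
val small = BloomFilter.create(1000L)       // expected items: 1 thousand
val large = BloomFilter.create(10000000L)   // expected items: 10 million
println(s"bits for 1K items:  ${small.bitSize()}")
println(s"bits for 10M items: ${large.bitSize()}")
```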

### Why are the changes needed?
To fix the problem that the Bloom filter join cannot filter out more data when CBO
is enabled. For example, TPC-DS q64:

CBO is enabled | CBO is disabled
-- | --
https://user-images.githubusercontent.com/5399861/187076753-2e9ccc72-0289-4537-a6d9-3a01a37bf6cd.png | https://user-images.githubusercontent.com/5399861/187076786-c982e711-52e2-4199-ba42-e1100f57287b.png
https://user-images.githubusercontent.com/5399861/187075553-bd6956b7-8f1f-4df5-82b7-d010defb6d21.png | https://user-images.githubusercontent.com/5399861/187075588-254c3246-b9af-403c-8df7-d8344fd1d2a4.png

After this PR:

Build bloom filter | Filter data
-- | --
https://user-images.githubusercontent.com/5399861/187075676-85b2afae-03a0-4430-9c4e-2679c6ef62f7.png | https://user-images.githubusercontent.com/5399861/187075713-41173dc1-d01d-476a-b218-5c67be823e1b.png

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #37697 from wangyum/SPARK-40248.

Lead-authored-by: Yuming Wang 
Co-authored-by: Yuming Wang 
Signed-off-by: Yuming Wang 
---
 .../org/apache/spark/util/sketch/BloomFilter.java  |  9 
 .../aggregate/BloomFilterAggregate.scala   | 16 +++
 .../catalyst/optimizer/InjectRuntimeFilter.scala   |  3 +--
 .../approved-plans-modified/q10.sf100/explain.txt  |  8 
 .../q10.sf100/simplified.txt   |  2 +-
 .../approved-plans-modified/q59.sf100/explain.txt  | 16 +++
 .../q59.sf100/simplified.txt   |  4 ++--
 .../approved-plans-v1_4/q10.sf100/explain.txt  |  8 
 .../approved-plans-v1_4/q10.sf100/simplified.txt   |  2 +-
 .../approved-plans-v1_4/q16.sf100/explain.txt  | 24 +++---
 .../approved-plans-v1_4/q16.sf100/simplified.txt   |  6 +++---
 .../approved-plans-v1_4/q2.sf100/explain.txt   | 16 +++
 .../approved-plans-v1_4/q2.sf100/simplified.txt|  4 ++--
 .../approved-plans-v1_4/q32.sf100/explain.txt  |  8 
 .../approved-plans-v1_4/q32.sf100/simplified.txt   |  2 +-
 .../approved-plans-v1_4/q40.sf100/explain.txt  |  8 
 .../approved-plans-v1_4/q40.sf100/simplified.txt   |  2 +-
 .../approved-plans-v1_4/q59.sf100/explain.txt  | 16 +++
 .../approved-plans-v1_4/q59.sf100/simplified.txt   |  4 ++--
 .../approved-plans-v1_4/q64.sf100/explain.txt  |  8 
 .../approved-plans-v1_4/q64.sf100/simplified.txt   |  2 +-
 .../approved-plans-v1_4/q69.sf100/explain.txt  |  8 
 .../approved-plans-v1_4/q69.sf100/simplified.txt   |  2 +-
 .../approved-plans-v1_4/q80.sf100/explain.txt  | 16 +++
 .../approved-plans-v1_4/q80.sf100/simplified.txt   |  4 ++--
 .../approved-plans-v1_4/q85.sf100/explain.txt  | 16 +++
 .../approved-plans-v1_4/q85.sf100/simplified.txt   |  4 ++--
 .../approved-plans-v1_4/q92.sf100/explain.txt  |  8 
 .../approved-plans-v1_4/q92.sf100/simplified.txt   |  2 +-
 .../approved-plans-v1_4/q94.sf100/explain.txt  | 24 +++---
 .../approved-plans-v1_4/q94.sf100/simplified.txt   |  6 +++---
 .../approved-plans-v1_4/q95.sf100/explain.txt  | 24 +++---
 .../approved-plans-v1_4/q95.sf100/simplified.txt   |  6 +++---
 .../approved-plans-v2_7/q10a.sf100/explain.txt |  8 
 .../approved-plans-v2_7/q10a.sf100/simplified.txt  |  2 +-
 .../approved-plans-v2_7/q64.sf100/explain.txt  |  8 
 .../approved-plans-v2_7/q64.sf100/simplified.txt   |  2 +-
 .../approved-plans-v2_7/q80a.sf100/explain.txt | 16 +++
 .../approved-plans-v2_7/q80a.sf100/simplified.txt  |  4 ++--
 .../spark/sql/BloomFilterAggregateQuerySuite.scala | 17 +++
 40 files changed, 189 insertions(+), 156 deletions(-)

diff --git 
a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java 
b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java
index 2a6e270a912..5c01841e501 100644
--- a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java
+++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java

[spark] branch master updated: [SPARK-40883][CONNECT][FOLLOW-UP] Range.step is required and Python client should have a default value=1

2022-11-02 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new f03fdf90281 [SPARK-40883][CONNECT][FOLLOW-UP] Range.step is required 
and Python client should have a default value=1
f03fdf90281 is described below

commit f03fdf90281d67065b9ab211b5cd9cfbe5742614
Author: Rui Wang 
AuthorDate: Wed Nov 2 14:10:13 2022 +0800

[SPARK-40883][CONNECT][FOLLOW-UP] Range.step is required and Python client 
should have a default value=1

### What changes were proposed in this pull request?

To match the existing Python DataFrame API, this PR makes `Range.step` required,
and the Python client keeps `1` as the default value for this field.

### Why are the changes needed?

Matching the existing DataFrame API.
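
For reference (assuming a `SparkSession` named `spark`), the existing API being matched, where `step` defaults to 1 when omitted:

```
spark.range(0, 5).show()      // 0, 1, 2, 3, 4  (step defaults to 1)
spark.range(0, 10, 2).show()  // 0, 2, 4, 6, 8
```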

### Does this PR introduce _any_ user-facing change?

NO

### How was this patch tested?

UT

Closes #38471 from amaliujia/range_step_required.

Authored-by: Rui Wang 
Signed-off-by: Ruifeng Zheng 
---
 .../main/protobuf/spark/connect/relations.proto|  8 ++--
 .../org/apache/spark/sql/connect/dsl/package.scala |  4 +++-
 .../sql/connect/planner/SparkConnectPlanner.scala  |  6 +-
 python/pyspark/sql/connect/client.py   |  2 +-
 python/pyspark/sql/connect/plan.py |  7 ++-
 python/pyspark/sql/connect/proto/relations_pb2.py  | 14 ++
 python/pyspark/sql/connect/proto/relations_pb2.pyi | 22 --
 .../sql/tests/connect/test_connect_plan_only.py|  4 ++--
 python/pyspark/testing/connectutils.py |  2 +-
 9 files changed, 22 insertions(+), 47 deletions(-)

diff --git a/connector/connect/src/main/protobuf/spark/connect/relations.proto 
b/connector/connect/src/main/protobuf/spark/connect/relations.proto
index a4503204aa1..deb35525728 100644
--- a/connector/connect/src/main/protobuf/spark/connect/relations.proto
+++ b/connector/connect/src/main/protobuf/spark/connect/relations.proto
@@ -226,16 +226,12 @@ message Range {
   int64 start = 1;
   // Required.
   int64 end = 2;
-  // Optional. Default value = 1
-  Step step = 3;
+  // Required.
+  int64 step = 3;
   // Optional. Default value is assigned by 1) SQL conf 
"spark.sql.leafNodeDefaultParallelism" if
   // it is set, or 2) spark default parallelism.
   NumPartitions num_partitions = 4;
 
-  message Step {
-int64 step = 1;
-  }
-
   message NumPartitions {
 int32 num_partitions = 1;
   }
diff --git 
a/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
 
b/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
index f649d040721..e2030c9ad31 100644
--- 
a/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
+++ 
b/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
@@ -190,7 +190,9 @@ package object dsl {
 }
 range.setEnd(end)
 if (step.isDefined) {
-  range.setStep(proto.Range.Step.newBuilder().setStep(step.get))
+  range.setStep(step.get)
+} else {
+  range.setStep(1L)
 }
 if (numPartitions.isDefined) {
   range.setNumPartitions(
diff --git 
a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
 
b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index eea2579e61f..f5c6980290f 100644
--- 
a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ 
b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -110,11 +110,7 @@ class SparkConnectPlanner(plan: proto.Relation, session: 
SparkSession) {
   private def transformRange(rel: proto.Range): LogicalPlan = {
 val start = rel.getStart
 val end = rel.getEnd
-val step = if (rel.hasStep) {
-  rel.getStep.getStep
-} else {
-  1
-}
+val step = rel.getStep
 val numPartitions = if (rel.hasNumPartitions) {
   rel.getNumPartitions.getNumPartitions
 } else {
diff --git a/python/pyspark/sql/connect/client.py 
b/python/pyspark/sql/connect/client.py
index e64d612c53e..c845d378320 100644
--- a/python/pyspark/sql/connect/client.py
+++ b/python/pyspark/sql/connect/client.py
@@ -149,7 +149,7 @@ class RemoteSparkSession(object):
 self,
 start: int,
 end: int,
-step: Optional[int] = None,
+step: int = 1,
 numPartitions: Optional[int] = None,
 ) -> DataFrame:
 """
diff --git a/python/pyspark/sql/connect/plan.py 
b/python/pyspark/sql/connect/plan.py
index 71c971d9e91..2f1f70ec1a9 100644
--- a/python/pyspark/sql/connect/plan.py
+++ b/python/pyspark/sql/connect/plan.py
@@ -705,7 +705,7 @@ class Range(LogicalPlan):
 

[spark] branch master updated: [SPARK-40991][PYTHON] Update `cloudpickle` to v2.2.0

2022-11-02 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 9d30e3365c1 [SPARK-40991][PYTHON] Update `cloudpickle` to v2.2.0
9d30e3365c1 is described below

commit 9d30e3365c17b94438290e59e6900ada570b94cc
Author: Dongjoon Hyun 
AuthorDate: Wed Nov 2 15:02:53 2022 +0900

[SPARK-40991][PYTHON] Update `cloudpickle` to v2.2.0

### What changes were proposed in this pull request?

This PR aims to update `cloudpickle` to `v2.2.0` for Apache Spark 3.4.0.

### Why are the changes needed?

SPARK-37457 updated `cloudpickle` to v2.0.0 in Apache Spark 3.3.0.

To bring the latest bug fixes.
- https://github.com/cloudpipe/cloudpickle/releases/tag/v2.2.0
- https://github.com/cloudpipe/cloudpickle/releases/tag/2.1.0

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

Closes #38474 from dongjoon-hyun/SPARK-40991.

Authored-by: Dongjoon Hyun 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/cloudpickle/__init__.py |  5 +-
 python/pyspark/cloudpickle/cloudpickle.py  | 77 --
 python/pyspark/cloudpickle/cloudpickle_fast.py | 91 ++
 3 files changed, 90 insertions(+), 83 deletions(-)

diff --git a/python/pyspark/cloudpickle/__init__.py 
b/python/pyspark/cloudpickle/__init__.py
index 0ae79b5535c..efbf1178d43 100644
--- a/python/pyspark/cloudpickle/__init__.py
+++ b/python/pyspark/cloudpickle/__init__.py
@@ -1,6 +1,3 @@
-from __future__ import absolute_import
-
-
 from pyspark.cloudpickle.cloudpickle import *  # noqa
 from pyspark.cloudpickle.cloudpickle_fast import CloudPickler, dumps, dump  # 
noqa
 
@@ -8,4 +5,4 @@ from pyspark.cloudpickle.cloudpickle_fast import CloudPickler, 
dumps, dump  # no
 # expose their Pickler subclass at top-level under the  "Pickler" name.
 Pickler = CloudPickler
 
-__version__ = '2.0.0'
+__version__ = '2.2.0'
diff --git a/python/pyspark/cloudpickle/cloudpickle.py 
b/python/pyspark/cloudpickle/cloudpickle.py
index 347b3869580..317be69151a 100644
--- a/python/pyspark/cloudpickle/cloudpickle.py
+++ b/python/pyspark/cloudpickle/cloudpickle.py
@@ -40,7 +40,6 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 
(INCLUDING
 NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 """
-from __future__ import print_function
 
 import builtins
 import dis
@@ -56,7 +55,7 @@ import warnings
 
 from .compat import pickle
 from collections import OrderedDict
-from typing import Generic, Union, Tuple, Callable
+from typing import ClassVar, Generic, Union, Tuple, Callable
 from pickle import _getattribute
 from importlib._bootstrap import _find_spec
 
@@ -66,11 +65,6 @@ try:  # pragma: no branch
 except ImportError:
 _typing_extensions = Literal = Final = None
 
-if sys.version_info >= (3, 5, 3):
-from typing import ClassVar
-else:  # pragma: no cover
-ClassVar = None
-
 if sys.version_info >= (3, 8):
 from types import CellType
 else:
@@ -327,11 +321,10 @@ def _extract_code_globals(co):
 """
 out_names = _extract_code_globals_cache.get(co)
 if out_names is None:
-names = co.co_names
 # We use a dict with None values instead of a set to get a
 # deterministic order (assuming Python 3.6+) and avoid introducing
 # non-deterministic pickle bytes as a results.
-out_names = {names[oparg]: None for _, oparg in _walk_global_ops(co)}
+out_names = {name: None for name in _walk_global_ops(co)}
 
 # Declaring a function inside another one using the "def ..."
 # syntax generates a constant code object corresponding to the one
@@ -517,13 +510,12 @@ def _builtin_type(name):
 
 def _walk_global_ops(code):
 """
-Yield (opcode, argument number) tuples for all
-global-referencing instructions in *code*.
+Yield referenced name for all global-referencing instructions in *code*.
 """
 for instr in dis.get_instructions(code):
 op = instr.opcode
 if op in GLOBAL_OPS:
-yield op, instr.arg
+yield instr.argval
 
 
 def _extract_class_dict(cls):
@@ -604,43 +596,21 @@ def parametrized_type_hint_getinitargs(obj):
 elif type(obj) is type(ClassVar):
 initargs = (ClassVar, obj.__type__)
 elif type(obj) is type(Generic):
-parameters = obj.__parameters__
-if len(obj.__parameters__) > 0:
-# in early Python 3.5, __parameters__ was sometimes
-# preferred to __args__
-initargs = (obj.__origin__, parameters)
-
-else:
-initargs = (obj.__origin__, obj.__args__)
+initargs = (obj.__origin__, obj.__args__)
 elif type(obj) is