[spark] branch master updated (72cce5c39da -> 1c55e94d6a9)

2022-11-21 Thread yao
This is an automated email from the ASF dual-hosted git repository.

yao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 72cce5c39da [SPARK-40872] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size
 add 1c55e94d6a9 [SPARK-41211][BUILD] Upgrade ZooKeeper from 3.6.2 to 3.6.3

No new revisions were added by this update.

Summary of changes:
 dev/deps/spark-deps-hadoop-2-hive-2.3 | 4 ++--
 dev/deps/spark-deps-hadoop-3-hive-2.3 | 4 ++--
 pom.xml   | 2 +-
 3 files changed, 5 insertions(+), 5 deletions(-)





[spark] branch master updated: [SPARK-41154][SQL] Incorrect relation caching for queries with time travel spec

2022-11-21 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 07427b854be [SPARK-41154][SQL] Incorrect relation caching for queries with time travel spec
07427b854be is described below

commit 07427b854be58810bd485c00c5e5c576d5aa404e
Author: ulysses-you 
AuthorDate: Mon Nov 21 18:19:10 2022 +0800

[SPARK-41154][SQL] Incorrect relation caching for queries with time travel spec

### What changes were proposed in this pull request?

Add TimeTravelSpec to the key of relation cache in AnalysisContext.

### Why are the changes needed?

Correct the relation resolution for the same table but different TimeTravelSpec.
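
As an illustration (a sketch, not part of the commit message), the problematic shape is the same table read with two different time travel versions in one query; `testcat` and the `foo` provider are borrowed from the PR's test suite, and the second branch below is only illustrative:

```scala
// Assumes an active SparkSession `spark` and a catalog ("testcat") whose tables
// support VERSION AS OF, as in DataSourceV2SQLSuite.
spark.sql("CREATE TABLE testcat.t USING foo AS SELECT 1 AS c")

// Before this fix the relation cache was keyed only by the qualified table name,
// so both branches could resolve to the same cached snapshot even though they
// request different versions. With (name, TimeTravelSpec) as the key, each
// branch resolves against its own version.
spark.sql(
  """SELECT * FROM testcat.t VERSION AS OF '1'
    |UNION ALL
    |SELECT * FROM testcat.t VERSION AS OF '2'""".stripMargin)
```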

### Does this PR introduce _any_ user-facing change?

yes, bug fix

### How was this patch tested?

add test

Closes #38687 from ulysses-you/time-travel-spec.

Authored-by: ulysses-you 
Signed-off-by: Wenchen Fan 
---
 .../apache/spark/sql/catalyst/analysis/Analyzer.scala   | 11 ++-
 .../spark/sql/connector/DataSourceV2SQLSuite.scala  | 17 +
 2 files changed, 23 insertions(+), 5 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 104c5c1e080..8cdf83f4be7 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -113,9 +113,9 @@ object FakeV2SessionCatalog extends TableCatalog with 
FunctionCatalog {
  * @param nestedViewDepth The nested depth in the view resolution, this 
enables us to limit the
  *depth of nested views.
  * @param maxNestedViewDepth The maximum allowed depth of nested view 
resolution.
- * @param relationCache A mapping from qualified table names to resolved 
relations. This can ensure
- *  that the table is resolved only once if a table is 
used multiple times
- *  in a query.
+ * @param relationCache A mapping from qualified table names and time travel 
spec to resolved
+ *  relations. This can ensure that the table is resolved 
only once if a table
+ *  is used multiple times in a query.
  * @param referredTempViewNames All the temp view names referred by the 
current view we are
  *  resolving. It's used to make sure the relation 
resolution is
  *  consistent between view creation and view 
resolution. For example,
@@ -129,7 +129,8 @@ case class AnalysisContext(
 catalogAndNamespace: Seq[String] = Nil,
 nestedViewDepth: Int = 0,
 maxNestedViewDepth: Int = -1,
-relationCache: mutable.Map[Seq[String], LogicalPlan] = mutable.Map.empty,
+relationCache: mutable.Map[(Seq[String], Option[TimeTravelSpec]), 
LogicalPlan] =
+  mutable.Map.empty,
 referredTempViewNames: Seq[Seq[String]] = Seq.empty,
 // 1. If we are resolving a view, this field will be restored from the 
view metadata,
 //by calling `AnalysisContext.withAnalysisContext(viewDesc)`.
@@ -1239,7 +1240,7 @@ class Analyzer(override val catalogManager: 
CatalogManager)
   resolveTempView(u.multipartIdentifier, u.isStreaming, 
timeTravelSpec.isDefined).orElse {
 expandIdentifier(u.multipartIdentifier) match {
   case CatalogAndIdentifier(catalog, ident) =>
-val key = catalog.name +: ident.namespace :+ ident.name
+val key = ((catalog.name +: ident.namespace :+ ident.name).toSeq, 
timeTravelSpec)
 AnalysisContext.get.relationCache.get(key).map(_.transform {
   case multi: MultiInstanceRelation =>
 val newRelation = multi.newInstance()
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index de8612c3348..ea93367b3db 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -2775,6 +2775,23 @@ class DataSourceV2SQLSuiteV1Filter extends 
DataSourceV2SQLSuite with AlterTableT
 }
   }
 
+  test("SPARK-41154: Incorrect relation caching for queries with time travel 
spec") {
+sql("use testcat")
+val t1 = "testcat.t1"
+val t2 = "testcat.t2"
+withTable(t1, t2) {
+  sql(s"CREATE TABLE $t1 USING foo AS SELECT 1 as c")
+  sql(s"CREATE TABLE $t2 USING foo AS SELECT 2 as c")
+  assert(
+sql("""
+  |SELECT * FROM t VERSION AS OF '1'
+  |UNION ALL
+  |SELECT * F

[spark] branch master updated (07427b854be -> d9abbfe5f71)

2022-11-21 Thread gengliang
This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 07427b854be [SPARK-41154][SQL] Incorrect relation caching for queries with time travel spec
 add d9abbfe5f71 [SPARK-41195][SQL] Support PIVOT/UNPIVOT with join children

No new revisions were added by this update.

Summary of changes:
 docs/sql-ref-syntax-qry-select.md  |   2 +
 .../spark/sql/catalyst/parser/SqlBaseParser.g4 |   8 +-
 .../spark/sql/catalyst/parser/AstBuilder.scala | 105 ---
 .../spark/sql/errors/QueryParsingErrors.scala  |   8 +-
 .../sql/catalyst/parser/UnpivotParserSuite.scala   | 148 +
 5 files changed, 219 insertions(+), 52 deletions(-)





[spark] branch master updated: [SPARK-41005][COLLECT][FOLLOWUP] Remove JSON code path and use `RDD.collect` in Arrow code path

2022-11-21 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 908adca2b22 [SPARK-41005][COLLECT][FOLLOWUP] Remove JSON code path and use `RDD.collect` in Arrow code path
908adca2b22 is described below

commit 908adca2b229b05b2ae0dd31cbaaa1fdcde16290
Author: Ruifeng Zheng 
AuthorDate: Tue Nov 22 09:43:44 2022 +0900

[SPARK-41005][COLLECT][FOLLOWUP] Remove JSON code path and use `RDD.collect` in Arrow code path

### What changes were proposed in this pull request?
1. Remove the JSON code path;
2. Use `RDD.collect` in the Arrow code path, since the existing tests were already broken in the Arrow code path;
3. Re-enable `test_fill_na`.

### Why are the changes needed?
The existing Arrow code path is still problematic: it fails and falls back to the JSON code path, which changes the output data types of `test_fill_na`.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
reenabled test and added UT

Closes #38706 from zhengruifeng/collect_disable_json.

Authored-by: Ruifeng Zheng 
Signed-off-by: Hyukjin Kwon 
---
 .../src/main/protobuf/spark/connect/base.proto |  14 +-
 .../service/SparkConnectStreamHandler.scala| 156 ++---
 python/pyspark/sql/connect/client.py   |   5 -
 python/pyspark/sql/connect/proto/base_pb2.py   |  41 ++
 python/pyspark/sql/connect/proto/base_pb2.pyi  |  51 +--
 .../sql/tests/connect/test_connect_basic.py|  55 +++-
 6 files changed, 82 insertions(+), 240 deletions(-)

diff --git a/connector/connect/src/main/protobuf/spark/connect/base.proto 
b/connector/connect/src/main/protobuf/spark/connect/base.proto
index 66e27187153..277da6b2431 100644
--- a/connector/connect/src/main/protobuf/spark/connect/base.proto
+++ b/connector/connect/src/main/protobuf/spark/connect/base.proto
@@ -139,11 +139,7 @@ message ExecutePlanRequest {
 message ExecutePlanResponse {
   string client_id = 1;
 
-  // Result type
-  oneof result_type {
-ArrowBatch arrow_batch = 2;
-JSONBatch json_batch = 3;
-  }
+  ArrowBatch arrow_batch = 2;
 
   // Metrics for the query execution. Typically, this field is only present in 
the last
   // batch of results and then represent the overall state of the query 
execution.
@@ -155,14 +151,6 @@ message ExecutePlanResponse {
 bytes data = 2;
   }
 
-  // Message type when the result is returned as JSON. This is essentially a 
bulk wrapper
-  // for the JSON result of a Spark DataFrame. All rows are returned in the 
JSON record format
-  // of `{col -> row}`.
-  message JSONBatch {
-int64 row_count = 1;
-bytes data = 2;
-  }
-
   message Metrics {
 
 repeated MetricObject metrics = 1;
diff --git 
a/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
 
b/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
index 50ff08f997c..092bdd00dc1 100644
--- 
a/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
+++ 
b/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
@@ -18,12 +18,10 @@
 package org.apache.spark.sql.connect.service
 
 import scala.collection.JavaConverters._
-import scala.util.control.NonFatal
 
 import com.google.protobuf.ByteString
 import io.grpc.stub.StreamObserver
 
-import org.apache.spark.SparkException
 import org.apache.spark.connect.proto
 import org.apache.spark.connect.proto.{ExecutePlanRequest, ExecutePlanResponse}
 import org.apache.spark.internal.Logging
@@ -34,7 +32,6 @@ import org.apache.spark.sql.execution.{SparkPlan, 
SQLExecution}
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, 
AdaptiveSparkPlanHelper, QueryStageExec}
 import org.apache.spark.sql.execution.arrow.ArrowConverters
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.ThreadUtils
 
 class SparkConnectStreamHandler(responseObserver: 
StreamObserver[ExecutePlanResponse])
 extends Logging {
@@ -57,75 +54,7 @@ class SparkConnectStreamHandler(responseObserver: 
StreamObserver[ExecutePlanResp
 // Extract the plan from the request and convert it to a logical plan
 val planner = new SparkConnectPlanner(session)
 val dataframe = Dataset.ofRows(session, 
planner.transformRelation(request.getPlan.getRoot))
-try {
-  processAsArrowBatches(request.getClientId, dataframe)
-} catch {
-  case e: Exception =>
-logWarning(e.getMessage)
-processAsJsonBatches(request.getClientId, dataframe)
-}
-  }
-
-  def processAsJsonBatches(clientId: String, dataframe: DataFrame): Unit = {
-// Only process up to 10MB of data.
-val sb = new StringBuilder
-

[spark] branch master updated: [SPARK-41151][SQL] Keep built-in file `_metadata` column nullable value consistent

2022-11-21 Thread kabhwan
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 77a8fb445cb [SPARK-41151][SQL] Keep built-in file `_metadata` column nullable value consistent
77a8fb445cb is described below

commit 77a8fb445cb099e11ed486447959b3de0a625b6d
Author: yaohua 
AuthorDate: Tue Nov 22 09:54:14 2022 +0900

[SPARK-41151][SQL] Keep built-in file `_metadata` column nullable value consistent

### What changes were proposed in this pull request?
In FileSourceStrategy, we add an Alias node to wrap the file metadata fields (e.g. file_name, file_size) in a NamedStruct ([here](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala#L279)).
But `CreateNamedStruct` overrides `nullable` to `false` ([here](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala#L44 [...]

This PR fixes this by making the `_metadata` column always non-nullable. Rationale:
1. By definition, `_metadata` for file-based sources is always not null;
2. If users have already persisted this nullable `_metadata` somewhere, it is still fine to write non-nullable data to that nullable column.
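
As a rough illustration (not from the PR; the local session and path are assumptions), the nullability in question is visible directly on the `_metadata` column of a file-based read:

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: throwaway local session and parquet path, just to inspect nullability.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
spark.range(10).write.mode("overwrite").parquet("/tmp/spark41151")

val df = spark.read.parquet("/tmp/spark41151").select("_metadata")
// After SPARK-41151, the _metadata column should report nullable = false, and it
// stays false once FileSourceStrategy wraps it in KnownNotNull(CreateStruct(...)).
df.schema.fields.foreach(f => println(s"${f.name}: nullable=${f.nullable}"))
```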

### Why are the changes needed?
For stateful streaming, we store the schema in the state store and [check consistency across batches](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala#L47).
To avoid state schema compatibility mismatches, we should keep nullability consistent in `_metadata`.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
New UT

Closes #38683 from Yaohua628/spark-41151.

Authored-by: yaohua 
Signed-off-by: Jungtaek Lim 
---
 .../sql/catalyst/expressions/namedExpressions.scala |  5 +++--
 .../sql/execution/datasources/FileSourceStrategy.scala  |  6 +-
 .../execution/datasources/FileMetadataStructSuite.scala | 17 -
 3 files changed, 24 insertions(+), 4 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index 99e5f411bdb..8dd28e9aaae 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -464,8 +464,9 @@ object FileSourceMetadataAttribute {
 
   val FILE_SOURCE_METADATA_COL_ATTR_KEY = "__file_source_metadata_col"
 
-  def apply(name: String, dataType: DataType, nullable: Boolean = true): 
AttributeReference =
-AttributeReference(name, dataType, nullable,
+  def apply(name: String, dataType: DataType): AttributeReference =
+// Metadata column for file sources is always not nullable.
+AttributeReference(name, dataType, nullable = false,
   new MetadataBuilder()
 .putBoolean(METADATA_COL_ATTR_KEY, value = true)
 .putBoolean(FILE_SOURCE_METADATA_COL_ATTR_KEY, value = true).build())()
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 576801d3dd5..476d6579b38 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -275,8 +275,12 @@ object FileSourceStrategy extends Strategy with 
PredicateHelper with Logging {
 .get.withName(FileFormat.ROW_INDEX)
   }
 }
+// SPARK-41151: metadata column is not nullable for file sources.
+// Here, we *explicitly* enforce the not null to 
`CreateStruct(structColumns)`
+// to avoid any risk of inconsistent schema nullability
 val metadataAlias =
-  Alias(CreateStruct(structColumns), METADATA_NAME)(exprId = 
metadataStruct.exprId)
+  Alias(KnownNotNull(CreateStruct(structColumns)),
+METADATA_NAME)(exprId = metadataStruct.exprId)
 execution.ProjectExec(
   readDataColumns ++ partitionColumns :+ metadataAlias, scan)
   }.getOrElse(scan)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
index e0e208b62f1..a39a36a4f83 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/data

[spark] branch master updated: [SPARK-41196][CONNECT][INFRA][FOLLOW-UP] Change protobuf versions in CI

2022-11-21 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 1734d9b1a1e [SPARK-41196][CONNECT][INFRA][FOLLOW-UP] Change protobuf versions in CI
1734d9b1a1e is described below

commit 1734d9b1a1e72153b72687b643ae1597ff15c2cc
Author: Ruifeng Zheng 
AuthorDate: Tue Nov 22 09:12:03 2022 +0800

[SPARK-41196][CONNECT][INFRA][FOLLOW-UP] Change protobuf versions in CI

### What changes were proposed in this pull request?
pin `protobuf==3.19.4` in tests

### Why are the changes needed?
versions were already changed in https://github.com/apache/spark/pull/38693

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
updated CI

Closes #38729 from zhengruifeng/connect_infra_protobuf.

Authored-by: Ruifeng Zheng 
Signed-off-by: Ruifeng Zheng 
---
 .github/workflows/build_and_test.yml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/build_and_test.yml 
b/.github/workflows/build_and_test.yml
index 0521270cee7..646a096b72d 100644
--- a/.github/workflows/build_and_test.yml
+++ b/.github/workflows/build_and_test.yml
@@ -243,7 +243,7 @@ jobs:
 - name: Install Python packages (Python 3.8)
   if: (contains(matrix.modules, 'sql') && !contains(matrix.modules, 
'sql-'))
   run: |
-python3.8 -m pip install 'numpy>=1.20.0' pyarrow pandas scipy 
unittest-xml-reporting 'grpcio==1.48.1' 'protobuf==4.21.6'
+python3.8 -m pip install 'numpy>=1.20.0' pyarrow pandas scipy 
unittest-xml-reporting 'grpcio==1.48.1' 'protobuf==3.19.4'
 python3.8 -m pip list
 # Run the tests.
 - name: Run tests
@@ -589,7 +589,7 @@ jobs:
 #   See also https://issues.apache.org/jira/browse/SPARK-38279.
 python3.9 -m pip install 'sphinx<3.1.0' mkdocs pydata_sphinx_theme 
ipython nbsphinx numpydoc 'jinja2<3.0.0' 'markupsafe==2.0.1' 'pyzmq<24.0.0'
 python3.9 -m pip install ipython_genutils # See SPARK-38517
-python3.9 -m pip install sphinx_plotly_directive 'numpy>=1.20.0' 
pyarrow pandas 'plotly>=4.8' 'grpcio==1.48.1' 'protobuf==4.21.6' 
'mypy-protobuf==3.3.0'
+python3.9 -m pip install sphinx_plotly_directive 'numpy>=1.20.0' 
pyarrow pandas 'plotly>=4.8' 'grpcio==1.48.1' 'protobuf==3.19.4' 
'mypy-protobuf==3.3.0'
 python3.9 -m pip install 'docutils<0.18.0' # See SPARK-39421
 apt-get update -y
 apt-get install -y ruby ruby-dev





[spark] branch branch-3.3 updated: [SPARK-41154][SQL][3.3] Incorrect relation caching for queries with time travel spec

2022-11-21 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new ace3c69ef18 [SPARK-41154][SQL][3.3] Incorrect relation caching for queries with time travel spec
ace3c69ef18 is described below

commit ace3c69ef18d648bb15855c40c8b7e44987dede4
Author: ulysses-you 
AuthorDate: Tue Nov 22 10:07:17 2022 +0800

[SPARK-41154][SQL][3.3] Incorrect relation caching for queries with time travel spec

Backport of https://github.com/apache/spark/pull/38687 for branch-3.3.

### What changes were proposed in this pull request?

Add TimeTravelSpec to the key of relation cache in AnalysisContext.

### Why are the changes needed?

Correct the relation resolution for the same table but different TimeTravelSpec.

### Does this PR introduce _any_ user-facing change?

yes, bug fix

### How was this patch tested?

add test

Closes #38741 from ulysses-you/time-travel-spec-3.3.

Authored-by: ulysses-you 
Signed-off-by: Wenchen Fan 
---
 .../apache/spark/sql/catalyst/analysis/Analyzer.scala   | 11 ++-
 .../spark/sql/connector/DataSourceV2SQLSuite.scala  | 17 +
 2 files changed, 23 insertions(+), 5 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 2a2fe6f2957..0c68dd8839d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -112,9 +112,9 @@ object FakeV2SessionCatalog extends TableCatalog with 
FunctionCatalog {
  * @param nestedViewDepth The nested depth in the view resolution, this 
enables us to limit the
  *depth of nested views.
  * @param maxNestedViewDepth The maximum allowed depth of nested view 
resolution.
- * @param relationCache A mapping from qualified table names to resolved 
relations. This can ensure
- *  that the table is resolved only once if a table is 
used multiple times
- *  in a query.
+ * @param relationCache A mapping from qualified table names and time travel 
spec to resolved
+ *  relations. This can ensure that the table is resolved 
only once if a table
+ *  is used multiple times in a query.
  * @param referredTempViewNames All the temp view names referred by the 
current view we are
  *  resolving. It's used to make sure the relation 
resolution is
  *  consistent between view creation and view 
resolution. For example,
@@ -128,7 +128,8 @@ case class AnalysisContext(
 catalogAndNamespace: Seq[String] = Nil,
 nestedViewDepth: Int = 0,
 maxNestedViewDepth: Int = -1,
-relationCache: mutable.Map[Seq[String], LogicalPlan] = mutable.Map.empty,
+relationCache: mutable.Map[(Seq[String], Option[TimeTravelSpec]), 
LogicalPlan] =
+  mutable.Map.empty,
 referredTempViewNames: Seq[Seq[String]] = Seq.empty,
 // 1. If we are resolving a view, this field will be restored from the 
view metadata,
 //by calling `AnalysisContext.withAnalysisContext(viewDesc)`.
@@ -1188,7 +1189,7 @@ class Analyzer(override val catalogManager: 
CatalogManager)
   lookupTempView(u.multipartIdentifier, u.isStreaming, 
timeTravelSpec.isDefined).orElse {
 expandIdentifier(u.multipartIdentifier) match {
   case CatalogAndIdentifier(catalog, ident) =>
-val key = catalog.name +: ident.namespace :+ ident.name
+val key = ((catalog.name +: ident.namespace :+ ident.name).toSeq, 
timeTravelSpec)
 AnalysisContext.get.relationCache.get(key).map(_.transform {
   case multi: MultiInstanceRelation =>
 val newRelation = multi.newInstance()
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 44f97f55713..7470911c9e5 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -2576,6 +2576,23 @@ class DataSourceV2SQLSuite
 }
   }
 
+  test("SPARK-41154: Incorrect relation caching for queries with time travel 
spec") {
+sql("use testcat")
+val t1 = "testcat.t1"
+val t2 = "testcat.t2"
+withTable(t1, t2) {
+  sql(s"CREATE TABLE $t1 USING foo AS SELECT 1 as c")
+  sql(s"CREATE TABLE $t2 USING foo AS SELECT 2 as c")
+  assert(
+sql("""
+  |SELECT * FROM t VERSION AS OF '1'
+ 

[spark] branch master updated: [SPARK-41017][SQL][FOLLOWUP] Push Filter with both deterministic and nondeterministic predicates

2022-11-21 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 0cdbda13b08 [SPARK-41017][SQL][FOLLOWUP] Push Filter with both deterministic and nondeterministic predicates
0cdbda13b08 is described below

commit 0cdbda13b08ad79c91d1d7d5912fb1120191dc56
Author: Wenchen Fan 
AuthorDate: Tue Nov 22 10:40:24 2022 +0800

[SPARK-41017][SQL][FOLLOWUP] Push Filter with both deterministic and nondeterministic predicates

### What changes were proposed in this pull request?

This PR fixes a regression caused by https://github.com/apache/spark/pull/38511. For `FROM t WHERE rand() > 0.5 AND col = 1`, we can still push down `col = 1` because we don't guarantee the predicate evaluation order within a `Filter`.

This PR updates `ScanOperation` to consider this case and bring back the previous pushdown behavior.
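
As a rough sketch of the restored behavior (not taken from the PR; the local session and path are assumptions), the deterministic conjunct should show up again among the scan's pushed filters:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, rand}

// Sketch only: throwaway local session and parquet path.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
spark.range(100).toDF("c").write.mode("overwrite").parquet("/tmp/spark41017")

val df = spark.read.parquet("/tmp/spark41017")
  .where(rand() > 0.5 && col("c") === 1)

// With this fix, `c = 1` should be pushed down to the scan (visible as a pushed
// filter in the plan), while `rand() > 0.5` stays in a Filter above the scan.
df.explain(true)
```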

### Why are the changes needed?

fix perf regression

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

new tests

Closes #38746 from cloud-fan/filter.

Lead-authored-by: Wenchen Fan 
Co-authored-by: Wenchen Fan 
Signed-off-by: Wenchen Fan 
---
 .../spark/sql/catalyst/planning/patterns.scala | 21 -
 .../spark/sql/FileBasedDataSourceSuite.scala   | 22 --
 2 files changed, 36 insertions(+), 7 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index 3c35ba9b600..bbda9eb76b1 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -137,11 +137,22 @@ object ScanOperation extends OperationHelper {
 val alwaysInline = 
SQLConf.get.getConf(SQLConf.COLLAPSE_PROJECT_ALWAYS_INLINE)
 val (fields, filters, child, _) = collectProjectsAndFilters(plan, 
alwaysInline)
 // `collectProjectsAndFilters` transforms the plan bottom-up, so the 
bottom-most filter are
-// placed at the beginning of `filters` list. According to the SQL 
semantic, we can only
-// push down the bottom deterministic filters.
-val filtersCanPushDown = 
filters.takeWhile(_.deterministic).flatMap(splitConjunctivePredicates)
-val filtersStayUp = filters.dropWhile(_.deterministic)
-Some((fields.getOrElse(child.output), filtersStayUp, filtersCanPushDown, 
child))
+// placed at the beginning of `filters` list. According to the SQL 
semantic, we cannot merge
+// Filters if one or more of them are nondeterministic. This means we can 
only push down the
+// bottom-most Filter, or more following deterministic Filters if the 
bottom-most Filter is
+// also deterministic.
+if (filters.isEmpty) {
+  Some((fields.getOrElse(child.output), Nil, Nil, child))
+} else if (filters.head.deterministic) {
+  val filtersCanPushDown = filters.takeWhile(_.deterministic)
+.flatMap(splitConjunctivePredicates)
+  val filtersStayUp = filters.dropWhile(_.deterministic)
+  Some((fields.getOrElse(child.output), filtersStayUp, filtersCanPushDown, 
child))
+} else {
+  val filtersCanPushDown = splitConjunctivePredicates(filters.head)
+  val filtersStayUp = filters.drop(1)
+  Some((fields.getOrElse(child.output), filtersStayUp, filtersCanPushDown, 
child))
+}
   }
 }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 98cb54ccbbc..cf5f8d990f7 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -30,10 +30,10 @@ import org.apache.hadoop.fs.{LocalFileSystem, Path}
 import org.apache.spark.SparkException
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.sql.TestingUDT.{IntervalUDT, NullData, NullUDT}
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
GreaterThan, Literal}
 import 
org.apache.spark.sql.catalyst.expressions.IntegralLiteralTestUtils.{negativeInt,
 positiveInt}
 import org.apache.spark.sql.catalyst.plans.logical.Filter
-import org.apache.spark.sql.execution.SimpleMode
+import org.apache.spark.sql.execution.{FileSourceScanLike, SimpleMode}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.datasources.FilePartition
 import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan}
@@ -1074,6

[spark] branch branch-3.3 updated (ace3c69ef18 -> abc343fcab8)

2022-11-21 Thread kabhwan
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a change to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


from ace3c69ef18 [SPARK-41154][SQL][3.3] Incorrect relation caching for queries with time travel spec
 add abc343fcab8 [SPARK-41151][SQL][3.3] Keep built-in file `_metadata` column nullable value consistent

No new revisions were added by this update.

Summary of changes:
 .../sql/catalyst/expressions/namedExpressions.scala |  5 +++--
 .../sql/execution/datasources/FileSourceStrategy.scala  |  6 +-
 .../execution/datasources/FileMetadataStructSuite.scala | 17 -
 3 files changed, 24 insertions(+), 4 deletions(-)





[spark] branch master updated: [SPARK-41213][CONNECT][PYTHON] Implement `DataFrame.__repr__` and `DataFrame.dtypes`

2022-11-21 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 55addb38f2a [SPARK-41213][CONNECT][PYTHON] Implement `DataFrame.__repr__` and `DataFrame.dtypes`
55addb38f2a is described below

commit 55addb38f2a9cc2521a2e7908bb8ad3d49c9a8bb
Author: Ruifeng Zheng 
AuthorDate: Tue Nov 22 15:44:28 2022 +0900

[SPARK-41213][CONNECT][PYTHON] Implement `DataFrame.__repr__` and `DataFrame.dtypes`

### What changes were proposed in this pull request?
Implement `DataFrame.__repr__` and `DataFrame.dtypes`

### Why are the changes needed?
For API coverage.

### Does this PR introduce _any_ user-facing change?
yes

### How was this patch tested?
added UT

Closes #38735 from zhengruifeng/connect_df_repr.

Authored-by: Ruifeng Zheng 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/connect/dataframe.py   | 19 ++-
 .../pyspark/sql/tests/connect/test_connect_basic.py   | 13 ++---
 2 files changed, 28 insertions(+), 4 deletions(-)

diff --git a/python/pyspark/sql/connect/dataframe.py 
b/python/pyspark/sql/connect/dataframe.py
index 15aa028b11b..275a6d2668d 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -115,6 +115,9 @@ class DataFrame(object):
 self._cache: Dict[str, Any] = {}
 self._session: "RemoteSparkSession" = session
 
+def __repr__(self) -> str:
+return "DataFrame[%s]" % (", ".join("%s: %s" % c for c in self.dtypes))
+
 @classmethod
 def withPlan(cls, plan: plan.LogicalPlan, session: "RemoteSparkSession") 
-> "DataFrame":
 """Main initialization method used to construct a new data frame with 
a child plan."""
@@ -137,13 +140,26 @@ class DataFrame(object):
 def colRegex(self, regex: str) -> "DataFrame":
 ...
 
+@property
+def dtypes(self) -> List[Tuple[str, str]]:
+"""Returns all column names and their data types as a list.
+
+.. versionadded:: 3.4.0
+
+Returns
+---
+list
+List of columns as tuple pairs.
+"""
+return [(str(f.name), f.dataType.simpleString()) for f in 
self.schema.fields]
+
 @property
 def columns(self) -> List[str]:
 """Returns the list of columns of the current data frame."""
 if self._plan is None:
 return []
 
-return self.schema().names
+return self.schema.names
 
 def sparkSession(self) -> "RemoteSparkSession":
 """Returns Spark session that created this :class:`DataFrame`.
@@ -736,6 +752,7 @@ class DataFrame(object):
 query = self._plan.to_proto(self._session)
 return self._session._to_pandas(query)
 
+@property
 def schema(self) -> StructType:
 """Returns the schema of this :class:`DataFrame` as a 
:class:`pyspark.sql.types.StructType`.
 
diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py 
b/python/pyspark/sql/tests/connect/test_connect_basic.py
index 9e7a5f2f4a5..917cce0ebb0 100644
--- a/python/pyspark/sql/tests/connect/test_connect_basic.py
+++ b/python/pyspark/sql/tests/connect/test_connect_basic.py
@@ -137,7 +137,7 @@ class SparkConnectTests(SparkConnectSQLTestCase):
 self.assertGreater(len(result), 0)
 
 def test_schema(self):
-schema = self.connect.read.table(self.tbl_name).schema()
+schema = self.connect.read.table(self.tbl_name).schema
 self.assertEqual(
 StructType(
 [StructField("id", LongType(), True), StructField("name", 
StringType(), True)]
@@ -333,6 +333,14 @@ class SparkConnectTests(SparkConnectSQLTestCase):
 expected = "+---+---+\n|  X|  Y|\n+---+---+\n|  1|  2|\n+---+---+\n"
 self.assertEqual(show_str, expected)
 
+def test_repr(self):
+# SPARK-41213: Test the __repr__ method
+query = """SELECT * FROM VALUES (1L, NULL), (3L, "Z") AS tab(a, b)"""
+self.assertEqual(
+self.connect.sql(query).__repr__(),
+self.spark.sql(query).__repr__(),
+)
+
 def test_explain_string(self):
 # SPARK-41122: test explain API.
 plan_str = self.connect.sql("SELECT 1").explain(extended=True)
@@ -380,8 +388,7 @@ class SparkConnectTests(SparkConnectSQLTestCase):
 col0 = (
 self.connect.range(1, 10)
 .select(col("id").alias("name", metadata={"max": 99}))
-.schema()
-.names[0]
+.schema.names[0]
 )
 self.assertEqual("name", col0)
 





[spark] branch master updated (55addb38f2a -> 51b04406028)

2022-11-21 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 55addb38f2a [SPARK-41213][CONNECT][PYTHON] Implement `DataFrame.__repr__` and `DataFrame.dtypes`
 add 51b04406028 [SPARK-41209][PYTHON] Improve PySpark type inference in _merge_type method

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/tests/test_types.py | 46 +++---
 python/pyspark/sql/types.py|  4 +++
 2 files changed, 36 insertions(+), 14 deletions(-)





[spark] branch master updated (51b04406028 -> 8496059966e)

2022-11-21 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 51b04406028 [SPARK-41209][PYTHON] Improve PySpark type inference in _merge_type method
 add 8496059966e [SPARK-41193][SQL][TESTS] Ignore `collect data with single partition larger than 2GB bytes array limit` in `DatasetLargeResultCollectingSuite`

No new revisions were added by this update.

Summary of changes:
 sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 6 +-
 1 file changed, 5 insertions(+), 1 deletion(-)





[spark] branch master updated (8496059966e -> b38a1158870)

2022-11-21 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 8496059966e [SPARK-41193][SQL][TESTS] Ignore `collect data with single partition larger than 2GB bytes array limit` in `DatasetLargeResultCollectingSuite`
 add b38a1158870 [SPARK-41169][CONNECT][PYTHON] Implement `DataFrame.drop`

No new revisions were added by this update.

Summary of changes:
 .../main/protobuf/spark/connect/relations.proto|  14 ++
 .../org/apache/spark/sql/connect/dsl/package.scala |  22 +++
 .../sql/connect/planner/SparkConnectPlanner.scala  |  16 ++-
 .../connect/planner/SparkConnectProtoSuite.scala   |  17 +++
 python/pyspark/sql/connect/dataframe.py|  19 ++-
 python/pyspark/sql/connect/plan.py |  43 ++
 python/pyspark/sql/connect/proto/relations_pb2.py  | 150 +++--
 python/pyspark/sql/connect/proto/relations_pb2.pyi |  45 +++
 .../sql/tests/connect/test_connect_basic.py|  27 
 .../sql/tests/connect/test_connect_plan_only.py|  16 +++
 10 files changed, 296 insertions(+), 73 deletions(-)





[spark] branch master updated (b38a1158870 -> d453598a428)

2022-11-21 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from b38a1158870 [SPARK-41169][CONNECT][PYTHON] Implement `DataFrame.drop`
 add d453598a428 [SPARK-40809][CONNECT][FOLLOW-UP] Do not use Buffer to make Scala 2.13 test pass

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/tests/connect/test_connect_basic.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)





[spark] branch branch-3.3 updated (abc343fcab8 -> 09000408fbb)

2022-11-21 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


from abc343fcab8 [SPARK-41151][SQL][3.3] Keep built-in file `_metadata` column nullable value consistent
 add 09000408fbb [SPARK-41118][SQL][3.3] `to_number`/`try_to_number` should return `null` when format is `null`

No new revisions were added by this update.

Summary of changes:
 .../expressions/numberFormatExpressions.scala  | 122 +++--
 .../expressions/StringExpressionsSuite.scala   |  11 ++
 2 files changed, 73 insertions(+), 60 deletions(-)





[spark] branch master updated (d453598a428 -> 40b7d29e14c)

2022-11-21 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from d453598a428 [SPARK-40809][CONNECT][FOLLOW-UP] Do not use Buffer to make Scala 2.13 test pass
 add 40b7d29e14c [SPARK-41217][SQL] Add the error class `FAILED_FUNCTION_CALL`

No new revisions were added by this update.

Summary of changes:
 core/src/main/resources/error/error-classes.json   |  5 ++
 .../sql/catalyst/analysis/FunctionRegistry.scala   |  9 +--
 .../spark/sql/errors/QueryCompilationErrors.scala  | 13 ++-
 .../results/ansi/string-functions.sql.out  | 16 +++-
 .../sql-tests/results/csv-functions.sql.out| 93 +-
 .../resources/sql-tests/results/extract.sql.out| 75 +++--
 .../sql-tests/results/json-functions.sql.out   | 93 +-
 .../sql-tests/results/postgreSQL/int8.sql.out  |  2 +-
 .../sql-tests/results/string-functions.sql.out | 16 +++-
 .../results/table-valued-functions.sql.out |  2 +-
 10 files changed, 269 insertions(+), 55 deletions(-)





[spark] branch master updated (40b7d29e14c -> a80899f8bef)

2022-11-21 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 40b7d29e14c [SPARK-41217][SQL] Add the error class `FAILED_FUNCTION_CALL`
 add a80899f8bef [SPARK-41206][SQL] Rename the error class `_LEGACY_ERROR_TEMP_1233` to `COLUMN_ALREADY_EXISTS`

No new revisions were added by this update.

Summary of changes:
 .../connect/planner/SparkConnectProtoSuite.scala   | 12 ++--
 core/src/main/resources/error/error-classes.json   | 10 +--
 .../spark/sql/catalyst/analysis/Analyzer.scala |  3 +-
 .../sql/catalyst/analysis/CheckAnalysis.scala  |  1 -
 .../spark/sql/catalyst/analysis/ResolveUnion.scala |  2 -
 .../spark/sql/errors/QueryCompilationErrors.scala  |  8 +--
 .../apache/spark/sql/util/PartitioningUtils.scala  |  3 +-
 .../org/apache/spark/sql/util/SchemaUtils.scala| 40 ---
 .../apache/spark/sql/util/SchemaUtilsSuite.scala   | 52 +++---
 .../main/scala/org/apache/spark/sql/Dataset.scala  |  2 -
 .../spark/sql/execution/command/CommandCheck.scala |  2 +-
 .../spark/sql/execution/command/tables.scala   |  1 -
 .../apache/spark/sql/execution/command/views.scala |  3 +-
 .../sql/execution/datasources/DataSource.scala | 10 +--
 .../InsertIntoHadoopFsRelationCommand.scala|  1 -
 .../execution/datasources/PartitioningUtils.scala  |  3 +-
 .../sql/execution/datasources/jdbc/JdbcUtils.scala |  8 +--
 .../spark/sql/execution/datasources/rules.scala|  9 +--
 .../sql/execution/datasources/v2/FileTable.scala   |  6 +-
 .../sql/execution/datasources/v2/FileWrite.scala   |  3 +-
 .../spark/sql/DataFrameSetOperationsSuite.scala| 21 +++---
 .../org/apache/spark/sql/DataFrameSuite.scala  | 67 +-
 .../apache/spark/sql/NestedDataSourceSuite.scala   |  5 +-
 .../org/apache/spark/sql/SQLInsertTestSuite.scala  |  7 +-
 .../spark/sql/StatisticsCollectionSuite.scala  | 10 +--
 .../spark/sql/connector/AlterTableTests.scala  | 34 +
 .../spark/sql/connector/DataSourceV2SQLSuite.scala | 80 --
 .../connector/V2CommandsCaseSensitivitySuite.scala | 21 --
 .../spark/sql/execution/command/DDLSuite.scala | 80 +-
 .../datasources/jdbc/JdbcUtilsSuite.scala  |  6 +-
 .../sql/execution/datasources/json/JsonSuite.scala | 10 +--
 .../org/apache/spark/sql/jdbc/JDBCWriteSuite.scala | 10 +--
 .../spark/sql/sources/PartitionedWriteSuite.scala  |  9 ++-
 .../spark/sql/streaming/FileStreamSinkSuite.scala  | 10 +--
 .../sql/test/DataFrameReaderWriterSuite.scala  | 65 ++
 .../hive/execution/InsertIntoHiveDirCommand.scala  |  1 -
 .../org/apache/spark/sql/hive/InsertSuite.scala|  9 ++-
 .../spark/sql/hive/execution/HiveDDLSuite.scala| 16 +++--
 38 files changed, 326 insertions(+), 314 deletions(-)

