[spark] branch master updated (40a9a6ef5b8 -> 2bc1d4d684a)

2022-11-09 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 40a9a6ef5b8 [SPARK-40877][DOC][FOLLOW-UP] Update the doc of `DataFrame.stat.crosstab `
 add 2bc1d4d684a [SPARK-40852][CONNECT][PYTHON][FOLLOWUP] Make `Summary` a separate proto plan

No new revisions were added by this update.

Summary of changes:
 .../main/protobuf/spark/connect/relations.proto    |  36 +++---
 .../org/apache/spark/sql/connect/dsl/package.scala |  10 +-
 .../sql/connect/planner/SparkConnectPlanner.scala  |  20 +--
 python/pyspark/sql/connect/dataframe.py            |   2 +-
 python/pyspark/sql/connect/plan.py                 |  26 ++--
 python/pyspark/sql/connect/proto/relations_pb2.py  | 134 ++---
 python/pyspark/sql/connect/proto/relations_pb2.pyi |  87 ++---
 .../sql/tests/connect/test_connect_plan_only.py    |   4 +-
 8 files changed, 146 insertions(+), 173 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-40877][DOC][FOLLOW-UP] Update the doc of `DataFrame.stat.crosstab `

2022-11-09 Thread ruifengz

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 40a9a6ef5b8 [SPARK-40877][DOC][FOLLOW-UP] Update the doc of `DataFrame.stat.crosstab `
40a9a6ef5b8 is described below

commit 40a9a6ef5b89f0c3d19db4a43b8a73decaa173c3
Author: Ruifeng Zheng 
AuthorDate: Thu Nov 10 15:42:19 2022 +0800

[SPARK-40877][DOC][FOLLOW-UP] Update the doc of `DataFrame.stat.crosstab `

### What changes were proposed in this pull request?
Remove the outdated comments about limits on the number of distinct values and on the number of returned pair frequencies.

### Why are the changes needed?
The limitations no longer apply after the [reimplementation](https://github.com/apache/spark/pull/38340).
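
For readers unfamiliar with `crosstab`, the behavior described by the docstring in this commit can be sketched in plain Python. This is only an illustration, not Spark's implementation: the helper name `crosstab_sketch`, the list-of-dicts input, and the first-seen ordering of values are all assumptions made for the example.

```python
from collections import Counter


def crosstab_sketch(rows, col1, col2):
    """Build a pair-wise frequency (contingency) table from a list of dicts.

    Hypothetical helper for illustration; it is not part of PySpark.
    """
    # Count how often each (col1 value, col2 value) pair occurs.
    counts = Counter((row[col1], row[col2]) for row in rows)
    # Distinct values in first-seen order (an arbitrary choice for this sketch).
    row_keys = list(dict.fromkeys(row[col1] for row in rows))
    col_keys = list(dict.fromkeys(row[col2] for row in rows))
    # The first column is named after both inputs; the rest are col2's values.
    header = [f"{col1}_{col2}"] + [str(c) for c in col_keys]
    # Counter returns 0 for pairs that never occur.
    table = [[str(r)] + [counts[(r, c)] for c in col_keys] for r in row_keys]
    return header, table


rows = [
    {"a": 1, "b": "x"},
    {"a": 1, "b": "y"},
    {"a": 2, "b": "x"},
    {"a": 1, "b": "x"},
]
header, table = crosstab_sketch(rows, "a", "b")
print(header)  # ['a_b', 'x', 'y']
print(table)   # [['1', 2, 1], ['2', 1, 0]]
```

The pair `(2, "y")` never occurs in the input, so its cell is zero, matching the statement that pairs with no occurrences have zero as their counts.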

### Does this PR introduce _any_ user-facing change?
yes

### How was this patch tested?
Doc-only change.

Closes #38579 from zhengruifeng/doc_crosstab.

Lead-authored-by: Ruifeng Zheng 
Co-authored-by: Ruifeng Zheng 
Signed-off-by: Ruifeng Zheng 
---
 python/pyspark/sql/dataframe.py                                        | 3 +--
 .../src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala   | 2 --
 2 files changed, 1 insertion(+), 4 deletions(-)

diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 3c787f8900f..6d5014918bf 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -4217,8 +4217,7 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 def crosstab(self, col1: str, col2: str) -> "DataFrame":
 """
 Computes a pair-wise frequency table of the given columns. Also known as a contingency
-table. The number of distinct values for each column should be less than 1e4. At most 1e6
-non-zero pair frequencies will be returned.
+table.
 The first column of each row will be the distinct values of `col1` and the column names
 will be the distinct values of `col2`. The name of the first column will be `$col1_$col2`.
 Pairs that have no occurrences will have zero as their counts.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala
index efd430633d7..7511c21fa76 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala
@@ -181,8 +181,6 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) {
 
   /**
    * Computes a pair-wise frequency table of the given columns. Also known as a contingency table.
-   * The number of distinct values for each column should be less than 1e4. At most 1e6 non-zero
-   * pair frequencies will be returned.
    * The first column of each row will be the distinct values of `col1` and the column names will
    * be the distinct values of `col2`. The name of the first column will be `col1_col2`. Counts
    * will be returned as `Long`s. Pairs that have no occurrences will have zero as their counts.





[spark] branch master updated (110b5162df6 -> 6308f546c10)

2022-11-09 Thread wenchen

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 110b5162df6 [SPARK-41034][CONNECT][PYTHON] Connect DataFrame should require a RemoteSparkSession
 add 6308f546c10 [SPARK-40815][SQL][FOLLOW-UP] Fix record reader in DelegateSymlinkTextInputFormat to avoid Hive ExecMapper.getDone() check

No new revisions were added by this update.

Summary of changes:
 .../hadoop/hive/ql/io/DelegateSymlinkTextInputFormat.java | 15 +--
 .../sql/hive/execution/HiveSerDeReadWriteSuite.scala  | 13 ++---
 2 files changed, 19 insertions(+), 9 deletions(-)





[spark] branch master updated (381b67ba004 -> 110b5162df6)

2022-11-09 Thread gurwls223

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 381b67ba004 [SPARK-41076][BUILD][CONNECT] Upgrade `protobuf` to 3.21.9
 add 110b5162df6 [SPARK-41034][CONNECT][PYTHON] Connect DataFrame should require a RemoteSparkSession

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/connect/column.py             | 12 +++---
 python/pyspark/sql/connect/dataframe.py          | 16
 python/pyspark/sql/connect/function_builder.py   |  2 +-
 python/pyspark/sql/connect/plan.py               | 44 ++
 .../connect/test_connect_column_expressions.py   |  7 ++--
 .../sql/tests/connect/test_connect_select_ops.py | 13 ---
 python/pyspark/testing/connectutils.py           |  7
 7 files changed, 53 insertions(+), 48 deletions(-)





[spark] branch master updated (865a3ded2ea -> 381b67ba004)

2022-11-09 Thread dongjoon

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 865a3ded2ea [SPARK-38959][SQL][FOLLOWUP] Optimizer batch `PartitionPruning` should optimize subqueries
 add 381b67ba004 [SPARK-41076][BUILD][CONNECT] Upgrade `protobuf` to 3.21.9

No new revisions were added by this update.

Summary of changes:
 connector/protobuf/pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)





[spark] branch master updated: [SPARK-38959][SQL][FOLLOWUP] Optimizer batch `PartitionPruning` should optimize subqueries

2022-11-09 Thread wenchen

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 865a3ded2ea [SPARK-38959][SQL][FOLLOWUP] Optimizer batch `PartitionPruning` should optimize subqueries
865a3ded2ea is described below

commit 865a3ded2ea1ca86be93df58205882bc509b98cd
Author: Wenchen Fan 
AuthorDate: Thu Nov 10 13:45:49 2022 +0800

[SPARK-38959][SQL][FOLLOWUP] Optimizer batch `PartitionPruning` should optimize subqueries

### What changes were proposed in this pull request?

This is a followup to https://github.com/apache/spark/pull/36304 to simplify `RowLevelOperationRuntimeGroupFiltering`. It does 3 things:
1. Run `OptimizeSubqueries` in the batch `PartitionPruning`, so that `RowLevelOperationRuntimeGroupFiltering` does not need to invoke it manually.
2. Skip DPP subqueries in `OptimizeSubqueries`, to avoid the issue fixed by https://github.com/apache/spark/pull/33664.
3. Make `RowLevelOperationRuntimeGroupFiltering` create `InSubquery` instead of `DynamicPruningSubquery`, so that it can be optimized by `OptimizeSubqueries` later. This also avoids the unnecessary planning overhead of `DynamicPruningSubquery`, as there is no join and it can only be run as a subquery.

### Why are the changes needed?

code simplification

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing tests

Closes #38557 from cloud-fan/help.

Lead-authored-by: Wenchen Fan 
Co-authored-by: Wenchen Fan 
Signed-off-by: Wenchen Fan 
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala    |  3 +++
 .../apache/spark/sql/execution/SparkOptimizer.scala |  3 ++-
 .../RowLevelOperationRuntimeGroupFiltering.scala    | 18 +++---
 3 files changed, 12 insertions(+), 12 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index afbf7302727..2bef03d633a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -320,6 +320,9 @@ abstract class Optimizer(catalogManager: CatalogManager)
 }
 def apply(plan: LogicalPlan): LogicalPlan = plan.transformAllExpressionsWithPruning(
   _.containsPattern(PLAN_EXPRESSION), ruleId) {
+  // Do not optimize DPP subquery, as it was created from optimized plan and we should not
+  // optimize it again, to save optimization time and avoid breaking broadcast/subquery reuse.
+  case d: DynamicPruningSubquery => d
   case s: SubqueryExpression =>
 val Subquery(newPlan, _) = Optimizer.this.execute(Subquery.fromExpression(s))
 // At this point we have an optimized subquery plan that we are going to attach
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
index 017d1f937c3..9624bf1fa9f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
@@ -51,7 +51,8 @@ class SparkOptimizer(
 Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog)) :+
 Batch("PartitionPruning", Once,
   PartitionPruning,
-  RowLevelOperationRuntimeGroupFiltering(OptimizeSubqueries)) :+
+  RowLevelOperationRuntimeGroupFiltering,
+  OptimizeSubqueries) :+
 Batch("InjectRuntimeFilter", FixedPoint(1),
   InjectRuntimeFilter) :+
 Batch("MergeScalarSubqueries", Once,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala
index d9dad43532e..bb5edc94fa5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala
@@ -17,10 +17,10 @@
 
 package org.apache.spark.sql.execution.dynamicpruning
 
-import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruningSubquery, Expression, PredicateHelper, V2ExpressionUtils}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, DynamicPruningExpression, Expression, InSubquery, ListQuery, PredicateHelper, V2ExpressionUtils}
 import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
 import org.apache.spark.sql.catalyst.planning.GroupBasedRowLevelOperation
-import 

[spark] branch master updated: [SPARK-41078][CONNECT] Implement DataFrame `withColumnsRenamed` in Connect proto

2022-11-09 Thread wenchen

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new db84869d184 [SPARK-41078][CONNECT] Implement DataFrame `withColumnsRenamed` in Connect proto
db84869d184 is described below

commit db84869d184680a06e8b030fd750f4aeca6ca29a
Author: Rui Wang 
AuthorDate: Thu Nov 10 13:05:02 2022 +0800

[SPARK-41078][CONNECT] Implement DataFrame `withColumnsRenamed` in Connect proto

### What changes were proposed in this pull request?

The `RenameColumns` proto was added by https://github.com/apache/spark/pull/38475, but DataFrame `withColumnsRenamed` should use a different proto. This PR:
1. Adds a proto for `withColumnsRenamed` (`RenameColumnsByNameToNameMap`).
2. Renames `RenameColumns` to `RenameColumnsBySameLengthNames`.
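
The comment on `rename_columns_map` in the proto diff of this commit spells out the intended renaming rules: keys that are absent from the schema are ignored, not every column needs a key, and duplicated target names are rejected. A minimal Python sketch of those rules follows. The function name `with_columns_renamed_sketch` is hypothetical, and checking duplicates on the map values (rather than on the resulting schema) is an assumption about how the rule is meant to be read.

```python
def with_columns_renamed_sketch(columns, rename_map):
    """Rename column A to B for every (A, B) entry of rename_map.

    Hypothetical helper for illustration; not the Spark Connect planner logic.
    """
    targets = list(rename_map.values())
    # Assumption: "duplicated B are not allowed" refers to the map values.
    if len(set(targets)) != len(targets):
        raise ValueError("duplicated target column names are not allowed")
    # Columns without an entry keep their name; unknown keys are simply unused.
    return [rename_map.get(name, name) for name in columns]


print(with_columns_renamed_sketch(["id", "name"], {"id": "key"}))      # ['key', 'name']
print(with_columns_renamed_sketch(["id", "name"], {"missing": "x"}))   # ['id', 'name']
```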

### Why are the changes needed?

Improve API coverage.

### Does this PR introduce _any_ user-facing change?

NO

### How was this patch tested?

UT

Closes #38587 from amaliujia/withColumnsRenamed.

Authored-by: Rui Wang 
Signed-off-by: Wenchen Fan 
---
 .../main/protobuf/spark/connect/relations.proto    |  23 +++-
 .../org/apache/spark/sql/connect/dsl/package.scala |  15 ++-
 .../sql/connect/planner/SparkConnectPlanner.scala  |  17 ++-
 .../connect/planner/SparkConnectProtoSuite.scala   |  22 +++-
 python/pyspark/sql/connect/proto/relations_pb2.py  | 134 +++--
 python/pyspark/sql/connect/proto/relations_pb2.pyi |  87 +++--
 6 files changed, 214 insertions(+), 84 deletions(-)

diff --git a/connector/connect/src/main/protobuf/spark/connect/relations.proto b/connector/connect/src/main/protobuf/spark/connect/relations.proto
index cce9f3b939e..8f6ebf1984a 100644
--- a/connector/connect/src/main/protobuf/spark/connect/relations.proto
+++ b/connector/connect/src/main/protobuf/spark/connect/relations.proto
@@ -47,7 +47,8 @@ message Relation {
 Range range = 15;
 SubqueryAlias subquery_alias = 16;
 Repartition repartition = 17;
-RenameColumns rename_columns = 18;
+RenameColumnsBySameLengthNames rename_columns_by_same_length_names = 18;
+RenameColumnsByNameToNameMap rename_columns_by_name_to_name_map = 19;
 
 StatFunction stat_function = 100;
 
@@ -275,8 +276,8 @@ message StatFunction {
   }
 }
 
-// Rename columns on the input relation.
-message RenameColumns {
+// Rename columns on the input relation by the same length of names.
+message RenameColumnsBySameLengthNames {
   // Required. The input relation.
   Relation input = 1;
 
@@ -286,3 +287,19 @@ message RenameColumns {
   // of this field. If this is not true, an exception will be returned.
   repeated string column_names = 2;
 }
+
+
+// Rename columns on the input relation by a map with name to name mapping.
+message RenameColumnsByNameToNameMap {
+  // Required. The input relation.
+  Relation input = 1;
+
+
+  // Required.
+  //
+  // Renaming column names of input relation from A to B where A is the map key
+  // and B is the map value. This is a no-op if schema doesn't contain any A. It
+  // does not require that all input relation column names to present as keys.
+  // duplicated B are not allowed.
+  map<string, string> rename_columns_map = 2;
+}
diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
index ec14333fdc3..7405d43e86b 100644
--- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
+++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
@@ -482,13 +482,24 @@ package object dsl {
   def toDF(columnNames: String*): Relation =
 Relation
   .newBuilder()
-  .setRenameColumns(
-RenameColumns
+  .setRenameColumnsBySameLengthNames(
+RenameColumnsBySameLengthNames
   .newBuilder()
   .setInput(logicalPlan)
   .addAllColumnNames(columnNames.asJava))
   .build()
 
+  def withColumnsRenamed(renameColumnsMap: Map[String, String]): Relation = {
+Relation
+  .newBuilder()
+  .setRenameColumnsByNameToNameMap(
+RenameColumnsByNameToNameMap
+  .newBuilder()
+  .setInput(logicalPlan)
+  .putAllRenameColumnsMap(renameColumnsMap.asJava))
+  .build()
+  }
+
   private def createSetOperation(
   left: Relation,
   right: Relation,
diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index cb0af99e18b..ac25e978582 100644
--- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ 

[spark] branch master updated: [SPARK-41046][CONNECT] Support CreateView in Connect DSL

2022-11-09 Thread wenchen

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new f5101319c1a [SPARK-41046][CONNECT] Support CreateView in Connect DSL
f5101319c1a is described below

commit f5101319c1a83a754d899e10a367356af069ca66
Author: Rui Wang 
AuthorDate: Thu Nov 10 13:02:45 2022 +0800

[SPARK-41046][CONNECT] Support CreateView in Connect DSL

### What changes were proposed in this pull request?

This PR supports creating a global temp view or a local temp view in the Connect DSL.

In the proto, this is modeled as a command that is executed immediately on the server side.
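
The `is_global` and `replace` fields of `CreateDataFrameViewCommand` in the proto diff of this commit determine which kind of view is created and what happens when the name is already taken. The following Python sketch mimics that decision logic with two dictionaries. The class `ViewRegistrySketch` and its method are hypothetical stand-ins; the real handling lives in `SparkConnectCommandPlanner` and is not reproduced here.

```python
class ViewRegistrySketch:
    """Toy registry that mirrors the is_global / replace flags of the command."""

    def __init__(self):
        self.local_views = {}
        self.global_views = {}

    def create_view(self, name, relation, is_global, replace):
        # Pick the namespace: global temp views and local temp views are separate.
        views = self.global_views if is_global else self.local_views
        # replace=False and an existing view with this name is an error.
        if name in views and not replace:
            raise ValueError(f"view {name!r} already exists")
        # Otherwise create the view, or update it when replace=True.
        views[name] = relation


registry = ViewRegistrySketch()
registry.create_view("v", "plan-1", is_global=False, replace=False)
registry.create_view("v", "plan-2", is_global=False, replace=True)
print(registry.local_views)  # {'v': 'plan-2'}
```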

### Why are the changes needed?

Improve API coverage.

### Does this PR introduce _any_ user-facing change?

NO

### How was this patch tested?

UT

Closes #38566 from amaliujia/create_view_api.

Authored-by: Rui Wang 
Signed-off-by: Wenchen Fan 
---
 .../src/main/protobuf/spark/connect/commands.proto | 19 
 .../command/SparkConnectCommandPlanner.scala   | 32 +
 .../org/apache/spark/sql/connect/dsl/package.scala | 13 ++
 .../planner/SparkConnectCommandPlannerSuite.scala  | 19 
 python/pyspark/sql/connect/proto/commands_pb2.py   | 30 ++--
 python/pyspark/sql/connect/proto/commands_pb2.pyi  | 54 +-
 6 files changed, 152 insertions(+), 15 deletions(-)

diff --git a/connector/connect/src/main/protobuf/spark/connect/commands.proto b/connector/connect/src/main/protobuf/spark/connect/commands.proto
index 79c6cffdf60..086d4d0cc92 100644
--- a/connector/connect/src/main/protobuf/spark/connect/commands.proto
+++ b/connector/connect/src/main/protobuf/spark/connect/commands.proto
@@ -31,6 +31,7 @@ message Command {
   oneof command_type {
 CreateScalarFunction create_function = 1;
 WriteOperation write_operation = 2;
+CreateDataFrameViewCommand create_dataframe_view = 3;
   }
 }
 
@@ -65,6 +66,24 @@ message CreateScalarFunction {
   }
 }
 
+// A command that can create DataFrame global temp view or local temp view.
+message CreateDataFrameViewCommand {
+  // Required. The relation that this view will be built on.
+  Relation input = 1;
+
+  // Required. View name.
+  string name = 2;
+
+  // Required. Whether this is global temp view or local temp view.
+  bool is_global = 3;
+
+  // Required.
+  //
+  // If true, and if the view already exists, updates it; if false, and if the view
+  // already exists, throws exception.
+  bool replace = 4;
+}
+
 // As writes are not directly handled during analysis and planning, they are modeled as commands.
 message WriteOperation {
   // The output of the `input` relation will be persisted according to the options.
diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/command/SparkConnectCommandPlanner.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/command/SparkConnectCommandPlanner.scala
index 80c36a4773e..11090976c7f 100644
--- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/command/SparkConnectCommandPlanner.scala
+++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/command/SparkConnectCommandPlanner.scala
@@ -25,7 +25,11 @@ import org.apache.spark.api.python.{PythonEvalType, SimplePythonFunction}
 import org.apache.spark.connect.proto
 import org.apache.spark.connect.proto.WriteOperation
 import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView}
+import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.connect.planner.{DataTypeProtoConverter, SparkConnectPlanner}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.execution.command.CreateViewCommand
 import org.apache.spark.sql.execution.python.UserDefinedPythonFunction
 import org.apache.spark.sql.types.StringType
 
@@ -45,6 +49,8 @@ class SparkConnectCommandPlanner(session: SparkSession, command: proto.Command)
 handleCreateScalarFunction(command.getCreateFunction)
   case proto.Command.CommandTypeCase.WRITE_OPERATION =>
 handleWriteOperation(command.getWriteOperation)
+  case proto.Command.CommandTypeCase.CREATE_DATAFRAME_VIEW =>
+handleCreateViewCommand(command.getCreateDataframeView)
   case _ => throw new UnsupportedOperationException(s"$command not supported.")
 }
   }
@@ -79,6 +85,32 @@ class SparkConnectCommandPlanner(session: SparkSession, command: proto.Command)
 session.udf.registerPython(cf.getPartsList.asScala.head, udf)
   }
 
+  def handleCreateViewCommand(createView: proto.CreateDataFrameViewCommand): Unit = {
+val viewType = if (createView.getIsGlobal) GlobalTempView else LocalTempView
+
+val tableIdentifier =
+  try {
+   

[spark] branch master updated: [SPARK-41010][CONNECT][PYTHON] Complete Support for Except and Intersect in Python client

2022-11-09 Thread ruifengz

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 9dc39e199de [SPARK-41010][CONNECT][PYTHON] Complete Support for Except and Intersect in Python client
9dc39e199de is described below

commit 9dc39e199de645f60e115267fba2fae782ab53f1
Author: Rui Wang 
AuthorDate: Thu Nov 10 12:11:40 2022 +0800

[SPARK-41010][CONNECT][PYTHON] Complete Support for Except and Intersect in Python client

### What changes were proposed in this pull request?

1. Add support for intersect and except.
2. Unify union, intersect and except into `SetOperation`.
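
The unified plan node is constructed in the diff below with an operation name and an `is_all` flag, for example `plan.SetOperation(self._plan, other._plan, "except", is_all=True)`. As a rough illustration of what these two parameters mean at the SQL level, the following Python sketch applies the same options to plain lists. The function `set_operation_sketch` is hypothetical and operates on in-memory rows; it says nothing about how the Connect client or server actually evaluates the plan.

```python
from collections import Counter


def set_operation_sketch(left, right, op, is_all):
    """Combine two lists of rows with SQL-like UNION / INTERSECT / EXCEPT semantics."""
    if op not in ("union", "intersect", "except"):
        raise ValueError(f"unsupported set operation: {op}")
    if is_all:
        # ALL variants preserve duplicates, so count occurrences of each row.
        lhs, rhs = Counter(left), Counter(right)
        combined = {"union": lhs + rhs, "intersect": lhs & rhs, "except": lhs - rhs}[op]
        return sorted(combined.elements())
    # Distinct variants remove duplicates, so plain sets are sufficient.
    lhs, rhs = set(left), set(right)
    combined = {"union": lhs | rhs, "intersect": lhs & rhs, "except": lhs - rhs}[op]
    return sorted(combined)


left = [1, 1, 2, 3]
right = [1, 3, 3]
print(set_operation_sketch(left, right, "except", is_all=True))      # [1, 2]
print(set_operation_sketch(left, right, "except", is_all=False))     # [2]
print(set_operation_sketch(left, right, "intersect", is_all=True))   # [1, 3]
print(set_operation_sketch(left, right, "union", is_all=True))       # [1, 1, 1, 2, 3, 3, 3]
```

Note how `EXCEPT ALL` keeps one copy of `1` (two on the left minus one on the right), while plain `EXCEPT` drops `1` entirely because it appears on the right at least once.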

### Why are the changes needed?

Improve API coverage.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

UT

Closes #38506 from amaliujia/except_python.

Authored-by: Rui Wang 
Signed-off-by: Ruifeng Zheng 
---
 python/pyspark/sql/connect/dataframe.py            | 82 +-
 python/pyspark/sql/connect/plan.py                 | 38 +++---
 .../sql/tests/connect/test_connect_plan_only.py    | 22 ++
 3 files changed, 132 insertions(+), 10 deletions(-)

diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py
index c6877707ad2..ccd826cd476 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -389,7 +389,9 @@ class DataFrame(object):
 def unionAll(self, other: "DataFrame") -> "DataFrame":
 if other._plan is None:
 raise ValueError("Argument to Union does not contain a valid plan.")
-return DataFrame.withPlan(plan.UnionAll(self._plan, other._plan), session=self._session)
+return DataFrame.withPlan(
+plan.SetOperation(self._plan, other._plan, "union", is_all=True), session=self._session
+)
 
 def unionByName(self, other: "DataFrame", allowMissingColumns: bool = False) -> "DataFrame":
 """Returns a new :class:`DataFrame` containing union of rows in this and another
@@ -415,7 +417,83 @@ class DataFrame(object):
 if other._plan is None:
 raise ValueError("Argument to UnionByName does not contain a valid plan.")
 return DataFrame.withPlan(
-plan.UnionAll(self._plan, other._plan, allowMissingColumns), session=self._session
+plan.SetOperation(
+self._plan, other._plan, "union", is_all=True, by_name=allowMissingColumns
+),
+session=self._session,
+)
+
+def exceptAll(self, other: "DataFrame") -> "DataFrame":
+"""Return a new :class:`DataFrame` containing rows in this :class:`DataFrame` but
+not in another :class:`DataFrame` while preserving duplicates.
+
+This is equivalent to `EXCEPT ALL` in SQL.
+As standard in SQL, this function resolves columns by position (not by name).
+
+.. versionadded:: 3.4.0
+
+Parameters
+----------
+other : :class:`DataFrame`
+The other :class:`DataFrame` to compare to.
+
+Returns
+-------
+:class:`DataFrame`
+"""
+return DataFrame.withPlan(
+plan.SetOperation(self._plan, other._plan, "except", is_all=True), session=self._session
+)
+
+def intersect(self, other: "DataFrame") -> "DataFrame":
+"""Return a new :class:`DataFrame` containing rows only in
+both this :class:`DataFrame` and another :class:`DataFrame`.
+Note that any duplicates are removed. To preserve duplicates
+use :func:`intersectAll`.
+
+.. versionadded:: 3.4.0
+
+Parameters
+----------
+other : :class:`DataFrame`
+Another :class:`DataFrame` that needs to be combined.
+
+Returns
+-------
+:class:`DataFrame`
+Combined DataFrame.
+
+Notes
+-----
+This is equivalent to `INTERSECT` in SQL.
+"""
+return DataFrame.withPlan(
+plan.SetOperation(self._plan, other._plan, "intersect", is_all=False),
+session=self._session,
+)
+
+def intersectAll(self, other: "DataFrame") -> "DataFrame":
+"""Return a new :class:`DataFrame` containing rows in both this :class:`DataFrame`
+and another :class:`DataFrame` while preserving duplicates.
+
+This is equivalent to `INTERSECT ALL` in SQL. As standard in SQL, this function
+resolves columns by position (not by name).
+
+.. versionadded:: 3.4.0
+
+Parameters
+----------
+other : :class:`DataFrame`
+Another :class:`DataFrame` that needs to be combined.
+
+Returns
+-------
+:class:`DataFrame`
+Combined DataFrame.
+  

[spark] branch master updated: [SPARK-41038][SQL] Rename `MULTI_VALUE_SUBQUERY_ERROR` to `SCALAR_SUBQUERY_TOO_MANY_ROWS`

2022-11-09 Thread maxgekk

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 0205478b9d3 [SPARK-41038][SQL] Rename `MULTI_VALUE_SUBQUERY_ERROR` to `SCALAR_SUBQUERY_TOO_MANY_ROWS`
0205478b9d3 is described below

commit 0205478b9d35d62450fd7c9ade520087fd2979a7
Author: itholic 
AuthorDate: Wed Nov 9 19:14:32 2022 +0300

[SPARK-41038][SQL] Rename `MULTI_VALUE_SUBQUERY_ERROR` to `SCALAR_SUBQUERY_TOO_MANY_ROWS`

### What changes were proposed in this pull request?

This PR renames the error class `MULTI_VALUE_SUBQUERY_ERROR` to `SCALAR_SUBQUERY_TOO_MANY_ROWS`.

### Why are the changes needed?

The current error class name `MULTI_VALUE_SUBQUERY_ERROR` does not describe the error condition clearly.

`SCALAR_SUBQUERY_TOO_MANY_ROWS` is more readable, since "scalar subquery" is the industry-standard term.
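
To make the renamed condition concrete: a scalar subquery is used where a single value is expected, so it may yield at most one row. The Python sketch below shows that rule in isolation. The names `ScalarSubqueryTooManyRows` and `eval_scalar_subquery_sketch` are hypothetical, and returning `None` (SQL `NULL`) for an empty result follows standard SQL behavior rather than anything stated in this commit.

```python
class ScalarSubqueryTooManyRows(Exception):
    """Hypothetical stand-in for the SCALAR_SUBQUERY_TOO_MANY_ROWS error class."""


def eval_scalar_subquery_sketch(rows):
    """Return the single value of a scalar subquery result, given as a list of values."""
    if len(rows) > 1:
        # Same message text as the error class in error-classes.json.
        raise ScalarSubqueryTooManyRows(
            "More than one row returned by a subquery used as an expression."
        )
    # An empty result evaluates to NULL, represented here as None.
    return rows[0] if rows else None


print(eval_scalar_subquery_sketch([42]))  # 42
print(eval_scalar_subquery_sketch([]))    # None
```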

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?
```
./build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite*"
```

Closes #38551 from itholic/SPARK-41038.

Authored-by: itholic 
Signed-off-by: Max Gekk 
---
 core/src/main/resources/error/error-classes.json              | 10 +-
 .../org/apache/spark/sql/errors/QueryExecutionErrors.scala    |  2 +-
 .../subquery/scalar-subquery/scalar-subquery-select.sql.out   |  2 +-
 .../org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala |  4 ++--
 .../apache/spark/sql/errors/QueryExecutionErrorsSuite.scala   |  4 ++--
 5 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index 9c914b86bb1..7c33c1059ae 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -690,11 +690,6 @@
   "Not allowed to implement multiple UDF interfaces, UDF class "
 ]
   },
-  "MULTI_VALUE_SUBQUERY_ERROR" : {
-"message" : [
-  "More than one row returned by a subquery used as an expression."
-]
-  },
   "NON_LAST_MATCHED_CLAUSE_OMIT_CONDITION" : {
 "message" : [
   "When there are more than one MATCHED clauses in a MERGE statement, only the last MATCHED clause can omit the condition."
@@ -878,6 +873,11 @@
 ],
 "sqlState" : "42000"
   },
+  "SCALAR_SUBQUERY_TOO_MANY_ROWS" : {
+"message" : [
+  "More than one row returned by a subquery used as an expression."
+]
+  },
   "SCHEMA_ALREADY_EXISTS" : {
 "message" : [
   "Cannot create schema  because it already exists.",
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 73664e64c22..828f52fe71d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -2766,7 +2766,7 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
 
   def multipleRowSubqueryError(context: SQLQueryContext): Throwable = {
 new SparkException(
-  errorClass = "MULTI_VALUE_SUBQUERY_ERROR",
+  errorClass = "SCALAR_SUBQUERY_TOO_MANY_ROWS",
   messageParameters = Map.empty,
   cause = null,
   context = getQueryContext(context),
diff --git a/sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-select.sql.out b/sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-select.sql.out
index 38ab365ef69..0012251d7eb 100644
--- a/sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-select.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-select.sql.out
@@ -424,7 +424,7 @@ struct<>
 -- !query output
 org.apache.spark.SparkException
 {
-  "errorClass" : "MULTI_VALUE_SUBQUERY_ERROR",
+  "errorClass" : "SCALAR_SUBQUERY_TOO_MANY_ROWS",
   "queryContext" : [ {
 "objectType" : "",
 "objectName" : "",
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index c9c66395a3b..25faa34b697 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -2676,7 +2676,7 @@ class DataSourceV2SQLSuiteV1Filter extends DataSourceV2SQLSuite with AlterTableT
 exception = intercept[SparkException] {
   sql(s"SELECT * FROM t TIMESTAMP AS OF ($subquery4)").collect()
 },
-errorClass = 

[spark] branch master updated: [SPARK-40798][SQL][TESTS][FOLLOW-UP] Improve test coverage

2022-11-09 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new ef545d6ce57 [SPARK-40798][SQL][TESTS][FOLLOW-UP] Improve test coverage
ef545d6ce57 is described below

commit ef545d6ce579db1070d260426ab8cbf6e2853c28
Author: ulysses-you 
AuthorDate: Wed Nov 9 18:07:40 2022 +0300

[SPARK-40798][SQL][TESTS][FOLLOW-UP] Improve test coverage

### What changes were proposed in this pull request?

Add an ANSI-mode test to `org.apache.spark.sql.execution.command.v2.AlterTableAddPartitionSuite`.

### Why are the changes needed?

Improve test coverage with ANSI mode both on and off.
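
The test in the diff below checks two outcomes for `ALTER TABLE ... ADD PARTITION (p='aaa')` on an `int` partition column when type validation is skipped: with ANSI mode on the cast fails with `CAST_INVALID_INPUT`, and with ANSI mode off the partition falls back to the default partition name. A rough Python sketch of that branching follows. The function `cast_partition_value_sketch` is hypothetical, uses Python's `int()` as a stand-in for the SQL cast, and uses `None` to represent the null value that maps to the default partition.

```python
def cast_partition_value_sketch(value, ansi_enabled):
    """Cast a string partition value to int, mimicking ANSI on/off behavior."""
    try:
        return int(value)
    except ValueError:
        if ansi_enabled:
            # ANSI mode: an invalid cast is an error rather than a silent null.
            raise ValueError(
                f"CAST_INVALID_INPUT: {value!r} cannot be cast to INT"
            ) from None
        # Non-ANSI mode: the invalid cast yields null (default partition).
        return None


print(cast_partition_value_sketch("7", ansi_enabled=True))     # 7
print(cast_partition_value_sketch("aaa", ansi_enabled=False))  # None
```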

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

Pass CI

Closes #38580 from ulysses-you/test.

Authored-by: ulysses-you 
Signed-off-by: Max Gekk 
---
 .../command/v2/AlterTableAddPartitionSuite.scala   | 30 +-
 1 file changed, 24 insertions(+), 6 deletions(-)

diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala
index c33d9b0101a..09ebd4af4ec 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.command.v2
 
+import org.apache.spark.SparkNumberFormatException
 import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException
 import org.apache.spark.sql.execution.command
@@ -129,12 +130,29 @@ class AlterTableAddPartitionSuite
 withNamespaceAndTable("ns", "tbl") { t =>
   sql(s"CREATE TABLE $t (c int) $defaultUsing PARTITIONED BY (p int)")
 
-  withSQLConf(
-  SQLConf.SKIP_TYPE_VALIDATION_ON_ALTER_PARTITION.key -> "true",
-  SQLConf.ANSI_ENABLED.key -> "false") {
-sql(s"ALTER TABLE $t ADD PARTITION (p='aaa')")
-checkPartitions(t, Map("p" -> defaultPartitionName))
-sql(s"ALTER TABLE $t DROP PARTITION (p=null)")
+  withSQLConf(SQLConf.SKIP_TYPE_VALIDATION_ON_ALTER_PARTITION.key -> 
"true") {
+withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") {
+  checkError(
+exception = intercept[SparkNumberFormatException] {
+  sql(s"ALTER TABLE $t ADD PARTITION (p='aaa')")
+},
+errorClass = "CAST_INVALID_INPUT",
+parameters = Map(
+  "ansiConfig" -> "\"spark.sql.ansi.enabled\"",
+  "expression" -> "'aaa'",
+  "sourceType" -> "\"STRING\"",
+  "targetType" -> "\"INT\""),
+context = ExpectedContext(
+  fragment = s"ALTER TABLE $t ADD PARTITION (p='aaa')",
+  start = 0,
+  stop = 35 + t.length))
+}
+
+withSQLConf(SQLConf.ANSI_ENABLED.key -> "false") {
+  sql(s"ALTER TABLE $t ADD PARTITION (p='aaa')")
+  checkPartitions(t, Map("p" -> defaultPartitionName))
+  sql(s"ALTER TABLE $t DROP PARTITION (p=null)")
+}
   }
 }
   }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.3 updated: [MINOR][DOCS] Fix links in the sql-pyspark-pandas-with-arrow

2022-11-09 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new 74bf9fe8eb5 [MINOR][DOCS] Fix links in the 
sql-pyspark-pandas-with-arrow
74bf9fe8eb5 is described below

commit 74bf9fe8eb5d5512de929e87f510ed6e64d6063e
Author: panbingkun 
AuthorDate: Wed Nov 9 07:07:32 2022 -0600

[MINOR][DOCS] Fix links in the sql-pyspark-pandas-with-arrow

### What changes were proposed in this pull request?
The pr aims to fix links in the sql-pyspark-pandas-with-arrow.

### Why are the changes needed?
On https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html
(screenshot: https://user-images.githubusercontent.com/15246973/200457446-250e8c9b-3712-4e79-b6e9-6bdabf322206.png),
clicking [this 
page](https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html)
jumps to 
https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html, 
as shown in the following screenshot:
https://user-images.githubusercontent.com/15246973/200457489-2561b9df-3107-4e19-960d-881f31851f82.png

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Manually verified.

Closes #38545 from panbingkun/arrow_pandas_doc.

Authored-by: panbingkun 
Signed-off-by: Sean Owen 
(cherry picked from commit 70bc5dfc96810e47f11f0f39054b1ceb61066f77)
Signed-off-by: Sean Owen 
---
 docs/sql-pyspark-pandas-with-arrow.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/sql-pyspark-pandas-with-arrow.md 
b/docs/sql-pyspark-pandas-with-arrow.md
index 6895376dfb3..7697588d1a8 100644
--- a/docs/sql-pyspark-pandas-with-arrow.md
+++ b/docs/sql-pyspark-pandas-with-arrow.md
@@ -19,4 +19,4 @@ license: |
   limitations under the License.
 ---
 
-The Arrow usage guide is now archived on [this 
page](https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html).
+The Arrow usage guide is now archived on [this 
page](https://spark.apache.org/docs/latest/api/python/user_guide/sql/arrow_pandas.html).





[spark] branch master updated: [MINOR][DOCS] Fix links in the sql-pyspark-pandas-with-arrow

2022-11-09 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 70bc5dfc968 [MINOR][DOCS] Fix links in the 
sql-pyspark-pandas-with-arrow
70bc5dfc968 is described below

commit 70bc5dfc96810e47f11f0f39054b1ceb61066f77
Author: panbingkun 
AuthorDate: Wed Nov 9 07:07:32 2022 -0600

[MINOR][DOCS] Fix links in the sql-pyspark-pandas-with-arrow

### What changes were proposed in this pull request?
The pr aims to fix links in the sql-pyspark-pandas-with-arrow.

### Why are the changes needed?
On https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html
(screenshot: https://user-images.githubusercontent.com/15246973/200457446-250e8c9b-3712-4e79-b6e9-6bdabf322206.png),
clicking [this 
page](https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html)
jumps to 
https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html, 
as shown in the following screenshot:
https://user-images.githubusercontent.com/15246973/200457489-2561b9df-3107-4e19-960d-881f31851f82.png

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Manually verified.

Closes #38545 from panbingkun/arrow_pandas_doc.

Authored-by: panbingkun 
Signed-off-by: Sean Owen 
---
 docs/sql-pyspark-pandas-with-arrow.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/sql-pyspark-pandas-with-arrow.md 
b/docs/sql-pyspark-pandas-with-arrow.md
index 6895376dfb3..7697588d1a8 100644
--- a/docs/sql-pyspark-pandas-with-arrow.md
+++ b/docs/sql-pyspark-pandas-with-arrow.md
@@ -19,4 +19,4 @@ license: |
   limitations under the License.
 ---
 
-The Arrow usage guide is now archived on [this 
page](https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html).
+The Arrow usage guide is now archived on [this 
page](https://spark.apache.org/docs/latest/api/python/user_guide/sql/arrow_pandas.html).





[spark] branch master updated (8084318f25b -> e3989ca35d2)

2022-11-09 Thread yumwang
This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 8084318f25b [SPARK-41074][DOC] Add option `--upgrade` in dependency 
installation command
 add e3989ca35d2 [SPARK-41071][BUILD] Remove `MaxMetaspaceSize` option from 
`make-distribution.sh` to make it run successfully

No new revisions were added by this update.

Summary of changes:
 dev/make-distribution.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)





[spark] branch master updated: [SPARK-41074][DOC] Add option `--upgrade` in dependency installation command

2022-11-09 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 8084318f25b [SPARK-41074][DOC] Add option `--upgrade` in dependency 
installation command
8084318f25b is described below

commit 8084318f25bca0f66de404ea8c258279f1012974
Author: Ruifeng Zheng 
AuthorDate: Wed Nov 9 20:06:02 2022 +0900

[SPARK-41074][DOC] Add option `--upgrade` in dependency installation command

### What changes were proposed in this pull request?
Add option `--upgrade` in dependency installation command

### Why are the changes needed?

`pip install -r dev/requirements.txt` does not upgrade packages whose 
versions are not pinned.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
manually check

Closes #38581 from zhengruifeng/infra_pip.

Authored-by: Ruifeng Zheng 
Signed-off-by: Hyukjin Kwon 
---
 docs/README.md  | 2 +-
 python/docs/source/development/contributing.rst | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/docs/README.md b/docs/README.md
index 27238964f0a..4b788dbc79d 100644
--- a/docs/README.md
+++ b/docs/README.md
@@ -61,7 +61,7 @@ See also https://issues.apache.org/jira/browse/SPARK-35375.
 -->
 Run the following command from $SPARK_HOME:
 ```sh
-$ sudo pip install -r dev/requirements.txt
+$ sudo pip install --upgrade -r dev/requirements.txt
 ```
 
 ### R API Documentation (Optional)
diff --git a/python/docs/source/development/contributing.rst 
b/python/docs/source/development/contributing.rst
index 3d388e91012..88f7b3a7b43 100644
--- a/python/docs/source/development/contributing.rst
+++ b/python/docs/source/development/contributing.rst
@@ -130,7 +130,7 @@ If you are using Conda, the development environment can be 
set as follows.
 # Python 3.7+ is required
 conda create --name pyspark-dev-env python=3.9
 conda activate pyspark-dev-env
-pip install -r dev/requirements.txt
+pip install --upgrade -r dev/requirements.txt
 
 Once it is set up, make sure you switch to `pyspark-dev-env` before starting 
the development:
 
@@ -147,7 +147,7 @@ With Python 3.7+, pip can be used as below to install and 
set up the development
 
 .. code-block:: bash
 
-pip install -r dev/requirements.txt
+pip install --upgrade -r dev/requirements.txt
 
 Now, you can start developing and `running the tests `_.
 





[spark] branch master updated: [SPARK-41026][CONNECT][PYTHON][FOLLOW-UP] Add Coalesce and Repartition API to Python client

2022-11-09 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 736be3116c7 [SPARK-41026][CONNECT][PYTHON][FOLLOW-UP] Add Coalesce and 
Repartition API to Python client
736be3116c7 is described below

commit 736be3116c7c13c82eac91f426ee6e96753c9cf5
Author: Rui Wang 
AuthorDate: Wed Nov 9 17:35:43 2022 +0800

[SPARK-41026][CONNECT][PYTHON][FOLLOW-UP] Add Coalesce and Repartition API 
to Python client

### What changes were proposed in this pull request?

Following up https://github.com/apache/spark/pull/38529, with the 
`Repartition` proto we can support the `Coalesce` and `Repartition` APIs in the 
Python client.
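
As a rough illustration of the idea, both client methods can build the same plan node and differ only in the `shuffle` flag. The sketch below uses plain Python stand-ins so it runs without Spark; `RepartitionSketch` and the module-level `coalesce`/`repartition` functions are hypothetical names, not the pyspark classes touched by this commit.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class RepartitionSketch:
    """Hypothetical stand-in for the Repartition plan node."""

    child: Optional[str]
    num_partitions: int
    shuffle: bool

    def name(self) -> str:
        # One node type serves both APIs; the flag picks the printed name.
        return "repartition" if self.shuffle else "coalesce"


def _check_positive(num_partitions: int) -> None:
    # Same validation message as in the diff.
    if not num_partitions > 0:
        raise ValueError("numPartitions must be positive.")


def coalesce(child: str, num_partitions: int) -> RepartitionSketch:
    """Build a node that changes the partition count without a shuffle."""
    _check_positive(num_partitions)
    return RepartitionSketch(child, num_partitions, shuffle=False)


def repartition(child: str, num_partitions: int) -> RepartitionSketch:
    """Build a node that changes the partition count with a shuffle."""
    _check_positive(num_partitions)
    return RepartitionSketch(child, num_partitions, shuffle=True)
```

Sharing one node keeps the proto message small: the server side only has to inspect `shuffle` to decide which operation to run.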

### Why are the changes needed?

Improve API coverage

### Does this PR introduce _any_ user-facing change?

NO

### How was this patch tested?

UT

Closes #38549 from amaliujia/support_coalesce_in_python.

Authored-by: Rui Wang 
Signed-off-by: Ruifeng Zheng 
---
 python/pyspark/sql/connect/dataframe.py| 49 +-
 python/pyspark/sql/connect/plan.py | 34 +++
 .../sql/tests/connect/test_connect_plan_only.py| 17 
 3 files changed, 98 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/sql/connect/dataframe.py 
b/python/pyspark/sql/connect/dataframe.py
index 64b2e54f0ef..c6877707ad2 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -156,8 +156,53 @@ class DataFrame(object):
 def crossJoin(self, other: "DataFrame") -> "DataFrame":
 ...
 
-def coalesce(self, num_partitions: int) -> "DataFrame":
-...
+def coalesce(self, numPartitions: int) -> "DataFrame":
+"""
+Returns a new :class:`DataFrame` that has exactly `numPartitions` 
partitions.
+
+Coalesce does not trigger a shuffle.
+
+.. versionadded:: 3.4.0
+
+Parameters
+--
+numPartitions : int
+specify the target number of partitions
+
+Returns
+---
+:class:`DataFrame`
+"""
+if not numPartitions > 0:
+raise ValueError("numPartitions must be positive.")
+return DataFrame.withPlan(
+plan.Repartition(self._plan, num_partitions=numPartitions, 
shuffle=False),
+self._session,
+)
+
+def repartition(self, numPartitions: int) -> "DataFrame":
+"""
+Returns a new :class:`DataFrame` that has exactly `numPartitions` 
partitions.
+
+Repartition will shuffle source partition into partitions specified by 
numPartitions.
+
+.. versionadded:: 3.4.0
+
+Parameters
+--
+numPartitions : int
+specify the target number of partitions
+
+Returns
+---
+:class:`DataFrame`
+"""
+if not numPartitions > 0:
+raise ValueError("numPartitions must be positive.")
+return DataFrame.withPlan(
+plan.Repartition(self._plan, num_partitions=numPartitions, 
shuffle=True),
+self._session,
+)
 
 def describe(self, cols: List[ColumnRef]) -> Any:
 ...
diff --git a/python/pyspark/sql/connect/plan.py 
b/python/pyspark/sql/connect/plan.py
index 1d5c80f510e..3bb5558d04b 100644
--- a/python/pyspark/sql/connect/plan.py
+++ b/python/pyspark/sql/connect/plan.py
@@ -652,6 +652,40 @@ class UnionAll(LogicalPlan):
 """
 
 
+class Repartition(LogicalPlan):
+"""Repartition Relation into a different number of partitions."""
+
+def __init__(self, child: Optional["LogicalPlan"], num_partitions: int, 
shuffle: bool) -> None:
+super().__init__(child)
+self._num_partitions = num_partitions
+self._shuffle = shuffle
+
+def plan(self, session: Optional["RemoteSparkSession"]) -> proto.Relation:
+rel = proto.Relation()
+if self._child is not None:
+rel.repartition.input.CopyFrom(self._child.plan(session))
+rel.repartition.shuffle = self._shuffle
+rel.repartition.num_partitions = self._num_partitions
+return rel
+
+def print(self, indent: int = 0) -> str:
+plan_name = "repartition" if self._shuffle else "coalesce"
+c_buf = self._child.print(indent + LogicalPlan.INDENT) if self._child 
else ""
+return f"{' ' * indent}<{plan_name} 
num_partitions={self._num_partitions}>\n{c_buf}"
+
+def _repr_html_(self) -> str:
+plan_name = "repartition" if self._shuffle else "coalesce"
+return f"""
+
+   
+  {plan_name}
+  Child: {self._child_repr_()}
+  num_partitions: {self._num_partitions}
+   
+
+"""
+
+
 class SubqueryAlias(LogicalPlan):
 """Alias for 

[spark] branch master updated: [SPARK-41009][SQL] Rename the error class `_LEGACY_ERROR_TEMP_1070` to `LOCATION_ALREADY_EXISTS`

2022-11-09 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 3e8191f7267 [SPARK-41009][SQL] Rename the error class 
`_LEGACY_ERROR_TEMP_1070` to `LOCATION_ALREADY_EXISTS`
3e8191f7267 is described below

commit 3e8191f726721bf74c8dbcb3ea73a216f6bf0517
Author: Max Gekk 
AuthorDate: Wed Nov 9 12:33:13 2022 +0300

[SPARK-41009][SQL] Rename the error class `_LEGACY_ERROR_TEMP_1070` to 
`LOCATION_ALREADY_EXISTS`

### What changes were proposed in this pull request?
In the PR, I propose to assign the proper name `LOCATION_ALREADY_EXISTS` 
to the legacy error class `_LEGACY_ERROR_TEMP_1070`, and modify the test suites to 
use `checkError()`, which checks the error class name, context, etc.

### Why are the changes needed?
A proper name improves the user experience with Spark SQL.

### Does this PR introduce _any_ user-facing change?
Yes, the PR changes a user-facing error message.

### How was this patch tested?
By running the modified test suites:
```
$ build/sbt "core/testOnly *SparkThrowableSuite"
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly 
*AlterTableRenameSuite"
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly 
*HiveCatalogedDDLSuite"
```

Closes #38490 from MaxGekk/location-already-exists.

Authored-by: Max Gekk 
Signed-off-by: Max Gekk 
---
 core/src/main/resources/error/error-classes.json   | 10 +++---
 .../sql/catalyst/catalog/SessionCatalog.scala  |  8 ++---
 .../spark/sql/errors/QueryCompilationErrors.scala  | 10 --
 .../spark/sql/errors/QueryExecutionErrors.scala|  8 +
 .../spark/sql/execution/command/DDLSuite.scala | 42 --
 .../command/v1/AlterTableRenameSuite.scala | 17 +
 6 files changed, 51 insertions(+), 44 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json 
b/core/src/main/resources/error/error-classes.json
index 71703e7efd9..9c914b86bb1 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -669,6 +669,11 @@
   }
 }
   },
+  "LOCATION_ALREADY_EXISTS" : {
+"message" : [
+  "Cannot name the managed table as , as its associated 
location  already exists. Please pick a different table name, or 
remove the existing location first."
+]
+  },
   "MALFORMED_PROTOBUF_MESSAGE" : {
 "message" : [
   "Malformed Protobuf messages are detected in message deserialization. 
Parse Mode: . To process malformed protobuf message as null 
result, try setting the option 'mode' as 'PERMISSIVE'."
@@ -1949,11 +1954,6 @@
   "CREATE EXTERNAL TABLE must be accompanied by LOCATION."
 ]
   },
-  "_LEGACY_ERROR_TEMP_1070" : {
-"message" : [
-  "Can not  the managed table(''). The 
associated location('') already exists."
-]
-  },
   "_LEGACY_ERROR_TEMP_1071" : {
 "message" : [
   "Some existing schema fields () are not present 
in the new schema. We don't support dropping columns yet."
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index bf712f9681e..06214613299 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -40,7 +40,7 @@ import 
org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Subque
 import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, Origin}
 import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, StringUtils}
 import org.apache.spark.sql.connector.catalog.CatalogManager
-import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.errors.{QueryCompilationErrors, 
QueryExecutionErrors}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf.GLOBAL_TEMP_DATABASE
 import org.apache.spark.sql.types.StructType
@@ -411,8 +411,7 @@ class SessionCatalog(
   val fs = tableLocation.getFileSystem(hadoopConf)
 
   if (fs.exists(tableLocation) && fs.listStatus(tableLocation).nonEmpty) {
-throw 
QueryCompilationErrors.cannotOperateManagedTableWithExistingLocationError(
-  "create", table.identifier, tableLocation)
+throw QueryExecutionErrors.locationAlreadyExists(table.identifier, 
tableLocation)
   }
 }
   }
@@ -1912,8 +1911,7 @@ class SessionCatalog(
   val newTableLocation = new Path(new Path(databaseLocation), 
format(newName.table))
   val fs = newTableLocation.getFileSystem(hadoopConf)
   if (fs.exists(newTableLocation)) {
-throw 

[spark] branch master updated (6b88d55b14d -> 0b8ab3256d4)

2022-11-09 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 6b88d55b14d [SPARK-41041][SQL] Integrate `_LEGACY_ERROR_TEMP_1279` 
into `TABLE_OR_VIEW_ALREADY_EXISTS`
 add 0b8ab3256d4 [SPARK-41061][CONNECT] Support SelectExpr which applies 
Projection by expressions in Strings in Connect DSL

No new revisions were added by this update.

Summary of changes:
 .../main/scala/org/apache/spark/sql/connect/dsl/package.scala| 9 +
 .../apache/spark/sql/connect/planner/SparkConnectPlanner.scala   | 6 ++
 .../spark/sql/connect/planner/SparkConnectProtoSuite.scala   | 6 ++
 3 files changed, 21 insertions(+)





[spark] branch master updated: [SPARK-41041][SQL] Integrate `_LEGACY_ERROR_TEMP_1279` into `TABLE_OR_VIEW_ALREADY_EXISTS`

2022-11-09 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 6b88d55b14d [SPARK-41041][SQL] Integrate `_LEGACY_ERROR_TEMP_1279` 
into `TABLE_OR_VIEW_ALREADY_EXISTS`
6b88d55b14d is described below

commit 6b88d55b14df1f9d15ba921569239cde86071e7d
Author: itholic 
AuthorDate: Wed Nov 9 11:54:34 2022 +0300

[SPARK-41041][SQL] Integrate `_LEGACY_ERROR_TEMP_1279` into 
`TABLE_OR_VIEW_ALREADY_EXISTS`

### What changes were proposed in this pull request?

This PR proposes to integrate the `_LEGACY_ERROR_TEMP_1279` into 
`TABLE_OR_VIEW_ALREADY_EXISTS`.
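
Merging into an existing error class also means supplying the parameter names that class expects, which is why the diff renames the map key from `name` to `relationName`. A minimal sketch of that constraint, assuming `<param>` placeholders in the message template; the template text and the `format_error` helper are hypothetical, not Spark's actual implementation:

```python
import re
from typing import Dict

# Hypothetical template; the real message text lives in error-classes.json.
ERROR_CLASSES: Dict[str, str] = {
    "TABLE_OR_VIEW_ALREADY_EXISTS":
        "Cannot create table or view <relationName> because it already exists.",
}

PLACEHOLDER = re.compile(r"<(\w+)>")


def format_error(error_class: str, params: Dict[str, str]) -> str:
    """Fill a template, rejecting parameter names the template does not use."""
    template = ERROR_CLASSES[error_class]
    expected = set(PLACEHOLDER.findall(template))
    if expected != set(params):
        raise ValueError(f"expected parameters {sorted(expected)}, got {sorted(params)}")
    body = PLACEHOLDER.sub(lambda m: params[m.group(1)], template)
    return f"[{error_class}] {body}"
```

With this model, passing `{"name": ...}` to the merged class fails, while `{"relationName": ...}` formats cleanly.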

### Why are the changes needed?

They are duplicates: both report that the view already exists.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

```
./build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite*"
```

Closes #38552 from itholic/SPARK-41041.

Authored-by: itholic 
Signed-off-by: Max Gekk 
---
 core/src/main/resources/error/error-classes.json | 5 -
 .../scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala   | 4 ++--
 2 files changed, 2 insertions(+), 7 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json 
b/core/src/main/resources/error/error-classes.json
index 57fe79ef184..71703e7efd9 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -2972,11 +2972,6 @@
   " is not a view."
 ]
   },
-  "_LEGACY_ERROR_TEMP_1279" : {
-"message" : [
-  "View  already exists. If you want to update the view definition, 
please use ALTER VIEW AS or CREATE OR REPLACE VIEW AS."
-]
-  },
   "_LEGACY_ERROR_TEMP_1280" : {
 "message" : [
   "It is not allowed to create a persisted view from the Dataset API."
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 139ea236e49..67ceafbf03d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -2667,8 +2667,8 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase {
 
   def viewAlreadyExistsError(name: TableIdentifier): Throwable = {
 new AnalysisException(
-  errorClass = "_LEGACY_ERROR_TEMP_1279",
-  messageParameters = Map("name" -> name.toString))
+  errorClass = "TABLE_OR_VIEW_ALREADY_EXISTS",
+  messageParameters = Map("relationName" -> name.toString))
   }
 
   def createPersistedViewFromDatasetAPINotAllowedError(): Throwable = {





[spark] branch branch-3.2 updated: [SPARK-40588] FileFormatWriter materializes AQE plan before accessing outputOrdering

2022-11-09 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new f0f83b518d0 [SPARK-40588] FileFormatWriter materializes AQE plan 
before accessing outputOrdering
f0f83b518d0 is described below

commit f0f83b518d0bbfccd4ff6414d372bc2ad236f0ff
Author: Enrico Minack 
AuthorDate: Wed Nov 9 15:59:54 2022 +0800

[SPARK-40588] FileFormatWriter materializes AQE plan before accessing 
outputOrdering

The `FileFormatWriter` materializes an `AdaptiveQueryPlan` before accessing 
the plan's `outputOrdering`. This is required for Spark 3.0 to 3.3. Spark 3.4 
does not need this because `FileFormatWriter` gets the final plan.

`FileFormatWriter` enforces an ordering if the written plan does not 
provide that ordering. An `AdaptiveQueryPlan` does not know its final ordering 
(Spark 3.0 to 3.3), in which case `FileFormatWriter` enforces the ordering 
(e.g. by column `"a"`) even if the plan provides a compatible ordering (e.g. by 
columns `"a", "b"`). In case of spilling, that order (e.g. by columns `"a", 
"b"`) gets broken (see SPARK-40588).

This fixes SPARK-40588, which was introduced in 3.0. This restores 
behaviour from Spark 2.4.
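
The ordering check described above can be sketched in a few lines of Python. This is a simplified model under stated assumptions, not the Scala code in `FileFormatWriter`: column names stand in for sort expressions, and `ordering_matched` and `sorts_applied` are hypothetical helper names.

```python
from typing import List, Sequence


def ordering_matched(required: Sequence[str], visible: Sequence[str]) -> bool:
    """True when `required` is a prefix of the ordering the writer can see."""
    if len(required) > len(visible):
        return False
    return all(r == v for r, v in zip(required, visible))


def sorts_applied(
    required: Sequence[str],
    visible: Sequence[str],
    query_sort: Sequence[str],
) -> List[List[str]]:
    """Sorts in the written plan, outermost first."""
    sorts = [list(query_sort)] if query_sort else []
    if not ordering_matched(required, visible):
        # The writer adds its own sort on top of the query's sort.
        sorts.insert(0, list(required))
    return sorts


# Unmaterialized adaptive plan: the writer sees no ordering and adds a sort.
before = sorts_applied(["day"], [], ["day", "id"])
# Materialized plan: the ordering by (day, id) is visible and already sufficient.
after = sorts_applied(["day"], ["day", "id"], ["day", "id"])
assert before == [["day"], ["day", "id"]]
assert after == [["day", "id"]]
```

The two results correspond to the two plans shown below: an extra outer sort in the first case, and the user's sort as the outermost one in the second.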

The final plan that is written to files cannot be extracted from 
`FileFormatWriter`. The bug explained in 
[SPARK-40588](https://issues.apache.org/jira/browse/SPARK-40588) can only be 
asserted on the result files when spilling occurs. This is very hard to control 
in a unit test scenario.

Therefore, this was tested manually. The [example to reproduce this 
issue](https://issues.apache.org/jira/browse/SPARK-40588?focusedCommentId=17621032&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17621032)
 given in SPARK-40588 now produces sorted files.

The actual plan written into the files changed from

```
Sort [input[0, bigint, false] ASC NULLS FIRST], false, 0
+- AdaptiveSparkPlan isFinalPlan=false
   +- Sort [day#2L ASC NULLS FIRST, id#4L ASC NULLS FIRST], false, 0
  +- Exchange hashpartitioning(day#2L, 2), REPARTITION_BY_NUM, [id=#30]
 +- BroadcastNestedLoopJoin BuildLeft, Inner
:- BroadcastExchange IdentityBroadcastMode, [id=#28]
:  +- Project [id#0L AS day#2L]
: +- Range (0, 2, step=1, splits=2)
+- Range (0, 1000, step=1, splits=2)
```

where `FileFormatWriter` enforces order with `Sort [input[0, bigint, false] 
ASC NULLS FIRST], false, 0`, to

```
*(3) Sort [day#2L ASC NULLS FIRST, id#4L ASC NULLS FIRST], false, 0
+- AQEShuffleRead coalesced
   +- ShuffleQueryStage 1
  +- Exchange hashpartitioning(day#2L, 200), REPARTITION_BY_COL, 
[id=#68]
 +- *(2) BroadcastNestedLoopJoin BuildLeft, Inner
:- BroadcastQueryStage 0
:  +- BroadcastExchange IdentityBroadcastMode, [id=#42]
: +- *(1) Project [id#0L AS day#2L]
:+- *(1) Range (0, 2, step=1, splits=2)
+- *(2) Range (0, 100, step=1, splits=2)
```

where the sort given by the user is the outermost sort now.

Closes #38358 from EnricoMi/branch-3.3-materialize-aqe-plan.

Authored-by: Enrico Minack 
Signed-off-by: Wenchen Fan 
(cherry picked from commit f0cad7ad6c2618d2d0d8c8598bbd54c2ca366b6b)
Signed-off-by: Wenchen Fan 
---
 .../sql/execution/adaptive/AdaptiveSparkPlanExec.scala |  2 ++
 .../sql/execution/datasources/FileFormatWriter.scala   | 18 ++
 2 files changed, 16 insertions(+), 4 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 5157c169ef9..1e7cfc474c1 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -217,6 +217,8 @@ case class AdaptiveSparkPlanExec(
   .map(_.toLong).filter(SQLExecution.getQueryExecution(_) eq context.qe)
   }
 
+  def finalPhysicalPlan: SparkPlan = withFinalPlanUpdate(identity)
+
   private def getFinalPhysicalPlan(): SparkPlan = lock.synchronized {
 if (isFinalPlan) return currentPhysicalPlan
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index cd3d101ac26..a9d4d4208f3 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ 

[spark] branch branch-3.3 updated (ef74381ec0d -> f0cad7ad6c2)

2022-11-09 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


from ef74381ec0d [SPARK-41035][SQL] Don't patch foldable children of 
aggregate functions in `RewriteDistinctAggregates`
 add f0cad7ad6c2 [SPARK-40588] FileFormatWriter materializes AQE plan 
before accessing outputOrdering

No new revisions were added by this update.

Summary of changes:
 .../sql/execution/adaptive/AdaptiveSparkPlanExec.scala |  2 ++
 .../sql/execution/datasources/FileFormatWriter.scala   | 18 ++
 2 files changed, 16 insertions(+), 4 deletions(-)

