[GitHub] [spark-website] gengliangwang commented on pull request #428: Add 3.2.3 announcement news, release note and download link

2022-11-29 Thread GitBox


gengliangwang commented on PR #428:
URL: https://github.com/apache/spark-website/pull/428#issuecomment-1331761672

   @dongjoon-hyun @sunchao Sure, the search engine is updated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-41315][CONNECT][PYTHON] Implement `DataFrame.replace` and `DataFrame.na.replace`

2022-11-29 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 5fc482eb591 [SPARK-41315][CONNECT][PYTHON] Implement 
`DataFrame.replace` and `DataFrame.na.replace`
5fc482eb591 is described below

commit 5fc482eb591a42f6dc1eb6ca7cffea72a4616f09
Author: Ruifeng Zheng 
AuthorDate: Wed Nov 30 15:40:53 2022 +0800

[SPARK-41315][CONNECT][PYTHON] Implement `DataFrame.replace` and 
`DataFrame.na.replace`

### What changes were proposed in this pull request?
Implement `DataFrame.replace` and `DataFrame.na.replace`

### Why are the changes needed?
for api coverage

### Does this PR introduce _any_ user-facing change?
yes, new api

### How was this patch tested?
added ut

Closes #38836 from zhengruifeng/connect_df_replace.

Authored-by: Ruifeng Zheng 
Signed-off-by: Ruifeng Zheng 
---
 .../main/protobuf/spark/connect/relations.proto|  30 
 .../org/apache/spark/sql/connect/dsl/package.scala |  25 +++
 .../sql/connect/planner/SparkConnectPlanner.scala  |  32 
 .../connect/planner/SparkConnectProtoSuite.scala   |  12 ++
 python/pyspark/sql/connect/dataframe.py| 150 -
 python/pyspark/sql/connect/plan.py |  72 +++--
 python/pyspark/sql/connect/proto/relations_pb2.py  | 179 -
 python/pyspark/sql/connect/proto/relations_pb2.pyi |  93 +++
 .../sql/tests/connect/test_connect_basic.py|  46 ++
 .../sql/tests/connect/test_connect_plan_only.py|  27 
 10 files changed, 579 insertions(+), 87 deletions(-)

diff --git a/connector/connect/src/main/protobuf/spark/connect/relations.proto 
b/connector/connect/src/main/protobuf/spark/connect/relations.proto
index d0ebce5ccab..8b87845245f 100644
--- a/connector/connect/src/main/protobuf/spark/connect/relations.proto
+++ b/connector/connect/src/main/protobuf/spark/connect/relations.proto
@@ -57,6 +57,7 @@ message Relation {
 // NA functions
 NAFill fill_na = 90;
 NADrop drop_na = 91;
+NAReplace replace = 92;
 
 // stat functions
 StatSummary summary = 100;
@@ -466,6 +467,35 @@ message NADrop {
 }
 
 
+// Replaces old values with the corresponding values.
+// It will invoke 'Dataset.na.replace' (same as 'DataFrameNaFunctions.replace')
+// to compute the results.
+message NAReplace {
+  // (Required) The input relation.
+  Relation input = 1;
+
+  // (Optional) List of column names to consider.
+  //
+  // When it is empty, all the type-compatible columns in the input relation 
will be considered.
+  repeated string cols = 2;
+
+  // (Optional) The value replacement mapping.
+  repeated Replacement replacements = 3;
+
+  message Replacement {
+// (Required) The old value.
+//
+// Only 4 data types are supported now: null, bool, double, string.
+Expression.Literal old_value = 1;
+
+// (Required) The new value.
+//
+// Should be of the same data type with the old value.
+Expression.Literal new_value = 2;
+  }
+}
+
+
 // Rename columns on the input relation by the same length of names.
 message RenameColumnsBySameLengthNames {
   // (Required) The input relation of RenameColumnsBySameLengthNames.
diff --git 
a/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
 
b/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
index 42f59e112af..654a4d5ce20 100644
--- 
a/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
+++ 
b/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
@@ -311,6 +311,31 @@ package object dsl {
   .setDropNa(dropna.build())
   .build()
   }
+
+  def replace(cols: Seq[String], replacement: Map[Any, Any]): Relation = {
+require(cols.nonEmpty)
+
+val replace = proto.NAReplace
+  .newBuilder()
+  .setInput(logicalPlan)
+
+if (!(cols.length == 1 && cols.head == "*")) {
+  replace.addAllCols(cols.asJava)
+}
+
+replacement.foreach { case (oldValue, newValue) =>
+  replace.addReplacements(
+proto.NAReplace.Replacement
+  .newBuilder()
+  .setOldValue(convertValue(oldValue))
+  .setNewValue(convertValue(newValue)))
+}
+
+Relation
+  .newBuilder()
+  .setReplace(replace.build())
+  .build()
+  }
 }
 
 implicit class DslStatFunctions(val logicalPlan: Relation) {
diff --git 
a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
 
b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 2c492d973e4..7b9e13cadab 100644
--- 
a/connector/connect/src/main/scala/org/apache/spark/sql/connec

[spark] branch master updated: [SPARK-41323][SQL] Support current_schema

2022-11-29 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 7fd654c0142 [SPARK-41323][SQL] Support current_schema
7fd654c0142 is described below

commit 7fd654c0142ab9e4002882da4e65d3b25bebd26c
Author: Serge Rielau 
AuthorDate: Wed Nov 30 14:56:23 2022 +0800

[SPARK-41323][SQL] Support current_schema

### What changes were proposed in this pull request?
Support current_schema(0 as a synonym for current_database()

### Why are the changes needed?
current_schema is used in the SQL Standard and many other products such as 
Snowflake, Db2, PostgreSQL, Redshift, ...
It also disambiguates from database which is also used with catalog in some 
products.

### Does this PR introduce _any_ user-facing change?
It is a new feature.

### How was this patch tested?
Add a test verifying that current_schema() and current_database() return 
the same result()

Closes #38840 from srielau/SPARK-41323-CURRENT_SCHEMA.

Authored-by: Serge Rielau 
Signed-off-by: Wenchen Fan 
---
 .../org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala   | 1 +
 sql/core/src/test/resources/sql-functions/sql-expression-schema.md  | 5 +++--
 .../test/resources/sql-tests/inputs/current_database_catalog.sql| 4 ++--
 .../resources/sql-tests/results/current_database_catalog.sql.out| 6 +++---
 .../scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala  | 2 ++
 5 files changed, 11 insertions(+), 7 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index a1cecc4b6e0..3817f00d09d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -726,6 +726,7 @@ object FunctionRegistry {
 expression[InputFileBlockLength]("input_file_block_length"),
 expression[MonotonicallyIncreasingID]("monotonically_increasing_id"),
 expression[CurrentDatabase]("current_database"),
+expression[CurrentDatabase]("current_schema", true),
 expression[CurrentCatalog]("current_catalog"),
 expression[CurrentUser]("current_user"),
 expression[CurrentUser]("user", setAlias = true),
diff --git a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md 
b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
index 8d47878de15..8c2134a6142 100644
--- a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
+++ b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
@@ -89,6 +89,7 @@
 | org.apache.spark.sql.catalyst.expressions.CurDateExpressionBuilder | curdate 
| SELECT curdate() | struct |
 | org.apache.spark.sql.catalyst.expressions.CurrentCatalog | current_catalog | 
SELECT current_catalog() | struct |
 | org.apache.spark.sql.catalyst.expressions.CurrentDatabase | current_database 
| SELECT current_database() | struct |
+| org.apache.spark.sql.catalyst.expressions.CurrentDatabase | current_schema | 
SELECT current_schema() | struct |
 | org.apache.spark.sql.catalyst.expressions.CurrentDate | current_date | 
SELECT current_date() | struct |
 | org.apache.spark.sql.catalyst.expressions.CurrentTimeZone | current_timezone 
| SELECT current_timezone() | struct |
 | org.apache.spark.sql.catalyst.expressions.CurrentTimestamp | 
current_timestamp | SELECT current_timestamp() | 
struct |
@@ -400,7 +401,7 @@
 | org.apache.spark.sql.catalyst.expressions.aggregate.StddevSamp | stddev | 
SELECT stddev(col) FROM VALUES (1), (2), (3) AS tab(col) | 
struct |
 | org.apache.spark.sql.catalyst.expressions.aggregate.StddevSamp | stddev_samp 
| SELECT stddev_samp(col) FROM VALUES (1), (2), (3) AS tab(col) | 
struct |
 | org.apache.spark.sql.catalyst.expressions.aggregate.Sum | sum | SELECT 
sum(col) FROM VALUES (5), (10), (15) AS tab(col) | struct |
-| org.apache.spark.sql.catalyst.expressions.aggregate.TryAverage | try_avg | 
SELECT try_avg(col) FROM VALUES (1), (2), (3) AS tab(col) | 
struct |
+| 
org.apache.spark.sql.catalyst.expressions.aggregate.TryAverageExpressionBuilder 
| try_avg | SELECT try_avg(col) FROM VALUES (1), (2), (3) AS tab(col) | 
struct |
 | org.apache.spark.sql.catalyst.expressions.aggregate.TrySumExpressionBuilder 
| try_sum | SELECT try_sum(col) FROM VALUES (5), (10), (15) AS tab(col) | 
struct |
 | org.apache.spark.sql.catalyst.expressions.aggregate.VariancePop | var_pop | 
SELECT var_pop(col) FROM VALUES (1), (2), (3) AS tab(col) | 
struct |
 | org.apache.spark.sql.catalyst.expressions.aggregate.VarianceSamp | var_samp 
| SELECT var_samp(col) FROM VALUES (1), (2), (3) AS tab(col) | 
struct |
@@ -413,4 +414,4 @@
 | org.apa

[spark] branch master updated: [SPARK-41331][CONNECT][PYTHON] Add `orderBy` and `drop_duplicates`

2022-11-29 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 92847d98810 [SPARK-41331][CONNECT][PYTHON] Add `orderBy` and 
`drop_duplicates`
92847d98810 is described below

commit 92847d98810280c9ddebea2e12a5e4945601f809
Author: Ruifeng Zheng 
AuthorDate: Wed Nov 30 14:34:02 2022 +0800

[SPARK-41331][CONNECT][PYTHON] Add `orderBy` and `drop_duplicates`

### What changes were proposed in this pull request?
Add `orderBy` and `drop_duplicates`

### Why are the changes needed?
For API coverage

### Does this PR introduce _any_ user-facing change?
yes, new api

### How was this patch tested?
added test cases, since they are only alias, I just test them in plan-only

Closes #38846 from zhengruifeng/connect_df_orderby_dropduplicate.

Authored-by: Ruifeng Zheng 
Signed-off-by: Ruifeng Zheng 
---
 python/pyspark/sql/connect/dataframe.py|  4 
 python/pyspark/sql/tests/connect/test_connect_plan_only.py | 14 ++
 2 files changed, 18 insertions(+)

diff --git a/python/pyspark/sql/connect/dataframe.py 
b/python/pyspark/sql/connect/dataframe.py
index ef70336e31d..034f410b1ad 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -299,6 +299,8 @@ class DataFrame(object):
 plan.Deduplicate(child=self._plan, column_names=subset), 
session=self._session
 )
 
+drop_duplicates = dropDuplicates
+
 def distinct(self) -> "DataFrame":
 """Returns a new :class:`DataFrame` containing the distinct rows in 
this :class:`DataFrame`.
 
@@ -513,6 +515,8 @@ class DataFrame(object):
 plan.Sort(self._plan, columns=list(cols), is_global=True), 
session=self._session
 )
 
+orderBy = sort
+
 def sortWithinPartitions(self, *cols: "ColumnOrName") -> "DataFrame":
 """Sort within each partition by a specific column"""
 return DataFrame.withPlan(
diff --git a/python/pyspark/sql/tests/connect/test_connect_plan_only.py 
b/python/pyspark/sql/tests/connect/test_connect_plan_only.py
index ed4b688cef2..337917335e3 100644
--- a/python/pyspark/sql/tests/connect/test_connect_plan_only.py
+++ b/python/pyspark/sql/tests/connect/test_connect_plan_only.py
@@ -193,6 +193,16 @@ class SparkConnectTestsPlanOnly(PlanOnlyTestFixture):
 )
 self.assertEqual(plan.root.sort.is_global, True)
 
+plan = df.filter(df.col_name > 3).orderBy("col_a", 
"col_b")._plan.to_proto(self.connect)
+self.assertEqual(
+[
+f.expression.unresolved_attribute.unparsed_identifier
+for f in plan.root.sort.sort_fields
+],
+["col_a", "col_b"],
+)
+self.assertEqual(plan.root.sort.is_global, True)
+
 plan = (
 df.filter(df.col_name > 3)
 .sortWithinPartitions("col_a", "col_b")
@@ -236,6 +246,10 @@ class SparkConnectTestsPlanOnly(PlanOnlyTestFixture):
 
self.assertEqual(deduplicate_on_all_columns_plan.root.deduplicate.all_columns_as_keys,
 True)
 
self.assertEqual(len(deduplicate_on_all_columns_plan.root.deduplicate.column_names),
 0)
 
+deduplicate_on_all_columns_plan = 
df.drop_duplicates()._plan.to_proto(self.connect)
+
self.assertEqual(deduplicate_on_all_columns_plan.root.deduplicate.all_columns_as_keys,
 True)
+
self.assertEqual(len(deduplicate_on_all_columns_plan.root.deduplicate.column_names),
 0)
+
 deduplicate_on_subset_columns_plan = df.dropDuplicates(["name", 
"height"])._plan.to_proto(
 self.connect
 )


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-41330][CONNECT][PYSPARK][DOC] Improve Documentation for Take,Tail, Limit and Offset

2022-11-29 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new f65129d3a53 [SPARK-41330][CONNECT][PYSPARK][DOC] Improve Documentation 
for Take,Tail, Limit and Offset
f65129d3a53 is described below

commit f65129d3a53e1cd58418268ccc1f900a1ee8eb16
Author: Rui Wang 
AuthorDate: Wed Nov 30 14:21:36 2022 +0800

[SPARK-41330][CONNECT][PYSPARK][DOC] Improve Documentation for Take,Tail, 
Limit and Offset

### What changes were proposed in this pull request?

For `Tail`, `Take`, `Limit`, `Offset`. Current documentation says `whataver 
number is available` when the parameter is smaller than the entire dataset 
size. This PR re-phrases it to say the all the records in the dataframe.

### Why are the changes needed?

Improve documentation.

### Does this PR introduce _any_ user-facing change?

NO

### How was this patch tested?

N/A

Closes #38845 from amaliujia/fix_pyspark_comment.

Authored-by: Rui Wang 
Signed-off-by: Ruifeng Zheng 
---
 python/pyspark/sql/connect/dataframe.py | 4 ++--
 python/pyspark/sql/dataframe.py | 6 +++---
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/python/pyspark/sql/connect/dataframe.py 
b/python/pyspark/sql/connect/dataframe.py
index 08ad9954672..ef70336e31d 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -421,7 +421,7 @@ class DataFrame(object):
 --
 num : int
 Number of records to return. Will return this number of records
-or whataver number is available.
+or all records if the DataFrame contains less than this number of 
records..
 
 Returns
 ---
@@ -456,7 +456,7 @@ class DataFrame(object):
 --
 num : int
 Number of records to return. Will return this number of records
-or whatever number is available.
+or all records if the DataFrame contains less than this number of 
records.
 
 Returns
 ---
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 61e628c9015..f3873e3c8cd 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -1176,7 +1176,7 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 --
 num : int
 Number of records to return. Will return this number of records
-or whataver number is available.
+or all records if the DataFrame contains less than this number of 
records.
 
 Returns
 ---
@@ -1211,7 +1211,7 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 --
 num : int
 Number of records to return. Will return this number of records
-or whataver number is available.
+or all records if the DataFrame contains less than this number of 
records..
 
 Returns
 ---
@@ -1243,7 +1243,7 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 --
 num : int
 Number of records to return. Will return this number of records
-or whataver number is available.
+or all records if the DataFrame contains less than this number of 
records.
 
 Returns
 ---


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-41328][CONNECT][PYTHON] Add logical and string API to Column

2022-11-29 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 40e6592e02c [SPARK-41328][CONNECT][PYTHON] Add logical and string API 
to Column
40e6592e02c is described below

commit 40e6592e02cbe679daec9e302e1027ffc64e7323
Author: Rui Wang 
AuthorDate: Wed Nov 30 13:13:57 2022 +0900

[SPARK-41328][CONNECT][PYTHON] Add logical and string API to Column

### What changes were proposed in this pull request?

1. Upgrade `_typing.py` to use `Column`.
2. Add logical operators (and, or, etc.) and strings (like, substr, etc.) 
to `Column`.
3. Add basic tests for new API.

### Why are the changes needed?

Improve API coverage

### Does this PR introduce _any_ user-facing change?

NO

### How was this patch tested?

UT

Closes #38844 from amaliujia/refactor_column_back_up_2.

Authored-by: Rui Wang 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/connect/_typing.py  |   4 +-
 python/pyspark/sql/connect/column.py   | 339 -
 python/pyspark/sql/connect/function_builder.py |   6 +-
 .../sql/tests/connect/test_connect_basic.py|  31 +-
 4 files changed, 367 insertions(+), 13 deletions(-)

diff --git a/python/pyspark/sql/connect/_typing.py 
b/python/pyspark/sql/connect/_typing.py
index 8629d1c23cc..e5ade4cfcbe 100644
--- a/python/pyspark/sql/connect/_typing.py
+++ b/python/pyspark/sql/connect/_typing.py
@@ -26,7 +26,7 @@ from typing import Union, Optional
 import datetime
 import decimal
 
-from pyspark.sql.connect.column import ScalarFunctionExpression, Column
+from pyspark.sql.connect.column import Column
 
 ColumnOrName = Union[Column, str]
 
@@ -42,7 +42,7 @@ DateTimeLiteral = Union[datetime.datetime, datetime.date]
 
 
 class FunctionBuilderCallable(Protocol):
-def __call__(self, *_: ColumnOrName) -> ScalarFunctionExpression:
+def __call__(self, *_: ColumnOrName) -> Column:
 ...
 
 
diff --git a/python/pyspark/sql/connect/column.py 
b/python/pyspark/sql/connect/column.py
index 83e8b28da0f..c53d2c90bf6 100644
--- a/python/pyspark/sql/connect/column.py
+++ b/python/pyspark/sql/connect/column.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 #
 
-from typing import get_args, TYPE_CHECKING, Callable, Any, Union
+from typing import get_args, TYPE_CHECKING, Callable, Any, Union, overload
 
 import json
 import decimal
@@ -29,6 +29,17 @@ if TYPE_CHECKING:
 from pyspark.sql.connect.client import SparkConnectClient
 import pyspark.sql.connect.proto as proto
 
+# TODO(SPARK-41329): solve the circular import between _typing and this class
+# if we want to reuse _type.PrimitiveType
+PrimitiveType = Union[bool, float, int, str]
+
+
+def _func_op(name: str, doc: str = "") -> Callable[["Column"], "Column"]:
+def _(self: "Column") -> "Column":
+return scalar_function(name, self)
+
+return _
+
 
 def _bin_op(
 name: str, doc: str = "binary function", reverse: bool = False
@@ -219,6 +230,8 @@ class LiteralExpression(Expression):
 else:
 pair.value.CopyFrom(lit(value).to_plan(session).literal)
 expr.literal.map.pairs.append(pair)
+elif isinstance(self._value, Column):
+expr.CopyFrom(self._value.to_plan(session))
 else:
 raise ValueError(f"Could not convert literal for type 
{type(self._value)}")
 
@@ -352,17 +365,326 @@ class Column(object):
 __rpow__ = _bin_op("pow", reverse=True)
 __ge__ = _bin_op(">=")
 __le__ = _bin_op("<=")
-# __eq__ = _bin_op("==")  # ignore [assignment]
+
+_eqNullSafe_doc = """
+Equality test that is safe for null values.
+
+Parameters
+--
+other
+a value or :class:`Column`
+
+Examples
+
+>>> from pyspark.sql import Row
+>>> df1 = spark.createDataFrame([
+... Row(id=1, value='foo'),
+... Row(id=2, value=None)
+... ])
+>>> df1.select(
+... df1['value'] == 'foo',
+... df1['value'].eqNullSafe('foo'),
+... df1['value'].eqNullSafe(None)
+... ).show()
++-+---++
+|(value = foo)|(value <=> foo)|(value <=> NULL)|
++-+---++
+| true|   true|   false|
+| null|  false|true|
++-+---++
+>>> df2 = spark.createDataFrame([
+... Row(value = 'bar'),
+... Row(value = None)
+... ])
+>>> df1.join(df2, df1["value"] == df2["value"]).count()
+0
+>>> df1.join(df2, df1["value"].eqNullSafe(df

[spark] branch branch-3.2 updated: [SPARK-40987][CORE] `BlockManager#removeBlockInternal` should ensure the lock is unlocked gracefully

2022-11-29 Thread mridulm80
This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new b95a77107a6 [SPARK-40987][CORE] `BlockManager#removeBlockInternal` 
should ensure the lock is unlocked gracefully
b95a77107a6 is described below

commit b95a77107a6f9e91e1a1ad57e7efd6b90c5517f8
Author: sychen 
AuthorDate: Tue Nov 29 21:52:43 2022 -0600

[SPARK-40987][CORE] `BlockManager#removeBlockInternal` should ensure the 
lock is unlocked gracefully

### What changes were proposed in this pull request?
`BlockManager#removeBlockInternal` should ensure the lock is unlocked 
gracefully.
`removeBlockInternal` tries to call `removeBlock` in the finally block.

### Why are the changes needed?
When the driver submits a job, `DAGScheduler` calls 
`sc.broadcast(taskBinaryBytes)`.
`TorrentBroadcast#writeBlocks` may fail due to disk problems during 
`blockManager#putBytes`.
`BlockManager#doPut` calls `BlockManager#removeBlockInternal` to clean up 
the block.
`BlockManager#removeBlockInternal` calls `DiskStore#remove` to clean up 
blocks on disk.
`DiskStore#remove` will try to create the directory because the directory 
does not exist, and an exception will be thrown at this time.
`BlockInfoManager#blockInfoWrappers` block info and lock not removed.
The catch block in `TorrentBroadcast#writeBlocks` will call 
`blockManager.removeBroadcast` to clean up the broadcast.
Because the block lock in `BlockInfoManager#blockInfoWrappers` is not 
released, the `dag-scheduler-event-loop` thread of `DAGScheduler` will wait 
forever.

```
22/11/01 18:27:48 WARN BlockManager: Putting block broadcast_0_piece0 
failed due to exception java.io.IOException: X.
22/11/01 18:27:48 ERROR TorrentBroadcast: Store broadcast broadcast_0 fail, 
remove all pieces of the broadcast
```

```
"dag-scheduler-event-loop" #54 daemon prio=5 os_prio=31 
tid=0x7fc98e3fa800 nid=0x7203 waiting on condition [0x78c1e000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x0007add3d8c8> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at 
org.apache.spark.storage.BlockInfoManager.$anonfun$acquireLock$1(BlockInfoManager.scala:221)
    at 
org.apache.spark.storage.BlockInfoManager.$anonfun$acquireLock$1$adapted(BlockInfoManager.scala:214)
    at 
org.apache.spark.storage.BlockInfoManager$$Lambda$3038/1307533457.apply(Unknown 
Source)
    at 
org.apache.spark.storage.BlockInfoWrapper.withLock(BlockInfoManager.scala:105)
    at 
org.apache.spark.storage.BlockInfoManager.acquireLock(BlockInfoManager.scala:214)
    at 
org.apache.spark.storage.BlockInfoManager.lockForWriting(BlockInfoManager.scala:293)
    at 
org.apache.spark.storage.BlockManager.removeBlock(BlockManager.scala:1979)
    at 
org.apache.spark.storage.BlockManager.$anonfun$removeBroadcast$3(BlockManager.scala:1970)
    at 
org.apache.spark.storage.BlockManager.$anonfun$removeBroadcast$3$adapted(BlockManager.scala:1970)
    at 
org.apache.spark.storage.BlockManager$$Lambda$3092/1241801156.apply(Unknown 
Source)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at 
org.apache.spark.storage.BlockManager.removeBroadcast(BlockManager.scala:1970)
    at 
org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:179)
    at 
org.apache.spark.broadcast.TorrentBroadcast.(TorrentBroadcast.scala:99)
    at 
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:38)
    at 
org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:78)
    at 
org.apache.spark.SparkContext.broadcastInternal(SparkContext.scala:1538)
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1520)
    at 
org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1539)
    at 
org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1355)
    at 
org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1297)
    at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2929)
    at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2921)
    at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onRec

[spark] branch branch-3.3 updated: [SPARK-40987][CORE] `BlockManager#removeBlockInternal` should ensure the lock is unlocked gracefully

2022-11-29 Thread mridulm80
This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new 3f7ff3507c3 [SPARK-40987][CORE] `BlockManager#removeBlockInternal` 
should ensure the lock is unlocked gracefully
3f7ff3507c3 is described below

commit 3f7ff3507c3964f965202e1fcb0048ac0a4af8e5
Author: sychen 
AuthorDate: Tue Nov 29 21:52:43 2022 -0600

[SPARK-40987][CORE] `BlockManager#removeBlockInternal` should ensure the 
lock is unlocked gracefully

### What changes were proposed in this pull request?
`BlockManager#removeBlockInternal` should ensure the lock is unlocked 
gracefully.
`removeBlockInternal` tries to call `removeBlock` in the finally block.

### Why are the changes needed?
When the driver submits a job, `DAGScheduler` calls 
`sc.broadcast(taskBinaryBytes)`.
`TorrentBroadcast#writeBlocks` may fail due to disk problems during 
`blockManager#putBytes`.
`BlockManager#doPut` calls `BlockManager#removeBlockInternal` to clean up 
the block.
`BlockManager#removeBlockInternal` calls `DiskStore#remove` to clean up 
blocks on disk.
`DiskStore#remove` will try to create the directory because the directory 
does not exist, and an exception will be thrown at this time.
`BlockInfoManager#blockInfoWrappers` block info and lock not removed.
The catch block in `TorrentBroadcast#writeBlocks` will call 
`blockManager.removeBroadcast` to clean up the broadcast.
Because the block lock in `BlockInfoManager#blockInfoWrappers` is not 
released, the `dag-scheduler-event-loop` thread of `DAGScheduler` will wait 
forever.

```
22/11/01 18:27:48 WARN BlockManager: Putting block broadcast_0_piece0 
failed due to exception java.io.IOException: X.
22/11/01 18:27:48 ERROR TorrentBroadcast: Store broadcast broadcast_0 fail, 
remove all pieces of the broadcast
```

```
"dag-scheduler-event-loop" #54 daemon prio=5 os_prio=31 
tid=0x7fc98e3fa800 nid=0x7203 waiting on condition [0x78c1e000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x0007add3d8c8> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at 
org.apache.spark.storage.BlockInfoManager.$anonfun$acquireLock$1(BlockInfoManager.scala:221)
    at 
org.apache.spark.storage.BlockInfoManager.$anonfun$acquireLock$1$adapted(BlockInfoManager.scala:214)
    at 
org.apache.spark.storage.BlockInfoManager$$Lambda$3038/1307533457.apply(Unknown 
Source)
    at 
org.apache.spark.storage.BlockInfoWrapper.withLock(BlockInfoManager.scala:105)
    at 
org.apache.spark.storage.BlockInfoManager.acquireLock(BlockInfoManager.scala:214)
    at 
org.apache.spark.storage.BlockInfoManager.lockForWriting(BlockInfoManager.scala:293)
    at 
org.apache.spark.storage.BlockManager.removeBlock(BlockManager.scala:1979)
    at 
org.apache.spark.storage.BlockManager.$anonfun$removeBroadcast$3(BlockManager.scala:1970)
    at 
org.apache.spark.storage.BlockManager.$anonfun$removeBroadcast$3$adapted(BlockManager.scala:1970)
    at 
org.apache.spark.storage.BlockManager$$Lambda$3092/1241801156.apply(Unknown 
Source)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at 
org.apache.spark.storage.BlockManager.removeBroadcast(BlockManager.scala:1970)
    at 
org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:179)
    at 
org.apache.spark.broadcast.TorrentBroadcast.(TorrentBroadcast.scala:99)
    at 
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:38)
    at 
org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:78)
    at 
org.apache.spark.SparkContext.broadcastInternal(SparkContext.scala:1538)
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1520)
    at 
org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1539)
    at 
org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1355)
    at 
org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1297)
    at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2929)
    at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2921)
    at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onRec

[spark] branch master updated: [SPARK-40987][CORE] `BlockManager#removeBlockInternal` should ensure the lock is unlocked gracefully

2022-11-29 Thread mridulm80
This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new bbab0afb9a6 [SPARK-40987][CORE] `BlockManager#removeBlockInternal` 
should ensure the lock is unlocked gracefully
bbab0afb9a6 is described below

commit bbab0afb9a6919694cda5b9d490203af93a23460
Author: sychen 
AuthorDate: Tue Nov 29 21:52:43 2022 -0600

[SPARK-40987][CORE] `BlockManager#removeBlockInternal` should ensure the 
lock is unlocked gracefully

### What changes were proposed in this pull request?
`BlockManager#removeBlockInternal` should ensure the lock is unlocked 
gracefully.
`removeBlockInternal` tries to call `removeBlock` in the finally block.

### Why are the changes needed?
When the driver submits a job, `DAGScheduler` calls 
`sc.broadcast(taskBinaryBytes)`.
`TorrentBroadcast#writeBlocks` may fail due to disk problems during 
`blockManager#putBytes`.
`BlockManager#doPut` calls `BlockManager#removeBlockInternal` to clean up 
the block.
`BlockManager#removeBlockInternal` calls `DiskStore#remove` to clean up 
blocks on disk.
`DiskStore#remove` will try to create the directory because the directory 
does not exist, and an exception will be thrown at this time.
`BlockInfoManager#blockInfoWrappers` block info and lock not removed.
The catch block in `TorrentBroadcast#writeBlocks` will call 
`blockManager.removeBroadcast` to clean up the broadcast.
Because the block lock in `BlockInfoManager#blockInfoWrappers` is not 
released, the `dag-scheduler-event-loop` thread of `DAGScheduler` will wait 
forever.

```
22/11/01 18:27:48 WARN BlockManager: Putting block broadcast_0_piece0 
failed due to exception java.io.IOException: X.
22/11/01 18:27:48 ERROR TorrentBroadcast: Store broadcast broadcast_0 fail, 
remove all pieces of the broadcast
```

```
"dag-scheduler-event-loop" #54 daemon prio=5 os_prio=31 
tid=0x7fc98e3fa800 nid=0x7203 waiting on condition [0x78c1e000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x0007add3d8c8> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at 
org.apache.spark.storage.BlockInfoManager.$anonfun$acquireLock$1(BlockInfoManager.scala:221)
    at 
org.apache.spark.storage.BlockInfoManager.$anonfun$acquireLock$1$adapted(BlockInfoManager.scala:214)
    at 
org.apache.spark.storage.BlockInfoManager$$Lambda$3038/1307533457.apply(Unknown 
Source)
    at 
org.apache.spark.storage.BlockInfoWrapper.withLock(BlockInfoManager.scala:105)
    at 
org.apache.spark.storage.BlockInfoManager.acquireLock(BlockInfoManager.scala:214)
    at 
org.apache.spark.storage.BlockInfoManager.lockForWriting(BlockInfoManager.scala:293)
    at 
org.apache.spark.storage.BlockManager.removeBlock(BlockManager.scala:1979)
    at 
org.apache.spark.storage.BlockManager.$anonfun$removeBroadcast$3(BlockManager.scala:1970)
    at 
org.apache.spark.storage.BlockManager.$anonfun$removeBroadcast$3$adapted(BlockManager.scala:1970)
    at 
org.apache.spark.storage.BlockManager$$Lambda$3092/1241801156.apply(Unknown 
Source)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at 
org.apache.spark.storage.BlockManager.removeBroadcast(BlockManager.scala:1970)
    at 
org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:179)
    at 
org.apache.spark.broadcast.TorrentBroadcast.(TorrentBroadcast.scala:99)
    at 
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:38)
    at 
org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:78)
    at 
org.apache.spark.SparkContext.broadcastInternal(SparkContext.scala:1538)
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1520)
    at 
org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1539)
    at 
org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1355)
    at 
org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1297)
    at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2929)
    at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2921)
    at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAG

[spark] branch master updated: [SPARK-41326] [CONNECT] Fix deduplicate is missing input

2022-11-29 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new f529d0e7e01 [SPARK-41326] [CONNECT] Fix deduplicate is missing input
f529d0e7e01 is described below

commit f529d0e7e016075fcd560e79c71f97fcfd581ddc
Author: Martin Grund 
AuthorDate: Wed Nov 30 10:00:24 2022 +0900

[SPARK-41326] [CONNECT] Fix deduplicate is missing input

### What changes were proposed in this pull request?
In the transformation of the Spark Connect plan for `Deduplicate`, it was 
missing to copy the input relation into the plan. This caused an exception on 
the server and failing the query.

This patch fixes that bug.

### Why are the changes needed?
Bugfix

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
UT

Closes #38842 from grundprinzip/SPARK-41326.

Authored-by: Martin Grund 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/connect/plan.py | 1 +
 python/pyspark/sql/tests/connect/test_connect_plan_only.py | 2 ++
 2 files changed, 3 insertions(+)

diff --git a/python/pyspark/sql/connect/plan.py 
b/python/pyspark/sql/connect/plan.py
index ed4080e4230..e7bbad25e59 100644
--- a/python/pyspark/sql/connect/plan.py
+++ b/python/pyspark/sql/connect/plan.py
@@ -462,6 +462,7 @@ class Deduplicate(LogicalPlan):
 def plan(self, session: "SparkConnectClient") -> proto.Relation:
 assert self._child is not None
 plan = proto.Relation()
+plan.deduplicate.input.CopyFrom(self._child.plan(session))
 plan.deduplicate.all_columns_as_keys = self.all_columns_as_keys
 if self.column_names is not None:
 plan.deduplicate.column_names.extend(self.column_names)
diff --git a/python/pyspark/sql/tests/connect/test_connect_plan_only.py 
b/python/pyspark/sql/tests/connect/test_connect_plan_only.py
index fe005cd6365..ed4b688cef2 100644
--- a/python/pyspark/sql/tests/connect/test_connect_plan_only.py
+++ b/python/pyspark/sql/tests/connect/test_connect_plan_only.py
@@ -227,6 +227,8 @@ class SparkConnectTestsPlanOnly(PlanOnlyTestFixture):
 df = self.connect.readTable(table_name=self.tbl_name)
 
 distinct_plan = df.distinct()._plan.to_proto(self.connect)
+self.assertTrue(distinct_plan.root.deduplicate.HasField("input"), 
"input must be set")
+
 self.assertEqual(distinct_plan.root.deduplicate.all_columns_as_keys, 
True)
 self.assertEqual(len(distinct_plan.root.deduplicate.column_names), 0)
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-41268][CONNECT][PYTHON] Refactor "Column" for API Compatibility

2022-11-29 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new fa21bc52f1d [SPARK-41268][CONNECT][PYTHON] Refactor "Column" for API 
Compatibility
fa21bc52f1d is described below

commit fa21bc52f1d079aac4be38a4809a96317913c7e8
Author: Rui Wang 
AuthorDate: Wed Nov 30 09:56:11 2022 +0900

[SPARK-41268][CONNECT][PYTHON] Refactor "Column" for API Compatibility

### What changes were proposed in this pull request?

In the past, the expression system is pretty different between Connect and 
PySpark. `Column` is the first class API for all the expression in PySpark 
while `Expression` is the top level API in Connect. To maintain API 
Compatibility we need to promote `Column` in Connect. Also Connect has its 
special needs to maintain a series of `to_proto` methods to convert expressions 
to proto representation.

This PR:
1. Promote `Column` to be the first class citizen.
2. Wrap `Expression` into `Column` and do not expose it into DataFrame.

**The main idea of this PR**
We wrap `Expression` into `Column` and only expose `Column` to DataFrame 
API surface. In this concept, `Column` is the public API that user access 
(along with functions.py which offer a rich set of ways to create columns). 
`Expression` serve as internal API to differentiate different types of columns 
and also build a bridge between Python and Connect protobuf.

It is not decided yet if `plan.py` or Logical Plan should deal with 
Expression or Column, and same for `str` versus `Column` or `Expression`.

Examples as the following:

```
Class Column:
   def __init__(self, expr: Expression):
 self._expr = expr;

def like(self: "Column", other: str) -> "Column":
 # UnresolvedFunction extends Expression for `like` operation
 # LiteralExpression as expression to carry `str` which is the 
function paramter.
 return Column(UnresolvedFunction("like",  self, 
Column(LiteralExpression(str)))

df.select(df.data, df.data.like("secret")).collect()

data | bool
test | False
secret | True
```

**Caveats:**
TBD

**Alternative approach:**
We keep `Column` as a base class and other expressions inherits it. Still 
only use `Column` in DataFrame API surface.

```
Class Column:
   def __init__(self, expr: Expression):
 self._expr = expr;

def like(self: "Column", other: str) -> "Column":
 return cast(Column, UnresolvedFunction("like",  self, 
LiteralExpression(str)))

df.select(df.data, df.data.like("secret")).collect()
```

**Caveats:**
TBD

### Why are the changes needed?

API Compatibility

### Does this PR introduce _any_ user-facing change?

NO
### How was this patch tested?

Existing UT

Closes #38806 from amaliujia/refactor_column.

Authored-by: Rui Wang 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/connect/_typing.py  |   9 +-
 python/pyspark/sql/connect/column.py   | 180 ++---
 python/pyspark/sql/connect/dataframe.py|  59 +++
 python/pyspark/sql/connect/function_builder.py |  18 +--
 python/pyspark/sql/connect/functions.py|   8 +-
 python/pyspark/sql/connect/plan.py |  36 ++---
 .../connect/test_connect_column_expressions.py |  10 +-
 .../sql/tests/connect/test_connect_plan_only.py|   4 +-
 8 files changed, 188 insertions(+), 136 deletions(-)

diff --git a/python/pyspark/sql/connect/_typing.py 
b/python/pyspark/sql/connect/_typing.py
index 262d71fcea1..8629d1c23cc 100644
--- a/python/pyspark/sql/connect/_typing.py
+++ b/python/pyspark/sql/connect/_typing.py
@@ -26,10 +26,7 @@ from typing import Union, Optional
 import datetime
 import decimal
 
-from pyspark.sql.connect.column import ScalarFunctionExpression, Expression, 
Column
-from pyspark.sql.connect.function_builder import UserDefinedFunction
-
-ExpressionOrString = Union[Expression, str]
+from pyspark.sql.connect.column import ScalarFunctionExpression, Column
 
 ColumnOrName = Union[Column, str]
 
@@ -45,10 +42,10 @@ DateTimeLiteral = Union[datetime.datetime, datetime.date]
 
 
 class FunctionBuilderCallable(Protocol):
-def __call__(self, *_: ExpressionOrString) -> ScalarFunctionExpression:
+def __call__(self, *_: ColumnOrName) -> ScalarFunctionExpression:
 ...
 
 
 class UserDefinedFunctionCallable(Protocol):
-def __call__(self, *_: ColumnOrName) -> UserDefinedFunction:
+def __call__(self, *_: ColumnOrName) -> Column:
 ...
diff --git a/python/pyspark/sql/connect/column.py 
b/python/pyspark/sql/connect/column.py
index 663af074925..83e8b28da0f 100644
---

[spark] branch master updated (a47869af7fa -> 54c57fa8690)

2022-11-29 Thread hvanhovell
This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from a47869af7fa [SPARK-41309][SQL] Reuse 
`INVALID_SCHEMA.NON_STRING_LITERAL` instead of  `_LEGACY_ERROR_TEMP_1093`
 add 54c57fa8690 [SPARK-41297][CONNECT][PYTHON] Support String Expressions 
in filter

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/connect/dataframe.py| 12 +++-
 python/pyspark/sql/tests/connect/test_connect_plan_only.py |  8 
 2 files changed, 15 insertions(+), 5 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-41309][SQL] Reuse `INVALID_SCHEMA.NON_STRING_LITERAL` instead of `_LEGACY_ERROR_TEMP_1093`

2022-11-29 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new a47869af7fa [SPARK-41309][SQL] Reuse 
`INVALID_SCHEMA.NON_STRING_LITERAL` instead of  `_LEGACY_ERROR_TEMP_1093`
a47869af7fa is described below

commit a47869af7fa82b708520da123fa0446214f601c2
Author: yangjie01 
AuthorDate: Tue Nov 29 19:36:59 2022 +0300

[SPARK-41309][SQL] Reuse `INVALID_SCHEMA.NON_STRING_LITERAL` instead of  
`_LEGACY_ERROR_TEMP_1093`

### What changes were proposed in this pull request?
This pr aims reuse `INVALID_SCHEMA.NON_STRING_LITERAL` instead of 
`_LEGACY_ERROR_TEMP_1093`.

### Why are the changes needed?
Proper names of error classes to improve user experience with Spark SQL.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GitHub Actions

Closes #38830 from LuciferYang/SPARK-41309.

Lead-authored-by: yangjie01 
Co-authored-by: YangJie 
Signed-off-by: Max Gekk 
---
 core/src/main/resources/error/error-classes.json|  5 -
 .../apache/spark/sql/catalyst/expressions/ExprUtils.scala   |  2 +-
 .../apache/spark/sql/errors/QueryCompilationErrors.scala|  6 --
 .../test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala | 13 -
 .../scala/org/apache/spark/sql/JsonFunctionsSuite.scala | 13 -
 5 files changed, 17 insertions(+), 22 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json 
b/core/src/main/resources/error/error-classes.json
index 89728777201..cddb0848765 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -2215,11 +2215,6 @@
   "Cannot read table property '' as it's corrupted.."
 ]
   },
-  "_LEGACY_ERROR_TEMP_1093" : {
-"message" : [
-  "Schema should be specified in DDL format as a string literal or output 
of the schema_of_json/schema_of_csv functions instead of ."
-]
-  },
   "_LEGACY_ERROR_TEMP_1094" : {
 "message" : [
   "Schema should be struct type but got ."
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala
index e9084442b22..fbe3d5eb458 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala
@@ -39,7 +39,7 @@ object ExprUtils extends QueryErrorsBase {
 
   }
 } else {
-  throw QueryCompilationErrors.schemaNotFoldableError(exp)
+  throw QueryCompilationErrors.unexpectedSchemaTypeError(exp)
 }
   }
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index ce99bf4aa47..e5b1c3c100d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -1009,12 +1009,6 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase {
   messageParameters = Map("inputSchema" -> toSQLExpr(exp)))
   }
 
-  def schemaNotFoldableError(exp: Expression): Throwable = {
-new AnalysisException(
-  errorClass = "_LEGACY_ERROR_TEMP_1093",
-  messageParameters = Map("expr" -> exp.sql))
-  }
-
   def schemaIsNotStructTypeError(dataType: DataType): Throwable = {
 new AnalysisException(
   errorClass = "_LEGACY_ERROR_TEMP_1094",
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
index 940eaaed6ac..ab4c148da04 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
@@ -357,11 +357,14 @@ class CsvFunctionsSuite extends QueryTest with 
SharedSparkSession {
   Seq("""1,"a).toDS().select(from_csv($"value", schema, options)),
   Row(Row(1, "a")))
 
-val errMsg = intercept[AnalysisException] {
-  Seq(("1", "i int")).toDF("csv", "schema")
-.select(from_csv($"csv", $"schema", options)).collect()
-}.getMessage
-assert(errMsg.contains("Schema should be specified in DDL format as a 
string literal"))
+checkError(
+  exception = intercept[AnalysisException] {
+Seq(("1", "i int")).toDF("csv", "schema")
+  .select(from_csv($"csv", $"schema", options)).collect()
+  },
+  errorClass = "INVALID_SCHEMA.NON_STRING_LITERAL",
+  parameters = Map("inputSchema" -> "\"schema\"")
+)
 
 checkError(
   exception = intercept[Ana

[spark] branch master updated (99de9879b6a -> 4043da8f230)

2022-11-29 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 99de9879b6a [SPARK-41316][SQL] Enable tail-recursion wherever possible
 add 4043da8f230 [SPARK-41317][CONNECT][PYTHON] Add basic support for 
DataFrameWriter

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/connect/dataframe.py|   6 +
 python/pyspark/sql/connect/plan.py |  86 ++
 python/pyspark/sql/connect/readwriter.py   | 921 -
 python/pyspark/sql/connect/session.py  |  37 +-
 python/pyspark/sql/readwriter.py   |  39 +-
 .../sql/tests/connect/test_connect_basic.py|  26 +
 .../sql/tests/connect/test_connect_plan_only.py|  35 +
 7 files changed, 1122 insertions(+), 28 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-41316][SQL] Enable tail-recursion wherever possible

2022-11-29 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 99de9879b6a [SPARK-41316][SQL] Enable tail-recursion wherever possible
99de9879b6a is described below

commit 99de9879b6a3942d88c0644dc5fc8c75e682d925
Author: yangjie01 
AuthorDate: Tue Nov 29 19:20:05 2022 +0800

[SPARK-41316][SQL] Enable tail-recursion wherever possible

### What changes were proposed in this pull request?
Similar to SPARK-37783, this pr adds `scala.annotation.tailrec` inspected 
by IDE (IntelliJ),  these are new cases after Spark 3.3.

### Why are the changes needed?
To improve performance.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GItHub Actions

Closes #38835 from LuciferYang/SPARK-41316.

Authored-by: yangjie01 
Signed-off-by: Ruifeng Zheng 
---
 .../org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala | 4 
 .../scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 1 +
 .../main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala  | 1 +
 .../spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala   | 1 +
 4 files changed, 7 insertions(+)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
index 1d5129ff7f0..df04a248a27 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
+import scala.annotation.tailrec
+
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.BloomFilterAggregate
 import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
@@ -116,6 +118,7 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with 
PredicateHelper with J
* do not add a subquery that might have an expensive computation
*/
   private def isSelectiveFilterOverScan(plan: LogicalPlan): Boolean = {
+@tailrec
 def isSelective(
 p: LogicalPlan,
 predicateReference: AttributeSet,
@@ -225,6 +228,7 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with 
PredicateHelper with J
   }
 
   // This checks if there is already a DPP filter, as this rule is called just 
after DPP.
+  @tailrec
   def hasDynamicPruningSubquery(
   left: LogicalPlan,
   right: LogicalPlan,
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index ecb93f6b239..1f0fb667753 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1083,6 +1083,7 @@ object CollapseProject extends Rule[LogicalPlan] with 
AliasHelper {
   }
 
   private object ExtractOnlyRef {
+@scala.annotation.tailrec
 def unapply(expr: Expression): Option[Attribute] = expr match {
   case a: Alias => unapply(a.child)
   case e: ExtractValue => unapply(e.children.head)
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index bbda9eb76b1..b029a3b0ce9 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -157,6 +157,7 @@ object ScanOperation extends OperationHelper {
 }
 
 object NodeWithOnlyDeterministicProjectAndFilter {
+  @scala.annotation.tailrec
   def unapply(plan: LogicalPlan): Option[LogicalPlan] = plan match {
 case Project(projectList, child) if projectList.forall(_.deterministic) => 
unapply(child)
 case Filter(cond, child) if cond.deterministic => unapply(child)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
index 87b11da5d5c..49a6c7232ec 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
@@ -454,6 +454,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] 
with PredicateHelper {
 case other => (other, false)
   }
 
+  @scala.annotation.tailrec
   private def pushDownOffset(
   

[spark] branch master updated: [SPARK-41256][CONNECT][FOLLOWUP] Fix compile error

2022-11-29 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 86e1df4e44c [SPARK-41256][CONNECT][FOLLOWUP] Fix compile error
86e1df4e44c is described below

commit 86e1df4e44cb5b2ce3a8215eab6c89a7f46811f9
Author: Ruifeng Zheng 
AuthorDate: Tue Nov 29 18:47:06 2022 +0900

[SPARK-41256][CONNECT][FOLLOWUP] Fix compile error

### What changes were proposed in this pull request?


https://github.com/apache/spark/commit/4c35c5bb5e545acb2f46a80218f68e69c868b388 
made a breaking change in proto message, and cause 
https://github.com/apache/spark/commit/dcd9cae4a3cb715aa91e9679b0adc9aa43035b2f 
failed after merged

### Why are the changes needed?
to fix compile error:

```
[error] 
/__w/spark/spark/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala:441:67:
 value setI32 is not a member of 
org.apache.spark.connect.proto.Expression.Literal.Builder
[error]   
.setLiteral(proto.Expression.Literal.newBuilder.setI32(32
[error]   ^
[error] 
/__w/spark/spark/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala:446:67:
 value setI32 is not a member of 
org.apache.spark.connect.proto.Expression.Literal.Builder
[error]   
.setLiteral(proto.Expression.Literal.newBuilder.setI32(32)
[error]   ^
[error] 
/__w/spark/spark/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala:466:69:
 value setI32 is not a member of 
org.apache.spark.connect.proto.Expression.Literal.Builder
[error] 
.setLiteral(proto.Expression.Literal.newBuilder.setI32(32)
[error] 
^
[error] three errors found
```

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
local test

Closes #38837 from zhengruifeng/connect_hot_fix.

Authored-by: Ruifeng Zheng 
Signed-off-by: Hyukjin Kwon 
---
 .../apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git 
a/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala
 
b/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala
index 007452cfa3b..8fbf2be3730 100644
--- 
a/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala
+++ 
b/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala
@@ -438,12 +438,12 @@ class SparkConnectPlannerSuite extends SparkFunSuite with 
SparkConnectPlanTest {
 .newBuilder()
 .addName("test")
 .setExpr(proto.Expression.newBuilder
-  .setLiteral(proto.Expression.Literal.newBuilder.setI32(32
+  
.setLiteral(proto.Expression.Literal.newBuilder.setInteger(32
   .addNameExprList(proto.Expression.Alias
 .newBuilder()
 .addName("test")
 .setExpr(proto.Expression.newBuilder
-  
.setLiteral(proto.Expression.Literal.newBuilder.setI32(32)
+  
.setLiteral(proto.Expression.Literal.newBuilder.setInteger(32)
   .build())
 }
   }
@@ -463,7 +463,7 @@ class SparkConnectPlannerSuite extends SparkFunSuite with 
SparkConnectPlanTest {
   .addName("part1")
   .addName("part2")
   .setExpr(proto.Expression.newBuilder
-
.setLiteral(proto.Expression.Literal.newBuilder.setI32(32)
+
.setLiteral(proto.Expression.Literal.newBuilder.setInteger(32)
   .build())
 }
 assert(e.getMessage.contains("part1, part2"))


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-41310][CONNECT][PYTHON] Implement DataFrame.toDF

2022-11-29 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 4bfe7a77a62 [SPARK-41310][CONNECT][PYTHON] Implement DataFrame.toDF
4bfe7a77a62 is described below

commit 4bfe7a77a62494515aa5406e9966349588f7dd3b
Author: Rui Wang 
AuthorDate: Tue Nov 29 16:58:27 2022 +0800

[SPARK-41310][CONNECT][PYTHON] Implement DataFrame.toDF

### What changes were proposed in this pull request?

Implement DataFrame.toDF by reusing existing 
`RenameColumnsBySameLengthNames` proto.

### Why are the changes needed?

API coverage.

### Does this PR introduce _any_ user-facing change?

NO

### How was this patch tested?

UT

Closes #38829 from amaliujia/toDF.

Authored-by: Rui Wang 
Signed-off-by: Ruifeng Zheng 
---
 python/pyspark/sql/connect/dataframe.py| 17 +
 python/pyspark/sql/connect/plan.py | 29 ++
 .../sql/tests/connect/test_connect_basic.py|  7 ++
 3 files changed, 53 insertions(+)

diff --git a/python/pyspark/sql/connect/dataframe.py 
b/python/pyspark/sql/connect/dataframe.py
index 62b989d35e4..dd67f5a8726 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -1244,6 +1244,23 @@ class DataFrame(object):
 query = self._plan.to_proto(self._session.client)
 return self._session.client._analyze(query).input_files
 
+def toDF(self, *cols: str) -> "DataFrame":
+"""Returns a new :class:`DataFrame` that with new specified column 
names
+
+Parameters
+--
+*cols : tuple
+a tuple of string new column name or :class:`Column`. The length 
of the
+list needs to be the same as the number of columns in the initial
+:class:`DataFrame`
+
+Returns
+---
+:class:`DataFrame`
+DataFrame with new column names.
+"""
+return DataFrame.withPlan(plan.RenameColumns(self._plan, list(cols)), 
self._session)
+
 def transform(self, func: Callable[..., "DataFrame"], *args: Any, 
**kwargs: Any) -> "DataFrame":
 """Returns a new :class:`DataFrame`. Concise syntax for chaining 
custom transformations.
 
diff --git a/python/pyspark/sql/connect/plan.py 
b/python/pyspark/sql/connect/plan.py
index 95b15fd0dc1..6e079655c00 100644
--- a/python/pyspark/sql/connect/plan.py
+++ b/python/pyspark/sql/connect/plan.py
@@ -1158,6 +1158,35 @@ class StatCrosstab(LogicalPlan):
 """
 
 
+class RenameColumns(LogicalPlan):
+def __init__(self, child: Optional["LogicalPlan"], cols: Sequence[str]) -> 
None:
+super().__init__(child)
+self._cols = cols
+
+def plan(self, session: "SparkConnectClient") -> proto.Relation:
+assert self._child is not None
+
+plan = proto.Relation()
+
plan.rename_columns_by_same_length_names.input.CopyFrom(self._child.plan(session))
+
plan.rename_columns_by_same_length_names.column_names.extend(self._cols)
+return plan
+
+def print(self, indent: int = 0) -> str:
+i = " " * indent
+return f"""{i}"""
+
+def _repr_html_(self) -> str:
+return f"""
+
+   
+  RenameColumns
+  cols: {self._cols} 
+  {self._child_repr_()}
+   
+
+"""
+
+
 class CreateView(LogicalPlan):
 def __init__(
 self, child: Optional["LogicalPlan"], name: str, is_global: bool, 
replace: bool
diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py 
b/python/pyspark/sql/tests/connect/test_connect_basic.py
index 5d8d381dc87..378d6a4fe81 100644
--- a/python/pyspark/sql/tests/connect/test_connect_basic.py
+++ b/python/pyspark/sql/tests/connect/test_connect_basic.py
@@ -250,6 +250,13 @@ class SparkConnectTests(SparkConnectSQLTestCase):
 self.connect.sql(query).schema.__repr__(),
 )
 
+def test_toDF(self):
+# SPARK-41310: test DataFrame.toDF()
+self.assertEqual(
+self.connect.read.table(self.tbl_name).toDF("col1", "col2").schema,
+self.spark.read.table(self.tbl_name).toDF("col1", "col2").schema,
+)
+
 def test_print_schema(self):
 # SPARK-41216: Test print schema
 tree_str = self.connect.sql("SELECT 1 AS X, 2 AS Y")._tree_string()


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (5b7f89cee8e -> dcd9cae4a3c)

2022-11-29 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 5b7f89cee8e [MINOR][PYTHON][DOCS] Fix types and docstring in 
DataFrame.toDF
 add dcd9cae4a3c [SPARK-41256][CONNECT] Implement DataFrame.withColumn(s)

No new revisions were added by this update.

Summary of changes:
 .../main/protobuf/spark/connect/relations.proto|  18 +++
 .../org/apache/spark/sql/connect/dsl/package.scala |  13 ++
 .../sql/connect/planner/SparkConnectPlanner.scala  |  20 +++
 .../connect/planner/SparkConnectPlannerSuite.scala |  46 +-
 .../connect/planner/SparkConnectProtoSuite.scala   |   6 +
 python/pyspark/sql/connect/dataframe.py|  57 
 python/pyspark/sql/connect/plan.py |  34 +
 python/pyspark/sql/connect/proto/relations_pb2.py  | 162 +++--
 python/pyspark/sql/connect/proto/relations_pb2.pyi |  56 +++
 .../sql/tests/connect/test_connect_basic.py|  26 +++-
 10 files changed, 362 insertions(+), 76 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org