[spark] branch branch-3.3 updated: [SPARK-40132][ML] Restore rawPredictionCol to MultilayerPerceptronClassifier.setParams

2022-08-17 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new e1c5f90c700 [SPARK-40132][ML] Restore rawPredictionCol to 
MultilayerPerceptronClassifier.setParams
e1c5f90c700 is described below

commit e1c5f90c700d844aa56c211e53eb75d0aa99b9ad
Author: Sean Owen 
AuthorDate: Thu Aug 18 00:23:52 2022 -0500

[SPARK-40132][ML] Restore rawPredictionCol to 
MultilayerPerceptronClassifier.setParams

### What changes were proposed in this pull request?

Restore rawPredictionCol to MultilayerPerceptronClassifier.setParams

### Why are the changes needed?

This param was inadvertently removed in the refactoring in 
https://github.com/apache/spark/commit/40cdb6d51c2befcfeac8fb5cf5faf178d1a5ee7b#r81473316
Without it, using this param in the constructor fails.

### Does this PR introduce _any_ user-facing change?

No, aside from the bug fix.

### How was this patch tested?

Existing tests.

Closes #37561 from srowen/SPARK-40132.

Authored-by: Sean Owen 
Signed-off-by: Sean Owen 
(cherry picked from commit 6768d9cc38a320f7e1c6781afcd170577c5c7d0f)
Signed-off-by: Sean Owen 
---
 python/pyspark/ml/classification.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/python/pyspark/ml/classification.py 
b/python/pyspark/ml/classification.py
index 40a2a87c5db..c09a510d76b 100644
--- a/python/pyspark/ml/classification.py
+++ b/python/pyspark/ml/classification.py
@@ -3230,6 +3230,7 @@ class MultilayerPerceptronClassifier(
 solver: str = "l-bfgs",
 initialWeights: Optional[Vector] = None,
 probabilityCol: str = "probability",
+rawPredictionCol: str = "rawPrediction",
 ) -> "MultilayerPerceptronClassifier":
 """
 setParams(self, \\*, featuresCol="features", labelCol="label", 
predictionCol="prediction", \
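
For context, a minimal sketch (PySpark; the layer sizes are illustrative) of the constructor call this change restores:

```python
from pyspark.sql import SparkSession
from pyspark.ml.classification import MultilayerPerceptronClassifier

spark = SparkSession.builder.getOrCreate()

# Before this fix, passing rawPredictionCol here failed because __init__
# forwards its keyword arguments to setParams, which had lost the param.
mlp = MultilayerPerceptronClassifier(
    layers=[4, 5, 3],                      # illustrative layer sizes
    rawPredictionCol="rawPrediction",
)
print(mlp.getRawPredictionCol())           # 'rawPrediction'
```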





[spark] branch master updated: [SPARK-40132][ML] Restore rawPredictionCol to MultilayerPerceptronClassifier.setParams

2022-08-17 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 6768d9cc38a [SPARK-40132][ML] Restore rawPredictionCol to 
MultilayerPerceptronClassifier.setParams
6768d9cc38a is described below

commit 6768d9cc38a320f7e1c6781afcd170577c5c7d0f
Author: Sean Owen 
AuthorDate: Thu Aug 18 00:23:52 2022 -0500

[SPARK-40132][ML] Restore rawPredictionCol to 
MultilayerPerceptronClassifier.setParams

### What changes were proposed in this pull request?

Restore rawPredictionCol to MultilayerPerceptronClassifier.setParams

### Why are the changes needed?

This param was inadvertently removed in the refactoring in 
https://github.com/apache/spark/commit/40cdb6d51c2befcfeac8fb5cf5faf178d1a5ee7b#r81473316
Without it, using this param in the constructor fails.

### Does this PR introduce _any_ user-facing change?

No, aside from the bug fix.

### How was this patch tested?

Existing tests.

Closes #37561 from srowen/SPARK-40132.

Authored-by: Sean Owen 
Signed-off-by: Sean Owen 
---
 python/pyspark/ml/classification.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/python/pyspark/ml/classification.py 
b/python/pyspark/ml/classification.py
index 40a2a87c5db..c09a510d76b 100644
--- a/python/pyspark/ml/classification.py
+++ b/python/pyspark/ml/classification.py
@@ -3230,6 +3230,7 @@ class MultilayerPerceptronClassifier(
 solver: str = "l-bfgs",
 initialWeights: Optional[Vector] = None,
 probabilityCol: str = "probability",
+rawPredictionCol: str = "rawPrediction",
 ) -> "MultilayerPerceptronClassifier":
 """
 setParams(self, \\*, featuresCol="features", labelCol="label", 
predictionCol="prediction", \





[spark] branch master updated: [SPARK-38946][PYTHON][PS] Generates a new dataframe instead of operating inplace in setitem

2022-08-17 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 532c5005f2f [SPARK-38946][PYTHON][PS] Generates a new dataframe 
instead of operating inplace in setitem
532c5005f2f is described below

commit 532c5005f2fd82d714d14a815f435ef48fecc205
Author: Yikun Jiang 
AuthorDate: Thu Aug 18 12:34:51 2022 +0900

[SPARK-38946][PYTHON][PS] Generates a new dataframe instead of operating 
inplace in setitem

### What changes were proposed in this pull request?

Generates a new dataframe instead of operating inplace in setitem

### Why are the changes needed?
Make CI pass with pandas 1.4.3.

Since pandas 1.4.0
(https://github.com/pandas-dev/pandas/commit/03dd698bc1e84c35aba8b51bdd45c472860b9ec3),
DataFrame.__setitem__ should always make a copy and never write into the
existing array.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
CI test with current pandas (1.3.x) and latest pandas 1.4.2, 1.4.3
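
As an illustrative sketch of the user-visible behavior (pandas API on Spark; the column values are made up), `__setitem__` now produces a new internal frame instead of writing into the one a previously extracted Series is anchored to:

```python
import pyspark.pandas as ps

psdf = ps.DataFrame({"a": [1, 2, 3]})
psser = psdf["a"]           # Series anchored to the original frame

psdf["a"] = psdf["a"] * 2   # __setitem__ generates a new frame internally

# Following pandas 1.4 semantics, the earlier Series is detached rather than
# overwritten in place.
print(psdf)   # column "a" is now [2, 4, 6]
print(psser)  # still [1, 2, 3]
```
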

Closes #36353 from Yikun/SPARK-38946.

Authored-by: Yikun Jiang 
Signed-off-by: Hyukjin Kwon 
---
 .../source/migration_guide/pyspark_3.3_to_3.4.rst  |  2 +
 python/pyspark/pandas/frame.py | 26 +
 python/pyspark/pandas/tests/test_dataframe.py  | 43 --
 3 files changed, 53 insertions(+), 18 deletions(-)

diff --git a/python/docs/source/migration_guide/pyspark_3.3_to_3.4.rst 
b/python/docs/source/migration_guide/pyspark_3.3_to_3.4.rst
index dbe7b818b2a..b3baa8345aa 100644
--- a/python/docs/source/migration_guide/pyspark_3.3_to_3.4.rst
+++ b/python/docs/source/migration_guide/pyspark_3.3_to_3.4.rst
@@ -37,3 +37,5 @@ Upgrading from PySpark 3.3 to 3.4
 * In Spark 3.4, the infer schema process of ``groupby.apply`` in Pandas on 
Spark, will first infer the pandas type to ensure the accuracy of the pandas 
``dtype`` as much as possible.
 
 * In Spark 3.4, the ``Series.concat`` sort parameter will be respected to 
follow pandas 1.4 behaviors.
+
+* In Spark 3.4, the ``DataFrame.__setitem__`` will make a copy and replace 
pre-existing arrays, which will NOT be over-written to follow pandas 1.4 
behaviors.
diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py
index b3ded9885fc..fb4c3368057 100644
--- a/python/pyspark/pandas/frame.py
+++ b/python/pyspark/pandas/frame.py
@@ -498,20 +498,30 @@ class DataFrame(Frame, Generic[T]):
 return cast(InternalFrame, self._internal_frame)  # type: 
ignore[has-type]
 
 def _update_internal_frame(
-self, internal: InternalFrame, requires_same_anchor: bool = True
+self,
+internal: InternalFrame,
+requires_same_anchor: bool = True,
+anchor_force_disconnect: bool = False,
 ) -> None:
 """
 Update InternalFrame with the given one.
 
-If the column_label is changed or the new InternalFrame is not the 
same `anchor`,
-disconnect the link to the Series and create a new one.
+If the column_label is changed or the new InternalFrame is not the 
same `anchor` or the
+`anchor_force_disconnect` flag is set to True, disconnect the original 
anchor and create
+a new one.
 
 If `requires_same_anchor` is `False`, checking whether or not the same 
anchor is ignored
 and force to update the InternalFrame, e.g., replacing the internal 
with the resolved_copy,
 updating the underlying Spark DataFrame which need to combine a 
different Spark DataFrame.
 
-:param internal: the new InternalFrame
-:param requires_same_anchor: whether checking the same anchor
+Parameters
+--
+internal : InternalFrame
+The new InternalFrame
+requires_same_anchor : bool
+Whether checking the same anchor
+anchor_force_disconnect : bool
+Force to disconnect the original anchor and create a new one
 """
 from pyspark.pandas.series import Series
 
@@ -527,7 +537,7 @@ class DataFrame(Frame, Generic[T]):
 renamed = old_label != new_label
 not_same_anchor = requires_same_anchor and not 
same_anchor(internal, psser)
 
-if renamed or not_same_anchor:
+if renamed or not_same_anchor or anchor_force_disconnect:
 psdf: DataFrame = 
DataFrame(self._internal.select_column(old_label))
 psser._update_anchor(psdf)
 psser = None
@@ -12903,7 +12913,9 @@ defaultdict(, {'col..., 'col...})]
 # Same Series.
 psdf = self._assign({key: value})
 
-self._update_internal_frame(psdf._internal)
+# Since Spark 3.4, df.__setitem__ generates a new 

[spark] branch branch-3.1 updated: [SPARK-40121][PYTHON][SQL] Initialize projection used for Python UDF

2022-08-17 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
 new 07cc6a8963d [SPARK-40121][PYTHON][SQL] Initialize projection used for 
Python UDF
07cc6a8963d is described below

commit 07cc6a8963d9bd26d5ec0738ca4fa4767cbfac63
Author: Hyukjin Kwon 
AuthorDate: Thu Aug 18 12:23:02 2022 +0900

[SPARK-40121][PYTHON][SQL] Initialize projection used for Python UDF

This PR proposes to initialize the projection so non-deterministic 
expressions can be evaluated with Python UDFs.

To make Python UDFs work with non-deterministic expressions.

Yes.

```python
from pyspark.sql.functions import udf, rand
spark.range(10).select(udf(lambda x: x, "double")(rand())).show()
```

**Before**

```
java.lang.NullPointerException
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown
 Source)
at 
org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$10(EvalPythonExec.scala:126)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at 
scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1161)
at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1176)
at 
scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1213)
```

**After**

```
+--+
|rand(-2507211707257730645)|
+--+
|0.7691724424045242|
|   0.09602244075319044|
|0.3006471278112862|
|0.4182649571961977|
|   0.29349096650900974|
|0.7987097908937618|
|0.5324802583101007|
|  0.72460930912789|
|0.1367749768412846|
|   0.17277322931919348|
+--+
```

Manually tested, and unittest was added.

Closes #37552 from HyukjinKwon/SPARK-40121.

Authored-by: Hyukjin Kwon 
Signed-off-by: Hyukjin Kwon 
(cherry picked from commit 336c9bc535895530cc3983b24e7507229fa9570d)
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/tests/test_udf.py  | 8 +++-
 .../org/apache/spark/sql/execution/python/EvalPythonExec.scala| 1 +
 2 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/tests/test_udf.py 
b/python/pyspark/sql/tests/test_udf.py
index 0d13361dcab..47d5efd441f 100644
--- a/python/pyspark/sql/tests/test_udf.py
+++ b/python/pyspark/sql/tests/test_udf.py
@@ -23,7 +23,7 @@ import unittest
 
 from pyspark import SparkContext
 from pyspark.sql import SparkSession, Column, Row
-from pyspark.sql.functions import udf
+from pyspark.sql.functions import udf, rand
 from pyspark.sql.udf import UserDefinedFunction
 from pyspark.sql.types import StringType, IntegerType, BooleanType, 
DoubleType, LongType, \
 ArrayType, StructType, StructField
@@ -685,6 +685,12 @@ class UDFTests(ReusedSQLTestCase):
 self.assertEqual(result.collect(),
  [Row(c1=Row(_1=1.0, _2=1.0), c2=Row(_1=1, _2=1), 
c3=1.0, c4=1)])
 
+def test_udf_with_rand(self):
+# SPARK-40121: rand() with Python UDF.
+self.assertEqual(
+len(self.spark.range(10).select(udf(lambda x: x, 
DoubleType())(rand())).collect()), 10
+)
+
 
 class UDFInitializationTests(unittest.TestCase):
 def tearDown(self):
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
index fca43e454bf..1447788a609 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
@@ -116,6 +116,7 @@ trait EvalPythonExec extends UnaryExecNode {
 }.toArray
   }.toArray
   val projection = MutableProjection.create(allInputs.toSeq, child.output)
+  projection.initialize(context.partitionId())
   val schema = StructType(dataTypes.zipWithIndex.map { case (dt, i) =>
 StructField(s"_$i", dt)
   }.toSeq)





[spark] branch branch-3.2 updated: [SPARK-40121][PYTHON][SQL] Initialize projection used for Python UDF

2022-08-17 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new fcec11ad932 [SPARK-40121][PYTHON][SQL] Initialize projection used for 
Python UDF
fcec11ad932 is described below

commit fcec11ad9329553f4bea024227bdc6468da85278
Author: Hyukjin Kwon 
AuthorDate: Thu Aug 18 12:23:02 2022 +0900

[SPARK-40121][PYTHON][SQL] Initialize projection used for Python UDF

This PR proposes to initialize the projection so non-deterministic 
expressions can be evaluated with Python UDFs.

To make Python UDFs work with non-deterministic expressions.

Yes.

```python
from pyspark.sql.functions import udf, rand
spark.range(10).select(udf(lambda x: x, "double")(rand())).show()
```

**Before**

```
java.lang.NullPointerException
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown
 Source)
at 
org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$10(EvalPythonExec.scala:126)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at 
scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1161)
at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1176)
at 
scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1213)
```

**After**

```
+--+
|rand(-2507211707257730645)|
+--+
|0.7691724424045242|
|   0.09602244075319044|
|0.3006471278112862|
|0.4182649571961977|
|   0.29349096650900974|
|0.7987097908937618|
|0.5324802583101007|
|  0.72460930912789|
|0.1367749768412846|
|   0.17277322931919348|
+--+
```

Manually tested, and unittest was added.

Closes #37552 from HyukjinKwon/SPARK-40121.

Authored-by: Hyukjin Kwon 
Signed-off-by: Hyukjin Kwon 
(cherry picked from commit 336c9bc535895530cc3983b24e7507229fa9570d)
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/tests/test_udf.py  | 8 +++-
 .../org/apache/spark/sql/execution/python/EvalPythonExec.scala| 1 +
 2 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/tests/test_udf.py 
b/python/pyspark/sql/tests/test_udf.py
index fc475f1121d..5e6738a2f8e 100644
--- a/python/pyspark/sql/tests/test_udf.py
+++ b/python/pyspark/sql/tests/test_udf.py
@@ -23,7 +23,7 @@ import unittest
 
 from pyspark import SparkContext
 from pyspark.sql import SparkSession, Column, Row
-from pyspark.sql.functions import udf
+from pyspark.sql.functions import udf, rand
 from pyspark.sql.udf import UserDefinedFunction
 from pyspark.sql.types import StringType, IntegerType, BooleanType, 
DoubleType, LongType, \
 ArrayType, StructType, StructField
@@ -705,6 +705,12 @@ class UDFTests(ReusedSQLTestCase):
 finally:
 shutil.rmtree(path)
 
+def test_udf_with_rand(self):
+# SPARK-40121: rand() with Python UDF.
+self.assertEqual(
+len(self.spark.range(10).select(udf(lambda x: x, 
DoubleType())(rand())).collect()), 10
+)
+
 
 class UDFInitializationTests(unittest.TestCase):
 def tearDown(self):
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
index fca43e454bf..1447788a609 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
@@ -116,6 +116,7 @@ trait EvalPythonExec extends UnaryExecNode {
 }.toArray
   }.toArray
   val projection = MutableProjection.create(allInputs.toSeq, child.output)
+  projection.initialize(context.partitionId())
   val schema = StructType(dataTypes.zipWithIndex.map { case (dt, i) =>
 StructField(s"_$i", dt)
   }.toSeq)





[spark] branch master updated: [SPARK-40121][PYTHON][SQL] Initialize projection used for Python UDF

2022-08-17 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 336c9bc5358 [SPARK-40121][PYTHON][SQL] Initialize projection used for 
Python UDF
336c9bc5358 is described below

commit 336c9bc535895530cc3983b24e7507229fa9570d
Author: Hyukjin Kwon 
AuthorDate: Thu Aug 18 12:23:02 2022 +0900

[SPARK-40121][PYTHON][SQL] Initialize projection used for Python UDF

### What changes were proposed in this pull request?

This PR proposes to initialize the projection so non-deterministic 
expressions can be evaluated with Python UDFs.

### Why are the changes needed?

To make Python UDFs work with non-deterministic expressions.

### Does this PR introduce _any_ user-facing change?

Yes.

```python
from pyspark.sql.functions import udf, rand
spark.range(10).select(udf(lambda x: x, "double")(rand())).show()
```

**Before**

```
java.lang.NullPointerException
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown
 Source)
at 
org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$10(EvalPythonExec.scala:126)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at 
scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1161)
at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1176)
at 
scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1213)
```

**After**

```
+--+
|rand(-2507211707257730645)|
+--+
|0.7691724424045242|
|   0.09602244075319044|
|0.3006471278112862|
|0.4182649571961977|
|   0.29349096650900974|
|0.7987097908937618|
|0.5324802583101007|
|  0.72460930912789|
|0.1367749768412846|
|   0.17277322931919348|
+--+
```

### How was this patch tested?

Manually tested, and unittest was added.

Closes #37552 from HyukjinKwon/SPARK-40121.

Authored-by: Hyukjin Kwon 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/tests/test_udf.py  | 8 +++-
 .../org/apache/spark/sql/execution/python/EvalPythonExec.scala| 1 +
 2 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/tests/test_udf.py 
b/python/pyspark/sql/tests/test_udf.py
index ba9cfec4600..03bcbaf6ddf 100644
--- a/python/pyspark/sql/tests/test_udf.py
+++ b/python/pyspark/sql/tests/test_udf.py
@@ -24,7 +24,7 @@ import datetime
 
 from pyspark import SparkContext, SQLContext
 from pyspark.sql import SparkSession, Column, Row
-from pyspark.sql.functions import udf, assert_true, lit
+from pyspark.sql.functions import udf, assert_true, lit, rand
 from pyspark.sql.udf import UserDefinedFunction
 from pyspark.sql.types import (
 StringType,
@@ -797,6 +797,12 @@ class UDFTests(ReusedSQLTestCase):
 finally:
 shutil.rmtree(path)
 
+def test_udf_with_rand(self):
+# SPARK-40121: rand() with Python UDF.
+self.assertEqual(
+len(self.spark.range(10).select(udf(lambda x: x, 
DoubleType())(rand())).collect()), 10
+)
+
 
 class UDFInitializationTests(unittest.TestCase):
 def tearDown(self):
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
index c567a70e1d3..f117a408566 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
@@ -116,6 +116,7 @@ trait EvalPythonExec extends UnaryExecNode {
 }.toArray
   }.toArray
   val projection = MutableProjection.create(allInputs.toSeq, child.output)
+  projection.initialize(context.partitionId())
   val schema = StructType(dataTypes.zipWithIndex.map { case (dt, i) =>
 StructField(s"_$i", dt)
   }.toSeq)





[spark] branch branch-3.3 updated: [SPARK-40121][PYTHON][SQL] Initialize projection used for Python UDF

2022-08-17 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new 1a01a492c05 [SPARK-40121][PYTHON][SQL] Initialize projection used for 
Python UDF
1a01a492c05 is described below

commit 1a01a492c051bb861c480f224a3c310e133e4d01
Author: Hyukjin Kwon 
AuthorDate: Thu Aug 18 12:23:02 2022 +0900

[SPARK-40121][PYTHON][SQL] Initialize projection used for Python UDF

### What changes were proposed in this pull request?

This PR proposes to initialize the projection so non-deterministic 
expressions can be evaluated with Python UDFs.

### Why are the changes needed?

To make Python UDFs work with non-deterministic expressions.

### Does this PR introduce _any_ user-facing change?

Yes.

```python
from pyspark.sql.functions import udf, rand
spark.range(10).select(udf(lambda x: x, "double")(rand())).show()
```

**Before**

```
java.lang.NullPointerException
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown
 Source)
at 
org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$10(EvalPythonExec.scala:126)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at 
scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1161)
at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1176)
at 
scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1213)
```

**After**

```
+--+
|rand(-2507211707257730645)|
+--+
|0.7691724424045242|
|   0.09602244075319044|
|0.3006471278112862|
|0.4182649571961977|
|   0.29349096650900974|
|0.7987097908937618|
|0.5324802583101007|
|  0.72460930912789|
|0.1367749768412846|
|   0.17277322931919348|
+--+
```

### How was this patch tested?

Manually tested, and unittest was added.

Closes #37552 from HyukjinKwon/SPARK-40121.

Authored-by: Hyukjin Kwon 
Signed-off-by: Hyukjin Kwon 
(cherry picked from commit 336c9bc535895530cc3983b24e7507229fa9570d)
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/tests/test_udf.py  | 8 +++-
 .../org/apache/spark/sql/execution/python/EvalPythonExec.scala| 1 +
 2 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/tests/test_udf.py 
b/python/pyspark/sql/tests/test_udf.py
index 40deac992c4..34ac08cb818 100644
--- a/python/pyspark/sql/tests/test_udf.py
+++ b/python/pyspark/sql/tests/test_udf.py
@@ -24,7 +24,7 @@ import datetime
 
 from pyspark import SparkContext, SQLContext
 from pyspark.sql import SparkSession, Column, Row
-from pyspark.sql.functions import udf, assert_true, lit
+from pyspark.sql.functions import udf, assert_true, lit, rand
 from pyspark.sql.udf import UserDefinedFunction
 from pyspark.sql.types import (
 StringType,
@@ -798,6 +798,12 @@ class UDFTests(ReusedSQLTestCase):
 finally:
 shutil.rmtree(path)
 
+def test_udf_with_rand(self):
+# SPARK-40121: rand() with Python UDF.
+self.assertEqual(
+len(self.spark.range(10).select(udf(lambda x: x, 
DoubleType())(rand())).collect()), 10
+)
+
 
 class UDFInitializationTests(unittest.TestCase):
 def tearDown(self):
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
index c567a70e1d3..f117a408566 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
@@ -116,6 +116,7 @@ trait EvalPythonExec extends UnaryExecNode {
 }.toArray
   }.toArray
   val projection = MutableProjection.create(allInputs.toSeq, child.output)
+  projection.initialize(context.partitionId())
   val schema = StructType(dataTypes.zipWithIndex.map { case (dt, i) =>
 StructField(s"_$i", dt)
   }.toSeq)





[spark] branch master updated: [SPARK-40128][SQL] Make the VectorizedColumnReader recognize DELTA_LENGTH_BYTE_ARRAY as a standalone column encoding

2022-08-17 Thread sunchao
This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 01f9d270ea1 [SPARK-40128][SQL] Make the VectorizedColumnReader 
recognize DELTA_LENGTH_BYTE_ARRAY as a standalone column encoding
01f9d270ea1 is described below

commit 01f9d270ea14d4a9b3a5f326fc3f721ddd23e3f4
Author: Dennis Huo 
AuthorDate: Wed Aug 17 16:21:43 2022 -0700

[SPARK-40128][SQL] Make the VectorizedColumnReader recognize 
DELTA_LENGTH_BYTE_ARRAY as a standalone column encoding

### What changes were proposed in this pull request?
Add DELTA_LENGTH_BYTE_ARRAY as a recognized encoding in 
VectorizedColumnReader so that
vectorized reads succeed when there are columns using 
DELTA_LENGTH_BYTE_ARRAY as a standalone
encoding.

### Why are the changes needed?

Spark currently throws an exception for DELTA_LENGTH_BYTE_ARRAY columns when
vectorized reads are enabled, for example when reading
`delta_length_byte_array.parquet` from https://github.com/apache/parquet-testing:

java.lang.UnsupportedOperationException: Unsupported encoding: 
DELTA_LENGTH_BYTE_ARRAY

### Does this PR introduce _any_ user-facing change?
Yes - previously this threw an UnsupportedOperationException. Now the encoding
is read the same as when vectorized reads are disabled.

### How was this patch tested?
Added test case to ParquetIOSuite; made sure it fails without the fix to 
VectorizedColumnReader and passes after.

Closes #37557 from sfc-gh-dhuo/support-parquet-delta-length-byte-array.

Authored-by: Dennis Huo 
Signed-off-by: Chao Sun 
---
 .../datasources/parquet/VectorizedColumnReader.java  |   2 ++
 .../resources/test-data/delta_length_byte_array.parquet  | Bin 0 -> 3072 bytes
 .../execution/datasources/parquet/ParquetIOSuite.scala   |  10 ++
 3 files changed, 12 insertions(+)

diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index c2e85da3884..64178fdd72d 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -329,6 +329,8 @@ public class VectorizedColumnReader {
 return new VectorizedPlainValuesReader();
   case DELTA_BYTE_ARRAY:
 return new VectorizedDeltaByteArrayReader();
+  case DELTA_LENGTH_BYTE_ARRAY:
+return new VectorizedDeltaLengthByteArrayReader();
   case DELTA_BINARY_PACKED:
 return new VectorizedDeltaBinaryPackedReader();
   case RLE:
diff --git 
a/sql/core/src/test/resources/test-data/delta_length_byte_array.parquet 
b/sql/core/src/test/resources/test-data/delta_length_byte_array.parquet
new file mode 100644
index 000..ead505a1a1f
Binary files /dev/null and 
b/sql/core/src/test/resources/test-data/delta_length_byte_array.parquet differ
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 5a8f4563756..0458e5a1a14 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -1307,6 +1307,16 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSparkSession
 }
   }
 
+  test("SPARK-40128 read DELTA_LENGTH_BYTE_ARRAY encoded strings") {
+withAllParquetReaders {
+  checkAnswer(
+// "fruit" column in this file is encoded using 
DELTA_LENGTH_BYTE_ARRAY.
+// The file comes from https://github.com/apache/parquet-testing
+readResourceParquetFile("test-data/delta_length_byte_array.parquet"),
+(0 to 999).map(i => Row("apple_banana_mango" + Integer.toString(i * 
i
+}
+  }
+
   test("SPARK-12589 copy() on rows returned from reader works for strings") {
 withTempPath { dir =>
   val data = (1, "abc") ::(2, "helloabcde") :: Nil
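
A hedged sketch of how the new code path is exercised from Python (the local file path is hypothetical; `spark.sql.parquet.enableVectorizedReader` is Spark's standard vectorized-reader switch):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# With the vectorized reader on (the default), DELTA_LENGTH_BYTE_ARRAY columns
# previously failed with "Unsupported encoding: DELTA_LENGTH_BYTE_ARRAY".
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")

# Hypothetical local copy of the file from apache/parquet-testing.
df = spark.read.parquet("/tmp/delta_length_byte_array.parquet")
df.show(5, truncate=False)
```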





[spark] branch master updated: [SPARK-40110][SQL][TESTS] Add JDBCWithAQESuite

2022-08-17 Thread sunchao
This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 44f30a04dad [SPARK-40110][SQL][TESTS] Add JDBCWithAQESuite
44f30a04dad is described below

commit 44f30a04dad2baa471b505f95c6a29992ee7ca72
Author: Kazuyuki Tanimura 
AuthorDate: Wed Aug 17 15:32:46 2022 -0700

[SPARK-40110][SQL][TESTS] Add JDBCWithAQESuite

### What changes were proposed in this pull request?
This PR proposes to add `JDBCWithAQESuite` i.e. test cases of `JDBCSuite` 
with AQE (Adaptive Query Execution) enabled.

### Why are the changes needed?
Currently `JDBCSuite` assumes that AQE is always turned off. We should also
test with AQE turned on.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added the AQE version of the tests along with the non-AQE version.
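
For reference, the two suite variants amount to running the same assertions with the standard AQE flag off and on; a minimal sketch of what changes at the plan root:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

def plan():
    # Recreate the DataFrame so the current AQE setting is picked up at planning time.
    spark.range(100).repartition(10).groupBy("id").count().explain()

spark.conf.set("spark.sql.adaptive.enabled", "false")
plan()  # physical plan root is a WholeStageCodegen subtree

spark.conf.set("spark.sql.adaptive.enabled", "true")
plan()  # physical plan root is AdaptiveSparkPlan
```
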

Closes #37544 from kazuyukitanimura/SPARK-40110.

Authored-by: Kazuyuki Tanimura 
Signed-off-by: Chao Sun 
---
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala  | 32 --
 1 file changed, 23 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index b87fee6cec2..8eda0c288a3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.logical.ShowCreateTable
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, 
DateTimeTestUtils}
 import org.apache.spark.sql.execution.{DataSourceScanExec, ExtendedMode}
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, 
AdaptiveSparkPlanHelper, DisableAdaptiveExecutionSuite, 
EnableAdaptiveExecutionSuite}
 import org.apache.spark.sql.execution.command.{ExplainCommand, 
ShowCreateTableCommand}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, 
JDBCPartition, JDBCRelation, JdbcUtils}
@@ -44,7 +45,8 @@ import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
-class JDBCSuite extends QueryTest with SharedSparkSession {
+class JDBCSuite extends QueryTest with SharedSparkSession
+  with AdaptiveSparkPlanHelper with DisableAdaptiveExecutionSuite {
   import testImplicits._
 
   val url = "jdbc:h2:mem:testdb0"
@@ -298,10 +300,15 @@ class JDBCSuite extends QueryTest with SharedSparkSession 
{
 val parentPlan = df.queryExecution.executedPlan
 // Check if SparkPlan Filter is removed in a physical plan and
 // the plan only has PhysicalRDD to scan JDBCRelation.
-
assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec])
-val node = 
parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec]
-
assert(node.child.isInstanceOf[org.apache.spark.sql.execution.DataSourceScanExec])
-
assert(node.child.asInstanceOf[DataSourceScanExec].nodeName.contains("JDBCRelation"))
+val child = if (df.sqlContext.conf.adaptiveExecutionEnabled) {
+  assert(parentPlan.isInstanceOf[AdaptiveSparkPlanExec])
+  parentPlan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
+} else {
+  
assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec])
+  
parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec].child
+}
+
assert(child.isInstanceOf[org.apache.spark.sql.execution.DataSourceScanExec])
+
assert(child.asInstanceOf[DataSourceScanExec].nodeName.contains("JDBCRelation"))
 df
   }
 
@@ -309,9 +316,14 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
 val parentPlan = df.queryExecution.executedPlan
 // Check if SparkPlan Filter is not removed in a physical plan because 
JDBCRDD
 // cannot compile given predicates.
-
assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec])
-val node = 
parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec]
-assert(node.child.isInstanceOf[org.apache.spark.sql.execution.FilterExec])
+val child = if (df.sqlContext.conf.adaptiveExecutionEnabled) {
+  assert(parentPlan.isInstanceOf[AdaptiveSparkPlanExec])
+  parentPlan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
+} else {
+  
assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec])
+  
parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec].child
+}
+assert(child.isInstanceOf[org.apache.spark.sql.execution.FilterExec])
 df
   }
 
@@ -1767,7 

[spark] branch master updated: [SPARK-40109][SQL] New SQL function: get()

2022-08-17 Thread gengliang
This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new b6cf3e4598f [SPARK-40109][SQL] New SQL function: get()
b6cf3e4598f is described below

commit b6cf3e4598fb6ae9f9ed28c7d5a0d4152453a669
Author: Gengliang Wang 
AuthorDate: Wed Aug 17 15:03:17 2022 -0700

[SPARK-40109][SQL] New SQL function: get()

### What changes were proposed in this pull request?

Introduce a new SQL function `get()`: returns the element of an array at the
given (0-based) index. If the index points outside of the array boundaries,
this function returns NULL.
Examples:
```
> SELECT _FUNC_(array(1, 2, 3), 2);
  2
> SELECT _FUNC_(array(1, 2, 3), 3);
  NULL
> SELECT _FUNC_(array(1, 2, 3), -1);
  NULL
```
### Why are the changes needed?

Currently, when accessing an array element with an invalid index under ANSI SQL
mode, the error looks like:
```
[INVALID_ARRAY_INDEX] The index -1 is out of bounds. The array has 3 
elements. Use `try_element_at` and increase the array index by 1(the starting 
array index is 1 for `try_element_at`) to tolerate accessing element at invalid 
index and return NULL instead. If necessary set "spark.sql.ansi.enabled" to 
"false" to bypass this error.
```
The provided workaround is complicated. I suggest introducing a new function
`get()` which always returns NULL for an invalid array index. This follows
https://docs.snowflake.com/en/sql-reference/functions/get.html.

Note: since Spark's map access already returns NULL, let's not support the map
type in the get() method for now.

### Does this PR introduce _any_ user-facing change?

Yes, a new SQL function `get()`: returns the element of an array at the given
(0-based) index. If the index points outside of the array boundaries, this
function returns NULL.

### How was this patch tested?

New UT
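
A short usage sketch (requires a Spark build that includes this function), contrasting `get()` with `element_at` under ANSI mode:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.ansi.enabled", "true")

# get() is 0-based and returns NULL for out-of-bounds or negative indexes.
spark.sql(
    "SELECT get(array(1, 2, 3), 0), get(array(1, 2, 3), 3), get(array(1, 2, 3), -1)"
).show()  # 1, NULL, NULL

# element_at() is 1-based and, under ANSI mode, raises an INVALID_ARRAY_INDEX
# style error for an invalid index instead of returning NULL.
# spark.sql("SELECT element_at(array(1, 2, 3), 4)").show()  # would fail
```
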

Closes #37541 from gengliangwang/addGetMethod.

Lead-authored-by: Gengliang Wang 
Co-authored-by: Gengliang Wang 
Signed-off-by: Gengliang Wang 
---
 core/src/main/resources/error/error-classes.json   |  2 +-
 .../sql/catalyst/analysis/FunctionRegistry.scala   |  1 +
 .../expressions/collectionOperations.scala | 36 +++
 .../sql-functions/sql-expression-schema.md |  1 +
 .../src/test/resources/sql-tests/inputs/array.sql  |  6 
 .../resources/sql-tests/results/ansi/array.sql.out | 42 +++---
 .../test/resources/sql-tests/results/array.sql.out | 32 +
 7 files changed, 114 insertions(+), 6 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json 
b/core/src/main/resources/error/error-classes.json
index c2c5f30564c..3f6c1ca0362 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -224,7 +224,7 @@
   },
   "INVALID_ARRAY_INDEX" : {
 "message" : [
-  "The index  is out of bounds. The array has  
elements. Use `try_element_at` and increase the array index by 1(the starting 
array index is 1 for `try_element_at`) to tolerate accessing element at invalid 
index and return NULL instead. If necessary set  to \"false\" to 
bypass this error."
+  "The index  is out of bounds. The array has  
elements. Use the SQL function `get()` to tolerate accessing element at invalid 
index and return NULL instead. If necessary set  to \"false\" to 
bypass this error."
 ]
   },
   "INVALID_ARRAY_INDEX_IN_ELEMENT_AT" : {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index b655c45bd5f..42f3ca041b8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -702,6 +702,7 @@ object FunctionRegistry {
 expression[TransformKeys]("transform_keys"),
 expression[MapZipWith]("map_zip_with"),
 expression[ZipWith]("zip_with"),
+expression[Get]("get"),
 
 CreateStruct.registryEntry,
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
index 3090916582e..40eade75578 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
@@ -2072,6 +2072,42 @@ case class ArrayPosition(left: Expression, right: 
Expression)
 copy(left = newLeft, right = newRight)
 }
 
+/**
+ * Returns the value of 

[spark] branch master updated (103768bcb8b -> eef7596d615)

2022-08-17 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 103768bcb8b [SPARK-40105][SQL] Improve repartition in 
ReplaceCTERefWithRepartition
 add eef7596d615 [SPARK-40114][R] Arrow 9.0.0 support with SparkR

No new revisions were added by this update.

Summary of changes:
 R/pkg/R/serialize.R | 2 +-
 appveyor.yml| 3 +--
 2 files changed, 2 insertions(+), 3 deletions(-)





[spark] branch master updated: [SPARK-40105][SQL] Improve repartition in ReplaceCTERefWithRepartition

2022-08-17 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 103768bcb8b [SPARK-40105][SQL] Improve repartition in 
ReplaceCTERefWithRepartition
103768bcb8b is described below

commit 103768bcb8bb98ae1b55d449f4f7edf215f3a72c
Author: ulysses-you 
AuthorDate: Wed Aug 17 17:46:19 2022 +0800

[SPARK-40105][SQL] Improve repartition in ReplaceCTERefWithRepartition

### What changes were proposed in this pull request?

- Skip adding a repartition if the top-level node of the CTE is a rebalance.
- Use RepartitionByExpression instead of Repartition so that AQE can coalesce
the shuffle partitions.

### Why are the changes needed?

If a CTE cannot be inlined, ReplaceCTERefWithRepartition adds a repartition to
force a shuffle so that the references can reuse the shuffle exchange. The
added repartition should be optimized by AQE for better performance.

If the user has already specified a rebalance, ReplaceCTERefWithRepartition
should skip adding the extra repartition.

### Does this PR introduce _any_ user-facing change?

No, it only improves performance.

### How was this patch tested?

Added a test.
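
A sketch of the kind of query this affects: a non-deterministic CTE referenced more than once cannot be inlined, so the optimizer forces a shuffle for it; with this change the forced shuffle is a coalescible `RepartitionByExpression`, and a user-specified `REBALANCE` hint is taken as-is:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.sql("""
    WITH v AS (
        SELECT /*+ REBALANCE */ id, rand() AS r   -- rand() makes v non-inlinable
        FROM range(10)
    )
    SELECT * FROM v
    EXCEPT
    SELECT * FROM v
""")
# The optimized plan keeps the user's rebalance; no extra repartition is added.
df.explain(True)
```
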

Closes #37537 from ulysses-you/cte.

Authored-by: ulysses-you 
Signed-off-by: Wenchen Fan 
---
 .../optimizer/ReplaceCTERefWithRepartition.scala   | 10 ++--
 .../org/apache/spark/sql/CTEInlineSuite.scala  | 30 +++---
 2 files changed, 35 insertions(+), 5 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceCTERefWithRepartition.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceCTERefWithRepartition.scala
index 0190fa2a2ab..c01372c71ab 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceCTERefWithRepartition.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceCTERefWithRepartition.scala
@@ -41,6 +41,12 @@ object ReplaceCTERefWithRepartition extends 
Rule[LogicalPlan] {
   replaceWithRepartition(plan, mutable.HashMap.empty[Long, LogicalPlan])
   }
 
+  private def canSkipExtraRepartition(p: LogicalPlan): Boolean = p match {
+case _: RepartitionOperation => true
+case _: RebalancePartitions => true
+case _ => false
+  }
+
   private def replaceWithRepartition(
   plan: LogicalPlan,
   cteMap: mutable.HashMap[Long, LogicalPlan]): LogicalPlan = plan match {
@@ -48,12 +54,12 @@ object ReplaceCTERefWithRepartition extends 
Rule[LogicalPlan] {
   cteDefs.foreach { cteDef =>
 val inlined = replaceWithRepartition(cteDef.child, cteMap)
 val withRepartition =
-  if (inlined.isInstanceOf[RepartitionOperation] || 
cteDef.underSubquery) {
+  if (canSkipExtraRepartition(inlined) || cteDef.underSubquery) {
 // If the CTE definition plan itself is a repartition operation or 
if it hosts a merged
 // scalar subquery, we do not need to add an extra repartition 
shuffle.
 inlined
   } else {
-Repartition(conf.numShufflePartitions, shuffle = true, inlined)
+RepartitionByExpression(Seq.empty, inlined, None)
   }
 cteMap.put(cteDef.id, withRepartition)
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
index ee000bce1fc..26d165b460a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql
 
 import org.apache.spark.sql.catalyst.expressions.{And, GreaterThan, LessThan, 
Literal, Or}
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, Project, 
RepartitionOperation, WithCTE}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, Project, 
RebalancePartitions, RepartitionByExpression, RepartitionOperation, WithCTE}
 import org.apache.spark.sql.execution.adaptive._
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 import org.apache.spark.sql.internal.SQLConf
@@ -42,8 +42,13 @@ abstract class CTEInlineSuiteBase
|select * from v except select * from v
  """.stripMargin)
   checkAnswer(df, Nil)
+
+  val r = df.queryExecution.optimizedPlan.find {
+case RepartitionByExpression(p, _, None) => p.isEmpty
+case _ => false
+  }
   assert(
-
df.queryExecution.optimizedPlan.exists(_.isInstanceOf[RepartitionOperation]),
+r.isDefined,
 "Non-deterministic With-CTE with multiple references should be not 
inlined.")
 }
   }
@@ -485,4 +490,23 @@ abstract class CTEInlineSuiteBase
 
 class CTEInlineSuiteAEOff extends CTEInlineSuiteBase 

[spark] branch master updated: [SPARK-40066][SQL][FOLLOW-UP] Check if ElementAt is resolved before getting its dataType

2022-08-17 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new fbc0edac585 [SPARK-40066][SQL][FOLLOW-UP] Check if ElementAt is 
resolved before getting its dataType
fbc0edac585 is described below

commit fbc0edac5859dae6b2c9ad012d3932f54196f2e6
Author: Hyukjin Kwon 
AuthorDate: Wed Aug 17 17:34:02 2022 +0900

[SPARK-40066][SQL][FOLLOW-UP] Check if ElementAt is resolved before getting 
its dataType

### What changes were proposed in this pull request?

This PR is a follow-up of https://github.com/apache/spark/pull/37503 that adds
a check that the `ElementAt` expression is resolved before getting its
dataType.

### Why are the changes needed?

To make the tests pass with ANSI enabled. Currently it fails 
(https://github.com/apache/spark/runs/7870131749?check_suite_focus=true) as 
below:

```
[info] - map_filter *** FAILED *** (243 milliseconds)
[info]  org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid 
call to dataType on unresolved object
[info]  at 
org.apache.spark.sql.catalyst.expressions.UnresolvedNamedLambdaVariable.dataType(higherOrderFunctions.scala:46)
[info]  at 
org.apache.spark.sql.catalyst.expressions.ElementAt.initQueryContext(collectionOperations.scala:2275)
[info]  at 
org.apache.spark.sql.catalyst.expressions.SupportQueryContext.$init$(Expression.scala:603)
[info]  at 
org.apache.spark.sql.catalyst.expressions.ElementAt.(collectionOperations.scala:2105)
[info]  at org.apache.spark.sql.functions$.element_at(functions.scala:3958)
[info]  at 
org.apache.spark.sql.DataFrameFunctionsSuite.$anonfun$new$452(DataFrameFunctionsSuite.scala:2476)
[info]  at 
org.apache.spark.sql.functions$.createLambda(functions.scala:4029)
[info]  at org.apache.spark.sql.functions$.map_filter(functions.scala:4256)
[info]  at 
org.apache.spark.sql.DataFrameFunctionsSuite.$anonfun$new$451(DataFrameFunctionsSuite.scala:2476)
[info]  at org.apache.spark.sql.QueryTest.checkAnswer(QueryTest.scala:133)
[info]  at 
org.apache.spark.sql.DataFrameFunctionsSuite.$anonfun$new$445(DataFrameFunctionsSuite.scala:2478)
[info]  at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]  at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]  at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]  at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]  at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]  at 
org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:190)
[info]  at 
org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:204)
[info]  at 
org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:188)
```

### Does this PR introduce _any_ user-facing change?

No, test-only.

### How was this patch tested?

Manually tested with ANSI mode enabled.
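
A rough PySpark reproduction of the failing pattern (ANSI mode on; the data and column names are made up). Inside the lambda, `v` is an unresolved lambda variable, so building `element_at` over it is exactly the case the new `left.resolved` check guards:

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.ansi.enabled", "true")

# A map column whose values are non-empty arrays.
df = spark.createDataFrame(
    [({"a": [1, 2, 3], "b": [5]},)], "m map<string, array<int>>"
)

# Previously, element_at called dataType on the unresolved lambda variable while
# building its query context and failed with UnresolvedException under ANSI mode.
out = df.select(F.map_filter("m", lambda k, v: F.element_at(v, 1) > F.lit(1)))
out.show(truncate=False)
```
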

Closes #37548 from HyukjinKwon/SPARK-40066.

Authored-by: Hyukjin Kwon 
Signed-off-by: Hyukjin Kwon 
---
 .../apache/spark/sql/catalyst/expressions/collectionOperations.scala| 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
index 50da0fb12ec..3090916582e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
@@ -2272,7 +2272,7 @@ case class ElementAt(
 newLeft: Expression, newRight: Expression): ElementAt = copy(left = 
newLeft, right = newRight)
 
   override def initQueryContext(): Option[SQLQueryContext] = {
-if (failOnError && left.dataType.isInstanceOf[ArrayType]) {
+if (failOnError && left.resolved && left.dataType.isInstanceOf[ArrayType]) 
{
   Some(origin.context)
 } else {
   None





[spark] branch branch-3.2 updated: [SPARK-39887][SQL][FOLLOW-UP] Do not exclude Union's first child attributes when traversing other children in RemoveRedundantAliases

2022-08-17 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new f96fc06ca4a [SPARK-39887][SQL][FOLLOW-UP] Do not exclude Union's first 
child attributes when traversing other children in RemoveRedundantAliases
f96fc06ca4a is described below

commit f96fc06ca4a5a1bbc154aca1c7a9b96e7fc42c7e
Author: Peter Toth 
AuthorDate: Wed Aug 17 14:57:35 2022 +0800

[SPARK-39887][SQL][FOLLOW-UP] Do not exclude Union's first child attributes 
when traversing other children in RemoveRedundantAliases

### What changes were proposed in this pull request?
Do not exclude `Union`'s first child attributes when traversing other 
children in `RemoveRedundantAliases`.

### Why are the changes needed?
We don't need to exclude those attributes that `Union` inherits from its 
first child. See discussion here: 
https://github.com/apache/spark/pull/37496#discussion_r944509115

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing UTs.
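
Background for why only the first child's attributes need to be preserved (plain-SQL sketch): `Union` takes its output attributes from its first child, so the other children's aliases never become output attributes of the union:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.sql("""
    SELECT 1 AS a, 2 AS b
    UNION ALL
    SELECT 3 AS x, 4 AS y
""")
df.printSchema()
# root
#  |-- a: integer (nullable = false)   <- names come from the first child
#  |-- b: integer (nullable = false)
```
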

Closes #37534 from 
peter-toth/SPARK-39887-keep-attributes-of-unions-first-child-follow-up.

Authored-by: Peter Toth 
Signed-off-by: Wenchen Fan 
(cherry picked from commit e732232dac420826af269d8cf5efacb52933f59a)
Signed-off-by: Wenchen Fan 
---
 .../scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala| 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 7993829f1f6..6797bbfc714 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -536,7 +536,7 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
 })
 Join(newLeft, newRight, joinType, newCondition, hint)
 
-  case _: Union =>
+  case u: Union =>
 var first = true
 plan.mapChildren { child =>
   if (first) {
@@ -547,7 +547,8 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
 // output attributes could return incorrect result.
 removeRedundantAliases(child, excluded ++ child.outputSet)
   } else {
-removeRedundantAliases(child, excluded)
+// We don't need to exclude those attributes that `Union` inherits 
from its first child.
+removeRedundantAliases(child, excluded -- 
u.children.head.outputSet)
   }
 }
 





[spark] branch branch-3.3 updated: [SPARK-39887][SQL][FOLLOW-UP] Do not exclude Union's first child attributes when traversing other children in RemoveRedundantAliases

2022-08-17 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new 9601be96a86 [SPARK-39887][SQL][FOLLOW-UP] Do not exclude Union's first 
child attributes when traversing other children in RemoveRedundantAliases
9601be96a86 is described below

commit 9601be96a86eced683e6aa2b772c726eeb231de8
Author: Peter Toth 
AuthorDate: Wed Aug 17 14:57:35 2022 +0800

[SPARK-39887][SQL][FOLLOW-UP] Do not exclude Union's first child attributes 
when traversing other children in RemoveRedundantAliases

### What changes were proposed in this pull request?
Do not exclude `Union`'s first child attributes when traversing other 
children in `RemoveRedundantAliases`.

### Why are the changes needed?
We don't need to exclude those attributes that `Union` inherits from its 
first child. See discussion here: 
https://github.com/apache/spark/pull/37496#discussion_r944509115

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing UTs.

Closes #37534 from 
peter-toth/SPARK-39887-keep-attributes-of-unions-first-child-follow-up.

Authored-by: Peter Toth 
Signed-off-by: Wenchen Fan 
(cherry picked from commit e732232dac420826af269d8cf5efacb52933f59a)
Signed-off-by: Wenchen Fan 
---
 .../scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala| 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 558a67ff5ca..4807824ee71 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -544,7 +544,7 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
 })
 Join(newLeft, newRight, joinType, newCondition, hint)
 
-  case _: Union =>
+  case u: Union =>
 var first = true
 plan.mapChildren { child =>
   if (first) {
@@ -555,7 +555,8 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
 // output attributes could return incorrect result.
 removeRedundantAliases(child, excluded ++ child.outputSet)
   } else {
-removeRedundantAliases(child, excluded)
+// We don't need to exclude those attributes that `Union` inherits 
from its first child.
+removeRedundantAliases(child, excluded -- 
u.children.head.outputSet)
   }
 }
 





[spark] branch master updated (6c25ce383d1 -> e732232dac4)

2022-08-17 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 6c25ce383d1 [SPARK-40116][R][INFRA] Pin Arrow version to 8.0.0 in 
AppVeyor
 add e732232dac4 [SPARK-39887][SQL][FOLLOW-UP] Do not exclude Union's first 
child attributes when traversing other children in RemoveRedundantAliases

No new revisions were added by this update.

Summary of changes:
 .../scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala| 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)





[spark] branch master updated: [SPARK-40116][R][INFRA] Pin Arrow version to 8.0.0 in AppVeyor

2022-08-17 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 6c25ce383d1 [SPARK-40116][R][INFRA] Pin Arrow version to 8.0.0 in 
AppVeyor
6c25ce383d1 is described below

commit 6c25ce383d1408884366eb6cf22f302e6b2d8864
Author: Hyukjin Kwon 
AuthorDate: Wed Aug 17 15:34:20 2022 +0900

[SPARK-40116][R][INFRA] Pin Arrow version to 8.0.0 in AppVeyor

### What changes were proposed in this pull request?

This PR proposes to remove Arrow from the AppVeyor setup for now to recover the
build.

### Why are the changes needed?

SparkR does not support Arrow 9.0.0 
([SPARK-40114](https://issues.apache.org/jira/browse/SPARK-40114)) so the tests 
fail (https://ci.appveyor.com/project/HyukjinKwon/spark/builds/44490387)

We should recover the tests first because it looks like it's going to take a while to add Arrow 9.0.0 support.

### Does this PR introduce _any_ user-facing change?

No, dev-only.

### How was this patch tested?

CI in this PR should test it out.

Closes #37546 from HyukjinKwon/SPARK-40116.

Lead-authored-by: Hyukjin Kwon 
Co-authored-by: Hyukjin Kwon 
Signed-off-by: Hyukjin Kwon 
---
 appveyor.yml | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/appveyor.yml b/appveyor.yml
index 1a2aef0d3b8..3ec79645697 100644
--- a/appveyor.yml
+++ b/appveyor.yml
@@ -42,7 +42,8 @@ install:
   # Install SBT and dependencies
   - ps: .\dev\appveyor-install-dependencies.ps1
  # Required package for R unit tests. xml2 is required to use jUnit reporter in testthat.
-  - cmd: Rscript -e "install.packages(c('knitr', 'rmarkdown', 'testthat', 'e1071', 'survival', 'arrow', 'xml2'), repos='https://cloud.r-project.org/')"
+  # TODO(SPARK-40114): Add 'arrow' back with supporting Arrow 9.0.0
+  - cmd: Rscript -e "install.packages(c('knitr', 'rmarkdown', 'testthat', 'e1071', 'survival', 'xml2'), repos='https://cloud.r-project.org/')"
  - cmd: Rscript -e "pkg_list <- as.data.frame(installed.packages()[,c(1, 3:4)]); pkg_list[is.na(pkg_list$Priority), 1:2, drop = FALSE]"
 
 build_script:


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.3 updated: [SPARK-40117][PYTHON][SQL] Convert condition to java in DataFrameWriterV2.overwrite

2022-08-17 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new 0db78424201 [SPARK-40117][PYTHON][SQL] Convert condition to java in 
DataFrameWriterV2.overwrite
0db78424201 is described below

commit 0db78424201cd7b2e2bcffb9de3c2a12a0c67b44
Author: Wenli Looi 
AuthorDate: Wed Aug 17 15:28:55 2022 +0900

[SPARK-40117][PYTHON][SQL] Convert condition to java in 
DataFrameWriterV2.overwrite

### What changes were proposed in this pull request?

Fix DataFrameWriterV2.overwrite() failing to convert the condition parameter to Java, which prevents the function from being called.

It is caused by the following commit that deleted the `_to_java_column` 
call instead of fixing it: 
https://github.com/apache/spark/commit/a1e459ed9f6777fb8d5a2d09fda666402f9230b9

### Why are the changes needed?

DataFrameWriterV2.overwrite() cannot be called.
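
For context, a minimal sketch of the call path this fixes. The catalog and table names below are hypothetical, and actually running the write requires a configured DataSourceV2 catalog that supports overwrite-by-filter:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([(1, "2022-08-17"), (2, "2022-08-18")], ["id", "ds"])

# overwrite() takes a Column condition. Before this fix the Column was handed
# to the JVM without the _to_java_column conversion, so the call failed.
df.writeTo("my_catalog.db.events").overwrite(col("ds") == "2022-08-17")
```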

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Manually checked whether the arguments are sent to the JVM.

Closes #37547 from looi/fix-overwrite.

Authored-by: Wenli Looi 
Signed-off-by: Hyukjin Kwon 
(cherry picked from commit 46379863ab0dd2ee8fcf1e31e76476ff18397f60)
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/readwriter.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 760e54831c2..c4c813e56b1 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -1465,6 +1465,7 @@ class DataFrameWriterV2:
  Overwrite rows matching the given filter condition with the contents of the data frame in
 the output table.
 """
+condition = _to_java_column(condition)
 self._jwriter.overwrite(condition)
 
 @since(3.1)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (74deefb7894 -> 46379863ab0)

2022-08-17 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 74deefb7894 [SPARK-40084][PYTHON] Upgrade Py4J to 0.10.9.7
 add 46379863ab0 [SPARK-40117][PYTHON][SQL] Convert condition to java in 
DataFrameWriterV2.overwrite

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/readwriter.py | 1 +
 1 file changed, 1 insertion(+)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.1 updated: [SPARK-40117][PYTHON][SQL] Convert condition to java in DataFrameWriterV2.overwrite

2022-08-17 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
 new 1bb9e332faa [SPARK-40117][PYTHON][SQL] Convert condition to java in 
DataFrameWriterV2.overwrite
1bb9e332faa is described below

commit 1bb9e332faadd0fe8650285874b7f320d60405e6
Author: Wenli Looi 
AuthorDate: Wed Aug 17 15:28:55 2022 +0900

[SPARK-40117][PYTHON][SQL] Convert condition to java in 
DataFrameWriterV2.overwrite

### What changes were proposed in this pull request?

Fix DataFrameWriterV2.overwrite() failing to convert the condition parameter to Java, which prevents the function from being called.

It is caused by the following commit that deleted the `_to_java_column` 
call instead of fixing it: 
https://github.com/apache/spark/commit/a1e459ed9f6777fb8d5a2d09fda666402f9230b9

### Why are the changes needed?

DataFrameWriterV2.overwrite() cannot be called.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Manually checked whether the arguments are sent to the JVM.

Closes #37547 from looi/fix-overwrite.

Authored-by: Wenli Looi 
Signed-off-by: Hyukjin Kwon 
(cherry picked from commit 46379863ab0dd2ee8fcf1e31e76476ff18397f60)
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/readwriter.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 9529cf0fdae..cfa2f3fffb0 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -1570,6 +1570,7 @@ class DataFrameWriterV2(object):
  Overwrite rows matching the given filter condition with the contents of the data frame in
 the output table.
 """
+condition = _to_java_column(condition)
 self._jwriter.overwrite(condition)
 
 @since(3.1)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.2 updated: [SPARK-40117][PYTHON][SQL] Convert condition to java in DataFrameWriterV2.overwrite

2022-08-17 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 6ec86e5d5d9 [SPARK-40117][PYTHON][SQL] Convert condition to java in 
DataFrameWriterV2.overwrite
6ec86e5d5d9 is described below

commit 6ec86e5d5d9df0b0d111decd0ea16e5a3e4cd3fe
Author: Wenli Looi 
AuthorDate: Wed Aug 17 15:28:55 2022 +0900

[SPARK-40117][PYTHON][SQL] Convert condition to java in 
DataFrameWriterV2.overwrite

### What changes were proposed in this pull request?

Fix DataFrameWriterV2.overwrite() failing to convert the condition parameter to Java, which prevents the function from being called.

It is caused by the following commit that deleted the `_to_java_column` 
call instead of fixing it: 
https://github.com/apache/spark/commit/a1e459ed9f6777fb8d5a2d09fda666402f9230b9

### Why are the changes needed?

DataFrameWriterV2.overwrite() cannot be called.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Manually checked whether the arguments are sent to the JVM.

Closes #37547 from looi/fix-overwrite.

Authored-by: Wenli Looi 
Signed-off-by: Hyukjin Kwon 
(cherry picked from commit 46379863ab0dd2ee8fcf1e31e76476ff18397f60)
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/readwriter.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 65753543cb5..892b2500097 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -1163,6 +1163,7 @@ class DataFrameWriterV2(object):
  Overwrite rows matching the given filter condition with the contents of the data frame in
 the output table.
 """
+condition = _to_java_column(condition)
 self._jwriter.overwrite(condition)
 
 @since(3.1)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org