[spark] branch branch-3.3 updated: [SPARK-40132][ML] Restore rawPredictionCol to MultilayerPerceptronClassifier.setParams
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new e1c5f90c700 [SPARK-40132][ML] Restore rawPredictionCol to MultilayerPerceptronClassifier.setParams

e1c5f90c700 is described below

commit e1c5f90c700d844aa56c211e53eb75d0aa99b9ad
Author: Sean Owen
AuthorDate: Thu Aug 18 00:23:52 2022 -0500

    [SPARK-40132][ML] Restore rawPredictionCol to MultilayerPerceptronClassifier.setParams

    ### What changes were proposed in this pull request?
    Restore rawPredictionCol to MultilayerPerceptronClassifier.setParams.

    ### Why are the changes needed?
    This param was inadvertently removed in the refactoring in
    https://github.com/apache/spark/commit/40cdb6d51c2befcfeac8fb5cf5faf178d1a5ee7b#r81473316 .
    Without it, using this param in the constructor fails.

    ### Does this PR introduce _any_ user-facing change?
    Not aside from the bug fix.

    ### How was this patch tested?
    Existing tests.

    Closes #37561 from srowen/SPARK-40132.

    Authored-by: Sean Owen
    Signed-off-by: Sean Owen
    (cherry picked from commit 6768d9cc38a320f7e1c6781afcd170577c5c7d0f)
    Signed-off-by: Sean Owen
---
 python/pyspark/ml/classification.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py
index 40a2a87c5db..c09a510d76b 100644
--- a/python/pyspark/ml/classification.py
+++ b/python/pyspark/ml/classification.py
@@ -3230,6 +3230,7 @@ class MultilayerPerceptronClassifier(
         solver: str = "l-bfgs",
         initialWeights: Optional[Vector] = None,
         probabilityCol: str = "probability",
+        rawPredictionCol: str = "rawPrediction",
     ) -> "MultilayerPerceptronClassifier":
         """
         setParams(self, \\*, featuresCol="features", labelCol="label", predictionCol="prediction", \
[spark] branch master updated: [SPARK-40132][ML] Restore rawPredictionCol to MultilayerPerceptronClassifier.setParams
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 6768d9cc38a [SPARK-40132][ML] Restore rawPredictionCol to MultilayerPerceptronClassifier.setParams

6768d9cc38a is described below

commit 6768d9cc38a320f7e1c6781afcd170577c5c7d0f
Author: Sean Owen
AuthorDate: Thu Aug 18 00:23:52 2022 -0500

    [SPARK-40132][ML] Restore rawPredictionCol to MultilayerPerceptronClassifier.setParams

    ### What changes were proposed in this pull request?
    Restore rawPredictionCol to MultilayerPerceptronClassifier.setParams.

    ### Why are the changes needed?
    This param was inadvertently removed in the refactoring in
    https://github.com/apache/spark/commit/40cdb6d51c2befcfeac8fb5cf5faf178d1a5ee7b#r81473316 .
    Without it, using this param in the constructor fails.

    ### Does this PR introduce _any_ user-facing change?
    Not aside from the bug fix.

    ### How was this patch tested?
    Existing tests.

    Closes #37561 from srowen/SPARK-40132.

    Authored-by: Sean Owen
    Signed-off-by: Sean Owen
---
 python/pyspark/ml/classification.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py
index 40a2a87c5db..c09a510d76b 100644
--- a/python/pyspark/ml/classification.py
+++ b/python/pyspark/ml/classification.py
@@ -3230,6 +3230,7 @@ class MultilayerPerceptronClassifier(
         solver: str = "l-bfgs",
         initialWeights: Optional[Vector] = None,
         probabilityCol: str = "probability",
+        rawPredictionCol: str = "rawPrediction",
     ) -> "MultilayerPerceptronClassifier":
         """
         setParams(self, \\*, featuresCol="features", labelCol="label", predictionCol="prediction", \
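For context, here is a minimal sketch of the call shape this fix re-enables: passing rawPredictionCol through the keyword-only constructor/setParams path. The tiny dataset, layer sizes, and column name below are illustrative assumptions, not part of the patch.

```python
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(0.0, Vectors.dense([0.0, 0.0])), (1.0, Vectors.dense([1.0, 1.0]))],
    ["label", "features"],
)

# Before this fix, passing rawPredictionCol here failed because the
# keyword had been dropped from setParams; with the fix it is accepted again.
mlp = MultilayerPerceptronClassifier(
    layers=[2, 2, 2], seed=42, rawPredictionCol="rawPred"
)
model = mlp.fit(df)
model.transform(df).select("prediction", "rawPred").show()
```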
[spark] branch master updated: [SPARK-38946][PYTHON][PS] Generates a new dataframe instead of operating inplace in setitem
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 532c5005f2f [SPARK-38946][PYTHON][PS] Generates a new dataframe instead of operating inplace in setitem

532c5005f2f is described below

commit 532c5005f2fd82d714d14a815f435ef48fecc205
Author: Yikun Jiang
AuthorDate: Thu Aug 18 12:34:51 2022 +0900

    [SPARK-38946][PYTHON][PS] Generates a new dataframe instead of operating inplace in setitem

    ### What changes were proposed in this pull request?
    Generates a new dataframe instead of operating inplace in setitem.

    ### Why are the changes needed?
    Make CI pass with pandas 1.4.3. Since pandas 1.4.0
    (https://github.com/pandas-dev/pandas/commit/03dd698bc1e84c35aba8b51bdd45c472860b9ec3),
    dataframe.setitem should always make a copy and never write into the existing array.

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    CI test with current pandas (1.3.x) and latest pandas 1.4.2, 1.4.3

    Closes #36353 from Yikun/SPARK-38946.

    Authored-by: Yikun Jiang
    Signed-off-by: Hyukjin Kwon
---
 .../source/migration_guide/pyspark_3.3_to_3.4.rst |  2 +
 python/pyspark/pandas/frame.py                    | 26 +
 python/pyspark/pandas/tests/test_dataframe.py     | 43 --
 3 files changed, 53 insertions(+), 18 deletions(-)

diff --git a/python/docs/source/migration_guide/pyspark_3.3_to_3.4.rst b/python/docs/source/migration_guide/pyspark_3.3_to_3.4.rst
index dbe7b818b2a..b3baa8345aa 100644
--- a/python/docs/source/migration_guide/pyspark_3.3_to_3.4.rst
+++ b/python/docs/source/migration_guide/pyspark_3.3_to_3.4.rst
@@ -37,3 +37,5 @@ Upgrading from PySpark 3.3 to 3.4
 * In Spark 3.4, the infer schema process of ``groupby.apply`` in Pandas on Spark, will first infer the pandas type to ensure the accuracy of the pandas ``dtype`` as much as possible.

 * In Spark 3.4, the ``Series.concat`` sort parameter will be respected to follow pandas 1.4 behaviors.
+
+* In Spark 3.4, the ``DataFrame.__setitem__`` will make a copy and replace pre-existing arrays, which will NOT be over-written to follow pandas 1.4 behaviors.

diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py
index b3ded9885fc..fb4c3368057 100644
--- a/python/pyspark/pandas/frame.py
+++ b/python/pyspark/pandas/frame.py
@@ -498,20 +498,30 @@ class DataFrame(Frame, Generic[T]):
         return cast(InternalFrame, self._internal_frame)  # type: ignore[has-type]

     def _update_internal_frame(
-        self, internal: InternalFrame, requires_same_anchor: bool = True
+        self,
+        internal: InternalFrame,
+        requires_same_anchor: bool = True,
+        anchor_force_disconnect: bool = False,
     ) -> None:
         """
         Update InternalFrame with the given one.

-        If the column_label is changed or the new InternalFrame is not the same `anchor`,
-        disconnect the link to the Series and create a new one.
+        If the column_label is changed or the new InternalFrame is not the same `anchor` or the
+        `anchor_force_disconnect` flag is set to True, disconnect the original anchor and create
+        a new one.

         If `requires_same_anchor` is `False`, checking whether or not the same anchor is ignored
         and force to update the InternalFrame, e.g., replacing the internal with the resolved_copy,
         updating the underlying Spark DataFrame which need to combine a different Spark DataFrame.

-        :param internal: the new InternalFrame
-        :param requires_same_anchor: whether checking the same anchor
+        Parameters
+        ----------
+        internal : InternalFrame
+            The new InternalFrame
+        requires_same_anchor : bool
+            Whether checking the same anchor
+        anchor_force_disconnect : bool
+            Force to disconnect the original anchor and create a new one
         """
         from pyspark.pandas.series import Series

@@ -527,7 +537,7 @@ class DataFrame(Frame, Generic[T]):
             renamed = old_label != new_label
             not_same_anchor = requires_same_anchor and not same_anchor(internal, psser)

-            if renamed or not_same_anchor:
+            if renamed or not_same_anchor or anchor_force_disconnect:
                 psdf: DataFrame = DataFrame(self._internal.select_column(old_label))
                 psser._update_anchor(psdf)
                 psser = None
@@ -12903,7 +12913,9 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
             # Same Series.
             psdf = self._assign({key: value})

-        self._update_internal_frame(psdf._internal)
+        # Since Spark 3.4, df.__setitem__ generates a new
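To illustrate the behavior change this commit describes, here is a hedged sketch of the intended pandas-1.4-style semantics: a Series extracted before the assignment is disconnected rather than overwritten in place. The column names and values are illustrative assumptions.

```python
import pyspark.pandas as ps

psdf = ps.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
psser = psdf["b"]  # anchored to psdf before the assignment

# __setitem__ now generates a new internal frame instead of writing into
# the existing one, so the previously extracted Series is disconnected.
psdf["b"] = psdf["b"] + 10

print(psdf)   # column "b" is expected to show 14, 15, 16
print(psser)  # the old Series is expected to still show 4, 5, 6
```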
[spark] branch branch-3.1 updated: [SPARK-40121][PYTHON][SQL] Initialize projection used for Python UDF
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 07cc6a8963d [SPARK-40121][PYTHON][SQL] Initialize projection used for Python UDF

07cc6a8963d is described below

commit 07cc6a8963d9bd26d5ec0738ca4fa4767cbfac63
Author: Hyukjin Kwon
AuthorDate: Thu Aug 18 12:23:02 2022 +0900

    [SPARK-40121][PYTHON][SQL] Initialize projection used for Python UDF

    ### What changes were proposed in this pull request?
    This PR proposes to initialize the projection so non-deterministic expressions can be evaluated with Python UDFs.

    ### Why are the changes needed?
    To make the Python UDF work with non-deterministic expressions.

    ### Does this PR introduce _any_ user-facing change?
    Yes.

    ```python
    from pyspark.sql.functions import udf, rand
    spark.range(10).select(udf(lambda x: x, "double")(rand())).show()
    ```

    **Before**

    ```
    java.lang.NullPointerException
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown Source)
        at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$10(EvalPythonExec.scala:126)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1161)
        at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1176)
        at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1213)
    ```

    **After**

    ```
    +--------------------------+
    |rand(-2507211707257730645)|
    +--------------------------+
    |        0.7691724424045242|
    |       0.09602244075319044|
    |        0.3006471278112862|
    |        0.4182649571961977|
    |       0.29349096650900974|
    |        0.7987097908937618|
    |        0.5324802583101007|
    |          0.72460930912789|
    |        0.1367749768412846|
    |       0.17277322931919348|
    +--------------------------+
    ```

    ### How was this patch tested?
    Manually tested, and unittest was added.

    Closes #37552 from HyukjinKwon/SPARK-40121.

    Authored-by: Hyukjin Kwon
    Signed-off-by: Hyukjin Kwon
    (cherry picked from commit 336c9bc535895530cc3983b24e7507229fa9570d)
    Signed-off-by: Hyukjin Kwon
---
 python/pyspark/sql/tests/test_udf.py | 8 +++-
 .../org/apache/spark/sql/execution/python/EvalPythonExec.scala | 1 +
 2 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/tests/test_udf.py b/python/pyspark/sql/tests/test_udf.py
index 0d13361dcab..47d5efd441f 100644
--- a/python/pyspark/sql/tests/test_udf.py
+++ b/python/pyspark/sql/tests/test_udf.py
@@ -23,7 +23,7 @@ import unittest

 from pyspark import SparkContext
 from pyspark.sql import SparkSession, Column, Row
-from pyspark.sql.functions import udf
+from pyspark.sql.functions import udf, rand
 from pyspark.sql.udf import UserDefinedFunction
 from pyspark.sql.types import StringType, IntegerType, BooleanType, DoubleType, LongType, \
     ArrayType, StructType, StructField
@@ -685,6 +685,12 @@ class UDFTests(ReusedSQLTestCase):
         self.assertEqual(result.collect(),
                          [Row(c1=Row(_1=1.0, _2=1.0), c2=Row(_1=1, _2=1), c3=1.0, c4=1)])

+    def test_udf_with_rand(self):
+        # SPARK-40121: rand() with Python UDF.
+        self.assertEqual(
+            len(self.spark.range(10).select(udf(lambda x: x, DoubleType())(rand())).collect()), 10
+        )
+

 class UDFInitializationTests(unittest.TestCase):
     def tearDown(self):

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
index fca43e454bf..1447788a609 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
@@ -116,6 +116,7 @@ trait EvalPythonExec extends UnaryExecNode {
       }.toArray
     }.toArray
     val projection = MutableProjection.create(allInputs.toSeq, child.output)
+    projection.initialize(context.partitionId())
     val schema = StructType(dataTypes.zipWithIndex.map { case (dt, i) =>
       StructField(s"_$i", dt)
     }.toSeq)
[spark] branch branch-3.2 updated: [SPARK-40121][PYTHON][SQL] Initialize projection used for Python UDF
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new fcec11ad932 [SPARK-40121][PYTHON][SQL] Initialize projection used for Python UDF

fcec11ad932 is described below

commit fcec11ad9329553f4bea024227bdc6468da85278
Author: Hyukjin Kwon
AuthorDate: Thu Aug 18 12:23:02 2022 +0900

    [SPARK-40121][PYTHON][SQL] Initialize projection used for Python UDF

    ### What changes were proposed in this pull request?
    This PR proposes to initialize the projection so non-deterministic expressions can be evaluated with Python UDFs.

    ### Why are the changes needed?
    To make the Python UDF work with non-deterministic expressions.

    ### Does this PR introduce _any_ user-facing change?
    Yes.

    ```python
    from pyspark.sql.functions import udf, rand
    spark.range(10).select(udf(lambda x: x, "double")(rand())).show()
    ```

    **Before**

    ```
    java.lang.NullPointerException
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown Source)
        at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$10(EvalPythonExec.scala:126)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1161)
        at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1176)
        at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1213)
    ```

    **After**

    ```
    +--------------------------+
    |rand(-2507211707257730645)|
    +--------------------------+
    |        0.7691724424045242|
    |       0.09602244075319044|
    |        0.3006471278112862|
    |        0.4182649571961977|
    |       0.29349096650900974|
    |        0.7987097908937618|
    |        0.5324802583101007|
    |          0.72460930912789|
    |        0.1367749768412846|
    |       0.17277322931919348|
    +--------------------------+
    ```

    ### How was this patch tested?
    Manually tested, and unittest was added.

    Closes #37552 from HyukjinKwon/SPARK-40121.

    Authored-by: Hyukjin Kwon
    Signed-off-by: Hyukjin Kwon
    (cherry picked from commit 336c9bc535895530cc3983b24e7507229fa9570d)
    Signed-off-by: Hyukjin Kwon
---
 python/pyspark/sql/tests/test_udf.py | 8 +++-
 .../org/apache/spark/sql/execution/python/EvalPythonExec.scala | 1 +
 2 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/tests/test_udf.py b/python/pyspark/sql/tests/test_udf.py
index fc475f1121d..5e6738a2f8e 100644
--- a/python/pyspark/sql/tests/test_udf.py
+++ b/python/pyspark/sql/tests/test_udf.py
@@ -23,7 +23,7 @@ import unittest

 from pyspark import SparkContext
 from pyspark.sql import SparkSession, Column, Row
-from pyspark.sql.functions import udf
+from pyspark.sql.functions import udf, rand
 from pyspark.sql.udf import UserDefinedFunction
 from pyspark.sql.types import StringType, IntegerType, BooleanType, DoubleType, LongType, \
     ArrayType, StructType, StructField
@@ -705,6 +705,12 @@ class UDFTests(ReusedSQLTestCase):
         finally:
             shutil.rmtree(path)

+    def test_udf_with_rand(self):
+        # SPARK-40121: rand() with Python UDF.
+        self.assertEqual(
+            len(self.spark.range(10).select(udf(lambda x: x, DoubleType())(rand())).collect()), 10
+        )
+

 class UDFInitializationTests(unittest.TestCase):
     def tearDown(self):

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
index fca43e454bf..1447788a609 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
@@ -116,6 +116,7 @@ trait EvalPythonExec extends UnaryExecNode {
       }.toArray
     }.toArray
     val projection = MutableProjection.create(allInputs.toSeq, child.output)
+    projection.initialize(context.partitionId())
     val schema = StructType(dataTypes.zipWithIndex.map { case (dt, i) =>
       StructField(s"_$i", dt)
     }.toSeq)
[spark] branch master updated: [SPARK-40121][PYTHON][SQL] Initialize projection used for Python UDF
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 336c9bc5358 [SPARK-40121][PYTHON][SQL] Initialize projection used for Python UDF

336c9bc5358 is described below

commit 336c9bc535895530cc3983b24e7507229fa9570d
Author: Hyukjin Kwon
AuthorDate: Thu Aug 18 12:23:02 2022 +0900

    [SPARK-40121][PYTHON][SQL] Initialize projection used for Python UDF

    ### What changes were proposed in this pull request?
    This PR proposes to initialize the projection so non-deterministic expressions can be evaluated with Python UDFs.

    ### Why are the changes needed?
    To make the Python UDF work with non-deterministic expressions.

    ### Does this PR introduce _any_ user-facing change?
    Yes.

    ```python
    from pyspark.sql.functions import udf, rand
    spark.range(10).select(udf(lambda x: x, "double")(rand())).show()
    ```

    **Before**

    ```
    java.lang.NullPointerException
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown Source)
        at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$10(EvalPythonExec.scala:126)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1161)
        at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1176)
        at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1213)
    ```

    **After**

    ```
    +--------------------------+
    |rand(-2507211707257730645)|
    +--------------------------+
    |        0.7691724424045242|
    |       0.09602244075319044|
    |        0.3006471278112862|
    |        0.4182649571961977|
    |       0.29349096650900974|
    |        0.7987097908937618|
    |        0.5324802583101007|
    |          0.72460930912789|
    |        0.1367749768412846|
    |       0.17277322931919348|
    +--------------------------+
    ```

    ### How was this patch tested?
    Manually tested, and unittest was added.

    Closes #37552 from HyukjinKwon/SPARK-40121.

    Authored-by: Hyukjin Kwon
    Signed-off-by: Hyukjin Kwon
---
 python/pyspark/sql/tests/test_udf.py | 8 +++-
 .../org/apache/spark/sql/execution/python/EvalPythonExec.scala | 1 +
 2 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/tests/test_udf.py b/python/pyspark/sql/tests/test_udf.py
index ba9cfec4600..03bcbaf6ddf 100644
--- a/python/pyspark/sql/tests/test_udf.py
+++ b/python/pyspark/sql/tests/test_udf.py
@@ -24,7 +24,7 @@ import datetime

 from pyspark import SparkContext, SQLContext
 from pyspark.sql import SparkSession, Column, Row
-from pyspark.sql.functions import udf, assert_true, lit
+from pyspark.sql.functions import udf, assert_true, lit, rand
 from pyspark.sql.udf import UserDefinedFunction
 from pyspark.sql.types import (
     StringType,
@@ -797,6 +797,12 @@ class UDFTests(ReusedSQLTestCase):
         finally:
             shutil.rmtree(path)

+    def test_udf_with_rand(self):
+        # SPARK-40121: rand() with Python UDF.
+        self.assertEqual(
+            len(self.spark.range(10).select(udf(lambda x: x, DoubleType())(rand())).collect()), 10
+        )
+

 class UDFInitializationTests(unittest.TestCase):
     def tearDown(self):

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
index c567a70e1d3..f117a408566 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
@@ -116,6 +116,7 @@ trait EvalPythonExec extends UnaryExecNode {
       }.toArray
     }.toArray
     val projection = MutableProjection.create(allInputs.toSeq, child.output)
+    projection.initialize(context.partitionId())
     val schema = StructType(dataTypes.zipWithIndex.map { case (dt, i) =>
       StructField(s"_$i", dt)
     }.toSeq)
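To make the one-line fix easier to follow, here is a loose Python analogy (not Spark internals) for why `projection.initialize(context.partitionId())` is needed: a non-deterministic expression such as `rand()` carries per-partition state that must be seeded with the partition index before the first evaluation, and evaluating before initialization is what surfaced as the NullPointerException above. Class and method names here are illustrative.

```python
import random


class RandExpression:
    """Stands in for Catalyst's Nondeterministic expression contract."""

    def __init__(self, seed):
        self.seed = seed
        self.rng = None  # not usable until initialize() is called

    def initialize(self, partition_index):
        # Mirrors Expression.initialize(partitionIndex) in Catalyst:
        # per-partition state is seeded before any eval() happens.
        self.rng = random.Random(self.seed + partition_index)

    def eval(self):
        if self.rng is None:
            raise RuntimeError("expression evaluated before initialize()")
        return self.rng.random()


expr = RandExpression(seed=42)
expr.initialize(partition_index=0)  # the step EvalPythonExec was missing
print(expr.eval())
```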
[spark] branch branch-3.3 updated: [SPARK-40121][PYTHON][SQL] Initialize projection used for Python UDF
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 1a01a492c05 [SPARK-40121][PYTHON][SQL] Initialize projection used for Python UDF

1a01a492c05 is described below

commit 1a01a492c051bb861c480f224a3c310e133e4d01
Author: Hyukjin Kwon
AuthorDate: Thu Aug 18 12:23:02 2022 +0900

    [SPARK-40121][PYTHON][SQL] Initialize projection used for Python UDF

    ### What changes were proposed in this pull request?
    This PR proposes to initialize the projection so non-deterministic expressions can be evaluated with Python UDFs.

    ### Why are the changes needed?
    To make the Python UDF work with non-deterministic expressions.

    ### Does this PR introduce _any_ user-facing change?
    Yes.

    ```python
    from pyspark.sql.functions import udf, rand
    spark.range(10).select(udf(lambda x: x, "double")(rand())).show()
    ```

    **Before**

    ```
    java.lang.NullPointerException
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown Source)
        at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$10(EvalPythonExec.scala:126)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1161)
        at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1176)
        at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1213)
    ```

    **After**

    ```
    +--------------------------+
    |rand(-2507211707257730645)|
    +--------------------------+
    |        0.7691724424045242|
    |       0.09602244075319044|
    |        0.3006471278112862|
    |        0.4182649571961977|
    |       0.29349096650900974|
    |        0.7987097908937618|
    |        0.5324802583101007|
    |          0.72460930912789|
    |        0.1367749768412846|
    |       0.17277322931919348|
    +--------------------------+
    ```

    ### How was this patch tested?
    Manually tested, and unittest was added.

    Closes #37552 from HyukjinKwon/SPARK-40121.

    Authored-by: Hyukjin Kwon
    Signed-off-by: Hyukjin Kwon
    (cherry picked from commit 336c9bc535895530cc3983b24e7507229fa9570d)
    Signed-off-by: Hyukjin Kwon
---
 python/pyspark/sql/tests/test_udf.py | 8 +++-
 .../org/apache/spark/sql/execution/python/EvalPythonExec.scala | 1 +
 2 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/tests/test_udf.py b/python/pyspark/sql/tests/test_udf.py
index 40deac992c4..34ac08cb818 100644
--- a/python/pyspark/sql/tests/test_udf.py
+++ b/python/pyspark/sql/tests/test_udf.py
@@ -24,7 +24,7 @@ import datetime

 from pyspark import SparkContext, SQLContext
 from pyspark.sql import SparkSession, Column, Row
-from pyspark.sql.functions import udf, assert_true, lit
+from pyspark.sql.functions import udf, assert_true, lit, rand
 from pyspark.sql.udf import UserDefinedFunction
 from pyspark.sql.types import (
     StringType,
@@ -798,6 +798,12 @@ class UDFTests(ReusedSQLTestCase):
         finally:
             shutil.rmtree(path)

+    def test_udf_with_rand(self):
+        # SPARK-40121: rand() with Python UDF.
+        self.assertEqual(
+            len(self.spark.range(10).select(udf(lambda x: x, DoubleType())(rand())).collect()), 10
+        )
+

 class UDFInitializationTests(unittest.TestCase):
     def tearDown(self):

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
index c567a70e1d3..f117a408566 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
@@ -116,6 +116,7 @@ trait EvalPythonExec extends UnaryExecNode {
       }.toArray
     }.toArray
     val projection = MutableProjection.create(allInputs.toSeq, child.output)
+    projection.initialize(context.partitionId())
     val schema = StructType(dataTypes.zipWithIndex.map { case (dt, i) =>
       StructField(s"_$i", dt)
     }.toSeq)
[spark] branch master updated: [SPARK-40128][SQL] Make the VectorizedColumnReader recognize DELTA_LENGTH_BYTE_ARRAY as a standalone column encoding
This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 01f9d270ea1 [SPARK-40128][SQL] Make the VectorizedColumnReader recognize DELTA_LENGTH_BYTE_ARRAY as a standalone column encoding

01f9d270ea1 is described below

commit 01f9d270ea14d4a9b3a5f326fc3f721ddd23e3f4
Author: Dennis Huo
AuthorDate: Wed Aug 17 16:21:43 2022 -0700

    [SPARK-40128][SQL] Make the VectorizedColumnReader recognize DELTA_LENGTH_BYTE_ARRAY as a standalone column encoding

    ### What changes were proposed in this pull request?
    Add DELTA_LENGTH_BYTE_ARRAY as a recognized encoding in VectorizedColumnReader so that vectorized reads succeed when there are columns using DELTA_LENGTH_BYTE_ARRAY as a standalone encoding.

    ### Why are the changes needed?
    Spark currently throws an exception for DELTA_LENGTH_BYTE_ARRAY columns when vectorized reads are enabled and trying to read `delta_length_byte_array.parquet` from https://github.com/apache/parquet-testing:

        java.lang.UnsupportedOperationException: Unsupported encoding: DELTA_LENGTH_BYTE_ARRAY

    ### Does this PR introduce _any_ user-facing change?
    Yes. Previously this threw an UnsupportedOperationException; now the encoding is read the same as when vectorized reads are disabled.

    ### How was this patch tested?
    Added test case to ParquetIOSuite; made sure it fails without the fix to VectorizedColumnReader and passes after.

    Closes #37557 from sfc-gh-dhuo/support-parquet-delta-length-byte-array.

    Authored-by: Dennis Huo
    Signed-off-by: Chao Sun
---
 .../datasources/parquet/VectorizedColumnReader.java      |   2 ++
 .../resources/test-data/delta_length_byte_array.parquet  | Bin 0 -> 3072 bytes
 .../execution/datasources/parquet/ParquetIOSuite.scala   |  10 ++
 3 files changed, 12 insertions(+)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index c2e85da3884..64178fdd72d 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -329,6 +329,8 @@ public class VectorizedColumnReader {
         return new VectorizedPlainValuesReader();
       case DELTA_BYTE_ARRAY:
         return new VectorizedDeltaByteArrayReader();
+      case DELTA_LENGTH_BYTE_ARRAY:
+        return new VectorizedDeltaLengthByteArrayReader();
       case DELTA_BINARY_PACKED:
         return new VectorizedDeltaBinaryPackedReader();
       case RLE:

diff --git a/sql/core/src/test/resources/test-data/delta_length_byte_array.parquet b/sql/core/src/test/resources/test-data/delta_length_byte_array.parquet
new file mode 100644
index 000..ead505a1a1f
Binary files /dev/null and b/sql/core/src/test/resources/test-data/delta_length_byte_array.parquet differ

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 5a8f4563756..0458e5a1a14 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -1307,6 +1307,16 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
     }
   }

+  test("SPARK-40128 read DELTA_LENGTH_BYTE_ARRAY encoded strings") {
+    withAllParquetReaders {
+      checkAnswer(
+        // "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY.
+        // The file comes from https://github.com/apache/parquet-testing
+        readResourceParquetFile("test-data/delta_length_byte_array.parquet"),
+        (0 to 999).map(i => Row("apple_banana_mango" + Integer.toString(i * i))))
+    }
+  }
+
   test("SPARK-12589 copy() on rows returned from reader works for strings") {
     withTempPath { dir =>
       val data = (1, "abc") :: (2, "helloabcde") :: Nil
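For reference, a minimal sketch of the read path this fix unblocks: loading a DELTA_LENGTH_BYTE_ARRAY-encoded file with the vectorized reader enabled, which previously threw the UnsupportedOperationException quoted above. The local file path is an assumption; the `fruit` column name comes from the test file described in the commit message.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Explicitly enable the vectorized Parquet reader (it is on by default).
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")

# e.g. the sample file from https://github.com/apache/parquet-testing,
# downloaded to a local path of your choosing.
df = spark.read.parquet("/tmp/delta_length_byte_array.parquet")
df.select("fruit").show(5, truncate=False)
```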
[spark] branch master updated: [SPARK-40110][SQL][TESTS] Add JDBCWithAQESuite
This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 44f30a04dad [SPARK-40110][SQL][TESTS] Add JDBCWithAQESuite

44f30a04dad is described below

commit 44f30a04dad2baa471b505f95c6a29992ee7ca72
Author: Kazuyuki Tanimura
AuthorDate: Wed Aug 17 15:32:46 2022 -0700

    [SPARK-40110][SQL][TESTS] Add JDBCWithAQESuite

    ### What changes were proposed in this pull request?
    This PR proposes to add `JDBCWithAQESuite`, i.e. test cases of `JDBCSuite` with AQE (Adaptive Query Execution) enabled.

    ### Why are the changes needed?
    Currently `JDBCSuite` assumes that AQE is always turned off. We should also test with AQE turned on.

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    Added the AQE version tests along with the non AQE version.

    Closes #37544 from kazuyukitanimura/SPARK-40110.

    Authored-by: Kazuyuki Tanimura
    Signed-off-by: Chao Sun
---
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 32 --
 1 file changed, 23 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index b87fee6cec2..8eda0c288a3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.logical.ShowCreateTable
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeTestUtils}
 import org.apache.spark.sql.execution.{DataSourceScanExec, ExtendedMode}
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
 import org.apache.spark.sql.execution.command.{ExplainCommand, ShowCreateTableCommand}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JDBCPartition, JDBCRelation, JdbcUtils}
@@ -44,7 +45,8 @@ import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils

-class JDBCSuite extends QueryTest with SharedSparkSession {
+class JDBCSuite extends QueryTest with SharedSparkSession
+  with AdaptiveSparkPlanHelper with DisableAdaptiveExecutionSuite {
   import testImplicits._

   val url = "jdbc:h2:mem:testdb0"
@@ -298,10 +300,15 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
     val parentPlan = df.queryExecution.executedPlan
     // Check if SparkPlan Filter is removed in a physical plan and
     // the plan only has PhysicalRDD to scan JDBCRelation.
-    assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec])
-    val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec]
-    assert(node.child.isInstanceOf[org.apache.spark.sql.execution.DataSourceScanExec])
-    assert(node.child.asInstanceOf[DataSourceScanExec].nodeName.contains("JDBCRelation"))
+    val child = if (df.sqlContext.conf.adaptiveExecutionEnabled) {
+      assert(parentPlan.isInstanceOf[AdaptiveSparkPlanExec])
+      parentPlan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
+    } else {
+      assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec])
+      parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec].child
+    }
+    assert(child.isInstanceOf[org.apache.spark.sql.execution.DataSourceScanExec])
+    assert(child.asInstanceOf[DataSourceScanExec].nodeName.contains("JDBCRelation"))
     df
   }

@@ -309,9 +316,14 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
     val parentPlan = df.queryExecution.executedPlan
     // Check if SparkPlan Filter is not removed in a physical plan because JDBCRDD
     // cannot compile given predicates.
-    assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec])
-    val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec]
-    assert(node.child.isInstanceOf[org.apache.spark.sql.execution.FilterExec])
+    val child = if (df.sqlContext.conf.adaptiveExecutionEnabled) {
+      assert(parentPlan.isInstanceOf[AdaptiveSparkPlanExec])
+      parentPlan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
+    } else {
+      assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec])
+      parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec].child
+    }
+    assert(child.isInstanceOf[org.apache.spark.sql.execution.FilterExec])
     df
   }

@@ -1767,7
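The test refactoring above accounts for a plan-shape difference that is easy to see from PySpark. A hedged sketch (the aggregate query is just an arbitrary example that forces a shuffle): with AQE off, the executed plan's top node is typically a WholeStageCodegen stage, while with AQE on it is wrapped in an AdaptiveSparkPlan node, so the assertions must unwrap it first.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

spark.conf.set("spark.sql.adaptive.enabled", "false")
# explain() output is expected to show WholeStageCodegen at the top.
spark.range(10).groupBy().count().explain()

spark.conf.set("spark.sql.adaptive.enabled", "true")
# explain() output is expected to show AdaptiveSparkPlan at the top.
spark.range(10).groupBy().count().explain()
```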
[spark] branch master updated: [SPARK-40109][SQL] New SQL function: get()
This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new b6cf3e4598f [SPARK-40109][SQL] New SQL function: get()

b6cf3e4598f is described below

commit b6cf3e4598fb6ae9f9ed28c7d5a0d4152453a669
Author: Gengliang Wang
AuthorDate: Wed Aug 17 15:03:17 2022 -0700

    [SPARK-40109][SQL] New SQL function: get()

    ### What changes were proposed in this pull request?
    Introduce a new SQL function `get()`: Returns element of array at given (0-based) index. If the index points outside of the array boundaries, then this function returns NULL.

    Examples:
    ```
    > SELECT _FUNC_(array(1, 2, 3), 2);
     2
    > SELECT _FUNC_(array(1, 2, 3), 3);
     NULL
    > SELECT _FUNC_(array(1, 2, 3), -1);
     NULL
    ```

    ### Why are the changes needed?
    Currently, when accessing an array element with an invalid index under ANSI SQL mode, the error is like:
    ```
    [INVALID_ARRAY_INDEX] The index -1 is out of bounds. The array has 3 elements. Use `try_element_at` and increase the array index by 1 (the starting array index is 1 for `try_element_at`) to tolerate accessing element at invalid index and return NULL instead. If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error.
    ```
    The provided solution is complicated. I suggest introducing a new method `get()` which always returns null on an invalid array index. This is from https://docs.snowflake.com/en/sql-reference/functions/get.html.

    Note: since Spark's map access always returns null, let's not support map type in the get method for now.

    ### Does this PR introduce _any_ user-facing change?
    Yes, a new SQL function `get()`: returns element of array at given (0-based) index. If the index points outside of the array boundaries, then this function returns NULL.

    ### How was this patch tested?
    New UT

    Closes #37541 from gengliangwang/addGetMethod.

    Lead-authored-by: Gengliang Wang
    Co-authored-by: Gengliang Wang
    Signed-off-by: Gengliang Wang
---
 core/src/main/resources/error/error-classes.json   |  2 +-
 .../sql/catalyst/analysis/FunctionRegistry.scala   |  1 +
 .../expressions/collectionOperations.scala         | 36 +++
 .../sql-functions/sql-expression-schema.md         |  1 +
 .../src/test/resources/sql-tests/inputs/array.sql  |  6
 .../resources/sql-tests/results/ansi/array.sql.out | 42 +++---
 .../test/resources/sql-tests/results/array.sql.out | 32 +
 7 files changed, 114 insertions(+), 6 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index c2c5f30564c..3f6c1ca0362 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -224,7 +224,7 @@
   },
   "INVALID_ARRAY_INDEX" : {
     "message" : [
-      "The index <indexValue> is out of bounds. The array has <arraySize> elements. Use `try_element_at` and increase the array index by 1(the starting array index is 1 for `try_element_at`) to tolerate accessing element at invalid index and return NULL instead. If necessary set <ansiConfig> to \"false\" to bypass this error."
+      "The index <indexValue> is out of bounds. The array has <arraySize> elements. Use the SQL function `get()` to tolerate accessing element at invalid index and return NULL instead. If necessary set <ansiConfig> to \"false\" to bypass this error."
     ]
   },
   "INVALID_ARRAY_INDEX_IN_ELEMENT_AT" : {

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index b655c45bd5f..42f3ca041b8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -702,6 +702,7 @@ object FunctionRegistry {
     expression[TransformKeys]("transform_keys"),
     expression[MapZipWith]("map_zip_with"),
     expression[ZipWith]("zip_with"),
+    expression[Get]("get"),

     CreateStruct.registryEntry,

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
index 3090916582e..40eade75578 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
@@ -2072,6 +2072,42 @@ case class ArrayPosition(left: Expression, right: Expression)
     copy(left = newLeft, right = newRight)
 }

+/**
+ * Returns the value of
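A quick usage sketch of the new function, runnable on a master (Spark 3.4) build via SQL. The index values below are chosen to avoid ambiguity; `get()` is 0-based, and out-of-range or negative indexes return NULL instead of erroring under ANSI mode.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.sql("SELECT get(array(1, 2, 3), 0)").show()   # 1 (0-based index)
spark.sql("SELECT get(array(1, 2, 3), 5)").show()   # NULL, out of bounds
spark.sql("SELECT get(array(1, 2, 3), -1)").show()  # NULL, negative index
```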
[spark] branch master updated (103768bcb8b -> eef7596d615)
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

    from 103768bcb8b [SPARK-40105][SQL] Improve repartition in ReplaceCTERefWithRepartition
     add eef7596d615 [SPARK-40114][R] Arrow 9.0.0 support with SparkR

No new revisions were added by this update.

Summary of changes:
 R/pkg/R/serialize.R | 2 +-
 appveyor.yml        | 3 +--
 2 files changed, 2 insertions(+), 3 deletions(-)
[spark] branch master updated: [SPARK-40105][SQL] Improve repartition in ReplaceCTERefWithRepartition
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 103768bcb8b [SPARK-40105][SQL] Improve repartition in ReplaceCTERefWithRepartition

103768bcb8b is described below

commit 103768bcb8bb98ae1b55d449f4f7edf215f3a72c
Author: ulysses-you
AuthorDate: Wed Aug 17 17:46:19 2022 +0800

    [SPARK-40105][SQL] Improve repartition in ReplaceCTERefWithRepartition

    ### What changes were proposed in this pull request?
    - skip adding a repartition if the top level node of the CTE is a rebalance
    - use RepartitionByExpression instead of Repartition so that AQE can coalesce the shuffle partitions

    ### Why are the changes needed?
    If a CTE cannot be inlined, ReplaceCTERefWithRepartition adds a repartition to force a shuffle so that the references can reuse the shuffle exchange. The added repartition should be optimizable by AQE for better performance. If the user has specified a rebalance, ReplaceCTERefWithRepartition should skip adding a repartition.

    ### Does this PR introduce _any_ user-facing change?
    No, only improves performance.

    ### How was this patch tested?
    Add test

    Closes #37537 from ulysses-you/cte.

    Authored-by: ulysses-you
    Signed-off-by: Wenchen Fan
---
 .../optimizer/ReplaceCTERefWithRepartition.scala | 10 ++--
 .../org/apache/spark/sql/CTEInlineSuite.scala    | 30 +++---
 2 files changed, 35 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceCTERefWithRepartition.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceCTERefWithRepartition.scala
index 0190fa2a2ab..c01372c71ab 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceCTERefWithRepartition.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceCTERefWithRepartition.scala
@@ -41,6 +41,12 @@ object ReplaceCTERefWithRepartition extends Rule[LogicalPlan] {
       replaceWithRepartition(plan, mutable.HashMap.empty[Long, LogicalPlan])
   }

+  private def canSkipExtraRepartition(p: LogicalPlan): Boolean = p match {
+    case _: RepartitionOperation => true
+    case _: RebalancePartitions => true
+    case _ => false
+  }
+
   private def replaceWithRepartition(
       plan: LogicalPlan,
       cteMap: mutable.HashMap[Long, LogicalPlan]): LogicalPlan = plan match {
@@ -48,12 +54,12 @@ object ReplaceCTERefWithRepartition extends Rule[LogicalPlan] {
       cteDefs.foreach { cteDef =>
         val inlined = replaceWithRepartition(cteDef.child, cteMap)
         val withRepartition =
-          if (inlined.isInstanceOf[RepartitionOperation] || cteDef.underSubquery) {
+          if (canSkipExtraRepartition(inlined) || cteDef.underSubquery) {
             // If the CTE definition plan itself is a repartition operation or if it hosts a merged
             // scalar subquery, we do not need to add an extra repartition shuffle.
             inlined
           } else {
-            Repartition(conf.numShufflePartitions, shuffle = true, inlined)
+            RepartitionByExpression(Seq.empty, inlined, None)
           }
         cteMap.put(cteDef.id, withRepartition)
       }

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
index ee000bce1fc..26d165b460a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql

 import org.apache.spark.sql.catalyst.expressions.{And, GreaterThan, LessThan, Literal, Or}
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, Project, RepartitionOperation, WithCTE}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, Project, RebalancePartitions, RepartitionByExpression, RepartitionOperation, WithCTE}
 import org.apache.spark.sql.execution.adaptive._
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 import org.apache.spark.sql.internal.SQLConf
@@ -42,8 +42,13 @@ abstract class CTEInlineSuiteBase
           |select * from v except select * from v
         """.stripMargin)
       checkAnswer(df, Nil)
+
+      val r = df.queryExecution.optimizedPlan.find {
+        case RepartitionByExpression(p, _, None) => p.isEmpty
+        case _ => false
+      }
       assert(
-        df.queryExecution.optimizedPlan.exists(_.isInstanceOf[RepartitionOperation]),
+        r.isDefined,
         "Non-deterministic With-CTE with multiple references should be not inlined.")
     }
   }
@@ -485,4 +490,23 @@ abstract class CTEInlineSuiteBase

 class CTEInlineSuiteAEOff extends CTEInlineSuiteBase
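Here is a hedged sketch of the query shape this optimization targets: a non-deterministic CTE referenced more than once, which cannot be inlined. If the CTE body already ends in a REBALANCE hint, no extra repartition is inserted; otherwise the inserted RepartitionByExpression shuffle can have its partitions coalesced by AQE. The table and column names are illustrative.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.sql("""
    WITH v AS (
        -- rand() makes the CTE non-deterministic, so it is not inlined;
        -- the REBALANCE hint means no extra repartition is needed.
        SELECT /*+ REBALANCE */ id, rand() AS r FROM range(10)
    )
    SELECT * FROM v EXCEPT SELECT * FROM v
""").explain()
```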
[spark] branch master updated: [SPARK-40066][SQL][FOLLOW-UP] Check if ElementAt is resolved before getting its dataType
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new fbc0edac585 [SPARK-40066][SQL][FOLLOW-UP] Check if ElementAt is resolved before getting its dataType

fbc0edac585 is described below

commit fbc0edac5859dae6b2c9ad012d3932f54196f2e6
Author: Hyukjin Kwon
AuthorDate: Wed Aug 17 17:34:02 2022 +0900

    [SPARK-40066][SQL][FOLLOW-UP] Check if ElementAt is resolved before getting its dataType

    ### What changes were proposed in this pull request?
    This PR is a followup of https://github.com/apache/spark/pull/37503 that adds a check if the `ElementAt` expression is resolved or not before getting its dataType.

    ### Why are the changes needed?
    To make the tests pass with ANSI enabled. Currently it fails (https://github.com/apache/spark/runs/7870131749?check_suite_focus=true) as below:

    ```
    [info] - map_filter *** FAILED *** (243 milliseconds)
    [info]   org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object
    [info]   at org.apache.spark.sql.catalyst.expressions.UnresolvedNamedLambdaVariable.dataType(higherOrderFunctions.scala:46)
    [info]   at org.apache.spark.sql.catalyst.expressions.ElementAt.initQueryContext(collectionOperations.scala:2275)
    [info]   at org.apache.spark.sql.catalyst.expressions.SupportQueryContext.$init$(Expression.scala:603)
    [info]   at org.apache.spark.sql.catalyst.expressions.ElementAt.<init>(collectionOperations.scala:2105)
    [info]   at org.apache.spark.sql.functions$.element_at(functions.scala:3958)
    [info]   at org.apache.spark.sql.DataFrameFunctionsSuite.$anonfun$new$452(DataFrameFunctionsSuite.scala:2476)
    [info]   at org.apache.spark.sql.functions$.createLambda(functions.scala:4029)
    [info]   at org.apache.spark.sql.functions$.map_filter(functions.scala:4256)
    [info]   at org.apache.spark.sql.DataFrameFunctionsSuite.$anonfun$new$451(DataFrameFunctionsSuite.scala:2476)
    [info]   at org.apache.spark.sql.QueryTest.checkAnswer(QueryTest.scala:133)
    [info]   at org.apache.spark.sql.DataFrameFunctionsSuite.$anonfun$new$445(DataFrameFunctionsSuite.scala:2478)
    [info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
    [info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
    [info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
    [info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
    [info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
    [info]   at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:190)
    [info]   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:204)
    [info]   at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:188)
    ```

    ### Does this PR introduce _any_ user-facing change?
    No, test-only.

    ### How was this patch tested?
    Manually tested with ANSI mode enabled.

    Closes #37548 from HyukjinKwon/SPARK-40066.

    Authored-by: Hyukjin Kwon
    Signed-off-by: Hyukjin Kwon
---
 .../apache/spark/sql/catalyst/expressions/collectionOperations.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
index 50da0fb12ec..3090916582e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
@@ -2272,7 +2272,7 @@ case class ElementAt(
       newLeft: Expression, newRight: Expression): ElementAt = copy(left = newLeft, right = newRight)

   override def initQueryContext(): Option[SQLQueryContext] = {
-    if (failOnError && left.dataType.isInstanceOf[ArrayType]) {
+    if (failOnError && left.resolved && left.dataType.isInstanceOf[ArrayType]) {
       Some(origin.context)
     } else {
       None
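For context, a hedged sketch of the pattern the follow-up protects: when `element_at` is built against a lambda argument inside a higher-order function such as `map_filter`, that argument is still an UnresolvedNamedLambdaVariable at construction time, so the expression must not touch `dataType` until it is resolved. The data and column names below are illustrative.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import element_at, lit, map_filter

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([({1: ["a", "b"], 2: ["c"]},)], ["data"])

# The lambda argument `v` is unresolved when element_at is constructed,
# which is exactly why the left.resolved check was added.
df.select(
    map_filter("data", lambda k, v: element_at(v, 1) == lit("a")).alias("m")
).show(truncate=False)
```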
[spark] branch branch-3.2 updated: [SPARK-39887][SQL][FOLLOW-UP] Do not exclude Union's first child attributes when traversing other children in RemoveRedundantAliases
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new f96fc06ca4a [SPARK-39887][SQL][FOLLOW-UP] Do not exclude Union's first child attributes when traversing other children in RemoveRedundantAliases

f96fc06ca4a is described below

commit f96fc06ca4a5a1bbc154aca1c7a9b96e7fc42c7e
Author: Peter Toth
AuthorDate: Wed Aug 17 14:57:35 2022 +0800

    [SPARK-39887][SQL][FOLLOW-UP] Do not exclude Union's first child attributes when traversing other children in RemoveRedundantAliases

    ### What changes were proposed in this pull request?
    Do not exclude `Union`'s first child attributes when traversing other children in `RemoveRedundantAliases`.

    ### Why are the changes needed?
    We don't need to exclude those attributes that `Union` inherits from its first child. See discussion here: https://github.com/apache/spark/pull/37496#discussion_r944509115

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    Existing UTs.

    Closes #37534 from peter-toth/SPARK-39887-keep-attributes-of-unions-first-child-follow-up.

    Authored-by: Peter Toth
    Signed-off-by: Wenchen Fan
    (cherry picked from commit e732232dac420826af269d8cf5efacb52933f59a)
    Signed-off-by: Wenchen Fan
---
 .../scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 7993829f1f6..6797bbfc714 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -536,7 +536,7 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
         })
       Join(newLeft, newRight, joinType, newCondition, hint)

-    case _: Union =>
+    case u: Union =>
       var first = true
       plan.mapChildren { child =>
         if (first) {
@@ -547,7 +547,8 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
           // output attributes could return incorrect result.
           removeRedundantAliases(child, excluded ++ child.outputSet)
         } else {
-          removeRedundantAliases(child, excluded)
+          // We don't need to exclude those attributes that `Union` inherits from its first child.
+          removeRedundantAliases(child, excluded -- u.children.head.outputSet)
         }
       }
[spark] branch branch-3.3 updated: [SPARK-39887][SQL][FOLLOW-UP] Do not exclude Union's first child attributes when traversing other children in RemoveRedundantAliases
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 9601be96a86 [SPARK-39887][SQL][FOLLOW-UP] Do not exclude Union's first child attributes when traversing other children in RemoveRedundantAliases

9601be96a86 is described below

commit 9601be96a86eced683e6aa2b772c726eeb231de8
Author: Peter Toth
AuthorDate: Wed Aug 17 14:57:35 2022 +0800

    [SPARK-39887][SQL][FOLLOW-UP] Do not exclude Union's first child attributes when traversing other children in RemoveRedundantAliases

    ### What changes were proposed in this pull request?
    Do not exclude `Union`'s first child attributes when traversing other children in `RemoveRedundantAliases`.

    ### Why are the changes needed?
    We don't need to exclude those attributes that `Union` inherits from its first child. See discussion here: https://github.com/apache/spark/pull/37496#discussion_r944509115

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    Existing UTs.

    Closes #37534 from peter-toth/SPARK-39887-keep-attributes-of-unions-first-child-follow-up.

    Authored-by: Peter Toth
    Signed-off-by: Wenchen Fan
    (cherry picked from commit e732232dac420826af269d8cf5efacb52933f59a)
    Signed-off-by: Wenchen Fan
---
 .../scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 558a67ff5ca..4807824ee71 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -544,7 +544,7 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
         })
       Join(newLeft, newRight, joinType, newCondition, hint)

-    case _: Union =>
+    case u: Union =>
       var first = true
       plan.mapChildren { child =>
         if (first) {
@@ -555,7 +555,8 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
           // output attributes could return incorrect result.
           removeRedundantAliases(child, excluded ++ child.outputSet)
         } else {
-          removeRedundantAliases(child, excluded)
+          // We don't need to exclude those attributes that `Union` inherits from its first child.
+          removeRedundantAliases(child, excluded -- u.children.head.outputSet)
         }
       }
[spark] branch master updated (6c25ce383d1 -> e732232dac4)
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

    from 6c25ce383d1 [SPARK-40116][R][INFRA] Pin Arrow version to 8.0.0 in AppVeyor
     add e732232dac4 [SPARK-39887][SQL][FOLLOW-UP] Do not exclude Union's first child attributes when traversing other children in RemoveRedundantAliases

No new revisions were added by this update.

Summary of changes:
 .../scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
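To illustrate the plan shape this optimizer follow-up concerns (an assumed example, not the regression test from the PR): a `Union` inherits its output attributes from its first branch, so aliases in the other branches that map to those same attributes no longer need to stay excluded from removal.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Both branches alias their columns; the union's output comes from the
# first branch, the shape RemoveRedundantAliases now handles more precisely.
spark.sql("""
    SELECT id AS a, id AS b FROM range(3)
    UNION ALL
    SELECT id AS a, id AS b FROM range(3)
""").explain(True)
```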
[spark] branch master updated: [SPARK-40116][R][INFRA] Pin Arrow version to 8.0.0 in AppVeyor
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 6c25ce383d1 [SPARK-40116][R][INFRA] Pin Arrow version to 8.0.0 in AppVeyor

6c25ce383d1 is described below

commit 6c25ce383d1408884366eb6cf22f302e6b2d8864
Author: Hyukjin Kwon
AuthorDate: Wed Aug 17 15:34:20 2022 +0900

    [SPARK-40116][R][INFRA] Pin Arrow version to 8.0.0 in AppVeyor

    ### What changes were proposed in this pull request?
    This PR proposes to remove Arrow in AppVeyor for now to recover the build.

    ### Why are the changes needed?
    SparkR does not support Arrow 9.0.0 ([SPARK-40114](https://issues.apache.org/jira/browse/SPARK-40114)), so the tests fail (https://ci.appveyor.com/project/HyukjinKwon/spark/builds/44490387). Should recover the tests first because it looks like it's going to take a while to add Arrow 9.0.0 support.

    ### Does this PR introduce _any_ user-facing change?
    No, dev-only.

    ### How was this patch tested?
    CI in this PR should test it out.

    Closes #37546 from HyukjinKwon/SPARK-40116.

    Lead-authored-by: Hyukjin Kwon
    Co-authored-by: Hyukjin Kwon
    Signed-off-by: Hyukjin Kwon
---
 appveyor.yml | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/appveyor.yml b/appveyor.yml
index 1a2aef0d3b8..3ec79645697 100644
--- a/appveyor.yml
+++ b/appveyor.yml
@@ -42,7 +42,8 @@ install:
   # Install SBT and dependencies
   - ps: .\dev\appveyor-install-dependencies.ps1
   # Required package for R unit tests. xml2 is required to use jUnit reporter in testthat.
-  - cmd: Rscript -e "install.packages(c('knitr', 'rmarkdown', 'testthat', 'e1071', 'survival', 'arrow', 'xml2'), repos='https://cloud.r-project.org/')"
+  # TODO(SPARK-40114): Add 'arrow' back with supporting Arrow 9.0.0
+  - cmd: Rscript -e "install.packages(c('knitr', 'rmarkdown', 'testthat', 'e1071', 'survival', 'xml2'), repos='https://cloud.r-project.org/')"
   - cmd: Rscript -e "pkg_list <- as.data.frame(installed.packages()[,c(1, 3:4)]); pkg_list[is.na(pkg_list$Priority), 1:2, drop = FALSE]"

 build_script:
[spark] branch branch-3.3 updated: [SPARK-40117][PYTHON][SQL] Convert condition to java in DataFrameWriterV2.overwrite
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 0db78424201 [SPARK-40117][PYTHON][SQL] Convert condition to java in DataFrameWriterV2.overwrite

0db78424201 is described below

commit 0db78424201cd7b2e2bcffb9de3c2a12a0c67b44
Author: Wenli Looi
AuthorDate: Wed Aug 17 15:28:55 2022 +0900

    [SPARK-40117][PYTHON][SQL] Convert condition to java in DataFrameWriterV2.overwrite

    ### What changes were proposed in this pull request?
    Fix DataFrameWriterV2.overwrite() failing to convert the condition parameter to Java, which prevented the function from being called. It is caused by the following commit that deleted the `_to_java_column` call instead of fixing it: https://github.com/apache/spark/commit/a1e459ed9f6777fb8d5a2d09fda666402f9230b9

    ### Why are the changes needed?
    DataFrameWriterV2.overwrite() cannot be called.

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    Manually checked whether the arguments are sent to the JVM or not.

    Closes #37547 from looi/fix-overwrite.

    Authored-by: Wenli Looi
    Signed-off-by: Hyukjin Kwon
    (cherry picked from commit 46379863ab0dd2ee8fcf1e31e76476ff18397f60)
    Signed-off-by: Hyukjin Kwon
---
 python/pyspark/sql/readwriter.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 760e54831c2..c4c813e56b1 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -1465,6 +1465,7 @@ class DataFrameWriterV2:
         Overwrite rows matching the given filter condition with the contents of the data frame in
         the output table.
         """
+        condition = _to_java_column(condition)
         self._jwriter.overwrite(condition)

     @since(3.1)
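A minimal sketch of the call this fix repairs. The table name and condition are assumptions: `writeTo()` needs a configured V2 catalog table, so this will fail without one; the point is only that the Column condition now reaches the JVM correctly.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()
df = spark.range(10).withColumn("part", col("id") % 2)

# Before the fix, the Column condition was handed to the JVM unconverted,
# so this call failed; with _to_java_column restored it works as documented.
df.writeTo("catalog.db.target").overwrite(col("part") == 0)
```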
[spark] branch master updated (74deefb7894 -> 46379863ab0)
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

    from 74deefb7894 [SPARK-40084][PYTHON] Upgrade Py4J to 0.10.9.7
     add 46379863ab0 [SPARK-40117][PYTHON][SQL] Convert condition to java in DataFrameWriterV2.overwrite

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/readwriter.py | 1 +
 1 file changed, 1 insertion(+)
[spark] branch branch-3.1 updated: [SPARK-40117][PYTHON][SQL] Convert condition to java in DataFrameWriterV2.overwrite
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 1bb9e332faa [SPARK-40117][PYTHON][SQL] Convert condition to java in DataFrameWriterV2.overwrite

1bb9e332faa is described below

commit 1bb9e332faadd0fe8650285874b7f320d60405e6
Author: Wenli Looi
AuthorDate: Wed Aug 17 15:28:55 2022 +0900

    [SPARK-40117][PYTHON][SQL] Convert condition to java in DataFrameWriterV2.overwrite

    ### What changes were proposed in this pull request?
    Fix DataFrameWriterV2.overwrite() failing to convert the condition parameter to Java, which prevented the function from being called. It is caused by the following commit that deleted the `_to_java_column` call instead of fixing it: https://github.com/apache/spark/commit/a1e459ed9f6777fb8d5a2d09fda666402f9230b9

    ### Why are the changes needed?
    DataFrameWriterV2.overwrite() cannot be called.

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    Manually checked whether the arguments are sent to the JVM or not.

    Closes #37547 from looi/fix-overwrite.

    Authored-by: Wenli Looi
    Signed-off-by: Hyukjin Kwon
    (cherry picked from commit 46379863ab0dd2ee8fcf1e31e76476ff18397f60)
    Signed-off-by: Hyukjin Kwon
---
 python/pyspark/sql/readwriter.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 9529cf0fdae..cfa2f3fffb0 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -1570,6 +1570,7 @@ class DataFrameWriterV2(object):
         Overwrite rows matching the given filter condition with the contents of the data frame in
         the output table.
         """
+        condition = _to_java_column(condition)
         self._jwriter.overwrite(condition)

     @since(3.1)
[spark] branch branch-3.2 updated: [SPARK-40117][PYTHON][SQL] Convert condition to java in DataFrameWriterV2.overwrite
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 6ec86e5d5d9 [SPARK-40117][PYTHON][SQL] Convert condition to java in DataFrameWriterV2.overwrite

6ec86e5d5d9 is described below

commit 6ec86e5d5d9df0b0d111decd0ea16e5a3e4cd3fe
Author: Wenli Looi
AuthorDate: Wed Aug 17 15:28:55 2022 +0900

    [SPARK-40117][PYTHON][SQL] Convert condition to java in DataFrameWriterV2.overwrite

    ### What changes were proposed in this pull request?
    Fix DataFrameWriterV2.overwrite() failing to convert the condition parameter to Java, which prevented the function from being called. It is caused by the following commit that deleted the `_to_java_column` call instead of fixing it: https://github.com/apache/spark/commit/a1e459ed9f6777fb8d5a2d09fda666402f9230b9

    ### Why are the changes needed?
    DataFrameWriterV2.overwrite() cannot be called.

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    Manually checked whether the arguments are sent to the JVM or not.

    Closes #37547 from looi/fix-overwrite.

    Authored-by: Wenli Looi
    Signed-off-by: Hyukjin Kwon
    (cherry picked from commit 46379863ab0dd2ee8fcf1e31e76476ff18397f60)
    Signed-off-by: Hyukjin Kwon
---
 python/pyspark/sql/readwriter.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 65753543cb5..892b2500097 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -1163,6 +1163,7 @@ class DataFrameWriterV2(object):
         Overwrite rows matching the given filter condition with the contents of the data frame in
         the output table.
         """
+        condition = _to_java_column(condition)
         self._jwriter.overwrite(condition)

     @since(3.1)