This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push: new bd6fd7e1320 [SPARK-39084][PYSPARK] Fix df.rdd.isEmpty() by using TaskContext to stop iterator on task completion bd6fd7e1320 is described below commit bd6fd7e1320f689c42c8ef6710f250123a78707d Author: Ivan Sadikov <ivan.sadi...@databricks.com> AuthorDate: Tue May 3 08:30:05 2022 +0900 [SPARK-39084][PYSPARK] Fix df.rdd.isEmpty() by using TaskContext to stop iterator on task completion ### What changes were proposed in this pull request? This PR fixes the issue described in https://issues.apache.org/jira/browse/SPARK-39084 where calling `df.rdd.isEmpty()` on a particular dataset could result in a JVM crash and/or executor failure. The issue was due to the Python iterator not being synchronised with the Java iterator, so after the task completes, the Python iterator continues to process data. We introduced ContextAwareIterator as part of https://issues.apache.org/jira/browse/SPARK-33277 but did not apply it in all of the places where it should be used. ### Why are the changes needed? Fixes the JVM crash when checking isEmpty() on a dataset. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? I added a test case that reproduces the issue 100% of the time. I confirmed that the test fails without the fix and passes with the fix. Closes #36425 from sadikovi/fix-pyspark-iter-2. 
Authored-by: Ivan Sadikov <ivan.sadi...@databricks.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> (cherry picked from commit 9305cc744d27daa6a746d3eb30e7639c63329072) Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- python/pyspark/sql/tests/test_dataframe.py | 36 ++++++++++++++++++++++ .../sql/execution/python/EvaluatePython.scala | 3 +- 2 files changed, 38 insertions(+), 1 deletion(-) diff --git a/python/pyspark/sql/tests/test_dataframe.py b/python/pyspark/sql/tests/test_dataframe.py index be5e1d9a6e5..fd54c25c705 100644 --- a/python/pyspark/sql/tests/test_dataframe.py +++ b/python/pyspark/sql/tests/test_dataframe.py @@ -22,6 +22,7 @@ import shutil import tempfile import time import unittest +import uuid from typing import cast from pyspark.sql import SparkSession, Row @@ -1141,6 +1142,41 @@ class DataFrameTests(ReusedSQLTestCase): with self.assertRaisesRegex(TypeError, "Parameter 'truncate=foo'"): df.show(truncate="foo") + def test_df_is_empty(self): + # SPARK-39084: Fix df.rdd.isEmpty() resulting in JVM crash. + + # This particular example of DataFrame reproduces an issue in isEmpty call + # which could result in JVM crash. 
+ data = [] + for t in range(0, 10000): + id = str(uuid.uuid4()) + if t == 0: + for i in range(0, 99): + data.append((id,)) + elif t < 10: + for i in range(0, 75): + data.append((id,)) + elif t < 100: + for i in range(0, 50): + data.append((id,)) + elif t < 1000: + for i in range(0, 25): + data.append((id,)) + else: + for i in range(0, 10): + data.append((id,)) + + tmpPath = tempfile.mkdtemp() + shutil.rmtree(tmpPath) + try: + df = self.spark.createDataFrame(data, ["col"]) + df.coalesce(1).write.parquet(tmpPath) + + res = self.spark.read.parquet(tmpPath).groupBy("col").count() + self.assertFalse(res.rdd.isEmpty()) + finally: + shutil.rmtree(tmpPath) + @unittest.skipIf( not have_pandas or not have_pyarrow, cast(str, pandas_requirement_message or pyarrow_requirement_message), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala index 6664acf9572..8d2f788e05c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala @@ -24,6 +24,7 @@ import scala.collection.JavaConverters._ import net.razorvine.pickle.{IObjectPickler, Opcodes, Pickler} +import org.apache.spark.{ContextAwareIterator, TaskContext} import org.apache.spark.api.python.SerDeUtil import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow @@ -301,7 +302,7 @@ object EvaluatePython { def javaToPython(rdd: RDD[Any]): RDD[Array[Byte]] = { rdd.mapPartitions { iter => registerPicklers() // let it called in executor - new SerDeUtil.AutoBatchedPickler(iter) + new SerDeUtil.AutoBatchedPickler(new ContextAwareIterator(TaskContext.get, iter)) } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org