This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git

commit 514d1899a1baf7c1bb5af68aa05e3886a80a0843
Author: Hyukjin Kwon <gurwls...@apache.org>
AuthorDate: Tue May 3 08:33:22 2022 +0900

    Revert "[SPARK-39084][PYSPARK] Fix df.rdd.isEmpty() by using TaskContext to 
stop iterator on task completion"
    
    This reverts commit 4dba99ae359b07f814f68707073414f60616b564.
---
 python/pyspark/sql/tests/test_dataframe.py         | 38 +---------------------
 .../sql/execution/python/EvaluatePython.scala      |  3 +-
 2 files changed, 2 insertions(+), 39 deletions(-)

diff --git a/python/pyspark/sql/tests/test_dataframe.py b/python/pyspark/sql/tests/test_dataframe.py
index f2826e29d36..8c9f3304e00 100644
--- a/python/pyspark/sql/tests/test_dataframe.py
+++ b/python/pyspark/sql/tests/test_dataframe.py
@@ -20,8 +20,7 @@ import pydoc
 import shutil
 import tempfile
 import time
-import uuid
-from typing import cast
+import unittest
 
 from pyspark.sql import SparkSession, Row
 from pyspark.sql.types import StringType, IntegerType, DoubleType, StructType, StructField, \
@@ -874,41 +873,6 @@ class DataFrameTests(ReusedSQLTestCase):
         with self.assertRaisesRegex(TypeError, "Parameter 'truncate=foo'"):
             df.show(truncate='foo')
 
-    def test_df_is_empty(self):
-        # SPARK-39084: Fix df.rdd.isEmpty() resulting in JVM crash.
-
-        # This particular example of DataFrame reproduces an issue in isEmpty call
-        # which could result in JVM crash.
-        data = []
-        for t in range(0, 10000):
-            id = str(uuid.uuid4())
-            if t == 0:
-                for i in range(0, 99):
-                    data.append((id,))
-            elif t < 10:
-                for i in range(0, 75):
-                    data.append((id,))
-            elif t < 100:
-                for i in range(0, 50):
-                    data.append((id,))
-            elif t < 1000:
-                for i in range(0, 25):
-                    data.append((id,))
-            else:
-                for i in range(0, 10):
-                    data.append((id,))
-
-        tmpPath = tempfile.mkdtemp()
-        shutil.rmtree(tmpPath)
-        try:
-            df = self.spark.createDataFrame(data, ["col"])
-            df.coalesce(1).write.parquet(tmpPath)
-
-            res = self.spark.read.parquet(tmpPath).groupBy("col").count()
-            self.assertFalse(res.rdd.isEmpty())
-        finally:
-            shutil.rmtree(tmpPath)
-
     @unittest.skipIf(
         not have_pandas or not have_pyarrow,
         pandas_requirement_message or pyarrow_requirement_message)  # type: ignore
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
index 667f2c030d5..4885f631138 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
@@ -24,7 +24,6 @@ import scala.collection.JavaConverters._
 
 import net.razorvine.pickle.{IObjectPickler, Opcodes, Pickler}
 
-import org.apache.spark.{ContextAwareIterator, TaskContext}
 import org.apache.spark.api.python.SerDeUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -301,7 +300,7 @@ object EvaluatePython {
   def javaToPython(rdd: RDD[Any]): RDD[Array[Byte]] = {
     rdd.mapPartitions { iter =>
       registerPicklers()  // let it called in executor
-      new SerDeUtil.AutoBatchedPickler(new ContextAwareIterator(TaskContext.get, iter))
+      new SerDeUtil.AutoBatchedPickler(iter)
     }
   }
 }
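
For context, the change reverted here had wrapped each partition iterator in ContextAwareIterator(TaskContext.get, iter) so that pickling stops consuming rows once the task completes. Below is a minimal, self-contained sketch of that wrapping pattern; StopAwareIterator and the isTaskCompleted flag are illustrative stand-ins for org.apache.spark.ContextAwareIterator and TaskContext, not the actual Spark classes.

    // Hedged sketch: simplified stand-in for the pattern removed by this revert.
    // StopAwareIterator and isTaskCompleted are illustrative names, not Spark APIs.
    object StopAwareIteratorSketch {

      // Wraps a delegate iterator and stops yielding elements once the task
      // (modeled here by a boolean check) is reported as completed.
      class StopAwareIterator[T](isTaskCompleted: () => Boolean, delegate: Iterator[T])
        extends Iterator[T] {
        override def hasNext: Boolean = !isTaskCompleted() && delegate.hasNext
        override def next(): T = delegate.next()
      }

      def main(args: Array[String]): Unit = {
        var completed = false
        val source = Iterator.from(1)                    // unbounded source iterator
        val guarded = new StopAwareIterator(() => completed, source)

        println(guarded.take(3).toList)                  // List(1, 2, 3)
        completed = true                                 // simulate task completion
        println(guarded.hasNext)                         // false: iteration stops early
      }
    }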

