This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git

commit be5249092e151cbd2f54053d3e66f445b97a460e
Author: Hyukjin Kwon <gurwls...@apache.org>
AuthorDate: Tue May 3 08:34:39 2022 +0900

    Revert "[SPARK-39084][PYSPARK] Fix df.rdd.isEmpty() by using TaskContext to 
stop iterator on task completion"
    
    This reverts commit 660a9f845f954b4bf2c3a7d51988b33ae94e3207.
---
 python/pyspark/sql/tests/test_dataframe.py         | 81 ----------------------
 .../sql/execution/python/EvaluatePython.scala      |  3 +-
 2 files changed, 1 insertion(+), 83 deletions(-)

diff --git a/python/pyspark/sql/tests/test_dataframe.py b/python/pyspark/sql/tests/test_dataframe.py
index dfdbcb912f7..e3977e81851 100644
--- a/python/pyspark/sql/tests/test_dataframe.py
+++ b/python/pyspark/sql/tests/test_dataframe.py
@@ -21,7 +21,6 @@ import shutil
 import tempfile
 import time
 import unittest
-import uuid
 
 from pyspark.sql import SparkSession, Row
 from pyspark.sql.types import StringType, IntegerType, DoubleType, StructType, StructField, \
@@ -838,86 +837,6 @@ class DataFrameTests(ReusedSQLTestCase):
         finally:
             shutil.rmtree(tpath)
 
-<<<<<<< HEAD
-=======
-    def test_df_show(self):
-        # SPARK-35408: ensure better diagnostics if incorrect parameters are passed
-        # to DataFrame.show
-
-        df = self.spark.createDataFrame([("foo",)])
-        df.show(5)
-        df.show(5, True)
-        df.show(5, 1, True)
-        df.show(n=5, truncate="1", vertical=False)
-        df.show(n=5, truncate=1.5, vertical=False)
-
-        with self.assertRaisesRegex(TypeError, "Parameter 'n'"):
-            df.show(True)
-        with self.assertRaisesRegex(TypeError, "Parameter 'vertical'"):
-            df.show(vertical="foo")
-        with self.assertRaisesRegex(TypeError, "Parameter 'truncate=foo'"):
-            df.show(truncate="foo")
-
-    def test_df_is_empty(self):
-        # SPARK-39084: Fix df.rdd.isEmpty() resulting in JVM crash.
-
-        # This particular example of DataFrame reproduces an issue in isEmpty call
-        # which could result in JVM crash.
-        data = []
-        for t in range(0, 10000):
-            id = str(uuid.uuid4())
-            if t == 0:
-                for i in range(0, 99):
-                    data.append((id,))
-            elif t < 10:
-                for i in range(0, 75):
-                    data.append((id,))
-            elif t < 100:
-                for i in range(0, 50):
-                    data.append((id,))
-            elif t < 1000:
-                for i in range(0, 25):
-                    data.append((id,))
-            else:
-                for i in range(0, 10):
-                    data.append((id,))
-
-        tmpPath = tempfile.mkdtemp()
-        shutil.rmtree(tmpPath)
-        try:
-            df = self.spark.createDataFrame(data, ["col"])
-            df.coalesce(1).write.parquet(tmpPath)
-
-            res = self.spark.read.parquet(tmpPath).groupBy("col").count()
-            self.assertFalse(res.rdd.isEmpty())
-        finally:
-            shutil.rmtree(tmpPath)
-
-    @unittest.skipIf(
-        not have_pandas or not have_pyarrow,
-        cast(str, pandas_requirement_message or pyarrow_requirement_message),
-    )
-    def test_pandas_api(self):
-        import pandas as pd
-        from pandas.testing import assert_frame_equal
-
-        sdf = self.spark.createDataFrame([("a", 1), ("b", 2), ("c", 3)], ["Col1", "Col2"])
-        psdf_from_sdf = sdf.pandas_api()
-        psdf_from_sdf_with_index = sdf.pandas_api(index_col="Col1")
-        pdf = pd.DataFrame({"Col1": ["a", "b", "c"], "Col2": [1, 2, 3]})
-        pdf_with_index = pdf.set_index("Col1")
-
-        assert_frame_equal(pdf, psdf_from_sdf.to_pandas())
-        assert_frame_equal(pdf_with_index, psdf_from_sdf_with_index.to_pandas())
-
-    # test for SPARK-36337
-    def test_create_nan_decimal_dataframe(self):
-        self.assertEqual(
-            self.spark.createDataFrame(data=[Decimal("NaN")], schema="decimal").collect(),
-            [Row(value=None)],
-        )
-
->>>>>>> 9305cc744d2 ([SPARK-39084][PYSPARK] Fix df.rdd.isEmpty() by using TaskContext to stop iterator on task completion)
 
 class QueryExecutionListenerTests(unittest.TestCase, SQLTestUtils):
     # These tests are separate because it uses 'spark.sql.queryExecutionListeners' which is
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
index ca33f6951e1..7fe32636308 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
@@ -24,7 +24,6 @@ import scala.collection.JavaConverters._
 
 import net.razorvine.pickle.{IObjectPickler, Opcodes, Pickler}
 
-import org.apache.spark.{ContextAwareIterator, TaskContext}
 import org.apache.spark.api.python.SerDeUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -301,7 +300,7 @@ object EvaluatePython {
   def javaToPython(rdd: RDD[Any]): RDD[Array[Byte]] = {
     rdd.mapPartitions { iter =>
       registerPicklers()  // let it called in executor
-      new SerDeUtil.AutoBatchedPickler(new ContextAwareIterator(TaskContext.get, iter))
+      new SerDeUtil.AutoBatchedPickler(iter)
     }
   }
 }
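
For reference, the change being reverted wrapped the partition iterator handed to the Python pickler in ContextAwareIterator, so that once a task completes the pickler stops pulling rows from resources that task-completion listeners may already have released. The sketch below is only an illustration of that wrapper pattern, assuming Spark's TaskContext.isCompleted()/isInterrupted() API and using a hypothetical class name; it is not the implementation that was reverted.

import org.apache.spark.TaskContext

// Illustrative wrapper: report exhaustion as soon as the owning task has
// completed or been interrupted, even if the underlying iterator still has
// elements, so nothing reads from resources freed on task completion.
class StopOnTaskCompletionIterator[T](context: TaskContext, delegate: Iterator[T])
  extends Iterator[T] {

  override def hasNext: Boolean =
    !context.isCompleted() && !context.isInterrupted() && delegate.hasNext

  override def next(): T = delegate.next()
}

// Hypothetical usage mirroring the javaToPython hunk above:
//   rdd.mapPartitions { iter =>
//     new SerDeUtil.AutoBatchedPickler(
//       new StopOnTaskCompletionIterator(TaskContext.get, iter))
//   }

With this revert applied, javaToPython passes the raw partition iterator to the pickler again, as shown in the last hunk above.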


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
