This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git

commit be5249092e151cbd2f54053d3e66f445b97a460e
Author: Hyukjin Kwon <gurwls...@apache.org>
AuthorDate: Tue May 3 08:34:39 2022 +0900

    Revert "[SPARK-39084][PYSPARK] Fix df.rdd.isEmpty() by using TaskContext to stop iterator on task completion"

    This reverts commit 660a9f845f954b4bf2c3a7d51988b33ae94e3207.
---
 python/pyspark/sql/tests/test_dataframe.py         | 81 ----------------------
 .../sql/execution/python/EvaluatePython.scala      |  3 +-
 2 files changed, 1 insertion(+), 83 deletions(-)

diff --git a/python/pyspark/sql/tests/test_dataframe.py b/python/pyspark/sql/tests/test_dataframe.py
index dfdbcb912f7..e3977e81851 100644
--- a/python/pyspark/sql/tests/test_dataframe.py
+++ b/python/pyspark/sql/tests/test_dataframe.py
@@ -21,7 +21,6 @@ import shutil
 import tempfile
 import time
 import unittest
-import uuid

 from pyspark.sql import SparkSession, Row
 from pyspark.sql.types import StringType, IntegerType, DoubleType, StructType, StructField, \
@@ -838,86 +837,6 @@ class DataFrameTests(ReusedSQLTestCase):
         finally:
             shutil.rmtree(tpath)

-<<<<<<< HEAD
-=======
-    def test_df_show(self):
-        # SPARK-35408: ensure better diagnostics if incorrect parameters are passed
-        # to DataFrame.show
-
-        df = self.spark.createDataFrame([("foo",)])
-        df.show(5)
-        df.show(5, True)
-        df.show(5, 1, True)
-        df.show(n=5, truncate="1", vertical=False)
-        df.show(n=5, truncate=1.5, vertical=False)
-
-        with self.assertRaisesRegex(TypeError, "Parameter 'n'"):
-            df.show(True)
-        with self.assertRaisesRegex(TypeError, "Parameter 'vertical'"):
-            df.show(vertical="foo")
-        with self.assertRaisesRegex(TypeError, "Parameter 'truncate=foo'"):
-            df.show(truncate="foo")
-
-    def test_df_is_empty(self):
-        # SPARK-39084: Fix df.rdd.isEmpty() resulting in JVM crash.
-
-        # This particular example of DataFrame reproduces an issue in isEmpty call
-        # which could result in JVM crash.
-        data = []
-        for t in range(0, 10000):
-            id = str(uuid.uuid4())
-            if t == 0:
-                for i in range(0, 99):
-                    data.append((id,))
-            elif t < 10:
-                for i in range(0, 75):
-                    data.append((id,))
-            elif t < 100:
-                for i in range(0, 50):
-                    data.append((id,))
-            elif t < 1000:
-                for i in range(0, 25):
-                    data.append((id,))
-            else:
-                for i in range(0, 10):
-                    data.append((id,))
-
-        tmpPath = tempfile.mkdtemp()
-        shutil.rmtree(tmpPath)
-        try:
-            df = self.spark.createDataFrame(data, ["col"])
-            df.coalesce(1).write.parquet(tmpPath)
-
-            res = self.spark.read.parquet(tmpPath).groupBy("col").count()
-            self.assertFalse(res.rdd.isEmpty())
-        finally:
-            shutil.rmtree(tmpPath)
-
-    @unittest.skipIf(
-        not have_pandas or not have_pyarrow,
-        cast(str, pandas_requirement_message or pyarrow_requirement_message),
-    )
-    def test_pandas_api(self):
-        import pandas as pd
-        from pandas.testing import assert_frame_equal
-
-        sdf = self.spark.createDataFrame([("a", 1), ("b", 2), ("c", 3)], ["Col1", "Col2"])
-        psdf_from_sdf = sdf.pandas_api()
-        psdf_from_sdf_with_index = sdf.pandas_api(index_col="Col1")
-        pdf = pd.DataFrame({"Col1": ["a", "b", "c"], "Col2": [1, 2, 3]})
-        pdf_with_index = pdf.set_index("Col1")
-
-        assert_frame_equal(pdf, psdf_from_sdf.to_pandas())
-        assert_frame_equal(pdf_with_index, psdf_from_sdf_with_index.to_pandas())
-
-    # test for SPARK-36337
-    def test_create_nan_decimal_dataframe(self):
-        self.assertEqual(
-            self.spark.createDataFrame(data=[Decimal("NaN")], schema="decimal").collect(),
-            [Row(value=None)],
-        )
-
->>>>>>> 9305cc744d2 ([SPARK-39084][PYSPARK] Fix df.rdd.isEmpty() by using TaskContext to stop iterator on task completion)

 class QueryExecutionListenerTests(unittest.TestCase, SQLTestUtils):
     # These tests are separate because it uses 'spark.sql.queryExecutionListeners' which is
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
index ca33f6951e1..7fe32636308 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
@@ -24,7 +24,6 @@ import scala.collection.JavaConverters._

 import net.razorvine.pickle.{IObjectPickler, Opcodes, Pickler}

-import org.apache.spark.{ContextAwareIterator, TaskContext}
 import org.apache.spark.api.python.SerDeUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -301,7 +300,7 @@ object EvaluatePython {
   def javaToPython(rdd: RDD[Any]): RDD[Array[Byte]] = {
     rdd.mapPartitions { iter =>
       registerPicklers()  // let it called in executor
-      new SerDeUtil.AutoBatchedPickler(new ContextAwareIterator(TaskContext.get, iter))
+      new SerDeUtil.AutoBatchedPickler(iter)
     }
   }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org