This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new d9bcacf [SPARK-27629][PYSPARK] Prevent Unpickler from intervening each unpickling d9bcacf is described below commit d9bcacf94b93fe76542b5c1fd852559075ef6faa Author: Liang-Chi Hsieh <vii...@gmail.com> AuthorDate: Sat May 4 13:21:08 2019 +0900 [SPARK-27629][PYSPARK] Prevent Unpickler from intervening each unpickling ## What changes were proposed in this pull request? In SPARK-27612, one correctness issue was reported. When protocol 4 is used to pickle Python objects, we found that unpickled objects were wrong. A temporary fix was proposed by not using the highest protocol. It was found that Opcodes.MEMOIZE appeared in the opcodes in protocol 4, and it was suspected to cause this issue. A deeper dive found that Opcodes.MEMOIZE stores objects into the internal map of the Unpickler object. We use a single Unpickler object to unpickle serialized Python bytes. Stored objects interfere with the next round of unpickling if the map is not cleared. We have two options: 1. Continue to reuse the Unpickler, but call its close() after each unpickling. 2. Do not reuse the Unpickler, and create a new Unpickler object for each unpickling. This patch takes option 1. ## How was this patch tested? Passing the test added in SPARK-27612 (#24519). Closes #24521 from viirya/SPARK-27629. 
Authored-by: Liang-Chi Hsieh <vii...@gmail.com> Signed-off-by: HyukjinKwon <gurwls...@apache.org> --- core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala | 3 +++ .../main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala | 4 ++++ python/pyspark/serializers.py | 3 +-- python/pyspark/sql/tests/test_serde.py | 4 ++++ .../org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala | 4 ++++ 5 files changed, 16 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala index 01e64b6..9462dfd 100644 --- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala +++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala @@ -186,6 +186,9 @@ private[spark] object SerDeUtil extends Logging { val unpickle = new Unpickler iter.flatMap { row => val obj = unpickle.loads(row) + // `Opcodes.MEMOIZE` of Protocol 4 (Python 3.4+) will store objects in internal map + // of `Unpickler`. This map is cleared when calling `Unpickler.close()`. + unpickle.close() if (batched) { obj match { case array: Array[Any] => array.toSeq diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 3e1bbba..322ef93 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -1347,6 +1347,10 @@ private[spark] abstract class SerDeBase { val unpickle = new Unpickler iter.flatMap { row => val obj = unpickle.loads(row) + // `Opcodes.MEMOIZE` of Protocol 4 (Python 3.4+) will store objects in internal map + // of `Unpickler`. This map is cleared when calling `Unpickler.close()`. Pyrolite + // doesn't clear it up, so we manually clear it. 
+ unpickle.close() if (batched) { obj match { case list: JArrayList[_] => list.asScala diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py index 5311087..6058e94 100644 --- a/python/pyspark/serializers.py +++ b/python/pyspark/serializers.py @@ -62,12 +62,11 @@ import itertools if sys.version < '3': import cPickle as pickle from itertools import izip as zip, imap as map - pickle_protocol = 2 else: import pickle basestring = unicode = str xrange = range - pickle_protocol = 3 +pickle_protocol = pickle.HIGHEST_PROTOCOL from pyspark import cloudpickle from pyspark.util import _exception_message diff --git a/python/pyspark/sql/tests/test_serde.py b/python/pyspark/sql/tests/test_serde.py index 1c18e93..ed4b9a7 100644 --- a/python/pyspark/sql/tests/test_serde.py +++ b/python/pyspark/sql/tests/test_serde.py @@ -128,6 +128,10 @@ class SerdeTests(ReusedSQLTestCase): def test_int_array_serialization(self): # Note that this test seems dependent on parallelism. + # This issue is because internal object map in Pyrolite is not cleared after op code + # STOP. If we use protocol 4 to pickle Python objects, op code MEMOIZE will store + # objects in the map. We need to clear up it to make sure next unpickling works on + # clear map. 
data = self.spark.sparkContext.parallelize([[1, 2, 3, 4]] * 100, numSlices=12) df = self.spark.createDataFrame(data, "array<integer>") self.assertEqual(len(list(filter(lambda r: None in r.value, df.collect()))), 0) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala index d3736d2..eff709e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala @@ -92,6 +92,10 @@ case class BatchEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi outputIterator.flatMap { pickedResult => val unpickledBatch = unpickle.loads(pickedResult) + // `Opcodes.MEMOIZE` of Protocol 4 (Python 3.4+) will store objects in internal map + // of `Unpickler`. This map is cleared when calling `Unpickler.close()`. Pyrolite + // doesn't clear it up, so we manually clear it. + unpickle.close() unpickledBatch.asInstanceOf[java.util.ArrayList[Any]].asScala }.map { result => if (udfs.length == 1) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org