This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d9bcacf  [SPARK-27629][PYSPARK] Prevent Unpickler from intervening each unpickling
d9bcacf is described below

commit d9bcacf94b93fe76542b5c1fd852559075ef6faa
Author: Liang-Chi Hsieh <vii...@gmail.com>
AuthorDate: Sat May 4 13:21:08 2019 +0900

    [SPARK-27629][PYSPARK] Prevent Unpickler from intervening each unpickling
    
    ## What changes were proposed in this pull request?
    
    In SPARK-27612, a correctness issue was reported: when protocol 4 is used
    to pickle Python objects, the unpickled objects come back wrong. A
    temporary fix was proposed that avoided the highest protocol.
    
    It was found that Opcodes.MEMOIZE appeared among the opcodes emitted by
    protocol 4, making it the prime suspect for this issue.
    
    A deeper dive found that Opcodes.MEMOIZE stores objects into an internal
    map of the Unpickler object. We use a single Unpickler object to unpickle
    serialized Python bytes, so if the map is not cleared, objects stored
    during one round of unpickling interfere with the next.
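
    As a minimal sketch of the failure mode (simplified from SerDeUtil.scala;
    the `unpickleAll` helper and its `rows` parameter are illustrative, not
    Spark API):

    ```scala
    import net.razorvine.pickle.Unpickler

    // One Unpickler instance is shared across all rows of a partition.
    // Under protocol 4, every loads() call leaves MEMOIZE'd objects behind
    // in the Unpickler's internal map, so entries from one row are still
    // present while the next row is being unpickled.
    def unpickleAll(rows: Iterator[Array[Byte]]): Iterator[Any] = {
      val unpickle = new Unpickler
      rows.map(row => unpickle.loads(row)) // memo map is never cleared
    }
    ```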
    
    We have two options:
    
    1. Continue to reuse the Unpickler, but call its close() after each unpickling.
    2. Do not reuse the Unpickler; create a new Unpickler object for each unpickling.
    
    This patch takes option 1.
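
    Concretely, the patch changes each such call site along the lines of the
    following sketch (same illustrative helper as above; `Unpickler.loads()`
    and `Unpickler.close()` are the actual Pyrolite calls, the rest is
    assumed scaffolding):

    ```scala
    import net.razorvine.pickle.Unpickler

    // Option 1: keep reusing the Unpickler, but call close() after every
    // loads() so the memo map is empty before the next row is unpickled;
    // Pyrolite does not clear the map on its own.
    def unpickleAll(rows: Iterator[Array[Byte]]): Iterator[Any] = {
      val unpickle = new Unpickler
      rows.map { row =>
        val obj = unpickle.loads(row)
        unpickle.close() // clears the Unpickler's internal memo map
        obj
      }
    }
    ```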
    
    ## How was this patch tested?
    
    Passes the test added in SPARK-27612 (#24519).
    
    Closes #24521 from viirya/SPARK-27629.
    
    Authored-by: Liang-Chi Hsieh <vii...@gmail.com>
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
---
 core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala       | 3 +++
 .../main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala | 4 ++++
 python/pyspark/serializers.py                                         | 3 +--
 python/pyspark/sql/tests/test_serde.py                                | 4 ++++
 .../org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala   | 4 ++++
 5 files changed, 16 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
index 01e64b6..9462dfd 100644
--- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
@@ -186,6 +186,9 @@ private[spark] object SerDeUtil extends Logging {
       val unpickle = new Unpickler
       iter.flatMap { row =>
         val obj = unpickle.loads(row)
+        // `Opcodes.MEMOIZE` of Protocol 4 (Python 3.4+) will store objects in internal map
+        // of `Unpickler`. This map is cleared when calling `Unpickler.close()`.
+        unpickle.close()
         if (batched) {
           obj match {
             case array: Array[Any] => array.toSeq
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index 3e1bbba..322ef93 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -1347,6 +1347,10 @@ private[spark] abstract class SerDeBase {
       val unpickle = new Unpickler
       iter.flatMap { row =>
         val obj = unpickle.loads(row)
+        // `Opcodes.MEMOIZE` of Protocol 4 (Python 3.4+) will store objects in internal map
+        // of `Unpickler`. This map is cleared when calling `Unpickler.close()`. Pyrolite
+        // doesn't clear it up, so we manually clear it.
+        unpickle.close()
         if (batched) {
           obj match {
             case list: JArrayList[_] => list.asScala
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 5311087..6058e94 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -62,12 +62,11 @@ import itertools
 if sys.version < '3':
     import cPickle as pickle
     from itertools import izip as zip, imap as map
-    pickle_protocol = 2
 else:
     import pickle
     basestring = unicode = str
     xrange = range
-    pickle_protocol = 3
+pickle_protocol = pickle.HIGHEST_PROTOCOL
 
 from pyspark import cloudpickle
 from pyspark.util import _exception_message
diff --git a/python/pyspark/sql/tests/test_serde.py b/python/pyspark/sql/tests/test_serde.py
index 1c18e93..ed4b9a7 100644
--- a/python/pyspark/sql/tests/test_serde.py
+++ b/python/pyspark/sql/tests/test_serde.py
@@ -128,6 +128,10 @@ class SerdeTests(ReusedSQLTestCase):
 
     def test_int_array_serialization(self):
         # Note that this test seems dependent on parallelism.
+        # This issue arises because the internal object map in Pyrolite is not cleared
+        # after op code STOP. If we use protocol 4 to pickle Python objects, op code
+        # MEMOIZE will store objects in the map. We need to clear it up to make sure
+        # the next unpickling works on a clean map.
         data = self.spark.sparkContext.parallelize([[1, 2, 3, 4]] * 100, numSlices=12)
         df = self.spark.createDataFrame(data, "array<integer>")
         self.assertEqual(len(list(filter(lambda r: None in r.value, df.collect()))), 0)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
index d3736d2..eff709e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
@@ -92,6 +92,10 @@ case class BatchEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi
 
     outputIterator.flatMap { pickedResult =>
       val unpickledBatch = unpickle.loads(pickedResult)
+      // `Opcodes.MEMOIZE` of Protocol 4 (Python 3.4+) will store objects in internal map
+      // of `Unpickler`. This map is cleared when calling `Unpickler.close()`. Pyrolite
+      // doesn't clear it up, so we manually clear it.
+      unpickle.close()
       unpickledBatch.asInstanceOf[java.util.ArrayList[Any]].asScala
     }.map { result =>
       if (udfs.length == 1) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
