Repository: spark
Updated Branches:
  refs/heads/master c7628771d -> c246b95dd


[SPARK-4841] fix zip with textFile()

UTF8Deserializer cannot be used in BatchedSerializer, so always use 
PickleSerializer() when changing batchSize in zip().

Also, if two RDDs already have the same batch size, they do not need 
to be re-serialized any more.

Author: Davies Liu <dav...@databricks.com>

Closes #3706 from davies/fix_4841 and squashes the following commits:

20ce3a3 [Davies Liu] fix bug in _reserialize()
e3ebf7c [Davies Liu] add comment
379d2c8 [Davies Liu] fix zip with textFile()


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c246b95d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c246b95d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c246b95d

Branch: refs/heads/master
Commit: c246b95dd2f565043db429c38c6cc029a0b870c1
Parents: c762877
Author: Davies Liu <dav...@databricks.com>
Authored: Mon Dec 15 22:58:26 2014 -0800
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Mon Dec 15 22:58:26 2014 -0800

----------------------------------------------------------------------
 python/pyspark/rdd.py         | 25 +++++++++++--------------
 python/pyspark/serializers.py |  6 ++++++
 python/pyspark/tests.py       |  9 +++++++++
 3 files changed, 26 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c246b95d/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 5775477..bd2ff00 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -469,8 +469,7 @@ class RDD(object):
     def _reserialize(self, serializer=None):
         serializer = serializer or self.ctx.serializer
         if self._jrdd_deserializer != serializer:
-            if not isinstance(self, PipelinedRDD):
-                self = self.map(lambda x: x, preservesPartitioning=True)
+            self = self.map(lambda x: x, preservesPartitioning=True)
             self._jrdd_deserializer = serializer
         return self
 
@@ -1798,23 +1797,21 @@ class RDD(object):
         def get_batch_size(ser):
             if isinstance(ser, BatchedSerializer):
                 return ser.batchSize
-            return 1
+            return 1  # not batched
 
         def batch_as(rdd, batchSize):
-            ser = rdd._jrdd_deserializer
-            if isinstance(ser, BatchedSerializer):
-                ser = ser.serializer
-            return rdd._reserialize(BatchedSerializer(ser, batchSize))
+            return rdd._reserialize(BatchedSerializer(PickleSerializer(), 
batchSize))
 
         my_batch = get_batch_size(self._jrdd_deserializer)
         other_batch = get_batch_size(other._jrdd_deserializer)
-        # use the smallest batchSize for both of them
-        batchSize = min(my_batch, other_batch)
-        if batchSize <= 0:
-            # auto batched or unlimited
-            batchSize = 100
-        other = batch_as(other, batchSize)
-        self = batch_as(self, batchSize)
+        if my_batch != other_batch:
+            # use the smallest batchSize for both of them
+            batchSize = min(my_batch, other_batch)
+            if batchSize <= 0:
+                # auto batched or unlimited
+                batchSize = 100
+            other = batch_as(other, batchSize)
+            self = batch_as(self, batchSize)
 
         if self.getNumPartitions() != other.getNumPartitions():
             raise ValueError("Can only zip with RDD which has the same number 
of partitions")

http://git-wip-us.apache.org/repos/asf/spark/blob/c246b95d/python/pyspark/serializers.py
----------------------------------------------------------------------
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 33aa55f..bd08c9a 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -463,6 +463,9 @@ class CompressedSerializer(FramedSerializer):
     def loads(self, obj):
         return self.serializer.loads(zlib.decompress(obj))
 
+    def __eq__(self, other):
+        return isinstance(other, CompressedSerializer) and self.serializer == 
other.serializer
+
 
 class UTF8Deserializer(Serializer):
 
@@ -489,6 +492,9 @@ class UTF8Deserializer(Serializer):
         except EOFError:
             return
 
+    def __eq__(self, other):
+        return isinstance(other, UTF8Deserializer) and self.use_unicode == 
other.use_unicode
+
 
 def read_long(stream):
     length = stream.read(8)

http://git-wip-us.apache.org/repos/asf/spark/blob/c246b95d/python/pyspark/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 3264577..bca52a7 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -533,6 +533,15 @@ class RDDTests(ReusedPySparkTestCase):
         a = a._reserialize(BatchedSerializer(PickleSerializer(), 2))
         b = b._reserialize(MarshalSerializer())
         self.assertEqual(a.zip(b).collect(), [(0, 100), (1, 101), (2, 102), 
(3, 103), (4, 104)])
+        # regression test for SPARK-4841
+        path = os.path.join(SPARK_HOME, "python/test_support/hello.txt")
+        t = self.sc.textFile(path)
+        cnt = t.count()
+        self.assertEqual(cnt, t.zip(t).count())
+        rdd = t.map(str)
+        self.assertEqual(cnt, t.zip(rdd).count())
+        # regression test for bug in _reserializer()
+        self.assertEqual(cnt, t.zip(rdd).count())
 
     def test_zip_with_different_number_of_items(self):
         a = self.sc.parallelize(range(5), 2)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to