Repository: spark
Updated Branches:
  refs/heads/branch-1.3 87e0f0dc6 -> 1d3234165


SPARK-5633 pyspark saveAsTextFile support for compression codec

See https://issues.apache.org/jira/browse/SPARK-5633 for details

Author: Vladimir Vladimirov <vladimir.vladimi...@magnetic.com>

Closes #4403 from smartkiwi/master and squashes the following commits:

94c014e [Vladimir Vladimirov] SPARK-5633 pyspark saveAsTextFile support for compression codec

(cherry picked from commit b3872e00d155939e40366debda635fc3fb12cc73)
Signed-off-by: Josh Rosen <joshro...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1d323416
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1d323416
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1d323416

Branch: refs/heads/branch-1.3
Commit: 1d3234165b9e0c4017fe26d9339a1ab49890c868
Parents: 87e0f0d
Author: Vladimir Vladimirov <vladimir.vladimi...@magnetic.com>
Authored: Fri Feb 6 13:55:02 2015 -0800
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Fri Feb 6 13:55:22 2015 -0800

----------------------------------------------------------------------
 python/pyspark/rdd.py | 22 ++++++++++++++++++++--
 1 file changed, 20 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1d323416/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 6e029bf..bd4f16e 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -1366,10 +1366,14 @@ class RDD(object):
             ser = BatchedSerializer(PickleSerializer(), batchSize)
         self._reserialize(ser)._jrdd.saveAsObjectFile(path)
 
-    def saveAsTextFile(self, path):
+    def saveAsTextFile(self, path, compressionCodecClass=None):
         """
         Save this RDD as a text file, using string representations of elements.
 
+        @param path: path to text file
+        @param compressionCodecClass: (None by default) string e.g.
+            "org.apache.hadoop.io.compress.GzipCodec"
+
         >>> tempFile = NamedTemporaryFile(delete=True)
         >>> tempFile.close()
         >>> sc.parallelize(range(10)).saveAsTextFile(tempFile.name)
@@ -1385,6 +1389,16 @@ class RDD(object):
         >>> sc.parallelize(['', 'foo', '', 'bar', '']).saveAsTextFile(tempFile2.name)
         >>> ''.join(sorted(input(glob(tempFile2.name + "/part-0000*"))))
         '\\n\\n\\nbar\\nfoo\\n'
+
+        Using compressionCodecClass
+
+        >>> tempFile3 = NamedTemporaryFile(delete=True)
+        >>> tempFile3.close()
+        >>> codec = "org.apache.hadoop.io.compress.GzipCodec"
+        >>> sc.parallelize(['foo', 'bar']).saveAsTextFile(tempFile3.name, codec)
+        >>> from fileinput import input, hook_compressed
+        >>> ''.join(sorted(input(glob(tempFile3.name + "/part*.gz"), openhook=hook_compressed)))
+        'bar\\nfoo\\n'
         """
         def func(split, iterator):
             for x in iterator:
@@ -1395,7 +1409,11 @@ class RDD(object):
                 yield x
         keyed = self.mapPartitionsWithIndex(func)
         keyed._bypass_serializer = True
-        keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
+        if compressionCodecClass:
+            compressionCodec = self.ctx._jvm.java.lang.Class.forName(compressionCodecClass)
+            keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path, compressionCodec)
+        else:
+            keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
 
     # Pair functions
 

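The new doctest verifies the round trip by reading the gzip part files back with `fileinput`'s `hook_compressed`. As a standalone sketch of that read-back path (no Spark required; the part files are simulated here with `gzip` directly, and all paths are temporary):

```python
import gzip
import os
import tempfile
from fileinput import input as finput, hook_compressed
from glob import glob

# Simulate the part files that saveAsTextFile(path, GzipCodec) would write.
outdir = tempfile.mkdtemp()
for i, chunk in enumerate([b"foo\n", b"bar\n"]):
    with gzip.open(os.path.join(outdir, "part-%05d.gz" % i), "wb") as f:
        f.write(chunk)

# hook_compressed transparently decompresses .gz (and .bz2) files;
# without an encoding argument it yields bytes lines, hence the decode.
with finput(sorted(glob(os.path.join(outdir, "part*.gz"))),
            openhook=hook_compressed) as fi:
    lines = sorted(fi)
text = b"".join(lines).decode("utf-8")
print(text)  # bar\nfoo\n
```

The doctest in the patch joins the lines the same way; sorting makes the result deterministic regardless of how elements were split across partitions.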

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
