Repository: spark Updated Branches: refs/heads/branch-1.3 87e0f0dc6 -> 1d3234165
SPARK-5633 pyspark saveAsTextFile support for compression codec See https://issues.apache.org/jira/browse/SPARK-5633 for details Author: Vladimir Vladimirov <vladimir.vladimi...@magnetic.com> Closes #4403 from smartkiwi/master and squashes the following commits: 94c014e [Vladimir Vladimirov] SPARK-5633 pyspark saveAsTextFile support for compression codec (cherry picked from commit b3872e00d155939e40366debda635fc3fb12cc73) Signed-off-by: Josh Rosen <joshro...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1d323416 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1d323416 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1d323416 Branch: refs/heads/branch-1.3 Commit: 1d3234165b9e0c4017fe26d9339a1ab49890c868 Parents: 87e0f0d Author: Vladimir Vladimirov <vladimir.vladimi...@magnetic.com> Authored: Fri Feb 6 13:55:02 2015 -0800 Committer: Josh Rosen <joshro...@databricks.com> Committed: Fri Feb 6 13:55:22 2015 -0800 ---------------------------------------------------------------------- python/pyspark/rdd.py | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/1d323416/python/pyspark/rdd.py ---------------------------------------------------------------------- diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 6e029bf..bd4f16e 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -1366,10 +1366,14 @@ class RDD(object): ser = BatchedSerializer(PickleSerializer(), batchSize) self._reserialize(ser)._jrdd.saveAsObjectFile(path) - def saveAsTextFile(self, path): + def saveAsTextFile(self, path, compressionCodecClass=None): """ Save this RDD as a text file, using string representations of elements. + @param path: path to text file + @param compressionCodecClass: (None by default) string i.e. + "org.apache.hadoop.io.compress.GzipCodec" + >>> tempFile = NamedTemporaryFile(delete=True) >>> tempFile.close() >>> sc.parallelize(range(10)).saveAsTextFile(tempFile.name) @@ -1385,6 +1389,16 @@ class RDD(object): >>> sc.parallelize(['', 'foo', '', 'bar', '']).saveAsTextFile(tempFile2.name) >>> ''.join(sorted(input(glob(tempFile2.name + "/part-0000*")))) '\\n\\n\\nbar\\nfoo\\n' + + Using compressionCodecClass + + >>> tempFile3 = NamedTemporaryFile(delete=True) + >>> tempFile3.close() + >>> codec = "org.apache.hadoop.io.compress.GzipCodec" + >>> sc.parallelize(['foo', 'bar']).saveAsTextFile(tempFile3.name, codec) + >>> from fileinput import input, hook_compressed + >>> ''.join(sorted(input(glob(tempFile3.name + "/part*.gz"), openhook=hook_compressed))) + 'bar\\nfoo\\n' """ def func(split, iterator): for x in iterator: @@ -1395,7 +1409,11 @@ class RDD(object): yield x keyed = self.mapPartitionsWithIndex(func) keyed._bypass_serializer = True - keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path) + if compressionCodecClass: + compressionCodec = self.ctx._jvm.java.lang.Class.forName(compressionCodecClass) + keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path, compressionCodec) + else: + keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path) # Pair functions --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org