Add support for bz2 compression
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a9a00bae Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a9a00bae Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a9a00bae Branch: refs/heads/python-sdk Commit: a9a00bae2f25237ffb58049d5ea72f532da24d78 Parents: 240f8f6 Author: tim1357 <sears....@gmail.com> Authored: Mon Sep 26 15:15:51 2016 -0400 Committer: Robert Bradshaw <rober...@google.com> Committed: Tue Oct 11 11:15:07 2016 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/io/fileio.py | 33 ++++-- sdks/python/apache_beam/io/fileio_test.py | 156 ++++++++++++++++++++++++- 2 files changed, 179 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a9a00bae/sdks/python/apache_beam/io/fileio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py index c248f12..574295e 100644 --- a/sdks/python/apache_beam/io/fileio.py +++ b/sdks/python/apache_beam/io/fileio.py @@ -27,6 +27,7 @@ import shutil import threading import time import zlib +import bz2 import weakref from apache_beam import coders @@ -76,6 +77,9 @@ class CompressionTypes(object): # ZLIB compression (deflate with ZLIB headers). ZLIB = _CompressionType('zlib') + # BZIP2 compression (Burrows-Wheeler block compression, not deflate). + BZIP2 = _CompressionType('bz2') + # Uncompressed (i.e., may be split). 
UNCOMPRESSED = _CompressionType('uncompressed') @@ -92,14 +96,17 @@ class CompressionTypes(object): def mime_type(cls, compression_type, default='application/octet-stream'): mime_types_by_compression_type = { cls.GZIP: 'application/x-gzip', - cls.ZLIB: 'application/octet-stream' + cls.ZLIB: 'application/octet-stream', + cls.BZIP2: 'application/octet-stream' } return mime_types_by_compression_type.get(compression_type, default) @classmethod def detect_compression_type(cls, file_path): """Returns the compression type of a file (based on its suffix)""" - compression_types_by_suffix = {'.gz': cls.GZIP, '.z': cls.ZLIB} + compression_types_by_suffix = {'.gz': cls.GZIP, + '.z': cls.ZLIB, + '.bz2': cls.BZIP2} lowercased_path = file_path.lower() for suffix, compression_type in compression_types_by_suffix.iteritems(): if lowercased_path.endswith(suffix): @@ -596,14 +603,21 @@ class _CompressedFile(object): self._compression_type = compression_type if self._readable(): - self._decompressor = zlib.decompressobj(self._type_mask[compression_type]) + if self._compression_type == CompressionTypes.BZIP2: + self._decompressor = bz2.BZ2Decompressor() + else: + self._decompressor = zlib.decompressobj(\ + self._type_mask[compression_type]) else: self._decompressor = None if self._writeable(): - self._compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, - zlib.DEFLATED, - self._type_mask[compression_type]) + if self._compression_type == CompressionTypes.BZIP2: + self._compressor = bz2.BZ2Compressor(9) + else: + self._compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, + zlib.DEFLATED, + self._type_mask[compression_type]) else: self._compressor = None @@ -638,10 +652,13 @@ class _CompressedFile(object): buf = self._file.read(self._read_size) if buf: self._data += self._decompressor.decompress(buf) - else: - # EOF reached, flush. + elif self._compression_type != CompressionTypes.BZIP2: + # EOF reached, flush. 
BZIP2 does not have this flush method, + because of its block nature + self._data += self._decompressor.flush() return + else: + return def _read_from_internal_buffer(self, num_bytes): """Read up to num_bytes from the internal buffer.""" http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a9a00bae/sdks/python/apache_beam/io/fileio_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py index b518b97..1ff9acd 100644 --- a/sdks/python/apache_beam/io/fileio_test.py +++ b/sdks/python/apache_beam/io/fileio_test.py @@ -25,6 +25,7 @@ import os import tempfile import unittest import zlib +import bz2 import apache_beam as beam from apache_beam import coders @@ -149,6 +150,56 @@ class TestTextFileSource(unittest.TestCase): read_lines.append(line) self.assertEqual(read_lines, lines) + def test_read_entire_file_bzip(self): + lines = ['First', 'Second', 'Third'] + compressor = bz2.BZ2Compressor(8) + data = compressor.compress('\n'.join(lines)) + compressor.flush() + source = fileio.TextFileSource( + file_path=self.create_temp_file(data), + compression_type=fileio.CompressionTypes.BZIP2) + read_lines = [] + with source.reader() as reader: + for line in reader: + read_lines.append(line) + self.assertEqual(read_lines, lines) + + def test_read_entire_file_bzip_auto(self): + lines = ['First', 'Second', 'Third'] + compressor = bz2.BZ2Compressor(8) + data = compressor.compress('\n'.join(lines)) + compressor.flush() + source = fileio.TextFileSource(file_path=self.create_temp_file( + data, suffix='.bz2')) + read_lines = [] + with source.reader() as reader: + for line in reader: + read_lines.append(line) + self.assertEqual(read_lines, lines) + + def test_read_entire_file_bzip_empty(self): + compressor = bz2.BZ2Compressor(8) + data = compressor.compress('') + compressor.flush() + source = fileio.TextFileSource( + file_path=self.create_temp_file(data), + 
compression_type=fileio.CompressionTypes.BZIP2) + read_lines = [] + with source.reader() as reader: + for line in reader: + read_lines.append(line) + self.assertEqual(read_lines, []) + + def test_read_entire_file_bzip_large(self): + lines = ['Line %d' % d for d in range(10 * 1000)] + compressor = bz2.BZ2Compressor(8) + data = compressor.compress('\n'.join(lines)) + compressor.flush() + source = fileio.TextFileSource( + file_path=self.create_temp_file(data), + compression_type=fileio.CompressionTypes.BZIP2) + read_lines = [] + with source.reader() as reader: + for line in reader: + read_lines.append(line) + self.assertEqual(read_lines, lines) + def test_read_entire_file_zlib(self): lines = ['First', 'Second', 'Third'] compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS) @@ -199,7 +250,7 @@ class TestTextFileSource(unittest.TestCase): read_lines.append(line) self.assertEqual(read_lines, lines) - def test_skip_entire_file_gzip(self): + def test_skip_entire_file_zlib(self): lines = ['First', 'Second', 'Third'] compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS | 16) data = compressor.compress('\n'.join(lines)) + compressor.flush() @@ -213,7 +264,21 @@ class TestTextFileSource(unittest.TestCase): read_lines.append(line) self.assertEqual(read_lines, []) - def test_skip_entire_file_zlib(self): + def test_skip_entire_file_bzip(self): + lines = ['First', 'Second', 'Third'] + compressor = bz2.BZ2Compressor(8) + data = compressor.compress('\n'.join(lines)) + compressor.flush() + source = fileio.TextFileSource( + file_path=self.create_temp_file(data), + start_offset=1, # Anything other than 0 should lead to a null-read. 
+ compression_type=fileio.CompressionTypes.BZIP2) + read_lines = [] + with source.reader() as reader: + for line in reader: + read_lines.append(line) + self.assertEqual(read_lines, []) + + def test_skip_entire_file_gzip(self): lines = ['First', 'Second', 'Third'] compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS) data = compressor.compress('\n'.join(lines)) + compressor.flush() @@ -241,6 +306,20 @@ class TestTextFileSource(unittest.TestCase): read_lines.append(line) self.assertEqual(read_lines, lines) + def test_consume_entire_file_bzip(self): + lines = ['First', 'Second', 'Third'] + compressor = bz2.BZ2Compressor(8) + data = compressor.compress('\n'.join(lines)) + compressor.flush() + source = fileio.TextFileSource( + file_path=self.create_temp_file(data), + end_offset=1, # Any end_offset should effectively be ignored. + compression_type=fileio.CompressionTypes.BZIP2) + read_lines = [] + with source.reader() as reader: + for line in reader: + read_lines.append(line) + self.assertEqual(read_lines, lines) + def test_consume_entire_file_zlib(self): lines = ['First', 'Second', 'Third'] compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS) @@ -289,6 +368,25 @@ class TestTextFileSource(unittest.TestCase): self.assertEqual(len(progress_record), 3) self.assertEqual(progress_record, [0, 6, 13]) + def test_progress_entire_file_bzip(self): + lines = ['First', 'Second', 'Third'] + compressor = bz2.BZ2Compressor(8) + data = compressor.compress('\n'.join(lines)) + compressor.flush() + source = fileio.TextFileSource( + file_path=self.create_temp_file(data), + compression_type=fileio.CompressionTypes.BZIP2) + progress_record = [] + with source.reader() as reader: + self.assertEqual(-1, reader.get_progress().position.byte_offset) + for line in reader: + self.assertIsNotNone(line) + progress_record.append(reader.get_progress().position.byte_offset) + self.assertEqual(18, # Reading the entire contents before we decide EOF. 
+ reader.get_progress().position.byte_offset) + + self.assertEqual(len(progress_record), 3) + self.assertEqual(progress_record, [0, 6, 13]) + def test_progress_entire_file_zlib(self): lines = ['First', 'Second', 'Third'] compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS) @@ -353,6 +451,36 @@ class TestTextFileSource(unittest.TestCase): dataflow_io.ReaderProgress(percent_complete=percent_complete)), None) + def test_bzip_file_unsplittable(self): + lines = ['aaaa', 'bbbb', 'cccc', 'dddd', 'eeee'] + compressor = bz2.BZ2Compressor(8) + data = compressor.compress('\n'.join(lines)) + compressor.flush() + source = fileio.TextFileSource( + file_path=self.create_temp_file(data), + compression_type=fileio.CompressionTypes.BZIP2) + + with source.reader() as reader: + percents_complete = [x / 100.0 for x in range(101)] + + # Cursor at beginning of file. + for percent_complete in percents_complete: + self.try_splitting_reader_at( + reader, + iobase.DynamicSplitRequest( + iobase.ReaderProgress(percent_complete=percent_complete)), + None) + + # Cursor passed beginning of file. 
+ reader_iter = iter(reader) + next(reader_iter) + next(reader_iter) + for percent_complete in percents_complete: + self.try_splitting_reader_at( + reader, + iobase.DynamicSplitRequest( + iobase.ReaderProgress(percent_complete=percent_complete)), + None) + def test_zlib_file_unsplittable(self): lines = ['aaaa', 'bbbb', 'cccc', 'dddd', 'eeee'] compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS) @@ -659,6 +787,30 @@ class TestNativeTextFileSink(unittest.TestCase): with gzip.GzipFile(self.path, 'r') as f: self.assertEqual(f.read().splitlines(), []) + def test_write_text_bzip_file(self): + sink = fileio.NativeTextFileSink( + self.path, compression_type=fileio.CompressionTypes.BZIP2) + self._write_lines(sink, self.lines) + + with bz2.BZ2File(self.path, 'r') as f: + self.assertEqual(f.read().splitlines(), self.lines) + + def test_write_text_bzip_file_auto(self): + self.path = tempfile.NamedTemporaryFile(suffix='.bz2').name + sink = fileio.NativeTextFileSink(self.path) + self._write_lines(sink, self.lines) + + with bz2.BZ2File(self.path, 'r') as f: + self.assertEqual(f.read().splitlines(), self.lines) + + def test_write_text_bzip_file_empty(self): + sink = fileio.NativeTextFileSink( + self.path, compression_type=fileio.CompressionTypes.BZIP2) + self._write_lines(sink, []) + + with bz2.BZ2File(self.path, 'r') as f: + self.assertEqual(f.read().splitlines(), []) + def test_write_text_zlib_file(self): sink = fileio.NativeTextFileSink( self.path, compression_type=fileio.CompressionTypes.ZLIB)