implement code review feedback
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e223929c Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e223929c Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e223929c Branch: refs/heads/python-sdk Commit: e223929c68233853ea4d3b589078dbeaff0f6111 Parents: a9a00ba Author: tim1357 <sears....@gmail.com> Authored: Wed Oct 5 13:36:09 2016 -0400 Committer: Robert Bradshaw <rober...@google.com> Committed: Tue Oct 11 11:15:07 2016 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/io/fileio.py | 10 +++--- sdks/python/apache_beam/io/fileio_test.py | 42 +++++++++++++------------- 2 files changed, 26 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e223929c/sdks/python/apache_beam/io/fileio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py index 574295e..2670470 100644 --- a/sdks/python/apache_beam/io/fileio.py +++ b/sdks/python/apache_beam/io/fileio.py @@ -18,6 +18,7 @@ from __future__ import absolute_import +import bz2 import glob import logging from multiprocessing.pool import ThreadPool @@ -27,7 +28,6 @@ import shutil import threading import time import zlib -import bz2 import weakref from apache_beam import coders @@ -97,7 +97,7 @@ class CompressionTypes(object): mime_types_by_compression_type = { cls.GZIP: 'application/x-gzip', cls.ZLIB: 'application/octet-stream', - cls.BZIP2: 'application/octet-stream' + cls.BZIP2: 'application/x-bz2', } return mime_types_by_compression_type.get(compression_type, default) @@ -606,14 +606,14 @@ class _CompressedFile(object): if self._compression_type == CompressionTypes.BZIP2: self._decompressor = bz2.BZ2Decompressor() else: - 
self._decompressor = zlib.decompressobj(\ - self._type_mask[compression_type]) + self._decompressor = zlib.decompressobj( + self._type_mask[compression_type]) else: self._decompressor = None if self._writeable(): if self._compression_type == CompressionTypes.BZIP2: - self._compressor = bz2.BZ2Compressor(9) + self._compressor = bz2.BZ2Compressor() else: self._compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e223929c/sdks/python/apache_beam/io/fileio_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py index 1ff9acd..42ea9ff 100644 --- a/sdks/python/apache_beam/io/fileio_test.py +++ b/sdks/python/apache_beam/io/fileio_test.py @@ -18,6 +18,7 @@ """Unit tests for local and GCS sources and sinks.""" +import bz2 import glob import gzip import logging @@ -25,7 +26,6 @@ import os import tempfile import unittest import zlib -import bz2 import apache_beam as beam from apache_beam import coders @@ -150,10 +150,10 @@ class TestTextFileSource(unittest.TestCase): read_lines.append(line) self.assertEqual(read_lines, lines) - def test_read_entire_file_bzip(self): + def test_read_entire_file_bzip2(self): lines = ['First', 'Second', 'Third'] - compressor = bz2.BZ2Compressor(8) - data = compressor.compress('\n'.join(lines)) +compressor.flush() + compressor = bz2.BZ2Compressor() + data = compressor.compress('\n'.join(lines)) + compressor.flush() source = fileio.TextFileSource( file_path=self.create_temp_file(data), compression_type=fileio.CompressionTypes.BZIP2) @@ -163,9 +163,9 @@ class TestTextFileSource(unittest.TestCase): read_lines.append(line) self.assertEqual(read_lines, lines) - def test_read_entire_file_bzip_auto(self): + def test_read_entire_file_bzip2_auto(self): lines = ['First', 'Second', 'Third'] - compressor = bz2.BZ2Compressor(8) + compressor = bz2.BZ2Compressor() 
data = compressor.compress('\n'.join(lines)) + compressor.flush() source = fileio.TextFileSource(file_path=self.create_temp_file( data, suffix='.bz2')) @@ -175,8 +175,8 @@ class TestTextFileSource(unittest.TestCase): read_lines.append(line) self.assertEqual(read_lines, lines) - def test_read_entire_file_bzip_empty(self): - compressor = bz2.BZ2Compressor(8) + def test_read_entire_file_bzip2_empty(self): + compressor = bz2.BZ2Compressor() data = compressor.compress('') + compressor.flush() source = fileio.TextFileSource( file_path=self.create_temp_file(data), @@ -187,9 +187,9 @@ class TestTextFileSource(unittest.TestCase): read_lines.append(line) self.assertEqual(read_lines, []) - def test_read_entire_file_bzip_large(self): + def test_read_entire_file_bzip2_large(self): lines = ['Line %d' % d for d in range(10 * 1000)] - compressor = bz2.BZ2Compressor(8) + compressor = bz2.BZ2Compressor() data = compressor.compress('\n'.join(lines)) + compressor.flush() source = fileio.TextFileSource( file_path=self.create_temp_file(data), @@ -264,9 +264,9 @@ class TestTextFileSource(unittest.TestCase): read_lines.append(line) self.assertEqual(read_lines, []) - def test_skip_entire_file_bzip(self): + def test_skip_entire_file_bzip2(self): lines = ['First', 'Second', 'Third'] - compressor = bz2.BZ2Compressor(8) + compressor = bz2.BZ2Compressor() data = compressor.compress('\n'.join(lines)) + compressor.flush() source = fileio.TextFileSource( file_path=self.create_temp_file(data), @@ -306,9 +306,9 @@ class TestTextFileSource(unittest.TestCase): read_lines.append(line) self.assertEqual(read_lines, lines) - def test_consume_entire_file_bzip(self): + def test_consume_entire_file_bzip2(self): lines = ['First', 'Second', 'Third'] - compressor = bz2.BZ2Compressor(8) + compressor = bz2.BZ2Compressor() data = compressor.compress('\n'.join(lines)) + compressor.flush() source = fileio.TextFileSource( file_path=self.create_temp_file(data), @@ -368,9 +368,9 @@ class 
TestTextFileSource(unittest.TestCase): self.assertEqual(len(progress_record), 3) self.assertEqual(progress_record, [0, 6, 13]) - def test_progress_entire_file_bzip(self): + def test_progress_entire_file_bzip2(self): lines = ['First', 'Second', 'Third'] - compressor = bz2.BZ2Compressor(8) + compressor = bz2.BZ2Compressor() data = compressor.compress('\n'.join(lines)) + compressor.flush() source = fileio.TextFileSource( file_path=self.create_temp_file(data), @@ -451,9 +451,9 @@ class TestTextFileSource(unittest.TestCase): dataflow_io.ReaderProgress(percent_complete=percent_complete)), None) - def test_bzip_file_unsplittable(self): + def test_bzip2_file_unsplittable(self): lines = ['aaaa', 'bbbb', 'cccc', 'dddd', 'eeee'] - compressor = bz2.BZ2Compressor(8) + compressor = bz2.BZ2Compressor() data = compressor.compress('\n'.join(lines)) + compressor.flush() source = fileio.TextFileSource( file_path=self.create_temp_file(data), @@ -787,7 +787,7 @@ class TestNativeTextFileSink(unittest.TestCase): with gzip.GzipFile(self.path, 'r') as f: self.assertEqual(f.read().splitlines(), []) - def test_write_text_bzip_file(self): + def test_write_text_bzip2_file(self): sink = fileio.NativeTextFileSink( self.path, compression_type=fileio.CompressionTypes.BZIP2) self._write_lines(sink, self.lines) @@ -795,7 +795,7 @@ class TestNativeTextFileSink(unittest.TestCase): with bz2.BZ2File(self.path, 'r') as f: self.assertEqual(f.read().splitlines(), self.lines) - def test_write_text_bzip_file_auto(self): + def test_write_text_bzip2_file_auto(self): self.path = tempfile.NamedTemporaryFile(suffix='.bz2').name sink = fileio.NativeTextFileSink(self.path) self._write_lines(sink, self.lines) @@ -803,7 +803,7 @@ class TestNativeTextFileSink(unittest.TestCase): with bz2.BZ2File(self.path, 'r') as f: self.assertEqual(f.read().splitlines(), self.lines) - def test_write_text_bzip_file_empty(self): + def test_write_text_bzip2_file_empty(self): sink = fileio.NativeTextFileSink( self.path, 
compression_type=fileio.CompressionTypes.BZIP2) self._write_lines(sink, [])