Repository: beam
Updated Branches:
  refs/heads/python-sdk 2df3eda4d -> 5b031139a
Improve performance of fileio._CompressedFile

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/81e44b83
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/81e44b83
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/81e44b83

Branch: refs/heads/python-sdk
Commit: 81e44b833ce54e44e6506eb028afe8564c46cf18
Parents: 2df3eda
Author: Charles Chen <[email protected]>
Authored: Wed Jan 4 15:48:30 2017 -0800
Committer: Robert Bradshaw <[email protected]>
Committed: Thu Jan 5 10:36:42 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/io/fileio.py | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/81e44b83/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index 4ee6a3e..6ea7844 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -664,6 +664,7 @@ class _CompressedFile(object):
         self._decompressor = bz2.BZ2Decompressor()
       else:
         self._decompressor = zlib.decompressobj(self._gzip_mask)
+      self._read_eof = False
     else:
       self._decompressor = None
 
@@ -707,7 +708,7 @@ class _CompressedFile(object):
 
   def _fetch_to_internal_buffer(self, num_bytes):
     """Fetch up to num_bytes into the internal buffer."""
-    while len(self._data) < num_bytes:
+    while not self._read_eof and len(self._data) < num_bytes:
       buf = self._file.read(self._read_size)
       if buf:
         self._data += self._decompressor.decompress(buf)
@@ -728,10 +729,14 @@ class _CompressedFile(object):
             pass  # All is as expected!
         else:
           self._data += self._decompressor.flush()
+        # Record that we have hit the end of file, so we won't unnecessarily
+        # repeat the completeness verification step above.
+        self._read_eof = True
         return
 
   def _read_from_internal_buffer(self, num_bytes):
     """Read up to num_bytes from the internal buffer."""
+    # TODO: this can be optimized to avoid a string copy operation.
     result = self._data[:num_bytes]
     self._data = self._data[num_bytes:]
     return result
