Repository: incubator-beam Updated Branches: refs/heads/python-sdk 351c3831d -> c155ef0eb
Increased the GCS upload chunk size from 1MB to 8MB and introduced a 128kB write buffer in front of the pipe to the upload thread. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a1f1fa06 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a1f1fa06 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a1f1fa06 Branch: refs/heads/python-sdk Commit: a1f1fa06ee8683273182548e7eb2d6612040d2bf Parents: 351c383 Author: Marian Dvorsky <mari...@google.com> Authored: Thu Jul 28 13:02:15 2016 -0700 Committer: Marian Dvorsky <mari...@google.com> Committed: Thu Jul 28 13:02:15 2016 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/io/gcsio.py | 30 +++++++++++++++++++++--------- 1 file changed, 21 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a1f1fa06/sdks/python/apache_beam/io/gcsio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcsio.py b/sdks/python/apache_beam/io/gcsio.py index 9377266..88fcfb8 100644 --- a/sdks/python/apache_beam/io/gcsio.py +++ b/sdks/python/apache_beam/io/gcsio.py @@ -49,6 +49,7 @@ except ImportError: DEFAULT_READ_BUFFER_SIZE = 1024 * 1024 +WRITE_CHUNK_SIZE = 8 * 1024 * 1024 def parse_gcs_path(gcs_path): @@ -546,6 +547,10 @@ class GcsBufferedWriter(object): self.closed = False self.position = 0 + # A small buffer to avoid CPU-heavy per-write pipe calls. + self.write_buffer = bytearray() + self.write_buffer_size = 128 * 1024 + # Set up communication with uploading thread.
parent_conn, child_conn = multiprocessing.Pipe() self.child_conn = child_conn @@ -557,7 +562,7 @@ class GcsBufferedWriter(object): bucket=self.bucket, name=self.name)) self.upload = transfer.Upload(GcsBufferedWriter.PipeStream(child_conn), - mime_type) + mime_type, chunksize=WRITE_CHUNK_SIZE) self.upload.strategy = transfer.RESUMABLE_UPLOAD # Start uploading thread. @@ -598,14 +603,10 @@ class GcsBufferedWriter(object): self._check_open() if not data: return - try: - self.conn.send_bytes(data) - self.position += len(data) - except IOError: - if self.upload_thread.last_error: - raise self.upload_thread.last_error # pylint: disable=raising-bad-type - else: - raise + self.write_buffer.extend(data) + if len(self.write_buffer) > self.write_buffer_size: + self._flush_write_buffer() + self.position += len(data) def tell(self): """Return the total number of bytes passed to write() so far.""" @@ -613,6 +614,7 @@ class GcsBufferedWriter(object): def close(self): """Close the current GCS file.""" + self._flush_write_buffer() self.closed = True self.conn.close() self.upload_thread.join() @@ -635,3 +637,13 @@ class GcsBufferedWriter(object): def writable(self): return True + + def _flush_write_buffer(self): + try: + self.conn.send_bytes(buffer(self.write_buffer)) + self.write_buffer = bytearray() + except IOError: + if self.upload_thread.last_error: + raise self.upload_thread.last_error # pylint: disable=raising-bad-type + else: + raise