http://git-wip-us.apache.org/repos/asf/beam/blob/59ad58ac/sdks/python/apache_beam/io/google_cloud_platform/gcsio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/google_cloud_platform/gcsio.py b/sdks/python/apache_beam/io/google_cloud_platform/gcsio.py
deleted file mode 100644
index 195fafc..0000000
--- a/sdks/python/apache_beam/io/google_cloud_platform/gcsio.py
+++ /dev/null
@@ -1,871 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-"""Google Cloud Storage client.
-
-This library evolved from the Google App Engine GCS client available at
-https://github.com/GoogleCloudPlatform/appengine-gcs-client.
-"""
-
-import cStringIO
-import errno
-import fnmatch
-import logging
-import multiprocessing
-import os
-import Queue
-import re
-import threading
-import traceback
-
-import apitools.base.py.transfer as transfer
-from apitools.base.py.batch import BatchApiRequest
-from apitools.base.py.exceptions import HttpError
-
-from apache_beam.internal import auth
-from apache_beam.utils import retry
-
-# Issue a friendlier error message if the storage library is not available.
-# TODO(silviuc): Remove this guard when storage is available everywhere.
-try:
-  # pylint: disable=wrong-import-order, wrong-import-position
-  from apache_beam.io.google_cloud_platform.internal.clients import storage
-except ImportError:
-  raise RuntimeError(
-      'Google Cloud Storage I/O not supported for this execution environment '
-      '(could not import storage API client).')
-
-# This is the size of each partial-file read operation from GCS.  This
-# parameter was chosen to give good throughput while keeping memory usage at
-# a reasonable level; the following table shows throughput reached when
-# reading files of a given size with a chosen buffer size and informed the
-# choice of the value, as of 11/2016:
-#
-# +---------------+------------+-------------+-------------+-------------+
-# |               | 50 MB file | 100 MB file | 200 MB file | 400 MB file |
-# +---------------+------------+-------------+-------------+-------------+
-# | 8 MB buffer   | 17.12 MB/s | 22.67 MB/s  | 23.81 MB/s  | 26.05 MB/s  |
-# | 16 MB buffer  | 24.21 MB/s | 42.70 MB/s  | 42.89 MB/s  | 46.92 MB/s  |
-# | 32 MB buffer  | 28.53 MB/s | 48.08 MB/s  | 54.30 MB/s  | 54.65 MB/s  |
-# | 400 MB buffer | 34.72 MB/s | 71.13 MB/s  | 79.13 MB/s  | 85.39 MB/s  |
-# +---------------+------------+-------------+-------------+-------------+
-DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024
-
-# This is the number of seconds the library will wait for a partial-file read
-# operation from GCS to complete before retrying.
-DEFAULT_READ_SEGMENT_TIMEOUT_SECONDS = 60
-
-# This is the size of chunks used when writing to GCS.
-WRITE_CHUNK_SIZE = 8 * 1024 * 1024
-
-
-# Maximum number of operations permitted in GcsIO.copy_batch() and
-# GcsIO.delete_batch().
-MAX_BATCH_OPERATION_SIZE = 100
-
-
-def parse_gcs_path(gcs_path):
-  """Return the bucket and object names of the given gs:// path."""
-  match = re.match('^gs://([^/]+)/(.+)$', gcs_path)
-  if match is None:
-    raise ValueError('GCS path must be in the form gs://<bucket>/<object>.')
-  return match.group(1), match.group(2)
-
-
-class GcsIOError(IOError, retry.PermanentException):
-  """GCS IO error that should not be retried."""
-  pass
-
-
-class GcsIO(object):
-  """Google Cloud Storage I/O client."""
-
-  def __new__(cls, storage_client=None):
-    if storage_client:
-      return super(GcsIO, cls).__new__(cls, storage_client)
-    else:
-      # Create a single storage client for each thread.  We would like to avoid
-      # creating more than one storage client for each thread, since each
-      # initialization requires the relatively expensive step of initializing
-      # credentaials.
-      local_state = threading.local()
-      if getattr(local_state, 'gcsio_instance', None) is None:
-        credentials = auth.get_service_credentials()
-        storage_client = storage.StorageV1(credentials=credentials)
-        local_state.gcsio_instance = (
-            super(GcsIO, cls).__new__(cls, storage_client))
-        local_state.gcsio_instance.client = storage_client
-      return local_state.gcsio_instance
-
-  def __init__(self, storage_client=None):
-    # We must do this check on storage_client because the client attribute may
-    # have already been set in __new__ for the singleton case when
-    # storage_client is None.
-    if storage_client is not None:
-      self.client = storage_client
-
-  def open(self,
-           filename,
-           mode='r',
-           read_buffer_size=DEFAULT_READ_BUFFER_SIZE,
-           mime_type='application/octet-stream'):
-    """Open a GCS file path for reading or writing.
-
-    Args:
-      filename: GCS file path in the form gs://<bucket>/<object>.
-      mode: 'r' for reading or 'w' for writing.
-      read_buffer_size: Buffer size to use during read operations.
-      mime_type: Mime type to set for write operations.
-
-    Returns:
-      file object.
-
-    Raises:
-      ValueError: Invalid open file mode.
-    """
-    if mode == 'r' or mode == 'rb':
-      return GcsBufferedReader(self.client, filename, mode=mode,
-                               buffer_size=read_buffer_size)
-    elif mode == 'w' or mode == 'wb':
-      return GcsBufferedWriter(self.client, filename, mode=mode,
-                               mime_type=mime_type)
-    else:
-      raise ValueError('Invalid file open mode: %s.' % mode)
-
-  @retry.with_exponential_backoff(
-      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def glob(self, pattern, limit=None):
-    """Return the GCS path names matching a given path name pattern.
-
-    Path name patterns are those recognized by fnmatch.fnmatch().  The path
-    can contain glob characters (*, ?, and [...] sets).
-
-    Args:
-      pattern: GCS file path pattern in the form gs://<bucket>/<name_pattern>.
-
-    Returns:
-      list of GCS file paths matching the given pattern.
-    """
-    bucket, name_pattern = parse_gcs_path(pattern)
-    # Get the prefix with which we can list objects in the given bucket.
-    prefix = re.match('^[^[*?]*', name_pattern).group(0)
-    request = storage.StorageObjectsListRequest(bucket=bucket, prefix=prefix)
-    object_paths = []
-    while True:
-      response = self.client.objects.List(request)
-      for item in response.items:
-        if fnmatch.fnmatch(item.name, name_pattern):
-          object_paths.append('gs://%s/%s' % (item.bucket, item.name))
-      if response.nextPageToken:
-        request.pageToken = response.nextPageToken
-        if limit is not None and len(object_paths) >= limit:
-          break
-      else:
-        break
-    return object_paths[:limit]
-
-  @retry.with_exponential_backoff(
-      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def delete(self, path):
-    """Deletes the object at the given GCS path.
-
-    Args:
-      path: GCS file path pattern in the form gs://<bucket>/<name>.
-    """
-    bucket, object_path = parse_gcs_path(path)
-    request = storage.StorageObjectsDeleteRequest(
-        bucket=bucket, object=object_path)
-    try:
-      self.client.objects.Delete(request)
-    except HttpError as http_error:
-      if http_error.status_code == 404:
-        # Return success when the file doesn't exist anymore for idempotency.
-        return
-      raise
-
-  # We intentionally do not decorate this method with a retry, as retrying is
-  # handled in BatchApiRequest.Execute().
-  def delete_batch(self, paths):
-    """Deletes the objects at the given GCS paths.
-
-    Args:
-      paths: List of GCS file path patterns in the form gs://<bucket>/<name>,
-             not to exceed MAX_BATCH_OPERATION_SIZE in length.
-
-    Returns: List of tuples of (path, exception) in the same order as the paths
-             argument, where exception is None if the operation succeeded or
-             the relevant exception if the operation failed.
-    """
-    if not paths:
-      return []
-    batch_request = BatchApiRequest(
-        retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES)
-    for path in paths:
-      bucket, object_path = parse_gcs_path(path)
-      request = storage.StorageObjectsDeleteRequest(
-          bucket=bucket, object=object_path)
-      batch_request.Add(self.client.objects, 'Delete', request)
-    api_calls = batch_request.Execute(self.client._http)  # pylint: disable=protected-access
-    result_statuses = []
-    for i, api_call in enumerate(api_calls):
-      path = paths[i]
-      exception = None
-      if api_call.is_error:
-        exception = api_call.exception
-        # Return success when the file doesn't exist anymore for idempotency.
-        if isinstance(exception, HttpError) and exception.status_code == 404:
-          exception = None
-      result_statuses.append((path, exception))
-    return result_statuses
-
-  @retry.with_exponential_backoff(
-      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def copy(self, src, dest):
-    """Copies the given GCS object from src to dest.
-
-    Args:
-      src: GCS file path pattern in the form gs://<bucket>/<name>.
-      dest: GCS file path pattern in the form gs://<bucket>/<name>.
-    """
-    src_bucket, src_path = parse_gcs_path(src)
-    dest_bucket, dest_path = parse_gcs_path(dest)
-    request = storage.StorageObjectsCopyRequest(
-        sourceBucket=src_bucket,
-        sourceObject=src_path,
-        destinationBucket=dest_bucket,
-        destinationObject=dest_path)
-    try:
-      self.client.objects.Copy(request)
-    except HttpError as http_error:
-      if http_error.status_code == 404:
-        # This is a permanent error that should not be retried.  Note that
-        # FileSink.finalize_write expects an IOError when the source file does
-        # not exist.
-        raise GcsIOError(errno.ENOENT, 'Source file not found: %s' % src)
-      raise
-
-  # We intentionally do not decorate this method with a retry, as retrying is
-  # handled in BatchApiRequest.Execute().
-  def copy_batch(self, src_dest_pairs):
-    """Copies the given GCS object from src to dest.
-
-    Args:
-      src_dest_pairs: list of (src, dest) tuples of gs://<bucket>/<name> files
-                      paths to copy from src to dest, not to exceed
-                      MAX_BATCH_OPERATION_SIZE in length.
-
-    Returns: List of tuples of (src, dest, exception) in the same order as the
-             src_dest_pairs argument, where exception is None if the operation
-             succeeded or the relevant exception if the operation failed.
-    """
-    if not src_dest_pairs:
-      return []
-    batch_request = BatchApiRequest(
-        retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES)
-    for src, dest in src_dest_pairs:
-      src_bucket, src_path = parse_gcs_path(src)
-      dest_bucket, dest_path = parse_gcs_path(dest)
-      request = storage.StorageObjectsCopyRequest(
-          sourceBucket=src_bucket,
-          sourceObject=src_path,
-          destinationBucket=dest_bucket,
-          destinationObject=dest_path)
-      batch_request.Add(self.client.objects, 'Copy', request)
-    api_calls = batch_request.Execute(self.client._http)  # pylint: disable=protected-access
-    result_statuses = []
-    for i, api_call in enumerate(api_calls):
-      src, dest = src_dest_pairs[i]
-      exception = None
-      if api_call.is_error:
-        exception = api_call.exception
-        # Translate 404 to the appropriate not found exception.
-        if isinstance(exception, HttpError) and exception.status_code == 404:
-          exception = (
-              GcsIOError(errno.ENOENT, 'Source file not found: %s' % src))
-      result_statuses.append((src, dest, exception))
-    return result_statuses
-
-  # We intentionally do not decorate this method with a retry, since the
-  # underlying copy and delete operations are already idempotent operations
-  # protected by retry decorators.
-  def copytree(self, src, dest):
-    """Renames the given GCS "directory" recursively from src to dest.
-
-    Args:
-      src: GCS file path pattern in the form gs://<bucket>/<name>/.
-      dest: GCS file path pattern in the form gs://<bucket>/<name>/.
-    """
-    assert src.endswith('/')
-    assert dest.endswith('/')
-    for entry in self.glob(src + '*'):
-      rel_path = entry[len(src):]
-      self.copy(entry, dest + rel_path)
-
-  # We intentionally do not decorate this method with a retry, since the
-  # underlying copy and delete operations are already idempotent operations
-  # protected by retry decorators.
-  def rename(self, src, dest):
-    """Renames the given GCS object from src to dest.
-
-    Args:
-      src: GCS file path pattern in the form gs://<bucket>/<name>.
-      dest: GCS file path pattern in the form gs://<bucket>/<name>.
-    """
-    self.copy(src, dest)
-    self.delete(src)
-
-  @retry.with_exponential_backoff(
-      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def exists(self, path):
-    """Returns whether the given GCS object exists.
-
-    Args:
-      path: GCS file path pattern in the form gs://<bucket>/<name>.
-    """
-    bucket, object_path = parse_gcs_path(path)
-    try:
-      request = storage.StorageObjectsGetRequest(
-          bucket=bucket, object=object_path)
-      self.client.objects.Get(request)  # metadata
-      return True
-    except HttpError as http_error:
-      if http_error.status_code == 404:
-        # HTTP 404 indicates that the file did not exist
-        return False
-      else:
-        # We re-raise all other exceptions
-        raise
-
-  @retry.with_exponential_backoff(
-      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def size(self, path):
-    """Returns the size of a single GCS object.
-
-    This method does not perform glob expansion. Hence the given path must be
-    for a single GCS object.
-
-    Returns: size of the GCS object in bytes.
-    """
-    bucket, object_path = parse_gcs_path(path)
-    request = storage.StorageObjectsGetRequest(
-        bucket=bucket, object=object_path)
-    return self.client.objects.Get(request).size
-
-  @retry.with_exponential_backoff(
-      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def size_of_files_in_glob(self, pattern):
-    """Returns the size of all the files in the glob as a dictionary
-
-    Args:
-      path: a file path pattern that reads the size of all the files
-    """
-    bucket, name_pattern = parse_gcs_path(pattern)
-    # Get the prefix with which we can list objects in the given bucket.
-    prefix = re.match('^[^[*?]*', name_pattern).group(0)
-    request = storage.StorageObjectsListRequest(bucket=bucket, prefix=prefix)
-    file_sizes = {}
-    while True:
-      response = self.client.objects.List(request)
-      for item in response.items:
-        if fnmatch.fnmatch(item.name, name_pattern):
-          file_name = 'gs://%s/%s' % (item.bucket, item.name)
-          file_sizes[file_name] = item.size
-      if response.nextPageToken:
-        request.pageToken = response.nextPageToken
-      else:
-        break
-    return file_sizes
-
-
-# TODO: Consider using cStringIO instead of buffers and data_lists when reading.
-class GcsBufferedReader(object):
-  """A class for reading Google Cloud Storage files."""
-
-  def __init__(self,
-               client,
-               path,
-               mode='r',
-               buffer_size=DEFAULT_READ_BUFFER_SIZE,
-               segment_timeout=DEFAULT_READ_SEGMENT_TIMEOUT_SECONDS):
-    self.client = client
-    self.path = path
-    self.bucket, self.name = parse_gcs_path(path)
-    self.mode = mode
-    self.buffer_size = buffer_size
-    self.segment_timeout = segment_timeout
-
-    # Get object state.
-    self.get_request = (storage.StorageObjectsGetRequest(
-        bucket=self.bucket, object=self.name))
-    try:
-      metadata = self._get_object_metadata(self.get_request)
-    except HttpError as http_error:
-      if http_error.status_code == 404:
-        raise IOError(errno.ENOENT, 'Not found: %s' % self.path)
-      else:
-        logging.error('HTTP error while requesting file %s: %s', self.path,
-                      http_error)
-        raise
-    self.size = metadata.size
-
-    # Ensure read is from file of the correct generation.
-    self.get_request.generation = metadata.generation
-
-    # Initialize read buffer state.
-    self.download_stream = cStringIO.StringIO()
-    self.downloader = transfer.Download(
-        self.download_stream, auto_transfer=False, chunksize=self.buffer_size)
-    self.client.objects.Get(self.get_request, download=self.downloader)
-    self.position = 0
-    self.buffer = ''
-    self.buffer_start_position = 0
-    self.closed = False
-
-  @retry.with_exponential_backoff(
-      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def _get_object_metadata(self, get_request):
-    return self.client.objects.Get(get_request)
-
-  def __iter__(self):
-    return self
-
-  def __next__(self):
-    """Read one line delimited by '\\n' from the file.
-    """
-    return self.next()
-
-  def next(self):
-    """Read one line delimited by '\\n' from the file.
-    """
-    line = self.readline()
-    if not line:
-      raise StopIteration
-    return line
-
-  def read(self, size=-1):
-    """Read data from a GCS file.
-
-    Args:
-      size: Number of bytes to read. Actual number of bytes read is always
-            equal to size unless EOF is reached. If size is negative or
-            unspecified, read the entire file.
-
-    Returns:
-      data read as str.
-
-    Raises:
-      IOError: When this buffer is closed.
-    """
-    return self._read_inner(size=size, readline=False)
-
-  def readline(self, size=-1):
-    """Read one line delimited by '\\n' from the file.
-
-    Mimics behavior of the readline() method on standard file objects.
-
-    A trailing newline character is kept in the string. It may be absent when a
-    file ends with an incomplete line. If the size argument is non-negative,
-    it specifies the maximum string size (counting the newline) to return.
-    A negative size is the same as unspecified. Empty string is returned
-    only when EOF is encountered immediately.
-
-    Args:
-      size: Maximum number of bytes to read. If not specified, readline stops
-        only on '\\n' or EOF.
-
-    Returns:
-      The data read as a string.
-
-    Raises:
-      IOError: When this buffer is closed.
-    """
-    return self._read_inner(size=size, readline=True)
-
-  def _read_inner(self, size=-1, readline=False):
-    """Shared implementation of read() and readline()."""
-    self._check_open()
-    if not self._remaining():
-      return ''
-
-    # Prepare to read.
-    data_list = []
-    if size is None:
-      size = -1
-    to_read = min(size, self._remaining())
-    if to_read < 0:
-      to_read = self._remaining()
-    break_after = False
-
-    while to_read > 0:
-      # If we have exhausted the buffer, get the next segment.
-      # TODO(ccy): We should consider prefetching the next block in another
-      # thread.
-      self._fetch_next_if_buffer_exhausted()
-
-      # Determine number of bytes to read from buffer.
-      buffer_bytes_read = self.position - self.buffer_start_position
-      bytes_to_read_from_buffer = min(
-          len(self.buffer) - buffer_bytes_read, to_read)
-
-      # If readline is set, we only want to read up to and including the next
-      # newline character.
-      if readline:
-        next_newline_position = self.buffer.find('\n', buffer_bytes_read,
-                                                 len(self.buffer))
-        if next_newline_position != -1:
-          bytes_to_read_from_buffer = (
-              1 + next_newline_position - buffer_bytes_read)
-          break_after = True
-
-      # Read bytes.
-      data_list.append(self.buffer[buffer_bytes_read:buffer_bytes_read +
-                                   bytes_to_read_from_buffer])
-      self.position += bytes_to_read_from_buffer
-      to_read -= bytes_to_read_from_buffer
-
-      if break_after:
-        break
-
-    return ''.join(data_list)
-
-  def _fetch_next_if_buffer_exhausted(self):
-    if not self.buffer or (
-        self.buffer_start_position + len(self.buffer) <= self.position):
-      bytes_to_request = min(self._remaining(), self.buffer_size)
-      self.buffer_start_position = self.position
-      retry_count = 0
-      while retry_count <= 10:
-        queue = Queue.Queue()
-        t = threading.Thread(target=self._fetch_to_queue,
-                             args=(queue, self._get_segment,
-                                   (self.position, bytes_to_request)))
-        t.daemon = True
-        t.start()
-        try:
-          result, exn, tb = queue.get(timeout=self.segment_timeout)
-        except Queue.Empty:
-          logging.warning(
-              ('Timed out fetching %d bytes from position %d of %s after %f '
-               'seconds; retrying...'), bytes_to_request, self.position,
-              self.path, self.segment_timeout)
-          retry_count += 1
-          # Reinitialize download objects.
-          self.download_stream = cStringIO.StringIO()
-          self.downloader = transfer.Download(
-              self.download_stream, auto_transfer=False,
-              chunksize=self.buffer_size)
-          self.client.objects.Get(self.get_request, download=self.downloader)
-          continue
-        if exn:
-          logging.error(
-              ('Exception while fetching %d bytes from position %d of %s: '
-               '%s\n%s'),
-              bytes_to_request, self.position, self.path, exn, tb)
-          raise exn
-        self.buffer = result
-        return
-      raise GcsIOError(
-          'Reached retry limit for _fetch_next_if_buffer_exhausted.')
-
-  def _fetch_to_queue(self, queue, func, args):
-    try:
-      value = func(*args)
-      queue.put((value, None, None))
-    except Exception as e:  # pylint: disable=broad-except
-      tb = traceback.format_exc()
-      queue.put((None, e, tb))
-
-  def _remaining(self):
-    return self.size - self.position
-
-  def close(self):
-    """Close the current GCS file."""
-    self.closed = True
-    self.download_stream = None
-    self.downloader = None
-    self.buffer = None
-
-  def _get_segment(self, start, size):
-    """Get the given segment of the current GCS file."""
-    if size == 0:
-      return ''
-    # The objects self.downloader and self.download_stream may be recreated if
-    # this call times out; we save them locally to avoid any threading issues.
-    downloader = self.downloader
-    download_stream = self.download_stream
-    end = start + size - 1
-    downloader.GetRange(start, end)
-    value = download_stream.getvalue()
-    # Clear the cStringIO object after we've read its contents.
-    download_stream.truncate(0)
-    assert len(value) == size
-    return value
-
-  def __enter__(self):
-    return self
-
-  def __exit__(self, exception_type, exception_value, traceback):
-    self.close()
-
-  def seek(self, offset, whence=os.SEEK_SET):
-    """Set the file's current offset.
-
-    Note if the new offset is out of bound, it is adjusted to either 0 or EOF.
-
-    Args:
-      offset: seek offset as number.
-      whence: seek mode. Supported modes are os.SEEK_SET (absolute seek),
-        os.SEEK_CUR (seek relative to the current position), and os.SEEK_END
-        (seek relative to the end, offset should be negative).
-
-    Raises:
-      IOError: When this buffer is closed.
-      ValueError: When whence is invalid.
-    """
-    self._check_open()
-
-    self.buffer = ''
-    self.buffer_start_position = -1
-
-    if whence == os.SEEK_SET:
-      self.position = offset
-    elif whence == os.SEEK_CUR:
-      self.position += offset
-    elif whence == os.SEEK_END:
-      self.position = self.size + offset
-    else:
-      raise ValueError('Whence mode %r is invalid.' % whence)
-
-    self.position = min(self.position, self.size)
-    self.position = max(self.position, 0)
-
-  def tell(self):
-    """Tell the file's current offset.
-
-    Returns:
-      current offset in reading this file.
-
-    Raises:
-      IOError: When this buffer is closed.
-    """
-    self._check_open()
-    return self.position
-
-  def _check_open(self):
-    if self.closed:
-      raise IOError('Buffer is closed.')
-
-  def seekable(self):
-    return True
-
-  def readable(self):
-    return True
-
-  def writable(self):
-    return False
-
-
-# TODO: Consider using cStringIO instead of buffers and data_lists when reading
-# and writing.
-class GcsBufferedWriter(object):
-  """A class for writing Google Cloud Storage files."""
-
-  class PipeStream(object):
-    """A class that presents a pipe connection as a readable stream."""
-
-    def __init__(self, recv_pipe):
-      self.conn = recv_pipe
-      self.closed = False
-      self.position = 0
-      self.remaining = ''
-
-    def read(self, size):
-      """Read data from the wrapped pipe connection.
-
-      Args:
-        size: Number of bytes to read. Actual number of bytes read is always
-              equal to size unless EOF is reached.
-
-      Returns:
-        data read as str.
-      """
-      data_list = []
-      bytes_read = 0
-      while bytes_read < size:
-        bytes_from_remaining = min(size - bytes_read, len(self.remaining))
-        data_list.append(self.remaining[0:bytes_from_remaining])
-        self.remaining = self.remaining[bytes_from_remaining:]
-        self.position += bytes_from_remaining
-        bytes_read += bytes_from_remaining
-        if not self.remaining:
-          try:
-            self.remaining = self.conn.recv_bytes()
-          except EOFError:
-            break
-      return ''.join(data_list)
-
-    def tell(self):
-      """Tell the file's current offset.
-
-      Returns:
-        current offset in reading this file.
-
-      Raises:
-        IOError: When this stream is closed.
-      """
-      self._check_open()
-      return self.position
-
-    def seek(self, offset, whence=os.SEEK_SET):
-      # The apitools.base.py.transfer.Upload class insists on seeking to the end
-      # of a stream to do a check before completing an upload, so we must have
-      # this no-op method here in that case.
-      if whence == os.SEEK_END and offset == 0:
-        return
-      elif whence == os.SEEK_SET and offset == self.position:
-        return
-      raise NotImplementedError
-
-    def _check_open(self):
-      if self.closed:
-        raise IOError('Stream is closed.')
-
-  def __init__(self,
-               client,
-               path,
-               mode='w',
-               mime_type='application/octet-stream'):
-    self.client = client
-    self.path = path
-    self.mode = mode
-    self.bucket, self.name = parse_gcs_path(path)
-    self.mode = mode
-
-    self.closed = False
-    self.position = 0
-
-    # A small buffer to avoid CPU-heavy per-write pipe calls.
-    self.write_buffer = bytearray()
-    self.write_buffer_size = 128 * 1024
-
-    # Set up communication with uploading thread.
-    parent_conn, child_conn = multiprocessing.Pipe()
-    self.child_conn = child_conn
-    self.conn = parent_conn
-
-    # Set up uploader.
-    self.insert_request = (storage.StorageObjectsInsertRequest(
-        bucket=self.bucket, name=self.name))
-    self.upload = transfer.Upload(
-        GcsBufferedWriter.PipeStream(child_conn),
-        mime_type,
-        chunksize=WRITE_CHUNK_SIZE)
-    self.upload.strategy = transfer.RESUMABLE_UPLOAD
-
-    # Start uploading thread.
-    self.upload_thread = threading.Thread(target=self._start_upload)
-    self.upload_thread.daemon = True
-    self.upload_thread.last_error = None
-    self.upload_thread.start()
-
-  # TODO(silviuc): Refactor so that retry logic can be applied.
-  # There is retry logic in the underlying transfer library but we should make
-  # it more explicit so we can control the retry parameters.
-  @retry.no_retries  # Using no_retries marks this as an integration point.
-  def _start_upload(self):
-    # This starts the uploader thread.  We are forced to run the uploader in
-    # another thread because the apitools uploader insists on taking a stream
-    # as input. Happily, this also means we get asynchronous I/O to GCS.
-    #
-    # The uploader by default transfers data in chunks of 1024 * 1024 bytes at
-    # a time, buffering writes until that size is reached.
-    try:
-      self.client.objects.Insert(self.insert_request, upload=self.upload)
-    except Exception as e:  # pylint: disable=broad-except
-      logging.error('Error in _start_upload while inserting file %s: %s',
-                    self.path, traceback.format_exc())
-      self.upload_thread.last_error = e
-    finally:
-      self.child_conn.close()
-
-  def write(self, data):
-    """Write data to a GCS file.
-
-    Args:
-      data: data to write as str.
-
-    Raises:
-      IOError: When this buffer is closed.
-    """
-    self._check_open()
-    if not data:
-      return
-    self.write_buffer.extend(data)
-    if len(self.write_buffer) > self.write_buffer_size:
-      self._flush_write_buffer()
-    self.position += len(data)
-
-  def flush(self):
-    """Flushes any internal buffer to the underlying GCS file."""
-    self._check_open()
-    self._flush_write_buffer()
-
-  def tell(self):
-    """Return the total number of bytes passed to write() so far."""
-    return self.position
-
-  def close(self):
-    """Close the current GCS file."""
-    if self.closed:
-      logging.warn('Channel for %s is not open.', self.path)
-      return
-
-    self._flush_write_buffer()
-    self.closed = True
-    self.conn.close()
-    self.upload_thread.join()
-    # Check for exception since the last _flush_write_buffer() call.
-    if self.upload_thread.last_error:
-      raise self.upload_thread.last_error  # pylint: disable=raising-bad-type
-
-  def __enter__(self):
-    return self
-
-  def __exit__(self, exception_type, exception_value, traceback):
-    self.close()
-
-  def _check_open(self):
-    if self.closed:
-      raise IOError('Buffer is closed.')
-
-  def seekable(self):
-    return False
-
-  def readable(self):
-    return False
-
-  def writable(self):
-    return True
-
-  def _flush_write_buffer(self):
-    try:
-      self.conn.send_bytes(buffer(self.write_buffer))
-      self.write_buffer = bytearray()
-    except IOError:
-      if self.upload_thread.last_error:
-        raise self.upload_thread.last_error  # pylint: disable=raising-bad-type
-      else:
-        raise

http://git-wip-us.apache.org/repos/asf/beam/blob/59ad58ac/sdks/python/apache_beam/io/google_cloud_platform/gcsio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/google_cloud_platform/gcsio_test.py b/sdks/python/apache_beam/io/google_cloud_platform/gcsio_test.py
deleted file mode 100644
index 3a66e9a..0000000
--- a/sdks/python/apache_beam/io/google_cloud_platform/gcsio_test.py
+++ /dev/null
@@ -1,796 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-"""Tests for Google Cloud Storage client."""
-
-import errno
-import logging
-import multiprocessing
-import os
-import random
-import threading
-import time
-import unittest
-
-import httplib2
-import mock
-
-from apache_beam.io import gcsio
-from apache_beam.io.google_cloud_platform.internal.clients import storage
-
-# Protect against environments where apitools library is not available.
-# pylint: disable=wrong-import-order, wrong-import-position
-try:
-  from apitools.base.py.exceptions import HttpError
-except ImportError:
-  HttpError = None
-# pylint: enable=wrong-import-order, wrong-import-position
-
-
-class FakeGcsClient(object):
-  # Fake storage client.  Usage in gcsio.py is client.objects.Get(...) and
-  # client.objects.Insert(...).
-
-  def __init__(self):
-    self.objects = FakeGcsObjects()
-    # Referenced in GcsIO.batch_copy() and GcsIO.batch_delete().
-    self._http = object()
-
-
-class FakeFile(object):
-
-  def __init__(self, bucket, obj, contents, generation):
-    self.bucket = bucket
-    self.object = obj
-    self.contents = contents
-    self.generation = generation
-
-  def get_metadata(self):
-    return storage.Object(
-        bucket=self.bucket,
-        name=self.object,
-        generation=self.generation,
-        size=len(self.contents))
-
-
-class FakeGcsObjects(object):
-
-  def __init__(self):
-    self.files = {}
-    # Store the last generation used for a given object name.  Note that this
-    # has to persist even past the deletion of the object.
-    self.last_generation = {}
-    self.list_page_tokens = {}
-
-  def add_file(self, f):
-    self.files[(f.bucket, f.object)] = f
-    self.last_generation[(f.bucket, f.object)] = f.generation
-
-  def get_file(self, bucket, obj):
-    return self.files.get((bucket, obj), None)
-
-  def delete_file(self, bucket, obj):
-    del self.files[(bucket, obj)]
-
-  def get_last_generation(self, bucket, obj):
-    return self.last_generation.get((bucket, obj), 0)
-
-  def Get(self, get_request, download=None):  # pylint: disable=invalid-name
-    f = self.get_file(get_request.bucket, get_request.object)
-    if f is None:
-      # Failing with a HTTP 404 if file does not exist.
-      raise HttpError({'status': 404}, None, None)
-    if download is None:
-      return f.get_metadata()
-    else:
-      stream = download.stream
-
-      def get_range_callback(start, end):
-        assert start >= 0 and end >= start and end < len(f.contents)
-        stream.write(f.contents[start:end + 1])
-
-      download.GetRange = get_range_callback
-
-  def Insert(self, insert_request, upload=None):  # pylint: disable=invalid-name
-    assert upload is not None
-    generation = self.get_last_generation(insert_request.bucket,
-                                          insert_request.name) + 1
-    f = FakeFile(insert_request.bucket, insert_request.name, '', generation)
-
-    # Stream data into file.
-    stream = upload.stream
-    data_list = []
-    while True:
-      data = stream.read(1024 * 1024)
-      if not data:
-        break
-      data_list.append(data)
-    f.contents = ''.join(data_list)
-
-    self.add_file(f)
-
-  def Copy(self, copy_request):  # pylint: disable=invalid-name
-    src_file = self.get_file(copy_request.sourceBucket,
-                             copy_request.sourceObject)
-    if not src_file:
-      raise HttpError(
-          httplib2.Response({'status': '404'}), '404 Not Found',
-          'https://fake/url')
-    generation = self.get_last_generation(copy_request.destinationBucket,
-                                          copy_request.destinationObject) + 1
-    dest_file = FakeFile(copy_request.destinationBucket,
-                         copy_request.destinationObject, src_file.contents,
-                         generation)
-    self.add_file(dest_file)
-
-  def Delete(self, delete_request):  # pylint: disable=invalid-name
-    # Here, we emulate the behavior of the GCS service in raising a 404 error
-    # if this object already exists.
-    if self.get_file(delete_request.bucket, delete_request.object):
-      self.delete_file(delete_request.bucket, delete_request.object)
-    else:
-      raise HttpError(
-          httplib2.Response({'status': '404'}), '404 Not Found',
-          'https://fake/url')
-
-  def List(self, list_request):  # pylint: disable=invalid-name
-    bucket = list_request.bucket
-    prefix = list_request.prefix or ''
-    matching_files = []
-    for file_bucket, file_name in sorted(iter(self.files)):
-      if bucket == file_bucket and file_name.startswith(prefix):
-        file_object = self.files[(file_bucket, file_name)].get_metadata()
-        matching_files.append(file_object)
-
-    # Handle pagination.
-    items_per_page = 5
-    if not list_request.pageToken:
-      range_start = 0
-    else:
-      if list_request.pageToken not in self.list_page_tokens:
-        raise ValueError('Invalid page token.')
-      range_start = self.list_page_tokens[list_request.pageToken]
-      del self.list_page_tokens[list_request.pageToken]
-
-    result = storage.Objects(
-        items=matching_files[range_start:range_start + items_per_page])
-    if range_start + items_per_page < len(matching_files):
-      next_range_start = range_start + items_per_page
-      next_page_token = '_page_token_%s_%s_%d' % (bucket, prefix,
-                                                  next_range_start)
-      self.list_page_tokens[next_page_token] = next_range_start
-      result.nextPageToken = next_page_token
-    return result
-
-
-class FakeApiCall(object):
-
-  def __init__(self, exception):
-    self.exception = exception
-    self.is_error = exception is not None
-
-
-class FakeBatchApiRequest(object):
-
-  def __init__(self, **unused_kwargs):
-    self.operations = []
-
-  def Add(self, service, method, request):  # pylint: disable=invalid-name
-    self.operations.append((service, method, request))
-
-  def Execute(self, unused_http, **unused_kwargs):  # pylint: disable=invalid-name
-    api_calls = []
-    for service, method, request in self.operations:
-      exception = None
-      try:
-        getattr(service, method)(request)
-      except Exception as e:  # pylint: disable=broad-except
-        exception = e
-      api_calls.append(FakeApiCall(exception))
-    return api_calls
-
-
-@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
-class TestGCSPathParser(unittest.TestCase):
-
-  def test_gcs_path(self):
-    self.assertEqual(
-        gcsio.parse_gcs_path('gs://bucket/name'), ('bucket', 'name'))
-    self.assertEqual(
-        gcsio.parse_gcs_path('gs://bucket/name/sub'), ('bucket', 'name/sub'))
-
-  def test_bad_gcs_path(self):
-    self.assertRaises(ValueError, gcsio.parse_gcs_path, 'gs://')
-    self.assertRaises(ValueError, gcsio.parse_gcs_path, 'gs://bucket')
-    self.assertRaises(ValueError, gcsio.parse_gcs_path, 'gs://bucket/')
-    self.assertRaises(ValueError, gcsio.parse_gcs_path, 'gs:///name')
-    self.assertRaises(ValueError, gcsio.parse_gcs_path, 'gs:///')
-    self.assertRaises(ValueError, gcsio.parse_gcs_path, 'gs:/blah/bucket/name')
-
-
-@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
-class TestGCSIO(unittest.TestCase):
-
-  def _insert_random_file(self, client, path, size, generation=1):
-    bucket, name = gcsio.parse_gcs_path(path)
-    f = FakeFile(bucket, name, os.urandom(size), generation)
-    client.objects.add_file(f)
-    return f
-
-  def setUp(self):
-    self.client = FakeGcsClient()
-    self.gcs = gcsio.GcsIO(self.client)
-
-  def test_exists(self):
-    file_name = 'gs://gcsio-test/dummy_file'
-    file_size = 1234
-    self._insert_random_file(self.client, file_name, file_size)
-    self.assertFalse(self.gcs.exists(file_name + 'xyz'))
-    self.assertTrue(self.gcs.exists(file_name))
-
-  @mock.patch.object(FakeGcsObjects, 'Get')
-  def test_exists_failure(self, mock_get):
-    # Raising an error other than 404. Raising 404 is a valid failure for
-    # exists() call.
-    mock_get.side_effect = HttpError({'status': 400}, None, None)
-    file_name = 'gs://gcsio-test/dummy_file'
-    file_size = 1234
-    self._insert_random_file(self.client, file_name, file_size)
-    with self.assertRaises(HttpError) as cm:
-      self.gcs.exists(file_name)
-    self.assertEquals(400, cm.exception.status_code)
-
-  def test_size(self):
-    file_name = 'gs://gcsio-test/dummy_file'
-    file_size = 1234
-
-    self._insert_random_file(self.client, file_name, file_size)
-    self.assertTrue(self.gcs.exists(file_name))
-    self.assertEqual(1234, self.gcs.size(file_name))
-
-  def test_file_mode(self):
-    file_name = 'gs://gcsio-test/dummy_mode_file'
-    with self.gcs.open(file_name, 'wb') as f:
-      assert f.mode == 'wb'
-    with self.gcs.open(file_name, 'rb') as f:
-      assert f.mode == 'rb'
-
-  def test_bad_file_modes(self):
-    file_name = 'gs://gcsio-test/dummy_mode_file'
-    with self.assertRaises(ValueError):
-      self.gcs.open(file_name, 'w+')
-    with self.assertRaises(ValueError):
-      self.gcs.open(file_name, 'r+b')
-
-  def test_empty_batches(self):
-    self.assertEqual([], self.gcs.copy_batch([]))
-    self.assertEqual([], self.gcs.delete_batch([]))
-
-  def test_delete(self):
-    file_name = 'gs://gcsio-test/delete_me'
-    file_size = 1024
-
-    # Test deletion of non-existent file.
-    self.gcs.delete(file_name)
-
-    self._insert_random_file(self.client, file_name, file_size)
-    self.assertTrue(
-        gcsio.parse_gcs_path(file_name) in self.client.objects.files)
-
-    self.gcs.delete(file_name)
-
-    self.assertFalse(
-        gcsio.parse_gcs_path(file_name) in self.client.objects.files)
-
-  @mock.patch('apache_beam.io.gcsio.BatchApiRequest')
-  def test_delete_batch(self, *unused_args):
-    gcsio.BatchApiRequest = FakeBatchApiRequest
-    file_name_pattern = 'gs://gcsio-test/delete_me_%d'
-    file_size = 1024
-    num_files = 10
-
-    # Test deletion of non-existent files.
-    result = self.gcs.delete_batch(
-        [file_name_pattern % i for i in range(num_files)])
-    self.assertTrue(result)
-    for i, (file_name, exception) in enumerate(result):
-      self.assertEqual(file_name, file_name_pattern % i)
-      self.assertEqual(exception, None)
-      self.assertFalse(self.gcs.exists(file_name_pattern % i))
-
-    # Insert some files.
-    for i in range(num_files):
-      self._insert_random_file(self.client, file_name_pattern % i, file_size)
-
-    # Check files inserted properly.
-    for i in range(num_files):
-      self.assertTrue(self.gcs.exists(file_name_pattern % i))
-
-    # Execute batch delete.
-    self.gcs.delete_batch([file_name_pattern % i for i in range(num_files)])
-
-    # Check files deleted properly.
-    for i in range(num_files):
-      self.assertFalse(self.gcs.exists(file_name_pattern % i))
-
-  def test_copy(self):
-    src_file_name = 'gs://gcsio-test/source'
-    dest_file_name = 'gs://gcsio-test/dest'
-    file_size = 1024
-    self._insert_random_file(self.client, src_file_name, file_size)
-    self.assertTrue(
-        gcsio.parse_gcs_path(src_file_name) in self.client.objects.files)
-    self.assertFalse(
-        gcsio.parse_gcs_path(dest_file_name) in self.client.objects.files)
-
-    self.gcs.copy(src_file_name, dest_file_name)
-
-    self.assertTrue(
-        gcsio.parse_gcs_path(src_file_name) in self.client.objects.files)
-    self.assertTrue(
-        gcsio.parse_gcs_path(dest_file_name) in self.client.objects.files)
-
-    self.assertRaises(IOError, self.gcs.copy, 'gs://gcsio-test/non-existent',
-                      'gs://gcsio-test/non-existent-destination')
-
-  @mock.patch('apache_beam.io.gcsio.BatchApiRequest')
-  def test_copy_batch(self, *unused_args):
-    gcsio.BatchApiRequest = FakeBatchApiRequest
-    from_name_pattern = 'gs://gcsio-test/copy_me_%d'
-    to_name_pattern = 'gs://gcsio-test/destination_%d'
-    file_size = 1024
-    num_files = 10
-
-    # Test copy of non-existent files.
-    result = self.gcs.copy_batch(
-        [(from_name_pattern % i, to_name_pattern % i)
-         for i in range(num_files)])
-    self.assertTrue(result)
-    for i, (src, dest, exception) in enumerate(result):
-      self.assertEqual(src, from_name_pattern % i)
-      self.assertEqual(dest, to_name_pattern % i)
-      self.assertTrue(isinstance(exception, IOError))
-      self.assertEqual(exception.errno, errno.ENOENT)
-      self.assertFalse(self.gcs.exists(from_name_pattern % i))
-      self.assertFalse(self.gcs.exists(to_name_pattern % i))
-
-    # Insert some files.
-    for i in range(num_files):
-      self._insert_random_file(self.client, from_name_pattern % i, file_size)
-
-    # Check files inserted properly.
-    for i in range(num_files):
-      self.assertTrue(self.gcs.exists(from_name_pattern % i))
-
-    # Execute batch copy.
-    self.gcs.copy_batch([(from_name_pattern % i, to_name_pattern % i)
-                         for i in range(num_files)])
-
-    # Check files copied properly.
-    for i in range(num_files):
-      self.assertTrue(self.gcs.exists(from_name_pattern % i))
-      self.assertTrue(self.gcs.exists(to_name_pattern % i))
-
-  def test_copytree(self):
-    src_dir_name = 'gs://gcsio-test/source/'
-    dest_dir_name = 'gs://gcsio-test/dest/'
-    file_size = 1024
-    paths = ['a', 'b/c', 'b/d']
-    for path in paths:
-      src_file_name = src_dir_name + path
-      dest_file_name = dest_dir_name + path
-      self._insert_random_file(self.client, src_file_name, file_size)
-      self.assertTrue(
-          gcsio.parse_gcs_path(src_file_name) in self.client.objects.files)
-      self.assertFalse(
-          gcsio.parse_gcs_path(dest_file_name) in self.client.objects.files)
-
-    self.gcs.copytree(src_dir_name, dest_dir_name)
-
-    for path in paths:
-      src_file_name = src_dir_name + path
-      dest_file_name = dest_dir_name + path
-      self.assertTrue(
-          gcsio.parse_gcs_path(src_file_name) in self.client.objects.files)
-      self.assertTrue(
-          gcsio.parse_gcs_path(dest_file_name) in self.client.objects.files)
-
-  def test_rename(self):
-    src_file_name = 'gs://gcsio-test/source'
-    dest_file_name = 'gs://gcsio-test/dest'
-    file_size = 1024
-    self._insert_random_file(self.client, src_file_name, file_size)
-    self.assertTrue(
-        gcsio.parse_gcs_path(src_file_name) in self.client.objects.files)
-    self.assertFalse(
-        gcsio.parse_gcs_path(dest_file_name) in self.client.objects.files)
-
-    self.gcs.rename(src_file_name, dest_file_name)
-
-    self.assertFalse(
-        gcsio.parse_gcs_path(src_file_name) in self.client.objects.files)
-    self.assertTrue(
-        gcsio.parse_gcs_path(dest_file_name) in self.client.objects.files)
-
-  def test_full_file_read(self):
-    file_name = 'gs://gcsio-test/full_file'
-    file_size = 5 * 1024 * 1024 + 100
-    random_file = self._insert_random_file(self.client, file_name, file_size)
-    f = self.gcs.open(file_name)
-    self.assertEqual(f.mode, 'r')
-    f.seek(0, os.SEEK_END)
-    self.assertEqual(f.tell(), file_size)
-    self.assertEqual(f.read(), '')
-    f.seek(0)
-    self.assertEqual(f.read(), random_file.contents)
-
-  def test_flaky_file_read(self):
-    file_name = 'gs://gcsio-test/flaky_file'
-    file_size = 5 * 1024 * 1024 + 100
-    random_file = self._insert_random_file(self.client, file_name, file_size)
-    f = self.gcs.open(file_name)
-    random.seed(0)
-    f.buffer_size = 1024 * 1024
-    f.segment_timeout = 0.01
-    self.assertEqual(f.mode, 'r')
-    f._real_get_segment = f._get_segment
-
-    def flaky_get_segment(start, size):
-      if random.randint(0, 3) == 1:
-        time.sleep(600)
-      return f._real_get_segment(start, size)
-
-    f._get_segment = flaky_get_segment
-    self.assertEqual(f.read(), random_file.contents)
-
-    # Test exception handling in file read.
-    def failing_get_segment(unused_start, unused_size):
-      raise IOError('Could not read.')
-
-    f._get_segment = failing_get_segment
-    f.seek(0)
-    with self.assertRaises(IOError):
-      f.read()
-
-    # Test retry limit in hanging file read.
-    def hanging_get_segment(unused_start, unused_size):
-      time.sleep(600)
-
-    f._get_segment = hanging_get_segment
-    f.seek(0)
-    with self.assertRaises(gcsio.GcsIOError):
-      f.read()
-
-  def test_file_random_seek(self):
-    file_name = 'gs://gcsio-test/seek_file'
-    file_size = 5 * 1024 * 1024 - 100
-    random_file = self._insert_random_file(self.client, file_name, file_size)
-
-    f = self.gcs.open(file_name)
-    random.seed(0)
-    for _ in range(0, 10):
-      a = random.randint(0, file_size - 1)
-      b = random.randint(0, file_size - 1)
-      start, end = min(a, b), max(a, b)
-      f.seek(start)
-      self.assertEqual(f.tell(), start)
-      self.assertEqual(
-          f.read(end - start + 1), random_file.contents[start:end + 1])
-      self.assertEqual(f.tell(), end + 1)
-
-  def test_file_iterator(self):
-    file_name = 'gs://gcsio-test/iterating_file'
-    lines = []
-    line_count = 10
-    for _ in range(line_count):
-      line_length = random.randint(100, 500)
-      line = os.urandom(line_length).replace('\n', ' ') + '\n'
-      lines.append(line)
-
-    contents = ''.join(lines)
-    bucket, name = gcsio.parse_gcs_path(file_name)
-    self.client.objects.add_file(FakeFile(bucket, name, contents, 1))
-
-    f = self.gcs.open(file_name)
-
-    read_lines = 0
-    for line in f:
-      read_lines += 1
-
-    self.assertEqual(read_lines, line_count)
-
-  def test_file_read_line(self):
-    file_name = 'gs://gcsio-test/read_line_file'
-    lines = []
-
-    # Set a small buffer size to exercise refilling the buffer.
-    # First line is carefully crafted so the newline falls as the last character
-    # of the buffer to exercise this code path.
-    read_buffer_size = 1024
-    lines.append('x' * 1023 + '\n')
-
-    for _ in range(1, 1000):
-      line_length = random.randint(100, 500)
-      line = os.urandom(line_length).replace('\n', ' ') + '\n'
-      lines.append(line)
-    contents = ''.join(lines)
-
-    file_size = len(contents)
-    bucket, name = gcsio.parse_gcs_path(file_name)
-    self.client.objects.add_file(FakeFile(bucket, name, contents, 1))
-
-    f = self.gcs.open(file_name, read_buffer_size=read_buffer_size)
-
-    # Test read of first two lines.
-    f.seek(0)
-    self.assertEqual(f.readline(), lines[0])
-    self.assertEqual(f.tell(), len(lines[0]))
-    self.assertEqual(f.readline(), lines[1])
-
-    # Test read at line boundary.
-    f.seek(file_size - len(lines[-1]) - 1)
-    self.assertEqual(f.readline(), '\n')
-
-    # Test read at end of file.
-    f.seek(file_size)
-    self.assertEqual(f.readline(), '')
-
-    # Test reads at random positions.
-    random.seed(0)
-    for _ in range(0, 10):
-      start = random.randint(0, file_size - 1)
-      line_index = 0
-      # Find line corresponding to start index.
-      chars_left = start
-      while True:
-        next_line_length = len(lines[line_index])
-        if chars_left - next_line_length < 0:
-          break
-        chars_left -= next_line_length
-        line_index += 1
-      f.seek(start)
-      self.assertEqual(f.readline(), lines[line_index][chars_left:])
-
-  def test_file_write(self):
-    file_name = 'gs://gcsio-test/write_file'
-    file_size = 5 * 1024 * 1024 + 2000
-    contents = os.urandom(file_size)
-    f = self.gcs.open(file_name, 'w')
-    self.assertEqual(f.mode, 'w')
-    f.write(contents[0:1000])
-    f.write(contents[1000:1024 * 1024])
-    f.write(contents[1024 * 1024:])
-    f.close()
-    bucket, name = gcsio.parse_gcs_path(file_name)
-    self.assertEqual(
-        self.client.objects.get_file(bucket, name).contents, contents)
-
-  def test_file_close(self):
-    file_name = 'gs://gcsio-test/close_file'
-    file_size = 5 * 1024 * 1024 + 2000
-    contents = os.urandom(file_size)
-    f = self.gcs.open(file_name, 'w')
-    self.assertEqual(f.mode, 'w')
-    f.write(contents)
-    f.close()
-    f.close()  # This should not crash.
-    bucket, name = gcsio.parse_gcs_path(file_name)
-    self.assertEqual(
-        self.client.objects.get_file(bucket, name).contents, contents)
-
-  def test_file_flush(self):
-    file_name = 'gs://gcsio-test/flush_file'
-    file_size = 5 * 1024 * 1024 + 2000
-    contents = os.urandom(file_size)
-    bucket, name = gcsio.parse_gcs_path(file_name)
-    f = self.gcs.open(file_name, 'w')
-    self.assertEqual(f.mode, 'w')
-    f.write(contents[0:1000])
-    f.flush()
-    f.write(contents[1000:1024 * 1024])
-    f.flush()
-    f.flush()  # Should be a NOOP.
-    f.write(contents[1024 * 1024:])
-    f.close()  # This should already call the equivalent of flush() in its body.
-    self.assertEqual(
-        self.client.objects.get_file(bucket, name).contents, contents)
-
-  def test_context_manager(self):
-    # Test writing with a context manager.
-    file_name = 'gs://gcsio-test/context_manager_file'
-    file_size = 1024
-    contents = os.urandom(file_size)
-    with self.gcs.open(file_name, 'w') as f:
-      f.write(contents)
-    bucket, name = gcsio.parse_gcs_path(file_name)
-    self.assertEqual(
-        self.client.objects.get_file(bucket, name).contents, contents)
-
-    # Test reading with a context manager.
-    with self.gcs.open(file_name) as f:
-      self.assertEqual(f.read(), contents)
-
-    # Test that exceptions are not swallowed by the context manager.
-    with self.assertRaises(ZeroDivisionError):
-      with self.gcs.open(file_name) as f:
-        f.read(0 / 0)
-
-  def test_glob(self):
-    bucket_name = 'gcsio-test'
-    object_names = [
-        'cow/cat/fish',
-        'cow/cat/blubber',
-        'cow/dog/blubber',
-        'apple/dog/blubber',
-        'apple/fish/blubber',
-        'apple/fish/blowfish',
-        'apple/fish/bambi',
-        'apple/fish/balloon',
-        'apple/fish/cat',
-        'apple/fish/cart',
-        'apple/fish/carl',
-        'apple/dish/bat',
-        'apple/dish/cat',
-        'apple/dish/carl',
-    ]
-    for object_name in object_names:
-      file_name = 'gs://%s/%s' % (bucket_name, object_name)
-      self._insert_random_file(self.client, file_name, 0)
-    test_cases = [
-        ('gs://gcsio-test/*', [
-            'cow/cat/fish',
-            'cow/cat/blubber',
-            'cow/dog/blubber',
-            'apple/dog/blubber',
-            'apple/fish/blubber',
-            'apple/fish/blowfish',
-            'apple/fish/bambi',
-            'apple/fish/balloon',
-            'apple/fish/cat',
-            'apple/fish/cart',
-            'apple/fish/carl',
-            'apple/dish/bat',
-            'apple/dish/cat',
-            'apple/dish/carl',
-        ]),
-        ('gs://gcsio-test/cow/*', [
-            'cow/cat/fish',
-            'cow/cat/blubber',
-            'cow/dog/blubber',
-        ]),
-        ('gs://gcsio-test/cow/ca*', [
-            'cow/cat/fish',
-            'cow/cat/blubber',
-        ]),
-        ('gs://gcsio-test/apple/[df]ish/ca*', [
-            'apple/fish/cat',
-            'apple/fish/cart',
-            'apple/fish/carl',
-            'apple/dish/cat',
-            'apple/dish/carl',
-        ]),
-        ('gs://gcsio-test/apple/fish/car?', [
-            'apple/fish/cart',
-            'apple/fish/carl',
-        ]),
-        ('gs://gcsio-test/apple/fish/b*', [
-            'apple/fish/blubber',
-            'apple/fish/blowfish',
-            'apple/fish/bambi',
-            'apple/fish/balloon',
-        ]),
-        ('gs://gcsio-test/apple/dish/[cb]at', [
-            'apple/dish/bat',
-            'apple/dish/cat',
-        ]),
-    ]
-    for file_pattern, expected_object_names in test_cases:
-      expected_file_names = ['gs://%s/%s' % (bucket_name, o)
-                             for o in expected_object_names]
-      self.assertEqual(
-          set(self.gcs.glob(file_pattern)), set(expected_file_names))
-
-    # Check if limits are followed correctly
-    limit = 3
-    for file_pattern, expected_object_names in test_cases:
-      expected_num_items = min(len(expected_object_names), limit)
-      self.assertEqual(
-          len(self.gcs.glob(file_pattern, limit)), expected_num_items)
-
-  def test_size_of_files_in_glob(self):
-    bucket_name = 'gcsio-test'
-    object_names = [
-        ('cow/cat/fish', 2),
-        ('cow/cat/blubber', 3),
-        ('cow/dog/blubber', 4),
-        ('apple/dog/blubber', 5),
-        ('apple/fish/blubber', 6),
-        ('apple/fish/blowfish', 7),
-        ('apple/fish/bambi', 8),
-        ('apple/fish/balloon', 9),
-        ('apple/fish/cat', 10),
-        ('apple/fish/cart', 11),
-        ('apple/fish/carl', 12),
-        ('apple/dish/bat', 13),
-        ('apple/dish/cat', 14),
-        ('apple/dish/carl', 15),
-    ]
-    for (object_name, size) in object_names:
-      file_name = 'gs://%s/%s' % (bucket_name, object_name)
-      self._insert_random_file(self.client, file_name, size)
-    test_cases = [
-        ('gs://gcsio-test/cow/*', [
-            ('cow/cat/fish', 2),
-            ('cow/cat/blubber', 3),
-            ('cow/dog/blubber', 4),
-        ]),
-        ('gs://gcsio-test/apple/fish/car?', [
-            ('apple/fish/cart', 11),
-            ('apple/fish/carl', 12),
-        ])
-    ]
-    for file_pattern, expected_object_names in test_cases:
-      expected_file_sizes = {'gs://%s/%s' % (bucket_name, o): s
-                             for (o, s) in expected_object_names}
-      self.assertEqual(
-          self.gcs.size_of_files_in_glob(file_pattern), expected_file_sizes)
-
-
-@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
-class TestPipeStream(unittest.TestCase):
-
-  def _read_and_verify(self, stream, expected, buffer_size):
-    data_list = []
-    bytes_read = 0
-    seen_last_block = False
-    while True:
-      data = stream.read(buffer_size)
-      self.assertLessEqual(len(data), buffer_size)
-      if len(data) < buffer_size:
-        # Test the constraint that the pipe stream returns less than the buffer
-        # size only when at the end of the stream.
-        if data:
-          self.assertFalse(seen_last_block)
-        seen_last_block = True
-      if not data:
-        break
-      data_list.append(data)
-      bytes_read += len(data)
-      self.assertEqual(stream.tell(), bytes_read)
-    self.assertEqual(''.join(data_list), expected)
-
-  def test_pipe_stream(self):
-    block_sizes = list(4**i for i in range(0, 12))
-    data_blocks = list(os.urandom(size) for size in block_sizes)
-    expected = ''.join(data_blocks)
-
-    buffer_sizes = [100001, 512 * 1024, 1024 * 1024]
-
-    for buffer_size in buffer_sizes:
-      parent_conn, child_conn = multiprocessing.Pipe()
-      stream = gcsio.GcsBufferedWriter.PipeStream(child_conn)
-      child_thread = threading.Thread(
-          target=self._read_and_verify, args=(stream, expected, buffer_size))
-      child_thread.start()
-      for data in data_blocks:
-        parent_conn.send_bytes(data)
-      parent_conn.close()
-      child_thread.join()
-
-
-if __name__ == '__main__':
-  logging.getLogger().setLevel(logging.INFO)
-  unittest.main()

http://git-wip-us.apache.org/repos/asf/beam/blob/59ad58ac/sdks/python/apache_beam/io/google_cloud_platform/internal/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/google_cloud_platform/internal/__init__.py b/sdks/python/apache_beam/io/google_cloud_platform/internal/__init__.py
deleted file mode 100644
index cce3aca..0000000
--- a/sdks/python/apache_beam/io/google_cloud_platform/internal/__init__.py
+++ /dev/null
@@ -1,16 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#

http://git-wip-us.apache.org/repos/asf/beam/blob/59ad58ac/sdks/python/apache_beam/io/google_cloud_platform/internal/clients/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/google_cloud_platform/internal/clients/__init__.py b/sdks/python/apache_beam/io/google_cloud_platform/internal/clients/__init__.py
deleted file mode 100644
index cce3aca..0000000
--- a/sdks/python/apache_beam/io/google_cloud_platform/internal/clients/__init__.py
+++ /dev/null
@@ -1,16 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#

http://git-wip-us.apache.org/repos/asf/beam/blob/59ad58ac/sdks/python/apache_beam/io/google_cloud_platform/internal/clients/bigquery/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/google_cloud_platform/internal/clients/bigquery/__init__.py b/sdks/python/apache_beam/io/google_cloud_platform/internal/clients/bigquery/__init__.py
deleted file mode 100644
index 2b78ef1..0000000
--- a/sdks/python/apache_beam/io/google_cloud_platform/internal/clients/bigquery/__init__.py
+++ /dev/null
@@ -1,33 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Common imports for generated bigquery client library."""
-# pylint:disable=wildcard-import
-
-import pkgutil
-
-# Protect against environments where apitools library is not available.
-# pylint: disable=wrong-import-order, wrong-import-position
-try:
-  from apitools.base.py import *
-  from apache_beam.io.google_cloud_platform.internal.clients.bigquery.bigquery_v2_client import *
-  from apache_beam.io.google_cloud_platform.internal.clients.bigquery.bigquery_v2_messages import *
-except ImportError:
-  pass
-# pylint: enable=wrong-import-order, wrong-import-position
-
-__path__ = pkgutil.extend_path(__path__, __name__)

Reply via email to