Fixes several issues related to 'filebasedsource'. Adds a method 'fileio.ChannelFactory.size_in_bytes()' that can be used to determine the size of a single file. Implements this method for both GCS and local files. Updates 'filebasedsource' to use this method when determining the size of files, instead of opening each file and seeking to its end.
Fixes a small bug in 'OffsetRangeTracker'. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c9c31fdf Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c9c31fdf Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c9c31fdf Branch: refs/heads/python-sdk Commit: c9c31fdffbe2365a8dedd3154726ab1c01cfa889 Parents: d898d56 Author: Chamikara Jayalath <chamik...@apache.org> Authored: Wed Jul 6 20:25:04 2016 -0700 Committer: Robert Bradshaw <rober...@google.com> Committed: Fri Jul 15 10:23:13 2016 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/io/filebasedsource.py | 9 +-------- sdks/python/apache_beam/io/fileio.py | 14 ++++++++++++++ sdks/python/apache_beam/io/gcsio.py | 15 +++++++++++++++ sdks/python/apache_beam/io/gcsio_test.py | 8 ++++++++ sdks/python/apache_beam/io/range_trackers.py | 3 ++- sdks/python/apache_beam/io/range_trackers_test.py | 3 +++ 6 files changed, 43 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c9c31fdf/sdks/python/apache_beam/io/filebasedsource.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py index c877e44..aa0820d 100644 --- a/sdks/python/apache_beam/io/filebasedsource.py +++ b/sdks/python/apache_beam/io/filebasedsource.py @@ -26,7 +26,6 @@ For an example implementation of ``FileBasedSource`` see ``avroio.AvroSource``. 
""" from multiprocessing.pool import ThreadPool -import os import range_trackers from apache_beam.io import fileio @@ -131,13 +130,7 @@ class FileBasedSource(iobase.BoundedSource): def _estimate_sizes_in_parallel(file_names): def _calculate_size_of_file(file_name): - f = fileio.ChannelFactory.open( - file_name, 'rb', 'application/octet-stream') - try: - f.seek(0, os.SEEK_END) - return f.tell() - finally: - f.close() + return fileio.ChannelFactory.size_in_bytes(file_name) return ThreadPool(MAX_NUM_THREADS_FOR_SIZE_ESTIMATION).map( _calculate_size_of_file, file_names) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c9c31fdf/sdks/python/apache_beam/io/fileio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py index 31b6a93..f532077 100644 --- a/sdks/python/apache_beam/io/fileio.py +++ b/sdks/python/apache_beam/io/fileio.py @@ -255,6 +255,20 @@ class ChannelFactory(object): else: return glob.glob(path) + @staticmethod + def size_in_bytes(path): + """Returns the size of a file in bytes. + + Args: + path: a string that gives the path of a single file. 
+ """ + if path.startswith('gs://'): + # pylint: disable=wrong-import-order, wrong-import-position + from apache_beam.io import gcsio + return gcsio.GcsIO().size(path) + else: + return os.path.getsize(path) + class _CompressionType(object): """Object representing single compression type.""" http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c9c31fdf/sdks/python/apache_beam/io/gcsio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcsio.py b/sdks/python/apache_beam/io/gcsio.py index c61f251..10409c9 100644 --- a/sdks/python/apache_beam/io/gcsio.py +++ b/sdks/python/apache_beam/io/gcsio.py @@ -236,6 +236,21 @@ class GcsIO(object): except IOError: return False + @retry.with_exponential_backoff( + retry_filter=retry.retry_on_server_errors_and_timeout_filter) + def size(self, path): + """Returns the size of a single GCS object. + + This method does not perform glob expansion. Hence the given path must be + for a single GCS object. + + Returns: size of the GCS object in bytes. 
+ """ + bucket, object_path = parse_gcs_path(path) + request = storage.StorageObjectsGetRequest(bucket=bucket, + object=object_path) + return self.client.objects.Get(request).size + class GcsBufferedReader(object): """A class for reading Google Cloud Storage files.""" http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c9c31fdf/sdks/python/apache_beam/io/gcsio_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcsio_test.py b/sdks/python/apache_beam/io/gcsio_test.py index eeabb1a..7b15ef3 100644 --- a/sdks/python/apache_beam/io/gcsio_test.py +++ b/sdks/python/apache_beam/io/gcsio_test.py @@ -189,6 +189,14 @@ class TestGCSIO(unittest.TestCase): self.client = FakeGcsClient() self.gcs = gcsio.GcsIO(self.client) + def test_size(self): + file_name = 'gs://gcsio-test/dummy_file' + file_size = 1234 + + self._insert_random_file(self.client, file_name, file_size) + self.assertTrue(self.gcs.exists(file_name)) + self.assertEqual(1234, self.gcs.size(file_name)) + def test_delete(self): file_name = 'gs://gcsio-test/delete_me' file_size = 1024 http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c9c31fdf/sdks/python/apache_beam/io/range_trackers.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/range_trackers.py b/sdks/python/apache_beam/io/range_trackers.py index c3481de..a736162 100644 --- a/sdks/python/apache_beam/io/range_trackers.py +++ b/sdks/python/apache_beam/io/range_trackers.py @@ -114,6 +114,7 @@ class OffsetRangeTracker(iobase.RangeTracker): self._last_record_start = record_start def try_split(self, split_offset): + assert isinstance(split_offset, (int, long)) with self._lock: if self._stop_offset == OffsetRangeTracker.OFFSET_INFINITY: logging.debug('refusing to split %r at %d: stop position unspecified', @@ -163,7 +164,7 @@ class OffsetRangeTracker(iobase.RangeTracker): raise Exception( 'get_position_for_fraction_consumed 
is not applicable for an ' 'unbounded range') - return (math.ceil(self.start_position() + fraction * ( + return int(math.ceil(self.start_position() + fraction * ( self.stop_position() - self.start_position()))) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c9c31fdf/sdks/python/apache_beam/io/range_trackers_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/range_trackers_test.py b/sdks/python/apache_beam/io/range_trackers_test.py index ceeccd5..77733d3 100644 --- a/sdks/python/apache_beam/io/range_trackers_test.py +++ b/sdks/python/apache_beam/io/range_trackers_test.py @@ -98,6 +98,9 @@ class OffsetRangeTrackerTest(unittest.TestCase): def test_get_position_for_fraction_dense(self): # Represents positions 3, 4, 5. tracker = range_trackers.OffsetRangeTracker(3, 6) + + # Position must be an integer type. + self.assertTrue(isinstance(tracker.position_at_fraction(0.0), (int, long))) # [3, 3) represents 0.0 of [3, 6) self.assertEqual(3, tracker.position_at_fraction(0.0)) # [3, 4) represents up to 1/3 of [3, 6)