Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 2363ee510 -> fd6a52c15
Do not need to list all files in GCS for validation. Add limit field to fileIO

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/16886904
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/16886904
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/16886904

Branch: refs/heads/python-sdk
Commit: 16886904df9fd1d3f92e1f7aabd134a28d6c1c00
Parents: 2363ee5
Author: Sourabh Bajaj <sourabhba...@google.com>
Authored: Fri Dec 2 13:56:42 2016 -0800
Committer: Sourabh Bajaj <sourabhba...@google.com>
Committed: Fri Dec 2 13:56:42 2016 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/io/filebasedsource.py | 3 ++-
 sdks/python/apache_beam/io/fileio.py | 7 ++++---
 sdks/python/apache_beam/io/gcsio.py | 6 ++++--
 sdks/python/apache_beam/io/gcsio_test.py | 7 +++++++
 4 files changed, 17 insertions(+), 6 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/16886904/sdks/python/apache_beam/io/filebasedsource.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index 14c2b06..8921801 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -175,7 +175,8 @@ class FileBasedSource(iobase.BoundedSource):
   def _validate(self):
     """Validate if there are actual files in the specified glob pattern
     """
-    if len(fileio.ChannelFactory.glob(self._pattern)) <= 0:
+    # Limit the responses as we only want to check if something exists
+    if len(fileio.ChannelFactory.glob(self._pattern, limit=1)) <= 0:
       raise IOError(
           'No files found based on the file pattern %s' % self._pattern)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/16886904/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index c71a730..82e7813 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -588,11 +588,12 @@ class ChannelFactory(object):
       raise IOError(err)
 
   @staticmethod
-  def glob(path):
+  def glob(path, limit=None):
     if path.startswith('gs://'):
-      return gcsio.GcsIO().glob(path)
+      return gcsio.GcsIO().glob(path, limit)
     else:
-      return glob.glob(path)
+      files = glob.glob(path)
+      return files[:limit]
 
   @staticmethod
   def size_in_bytes(path):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/16886904/sdks/python/apache_beam/io/gcsio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcsio.py b/sdks/python/apache_beam/io/gcsio.py
index 9adb946..748465f 100644
--- a/sdks/python/apache_beam/io/gcsio.py
+++ b/sdks/python/apache_beam/io/gcsio.py
@@ -142,7 +142,7 @@ class GcsIO(object):
 
   @retry.with_exponential_backoff(
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def glob(self, pattern):
+  def glob(self, pattern, limit=None):
     """Return the GCS path names matching a given path name pattern.
 
     Path name patterns are those recognized by fnmatch.fnmatch(). The path
@@ -166,9 +166,11 @@ class GcsIO(object):
           object_paths.append('gs://%s/%s' % (item.bucket, item.name))
       if response.nextPageToken:
         request.pageToken = response.nextPageToken
+        if limit is not None and len(object_paths) >= limit:
+          break
       else:
         break
-    return object_paths
+    return object_paths[:limit]
 
   @retry.with_exponential_backoff(
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/16886904/sdks/python/apache_beam/io/gcsio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcsio_test.py b/sdks/python/apache_beam/io/gcsio_test.py
index 9d44e17..5af13c6 100644
--- a/sdks/python/apache_beam/io/gcsio_test.py
+++ b/sdks/python/apache_beam/io/gcsio_test.py
@@ -652,6 +652,13 @@ class TestGCSIO(unittest.TestCase):
       self.assertEqual(
           set(self.gcs.glob(file_pattern)), set(expected_file_names))
 
+    # Check if limits are followed correctly
+    limit = 3
+    for file_pattern, expected_object_names in test_cases:
+      expected_num_items = min(len(expected_object_names), limit)
+      self.assertEqual(
+          len(self.gcs.glob(file_pattern, limit)), expected_num_items)
+
   def test_size_of_files_in_glob(self):
     bucket_name = 'gcsio-test'
     object_names = [