Repository: beam Updated Branches: refs/heads/master 0ce01b63f -> 87a12af6c
[BEAM-539] Fixes several issues of FileSink. (1) Updates FileSink to fail for file name prefixes that only contain a single component (for example GCS buckets). For example, currently FileSink fails for 'gs://aaa' while passing for 'gs://aaa/'. This change makes FileSink fail for both cases (and makes the behaviour consistent with Java). (2) Updates the name of the temporary directory created by FileSink. Currently, for a filename prefix 'gs://aaa/bbb', the temp path would be of the form 'gs://aaa/bbb-temp-...'. This is error-prone since a user pattern 'gs://aaa/bbb*' would match temp files. This change makes the temp path format 'gs://aaa/beam-temp-bbb-...' instead. To achieve the above, this adds a method 'split()' to the FileSystem interface that is analogous to Python 'os.path.split()' and has the opposite effect of the current method FileSystem.join(). Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5ec48c58 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5ec48c58 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5ec48c58 Branch: refs/heads/master Commit: 5ec48c58c0e32891224598db61ebb63e8731e9fb Parents: 0ce01b6 Author: Chamikara Jayalath <chamik...@google.com> Authored: Fri Apr 28 14:38:35 2017 -0700 Committer: chamik...@google.com <chamik...@google.com> Committed: Tue May 2 13:56:00 2017 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/io/fileio.py | 20 +++++-- sdks/python/apache_beam/io/fileio_test.py | 56 ++++++++++++++++++++ sdks/python/apache_beam/io/filesystem.py | 17 ++++++ sdks/python/apache_beam/io/filesystems.py | 18 +++++++ sdks/python/apache_beam/io/gcp/gcsfilesystem.py | 34 +++++++++++- .../apache_beam/io/gcp/gcsfilesystem_test.py | 12 +++++ sdks/python/apache_beam/io/localfilesystem.py | 13 +++++ .../apache_beam/io/localfilesystem_test.py | 35 ++++++++++++ 8 files changed, 200 insertions(+), 5 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/5ec48c58/sdks/python/apache_beam/io/fileio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py index bb77bfe..49562f7 100644 --- a/sdks/python/apache_beam/io/fileio.py +++ b/sdks/python/apache_beam/io/fileio.py @@ -139,12 +139,26 @@ class FileSink(iobase.Sink): @check_accessible(['file_path_prefix', 'file_name_suffix']) def initialize_write(self): file_path_prefix = self.file_path_prefix.get() - file_name_suffix = self.file_name_suffix.get() - tmp_dir = file_path_prefix + file_name_suffix + time.strftime( - '-temp-%Y-%m-%d_%H-%M-%S') + + tmp_dir = self._create_temp_dir(file_path_prefix) FileSystems.mkdirs(tmp_dir) return tmp_dir + def _create_temp_dir(self, file_path_prefix): + base_path, last_component = FileSystems.split(file_path_prefix) + if not last_component: + # Trying to re-split the base_path to check if it's a root. + new_base_path, _ = FileSystems.split(base_path) + if base_path == new_base_path: + raise ValueError('Cannot create a temporary directory for root path ' + 'prefix %s. Please specify a file path prefix with ' + 'at least two components.', + file_path_prefix) + path_components = [base_path, + 'beam-temp-' + last_component + time.strftime( + '-%Y-%m-%d_%H-%M-%S')] + return FileSystems.join(*path_components) + @check_accessible(['file_path_prefix', 'file_name_suffix']) def open_writer(self, init_result, uid): # A proper suffix is needed for AUTO compression detection. 
http://git-wip-us.apache.org/repos/asf/beam/blob/5ec48c58/sdks/python/apache_beam/io/fileio_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py index 2409873..13778d5 100644 --- a/sdks/python/apache_beam/io/fileio_test.py +++ b/sdks/python/apache_beam/io/fileio_test.py @@ -26,10 +26,12 @@ import tempfile import unittest import hamcrest as hc +import mock import apache_beam as beam from apache_beam import coders from apache_beam.io import fileio +from apache_beam.io.filesystem import BeamIOError from apache_beam.test_pipeline import TestPipeline from apache_beam.transforms.display import DisplayData from apache_beam.transforms.display_test import DisplayDataItemMatcher @@ -184,6 +186,60 @@ class TestFileSink(_TestCaseWithTempDirCleanUp): self.assertTrue('][a][' in concat, concat) self.assertTrue('][b][' in concat, concat) + # Not using 'test' in name so that 'nose' doesn't pick this as a test. 
+ def run_temp_dir_check(self, no_dir_path, dir_path, no_dir_root_path, + dir_root_path, prefix, separator): + def _get_temp_dir(file_path_prefix): + sink = MyFileSink( + file_path_prefix, file_name_suffix='.output', + coder=coders.ToStringCoder()) + return sink.initialize_write() + + temp_dir = _get_temp_dir(no_dir_path) + self.assertTrue(temp_dir.startswith(prefix)) + last_sep = temp_dir.rfind(separator) + self.assertTrue(temp_dir[last_sep + 1:].startswith('beam-temp')) + + temp_dir = _get_temp_dir(dir_path) + self.assertTrue(temp_dir.startswith(prefix)) + last_sep = temp_dir.rfind(separator) + self.assertTrue(temp_dir[last_sep + 1:].startswith('beam-temp')) + + with self.assertRaises(ValueError): + _get_temp_dir(no_dir_root_path) + + with self.assertRaises(ValueError): + _get_temp_dir(dir_root_path) + + def test_temp_dir_gcs(self): + try: + self.run_temp_dir_check( + 'gs://aaa/bbb', 'gs://aaa/bbb/', 'gs://aaa', 'gs://aaa/', 'gs://', + '/') + except BeamIOError: + logging.debug('Ignoring test since GCP module is not installed') + + @mock.patch('apache_beam.io.localfilesystem.os') + def test_temp_dir_local(self, filesystem_os_mock): + # Here we test a unix-like mock file-system + # (not really testing Unix or Windows since we mock the function of 'os' + # module). 
+ + def _fake_unix_split(path): + sep = path.rfind('/') + if sep < 0: + raise ValueError('Path must contain a separator') + return (path[:sep], path[sep + 1:]) + + def _fake_unix_join(base, path): + return base + '/' + path + + filesystem_os_mock.path.abspath = lambda a: a + filesystem_os_mock.path.split.side_effect = _fake_unix_split + filesystem_os_mock.path.join.side_effect = _fake_unix_join + self.run_temp_dir_check( + '/aaa/bbb', '/aaa/bbb/', '/', '/', '/', '/') + def test_file_sink_multi_shards(self): temp_path = os.path.join(self._new_tempdir(), 'multishard') sink = MyFileSink( http://git-wip-us.apache.org/repos/asf/beam/blob/5ec48c58/sdks/python/apache_beam/io/filesystem.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py index db38858..ff8af03 100644 --- a/sdks/python/apache_beam/io/filesystem.py +++ b/sdks/python/apache_beam/io/filesystem.py @@ -438,6 +438,23 @@ class FileSystem(object): raise NotImplementedError @abc.abstractmethod + def split(self, path): + """Splits the given path into two parts. + + Splits the path into a pair (head, tail) such that tail contains the last + component of the path and head contains everything up to that. + + For file-systems other than the local file-system, head should include the + prefix. + + Args: + path: path as a string + Returns: + a pair of path components as strings. + """ + raise NotImplementedError + + @abc.abstractmethod def mkdirs(self, path): """Recursively create directories for the provided path. 
http://git-wip-us.apache.org/repos/asf/beam/blob/5ec48c58/sdks/python/apache_beam/io/filesystems.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/filesystems.py b/sdks/python/apache_beam/io/filesystems.py index 07fc684..225a857 100644 --- a/sdks/python/apache_beam/io/filesystems.py +++ b/sdks/python/apache_beam/io/filesystems.py @@ -50,6 +50,24 @@ class FileSystems(object): return filesystem.join(basepath, *paths) @staticmethod + def split(path): + """Splits the given path into two parts. + + Splits the path into a pair (head, tail) such that tail contains the last + component of the path and head contains everything up to that. + + For file-systems other than the local file-system, head should include the + prefix. + + Args: + path: path as a string + Returns: + a pair of path components as strings. + """ + filesystem = FileSystems.get_filesystem(path) + return filesystem.split(path) + + @staticmethod def mkdirs(path): """Recursively create directories for the provided path. 
http://git-wip-us.apache.org/repos/asf/beam/blob/5ec48c58/sdks/python/apache_beam/io/gcp/gcsfilesystem.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py index fdc4757..ba00f50 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py @@ -32,6 +32,7 @@ class GCSFileSystem(FileSystem): """ CHUNK_SIZE = gcsio.MAX_BATCH_OPERATION_SIZE # Chuck size in batch operations + GCS_PREFIX = 'gs://' def join(self, basepath, *paths): """Join two or more pathname components for the filesystem @@ -42,13 +43,42 @@ class GCSFileSystem(FileSystem): Returns: full path after combining all the passed components """ - if not basepath.startswith('gs://'): + if not basepath.startswith(GCSFileSystem.GCS_PREFIX): raise ValueError('Basepath %r must be GCS path.', basepath) path = basepath for p in paths: path = path.rstrip('/') + '/' + p.lstrip('/') return path + def split(self, path): + """Splits the given path into two parts. + + Splits the path into a pair (head, tail) such that tail contains the last + component of the path and head contains everything up to that. + + Head will include the GCS prefix ('gs://'). + + Args: + path: path as a string + Returns: + a pair of path components as strings. + """ + path = path.strip() + if not path.startswith(GCSFileSystem.GCS_PREFIX): + raise ValueError('Path %r must be GCS path.', path) + + prefix_len = len(GCSFileSystem.GCS_PREFIX) + last_sep = path[prefix_len:].rfind('/') + if last_sep >= 0: + last_sep += prefix_len + + if last_sep > 0: + return (path[:last_sep], path[last_sep + 1:]) + elif last_sep < 0: + return (path, '') + else: + raise ValueError('Invalid path: %s', path) + def mkdirs(self, path): """Recursively create directories for the provided path. 
@@ -154,7 +184,7 @@ class GCSFileSystem(FileSystem): def _copy_path(source, destination): """Recursively copy the file tree from the source to the destination """ - if not destination.startswith('gs://'): + if not destination.startswith(GCSFileSystem.GCS_PREFIX): raise ValueError('Destination %r must be GCS path.', destination) # Use copy_tree if the path ends with / as it is a directory if source.endswith('/'): http://git-wip-us.apache.org/repos/asf/beam/blob/5ec48c58/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py index 0669bf2..5fb9a62 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py @@ -53,6 +53,18 @@ class GCSFileSystemTest(unittest.TestCase): with self.assertRaises(ValueError): file_system.join('/bucket/path/', '/to/file') + def test_split(self): + file_system = gcsfilesystem.GCSFileSystem() + self.assertEqual(('gs://foo/bar', 'baz'), + file_system.split('gs://foo/bar/baz')) + self.assertEqual(('gs://foo', ''), + file_system.split('gs://foo/')) + self.assertEqual(('gs://foo', ''), + file_system.split('gs://foo')) + + with self.assertRaises(ValueError): + file_system.split('/no/gcs/prefix') + @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio') def test_match_multiples(self, mock_gcsio): # Prepare mocks. 
http://git-wip-us.apache.org/repos/asf/beam/blob/5ec48c58/sdks/python/apache_beam/io/localfilesystem.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/localfilesystem.py b/sdks/python/apache_beam/io/localfilesystem.py index 8b2bda9..bc7c576 100644 --- a/sdks/python/apache_beam/io/localfilesystem.py +++ b/sdks/python/apache_beam/io/localfilesystem.py @@ -45,6 +45,19 @@ class LocalFileSystem(FileSystem): """ return os.path.join(basepath, *paths) + def split(self, path): + """Splits the given path into two parts. + + Splits the path into a pair (head, tail) such that tail contains the last + component of the path and head contains everything up to that. + + Args: + path: path as a string + Returns: + a pair of path components as strings. + """ + return os.path.split(os.path.abspath(path)) + def mkdirs(self, path): """Recursively create directories for the provided path. http://git-wip-us.apache.org/repos/asf/beam/blob/5ec48c58/sdks/python/apache_beam/io/localfilesystem_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/localfilesystem_test.py b/sdks/python/apache_beam/io/localfilesystem_test.py index df6eb61..986dcaf 100644 --- a/sdks/python/apache_beam/io/localfilesystem_test.py +++ b/sdks/python/apache_beam/io/localfilesystem_test.py @@ -39,6 +39,19 @@ def _gen_fake_join(separator): return _join +def _gen_fake_split(separator): + """Returns a callable that splits a with the given separator.""" + + def _split(path): + sep_index = path.rfind(separator) + if sep_index >= 0: + return (path[:sep_index], path[sep_index + 1:]) + else: + return (path, '') + + return _split + + class LocalFileSystemTest(unittest.TestCase): def setUp(self): @@ -66,6 +79,28 @@ class LocalFileSystemTest(unittest.TestCase): self.assertEqual(r'C:\tmp\path\to\file', self.fs.join(r'C:\tmp\path', r'to\file')) + @mock.patch('apache_beam.io.localfilesystem.os') + def 
test_unix_path_split(self, os_mock): + os_mock.path.abspath.side_effect = lambda a: a + os_mock.path.split.side_effect = _gen_fake_split('/') + self.assertEqual(('/tmp/path/to', 'file'), + self.fs.split('/tmp/path/to/file')) + # Actual os.path.split will split following to '/' and 'tmp' when run in + # Unix. + self.assertEqual(('', 'tmp'), + self.fs.split('/tmp')) + + @mock.patch('apache_beam.io.localfilesystem.os') + def test_windows_path_split(self, os_mock): + os_mock.path.abspath = lambda a: a + os_mock.path.split.side_effect = _gen_fake_split('\\') + self.assertEqual((r'C:\tmp\path\to', 'file'), + self.fs.split(r'C:\tmp\path\to\file')) + # Actual os.path.split will split following to 'C:\' and 'tmp' when run in + # Windows. + self.assertEqual((r'C:', 'tmp'), + self.fs.split(r'C:\tmp')) + def test_mkdirs(self): path = os.path.join(self.tmpdir, 't1/t2') self.fs.mkdirs(path)