Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 45b420d82 -> cce4331dc
Fix issue where batch GCS renames were not issued Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/99bcafe7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/99bcafe7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/99bcafe7 Branch: refs/heads/python-sdk Commit: 99bcafe7a02bbec5222d77abbad24f5eed8a687f Parents: 45b420d Author: Charles Chen <c...@google.com> Authored: Thu Nov 17 14:13:56 2016 -0800 Committer: Robert Bradshaw <rober...@gmail.com> Committed: Fri Nov 18 13:37:04 2016 -0800 ---------------------------------------------------------------------- sdks/python/apache_beam/io/fileio.py | 5 +++ sdks/python/apache_beam/io/fileio_test.py | 47 ++++++++++++++++++++++++++ 2 files changed, 52 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99bcafe7/sdks/python/apache_beam/io/fileio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py index 3b67c4f..4d0eea6 100644 --- a/sdks/python/apache_beam/io/fileio.py +++ b/sdks/python/apache_beam/io/fileio.py @@ -514,6 +514,7 @@ class ChannelFactory(object): gcs_batches = [] gcs_current_batch = [] for src, dest in src_dest_pairs: + gcs_current_batch.append((src, dest)) if len(gcs_current_batch) == gcsio.MAX_BATCH_OPERATION_SIZE: gcs_batches.append(gcs_current_batch) gcs_current_batch = [] @@ -893,6 +894,8 @@ class FileSink(iobase.Sink): exception_infos = ChannelFactory.rename_batch(batch) for src, dest, exception in exception_infos: if exception: + logging.warning('Rename not successful: %s -> %s, %s', src, dest, + exception) should_report = True if isinstance(exception, IOError): # May have already been copied. @@ -906,6 +909,8 @@ class FileSink(iobase.Sink): logging.warning(('Exception in _rename_batch. 
src: %s, ' 'dest: %s, err: %s'), src, dest, exception) exceptions.append(exception) + else: + logging.debug('Rename successful: %s -> %s', src, dest) return exceptions # ThreadPool crashes in old versions of Python (< 2.7.5) if created from a http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99bcafe7/sdks/python/apache_beam/io/fileio_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py index 63e71e0..9d1e424 100644 --- a/sdks/python/apache_beam/io/fileio_test.py +++ b/sdks/python/apache_beam/io/fileio_test.py @@ -28,6 +28,7 @@ import unittest import zlib import hamcrest as hc +import mock import apache_beam as beam from apache_beam import coders @@ -881,6 +882,52 @@ class TestFileSink(unittest.TestCase): with self.assertRaises(Exception): list(sink.finalize_write(init_token, [res1, res2])) + @mock.patch('apache_beam.io.fileio.ChannelFactory.rename') + @mock.patch('apache_beam.io.fileio.gcsio') + def test_rename_batch(self, *unused_args): + # Prepare mocks. + gcsio_mock = mock.MagicMock() + fileio.gcsio.GcsIO = lambda: gcsio_mock + fileio.ChannelFactory.rename = mock.MagicMock() + to_rename = [ + ('gs://bucket/from1', 'gs://bucket/to1'), + ('gs://bucket/from2', 'gs://bucket/to2'), + ('/local/from1', '/local/to1'), + ('gs://bucket/from3', 'gs://bucket/to3'), + ('/local/from2', '/local/to2'), + ] + gcsio_mock.copy_batch.side_effect = [[ + ('gs://bucket/from1', 'gs://bucket/to1', None), + ('gs://bucket/from2', 'gs://bucket/to2', None), + ('gs://bucket/from3', 'gs://bucket/to3', None), + ]] + gcsio_mock.delete_batch.side_effect = [[ + ('gs://bucket/from1', None), + ('gs://bucket/from2', None), + ('gs://bucket/from3', None), + ]] + + # Issue batch rename. + fileio.ChannelFactory.rename_batch(to_rename) + + # Verify mocks. 
+ expected_local_rename_calls = [ + mock.call('/local/from1', '/local/to1'), + mock.call('/local/from2', '/local/to2'), + ] + self.assertEqual(fileio.ChannelFactory.rename.call_args_list, + expected_local_rename_calls) + gcsio_mock.copy_batch.assert_called_once_with([ + ('gs://bucket/from1', 'gs://bucket/to1'), + ('gs://bucket/from2', 'gs://bucket/to2'), + ('gs://bucket/from3', 'gs://bucket/to3'), + ]) + gcsio_mock.delete_batch.assert_called_once_with([ + 'gs://bucket/from1', + 'gs://bucket/from2', + 'gs://bucket/from3', + ]) + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO)