Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 45b420d82 -> cce4331dc


Fix issue where batch GCS renames were not issued


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/99bcafe7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/99bcafe7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/99bcafe7

Branch: refs/heads/python-sdk
Commit: 99bcafe7a02bbec5222d77abbad24f5eed8a687f
Parents: 45b420d
Author: Charles Chen <c...@google.com>
Authored: Thu Nov 17 14:13:56 2016 -0800
Committer: Robert Bradshaw <rober...@gmail.com>
Committed: Fri Nov 18 13:37:04 2016 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/io/fileio.py      |  5 +++
 sdks/python/apache_beam/io/fileio_test.py | 47 ++++++++++++++++++++++++++
 2 files changed, 52 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99bcafe7/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index 3b67c4f..4d0eea6 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -514,6 +514,7 @@ class ChannelFactory(object):
     gcs_batches = []
     gcs_current_batch = []
     for src, dest in src_dest_pairs:
+      gcs_current_batch.append((src, dest))
       if len(gcs_current_batch) == gcsio.MAX_BATCH_OPERATION_SIZE:
         gcs_batches.append(gcs_current_batch)
         gcs_current_batch = []
@@ -893,6 +894,8 @@ class FileSink(iobase.Sink):
       exception_infos = ChannelFactory.rename_batch(batch)
       for src, dest, exception in exception_infos:
         if exception:
+          logging.warning('Rename not successful: %s -> %s, %s', src, dest,
+                          exception)
           should_report = True
           if isinstance(exception, IOError):
             # May have already been copied.
@@ -906,6 +909,8 @@ class FileSink(iobase.Sink):
             logging.warning(('Exception in _rename_batch. src: %s, '
                              'dest: %s, err: %s'), src, dest, exception)
             exceptions.append(exception)
+        else:
+          logging.debug('Rename successful: %s -> %s', src, dest)
       return exceptions
 
     # ThreadPool crashes in old versions of Python (< 2.7.5) if created from a

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99bcafe7/sdks/python/apache_beam/io/fileio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index 63e71e0..9d1e424 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -28,6 +28,7 @@ import unittest
 import zlib
 
 import hamcrest as hc
+import mock
 
 import apache_beam as beam
 from apache_beam import coders
@@ -881,6 +882,52 @@ class TestFileSink(unittest.TestCase):
     with self.assertRaises(Exception):
       list(sink.finalize_write(init_token, [res1, res2]))
 
+  @mock.patch('apache_beam.io.fileio.ChannelFactory.rename')
+  @mock.patch('apache_beam.io.fileio.gcsio')
+  def test_rename_batch(self, *unused_args):
+    # Prepare mocks.
+    gcsio_mock = mock.MagicMock()
+    fileio.gcsio.GcsIO = lambda: gcsio_mock
+    fileio.ChannelFactory.rename = mock.MagicMock()
+    to_rename = [
+        ('gs://bucket/from1', 'gs://bucket/to1'),
+        ('gs://bucket/from2', 'gs://bucket/to2'),
+        ('/local/from1', '/local/to1'),
+        ('gs://bucket/from3', 'gs://bucket/to3'),
+        ('/local/from2', '/local/to2'),
+    ]
+    gcsio_mock.copy_batch.side_effect = [[
+        ('gs://bucket/from1', 'gs://bucket/to1', None),
+        ('gs://bucket/from2', 'gs://bucket/to2', None),
+        ('gs://bucket/from3', 'gs://bucket/to3', None),
+    ]]
+    gcsio_mock.delete_batch.side_effect = [[
+        ('gs://bucket/from1', None),
+        ('gs://bucket/from2', None),
+        ('gs://bucket/from3', None),
+    ]]
+
+    # Issue batch rename.
+    fileio.ChannelFactory.rename_batch(to_rename)
+
+    # Verify mocks.
+    expected_local_rename_calls = [
+        mock.call('/local/from1', '/local/to1'),
+        mock.call('/local/from2', '/local/to2'),
+    ]
+    self.assertEqual(fileio.ChannelFactory.rename.call_args_list,
+                     expected_local_rename_calls)
+    gcsio_mock.copy_batch.assert_called_once_with([
+        ('gs://bucket/from1', 'gs://bucket/to1'),
+        ('gs://bucket/from2', 'gs://bucket/to2'),
+        ('gs://bucket/from3', 'gs://bucket/to3'),
+    ])
+    gcsio_mock.delete_batch.assert_called_once_with([
+        'gs://bucket/from1',
+        'gs://bucket/from2',
+        'gs://bucket/from3',
+    ])
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)

Reply via email to