[ 
https://issues.apache.org/jira/browse/BEAM-5959?focusedWorklogId=178126&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-178126
 ]

ASF GitHub Bot logged work on BEAM-5959:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 21/Dec/18 19:45
            Start Date: 21/Dec/18 19:45
    Worklog Time Spent: 10m 
      Work Description: chamikaramj closed pull request #7050: [BEAM-5959] Reimplement GCS copies with rewrites.
URL: https://github.com/apache/beam/pull/7050
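
In short, this PR reimplements GcsIO.copy() and GcsIO.copy_batch() on top of
the GCS objects.Rewrite API instead of objects.Copy, adds optional
dest_kms_key_name and max_bytes_rewritten_per_call arguments, and adds a
GcsIO.kms_key() helper that reports an object's KMS key. A minimal caller-side
sketch based on the diff below (bucket, object, and KMS key names are
placeholders, not values taken from the PR):

    from apache_beam.io.gcp import gcsio

    gcs = gcsio.GcsIO()
    # Copy a single object. If dest_kms_key_name is None, the destination
    # bucket's default encryption settings apply.
    gcs.copy('gs://example-bucket/src.txt',
             'gs://example-bucket/dst.txt',
             dest_kms_key_name='projects/p/locations/global/keyRings/r/cryptoKeys/k')
    # Returns the KMS key name of the destination object, or None.
    print(gcs.kms_key('gs://example-bucket/dst.txt'))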
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/.gitignore b/.gitignore
index f7e691ac25cb..8cc5c62f7d2d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -42,6 +42,7 @@ sdks/python/NOTICE
 sdks/python/README.md
 sdks/python/apache_beam/portability/api/*pb2*.*
 sdks/python/nosetests.xml
+sdks/python/postcommit_requirements.txt
 
 # Ignore IntelliJ files.
 **/.idea/**/*
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index 225d7915eb07..1df93fbe47f8 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -142,6 +142,15 @@ def __init__(self, storage_client=None):
     # storage_client is None.
     if storage_client is not None:
       self.client = storage_client
+    self._rewrite_cb = None
+
+  def _set_rewrite_response_callback(self, callback):
+    """For testing purposes only. No backward compatibility guarantees.
+
+    Args:
+      callback: A function that receives ``storage.RewriteResponse``.
+    """
+    self._rewrite_cb = callback
 
   def open(self,
            filename,
@@ -231,31 +240,60 @@ def delete_batch(self, paths):
 
   @retry.with_exponential_backoff(
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def copy(self, src, dest):
+  def copy(self, src, dest, dest_kms_key_name=None,
+           max_bytes_rewritten_per_call=None):
     """Copies the given GCS object from src to dest.
 
     Args:
       src: GCS file path pattern in the form gs://<bucket>/<name>.
       dest: GCS file path pattern in the form gs://<bucket>/<name>.
+      dest_kms_key_name: Experimental. No backwards compatibility guarantees.
+        Encrypt dest with this Cloud KMS key. If None, will use dest bucket
+        encryption defaults.
+      max_bytes_rewritten_per_call: Experimental. No backwards compatibility
+        guarantees. Each rewrite API call will return after these many bytes.
+        Used for testing.
+
+    Raises:
+      TimeoutError on timeout.
     """
     src_bucket, src_path = parse_gcs_path(src)
     dest_bucket, dest_path = parse_gcs_path(dest)
-    request = storage.StorageObjectsCopyRequest(
+    request = storage.StorageObjectsRewriteRequest(
         sourceBucket=src_bucket,
         sourceObject=src_path,
         destinationBucket=dest_bucket,
-        destinationObject=dest_path)
-    self.client.objects.Copy(request)
+        destinationObject=dest_path,
+        destinationKmsKeyName=dest_kms_key_name,
+        maxBytesRewrittenPerCall=max_bytes_rewritten_per_call)
+    response = self.client.objects.Rewrite(request)
+    while not response.done:
+      logging.debug(
+          'Rewrite progress: %d of %d bytes, %s to %s',
+          response.totalBytesRewritten, response.objectSize, src, dest)
+      request.rewriteToken = response.rewriteToken
+      response = self.client.objects.Rewrite(request)
+      if self._rewrite_cb is not None:
+        self._rewrite_cb(response)
+
+    logging.debug('Rewrite done: %s to %s', src, dest)
 
   # We intentionally do not decorate this method with a retry, as retrying is
   # handled in BatchApiRequest.Execute().
-  def copy_batch(self, src_dest_pairs):
+  def copy_batch(self, src_dest_pairs, dest_kms_key_name=None,
+                 max_bytes_rewritten_per_call=None):
     """Copies the given GCS object from src to dest.
 
     Args:
       src_dest_pairs: list of (src, dest) tuples of gs://<bucket>/<name> files
                       paths to copy from src to dest, not to exceed
                       MAX_BATCH_OPERATION_SIZE in length.
+      dest_kms_key_name: Experimental. No backwards compatibility guarantees.
+        Encrypt dest with this Cloud KMS key. If None, will use dest bucket
+        encryption defaults.
+      max_bytes_rewritten_per_call: Experimental. No backwards compatibility
+        guarantees. Each rewrite call will return after these many bytes. Used
+        primarily for testing.
 
     Returns: List of tuples of (src, dest, exception) in the same order as the
              src_dest_pairs argument, where exception is None if the operation
@@ -263,31 +301,51 @@ def copy_batch(self, src_dest_pairs):
     """
     if not src_dest_pairs:
       return []
-    batch_request = BatchApiRequest(
-        batch_url=GCS_BATCH_ENDPOINT,
-        retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES)
-    for src, dest in src_dest_pairs:
-      src_bucket, src_path = parse_gcs_path(src)
-      dest_bucket, dest_path = parse_gcs_path(dest)
-      request = storage.StorageObjectsCopyRequest(
+    pair_to_request = {}
+    for pair in src_dest_pairs:
+      src_bucket, src_path = parse_gcs_path(pair[0])
+      dest_bucket, dest_path = parse_gcs_path(pair[1])
+      request = storage.StorageObjectsRewriteRequest(
           sourceBucket=src_bucket,
           sourceObject=src_path,
           destinationBucket=dest_bucket,
-          destinationObject=dest_path)
-      batch_request.Add(self.client.objects, 'Copy', request)
-    api_calls = batch_request.Execute(self.client._http)  # pylint: disable=protected-access
-    result_statuses = []
-    for i, api_call in enumerate(api_calls):
-      src, dest = src_dest_pairs[i]
-      exception = None
-      if api_call.is_error:
-        exception = api_call.exception
-        # Translate 404 to the appropriate not found exception.
-        if isinstance(exception, HttpError) and exception.status_code == 404:
-          exception = (
-              GcsIOError(errno.ENOENT, 'Source file not found: %s' % src))
-      result_statuses.append((src, dest, exception))
-    return result_statuses
+          destinationObject=dest_path,
+          destinationKmsKeyName=dest_kms_key_name,
+          maxBytesRewrittenPerCall=max_bytes_rewritten_per_call)
+      pair_to_request[pair] = request
+    pair_to_status = {}
+    while True:
+      pairs_in_batch = list(set(src_dest_pairs) - set(pair_to_status))
+      if not pairs_in_batch:
+        break
+      batch_request = BatchApiRequest(
+          batch_url=GCS_BATCH_ENDPOINT,
+          retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES)
+      for pair in pairs_in_batch:
+        batch_request.Add(self.client.objects, 'Rewrite', pair_to_request[pair])
+      api_calls = batch_request.Execute(self.client._http)  # pylint: disable=protected-access
+      for pair, api_call in zip(pairs_in_batch, api_calls):
+        src, dest = pair
+        response = api_call.response
+        if self._rewrite_cb is not None:
+          self._rewrite_cb(response)
+        if api_call.is_error:
+          exception = api_call.exception
+          # Translate 404 to the appropriate not found exception.
+          if isinstance(exception, HttpError) and exception.status_code == 404:
+            exception = (
+                GcsIOError(errno.ENOENT, 'Source file not found: %s' % src))
+          pair_to_status[pair] = exception
+        elif not response.done:
+          logging.debug(
+              'Rewrite progress: %d of %d bytes, %s to %s',
+              response.totalBytesRewritten, response.objectSize, src, dest)
+          pair_to_request[pair].rewriteToken = response.rewriteToken
+        else:
+          logging.debug('Rewrite done: %s to %s', src, dest)
+          pair_to_status[pair] = None
+
+    return [(pair[0], pair[1], pair_to_status[pair]) for pair in src_dest_pairs]
 
   # We intentionally do not decorate this method with a retry, since the
   # underlying copy and delete operations are already idempotent operations
@@ -368,6 +426,22 @@ def size(self, path):
         bucket=bucket, object=object_path)
     return self.client.objects.Get(request).size
 
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
+  def kms_key(self, path):
+    """Returns the KMS key of a single GCS object.
+
+    This method does not perform glob expansion. Hence the given path must be
+    for a single GCS object.
+
+    Returns: KMS key name of the GCS object as a string, or None if it doesn't
+      have one.
+    """
+    bucket, object_path = parse_gcs_path(path)
+    request = storage.StorageObjectsGetRequest(
+        bucket=bucket, object=object_path)
+    return self.client.objects.Get(request).kmsKeyName
+
   @retry.with_exponential_backoff(
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)
   def last_updated(self, path):
diff --git a/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py b/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py
new file mode 100644
index 000000000000..ff15a6ff394e
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py
@@ -0,0 +1,183 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Integration tests for gcsio module.
+
+Runs tests against Google Cloud Storage service.
+Instantiates a TestPipeline to get options such as GCP project name, but
+doesn't actually start a Beam pipeline or test any specific runner.
+
+Options:
+  --kms_key_name=projects/<project-name>/locations/<region>/keyRings/\
+      <key-ring-name>/cryptoKeys/<key-name>/cryptoKeyVersions/<version>
+    Pass a Cloud KMS key name to test GCS operations using customer managed
+    encryption keys (CMEK).
+
+Cloud KMS permissions:
+The project's Cloud Storage service account requires Encrypter/Decrypter
+permissions for the key specified in --kms_key_name.
+
+To run these tests manually:
+  ./gradlew beam-sdks-python:integrationTest \
+    -Ptests=apache_beam.io.gcp.gcsio_integration_test:GcsIOIntegrationTest \
+    -PkmsKeyName=KMS_KEY_NAME
+"""
+
+from __future__ import absolute_import
+
+import logging
+import unittest
+import uuid
+
+from nose.plugins.attrib import attr
+
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.testing.test_pipeline import TestPipeline
+
+try:
+  from apache_beam.io.gcp import gcsio
+except ImportError:
+  gcsio = None
+
+
+@unittest.skipIf(gcsio is None, 'GCP dependencies are not installed')
+class GcsIOIntegrationTest(unittest.TestCase):
+
+  INPUT_FILE = 'gs://dataflow-samples/shakespeare/kinglear.txt'
+  # Larger than 1MB to test maxBytesRewrittenPerCall.
+  INPUT_FILE_LARGE = (
+      'gs://dataflow-samples/wikipedia_edits/wiki_data-000000000000.json')
+
+  def setUp(self):
+    self.test_pipeline = TestPipeline(is_integration_test=True)
+    self.runner_name = type(self.test_pipeline.runner).__name__
+    if self.runner_name != 'TestDataflowRunner':
+      # This test doesn't run a pipeline, so it doesn't make sense to try it on
+      # different runners. Running with TestDataflowRunner makes sense since
+      # it uses GoogleCloudOptions such as 'project'.
+      raise unittest.SkipTest(
+          'This test only runs with TestDataflowRunner.')
+    self.project = self.test_pipeline.get_option('project')
+    self.gcs_tempdir = (self.test_pipeline.get_option('temp_location') +
+                        '/gcs_it-' + str(uuid.uuid4()))
+    self.kms_key_name = self.test_pipeline.get_option('kms_key_name')
+    self.gcsio = gcsio.GcsIO()
+
+  def tearDown(self):
+    FileSystems.delete([self.gcs_tempdir + '/'])
+
+  def _verify_copy(self, src, dst, dst_kms_key_name=None):
+    self.assertTrue(FileSystems.exists(src), 'src does not exist: %s' % src)
+    self.assertTrue(FileSystems.exists(dst), 'dst does not exist: %s' % dst)
+    src_checksum = self.gcsio.checksum(src)
+    dst_checksum = self.gcsio.checksum(dst)
+    self.assertEqual(src_checksum, dst_checksum)
+    self.assertEqual(self.gcsio.kms_key(dst), dst_kms_key_name)
+
+  def _test_copy(self, name, kms_key_name=None,
+                 max_bytes_rewritten_per_call=None, src=None):
+    src = src or self.INPUT_FILE
+    dst = self.gcs_tempdir + '/%s' % name
+    extra_kwargs = {}
+    if max_bytes_rewritten_per_call is not None:
+      extra_kwargs['max_bytes_rewritten_per_call'] = (
+          max_bytes_rewritten_per_call)
+
+    self.gcsio.copy(src, dst, kms_key_name, **extra_kwargs)
+    self._verify_copy(src, dst, kms_key_name)
+
+  @attr('IT')
+  def test_copy(self):
+    self._test_copy("test_copy")
+
+  @attr('IT')
+  def test_copy_kms(self):
+    if self.kms_key_name is None:
+      raise unittest.SkipTest('--kms_key_name not specified')
+    self._test_copy("test_copy_kms", self.kms_key_name)
+
+  @attr('IT')
+  def test_copy_rewrite_token(self):
+    # Tests a multi-part copy (rewrite) operation. This is triggered by a
+    # combination of 3 conditions:
+    #  - a large enough src
+    #  - setting max_bytes_rewritten_per_call
+    #  - setting kms_key_name
+    if self.kms_key_name is None:
+      raise unittest.SkipTest('--kms_key_name not specified')
+
+    rewrite_responses = []
+    self.gcsio._set_rewrite_response_callback(
+        lambda response: rewrite_responses.append(response))
+    self._test_copy("test_copy_rewrite_token", kms_key_name=self.kms_key_name,
+                    max_bytes_rewritten_per_call=50 * 1024 * 1024,
+                    src=self.INPUT_FILE_LARGE)
+    # Verify that there was a multi-part rewrite.
+    self.assertTrue(any([not r.done for r in rewrite_responses]))
+
+  def _test_copy_batch(self, name, kms_key_name=None,
+                       max_bytes_rewritten_per_call=None, src=None):
+    num_copies = 10
+    srcs = [src or self.INPUT_FILE] * num_copies
+    dsts = [self.gcs_tempdir + '/%s_%d' % (name, i)
+            for i in range(num_copies)]
+    src_dst_pairs = list(zip(srcs, dsts))
+    extra_kwargs = {}
+    if max_bytes_rewritten_per_call is not None:
+      extra_kwargs['max_bytes_rewritten_per_call'] = (
+          max_bytes_rewritten_per_call)
+
+    result_statuses = self.gcsio.copy_batch(
+        src_dst_pairs, kms_key_name, **extra_kwargs)
+    for status in result_statuses:
+      self.assertIsNone(status[2], status)
+    for _src, _dst in src_dst_pairs:
+      self._verify_copy(_src, _dst, kms_key_name)
+
+  @attr('IT')
+  def test_copy_batch(self):
+    self._test_copy_batch("test_copy_batch")
+
+  @attr('IT')
+  def test_copy_batch_kms(self):
+    if self.kms_key_name is None:
+      raise unittest.SkipTest('--kms_key_name not specified')
+    self._test_copy_batch("test_copy_batch_kms", self.kms_key_name)
+
+  @attr('IT')
+  def test_copy_batch_rewrite_token(self):
+    # Tests a multi-part copy (rewrite) operation. This is triggered by a
+    # combination of 3 conditions:
+    #  - a large enough src
+    #  - setting max_bytes_rewritten_per_call
+    #  - setting kms_key_name
+    if self.kms_key_name is None:
+      raise unittest.SkipTest('--kms_key_name not specified')
+
+    rewrite_responses = []
+    self.gcsio._set_rewrite_response_callback(
+        lambda response: rewrite_responses.append(response))
+    self._test_copy_batch(
+        "test_copy_batch_rewrite_token", kms_key_name=self.kms_key_name,
+        max_bytes_rewritten_per_call=50 * 1024 * 1024,
+        src=self.INPUT_FILE_LARGE)
+    # Verify that there was a multi-part rewrite.
+    self.assertTrue(any([not r.done for r in rewrite_responses]))
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()
diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py b/sdks/python/apache_beam/io/gcp/gcsio_test.py
index 10789d00e229..4322fc9fa871 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py
@@ -24,6 +24,7 @@
 import os
 import random
 import sys
+import time
 import unittest
 from builtins import object
 from builtins import range
@@ -48,7 +49,7 @@ class FakeGcsClient(object):
 
   def __init__(self):
     self.objects = FakeGcsObjects()
-    # Referenced in GcsIO.batch_copy() and GcsIO.batch_delete().
+    # Referenced in GcsIO.copy_batch() and GcsIO.delete_batch().
     self._http = object()
 
 
@@ -140,19 +141,31 @@ def Insert(self, insert_request, upload=None):  # pylint: disable=invalid-name
 
     self.add_file(f)
 
-  def Copy(self, copy_request):  # pylint: disable=invalid-name
-    src_file = self.get_file(copy_request.sourceBucket,
-                             copy_request.sourceObject)
+  REWRITE_TOKEN = 'test_token'
+
+  def Rewrite(self, rewrite_request):  # pylint: disable=invalid-name
+    if rewrite_request.rewriteToken == self.REWRITE_TOKEN:
+      dest_object = storage.Object()
+      return storage.RewriteResponse(
+          done=True, objectSize=100, resource=dest_object,
+          totalBytesRewritten=100)
+
+    src_file = self.get_file(rewrite_request.sourceBucket,
+                             rewrite_request.sourceObject)
     if not src_file:
       raise HttpError(
           httplib2.Response({'status': '404'}), '404 Not Found',
           'https://fake/url')
-    generation = self.get_last_generation(copy_request.destinationBucket,
-                                          copy_request.destinationObject) + 1
-    dest_file = FakeFile(copy_request.destinationBucket,
-                         copy_request.destinationObject, src_file.contents,
+    generation = self.get_last_generation(rewrite_request.destinationBucket,
+                                          rewrite_request.destinationObject) + 1
+    dest_file = FakeFile(rewrite_request.destinationBucket,
+                         rewrite_request.destinationObject, src_file.contents,
                          generation)
     self.add_file(dest_file)
+    time.sleep(10)  # time.sleep and time.time are mocked below.
+    return storage.RewriteResponse(
+        done=False, objectSize=100, rewriteToken=self.REWRITE_TOKEN,
+        totalBytesRewritten=5)
 
   def Delete(self, delete_request):  # pylint: disable=invalid-name
     # Here, we emulate the behavior of the GCS service in raising a 404 error
@@ -196,9 +209,11 @@ def List(self, list_request):  # pylint: disable=invalid-name
 
 class FakeApiCall(object):
 
-  def __init__(self, exception):
+  def __init__(self, exception, response):
     self.exception = exception
     self.is_error = exception is not None
+    # Response for Rewrite:
+    self.response = response
 
 
 class FakeBatchApiRequest(object):
@@ -213,11 +228,12 @@ def Execute(self, unused_http, **unused_kwargs):  # pylint: disable=invalid-name
     api_calls = []
     for service, method, request in self.operations:
       exception = None
+      response = None
       try:
-        getattr(service, method)(request)
+        response = getattr(service, method)(request)
       except Exception as e:  # pylint: disable=broad-except
         exception = e
-      api_calls.append(FakeApiCall(exception))
+      api_calls.append(FakeApiCall(exception, response))
     return api_calls
 
 
@@ -257,6 +273,9 @@ def test_bad_gcs_path_object_optional(self):
 
 
 @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
+@mock.patch.multiple('time',
+                     time=mock.MagicMock(side_effect=range(100)),
+                     sleep=mock.MagicMock())
 class TestGCSIO(unittest.TestCase):
 
   def _insert_random_file(self, client, path, size, generation=1, crc32c=None,
@@ -391,13 +410,14 @@ def test_copy(self):
     self.assertFalse(
         gcsio.parse_gcs_path(dest_file_name) in self.client.objects.files)
 
-    self.gcs.copy(src_file_name, dest_file_name)
+    self.gcs.copy(src_file_name, dest_file_name, dest_kms_key_name='kms_key')
 
     self.assertTrue(
         gcsio.parse_gcs_path(src_file_name) in self.client.objects.files)
     self.assertTrue(
         gcsio.parse_gcs_path(dest_file_name) in self.client.objects.files)
 
+    # Test copy of non-existent files.
     with self.assertRaisesRegexp(HttpError, r'Not Found'):
       self.gcs.copy('gs://gcsio-test/non-existent',
                     'gs://gcsio-test/non-existent-destination')
@@ -410,10 +430,10 @@ def test_copy_batch(self, *unused_args):
     file_size = 1024
     num_files = 10
 
-    # Test copy of non-existent files.
     result = self.gcs.copy_batch(
         [(from_name_pattern % i, to_name_pattern % i)
-         for i in range(num_files)])
+         for i in range(num_files)],
+        dest_kms_key_name='kms_key')
     self.assertTrue(result)
     for i, (src, dest, exception) in enumerate(result):
       self.assertEqual(src, from_name_pattern % i)
diff --git a/sdks/python/build.gradle b/sdks/python/build.gradle
index a17b032a0a48..99e295185b9b 100644
--- a/sdks/python/build.gradle
+++ b/sdks/python/build.gradle
@@ -257,6 +257,8 @@ task integrationTest(dependsOn: ['installGcpTest', 'sdist']) {
     // Build pipeline options that configures pipeline job
     if (project.hasProperty('pipelineOptions'))
       argMap["pipeline_opts"] = project.property('pipelineOptions')
+    if (project.hasProperty('kmsKeyName'))
+      argMap["kms_key_name"] = project.property('kmsKeyName')
 
     def cmdArgs = mapToArgString(argMap)
     exec {
diff --git a/sdks/python/scripts/run_integration_test.sh b/sdks/python/scripts/run_integration_test.sh
index d2f5852b8db8..f8f8bef68d62 100755
--- a/sdks/python/scripts/run_integration_test.sh
+++ b/sdks/python/scripts/run_integration_test.sh
@@ -27,17 +27,18 @@
 #
 # Pipeline related flags:
 #     runner        -> Runner that execute pipeline job.
-#                      e.g. TestDataflowRunner, DirectRunner
+#                      e.g. TestDataflowRunner, TestDirectRunner
 #     project       -> Project name of the cloud service.
 #     gcs_location  -> Base location on GCS. Some pipeline options are
-#                      dirived from it including output, staging_location
+#                      derived from it including output, staging_location
 #                      and temp_location.
 #     sdk_location  -> Python tar ball location. Glob is accepted.
 #     num_workers   -> Number of workers.
 #     sleep_secs    -> Number of seconds to wait before verification.
 #     streaming     -> True if a streaming job.
 #     worker_jar    -> Customized worker jar for dataflow runner.
-#     pipeline_opts -> List of space separateed pipeline options. If this
+#     kms_key_name  -> Name of Cloud KMS encryption key to use in some tests.
+#     pipeline_opts -> List of space separated pipeline options. If this
 #                      flag is specified, all above flag will be ignored.
 #                      Please include all required pipeline options when
 #                      using this flag.
@@ -70,6 +71,9 @@ NUM_WORKERS=1
 SLEEP_SECS=20
 STREAMING=false
 WORKER_JAR=""
+# Specify "/cryptoKeyVersions/1" suffix for testing simplicity. For this to 
work
+# in the long term, this key has rotation disabled.
+KMS_KEY_NAME="projects/apache-beam-testing/locations/global/keyRings/beam-it/cryptoKeys/test/cryptoKeyVersions/1"
 
 # Default test (nose) options.
 # Default test sets are full integration tests.
@@ -119,6 +123,11 @@ case $key in
         shift # past argument
         shift # past value
         ;;
+    --kms_key_name)
+        KMS_KEY_NAME="$2"
+        shift # past argument
+        shift # past value
+        ;;
     --pipeline_opts)
         PIPELINE_OPTS="$2"
         shift # past argument
@@ -192,11 +201,14 @@ if [[ -z $PIPELINE_OPTS ]]; then
     opts+=("--dataflow_worker_jar=$WORKER_JAR")
   fi
 
+  if [[ ! -z "$KMS_KEY_NAME" ]]; then
+    opts+=("--kms_key_name=$KMS_KEY_NAME")
+  fi
+
   PIPELINE_OPTS=$(IFS=" " ; echo "${opts[*]}")
 
 fi
 
-
 ###########################################################################
 # Run tests and validate that jobs finish successfully.
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 178126)
    Time Spent: 13h 50m  (was: 13h 40m)

> Add Cloud KMS support to GCS copies
> -----------------------------------
>
>                 Key: BEAM-5959
>                 URL: https://issues.apache.org/jira/browse/BEAM-5959
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-gcp, sdk-py-core
>            Reporter: Udi Meiri
>            Assignee: Udi Meiri
>            Priority: Major
>          Time Spent: 13h 50m
>  Remaining Estimate: 0h
>
> Beam SDK currently uses the CopyTo GCS API call, which doesn't support
> copying objects that use Customer Managed Encryption Keys (CMEK).
> CMEKs are managed in Cloud KMS.
> Items (for Java and Python SDKs):
> - Update clients to versions that support KMS keys.
> - Change copyTo API calls to use rewriteTo (Python - directly, Java - 
> possibly convert copyTo API call to use client library)
> - Add unit tests.
> - Add basic tests (DirectRunner and GCS buckets with CMEK).
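
The Python change follows the standard GCS Rewrite pattern: issue a
StorageObjectsRewriteRequest and, while the response is not done, feed the
returned rewriteToken back into the same request. A minimal sketch of that
loop, assuming an apitools-generated storage client like the one gcsio wraps
(the client and src/dest variables here are placeholders):

    request = storage.StorageObjectsRewriteRequest(
        sourceBucket=src_bucket,
        sourceObject=src_path,
        destinationBucket=dest_bucket,
        destinationObject=dest_path,
        destinationKmsKeyName=dest_kms_key_name)
    response = client.objects.Rewrite(request)
    while not response.done:
      # Resume the rewrite where the previous call left off.
      request.rewriteToken = response.rewriteToken
      response = client.objects.Rewrite(request)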



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
