[ https://issues.apache.org/jira/browse/BEAM-5959?focusedWorklogId=177260&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-177260 ]

ASF GitHub Bot logged work on BEAM-5959:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 20/Dec/18 01:05
            Start Date: 20/Dec/18 01:05
    Worklog Time Spent: 10m 
      Work Description: udim commented on a change in pull request #7050: 
[BEAM-5959] Reimplement GCS copies with rewrites.
URL: https://github.com/apache/beam/pull/7050#discussion_r243128626
 
 

 ##########
 File path: sdks/python/apache_beam/io/gcp/gcsio.py
 ##########
 @@ -275,63 +285,128 @@ def delete_batch(self, paths):
 
   @retry.with_exponential_backoff(
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def copy(self, src, dest):
+  def copy(self, src, dest, dest_kms_key_name=None, timeout_secs=3600,
+           max_bytes_rewritten_per_call=None):
     """Copies the given GCS object from src to dest.
 
     Args:
       src: GCS file path pattern in the form gs://<bucket>/<name>.
       dest: GCS file path pattern in the form gs://<bucket>/<name>.
+      dest_kms_key_name: Experimental. No backwards compatibility guarantees.
+        Encrypt dest with this Cloud KMS key. If None, will use dest bucket
+        encryption defaults.
+      timeout_secs: Experimental. No backwards compatibility guarantees. Wait
+        about this long for the operation to complete.
+      max_bytes_rewritten_per_call: Experimental. No backwards compatibility
+        guarantees. Each rewrite API call will return after this many bytes.
+        Used for testing.
+
+    Raises:
+      TimeoutError on timeout.
     """
     src_bucket, src_path = parse_gcs_path(src)
     dest_bucket, dest_path = parse_gcs_path(dest)
-    request = storage.StorageObjectsCopyRequest(
+    request = storage.StorageObjectsRewriteRequest(
         sourceBucket=src_bucket,
         sourceObject=src_path,
         destinationBucket=dest_bucket,
-        destinationObject=dest_path)
-    self.client.objects.Copy(request)
+        destinationObject=dest_path,
+        destinationKmsKeyName=dest_kms_key_name,
+        maxBytesRewrittenPerCall=max_bytes_rewritten_per_call)
+    start_time = time.time()
+    response = self.client.objects.Rewrite(request)
+    while not response.done:
+      logging.debug(
+          'Rewrite progress: %d / %d bytes, %s to %s',
+          response.totalBytesRewritten, response.objectSize, src, dest)
+      elapsed = time.time() - start_time
+      if elapsed > timeout_secs:
+        raise TimeoutError(
+            'Aborting rewrite after %d seconds: %s to %s' % (
+                elapsed, src, dest))
+      request.rewriteToken = response.rewriteToken
+      response = self.client.objects.Rewrite(request)
+      if self._rewrite_cb is not None:
+        self._rewrite_cb(response)
+
+    logging.debug('Rewrite done: %s to %s', src, dest)
 
   # We intentionally do not decorate this method with a retry, as retrying is
   # handled in BatchApiRequest.Execute().
-  def copy_batch(self, src_dest_pairs):
+  def copy_batch(self, src_dest_pairs, dest_kms_key_name=None,
+                 timeout_secs=3600, max_bytes_rewritten_per_call=None):
 
 Review comment:
   Removed timeout_secs in latest commit.
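The rewrite polling loop in `copy` above can be exercised without GCS. This is a minimal sketch under stated assumptions: `FakeRewriteClient` is a hypothetical in-memory client (not part of apitools or Beam) that copies at most `maxBytesRewrittenPerCall` bytes per call and encodes the resume offset in `rewriteToken`. Real rewrite tokens are opaque strings. `RewriteRequest` and `RewriteResponse` are simplified stand-ins for the apitools message classes and keep only the fields the diff touches.

```python
import time
from dataclasses import dataclass
from typing import Optional


@dataclass
class RewriteRequest:
    # Hypothetical stand-in for storage.StorageObjectsRewriteRequest.
    src: str
    dest: str
    destinationKmsKeyName: Optional[str] = None
    maxBytesRewrittenPerCall: Optional[int] = None
    rewriteToken: Optional[str] = None


@dataclass
class RewriteResponse:
    # Only the response fields the diff reads.
    done: bool
    totalBytesRewritten: int
    objectSize: int
    rewriteToken: Optional[str] = None


class FakeRewriteClient:
    """Hypothetical client: copies at most maxBytesRewrittenPerCall bytes
    per call (the whole object if None)."""

    def __init__(self, object_size):
        self.object_size = object_size
        self.calls = 0

    def rewrite(self, request):
        self.calls += 1
        # Resume from the offset encoded in the token (0 on the first call).
        start = int(request.rewriteToken or 0)
        chunk = request.maxBytesRewrittenPerCall or self.object_size
        written = min(start + chunk, self.object_size)
        done = written == self.object_size
        return RewriteResponse(
            done=done,
            totalBytesRewritten=written,
            objectSize=self.object_size,
            rewriteToken=None if done else str(written))


def rewrite_until_done(client, request, timeout_secs=3600,
                       clock=time.monotonic):
    """Issue rewrite calls, feeding each rewriteToken back, until done."""
    start = clock()
    response = client.rewrite(request)
    while not response.done:
        elapsed = clock() - start
        if elapsed > timeout_secs:
            raise TimeoutError(
                'Aborting rewrite after %d seconds: %s to %s'
                % (elapsed, request.src, request.dest))
        # The token tells the service where to resume the copy.
        request.rewriteToken = response.rewriteToken
        response = client.rewrite(request)
    return response
```

The sketch takes an injectable `clock` (defaulting to `time.monotonic`, whereas the diff uses `time.time()`), so the timeout branch can be tested deterministically. As in the diff, the deadline is checked only between calls and never interrupts an in-flight call, which is why the docstring says to wait "about" this long.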

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 177260)
    Time Spent: 10h 10m  (was: 10h)

> Add Cloud KMS support to GCS copies
> -----------------------------------
>
>                 Key: BEAM-5959
>                 URL: https://issues.apache.org/jira/browse/BEAM-5959
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-gcp, sdk-py-core
>            Reporter: Udi Meiri
>            Assignee: Udi Meiri
>            Priority: Major
>          Time Spent: 10h 10m
>  Remaining Estimate: 0h
>
> Beam SDK currently uses the CopyTo GCS API call, which doesn't support 
> copying objects encrypted with Customer-Managed Encryption Keys (CMEK).
> CMEKs are managed in Cloud KMS.
> Items (for Java and Python SDKs):
> - Update clients to versions that support KMS keys.
> - Change copyTo API calls to use rewriteTo (Python: directly; Java: 
> possibly by converting the copyTo API call to use the client library).
> - Add unit tests.
> - Add basic tests (DirectRunner and GCS buckets with CMEK).
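Moving batched copies to rewrite (the `copy_batch` method in the diff above) raises an issue the single-object loop does not: a batch response may report some rewrites as unfinished. One plausible approach is to run the batch in rounds, re-issuing only the unfinished rewrites with their latest `rewriteToken` until every pair has either finished or failed. This is a minimal sketch under stated assumptions: `FakeBatchClient`, its `execute_batch` method, and `copy_batch_rounds` are hypothetical names for illustration. The sketch does not use apitools' `BatchApiRequest`, and it models a per-request error as an exception object in the result list.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass
class RewriteResponse:
    # Simplified stand-in for a rewrite response: only the fields used here.
    done: bool
    rewriteToken: Optional[str] = None


class FakeBatchClient:
    """Hypothetical client that executes a list of rewrite requests at once.

    Each source needs calls_needed[src] rewrite calls to finish; sources in
    `failing` always come back as an error instead of a response.
    """

    def __init__(self, calls_needed: Dict[str, int], failing=()):
        self.calls_needed = calls_needed
        self.failing = set(failing)
        self.batches = 0

    def execute_batch(self, requests: List[dict]) -> list:
        self.batches += 1
        results = []
        for req in requests:
            if req['src'] in self.failing:
                results.append(IOError('rewrite failed: %s' % req['src']))
                continue
            # The fake token counts completed calls for this object.
            progress = int(req.get('rewriteToken') or 0) + 1
            done = progress >= self.calls_needed[req['src']]
            results.append(
                RewriteResponse(done, None if done else str(progress)))
        return results


def copy_batch_rounds(client, src_dest_pairs: List[Tuple[str, str]]):
    """Re-issue unfinished rewrites in new batches until all pairs settle.

    Returns (src, dest, error_or_None) for each pair, in input order.
    """
    requests = {pair: {'src': pair[0], 'dest': pair[1]}
                for pair in src_dest_pairs}
    status = {}  # pair -> None on success, or the exception on failure
    while True:
        pending = [pair for pair in src_dest_pairs if pair not in status]
        if not pending:
            break
        results = client.execute_batch([requests[p] for p in pending])
        for pair, result in zip(pending, results):
            if isinstance(result, Exception):
                status[pair] = result
            elif result.done:
                status[pair] = None
            else:
                # Carry the token forward so the next round resumes the copy.
                requests[pair]['rewriteToken'] = result.rewriteToken
    return [(src, dest, status[(src, dest)]) for src, dest in src_dest_pairs]
```

Unlike the single-object loop, this sketch has no deadline. A rewrite that never reports done would keep it looping, so a production version would likely want a round limit or timeout.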



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
