[ 
https://issues.apache.org/jira/browse/BEAM-5959?focusedWorklogId=177259&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-177259
 ]

ASF GitHub Bot logged work on BEAM-5959:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 20/Dec/18 01:05
            Start Date: 20/Dec/18 01:05
    Worklog Time Spent: 10m 
      Work Description: udim commented on a change in pull request #7050: 
[BEAM-5959] Reimplement GCS copies with rewrites.
URL: https://github.com/apache/beam/pull/7050#discussion_r243128516
 
 

 ##########
 File path: sdks/python/apache_beam/io/gcp/gcsio.py
 ##########
 @@ -275,63 +285,128 @@ def delete_batch(self, paths):
 
   @retry.with_exponential_backoff(
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def copy(self, src, dest):
+  def copy(self, src, dest, dest_kms_key_name=None, timeout_secs=3600,
+           max_bytes_rewritten_per_call=None):
     """Copies the given GCS object from src to dest.
 
     Args:
       src: GCS file path pattern in the form gs://<bucket>/<name>.
       dest: GCS file path pattern in the form gs://<bucket>/<name>.
+      dest_kms_key_name: Experimental. No backwards compatibility guarantees.
+        Encrypt dest with this Cloud KMS key. If None, will use dest bucket
+        encryption defaults.
+      timeout_secs: Experimental. No backwards compatibility guarantees. Wait
+        about this long for the operation to complete.
+      max_bytes_rewritten_per_call: Experimental. No backwards compatibility
+        guarantees. Each rewrite API call will return after these many bytes.
+        Used for testing.
+
+    Raises:
+      TimeoutError on timeout.
     """
     src_bucket, src_path = parse_gcs_path(src)
     dest_bucket, dest_path = parse_gcs_path(dest)
-    request = storage.StorageObjectsCopyRequest(
+    request = storage.StorageObjectsRewriteRequest(
         sourceBucket=src_bucket,
         sourceObject=src_path,
         destinationBucket=dest_bucket,
-        destinationObject=dest_path)
-    self.client.objects.Copy(request)
+        destinationObject=dest_path,
+        destinationKmsKeyName=dest_kms_key_name,
+        maxBytesRewrittenPerCall=max_bytes_rewritten_per_call)
+    start_time = time.time()
+    response = self.client.objects.Rewrite(request)
+    while not response.done:
+      logging.debug(
+          'Rewrite progress: %d / %d bytes, %s to %s',
+          response.totalBytesRewritten, response.objectSize, src, dest)
+      elapsed = time.time() - start_time
+      if elapsed > timeout_secs:
+        raise TimeoutError(
+            'Aborting rewrite after %d seconds: %s to %s' % (
+                elapsed, src, dest))
+      request.rewriteToken = response.rewriteToken
+      response = self.client.objects.Rewrite(request)
+      if self._rewrite_cb is not None:
+        self._rewrite_cb(response)
+
+    logging.debug('Rewrite done: %s to %s', src, dest)
 
   # We intentionally do not decorate this method with a retry, as retrying is
   # handled in BatchApiRequest.Execute().
-  def copy_batch(self, src_dest_pairs):
+  def copy_batch(self, src_dest_pairs, dest_kms_key_name=None,
+                 timeout_secs=3600, max_bytes_rewritten_per_call=None):
     """Copies the given GCS object from src to dest.
 
     Args:
       src_dest_pairs: list of (src, dest) tuples of gs://<bucket>/<name> files
                       paths to copy from src to dest, not to exceed
                       MAX_BATCH_OPERATION_SIZE in length.
+      dest_kms_key_name: Experimental. No backwards compatibility guarantees.
+        Encrypt dest with this Cloud KMS key. If None, will use dest bucket
+        encryption defaults.
+      timeout_secs: Experimental. No backwards compatibility guarantees. Wait
+        about this long for all operations to complete.
+      max_bytes_rewritten_per_call: Experimental. No backwards compatibility
+        guarantees. Each rewrite call will return after these many bytes. Used
+        primarily for testing.
 
     Returns: List of tuples of (src, dest, exception) in the same order as the
              src_dest_pairs argument, where exception is None if the operation
              succeeded or the relevant exception if the operation failed.
     """
     if not src_dest_pairs:
       return []
-    batch_request = BatchApiRequest(
-        batch_url=GCS_BATCH_ENDPOINT,
-        retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES)
-    for src, dest in src_dest_pairs:
-      src_bucket, src_path = parse_gcs_path(src)
-      dest_bucket, dest_path = parse_gcs_path(dest)
-      request = storage.StorageObjectsCopyRequest(
+    pair_to_request = {}
+    for pair in src_dest_pairs:
+      src_bucket, src_path = parse_gcs_path(pair[0])
+      dest_bucket, dest_path = parse_gcs_path(pair[1])
+      request = storage.StorageObjectsRewriteRequest(
           sourceBucket=src_bucket,
           sourceObject=src_path,
           destinationBucket=dest_bucket,
-          destinationObject=dest_path)
-      batch_request.Add(self.client.objects, 'Copy', request)
-    api_calls = batch_request.Execute(self.client._http)  # pylint: 
disable=protected-access
-    result_statuses = []
-    for i, api_call in enumerate(api_calls):
-      src, dest = src_dest_pairs[i]
-      exception = None
-      if api_call.is_error:
-        exception = api_call.exception
-        # Translate 404 to the appropriate not found exception.
-        if isinstance(exception, HttpError) and exception.status_code == 404:
-          exception = (
-              GcsIOError(errno.ENOENT, 'Source file not found: %s' % src))
-      result_statuses.append((src, dest, exception))
-    return result_statuses
+          destinationObject=dest_path,
+          destinationKmsKeyName=dest_kms_key_name,
+          maxBytesRewrittenPerCall=max_bytes_rewritten_per_call)
+      pair_to_request[pair] = request
+    pair_to_status = {}
+    start_time = time.time()
+    while True:
+      pairs_in_batch = list(set(src_dest_pairs) - set(pair_to_status))
+      if not pairs_in_batch:
+        break
+      elapsed = time.time() - start_time
+      if elapsed > timeout_secs:
+        raise TimeoutError(
+            'Aborting batch rewrite after %d seconds. Remaining: %s' % (
+                elapsed, pairs_in_batch))
+      batch_request = BatchApiRequest(
+          batch_url=GCS_BATCH_ENDPOINT,
+          retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES)
+      for pair in pairs_in_batch:
+        batch_request.Add(self.client.objects, 'Rewrite', 
pair_to_request[pair])
+      api_calls = batch_request.Execute(self.client._http)  # pylint: 
disable=protected-access
+      for pair, api_call in zip(pairs_in_batch, api_calls):
+        src, dest = pair
+        response = api_call.response
+        if self._rewrite_cb is not None:
+          self._rewrite_cb(response)
+        if api_call.is_error:
+          exception = api_call.exception
+          # Translate 404 to the appropriate not found exception.
 
 Review comment:
   We spoke about this offline: rewrite is indeed a copy operation, not a 
rename.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 177259)
    Time Spent: 10h  (was: 9h 50m)

> Add Cloud KMS support to GCS copies
> -----------------------------------
>
>                 Key: BEAM-5959
>                 URL: https://issues.apache.org/jira/browse/BEAM-5959
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-gcp, sdk-py-core
>            Reporter: Udi Meiri
>            Assignee: Udi Meiri
>            Priority: Major
>          Time Spent: 10h
>  Remaining Estimate: 0h
>
> Beam SDK currently uses the CopyTo GCS API call, which doesn't support 
> copying objects that are encrypted with Customer-Managed Encryption Keys (CMEK).
> CMEKs are managed in Cloud KMS.
> Items (for Java and Python SDKs):
> - Update clients to versions that support KMS keys.
> - Change copyTo API calls to use rewriteTo (Python: call it directly; Java: 
> possibly convert the copyTo API call to use the client library).
> - Add unit tests.
> - Add basic tests (DirectRunner and GCS buckets with CMEK).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to