This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new af2f8ee  Make S3 streaming more efficient (#15931)
af2f8ee is described below

commit af2f8ee6cf39a1d63818dbefef322740d5ad794c
Author: Janek Bevendorff <ja...@jbev.net>
AuthorDate: Wed Dec 22 18:09:17 2021 +0100

    Make S3 streaming more efficient (#15931)
---
 .../apache_beam/io/aws/clients/s3/boto3_client.py  | 117 +++++++++++----------
 sdks/python/apache_beam/utils/retry.py             |   2 +-
 2 files changed, 63 insertions(+), 56 deletions(-)

diff --git a/sdks/python/apache_beam/io/aws/clients/s3/boto3_client.py 
b/sdks/python/apache_beam/io/aws/clients/s3/boto3_client.py
index e2f968d..878ad07 100644
--- a/sdks/python/apache_beam/io/aws/clients/s3/boto3_client.py
+++ b/sdks/python/apache_beam/io/aws/clients/s3/boto3_client.py
@@ -19,6 +19,7 @@
 
 from apache_beam.io.aws.clients.s3 import messages
 from apache_beam.options import pipeline_options
+from apache_beam.utils import retry
 
 try:
   # pylint: disable=wrong-import-order, wrong-import-position
@@ -29,6 +30,12 @@ except ImportError:
   boto3 = None
 
 
+def get_http_error_code(exc):
+  if hasattr(exc, 'response'):
+    return exc.response.get('ResponseMetadata', {}).get('HTTPStatusCode')
+  return None
+
+
 class Client(object):
   """
   Wrapper for boto3 library
@@ -67,52 +74,67 @@ class Client(object):
         aws_secret_access_key=secret_access_key,
         aws_session_token=session_token)
 
-  def get_object_metadata(self, request):
-    r"""Retrieves an object's metadata.
+    self._download_request = None
+    self._download_stream = None
+    self._download_pos = 0
 
-    Args:
-      request: (GetRequest) input message
+  def get_stream(self, request, start):
+    """Opens a stream object starting at the given position.
 
+    Args:
+      request: (GetRequest) request
+      start: (int) start offset
     Returns:
-      (Object) The response message.
+      (Stream) Boto3 stream object.
     """
-    kwargs = {'Bucket': request.bucket, 'Key': request.object}
-
-    try:
-      boto_response = self.client.head_object(**kwargs)
-    except Exception as e:
-      message = e.response['Error']['Message']
-      code = e.response['ResponseMetadata']['HTTPStatusCode']
-      raise messages.S3ClientError(message, code)
-
-    item = messages.Item(
-        boto_response['ETag'],
-        request.object,
-        boto_response['LastModified'],
-        boto_response['ContentLength'],
-        boto_response['ContentType'])
-
-    return item
 
+    if self._download_request and (
+        start != self._download_pos or
+        request.bucket != self._download_request.bucket or
+        request.object != self._download_request.object):
+      self._download_stream.close()
+      self._download_stream = None
+
+    # noinspection PyProtectedMember
+    if not self._download_stream or self._download_stream._raw_stream.closed:
+      try:
+        self._download_stream = self.client.get_object(
+            Bucket=request.bucket,
+            Key=request.object,
+            Range='bytes={}-'.format(start))['Body']
+        self._download_request = request
+        self._download_pos = start
+      except Exception as e:
+        raise messages.S3ClientError(str(e), get_http_error_code(e))
+
+    return self._download_stream
+
+  @retry.with_exponential_backoff()
   def get_range(self, request, start, end):
     r"""Retrieves an object's contents.
 
       Args:
         request: (GetRequest) request
+        start: (int) start offset
+        end: (int) end offset (exclusive)
       Returns:
         (bytes) The response message.
       """
-    try:
-      boto_response = self.client.get_object(
-          Bucket=request.bucket,
-          Key=request.object,
-          Range='bytes={}-{}'.format(start, end - 1))
-    except Exception as e:
-      message = e.response['Error']['Message']
-      code = e.response['ResponseMetadata']['HTTPStatusCode']
-      raise messages.S3ClientError(message, code)
-
-    return boto_response['Body'].read()  # A bytes object
+    for i in range(2):
+      try:
+        stream = self.get_stream(request, start)
+        data = stream.read(end - start)
+        self._download_pos += len(data)
+        return data
+      except Exception as e:
+        self._download_stream = None
+        self._download_request = None
+        if i == 0:
+          # Read errors are likely with long-lived connections, retry 
immediately if a read fails once
+          continue
+        if isinstance(e, messages.S3ClientError):
+          raise e
+        raise messages.S3ClientError(str(e), get_http_error_code(e))
 
   def list(self, request):
     r"""Retrieves a list of objects matching the criteria.
@@ -130,9 +152,7 @@ class Client(object):
     try:
       boto_response = self.client.list_objects_v2(**kwargs)
     except Exception as e:
-      message = e.response['Error']['Message']
-      code = e.response['ResponseMetadata']['HTTPStatusCode']
-      raise messages.S3ClientError(message, code)
+      raise messages.S3ClientError(str(e), get_http_error_code(e))
 
     if boto_response['KeyCount'] == 0:
       message = 'Tried to list nonexistent S3 path: s3://%s/%s' % (
@@ -170,9 +190,7 @@ class Client(object):
           ContentType=request.mime_type)
       response = messages.UploadResponse(boto_response['UploadId'])
     except Exception as e:
-      message = e.response['Error']['Message']
-      code = e.response['ResponseMetadata']['HTTPStatusCode']
-      raise messages.S3ClientError(message, code)
+      raise messages.S3ClientError(str(e), get_http_error_code(e))
     return response
 
   def upload_part(self, request):
@@ -194,9 +212,7 @@ class Client(object):
           boto_response['ETag'], request.part_number)
       return response
     except Exception as e:
-      message = e.response['Error']['Message']
-      code = e.response['ResponseMetadata']['HTTPStatusCode']
-      raise messages.S3ClientError(message, code)
+      raise messages.S3ClientError(str(e), get_http_error_code(e))
 
   def complete_multipart_upload(self, request):
     r"""Completes a multipart upload to S3
@@ -214,9 +230,7 @@ class Client(object):
           UploadId=request.upload_id,
           MultipartUpload=parts)
     except Exception as e:
-      message = e.response['Error']['Message']
-      code = e.response['ResponseMetadata']['HTTPStatusCode']
-      raise messages.S3ClientError(message, code)
+      raise messages.S3ClientError(str(e), get_http_error_code(e))
 
   def delete(self, request):
     r"""Deletes given object from bucket
@@ -227,11 +241,8 @@ class Client(object):
     """
     try:
       self.client.delete_object(Bucket=request.bucket, Key=request.object)
-
     except Exception as e:
-      message = e.response['Error']['Message']
-      code = e.response['ResponseMetadata']['HTTPStatusCode']
-      raise messages.S3ClientError(message, code)
+      raise messages.S3ClientError(str(e), get_http_error_code(e))
 
   def delete_batch(self, request):
 
@@ -247,9 +258,7 @@ class Client(object):
     try:
       aws_response = self.client.delete_objects(**aws_request)
     except Exception as e:
-      message = e.response['Error']['Message']
-      code = int(e.response['ResponseMetadata']['HTTPStatusCode'])
-      raise messages.S3ClientError(message, code)
+      raise messages.S3ClientError(str(e), get_http_error_code(e))
 
     deleted = [obj['Key'] for obj in aws_response.get('Deleted', [])]
 
@@ -267,6 +276,4 @@ class Client(object):
       copy_src = {'Bucket': request.src_bucket, 'Key': request.src_key}
       self.client.copy(copy_src, request.dest_bucket, request.dest_key)
     except Exception as e:
-      message = e.response['Error']['Message']
-      code = e.response['ResponseMetadata']['HTTPStatusCode']
-      raise messages.S3ClientError(message, code)
+      raise messages.S3ClientError(str(e), get_http_error_code(e))
diff --git a/sdks/python/apache_beam/utils/retry.py 
b/sdks/python/apache_beam/utils/retry.py
index 16f2882..4784de1 100644
--- a/sdks/python/apache_beam/utils/retry.py
+++ b/sdks/python/apache_beam/utils/retry.py
@@ -124,7 +124,7 @@ def retry_on_server_errors_filter(exception):
   if (HttpError is not None) and isinstance(exception, HttpError):
     return exception.status_code >= 500
   if (S3ClientError is not None) and isinstance(exception, S3ClientError):
-    return exception.code >= 500
+    return exception.code is None or exception.code >= 500
   return not isinstance(exception, PermanentException)
 
 

Reply via email to