This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 120620c1b7f HADOOP-18888. S3A. createS3AsyncClient() always enables multipart uploads (#6056)
120620c1b7f is described below

commit 120620c1b7f0a11e140a7d80d2bf7be4c5c83c1b
Author: Steve Loughran <ste...@cloudera.com>
AuthorDate: Fri Sep 15 15:45:09 2023 +0100

    HADOOP-18888. S3A. createS3AsyncClient() always enables multipart uploads (#6056)

    * The multipart flag fs.s3a.multipart.uploads.enabled is passed to the async client created.
    * The S3A connector bypasses the transfer manager entirely if multipart uploads are disabled or for small files.

    Contributed by Steve Loughran
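
For readers who want to exercise the new behaviour, here is a minimal, illustrative sketch (not part of the commit) of disabling multipart uploads through the standard Hadoop Configuration API before creating the filesystem. The configuration key is the one named above; the class name and bucket URI are made up for the example.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Sketch only: with fs.s3a.multipart.uploads.enabled set to false,
    // the S3A connector now also bypasses the transfer manager for copies.
    // The bucket name below is hypothetical.
    public class DisableMultipartUploadsExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean("fs.s3a.multipart.uploads.enabled", false);
        try (FileSystem fs = FileSystem.newInstance(
            new Path("s3a://example-bucket/").toUri(), conf)) {
          System.out.println("Created " + fs.getUri());
        }
      }
    }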
---
 .../hadoop/fs/s3a/DefaultS3ClientFactory.java      |  2 +-
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java    | 92 +++++++++++++++-------
 .../org/apache/hadoop/fs/s3a/S3AInternals.java     |  6 ++
 .../org/apache/hadoop/fs/s3a/S3ClientFactory.java  | 24 ++++++
 .../fs/s3a/scale/AbstractSTestS3AHugeFiles.java    |  4 +-
 .../fs/s3a/scale/ITestS3AHugeFilesNoMultipart.java | 53 +++++--------
 6 files changed, 117 insertions(+), 64 deletions(-)

diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
index 98c72d27662..c85263f1903 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
@@ -112,7 +112,7 @@ public class DefaultS3ClientFactory extends Configured
     return configureClientBuilder(S3AsyncClient.builder(), parameters, conf, bucket)
         .httpClientBuilder(httpClientBuilder)
         .multipartConfiguration(multipartConfiguration)
-        .multipartEnabled(true)
+        .multipartEnabled(parameters.isMultipartCopy())
         .build();
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index e192135b9f3..9307c4c2650 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -440,6 +440,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
    */
   private boolean isMultipartUploadEnabled = DEFAULT_MULTIPART_UPLOAD_ENABLED;
 
+  /**
+   * Should file copy operations use the S3 transfer manager?
+   * True unless multipart upload is disabled.
+   */
+  private boolean isMultipartCopyEnabled;
+
   /**
    * A cache of files that should be deleted when the FileSystem is closed
    * or the JVM is exited.
@@ -576,6 +582,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
          intOption(conf, PREFETCH_BLOCK_COUNT_KEY, PREFETCH_BLOCK_DEFAULT_COUNT, 1);
       this.isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED,
           DEFAULT_MULTIPART_UPLOAD_ENABLED);
+      // multipart copy and upload are the same; this just makes it explicit
+      this.isMultipartCopyEnabled = isMultipartUploadEnabled;
+
       initThreadPools(conf);
 
       int listVersion = conf.getInt(LIST_VERSION, DEFAULT_LIST_VERSION);
@@ -982,6 +991,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
         .withRequesterPays(conf.getBoolean(ALLOW_REQUESTER_PAYS, DEFAULT_ALLOW_REQUESTER_PAYS))
         .withExecutionInterceptors(auditManager.createExecutionInterceptors())
         .withMinimumPartSize(partSize)
+        .withMultipartCopyEnabled(isMultipartCopyEnabled)
         .withMultipartThreshold(multiPartThreshold)
         .withTransferManagerExecutor(unboundedThreadPool)
         .withRegion(region);
@@ -1468,6 +1478,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       LOG.debug("Sharing credentials for: {}", purpose);
       return credentials.share();
     }
+
+    @Override
+    public boolean isMultipartCopyEnabled() {
+      return S3AFileSystem.this.isMultipartUploadEnabled;
+    }
   }
 
   /**
@@ -4436,37 +4451,56 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
           e);
     }
 
-    return readInvoker.retry(
-        action, srcKey,
-        true,
-        () -> {
-          CopyObjectRequest.Builder copyObjectRequestBuilder =
-              getRequestFactory().newCopyObjectRequestBuilder(srcKey, dstKey, srcom);
-          changeTracker.maybeApplyConstraint(copyObjectRequestBuilder);
-          incrementStatistic(OBJECT_COPY_REQUESTS);
-
-          Copy copy = transferManager.copy(
-              CopyRequest.builder()
-                  .copyObjectRequest(copyObjectRequestBuilder.build())
-                  .build());
+    CopyObjectRequest.Builder copyObjectRequestBuilder =
+        getRequestFactory().newCopyObjectRequestBuilder(srcKey, dstKey, srcom);
+    changeTracker.maybeApplyConstraint(copyObjectRequestBuilder);
+    CopyObjectResponse response;
 
-          try {
-            CompletedCopy completedCopy = copy.completionFuture().join();
-            CopyObjectResponse result = completedCopy.response();
-            changeTracker.processResponse(result);
-            incrementWriteOperations();
-            instrumentation.filesCopied(1, size);
-            return result;
-          } catch (CompletionException e) {
-            Throwable cause = e.getCause();
-            if (cause instanceof SdkException) {
-              SdkException awsException = (SdkException)cause;
-              changeTracker.processException(awsException, "copy");
-              throw awsException;
+    // transfer manager is skipped if disabled or the file is too small to worry about
+    final boolean useTransferManager = isMultipartCopyEnabled && size >= multiPartThreshold;
+    if (useTransferManager) {
+      // use transfer manager
+      response = readInvoker.retry(
+          action, srcKey,
+          true,
+          () -> {
+            incrementStatistic(OBJECT_COPY_REQUESTS);
+
+            Copy copy = transferManager.copy(
+                CopyRequest.builder()
+                    .copyObjectRequest(copyObjectRequestBuilder.build())
+                    .build());
+
+            try {
+              CompletedCopy completedCopy = copy.completionFuture().join();
+              return completedCopy.response();
+            } catch (CompletionException e) {
+              Throwable cause = e.getCause();
+              if (cause instanceof SdkException) {
+                SdkException awsException = (SdkException)cause;
+                changeTracker.processException(awsException, "copy");
+                throw awsException;
+              }
+              throw extractException(action, srcKey, e);
             }
-            throw extractException(action, srcKey, e);
-          }
-        });
+          });
+    } else {
+      // single part copy bypasses transfer manager
+      // note, this helps with some mock testing, e.g. HBoss, as there is less to mock.
+      response = readInvoker.retry(
+          action, srcKey,
+          true,
+          () -> {
+            LOG.debug("copyFile: single part copy {} -> {} of size {}", srcKey, dstKey, size);
+            incrementStatistic(OBJECT_COPY_REQUESTS);
+            return s3Client.copyObject(copyObjectRequestBuilder.build());
+          });
+    }
+
+    changeTracker.processResponse(response);
+    incrementWriteOperations();
+    instrumentation.filesCopied(1, size);
+    return response;
   }
 
   /**
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInternals.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInternals.java
index 23c4d350120..18d6c1af586 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInternals.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInternals.java
@@ -115,4 +115,10 @@ public interface S3AInternals {
   @AuditEntryPoint
   @Retries.RetryTranslated
   HeadBucketResponse getBucketMetadata() throws IOException;
+
+  /**
+   * Is multipart copy enabled?
+   * @return true if the transfer manager is used to copy files.
+   */
+  boolean isMultipartCopyEnabled();
 }
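
As an aside, here is a hedged sketch (not from the commit) of how an application with the S3A classes on its classpath might probe the new S3AInternals method, in the same way the test code below does via getS3AInternals(); the cast and the bucket URI are illustrative assumptions.

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.s3a.S3AFileSystem;

    // Sketch only: ask the S3A connector whether copies will go through
    // the transfer manager. The bucket name is hypothetical.
    public class MultipartCopyProbe {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        try (FileSystem fs = FileSystem.newInstance(
            new URI("s3a://example-bucket/"), conf)) {
          boolean copyViaTransferManager =
              ((S3AFileSystem) fs).getS3AInternals().isMultipartCopyEnabled();
          System.out.println("multipart copy enabled: " + copyViaTransferManager);
        }
      }
    }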
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
index d4504cd08d7..e2e792ebfb6 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
@@ -156,6 +156,11 @@ public interface S3ClientFactory {
      */
     private long multiPartThreshold;
 
+    /**
+     * Multipart upload enabled.
+     */
+    private boolean multipartCopy = true;
+
     /**
      * Executor that the transfer manager will use to execute background tasks.
      */
@@ -399,5 +404,24 @@ public interface S3ClientFactory {
     public Region getRegion() {
       return region;
     }
+
+    /**
+     * Set the multipart flag.
+     *
+     * @param value new value
+     * @return the builder
+     */
+    public S3ClientCreationParameters withMultipartCopyEnabled(final boolean value) {
+      this.multipartCopy = value;
+      return this;
+    }
+
+    /**
+     * Get the multipart flag.
+     * @return multipart flag
+     */
+    public boolean isMultipartCopy() {
+      return multipartCopy;
+    }
   }
 }
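
For context, a small illustrative sketch (again not part of the commit) of how the new flag slots into the existing S3ClientCreationParameters builder chain, mirroring the S3AFileSystem hunk further up. It assumes the parameters object can be constructed directly, as the S3A connector itself does; the part size and threshold values are arbitrary examples.

    import org.apache.hadoop.fs.s3a.S3ClientFactory;

    // Sketch only: build client creation parameters with multipart copy
    // disabled, alongside the existing size/threshold settings.
    public class ClientParametersSketch {
      static S3ClientFactory.S3ClientCreationParameters sampleParameters() {
        return new S3ClientFactory.S3ClientCreationParameters()
            .withMinimumPartSize(64L * 1024 * 1024)
            .withMultipartThreshold(128L * 1024 * 1024)
            .withMultipartCopyEnabled(false);
      }
    }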
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
index 1a30c04358b..75c6efbe2ab 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
@@ -355,10 +355,10 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
   /**
    * Is this expected to be a multipart upload?
    * Assertions will change if not.
-   * @return true by default.
+   * @return what the filesystem expects.
    */
   protected boolean expectMultipartUpload() {
-    return true;
+    return getFileSystem().getS3AInternals().isMultipartCopyEnabled();
   }
 
   /**
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesNoMultipart.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesNoMultipart.java
index ed300dba01e..e154ab5676f 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesNoMultipart.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesNoMultipart.java
@@ -18,13 +18,10 @@
 
 package org.apache.hadoop.fs.s3a.scale;
 
+import org.assertj.core.api.Assertions;
+
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.Constants;
-import org.apache.hadoop.fs.s3a.S3AFileSystem;
-import org.apache.hadoop.fs.s3a.S3ATestUtils;
-import org.apache.hadoop.fs.s3a.api.UnsupportedRequestException;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.IO_CHUNK_BUFFER_SIZE;
 import static org.apache.hadoop.fs.s3a.Constants.MIN_MULTIPART_THRESHOLD;
@@ -33,7 +30,6 @@ import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE;
 import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED;
 import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
-import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 
 /**
  * Use a single PUT for the whole upload/rename/delete workflow; include verification
@@ -41,11 +37,6 @@ import static org.apache.hadoop.test.LambdaTestUtils.intercept;
  */
 public class ITestS3AHugeFilesNoMultipart extends AbstractSTestS3AHugeFiles {
 
-  /**
-   * Size to ensure MPUs don't happen in transfer manager.
-   */
-  public static final String S_1T = "1T";
-
   public static final String SINGLE_PUT_REQUEST_TIMEOUT = "1h";
 
   /**
@@ -56,11 +47,23 @@ public class ITestS3AHugeFilesNoMultipart extends AbstractSTestS3AHugeFiles {
     return Constants.FAST_UPLOAD_BUFFER_DISK;
   }
 
+  /**
+   * Multipart upload is always disabled.
+   * @return false
+   */
   @Override
   protected boolean expectMultipartUpload() {
     return false;
   }
 
+  /**
+   * Is multipart copy enabled?
+   * @return true if the transfer manager is used to copy files.
+   */
+  private boolean isMultipartCopyEnabled() {
+    return getFileSystem().getS3AInternals().isMultipartCopyEnabled();
+  }
+
   /**
    * Create a configuration without multipart upload,
    * and a long request timeout to allow for a very slow
@@ -77,35 +80,21 @@ public class ITestS3AHugeFilesNoMultipart extends AbstractSTestS3AHugeFiles {
         MULTIPART_SIZE,
         REQUEST_TIMEOUT);
     conf.setInt(IO_CHUNK_BUFFER_SIZE, 655360);
-    conf.set(MIN_MULTIPART_THRESHOLD, S_1T);
-    conf.set(MULTIPART_SIZE, S_1T);
+    conf.setInt(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE);
+    conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE);
     conf.setBoolean(MULTIPART_UPLOADS_ENABLED, false);
     conf.set(REQUEST_TIMEOUT, SINGLE_PUT_REQUEST_TIMEOUT);
     return conf;
   }
 
   /**
-   * After the file is created, attempt a rename with an FS
-   * instance with a small multipart threshold;
-   * this MUST be rejected.
+   * Verify multipart copy is disabled.
    */
   @Override
   public void test_030_postCreationAssertions() throws Throwable {
-    assumeHugeFileExists();
-    final Path hugefile = getHugefile();
-    final Path hugefileRenamed = getHugefileRenamed();
-    describe("renaming %s to %s", hugefile, hugefileRenamed);
-    S3AFileSystem fs = getFileSystem();
-    fs.delete(hugefileRenamed, false);
-    // create a new fs with a small multipart threshold; expect rename failure.
-    final Configuration conf = new Configuration(fs.getConf());
-    conf.setInt(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE);
-    conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE);
-    S3ATestUtils.disableFilesystemCaching(conf);
-
-    try (FileSystem fs2 = FileSystem.get(fs.getUri(), conf)) {
-      intercept(UnsupportedRequestException.class, () ->
-          fs2.rename(hugefile, hugefileRenamed));
-    }
+    super.test_030_postCreationAssertions();
+    Assertions.assertThat(isMultipartCopyEnabled())
+        .describedAs("Multipart copy should be disabled in %s", getFileSystem())
+        .isFalse();
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org
