This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 3b7a7f447fdc HADOOP-18925. S3A: option to enable/disable 
CopyFromLocalOperation (#6163) (#6259)
3b7a7f447fdc is described below

commit 3b7a7f447fdc9b86f23b41c539c72a5845344d80
Author: Steve Loughran <ste...@cloudera.com>
AuthorDate: Thu Dec 7 14:41:44 2023 +0000

    HADOOP-18925. S3A: option to enable/disable CopyFromLocalOperation (#6163) 
(#6259)
    
    
    Add a new option fs.s3a.optimized.copy.from.local.enabled
    
    This will enable (default) or disable the
    optimized CopyFromLocalOperation upload operation
    when copyFromLocalFile() is invoked.
    
    When false, the superclass implementation is used; duration
    statistics are still collected, though audit span entries
    in logs will be for the individual fs operations, not the
    overall operation.
    
    Contributed by Steve Loughran
---
 .../java/org/apache/hadoop/fs/s3a/Constants.java   | 12 +++
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java    | 96 +++++++++++++++-------
 .../tools/hadoop-aws/troubleshooting_s3a.md        | 10 +++
 .../hadoop/fs/s3a/ITestS3ACopyFromLocalFile.java   | 51 ++++++++++++
 4 files changed, 140 insertions(+), 29 deletions(-)

diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index edd63b5f263c..32f108f79808 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -1299,4 +1299,16 @@ public final class Constants {
    */
   public static final int DEFAULT_PREFETCH_MAX_BLOCKS_COUNT = 4;
 
+  /**
+   * Is the higher performance copy from local file to S3 enabled?
+   * This switch allows for it to be disabled if there are problems.
+   * Value: {@value}.
+   */
+  public static final String OPTIMIZED_COPY_FROM_LOCAL = 
"fs.s3a.optimized.copy.from.local.enabled";
+
+  /**
+   * Default value for {@link #OPTIMIZED_COPY_FROM_LOCAL}.
+   * Value: {@value}.
+   */
+  public static final boolean OPTIMIZED_COPY_FROM_LOCAL_DEFAULT = true;
 }
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 6d9fd76d1346..ba2a7c92d6a4 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -429,6 +429,12 @@ public class S3AFileSystem extends FileSystem implements 
StreamCapabilities,
    */
   private String scheme = FS_S3A;
 
+  /**
+   * Flag to indicate that the higher performance copyFromLocalFile 
implementation
+   * should be used.
+   */
+  private boolean optimizedCopyFromLocal;
+
   /** Add any deprecated keys. */
   @SuppressWarnings("deprecation")
   private static void addDeprecatedKeys() {
@@ -654,6 +660,9 @@ public class S3AFileSystem extends FileSystem implements 
StreamCapabilities,
               AWS_S3_VECTOR_ACTIVE_RANGE_READS, 
DEFAULT_AWS_S3_VECTOR_ACTIVE_RANGE_READS, 1);
       vectoredIOContext = populateVectoredIOContext(conf);
       scheme = (this.uri != null && this.uri.getScheme() != null) ? 
this.uri.getScheme() : FS_S3A;
+      optimizedCopyFromLocal = conf.getBoolean(OPTIMIZED_COPY_FROM_LOCAL,
+          OPTIMIZED_COPY_FROM_LOCAL_DEFAULT);
+      LOG.debug("Using optimized copyFromLocal implementation: {}", 
optimizedCopyFromLocal);
     } catch (AmazonClientException e) {
       // amazon client exception: stop all services then throw the translation
       cleanupWithLogger(LOG, span);
@@ -3825,9 +3834,9 @@ public class S3AFileSystem extends FileSystem implements 
StreamCapabilities,
    * the given dst name.
    *
    * This version doesn't need to create a temporary file to calculate the md5.
-   * Sadly this doesn't seem to be used by the shell cp :(
+   * If {@link Constants#OPTIMIZED_COPY_FROM_LOCAL} is set to false,
+   * the superclass implementation is used.
    *
-   * delSrc indicates if the source should be removed
    * @param delSrc whether to delete the src
    * @param overwrite whether to overwrite an existing file
    * @param src path
@@ -3842,28 +3851,53 @@ public class S3AFileSystem extends FileSystem 
implements StreamCapabilities,
   public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src,
                                 Path dst) throws IOException {
     checkNotClosed();
-    LOG.debug("Copying local file from {} to {}", src, dst);
-    trackDurationAndSpan(INVOCATION_COPY_FROM_LOCAL_FILE, dst,
-        () -> new CopyFromLocalOperation(
-            createStoreContext(),
-            src,
-            dst,
-            delSrc,
-            overwrite,
-            createCopyFromLocalCallbacks()).execute());
+    LOG.debug("Copying local file from {} to {} (delSrc={} overwrite={}",
+        src, dst, delSrc, overwrite);
+    if (optimizedCopyFromLocal) {
+      trackDurationAndSpan(INVOCATION_COPY_FROM_LOCAL_FILE, dst, () ->
+          new CopyFromLocalOperation(
+              createStoreContext(),
+              src,
+              dst,
+              delSrc,
+              overwrite,
+              createCopyFromLocalCallbacks(getActiveAuditSpan()))
+              .execute());
+    } else {
+      // call the superclass, but still count statistics.
+      // there is no overall span here, as each FS API call will
+      // be in its own span.
+      LOG.debug("Using base copyFromLocalFile implementation");
+      trackDurationAndSpan(INVOCATION_COPY_FROM_LOCAL_FILE, dst, () -> {
+        super.copyFromLocalFile(delSrc, overwrite, src, dst);
+        return null;
+      });
+    }
   }
 
+  /**
+   * Create the CopyFromLocalCallbacks;
+   * protected to assist in mocking.
+   * @param span audit span.
+   * @return the callbacks
+   * @throws IOException failure to get the local fs.
+   */
   protected CopyFromLocalOperation.CopyFromLocalOperationCallbacks
-      createCopyFromLocalCallbacks() throws IOException {
+      createCopyFromLocalCallbacks(final AuditSpanS3A span) throws IOException 
{
     LocalFileSystem local = getLocal(getConf());
-    return new CopyFromLocalCallbacksImpl(local);
+    return new CopyFromLocalCallbacksImpl(span, local);
   }
 
   protected final class CopyFromLocalCallbacksImpl implements
       CopyFromLocalOperation.CopyFromLocalOperationCallbacks {
+
+    /** Span to use for all operations. */
+    private final AuditSpanS3A span;
     private final LocalFileSystem local;
 
-    private CopyFromLocalCallbacksImpl(LocalFileSystem local) {
+    private CopyFromLocalCallbacksImpl(final AuditSpanS3A span,
+        LocalFileSystem local) {
+      this.span = span;
       this.local = local;
     }
 
@@ -3885,21 +3919,21 @@ public class S3AFileSystem extends FileSystem 
implements StreamCapabilities,
 
     @Override
     public void copyLocalFileFromTo(File file, Path from, Path to) throws 
IOException {
-      trackDurationAndSpan(
-          OBJECT_PUT_REQUESTS,
-          to,
-          () -> {
-            final String key = pathToKey(to);
-            final ObjectMetadata om = newObjectMetadata(file.length());
-            Progressable progress = null;
-            PutObjectRequest putObjectRequest = newPutObjectRequest(key, om, 
file);
-            S3AFileSystem.this.invoker.retry(
-                "putObject(" + "" + ")", to.toString(),
-                true,
-                () -> executePut(putObjectRequest, progress, 
putOptionsForPath(to)));
-
-            return null;
-          });
+      // the duration of the put is measured, but the active span is the
+      // constructor-supplied one -this ensures all audit log events are 
grouped correctly
+      span.activate();
+      trackDuration(getDurationTrackerFactory(), 
OBJECT_PUT_REQUESTS.getSymbol(), () -> {
+        final String key = pathToKey(to);
+        final ObjectMetadata om = newObjectMetadata(file.length());
+        Progressable progress = null;
+        PutObjectRequest putObjectRequest = newPutObjectRequest(key, om, file);
+        S3AFileSystem.this.invoker.retry(
+            "putObject()", to.toString(),
+            true,
+            () -> executePut(putObjectRequest, progress, 
putOptionsForPath(to)));
+
+        return null;
+      });
     }
 
     @Override
@@ -5149,6 +5183,10 @@ public class S3AFileSystem extends FileSystem implements 
StreamCapabilities,
     case FS_S3A_CREATE_HEADER:
       return true;
 
+      // is the optimized copy from local enabled.
+    case OPTIMIZED_COPY_FROM_LOCAL:
+      return optimizedCopyFromLocal;
+
     default:
       return super.hasPathCapability(p, cap);
     }
diff --git 
a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
 
b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
index d3e7c7d806dc..d040c5b6e6cc 100644
--- 
a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
+++ 
b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
@@ -2058,3 +2058,13 @@ updated to implement 
`software.amazon.awssdk.core.signer.Signer`.
 This will be logged when `getObjectMetadata` is called. In SDK V2, this 
operation has changed to
 `headObject()` and will return a response of the type `HeadObjectResponse`.
 
+
+### <a name="debug-switches"></a> Debugging Switches
+
+There are some switches which can be set to enable or disable features; these can
+assist in isolating problems or, at the very least, make them "go away".
+
+
+| Key  | Default | Action   |
+|------|---------|----------|
+| `fs.s3a.optimized.copy.from.local.enabled` | `true`  | 
[HADOOP-18925](https://issues.apache.org/jira/browse/HADOOP-18925) 
enable/disable CopyFromLocalOperation. Also a path capability. |
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ACopyFromLocalFile.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ACopyFromLocalFile.java
index dfac771dd78d..f9600de6d20c 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ACopyFromLocalFile.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ACopyFromLocalFile.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.fs.s3a;
 
 import java.io.File;
+import java.util.Arrays;
+import java.util.Collection;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.contract.AbstractContractCopyFromLocalTest;
@@ -26,18 +28,67 @@ import org.apache.hadoop.fs.contract.AbstractFSContract;
 import org.apache.hadoop.fs.contract.s3a.S3AContract;
 
 import org.apache.hadoop.fs.Path;
+
+import org.assertj.core.api.Assertions;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+import static org.apache.hadoop.fs.s3a.Constants.OPTIMIZED_COPY_FROM_LOCAL;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
+import static 
org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 
+/**
+ * Test copying files from the local filesystem to S3A.
+ * Parameterized on whether or not the optimized
+ * copyFromLocalFile is enabled.
+ */
+@RunWith(Parameterized.class)
 public class ITestS3ACopyFromLocalFile extends
         AbstractContractCopyFromLocalTest {
+  /**
+   * Parameterization.
+   */
+  @Parameterized.Parameters(name = "enabled={0}")
+  public static Collection<Object[]> params() {
+    return Arrays.asList(new Object[][]{
+        {true},
+        {false},
+    });
+  }
+  private final boolean enabled;
+
+  public ITestS3ACopyFromLocalFile(final boolean enabled) {
+    this.enabled = enabled;
+  }
+
+  @Override
+  protected Configuration createConfiguration() {
+    final Configuration conf = super.createConfiguration();
+
+    removeBaseAndBucketOverrides(getTestBucketName(conf), conf,
+        OPTIMIZED_COPY_FROM_LOCAL);
+    conf.setBoolean(OPTIMIZED_COPY_FROM_LOCAL, enabled);
+    disableFilesystemCaching(conf);
+    return conf;
+  }
 
   @Override
   protected AbstractFSContract createContract(Configuration conf) {
     return new S3AContract(conf);
   }
 
+  @Test
+  public void testOptionPropagation() throws Throwable {
+    Assertions.assertThat(getFileSystem().hasPathCapability(new Path("/"),
+        OPTIMIZED_COPY_FROM_LOCAL))
+        .describedAs("path capability of %s", OPTIMIZED_COPY_FROM_LOCAL)
+        .isEqualTo(enabled);
+
+  }
+
   @Test
   public void testLocalFilesOnly() throws Throwable {
     describe("Copying into other file systems must fail");


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to