This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 3b7a7f447fdc HADOOP-18925. S3A: option to enable/disable CopyFromLocalOperation (#6163) (#6259)
3b7a7f447fdc is described below

commit 3b7a7f447fdc9b86f23b41c539c72a5845344d80
Author: Steve Loughran <ste...@cloudera.com>
AuthorDate: Thu Dec 7 14:41:44 2023 +0000

    HADOOP-18925. S3A: option to enable/disable CopyFromLocalOperation (#6163) (#6259)

    Add a new option
    fs.s3a.optimized.copy.from.local.enabled

    This will enable (default) or disable the
    optimized CopyFromLocalOperation upload operation
    when copyFromLocalFile() is invoked.

    When false the superclass implementation is used; duration
    statistics are still collected, though audit span entries
    in logs will be for the individual fs operations, not the
    overall operation.

    Contributed by Steve Loughran
---
 .../java/org/apache/hadoop/fs/s3a/Constants.java | 12 +++
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 96 +++++++++++++++-------
 .../tools/hadoop-aws/troubleshooting_s3a.md | 10 +++
 .../hadoop/fs/s3a/ITestS3ACopyFromLocalFile.java | 51 ++++++++++++
 4 files changed, 140 insertions(+), 29 deletions(-)

diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index edd63b5f263c..32f108f79808 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -1299,4 +1299,16 @@ public final class Constants {
    */
   public static final int DEFAULT_PREFETCH_MAX_BLOCKS_COUNT = 4;
 
+  /**
+   * Is the higher performance copy from local file to S3 enabled?
+   * This switch allows for it to be disabled if there are problems.
+   * Value: {@value}.
+   */
+  public static final String OPTIMIZED_COPY_FROM_LOCAL = "fs.s3a.optimized.copy.from.local.enabled";
+
+  /**
+   * Default value for {@link #OPTIMIZED_COPY_FROM_LOCAL}.
+   * Value: {@value}.
+ */ + public static final boolean OPTIMIZED_COPY_FROM_LOCAL_DEFAULT = true; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 6d9fd76d1346..ba2a7c92d6a4 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -429,6 +429,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, */ private String scheme = FS_S3A; + /** + * Flag to indicate that the higher performance copyFromLocalFile implementation + * should be used. + */ + private boolean optimizedCopyFromLocal; + /** Add any deprecated keys. */ @SuppressWarnings("deprecation") private static void addDeprecatedKeys() { @@ -654,6 +660,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, AWS_S3_VECTOR_ACTIVE_RANGE_READS, DEFAULT_AWS_S3_VECTOR_ACTIVE_RANGE_READS, 1); vectoredIOContext = populateVectoredIOContext(conf); scheme = (this.uri != null && this.uri.getScheme() != null) ? this.uri.getScheme() : FS_S3A; + optimizedCopyFromLocal = conf.getBoolean(OPTIMIZED_COPY_FROM_LOCAL, + OPTIMIZED_COPY_FROM_LOCAL_DEFAULT); + LOG.debug("Using optimized copyFromLocal implementation: {}", optimizedCopyFromLocal); } catch (AmazonClientException e) { // amazon client exception: stop all services then throw the translation cleanupWithLogger(LOG, span); @@ -3825,9 +3834,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, * the given dst name. * * This version doesn't need to create a temporary file to calculate the md5. - * Sadly this doesn't seem to be used by the shell cp :( + * If {@link Constants#OPTIMIZED_COPY_FROM_LOCAL} is set to false, + * the superclass implementation is used. 
* - * delSrc indicates if the source should be removed * @param delSrc whether to delete the src * @param overwrite whether to overwrite an existing file * @param src path @@ -3842,28 +3851,53 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst) throws IOException { checkNotClosed(); - LOG.debug("Copying local file from {} to {}", src, dst); - trackDurationAndSpan(INVOCATION_COPY_FROM_LOCAL_FILE, dst, - () -> new CopyFromLocalOperation( - createStoreContext(), - src, - dst, - delSrc, - overwrite, - createCopyFromLocalCallbacks()).execute()); + LOG.debug("Copying local file from {} to {} (delSrc={} overwrite={}", + src, dst, delSrc, overwrite); + if (optimizedCopyFromLocal) { + trackDurationAndSpan(INVOCATION_COPY_FROM_LOCAL_FILE, dst, () -> + new CopyFromLocalOperation( + createStoreContext(), + src, + dst, + delSrc, + overwrite, + createCopyFromLocalCallbacks(getActiveAuditSpan())) + .execute()); + } else { + // call the superclass, but still count statistics. + // there is no overall span here, as each FS API call will + // be in its own span. + LOG.debug("Using base copyFromLocalFile implementation"); + trackDurationAndSpan(INVOCATION_COPY_FROM_LOCAL_FILE, dst, () -> { + super.copyFromLocalFile(delSrc, overwrite, src, dst); + return null; + }); + } } + /** + * Create the CopyFromLocalCallbacks; + * protected to assist in mocking. + * @param span audit span. + * @return the callbacks + * @throws IOException failure to get the local fs. 
+ */ protected CopyFromLocalOperation.CopyFromLocalOperationCallbacks - createCopyFromLocalCallbacks() throws IOException { + createCopyFromLocalCallbacks(final AuditSpanS3A span) throws IOException { LocalFileSystem local = getLocal(getConf()); - return new CopyFromLocalCallbacksImpl(local); + return new CopyFromLocalCallbacksImpl(span, local); } protected final class CopyFromLocalCallbacksImpl implements CopyFromLocalOperation.CopyFromLocalOperationCallbacks { + + /** Span to use for all operations. */ + private final AuditSpanS3A span; private final LocalFileSystem local; - private CopyFromLocalCallbacksImpl(LocalFileSystem local) { + private CopyFromLocalCallbacksImpl(final AuditSpanS3A span, + LocalFileSystem local) { + this.span = span; this.local = local; } @@ -3885,21 +3919,21 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, @Override public void copyLocalFileFromTo(File file, Path from, Path to) throws IOException { - trackDurationAndSpan( - OBJECT_PUT_REQUESTS, - to, - () -> { - final String key = pathToKey(to); - final ObjectMetadata om = newObjectMetadata(file.length()); - Progressable progress = null; - PutObjectRequest putObjectRequest = newPutObjectRequest(key, om, file); - S3AFileSystem.this.invoker.retry( - "putObject(" + "" + ")", to.toString(), - true, - () -> executePut(putObjectRequest, progress, putOptionsForPath(to))); - - return null; - }); + // the duration of the put is measured, but the active span is the + // constructor-supplied one -this ensures all audit log events are grouped correctly + span.activate(); + trackDuration(getDurationTrackerFactory(), OBJECT_PUT_REQUESTS.getSymbol(), () -> { + final String key = pathToKey(to); + final ObjectMetadata om = newObjectMetadata(file.length()); + Progressable progress = null; + PutObjectRequest putObjectRequest = newPutObjectRequest(key, om, file); + S3AFileSystem.this.invoker.retry( + "putObject()", to.toString(), + true, + () -> executePut(putObjectRequest, 
progress, putOptionsForPath(to))); + + return null; + }); } @Override @@ -5149,6 +5183,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, case FS_S3A_CREATE_HEADER: return true; + // is the optimized copy from local enabled. + case OPTIMIZED_COPY_FROM_LOCAL: + return optimizedCopyFromLocal; + default: return super.hasPathCapability(p, cap); } diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md index d3e7c7d806dc..d040c5b6e6cc 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md @@ -2058,3 +2058,13 @@ updated to implement `software.amazon.awssdk.core.signer.Signer`. This will be logged when `getObjectMetadata` is called. In SDK V2, this operation has changed to `headObject()` and will return a response of the type `HeadObjectResponse`. + +### <a name="debug-switches"></a> Debugging Switches + +There are some switches which can be set to enable/disable features and assist +in isolating problems and at least make them "go away". + + +| Key | Default | Action | +|------|---------|----------| +| `fs.s3a.optimized.copy.from.local.enabled` | `true` | [HADOOP-18925](https://issues.apache.org/jira/browse/HADOOP-18925) enable/disable CopyFromLocalOperation. Also a path capability. 
| diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ACopyFromLocalFile.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ACopyFromLocalFile.java index dfac771dd78d..f9600de6d20c 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ACopyFromLocalFile.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ACopyFromLocalFile.java @@ -19,6 +19,8 @@ package org.apache.hadoop.fs.s3a; import java.io.File; +import java.util.Arrays; +import java.util.Collection; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.contract.AbstractContractCopyFromLocalTest; @@ -26,18 +28,67 @@ import org.apache.hadoop.fs.contract.AbstractFSContract; import org.apache.hadoop.fs.contract.s3a.S3AContract; import org.apache.hadoop.fs.Path; + +import org.assertj.core.api.Assertions; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import static org.apache.hadoop.fs.s3a.Constants.OPTIMIZED_COPY_FROM_LOCAL; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; import static org.apache.hadoop.test.LambdaTestUtils.intercept; +/** + * Test copying files from the local filesystem to S3A. + * Parameterized on whether or not the optimized + * copyFromLocalFile is enabled. + */ +@RunWith(Parameterized.class) public class ITestS3ACopyFromLocalFile extends AbstractContractCopyFromLocalTest { + /** + * Parameterization. 
+ */ + @Parameterized.Parameters(name = "enabled={0}") + public static Collection<Object[]> params() { + return Arrays.asList(new Object[][]{ + {true}, + {false}, + }); + } + private final boolean enabled; + + public ITestS3ACopyFromLocalFile(final boolean enabled) { + this.enabled = enabled; + } + + @Override + protected Configuration createConfiguration() { + final Configuration conf = super.createConfiguration(); + + removeBaseAndBucketOverrides(getTestBucketName(conf), conf, + OPTIMIZED_COPY_FROM_LOCAL); + conf.setBoolean(OPTIMIZED_COPY_FROM_LOCAL, enabled); + disableFilesystemCaching(conf); + return conf; + } @Override protected AbstractFSContract createContract(Configuration conf) { return new S3AContract(conf); } + @Test + public void testOptionPropagation() throws Throwable { + Assertions.assertThat(getFileSystem().hasPathCapability(new Path("/"), + OPTIMIZED_COPY_FROM_LOCAL)) + .describedAs("path capability of %s", OPTIMIZED_COPY_FROM_LOCAL) + .isEqualTo(enabled); + + } + @Test public void testLocalFilesOnly() throws Throwable { describe("Copying into other file systems must fail"); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org