This is an automated email from the ASF dual-hosted git repository.

slfan1989 pushed a commit to branch branch-3.4.0
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.4.0 by this push:
     new fd0d0c90d964 HADOOP-19044. S3A: AWS SDK V2 - Update region logic (#6479)
fd0d0c90d964 is described below

commit fd0d0c90d9646becdc4ca6e8098f97b89250f7ee
Author: Viraj Jasani <vjas...@apache.org>
AuthorDate: Sat Feb 3 05:44:23 2024 -0900

    HADOOP-19044. S3A: AWS SDK V2 - Update region logic (#6479)
    
    
    Improves region handling in the S3A connector, including enabling cross-region support
    when that is considered necessary.
    
    Consult the documentation in connecting.md/connecting.html for the current
    resolution process.
    
    Contributed by Viraj Jasani
---
 .../hadoop/fs/s3a/DefaultS3ClientFactory.java      |  53 +++++--
 .../tools/hadoop-aws/aws_sdk_v2_changelog.md       |   4 +-
 .../site/markdown/tools/hadoop-aws/connecting.md   |  36 +++++
 .../src/site/markdown/tools/hadoop-aws/index.md    |   4 +-
 .../hadoop/fs/s3a/ITestS3AEndpointRegion.java      | 163 ++++++++++++++++++++-
 .../hadoop/fs/s3a/test/PublicDatasetTestUtils.java |   7 +
 6 files changed, 251 insertions(+), 16 deletions(-)

diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
index 0a3267a9fe51..284ba8e6ae5c 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
@@ -267,9 +267,10 @@ public class DefaultS3ClientFactory extends Configured
    */
   private <BuilderT extends S3BaseClientBuilder<BuilderT, ClientT>, ClientT> void configureEndpointAndRegion(
       BuilderT builder, S3ClientCreationParameters parameters, Configuration conf) {
-    URI endpoint = getS3Endpoint(parameters.getEndpoint(), conf);
+    final String endpointStr = parameters.getEndpoint();
+    final URI endpoint = getS3Endpoint(endpointStr, conf);
 
-    String configuredRegion = parameters.getRegion();
+    final String configuredRegion = parameters.getRegion();
     Region region = null;
     String origin = "";
 
@@ -291,15 +292,33 @@ public class DefaultS3ClientFactory extends Configured
     if (endpoint != null) {
       checkArgument(!fipsEnabled,
           "%s : %s", ERROR_ENDPOINT_WITH_FIPS, endpoint);
-      builder.endpointOverride(endpoint);
-      // No region was configured, try to determine it from the endpoint.
+      boolean endpointEndsWithCentral =
+          endpointStr.endsWith(CENTRAL_ENDPOINT);
+
+      // No region was configured,
+      // determine the region from the endpoint.
       if (region == null) {
-        region = getS3RegionFromEndpoint(parameters.getEndpoint());
+        region = getS3RegionFromEndpoint(endpointStr,
+            endpointEndsWithCentral);
         if (region != null) {
           origin = "endpoint";
         }
       }
-      LOG.debug("Setting endpoint to {}", endpoint);
+
+      // No need to override endpoint with "s3.amazonaws.com".
+      // Let the client take care of endpoint resolution. Overriding
+      // the endpoint with "s3.amazonaws.com" causes 400 Bad Request
+      // errors for non-existent buckets and objects.
+      // ref: https://github.com/aws/aws-sdk-java-v2/issues/4846
+      if (!endpointEndsWithCentral) {
+        builder.endpointOverride(endpoint);
+        LOG.debug("Setting endpoint to {}", endpoint);
+      } else {
+        builder.crossRegionAccessEnabled(true);
+        origin = "central endpoint with cross region access";
+        LOG.debug("Enabling cross region access for endpoint {}",
+            endpointStr);
+      }
     }
 
     if (region != null) {
@@ -354,20 +373,32 @@ public class DefaultS3ClientFactory extends Configured
 
   /**
    * Parses the endpoint to get the region.
-   * If endpoint is the central one, use US_EAST_1.
+   * If endpoint is the central one, use US_EAST_2.
    *
    * @param endpoint the configure endpoint.
+   * @param endpointEndsWithCentral true if the endpoint is configured as central.
    * @return the S3 region, null if unable to resolve from endpoint.
    */
-  private static Region getS3RegionFromEndpoint(String endpoint) {
+  private static Region getS3RegionFromEndpoint(final String endpoint,
+      final boolean endpointEndsWithCentral) {
 
-    if(!endpoint.endsWith(CENTRAL_ENDPOINT)) {
+    if (!endpointEndsWithCentral) {
       LOG.debug("Endpoint {} is not the default; parsing", endpoint);
       return AwsHostNameUtils.parseSigningRegion(endpoint, S3_SERVICE_NAME).orElse(null);
     }
 
-    // endpoint is for US_EAST_1;
-    return Region.US_EAST_1;
+    // Select default region here to enable cross-region access.
+    // If both "fs.s3a.endpoint" and "fs.s3a.endpoint.region" are empty,
+    // Spark sets "fs.s3a.endpoint" to "s3.amazonaws.com".
+    // This applies to Spark versions with the changes of SPARK-35878.
+    // ref:
+    // https://github.com/apache/spark/blob/v3.5.0/core/
+    // src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala#L528
+    // If we do not allow cross region access, Spark would not be able to
+    // access any bucket that is not present in the given region.
+    // Hence, we should use default region us-east-2 to allow cross-region
+    // access.
+    return Region.of(AWS_S3_DEFAULT_REGION);
   }
 
 }
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/aws_sdk_v2_changelog.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/aws_sdk_v2_changelog.md
index 162f15951f5c..de3808c54dcb 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/aws_sdk_v2_changelog.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/aws_sdk_v2_changelog.md
@@ -83,8 +83,8 @@ The table below lists the configurations S3A was using and what they now map to.
 
 Previously, if no endpoint and region was configured, fall back to using us-east-1. Set
 withForceGlobalBucketAccessEnabled(true) which will allow access to buckets not in this region too.
-Since the SDK V2 no longer supports cross region access, we need to set the region and endpoint of
-the bucket. The behaviour has now been changed to:
+Since the SDK V2 no longer supports cross region access, we need to set the region and
+endpoint of the bucket. The behaviour has now been changed to:
 
 * If no endpoint is specified, use s3.amazonaws.com.
 * When setting the endpoint, also set the protocol (HTTP or HTTPS)
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/connecting.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/connecting.md
index a31b1c3e39a0..51e70ef231bf 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/connecting.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/connecting.md
@@ -100,6 +100,42 @@ With the move to the AWS V2 SDK, there is more emphasis on the region, set by th
 
 Normally, declaring the region in `fs.s3a.endpoint.region` should be sufficient to set up the network connection to correctly connect to an AWS-hosted S3 store.
 
+### <a name="s3_endpoint_region_details"></a> S3 endpoint and region settings in detail
+
+* Configs `fs.s3a.endpoint` and `fs.s3a.endpoint.region` are used to set values
+  for S3 endpoint and region respectively.
+* If `fs.s3a.endpoint.region` is configured with valid AWS region value, S3A will
+  configure the S3 client to use this value. If this is set to a region that does
+  not match your bucket, you will receive a 301 redirect response.
+* If `fs.s3a.endpoint.region` is not set and `fs.s3a.endpoint` is set with valid
+  endpoint value, S3A will attempt to parse the region from the endpoint and
+  configure S3 client to use the region value.
+* If both `fs.s3a.endpoint` and `fs.s3a.endpoint.region` are not set, S3A will
+  use `us-east-2` as default region and enable cross region access. In this case,
+  S3A does not attempt to override the endpoint while configuring the S3 client.
+* If `fs.s3a.endpoint` is not set and `fs.s3a.endpoint.region` is set to an empty
+  string, S3A will configure S3 client without any region or endpoint override.
+  This will allow fallback to S3 SDK region resolution chain. More details
+  [here](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/region-selection.html).
+* If `fs.s3a.endpoint` is set to central endpoint `s3.amazonaws.com` and
+  `fs.s3a.endpoint.region` is not set, S3A will use `us-east-2` as default region
+  and enable cross region access. In this case, S3A does not attempt to override
+  the endpoint while configuring the S3 client.
+* If `fs.s3a.endpoint` is set to central endpoint `s3.amazonaws.com` and
+  `fs.s3a.endpoint.region` is also set to some region, S3A will use that region
+  value and enable cross region access. In this case, S3A does not attempt to
+  override the endpoint while configuring the S3 client.
+
+When the cross region access is enabled while configuring the S3 client, even if the
+region set is incorrect, S3 SDK determines the region. This is done by making the
+request, and if the SDK receives 301 redirect response, it determines the region at
+the cost of a HEAD request, and caches it.
+
+Please note that some endpoint and region settings that require cross region access
+are complex and improving over time. Hence, they may be considered unstable.
+
+If you are working with third party stores, please check [third party stores in detail](third_party_stores.html).
+
 ### <a name="timeouts"></a> Network timeouts
 
 See [Timeouts](performance.html#timeouts).
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
index 0f09c7f87315..cdc0134e60a8 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
@@ -226,7 +226,9 @@ If you do any of these: change your credentials immediately!
 
 ## Connecting to Amazon S3 or a third-party store
 
-See [Connecting to an Amazon S3 Bucket through the S3A Connector](connecting.md).
+See [Connecting to an Amazon S3 Bucket through the S3A Connector](connecting.html).
+
+Also, please check [S3 endpoint and region settings in detail](connecting.html#s3_endpoint_region_details).
 
 ## <a name="authenticating"></a> Authenticating with S3
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEndpointRegion.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEndpointRegion.java
index 5e6991128b20..95f31d7527f8 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEndpointRegion.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEndpointRegion.java
@@ -38,13 +38,20 @@ import software.amazon.awssdk.services.s3.model.HeadBucketRequest;
 import software.amazon.awssdk.services.s3.model.HeadBucketResponse;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext;
+import org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils;
 
+import static org.apache.hadoop.fs.s3a.Constants.ALLOW_REQUESTER_PAYS;
 import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION;
 import static org.apache.hadoop.fs.s3a.Constants.CENTRAL_ENDPOINT;
+import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
 import static org.apache.hadoop.fs.s3a.Constants.PATH_STYLE_ACCESS;
 import static org.apache.hadoop.fs.s3a.DefaultS3ClientFactory.ERROR_ENDPOINT_WITH_FIPS;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.DEFAULT_REQUESTER_PAYS_BUCKET_NAME;
 import static org.apache.hadoop.io.IOUtils.closeStream;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 
@@ -146,11 +153,28 @@ public class ITestS3AEndpointRegion extends AbstractS3ATestBase {
     describe("Create a client with the central endpoint");
     Configuration conf = getConfiguration();
 
-    S3Client client = createS3Client(conf, CENTRAL_ENDPOINT, null, US_EAST_1, false);
+    S3Client client = createS3Client(conf, CENTRAL_ENDPOINT, null, US_EAST_2, false);
 
     expectInterceptorException(client);
   }
 
+  @Test
+  public void testCentralEndpointWithRegion() throws Throwable {
+    describe("Create a client with the central endpoint but also specify region");
+    Configuration conf = getConfiguration();
+
+    S3Client client = createS3Client(conf, CENTRAL_ENDPOINT, US_WEST_2,
+        US_WEST_2, false);
+
+    expectInterceptorException(client);
+
+    client = createS3Client(conf, CENTRAL_ENDPOINT, US_EAST_1,
+        US_EAST_1, false);
+
+    expectInterceptorException(client);
+
+  }
+
   @Test
   public void testWithRegionConfig() throws Throwable {
     describe("Create a client with a configured region");
@@ -257,6 +281,141 @@ public class ITestS3AEndpointRegion extends AbstractS3ATestBase {
     expectInterceptorException(client);
   }
 
+  @Test
+  public void testCentralEndpointAndDifferentRegionThanBucket() throws Throwable {
+    describe("Access public bucket using central endpoint and region "
+        + "different than that of the public bucket");
+    final Configuration conf = getConfiguration();
+    final Configuration newConf = new Configuration(conf);
+
+    removeBaseAndBucketOverrides(
+        newConf,
+        ENDPOINT,
+        AWS_REGION,
+        ALLOW_REQUESTER_PAYS,
+        KEY_REQUESTER_PAYS_FILE);
+
+    removeBaseAndBucketOverrides(
+        DEFAULT_REQUESTER_PAYS_BUCKET_NAME,
+        newConf,
+        ENDPOINT,
+        AWS_REGION,
+        ALLOW_REQUESTER_PAYS,
+        KEY_REQUESTER_PAYS_FILE);
+
+    newConf.set(ENDPOINT, CENTRAL_ENDPOINT);
+    newConf.set(AWS_REGION, EU_WEST_1);
+    newConf.setBoolean(ALLOW_REQUESTER_PAYS, true);
+
+    Path filePath = new Path(PublicDatasetTestUtils
+        .getRequesterPaysObject(newConf));
+    newFS = (S3AFileSystem) filePath.getFileSystem(newConf);
+
+    Assertions
+        .assertThat(newFS.exists(filePath))
+        .describedAs("Existence of path: " + filePath)
+        .isTrue();
+  }
+
+  @Test
+  public void testCentralEndpointAndSameRegionAsBucket() throws Throwable {
+    describe("Access public bucket using central endpoint and region "
+        + "same as that of the public bucket");
+    final Configuration conf = getConfiguration();
+    final Configuration newConf = new Configuration(conf);
+
+    removeBaseAndBucketOverrides(
+        newConf,
+        ENDPOINT,
+        AWS_REGION,
+        ALLOW_REQUESTER_PAYS,
+        KEY_REQUESTER_PAYS_FILE);
+
+    removeBaseAndBucketOverrides(
+        DEFAULT_REQUESTER_PAYS_BUCKET_NAME,
+        newConf,
+        ENDPOINT,
+        AWS_REGION,
+        ALLOW_REQUESTER_PAYS,
+        KEY_REQUESTER_PAYS_FILE);
+
+    newConf.set(ENDPOINT, CENTRAL_ENDPOINT);
+    newConf.set(AWS_REGION, US_WEST_2);
+    newConf.setBoolean(ALLOW_REQUESTER_PAYS, true);
+
+    Path filePath = new Path(PublicDatasetTestUtils
+        .getRequesterPaysObject(newConf));
+    newFS = (S3AFileSystem) filePath.getFileSystem(newConf);
+
+    Assertions
+        .assertThat(newFS.exists(filePath))
+        .describedAs("Existence of path: " + filePath)
+        .isTrue();
+  }
+
+  @Test
+  public void testCentralEndpointAndNullRegionWithCRUD() throws Throwable {
+    describe("Access the test bucket using central endpoint and"
+        + " null region, perform file system CRUD operations");
+    final Configuration conf = getConfiguration();
+
+    final Configuration newConf = new Configuration(conf);
+
+    removeBaseAndBucketOverrides(
+        newConf,
+        ENDPOINT,
+        AWS_REGION);
+
+    newConf.set(ENDPOINT, CENTRAL_ENDPOINT);
+
+    newFS = new S3AFileSystem();
+    newFS.initialize(getFileSystem().getUri(), newConf);
+
+    assertOpsUsingNewFs();
+  }
+
+  private void assertOpsUsingNewFs() throws IOException {
+    final String file = getMethodName();
+    final Path basePath = methodPath();
+    final Path srcDir = new Path(basePath, "srcdir");
+    newFS.mkdirs(srcDir);
+    Path srcFilePath = new Path(srcDir, file);
+
+    try (FSDataOutputStream out = newFS.create(srcFilePath)) {
+      out.write(new byte[] {1, 2, 3});
+    }
+
+    Assertions
+        .assertThat(newFS.exists(srcFilePath))
+        .describedAs("Existence of file: " + srcFilePath)
+        .isTrue();
+    Assertions
+        .assertThat(getFileSystem().exists(srcFilePath))
+        .describedAs("Existence of file: " + srcFilePath)
+        .isTrue();
+
+    byte[] buffer = new byte[3];
+
+    try (FSDataInputStream in = newFS.open(srcFilePath)) {
+      in.readFully(buffer);
+      Assertions
+          .assertThat(buffer)
+          .describedAs("Contents read from " + srcFilePath)
+          .containsExactly(1, 2, 3);
+    }
+
+    newFS.delete(srcDir, true);
+
+    Assertions
+        .assertThat(newFS.exists(srcFilePath))
+        .describedAs("Existence of file: " + srcFilePath + " using new FS")
+        .isFalse();
+    Assertions
+        .assertThat(getFileSystem().exists(srcFilePath))
+        .describedAs("Existence of file: " + srcFilePath + " using original FS")
+        .isFalse();
+  }
+
   private final class RegionInterceptor implements ExecutionInterceptor {
     private final String endpoint;
     private final String region;
@@ -272,7 +431,7 @@ public class ITestS3AEndpointRegion extends AbstractS3ATestBase {
     public void beforeExecution(Context.BeforeExecution context,
         ExecutionAttributes executionAttributes)  {
 
-      if (endpoint != null) {
+      if (endpoint != null && !endpoint.endsWith(CENTRAL_ENDPOINT)) {
         Assertions.assertThat(
                 executionAttributes.getAttribute(AwsExecutionAttribute.ENDPOINT_OVERRIDDEN))
             .describedAs("Endpoint not overridden").isTrue();
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/test/PublicDatasetTestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/test/PublicDatasetTestUtils.java
index 669acd8b8bd5..7ef2449b8e83 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/test/PublicDatasetTestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/test/PublicDatasetTestUtils.java
@@ -53,6 +53,13 @@ public final class PublicDatasetTestUtils {
   private static final String DEFAULT_REQUESTER_PAYS_FILE
       = "s3a://usgs-landsat/collection02/catalog.json";
 
+  /**
+   * Default bucket name for the requester pays bucket.
+   * Value = {@value}.
+   */
+  public static final String DEFAULT_REQUESTER_PAYS_BUCKET_NAME =
+      "usgs-landsat";
+
   /**
    * Default bucket for an S3A file system with many objects: {@value}.
    *


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to