This is an automated email from the ASF dual-hosted git repository.
slfan1989 pushed a commit to branch branch-3.4.0
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.4.0 by this push:
new fd0d0c90d964 HADOOP-19044. S3A: AWS SDK V2 - Update region logic (#6479)
fd0d0c90d964 is described below
commit fd0d0c90d9646becdc4ca6e8098f97b89250f7ee
Author: Viraj Jasani <[email protected]>
AuthorDate: Sat Feb 3 05:44:23 2024 -0900
HADOOP-19044. S3A: AWS SDK V2 - Update region logic (#6479)
Improves region handling in the S3A connector, including enabling
cross-region support when that is considered necessary.
Consult the documentation in connecting.md/connecting.html for the
current resolution process.
Contributed by Viraj Jasani
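For context on the options this patch touches: `fs.s3a.endpoint` selects the
S3 endpoint and `fs.s3a.endpoint.region` sets the client's region. A minimal
configuration sketch for the simple case, assuming an AWS-hosted bucket in
eu-west-1 (the region value is illustrative only):

    <property>
      <name>fs.s3a.endpoint.region</name>
      <value>eu-west-1</value>
    </property>

With only the region declared, the client resolves the matching AWS endpoint
itself and no cross-region logic is needed.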
---
.../hadoop/fs/s3a/DefaultS3ClientFactory.java | 53 +++++--
.../tools/hadoop-aws/aws_sdk_v2_changelog.md | 4 +-
.../site/markdown/tools/hadoop-aws/connecting.md | 36 +++++
.../src/site/markdown/tools/hadoop-aws/index.md | 4 +-
.../hadoop/fs/s3a/ITestS3AEndpointRegion.java | 163 ++++++++++++++++++++-
.../hadoop/fs/s3a/test/PublicDatasetTestUtils.java | 7 +
6 files changed, 251 insertions(+), 16 deletions(-)
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
index 0a3267a9fe51..284ba8e6ae5c 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
@@ -267,9 +267,10 @@ public class DefaultS3ClientFactory extends Configured
*/
private <BuilderT extends S3BaseClientBuilder<BuilderT, ClientT>, ClientT>
void configureEndpointAndRegion(
BuilderT builder, S3ClientCreationParameters parameters, Configuration conf) {
- URI endpoint = getS3Endpoint(parameters.getEndpoint(), conf);
+ final String endpointStr = parameters.getEndpoint();
+ final URI endpoint = getS3Endpoint(endpointStr, conf);
- String configuredRegion = parameters.getRegion();
+ final String configuredRegion = parameters.getRegion();
Region region = null;
String origin = "";
@@ -291,15 +292,33 @@ public class DefaultS3ClientFactory extends Configured
if (endpoint != null) {
checkArgument(!fipsEnabled,
"%s : %s", ERROR_ENDPOINT_WITH_FIPS, endpoint);
- builder.endpointOverride(endpoint);
- // No region was configured, try to determine it from the endpoint.
+ boolean endpointEndsWithCentral =
+ endpointStr.endsWith(CENTRAL_ENDPOINT);
+
+ // No region was configured,
+ // determine the region from the endpoint.
if (region == null) {
- region = getS3RegionFromEndpoint(parameters.getEndpoint());
+ region = getS3RegionFromEndpoint(endpointStr,
+ endpointEndsWithCentral);
if (region != null) {
origin = "endpoint";
}
}
- LOG.debug("Setting endpoint to {}", endpoint);
+
+ // No need to override endpoint with "s3.amazonaws.com".
+ // Let the client take care of endpoint resolution. Overriding
+ // the endpoint with "s3.amazonaws.com" causes 400 Bad Request
+ // errors for non-existent buckets and objects.
+ // ref: https://github.com/aws/aws-sdk-java-v2/issues/4846
+ if (!endpointEndsWithCentral) {
+ builder.endpointOverride(endpoint);
+ LOG.debug("Setting endpoint to {}", endpoint);
+ } else {
+ builder.crossRegionAccessEnabled(true);
+ origin = "central endpoint with cross region access";
+ LOG.debug("Enabling cross region access for endpoint {}",
+ endpointStr);
+ }
}
if (region != null) {
@@ -354,20 +373,32 @@ public class DefaultS3ClientFactory extends Configured
/**
* Parses the endpoint to get the region.
- * If endpoint is the central one, use US_EAST_1.
+ * If endpoint is the central one, use US_EAST_2.
*
* @param endpoint the configure endpoint.
+ * @param endpointEndsWithCentral true if the endpoint is configured as central.
* @return the S3 region, null if unable to resolve from endpoint.
*/
- private static Region getS3RegionFromEndpoint(String endpoint) {
+ private static Region getS3RegionFromEndpoint(final String endpoint,
+ final boolean endpointEndsWithCentral) {
- if(!endpoint.endsWith(CENTRAL_ENDPOINT)) {
+ if (!endpointEndsWithCentral) {
LOG.debug("Endpoint {} is not the default; parsing", endpoint);
return AwsHostNameUtils.parseSigningRegion(endpoint,
S3_SERVICE_NAME).orElse(null);
}
- // endpoint is for US_EAST_1;
- return Region.US_EAST_1;
+ // Select default region here to enable cross-region access.
+ // If both "fs.s3a.endpoint" and "fs.s3a.endpoint.region" are empty,
+ // Spark sets "fs.s3a.endpoint" to "s3.amazonaws.com".
+ // This applies to Spark versions with the changes of SPARK-35878.
+ // ref:
+ // https://github.com/apache/spark/blob/v3.5.0/core/
+ // src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala#L528
+ // If we do not allow cross region access, Spark would not be able to
+ // access any bucket that is not present in the given region.
+ // Hence, we should use default region us-east-2 to allow cross-region
+ // access.
+ return Region.of(AWS_S3_DEFAULT_REGION);
}
}
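As a standalone illustration of the central-endpoint branch above, here is a
minimal sketch of the client configuration the factory now produces when
`s3.amazonaws.com` is the configured endpoint; the class name is hypothetical
and the region mirrors the patch's AWS_S3_DEFAULT_REGION (us-east-2):

    import software.amazon.awssdk.regions.Region;
    import software.amazon.awssdk.services.s3.S3Client;

    public class CrossRegionClientSketch {
      public static void main(String[] args) {
        // Note: no endpointOverride(). The SDK resolves the endpoint itself
        // and, with cross-region access enabled, follows 301 redirects to
        // the bucket's actual region, caching the result.
        try (S3Client s3 = S3Client.builder()
            .region(Region.US_EAST_2)
            .crossRegionAccessEnabled(true)
            .build()) {
          // Use the client as normal; requests to buckets in other regions
          // are retried against the correct regional endpoint.
        }
      }
    }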
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/aws_sdk_v2_changelog.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/aws_sdk_v2_changelog.md
index 162f15951f5c..de3808c54dcb 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/aws_sdk_v2_changelog.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/aws_sdk_v2_changelog.md
@@ -83,8 +83,8 @@ The table below lists the configurations S3A was using and what they now map to.
Previously, if no endpoint and region was configured, fall back to using us-east-1. Set
withForceGlobalBucketAccessEnabled(true) which will allow access to buckets not in this region too.
-Since the SDK V2 no longer supports cross region access, we need to set the region and endpoint of
-the bucket. The behaviour has now been changed to:
+Since the SDK V2 no longer supports cross region access, we need to set the region and
+endpoint of the bucket. The behaviour has now been changed to:
* If no endpoint is specified, use s3.amazonaws.com.
* When setting the endpoint, also set the protocol (HTTP or HTTPS)
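A hedged illustration of the protocol point above (the endpoint value is an
example, not a default): when overriding the endpoint, include the scheme so
the client knows whether to speak HTTP or HTTPS.

    <property>
      <name>fs.s3a.endpoint</name>
      <value>https://s3.eu-west-1.amazonaws.com</value>
    </property>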
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/connecting.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/connecting.md
index a31b1c3e39a0..51e70ef231bf 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/connecting.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/connecting.md
@@ -100,6 +100,42 @@ With the move to the AWS V2 SDK, there is more emphasis on the region, set by th
Normally, declaring the region in `fs.s3a.endpoint.region` should be
sufficient to set up the network connection to correctly connect to an
AWS-hosted S3 store.
+### <a name="s3_endpoint_region_details"></a> S3 endpoint and region settings in detail
+
+* Configs `fs.s3a.endpoint` and `fs.s3a.endpoint.region` are used to set values
+ for the S3 endpoint and region respectively.
+* If `fs.s3a.endpoint.region` is configured with a valid AWS region value, S3A will
+ configure the S3 client to use this value. If this is set to a region that does
+ not match your bucket, you will receive a 301 redirect response.
+* If `fs.s3a.endpoint.region` is not set and `fs.s3a.endpoint` is set to a valid
+ endpoint value, S3A will attempt to parse the region from the endpoint and
+ configure the S3 client to use that region value.
+* If neither `fs.s3a.endpoint` nor `fs.s3a.endpoint.region` is set, S3A will
+ use `us-east-2` as the default region and enable cross-region access. In this
+ case, S3A does not attempt to override the endpoint while configuring the S3
+ client.
+* If `fs.s3a.endpoint` is not set and `fs.s3a.endpoint.region` is set to an empty
+ string, S3A will configure the S3 client without any region or endpoint override.
+ This allows fallback to the S3 SDK region resolution chain. More details
+ [here](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/region-selection.html).
+* If `fs.s3a.endpoint` is set to the central endpoint `s3.amazonaws.com` and
+ `fs.s3a.endpoint.region` is not set, S3A will use `us-east-2` as the default
+ region and enable cross-region access. In this case, S3A does not attempt to
+ override the endpoint while configuring the S3 client.
+* If `fs.s3a.endpoint` is set to the central endpoint `s3.amazonaws.com` and
+ `fs.s3a.endpoint.region` is also set to some region, S3A will use that region
+ value and enable cross-region access. In this case, S3A does not attempt to
+ override the endpoint while configuring the S3 client.
+
+When cross-region access is enabled while configuring the S3 client, the S3 SDK
+determines the region even if the region that was set is incorrect. This is done
+by making the request and, if the SDK receives a 301 redirect response, determining
+the region at the cost of a HEAD request, and caching it.
+
+Please note that some endpoint and region settings that require cross-region access
+are complex and improving over time. Hence, they may be considered unstable.
+
+If you are working with third party stores, please check [third party stores in detail](third_party_stores.html).
+
### <a name="timeouts"></a> Network timeouts
See [Timeouts](performance.html#timeouts).
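As a worked example of one resolution case above (values illustrative): to
delegate region lookup entirely to the SDK's own resolution chain, leave
`fs.s3a.endpoint` unset and set `fs.s3a.endpoint.region` to an empty string.

    <property>
      <name>fs.s3a.endpoint.region</name>
      <value></value>
    </property>

The SDK then consults its usual chain (environment variables, system
properties, profile files, EC2/container metadata) to determine the region.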
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
index 0f09c7f87315..cdc0134e60a8 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
@@ -226,7 +226,9 @@ If you do any of these: change your credentials immediately!
## Connecting to Amazon S3 or a third-party store
-See [Connecting to an Amazon S3 Bucket through the S3A Connector](connecting.md).
+See [Connecting to an Amazon S3 Bucket through the S3A Connector](connecting.html).
+
+Also, please check [S3 endpoint and region settings in detail](connecting.html#s3_endpoint_region_details).
## <a name="authenticating"></a> Authenticating with S3
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEndpointRegion.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEndpointRegion.java
index 5e6991128b20..95f31d7527f8 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEndpointRegion.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEndpointRegion.java
@@ -38,13 +38,20 @@ import software.amazon.awssdk.services.s3.model.HeadBucketRequest;
import software.amazon.awssdk.services.s3.model.HeadBucketResponse;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext;
+import org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils;
+import static org.apache.hadoop.fs.s3a.Constants.ALLOW_REQUESTER_PAYS;
import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION;
import static org.apache.hadoop.fs.s3a.Constants.CENTRAL_ENDPOINT;
+import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
import static org.apache.hadoop.fs.s3a.Constants.PATH_STYLE_ACCESS;
import static org.apache.hadoop.fs.s3a.DefaultS3ClientFactory.ERROR_ENDPOINT_WITH_FIPS;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.DEFAULT_REQUESTER_PAYS_BUCKET_NAME;
import static org.apache.hadoop.io.IOUtils.closeStream;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
@@ -146,11 +153,28 @@ public class ITestS3AEndpointRegion extends AbstractS3ATestBase {
describe("Create a client with the central endpoint");
Configuration conf = getConfiguration();
- S3Client client = createS3Client(conf, CENTRAL_ENDPOINT, null, US_EAST_1, false);
+ S3Client client = createS3Client(conf, CENTRAL_ENDPOINT, null, US_EAST_2, false);
expectInterceptorException(client);
}
+ @Test
+ public void testCentralEndpointWithRegion() throws Throwable {
+ describe("Create a client with the central endpoint but also specify
region");
+ Configuration conf = getConfiguration();
+
+ S3Client client = createS3Client(conf, CENTRAL_ENDPOINT, US_WEST_2,
+ US_WEST_2, false);
+
+ expectInterceptorException(client);
+
+ client = createS3Client(conf, CENTRAL_ENDPOINT, US_EAST_1,
+ US_EAST_1, false);
+
+ expectInterceptorException(client);
+
+ }
+
@Test
public void testWithRegionConfig() throws Throwable {
describe("Create a client with a configured region");
@@ -257,6 +281,141 @@ public class ITestS3AEndpointRegion extends AbstractS3ATestBase {
expectInterceptorException(client);
}
+ @Test
+ public void testCentralEndpointAndDifferentRegionThanBucket() throws Throwable {
+ describe("Access public bucket using central endpoint and region "
+ + "different than that of the public bucket");
+ final Configuration conf = getConfiguration();
+ final Configuration newConf = new Configuration(conf);
+
+ removeBaseAndBucketOverrides(
+ newConf,
+ ENDPOINT,
+ AWS_REGION,
+ ALLOW_REQUESTER_PAYS,
+ KEY_REQUESTER_PAYS_FILE);
+
+ removeBaseAndBucketOverrides(
+ DEFAULT_REQUESTER_PAYS_BUCKET_NAME,
+ newConf,
+ ENDPOINT,
+ AWS_REGION,
+ ALLOW_REQUESTER_PAYS,
+ KEY_REQUESTER_PAYS_FILE);
+
+ newConf.set(ENDPOINT, CENTRAL_ENDPOINT);
+ newConf.set(AWS_REGION, EU_WEST_1);
+ newConf.setBoolean(ALLOW_REQUESTER_PAYS, true);
+
+ Path filePath = new Path(PublicDatasetTestUtils
+ .getRequesterPaysObject(newConf));
+ newFS = (S3AFileSystem) filePath.getFileSystem(newConf);
+
+ Assertions
+ .assertThat(newFS.exists(filePath))
+ .describedAs("Existence of path: " + filePath)
+ .isTrue();
+ }
+
+ @Test
+ public void testCentralEndpointAndSameRegionAsBucket() throws Throwable {
+ describe("Access public bucket using central endpoint and region "
+ + "same as that of the public bucket");
+ final Configuration conf = getConfiguration();
+ final Configuration newConf = new Configuration(conf);
+
+ removeBaseAndBucketOverrides(
+ newConf,
+ ENDPOINT,
+ AWS_REGION,
+ ALLOW_REQUESTER_PAYS,
+ KEY_REQUESTER_PAYS_FILE);
+
+ removeBaseAndBucketOverrides(
+ DEFAULT_REQUESTER_PAYS_BUCKET_NAME,
+ newConf,
+ ENDPOINT,
+ AWS_REGION,
+ ALLOW_REQUESTER_PAYS,
+ KEY_REQUESTER_PAYS_FILE);
+
+ newConf.set(ENDPOINT, CENTRAL_ENDPOINT);
+ newConf.set(AWS_REGION, US_WEST_2);
+ newConf.setBoolean(ALLOW_REQUESTER_PAYS, true);
+
+ Path filePath = new Path(PublicDatasetTestUtils
+ .getRequesterPaysObject(newConf));
+ newFS = (S3AFileSystem) filePath.getFileSystem(newConf);
+
+ Assertions
+ .assertThat(newFS.exists(filePath))
+ .describedAs("Existence of path: " + filePath)
+ .isTrue();
+ }
+
+ @Test
+ public void testCentralEndpointAndNullRegionWithCRUD() throws Throwable {
+ describe("Access the test bucket using central endpoint and"
+ + " null region, perform file system CRUD operations");
+ final Configuration conf = getConfiguration();
+
+ final Configuration newConf = new Configuration(conf);
+
+ removeBaseAndBucketOverrides(
+ newConf,
+ ENDPOINT,
+ AWS_REGION);
+
+ newConf.set(ENDPOINT, CENTRAL_ENDPOINT);
+
+ newFS = new S3AFileSystem();
+ newFS.initialize(getFileSystem().getUri(), newConf);
+
+ assertOpsUsingNewFs();
+ }
+
+ private void assertOpsUsingNewFs() throws IOException {
+ final String file = getMethodName();
+ final Path basePath = methodPath();
+ final Path srcDir = new Path(basePath, "srcdir");
+ newFS.mkdirs(srcDir);
+ Path srcFilePath = new Path(srcDir, file);
+
+ try (FSDataOutputStream out = newFS.create(srcFilePath)) {
+ out.write(new byte[] {1, 2, 3});
+ }
+
+ Assertions
+ .assertThat(newFS.exists(srcFilePath))
+ .describedAs("Existence of file: " + srcFilePath)
+ .isTrue();
+ Assertions
+ .assertThat(getFileSystem().exists(srcFilePath))
+ .describedAs("Existence of file: " + srcFilePath)
+ .isTrue();
+
+ byte[] buffer = new byte[3];
+
+ try (FSDataInputStream in = newFS.open(srcFilePath)) {
+ in.readFully(buffer);
+ Assertions
+ .assertThat(buffer)
+ .describedAs("Contents read from " + srcFilePath)
+ .containsExactly(1, 2, 3);
+ }
+
+ newFS.delete(srcDir, true);
+
+ Assertions
+ .assertThat(newFS.exists(srcFilePath))
+ .describedAs("Existence of file: " + srcFilePath + " using new FS")
+ .isFalse();
+ Assertions
+ .assertThat(getFileSystem().exists(srcFilePath))
+ .describedAs("Existence of file: " + srcFilePath + " using original
FS")
+ .isFalse();
+ }
+
private final class RegionInterceptor implements ExecutionInterceptor {
private final String endpoint;
private final String region;
@@ -272,7 +431,7 @@ public class ITestS3AEndpointRegion extends AbstractS3ATestBase {
public void beforeExecution(Context.BeforeExecution context,
ExecutionAttributes executionAttributes) {
- if (endpoint != null) {
+ if (endpoint != null && !endpoint.endsWith(CENTRAL_ENDPOINT)) {
Assertions.assertThat(
executionAttributes.getAttribute(AwsExecutionAttribute.ENDPOINT_OVERRIDDEN))
.describedAs("Endpoint not overridden").isTrue();
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/test/PublicDatasetTestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/test/PublicDatasetTestUtils.java
index 669acd8b8bd5..7ef2449b8e83 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/test/PublicDatasetTestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/test/PublicDatasetTestUtils.java
@@ -53,6 +53,13 @@ public final class PublicDatasetTestUtils {
private static final String DEFAULT_REQUESTER_PAYS_FILE
= "s3a://usgs-landsat/collection02/catalog.json";
+ /**
+ * Default bucket name for the requester pays bucket.
+ * Value = {@value}.
+ */
+ public static final String DEFAULT_REQUESTER_PAYS_BUCKET_NAME =
+ "usgs-landsat";
+
/**
* Default bucket for an S3A file system with many objects: {@value}.
*
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]