[
https://issues.apache.org/jira/browse/HADOOP-18708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17895766#comment-17895766
]
ASF GitHub Bot commented on HADOOP-18708:
-----------------------------------------
steveloughran commented on code in PR #6884:
URL: https://github.com/apache/hadoop/pull/6884#discussion_r1829845013
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/EncryptionS3ClientFactory.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.DefaultS3ClientFactory;
+import org.apache.hadoop.util.Preconditions;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.functional.LazyAtomicReference;
+
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.kms.KmsClient;
+import software.amazon.awssdk.services.kms.KmsClientBuilder;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.encryption.s3.S3AsyncEncryptionClient;
+import software.amazon.encryption.s3.S3EncryptionClient;
+import software.amazon.encryption.s3.materials.CryptographicMaterialsManager;
+import software.amazon.encryption.s3.materials.DefaultCryptoMaterialsManager;
+import software.amazon.encryption.s3.materials.Keyring;
+import software.amazon.encryption.s3.materials.KmsKeyring;
+
+import static
org.apache.hadoop.fs.s3a.impl.InstantiationIOException.unavailable;
+
+/**
+ * Factory class to create encrypted s3 client and encrypted async s3 client.
+ */
+public class EncryptionS3ClientFactory extends DefaultS3ClientFactory {
+
+ /**
+ * Encryption client class name.
+ * value: {@value}
+ */
+ private static final String ENCRYPTION_CLIENT_CLASSNAME =
+ "software.amazon.encryption.s3.S3EncryptionClient";
+
+ /**
+ * Encryption client availability.
+ */
+ private static final LazyAtomicReference<Boolean>
ENCRYPTION_CLIENT_AVAILABLE =
+ LazyAtomicReference.lazyAtomicReferenceFromSupplier(
+ EncryptionS3ClientFactory::checkForEncryptionClient
+ );
+
+
+ /**
+ * S3Client to be wrapped by encryption client.
+ */
+ private S3Client s3Client;
+
+ /**
+ * S3AsyncClient to be wrapped by encryption client.
+ */
+ private S3AsyncClient s3AsyncClient;
+
+ /**
+ * Checks if {@link #ENCRYPTION_CLIENT_CLASSNAME} is available in the class
path.
+ * @return true if available, false otherwise.
+ */
+ private static boolean checkForEncryptionClient() {
+ try {
+ ClassLoader cl = EncryptionS3ClientFactory.class.getClassLoader();
+ cl.loadClass(ENCRYPTION_CLIENT_CLASSNAME);
+ LOG.debug("encryption client class {} found",
ENCRYPTION_CLIENT_CLASSNAME);
+ return true;
+ } catch (Exception e) {
+ LOG.debug("encryption client class {} not found",
ENCRYPTION_CLIENT_CLASSNAME, e);
+ return false;
+ }
+ }
+
+ /**
+ * Is the Encryption client available?
+ * @return true if it was found in the classloader
+ */
+ private static synchronized boolean isEncryptionClientAvailable() {
+ return ENCRYPTION_CLIENT_AVAILABLE.get();
+ }
+
+ /**
+ * Creates both synchronous and asynchronous encrypted s3 clients.
+ * Synchronous client is wrapped by encryption client first and then
+ * Asynchronous client is wrapped by encryption client.
+ * @param uri S3A file system URI
+ * @param parameters parameter object
+ * @return encrypted s3 client
+ * @throws IOException IO failures
+ */
+ @Override
+ public S3Client createS3Client(URI uri, S3ClientCreationParameters
parameters)
+ throws IOException {
+ if (!isEncryptionClientAvailable()) {
+ throw unavailable(uri, ENCRYPTION_CLIENT_CLASSNAME, null,
+ "No encryption client available");
+ }
+
+ s3Client = super.createS3Client(uri, parameters);
+ s3AsyncClient = super.createS3AsyncClient(uri, parameters);
+
+ return createS3EncryptionClient(parameters);
+ }
+
+ /**
+ * Create async encrypted s3 client.
+ * @param uri S3A file system URI
+ * @param parameters parameter object
+ * @return async encrypted s3 client
+ * @throws IOException IO failures
+ */
+ @Override
+ public S3AsyncClient createS3AsyncClient(URI uri, S3ClientCreationParameters
parameters)
+ throws IOException {
+ if (!isEncryptionClientAvailable()) {
+ throw unavailable(uri, ENCRYPTION_CLIENT_CLASSNAME, null,
+ "No encryption client available");
+ }
+ return createS3AsyncEncryptionClient(parameters);
+ }
+
+ /**
+ * Creates an S3EncryptionClient instance based on the provided parameters.
+ *
+ * @param parameters The S3ClientCreationParameters containing the necessary
configuration.
+ * @return An instance of S3EncryptionClient.
+ */
+ private S3Client createS3EncryptionClient(S3ClientCreationParameters
parameters) {
+ CSEMaterials cseMaterials = parameters.getClientSideEncryptionMaterials();
+ Preconditions.checkArgument(s3AsyncClient != null,
+ "S3 async client not initialized");
+ Preconditions.checkArgument(s3Client != null,
+ "S3 client not initialized");
+ Preconditions.checkArgument(parameters != null,
+ "S3ClientCreationParameters is not initialized");
+
+ S3EncryptionClient.Builder s3EncryptionClientBuilder =
+ S3EncryptionClient.builder()
+ .wrappedAsyncClient(s3AsyncClient)
+ .wrappedClient(s3Client)
+ // this is required for doing S3 ranged GET calls
+ .enableLegacyUnauthenticatedModes(true)
+ // this is required for backward compatibility with older
encryption clients
+ .enableLegacyWrappingAlgorithms(true);
+
+ switch (cseMaterials.getCseKeyType()) {
+ case KMS:
+ Keyring kmsKeyring = createKmsKeyring(parameters, cseMaterials);
+ CryptographicMaterialsManager kmsCryptoMaterialsManager =
+ DefaultCryptoMaterialsManager.builder()
+ .keyring(kmsKeyring)
+ .build();
+
s3EncryptionClientBuilder.cryptoMaterialsManager(kmsCryptoMaterialsManager);
+ break;
+ case CUSTOM:
+ Keyring keyring =
getKeyringProvider(cseMaterials.getCustomKeyringClassName(),
+ cseMaterials.getConf());
+ CryptographicMaterialsManager customCryptoMaterialsManager =
+ DefaultCryptoMaterialsManager.builder()
+ .keyring(keyring)
+ .build();
+
s3EncryptionClientBuilder.cryptoMaterialsManager(customCryptoMaterialsManager);
+ break;
+ default:
+ break;
+ }
+ return s3EncryptionClientBuilder.build();
+ }
+
+ /**
+ * Creates KmsKeyring instance based on the provided
S3ClientCreationParameters and CSEMaterials.
+ *
+ * @param parameters The S3ClientCreationParameters containing the necessary
configuration.
+ * @param cseMaterials The CSEMaterials containing the KMS key ID and other
encryption materials.
+ * @return A KmsKeyring instance configured with the appropriate KMS client
and wrapping key ID.
+ */
+ private Keyring createKmsKeyring(S3ClientCreationParameters parameters,
+ CSEMaterials cseMaterials) {
+ KmsClientBuilder kmsClientBuilder = KmsClient.builder();
+ if (parameters.getCredentialSet() != null) {
+ kmsClientBuilder.credentialsProvider(parameters.getCredentialSet());
+ }
+ // check if kms region is configured.
+ if (parameters.getKmsRegion() != null) {
+ kmsClientBuilder.region(Region.of(parameters.getKmsRegion()));
+ } else if (parameters.getRegion() != null) {
+ // fallback to s3 region if kms region is not configured.
+ kmsClientBuilder.region(Region.of(parameters.getRegion()));
+ } else if (parameters.getEndpoint() != null) {
+ // fallback to s3 endpoint config if both kms region and s3 region is
not set.
+ String endpointStr = parameters.getEndpoint();
+ URI endpoint = getS3Endpoint(endpointStr, cseMaterials.getConf());
+ kmsClientBuilder.endpointOverride(endpoint);
+ }
+ return KmsKeyring.builder()
+ .kmsClient(kmsClientBuilder.build())
+ .wrappingKeyId(cseMaterials.getKmsKeyId())
+ .build();
+ }
+
+ /**
+ * Creates an S3AsyncEncryptionClient instance based on the provided
parameters.
+ *
+ * @param parameters The S3ClientCreationParameters containing the necessary
configuration.
+ * @return An instance of S3AsyncEncryptionClient.
+ */
+ private S3AsyncClient
createS3AsyncEncryptionClient(S3ClientCreationParameters parameters) {
+ Preconditions.checkArgument(s3AsyncClient != null,
+ "S3 async client not initialized");
+ Preconditions.checkArgument(parameters != null,
+ "S3ClientCreationParameters is not initialized");
+
+ S3AsyncEncryptionClient.Builder s3EncryptionAsyncClientBuilder =
+ S3AsyncEncryptionClient.builder()
+ .wrappedClient(s3AsyncClient)
+ // this is required for doing S3 ranged GET calls
+ .enableLegacyUnauthenticatedModes(true)
+ // this is required for backward compatibility with older
encryption clients
+ .enableLegacyWrappingAlgorithms(true);
+
+ CSEMaterials cseMaterials = parameters.getClientSideEncryptionMaterials();
+ switch (cseMaterials.getCseKeyType()) {
+ case KMS:
+ Keyring kmsKeyring = createKmsKeyring(parameters, cseMaterials);
+ CryptographicMaterialsManager kmsCryptoMaterialsManager =
+ DefaultCryptoMaterialsManager.builder()
+ .keyring(kmsKeyring)
+ .build();
+
s3EncryptionAsyncClientBuilder.cryptoMaterialsManager(kmsCryptoMaterialsManager);
+ break;
+ case CUSTOM:
+ Keyring keyring =
getKeyringProvider(cseMaterials.getCustomKeyringClassName(),
+ cseMaterials.getConf());
+ CryptographicMaterialsManager customCryptoMaterialsManager =
+ DefaultCryptoMaterialsManager.builder()
+ .keyring(keyring)
+ .build();
+
s3EncryptionAsyncClientBuilder.cryptoMaterialsManager(customCryptoMaterialsManager);
+ break;
+ default:
+ break;
+ }
+ return s3EncryptionAsyncClientBuilder.build();
+ }
+
+
+ /**
+ * Retrieves an instance of the Keyring provider based on the provided class
name.
+ *
+ * @param className The fully qualified class name of the Keyring provider
implementation.
+ * @param conf The Configuration object containing the necessary
configuration properties.
+ * @return An instance of the Keyring provider.
+ */
Review Comment:
mark that this will throw RuntimeException if the provider class cannot be
found or is of the wrong class.
We also need to make sure that all calling code handles this.
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java:
##########
@@ -266,6 +271,150 @@ public void testEncryptionEnabledAndDisabledFS() throws
Exception {
}
}
+ /**
+ * Test to check if unencrypted objects are read with V1 client
compatibility.
+ * @throws IOException
Review Comment:
cut these @throws lines; no need to explain how tests fail.
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java:
##########
@@ -540,6 +555,102 @@ public Map.Entry<Duration, DeleteObjectsResponse>
deleteObjects(
}
}
+ /**
+ * Performs a HEAD request on an S3 object to retrieve its metadata.
+ *
+ * @param key The S3 object key to perform the HEAD operation on
+ * @param changeTracker Tracks changes to the object's metadata across
operations
+ * @param changeInvoker The invoker responsible for executing the HEAD
request with retries
+ * @param fsHandler Handler for filesystem-level operations and
configurations
+ * @param operation Description of the operation being performed for
tracking purposes
+ * @return HeadObjectResponse containing the object's metadata
+ * @throws IOException If the HEAD request fails, object doesn't exist, or
other I/O errors occur
+ */
+ @Override
+ @Retries.RetryRaw
+ public HeadObjectResponse headObject(String key,
+ ChangeTracker changeTracker,
+ Invoker changeInvoker,
+ S3AFileSystemHandler fsHandler,
+ String operation) throws IOException {
+ HeadObjectResponse response = getStoreContext().getInvoker()
+ .retryUntranslated("GET " + key, true,
Review Comment:
change to HEAD
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java:
##########
@@ -638,4 +646,15 @@ private static void skipIfCrossRegionClient(
skip("Skipping test as cross region client is in use ");
}
}
+
+ /**
+ * Unset encryption options.
+ * This is needed to avoid encryption tests interfering with non-encryption
+ * tests.
+ * @param conf Configuations
+ */
+ private static void unsetEncryption(Configuration conf) {
Review Comment:
would it make sense to move this to S3ATestUtils and use elsewhere,
unsetting *all* encryption options? Then use in
ITestS3AClientSideEncryptionCustom
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryptionCustom.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.impl.AWSHeaders;
+import org.apache.hadoop.fs.s3a.impl.HeaderProcessing;
+
+import static
org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_CSE_CUSTOM_KEYRING_CLASS_NAME;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionNotSet;
+import static
org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionTestsDisabled;
+
+/**
+ * Tests to verify Custom S3 client side encryption CSE-CUSTOM.
+ */
+public class ITestS3AClientSideEncryptionCustom extends
ITestS3AClientSideEncryption {
+
+ private static final String KMS_KEY_WRAP_ALGO = "kms+context";
+ /**
+ * Creating custom configs for CSE-CUSTOM testing.
+ *
+ * @return Configuration.
+ */
+ @Override
+ protected Configuration createConfiguration() {
+ Configuration conf = super.createConfiguration();
+ S3ATestUtils.disableFilesystemCaching(conf);
+ conf.set(S3_ENCRYPTION_CSE_CUSTOM_KEYRING_CLASS_NAME,
+ CustomKeyring.class.getCanonicalName());
+ return conf;
+ }
+
+ @Override
+ protected void maybeSkipTest() throws IOException {
+ skipIfEncryptionTestsDisabled(getConfiguration());
+ // skip the test if CSE-CUSTOM is not set.
+ skipIfEncryptionNotSet(getConfiguration(),
S3AEncryptionMethods.CSE_CUSTOM);
+ }
+
+
+ @Override
+ protected void assertEncrypted(Path path) throws IOException {
+ Map<String, byte[]> fsXAttrs = getFileSystem().getXAttrs(path);
+ String xAttrPrefix = "header.";
+
+ // Assert KeyWrap Algo
+ assertEquals("Key wrap algo isn't same as expected", KMS_KEY_WRAP_ALGO,
Review Comment:
use AssertJ `assertThat()` with a `describedAs()` message instead of `assertEquals` here.
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AFileSystemHandler.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.IOException;
+
+import software.amazon.awssdk.core.ResponseInputStream;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.Listing;
+import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;
+import org.apache.hadoop.fs.s3a.S3AStore;
+import org.apache.hadoop.fs.s3a.S3ClientFactory;
+import org.apache.hadoop.fs.s3a.api.RequestFactory;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
+
+/**
+ * An interface that defines the contract for handling certain filesystem
operations.
+ */
+public interface S3AFileSystemHandler {
Review Comment:
yes
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/EncryptionS3ClientFactory.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
Review Comment:
move this `org.apache.hadoop` import block below the `software.amazon` one.
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestConnectionTimeouts.java:
##########
@@ -156,7 +159,12 @@ public void testGeneratePoolTimeouts() throws Throwable {
ContractTestUtils.createFile(fs, path, true, DATASET);
final FileStatus st = fs.getFileStatus(path);
try (FileSystem brittleFS = FileSystem.newInstance(fs.getUri(), conf)) {
- intercept(ConnectTimeoutException.class, () -> {
+ Class exceptionClass = ConnectTimeoutException.class;
Review Comment:
won't be needed once the translation is fixed.
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CSEV1CompatibleS3AFileSystemHandler.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
Review Comment:
nit, put block below the "other" package
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java:
##########
@@ -200,9 +203,14 @@ public void
testSingleObjectDeleteNoPermissionsTranslated() throws Throwable {
Path path = requireDefaultExternalData(getConfiguration());
S3AFileSystem fs = (S3AFileSystem) path.getFileSystem(
getConfiguration());
- AccessDeniedException aex = intercept(AccessDeniedException.class,
+ Class exceptionClass = AccessDeniedException.class;
+ if (CSEUtils.isCSEEnabled(getEncryptionAlgorithm(
+ getTestBucketName(getConfiguration()),
getConfiguration()).getMethod())) {
+ exceptionClass = AWSClientIOException.class;
Review Comment:
can you show me the full stack? 403 normally maps to AccessDeniedException,
and it'd be good to keep the same. AWSClientIOException is just our "something
failed" if there's nothing else
1. Add `maybeTranslateEncryptionClientException()` in `ErrorTranslation` to
look at the exception, if the classname string value matches
"software.amazon.encryption.s3.S3EncryptionClientException" then map to
AccessDeniedException
2. call that from `S3AUtils.translateException` just before the ` // no custom
handling.` bit
It may seem bad, but look at `maybeExtractChannelException()` and other bits
to see worse.
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java:
##########
@@ -266,6 +271,150 @@ public void testEncryptionEnabledAndDisabledFS() throws
Exception {
}
}
+ /**
+ * Test to check if unencrypted objects are read with V1 client
compatibility.
+ * @throws IOException
+ * @throws Exception
+ */
+ @Test
+ public void testUnencryptedObjectReadWithV1CompatibilityConfig() throws
Exception {
+ maybeSkipTest();
+ // initialize base s3 client.
+ Configuration conf = new Configuration(getConfiguration());
+ removeBaseAndBucketOverrides(getTestBucketName(conf),
+ conf,
+ S3_ENCRYPTION_ALGORITHM,
+ S3_ENCRYPTION_KEY,
+ SERVER_SIDE_ENCRYPTION_ALGORITHM,
+ SERVER_SIDE_ENCRYPTION_KEY);
+
+ Path file = methodPath();
+
+ try (S3AFileSystem nonCseFs = createTestFileSystem(conf)) {
+ nonCseFs.initialize(getFileSystem().getUri(), conf);
+
+ // write unencrypted file
+ ContractTestUtils.writeDataset(nonCseFs, file, new byte[SMALL_FILE_SIZE],
+ SMALL_FILE_SIZE, SMALL_FILE_SIZE, true);
+ }
+
+ Configuration cseConf = new Configuration(getConfiguration());
+ cseConf.setBoolean(S3_ENCRYPTION_CSE_V1_COMPATIBILITY_ENABLED, true);
+
+ // create filesystem with cse enabled and v1 compatibility.
+ try (S3AFileSystem cseFs = createTestFileSystem(cseConf)) {
+ cseFs.initialize(getFileSystem().getUri(), cseConf);
+
+ // read unencrypted file. It should not throw any exception.
+ try (FSDataInputStream in = cseFs.open(file)) {
+ in.read(new byte[SMALL_FILE_SIZE]);
+ }
+ }
+ }
+
+ /**
+ * Tests the size of an encrypted object when with V1 compatibility and
custom header length.
+ *
+ * @throws Exception If any error occurs during the test execution.
+ */
+ @Test
+ public void testSizeOfEncryptedObjectFromHeaderWithV1Compatibility() throws
Exception {
+ maybeSkipTest();
+ Configuration cseConf = new Configuration(getConfiguration());
+ cseConf.setBoolean(S3_ENCRYPTION_CSE_V1_COMPATIBILITY_ENABLED, true);
+ try (S3AFileSystem fs = createTestFileSystem(cseConf)) {
+ fs.initialize(getFileSystem().getUri(), cseConf);
+
+ Path filePath = methodPath();
+ Path file = new Path(filePath, "file");
+ String key = fs.pathToKey(file);
+
+ // write object with random content length header
+ Map<String, String> metadata = new HashMap<>();
+ metadata.put(AWSHeaders.UNENCRYPTED_CONTENT_LENGTH, "10");
+ try (AuditSpan span = span()) {
+ RequestFactory factory = RequestFactoryImpl.builder()
+ .withBucket(fs.getBucket())
+ .build();
+ PutObjectRequest.Builder putObjectRequestBuilder =
+ factory.newPutObjectRequestBuilder(key,
+ null, SMALL_FILE_SIZE, false);
+
putObjectRequestBuilder.contentLength(Long.parseLong(String.valueOf(SMALL_FILE_SIZE)));
+ putObjectRequestBuilder.metadata(metadata);
+ fs.putObjectDirect(putObjectRequestBuilder.build(),
+ PutObjectOptions.deletingDirs(),
+ new S3ADataBlocks.BlockUploadData(new byte[SMALL_FILE_SIZE], null),
+ null);
+
+ // fetch the random content length
+ long contentLength = fs.getFileStatus(file).getLen();
+ assertEquals("content length does not match", 10, contentLength);
Review Comment:
prefer AssertJ assertThat() here, and include the whole status, and put it
in its own method or use below
```
void assertFileLength(fs, path, expected)
st = fs.getFileStatus(path)
Assertions.assertThat(fs.getLen())
.describedAs("Length of %s status: %s", path, st)
.isEqualTo(expected)
```
then call here and below as needed
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryptionCustom.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.impl.AWSHeaders;
+import org.apache.hadoop.fs.s3a.impl.HeaderProcessing;
+
+import static
org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_CSE_CUSTOM_KEYRING_CLASS_NAME;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionNotSet;
+import static
org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionTestsDisabled;
+
+/**
+ * Tests to verify Custom S3 client side encryption CSE-CUSTOM.
+ */
+public class ITestS3AClientSideEncryptionCustom extends
ITestS3AClientSideEncryption {
+
+ private static final String KMS_KEY_WRAP_ALGO = "kms+context";
+ /**
+ * Creating custom configs for CSE-CUSTOM testing.
+ *
+ * @return Configuration.
+ */
+ @Override
+ protected Configuration createConfiguration() {
+ Configuration conf = super.createConfiguration();
+ S3ATestUtils.disableFilesystemCaching(conf);
+ conf.set(S3_ENCRYPTION_CSE_CUSTOM_KEYRING_CLASS_NAME,
Review Comment:
remove any bucket overrides for this option.
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/EncryptionS3ClientFactory.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.DefaultS3ClientFactory;
+import org.apache.hadoop.util.Preconditions;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.functional.LazyAtomicReference;
+
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.kms.KmsClient;
+import software.amazon.awssdk.services.kms.KmsClientBuilder;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.encryption.s3.S3AsyncEncryptionClient;
+import software.amazon.encryption.s3.S3EncryptionClient;
+import software.amazon.encryption.s3.materials.CryptographicMaterialsManager;
+import software.amazon.encryption.s3.materials.DefaultCryptoMaterialsManager;
+import software.amazon.encryption.s3.materials.Keyring;
+import software.amazon.encryption.s3.materials.KmsKeyring;
+
+import static
org.apache.hadoop.fs.s3a.impl.InstantiationIOException.unavailable;
+
+/**
+ * Factory class to create encrypted s3 client and encrypted async s3 client.
+ */
+public class EncryptionS3ClientFactory extends DefaultS3ClientFactory {
+
+ /**
+ * Encryption client class name.
+ * value: {@value}
+ */
+ private static final String ENCRYPTION_CLIENT_CLASSNAME =
+ "software.amazon.encryption.s3.S3EncryptionClient";
+
+ /**
+ * Encryption client availability.
+ */
+ private static final LazyAtomicReference<Boolean>
ENCRYPTION_CLIENT_AVAILABLE =
+ LazyAtomicReference.lazyAtomicReferenceFromSupplier(
+ EncryptionS3ClientFactory::checkForEncryptionClient
+ );
+
+
+ /**
+ * S3Client to be wrapped by encryption client.
+ */
+ private S3Client s3Client;
+
+ /**
+ * S3AsyncClient to be wrapped by encryption client.
+ */
+ private S3AsyncClient s3AsyncClient;
+
+ /**
+ * Checks if {@link #ENCRYPTION_CLIENT_CLASSNAME} is available in the class
path.
+ * @return true if available, false otherwise.
+ */
+ private static boolean checkForEncryptionClient() {
+ try {
+ ClassLoader cl = EncryptionS3ClientFactory.class.getClassLoader();
+ cl.loadClass(ENCRYPTION_CLIENT_CLASSNAME);
+ LOG.debug("encryption client class {} found",
ENCRYPTION_CLIENT_CLASSNAME);
+ return true;
+ } catch (Exception e) {
+ LOG.debug("encryption client class {} not found",
ENCRYPTION_CLIENT_CLASSNAME, e);
+ return false;
+ }
+ }
+
+ /**
+ * Is the Encryption client available?
+ * @return true if it was found in the classloader
+ */
+ private static synchronized boolean isEncryptionClientAvailable() {
+ return ENCRYPTION_CLIENT_AVAILABLE.get();
+ }
+
+ /**
+ * Creates both synchronous and asynchronous encrypted s3 clients.
+ * Synchronous client is wrapped by encryption client first and then
+ * Asynchronous client is wrapped by encryption client.
+ * @param uri S3A file system URI
+ * @param parameters parameter object
+ * @return encrypted s3 client
+ * @throws IOException IO failures
+ */
+ @Override
+ public S3Client createS3Client(URI uri, S3ClientCreationParameters
parameters)
+ throws IOException {
+ if (!isEncryptionClientAvailable()) {
+ throw unavailable(uri, ENCRYPTION_CLIENT_CLASSNAME, null,
+ "No encryption client available");
+ }
+
+ s3Client = super.createS3Client(uri, parameters);
+ s3AsyncClient = super.createS3AsyncClient(uri, parameters);
+
+ return createS3EncryptionClient(parameters);
+ }
+
+ /**
+ * Create async encrypted s3 client.
+ * @param uri S3A file system URI
+ * @param parameters parameter object
+ * @return async encrypted s3 client
+ * @throws IOException IO failures
+ */
+ @Override
+ public S3AsyncClient createS3AsyncClient(URI uri, S3ClientCreationParameters
parameters)
+ throws IOException {
+ if (!isEncryptionClientAvailable()) {
+ throw unavailable(uri, ENCRYPTION_CLIENT_CLASSNAME, null,
+ "No encryption client available");
+ }
+ return createS3AsyncEncryptionClient(parameters);
+ }
+
+ /**
+ * Creates an S3EncryptionClient instance based on the provided parameters.
+ *
+ * @param parameters The S3ClientCreationParameters containing the necessary
configuration.
+ * @return An instance of S3EncryptionClient.
+ */
+ private S3Client createS3EncryptionClient(S3ClientCreationParameters
parameters) {
+ CSEMaterials cseMaterials = parameters.getClientSideEncryptionMaterials();
+ Preconditions.checkArgument(s3AsyncClient != null,
+ "S3 async client not initialized");
+ Preconditions.checkArgument(s3Client != null,
+ "S3 client not initialized");
+ Preconditions.checkArgument(parameters != null,
+ "S3ClientCreationParameters is not initialized");
+
+ S3EncryptionClient.Builder s3EncryptionClientBuilder =
+ S3EncryptionClient.builder()
+ .wrappedAsyncClient(s3AsyncClient)
+ .wrappedClient(s3Client)
+ // this is required for doing S3 ranged GET calls
+ .enableLegacyUnauthenticatedModes(true)
+ // this is required for backward compatibility with older
encryption clients
+ .enableLegacyWrappingAlgorithms(true);
+
+ switch (cseMaterials.getCseKeyType()) {
+ case KMS:
+ Keyring kmsKeyring = createKmsKeyring(parameters, cseMaterials);
+ CryptographicMaterialsManager kmsCryptoMaterialsManager =
+ DefaultCryptoMaterialsManager.builder()
+ .keyring(kmsKeyring)
+ .build();
+
s3EncryptionClientBuilder.cryptoMaterialsManager(kmsCryptoMaterialsManager);
+ break;
+ case CUSTOM:
+ Keyring keyring =
getKeyringProvider(cseMaterials.getCustomKeyringClassName(),
+ cseMaterials.getConf());
+ CryptographicMaterialsManager customCryptoMaterialsManager =
+ DefaultCryptoMaterialsManager.builder()
+ .keyring(keyring)
+ .build();
+
s3EncryptionClientBuilder.cryptoMaterialsManager(customCryptoMaterialsManager);
+ break;
+ default:
+ break;
Review Comment:
if there is no match, log at debug?
is there any actual problem from no match?
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CSEV1CompatibleS3AFileSystemHandler.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.Listing;
+import org.apache.hadoop.fs.s3a.S3AStore;
+import org.apache.hadoop.fs.s3a.S3ClientFactory;
+import org.apache.hadoop.fs.s3a.api.RequestFactory;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import software.amazon.awssdk.core.ResponseInputStream;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
+
+import static
org.apache.hadoop.fs.s3a.Constants.DEFAULT_S3_CLIENT_FACTORY_IMPL;
+import static org.apache.hadoop.fs.s3a.Constants.S3_CLIENT_FACTORY_IMPL;
+import static org.apache.hadoop.fs.s3a.impl.CSEUtils.isObjectEncrypted;
+
+/**
+ * An extension of the {@link CSES3AFileSystemHandler} class.
+ * This handles certain file system operations when client-side encryption is
enabled with v1 client
+ * compatibility.
+ * {@link
org.apache.hadoop.fs.s3a.Constants#S3_ENCRYPTION_CSE_V1_COMPATIBILITY_ENABLED}.
+ */
+public class CSEV1CompatibleS3AFileSystemHandler extends
CSES3AFileSystemHandler {
+
+ /**
+ * Constructs a new instance of {@code CSEV1CompatibleS3AFileSystemHandler}.
+ */
+ public CSEV1CompatibleS3AFileSystemHandler() {
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * <p>This implementation returns a {@link
Listing.AcceptAllButS3nDirsAndCSEInstructionFile}
+ * or {@link Listing.AcceptAllButSelfAndS3nDirsAndCSEInstructionFile} object
+ * based on the value of the {@code includeSelf} parameter.
+ */
+ @Override
+ public Listing.FileStatusAcceptor getFileStatusAcceptor(Path path, boolean
includeSelf) {
+ return includeSelf
+ ? new Listing.AcceptAllButS3nDirsAndCSEInstructionFile()
+ : new Listing.AcceptAllButSelfAndS3nDirsAndCSEInstructionFile(path);
+ }
+
+ /**
+ * Returns a {@link Listing.FileStatusAcceptor} object.
+ * That determines which files and directories should be included in a
listing operation.
+ *
+ * @param path the path for which the listing is being performed
+ * @return a {@link Listing.FileStatusAcceptor} object
+ */
+ @Override
+ public Listing.FileStatusAcceptor getFileStatusAcceptor(Path path) {
+ return new Listing.AcceptFilesOnlyExceptCSEInstructionFile(path);
+ }
+
+ /**
+ * Retrieves an object from the S3.
+ * If the S3 object is encrypted, it uses the encrypted S3 client to
retrieve the object else
+ * it uses the unencrypted S3 client.
+ *
+ * @param store The S3AStore object representing the S3 bucket.
+ * @param request The GetObjectRequest containing the details of the object
to retrieve.
+ * @param factory The RequestFactory used to create the GetObjectRequest.
+ * @return A ResponseInputStream containing the GetObjectResponse.
+ * @throws IOException If an error occurs while retrieving the object.
+ */
+ @Override
+ public ResponseInputStream<GetObjectResponse> getObject(S3AStore store,
GetObjectRequest request,
+ RequestFactory factory) throws IOException {
+ boolean isEncrypted = isObjectEncrypted(store.getOrCreateS3Client(),
factory, request.key());
+ return isEncrypted ? store.getOrCreateS3Client().getObject(request)
Review Comment:
aah.
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java:
##########
@@ -160,11 +161,17 @@ public S3AsyncClient createS3AsyncClient(
.thresholdInBytes(parameters.getMultiPartThreshold())
.build();
- return configureClientBuilder(S3AsyncClient.builder(), parameters, conf,
bucket)
- .httpClientBuilder(httpClientBuilder)
- .multipartConfiguration(multipartConfiguration)
- .multipartEnabled(parameters.isMultipartCopy())
- .build();
+ S3AsyncClientBuilder s3AsyncClientBuilder =
+ configureClientBuilder(S3AsyncClient.builder(), parameters, conf,
bucket)
+ .httpClientBuilder(httpClientBuilder);
+
+ // TODO: Enable multi part upload with cse once it is available.
Review Comment:
Create a followup JIRA and reference it here "multipart upload pending with
HADOOP-xyz"
> AWS SDK V2 - Implement CSE
> --------------------------
>
> Key: HADOOP-18708
> URL: https://issues.apache.org/jira/browse/HADOOP-18708
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/s3
> Affects Versions: 3.4.0
> Reporter: Ahmar Suhail
> Assignee: Syed Shameerur Rahman
> Priority: Major
> Labels: pull-request-available
>
> S3 Encryption client for SDK V2 is now available, so add client side
> encryption back in.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]