steveloughran commented on code in PR #6884: URL: https://github.com/apache/hadoop/pull/6884#discussion_r1829845013
########## hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/EncryptionS3ClientFactory.java: ########## @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.impl; + +import java.io.IOException; +import java.net.URI; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.s3a.DefaultS3ClientFactory; +import org.apache.hadoop.util.Preconditions; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.functional.LazyAtomicReference; + +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.kms.KmsClient; +import software.amazon.awssdk.services.kms.KmsClientBuilder; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.encryption.s3.S3AsyncEncryptionClient; +import software.amazon.encryption.s3.S3EncryptionClient; +import software.amazon.encryption.s3.materials.CryptographicMaterialsManager; +import software.amazon.encryption.s3.materials.DefaultCryptoMaterialsManager; +import software.amazon.encryption.s3.materials.Keyring; +import software.amazon.encryption.s3.materials.KmsKeyring; + +import static org.apache.hadoop.fs.s3a.impl.InstantiationIOException.unavailable; + +/** + * Factory class to create encrypted s3 client and encrypted async s3 client. + */ +public class EncryptionS3ClientFactory extends DefaultS3ClientFactory { + + /** + * Encryption client class name. + * value: {@value} + */ + private static final String ENCRYPTION_CLIENT_CLASSNAME = + "software.amazon.encryption.s3.S3EncryptionClient"; + + /** + * Encryption client availability. + */ + private static final LazyAtomicReference<Boolean> ENCRYPTION_CLIENT_AVAILABLE = + LazyAtomicReference.lazyAtomicReferenceFromSupplier( + EncryptionS3ClientFactory::checkForEncryptionClient + ); + + + /** + * S3Client to be wrapped by encryption client. + */ + private S3Client s3Client; + + /** + * S3AsyncClient to be wrapped by encryption client. + */ + private S3AsyncClient s3AsyncClient; + + /** + * Checks if {@link #ENCRYPTION_CLIENT_CLASSNAME} is available in the class path. + * @return true if available, false otherwise. + */ + private static boolean checkForEncryptionClient() { + try { + ClassLoader cl = EncryptionS3ClientFactory.class.getClassLoader(); + cl.loadClass(ENCRYPTION_CLIENT_CLASSNAME); + LOG.debug("encryption client class {} found", ENCRYPTION_CLIENT_CLASSNAME); + return true; + } catch (Exception e) { + LOG.debug("encryption client class {} not found", ENCRYPTION_CLIENT_CLASSNAME, e); + return false; + } + } + + /** + * Is the Encryption client available? 
+ * @return true if it was found in the classloader + */ + private static synchronized boolean isEncryptionClientAvailable() { + return ENCRYPTION_CLIENT_AVAILABLE.get(); + } + + /** + * Creates both synchronous and asynchronous encrypted s3 clients. + * Synchronous client is wrapped by encryption client first and then + * Asynchronous client is wrapped by encryption client. + * @param uri S3A file system URI + * @param parameters parameter object + * @return encrypted s3 client + * @throws IOException IO failures + */ + @Override + public S3Client createS3Client(URI uri, S3ClientCreationParameters parameters) + throws IOException { + if (!isEncryptionClientAvailable()) { + throw unavailable(uri, ENCRYPTION_CLIENT_CLASSNAME, null, + "No encryption client available"); + } + + s3Client = super.createS3Client(uri, parameters); + s3AsyncClient = super.createS3AsyncClient(uri, parameters); + + return createS3EncryptionClient(parameters); + } + + /** + * Create async encrypted s3 client. + * @param uri S3A file system URI + * @param parameters parameter object + * @return async encrypted s3 client + * @throws IOException IO failures + */ + @Override + public S3AsyncClient createS3AsyncClient(URI uri, S3ClientCreationParameters parameters) + throws IOException { + if (!isEncryptionClientAvailable()) { + throw unavailable(uri, ENCRYPTION_CLIENT_CLASSNAME, null, + "No encryption client available"); + } + return createS3AsyncEncryptionClient(parameters); + } + + /** + * Creates an S3EncryptionClient instance based on the provided parameters. + * + * @param parameters The S3ClientCreationParameters containing the necessary configuration. + * @return An instance of S3EncryptionClient. + */ + private S3Client createS3EncryptionClient(S3ClientCreationParameters parameters) { + CSEMaterials cseMaterials = parameters.getClientSideEncryptionMaterials(); + Preconditions.checkArgument(s3AsyncClient != null, + "S3 async client not initialized"); + Preconditions.checkArgument(s3Client != null, + "S3 client not initialized"); + Preconditions.checkArgument(parameters != null, + "S3ClientCreationParameters is not initialized"); + + S3EncryptionClient.Builder s3EncryptionClientBuilder = + S3EncryptionClient.builder() + .wrappedAsyncClient(s3AsyncClient) + .wrappedClient(s3Client) + // this is required for doing S3 ranged GET calls + .enableLegacyUnauthenticatedModes(true) + // this is required for backward compatibility with older encryption clients + .enableLegacyWrappingAlgorithms(true); + + switch (cseMaterials.getCseKeyType()) { + case KMS: + Keyring kmsKeyring = createKmsKeyring(parameters, cseMaterials); + CryptographicMaterialsManager kmsCryptoMaterialsManager = + DefaultCryptoMaterialsManager.builder() + .keyring(kmsKeyring) + .build(); + s3EncryptionClientBuilder.cryptoMaterialsManager(kmsCryptoMaterialsManager); + break; + case CUSTOM: + Keyring keyring = getKeyringProvider(cseMaterials.getCustomKeyringClassName(), + cseMaterials.getConf()); + CryptographicMaterialsManager customCryptoMaterialsManager = + DefaultCryptoMaterialsManager.builder() + .keyring(keyring) + .build(); + s3EncryptionClientBuilder.cryptoMaterialsManager(customCryptoMaterialsManager); + break; + default: + break; + } + return s3EncryptionClientBuilder.build(); + } + + /** + * Creates KmsKeyring instance based on the provided S3ClientCreationParameters and CSEMaterials. + * + * @param parameters The S3ClientCreationParameters containing the necessary configuration. 
+ * @param cseMaterials The CSEMaterials containing the KMS key ID and other encryption materials. + * @return A KmsKeyring instance configured with the appropriate KMS client and wrapping key ID. + */ + private Keyring createKmsKeyring(S3ClientCreationParameters parameters, + CSEMaterials cseMaterials) { + KmsClientBuilder kmsClientBuilder = KmsClient.builder(); + if (parameters.getCredentialSet() != null) { + kmsClientBuilder.credentialsProvider(parameters.getCredentialSet()); + } + // check if kms region is configured. + if (parameters.getKmsRegion() != null) { + kmsClientBuilder.region(Region.of(parameters.getKmsRegion())); + } else if (parameters.getRegion() != null) { + // fallback to s3 region if kms region is not configured. + kmsClientBuilder.region(Region.of(parameters.getRegion())); + } else if (parameters.getEndpoint() != null) { + // fallback to s3 endpoint config if both kms region and s3 region is not set. + String endpointStr = parameters.getEndpoint(); + URI endpoint = getS3Endpoint(endpointStr, cseMaterials.getConf()); + kmsClientBuilder.endpointOverride(endpoint); + } + return KmsKeyring.builder() + .kmsClient(kmsClientBuilder.build()) + .wrappingKeyId(cseMaterials.getKmsKeyId()) + .build(); + } + + /** + * Creates an S3AsyncEncryptionClient instance based on the provided parameters. + * + * @param parameters The S3ClientCreationParameters containing the necessary configuration. + * @return An instance of S3AsyncEncryptionClient. + */ + private S3AsyncClient createS3AsyncEncryptionClient(S3ClientCreationParameters parameters) { + Preconditions.checkArgument(s3AsyncClient != null, + "S3 async client not initialized"); + Preconditions.checkArgument(parameters != null, + "S3ClientCreationParameters is not initialized"); + + S3AsyncEncryptionClient.Builder s3EncryptionAsyncClientBuilder = + S3AsyncEncryptionClient.builder() + .wrappedClient(s3AsyncClient) + // this is required for doing S3 ranged GET calls + .enableLegacyUnauthenticatedModes(true) + // this is required for backward compatibility with older encryption clients + .enableLegacyWrappingAlgorithms(true); + + CSEMaterials cseMaterials = parameters.getClientSideEncryptionMaterials(); + switch (cseMaterials.getCseKeyType()) { + case KMS: + Keyring kmsKeyring = createKmsKeyring(parameters, cseMaterials); + CryptographicMaterialsManager kmsCryptoMaterialsManager = + DefaultCryptoMaterialsManager.builder() + .keyring(kmsKeyring) + .build(); + s3EncryptionAsyncClientBuilder.cryptoMaterialsManager(kmsCryptoMaterialsManager); + break; + case CUSTOM: + Keyring keyring = getKeyringProvider(cseMaterials.getCustomKeyringClassName(), + cseMaterials.getConf()); + CryptographicMaterialsManager customCryptoMaterialsManager = + DefaultCryptoMaterialsManager.builder() + .keyring(keyring) + .build(); + s3EncryptionAsyncClientBuilder.cryptoMaterialsManager(customCryptoMaterialsManager); + break; + default: + break; + } + return s3EncryptionAsyncClientBuilder.build(); + } + + + /** + * Retrieves an instance of the Keyring provider based on the provided class name. + * + * @param className The fully qualified class name of the Keyring provider implementation. + * @param conf The Configuration object containing the necessary configuration properties. + * @return An instance of the Keyring provider. + */ Review Comment: mark that this will throw RuntimeException if the provider class cannot be found or is of the wrong class. 
+need to make sure that all calling code handles this ########## hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java: ########## @@ -266,6 +271,150 @@ public void testEncryptionEnabledAndDisabledFS() throws Exception { } } + /** + * Test to check if unencrypted objects are read with V1 client compatibility. + * @throws IOException Review Comment: cut these @throws lines; no need to explain how tests fail. ########## hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java: ########## @@ -540,6 +555,102 @@ public Map.Entry<Duration, DeleteObjectsResponse> deleteObjects( } } + /** + * Performs a HEAD request on an S3 object to retrieve its metadata. + * + * @param key The S3 object key to perform the HEAD operation on + * @param changeTracker Tracks changes to the object's metadata across operations + * @param changeInvoker The invoker responsible for executing the HEAD request with retries + * @param fsHandler Handler for filesystem-level operations and configurations + * @param operation Description of the operation being performed for tracking purposes + * @return HeadObjectResponse containing the object's metadata + * @throws IOException If the HEAD request fails, object doesn't exist, or other I/O errors occur + */ + @Override + @Retries.RetryRaw + public HeadObjectResponse headObject(String key, + ChangeTracker changeTracker, + Invoker changeInvoker, + S3AFileSystemHandler fsHandler, + String operation) throws IOException { + HeadObjectResponse response = getStoreContext().getInvoker() + .retryUntranslated("GET " + key, true, Review Comment: change to HEAD ########## hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java: ########## @@ -638,4 +646,15 @@ private static void skipIfCrossRegionClient( skip("Skipping test as cross region client is in use "); } } + + /** + * Unset encryption options. + * This is needed to avoid encryption tests interfering with non-encryption + * tests. + * @param conf Configuations + */ + private static void unsetEncryption(Configuration conf) { Review Comment: would it make sense to move this to S3ATestUtils and use elsewhere, unsetting *all* encryption options? Then use in ITestS3AClientSideEncryptionCustom ########## hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryptionCustom.java: ########## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a; + +import java.io.IOException; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.impl.AWSHeaders; +import org.apache.hadoop.fs.s3a.impl.HeaderProcessing; + +import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_CSE_CUSTOM_KEYRING_CLASS_NAME; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionNotSet; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionTestsDisabled; + +/** + * Tests to verify Custom S3 client side encryption CSE-CUSTOM. + */ +public class ITestS3AClientSideEncryptionCustom extends ITestS3AClientSideEncryption { + + private static final String KMS_KEY_WRAP_ALGO = "kms+context"; + /** + * Creating custom configs for CSE-CUSTOM testing. + * + * @return Configuration. + */ + @Override + protected Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + S3ATestUtils.disableFilesystemCaching(conf); + conf.set(S3_ENCRYPTION_CSE_CUSTOM_KEYRING_CLASS_NAME, + CustomKeyring.class.getCanonicalName()); + return conf; + } + + @Override + protected void maybeSkipTest() throws IOException { + skipIfEncryptionTestsDisabled(getConfiguration()); + // skip the test if CSE-CUSTOM is not set. + skipIfEncryptionNotSet(getConfiguration(), S3AEncryptionMethods.CSE_CUSTOM); + } + + + @Override + protected void assertEncrypted(Path path) throws IOException { + Map<String, byte[]> fsXAttrs = getFileSystem().getXAttrs(path); + String xAttrPrefix = "header."; + + // Assert KeyWrap Algo + assertEquals("Key wrap algo isn't same as expected", KMS_KEY_WRAP_ALGO, Review Comment: assertJ assertThat ########## hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AFileSystemHandler.java: ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.impl; + +import java.io.IOException; + +import software.amazon.awssdk.core.ResponseInputStream; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.HeadObjectResponse; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.Listing; +import org.apache.hadoop.fs.s3a.S3AEncryptionMethods; +import org.apache.hadoop.fs.s3a.S3AStore; +import org.apache.hadoop.fs.s3a.S3ClientFactory; +import org.apache.hadoop.fs.s3a.api.RequestFactory; +import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; + +/** + * An interface that defines the contract for handling certain filesystem operations. 
+ */ +public interface S3AFileSystemHandler { Review Comment: yes ########## hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/EncryptionS3ClientFactory.java: ########## @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.impl; + +import java.io.IOException; +import java.net.URI; + +import org.apache.hadoop.conf.Configuration; Review Comment: move block below the software. one ########## hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestConnectionTimeouts.java: ########## @@ -156,7 +159,12 @@ public void testGeneratePoolTimeouts() throws Throwable { ContractTestUtils.createFile(fs, path, true, DATASET); final FileStatus st = fs.getFileStatus(path); try (FileSystem brittleFS = FileSystem.newInstance(fs.getUri(), conf)) { - intercept(ConnectTimeoutException.class, () -> { + Class exceptionClass = ConnectTimeoutException.class; Review Comment: won't be needed once the translation is fixed. ########## hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CSEV1CompatibleS3AFileSystemHandler.java: ########## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.impl; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; Review Comment: nit, put block below the "other" package ########## hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java: ########## @@ -200,9 +203,14 @@ public void testSingleObjectDeleteNoPermissionsTranslated() throws Throwable { Path path = requireDefaultExternalData(getConfiguration()); S3AFileSystem fs = (S3AFileSystem) path.getFileSystem( getConfiguration()); - AccessDeniedException aex = intercept(AccessDeniedException.class, + Class exceptionClass = AccessDeniedException.class; + if (CSEUtils.isCSEEnabled(getEncryptionAlgorithm( + getTestBucketName(getConfiguration()), getConfiguration()).getMethod())) { + exceptionClass = AWSClientIOException.class; Review Comment: can you show me the full stack? 403 normally maps to AccessDeniedException, and it'd be good to keep the same. AWSClientIOException is just our "something failed" if there's nothing else 1. Add `maybeTranslateEncryptionClientException()` in `ErrorTranslation` to look at the exception, if the classname string value matches "software.amazon.encryption.s3.S3EncryptionClientException" then map to AccessDeniedException 2. call that to S3AUtils.translateException just before the ` // no custom handling.` bit It may seem bad, but look at `maybeExtractChannelException()` and other bits to see worse. ########## hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java: ########## @@ -266,6 +271,150 @@ public void testEncryptionEnabledAndDisabledFS() throws Exception { } } + /** + * Test to check if unencrypted objects are read with V1 client compatibility. + * @throws IOException + * @throws Exception + */ + @Test + public void testUnencryptedObjectReadWithV1CompatibilityConfig() throws Exception { + maybeSkipTest(); + // initialize base s3 client. + Configuration conf = new Configuration(getConfiguration()); + removeBaseAndBucketOverrides(getTestBucketName(conf), + conf, + S3_ENCRYPTION_ALGORITHM, + S3_ENCRYPTION_KEY, + SERVER_SIDE_ENCRYPTION_ALGORITHM, + SERVER_SIDE_ENCRYPTION_KEY); + + Path file = methodPath(); + + try (S3AFileSystem nonCseFs = createTestFileSystem(conf)) { + nonCseFs.initialize(getFileSystem().getUri(), conf); + + // write unencrypted file + ContractTestUtils.writeDataset(nonCseFs, file, new byte[SMALL_FILE_SIZE], + SMALL_FILE_SIZE, SMALL_FILE_SIZE, true); + } + + Configuration cseConf = new Configuration(getConfiguration()); + cseConf.setBoolean(S3_ENCRYPTION_CSE_V1_COMPATIBILITY_ENABLED, true); + + // create filesystem with cse enabled and v1 compatibility. + try (S3AFileSystem cseFs = createTestFileSystem(cseConf)) { + cseFs.initialize(getFileSystem().getUri(), cseConf); + + // read unencrypted file. It should not throw any exception. + try (FSDataInputStream in = cseFs.open(file)) { + in.read(new byte[SMALL_FILE_SIZE]); + } + } + } + + /** + * Tests the size of an encrypted object when with V1 compatibility and custom header length. + * + * @throws Exception If any error occurs during the test execution. 
+ */ + @Test + public void testSizeOfEncryptedObjectFromHeaderWithV1Compatibility() throws Exception { + maybeSkipTest(); + Configuration cseConf = new Configuration(getConfiguration()); + cseConf.setBoolean(S3_ENCRYPTION_CSE_V1_COMPATIBILITY_ENABLED, true); + try (S3AFileSystem fs = createTestFileSystem(cseConf)) { + fs.initialize(getFileSystem().getUri(), cseConf); + + Path filePath = methodPath(); + Path file = new Path(filePath, "file"); + String key = fs.pathToKey(file); + + // write object with random content length header + Map<String, String> metadata = new HashMap<>(); + metadata.put(AWSHeaders.UNENCRYPTED_CONTENT_LENGTH, "10"); + try (AuditSpan span = span()) { + RequestFactory factory = RequestFactoryImpl.builder() + .withBucket(fs.getBucket()) + .build(); + PutObjectRequest.Builder putObjectRequestBuilder = + factory.newPutObjectRequestBuilder(key, + null, SMALL_FILE_SIZE, false); + putObjectRequestBuilder.contentLength(Long.parseLong(String.valueOf(SMALL_FILE_SIZE))); + putObjectRequestBuilder.metadata(metadata); + fs.putObjectDirect(putObjectRequestBuilder.build(), + PutObjectOptions.deletingDirs(), + new S3ADataBlocks.BlockUploadData(new byte[SMALL_FILE_SIZE], null), + null); + + // fetch the random content length + long contentLength = fs.getFileStatus(file).getLen(); + assertEquals("content length does not match", 10, contentLength); Review Comment: prefer AssertJ assertThat() here, and include the whole status, and put it in its own method or use below ``` void assertFileLength(fs, path, expected) st = fs.getFileStatus(path) Assertions.assertThat(fs.getLen()) .describedAs("Length of %s status: %s", path, st) .isEqualTo(expected) ``` then call here and below as needed ########## hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryptionCustom.java: ########## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import java.io.IOException; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.impl.AWSHeaders; +import org.apache.hadoop.fs.s3a.impl.HeaderProcessing; + +import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_CSE_CUSTOM_KEYRING_CLASS_NAME; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionNotSet; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionTestsDisabled; + +/** + * Tests to verify Custom S3 client side encryption CSE-CUSTOM. + */ +public class ITestS3AClientSideEncryptionCustom extends ITestS3AClientSideEncryption { + + private static final String KMS_KEY_WRAP_ALGO = "kms+context"; + /** + * Creating custom configs for CSE-CUSTOM testing. + * + * @return Configuration. 
+ */ + @Override + protected Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + S3ATestUtils.disableFilesystemCaching(conf); + conf.set(S3_ENCRYPTION_CSE_CUSTOM_KEYRING_CLASS_NAME, Review Comment: remove any bucket overrides for this option. ########## hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/EncryptionS3ClientFactory.java: ########## @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.impl; + +import java.io.IOException; +import java.net.URI; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.s3a.DefaultS3ClientFactory; +import org.apache.hadoop.util.Preconditions; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.functional.LazyAtomicReference; + +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.kms.KmsClient; +import software.amazon.awssdk.services.kms.KmsClientBuilder; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.encryption.s3.S3AsyncEncryptionClient; +import software.amazon.encryption.s3.S3EncryptionClient; +import software.amazon.encryption.s3.materials.CryptographicMaterialsManager; +import software.amazon.encryption.s3.materials.DefaultCryptoMaterialsManager; +import software.amazon.encryption.s3.materials.Keyring; +import software.amazon.encryption.s3.materials.KmsKeyring; + +import static org.apache.hadoop.fs.s3a.impl.InstantiationIOException.unavailable; + +/** + * Factory class to create encrypted s3 client and encrypted async s3 client. + */ +public class EncryptionS3ClientFactory extends DefaultS3ClientFactory { + + /** + * Encryption client class name. + * value: {@value} + */ + private static final String ENCRYPTION_CLIENT_CLASSNAME = + "software.amazon.encryption.s3.S3EncryptionClient"; + + /** + * Encryption client availability. + */ + private static final LazyAtomicReference<Boolean> ENCRYPTION_CLIENT_AVAILABLE = + LazyAtomicReference.lazyAtomicReferenceFromSupplier( + EncryptionS3ClientFactory::checkForEncryptionClient + ); + + + /** + * S3Client to be wrapped by encryption client. + */ + private S3Client s3Client; + + /** + * S3AsyncClient to be wrapped by encryption client. + */ + private S3AsyncClient s3AsyncClient; + + /** + * Checks if {@link #ENCRYPTION_CLIENT_CLASSNAME} is available in the class path. + * @return true if available, false otherwise. 
+ */ + private static boolean checkForEncryptionClient() { + try { + ClassLoader cl = EncryptionS3ClientFactory.class.getClassLoader(); + cl.loadClass(ENCRYPTION_CLIENT_CLASSNAME); + LOG.debug("encryption client class {} found", ENCRYPTION_CLIENT_CLASSNAME); + return true; + } catch (Exception e) { + LOG.debug("encryption client class {} not found", ENCRYPTION_CLIENT_CLASSNAME, e); + return false; + } + } + + /** + * Is the Encryption client available? + * @return true if it was found in the classloader + */ + private static synchronized boolean isEncryptionClientAvailable() { + return ENCRYPTION_CLIENT_AVAILABLE.get(); + } + + /** + * Creates both synchronous and asynchronous encrypted s3 clients. + * Synchronous client is wrapped by encryption client first and then + * Asynchronous client is wrapped by encryption client. + * @param uri S3A file system URI + * @param parameters parameter object + * @return encrypted s3 client + * @throws IOException IO failures + */ + @Override + public S3Client createS3Client(URI uri, S3ClientCreationParameters parameters) + throws IOException { + if (!isEncryptionClientAvailable()) { + throw unavailable(uri, ENCRYPTION_CLIENT_CLASSNAME, null, + "No encryption client available"); + } + + s3Client = super.createS3Client(uri, parameters); + s3AsyncClient = super.createS3AsyncClient(uri, parameters); + + return createS3EncryptionClient(parameters); + } + + /** + * Create async encrypted s3 client. + * @param uri S3A file system URI + * @param parameters parameter object + * @return async encrypted s3 client + * @throws IOException IO failures + */ + @Override + public S3AsyncClient createS3AsyncClient(URI uri, S3ClientCreationParameters parameters) + throws IOException { + if (!isEncryptionClientAvailable()) { + throw unavailable(uri, ENCRYPTION_CLIENT_CLASSNAME, null, + "No encryption client available"); + } + return createS3AsyncEncryptionClient(parameters); + } + + /** + * Creates an S3EncryptionClient instance based on the provided parameters. + * + * @param parameters The S3ClientCreationParameters containing the necessary configuration. + * @return An instance of S3EncryptionClient. 
+ */ + private S3Client createS3EncryptionClient(S3ClientCreationParameters parameters) { + CSEMaterials cseMaterials = parameters.getClientSideEncryptionMaterials(); + Preconditions.checkArgument(s3AsyncClient != null, + "S3 async client not initialized"); + Preconditions.checkArgument(s3Client != null, + "S3 client not initialized"); + Preconditions.checkArgument(parameters != null, + "S3ClientCreationParameters is not initialized"); + + S3EncryptionClient.Builder s3EncryptionClientBuilder = + S3EncryptionClient.builder() + .wrappedAsyncClient(s3AsyncClient) + .wrappedClient(s3Client) + // this is required for doing S3 ranged GET calls + .enableLegacyUnauthenticatedModes(true) + // this is required for backward compatibility with older encryption clients + .enableLegacyWrappingAlgorithms(true); + + switch (cseMaterials.getCseKeyType()) { + case KMS: + Keyring kmsKeyring = createKmsKeyring(parameters, cseMaterials); + CryptographicMaterialsManager kmsCryptoMaterialsManager = + DefaultCryptoMaterialsManager.builder() + .keyring(kmsKeyring) + .build(); + s3EncryptionClientBuilder.cryptoMaterialsManager(kmsCryptoMaterialsManager); + break; + case CUSTOM: + Keyring keyring = getKeyringProvider(cseMaterials.getCustomKeyringClassName(), + cseMaterials.getConf()); + CryptographicMaterialsManager customCryptoMaterialsManager = + DefaultCryptoMaterialsManager.builder() + .keyring(keyring) + .build(); + s3EncryptionClientBuilder.cryptoMaterialsManager(customCryptoMaterialsManager); + break; + default: + break; Review Comment: if there is no match, log at debug? is there any actual problem from no match? ########## hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CSEV1CompatibleS3AFileSystemHandler.java: ########## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.impl; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.Listing; +import org.apache.hadoop.fs.s3a.S3AStore; +import org.apache.hadoop.fs.s3a.S3ClientFactory; +import org.apache.hadoop.fs.s3a.api.RequestFactory; +import org.apache.hadoop.util.ReflectionUtils; + +import software.amazon.awssdk.core.ResponseInputStream; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.HeadObjectResponse; + +import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_S3_CLIENT_FACTORY_IMPL; +import static org.apache.hadoop.fs.s3a.Constants.S3_CLIENT_FACTORY_IMPL; +import static org.apache.hadoop.fs.s3a.impl.CSEUtils.isObjectEncrypted; + +/** + * An extension of the {@link CSES3AFileSystemHandler} class. 
+ * This handles certain file system operations when client-side encryption is enabled with v1 client + * compatibility. + * {@link org.apache.hadoop.fs.s3a.Constants#S3_ENCRYPTION_CSE_V1_COMPATIBILITY_ENABLED}. + */ +public class CSEV1CompatibleS3AFileSystemHandler extends CSES3AFileSystemHandler { + + /** + * Constructs a new instance of {@code CSEV1CompatibleS3AFileSystemHandler}. + */ + public CSEV1CompatibleS3AFileSystemHandler() { + } + + /** + * {@inheritDoc} + * + * <p>This implementation returns a {@link Listing.AcceptAllButS3nDirsAndCSEInstructionFile} + * or {@link Listing.AcceptAllButSelfAndS3nDirsAndCSEInstructionFile} object + * based on the value of the {@code includeSelf} parameter. + */ + @Override + public Listing.FileStatusAcceptor getFileStatusAcceptor(Path path, boolean includeSelf) { + return includeSelf + ? new Listing.AcceptAllButS3nDirsAndCSEInstructionFile() + : new Listing.AcceptAllButSelfAndS3nDirsAndCSEInstructionFile(path); + } + + /** + * Returns a {@link Listing.FileStatusAcceptor} object. + * That determines which files and directories should be included in a listing operation. + * + * @param path the path for which the listing is being performed + * @return a {@link Listing.FileStatusAcceptor} object + */ + @Override + public Listing.FileStatusAcceptor getFileStatusAcceptor(Path path) { + return new Listing.AcceptFilesOnlyExceptCSEInstructionFile(path); + } + + /** + * Retrieves an object from the S3. + * If the S3 object is encrypted, it uses the encrypted S3 client to retrieve the object else + * it uses the unencrypted S3 client. + * + * @param store The S3AStore object representing the S3 bucket. + * @param request The GetObjectRequest containing the details of the object to retrieve. + * @param factory The RequestFactory used to create the GetObjectRequest. + * @return A ResponseInputStream containing the GetObjectResponse. + * @throws IOException If an error occurs while retrieving the object. + */ + @Override + public ResponseInputStream<GetObjectResponse> getObject(S3AStore store, GetObjectRequest request, + RequestFactory factory) throws IOException { + boolean isEncrypted = isObjectEncrypted(store.getOrCreateS3Client(), factory, request.key()); + return isEncrypted ? store.getOrCreateS3Client().getObject(request) Review Comment: aah. ########## hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java: ########## @@ -160,11 +161,17 @@ public S3AsyncClient createS3AsyncClient( .thresholdInBytes(parameters.getMultiPartThreshold()) .build(); - return configureClientBuilder(S3AsyncClient.builder(), parameters, conf, bucket) - .httpClientBuilder(httpClientBuilder) - .multipartConfiguration(multipartConfiguration) - .multipartEnabled(parameters.isMultipartCopy()) - .build(); + S3AsyncClientBuilder s3AsyncClientBuilder = + configureClientBuilder(S3AsyncClient.builder(), parameters, conf, bucket) + .httpClientBuilder(httpClientBuilder); + + // TODO: Enable multi part upload with cse once it is available. Review Comment: Create a followup JIRA and reference it here "multipart upload pending with HADOOP-xyz" -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 