[ https://issues.apache.org/jira/browse/HADOOP-18708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17870498#comment-17870498 ]

ASF GitHub Bot commented on HADOOP-18708:
-----------------------------------------

ahmarsuhail commented on code in PR #6884:
URL: https://github.com/apache/hadoop/pull/6884#discussion_r1701693660


##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/EncryptionS3ClientFactory.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.impl.encryption.CSEMaterials;
+import org.apache.hadoop.util.Preconditions;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.functional.LazyAtomicReference;
+
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.encryption.s3.S3AsyncEncryptionClient;
+import software.amazon.encryption.s3.S3EncryptionClient;
+import software.amazon.encryption.s3.materials.CryptographicMaterialsManager;
+import software.amazon.encryption.s3.materials.DefaultCryptoMaterialsManager;
+import software.amazon.encryption.s3.materials.Keyring;
+
+import static org.apache.hadoop.fs.s3a.impl.InstantiationIOException.unavailable;
+
+/**
+ * Factory class to create encrypted s3 client and encrypted async s3 client.
+ */
+public class EncryptionS3ClientFactory extends DefaultS3ClientFactory {
+
+  private static final String ENCRYPTION_CLIENT_CLASSNAME =
+      "software.amazon.encryption.s3.S3EncryptionClient";
+
+  /**
+   * Encryption client availability.
+   */
+  private static final LazyAtomicReference<Boolean> ENCRYPTION_CLIENT_AVAILABLE =
+      LazyAtomicReference.lazyAtomicReferenceFromSupplier(
+          EncryptionS3ClientFactory::checkForEncryptionClient
+      );
+
+
+  /**
+   * S3Client to be wrapped by encryption client.
+   */
+  private S3Client s3Client;
+
+  /**
+   * S3AsyncClient to be wrapped by encryption client.
+   */
+  private S3AsyncClient s3AsyncClient;
+
+  private static boolean checkForEncryptionClient() {
+    try {
+      ClassLoader cl = EncryptionS3ClientFactory.class.getClassLoader();
+      cl.loadClass(ENCRYPTION_CLIENT_CLASSNAME);
+      LOG.debug("encryption client class {} found", ENCRYPTION_CLIENT_CLASSNAME);
+      return true;
+    } catch (Exception e) {
+      LOG.debug("encryption client class {} not found", ENCRYPTION_CLIENT_CLASSNAME, e);
+      return false;
+    }
+  }
+
+  /**
+   * Is the Encryption client available?
+   * @return true if it was found in the classloader
+   */
+  private static synchronized boolean isEncryptionClientAvailable() {
+    return ENCRYPTION_CLIENT_AVAILABLE.get();
+  }
+
+  /**
+   * Creates both synchronous and asynchronous encrypted s3 clients.
+   * Synchronous client is wrapped by encryption client first and then
+   * Asynchronous client is wrapped by encryption client.
+   * @param uri S3A file system URI
+   * @param parameters parameter object
+   * @return encrypted s3 client
+   * @throws IOException IO failures
+   */
+  @Override
+  public S3Client createS3Client(URI uri, S3ClientCreationParameters parameters)
+      throws IOException {
+    if (!isEncryptionClientAvailable()) {
+      throw unavailable(uri, ENCRYPTION_CLIENT_CLASSNAME, null,
+          "No encryption client available");
+    }
+
+    s3Client = super.createS3Client(uri, parameters);
+    s3AsyncClient = super.createS3AsyncClient(uri, parameters);
+
+    return createS3EncryptionClient(parameters.getClientSideEncryptionMaterials());
+  }
+
+  /**
+   * Create async encrypted s3 client.
+   * @param uri S3A file system URI
+   * @param parameters parameter object
+   * @return async encrypted s3 client
+   * @throws IOException IO failures
+   */
+  @Override
+  public S3AsyncClient createS3AsyncClient(URI uri, S3ClientCreationParameters parameters)
+      throws IOException {
+    if (!isEncryptionClientAvailable()) {
+      throw unavailable(uri, ENCRYPTION_CLIENT_CLASSNAME, null,
+          "No encryption client available");
+    }
+    return createS3AsyncEncryptionClient(parameters.getClientSideEncryptionMaterials());
+  }
+
+  /**
+   * Create encrypted s3 client.
+   * @param cseMaterials
+   * @return encrypted s3 client
+   */
+  private S3Client createS3EncryptionClient(final CSEMaterials cseMaterials) {
+    Preconditions.checkArgument(s3AsyncClient !=null,
+        "S3 async client not initialized");
+    Preconditions.checkArgument(s3Client !=null,
+        "S3 client not initialized");
+    S3EncryptionClient.Builder s3EncryptionClientBuilder =
+        S3EncryptionClient.builder().wrappedAsyncClient(s3AsyncClient).wrappedClient(s3Client)
+            // this is required for doing S3 ranged GET calls
+            .enableLegacyUnauthenticatedModes(true)
+            // this is required for backward compatibility with older encryption clients
+            .enableLegacyWrappingAlgorithms(true);
+
+    switch (cseMaterials.getCseKeyType()) {
+    case KMS:
+      s3EncryptionClientBuilder.kmsKeyId(cseMaterials.getKmsKeyId());
+      break;
+    case CUSTOM:
+      Keyring keyring = getKeyringProvider(cseMaterials.getCustomKeyringClassName(),
+          cseMaterials.getConf());
+      CryptographicMaterialsManager cmm = DefaultCryptoMaterialsManager.builder()
+          .keyring(keyring)
+          .build();
+      s3EncryptionClientBuilder.cryptoMaterialsManager(cmm);
+      break;
+    default:
+      break;
+    }
+
+    return s3EncryptionClientBuilder.build();
+  }
+
+  /**
+   * Create async encrypted s3 client.
+   * @param cseMaterials
+   * @return encrypted async s3 client
+   */
+  private S3AsyncClient createS3AsyncEncryptionClient(final CSEMaterials cseMaterials) {
+    Preconditions.checkArgument(s3AsyncClient !=null,
+        "S3 async client not initialized");
+    S3AsyncEncryptionClient.Builder s3EncryptionAsyncClientBuilder =
+        S3AsyncEncryptionClient.builder().wrappedClient(s3AsyncClient)
+            // this is required for doing S3 ranged GET calls
+            .enableLegacyUnauthenticatedModes(true)
+            // this is required for backward compatibility with older encryption clients
+            .enableLegacyWrappingAlgorithms(true);
+
+    switch (cseMaterials.getCseKeyType()) {

Review Comment:
   is it possible to dedupe this code for sync and async using generics? See DefaultS3ClientFactory.configureClientBuilder for an example.
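   For illustration only (just a sketch, not the actual API shape: applyKeyMaterial is a made-up name, java.util.function.Consumer would need importing, and the wiring assumes both encryption builders mutate in place), the duplicated key-type switch could be factored out along these lines:

       private void applyKeyMaterial(CSEMaterials cseMaterials,
           Consumer<String> kmsKeyIdSetter,
           Consumer<CryptographicMaterialsManager> cmmSetter) {
         switch (cseMaterials.getCseKeyType()) {
         case KMS:
           kmsKeyIdSetter.accept(cseMaterials.getKmsKeyId());
           break;
         case CUSTOM:
           Keyring keyring = getKeyringProvider(cseMaterials.getCustomKeyringClassName(),
               cseMaterials.getConf());
           cmmSetter.accept(DefaultCryptoMaterialsManager.builder()
               .keyring(keyring)
               .build());
           break;
         default:
           break;
         }
       }

       // then for each builder, e.g.:
       // applyKeyMaterial(cseMaterials,
       //     s3EncryptionClientBuilder::kmsKeyId,
       //     s3EncryptionClientBuilder::cryptoMaterialsManager);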



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BaseS3AFileSystemHandler.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+
+import software.amazon.awssdk.core.ResponseInputStream;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+
+/**
+ * An implementation of the {@link S3AFileSystemHandler} interface.
+ * This handles file system operations when client-side encryption is disabled or
+ * {@link org.apache.hadoop.fs.s3a.Constants#S3_ENCRYPTION_CSE_SKIP_INSTRUCTION_FILE} is disabled.
+ */
+public class BaseS3AFileSystemHandler implements S3AFileSystemHandler {
+
+  private final S3Client s3Client;
+
+  /**
+   * Constructs a new instance of {@code BaseS3AFileSystemHandler} with a null value.
+   */
+  public BaseS3AFileSystemHandler() {
+    this(null);
+  }
+
+  /**
+   * Constructs a new instance of {@code BaseS3AFileSystemHandler} with theprovided S3 client.

Review Comment:
   nit: space b/w the provided



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java:
##########
@@ -237,7 +247,9 @@ public RemoteIterator<S3ALocatedFileStatus> getLocatedFileStatusIteratorForDir(
             listingOperationCallbacks
                 .createListObjectsRequest(key, "/", span),
             filter,
-            new AcceptAllButSelfAndS3nDirs(dir),
+            isCSEEnabled ?

Review Comment:
   can you not just pass in the handler and reuse the logic? s3AFileSystemHandler.getFileStatusAcceptor?
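   i.e. something like this at the call site (sketch only; I'm assuming the handler method takes the directory path, the exact signature may differ):

       // hypothetical: instead of branching on isCSEEnabled here, i.e.
       //   isCSEEnabled ? <CSE-aware acceptor> : new AcceptAllButSelfAndS3nDirs(dir)
       // delegate the choice to the handler:
       s3AFileSystemHandler.getFileStatusAcceptor(dir)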



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/encryption/CSEUtils.java:
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl.encryption;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;
+import org.apache.hadoop.fs.s3a.api.RequestFactory;
+import org.apache.hadoop.fs.s3a.impl.InternalConstants;
+import org.apache.hadoop.util.Preconditions;
+
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
+import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
+
+import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_CSE_CUSTOM_KEYRING_CLASS_NAME;
+import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX;
+import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.CSE_CUSTOM;
+import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.CSE_KMS;
+import static org.apache.hadoop.fs.s3a.S3AUtils.formatRange;
+import static org.apache.hadoop.fs.s3a.S3AUtils.getS3EncryptionKey;
+import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.CRYPTO_CEK_ALGORITHM;
+import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.UNENCRYPTED_CONTENT_LENGTH;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.CSE_PADDING_LENGTH;
+
+/**
+ * S3 client side encryption (CSE) utility class.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public final class CSEUtils {
+
+  private CSEUtils() {
+  }
+
+  /**
+   * Checks if the file suffix ends CSE file suffix.
+   * {@link org.apache.hadoop.fs.s3a.Constants#S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX}
+   * when the config
+   * @param key file name
+   * @return true if file name ends with CSE instruction file suffix
+   */
+  public static boolean isCSEInstructionFile(String key) {
+    return key.endsWith(S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX);
+  }
+
+  /**
+   * Checks if CSE-KMS or CSE-CUSTOM is set.
+   * @param encryptionMethod type of encryption used
+   * @return true if encryption method is CSE-KMS or CSE-CUSTOM
+   */
+  public static boolean isCSEKmsOrCustom(String encryptionMethod) {
+    return CSE_KMS.getMethod().equals(encryptionMethod) ||
+        CSE_CUSTOM.getMethod().equals(encryptionMethod);
+  }
+
+  /**
+   * Checks if a given S3 object is encrypted or not by checking following two conditions
+   * 1. if object metadata contains x-amz-cek-alg
+   * 2. if instruction file is present
+   *
+   * @param s3Client S3 client
+   * @param factory   S3 request factory
+   * @param key      key value of the s3 object
+   * @return true if S3 object is encrypted
+   */
+  public static boolean isObjectEncrypted(S3Client s3Client, RequestFactory factory, String key) {
+    HeadObjectRequest.Builder requestBuilder = factory.newHeadObjectRequestBuilder(key);
+    HeadObjectResponse headObjectResponse = s3Client.headObject(requestBuilder.build());
+    if (headObjectResponse.hasMetadata() &&

Review Comment:
   curious, if it's encrypted won't it always have CRYPTO_CEK_ALGORITHM? Why do we check for the presence of the instruction file if there is no CRYPTO_CEK_ALGORITHM?



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java:
##########
@@ -746,6 +746,69 @@ private Constants() {
   public static final String S3_ENCRYPTION_CONTEXT =
       "fs.s3a.encryption.context";
 
+  /**
+   * Client side encryption (CSE-CUSTOM) with custom cryptographic material manager class name.
+   * Custom keyring class name for CSE-KMS.
+   * value:{@value}
+   */
+  public static final String S3_ENCRYPTION_CSE_CUSTOM_KEYRING_CLASS_NAME =
+          "fs.s3a.encryption.cse.custom.keyring.class.name";
+
+  /**
+   * This config initializes unencrypted s3 client will be used to access unencrypted
+   * s3 object. This is to provide backward compatibility.
+   * Config to support reading unencrypted s3 objects when CSE is enabled.
+   * This is to provide backward compatibility with V1/V2 client.
+   * value:{@value}
+   */
+  public static final String S3_ENCRYPTION_CSE_READ_UNENCRYPTED_OBJECTS =
+          "fs.s3a.encryption.cse.read.unencrypted.objects";
+
+  /**
+   * Default value : {@value}.
+   */
+  public static final boolean S3_ENCRYPTION_CSE_READ_UNENCRYPTED_OBJECTS_DEFAULT = false;
+
+  /**
+   * Config to calculate the size of unencrypted object size using ranged S3 calls.
+   * This is to provide backward compatability with objects encrypted with V1 client.
+   * Unlike V2 and V3 client which always pads 16 bytes, V1 client pads bytes till the
+   * object size reaches next multiple of 16.
+   * This is to provide backward compatibility.
+   * This is to provide backward compatibility with V1 client.
+   * value:{@value}
+   */
+  public static final String S3_ENCRYPTION_CSE_OBJECT_SIZE_FROM_RANGED_GET_ENABLED =

Review Comment:
   can you rename this config? RANGED_GET_ENABLED is quite confusing, so maybe something to reflect that you should only switch this on if you need v1 compatibility, maybe fs.s3a.encryption.cse.v1.compatibility.enabled or something?
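   e.g. (assuming the rename goes through; the key string below is only the suggestion above, not an existing constant):

       // hypothetical: opting in to v1-compatible unencrypted-length calculation
       Configuration conf = new Configuration();
       conf.setBoolean("fs.s3a.encryption.cse.v1.compatibility.enabled", true);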



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java:
##########
@@ -763,6 +778,61 @@ public boolean accept(FileStatus status) {
     }
   }
 
+  /**
+   * Accept all entries except the base path and those which map to S3N
+   * pseudo directory markers and CSE instruction file.
+   */
+  static class AcceptFilesOnlyExceptCSEInstructionFile implements FileStatusAcceptor {

Review Comment:
   this is never used



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java:
##########
@@ -76,17 +80,23 @@ public class Listing extends AbstractStoreOperation {
 
   private static final Logger LOG = S3AFileSystem.LOG;
   private final boolean isCSEEnabled;
+  private final S3Client s3Client;
+  private final boolean skipCSEInstructionFile;
 
   static final FileStatusAcceptor ACCEPT_ALL_BUT_S3N =
       new AcceptAllButS3nDirs();
 
   private final ListingOperationCallbacks listingOperationCallbacks;
 
   public Listing(ListingOperationCallbacks listingOperationCallbacks,
-      StoreContext storeContext) {
+      StoreContext storeContext, S3Client s3Client) {
     super(storeContext);
     this.listingOperationCallbacks = listingOperationCallbacks;
     this.isCSEEnabled = storeContext.isCSEEnabled();
+    this.skipCSEInstructionFile = isCSEEnabled &&
+        storeContext.getConfiguration().getBoolean(S3_ENCRYPTION_CSE_SKIP_INSTRUCTION_FILE,
+            S3_ENCRYPTION_CSE_SKIP_INSTRUCTION_FILE_DEFAULT);
+    this.s3Client = s3Client;

Review Comment:
   s3Client and skipCSEInstructionFile never seem to be used



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java:
##########
@@ -1109,6 +1136,44 @@ private ClientManager createClientManager(URI fsURI, boolean dtEnabled) throws I
         S3_CLIENT_FACTORY_IMPL, DEFAULT_S3_CLIENT_FACTORY_IMPL,
         S3ClientFactory.class);
 
+    S3ClientFactory clientFactory;
+    S3ClientFactory unecnryptedClientFactory = null;
+    CSEMaterials cseMaterials = null;
+
+    if (isCSEEnabled) {

Review Comment:
   this still needs to be addressed?



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java:
##########
@@ -153,11 +154,16 @@ public S3AsyncClient createS3AsyncClient(
         .thresholdInBytes(parameters.getMultiPartThreshold())
         .build();
 
-    return configureClientBuilder(S3AsyncClient.builder(), parameters, conf, bucket)
-        .httpClientBuilder(httpClientBuilder)
-        .multipartConfiguration(multipartConfiguration)
-        .multipartEnabled(parameters.isMultipartCopy())
-        .build();
+    S3AsyncClientBuilder s3AsyncClientBuilder =
+            configureClientBuilder(S3AsyncClient.builder(), parameters, conf, bucket)
+                .httpClientBuilder(httpClientBuilder);
+
+    if (!parameters.isClientSideEncryptionEnabled()) {

Review Comment:
   are you sure we still need this? what happens without it? IIRC, I had to do this because if you enabled multipart, all ranged GETs failed. see https://github.com/apache/hadoop/pull/6164/files#r1352852582 for more info. I'm hoping this should have been fixed by the SDK team by now.



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java:
##########
@@ -763,6 +778,61 @@ public boolean accept(FileStatus status) {
     }
   }
 
+  /**
+   * Accept all entries except the base path and those which map to S3N
+   * pseudo directory markers and CSE instruction file.
+   */
+  static class AcceptFilesOnlyExceptCSEInstructionFile implements 
FileStatusAcceptor {

Review Comment:
   there are so many of these acceptors now, it makes the Listing class hard to read. could you review which ones we actually need and then move them to a separate class? For example, I also see that AcceptFilesOnly is never used.
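   roughly what I have in mind (names are only a suggestion; the point is that Listing would reference the acceptors rather than declare them all as inner classes):

       // hypothetical holder class next to Listing, keeping only the
       // acceptors that are still referenced.
       public final class FileStatusAcceptors {

         private FileStatusAcceptors() {
         }

         /** Accept all entries except S3N pseudo directory markers. */
         public static FileStatusAcceptor allButS3nDirs() {
           return new AcceptAllButS3nDirs();
         }

         /** Accept all entries except the base path and S3N pseudo directory markers. */
         public static FileStatusAcceptor allButSelfAndS3nDirs(Path path) {
           return new AcceptAllButSelfAndS3nDirs(path);
         }
       }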



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java:
##########
@@ -266,7 +278,9 @@ public RemoteIterator<S3ALocatedFileStatus> getLocatedFileStatusIteratorForDir(
         path,
         request,
         ACCEPT_ALL,
-        new AcceptAllButSelfAndS3nDirs(path),
+        isCSEEnabled ?

Review Comment:
   same as above



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/encryption/CSEUtils.java:
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl.encryption;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;
+import org.apache.hadoop.fs.s3a.api.RequestFactory;
+import org.apache.hadoop.fs.s3a.impl.InternalConstants;
+import org.apache.hadoop.util.Preconditions;
+
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
+import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
+
+import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_CSE_CUSTOM_KEYRING_CLASS_NAME;
+import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX;
+import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.CSE_CUSTOM;
+import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.CSE_KMS;
+import static org.apache.hadoop.fs.s3a.S3AUtils.formatRange;
+import static org.apache.hadoop.fs.s3a.S3AUtils.getS3EncryptionKey;
+import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.CRYPTO_CEK_ALGORITHM;
+import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.UNENCRYPTED_CONTENT_LENGTH;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.CSE_PADDING_LENGTH;
+
+/**
+ * S3 client side encryption (CSE) utility class.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public final class CSEUtils {
+
+  private CSEUtils() {
+  }
+
+  /**
+   * Checks if the file suffix ends CSE file suffix.
+   * {@link org.apache.hadoop.fs.s3a.Constants#S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX}
+   * when the config
+   * @param key file name
+   * @return true if file name ends with CSE instruction file suffix
+   */
+  public static boolean isCSEInstructionFile(String key) {
+    return key.endsWith(S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX);
+  }
+
+  /**
+   * Checks if CSE-KMS or CSE-CUSTOM is set.
+   * @param encryptionMethod type of encryption used
+   * @return true if encryption method is CSE-KMS or CSE-CUSTOM
+   */
+  public static boolean isCSEKmsOrCustom(String encryptionMethod) {
+    return CSE_KMS.getMethod().equals(encryptionMethod) ||
+        CSE_CUSTOM.getMethod().equals(encryptionMethod);
+  }
+
+  /**
+   * Checks if a given S3 object is encrypted or not by checking following two conditions
+   * 1. if object metadata contains x-amz-cek-alg
+   * 2. if instruction file is present
+   *
+   * @param s3Client S3 client
+   * @param factory   S3 request factory
+   * @param key      key value of the s3 object
+   * @return true if S3 object is encrypted
+   */
+  public static boolean isObjectEncrypted(S3Client s3Client, RequestFactory factory, String key) {
+    HeadObjectRequest.Builder requestBuilder = factory.newHeadObjectRequestBuilder(key);
+    HeadObjectResponse headObjectResponse = s3Client.headObject(requestBuilder.build());
+    if (headObjectResponse.hasMetadata() &&
+        headObjectResponse.metadata().get(CRYPTO_CEK_ALGORITHM) != null) {
+      return true;
+    }
+    HeadObjectRequest.Builder instructionFileRequestBuilder =
+        factory.newHeadObjectRequestBuilder(key + S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX);
+    try {
+      s3Client.headObject(instructionFileRequestBuilder.build());
+      return true;
+    } catch (NoSuchKeyException e) {
+      // Ignore. This indicates no instruction file is present
+    }
+    return false;
+  }
+
+  /**
+   * Get the unencrypted object length by either subtracting
+   * {@link InternalConstants#CSE_PADDING_LENGTH} from the object size or calculating the
+   * actual size by doing S3 ranged GET operation.
+   *
+   * @param s3Client           S3 client
+   * @param bucket             bucket name of the s3 object
+   * @param key                key value of the s3 object
+   * @param factory            S3 request factory
+   * @param contentLength      S3 object length
+   * @param headObjectResponse response from headObject call
+   * @param cseRangedGetEnabled is ranged get enabled
+   * @param cseReadUnencryptedObjects is reading of une
+   * @return unencrypted length of the object
+   * @throws IOException IO failures
+   */
+  public static long getUnencryptedObjectLength(S3Client s3Client,
+      String bucket,
+      String key,
+      RequestFactory factory,
+      long contentLength,
+      HeadObjectResponse headObjectResponse,
+      boolean cseRangedGetEnabled,
+      boolean cseReadUnencryptedObjects) throws IOException {
+
+    if (cseReadUnencryptedObjects) {
+      // if object is unencrypted, return the actual size
+      if (!isObjectEncrypted(s3Client, factory, key)) {
+        return contentLength;
+      }
+    }
+
+    // check if unencrypted content length metadata is present or not.
+    if (headObjectResponse != null) {
+      String plaintextLength = headObjectResponse.metadata().get(UNENCRYPTED_CONTENT_LENGTH);
+      if (headObjectResponse.hasMetadata() && plaintextLength !=null &&
+          !plaintextLength.isEmpty()) {
+        return Long.parseLong(plaintextLength);
+      }
+    }
+
+    if (cseRangedGetEnabled) {

Review Comment:
   I think previously we always just subtracted 16, was that incorrect? 
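   (If I'm reading the javadoc on the new constant right, the issue is that V1 padding is variable: e.g. a 10-byte plaintext gets padded out to a 16-byte ciphertext, so a fixed "length - 16" would report 0 instead of 10, whereas the V2/V3 clients always add exactly 16 bytes and the fixed subtraction stays correct.)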





> AWS SDK V2 - Implement CSE
> --------------------------
>
>                 Key: HADOOP-18708
>                 URL: https://issues.apache.org/jira/browse/HADOOP-18708
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/s3
>    Affects Versions: 3.4.0
>            Reporter: Ahmar Suhail
>            Assignee: Syed Shameerur Rahman
>            Priority: Major
>              Labels: pull-request-available
>
> S3 Encryption client for SDK V2 is now available, so add client side 
> encryption back in. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
