steveloughran commented on a change in pull request #2706:
URL: https://github.com/apache/hadoop/pull/2706#discussion_r658081650



##########
File path: 
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
##########
@@ -94,9 +105,81 @@ public AmazonS3 createS3Client(
       awsConf.setUserAgentSuffix(parameters.getUserAgentSuffix());
     }
 
-    return buildAmazonS3Client(
-        awsConf,
-        parameters);
+    if (conf.get(CLIENT_SIDE_ENCRYPTION_METHOD) == null) {
+      return buildAmazonS3Client(
+          awsConf,
+          parameters);
+    } else {
+      return newAmazonS3EncryptionClient(
+          awsConf,
+          parameters);
+    }
+  }
+
+  /**
+   * Create an {@link AmazonS3} client of type
+   * {@link AmazonS3EncryptionV2} if CSE is enabled.
+   *
+   * @param awsConf    AWS configuration.
+   * @param parameters parameters
+   *
+   * @return new AmazonS3 client.
+   */
+  protected AmazonS3 newAmazonS3EncryptionClient(
+      final ClientConfiguration awsConf,
+      final S3ClientCreationParameters parameters){
+
+    AmazonS3 client;
+    AmazonS3EncryptionClientV2Builder builder =

Review comment:
       As this extends AmazonS3Builder, it should be possible to pull all the 
common setup code (endpoints, regions etc.) out into a shared method. This will 
help us the next time we have to do emergency fixes, and will ensure that 
changes to the normal path also get picked up by the new client
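
Roughly what I have in mind, as a sketch only: it assumes the SDK v1 `AmazonS3Builder` hierarchy, and the helper name plus the `S3ClientCreationParameters` accessors used here (`getCredentialSet()`, `isPathStyleAccess()`) are illustrative assumptions, not the final code.

```java
import com.amazonaws.ClientConfiguration;
import com.amazonaws.services.s3.AmazonS3Builder;

import org.apache.hadoop.fs.s3a.S3ClientFactory.S3ClientCreationParameters;

/**
 * Sketch of a shared setup helper usable by both the plain and the
 * CSE client builders; all names here are illustrative.
 */
final class ClientBuilderSetupSketch {

  private ClientBuilderSetupSketch() {
  }

  /** Apply the options common to both client types to a builder. */
  static void configureCommonClientOptions(
      AmazonS3Builder<?, ?> builder,
      ClientConfiguration awsConf,
      S3ClientCreationParameters parameters) {
    // Shared wiring: client configuration, credentials, path-style access.
    builder.withClientConfiguration(awsConf);
    builder.withCredentials(parameters.getCredentialSet());
    builder.withPathStyleAccessEnabled(parameters.isPathStyleAccess());
    // Endpoint/region setup would also live here, so that fixes to the
    // normal client path are automatically picked up by the CSE client.
  }
}
```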

##########
File path: 
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
##########
@@ -356,6 +357,19 @@
   private AuditManagerS3A auditManager =
       AuditIntegration.stubAuditManager();
 
+  /**
+   * S3 client side encryption adds padding to the content length of constant
+   * length of 16 bytes(at the moment, since we have only 1 content

Review comment:
       nit: add space before bracket

##########
File path: 
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
##########
@@ -760,7 +768,8 @@ public void maybeRethrowUploadFailure() throws IOException {
      * @throws IOException upload failure
      * @throws PathIOException if too many blocks were written
      */
-    private void uploadBlockAsync(final S3ADataBlocks.DataBlock block)
+    private void uploadBlockAsync(final S3ADataBlocks.DataBlock block,
+        Boolean isLast)

Review comment:
       why `Boolean` and not `boolean`?

##########
File path: 
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
##########
@@ -508,6 +524,10 @@ public void initialize(URI name, Configuration originalConf)
       blockFactory = S3ADataBlocks.createFactory(this, blockOutputBuffer);
       blockOutputActiveBlocks = intOption(conf,
           FAST_UPLOAD_ACTIVE_BLOCKS, DEFAULT_FAST_UPLOAD_ACTIVE_BLOCKS, 1);
+      // If CSE is enabled, do multipart uploads serially.
+      if(isCSEEnabled) {

Review comment:
       * nit: add a space
   * log at debug that CSE is enabled
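
i.e. something like the following; the log message wording is just an example, and the body of the `if` is whatever CSE-specific setup the patch already does there:

```java
      // If CSE is enabled, do multipart uploads serially.
      if (isCSEEnabled) {
        LOG.debug("Client-side encryption is enabled;"
            + " multipart uploads will be performed serially");
        // ... existing CSE-specific setup from the patch goes here ...
      }
```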

##########
File path: 
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.rm;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyFileContents;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE;
+
+/**
+ * Tests to verify S3 Client-Side Encryption (CSE).
+ */
+public abstract class ITestS3AClientSideEncryption extends AbstractS3ATestBase {
+
+  private static final List<Integer> SIZES =
+      new ArrayList<>(Arrays.asList(0, 1, 255, 4095));
+
+  private static final int BIG_FILE_SIZE = 15 * 1024 * 1024;
+
+  /**
+   * Testing S3 CSE on different file sizes.
+   */
+  @Test
+  public void testEncryption() throws Throwable {
+    describe("Test to verify client-side encryption for different file sizes.");
+    for (int size : SIZES) {
+      validateEncryptionForFilesize(size);
+    }
+  }
+
+  /**
+   * Testing the S3 client side encryption over rename operation.
+   */
+  @Test
+  public void testEncryptionOverRename() throws Throwable {
+    describe("Test for AWS CSE on Rename Operation.");
+    skipTest();
+    S3AFileSystem fs = getFileSystem();
+    Path src = path(getMethodName());
+    byte[] data = dataset(1024, 'a', 'z');
+    LOG.info("Region used: {}", fs.getAmazonS3Client().getRegionName());
+    writeDataset(fs, src, data, data.length, 1024 * 1024,
+        true, false);
+
+    //ContractTestUtils.verifyFileContents(fs, src, data);
+    Path dest = path(src.getName() + "-copy");
+    fs.rename(src, dest);
+    ContractTestUtils.verifyFileContents(fs, dest, data);
+    assertEncrypted(dest);
+  }
+
+  /**
+   * Test to verify if we get same content length of files in S3 CSE using
+   * listStatus and listFiles on the parent directory.
+   */
+  @Test
+  public void testDirectoryListingFileLengths() throws IOException {
+    describe("Test to verify directory listing calls gives correct content "
+        + "lengths");
+    skipTest();
+    S3AFileSystem fs = getFileSystem();
+    Path parentDir = path(getMethodName());
+
+    // Creating files in the parent directory that will be used to assert
+    // content length.
+    for (int i : SIZES) {
+      Path child = new Path(parentDir, getMethodName() + i);
+      writeThenReadFile(child, i);
+    }
+
+    // Getting the content lengths of files inside the directory via FileStatus.
+    List<Integer> fileLengthDirListing = new ArrayList<>();
+    for (FileStatus fileStatus : fs.listStatus(parentDir)) {
+      fileLengthDirListing.add((int) fileStatus.getLen());
+    }
+    // Assert the file length we got against expected file length for
+    // ListStatus.
+    Assertions.assertThat(fileLengthDirListing)
+        .describedAs("File lengths isn't same "
+            + "as expected from FileStatus dir. listing")
+        .containsExactlyInAnyOrderElementsOf(SIZES);
+
+
+    // Getting the content lengths of files inside the directory via ListFiles.
+    RemoteIterator<LocatedFileStatus> listDir = fs.listFiles(parentDir, true);
+    List<Integer> fileLengthListLocated = new ArrayList<>();
+    while(listDir.hasNext()) {
+      LocatedFileStatus fileStatus = listDir.next();
+      fileLengthListLocated.add((int) fileStatus.getLen());
+    }
+    // Assert the file length we got against expected file length for
+    // LocatedFileStatus.
+    Assertions.assertThat(fileLengthListLocated)
+        .describedAs("File lengths isn't same "
+            + "as expected from LocatedFileStatus dir. listing")
+        .containsExactlyInAnyOrderElementsOf(SIZES);
+
+  }
+
+  /**
+   * Test to verify multipart upload through S3ABlockOutputStream and
+   * verifying the contents of the uploaded file.
+   */
+  @Test
+  public void testBigFilePutAndGet() throws IOException {
+    skipTest();
+    S3AFileSystem fs = getFileSystem();
+    Path filePath = path(getMethodName());
+    byte[] fileContent = new byte[BIG_FILE_SIZE];
+    // PUT a 15MB file using CSE to simulate multipart in CSE.
+    createFile(fs, filePath, true, fileContent);
+    LOG.info("Multi-part upload successful...");
+
+    try(FSDataInputStream in = fs.open(filePath)) {
+      in.seek(BIG_FILE_SIZE);
+      in.seek(0);
+      in.readFully(0, fileContent);
+
+      verifyFileContents(fs, filePath, fileContent);
+    }
+  }
+
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+
+    // To simulate multi part put and get in small files, we'll set the
+    // threshold and part size to 5MB.
+    conf.set(Constants.MULTIPART_SIZE, String.valueOf(MULTIPART_MIN_SIZE));
+    conf.set(Constants.MIN_MULTIPART_THRESHOLD, String.valueOf(MULTIPART_MIN_SIZE));
+    return conf;
+  }
+
+  /**
+   * Method to validate CSE for different file sizes.
+   *
+   * @param len length of the file.
+   */
+  protected void validateEncryptionForFilesize(int len) throws IOException {
+    skipTest();
+    describe("Create an encrypted file of size " + len);
+    // Creating a unique path by adding file length in file name.
+    Path path = writeThenReadFile(getMethodName() + len, len);
+    assertEncrypted(path);
+    rm(getFileSystem(), path, false, false);
+  }
+
+  /**
+   * Skip tests if certain conditions are met.
+   */
+  protected abstract void skipTest();

Review comment:
       rename to `maybeSkipTest()`

##########
File path: 
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3ATestBase.java
##########
@@ -239,6 +240,7 @@ protected String getTestTableName(String suffix) {
   }
 
   /**
+<<<<<<< HEAD

Review comment:
       merge marker in comments; easily missed. 

##########
File path: 
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.rm;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyFileContents;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE;
+
+/**
+ * Tests to verify S3 Client-Side Encryption (CSE).
+ */
+public abstract class ITestS3AClientSideEncryption extends AbstractS3ATestBase {
+
+  private static final List<Integer> SIZES =
+      new ArrayList<>(Arrays.asList(0, 1, 255, 4095));
+
+  private static final int BIG_FILE_SIZE = 15 * 1024 * 1024;
+
+  /**
+   * Testing S3 CSE on different file sizes.
+   */
+  @Test
+  public void testEncryption() throws Throwable {
+    describe("Test to verify client-side encryption for different file sizes.");
+    for (int size : SIZES) {
+      validateEncryptionForFilesize(size);
+    }
+  }
+
+  /**
+   * Testing the S3 client side encryption over rename operation.
+   */
+  @Test
+  public void testEncryptionOverRename() throws Throwable {
+    describe("Test for AWS CSE on Rename Operation.");
+    skipTest();
+    S3AFileSystem fs = getFileSystem();
+    Path src = path(getMethodName());
+    byte[] data = dataset(1024, 'a', 'z');
+    LOG.info("Region used: {}", fs.getAmazonS3Client().getRegionName());
+    writeDataset(fs, src, data, data.length, 1024 * 1024,
+        true, false);
+
+    //ContractTestUtils.verifyFileContents(fs, src, data);
+    Path dest = path(src.getName() + "-copy");
+    fs.rename(src, dest);
+    ContractTestUtils.verifyFileContents(fs, dest, data);
+    assertEncrypted(dest);
+  }
+
+  /**
+   * Test to verify if we get same content length of files in S3 CSE using
+   * listStatus and listFiles on the parent directory.
+   */
+  @Test
+  public void testDirectoryListingFileLengths() throws IOException {
+    describe("Test to verify directory listing calls gives correct content "
+        + "lengths");
+    skipTest();
+    S3AFileSystem fs = getFileSystem();
+    Path parentDir = path(getMethodName());
+
+    // Creating files in the parent directory that will be used to assert
+    // content length.
+    for (int i : SIZES) {
+      Path child = new Path(parentDir, getMethodName() + i);
+      writeThenReadFile(child, i);
+    }
+
+    // Getting the content lengths of files inside the directory via FileStatus.
+    List<Integer> fileLengthDirListing = new ArrayList<>();
+    for (FileStatus fileStatus : fs.listStatus(parentDir)) {
+      fileLengthDirListing.add((int) fileStatus.getLen());
+    }
+    // Assert the file length we got against expected file length for
+    // ListStatus.
+    Assertions.assertThat(fileLengthDirListing)
+        .describedAs("File lengths isn't same "
+            + "as expected from FileStatus dir. listing")
+        .containsExactlyInAnyOrderElementsOf(SIZES);
+
+
+    // Getting the content lengths of files inside the directory via ListFiles.
+    RemoteIterator<LocatedFileStatus> listDir = fs.listFiles(parentDir, true);
+    List<Integer> fileLengthListLocated = new ArrayList<>();
+    while(listDir.hasNext()) {
+      LocatedFileStatus fileStatus = listDir.next();
+      fileLengthListLocated.add((int) fileStatus.getLen());
+    }
+    // Assert the file length we got against expected file length for
+    // LocatedFileStatus.
+    Assertions.assertThat(fileLengthListLocated)
+        .describedAs("File lengths isn't same "
+            + "as expected from LocatedFileStatus dir. listing")
+        .containsExactlyInAnyOrderElementsOf(SIZES);
+
+  }
+
+  /**
+   * Test to verify multipart upload through S3ABlockOutputStream and
+   * verifying the contents of the uploaded file.
+   */
+  @Test
+  public void testBigFilePutAndGet() throws IOException {
+    skipTest();
+    S3AFileSystem fs = getFileSystem();
+    Path filePath = path(getMethodName());
+    byte[] fileContent = new byte[BIG_FILE_SIZE];
+    // PUT a 15MB file using CSE to simulate multipart in CSE.
+    createFile(fs, filePath, true, fileContent);
+    LOG.info("Multi-part upload successful...");
+
+    try(FSDataInputStream in = fs.open(filePath)) {
+      in.seek(BIG_FILE_SIZE);
+      in.seek(0);
+      in.readFully(0, fileContent);
+
+      verifyFileContents(fs, filePath, fileContent);
+    }
+  }
+
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+
+    // To simulate multi part put and get in small files, we'll set the
+    // threshold and part size to 5MB.
+    conf.set(Constants.MULTIPART_SIZE, String.valueOf(MULTIPART_MIN_SIZE));

Review comment:
       call S3ATestUtils.removeBaseAndBucketOverrides on these options to make 
sure there's no per-bucket override in anyone's test setup which would 
otherwise cause these tests to fail
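
For example, in `createConfiguration()`; a sketch, assuming the `removeBaseAndBucketOverrides(Configuration, String...)` overload in `S3ATestUtils`:

```java
  @Override
  protected Configuration createConfiguration() {
    Configuration conf = super.createConfiguration();
    // Clear any base/per-bucket overrides so that the 5MB multipart
    // settings below always take effect, whatever the local test setup.
    S3ATestUtils.removeBaseAndBucketOverrides(conf,
        Constants.MULTIPART_SIZE,
        Constants.MIN_MULTIPART_THRESHOLD);
    conf.set(Constants.MULTIPART_SIZE, String.valueOf(MULTIPART_MIN_SIZE));
    conf.set(Constants.MIN_MULTIPART_THRESHOLD,
        String.valueOf(MULTIPART_MIN_SIZE));
    return conf;
  }
```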

##########
File path: 
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
##########
@@ -3655,7 +3676,14 @@ S3AFileStatus s3GetFileStatus(final Path path,
         // look for the simple file
         ObjectMetadata meta = getObjectMetadata(key);
         LOG.debug("Found exact file: normal file {}", key);
-        return new S3AFileStatus(meta.getContentLength(),
+        long contentLength = meta.getContentLength();
+        // check if CSE is enabled, then strip padded length.
+        if (isCSEEnabled
+            && meta.getUserMetaDataOf(Headers.CRYPTO_CEK_ALGORITHM) != null
+            && contentLength >= CSE_PADDING_LENGTH) {

Review comment:
       isn't there some unpadded length header we can use?
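
For instance, building on the `Headers.UNENCRYPTED_CONTENT_LENGTH` constant this patch already uses in `HeaderProcessing`; a sketch only, keeping the padding subtraction as a fallback:

```java
        long contentLength = meta.getContentLength();
        if (isCSEEnabled) {
          // Prefer the unencrypted length recorded by the CSE client...
          String unencryptedLength =
              meta.getUserMetaDataOf(Headers.UNENCRYPTED_CONTENT_LENGTH);
          if (unencryptedLength != null) {
            contentLength = Long.parseLong(unencryptedLength);
          } else if (meta.getUserMetaDataOf(Headers.CRYPTO_CEK_ALGORITHM) != null
              && contentLength >= CSE_PADDING_LENGTH) {
            // ...falling back to stripping the fixed CSE padding.
            contentLength -= CSE_PADDING_LENGTH;
          }
        }
```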

##########
File path: 
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
##########
@@ -356,6 +357,19 @@
   private AuditManagerS3A auditManager =
       AuditIntegration.stubAuditManager();
 
+  /**
+   * S3 client side encryption adds padding to the content length of constant
+   * length of 16 bytes(at the moment, since we have only 1 content
+   * encryption algorithm). Use this to subtract while listing the content
+   * length when certain conditions are met.
+   */
+  public static final int CSE_PADDING_LENGTH = 16;

Review comment:
       move to InternalConstants and refer to it wherever the padding is 
needed, e.g. in tests
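
i.e. roughly this, in `org.apache.hadoop.fs.s3a.impl.InternalConstants`:

```java
  /**
   * Padding added by S3 client-side encryption: a constant 16 bytes, given
   * the single content encryption algorithm currently supported. Reference
   * this wherever the padded length has to be corrected, e.g. in tests.
   */
  public static final int CSE_PADDING_LENGTH = 16;
```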

##########
File path: 
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java
##########
@@ -62,10 +67,25 @@ protected void verifyReadBytes(FileSystem.Statistics stats) {
     Assert.assertEquals(2 * blockSize, stats.getBytesRead());
   }
 
+  /**
+   * A method to verify the bytes written.
+   *
+   * NOTE: if Client side encryption is enabled, expected bytes written
+   * should increase by 16(padding of data) + 130(KMS key generation) in case
+   * of storage type{@link CryptoStorageMode} as ObjectMetadata(Default). If
+   * Crypto Storage mode is instruction file then add additional bytes as
+   * that file is stored separately and would account for bytes written.
+   *
+   * @param stats Filesystem statistics.
+   */
   @Override
   protected void verifyWrittenBytes(FileSystem.Statistics stats) {
     //No extra bytes are written
-    Assert.assertEquals(blockSize, stats.getBytesWritten());
+    long expectedBlockSize = blockSize;
+    if(conf.get(CLIENT_SIDE_ENCRYPTION_METHOD) != null) {
+      expectedBlockSize += 16 + 130;

Review comment:
       is that 16 some constant somewhere? If so, please switch to it to make 
the intent more obvious
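
Something like the sketch below would read better; `CSE_PADDING_LENGTH` is the constant from the patch, while the name used here for the 130-byte KMS overhead is only a suggestion:

```java
    long expectedBlockSize = blockSize;
    if (conf.get(CLIENT_SIDE_ENCRYPTION_METHOD) != null) {
      // 16 bytes of AES-GCM padding plus 130 bytes for the KMS key
      // generation; KMS_KEY_GENERATION_OVERHEAD is a hypothetical constant.
      expectedBlockSize += CSE_PADDING_LENGTH + KMS_KEY_GENERATION_OVERHEAD;
    }
```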

##########
File path: 
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.rm;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyFileContents;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE;
+
+/**
+ * Tests to verify S3 Client-Side Encryption (CSE).
+ */
+public abstract class ITestS3AClientSideEncryption extends AbstractS3ATestBase {
+
+  private static final List<Integer> SIZES =
+      new ArrayList<>(Arrays.asList(0, 1, 255, 4095));
+
+  private static final int BIG_FILE_SIZE = 15 * 1024 * 1024;
+
+  /**
+   * Testing S3 CSE on different file sizes.
+   */
+  @Test
+  public void testEncryption() throws Throwable {
+    describe("Test to verify client-side encryption for different file sizes.");
+    for (int size : SIZES) {
+      validateEncryptionForFilesize(size);
+    }
+  }
+
+  /**
+   * Testing the S3 client side encryption over rename operation.
+   */
+  @Test
+  public void testEncryptionOverRename() throws Throwable {
+    describe("Test for AWS CSE on Rename Operation.");
+    skipTest();
+    S3AFileSystem fs = getFileSystem();
+    Path src = path(getMethodName());
+    byte[] data = dataset(1024, 'a', 'z');
+    LOG.info("Region used: {}", fs.getAmazonS3Client().getRegionName());
+    writeDataset(fs, src, data, data.length, 1024 * 1024,
+        true, false);
+
+    //ContractTestUtils.verifyFileContents(fs, src, data);
+    Path dest = path(src.getName() + "-copy");
+    fs.rename(src, dest);
+    ContractTestUtils.verifyFileContents(fs, dest, data);
+    assertEncrypted(dest);
+  }
+
+  /**
+   * Test to verify if we get same content length of files in S3 CSE using
+   * listStatus and listFiles on the parent directory.
+   */
+  @Test
+  public void testDirectoryListingFileLengths() throws IOException {
+    describe("Test to verify directory listing calls gives correct content "
+        + "lengths");
+    skipTest();
+    S3AFileSystem fs = getFileSystem();
+    Path parentDir = path(getMethodName());
+
+    // Creating files in the parent directory that will be used to assert
+    // content length.
+    for (int i : SIZES) {
+      Path child = new Path(parentDir, getMethodName() + i);
+      writeThenReadFile(child, i);
+    }
+
+    // Getting the content lengths of files inside the directory via FileStatus.
+    List<Integer> fileLengthDirListing = new ArrayList<>();
+    for (FileStatus fileStatus : fs.listStatus(parentDir)) {
+      fileLengthDirListing.add((int) fileStatus.getLen());
+    }
+    // Assert the file length we got against expected file length for
+    // ListStatus.
+    Assertions.assertThat(fileLengthDirListing)
+        .describedAs("File lengths isn't same "
+            + "as expected from FileStatus dir. listing")
+        .containsExactlyInAnyOrderElementsOf(SIZES);
+
+
+    // Getting the content lengths of files inside the directory via ListFiles.
+    RemoteIterator<LocatedFileStatus> listDir = fs.listFiles(parentDir, true);
+    List<Integer> fileLengthListLocated = new ArrayList<>();
+    while(listDir.hasNext()) {
+      LocatedFileStatus fileStatus = listDir.next();
+      fileLengthListLocated.add((int) fileStatus.getLen());
+    }
+    // Assert the file length we got against expected file length for
+    // LocatedFileStatus.
+    Assertions.assertThat(fileLengthListLocated)
+        .describedAs("File lengths isn't same "
+            + "as expected from LocatedFileStatus dir. listing")
+        .containsExactlyInAnyOrderElementsOf(SIZES);
+
+  }
+
+  /**
+   * Test to verify multipart upload through S3ABlockOutputStream and
+   * verifying the contents of the uploaded file.
+   */
+  @Test
+  public void testBigFilePutAndGet() throws IOException {
+    skipTest();
+    S3AFileSystem fs = getFileSystem();
+    Path filePath = path(getMethodName());
+    byte[] fileContent = new byte[BIG_FILE_SIZE];
+    // PUT a 15MB file using CSE to simulate multipart in CSE.

Review comment:
       "force"

##########
File path: 
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
##########
@@ -5075,17 +5103,22 @@ public boolean hasPathCapability(final Path path, final String capability)
       return isMagicCommitEnabled();
 
     case SelectConstants.S3_SELECT_CAPABILITY:
-      // select is only supported if enabled
-      return SelectBinding.isSelectEnabled(getConf());
+      // select is only supported if enabled and client side encryption is
+      // disabled
+      if(!isCSEEnabled) {

Review comment:
       simpler just to merge with the return, e.g.
   
    ```
    return !isCSEEnabled && SelectBinding.isSelectEnabled(getConf());
    ```
    
    Also, will `requireSelectSupport` need the same check?

##########
File path: 
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
##########
@@ -70,13 +70,16 @@ public void setup() throws Exception {
     invoker = new Invoker(new S3ARetryPolicy(getConfiguration()),
         Invoker.NO_OP
     );
+    ifCSEThenSkip();

Review comment:
       should be `skipIfClientSideEncryption()` for consistency with the others

##########
File path: 
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestCustomSigner.java
##########
@@ -182,10 +197,21 @@ public void sign(SignableRequest<?> request, AWSCredentials credentials) {
       } catch (IOException e) {
         throw new RuntimeException("Failed to get current Ugi", e);
       }
-      AWSS3V4Signer realSigner = new AWSS3V4Signer();
-      realSigner.setServiceName("s3");
-      realSigner.setRegionName(lastStoreValue.conf.get(TEST_REGION_KEY));
-      realSigner.sign(request, credentials);
+      if(bucketName.equals("kms")) {

Review comment:
       nit: add spaces before the (s

##########
File path: 
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.rm;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyFileContents;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE;
+
+/**
+ * Tests to verify S3 Client-Side Encryption (CSE).
+ */
+public abstract class ITestS3AClientSideEncryption extends AbstractS3ATestBase {
+
+  private static final List<Integer> SIZES =
+      new ArrayList<>(Arrays.asList(0, 1, 255, 4095));
+
+  private static final int BIG_FILE_SIZE = 15 * 1024 * 1024;
+
+  /**
+   * Testing S3 CSE on different file sizes.
+   */
+  @Test
+  public void testEncryption() throws Throwable {
+    describe("Test to verify client-side encryption for different file sizes.");
+    for (int size : SIZES) {
+      validateEncryptionForFilesize(size);
+    }
+  }
+
+  /**
+   * Testing the S3 client side encryption over rename operation.
+   */
+  @Test
+  public void testEncryptionOverRename() throws Throwable {
+    describe("Test for AWS CSE on Rename Operation.");
+    skipTest();
+    S3AFileSystem fs = getFileSystem();
+    Path src = path(getMethodName());
+    byte[] data = dataset(1024, 'a', 'z');
+    LOG.info("Region used: {}", fs.getAmazonS3Client().getRegionName());
+    writeDataset(fs, src, data, data.length, 1024 * 1024,
+        true, false);
+
+    //ContractTestUtils.verifyFileContents(fs, src, data);
+    Path dest = path(src.getName() + "-copy");
+    fs.rename(src, dest);
+    ContractTestUtils.verifyFileContents(fs, dest, data);
+    assertEncrypted(dest);
+  }
+
+  /**
+   * Test to verify if we get same content length of files in S3 CSE using
+   * listStatus and listFiles on the parent directory.
+   */
+  @Test
+  public void testDirectoryListingFileLengths() throws IOException {
+    describe("Test to verify directory listing calls gives correct content "
+        + "lengths");
+    skipTest();
+    S3AFileSystem fs = getFileSystem();
+    Path parentDir = path(getMethodName());
+
+    // Creating files in the parent directory that will be used to assert
+    // content length.
+    for (int i : SIZES) {
+      Path child = new Path(parentDir, getMethodName() + i);
+      writeThenReadFile(child, i);
+    }
+
+    // Getting the content lengths of files inside the directory via FileStatus.
+    List<Integer> fileLengthDirListing = new ArrayList<>();
+    for (FileStatus fileStatus : fs.listStatus(parentDir)) {
+      fileLengthDirListing.add((int) fileStatus.getLen());
+    }
+    // Assert the file length we got against expected file length for
+    // ListStatus.
+    Assertions.assertThat(fileLengthDirListing)
+        .describedAs("File lengths isn't same "
+            + "as expected from FileStatus dir. listing")
+        .containsExactlyInAnyOrderElementsOf(SIZES);
+
+
+    // Getting the content lengths of files inside the directory via ListFiles.
+    RemoteIterator<LocatedFileStatus> listDir = fs.listFiles(parentDir, true);
+    List<Integer> fileLengthListLocated = new ArrayList<>();
+    while(listDir.hasNext()) {
+      LocatedFileStatus fileStatus = listDir.next();
+      fileLengthListLocated.add((int) fileStatus.getLen());
+    }
+    // Assert the file length we got against expected file length for
+    // LocatedFileStatus.
+    Assertions.assertThat(fileLengthListLocated)
+        .describedAs("File lengths isn't same "
+            + "as expected from LocatedFileStatus dir. listing")
+        .containsExactlyInAnyOrderElementsOf(SIZES);
+
+  }
+
+  /**
+   * Test to verify multipart upload through S3ABlockOutputStream and
+   * verifying the contents of the uploaded file.
+   */
+  @Test
+  public void testBigFilePutAndGet() throws IOException {
+    skipTest();

Review comment:
       this should only execute on a scale run. 
`getConf().getBoolean(KEY_SCALE_TESTS_ENABLED, false)` should provide that
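
For example, at the top of the test; a sketch, assuming `org.junit.Assume` and `S3ATestConstants.KEY_SCALE_TESTS_ENABLED` are imported:

```java
  @Test
  public void testBigFilePutAndGet() throws IOException {
    // Only run the 15MB upload/download as part of a scale test run.
    Assume.assumeTrue("Scale tests are disabled",
        getConfiguration().getBoolean(KEY_SCALE_TESTS_ENABLED, false));
    maybeSkipTest();
    // ... rest of the test body unchanged ...
  }
```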

##########
File path: 
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
##########
@@ -524,10 +525,15 @@ public static S3AFileStatus createFileStatus(Path keyPath,
       long blockSize,
       String owner,
       String eTag,
-      String versionId) {
+      String versionId,
+      boolean isCSEEnabled) {

Review comment:
       do you think it's time we moved to a builder here?
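
Purely as a sketch of the shape I mean; this builder does not exist yet and every name here is hypothetical:

```java
    S3AFileStatus status = new S3AFileStatusBuilder()   // hypothetical builder
        .withPath(keyPath)
        .withLength(size)
        .withBlockSize(blockSize)
        .withOwner(owner)
        .withEtag(eTag)
        .withVersionId(versionId)
        .withCSEEnabled(isCSEEnabled)
        .build();
```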

##########
File path: 
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.rm;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyFileContents;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE;
+
+/**
+ * Tests to verify S3 Client-Side Encryption (CSE).
+ */
+public abstract class ITestS3AClientSideEncryption extends AbstractS3ATestBase {
+
+  private static final List<Integer> SIZES =
+      new ArrayList<>(Arrays.asList(0, 1, 255, 4095));
+
+  private static final int BIG_FILE_SIZE = 15 * 1024 * 1024;
+
+  /**
+   * Testing S3 CSE on different file sizes.
+   */
+  @Test
+  public void testEncryption() throws Throwable {
+    describe("Test to verify client-side encryption for different file sizes.");
+    for (int size : SIZES) {
+      validateEncryptionForFilesize(size);
+    }
+  }
+
+  /**
+   * Testing the S3 client side encryption over rename operation.
+   */
+  @Test
+  public void testEncryptionOverRename() throws Throwable {
+    describe("Test for AWS CSE on Rename Operation.");
+    skipTest();
+    S3AFileSystem fs = getFileSystem();
+    Path src = path(getMethodName());
+    byte[] data = dataset(1024, 'a', 'z');
+    LOG.info("Region used: {}", fs.getAmazonS3Client().getRegionName());
+    writeDataset(fs, src, data, data.length, 1024 * 1024,
+        true, false);
+
+    //ContractTestUtils.verifyFileContents(fs, src, data);
+    Path dest = path(src.getName() + "-copy");
+    fs.rename(src, dest);
+    ContractTestUtils.verifyFileContents(fs, dest, data);

Review comment:
       wondering if we should use DurationInfo to log the durations of the 
write, rename and verify operations? Might be interesting to look at
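
e.g. something like this, using `org.apache.hadoop.util.DurationInfo` as a try-with-resources timer:

```java
    try (DurationInfo ignored =
             new DurationInfo(LOG, "rename %s to %s", src, dest)) {
      fs.rename(src, dest);
    }
    try (DurationInfo ignored =
             new DurationInfo(LOG, "verify contents of %s", dest)) {
      ContractTestUtils.verifyFileContents(fs, dest, data);
    }
```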

##########
File path: 
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.rm;
+import static 
org.apache.hadoop.fs.contract.ContractTestUtils.verifyFileContents;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE;
+
+/**
+ * Tests to verify S3 Client-Side Encryption (CSE).
+ */
+public abstract class ITestS3AClientSideEncryption extends AbstractS3ATestBase 
{
+
+  private static final List<Integer> SIZES =
+      new ArrayList<>(Arrays.asList(0, 1, 255, 4095));
+
+  private static final int BIG_FILE_SIZE = 15 * 1024 * 1024;
+
+  /**
+   * Testing S3 CSE on different file sizes.
+   */
+  @Test
+  public void testEncryption() throws Throwable {
+    describe("Test to verify client-side encryption for different file 
sizes.");
+    for (int size : SIZES) {
+      validateEncryptionForFilesize(size);
+    }
+  }
+
+  /**
+   * Testing the S3 client side encryption over rename operation.
+   */
+  @Test
+  public void testEncryptionOverRename() throws Throwable {
+    describe("Test for AWS CSE on Rename Operation.");
+    skipTest();
+    S3AFileSystem fs = getFileSystem();
+    Path src = path(getMethodName());
+    byte[] data = dataset(1024, 'a', 'z');
+    LOG.info("Region used: {}", fs.getAmazonS3Client().getRegionName());
+    writeDataset(fs, src, data, data.length, 1024 * 1024,
+        true, false);
+
+    //ContractTestUtils.verifyFileContents(fs, src, data);
+    Path dest = path(src.getName() + "-copy");
+    fs.rename(src, dest);
+    ContractTestUtils.verifyFileContents(fs, dest, data);
+    assertEncrypted(dest);
+  }
+
+  /**
+   * Test to verify if we get same content length of files in S3 CSE using
+   * listStatus and listFiles on the parent directory.
+   */
+  @Test
+  public void testDirectoryListingFileLengths() throws IOException {
+    describe("Test to verify directory listing calls gives correct content "
+        + "lengths");
+    skipTest();
+    S3AFileSystem fs = getFileSystem();
+    Path parentDir = path(getMethodName());
+
+    // Creating files in the parent directory that will be used to assert
+    // content length.
+    for (int i : SIZES) {
+      Path child = new Path(parentDir, getMethodName() + i);
+      writeThenReadFile(child, i);
+    }
+
+    // Getting the content lengths of files inside the directory via FileStatus.
+    List<Integer> fileLengthDirListing = new ArrayList<>();
+    for (FileStatus fileStatus : fs.listStatus(parentDir)) {
+      fileLengthDirListing.add((int) fileStatus.getLen());
+    }
+    // Assert the file length we got against expected file length for
+    // ListStatus.
+    Assertions.assertThat(fileLengthDirListing)
+        .describedAs("File lengths isn't same "

Review comment:
       nit, "aren't the same"

##########
File path: 
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java
##########
@@ -62,10 +67,25 @@ protected void verifyReadBytes(FileSystem.Statistics stats) {
     Assert.assertEquals(2 * blockSize, stats.getBytesRead());
   }
 
+  /**
+   * A method to verify the bytes written.
+   *
+   * NOTE: if Client side encryption is enabled, expected bytes written
+   * should increase by 16(padding of data) + 130(KMS key generation) in case
+   * of storage type{@link CryptoStorageMode} as ObjectMetadata(Default). If
+   * Crypto Storage mode is instruction file then add additional bytes as
+   * that file is stored separately and would account for bytes written.
+   *
+   * @param stats Filesystem statistics.
+   */
   @Override
   protected void verifyWrittenBytes(FileSystem.Statistics stats) {
     //No extra bytes are written
-    Assert.assertEquals(blockSize, stats.getBytesWritten());
+    long expectedBlockSize = blockSize;
+    if(conf.get(CLIENT_SIDE_ENCRYPTION_METHOD) != null) {
+      expectedBlockSize += 16 + 130;
+    }
+    Assert.assertEquals(expectedBlockSize, stats.getBytesWritten());

Review comment:
       add a message for the assert
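
e.g. (the message wording is just a suggestion):

```java
    Assert.assertEquals("Mismatch in bytes written reported by statistics",
        expectedBlockSize, stats.getBytesWritten());
```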

##########
File path: 
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryptionKms.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+
+import com.amazonaws.services.s3.Headers;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import org.assertj.core.api.Assertions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionTestsDisabled;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfKmsKeyIdIsNotSet;
+
+/**
+ * Testing the S3 CSE - KME method.
+ */
+public class ITestS3AClientSideEncryptionKms
+    extends ITestS3AClientSideEncryption {
+
+  private static final String KMS_KEY_WRAP_ALGO = "kms+context";
+  private static final String KMS_CONTENT_ENCRYPTION_ALGO = "AES/GCM/NoPadding";
+
+  /**
+   * Creating custom configs for KMS testing.
+   *
+   * @return Configuration.
+   */
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    S3ATestUtils.disableFilesystemCaching(conf);
+    return conf;
+  }
+
+  @Override
+  protected void skipTest() {
+    skipIfEncryptionTestsDisabled(getConfiguration());
+    skipIfKmsKeyIdIsNotSet(getConfiguration());
+  }
+
+  @Override
+  protected void assertEncrypted(Path path) throws IOException {
+    ObjectMetadata md = getFileSystem().getObjectMetadata(path);

Review comment:
       what about using fs.getXAttr() to retrieve these headers? It'd help 
verify that codepath
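
Something like the sketch below; note that the exact `header.`-prefixed attribute name for the CSE algorithm is my assumption here:

```java
    // Same check via the public XAttr API, which exercises HeaderProcessing.
    byte[] cekAlgorithm =
        getFileSystem().getXAttr(path, "header.x-amz-cek-alg");
    Assertions.assertThat(cekAlgorithm)
        .describedAs("client-side encryption algorithm header of %s", path)
        .isNotNull();
```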

##########
File path: 
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/HeaderProcessing.java
##########
@@ -301,8 +301,15 @@ public HeaderProcessing(final StoreContext storeContext,
         md.getContentEncoding());
     maybeSetHeader(headers, XA_CONTENT_LANGUAGE,
         md.getContentLanguage());
-    maybeSetHeader(headers, XA_CONTENT_LENGTH,
-        md.getContentLength());
+    // If CSE is enabled, use the unencrypted content length.
+    if (md.getUserMetaDataOf(Headers.CRYPTO_CEK_ALGORITHM) != null
+        && md.getUserMetaDataOf(Headers.UNENCRYPTED_CONTENT_LENGTH) != null) {

Review comment:
       nice catch



