[ 
https://issues.apache.org/jira/browse/BEAM-3813?focusedWorklogId=101702&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101702
 ]

ASF GitHub Bot logged work on BEAM-3813:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 14/May/18 12:03
            Start Date: 14/May/18 12:03
    Worklog Time Spent: 10m 
      Work Description: jbonofre closed pull request #5244: [BEAM-3813] Support 
encryption for S3FileSystem (SSE-S3, SSE-C and SSE-KMS)
URL: https://github.com/apache/beam/pull/5244
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/java/io/amazon-web-services/build.gradle 
b/sdks/java/io/amazon-web-services/build.gradle
index 10d016037f3..53b674bdb6f 100644
--- a/sdks/java/io/amazon-web-services/build.gradle
+++ b/sdks/java/io/amazon-web-services/build.gradle
@@ -22,7 +22,7 @@ applyJavaNature()
 description = "Apache Beam :: SDKs :: Java :: IO :: Amazon Web Services"
 ext.summary = "IO library to read and write Amazon Web Services services from 
Beam."
 
-def aws_java_sdk_version = "1.11.255"
+def aws_java_sdk_version = "1.11.319"
 
 dependencies {
   compile library.java.guava
@@ -34,7 +34,9 @@ dependencies {
   shadow library.java.jackson_databind
   shadow library.java.findbugs_jsr305
   shadow library.java.slf4j_api
+  runtime 'commons-codec:commons-codec:1.9'
   runtime "org.apache.httpcomponents:httpclient:4.5.2"
+  testCompile project(path: ":beam-runners-direct-java", configuration: 
"shadow")
   shadowTest library.java.guava_testlib
   shadowTest library.java.hamcrest_core
   shadowTest library.java.mockito_core
diff --git a/sdks/java/io/amazon-web-services/pom.xml 
b/sdks/java/io/amazon-web-services/pom.xml
index f76d5a7e23a..d76f5394200 100644
--- a/sdks/java/io/amazon-web-services/pom.xml
+++ b/sdks/java/io/amazon-web-services/pom.xml
@@ -33,7 +33,7 @@
   <packaging>jar</packaging>
 
   <properties>
-    <aws-java-sdk.version>1.11.255</aws-java-sdk.version>
+    <aws-java-sdk.version>1.11.319</aws-java-sdk.version>
   </properties>
 
   <build>
@@ -77,6 +77,13 @@
       <version>${aws-java-sdk.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>commons-codec</groupId>
+      <artifactId>commons-codec</artifactId>
+      <version>1.9</version>
+      <scope>runtime</scope>
+    </dependency>
+
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-core</artifactId>
@@ -142,6 +149,12 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-direct-java</artifactId>
+      <scope>test</scope>
+    </dependency>
+
     <dependency>
       <groupId>org.hamcrest</groupId>
       <artifactId>hamcrest-core</artifactId>
diff --git 
a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/options/AwsModule.java
 
b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/options/AwsModule.java
index 228ab5c8b85..0c7ce5b292c 100644
--- 
a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/options/AwsModule.java
+++ 
b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/options/AwsModule.java
@@ -47,7 +47,6 @@
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.util.Map;
-import java.util.Set;
 
 /**
  * A Jackson {@link Module} that registers a {@link JsonSerializer} and {@link 
JsonDeserializer}
@@ -75,7 +74,8 @@ public AwsModule() {
 
   }
 
-  static class AWSCredentialsProviderDeserializer extends 
JsonDeserializer<AWSCredentialsProvider> {
+  private static class AWSCredentialsProviderDeserializer
+      extends JsonDeserializer<AWSCredentialsProvider> {
 
     @Override
     public AWSCredentialsProvider deserialize(
@@ -123,15 +123,14 @@ public AWSCredentialsProvider 
deserializeWithType(JsonParser jsonParser,
   }
 
   static class AWSCredentialsProviderSerializer extends 
JsonSerializer<AWSCredentialsProvider> {
-
     // These providers are singletons, so don't require any serialization, 
other than type.
-    private static final Set<Object> SINGLETON_CREDENTIAL_PROVIDERS = 
ImmutableSet.of(
-        DefaultAWSCredentialsProviderChain.class,
-        EnvironmentVariableCredentialsProvider.class,
-        SystemPropertiesCredentialsProvider.class,
-        ProfileCredentialsProvider.class,
-        EC2ContainerCredentialsProviderWrapper.class
-    );
+    private static final ImmutableSet<Object> SINGLETON_CREDENTIAL_PROVIDERS =
+        ImmutableSet.of(
+            DefaultAWSCredentialsProviderChain.class,
+            EnvironmentVariableCredentialsProvider.class,
+            SystemPropertiesCredentialsProvider.class,
+            ProfileCredentialsProvider.class,
+            EC2ContainerCredentialsProviderWrapper.class);
 
     @Override
     public void serialize(AWSCredentialsProvider credentialsProvider, 
JsonGenerator jsonGenerator,
diff --git 
a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/options/S3Options.java
 
b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/options/S3Options.java
index 253272898a1..22ab91ce39f 100644
--- 
a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/options/S3Options.java
+++ 
b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/options/S3Options.java
@@ -17,9 +17,13 @@
  */
 package org.apache.beam.sdk.io.aws.options;
 
+import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams;
+import com.amazonaws.services.s3.model.SSECustomerKey;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
 
 /**
  * Options used to configure Amazon Web Services S3.
@@ -33,13 +37,44 @@
 
   @Description(
       "Size of S3 upload chunks; max upload object size is this value 
multiplied by 10000;"
-          + "default is 64MB, or 5MB in memory-constrained environments")
-  @Nullable
+          + "default is 64MB, or 5MB in memory-constrained environments. Must 
be at least 5MB.")
+  @Default.InstanceFactory(S3UploadBufferSizeBytesFactory.class)
   Integer getS3UploadBufferSizeBytes();
+
   void setS3UploadBufferSizeBytes(Integer value);
 
   @Description("Thread pool size, limiting max concurrent S3 operations")
   @Default.Integer(50)
   int getS3ThreadPoolSize();
   void setS3ThreadPoolSize(int value);
+
+  @Description("Algorithm for SSE-S3 encryption, e.g. AES256.")
+  @Nullable
+  String getSSEAlgorithm();
+  void setSSEAlgorithm(String value);
+
+  @Description("SSE key for SSE-C encryption, e.g. a base64 encoded key and 
the algorithm.")
+  @Nullable
+  SSECustomerKey getSSECustomerKey();
+  void setSSECustomerKey(SSECustomerKey value);
+
+  @Description("KMS key id for SSE-KMS encryption, e.g. \"arn:aws:kms:...\".")
+  @Nullable
+  SSEAwsKeyManagementParams getSSEAwsKeyManagementParams();
+  void setSSEAwsKeyManagementParams(SSEAwsKeyManagementParams value);
+
+  /**
+   * Provide the default s3 upload buffer size in bytes: 64MB if more than 
512MB in RAM are
+   * available and 5MB otherwise.
+   */
+  class S3UploadBufferSizeBytesFactory implements DefaultValueFactory<Integer> 
{
+    public static final int MINIMUM_UPLOAD_BUFFER_SIZE_BYTES = 5_242_880;
+
+    @Override
+    public Integer create(PipelineOptions options) {
+      return Runtime.getRuntime().maxMemory() < 536_870_912
+          ? MINIMUM_UPLOAD_BUFFER_SIZE_BYTES
+          : 67_108_864;
+    }
+  }
 }
diff --git 
a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java
 
b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java
index f23e8272b3f..cb7744022e6 100644
--- 
a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java
+++ 
b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java
@@ -27,11 +27,14 @@
 import com.amazonaws.services.s3.AmazonS3ClientBuilder;
 import com.amazonaws.services.s3.model.AmazonS3Exception;
 import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
 import com.amazonaws.services.s3.model.CopyObjectRequest;
+import com.amazonaws.services.s3.model.CopyObjectResult;
 import com.amazonaws.services.s3.model.CopyPartRequest;
 import com.amazonaws.services.s3.model.CopyPartResult;
 import com.amazonaws.services.s3.model.DeleteObjectsRequest;
 import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;
+import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
 import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
 import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
 import com.amazonaws.services.s3.model.ListObjectsV2Request;
@@ -43,7 +46,6 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
 import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
@@ -61,13 +63,13 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.io.FileSystem;
 import org.apache.beam.sdk.io.aws.options.S3Options;
@@ -81,37 +83,40 @@
 
   private static final Logger LOG = 
LoggerFactory.getLogger(S3FileSystem.class);
 
-  // Amazon S3 API docs: Each part must be at least 5 MB in size, except the 
last part.
-  private static final int MINIMUM_UPLOAD_BUFFER_SIZE_BYTES = 5 * 1024 * 1024;
-  private static final int DEFAULT_UPLOAD_BUFFER_SIZE_BYTES =
-      Runtime.getRuntime().maxMemory() < 512 * 1024 * 1024
-          ? MINIMUM_UPLOAD_BUFFER_SIZE_BYTES
-          : 64 * 1024 * 1024;
 
   // Amazon S3 API: You can create a copy of your object up to 5 GB in a 
single atomic operation
   // Ref. 
https://docs.aws.amazon.com/AmazonS3/latest/dev/CopyingObjectsExamples.html
-  private static final long MAX_COPY_OBJECT_SIZE_BYTES = 5L * 1024L * 1024L * 
1024L;
+  private static final long MAX_COPY_OBJECT_SIZE_BYTES = 5_368_709_120L;
 
   // S3 API, delete-objects: "You may specify up to 1000 keys."
   private static final int MAX_DELETE_OBJECTS_PER_REQUEST = 1000;
 
-  private static final Set<String> NON_READ_SEEK_EFFICIENT_ENCODINGS = 
ImmutableSet.of("gzip");
+  private static final ImmutableSet<String> NON_READ_SEEK_EFFICIENT_ENCODINGS =
+      ImmutableSet.of("gzip");
 
   // Non-final for testing.
   private AmazonS3 amazonS3;
-  private final String storageClass;
-  private final int s3UploadBufferSizeBytes;
+  private final S3Options options;
   private final ListeningExecutorService executorService;
 
   S3FileSystem(S3Options options) {
-    checkNotNull(options, "options");
-
+    this.options = checkNotNull(options, "options");
     if (Strings.isNullOrEmpty(options.getAwsRegion())) {
       LOG.info(
           "The AWS S3 Beam extension was included in this build, but the 
awsRegion flag "
               + "was not specified. If you don't plan to use S3, then ignore 
this message.");
     }
+    this.amazonS3 = buildAmazonS3Client(options);
+
+    checkNotNull(options.getS3StorageClass(), "storageClass");
+    checkArgument(options.getS3ThreadPoolSize() > 0, "threadPoolSize");
+    executorService =
+        MoreExecutors.listeningDecorator(
+            Executors.newFixedThreadPool(
+                options.getS3ThreadPoolSize(), new 
ThreadFactoryBuilder().setDaemon(true).build()));
+  }
 
+  private static AmazonS3 buildAmazonS3Client(S3Options options) {
     AmazonS3ClientBuilder builder =
         
AmazonS3ClientBuilder.standard().withCredentials(options.getAwsCredentialsProvider());
     if (Strings.isNullOrEmpty(options.getAwsServiceEndpoint())) {
@@ -121,24 +126,7 @@
           builder.withEndpointConfiguration(
               new EndpointConfiguration(options.getAwsServiceEndpoint(), 
options.getAwsRegion()));
     }
-    amazonS3 = builder.build();
-
-    this.storageClass = checkNotNull(options.getS3StorageClass(), 
"storageClass");
-
-    int uploadBufferSizeBytes;
-    if (options.getS3UploadBufferSizeBytes() != null) {
-      uploadBufferSizeBytes = options.getS3UploadBufferSizeBytes();
-    } else {
-      uploadBufferSizeBytes = DEFAULT_UPLOAD_BUFFER_SIZE_BYTES;
-    }
-    this.s3UploadBufferSizeBytes =
-        Math.max(MINIMUM_UPLOAD_BUFFER_SIZE_BYTES, uploadBufferSizeBytes);
-
-    checkArgument(options.getS3ThreadPoolSize() > 0, "threadPoolSize");
-    executorService =
-        MoreExecutors.listeningDecorator(
-            Executors.newFixedThreadPool(
-                options.getS3ThreadPoolSize(), new 
ThreadFactoryBuilder().setDaemon(true).build()));
+    return builder.build();
   }
 
   @Override
@@ -156,15 +144,10 @@ AmazonS3 getAmazonS3Client() {
     return this.amazonS3;
   }
 
-  @VisibleForTesting
-  int getS3UploadBufferSizeBytes() {
-    return s3UploadBufferSizeBytes;
-  }
-
   @Override
   protected List<MatchResult> match(List<String> specs) throws IOException {
     List<S3ResourceId> paths =
-        FluentIterable.from(specs).transform(spec -> 
S3ResourceId.fromUri(spec)).toList();
+        specs.stream().map(S3ResourceId::fromUri).collect(Collectors.toList());
     List<S3ResourceId> globs = new ArrayList<>();
     List<S3ResourceId> nonGlobs = new ArrayList<>();
     List<Boolean> isGlobBooleans = new ArrayList<>();
@@ -357,7 +340,7 @@ private ExpandedGlob expandGlob(S3ResourceId glob) {
   private PathWithEncoding getPathContentEncoding(S3ResourceId path) {
     ObjectMetadata s3Metadata;
     try {
-      s3Metadata = amazonS3.getObjectMetadata(path.getBucket(), path.getKey());
+      s3Metadata = getObjectMetadata(path);
     } catch (AmazonClientException e) {
       if (e instanceof AmazonS3Exception && ((AmazonS3Exception) 
e).getStatusCode() == 404) {
         return PathWithEncoding.create(path, new FileNotFoundException());
@@ -376,11 +359,18 @@ private PathWithEncoding 
getPathContentEncoding(S3ResourceId path) {
     return callTasks(tasks);
   }
 
+  private ObjectMetadata getObjectMetadata(S3ResourceId s3ResourceId) throws 
AmazonClientException {
+    GetObjectMetadataRequest request =
+        new GetObjectMetadataRequest(s3ResourceId.getBucket(), 
s3ResourceId.getKey());
+    request.setSSECustomerKey(options.getSSECustomerKey());
+    return amazonS3.getObjectMetadata(request);
+  }
+
   @VisibleForTesting
   MatchResult matchNonGlobPath(S3ResourceId path) {
     ObjectMetadata s3Metadata;
     try {
-      s3Metadata = amazonS3.getObjectMetadata(path.getBucket(), path.getKey());
+      s3Metadata = getObjectMetadata(path);
     } catch (AmazonClientException e) {
       if (e instanceof AmazonS3Exception && ((AmazonS3Exception) 
e).getStatusCode() == 404) {
         return MatchResult.create(MatchResult.Status.NOT_FOUND, new 
FileNotFoundException());
@@ -473,13 +463,12 @@ private static int doubleSlashes(StringBuilder dst, 
char[] src, int i) {
   @Override
   protected WritableByteChannel create(S3ResourceId resourceId, CreateOptions 
createOptions)
       throws IOException {
-    return new S3WritableByteChannel(
-        amazonS3, resourceId, createOptions.mimeType(), storageClass, 
s3UploadBufferSizeBytes);
+    return new S3WritableByteChannel(amazonS3, resourceId, 
createOptions.mimeType(), options);
   }
 
   @Override
   protected ReadableByteChannel open(S3ResourceId resourceId) throws 
IOException {
-    return new S3ReadableSeekableByteChannel(amazonS3, resourceId);
+    return new S3ReadableSeekableByteChannel(amazonS3, resourceId, options);
   }
 
   @Override
@@ -510,12 +499,11 @@ protected void copy(List<S3ResourceId> sourcePaths, 
List<S3ResourceId> destinati
   @VisibleForTesting
   void copy(S3ResourceId sourcePath, S3ResourceId destinationPath) throws 
IOException {
     try {
-      ObjectMetadata objectMetadata =
-          amazonS3.getObjectMetadata(sourcePath.getBucket(), 
sourcePath.getKey());
-      if (objectMetadata.getContentLength() < MAX_COPY_OBJECT_SIZE_BYTES) {
-        atomicCopy(sourcePath, destinationPath);
+      ObjectMetadata sourceObjectMetadata = getObjectMetadata(sourcePath);
+      if (sourceObjectMetadata.getContentLength() < 
MAX_COPY_OBJECT_SIZE_BYTES) {
+        atomicCopy(sourcePath, destinationPath, sourceObjectMetadata);
       } else {
-        multipartCopy(sourcePath, destinationPath, objectMetadata);
+        multipartCopy(sourcePath, destinationPath, sourceObjectMetadata);
       }
     } catch (AmazonClientException e) {
       throw new IOException(e);
@@ -523,7 +511,8 @@ void copy(S3ResourceId sourcePath, S3ResourceId 
destinationPath) throws IOExcept
   }
 
   @VisibleForTesting
-  void atomicCopy(S3ResourceId sourcePath, S3ResourceId destinationPath)
+  CopyObjectResult atomicCopy(
+      S3ResourceId sourcePath, S3ResourceId destinationPath, ObjectMetadata 
sourceObjectMetadata)
       throws AmazonClientException {
     CopyObjectRequest copyObjectRequest =
         new CopyObjectRequest(
@@ -531,19 +520,22 @@ void atomicCopy(S3ResourceId sourcePath, S3ResourceId 
destinationPath)
             sourcePath.getKey(),
             destinationPath.getBucket(),
             destinationPath.getKey());
-    copyObjectRequest.setStorageClass(storageClass);
-
-    amazonS3.copyObject(copyObjectRequest);
+    copyObjectRequest.setNewObjectMetadata(sourceObjectMetadata);
+    copyObjectRequest.setStorageClass(options.getS3StorageClass());
+    copyObjectRequest.setSourceSSECustomerKey(options.getSSECustomerKey());
+    
copyObjectRequest.setDestinationSSECustomerKey(options.getSSECustomerKey());
+    return amazonS3.copyObject(copyObjectRequest);
   }
 
   @VisibleForTesting
-  void multipartCopy(
-      S3ResourceId sourcePath, S3ResourceId destinationPath, ObjectMetadata 
objectMetadata)
+  CompleteMultipartUploadResult multipartCopy(
+      S3ResourceId sourcePath, S3ResourceId destinationPath, ObjectMetadata 
sourceObjectMetadata)
       throws AmazonClientException {
     InitiateMultipartUploadRequest initiateUploadRequest =
         new InitiateMultipartUploadRequest(destinationPath.getBucket(), 
destinationPath.getKey())
-            .withStorageClass(storageClass)
-            .withObjectMetadata(objectMetadata);
+            .withStorageClass(options.getS3StorageClass())
+            .withObjectMetadata(sourceObjectMetadata);
+    initiateUploadRequest.setSSECustomerKey(options.getSSECustomerKey());
 
     InitiateMultipartUploadResult initiateUploadResult =
         amazonS3.initiateMultipartUpload(initiateUploadRequest);
@@ -551,7 +543,7 @@ void multipartCopy(
 
     List<PartETag> eTags = new ArrayList<>();
 
-    final long objectSize = objectMetadata.getContentLength();
+    final long objectSize = sourceObjectMetadata.getContentLength();
     // extra validation in case a caller calls directly 
S3FileSystem.multipartCopy
     // without using S3FileSystem.copy in the future
     if (objectSize == 0) {
@@ -563,11 +555,14 @@ void multipartCopy(
               .withDestinationKey(destinationPath.getKey())
               .withUploadId(uploadId)
               .withPartNumber(1);
+      copyPartRequest.setSourceSSECustomerKey(options.getSSECustomerKey());
+      
copyPartRequest.setDestinationSSECustomerKey(options.getSSECustomerKey());
 
       CopyPartResult copyPartResult = amazonS3.copyPart(copyPartRequest);
       eTags.add(copyPartResult.getPartETag());
     } else {
       long bytePosition = 0;
+      Integer uploadBufferSizeBytes = options.getS3UploadBufferSizeBytes();
       // Amazon parts are 1-indexed, not zero-indexed.
       for (int partNumber = 1; bytePosition < objectSize; partNumber++) {
         final CopyPartRequest copyPartRequest =
@@ -579,12 +574,14 @@ void multipartCopy(
                 .withUploadId(uploadId)
                 .withPartNumber(partNumber)
                 .withFirstByte(bytePosition)
-                .withLastByte(Math.min(objectSize - 1, bytePosition + 
s3UploadBufferSizeBytes - 1));
+                .withLastByte(Math.min(objectSize - 1, bytePosition + 
uploadBufferSizeBytes - 1));
+        copyPartRequest.setSourceSSECustomerKey(options.getSSECustomerKey());
+        
copyPartRequest.setDestinationSSECustomerKey(options.getSSECustomerKey());
 
         CopyPartResult copyPartResult = amazonS3.copyPart(copyPartRequest);
         eTags.add(copyPartResult.getPartETag());
 
-        bytePosition += s3UploadBufferSizeBytes;
+        bytePosition += uploadBufferSizeBytes;
       }
     }
 
@@ -594,7 +591,7 @@ void multipartCopy(
             .withKey(destinationPath.getKey())
             .withUploadId(uploadId)
             .withPartETags(eTags);
-    amazonS3.completeMultipartUpload(completeUploadRequest);
+    return amazonS3.completeMultipartUpload(completeUploadRequest);
   }
 
   @Override
@@ -608,9 +605,10 @@ protected void rename(
   @Override
   protected void delete(Collection<S3ResourceId> resourceIds) throws 
IOException {
     List<S3ResourceId> nonDirectoryPaths =
-        FluentIterable.from(resourceIds)
+        resourceIds
+            .stream()
             .filter(s3ResourceId -> !s3ResourceId.isDirectory())
-            .toList();
+            .collect(Collectors.toList());
     Multimap<String, String> keysByBucket = ArrayListMultimap.create();
     for (S3ResourceId path : nonDirectoryPaths) {
       keysByBucket.put(path.getBucket(), path.getKey());
@@ -634,11 +632,11 @@ protected void delete(Collection<S3ResourceId> 
resourceIds) throws IOException {
   private void delete(String bucket, Collection<String> keys) throws 
IOException {
     checkArgument(
         keys.size() <= MAX_DELETE_OBJECTS_PER_REQUEST,
-        "only %d keys can be deleted per request, but got %d",
+        "only %s keys can be deleted per request, but got %s",
         MAX_DELETE_OBJECTS_PER_REQUEST,
         keys.size());
     List<KeyVersion> deleteKeyVersions =
-        FluentIterable.from(keys).transform(key -> new 
KeyVersion(key)).toList();
+        keys.stream().map(KeyVersion::new).collect(Collectors.toList());
     DeleteObjectsRequest request = new 
DeleteObjectsRequest(bucket).withKeys(deleteKeyVersions);
     try {
       amazonS3.deleteObjects(request);
@@ -672,7 +670,7 @@ protected S3ResourceId matchNewResource(String 
singleResourceSpec, boolean isDir
     try {
       List<CompletionStage<T>> futures = new ArrayList<>(tasks.size());
       for (Callable<T> task : tasks) {
-        futures.add(MoreFutures.supplyAsync(() -> task.call(), 
executorService));
+        futures.add(MoreFutures.supplyAsync(task::call, executorService));
       }
       return MoreFutures.get(MoreFutures.allAsList(futures));
 
diff --git 
a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemRegistrar.java
 
b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemRegistrar.java
index ebfa7c40ec6..8ab20a92958 100644
--- 
a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemRegistrar.java
+++ 
b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemRegistrar.java
@@ -38,6 +38,6 @@
   @Override
   public Iterable<FileSystem> fromOptions(@Nonnull PipelineOptions options) {
     checkNotNull(options, "Expect the runner have called 
FileSystems.setDefaultPipelineOptions().");
-    return ImmutableList.<FileSystem>of(new 
S3FileSystem(options.as(S3Options.class)));
+    return ImmutableList.of(new S3FileSystem(options.as(S3Options.class)));
   }
 }
diff --git 
a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3ReadableSeekableByteChannel.java
 
b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3ReadableSeekableByteChannel.java
index 2c2813a0400..f0cf1e073ef 100644
--- 
a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3ReadableSeekableByteChannel.java
+++ 
b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3ReadableSeekableByteChannel.java
@@ -32,10 +32,9 @@
 import java.nio.channels.NonWritableChannelException;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.SeekableByteChannel;
+import org.apache.beam.sdk.io.aws.options.S3Options;
 
-/**
- * A readable S3 object, as a {@link SeekableByteChannel}.
- */
+/** A readable S3 object, as a {@link SeekableByteChannel}. */
 class S3ReadableSeekableByteChannel implements SeekableByteChannel {
 
   private final AmazonS3 amazonS3;
@@ -44,11 +43,14 @@
   private long position = 0;
   private boolean open = true;
   private S3Object s3Object;
+  private final S3Options options;
   private ReadableByteChannel s3ObjectContentChannel;
 
-  S3ReadableSeekableByteChannel(AmazonS3 amazonS3, S3ResourceId path) throws 
IOException {
+  S3ReadableSeekableByteChannel(AmazonS3 amazonS3, S3ResourceId path, 
S3Options options)
+      throws IOException {
     this.amazonS3 = checkNotNull(amazonS3, "amazonS3");
     checkNotNull(path, "path");
+    this.options = checkNotNull(options, "options");
 
     if (path.getSize().isPresent()) {
       contentLength = path.getSize().get();
@@ -79,6 +81,7 @@ public int read(ByteBuffer destinationBuffer) throws 
IOException {
 
     if (s3Object == null) {
       GetObjectRequest request = new GetObjectRequest(path.getBucket(), 
path.getKey());
+      request.setSSECustomerKey(options.getSSECustomerKey());
       if (position > 0) {
         request.setRange(position, contentLength);
       }
@@ -87,8 +90,8 @@ public int read(ByteBuffer destinationBuffer) throws 
IOException {
       } catch (AmazonClientException e) {
         throw new IOException(e);
       }
-      s3ObjectContentChannel = Channels.newChannel(
-          new BufferedInputStream(s3Object.getObjectContent(), 1024 * 1024));
+      s3ObjectContentChannel =
+          Channels.newChannel(new 
BufferedInputStream(s3Object.getObjectContent(), 1024 * 1024));
     }
 
     int totalBytesRead = 0;
diff --git 
a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3WritableByteChannel.java
 
b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3WritableByteChannel.java
index 17263d9c415..6c1fc7188f8 100644
--- 
a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3WritableByteChannel.java
+++ 
b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3WritableByteChannel.java
@@ -29,6 +29,7 @@
 import com.amazonaws.services.s3.model.PartETag;
 import com.amazonaws.services.s3.model.UploadPartRequest;
 import com.amazonaws.services.s3.model.UploadPartResult;
+import com.google.common.annotations.VisibleForTesting;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -36,14 +37,15 @@
 import java.nio.channels.WritableByteChannel;
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.beam.sdk.io.aws.options.S3Options;
+import 
org.apache.beam.sdk.io.aws.options.S3Options.S3UploadBufferSizeBytesFactory;
 
-/**
- * A writable S3 object, as a {@link WritableByteChannel}.
- */
+/** A writable S3 object, as a {@link WritableByteChannel}. */
 class S3WritableByteChannel implements WritableByteChannel {
-
   private final AmazonS3 amazonS3;
+  private final S3Options options;
   private final S3ResourceId path;
+
   private final String uploadId;
   private final ByteBuffer uploadBuffer;
   private final List<PartETag> eTags;
@@ -52,21 +54,38 @@
   private int partNumber = 1;
   private boolean open = true;
 
-  S3WritableByteChannel(AmazonS3 amazonS3, S3ResourceId path, String 
contentType,
-      String storageClass, int uploadBufferSizeBytes)
+  S3WritableByteChannel(AmazonS3 amazonS3, S3ResourceId path, String 
contentType, S3Options options)
       throws IOException {
     this.amazonS3 = checkNotNull(amazonS3, "amazonS3");
+    this.options = checkNotNull(options);
     this.path = checkNotNull(path, "path");
-    checkArgument(uploadBufferSizeBytes > 0, "uploadBufferSizeBytes");
-    this.uploadBuffer = ByteBuffer.allocate(uploadBufferSizeBytes);
+    checkArgument(
+        atMostOne(
+            options.getSSECustomerKey() != null,
+            options.getSSEAlgorithm() != null,
+            options.getSSEAwsKeyManagementParams() != null),
+        "Either SSECustomerKey (SSE-C) or SSEAlgorithm (SSE-S3)"
+            + " or SSEAwsKeyManagementParams (SSE-KMS) must not be set at the 
same time.");
+    // Amazon S3 API docs: Each part must be at least 5 MB in size, except the 
last part.
+    checkArgument(
+        options.getS3UploadBufferSizeBytes()
+            >= S3UploadBufferSizeBytesFactory.MINIMUM_UPLOAD_BUFFER_SIZE_BYTES,
+        "S3UploadBufferSizeBytes must be at least %s bytes",
+        S3UploadBufferSizeBytesFactory.MINIMUM_UPLOAD_BUFFER_SIZE_BYTES);
+    this.uploadBuffer = 
ByteBuffer.allocate(options.getS3UploadBufferSizeBytes());
     eTags = new ArrayList<>();
 
     ObjectMetadata objectMetadata = new ObjectMetadata();
     objectMetadata.setContentType(contentType);
+    if (options.getSSEAlgorithm() != null) {
+      objectMetadata.setSSEAlgorithm(options.getSSEAlgorithm());
+    }
     InitiateMultipartUploadRequest request =
         new InitiateMultipartUploadRequest(path.getBucket(), path.getKey())
-            .withStorageClass(storageClass)
+            .withStorageClass(options.getS3StorageClass())
             .withObjectMetadata(objectMetadata);
+    request.setSSECustomerKey(options.getSSECustomerKey());
+    
request.setSSEAwsKeyManagementParams(options.getSSEAwsKeyManagementParams());
     InitiateMultipartUploadResult result;
     try {
       result = amazonS3.initiateMultipartUpload(request);
@@ -111,6 +130,8 @@ private void flush() throws IOException {
             .withPartNumber(partNumber++)
             .withPartSize(uploadBuffer.remaining())
             .withInputStream(inputStream);
+    request.setSSECustomerKey(options.getSSECustomerKey());
+
     UploadPartResult result;
     try {
       result = amazonS3.uploadPart(request);
@@ -144,4 +165,17 @@ public void close() throws IOException {
       throw new IOException(e);
     }
   }
+
+  @VisibleForTesting
+  static boolean atMostOne(boolean... values) {
+    boolean one = false;
+    for (boolean value : values) {
+      if (!one && value) {
+        one = true;
+      } else if (value) {
+        return false;
+      }
+    }
+    return true;
+  }
 }
diff --git 
a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/options/AwsModuleTest.java
 
b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/options/AwsModuleTest.java
index 5047312f7d4..91eafe432eb 100644
--- 
a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/options/AwsModuleTest.java
+++ 
b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/options/AwsModuleTest.java
@@ -51,7 +51,7 @@
   private final ObjectMapper objectMapper = new 
ObjectMapper().registerModule(new AwsModule());
 
   @Test
-  public void testObjectMapperIsAbleToFindModule() throws Exception {
+  public void testObjectMapperIsAbleToFindModule() {
     List<Module> modules = 
ObjectMapper.findModules(ReflectHelpers.findClassLoader());
     assertThat(modules, hasItem(Matchers.instanceOf(AwsModule.class)));
   }
diff --git 
a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/MatchResultMatcher.java
 
b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/MatchResultMatcher.java
index 04fdbc5f865..db7ca72554f 100644
--- 
a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/MatchResultMatcher.java
+++ 
b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/MatchResultMatcher.java
@@ -34,7 +34,7 @@
  * Hamcrest {@link Matcher} to match {@link MatchResult}. Necessary because 
{@link
  * MatchResult#metadata()} throws an exception under normal circumstances.
  */
-public class MatchResultMatcher extends BaseMatcher<MatchResult> {
+class MatchResultMatcher extends BaseMatcher<MatchResult> {
 
   private final MatchResult.Status expectedStatus;
   private final List<MatchResult.Metadata> expectedMetadata;
@@ -54,7 +54,7 @@ static MatchResultMatcher create(List<MatchResult.Metadata> 
expectedMetadata) {
     return new MatchResultMatcher(MatchResult.Status.OK, expectedMetadata, 
null);
   }
 
-  static MatchResultMatcher create(MatchResult.Metadata expectedMetadata) {
+  private static MatchResultMatcher create(MatchResult.Metadata 
expectedMetadata) {
     return create(ImmutableList.of(expectedMetadata));
   }
 
diff --git 
a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemTest.java
 
b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemTest.java
index b16e9dc5840..55b605dc9e7 100644
--- 
a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemTest.java
+++ 
b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemTest.java
@@ -15,16 +15,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.beam.sdk.io.aws.s3;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static 
org.apache.beam.sdk.io.aws.s3.S3TestUtils.buildMockedS3FileSystem;
+import static org.apache.beam.sdk.io.aws.s3.S3TestUtils.getSSECustomerKeyMd5;
+import static org.apache.beam.sdk.io.aws.s3.S3TestUtils.s3Options;
+import static 
org.apache.beam.sdk.io.aws.s3.S3TestUtils.s3OptionsWithSSECustomerKey;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.notNull;
 import static org.mockito.Mockito.never;
@@ -32,7 +35,6 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.model.AmazonS3Exception;
 import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
 import com.amazonaws.services.s3.model.CopyObjectRequest;
@@ -40,6 +42,7 @@
 import com.amazonaws.services.s3.model.CopyPartRequest;
 import com.amazonaws.services.s3.model.CopyPartResult;
 import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
 import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
 import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
 import com.amazonaws.services.s3.model.ListObjectsV2Request;
@@ -53,12 +56,10 @@
 import java.util.List;
 import org.apache.beam.sdk.io.aws.options.S3Options;
 import org.apache.beam.sdk.io.fs.MatchResult;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import org.mockito.ArgumentMatcher;
-import org.mockito.Mockito;
 
 /** Test case for {@link S3FileSystem}. */
 @RunWith(JUnit4.class)
@@ -80,39 +81,53 @@ public void testGlobTranslation() {
     assertEquals("foo.*baz", S3FileSystem.wildcardToRegexp("foo**baz"));
   }
 
-  private static S3Options s3Options() {
-    S3Options options = PipelineOptionsFactory.as(S3Options.class);
-    options.setAwsRegion("us-west-1");
-    return options;
-  }
-
-  private S3FileSystem buildMockedS3FileSystem() {
-    S3FileSystem s3FileSystem = new S3FileSystem(s3Options());
-    s3FileSystem.setAmazonS3Client(Mockito.mock(AmazonS3.class));
-    return s3FileSystem;
-  }
-
   @Test
   public void testGetScheme() {
-    S3Options pipelineOptions = s3Options();
-    S3FileSystem s3FileSystem = new S3FileSystem(pipelineOptions);
-
+    S3FileSystem s3FileSystem = new S3FileSystem(s3Options());
     assertEquals("s3", s3FileSystem.getScheme());
   }
 
   @Test
   public void testCopy() throws IOException {
-    S3FileSystem s3FileSystem = buildMockedS3FileSystem();
+    testCopy(s3Options());
+    testCopy(s3OptionsWithSSECustomerKey());
+  }
+
+  private GetObjectMetadataRequest createObjectMetadataRequest(
+      S3ResourceId path, S3Options options) {
+    GetObjectMetadataRequest getObjectMetadataRequest =
+        new GetObjectMetadataRequest(path.getBucket(), path.getKey());
+    getObjectMetadataRequest.setSSECustomerKey(options.getSSECustomerKey());
+    return getObjectMetadataRequest;
+  }
+
+  private void assertGetObjectMetadata(
+      S3FileSystem s3FileSystem,
+      GetObjectMetadataRequest request,
+      S3Options options,
+      ObjectMetadata objectMetadata) {
+    when(s3FileSystem
+            .getAmazonS3Client()
+            .getObjectMetadata(argThat(new 
GetObjectMetadataRequestMatcher(request))))
+        .thenReturn(objectMetadata);
+    assertEquals(
+        getSSECustomerKeyMd5(options),
+        
s3FileSystem.getAmazonS3Client().getObjectMetadata(request).getSSECustomerKeyMd5());
+  }
+
+  private void testCopy(S3Options options) throws IOException {
+    S3FileSystem s3FileSystem = buildMockedS3FileSystem(s3Options());
 
     S3ResourceId sourcePath = S3ResourceId.fromUri("s3://bucket/from");
     S3ResourceId destinationPath = S3ResourceId.fromUri("s3://bucket/to");
 
     ObjectMetadata objectMetadata = new ObjectMetadata();
     objectMetadata.setContentLength(0);
-    when(s3FileSystem
-            .getAmazonS3Client()
-            .getObjectMetadata(sourcePath.getBucket(), sourcePath.getKey()))
-        .thenReturn(objectMetadata);
+    if (getSSECustomerKeyMd5(options) != null) {
+      objectMetadata.setSSECustomerKeyMd5(getSSECustomerKeyMd5(options));
+    }
+    assertGetObjectMetadata(
+        s3FileSystem, createObjectMetadataRequest(sourcePath, options), 
options, objectMetadata);
 
     s3FileSystem.copy(sourcePath, destinationPath);
 
@@ -120,11 +135,9 @@ public void testCopy() throws IOException {
         .copyObject(argThat(notNullValue(CopyObjectRequest.class)));
 
     // we simulate a big object >= 5GB so it takes the multiPart path
-    objectMetadata.setContentLength((long) 5 * 1024 * 1024 * 1024);
-    when(s3FileSystem
-            .getAmazonS3Client()
-            .getObjectMetadata(sourcePath.getBucket(), sourcePath.getKey()))
-        .thenReturn(objectMetadata);
+    objectMetadata.setContentLength(5_368_709_120L);
+    assertGetObjectMetadata(
+        s3FileSystem, createObjectMetadataRequest(sourcePath, options), 
options, objectMetadata);
 
     try {
       s3FileSystem.copy(sourcePath, destinationPath);
@@ -137,27 +150,52 @@ public void testCopy() throws IOException {
   }
 
   @Test
-  public void testAtomicCopy() throws IOException {
-    S3FileSystem s3FileSystem = buildMockedS3FileSystem();
+  public void testAtomicCopy() {
+    testAtomicCopy(s3Options());
+    testAtomicCopy(s3OptionsWithSSECustomerKey());
+  }
+
+  private void testAtomicCopy(S3Options options) {
+    S3FileSystem s3FileSystem = buildMockedS3FileSystem(options);
 
     S3ResourceId sourcePath = S3ResourceId.fromUri("s3://bucket/from");
     S3ResourceId destinationPath = S3ResourceId.fromUri("s3://bucket/to");
 
     CopyObjectResult copyObjectResult = new CopyObjectResult();
+    if (getSSECustomerKeyMd5(options) != null) {
+      copyObjectResult.setSSECustomerKeyMd5(getSSECustomerKeyMd5(options));
+    }
+    CopyObjectRequest copyObjectRequest =
+        new CopyObjectRequest(
+            sourcePath.getBucket(),
+            sourcePath.getKey(),
+            destinationPath.getBucket(),
+            destinationPath.getKey());
+    copyObjectRequest.setSourceSSECustomerKey(options.getSSECustomerKey());
+    
copyObjectRequest.setDestinationSSECustomerKey(options.getSSECustomerKey());
     when(s3FileSystem
             .getAmazonS3Client()
             .copyObject(argThat(notNullValue(CopyObjectRequest.class))))
         .thenReturn(copyObjectResult);
+    assertEquals(
+        getSSECustomerKeyMd5(options),
+        
s3FileSystem.getAmazonS3Client().copyObject(copyObjectRequest).getSSECustomerKeyMd5());
 
-    s3FileSystem.atomicCopy(sourcePath, destinationPath);
+    ObjectMetadata sourceS3ObjectMetadata = new ObjectMetadata();
+    s3FileSystem.atomicCopy(sourcePath, destinationPath, 
sourceS3ObjectMetadata);
 
-    verify(s3FileSystem.getAmazonS3Client(), times(1))
+    verify(s3FileSystem.getAmazonS3Client(), times(2))
         .copyObject(argThat(notNullValue(CopyObjectRequest.class)));
   }
 
   @Test
-  public void testMultipartCopy() throws IOException {
-    S3FileSystem s3FileSystem = buildMockedS3FileSystem();
+  public void testMultipartCopy() {
+    testMultipartCopy(s3Options());
+    testMultipartCopy(s3OptionsWithSSECustomerKey());
+  }
+
+  private void testMultipartCopy(S3Options options) {
+    S3FileSystem s3FileSystem = buildMockedS3FileSystem(options);
 
     S3ResourceId sourcePath = S3ResourceId.fromUri("s3://bucket/from");
     S3ResourceId destinationPath = S3ResourceId.fromUri("s3://bucket/to");
@@ -165,29 +203,53 @@ public void testMultipartCopy() throws IOException {
     InitiateMultipartUploadResult initiateMultipartUploadResult =
         new InitiateMultipartUploadResult();
     initiateMultipartUploadResult.setUploadId("upload-id");
+    if (getSSECustomerKeyMd5(options) != null) {
+      
initiateMultipartUploadResult.setSSECustomerKeyMd5(getSSECustomerKeyMd5(options));
+    }
     when(s3FileSystem
             .getAmazonS3Client()
             
.initiateMultipartUpload(argThat(notNullValue(InitiateMultipartUploadRequest.class))))
         .thenReturn(initiateMultipartUploadResult);
-
-    ObjectMetadata sourceS3ObjectMetadata = new ObjectMetadata();
-    sourceS3ObjectMetadata.setContentLength(
-        (long) (s3FileSystem.getS3UploadBufferSizeBytes() * 1.5));
-    sourceS3ObjectMetadata.setContentEncoding("read-seek-efficient");
-    when(s3FileSystem
+    assertEquals(
+        getSSECustomerKeyMd5(options),
+        s3FileSystem
             .getAmazonS3Client()
-            .getObjectMetadata(sourcePath.getBucket(), sourcePath.getKey()))
-        .thenReturn(sourceS3ObjectMetadata);
+            .initiateMultipartUpload(
+                new InitiateMultipartUploadRequest(
+                    destinationPath.getBucket(), destinationPath.getKey()))
+            .getSSECustomerKeyMd5());
+
+    ObjectMetadata sourceObjectMetadata = new ObjectMetadata();
+    sourceObjectMetadata.setContentLength(
+        (long) (options.getS3UploadBufferSizeBytes() * 1.5));
+    sourceObjectMetadata.setContentEncoding("read-seek-efficient");
+    if (getSSECustomerKeyMd5(options) != null) {
+      sourceObjectMetadata.setSSECustomerKeyMd5(getSSECustomerKeyMd5(options));
+    }
+    assertGetObjectMetadata(
+        s3FileSystem,
+        createObjectMetadataRequest(sourcePath, options),
+        options,
+        sourceObjectMetadata);
 
     CopyPartResult copyPartResult1 = new CopyPartResult();
     copyPartResult1.setETag("etag-1");
     CopyPartResult copyPartResult2 = new CopyPartResult();
     copyPartResult1.setETag("etag-2");
+    if (getSSECustomerKeyMd5(options) != null) {
+      copyPartResult1.setSSECustomerKeyMd5(getSSECustomerKeyMd5(options));
+      copyPartResult2.setSSECustomerKeyMd5(getSSECustomerKeyMd5(options));
+    }
+    CopyPartRequest copyPartRequest = new CopyPartRequest();
+    copyPartRequest.setSourceSSECustomerKey(options.getSSECustomerKey());
     
when(s3FileSystem.getAmazonS3Client().copyPart(argThat(notNullValue(CopyPartRequest.class))))
         .thenReturn(copyPartResult1)
         .thenReturn(copyPartResult2);
+    assertEquals(
+        getSSECustomerKeyMd5(options),
+        
s3FileSystem.getAmazonS3Client().copyPart(copyPartRequest).getSSECustomerKeyMd5());
 
-    s3FileSystem.multipartCopy(sourcePath, destinationPath, 
sourceS3ObjectMetadata);
+    s3FileSystem.multipartCopy(sourcePath, destinationPath, 
sourceObjectMetadata);
 
     verify(s3FileSystem.getAmazonS3Client(), times(1))
         
.completeMultipartUpload(argThat(notNullValue(CompleteMultipartUploadRequest.class)));
@@ -195,7 +257,7 @@ public void testMultipartCopy() throws IOException {
 
   @Test
   public void deleteThousandsOfObjectsInMultipleBuckets() throws IOException {
-    S3FileSystem s3FileSystem = buildMockedS3FileSystem();
+    S3FileSystem s3FileSystem = buildMockedS3FileSystem(s3Options());
 
     List<String> buckets = ImmutableList.of("bucket1", "bucket2");
     List<String> keys = new ArrayList<>();
@@ -218,13 +280,18 @@ public void deleteThousandsOfObjectsInMultipleBuckets() 
throws IOException {
 
   @Test
   public void matchNonGlob() {
-    S3FileSystem s3FileSystem = buildMockedS3FileSystem();
+    S3FileSystem s3FileSystem = buildMockedS3FileSystem(s3Options());
 
     S3ResourceId path = 
S3ResourceId.fromUri("s3://testbucket/testdirectory/filethatexists");
     ObjectMetadata s3ObjectMetadata = new ObjectMetadata();
     s3ObjectMetadata.setContentLength(100);
     s3ObjectMetadata.setContentEncoding("read-seek-efficient");
-    when(s3FileSystem.getAmazonS3Client().getObjectMetadata(path.getBucket(), 
path.getKey()))
+    when(s3FileSystem
+            .getAmazonS3Client()
+            .getObjectMetadata(
+                argThat(
+                    new GetObjectMetadataRequestMatcher(
+                        new GetObjectMetadataRequest(path.getBucket(), 
path.getKey())))))
         .thenReturn(s3ObjectMetadata);
 
     MatchResult result = s3FileSystem.matchNonGlobPath(path);
@@ -241,13 +308,18 @@ public void matchNonGlob() {
 
   @Test
   public void matchNonGlobNotReadSeekEfficient() {
-    S3FileSystem s3FileSystem = buildMockedS3FileSystem();
+    S3FileSystem s3FileSystem = buildMockedS3FileSystem(s3Options());
 
     S3ResourceId path = 
S3ResourceId.fromUri("s3://testbucket/testdirectory/filethatexists");
     ObjectMetadata s3ObjectMetadata = new ObjectMetadata();
     s3ObjectMetadata.setContentLength(100);
     s3ObjectMetadata.setContentEncoding("gzip");
-    when(s3FileSystem.getAmazonS3Client().getObjectMetadata(path.getBucket(), 
path.getKey()))
+    when(s3FileSystem
+            .getAmazonS3Client()
+            .getObjectMetadata(
+                argThat(
+                    new GetObjectMetadataRequestMatcher(
+                        new GetObjectMetadataRequest(path.getBucket(), 
path.getKey())))))
         .thenReturn(s3ObjectMetadata);
 
     MatchResult result = s3FileSystem.matchNonGlobPath(path);
@@ -264,13 +336,18 @@ public void matchNonGlobNotReadSeekEfficient() {
 
   @Test
   public void matchNonGlobNullContentEncoding() {
-    S3FileSystem s3FileSystem = buildMockedS3FileSystem();
+    S3FileSystem s3FileSystem = buildMockedS3FileSystem(s3Options());
 
     S3ResourceId path = 
S3ResourceId.fromUri("s3://testbucket/testdirectory/filethatexists");
     ObjectMetadata s3ObjectMetadata = new ObjectMetadata();
     s3ObjectMetadata.setContentLength(100);
     s3ObjectMetadata.setContentEncoding(null);
-    when(s3FileSystem.getAmazonS3Client().getObjectMetadata(path.getBucket(), 
path.getKey()))
+    when(s3FileSystem
+            .getAmazonS3Client()
+            .getObjectMetadata(
+                argThat(
+                    new GetObjectMetadataRequestMatcher(
+                        new GetObjectMetadataRequest(path.getBucket(), 
path.getKey())))))
         .thenReturn(s3ObjectMetadata);
 
     MatchResult result = s3FileSystem.matchNonGlobPath(path);
@@ -286,13 +363,18 @@ public void matchNonGlobNullContentEncoding() {
   }
 
   @Test
-  public void matchNonGlobNotFound() throws IOException {
-    S3FileSystem s3FileSystem = buildMockedS3FileSystem();
+  public void matchNonGlobNotFound() {
+    S3FileSystem s3FileSystem = buildMockedS3FileSystem(s3Options());
 
     S3ResourceId path = 
S3ResourceId.fromUri("s3://testbucket/testdirectory/nonexistentfile");
     AmazonS3Exception exception = new AmazonS3Exception("mock exception");
     exception.setStatusCode(404);
-    when(s3FileSystem.getAmazonS3Client().getObjectMetadata(path.getBucket(), 
path.getKey()))
+    when(s3FileSystem
+            .getAmazonS3Client()
+            .getObjectMetadata(
+                argThat(
+                    new GetObjectMetadataRequestMatcher(
+                        new GetObjectMetadataRequest(path.getBucket(), 
path.getKey())))))
         .thenThrow(exception);
 
     MatchResult result = s3FileSystem.matchNonGlobPath(path);
@@ -302,13 +384,18 @@ public void matchNonGlobNotFound() throws IOException {
   }
 
   @Test
-  public void matchNonGlobForbidden() throws IOException {
-    S3FileSystem s3FileSystem = buildMockedS3FileSystem();
+  public void matchNonGlobForbidden() {
+    S3FileSystem s3FileSystem = buildMockedS3FileSystem(s3Options());
 
     AmazonS3Exception exception = new AmazonS3Exception("mock exception");
     exception.setStatusCode(403);
     S3ResourceId path = 
S3ResourceId.fromUri("s3://testbucket/testdirectory/keyname");
-    when(s3FileSystem.getAmazonS3Client().getObjectMetadata(path.getBucket(), 
path.getKey()))
+    when(s3FileSystem
+            .getAmazonS3Client()
+            .getObjectMetadata(
+                argThat(
+                    new GetObjectMetadataRequestMatcher(
+                        new GetObjectMetadataRequest(path.getBucket(), 
path.getKey())))))
         .thenThrow(exception);
 
     assertThat(
@@ -317,7 +404,6 @@ public void matchNonGlobForbidden() throws IOException {
   }
 
   static class ListObjectsV2RequestArgumentMatches extends 
ArgumentMatcher<ListObjectsV2Request> {
-
     private final ListObjectsV2Request expected;
 
     ListObjectsV2RequestArgumentMatches(ListObjectsV2Request expected) {
@@ -326,7 +412,7 @@ public void matchNonGlobForbidden() throws IOException {
 
     @Override
     public boolean matches(Object argument) {
-      if (argument != null && argument instanceof ListObjectsV2Request) {
+      if (argument instanceof ListObjectsV2Request) {
         ListObjectsV2Request actual = (ListObjectsV2Request) argument;
         return expected.getBucketName().equals(actual.getBucketName())
             && expected.getPrefix().equals(actual.getPrefix())
@@ -340,7 +426,7 @@ public boolean matches(Object argument) {
 
   @Test
   public void matchGlob() throws IOException {
-    S3FileSystem s3FileSystem = buildMockedS3FileSystem();
+    S3FileSystem s3FileSystem = buildMockedS3FileSystem(s3Options());
 
     S3ResourceId path = S3ResourceId.fromUri("s3://testbucket/foo/bar*baz");
 
@@ -397,8 +483,7 @@ public void matchGlob() throws IOException {
     // Expect object metadata queries for content encoding
     ObjectMetadata metadata = new ObjectMetadata();
     metadata.setContentEncoding("");
-    when(s3FileSystem.getAmazonS3Client().getObjectMetadata(anyString(), 
anyString()))
-        .thenReturn(metadata);
+    
when(s3FileSystem.getAmazonS3Client().getObjectMetadata(anyObject())).thenReturn(metadata);
 
     assertThat(
         s3FileSystem.matchGlobPaths(ImmutableList.of(path)).get(0),
@@ -422,7 +507,7 @@ public void matchGlob() throws IOException {
 
   @Test
   public void matchGlobWithSlashes() throws IOException {
-    S3FileSystem s3FileSystem = buildMockedS3FileSystem();
+    S3FileSystem s3FileSystem = buildMockedS3FileSystem(s3Options());
 
     S3ResourceId path = S3ResourceId.fromUri("s3://testbucket/foo/bar\\baz*");
 
@@ -456,8 +541,7 @@ public void matchGlobWithSlashes() throws IOException {
     // Expect object metadata queries for content encoding
     ObjectMetadata metadata = new ObjectMetadata();
     metadata.setContentEncoding("");
-    when(s3FileSystem.getAmazonS3Client().getObjectMetadata(anyString(), 
anyString()))
-        .thenReturn(metadata);
+    
when(s3FileSystem.getAmazonS3Client().getObjectMetadata(anyObject())).thenReturn(metadata);
 
     assertThat(
         s3FileSystem.matchGlobPaths(ImmutableList.of(path)).get(0),
@@ -474,7 +558,7 @@ public void matchGlobWithSlashes() throws IOException {
 
   @Test
   public void matchVariousInvokeThreadPool() throws IOException {
-    S3FileSystem s3FileSystem = buildMockedS3FileSystem();
+    S3FileSystem s3FileSystem = buildMockedS3FileSystem(s3Options());
 
     AmazonS3Exception notFoundException = new AmazonS3Exception("mock 
exception");
     notFoundException.setStatusCode(404);
@@ -482,7 +566,11 @@ public void matchVariousInvokeThreadPool() throws 
IOException {
         S3ResourceId.fromUri("s3://testbucket/testdirectory/nonexistentfile");
     when(s3FileSystem
             .getAmazonS3Client()
-            .getObjectMetadata(pathNotExist.getBucket(), 
pathNotExist.getKey()))
+            .getObjectMetadata(
+                argThat(
+                    new GetObjectMetadataRequestMatcher(
+                        new GetObjectMetadataRequest(
+                            pathNotExist.getBucket(), 
pathNotExist.getKey())))))
         .thenThrow(notFoundException);
 
     AmazonS3Exception forbiddenException = new AmazonS3Exception("mock 
exception");
@@ -491,7 +579,11 @@ public void matchVariousInvokeThreadPool() throws 
IOException {
         S3ResourceId.fromUri("s3://testbucket/testdirectory/forbiddenfile");
     when(s3FileSystem
             .getAmazonS3Client()
-            .getObjectMetadata(pathForbidden.getBucket(), 
pathForbidden.getKey()))
+            .getObjectMetadata(
+                argThat(
+                    new GetObjectMetadataRequestMatcher(
+                        new GetObjectMetadataRequest(
+                            pathForbidden.getBucket(), 
pathForbidden.getKey())))))
         .thenThrow(forbiddenException);
 
     S3ResourceId pathExist = 
S3ResourceId.fromUri("s3://testbucket/testdirectory/filethatexists");
@@ -500,7 +592,10 @@ public void matchVariousInvokeThreadPool() throws 
IOException {
     s3ObjectMetadata.setContentEncoding("not-gzip");
     when(s3FileSystem
             .getAmazonS3Client()
-            .getObjectMetadata(pathExist.getBucket(), pathExist.getKey()))
+            .getObjectMetadata(
+                argThat(
+                    new GetObjectMetadataRequestMatcher(
+                        new GetObjectMetadataRequest(pathExist.getBucket(), 
pathExist.getKey())))))
         .thenReturn(s3ObjectMetadata);
 
     S3ResourceId pathGlob = S3ResourceId.fromUri("s3://testbucket/path/part*");
@@ -518,7 +613,12 @@ public void matchVariousInvokeThreadPool() throws 
IOException {
 
     ObjectMetadata metadata = new ObjectMetadata();
     metadata.setContentEncoding("");
-    
when(s3FileSystem.getAmazonS3Client().getObjectMetadata(pathGlob.getBucket(), 
"path/part-0"))
+    when(s3FileSystem
+            .getAmazonS3Client()
+            .getObjectMetadata(
+                argThat(
+                    new GetObjectMetadataRequestMatcher(
+                        new GetObjectMetadataRequest(pathGlob.getBucket(), 
"path/part-0")))))
         .thenReturn(metadata);
 
     assertThat(
@@ -538,4 +638,24 @@ public void matchVariousInvokeThreadPool() throws 
IOException {
                 S3ResourceId.fromComponents(pathGlob.getBucket(), 
foundListObject.getKey()),
                 true)));
   }
+
+  /** A mockito argument matcher to implement equality on 
GetObjectMetadataRequest. */
+  private static class GetObjectMetadataRequestMatcher
+      extends ArgumentMatcher<GetObjectMetadataRequest> {
+    private final GetObjectMetadataRequest expected;
+
+    GetObjectMetadataRequestMatcher(GetObjectMetadataRequest expected) {
+      this.expected = expected;
+    }
+
+    @Override
+    public boolean matches(Object obj) {
+      if (!(obj instanceof GetObjectMetadataRequest)) {
+        return false;
+      }
+      GetObjectMetadataRequest actual = (GetObjectMetadataRequest) obj;
+      return actual.getBucketName().equals(expected.getBucketName())
+          && actual.getKey().equals(expected.getKey());
+    }
+  }
 }
diff --git 
a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3ResourceIdTest.java
 
b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3ResourceIdTest.java
index 97fd2bc7b3c..4354833ae92 100644
--- 
a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3ResourceIdTest.java
+++ 
b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3ResourceIdTest.java
@@ -23,6 +23,7 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.util.Arrays;
@@ -67,7 +68,7 @@
 
   // Each test case is an expected URL, then the components used to build it.
   // Empty components result in a double slash.
-  static final List<TestCase> PATH_TEST_CASES =
+  private static final List<TestCase> PATH_TEST_CASES =
       Arrays.asList(
           new TestCase("s3://bucket/", "", RESOLVE_DIRECTORY,
               "s3://bucket/"),
@@ -78,7 +79,7 @@
       );
 
   @Test
-  public void testResolve() throws Exception {
+  public void testResolve() {
     for (TestCase testCase : PATH_TEST_CASES) {
       ResourceId resourceId = S3ResourceId.fromUri(testCase.baseUri);
       ResourceId resolved = resourceId.resolve(testCase.relativePath, 
testCase.resolveOptions);
@@ -117,14 +118,14 @@ public void testResolve() throws Exception {
   }
 
   @Test
-  public void testResolveInvalidInputs() throws Exception {
+  public void testResolveInvalidInputs() {
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("Cannot resolve a file with a directory path: 
[tmp/]");
     S3ResourceId.fromUri("s3://my_bucket/").resolve("tmp/", 
StandardResolveOptions.RESOLVE_FILE);
   }
 
   @Test
-  public void testResolveInvalidNotDirectory() throws Exception {
+  public void testResolveInvalidNotDirectory() {
     ResourceId tmpDir = S3ResourceId.fromUri("s3://my_bucket/")
         .resolve("tmp dir", StandardResolveOptions.RESOLVE_FILE);
 
@@ -149,7 +150,7 @@ public void testResolveParentToFile() {
   }
 
   @Test
-  public void testGetCurrentDirectory() throws Exception {
+  public void testGetCurrentDirectory() {
     // Tests gcs paths.
     assertEquals(
         S3ResourceId.fromUri("s3://my_bucket/tmp dir/"),
@@ -170,7 +171,7 @@ public void testGetCurrentDirectory() throws Exception {
   }
 
   @Test
-  public void testIsDirectory() throws Exception {
+  public void testIsDirectory() {
     assertTrue(S3ResourceId.fromUri("s3://my_bucket/tmp dir/").isDirectory());
     assertTrue(S3ResourceId.fromUri("s3://my_bucket/").isDirectory());
     assertTrue(S3ResourceId.fromUri("s3://my_bucket").isDirectory());
@@ -178,21 +179,21 @@ public void testIsDirectory() throws Exception {
   }
 
   @Test
-  public void testInvalidPathNoBucket() throws Exception {
+  public void testInvalidPathNoBucket() {
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("Invalid S3 URI: [s3://]");
     S3ResourceId.fromUri("s3://");
   }
 
   @Test
-  public void testInvalidPathNoBucketAndSlash() throws Exception {
+  public void testInvalidPathNoBucketAndSlash() {
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("Invalid S3 URI: [s3:///]");
     S3ResourceId.fromUri("s3:///");
   }
 
   @Test
-  public void testGetScheme() throws Exception {
+  public void testGetScheme() {
     // Tests gcs paths.
     assertEquals("s3", S3ResourceId.fromUri("s3://my_bucket/tmp 
dir/").getScheme());
 
@@ -201,17 +202,17 @@ public void testGetScheme() throws Exception {
   }
 
   @Test
-  public void testGetFilename() throws Exception {
-    assertEquals(S3ResourceId.fromUri("s3://my_bucket/").getFilename(), null);
-    assertEquals(S3ResourceId.fromUri("s3://my_bucket/abc").getFilename(), 
"abc");
-    assertEquals(S3ResourceId.fromUri("s3://my_bucket/abc/").getFilename(), 
"abc");
-    assertEquals(S3ResourceId.fromUri("s3://my_bucket/abc/def").getFilename(), 
"def");
-    
assertEquals(S3ResourceId.fromUri("s3://my_bucket/abc/def/").getFilename(), 
"def");
-    
assertEquals(S3ResourceId.fromUri("s3://my_bucket/abc/xyz.txt").getFilename(), 
"xyz.txt");
+  public void testGetFilename() {
+    assertNull(S3ResourceId.fromUri("s3://my_bucket/").getFilename());
+    assertEquals("abc", 
S3ResourceId.fromUri("s3://my_bucket/abc").getFilename());
+    assertEquals("abc", 
S3ResourceId.fromUri("s3://my_bucket/abc/").getFilename());
+    assertEquals("def", 
S3ResourceId.fromUri("s3://my_bucket/abc/def").getFilename());
+    assertEquals("def", 
S3ResourceId.fromUri("s3://my_bucket/abc/def/").getFilename());
+    assertEquals("xyz.txt", 
S3ResourceId.fromUri("s3://my_bucket/abc/xyz.txt").getFilename());
   }
 
   @Test
-  public void testParentRelationship() throws Exception {
+  public void testParentRelationship() {
     S3ResourceId path = S3ResourceId.fromUri("s3://bucket/dir/subdir/object");
     assertEquals("bucket", path.getBucket());
     assertEquals("dir/subdir/object", path.getKey());
@@ -232,7 +233,7 @@ public void testParentRelationship() throws Exception {
   }
 
   @Test
-  public void testBucketParsing() throws Exception {
+  public void testBucketParsing() {
     S3ResourceId path = S3ResourceId.fromUri("s3://bucket");
     S3ResourceId path2 = S3ResourceId.fromUri("s3://bucket/");
 
@@ -241,7 +242,7 @@ public void testBucketParsing() throws Exception {
   }
 
   @Test
-  public void testS3ResourceIdToString() throws Exception {
+  public void testS3ResourceIdToString() {
     String filename = "s3://some-bucket/some/file.txt";
     S3ResourceId path = S3ResourceId.fromUri(filename);
     assertEquals(filename, path.toString());
@@ -297,7 +298,7 @@ public void testInvalidBucketWithUnderscore() {
 
   @Test
   @Ignore("https://issues.apache.org/jira/browse/BEAM-4184";)
-  public void testResourceIdTester() throws Exception {
+  public void testResourceIdTester() {
     S3Options options = PipelineOptionsFactory.create().as(S3Options.class);
     options.setAwsRegion("us-west-1");
     FileSystems.setDefaultPipelineOptions(options);
diff --git 
a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3TestUtils.java
 
b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3TestUtils.java
new file mode 100644
index 00000000000..1452e72f878
--- /dev/null
+++ 
b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3TestUtils.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws.s3;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams;
+import com.amazonaws.services.s3.model.SSECustomerKey;
+import com.amazonaws.util.Base64;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.aws.options.S3Options;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.mockito.Mockito;
+
+/** Utils to test S3 filesystem. */
+class S3TestUtils {
+  static S3Options s3Options() {
+    S3Options options = PipelineOptionsFactory.as(S3Options.class);
+    options.setAwsRegion("us-west-1");
+    options.setS3UploadBufferSizeBytes(5_242_880);
+    return options;
+  }
+
+  static S3Options s3OptionsWithSSEAlgorithm() {
+    S3Options options = s3Options();
+    options.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
+    return options;
+  }
+
+  static S3Options s3OptionsWithSSECustomerKey() {
+    S3Options options = s3Options();
+    options.setSSECustomerKey(new 
SSECustomerKey("86glyTlCNZgccSxW8JxMa6ZdjdK3N141glAysPUZ3AA="));
+    return options;
+  }
+
+  static S3Options s3OptionsWithSSEAwsKeyManagementParams() {
+    S3Options options = s3Options();
+    String awsKmsKeyId =
+            
"arn:aws:kms:eu-west-1:123456789012:key/dc123456-7890-ABCD-EF01-234567890ABC";
+    SSEAwsKeyManagementParams sseAwsKeyManagementParams =
+        new SSEAwsKeyManagementParams(awsKmsKeyId);
+    options.setSSEAwsKeyManagementParams(sseAwsKeyManagementParams);
+    return options;
+  }
+
+  static S3Options s3OptionsWithMultipleSSEOptions() {
+    S3Options options = s3OptionsWithSSEAwsKeyManagementParams();
+    options.setSSECustomerKey(new 
SSECustomerKey("86glyTlCNZgccSxW8JxMa6ZdjdK3N141glAysPUZ3AA="));
+    return options;
+  }
+
+  static S3FileSystem buildMockedS3FileSystem(S3Options options) {
+    S3FileSystem s3FileSystem = new S3FileSystem(options);
+    s3FileSystem.setAmazonS3Client(Mockito.mock(AmazonS3.class));
+    return s3FileSystem;
+  }
+
+  @Nullable
+  static String getSSECustomerKeyMd5(S3Options options) {
+    SSECustomerKey sseCostumerKey = options.getSSECustomerKey();
+    if (sseCostumerKey != null) {
+      return 
Base64.encodeAsString(DigestUtils.md5(Base64.decode(sseCostumerKey.getKey())));
+    }
+    return null;
+  }
+}
diff --git 
a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3WritableByteChannelTest.java
 
b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3WritableByteChannelTest.java
index cfa4672b0da..5930c1f48a9 100644
--- 
a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3WritableByteChannelTest.java
+++ 
b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3WritableByteChannelTest.java
@@ -17,74 +17,142 @@
  */
 package org.apache.beam.sdk.io.aws.s3;
 
+import static org.apache.beam.sdk.io.aws.s3.S3TestUtils.getSSECustomerKeyMd5;
+import static org.apache.beam.sdk.io.aws.s3.S3TestUtils.s3Options;
+import static 
org.apache.beam.sdk.io.aws.s3.S3TestUtils.s3OptionsWithMultipleSSEOptions;
+import static 
org.apache.beam.sdk.io.aws.s3.S3TestUtils.s3OptionsWithSSEAlgorithm;
+import static 
org.apache.beam.sdk.io.aws.s3.S3TestUtils.s3OptionsWithSSEAwsKeyManagementParams;
+import static 
org.apache.beam.sdk.io.aws.s3.S3TestUtils.s3OptionsWithSSECustomerKey;
+import static org.apache.beam.sdk.io.aws.s3.S3WritableByteChannel.atMostOne;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.notNull;
 import static org.mockito.Mockito.RETURNS_SMART_NULLS;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.withSettings;
 
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
 import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
 import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
 import com.amazonaws.services.s3.model.UploadPartRequest;
 import com.amazonaws.services.s3.model.UploadPartResult;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import org.apache.beam.sdk.io.aws.options.S3Options;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
-/**
- * Tests {@link S3WritableByteChannel}.
- */
+/** Tests {@link S3WritableByteChannel}. */
 @RunWith(JUnit4.class)
 public class S3WritableByteChannelTest {
+  @Rule public ExpectedException expected = ExpectedException.none();
 
   @Test
   public void write() throws IOException {
+    writeFromOptions(s3Options());
+    writeFromOptions(s3OptionsWithSSEAlgorithm());
+    writeFromOptions(s3OptionsWithSSECustomerKey());
+    writeFromOptions(s3OptionsWithSSEAwsKeyManagementParams());
+    expected.expect(IllegalArgumentException.class);
+    writeFromOptions(s3OptionsWithMultipleSSEOptions());
+  }
+
+  private void writeFromOptions(S3Options options) throws IOException {
     AmazonS3 mockAmazonS3 = mock(AmazonS3.class, 
withSettings().defaultAnswer(RETURNS_SMART_NULLS));
+    S3ResourceId path = S3ResourceId.fromUri("s3://bucket/dir/file");
 
     InitiateMultipartUploadResult initiateMultipartUploadResult =
         new InitiateMultipartUploadResult();
     initiateMultipartUploadResult.setUploadId("upload-id");
-    when(mockAmazonS3.initiateMultipartUpload(
-        argThat(notNullValue(InitiateMultipartUploadRequest.class))))
-        .thenReturn(initiateMultipartUploadResult);
+    String sseAlgorithm = options.getSSEAlgorithm();
+    if (options.getSSEAlgorithm() != null) {
+      initiateMultipartUploadResult.setSSEAlgorithm(sseAlgorithm);
+    }
+    if (getSSECustomerKeyMd5(options) != null) {
+      
initiateMultipartUploadResult.setSSECustomerKeyMd5(getSSECustomerKeyMd5(options));
+    }
+    if (options.getSSEAwsKeyManagementParams() != null) {
+      sseAlgorithm = "aws:kms";
+      initiateMultipartUploadResult.setSSEAlgorithm(sseAlgorithm);
+    }
+    doReturn(initiateMultipartUploadResult)
+        .when(mockAmazonS3)
+        
.initiateMultipartUpload(argThat(notNullValue(InitiateMultipartUploadRequest.class)));
+
+    InitiateMultipartUploadResult mockInitiateMultipartUploadResult =
+        mockAmazonS3.initiateMultipartUpload(
+            new InitiateMultipartUploadRequest(path.getBucket(), 
path.getKey()));
+    assertEquals(sseAlgorithm, 
mockInitiateMultipartUploadResult.getSSEAlgorithm());
+    assertEquals(
+        getSSECustomerKeyMd5(options), 
mockInitiateMultipartUploadResult.getSSECustomerKeyMd5());
+
     UploadPartResult result = new UploadPartResult();
     result.setETag("etag");
-    
when(mockAmazonS3.uploadPart(argThat(notNullValue(UploadPartRequest.class))))
-        .thenReturn(result);
+    if (getSSECustomerKeyMd5(options) != null) {
+      result.setSSECustomerKeyMd5(getSSECustomerKeyMd5(options));
+    }
+    
doReturn(result).when(mockAmazonS3).uploadPart(argThat(notNullValue(UploadPartRequest.class)));
 
-    S3ResourceId path = S3ResourceId.fromUri("s3://bucket/dir/file");
-    int uploadBufferSize = 10;
+    UploadPartResult mockUploadPartResult = mockAmazonS3.uploadPart(new 
UploadPartRequest());
+    assertEquals(getSSECustomerKeyMd5(options), 
mockUploadPartResult.getSSECustomerKeyMd5());
 
     S3WritableByteChannel channel =
-        new S3WritableByteChannel(mockAmazonS3, path, "text/plain", 
"STANDARD", uploadBufferSize);
-    int contentSize = 65;
+        new S3WritableByteChannel(mockAmazonS3, path, "text/plain", options);
+    int contentSize = 34_078_720;
     ByteBuffer uploadContent = ByteBuffer.allocate((int) (contentSize * 2.5));
-    for (byte i = 0; i < contentSize; i++) {
-      uploadContent.put(i);
+    for (int i = 0; i < contentSize; i++) {
+      uploadContent.put((byte) 0xff);
     }
     uploadContent.flip();
 
     int uploadedSize = channel.write(uploadContent);
     assertEquals(contentSize, uploadedSize);
 
+    CompleteMultipartUploadResult completeMultipartUploadResult =
+        new CompleteMultipartUploadResult();
+    doReturn(completeMultipartUploadResult)
+        .when(mockAmazonS3)
+        
.completeMultipartUpload(argThat(notNullValue(CompleteMultipartUploadRequest.class)));
+
     channel.close();
 
-    verify(mockAmazonS3, times(1))
+    verify(mockAmazonS3, times(2))
         
.initiateMultipartUpload(notNull(InitiateMultipartUploadRequest.class));
-    int partQuantity = (int) Math.ceil((double) contentSize / 
uploadBufferSize);
+    int partQuantity =
+        ((int) Math.ceil((double) contentSize / 
options.getS3UploadBufferSizeBytes()) + 1);
     verify(mockAmazonS3, 
times(partQuantity)).uploadPart(notNull(UploadPartRequest.class));
     verify(mockAmazonS3, times(1))
         
.completeMultipartUpload(notNull(CompleteMultipartUploadRequest.class));
     verifyNoMoreInteractions(mockAmazonS3);
   }
+
+  @Test
+  public void testAtMostOne() {
+    assertTrue(atMostOne(true));
+    assertTrue(atMostOne(false));
+    assertFalse(atMostOne(true, true));
+    assertTrue(atMostOne(true, false));
+    assertTrue(atMostOne(false, true));
+    assertTrue(atMostOne(false, false));
+    assertFalse(atMostOne(true, true, true));
+    assertFalse(atMostOne(true, true, false));
+    assertFalse(atMostOne(true, false, true));
+    assertTrue(atMostOne(true, false, false));
+    assertFalse(atMostOne(false, true, true));
+    assertTrue(atMostOne(false, true, false));
+    assertTrue(atMostOne(false, false, true));
+    assertTrue(atMostOne(false, false, false));
+  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 101702)
    Time Spent: 1h 20m  (was: 1h 10m)

> Support encryption for S3FileSystem (SSE-S3, SSE-C and SSE-KMS)
> ---------------------------------------------------------------
>
>                 Key: BEAM-3813
>                 URL: https://issues.apache.org/jira/browse/BEAM-3813
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-aws
>            Reporter: Ismaël Mejía
>            Assignee: Ismaël Mejía
>            Priority: Minor
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> We should enable AWS S3 users to use encryption when reading or writing by 
> providing encryption keys or by using server-side encryption via an 
> algorithm or a key management system (KMS).
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to