This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 670cb19  [BEAM-5196] Add MD5 consistency check on S3 uploads (writes)
     new 90ca21e  Merge pull request #6232: [BEAM-5196] Add MD5 consistency check on S3 uploads (writes)
670cb19 is described below

commit 670cb197ba74a79ae73085a3c4db5040c8e55904
Author: Leen Toelen <leen.toe...@tomtom.com>
AuthorDate: Wed Aug 15 22:03:51 2018 +0200

    [BEAM-5196] Add MD5 consistency check on S3 uploads (writes)
---
 sdks/java/io/amazon-web-services/build.gradle      |  1 +
 .../beam/sdk/io/aws/s3/S3WritableByteChannel.java  | 15 +++++
 .../beam/sdk/io/aws/s3/S3FileSystemTest.java       | 65 ++++++++++++++++++++++
 .../org/apache/beam/sdk/io/aws/s3/S3TestUtils.java |  6 +-
 4 files changed, 86 insertions(+), 1 deletion(-)
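
In short (a minimal, hedged sketch of the technique rather than the patch itself; the
class and method names below are illustrative only): S3WritableByteChannel now computes
an MD5 digest of each buffered upload part and attaches it, Base64-encoded, to the
UploadPartRequest, so S3 can verify the uploaded bytes server-side and reject the part
on a mismatch.

    // Illustrative sketch only -- simplified from the S3WritableByteChannel changes below.
    import com.amazonaws.services.s3.model.UploadPartRequest;
    import com.amazonaws.util.Base64;
    import java.io.ByteArrayInputStream;
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;

    class Md5PartUploadSketch {
      static UploadPartRequest buildPart(
          byte[] partBytes, String bucket, String key, String uploadId, int partNumber)
          throws NoSuchAlgorithmException {
        MessageDigest md5 = MessageDigest.getInstance("MD5");
        // In the real channel the digest is updated incrementally as bytes are written.
        md5.update(partBytes);
        return new UploadPartRequest()
            .withBucketName(bucket)
            .withKey(key)
            .withUploadId(uploadId)
            .withPartNumber(partNumber)
            .withPartSize(partBytes.length)
            // The Base64-encoded MD5 lets S3 verify the part and fail it on corruption.
            .withMD5Digest(Base64.encodeAsString(md5.digest()))
            .withInputStream(new ByteArrayInputStream(partBytes));
      }
    }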

diff --git a/sdks/java/io/amazon-web-services/build.gradle b/sdks/java/io/amazon-web-services/build.gradle
index cbcc97d..973d115 100644
--- a/sdks/java/io/amazon-web-services/build.gradle
+++ b/sdks/java/io/amazon-web-services/build.gradle
@@ -42,4 +42,5 @@ dependencies {
   shadowTest library.java.mockito_core
   shadowTest library.java.junit
   shadowTest library.java.slf4j_jdk14
+  testCompile "io.findify:s3mock_2.11:0.2.4"
 }
diff --git a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3WritableByteChannel.java b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3WritableByteChannel.java
index 6c1fc71..c061adc 100644
--- a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3WritableByteChannel.java
+++ b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3WritableByteChannel.java
@@ -29,12 +29,15 @@ import com.amazonaws.services.s3.model.ObjectMetadata;
 import com.amazonaws.services.s3.model.PartETag;
 import com.amazonaws.services.s3.model.UploadPartRequest;
 import com.amazonaws.services.s3.model.UploadPartResult;
+import com.amazonaws.util.Base64;
 import com.google.common.annotations.VisibleForTesting;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.WritableByteChannel;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.sdk.io.aws.options.S3Options;
@@ -53,6 +56,7 @@ class S3WritableByteChannel implements WritableByteChannel {
   // AWS S3 parts are 1-indexed, not zero-indexed.
   private int partNumber = 1;
   private boolean open = true;
+  private final MessageDigest md5 = md5();
 
   S3WritableByteChannel(AmazonS3 amazonS3, S3ResourceId path, String contentType, S3Options options)
       throws IOException {
@@ -95,6 +99,14 @@ class S3WritableByteChannel implements WritableByteChannel {
     uploadId = result.getUploadId();
   }
 
+  private static MessageDigest md5() {
+    try {
+      return MessageDigest.getInstance("MD5");
+    } catch (NoSuchAlgorithmException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
   @Override
   public int write(ByteBuffer sourceBuffer) throws IOException {
     if (!isOpen()) {
@@ -109,6 +121,7 @@ class S3WritableByteChannel implements WritableByteChannel {
       byte[] copyBuffer = new byte[bytesWritten];
       sourceBuffer.get(copyBuffer);
       uploadBuffer.put(copyBuffer);
+      md5.update(copyBuffer);
 
       if (!uploadBuffer.hasRemaining() || sourceBuffer.hasRemaining()) {
         flush();
@@ -129,6 +142,7 @@ class S3WritableByteChannel implements WritableByteChannel {
             .withUploadId(uploadId)
             .withPartNumber(partNumber++)
             .withPartSize(uploadBuffer.remaining())
+            .withMD5Digest(Base64.encodeAsString(md5.digest()))
             .withInputStream(inputStream);
     request.setSSECustomerKey(options.getSSECustomerKey());
 
@@ -139,6 +153,7 @@ class S3WritableByteChannel implements WritableByteChannel {
       throw new IOException(e);
     }
     uploadBuffer.clear();
+    md5.reset();
     eTags.add(result.getPartETag());
   }
 
diff --git a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemTest.java b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemTest.java
index 63f69b5..0abf217 100644
--- a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemTest.java
+++ b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemTest.java
@@ -22,9 +22,11 @@ import static org.apache.beam.sdk.io.aws.s3.S3TestUtils.buildMockedS3FileSystem;
 import static org.apache.beam.sdk.io.aws.s3.S3TestUtils.getSSECustomerKeyMd5;
 import static org.apache.beam.sdk.io.aws.s3.S3TestUtils.s3Options;
 import static org.apache.beam.sdk.io.aws.s3.S3TestUtils.s3OptionsWithSSECustomerKey;
+import static org.apache.beam.sdk.io.fs.CreateOptions.StandardCreateOptions.builder;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Matchers.anyObject;
@@ -35,6 +37,12 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import akka.http.scaladsl.Http;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.AnonymousAWSCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
 import com.amazonaws.services.s3.model.AmazonS3Exception;
 import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
 import com.amazonaws.services.s3.model.CopyObjectRequest;
@@ -50,12 +58,18 @@ import com.amazonaws.services.s3.model.ListObjectsV2Result;
 import com.amazonaws.services.s3.model.ObjectMetadata;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
 import com.google.common.collect.ImmutableList;
+import io.findify.s3mock.S3Mock;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.sdk.io.aws.options.S3Options;
 import org.apache.beam.sdk.io.fs.MatchResult;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -64,6 +78,29 @@ import org.mockito.ArgumentMatcher;
 /** Test case for {@link S3FileSystem}. */
 @RunWith(JUnit4.class)
 public class S3FileSystemTest {
+  private static S3Mock api;
+  private static AmazonS3 client;
+
+  @BeforeClass
+  public static void beforeClass() {
+    api = new S3Mock.Builder().withInMemoryBackend().build();
+    Http.ServerBinding binding = api.start();
+
+    EndpointConfiguration endpoint =
+        new EndpointConfiguration(
+            "http://localhost:" + binding.localAddress().getPort(), "us-west-2");
+    client =
+        AmazonS3ClientBuilder.standard()
+            .withPathStyleAccessEnabled(true)
+            .withEndpointConfiguration(endpoint)
+            .withCredentials(new AWSStaticCredentialsProvider(new AnonymousAWSCredentials()))
+            .build();
+  }
+
+  @AfterClass
+  public static void afterClass() {
+    api.stop();
+  }
 
   @Test
   public void testGlobTranslation() {
@@ -638,6 +675,34 @@ public class S3FileSystemTest {
                 true)));
   }
 
+  @Test
+  public void testWriteAndRead() throws IOException {
+    S3FileSystem s3FileSystem = buildMockedS3FileSystem(s3Options(), client);
+
+    client.createBucket("testbucket");
+
+    byte[] writtenArray = new byte[] {0};
+    ByteBuffer bb = ByteBuffer.allocate(writtenArray.length);
+    bb.put(writtenArray);
+
+    //First create an object and write data to it
+    S3ResourceId path = S3ResourceId.fromUri("s3://testbucket/foo/bar.txt");
+    WritableByteChannel writableByteChannel =
+        s3FileSystem.create(path, builder().setMimeType("application/text").build());
+    writableByteChannel.write(bb);
+    writableByteChannel.close();
+
+    //Now read the same object
+    ByteBuffer bb2 = ByteBuffer.allocate(writtenArray.length);
+    ReadableByteChannel open = s3FileSystem.open(path);
+    open.read(bb2);
+
+    //And compare the content with the one that was written
+    byte[] readArray = bb2.array();
+    assertArrayEquals(readArray, writtenArray);
+    open.close();
+  }
+
   /** A mockito argument matcher to implement equality on GetObjectMetadataRequest. */
   private static class GetObjectMetadataRequestMatcher
       extends ArgumentMatcher<GetObjectMetadataRequest> {
diff --git a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3TestUtils.java b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3TestUtils.java
index 52fc1af..fdd6733 100644
--- a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3TestUtils.java
+++ b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3TestUtils.java
@@ -66,8 +66,12 @@ class S3TestUtils {
   }
 
   static S3FileSystem buildMockedS3FileSystem(S3Options options) {
+    return buildMockedS3FileSystem(options, Mockito.mock(AmazonS3.class));
+  }
+
+  static S3FileSystem buildMockedS3FileSystem(S3Options options, AmazonS3 client) {
     S3FileSystem s3FileSystem = new S3FileSystem(options);
-    s3FileSystem.setAmazonS3Client(Mockito.mock(AmazonS3.class));
+    s3FileSystem.setAmazonS3Client(client);
     return s3FileSystem;
   }
 
