This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 670cb19 [BEAM-5196] Add MD5 consistency check on S3 uploads (writes) new 90ca21e Merge pull request #6232: [BEAM-5196] Add MD5 consistency check on S3 uploads (writes) 670cb19 is described below commit 670cb197ba74a79ae73085a3c4db5040c8e55904 Author: Leen Toelen <leen.toe...@tomtom.com> AuthorDate: Wed Aug 15 22:03:51 2018 +0200 [BEAM-5196] Add MD5 consistency check on S3 uploads (writes) --- sdks/java/io/amazon-web-services/build.gradle | 1 + .../beam/sdk/io/aws/s3/S3WritableByteChannel.java | 15 +++++ .../beam/sdk/io/aws/s3/S3FileSystemTest.java | 65 ++++++++++++++++++++++ .../org/apache/beam/sdk/io/aws/s3/S3TestUtils.java | 6 +- 4 files changed, 86 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/amazon-web-services/build.gradle b/sdks/java/io/amazon-web-services/build.gradle index cbcc97d..973d115 100644 --- a/sdks/java/io/amazon-web-services/build.gradle +++ b/sdks/java/io/amazon-web-services/build.gradle @@ -42,4 +42,5 @@ dependencies { shadowTest library.java.mockito_core shadowTest library.java.junit shadowTest library.java.slf4j_jdk14 + testCompile "io.findify:s3mock_2.11:0.2.4" } diff --git a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3WritableByteChannel.java b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3WritableByteChannel.java index 6c1fc71..c061adc 100644 --- a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3WritableByteChannel.java +++ b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3WritableByteChannel.java @@ -29,12 +29,15 @@ import com.amazonaws.services.s3.model.ObjectMetadata; import com.amazonaws.services.s3.model.PartETag; import com.amazonaws.services.s3.model.UploadPartRequest; import com.amazonaws.services.s3.model.UploadPartResult; +import com.amazonaws.util.Base64; import com.google.common.annotations.VisibleForTesting; import 
java.io.ByteArrayInputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.WritableByteChannel; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; import java.util.ArrayList; import java.util.List; import org.apache.beam.sdk.io.aws.options.S3Options; @@ -53,6 +56,7 @@ class S3WritableByteChannel implements WritableByteChannel { // AWS S3 parts are 1-indexed, not zero-indexed. private int partNumber = 1; private boolean open = true; + private final MessageDigest md5 = md5(); S3WritableByteChannel(AmazonS3 amazonS3, S3ResourceId path, String contentType, S3Options options) throws IOException { @@ -95,6 +99,14 @@ class S3WritableByteChannel implements WritableByteChannel { uploadId = result.getUploadId(); } + private static MessageDigest md5() { + try { + return MessageDigest.getInstance("MD5"); + } catch (NoSuchAlgorithmException e) { + throw new IllegalStateException(e); + } + } + @Override public int write(ByteBuffer sourceBuffer) throws IOException { if (!isOpen()) { @@ -109,6 +121,7 @@ class S3WritableByteChannel implements WritableByteChannel { byte[] copyBuffer = new byte[bytesWritten]; sourceBuffer.get(copyBuffer); uploadBuffer.put(copyBuffer); + md5.update(copyBuffer); if (!uploadBuffer.hasRemaining() || sourceBuffer.hasRemaining()) { flush(); @@ -129,6 +142,7 @@ class S3WritableByteChannel implements WritableByteChannel { .withUploadId(uploadId) .withPartNumber(partNumber++) .withPartSize(uploadBuffer.remaining()) + .withMD5Digest(Base64.encodeAsString(md5.digest())) .withInputStream(inputStream); request.setSSECustomerKey(options.getSSECustomerKey()); @@ -139,6 +153,7 @@ class S3WritableByteChannel implements WritableByteChannel { throw new IOException(e); } uploadBuffer.clear(); + md5.reset(); eTags.add(result.getPartETag()); } diff --git a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemTest.java 
b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemTest.java index 63f69b5..0abf217 100644 --- a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemTest.java +++ b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemTest.java @@ -22,9 +22,11 @@ import static org.apache.beam.sdk.io.aws.s3.S3TestUtils.buildMockedS3FileSystem; import static org.apache.beam.sdk.io.aws.s3.S3TestUtils.getSSECustomerKeyMd5; import static org.apache.beam.sdk.io.aws.s3.S3TestUtils.s3Options; import static org.apache.beam.sdk.io.aws.s3.S3TestUtils.s3OptionsWithSSECustomerKey; +import static org.apache.beam.sdk.io.fs.CreateOptions.StandardCreateOptions.builder; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import static org.mockito.Matchers.anyObject; @@ -35,6 +37,12 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import akka.http.scaladsl.Http; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.AnonymousAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; import com.amazonaws.services.s3.model.AmazonS3Exception; import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; import com.amazonaws.services.s3.model.CopyObjectRequest; @@ -50,12 +58,18 @@ import com.amazonaws.services.s3.model.ListObjectsV2Result; import com.amazonaws.services.s3.model.ObjectMetadata; import com.amazonaws.services.s3.model.S3ObjectSummary; import com.google.common.collect.ImmutableList; +import 
io.findify.s3mock.S3Mock; import java.io.FileNotFoundException; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; import java.util.ArrayList; import java.util.List; import org.apache.beam.sdk.io.aws.options.S3Options; import org.apache.beam.sdk.io.fs.MatchResult; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -64,6 +78,29 @@ import org.mockito.ArgumentMatcher; /** Test case for {@link S3FileSystem}. */ @RunWith(JUnit4.class) public class S3FileSystemTest { + private static S3Mock api; + private static AmazonS3 client; + + @BeforeClass + public static void beforeClass() { + api = new S3Mock.Builder().withInMemoryBackend().build(); + Http.ServerBinding binding = api.start(); + + EndpointConfiguration endpoint = + new EndpointConfiguration( + "http://localhost:" + binding.localAddress().getPort(), "us-west-2"); + client = + AmazonS3ClientBuilder.standard() + .withPathStyleAccessEnabled(true) + .withEndpointConfiguration(endpoint) + .withCredentials(new AWSStaticCredentialsProvider(new AnonymousAWSCredentials())) + .build(); + } + + @AfterClass + public static void afterClass() { + api.stop(); + } @Test public void testGlobTranslation() { @@ -638,6 +675,34 @@ public class S3FileSystemTest { true))); } + @Test + public void testWriteAndRead() throws IOException { + S3FileSystem s3FileSystem = buildMockedS3FileSystem(s3Options(), client); + + client.createBucket("testbucket"); + + byte[] writtenArray = new byte[] {0}; + ByteBuffer bb = ByteBuffer.allocate(writtenArray.length); + bb.put(writtenArray); + + //First create an object and write data to it + S3ResourceId path = S3ResourceId.fromUri("s3://testbucket/foo/bar.txt"); + WritableByteChannel writableByteChannel = + s3FileSystem.create(path, builder().setMimeType("application/text").build()); + 
writableByteChannel.write(bb); + writableByteChannel.close(); + + //Now read the same object + ByteBuffer bb2 = ByteBuffer.allocate(writtenArray.length); + ReadableByteChannel open = s3FileSystem.open(path); + open.read(bb2); + + //And compare the content with the one that was written + byte[] readArray = bb2.array(); + assertArrayEquals(readArray, writtenArray); + open.close(); + } + /** A mockito argument matcher to implement equality on GetObjectMetadataRequest. */ private static class GetObjectMetadataRequestMatcher extends ArgumentMatcher<GetObjectMetadataRequest> { diff --git a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3TestUtils.java b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3TestUtils.java index 52fc1af..fdd6733 100644 --- a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3TestUtils.java +++ b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3TestUtils.java @@ -66,8 +66,12 @@ class S3TestUtils { } static S3FileSystem buildMockedS3FileSystem(S3Options options) { + return buildMockedS3FileSystem(options, Mockito.mock(AmazonS3.class)); + } + + static S3FileSystem buildMockedS3FileSystem(S3Options options, AmazonS3 client) { S3FileSystem s3FileSystem = new S3FileSystem(options); - s3FileSystem.setAmazonS3Client(Mockito.mock(AmazonS3.class)); + s3FileSystem.setAmazonS3Client(client); return s3FileSystem; }