HADOOP-13282. S3 blob etags to be made visible in S3A status/getFileChecksum() calls. Contributed by Steve Loughran
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c8ff0cc3 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c8ff0cc3 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c8ff0cc3 Branch: refs/heads/YARN-1011 Commit: c8ff0cc304f07bf793192291e0611b2fb4bcc4e3 Parents: ef450df Author: Steve Loughran <ste...@apache.org> Authored: Thu Dec 21 14:58:58 2017 +0000 Committer: Steve Loughran <ste...@apache.org> Committed: Thu Dec 21 14:58:58 2017 +0000 ---------------------------------------------------------------------- .../apache/hadoop/fs/store/EtagChecksum.java | 90 +++++++++++++ .../apache/hadoop/fs/store/package-info.java | 28 ++++ .../hadoop/fs/store/TestEtagChecksum.java | 85 ++++++++++++ .../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 40 ++++++ .../hadoop/fs/s3a/ITestS3AMiscOperations.java | 133 ++++++++++++++++--- 5 files changed, 359 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8ff0cc3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/EtagChecksum.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/EtagChecksum.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/EtagChecksum.java new file mode 100644 index 0000000..cc29f1b --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/EtagChecksum.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.store; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +import org.apache.hadoop.fs.FileChecksum; + +/** + * An etag as a checksum. + * Consider these suitable for checking if an object has changed, but + * not suitable for comparing two different objects for equivalence, + * especially between object stores. + */ +public class EtagChecksum extends FileChecksum { + + /** The algorithm name: {@value}. */ + private static final String ETAG = "etag"; + + /** + * Etag string. + */ + private String eTag = ""; + + /** + * Create with an empty etag. + */ + public EtagChecksum() { + } + + /** + * Create with a string etag. + * @param eTag etag + */ + public EtagChecksum(String eTag) { + this.eTag = eTag; + } + + @Override + public String getAlgorithmName() { + return ETAG; + } + + @Override + public int getLength() { + return eTag.getBytes(StandardCharsets.UTF_8).length; + } + + @Override + public byte[] getBytes() { + return eTag != null + ? eTag.getBytes(StandardCharsets.UTF_8) + : new byte[0]; + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeUTF(eTag != null ? eTag : ""); + } + + @Override + public void readFields(DataInput in) throws IOException { + eTag = in.readUTF(); + } + + @Override + public String toString() { + return "etag: \"" + eTag + '"'; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8ff0cc3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/package-info.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/package-info.java new file mode 100644 index 0000000..ebe1db4 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/package-info.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package is for classes to be shared across object stores; for internal + * use within the hadoop-* modules only. No stability guarantees. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.fs.store; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8ff0cc3/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/store/TestEtagChecksum.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/store/TestEtagChecksum.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/store/TestEtagChecksum.java new file mode 100644 index 0000000..ef9613f --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/store/TestEtagChecksum.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.store; + +import java.io.IOException; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.DataOutputBuffer; + +/** + * Unit test of etag operations. + */ +public class TestEtagChecksum extends Assert { + + private final EtagChecksum empty1 = tag(""); + private final EtagChecksum empty2 = tag(""); + private final EtagChecksum valid1 = tag("valid"); + private final EtagChecksum valid2 = tag("valid"); + + @Test + public void testEmptyTagsEqual() { + assertEquals(empty1, empty2); + } + + @Test + public void testEmptyTagRoundTrip() throws Throwable { + assertEquals(empty1, roundTrip(empty1)); + } + + @Test + public void testValidTagsEqual() { + assertEquals(valid1, valid2); + } + + @Test + public void testValidTagRoundTrip() throws Throwable { + assertEquals(valid1, roundTrip(valid1)); + } + + @Test + public void testValidAndEmptyTagsDontMatch() { + assertNotEquals(valid1, empty1); + assertNotEquals(valid1, tag("other valid one")); + } + + @Test + public void testDifferentTagsDontMatch() { + assertNotEquals(valid1, tag("other valid one")); + } + + private EtagChecksum tag(String t) { + return new EtagChecksum(t); + } + + private EtagChecksum roundTrip(EtagChecksum tag) throws IOException { + try (DataOutputBuffer dob = new DataOutputBuffer(); + DataInputBuffer dib = new DataInputBuffer()) { + tag.write(dob); + dib.reset(dob.getData(), dob.getLength()); + EtagChecksum t2 = new EtagChecksum(); + t2.readFields(dib); + return t2; + } + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8ff0cc3/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index f461c9e..a8147ed 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -112,6 +112,7 @@ import org.apache.hadoop.fs.s3a.s3guard.PathMetadata; import org.apache.hadoop.fs.s3a.s3guard.S3Guard; import org.apache.hadoop.fs.s3native.S3xLoginHelper; import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.fs.store.EtagChecksum; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.BlockingThreadPoolExecutorService; import org.apache.hadoop.util.Progressable; @@ -539,6 +540,14 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities { } /** + * Get the encryption algorithm of this endpoint. + * @return the encryption algorithm. + */ + public S3AEncryptionMethods getServerSideEncryptionAlgorithm() { + return serverSideEncryptionAlgorithm; + } + + /** * Demand create the directory allocator, then create a temporary file. * {@link LocalDirAllocator#createTmpFileForWrite(String, long, Configuration)}. * @param pathStr prefix for the temporary file @@ -1069,6 +1078,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities { * @throws IOException IO and object access problems. */ @VisibleForTesting + @Retries.RetryRaw public ObjectMetadata getObjectMetadata(Path path) throws IOException { return getObjectMetadata(pathToKey(path)); } @@ -2935,6 +2945,36 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities { } /** + * Get the etag of a object at the path via HEAD request and return it + * as a checksum object. This has the whatever guarantees about equivalence + * the S3 implementation offers. + * <ol> + * <li>If a tag has not changed, consider the object unchanged.</li> + * <li>Two tags being different does not imply the data is different.</li> + * </ol> + * Different S3 implementations may offer different guarantees. + * @param f The file path + * @param length The length of the file range for checksum calculation + * @return The EtagChecksum or null if checksums are not supported. + * @throws IOException IO failure + * @see <a href="http://docs.aws.amazon.com/AmazonS3/latest/API/RESTCommonResponseHeaders.html">Common Response Headers</a> + */ + + public EtagChecksum getFileChecksum(Path f, final long length) + throws IOException { + Preconditions.checkArgument(length >= 0); + Path path = qualify(f); + LOG.debug("getFileChecksum({})", path); + return once("getFileChecksum", path.toString(), + () -> { + // this always does a full HEAD to the object + ObjectMetadata headers = getObjectMetadata(path); + String eTag = headers.getETag(); + return eTag != null ? new EtagChecksum(eTag) : null; + }); + } + + /** * {@inheritDoc}. * * This implementation is optimized for S3, which can do a bulk listing http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8ff0cc3/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java index 869d64c..ddf2529 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java @@ -18,21 +18,24 @@ package org.apache.hadoop.fs.s3a; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileAlreadyExistsException; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.contract.ContractTestUtils; -import org.apache.hadoop.test.LambdaTestUtils; +import java.io.ByteArrayInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.charset.StandardCharsets; import com.amazonaws.services.s3.model.ObjectMetadata; import com.amazonaws.services.s3.model.PutObjectRequest; -import com.amazonaws.services.s3.model.PutObjectResult; +import org.junit.Assume; import org.junit.Test; -import java.io.ByteArrayInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.concurrent.Callable; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.store.EtagChecksum; +import org.apache.hadoop.test.LambdaTestUtils; + +import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile; +import static org.apache.hadoop.fs.contract.ContractTestUtils.touch; /** * Tests of the S3A FileSystem which don't have a specific home and can share @@ -40,6 +43,8 @@ import java.util.concurrent.Callable; */ public class ITestS3AMiscOperations extends AbstractS3ATestBase { + private static final byte[] HELLO = "hello".getBytes(StandardCharsets.UTF_8); + @Test public void testCreateNonRecursiveSuccess() throws IOException { Path shouldWork = path("nonrecursivenode"); @@ -58,7 +63,7 @@ public class ITestS3AMiscOperations extends AbstractS3ATestBase { @Test(expected = FileAlreadyExistsException.class) public void testCreateNonRecursiveParentIsFile() throws IOException { Path parent = path("/file.txt"); - ContractTestUtils.touch(getFileSystem(), parent); + touch(getFileSystem(), parent); createNonRecursive(new Path(parent, "fail")); } @@ -73,12 +78,7 @@ public class ITestS3AMiscOperations extends AbstractS3ATestBase { new ByteArrayInputStream("PUT".getBytes()), metadata); LambdaTestUtils.intercept(IllegalStateException.class, - new Callable<PutObjectResult>() { - @Override - public PutObjectResult call() throws Exception { - return fs.putObjectDirect(put); - } - }); + () -> fs.putObjectDirect(put)); assertPathDoesNotExist("put object was created", path); } @@ -87,4 +87,103 @@ public class ITestS3AMiscOperations extends AbstractS3ATestBase { (short) 3, (short) 4096, null); } + + /** + * Touch a path, return the full path. + * @param name relative name + * @return the path + * @throws IOException IO failure + */ + Path touchFile(String name) throws IOException { + Path path = path(name); + touch(getFileSystem(), path); + return path; + } + + /** + * Create a file with the data, return the path. + * @param name relative name + * @param data data to write + * @return the path + * @throws IOException IO failure + */ + Path mkFile(String name, byte[] data) throws IOException { + final Path f = path(name); + createFile(getFileSystem(), f, true, data); + return f; + } + + /** + * The assumption here is that 0-byte files uploaded in a single PUT + * always have the same checksum, including stores with encryption. + * @throws Throwable on a failure + */ + @Test + public void testEmptyFileChecksums() throws Throwable { + final S3AFileSystem fs = getFileSystem(); + Path file1 = touchFile("file1"); + EtagChecksum checksum1 = fs.getFileChecksum(file1, 0); + LOG.info("Checksum for {}: {}", file1, checksum1); + assertNotNull("file 1 checksum", checksum1); + assertNotEquals("file 1 checksum", 0, checksum1.getLength()); + assertEquals("checksums", checksum1, + fs.getFileChecksum(touchFile("file2"), 0)); + } + + /** + * Verify that different file contents have different + * checksums, and that that they aren't the same as the empty file. + * @throws Throwable failure + */ + @Test + public void testNonEmptyFileChecksums() throws Throwable { + final S3AFileSystem fs = getFileSystem(); + final Path file3 = mkFile("file3", HELLO); + final EtagChecksum checksum1 = fs.getFileChecksum(file3, 0); + assertNotNull("file 3 checksum", checksum1); + final Path file4 = touchFile("file4"); + final EtagChecksum checksum2 = fs.getFileChecksum(file4, 0); + assertNotEquals("checksums", checksum1, checksum2); + // overwrite + createFile(fs, file4, true, + "hello, world".getBytes(StandardCharsets.UTF_8)); + assertNotEquals(checksum2, fs.getFileChecksum(file4, 0)); + } + + /** + * Verify that on an unencrypted store, the checksum of two non-empty + * (single PUT) files is the same if the data is the same. + * This will fail if the bucket has S3 default encryption enabled. + * @throws Throwable failure + */ + @Test + public void testNonEmptyFileChecksumsUnencrypted() throws Throwable { + Assume.assumeTrue(encryptionAlgorithm().equals(S3AEncryptionMethods.NONE)); + final S3AFileSystem fs = getFileSystem(); + final EtagChecksum checksum1 = + fs.getFileChecksum(mkFile("file5", HELLO), 0); + assertNotNull("file 3 checksum", checksum1); + assertEquals("checksums", checksum1, + fs.getFileChecksum(mkFile("file6", HELLO), 0)); + } + + private S3AEncryptionMethods encryptionAlgorithm() { + return getFileSystem().getServerSideEncryptionAlgorithm(); + } + + @Test + public void testNegativeLength() throws Throwable { + LambdaTestUtils.intercept(IllegalArgumentException.class, + () -> getFileSystem().getFileChecksum(mkFile("negative", HELLO), -1)); + } + + @Test + public void testLengthPastEOF() throws Throwable { + final S3AFileSystem fs = getFileSystem(); + Path f = mkFile("file5", HELLO); + assertEquals( + fs.getFileChecksum(f, HELLO.length), + fs.getFileChecksum(f, HELLO.length * 2)); + } + } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org