HADOOP-13282. S3 blob etags to be made visible in S3A status/getFileChecksum() 
calls.
Contributed by Steve Loughran


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c8ff0cc3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c8ff0cc3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c8ff0cc3

Branch: refs/heads/YARN-1011
Commit: c8ff0cc304f07bf793192291e0611b2fb4bcc4e3
Parents: ef450df
Author: Steve Loughran <ste...@apache.org>
Authored: Thu Dec 21 14:58:58 2017 +0000
Committer: Steve Loughran <ste...@apache.org>
Committed: Thu Dec 21 14:58:58 2017 +0000

----------------------------------------------------------------------
 .../apache/hadoop/fs/store/EtagChecksum.java    |  90 +++++++++++++
 .../apache/hadoop/fs/store/package-info.java    |  28 ++++
 .../hadoop/fs/store/TestEtagChecksum.java       |  85 ++++++++++++
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java |  40 ++++++
 .../hadoop/fs/s3a/ITestS3AMiscOperations.java   | 133 ++++++++++++++++---
 5 files changed, 359 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8ff0cc3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/EtagChecksum.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/EtagChecksum.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/EtagChecksum.java
new file mode 100644
index 0000000..cc29f1b
--- /dev/null
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/EtagChecksum.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.store;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.hadoop.fs.FileChecksum;
+
+/**
+ * An etag as a checksum.
+ * Consider these suitable for checking if an object has changed, but
+ * not suitable for comparing two different objects for equivalence,
+ * especially between object stores.
+ * <p>
+ * A null etag is treated as the empty string, so that
+ * {@link #getLength()}, {@link #getBytes()}, the serialized form and
+ * {@link #toString()} all agree with each other.
+ */
+public class EtagChecksum extends FileChecksum {
+
+  /** The algorithm name: {@value}. */
+  private static final String ETAG = "etag";
+
+  /**
+   * Etag string; never null.
+   */
+  private String eTag = "";
+
+  /**
+   * Create with an empty etag.
+   */
+  public EtagChecksum() {
+  }
+
+  /**
+   * Create with a string etag.
+   * @param eTag etag; null is treated as the empty string.
+   */
+  public EtagChecksum(String eTag) {
+    // normalise here so no accessor needs its own null check
+    this.eTag = eTag != null ? eTag : "";
+  }
+
+  @Override
+  public String getAlgorithmName() {
+    return ETAG;
+  }
+
+  /**
+   * @return the number of bytes in the UTF-8 encoding of the etag;
+   * always consistent with {@link #getBytes()}.
+   */
+  @Override
+  public int getLength() {
+    return getBytes().length;
+  }
+
+  /**
+   * @return the UTF-8 encoding of the etag; empty if there is no etag.
+   */
+  @Override
+  public byte[] getBytes() {
+    return eTag.getBytes(StandardCharsets.UTF_8);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeUTF(eTag);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    // readUTF never returns null, so the non-null invariant is preserved
+    eTag = in.readUTF();
+  }
+
+  @Override
+  public String toString() {
+    return "etag: \"" + eTag  + '"';
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8ff0cc3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/package-info.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/package-info.java
new file mode 100644
index 0000000..ebe1db4
--- /dev/null
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package is for classes to be shared across object stores; for internal
+ * use within the hadoop-* modules only. No stability guarantees.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.fs.store;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8ff0cc3/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/store/TestEtagChecksum.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/store/TestEtagChecksum.java
 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/store/TestEtagChecksum.java
new file mode 100644
index 0000000..ef9613f
--- /dev/null
+++ 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/store/TestEtagChecksum.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.store;
+
+import java.io.IOException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+
+/**
+ * Unit test of etag operations: equality, hash codes, serialization
+ * round trips and the handling of empty and null tags.
+ */
+public class TestEtagChecksum extends Assert {
+
+  private final EtagChecksum empty1 = tag("");
+  private final EtagChecksum empty2 = tag("");
+  private final EtagChecksum valid1 = tag("valid");
+  private final EtagChecksum valid2 = tag("valid");
+
+  @Test
+  public void testEmptyTagsEqual() {
+    assertEquals(empty1, empty2);
+    assertEquals("hash codes", empty1.hashCode(), empty2.hashCode());
+  }
+
+  @Test
+  public void testEmptyTagRoundTrip() throws Throwable {
+    assertEquals(empty1, roundTrip(empty1));
+  }
+
+  @Test
+  public void testValidTagsEqual() {
+    assertEquals(valid1, valid2);
+    assertEquals("hash codes", valid1.hashCode(), valid2.hashCode());
+  }
+
+  @Test
+  public void testValidTagRoundTrip() throws Throwable {
+    assertEquals(valid1, roundTrip(valid1));
+  }
+
+  @Test
+  public void testValidAndEmptyTagsDontMatch() {
+    assertNotEquals(valid1, empty1);
+  }
+
+  @Test
+  public void testDifferentTagsDontMatch() {
+    assertNotEquals(valid1, tag("other valid one"));
+  }
+
+  /**
+   * A tag created from a null string must be equivalent to an empty tag,
+   * both directly and after a serialization round trip.
+   * @throws Throwable failure
+   */
+  @Test
+  public void testNullTagMatchesEmptyTag() throws Throwable {
+    EtagChecksum nullTag = tag(null);
+    assertEquals(empty1, nullTag);
+    assertEquals(empty1, roundTrip(nullTag));
+  }
+
+  @Test
+  public void testToString() {
+    assertEquals("etag: \"valid\"", valid1.toString());
+  }
+
+  private EtagChecksum tag(String t) {
+    return new EtagChecksum(t);
+  }
+
+  /**
+   * Serialize a tag and deserialize the bytes into a new instance.
+   * @param tag tag to serialize
+   * @return the deserialized copy
+   * @throws IOException serialization failure
+   */
+  private EtagChecksum roundTrip(EtagChecksum tag) throws IOException {
+    try (DataOutputBuffer dob = new DataOutputBuffer();
+         DataInputBuffer dib = new DataInputBuffer()) {
+      tag.write(dob);
+      dib.reset(dob.getData(), dob.getLength());
+      EtagChecksum t2 = new EtagChecksum();
+      t2.readFields(dib);
+      return t2;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8ff0cc3/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index f461c9e..a8147ed 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -112,6 +112,7 @@ import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
 import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
 import org.apache.hadoop.fs.s3native.S3xLoginHelper;
 import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.fs.store.EtagChecksum;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
 import org.apache.hadoop.util.Progressable;
@@ -539,6 +540,14 @@ public class S3AFileSystem extends FileSystem implements 
StreamCapabilities {
   }
 
   /**
+   * Return the server-side encryption algorithm held by this filesystem.
+   * @return the value of the server-side encryption algorithm field.
+   */
+  public S3AEncryptionMethods getServerSideEncryptionAlgorithm() {
+    return this.serverSideEncryptionAlgorithm;
+  }
+
+  /**
    * Demand create the directory allocator, then create a temporary file.
    * {@link LocalDirAllocator#createTmpFileForWrite(String, long, 
Configuration)}.
    *  @param pathStr prefix for the temporary file
@@ -1069,6 +1078,7 @@ public class S3AFileSystem extends FileSystem implements 
StreamCapabilities {
    * @throws IOException IO and object access problems.
    */
   @VisibleForTesting
+  @Retries.RetryRaw
   public ObjectMetadata getObjectMetadata(Path path) throws IOException {
+    // convert the path to its key and delegate to the key-based overload
     return getObjectMetadata(pathToKey(path));
   }
@@ -2935,6 +2945,36 @@ public class S3AFileSystem extends FileSystem implements 
StreamCapabilities {
   }
 
   /**
+   * Get the etag of an object at the path via a HEAD request and return it
+   * as a checksum object. This offers whatever guarantees about equivalence
+   * the S3 implementation provides.
+   * <ol>
+   *   <li>If a tag has not changed, consider the object unchanged.</li>
+   *   <li>Two tags being different does not imply the data is different.</li>
+   * </ol>
+   * Different S3 implementations may offer different guarantees.
+   * <p>
+   * The etag always covers the whole object: {@code length} is validated
+   * but does not restrict the range which is checksummed, so any value
+   * greater than or equal to zero returns the same result.
+   * @param f The file path
+   * @param length The length of the file range for checksum calculation;
+   * must not be negative.
+   * @return The EtagChecksum, or null if the store returned no etag.
+   * @throws IOException IO failure
+   * @throws IllegalArgumentException if {@code length} is negative.
+   * @see <a href="http://docs.aws.amazon.com/AmazonS3/latest/API/RESTCommonResponseHeaders.html">Common Response Headers</a>
+   */
+  @Override
+  public EtagChecksum getFileChecksum(Path f, final long length)
+      throws IOException {
+    Preconditions.checkArgument(length >= 0, "Negative length: %s", length);
+    Path path = qualify(f);
+    LOG.debug("getFileChecksum({})", path);
+    return once("getFileChecksum", path.toString(),
+        () -> {
+          // this always does a full HEAD to the object
+          ObjectMetadata headers = getObjectMetadata(path);
+          String eTag = headers.getETag();
+          return eTag != null ? new EtagChecksum(eTag) : null;
+        });
+  }
+
+  /**
    * {@inheritDoc}.
    *
    * This implementation is optimized for S3, which can do a bulk listing

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c8ff0cc3/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java
index 869d64c..ddf2529 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java
@@ -18,21 +18,24 @@
 
 package org.apache.hadoop.fs.s3a;
 
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.contract.ContractTestUtils;
-import org.apache.hadoop.test.LambdaTestUtils;
+import java.io.ByteArrayInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 
 import com.amazonaws.services.s3.model.ObjectMetadata;
 import com.amazonaws.services.s3.model.PutObjectRequest;
-import com.amazonaws.services.s3.model.PutObjectResult;
+import org.junit.Assume;
 import org.junit.Test;
 
-import java.io.ByteArrayInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.concurrent.Callable;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.store.EtagChecksum;
+import org.apache.hadoop.test.LambdaTestUtils;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
 
 /**
  * Tests of the S3A FileSystem which don't have a specific home and can share
@@ -40,6 +43,8 @@ import java.util.concurrent.Callable;
  */
 public class ITestS3AMiscOperations extends AbstractS3ATestBase {
 
+  /** UTF-8 bytes of the string "hello"; the file data used by several tests. */
+  private static final byte[] HELLO = "hello".getBytes(StandardCharsets.UTF_8);
+
   @Test
   public void testCreateNonRecursiveSuccess() throws IOException {
     Path shouldWork = path("nonrecursivenode");
@@ -58,7 +63,7 @@ public class ITestS3AMiscOperations extends 
AbstractS3ATestBase {
   @Test(expected = FileAlreadyExistsException.class)
   public void testCreateNonRecursiveParentIsFile() throws IOException {
     Path parent = path("/file.txt");
-    ContractTestUtils.touch(getFileSystem(), parent);
+    touch(getFileSystem(), parent);
+    // the parent is now a file: creating a child under it is expected
+    // to raise FileAlreadyExistsException
     createNonRecursive(new Path(parent, "fail"));
   }
 
@@ -73,12 +78,7 @@ public class ITestS3AMiscOperations extends 
AbstractS3ATestBase {
         new ByteArrayInputStream("PUT".getBytes()),
         metadata);
     LambdaTestUtils.intercept(IllegalStateException.class,
-        new Callable<PutObjectResult>() {
-          @Override
-          public PutObjectResult call() throws Exception {
-            return fs.putObjectDirect(put);
-          }
-        });
+        () -> fs.putObjectDirect(put));
     assertPathDoesNotExist("put object was created", path);
   }
 
@@ -87,4 +87,103 @@ public class ITestS3AMiscOperations extends 
AbstractS3ATestBase {
         (short) 3, (short) 4096,
         null);
   }
+
+  /**
+   * Touch a file under the given relative name and return its full path.
+   * @param name relative name
+   * @return the full path of the touched file
+   * @throws IOException IO failure
+   */
+  Path touchFile(String name) throws IOException {
+    final Path target = path(name);
+    touch(getFileSystem(), target);
+    return target;
+  }
+
+  /**
+   * Write a file holding the given data, overwriting any existing file,
+   * and return its full path.
+   * @param name relative name
+   * @param data data to write
+   * @return the full path of the written file
+   * @throws IOException IO failure
+   */
+  Path mkFile(String name, byte[] data) throws IOException {
+    final Path target = path(name);
+    createFile(getFileSystem(), target, true, data);
+    return target;
+  }
+
+  /**
+   * Two 0-byte files, each uploaded in a single PUT, are assumed to have
+   * identical, non-empty checksums, including on stores with encryption.
+   * @throws Throwable on a failure
+   */
+  @Test
+  public void testEmptyFileChecksums() throws Throwable {
+    final S3AFileSystem fs = getFileSystem();
+    final Path file1 = touchFile("file1");
+    final EtagChecksum checksum1 = fs.getFileChecksum(file1, 0);
+    LOG.info("Checksum for {}: {}", file1, checksum1);
+    assertNotNull("file 1 checksum", checksum1);
+    assertNotEquals("file 1 checksum", 0, checksum1.getLength());
+    final Path file2 = touchFile("file2");
+    final EtagChecksum checksum2 = fs.getFileChecksum(file2, 0);
+    assertEquals("checksums", checksum1, checksum2);
+  }
+
+  /**
+   * Verify that different file contents have different
+   * checksums, and that they aren't the same as the empty file's checksum.
+   * Also verifies that overwriting a file changes its checksum.
+   * @throws Throwable failure
+   */
+  @Test
+  public void testNonEmptyFileChecksums() throws Throwable {
+    final S3AFileSystem fs = getFileSystem();
+    final Path file3 = mkFile("file3", HELLO);
+    final EtagChecksum helloChecksum = fs.getFileChecksum(file3, 0);
+    assertNotNull("file 3 checksum", helloChecksum);
+    final Path file4 = touchFile("file4");
+    final EtagChecksum emptyChecksum = fs.getFileChecksum(file4, 0);
+    assertNotEquals("checksums of non-empty and empty files",
+        helloChecksum, emptyChecksum);
+    // overwrite the empty file with different, non-empty data
+    createFile(fs, file4, true,
+        "hello, world".getBytes(StandardCharsets.UTF_8));
+    final EtagChecksum overwrittenChecksum = fs.getFileChecksum(file4, 0);
+    assertNotNull("file 4 checksum after overwrite", overwrittenChecksum);
+    assertNotEquals("file 4 checksum before and after overwrite",
+        emptyChecksum, overwrittenChecksum);
+    assertNotEquals("checksums of files with different data",
+        helloChecksum, overwrittenChecksum);
+  }
+
+  /**
+   * Verify that on an unencrypted store, the checksum of two non-empty
+   * (single PUT) files is the same if the data is the same.
+   * The test is skipped if the filesystem has a server-side encryption
+   * algorithm configured; it will still fail if the bucket itself has
+   * S3 default encryption enabled.
+   * @throws Throwable failure
+   */
+  @Test
+  public void testNonEmptyFileChecksumsUnencrypted() throws Throwable {
+    Assume.assumeTrue("Skipping: the filesystem is configured for encryption",
+        encryptionAlgorithm().equals(S3AEncryptionMethods.NONE));
+    final S3AFileSystem fs = getFileSystem();
+    final EtagChecksum checksum1 =
+        fs.getFileChecksum(mkFile("file5", HELLO), 0);
+    assertNotNull("file 5 checksum", checksum1);
+    assertEquals("checksums", checksum1,
+        fs.getFileChecksum(mkFile("file6", HELLO), 0));
+  }
+
+  /**
+   * @return the server-side encryption algorithm of the test filesystem.
+   */
+  private S3AEncryptionMethods encryptionAlgorithm() {
+    final S3AFileSystem fs = getFileSystem();
+    return fs.getServerSideEncryptionAlgorithm();
+  }
+
+  /**
+   * A negative length must be rejected with an IllegalArgumentException.
+   * The file is created before the intercepted call so that only
+   * getFileChecksum() can raise the expected exception.
+   * @throws Throwable failure
+   */
+  @Test
+  public void testNegativeLength() throws Throwable {
+    final Path file = mkFile("negative", HELLO);
+    LambdaTestUtils.intercept(IllegalArgumentException.class,
+        () -> getFileSystem().getFileChecksum(file, -1));
+  }
+
+  /**
+   * The length argument does not restrict the checksum: asking for more
+   * bytes than the file holds returns the same etag as asking for exactly
+   * the file's length.
+   * @throws Throwable failure
+   */
+  @Test
+  public void testLengthPastEOF() throws Throwable {
+    final S3AFileSystem fs = getFileSystem();
+    // use a name no other test in this class writes to
+    Path f = mkFile("testLengthPastEOF", HELLO);
+    final EtagChecksum checksum = fs.getFileChecksum(f, HELLO.length);
+    // guard against a vacuous pass where both calls return null
+    assertNotNull("checksum of " + f, checksum);
+    assertEquals("checksum of " + f + " with a length past EOF",
+        checksum, fs.getFileChecksum(f, HELLO.length * 2));
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to