bgaborg commented on a change in pull request #1208: HADOOP-16423. S3Guard 
fsck: Check metadata consistency between S3 and metadatastore (log)
URL: https://github.com/apache/hadoop/pull/1208#discussion_r322451755
 
 

 ##########
 File path: 
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardFsck.java
 ##########
 @@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.security.InvalidParameterException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
+
+/**
+ * Main class for the FSCK factored out from S3GuardTool
+ * The implementation uses fixed DynamoDBMetadataStore as the backing store
+ * for metadata.
+ *
+ * Functions:
+ * <ul>
+ *   <li>Checking metadata consistency between S3 and metadatastore</li>
+ * </ul>
+ */
+public class S3GuardFsck {
+  private static final Logger LOG = LoggerFactory.getLogger(S3GuardFsck.class);
+  public static final String ROOT_PATH_STRING = "/";
+
+  private S3AFileSystem rawFS;
+  private DynamoDBMetadataStore metadataStore;
+
+  /**
+   * Creates an S3GuardFsck.
+   * @param fs the filesystem to compare to
+   * @param ms metadatastore the metadatastore to compare with (dynamo)
+   */
+  S3GuardFsck(S3AFileSystem fs, MetadataStore ms)
+      throws InvalidParameterException {
+    this.rawFS = fs;
+
+    if (ms == null) {
+      throw new InvalidParameterException("S3AFileSystem should be guarded by"
+          + " a " + DynamoDBMetadataStore.class.getCanonicalName());
+    }
+    this.metadataStore = (DynamoDBMetadataStore) ms;
+
+    if (rawFS.hasMetadataStore()) {
+      throw new InvalidParameterException("Raw fs should not have a "
+          + "metadatastore.");
+    }
+  }
+
+  /**
+   * Compares S3 to MS.
+   * Iterative breadth first walk on the S3 structure from a given root.
+   * Creates a list of pairs (metadata in S3 and in the MetadataStore) where
+   * the consistency or any rule is violated.
+   * Uses {@link S3GuardFsckViolationHandler} to handle violations.
+   * The violations are listed in Enums: {@link Violation}
+   *
+   * @param p the root path to start the traversal
+   * @throws IOException
+   * @return
+   */
+  public List<ComparePair> compareS3RootToMs(Path p) throws IOException {
+    final Path rootPath = rawFS.qualify(p);
+    final S3AFileStatus root =
+        (S3AFileStatus) rawFS.getFileStatus(rootPath);
+    final List<ComparePair> comparePairs = new ArrayList<>();
+    final Queue<S3AFileStatus> queue = new ArrayDeque<>();
+    queue.add(root);
+
+    while (!queue.isEmpty()) {
+      // pop front node from the queue
+      final S3AFileStatus currentDir = queue.poll();
+
+      // Get a listing of that dir from s3 and add just the files.
+      // (Each directory will be added as a root.)
+      // Files should be casted to S3AFileStatus instead of plain FileStatus
+      // to get the VersionID and Etag.
+      final Path currentDirPath = currentDir.getPath();
+
+      final FileStatus[] s3DirListing = rawFS.listStatus(currentDirPath);
+      final List<S3AFileStatus> children =
+          Arrays.asList(s3DirListing).stream()
+              .filter(status -> !status.isDirectory())
+              .map(S3AFileStatus.class::cast).collect(toList());
+
+      // Compare the directory contents if the listing is authoritative
+      final DirListingMetadata msDirListing =
+          metadataStore.listChildren(currentDirPath);
+      if (msDirListing != null && msDirListing.isAuthoritative()) {
+        final ComparePair cP =
+            compareAuthDirListing(s3DirListing, msDirListing);
+        if (cP.containsViolation()) {
+          comparePairs.add(cP);
+        }
+      }
+
+      // Compare directory and contents, but not the listing
+      final List<ComparePair> compareResult =
+          compareS3DirToMs(currentDir, children).stream()
+              .filter(comparePair -> comparePair.containsViolation())
+              .collect(toList());
+      comparePairs.addAll(compareResult);
+
+      // Add each dir to queue
+      children.stream().filter(pm -> pm.isDirectory())
+          .forEach(pm -> queue.add(pm));
+    }
+
+    // Create a handler and handle each violated pairs
+    S3GuardFsckViolationHandler handler =
+        new S3GuardFsckViolationHandler(rawFS, metadataStore);
+    comparePairs.forEach(handler::handle);
+
+    return comparePairs;
+  }
+
+  private ComparePair compareAuthDirListing(FileStatus[] s3DirListing,
+      DirListingMetadata msDirListing) {
+    ComparePair cP = new ComparePair(s3DirListing, msDirListing);
+
+    if (!msDirListing.isAuthoritative()) {
+      return cP;
+    }
+
+    if (s3DirListing.length != msDirListing.numEntries()) {
+      cP.violations.add(Violation.AUTHORITATIVE_DIRECTORY_CONTENT_MISMATCH);
+    } else {
+      final Set<Path> msPaths = msDirListing.getListing().stream()
+              .map(pm -> pm.getFileStatus().getPath()).collect(toSet());
+      final Set<Path> s3Paths = Arrays.stream(s3DirListing)
+              .map(pm -> pm.getPath()).collect(toSet());
+      if (!s3Paths.equals(msPaths)) {
+        cP.violations.add(Violation.AUTHORITATIVE_DIRECTORY_CONTENT_MISMATCH);
+      }
+    }
+
+    return cP;
+  }
+
+  protected List<ComparePair> compareS3DirToMs(S3AFileStatus s3CurrentDir,
+      List<S3AFileStatus> children) throws IOException {
+    final Path path = s3CurrentDir.getPath();
+    final PathMetadata pathMetadata = metadataStore.get(path);
+    List<ComparePair> violationComparePairs = new ArrayList<>();
+
+    final ComparePair rootComparePair =
+        compareFileStatusToPathMetadata(s3CurrentDir, pathMetadata);
+    if (rootComparePair.containsViolation()) {
+      violationComparePairs.add(rootComparePair);
+    }
+
+    children.forEach(s3ChildMeta -> {
+      try {
+        final PathMetadata msChildMeta =
+            metadataStore.get(s3ChildMeta.getPath());
+        final ComparePair comparePair =
+            compareFileStatusToPathMetadata(s3ChildMeta, msChildMeta);
+        if (comparePair.containsViolation()) {
+          violationComparePairs.add(comparePair);
+        }
+      } catch (Exception e) {
+        e.printStackTrace();
 
 Review comment:
   wow. sorry.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to