Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21618#discussion_r216588353
  
    --- Diff: core/src/main/java/org/apache/hadoop/fs/SparkGlobber.java ---
    @@ -0,0 +1,293 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.hadoop.fs;
    +
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.commons.logging.LogFactory;
    +import org.apache.commons.logging.Log;
    +
    +/**
    + * This is based on hadoop-common-2.7.2
    + * {@link org.apache.hadoop.fs.Globber}.
    + * This class exposes globWithThreshold, which can be used to glob paths in parallel.
    + */
    +public class SparkGlobber {
    +  public static final Log LOG = LogFactory.getLog(SparkGlobber.class.getName());
    +
    +  private final FileSystem fs;
    +  private final FileContext fc;
    +  private final Path pathPattern;
    +
    +  public SparkGlobber(FileSystem fs, Path pathPattern) {
    +    this.fs = fs;
    +    this.fc = null;
    +    this.pathPattern = pathPattern;
    +  }
    +
    +  public SparkGlobber(FileContext fc, Path pathPattern) {
    +    this.fs = null;
    +    this.fc = fc;
    +    this.pathPattern = pathPattern;
    +  }
    +
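    +  // Returns the status of 'path', or null when the path does not exist.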
    +  private FileStatus getFileStatus(Path path) throws IOException {
    +    try {
    +      if (fs != null) {
    +        return fs.getFileStatus(path);
    +      } else {
    +        return fc.getFileStatus(path);
    +      }
    +    } catch (FileNotFoundException e) {
    +      return null;
    +    }
    +  }
    +
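    +  // Lists the children of 'path', or an empty array when it does not exist.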
    +  private FileStatus[] listStatus(Path path) throws IOException {
    +    try {
    +      if (fs != null) {
    +        return fs.listStatus(path);
    +      } else {
    +        return fc.util().listStatus(path);
    +      }
    +    } catch (FileNotFoundException e) {
    +      return new FileStatus[0];
    +    }
    +  }
    +
    +  private Path fixRelativePart(Path path) {
    +    if (fs != null) {
    +      return fs.fixRelativePart(path);
    +    } else {
    +      return fc.fixRelativePart(path);
    +    }
    +  }
    +
    +  /**
    +   * Convert a path component that contains backslash escape sequences to a
    +   * literal string.  This is necessary when you want to explicitly refer to a
    +   * path that contains globber metacharacters.
    +   */
    +  private static String unescapePathComponent(String name) {
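    +    // e.g. the escaped component "foo\*bar" becomes the literal "foo*bar".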
    +    return name.replaceAll("\\\\(.)", "$1");
    +  }
    +
    +  /**
    +   * Translate an absolute path into a list of path components.
    +   * We merge double slashes into a single slash here.
    +   * POSIX root path, i.e. '/', does not get an entry in the list.
    +   */
    +  private static List<String> getPathComponents(String path)
    +      throws IOException {
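    +    // e.g. "/a//b/c" yields [a, b, c]; the root '/' contributes no entry.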
    +    ArrayList<String> ret = new ArrayList<String>();
    +    for (String component : path.split(Path.SEPARATOR)) {
    +      if (!component.isEmpty()) {
    +        ret.add(component);
    +      }
    +    }
    +    return ret;
    +  }
    +
    +  private String schemeFromPath(Path path) throws IOException {
    +    String scheme = path.toUri().getScheme();
    +    if (scheme == null) {
    +      if (fs != null) {
    +        scheme = fs.getUri().getScheme();
    +      } else {
    +        scheme = fc.getFSofPath(fc.fixRelativePart(path)).getUri().getScheme();
    +      }
    +    }
    +    return scheme;
    +  }
    +
    +  private String authorityFromPath(Path path) throws IOException {
    +    String authority = path.toUri().getAuthority();
    +    if (authority == null) {
    +      if (fs != null) {
    +        authority = fs.getUri().getAuthority();
    +      } else {
    +        authority = fc.getFSofPath(fc.fixRelativePart(path)).getUri().getAuthority();
    +      }
    +    }
    +    return authority;
    +  }
    +
    +  public FileStatus[] globWithThreshold(int threshold) throws IOException {
    +    // First we get the scheme and authority of the pattern that was passed
    +    // in.
    +    String scheme = schemeFromPath(pathPattern);
    +    String authority = authorityFromPath(pathPattern);
    +
    +    // Next we strip off everything except the pathname itself, and expand all
    +    // globs.  Expansion is a process which turns "grouping" clauses,
    +    // expressed as brackets, into separate path patterns.
    +    String pathPatternString = pathPattern.toUri().getPath();
    +    List<String> flattenedPatterns = GlobExpander.expand(pathPatternString);
    +
    +    // Now loop over all flattened patterns.  In every case, we'll be trying to
    +    // match them to entries in the filesystem.
    +    ArrayList<FileStatus> results =
    +        new ArrayList<FileStatus>(flattenedPatterns.size());
    +    boolean sawWildcard = false;
    +    for (String flatPattern : flattenedPatterns) {
    +      // Get the absolute path for this flattened pattern.  We couldn't do
    +      // this prior to flattening because of patterns like {/,a}, where which
    +      // path you go down influences how the path must be made absolute.
    +      Path absPattern = fixRelativePart(new Path(
    +          flatPattern.isEmpty() ? Path.CUR_DIR : flatPattern));
    +      // Now we break the flattened, absolute pattern into path components.
    +      // For example, /a/*/c would be broken into the list [a, *, c]
    +      List<String> components =
    +          getPathComponents(absPattern.toUri().getPath());
    +      // Starting out at the root of the filesystem, we try to match
    +      // filesystem entries against pattern components.
    +      ArrayList<FileStatus> candidates = new ArrayList<FileStatus>(1);
    +      // To get the "real" FileStatus of root, we'd have to do an expensive
    +      // RPC to the NameNode.  So we create a placeholder FileStatus which has
    +      // the correct path, but defaults for the rest of the information.
    +      // Later, if it turns out we actually want the FileStatus of root, we'll
    +      // replace the placeholder with a real FileStatus obtained from the
    +      // NameNode.
    +      FileStatus rootPlaceholder;
    +      if (Path.WINDOWS && !components.isEmpty()
    +          && Path.isWindowsAbsolutePath(absPattern.toUri().getPath(), true)) {
    +        // On Windows the path could begin with a drive letter, e.g. /E:/foo.
    +        // We will skip matching the drive letter and start from listing the
    +        // root of the filesystem on that drive.
    +        String driveLetter = components.remove(0);
    +        rootPlaceholder = new FileStatus(0, true, 0, 0, 0, new Path(scheme,
    +            authority, Path.SEPARATOR + driveLetter + Path.SEPARATOR));
    +      } else {
    +        rootPlaceholder = new FileStatus(0, true, 0, 0, 0,
    +            new Path(scheme, authority, Path.SEPARATOR));
    +      }
    +      candidates.add(rootPlaceholder);
    +
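    +      // Walk the pattern one component at a time, expanding the current
    +      // candidate set breadth-first against the filesystem.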
    +      for (int componentIdx = 0; componentIdx < components.size();
    +         componentIdx++) {
    +        ArrayList<FileStatus> newCandidates =
    +            new ArrayList<FileStatus>(candidates.size());
    +        GlobFilter globFilter = new GlobFilter(components.get(componentIdx));
    +        String component = unescapePathComponent(components.get(componentIdx));
    +        if (globFilter.hasPattern()) {
    +          sawWildcard = true;
    +        }
    +        if (candidates.isEmpty() && sawWildcard) {
    +          // Optimization: if there are no more candidates left, stop examining
    +          // the path components.  We can only do this if we've already seen
    +          // a wildcard component-- otherwise, we still need to visit all path
    +          // components in case one of them is a wildcard.
    +          break;
    +        }
    +        if ((componentIdx < components.size() - 1) &&
    +            (!globFilter.hasPattern())) {
    +          // Optimization: if this is not the terminal path component, and we
    +          // are not matching against a glob, assume that it exists.  If it
    +          // doesn't exist, we'll find out later when resolving a later glob
    +          // or the terminal path component.
    +          for (FileStatus candidate : candidates) {
    +            candidate.setPath(new Path(candidate.getPath(), component));
    +          }
    +          continue;
    +        }
    +        for (FileStatus candidate : candidates) {
    +          if (globFilter.hasPattern()) {
    +            FileStatus[] children = listStatus(candidate.getPath());
    +            if (children.length == 1) {
    +              // If we get back only one result, this could be either a listing
    +              // of a directory with one entry, or it could reflect the fact
    +              // that what we listed resolved to a file.
    +              //
    +              // Unfortunately, we can't just compare the returned paths to
    +              // figure this out.  Consider the case where you have /a/b, where
    +              // b is a symlink to "..".  In that case, listing /a/b will give
    +              // back "/a/b" again.  If we just went by returned pathname, we'd
    +              // incorrectly conclude that /a/b was a file and should not match
    +              // /a/*/*.  So we use getFileStatus of the path we just listed to
    +              // disambiguate.
    +              if (!getFileStatus(candidate.getPath()).isDirectory()) {
    +                continue;
    +              }
    +            }
    +            for (FileStatus child : children) {
    +              if (componentIdx < components.size() - 1) {
    +                // Don't try to recurse into non-directories.  See HADOOP-10957.
    +                if (!child.isDirectory()) continue;
    +              }
    +              // Set the child path based on the parent path.
    +              child.setPath(new Path(candidate.getPath(),
    +                  child.getPath().getName()));
    +              if (globFilter.accept(child.getPath())) {
    +                newCandidates.add(child);
    +              }
    +            }
    +          } else {
    +            // When dealing with non-glob components, use getFileStatus
    +            // instead of listStatus.  This is an optimization, but it also
    +            // is necessary for correctness in HDFS, since there are some
    +            // special HDFS directories like .reserved and .snapshot that are
    +            // not visible to listStatus, but which do exist.  (See HADOOP-9877)
    +            FileStatus childStatus = getFileStatus(
    +                new Path(candidate.getPath(), component));
    +            if (childStatus != null) {
    +              newCandidates.add(childStatus);
    +            }
    +          }
    +        }
    +        candidates = newCandidates;
    +        // When the size of candidates has reached the threshold, stop expanding other
    --- End diff --
    
    Is it also necessary to port the UT? I think sufficient comments plus the limit 
change may be enough, since the UT still has a maintenance cost. What's your opinion?
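
    For reference, a minimal sketch of a caller that exercises globWithThreshold 
might look like the following (the local filesystem, the target/globtest pattern, 
and the threshold of 2 are illustrative assumptions, not taken from this PR):

        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.FileStatus;
        import org.apache.hadoop.fs.FileSystem;
        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.fs.SparkGlobber;

        public class SparkGlobberSketch {
          public static void main(String[] args) throws Exception {
            // Assumed setup: glob against the local filesystem; a ported UT
            // would presumably reuse the setup of the original Hadoop test.
            FileSystem fs = FileSystem.getLocal(new Configuration());
            // Hypothetical pattern with two wildcard components, so candidates
            // get expanded component by component.
            Path pattern = fs.makeQualified(new Path("target/globtest/*/part-*"));
            // Arbitrary threshold for illustration: per the truncated comment
            // in the diff, expansion stops early once candidates reach it.
            FileStatus[] matched = new SparkGlobber(fs, pattern).globWithThreshold(2);
            for (FileStatus status : matched) {
              System.out.println(status.getPath());
            }
          }
        }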

