Author: hairong Date: Thu Jul 29 00:14:36 2010 New Revision: 980271 URL: http://svn.apache.org/viewvc?rev=980271&view=rev Log: HADOOP-6870. Add a new API getFiles to FileSystem and FileContext. Contributed by Hairong Kuang.
Added: hadoop/common/trunk/src/java/org/apache/hadoop/fs/LocatedFileStatus.java hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/TestListFiles.java Modified: hadoop/common/trunk/CHANGES.txt hadoop/common/trunk/src/java/org/apache/hadoop/fs/AbstractFileSystem.java hadoop/common/trunk/src/java/org/apache/hadoop/fs/FileContext.java hadoop/common/trunk/src/java/org/apache/hadoop/fs/FileSystem.java hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/TestFilterFileSystem.java hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/TestFilterFs.java Modified: hadoop/common/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/CHANGES.txt?rev=980271&r1=980270&r2=980271&view=diff ============================================================================== --- hadoop/common/trunk/CHANGES.txt (original) +++ hadoop/common/trunk/CHANGES.txt Thu Jul 29 00:14:36 2010 @@ -21,6 +21,11 @@ Trunk (unreleased changes) HADOOP-6859 - Introduce additional statistics to FileSystem to track file system operations (suresh) + HADOOP-6870. Add a new API getFiles to FileSystem and FileContext that + lists all files under the input path or the subtree rooted at the + input path if recursive is true. Block locations are returned together + with each file's status. (hairong) + IMPROVEMENTS HADOOP-6644. 
util.Shell getGROUPS_FOR_USER_COMMAND method name Modified: hadoop/common/trunk/src/java/org/apache/hadoop/fs/AbstractFileSystem.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/fs/AbstractFileSystem.java?rev=980271&r1=980270&r2=980271&view=diff ============================================================================== --- hadoop/common/trunk/src/java/org/apache/hadoop/fs/AbstractFileSystem.java (original) +++ hadoop/common/trunk/src/java/org/apache/hadoop/fs/AbstractFileSystem.java Thu Jul 29 00:14:36 2010 @@ -786,6 +786,48 @@ public abstract class AbstractFileSystem /** * The specification of this method matches that of + * {@link FileContext#listLocatedStatus(Path)} except that Path f must be for this + * file system. + */ + protected Iterator<LocatedFileStatus> listLocatedStatus(final Path f) + throws AccessControlException, FileNotFoundException, + UnresolvedLinkException, IOException { + return new Iterator<LocatedFileStatus>() { + private Iterator<FileStatus> itor = listStatusIterator(f); + + @Override + public boolean hasNext() { + return itor.hasNext(); + } + + @Override + public LocatedFileStatus next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + FileStatus result = itor.next(); + try { + + BlockLocation[] locs = null; + if (result.isFile()) { + locs = getFileBlockLocations( + result.getPath(), 0, result.getLen()); + } + return new LocatedFileStatus(result, locs); + } catch (IOException ioe) { + throw (RuntimeException)new RuntimeException().initCause(ioe); + } + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Remove is not supported"); + } + }; + } + + /** + * The specification of this method matches that of * {@link FileContext.Util#listStatus(Path)} except that Path f must be * for this file system. 
*/ Modified: hadoop/common/trunk/src/java/org/apache/hadoop/fs/FileContext.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/fs/FileContext.java?rev=980271&r1=980270&r2=980271&view=diff ============================================================================== --- hadoop/common/trunk/src/java/org/apache/hadoop/fs/FileContext.java (original) +++ hadoop/common/trunk/src/java/org/apache/hadoop/fs/FileContext.java Thu Jul 29 00:14:36 2010 @@ -27,12 +27,14 @@ import java.util.Arrays; import java.util.EnumSet; import java.util.IdentityHashMap; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; import java.util.Set; +import java.util.Stack; import java.util.TreeSet; import java.util.Map.Entry; -import java.util.regex.Pattern; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -1286,6 +1288,128 @@ public final class FileContext { } /** + * List the statuses and block locations of the files in the given path + * if the path is a directory. + * If the given path is a file, return the file's status and block locations. + * if recursive is true, list all file statuses and block locations in + * the subtree rooted at the given path. + * Files across symbolic links are also returned. 
+ * + * @param f is the path + * @param recursive if the subdirectories need to be traversed recursively + * + * @return an iterator that traverses statuses of the files + * + * @throws AccessControlException If access is denied + * @throws FileNotFoundException If <code>f</code> does not exist + * @throws UnsupportedFileSystemException If file system for <code>f</code> is + * not supported + * @throws IOException If an I/O error occurred + * + * Exceptions applicable to file systems accessed over RPC: + * @throws RpcClientException If an exception occurred in the RPC client + * @throws RpcServerException If an exception occurred in the RPC server + * @throws UnexpectedServerException If server implementation throws + * undeclared exception to RPC server + */ + public Iterator<LocatedFileStatus> listFiles( + final Path f, final boolean recursive) throws AccessControlException, + FileNotFoundException, UnsupportedFileSystemException, + IOException { + return new Iterator<LocatedFileStatus>() { + private Stack<Path> dirs = new Stack<Path>(); + private Stack<Path> symLinks = new Stack<Path>(); + Iterator<LocatedFileStatus> itor = listLocatedStatus(f); + LocatedFileStatus curFile; + + @Override + public boolean hasNext() { + try { + while (curFile == null) { + if (itor.hasNext()) { + handleFileStat(itor.next()); + } else if (!dirs.isEmpty()) { + Path dirPath = dirs.pop(); + itor = listLocatedStatus(dirPath); + } else if (!symLinks.isEmpty()) { + Path symLink = symLinks.pop(); + FileStatus stat = getFileStatus(symLink); + if (stat.isFile() || (recursive && stat.isDirectory())) { + itor = listLocatedStatus(stat.getPath()); + } + } else { + return false; + } + } + return true; + } catch (IOException ioe) { + throw (RuntimeException)new RuntimeException().initCause(ioe); + } + } + + private void handleFileStat(LocatedFileStatus stat) throws IOException { + if (stat.isFile()) { // file + curFile = stat; + } else if (stat.isSymlink()) { // symbolic link + 
symLinks.push(stat.getSymlink()); + } else if (recursive) { // directory + dirs.push(stat.getPath()); + } + } + + @Override + public LocatedFileStatus next() { + if (hasNext()) { + LocatedFileStatus result = curFile; + curFile = null; + return result; + } + throw new java.util.NoSuchElementException("No more entry in " + f); + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Remove is not supported"); + + } + }; + } + + /** + * List the statuses of the files/directories in the given path if the path is + * a directory. Each returned status contains a file's block locations. + * + * @param f is the path + * + * @return an iterator that traverses statuses of the files/directories + * in the given path + * + * @throws AccessControlException If access is denied + * @throws FileNotFoundException If <code>f</code> does not exist + * @throws UnsupportedFileSystemException If file system for <code>f</code> is + * not supported + * @throws IOException If an I/O error occurred + * + * Exceptions applicable to file systems accessed over RPC: + * @throws RpcClientException If an exception occurred in the RPC client + * @throws RpcServerException If an exception occurred in the RPC server + * @throws UnexpectedServerException If server implementation throws + * undeclared exception to RPC server + */ + public Iterator<LocatedFileStatus> listLocatedStatus(final Path f) throws + AccessControlException, FileNotFoundException, + UnsupportedFileSystemException, IOException { + final Path absF = fixRelativePart(f); + return new FSLinkResolver<Iterator<LocatedFileStatus>>() { + public Iterator<LocatedFileStatus> next( + final AbstractFileSystem fs, final Path p) + throws IOException, UnresolvedLinkException { + return fs.listLocatedStatus(p); + } + }.resolve(this, absF); + } + + /** * Mark a path to be deleted on JVM shutdown. * * @param f the existing path to delete. 
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/fs/FileSystem.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/fs/FileSystem.java?rev=980271&r1=980270&r2=980271&view=diff ============================================================================== --- hadoop/common/trunk/src/java/org/apache/hadoop/fs/FileSystem.java (original) +++ hadoop/common/trunk/src/java/org/apache/hadoop/fs/FileSystem.java Thu Jul 29 00:14:36 2010 @@ -29,9 +29,12 @@ import java.util.HashMap; import java.util.HashSet; import java.util.IdentityHashMap; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; import java.util.Set; +import java.util.Stack; import java.util.TreeSet; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -1278,6 +1281,91 @@ public abstract class FileSystem extends return globPathsLevel(parents, filePattern, level + 1, hasGlob); } + /** + * List the statuses and block locations of the files in the given path + * if the path is a directory. + * If the given path is a file, return the file's status and block locations. + * if recursive is true, list all file statuses and block locations in + * the subtree rooted at the given path. 
+ * + * @param f is the path + * @param recursive if the subdirectories need to be traversed recursively + * + * @return an iterator that traverses statuses of the files + * @throws FileNotFoundException when the path does not exist; + * IOException see specific implementation + */ + public Iterator<LocatedFileStatus> listFiles( + final Path f, final boolean recursive) + throws FileNotFoundException, IOException { + return new Iterator<LocatedFileStatus>() { + private LinkedList<FileStatus> fileStats = new LinkedList<FileStatus>(); + private Stack<FileStatus> dirStats = new Stack<FileStatus>(); + + { // initializer + list(f); + } + + @Override + public boolean hasNext() { + if (fileStats.isEmpty()) { + listDir(); + } + return !fileStats.isEmpty(); + } + + /** + * list at least one directory until file list is not empty + */ + private void listDir() { + while (fileStats.isEmpty() && !dirStats.isEmpty()) { + FileStatus dir = dirStats.pop(); + list(dir.getPath()); + } + } + + /** + * List the given path + * + * @param dirPath a path + */ + private void list(Path dirPath) { + try { + FileStatus[] stats = listStatus(dirPath); + for (FileStatus stat : stats) { + if (stat.isFile()) { + fileStats.add(stat); + } else if (recursive) { // directory & recursive + dirStats.push(stat); + } + } + } catch (IOException ioe) { + throw (RuntimeException) new RuntimeException().initCause(ioe); + } + } + + @Override + public LocatedFileStatus next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + FileStatus status = fileStats.remove(); + try { + BlockLocation[] locs = getFileBlockLocations( + status, 0, status.getLen()); + return new LocatedFileStatus(status, locs); + } catch (IOException ioe) { + throw (RuntimeException) new RuntimeException().initCause(ioe); + } + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Remove is not supported"); + } + }; + } + /** Return the current user's home directory in this filesystem. 
* The default implementation returns "/user/$USER/". */ Added: hadoop/common/trunk/src/java/org/apache/hadoop/fs/LocatedFileStatus.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/fs/LocatedFileStatus.java?rev=980271&view=auto ============================================================================== --- hadoop/common/trunk/src/java/org/apache/hadoop/fs/LocatedFileStatus.java (added) +++ hadoop/common/trunk/src/java/org/apache/hadoop/fs/LocatedFileStatus.java Thu Jul 29 00:14:36 2010 @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.permission.FsPermission; + +/** + * This class defines a FileStatus that includes a file's block locations. 
+ */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class LocatedFileStatus extends FileStatus { + private BlockLocation[] locations; + + /** + * Constructor + * @param stat a file status + * @param locations a file's block locations + */ + public LocatedFileStatus(FileStatus stat, BlockLocation[] locations) + throws IOException { + this(stat.getLen(), stat.isDirectory(), stat.getReplication(), + stat.getBlockSize(), stat.getModificationTime(), + stat.getAccessTime(), stat.getPermission(), stat.getOwner(), + stat.getGroup(), null, stat.getPath(), locations); + if (isSymlink()) { + setSymlink(stat.getSymlink()); + } + } + + /** + * Constructor + * + * @param length a file's length + * @param isdir if the path is a directory + * @param block_replication the file's replication factor + * @param blocksize a file's block size + * @param modification_time a file's modification time + * @param access_time a file's access time + * @param permission a file's permission + * @param owner a file's owner + * @param group a file's group + * @param symlink symlink if the path is a symbolic link + * @param path the path's qualified name + * @param locations a file's block locations + */ + public LocatedFileStatus(long length, boolean isdir, + int block_replication, + long blocksize, long modification_time, long access_time, + FsPermission permission, String owner, String group, + Path symlink, + Path path, + BlockLocation[] locations) { + super(length, isdir, block_replication, blocksize, modification_time, + access_time, permission, owner, group, symlink, path); + this.locations = locations; + } + + /** + * Get the file's block locations + * @return the file's block locations + */ + public BlockLocation[] getBlockLocations() { + return locations; + } + + /** + * Compare this object to another object + * + * @param o the object to be compared. 
+ * @return a negative integer, zero, or a positive integer as this object + * is less than, equal to, or greater than the specified object. + * + * @throws ClassCastException if the specified object's is not of + * type FileStatus + */ + public int compareTo(Object o) { + return super.compareTo(o); + } + + /** Compare if this object is equal to another object + * @param o the object to be compared. + * @return true if two file status has the same path name; false if not. + */ + public boolean equals(Object o) { + return super.equals(o); + } + + /** + * Returns a hash code value for the object, which is defined as + * the hash code of the path name. + * + * @return a hash code value for the path name. + */ + public int hashCode() { + return super.hashCode(); + } +} Modified: hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/TestFilterFileSystem.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/TestFilterFileSystem.java?rev=980271&r1=980270&r2=980271&view=diff ============================================================================== --- hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/TestFilterFileSystem.java (original) +++ hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/TestFilterFileSystem.java Thu Jul 29 00:14:36 2010 @@ -21,6 +21,7 @@ package org.apache.hadoop.fs; import java.lang.reflect.Method; import java.lang.reflect.Modifier; import java.util.EnumSet; +import java.util.Iterator; import junit.framework.TestCase; import org.apache.commons.logging.Log; @@ -107,6 +108,10 @@ public class TestFilterFileSystem extend public FileStatus[] globStatus(Path pathPattern, PathFilter filter) { return null; } + public Iterator<LocatedFileStatus> listFiles( + final Path path, final boolean isRecursive) { + return null; + } public void copyFromLocalFile(Path src, Path dst) { } public void moveFromLocalFile(Path[] srcs, Path dst) { } public void moveFromLocalFile(Path src, Path dst) { } Modified: 
hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/TestFilterFs.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/TestFilterFs.java?rev=980271&r1=980270&r2=980271&view=diff ============================================================================== --- hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/TestFilterFs.java (original) +++ hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/TestFilterFs.java Thu Jul 29 00:14:36 2010 @@ -35,6 +35,9 @@ public class TestFilterFs extends TestCa public Iterator<FileStatus> listStatusIterator(Path f) { return null; } + public Iterator<LocatedFileStatus> listLocatedStatus(final Path f) { + return null; + } } public void testFilterFileSystem() throws Exception { Added: hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/TestListFiles.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/TestListFiles.java?rev=980271&view=auto ============================================================================== --- hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/TestListFiles.java (added) +++ hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/TestListFiles.java Thu Jul 29 00:14:36 2010 @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Random; + +import org.apache.commons.logging.impl.Log4JLogger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.Level; + +import static org.junit.Assert.*; +import org.junit.Test; +import org.junit.BeforeClass; + +/** + * This class tests the FileStatus API. + */ +public class TestListFiles { + { + ((Log4JLogger)FileSystem.LOG).getLogger().setLevel(Level.ALL); + } + + static final long seed = 0xDEADBEEFL; + + final protected static Configuration conf = new Configuration(); + protected static FileSystem fs; + final protected static Path TEST_DIR = getTestDir(); + final private static int FILE_LEN = 10; + final private static Path FILE1 = new Path(TEST_DIR, "file1"); + final private static Path DIR1 = new Path(TEST_DIR, "dir1"); + final private static Path FILE2 = new Path(DIR1, "file2"); + final private static Path FILE3 = new Path(DIR1, "file3"); + + protected static Path getTestDir() { + return new Path( + System.getProperty("test.build.data","build/test/data/work-dir/localfs"), + "main_"); + } + + @BeforeClass + public static void testSetUp() throws Exception { + fs = FileSystem.getLocal(conf); + fs.delete(TEST_DIR, true); + } + + private static void writeFile(FileSystem fileSys, Path name, int fileSize) + throws IOException { + // Create and write a file that contains three blocks of data + FSDataOutputStream stm = fileSys.create(name); + byte[] buffer = new byte[fileSize]; + Random rand = new Random(seed); + rand.nextBytes(buffer); + stm.write(buffer); + stm.close(); + } + + /** Test when input path is a file */ + @Test + public void testFile() throws IOException { + fs.mkdirs(TEST_DIR); + 
writeFile(fs, FILE1, FILE_LEN); + + Iterator<LocatedFileStatus> itor = fs.listFiles( + FILE1, true); + LocatedFileStatus stat = itor.next(); + assertFalse(itor.hasNext()); + assertTrue(stat.isFile()); + assertEquals(FILE_LEN, stat.getLen()); + assertEquals(fs.makeQualified(FILE1), stat.getPath()); + assertEquals(1, stat.getBlockLocations().length); + + itor = fs.listFiles(FILE1, false); + stat = itor.next(); + assertFalse(itor.hasNext()); + assertTrue(stat.isFile()); + assertEquals(FILE_LEN, stat.getLen()); + assertEquals(fs.makeQualified(FILE1), stat.getPath()); + assertEquals(1, stat.getBlockLocations().length); + + fs.delete(FILE1, true); + } + + + /** Test when input path is a directory */ + @Test + public void testDirectory() throws IOException { + fs.mkdirs(DIR1); + + Iterator<LocatedFileStatus> itor = fs.listFiles( + DIR1, true); + assertFalse(itor.hasNext()); + itor = fs.listFiles(DIR1, false); + assertFalse(itor.hasNext()); + + writeFile(fs, FILE2, FILE_LEN); + + // test empty directory + itor = fs.listFiles(DIR1, true); + LocatedFileStatus stat = itor.next(); + assertFalse(itor.hasNext()); + assertTrue(stat.isFile()); + assertEquals(FILE_LEN, stat.getLen()); + assertEquals(fs.makeQualified(FILE2), stat.getPath()); + assertEquals(1, stat.getBlockLocations().length); + + // testing directory with 1 file + itor = fs.listFiles(DIR1, false); + stat = itor.next(); + assertFalse(itor.hasNext()); + assertTrue(stat.isFile()); + assertEquals(FILE_LEN, stat.getLen()); + assertEquals(fs.makeQualified(FILE2), stat.getPath()); + assertEquals(1, stat.getBlockLocations().length); + + // test more complicated directory + writeFile(fs, FILE1, FILE_LEN); + writeFile(fs, FILE3, FILE_LEN); + + itor = fs.listFiles(TEST_DIR, true); + stat = itor.next(); + assertTrue(stat.isFile()); + assertEquals(fs.makeQualified(FILE1), stat.getPath()); + stat = itor.next(); + assertTrue(stat.isFile()); + assertEquals(fs.makeQualified(FILE2), stat.getPath()); + stat = itor.next(); + 
assertTrue(stat.isFile()); + assertEquals(fs.makeQualified(FILE3), stat.getPath()); + assertFalse(itor.hasNext()); + + itor = fs.listFiles(TEST_DIR, false); + stat = itor.next(); + assertTrue(stat.isFile()); + assertEquals(fs.makeQualified(FILE1), stat.getPath()); + assertFalse(itor.hasNext()); + + fs.delete(TEST_DIR, true); + } +}