Add getFileStatus() and listStatus()
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8fb665dc Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8fb665dc Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8fb665dc Branch: refs/heads/s3_create Commit: 8fb665dc5a78e237d52b927d7043df59387b3f46 Parents: a281799 Author: Lei Xu <l...@cloudera.com> Authored: Thu Jul 7 21:50:40 2016 -0700 Committer: Lei Xu <l...@cloudera.com> Committed: Thu Jul 7 21:50:40 2016 -0700 ---------------------------------------------------------------------- .../apache/hadoop/fs/s3a/CachedDirectory.java | 49 +++++++++++------- .../apache/hadoop/fs/s3a/CachedFileStatus.java | 14 +++++- .../hadoop/fs/s3a/DynamoDBMetadataStore.java | 2 +- .../org/apache/hadoop/fs/s3a/S3AFileStatus.java | 6 +++ .../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 53 ++++++++++++++++++-- .../hadoop/fs/s3a/TestLocalMetadataStore.java | 17 +++---- 6 files changed, 106 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fb665dc/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/CachedDirectory.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/CachedDirectory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/CachedDirectory.java index 5f0f606..0ea9436 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/CachedDirectory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/CachedDirectory.java @@ -1,11 +1,14 @@ package org.apache.hadoop.fs.s3a; +import com.google.common.collect.ImmutableCollection; +import com.google.common.collect.ImmutableMap; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; -import java.util.ArrayList; -import java.util.Arrays; +import java.io.IOException; 
import java.util.Collection; +import java.util.HashMap; +import java.util.Map; /** * Result of a listStatus() call for placing into a coherent metadata cache. @@ -17,12 +20,12 @@ import java.util.Collection; */ public class CachedDirectory { - public static final FileStatus[] EMPTY_DIR = {}; + public static final S3AFileStatus[] EMPTY_DIR = {}; protected Path path; /** TODO optimize out initial copy: use wrapped Arrays.asList() initially * and change to ArrayList only on add/delete. */ - protected ArrayList<FileStatus> fileStatuses; + protected Map<Path, CachedFileStatus> statusMap = new HashMap<>(); /** * True iff this CachedDirectory contained the same set of files as actually @@ -35,9 +38,12 @@ public class CachedDirectory { */ protected boolean isFullyCached; - public CachedDirectory(Path path, FileStatus[] fileStatuses, boolean isFullyCached) { + public CachedDirectory(Path path, S3AFileStatus[] fileStatuses, boolean isFullyCached) { this.path = path; - this.fileStatuses = new ArrayList<>(Arrays.asList(fileStatuses)); + for (S3AFileStatus s : fileStatuses) { + // TODO generate a lot of garbage + statusMap.put(s.getPath(), new CachedFileStatus(s)); + } this.isFullyCached = isFullyCached; } @@ -45,18 +51,21 @@ public class CachedDirectory { return path; } - public FileStatus[] getFileStatuses() { - FileStatus[] statuses = new FileStatus[fileStatuses.size()]; - return fileStatuses.toArray(statuses); + public Collection<CachedFileStatus> getFileStatuses() { + return statusMap.values(); } public boolean isFullyCached() { return isFullyCached; } + public void setFullyCached() { + isFullyCached = true; + } + /** Add given file to this directory. Does not check for duplicates. */ - public void addFile(FileStatus status) { - fileStatuses.add(status); + public void addFile(S3AFileStatus status) { + statusMap.put(status.getPath(), new CachedFileStatus(status)); } /** @@ -64,21 +73,25 @@ public class CachedDirectory { * @return true iff path was found and removed. 
*/ public boolean removeFile(Path path) { - for (int i = 0; i < fileStatuses.size(); i++) { - FileStatus s = fileStatuses.get(i); - if (s.getPath().equals(path)) { - fileStatuses.remove(i); - return true; - } + if (statusMap.containsKey(path.toString())) { + statusMap.get(path.toString()).delete(); } return false; } + public boolean has(Path path) { + return statusMap.containsKey(path); + } + + public void flush() throws IOException { + // TODO: Save to database. + } + @Override public String toString() { return "CachedDirectory{" + "path=" + path + - ", fileStatuses=" + fileStatuses + + ", fileStatuses=" + statusMap + ", isFullyCached=" + isFullyCached + '}'; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fb665dc/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/CachedFileStatus.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/CachedFileStatus.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/CachedFileStatus.java index 4b0156e..dcc1f44 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/CachedFileStatus.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/CachedFileStatus.java @@ -9,19 +9,29 @@ import org.apache.hadoop.fs.FileStatus; */ @InterfaceAudience.LimitedPrivate("HDFS") @InterfaceStability.Evolving -public class CachedFileStatus { +public class CachedFileStatus extends S3AFileStatus { // TODO this may change to inheritance, not sure yet. 
protected S3AFileStatus fileStatus; + boolean isDeleted; + public CachedFileStatus(S3AFileStatus fileStatus) { - this.fileStatus = fileStatus; + super(fileStatus); + isDeleted = false; } public S3AFileStatus getFileStatus() { return fileStatus; } + public boolean isDeleted() { + return isDeleted; + } + + public void delete() { + isDeleted = true; + } @Override public String toString() { return "CachedFileStatus{" + http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fb665dc/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DynamoDBMetadataStore.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DynamoDBMetadataStore.java index efc2fb3..061369d 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DynamoDBMetadataStore.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DynamoDBMetadataStore.java @@ -132,7 +132,7 @@ public class DynamoDBMetadataStore implements MetadataStore, Closeable { // XXX TODO this object needs to map cleanly to/from FileStatus // hacking broken values for now - FileStatus fs = new FileStatus(0, isdir, 0, 0, 0, getPath()); + S3AFileStatus fs = new S3AFileStatus(0, 0, getPath(), 0); CachedFileStatus cfs = new CachedFileStatus(fs); return cfs; http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fb665dc/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java index 75a6500..b633f84 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java +++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java @@ -33,6 +33,12 @@ import org.apache.hadoop.fs.Path; public class S3AFileStatus extends FileStatus { private boolean isEmptyDirectory; + S3AFileStatus(S3AFileStatus other) { + super(other.getLen(), other.isDirectory(), 1, other.getBlockSize(), + other.getModificationTime(), other.getPath()); + isEmptyDirectory = other.isEmptyDirectory(); + } + // Directories public S3AFileStatus(boolean isdir, boolean isemptydir, Path path) { super(0, isdir, 1, 0, 0, path); http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fb665dc/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 3299f2b..55540e7 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -25,7 +25,10 @@ import java.io.InputStream; import java.io.InterruptedIOException; import java.net.URI; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.Date; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; @@ -61,6 +64,7 @@ import com.amazonaws.event.ProgressEvent; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import org.apache.commons.collections.map.HashedMap; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -1155,7 +1159,46 @@ public class S3AFileSystem extends FileSystem { public FileStatus[] listStatus(Path f) throws FileNotFoundException, 
IOException { try { - return innerListStatus(f); + CachedDirectory cachedDir = null; + if (metadataStore != null) { + cachedDir = metadataStore.listStatus(f); + if (cachedDir != null && cachedDir.isFullyCached()) { + return cachedDir.getFileStatuses().toArray(new FileStatus[0]); + } + } + + S3AFileStatus[] statuses = innerListStatus(f); + if (cachedDir != null) { + // Merge the view from the external metadata store. + Map<Path, S3AFileStatus> baseView = new HashMap<>(); + for (S3AFileStatus s : statuses) { + baseView.put(s.getPath(), s); + } + Collection<CachedFileStatus> consistentView = + cachedDir.getFileStatuses(); + for (CachedFileStatus cfs : consistentView) { + if (cfs.isDeleted) { + if (baseView.containsKey(cfs.getPath())) { + baseView.remove(cfs.getPath()); + } + } else { + baseView.put(cfs.getPath(), cfs.getFileStatus()); + } + } + // Add missing files back to the cache so that we can set cachedDir as + // fully cached. + for (S3AFileStatus status : baseView.values()) { + if (!cachedDir.has(status.getPath())) { + cachedDir.addFile(status); + } + } + cachedDir.setFullyCached(); + + cachedDir.flush(); // TODO async save + statuses = baseView.values().toArray(new S3AFileStatus[0]); + } + return statuses; + } catch (AmazonClientException e) { throw translateException("listStatus", f, e); } @@ -1171,14 +1214,14 @@ public class S3AFileSystem extends FileSystem { * @throws IOException due to an IO problem. 
* @throws AmazonClientException on failures inside the AWS SDK */ - public FileStatus[] innerListStatus(Path f) throws FileNotFoundException, + public S3AFileStatus[] innerListStatus(Path f) throws FileNotFoundException, IOException, AmazonClientException { String key = pathToKey(f); LOG.debug("List status for path: {}", f); incrementStatistic(INVOCATION_LIST_STATUS); - final List<FileStatus> result = new ArrayList<FileStatus>(); - final FileStatus fileStatus = getFileStatus(f); + final List<S3AFileStatus> result = new ArrayList<>(); + final S3AFileStatus fileStatus = getFileStatus(f); if (fileStatus.isDirectory()) { if (!key.isEmpty()) { @@ -1232,7 +1275,7 @@ public class S3AFileSystem extends FileSystem { result.add(fileStatus); } - return result.toArray(new FileStatus[result.size()]); + return result.toArray(new S3AFileStatus[result.size()]); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fb665dc/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestLocalMetadataStore.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestLocalMetadataStore.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestLocalMetadataStore.java index 4af60ea..6e3e9a6 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestLocalMetadataStore.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestLocalMetadataStore.java @@ -14,6 +14,7 @@ import org.slf4j.LoggerFactory; import java.io.FileNotFoundException; import java.io.IOException; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -199,7 +200,7 @@ public class TestLocalMetadataStore extends Assert { lms.putNew(new CachedFileStatus(makeFileStatus("/a1/b1/file2", 100))); } - private void assertFileStatusesEqual(FileStatus[] statuses, String ...pathStrs) { + private void 
assertFileStatusesEqual(Collection<? extends FileStatus> statuses, String ...pathStrs) { Set<Path> a = new HashSet<>(); for (FileStatus fs : statuses) { a.add(fs.getPath()); @@ -224,7 +225,7 @@ public class TestLocalMetadataStore extends Assert { CachedDirectory dir = lms.listStatus(new Path(pathStr)); assertNotNull("Directory " + pathStr + " in cache", dir); assertEquals("Number of entries in dir " + pathStr, - dir.getFileStatuses().length, size); + dir.getFileStatuses().size(), size); } private void assertNotCached(String pathStr) throws IOException { @@ -244,15 +245,13 @@ public class TestLocalMetadataStore extends Assert { } } - private FileStatus makeFileStatus(String pathStr, long length) { - Path f = new Path(pathStr); - return new FileStatus(length, false /* not dir */, REPLICATION, - BLOCK_SIZE, modTime, accessTime, fsPermission, OWNER, GROUP, f); + private S3AFileStatus makeFileStatus(String path, long length) { + Path f = new Path(path); + return new S3AFileStatus(length, modTime, f, BLOCK_SIZE); } - private FileStatus makeDirStatus(String pathStr) { + private S3AFileStatus makeDirStatus(String pathStr) { Path f = new Path(pathStr); - return new FileStatus(0, true /* dir */, REPLICATION, - 0, modTime, accessTime, fsPermission, OWNER, GROUP, f); + return new S3AFileStatus(true, true, f); } } \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org