HADOOP-13171. Add StorageStatistics to S3A; instrument some more operations. Contributed by Steve Loughran.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/043a0c2e Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/043a0c2e Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/043a0c2e Branch: refs/heads/branch-2 Commit: 043a0c2e6cb1d3708bee254a67c48407c6a06095 Parents: b6d5546 Author: Chris Nauroth <cnaur...@apache.org> Authored: Fri Jun 3 08:56:07 2016 -0700 Committer: Chris Nauroth <cnaur...@apache.org> Committed: Fri Jun 3 08:56:07 2016 -0700 ---------------------------------------------------------------------- .../hadoop/fs/contract/ContractTestUtils.java | 420 +++++++++++++++ .../fs/s3a/ProgressableProgressListener.java | 94 ++++ .../hadoop/fs/s3a/S3AFastOutputStream.java | 65 +-- .../org/apache/hadoop/fs/s3a/S3AFileStatus.java | 7 + .../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 507 ++++++++++++++----- .../hadoop/fs/s3a/S3AInstrumentation.java | 218 +++++--- .../apache/hadoop/fs/s3a/S3AOutputStream.java | 98 +--- .../hadoop/fs/s3a/S3AStorageStatistics.java | 104 ++++ .../java/org/apache/hadoop/fs/s3a/S3AUtils.java | 48 ++ .../org/apache/hadoop/fs/s3a/Statistic.java | 143 ++++++ .../src/site/markdown/tools/hadoop-aws/index.md | 12 +- .../org/apache/hadoop/fs/s3a/S3ATestUtils.java | 153 ++++++ .../hadoop/fs/s3a/TestS3AFileOperationCost.java | 191 +++++++ .../hadoop/fs/s3a/scale/S3AScaleTestBase.java | 154 ++---- .../fs/s3a/scale/TestS3ADeleteManyFiles.java | 10 +- .../s3a/scale/TestS3ADirectoryPerformance.java | 189 +++++++ .../scale/TestS3AInputStreamPerformance.java | 6 +- .../src/test/resources/log4j.properties | 4 +- 18 files changed, 1984 insertions(+), 439 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/043a0c2e/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java index 6343d40..20ba075 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java @@ -22,7 +22,9 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.io.IOUtils; import org.junit.Assert; import org.junit.internal.AssumptionViolatedException; @@ -34,8 +36,14 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; import java.util.Properties; +import java.util.Set; import java.util.UUID; /** @@ -892,4 +900,416 @@ public class ContractTestUtils extends Assert { fs.delete(objectPath, false); } } + + /** + * Make times more readable, by adding a "," every three digits. 
+ * @param nanos nanos or other large number + * @return a string for logging + */ + public static String toHuman(long nanos) { + return String.format(Locale.ENGLISH, "%,d", nanos); + } + + /** + * Log the bandwidth of a timer as inferred from the number of + * bytes processed. + * @param timer timer + * @param bytes bytes processed in the time period + */ + public static void bandwidth(NanoTimer timer, long bytes) { + LOG.info("Bandwidth = {} MB/s", + timer.bandwidthDescription(bytes)); + } + + /** + * Work out the bandwidth in MB/s. + * @param bytes bytes + * @param durationNS duration in nanos + * @return the number of megabytes/second of the recorded operation + */ + public static double bandwidthMBs(long bytes, long durationNS) { + return (bytes * 1000.0) / durationNS; + } + + /** + * Recursively create a directory tree. + * Return the details about the created tree. The files and directories + * are those created under the path, not the base directory created. That + * is retrievable via {@link TreeScanResults#getBasePath()}. + * @param fs filesystem + * @param current parent dir + * @param depth depth of directory tree + * @param width width: subdirs per entry + * @param files number of files per entry + * @param filesize size of files to create in bytes. + * @return the details about the created tree. + * @throws IOException IO problems + */ + public static TreeScanResults createSubdirs(FileSystem fs, + Path current, + int depth, + int width, + int files, + int filesize) throws IOException { + return createSubdirs(fs, current, depth, width, files, + filesize, "dir-", "file-", "0"); + } + + /** + * Recursively create a directory tree. + * @param fs filesystem + * @param current the current dir in the walk + * @param depth depth of directory tree + * @param width width: subdirs per entry + * @param files number of files per entry + * @param filesize size of files to create in bytes. + * @param dirPrefix prefix for directory entries + * @param filePrefix prefix for file entries + * @param marker string which is slowly built up to uniquely name things + * @return the details about the created tree. + * @throws IOException IO problems + */ + public static TreeScanResults createSubdirs(FileSystem fs, + Path current, + int depth, + int width, + int files, + int filesize, + String dirPrefix, + String filePrefix, + String marker) throws IOException { + fs.mkdirs(current); + TreeScanResults results = new TreeScanResults(current); + if (depth > 0) { + byte[] data = dataset(filesize, 'a', 'z'); + for (int i = 0; i < files; i++) { + String name = String.format("%s-%s-%04d.txt", filePrefix, marker, i); + Path path = new Path(current, name); + createFile(fs, path, true, data); + results.add(fs, path); + } + for (int w = 0; w < width; w++) { + String marker2 = String.format("%s-%04d", marker, w); + Path child = new Path(current, dirPrefix + marker2); + results.add(createSubdirs(fs, child, depth - 1, width, files, + filesize, dirPrefix, filePrefix, marker2)); + results.add(fs, child); + } + } + return results; + } + + /** + * Predicate to determine if two collections are equivalent, that is, they + * contain the same entries. + * @param left first collection of paths + * @param right second collection of paths + * @return true if each collection contains all the entries of the other.
+ */ + public static boolean collectionsEquivalent(Collection<Path> left, + Collection<Path> right) { + Set<Path> leftSet = new HashSet<>(left); + Set<Path> rightSet = new HashSet<>(right); + return leftSet.containsAll(right) && rightSet.containsAll(left); + } + + /** + * Predicate to determine if two collections are equivalent and that + * neither contains any duplicate entries. + * @param left first collection of paths + * @param right second collection of paths + * @return true if each collection contains all the entries of the other, + * and neither collection has any duplicates. + */ + public static boolean collectionsEquivalentNoDuplicates(Collection<Path> left, + Collection<Path> right) { + return collectionsEquivalent(left, right) && + !containsDuplicates(left) && !containsDuplicates(right); + } + + + /** + * Predicate to test for a collection of paths containing duplicate entries. + * @param paths collection of paths + * @return true if there are duplicates. + */ + public static boolean containsDuplicates(Collection<Path> paths) { + return new HashSet<>(paths).size() != paths.size(); + } + + /** + * Recursively list all entries, with a depth first traversal of the + * directory tree. + * @param fs filesystem + * @param path path to scan + * @return the scan results: all entries found under the path + * @throws IOException IO problems + */ + public static TreeScanResults treeWalk(FileSystem fs, Path path) + throws IOException { + TreeScanResults dirsAndFiles = new TreeScanResults(); + + FileStatus[] statuses = fs.listStatus(path); + for (FileStatus status : statuses) { + LOG.info("{}{}", status.getPath(), status.isDirectory() ? "*" : ""); + } + for (FileStatus status : statuses) { + dirsAndFiles.add(status); + if (status.isDirectory()) { + dirsAndFiles.add(treeWalk(fs, status.getPath())); + } + } + return dirsAndFiles; + } + + /** + * Results of recursive directory creation/scan operations. + */ + public static final class TreeScanResults { + + private Path basePath; + private final List<Path> files = new ArrayList<>(); + private final List<Path> directories = new ArrayList<>(); + private final List<Path> other = new ArrayList<>(); + + + public TreeScanResults() { + } + + public TreeScanResults(Path basePath) { + this.basePath = basePath; + } + + /** + * Build from a located file status iterator. + * @param results results of the listFiles/listStatus call. + * @throws IOException IO problems during the iteration. + */ + public TreeScanResults(RemoteIterator<LocatedFileStatus> results) + throws IOException { + while (results.hasNext()) { + add(results.next()); + } + } + + /** + * Construct results from an array of statistics. + * @param stats statistics array. Must not be null. + */ + public TreeScanResults(FileStatus[] stats) { + assertNotNull("Null file status array", stats); + for (FileStatus stat : stats) { + add(stat); + } + } + + /** + * Add all paths in the other set of results to this instance. + * @param that the other instance + * @return this instance + */ + public TreeScanResults add(TreeScanResults that) { + files.addAll(that.files); + directories.addAll(that.directories); + other.addAll(that.other); + return this; + } + + /** + * Increment the counters based on the file status. + * @param status path status to count.
+ */ + public void add(FileStatus status) { + if (status.isFile()) { + files.add(status.getPath()); + } else if (status.isDirectory()) { + directories.add(status.getPath()); + } else { + other.add(status.getPath()); + } + } + + public void add(FileSystem fs, Path path) throws IOException { + add(fs.getFileStatus(path)); + } + + @Override + public String toString() { + return String.format("%d director%s and %d file%s", + getDirCount(), + getDirCount() == 1 ? "y" : "ies", + getFileCount(), + getFileCount() == 1 ? "" : "s"); + } + + /** + * Assert that the state of a listing has the specific number of files, + * directories and other entries. The error text will include + * the {@code text} param, the field in question, and the entire object's + * string value. + * @param text text prefix for assertions. + * @param f expected file count + * @param d expected directory count + * @param o expected count of other entries. + */ + public void assertSizeEquals(String text, long f, long d, long o) { + String self = toString(); + Assert.assertEquals(text + ": file count in " + self, + f, getFileCount()); + Assert.assertEquals(text + ": directory count in " + self, + d, getDirCount()); + Assert.assertEquals(text + ": 'other' count in " + self, + o, getOtherCount()); + } + + /** + * Assert that the trees are equivalent: that every list matches (and + * that neither has any duplicates). + * @param that the other entry + */ + public void assertEquivalent(TreeScanResults that) { + assertFieldsEquivalent("files", that, files, that.files); + assertFieldsEquivalent("directories", that, + directories, that.directories); + assertFieldsEquivalent("other", that, other, that.other); + } + + /** + * Assert that a field in two instances is equivalent. + * @param fieldname field name for error messages + * @param that the other instance to scan + * @param ours our field's contents + * @param theirs the other instance's field contents + */ + public void assertFieldsEquivalent(String fieldname, + TreeScanResults that, + List<Path> ours, List<Path> theirs) { + assertFalse("Duplicate " + fieldname + " in " + this, + containsDuplicates(ours)); + assertFalse("Duplicate " + fieldname + " in other " + that, + containsDuplicates(theirs)); + assertTrue(fieldname + " mismatch: between {" + this + "}" + + " and {" + that + "}", + collectionsEquivalent(ours, theirs)); + } + + public List<Path> getFiles() { + return files; + } + + public List<Path> getDirectories() { + return directories; + } + + public List<Path> getOther() { + return other; + } + + public Path getBasePath() { + return basePath; + } + + public long getFileCount() { + return files.size(); + } + + public long getDirCount() { + return directories.size(); + } + + public long getOtherCount() { + return other.size(); + } + + /** + * Total count of entries. + * @return the total number of entries + */ + public long totalCount() { + return getFileCount() + getDirCount() + getOtherCount(); + } + + } + + /** + * A simple class for timing operations in nanoseconds, and for + * printing some useful results in the process. + */ + public static final class NanoTimer { + private final long startTime; + private long endTime; + + public NanoTimer() { + startTime = now(); + } + + /** + * End the operation. + * @return the duration of the operation + */ + public long end() { + endTime = now(); + return duration(); + } + + /** + * End the operation; log the duration.
+ * @param format message + * @param args any arguments + * @return the duration of the operation + */ + public long end(String format, Object... args) { + long d = end(); + LOG.info("Duration of {}: {} ns", + String.format(format, args), toHuman(d)); + return d; + } + + public long now() { + return System.nanoTime(); + } + + public long duration() { + return endTime - startTime; + } + + public double bandwidth(long bytes) { + return bandwidthMBs(bytes, duration()); + } + + /** + * Bandwidth as bytes per second. + * @param bytes bytes processed in the time period + * @return the number of bytes processed per second by the timed operation. + */ + public double bandwidthBytes(long bytes) { + return (bytes * 1.0) / duration(); + } + + /** + * How many nanoseconds per IOP, byte, etc. + * @param operations operations processed in this time period + * @return the average number of nanoseconds each operation took + */ + public long nanosPerOperation(long operations) { + return duration() / operations; + } + + /** + * Get a description of the bandwidth, even down to fractions of + * a MB. + * @param bytes bytes processed + * @return bandwidth + */ + public String bandwidthDescription(long bytes) { + return String.format("%,.6f", bandwidth(bytes)); + } + + public long getStartTime() { + return startTime; + } + + public long getEndTime() { + return endTime; + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/043a0c2e/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/ProgressableProgressListener.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/ProgressableProgressListener.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/ProgressableProgressListener.java new file mode 100644 index 0000000..0ce022a --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/ProgressableProgressListener.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import com.amazonaws.event.ProgressEvent; +import com.amazonaws.event.ProgressEventType; +import com.amazonaws.event.ProgressListener; +import com.amazonaws.services.s3.transfer.Upload; +import org.apache.hadoop.util.Progressable; +import org.slf4j.Logger; + +import static com.amazonaws.event.ProgressEventType.TRANSFER_COMPLETED_EVENT; +import static com.amazonaws.event.ProgressEventType.TRANSFER_PART_STARTED_EVENT; + +/** + * Listener to progress from AWS regarding transfers.
+ */ +public class ProgressableProgressListener implements ProgressListener { + private static final Logger LOG = S3AFileSystem.LOG; + private final S3AFileSystem fs; + private final String key; + private final Progressable progress; + private long lastBytesTransferred; + private final Upload upload; + + /** + * Instantiate. + * @param fs filesystem: will be invoked with statistics updates + * @param key key for the upload + * @param upload source of events + * @param progress optional callback for progress. + */ + public ProgressableProgressListener(S3AFileSystem fs, + String key, + Upload upload, + Progressable progress) { + this.fs = fs; + this.key = key; + this.upload = upload; + this.progress = progress; + this.lastBytesTransferred = 0; + } + + @Override + public void progressChanged(ProgressEvent progressEvent) { + if (progress != null) { + progress.progress(); + } + + // There are 3 http ops here, but this should be close enough for now + ProgressEventType pet = progressEvent.getEventType(); + if (pet == TRANSFER_PART_STARTED_EVENT || + pet == TRANSFER_COMPLETED_EVENT) { + fs.incrementWriteOperations(); + } + + long transferred = upload.getProgress().getBytesTransferred(); + long delta = transferred - lastBytesTransferred; + fs.incrementPutProgressStatistics(key, delta); + lastBytesTransferred = transferred; + } + + /** + * Method to invoke after upload has completed. + * This can handle race conditions in setup/teardown. + * @return the number of bytes which were transferred after the notification + */ + public long uploadCompleted() { + long delta = upload.getProgress().getBytesTransferred() - + lastBytesTransferred; + if (delta > 0) { + LOG.debug("S3A write delta changed after finished: {} bytes", delta); + fs.incrementPutProgressStatistics(key, delta); + } + return delta; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/043a0c2e/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java index 61a83d4..7a985c6 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java @@ -35,10 +35,8 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; -import org.apache.commons.lang.StringUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.util.Progressable; import org.slf4j.Logger; @@ -54,6 +52,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadPoolExecutor; import static org.apache.hadoop.fs.s3a.S3AUtils.*; +import static org.apache.hadoop.fs.s3a.Statistic.*; /** * Upload files/parts asap directly from a memory buffer (instead of buffering @@ -77,8 +76,6 @@ public class S3AFastOutputStream extends OutputStream { private final int multiPartThreshold; private final S3AFileSystem fs; private final CannedAccessControlList cannedACL; - private final FileSystem.Statistics statistics; - private final String 
serverSideEncryptionAlgorithm; private final ProgressListener progressListener; private final ListeningExecutorService executorService; private MultiPartUpload multiPartUpload; @@ -98,28 +95,28 @@ public class S3AFastOutputStream extends OutputStream { * @param bucket S3 bucket name * @param key S3 key name * @param progress report progress in order to prevent timeouts - * @param statistics track FileSystem.Statistics on the performed operations * @param cannedACL used CannedAccessControlList - * @param serverSideEncryptionAlgorithm algorithm for server side encryption * @param partSize size of a single part in a multi-part upload (except * last part) * @param multiPartThreshold files at least this size use multi-part upload * @param threadPoolExecutor thread factory * @throws IOException on any problem */ - public S3AFastOutputStream(AmazonS3Client client, S3AFileSystem fs, - String bucket, String key, Progressable progress, - FileSystem.Statistics statistics, CannedAccessControlList cannedACL, - String serverSideEncryptionAlgorithm, long partSize, - long multiPartThreshold, ThreadPoolExecutor threadPoolExecutor) + public S3AFastOutputStream(AmazonS3Client client, + S3AFileSystem fs, + String bucket, + String key, + Progressable progress, + CannedAccessControlList cannedACL, + long partSize, + long multiPartThreshold, + ThreadPoolExecutor threadPoolExecutor) throws IOException { this.bucket = bucket; this.key = key; this.client = client; this.fs = fs; this.cannedACL = cannedACL; - this.statistics = statistics; - this.serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm; //Ensure limit as ByteArrayOutputStream size cannot exceed Integer.MAX_VALUE if (partSize > Integer.MAX_VALUE) { this.partSize = Integer.MAX_VALUE; @@ -246,16 +243,17 @@ public class S3AFastOutputStream extends OutputStream { if (multiPartUpload == null) { putObject(); } else { - if (buffer.size() > 0) { + int size = buffer.size(); + if (size > 0) { + fs.incrementPutStartStatistics(size); //send last part multiPartUpload.uploadPartAsync(new ByteArrayInputStream(buffer - .toByteArray()), buffer.size()); + .toByteArray()), size); } final List<PartETag> partETags = multiPartUpload .waitForAllPartUploads(); multiPartUpload.complete(partETags); } - statistics.incrementWriteOps(1); // This will delete unnecessary fake parent directories fs.finishedWrite(key); LOG.debug("Upload complete for bucket '{}' key '{}'", bucket, key); @@ -265,18 +263,19 @@ public class S3AFastOutputStream extends OutputStream { } } + /** + * Create the default metadata for a multipart upload operation. + * @return the metadata to use/extend. 
+ */ private ObjectMetadata createDefaultMetadata() { - ObjectMetadata om = new ObjectMetadata(); - if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) { - om.setSSEAlgorithm(serverSideEncryptionAlgorithm); - } - return om; + return fs.newObjectMetadata(); } private MultiPartUpload initiateMultiPartUpload() throws IOException { - final ObjectMetadata om = createDefaultMetadata(); final InitiateMultipartUploadRequest initiateMPURequest = - new InitiateMultipartUploadRequest(bucket, key, om); + new InitiateMultipartUploadRequest(bucket, + key, + createDefaultMetadata()); initiateMPURequest.setCannedACL(cannedACL); try { return new MultiPartUpload( @@ -290,15 +289,18 @@ public class S3AFastOutputStream extends OutputStream { LOG.debug("Executing regular upload for bucket '{}' key '{}'", bucket, key); final ObjectMetadata om = createDefaultMetadata(); - om.setContentLength(buffer.size()); - final PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, - new ByteArrayInputStream(buffer.toByteArray()), om); - putObjectRequest.setCannedAcl(cannedACL); + final int size = buffer.size(); + om.setContentLength(size); + final PutObjectRequest putObjectRequest = + fs.newPutObjectRequest(key, + om, + new ByteArrayInputStream(buffer.toByteArray())); putObjectRequest.setGeneralProgressListener(progressListener); ListenableFuture<PutObjectResult> putObjectResult = executorService.submit(new Callable<PutObjectResult>() { @Override public PutObjectResult call() throws Exception { + fs.incrementPutStartStatistics(size); return client.putObject(putObjectRequest); } }); @@ -306,7 +308,7 @@ public class S3AFastOutputStream extends OutputStream { try { putObjectResult.get(); } catch (InterruptedException ie) { - LOG.warn("Interrupted object upload:" + ie, ie); + LOG.warn("Interrupted object upload: {}", ie, ie); Thread.currentThread().interrupt(); } catch (ExecutionException ee) { throw extractException("regular upload", key, ee); @@ -339,7 +341,7 @@ public class S3AFastOutputStream extends OutputStream { public PartETag call() throws Exception { LOG.debug("Uploading part {} for id '{}'", currentPartNumber, uploadId); - return client.uploadPart(request).getPartETag(); + return fs.uploadPart(request).getPartETag(); } }); partETagsFutures.add(partETagFuture); @@ -349,7 +351,7 @@ public class S3AFastOutputStream extends OutputStream { try { return Futures.allAsList(partETagsFutures).get(); } catch (InterruptedException ie) { - LOG.warn("Interrupted partUpload:" + ie, ie); + LOG.warn("Interrupted partUpload: {}", ie, ie); Thread.currentThread().interrupt(); return null; } catch (ExecutionException ee) { @@ -382,11 +384,12 @@ public class S3AFastOutputStream extends OutputStream { public void abort() { LOG.warn("Aborting multi-part upload with id '{}'", uploadId); try { + fs.incrementStatistic(OBJECT_MULTIPART_UPLOAD_ABORTED); client.abortMultipartUpload(new AbortMultipartUploadRequest(bucket, key, uploadId)); } catch (Exception e2) { LOG.warn("Unable to abort multipart upload, you may need to purge " + - "uploaded parts: " + e2, e2); + "uploaded parts: {}", e2, e2); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/043a0c2e/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java index 9ecca33..75a6500 100644 --- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java @@ -93,4 +93,11 @@ public class S3AFileStatus extends FileStatus { return super.getModificationTime(); } } + + @Override + public String toString() { + return super.toString() + + String.format(" isEmptyDirectory=%s", isEmptyDirectory()); + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/043a0c2e/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index a4c0c25..d392d8e 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -45,7 +45,6 @@ import com.amazonaws.services.s3.AmazonS3Client; import com.amazonaws.services.s3.S3ClientOptions; import com.amazonaws.services.s3.model.AmazonS3Exception; import com.amazonaws.services.s3.model.CannedAccessControlList; -import com.amazonaws.services.s3.model.DeleteObjectRequest; import com.amazonaws.services.s3.model.DeleteObjectsRequest; import com.amazonaws.services.s3.model.ListObjectsRequest; import com.amazonaws.services.s3.model.ObjectListing; @@ -53,6 +52,8 @@ import com.amazonaws.services.s3.model.ObjectMetadata; import com.amazonaws.services.s3.model.PutObjectRequest; import com.amazonaws.services.s3.model.CopyObjectRequest; import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.amazonaws.services.s3.model.UploadPartRequest; +import com.amazonaws.services.s3.model.UploadPartResult; import com.amazonaws.services.s3.transfer.Copy; import com.amazonaws.services.s3.transfer.TransferManager; import com.amazonaws.services.s3.transfer.TransferManagerConfiguration; @@ -71,8 +72,13 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.GlobalStorageStatistics; import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.StorageStatistics; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.security.ProviderUtils; import org.apache.hadoop.util.Progressable; @@ -80,6 +86,7 @@ import org.apache.hadoop.util.VersionInfo; import static org.apache.hadoop.fs.s3a.Constants.*; import static org.apache.hadoop.fs.s3a.S3AUtils.*; +import static org.apache.hadoop.fs.s3a.Statistic.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -118,6 +125,7 @@ public class S3AFileSystem extends FileSystem { private CannedAccessControlList cannedACL; private String serverSideEncryptionAlgorithm; private S3AInstrumentation instrumentation; + private S3AStorageStatistics storageStatistics; private long readAhead; // The maximum number of entries that can be deleted in any call to s3 @@ -237,6 +245,15 @@ public class S3AFileSystem extends FileSystem { enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true); readAhead = longOption(conf, READAHEAD_RANGE, DEFAULT_READAHEAD_RANGE, 0); + storageStatistics = 
(S3AStorageStatistics) + GlobalStorageStatistics.INSTANCE + .put(S3AStorageStatistics.NAME, + new GlobalStorageStatistics.StorageStatisticsProvider() { + @Override + public StorageStatistics provide() { + return new S3AStorageStatistics(); + } + }); int maxThreads = intOption(conf, MAX_THREADS, DEFAULT_MAX_THREADS, 0); int coreThreads = intOption(conf, CORE_THREADS, DEFAULT_CORE_THREADS, 0); @@ -346,6 +363,14 @@ public class S3AFileSystem extends FileSystem { } /** + * Get S3A Instrumentation. For test purposes. + * @return this instance's instrumentation. + */ + public S3AInstrumentation getInstrumentation() { + return instrumentation; + } + + /** * Initializes the User-Agent header to send in HTTP requests to the S3 * back-end. We always include the Hadoop version number. The user also may * set an optional custom prefix to put in front of the Hadoop version number. @@ -621,23 +646,26 @@ public class S3AFileSystem extends FileSystem { } instrumentation.fileCreated(); if (getConf().getBoolean(FAST_UPLOAD, DEFAULT_FAST_UPLOAD)) { - return new FSDataOutputStream(new S3AFastOutputStream(s3, this, bucket, - key, progress, statistics, cannedACL, - serverSideEncryptionAlgorithm, partSize, multiPartThreshold, - threadPoolExecutor), statistics); + return new FSDataOutputStream( + new S3AFastOutputStream(s3, + this, + bucket, + key, + progress, + cannedACL, + partSize, + multiPartThreshold, + threadPoolExecutor), + statistics); } // We pass null to FSDataOutputStream so it won't count writes that // are being buffered to a file return new FSDataOutputStream( new S3AOutputStream(getConf(), - transfers, this, - bucket, key, - progress, - cannedACL, - statistics, - serverSideEncryptionAlgorithm), + progress + ), null); } @@ -693,6 +721,7 @@ public class S3AFileSystem extends FileSystem { private boolean innerRename(Path src, Path dst) throws IOException, AmazonClientException { LOG.debug("Rename path {} to {}", src, dst); + incrementStatistic(INVOCATION_RENAME); String srcKey = pathToKey(src); String dstKey = pathToKey(dst); @@ -793,8 +822,7 @@ public class S3AFileSystem extends FileSystem { request.setPrefix(srcKey); request.setMaxKeys(maxKeys); - ObjectListing objects = s3.listObjects(request); - statistics.incrementReadOps(1); + ObjectListing objects = listObjects(request); while (true) { for (S3ObjectSummary summary : objects.getObjectSummaries()) { @@ -808,8 +836,7 @@ public class S3AFileSystem extends FileSystem { } if (objects.isTruncated()) { - objects = s3.listNextBatchOfObjects(objects); - statistics.incrementReadOps(1); + objects = continueListObjects(objects); } else { if (!keysToDelete.isEmpty()) { removeKeys(keysToDelete, false); @@ -838,17 +865,223 @@ public class S3AFileSystem extends FileSystem { } /** + * Increment a statistic by 1. + * @param statistic The operation to increment + */ + protected void incrementStatistic(Statistic statistic) { + incrementStatistic(statistic, 1); + } + + /** + * Increment a statistic by a specific value. + * @param statistic The operation to increment + * @param count the count to increment + */ + protected void incrementStatistic(Statistic statistic, long count) { + instrumentation.incrementCounter(statistic, count); + storageStatistics.incrementCounter(statistic, count); + } + + /** * Request object metadata; increments counters in the process. 
* @param key key * @return the metadata */ - private ObjectMetadata getObjectMetadata(String key) { + protected ObjectMetadata getObjectMetadata(String key) { + incrementStatistic(OBJECT_METADATA_REQUESTS); ObjectMetadata meta = s3.getObjectMetadata(bucket, key); - statistics.incrementReadOps(1); + incrementReadOperations(); return meta; } /** + * Initiate a {@code listObjects} operation, incrementing metrics + * in the process. + * @param request request to initiate + * @return the results + */ + protected ObjectListing listObjects(ListObjectsRequest request) { + incrementStatistic(OBJECT_LIST_REQUESTS); + incrementReadOperations(); + return s3.listObjects(request); + } + + /** + * List the next set of objects. + * @param objects paged result + * @return the next result object + */ + protected ObjectListing continueListObjects(ObjectListing objects) { + incrementStatistic(OBJECT_LIST_REQUESTS); + incrementReadOperations(); + return s3.listNextBatchOfObjects(objects); + } + + /** + * Increment read operations. + */ + public void incrementReadOperations() { + statistics.incrementReadOps(1); + } + + /** + * Increment the write operation counter. + * This is somewhat inaccurate, as it appears to be invoked more + * often than needed in progress callbacks. + */ + public void incrementWriteOperations() { + statistics.incrementWriteOps(1); + } + + /** + * Delete an object. + * Increments the {@code OBJECT_DELETE_REQUESTS} and write + * operation statistics. + * @param key key to blob to delete. + */ + private void deleteObject(String key) { + incrementWriteOperations(); + incrementStatistic(OBJECT_DELETE_REQUESTS); + s3.deleteObject(bucket, key); + } + + /** + * Perform a bulk object delete operation. + * Increments the {@code OBJECT_DELETE_REQUESTS} and write + * operation statistics. + * @param deleteRequest keys to delete on the s3-backend + */ + private void deleteObjects(DeleteObjectsRequest deleteRequest) { + incrementWriteOperations(); + incrementStatistic(OBJECT_DELETE_REQUESTS, 1); + s3.deleteObjects(deleteRequest); + } + + /** + * Create a putObject request. + * Adds the ACL and metadata + * @param key key of object + * @param metadata metadata header + * @param srcfile source file + * @return the request + */ + public PutObjectRequest newPutObjectRequest(String key, + ObjectMetadata metadata, File srcfile) { + PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, + srcfile); + putObjectRequest.setCannedAcl(cannedACL); + putObjectRequest.setMetadata(metadata); + return putObjectRequest; + } + + /** + * Create a {@link PutObjectRequest} request. + * The metadata is assumed to have been configured with the size of the + * operation. + * @param key key of object + * @param metadata metadata header + * @param inputStream source data. + * @return the request + */ + PutObjectRequest newPutObjectRequest(String key, + ObjectMetadata metadata, InputStream inputStream) { + PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, + inputStream, metadata); + putObjectRequest.setCannedAcl(cannedACL); + return putObjectRequest; + } + + /** + * Create a new object metadata instance. + * Any standard metadata headers are added here, for example: + * encryption. 
+ * @return a new metadata instance + */ + public ObjectMetadata newObjectMetadata() { + final ObjectMetadata om = new ObjectMetadata(); + if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) { + om.setSSEAlgorithm(serverSideEncryptionAlgorithm); + } + return om; + } + + /** + * Create a new object metadata instance. + * Any standard metadata headers are added here, for example: + * encryption. + * + * @param length length of data to set in header. + * @return a new metadata instance + */ + public ObjectMetadata newObjectMetadata(long length) { + final ObjectMetadata om = newObjectMetadata(); + om.setContentLength(length); + return om; + } + + /** + * PUT an object, incrementing the put requests and put bytes + * counters. + * It does not update the other counters, + * as existing code does that as progress callbacks come in. + * Byte length is calculated from the file length, or, if there is no + * file, from the content length of the header. + * @param putObjectRequest the request + * @return the upload initiated + */ + public Upload putObject(PutObjectRequest putObjectRequest) { + long len; + if (putObjectRequest.getFile() != null) { + len = putObjectRequest.getFile().length(); + } else { + len = putObjectRequest.getMetadata().getContentLength(); + } + incrementPutStartStatistics(len); + return transfers.upload(putObjectRequest); + } + + /** + * Upload part of a multi-partition file. + * Increments the write and put counters + * @param request request + * @return the result of the operation. + */ + public UploadPartResult uploadPart(UploadPartRequest request) { + incrementPutStartStatistics(request.getPartSize()); + return s3.uploadPart(request); + } + + /** + * At the start of a put/multipart upload operation, update the + * relevant counters. + * + * @param bytes bytes in the request. + */ + public void incrementPutStartStatistics(long bytes) { + LOG.debug("PUT start {} bytes", bytes); + incrementWriteOperations(); + incrementStatistic(OBJECT_PUT_REQUESTS); + if (bytes > 0) { + incrementStatistic(OBJECT_PUT_BYTES, bytes); + } + } + + /** + * Callback for use in progress callbacks from put/multipart upload events. + * Increments those statistics which are expected to be updated during + * the ongoing upload operation. + * @param key key to file that is being written (for logging) + * @param bytes bytes successfully uploaded. + */ + public void incrementPutProgressStatistics(String key, long bytes) { + LOG.debug("PUT {}: {} bytes", key, bytes); + incrementWriteOperations(); + if (bytes > 0) { + statistics.incrementBytesWritten(bytes); + } + } + + /** * A helper method to delete a list of keys on a s3-backend. 
* * @param keysToDelete collection of keys to delete on the s3-backend @@ -858,21 +1091,13 @@ public class S3AFileSystem extends FileSystem { private void removeKeys(List<DeleteObjectsRequest.KeyVersion> keysToDelete, boolean clearKeys) throws AmazonClientException { if (enableMultiObjectsDelete) { - DeleteObjectsRequest deleteRequest - = new DeleteObjectsRequest(bucket).withKeys(keysToDelete); - s3.deleteObjects(deleteRequest); + deleteObjects(new DeleteObjectsRequest(bucket).withKeys(keysToDelete)); instrumentation.fileDeleted(keysToDelete.size()); - statistics.incrementWriteOps(1); } else { - int writeops = 0; - for (DeleteObjectsRequest.KeyVersion keyVersion : keysToDelete) { - s3.deleteObject( - new DeleteObjectRequest(bucket, keyVersion.getKey())); - writeops++; + deleteObject(keyVersion.getKey()); } instrumentation.fileDeleted(keysToDelete.size()); - statistics.incrementWriteOps(writeops); } if (clearKeys) { keysToDelete.clear(); @@ -942,9 +1167,8 @@ public class S3AFileSystem extends FileSystem { if (status.isEmptyDirectory()) { LOG.debug("Deleting fake empty directory {}", key); - s3.deleteObject(bucket, key); + deleteObject(key); instrumentation.directoryDeleted(); - statistics.incrementWriteOps(1); } else { LOG.debug("Getting objects for directory prefix {} to delete", key); @@ -955,9 +1179,9 @@ public class S3AFileSystem extends FileSystem { //request.setDelimiter("/"); request.setMaxKeys(maxKeys); - List<DeleteObjectsRequest.KeyVersion> keys = new ArrayList<>(); - ObjectListing objects = s3.listObjects(request); - statistics.incrementReadOps(1); + ObjectListing objects = listObjects(request); + List<DeleteObjectsRequest.KeyVersion> keys = + new ArrayList<>(objects.getObjectSummaries().size()); while (true) { for (S3ObjectSummary summary : objects.getObjectSummaries()) { keys.add(new DeleteObjectsRequest.KeyVersion(summary.getKey())); @@ -969,8 +1193,7 @@ public class S3AFileSystem extends FileSystem { } if (objects.isTruncated()) { - objects = s3.listNextBatchOfObjects(objects); - statistics.incrementReadOps(1); + objects = continueListObjects(objects); } else { if (!keys.isEmpty()) { removeKeys(keys, false); @@ -981,13 +1204,11 @@ public class S3AFileSystem extends FileSystem { } } else { LOG.debug("delete: Path is a file"); - s3.deleteObject(bucket, key); instrumentation.fileDeleted(1); - statistics.incrementWriteOps(1); + deleteObject(key); } createFakeDirectoryIfNecessary(f.getParent()); - return true; } @@ -996,7 +1217,7 @@ public class S3AFileSystem extends FileSystem { String key = pathToKey(f); if (!key.isEmpty() && !exists(f)) { LOG.debug("Creating new fake directory at {}", f); - createFakeDirectory(bucket, key); + createFakeDirectory(key); } } @@ -1032,6 +1253,7 @@ public class S3AFileSystem extends FileSystem { IOException, AmazonClientException { String key = pathToKey(f); LOG.debug("List status for path: {}", f); + incrementStatistic(INVOCATION_LIST_STATUS); final List<FileStatus> result = new ArrayList<FileStatus>(); final FileStatus fileStatus = getFileStatus(f); @@ -1049,8 +1271,7 @@ public class S3AFileSystem extends FileSystem { LOG.debug("listStatus: doing listObjects for directory {}", key); - ObjectListing objects = s3.listObjects(request); - statistics.incrementReadOps(1); + ObjectListing objects = listObjects(request); Path fQualified = f.makeQualified(uri, workingDir); @@ -1061,33 +1282,25 @@ public class S3AFileSystem extends FileSystem { if (keyPath.equals(fQualified) || summary.getKey().endsWith(S3N_FOLDER_SUFFIX)) { LOG.debug("Ignoring: {}", 
keyPath); - continue; - } - - if (objectRepresentsDirectory(summary.getKey(), summary.getSize())) { - result.add(new S3AFileStatus(true, true, keyPath)); - LOG.debug("Adding: fd: {}", keyPath); } else { - result.add(new S3AFileStatus(summary.getSize(), - dateToLong(summary.getLastModified()), keyPath, - getDefaultBlockSize(fQualified))); - LOG.debug("Adding: fi: {}", keyPath); + S3AFileStatus status = createFileStatus(keyPath, summary, + getDefaultBlockSize(keyPath)); + result.add(status); + LOG.debug("Adding: {}", status); } } for (String prefix : objects.getCommonPrefixes()) { Path keyPath = keyToPath(prefix).makeQualified(uri, workingDir); - if (keyPath.equals(f)) { - continue; + if (!keyPath.equals(f)) { + result.add(new S3AFileStatus(true, false, keyPath)); + LOG.debug("Adding: rd: {}", keyPath); } - result.add(new S3AFileStatus(true, false, keyPath)); - LOG.debug("Adding: rd: {}", keyPath); } if (objects.isTruncated()) { LOG.debug("listStatus: list truncated - getting next batch"); - objects = s3.listNextBatchOfObjects(objects); - statistics.incrementReadOps(1); + objects = continueListObjects(objects); } else { break; } @@ -1100,8 +1313,6 @@ public class S3AFileSystem extends FileSystem { return result.toArray(new FileStatus[result.size()]); } - - /** * Set the current working directory for the given file system. All relative * paths will be resolved relative to it. @@ -1123,7 +1334,7 @@ public class S3AFileSystem extends FileSystem { /** * * Make the given path and all non-existent parents into - * directories. Has the semantics of Unix @{code 'mkdir -p'}. + * directories. Has the semantics of Unix {@code 'mkdir -p'}. * Existence of the directory hierarchy is not an error. * @param path path to create * @param permission to apply to f @@ -1158,7 +1369,7 @@ public class S3AFileSystem extends FileSystem { private boolean innerMkdirs(Path f, FsPermission permission) throws IOException, FileAlreadyExistsException, AmazonClientException { LOG.debug("Making directory: {}", f); - + incrementStatistic(INVOCATION_MKDIRS); try { FileStatus fileStatus = getFileStatus(f); @@ -1187,7 +1398,7 @@ public class S3AFileSystem extends FileSystem { } while (fPart != null); String key = pathToKey(f); - createFakeDirectory(bucket, key); + createFakeDirectory(key); return true; } } @@ -1201,12 +1412,12 @@ public class S3AFileSystem extends FileSystem { */ public S3AFileStatus getFileStatus(Path f) throws IOException { String key = pathToKey(f); + incrementStatistic(INVOCATION_GET_FILE_STATUS); LOG.debug("Getting path status for {} ({})", f , key); if (!key.isEmpty()) { try { - ObjectMetadata meta = s3.getObjectMetadata(bucket, key); - statistics.incrementReadOps(1); + ObjectMetadata meta = getObjectMetadata(key); if (objectRepresentsDirectory(key, meta.getContentLength())) { LOG.debug("Found exact file: fake directory"); @@ -1231,8 +1442,7 @@ public class S3AFileSystem extends FileSystem { if (!key.endsWith("/")) { String newKey = key + "/"; try { - ObjectMetadata meta = s3.getObjectMetadata(bucket, newKey); - statistics.incrementReadOps(1); + ObjectMetadata meta = getObjectMetadata(newKey); if (objectRepresentsDirectory(newKey, meta.getContentLength())) { LOG.debug("Found file (with /): fake directory"); @@ -1265,8 +1475,7 @@ public class S3AFileSystem extends FileSystem { request.setDelimiter("/"); request.setMaxKeys(1); - ObjectListing objects = s3.listObjects(request); - statistics.incrementReadOps(1); + ObjectListing objects = listObjects(request); if (!objects.getCommonPrefixes().isEmpty() || 
!objects.getObjectSummaries().isEmpty()) { @@ -1349,7 +1558,8 @@ public class S3AFileSystem extends FileSystem { private void innerCopyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst) throws IOException, FileAlreadyExistsException, AmazonClientException { - String key = pathToKey(dst); + incrementStatistic(INVOCATION_COPY_FROM_LOCAL_FILE); + final String key = pathToKey(dst); if (!overwrite && exists(dst)) { throw new FileAlreadyExistsException(dst + " already exists"); @@ -1360,35 +1570,19 @@ public class S3AFileSystem extends FileSystem { LocalFileSystem local = getLocal(getConf()); File srcfile = local.pathToFile(src); - final ObjectMetadata om = new ObjectMetadata(); - if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) { - om.setSSEAlgorithm(serverSideEncryptionAlgorithm); - } - PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, srcfile); - putObjectRequest.setCannedAcl(cannedACL); - putObjectRequest.setMetadata(om); - - ProgressListener progressListener = new ProgressListener() { - public void progressChanged(ProgressEvent progressEvent) { - switch (progressEvent.getEventType()) { - case TRANSFER_PART_COMPLETED_EVENT: - statistics.incrementWriteOps(1); - break; - default: - break; - } - } - }; - - statistics.incrementWriteOps(1); - Upload up = transfers.upload(putObjectRequest); - up.addProgressListener(progressListener); + final ObjectMetadata om = newObjectMetadata(); + PutObjectRequest putObjectRequest = newPutObjectRequest(key, om, srcfile); + Upload up = putObject(putObjectRequest); + ProgressableProgressListener listener = new ProgressableProgressListener( + this, key, up, null); + up.addProgressListener(listener); try { up.waitForUploadResult(); } catch (InterruptedException e) { throw new InterruptedIOException("Interrupted copying " + src + " to " + dst + ", cancelling"); } + listener.uploadCompleted(); // This will delete unnecessary fake parent directories finishedWrite(key); @@ -1437,7 +1631,7 @@ public class S3AFileSystem extends FileSystem { LOG.debug("copyFile {} -> {} ", srcKey, dstKey); try { - ObjectMetadata srcom = s3.getObjectMetadata(bucket, srcKey); + ObjectMetadata srcom = getObjectMetadata(srcKey); ObjectMetadata dstom = cloneObjectMetadata(srcom); if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) { dstom.setSSEAlgorithm(serverSideEncryptionAlgorithm); @@ -1451,7 +1645,7 @@ public class S3AFileSystem extends FileSystem { public void progressChanged(ProgressEvent progressEvent) { switch (progressEvent.getEventType()) { case TRANSFER_PART_COMPLETED_EVENT: - statistics.incrementWriteOps(1); + incrementWriteOperations(); break; default: break; @@ -1463,7 +1657,7 @@ public class S3AFileSystem extends FileSystem { copy.addProgressListener(progressListener); try { copy.waitForCopyResult(); - statistics.incrementWriteOps(1); + incrementWriteOperations(); instrumentation.filesCopied(1, size); } catch (InterruptedException e) { throw new InterruptedIOException("Interrupted copying " + srcKey @@ -1475,26 +1669,12 @@ public class S3AFileSystem extends FileSystem { } } - private boolean objectRepresentsDirectory(final String name, final long size) { - return !name.isEmpty() - && name.charAt(name.length() - 1) == '/' - && size == 0L; - } - - // Handles null Dates that can be returned by AWS - private static long dateToLong(final Date date) { - if (date == null) { - return 0L; - } - - return date.getTime(); - } - /** * Perform post-write actions. 
* @param key key written to */ public void finishedWrite(String key) { + LOG.debug("Finished write to {}", key); deleteUnnecessaryFakeDirectories(keyToPath(key).getParent()); } @@ -1516,8 +1696,7 @@ public class S3AFileSystem extends FileSystem { if (status.isDirectory() && status.isEmptyDirectory()) { LOG.debug("Deleting fake directory {}/", key); - s3.deleteObject(bucket, key + "/"); - statistics.incrementWriteOps(1); + deleteObject(key + "/"); } } catch (IOException | AmazonClientException e) { LOG.debug("While deleting key {} ", key, e); @@ -1533,18 +1712,20 @@ public class S3AFileSystem extends FileSystem { } - private void createFakeDirectory(final String bucketName, final String objectName) - throws AmazonClientException, AmazonServiceException { + private void createFakeDirectory(final String objectName) + throws AmazonClientException, AmazonServiceException, + InterruptedIOException { if (!objectName.endsWith("/")) { - createEmptyObject(bucketName, objectName + "/"); + createEmptyObject(objectName + "/"); } else { - createEmptyObject(bucketName, objectName); + createEmptyObject(objectName); } } // Used to create an empty file that represents an empty directory - private void createEmptyObject(final String bucketName, final String objectName) - throws AmazonClientException, AmazonServiceException { + private void createEmptyObject(final String objectName) + throws AmazonClientException, AmazonServiceException, + InterruptedIOException { final InputStream im = new InputStream() { @Override public int read() throws IOException { @@ -1552,16 +1733,16 @@ public class S3AFileSystem extends FileSystem { } }; - final ObjectMetadata om = new ObjectMetadata(); - om.setContentLength(0L); - if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) { - om.setSSEAlgorithm(serverSideEncryptionAlgorithm); + PutObjectRequest putObjectRequest = newPutObjectRequest(objectName, + newObjectMetadata(0L), + im); + Upload upload = putObject(putObjectRequest); + try { + upload.waitForUploadResult(); + } catch (InterruptedException e) { + throw new InterruptedIOException("Interrupted creating " + objectName); } - PutObjectRequest putObjectRequest = - new PutObjectRequest(bucketName, objectName, im, om); - putObjectRequest.setCannedAcl(cannedACL); - s3.putObject(putObjectRequest); - statistics.incrementWriteOps(1); + incrementPutProgressStatistics(objectName, 0); instrumentation.directoryCreated(); } @@ -1576,10 +1757,7 @@ public class S3AFileSystem extends FileSystem { // This approach may be too brittle, especially if // in future there are new attributes added to ObjectMetadata // that we do not explicitly call to set here - ObjectMetadata ret = new ObjectMetadata(); - - // Non null attributes - ret.setContentLength(source.getContentLength()); + ObjectMetadata ret = newObjectMetadata(source.getContentLength()); // Possibly null attributes // Allowing nulls to pass breaks it during later use @@ -1689,6 +1867,75 @@ public class S3AFileSystem extends FileSystem { } /** + * Override superclass so as to add statistic collection. + * {@inheritDoc} + */ + @Override + public FileStatus[] globStatus(Path pathPattern) throws IOException { + incrementStatistic(INVOCATION_GLOB_STATUS); + return super.globStatus(pathPattern); + } + + /** + * Override superclass so as to add statistic collection. 
+ * {@inheritDoc} + */ + @Override + public FileStatus[] globStatus(Path pathPattern, PathFilter filter) + throws IOException { + incrementStatistic(INVOCATION_GLOB_STATUS); + return super.globStatus(pathPattern, filter); + } + + /** + * Override superclass so as to add statistic collection. + * {@inheritDoc} + */ + @Override + public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f) + throws FileNotFoundException, IOException { + incrementStatistic(INVOCATION_LIST_LOCATED_STATUS); + return super.listLocatedStatus(f); + } + + @Override + public RemoteIterator<LocatedFileStatus> listFiles(Path f, + boolean recursive) throws FileNotFoundException, IOException { + incrementStatistic(INVOCATION_LIST_FILES); + return super.listFiles(f, recursive); + } + + /** + * Override superclass so as to add statistic collection. + * {@inheritDoc} + */ + @Override + public boolean exists(Path f) throws IOException { + incrementStatistic(INVOCATION_EXISTS); + return super.exists(f); + } + + /** + * Override superclass so as to add statistic collection. + * {@inheritDoc} + */ + @Override + public boolean isDirectory(Path f) throws IOException { + incrementStatistic(INVOCATION_IS_DIRECTORY); + return super.isDirectory(f); + } + + /** + * Override superclass so as to add statistic collection. + * {@inheritDoc} + */ + @Override + public boolean isFile(Path f) throws IOException { + incrementStatistic(INVOCATION_IS_FILE); + return super.isFile(f); + } + + /** * Get a integer option >= the minimum allowed value. * @param conf configuration * @param key key to look up http://git-wip-us.apache.org/repos/asf/hadoop/blob/043a0c2e/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java index 285f228..8892f0e 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java @@ -18,6 +18,7 @@ package org.apache.hadoop.fs.s3a; +import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.metrics2.MetricStringBuilder; @@ -26,49 +27,30 @@ import org.apache.hadoop.metrics2.lib.Interns; import org.apache.hadoop.metrics2.lib.MetricsRegistry; import org.apache.hadoop.metrics2.lib.MutableCounterLong; import org.apache.hadoop.metrics2.lib.MutableGaugeLong; +import org.apache.hadoop.metrics2.lib.MutableMetric; import java.net.URI; import java.util.HashMap; import java.util.Map; import java.util.UUID; +import static org.apache.hadoop.fs.s3a.Statistic.*; + /** * Instrumentation of S3a. - * Derived from the {@code AzureFileSystemInstrumentation} + * Derived from the {@code AzureFileSystemInstrumentation}. + * + * Counters and metrics are generally addressed in code by their name or + * {@link Statistic} key. There <i>may</i> be some Statistics which do + * not have an entry here. To avoid attempts to access such counters failing, + * the operations to increment/query metric values are designed to handle + * lookup failures. 
  */
 @Metrics(about = "Metrics for S3a", context = "S3AFileSystem")
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class S3AInstrumentation {
   public static final String CONTEXT = "S3AFileSystem";
-
-  public static final String STREAM_OPENED = "streamOpened";
-  public static final String STREAM_CLOSE_OPERATIONS = "streamCloseOperations";
-  public static final String STREAM_CLOSED = "streamClosed";
-  public static final String STREAM_ABORTED = "streamAborted";
-  public static final String STREAM_READ_EXCEPTIONS = "streamReadExceptions";
-  public static final String STREAM_SEEK_OPERATIONS = "streamSeekOperations";
-  public static final String STREAM_FORWARD_SEEK_OPERATIONS
-      = "streamForwardSeekOperations";
-  public static final String STREAM_BACKWARD_SEEK_OPERATIONS
-      = "streamBackwardSeekOperations";
-  public static final String STREAM_SEEK_BYTES_SKIPPED =
-      "streamBytesSkippedOnSeek";
-  public static final String STREAM_SEEK_BYTES_BACKWARDS =
-      "streamBytesBackwardsOnSeek";
-  public static final String STREAM_SEEK_BYTES_READ = "streamBytesRead";
-  public static final String STREAM_READ_OPERATIONS = "streamReadOperations";
-  public static final String STREAM_READ_FULLY_OPERATIONS
-      = "streamReadFullyOperations";
-  public static final String STREAM_READ_OPERATIONS_INCOMPLETE
-      = "streamReadOperationsIncomplete";
-  public static final String FILES_CREATED = "files_created";
-  public static final String FILES_COPIED = "files_copied";
-  public static final String FILES_COPIED_BYTES = "files_copied_bytes";
-  public static final String FILES_DELETED = "files_deleted";
-  public static final String DIRECTORIES_CREATED = "directories_created";
-  public static final String DIRECTORIES_DELETED = "directories_deleted";
-  public static final String IGNORED_ERRORS = "ignored_errors";
   private final MetricsRegistry registry =
       new MetricsRegistry("S3AFileSystem").setContext(CONTEXT);
   private final MutableCounterLong streamOpenOperations;
@@ -95,6 +77,27 @@ public class S3AInstrumentation {
   private final MutableCounterLong numberOfDirectoriesDeleted;
   private final Map<String, MutableCounterLong> streamMetrics =
       new HashMap<>();
 
+  private static final Statistic[] COUNTERS_TO_CREATE = {
+      INVOCATION_COPY_FROM_LOCAL_FILE,
+      INVOCATION_EXISTS,
+      INVOCATION_GET_FILE_STATUS,
+      INVOCATION_GLOB_STATUS,
+      INVOCATION_IS_DIRECTORY,
+      INVOCATION_IS_FILE,
+      INVOCATION_LIST_FILES,
+      INVOCATION_LIST_LOCATED_STATUS,
+      INVOCATION_LIST_STATUS,
+      INVOCATION_MKDIRS,
+      INVOCATION_RENAME,
+      OBJECT_COPY_REQUESTS,
+      OBJECT_DELETE_REQUESTS,
+      OBJECT_LIST_REQUESTS,
+      OBJECT_METADATA_REQUESTS,
+      OBJECT_MULTIPART_UPLOAD_ABORTED,
+      OBJECT_PUT_BYTES,
+      OBJECT_PUT_REQUESTS
+  };
+
   public S3AInstrumentation(URI name) {
     UUID fileSystemInstanceId = UUID.randomUUID();
     registry.tag("FileSystemId",
@@ -103,50 +106,35 @@ public class S3AInstrumentation {
     registry.tag("fsURI",
         "URI of this filesystem",
         name.toString());
-    streamOpenOperations = streamCounter(STREAM_OPENED,
-        "Total count of times an input stream to object store was opened");
-    streamCloseOperations = streamCounter(STREAM_CLOSE_OPERATIONS,
-        "Total count of times an attempt to close a data stream was made");
-    streamClosed = streamCounter(STREAM_CLOSED,
-        "Count of times the TCP stream was closed");
-    streamAborted = streamCounter(STREAM_ABORTED,
-        "Count of times the TCP stream was aborted");
-    streamSeekOperations = streamCounter(STREAM_SEEK_OPERATIONS,
-        "Number of seek operations invoked on input streams");
-    streamReadExceptions = streamCounter(STREAM_READ_EXCEPTIONS,
-        "Number of read exceptions caught and attempted to recovered from");
-    streamForwardSeekOperations = streamCounter(STREAM_FORWARD_SEEK_OPERATIONS,
-        "Number of executed seek operations which went forward in a stream");
-    streamBackwardSeekOperations = streamCounter(
-        STREAM_BACKWARD_SEEK_OPERATIONS,
-        "Number of executed seek operations which went backwards in a stream");
-    streamBytesSkippedOnSeek = streamCounter(STREAM_SEEK_BYTES_SKIPPED,
-        "Count of bytes skipped during forward seek operations");
-    streamBytesBackwardsOnSeek = streamCounter(STREAM_SEEK_BYTES_BACKWARDS,
-        "Count of bytes moved backwards during seek operations");
-    streamBytesRead = streamCounter(STREAM_SEEK_BYTES_READ,
-        "Count of bytes read during seek() in stream operations");
-    streamReadOperations = streamCounter(STREAM_READ_OPERATIONS,
-        "Count of read() operations in streams");
-    streamReadFullyOperations = streamCounter(STREAM_READ_FULLY_OPERATIONS,
-        "Count of readFully() operations in streams");
-    streamReadsIncomplete = streamCounter(STREAM_READ_OPERATIONS_INCOMPLETE,
-        "Count of incomplete read() operations in streams");
-
-    numberOfFilesCreated = counter(FILES_CREATED,
-        "Total number of files created through the object store.");
-    numberOfFilesCopied = counter(FILES_COPIED,
-        "Total number of files copied within the object store.");
-    bytesOfFilesCopied = counter(FILES_COPIED_BYTES,
-        "Total number of bytes copied within the object store.");
-    numberOfFilesDeleted = counter(FILES_DELETED,
-        "Total number of files deleted through from the object store.");
-    numberOfDirectoriesCreated = counter(DIRECTORIES_CREATED,
-        "Total number of directories created through the object store.");
-    numberOfDirectoriesDeleted = counter(DIRECTORIES_DELETED,
-        "Total number of directories deleted through the object store.");
-    ignoredErrors = counter(IGNORED_ERRORS,
-        "Total number of errors caught and ingored.");
+    streamOpenOperations = streamCounter(STREAM_OPENED);
+    streamCloseOperations = streamCounter(STREAM_CLOSE_OPERATIONS);
+    streamClosed = streamCounter(STREAM_CLOSED);
+    streamAborted = streamCounter(STREAM_ABORTED);
+    streamSeekOperations = streamCounter(STREAM_SEEK_OPERATIONS);
+    streamReadExceptions = streamCounter(STREAM_READ_EXCEPTIONS);
+    streamForwardSeekOperations =
+        streamCounter(STREAM_FORWARD_SEEK_OPERATIONS);
+    streamBackwardSeekOperations =
+        streamCounter(STREAM_BACKWARD_SEEK_OPERATIONS);
+    streamBytesSkippedOnSeek = streamCounter(STREAM_SEEK_BYTES_SKIPPED);
+    streamBytesBackwardsOnSeek =
+        streamCounter(STREAM_SEEK_BYTES_BACKWARDS);
+    streamBytesRead = streamCounter(STREAM_SEEK_BYTES_READ);
+    streamReadOperations = streamCounter(STREAM_READ_OPERATIONS);
+    streamReadFullyOperations =
+        streamCounter(STREAM_READ_FULLY_OPERATIONS);
+    streamReadsIncomplete =
+        streamCounter(STREAM_READ_OPERATIONS_INCOMPLETE);
+    numberOfFilesCreated = counter(FILES_CREATED);
+    numberOfFilesCopied = counter(FILES_COPIED);
+    bytesOfFilesCopied = counter(FILES_COPIED_BYTES);
+    numberOfFilesDeleted = counter(FILES_DELETED);
+    numberOfDirectoriesCreated = counter(DIRECTORIES_CREATED);
+    numberOfDirectoriesDeleted = counter(DIRECTORIES_DELETED);
+    ignoredErrors = counter(IGNORED_ERRORS);
+    for (Statistic statistic : COUNTERS_TO_CREATE) {
+      counter(statistic);
+    }
   }
 
   /**
@@ -174,6 +162,25 @@ public class S3AInstrumentation {
   }
 
   /**
+   * Create a counter in the registry.
+   * @param op statistic to count
+   * @return a new counter
+   */
+  protected final MutableCounterLong counter(Statistic op) {
+    return counter(op.getSymbol(), op.getDescription());
+  }
+
+  /**
+   * Create a counter in the stream map: these are not registered in the
+   * public metrics.
+   * @param op statistic to count
+   * @return a new counter
+   */
+  protected final MutableCounterLong streamCounter(Statistic op) {
+    return streamCounter(op.getSymbol(), op.getDescription());
+  }
+
+  /**
    * Create a gauge in the registry.
    * @param name gauge name
    * @param desc description
@@ -216,6 +223,58 @@ public class S3AInstrumentation {
   }
 
   /**
+   * Get the value of a counter.
+   * @param statistic the operation
+   * @return its value, or 0 if not found.
+   */
+  public long getCounterValue(Statistic statistic) {
+    return getCounterValue(statistic.getSymbol());
+  }
+
+  /**
+   * Get the value of a counter.
+   * If the counter is null, return 0.
+   * @param name the name of the counter
+   * @return its value.
+   */
+  public long getCounterValue(String name) {
+    MutableCounterLong counter = lookupCounter(name);
+    return counter == null ? 0 : counter.value();
+  }
+
+  /**
+   * Look up a counter by name. Return null if it is not known.
+   * @param name counter name
+   * @return the counter
+   */
+  private MutableCounterLong lookupCounter(String name) {
+    MutableMetric metric = lookupMetric(name);
+    if (metric == null) {
+      return null;
+    }
+    Preconditions.checkNotNull(metric, "not found: " + name);
+    if (!(metric instanceof MutableCounterLong)) {
+      throw new IllegalStateException("Metric " + name
+          + " is not a MutableCounterLong: " + metric);
+    }
+    return (MutableCounterLong) metric;
+  }
+
+  /**
+   * Look up a metric from both the registered set and the lighter weight
+   * stream entries.
+   * @param name metric name
+   * @return the metric or null
+   */
+  public MutableMetric lookupMetric(String name) {
+    MutableMetric metric = getRegistry().get(name);
+    if (metric == null) {
+      metric = streamMetrics.get(name);
+    }
+    return metric;
+  }
+
+  /**
    * Indicate that S3A created a file.
   */
   public void fileCreated() {
@@ -263,6 +322,19 @@ public class S3AInstrumentation {
   }
 
   /**
+   * Increment a specific counter.
+   * No-op if not defined.
+   * @param op operation
+   * @param count increment value
+   */
+  public void incrementCounter(Statistic op, long count) {
+    MutableCounterLong counter = lookupCounter(op.getSymbol());
+    if (counter != null) {
+      counter.incr(count);
+    }
+  }
+
+  /**
    * Create a stream input statistics instance.
    * @return the new instance
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/043a0c2e/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
index 593e9e8..23ba682 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
@@ -19,19 +19,11 @@
 package org.apache.hadoop.fs.s3a;
 
 import com.amazonaws.AmazonClientException;
-import com.amazonaws.event.ProgressEvent;
-import com.amazonaws.event.ProgressEventType;
-import com.amazonaws.event.ProgressListener;
-import com.amazonaws.services.s3.model.CannedAccessControlList;
 import com.amazonaws.services.s3.model.ObjectMetadata;
-import com.amazonaws.services.s3.model.PutObjectRequest;
-import com.amazonaws.services.s3.transfer.TransferManager;
 import com.amazonaws.services.s3.transfer.Upload;
-import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.util.Progressable;
 
@@ -44,8 +36,6 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.io.OutputStream;
 
-import static com.amazonaws.event.ProgressEventType.TRANSFER_COMPLETED_EVENT;
-import static com.amazonaws.event.ProgressEventType.TRANSFER_PART_STARTED_EVENT;
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.S3AUtils.*;
 
@@ -59,32 +49,20 @@ public class S3AOutputStream extends OutputStream {
   private File backupFile;
   private boolean closed;
   private String key;
-  private String bucket;
-  private TransferManager transfers;
   private Progressable progress;
   private long partSize;
   private long partSizeThreshold;
   private S3AFileSystem fs;
-  private CannedAccessControlList cannedACL;
-  private FileSystem.Statistics statistics;
   private LocalDirAllocator lDirAlloc;
-  private String serverSideEncryptionAlgorithm;
 
   public static final Logger LOG = S3AFileSystem.LOG;
 
-  public S3AOutputStream(Configuration conf, TransferManager transfers,
-      S3AFileSystem fs, String bucket, String key, Progressable progress,
-      CannedAccessControlList cannedACL, FileSystem.Statistics statistics,
-      String serverSideEncryptionAlgorithm)
+  public S3AOutputStream(Configuration conf,
+      S3AFileSystem fs, String key, Progressable progress)
       throws IOException {
-    this.bucket = bucket;
     this.key = key;
-    this.transfers = transfers;
     this.progress = progress;
     this.fs = fs;
-    this.cannedACL = cannedACL;
-    this.statistics = statistics;
-    this.serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm;
 
     partSize = fs.getPartitionSize();
     partSizeThreshold = fs.getMultiPartThreshold();
@@ -124,30 +102,18 @@ public class S3AOutputStream extends OutputStream {
 
     try {
-      final ObjectMetadata om = new ObjectMetadata();
-      if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
-        om.setSSEAlgorithm(serverSideEncryptionAlgorithm);
-      }
-      PutObjectRequest putObjectRequest =
-          new PutObjectRequest(bucket, key, backupFile);
-      putObjectRequest.setCannedAcl(cannedACL);
-      putObjectRequest.setMetadata(om);
-
-      Upload upload = transfers.upload(putObjectRequest);
-
-      ProgressableProgressListener listener =
-          new ProgressableProgressListener(upload, progress, statistics);
+      final ObjectMetadata om = fs.newObjectMetadata();
+      Upload upload = fs.putObject(
+          fs.newPutObjectRequest(
+              key,
+              om,
+              backupFile));
+      ProgressableProgressListener listener =
+          new ProgressableProgressListener(fs, key, upload, progress);
       upload.addProgressListener(listener);
       upload.waitForUploadResult();
-
-      long delta = upload.getProgress().getBytesTransferred() -
-          listener.getLastBytesTransferred();
-      if (statistics != null && delta != 0) {
-        LOG.debug("S3A write delta changed after finished: {} bytes", delta);
-        statistics.incrementBytesWritten(delta);
-      }
-
+      listener.uploadCompleted();
       // This will delete unnecessary fake parent directories
       fs.finishedWrite(key);
     } catch (InterruptedException e) {
@@ -175,46 +141,4 @@ public class S3AOutputStream extends OutputStream {
     backupStream.write(b, off, len);
   }
 
-  /**
-   * Listener to progress from AWS regarding transfers.
-   */
-  public static class ProgressableProgressListener implements ProgressListener {
-    private Progressable progress;
-    private FileSystem.Statistics statistics;
-    private long lastBytesTransferred;
-    private Upload upload;
-
-    public ProgressableProgressListener(Upload upload, Progressable progress,
-        FileSystem.Statistics statistics) {
-      this.upload = upload;
-      this.progress = progress;
-      this.statistics = statistics;
-      this.lastBytesTransferred = 0;
-    }
-
-    public void progressChanged(ProgressEvent progressEvent) {
-      if (progress != null) {
-        progress.progress();
-      }
-
-      // There are 3 http ops here, but this should be close enough for now
-      ProgressEventType pet = progressEvent.getEventType();
-      if (pet == TRANSFER_PART_STARTED_EVENT ||
-          pet == TRANSFER_COMPLETED_EVENT) {
-        statistics.incrementWriteOps(1);
-      }
-
-      long transferred = upload.getProgress().getBytesTransferred();
-      long delta = transferred - lastBytesTransferred;
-      if (statistics != null && delta != 0) {
-        statistics.incrementBytesWritten(delta);
-      }
-
-      lastBytesTransferred = transferred;
-    }
-
-    public long getLastBytesTransferred() {
-      return lastBytesTransferred;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/043a0c2e/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStorageStatistics.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStorageStatistics.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStorageStatistics.java
new file mode 100644
index 0000000..f69159a
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStorageStatistics.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.StorageStatistics;
+import org.slf4j.Logger;
+
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Storage statistics for S3A.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class S3AStorageStatistics extends StorageStatistics {
+  private static final Logger LOG = S3AFileSystem.LOG;
+
+  public static final String NAME = "S3AStorageStatistics";
+  private final Map<Statistic, AtomicLong> opsCount =
+      new EnumMap<>(Statistic.class);
+
+  public S3AStorageStatistics() {
+    super(NAME);
+    for (Statistic opType : Statistic.values()) {
+      opsCount.put(opType, new AtomicLong(0));
+    }
+  }
+
+  /**
+   * Increment a specific counter.
+   * @param op operation
+   * @param count increment value
+   * @return the new value
+   */
+  public long incrementCounter(Statistic op, long count) {
+    long updated = opsCount.get(op).addAndGet(count);
+    LOG.debug("{} += {} -> {}", op, count, updated);
+    return updated;
+  }
+
+  private class LongIterator implements Iterator<LongStatistic> {
+    private Iterator<Map.Entry<Statistic, AtomicLong>> iterator =
+        Collections.unmodifiableSet(opsCount.entrySet()).iterator();
+
+    @Override
+    public boolean hasNext() {
+      return iterator.hasNext();
+    }
+
+    @Override
+    public LongStatistic next() {
+      if (!iterator.hasNext()) {
+        throw new NoSuchElementException();
+      }
+      final Map.Entry<Statistic, AtomicLong> entry = iterator.next();
+      return new LongStatistic(entry.getKey().name(), entry.getValue().get());
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  @Override
+  public Iterator<LongStatistic> getLongStatistics() {
+    return new LongIterator();
+  }
+
+  @Override
+  public Long getLong(String key) {
+    final Statistic type = Statistic.fromSymbol(key);
+    return type == null ? null : opsCount.get(type).get();
+  }
+
+  @Override
+  public boolean isTracked(String key) {
+    return Statistic.fromSymbol(key) != null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/043a0c2e/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
index 12d14e2..062fca4 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.fs.s3a;
 import com.amazonaws.AmazonClientException;
 import com.amazonaws.AmazonServiceException;
 import com.amazonaws.services.s3.model.AmazonS3Exception;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.Path;
@@ -29,6 +30,7 @@ import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.file.AccessDeniedException;
+import java.util.Date;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 
@@ -186,4 +188,50 @@ public final class S3AUtils {
     }
     return builder.toString();
   }
+
+  /**
+   * Create a file status instance from a listing.
+   * @param keyPath path to entry
+   * @param summary summary from AWS
+   * @param blockSize block size to declare.
+   * @return a status entry
+   */
+  public static S3AFileStatus createFileStatus(Path keyPath,
+      S3ObjectSummary summary,
+      long blockSize) {
+    if (objectRepresentsDirectory(summary.getKey(), summary.getSize())) {
+      return new S3AFileStatus(true, true, keyPath);
+    } else {
+      return new S3AFileStatus(summary.getSize(),
+          dateToLong(summary.getLastModified()), keyPath,
+          blockSize);
+    }
+  }
+
+  /**
+   * Predicate: does the object represent a directory?
+   * @param name object name
+   * @param size object size
+   * @return true if it meets the criteria for being an object representing
+   * a directory
+   */
+  public static boolean objectRepresentsDirectory(final String name,
+      final long size) {
+    return !name.isEmpty()
+        && name.charAt(name.length() - 1) == '/'
+        && size == 0L;
+  }
+
+  /**
+   * Date to long conversion.
+   * Handles null Dates that can be returned by AWS by returning 0.
+   * @param date date from AWS query
+   * @return timestamp of the object
+   */
+  public static long dateToLong(final Date date) {
+    if (date == null) {
+      return 0L;
+    }
+
+    return date.getTime();
+  }
 }
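To make the new API concrete, the short sketch below (illustrative only, not
part of the patch) drives S3AStorageStatistics directly through the methods
defined above. In a live deployment the instance is created and incremented by
S3AFileSystem around its S3 calls; the direct instantiation here exists only
to show the call pattern.

    import org.apache.hadoop.fs.StorageStatistics.LongStatistic;
    import org.apache.hadoop.fs.s3a.S3AStorageStatistics;
    import org.apache.hadoop.fs.s3a.Statistic;

    import java.util.Iterator;

    public class S3AStorageStatisticsDemo {
      public static void main(String[] args) {
        // The constructor seeds a zeroed counter for every Statistic value.
        S3AStorageStatistics stats = new S3AStorageStatistics();

        // Simulate the increments S3AFileSystem makes around its S3 calls.
        stats.incrementCounter(Statistic.OBJECT_LIST_REQUESTS, 1);
        stats.incrementCounter(Statistic.INVOCATION_GET_FILE_STATUS, 2);

        // Point lookups are keyed by the statistic's symbol.
        String key = Statistic.OBJECT_LIST_REQUESTS.getSymbol();
        System.out.println(key + ": tracked=" + stats.isTracked(key)
            + ", value=" + stats.getLong(key));

        // Dump everything, e.g. at the end of a job run.
        Iterator<LongStatistic> it = stats.getLongStatistics();
        while (it.hasNext()) {
          LongStatistic s = it.next();
          System.out.println(s.getName() + " = " + s.getValue());
        }
      }
    }

One asymmetry worth noting: getLongStatistics() reports each entry under the
enum constant's name(), while getLong() and isTracked() resolve their argument
via Statistic.fromSymbol(). If a statistic's symbol (defined in Statistic.java,
not shown in this excerpt) differs from its enum name, the two views are keyed
differently.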