Repository: hadoop Updated Branches: refs/heads/branch-2 eab6e633e -> 1370dfc77 refs/heads/branch-2.8 9b8f7a8fb -> 8d74a5804 refs/heads/trunk 6b53802cb -> 27c4e90ef
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d74a580/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index 42178a4..27557f8 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -21,20 +21,50 @@ package org.apache.hadoop.fs.s3a; import com.amazonaws.services.s3.AmazonS3Client; import com.amazonaws.services.s3.model.GetObjectRequest; import com.amazonaws.services.s3.model.S3ObjectInputStream; +import com.google.common.base.Preconditions; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.CanSetReadahead; import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FileSystem; - import org.slf4j.Logger; import java.io.EOFException; import java.io.IOException; -import java.net.SocketTimeoutException; -import java.net.SocketException; -public class S3AInputStream extends FSInputStream { +/** + * The input stream for an S3A object. + * + * As this stream seeks withing an object, it may close then re-open the stream. + * When this happens, any updated stream data may be retrieved, and, given + * the consistency model of Amazon S3, outdated data may in fact be picked up. + * + * As a result, the outcome of reading from a stream of an object which is + * actively manipulated during the read process is "undefined". + * + * The class is marked as private as code should not be creating instances + * themselves. 
Any extra feature (e.g instrumentation) should be considered + * unstable. + * + * Because it prints some of the state of the instrumentation, + * the output of {@link #toString()} must also be considered unstable. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class S3AInputStream extends FSInputStream implements CanSetReadahead { + /** + * This is the public position; the one set in {@link #seek(long)} + * and returned in {@link #getPos()}. + */ private long pos; - private boolean closed; + /** + * Closed bit. Volatile so reads are non-blocking. + * Updates must be in a synchronized block to guarantee an atomic check and + * set + */ + private volatile boolean closed; private S3ObjectInputStream wrappedStream; private final FileSystem.Statistics stats; private final AmazonS3Client client; @@ -44,62 +74,65 @@ public class S3AInputStream extends FSInputStream { private final String uri; public static final Logger LOG = S3AFileSystem.LOG; public static final long CLOSE_THRESHOLD = 4096; + private final S3AInstrumentation.InputStreamStatistics streamStatistics; + private long readahead; - // Used by lazy seek + /** + * This is the actual position within the object, used by + * lazy seek to decide whether to seek on the next read or not. 
+ */ private long nextReadPos; - //Amount of data requested from the request + /* Amount of data desired from the request */ private long requestedStreamLen; - public S3AInputStream(String bucket, String key, long contentLength, - AmazonS3Client client, FileSystem.Statistics stats) { + public S3AInputStream(String bucket, + String key, + long contentLength, + AmazonS3Client client, + FileSystem.Statistics stats, + S3AInstrumentation instrumentation, + long readahead) { + Preconditions.checkArgument(StringUtils.isNotEmpty(bucket), "No Bucket"); + Preconditions.checkArgument(StringUtils.isNotEmpty(key), "No Key"); + Preconditions.checkArgument(contentLength >= 0 , "Negative content length"); this.bucket = bucket; this.key = key; this.contentLength = contentLength; this.client = client; this.stats = stats; - this.pos = 0; - this.nextReadPos = 0; - this.closed = false; - this.wrappedStream = null; this.uri = "s3a://" + this.bucket + "/" + this.key; + this.streamStatistics = instrumentation.newInputStreamStatistics(); + setReadahead(readahead); } /** * Opens up the stream at specified target position and for given length. * + * @param reason reason for reopen * @param targetPos target position * @param length length requested * @throws IOException */ - private synchronized void reopen(long targetPos, long length) + private synchronized void reopen(String reason, long targetPos, long length) throws IOException { - requestedStreamLen = (length < 0) ? 
this.contentLength : - Math.max(this.contentLength, (CLOSE_THRESHOLD + (targetPos + length))); + requestedStreamLen = this.contentLength; if (wrappedStream != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Closing the previous stream"); - } - closeStream(requestedStreamLen); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("Requesting for " - + "targetPos=" + targetPos - + ", length=" + length - + ", requestedStreamLen=" + requestedStreamLen - + ", streamPosition=" + pos - + ", nextReadPosition=" + nextReadPos - ); + closeStream("reopen(" + reason + ")", requestedStreamLen); } + LOG.debug("reopen({}) for {} at targetPos={}, length={}," + + " requestedStreamLen={}, streamPosition={}, nextReadPosition={}", + uri, reason, targetPos, length, requestedStreamLen, pos, nextReadPos); + streamStatistics.streamOpened(); GetObjectRequest request = new GetObjectRequest(bucket, key) .withRange(targetPos, requestedStreamLen); wrappedStream = client.getObject(request).getObjectContent(); if (wrappedStream == null) { - throw new IOException("Null IO stream"); + throw new IOException("Null IO stream from reopen of (" + reason + ") " + + uri); } this.pos = targetPos; @@ -129,6 +162,20 @@ public class S3AInputStream extends FSInputStream { } /** + * Seek without raising any exception. This is for use in + * {@code finally} clauses + * @param positiveTargetPos a target position which must be positive. + */ + private void seekQuietly(long positiveTargetPos) { + try { + seek(positiveTargetPos); + } catch (IOException ioe) { + LOG.debug("Ignoring IOE on seek of {} to {}", + uri, positiveTargetPos, ioe); + } + } + + /** * Adjust the stream to a specific position. 
* * @param targetPos target seek position @@ -140,23 +187,50 @@ public class S3AInputStream extends FSInputStream { if (wrappedStream == null) { return; } - // compute how much more to skip long diff = targetPos - pos; - if (targetPos > pos) { - if ((diff + length) <= wrappedStream.available()) { - // already available in buffer - pos += wrappedStream.skip(diff); - if (pos != targetPos) { - throw new IOException("Failed to seek to " + targetPos - + ". Current position " + pos); + if (diff > 0) { + // forward seek -this is where data can be skipped + + int available = wrappedStream.available(); + // always seek at least as far as what is available + long forwardSeekRange = Math.max(readahead, available); + // work out how much is actually left in the stream + // then choose whichever comes first: the range or the EOF + long forwardSeekLimit = Math.min(remaining(), forwardSeekRange); + if (diff <= forwardSeekLimit) { + // the forward seek range is within the limits + LOG.debug("Forward seek on {}, of {} bytes", uri, diff); + streamStatistics.seekForwards(diff); + long skipped = wrappedStream.skip(diff); + if (skipped > 0) { + pos += skipped; + // as these bytes have been read, they are included in the counter + incrementBytesRead(diff); + } + + if (pos == targetPos) { + // all is well + return; + } else { + // log a warning; continue to attempt to re-open + LOG.warn("Failed to seek on {} to {}. Current position {}", + uri, targetPos, pos); } - return; } + } else if (diff < 0) { + // backwards seek + streamStatistics.seekBackwards(diff); + } else { + // targetPos == pos + // this should never happen as the caller filters it out. 
+ // Retained just in case + LOG.debug("Ignoring seek {} to {} as target position == current", + uri, targetPos); } // close the stream; if read the object will be opened at the new pos - closeStream(this.requestedStreamLen); + closeStream("seekInStream()", this.requestedStreamLen); pos = targetPos; } @@ -179,7 +253,19 @@ public class S3AInputStream extends FSInputStream { //re-open at specific location if needed if (wrappedStream == null) { - reopen(targetPos, len); + reopen("read from new offset", targetPos, len); + } + } + + /** + * Increment the bytes read counter if there is a stats instance + * and the number of bytes read is more than zero. + * @param bytesRead number of bytes read + */ + private void incrementBytesRead(long bytesRead) { + streamStatistics.bytesRead(bytesRead); + if (stats != null && bytesRead > 0) { + stats.incrementBytesRead(bytesRead); } } @@ -195,13 +281,11 @@ public class S3AInputStream extends FSInputStream { int byteRead; try { byteRead = wrappedStream.read(); - } catch (SocketTimeoutException | SocketException e) { - LOG.info("Got exception while trying to read from stream," - + " trying to recover " + e); - reopen(pos, 1); - byteRead = wrappedStream.read(); } catch (EOFException e) { return -1; + } catch (IOException e) { + onReadFailure(e, 1); + byteRead = wrappedStream.read(); } if (byteRead >= 0) { @@ -209,12 +293,36 @@ public class S3AInputStream extends FSInputStream { nextReadPos++; } - if (stats != null && byteRead >= 0) { - stats.incrementBytesRead(1); + if (byteRead >= 0) { + incrementBytesRead(1); } return byteRead; } + /** + * Handle an IOE on a read by attempting to re-open the stream. + * The filesystem's readException count will be incremented. + * @param ioe exception caught. + * @param length length of data being attempted to read + * @throws IOException any exception thrown on the re-open attempt. 
+ */ + private void onReadFailure(IOException ioe, int length) throws IOException { + LOG.info("Got exception while trying to read from stream {}" + + " trying to recover: "+ ioe, uri); + LOG.debug("While trying to read from stream {}", uri, ioe); + streamStatistics.readException(); + reopen("failure recovery", pos, length); + } + + /** + * {@inheritDoc} + * + * This updates the statistics on read operations started and whether + * or not the read operation "completed", that is: returned the exact + * number of bytes requested. + * @throws EOFException if there is no more data + * @throws IOException if there are other problems + */ @Override public synchronized int read(byte[] buf, int off, int len) throws IOException { @@ -230,61 +338,85 @@ public class S3AInputStream extends FSInputStream { } lazySeek(nextReadPos, len); + streamStatistics.readOperationStarted(nextReadPos, len); - int byteRead; + int bytesRead; try { - byteRead = wrappedStream.read(buf, off, len); - } catch (SocketTimeoutException | SocketException e) { - LOG.info("Got exception while trying to read from stream," - + " trying to recover " + e); - reopen(pos, len); - byteRead = wrappedStream.read(buf, off, len); - } - - if (byteRead > 0) { - pos += byteRead; - nextReadPos += byteRead; + bytesRead = wrappedStream.read(buf, off, len); + } catch (EOFException e) { + throw e; + } catch (IOException e) { + onReadFailure(e, len); + bytesRead = wrappedStream.read(buf, off, len); } - if (stats != null && byteRead > 0) { - stats.incrementBytesRead(byteRead); + if (bytesRead > 0) { + pos += bytesRead; + nextReadPos += bytesRead; } - - return byteRead; + incrementBytesRead(bytesRead); + streamStatistics.readOperationCompleted(len, bytesRead); + return bytesRead; } + /** + * Verify that the input stream is open. Non blocking; this gives + * the last state of the volatile {@link #closed} field. + * @throws IOException if the connection is closed. 
+ */ private void checkNotClosed() throws IOException { if (closed) { - throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); + throw new IOException(uri + ": " + FSExceptionMessages.STREAM_IS_CLOSED); } } + /** + * Close the stream. + * This triggers publishing of the stream statistics back to the filesystem + * statistics. + * This operation is synchronized, so that only one thread can attempt to + * close the connection; all later/blocked calls are no-ops. + * @throws IOException on any problem + */ @Override public synchronized void close() throws IOException { - super.close(); - closed = true; - closeStream(this.contentLength); + if (!closed) { + closed = true; + try { + // close or abort the stream + closeStream("close() operation", this.contentLength); + // this is actually a no-op + super.close(); + } finally { + // merge the statistics back into the FS statistics. + streamStatistics.close(); + } + } } /** * Close a stream: decide whether to abort or close, based on * the length of the stream and the current position. + * If a close() is attempted and fails, the operation escalates to + * an abort. * * This does not set the {@link #closed} flag. + * + * @param reason reason for stream being closed; used in messages * @param length length of the stream. - * @throws IOException */ - private void closeStream(long length) throws IOException { + private void closeStream(String reason, long length) { if (wrappedStream != null) { - String reason = null; boolean shouldAbort = length - pos > CLOSE_THRESHOLD; if (!shouldAbort) { try { - reason = "Closed stream"; + // clean close. 
This will read to the end of the stream, + // so, while cleaner, can be pathological on a multi-GB object wrappedStream.close(); + streamStatistics.streamClose(false); } catch (IOException e) { // exception escalates to an abort - LOG.debug("When closing stream", e); + LOG.debug("When closing {} stream for {}", uri, reason, e); shouldAbort = true; } } @@ -292,13 +424,12 @@ public class S3AInputStream extends FSInputStream { // Abort, rather than just close, the underlying stream. Otherwise, the // remaining object payload is read from S3 while closing the stream. wrappedStream.abort(); - reason = "Closed stream with abort"; - } - if (LOG.isDebugEnabled()) { - LOG.debug(reason + "; streamPos=" + pos - + ", nextReadPos=" + nextReadPos - + ", contentLength=" + length); + streamStatistics.streamClose(true); } + LOG.debug("Stream {} {}: {}; streamPos={}, nextReadPos={}," + + " length={}", + uri, (shouldAbort ? "aborted":"closed"), reason, pos, nextReadPos, + length); wrappedStream = null; } } @@ -307,19 +438,34 @@ public class S3AInputStream extends FSInputStream { public synchronized int available() throws IOException { checkNotClosed(); - long remaining = this.contentLength - this.pos; + long remaining = remaining(); if (remaining > Integer.MAX_VALUE) { return Integer.MAX_VALUE; } return (int)remaining; } + /** + * Bytes left in stream. + * @return how many bytes are left to read + */ + protected long remaining() { + return this.contentLength - this.pos; + } + @Override public boolean markSupported() { return false; } + /** + * String value includes statistics as well as stream state. 
+ * <b>Important: there are no guarantees as to the stability + * of this value.</b> + * @return a string value for printing in logs/diagnostics + */ @Override + @InterfaceStability.Unstable public String toString() { final StringBuilder sb = new StringBuilder( "S3AInputStream{"); @@ -327,6 +473,7 @@ public class S3AInputStream extends FSInputStream { sb.append(" pos=").append(pos); sb.append(" nextReadPos=").append(nextReadPos); sb.append(" contentLength=").append(contentLength); + sb.append(" ").append(streamStatistics.toString()); sb.append('}'); return sb.toString(); } @@ -348,6 +495,7 @@ public class S3AInputStream extends FSInputStream { throws IOException { checkNotClosed(); validatePositionedReadArgs(position, buffer, offset, length); + streamStatistics.readFullyOperationStarted(position, length); if (length == 0) { return; } @@ -363,10 +511,38 @@ public class S3AInputStream extends FSInputStream { } nread += nbytes; } - } finally { - seek(oldPos); + seekQuietly(oldPos); } } } + + /** + * Access the input stream statistics. + * This is for internal testing and may be removed without warning. + * @return the statistics for this input stream + */ + @InterfaceAudience.Private + @InterfaceStability.Unstable + public S3AInstrumentation.InputStreamStatistics getS3AStreamStatistics() { + return streamStatistics; + } + + @Override + public void setReadahead(Long readahead) { + if (readahead == null) { + this.readahead = Constants.DEFAULT_READAHEAD_RANGE; + } else { + Preconditions.checkArgument(readahead >= 0, "Negative readahead value"); + this.readahead = readahead; + } + } + + /** + * Get the current readahead value. 
+ * @return a non-negative readahead value + */ + public long getReadahead() { + return readahead; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d74a580/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java new file mode 100644 index 0000000..285f228 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java @@ -0,0 +1,457 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.metrics2.MetricStringBuilder; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.Interns; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.metrics2.lib.MutableGaugeLong; + +import java.net.URI; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +/** + * Instrumentation of S3a. + * Derived from the {@code AzureFileSystemInstrumentation} + */ +@Metrics(about = "Metrics for S3a", context = "S3AFileSystem") +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class S3AInstrumentation { + public static final String CONTEXT = "S3AFileSystem"; + + public static final String STREAM_OPENED = "streamOpened"; + public static final String STREAM_CLOSE_OPERATIONS = "streamCloseOperations"; + public static final String STREAM_CLOSED = "streamClosed"; + public static final String STREAM_ABORTED = "streamAborted"; + public static final String STREAM_READ_EXCEPTIONS = "streamReadExceptions"; + public static final String STREAM_SEEK_OPERATIONS = "streamSeekOperations"; + public static final String STREAM_FORWARD_SEEK_OPERATIONS + = "streamForwardSeekOperations"; + public static final String STREAM_BACKWARD_SEEK_OPERATIONS + = "streamBackwardSeekOperations"; + public static final String STREAM_SEEK_BYTES_SKIPPED = + "streamBytesSkippedOnSeek"; + public static final String STREAM_SEEK_BYTES_BACKWARDS = + "streamBytesBackwardsOnSeek"; + public static final String STREAM_SEEK_BYTES_READ = "streamBytesRead"; + public static final String STREAM_READ_OPERATIONS = "streamReadOperations"; + public static final String STREAM_READ_FULLY_OPERATIONS + = "streamReadFullyOperations"; + public static final String 
STREAM_READ_OPERATIONS_INCOMPLETE + = "streamReadOperationsIncomplete"; + public static final String FILES_CREATED = "files_created"; + public static final String FILES_COPIED = "files_copied"; + public static final String FILES_COPIED_BYTES = "files_copied_bytes"; + public static final String FILES_DELETED = "files_deleted"; + public static final String DIRECTORIES_CREATED = "directories_created"; + public static final String DIRECTORIES_DELETED = "directories_deleted"; + public static final String IGNORED_ERRORS = "ignored_errors"; + private final MetricsRegistry registry = + new MetricsRegistry("S3AFileSystem").setContext(CONTEXT); + private final MutableCounterLong streamOpenOperations; + private final MutableCounterLong streamCloseOperations; + private final MutableCounterLong streamClosed; + private final MutableCounterLong streamAborted; + private final MutableCounterLong streamSeekOperations; + private final MutableCounterLong streamReadExceptions; + private final MutableCounterLong streamForwardSeekOperations; + private final MutableCounterLong streamBackwardSeekOperations; + private final MutableCounterLong streamBytesSkippedOnSeek; + private final MutableCounterLong streamBytesBackwardsOnSeek; + private final MutableCounterLong streamBytesRead; + private final MutableCounterLong streamReadOperations; + private final MutableCounterLong streamReadFullyOperations; + private final MutableCounterLong streamReadsIncomplete; + private final MutableCounterLong ignoredErrors; + + private final MutableCounterLong numberOfFilesCreated; + private final MutableCounterLong numberOfFilesCopied; + private final MutableCounterLong bytesOfFilesCopied; + private final MutableCounterLong numberOfFilesDeleted; + private final MutableCounterLong numberOfDirectoriesCreated; + private final MutableCounterLong numberOfDirectoriesDeleted; + private final Map<String, MutableCounterLong> streamMetrics = new HashMap<>(); + + public S3AInstrumentation(URI name) { + UUID 
fileSystemInstanceId = UUID.randomUUID(); + registry.tag("FileSystemId", + "A unique identifier for the FS ", + fileSystemInstanceId.toString() + "-" + name.getHost()); + registry.tag("fsURI", + "URI of this filesystem", + name.toString()); + streamOpenOperations = streamCounter(STREAM_OPENED, + "Total count of times an input stream to object store was opened"); + streamCloseOperations = streamCounter(STREAM_CLOSE_OPERATIONS, + "Total count of times an attempt to close a data stream was made"); + streamClosed = streamCounter(STREAM_CLOSED, + "Count of times the TCP stream was closed"); + streamAborted = streamCounter(STREAM_ABORTED, + "Count of times the TCP stream was aborted"); + streamSeekOperations = streamCounter(STREAM_SEEK_OPERATIONS, + "Number of seek operations invoked on input streams"); + streamReadExceptions = streamCounter(STREAM_READ_EXCEPTIONS, + "Number of read exceptions caught and attempted to recovered from"); + streamForwardSeekOperations = streamCounter(STREAM_FORWARD_SEEK_OPERATIONS, + "Number of executed seek operations which went forward in a stream"); + streamBackwardSeekOperations = streamCounter( + STREAM_BACKWARD_SEEK_OPERATIONS, + "Number of executed seek operations which went backwards in a stream"); + streamBytesSkippedOnSeek = streamCounter(STREAM_SEEK_BYTES_SKIPPED, + "Count of bytes skipped during forward seek operations"); + streamBytesBackwardsOnSeek = streamCounter(STREAM_SEEK_BYTES_BACKWARDS, + "Count of bytes moved backwards during seek operations"); + streamBytesRead = streamCounter(STREAM_SEEK_BYTES_READ, + "Count of bytes read during seek() in stream operations"); + streamReadOperations = streamCounter(STREAM_READ_OPERATIONS, + "Count of read() operations in streams"); + streamReadFullyOperations = streamCounter(STREAM_READ_FULLY_OPERATIONS, + "Count of readFully() operations in streams"); + streamReadsIncomplete = streamCounter(STREAM_READ_OPERATIONS_INCOMPLETE, + "Count of incomplete read() operations in streams"); + + 
numberOfFilesCreated = counter(FILES_CREATED, + "Total number of files created through the object store."); + numberOfFilesCopied = counter(FILES_COPIED, + "Total number of files copied within the object store."); + bytesOfFilesCopied = counter(FILES_COPIED_BYTES, + "Total number of bytes copied within the object store."); + numberOfFilesDeleted = counter(FILES_DELETED, + "Total number of files deleted through from the object store."); + numberOfDirectoriesCreated = counter(DIRECTORIES_CREATED, + "Total number of directories created through the object store."); + numberOfDirectoriesDeleted = counter(DIRECTORIES_DELETED, + "Total number of directories deleted through the object store."); + ignoredErrors = counter(IGNORED_ERRORS, + "Total number of errors caught and ingored."); + } + + /** + * Create a counter in the registry. + * @param name counter name + * @param desc counter description + * @return a new counter + */ + protected final MutableCounterLong counter(String name, String desc) { + return registry.newCounter(name, desc, 0L); + } + + /** + * Create a counter in the stream map: these are unregistered in the public + * metrics. + * @param name counter name + * @param desc counter description + * @return a new counter + */ + protected final MutableCounterLong streamCounter(String name, String desc) { + MutableCounterLong counter = new MutableCounterLong( + Interns.info(name, desc), 0L); + streamMetrics.put(name, counter); + return counter; + } + + /** + * Create a gauge in the registry. + * @param name name gauge name + * @param desc description + * @return the gauge + */ + protected final MutableGaugeLong gauge(String name, String desc) { + return registry.newGauge(name, desc, 0L); + } + + /** + * Get the metrics registry. + * @return the registry + */ + public MetricsRegistry getRegistry() { + return registry; + } + + /** + * Dump all the metrics to a string. 
+ * @param prefix prefix before every entry + * @param separator separator between name and value + * @param suffix suffix + * @param all get all the metrics even if the values are not changed. + * @return a string dump of the metrics + */ + public String dump(String prefix, + String separator, + String suffix, + boolean all) { + MetricStringBuilder metricBuilder = new MetricStringBuilder(null, + prefix, + separator, suffix); + registry.snapshot(metricBuilder, all); + for (Map.Entry<String, MutableCounterLong> entry: + streamMetrics.entrySet()) { + metricBuilder.tuple(entry.getKey(), + Long.toString(entry.getValue().value())); + } + return metricBuilder.toString(); + } + + /** + * Indicate that S3A created a file. + */ + public void fileCreated() { + numberOfFilesCreated.incr(); + } + + /** + * Indicate that S3A deleted one or more file.s + * @param count number of files. + */ + public void fileDeleted(int count) { + numberOfFilesDeleted.incr(count); + } + + /** + * Indicate that S3A created a directory. + */ + public void directoryCreated() { + numberOfDirectoriesCreated.incr(); + } + + /** + * Indicate that S3A just deleted a directory. + */ + public void directoryDeleted() { + numberOfDirectoriesDeleted.incr(); + } + + /** + * Indicate that S3A copied some files within the store. + * + * @param files number of files + * @param size total size in bytes + */ + public void filesCopied(int files, long size) { + numberOfFilesCopied.incr(files); + bytesOfFilesCopied.incr(size); + } + + /** + * Note that an error was ignored. + */ + public void errorIgnored() { + ignoredErrors.incr(); + } + + /** + * Create a stream input statistics instance. + * @return the new instance + */ + InputStreamStatistics newInputStreamStatistics() { + return new InputStreamStatistics(); + } + + /** + * Merge in the statistics of a single input stream into + * the filesystem-wide statistics. 
+ * @param statistics stream statistics + */ + private void mergeInputStreamStatistics(InputStreamStatistics statistics) { + streamOpenOperations.incr(statistics.openOperations); + streamCloseOperations.incr(statistics.closeOperations); + streamClosed.incr(statistics.closed); + streamAborted.incr(statistics.aborted); + streamSeekOperations.incr(statistics.seekOperations); + streamReadExceptions.incr(statistics.readExceptions); + streamForwardSeekOperations.incr(statistics.forwardSeekOperations); + streamBytesSkippedOnSeek.incr(statistics.bytesSkippedOnSeek); + streamBackwardSeekOperations.incr(statistics.backwardSeekOperations); + streamBytesBackwardsOnSeek.incr(statistics.bytesBackwardsOnSeek); + streamBytesRead.incr(statistics.bytesRead); + streamReadOperations.incr(statistics.readOperations); + streamReadFullyOperations.incr(statistics.readFullyOperations); + streamReadsIncomplete.incr(statistics.readsIncomplete); + } + + /** + * Statistics updated by an input stream during its actual operation. + * These counters not thread-safe and are for use in a single instance + * of a stream. + */ + @InterfaceAudience.Private + @InterfaceStability.Unstable + public final class InputStreamStatistics implements AutoCloseable { + public long openOperations; + public long closeOperations; + public long closed; + public long aborted; + public long seekOperations; + public long readExceptions; + public long forwardSeekOperations; + public long backwardSeekOperations; + public long bytesRead; + public long bytesSkippedOnSeek; + public long bytesBackwardsOnSeek; + public long readOperations; + public long readFullyOperations; + public long readsIncomplete; + + private InputStreamStatistics() { + } + + /** + * Seek backwards, incrementing the seek and backward seek counters. + * @param negativeOffset how far was the seek? + * This is expected to be negative. 
+ */ + public void seekBackwards(long negativeOffset) { + seekOperations++; + backwardSeekOperations++; + bytesBackwardsOnSeek -= negativeOffset; + } + + /** + * Record a forward seek, adding a seek operation, a forward + * seek operation, and any bytes skipped. + * @param skipped number of bytes skipped by reading from the stream. + * If the seek was implemented by a close + reopen, set this to zero. + */ + public void seekForwards(long skipped) { + seekOperations++; + forwardSeekOperations++; + if (skipped > 0) { + bytesSkippedOnSeek += skipped; + } + } + + /** + * The inner stream was opened. + */ + public void streamOpened() { + openOperations++; + } + + /** + * The inner stream was closed. + * @param abortedConnection flag to indicate the stream was aborted, + * rather than closed cleanly + */ + public void streamClose(boolean abortedConnection) { + closeOperations++; + if (abortedConnection) { + this.aborted++; + } else { + closed++; + } + } + + /** + * An ignored stream read exception was received. + */ + public void readException() { + readExceptions++; + } + + /** + * Increment the bytes read counter by the number of bytes; + * no-op if the argument is negative. + * @param bytes number of bytes read + */ + public void bytesRead(long bytes) { + if (bytes > 0) { + bytesRead += bytes; + } + } + + /** + * A {@code read(byte[] buf, int off, int len)} operation has started. + * @param pos starting position of the read + * @param len length of bytes to read + */ + public void readOperationStarted(long pos, long len) { + readOperations++; + } + + /** + * A {@code PositionedRead.read(position, buffer, offset, length)} + * operation has just started. + * @param pos starting position of the read + * @param len length of bytes to read + */ + public void readFullyOperationStarted(long pos, long len) { + readFullyOperations++; + } + + /** + * A read operation has completed. 
+ * @param requested number of requested bytes + * @param actual the actual number of bytes + */ + public void readOperationCompleted(int requested, int actual) { + if (requested > actual) { + readsIncomplete++; + } + } + + /** + * Close triggers the merge of statistics into the filesystem's + * instrumentation instance. + */ + @Override + public void close() { + mergeInputStreamStatistics(this); + } + + /** + * String operator describes all the current statistics. + * <b>Important: there are no guarantees as to the stability + * of this value.</b> + * @return the current values of the stream statistics. + */ + @Override + @InterfaceStability.Unstable + public String toString() { + final StringBuilder sb = new StringBuilder( + "StreamStatistics{"); + sb.append("OpenOperations=").append(openOperations); + sb.append(", CloseOperations=").append(closeOperations); + sb.append(", Closed=").append(closed); + sb.append(", Aborted=").append(aborted); + sb.append(", SeekOperations=").append(seekOperations); + sb.append(", ReadExceptions=").append(readExceptions); + sb.append(", ForwardSeekOperations=") + .append(forwardSeekOperations); + sb.append(", BackwardSeekOperations=") + .append(backwardSeekOperations); + sb.append(", BytesSkippedOnSeek=").append(bytesSkippedOnSeek); + sb.append(", BytesBackwardsOnSeek=").append(bytesBackwardsOnSeek); + sb.append(", BytesRead=").append(bytesRead); + sb.append(", BytesRead excluding skipped=") + .append(bytesRead - bytesSkippedOnSeek); + sb.append(", ReadOperations=").append(readOperations); + sb.append(", ReadFullyOperations=").append(readFullyOperations); + sb.append(", ReadsIncomplete=").append(readsIncomplete); + sb.append('}'); + return sb.toString(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d74a580/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java ---------------------------------------------------------------------- diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java index 3e079f2..f9ff701 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java @@ -21,14 +21,14 @@ package org.apache.hadoop.fs.s3a; import com.amazonaws.event.ProgressEvent; import com.amazonaws.event.ProgressEventType; import com.amazonaws.event.ProgressListener; -import com.amazonaws.services.s3.AmazonS3Client; import com.amazonaws.services.s3.model.CannedAccessControlList; import com.amazonaws.services.s3.model.ObjectMetadata; import com.amazonaws.services.s3.model.PutObjectRequest; import com.amazonaws.services.s3.transfer.TransferManager; -import com.amazonaws.services.s3.transfer.TransferManagerConfiguration; import com.amazonaws.services.s3.transfer.Upload; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalDirAllocator; @@ -46,6 +46,11 @@ import static com.amazonaws.event.ProgressEventType.TRANSFER_COMPLETED_EVENT; import static com.amazonaws.event.ProgressEventType.TRANSFER_PART_STARTED_EVENT; import static org.apache.hadoop.fs.s3a.Constants.*; +/** + * Output stream to save data to S3. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Evolving public class S3AOutputStream extends OutputStream { private OutputStream backupStream; private File backupFile; @@ -65,9 +70,9 @@ public class S3AOutputStream extends OutputStream { public static final Logger LOG = S3AFileSystem.LOG; public S3AOutputStream(Configuration conf, TransferManager transfers, - S3AFileSystem fs, String bucket, String key, Progressable progress, - CannedAccessControlList cannedACL, FileSystem.Statistics statistics, - String serverSideEncryptionAlgorithm) + S3AFileSystem fs, String bucket, String key, Progressable progress, + CannedAccessControlList cannedACL, FileSystem.Statistics statistics, + String serverSideEncryptionAlgorithm) throws IOException { this.bucket = bucket; this.key = key; @@ -78,9 +83,8 @@ public class S3AOutputStream extends OutputStream { this.statistics = statistics; this.serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm; - partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE); - partSizeThreshold = conf.getLong(MIN_MULTIPART_THRESHOLD, - DEFAULT_MIN_MULTIPART_THRESHOLD); + partSize = fs.getPartitionSize(); + partSizeThreshold = fs.getMultiPartThreshold(); if (conf.get(BUFFER_DIR, null) != null) { lDirAlloc = new LocalDirAllocator(BUFFER_DIR); @@ -91,10 +95,8 @@ public class S3AOutputStream extends OutputStream { backupFile = lDirAlloc.createTmpFileForWrite("output-", LocalDirAllocator.SIZE_UNKNOWN, conf); closed = false; - if (LOG.isDebugEnabled()) { - LOG.debug("OutputStream for key '" + key + "' writing to tempfile: " + - this.backupFile); - } + LOG.debug("OutputStream for key '{}' writing to tempfile: {}", + key, backupFile); this.backupStream = new BufferedOutputStream(new FileOutputStream(backupFile)); } @@ -111,10 +113,9 @@ public class S3AOutputStream extends OutputStream { } backupStream.close(); - if (LOG.isDebugEnabled()) { - LOG.debug("OutputStream for key '" + key + "' closed. 
Now beginning upload"); - LOG.debug("Minimum upload part size: " + partSize + " threshold " + partSizeThreshold); - } + LOG.debug("OutputStream for key '{}' closed. Now beginning upload", key); + LOG.debug("Minimum upload part size: {} threshold {}" , partSize, + partSizeThreshold); try { @@ -129,7 +130,7 @@ public class S3AOutputStream extends OutputStream { Upload upload = transfers.upload(putObjectRequest); ProgressableProgressListener listener = - new ProgressableProgressListener(upload, progress, statistics); + new ProgressableProgressListener(upload, progress, statistics); upload.addProgressListener(listener); upload.waitForUploadResult(); @@ -168,6 +169,9 @@ public class S3AOutputStream extends OutputStream { backupStream.write(b, off, len); } + /** + * Listener to progress from AWS regarding transfers. + */ public static class ProgressableProgressListener implements ProgressListener { private Progressable progress; private FileSystem.Statistics statistics; @@ -175,7 +179,7 @@ public class S3AOutputStream extends OutputStream { private Upload upload; public ProgressableProgressListener(Upload upload, Progressable progress, - FileSystem.Statistics statistics) { + FileSystem.Statistics statistics) { this.upload = upload; this.progress = progress; this.statistics = statistics; http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d74a580/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md index 002338d..a46e084 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md @@ -437,6 +437,14 @@ this capability. 
<description>The implementation class of the S3A AbstractFileSystem.</description> </property> + <property> + <name>fs.s3a.readahead.range</name> + <value>65536</value> + <description>Bytes to read ahead during a seek() before closing and + re-opening the S3 HTTP connection. This option will be overridden if + any call to setReadahead() is made to an open stream.</description> + </property> + ### S3AFastOutputStream **Warning: NEW in hadoop 2.7. UNSTABLE, EXPERIMENTAL: use at own risk** @@ -647,7 +655,7 @@ Example: <configuration> <include xmlns="http://www.w3.org/2001/XInclude" - href="auth-keys.xml"/> + href="/home/testuser/.ssh/auth-keys.xml"/> <property> <name>fs.contract.test.fs.s3</name> @@ -667,7 +675,61 @@ Example: </configuration> -This example pulls in the `auth-keys.xml` file for the credentials. +This example pulls in the `~/.ssh/auth-keys.xml` file for the credentials. This provides one single place to keep the keys up to date — and means that the file `contract-test-options.xml` does not contain any -secret credentials itself. +secret credentials itself. As the auth keys XML file is kept out of the +source code tree, it is not going to get accidentally committed. + +### Running Performance Tests against non-AWS storage infrastructures + + +#### CSV Data source + +The `TestS3AInputStreamPerformance` tests require read access to a multi-MB +text file. The default file for these tests is one published by Amazon, +[s3a://landsat-pds/scene_list.gz](http://landsat-pds.s3.amazonaws.com/scene_list.gz). +This is a gzipped CSV index of other files which Amazon serves for open use. + +The path to this object is set in the option `fs.s3a.scale.test.csvfile`: + + <property> + <name>fs.s3a.scale.test.csvfile</name> + <value>s3a://landsat-pds/scene_list.gz</value> + </property> + +1. If the option is not overridden, the default value is used. This +is hosted in Amazon's US-east datacenter. +1.
If the property is empty (or only whitespace), tests which require it will be skipped. +1. If the data cannot be read for any reason, then the test will fail. +1. If the property is set to a different path, then that data must be readable +and "sufficiently" large. + +To test on different S3 endpoints, or on alternate infrastructures supporting +the same APIs, either set the option `fs.s3a.scale.test.csvfile` to " " so that +the tests which need it are skipped, or upload an object of at least 10MB to the +object store and set the `fs.s3a.scale.test.csvfile` option to its path. + + <property> + <name>fs.s3a.scale.test.csvfile</name> + <value> </value> + </property> + + +#### Scale test operation count + +Some scale tests perform multiple operations (such as creating many directories). + +The exact number of operations to perform is configurable in the option +`scale.test.operation.count`: + + <property> + <name>scale.test.operation.count</name> + <value>10</value> + </property> + +Larger values generate more load, and are recommended when testing locally, +or in batch runs. + +Smaller values should result in faster test runs, especially when the object +store is a long way away.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d74a580/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java index e44a90e..42c552a 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java @@ -19,18 +19,25 @@ package org.apache.hadoop.fs.s3a.scale; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.S3AInputStream; +import org.apache.hadoop.fs.s3a.S3AInstrumentation; import org.apache.hadoop.fs.s3a.S3ATestUtils; import org.junit.After; +import org.junit.Assert; import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.rules.TestName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.net.URI; +import java.io.InputStream; +import java.util.Locale; import static org.junit.Assume.assumeTrue; @@ -38,30 +45,61 @@ import static org.junit.Assume.assumeTrue; * Base class for scale tests; here is where the common scale configuration * keys are defined. 
*/ -public class S3AScaleTestBase { +public class S3AScaleTestBase extends Assert { public static final String SCALE_TEST = "scale.test."; + public static final String S3A_SCALE_TEST = "fs.s3a.scale.test."; + + @Rule + public TestName methodName = new TestName(); /* current test method name; used by describe() */ + + @BeforeClass + public static void nameThread() { + Thread.currentThread().setName("JUnit"); /* renames the thread running the class setup */ + } + /** - * The number of operations to perform: {@value} + * The number of operations to perform: {@value}. */ public static final String KEY_OPERATION_COUNT = SCALE_TEST + "operation.count"; /** - * The default number of operations to perform: {@value} + * Key for the buffer size used when opening test files: {@value}. + */ + public static final String KEY_READ_BUFFER_SIZE = + S3A_SCALE_TEST + "read.buffer.size"; + + public static final int DEFAULT_READ_BUFFER_SIZE = 16384; /* default for KEY_READ_BUFFER_SIZE */ + + /** + * Key for a multi MB test file: {@value}. + */ + public static final String KEY_CSVTEST_FILE = + S3A_SCALE_TEST + "csvfile"; + + /** + * Default path for the multi MB test file: {@value}. + */ + public static final String DEFAULT_CSVTEST_FILE + = "s3a://landsat-pds/scene_list.gz"; + + /** + * The default number of operations to perform: {@value}. */ public static final long DEFAULT_OPERATION_COUNT = 2005; protected S3AFileSystem fs; - private static final Logger LOG = + + protected static final Logger LOG = LoggerFactory.getLogger(S3AScaleTestBase.class); private Configuration conf; /** * Configuration generator. May be overridden to inject - * some custom options + * some custom options. * @return a configuration with which to create FS instances */ protected Configuration createConfiguration() { @@ -69,7 +107,7 @@ public class S3AScaleTestBase { } /** - * Get the configuration used to set up the FS + * Get the configuration used to set up the FS.
* @return the configuration */ public Configuration getConf() { @@ -79,7 +117,7 @@ public class S3AScaleTestBase { @Before public void setUp() throws Exception { conf = createConfiguration(); - LOG.info("Scale test operation count = {}", getOperationCount()); + LOG.debug("Scale test operation count = {}", getOperationCount()); fs = S3ATestUtils.createTestFileSystem(conf); } @@ -95,4 +133,139 @@ public class S3AScaleTestBase { protected long getOperationCount() { return getConf().getLong(KEY_OPERATION_COUNT, DEFAULT_OPERATION_COUNT); } + + /** + * Describe a test in the logs, prefixed by the test method name. + * @param text text to print; a {@link String#format} pattern + * @param args arguments to format in the printing + */ + protected void describe(String text, Object... args) { + LOG.info("\n\n{}: {}\n", + methodName.getMethodName(), + String.format(text, args)); + } + + /** + * Get the input stream statistics of an input stream. + * Fails the test if the inner stream is not an S3A input stream. + * @param in wrapper + * @return the statistics for the inner stream + */ + protected S3AInstrumentation.InputStreamStatistics getInputStreamStatistics( + FSDataInputStream in) { + InputStream inner = in.getWrappedStream(); + if (inner instanceof S3AInputStream) { + S3AInputStream s3a = (S3AInputStream) inner; + return s3a.getS3AStreamStatistics(); + } else { + Assert.fail("Not an S3AInputStream: " + inner); + // never reached + return null; + } + } + + /** + * Make times more readable, by adding a "," every three digits. + * @param nanos nanos or other large number + * @return a string for logging + */ + protected static String toHuman(long nanos) { + return String.format(Locale.ENGLISH, "%,d", nanos); + } + + /** + * Log the bandwidth of a timer as inferred from the number of + * bytes processed.
+ * @param timer timer + * @param bytes bytes processed in the time period + */ + protected void bandwidth(NanoTimer timer, long bytes) { + LOG.info("Bandwidth = {} MB/S", + timer.bandwidthDescription(bytes)); + } + + /** + * Work out the bandwidth in MB/s, where 1 MB = 10^6 bytes. + * @param bytes bytes + * @param durationNS duration in nanos; 0 gives an infinite or NaN result + * @return the number of megabytes/second of the recorded operation + */ + public static double bandwidthMBs(long bytes, long durationNS) { + return (bytes * 1000.0) / durationNS; /* 1 byte/ns == 1000 MB/s */ + } + + /** + * A simple class for timing operations in nanoseconds, and for + * printing some useful results in the process. + */ + protected static class NanoTimer { + final long startTime; + long endTime; + + public NanoTimer() { + startTime = now(); + } + + /** + * End the operation. + * @return the duration of the operation + */ + public long end() { + endTime = now(); + return duration(); + } + + /** + * End the operation; log the duration. + * @param format message + * @param args any arguments + * @return the duration of the operation + */ + public long end(String format, Object... args) { + long d = end(); + LOG.info("Duration of {}: {} nS", + String.format(format, args), toHuman(d)); + return d; + } + + long now() { + return System.nanoTime(); + } + + long duration() { /* only meaningful once end() has been called */ + return endTime - startTime; + } + + double bandwidth(long bytes) { /* MB/s */ + return S3AScaleTestBase.bandwidthMBs(bytes, duration()); + } + + /** + * Bandwidth as bytes per second. + * @param bytes bytes in + * @return the number of bytes per second this operation timed.
+ */ + double bandwidthBytes(long bytes) { + return (bytes * 1.0e9) / duration(); /* duration() is in nanoseconds */ + } + + /** + * How many nanoseconds per byte. + * @param bytes bytes processed in this time period + * @return the nanoseconds it took each byte to be processed, or 0 if no bytes were processed + */ + long nanosPerByte(long bytes) { + return bytes > 0 ? duration() / bytes : 0; + } + + /** + * Get a description of the bandwidth, even down to fractions of + * a MB. + * @param bytes bytes processed + * @return bandwidth in MB/s, formatted with an English locale + */ + String bandwidthDescription(long bytes) { + return String.format(Locale.ENGLISH, "%,.6f", bandwidth(bytes)); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d74a580/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADeleteManyFiles.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADeleteManyFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADeleteManyFiles.java index d521ba8..e3c6fa0 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADeleteManyFiles.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADeleteManyFiles.java @@ -40,7 +40,6 @@ public class TestS3ADeleteManyFiles extends S3AScaleTestBase { private static final Logger LOG = LoggerFactory.getLogger(TestS3ADeleteManyFiles.class); - @Rule public Timeout testTimeout = new Timeout(30 * 60 * 1000); http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d74a580/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3AInputStreamPerformance.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3AInputStreamPerformance.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3AInputStreamPerformance.java new file mode 100644 index 0000000..0c8b273 --- /dev/null +++
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3AInputStreamPerformance.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.scale; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.Constants; +import org.apache.hadoop.fs.s3a.S3AFileStatus; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.S3AInstrumentation; +import org.apache.hadoop.io.IOUtils; +import org.junit.After; +import org.junit.Assume; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Look at the performance of S3A input stream operations. + */ +public class TestS3AInputStreamPerformance extends S3AScaleTestBase { + private static final Logger LOG = LoggerFactory.getLogger( + TestS3AInputStreamPerformance.class); + + private S3AFileSystem s3aFS; + private Path testData; + private S3AFileStatus testDataStatus; + private FSDataInputStream in; + private S3AInstrumentation.InputStreamStatistics
streamStatistics; + public static final int BLOCK_SIZE = 32 * 1024; + public static final int BIG_BLOCK_SIZE = 256 * 1024; + + /** Tests only run if there is a named test file that can be read. */ + private boolean testDataAvailable = true; + private String assumptionMessage = "test file"; + + /** + * Open the FS and stat the test data; the streams are opened by each test. + * @throws IOException failure to create the FS or stat the test file + */ + @Before + public void openFS() throws IOException { + Configuration conf = getConf(); + String testFile = conf.getTrimmed(KEY_CSVTEST_FILE, DEFAULT_CSVTEST_FILE); + if (testFile.isEmpty()) { + assumptionMessage = "Empty test property: " + KEY_CSVTEST_FILE; + testDataAvailable = false; + } else { + testData = new Path(testFile); + s3aFS = (S3AFileSystem) FileSystem.newInstance(testData.toUri(), conf); + try { + testDataStatus = s3aFS.getFileStatus(testData); + } catch (IOException e) { + LOG.warn("Failed to read file {} specified in {}", + testFile, KEY_CSVTEST_FILE, e); + throw e; + } + } + } + + /** + * Cleanup: close the stream, close the FS.
+ */ + @After + public void cleanup() { + IOUtils.closeStream(in); + IOUtils.closeStream(s3aFS); + } + + /** + * Declare that the test requires the CSV test dataset; skip it if absent. + */ + private void requireCSVTestData() { + Assume.assumeTrue(assumptionMessage, testDataAvailable); + } + + /** + * Open the test file with the read buffer specified in the setting + * {@link #KEY_READ_BUFFER_SIZE}. + * @return the stream, wrapping an S3a one + * @throws IOException failure to open the file + */ + FSDataInputStream openTestFile() throws IOException { + int bufferSize = getConf().getInt(KEY_READ_BUFFER_SIZE, + DEFAULT_READ_BUFFER_SIZE); + FSDataInputStream stream = s3aFS.open(testData, bufferSize); + streamStatistics = getInputStreamStatistics(stream); + return stream; + } + + /** + * Assert that the stream was only ever opened once. + */ + protected void assertStreamOpenedExactlyOnce() { + assertOpenOperationCount(1); + } + + /** + * Assert the number of times the stream has been opened. + * @param expected the expected number + */ + private void assertOpenOperationCount(int expected) { + assertEquals("open operations in " + streamStatistics, + expected, streamStatistics.openOperations); + } + + /** + * Log how long an IOP took, by dividing the total time by the + * count of operations, printing in a human-readable form. + * @param timer timing data + * @param count IOP count; values below 1 are treated as 1.
+ */ + protected void logTimePerIOP(NanoTimer timer, long count) { + LOG.info("Time per IOP: {} nS", toHuman(timer.duration() / Math.max(1L, count))); + } + + @Test + public void testTimeToOpenAndReadWholeFileByByte() throws Throwable { + requireCSVTestData(); + describe("Open the test file %s and read it byte by byte", testData); + long len = testDataStatus.getLen(); + NanoTimer timeOpen = new NanoTimer(); + in = openTestFile(); + timeOpen.end("Open stream"); + NanoTimer readTimer = new NanoTimer(); + long count = 0; + while (in.read() >= 0) { + count++; + } + readTimer.end("Time to read %d bytes", count); + bandwidth(readTimer, count); + assertEquals("Not enough bytes were read", len, count); + long nanosPerByte = readTimer.nanosPerByte(count); + LOG.info("An open() call has the equivalent duration of reading {} bytes", + toHuman(timeOpen.duration() / Math.max(1L, nanosPerByte))); /* nanosPerByte may be 0 */ + } + + @Test + public void testTimeToOpenAndReadWholeFileBlocks() throws Throwable { + requireCSVTestData(); + describe("Open the test file %s and read it in blocks of size %d", + testData, BLOCK_SIZE); + long len = testDataStatus.getLen(); + in = openTestFile(); + byte[] block = new byte[BLOCK_SIZE]; + NanoTimer timer2 = new NanoTimer(); + long count = 0; + // implicitly rounding down here + long blockCount = len / BLOCK_SIZE; + for (long i = 0; i < blockCount; i++) { + int offset = 0; + int remaining = BLOCK_SIZE; + NanoTimer blockTimer = new NanoTimer(); + int reads = 0; + while (remaining > 0) { + int bytesRead = in.read(block, offset, remaining); + reads++; + if (bytesRead < 0) { /* read() returns -1 at end of stream */ + break; + } + remaining -= bytesRead; + offset += bytesRead; + count += bytesRead; + } + blockTimer.end("Reading block %d in %d reads", i, reads); + } + timer2.end("Time to read %d bytes in %d blocks", count, blockCount); + bandwidth(timer2, count); + LOG.info("{}", streamStatistics); + } + + @Test + public void testLazySeekEnabled() throws Throwable { + requireCSVTestData(); + describe("Verify that seeks do not trigger any IO");
+ long len = testDataStatus.getLen(); + in = openTestFile(); + NanoTimer timer = new NanoTimer(); + long blockCount = len / BLOCK_SIZE; + for (long i = 0; i < blockCount; i++) { + in.seek(in.getPos() + BLOCK_SIZE - 1); + } + in.seek(0); + blockCount++; + timer.end("Time to execute %d seeks", blockCount); + logTimePerIOP(timer, blockCount); + LOG.info("{}", streamStatistics); + assertOpenOperationCount(0); + assertEquals("bytes read", 0, streamStatistics.bytesRead); + } + + @Test + public void testReadAheadDefault() throws Throwable { + requireCSVTestData(); + describe("Verify that a series of forward skips within the readahead" + + " range do not close and reopen the stream"); + executeSeekReadSequence(BLOCK_SIZE, Constants.DEFAULT_READAHEAD_RANGE); + assertStreamOpenedExactlyOnce(); + } + + @Test + public void testReadaheadOutOfRange() throws Throwable { + requireCSVTestData(); + try { + in = openTestFile(); + in.setReadahead(-1L); + fail("Stream should have rejected the request " + in); + } catch (IllegalArgumentException e) { + // expected + } + + } + + @Test + public void testReadBigBlocksAvailableReadahead() throws Throwable { + requireCSVTestData(); + describe("set readahead to available bytes only"); + executeSeekReadSequence(BIG_BLOCK_SIZE, 0); + // expect that the stream will have had lots of opens + assertTrue("not enough open operations in " + streamStatistics, + streamStatistics.openOperations > 1); + } + + @Test + public void testReadBigBlocksBigReadahead() throws Throwable { + requireCSVTestData(); + describe("Read big blocks with a big readahead"); + executeSeekReadSequence(BIG_BLOCK_SIZE, BIG_BLOCK_SIZE * 2); + assertStreamOpenedExactlyOnce(); + } + + /** + * Execute a seek+read sequence. + * @param blockSize block size for seeks + * @param readahead what the readahead value of the stream should be + * @throws IOException IO problems + */ + protected void executeSeekReadSequence(long blockSize, + long readahead) throws IOException { +
requireCSVTestData(); + long len = testDataStatus.getLen(); + in = openTestFile(); + in.setReadahead(readahead); + NanoTimer timer = new NanoTimer(); + long blockCount = len / blockSize; + LOG.info("Reading {} blocks, readahead = {}", + blockCount, readahead); + for (long i = 0; i < blockCount; i++) { + in.seek(in.getPos() + blockSize - 1); + // this is the read + assertTrue("Premature end of stream in block " + i, in.read() >= 0); + } + timer.end("Time to execute %d seeks of distance %d with readahead = %d", + blockCount, + blockSize, + readahead); + logTimePerIOP(timer, blockCount); + LOG.info("Effective bandwidth {} MB/S", + timer.bandwidthDescription(streamStatistics.bytesRead - + streamStatistics.bytesSkippedOnSeek)); + LOG.info("{}", streamStatistics); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d74a580/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties b/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties index ced0687..bc85425 100644 --- a/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties +++ b/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties @@ -16,3 +16,6 @@ log4j.threshold=ALL log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n + +# for debugging low level S3a operations, uncomment this line +# log4j.logger.org.apache.hadoop.fs.s3a=DEBUG --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org