HADOOP-14535 wasb: implement high-performance random access and seek of block blobs. Contributed by Thomas Marquardt
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d670c3a4 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d670c3a4 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d670c3a4 Branch: refs/heads/YARN-5972 Commit: d670c3a4da7dd80dccf6c6308603bb3bb013b3b0 Parents: 12c8fdc Author: Steve Loughran <[email protected]> Authored: Tue Jul 11 21:34:27 2017 +0100 Committer: Steve Loughran <[email protected]> Committed: Tue Jul 11 21:34:27 2017 +0100 ---------------------------------------------------------------------- .../hadoop/fs/contract/ContractTestUtils.java | 8 + .../fs/azure/AzureNativeFileSystemStore.java | 78 +- .../hadoop/fs/azure/BlockBlobInputStream.java | 396 ++++++++++ .../hadoop/fs/azure/NativeAzureFileSystem.java | 38 +- .../fs/azure/NativeAzureFileSystemHelper.java | 28 + .../hadoop/fs/azure/NativeFileSystemStore.java | 6 +- .../fs/azure/SecureStorageInterfaceImpl.java | 5 + .../hadoop/fs/azure/StorageInterface.java | 11 +- .../hadoop/fs/azure/StorageInterfaceImpl.java | 5 + .../fs/azure/AzureBlobStorageTestAccount.java | 40 +- .../hadoop/fs/azure/MockStorageInterface.java | 36 +- .../azure/TestAzureConcurrentOutOfBandIo.java | 2 +- .../fs/azure/TestBlockBlobInputStream.java | 756 +++++++++++++++++++ 13 files changed, 1325 insertions(+), 84 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/d670c3a4/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java index fd77045..39c6d18 100644 --- 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java @@ -1486,6 +1486,14 @@ public class ContractTestUtils extends Assert { return now() - startTime; } + /** + * Elapsed time in milliseconds; no rounding. + * @return elapsed time + */ + public long elapsedTimeMs() { + return elapsedTime() / 1000000; + } + public double bandwidth(long bytes) { return bandwidthMBs(bytes, duration()); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d670c3a4/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java index 5fa964a..6b6f07a 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java @@ -19,7 +19,6 @@ package org.apache.hadoop.fs.azure; import static org.apache.hadoop.fs.azure.NativeAzureFileSystem.PATH_DELIMITER; -import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; @@ -121,6 +120,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { private static final String KEY_STREAM_MIN_READ_SIZE = "fs.azure.read.request.size"; private static final String KEY_STORAGE_CONNECTION_TIMEOUT = "fs.azure.storage.timeout"; private static final String KEY_WRITE_BLOCK_SIZE = "fs.azure.write.request.size"; + @VisibleForTesting + static final String KEY_INPUT_STREAM_VERSION = "fs.azure.input.stream.version.for.internal.use.only"; // Property controlling whether to 
allow reads on blob which are concurrently // appended out-of-band. @@ -222,6 +223,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { public static final int DEFAULT_DOWNLOAD_BLOCK_SIZE = 4 * 1024 * 1024; public static final int DEFAULT_UPLOAD_BLOCK_SIZE = 4 * 1024 * 1024; + private static final int DEFAULT_INPUT_STREAM_VERSION = 2; + // Retry parameter defaults. // @@ -280,6 +283,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { private int downloadBlockSizeBytes = DEFAULT_DOWNLOAD_BLOCK_SIZE; private int uploadBlockSizeBytes = DEFAULT_UPLOAD_BLOCK_SIZE; + private int inputStreamVersion = DEFAULT_INPUT_STREAM_VERSION; // Bandwidth throttling exponential back-off parameters // @@ -691,6 +695,9 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { this.uploadBlockSizeBytes = sessionConfiguration.getInt( KEY_WRITE_BLOCK_SIZE, DEFAULT_UPLOAD_BLOCK_SIZE); + this.inputStreamVersion = sessionConfiguration.getInt( + KEY_INPUT_STREAM_VERSION, DEFAULT_INPUT_STREAM_VERSION); + // The job may want to specify a timeout to use when engaging the // storage service. The default is currently 90 seconds. 
It may // be necessary to increase this value for long latencies in larger @@ -1417,8 +1424,18 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { private InputStream openInputStream(CloudBlobWrapper blob) throws StorageException, IOException { if (blob instanceof CloudBlockBlobWrapper) { - return blob.openInputStream(getDownloadOptions(), - getInstrumentedContext(isConcurrentOOBAppendAllowed())); + LOG.debug("Using stream seek algorithm {}", inputStreamVersion); + switch(inputStreamVersion) { + case 1: + return blob.openInputStream(getDownloadOptions(), + getInstrumentedContext(isConcurrentOOBAppendAllowed())); + case 2: + return new BlockBlobInputStream((CloudBlockBlobWrapper) blob, + getDownloadOptions(), + getInstrumentedContext(isConcurrentOOBAppendAllowed())); + default: + throw new IOException("Unknown seek algorithm: " + inputStreamVersion); + } } else { return new PageBlobInputStream( (CloudPageBlobWrapper) blob, getInstrumentedContext( @@ -2023,32 +2040,12 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { } @Override - public DataInputStream retrieve(String key) throws AzureException, IOException { - try { - // Check if a session exists, if not create a session with the - // Azure storage server. - if (null == storageInteractionLayer) { - final String errMsg = String.format( - "Storage session expected for URI '%s' but does not exist.", - sessionUri); - throw new AssertionError(errMsg); - } - checkContainer(ContainerAccessType.PureRead); - - // Get blob reference and open the input buffer stream. - CloudBlobWrapper blob = getBlobReference(key); - - // Return a data input stream. - DataInputStream inDataStream = new DataInputStream(openInputStream(blob)); - return inDataStream; - } catch (Exception e) { - // Re-throw as an Azure storage exception. 
- throw new AzureException(e); - } + public InputStream retrieve(String key) throws AzureException, IOException { + return retrieve(key, 0); } @Override - public DataInputStream retrieve(String key, long startByteOffset) + public InputStream retrieve(String key, long startByteOffset) throws AzureException, IOException { try { // Check if a session exists, if not create a session with the @@ -2061,24 +2058,19 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { } checkContainer(ContainerAccessType.PureRead); - // Get blob reference and open the input buffer stream. - CloudBlobWrapper blob = getBlobReference(key); - - // Open input stream and seek to the start offset. - InputStream in = blob.openInputStream( - getDownloadOptions(), getInstrumentedContext(isConcurrentOOBAppendAllowed())); - - // Create a data input stream. - DataInputStream inDataStream = new DataInputStream(in); - - // Skip bytes and ignore return value. This is okay - // because if you try to skip too far you will be positioned - // at the end and reads will not return data. - inDataStream.skip(startByteOffset); - return inDataStream; + InputStream inputStream = openInputStream(getBlobReference(key)); + if (startByteOffset > 0) { + // Skip bytes and ignore return value. This is okay + // because if you try to skip too far you will be positioned + // at the end and reads will not return data. + inputStream.skip(startByteOffset); + } + return inputStream; + } catch (IOException e) { + throw e; } catch (Exception e) { - // Re-throw as an Azure storage exception. - throw new AzureException(e); + // Re-throw as an Azure storage exception. 
+ throw new AzureException(e); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d670c3a4/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java new file mode 100644 index 0000000..2ed0686 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java @@ -0,0 +1,396 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.azure; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import com.microsoft.azure.storage.OperationContext; +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.BlobRequestOptions; + +import org.apache.hadoop.fs.FSExceptionMessages; +import org.apache.hadoop.fs.Seekable; +import org.apache.hadoop.fs.azure.StorageInterface.CloudBlockBlobWrapper; + +/** + * Encapsulates the BlobInputStream used by block blobs and adds support for + * random access and seek. Random access performance is improved by several + * orders of magnitude. + */ +final class BlockBlobInputStream extends InputStream implements Seekable { + private final CloudBlockBlobWrapper blob; + private final BlobRequestOptions options; + private final OperationContext opContext; + private InputStream blobInputStream = null; + private int minimumReadSizeInBytes = 0; + private long streamPositionAfterLastRead = -1; + private long streamPosition = 0; + private long streamLength = 0; + private boolean closed = false; + private byte[] streamBuffer; + private int streamBufferPosition; + private int streamBufferLength; + + /** + * Creates a seek-able stream for reading from block blobs. + * @param blob a block blob reference. + * @param options the blob request options. + * @param opContext the blob operation context. 
+ * @throws IOException IO failure + */ + BlockBlobInputStream(CloudBlockBlobWrapper blob, + BlobRequestOptions options, + OperationContext opContext) throws IOException { + this.blob = blob; + this.options = options; + this.opContext = opContext; + + this.minimumReadSizeInBytes = blob.getStreamMinimumReadSizeInBytes(); + + try { + this.blobInputStream = blob.openInputStream(options, opContext); + } catch (StorageException e) { + throw new IOException(e); + } + + this.streamLength = blob.getProperties().getLength(); + } + + private void checkState() throws IOException { + if (closed) { + throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); + } + } + + /** + * Gets the read position of the stream. + * @return the zero-based byte offset of the read position. + * @throws IOException IO failure + */ + @Override + public synchronized long getPos() throws IOException { + checkState(); + return streamPosition; + } + + /** + * Sets the read position of the stream. + * @param pos a zero-based byte offset in the stream. + * @throws EOFException if read is out of range + */ + @Override + public synchronized void seek(long pos) throws IOException { + checkState(); + if (pos < 0) { + throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK + " " + pos); + } + if (pos > streamLength) { + throw new EOFException( + FSExceptionMessages.CANNOT_SEEK_PAST_EOF + " " + pos); + } + if (pos == getPos()) { + // no-op, no state change + return; + } + + if (streamBuffer != null) { + long offset = streamPosition - pos; + if (offset > 0 && offset < streamBufferLength) { + streamBufferPosition = streamBufferLength - (int) offset; + } else { + streamBufferPosition = streamBufferLength; + } + } + + streamPosition = pos; + // close BlobInputStream after seek is invoked because BlobInputStream + // does not support seek + closeBlobInputStream(); + } + + /** + * Seeks a secondary copy of the data. This method is not supported. + * @param targetPos a zero-based byte offset in the stream. 
+ * @return false + * @throws IOException IO failure + */ + @Override + public boolean seekToNewSource(long targetPos) throws IOException { + return false; + } + + /** + * Gets the number of bytes that can be read (or skipped over) without + * performing a network operation. + * @throws IOException IO failure + */ + @Override + public synchronized int available() throws IOException { + checkState(); + if (blobInputStream != null) { + return blobInputStream.available(); + } else { + return (streamBuffer == null) + ? 0 + : streamBufferLength - streamBufferPosition; + } + } + + private void closeBlobInputStream() throws IOException { + if (blobInputStream != null) { + try { + blobInputStream.close(); + } finally { + blobInputStream = null; + } + } + } + + /** + * Closes this stream and releases any system resources associated with it. + * @throws IOException IO failure + */ + @Override + public synchronized void close() throws IOException { + closed = true; + closeBlobInputStream(); + streamBuffer = null; + streamBufferPosition = 0; + streamBufferLength = 0; + } + + private int doNetworkRead(byte[] buffer, int offset, int len) + throws IOException { + MemoryOutputStream outputStream; + boolean needToCopy = false; + + if (streamPositionAfterLastRead == streamPosition) { + // caller is reading sequentially, so initialize the stream buffer + if (streamBuffer == null) { + streamBuffer = new byte[(int) Math.min(minimumReadSizeInBytes, + streamLength)]; + } + streamBufferPosition = 0; + streamBufferLength = 0; + outputStream = new MemoryOutputStream(streamBuffer, streamBufferPosition, + streamBuffer.length); + needToCopy = true; + } else { + outputStream = new MemoryOutputStream(buffer, offset, len); + } + + long bytesToRead = Math.min( + minimumReadSizeInBytes, + Math.min( + outputStream.capacity(), + streamLength - streamPosition)); + + try { + blob.downloadRange(streamPosition, bytesToRead, outputStream, options, + opContext); + } catch (StorageException e) { + throw new 
IOException(e); + } + + int bytesRead = outputStream.size(); + if (bytesRead > 0) { + streamPosition += bytesRead; + streamPositionAfterLastRead = streamPosition; + int count = Math.min(bytesRead, len); + if (needToCopy) { + streamBufferLength = bytesRead; + System.arraycopy(streamBuffer, streamBufferPosition, buffer, offset, + count); + streamBufferPosition += count; + } + return count; + } else { + // This may happen if the blob was modified after the length was obtained. + throw new EOFException("End of stream reached unexpectedly."); + } + } + + /** + * Reads up to <code>len</code> bytes of data from the input stream into an + * array of bytes. + * @param b a buffer into which the data is written. + * @param offset a start offset into {@code buffer} where the data is written. + * @param len the maximum number of bytes to be read. + * @return the number of bytes written into {@code buffer}, or -1. + * @throws IOException IO failure + */ + @Override + public synchronized int read(byte[] b, int offset, int len) + throws IOException { + checkState(); + NativeAzureFileSystemHelper.validateReadArgs(b, offset, len); + if (blobInputStream != null) { + int numberOfBytesRead = blobInputStream.read(b, offset, len); + streamPosition += numberOfBytesRead; + return numberOfBytesRead; + } else { + if (offset < 0 || len < 0 || len > b.length - offset) { + throw new IndexOutOfBoundsException("read arguments out of range"); + } + if (len == 0) { + return 0; + } + + int bytesRead = 0; + int available = available(); + if (available > 0) { + bytesRead = Math.min(available, len); + System.arraycopy(streamBuffer, streamBufferPosition, b, offset, + bytesRead); + streamBufferPosition += bytesRead; + } + + if (len == bytesRead) { + return len; + } + if (streamPosition >= streamLength) { + return (bytesRead > 0) ? 
bytesRead : -1; + } + + offset += bytesRead; + len -= bytesRead; + + return bytesRead + doNetworkRead(b, offset, len); + } + } + + /** + * Reads the next byte of data from the stream. + * @return the next byte of data, or -1 + * @throws IOException IO failure + */ + @Override + public int read() throws IOException { + byte[] buffer = new byte[1]; + int numberOfBytesRead = read(buffer, 0, 1); + return (numberOfBytesRead < 1) ? -1 : buffer[0]; + } + + /** + * Skips over and discards n bytes of data from this input stream. + * @param n the number of bytes to be skipped. + * @return the actual number of bytes skipped. + * @throws IOException IO failure + */ + @Override + public synchronized long skip(long n) throws IOException { + checkState(); + + if (blobInputStream != null) { + return blobInputStream.skip(n); + } else { + if (n < 0 || streamPosition + n > streamLength) { + throw new IndexOutOfBoundsException("skip range"); + } + + if (streamBuffer != null) { + streamBufferPosition = (n < streamBufferLength - streamBufferPosition) + ? streamBufferPosition + (int) n + : streamBufferLength; + } + + streamPosition += n; + return n; + } + } + + /** + * An <code>OutputStream</code> backed by a user-supplied buffer. + */ + static class MemoryOutputStream extends OutputStream { + private final byte[] buffer; + private final int offset; + private final int length; + private int writePosition; + + /** + * Creates a <code>MemoryOutputStream</code> from a user-supplied buffer. + * @param buffer an array of bytes. + * @param offset a starting offset in <code>buffer</code> where the data + * will be written. + * @param length the maximum number of bytes to be written to the stream. 
+ */ + MemoryOutputStream(byte[] buffer, int offset, int length) { + if (buffer == null) { + throw new NullPointerException("buffer"); + } + if (offset < 0 || length < 0 || length > buffer.length - offset) { + throw new IndexOutOfBoundsException("offset out of range of buffer"); + } + this.buffer = buffer; + this.offset = offset; + this.length = length; + this.writePosition = offset; + } + + /** + * Gets the current size of the stream. + */ + public synchronized int size() { + return writePosition - offset; + } + + /** + * Gets the current capacity of the stream. + */ + public synchronized int capacity() { + return length - offset; + } + + /** + * Writes the next byte to the stream. + * @param b the byte to be written. + * @throws IOException IO failure + */ + public synchronized void write(int b) throws IOException { + if (size() > length - 1) { + throw new IOException("No space for more writes"); + } + buffer[writePosition++] = (byte) b; + } + + /** + * Writes a range of bytes to the stream. + * @param b a byte array. + * @param off the start offset in <code>b</code> from which the data + * is read. + * @param length the number of bytes to be written. 
+ * @throws IOException IO failure + */ + public synchronized void write(byte[] b, int off, int length) + throws IOException { + if (b == null) { + throw new NullPointerException("Null buffer argument"); + } + if (off < 0 || length < 0 || length > b.length - off) { + throw new IndexOutOfBoundsException("array write offset"); + } + System.arraycopy(b, off, buffer, writePosition, length); + writePosition += length; + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d670c3a4/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java index f999992..a45ea81 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java @@ -18,7 +18,6 @@ package org.apache.hadoop.fs.azure; -import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.EOFException; import java.io.FileNotFoundException; @@ -60,6 +59,7 @@ import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.Seekable; import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation; import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem; import org.apache.hadoop.fs.azure.security.Constants; @@ -743,7 +743,7 @@ public class NativeAzureFileSystem extends FileSystem { // File length, valid only for streams over block blobs. 
private long fileLength; - public NativeAzureFsInputStream(DataInputStream in, String key, long fileLength) { + NativeAzureFsInputStream(InputStream in, String key, long fileLength) { this.in = in; this.key = key; this.isPageBlob = store.isPageBlobKey(key); @@ -817,27 +817,6 @@ public class NativeAzureFileSystem extends FileSystem { } } - @Override - public synchronized void readFully(long position, byte[] buffer, int offset, int length) - throws IOException { - validatePositionedReadArgs(position, buffer, offset, length); - - int nread = 0; - while (nread < length) { - // In case BlobInputStream is used, mark() can act as a hint to read ahead only this - // length instead of 4 MB boundary. - in.mark(length - nread); - int nbytes = read(position + nread, - buffer, - offset + nread, - length - nread); - if (nbytes < 0) { - throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY); - } - nread += nbytes; - } - } - /* * Reads up to len bytes of data from the input stream into an array of * bytes. 
An attempt is made to read as many as len bytes, but a smaller @@ -909,9 +888,14 @@ public class NativeAzureFileSystem extends FileSystem { throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK); } if (this.pos > pos) { - IOUtils.closeStream(in); - in = store.retrieve(key); - this.pos = in.skip(pos); + if (in instanceof Seekable) { + ((Seekable) in).seek(pos); + this.pos = pos; + } else { + IOUtils.closeStream(in); + in = store.retrieve(key); + this.pos = in.skip(pos); + } } else { this.pos += in.skip(pos - this.pos); } @@ -2538,7 +2522,7 @@ public class NativeAzureFileSystem extends FileSystem { + " is a directory not a file."); } - DataInputStream inputStream = null; + InputStream inputStream; try { inputStream = store.retrieve(key); } catch(Exception ex) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/d670c3a4/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java index 40efdc6..57af1f8 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java @@ -18,9 +18,11 @@ package org.apache.hadoop.fs.azure; +import java.io.EOFException; import java.io.IOException; import java.util.Map; +import com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,6 +31,8 @@ import com.microsoft.azure.storage.StorageErrorCodeStrings; import com.microsoft.azure.storage.StorageException; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.fs.FSExceptionMessages; + /** * Utility class that has helper methods. 
* @@ -104,4 +108,28 @@ final class NativeAzureFileSystemHelper { } } } + + /** + * Validation code, based on + * {@code FSInputStream.validatePositionedReadArgs()}. + * @param buffer destination buffer + * @param offset offset within the buffer + * @param length length of bytes to read + * @throws EOFException if the position is negative + * @throws IndexOutOfBoundsException if there isn't space for the amount of + * data requested. + * @throws IllegalArgumentException other arguments are invalid. + */ + static void validateReadArgs(byte[] buffer, int offset, int length) + throws EOFException { + Preconditions.checkArgument(length >= 0, "length is negative"); + Preconditions.checkArgument(buffer != null, "Null buffer"); + if (buffer.length - offset < length) { + throw new IndexOutOfBoundsException( + FSExceptionMessages.TOO_MANY_BYTES_FOR_DEST_BUFFER + + ": request length=" + length + + ", with offset =" + offset + + "; buffer capacity =" + (buffer.length - offset)); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d670c3a4/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java index 611fe1a..1c7309f 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java @@ -18,9 +18,9 @@ package org.apache.hadoop.fs.azure; -import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.io.InputStream; import java.net.URI; import java.util.Date; @@ -46,9 +46,9 @@ interface NativeFileSystemStore { FileMetadata retrieveMetadata(String key) throws IOException; - 
DataInputStream retrieve(String key) throws IOException; + InputStream retrieve(String key) throws IOException; - DataInputStream retrieve(String key, long byteRangeStart) throws IOException; + InputStream retrieve(String key, long byteRangeStart) throws IOException; DataOutputStream storefile(String key, PermissionStatus permissionStatus) throws AzureException; http://git-wip-us.apache.org/repos/asf/hadoop/blob/d670c3a4/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java index 810aacf..3d33453 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java @@ -466,6 +466,11 @@ public class SecureStorageInterfaceImpl extends StorageInterface { } @Override + public int getStreamMinimumReadSizeInBytes() { + return getBlob().getStreamMinimumReadSizeInBytes(); + } + + @Override public void setStreamMinimumReadSizeInBytes(int minimumReadSizeBytes) { getBlob().setStreamMinimumReadSizeInBytes(minimumReadSizeBytes); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d670c3a4/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java index aef9fc3..8b6b082 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java +++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java @@ -582,10 +582,17 @@ abstract class StorageInterface { throws StorageException; SelfRenewingLease acquireLease() throws StorageException; - + + /** + * Gets the minimum read block size to use with this Blob. + * + * @return The minimum block size, in bytes, for reading from a block blob. + */ + int getStreamMinimumReadSizeInBytes(); + /** * Sets the minimum read block size to use with this Blob. - * + * * @param minimumReadSizeBytes * The maximum block size, in bytes, for reading from a block blob * while using a {@link BlobInputStream} object, ranging from 512 http://git-wip-us.apache.org/repos/asf/hadoop/blob/d670c3a4/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java index 491a0d0..d3d0370 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java @@ -399,6 +399,11 @@ class StorageInterfaceImpl extends StorageInterface { } @Override + public int getStreamMinimumReadSizeInBytes() { + return getBlob().getStreamMinimumReadSizeInBytes(); + } + + @Override public void setStreamMinimumReadSizeInBytes(int minimumReadSizeBytes) { getBlob().setStreamMinimumReadSizeInBytes(minimumReadSizeBytes); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d670c3a4/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AzureBlobStorageTestAccount.java ---------------------------------------------------------------------- diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AzureBlobStorageTestAccount.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AzureBlobStorageTestAccount.java index 2cdc2e7..7fa59ce 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AzureBlobStorageTestAccount.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AzureBlobStorageTestAccount.java @@ -82,13 +82,22 @@ public final class AzureBlobStorageTestAccount { private static final ConcurrentLinkedQueue<MetricsRecord> allMetrics = new ConcurrentLinkedQueue<MetricsRecord>(); private static boolean metricsConfigSaved = false; + private boolean skipContainerDelete = false; private AzureBlobStorageTestAccount(NativeAzureFileSystem fs, CloudStorageAccount account, CloudBlobContainer container) { + this(fs, account, container, false); + } + + private AzureBlobStorageTestAccount(NativeAzureFileSystem fs, + CloudStorageAccount account, + CloudBlobContainer container, + boolean skipContainerDelete) { this.account = account; this.container = container; this.fs = fs; + this.skipContainerDelete = skipContainerDelete; } /** @@ -524,8 +533,19 @@ public final class AzureBlobStorageTestAccount { return create(containerNameSuffix, createOptions, null); } - public static AzureBlobStorageTestAccount create(String containerNameSuffix, - EnumSet<CreateOptions> createOptions, Configuration initialConfiguration) + public static AzureBlobStorageTestAccount create( + String containerNameSuffix, + EnumSet<CreateOptions> createOptions, + Configuration initialConfiguration) + throws Exception { + return create(containerNameSuffix, createOptions, initialConfiguration, false); + } + + public static AzureBlobStorageTestAccount create( + String containerNameSuffix, + EnumSet<CreateOptions> createOptions, + Configuration initialConfiguration, + boolean useContainerSuffixAsContainerName) throws Exception { saveMetricsConfigFile(); 
NativeAzureFileSystem fs = null; @@ -538,12 +558,17 @@ public final class AzureBlobStorageTestAccount { return null; } fs = new NativeAzureFileSystem(); - String containerName = String.format("wasbtests-%s-%tQ%s", - System.getProperty("user.name"), new Date(), containerNameSuffix); + String containerName = useContainerSuffixAsContainerName + ? containerNameSuffix + : String.format( + "wasbtests-%s-%tQ%s", + System.getProperty("user.name"), + new Date(), + containerNameSuffix); container = account.createCloudBlobClient().getContainerReference( containerName); if (createOptions.contains(CreateOptions.CreateContainer)) { - container.create(); + container.createIfNotExists(); } String accountName = conf.get(TEST_ACCOUNT_NAME_PROPERTY_NAME); if (createOptions.contains(CreateOptions.UseSas)) { @@ -578,7 +603,8 @@ public final class AzureBlobStorageTestAccount { // Create test account initializing the appropriate member variables. // AzureBlobStorageTestAccount testAcct = - new AzureBlobStorageTestAccount(fs, account, container); + new AzureBlobStorageTestAccount(fs, account, container, + useContainerSuffixAsContainerName); return testAcct; } @@ -824,7 +850,7 @@ public final class AzureBlobStorageTestAccount { fs.close(); fs = null; } - if (container != null) { + if (!skipContainerDelete && container != null) { container.deleteIfExists(); container = null; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d670c3a4/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java index 4fda017..4f26d9f 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java +++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.lang.reflect.Method; +import java.net.HttpURLConnection; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; @@ -474,12 +475,30 @@ public class MockStorageInterface extends StorageInterface { public void downloadRange(long offset, long length, OutputStream os, BlobRequestOptions options, OperationContext opContext) throws StorageException { - throw new NotImplementedException(); + if (offset < 0 || length <= 0) { + throw new IndexOutOfBoundsException(); + } + if (!backingStore.exists(convertUriToDecodedString(uri))) { + throw new StorageException("BlobNotFound", + "Resource does not exist.", + HttpURLConnection.HTTP_NOT_FOUND, + null, + null); + } + byte[] content = backingStore.getContent(convertUriToDecodedString(uri)); + try { + os.write(content, (int) offset, (int) length); + } catch (IOException e) { + throw new StorageException("Unknown error", "Unexpected error", e); + } } } class MockCloudBlockBlobWrapper extends MockCloudBlobWrapper implements CloudBlockBlobWrapper { + + int minimumReadSize = AzureNativeFileSystemStore.DEFAULT_DOWNLOAD_BLOCK_SIZE; + public MockCloudBlockBlobWrapper(URI uri, HashMap<String, String> metadata, int length) { super(uri, metadata, length); @@ -493,7 +512,13 @@ public class MockStorageInterface extends StorageInterface { } @Override + public int getStreamMinimumReadSizeInBytes() { + return this.minimumReadSize; + } + + @Override public void setStreamMinimumReadSizeInBytes(int minimumReadSizeBytes) { + this.minimumReadSize = minimumReadSizeBytes; } @Override @@ -546,6 +571,9 @@ public class MockStorageInterface extends StorageInterface { class MockCloudPageBlobWrapper extends MockCloudBlobWrapper implements CloudPageBlobWrapper { + + int minimumReadSize = 
AzureNativeFileSystemStore.DEFAULT_DOWNLOAD_BLOCK_SIZE; + public MockCloudPageBlobWrapper(URI uri, HashMap<String, String> metadata, int length) { super(uri, metadata, length); @@ -571,7 +599,13 @@ public class MockStorageInterface extends StorageInterface { } @Override + public int getStreamMinimumReadSizeInBytes() { + return this.minimumReadSize; + } + + @Override public void setStreamMinimumReadSizeInBytes(int minimumReadSize) { + this.minimumReadSize = minimumReadSize; } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/d670c3a4/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIo.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIo.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIo.java index a311a29..7ea7534 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIo.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIo.java @@ -155,7 +155,7 @@ public class TestAzureConcurrentOutOfBandIo { "WASB_String.txt"); writeBlockTask.startWriting(); int count = 0; - DataInputStream inputStream = null; + InputStream inputStream = null; for (int i = 0; i < 5; i++) { try { http://git-wip-us.apache.org/repos/asf/hadoop/blob/d670c3a4/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlockBlobInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlockBlobInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlockBlobInputStream.java new file mode 100644 index 0000000..2db063b --- /dev/null +++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlockBlobInputStream.java @@ -0,0 +1,756 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azure; + +import java.io.EOFException; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.EnumSet; +import java.util.Random; +import java.util.concurrent.Callable; + +import org.junit.FixMethodOrder; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.junit.runners.MethodSorters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FSExceptionMessages; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.fs.contract.ContractTestUtils.NanoTimer; + +import static org.junit.Assert.*; +import static org.junit.Assume.*; + +import static org.apache.hadoop.test.LambdaTestUtils.*; + +/** + * Test semantics and performance of the 
original block blob input stream + * (KEY_INPUT_STREAM_VERSION=1) and the new + * <code>BlockBlobInputStream</code> (KEY_INPUT_STREAM_VERSION=2). + */ +@FixMethodOrder(MethodSorters.NAME_ASCENDING) + +public class TestBlockBlobInputStream extends AbstractWasbTestBase { + private static final Logger LOG = LoggerFactory.getLogger( + TestBlockBlobInputStream.class); + private static final int KILOBYTE = 1024; + private static final int MEGABYTE = KILOBYTE * KILOBYTE; + private static final int TEST_FILE_SIZE = 6 * MEGABYTE; + private static final Path TEST_FILE_PATH = new Path( + "TestBlockBlobInputStream.txt"); + + private AzureBlobStorageTestAccount accountUsingInputStreamV1; + private AzureBlobStorageTestAccount accountUsingInputStreamV2; + private long testFileLength; + + /** + * Long test timeout. + */ + @Rule + public Timeout testTimeout = new Timeout(10 * 60 * 1000); + private FileStatus testFileStatus; + private Path hugefile; + + @Override + public void setUp() throws Exception { + super.setUp(); + Configuration conf = new Configuration(); + conf.setInt(AzureNativeFileSystemStore.KEY_INPUT_STREAM_VERSION, 1); + + accountUsingInputStreamV1 = AzureBlobStorageTestAccount.create( + "testblockblobinputstream", + EnumSet.of(AzureBlobStorageTestAccount.CreateOptions.CreateContainer), + conf, + true); + + accountUsingInputStreamV2 = AzureBlobStorageTestAccount.create( + "testblockblobinputstream", + EnumSet.noneOf(AzureBlobStorageTestAccount.CreateOptions.class), + null, + true); + + assumeNotNull(accountUsingInputStreamV1); + assumeNotNull(accountUsingInputStreamV2); + hugefile = fs.makeQualified(TEST_FILE_PATH); + try { + testFileStatus = fs.getFileStatus(TEST_FILE_PATH); + testFileLength = testFileStatus.getLen(); + } catch (FileNotFoundException e) { + // file doesn't exist + testFileLength = 0; + } + } + + @Override + protected AzureBlobStorageTestAccount createTestAccount() throws Exception { + Configuration conf = new Configuration(); + 
conf.setInt(AzureNativeFileSystemStore.KEY_INPUT_STREAM_VERSION, 1); + + accountUsingInputStreamV1 = AzureBlobStorageTestAccount.create( + "testblockblobinputstream", + EnumSet.of(AzureBlobStorageTestAccount.CreateOptions.CreateContainer), + conf, + true); + + accountUsingInputStreamV2 = AzureBlobStorageTestAccount.create( + "testblockblobinputstream", + EnumSet.noneOf(AzureBlobStorageTestAccount.CreateOptions.class), + null, + true); + + assumeNotNull(accountUsingInputStreamV1); + assumeNotNull(accountUsingInputStreamV2); + return accountUsingInputStreamV1; + } + + /** + * Create a test file by repeating the characters in the alphabet. + * @throws IOException + */ + private void createTestFileAndSetLength() throws IOException { + FileSystem fs = accountUsingInputStreamV1.getFileSystem(); + + // To reduce test run time, the test file can be reused. + if (fs.exists(TEST_FILE_PATH)) { + testFileStatus = fs.getFileStatus(TEST_FILE_PATH); + testFileLength = testFileStatus.getLen(); + LOG.info("Reusing test file: {}", testFileStatus); + return; + } + + int sizeOfAlphabet = ('z' - 'a' + 1); + byte[] buffer = new byte[26 * KILOBYTE]; + char character = 'a'; + for (int i = 0; i < buffer.length; i++) { + buffer[i] = (byte) character; + character = (character == 'z') ? 
'a' : (char) ((int) character + 1); + } + + LOG.info("Creating test file {} of size: {}", TEST_FILE_PATH, + TEST_FILE_SIZE ); + ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer(); + + try(FSDataOutputStream outputStream = fs.create(TEST_FILE_PATH)) { + int bytesWritten = 0; + while (bytesWritten < TEST_FILE_SIZE) { + outputStream.write(buffer); + bytesWritten += buffer.length; + } + LOG.info("Closing stream {}", outputStream); + ContractTestUtils.NanoTimer closeTimer + = new ContractTestUtils.NanoTimer(); + outputStream.close(); + closeTimer.end("time to close() output stream"); + } + timer.end("time to write %d KB", TEST_FILE_SIZE / 1024); + testFileLength = fs.getFileStatus(TEST_FILE_PATH).getLen(); + } + + void assumeHugeFileExists() throws IOException { + ContractTestUtils.assertPathExists(fs, "huge file not created", hugefile); + FileStatus status = fs.getFileStatus(hugefile); + ContractTestUtils.assertIsFile(hugefile, status); + assertTrue("File " + hugefile + " is empty", status.getLen() > 0); + } + + /** + * Calculate megabits per second from the specified values for bytes and + * milliseconds. + * @param bytes The number of bytes. + * @param milliseconds The number of milliseconds. + * @return The number of megabits per second. + */ + private static double toMbps(long bytes, long milliseconds) { + return bytes / 1000.0 * 8 / milliseconds; + } + + @Test + public void test_0100_CreateHugeFile() throws IOException { + createTestFileAndSetLength(); + } + + /** + * Validates the implementation of InputStream.markSupported. + * @throws IOException + */ + @Test + public void test_0301_MarkSupportedV1() throws IOException { + validateMarkSupported(accountUsingInputStreamV1.getFileSystem()); + } + + /** + * Validates the implementation of InputStream.markSupported. 
+ * @throws IOException + */ + @Test + public void test_0302_MarkSupportedV2() throws IOException { + validateMarkSupported(accountUsingInputStreamV2.getFileSystem()); + } + + private void validateMarkSupported(FileSystem fs) throws IOException { + assumeHugeFileExists(); + try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) { + assertTrue("mark is not supported", inputStream.markSupported()); + } + } + + /** + * Validates the implementation of InputStream.mark and reset + * for version 1 of the block blob input stream. + * @throws Exception + */ + @Test + public void test_0303_MarkAndResetV1() throws Exception { + validateMarkAndReset(accountUsingInputStreamV1.getFileSystem()); + } + + /** + * Validates the implementation of InputStream.mark and reset + * for version 2 of the block blob input stream. + * @throws Exception + */ + @Test + public void test_0304_MarkAndResetV2() throws Exception { + validateMarkAndReset(accountUsingInputStreamV2.getFileSystem()); + } + + private void validateMarkAndReset(FileSystem fs) throws Exception { + assumeHugeFileExists(); + try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) { + inputStream.mark(KILOBYTE - 1); + + byte[] buffer = new byte[KILOBYTE]; + int bytesRead = inputStream.read(buffer); + assertEquals(buffer.length, bytesRead); + + inputStream.reset(); + assertEquals("reset -> pos 0", 0, inputStream.getPos()); + + inputStream.mark(8 * KILOBYTE - 1); + + buffer = new byte[8 * KILOBYTE]; + bytesRead = inputStream.read(buffer); + assertEquals(buffer.length, bytesRead); + + intercept(IOException.class, + "Resetting to invalid mark", + new Callable<FSDataInputStream>() { + @Override + public FSDataInputStream call() throws Exception { + inputStream.reset(); + return inputStream; + } + } + ); + } + } + + /** + * Validates the implementation of Seekable.seekToNewSource, which should + * return false for version 1 of the block blob input stream. 
+ * @throws IOException + */ + @Test + public void test_0305_SeekToNewSourceV1() throws IOException { + validateSeekToNewSource(accountUsingInputStreamV1.getFileSystem()); + } + + /** + * Validates the implementation of Seekable.seekToNewSource, which should + * return false for version 2 of the block blob input stream. + * @throws IOException + */ + @Test + public void test_0306_SeekToNewSourceV2() throws IOException { + validateSeekToNewSource(accountUsingInputStreamV2.getFileSystem()); + } + + private void validateSeekToNewSource(FileSystem fs) throws IOException { + assumeHugeFileExists(); + try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) { + assertFalse(inputStream.seekToNewSource(0)); + } + } + + /** + * Validates the implementation of InputStream.skip and ensures there is no + * network I/O for version 1 of the block blob input stream. + * @throws Exception + */ + @Test + public void test_0307_SkipBoundsV1() throws Exception { + validateSkipBounds(accountUsingInputStreamV1.getFileSystem()); + } + + /** + * Validates the implementation of InputStream.skip and ensures there is no + * network I/O for version 2 of the block blob input stream. 
+ * @throws Exception + */ + @Test + public void test_0308_SkipBoundsV2() throws Exception { + validateSkipBounds(accountUsingInputStreamV2.getFileSystem()); + } + + private void validateSkipBounds(FileSystem fs) throws Exception { + assumeHugeFileExists(); + try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) { + NanoTimer timer = new NanoTimer(); + + long skipped = inputStream.skip(-1); + assertEquals(0, skipped); + + skipped = inputStream.skip(0); + assertEquals(0, skipped); + + assertTrue(testFileLength > 0); + + skipped = inputStream.skip(testFileLength); + assertEquals(testFileLength, skipped); + + intercept(EOFException.class, + new Callable<Long>() { + @Override + public Long call() throws Exception { + return inputStream.skip(1); + } + } + ); + long elapsedTimeMs = timer.elapsedTimeMs(); + assertTrue( + String.format( + "There should not be any network I/O (elapsedTimeMs=%1$d).", + elapsedTimeMs), + elapsedTimeMs < 20); + } + } + + /** + * Validates the implementation of Seekable.seek and ensures there is no + * network I/O for forward seek. + * @throws Exception + */ + @Test + public void test_0309_SeekBoundsV1() throws Exception { + validateSeekBounds(accountUsingInputStreamV1.getFileSystem()); + } + + /** + * Validates the implementation of Seekable.seek and ensures there is no + * network I/O for forward seek. 
+ * @throws Exception + */ + @Test + public void test_0310_SeekBoundsV2() throws Exception { + validateSeekBounds(accountUsingInputStreamV2.getFileSystem()); + } + + private void validateSeekBounds(FileSystem fs) throws Exception { + assumeHugeFileExists(); + try ( + FSDataInputStream inputStream = fs.open(TEST_FILE_PATH); + ) { + NanoTimer timer = new NanoTimer(); + + inputStream.seek(0); + assertEquals(0, inputStream.getPos()); + + intercept(EOFException.class, + FSExceptionMessages.NEGATIVE_SEEK, + new Callable<FSDataInputStream>() { + @Override + public FSDataInputStream call() throws Exception { + inputStream.seek(-1); + return inputStream; + } + } + ); + + assertTrue("Test file length only " + testFileLength, testFileLength > 0); + inputStream.seek(testFileLength); + assertEquals(testFileLength, inputStream.getPos()); + + intercept(EOFException.class, + FSExceptionMessages.CANNOT_SEEK_PAST_EOF, + new Callable<FSDataInputStream>() { + @Override + public FSDataInputStream call() throws Exception { + inputStream.seek(testFileLength + 1); + return inputStream; + } + } + ); + + long elapsedTimeMs = timer.elapsedTimeMs(); + assertTrue( + String.format( + "There should not be any network I/O (elapsedTimeMs=%1$d).", + elapsedTimeMs), + elapsedTimeMs < 20); + } + } + + /** + * Validates the implementation of Seekable.seek, Seekable.getPos, + * and InputStream.available. + * @throws Exception + */ + @Test + public void test_0311_SeekAndAvailableAndPositionV1() throws Exception { + validateSeekAndAvailableAndPosition( + accountUsingInputStreamV1.getFileSystem()); + } + + /** + * Validates the implementation of Seekable.seek, Seekable.getPos, + * and InputStream.available. 
+ * @throws Exception + */ + @Test + public void test_0312_SeekAndAvailableAndPositionV2() throws Exception { + validateSeekAndAvailableAndPosition( + accountUsingInputStreamV2.getFileSystem()); + } + + private void validateSeekAndAvailableAndPosition(FileSystem fs) + throws Exception { + assumeHugeFileExists(); + try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) { + byte[] expected1 = {(byte) 'a', (byte) 'b', (byte) 'c'}; + byte[] expected2 = {(byte) 'd', (byte) 'e', (byte) 'f'}; + byte[] expected3 = {(byte) 'b', (byte) 'c', (byte) 'd'}; + byte[] expected4 = {(byte) 'g', (byte) 'h', (byte) 'i'}; + byte[] buffer = new byte[3]; + + int bytesRead = inputStream.read(buffer); + assertEquals(buffer.length, bytesRead); + assertArrayEquals(expected1, buffer); + assertEquals(buffer.length, inputStream.getPos()); + assertEquals(testFileLength - inputStream.getPos(), + inputStream.available()); + + bytesRead = inputStream.read(buffer); + assertEquals(buffer.length, bytesRead); + assertArrayEquals(expected2, buffer); + assertEquals(2 * buffer.length, inputStream.getPos()); + assertEquals(testFileLength - inputStream.getPos(), + inputStream.available()); + + // reverse seek + int seekPos = 0; + inputStream.seek(seekPos); + + bytesRead = inputStream.read(buffer); + assertEquals(buffer.length, bytesRead); + assertArrayEquals(expected1, buffer); + assertEquals(buffer.length + seekPos, inputStream.getPos()); + assertEquals(testFileLength - inputStream.getPos(), + inputStream.available()); + + // reverse seek + seekPos = 1; + inputStream.seek(seekPos); + + bytesRead = inputStream.read(buffer); + assertEquals(buffer.length, bytesRead); + assertArrayEquals(expected3, buffer); + assertEquals(buffer.length + seekPos, inputStream.getPos()); + assertEquals(testFileLength - inputStream.getPos(), + inputStream.available()); + + // forward seek + seekPos = 6; + inputStream.seek(seekPos); + + bytesRead = inputStream.read(buffer); + assertEquals(buffer.length, bytesRead); + 
assertArrayEquals(expected4, buffer); + assertEquals(buffer.length + seekPos, inputStream.getPos()); + assertEquals(testFileLength - inputStream.getPos(), + inputStream.available()); + } + } + + /** + * Validates the implementation of InputStream.skip, Seekable.getPos, + * and InputStream.available. + * @throws IOException + */ + @Test + public void test_0313_SkipAndAvailableAndPositionV1() throws IOException { + validateSkipAndAvailableAndPosition( + accountUsingInputStreamV1.getFileSystem()); + } + + /** + * Validates the implementation of InputStream.skip, Seekable.getPos, + * and InputStream.available. + * @throws IOException + */ + @Test + public void test_0314_SkipAndAvailableAndPositionV2() throws IOException { + validateSkipAndAvailableAndPosition( + accountUsingInputStreamV2.getFileSystem()); + } + + private void validateSkipAndAvailableAndPosition(FileSystem fs) + throws IOException { + assumeHugeFileExists(); + try ( + FSDataInputStream inputStream = fs.open(TEST_FILE_PATH); + ) { + byte[] expected1 = {(byte) 'a', (byte) 'b', (byte) 'c'}; + byte[] expected2 = {(byte) 'd', (byte) 'e', (byte) 'f'}; + byte[] expected3 = {(byte) 'b', (byte) 'c', (byte) 'd'}; + byte[] expected4 = {(byte) 'g', (byte) 'h', (byte) 'i'}; + + assertEquals(testFileLength, inputStream.available()); + assertEquals(0, inputStream.getPos()); + + int n = 3; + long skipped = inputStream.skip(n); + + assertEquals(skipped, inputStream.getPos()); + assertEquals(testFileLength - inputStream.getPos(), + inputStream.available()); + assertEquals(skipped, n); + + byte[] buffer = new byte[3]; + int bytesRead = inputStream.read(buffer); + assertEquals(buffer.length, bytesRead); + assertArrayEquals(expected2, buffer); + assertEquals(buffer.length + skipped, inputStream.getPos()); + assertEquals(testFileLength - inputStream.getPos(), + inputStream.available()); + + // does skip still work after seek? 
+ int seekPos = 1; + inputStream.seek(seekPos); + + bytesRead = inputStream.read(buffer); + assertEquals(buffer.length, bytesRead); + assertArrayEquals(expected3, buffer); + assertEquals(buffer.length + seekPos, inputStream.getPos()); + assertEquals(testFileLength - inputStream.getPos(), + inputStream.available()); + + long currentPosition = inputStream.getPos(); + n = 2; + skipped = inputStream.skip(n); + + assertEquals(currentPosition + skipped, inputStream.getPos()); + assertEquals(testFileLength - inputStream.getPos(), + inputStream.available()); + assertEquals(skipped, n); + + bytesRead = inputStream.read(buffer); + assertEquals(buffer.length, bytesRead); + assertArrayEquals(expected4, buffer); + assertEquals(buffer.length + skipped + currentPosition, + inputStream.getPos()); + assertEquals(testFileLength - inputStream.getPos(), + inputStream.available()); + } + } + + /** + * Ensures parity in the performance of sequential read for + * version 1 and version 2 of the block blob input stream. 
+ * @throws IOException + */ + @Test + public void test_0315_SequentialReadPerformance() throws IOException { + assumeHugeFileExists(); + final int maxAttempts = 10; + final double maxAcceptableRatio = 1.01; + double v1ElapsedMs = 0, v2ElapsedMs = 0; + double ratio = Double.MAX_VALUE; + for (int i = 0; i < maxAttempts && ratio >= maxAcceptableRatio; i++) { + v1ElapsedMs = sequentialRead(1, + accountUsingInputStreamV1.getFileSystem(), false); + v2ElapsedMs = sequentialRead(2, + accountUsingInputStreamV2.getFileSystem(), false); + ratio = v2ElapsedMs / v1ElapsedMs; + LOG.info(String.format( + "v1ElapsedMs=%1$d, v2ElapsedMs=%2$d, ratio=%3$.2f", + (long) v1ElapsedMs, + (long) v2ElapsedMs, + ratio)); + } + assertTrue(String.format( + "Performance of version 2 is not acceptable: v1ElapsedMs=%1$d," + + " v2ElapsedMs=%2$d, ratio=%3$.2f", + (long) v1ElapsedMs, + (long) v2ElapsedMs, + ratio), + ratio < maxAcceptableRatio); + } + + /** + * Ensures parity in the performance of sequential read after reverse seek for + * version 2 of the block blob input stream. 
+ * @throws IOException + */ + @Test + public void test_0316_SequentialReadAfterReverseSeekPerformanceV2() + throws IOException { + assumeHugeFileExists(); + final int maxAttempts = 10; + final double maxAcceptableRatio = 1.01; + double beforeSeekElapsedMs = 0, afterSeekElapsedMs = 0; + double ratio = Double.MAX_VALUE; + for (int i = 0; i < maxAttempts && ratio >= maxAcceptableRatio; i++) { + beforeSeekElapsedMs = sequentialRead(2, + accountUsingInputStreamV2.getFileSystem(), false); + afterSeekElapsedMs = sequentialRead(2, + accountUsingInputStreamV2.getFileSystem(), true); + ratio = afterSeekElapsedMs / beforeSeekElapsedMs; + LOG.info(String.format( + "beforeSeekElapsedMs=%1$d, afterSeekElapsedMs=%2$d, ratio=%3$.2f", + (long) beforeSeekElapsedMs, + (long) afterSeekElapsedMs, + ratio)); + } + assertTrue(String.format( + "Performance of version 2 after reverse seek is not acceptable:" + + " beforeSeekElapsedMs=%1$d, afterSeekElapsedMs=%2$d," + + " ratio=%3$.2f", + (long) beforeSeekElapsedMs, + (long) afterSeekElapsedMs, + ratio), + ratio < maxAcceptableRatio); + } + + private long sequentialRead(int version, + FileSystem fs, + boolean afterReverseSeek) throws IOException { + byte[] buffer = new byte[16 * KILOBYTE]; + long totalBytesRead = 0; + long bytesRead = 0; + + try(FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) { + if (afterReverseSeek) { + do { + bytesRead = inputStream.read(buffer); + totalBytesRead += bytesRead; + } while (bytesRead > 0 && totalBytesRead < 4 * MEGABYTE); + totalBytesRead = 0; + inputStream.seek(0); + } + + NanoTimer timer = new NanoTimer(); + while ((bytesRead = inputStream.read(buffer)) > 0) { + totalBytesRead += bytesRead; + } + long elapsedTimeMs = timer.elapsedTimeMs(); + + LOG.info(String.format( + "v%1$d: bytesRead=%2$d, elapsedMs=%3$d, Mbps=%4$.2f," + + " afterReverseSeek=%5$s", + version, + totalBytesRead, + elapsedTimeMs, + toMbps(totalBytesRead, elapsedTimeMs), + afterReverseSeek)); + + assertEquals(testFileLength, 
totalBytesRead); + inputStream.close(); + return elapsedTimeMs; + } + } + + @Test + public void test_0317_RandomReadPerformance() throws IOException { + assumeHugeFileExists(); + final int maxAttempts = 10; + final double maxAcceptableRatio = 0.10; + double v1ElapsedMs = 0, v2ElapsedMs = 0; + double ratio = Double.MAX_VALUE; + for (int i = 0; i < maxAttempts && ratio >= maxAcceptableRatio; i++) { + v1ElapsedMs = randomRead(1, + accountUsingInputStreamV1.getFileSystem()); + v2ElapsedMs = randomRead(2, + accountUsingInputStreamV2.getFileSystem()); + ratio = v2ElapsedMs / v1ElapsedMs; + LOG.info(String.format( + "v1ElapsedMs=%1$d, v2ElapsedMs=%2$d, ratio=%3$.2f", + (long) v1ElapsedMs, + (long) v2ElapsedMs, + ratio)); + } + assertTrue(String.format( + "Performance of version 2 is not acceptable: v1ElapsedMs=%1$d," + + " v2ElapsedMs=%2$d, ratio=%3$.2f", + (long) v1ElapsedMs, + (long) v2ElapsedMs, + ratio), + ratio < maxAcceptableRatio); + } + + private long randomRead(int version, FileSystem fs) throws IOException { + assumeHugeFileExists(); + final int minBytesToRead = 2 * MEGABYTE; + Random random = new Random(); + byte[] buffer = new byte[8 * KILOBYTE]; + long totalBytesRead = 0; + long bytesRead = 0; + try(FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) { + NanoTimer timer = new NanoTimer(); + + do { + bytesRead = inputStream.read(buffer); + totalBytesRead += bytesRead; + inputStream.seek(random.nextInt( + (int) (testFileLength - buffer.length))); + } while (bytesRead > 0 && totalBytesRead < minBytesToRead); + + long elapsedTimeMs = timer.elapsedTimeMs(); + + inputStream.close(); + + LOG.info(String.format( + "v%1$d: totalBytesRead=%2$d, elapsedTimeMs=%3$d, Mbps=%4$.2f", + version, + totalBytesRead, + elapsedTimeMs, + toMbps(totalBytesRead, elapsedTimeMs))); + + assertTrue(minBytesToRead <= totalBytesRead); + + return elapsedTimeMs; + } + } + + @Test + public void test_999_DeleteHugeFiles() throws IOException { + ContractTestUtils.NanoTimer timer = new 
ContractTestUtils.NanoTimer(); + fs.delete(TEST_FILE_PATH, false); + timer.end("time to delete %s", TEST_FILE_PATH); + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
