This is an automated email from the ASF dual-hosted git repository.
anujmodi pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 99d091f2d9c HADOOP-19767: [ABFS] Introduce Abfs Input Policy for detecting read patterns (#8153)
99d091f2d9c is described below
commit 99d091f2d9c846d29386ce767939456aa811fbc3
Author: Anuj Modi <[email protected]>
AuthorDate: Fri Jan 23 17:04:46 2026 +0530
HADOOP-19767: [ABFS] Introduce Abfs Input Policy for detecting read patterns (#8153)
---
.../hadoop/fs/azurebfs/AbfsConfiguration.java | 21 ++
.../fs/azurebfs/AzureBlobFileSystemStore.java | 39 ++-
.../fs/azurebfs/constants/ConfigurationKeys.java | 7 +
.../constants/FileSystemConfigurations.java | 2 +
.../hadoop/fs/azurebfs/constants/ReadType.java | 4 +
.../exceptions/HttpResponseException.java | 10 +
.../azurebfs/services/AbfsAdaptiveInputStream.java | 117 +++++++++
.../fs/azurebfs/services/AbfsInputStream.java | 267 ++++++++++++++++-----
.../hadoop/fs/azurebfs/services/AbfsLease.java | 7 +-
.../azurebfs/services/AbfsPrefetchInputStream.java | 100 ++++++++
.../azurebfs/services/AbfsRandomInputStream.java | 105 ++++++++
.../fs/azurebfs/services/AbfsReadPolicy.java | 78 ++++++
.../fs/azurebfs/services/AbfsRetryPolicy.java | 5 +
.../fs/azurebfs/services/ReadBufferManager.java | 19 +-
.../fs/azurebfs/services/ReadBufferManagerV1.java | 17 ++
.../fs/azurebfs/services/ReadBufferManagerV2.java | 75 ++++--
.../azurebfs/ITestAbfsInputStreamStatistics.java | 3 +-
.../fs/azurebfs/services/ITestAbfsInputStream.java | 4 +-
.../fs/azurebfs/services/TestAbfsInputStream.java | 142 ++++++++++-
.../azurebfs/services/TestReadBufferManagerV2.java | 2 +
20 files changed, 901 insertions(+), 123 deletions(-)
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index e9dd94ff4ed..6c5929d2b90 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -679,6 +679,10 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_FS_AZURE_LOWEST_REQUEST_PRIORITY_VALUE)
private int prefetchRequestPriorityValue;
+ @StringConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_READ_POLICY,
+ DefaultValue = DEFAULT_AZURE_READ_POLICY)
+ private String abfsReadPolicy;
+
private String clientProvidedEncryptionKey;
private String clientProvidedEncryptionKeySHA;
@@ -1433,6 +1437,14 @@ public String getPrefetchRequestPriorityValue() {
return Integer.toString(prefetchRequestPriorityValue);
}
+ /**
+ * Get the ABFS read policy set by user.
+ * @return the ABFS read policy.
+ */
+ public String getAbfsReadPolicy() {
+ return abfsReadPolicy;
+ }
+
/**
* Enum config to allow user to pick format of x-ms-client-request-id header
* @return tracingContextFormat config if valid, else default ALL_ID_FORMAT
@@ -2139,6 +2151,15 @@ public void setIsChecksumValidationEnabled(boolean
isChecksumValidationEnabled)
this.isChecksumValidationEnabled = isChecksumValidationEnabled;
}
+ /**
+ * Sets the ABFS read policy for testing purposes.
+ * @param readPolicy the read policy to set.
+ */
+ @VisibleForTesting
+ public void setAbfsReadPolicy(String readPolicy) {
+ abfsReadPolicy = readPolicy;
+ }
+
public boolean isFullBlobChecksumValidationEnabled() {
return isFullBlobChecksumValidationEnabled;
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index de4bc79d55a..6ec5c51eb1d 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -77,7 +77,6 @@
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
import
org.apache.hadoop.fs.azurebfs.contracts.exceptions.TrileanConversionException;
import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
-import org.apache.hadoop.fs.azurebfs.services.ListResponseData;
import org.apache.hadoop.fs.azurebfs.enums.Trilean;
import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
import org.apache.hadoop.fs.azurebfs.extensions.ExtensionHelper;
@@ -90,6 +89,7 @@
import org.apache.hadoop.fs.azurebfs.security.ContextProviderEncryptionAdapter;
import org.apache.hadoop.fs.azurebfs.security.NoContextEncryptionAdapter;
import org.apache.hadoop.fs.azurebfs.services.AbfsAclHelper;
+import org.apache.hadoop.fs.azurebfs.services.AbfsAdaptiveInputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsClientContext;
import org.apache.hadoop.fs.azurebfs.services.AbfsClientContextBuilder;
@@ -97,6 +97,7 @@
import org.apache.hadoop.fs.azurebfs.services.AbfsClientRenameResult;
import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
+import org.apache.hadoop.fs.azurebfs.services.AbfsReadPolicy;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamContext;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamStatisticsImpl;
@@ -107,10 +108,13 @@
import org.apache.hadoop.fs.azurebfs.services.AbfsPerfInfo;
import org.apache.hadoop.fs.azurebfs.services.AbfsPerfTracker;
import org.apache.hadoop.fs.azurebfs.services.AbfsPermission;
+import org.apache.hadoop.fs.azurebfs.services.AbfsPrefetchInputStream;
+import org.apache.hadoop.fs.azurebfs.services.AbfsRandomInputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
import org.apache.hadoop.fs.azurebfs.services.AuthType;
import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy;
import org.apache.hadoop.fs.azurebfs.services.ListingSupport;
+import org.apache.hadoop.fs.azurebfs.services.ListResponseData;
import org.apache.hadoop.fs.azurebfs.services.SharedKeyCredentials;
import org.apache.hadoop.fs.azurebfs.services.StaticRetryPolicy;
import
org.apache.hadoop.fs.azurebfs.services.TailLatencyRequestTimeoutRetryPolicy;
@@ -950,8 +954,37 @@ public AbfsInputStream openFileForRead(Path path,
perfInfo.registerSuccess(true);
- // Add statistics for InputStream
- return new AbfsInputStream(getClient(), statistics, relativePath,
+ return getRelevantInputStream(statistics, relativePath, contentLength,
+ parameters, contextEncryptionAdapter, eTag, tracingContext);
+ }
+ }
+
+ private AbfsInputStream getRelevantInputStream(final FileSystem.Statistics
statistics,
+ final String relativePath,
+ final long contentLength,
+ final Optional<OpenFileParameters> parameters,
+ final ContextEncryptionAdapter contextEncryptionAdapter,
+ final String eTag,
+ TracingContext tracingContext) {
+ AbfsReadPolicy inputPolicy =
AbfsReadPolicy.getAbfsReadPolicy(getAbfsConfiguration().getAbfsReadPolicy());
+ switch (inputPolicy) {
+ case SEQUENTIAL:
+ return new AbfsPrefetchInputStream(getClient(), statistics, relativePath,
+ contentLength, populateAbfsInputStreamContext(
+ parameters.map(OpenFileParameters::getOptions),
+ contextEncryptionAdapter),
+ eTag, tracingContext);
+
+ case RANDOM:
+ return new AbfsRandomInputStream(getClient(), statistics, relativePath,
+ contentLength, populateAbfsInputStreamContext(
+ parameters.map(OpenFileParameters::getOptions),
+ contextEncryptionAdapter),
+ eTag, tracingContext);
+
+ case ADAPTIVE:
+ default:
+ return new AbfsAdaptiveInputStream(getClient(), statistics, relativePath,
contentLength, populateAbfsInputStreamContext(
parameters.map(OpenFileParameters::getOptions),
contextEncryptionAdapter),
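With this change openFileForRead() delegates to getRelevantInputStream(), so the stream implementation is chosen from the configured read policy. A minimal sketch of how that selection is expected to surface to a client; the account, container and file path below are hypothetical, only the class names and the fs.azure.read.policy key come from this patch:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ReadPolicyExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // "sequential"/"whole-file" -> AbfsPrefetchInputStream,
        // "random"/"parquet"/"orc"/"columnar" -> AbfsRandomInputStream,
        // anything else (including the "adaptive" default) -> AbfsAdaptiveInputStream.
        conf.set("fs.azure.read.policy", "sequential");
        FileSystem fs = FileSystem.get(
            new URI("abfs://container@account.dfs.core.windows.net/"), conf); // hypothetical account
        try (FSDataInputStream in = fs.open(new Path("/data/part-00000"))) {  // hypothetical path
          // Expect the policy-specific AbfsInputStream subclass here.
          System.out.println(in.getWrappedStream().getClass().getSimpleName());
        }
      }
    }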
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index c5eb9235fbb..115568c7853 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -22,6 +22,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Options.OpenFileOptions;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DOT;
@@ -266,6 +267,12 @@ public final class ConfigurationKeys {
public static final String FS_AZURE_READ_AHEAD_QUEUE_DEPTH =
"fs.azure.readaheadqueue.depth";
public static final String FS_AZURE_ALWAYS_READ_BUFFER_SIZE =
"fs.azure.read.alwaysReadBufferSize";
public static final String FS_AZURE_READ_AHEAD_BLOCK_SIZE =
"fs.azure.read.readahead.blocksize";
+ /**
+ * Provides a hint for the read workload pattern.
+ * Possible values exposed in {@link OpenFileOptions#FS_OPTION_OPENFILE_READ_POLICIES}.
+ */
+ public static final String FS_AZURE_READ_POLICY = "fs.azure.read.policy";
+
/** Provides a config control to enable or disable ABFS Flush operations -
* HFlush and HSync. Default is true. **/
public static final String FS_AZURE_ENABLE_FLUSH = "fs.azure.enable.flush";
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index 6f76f2e033c..a7717c124db 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -22,6 +22,7 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
+import static
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE;
import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
/**
@@ -108,6 +109,7 @@ public final class FileSystemConfigurations {
public static final long MAX_AZURE_BLOCK_SIZE = 256 * 1024 * 1024L; //
changing default abfs blocksize to 256MB
public static final String AZURE_BLOCK_LOCATION_HOST_DEFAULT = "localhost";
public static final int DEFAULT_AZURE_LIST_MAX_RESULTS = 5000;
+ public static final String DEFAULT_AZURE_READ_POLICY =
FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE;
public static final String SERVER_SIDE_ENCRYPTION_ALGORITHM = "AES256";
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ReadType.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ReadType.java
index 332a5a5ac56..51391cc7477 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ReadType.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ReadType.java
@@ -48,6 +48,10 @@ public enum ReadType {
* Only triggered when small file read optimization kicks in.
*/
SMALLFILE_READ("SR"),
+ /**
+ * Reads from Random Input Stream with read ahead up to readAheadRange
+ */
+ RANDOM_READ("RR"),
/**
* None of the above read types were applicable.
*/
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/HttpResponseException.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/HttpResponseException.java
index c257309c8c9..76126049f07 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/HttpResponseException.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/HttpResponseException.java
@@ -28,12 +28,22 @@
*/
public class HttpResponseException extends IOException {
private final HttpResponse httpResponse;
+
+ /**
+ * Constructor for HttpResponseException.
+ * @param s the exception message
+ * @param httpResponse the HttpResponse object
+ */
public HttpResponseException(final String s, final HttpResponse
httpResponse) {
super(s);
Objects.requireNonNull(httpResponse, "httpResponse should be non-null");
this.httpResponse = httpResponse;
}
+ /**
+ * Gets the HttpResponse associated with this exception.
+ * @return the HttpResponse
+ */
public HttpResponse getHttpResponse() {
return httpResponse;
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAdaptiveInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAdaptiveInputStream.java
new file mode 100644
index 00000000000..25b4529aa08
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAdaptiveInputStream.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.azurebfs.constants.ReadType;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+
+/**
+ * Input stream implementation optimized for adaptive read patterns.
+ * This is the default implementation, used when the user does not specify any input policy.
+ * It switches between sequential and random read optimizations based on the detected read pattern.
+ * It also keeps footer read and small file optimizations enabled.
+ */
+public class AbfsAdaptiveInputStream extends AbfsInputStream {
+
+ /**
+ * Constructs AbfsAdaptiveInputStream instance.
+ * @param client to be used for read operations
+ * @param statistics to record input stream statistics
+ * @param path file path
+ * @param contentLength file content length
+ * @param abfsInputStreamContext input stream context
+ * @param eTag file eTag
+ * @param tracingContext tracing context to trace the read operations
+ */
+ public AbfsAdaptiveInputStream(
+ final AbfsClient client,
+ final FileSystem.Statistics statistics,
+ final String path,
+ final long contentLength,
+ final AbfsInputStreamContext abfsInputStreamContext,
+ final String eTag,
+ TracingContext tracingContext) {
+ super(client, statistics, path, contentLength,
+ abfsInputStreamContext, eTag, tracingContext);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected int readOneBlock(final byte[] b, final int off, final int len)
throws IOException {
+ if (len == 0) {
+ return 0;
+ }
+ if (!validate(b, off, len)) {
+ return -1;
+ }
+ // If buffer is empty, then fill the buffer.
+ if (getBCursor() == getLimit()) {
+ // If EOF, then return -1
+ if (getFCursor() >= getContentLength()) {
+ return -1;
+ }
+
+ long bytesRead = 0;
+ // reset buffer to initial state - i.e., throw away existing data
+ setBCursor(0);
+ setLimit(0);
+ if (getBuffer() == null) {
+ LOG.debug("created new buffer size {}", getBufferSize());
+ setBuffer(new byte[getBufferSize()]);
+ }
+
+ // Reset Read Type back to normal and set again based on code flow.
+ getTracingContext().setReadType(ReadType.NORMAL_READ);
+ if (shouldAlwaysReadBufferSize()) {
+ bytesRead = readInternal(getFCursor(), getBuffer(), 0,
getBufferSize(), false);
+ } else {
+ // Enable readAhead when reading sequentially
+ if (-1 == getFCursorAfterLastRead() || getFCursorAfterLastRead() ==
getFCursor() || b.length >= getBufferSize()) {
+ LOG.debug("Sequential read with read ahead size of {}",
getBufferSize());
+ bytesRead = readInternal(getFCursor(), getBuffer(), 0,
getBufferSize(), false);
+ } else {
+ /*
+ * Disable queuing prefetches when random read pattern detected.
+ * Instead, read ahead only for readAheadRange above what is asked
by caller.
+ */
+ getTracingContext().setReadType(ReadType.RANDOM_READ);
+ int lengthWithReadAhead = Math.min(b.length + getReadAheadRange(),
getBufferSize());
+ LOG.debug("Random read with read ahead size of {}",
lengthWithReadAhead);
+ bytesRead = readInternal(getFCursor(), getBuffer(), 0,
lengthWithReadAhead, true);
+ }
+ }
+ if (isFirstRead()) {
+ setFirstRead(false);
+ }
+ if (bytesRead == -1) {
+ return -1;
+ }
+
+ setLimit(getLimit() + (int) bytesRead);
+ setFCursor(getFCursor() + bytesRead);
+ setFCursorAfterLastRead(getFCursor());
+ }
+ return copyToUserBuffer(b, off, len);
+ }
+}
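The sequential-versus-random decision above boils down to a single condition on the file cursor and the caller's buffer. A small sketch of that heuristic with illustrative names; the logic mirrors readOneBlock() in this file and is not a separate method in the patch:

    // Returns true when the adaptive stream should read a full buffer with prefetching,
    // false when it should fall back to the bounded random-read path.
    static boolean looksSequential(long fCursorAfterLastRead, long fCursor,
        byte[] callerBuffer, int bufferSize) {
      return fCursorAfterLastRead == -1          // nothing read yet on this stream
          || fCursorAfterLastRead == fCursor     // this read continues where the last one ended
          || callerBuffer.length >= bufferSize;  // caller's buffer already spans a full block
    }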
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
index 31b6f0f0739..3c009c71bcd 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
@@ -26,7 +26,6 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.VisibleForTesting;
-import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.azurebfs.constants.ReadType;
import org.apache.hadoop.fs.impl.BackReference;
import org.apache.hadoop.util.Preconditions;
@@ -62,9 +61,9 @@
/**
* The AbfsInputStream for AbfsClient.
*/
-public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
+public abstract class AbfsInputStream extends FSInputStream implements
CanUnbuffer,
StreamCapabilities, IOStatisticsSource {
- private static final Logger LOG =
LoggerFactory.getLogger(AbfsInputStream.class);
+ protected static final Logger LOG =
LoggerFactory.getLogger(AbfsInputStream.class);
// Footer size is set to qualify for both ORC and parquet files
public static final int FOOTER_SIZE = 16 * ONE_KB;
public static final int MAX_OPTIMIZED_READ_ATTEMPTS = 2;
@@ -73,6 +72,7 @@ public class AbfsInputStream extends FSInputStream implements
CanUnbuffer,
private final AbfsClient client;
private final Statistics statistics;
private final String path;
+
private final long contentLength;
private final int bufferSize; // default buffer size
private final int footerReadSize; // default buffer size to read when
reading footer
@@ -94,7 +94,8 @@ public class AbfsInputStream extends FSInputStream implements
CanUnbuffer,
// User configured size of read ahead.
private final int readAheadRange;
- private boolean firstRead = true;
+ private boolean firstRead = true; // to identify first read for optimizations
+
// SAS tokens can be re-used until they expire
private CachedSASToken cachedSasToken;
private byte[] buffer = null; // will be initialized on first use
@@ -134,6 +135,16 @@ public class AbfsInputStream extends FSInputStream
implements CanUnbuffer,
private final BackReference fsBackRef;
private final ReadBufferManager readBufferManager;
+ /**
+ * Constructor for AbfsInputStream.
+ * @param client the ABFS client
+ * @param statistics the statistics
+ * @param path the file path
+ * @param contentLength the content length
+ * @param abfsInputStreamContext the input stream context
+ * @param eTag the eTag of the file
+ * @param tracingContext the tracing context
+ */
public AbfsInputStream(
final AbfsClient client,
final Statistics statistics,
@@ -197,6 +208,10 @@ public AbfsInputStream(
}
}
+ /**
+ * Returns the path of file associated with this stream.
+ * @return the path of the file
+ */
public String getPath() {
return path;
}
@@ -258,7 +273,7 @@ public synchronized int read(final byte[] b, final int off,
final int len) throw
// check if buffer is null before logging the length
if (b != null) {
LOG.debug("read requested b.length = {} offset = {} len = {}", b.length,
- off, len);
+ off, len);
} else {
LOG.debug("read requested b = null offset = {} len = {}", off, len);
}
@@ -327,58 +342,15 @@ private boolean shouldReadLastBlock() {
&& this.fCursor >= footerStart;
}
- private int readOneBlock(final byte[] b, final int off, final int len)
throws IOException {
- if (len == 0) {
- return 0;
- }
- if (!validate(b, off, len)) {
- return -1;
- }
- //If buffer is empty, then fill the buffer.
- if (bCursor == limit) {
- //If EOF, then return -1
- if (fCursor >= contentLength) {
- return -1;
- }
-
- long bytesRead = 0;
- //reset buffer to initial state - i.e., throw away existing data
- bCursor = 0;
- limit = 0;
- if (buffer == null) {
- LOG.debug("created new buffer size {}", bufferSize);
- buffer = new byte[bufferSize];
- }
-
- // Reset Read Type back to normal and set again based on code flow.
- tracingContext.setReadType(ReadType.NORMAL_READ);
- if (alwaysReadBufferSize) {
- bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false);
- } else {
- // Enable readAhead when reading sequentially
- if (-1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor ||
b.length >= bufferSize) {
- LOG.debug("Sequential read with read ahead size of {}", bufferSize);
- bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false);
- } else {
- // Enabling read ahead for random reads as well to reduce number of
remote calls.
- int lengthWithReadAhead = Math.min(b.length + readAheadRange,
bufferSize);
- LOG.debug("Random read with read ahead size of {}",
lengthWithReadAhead);
- bytesRead = readInternal(fCursor, buffer, 0, lengthWithReadAhead,
true);
- }
- }
- if (firstRead) {
- firstRead = false;
- }
- if (bytesRead == -1) {
- return -1;
- }
-
- limit += bytesRead;
- fCursor += bytesRead;
- fCursorAfterLastRead = fCursor;
- }
- return copyToUserBuffer(b, off, len);
- }
+ /**
+ * Read one block of data into buffer.
+ * @param b buffer
+ * @param off offset
+ * @param len length
+ * @return number of bytes read
+ * @throws IOException if there is an error
+ */
+ protected abstract int readOneBlock(byte[] b, int off, int len) throws
IOException;
private int readFileCompletely(final byte[] b, final int off, final int len)
throws IOException {
@@ -472,7 +444,15 @@ private void restorePointerState() {
this.bCursor = this.bCursorBkp;
}
- private boolean validate(final byte[] b, final int off, final int len)
+ /**
+ * Validate the read parameters.
+ * @param b buffer byte array
+ * @param off offset in buffer
+ * @param len length to read
+ * @return true if valid else false
+ * @throws IOException if there is an error
+ */
+ protected boolean validate(final byte[] b, final int off, final int len)
throws IOException {
if (closed) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
@@ -492,7 +472,14 @@ private boolean validate(final byte[] b, final int off,
final int len)
return true;
}
- private int copyToUserBuffer(byte[] b, int off, int len){
+ /**
+ * Copy data from internal buffer to user buffer.
+ * @param b user buffer
+ * @param off offset
+ * @param len length
+ * @return number of bytes copied
+ */
+ protected int copyToUserBuffer(byte[] b, int off, int len){
//If there is anything in the buffer, then return lesser of (requested
bytes) and (bytes in buffer)
//(bytes returned may be less than requested)
int bytesRemaining = limit - bCursor;
@@ -511,7 +498,17 @@ private int copyToUserBuffer(byte[] b, int off, int len){
return bytesToRead;
}
- private int readInternal(final long position, final byte[] b, final int
offset, final int length,
+ /**
+ * Internal read method which handles read-ahead logic.
+ * @param position to read from
+ * @param b buffer
+ * @param offset in buffer
+ * @param length to read
+ * @param bypassReadAhead whether to bypass read-ahead
+ * @return number of bytes read
+ * @throws IOException if there is an error
+ */
+ protected int readInternal(final long position, final byte[] b, final int
offset, final int length,
final boolean bypassReadAhead) throws IOException {
if (isReadAheadEnabled() && !bypassReadAhead) {
// try reading from read-ahead
@@ -728,6 +725,10 @@ public synchronized long getPos() throws IOException {
return nextReadPos < 0 ? 0 : nextReadPos;
}
+ /**
+ * Get the tracing context associated with this stream.
+ * @return the tracing context
+ */
public TracingContext getTracingContext() {
return tracingContext;
}
@@ -797,10 +798,22 @@ public boolean hasCapability(String capability) {
return StreamCapabilities.UNBUFFER.equals(toLowerCase(capability));
}
- byte[] getBuffer() {
+ /**
+ * Getter for buffer.
+ * @return the buffer
+ */
+ synchronized byte[] getBuffer() {
return buffer;
}
+ /**
+ * Setter for buffer.
+ * @param buffer the buffer to set
+ */
+ protected synchronized void setBuffer(byte[] buffer) {
+ this.buffer = buffer;
+ }
+
/**
* Checks if any version of read ahead is enabled.
* If both are disabled, then skip read ahead logic.
@@ -811,21 +824,38 @@ public boolean isReadAheadEnabled() {
return (readAheadEnabled || readAheadV2Enabled) && getReadBufferManager()
!= null;
}
+ /**
+ * Getter for user configured read ahead range.
+ * @return the read ahead range in int.
+ */
@VisibleForTesting
public int getReadAheadRange() {
return readAheadRange;
}
+ /**
+ * Setter for cachedSasToken.
+ * @param cachedSasToken the cachedSasToken to set
+ */
@VisibleForTesting
protected void setCachedSasToken(final CachedSASToken cachedSasToken) {
this.cachedSasToken = cachedSasToken;
}
+ /**
+ * Getter for inputStreamId.
+ * @return the inputStreamId
+ */
@VisibleForTesting
public String getStreamID() {
return inputStreamId;
}
+ /**
+ * Getter for eTag.
+ *
+ * @return the eTag
+ */
public String getETag() {
return eTag;
}
@@ -840,6 +870,10 @@ public AbfsInputStreamStatistics getStreamStatistics() {
return streamStatistics;
}
+ /**
+ * Register a listener for this stream.
+ * @param listener1 the listener to register
+ */
@VisibleForTesting
public void registerListener(Listener listener1) {
listener = listener1;
@@ -866,26 +900,46 @@ public long getBytesFromRemoteRead() {
return bytesFromRemoteRead;
}
+ /**
+ * Getter for buffer size.
+ * @return the buffer size
+ */
@VisibleForTesting
public int getBufferSize() {
return bufferSize;
}
+ /**
+ * Getter for footer read buffer size.
+ * @return the footer read buffer size
+ */
@VisibleForTesting
protected int getFooterReadBufferSize() {
return footerReadSize;
}
+ /**
+ * Getter for read ahead queue depth.
+ * @return the read ahead queue depth
+ */
@VisibleForTesting
public int getReadAheadQueueDepth() {
return readAheadQueueDepth;
}
+ /**
+ * Getter for alwaysReadBufferSize.
+ * @return the alwaysReadBufferSize
+ */
@VisibleForTesting
public boolean shouldAlwaysReadBufferSize() {
return alwaysReadBufferSize;
}
+ /**
+ * Get the IOStatistics for the stream.
+ * @return IOStatistics
+ */
@Override
public IOStatistics getIOStatistics() {
return ioStatistics;
@@ -907,48 +961,131 @@ public String toString() {
return sb.toString();
}
+ /**
+ * Getter for bCursor.
+ * @return the bCursor
+ */
@VisibleForTesting
- int getBCursor() {
+ synchronized int getBCursor() {
return this.bCursor;
}
+ /**
+ * Setter for bCursor.
+ * @param bCursor the bCursor to set
+ */
+ protected synchronized void setBCursor(int bCursor) {
+ this.bCursor = bCursor;
+ }
+
+ /**
+ * Getter for fCursor.
+ * @return the fCursor
+ */
@VisibleForTesting
- long getFCursor() {
+ synchronized long getFCursor() {
return this.fCursor;
}
+ /**
+ * Setter for fCursor.
+ * @param fCursor the fCursor to set
+ */
+ protected synchronized void setFCursor(long fCursor) {
+ this.fCursor = fCursor;
+ }
+
+ /**
+ * Getter for fCursorAfterLastRead.
+ * @return the fCursorAfterLastRead
+ */
@VisibleForTesting
- long getFCursorAfterLastRead() {
+ synchronized long getFCursorAfterLastRead() {
return this.fCursorAfterLastRead;
}
+ /**
+ * Setter for fCursorAfterLastRead.
+ * @param fCursorAfterLastRead the fCursorAfterLastRead to set
+ */
+ protected synchronized void setFCursorAfterLastRead(long
fCursorAfterLastRead) {
+ this.fCursorAfterLastRead = fCursorAfterLastRead;
+ }
+
+ /**
+ * Getter for limit.
+ * @return the limit
+ */
@VisibleForTesting
- long getLimit() {
+ synchronized int getLimit() {
return this.limit;
}
+ /**
+ * Setter for limit.
+ * @param limit the limit to set
+ */
+ protected synchronized void setLimit(int limit) {
+ this.limit = limit;
+ }
+
+ /**
+ * Getter for firstRead.
+ * @return the firstRead
+ */
boolean isFirstRead() {
return this.firstRead;
}
+ /**
+ * Setter for firstRead.
+ * @param firstRead the firstRead to set
+ */
+ protected void setFirstRead(boolean firstRead) {
+ this.firstRead = firstRead;
+ }
+
+ /**
+ * Getter for fsBackRef.
+ * @return the fsBackRef
+ */
@VisibleForTesting
BackReference getFsBackRef() {
return fsBackRef;
}
+ /**
+ * Getter for readBufferManager.
+ * @return the readBufferManager
+ */
@VisibleForTesting
ReadBufferManager getReadBufferManager() {
return readBufferManager;
}
+ /**
+ * Minimum seek distance for vector reads.
+ * @return the minimum seek distance
+ */
@Override
public int minSeekForVectorReads() {
return S_128K;
}
+ /**
+ * Maximum read size for vector reads.
+ * @return the maximum read size
+ */
@Override
public int maxReadSizeForVectorReads() {
return S_2M;
}
+ /**
+ * Getter for contentLength.
+ * @return the contentLength
+ */
+ protected long getContentLength() {
+ return contentLength;
+ }
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsLease.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsLease.java
index fa4b0ac209d..aab6f3d5510 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsLease.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsLease.java
@@ -22,6 +22,7 @@
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,7 +71,7 @@ public final class AbfsLease {
private volatile boolean leaseFreed;
private volatile String leaseID = null;
private volatile Throwable exception = null;
- private volatile int acquireRetryCount = 0;
+ private AtomicInteger acquireRetryCount = new AtomicInteger(0);
private volatile ListenableScheduledFuture<AbfsRestOperation> future = null;
private final long leaseRefreshDuration;
private final int leaseRefreshDurationInSeconds;
@@ -197,7 +198,7 @@ public void onFailure(Throwable throwable) {
if (RetryPolicy.RetryAction.RetryDecision.RETRY
== retryPolicy.shouldRetry(null, numRetries, 0, true).action) {
LOG.debug("Failed to acquire lease on {}, retrying: {}", path,
throwable);
- acquireRetryCount++;
+ acquireRetryCount.incrementAndGet();
acquireLease(retryPolicy, numRetries + 1, retryInterval,
retryInterval, eTag, tracingContext);
} else {
@@ -289,7 +290,7 @@ public String getLeaseID() {
*/
@VisibleForTesting
public int getAcquireRetryCount() {
- return acquireRetryCount;
+ return acquireRetryCount.get();
}
/**
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsPrefetchInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsPrefetchInputStream.java
new file mode 100644
index 00000000000..c0343ca724e
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsPrefetchInputStream.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+
+/**
+ * Input stream implementation optimized for prefetching data.
+ * This implementation always prefetches data in advance if enabled
+ * to optimize for sequential read patterns.
+ */
+public class AbfsPrefetchInputStream extends AbfsInputStream {
+
+ /**
+ * Constructs AbfsPrefetchInputStream
+ * @param client AbfsClient to be used for read operations
+ * @param statistics to record input stream statistics
+ * @param path file path
+ * @param contentLength file content length
+ * @param abfsInputStreamContext input stream context
+ * @param eTag file eTag
+ * @param tracingContext tracing context to trace the read operations
+ */
+ public AbfsPrefetchInputStream(
+ final AbfsClient client,
+ final FileSystem.Statistics statistics,
+ final String path,
+ final long contentLength,
+ final AbfsInputStreamContext abfsInputStreamContext,
+ final String eTag,
+ TracingContext tracingContext) {
+ super(client, statistics, path, contentLength,
+ abfsInputStreamContext, eTag, tracingContext);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected int readOneBlock(final byte[] b, final int off, final int len)
throws IOException {
+ if (len == 0) {
+ return 0;
+ }
+ if (!validate(b, off, len)) {
+ return -1;
+ }
+ // If buffer is empty, then fill the buffer.
+ if (getBCursor() == getLimit()) {
+ // If EOF, then return -1
+ if (getFCursor() >= getContentLength()) {
+ return -1;
+ }
+
+ long bytesRead = 0;
+ // reset buffer to initial state - i.e., throw away existing data
+ setBCursor(0);
+ setLimit(0);
+ if (getBuffer() == null) {
+ LOG.debug("created new buffer size {}", getBufferSize());
+ setBuffer(new byte[getBufferSize()]);
+ }
+
+ /*
+ * Always start with Prefetch even from first read.
+ * Even if out of order seek comes, prefetches will be triggered for
next set of blocks.
+ */
+ bytesRead = readInternal(getFCursor(), getBuffer(), 0, getBufferSize(),
false);
+ if (isFirstRead()) {
+ setFirstRead(false);
+ }
+ if (bytesRead == -1) {
+ return -1;
+ }
+
+ setLimit(getLimit() + (int) bytesRead);
+ setFCursor(getFCursor() + bytesRead);
+ setFCursorAfterLastRead(getFCursor());
+ }
+ return copyToUserBuffer(b, off, len);
+ }
+}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRandomInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRandomInputStream.java
new file mode 100644
index 00000000000..b484cc6c843
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRandomInputStream.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.azurebfs.constants.ReadType;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+
+/**
+ * Input stream implementation optimized for random read patterns.
+ * This implementation disables prefetching of data blocks; instead, it only
+ * reads ahead for a small range beyond what is requested by the caller.
+ */
+public class AbfsRandomInputStream extends AbfsInputStream {
+
+ /**
+ * Constructs AbfsRandomInputStream
+ * @param client AbfsClient to be used for read operations
+ * @param statistics to record input stream statistics
+ * @param path file path
+ * @param contentLength file content length
+ * @param abfsInputStreamContext input stream context
+ * @param eTag file eTag
+ * @param tracingContext tracing context to trace the read operations
+ */
+ public AbfsRandomInputStream(
+ final AbfsClient client,
+ final FileSystem.Statistics statistics,
+ final String path,
+ final long contentLength,
+ final AbfsInputStreamContext abfsInputStreamContext,
+ final String eTag,
+ TracingContext tracingContext) {
+ super(client, statistics, path, contentLength,
+ abfsInputStreamContext, eTag, tracingContext);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected int readOneBlock(final byte[] b, final int off, final int len)
+ throws IOException {
+ if (len == 0) {
+ return 0;
+ }
+ if (!validate(b, off, len)) {
+ return -1;
+ }
+ // If buffer is empty, then fill the buffer.
+ if (getBCursor() == getLimit()) {
+ // If EOF, then return -1
+ if (getFCursor() >= getContentLength()) {
+ return -1;
+ }
+
+ long bytesRead = 0;
+ // reset buffer to initial state - i.e., throw away existing data
+ setBCursor(0);
+ setLimit(0);
+ if (getBuffer() == null) {
+ LOG.debug("created new buffer size {}", getBufferSize());
+ setBuffer(new byte[getBufferSize()]);
+ }
+
+ /*
+ * Disable queuing prefetches when random read pattern detected.
+ * Instead, read ahead only for readAheadRange above what is asked by
caller.
+ */
+ getTracingContext().setReadType(ReadType.RANDOM_READ);
+ int lengthWithReadAhead = Math.min(b.length + getReadAheadRange(),
getBufferSize());
+ LOG.debug("Random read with read ahead size of {}", lengthWithReadAhead);
+ bytesRead = readInternal(getFCursor(), getBuffer(), 0,
lengthWithReadAhead, true);
+ if (isFirstRead()) {
+ setFirstRead(false);
+ }
+ if (bytesRead == -1) {
+ return -1;
+ }
+
+ setLimit(getLimit() + (int) bytesRead);
+ setFCursor(getFCursor() + bytesRead);
+ setFCursorAfterLastRead(getFCursor());
+ }
+ return copyToUserBuffer(b, off, len);
+ }
+}
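The bounded read-ahead above means each remote call fetches only a little more than the caller asked for. A worked example, assuming the default 64 KB read-ahead range and 4 MB read buffer (both are configurable, so the numbers are illustrative):

    int requested = 8 * 1024;                          // caller asks for 8 KB
    int readAheadRange = 64 * 1024;                    // assumed default read-ahead range
    int bufferSize = 4 * 1024 * 1024;                  // assumed default read buffer size
    int lengthWithReadAhead = Math.min(requested + readAheadRange, bufferSize);
    // -> 73728 bytes (72 KB) per remote GET, instead of queuing full-block prefetches.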
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsReadPolicy.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsReadPolicy.java
new file mode 100644
index 00000000000..bdd895a39de
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsReadPolicy.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.util.Locale;
+
+import static
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE;
+import static
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_COLUMNAR;
+import static
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ORC;
+import static
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_PARQUET;
+import static
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
+import static
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
+
+/**
+ * Enum for ABFS Input Policies.
+ * Each policy maps to a particular implementation of {@link AbfsInputStream}
+ */
+public enum AbfsReadPolicy {
+
+ SEQUENTIAL(FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL),
+ RANDOM(FS_OPTION_OPENFILE_READ_POLICY_RANDOM),
+ ADAPTIVE(FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE);
+
+ private final String readPolicy;
+
+ AbfsReadPolicy(String readPolicy) {
+ this.readPolicy = readPolicy;
+ }
+
+ @Override
+ public String toString() {
+ return readPolicy;
+ }
+
+ /**
+ * Get the enum constant from the string name.
+ * @param name policy name as configured by the user
+ * @return the corresponding AbfsReadPolicy to be used
+ */
+ public static AbfsReadPolicy getAbfsReadPolicy(String name) {
+ String readPolicyStr = name.trim().toLowerCase(Locale.ENGLISH);
+ switch (readPolicyStr) {
+ // all these options currently map to random IO.
+ case FS_OPTION_OPENFILE_READ_POLICY_RANDOM:
+ case FS_OPTION_OPENFILE_READ_POLICY_COLUMNAR:
+ case FS_OPTION_OPENFILE_READ_POLICY_ORC:
+ case FS_OPTION_OPENFILE_READ_POLICY_PARQUET:
+ return RANDOM;
+
+ // handle the sequential formats.
+ case FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL:
+ case FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE:
+ return SEQUENTIAL;
+
+ // Everything else including ABFS Default Policy maps to Adaptive
+ case FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE:
+ default:
+ return ADAPTIVE;
+ }
+ }
+}
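A short sketch of how the mapping behaves for a few policy strings, assuming the usual values of the OpenFileOptions read-policy constants ("parquet", "whole-file", "adaptive", "avro"); anything not listed in the switch falls through to the adaptive default:

    AbfsReadPolicy p1 = AbfsReadPolicy.getAbfsReadPolicy("parquet");    // RANDOM
    AbfsReadPolicy p2 = AbfsReadPolicy.getAbfsReadPolicy("whole-file"); // SEQUENTIAL
    AbfsReadPolicy p3 = AbfsReadPolicy.getAbfsReadPolicy("Adaptive");   // ADAPTIVE (input is lower-cased)
    AbfsReadPolicy p4 = AbfsReadPolicy.getAbfsReadPolicy("avro");       // ADAPTIVE (unmapped -> default)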
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRetryPolicy.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRetryPolicy.java
index f3e1e582f9d..7164e55e90c 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRetryPolicy.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRetryPolicy.java
@@ -41,6 +41,11 @@ public abstract class AbfsRetryPolicy {
*/
private final String retryPolicyAbbreviation;
+ /**
+ * Constructor to initialize max retry count and abbreviation
+ * @param maxRetryCount maximum retry count
+ * @param retryPolicyAbbreviation abbreviation for retry policy
+ */
protected AbfsRetryPolicy(final int maxRetryCount, final String
retryPolicyAbbreviation) {
this.maxRetryCount = maxRetryCount;
this.retryPolicyAbbreviation = retryPolicyAbbreviation;
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
index 712b04fb499..5b53d641a20 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
@@ -23,7 +23,6 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
-import java.util.Stack;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
@@ -45,7 +44,6 @@ public abstract class ReadBufferManager {
private static int thresholdAgeMilliseconds;
private static int blockSize = DEFAULT_READ_AHEAD_BLOCK_SIZE; // default
block size for read-ahead in bytes
- private Stack<Integer> freeList = new Stack<>(); // indices in buffers[]
array that are available
private Queue<ReadBuffer> readAheadQueue = new LinkedList<>(); // queue of
requests that are not picked up by any worker thread yet
private LinkedList<ReadBuffer> inProgressList = new LinkedList<>(); //
requests being processed by worker threads
private LinkedList<ReadBuffer> completedReadList = new LinkedList<>(); //
buffers available for reading
@@ -200,15 +198,6 @@ protected static void setReadAheadBlockSize(int
readAheadBlockSize) {
blockSize = readAheadBlockSize;
}
- /**
- * Gets the stack of free buffer indices.
- *
- * @return the stack of free buffer indices
- */
- Stack<Integer> getFreeList() {
- return freeList;
- }
-
/**
* Gets the queue of read-ahead requests.
*
@@ -243,9 +232,7 @@ LinkedList<ReadBuffer> getCompletedReadList() {
* @return a list of free buffer indices
*/
@VisibleForTesting
- List<Integer> getFreeListCopy() {
- return new ArrayList<>(freeList);
- }
+ abstract List<Integer> getFreeListCopy();
/**
* Gets a copy of the read-ahead queue.
@@ -294,7 +281,9 @@ int getCompletedReadListSize() {
*/
@VisibleForTesting
protected void testMimicFullUseAndAddFailedBuffer(ReadBuffer buf) {
- freeList.clear();
+ clearFreeList();
completedReadList.add(buf);
}
+
+ abstract void clearFreeList();
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java
index 240a6186666..c034d856596 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java
@@ -24,6 +24,8 @@
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
+import java.util.List;
+import java.util.Stack;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
@@ -42,6 +44,7 @@ public final class ReadBufferManagerV1 extends
ReadBufferManager {
private Thread[] threads = new Thread[NUM_THREADS];
private byte[][] buffers;
+ private Stack<Integer> freeList = new Stack<>(); // indices in buffers[]
array that are available
private static ReadBufferManagerV1 bufferManager;
// hide instance constructor
@@ -607,7 +610,21 @@ void resetBufferManager() {
setBufferManager(null); // reset the singleton instance
}
+ @Override
+ protected List<Integer> getFreeListCopy() {
+ return new ArrayList<>(freeList);
+ }
+
+ private Stack<Integer> getFreeList() {
+ return freeList;
+ }
+
private static void setBufferManager(ReadBufferManagerV1 manager) {
bufferManager = manager;
}
+
+ @Override
+ protected void clearFreeList() {
+ getFreeList().clear();
+ }
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java
index 7f276eb77d8..5cbe4893b12 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java
@@ -27,7 +27,7 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
-import java.util.Stack;
+import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -100,7 +100,12 @@ public final class ReadBufferManagerV2 extends
ReadBufferManager {
private byte[][] bufferPool;
- private final Stack<Integer> removedBufferList = new Stack<>();
+ /*
+ * List of buffer indexes that are currently free and can be assigned to new read-ahead requests.
+ * Using a thread-safe data structure as multiple threads can access this concurrently.
+ */
+ private final ConcurrentSkipListSet<Integer> removedBufferList = new
ConcurrentSkipListSet<>();
+ private ConcurrentSkipListSet<Integer> freeList = new
ConcurrentSkipListSet<>();
private ScheduledExecutorService memoryMonitorThread;
@@ -209,7 +214,7 @@ void init() {
// Start with just minimum number of buffers.
bufferPool[i]
= new byte[getReadAheadBlockSize()]; // same buffers are reused.
The byte array never goes back to GC
- getFreeList().add(i);
+ pushToFreeList(i);
numberOfActiveBuffers.getAndIncrement();
}
memoryMonitorThread = Executors.newSingleThreadScheduledExecutor(
@@ -768,12 +773,17 @@ private synchronized boolean tryMemoryUpscale() {
if (memoryLoad < memoryThreshold && getNumBuffers() < maxBufferPoolSize) {
// Create and Add more buffers in getFreeList().
int nextIndx = getNumBuffers();
- if (removedBufferList.isEmpty() && nextIndx < bufferPool.length) {
+ if (removedBufferList.isEmpty()) {
+ if (nextIndx >= bufferPool.length) {
+ printTraceLog("Invalid next index: {}. Current buffer pool size: {}",
+ nextIndx, bufferPool.length);
+ return false;
+ }
bufferPool[nextIndx] = new byte[getReadAheadBlockSize()];
pushToFreeList(nextIndx);
} else {
// Reuse a removed buffer index.
- int freeIndex = removedBufferList.pop();
+ int freeIndex = removedBufferList.pollFirst();
if (freeIndex >= bufferPool.length || bufferPool[freeIndex] != null) {
printTraceLog("Invalid free index: {}. Current buffer pool size: {}",
freeIndex, bufferPool.length);
@@ -811,7 +821,7 @@ > getThresholdAgeMilliseconds()) {
}
double memoryLoad = ResourceUtilizationUtils.getMemoryLoad();
- if (isDynamicScalingEnabled && memoryLoad > memoryThreshold) {
+ if (isDynamicScalingEnabled && memoryLoad > memoryThreshold &&
getNumBuffers() > minBufferPoolSize) {
synchronized (this) {
if (isFreeListEmpty()) {
printTraceLog(
@@ -980,7 +990,7 @@ public void testResetReadBufferManager() {
getReadAheadQueue().clear();
getInProgressList().clear();
getCompletedReadList().clear();
- getFreeList().clear();
+ clearFreeList();
for (int i = 0; i < maxBufferPoolSize; i++) {
bufferPool[i] = null;
}
@@ -1023,6 +1033,16 @@ void resetBufferManager() {
setIsConfigured(false);
}
+ @Override
+ protected List<Integer> getFreeListCopy() {
+ return new ArrayList<>(freeList);
+ }
+
+ @Override
+ protected void clearFreeList() {
+ freeList.clear();
+ }
+
private static void setBufferManager(ReadBufferManagerV2 manager) {
bufferManager = manager;
}
@@ -1062,11 +1082,20 @@ public int getMinBufferPoolSize() {
return minBufferPoolSize;
}
+ @VisibleForTesting
+ public void setMinBufferPoolSize(int size) {
+ this.minBufferPoolSize = size;
+ }
+
@VisibleForTesting
public int getMaxBufferPoolSize() {
return maxBufferPoolSize;
}
+ /**
+ * Gets the maximum buffer pool size.
+ * @return size of the maximum buffer pool
+ */
@VisibleForTesting
public int getCurrentThreadPoolSize() {
return workerRefs.size();
@@ -1082,6 +1111,10 @@ public int getMemoryMonitoringIntervalInMilliSec() {
return memoryMonitoringIntervalInMilliSec;
}
+ /**
+ * Returns the scheduled executor service used for CPU monitoring.
+ * @return the ScheduledExecutorService for CPU monitoring tasks
+ */
@VisibleForTesting
public ScheduledExecutorService getCpuMonitoringThread() {
return cpuMonitorThread;
@@ -1098,6 +1131,13 @@ public long getMaxJvmCpuUtilization() {
return maxJvmCpuUtilization;
}
+ /**
+ * Calculates the required thread pool size based on the current
+ * read-ahead queue size and in-progress list size, applying a buffer
+ * to accommodate workload fluctuations.
+ *
+ * @return the calculated required thread pool size
+ */
public int getRequiredThreadPoolSize() {
return (int) Math.ceil(THREAD_POOL_REQUIREMENT_BUFFER
* (getReadAheadQueue().size()
@@ -1105,30 +1145,15 @@ public int getRequiredThreadPoolSize() {
}
private boolean isFreeListEmpty() {
- LOCK.lock();
- try {
- return getFreeList().isEmpty();
- } finally {
- LOCK.unlock();
- }
+ return this.freeList.isEmpty();
}
private Integer popFromFreeList() {
- LOCK.lock();
- try {
- return getFreeList().pop();
- } finally {
- LOCK.unlock();
- }
+ return this.freeList.pollFirst();
}
private void pushToFreeList(int idx) {
- LOCK.lock();
- try {
- getFreeList().push(idx);
- } finally {
- LOCK.unlock();
- }
+ this.freeList.add(idx);
}
private void incrementActiveBufferCount() {
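The V2 free list moves from a lock-guarded Stack to a ConcurrentSkipListSet, so push/pop become plain add()/pollFirst() calls without the explicit LOCK. One semantic difference worth noting in a small sketch: pollFirst() returns null on an empty set where Stack.pop() would have thrown EmptyStackException, so callers of popFromFreeList() are assumed to handle null:

    ConcurrentSkipListSet<Integer> freeList = new ConcurrentSkipListSet<>();
    freeList.add(3);                       // pushToFreeList(3)
    Integer idx = freeList.pollFirst();    // popFromFreeList() -> 3 (lowest index first)
    Integer none = freeList.pollFirst();   // -> null when the free list is empty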
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java
index 6b87f1b73ef..c215094eca0 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java
@@ -26,6 +26,7 @@
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.services.AbfsAdaptiveInputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamContext;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamStatisticsImpl;
@@ -275,7 +276,7 @@ public void testWithNullStreamStatistics() throws
IOException {
getTestTracingContext(fs, false), null);
// AbfsInputStream with no StreamStatistics.
- in = new AbfsInputStream(fs.getAbfsClient(), null,
+ in = new AbfsAdaptiveInputStream(fs.getAbfsClient(), null,
nullStatFilePath.toUri().getPath(), ONE_KB, abfsInputStreamContext,
abfsRestOperation.getResult().getResponseHeader("ETag"),
getTestTracingContext(fs, false));
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java
index 938f5f4300c..2d8629294f6 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java
@@ -28,11 +28,11 @@
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
-import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
import static
org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamTestUtils.HUNDRED;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
@@ -117,7 +117,7 @@ public void
testAzureBlobFileSystemBackReferenceInInputStream()
FSDataInputStream in = getFileSystem().open(path)) {
AbfsInputStream abfsInputStream = (AbfsInputStream)
in.getWrappedStream();
- Assertions.assertThat(abfsInputStream.getFsBackRef().isNull())
+ assertThat(abfsInputStream.getFsBackRef().isNull())
.describedAs("BackReference in input stream should not be null")
.isFalse();
}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
index 93df6529cb8..5cf0bd473fc 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.fs.azurebfs.services;
import java.io.IOException;
+import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
@@ -57,6 +58,10 @@
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderVersion;
import org.apache.hadoop.fs.impl.OpenFileParameters;
+import static
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE;
+import static
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_AVRO;
+import static
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_PARQUET;
+import static
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COLON;
import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SPLIT_NO_LIMIT;
@@ -67,6 +72,7 @@
import static
org.apache.hadoop.fs.azurebfs.constants.ReadType.MISSEDCACHE_READ;
import static org.apache.hadoop.fs.azurebfs.constants.ReadType.NORMAL_READ;
import static org.apache.hadoop.fs.azurebfs.constants.ReadType.PREFETCH_READ;
+import static org.apache.hadoop.fs.azurebfs.constants.ReadType.RANDOM_READ;
import static org.apache.hadoop.fs.azurebfs.constants.ReadType.SMALLFILE_READ;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
@@ -92,8 +98,7 @@
/**
* Unit test AbfsInputStream.
*/
-public class TestAbfsInputStream extends
- AbstractAbfsIntegrationTest {
+public class TestAbfsInputStream extends AbstractAbfsIntegrationTest {
private static final int ONE_KB = 1 * 1024;
private static final int TWO_KB = 2 * 1024;
@@ -148,7 +153,7 @@ AbfsInputStream getAbfsInputStream(AbfsClient
mockAbfsClient,
String fileName) throws IOException {
AbfsInputStreamContext inputStreamContext = new AbfsInputStreamContext(-1);
// Create AbfsInputStream with the client instance
- AbfsInputStream inputStream = new AbfsInputStream(
+ AbfsInputStream inputStream = new AbfsAdaptiveInputStream(
mockAbfsClient,
null,
FORWARD_SLASH + fileName,
@@ -176,7 +181,7 @@ public AbfsInputStream getAbfsInputStream(AbfsClient
abfsClient,
int readAheadBlockSize) throws IOException {
AbfsInputStreamContext inputStreamContext = new AbfsInputStreamContext(-1);
// Create AbfsInputStream with the client instance
- AbfsInputStream inputStream = new AbfsInputStream(
+ AbfsInputStream inputStream = new AbfsAdaptiveInputStream(
abfsClient,
null,
FORWARD_SLASH + fileName,
@@ -848,6 +853,7 @@ public void testReadTypeInTracingContextHeader() throws
Exception {
fileSize = 3 * ONE_MB; // To make sure multiple blocks are read with MR
totalReadCalls += 3; // 3 block of 1MB.
Mockito.doReturn(0).when(spiedConfig).getReadAheadQueueDepth();
+
Mockito.doReturn(FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL).when(spiedConfig).getAbfsReadPolicy();
doReturn(true).when(spiedConfig).isReadAheadEnabled();
testReadTypeInTracingContextHeaderInternal(spiedFs, fileSize,
MISSEDCACHE_READ, 3, totalReadCalls);
@@ -881,6 +887,15 @@ public void testReadTypeInTracingContextHeader() throws
Exception {
doReturn(false).when(spiedConfig).optimizeFooterRead();
testReadTypeInTracingContextHeaderInternal(spiedFs, fileSize,
SMALLFILE_READ, 1, totalReadCalls);
+ /*
+ * Test to verify Random Read Type.
+ * Setting Read Policy to Parquet ensures Random Read Type.
+ */
+ fileSize = 3 * ONE_MB; // To make sure multiple blocks are read.
+ totalReadCalls += 3; // Full file will be read along with footer.
+
doReturn(FS_OPTION_OPENFILE_READ_POLICY_PARQUET).when(spiedConfig).getAbfsReadPolicy();
+ testReadTypeInTracingContextHeaderInternal(spiedFs, fileSize, RANDOM_READ,
1, totalReadCalls);
+
/*
* Test to verify Direct Read Type and a read from random position.
* Separate AbfsInputStream method needs to be called.
@@ -904,11 +919,11 @@ public void testReadTypeInTracingContextHeader() throws
Exception {
private void testReadTypeInTracingContextHeaderInternal(AzureBlobFileSystem
fs,
int fileSize, ReadType readType, int numOfReadCalls, int totalReadCalls)
throws Exception {
Path testPath = createTestFile(fs, fileSize);
- readFile(fs, testPath, fileSize);
+ readFile(fs, testPath, fileSize, readType);
assertReadTypeInClientRequestId(fs, numOfReadCalls, totalReadCalls,
readType);
}
- /*
+ /**
* Test to verify that both conditions of prefetch read and respective config
* enabled needs to be true for the priority header to be added
*/
@@ -937,6 +952,101 @@ public void
testPrefetchReadAddsPriorityHeaderWithDifferentConfigs()
executePrefetchReadTest(tracingContext1, configuration1, false);
}
+ /**
+ * Test to verify that the correct AbfsInputStream instance is created
+ * based on the read policy set in AbfsConfiguration.
+ */
+ @Test
+ public void testAbfsInputStreamInstance() throws Exception {
+ AzureBlobFileSystem fs = getFileSystem();
+ Path path = new Path("/testPath");
+ fs.create(path).close();
+
+ // Assert that Sequential Read Policy uses Prefetch Input Stream
+
getAbfsStore(fs).getAbfsConfiguration().setAbfsReadPolicy(FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL);
+ InputStream stream = fs.open(path).getWrappedStream();
+ assertThat(stream).isInstanceOf(AbfsPrefetchInputStream.class);
+ stream.close();
+
+ // Assert that Adaptive Read Policy uses Adaptive Input Stream
+
getAbfsStore(fs).getAbfsConfiguration().setAbfsReadPolicy(FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE);
+ stream = fs.open(path).getWrappedStream();
+ assertThat(stream).isInstanceOf(AbfsAdaptiveInputStream.class);
+ stream.close();
+
+ // Assert that Parquet Read Policy uses Random Input Stream
+
getAbfsStore(fs).getAbfsConfiguration().setAbfsReadPolicy(FS_OPTION_OPENFILE_READ_POLICY_PARQUET);
+ stream = fs.open(path).getWrappedStream();
+ assertThat(stream).isInstanceOf(AbfsRandomInputStream.class);
+ stream.close();
+
+ // Assert that Avro Read Policy uses Adaptive Input Stream
+
getAbfsStore(fs).getAbfsConfiguration().setAbfsReadPolicy(FS_OPTION_OPENFILE_READ_POLICY_AVRO);
+ stream = fs.open(path).getWrappedStream();
+ assertThat(stream).isInstanceOf(AbfsAdaptiveInputStream.class);
+ stream.close();
+ }
+
+ /**
+ * Test to verify that Random Input Stream does not queue prefetches.
+ * @throws Exception if any error occurs during the test
+ */
+ @Test
+ public void testRandomInputStreamDoesNotQueuePrefetches() throws Exception {
+ AzureBlobFileSystem spiedFs = Mockito.spy(getFileSystem());
+ AzureBlobFileSystemStore spiedStore = Mockito.spy(spiedFs.getAbfsStore());
+ AbfsConfiguration spiedConfig =
Mockito.spy(spiedStore.getAbfsConfiguration());
+ AbfsClient spiedClient = Mockito.spy(spiedStore.getClient());
+ Mockito.doReturn(ONE_MB).when(spiedConfig).getReadBufferSize();
+ Mockito.doReturn(ONE_MB).when(spiedConfig).getReadAheadBlockSize();
+ Mockito.doReturn(spiedClient).when(spiedStore).getClient();
+ Mockito.doReturn(spiedStore).when(spiedFs).getAbfsStore();
+ Mockito.doReturn(spiedConfig).when(spiedStore).getAbfsConfiguration();
+
+ int fileSize = 3 * ONE_MB; // To make sure multiple blocks are read.
+ int totalReadCalls = 3;
+ Mockito.doReturn(3).when(spiedConfig).getReadAheadQueueDepth();
+
Mockito.doReturn(FS_OPTION_OPENFILE_READ_POLICY_PARQUET).when(spiedConfig).getAbfsReadPolicy();
+ testReadTypeInTracingContextHeaderInternal(spiedFs, fileSize, RANDOM_READ,
3, totalReadCalls);
+ }
+
+ /**
+ * Test to verify that Adaptive Input Stream queues prefetches for in-order
reads
+ * and performs random reads for out-of-order seeks.
+ * @throws Exception if any error occurs during the test
+ */
+ @Test
+ public void testAdaptiveInputStream() throws Exception {
+ AzureBlobFileSystem spiedFs = Mockito.spy(getFileSystem());
+ AzureBlobFileSystemStore spiedStore = Mockito.spy(spiedFs.getAbfsStore());
+ AbfsConfiguration spiedConfig =
Mockito.spy(spiedStore.getAbfsConfiguration());
+ AbfsClient spiedClient = Mockito.spy(spiedStore.getClient());
+ Mockito.doReturn(ONE_MB).when(spiedConfig).getReadBufferSize();
+ Mockito.doReturn(ONE_MB).when(spiedConfig).getReadAheadBlockSize();
+ Mockito.doReturn(ONE_KB).when(spiedConfig).getReadAheadRange();
+
Mockito.doReturn(FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE).when(spiedConfig).getAbfsReadPolicy();
+ Mockito.doReturn(spiedClient).when(spiedStore).getClient();
+ Mockito.doReturn(spiedStore).when(spiedFs).getAbfsStore();
+ Mockito.doReturn(spiedConfig).when(spiedStore).getAbfsConfiguration();
+
+ int fileSize = 10 * ONE_MB;
+ Path testPath = createTestFile(spiedFs, fileSize);
+
+ try (FSDataInputStream iStream = spiedFs.open(testPath)) {
+
assertThat(iStream.getWrappedStream()).isInstanceOf(AbfsAdaptiveInputStream.class);
+
+ // In-order reads trigger prefetches in the adaptive stream
+ int bytesRead = iStream.read(new byte[2 * ONE_MB], 0, 2 * ONE_MB);
+ assertReadTypeInClientRequestId(spiedFs, 3, 3, PREFETCH_READ);
+ assertThat(bytesRead).isEqualTo(2 * ONE_MB);
+
+ // An out-of-order seek triggers a random read
+ iStream.seek(7 * ONE_MB);
+ bytesRead = iStream.read(new byte[ONE_MB/2], 0, ONE_MB/2);
+ assertReadTypeInClientRequestId(spiedFs, 1, 4, RANDOM_READ);
+ }
+ }
+
/*
* Helper method to execute read and verify if priority header is added or
not as expected
*/
@@ -1005,8 +1115,15 @@ private Path createTestFile(AzureBlobFileSystem fs, int
fileSize) throws Excepti
return testPath;
}
- private void readFile(AzureBlobFileSystem fs, Path testPath, int fileSize)
throws Exception {
+ private void readFile(AzureBlobFileSystem fs, Path testPath, int fileSize,
ReadType readType) throws Exception {
try (FSDataInputStream iStream = fs.open(testPath)) {
+ if (readType == PREFETCH_READ || readType == MISSEDCACHE_READ) {
+
assertThat(iStream.getWrappedStream()).isInstanceOf(AbfsPrefetchInputStream.class);
+ } else if (readType == NORMAL_READ) {
+
assertThat(iStream.getWrappedStream()).isInstanceOf(AbfsAdaptiveInputStream.class);
+ } else if (readType == RANDOM_READ) {
+
assertThat(iStream.getWrappedStream()).isInstanceOf(AbfsRandomInputStream.class);
+ }
int bytesRead = iStream.read(new byte[fileSize], 0,
fileSize);
assertThat(fileSize)
@@ -1027,6 +1144,7 @@ private void
assertReadTypeInClientRequestId(AzureBlobFileSystem fs, int numOfRe
ArgumentCaptor<ContextEncryptionAdapter> captor8 =
ArgumentCaptor.forClass(ContextEncryptionAdapter.class);
ArgumentCaptor<TracingContext> captor9 =
ArgumentCaptor.forClass(TracingContext.class);
+ List<String> paths = captor1.getAllValues();
verify(fs.getAbfsStore().getClient(), times(totalReadCalls)).read(
captor1.capture(), captor2.capture(), captor3.capture(),
captor4.capture(), captor5.capture(), captor6.capture(),
@@ -1071,8 +1189,14 @@ private void
verifyHeaderForReadTypeInTracingContextHeader(TracingContext tracin
}
assertThat(idList[OPERATION_INDEX]).describedAs("Operation Type Should Be
Read")
.isEqualTo(FSOperationType.READ.toString());
- assertThat(idList[READTYPE_INDEX]).describedAs("Read type in tracing
context header should match")
- .isEqualTo(readType.toString());
+ if (readType == PREFETCH_READ) {
+ // A prefetch read may be reported as a missed-cache read as well.
+ assertThat(idList[READTYPE_INDEX]).describedAs("Read type in tracing
context header should match")
+ .isIn(PREFETCH_READ.toString(), MISSEDCACHE_READ.toString());
+ } else {
+ assertThat(idList[READTYPE_INDEX]).describedAs("Read type in tracing
context header should match")
+ .isEqualTo(readType.toString());
+ }
}
diff --git
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestReadBufferManagerV2.java
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestReadBufferManagerV2.java
index e94c535bd39..a7cbc83f0c7 100644
---
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestReadBufferManagerV2.java
+++
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestReadBufferManagerV2.java
@@ -268,6 +268,7 @@ public void testMemoryDownscaleIfMemoryAboveThreshold()
throws Exception {
ReadBufferManagerV2 bufferManagerV2 =
ReadBufferManagerV2.getBufferManager(abfsClient.getAbfsCounters());
int initialBuffers = bufferManagerV2.getMinBufferPoolSize();
assertThat(bufferManagerV2.getNumBuffers()).isEqualTo(initialBuffers);
+ bufferManagerV2.setMinBufferPoolSize(initialBuffers - 5); // allow
downscale
running = true;
Thread t = new Thread(() -> {
while (running) {
@@ -314,6 +315,7 @@ public void testReadMetricUpdation() throws Exception {
bufferManagerV2.getCurrentStats(ResourceUtilizationUtils.getJvmCpuLoad());
int initialBuffers = bufferManagerV2.getMinBufferPoolSize();
assertThat(bufferManagerV2.getNumBuffers()).isEqualTo(initialBuffers);
+ bufferManagerV2.setMinBufferPoolSize(initialBuffers - 5); // allow
downscale
running = true;
Thread t = new Thread(() -> {
while (running) {