This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit cb6729224e15b89bd7fa7877fe045d28b3582f7b
Author: bilaharith <52483117+bilahar...@users.noreply.github.com>
AuthorDate: Sun Jan 3 00:07:10 2021 +0530

    HADOOP-17347. ABFS: Read optimizations
    
    - Contributed by Bilahari T H
    
    (cherry picked from commit 1448add08fcd4a23e59eab5f75ef46fca6b1c3d1)
---
 .../hadoop/fs/azurebfs/AbfsConfiguration.java      |  28 ++
 .../fs/azurebfs/AzureBlobFileSystemStore.java      |   2 +
 .../fs/azurebfs/constants/ConfigurationKeys.java   |   2 +
 .../constants/FileSystemConfigurations.java        |   6 +-
 .../fs/azurebfs/services/AbfsInputStream.java      | 194 +++++++++--
 .../azurebfs/services/AbfsInputStreamContext.java  |  24 ++
 .../fs/azurebfs/services/ITestAbfsInputStream.java | 256 +++++++++++++++
 .../services/ITestAbfsInputStreamReadFooter.java   | 358 +++++++++++++++++++++
 .../ITestAbfsInputStreamSmallFileReads.java        | 326 +++++++++++++++++++
 9 files changed, 1175 insertions(+), 21 deletions(-)

diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index 3d09a80..b1c95d2 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -100,6 +100,16 @@ public class AbfsConfiguration{
       DefaultValue = DEFAULT_WRITE_BUFFER_SIZE)
   private int writeBufferSize;
 
+  @BooleanConfigurationValidatorAnnotation(
+      ConfigurationKey = AZURE_READ_SMALL_FILES_COMPLETELY,
+      DefaultValue = DEFAULT_READ_SMALL_FILES_COMPLETELY)
+  private boolean readSmallFilesCompletely;
+
+  @BooleanConfigurationValidatorAnnotation(
+      ConfigurationKey = AZURE_READ_OPTIMIZE_FOOTER_READ,
+      DefaultValue = DEFAULT_OPTIMIZE_FOOTER_READ)
+  private boolean optimizeFooterRead;
+
   @IntegerConfigurationValidatorAnnotation(ConfigurationKey = 
AZURE_READ_BUFFER_SIZE,
       MinValue = MIN_BUFFER_SIZE,
       MaxValue = MAX_BUFFER_SIZE,
@@ -527,6 +537,14 @@ public class AbfsConfiguration{
     return this.writeBufferSize;
   }
 
+  public boolean readSmallFilesCompletely() {
+    return this.readSmallFilesCompletely;
+  }
+
+  public boolean optimizeFooterRead() {
+    return this.optimizeFooterRead;
+  }
+
   public int getReadBufferSize() {
     return this.readBufferSize;
   }
@@ -925,4 +943,14 @@ public class AbfsConfiguration{
     return authority;
   }
 
+  @VisibleForTesting
+  public void setReadSmallFilesCompletely(boolean readSmallFilesCompletely) {
+    this.readSmallFilesCompletely = readSmallFilesCompletely;
+  }
+
+  @VisibleForTesting
+  public void setOptimizeFooterRead(boolean optimizeFooterRead) {
+    this.optimizeFooterRead = optimizeFooterRead;
+  }
+
 }
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index a766c62..869a6f9 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -643,6 +643,8 @@ public class AzureBlobFileSystemStore implements Closeable {
             .withReadBufferSize(abfsConfiguration.getReadBufferSize())
             
.withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth())
             .withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends())
+            
.withReadSmallFilesCompletely(abfsConfiguration.readSmallFilesCompletely())
+            .withOptimizeFooterRead(abfsConfiguration.optimizeFooterRead())
             .withStreamStatistics(new AbfsInputStreamStatisticsImpl())
             .withShouldReadBufferSizeAlways(
                 abfsConfiguration.shouldReadBufferSizeAlways())
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index cb9c0de..3e1ff80 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -56,6 +56,8 @@ public final class ConfigurationKeys {
   public static final String AZURE_WRITE_MAX_REQUESTS_TO_QUEUE = 
"fs.azure.write.max.requests.to.queue";
   public static final String AZURE_WRITE_BUFFER_SIZE = 
"fs.azure.write.request.size";
   public static final String AZURE_READ_BUFFER_SIZE = 
"fs.azure.read.request.size";
+  public static final String AZURE_READ_SMALL_FILES_COMPLETELY = 
"fs.azure.read.smallfilescompletely";
+  public static final String AZURE_READ_OPTIMIZE_FOOTER_READ = 
"fs.azure.read.optimizefooterread";
   public static final String AZURE_BLOCK_SIZE_PROPERTY_NAME = 
"fs.azure.block.size";
   public static final String AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME = 
"fs.azure.block.location.impersonatedhost";
   public static final String AZURE_CONCURRENT_CONNECTION_VALUE_OUT = 
"fs.azure.concurrentRequestCount.out";
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index 27dafd0..8008206 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -50,13 +50,15 @@ public final class FileSystemConfigurations {
   public static final int 
DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_BACKOFF_INTERVAL = SIXTY_SECONDS;
   public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_DELTA_BACKOFF 
= 2;
 
-  private static final int ONE_KB = 1024;
-  private static final int ONE_MB = ONE_KB * ONE_KB;
+  public static final int ONE_KB = 1024;
+  public static final int ONE_MB = ONE_KB * ONE_KB;
 
   // Default upload and download buffer size
   public static final int DEFAULT_WRITE_BUFFER_SIZE = 8 * ONE_MB;  // 8 MB
   public static final int APPENDBLOB_MAX_WRITE_BUFFER_SIZE = 4 * ONE_MB;  // 4 
MB
   public static final int DEFAULT_READ_BUFFER_SIZE = 4 * ONE_MB;  // 4 MB
+  public static final boolean DEFAULT_READ_SMALL_FILES_COMPLETELY = false;
+  public static final boolean DEFAULT_OPTIMIZE_FOOTER_READ = false;
   public static final boolean DEFAULT_ALWAYS_READ_BUFFER_SIZE = false;
   public static final int DEFAULT_READ_AHEAD_BLOCK_SIZE = 4 * ONE_MB;
   public static final int MIN_BUFFER_SIZE = 16 * ONE_KB;  // 16 KB
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
index 3682bcb..1d109f4 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
@@ -38,6 +38,10 @@ import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationExcep
 import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
 import org.apache.hadoop.fs.azurebfs.utils.CachedSASToken;
 
+import static java.lang.Math.max;
+import static java.lang.Math.min;
+
+import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
 import static org.apache.hadoop.util.StringUtils.toLowerCase;
 
 /**
@@ -46,6 +50,9 @@ import static org.apache.hadoop.util.StringUtils.toLowerCase;
 public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
         StreamCapabilities {
   private static final Logger LOG = 
LoggerFactory.getLogger(AbfsInputStream.class);
+  //  Footer size is set to qualify for both ORC and parquet files
+  public static final int FOOTER_SIZE = 16 * ONE_KB;
+  public static final int MAX_OPTIMIZED_READ_ATTEMPTS = 2;
 
   private int readAheadBlockSize;
   private final AbfsClient client;
@@ -59,6 +66,7 @@ public class AbfsInputStream extends FSInputStream implements 
CanUnbuffer,
   private final boolean readAheadEnabled; // whether enable readAhead;
   private final boolean alwaysReadBufferSize;
 
+  private boolean firstRead = true;
   // SAS tokens can be re-used until they expire
   private CachedSASToken cachedSasToken;
   private byte[] buffer = null;            // will be initialized on first use
@@ -70,11 +78,21 @@ public class AbfsInputStream extends FSInputStream 
implements CanUnbuffer,
   //                                                      of valid bytes in 
buffer)
   private boolean closed = false;
 
+  //  Optimisations modify the pointer fields.
+  //  For better resilience the following fields are used to save the
+  //  existing state before optimization flows.
+  private int limitBkp;
+  private int bCursorBkp;
+  private long fCursorBkp;
+  private long fCursorAfterLastReadBkp;
+
   /** Stream statistics. */
   private final AbfsInputStreamStatistics streamStatistics;
   private long bytesFromReadAhead; // bytes read from readAhead; for testing
   private long bytesFromRemoteRead; // bytes read remotely; for testing
 
+  private final AbfsInputStreamContext context;
+
   public AbfsInputStream(
           final AbfsClient client,
           final Statistics statistics,
@@ -96,6 +114,7 @@ public class AbfsInputStream extends FSInputStream 
implements CanUnbuffer,
     this.cachedSasToken = new CachedSASToken(
         abfsInputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds());
     this.streamStatistics = abfsInputStreamContext.getStreamStatistics();
+    this.context = abfsInputStreamContext;
     readAheadBlockSize = abfsInputStreamContext.getReadAheadBlockSize();
 
     // Propagate the config values to ReadBufferManager so that the first 
instance
@@ -137,7 +156,13 @@ public class AbfsInputStream extends FSInputStream 
implements CanUnbuffer,
     }
     incrementReadOps();
     do {
-      lastReadBytes = readOneBlock(b, currentOff, currentLen);
+      if (shouldReadFully()) {
+        lastReadBytes = readFileCompletely(b, currentOff, currentLen);
+      } else if (shouldReadLastBlock()) {
+        lastReadBytes = readLastBlock(b, currentOff, currentLen);
+      } else {
+        lastReadBytes = readOneBlock(b, currentOff, currentLen);
+      }
       if (lastReadBytes > 0) {
         currentOff += lastReadBytes;
         currentLen -= lastReadBytes;
@@ -150,27 +175,24 @@ public class AbfsInputStream extends FSInputStream 
implements CanUnbuffer,
     return totalReadBytes > 0 ? totalReadBytes : lastReadBytes;
   }
 
-  private int readOneBlock(final byte[] b, final int off, final int len) 
throws IOException {
-    if (closed) {
-      throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
-    }
+  private boolean shouldReadFully() {
+    return this.firstRead && this.context.readSmallFilesCompletely()
+        && this.contentLength <= this.bufferSize;
+  }
 
-    Preconditions.checkNotNull(b);
-    LOG.debug("read one block requested b.length = {} off {} len {}", b.length,
-        off, len);
+  private boolean shouldReadLastBlock() {
+    long footerStart = max(0, this.contentLength - FOOTER_SIZE);
+    return this.firstRead && this.context.optimizeFooterRead()
+        && this.fCursor >= footerStart;
+  }
 
+  private int readOneBlock(final byte[] b, final int off, final int len) 
throws IOException {
     if (len == 0) {
       return 0;
     }
-
-    if (this.available() == 0) {
+    if (!validate(b, off, len)) {
       return -1;
     }
-
-    if (off < 0 || len < 0 || len > b.length - off) {
-      throw new IndexOutOfBoundsException();
-    }
-
     //If buffer is empty, then fill the buffer.
     if (bCursor == limit) {
       //If EOF, then return -1
@@ -197,6 +219,9 @@ public class AbfsInputStream extends FSInputStream 
implements CanUnbuffer,
           bytesRead = readInternal(fCursor, buffer, 0, b.length, true);
         }
       }
+      if (firstRead) {
+        firstRead = false;
+      }
 
       if (bytesRead == -1) {
         return -1;
@@ -206,11 +231,123 @@ public class AbfsInputStream extends FSInputStream 
implements CanUnbuffer,
       fCursor += bytesRead;
       fCursorAfterLastRead = fCursor;
     }
+    return copyToUserBuffer(b, off, len);
+  }
+
+  private int readFileCompletely(final byte[] b, final int off, final int len)
+      throws IOException {
+    if (len == 0) {
+      return 0;
+    }
+    if (!validate(b, off, len)) {
+      return -1;
+    }
+    savePointerState();
+    // data needs to be copied to user buffer from index bCursor; bCursor has
+    // to be the current fCursor
+    bCursor = (int) fCursor;
+    return optimisedRead(b, off, len, 0, contentLength);
+  }
+
+  private int readLastBlock(final byte[] b, final int off, final int len)
+      throws IOException {
+    if (len == 0) {
+      return 0;
+    }
+    if (!validate(b, off, len)) {
+      return -1;
+    }
+    savePointerState();
+    // data needs to be copied to user buffer from index bCursor.
+    // AbfsInputStream buffer is going to contain data from last block start. In
+    // that case bCursor will be set to fCursor - lastBlockStart
+    long lastBlockStart = max(0, contentLength - bufferSize);
+    bCursor = (int) (fCursor - lastBlockStart);
+    // 0 if contentlength is < buffersize
+    long actualLenToRead = min(bufferSize, contentLength);
+    return optimisedRead(b, off, len, lastBlockStart, actualLenToRead);
+  }
+
+  private int optimisedRead(final byte[] b, final int off, final int len,
+      final long readFrom, final long actualLen) throws IOException {
+    fCursor = readFrom;
+    int totalBytesRead = 0;
+    int lastBytesRead = 0;
+    try {
+      buffer = new byte[bufferSize];
+      for (int i = 0;
+           i < MAX_OPTIMIZED_READ_ATTEMPTS && fCursor < contentLength; i++) {
+        lastBytesRead = readInternal(fCursor, buffer, limit,
+            (int) actualLen - limit, true);
+        if (lastBytesRead > 0) {
+          totalBytesRead += lastBytesRead;
+          limit += lastBytesRead;
+          fCursor += lastBytesRead;
+          fCursorAfterLastRead = fCursor;
+        }
+      }
+    } catch (IOException e) {
+      LOG.debug("Optimized read failed. Defaulting to readOneBlock {}", e);
+      restorePointerState();
+      return readOneBlock(b, off, len);
+    } finally {
+      firstRead = false;
+    }
+    if (totalBytesRead < 1) {
+      restorePointerState();
+      return -1;
+    }
+    //  If the read was partial and the user requested part of data has
+    //  not read then fallback to readoneblock. When limit is smaller than
+    //  bCursor that means the user requested data has not been read.
+    if (fCursor < contentLength && bCursor > limit) {
+      restorePointerState();
+      return readOneBlock(b, off, len);
+    }
+    return copyToUserBuffer(b, off, len);
+  }
+
+  private void savePointerState() {
+    //  Saving the current state for fallback in case optimization fails
+    this.limitBkp = this.limit;
+    this.fCursorBkp = this.fCursor;
+    this.fCursorAfterLastReadBkp = this.fCursorAfterLastRead;
+    this.bCursorBkp = this.bCursor;
+  }
+
+  private void restorePointerState() {
+    //  Restoring the saved state since the optimized read attempt failed
+    this.limit = this.limitBkp;
+    this.fCursor = this.fCursorBkp;
+    this.fCursorAfterLastRead = this.fCursorAfterLastReadBkp;
+    this.bCursor = this.bCursorBkp;
+  }
+
+  private boolean validate(final byte[] b, final int off, final int len)
+      throws IOException {
+    if (closed) {
+      throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+    }
+
+    Preconditions.checkNotNull(b);
+    LOG.debug("read one block requested b.length = {} off {} len {}", b.length,
+        off, len);
 
+    if (this.available() == 0) {
+      return false;
+    }
+
+    if (off < 0 || len < 0 || len > b.length - off) {
+      throw new IndexOutOfBoundsException();
+    }
+    return true;
+  }
+
+  private int copyToUserBuffer(byte[] b, int off, int len){
     //If there is anything in the buffer, then return lesser of (requested 
bytes) and (bytes in buffer)
     //(bytes returned may be less than requested)
     int bytesRemaining = limit - bCursor;
-    int bytesToRead = Math.min(len, bytesRemaining);
+    int bytesToRead = min(len, bytesRemaining);
     System.arraycopy(buffer, bCursor, b, off, bytesToRead);
     bCursor += bytesToRead;
     if (statistics != null) {
@@ -224,7 +361,6 @@ public class AbfsInputStream extends FSInputStream 
implements CanUnbuffer,
     return bytesToRead;
   }
 
-
   private int readInternal(final long position, final byte[] b, final int 
offset, final int length,
                            final boolean bypassReadAhead) throws IOException {
     if (readAheadEnabled && !bypassReadAhead) {
@@ -239,7 +375,7 @@ public class AbfsInputStream extends FSInputStream 
implements CanUnbuffer,
       long nextOffset = position;
       // First read to queue needs to be of readBufferSize and later
       // of readAhead Block size
-      long nextSize = Math.min((long) bufferSize, contentLength - nextOffset);
+      long nextSize = min((long) bufferSize, contentLength - nextOffset);
       LOG.debug("read ahead enabled issuing readheads num = {}", 
numReadAheads);
       while (numReadAheads > 0 && nextOffset < contentLength) {
         LOG.debug("issuing read ahead requestedOffset = {} requested size {}",
@@ -248,7 +384,7 @@ public class AbfsInputStream extends FSInputStream 
implements CanUnbuffer,
         nextOffset = nextOffset + nextSize;
         numReadAheads--;
         // From next round onwards should be of readahead block size.
-        nextSize = Math.min((long) readAheadBlockSize, contentLength - 
nextOffset);
+        nextSize = min((long) readAheadBlockSize, contentLength - nextOffset);
       }
 
       // try reading from buffers first
@@ -572,4 +708,24 @@ public class AbfsInputStream extends FSInputStream 
implements CanUnbuffer,
     }
     return sb.toString();
   }
+
+  @VisibleForTesting
+  int getBCursor() {
+    return this.bCursor;
+  }
+
+  @VisibleForTesting
+  long getFCursor() {
+    return this.fCursor;
+  }
+
+  @VisibleForTesting
+  long getFCursorAfterLastRead() {
+    return this.fCursorAfterLastRead;
+  }
+
+  @VisibleForTesting
+  long getLimit() {
+    return this.limit;
+  }
 }
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
index ade0583..ab3d3b0 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
@@ -40,6 +40,10 @@ public class AbfsInputStreamContext extends 
AbfsStreamContext {
 
   private AbfsInputStreamStatistics streamStatistics;
 
+  private boolean readSmallFilesCompletely;
+
+  private boolean optimizeFooterRead;
+
   public AbfsInputStreamContext(final long 
sasTokenRenewPeriodForStreamsInSeconds) {
     super(sasTokenRenewPeriodForStreamsInSeconds);
   }
@@ -69,6 +73,18 @@ public class AbfsInputStreamContext extends 
AbfsStreamContext {
     return this;
   }
 
+  public AbfsInputStreamContext withReadSmallFilesCompletely(
+      final boolean readSmallFilesCompletely) {
+    this.readSmallFilesCompletely = readSmallFilesCompletely;
+    return this;
+  }
+
+  public AbfsInputStreamContext withOptimizeFooterRead(
+      final boolean optimizeFooterRead) {
+    this.optimizeFooterRead = optimizeFooterRead;
+    return this;
+  }
+
   public AbfsInputStreamContext withShouldReadBufferSizeAlways(
       final boolean alwaysReadBufferSize) {
     this.alwaysReadBufferSize = alwaysReadBufferSize;
@@ -110,6 +126,14 @@ public class AbfsInputStreamContext extends 
AbfsStreamContext {
     return streamStatistics;
   }
 
+  public boolean readSmallFilesCompletely() {
+    return this.readSmallFilesCompletely;
+  }
+
+  public boolean optimizeFooterRead() {
+    return this.optimizeFooterRead;
+  }
+
   public boolean shouldReadBufferSizeAlways() {
     return alwaysReadBufferSize;
   }
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java
new file mode 100644
index 0000000..44b0a36
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
+import org.junit.Test;
+
+import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+
+public class ITestAbfsInputStream extends AbstractAbfsIntegrationTest {
+
+  protected static final int HUNDRED = 100;
+
+  public ITestAbfsInputStream() throws Exception {
+  }
+
+  @Test
+  public void testWithNoOptimization() throws Exception {
+    for (int i = 2; i <= 7; i++) {
+      int fileSize = i * ONE_MB;
+      final AzureBlobFileSystem fs = getFileSystem(false, false, fileSize);
+      String fileName = methodName.getMethodName() + i;
+      byte[] fileContent = getRandomBytesArray(fileSize);
+      Path testFilePath = createFileWithContent(fs, fileName, fileContent);
+      testWithNoOptimization(fs, testFilePath, HUNDRED, fileContent);
+    }
+  }
+
+  protected void testWithNoOptimization(final FileSystem fs,
+      final Path testFilePath, final int seekPos, final byte[] fileContent)
+      throws IOException {
+    FSDataInputStream iStream = fs.open(testFilePath);
+    try {
+      AbfsInputStream abfsInputStream = (AbfsInputStream) iStream
+          .getWrappedStream();
+
+      iStream = new FSDataInputStream(abfsInputStream);
+      seek(iStream, seekPos);
+      long totalBytesRead = 0;
+      int length = HUNDRED * HUNDRED;
+      do {
+        byte[] buffer = new byte[length];
+        int bytesRead = iStream.read(buffer, 0, length);
+        totalBytesRead += bytesRead;
+        if ((totalBytesRead + seekPos) >= fileContent.length) {
+          length = (fileContent.length - seekPos) % length;
+        }
+        assertEquals(length, bytesRead);
+        assertContentReadCorrectly(fileContent,
+            (int) (seekPos + totalBytesRead - length), length, buffer);
+
+        assertTrue(abfsInputStream.getFCursor() >= seekPos + totalBytesRead);
+        assertTrue(abfsInputStream.getFCursorAfterLastRead() >= seekPos + 
totalBytesRead);
+        assertTrue(abfsInputStream.getBCursor() >= totalBytesRead % 
abfsInputStream.getBufferSize());
+        assertTrue(abfsInputStream.getLimit() >= totalBytesRead % 
abfsInputStream.getBufferSize());
+      } while (totalBytesRead + seekPos < fileContent.length);
+    } finally {
+      iStream.close();
+    }
+  }
+
+  @Test
+  public void testExceptionInOptimization() throws Exception {
+    for (int i = 2; i <= 7; i++) {
+      int fileSize = i * ONE_MB;
+      final AzureBlobFileSystem fs = getFileSystem(true, true, fileSize);
+      String fileName = methodName.getMethodName() + i;
+      byte[] fileContent = getRandomBytesArray(fileSize);
+      Path testFilePath = createFileWithContent(fs, fileName, fileContent);
+      testExceptionInOptimization(fs, testFilePath, fileSize - HUNDRED,
+          fileSize / 4, fileContent);
+    }
+  }
+
+  private void testExceptionInOptimization(final FileSystem fs,
+      final Path testFilePath,
+      final int seekPos, final int length, final byte[] fileContent)
+      throws IOException {
+
+    FSDataInputStream iStream = fs.open(testFilePath);
+    try {
+      AbfsInputStream abfsInputStream = (AbfsInputStream) iStream
+          .getWrappedStream();
+      abfsInputStream = spy(abfsInputStream);
+      doThrow(new IOException())
+          .doCallRealMethod()
+          .when(abfsInputStream)
+          .readRemote(anyLong(), any(), anyInt(), anyInt());
+
+      iStream = new FSDataInputStream(abfsInputStream);
+      verifyBeforeSeek(abfsInputStream);
+      seek(iStream, seekPos);
+      byte[] buffer = new byte[length];
+      int bytesRead = iStream.read(buffer, 0, length);
+      long actualLength = length;
+      if (seekPos + length > fileContent.length) {
+        long delta = seekPos + length - fileContent.length;
+        actualLength = length - delta;
+      }
+      assertEquals(bytesRead, actualLength);
+      assertContentReadCorrectly(fileContent, seekPos, (int) actualLength, 
buffer);
+      assertEquals(fileContent.length, abfsInputStream.getFCursor());
+      assertEquals(fileContent.length, 
abfsInputStream.getFCursorAfterLastRead());
+      assertEquals(actualLength, abfsInputStream.getBCursor());
+      assertTrue(abfsInputStream.getLimit() >= actualLength);
+    } finally {
+      iStream.close();
+    }
+  }
+
+  protected AzureBlobFileSystem getFileSystem(boolean readSmallFilesCompletely)
+      throws IOException {
+    final AzureBlobFileSystem fs = getFileSystem();
+    getAbfsStore(fs).getAbfsConfiguration()
+        .setReadSmallFilesCompletely(readSmallFilesCompletely);
+    return fs;
+  }
+
+  private AzureBlobFileSystem getFileSystem(boolean optimizeFooterRead,
+      boolean readSmallFileCompletely, int fileSize) throws IOException {
+    final AzureBlobFileSystem fs = getFileSystem();
+    getAbfsStore(fs).getAbfsConfiguration()
+        .setOptimizeFooterRead(optimizeFooterRead);
+    if (fileSize <= getAbfsStore(fs).getAbfsConfiguration()
+        .getReadBufferSize()) {
+      getAbfsStore(fs).getAbfsConfiguration()
+          .setReadSmallFilesCompletely(readSmallFileCompletely);
+    }
+    return fs;
+  }
+
+  protected byte[] getRandomBytesArray(int length) {
+    final byte[] b = new byte[length];
+    new Random().nextBytes(b);
+    return b;
+  }
+
+  protected Path createFileWithContent(FileSystem fs, String fileName,
+      byte[] fileContent) throws IOException {
+    Path testFilePath = path(fileName);
+    try (FSDataOutputStream oStream = fs.create(testFilePath)) {
+      oStream.write(fileContent);
+      oStream.flush();
+    }
+    return testFilePath;
+  }
+
+  protected AzureBlobFileSystemStore getAbfsStore(FileSystem fs)
+      throws NoSuchFieldException, IllegalAccessException {
+    AzureBlobFileSystem abfs = (AzureBlobFileSystem) fs;
+    Field abfsStoreField = AzureBlobFileSystem.class
+        .getDeclaredField("abfsStore");
+    abfsStoreField.setAccessible(true);
+    return (AzureBlobFileSystemStore) abfsStoreField.get(abfs);
+  }
+
+  protected Map<String, Long> getInstrumentationMap(FileSystem fs)
+      throws NoSuchFieldException, IllegalAccessException {
+    AzureBlobFileSystem abfs = (AzureBlobFileSystem) fs;
+    Field abfsCountersField = AzureBlobFileSystem.class
+        .getDeclaredField("abfsCounters");
+    abfsCountersField.setAccessible(true);
+    AbfsCounters abfsCounters = (AbfsCounters) abfsCountersField.get(abfs);
+    return abfsCounters.toMap();
+  }
+
+  protected void assertContentReadCorrectly(byte[] actualFileContent, int from,
+      int len, byte[] contentRead) {
+    for (int i = 0; i < len; i++) {
+      assertEquals(contentRead[i], actualFileContent[i + from]);
+    }
+  }
+
+  protected void assertBuffersAreNotEqual(byte[] actualContent,
+      byte[] contentRead, AbfsConfiguration conf) {
+    assertBufferEquality(actualContent, contentRead, conf, false);
+  }
+
+  protected void assertBuffersAreEqual(byte[] actualContent, byte[] 
contentRead,
+      AbfsConfiguration conf) {
+    assertBufferEquality(actualContent, contentRead, conf, true);
+  }
+
+  private void assertBufferEquality(byte[] actualContent, byte[] contentRead,
+      AbfsConfiguration conf, boolean assertEqual) {
+    int bufferSize = conf.getReadBufferSize();
+    int actualContentSize = actualContent.length;
+    int n = (actualContentSize < bufferSize) ? actualContentSize : bufferSize;
+    int matches = 0;
+    for (int i = 0; i < n; i++) {
+      if (actualContent[i] == contentRead[i]) {
+        matches++;
+      }
+    }
+    if (assertEqual) {
+      assertEquals(n, matches);
+    } else {
+      assertNotEquals(n, matches);
+    }
+  }
+
+  protected void seek(FSDataInputStream iStream, long seekPos)
+      throws IOException {
+    AbfsInputStream abfsInputStream = (AbfsInputStream) 
iStream.getWrappedStream();
+    verifyBeforeSeek(abfsInputStream);
+    iStream.seek(seekPos);
+    verifyAfterSeek(abfsInputStream, seekPos);
+  }
+
+  private void verifyBeforeSeek(AbfsInputStream abfsInputStream){
+    assertEquals(0, abfsInputStream.getFCursor());
+    assertEquals(-1, abfsInputStream.getFCursorAfterLastRead());
+    assertEquals(0, abfsInputStream.getLimit());
+    assertEquals(0, abfsInputStream.getBCursor());
+  }
+
+  private void verifyAfterSeek(AbfsInputStream abfsInputStream, long seekPos){
+    assertEquals(seekPos, abfsInputStream.getFCursor());
+    assertEquals(-1, abfsInputStream.getFCursorAfterLastRead());
+    assertEquals(0, abfsInputStream.getLimit());
+    assertEquals(0, abfsInputStream.getBCursor());
+  }
+}
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamReadFooter.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamReadFooter.java
new file mode 100644
index 0000000..09a810c
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamReadFooter.java
@@ -0,0 +1,358 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.junit.Test;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+
+import static java.lang.Math.max;
+import static java.lang.Math.min;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
+import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
+import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
+
+public class ITestAbfsInputStreamReadFooter extends ITestAbfsInputStream {
+
+  // Seek distances (in KB) used to land at different offsets inside the
+  // footer region of the test files.
+  private static final int TEN = 10;
+  private static final int TWENTY = 20;
+
+  public ITestAbfsInputStreamReadFooter() throws Exception {
+  }
+
+  // With footer-read optimization on, several footer reads should be served
+  // by a single backend call.
+  @Test
+  public void testOnlyOneServerCallIsMadeWhenTheConfIsTrue() throws Exception {
+    testNumBackendCalls(true);
+  }
+
+  // With the optimization off, each footer seek+read costs one backend call.
+  @Test
+  public void testMultipleServerCallsAreMadeWhenTheConfIsFalse()
+      throws Exception {
+    testNumBackendCalls(false);
+  }
+
+  /**
+   * Performs three reads at different offsets inside the footer region and
+   * asserts, via the CONNECTIONS_MADE statistic, that exactly one backend
+   * call is made when footer-read optimization is enabled and three when it
+   * is disabled. Repeated for file sizes of 1-4 MB.
+   */
+  private void testNumBackendCalls(boolean optimizeFooterRead)
+      throws Exception {
+    for (int i = 1; i <= 4; i++) {
+      int fileSize = i * ONE_MB;
+      final AzureBlobFileSystem fs = getFileSystem(optimizeFooterRead,
+          fileSize);
+      String fileName = methodName.getMethodName() + i;
+      byte[] fileContent = getRandomBytesArray(fileSize);
+      Path testFilePath = createFileWithContent(fs, fileName, fileContent);
+      int length = AbfsInputStream.FOOTER_SIZE;
+      try (FSDataInputStream iStream = fs.open(testFilePath)) {
+        byte[] buffer = new byte[length];
+
+        Map<String, Long> metricMap = getInstrumentationMap(fs);
+        long requestsMadeBeforeTest = metricMap
+            .get(CONNECTIONS_MADE.getStatName());
+
+        // three reads, all within FOOTER_SIZE/TEN KB/TWENTY KB of EOF
+        iStream.seek(fileSize - 8);
+        iStream.read(buffer, 0, length);
+
+        iStream.seek(fileSize - (TEN * ONE_KB));
+        iStream.read(buffer, 0, length);
+
+        iStream.seek(fileSize - (TWENTY * ONE_KB));
+        iStream.read(buffer, 0, length);
+
+        metricMap = getInstrumentationMap(fs);
+        long requestsMadeAfterTest = metricMap
+            .get(CONNECTIONS_MADE.getStatName());
+
+        if (optimizeFooterRead) {
+          assertEquals(1, requestsMadeAfterTest - requestsMadeBeforeTest);
+        } else {
+          assertEquals(3, requestsMadeAfterTest - requestsMadeBeforeTest);
+        }
+      }
+    }
+  }
+
+  // Seek/read permutations around the footer boundary (BEGIN, just before
+  // the footer, at the footer start) with the optimization on and off.
+  @Test
+  public void testSeekToBeginAndReadWithConfTrue() throws Exception {
+    testSeekAndReadWithConf(true, SeekTo.BEGIN);
+  }
+
+  @Test
+  public void testSeekToBeginAndReadWithConfFalse() throws Exception {
+    testSeekAndReadWithConf(false, SeekTo.BEGIN);
+  }
+
+  @Test
+  public void testSeekToBeforeFooterAndReadWithConfTrue() throws Exception {
+    testSeekAndReadWithConf(true, SeekTo.BEFORE_FOOTER_START);
+  }
+
+  @Test
+  public void testSeekToBeforeFooterAndReadWithConfFalse() throws Exception {
+    testSeekAndReadWithConf(false, SeekTo.BEFORE_FOOTER_START);
+  }
+
+  @Test
+  public void testSeekToFooterAndReadWithConfTrue() throws Exception {
+    testSeekAndReadWithConf(true, SeekTo.AT_FOOTER_START);
+  }
+
+  @Test
+  public void testSeekToFooterAndReadWithConfFalse() throws Exception {
+    testSeekAndReadWithConf(false, SeekTo.AT_FOOTER_START);
+  }
+
+  // Seek/read permutations after the footer start and at EOF, with the
+  // optimization on and off.
+  @Test
+  public void testSeekToAfterFooterAndReadWithConfTrue() throws Exception {
+    testSeekAndReadWithConf(true, SeekTo.AFTER_FOOTER_START);
+  }
+
+  // Renamed from testSeekToToAfterFooterAndReadWithConfFalse (doubled "To").
+  @Test
+  public void testSeekToAfterFooterAndReadWithConfFalse() throws Exception {
+    testSeekAndReadWithConf(false, SeekTo.AFTER_FOOTER_START);
+  }
+
+  @Test
+  public void testSeekToEndAndReadWithConfTrue() throws Exception {
+    testSeekAndReadWithConf(true, SeekTo.END);
+  }
+
+  @Test
+  public void testSeekToEndAndReadWithConfFalse() throws Exception {
+    testSeekAndReadWithConf(false, SeekTo.END);
+  }
+
+  /**
+   * Creates files of 2-6 MB, seeks to the position implied by {@code seekTo}
+   * and verifies a HUNDRED-byte read with the footer optimization toggled by
+   * {@code optimizeFooterRead}.
+   */
+  private void testSeekAndReadWithConf(boolean optimizeFooterRead,
+      SeekTo seekTo) throws Exception {
+    for (int i = 2; i <= 6; i++) {
+      int fileSize = i * ONE_MB;
+      final AzureBlobFileSystem fs = getFileSystem(optimizeFooterRead,
+          fileSize);
+      String fileName = methodName.getMethodName() + i;
+      byte[] fileContent = getRandomBytesArray(fileSize);
+      Path testFilePath = createFileWithContent(fs, fileName, fileContent);
+      seekReadAndTest(fs, testFilePath, seekPos(seekTo, fileSize), HUNDRED,
+          fileContent);
+    }
+  }
+
+  /**
+   * Maps a symbolic seek target to an absolute byte offset within a file of
+   * the given size, relative to the footer region.
+   */
+  private int seekPos(SeekTo seekTo, int fileSize) {
+    switch (seekTo) {
+    case BEGIN:
+      return 0;
+    case BEFORE_FOOTER_START:
+      return fileSize - AbfsInputStream.FOOTER_SIZE - 1;
+    case AT_FOOTER_START:
+      return fileSize - AbfsInputStream.FOOTER_SIZE;
+    case END:
+      return fileSize - 1;
+    default:
+      // SeekTo.AFTER_FOOTER_START
+      return fileSize - AbfsInputStream.FOOTER_SIZE + 1;
+    }
+  }
+
+  /**
+   * Seeks to {@code seekPos}, reads {@code length} bytes, and asserts the
+   * stream's internal state (fCursor, bCursor, limit) against what the
+   * footer-read optimization should produce, then verifies the bytes read
+   * into both the caller's buffer and the stream's internal buffer.
+   * (Local variable renamed from the misspelled "expectedBCurson".)
+   */
+  private void seekReadAndTest(final FileSystem fs, final Path testFilePath,
+      final int seekPos, final int length, final byte[] fileContent)
+      throws IOException, NoSuchFieldException, IllegalAccessException {
+    AbfsConfiguration conf = getAbfsStore(fs).getAbfsConfiguration();
+    long actualContentLength = fileContent.length;
+    try (FSDataInputStream iStream = fs.open(testFilePath)) {
+      AbfsInputStream abfsInputStream = (AbfsInputStream) iStream
+          .getWrappedStream();
+      long bufferSize = abfsInputStream.getBufferSize();
+      seek(iStream, seekPos);
+      byte[] buffer = new byte[length];
+      long bytesRead = iStream.read(buffer, 0, length);
+
+      long footerStart = max(0,
+          actualContentLength - AbfsInputStream.FOOTER_SIZE);
+      boolean optimizationOn =
+          conf.optimizeFooterRead() && seekPos >= footerStart;
+
+      //  the read is truncated if it would run past EOF
+      long actualLength = length;
+      if (seekPos + length > actualContentLength) {
+        long delta = seekPos + length - actualContentLength;
+        actualLength = length - delta;
+      }
+      long expectedLimit;
+      long expectedBCursor;
+      long expectedFCursor;
+      if (optimizationOn) {
+        //  the optimized read fetches the last bufferSize bytes (or the
+        //  whole file when smaller) in one call, leaving fCursor at EOF
+        if (actualContentLength <= bufferSize) {
+          expectedLimit = actualContentLength;
+          expectedBCursor = seekPos + actualLength;
+        } else {
+          expectedLimit = bufferSize;
+          long lastBlockStart = max(0, actualContentLength - bufferSize);
+          expectedBCursor = seekPos - lastBlockStart + actualLength;
+        }
+        expectedFCursor = actualContentLength;
+      } else {
+        if (seekPos + bufferSize < actualContentLength) {
+          expectedLimit = bufferSize;
+          expectedFCursor = bufferSize;
+        } else {
+          expectedLimit = actualContentLength - seekPos;
+          expectedFCursor = min(seekPos + bufferSize, actualContentLength);
+        }
+        expectedBCursor = actualLength;
+      }
+
+      assertEquals(expectedFCursor, abfsInputStream.getFCursor());
+      assertEquals(expectedFCursor, abfsInputStream.getFCursorAfterLastRead());
+      assertEquals(expectedLimit, abfsInputStream.getLimit());
+      assertEquals(expectedBCursor, abfsInputStream.getBCursor());
+      assertEquals(actualLength, bytesRead);
+      //  Verify user-content read
+      assertContentReadCorrectly(fileContent, seekPos, (int) actualLength,
+          buffer);
+      //  Verify data read to AbfsInputStream buffer
+      int from = seekPos;
+      if (optimizationOn) {
+        from = (int) max(0, actualContentLength - bufferSize);
+      }
+      assertContentReadCorrectly(fileContent, from,
+          (int) abfsInputStream.getLimit(), abfsInputStream.getBuffer());
+    }
+  }
+
+  // Verifies the optimized footer read recovers when the server initially
+  // returns short (10-byte) responses carrying none of the requested footer.
+  @Test
+  public void testPartialReadWithNoData()
+      throws Exception {
+    for (int i = 2; i <= 6; i++) {
+      int fileSize = i * ONE_MB;
+      final AzureBlobFileSystem fs = getFileSystem(true, fileSize);
+      String fileName = methodName.getMethodName() + i;
+      byte[] fileContent = getRandomBytesArray(fileSize);
+      Path testFilePath = createFileWithContent(fs, fileName, fileContent);
+      testPartialReadWithNoData(fs, testFilePath,
+          fileSize - AbfsInputStream.FOOTER_SIZE, AbfsInputStream.FOOTER_SIZE,
+          fileContent);
+    }
+  }
+
+  /**
+   * Stubs the first two readRemote calls to return only 10 bytes each (the
+   * third call behaves normally), then asserts that the stream still fills
+   * the user buffer correctly and ends with the file fully consumed.
+   */
+  private void testPartialReadWithNoData(final FileSystem fs,
+      final Path testFilePath, final int seekPos, final int length,
+      final byte[] fileContent)
+      throws IOException, NoSuchFieldException, IllegalAccessException {
+    FSDataInputStream iStream = fs.open(testFilePath);
+    try {
+      AbfsInputStream abfsInputStream = (AbfsInputStream) iStream
+          .getWrappedStream();
+      abfsInputStream = spy(abfsInputStream);
+      doReturn(10).doReturn(10).doCallRealMethod().when(abfsInputStream)
+          .readRemote(anyLong(), any(), anyInt(), anyInt());
+
+      // re-wrap the spy so reads go through the stubbed readRemote
+      iStream = new FSDataInputStream(abfsInputStream);
+      seek(iStream, seekPos);
+
+      byte[] buffer = new byte[length];
+      int bytesRead = iStream.read(buffer, 0, length);
+      assertEquals(length, bytesRead);
+      assertContentReadCorrectly(fileContent, seekPos, length, buffer);
+      assertEquals(fileContent.length, abfsInputStream.getFCursor());
+      assertEquals(length, abfsInputStream.getBCursor());
+      assertTrue(abfsInputStream.getLimit() >= length);
+    } finally {
+      iStream.close();
+    }
+  }
+
+  // Renamed from "testPartialReadWithSomeDat" (typo). Verifies the optimized
+  // footer read recovers when the server returns part of the requested data.
+  @Test
+  public void testPartialReadWithSomeData()
+      throws Exception {
+    for (int i = 3; i <= 6; i++) {
+      int fileSize = i * ONE_MB;
+      final AzureBlobFileSystem fs = getFileSystem(true, fileSize);
+      String fileName = methodName.getMethodName() + i;
+      byte[] fileContent = getRandomBytesArray(fileSize);
+      Path testFilePath = createFileWithContent(fs, fileName, fileContent);
+      testPartialReadWithSomeData(fs, testFilePath,
+          fileSize - AbfsInputStream.FOOTER_SIZE, AbfsInputStream.FOOTER_SIZE,
+          fileContent);
+    }
+  }
+
+  /**
+   * Stubs readRemote so the first call returns 10 bytes and the second
+   * returns everything except the last 2 bytes; the third call behaves
+   * normally. Asserts the read still completes and the buffer state reflects
+   * the final 2-byte fill.
+   */
+  private void testPartialReadWithSomeData(final FileSystem fs,
+      final Path testFilePath, final int seekPos, final int length,
+      final byte[] fileContent)
+      throws IOException, NoSuchFieldException, IllegalAccessException {
+    FSDataInputStream iStream = fs.open(testFilePath);
+    try {
+      AbfsInputStream abfsInputStream = (AbfsInputStream) iStream
+          .getWrappedStream();
+      abfsInputStream = spy(abfsInputStream);
+      //  first readRemote, will return first 10 bytes
+      //  second readRemote returns data till the last 2 bytes
+      int someDataLength = 2;
+      int secondReturnSize =
+          min(fileContent.length, abfsInputStream.getBufferSize()) - 10
+              - someDataLength;
+      doReturn(10).doReturn(secondReturnSize).doCallRealMethod()
+          .when(abfsInputStream)
+          .readRemote(anyLong(), any(), anyInt(), anyInt());
+
+      iStream = new FSDataInputStream(abfsInputStream);
+      seek(iStream, seekPos);
+
+      byte[] buffer = new byte[length];
+      int bytesRead = iStream.read(buffer, 0, length);
+      assertEquals(length, bytesRead);
+      assertEquals(fileContent.length, abfsInputStream.getFCursor());
+      //  someDataLength(2), because in the do-while loop in read, the 2nd loop
+      //  will go to readoneblock and that resets the bCursor to 0 as
+      //  bCursor == limit finally when the 2 bytes are read bCursor and limit
+      //  will be at someDataLength(2)
+      assertEquals(someDataLength, abfsInputStream.getBCursor());
+      assertEquals(someDataLength, abfsInputStream.getLimit());
+    } finally {
+      iStream.close();
+    }
+  }
+
+  /**
+   * Returns a file system with footer-read optimization toggled; disables
+   * small-file full reads when the file fits in one read buffer so the
+   * footer path (not the small-file path) is exercised.
+   */
+  private AzureBlobFileSystem getFileSystem(boolean optimizeFooterRead,
+      int fileSize) throws IOException {
+    final AzureBlobFileSystem fs = getFileSystem();
+    getAbfsStore(fs).getAbfsConfiguration()
+        .setOptimizeFooterRead(optimizeFooterRead);
+    if (fileSize <= getAbfsStore(fs).getAbfsConfiguration()
+        .getReadBufferSize()) {
+      getAbfsStore(fs).getAbfsConfiguration()
+          .setReadSmallFilesCompletely(false);
+    }
+    return fs;
+  }
+
+  // Symbolic seek targets relative to the file's footer region.
+  private enum SeekTo {
+    BEGIN, AT_FOOTER_START, BEFORE_FOOTER_START, AFTER_FOOTER_START, END
+  }
+}
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamSmallFileReads.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamSmallFileReads.java
new file mode 100644
index 0000000..ff03c0e
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamSmallFileReads.java
@@ -0,0 +1,326 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.junit.Test;
+
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
+import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
+import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
+
+public class ITestAbfsInputStreamSmallFileReads extends ITestAbfsInputStream {
+
+  public ITestAbfsInputStreamSmallFileReads() throws Exception {
+  }
+
+  // With small-file full read on, three reads at different offsets of a file
+  // that fits the buffer should cost a single backend call.
+  @Test
+  public void testOnlyOneServerCallIsMadeWhenTheConfIsTrue() throws Exception {
+    testNumBackendCalls(true);
+  }
+
+  // With the config off, each read costs its own backend call.
+  @Test
+  public void testMultipleServerCallsAreMadeWhenTheConfIsFalse()
+      throws Exception {
+    testNumBackendCalls(false);
+  }
+
+  /**
+   * Reads ONE_KB at the end, middle and beginning of files of 1-4 MB and
+   * asserts via the CONNECTIONS_MADE statistic that one backend call is made
+   * when small files are read completely, three otherwise.
+   */
+  private void testNumBackendCalls(boolean readSmallFilesCompletely)
+      throws Exception {
+    final AzureBlobFileSystem fs = getFileSystem(readSmallFilesCompletely);
+    for (int i = 1; i <= 4; i++) {
+      String fileName = methodName.getMethodName() + i;
+      int fileSize = i * ONE_MB;
+      byte[] fileContent = getRandomBytesArray(fileSize);
+      Path testFilePath = createFileWithContent(fs, fileName, fileContent);
+      int length = ONE_KB;
+      try (FSDataInputStream iStream = fs.open(testFilePath)) {
+        byte[] buffer = new byte[length];
+
+        Map<String, Long> metricMap = getInstrumentationMap(fs);
+        long requestsMadeBeforeTest = metricMap
+            .get(CONNECTIONS_MADE.getStatName());
+
+        iStream.seek(seekPos(SeekTo.END, fileSize, length));
+        iStream.read(buffer, 0, length);
+
+        iStream.seek(seekPos(SeekTo.MIDDLE, fileSize, length));
+        iStream.read(buffer, 0, length);
+
+        iStream.seek(seekPos(SeekTo.BEGIN, fileSize, length));
+        iStream.read(buffer, 0, length);
+
+        metricMap = getInstrumentationMap(fs);
+        long requestsMadeAfterTest = metricMap
+            .get(CONNECTIONS_MADE.getStatName());
+
+        if (readSmallFilesCompletely) {
+          assertEquals(1, requestsMadeAfterTest - requestsMadeBeforeTest);
+        } else {
+          assertEquals(3, requestsMadeAfterTest - requestsMadeBeforeTest);
+        }
+      }
+    }
+  }
+
+  // Renamed from "Begining" (typo). 2-4 MB files fit the read buffer
+  // ("small"); 5-6 MB files do not ("big").
+  @Test
+  public void testSeekToBeginningAndReadSmallFileWithConfTrue()
+      throws Exception {
+    testSeekAndReadWithConf(SeekTo.BEGIN, 2, 4, true);
+  }
+
+  @Test
+  public void testSeekToBeginningAndReadSmallFileWithConfFalse()
+      throws Exception {
+    testSeekAndReadWithConf(SeekTo.BEGIN, 2, 4, false);
+  }
+
+  @Test
+  public void testSeekToBeginningAndReadBigFileWithConfTrue() throws Exception {
+    testSeekAndReadWithConf(SeekTo.BEGIN, 5, 6, true);
+  }
+
+  @Test
+  public void testSeekToBeginningAndReadBigFileWithConfFalse()
+      throws Exception {
+    testSeekAndReadWithConf(SeekTo.BEGIN, 5, 6, false);
+  }
+
+  // Seek-to-end variants; last method renamed from the garbled
+  // "testSeekToEndAndReaBigFiledWithConfFalse".
+  @Test
+  public void testSeekToEndAndReadSmallFileWithConfTrue() throws Exception {
+    testSeekAndReadWithConf(SeekTo.END, 2, 4, true);
+  }
+
+  @Test
+  public void testSeekToEndAndReadSmallFileWithConfFalse() throws Exception {
+    testSeekAndReadWithConf(SeekTo.END, 2, 4, false);
+  }
+
+  @Test
+  public void testSeekToEndAndReadBigFileWithConfTrue() throws Exception {
+    testSeekAndReadWithConf(SeekTo.END, 5, 6, true);
+  }
+
+  @Test
+  public void testSeekToEndAndReadBigFileWithConfFalse() throws Exception {
+    testSeekAndReadWithConf(SeekTo.END, 5, 6, false);
+  }
+
+  @Test
+  public void testSeekToMiddleAndReadSmallFileWithConfTrue() throws Exception {
+    testSeekAndReadWithConf(SeekTo.MIDDLE, 2, 4, true);
+  }
+
+  @Test
+  public void testSeekToMiddleAndReadSmallFileWithConfFalse() throws Exception 
{
+    testSeekAndReadWithConf(SeekTo.MIDDLE, 2, 4, false);
+  }
+
+  @Test
+  public void testSeekToMiddleAndReaBigFileWithConfTrue() throws Exception {
+    testSeekAndReadWithConf(SeekTo.MIDDLE, 5, 6, true);
+  }
+
+  @Test
+  public void testSeekToMiddleAndReadBigFileWithConfFalse() throws Exception {
+    testSeekAndReadWithConf(SeekTo.MIDDLE, 5, 6, false);
+  }
+
+  /**
+   * For each file size in [startFileSizeInMB, endFileSizeInMB], seeks to the
+   * position implied by {@code seekTo} and verifies a ONE_KB read with the
+   * small-file full-read config toggled by {@code readSmallFilesCompletely}.
+   */
+  private void testSeekAndReadWithConf(SeekTo seekTo, int startFileSizeInMB,
+      int endFileSizeInMB, boolean readSmallFilesCompletely) throws Exception {
+    final AzureBlobFileSystem fs = getFileSystem(readSmallFilesCompletely);
+    for (int i = startFileSizeInMB; i <= endFileSizeInMB; i++) {
+      String fileName = methodName.getMethodName() + i;
+      int fileSize = i * ONE_MB;
+      byte[] fileContent = getRandomBytesArray(fileSize);
+      Path testFilePath = createFileWithContent(fs, fileName, fileContent);
+      int length = ONE_KB;
+      int seekPos = seekPos(seekTo, fileSize, length);
+      seekReadAndTest(fs, testFilePath, seekPos, length, fileContent);
+    }
+  }
+
+  /**
+   * Maps a symbolic seek target to an absolute offset; END leaves exactly
+   * {@code length} bytes remaining to read.
+   */
+  private int seekPos(SeekTo seekTo, int fileSize, int length) {
+    switch (seekTo) {
+    case BEGIN:
+      return 0;
+    case END:
+      return fileSize - length;
+    default:
+      // SeekTo.MIDDLE
+      return fileSize / 2;
+    }
+  }
+
+  /**
+   * Seeks to seekPos, reads {@code length} bytes and asserts the stream's
+   * internal state against what the small-file full-read optimization should
+   * produce. (Fixed the inverted expected/actual order in the bytesRead
+   * assertion, matching JUnit's expected-first convention used elsewhere in
+   * this file.)
+   */
+  private void seekReadAndTest(FileSystem fs, Path testFilePath, int seekPos,
+      int length, byte[] fileContent)
+      throws IOException, NoSuchFieldException, IllegalAccessException {
+    AbfsConfiguration conf = getAbfsStore(fs).getAbfsConfiguration();
+    try (FSDataInputStream iStream = fs.open(testFilePath)) {
+      seek(iStream, seekPos);
+      byte[] buffer = new byte[length];
+      int bytesRead = iStream.read(buffer, 0, length);
+      assertEquals(length, bytesRead);
+      assertContentReadCorrectly(fileContent, seekPos, length, buffer);
+      AbfsInputStream abfsInputStream = (AbfsInputStream) iStream
+          .getWrappedStream();
+
+      final int readBufferSize = conf.getReadBufferSize();
+      final int fileContentLength = fileContent.length;
+      final boolean smallFile = fileContentLength <= readBufferSize;
+      int expectedLimit, expectedFCursor;
+      int expectedBCursor;
+      if (conf.readSmallFilesCompletely() && smallFile) {
+        // whole file pulled into the buffer in one go
+        assertBuffersAreEqual(fileContent, abfsInputStream.getBuffer(), conf);
+        expectedFCursor = fileContentLength;
+        expectedLimit = fileContentLength;
+        expectedBCursor = seekPos + length;
+      } else {
+        if (seekPos == 0) {
+          assertBuffersAreEqual(fileContent, abfsInputStream.getBuffer(), conf);
+        } else {
+          assertBuffersAreNotEqual(fileContent, abfsInputStream.getBuffer(),
+              conf);
+        }
+        expectedBCursor = length;
+        expectedFCursor = (fileContentLength < (seekPos + readBufferSize))
+            ? fileContentLength
+            : (seekPos + readBufferSize);
+        expectedLimit = (fileContentLength < (seekPos + readBufferSize))
+            ? (fileContentLength - seekPos)
+            : readBufferSize;
+      }
+      assertEquals(expectedFCursor, abfsInputStream.getFCursor());
+      assertEquals(expectedFCursor, abfsInputStream.getFCursorAfterLastRead());
+      assertEquals(expectedBCursor, abfsInputStream.getBCursor());
+      assertEquals(expectedLimit, abfsInputStream.getLimit());
+    }
+  }
+
+  // Verifies the small-file full read recovers when the server initially
+  // returns short (10-byte) responses.
+  @Test
+  public void testPartialReadWithNoData() throws Exception {
+    for (int i = 2; i <= 4; i++) {
+      int fileSize = i * ONE_MB;
+      final AzureBlobFileSystem fs = getFileSystem(true);
+      String fileName = methodName.getMethodName() + i;
+      byte[] fileContent = getRandomBytesArray(fileSize);
+      Path testFilePath = createFileWithContent(fs, fileName, fileContent);
+      partialReadWithNoData(fs, testFilePath, fileSize / 2, fileSize / 4,
+          fileContent);
+    }
+  }
+
+  /**
+   * Stubs the first two readRemote calls to return only 10 bytes each (the
+   * third behaves normally), then asserts the read still completes and the
+   * whole file ends up consumed. (Fixed the inverted expected/actual order
+   * in the bytesRead assertion to match JUnit's expected-first convention.)
+   */
+  private void partialReadWithNoData(final FileSystem fs,
+      final Path testFilePath,
+      final int seekPos, final int length, final byte[] fileContent)
+      throws IOException {
+
+    FSDataInputStream iStream = fs.open(testFilePath);
+    try {
+      AbfsInputStream abfsInputStream = (AbfsInputStream) iStream
+          .getWrappedStream();
+      abfsInputStream = spy(abfsInputStream);
+      doReturn(10)
+          .doReturn(10)
+          .doCallRealMethod()
+          .when(abfsInputStream)
+          .readRemote(anyLong(), any(), anyInt(), anyInt());
+
+      // re-wrap the spy so reads go through the stubbed readRemote
+      iStream = new FSDataInputStream(abfsInputStream);
+      seek(iStream, seekPos);
+      byte[] buffer = new byte[length];
+      int bytesRead = iStream.read(buffer, 0, length);
+      assertEquals(length, bytesRead);
+      assertContentReadCorrectly(fileContent, seekPos, length, buffer);
+      assertEquals(fileContent.length, abfsInputStream.getFCursor());
+      assertEquals(fileContent.length,
+          abfsInputStream.getFCursorAfterLastRead());
+      assertEquals(length, abfsInputStream.getBCursor());
+      assertTrue(abfsInputStream.getLimit() >= length);
+    } finally {
+      iStream.close();
+    }
+  }
+
+  // Verifies the small-file full read recovers when the server returns part
+  // of the requested range before completing.
+  @Test
+  public void testPartialReadWithSomeData() throws Exception {
+    for (int i = 2; i <= 4; i++) {
+      int fileSize = i * ONE_MB;
+      final AzureBlobFileSystem fs = getFileSystem(true);
+      String fileName = methodName.getMethodName() + i;
+      byte[] fileContent = getRandomBytesArray(fileSize);
+      Path testFilePath = createFileWithContent(fs, fileName, fileContent);
+      partialReadWithSomeData(fs, testFilePath, fileSize / 2,
+          fileSize / 4, fileContent);
+    }
+  }
+
+  /**
+   * Stubs readRemote so the first call returns 10 bytes and the second call
+   * returns enough to pass seekPos by someDataLength bytes; the third call
+   * behaves normally. Asserts the read completes and the cursors advanced
+   * past the requested range.
+   */
+  private void partialReadWithSomeData(final FileSystem fs,
+      final Path testFilePath,
+      final int seekPos, final int length, final byte[] fileContent)
+      throws IOException, NoSuchFieldException, IllegalAccessException {
+    FSDataInputStream iStream = fs.open(testFilePath);
+    try {
+      AbfsInputStream abfsInputStream = (AbfsInputStream) iStream
+          .getWrappedStream();
+      abfsInputStream = spy(abfsInputStream);
+      //  first readRemote will return the first 10 bytes; the second returns
+      //  enough to reach seekPos (10 bytes were already read) plus
+      //  someDataLength(10) extra bytes of user-requested data
+      int someDataLength = 10;
+      int secondReturnSize = seekPos - 10 + someDataLength;
+      doReturn(10)
+          .doReturn(secondReturnSize)
+          .doCallRealMethod()
+          .when(abfsInputStream)
+          .readRemote(anyLong(), any(), anyInt(), anyInt());
+
+      iStream = new FSDataInputStream(abfsInputStream);
+      seek(iStream, seekPos);
+
+      byte[] buffer = new byte[length];
+      int bytesRead = iStream.read(buffer, 0, length);
+      assertEquals(length, bytesRead);
+      assertTrue(abfsInputStream.getFCursor() > seekPos + length);
+      assertTrue(abfsInputStream.getFCursorAfterLastRead() > seekPos + length);
+      //  Optimized read was not complete, but it got some user requested data
+      //  from the server. So obviously the buffer will contain data beyond
+      //  seekPos + len
+      assertEquals(length - someDataLength, abfsInputStream.getBCursor());
+      assertTrue(abfsInputStream.getLimit() > length - someDataLength);
+    } finally {
+      iStream.close();
+    }
+  }
+
+  private enum SeekTo {BEGIN, MIDDLE, END}
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to