This is an automated email from the ASF dual-hosted git repository.

surendralilhore pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new dcddc6a  HADOOP-17682. ABFS: Support FileStatus input to OpenFileWithOptions() via OpenFileParameters (#2975)
dcddc6a is described below

commit dcddc6a59f687f40062383aec2fbb548d011821d
Author: sumangala-patki <70206833+sumangala-pa...@users.noreply.github.com>
AuthorDate: Wed Aug 18 19:14:10 2021 +0530

    HADOOP-17682. ABFS: Support FileStatus input to OpenFileWithOptions() via OpenFileParameters (#2975)
---
 .../hadoop/fs/azurebfs/AzureBlobFileSystem.java    |  20 ++--
 .../fs/azurebfs/AzureBlobFileSystemStore.java      |  71 ++++++++-----
 .../fs/azurebfs/AbstractAbfsIntegrationTest.java   |  10 ++
 .../fs/azurebfs/services/TestAbfsInputStream.java  | 115 ++++++++++++++++++++-
 4 files changed, 183 insertions(+), 33 deletions(-)
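
For context, the change lets a caller hand an already-fetched FileStatus to
the openFile() builder; openFileWithOptions() then forwards it inside
OpenFileParameters so the ABFS store can reuse contentLength and eTag instead
of issuing another GetPathStatus (HEAD) request. A minimal caller-side
sketch, assuming a live FileSystem "fs" and an existing file "path"
(hypothetical names, not part of the patch):

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    class OpenWithStatusSketch {
      static FSDataInputStream openWithStatus(FileSystem fs, Path path)
          throws Exception {
        // Fetch the status once; it must be up to date, since the store
        // will take contentLength and eTag from it.
        FileStatus status = fs.getFileStatus(path);
        return fs.openFile(path)
            .withFileStatus(status) // forwarded via OpenFileParameters
            .build()
            .get();
      }
    }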

diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
index a8bf7c1..9127428 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
@@ -208,16 +208,15 @@ public class AzureBlobFileSystem extends FileSystem
   }
 
   private FSDataInputStream open(final Path path,
-      final Optional<Configuration> options) throws IOException {
+      final Optional<OpenFileParameters> parameters) throws IOException {
     statIncrement(CALL_OPEN);
     Path qualifiedPath = makeQualified(path);
 
     try {
       TracingContext tracingContext = new TracingContext(clientCorrelationId,
-          fileSystemId, FSOperationType.OPEN, tracingHeaderFormat,
-          listener);
-      InputStream inputStream = abfsStore.openFileForRead(qualifiedPath,
-          options, statistics, tracingContext);
+          fileSystemId, FSOperationType.OPEN, tracingHeaderFormat, listener);
+      InputStream inputStream = abfsStore
+          .openFileForRead(qualifiedPath, parameters, statistics, tracingContext);
       return new FSDataInputStream(inputStream);
     } catch(AzureBlobFileSystemException ex) {
       checkException(path, ex);
@@ -225,6 +224,15 @@ public class AzureBlobFileSystem extends FileSystem
     }
   }
 
+  /**
+   * Takes config and other options through
+   * {@link org.apache.hadoop.fs.impl.OpenFileParameters}. Ensure that
+   * FileStatus entered is up-to-date, as it will be used to create the
+   * InputStream (with info such as contentLength, eTag)
+   * @param path The location of file to be opened
+   * @param parameters OpenFileParameters instance; can hold FileStatus,
+   *                   Configuration, bufferSize and mandatoryKeys
+   */
   @Override
   protected CompletableFuture<FSDataInputStream> openFileWithOptions(
       final Path path, final OpenFileParameters parameters) throws IOException {
@@ -235,7 +243,7 @@ public class AzureBlobFileSystem extends FileSystem
         "for " + path);
     return LambdaUtils.eval(
         new CompletableFuture<>(), () ->
-            open(path, Optional.of(parameters.getOptions())));
+            open(path, Optional.of(parameters)));
   }
 
   @Override
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index 3a527f7..de6f676 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -115,6 +115,7 @@ import org.apache.hadoop.fs.azurebfs.utils.CRC64;
 import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
+import org.apache.hadoop.fs.impl.OpenFileParameters;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsAction;
@@ -129,6 +130,8 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_HYP
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_PLUS;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_STAR;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_UNDERSCORE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DIRECTORY;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FILE;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TOKEN_VERSION;
@@ -669,44 +672,64 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
 
   public AbfsInputStream openFileForRead(final Path path,
       final FileSystem.Statistics statistics, TracingContext tracingContext)
-      throws AzureBlobFileSystemException {
-    return openFileForRead(path, Optional.empty(), statistics, tracingContext);
+      throws IOException {
+    return openFileForRead(path, Optional.empty(), statistics,
+        tracingContext);
   }
 
-  public AbfsInputStream openFileForRead(final Path path,
-      final Optional<Configuration> options,
+  public AbfsInputStream openFileForRead(Path path,
+      final Optional<OpenFileParameters> parameters,
       final FileSystem.Statistics statistics, TracingContext tracingContext)
-      throws AzureBlobFileSystemException {
-    try (AbfsPerfInfo perfInfo = startTracking("openFileForRead", "getPathStatus")) {
+      throws IOException {
+    try (AbfsPerfInfo perfInfo = startTracking("openFileForRead",
+        "getPathStatus")) {
       LOG.debug("openFileForRead filesystem: {} path: {}",
-              client.getFileSystem(),
-              path);
+          client.getFileSystem(), path);
 
+      FileStatus fileStatus = parameters.map(OpenFileParameters::getStatus)
+          .orElse(null);
       String relativePath = getRelativePath(path);
-
-      final AbfsRestOperation op = client
-          .getPathStatus(relativePath, false, tracingContext);
-      perfInfo.registerResult(op.getResult());
-
-      final String resourceType = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
-      final long contentLength = Long.parseLong(op.getResult().getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
-      final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
+      String resourceType, eTag;
+      long contentLength;
+      if (fileStatus instanceof VersionedFileStatus) {
+        path = path.makeQualified(this.uri, path);
+        Preconditions.checkArgument(fileStatus.getPath().equals(path),
+            String.format(
+                "Filestatus path [%s] does not match with given path [%s]",
+                fileStatus.getPath(), path));
+        resourceType = fileStatus.isFile() ? FILE : DIRECTORY;
+        contentLength = fileStatus.getLen();
+        eTag = ((VersionedFileStatus) fileStatus).getVersion();
+      } else {
+        if (fileStatus != null) {
+          LOG.warn(
+              "Fallback to getPathStatus REST call as provided filestatus "
+                  + "is not of type VersionedFileStatus");
+        }
+        AbfsHttpOperation op = client.getPathStatus(relativePath, false,
+            tracingContext).getResult();
+        resourceType = op.getResponseHeader(
+            HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
+        contentLength = Long.parseLong(
+            op.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
+        eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG);
+      }
 
       if (parseIsDirectory(resourceType)) {
         throw new AbfsRestOperationException(
-                AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(),
-                AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(),
-                "openFileForRead must be used with files and not directories",
-                null);
+            AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(),
+            AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(),
+            "openFileForRead must be used with files and not directories",
+            null);
       }
 
       perfInfo.registerSuccess(true);
 
       // Add statistics for InputStream
-      return new AbfsInputStream(client, statistics,
-              relativePath, contentLength,
-              populateAbfsInputStreamContext(options),
-              eTag, tracingContext);
+      return new AbfsInputStream(client, statistics, relativePath,
+          contentLength, populateAbfsInputStreamContext(
+          parameters.map(OpenFileParameters::getOptions)),
+          eTag, tracingContext);
     }
   }
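
A note on the branch above: only a status that is actually ABFS's
VersionedFileStatus short-circuits the HEAD request; any other FileStatus is
logged with a warning and openFileForRead() falls back to the getPathStatus
REST call. The Preconditions check also means a status for the wrong path
fails fast. A hedged sketch of that failure mode ("fs", "path" and
"otherPath" are hypothetical names):

    // Hypothetical misuse: the status belongs to a different file than the
    // one being opened.
    static void mismatchExample(FileSystem fs, Path path, Path otherPath)
        throws Exception {
      FileStatus status = fs.getFileStatus(path);
      try {
        fs.openFile(otherPath).withFileStatus(status).build().get();
      } catch (java.util.concurrent.ExecutionException e) {
        // Wraps the IllegalArgumentException from the path check; the test
        // below intercepts exactly this case.
      }
    }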
 
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
index ae24cf4..d4e44c3 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
 import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager;
+import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
 import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
 import org.apache.hadoop.fs.azurebfs.services.AuthType;
 import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore;
@@ -429,6 +430,15 @@ public abstract class AbstractAbfsIntegrationTest extends
     return fs.getAbfsStore();
   }
 
+  public AbfsClient getAbfsClient(final AzureBlobFileSystemStore abfsStore) {
+    return abfsStore.getClient();
+  }
+
+  public void setAbfsClient(AzureBlobFileSystemStore abfsStore,
+      AbfsClient client) {
+    abfsStore.setClient(client);
+  }
+
   public Path makeQualified(Path path) throws java.io.IOException {
     return getFileSystem().makeQualified(path);
   }
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
index 62326e0..b5ae9b7 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
@@ -19,31 +19,40 @@
 package org.apache.hadoop.fs.azurebfs.services;
 
 import java.io.IOException;
-
-import org.junit.Assert;
-import org.junit.Test;
 import java.util.Arrays;
+import java.util.Optional;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
 
 import org.assertj.core.api.Assertions;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
 import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TimeoutException;
 import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
 import org.apache.hadoop.fs.azurebfs.utils.TestCachedSASToken;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.apache.hadoop.fs.impl.OpenFileParameters;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -192,6 +201,106 @@ public class TestAbfsInputStream extends
     ReadBufferManager.getBufferManager().setThresholdAgeMilliseconds(REDUCED_READ_BUFFER_AGE_THRESHOLD);
   }
 
+  private void writeBufferToNewFile(Path testFile, byte[] buffer) throws IOException {
+    AzureBlobFileSystem fs = getFileSystem();
+    fs.create(testFile);
+    FSDataOutputStream out = fs.append(testFile);
+    out.write(buffer);
+    out.close();
+  }
+
+  private void verifyOpenWithProvidedStatus(Path path, FileStatus fileStatus,
+      byte[] buf, AbfsRestOperationType source)
+      throws IOException, ExecutionException, InterruptedException {
+    byte[] readBuf = new byte[buf.length];
+    AzureBlobFileSystem fs = getFileSystem();
+    FutureDataInputStreamBuilder builder = fs.openFile(path);
+    builder.withFileStatus(fileStatus);
+    FSDataInputStream in = builder.build().get();
+    assertEquals(String.format(
+        "Open with fileStatus [from %s result]: Incorrect number of bytes 
read",
+        source), buf.length, in.read(readBuf));
+    assertArrayEquals(String
+        .format("Open with fileStatus [from %s result]: Incorrect read data",
+            source), readBuf, buf);
+  }
+
+  private void checkGetPathStatusCalls(Path testFile, FileStatus fileStatus,
+      AzureBlobFileSystemStore abfsStore, AbfsClient mockClient,
+      AbfsRestOperationType source, TracingContext tracingContext)
+      throws IOException {
+
+    // verify GetPathStatus not invoked when FileStatus is provided
+    abfsStore.openFileForRead(testFile, Optional
+        .ofNullable(new OpenFileParameters().withStatus(fileStatus)), null, tracingContext);
+    verify(mockClient, times(0).description((String.format(
+        "FileStatus [from %s result] provided, GetFileStatus should not be invoked",
+        source)))).getPathStatus(anyString(), anyBoolean(), any(TracingContext.class));
+
+    // verify GetPathStatus invoked when FileStatus not provided
+    abfsStore.openFileForRead(testFile,
+        Optional.empty(), null,
+        tracingContext);
+    verify(mockClient, times(1).description(
+        "GetPathStatus should be invoked when FileStatus not provided"))
+        .getPathStatus(anyString(), anyBoolean(), any(TracingContext.class));
+
+    Mockito.reset(mockClient); //clears invocation count for next test case
+  }
+
+  @Test
+  public void testOpenFileWithOptions() throws Exception {
+    AzureBlobFileSystem fs = getFileSystem();
+    String testFolder = "/testFolder";
+    Path smallTestFile = new Path(testFolder + "/testFile0");
+    Path largeTestFile = new Path(testFolder + "/testFile1");
+    fs.mkdirs(new Path(testFolder));
+    int readBufferSize = getConfiguration().getReadBufferSize();
+    byte[] smallBuffer = new byte[5];
+    byte[] largeBuffer = new byte[readBufferSize + 5];
+    new Random().nextBytes(smallBuffer);
+    new Random().nextBytes(largeBuffer);
+    writeBufferToNewFile(smallTestFile, smallBuffer);
+    writeBufferToNewFile(largeTestFile, largeBuffer);
+
+    FileStatus[] getFileStatusResults = {fs.getFileStatus(smallTestFile),
+        fs.getFileStatus(largeTestFile)};
+    FileStatus[] listStatusResults = fs.listStatus(new Path(testFolder));
+
+    // open with fileStatus from GetPathStatus
+    verifyOpenWithProvidedStatus(smallTestFile, getFileStatusResults[0],
+        smallBuffer, AbfsRestOperationType.GetPathStatus);
+    verifyOpenWithProvidedStatus(largeTestFile, getFileStatusResults[1],
+        largeBuffer, AbfsRestOperationType.GetPathStatus);
+
+    // open with fileStatus from ListStatus
+    verifyOpenWithProvidedStatus(smallTestFile, listStatusResults[0], smallBuffer,
+        AbfsRestOperationType.ListPaths);
+    verifyOpenWithProvidedStatus(largeTestFile, listStatusResults[1], largeBuffer,
+        AbfsRestOperationType.ListPaths);
+
+    // verify number of GetPathStatus invocations
+    AzureBlobFileSystemStore abfsStore = getAbfsStore(fs);
+    AbfsClient mockClient = spy(getAbfsClient(abfsStore));
+    setAbfsClient(abfsStore, mockClient);
+    TracingContext tracingContext = getTestTracingContext(fs, false);
+    checkGetPathStatusCalls(smallTestFile, getFileStatusResults[0],
+        abfsStore, mockClient, AbfsRestOperationType.GetPathStatus, tracingContext);
+    checkGetPathStatusCalls(largeTestFile, getFileStatusResults[1],
+        abfsStore, mockClient, AbfsRestOperationType.GetPathStatus, tracingContext);
+    checkGetPathStatusCalls(smallTestFile, listStatusResults[0],
+        abfsStore, mockClient, AbfsRestOperationType.ListPaths, tracingContext);
+    checkGetPathStatusCalls(largeTestFile, listStatusResults[1],
+        abfsStore, mockClient, AbfsRestOperationType.ListPaths, tracingContext);
+
+    // Verify with incorrect filestatus
+    getFileStatusResults[0].setPath(new Path("wrongPath"));
+    intercept(ExecutionException.class,
+        () -> verifyOpenWithProvidedStatus(smallTestFile,
+            getFileStatusResults[0], smallBuffer,
+            AbfsRestOperationType.GetPathStatus));
+  }
+
   /**
    * This test expects AbfsInputStream to throw the exception that readAhead
    * thread received on read. The readAhead thread must be initiated from the
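
Taken together, the test exercises the pattern that motivates the change:
statuses returned by listStatus() (or getFileStatus()) can be reused so that
opening many files does not cost one GetPathStatus round trip each. A hedged
sketch of such a directory scan ("fs" and "dir" are hypothetical names):

    static void readAll(FileSystem fs, Path dir) throws Exception {
      for (FileStatus st : fs.listStatus(dir)) {
        if (!st.isFile()) {
          continue; // openFileForRead rejects directories
        }
        // Passing the listStatus entry skips the per-file HEAD request.
        try (FSDataInputStream in = fs.openFile(st.getPath())
            .withFileStatus(st)
            .build()
            .get()) {
          in.read(new byte[4096]);
        }
      }
    }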
