This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new daa33aafffc HADOOP-18577. ABFS: Add probes of readahead fix (#5205)
daa33aafffc is described below

commit daa33aafffce3d66c2bc97ded7bab9d79b5e1f98
Author: Steve Loughran <ste...@cloudera.com>
AuthorDate: Thu Dec 15 17:08:25 2022 +0000

    HADOOP-18577. ABFS: Add probes of readahead fix (#5205)
    
    Followup patch to  HADOOP-18456 as part of HADOOP-18521,
    ABFS ReadBufferManager buffer sharing across concurrent HTTP requests
    
    Add probes of the readahead fix to aid in checking the safety of
    the hadoop ABFS client across different releases.
    
    * ReadBufferManager constructor logs the fact it is safe at TRACE
    * AbfsInputStream declares it is fixed in toString()
      by including "fs.azure.capability.readahead.safe" in the
      result.
    
    The ABFS FileSystem hasPathCapability("fs.azure.capability.readahead.safe")
    probe returns true to indicate the client's readahead manager has been fixed
    to be safe when prefetching.
    
    All Hadoop releases for which this probe returns false
    and for which the probe "fs.capability.etags.available"
    returns true are at risk of returning invalid data when reading
    ADLS Gen2/Azure storage data.
    
    Contributed by Steve Loughran.
---
 .../hadoop/fs/azurebfs/AzureBlobFileSystem.java    |  7 ++++
 .../fs/azurebfs/constants/InternalConstants.java   | 46 ++++++++++++++++++++++
 .../fs/azurebfs/services/AbfsInputStream.java      |  8 ++--
 .../fs/azurebfs/services/ReadBufferManager.java    |  1 +
 .../fs/azurebfs/ITestFileSystemInitialization.java | 32 +++++++++++++++
 .../azurebfs/services/ITestReadBufferManager.java  | 26 +++++++++++-
 6 files changed, 115 insertions(+), 5 deletions(-)

diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
index 21501d28f42..5534b5fb44a 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
@@ -117,6 +117,7 @@ import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE
 import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_BUFFER_DIR;
 import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT;
 import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DATA_BLOCKS_BUFFER_DEFAULT;
+import static 
org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD;
 import static 
org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
 import static 
org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtLevel;
 import static 
org.apache.hadoop.util.functional.RemoteIterators.filteringRemoteIterator;
@@ -235,6 +236,7 @@ public class AzureBlobFileSystem extends FileSystem
     sb.append("uri=").append(uri);
     sb.append(", user='").append(abfsStore.getUser()).append('\'');
     sb.append(", 
primaryUserGroup='").append(abfsStore.getPrimaryGroup()).append('\'');
+    sb.append("[" + CAPABILITY_SAFE_READAHEAD + "]");
     sb.append('}');
     return sb.toString();
   }
@@ -1636,6 +1638,11 @@ public class AzureBlobFileSystem extends FileSystem
           new TracingContext(clientCorrelationId, fileSystemId,
               FSOperationType.HAS_PATH_CAPABILITY, tracingHeaderFormat,
               listener));
+
+      // probe for presence of the HADOOP-18546 readahead fix.
+    case CAPABILITY_SAFE_READAHEAD:
+      return true;
+
     default:
       return super.hasPathCapability(p, capability);
     }
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/InternalConstants.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/InternalConstants.java
new file mode 100644
index 00000000000..12d4f14d92a
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/InternalConstants.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.constants;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Constants which are used internally and which don't fit into the other
+ * classes.
+ * For use within the {@code hadoop-azure} module only.
+ */
+@InterfaceAudience.Private
+public final class InternalConstants {
+
+  private InternalConstants() {
+  }
+
+  /**
+   * Does this version of the store have safe readahead?
+   * Possible combinations of this and the probe
+   * {@code "fs.capability.etags.available"}.
+   * <ol>
+   *   <li>{@value}: store is safe</li>
+   *   <li>!etags: store is safe</li>
+   *   <li>etags && !{@value}: store is <i>UNSAFE</i></li>
+   * </ol>
+   */
+  public static final String CAPABILITY_SAFE_READAHEAD =
+      "fs.azure.capability.readahead.safe";
+}
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
index e7ddffe99fd..14188535b84 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
@@ -50,6 +50,7 @@ import static java.lang.Math.min;
 
 import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
 import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.STREAM_ID_LEN;
+import static 
org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD;
 import static org.apache.hadoop.util.StringUtils.toLowerCase;
 
 /**
@@ -828,11 +829,12 @@ public class AbfsInputStream extends FSInputStream 
implements CanUnbuffer,
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder(super.toString());
+    sb.append("AbfsInputStream@(").append(this.hashCode()).append("){");
+    sb.append("[" + CAPABILITY_SAFE_READAHEAD + "]");
     if (streamStatistics != null) {
-      sb.append("AbfsInputStream@(").append(this.hashCode()).append("){");
-      sb.append(streamStatistics.toString());
-      sb.append("}");
+      sb.append(", ").append(streamStatistics);
     }
+    sb.append("}");
     return sb.toString();
   }
 
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
index 62d050d0fc3..0f91afe0982 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
@@ -101,6 +101,7 @@ final class ReadBufferManager {
 
   // hide instance constructor
   private ReadBufferManager() {
+    LOGGER.trace("Creating readbuffer manager with HADOOP-18546 patch");
   }
 
 
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemInitialization.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemInitialization.java
index 8b60dd801cb..f7d4a5b7a83 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemInitialization.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemInitialization.java
@@ -20,14 +20,22 @@ package org.apache.hadoop.fs.azurebfs;
 
 import java.net.URI;
 
+import org.assertj.core.api.Assertions;
 import org.junit.Test;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes;
 import org.apache.hadoop.fs.azurebfs.services.AuthType;
 
+import static org.apache.hadoop.fs.CommonPathCapabilities.ETAGS_AVAILABLE;
+import static 
org.apache.hadoop.fs.CommonPathCapabilities.ETAGS_PRESERVED_IN_RENAME;
+import static org.apache.hadoop.fs.CommonPathCapabilities.FS_ACLS;
+import static 
org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD;
+import static org.junit.Assume.assumeTrue;
+
 /**
  * Test AzureBlobFileSystem initialization.
  */
@@ -74,4 +82,28 @@ public class ITestFileSystemInitialization extends 
AbstractAbfsIntegrationTest {
       assertNotNull("working directory", fs.getWorkingDirectory());
     }
   }
+
+  @Test
+  public void testFileSystemCapabilities() throws Throwable {
+    final AzureBlobFileSystem fs = getFileSystem();
+
+    final Path p = new Path("}");
+    // etags always present
+    Assertions.assertThat(fs.hasPathCapability(p, ETAGS_AVAILABLE))
+        .describedAs("path capability %s in %s", ETAGS_AVAILABLE, fs)
+        .isTrue();
+    // readahead always correct
+    Assertions.assertThat(fs.hasPathCapability(p, CAPABILITY_SAFE_READAHEAD))
+        .describedAs("path capability %s in %s", CAPABILITY_SAFE_READAHEAD, fs)
+        .isTrue();
+
+    // etags-over-rename and ACLs are either both true or both false.
+    final boolean etagsAcrossRename = fs.hasPathCapability(p, 
ETAGS_PRESERVED_IN_RENAME);
+    final boolean acls = fs.hasPathCapability(p, FS_ACLS);
+    Assertions.assertThat(etagsAcrossRename)
+        .describedAs("capabilities %s=%s and %s=%s in %s",
+            ETAGS_PRESERVED_IN_RENAME, etagsAcrossRename,
+            FS_ACLS, acls, fs)
+        .isEqualTo(acls);
+  }
 }
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java
index eca670fba90..a57430fa808 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java
@@ -44,9 +44,24 @@ import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE
 import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READ_AHEAD_QUEUE_DEPTH;
 import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE;
 import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
+import static 
org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD;
+import static org.apache.hadoop.test.LambdaTestUtils.eventually;
 
 public class ITestReadBufferManager extends AbstractAbfsIntegrationTest {
 
+  /**
+   * Time before the JUnit test times out for eventually() clauses
+   * to fail. This copes with slow network connections and debugging
+   * sessions, yet still allows for tests to fail with meaningful
+   * messages.
+   */
+  public static final int TIMEOUT_OFFSET = 5 * 60_000;
+
+  /**
+   * Interval between eventually preobes.
+   */
+  public static final int PROBE_INTERVAL_MILLIS = 1_000;
+
     public ITestReadBufferManager() throws Exception {
     }
 
@@ -61,6 +76,11 @@ public class ITestReadBufferManager extends 
AbstractAbfsIntegrationTest {
         }
         ExecutorService executorService = Executors.newFixedThreadPool(4);
         AzureBlobFileSystem fs = getABFSWithReadAheadConfig();
+        // verify that the fs has the capability to validate the fix
+        Assertions.assertThat(fs.hasPathCapability(new Path("/"), 
CAPABILITY_SAFE_READAHEAD))
+            .describedAs("path capability %s in %s", 
CAPABILITY_SAFE_READAHEAD, fs)
+            .isTrue();
+
         try {
             for (int i = 0; i < 4; i++) {
                 final String fileName = methodName.getMethodName() + i;
@@ -80,9 +100,11 @@ public class ITestReadBufferManager extends 
AbstractAbfsIntegrationTest {
         }
 
         ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
-        // verify there is no work in progress or the readahead queue.
-        assertListEmpty("InProgressList", 
bufferManager.getInProgressCopiedList());
+        // readahead queue is empty
         assertListEmpty("ReadAheadQueue", 
bufferManager.getReadAheadQueueCopy());
+        // verify the in progress list eventually empties out.
+        eventually(getTestTimeoutMillis() - TIMEOUT_OFFSET, 
PROBE_INTERVAL_MILLIS, () ->
+            assertListEmpty("InProgressList", 
bufferManager.getInProgressCopiedList()));
     }
 
     private void assertListEmpty(String listName, List<ReadBuffer> list) {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to