This is an automated email from the ASF dual-hosted git repository.

anujmodi pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 401003a2e73 HADOOP-19672 Network switchover when apache client throws error (#7967)
401003a2e73 is described below

commit 401003a2e7357e849322f671c3a71bfe6f5d0036
Author: Manish Bhatt <[email protected]>
AuthorDate: Mon Oct 13 06:18:58 2025 +0000

    HADOOP-19672 Network switchover when apache client throws error (#7967)
    
    Contributed by Manish Bhatt.
---
 .../hadoop/fs/azurebfs/AbfsConfiguration.java      |  8 ++
 .../fs/azurebfs/constants/ConfigurationKeys.java   |  7 ++
 .../constants/FileSystemConfigurations.java        | 10 ++-
 .../fs/azurebfs/services/AbfsApacheHttpClient.java | 24 +++++-
 .../fs/azurebfs/services/AbfsBlobClient.java       |  5 +-
 .../hadoop/fs/azurebfs/services/AbfsClient.java    | 41 +++++++----
 .../fs/azurebfs/services/AbfsClientHandler.java    |  5 +-
 .../azurebfs/services/AbfsConnectionManager.java   | 38 ++++++----
 .../hadoop/fs/azurebfs/services/AbfsDfsClient.java |  5 +-
 .../hadoop-azure/src/site/markdown/index.md        |  4 +-
 .../fs/azurebfs/ITestWasbAbfsCompatibility.java    |  2 +
 .../fs/azurebfs/services/ITestAbfsClient.java      | 86 ++++++++++++++++++----
 .../services/ITestApacheClientConnectionPool.java  | 65 +++++++++++++++-
 13 files changed, 243 insertions(+), 57 deletions(-)

diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index 0c173238c34..7c355671cf8 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -203,6 +203,10 @@ public class AbfsConfiguration{
           DefaultValue = DEFAULT_HTTP_READ_TIMEOUT)
   private int httpReadTimeout;
 
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_EXPECT_100CONTINUE_WAIT_TIMEOUT,
+          DefaultValue = DEFAULT_EXPECT_100CONTINUE_WAIT_TIMEOUT)
+  private int expect100ContinueWaitTimeout;
+
   @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_OAUTH_TOKEN_FETCH_RETRY_COUNT,
       MinValue = 0,
       DefaultValue = DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_ATTEMPTS)
@@ -1033,6 +1037,10 @@ public int getHttpReadTimeout() {
     return this.httpReadTimeout;
   }
 
+  public int getExpect100ContinueWaitTimeout() {
+    return this.expect100ContinueWaitTimeout;
+  }
+
   public long getAzureBlockSize() {
     return this.azureBlockSize;
   }
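
The new knob is bound through the same annotation-driven mechanism as the
existing ABFS timeouts above. A minimal sketch of overriding and reading it,
assuming the two-argument AbfsConfiguration constructor also used in the tests
in this patch (the account name and the 5-second value are illustrative only):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;

    public class Expect100TimeoutSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Override the 3s default wait for the server's "100 Continue" reply.
        conf.setInt("fs.azure.http.expect.100continue.wait.timeout", 5_000);
        // "myaccount.dfs.core.windows.net" is a placeholder account name.
        AbfsConfiguration abfsConf =
            new AbfsConfiguration(conf, "myaccount.dfs.core.windows.net");
        System.out.println(abfsConf.getExpect100ContinueWaitTimeout()); // 5000
      }
    }
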
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index 67747359134..9afb37e35c7 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -97,6 +97,13 @@ public final class ConfigurationKeys {
    */
   public static final String AZURE_HTTP_READ_TIMEOUT = "fs.azure.http.read.timeout";
 
+  /**
+   * Config to set the HTTP Expect 100-continue wait timeout for REST operations.
+   * Value: {@value}.
+   */
+  public static final String AZURE_EXPECT_100CONTINUE_WAIT_TIMEOUT
+      = "fs.azure.http.expect.100continue.wait.timeout";
+
   //  Retry strategy for getToken calls
   public static final String AZURE_OAUTH_TOKEN_FETCH_RETRY_COUNT = "fs.azure.oauth.token.fetch.retry.max.retries";
   public static final String AZURE_OAUTH_TOKEN_FETCH_RETRY_MIN_BACKOFF = "fs.azure.oauth.token.fetch.retry.min.backoff.interval";
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index 58b1512ac3e..83f636bf1d1 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -58,6 +58,12 @@ public final class FileSystemConfigurations {
    */
   public static final int DEFAULT_HTTP_READ_TIMEOUT = 30_000; // 30 secs
 
+  /**
+   * Default value of the wait timeout used when Expect 100-continue is enabled.
+   * Value: {@value}.
+   */
+  public static final int DEFAULT_EXPECT_100CONTINUE_WAIT_TIMEOUT = 3_000; // 3 secs
+
   // Retry parameter defaults.
   public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_ATTEMPTS = 5;
   public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MIN_BACKOFF_INTERVAL = 0;
@@ -214,7 +220,7 @@ public final class FileSystemConfigurations {
   public static final long THOUSAND = 1000L;
 
   public static final HttpOperationType DEFAULT_NETWORKING_LIBRARY
-      = HttpOperationType.JDK_HTTP_URL_CONNECTION;
+      = HttpOperationType.APACHE_HTTP_CLIENT;
 
   public static final int DEFAULT_APACHE_HTTP_CLIENT_MAX_IO_EXCEPTION_RETRIES = 3;
 
@@ -228,7 +234,7 @@ public final class FileSystemConfigurations {
 
   public static final int MAX_APACHE_HTTP_CLIENT_CACHE_WARMUP_COUNT = 5;
 
-  public static final int DEFAULT_APACHE_HTTP_CLIENT_CACHE_REFRESH_COUNT = 5;
+  public static final int DEFAULT_APACHE_HTTP_CLIENT_CACHE_REFRESH_COUNT = 3;
 
   public static final int MAX_APACHE_HTTP_CLIENT_CACHE_REFRESH_COUNT = 5;
 
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsApacheHttpClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsApacheHttpClient.java
index 1d22ae52cde..97f1e8a5c60 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsApacheHttpClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsApacheHttpClient.java
@@ -65,6 +65,13 @@ static void registerFallback() {
     usable = false;
   }
 
+  /**
+   * Sets the usable flag to true on receiving a successful response from the Apache client.
+   */
+  static void setUsable() {
+    usable = true;
+  }
+
   /**
    * @return if ApacheHttpClient is usable.
    */
@@ -73,18 +80,27 @@ static boolean usable() {
   }
 
   AbfsApacheHttpClient(DelegatingSSLSocketFactory delegatingSSLSocketFactory,
-      final AbfsConfiguration abfsConfiguration, final KeepAliveCache keepAliveCache,
-      URL baseUrl) {
+      final AbfsConfiguration abfsConfiguration,
+      final KeepAliveCache keepAliveCache,
+      URL baseUrl,
+      final boolean isCacheWarmupNeeded) {
     final AbfsConnectionManager connMgr = new AbfsConnectionManager(
         createSocketFactoryRegistry(
             new SSLConnectionSocketFactory(delegatingSSLSocketFactory,
                 getDefaultHostnameVerifier())),
         new AbfsHttpClientConnectionFactory(), keepAliveCache,
-        abfsConfiguration, baseUrl);
+        abfsConfiguration, baseUrl, isCacheWarmupNeeded);
     final HttpClientBuilder builder = HttpClients.custom();
     builder.setConnectionManager(connMgr)
         .setRequestExecutor(
-            new AbfsManagedHttpRequestExecutor(abfsConfiguration.getHttpReadTimeout()))
+            // In case of Expect:100-continue, the timeout for waiting for
+            // the 100-continue response from the server is set using
+            // ExpectWaitContinueTimeout. For other requests, the read timeout
+            // is set using SocketTimeout.
+            new AbfsManagedHttpRequestExecutor(
+                abfsConfiguration.isExpectHeaderEnabled()
+                    ? abfsConfiguration.getExpect100ContinueWaitTimeout()
+                    : abfsConfiguration.getHttpReadTimeout()))
         .disableContentCompression()
         .disableRedirectHandling()
         .disableAutomaticRetries()
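
In Apache HttpClient 4.x this wait maps to the request executor's
waitForContinue interval: a request sent with Expect: 100-continue blocks for
at most that long before the entity is transmitted anyway. A minimal
standalone sketch of the same wiring, assuming httpclient 4.5 on the classpath
(this is not the ABFS executor subclass itself):

    import org.apache.http.impl.client.CloseableHttpClient;
    import org.apache.http.impl.client.HttpClients;
    import org.apache.http.protocol.HttpRequestExecutor;

    public class WaitForContinueSketch {
      public static void main(String[] args) throws Exception {
        // waitForContinue: how long the executor waits for the server's
        // "100 Continue" interim response before sending the body anyway.
        int waitForContinueMillis = 3_000;
        try (CloseableHttpClient client = HttpClients.custom()
            .setRequestExecutor(new HttpRequestExecutor(waitForContinueMillis))
            .build()) {
          // Requests carrying "Expect: 100-continue" now wait at most 3s.
        }
      }
    }
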
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
index 77aca70990f..d6ae0427b23 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
@@ -59,6 +59,7 @@
 import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
 import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
 import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ApiVersion;
+import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
 import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
 import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
 import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams;
@@ -188,7 +189,7 @@ public AbfsBlobClient(final URL baseUrl,
       final EncryptionContextProvider encryptionContextProvider,
       final AbfsClientContext abfsClientContext) throws IOException {
     super(baseUrl, sharedKeyCredentials, abfsConfiguration, tokenProvider,
-        encryptionContextProvider, abfsClientContext);
+        encryptionContextProvider, abfsClientContext, AbfsServiceType.BLOB);
     this.azureAtomicRenameDirSet = new HashSet<>(Arrays.asList(
         abfsConfiguration.getAzureAtomicRenameDirs()
             .split(AbfsHttpConstants.COMMA)));
@@ -201,7 +202,7 @@ public AbfsBlobClient(final URL baseUrl,
       final EncryptionContextProvider encryptionContextProvider,
       final AbfsClientContext abfsClientContext) throws IOException {
     super(baseUrl, sharedKeyCredentials, abfsConfiguration, sasTokenProvider,
-        encryptionContextProvider, abfsClientContext);
+        encryptionContextProvider, abfsClientContext, AbfsServiceType.BLOB);
     this.azureAtomicRenameDirSet = new HashSet<>(Arrays.asList(
         abfsConfiguration.getAzureAtomicRenameDirs()
             .split(AbfsHttpConstants.COMMA)));
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index ad17c1bfc20..71da8f9bda9 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -58,6 +58,7 @@
 import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.Permissions;
 import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
 import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ApiVersion;
+import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
 import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
 import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
 import org.apache.hadoop.fs.azurebfs.constants.HttpOperationType;
@@ -199,6 +200,8 @@ public abstract class AbfsClient implements Closeable {
 
   private AbfsApacheHttpClient abfsApacheHttpClient;
 
+  private AbfsServiceType abfsServiceType;
+
   /**
    * logging the rename failure if metadata is in an incomplete state.
    */
@@ -208,7 +211,8 @@ private AbfsClient(final URL baseUrl,
       final SharedKeyCredentials sharedKeyCredentials,
       final AbfsConfiguration abfsConfiguration,
       final EncryptionContextProvider encryptionContextProvider,
-      final AbfsClientContext abfsClientContext) throws IOException {
+      final AbfsClientContext abfsClientContext,
+      final AbfsServiceType abfsServiceType) throws IOException {
     this.baseUrl = baseUrl;
     this.sharedKeyCredentials = sharedKeyCredentials;
     String baseUrlString = baseUrl.toString();
@@ -220,6 +224,7 @@ private AbfsClient(final URL baseUrl,
     this.authType = abfsConfiguration.getAuthType(accountName);
     this.intercept = AbfsThrottlingInterceptFactory.getInstance(accountName, abfsConfiguration);
     this.renameResilience = abfsConfiguration.getRenameResilience();
+    this.abfsServiceType = abfsServiceType;
 
     if (encryptionContextProvider != null) {
       this.encryptionContextProvider = encryptionContextProvider;
@@ -252,9 +257,13 @@ private AbfsClient(final URL baseUrl,
         == HttpOperationType.APACHE_HTTP_CLIENT) {
       keepAliveCache = new KeepAliveCache(abfsConfiguration);
 
+      // Warm up the connection pool during client initialization to avoid
+      // latency on the first request. Since every filesystem instance creates
+      // both a DFS and a Blob client, warmup is done only for the default client.
       abfsApacheHttpClient = new AbfsApacheHttpClient(
           DelegatingSSLSocketFactory.getDefaultFactory(),
-          abfsConfiguration, keepAliveCache, baseUrl);
+          abfsConfiguration, keepAliveCache, baseUrl,
+          abfsConfiguration.getFsConfiguredServiceType() == abfsServiceType);
     }
 
     this.userAgent = initializeUserAgent(abfsConfiguration, sslProviderName);
@@ -328,25 +337,29 @@ private AbfsClient(final URL baseUrl,
     LOG.trace("primaryUserGroup is {}", this.primaryUserGroup);
   }
 
-  public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
-                    final AbfsConfiguration abfsConfiguration,
-                    final AccessTokenProvider tokenProvider,
-                    final EncryptionContextProvider encryptionContextProvider,
-                    final AbfsClientContext abfsClientContext)
+  public AbfsClient(final URL baseUrl,
+      final SharedKeyCredentials sharedKeyCredentials,
+      final AbfsConfiguration abfsConfiguration,
+      final AccessTokenProvider tokenProvider,
+      final EncryptionContextProvider encryptionContextProvider,
+      final AbfsClientContext abfsClientContext,
+      final AbfsServiceType abfsServiceType)
       throws IOException {
     this(baseUrl, sharedKeyCredentials, abfsConfiguration,
-        encryptionContextProvider, abfsClientContext);
+        encryptionContextProvider, abfsClientContext, abfsServiceType);
     this.tokenProvider = tokenProvider;
   }
 
-  public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
-                    final AbfsConfiguration abfsConfiguration,
-                    final SASTokenProvider sasTokenProvider,
-                    final EncryptionContextProvider encryptionContextProvider,
-                    final AbfsClientContext abfsClientContext)
+  public AbfsClient(final URL baseUrl,
+      final SharedKeyCredentials sharedKeyCredentials,
+      final AbfsConfiguration abfsConfiguration,
+      final SASTokenProvider sasTokenProvider,
+      final EncryptionContextProvider encryptionContextProvider,
+      final AbfsClientContext abfsClientContext,
+      final AbfsServiceType abfsServiceType)
       throws IOException {
     this(baseUrl, sharedKeyCredentials, abfsConfiguration,
-        encryptionContextProvider, abfsClientContext);
+        encryptionContextProvider, abfsClientContext, abfsServiceType);
     this.sasTokenProvider = sasTokenProvider;
   }
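
Taken together, the extra constructor argument lets each client decide whether
it is the account's default client, and only that one triggers pool warmup. A
condensed sketch of the gate, with a hypothetical helper name standing in for
the constructor plumbing above:

    import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;

    final class WarmupGateSketch {
      // A client warms the connection pool only when its own service type
      // matches the service type the filesystem is configured to default to.
      static boolean isCacheWarmupNeeded(AbfsServiceType configuredDefault,
          AbfsServiceType clientType) {
        return configuredDefault == clientType;
      }
    }
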
 
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java
index f15b0b5326c..a7bf5699dc2 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java
@@ -68,13 +68,16 @@ public AbfsClientHandler(final URL baseUrl,
       final SASTokenProvider sasTokenProvider,
       final EncryptionContextProvider encryptionContextProvider,
       final AbfsClientContext abfsClientContext) throws IOException {
+    // This will initialize the default and ingress service types.
+    // This is needed before creating the clients so that cache warmup
+    // can be done only for the default client.
+    initServiceType(abfsConfiguration);
     this.dfsAbfsClient = createDfsClient(baseUrl, sharedKeyCredentials,
         abfsConfiguration, null, sasTokenProvider, encryptionContextProvider,
         abfsClientContext);
     this.blobAbfsClient = createBlobClient(baseUrl, sharedKeyCredentials,
         abfsConfiguration, null, sasTokenProvider, encryptionContextProvider,
         abfsClientContext);
-    initServiceType(abfsConfiguration);
   }
 
   /**
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java
index 16697fa838a..efd235645cd 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java
@@ -23,6 +23,7 @@
 import java.util.UUID;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -91,26 +92,34 @@ class AbfsConnectionManager implements HttpClientConnectionManager {
   /**
    * The base host for which connections are managed.
    */
-  private HttpHost baseHost;
+  private final HttpHost baseHost;
 
   AbfsConnectionManager(Registry<ConnectionSocketFactory> socketFactoryRegistry,
-      AbfsHttpClientConnectionFactory connectionFactory, KeepAliveCache kac,
-      final AbfsConfiguration abfsConfiguration, final URL baseUrl) {
+      AbfsHttpClientConnectionFactory connectionFactory,
+      KeepAliveCache kac,
+      final AbfsConfiguration abfsConfiguration,
+      final URL baseUrl,
+      final boolean isCacheWarmupNeeded) {
     this.httpConnectionFactory = connectionFactory;
     this.kac = kac;
     this.connectionOperator = new DefaultHttpClientConnectionOperator(
         socketFactoryRegistry, null, null);
     this.abfsConfiguration = abfsConfiguration;
-    if (abfsConfiguration.getApacheCacheWarmupCount() > 0
+    this.baseHost = new HttpHost(baseUrl.getHost(),
+        baseUrl.getDefaultPort(), baseUrl.getProtocol());
+    if (isCacheWarmupNeeded && abfsConfiguration.getApacheCacheWarmupCount() > 0
         && kac.getFixedThreadPool() != null) {
       // Warm up the cache with connections.
       LOG.debug("Warming up the KeepAliveCache with {} connections",
           abfsConfiguration.getApacheCacheWarmupCount());
-      this.baseHost = new HttpHost(baseUrl.getHost(),
-          baseUrl.getDefaultPort(), baseUrl.getProtocol());
       HttpRoute route = new HttpRoute(baseHost, null, true);
-      cacheExtraConnection(route,
+      int totalConnectionsCreated = cacheExtraConnection(route,
           abfsConfiguration.getApacheCacheWarmupCount());
+      if (totalConnectionsCreated == 0) {
+        AbfsApacheHttpClient.registerFallback();
+      } else {
+        AbfsApacheHttpClient.setUsable();
+      }
     }
   }
 
@@ -271,7 +280,7 @@ public void connect(final HttpClientConnection conn,
     LOG.debug("Connection established: {}", conn);
     if (context instanceof AbfsManagedHttpClientContext) {
       ((AbfsManagedHttpClientContext) context).setConnectTime(
-          TimeUnit.MILLISECONDS.toMillis(System.nanoTime() - start));
+          TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
     }
   }
 
@@ -318,8 +327,9 @@ public void shutdown() {
    * @param route the HTTP route for which connections are created
    * @param numberOfConnections the number of connections to create
    */
-  private void cacheExtraConnection(final HttpRoute route,
+  private int cacheExtraConnection(final HttpRoute route,
       final int numberOfConnections) {
+    AtomicInteger totalConnectionCreated = new AtomicInteger(0);
     if (!isCacheRefreshInProgress.getAndSet(true)) {
       long start = System.nanoTime();
       CountDownLatch latch = new CountDownLatch(numberOfConnections);
@@ -333,6 +343,7 @@ private void cacheExtraConnection(final HttpRoute route,
               connect(conn, route, abfsConfiguration.getHttpConnectionTimeout(),
                   new AbfsManagedHttpClientContext());
               addConnectionToCache(conn);
+              totalConnectionCreated.incrementAndGet();
             } catch (Exception e) {
               LOG.debug("Error creating connection: {}", e.getMessage());
               if (conn != null) {
@@ -350,7 +361,7 @@ private void cacheExtraConnection(final HttpRoute route,
         } catch (RejectedExecutionException e) {
           LOG.debug("Task rejected for connection creation: {}",
               e.getMessage());
-          return;
+          return -1;
         }
       }
 
@@ -370,6 +381,7 @@ private void cacheExtraConnection(final HttpRoute route,
             elapsedTimeMillis(start));
       }
     }
+    return totalConnectionCreated.get();
   }
 
   /**
@@ -383,10 +395,10 @@ private void addConnectionToCache(HttpClientConnection conn) {
       if (((AbfsManagedApacheHttpConnection) conn).getTargetHost()
           .equals(baseHost)) {
         boolean connAddedInKac = kac.add(conn);
-        synchronized (connectionLock) {
-          connectionLock.notify(); // wake up one thread only
-        }
         if (connAddedInKac) {
+          synchronized (connectionLock) {
+            connectionLock.notify(); // wake up one thread only
+          }
           LOG.debug("Connection cached: {}", conn);
         } else {
           LOG.debug("Connection not cached, and is released: {}", conn);
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
index f88df14c342..f574f4704ab 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
@@ -53,6 +53,7 @@
 import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
 import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
 import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ApiVersion;
+import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
 import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
 import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException;
@@ -160,7 +161,7 @@ public AbfsDfsClient(final URL baseUrl,
       final EncryptionContextProvider encryptionContextProvider,
       final AbfsClientContext abfsClientContext) throws IOException {
     super(baseUrl, sharedKeyCredentials, abfsConfiguration, tokenProvider,
-        encryptionContextProvider, abfsClientContext);
+        encryptionContextProvider, abfsClientContext, AbfsServiceType.DFS);
   }
 
   public AbfsDfsClient(final URL baseUrl,
@@ -170,7 +171,7 @@ public AbfsDfsClient(final URL baseUrl,
       final EncryptionContextProvider encryptionContextProvider,
       final AbfsClientContext abfsClientContext) throws IOException {
     super(baseUrl, sharedKeyCredentials, abfsConfiguration, sasTokenProvider,
-        encryptionContextProvider, abfsClientContext);
+        encryptionContextProvider, abfsClientContext, AbfsServiceType.DFS);
   }
 
   /**
diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/index.md b/hadoop-tools/hadoop-azure/src/site/markdown/index.md
index 6695d814c93..e52555ef76f 100644
--- a/hadoop-tools/hadoop-azure/src/site/markdown/index.md
+++ b/hadoop-tools/hadoop-azure/src/site/markdown/index.md
@@ -890,8 +890,8 @@ ABFS Driver can use the following networking libraries:
 The networking library can be configured using the configuration `fs.azure.networking.library`
 while initializing the filesystem.
 Following are the supported values:
-- `JDK_HTTP_URL_CONNECTION` : Use JDK networking library [Default]
-- `APACHE_HTTP_CLIENT` : Use Apache HttpClient
+- `JDK_HTTP_URL_CONNECTION` : Use JDK networking library
+- `APACHE_HTTP_CLIENT` : Use Apache HttpClient [Default]
 
 #### <a href="ahc_networking_conf"></a>ApacheHttpClient networking layer configuration Options:
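
Because the default flipped in this change, deployments that need the old
behavior must now opt in explicitly. A short sketch of pinning the JDK client
back, assuming an illustrative container/account URI:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class NetworkingLibrarySketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // APACHE_HTTP_CLIENT is now the default; revert to the JDK client.
        conf.set("fs.azure.networking.library", "JDK_HTTP_URL_CONNECTION");
        // Placeholder container/account URI.
        try (FileSystem fs = FileSystem.newInstance(
            new Path("abfs://container@myaccount.dfs.core.windows.net/").toUri(),
            conf)) {
          fs.getFileStatus(new Path("/"));
        }
      }
    }
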
 
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestWasbAbfsCompatibility.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestWasbAbfsCompatibility.java
index 967bf6272ab..d1ab0b3a1f8 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestWasbAbfsCompatibility.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestWasbAbfsCompatibility.java
@@ -69,6 +69,8 @@ public class ITestWasbAbfsCompatibility extends AbstractAbfsIntegrationTest {
       LoggerFactory.getLogger(ITestWasbAbfsCompatibility.class);
 
   public ITestWasbAbfsCompatibility() throws Exception {
+    // To ensure the wasb and abfs filesystems are initialized.
+    super.setup();
     assumeThat(isIPAddress()).as("Emulator is not supported").isFalse();
     assumeHnsDisabled();
     assumeBlobServiceType();
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java
index 14a8ca283c8..8505f5f3266 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java
@@ -66,6 +66,8 @@
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_IS_HNS_ENABLED;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_APACHE_HTTP_CLIENT_CACHE_WARMUP_COUNT;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CLUSTER_NAME;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CLUSTER_TYPE;
@@ -890,8 +892,7 @@ public void testKeepAliveCacheInitializationWithApacheHttpClient() throws Except
     AbfsClient dfsClient = abfsClientHandler.getDfsClient();
     AbfsClient blobClient = abfsClientHandler.getBlobClient();
 
-    checkKacOnBothClientsAfterFSInit(dfsClient);
-    checkKacOnBothClientsAfterFSInit(blobClient);
+    checkKacState(dfsClient, blobClient);
   }
 
   /**
@@ -917,9 +918,7 @@ public void testStaleConnectionBehavior() throws Exception {
     AbfsClient dfsClient = abfsClientHandler.getDfsClient();
     AbfsClient blobClient = abfsClientHandler.getBlobClient();
 
-    checkKacOnBothClientsAfterFSInit(dfsClient);
-    checkKacOnBothClientsAfterFSInit(blobClient);
-
+    checkKacState(dfsClient, blobClient);
     // Wait for 5 minutes to make the cached connections stale
     // This will ensure all the connections in the KeepAliveCache are stale
     // and will be removed by the Apache HttpClient's KeepAliveStrategy.
@@ -949,11 +948,13 @@ public void testApacheConnectionReuse() throws Exception {
     AbfsClient dfsClient = abfsClientHandler.getDfsClient();
     AbfsClient blobClient = abfsClientHandler.getBlobClient();
 
-    checkKacOnBothClientsAfterFSInit(dfsClient);
-    checkKacOnBothClientsAfterFSInit(blobClient);
+    checkKacState(dfsClient, blobClient);
 
-    checkConnectionReuse(dfsClient);
-    checkConnectionReuse(blobClient);
+    if (getAbfsServiceType() == AbfsServiceType.DFS) {
+      checkConnectionReuse(dfsClient);
+    } else {
+      checkConnectionReuse(blobClient);
+    }
   }
 
   /**
@@ -969,8 +970,8 @@ public void testConnectionNotReusedOnIOException() throws Exception {
     AzureBlobFileSystem fs = this.getFileSystem();
 
     AbfsClientHandler abfsClientHandler = fs.getAbfsStore().getClientHandler();
-    AbfsClient dfsClient = abfsClientHandler.getDfsClient();
-    KeepAliveCache keepAliveCache = dfsClient.getKeepAliveCache();
+    AbfsClient client = abfsClientHandler.getClient();
+    KeepAliveCache keepAliveCache = client.getKeepAliveCache();
 
     HttpClientConnection connection = keepAliveCache.pollFirst();
     Assertions.assertThat(connection)
@@ -988,7 +989,7 @@ public void testConnectionNotReusedOnIOException() throws Exception {
 
     // First list call fails with an IOException and that connection will not be reused.
     // Subsequent retry call will use a new connection from the cache.
-    dfsClient.listPath("/", false, 1,
+    client.listPath("/", false, 1,
           null, getTestTracingContext(fs, true), null);
 
     // After the failed operation, connection should NOT be reused
@@ -1019,11 +1020,15 @@ public void testNumberOfConnectionsInKacWithoutWarmup() throws Exception {
     AzureBlobFileSystem fs = this.getFileSystem();
     final Configuration configuration = fs.getConf();
     configuration.setInt(FS_AZURE_APACHE_HTTP_CLIENT_CACHE_WARMUP_COUNT, 0);
+    // To avoid any network calls during FS initialization
+    configuration.setBoolean(FS_AZURE_ACCOUNT_IS_HNS_ENABLED, false);
+    configuration.setBoolean(AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, false);
     fs = this.getFileSystem(configuration);
 
     AbfsClient dfsClient = fs.getAbfsStore().getClientHandler().getDfsClient();
     AbfsClient blobClient = fs.getAbfsStore().getClientHandler().getBlobClient();
 
+    // The cache was not warmed up, so it should be empty.
     Assertions.assertThat(dfsClient.getKeepAliveCache().size())
         .describedAs("KeepAliveCache will be empty when warmup count is set to 
0")
         .isEqualTo(0);
@@ -1032,13 +1037,32 @@ public void testNumberOfConnectionsInKacWithoutWarmup() throws Exception {
         .isEqualTo(0);
   }
 
+  /**
+   * Helper method to check the KeepAliveCache on both clients based on the
+   * configured service type.
+   * @param dfsClient AbfsClient instance for DFS endpoint
+   * @param blobClient AbfsClient instance for Blob endpoint
+   *
+   * @throws IOException if an error occurs while checking the cache
+   */
+  private void checkKacState(AbfsClient dfsClient, AbfsClient blobClient)
+      throws IOException {
+    if (getAbfsServiceType() == AbfsServiceType.DFS) {
+      checkKacOnDefaultClientsAfterFSInit(dfsClient);
+      checkKacOnNonDefaultClientsAfterFSInit(blobClient);
+    } else {
+      checkKacOnDefaultClientsAfterFSInit(blobClient);
+      checkKacOnNonDefaultClientsAfterFSInit(dfsClient);
+    }
+  }
+
   /**
    * Helper method to check the KeepAliveCache on both clients.
    * @param abfsClient AbfsClient instance to check
    *
    * @throws IOException if an error occurs while checking the cache
    */
-  private void checkKacOnBothClientsAfterFSInit(AbfsClient abfsClient) throws IOException {
+  private void checkKacOnDefaultClientsAfterFSInit(AbfsClient abfsClient) throws IOException {
     AbfsApacheHttpClient abfsApacheHttpClient = abfsClient.getAbfsApacheHttpClient();
     Assertions.assertThat(abfsApacheHttpClient)
         .describedAs("AbfsApacheHttpClient should not be null")
@@ -1062,6 +1086,36 @@ private void checkKacOnBothClientsAfterFSInit(AbfsClient abfsClient) throws IOEx
         .isEqualTo(this.getConfiguration().getApacheCacheWarmupCount() - 1);
   }
 
+  /**
+   * Helper method to check the KeepAliveCache on the non-default client.
+   * @param abfsClient AbfsClient instance to check
+   *
+   * @throws IOException if an error occurs while checking the cache
+   */
+  private void checkKacOnNonDefaultClientsAfterFSInit(AbfsClient abfsClient) throws IOException {
+    AbfsApacheHttpClient abfsApacheHttpClient = abfsClient.getAbfsApacheHttpClient();
+    Assertions.assertThat(abfsApacheHttpClient)
+        .describedAs("AbfsApacheHttpClient should not be null")
+        .isNotNull();
+
+    KeepAliveCache keepAliveCache = abfsClient.getKeepAliveCache();
+
+    Assertions.assertThat(keepAliveCache.size())
+        .describedAs("KeepAliveCache size should be 0 as non-default clients 
do not warmup")
+        .isEqualTo(0);
+
+    Assertions.assertThat(keepAliveCache.get())
+        .describedAs("KeepAliveCache should be null")
+        .isNull();
+
+    // The get call above returned null because the cache is empty,
+    // so the size remains 0 after the call.
+    Assertions.assertThat(keepAliveCache.size())
+        .describedAs("KeepAliveCache size should be 0 as no new connection is 
added")
+        .isEqualTo(0);
+  }
+
   /**
    * Helper method to check the KeepAliveCache after making connections stale.
    * @param abfsClient AbfsClient instance to check
@@ -1088,10 +1142,10 @@ private void checkKacAfterMakingConnectionsStale(AbfsClient abfsClient)
    * @throws IOException if an error occurs while checking the cache
    */
   private void checkConnectionReuse(AbfsClient abfsClient) throws IOException {
-    KeepAliveCache dfsKeepAliveCache = abfsClient.getKeepAliveCache();
+    KeepAliveCache keepAliveCache = abfsClient.getKeepAliveCache();
     for (int i = 0; i < this.getConfiguration().getApacheCacheWarmupCount(); i++) {
       // Check first connection in the cache before the operation
-      HttpClientConnection connection = dfsKeepAliveCache.peekFirst();
+      HttpClientConnection connection = keepAliveCache.peekFirst();
       // Perform a list operation to reuse the connection
       // This will use the first connection in the cache.
       abfsClient.listPath("/", false, 1,
@@ -1099,7 +1153,7 @@ private void checkConnectionReuse(AbfsClient abfsClient) throws IOException {
       // After the operation, the connection should be kept back in the last position
       Assertions.assertThat(connection)
           .describedAs("Connection will be put back to the cache for reuse.")
-          .isEqualTo(dfsKeepAliveCache.peekLast());
+          .isEqualTo(keepAliveCache.peekLast());
     }
   }
 }
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestApacheClientConnectionPool.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestApacheClientConnectionPool.java
index 05313b52172..a0248ee6f6a 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestApacheClientConnectionPool.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestApacheClientConnectionPool.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.fs.azurebfs.services;
 
 import java.io.IOException;
+import java.net.URL;
 import java.util.Map;
 
 import org.assertj.core.api.Assertions;
@@ -28,9 +29,11 @@
 import org.apache.hadoop.fs.ClosedIOException;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
 import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
 import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
 import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
 import org.apache.hadoop.util.functional.Tuples;
 import org.apache.http.HttpHost;
@@ -43,7 +46,10 @@
 import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
 import org.apache.http.impl.conn.DefaultHttpClientConnectionOperator;
 
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APACHE_IMPL;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COLON;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.JDK_FALLBACK;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.KEEP_ALIVE_CACHE_CLOSED;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_METRIC_FORMAT;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_NETWORKING_LIBRARY;
@@ -71,7 +77,9 @@ public void testKacIsClosed() throws Throwable {
     configuration.unset(FS_AZURE_METRIC_FORMAT);
     try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(
         configuration)) {
-      KeepAliveCache kac = fs.getAbfsStore().getClientHandler().getIngressClient()
+      KeepAliveCache kac = fs.getAbfsStore()
+          .getClientHandler()
+          .getIngressClient()
           .getKeepAliveCache();
       kac.close();
       AbfsDriverException ex = intercept(AbfsDriverException.class,
@@ -118,6 +126,61 @@ public void testConnectedConnectionLogging() throws Exception {
         .isEqualTo(4);
   }
 
+  /**
+   * Test to verify that the ApacheHttpClient falls back to JDK client
+   * when connection warmup fails.
+   * This test is applicable only for ApacheHttpClient.
+   */
+  @Test
+  public void testApacheClientFallbackDuringConnectionWarmup()
+      throws Exception {
+    try (KeepAliveCache keepAliveCache = new KeepAliveCache(
+        new AbfsConfiguration(new Configuration(), EMPTY_STRING))) {
+      // Create a connection manager with an invalid URL to simulate failure
+      // during connection warmup: the connection manager fails to create
+      // connections and falls back to the JDK client.
+      final AbfsConnectionManager connMgr = new AbfsConnectionManager(
+          RegistryBuilder.<ConnectionSocketFactory>create()
+              .register(HTTPS_SCHEME, new SSLConnectionSocketFactory(
+                  DelegatingSSLSocketFactory.getDefaultFactory(),
+                  getDefaultHostnameVerifier()))
+              .build(),
+          new AbfsHttpClientConnectionFactory(), keepAliveCache,
+          new AbfsConfiguration(new Configuration(), EMPTY_STRING),
+          new URL("https://test.com";), true);
+
+      Assertions.assertThat(AbfsApacheHttpClient.usable())
+          .describedAs("Apache HttpClient should be not usable")
+          .isFalse();
+      // Make a REST API call to verify that the client falls back to the JDK client.
+      AzureBlobFileSystem fs = getFileSystem();
+      verifyClientRequestId(fs, JDK_FALLBACK);
+      AbfsApacheHttpClient.setUsable();
+      verifyClientRequestId(fs, APACHE_IMPL);
+    }
+  }
+
+  /**
+   * Verify that the client request id contains the expected client.
+   * @param fs AzureBlobFileSystem instance
+   * @param expectedClient Expected client in the client request id.
+   * @throws AzureBlobFileSystemException if any failure occurs during the operation.
+   */
+  private void verifyClientRequestId(AzureBlobFileSystem fs,
+      String expectedClient)
+      throws AzureBlobFileSystemException {
+    AbfsRestOperation op = fs.getAbfsStore()
+        .getClient()
+        .getFilesystemProperties(getTestTracingContext(fs, true));
+    String[] clientRequestIdList = op.getResult()
+        .getClientRequestId().split(COLON);
+    Assertions.assertThat(clientRequestIdList[clientRequestIdList.length - 1])
+        .describedAs("Http Client in use should be %s", expectedClient)
+        .isEqualTo(expectedClient);
+  }
+
   private Map.Entry<HttpRoute, AbfsManagedApacheHttpConnection> getTestConnection()
       throws IOException {
     HttpHost host = new HttpHost(getFileSystem().getUri().getHost(),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

