This is an automated email from the ASF dual-hosted git repository.
anujmodi pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 401003a2e73 HADOOP-19672 Network switchover when apache client throws error (#7967)
401003a2e73 is described below
commit 401003a2e7357e849322f671c3a71bfe6f5d0036
Author: Manish Bhatt <[email protected]>
AuthorDate: Mon Oct 13 06:18:58 2025 +0000
HADOOP-19672 Network switchover when apache client throws error (#7967)
Contributed by Manish Bhatt.
---
.../hadoop/fs/azurebfs/AbfsConfiguration.java | 8 ++
.../fs/azurebfs/constants/ConfigurationKeys.java | 7 ++
.../constants/FileSystemConfigurations.java | 10 ++-
.../fs/azurebfs/services/AbfsApacheHttpClient.java | 24 +++++-
.../fs/azurebfs/services/AbfsBlobClient.java | 5 +-
.../hadoop/fs/azurebfs/services/AbfsClient.java | 41 +++++++----
.../fs/azurebfs/services/AbfsClientHandler.java | 5 +-
.../azurebfs/services/AbfsConnectionManager.java | 38 ++++++----
.../hadoop/fs/azurebfs/services/AbfsDfsClient.java | 5 +-
.../hadoop-azure/src/site/markdown/index.md | 4 +-
.../fs/azurebfs/ITestWasbAbfsCompatibility.java | 2 +
.../fs/azurebfs/services/ITestAbfsClient.java | 86 ++++++++++++++++++----
.../services/ITestApacheClientConnectionPool.java | 65 +++++++++++++++-
13 files changed, 243 insertions(+), 57 deletions(-)
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index 0c173238c34..7c355671cf8 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -203,6 +203,10 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_HTTP_READ_TIMEOUT)
private int httpReadTimeout;
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_EXPECT_100CONTINUE_WAIT_TIMEOUT,
+ DefaultValue = DEFAULT_EXPECT_100CONTINUE_WAIT_TIMEOUT)
+ private int expect100ContinueWaitTimeout;
+
@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
AZURE_OAUTH_TOKEN_FETCH_RETRY_COUNT,
MinValue = 0,
DefaultValue = DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_ATTEMPTS)
@@ -1033,6 +1037,10 @@ public int getHttpReadTimeout() {
return this.httpReadTimeout;
}
+ public int getExpect100ContinueWaitTimeout() {
+ return this.expect100ContinueWaitTimeout;
+ }
+
public long getAzureBlockSize() {
return this.azureBlockSize;
}
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index 67747359134..9afb37e35c7 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -97,6 +97,13 @@ public final class ConfigurationKeys {
*/
public static final String AZURE_HTTP_READ_TIMEOUT =
"fs.azure.http.read.timeout";
+ /**
+ * Config to set HTTP Expect100-Continue Wait Timeout Value for Rest Operations.
+ * Value: {@value}.
+ */
+ public static final String AZURE_EXPECT_100CONTINUE_WAIT_TIMEOUT
+ = "fs.azure.http.expect.100continue.wait.timeout";
+
// Retry strategy for getToken calls
public static final String AZURE_OAUTH_TOKEN_FETCH_RETRY_COUNT =
"fs.azure.oauth.token.fetch.retry.max.retries";
public static final String AZURE_OAUTH_TOKEN_FETCH_RETRY_MIN_BACKOFF =
"fs.azure.oauth.token.fetch.retry.min.backoff.interval";
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index 58b1512ac3e..83f636bf1d1 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -58,6 +58,12 @@ public final class FileSystemConfigurations {
*/
public static final int DEFAULT_HTTP_READ_TIMEOUT = 30_000; // 30 secs
+ /**
+ * Default value of connection request timeout to be used when 100continue is enabled.
+ * Value: {@value}.
+ */
+ public static final int DEFAULT_EXPECT_100CONTINUE_WAIT_TIMEOUT = 3_000; // 3s
+
// Retry parameter defaults.
public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_ATTEMPTS =
5;
public static final int
DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MIN_BACKOFF_INTERVAL = 0;
@@ -214,7 +220,7 @@ public final class FileSystemConfigurations {
public static final long THOUSAND = 1000L;
public static final HttpOperationType DEFAULT_NETWORKING_LIBRARY
- = HttpOperationType.JDK_HTTP_URL_CONNECTION;
+ = HttpOperationType.APACHE_HTTP_CLIENT;
public static final int DEFAULT_APACHE_HTTP_CLIENT_MAX_IO_EXCEPTION_RETRIES
= 3;
@@ -228,7 +234,7 @@ public final class FileSystemConfigurations {
public static final int MAX_APACHE_HTTP_CLIENT_CACHE_WARMUP_COUNT = 5;
- public static final int DEFAULT_APACHE_HTTP_CLIENT_CACHE_REFRESH_COUNT = 5;
+ public static final int DEFAULT_APACHE_HTTP_CLIENT_CACHE_REFRESH_COUNT = 3;
public static final int MAX_APACHE_HTTP_CLIENT_CACHE_REFRESH_COUNT = 5;
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsApacheHttpClient.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsApacheHttpClient.java
index 1d22ae52cde..97f1e8a5c60 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsApacheHttpClient.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsApacheHttpClient.java
@@ -65,6 +65,13 @@ static void registerFallback() {
usable = false;
}
+ /**
+ * Sets the usable flag to true when a success response is received from the Apache client.
+ */
+ static void setUsable() {
+ usable = true;
+ }
+
/**
* @return if ApacheHttpClient is usable.
*/
@@ -73,18 +80,27 @@ static boolean usable() {
}
AbfsApacheHttpClient(DelegatingSSLSocketFactory delegatingSSLSocketFactory,
- final AbfsConfiguration abfsConfiguration, final KeepAliveCache keepAliveCache,
- URL baseUrl) {
+ final AbfsConfiguration abfsConfiguration,
+ final KeepAliveCache keepAliveCache,
+ URL baseUrl,
+ final boolean isCacheWarmupNeeded) {
final AbfsConnectionManager connMgr = new AbfsConnectionManager(
createSocketFactoryRegistry(
new SSLConnectionSocketFactory(delegatingSSLSocketFactory,
getDefaultHostnameVerifier())),
new AbfsHttpClientConnectionFactory(), keepAliveCache,
- abfsConfiguration, baseUrl);
+ abfsConfiguration, baseUrl, isCacheWarmupNeeded);
final HttpClientBuilder builder = HttpClients.custom();
builder.setConnectionManager(connMgr)
.setRequestExecutor(
- new AbfsManagedHttpRequestExecutor(abfsConfiguration.getHttpReadTimeout()))
+ // In case of Expect:100-continue, the timeout for waiting for
+ // the 100-continue response from the server is set using
+ // ExpectWaitContinueTimeout. For other requests, the read timeout
+ // is set using SocketTimeout.
+ new AbfsManagedHttpRequestExecutor(
+ abfsConfiguration.isExpectHeaderEnabled()
+ ? abfsConfiguration.getExpect100ContinueWaitTimeout()
+ : abfsConfiguration.getHttpReadTimeout()))
.disableContentCompression()
.disableRedirectHandling()
.disableAutomaticRetries()
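As background for the executor change above, a minimal sketch of the underlying Apache HttpComponents mechanism (assuming httpcore 4.x, where HttpRequestExecutor takes a wait-for-continue timeout in milliseconds; the helper class and parameter names below are illustrative, not part of this change):

    import org.apache.http.protocol.HttpRequestExecutor;

    final class RequestExecutorSketch {
      // When Expect: 100-continue is enabled, the executor waits this many
      // milliseconds for the server's "100 Continue" interim response before
      // sending the request body anyway; otherwise the read timeout is used.
      static HttpRequestExecutor create(boolean expectHeaderEnabled,
          int expect100ContinueWaitTimeoutMs, int httpReadTimeoutMs) {
        int waitForContinueMs = expectHeaderEnabled
            ? expect100ContinueWaitTimeoutMs
            : httpReadTimeoutMs;
        return new HttpRequestExecutor(waitForContinueMs);
      }
    }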
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
index 77aca70990f..d6ae0427b23 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
@@ -59,6 +59,7 @@
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ApiVersion;
+import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams;
@@ -188,7 +189,7 @@ public AbfsBlobClient(final URL baseUrl,
final EncryptionContextProvider encryptionContextProvider,
final AbfsClientContext abfsClientContext) throws IOException {
super(baseUrl, sharedKeyCredentials, abfsConfiguration, tokenProvider,
- encryptionContextProvider, abfsClientContext);
+ encryptionContextProvider, abfsClientContext, AbfsServiceType.BLOB);
this.azureAtomicRenameDirSet = new HashSet<>(Arrays.asList(
abfsConfiguration.getAzureAtomicRenameDirs()
.split(AbfsHttpConstants.COMMA)));
@@ -201,7 +202,7 @@ public AbfsBlobClient(final URL baseUrl,
final EncryptionContextProvider encryptionContextProvider,
final AbfsClientContext abfsClientContext) throws IOException {
super(baseUrl, sharedKeyCredentials, abfsConfiguration, sasTokenProvider,
- encryptionContextProvider, abfsClientContext);
+ encryptionContextProvider, abfsClientContext, AbfsServiceType.BLOB);
this.azureAtomicRenameDirSet = new HashSet<>(Arrays.asList(
abfsConfiguration.getAzureAtomicRenameDirs()
.split(AbfsHttpConstants.COMMA)));
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index ad17c1bfc20..71da8f9bda9 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -58,6 +58,7 @@
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.Permissions;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ApiVersion;
+import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
import org.apache.hadoop.fs.azurebfs.constants.HttpOperationType;
@@ -199,6 +200,8 @@ public abstract class AbfsClient implements Closeable {
private AbfsApacheHttpClient abfsApacheHttpClient;
+ private AbfsServiceType abfsServiceType;
+
/**
* logging the rename failure if metadata is in an incomplete state.
*/
@@ -208,7 +211,8 @@ private AbfsClient(final URL baseUrl,
final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration,
final EncryptionContextProvider encryptionContextProvider,
- final AbfsClientContext abfsClientContext) throws IOException {
+ final AbfsClientContext abfsClientContext,
+ final AbfsServiceType abfsServiceType) throws IOException {
this.baseUrl = baseUrl;
this.sharedKeyCredentials = sharedKeyCredentials;
String baseUrlString = baseUrl.toString();
@@ -220,6 +224,7 @@ private AbfsClient(final URL baseUrl,
this.authType = abfsConfiguration.getAuthType(accountName);
this.intercept = AbfsThrottlingInterceptFactory.getInstance(accountName,
abfsConfiguration);
this.renameResilience = abfsConfiguration.getRenameResilience();
+ this.abfsServiceType = abfsServiceType;
if (encryptionContextProvider != null) {
this.encryptionContextProvider = encryptionContextProvider;
@@ -252,9 +257,13 @@ private AbfsClient(final URL baseUrl,
== HttpOperationType.APACHE_HTTP_CLIENT) {
keepAliveCache = new KeepAliveCache(abfsConfiguration);
+ // Warm up the connection pool during client initialization to avoid latency during the first request.
+ // Since every filesystem instance creates both DFS and Blob client instances,
+ // warmup is done only for the default client.
abfsApacheHttpClient = new AbfsApacheHttpClient(
DelegatingSSLSocketFactory.getDefaultFactory(),
- abfsConfiguration, keepAliveCache, baseUrl);
+ abfsConfiguration, keepAliveCache, baseUrl,
+ abfsConfiguration.getFsConfiguredServiceType() == abfsServiceType);
}
this.userAgent = initializeUserAgent(abfsConfiguration, sslProviderName);
@@ -328,25 +337,29 @@ private AbfsClient(final URL baseUrl,
LOG.trace("primaryUserGroup is {}", this.primaryUserGroup);
}
- public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
- final AbfsConfiguration abfsConfiguration,
- final AccessTokenProvider tokenProvider,
- final EncryptionContextProvider encryptionContextProvider,
- final AbfsClientContext abfsClientContext)
+ public AbfsClient(final URL baseUrl,
+ final SharedKeyCredentials sharedKeyCredentials,
+ final AbfsConfiguration abfsConfiguration,
+ final AccessTokenProvider tokenProvider,
+ final EncryptionContextProvider encryptionContextProvider,
+ final AbfsClientContext abfsClientContext,
+ final AbfsServiceType abfsServiceType)
throws IOException {
this(baseUrl, sharedKeyCredentials, abfsConfiguration,
- encryptionContextProvider, abfsClientContext);
+ encryptionContextProvider, abfsClientContext, abfsServiceType);
this.tokenProvider = tokenProvider;
}
- public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
- final AbfsConfiguration abfsConfiguration,
- final SASTokenProvider sasTokenProvider,
- final EncryptionContextProvider encryptionContextProvider,
- final AbfsClientContext abfsClientContext)
+ public AbfsClient(final URL baseUrl,
+ final SharedKeyCredentials sharedKeyCredentials,
+ final AbfsConfiguration abfsConfiguration,
+ final SASTokenProvider sasTokenProvider,
+ final EncryptionContextProvider encryptionContextProvider,
+ final AbfsClientContext abfsClientContext,
+ final AbfsServiceType abfsServiceType)
throws IOException {
this(baseUrl, sharedKeyCredentials, abfsConfiguration,
- encryptionContextProvider, abfsClientContext);
+ encryptionContextProvider, abfsClientContext, abfsServiceType);
this.sasTokenProvider = sasTokenProvider;
}
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java
index f15b0b5326c..a7bf5699dc2 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java
@@ -68,13 +68,16 @@ public AbfsClientHandler(final URL baseUrl,
final SASTokenProvider sasTokenProvider,
final EncryptionContextProvider encryptionContextProvider,
final AbfsClientContext abfsClientContext) throws IOException {
+ // This will initialize the default and ingress service types.
+ // This is needed before creating the clients so that we can do cache warmup
+ // only for the default client.
+ initServiceType(abfsConfiguration);
this.dfsAbfsClient = createDfsClient(baseUrl, sharedKeyCredentials,
abfsConfiguration, null, sasTokenProvider, encryptionContextProvider,
abfsClientContext);
this.blobAbfsClient = createBlobClient(baseUrl, sharedKeyCredentials,
abfsConfiguration, null, sasTokenProvider, encryptionContextProvider,
abfsClientContext);
- initServiceType(abfsConfiguration);
}
/**
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java
index 16697fa838a..efd235645cd 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java
@@ -23,6 +23,7 @@
import java.util.UUID;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -91,26 +92,34 @@ class AbfsConnectionManager implements
HttpClientConnectionManager {
/**
* The base host for which connections are managed.
*/
- private HttpHost baseHost;
+ private final HttpHost baseHost;
AbfsConnectionManager(Registry<ConnectionSocketFactory>
socketFactoryRegistry,
- AbfsHttpClientConnectionFactory connectionFactory, KeepAliveCache kac,
- final AbfsConfiguration abfsConfiguration, final URL baseUrl) {
+ AbfsHttpClientConnectionFactory connectionFactory,
+ KeepAliveCache kac,
+ final AbfsConfiguration abfsConfiguration,
+ final URL baseUrl,
+ final boolean isCacheWarmupNeeded) {
this.httpConnectionFactory = connectionFactory;
this.kac = kac;
this.connectionOperator = new DefaultHttpClientConnectionOperator(
socketFactoryRegistry, null, null);
this.abfsConfiguration = abfsConfiguration;
- if (abfsConfiguration.getApacheCacheWarmupCount() > 0
+ this.baseHost = new HttpHost(baseUrl.getHost(),
+ baseUrl.getDefaultPort(), baseUrl.getProtocol());
+ if (isCacheWarmupNeeded && abfsConfiguration.getApacheCacheWarmupCount() > 0
&& kac.getFixedThreadPool() != null) {
// Warm up the cache with connections.
LOG.debug("Warming up the KeepAliveCache with {} connections",
abfsConfiguration.getApacheCacheWarmupCount());
- this.baseHost = new HttpHost(baseUrl.getHost(),
- baseUrl.getDefaultPort(), baseUrl.getProtocol());
HttpRoute route = new HttpRoute(baseHost, null, true);
- cacheExtraConnection(route,
+ int totalConnectionsCreated = cacheExtraConnection(route,
abfsConfiguration.getApacheCacheWarmupCount());
+ if (totalConnectionsCreated == 0) {
+ AbfsApacheHttpClient.registerFallback();
+ } else {
+ AbfsApacheHttpClient.setUsable();
+ }
}
}
@@ -271,7 +280,7 @@ public void connect(final HttpClientConnection conn,
LOG.debug("Connection established: {}", conn);
if (context instanceof AbfsManagedHttpClientContext) {
((AbfsManagedHttpClientContext) context).setConnectTime(
- TimeUnit.MILLISECONDS.toMillis(System.nanoTime() - start));
+ TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
}
}
@@ -318,8 +327,9 @@ public void shutdown() {
* @param route the HTTP route for which connections are created
* @param numberOfConnections the number of connections to create
*/
- private void cacheExtraConnection(final HttpRoute route,
+ private int cacheExtraConnection(final HttpRoute route,
final int numberOfConnections) {
+ AtomicInteger totalConnectionCreated = new AtomicInteger(0);
if (!isCacheRefreshInProgress.getAndSet(true)) {
long start = System.nanoTime();
CountDownLatch latch = new CountDownLatch(numberOfConnections);
@@ -333,6 +343,7 @@ private void cacheExtraConnection(final HttpRoute route,
connect(conn, route,
abfsConfiguration.getHttpConnectionTimeout(),
new AbfsManagedHttpClientContext());
addConnectionToCache(conn);
+ totalConnectionCreated.incrementAndGet();
} catch (Exception e) {
LOG.debug("Error creating connection: {}", e.getMessage());
if (conn != null) {
@@ -350,7 +361,7 @@ private void cacheExtraConnection(final HttpRoute route,
} catch (RejectedExecutionException e) {
LOG.debug("Task rejected for connection creation: {}",
e.getMessage());
- return;
+ return -1;
}
}
@@ -370,6 +381,7 @@ private void cacheExtraConnection(final HttpRoute route,
elapsedTimeMillis(start));
}
}
+ return totalConnectionCreated.get();
}
/**
@@ -383,10 +395,10 @@ private void addConnectionToCache(HttpClientConnection
conn) {
if (((AbfsManagedApacheHttpConnection) conn).getTargetHost()
.equals(baseHost)) {
boolean connAddedInKac = kac.add(conn);
- synchronized (connectionLock) {
- connectionLock.notify(); // wake up one thread only
- }
if (connAddedInKac) {
+ synchronized (connectionLock) {
+ connectionLock.notify(); // wake up one thread only
+ }
LOG.debug("Connection cached: {}", conn);
} else {
LOG.debug("Connection not cached, and is released: {}", conn);
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
index f88df14c342..f574f4704ab 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
@@ -53,6 +53,7 @@
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ApiVersion;
+import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException;
@@ -160,7 +161,7 @@ public AbfsDfsClient(final URL baseUrl,
final EncryptionContextProvider encryptionContextProvider,
final AbfsClientContext abfsClientContext) throws IOException {
super(baseUrl, sharedKeyCredentials, abfsConfiguration, tokenProvider,
- encryptionContextProvider, abfsClientContext);
+ encryptionContextProvider, abfsClientContext, AbfsServiceType.DFS);
}
public AbfsDfsClient(final URL baseUrl,
@@ -170,7 +171,7 @@ public AbfsDfsClient(final URL baseUrl,
final EncryptionContextProvider encryptionContextProvider,
final AbfsClientContext abfsClientContext) throws IOException {
super(baseUrl, sharedKeyCredentials, abfsConfiguration, sasTokenProvider,
- encryptionContextProvider, abfsClientContext);
+ encryptionContextProvider, abfsClientContext, AbfsServiceType.DFS);
}
/**
diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/index.md
b/hadoop-tools/hadoop-azure/src/site/markdown/index.md
index 6695d814c93..e52555ef76f 100644
--- a/hadoop-tools/hadoop-azure/src/site/markdown/index.md
+++ b/hadoop-tools/hadoop-azure/src/site/markdown/index.md
@@ -890,8 +890,8 @@ ABFS Driver can use the following networking libraries:
The networking library can be configured using the configuration
`fs.azure.networking.library`
while initializing the filesystem.
Following are the supported values:
-- `JDK_HTTP_URL_CONNECTION` : Use JDK networking library [Default]
-- `APACHE_HTTP_CLIENT` : Use Apache HttpClient
+- `JDK_HTTP_URL_CONNECTION` : Use JDK networking library
+- `APACHE_HTTP_CLIENT` : Use Apache HttpClient [Default]
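Since this change flips the default to the Apache client, a minimal sketch (using the documented property and one of its documented values) of pinning the previous JDK behaviour, should a deployment need it:

    import org.apache.hadoop.conf.Configuration;

    public class NetworkingLibraryOverride {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Opt back into the JDK networking library; omit this setting to use
        // the new APACHE_HTTP_CLIENT default.
        conf.set("fs.azure.networking.library", "JDK_HTTP_URL_CONNECTION");
        System.out.println(conf.get("fs.azure.networking.library"));
      }
    }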
#### <a href="ahc_networking_conf"></a>ApacheHttpClient networking layer
configuration Options:
diff --git
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestWasbAbfsCompatibility.java
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestWasbAbfsCompatibility.java
index 967bf6272ab..d1ab0b3a1f8 100644
---
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestWasbAbfsCompatibility.java
+++
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestWasbAbfsCompatibility.java
@@ -69,6 +69,8 @@ public class ITestWasbAbfsCompatibility extends
AbstractAbfsIntegrationTest {
LoggerFactory.getLogger(ITestWasbAbfsCompatibility.class);
public ITestWasbAbfsCompatibility() throws Exception {
+ // To ensure the wasb and abfs filesystem are initialized.
+ super.setup();
assumeThat(isIPAddress()).as("Emulator is not supported").isFalse();
assumeHnsDisabled();
assumeBlobServiceType();
diff --git
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java
index 14a8ca283c8..8505f5f3266 100644
---
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java
+++
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java
@@ -66,6 +66,8 @@
import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH;
import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
+import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION;
+import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_IS_HNS_ENABLED;
import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_APACHE_HTTP_CLIENT_CACHE_WARMUP_COUNT;
import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CLUSTER_NAME;
import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CLUSTER_TYPE;
@@ -890,8 +892,7 @@ public void
testKeepAliveCacheInitializationWithApacheHttpClient() throws Except
AbfsClient dfsClient = abfsClientHandler.getDfsClient();
AbfsClient blobClient = abfsClientHandler.getBlobClient();
- checkKacOnBothClientsAfterFSInit(dfsClient);
- checkKacOnBothClientsAfterFSInit(blobClient);
+ checkKacState(dfsClient, blobClient);
}
/**
@@ -917,9 +918,7 @@ public void testStaleConnectionBehavior() throws Exception {
AbfsClient dfsClient = abfsClientHandler.getDfsClient();
AbfsClient blobClient = abfsClientHandler.getBlobClient();
- checkKacOnBothClientsAfterFSInit(dfsClient);
- checkKacOnBothClientsAfterFSInit(blobClient);
-
+ checkKacState(dfsClient, blobClient);
// Wait for 5 minutes to make the cached connections stale
// This will ensure all the connections in the KeepAliveCache are stale
// and will be removed by the Apache HttpClient's KeepAliveStrategy.
@@ -949,11 +948,13 @@ public void testApacheConnectionReuse() throws Exception {
AbfsClient dfsClient = abfsClientHandler.getDfsClient();
AbfsClient blobClient = abfsClientHandler.getBlobClient();
- checkKacOnBothClientsAfterFSInit(dfsClient);
- checkKacOnBothClientsAfterFSInit(blobClient);
+ checkKacState(dfsClient, blobClient);
- checkConnectionReuse(dfsClient);
- checkConnectionReuse(blobClient);
+ if (getAbfsServiceType() == AbfsServiceType.DFS) {
+ checkConnectionReuse(dfsClient);
+ } else {
+ checkConnectionReuse(blobClient);
+ }
}
/**
@@ -969,8 +970,8 @@ public void testConnectionNotReusedOnIOException() throws
Exception {
AzureBlobFileSystem fs = this.getFileSystem();
AbfsClientHandler abfsClientHandler = fs.getAbfsStore().getClientHandler();
- AbfsClient dfsClient = abfsClientHandler.getDfsClient();
- KeepAliveCache keepAliveCache = dfsClient.getKeepAliveCache();
+ AbfsClient client = abfsClientHandler.getClient();
+ KeepAliveCache keepAliveCache = client.getKeepAliveCache();
HttpClientConnection connection = keepAliveCache.pollFirst();
Assertions.assertThat(connection)
@@ -988,7 +989,7 @@ public void testConnectionNotReusedOnIOException() throws
Exception {
// First list call fail with IOException exception and that connection
will not be reused.
// Subsequent retry call will use a new connection from the cache.
- dfsClient.listPath("/", false, 1,
+ client.listPath("/", false, 1,
null, getTestTracingContext(fs, true), null);
// After the failed operation, connection should NOT be reused
@@ -1019,11 +1020,15 @@ public void testNumberOfConnectionsInKacWithoutWarmup()
throws Exception {
AzureBlobFileSystem fs = this.getFileSystem();
final Configuration configuration = fs.getConf();
configuration.setInt(FS_AZURE_APACHE_HTTP_CLIENT_CACHE_WARMUP_COUNT, 0);
+ // To avoid any network calls during FS initialization
+ configuration.setBoolean(FS_AZURE_ACCOUNT_IS_HNS_ENABLED, false);
+
configuration.setBoolean(AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION,
false);
fs = this.getFileSystem(configuration);
AbfsClient dfsClient = fs.getAbfsStore().getClientHandler().getDfsClient();
AbfsClient blobClient =
fs.getAbfsStore().getClientHandler().getBlobClient();
+ // In case cache is not warmed up
Assertions.assertThat(dfsClient.getKeepAliveCache().size())
.describedAs("KeepAliveCache will be empty when warmup count is set to
0")
.isEqualTo(0);
@@ -1032,13 +1037,32 @@ public void testNumberOfConnectionsInKacWithoutWarmup()
throws Exception {
.isEqualTo(0);
}
+ /**
+ * Helper method to check the KeepAliveCache on both clients based on the
+ * configured service type.
+ * @param dfsClient AbfsClient instance for DFS endpoint
+ * @param blobClient AbfsClient instance for Blob endpoint
+ *
+ * @throws IOException if an error occurs while checking the cache
+ */
+ private void checkKacState(AbfsClient dfsClient, AbfsClient blobClient)
+ throws IOException {
+ if (getAbfsServiceType() == AbfsServiceType.DFS) {
+ checkKacOnDefaultClientsAfterFSInit(dfsClient);
+ checkKacOnNonDefaultClientsAfterFSInit(blobClient);
+ } else {
+ checkKacOnDefaultClientsAfterFSInit(blobClient);
+ checkKacOnNonDefaultClientsAfterFSInit(dfsClient);
+ }
+ }
+
/**
* Helper method to check the KeepAliveCache on both clients.
* @param abfsClient AbfsClient instance to check
*
* @throws IOException if an error occurs while checking the cache
*/
- private void checkKacOnBothClientsAfterFSInit(AbfsClient abfsClient) throws IOException {
+ private void checkKacOnDefaultClientsAfterFSInit(AbfsClient abfsClient) throws IOException {
AbfsApacheHttpClient abfsApacheHttpClient =
abfsClient.getAbfsApacheHttpClient();
Assertions.assertThat(abfsApacheHttpClient)
.describedAs("AbfsApacheHttpClient should not be null")
@@ -1062,6 +1086,36 @@ private void checkKacOnBothClientsAfterFSInit(AbfsClient
abfsClient) throws IOEx
.isEqualTo(this.getConfiguration().getApacheCacheWarmupCount() - 1);
}
+ /**
+ * Helper method to check the KeepAliveCache on the non-default client.
+ * @param abfsClient AbfsClient instance to check
+ *
+ * @throws IOException if an error occurs while checking the cache
+ */
+ private void checkKacOnNonDefaultClientsAfterFSInit(AbfsClient abfsClient) throws IOException {
+ AbfsApacheHttpClient abfsApacheHttpClient = abfsClient.getAbfsApacheHttpClient();
+ Assertions.assertThat(abfsApacheHttpClient)
+ .describedAs("AbfsApacheHttpClient should not be null")
+ .isNotNull();
+
+ KeepAliveCache keepAliveCache = abfsClient.getKeepAliveCache();
+
+ Assertions.assertThat(keepAliveCache.size())
+ .describedAs("KeepAliveCache size should be 0 as non-default clients
do not warmup")
+ .isEqualTo(0);
+
+ Assertions.assertThat(keepAliveCache.get())
+ .describedAs("KeepAliveCache should be null")
+ .isNull();
+
+ // The get call above returned null, so no connection was removed and
+ // the cache size should remain 0.
+ Assertions.assertThat(keepAliveCache.size())
+ .describedAs("KeepAliveCache size should be 0 as no new connection is
added")
+ .isEqualTo(0);
+ }
+
/**
* Helper method to check the KeepAliveCache after making connections stale.
* @param abfsClient AbfsClient instance to check
@@ -1088,10 +1142,10 @@ private void
checkKacAfterMakingConnectionsStale(AbfsClient abfsClient)
* @throws IOException if an error occurs while checking the cache
*/
private void checkConnectionReuse(AbfsClient abfsClient) throws IOException {
- KeepAliveCache dfsKeepAliveCache = abfsClient.getKeepAliveCache();
+ KeepAliveCache keepAliveCache = abfsClient.getKeepAliveCache();
for (int i = 0; i < this.getConfiguration().getApacheCacheWarmupCount();
i++) {
// Check first connection in the cache before the operation
- HttpClientConnection connection = dfsKeepAliveCache.peekFirst();
+ HttpClientConnection connection = keepAliveCache.peekFirst();
// Perform a list operation to reuse the connection
// This will use the first connection in the cache.
abfsClient.listPath("/", false, 1,
@@ -1099,7 +1153,7 @@ private void checkConnectionReuse(AbfsClient abfsClient)
throws IOException {
// After the operation, the connection should be kept back in the last
position
Assertions.assertThat(connection)
.describedAs("Connection will be put back to the cache for reuse.")
- .isEqualTo(dfsKeepAliveCache.peekLast());
+ .isEqualTo(keepAliveCache.peekLast());
}
}
}
diff --git
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestApacheClientConnectionPool.java
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestApacheClientConnectionPool.java
index 05313b52172..a0248ee6f6a 100644
---
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestApacheClientConnectionPool.java
+++
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestApacheClientConnectionPool.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.fs.azurebfs.services;
import java.io.IOException;
+import java.net.URL;
import java.util.Map;
import org.assertj.core.api.Assertions;
@@ -28,9 +29,11 @@
import org.apache.hadoop.fs.ClosedIOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException;
+import
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
import org.apache.hadoop.util.functional.Tuples;
import org.apache.http.HttpHost;
@@ -43,7 +46,10 @@
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.impl.conn.DefaultHttpClientConnectionOperator;
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APACHE_IMPL;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COLON;
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.JDK_FALLBACK;
import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.KEEP_ALIVE_CACHE_CLOSED;
import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_METRIC_FORMAT;
import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_NETWORKING_LIBRARY;
@@ -71,7 +77,9 @@ public void testKacIsClosed() throws Throwable {
configuration.unset(FS_AZURE_METRIC_FORMAT);
try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(
configuration)) {
- KeepAliveCache kac =
fs.getAbfsStore().getClientHandler().getIngressClient()
+ KeepAliveCache kac = fs.getAbfsStore()
+ .getClientHandler()
+ .getIngressClient()
.getKeepAliveCache();
kac.close();
AbfsDriverException ex = intercept(AbfsDriverException.class,
@@ -118,6 +126,61 @@ public void testConnectedConnectionLogging() throws
Exception {
.isEqualTo(4);
}
+ /**
+ * Test to verify that the ApacheHttpClient falls back to JDK client
+ * when connection warmup fails.
+ * This test is applicable only for ApacheHttpClient.
+ */
+ @Test
+ public void testApacheClientFallbackDuringConnectionWarmup()
+ throws Exception {
+ try (KeepAliveCache keepAliveCache = new KeepAliveCache(
+ new AbfsConfiguration(new Configuration(), EMPTY_STRING))) {
+ // Create a connection manager with an invalid URL so that connection
+ // warmup fails, forcing the client to fall back to the JDK client.
+ final AbfsConnectionManager connMgr = new AbfsConnectionManager(
+ RegistryBuilder.<ConnectionSocketFactory>create()
+ .register(HTTPS_SCHEME, new SSLConnectionSocketFactory(
+ DelegatingSSLSocketFactory.getDefaultFactory(),
+ getDefaultHostnameVerifier()))
+ .build(),
+ new AbfsHttpClientConnectionFactory(), keepAliveCache,
+ new AbfsConfiguration(new Configuration(), EMPTY_STRING),
+ new URL("https://test.com"), true);
+
+ Assertions.assertThat(AbfsApacheHttpClient.usable())
+ .describedAs("Apache HttpClient should be not usable")
+ .isFalse();
+ // Make a REST API call to verify that the client falls back to the JDK client.
+ AzureBlobFileSystem fs = getFileSystem();
+ verifyClientRequestId(fs, JDK_FALLBACK);
+ AbfsApacheHttpClient.setUsable();
+ verifyClientRequestId(fs, APACHE_IMPL);
+ }
+ }
+
+ /**
+ * Verify that the client request id contains the expected client.
+ * @param fs AzureBlobFileSystem instance
+ * @param expectedClient Expected client in the client request id.
+ * @throws AzureBlobFileSystemException if any failure occurs during the operation.
+ */
+ private void verifyClientRequestId(AzureBlobFileSystem fs,
+ String expectedClient)
+ throws AzureBlobFileSystemException {
+ AbfsRestOperation op = fs.getAbfsStore()
+ .getClient()
+ .getFilesystemProperties(getTestTracingContext(fs, true));
+ String[] clientRequestIdList = op.getResult()
+ .getClientRequestId().split(COLON);
+ Assertions.assertThat(clientRequestIdList[clientRequestIdList.length - 1])
+ .describedAs("Http Client in use should be %s", expectedClient)
+ .isEqualTo(expectedClient);
+ }
+
private Map.Entry<HttpRoute, AbfsManagedApacheHttpConnection>
getTestConnection()
throws IOException {
HttpHost host = new HttpHost(getFileSystem().getUri().getHost(),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]