This is an automated email from the ASF dual-hosted git repository.
anujmodi pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 422ac9bf25d HADOOP-19766. [ABFS] [FnsBlob] Force Flat Namespace
accounts to use AbfsBlobClient.
422ac9bf25d is described below
commit 422ac9bf25df87a2e86c854c070da00694e5a7d4
Author: manika137 <[email protected]>
AuthorDate: Sun Jan 11 22:31:25 2026 -0800
HADOOP-19766. [ABFS] [FnsBlob] Force Flat Namespace accounts to use
AbfsBlobClient.
Contributed by Manika Joshi
---
.../dev-support/testrun-scripts/runtests.sh | 41 +-----
.../hadoop/fs/azurebfs/AbfsConfiguration.java | 40 ++++--
.../hadoop/fs/azurebfs/AzureBlobFileSystem.java | 14 +-
.../fs/azurebfs/AzureBlobFileSystemStore.java | 15 +-
.../fs/azurebfs/oauth2/MsiTokenProvider.java | 8 ++
.../hadoop/fs/azurebfs/services/AbfsClient.java | 3 +-
.../fs/azurebfs/services/AbfsClientHandler.java | 27 ++++
.../fs/azurebfs/services/AbfsOutputStream.java | 6 +-
.../fs/azurebfs/services/ListResponseData.java | 7 +
.../fs/azurebfs/services/ListingSupport.java | 4 +
.../apache/hadoop/fs/azurebfs/utils/Listener.java | 1 +
.../hadoop/fs/azurebfs/utils/TracingContext.java | 19 ++-
.../fs/azurebfs/utils/TracingHeaderVersion.java | 4 +-
.../hadoop-azure/src/site/markdown/index.md | 60 +++++++-
.../azurebfs/ITestAzureBlobFileSystemAppend.java | 154 ++++++++++++---------
.../ITestAzureBlobFileSystemInitAndCreate.java | 94 ++++++++-----
.../fs/azurebfs/ITestGetNameSpaceEnabled.java | 2 +-
.../fs/azurebfs/services/ITestAbfsClient.java | 54 +++++---
.../azurebfs/services/ITestAbfsOutputStream.java | 110 +++++++++++++++
.../services/TestAbfsRenameRetryRecovery.java | 54 --------
.../fs/azurebfs/utils/TracingHeaderValidator.java | 7 +
21 files changed, 484 insertions(+), 240 deletions(-)
diff --git a/hadoop-tools/hadoop-azure/dev-support/testrun-scripts/runtests.sh
b/hadoop-tools/hadoop-azure/dev-support/testrun-scripts/runtests.sh
index a0039ced06d..88c50173a86 100755
--- a/hadoop-tools/hadoop-azure/dev-support/testrun-scripts/runtests.sh
+++ b/hadoop-tools/hadoop-azure/dev-support/testrun-scripts/runtests.sh
@@ -48,14 +48,6 @@ runHNSSharedKeyDFSTest()
triggerRun "HNS-SharedKey-DFS" "$accountName" "$runTest" $processCount
"$cleanUpTestContainers"
}
-runNonHNSSharedKeyDFSTest()
-{
- accountName=$(xmlstarlet sel -t -v '//property[name =
"fs.azure.nonHnsTestAccountName"]/value' -n $azureTestXmlPath)
- PROPERTIES=("fs.azure.account.auth.type")
- VALUES=("SharedKey")
- triggerRun "NonHNS-SharedKey-DFS" "$accountName" "$runTest" $processCount
"$cleanUpTestContainers"
-}
-
runAppendBlobHNSOAuthDFSTest()
{
accountName=$(xmlstarlet sel -t -v '//property[name =
"fs.azure.hnsTestAccountName"]/value' -n $azureTestXmlPath)
@@ -73,14 +65,6 @@ runNonHNSSharedKeyBlobTest()
triggerRun "NonHNS-SharedKey-Blob" "${accountName}_blob" "$runTest"
$processCount "$cleanUpTestContainers"
}
-runNonHNSOAuthDFSTest()
-{
- accountName=$(xmlstarlet sel -t -v '//property[name =
"fs.azure.nonHnsTestAccountName"]/value' -n $azureTestXmlPath)
- PROPERTIES=("fs.azure.account.auth.type")
- VALUES=("OAuth")
- triggerRun "NonHNS-OAuth-DFS" "$accountName" "$runTest" $processCount
"$cleanUpTestContainers"
-}
-
runNonHNSOAuthBlobTest()
{
accountName=$(xmlstarlet sel -t -v '//property[name =
"fs.azure.nonHnsTestAccountName"]/value' -n $azureTestXmlPath)
@@ -107,14 +91,6 @@ runHNSOAuthDFSIngressBlobTest()
triggerRun "HNS-Oauth-DFS-IngressBlob" "$accountName" "$runTest"
$processCount "$cleanUpTestContainers"
}
-runNonHNSOAuthDFSIngressBlobTest()
-{
- accountName=$(xmlstarlet sel -t -v '//property[name =
"fs.azure.nonHnsTestAccountName"]/value' -n $azureTestXmlPath)
- PROPERTIES=("fs.azure.account.auth.type" "fs.azure.ingress.service.type")
- VALUES=("OAuth" "blob")
- triggerRun "NonHNS-OAuth-DFS-IngressBlob" "$accountName" "$runTest"
$processCount "$cleanUpTestContainers"
-}
-
runTest=false
cleanUpTestContainers=false
echo 'Ensure below are complete before running script:'
@@ -181,7 +157,7 @@ done
echo ' '
echo 'Set the active test combination to run the action:'
-select combo in HNS-OAuth-DFS HNS-SharedKey-DFS NonHNS-SharedKey-DFS
AppendBlob-HNS-OAuth-DFS NonHNS-SharedKey-Blob NonHNS-OAuth-DFS
NonHNS-OAuth-Blob AppendBlob-NonHNS-OAuth-Blob HNS-Oauth-DFS-IngressBlob
NonHNS-Oauth-DFS-IngressBlob AllCombinationsTestRun Quit
+select combo in HNS-OAuth-DFS HNS-SharedKey-DFS AppendBlob-HNS-OAuth-DFS
NonHNS-SharedKey-Blob NonHNS-OAuth-Blob AppendBlob-NonHNS-OAuth-Blob
HNS-Oauth-DFS-IngressBlob AllCombinationsTestRun Quit
do
case $combo in
HNS-OAuth-DFS)
@@ -192,10 +168,6 @@ do
runHNSSharedKeyDFSTest
break
;;
- NonHNS-SharedKey-DFS)
- runNonHNSSharedKeyDFSTest
- break
- ;;
AppendBlob-HNS-OAuth-DFS)
runAppendBlobHNSOAuthDFSTest
break
@@ -204,10 +176,6 @@ do
runNonHNSSharedKeyBlobTest
break
;;
- NonHNS-OAuth-DFS)
- runNonHNSOAuthDFSTest
- break
- ;;
NonHNS-OAuth-Blob)
runNonHNSOAuthBlobTest
break
@@ -220,10 +188,6 @@ do
runHNSOAuthDFSIngressBlobTest
break
;;
- NonHNS-Oauth-DFS-IngressBlob)
- runNonHNSOAuthDFSIngressBlobTest
- break
- ;;
AllCombinationsTestRun)
if [ $runTest == false ]
then
@@ -232,14 +196,11 @@ do
fi
runHNSOAuthDFSTest
runHNSSharedKeyDFSTest
- runNonHNSSharedKeyDFSTest
runAppendBlobHNSOAuthDFSTest
runNonHNSSharedKeyBlobTest
- runNonHNSOAuthDFSTest
runNonHNSOAuthBlobTest
runAppendBlobNonHNSOAuthBlobTest
runHNSOAuthDFSIngressBlobTest
- runNonHNSOAuthDFSIngressBlobTest
break
;;
Quit)
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index e7591292c91..080240d9587 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -80,7 +80,6 @@
import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.*;
import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.*;
-import static
org.apache.hadoop.fs.azurebfs.services.AbfsErrors.INCORRECT_INGRESS_TYPE;
/**
* Configuration for Azure Blob FileSystem.
@@ -93,7 +92,7 @@ public class AbfsConfiguration{
private final String accountName;
private String fsName;
// Service type identified from URL used to initialize FileSystem.
- private final AbfsServiceType fsConfiguredServiceType;
+ private AbfsServiceType fsConfiguredServiceTypeFromUrl;
private final boolean isSecure;
private static final Logger LOG =
LoggerFactory.getLogger(AbfsConfiguration.class);
private Trilean isNamespaceEnabled = null;
@@ -663,18 +662,18 @@ public class AbfsConfiguration{
* Constructor for AbfsConfiguration for specified service type.
* @param rawConfig used to initialize the configuration.
* @param accountName the name of the azure storage account.
- * @param fsConfiguredServiceType service type configured for the file
system.
+ * @param fsConfiguredServiceTypeFromUrl service type configured for the
file system.
* @throws IllegalAccessException if the field is not accessible.
* @throws IOException if an I/O error occurs.
*/
public AbfsConfiguration(final Configuration rawConfig,
String accountName,
- AbfsServiceType fsConfiguredServiceType)
+ AbfsServiceType fsConfiguredServiceTypeFromUrl)
throws IllegalAccessException, IOException {
this.rawConfig = ProviderUtils.excludeIncompatibleCredentialProviders(
rawConfig, AzureBlobFileSystem.class);
this.accountName = accountName;
- this.fsConfiguredServiceType = fsConfiguredServiceType;
+ this.fsConfiguredServiceTypeFromUrl = fsConfiguredServiceTypeFromUrl;
this.isSecure = getBoolean(FS_AZURE_SECURE_MODE, false);
Field[] fields = this.getClass().getDeclaredFields();
@@ -701,16 +700,16 @@ public AbfsConfiguration(final Configuration rawConfig,
* @param rawConfig used to initialize the configuration.
* @param accountName the name of the azure storage account.
* @param fsName the name of the file system (container name).
- * @param fsConfiguredServiceType service type configured for the file
system.
+ * @param fsConfiguredServiceTypeFromUrl service type configured for the
file system.
* @throws IllegalAccessException if the field is not accessible.
* @throws IOException if an I/O error occurs.
*/
public AbfsConfiguration(final Configuration rawConfig,
String accountName,
String fsName,
- AbfsServiceType fsConfiguredServiceType)
+ AbfsServiceType fsConfiguredServiceTypeFromUrl)
throws IllegalAccessException, IOException {
- this(rawConfig, accountName, fsConfiguredServiceType);
+ this(rawConfig, accountName, fsConfiguredServiceTypeFromUrl);
this.fsName = fsName;
}
@@ -749,7 +748,16 @@ public Trilean getIsNamespaceEnabledAccount() {
* @return the service type.
*/
public AbfsServiceType getFsConfiguredServiceType() {
- return getCaseInsensitiveEnum(FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE,
fsConfiguredServiceType);
+ return getCaseInsensitiveEnum(FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE,
fsConfiguredServiceTypeFromUrl);
+ }
+
+ /**
+ * Returns the service type identified from the URL used to initialize the
FileSystem.
+ *
+ * @return the configured AbfsServiceType from the URL
+ */
+ public AbfsServiceType getFsConfiguredServiceTypeFromUrl() {
+ return fsConfiguredServiceTypeFromUrl;
}
/**
@@ -790,13 +798,9 @@ public void validateConfiguredServiceType(boolean
isHNSEnabled)
if (isHNSEnabled && getConfiguredServiceTypeForFNSAccounts() ==
AbfsServiceType.BLOB) {
throw new InvalidConfigurationValueException(
FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE, "Service Type Cannot be BLOB for
HNS Account");
- } else if (isHNSEnabled && fsConfiguredServiceType ==
AbfsServiceType.BLOB) {
+ } else if (isHNSEnabled && fsConfiguredServiceTypeFromUrl ==
AbfsServiceType.BLOB) {
throw new InvalidConfigurationValueException(FS_DEFAULT_NAME_KEY,
"Blob Endpoint Url Cannot be used to initialize filesystem for HNS
Account");
- } else if (getFsConfiguredServiceType() == AbfsServiceType.BLOB
- && getIngressServiceType() == AbfsServiceType.DFS) {
- throw new InvalidConfigurationValueException(
- FS_AZURE_INGRESS_SERVICE_TYPE, INCORRECT_INGRESS_TYPE);
}
}
@@ -1799,6 +1803,14 @@ void setReadAheadEnabled(final boolean enabledReadAhead)
{
this.enabledReadAhead = enabledReadAhead;
}
+ /**
+ * Sets the configured service type.
+ * Used to update the service type identified from the URL.
+ */
+ void setFsConfiguredServiceType(AbfsServiceType serviceType) {
+ this.fsConfiguredServiceTypeFromUrl = serviceType;
+ }
+
public int getReadAheadRange() {
return this.readAheadRange;
}
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
index 572bc873b1c..e41f73af776 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
@@ -129,6 +129,7 @@
import static
org.apache.hadoop.fs.azurebfs.constants.FSOperationType.CREATE_FILESYSTEM;
import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT;
import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DATA_BLOCKS_BUFFER_DEFAULT;
+import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_DFS_DOMAIN_NAME;
import static
org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD;
import static
org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_CREATE_ON_ROOT;
import static
org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_INVALID_ABFS_STATE;
@@ -315,6 +316,18 @@ public void initialize(URI uri, Configuration
configuration)
throw new
InvalidConfigurationValueException(FS_AZURE_ACCOUNT_IS_HNS_ENABLED, ex);
}
+ /*
+ * For FNS accounts, restrict the endpoint and service type to Blob.
+ * For FNS-DFS, also update the tracing context to add a metric showing
the endpoint conversion.
+ */
+ if (!tryGetIsNamespaceEnabled(new TracingContext(initFSTracingContext))) {
+ LOG.debug("FNS account detected; restricting service type to Blob.");
+ abfsStore.restrictServiceTypeToBlob();
+ if (uri.toString().contains(ABFS_DFS_DOMAIN_NAME)) {
+ initFSTracingContext.setFNSEndpointConverted();
+ }
+ }
+
// Create the file system if it does not exist.
if (abfsConfiguration.getCreateRemoteFileSystemDuringInitialization()) {
TracingContext createFSTracingContext = new
TracingContext(initFSTracingContext);
@@ -1861,4 +1874,3 @@ public IOStatistics getIOStatistics() {
return abfsCounters != null ? abfsCounters.getIOStatistics() : null;
}
}
-
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index 5d7d0895d02..93f1cfb436c 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -798,9 +798,11 @@ private AbfsOutputStreamContext
populateAbfsOutputStreamContext(
ContextEncryptionAdapter contextEncryptionAdapter,
TracingContext tracingContext) {
int bufferSize = abfsConfiguration.getWriteBufferSize();
+
if (isAppendBlob && bufferSize >
FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE) {
bufferSize = FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE;
}
+
return new
AbfsOutputStreamContext(abfsConfiguration.getSasTokenRenewPeriodForStreamsInSeconds())
.withWriteBufferSize(bufferSize)
.enableExpectHeader(abfsConfiguration.isExpectHeaderEnabled())
@@ -824,7 +826,7 @@ private AbfsOutputStreamContext
populateAbfsOutputStreamContext(
.withWriteThreadPoolManager(writeThreadPoolSizeManager)
.withTracingContext(tracingContext)
.withAbfsBackRef(fsBackRef)
- .withIngressServiceType(abfsConfiguration.getIngressServiceType())
+ .withIngressServiceType(clientHandler.getIngressServiceType())
.withDFSToBlobFallbackEnabled(abfsConfiguration.isDfsToBlobFallbackEnabled())
.withETag(eTag)
.build();
@@ -1906,6 +1908,17 @@ private AbfsPerfInfo startTracking(String callerName,
String calleeName) {
return new AbfsPerfInfo(abfsPerfTracker, callerName, calleeName);
}
+ /**
+ * Restricts all service types to BLOB when an FNS account is detected.
+ * Updates the client to reflect the new default service type.
+ */
+ public void restrictServiceTypeToBlob() {
+ clientHandler.setDefaultServiceType(AbfsServiceType.BLOB);
+ clientHandler.setIngressServiceType(AbfsServiceType.BLOB);
+ getAbfsConfiguration().setFsConfiguredServiceType(AbfsServiceType.BLOB);
+ this.client = clientHandler.getClient();
+ }
+
/**
* Permissions class contain provided permission and umask in octalNotation.
* If the object is created for namespace-disabled account, the permission
and
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/MsiTokenProvider.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/MsiTokenProvider.java
index 784365b4c9c..ca362e02faf 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/MsiTokenProvider.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/MsiTokenProvider.java
@@ -42,6 +42,14 @@ public class MsiTokenProvider extends AccessTokenProvider {
private static final Logger LOG =
LoggerFactory.getLogger(AccessTokenProvider.class);
+ /**
+ * Constructs an MsiTokenProvider.
+ *
+ * @param authEndpoint the authentication endpoint for MSI
+ * @param tenantGuid the tenant GUID
+ * @param clientId the client ID for MSI
+ * @param authority the authority URL
+ */
public MsiTokenProvider(final String authEndpoint, final String tenantGuid,
final String clientId, final String authority) {
this.authEndpoint = authEndpoint;
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index 2b21411c147..b6a0e1e1ce2 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -1376,7 +1376,8 @@ String initializeUserAgent(final AbfsConfiguration
abfsConfiguration,
// Add a unique identifier in FNS-Blob user agent string
// Current filesystem init restricts HNS-Blob combination
// so namespace check not required.
- if (abfsConfiguration.getFsConfiguredServiceType() == BLOB) {
+ // We need to rely on URL check to identify Blob service instead of user
config
+ if (abfsConfiguration.getFsConfiguredServiceTypeFromUrl() == BLOB) {
sb.append(SEMICOLON)
.append(SINGLE_WHITE_SPACE)
.append(FNS_BLOB_USER_AGENT_IDENTIFIER);
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java
index 393811c256b..26c18a41459 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java
@@ -91,6 +91,33 @@ private void initServiceType(final AbfsConfiguration
abfsConfiguration) {
this.ingressServiceType = abfsConfiguration.getIngressServiceType();
}
+ /**
+ * Sets the default service type.
+ *
+ * @param defaultServiceType the service type to set as default
+ */
+ public void setDefaultServiceType(AbfsServiceType defaultServiceType) {
+ this.defaultServiceType = defaultServiceType;
+ }
+
+ /**
+ * Sets the ingress service type.
+ *
+ * @param ingressServiceType the ingress service type
+ */
+ public void setIngressServiceType(AbfsServiceType ingressServiceType) {
+ this.ingressServiceType = ingressServiceType;
+ }
+
+ /**
+ * Gets the default ingress service type.
+ *
+ * @return the default ingress service type
+ */
+ public AbfsServiceType getIngressServiceType() {
+ return ingressServiceType;
+ }
+
/**
* Get the AbfsClient based on the default service type.
* @return AbfsClient
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
index 6c68782c398..883af56c589 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
@@ -320,7 +320,11 @@ private AzureIngressHandler
createNewHandler(AbfsServiceType serviceType,
boolean isSwitch,
AzureBlockManager blockManager) throws IOException {
this.client = clientHandler.getClient(serviceType);
- if (isDFSToBlobFallbackEnabled && serviceTypeAtInit !=
AbfsServiceType.DFS) {
+
+ // Check ingress service type is also set to DFS along with enabling the
config for fallback
+ // Separate ingress service type is only allowed for HNS accounts
+ if (isDFSToBlobFallbackEnabled && client.getIsNamespaceEnabled()
+ && serviceTypeAtInit != AbfsServiceType.DFS) {
throw new InvalidConfigurationValueException(
"The ingress service type must be configured as DFS");
}
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListResponseData.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListResponseData.java
index 87f212034b7..05c587acc44 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListResponseData.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListResponseData.java
@@ -35,6 +35,13 @@ public class ListResponseData {
private AbfsRestOperation executedRestOperation;
private String continuationToken;
+ /**
+ * Default constructor for ListResponseData.
+ */
+ public ListResponseData() {
+ // do nothing
+ }
+
/**
* Returns the list of VersionedFileStatus objects.
* @return the list of VersionedFileStatus objects
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListingSupport.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListingSupport.java
index f3c08c4a300..2dc137f7a50 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListingSupport.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListingSupport.java
@@ -29,6 +29,10 @@
@InterfaceAudience.Private
@InterfaceStability.Unstable
+
+/**
+ * Interface for listing support in Azure Blob File System.
+ */
public interface ListingSupport {
/**
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/Listener.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/Listener.java
index 859b9474cfe..5f5cd355ca4 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/Listener.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/Listener.java
@@ -32,6 +32,7 @@ public interface Listener {
Listener getClone();
void setOperation(FSOperationType operation);
void updateIngressHandler(String ingressHandler);
+ void updateFNSEndpointConverted();
void updatePosition(String position);
void updateReadType(ReadType readType);
}
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java
index 8decba90b9f..ff3ab405959 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java
@@ -67,6 +67,9 @@ public class TracingContext {
//final concatenated ID list set into x-ms-client-request-id header
private String header = EMPTY_STRING;
private String ingressHandler = EMPTY_STRING;
+ private Boolean fnsEndpointConverted = false;
+ // Represents endpoint was converted to Blob for FNS; "T" stands for "True"
+ private String fnsEndptConvertedIndicator = "T";
private String position = EMPTY_STRING; // position of read/write in remote
file
private String metricResults = EMPTY_STRING;
private ReadType readType = ReadType.UNKNOWN_READ;
@@ -148,6 +151,7 @@ public TracingContext(TracingContext
originalTracingContext) {
this.format = originalTracingContext.format;
this.position = originalTracingContext.getPosition();
this.ingressHandler = originalTracingContext.getIngressHandler();
+ this.fnsEndpointConverted = originalTracingContext.fnsEndpointConverted;
this.operatedBlobCount = originalTracingContext.operatedBlobCount;
if (originalTracingContext.listener != null) {
this.listener = originalTracingContext.listener.getClone();
@@ -212,6 +216,7 @@ public void setListener(Listener listener) {
* <li>operatedBlobCount - number of blobs operated on by this request</li>
* <li>operationSpecificHeader - different operation types can publish
info relevant to that operation</li>
* <li>httpOperationHeader - suffix for network library used</li>
+ * <li>fnsEndpointConverted - if endpoint was converted to Blob for FNS
accounts</li>
* </ul>
* @param httpOperation AbfsHttpOperation instance to set header into
* connection
@@ -237,7 +242,8 @@ public void constructHeader(AbfsHttpOperation
httpOperation, String previousFail
+ operatedBlobCount + COLON
+ getOperationSpecificHeader(opType) + COLON
+ httpOperation.getTracingContextSuffix() + COLON
- + metricResults + COLON + resourceUtilizationMetricResults;
+ + metricResults + COLON + resourceUtilizationMetricResults + COLON
+ + (fnsEndpointConverted ? fnsEndptConvertedIndicator : EMPTY_STRING);
break;
case TWO_ID_FORMAT:
header = TracingHeaderVersion.getCurrentVersion() + COLON
@@ -371,6 +377,17 @@ public void setIngressHandler(final String ingressHandler)
{
}
}
+/**
+ * Marks that the endpoint was force converted to Blob for FNS account
+ * Sets the fnsEndpointConverted flag to true and notifies the listener if
present.
+ */
+ public void setFNSEndpointConverted() {
+ this.fnsEndpointConverted = true;
+ if (listener != null) {
+ listener.updateFNSEndpointConverted();
+ }
+ }
+
/**
* Sets the position.
*
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderVersion.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderVersion.java
index 6ce0299ff89..b4b3fe4c072 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderVersion.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderVersion.java
@@ -45,9 +45,9 @@ public enum TracingHeaderVersion {
* Schema: version:clientCorrelationId:clientRequestId:fileSystemId
* :primaryRequestId:streamId:opType:retryHeader:ingressHandler
*
:position:operatedBlobCount:operationSpecificHeader:httpOperationHeader
- * :aggregatedMetrics:resourceUtilizationMetrics
+ *
:aggregatedMetrics:resourceUtilizationMetrics:fnsEndptConvertedIndicator
*/
- V2("v2", 15);
+ V2("v2", 16);
private final String versionString;
private final int fieldCount;
diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/index.md
b/hadoop-tools/hadoop-azure/src/site/markdown/index.md
index 5c5a134fc5b..c482e786445 100644
--- a/hadoop-tools/hadoop-azure/src/site/markdown/index.md
+++ b/hadoop-tools/hadoop-azure/src/site/markdown/index.md
@@ -802,16 +802,15 @@ requests. User can specify them as fixed SAS Token to be
used across all the req
### User-bound SAS
- **Description**: The user-bound SAS auth type allows to track the usage of
the SAS token generated- something
that was not possible in user-delegation SAS authentication type. Reach out
to us at '[email protected]' for more information.
- To use this authentication type, both custom SAS token provider class (that
implements org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider) as
- well as OAuth 2.0 provider type need to be specified.
- - Refer to 'Shared Access Signature (SAS) Token Provider' section above
for user-delegation SAS token provider class details and example class
implementation.
- - There are multiple identity configurations for OAuth settings. Listing
the main ones below:
+- To use this authentication type, both custom SAS token provider class as
well as OAuth 2.0 provider type need to be specified.
+ - Read the section below for SAS Token Provider class details and example
class implementation.
+ - There are multiple identity configurations for OAuth Token Provider
settings. Listing the main ones below:
- Client Credentials
- Custom token provider
- Managed Identity
- Workload Identity
- Refer to respective OAuth 2.0 sections above to correctly chose the
OAuth provider type
+ Refer to respective OAuth 2.0 sections above to correctly choose the
OAuth provider type
- NOTE: User-bound SAS Authentication is **only supported** with HNS
Enabled accounts.
- **Configuration**: To use this method with ABFS Driver, specify the
following properties in your `core-site.xml` file:
@@ -838,6 +837,57 @@ requests. User can specify them as fixed SAS Token to be
used across all the req
</property>
```
+ - ABFS allows you to implement your custom SAS Token Provider. The
declared class must implement
+ `org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider`.
+ ABFS Hadoop Driver provides
+ a
[MockUserBoundSASTokenProvider](https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockUserBoundSASTokenProvider.java)
+ implementation that can be used as an example on how to implement your
own custom
+ SASTokenProvider. This requires the Application credentials to be
specified using
+ the following configurations apart from above three:
+ 4. Delegator App Service Principal Tenant Id:
+ ```xml
+ <property>
+ <name>fs.azure.test.app.service.principal.tenant.id</name>
+ <value>DELEGATOR_TENANT_ID</value>
+ </property>
+ ```
+ 5. Delegator App Service Principal Object Id:
+ ```xml
+ <property>
+ <name>fs.azure.test.app.service.principal.object.id</name>
+ <value>DELEGATOR_OBJECT_ID</value>
+ </property>
+ ```
+ 6. End-user App Service Principal Tenant Id:
+ ```xml
+ <property>
+ <name>fs.azure.test.end.user.tenant.id</name>
+ <value>DELEGATED_TENANT_ID</value>
+ </property>
+ ```
+ 7. End-user App Service Principal Object Id:
+ ```xml
+ <property>
+ <name>fs.azure.test.end.user.object.id</name>
+ <value>DELEGATED_OBJECT_ID</value>
+ </property>
+ ```
+ 8. Delegator App Id:
+ ```xml
+ <property>
+ <name>fs.azure.test.app.id</name>
+ <value>APPLICATION_ID</value>
+ </property>
+ ```
+ 9. Delegator App Secret:
+ ```xml
+ <property>
+ <name>fs.azure.test.app.secret</name>
+ <value>APPLICATION_SECRET</value>
+ </property>
+ ```
+ - Add all additional configs required by the chosen OAuth Token provider
from the sections above as well.
+
## <a name="technical"></a> Technical notes
### <a name="proxy"></a> Proxy setup
diff --git
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java
index 557333ff0fd..76bdaaba467 100644
---
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java
+++
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java
@@ -223,9 +223,7 @@ public void testCreateOverDfsAppendOverBlob() throws
IOException {
createPath(makeQualified(testPath).toUri().getPath(), true, false,
permissions, false, null,
null, getTestTracingContext(fs, true));
- fs.getAbfsStore()
- .getAbfsConfiguration()
- .set(FS_AZURE_INGRESS_SERVICE_TYPE, AbfsServiceType.BLOB.name());
+
fs.getAbfsStore().getClientHandler().setIngressServiceType(AbfsServiceType.BLOB);
FSDataOutputStream outputStream = fs.append(testPath);
AzureIngressHandler ingressHandler
= ((AbfsOutputStream)
outputStream.getWrappedStream()).getIngressHandler();
@@ -262,9 +260,7 @@ public void testMultipleAppendsQualifyForSwitch() throws
Exception {
createPath(makeQualified(testPath).toUri().getPath(), true, false,
permissions, false, null,
null, getTestTracingContext(fs, true));
- fs.getAbfsStore()
- .getAbfsConfiguration()
- .set(FS_AZURE_INGRESS_SERVICE_TYPE, AbfsServiceType.BLOB.name());
+
fs.getAbfsStore().getClientHandler().setIngressServiceType(AbfsServiceType.BLOB);
ExecutorService executorService = Executors.newFixedThreadPool(5);
List<Future<?>> futures = new ArrayList<>();
@@ -328,9 +324,7 @@ public void testParallelWritesOnDfsAndBlob() throws
Exception {
createPath(makeQualified(testPath).toUri().getPath(), true, false,
permissions, false, null,
null, getTestTracingContext(fs, true));
- fs.getAbfsStore()
- .getAbfsConfiguration()
- .set(FS_AZURE_INGRESS_SERVICE_TYPE, AbfsServiceType.BLOB.name());
+
fs.getAbfsStore().getClientHandler().setIngressServiceType(AbfsServiceType.BLOB);
FSDataOutputStream out1 = fs.create(testPath);
fs.getAbfsStore().getClientHandler().getDfsClient().
createPath(makeQualified(testPath1).toUri().getPath(), true, false,
@@ -389,10 +383,7 @@ public void testCreateOverBlobAppendOverDfs() throws
IOException {
fs.getAbfsStore()
.getAbfsConfiguration()
.setBoolean(FS_AZURE_ENABLE_DFSTOBLOB_FALLBACK, true);
- fs.getAbfsStore()
- .getAbfsConfiguration()
- .set(FS_AZURE_INGRESS_SERVICE_TYPE,
- String.valueOf(AbfsServiceType.DFS));
+
fs.getAbfsStore().getClientHandler().setIngressServiceType(AbfsServiceType.DFS);
fs.getAbfsStore().getClientHandler().getBlobClient().
createPath(makeQualified(testPath).toUri().getPath(), true, false,
permissions, false, null,
@@ -438,10 +429,7 @@ public void
testCreateAppendBlobOverBlobEndpointAppendOverDfs()
fs.getAbfsStore()
.getAbfsConfiguration()
.setBoolean(FS_AZURE_ENABLE_DFSTOBLOB_FALLBACK, true);
- fs.getAbfsStore()
- .getAbfsConfiguration()
- .set(FS_AZURE_INGRESS_SERVICE_TYPE,
- String.valueOf(AbfsServiceType.DFS));
+
fs.getAbfsStore().getClientHandler().setIngressServiceType(AbfsServiceType.DFS);
fs.getAbfsStore().getClientHandler().getBlobClient().
createPath(makeQualified(testPath).toUri().getPath(), true, false,
permissions, true, null,
@@ -483,9 +471,7 @@ public void
testCreateAppendBlobOverDfsEndpointAppendOverBlob()
createPath(makeQualified(testPath).toUri().getPath(), true, false,
permissions, true, null,
null, getTestTracingContext(fs, true));
- fs.getAbfsStore()
- .getAbfsConfiguration()
- .set(FS_AZURE_INGRESS_SERVICE_TYPE, AbfsServiceType.BLOB.name());
+
fs.getAbfsStore().getClientHandler().setIngressServiceType(AbfsServiceType.BLOB);
FSDataOutputStream outputStream = fs.append(testPath);
AzureIngressHandler ingressHandler
= ((AbfsOutputStream)
outputStream.getWrappedStream()).getIngressHandler();
@@ -507,61 +493,101 @@ public void
testCreateAppendBlobOverDfsEndpointAppendOverBlob()
.isInstanceOf(AbfsDfsClient.class);
}
-
/**
- * Tests the correct retrieval of the AzureIngressHandler based on the
configured ingress service type.
+ * Validates that the correct ingress handler and client are used for the
specified
+ * ingress service type.
*
- * @throws IOException if an I/O error occurs
+ * @param ingressServiceType the ingress service type to test (e.g., DFS
or BLOB)
+ * @param expectedIngressHandler the expected class of the
AzureIngressHandler
+ * @param expectedClient the expected class of the AbfsClient
+ * @throws IOException if an I/O error occurs during validation
*/
- @Test
- public void testValidateIngressHandler() throws IOException {
+ private void validateIngressHandler(AbfsServiceType ingressServiceType,
+ Class<? extends AzureIngressHandler> expectedIngressHandler,
+ Class<? extends AbfsClient> expectedClient)
+ throws IOException {
+
Configuration configuration = getRawConfiguration();
configuration.set(FS_AZURE_INGRESS_SERVICE_TYPE,
- AbfsServiceType.BLOB.name());
- try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(
- configuration)) {
+ ingressServiceType.name());
+
+ try (AzureBlobFileSystem fs =
+ (AzureBlobFileSystem) FileSystem.newInstance(configuration)) {
+
Path testPath = path(TEST_FILE_PATH);
- AzureBlobFileSystemStore.Permissions permissions
- = new AzureBlobFileSystemStore.Permissions(false,
- FsPermission.getDefault(), FsPermission.getUMask(fs.getConf()));
- fs.getAbfsStore().getClientHandler().getBlobClient().
- createPath(makeQualified(testPath).toUri().getPath(), true,
- false,
- permissions, false, null,
- null, getTestTracingContext(fs, true));
+ AzureBlobFileSystemStore.Permissions permissions =
+ new AzureBlobFileSystemStore.Permissions(
+ false,
+ FsPermission.getDefault(),
+ FsPermission.getUMask(fs.getConf()));
+
+ fs.getAbfsStore()
+ .getClientHandler()
+ .getBlobClient()
+ .createPath(
+ makeQualified(testPath).toUri().getPath(),
+ true,
+ false,
+ permissions,
+ false,
+ null,
+ null,
+ getTestTracingContext(fs, true));
+
FSDataOutputStream outputStream = fs.append(testPath);
- AzureIngressHandler ingressHandler
- = ((AbfsOutputStream)
outputStream.getWrappedStream()).getIngressHandler();
+ AzureIngressHandler ingressHandler =
+ ((AbfsOutputStream) outputStream.getWrappedStream())
+ .getIngressHandler();
+
assertThat(ingressHandler)
- .as("Blob Ingress handler instance is not correct")
- .isInstanceOf(AzureBlobIngressHandler.class);
- AbfsClient client = ingressHandler.getClient();
- assertThat(client)
- .as("Blob client was not used correctly")
- .isInstanceOf(AbfsBlobClient.class);
-
- Path testPath1 = new Path("testFile1");
- fs.getAbfsStore().getClientHandler().getBlobClient().
- createPath(makeQualified(testPath1).toUri().getPath(), true,
- false,
- permissions, false, null,
- null, getTestTracingContext(fs, true));
- fs.getAbfsStore()
- .getAbfsConfiguration()
- .set(FS_AZURE_INGRESS_SERVICE_TYPE, AbfsServiceType.DFS.name());
- FSDataOutputStream outputStream1 = fs.append(testPath1);
- AzureIngressHandler ingressHandler1
- = ((AbfsOutputStream)
outputStream1.getWrappedStream()).getIngressHandler();
- assertThat(ingressHandler1)
- .as("DFS Ingress handler instance is not correct")
- .isInstanceOf(AzureDFSIngressHandler.class);
- AbfsClient client1 = ingressHandler1.getClient();
- assertThat(client1)
- .as("Dfs client was not used correctly")
- .isInstanceOf(AbfsDfsClient.class);
+ .as("Unexpected ingress handler type")
+ .isInstanceOf(expectedIngressHandler);
+
+ assertThat(ingressHandler.getClient())
+ .as("Unexpected client used by ingress handler")
+ .isInstanceOf(expectedClient);
}
}
+ /**
+ * Validates that for FNS, both DFS and BLOB ingress service types force the
use of
+ * AzureBlobIngressHandler and AbfsBlobClient.
+ *
+ * @throws IOException if an I/O error occurs during validation
+ */
+ @Test
+ public void testValidateIngressHandlerForFNS() throws IOException {
+ assumeHnsDisabled();
+
+ validateIngressHandler(AbfsServiceType.DFS,
+ AzureBlobIngressHandler.class,
+ AbfsBlobClient.class);
+ validateIngressHandler(AbfsServiceType.BLOB,
+ AzureBlobIngressHandler.class,
+ AbfsBlobClient.class);
+ }
+
+
+ /**
+ * Validates that for HNS, the correct ingress handler and client
+ * are used for both DFS and BLOB service types.
+ * For DFS ingress service type, expects AzureDFSIngressHandler and
AbfsDfsClient.
+ * For BLOB ingress service type, expects AzureBlobIngressHandler and
AbfsBlobClient.
+ *
+ * @throws IOException if an I/O error occurs during validation
+ */
+ @Test
+ public void testValidateIngressHandlerForHNS() throws IOException {
+ assumeHnsEnabled();
+
+ validateIngressHandler(AbfsServiceType.DFS,
+ AzureDFSIngressHandler.class,
+ AbfsDfsClient.class);
+ validateIngressHandler(AbfsServiceType.BLOB,
+ AzureBlobIngressHandler.class,
+ AbfsBlobClient.class);
+ }
+
@Test
public void testAppendImplicitDirectory() throws Exception {
assertThrows(FileNotFoundException.class, () -> {
diff --git
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java
index f6ca7042b1c..355a33441e6 100644
---
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java
+++
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java
@@ -19,38 +19,39 @@
package org.apache.hadoop.fs.azurebfs;
import java.io.FileNotFoundException;
+import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
import
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
-import
org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException;
import org.apache.hadoop.fs.azurebfs.enums.Trilean;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
-import static
org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COLON;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DOT;
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SPLIT_NO_LIMIT;
import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_IS_HNS_ENABLED;
-import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME;
-import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_INGRESS_SERVICE_TYPE;
-import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.accountProperty;
-import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_BLOB_DOMAIN_NAME;
-import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_DFS_DOMAIN_NAME;
-import static
org.apache.hadoop.fs.azurebfs.services.AbfsErrors.INCORRECT_INGRESS_TYPE;
+import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION;
+import static
org.apache.hadoop.fs.azurebfs.services.RetryPolicyConstants.EXPONENTIAL_RETRY_POLICY_ABBREVIATION;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.mockito.ArgumentMatchers.any;
+
import org.junit.jupiter.api.Assertions;
/**
@@ -120,33 +121,6 @@ public void testNoGetAclCallOnHnsConfigPresence() throws
Exception {
.getAclStatus(Mockito.anyString(), any(TracingContext.class));
}
- /**
- * Test to verify that the initialization of the AzureBlobFileSystem fails
- * when an invalid ingress service type is configured.
- *
- * This test sets up a configuration with an invalid ingress service type
- * (DFS) for a Blob endpoint and expects an
InvalidConfigurationValueException
- * to be thrown during the initialization of the filesystem.
- *
- * @throws Exception if an error occurs during the test execution
- */
- @Test
- public void testFileSystemInitializationFailsForInvalidIngress() throws
Exception {
- assumeHnsDisabled();
- Configuration configuration = new Configuration(getRawConfiguration());
- String defaultUri = configuration.get(FS_DEFAULT_NAME_KEY);
- String accountKey = configuration.get(
- accountProperty(FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME, getAccountName()),
- configuration.get(FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME));
- configuration.set(FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME,
- accountKey.replace(ABFS_DFS_DOMAIN_NAME, ABFS_BLOB_DOMAIN_NAME));
- configuration.set(FS_AZURE_INGRESS_SERVICE_TYPE,
AbfsServiceType.DFS.name());
- String blobUri = defaultUri.replace(ABFS_DFS_DOMAIN_NAME,
ABFS_BLOB_DOMAIN_NAME);
- intercept(InvalidConfigurationValueException.class,
- INCORRECT_INGRESS_TYPE, () ->
- FileSystem.newInstance(new Path(blobUri).toUri(), configuration));
- }
-
@Test
public void testFileSystemInitFailsIfNotAbleToDetermineAccountType() throws
Exception {
AzureBlobFileSystem fs = ((AzureBlobFileSystem) FileSystem.newInstance(
@@ -160,4 +134,54 @@ public void
testFileSystemInitFailsIfNotAbleToDetermineAccountType() throws Exce
FS_AZURE_ACCOUNT_IS_HNS_ENABLED, () ->
mockedFs.initialize(fs.getUri(), getRawConfiguration()));
}
+
+ /**
+ * Test to verify that the fnsEndptConvertedIndicator ("T") is present in
the tracing header
+ * after endpoint conversion during AzureBlobFileSystem initialization.
+ *
+ * @throws Exception if any error occurs during the test
+ */
+ @Test
+ public void testFNSEndptConvertedIndicatorInHeader() throws Exception {
+ assumeHnsDisabled();
+ String scheme = "abfs";
+ String dfsDomain = "dfs.core.windows.net";
+ String endptConversionIndicatorInTc = "T";
+ Configuration conf = new Configuration(getRawConfiguration());
+ conf.setBoolean(AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION,
true);
+
+ String dfsUri = String.format("%s://%s@%s.%s/",
+ scheme, getFileSystemName(),
+ getAccountName().substring(0, getAccountName().indexOf(DOT)),
+ dfsDomain);
+
+ // Initialize filesystem with DFS endpoint
+ try (AzureBlobFileSystem fs =
+ (AzureBlobFileSystem) FileSystem.newInstance(new URI(dfsUri),
conf)) {
+ AzureBlobFileSystem spiedFs = Mockito.spy(fs);
+ AzureBlobFileSystemStore spiedStore =
Mockito.spy(spiedFs.getAbfsStore());
+ AbfsClient spiedClient = Mockito.spy(spiedStore.getClient());
+
+ Mockito.doReturn(spiedStore).when(spiedFs).getAbfsStore();
+ Mockito.doReturn(spiedClient).when(spiedStore).getClient();
+
+ // re-init the FS so the spy wiring is used
+ spiedFs.initialize(fs.getUri(), conf);
+ ArgumentCaptor<TracingContext> ctxCaptor =
ArgumentCaptor.forClass(TracingContext.class);
+ Mockito.verify(spiedClient, Mockito.atLeastOnce())
+ .getFilesystemProperties(ctxCaptor.capture());
+
+ TracingContext captured = ctxCaptor.getValue();
+
+ AbfsHttpOperation abfsHttpOperation =
Mockito.mock(AbfsHttpOperation.class);
+ captured.constructHeader(abfsHttpOperation, null,
+ EXPONENTIAL_RETRY_POLICY_ABBREVIATION);
+
+      // The tracing context being used for FS Initialization should have the
endpoint conversion indicator set to 'T'
+ final int endpointConversionIndicatorIndex = 15;
+ String endpointConversionIndicator = captured.getHeader().split(COLON,
SPLIT_NO_LIMIT)[endpointConversionIndicatorIndex ];
+ Assertions.assertFalse(endpointConversionIndicator.isEmpty(), "Endpoint
conversion indicator should be present");
+ Assertions.assertEquals(endptConversionIndicatorInTc,
endpointConversionIndicator, "Endpoint conversion indicator should be 'T'");
+ }
+ }
}
diff --git
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestGetNameSpaceEnabled.java
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestGetNameSpaceEnabled.java
index f02086316a7..9e310a8e9cd 100644
---
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestGetNameSpaceEnabled.java
+++
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestGetNameSpaceEnabled.java
@@ -435,7 +435,7 @@ public void testFsInitShouldSetNamespaceConfig() throws
Exception {
AzureBlobFileSystem mockFileSystem = Mockito.spy((AzureBlobFileSystem)
FileSystem.newInstance(getConfigurationWithoutHnsConfig()));
AzureBlobFileSystemStore mockStore =
Mockito.spy(mockFileSystem.getAbfsStore());
- AbfsClient abfsClient = Mockito.spy(mockStore.getClient());
+ AbfsClient abfsClient =
Mockito.spy(mockStore.getClient(AbfsServiceType.DFS));
Mockito.doReturn(abfsClient).when(mockStore).getClient();
Mockito.doReturn(abfsClient).when(mockStore).getClient(any());
abfsClient.getIsNamespaceEnabled();
diff --git
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java
index 4bf1f56e7ee..1eb21ce9da9 100644
---
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java
+++
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java
@@ -370,9 +370,8 @@ public void verifyUserAgentClusterType() throws Exception {
@Test
// Test to verify the unique identifier in user agent string for FNS-Blob
accounts
- public void verifyUserAgentForFNSBlob() throws Exception {
+ public void verifyUserAgentForFNS() throws Exception {
assumeHnsDisabled();
- assumeBlobServiceType();
final AzureBlobFileSystem fs = getFileSystem();
final AbfsConfiguration configuration = fs.getAbfsStore()
.getAbfsConfiguration();
@@ -386,24 +385,6 @@ public void verifyUserAgentForFNSBlob() throws Exception {
.contains(FNS_BLOB_USER_AGENT_IDENTIFIER);
}
- @Test
- // Test to verify that the user agent string for non-FNS-Blob accounts
- // does not contain the FNS identifier.
- public void verifyUserAgentForDFS() throws Exception {
- assumeDfsServiceType();
- final AzureBlobFileSystem fs = getFileSystem();
- final AbfsConfiguration configuration = fs.getAbfsStore()
- .getAbfsConfiguration();
-
- String userAgentStr = getUserAgentString(configuration, false);
- verifyBasicInfo(userAgentStr);
- Assertions.assertThat(userAgentStr)
- .describedAs(
- "User-Agent string for non-FNS-Blob accounts should not contain"
- + FNS_BLOB_USER_AGENT_IDENTIFIER)
- .doesNotContain(FNS_BLOB_USER_AGENT_IDENTIFIER);
- }
-
public static AbfsClient createTestClientFromCurrentContext(
AbfsClient baseAbfsClientInstance,
AbfsConfiguration abfsConfig) throws IOException, URISyntaxException {
@@ -850,6 +831,39 @@ public void testAuthTypeProviderSetup(AuthType authType)
throws Exception {
fs.close();
}
+ /**
+ * Test to verify that when initializing a filesystem with a DFS endpoint
for an FNS account,
+ * we force to Blob endpoint internally.
+ *
+ * @throws Exception if the test fails
+ */
+ @Test
+ public void testFNSDfsUsesBlobInstance() throws Exception {
+ assumeHnsDisabled();
+ String scheme = "abfs";
+ String dfsDomain = "dfs.core.windows.net";
+ String blobDomain = "blob.core.windows.net";
+ Configuration conf = new Configuration(getRawConfiguration());
+ conf.setBoolean(AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION,
true);
+
+ String dfsUri = String.format("%s://%s@%s.%s/",
+ scheme, getFileSystemName(),
+ getAccountName().substring(0, getAccountName().indexOf(DOT)),
+ dfsDomain);
+
+ // Initialize filesystem with DFS endpoint
+ AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(
+ new URI(dfsUri), conf);
+
+ // Filesystem initialization should have forced to use Blob instance for
FNS-DFS
+ AbfsClient abfsClient = fs.getAbfsStore().getClient();
+ Assertions.assertThat(abfsClient)
+ .as("abfsClient should be instance of AbfsBlobClient")
+ .isInstanceOf(AbfsBlobClient.class);
+ Assertions.assertThat(abfsClient.getBaseUrl().toString())
+ .contains(blobDomain);
+ }
+
@Test
public void testIsNonEmptyDirectory() throws IOException {
testIsNonEmptyDirectoryInternal(EMPTY_STRING, true, EMPTY_STRING,
diff --git
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java
index 54521c9c971..152c6a43f4c 100644
---
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java
+++
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java
@@ -21,6 +21,8 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
import java.net.ProtocolException;
import java.net.URI;
import java.net.URISyntaxException;
@@ -30,6 +32,8 @@
import java.util.Random;
import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
+import
org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -54,6 +58,7 @@
import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.http.HttpResponse;
+import org.apache.hadoop.fs.store.DataBlocks;
import static java.net.HttpURLConnection.HTTP_CONFLICT;
import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
@@ -593,4 +598,109 @@ public void testChecksumComputedWhenConfigTrue() throws
Exception {
.isEqualTo(1);
}
}
+
+ /**
+ * Tests the selection logic for the DFS-to-Blob fallback handler in
AbfsOutputStream.
+ * Verifies:
+ * For FNS, fallback succeeds regardless of ingress service type.
+ * For HNS with BLOB ingress, fallback fails with
InvalidConfigurationValueException.
+ * For HNS with DFS ingress, fallback succeeds.
+ */
+ @Test
+ public void testDFSToBlobFallbackHandlerSelection() throws Exception {
+ // Common mocks
+ DataBlocks.BlockFactory blockFactory =
Mockito.mock(DataBlocks.BlockFactory.class);
+ AzureBlockManager blockManager = Mockito.mock(AzureBlockManager.class);
+ AbfsClientHandler clientHandler = Mockito.mock(AbfsClientHandler.class);
+ AbfsClient client = Mockito.mock(AbfsClient.class);
+
+ Mockito.when(clientHandler.getClient(Mockito.any())).thenReturn(client);
+
+ Method createNewHandler =
+ AbfsOutputStream.class.getDeclaredMethod(
+ "createNewHandler",
+ AbfsServiceType.class,
+ DataBlocks.BlockFactory.class,
+ int.class,
+ boolean.class,
+ AzureBlockManager.class);
+ createNewHandler.setAccessible(true);
+
+ Field fallbackField =
+
AbfsOutputStream.class.getDeclaredField("isDFSToBlobFallbackEnabled");
+ fallbackField.setAccessible(true);
+
+ Field serviceTypeField =
+ AbfsOutputStream.class.getDeclaredField("serviceTypeAtInit");
+ serviceTypeField.setAccessible(true);
+
+ Field clientHandlerField =
+ AbfsOutputStream.class.getDeclaredField("clientHandler");
+ clientHandlerField.setAccessible(true);
+
+ // FNS case: fallback should succeed regardless of ingress service type
+ // Only setting isDFSToBlobFallbackEnabled config is enough
+ Mockito.when(client.getIsNamespaceEnabled()).thenReturn(false);
+
+ AbfsOutputStream fnsStream =
+ Mockito.mock(AbfsOutputStream.class, Mockito.CALLS_REAL_METHODS);
+
+ fallbackField.set(fnsStream, true);
+ clientHandlerField.set(fnsStream, clientHandler);
+
+ Object fnsHandler =
+ createNewHandler.invoke(
+ fnsStream,
+ AbfsServiceType.BLOB,
+ blockFactory,
+ 1024,
+ false,
+ blockManager);
+
+ Assertions.assertThat(fnsHandler)
+ .as("FNS: fallback should succeed regardless of ingress service
type")
+ .isInstanceOf(AzureDfsToBlobIngressFallbackHandler.class);
+
+ // HNS case: if ingress service type is BLOB, fallback should fail
+ Mockito.when(client.getIsNamespaceEnabled()).thenReturn(true);
+
+ AbfsOutputStream hnsBlobStream =
+ Mockito.mock(AbfsOutputStream.class, Mockito.CALLS_REAL_METHODS);
+
+ fallbackField.set(hnsBlobStream, true);
+ serviceTypeField.set(hnsBlobStream, AbfsServiceType.BLOB);
+ clientHandlerField.set(hnsBlobStream, clientHandler);
+
+ Assertions.assertThatThrownBy(() ->
+ createNewHandler.invoke(
+ hnsBlobStream,
+ AbfsServiceType.BLOB,
+ blockFactory,
+ 1024,
+ false,
+ blockManager))
+ .as("HNS with BLOB ingress should not allow fallback")
+ .hasCauseInstanceOf(InvalidConfigurationValueException.class);
+
+ // HNS case: if ingress service type is DFS, fallback should succeed
+ AbfsOutputStream hnsDfsStream =
+ Mockito.mock(AbfsOutputStream.class, Mockito.CALLS_REAL_METHODS);
+
+ fallbackField.set(hnsDfsStream, true);
+ serviceTypeField.set(hnsDfsStream, AbfsServiceType.DFS);
+ clientHandlerField.set(hnsDfsStream, clientHandler);
+
+ Object hnsHandler =
+ createNewHandler.invoke(
+ hnsDfsStream,
+ AbfsServiceType.DFS,
+ blockFactory,
+ 1024,
+ false,
+ blockManager);
+
+ Assertions.assertThat(hnsHandler)
+ .as("HNS with DFS ingress should allow fallback")
+ .isInstanceOf(AzureDfsToBlobIngressFallbackHandler.class);
+ }
}
diff --git
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java
index 997818b5f9f..c9f3b34f909 100644
---
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java
+++
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java
@@ -449,60 +449,6 @@ public void testExistingPathCorrectlyRejected() throws
Exception {
false)));
}
- /**
- * Test that rename recovery remains unsupported for
- * FNS configurations.
- */
- @Test
- public void testRenameRecoveryUnsupportedForFlatNamespace() throws Exception
{
- // In DFS endpoint, renamePath is O(1) API call and idempotency issue can
happen.
- // For blob endpoint, client orchestrates the rename operation.
- assumeDfsServiceType();
- assumeHnsDisabled();
- AzureBlobFileSystem fs = getFileSystem();
- AzureBlobFileSystemStore abfsStore = fs.getAbfsStore();
- TracingContext testTracingContext = getTestTracingContext(fs, false);
-
- AbfsClient mockClient = getMockAbfsClient();
-
- String base = "/" + getMethodName();
- String path1 = base + "/dummyFile1";
- String path2 = base + "/dummyFile2";
-
- touch(new Path(path1));
-
- setAbfsClient(abfsStore, mockClient);
-
- // checking correct count in AbfsCounters
- AbfsCounters counter = mockClient.getAbfsCounters();
- IOStatistics ioStats = counter.getIOStatistics();
-
- Long connMadeBeforeRename = lookupCounterStatistic(ioStats,
CONNECTIONS_MADE.getStatName());
- Long renamePathAttemptsBeforeRename = lookupCounterStatistic(ioStats,
RENAME_PATH_ATTEMPTS.getStatName());
-
- expectErrorCode(SOURCE_PATH_NOT_FOUND,
intercept(AbfsRestOperationException.class, () ->
- mockClient.renamePath(path1, path2, null,
- testTracingContext, null,
- false)));
-
- // validating stat counters after rename
-
- // only 2 calls should have happened in total for rename
- // 1 -> original rename rest call, 2 -> first retry,
- // no getPathStatus calls
- // last getPathStatus call should be skipped
- assertThatStatisticCounter(ioStats,
- CONNECTIONS_MADE.getStatName())
- .isEqualTo(2 + connMadeBeforeRename);
-
- // the RENAME_PATH_ATTEMPTS stat should be incremented by 1
- // retries happen internally within AbfsRestOperation execute()
- // the stat for RENAME_PATH_ATTEMPTS is updated only once before execute()
is called
- assertThatStatisticCounter(ioStats,
- RENAME_PATH_ATTEMPTS.getStatName())
- .isEqualTo(1 + renamePathAttemptsBeforeRename);
- }
-
/**
* Test the resilient commit code works through fault injection, including
* reporting recovery.
diff --git
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderValidator.java
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderValidator.java
index 4bdb3a68ae3..c0e0191014f 100644
---
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderValidator.java
+++
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderValidator.java
@@ -42,6 +42,7 @@ public class TracingHeaderValidator implements Listener {
private static final String GUID_PATTERN =
"^[0-9a-fA-F]{8}-([0-9a-fA-F]{4}-){3}[0-9a-fA-F]{12}$";
private String ingressHandler = null;
+ private boolean fnsEndpointConverted = false;
private String position = String.valueOf(0);
private ReadType readType = ReadType.UNKNOWN_READ;
@@ -61,6 +62,7 @@ public TracingHeaderValidator getClone() {
retryNum, streamID);
tracingHeaderValidator.primaryRequestId = primaryRequestId;
tracingHeaderValidator.ingressHandler = ingressHandler;
+ tracingHeaderValidator.fnsEndpointConverted = fnsEndpointConverted;
tracingHeaderValidator.position = position;
tracingHeaderValidator.readType = readType;
tracingHeaderValidator.operatedBlobCount = operatedBlobCount;
@@ -196,6 +198,11 @@ public void updateIngressHandler(String ingressHandler) {
this.ingressHandler = ingressHandler;
}
+ @Override
+ public void updateFNSEndpointConverted() {
+ this.fnsEndpointConverted = true;
+ }
+
@Override
public void updatePosition(String position) {
this.position = position;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]