rakeshadr commented on code in PR #6879: URL: https://github.com/apache/hadoop/pull/6879#discussion_r1692432341
########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java: ########## @@ -434,10 +445,70 @@ public AbfsConfiguration(final Configuration rawConfig, String accountName) } } + public AbfsConfiguration(final Configuration rawConfig, String accountName) + throws IllegalAccessException, IOException { + this(rawConfig, accountName, AbfsServiceType.DFS); + } + public Trilean getIsNamespaceEnabledAccount() { return Trilean.getTrilean(isNamespaceEnabledAccount); } + /** + * Returns the service type to be used based on the filesystem configuration. + * Precedence is given to service type configured for FNS Accounts using + * "fs.azure.fns.account.service.type". If not configured, then the service + * type identified from url used to initialize filesystem will be used. + * @return the service type. + */ + public AbfsServiceType getFsConfiguredServiceType() { + return getEnum(FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE, fsConfiguredServiceType); + } + + /** + * Returns the service type configured for FNS Accounts to override the + * service type identified by URL used to initialize the filesystem. + * @return the service type. + */ + public AbfsServiceType getConfiguredServiceTypeForFNSAccounts() { + return getEnum(FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE, null); + } + + /** + * Returns the service type to be used for Ingress Operations irrespective of account type. + * Default value is the same as the service type configured for the file system. + * @return the service type. + */ + public AbfsServiceType getIngressServiceType() { + return getEnum(FS_AZURE_INGRESS_SERVICE_TYPE, getFsConfiguredServiceType()); + } + + public boolean isDfsToBlobFallbackEnabled() { + return isDfsToBlobFallbackEnabled; + } + + /** + * Checks if the service type configured is valid for account type used. + * HNS Enabled accounts cannot have service type as BLOB. + * @param isHNSEnabled Flag to indicate if HNS is enabled for the account. + * @throws InvalidConfigurationValueException if the service type is invalid. + */ + public void validateConfiguredServiceType(boolean isHNSEnabled) + throws InvalidConfigurationValueException { + // Todo: [FnsOverBlob] - Remove this check, Failing FS Init with Blob Endpoint Until FNS over Blob is ready. + if (getFsConfiguredServiceType() == AbfsServiceType.BLOB) { + throw new InvalidConfigurationValueException(FS_DEFAULT_NAME_KEY, + "Blob Endpoint Support not yet available"); + } + if (isHNSEnabled && getConfiguredServiceTypeForFNSAccounts() == AbfsServiceType.BLOB) { + throw new InvalidConfigurationValueException( + FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE, "Cannot be BLOB for HNS Account"); + } else if (isHNSEnabled && fsConfiguredServiceType == AbfsServiceType.BLOB) { + throw new InvalidConfigurationValueException(FS_DEFAULT_NAME_KEY, Review Comment: Please add test case for this condition ########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java: ########## @@ -434,10 +445,70 @@ public AbfsConfiguration(final Configuration rawConfig, String accountName) } } + public AbfsConfiguration(final Configuration rawConfig, String accountName) + throws IllegalAccessException, IOException { + this(rawConfig, accountName, AbfsServiceType.DFS); + } + public Trilean getIsNamespaceEnabledAccount() { return Trilean.getTrilean(isNamespaceEnabledAccount); } + /** + * Returns the service type to be used based on the filesystem configuration. 
+ * Precedence is given to service type configured for FNS Accounts using + * "fs.azure.fns.account.service.type". If not configured, then the service + * type identified from url used to initialize filesystem will be used. + * @return the service type. + */ + public AbfsServiceType getFsConfiguredServiceType() { + return getEnum(FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE, fsConfiguredServiceType); + } + + /** + * Returns the service type configured for FNS Accounts to override the + * service type identified by URL used to initialize the filesystem. + * @return the service type. + */ + public AbfsServiceType getConfiguredServiceTypeForFNSAccounts() { + return getEnum(FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE, null); + } + + /** + * Returns the service type to be used for Ingress Operations irrespective of account type. + * Default value is the same as the service type configured for the file system. + * @return the service type. + */ + public AbfsServiceType getIngressServiceType() { + return getEnum(FS_AZURE_INGRESS_SERVICE_TYPE, getFsConfiguredServiceType()); + } + + public boolean isDfsToBlobFallbackEnabled() { Review Comment: Please add javadoc. ########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java: ########## @@ -1059,203 +666,44 @@ public boolean appendSuccessCheckOp(AbfsRestOperation op, final String path, return false; } - public AbfsRestOperation flush(final String path, final long position, + public abstract AbfsRestOperation flush(String path, long position, boolean retainUncommittedData, boolean isClose, - final String cachedSasToken, final String leaseId, + String cachedSasToken, String leaseId, ContextEncryptionAdapter contextEncryptionAdapter, TracingContext tracingContext) - throws AzureBlobFileSystemException { - final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); - addEncryptionKeyRequestHeaders(path, requestHeaders, false, - contextEncryptionAdapter, tracingContext); - // JDK7 does not support PATCH, so to workaround the issue we will use - // PUT and specify the real method in the X-Http-Method-Override header.
- requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, - HTTP_METHOD_PATCH)); - if (leaseId != null) { - requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId)); - } - - final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); - abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, FLUSH_ACTION); - abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, Long.toString(position)); - abfsUriQueryBuilder.addQuery(QUERY_PARAM_RETAIN_UNCOMMITTED_DATA, String.valueOf(retainUncommittedData)); - abfsUriQueryBuilder.addQuery(QUERY_PARAM_CLOSE, String.valueOf(isClose)); - - // AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance - String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.WRITE_OPERATION, - abfsUriQueryBuilder, cachedSasToken); - - final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); - final AbfsRestOperation op = getAbfsRestOperation( - AbfsRestOperationType.Flush, - HTTP_METHOD_PUT, - url, - requestHeaders, sasTokenForReuse); - op.execute(tracingContext); - return op; - } - - public AbfsRestOperation setPathProperties(final String path, final String properties, - final TracingContext tracingContext, final ContextEncryptionAdapter contextEncryptionAdapter) - throws AzureBlobFileSystemException { - final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); - addEncryptionKeyRequestHeaders(path, requestHeaders, false, - contextEncryptionAdapter, tracingContext); - // JDK7 does not support PATCH, so to workaround the issue we will use - // PUT and specify the real method in the X-Http-Method-Override header. - requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, - HTTP_METHOD_PATCH)); - - requestHeaders.add(new AbfsHttpHeader(X_MS_PROPERTIES, properties)); + throws AzureBlobFileSystemException; - final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); - abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, SET_PROPERTIES_ACTION); - appendSASTokenToQuery(path, SASTokenProvider.SET_PROPERTIES_OPERATION, abfsUriQueryBuilder); - - final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); - final AbfsRestOperation op = getAbfsRestOperation( - AbfsRestOperationType.SetPathProperties, - HTTP_METHOD_PUT, - url, - requestHeaders); - op.execute(tracingContext); - return op; - } - - public AbfsRestOperation getPathStatus(final String path, - final boolean includeProperties, final TracingContext tracingContext, - final ContextEncryptionAdapter contextEncryptionAdapter) - throws AzureBlobFileSystemException { - final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); - final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); - String operation = SASTokenProvider.GET_PROPERTIES_OPERATION; - if (!includeProperties) { - // The default action (operation) is implicitly to get properties and this action requires read permission - // because it reads user defined properties. If the action is getStatus or getAclStatus, then - // only traversal (execute) permission is required. 
- abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.GET_STATUS); - operation = SASTokenProvider.GET_STATUS_OPERATION; - } else { - addEncryptionKeyRequestHeaders(path, requestHeaders, false, - contextEncryptionAdapter, - tracingContext); - } - abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_UPN, String.valueOf(abfsConfiguration.isUpnUsed())); - appendSASTokenToQuery(path, operation, abfsUriQueryBuilder); - - final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); - final AbfsRestOperation op = getAbfsRestOperation( - AbfsRestOperationType.GetPathStatus, - HTTP_METHOD_HEAD, - url, - requestHeaders); - op.execute(tracingContext); - return op; - } - - public AbfsRestOperation read(final String path, - final long position, - final byte[] buffer, - final int bufferOffset, - final int bufferLength, - final String eTag, + public abstract AbfsRestOperation flush(byte[] buffer, + String path, + boolean isClose, + String cachedSasToken, + String leaseId, + String eTag, + TracingContext tracingContext) throws AzureBlobFileSystemException; + + public abstract AbfsRestOperation setPathProperties(String path, Hashtable<String, String> properties, + TracingContext tracingContext, ContextEncryptionAdapter contextEncryptionAdapter) + throws AzureBlobFileSystemException; + + public abstract AbfsRestOperation getPathStatus(String path, Review Comment: Please add javadoc for all the abstract methods. It would help give a better idea of their functionality. ########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/UriUtils.java: ########## @@ -169,6 +173,24 @@ public static String getMaskedUrl(URL url) { return url.toString().replace(queryString, maskedQueryString); } + public static URL changeUrlFromBlobToDfs(URL url) throws InvalidUriException { Review Comment: Please add javadoc here as well. It's a public function. ########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java: ########## @@ -1810,18 +1804,28 @@ private void initializeClient(URI uri, String fileSystemName, LOG.trace("Initializing AbfsClient for {}", baseUrl); if (tokenProvider != null) { - this.client = new AbfsClient(baseUrl, creds, abfsConfiguration, + this.clientHandler = new AbfsClientHandler(baseUrl, creds, abfsConfiguration, tokenProvider, encryptionContextProvider, populateAbfsClientContext()); } else { - this.client = new AbfsClient(baseUrl, creds, abfsConfiguration, + this.clientHandler = new AbfsClientHandler(baseUrl, creds, abfsConfiguration, sasTokenProvider, encryptionContextProvider, populateAbfsClientContext()); } + this.client = getClientHandler().getClient(); LOG.trace("AbfsClient init complete"); } + private AbfsServiceType identifyAbfsServiceTypeFromUrl() { Review Comment: Please rename the method #identifyAbfsServiceTypeFromUrl to #getAbfsServiceTypeFromUrl. ########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java: ########## @@ -213,6 +217,23 @@ public void initialize(URI uri, Configuration configuration) TracingContext tracingContext = new TracingContext(clientCorrelationId, fileSystemId, FSOperationType.CREATE_FILESYSTEM, tracingHeaderFormat, listener); + + /* + * Validate the service type configured in the URI is valid for account type used. + * HNS Account Cannot have Blob Endpoint URI.
+ */ + try { + abfsConfiguration.validateConfiguredServiceType( + tryGetIsNamespaceEnabled(new TracingContext(tracingContext))); + } catch (InvalidConfigurationValueException ex) { + LOG.debug("File system configured with Invalid Service Type", ex); + throw ex; + } catch (AzureBlobFileSystemException ex) { + LOG.debug("Enable to determine account type for service type validation", ex); Review Comment: Please add test case for this case as well. Thanks! ########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java: ########## @@ -434,10 +445,70 @@ public AbfsConfiguration(final Configuration rawConfig, String accountName) } } + public AbfsConfiguration(final Configuration rawConfig, String accountName) + throws IllegalAccessException, IOException { + this(rawConfig, accountName, AbfsServiceType.DFS); + } + public Trilean getIsNamespaceEnabledAccount() { return Trilean.getTrilean(isNamespaceEnabledAccount); } + /** + * Returns the service type to be used based on the filesystem configuration. + * Precedence is given to service type configured for FNS Accounts using + * "fs.azure.fns.account.service.type". If not configured, then the service + * type identified from url used to initialize filesystem will be used. + * @return the service type. + */ + public AbfsServiceType getFsConfiguredServiceType() { + return getEnum(FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE, fsConfiguredServiceType); + } + + /** + * Returns the service type configured for FNS Accounts to override the + * service type identified by URL used to initialize the filesystem. + * @return the service type. + */ + public AbfsServiceType getConfiguredServiceTypeForFNSAccounts() { + return getEnum(FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE, null); + } + + /** + * Returns the service type to be used for Ingress Operations irrespective of account type. + * Default value is the same as the service type configured for the file system. + * @return the service type. + */ + public AbfsServiceType getIngressServiceType() { + return getEnum(FS_AZURE_INGRESS_SERVICE_TYPE, getFsConfiguredServiceType()); + } + + public boolean isDfsToBlobFallbackEnabled() { + return isDfsToBlobFallbackEnabled; + } + + /** + * Checks if the service type configured is valid for account type used. + * HNS Enabled accounts cannot have service type as BLOB. + * @param isHNSEnabled Flag to indicate if HNS is enabled for the account. + * @throws InvalidConfigurationValueException if the service type is invalid. + */ + public void validateConfiguredServiceType(boolean isHNSEnabled) + throws InvalidConfigurationValueException { + // Todo: [FnsOverBlob] - Remove this check, Failing FS Init with Blob Endpoint Until FNS over Blob is ready. 
+ if (getFsConfiguredServiceType() == AbfsServiceType.BLOB) { + throw new InvalidConfigurationValueException(FS_DEFAULT_NAME_KEY, + "Blob Endpoint Support not yet available"); + } + if (isHNSEnabled && getConfiguredServiceTypeForFNSAccounts() == AbfsServiceType.BLOB) { + throw new InvalidConfigurationValueException( Review Comment: Please add test case for this condition ########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java: ########## @@ -1400,6 +1418,29 @@ private FileStatus tryGetFileStatus(final Path f, TracingContext tracingContext) } } + private boolean tryGetIsNamespaceEnabled(TracingContext tracingContext) + throws AzureBlobFileSystemException{ + try { + return getIsNamespaceEnabled(tracingContext); + } catch (AbfsRestOperationException ex) { + /* + * Exception will be thrown for any non 400 error code. + * If status code is in 4xx range, it means it's an HNS account. + * If status code is in 5xx range, it means nothing can be inferred. + * In case of network errors status code will be -1. + */ + int statusCode = ex.getStatusCode(); + if (statusCode > HTTP_BAD_REQUEST && statusCode < HTTP_INTERNAL_ERROR) { + LOG.debug("getNamespace failed with non 400 user error", ex); + statIncrement(ERROR_IGNORED); + return true; + } + throw ex; + } catch (AzureBlobFileSystemException ex) { Review Comment: As 'AzureBlobFileSystemException' is already declared in the throws clause, this catch block is not required, right? ########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java: ########## @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.io.IOException; +import java.net.URL; + +import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; +import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType; +import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider; +import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider; +import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider; + +import static org.apache.hadoop.fs.azurebfs.utils.UriUtils.changeUrlFromBlobToDfs; +import static org.apache.hadoop.fs.azurebfs.utils.UriUtils.changeUrlFromDfsToBlob; + +/** + * AbfsClientHandler is a class that provides a way to get the AbfsClient + * based on the service type.
+ */ + public class AbfsClientHandler { + + private AbfsServiceType defaultServiceType; + private AbfsServiceType ingressServiceType; + private final AbfsDfsClient dfsAbfsClient; + + public AbfsClientHandler(final URL baseUrl, + final SharedKeyCredentials sharedKeyCredentials, + final AbfsConfiguration abfsConfiguration, + final AccessTokenProvider tokenProvider, + final EncryptionContextProvider encryptionContextProvider, + final AbfsClientContext abfsClientContext) throws IOException { + this.dfsAbfsClient = createDfsClient(baseUrl, sharedKeyCredentials, + abfsConfiguration, tokenProvider, null, encryptionContextProvider, + abfsClientContext); + initServiceType(abfsConfiguration); + } + + public AbfsClientHandler(final URL baseUrl, + final SharedKeyCredentials sharedKeyCredentials, + final AbfsConfiguration abfsConfiguration, + final SASTokenProvider sasTokenProvider, + final EncryptionContextProvider encryptionContextProvider, + final AbfsClientContext abfsClientContext) throws IOException { + this.dfsAbfsClient = createDfsClient(baseUrl, sharedKeyCredentials, + abfsConfiguration, null, sasTokenProvider, encryptionContextProvider, + abfsClientContext); + initServiceType(abfsConfiguration); + } + + private void initServiceType(final AbfsConfiguration abfsConfiguration) { + this.defaultServiceType = abfsConfiguration.getFsConfiguredServiceType(); + this.ingressServiceType = abfsConfiguration.getIngressServiceType(); + } + + public AbfsClient getClient() { + return getClient(defaultServiceType); + } + + public AbfsClient getIngressClient() { Review Comment: Please remove the getIngressClient() function, as it's not used anywhere. You can add it later in a follow-up task, if required. ########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java: ########## @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.io.IOException; +import java.net.URL; + +import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; +import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType; +import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider; +import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider; +import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider; +import static org.apache.hadoop.fs.azurebfs.utils.UriUtils.changeUrlFromBlobToDfs; +import static org.apache.hadoop.fs.azurebfs.utils.UriUtils.changeUrlFromDfsToBlob; + +/** + * AbfsClientHandler is a class that provides a way to get the AbfsClient + * based on the service type.
+ */ + public class AbfsClientHandler { + + private AbfsServiceType defaultServiceType; + private AbfsServiceType ingressServiceType; + private final AbfsDfsClient dfsAbfsClient; + + public AbfsClientHandler(final URL baseUrl, + final SharedKeyCredentials sharedKeyCredentials, + final AbfsConfiguration abfsConfiguration, + final AccessTokenProvider tokenProvider, + final EncryptionContextProvider encryptionContextProvider, + final AbfsClientContext abfsClientContext) throws IOException { + this.dfsAbfsClient = createDfsClient(baseUrl, sharedKeyCredentials, + abfsConfiguration, tokenProvider, null, encryptionContextProvider, + abfsClientContext); + initServiceType(abfsConfiguration); + } + + public AbfsClientHandler(final URL baseUrl, + final SharedKeyCredentials sharedKeyCredentials, + final AbfsConfiguration abfsConfiguration, + final SASTokenProvider sasTokenProvider, + final EncryptionContextProvider encryptionContextProvider, + final AbfsClientContext abfsClientContext) throws IOException { + this.dfsAbfsClient = createDfsClient(baseUrl, sharedKeyCredentials, Review Comment: Can we move the initServiceType(abfsConfiguration) call inside #createDfsClient()? ########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java: ########## @@ -0,0 +1,1308 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.io.Closeable; +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.URL; +import java.nio.CharBuffer; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; +import java.nio.charset.CharsetEncoder; +import java.util.Hashtable; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; +import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore; +import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; +import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ApiVersion; +import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; +import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsInvalidChecksumException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException; +import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters; +import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider; +import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider; +import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider; +import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter; +import org.apache.hadoop.fs.azurebfs.utils.Base64; +import org.apache.hadoop.fs.azurebfs.utils.TracingContext; + +import static org.apache.commons.lang3.StringUtils.isEmpty; +import static org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.extractEtagHeader; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ACQUIRE_LEASE_ACTION; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_ACTION; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_BLOB_TYPE; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPLICATION_JSON; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPLICATION_OCTET_STREAM; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.BREAK_LEASE_ACTION; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHECK_ACCESS; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COMMA; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DEFAULT_LEASE_BREAK_PERIOD; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DIRECTORY; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FILE; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FILESYSTEM; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FLUSH_ACTION; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FORWARD_SLASH; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.GET_ACCESS_CONTROL; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.GET_STATUS; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_DELETE; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_GET; +import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_HEAD; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_POST; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.RELEASE_LEASE_ACTION; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.RENEW_LEASE_ACTION; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SET_ACCESS_CONTROL; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SET_PROPERTIES_ACTION; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.STAR; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TRUE; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XMS_PROPERTIES_ENCODING_ASCII; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.ACCEPT; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.IF_MATCH; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.IF_NONE_MATCH; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.RANGE; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.USER_AGENT; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_HTTP_METHOD_OVERRIDE; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_EXISTING_RESOURCE_TYPE; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_LEASE_ACTION; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_LEASE_BREAK_PERIOD; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_LEASE_DURATION; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_LEASE_ID; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_PROPERTIES; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_PROPOSED_LEASE_ID; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_RANGE_GET_CONTENT_MD5; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_RENAME_SOURCE; +import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_FS_ACTION; +import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_ACTION; +import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_BLOBTYPE; +import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_CLOSE; +import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_CONTINUATION; +import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_DIRECTORY; +import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_FLUSH; +import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_MAXRESULTS; +import static 
org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_PAGINATED; +import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_POSITION; +import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_RECURSIVE; +import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_RESOURCE; +import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_RETAIN_UNCOMMITTED_DATA; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND; + +/** + * AbfsClient interacting with the DFS Endpoint. + */ +public class AbfsDfsClient extends AbfsClient implements Closeable { + + public AbfsDfsClient(final URL baseUrl, + final SharedKeyCredentials sharedKeyCredentials, + final AbfsConfiguration abfsConfiguration, + final AccessTokenProvider tokenProvider, + final EncryptionContextProvider encryptionContextProvider, + final AbfsClientContext abfsClientContext) throws IOException { + super(baseUrl, sharedKeyCredentials, abfsConfiguration, tokenProvider, + encryptionContextProvider, abfsClientContext); + } + + public AbfsDfsClient(final URL baseUrl, + final SharedKeyCredentials sharedKeyCredentials, + final AbfsConfiguration abfsConfiguration, + final SASTokenProvider sasTokenProvider, + final EncryptionContextProvider encryptionContextProvider, + final AbfsClientContext abfsClientContext) throws IOException { + super(baseUrl, sharedKeyCredentials, abfsConfiguration, sasTokenProvider, + encryptionContextProvider, abfsClientContext); + } + + @Override + public void close() throws IOException { + super.close(); + } + + /** + * Create request headers for Rest Operation using the default API version. + * @return default request headers. + */ + @Override + public List<AbfsHttpHeader> createDefaultHeaders() { + return this.createDefaultHeaders(getxMsVersion()); + } + + /** + * Create request headers for Rest Operation using the specified API version. + * DFS Endpoint API responses are in JSON/Stream format. + * @param xMsVersion API version to be used. + * @return default request headers. + */ + @Override + public List<AbfsHttpHeader> createDefaultHeaders(ApiVersion xMsVersion) { + List<AbfsHttpHeader> requestHeaders = super.createCommonHeaders(xMsVersion); + requestHeaders.add(new AbfsHttpHeader(ACCEPT, APPLICATION_JSON + + COMMA + SINGLE_WHITE_SPACE + APPLICATION_OCTET_STREAM)); + return requestHeaders; + } + + /** + * Get Rest Operation for API + * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/filesystem/create"> + * Filesystem - Create</a>. + * @param tracingContext for tracing the server calls. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. 
+ */ + @Override + public AbfsRestOperation createFilesystem(TracingContext tracingContext) + throws AzureBlobFileSystemException { + final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = new AbfsUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM); + + final URL url = createRequestUrl(abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = getAbfsRestOperation( + AbfsRestOperationType.CreateFileSystem, + HTTP_METHOD_PUT, url, requestHeaders); + op.execute(tracingContext); + return op; + } + + /** + * Get Rest Operation for API + * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/filesystem/set-properties"> + * Filesystem - Set Properties</a>. + * @param properties comma separated list of metadata key-value pairs. + * @param tracingContext for tracing the server calls. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. + */ + @Override + public AbfsRestOperation setFilesystemProperties(final Hashtable<String, String> properties, + TracingContext tracingContext) throws AzureBlobFileSystemException { + final String commaSeparatedProperties; + try { + commaSeparatedProperties = convertXmsPropertiesToCommaSeparatedString(properties); + } catch (CharacterCodingException ex) { + throw new InvalidAbfsRestOperationException(ex); + } + + final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); + // JDK7 does not support PATCH, so to work around the issue we will use + // PUT and specify the real method in the X-Http-Method-Override header. + requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, + HTTP_METHOD_PATCH)); + requestHeaders.add(new AbfsHttpHeader(X_MS_PROPERTIES, commaSeparatedProperties)); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM); + + final URL url = createRequestUrl(abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = getAbfsRestOperation( + AbfsRestOperationType.SetFileSystemProperties, + HTTP_METHOD_PUT, url, requestHeaders); + op.execute(tracingContext); + return op; + } + + /** + * Get Rest Operation for API + * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/filesystem/get-properties"> + * Filesystem - Get Properties</a>. + * @param tracingContext for tracing the server calls. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. + * */ + @Override + public AbfsRestOperation getFilesystemProperties(TracingContext tracingContext) + throws AzureBlobFileSystemException { + final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM); + + final URL url = createRequestUrl(abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = getAbfsRestOperation( + AbfsRestOperationType.GetFileSystemProperties, + HTTP_METHOD_HEAD, url, requestHeaders); + op.execute(tracingContext); + return op; + } + + /** + * Get Rest Operation for API + * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/filesystem/delete"> + * Filesystem - Delete</a>. + * @param tracingContext for tracing the server calls. 
+ * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. + */ + @Override + public AbfsRestOperation deleteFilesystem(TracingContext tracingContext) + throws AzureBlobFileSystemException { + final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM); + + final URL url = createRequestUrl(abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = getAbfsRestOperation( + AbfsRestOperationType.DeleteFileSystem, + HTTP_METHOD_DELETE, url, requestHeaders); + op.execute(tracingContext); + return op; + } + + /** + * Get Rest Operation for API + * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/list"> + * Filesystem - List</a>. + * List paths and their properties in the current filesystem. + * @param relativePath to return only blobs within this directory. + * @param recursive to return all blobs in the path, including those in subdirectories. + * @param listMaxResults maximum number of blobs to return. + * @param continuation marker to specify the continuation token. + * @param tracingContext for tracing the server calls. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation or response parsing fails. + */ + @Override + public AbfsRestOperation listPath(final String relativePath, + final boolean recursive, + final int listMaxResults, + final String continuation, + TracingContext tracingContext) throws IOException { + final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_DIRECTORY, + getDirectoryQueryParameter(relativePath)); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_RECURSIVE, String.valueOf(recursive)); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_MAXRESULTS, + String.valueOf(listMaxResults)); + abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_UPN, + String.valueOf(getAbfsConfiguration().isUpnUsed())); + appendSASTokenToQuery(relativePath, SASTokenProvider.LIST_OPERATION, + abfsUriQueryBuilder); + + final URL url = createRequestUrl(abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = getAbfsRestOperation( + AbfsRestOperationType.ListPaths, + HTTP_METHOD_GET, url, requestHeaders); + op.execute(tracingContext); + return op; + } + + /** + * Get Rest Operation for API + * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/create"> + * Path - Create</a>. + * Create a path (file or directory) in the current filesystem. + * @param path to be created inside the filesystem. + * @param isFile to specify if the created path is file or directory. + * @param overwrite to specify if the path should be overwritten if it already exists. + * @param permissions to specify the permissions of the path. + * @param isAppendBlob to specify if the path to be created is an append blob. + * @param eTag to specify conditional headers. + * @param contextEncryptionAdapter to provide encryption context. + * @param tracingContext for tracing the server calls. + * @return executed rest operation containing response from server. 
+ * @throws AzureBlobFileSystemException if rest operation fails. + */ + @Override + public AbfsRestOperation createPath(final String path, + final boolean isFile, + final boolean overwrite, + final AzureBlobFileSystemStore.Permissions permissions, + final boolean isAppendBlob, + final String eTag, + final ContextEncryptionAdapter contextEncryptionAdapter, + final TracingContext tracingContext) throws AzureBlobFileSystemException { + final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); + if (isFile) { + addEncryptionKeyRequestHeaders(path, requestHeaders, true, + contextEncryptionAdapter, tracingContext); + } + if (!overwrite) { + requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, STAR)); + } + + if (permissions.hasPermission()) { + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_PERMISSIONS, + permissions.getPermission())); + } + + if (permissions.hasUmask()) { + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_UMASK, + permissions.getUmask())); + } + + if (eTag != null && !eTag.isEmpty()) { + requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag)); + } + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, isFile ? FILE : DIRECTORY); + if (isAppendBlob) { + abfsUriQueryBuilder.addQuery(QUERY_PARAM_BLOBTYPE, APPEND_BLOB_TYPE); + } + + String operation = isFile + ? SASTokenProvider.CREATE_FILE_OPERATION + : SASTokenProvider.CREATE_DIRECTORY_OPERATION; + appendSASTokenToQuery(path, operation, abfsUriQueryBuilder); + + final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = getAbfsRestOperation( + AbfsRestOperationType.CreatePath, + HTTP_METHOD_PUT, url, requestHeaders); + try { + op.execute(tracingContext); + } catch (AzureBlobFileSystemException ex) { + // If we have no HTTP response, throw the original exception. + if (!op.hasResult()) { + throw ex; + } + if (!isFile && op.getResult().getStatusCode() == HttpURLConnection.HTTP_CONFLICT) { + String existingResource = + op.getResult().getResponseHeader(X_MS_EXISTING_RESOURCE_TYPE); + if (existingResource != null && existingResource.equals(DIRECTORY)) { + return op; //don't throw ex on mkdirs for existing directory + } + } + throw ex; + } + return op; + } + + /** + * Get Rest Operation for API + * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/lease"> + * Path - Lease</a>. + * Acquire lease on specified path. + * @param path on which lease has to be acquired. + * @param duration for which lease has to be acquired. + * @param tracingContext for tracing the server calls. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. 
+ */ + @Override + public AbfsRestOperation acquireLease(final String path, final int duration, + TracingContext tracingContext) throws AzureBlobFileSystemException { + final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); + requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, ACQUIRE_LEASE_ACTION)); + requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_DURATION, Integer.toString(duration))); + requestHeaders.add(new AbfsHttpHeader(X_MS_PROPOSED_LEASE_ID, + UUID.randomUUID().toString())); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + + final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = getAbfsRestOperation( + AbfsRestOperationType.LeasePath, + HTTP_METHOD_POST, url, requestHeaders); + op.execute(tracingContext); + return op; + } + + /** + * Get Rest Operation for API + * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/lease"> + * Path - Lease</a>. + * Renew lease on specified path. + * @param path on which lease has to be renewed. + * @param leaseId of the lease to be renewed. + * @param tracingContext for tracing the server calls. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. + */ + @Override + public AbfsRestOperation renewLease(final String path, final String leaseId, + TracingContext tracingContext) throws AzureBlobFileSystemException { + final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); + requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, RENEW_LEASE_ACTION)); + requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId)); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + + final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = getAbfsRestOperation( + AbfsRestOperationType.LeasePath, + HTTP_METHOD_POST, url, requestHeaders); + op.execute(tracingContext); + return op; + } + + /** + * Get Rest Operation for API + * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/lease"> + * Path - Lease</a>. + * Release lease on specified path. + * @param path on which lease has to be released. + * @param leaseId of the lease to be released. + * @param tracingContext for tracing the server calls. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. + */ + @Override + public AbfsRestOperation releaseLease(final String path, final String leaseId, + TracingContext tracingContext) throws AzureBlobFileSystemException { + final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); + requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, RELEASE_LEASE_ACTION)); + requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId)); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + + final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = getAbfsRestOperation( + AbfsRestOperationType.LeasePath, + HTTP_METHOD_POST, url, requestHeaders); + op.execute(tracingContext); + return op; + } + + /** + * Get Rest Operation for API + * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/lease"> + * Path - Lease</a>. + * Break lease on specified path. + * @param path on which lease has to be broke. 
+ * @param tracingContext for tracing the server calls. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. + */ + @Override + public AbfsRestOperation breakLease(final String path, + TracingContext tracingContext) throws AzureBlobFileSystemException { + final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); + requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, BREAK_LEASE_ACTION)); + requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_BREAK_PERIOD, + DEFAULT_LEASE_BREAK_PERIOD)); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + + final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = getAbfsRestOperation( + AbfsRestOperationType.LeasePath, + HTTP_METHOD_POST, url, requestHeaders); + op.execute(tracingContext); + return op; + } + + /** + * Get Rest Operation for API + * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/create"> + * Path - Create</a>. + * @param source path to source file + * @param destination destination of rename. + * @param continuation continuation. + * @param tracingContext for tracing the server calls. + * @param sourceEtag etag of source file. may be null or empty + * @param isMetadataIncompleteState was there a rename failure due to incomplete metadata state? + * @param isNamespaceEnabled whether namespace enabled account or not + * @return executed rest operation containing response from server. + * @throws IOException if rest operation fails. + */ + @Override + public AbfsClientRenameResult renamePath( + final String source, + final String destination, + final String continuation, + final TracingContext tracingContext, + String sourceEtag, + boolean isMetadataIncompleteState, + boolean isNamespaceEnabled) throws IOException { + final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); + + final boolean hasEtag = !isEmpty(sourceEtag); + + boolean shouldAttemptRecovery = isRenameResilience() && isNamespaceEnabled; + if (!hasEtag && shouldAttemptRecovery) { + // in case eTag is already not supplied to the API + // and rename resilience is expected and it is an HNS enabled account + // fetch the source etag to be used later in recovery + try { + final AbfsRestOperation srcStatusOp = getPathStatus(source, + false, tracingContext, null); + if (srcStatusOp.hasResult()) { + final AbfsHttpOperation result = srcStatusOp.getResult(); + sourceEtag = extractEtagHeader(result); + // and update the directory status. 
+ boolean isDir = checkIsDir(result); + shouldAttemptRecovery = !isDir; + LOG.debug( + "Retrieved etag of source for rename recovery: {}; isDir={}", + sourceEtag, isDir); + } + } catch (AbfsRestOperationException e) { + throw new AbfsRestOperationException(e.getStatusCode(), + SOURCE_PATH_NOT_FOUND.getErrorCode(), + e.getMessage(), e); + } + + } + + String encodedRenameSource = urlEncode( + FORWARD_SLASH + this.getFileSystem() + source); + if (getAuthType() == AuthType.SAS) { + final AbfsUriQueryBuilder srcQueryBuilder = new AbfsUriQueryBuilder(); + appendSASTokenToQuery(source, SASTokenProvider.RENAME_SOURCE_OPERATION, + srcQueryBuilder); + encodedRenameSource += srcQueryBuilder.toString(); + } + + LOG.trace("Rename source queryparam added {}", encodedRenameSource); + requestHeaders.add(new AbfsHttpHeader(X_MS_RENAME_SOURCE, encodedRenameSource)); + requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, STAR)); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation); + appendSASTokenToQuery(destination, + SASTokenProvider.RENAME_DESTINATION_OPERATION, abfsUriQueryBuilder); + + final URL url = createRequestUrl(destination, + abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = createRenameRestOperation(url, requestHeaders); + try { + incrementAbfsRenamePath(); + op.execute(tracingContext); + // AbfsClientResult contains the AbfsOperation, If recovery happened or + // not, and the incompleteMetaDataState is true or false. + // If we successfully rename a path and isMetadataIncompleteState was + // true, then rename was recovered, else it didn't, this is why + // isMetadataIncompleteState is used for renameRecovery(as the 2nd param). + return new AbfsClientRenameResult(op, isMetadataIncompleteState, + isMetadataIncompleteState); + } catch (AzureBlobFileSystemException e) { + // If we have no HTTP response, throw the original exception. + if (!op.hasResult()) { + throw e; + } + + // ref: HADOOP-18242. Rename failure occurring due to a rare case of + // tracking metadata being in incomplete state. + if (op.getResult().getStorageErrorCode() + .equals(RENAME_DESTINATION_PARENT_PATH_NOT_FOUND.getErrorCode()) + && !isMetadataIncompleteState) { + //Logging + ABFS_METADATA_INCOMPLETE_RENAME_FAILURE + .info( + "Rename Failure attempting to resolve tracking metadata state and retrying."); + // rename recovery should be attempted in this case also + shouldAttemptRecovery = true; + isMetadataIncompleteState = true; + String sourceEtagAfterFailure = sourceEtag; + if (isEmpty(sourceEtagAfterFailure)) { + // Doing a HEAD call resolves the incomplete metadata state and + // then we can retry the rename operation. + AbfsRestOperation sourceStatusOp = getPathStatus(source, false, + tracingContext, null); + isMetadataIncompleteState = true; + // Extract the sourceEtag, using the status Op, and set it + // for future rename recovery. + AbfsHttpOperation sourceStatusResult = sourceStatusOp.getResult(); + sourceEtagAfterFailure = extractEtagHeader(sourceStatusResult); + } + renamePath(source, destination, continuation, tracingContext, + sourceEtagAfterFailure, isMetadataIncompleteState, + isNamespaceEnabled); + } + // if we get out of the condition without a successful rename, then + // it isn't metadata incomplete state issue. 
+ isMetadataIncompleteState = false; + + // setting default rename recovery success to false + boolean etagCheckSucceeded = false; + if (shouldAttemptRecovery) { + etagCheckSucceeded = renameIdempotencyCheckOp( + source, + sourceEtag, op, destination, tracingContext); + } + if (!etagCheckSucceeded) { + // idempotency did not return different result + // throw back the exception + throw e; + } + return new AbfsClientRenameResult(op, true, isMetadataIncompleteState); + } + } + + /** + * Get Rest Operation for API + * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/update"> + * Path - Update</a>. + * Uploads data to be appended to a file. + * @param path to which data has to be appended. + * @param buffer containing data to be appended. + * @param reqParams containing parameters for append operation like offset, length etc. + * @param cachedSasToken to be used for the authenticating operation. + * @param contextEncryptionAdapter to provide encryption context. + * @param tracingContext for tracing the server calls. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. + */ + @Override + public AbfsRestOperation append(final String path, + final byte[] buffer, + AppendRequestParameters reqParams, + final String cachedSasToken, + ContextEncryptionAdapter contextEncryptionAdapter, + TracingContext tracingContext) throws AzureBlobFileSystemException { + final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); + addEncryptionKeyRequestHeaders(path, requestHeaders, false, + contextEncryptionAdapter, tracingContext); + if (reqParams.isExpectHeaderEnabled()) { + requestHeaders.add(new AbfsHttpHeader(EXPECT, HUNDRED_CONTINUE)); + } + // JDK7 does not support PATCH, so to workaround the issue we will use + // PUT and specify the real method in the X-Http-Method-Override header. + requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, HTTP_METHOD_PATCH)); + if (reqParams.getLeaseId() != null) { + requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, reqParams.getLeaseId())); + } + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, APPEND_ACTION); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, + Long.toString(reqParams.getPosition())); + + if ((reqParams.getMode() == AppendRequestParameters.Mode.FLUSH_MODE) || ( + reqParams.getMode() == AppendRequestParameters.Mode.FLUSH_CLOSE_MODE)) { + abfsUriQueryBuilder.addQuery(QUERY_PARAM_FLUSH, TRUE); + if (reqParams.getMode() == AppendRequestParameters.Mode.FLUSH_CLOSE_MODE) { + abfsUriQueryBuilder.addQuery(QUERY_PARAM_CLOSE, TRUE); + } + } + + // Check if the retry is with "Expect: 100-continue" header being present in the previous request. + if (reqParams.isRetryDueToExpect()) { + String userAgentRetry = getUserAgent(); + // Remove the specific marker related to "Expect: 100-continue" from the User-Agent string. 
+ userAgentRetry = userAgentRetry.replace(HUNDRED_CONTINUE_USER_AGENT, EMPTY_STRING); + requestHeaders.removeIf(header -> header.getName().equalsIgnoreCase(USER_AGENT)); + requestHeaders.add(new AbfsHttpHeader(USER_AGENT, userAgentRetry)); + } + + // Add MD5 Hash of request content as request header if feature is enabled + if (isChecksumValidationEnabled()) { + addCheckSumHeaderForWrite(requestHeaders, reqParams, buffer); + } + + // AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance + String sasTokenForReuse = appendSASTokenToQuery(path, + SASTokenProvider.WRITE_OPERATION, + abfsUriQueryBuilder, cachedSasToken); + + final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = getAbfsRestOperation( + AbfsRestOperationType.Append, + HTTP_METHOD_PUT, url, requestHeaders, + buffer, reqParams.getoffset(), reqParams.getLength(), + sasTokenForReuse); + try { + op.execute(tracingContext); + } catch (AbfsRestOperationException e) { + /* + If the http response code indicates a user error we retry + the same append request with expect header being disabled. + When "100-continue" header is enabled but a non Http 100 response comes, + the response message might not get set correctly by the server. + So, this handling is to avoid breaking of backward compatibility + if someone has taken dependency on the exception message, + which is created using the error string present in the response header. + */ + int responseStatusCode = e.getStatusCode(); + if (checkUserError(responseStatusCode) + && reqParams.isExpectHeaderEnabled()) { + LOG.debug( + "User error, retrying without 100 continue enabled for the given path {}", + path); + reqParams.setExpectHeaderEnabled(false); + reqParams.setRetryDueToExpect(true); + return this.append(path, buffer, reqParams, cachedSasToken, + contextEncryptionAdapter, tracingContext); + } + // If we have no HTTP response, throw the original exception. + if (!op.hasResult()) { + throw e; + } + + if (isMd5ChecksumError(e)) { + throw new AbfsInvalidChecksumException(e); + } + + if (reqParams.isAppendBlob() + && appendSuccessCheckOp(op, path, + (reqParams.getPosition() + reqParams.getLength()), tracingContext)) { + final AbfsRestOperation successOp = getAbfsRestOperation( + AbfsRestOperationType.Append, + HTTP_METHOD_PUT, url, requestHeaders, + buffer, reqParams.getoffset(), reqParams.getLength(), + sasTokenForReuse); + successOp.hardSetResult(HttpURLConnection.HTTP_OK); + return successOp; + } + throw e; + } catch (AzureBlobFileSystemException e) { + // Any server side issue will be returned as AbfsRestOperationException and will be handled above. + LOG.debug( + "Append request failed with non server issues for path: {}, offset: {}, position: {}", + path, reqParams.getoffset(), reqParams.getPosition()); + throw e; + } + + return op; + } + + /** + * Get Rest Operation for API + * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/update"> + * Path - Update</a>. + * Flush previously uploaded data to a file. + * @param path on which data has to be flushed. + * @param position to which data has to be flushed. + * @param retainUncommittedData whether to retain uncommitted data after flush. + * @param isClose specify if this is the last flush to the file. + * @param cachedSasToken to be used for the authenticating operation. + * @param leaseId if there is an active lease on the path. + * @param contextEncryptionAdapter to provide encryption context. 
+   * @param tracingContext for tracing the server calls.
+   * @return executed rest operation containing response from server.
+   * @throws AzureBlobFileSystemException if rest operation fails.
+   */
+  @Override
+  public AbfsRestOperation flush(final String path,
+      final long position,
+      boolean retainUncommittedData,
+      boolean isClose,
+      final String cachedSasToken,
+      final String leaseId,
+      ContextEncryptionAdapter contextEncryptionAdapter,
+      TracingContext tracingContext) throws AzureBlobFileSystemException {
+    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+    addEncryptionKeyRequestHeaders(path, requestHeaders, false,
+        contextEncryptionAdapter, tracingContext);
+    // JDK7 does not support PATCH, so to work around the issue we use
+    // PUT and specify the real method in the X-Http-Method-Override header.
+    requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, HTTP_METHOD_PATCH));
+    if (leaseId != null) {
+      requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId));
+    }
+
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+    abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, FLUSH_ACTION);
+    abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, Long.toString(position));
+    abfsUriQueryBuilder.addQuery(QUERY_PARAM_RETAIN_UNCOMMITTED_DATA,
+        String.valueOf(retainUncommittedData));
+    abfsUriQueryBuilder.addQuery(QUERY_PARAM_CLOSE, String.valueOf(isClose));
+    // AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance.
+    String sasTokenForReuse = appendSASTokenToQuery(path,
+        SASTokenProvider.WRITE_OPERATION,
+        abfsUriQueryBuilder, cachedSasToken);
+
+    final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+    final AbfsRestOperation op = getAbfsRestOperation(
+        AbfsRestOperationType.Flush,
+        HTTP_METHOD_PUT, url, requestHeaders,
+        sasTokenForReuse);
+    op.execute(tracingContext);
+    return op;
+  }
+
+  @Override
+  public AbfsRestOperation flush(byte[] buffer,
+      final String path,
+      boolean isClose,
+      final String cachedSasToken,
+      final String leaseId,
+      final String eTag,
+      final TracingContext tracingContext) throws AzureBlobFileSystemException {
+    throw new UnsupportedOperationException(
+        "Flush with blockIds not supported on DFS Endpoint");
+  }
+
+  /**
+   * Get Rest Operation for API
+   * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/update">
+   * Path - Update</a>.
+   * Set the properties of a file or directory.
+   * @param path on which properties have to be set.
+   * @param properties comma-separated list of metadata key-value pairs.
+   * @param tracingContext for tracing the server calls.
+   * @param contextEncryptionAdapter to provide encryption context.
+   * @return executed rest operation containing response from server.
+   * @throws AzureBlobFileSystemException if rest operation fails.
+   */
+  @Override
+  public AbfsRestOperation setPathProperties(final String path,
+      final Hashtable<String, String> properties,
+      final TracingContext tracingContext,
+      final ContextEncryptionAdapter contextEncryptionAdapter)
+      throws AzureBlobFileSystemException {
+    final String commaSeparatedProperties;
+    try {
+      commaSeparatedProperties = convertXmsPropertiesToCommaSeparatedString(properties);
+    } catch (CharacterCodingException ex) {
+      throw new InvalidAbfsRestOperationException(ex);
+    }
+
+    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+    addEncryptionKeyRequestHeaders(path, requestHeaders, false,
+        contextEncryptionAdapter, tracingContext);
+    // JDK7 does not support PATCH, so to work around the issue we use
+    // PUT and specify the real method in the X-Http-Method-Override header.
+    requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, HTTP_METHOD_PATCH));
+    requestHeaders.add(new AbfsHttpHeader(X_MS_PROPERTIES, commaSeparatedProperties));
+
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+    abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, SET_PROPERTIES_ACTION);
+    appendSASTokenToQuery(path, SASTokenProvider.SET_PROPERTIES_OPERATION,
+        abfsUriQueryBuilder);
+
+    final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+    final AbfsRestOperation op = getAbfsRestOperation(
+        AbfsRestOperationType.SetPathProperties,
+        HTTP_METHOD_PUT, url, requestHeaders);
+    op.execute(tracingContext);
+    return op;
+  }
+
+  /**
+   * Get Rest Operation for API
+   * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/get-properties">
+   * Path - Get Properties</a>.
+   * Get the properties of a file or directory.
+   * @param path of which properties have to be fetched.
+   * @param includeProperties to include user-defined properties.
+   * @param tracingContext for tracing the server calls.
+   * @param contextEncryptionAdapter to provide encryption context.
+   * @return executed rest operation containing response from server.
+   * @throws AzureBlobFileSystemException if rest operation fails.
+   */
+  @Override
+  public AbfsRestOperation getPathStatus(final String path,
+      final boolean includeProperties,
+      final TracingContext tracingContext,
+      final ContextEncryptionAdapter contextEncryptionAdapter)
+      throws AzureBlobFileSystemException {
+    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+    String operation = SASTokenProvider.GET_PROPERTIES_OPERATION;
+    if (!includeProperties) {
+      // The default action (operation) is implicitly to get properties, and
+      // that action requires read permission because it reads user-defined
+      // properties. If the action is getStatus or getAclStatus, then only
+      // traversal (execute) permission is required.
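+      // (Hence the SAS operation is switched to GET_STATUS_OPERATION below:
+      // an execute-only token suffices for the getStatus action.)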
+      abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, GET_STATUS);
+      operation = SASTokenProvider.GET_STATUS_OPERATION;
+    } else {
+      addEncryptionKeyRequestHeaders(path, requestHeaders, false,
+          contextEncryptionAdapter, tracingContext);
+    }
+    abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_UPN,
+        String.valueOf(getAbfsConfiguration().isUpnUsed()));
+    appendSASTokenToQuery(path, operation, abfsUriQueryBuilder);
+
+    final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+    final AbfsRestOperation op = getAbfsRestOperation(
+        AbfsRestOperationType.GetPathStatus,
+        HTTP_METHOD_HEAD, url, requestHeaders);
+    op.execute(tracingContext);
+    return op;
+  }
+
+  /**
+   * Get Rest Operation for API
+   * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/read">
+   * Path - Read</a>.
+   * Read the contents of the file at the specified path.
+   * @param path of the file to be read.
+   * @param position in the file from where data has to be read.
+   * @param buffer to store the data read.
+   * @param bufferOffset offset in the buffer to start storing the data.
+   * @param bufferLength length of data to be read.
+   * @param eTag to specify conditional headers.
+   * @param cachedSasToken to be used for the authenticating operation.
+   * @param contextEncryptionAdapter to provide encryption context.
+   * @param tracingContext for tracing the server calls.
+   * @return executed rest operation containing response from server.
+   * @throws AzureBlobFileSystemException if rest operation fails.
+   */
+  @Override
+  public AbfsRestOperation read(final String path,
+      final long position,
+      final byte[] buffer,
+      final int bufferOffset,
+      final int bufferLength,
+      final String eTag,
+      String cachedSasToken,
+      ContextEncryptionAdapter contextEncryptionAdapter,
+      TracingContext tracingContext) throws AzureBlobFileSystemException {
+    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+    addEncryptionKeyRequestHeaders(path, requestHeaders, false,
+        contextEncryptionAdapter, tracingContext);
+    AbfsHttpHeader rangeHeader = new AbfsHttpHeader(RANGE,
+        String.format("bytes=%d-%d", position, position + bufferLength - 1));
+    requestHeaders.add(rangeHeader);
+    requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag));
+
+    // Add a request header to fetch the MD5 hash of the data returned by the server.
+    if (isChecksumValidationEnabled(requestHeaders, rangeHeader, bufferLength)) {
+      requestHeaders.add(new AbfsHttpHeader(X_MS_RANGE_GET_CONTENT_MD5, TRUE));
+    }
+
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+    // AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance.
+    String sasTokenForReuse = appendSASTokenToQuery(path,
+        SASTokenProvider.READ_OPERATION,
+        abfsUriQueryBuilder, cachedSasToken);
+
+    final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+    final AbfsRestOperation op = getAbfsRestOperation(
+        AbfsRestOperationType.ReadFile,
+        HTTP_METHOD_GET, url, requestHeaders,
+        buffer, bufferOffset, bufferLength,
+        sasTokenForReuse);
+    op.execute(tracingContext);
+
+    // Verify that the MD5 hash returned by the server matches the data received.
+    if (isChecksumValidationEnabled(requestHeaders, rangeHeader, bufferLength)) {
+      verifyCheckSumForRead(buffer, op.getResult(), bufferOffset);
+    }
+
+    return op;
+  }
+
+  /**
+   * Get Rest Operation for API
+   * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/delete">
+   * Path - Delete</a>.
+   * Delete the file or directory at the specified path.
+   * @param path to be deleted.
+   * @param recursive if the path is a directory, delete recursively.
+   * @param continuation to specify continuation token.
+   * @param tracingContext for tracing the server calls.
+   * @param isNamespaceEnabled specify if the namespace is enabled.
+   * @return executed rest operation containing response from server.
+   * @throws AzureBlobFileSystemException if rest operation fails.
+   */
+  @Override
+  public AbfsRestOperation deletePath(final String path,
+      final boolean recursive,
+      final String continuation,
+      TracingContext tracingContext,
+      final boolean isNamespaceEnabled) throws AzureBlobFileSystemException {
+    /*
+     * If pagination is enabled and the current API version is older than the
+     * minimum version required for pagination, use that minimum version.
+     * If pagination is enabled and the current API version is at or beyond
+     * the minimum required version, use the current version, as the Azure
+     * service is backward compatible.
+     * If pagination is disabled, use the current API version only.
+     */
+    final List<AbfsHttpHeader> requestHeaders = (isPaginatedDelete(recursive,
+        isNamespaceEnabled) && getxMsVersion().compareTo(
+        ApiVersion.AUG_03_2023) < 0)
+        ? createDefaultHeaders(ApiVersion.AUG_03_2023)
+        : createDefaultHeaders();
+    final AbfsUriQueryBuilder abfsUriQueryBuilder
+        = createDefaultUriQueryBuilder();
+
+    if (isPaginatedDelete(recursive, isNamespaceEnabled)) {
+      // Add the paginated query parameter.
+      abfsUriQueryBuilder.addQuery(QUERY_PARAM_PAGINATED, TRUE);
+    }
+
+    abfsUriQueryBuilder.addQuery(QUERY_PARAM_RECURSIVE,
+        String.valueOf(recursive));
+    abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation);
+    String operation = recursive
+        ? SASTokenProvider.DELETE_RECURSIVE_OPERATION
+        : SASTokenProvider.DELETE_OPERATION;
+    appendSASTokenToQuery(path, operation, abfsUriQueryBuilder);
+
+    final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+    final AbfsRestOperation op = new AbfsRestOperation(
+        AbfsRestOperationType.DeletePath, this,
+        HTTP_METHOD_DELETE, url, requestHeaders, getAbfsConfiguration());
+    try {
+      op.execute(tracingContext);
+    } catch (AzureBlobFileSystemException e) {
+      // If we have no HTTP response, throw the original exception.
+      if (!op.hasResult()) {
+        throw e;
+      }
+      final AbfsRestOperation idempotencyOp = deleteIdempotencyCheckOp(op);
+      if (idempotencyOp.getResult().getStatusCode()
+          == op.getResult().getStatusCode()) {
+        // The idempotency check did not return a different result;
+        // rethrow the original exception.
+        throw e;
+      } else {
+        return idempotencyOp;
+      }
+    }
+
+    return op;
+  }
+
+  /**
+   * Get Rest Operation for API
+   * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/update">
+   * Path - Update</a>.
+   * @param path on which owner has to be set.
+   * @param owner to be set.
+   * @param group to be set.
+   * @param tracingContext for tracing the server calls.
+   * @return executed rest operation containing response from server.
+   * @throws AzureBlobFileSystemException if rest operation fails.
+   */
+  @Override
+  public AbfsRestOperation setOwner(final String path,
+      final String owner,
+      final String group,
+      TracingContext tracingContext) throws AzureBlobFileSystemException {
+    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+    // JDK7 does not support PATCH, so to work around the issue we use
+    // PUT and specify the real method in the X-Http-Method-Override header.
+    requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, HTTP_METHOD_PATCH));
+    if (owner != null && !owner.isEmpty()) {
+      requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_OWNER, owner));
+    }
+    if (group != null && !group.isEmpty()) {
+      requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_GROUP, group));
+    }
+
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+    abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, SET_ACCESS_CONTROL);
+    appendSASTokenToQuery(path, SASTokenProvider.SET_OWNER_OPERATION,
+        abfsUriQueryBuilder);
+
+    final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+    final AbfsRestOperation op = getAbfsRestOperation(
+        AbfsRestOperationType.SetOwner,
+        HTTP_METHOD_PUT, url, requestHeaders);
+    op.execute(tracingContext);
+    return op;
+  }
+
+  /**
+   * Get Rest Operation for API
+   * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/update">
+   * Path - Update</a>.
+   * @param path on which permission has to be set.
+   * @param permission to be set.
+   * @param tracingContext for tracing the server calls.
+   * @return executed rest operation containing response from server.
+   * @throws AzureBlobFileSystemException if rest operation fails.
+   */
+  @Override
+  public AbfsRestOperation setPermission(final String path,
+      final String permission,
+      TracingContext tracingContext) throws AzureBlobFileSystemException {
+    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+    // JDK7 does not support PATCH, so to work around the issue we use
+    // PUT and specify the real method in the X-Http-Method-Override header.
+    requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, HTTP_METHOD_PATCH));
+    requestHeaders.add(new AbfsHttpHeader(
+        HttpHeaderConfigurations.X_MS_PERMISSIONS, permission));
+
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+    abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, SET_ACCESS_CONTROL);
+    appendSASTokenToQuery(path, SASTokenProvider.SET_PERMISSION_OPERATION,
+        abfsUriQueryBuilder);
+
+    final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+    final AbfsRestOperation op = getAbfsRestOperation(
+        AbfsRestOperationType.SetPermissions,
+        HTTP_METHOD_PUT, url, requestHeaders);
+    op.execute(tracingContext);
+    return op;
+  }
+
+  /**
+   * Get Rest Operation for API
+   * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/update">
+   * Path - Update</a>.
+   * @param path on which ACL has to be set.
+   * @param aclSpecString to be set.
+   * @param eTag to specify conditional headers. Set only if the etag matches.
+   * @param tracingContext for tracing the server calls.
+   * @return executed rest operation containing response from server.
+   * @throws AzureBlobFileSystemException if rest operation fails.
+   */
+  @Override
+  public AbfsRestOperation setAcl(final String path,
+      final String aclSpecString,
+      final String eTag,
+      TracingContext tracingContext)
+      throws AzureBlobFileSystemException {
+    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+    // JDK7 does not support PATCH, so to work around the issue we use
+    // PUT and specify the real method in the X-Http-Method-Override header.
+    requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
+        HTTP_METHOD_PATCH));
+    requestHeaders.add(
+        new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_ACL, aclSpecString));
+    if (eTag != null && !eTag.isEmpty()) {
+      requestHeaders.add(
+          new AbfsHttpHeader(IF_MATCH, eTag));
+    }
+
+    final AbfsUriQueryBuilder abfsUriQueryBuilder
+        = createDefaultUriQueryBuilder();
+    abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION,
+        SET_ACCESS_CONTROL);
+    appendSASTokenToQuery(path, SASTokenProvider.SET_ACL_OPERATION,
+        abfsUriQueryBuilder);
+
+    final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+    final AbfsRestOperation op = getAbfsRestOperation(
+        AbfsRestOperationType.SetAcl,
+        HTTP_METHOD_PUT, url, requestHeaders);
+    op.execute(tracingContext);
+    return op;
+  }
+
+  /**
+   * Get Rest Operation for API
+   * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/get-properties">
+   * Path - Get Properties</a>.
+   * Retrieves the ACL status of the blob at the specified path.
+   * @param path of which properties have to be fetched.
+   * @param useUPN whether to use UPN with the rest operation.
+   * @param tracingContext for tracing the server calls.
+   * @return executed rest operation containing response from server.
+   * @throws AzureBlobFileSystemException if rest operation fails.
+   */
+  @Override
+  public AbfsRestOperation getAclStatus(final String path,
+      final boolean useUPN,
+      TracingContext tracingContext) throws AzureBlobFileSystemException {
+    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+    abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, GET_ACCESS_CONTROL);
+    abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_UPN,
+        String.valueOf(useUPN));
+    appendSASTokenToQuery(path, SASTokenProvider.GET_ACL_OPERATION,
+        abfsUriQueryBuilder);
+
+    final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+    final AbfsRestOperation op = getAbfsRestOperation(
+        AbfsRestOperationType.GetAcl,
+        HTTP_METHOD_HEAD, url, requestHeaders);
+    op.execute(tracingContext);
+    return op;
+  }
+
+  /**
+   * Get Rest Operation for API
+   * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/get-properties">
+   * Path - Get Properties</a>.
+   * @param path Path for which the access check needs to be performed.
+   * @param rwx The permission to be checked on the path.
+   * @param tracingContext for tracing the server calls.
+   * @return executed rest operation containing response from server.
+   * @throws AzureBlobFileSystemException if rest operation fails.
+   */
+  @Override
+  public AbfsRestOperation checkAccess(String path,
+      String rwx,
+      TracingContext tracingContext)
+      throws AzureBlobFileSystemException {
+    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+
+    AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+    abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, CHECK_ACCESS);
+    abfsUriQueryBuilder.addQuery(QUERY_FS_ACTION, rwx);
+    appendSASTokenToQuery(path, SASTokenProvider.CHECK_ACCESS_OPERATION,
+        abfsUriQueryBuilder);
+
+    URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+    AbfsRestOperation op = getAbfsRestOperation(
+        AbfsRestOperationType.CheckAccess,
+        HTTP_METHOD_HEAD, url, requestHeaders);
+    op.execute(tracingContext);
+    return op;
+  }
+
+  /**
+   * Checks if the rest operation result indicates that the path is a directory.
+   * @param result executed rest operation containing response from server.
+   * @return true if the path is a directory, false otherwise.
+   */
+  @Override
+  public boolean checkIsDir(AbfsHttpOperation result) {
+    String resourceType = result.getResponseHeader(
+        HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
+    return resourceType != null

Review Comment:
   You can simplify this by using `StringUtils.equalsIgnoreCase(resourceType, DIRECTORY)`.
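For reference, the simplification suggested above would look roughly like this (a sketch only, assuming `DIRECTORY` is the existing directory constant and `StringUtils` is `org.apache.commons.lang3.StringUtils`, whose `equalsIgnoreCase` is null-safe):

```java
import org.apache.commons.lang3.StringUtils;

@Override
public boolean checkIsDir(AbfsHttpOperation result) {
  String resourceType = result.getResponseHeader(
      HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
  // equalsIgnoreCase(null, x) returns false, so the explicit
  // resourceType != null check becomes unnecessary.
  return StringUtils.equalsIgnoreCase(resourceType, DIRECTORY);
}
```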
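More generally, the `append` fallback in this diff follows a common pattern: when a request sent with `Expect: 100-continue` fails with a 4xx status, the error body may be missing, so the client resends the request once without the header to recover the real error message. A minimal, self-contained sketch of that pattern, using plain `java.net.http` and a hypothetical endpoint rather than the ABFS request machinery:

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public final class ExpectFallbackDemo {
  private static final HttpClient CLIENT = HttpClient.newHttpClient();

  /** PUTs payload with Expect: 100-continue; retries once without it on a 4xx. */
  static HttpResponse<String> putWithFallback(URI uri, byte[] payload)
      throws IOException, InterruptedException {
    HttpResponse<String> response = put(uri, payload, true);
    if (response.statusCode() >= 400 && response.statusCode() < 500) {
      // A rejected 100-continue handshake can leave the error body unset;
      // resend once without the header so the real error message is returned.
      response = put(uri, payload, false);
    }
    return response;
  }

  private static HttpResponse<String> put(URI uri, byte[] payload, boolean expect)
      throws IOException, InterruptedException {
    HttpRequest request = HttpRequest.newBuilder(uri)
        .expectContinue(expect)
        .PUT(HttpRequest.BodyPublishers.ofByteArray(payload))
        .build();
    return CLIENT.send(request, HttpResponse.BodyHandlers.ofString());
  }
}
```

The production code additionally guards the retry with `isRetryDueToExpect` so the fallback happens at most once per request.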
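The checksum hooks (`addCheckSumHeaderForWrite`, `verifyCheckSumForRead`) rest on the standard `Content-MD5` convention: hash exactly the slice of the buffer that travels on the wire and compare Base64-encoded digests. A hypothetical helper sketching the core computation (the name and placement are illustrative, not the PR's):

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

final class Md5Util {
  private Md5Util() {
  }

  /**
   * Computes the Base64-encoded MD5 digest of buffer[offset, offset + length),
   * i.e. only the slice that is actually sent or received.
   */
  static String md5Base64(byte[] buffer, int offset, int length) {
    try {
      MessageDigest md5 = MessageDigest.getInstance("MD5");
      md5.update(buffer, offset, length);
      return Base64.getEncoder().encodeToString(md5.digest());
    } catch (NoSuchAlgorithmException e) {
      // MD5 is a mandatory JDK algorithm, so this should not happen.
      throw new IllegalStateException(e);
    }
  }
}
```

On the write path a mismatch surfaces as the `AbfsInvalidChecksumException` handled in the `append` catch block; on the read path the driver requests the server-side digest via `x-ms-range-get-content-md5` and verifies it against the bytes received.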