[
https://issues.apache.org/jira/browse/HADOOP-19233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17913269#comment-17913269
]
ASF GitHub Bot commented on HADOOP-19233:
-----------------------------------------
anmolanmol1234 commented on code in PR #7265:
URL: https://github.com/apache/hadoop/pull/7265#discussion_r1916391781
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobRenameHandler.java:
##########
@@ -0,0 +1,652 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIOException;
+import
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
+import
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
+import org.apache.hadoop.fs.azurebfs.enums.BlobCopyProgress;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+
+import static
org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.extractEtagHeader;
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COPY_STATUS_ABORTED;
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COPY_STATUS_FAILED;
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COPY_STATUS_SUCCESS;
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH;
+import static
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_COPY_ID;
+import static
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_COPY_SOURCE;
+import static
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_COPY_STATUS;
+import static
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_COPY_STATUS_DESCRIPTION;
+import static
org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.COPY_BLOB_ABORTED;
+import static
org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.COPY_BLOB_FAILED;
+import static
org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND;
+
+/**
+ * Orchestrator for rename over the Blob endpoint; handles both directory and
+ * file renames. The Blob endpoint does not expose a rename API, so this class
+ * copies each blob to the destination and then deletes the source blob.
+ * <p>
+ * For a directory rename, it recursively lists the blobs in the source
+ * directory and copies them to the destination directory.
+ */
+public class BlobRenameHandler extends ListActionTaker {
+
+  /**
+   * Logger for this handler. Bound to {@link BlobRenameHandler} (not
+   * {@link AbfsClient}) so that its output is attributed to, and can be
+   * filtered by, this class.
+   */
+  public static final Logger LOG =
+      LoggerFactory.getLogger(BlobRenameHandler.class);
+
+  /** ETag the caller expects the source to have; null skips the check. */
+  private final String srcEtag;
+
+  /** Source and destination paths of the rename. */
+  private final Path src, dst;
+
+  /**
+   * Whether the rename is performed under leases, and whether this call is
+   * the recovery of a previously failed atomic rename.
+   */
+  private final boolean isAtomicRename, isAtomicRenameRecovery;
+
+  private final TracingContext tracingContext;
+
+  /** Lease on the source path; supplied by a caller or taken in execute(). */
+  private AbfsLease srcAbfsLease;
+
+  /** Lease id of {@link #srcAbfsLease}, once it is held. */
+  private String srcLeaseId;
+
+  /**
+   * Every lease taken by {@code takeLease}. Leases are added from
+   * {@code takeAction} -> {@code renameInternal}, which appears to run on
+   * several consumers in parallel (see getMaxConsumptionParallelism and the
+   * atomic counter below), so the list must accept concurrent {@code add}
+   * calls. Code iterating it must synchronize on the list.
+   * NOTE(review): confirm ListActionTaker's threading model.
+   */
+  private final List<AbfsLease> leases =
+      Collections.synchronizedList(new ArrayList<>());
+
+  /** Number of blob paths copied and deleted so far. */
+  private final AtomicInteger operatedBlobCount = new AtomicInteger(0);
+
+  /**
+   * Creates a handler that renames {@code src} to {@code dst} over the Blob
+   * endpoint.
+   *
+   * @param src source path
+   * @param dst destination path
+   * @param abfsClient blob client used for every server call of the rename
+   * @param srcEtag ETag the caller expects the source to have, or null to
+   * skip the ETag comparison
+   * @param isAtomicRename true if the rename must be performed atomically
+   * @param isAtomicRenameRecovery true if this call resumes an earlier atomic
+   * rename that failed part-way
+   * @param tracingContext tracing context attached to the server calls
+   */
+  public BlobRenameHandler(final String src,
+      final String dst,
+      final AbfsBlobClient abfsClient,
+      final String srcEtag,
+      final boolean isAtomicRename,
+      final boolean isAtomicRenameRecovery,
+      final TracingContext tracingContext) {
+    super(new Path(src), abfsClient, tracingContext);
+    this.src = new Path(src);
+    this.dst = new Path(dst);
+    this.srcEtag = srcEtag;
+    this.isAtomicRename = isAtomicRename;
+    this.isAtomicRenameRecovery = isAtomicRenameRecovery;
+    this.tracingContext = tracingContext;
+  }
+
+  /**
+   * Creates a handler that reuses a lease already held on the source path.
+   * When {@code srcAbfsLease} is null, an atomic rename takes its own lease
+   * during {@link #execute()}.
+   *
+   * @param src source path
+   * @param dst destination path
+   * @param abfsClient blob client used for every server call of the rename
+   * @param srcEtag ETag the caller expects the source to have, or null to
+   * skip the ETag comparison
+   * @param isAtomicRename true if the rename must be performed atomically
+   * @param isAtomicRenameRecovery true if this call resumes an earlier atomic
+   * rename that failed part-way
+   * @param srcAbfsLease lease already held on {@code src}, or null
+   * @param tracingContext tracing context attached to the server calls
+   */
+  public BlobRenameHandler(final String src,
+      final String dst,
+      final AbfsBlobClient abfsClient,
+      final String srcEtag,
+      final boolean isAtomicRename,
+      final boolean isAtomicRenameRecovery,
+      final AbfsLease srcAbfsLease,
+      final TracingContext tracingContext) {
+    this(src, dst, abfsClient, srcEtag, isAtomicRename,
+        isAtomicRenameRecovery, tracingContext);
+    this.srcAbfsLease = srcAbfsLease;
+  }
+
+  /**
+   * Returns the consumption parallelism configured for blob-rename directory
+   * operations; presumably used by ListActionTaker to size its consumers.
+   *
+   * @return the configured blob-rename directory consumption parallelism
+   */
+  @Override
+  int getMaxConsumptionParallelism() {
+    return getAbfsClient()
+        .getAbfsConfiguration()
+        .getBlobRenameDirConsumptionParallelism();
+  }
+
+  /**
+   * Orchestrates the rename operation.
+   * <p>
+   * Steps:
+   * <ol>
+   * <li>Run the pre-checks.</li>
+   * <li>If the source is an implicit directory, create a marker blob for it
+   * and record its ETag.</li>
+   * <li>For an atomic rename, take (or reuse) the lease on the source. For a
+   * directory that is not being recovered, also run
+   * {@code RenameAtomicity#preRename()}.</li>
+   * <li>Rename every blob under a directory and then the directory path
+   * itself, or rename the single file.</li>
+   * </ol>
+   * On success the source lease's timer is cancelled; on failure the lease
+   * is freed. {@code RenameAtomicity#postRename()} runs only on success.
+   *
+   * @return true if the rename succeeded. It is false only if
+   * {@code listRecursiveAndTakeAction()} returns false; other failures
+   * surface as exceptions.
+   * @throws AzureBlobFileSystemException if a pre-check fails or a server
+   * call fails
+   */
+  public boolean execute() throws AzureBlobFileSystemException {
+    PathInformation pathInformation = new PathInformation();
+    boolean result = false;
+    // preCheck either returns true or throws.
+    if (preCheck(src, dst, pathInformation)) {
+      RenameAtomicity renameAtomicity = null;
+      if (pathInformation.getIsDirectory()
+          && pathInformation.getIsImplicit()) {
+        // Implicit directory: create a marker blob at the source path and
+        // keep its ETag, which getRenameAtomicity() later passes on.
+        AbfsRestOperation createMarkerOp = getAbfsClient().createPath(
+            src.toUri().getPath(),
+            false, false, null,
+            false, null, null, tracingContext);
+        pathInformation.setETag(extractEtagHeader(createMarkerOp.getResult()));
+      }
+      try {
+        if (isAtomicRename) {
+          /*
+           * Conditionally get a lease on the source blob to prevent other
+           * writers from changing it. This is used for correctness in HBase
+           * when log files are renamed. When the HBase master renames a log
+           * file folder, the lease locks out other writers. This prevents a
+           * region server that the master thinks is dead, but is still alive,
+           * from committing additional updates. This is different than when
+           * HBase runs on HDFS, where the region server recovers the lease on
+           * a log file, to gain exclusive access to it, before it splits it.
+           */
+          // Reuse a lease supplied through the constructor, if any.
+          if (srcAbfsLease == null) {
+            srcAbfsLease = takeLease(src, srcEtag);
+          }
+          srcLeaseId = srcAbfsLease.getLeaseID();
+          if (!isAtomicRenameRecovery && pathInformation.getIsDirectory()) {
+            /*
+             * If this is not a resume of a previously failed atomic rename,
+             * perform the pre-rename step.
+             */
+            renameAtomicity = getRenameAtomicity(pathInformation);
+            renameAtomicity.preRename();
+          }
+        }
+        if (pathInformation.getIsDirectory()) {
+          // Rename every blob under the directory, then the directory path
+          // itself; finalSrcRename() is skipped if the listing pass fails.
+          result = listRecursiveAndTakeAction() && finalSrcRename();
+        } else {
+          result = renameInternal(src, dst);
+        }
+      } finally {
+        if (srcAbfsLease != null) {
+          // On success the source blob has been deleted, so only the lease
+          // timer (presumably its refresh timer) needs cancelling; on
+          // failure the lease is released.
+          // NOTE(review): when src itself went through renameInternal, that
+          // method has already cancelled/freed this same lease; this relies
+          // on AbfsLease tolerating the repeated call - confirm.
+          if (result) {
+            srcAbfsLease.cancelTimer();
+          } else {
+            srcAbfsLease.free();
+          }
+        }
+      }
+      // The post-rename step only runs after a fully successful rename.
+      if (result && renameAtomicity != null) {
+        renameAtomicity.postRename();
+      }
+    }
+    return result;
+  }
+
+ /** Final rename operation after all the blobs have been copied.
+ *
+ * @return true if rename is successful
+ * @throws AzureBlobFileSystemException if server call fails
+ */
+ private boolean finalSrcRename() throws AzureBlobFileSystemException {
+ tracingContext.setOperatedBlobCount(operatedBlobCount.get() + 1);
+ try {
+ return renameInternal(src, dst);
+ } finally {
+ tracingContext.setOperatedBlobCount(null);
+ }
+ }
+
+ /** Gets the rename atomicity object.
+ *
+ * @param pathInformation object containing the path information of the
source path
+ *
+ * @return RenameAtomicity object
+ */
+ @VisibleForTesting
+ public RenameAtomicity getRenameAtomicity(final PathInformation
pathInformation) {
+ return new RenameAtomicity(src,
+ dst,
+ new Path(src.getParent(), src.getName() +
RenameAtomicity.SUFFIX),
+ tracingContext,
+ pathInformation.getETag(),
+ getAbfsClient());
+ }
+
+ /** Takes a lease on the path.
+ *
+ * @param path path on which the lease is to be taken
+ * @param eTag eTag of the path
+ *
+ * @return object containing the lease information
+ * @throws AzureBlobFileSystemException if server call fails
+ */
+ private AbfsLease takeLease(final Path path, final String eTag)
+ throws AzureBlobFileSystemException {
+ AbfsLease lease = new AbfsLease(getAbfsClient(),
path.toUri().getPath(), false,
+ getAbfsClient().getAbfsConfiguration()
+ .getAtomicRenameLeaseRefreshDuration(),
+ eTag, tracingContext);
+ leases.add(lease);
+ return lease;
+ }
+
+ /** Checks if the path contains a colon.
+ *
+ * @param p path to check
+ *
+ * @return true if the path contains a colon
+ */
+ private boolean containsColon(Path p) {
+ return p.toUri().getPath().contains(":");
+ }
+
+  /**
+   * Enforces the HDFS rename contract on the client side, because the Blob
+   * endpoint has no rename API that could do so on the server.
+   *
+   * @param src source path
+   * @param dst destination path
+   * @param pathInformation filled in with the source path's information
+   * @return always true; every failed check throws instead
+   * @throws AzureBlobFileSystemException if a server call fails or the paths
+   * are invalid for a rename
+   */
+  private boolean preCheck(final Path src, final Path dst,
+      final PathInformation pathInformation)
+      throws AzureBlobFileSystemException {
+    // Local checks on the destination path (no server call).
+    validateDestinationPath(src, dst);
+
+    // Fetch the source's information; the remaining checks depend on it.
+    setSrcPathInformation(src, pathInformation);
+    validateSourcePath(pathInformation);
+
+    // Destination-side checks, which may issue further server calls.
+    validateDestinationPathNotExist(src, dst, pathInformation);
+    validateDestinationParentExist(src, dst, pathInformation);
+
+    // Reaching this point means no validator threw.
+    return true;
+  }
+
+ /**
+ * Validate if the format of the destination path is correct and if the
destination
+ * path is not a sub-directory of the source path.
+ *
+ * @param src source path
+ * @param dst destination path
+ *
+ * @throws AbfsRestOperationException if the destination path is invalid
+ */
+ private void validateDestinationPath(final Path src, final Path dst)
+ throws AbfsRestOperationException {
+ if (containsColon(dst)) {
+ throw new AbfsRestOperationException(
+ HttpURLConnection.HTTP_BAD_REQUEST,
+
AzureServiceErrorCode.INVALID_RENAME_DESTINATION.getErrorCode(), null,
+ new PathIOException(dst.toUri().getPath(),
+ "Destination path contains colon"));
+ }
+
+ validateDestinationIsNotSubDir(src, dst);
+ }
+
+ /**
+ * Validate if the destination path is not a sub-directory of the source
path.
+ *
+ * @param src source path
+ * @param dst destination path
+ */
+ private void validateDestinationIsNotSubDir(final Path src,
+ final Path dst) throws
AbfsRestOperationException {
+ LOG.debug("Check if the destination is subDirectory");
+ Path nestedDstParent = dst.getParent();
+ if (nestedDstParent != null && nestedDstParent.toUri()
+ .getPath()
+ .indexOf(src.toUri().getPath()) == 0) {
+ LOG.info("Rename src: {} dst: {} failed as dst is subDir of src",
+ src, dst);
+ throw new
AbfsRestOperationException(HttpURLConnection.HTTP_CONFLICT,
+
AzureServiceErrorCode.INVALID_RENAME_SOURCE_PATH.getErrorCode(),
+
AzureServiceErrorCode.INVALID_RENAME_SOURCE_PATH.getErrorMessage(),
+ new Exception(
+
AzureServiceErrorCode.INVALID_RENAME_SOURCE_PATH.getErrorCode()));
+ }
+ }
+
+ /** Set the path information of the source path.
+ *
+ * @param src source path
+ * @param pathInformation object containing the path information of the
source path
+ *
+ * @throws AzureBlobFileSystemException if server call fails
+ */
+ private void setSrcPathInformation(final Path src,
+ final PathInformation pathInformation)
+ throws AzureBlobFileSystemException {
+ pathInformation.copy(getPathInformation(src, tracingContext));
+ }
+
+ /**
+ * Validate if the source path exists and if the client knows the ETag of
the source path,
+ * then the ETag should match with the server.
+ *
+ * @param pathInformation object containing the path information of the
source path
+ *
+ * @throws AbfsRestOperationException if the source path is not found or
if the ETag of the source
+ * path does not match with the server.
+ */
+ private void validateSourcePath(final PathInformation pathInformation)
+ throws AzureBlobFileSystemException {
+ if (!pathInformation.getPathExists()) {
+ throw new AbfsRestOperationException(
+ HttpURLConnection.HTTP_NOT_FOUND,
+
AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND.getErrorCode(), null,
+ new Exception(
+
AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND.getErrorCode()));
+ }
+ if (srcEtag != null && !srcEtag.equals(pathInformation.getETag())) {
+ throw new AbfsRestOperationException(
+ HttpURLConnection.HTTP_CONFLICT,
+ AzureServiceErrorCode.PATH_ALREADY_EXISTS.getErrorCode(),
null,
+ new Exception(
+
AzureServiceErrorCode.PATH_ALREADY_EXISTS.getErrorCode()));
+ }
+ }
+
+ /** Validate if the destination path does not exist.
+ *
+ * @param src source path
+ * @param dst destination path
+ * @param pathInformation object containing the path information of the
source path
+ *
+ * @throws AbfsRestOperationException if the destination path already
exists
+ */
+ private void validateDestinationPathNotExist(final Path src,
+ final Path dst,
+ final PathInformation
pathInformation)
+ throws AzureBlobFileSystemException {
+ /*
+ * Destination path name can be same to that of source path name only
in the
+ * case of a directory rename.
+ *
+ * In case the directory is being renamed to some other name, the
destination
+ * check would happen on the AzureBlobFileSystem#rename method.
+ */
+ if (pathInformation.getIsDirectory() && dst.getName()
+ .equals(src.getName())) {
+ PathInformation dstPathInformation = getPathInformation(
+ dst,
+ tracingContext);
+ if (dstPathInformation.getPathExists()) {
+ LOG.info(
+ "Rename src: {} dst: {} failed as qualifiedDst already
exists",
+ src, dst);
+ throw new AbfsRestOperationException(
+ HttpURLConnection.HTTP_CONFLICT,
+
AzureServiceErrorCode.PATH_ALREADY_EXISTS.getErrorCode(), null,
+ null);
+ }
+ }
+ }
+
+ /** Validate if the parent of the destination path exists.
+ *
+ * @param src source path
+ * @param dst destination path
+ * @param pathInformation object containing the path information of the
source path
+ *
+ * @throws AbfsRestOperationException if the parent of the destination
path does not exist
+ */
+ private void validateDestinationParentExist(final Path src,
+ final Path dst,
+ final PathInformation
pathInformation)
+ throws AzureBlobFileSystemException {
+ final Path nestedDstParent = dst.getParent();
+ if (!dst.isRoot() && nestedDstParent != null &&
!nestedDstParent.isRoot()
+ && (
+ !pathInformation.getIsDirectory() || !dst.getName()
+ .equals(src.getName()))) {
+ PathInformation nestedDstInfo = getPathInformation(
+ nestedDstParent,
+ tracingContext);
+ if (!nestedDstInfo.getPathExists() ||
!nestedDstInfo.getIsDirectory()) {
+ throw new AbfsRestOperationException(
+ HttpURLConnection.HTTP_NOT_FOUND,
+
RENAME_DESTINATION_PARENT_PATH_NOT_FOUND.getErrorCode(), null,
+ new Exception(
+
RENAME_DESTINATION_PARENT_PATH_NOT_FOUND.getErrorCode()));
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ boolean takeAction(final Path path) throws AzureBlobFileSystemException {
+ return renameInternal(path,
+ createDestinationPathForBlobPartOfRenameSrcDir(dst, path,
src));
+ }
+
+  /**
+   * Renames a single blob path by copying it to the destination and then
+   * deleting the source blob, and increments the operated-blob count.
+   * <p>
+   * For an atomic rename, the work is done under a lease. The source path
+   * reuses the handler's source lease; any other path takes a new lease. On
+   * success the lease's timer is cancelled; on failure the lease is freed.
+   *
+   * @param path source blob path
+   * @param destinationPathForBlobPartOfRenameSrcDir destination blob path
+   *
+   * @return always true; failures are reported by throwing
+   * @throws AzureBlobFileSystemException if a server call fails
+   */
+  private boolean renameInternal(final Path path,
+      final Path destinationPathForBlobPartOfRenameSrcDir)
+      throws AzureBlobFileSystemException {
+    final String leaseId;
+    AbfsLease abfsLease = null;
+    if (isAtomicRename) {
+      /*
+       * To maintain atomicity of rename of the path, lease is taken on the
+       * path.
+       */
+      if (path.equals(src)) {
+        // The source path is already leased; execute() set these fields.
+        abfsLease = srcAbfsLease;
+        leaseId = srcLeaseId;
+      } else {
+        abfsLease = takeLease(path, null);
+        leaseId = abfsLease.getLeaseID();
+      }
+    } else {
+      leaseId = null;
+    }
+    boolean operated = false;
+    try {
+      // The source is deleted only after copyPath returns without throwing.
+      copyPath(path, destinationPathForBlobPartOfRenameSrcDir, leaseId);
+      getAbfsClient().deleteBlobPath(path, leaseId, tracingContext);
+      operated = true;
+    } finally {
+      if (abfsLease != null) {
+        // If the operation is successful, cancel the timer; there is no need
+        // to release the lease because the blob path has been deleted.
+        if (operated) {
+          abfsLease.cancelTimer();
+        } else {
+          abfsLease.free();
+        }
+      }
+    }
+    operatedBlobCount.incrementAndGet();
+    return true;
+  }
+
+ /** Copies the source path to the destination path.
+ *
+ * @param src source path
+ * @param dst destination path
+ * @param leaseId lease id for the source path
+ *
+ * @throws AzureBlobFileSystemException if server call fails
+ */
+ private void copyPath(final Path src, final Path dst, final String leaseId)
+ throws AzureBlobFileSystemException {
+ String copyId;
+ try {
+ AbfsRestOperation copyPathOp = getAbfsClient().copyBlob(src, dst,
leaseId,
+ tracingContext);
+ final String progress = copyPathOp.getResult()
+ .getResponseHeader(X_MS_COPY_STATUS);
+ if (COPY_STATUS_SUCCESS.equalsIgnoreCase(progress)) {
+ return;
+ }
+ copyId = copyPathOp.getResult()
+ .getResponseHeader(X_MS_COPY_ID);
+ } catch (AbfsRestOperationException ex) {
+ if (ex.getStatusCode() == HttpURLConnection.HTTP_CONFLICT) {
+ AbfsRestOperation dstPathStatus =
getAbfsClient().getPathStatus(
+ dst.toUri().getPath(),
+ tracingContext, null, false);
+ final String srcCopyPath = ROOT_PATH +
getAbfsClient().getFileSystem()
+ + src.toUri().getPath();
+ if (dstPathStatus.getResult() != null && (srcCopyPath.equals(
+ getDstSource(dstPathStatus)))) {
+ return;
+ }
+ }
+ throw ex;
+ }
+ final long pollWait = getAbfsClient().getAbfsConfiguration()
+ .getBlobCopyProgressPollWaitMillis();
+ while (handleCopyInProgress(dst, tracingContext, copyId)
Review Comment:
If handleCopyInProgress keeps returning PENDING, the code might enter an
infinite loop of waiting. We should introduce a maximum wait time and fail
the operation if it is exceeded.
> ABFS: [FnsOverBlob] Implementing Rename and Delete APIs over Blob Endpoint
> --------------------------------------------------------------------------
>
> Key: HADOOP-19233
> URL: https://issues.apache.org/jira/browse/HADOOP-19233
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/azure
> Affects Versions: 3.4.0
> Reporter: Anuj Modi
> Assignee: Manish Bhatt
> Priority: Major
> Labels: pull-request-available
>
> Currently, we only support rename and delete operations on the DFS endpoint.
> Supporting rename and delete operations on the Blob endpoint requires extra
> care because the Blob endpoint does not account for hierarchy. We need to ensure
> that the HDFS contracts are maintained when performing rename and delete
> operations. Renaming or deleting a directory over the Blob endpoint requires
> the client to handle the orchestration and rename or delete all the blobs
> within the specified directory.
>
> The task outlines the considerations for implementing rename and delete
> operations for the FNS-blob endpoint to ensure compatibility with HDFS
> contracts.
> * {*}Blob Endpoint Usage{*}: The task addresses the need for abstraction in
> the code to maintain HDFS contracts while performing rename and delete
> operations on the blob endpoint, which does not support hierarchy.
> * {*}Rename Operations{*}: The {{AzureBlobFileSystem#rename()}} method will
> use a {{RenameHandler}} instance to handle rename operations, with separate
> handlers for the DFS and blob endpoints. This method includes prechecks,
> destination adjustments, and orchestration of directory renaming for blobs.
> * {*}Atomic Rename{*}: Atomic renaming is essential for blob endpoints, as
> it requires orchestration to copy or delete each blob within the directory. A
> configuration will allow developers to specify directories for atomic
> renaming, with a JSON file to track the status of renames.
> * {*}Delete Operations{*}: Delete operations are simpler than renames,
> requiring fewer HDFS contract checks. For blob endpoints, the client must
> handle orchestration, including managing orphaned directories created by
> AzCopy.
> * {*}Orchestration for Rename/Delete{*}: Orchestration for rename and delete
> operations over blob endpoints involves listing blobs and performing actions
> on each blob. The process must be optimized to handle large numbers of blobs
> efficiently.
> * {*}Need for Optimization{*}: Optimization is crucial because the
> {{ListBlob}} API can return a maximum of 5000 blobs at once, necessitating
> multiple calls for large directories. The task proposes a producer-consumer
> model to handle blobs in parallel, thereby reducing processing time and
> memory usage.
> * {*}Producer-Consumer Design{*}: The proposed design includes a producer to
> list blobs, a queue to store the blobs, and a consumer to process them in
> parallel. This approach aims to improve efficiency and mitigate memory issues.
> More details will follow.
> Prerequisites for this patch:
> 1. HADOOP-19187 ABFS: [FnsOverBlob]Making AbfsClient Abstract for supporting
> both DFS and Blob Endpoint - ASF JIRA (apache.org)
> 2. HADOOP-19226 ABFS: [FnsOverBlob]Implementing Azure Rest APIs on Blob
> Endpoint for AbfsBlobClient - ASF JIRA (apache.org)
> 3. HADOOP-19207 ABFS: [FnsOverBlob]Response Handling of Blob Endpoint APIs
> and Metadata APIs - ASF JIRA (apache.org)
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]