[ https://issues.apache.org/jira/browse/HADOOP-19233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17913288#comment-17913288 ]

ASF GitHub Bot commented on HADOOP-19233:
-----------------------------------------

anmolanmol1234 commented on code in PR #7265:
URL: https://github.com/apache/hadoop/pull/7265#discussion_r1916525068


##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicity.java:
##########
@@ -0,0 +1,343 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+//import java.util.ArrayList;
+//import java.util.Collections;
+//import java.util.List;
+import java.util.Random;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
+import org.apache.hadoop.fs.azurebfs.contracts.services.BlobAppendRequestParameters;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+
+import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
+import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;
+import static org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.extractEtagHeader;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BLOCK_ID_LENGTH;
+//import static org.apache.hadoop.fs.azurebfs.services.AzureIngressHandler.generateBlockListXml;
+
+/**
+ * For a directory enabled for atomic rename, a file with the -RenamePending.json
+ * suffix is created before the rename starts. This file, created by the
+ * {@link #preRename()} method, records the state required for the rename
+ * operation. If the JVM process crashes mid-rename, atomicity is still
+ * maintained: when a job later calls {@link AzureBlobFileSystem#listStatus(Path)}
+ * or {@link AzureBlobFileSystem#getFileStatus(Path)}, the filesystem checks for
+ * a RenamePending JSON file and, if one exists, resumes the crashed rename
+ * operation as recorded in that file.
+ */
+public class RenameAtomicity {
+
+    private final TracingContext tracingContext;
+
+    private Path src, dst;
+
+    private String srcEtag;
+
+    private final AbfsBlobClient abfsClient;
+
+    private final Path renameJsonPath;
+
+    public static final String SUFFIX = "-RenamePending.json";
+
+    private int preRenameRetryCount = 0;
+
+    private int renamePendingJsonLen;
+
+    private final AbfsLease sourcePathLease;
+
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    private static final Random RANDOM = new Random();
+
+    /**
+     * Performs pre-rename operations. Creates a file with -RenamePending.json
+     * suffix in the source parent directory. This file contains the states
+     * required for the rename operation.
+     *
+     * @param src Source path
+     * @param dst Destination path
+     * @param renameJsonPath Path of the JSON file to be created
+     * @param tracingContext Tracing context
+     * @param srcEtag ETag of the source directory
+     * @param abfsClient AbfsClient instance
+     */
+    public RenameAtomicity(final Path src, final Path dst,
+                           final Path renameJsonPath,
+                           TracingContext tracingContext,
+                           final String srcEtag,
+                           final AbfsClient abfsClient) {
+        this.src = src;
+        this.dst = dst;
+        this.abfsClient = (AbfsBlobClient) abfsClient;
+        this.renameJsonPath = renameJsonPath;
+        this.tracingContext = tracingContext;
+        this.srcEtag = srcEtag;
+        this.sourcePathLease = null;
+    }
+
+    /**
+     * Resumes the rename operation from the JSON file.
+     *
+     * @param renameJsonPath Path of the JSON file
+     * @param renamePendingJsonFileLen Length of the JSON file
+     * @param tracingContext Tracing context
+     * @param srcEtag ETag of the source directory
+     * @param abfsClient AbfsClient instance
+     * @param sourceLease Lease of the source directory
+     */
+    public RenameAtomicity(final Path renameJsonPath,
+                           final int renamePendingJsonFileLen,
+                           TracingContext tracingContext,
+                           final String srcEtag,
+                           final AbfsClient abfsClient,
+                           final AbfsLease sourceLease) {
+        this.abfsClient = (AbfsBlobClient) abfsClient;
+        this.renameJsonPath = renameJsonPath;
+        this.tracingContext = tracingContext;
+        this.srcEtag = srcEtag;
+        this.renamePendingJsonLen = renamePendingJsonFileLen;
+        this.sourcePathLease = sourceLease;
+    }
+
+    /**
+     * Redo the rename operation from the JSON file.
+     *
+     * @throws AzureBlobFileSystemException If the redo operation fails.
+     */
+    public void redo() throws AzureBlobFileSystemException {
+        byte[] buffer = readRenamePendingJson(renameJsonPath, renamePendingJsonLen);
+        String contents = new String(buffer, Charset.defaultCharset());
+        try {
+            final RenamePendingJsonFormat renamePendingJsonFormatObj;
+            try {
+                renamePendingJsonFormatObj = objectMapper.readValue(contents,
+                        RenamePendingJsonFormat.class);
+            } catch (JsonProcessingException e) {
+                return;
+            }
+            if (renamePendingJsonFormatObj != null && StringUtils.isNotEmpty(
+                    renamePendingJsonFormatObj.getOldFolderName())
+                    && StringUtils.isNotEmpty(
+                    renamePendingJsonFormatObj.getNewFolderName())
+                    && StringUtils.isNotEmpty(renamePendingJsonFormatObj.getETag())) {
+                this.src = new Path(renamePendingJsonFormatObj.getOldFolderName());
+                this.dst = new Path(renamePendingJsonFormatObj.getNewFolderName());
+                this.srcEtag = renamePendingJsonFormatObj.getETag();
+
+                BlobRenameHandler blobRenameHandler = new BlobRenameHandler(
+                        this.src.toUri().getPath(), dst.toUri().getPath(),
+                        abfsClient, srcEtag, true, true,
+                        sourcePathLease, tracingContext);
+
+                blobRenameHandler.execute();
+            }
+        } finally {
+            deleteRenamePendingJson();
+        }
+    }
+
+    /** Read the JSON file.
+     *
+     * @param path Path of the JSON file
+     * @param len Length of the JSON file
+     * @return Contents of the JSON file
+     * @throws AzureBlobFileSystemException If the read operation fails.
+     */
+    @VisibleForTesting
+    byte[] readRenamePendingJson(Path path, int len)
+            throws AzureBlobFileSystemException {
+        byte[] bytes = new byte[len];
+        abfsClient.read(path.toUri().getPath(), 0, bytes, 0,
+                len, null, null, null,
+                tracingContext);
+        return bytes;
+    }
+
+    /** Generate a random block ID.
+     *
+     * @return Random block ID
+     */
+    public static String generateBlockId() {
+        // Random bytes of BLOCK_ID_LENGTH, Base64-encoded to form the block ID.
+        byte[] blockIdByteArray = new byte[BLOCK_ID_LENGTH];
+        RANDOM.nextBytes(blockIdByteArray);
+        return new String(Base64.encodeBase64(blockIdByteArray),
+                StandardCharsets.UTF_8);
+    }
+
+    /** Create the JSON file with the contents.
+     *
+     * @param path Path of the JSON file
+     * @param bytes Contents of the JSON file
+     * @throws AzureBlobFileSystemException If the create operation fails.
+     */
+    @VisibleForTesting
+    void createRenamePendingJson(Path path, byte[] bytes)
+            throws AzureBlobFileSystemException {
+        // PutBlob on the path.
+        AbfsRestOperation putBlobOp = abfsClient.createPath(path.toUri().getPath(),
+                true,
+                true,
+                null,
+                false,
+                null,
+                null,
+                tracingContext);
+        String eTag = extractEtagHeader(putBlobOp.getResult());
+
+        String blockId = generateBlockId();
+        AppendRequestParameters appendRequestParameters
+                = new AppendRequestParameters(0, 0,
+                bytes.length, AppendRequestParameters.Mode.APPEND_MODE, false, null,
+                abfsClient.getAbfsConfiguration().isExpectHeaderEnabled(),
+                new BlobAppendRequestParameters(blockId, eTag));
+
+        abfsClient.append(path.toUri().getPath(), bytes,
+                appendRequestParameters, null, null, tracingContext);
+
+//        List<String> blockIdList = new ArrayList<>(Collections.singleton(blockId));
+//        String blockList = generateBlockListXml(blockIdList);
+        // PutBlockList on the path.
+        String blockList = "";

Review Comment:
   If flush is called with an empty block-list string, how does the blockId come into use?
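
   For context, the commented-out lines in the diff hint at the intended
   commit path. A minimal, hypothetical sketch follows; the flush(...)
   overload and the generateBlockListXml(...) signature shown here are
   assumptions, not the actual AbfsBlobClient API:

       // Name the staged block in a PutBlockList body, then flush it.
       // Assumes the commented-out imports above (List, ArrayList,
       // Collections, generateBlockListXml) are restored.
       List<String> blockIdList = new ArrayList<>(Collections.singleton(blockId));
       String blockList = generateBlockListXml(blockIdList);
       abfsClient.flush(blockList.getBytes(StandardCharsets.UTF_8),
               path.toUri().getPath(), true /* isClose */, null /* sasToken */,
               null /* leaseId */, eTag, tracingContext);

   If the flushed block-list string is empty, the PutBlockList body would
   name no blocks, so the staged blockId would seem to be dropped rather
   than committed.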





> ABFS: [FnsOverBlob] Implementing Rename and Delete APIs over Blob Endpoint
> --------------------------------------------------------------------------
>
>                 Key: HADOOP-19233
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19233
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/azure
>    Affects Versions: 3.4.0
>            Reporter: Anuj Modi
>            Assignee: Manish Bhatt
>            Priority: Major
>              Labels: pull-request-available
>
> Currently, we only support rename and delete operations on the DFS endpoint. 
> The reason for supporting rename and delete operations on the Blob endpoint 
> is that the Blob endpoint does not account for hierarchy. We need to ensure 
> that the HDFS contracts are maintained when performing rename and delete 
> operations. Renaming or deleting a directory over the Blob endpoint requires 
> the client to handle the orchestration and rename or delete all the blobs 
> within the specified directory.
>  
> The task outlines the considerations for implementing rename and delete 
> operations for the FNS-blob endpoint to ensure compatibility with HDFS 
> contracts.
>  * {*}Blob Endpoint Usage{*}: The task addresses the need for abstraction in 
> the code to maintain HDFS contracts while performing rename and delete 
> operations on the blob endpoint, which does not support hierarchy.
>  * {*}Rename Operations{*}: The {{AzureBlobFileSystem#rename()}} method will 
> use a {{RenameHandler}} instance to handle rename operations, with separate 
> handlers for the DFS and blob endpoints. This method includes prechecks, 
> destination adjustments, and orchestration of directory renaming for blobs.
>  * {*}Atomic Rename{*}: Atomic renaming is essential for blob endpoints, as 
> it requires orchestration to copy or delete each blob within the directory. A 
> configuration will allow developers to specify directories for atomic 
> renaming, with a JSON file to track the status of renames.
>  * {*}Delete Operations{*}: Delete operations are simpler than renames, 
> requiring fewer HDFS contract checks. For blob endpoints, the client must 
> handle orchestration, including managing orphaned directories created by 
> Az-copy.
>  * {*}Orchestration for Rename/Delete{*}: Orchestration for rename and delete 
> operations over blob endpoints involves listing blobs and performing actions 
> on each blob. The process must be optimized to handle large numbers of blobs 
> efficiently.
>  * {*}Need for Optimization{*}: Optimization is crucial because the 
> {{ListBlob}} API can return a maximum of 5000 blobs at once, necessitating 
> multiple calls for large directories. The task proposes a producer-consumer 
> model to handle blobs in parallel, thereby reducing processing time and 
> memory usage.
>  * {*}Producer-Consumer Design{*}: The proposed design includes a producer to 
> list blobs, a queue to store the blobs, and a consumer to process them in 
> parallel. This approach aims to improve efficiency and mitigate memory issues.
> More details will follow.
> Prerequisites for this Patch:
> 1. HADOOP-19187 ABFS: [FnsOverBlob] Making AbfsClient Abstract for supporting both DFS and Blob Endpoint
> 2. HADOOP-19226 ABFS: [FnsOverBlob] Implementing Azure Rest APIs on Blob Endpoint for AbfsBlobClient
> 3. HADOOP-19207 ABFS: [FnsOverBlob] Response Handling of Blob Endpoint APIs and Metadata APIs
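
For illustration, the -RenamePending.json contents implied by the patch's
RenamePendingJsonFormat would look roughly like this (property names are
guessed from the getters; the actual Jackson mapping may differ):

    {"OldFolderName":"/atomic/dir","NewFolderName":"/atomic/dir-renamed","ETag":"0x8D..."}

And a minimal sketch of the producer-consumer listing model described in the
issue, where one producer pages through ListBlobs (at most 5000 entries per
call) while a pool of consumers acts on each blob in parallel. All names here
are hypothetical, not the patch's actual API:

    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    // Hypothetical interfaces standing in for ListBlobs pagination and the
    // per-blob rename/delete action.
    interface BlobPage { List<String> blobNames(); String nextMarker(); }
    interface BlobLister { BlobPage list(String marker); }
    interface BlobAction { void apply(String blobName) throws Exception; }

    final class ListingPipeline {
        private static final String DONE = ""; // poison pill; real blob names are non-empty

        static void run(BlobLister lister, BlobAction action, int consumers)
                throws InterruptedException {
            // Bounded queue keeps memory flat even for very large directories.
            BlockingQueue<String> queue = new LinkedBlockingQueue<>(10_000);
            ExecutorService pool = Executors.newFixedThreadPool(consumers);
            for (int i = 0; i < consumers; i++) {
                pool.execute(() -> {
                    try {
                        for (String blob = queue.take(); !DONE.equals(blob);
                                blob = queue.take()) {
                            action.apply(blob); // rename or delete one blob
                        }
                    } catch (Exception e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // Producer: page through ListBlobs until no continuation marker remains.
            String marker = null;
            do {
                BlobPage page = lister.list(marker);
                for (String name : page.blobNames()) {
                    queue.put(name); // blocks when the queue is full
                }
                marker = page.nextMarker();
            } while (marker != null);
            for (int i = 0; i < consumers; i++) {
                queue.put(DONE); // one poison pill per consumer
            }
            pool.shutdown();
            pool.awaitTermination(1, TimeUnit.HOURS);
        }
    }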



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
