[ 
https://issues.apache.org/jira/browse/HADOOP-19232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17922145#comment-17922145
 ] 

ASF GitHub Bot commented on HADOOP-19232:
-----------------------------------------

rakeshadr commented on code in PR #7272:
URL: https://github.com/apache/hadoop/pull/7272#discussion_r1934191424


##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java:
##########
@@ -237,8 +427,12 @@ public synchronized void write(final byte[] data, final 
int off, final int lengt
     if (hasLease() && isLeaseFreed()) {
       throw new PathIOException(path, ERR_WRITE_WITHOUT_LEASE);
     }
-    DataBlocks.DataBlock block = createBlockIfNeeded();
-    int written = block.write(data, off, length);
+    if (length == 0) {
+      return;

Review Comment:
   Please add a log message conveying this condition



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java:
##########
@@ -338,9 +531,14 @@ private void uploadBlockAsync(DataBlocks.DataBlock 
blockToUpload,
              */
             AppendRequestParameters reqParams = new AppendRequestParameters(
                 offset, 0, bytesLength, mode, false, leaseId, 
isExpectHeaderEnabled);
-            AbfsRestOperation op = getClient().append(path,
-                blockUploadData.toByteArray(), reqParams, cachedSasToken.get(),
-                contextEncryptionAdapter, new TracingContext(tracingContext));
+            AbfsRestOperation op;
+            try {
+              op = remoteWrite(blockToUpload, blockUploadData, reqParams, 
tracingContext);
+            } catch (InvalidIngressServiceException ex) {
+              switchHandler();

Review Comment:
   Whether this is deterministic switching ? I meant, in case of exception what 
will be the behavior of the new handler?



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java:
##########
@@ -328,7 +337,7 @@ public AbfsRestOperation listPath(final String 
relativePath, final boolean recur
       abfsUriQueryBuilder.addQuery(QUERY_PARAM_DELIMITER, FORWARD_SLASH);
     }
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_MAX_RESULTS, 
String.valueOf(listMaxResults));
-    appendSASTokenToQuery(relativePath, SASTokenProvider.LIST_OPERATION, 
abfsUriQueryBuilder);
+    appendSASTokenToQuery(relativePath, 
SASTokenProvider.FIXED_SAS_STORE_OPERATION, abfsUriQueryBuilder);

Review Comment:
   Hope backward compatibility being taken care?



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java:
##########
@@ -631,45 +805,89 @@ private synchronized void waitForAppendsToComplete() 
throws IOException {
     }
   }
 
+  /**
+   * Flushes the written bytes to the Azure Blob Storage service, ensuring all
+   * appends are completed. This method is typically called during a close 
operation.
+   *
+   * @param isClose indicates whether this flush is happening as part of a 
close operation.
+   * @throws IOException if an I/O error occurs during the flush operation.
+   */
   private synchronized void flushWrittenBytesToService(boolean isClose) throws 
IOException {
+    // Ensure all appends are completed before flushing.
     waitForAppendsToComplete();
+    // Flush the written bytes to the service.
     flushWrittenBytesToServiceInternal(position, false, isClose);
   }
 
+  /**
+   * Asynchronously flushes the written bytes to the Azure Blob Storage 
service.
+   * This method ensures that the write operation queue is managed and only 
flushes
+   * if there are uncommitted data beyond the last flush offset.
+   *
+   * @throws IOException if an I/O error occurs during the flush operation.
+   */
   private synchronized void flushWrittenBytesToServiceAsync() throws 
IOException {
+    // Manage the write operation queue to ensure efficient writes
     shrinkWriteOperationQueue();
 
+    // Only flush if there are uncommitted data beyond the last flush offset
     if (this.lastTotalAppendOffset > this.lastFlushOffset) {
       this.flushWrittenBytesToServiceInternal(this.lastTotalAppendOffset, true,
         false/*Async flush on close not permitted*/);
     }
   }
 
+  /**
+   * Flushes the written bytes to the Azure Blob Storage service.
+   *
+   * @param offset                the offset up to which data needs to be 
flushed.
+   * @param retainUncommitedData whether to retain uncommitted data after 
flush.
+   * @param isClose               whether this flush is happening as part of a 
close operation.
+   * @throws IOException if an I/O error occurs.
+   */
   private synchronized void flushWrittenBytesToServiceInternal(final long 
offset,
       final boolean retainUncommitedData, final boolean isClose) throws 
IOException {
     // flush is called for appendblob only on close
     if (this.isAppendBlob && !isClose) {
       return;
     }
 
+    // Tracker to monitor performance metrics
     AbfsPerfTracker tracker = client.getAbfsPerfTracker();
     try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
             "flushWrittenBytesToServiceInternal", "flush")) {
-      AbfsRestOperation op = getClient().flush(path, offset, 
retainUncommitedData,
-          isClose, cachedSasToken.get(), leaseId, contextEncryptionAdapter,
-          new TracingContext(tracingContext));
-      cachedSasToken.update(op.getSasToken());
-      perfInfo.registerResult(op.getResult()).registerSuccess(true);
-    } catch (AzureBlobFileSystemException ex) {
-      if (ex instanceof AbfsRestOperationException) {
-        if (((AbfsRestOperationException) ex).getStatusCode() == 
HttpURLConnection.HTTP_NOT_FOUND) {
+      AbfsRestOperation op;
+      try {
+        // Attempt to flush data to the remote service.
+        op = remoteFlush(offset, retainUncommitedData, isClose, leaseId,
+            tracingContext);
+      } catch (InvalidIngressServiceException ex) {
+        // If an invalid ingress service is encountered, switch handler and 
retry.
+        switchHandler();

Review Comment:
   The above 'switchHandler' comment is applicable here as well.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java:
##########
@@ -328,7 +337,7 @@ public AbfsRestOperation listPath(final String 
relativePath, final boolean recur
       abfsUriQueryBuilder.addQuery(QUERY_PARAM_DELIMITER, FORWARD_SLASH);
     }
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_MAX_RESULTS, 
String.valueOf(listMaxResults));
-    appendSASTokenToQuery(relativePath, SASTokenProvider.LIST_OPERATION, 
abfsUriQueryBuilder);
+    appendSASTokenToQuery(relativePath, 
SASTokenProvider.FIXED_SAS_STORE_OPERATION, abfsUriQueryBuilder);

Review Comment:
   Whats the plan for user delegation SAS, any jira created for this? If not 
please create a follow-up jira.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java:
##########
@@ -338,9 +531,14 @@ private void uploadBlockAsync(DataBlocks.DataBlock 
blockToUpload,
              */
             AppendRequestParameters reqParams = new AppendRequestParameters(
                 offset, 0, bytesLength, mode, false, leaseId, 
isExpectHeaderEnabled);
-            AbfsRestOperation op = getClient().append(path,
-                blockUploadData.toByteArray(), reqParams, cachedSasToken.get(),
-                contextEncryptionAdapter, new TracingContext(tracingContext));
+            AbfsRestOperation op;
+            try {
+              op = remoteWrite(blockToUpload, blockUploadData, reqParams, 
tracingContext);
+            } catch (InvalidIngressServiceException ex) {
+              switchHandler();

Review Comment:
   Also, please add debug log message in conditional execution cases.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobBlockManager.java:
##########
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.apache.hadoop.fs.store.DataBlocks;
+
+/**
+ * Manages Azure Blob blocks for append operations.
+ */
+public class AzureBlobBlockManager extends AzureBlockManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      AbfsOutputStream.class);
+
+
+  /** The list of already committed blocks is stored in this list. */
+  private List<String> committedBlockEntries = new ArrayList<>();
+
+  /** The list to store blockId, position, and status. */
+  private final LinkedList<BlockEntry> blockEntryList = new LinkedList<>();
+
+
+  /**
+   * Constructs an AzureBlobBlockManager.
+   *
+   * @param abfsOutputStream the output stream
+   * @param blockFactory the block factory
+   * @param bufferSize the buffer size
+   * @throws AzureBlobFileSystemException if an error occurs
+   */
+  public AzureBlobBlockManager(AbfsOutputStream abfsOutputStream,
+      DataBlocks.BlockFactory blockFactory,
+      int bufferSize)
+      throws AzureBlobFileSystemException {
+    super(abfsOutputStream, blockFactory, bufferSize);
+    if (abfsOutputStream.getPosition() > 0 && 
!abfsOutputStream.isAppendBlob()) {
+      this.committedBlockEntries = 
getBlockList(abfsOutputStream.getTracingContext());
+    }
+    LOG.debug("Created a new Blob Block Manager for AbfsOutputStream instance 
{} for path {}",
+        abfsOutputStream.getStreamID(), abfsOutputStream.getPath());
+  }
+
+  /**
+   * Creates a new block.
+   *
+   * @param position the position
+   * @return the created block
+   * @throws IOException if an I/O error occurs
+   */
+  @Override
+  protected synchronized AbfsBlock createBlockInternal(long position)
+      throws IOException {
+    if (getActiveBlock() == null) {
+      setBlockCount(getBlockCount() + 1);
+      AbfsBlock activeBlock = new AbfsBlobBlock(getAbfsOutputStream(), 
position);
+      activeBlock.setBlockEntry(addNewEntry(activeBlock.getBlockId(), 
activeBlock.getOffset()));
+      setActiveBlock(activeBlock);
+    }
+    return getActiveBlock();
+  }
+
+  /**
+   * Returns block id's which are committed for the blob.
+   *
+   * @param tracingContext Tracing context object.
+   * @return list of committed block id's.
+   * @throws AzureBlobFileSystemException if an error occurs
+   */
+  private List<String> getBlockList(TracingContext tracingContext)
+      throws AzureBlobFileSystemException {
+    List<String> committedBlockIdList = new ArrayList<>();
+    AbfsBlobClient blobClient = 
getAbfsOutputStream().getClientHandler().getBlobClient();
+    final AbfsRestOperation op = blobClient
+        .getBlockList(getAbfsOutputStream().getPath(), tracingContext);
+    if (op != null && op.getResult() != null) {
+      committedBlockIdList = op.getResult().getBlockIdList();
+    }
+    return committedBlockIdList;
+  }
+
+  /**
+   * Adds a new block entry to the block entry list.
+   * The block entry is added only if the position of the new block
+   * is greater than the position of the last block in the list.
+   *
+   * @param blockId The ID of the new block to be added.
+   * @param position The position of the new block in the stream.
+   * @return The newly added {@link BlockEntry}.
+   * @throws IOException If the position of the new block is not greater than 
the last block in the list.
+   */
+  private synchronized BlockEntry addNewEntry(String blockId, long position) 
throws IOException {
+    if (!blockEntryList.isEmpty()) {
+      BlockEntry lastEntry = blockEntryList.getLast();
+      if (position <= lastEntry.getPosition()) {
+        throw new IOException("New block position " + position  + " must be 
greater than the last block position "
+            + lastEntry.getPosition() + " for path " + 
getAbfsOutputStream().getPath());
+      }
+    }
+    BlockEntry blockEntry = new BlockEntry(blockId, position, 
AbfsBlockStatus.NEW);
+    blockEntryList.addLast(blockEntry);
+    LOG.debug("Added block {} at position {} with status NEW.", blockId, 
position);
+    return blockEntry;
+  }
+
+  /**
+   * Updates the status of an existing block entry to SUCCESS.
+   * This method is used to mark a block as successfully processed.
+   *
+   * @param block The {@link AbfsBlock} whose status needs to be updated to 
SUCCESS.
+   */
+  protected synchronized void updateEntry(AbfsBlock block) {
+    BlockEntry blockEntry = block.getBlockEntry();
+    blockEntry.setStatus(AbfsBlockStatus.SUCCESS);
+    LOG.debug("Added block {} at position {} with status SUCCESS.", 
block.getBlockId(), blockEntry.getPosition());
+  }
+
+  /**
+   * Prepares the list of blocks to commit.
+   *
+   * @return whether we have some data to commit or not.
+   * @throws IOException if an I/O error occurs
+   */
+  protected boolean hasListToCommit() throws IOException {
+    // Adds all the committed blocks if available to the list of blocks to be 
added in putBlockList.
+    if (blockEntryList.isEmpty()) {
+      return false; // No entries to commit
+    }
+    while (!blockEntryList.isEmpty()) {
+      BlockEntry current = blockEntryList.poll();
+      if (current.getStatus() != AbfsBlockStatus.SUCCESS) {
+        LOG.debug(
+            "Block {} with position {} has status {}, flush cannot proceed.",
+            current.getBlockId(), current.getPosition(), current.getStatus());
+        throw new IOException("Flush failed. Block " + current.getBlockId()
+            + " with position " + current.getPosition() + " has status "
+            + current.getStatus() + "for path " + 
getAbfsOutputStream().getPath());
+      }
+      if (!blockEntryList.isEmpty()) {
+        BlockEntry next = blockEntryList.getFirst();
+        if (current.getPosition() >= next.getPosition()) {
+          String errorMessage =
+              "Position check failed. Current block position is greater than 
or equal to the next block's position.\n"
+                  + "Current Block Entry:\n"
+                  + "Block ID: " + current.getBlockId()
+                  + ", Position: " + current.getPosition()
+                  + ", Status: " + current.getStatus()
+                  + ", Path: " + getAbfsOutputStream().getPath()
+                  + ", StreamID: " + getAbfsOutputStream().getStreamID()
+                  + ", Next block position: " + next.getPosition()
+                  + "\n";
+          throw new IOException(errorMessage);
+        }
+      }
+      committedBlockEntries.add(current.getBlockId());

Review Comment:
   Please create a follow-up PR. Probably you can group all the open comments 
and handle it in that PR.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobBlockManager.java:
##########
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.apache.hadoop.fs.store.DataBlocks;
+
+/**
+ * Manages Azure Blob blocks for append operations.
+ */
+public class AzureBlobBlockManager extends AzureBlockManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      AbfsOutputStream.class);
+
+
+  /** The list of already committed blocks is stored in this list. */
+  private List<String> committedBlockEntries = new ArrayList<>();
+
+  /** The list to store blockId, position, and status. */
+  private final LinkedList<BlockEntry> blockEntryList = new LinkedList<>();
+
+
+  /**
+   * Constructs an AzureBlobBlockManager.
+   *
+   * @param abfsOutputStream the output stream
+   * @param blockFactory the block factory
+   * @param bufferSize the buffer size
+   * @throws AzureBlobFileSystemException if an error occurs
+   */
+  public AzureBlobBlockManager(AbfsOutputStream abfsOutputStream,
+      DataBlocks.BlockFactory blockFactory,
+      int bufferSize)
+      throws AzureBlobFileSystemException {
+    super(abfsOutputStream, blockFactory, bufferSize);
+    if (abfsOutputStream.getPosition() > 0 && 
!abfsOutputStream.isAppendBlob()) {
+      this.committedBlockEntries = 
getBlockList(abfsOutputStream.getTracingContext());
+    }
+    LOG.debug("Created a new Blob Block Manager for AbfsOutputStream instance 
{} for path {}",
+        abfsOutputStream.getStreamID(), abfsOutputStream.getPath());
+  }
+
+  /**
+   * Creates a new block.
+   *
+   * @param position the position
+   * @return the created block
+   * @throws IOException if an I/O error occurs
+   */
+  @Override
+  protected synchronized AbfsBlock createBlockInternal(long position)
+      throws IOException {
+    if (getActiveBlock() == null) {
+      setBlockCount(getBlockCount() + 1);
+      AbfsBlock activeBlock = new AbfsBlobBlock(getAbfsOutputStream(), 
position);
+      activeBlock.setBlockEntry(addNewEntry(activeBlock.getBlockId(), 
activeBlock.getOffset()));
+      setActiveBlock(activeBlock);
+    }
+    return getActiveBlock();
+  }
+
+  /**
+   * Returns block id's which are committed for the blob.
+   *
+   * @param tracingContext Tracing context object.
+   * @return list of committed block id's.
+   * @throws AzureBlobFileSystemException if an error occurs
+   */
+  private List<String> getBlockList(TracingContext tracingContext)
+      throws AzureBlobFileSystemException {
+    List<String> committedBlockIdList = new ArrayList<>();
+    AbfsBlobClient blobClient = 
getAbfsOutputStream().getClientHandler().getBlobClient();
+    final AbfsRestOperation op = blobClient
+        .getBlockList(getAbfsOutputStream().getPath(), tracingContext);
+    if (op != null && op.getResult() != null) {
+      committedBlockIdList = op.getResult().getBlockIdList();
+    }
+    return committedBlockIdList;
+  }
+
+  /**
+   * Adds a new block entry to the block entry list.
+   * The block entry is added only if the position of the new block
+   * is greater than the position of the last block in the list.
+   *
+   * @param blockId The ID of the new block to be added.
+   * @param position The position of the new block in the stream.
+   * @return The newly added {@link BlockEntry}.
+   * @throws IOException If the position of the new block is not greater than 
the last block in the list.
+   */
+  private synchronized BlockEntry addNewEntry(String blockId, long position) 
throws IOException {
+    if (!blockEntryList.isEmpty()) {
+      BlockEntry lastEntry = blockEntryList.getLast();
+      if (position <= lastEntry.getPosition()) {
+        throw new IOException("New block position " + position  + " must be 
greater than the last block position "
+            + lastEntry.getPosition() + " for path " + 
getAbfsOutputStream().getPath());
+      }
+    }
+    BlockEntry blockEntry = new BlockEntry(blockId, position, 
AbfsBlockStatus.NEW);
+    blockEntryList.addLast(blockEntry);
+    LOG.debug("Added block {} at position {} with status NEW.", blockId, 
position);
+    return blockEntry;
+  }
+
+  /**
+   * Updates the status of an existing block entry to SUCCESS.
+   * This method is used to mark a block as successfully processed.
+   *
+   * @param block The {@link AbfsBlock} whose status needs to be updated to 
SUCCESS.
+   */
+  protected synchronized void updateEntry(AbfsBlock block) {
+    BlockEntry blockEntry = block.getBlockEntry();
+    blockEntry.setStatus(AbfsBlockStatus.SUCCESS);
+    LOG.debug("Added block {} at position {} with status SUCCESS.", 
block.getBlockId(), blockEntry.getPosition());
+  }
+
+  /**
+   * Prepares the list of blocks to commit.
+   *
+   * @return whether we have some data to commit or not.
+   * @throws IOException if an I/O error occurs
+   */
+  protected boolean hasListToCommit() throws IOException {

Review Comment:
   since its protected, it can be overridden in future and can break this 
assumption unknowingly. Since "synchronized" reentrant by nature, its better to 
make this method also "synchronized"





> ABFS: [FnsOverBlob] Implementing Ingress Support with various Fallback 
> Handling
> -------------------------------------------------------------------------------
>
>                 Key: HADOOP-19232
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19232
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/azure
>    Affects Versions: 3.4.0
>            Reporter: Anuj Modi
>            Assignee: Anmol Asrani
>            Priority: Major
>              Labels: pull-request-available
>
> Scope of this task is to refactor the AbfsOutputStream class to handle the 
> ingress for DFS and Blob endpoint effectively.
> More details will be added soon.
> Perquisites for this Patch:
> 1. [HADOOP-19187] ABFS: [FnsOverBlob]Making AbfsClient Abstract for 
> supporting both DFS and Blob Endpoint - ASF JIRA (apache.org)
> 2. [HADOOP-19226] ABFS: [FnsOverBlob]Implementing Azure Rest APIs on Blob 
> Endpoint for AbfsBlobClient - ASF JIRA (apache.org)
> 3. [HADOOP-19207] ABFS: [FnsOverBlob]Response Handling of Blob Endpoint APIs 
> and Metadata APIs - ASF JIRA (apache.org)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to