[ 
https://issues.apache.org/jira/browse/HADOOP-19232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17921203#comment-17921203
 ] 

ASF GitHub Bot commented on HADOOP-19232:
-----------------------------------------

rakeshadr commented on code in PR #7272:
URL: https://github.com/apache/hadoop/pull/7272#discussion_r1928399881


##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java:
##########
@@ -145,6 +145,11 @@ public final class FileSystemConfigurations {
    */
   public static final int BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT = 20;
 
+  /**
+   * Length of the block ID used for appends.

Review Comment:
   Whats the reason for choosing the value 60, looks to me a magic number. Why 
not provided config option for this param?



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobBlock.java:
##########
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.commons.codec.binary.Base64;
+
+import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BLOCK_ID_LENGTH;
+
+public class AbfsBlobBlock extends AbfsBlock {

Review Comment:
   Add class-level javadoc



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java:
##########
@@ -611,6 +818,54 @@ public AbfsRestOperation append(final String path,
     return op;
   }
 
+  /**
+   * Appends a block to an append blob.
+   * <a href="../../../../site/markdown/blobEndpoint.md#append-block">Append 
Block</a>.
+   *
+   * @param path the path of the append blob.
+   * @param requestParameters the parameters for the append request.
+   * @param data the data to be appended.
+   * @param tracingContext the tracing context.
+   * @return the executed rest operation containing the response from the 
server.
+   * @throws AzureBlobFileSystemException if the rest operation fails.
+   */
+  public AbfsRestOperation appendBlock(final String path,
+      AppendRequestParameters requestParameters,
+      final byte[] data,
+      final TracingContext tracingContext) throws AzureBlobFileSystemException 
{
+    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+    requestHeaders.add(new AbfsHttpHeader(CONTENT_LENGTH, 
String.valueOf(data.length)));
+    requestHeaders.add(new AbfsHttpHeader(X_MS_BLOB_TYPE, APPEND_BLOB_TYPE));
+    if (requestParameters.getLeaseId() != null) {
+      requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, 
requestParameters.getLeaseId()));
+    }
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = 
createDefaultUriQueryBuilder();
+    abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, APPEND_BLOCK);
+    String sasTokenForReuse = appendSASTokenToQuery(path, 
SASTokenProvider.FIXED_SAS_STORE_OPERATION, abfsUriQueryBuilder);
+
+    final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+    final AbfsRestOperation op = getAbfsRestOperation(
+        AbfsRestOperationType.AppendBlock,
+        HTTP_METHOD_PUT,
+        url,
+        requestHeaders,
+        data,
+        requestParameters.getoffset(),
+        requestParameters.getLength(),
+        sasTokenForReuse);
+
+    try {
+      op.execute(tracingContext);
+    } catch (AzureBlobFileSystemException ex) {
+      // If we have no HTTP response, throw the original exception.
+      if (!op.hasResult()) {
+        throw ex;

Review Comment:
   Please add debug log message for better troubleshooting.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java:
##########
@@ -377,8 +390,199 @@ public AbfsRestOperation createPath(final String path,
       final String eTag,
       final ContextEncryptionAdapter contextEncryptionAdapter,
       final TracingContext tracingContext) throws AzureBlobFileSystemException 
{
-    // TODO: [FnsOverBlob][HADOOP-19232] To be implemented as part of ingress 
support.
-    throw new NotImplementedException("Create Path operation on Blob endpoint 
yet to be implemented.");
+    return createPath(path, isFile, overwrite, permissions, isAppendBlob, eTag,
+        contextEncryptionAdapter, tracingContext, false);
+  }
+
+  /**
+   * Get Rest Operation for API
+   * <a 
href="https://learn.microsoft.com/en-us/rest/api/storageservices/put-blob";>Put 
Blob</a>.
+   * Creates a file or directory (marker file) at the specified path.
+   *
+   * @param path the path of the directory to be created.
+   * @param isFile whether the path is a file.
+   * @param overwrite whether to overwrite if the path already exists.
+   * @param permissions the permissions to set on the path.
+   * @param isAppendBlob whether the path is an append blob.
+   * @param eTag the eTag of the path.
+   * @param contextEncryptionAdapter the context encryption adapter.
+   * @param tracingContext the tracing context.
+   * @param isCreateCalledFromMarkers whether the create is called from 
markers.
+   * @return the executed rest operation containing the response from the 
server.
+   * @throws AzureBlobFileSystemException if the rest operation fails.
+   */
+  public AbfsRestOperation createPath(final String path,
+      final boolean isFile,
+      final boolean overwrite,
+      final AzureBlobFileSystemStore.Permissions permissions,
+      final boolean isAppendBlob,
+      final String eTag,
+      final ContextEncryptionAdapter contextEncryptionAdapter,
+      final TracingContext tracingContext,
+      boolean isCreateCalledFromMarkers) throws AzureBlobFileSystemException {
+    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+    if (!getIsNamespaceEnabled() && !isCreateCalledFromMarkers) {
+      AbfsHttpOperation op1Result = null;
+      try {
+        op1Result = getPathStatus(path, tracingContext,
+            null, true).getResult();
+      } catch (AbfsRestOperationException ex) {
+        if (ex.getStatusCode() == HTTP_NOT_FOUND) {
+          LOG.debug("No directory/path found: {}", path);
+        } else {
+          throw ex;
+        }
+      }
+      if (op1Result != null) {
+        boolean isDir = checkIsDir(op1Result);
+        if (isFile == isDir) {
+          throw new AbfsRestOperationException(HTTP_CONFLICT,
+              AzureServiceErrorCode.PATH_CONFLICT.getErrorCode(),
+              PATH_EXISTS,
+              null);
+        }
+      }
+      Path parentPath = new Path(path).getParent();
+      if (parentPath != null && !parentPath.isRoot()) {
+        createMarkers(parentPath, overwrite, permissions, isAppendBlob, eTag,
+            contextEncryptionAdapter, tracingContext);
+      }
+    }
+    if (isFile) {
+      addEncryptionKeyRequestHeaders(path, requestHeaders, true,
+          contextEncryptionAdapter, tracingContext);
+    } else {
+      requestHeaders.add(new AbfsHttpHeader(X_MS_META_HDI_ISFOLDER, TRUE));
+    }
+    requestHeaders.add(new AbfsHttpHeader(CONTENT_LENGTH, ZERO));
+    if (isAppendBlob) {
+      requestHeaders.add(new AbfsHttpHeader(X_MS_BLOB_TYPE, APPEND_BLOB_TYPE));
+    } else {
+      requestHeaders.add(new AbfsHttpHeader(X_MS_BLOB_TYPE, BLOCK_BLOB_TYPE));
+    }
+    if (!overwrite) {
+      requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, 
AbfsHttpConstants.STAR));
+    }
+    if (eTag != null && !eTag.isEmpty()) {
+      requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.IF_MATCH, 
eTag));
+    }
+
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = 
createDefaultUriQueryBuilder();
+    appendSASTokenToQuery(path, SASTokenProvider.FIXED_SAS_STORE_OPERATION, 
abfsUriQueryBuilder);
+
+    final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+    final AbfsRestOperation op = getAbfsRestOperation(
+        AbfsRestOperationType.PutBlob,
+        HTTP_METHOD_PUT, url, requestHeaders);
+    try {
+      op.execute(tracingContext);
+    } catch (AzureBlobFileSystemException ex) {
+      // If we have no HTTP response, throw the original exception.
+      if (!op.hasResult()) {
+        throw ex;
+      }
+      if (!isFile && op.getResult().getStatusCode() == HTTP_CONFLICT) {
+        // This ensures that we don't throw ex only for existing directory but 
if a blob exists we throw exception.
+        AbfsHttpOperation opResult = null;
+        try {
+          opResult = this.getPathStatus(path, true, tracingContext, 
null).getResult();
+        } catch (AbfsRestOperationException e) {
+          if (opResult != null) {
+            throw e;
+          }
+        }
+        if (opResult != null && checkIsDir(opResult)) {
+          return op;
+        }
+      }
+      throw ex;
+    }
+    return op;
+  }
+
+  /**
+   *  Creates marker blobs for the parent directories of the specified path.
+   *
+   * @param path The path for which parent directories need to be created.
+   * @param overwrite A flag indicating whether existing directories should be 
overwritten.
+   * @param permissions The permissions to be set for the created directories.
+   * @param isAppendBlob A flag indicating whether the created blob should be 
of type APPEND_BLOB.
+   * @param eTag The eTag to be matched for conditional requests.
+   * @param contextEncryptionAdapter The encryption adapter for context 
encryption.
+   * @param tracingContext The tracing context for the operation.
+   * @throws AzureBlobFileSystemException If the creation of any parent 
directory fails.
+   */
+  private void createMarkers(final Path path,
+      final boolean overwrite,
+      final AzureBlobFileSystemStore.Permissions permissions,
+      final boolean isAppendBlob,
+      final String eTag,
+      final ContextEncryptionAdapter contextEncryptionAdapter,
+      final TracingContext tracingContext) throws AzureBlobFileSystemException 
{
+    ArrayList<Path> keysToCreateAsFolder = new ArrayList<>();
+    checkParentChainForFile(path, tracingContext,
+        keysToCreateAsFolder);
+    for (Path pathToCreate : keysToCreateAsFolder) {
+      createPath(pathToCreate.toUri().getPath(), false, overwrite, permissions,
+          isAppendBlob, eTag, contextEncryptionAdapter, tracingContext, true);
+    }
+  }
+
+  /**
+   * Checks for the entire parent hierarchy and returns if any directory 
exists and
+   * throws an exception if any file exists.
+   * @param path path to check the hierarchy for.
+   * @param tracingContext the tracingcontext.
+   */
+  private void checkParentChainForFile(Path path, TracingContext 
tracingContext,
+      List<Path> keysToCreateAsFolder) throws AzureBlobFileSystemException {
+    AbfsHttpOperation opResult = null;
+    try {
+      opResult = getPathStatus(path.toUri().getPath(),
+          tracingContext, null, false).getResult();
+    } catch (AbfsRestOperationException ex) {
+      if (ex.getStatusCode() == HTTP_NOT_FOUND) {
+        LOG.debug("No explicit directory/path found: {}", path);
+      } else {
+        throw ex;

Review Comment:
   Here also, Please add debug log message for better troubleshooting.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobBlockManager.java:
##########
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.apache.hadoop.fs.store.DataBlocks;
+
+/**
+ * Manages Azure Blob blocks for append operations.
+ */
+public class AzureBlobBlockManager extends AzureBlockManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      AbfsOutputStream.class);
+
+
+  /** The list of already committed blocks is stored in this list. */
+  private List<String> committedBlockEntries = new ArrayList<>();
+
+  /** The list to store blockId, position, and status. */
+  private final LinkedList<BlockEntry> blockEntryList = new LinkedList<>();
+
+
+  /**
+   * Constructs an AzureBlobBlockManager.
+   *
+   * @param abfsOutputStream the output stream
+   * @param blockFactory the block factory
+   * @param bufferSize the buffer size
+   * @throws AzureBlobFileSystemException if an error occurs
+   */
+  public AzureBlobBlockManager(AbfsOutputStream abfsOutputStream,
+      DataBlocks.BlockFactory blockFactory,
+      int bufferSize)
+      throws AzureBlobFileSystemException {
+    super(abfsOutputStream, blockFactory, bufferSize);
+    if (abfsOutputStream.getPosition() > 0 && 
!abfsOutputStream.isAppendBlob()) {
+      this.committedBlockEntries = 
getBlockList(abfsOutputStream.getTracingContext());
+    }
+    LOG.debug("Created a new Blob Block Manager for AbfsOutputStream instance 
{} for path {}",
+        abfsOutputStream.getStreamID(), abfsOutputStream.getPath());
+  }
+
+  /**
+   * Creates a new block.
+   *
+   * @param position the position
+   * @return the created block
+   * @throws IOException if an I/O error occurs
+   */
+  @Override
+  protected synchronized AbfsBlock createBlockInternal(long position)
+      throws IOException {
+    if (getActiveBlock() == null) {
+      setBlockCount(getBlockCount() + 1);
+      AbfsBlock activeBlock = new AbfsBlobBlock(getAbfsOutputStream(), 
position);
+      activeBlock.setBlockEntry(addNewEntry(activeBlock.getBlockId(), 
activeBlock.getOffset()));
+      setActiveBlock(activeBlock);
+    }
+    return getActiveBlock();
+  }
+
+  /**
+   * Returns block id's which are committed for the blob.
+   *
+   * @param tracingContext Tracing context object.
+   * @return list of committed block id's.
+   * @throws AzureBlobFileSystemException if an error occurs
+   */
+  private List<String> getBlockList(TracingContext tracingContext)
+      throws AzureBlobFileSystemException {
+    List<String> committedBlockIdList = new ArrayList<>();
+    AbfsBlobClient blobClient = 
getAbfsOutputStream().getClientHandler().getBlobClient();
+    final AbfsRestOperation op = blobClient
+        .getBlockList(getAbfsOutputStream().getPath(), tracingContext);
+    if (op != null && op.getResult() != null) {
+      committedBlockIdList = op.getResult().getBlockIdList();
+    }
+    return committedBlockIdList;
+  }
+
+  /**
+   * Adds a new block entry to the block entry list.
+   * The block entry is added only if the position of the new block
+   * is greater than the position of the last block in the list.
+   *
+   * @param blockId The ID of the new block to be added.
+   * @param position The position of the new block in the stream.
+   * @return The newly added {@link BlockEntry}.
+   * @throws IOException If the position of the new block is not greater than 
the last block in the list.
+   */
+  private synchronized BlockEntry addNewEntry(String blockId, long position) 
throws IOException {
+    if (!blockEntryList.isEmpty()) {
+      BlockEntry lastEntry = blockEntryList.getLast();
+      if (position <= lastEntry.getPosition()) {
+        throw new IOException("New block position " + position  + " must be 
greater than the last block position "
+            + lastEntry.getPosition() + " for path " + 
getAbfsOutputStream().getPath());
+      }
+    }
+    BlockEntry blockEntry = new BlockEntry(blockId, position, 
AbfsBlockStatus.NEW);
+    blockEntryList.addLast(blockEntry);
+    LOG.debug("Added block {} at position {} with status NEW.", blockId, 
position);
+    return blockEntry;
+  }
+
+  /**
+   * Updates the status of an existing block entry to SUCCESS.
+   * This method is used to mark a block as successfully processed.
+   *
+   * @param block The {@link AbfsBlock} whose status needs to be updated to 
SUCCESS.
+   */
+  protected synchronized void updateEntry(AbfsBlock block) {
+    BlockEntry blockEntry = block.getBlockEntry();
+    blockEntry.setStatus(AbfsBlockStatus.SUCCESS);
+    LOG.debug("Added block {} at position {} with status SUCCESS.", 
block.getBlockId(), blockEntry.getPosition());
+  }
+
+  /**
+   * Prepares the list of blocks to commit.
+   *
+   * @return whether we have some data to commit or not.
+   * @throws IOException if an I/O error occurs
+   */
+  protected boolean hasListToCommit() throws IOException {
+    // Adds all the committed blocks if available to the list of blocks to be 
added in putBlockList.
+    if (blockEntryList.isEmpty()) {
+      return false; // No entries to commit
+    }
+    while (!blockEntryList.isEmpty()) {
+      BlockEntry current = blockEntryList.poll();
+      if (current.getStatus() != AbfsBlockStatus.SUCCESS) {
+        LOG.debug(
+            "Block {} with position {} has status {}, flush cannot proceed.",
+            current.getBlockId(), current.getPosition(), current.getStatus());
+        throw new IOException("Flush failed. Block " + current.getBlockId()
+            + " with position " + current.getPosition() + " has status "
+            + current.getStatus() + "for path " + 
getAbfsOutputStream().getPath());
+      }
+      if (!blockEntryList.isEmpty()) {
+        BlockEntry next = blockEntryList.getFirst();
+        if (current.getPosition() >= next.getPosition()) {
+          String errorMessage =
+              "Position check failed. Current block position is greater than 
or equal to the next block's position.\n"
+                  + "Current Block Entry:\n"
+                  + "Block ID: " + current.getBlockId()
+                  + ", Position: " + current.getPosition()
+                  + ", Status: " + current.getStatus()
+                  + ", Path: " + getAbfsOutputStream().getPath()
+                  + ", StreamID: " + getAbfsOutputStream().getStreamID()
+                  + ", Next block position: " + next.getPosition()
+                  + "\n";
+          throw new IOException(errorMessage);
+        }
+      }
+      committedBlockEntries.add(current.getBlockId());

Review Comment:
   `committedBlockEntries `is growing and adding entries into this list. When 
do you clean this list. Otw, it would leads to OOME, right?



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobIngressHandler.java:
##########
@@ -0,0 +1,347 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
+import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidIngressServiceException;
+import 
org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
+import 
org.apache.hadoop.fs.azurebfs.contracts.services.BlobAppendRequestParameters;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.apache.hadoop.fs.store.DataBlocks;
+import org.apache.hadoop.io.IOUtils;
+
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_ACTION;
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.BLOB_APPEND;
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.BLOB_FLUSH;
+import static 
org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient.generateBlockListXml;
+
+public class AzureBlobIngressHandler extends AzureIngressHandler {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      AbfsOutputStream.class);
+
+  private volatile String eTag;
+
+  private final AzureBlobBlockManager blobBlockManager;
+
+  private final AbfsBlobClient blobClient;
+
+  private final AbfsClientHandler clientHandler;
+
+  /**
+   * Constructs an AzureBlobIngressHandler.
+   *
+   * @param abfsOutputStream the AbfsOutputStream.
+   * @param blockFactory the block factory.
+   * @param bufferSize the buffer size.
+   * @param eTag the eTag.
+   * @param clientHandler the client handler.
+   * @param blockManager the block manager.
+   * @throws AzureBlobFileSystemException if an error occurs.
+   */
+  public AzureBlobIngressHandler(AbfsOutputStream abfsOutputStream,
+      DataBlocks.BlockFactory blockFactory,
+      int bufferSize, String eTag, AbfsClientHandler clientHandler, 
AzureBlockManager blockManager)
+      throws AzureBlobFileSystemException {
+    super(abfsOutputStream);
+    this.eTag = eTag;
+    if (blockManager instanceof AzureBlobBlockManager) {
+      this.blobBlockManager = (AzureBlobBlockManager) blockManager;
+    } else {
+      this.blobBlockManager = new AzureBlobBlockManager(abfsOutputStream,
+          blockFactory, bufferSize);
+    }
+    this.clientHandler = clientHandler;
+    this.blobClient = clientHandler.getBlobClient();
+    LOG.trace("Created a new BlobIngress Handler for AbfsOutputStream instance 
{} for path {}",
+        abfsOutputStream.getStreamID(), abfsOutputStream.getPath());
+  }
+
+  /**
+   * Buffers data into the specified block.
+   *
+   * @param block the block to buffer data into.
+   * @param data  the data to be buffered.
+   * @param off   the start offset in the data.
+   * @param length the number of bytes to buffer.
+   * @return the number of bytes buffered.
+   * @throws IOException if an I/O error occurs.
+   */
+  @Override
+  protected int bufferData(AbfsBlock block,
+      final byte[] data,
+      final int off,
+      final int length)
+      throws IOException {
+    LOG.trace("Buffering data of length {} to block at offset {}", length, 
off);
+    return block.write(data, off, length);
+  }
+
+  /**
+   * Performs a remote write operation.
+   *
+   * @param blockToUpload the block to upload.
+   * @param uploadData    the data to upload.
+   * @param reqParams     the request parameters.
+   * @param tracingContext the tracing context.
+   * @return the resulting AbfsRestOperation.
+   * @throws IOException if an I/O error occurs.
+   */
+  @Override
+  protected AbfsRestOperation remoteWrite(AbfsBlock blockToUpload,
+      DataBlocks.BlockUploadData uploadData,
+      AppendRequestParameters reqParams,
+      TracingContext tracingContext)
+      throws IOException {
+    BlobAppendRequestParameters blobParams = new 
BlobAppendRequestParameters(blockToUpload.getBlockId(), getETag());
+    reqParams.setBlobParams(blobParams);
+    AbfsRestOperation op;
+    String threadIdStr = String.valueOf(Thread.currentThread().getId());
+    TracingContext tracingContextAppend = new TracingContext(tracingContext);
+    tracingContextAppend.setIngressHandler(BLOB_APPEND + " T " + threadIdStr);
+    
tracingContextAppend.setPosition(String.valueOf(blockToUpload.getOffset()));
+    try {
+      LOG.trace("Starting remote write for block with ID {} and offset {}",
+          blockToUpload.getBlockId(), blockToUpload.getOffset());
+      op = getClient().append(getAbfsOutputStream().getPath(), 
uploadData.toByteArray(),
+          reqParams,
+          getAbfsOutputStream().getCachedSasTokenString(),
+          getAbfsOutputStream().getContextEncryptionAdapter(),
+          tracingContextAppend);
+      blobBlockManager.updateEntry(blockToUpload);
+    } catch (AbfsRestOperationException ex) {
+      LOG.error("Error in remote write requiring handler switch for path {}", 
getAbfsOutputStream().getPath(), ex);
+      if (shouldIngressHandlerBeSwitched(ex)) {
+        throw getIngressHandlerSwitchException(ex);
+      }
+      LOG.error("Error in remote write for path {} and offset {}", 
getAbfsOutputStream().getPath(),
+          blockToUpload.getOffset(), ex);
+      throw ex;
+    }
+    return op;
+  }
+
+  /**
+   * Flushes data to the remote store.
+   *
+   * @param offset               the offset to flush.
+   * @param retainUncommitedData whether to retain uncommitted data.
+   * @param isClose              whether this is a close operation.
+   * @param leaseId              the lease ID.
+   * @param tracingContext       the tracing context.
+   * @return the resulting AbfsRestOperation.
+   * @throws IOException if an I/O error occurs.
+   */
+  @Override
+  protected synchronized AbfsRestOperation remoteFlush(final long offset,
+      final boolean retainUncommitedData,
+      final boolean isClose,
+      final String leaseId,
+      TracingContext tracingContext)
+      throws IOException {
+    AbfsRestOperation op;
+    if (getAbfsOutputStream().isAppendBlob()) {
+      return null;
+    }
+    if (!blobBlockManager.hasListToCommit()) {
+      return null;
+    }
+    try {
+      // Generate the xml with the list of blockId's to generate putBlockList 
call.
+      String blockListXml = generateBlockListXml(
+          blobBlockManager.getBlockIdList());
+      TracingContext tracingContextFlush = new TracingContext(tracingContext);
+      tracingContextFlush.setIngressHandler(BLOB_FLUSH);
+      tracingContextFlush.setPosition(String.valueOf(offset));
+      LOG.trace("Flushing data at offset {} for path {}", offset, 
getAbfsOutputStream().getPath());
+      op = getClient().flush(blockListXml.getBytes(StandardCharsets.UTF_8),
+          getAbfsOutputStream().getPath(),
+          isClose, getAbfsOutputStream().getCachedSasTokenString(), leaseId,
+          getETag(), getAbfsOutputStream().getContextEncryptionAdapter(), 
tracingContextFlush);
+      setETag(op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG));
+    } catch (AbfsRestOperationException ex) {
+      LOG.error("Error in remote flush requiring handler switch for path {}", 
getAbfsOutputStream().getPath(), ex);
+      if (shouldIngressHandlerBeSwitched(ex)) {
+        throw getIngressHandlerSwitchException(ex);
+      }
+      LOG.error("Error in remote flush for path {} and offset {}", 
getAbfsOutputStream().getPath(), offset, ex);
+      throw ex;
+    }
+    return op;
+  }
+
+  /**
+   * Method to perform a remote write operation for appending data to an 
append blob in Azure Blob Storage.
+   *
+   * <p>This method is intended to be implemented by subclasses to handle the 
specific
+   * case of appending data to an append blob. It takes in the path of the 
append blob,
+   * the data to be uploaded, the block of data, and additional parameters 
required for
+   * the append operation.</p>
+   *
+   * @param path           The path of the append blob to which data is to be 
appended.
+   * @param uploadData     The data to be uploaded as part of the append 
operation.
+   * @param block          The block of data to append.
+   * @param reqParams      The additional parameters required for the append 
operation.
+   * @param tracingContext The tracing context for the operation.
+   * @return An {@link AbfsRestOperation} object representing the remote write 
operation.
+   * @throws IOException If an I/O error occurs during the append operation.
+   */
+  protected AbfsRestOperation remoteAppendBlobWrite(String path,
+      DataBlocks.BlockUploadData uploadData,
+      AbfsBlock block,
+      AppendRequestParameters reqParams,
+      TracingContext tracingContext) throws IOException {
+    // Perform the remote append operation using the blob client.
+    AbfsRestOperation op = null;
+    try {
+      op = blobClient.appendBlock(path, reqParams, uploadData.toByteArray(), 
tracingContext);
+    } catch (AbfsRestOperationException ex) {
+      LOG.error("Error in remote write requiring handler switch for path {}",
+          getAbfsOutputStream().getPath(), ex);
+      if (shouldIngressHandlerBeSwitched(ex)) {
+        throw getIngressHandlerSwitchException(ex);
+      }
+      LOG.error("Error in remote write for path {} and offset {}",
+          getAbfsOutputStream().getPath(),
+          block.getOffset(), ex);
+      throw ex;
+    }
+    return op;
+  }
+
+  /**
+   * Sets the eTag of the blob.
+   *
+   * @param eTag the eTag to set.
+   */
+  synchronized void setETag(String eTag) {

Review Comment:
   Whats the reasoning for synchronized here?



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java:
##########
@@ -328,7 +337,7 @@ public AbfsRestOperation listPath(final String 
relativePath, final boolean recur
       abfsUriQueryBuilder.addQuery(QUERY_PARAM_DELIMITER, FORWARD_SLASH);
     }
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_MAX_RESULTS, 
String.valueOf(listMaxResults));
-    appendSASTokenToQuery(relativePath, SASTokenProvider.LIST_OPERATION, 
abfsUriQueryBuilder);
+    appendSASTokenToQuery(relativePath, 
SASTokenProvider.FIXED_SAS_STORE_OPERATION, abfsUriQueryBuilder);

Review Comment:
   Why do we change "LIST_OPERATION" to "FIXED_SAS_STORE_OPERATION"



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java:
##########
@@ -1462,6 +1462,19 @@ void setIsNamespaceEnabledAccount(String 
isNamespaceEnabledAccount) {
     this.isNamespaceEnabledAccount = isNamespaceEnabledAccount;
   }
 
+  /**
+   * Checks if the FixedSASTokenProvider is configured for the current account.
+   *
+   * @return true if the FixedSASTokenProvider is configured, false otherwise.
+   */
+  public boolean isFixedSASTokenProviderConfigured() {
+    try {
+      return getSASTokenProvider() instanceof FixedSASTokenProvider;
+    } catch (AzureBlobFileSystemException e) {
+      return false;

Review Comment:
   Please add a debug log message with exception trace for better 
troubleshooting



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java:
##########
@@ -524,10 +524,10 @@ public boolean isDfsToBlobFallbackEnabled() {
   public void validateConfiguredServiceType(boolean isHNSEnabled)
       throws InvalidConfigurationValueException {
     // TODO: [FnsOverBlob][HADOOP-19179] Remove this check when FNS over Blob 
is ready.
-    if (getFsConfiguredServiceType() == AbfsServiceType.BLOB) {
-      throw new InvalidConfigurationValueException(FS_DEFAULT_NAME_KEY,
-          "Blob Endpoint Support not yet available");
-    }
+//    if (getFsConfiguredServiceType() == AbfsServiceType.BLOB) {

Review Comment:
   Please remove commented lines



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java:
##########
@@ -234,6 +237,29 @@ public void initialize(URI uri, Configuration 
configuration)
       throw new 
InvalidConfigurationValueException(FS_AZURE_ACCOUNT_IS_HNS_ENABLED, ex);
     }
 
+    /*
+     * Validates if the correct SAS Token provider is configured for non-HNS 
accounts.
+     * For non-HNS accounts, if the authentication type is set to SAS, only a 
fixed SAS Token is supported as of now.
+     * A custom SAS Token Provider should not be configured in such cases, as 
it will override the FixedSASTokenProvider and render it unused.
+     * If the namespace is not enabled and the FixedSASTokenProvider is not 
configured,
+     * an InvalidConfigurationValueException will be thrown.
+     *
+     * @throws InvalidConfigurationValueException if account is not namespace 
enabled and FixedSASTokenProvider is not configured.
+     */
+    try {
+      if (abfsConfiguration.getAuthType(abfsConfiguration.getAccountName()) == 
AuthType.SAS && // Auth type is SAS
+          !tryGetIsNamespaceEnabled(new TracingContext(initFSTracingContext)) 
&& // Account is FNS

Review Comment:
   `tryGetIsNamespaceEnabled(new TracingContext(initFSTracingContext)` being 
used or invoked three times in the #initialize method. You can create a 
variable and reuse it, right?



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java:
##########
@@ -889,7 +1144,7 @@ public AbfsRestOperation read(final String path,
     }
 
     final AbfsUriQueryBuilder abfsUriQueryBuilder = 
createDefaultUriQueryBuilder();
-    String sasTokenForReuse = appendSASTokenToQuery(path, 
SASTokenProvider.READ_OPERATION,
+    String sasTokenForReuse = appendSASTokenToQuery(path, 
SASTokenProvider.FIXED_SAS_STORE_OPERATION,

Review Comment:
   Similar to the above LIST case, could you explain the reason for replacing 
everywhere "FIXED_SAS_STORE_OPERATION"



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java:
##########
@@ -152,4 +161,16 @@ public void setRetryDueToExpect(boolean retryDueToExpect) {
   public void setExpectHeaderEnabled(boolean expectHeaderEnabled) {
     isExpectHeaderEnabled = expectHeaderEnabled;
   }
+
+  public void setBlockId(final String blockId) {

Review Comment:
   Please add javadoc for the public methods.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java:
##########
@@ -377,8 +390,199 @@ public AbfsRestOperation createPath(final String path,
       final String eTag,
       final ContextEncryptionAdapter contextEncryptionAdapter,
       final TracingContext tracingContext) throws AzureBlobFileSystemException 
{
-    // TODO: [FnsOverBlob][HADOOP-19232] To be implemented as part of ingress 
support.
-    throw new NotImplementedException("Create Path operation on Blob endpoint 
yet to be implemented.");
+    return createPath(path, isFile, overwrite, permissions, isAppendBlob, eTag,
+        contextEncryptionAdapter, tracingContext, false);
+  }
+
+  /**
+   * Get Rest Operation for API
+   * <a 
href="https://learn.microsoft.com/en-us/rest/api/storageservices/put-blob";>Put 
Blob</a>.
+   * Creates a file or directory (marker file) at the specified path.
+   *
+   * @param path the path of the directory to be created.
+   * @param isFile whether the path is a file.
+   * @param overwrite whether to overwrite if the path already exists.
+   * @param permissions the permissions to set on the path.
+   * @param isAppendBlob whether the path is an append blob.
+   * @param eTag the eTag of the path.
+   * @param contextEncryptionAdapter the context encryption adapter.
+   * @param tracingContext the tracing context.
+   * @param isCreateCalledFromMarkers whether the create is called from 
markers.
+   * @return the executed rest operation containing the response from the 
server.
+   * @throws AzureBlobFileSystemException if the rest operation fails.
+   */
+  public AbfsRestOperation createPath(final String path,
+      final boolean isFile,
+      final boolean overwrite,
+      final AzureBlobFileSystemStore.Permissions permissions,
+      final boolean isAppendBlob,
+      final String eTag,
+      final ContextEncryptionAdapter contextEncryptionAdapter,
+      final TracingContext tracingContext,
+      boolean isCreateCalledFromMarkers) throws AzureBlobFileSystemException {
+    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+    if (!getIsNamespaceEnabled() && !isCreateCalledFromMarkers) {
+      AbfsHttpOperation op1Result = null;
+      try {
+        op1Result = getPathStatus(path, tracingContext,
+            null, true).getResult();
+      } catch (AbfsRestOperationException ex) {
+        if (ex.getStatusCode() == HTTP_NOT_FOUND) {
+          LOG.debug("No directory/path found: {}", path);
+        } else {
+          throw ex;

Review Comment:
   Please add a debug log message for better troubleshooting



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobBlockManager.java:
##########
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.apache.hadoop.fs.store.DataBlocks;
+
+/**
+ * Manages Azure Blob blocks for append operations.
+ */
+public class AzureBlobBlockManager extends AzureBlockManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      AbfsOutputStream.class);

Review Comment:
   logger is not correct. Please use AzureBlobBlockManager .class instead



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java:
##########
@@ -377,8 +390,199 @@ public AbfsRestOperation createPath(final String path,
       final String eTag,
       final ContextEncryptionAdapter contextEncryptionAdapter,
       final TracingContext tracingContext) throws AzureBlobFileSystemException 
{
-    // TODO: [FnsOverBlob][HADOOP-19232] To be implemented as part of ingress 
support.
-    throw new NotImplementedException("Create Path operation on Blob endpoint 
yet to be implemented.");
+    return createPath(path, isFile, overwrite, permissions, isAppendBlob, eTag,
+        contextEncryptionAdapter, tracingContext, false);
+  }
+
+  /**
+   * Get Rest Operation for API
+   * <a 
href="https://learn.microsoft.com/en-us/rest/api/storageservices/put-blob";>Put 
Blob</a>.
+   * Creates a file or directory (marker file) at the specified path.
+   *
+   * @param path the path of the directory to be created.
+   * @param isFile whether the path is a file.
+   * @param overwrite whether to overwrite if the path already exists.
+   * @param permissions the permissions to set on the path.
+   * @param isAppendBlob whether the path is an append blob.
+   * @param eTag the eTag of the path.
+   * @param contextEncryptionAdapter the context encryption adapter.
+   * @param tracingContext the tracing context.
+   * @param isCreateCalledFromMarkers whether the create is called from 
markers.
+   * @return the executed rest operation containing the response from the 
server.
+   * @throws AzureBlobFileSystemException if the rest operation fails.
+   */
+  public AbfsRestOperation createPath(final String path,
+      final boolean isFile,
+      final boolean overwrite,
+      final AzureBlobFileSystemStore.Permissions permissions,
+      final boolean isAppendBlob,
+      final String eTag,
+      final ContextEncryptionAdapter contextEncryptionAdapter,
+      final TracingContext tracingContext,
+      boolean isCreateCalledFromMarkers) throws AzureBlobFileSystemException {
+    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+    if (!getIsNamespaceEnabled() && !isCreateCalledFromMarkers) {
+      AbfsHttpOperation op1Result = null;
+      try {
+        op1Result = getPathStatus(path, tracingContext,
+            null, true).getResult();
+      } catch (AbfsRestOperationException ex) {
+        if (ex.getStatusCode() == HTTP_NOT_FOUND) {
+          LOG.debug("No directory/path found: {}", path);
+        } else {
+          throw ex;
+        }
+      }
+      if (op1Result != null) {
+        boolean isDir = checkIsDir(op1Result);
+        if (isFile == isDir) {
+          throw new AbfsRestOperationException(HTTP_CONFLICT,
+              AzureServiceErrorCode.PATH_CONFLICT.getErrorCode(),
+              PATH_EXISTS,
+              null);
+        }
+      }
+      Path parentPath = new Path(path).getParent();
+      if (parentPath != null && !parentPath.isRoot()) {
+        createMarkers(parentPath, overwrite, permissions, isAppendBlob, eTag,
+            contextEncryptionAdapter, tracingContext);
+      }
+    }
+    if (isFile) {
+      addEncryptionKeyRequestHeaders(path, requestHeaders, true,
+          contextEncryptionAdapter, tracingContext);
+    } else {
+      requestHeaders.add(new AbfsHttpHeader(X_MS_META_HDI_ISFOLDER, TRUE));
+    }
+    requestHeaders.add(new AbfsHttpHeader(CONTENT_LENGTH, ZERO));
+    if (isAppendBlob) {
+      requestHeaders.add(new AbfsHttpHeader(X_MS_BLOB_TYPE, APPEND_BLOB_TYPE));
+    } else {
+      requestHeaders.add(new AbfsHttpHeader(X_MS_BLOB_TYPE, BLOCK_BLOB_TYPE));
+    }
+    if (!overwrite) {
+      requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, 
AbfsHttpConstants.STAR));
+    }
+    if (eTag != null && !eTag.isEmpty()) {
+      requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.IF_MATCH, 
eTag));
+    }
+
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = 
createDefaultUriQueryBuilder();
+    appendSASTokenToQuery(path, SASTokenProvider.FIXED_SAS_STORE_OPERATION, 
abfsUriQueryBuilder);
+
+    final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+    final AbfsRestOperation op = getAbfsRestOperation(
+        AbfsRestOperationType.PutBlob,
+        HTTP_METHOD_PUT, url, requestHeaders);
+    try {
+      op.execute(tracingContext);
+    } catch (AzureBlobFileSystemException ex) {
+      // If we have no HTTP response, throw the original exception.
+      if (!op.hasResult()) {
+        throw ex;
+      }
+      if (!isFile && op.getResult().getStatusCode() == HTTP_CONFLICT) {
+        // This ensures that we don't throw ex only for existing directory but 
if a blob exists we throw exception.
+        AbfsHttpOperation opResult = null;
+        try {
+          opResult = this.getPathStatus(path, true, tracingContext, 
null).getResult();
+        } catch (AbfsRestOperationException e) {
+          if (opResult != null) {
+            throw e;

Review Comment:
   Please add debug log message for better troubleshooting.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobBlockManager.java:
##########
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.apache.hadoop.fs.store.DataBlocks;
+
+/**
+ * Manages Azure Blob blocks for append operations.
+ */
+public class AzureBlobBlockManager extends AzureBlockManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      AbfsOutputStream.class);
+
+
+  /** The list of already committed blocks is stored in this list. */
+  private List<String> committedBlockEntries = new ArrayList<>();
+
+  /** The list to store blockId, position, and status. */
+  private final LinkedList<BlockEntry> blockEntryList = new LinkedList<>();
+
+
+  /**
+   * Constructs an AzureBlobBlockManager.
+   *
+   * @param abfsOutputStream the output stream
+   * @param blockFactory the block factory
+   * @param bufferSize the buffer size
+   * @throws AzureBlobFileSystemException if an error occurs
+   */
+  public AzureBlobBlockManager(AbfsOutputStream abfsOutputStream,
+      DataBlocks.BlockFactory blockFactory,
+      int bufferSize)
+      throws AzureBlobFileSystemException {
+    super(abfsOutputStream, blockFactory, bufferSize);
+    if (abfsOutputStream.getPosition() > 0 && 
!abfsOutputStream.isAppendBlob()) {
+      this.committedBlockEntries = 
getBlockList(abfsOutputStream.getTracingContext());
+    }
+    LOG.debug("Created a new Blob Block Manager for AbfsOutputStream instance 
{} for path {}",
+        abfsOutputStream.getStreamID(), abfsOutputStream.getPath());
+  }
+
+  /**
+   * Creates a new block.
+   *
+   * @param position the position
+   * @return the created block
+   * @throws IOException if an I/O error occurs
+   */
+  @Override
+  protected synchronized AbfsBlock createBlockInternal(long position)
+      throws IOException {
+    if (getActiveBlock() == null) {
+      setBlockCount(getBlockCount() + 1);
+      AbfsBlock activeBlock = new AbfsBlobBlock(getAbfsOutputStream(), 
position);
+      activeBlock.setBlockEntry(addNewEntry(activeBlock.getBlockId(), 
activeBlock.getOffset()));
+      setActiveBlock(activeBlock);
+    }
+    return getActiveBlock();
+  }
+
+  /**
+   * Returns block id's which are committed for the blob.
+   *
+   * @param tracingContext Tracing context object.
+   * @return list of committed block id's.
+   * @throws AzureBlobFileSystemException if an error occurs
+   */
+  private List<String> getBlockList(TracingContext tracingContext)
+      throws AzureBlobFileSystemException {
+    List<String> committedBlockIdList = new ArrayList<>();
+    AbfsBlobClient blobClient = 
getAbfsOutputStream().getClientHandler().getBlobClient();
+    final AbfsRestOperation op = blobClient
+        .getBlockList(getAbfsOutputStream().getPath(), tracingContext);
+    if (op != null && op.getResult() != null) {
+      committedBlockIdList = op.getResult().getBlockIdList();
+    }
+    return committedBlockIdList;
+  }
+
+  /**
+   * Adds a new block entry to the block entry list.
+   * The block entry is added only if the position of the new block
+   * is greater than the position of the last block in the list.
+   *
+   * @param blockId The ID of the new block to be added.
+   * @param position The position of the new block in the stream.
+   * @return The newly added {@link BlockEntry}.
+   * @throws IOException If the position of the new block is not greater than 
the last block in the list.
+   */
+  private synchronized BlockEntry addNewEntry(String blockId, long position) 
throws IOException {
+    if (!blockEntryList.isEmpty()) {
+      BlockEntry lastEntry = blockEntryList.getLast();
+      if (position <= lastEntry.getPosition()) {
+        throw new IOException("New block position " + position  + " must be 
greater than the last block position "
+            + lastEntry.getPosition() + " for path " + 
getAbfsOutputStream().getPath());
+      }
+    }
+    BlockEntry blockEntry = new BlockEntry(blockId, position, 
AbfsBlockStatus.NEW);
+    blockEntryList.addLast(blockEntry);
+    LOG.debug("Added block {} at position {} with status NEW.", blockId, 
position);
+    return blockEntry;
+  }
+
+  /**
+   * Updates the status of an existing block entry to SUCCESS.
+   * This method is used to mark a block as successfully processed.
+   *
+   * @param block The {@link AbfsBlock} whose status needs to be updated to 
SUCCESS.
+   */
+  protected synchronized void updateEntry(AbfsBlock block) {
+    BlockEntry blockEntry = block.getBlockEntry();
+    blockEntry.setStatus(AbfsBlockStatus.SUCCESS);
+    LOG.debug("Added block {} at position {} with status SUCCESS.", 
block.getBlockId(), blockEntry.getPosition());
+  }
+
+  /**
+   * Prepares the list of blocks to commit.
+   *
+   * @return whether we have some data to commit or not.
+   * @throws IOException if an I/O error occurs
+   */
+  protected boolean hasListToCommit() throws IOException {

Review Comment:
   Should this method be synchronized ?and make it threadsafe ?



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java:
##########
@@ -377,8 +390,199 @@ public AbfsRestOperation createPath(final String path,
       final String eTag,
       final ContextEncryptionAdapter contextEncryptionAdapter,
       final TracingContext tracingContext) throws AzureBlobFileSystemException 
{
-    // TODO: [FnsOverBlob][HADOOP-19232] To be implemented as part of ingress 
support.
-    throw new NotImplementedException("Create Path operation on Blob endpoint 
yet to be implemented.");
+    return createPath(path, isFile, overwrite, permissions, isAppendBlob, eTag,
+        contextEncryptionAdapter, tracingContext, false);
+  }
+
+  /**
+   * Get Rest Operation for API
+   * <a 
href="https://learn.microsoft.com/en-us/rest/api/storageservices/put-blob";>Put 
Blob</a>.
+   * Creates a file or directory (marker file) at the specified path.
+   *
+   * @param path the path of the directory to be created.
+   * @param isFile whether the path is a file.
+   * @param overwrite whether to overwrite if the path already exists.
+   * @param permissions the permissions to set on the path.
+   * @param isAppendBlob whether the path is an append blob.
+   * @param eTag the eTag of the path.
+   * @param contextEncryptionAdapter the context encryption adapter.
+   * @param tracingContext the tracing context.
+   * @param isCreateCalledFromMarkers whether the create is called from 
markers.
+   * @return the executed rest operation containing the response from the 
server.
+   * @throws AzureBlobFileSystemException if the rest operation fails.
+   */
+  public AbfsRestOperation createPath(final String path,
+      final boolean isFile,
+      final boolean overwrite,
+      final AzureBlobFileSystemStore.Permissions permissions,
+      final boolean isAppendBlob,
+      final String eTag,
+      final ContextEncryptionAdapter contextEncryptionAdapter,
+      final TracingContext tracingContext,
+      boolean isCreateCalledFromMarkers) throws AzureBlobFileSystemException {
+    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+    if (!getIsNamespaceEnabled() && !isCreateCalledFromMarkers) {
+      AbfsHttpOperation op1Result = null;
+      try {
+        op1Result = getPathStatus(path, tracingContext,
+            null, true).getResult();
+      } catch (AbfsRestOperationException ex) {
+        if (ex.getStatusCode() == HTTP_NOT_FOUND) {
+          LOG.debug("No directory/path found: {}", path);
+        } else {
+          throw ex;
+        }
+      }
+      if (op1Result != null) {
+        boolean isDir = checkIsDir(op1Result);
+        if (isFile == isDir) {
+          throw new AbfsRestOperationException(HTTP_CONFLICT,
+              AzureServiceErrorCode.PATH_CONFLICT.getErrorCode(),
+              PATH_EXISTS,
+              null);
+        }
+      }
+      Path parentPath = new Path(path).getParent();
+      if (parentPath != null && !parentPath.isRoot()) {
+        createMarkers(parentPath, overwrite, permissions, isAppendBlob, eTag,
+            contextEncryptionAdapter, tracingContext);
+      }
+    }
+    if (isFile) {
+      addEncryptionKeyRequestHeaders(path, requestHeaders, true,
+          contextEncryptionAdapter, tracingContext);
+    } else {
+      requestHeaders.add(new AbfsHttpHeader(X_MS_META_HDI_ISFOLDER, TRUE));
+    }
+    requestHeaders.add(new AbfsHttpHeader(CONTENT_LENGTH, ZERO));
+    if (isAppendBlob) {
+      requestHeaders.add(new AbfsHttpHeader(X_MS_BLOB_TYPE, APPEND_BLOB_TYPE));
+    } else {
+      requestHeaders.add(new AbfsHttpHeader(X_MS_BLOB_TYPE, BLOCK_BLOB_TYPE));
+    }
+    if (!overwrite) {
+      requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, 
AbfsHttpConstants.STAR));
+    }
+    if (eTag != null && !eTag.isEmpty()) {
+      requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.IF_MATCH, 
eTag));
+    }
+
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = 
createDefaultUriQueryBuilder();
+    appendSASTokenToQuery(path, SASTokenProvider.FIXED_SAS_STORE_OPERATION, 
abfsUriQueryBuilder);
+
+    final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+    final AbfsRestOperation op = getAbfsRestOperation(
+        AbfsRestOperationType.PutBlob,
+        HTTP_METHOD_PUT, url, requestHeaders);
+    try {
+      op.execute(tracingContext);
+    } catch (AzureBlobFileSystemException ex) {
+      // If we have no HTTP response, throw the original exception.
+      if (!op.hasResult()) {
+        throw ex;

Review Comment:
   Please add debug log message for better troubleshooting. We can add the 
debug log outside the condition and would log the detailed exception 
unconditionally.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java:
##########
@@ -377,8 +390,199 @@ public AbfsRestOperation createPath(final String path,
       final String eTag,
       final ContextEncryptionAdapter contextEncryptionAdapter,
       final TracingContext tracingContext) throws AzureBlobFileSystemException 
{
-    // TODO: [FnsOverBlob][HADOOP-19232] To be implemented as part of ingress 
support.
-    throw new NotImplementedException("Create Path operation on Blob endpoint 
yet to be implemented.");
+    return createPath(path, isFile, overwrite, permissions, isAppendBlob, eTag,
+        contextEncryptionAdapter, tracingContext, false);
+  }
+
+  /**
+   * Get Rest Operation for API
+   * <a 
href="https://learn.microsoft.com/en-us/rest/api/storageservices/put-blob";>Put 
Blob</a>.
+   * Creates a file or directory (marker file) at the specified path.
+   *
+   * @param path the path of the directory to be created.
+   * @param isFile whether the path is a file.
+   * @param overwrite whether to overwrite if the path already exists.
+   * @param permissions the permissions to set on the path.
+   * @param isAppendBlob whether the path is an append blob.
+   * @param eTag the eTag of the path.
+   * @param contextEncryptionAdapter the context encryption adapter.
+   * @param tracingContext the tracing context.
+   * @param isCreateCalledFromMarkers whether the create is called from 
markers.
+   * @return the executed rest operation containing the response from the 
server.
+   * @throws AzureBlobFileSystemException if the rest operation fails.
+   */
+  public AbfsRestOperation createPath(final String path,
+      final boolean isFile,
+      final boolean overwrite,
+      final AzureBlobFileSystemStore.Permissions permissions,
+      final boolean isAppendBlob,
+      final String eTag,
+      final ContextEncryptionAdapter contextEncryptionAdapter,
+      final TracingContext tracingContext,
+      boolean isCreateCalledFromMarkers) throws AzureBlobFileSystemException {
+    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+    if (!getIsNamespaceEnabled() && !isCreateCalledFromMarkers) {
+      AbfsHttpOperation op1Result = null;
+      try {
+        op1Result = getPathStatus(path, tracingContext,
+            null, true).getResult();
+      } catch (AbfsRestOperationException ex) {
+        if (ex.getStatusCode() == HTTP_NOT_FOUND) {
+          LOG.debug("No directory/path found: {}", path);
+        } else {
+          throw ex;
+        }
+      }
+      if (op1Result != null) {
+        boolean isDir = checkIsDir(op1Result);
+        if (isFile == isDir) {
+          throw new AbfsRestOperationException(HTTP_CONFLICT,
+              AzureServiceErrorCode.PATH_CONFLICT.getErrorCode(),
+              PATH_EXISTS,
+              null);
+        }
+      }
+      Path parentPath = new Path(path).getParent();
+      if (parentPath != null && !parentPath.isRoot()) {
+        createMarkers(parentPath, overwrite, permissions, isAppendBlob, eTag,
+            contextEncryptionAdapter, tracingContext);
+      }
+    }
+    if (isFile) {
+      addEncryptionKeyRequestHeaders(path, requestHeaders, true,
+          contextEncryptionAdapter, tracingContext);
+    } else {
+      requestHeaders.add(new AbfsHttpHeader(X_MS_META_HDI_ISFOLDER, TRUE));
+    }
+    requestHeaders.add(new AbfsHttpHeader(CONTENT_LENGTH, ZERO));
+    if (isAppendBlob) {
+      requestHeaders.add(new AbfsHttpHeader(X_MS_BLOB_TYPE, APPEND_BLOB_TYPE));
+    } else {
+      requestHeaders.add(new AbfsHttpHeader(X_MS_BLOB_TYPE, BLOCK_BLOB_TYPE));
+    }
+    if (!overwrite) {
+      requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, 
AbfsHttpConstants.STAR));
+    }
+    if (eTag != null && !eTag.isEmpty()) {
+      requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.IF_MATCH, 
eTag));
+    }
+
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = 
createDefaultUriQueryBuilder();
+    appendSASTokenToQuery(path, SASTokenProvider.FIXED_SAS_STORE_OPERATION, 
abfsUriQueryBuilder);
+
+    final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+    final AbfsRestOperation op = getAbfsRestOperation(
+        AbfsRestOperationType.PutBlob,
+        HTTP_METHOD_PUT, url, requestHeaders);
+    try {
+      op.execute(tracingContext);
+    } catch (AzureBlobFileSystemException ex) {
+      // If we have no HTTP response, throw the original exception.
+      if (!op.hasResult()) {
+        throw ex;
+      }
+      if (!isFile && op.getResult().getStatusCode() == HTTP_CONFLICT) {
+        // This ensures that we don't throw ex only for existing directory but 
if a blob exists we throw exception.
+        AbfsHttpOperation opResult = null;
+        try {
+          opResult = this.getPathStatus(path, true, tracingContext, 
null).getResult();
+        } catch (AbfsRestOperationException e) {
+          if (opResult != null) {
+            throw e;
+          }
+        }
+        if (opResult != null && checkIsDir(opResult)) {
+          return op;
+        }
+      }
+      throw ex;
+    }
+    return op;
+  }
+
+  /**
+   *  Creates marker blobs for the parent directories of the specified path.
+   *
+   * @param path The path for which parent directories need to be created.
+   * @param overwrite A flag indicating whether existing directories should be 
overwritten.
+   * @param permissions The permissions to be set for the created directories.
+   * @param isAppendBlob A flag indicating whether the created blob should be 
of type APPEND_BLOB.
+   * @param eTag The eTag to be matched for conditional requests.
+   * @param contextEncryptionAdapter The encryption adapter for context 
encryption.
+   * @param tracingContext The tracing context for the operation.
+   * @throws AzureBlobFileSystemException If the creation of any parent 
directory fails.
+   */
+  private void createMarkers(final Path path,
+      final boolean overwrite,
+      final AzureBlobFileSystemStore.Permissions permissions,
+      final boolean isAppendBlob,
+      final String eTag,
+      final ContextEncryptionAdapter contextEncryptionAdapter,
+      final TracingContext tracingContext) throws AzureBlobFileSystemException 
{
+    ArrayList<Path> keysToCreateAsFolder = new ArrayList<>();
+    checkParentChainForFile(path, tracingContext,
+        keysToCreateAsFolder);
+    for (Path pathToCreate : keysToCreateAsFolder) {
+      createPath(pathToCreate.toUri().getPath(), false, overwrite, permissions,
+          isAppendBlob, eTag, contextEncryptionAdapter, tracingContext, true);
+    }
+  }
+
+  /**
+   * Checks for the entire parent hierarchy and returns if any directory 
exists and
+   * throws an exception if any file exists.
+   * @param path path to check the hierarchy for.
+   * @param tracingContext the tracingcontext.
+   */
+  private void checkParentChainForFile(Path path, TracingContext 
tracingContext,
+      List<Path> keysToCreateAsFolder) throws AzureBlobFileSystemException {
+    AbfsHttpOperation opResult = null;
+    try {
+      opResult = getPathStatus(path.toUri().getPath(),
+          tracingContext, null, false).getResult();
+    } catch (AbfsRestOperationException ex) {
+      if (ex.getStatusCode() == HTTP_NOT_FOUND) {
+        LOG.debug("No explicit directory/path found: {}", path);
+      } else {
+        throw ex;
+      }
+    }
+    boolean isDirectory = opResult != null && checkIsDir(opResult);
+    if (opResult != null && !isDirectory) {
+      throw new AbfsRestOperationException(HTTP_CONFLICT,
+          AzureServiceErrorCode.PATH_CONFLICT.getErrorCode(),
+          PATH_EXISTS,
+          null);
+    }
+    if (isDirectory) {
+      return;
+    }
+    keysToCreateAsFolder.add(path);
+    Path current = path.getParent();
+    while (current != null && !current.isRoot()) {

Review Comment:
   Instead of duplicating, how abt using `do...while`  statement, with that the 
first iteration you can make it unconditionally, right?



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobIngressHandler.java:
##########
@@ -0,0 +1,347 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
+import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidIngressServiceException;
+import 
org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
+import 
org.apache.hadoop.fs.azurebfs.contracts.services.BlobAppendRequestParameters;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.apache.hadoop.fs.store.DataBlocks;
+import org.apache.hadoop.io.IOUtils;
+
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_ACTION;
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.BLOB_APPEND;
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.BLOB_FLUSH;
+import static 
org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient.generateBlockListXml;
+
+public class AzureBlobIngressHandler extends AzureIngressHandler {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      AbfsOutputStream.class);
+
+  private volatile String eTag;
+
+  private final AzureBlobBlockManager blobBlockManager;
+
+  private final AbfsBlobClient blobClient;
+
+  private final AbfsClientHandler clientHandler;
+
+  /**
+   * Constructs an AzureBlobIngressHandler.
+   *
+   * @param abfsOutputStream the AbfsOutputStream.
+   * @param blockFactory the block factory.
+   * @param bufferSize the buffer size.
+   * @param eTag the eTag.
+   * @param clientHandler the client handler.
+   * @param blockManager the block manager.
+   * @throws AzureBlobFileSystemException if an error occurs.
+   */
+  public AzureBlobIngressHandler(AbfsOutputStream abfsOutputStream,
+      DataBlocks.BlockFactory blockFactory,
+      int bufferSize, String eTag, AbfsClientHandler clientHandler, 
AzureBlockManager blockManager)
+      throws AzureBlobFileSystemException {
+    super(abfsOutputStream);
+    this.eTag = eTag;
+    if (blockManager instanceof AzureBlobBlockManager) {
+      this.blobBlockManager = (AzureBlobBlockManager) blockManager;
+    } else {
+      this.blobBlockManager = new AzureBlobBlockManager(abfsOutputStream,
+          blockFactory, bufferSize);
+    }
+    this.clientHandler = clientHandler;
+    this.blobClient = clientHandler.getBlobClient();
+    LOG.trace("Created a new BlobIngress Handler for AbfsOutputStream instance 
{} for path {}",
+        abfsOutputStream.getStreamID(), abfsOutputStream.getPath());
+  }
+
+  /**
+   * Buffers data into the specified block.
+   *
+   * @param block the block to buffer data into.
+   * @param data  the data to be buffered.
+   * @param off   the start offset in the data.
+   * @param length the number of bytes to buffer.
+   * @return the number of bytes buffered.
+   * @throws IOException if an I/O error occurs.
+   */
+  @Override
+  protected int bufferData(AbfsBlock block,
+      final byte[] data,
+      final int off,
+      final int length)
+      throws IOException {
+    LOG.trace("Buffering data of length {} to block at offset {}", length, 
off);
+    return block.write(data, off, length);
+  }
+
+  /**
+   * Performs a remote write operation.
+   *
+   * @param blockToUpload the block to upload.
+   * @param uploadData    the data to upload.
+   * @param reqParams     the request parameters.
+   * @param tracingContext the tracing context.
+   * @return the resulting AbfsRestOperation.
+   * @throws IOException if an I/O error occurs.
+   */
+  @Override
+  protected AbfsRestOperation remoteWrite(AbfsBlock blockToUpload,
+      DataBlocks.BlockUploadData uploadData,
+      AppendRequestParameters reqParams,
+      TracingContext tracingContext)
+      throws IOException {
+    BlobAppendRequestParameters blobParams = new 
BlobAppendRequestParameters(blockToUpload.getBlockId(), getETag());
+    reqParams.setBlobParams(blobParams);
+    AbfsRestOperation op;
+    String threadIdStr = String.valueOf(Thread.currentThread().getId());
+    TracingContext tracingContextAppend = new TracingContext(tracingContext);
+    tracingContextAppend.setIngressHandler(BLOB_APPEND + " T " + threadIdStr);
+    
tracingContextAppend.setPosition(String.valueOf(blockToUpload.getOffset()));
+    try {
+      LOG.trace("Starting remote write for block with ID {} and offset {}",
+          blockToUpload.getBlockId(), blockToUpload.getOffset());
+      op = getClient().append(getAbfsOutputStream().getPath(), 
uploadData.toByteArray(),
+          reqParams,
+          getAbfsOutputStream().getCachedSasTokenString(),
+          getAbfsOutputStream().getContextEncryptionAdapter(),
+          tracingContextAppend);
+      blobBlockManager.updateEntry(blockToUpload);
+    } catch (AbfsRestOperationException ex) {
+      LOG.error("Error in remote write requiring handler switch for path {}", 
getAbfsOutputStream().getPath(), ex);
+      if (shouldIngressHandlerBeSwitched(ex)) {
+        throw getIngressHandlerSwitchException(ex);
+      }
+      LOG.error("Error in remote write for path {} and offset {}", 
getAbfsOutputStream().getPath(),
+          blockToUpload.getOffset(), ex);
+      throw ex;
+    }
+    return op;
+  }
+
+  /**
+   * Flushes data to the remote store.
+   *
+   * @param offset               the offset to flush.
+   * @param retainUncommitedData whether to retain uncommitted data.
+   * @param isClose              whether this is a close operation.
+   * @param leaseId              the lease ID.
+   * @param tracingContext       the tracing context.
+   * @return the resulting AbfsRestOperation.
+   * @throws IOException if an I/O error occurs.
+   */
+  @Override
+  protected synchronized AbfsRestOperation remoteFlush(final long offset,
+      final boolean retainUncommitedData,
+      final boolean isClose,
+      final String leaseId,
+      TracingContext tracingContext)
+      throws IOException {
+    AbfsRestOperation op;
+    if (getAbfsOutputStream().isAppendBlob()) {
+      return null;
+    }
+    if (!blobBlockManager.hasListToCommit()) {
+      return null;
+    }
+    try {
+      // Generate the xml with the list of blockId's to generate putBlockList 
call.
+      String blockListXml = generateBlockListXml(
+          blobBlockManager.getBlockIdList());
+      TracingContext tracingContextFlush = new TracingContext(tracingContext);
+      tracingContextFlush.setIngressHandler(BLOB_FLUSH);
+      tracingContextFlush.setPosition(String.valueOf(offset));
+      LOG.trace("Flushing data at offset {} for path {}", offset, 
getAbfsOutputStream().getPath());
+      op = getClient().flush(blockListXml.getBytes(StandardCharsets.UTF_8),
+          getAbfsOutputStream().getPath(),
+          isClose, getAbfsOutputStream().getCachedSasTokenString(), leaseId,
+          getETag(), getAbfsOutputStream().getContextEncryptionAdapter(), 
tracingContextFlush);
+      setETag(op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG));
+    } catch (AbfsRestOperationException ex) {
+      LOG.error("Error in remote flush requiring handler switch for path {}", 
getAbfsOutputStream().getPath(), ex);
+      if (shouldIngressHandlerBeSwitched(ex)) {
+        throw getIngressHandlerSwitchException(ex);
+      }
+      LOG.error("Error in remote flush for path {} and offset {}", 
getAbfsOutputStream().getPath(), offset, ex);
+      throw ex;
+    }
+    return op;
+  }
+
+  /**
+   * Method to perform a remote write operation for appending data to an 
append blob in Azure Blob Storage.
+   *
+   * <p>This method is intended to be implemented by subclasses to handle the 
specific
+   * case of appending data to an append blob. It takes in the path of the 
append blob,
+   * the data to be uploaded, the block of data, and additional parameters 
required for
+   * the append operation.</p>
+   *
+   * @param path           The path of the append blob to which data is to be 
appended.
+   * @param uploadData     The data to be uploaded as part of the append 
operation.
+   * @param block          The block of data to append.
+   * @param reqParams      The additional parameters required for the append 
operation.
+   * @param tracingContext The tracing context for the operation.
+   * @return An {@link AbfsRestOperation} object representing the remote write 
operation.
+   * @throws IOException If an I/O error occurs during the append operation.
+   */
+  protected AbfsRestOperation remoteAppendBlobWrite(String path,
+      DataBlocks.BlockUploadData uploadData,
+      AbfsBlock block,
+      AppendRequestParameters reqParams,
+      TracingContext tracingContext) throws IOException {
+    // Perform the remote append operation using the blob client.
+    AbfsRestOperation op = null;
+    try {
+      op = blobClient.appendBlock(path, reqParams, uploadData.toByteArray(), 
tracingContext);
+    } catch (AbfsRestOperationException ex) {
+      LOG.error("Error in remote write requiring handler switch for path {}",
+          getAbfsOutputStream().getPath(), ex);
+      if (shouldIngressHandlerBeSwitched(ex)) {
+        throw getIngressHandlerSwitchException(ex);
+      }
+      LOG.error("Error in remote write for path {} and offset {}",
+          getAbfsOutputStream().getPath(),
+          block.getOffset(), ex);
+      throw ex;
+    }
+    return op;
+  }
+
+  /**
+   * Sets the eTag of the blob.
+   *
+   * @param eTag the eTag to set.
+   */
+  synchronized void setETag(String eTag) {
+    this.eTag = eTag;
+  }
+
+  /**
+   * Gets the eTag value of the blob.
+   *
+   * @return the eTag.
+   */
+  @VisibleForTesting
+  @Override
+  public synchronized String getETag() {

Review Comment:
   same applicable here as well, please explain the reason for synchroinzed



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobIngressHandler.java:
##########
@@ -0,0 +1,347 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
+import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidIngressServiceException;
+import 
org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
+import 
org.apache.hadoop.fs.azurebfs.contracts.services.BlobAppendRequestParameters;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.apache.hadoop.fs.store.DataBlocks;
+import org.apache.hadoop.io.IOUtils;
+
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_ACTION;
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.BLOB_APPEND;
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.BLOB_FLUSH;
+import static 
org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient.generateBlockListXml;
+
+public class AzureBlobIngressHandler extends AzureIngressHandler {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      AbfsOutputStream.class);
+
+  private volatile String eTag;
+
+  private final AzureBlobBlockManager blobBlockManager;
+
+  private final AbfsBlobClient blobClient;
+
+  private final AbfsClientHandler clientHandler;
+
+  /**
+   * Constructs an AzureBlobIngressHandler.
+   *
+   * @param abfsOutputStream the AbfsOutputStream.
+   * @param blockFactory the block factory.
+   * @param bufferSize the buffer size.
+   * @param eTag the eTag.
+   * @param clientHandler the client handler.
+   * @param blockManager the block manager.
+   * @throws AzureBlobFileSystemException if an error occurs.
+   */
+  public AzureBlobIngressHandler(AbfsOutputStream abfsOutputStream,
+      DataBlocks.BlockFactory blockFactory,
+      int bufferSize, String eTag, AbfsClientHandler clientHandler, 
AzureBlockManager blockManager)
+      throws AzureBlobFileSystemException {
+    super(abfsOutputStream);
+    this.eTag = eTag;
+    if (blockManager instanceof AzureBlobBlockManager) {
+      this.blobBlockManager = (AzureBlobBlockManager) blockManager;
+    } else {
+      this.blobBlockManager = new AzureBlobBlockManager(abfsOutputStream,
+          blockFactory, bufferSize);
+    }
+    this.clientHandler = clientHandler;
+    this.blobClient = clientHandler.getBlobClient();
+    LOG.trace("Created a new BlobIngress Handler for AbfsOutputStream instance 
{} for path {}",
+        abfsOutputStream.getStreamID(), abfsOutputStream.getPath());
+  }
+
+  /**
+   * Buffers data into the specified block.
+   *
+   * @param block the block to buffer data into.
+   * @param data  the data to be buffered.
+   * @param off   the start offset in the data.
+   * @param length the number of bytes to buffer.
+   * @return the number of bytes buffered.
+   * @throws IOException if an I/O error occurs.
+   */
+  @Override
+  protected int bufferData(AbfsBlock block,
+      final byte[] data,
+      final int off,
+      final int length)
+      throws IOException {
+    LOG.trace("Buffering data of length {} to block at offset {}", length, 
off);
+    return block.write(data, off, length);
+  }
+
+  /**
+   * Performs a remote write operation.
+   *
+   * @param blockToUpload the block to upload.
+   * @param uploadData    the data to upload.
+   * @param reqParams     the request parameters.
+   * @param tracingContext the tracing context.
+   * @return the resulting AbfsRestOperation.
+   * @throws IOException if an I/O error occurs.
+   */
+  @Override
+  protected AbfsRestOperation remoteWrite(AbfsBlock blockToUpload,
+      DataBlocks.BlockUploadData uploadData,
+      AppendRequestParameters reqParams,
+      TracingContext tracingContext)
+      throws IOException {
+    BlobAppendRequestParameters blobParams = new 
BlobAppendRequestParameters(blockToUpload.getBlockId(), getETag());
+    reqParams.setBlobParams(blobParams);
+    AbfsRestOperation op;
+    String threadIdStr = String.valueOf(Thread.currentThread().getId());
+    TracingContext tracingContextAppend = new TracingContext(tracingContext);
+    tracingContextAppend.setIngressHandler(BLOB_APPEND + " T " + threadIdStr);
+    
tracingContextAppend.setPosition(String.valueOf(blockToUpload.getOffset()));
+    try {
+      LOG.trace("Starting remote write for block with ID {} and offset {}",
+          blockToUpload.getBlockId(), blockToUpload.getOffset());
+      op = getClient().append(getAbfsOutputStream().getPath(), 
uploadData.toByteArray(),
+          reqParams,
+          getAbfsOutputStream().getCachedSasTokenString(),
+          getAbfsOutputStream().getContextEncryptionAdapter(),
+          tracingContextAppend);
+      blobBlockManager.updateEntry(blockToUpload);
+    } catch (AbfsRestOperationException ex) {
+      LOG.error("Error in remote write requiring handler switch for path {}", 
getAbfsOutputStream().getPath(), ex);
+      if (shouldIngressHandlerBeSwitched(ex)) {
+        throw getIngressHandlerSwitchException(ex);
+      }
+      LOG.error("Error in remote write for path {} and offset {}", 
getAbfsOutputStream().getPath(),
+          blockToUpload.getOffset(), ex);
+      throw ex;
+    }
+    return op;
+  }
+
+  /**
+   * Flushes data to the remote store.
+   *
+   * @param offset               the offset to flush.
+   * @param retainUncommitedData whether to retain uncommitted data.
+   * @param isClose              whether this is a close operation.
+   * @param leaseId              the lease ID.
+   * @param tracingContext       the tracing context.
+   * @return the resulting AbfsRestOperation.
+   * @throws IOException if an I/O error occurs.
+   */
+  @Override
+  protected synchronized AbfsRestOperation remoteFlush(final long offset,

Review Comment:
   Have we used the `synchronized ` threadsafe philosophy accurately in this 
entire class. Could you please double check all the methods in this class.





> ABFS: [FnsOverBlob] Implementing Ingress Support with various Fallback 
> Handling
> -------------------------------------------------------------------------------
>
>                 Key: HADOOP-19232
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19232
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/azure
>    Affects Versions: 3.4.0
>            Reporter: Anuj Modi
>            Assignee: Anmol Asrani
>            Priority: Major
>              Labels: pull-request-available
>
> Scope of this task is to refactor the AbfsOutputStream class to handle the 
> ingress for DFS and Blob endpoint effectively.
> More details will be added soon.
> Perquisites for this Patch:
> 1. [HADOOP-19187] ABFS: [FnsOverBlob]Making AbfsClient Abstract for 
> supporting both DFS and Blob Endpoint - ASF JIRA (apache.org)
> 2. [HADOOP-19226] ABFS: [FnsOverBlob]Implementing Azure Rest APIs on Blob 
> Endpoint for AbfsBlobClient - ASF JIRA (apache.org)
> 3. [HADOOP-19207] ABFS: [FnsOverBlob]Response Handling of Blob Endpoint APIs 
> and Metadata APIs - ASF JIRA (apache.org)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to