anmolanmol1234 commented on code in PR #7272:
URL: https://github.com/apache/hadoop/pull/7272#discussion_r1935007959


##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java:
##########
@@ -177,12 +195,181 @@ public AbfsOutputStream(AbfsOutputStreamContext abfsOutputStreamContext)
     this.tracingContext.setOperation(FSOperationType.WRITE);
     this.ioStatistics = outputStreamStatistics.getIOStatistics();
     this.blockFactory = abfsOutputStreamContext.getBlockFactory();
-    this.blockSize = bufferSize;
-    // create that first block. This guarantees that an open + close sequence
-    // writes a 0-byte entry.
-    createBlockIfNeeded();
+    this.isDFSToBlobFallbackEnabled
+        = abfsOutputStreamContext.isDFSToBlobFallbackEnabled();
+    this.serviceTypeAtInit = abfsOutputStreamContext.getIngressServiceType();
+    this.currentExecutingServiceType = abfsOutputStreamContext.getIngressServiceType();
+    this.clientHandler = abfsOutputStreamContext.getClientHandler();
+    createIngressHandler(serviceTypeAtInit,
+        abfsOutputStreamContext.getBlockFactory(), bufferSize, false, null);
+  }
+
+  /**
+   * Retrieves the current ingress handler.
+   *
+   * @return the current {@link AzureIngressHandler}.
+   */
+  public AzureIngressHandler getIngressHandler() {
+    return ingressHandler;
+  }
+
+  private final Lock lock = new ReentrantLock();
+
+  private volatile boolean switchCompleted = false;
+
+  /**
+   * Creates or retrieves an existing Azure ingress handler based on the service type and provided parameters.
+   * <p>
+   * If the `ingressHandler` is already initialized and the switch operation is complete, the existing
+   * handler is returned without acquiring a lock to minimize performance overhead.
+   * If the `ingressHandler` is initialized but the switch is incomplete, a lock is acquired to safely modify
+   * or create a new handler. Double-checked locking is used to ensure thread safety while minimizing the
+   * time spent in the critical section.
+   * If the `ingressHandler` is `null`, the handler is safely initialized outside of the lock as no other
+   * thread would be modifying it.
+   * </p>
+   *
+   * @param serviceType   The type of Azure service to handle (e.g., ABFS, Blob, etc.).
+   * @param blockFactory  The factory to create data blocks used in the handler.
+   * @param bufferSize    The buffer size used by the handler for data processing.
+   * @param isSwitch      A flag indicating whether a switch operation is in progress.
+   * @param blockManager  The manager responsible for handling blocks of data during processing.
+   *
+   * @return The initialized or existing Azure ingress handler.
+   * @throws IOException If an I/O error occurs during handler creation or data processing.
+   */
+  private AzureIngressHandler createIngressHandler(AbfsServiceType serviceType,
+      DataBlocks.BlockFactory blockFactory,
+      int bufferSize, boolean isSwitch, AzureBlockManager blockManager) throws IOException {
+    if (ingressHandler != null) {
+      if (switchCompleted) {
+        return ingressHandler; // Return the handler if it's already initialized and the switch is completed
+      }
+      // If the switch is incomplete, lock to safely modify
+      lock.lock();
+      try {
+        // Double-check the condition after acquiring the lock
+        if (switchCompleted) {
+          return ingressHandler; // Return the handler if it's now completed
+        }
+        // If the switch is still incomplete, create a new handler
+        return createNewHandler(serviceType, blockFactory, bufferSize, isSwitch, blockManager);
+      } finally {
+        lock.unlock();
+      }
+    }
+    // If ingressHandler is null, no lock is needed; safely initialize it outside the lock
+    return createNewHandler(serviceType, blockFactory, bufferSize, isSwitch, blockManager);
   }
 
+  // Helper method to create a new handler, used in both scenarios (locked and unlocked)
+  private AzureIngressHandler createNewHandler(AbfsServiceType serviceType,
+      DataBlocks.BlockFactory blockFactory,
+      int bufferSize,
+      boolean isSwitch,
+      AzureBlockManager blockManager) throws IOException {
+    this.client = clientHandler.getClient(serviceType);
+    if (isDFSToBlobFallbackEnabled && serviceTypeAtInit != AbfsServiceType.DFS) {
+      throw new InvalidConfigurationValueException(
+          "The ingress service type must be configured as DFS");
+    }
+    if (isDFSToBlobFallbackEnabled && !isSwitch) {
+      ingressHandler = new AzureDfsToBlobIngressFallbackHandler(this,
+          blockFactory, bufferSize, eTag, clientHandler);
+    } else if (serviceType == AbfsServiceType.BLOB) {
+      ingressHandler = new AzureBlobIngressHandler(this, blockFactory,
+          bufferSize, eTag, clientHandler, blockManager);
+    } else {
+      ingressHandler = new AzureDFSIngressHandler(this, blockFactory,
+          bufferSize, eTag, clientHandler);
+    }
+    if (isSwitch) {
+      switchCompleted = true;
+    }
+    return ingressHandler;
+  }
+
+  /**
+   * Switches the current ingress handler and service type if necessary.
+   *
+   * @throws IOException if there is an error creating the new ingress handler.
+   */
+  protected void switchHandler() throws IOException {
+    if (serviceTypeAtInit != currentExecutingServiceType) {
+      return;

Review Comment:
   taken



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java:
##########
@@ -177,12 +195,181 @@ public AbfsOutputStream(AbfsOutputStreamContext abfsOutputStreamContext)
     this.tracingContext.setOperation(FSOperationType.WRITE);
     this.ioStatistics = outputStreamStatistics.getIOStatistics();
     this.blockFactory = abfsOutputStreamContext.getBlockFactory();
-    this.blockSize = bufferSize;
-    // create that first block. This guarantees that an open + close sequence
-    // writes a 0-byte entry.
-    createBlockIfNeeded();
+    this.isDFSToBlobFallbackEnabled
+        = abfsOutputStreamContext.isDFSToBlobFallbackEnabled();
+    this.serviceTypeAtInit = abfsOutputStreamContext.getIngressServiceType();
+    this.currentExecutingServiceType = abfsOutputStreamContext.getIngressServiceType();
+    this.clientHandler = abfsOutputStreamContext.getClientHandler();
+    createIngressHandler(serviceTypeAtInit,
+        abfsOutputStreamContext.getBlockFactory(), bufferSize, false, null);
+  }
+
+  /**
+   * Retrieves the current ingress handler.
+   *
+   * @return the current {@link AzureIngressHandler}.
+   */
+  public AzureIngressHandler getIngressHandler() {
+    return ingressHandler;
+  }
+
+  private final Lock lock = new ReentrantLock();
+
+  private volatile boolean switchCompleted = false;
+
+  /**
+   * Creates or retrieves an existing Azure ingress handler based on the service type and provided parameters.
+   * <p>
+   * If the `ingressHandler` is already initialized and the switch operation is complete, the existing
+   * handler is returned without acquiring a lock to minimize performance overhead.
+   * If the `ingressHandler` is initialized but the switch is incomplete, a lock is acquired to safely modify
+   * or create a new handler. Double-checked locking is used to ensure thread safety while minimizing the
+   * time spent in the critical section.
+   * If the `ingressHandler` is `null`, the handler is safely initialized outside of the lock as no other
+   * thread would be modifying it.
+   * </p>
+   *
+   * @param serviceType   The type of Azure service to handle (e.g., ABFS, Blob, etc.).
+   * @param blockFactory  The factory to create data blocks used in the handler.
+   * @param bufferSize    The buffer size used by the handler for data processing.
+   * @param isSwitch      A flag indicating whether a switch operation is in progress.
+   * @param blockManager  The manager responsible for handling blocks of data during processing.
+   *
+   * @return The initialized or existing Azure ingress handler.
+   * @throws IOException If an I/O error occurs during handler creation or data processing.
+   */
+  private AzureIngressHandler createIngressHandler(AbfsServiceType serviceType,
+      DataBlocks.BlockFactory blockFactory,
+      int bufferSize, boolean isSwitch, AzureBlockManager blockManager) throws IOException {
+    if (ingressHandler != null) {
+      if (switchCompleted) {
+        return ingressHandler; // Return the handler if it's already initialized and the switch is completed
+      }
+      // If the switch is incomplete, lock to safely modify
+      lock.lock();
+      try {
+        // Double-check the condition after acquiring the lock
+        if (switchCompleted) {
+          return ingressHandler; // Return the handler if it's now completed
+        }
+        // If the switch is still incomplete, create a new handler
+        return createNewHandler(serviceType, blockFactory, bufferSize, isSwitch, blockManager);
+      } finally {
+        lock.unlock();
+      }
+    }
+    // If ingressHandler is null, no lock is needed; safely initialize it outside the lock
+    return createNewHandler(serviceType, blockFactory, bufferSize, isSwitch, blockManager);
   }
 
+  // Helper method to create a new handler, used in both scenarios (locked and unlocked)
+  private AzureIngressHandler createNewHandler(AbfsServiceType serviceType,
+      DataBlocks.BlockFactory blockFactory,
+      int bufferSize,
+      boolean isSwitch,
+      AzureBlockManager blockManager) throws IOException {
+    this.client = clientHandler.getClient(serviceType);
+    if (isDFSToBlobFallbackEnabled && serviceTypeAtInit != AbfsServiceType.DFS) {
+      throw new InvalidConfigurationValueException(
+          "The ingress service type must be configured as DFS");
+    }
+    if (isDFSToBlobFallbackEnabled && !isSwitch) {
+      ingressHandler = new AzureDfsToBlobIngressFallbackHandler(this,
+          blockFactory, bufferSize, eTag, clientHandler);
+    } else if (serviceType == AbfsServiceType.BLOB) {
+      ingressHandler = new AzureBlobIngressHandler(this, blockFactory,
+          bufferSize, eTag, clientHandler, blockManager);
+    } else {
+      ingressHandler = new AzureDFSIngressHandler(this, blockFactory,
+          bufferSize, eTag, clientHandler);
+    }
+    if (isSwitch) {
+      switchCompleted = true;
+    }
+    return ingressHandler;
+  }
+
+  /**
+   * Switches the current ingress handler and service type if necessary.
+   *
+   * @throws IOException if there is an error creating the new ingress handler.
+   */
+  protected void switchHandler() throws IOException {
+    if (serviceTypeAtInit != currentExecutingServiceType) {
+      return;
+    }
+    if (serviceTypeAtInit == AbfsServiceType.BLOB) {

Review Comment:
   taken



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to