[
https://issues.apache.org/jira/browse/HADOOP-19232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17922253#comment-17922253
]
ASF GitHub Bot commented on HADOOP-19232:
-----------------------------------------
rakeshadr commented on code in PR #7272:
URL: https://github.com/apache/hadoop/pull/7272#discussion_r1934934413
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java:
##########
@@ -177,12 +195,181 @@ public AbfsOutputStream(AbfsOutputStreamContext abfsOutputStreamContext)
this.tracingContext.setOperation(FSOperationType.WRITE);
this.ioStatistics = outputStreamStatistics.getIOStatistics();
this.blockFactory = abfsOutputStreamContext.getBlockFactory();
- this.blockSize = bufferSize;
- // create that first block. This guarantees that an open + close sequence
- // writes a 0-byte entry.
- createBlockIfNeeded();
+ this.isDFSToBlobFallbackEnabled
+ = abfsOutputStreamContext.isDFSToBlobFallbackEnabled();
+ this.serviceTypeAtInit = abfsOutputStreamContext.getIngressServiceType();
+ this.currentExecutingServiceType = abfsOutputStreamContext.getIngressServiceType();
+ this.clientHandler = abfsOutputStreamContext.getClientHandler();
+ createIngressHandler(serviceTypeAtInit,
+ abfsOutputStreamContext.getBlockFactory(), bufferSize, false, null);
+ }
+
+ /**
+ * Retrieves the current ingress handler.
+ *
+ * @return the current {@link AzureIngressHandler}.
+ */
+ public AzureIngressHandler getIngressHandler() {
+ return ingressHandler;
+ }
+
+ private final Lock lock = new ReentrantLock();
+
+ private volatile boolean switchCompleted = false;
+
+ /**
+ * Creates or retrieves an existing Azure ingress handler based on the service type and provided parameters.
+ * <p>
+ * If the `ingressHandler` is already initialized and the switch operation is complete, the existing
+ * handler is returned without acquiring a lock to minimize performance overhead.
+ * If the `ingressHandler` is initialized but the switch is incomplete, a lock is acquired to safely modify
+ * or create a new handler. Double-checked locking is used to ensure thread safety while minimizing the
+ * time spent in the critical section.
+ * If the `ingressHandler` is `null`, the handler is safely initialized outside of the lock as no other
+ * thread would be modifying it.
+ * </p>
+ *
+ * @param serviceType The type of Azure service to handle (e.g., ABFS, Blob, etc.).
+ * @param blockFactory The factory to create data blocks used in the handler.
+ * @param bufferSize The buffer size used by the handler for data processing.
+ * @param isSwitch A flag indicating whether a switch operation is in progress.
+ * @param blockManager The manager responsible for handling blocks of data during processing.
+ *
+ * @return The initialized or existing Azure ingress handler.
+ * @throws IOException If an I/O error occurs during handler creation or data processing.
+ */
+ private AzureIngressHandler createIngressHandler(AbfsServiceType serviceType,
+ DataBlocks.BlockFactory blockFactory,
+ int bufferSize, boolean isSwitch, AzureBlockManager blockManager) throws IOException {
+ if (ingressHandler != null) {
+ if (switchCompleted) {
+ return ingressHandler; // Return the handler if it's already initialized and the switch is completed
+ }
+ // If the switch is incomplete, lock to safely modify
+ lock.lock();
+ try {
+ // Double-check the condition after acquiring the lock
+ if (switchCompleted) {
+ return ingressHandler; // Return the handler if it's now completed
+ }
+ // If the switch is still incomplete, create a new handler
+ return createNewHandler(serviceType, blockFactory, bufferSize, isSwitch, blockManager);
+ } finally {
+ lock.unlock();
+ }
+ }
+ // If ingressHandler is null, no lock is needed; safely initialize it outside the lock
+ return createNewHandler(serviceType, blockFactory, bufferSize, isSwitch, blockManager);
}
+ // Helper method to create a new handler, used in both scenarios (locked and unlocked)
+ private AzureIngressHandler createNewHandler(AbfsServiceType serviceType,
+ DataBlocks.BlockFactory blockFactory,
+ int bufferSize,
+ boolean isSwitch,
+ AzureBlockManager blockManager) throws IOException {
+ this.client = clientHandler.getClient(serviceType);
+ if (isDFSToBlobFallbackEnabled && serviceTypeAtInit != AbfsServiceType.DFS) {
+ throw new InvalidConfigurationValueException(
+ "The ingress service type must be configured as DFS");
+ }
+ if (isDFSToBlobFallbackEnabled && !isSwitch) {
+ ingressHandler = new AzureDfsToBlobIngressFallbackHandler(this,
+ blockFactory, bufferSize, eTag, clientHandler);
+ } else if (serviceType == AbfsServiceType.BLOB) {
+ ingressHandler = new AzureBlobIngressHandler(this, blockFactory,
+ bufferSize, eTag, clientHandler, blockManager);
+ } else {
+ ingressHandler = new AzureDFSIngressHandler(this, blockFactory,
+ bufferSize, eTag, clientHandler);
+ }
+ if (isSwitch) {
+ switchCompleted = true;
+ }
+ return ingressHandler;
+ }
+
+ /**
+ * Switches the current ingress handler and service type if necessary.
+ *
+ * @throws IOException if there is an error creating the new ingress handler.
+ */
+ protected void switchHandler() throws IOException {
+ if (serviceTypeAtInit != currentExecutingServiceType) {
+ return;
Review Comment:
Please add a debug message for this `!=` condition, explaining the
situation in which it occurs.
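
A minimal sketch of what such a debug message could look like. It is not the PR's code: `SwitchGuardSketch`, its `ServiceType` enum, `shouldSwitch()` and `skipMessage()` are hypothetical stand-ins, and the reading that the guard fires because an earlier switch already moved the stream off its initial service type is an assumption. `java.util.logging` is used only to keep the sketch dependency-free (`FINE` plays the role of debug); the real class would presumably use its own logger.

```java
import java.util.logging.Logger;

/** Sketch only: models the early-return guard of switchHandler() with a debug log. */
final class SwitchGuardSketch {

  /** Hypothetical stand-in for AbfsServiceType. */
  enum ServiceType { DFS, BLOB }

  // FINE is not printed by the default JUL configuration, similar to a debug level.
  private static final Logger LOG =
      Logger.getLogger(SwitchGuardSketch.class.getName());

  private final ServiceType serviceTypeAtInit;
  private final ServiceType currentExecutingServiceType;

  SwitchGuardSketch(ServiceType atInit, ServiceType current) {
    this.serviceTypeAtInit = atInit;
    this.currentExecutingServiceType = current;
  }

  /** Builds the text that explains why the switch is being skipped. */
  String skipMessage() {
    return "Skipping ingress handler switch: stream was initialised with "
        + serviceTypeAtInit + " but is already executing on "
        + currentExecutingServiceType
        + " (assumed cause: an earlier switch has already happened)";
  }

  /** Returns true if the caller should go on to switch, false if the guard fired. */
  boolean shouldSwitch() {
    if (serviceTypeAtInit != currentExecutingServiceType) {
      // The Supplier overload defers building the string until FINE is enabled.
      LOG.fine(this::skipMessage);
      return false;
    }
    return true;
  }

  public static void main(String[] args) {
    SwitchGuardSketch moved =
        new SwitchGuardSketch(ServiceType.DFS, ServiceType.BLOB);
    System.out.println(moved.shouldSwitch());
    System.out.println(moved.skipMessage());
  }
}
```

Naming both service types in the message lets a reader of the log tell which direction the earlier switch took.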
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java:
##########
@@ -177,12 +195,181 @@ public AbfsOutputStream(AbfsOutputStreamContext abfsOutputStreamContext)
+ /**
+ * Switches the current ingress handler and service type if necessary.
+ *
+ * @throws IOException if there is an error creating the new ingress handler.
+ */
+ protected void switchHandler() throws IOException {
+ if (serviceTypeAtInit != currentExecutingServiceType) {
+ return;
+ }
+ if (serviceTypeAtInit == AbfsServiceType.BLOB) {
Review Comment:
Since this is used for fallback ingress, please add a log message about the
auto-switching that includes the newly selected service type. Please log it at
info level, as this would be useful to users.
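
A rough sketch of the kind of info-level message being requested. The rest of `switchHandler()` is not visible in this hunk, so the toggle below (BLOB to DFS, otherwise DFS to BLOB) is an assumption, and `IngressSwitchLogSketch` with its `ServiceType` enum is a hypothetical stand-in for the real classes. `java.util.logging` is used only so the sketch runs without extra dependencies.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/** Sketch only: logs the auto-switch, including the newly selected service type. */
final class IngressSwitchLogSketch {

  /** Hypothetical stand-in for AbfsServiceType. */
  enum ServiceType { DFS, BLOB }

  private static final Logger LOG =
      Logger.getLogger(IngressSwitchLogSketch.class.getName());

  private final ServiceType serviceTypeAtInit;
  private ServiceType currentExecutingServiceType;

  IngressSwitchLogSketch(ServiceType atInit) {
    this.serviceTypeAtInit = atInit;
    this.currentExecutingServiceType = atInit;
  }

  ServiceType getCurrentExecutingServiceType() {
    return currentExecutingServiceType;
  }

  /** Returns true if a switch was performed, false if it was skipped. */
  boolean switchHandler() {
    if (serviceTypeAtInit != currentExecutingServiceType) {
      return false; // already running on the other service type
    }
    // Assumed toggle: the diff context ends before the real branch bodies.
    ServiceType target = (serviceTypeAtInit == ServiceType.BLOB)
        ? ServiceType.DFS
        : ServiceType.BLOB;
    // INFO is printed by the default JUL configuration, so users see the switch.
    LOG.log(Level.INFO, "Auto-switching ingress handler from {0} to {1}",
        new Object[] {currentExecutingServiceType, target});
    currentExecutingServiceType = target;
    return true;
  }

  public static void main(String[] args) {
    IngressSwitchLogSketch stream = new IngressSwitchLogSketch(ServiceType.DFS);
    stream.switchHandler();
    System.out.println(stream.getCurrentExecutingServiceType());
  }
}
```

Logging both the previous and the new type in one message keeps the event self-explanatory when read in isolation.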
> ABFS: [FnsOverBlob] Implementing Ingress Support with various Fallback
> Handling
> -------------------------------------------------------------------------------
>
> Key: HADOOP-19232
> URL: https://issues.apache.org/jira/browse/HADOOP-19232
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/azure
> Affects Versions: 3.4.0
> Reporter: Anuj Modi
> Assignee: Anmol Asrani
> Priority: Major
> Labels: pull-request-available
>
> The scope of this task is to refactor the AbfsOutputStream class to handle
> ingress for the DFS and Blob endpoints effectively.
> More details will be added soon.
> Prerequisites for this patch:
> 1. [HADOOP-19187] ABFS: [FnsOverBlob] Making AbfsClient Abstract for
> supporting both DFS and Blob Endpoint
> 2. [HADOOP-19226] ABFS: [FnsOverBlob] Implementing Azure Rest APIs on Blob
> Endpoint for AbfsBlobClient
> 3. [HADOOP-19207] ABFS: [FnsOverBlob] Response Handling of Blob Endpoint APIs
> and Metadata APIs