anmolanmol1234 commented on code in PR #7272: URL: https://github.com/apache/hadoop/pull/7272#discussion_r1935007959
########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java: ########## @@ -177,12 +195,181 @@ public AbfsOutputStream(AbfsOutputStreamContext abfsOutputStreamContext) this.tracingContext.setOperation(FSOperationType.WRITE); this.ioStatistics = outputStreamStatistics.getIOStatistics(); this.blockFactory = abfsOutputStreamContext.getBlockFactory(); - this.blockSize = bufferSize; - // create that first block. This guarantees that an open + close sequence - // writes a 0-byte entry. - createBlockIfNeeded(); + this.isDFSToBlobFallbackEnabled + = abfsOutputStreamContext.isDFSToBlobFallbackEnabled(); + this.serviceTypeAtInit = abfsOutputStreamContext.getIngressServiceType(); + this.currentExecutingServiceType = abfsOutputStreamContext.getIngressServiceType(); + this.clientHandler = abfsOutputStreamContext.getClientHandler(); + createIngressHandler(serviceTypeAtInit, + abfsOutputStreamContext.getBlockFactory(), bufferSize, false, null); + } + + /** + * Retrieves the current ingress handler. + * + * @return the current {@link AzureIngressHandler}. + */ + public AzureIngressHandler getIngressHandler() { + return ingressHandler; + } + + private final Lock lock = new ReentrantLock(); + + private volatile boolean switchCompleted = false; + + /** + * Creates or retrieves an existing Azure ingress handler based on the service type and provided parameters. + * <p> + * If the `ingressHandler` is already initialized and the switch operation is complete, the existing + * handler is returned without acquiring a lock to minimize performance overhead. + * If the `ingressHandler` is initialized but the switch is incomplete, a lock is acquired to safely modify + * or create a new handler. Double-checked locking is used to ensure thread safety while minimizing the + * time spent in the critical section. + * If the `ingressHandler` is `null`, the handler is safely initialized outside of the lock as no other + * thread would be modifying it. + * </p> + * + * @param serviceType The type of Azure service to handle (e.g., ABFS, Blob, etc.). + * @param blockFactory The factory to create data blocks used in the handler. + * @param bufferSize The buffer size used by the handler for data processing. + * @param isSwitch A flag indicating whether a switch operation is in progress. + * @param blockManager The manager responsible for handling blocks of data during processing. + * + * @return The initialized or existing Azure ingress handler. + * @throws IOException If an I/O error occurs during handler creation or data processing. + */ + private AzureIngressHandler createIngressHandler(AbfsServiceType serviceType, + DataBlocks.BlockFactory blockFactory, + int bufferSize, boolean isSwitch, AzureBlockManager blockManager) throws IOException { + if (ingressHandler != null) { + if (switchCompleted) { + return ingressHandler; // Return the handler if it's already initialized and the switch is completed + } + // If the switch is incomplete, lock to safely modify + lock.lock(); + try { + // Double-check the condition after acquiring the lock + if (switchCompleted) { + return ingressHandler; // Return the handler if it's now completed + } + // If the switch is still incomplete, create a new handler + return createNewHandler(serviceType, blockFactory, bufferSize, isSwitch, blockManager); + } finally { + lock.unlock(); + } + } + // If ingressHandler is null, no lock is needed; safely initialize it outside the lock + return createNewHandler(serviceType, blockFactory, bufferSize, isSwitch, blockManager); } + // Helper method to create a new handler, used in both scenarios (locked and unlocked) + private AzureIngressHandler createNewHandler(AbfsServiceType serviceType, + DataBlocks.BlockFactory blockFactory, + int bufferSize, + boolean isSwitch, + AzureBlockManager blockManager) throws IOException { + this.client = clientHandler.getClient(serviceType); + if (isDFSToBlobFallbackEnabled && serviceTypeAtInit != AbfsServiceType.DFS) { + throw new InvalidConfigurationValueException( + "The ingress service type must be configured as DFS"); + } + if (isDFSToBlobFallbackEnabled && !isSwitch) { + ingressHandler = new AzureDfsToBlobIngressFallbackHandler(this, + blockFactory, bufferSize, eTag, clientHandler); + } else if (serviceType == AbfsServiceType.BLOB) { + ingressHandler = new AzureBlobIngressHandler(this, blockFactory, + bufferSize, eTag, clientHandler, blockManager); + } else { + ingressHandler = new AzureDFSIngressHandler(this, blockFactory, + bufferSize, eTag, clientHandler); + } + if (isSwitch) { + switchCompleted = true; + } + return ingressHandler; + } + + /** + * Switches the current ingress handler and service type if necessary. + * + * @throws IOException if there is an error creating the new ingress handler. + */ + protected void switchHandler() throws IOException { + if (serviceTypeAtInit != currentExecutingServiceType) { + return; Review Comment: taken ########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java: ########## @@ -177,12 +195,181 @@ public AbfsOutputStream(AbfsOutputStreamContext abfsOutputStreamContext) this.tracingContext.setOperation(FSOperationType.WRITE); this.ioStatistics = outputStreamStatistics.getIOStatistics(); this.blockFactory = abfsOutputStreamContext.getBlockFactory(); - this.blockSize = bufferSize; - // create that first block. This guarantees that an open + close sequence - // writes a 0-byte entry. - createBlockIfNeeded(); + this.isDFSToBlobFallbackEnabled + = abfsOutputStreamContext.isDFSToBlobFallbackEnabled(); + this.serviceTypeAtInit = abfsOutputStreamContext.getIngressServiceType(); + this.currentExecutingServiceType = abfsOutputStreamContext.getIngressServiceType(); + this.clientHandler = abfsOutputStreamContext.getClientHandler(); + createIngressHandler(serviceTypeAtInit, + abfsOutputStreamContext.getBlockFactory(), bufferSize, false, null); + } + + /** + * Retrieves the current ingress handler. + * + * @return the current {@link AzureIngressHandler}. + */ + public AzureIngressHandler getIngressHandler() { + return ingressHandler; + } + + private final Lock lock = new ReentrantLock(); + + private volatile boolean switchCompleted = false; + + /** + * Creates or retrieves an existing Azure ingress handler based on the service type and provided parameters. + * <p> + * If the `ingressHandler` is already initialized and the switch operation is complete, the existing + * handler is returned without acquiring a lock to minimize performance overhead. + * If the `ingressHandler` is initialized but the switch is incomplete, a lock is acquired to safely modify + * or create a new handler. Double-checked locking is used to ensure thread safety while minimizing the + * time spent in the critical section. + * If the `ingressHandler` is `null`, the handler is safely initialized outside of the lock as no other + * thread would be modifying it. + * </p> + * + * @param serviceType The type of Azure service to handle (e.g., ABFS, Blob, etc.). + * @param blockFactory The factory to create data blocks used in the handler. + * @param bufferSize The buffer size used by the handler for data processing. + * @param isSwitch A flag indicating whether a switch operation is in progress. + * @param blockManager The manager responsible for handling blocks of data during processing. + * + * @return The initialized or existing Azure ingress handler. + * @throws IOException If an I/O error occurs during handler creation or data processing. + */ + private AzureIngressHandler createIngressHandler(AbfsServiceType serviceType, + DataBlocks.BlockFactory blockFactory, + int bufferSize, boolean isSwitch, AzureBlockManager blockManager) throws IOException { + if (ingressHandler != null) { + if (switchCompleted) { + return ingressHandler; // Return the handler if it's already initialized and the switch is completed + } + // If the switch is incomplete, lock to safely modify + lock.lock(); + try { + // Double-check the condition after acquiring the lock + if (switchCompleted) { + return ingressHandler; // Return the handler if it's now completed + } + // If the switch is still incomplete, create a new handler + return createNewHandler(serviceType, blockFactory, bufferSize, isSwitch, blockManager); + } finally { + lock.unlock(); + } + } + // If ingressHandler is null, no lock is needed; safely initialize it outside the lock + return createNewHandler(serviceType, blockFactory, bufferSize, isSwitch, blockManager); } + // Helper method to create a new handler, used in both scenarios (locked and unlocked) + private AzureIngressHandler createNewHandler(AbfsServiceType serviceType, + DataBlocks.BlockFactory blockFactory, + int bufferSize, + boolean isSwitch, + AzureBlockManager blockManager) throws IOException { + this.client = clientHandler.getClient(serviceType); + if (isDFSToBlobFallbackEnabled && serviceTypeAtInit != AbfsServiceType.DFS) { + throw new InvalidConfigurationValueException( + "The ingress service type must be configured as DFS"); + } + if (isDFSToBlobFallbackEnabled && !isSwitch) { + ingressHandler = new AzureDfsToBlobIngressFallbackHandler(this, + blockFactory, bufferSize, eTag, clientHandler); + } else if (serviceType == AbfsServiceType.BLOB) { + ingressHandler = new AzureBlobIngressHandler(this, blockFactory, + bufferSize, eTag, clientHandler, blockManager); + } else { + ingressHandler = new AzureDFSIngressHandler(this, blockFactory, + bufferSize, eTag, clientHandler); + } + if (isSwitch) { + switchCompleted = true; + } + return ingressHandler; + } + + /** + * Switches the current ingress handler and service type if necessary. + * + * @throws IOException if there is an error creating the new ingress handler. + */ + protected void switchHandler() throws IOException { + if (serviceTypeAtInit != currentExecutingServiceType) { + return; + } + if (serviceTypeAtInit == AbfsServiceType.BLOB) { Review Comment: taken -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-issues-h...@hadoop.apache.org