[
https://issues.apache.org/jira/browse/HADOOP-19232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17922162#comment-17922162
]
ASF GitHub Bot commented on HADOOP-19232:
-----------------------------------------
anmolanmol1234 commented on code in PR #7272:
URL: https://github.com/apache/hadoop/pull/7272#discussion_r1934323725
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java:
##########
@@ -338,9 +531,14 @@ private void uploadBlockAsync(DataBlocks.DataBlock blockToUpload,
         */
        AppendRequestParameters reqParams = new AppendRequestParameters(
            offset, 0, bytesLength, mode, false, leaseId, isExpectHeaderEnabled);
-       AbfsRestOperation op = getClient().append(path,
-           blockUploadData.toByteArray(), reqParams, cachedSasToken.get(),
-           contextEncryptionAdapter, new TracingContext(tracingContext));
+       AbfsRestOperation op;
+       try {
+         op = remoteWrite(blockToUpload, blockUploadData, reqParams, tracingContext);
+       } catch (InvalidIngressServiceException ex) {
+         switchHandler();
Review Comment:
This fallback mechanism ensures compatibility across endpoints. For example, if
a file is created over the DFS endpoint and later updated over the Blob
endpoint, the append would fail with an InvalidIngressServiceException.
Deterministic switching ensures that when this exception is thrown, the system
automatically switches to the correct endpoint and retries.
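The deterministic-switch fallback the reviewer describes can be sketched in isolation as follows. This is an illustrative stand-in, not the actual AbfsOutputStream code: the class, enum, and method bodies here are hypothetical, and only the exception-driven switch-and-retry pattern mirrors the diff above.

```java
// Hypothetical sketch of the deterministic handler-switch fallback;
// names are illustrative, not the real ABFS ingress handler API.
public class IngressFallbackSketch {

    enum ServiceType { DFS, BLOB }

    static class InvalidIngressServiceException extends RuntimeException { }

    // The currently selected ingress endpoint.
    ServiceType current = ServiceType.DFS;

    // Simulated remote write: fails when the block lives on the other endpoint.
    void remoteWrite(ServiceType blockEndpoint) {
        if (blockEndpoint != current) {
            throw new InvalidIngressServiceException();
        }
    }

    // Deterministic switch: with exactly two endpoints, the exception
    // uniquely identifies the other one, so flip and retry once.
    void writeWithFallback(ServiceType blockEndpoint) {
        try {
            remoteWrite(blockEndpoint);
        } catch (InvalidIngressServiceException ex) {
            current = (current == ServiceType.DFS)
                ? ServiceType.BLOB : ServiceType.DFS;
            remoteWrite(blockEndpoint);  // retry with the switched handler
        }
    }

    public static void main(String[] args) {
        IngressFallbackSketch s = new IngressFallbackSketch();
        // File was created over Blob while the handler points at DFS:
        // the first write fails, the handler switches, the retry succeeds.
        s.writeWithFallback(ServiceType.BLOB);
        System.out.println(s.current);
    }
}
```

The switch is "deterministic" in the sense that no probing is needed: with only two endpoints, the failure itself tells the stream which handler to use next.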
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java:
##########
@@ -631,45 +805,89 @@ private synchronized void waitForAppendsToComplete() throws IOException {
     }
   }
+  /**
+   * Flushes the written bytes to the Azure Blob Storage service, ensuring all
+   * appends are completed. This method is typically called during a close operation.
+   *
+   * @param isClose indicates whether this flush is happening as part of a close operation.
+   * @throws IOException if an I/O error occurs during the flush operation.
+   */
   private synchronized void flushWrittenBytesToService(boolean isClose) throws IOException {
+    // Ensure all appends are completed before flushing.
     waitForAppendsToComplete();
+    // Flush the written bytes to the service.
     flushWrittenBytesToServiceInternal(position, false, isClose);
   }
+  /**
+   * Asynchronously flushes the written bytes to the Azure Blob Storage service.
+   * This method manages the write operation queue and only flushes
+   * if there is uncommitted data beyond the last flush offset.
+   *
+   * @throws IOException if an I/O error occurs during the flush operation.
+   */
   private synchronized void flushWrittenBytesToServiceAsync() throws IOException {
+    // Manage the write operation queue to ensure efficient writes.
     shrinkWriteOperationQueue();
+    // Only flush if there is uncommitted data beyond the last flush offset.
     if (this.lastTotalAppendOffset > this.lastFlushOffset) {
       this.flushWrittenBytesToServiceInternal(this.lastTotalAppendOffset, true,
           false/*Async flush on close not permitted*/);
     }
   }
+  /**
+   * Flushes the written bytes to the Azure Blob Storage service.
+   *
+   * @param offset               the offset up to which data needs to be flushed.
+   * @param retainUncommitedData whether to retain uncommitted data after flush.
+   * @param isClose              whether this flush is happening as part of a close operation.
+   * @throws IOException if an I/O error occurs.
+   */
   private synchronized void flushWrittenBytesToServiceInternal(final long offset,
       final boolean retainUncommitedData, final boolean isClose) throws IOException {
     // flush is called for appendblob only on close
     if (this.isAppendBlob && !isClose) {
       return;
     }
+    // Tracker to monitor performance metrics.
     AbfsPerfTracker tracker = client.getAbfsPerfTracker();
     try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
         "flushWrittenBytesToServiceInternal", "flush")) {
-      AbfsRestOperation op = getClient().flush(path, offset, retainUncommitedData,
-          isClose, cachedSasToken.get(), leaseId, contextEncryptionAdapter,
-          new TracingContext(tracingContext));
-      cachedSasToken.update(op.getSasToken());
-      perfInfo.registerResult(op.getResult()).registerSuccess(true);
-    } catch (AzureBlobFileSystemException ex) {
-      if (ex instanceof AbfsRestOperationException) {
-        if (((AbfsRestOperationException) ex).getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
+      AbfsRestOperation op;
+      try {
+        // Attempt to flush data to the remote service.
+        op = remoteFlush(offset, retainUncommitedData, isClose, leaseId, tracingContext);
+      } catch (InvalidIngressServiceException ex) {
+        // If an invalid ingress service is encountered, switch handler and retry.
+        switchHandler();
Review Comment:
addressed above and added log message
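The async-flush bookkeeping in the hunk above (only flush when `lastTotalAppendOffset` has advanced past `lastFlushOffset`) can be illustrated with a minimal, self-contained sketch. The field names mirror the diff, but the class itself is hypothetical and not part of Hadoop:

```java
// Minimal illustrative sketch of the uncommitted-data check used by
// flushWrittenBytesToServiceAsync(); not the actual AbfsOutputStream code.
public class FlushOffsetSketch {
    long lastTotalAppendOffset = 0;  // total bytes handed to append calls
    long lastFlushOffset = 0;        // offset up to which data was flushed
    int flushCount = 0;              // number of real flushes issued

    // Record that 'bytes' more bytes have been appended.
    void append(int bytes) {
        lastTotalAppendOffset += bytes;
    }

    // Flush only when appends have advanced past the last flush offset,
    // so back-to-back flushes with no new data are no-ops.
    void flushAsync() {
        if (lastTotalAppendOffset > lastFlushOffset) {
            flushCount++;
            lastFlushOffset = lastTotalAppendOffset;
        }
    }
}
```

The guard keeps redundant round trips off the wire: a second `flushAsync()` with no intervening `append()` does nothing, which is exactly the behavior the Javadoc in the diff describes.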
> ABFS: [FnsOverBlob] Implementing Ingress Support with various Fallback
> Handling
> -------------------------------------------------------------------------------
>
> Key: HADOOP-19232
> URL: https://issues.apache.org/jira/browse/HADOOP-19232
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/azure
> Affects Versions: 3.4.0
> Reporter: Anuj Modi
> Assignee: Anmol Asrani
> Priority: Major
> Labels: pull-request-available
>
> Scope of this task is to refactor the AbfsOutputStream class to handle the
> ingress for DFS and Blob endpoint effectively.
> More details will be added soon.
> Prerequisites for this Patch:
> 1. [HADOOP-19187] ABFS: [FnsOverBlob]Making AbfsClient Abstract for
> supporting both DFS and Blob Endpoint - ASF JIRA (apache.org)
> 2. [HADOOP-19226] ABFS: [FnsOverBlob]Implementing Azure Rest APIs on Blob
> Endpoint for AbfsBlobClient - ASF JIRA (apache.org)
> 3. [HADOOP-19207] ABFS: [FnsOverBlob]Response Handling of Blob Endpoint APIs
> and Metadata APIs - ASF JIRA (apache.org)