From Ritik Raj <[email protected]>:

Ritik Raj has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20565?usp=email )


Change subject: [NO ISSUE][CLOUD] Fix premature buffer release caused by flatMap cancel
......................................................................

[NO ISSUE][CLOUD] Fix premature buffer release caused by flatMap cancel

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
The Azure SDK uses Reactor/Netty for async downloads, where each
download is a Mono and the Monos are merged with flatMap. When one
Mono fails, flatMap cancels the others, so Azure releases their
buffers while Netty may still be writing into them, which leads to
IllegalReferenceCountException. Switching to flatMapDelayError defers
error propagation until the remaining downloads have terminated,
which prevents the premature cancellation.
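
The "report the error only after every download has settled" policy
described above can be illustrated without Reactor. The sketch below is
not the AsterixDB code and does not use the Azure SDK: it swaps in
java.util.concurrent.CompletableFuture, and the class name
DelayErrorSketch and its methods are hypothetical. It relies on
CompletableFuture.allOf, which completes only once all of its inputs
have completed, so a slow task is never abandoned mid-write when a
sibling fails.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in, not AsterixDB code: CompletableFuture replaces Reactor here.
class DelayErrorSketch {

    // Blocks until every task has settled, then rethrows a failure if any task failed.
    // CompletableFuture.allOf completes only after all of its inputs have completed.
    static void awaitAllThenFail(List<CompletableFuture<Void>> tasks) {
        CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])).join();
    }

    // Returns how many simulated downloads had finished when the failure was reported.
    static int runDemo() {
        AtomicInteger finished = new AtomicInteger();

        // Fails immediately, like a download that hits an error early.
        CompletableFuture<Void> failing = CompletableFuture.runAsync(() -> {
            throw new IllegalStateException("simulated download failure");
        });

        // Still busy (think: still filling its buffer) when the other task fails.
        CompletableFuture<Void> slow = CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            finished.incrementAndGet();
        });

        try {
            awaitAllThenFail(List.of(failing, slow));
            throw new AssertionError("expected the failure to propagate");
        } catch (CompletionException expected) {
            // The failure surfaces only after the slow task has settled.
        }
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println("downloads finished before error surfaced: " + runDemo());
    }
}
```

In this sketch the slow task always finishes before the caller sees the
exception, which is the property the change aims for; the actual fix
achieves it through Reactor's flatMapDelayError rather than through
CompletableFuture.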

Change-Id: If0c3577225a0dffffadfab13321a9c8702c3551e
---
M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
1 file changed, 4 insertions(+), 3 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/65/20565/1

diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
index 25bc217..8310ce7 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
@@ -97,8 +97,8 @@
     }

     private void waitForFileDownloads(List<Mono<Void>> downloads) throws HyracksDataException {
-        runBlockingWithExceptionHandling(
-                () -> Flux.fromIterable(downloads).flatMap(mono -> mono, downloads.size()).then().block());
+        runBlockingWithExceptionHandling(() -> Flux.fromIterable(downloads)
+                .flatMapDelayError(mono -> mono, downloads.size(), downloads.size()).then().block());
     }

     @Override
@@ -113,8 +113,9 @@
             directoryDownloads.add(directoryTask);
         }

+        int concurrency = config.getRequestsMaxPendingHttpConnections();
         runBlockingWithExceptionHandling(() -> Flux.fromIterable(directoryDownloads)
-                .flatMap(mono -> mono, config.getRequestsMaxPendingHttpConnections()).then().block());
+                .flatMapDelayError(mono -> mono, concurrency, concurrency).then().block());

         return failedFiles;
     }

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20565?usp=email
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings?usp=email

Gerrit-MessageType: newchange
Gerrit-Project: asterixdb
Gerrit-Branch: phoenix
Gerrit-Change-Id: If0c3577225a0dffffadfab13321a9c8702c3551e
Gerrit-Change-Number: 20565
Gerrit-PatchSet: 1
Gerrit-Owner: Ritik Raj <[email protected]>
