>From Ritik Raj <[email protected]>: Ritik Raj has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20565?usp=email )
Change subject: [NO ISSUE][CLOUD] Fix premature buffer release caused by flatMap cancel
......................................................................

[NO ISSUE][CLOUD] Fix premature buffer release caused by flatMap cancel

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
Azure SDK uses Reactor/Netty for async downloads, where each download
is a Mono merged with flatMap. When one Mono fails, flatMap cancels
others, causing Azure to release buffers while Netty may still write
into them, leading to IllegalReferenceCountException. Switching to
flatMapDelayError defers error propagation and prevents premature
cancellation, allowing all downloads to complete safely.

Ext-ref: MB-69283
Change-Id: If0c3577225a0dffffadfab13321a9c8702c3551e
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20565
Tested-by: Ritik Raj <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
---
M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
A asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/ReactiveExponentialRetryPolicy.java
M hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java
3 files changed, 112 insertions(+), 3 deletions(-)

Approvals:
  Jenkins: Verified
  Murtadha Hubail: Looks good to me, approved
  Ritik Raj: Verified

diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
index 12d8a19..57f044d 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java @@ -35,6 +35,7 @@ import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.api.util.ExceptionUtils; import org.apache.hyracks.control.nc.io.IOManager; +import org.apache.hyracks.util.ExponentialRetryPolicy; import com.azure.storage.blob.BlobAsyncClient; import com.azure.storage.blob.BlobContainerAsyncClient; @@ -43,12 +44,14 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; public class AzureParallelDownloader extends AbstractParallelDownloader { private final IOManager ioManager; private final BlobContainerAsyncClient blobContainerAsyncClient; private final IRequestProfilerLimiter profiler; private final AzBlobStorageClientConfig config; + private final Retry retryPolicy; public AzureParallelDownloader(IOManager ioManager, BlobContainerAsyncClient blobContainerAsyncClient, IRequestProfilerLimiter profiler, AzBlobStorageClientConfig config) { @@ -56,6 +59,7 @@ this.blobContainerAsyncClient = blobContainerAsyncClient; this.profiler = profiler; this.config = config; + this.retryPolicy = ReactiveExponentialRetryPolicy.retryPolicy(new ExponentialRetryPolicy()); } @Override @@ -96,8 +100,9 @@ } private void waitForFileDownloads(List<Mono<Void>> downloads) throws HyracksDataException { - runBlockingWithExceptionHandling( - () -> Flux.fromIterable(downloads).flatMap(mono -> mono, downloads.size()).then().block()); + runBlockingWithExceptionHandling(() -> Flux.fromIterable(downloads) + .flatMapDelayError(mono -> mono.retryWhen(retryPolicy), downloads.size(), downloads.size()).then() + .block()); } @Override @@ -111,8 +116,9 @@ directoryDownloads.add(directoryTask); } + int concurrency = config.getRequestsMaxPendingHttpConnections(); runBlockingWithExceptionHandling(() -> Flux.fromIterable(directoryDownloads) - .flatMap(mono -> mono, 
config.getRequestsMaxPendingHttpConnections()).then().block()); + .flatMapDelayError(mono -> mono.retryWhen(retryPolicy), concurrency, concurrency).then().block()); return failedFiles; } diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/ReactiveExponentialRetryPolicy.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/ReactiveExponentialRetryPolicy.java new file mode 100644 index 0000000..ba2e3d4 --- /dev/null +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/ReactiveExponentialRetryPolicy.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.cloud.clients.azure.blobstorage; + +import java.time.Duration; +import java.util.concurrent.ThreadLocalRandom; + +import org.apache.hyracks.api.util.ExceptionUtils; +import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil; +import org.apache.hyracks.util.ExponentialRetryPolicy; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import reactor.util.retry.Retry; + +/** + * Utility methods for building Reactor {@link Retry} policies that behave similarly to the + * blocking {@link ExponentialRetryPolicy}. + */ +public class ReactiveExponentialRetryPolicy { + + private static final Logger LOGGER = LogManager.getLogger(); + + private ReactiveExponentialRetryPolicy() { + } + + /** + * Creates a {@link Retry} instance based on an {@link ExponentialRetryPolicy}. + * + * @param policy the blocking policy to mirror. If {@code null}, defaults are used. + * @return a Reactor {@link Retry} behaving similarly to the provided policy. + */ + public static Retry retryPolicy(ExponentialRetryPolicy policy) { + ExponentialRetryPolicy effectivePolicy = policy != null ? 
policy + : new ExponentialRetryPolicy(CloudRetryableRequestUtil.NUMBER_OF_RETRIES, + CloudRetryableRequestUtil.MAX_DELAY_BETWEEN_RETRIES); + long initialDelay = Math.max(0L, effectivePolicy.getInitialDelay()); + long maxDelay = Math.max(0L, effectivePolicy.getMaxDelay()); + int maxRetries = Math.max(0, effectivePolicy.getMaxRetries()); + long maxAttempts = Math.max(1L, (long) maxRetries + 1L); + return Retry.backoff(maxAttempts, Duration.ofMillis(initialDelay)).maxBackoff(Duration.ofMillis(maxDelay)) + .filter(ReactiveExponentialRetryPolicy::isRetryable).doBeforeRetry(signal -> { + long retriesSoFar = signal.totalRetries(); + long delayMillis = computeDelayMillis(initialDelay, maxDelay, retriesSoFar); + long attempt = retriesSoFar + 1; + LOGGER.info("Retrying after {}ms, attempt {}/{}", delayMillis, attempt, maxRetries); + }).transientErrors(true); + } + + private static long computeDelayMillis(long initialDelay, long maxDelay, long retriesSoFar) { + if (initialDelay <= 0 || maxDelay <= 0) { + return 0L; + } + + long delay = initialDelay; + for (long i = 0; i < retriesSoFar; i++) { + delay = delay > maxDelay / 2 ? 
maxDelay : delay * 2; + } + + long jitteredDelay = ThreadLocalRandom.current().nextLong(1, delay + 1); + + return Math.min(jitteredDelay, maxDelay); + } + + private static boolean isRetryable(Throwable error) { + if (error instanceof IllegalArgumentException) { + return false; + } + if (ExceptionUtils.causedByInterrupt(error)) { + Thread.currentThread().interrupt(); + return false; + } + return true; + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java index 8967e3e..385525c 100644 --- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java +++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java @@ -101,6 +101,18 @@ return false; } + public int getMaxRetries() { + return maxRetries; + } + + public long getInitialDelay() { + return delay; + } + + public long getMaxDelay() { + return maxDelay; + } + private static boolean isUnstable() { return Boolean.getBoolean(CLOUD_UNSTABLE_MODE); } -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20565?usp=email To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings?usp=email Gerrit-MessageType: merged Gerrit-Project: asterixdb Gerrit-Branch: phoenix Gerrit-Change-Id: If0c3577225a0dffffadfab13321a9c8702c3551e Gerrit-Change-Number: 20565 Gerrit-PatchSet: 4 Gerrit-Owner: Ritik Raj <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Ritik Raj <[email protected]>
