From Michael Blow <[email protected]>:

Michael Blow has uploaded this change for review. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21154?usp=email )


Change subject: [NO ISSUE][*DB][STO] Parallel cloud downloader lifecycle 
improvements
......................................................................

[NO ISSUE][*DB][STO] Parallel cloud downloader lifecycle improvements

- At most one downloader per client; closed on last use
- Enable update of cloud config on active downloader

Change-Id: Ibb284aa77f648afece530bcff4cc5eece693afd7
---
A 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/IS3Downloader.java
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3SyncDownloader.java
4 files changed, 231 insertions(+), 26 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/54/21154/1

diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/IS3Downloader.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/IS3Downloader.java
new file mode 100644
index 0000000..a61787d
--- /dev/null
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/IS3Downloader.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.clients.aws.s3;
+
+import org.apache.asterix.cloud.clients.IParallelDownloader;
+import org.apache.hyracks.util.annotations.AiProvenance;
+
+/**
+ * Package-private extension of {@link IParallelDownloader} that adds 
configuration reload support
+ * for S3-specific downloader implementations.
+ */
+@AiProvenance(agent = AiProvenance.Agent.CLAUDE_SONNET_4_6, tool = 
AiProvenance.Tool.GITHUB_COPILOT, contributionKind = 
AiProvenance.ContributionKind.GENERATED)
+interface IS3Downloader extends IParallelDownloader {
+    /**
+     * Reloads the downloader's internal S3 client(s) with the given new 
configuration.
+     * The old client(s) are closed as soon as the new ones are swapped in; 
in-flight downloads are not yet awaited (see the TODOs in the implementations).
+     */
+    void reloadConfiguration(S3ClientConfig newConfig);
+
+    /**
+     * Returns the client type this downloader was created for.
+     * Used by {@link S3CloudClient} to detect a type change on reload.
+     */
+    S3ClientConfig.S3ParallelDownloaderClientType getClientType();
+}
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
index c46f365..5e3e6ef 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
@@ -34,6 +34,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import java.util.function.Predicate;

@@ -57,6 +58,7 @@
 import org.apache.hyracks.api.util.IoUtil;
 import org.apache.hyracks.cloud.io.ICloudProperties;
 import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.util.annotations.AiProvenance;
 import org.apache.hyracks.util.annotations.ThreadSafe;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -93,12 +95,15 @@
 @ThreadSafe
 public final class S3CloudClient implements ICloudClient {
     private static final Logger LOGGER = LogManager.getLogger();
-    private final S3ClientConfig config;
+    private volatile S3ClientConfig config;
     private volatile CloseableAwsClients awsClients;
     private volatile S3Client s3Client;
     private final ICloudGuardian guardian;
     private final IRequestProfilerLimiter profiler;
     private final int writeBufferSize;
+    // Lazily created on demand, ref-counted — closed when the last consumer 
calls close().
+    // Initialized to a drained sentinel so createParallelDownloader never 
needs a null check.
+    private volatile RefCountedDownloader cachedHolder = 
RefCountedDownloader.DRAINED_SENTINEL;

     public S3CloudClient(S3ClientConfig config, ICloudGuardian guardian) {
         this(config, buildClient(config), guardian);
@@ -330,10 +335,23 @@

     @Override
     public IParallelDownloader createParallelDownloader(String bucket, 
IOManager ioManager) {
-        S3ClientConfig.S3ParallelDownloaderClientType 
parallelDownloaderClientType = config.getParallelDownloaderClientType();
-        return switch (parallelDownloaderClientType) {
-            case CRT, ASYNC -> new S3ParallelDownloader(bucket, ioManager, 
config, profiler);
-            case SYNC -> new S3SyncDownloader(bucket, ioManager, config, 
profiler);
+        RefCountedDownloader holder = cachedHolder;
+        if (holder.isDrained()) {
+            synchronized (this) {
+                holder = cachedHolder;
+                if (holder.isDrained()) {
+                    holder = new RefCountedDownloader(buildDownloader(bucket, 
ioManager, config));
+                    cachedHolder = holder;
+                }
+            }
+        }
+        return holder.acquire();
+    }
+
+    private IS3Downloader buildDownloader(String bucket, IOManager ioManager, 
S3ClientConfig cfg) {
+        return switch (cfg.getParallelDownloaderClientType()) {
+            case CRT, ASYNC -> new S3ParallelDownloader(bucket, ioManager, 
cfg, profiler);
+            case SYNC -> new S3SyncDownloader(bucket, ioManager, cfg, 
profiler);
         };
     }

@@ -354,6 +372,7 @@

     @Override
     public void close() {
+        cachedHolder.forceClose();
         AwsUtils.closeClients(awsClients);
     }

@@ -364,6 +383,17 @@
         CloseableAwsClients oldAwsClients = awsClients;
         awsClients = newAwsClients;
         s3Client = (S3Client) newAwsClients.getConsumingClient();
+        config = newConfig;
+        RefCountedDownloader existing = cachedHolder;
+        if (!existing.isDrained()) {
+            if (existing.getClientType() != 
newConfig.getParallelDownloaderClientType()) {
+                // Client type changed — vacate the slot; active consumers 
drain and close it naturally,
+                // and the next createParallelDownloader will create a fresh 
instance of the correct type.
+                cachedHolder = RefCountedDownloader.DRAINED_SENTINEL;
+            } else {
+                existing.reloadConfiguration(newConfig);
+            }
+        }
         LOGGER.info("reloaded S3 client configuration");
         AwsUtils.closeClients(oldAwsClients);
     }
@@ -453,4 +483,94 @@
         }
         return cloudFiles;
     }
+
+    /**
+     * Wraps an {@link IS3Downloader} with a reference count. Each call to 
{@link #acquire()} returns
+     * an {@link IParallelDownloader} handle; when the last handle is {@code 
close()}d the underlying
+     * downloader is closed and this holder is marked drained. A new holder 
will be created on the
+     * next {@link #createParallelDownloader} call.
+     */
+    @AiProvenance(agent = AiProvenance.Agent.CLAUDE_SONNET_4_6, tool = 
AiProvenance.Tool.GITHUB_COPILOT, contributionKind = 
AiProvenance.ContributionKind.GENERATED)
+    private static class RefCountedDownloader {
+        private static final Logger LOGGER = LogManager.getLogger();
+
+        /** Sentinel instance that is always drained — used as the initial 
value of {@code cachedHolder}. */
+        static final RefCountedDownloader DRAINED_SENTINEL = new 
RefCountedDownloader(null) {
+            @Override
+            boolean isDrained() {
+                return true;
+            }
+
+            @Override
+            void forceClose() {
+                /* no-op */ }
+        };
+
+        private final IS3Downloader delegate;
+        private final AtomicInteger refCount = new AtomicInteger(0);
+        private volatile boolean drained = false;
+
+        RefCountedDownloader(IS3Downloader delegate) {
+            this.delegate = delegate;
+        }
+
+        /** Increments the ref-count and returns a handle whose {@code 
close()} decrements it. */
+        IParallelDownloader acquire() {
+            refCount.incrementAndGet();
+            java.util.concurrent.atomic.AtomicBoolean released = new 
java.util.concurrent.atomic.AtomicBoolean(false);
+            return new IParallelDownloader() {
+                @Override
+                public void downloadFiles(Collection<FileReference> 
toDownload) throws HyracksDataException {
+                    delegate.downloadFiles(toDownload);
+                }
+
+                @Override
+                public void 
downloadDirectoriesWithRetry(Collection<FileReference> toDownload)
+                        throws HyracksDataException {
+                    delegate.downloadDirectoriesWithRetry(toDownload);
+                }
+
+                @Override
+                public void close() throws HyracksDataException {
+                    if (released.compareAndSet(false, true) && 
refCount.decrementAndGet() == 0) {
+                        drain();
+                    }
+                }
+            };
+        }
+
+        boolean isDrained() {
+            return drained;
+        }
+
+        S3ClientConfig.S3ParallelDownloaderClientType getClientType() {
+            return delegate.getClientType();
+        }
+
+        void reloadConfiguration(S3ClientConfig newConfig) {
+            delegate.reloadConfiguration(newConfig);
+        }
+
+        /** Closes the underlying downloader immediately, regardless of the 
ref-count. Idempotent. */
+        void forceClose() {
+            if (drained) {
+                return;
+            }
+            drained = true;
+            try {
+                delegate.close();
+            } catch (HyracksDataException e) {
+                LOGGER.warn("error force-closing downloader", e);
+            }
+        }
+
+        private void drain() {
+            drained = true;
+            try {
+                delegate.close();
+            } catch (HyracksDataException e) {
+                LOGGER.warn("error closing downloader on last release", e);
+            }
+        }
+    }
 }
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
index a64e07b..fe836d5 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
@@ -65,25 +65,52 @@
  * Parallel S3 downloader with optional custom DNS resolution.
  */
 @ThreadSafe
-class S3ParallelDownloader extends AbstractParallelDownloader {
+class S3ParallelDownloader extends AbstractParallelDownloader implements 
IS3Downloader {
     private static final Logger LOGGER = LogManager.getLogger();
     private final String bucket;
     private final IOManager ioManager;
-    private final S3AsyncClient s3AsyncClient;
-    private final S3TransferManager transferManager;
-    private final S3ClientConfig config;
     private final IRequestProfilerLimiter profiler;
+    // Atomic holder for the async client and transfer manager, swapped 
together on reload
+    private volatile AsyncClientHolder holder;
     // Shared event loop group (reused across Netty client instances if 
multiple downloaders are created)
     private static final SdkEventLoopGroup SHARED_EVENT_LOOP = 
SdkEventLoopGroup.builder()
             .numberOfThreads(Math.max(2, 
Runtime.getRuntime().availableProcessors())).build();

+    /**
+     * Holds the S3AsyncClient and S3TransferManager together so they can be 
swapped atomically on reload.
+     */
+    private record AsyncClientHolder(S3AsyncClient asyncClient, 
S3TransferManager transferManager,
+            S3ClientConfig config) {
+        void close() {
+            transferManager.close();
+            asyncClient.close();
+        }
+    }
+
     S3ParallelDownloader(String bucket, IOManager ioManager, S3ClientConfig 
config, IRequestProfilerLimiter profiler) {
         this.bucket = bucket;
         this.ioManager = ioManager;
-        this.config = config;
         this.profiler = profiler;
-        this.s3AsyncClient = createAsyncClient(config);
-        this.transferManager = createS3TransferManager(s3AsyncClient);
+        this.holder = buildHolder(config);
+    }
+
+    @Override
+    public S3ClientConfig.S3ParallelDownloaderClientType getClientType() {
+        return holder.config().getParallelDownloaderClientType();
+    }
+
+    @Override
+    public void reloadConfiguration(S3ClientConfig newConfig) {
+        AsyncClientHolder oldHolder = holder;
+        holder = buildHolder(newConfig);
+        LOGGER.info("reloaded S3ParallelDownloader configuration");
+        // TODO: configurable delay before closing the old client to allow 
in-flight requests to complete.
+        oldHolder.close();
+    }
+
+    private static AsyncClientHolder buildHolder(S3ClientConfig config) {
+        S3AsyncClient asyncClient = createAsyncClient(config);
+        return new AsyncClientHolder(asyncClient, 
S3TransferManager.builder().s3Client(asyncClient).build(), config);
     }

     @Override
@@ -104,15 +131,15 @@

     @Override
     public void close() {
-        transferManager.close();
-        s3AsyncClient.close();
         // Do NOT close SHARED_EVENT_LOOP here (shared globally). Provide a 
separate shutdown hook if needed.
+        holder.close();
     }

     private void downloadFilesAndWait(Collection<FileReference> toDownload)
             throws IOException, ExecutionException, InterruptedException {
+        AsyncClientHolder h = holder;
         List<CompletableFuture<CompletedFileDownload>> downloads = new 
ArrayList<>();
-        int maxPending = config.getRequestsMaxPendingHttpConnections();
+        int maxPending = h.config().getRequestsMaxPendingHttpConnections();
         for (FileReference fileReference : toDownload) {
             // multipart download
             profiler.objectGet();
@@ -123,14 +150,14 @@
             // GetObjectRequest
             GetObjectRequest.Builder requestBuilder = 
GetObjectRequest.builder();
             requestBuilder.bucket(bucket);
-            requestBuilder.key(config.getPrefix() + 
fileReference.getRelativePath());
+            requestBuilder.key(h.config().getPrefix() + 
fileReference.getRelativePath());

             // Download object
             DownloadFileRequest.Builder builder = 
DownloadFileRequest.builder();
             builder.getObjectRequest(requestBuilder.build());
             builder.destination(fileReference.getFile());

-            FileDownload fileDownload = 
transferManager.downloadFile(builder.build());
+            FileDownload fileDownload = 
h.transferManager().downloadFile(builder.build());
             downloads.add(fileDownload.completionFuture());
             if (maxPending > 0 && downloads.size() >= maxPending) {
                 waitForFileDownloads(downloads);
@@ -151,14 +178,15 @@

     private List<CompletableFuture<CompletedDirectoryDownload>> 
startDownloadingDirectories(
             Collection<FileReference> toDownload) {
+        AsyncClientHolder h = holder;
         List<CompletableFuture<CompletedDirectoryDownload>> downloads = new 
ArrayList<>();
         for (FileReference fileReference : toDownload) {
             DownloadDirectoryRequest.Builder builder = 
DownloadDirectoryRequest.builder();
             builder.bucket(bucket);
             builder.destination(fileReference.getFile().toPath());
             builder.listObjectsV2RequestTransformer(
-                    l -> l.prefix(config.getPrefix() + 
fileReference.getRelativePath()));
-            DirectoryDownload directoryDownload = 
transferManager.downloadDirectory(builder.build());
+                    l -> l.prefix(h.config().getPrefix() + 
fileReference.getRelativePath()));
+            DirectoryDownload directoryDownload = 
h.transferManager().downloadDirectory(builder.build());
             downloads.add(directoryDownload.completionFuture());
         }
         return downloads;
@@ -264,8 +292,4 @@
         return builder.build();
     }

-    private S3TransferManager createS3TransferManager(S3AsyncClient 
s3AsyncClient) {
-        return S3TransferManager.builder().s3Client(s3AsyncClient).build();
-    }
-
 }
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3SyncDownloader.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3SyncDownloader.java
index 498c34b..dac4b97 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3SyncDownloader.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3SyncDownloader.java
@@ -50,13 +50,13 @@
 import software.amazon.awssdk.services.s3.model.S3Object;

 @ThreadSafe
-public class S3SyncDownloader extends AbstractParallelDownloader {
+public class S3SyncDownloader extends AbstractParallelDownloader implements 
IS3Downloader {
     private static final Logger LOGGER = LogManager.getLogger();

     private final String bucket;
     private final IOManager ioManager;
-    private final S3Client s3Client;
-    private final S3ClientConfig config;
+    private volatile S3Client s3Client;
+    private volatile S3ClientConfig config;
     private final IRequestProfilerLimiter profiler;
     private final ExecutorService executorService;

@@ -66,10 +66,27 @@
         this.config = config;
         this.profiler = profiler;
         this.s3Client = (S3Client) 
S3CloudClient.buildClient(config).getConsumingClient();
+        // TODO: we should use the service context's executor instead of 
creating a separate one. If/when we do this,
+        //       remove executorService.shutdown() from close() below
         this.executorService = Executors.newCachedThreadPool();
     }

     @Override
+    public S3ClientConfig.S3ParallelDownloaderClientType getClientType() {
+        return S3ClientConfig.S3ParallelDownloaderClientType.SYNC;
+    }
+
+    @Override
+    public void reloadConfiguration(S3ClientConfig newConfig) {
+        S3Client oldClient = s3Client;
+        s3Client = (S3Client) 
S3CloudClient.buildClient(newConfig).getConsumingClient();
+        config = newConfig;
+        LOGGER.info("reloaded S3SyncDownloader configuration");
+        // TODO: configurable delay before closing the old client to allow 
in-flight requests to complete.
+        oldClient.close();
+    }
+
+    @Override
     public void downloadFiles(Collection<FileReference> toDownload) throws 
HyracksDataException {
         try {
             downloadFilesAndWait(toDownload);
@@ -196,6 +213,9 @@

     @Override
     public void close() throws HyracksDataException {
+        // TODO: we should use the service context's executor instead of 
creating a separate one. If/when we do this,
+        //       remove this executorService.shutdown()
+        executorService.shutdown();
         s3Client.close();
     }
 }

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21154?usp=email
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings?usp=email

Gerrit-MessageType: newchange
Gerrit-Project: asterixdb
Gerrit-Branch: lumina
Gerrit-Change-Id: Ibb284aa77f648afece530bcff4cc5eece693afd7
Gerrit-Change-Number: 21154
Gerrit-PatchSet: 1
Gerrit-Owner: Michael Blow <[email protected]>

Reply via email to