>From Michael Blow <[email protected]>: Michael Blow has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21154?usp=email )
Change subject: [NO ISSUE][*DB][STO] Parallel cloud downloader lifecycle improvements ...................................................................... [NO ISSUE][*DB][STO] Parallel cloud downloader lifecycle improvements - At most one downloader per client- close on last use - Enable update of cloud config on active downloader Change-Id: Ibb284aa77f648afece530bcff4cc5eece693afd7 --- A asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/IS3Downloader.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3SyncDownloader.java 4 files changed, 231 insertions(+), 26 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/54/21154/1 diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/IS3Downloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/IS3Downloader.java new file mode 100644 index 0000000..a61787d --- /dev/null +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/IS3Downloader.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.cloud.clients.aws.s3; + +import org.apache.asterix.cloud.clients.IParallelDownloader; +import org.apache.hyracks.util.annotations.AiProvenance; + +/** + * Package-private extension of {@link IParallelDownloader} that adds configuration reload support + * for S3-specific downloader implementations. + */ +@AiProvenance(agent = AiProvenance.Agent.CLAUDE_SONNET_4_6, tool = AiProvenance.Tool.GITHUB_COPILOT, contributionKind = AiProvenance.ContributionKind.GENERATED) +interface IS3Downloader extends IParallelDownloader { + /** + * Reloads the downloader's internal S3 client(s) with the given new configuration. + * Current implementations close the old client(s) right after the swap, so in-flight downloads may fail. + */ + void reloadConfiguration(S3ClientConfig newConfig); + + /** + * Returns the client type this downloader was created for. + * Used by {@link S3CloudClient} to detect a type change on reload.
+ */ + S3ClientConfig.S3ParallelDownloaderClientType getClientType(); +} diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java index c46f365..5e3e6ef 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java @@ -34,6 +34,7 @@ import java.util.Iterator; import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.function.Predicate; @@ -57,6 +58,7 @@ import org.apache.hyracks.api.util.IoUtil; import org.apache.hyracks.cloud.io.ICloudProperties; import org.apache.hyracks.control.nc.io.IOManager; +import org.apache.hyracks.util.annotations.AiProvenance; import org.apache.hyracks.util.annotations.ThreadSafe; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -93,12 +95,15 @@ @ThreadSafe public final class S3CloudClient implements ICloudClient { private static final Logger LOGGER = LogManager.getLogger(); - private final S3ClientConfig config; + private volatile S3ClientConfig config; private volatile CloseableAwsClients awsClients; private volatile S3Client s3Client; private final ICloudGuardian guardian; private final IRequestProfilerLimiter profiler; private final int writeBufferSize; + // Lazily created on demand, ref-counted — closed when the last consumer calls close(). + // Initialized to a drained sentinel so createParallelDownloader never needs a null check.
+    private volatile RefCountedDownloader cachedHolder = RefCountedDownloader.DRAINED_SENTINEL;
 
     public S3CloudClient(S3ClientConfig config, ICloudGuardian guardian) {
         this(config, buildClient(config), guardian);
@@ -330,10 +335,29 @@
 
     @Override
     public IParallelDownloader createParallelDownloader(String bucket, IOManager ioManager) {
-        S3ClientConfig.S3ParallelDownloaderClientType parallelDownloaderClientType = config.getParallelDownloaderClientType();
-        return switch (parallelDownloaderClientType) {
-            case CRT, ASYNC -> new S3ParallelDownloader(bucket, ioManager, config, profiler);
-            case SYNC -> new S3SyncDownloader(bucket, ioManager, config, profiler);
+        // One downloader is shared by all callers until its last handle is closed; note that it keeps the
+        // bucket and ioManager of the call that created it.
+        IParallelDownloader handle = cachedHolder.tryAcquire();
+        if (handle == null) {
+            synchronized (this) {
+                // re-check under the lock: another thread may already have installed a live holder
+                handle = cachedHolder.tryAcquire();
+                if (handle == null) {
+                    RefCountedDownloader holder =
+                            new RefCountedDownloader(buildDownloader(bucket, ioManager, config));
+                    // acquire before publishing so another caller's acquire/close cannot drain it first
+                    handle = holder.tryAcquire();
+                    cachedHolder = holder;
+                }
+            }
+        }
+        return handle;
+    }
+
+    private IS3Downloader buildDownloader(String bucket, IOManager ioManager, S3ClientConfig cfg) {
+        return switch (cfg.getParallelDownloaderClientType()) {
+            case CRT, ASYNC -> new S3ParallelDownloader(bucket, ioManager, cfg, profiler);
+            case SYNC -> new S3SyncDownloader(bucket, ioManager, cfg, profiler);
         };
     }
 
@@ -354,6 +378,7 @@
 
     @Override
     public void close() {
+        cachedHolder.forceClose();
         AwsUtils.closeClients(awsClients);
     }
 
@@ -364,6 +389,21 @@
         CloseableAwsClients oldAwsClients = awsClients;
         awsClients = newAwsClients;
         s3Client = (S3Client) newAwsClients.getConsumingClient();
+        config = newConfig;
+        // Same lock as createParallelDownloader: otherwise a holder it built from the previous config could be
+        // published after the check below and never receive the new configuration.
+        synchronized (this) {
+            RefCountedDownloader existing = cachedHolder;
+            if (!existing.isDrained()) {
+                if (existing.getClientType() != newConfig.getParallelDownloaderClientType()) {
+                    // Client type changed — vacate the slot; active consumers drain and close it naturally,
+                    // and the next createParallelDownloader will create a fresh instance of the correct type.
+                    cachedHolder = RefCountedDownloader.DRAINED_SENTINEL;
+                } else {
+                    existing.reloadConfiguration(newConfig);
+                }
+            }
+        }
         LOGGER.info("reloaded S3 client configuration");
         AwsUtils.closeClients(oldAwsClients);
     }
@@ -453,4 +493,129 @@
             }
         return cloudFiles;
     }
+
+    /**
+     * Wraps an {@link IS3Downloader} with a reference count. Each successful {@link #tryAcquire()} returns an
+     * {@link IParallelDownloader} handle; when the last handle is {@code close()}d the underlying downloader is
+     * closed and this holder is permanently drained. A new holder will be created on the next
+     * {@link #createParallelDownloader} call.
+     * <p>
+     * The reference count and the drained state share one {@link AtomicInteger} ({@code DRAINED} once closed),
+     * so taking a reference and draining cannot interleave: no handle is ever handed out for a downloader that
+     * has been, or is concurrently being, closed.
+     */
+    @AiProvenance(agent = AiProvenance.Agent.CLAUDE_SONNET_4_6, tool = AiProvenance.Tool.GITHUB_COPILOT, contributionKind = AiProvenance.ContributionKind.GENERATED)
+    private static final class RefCountedDownloader {
+        private static final Logger LOGGER = LogManager.getLogger();
+        /** Terminal value of {@code refCount}: the delegate has been, or is being, closed. */
+        private static final int DRAINED = -1;
+
+        /** Sentinel instance that is always drained — used as the initial value of {@code cachedHolder}. */
+        static final RefCountedDownloader DRAINED_SENTINEL = new RefCountedDownloader(null, DRAINED);
+
+        private final IS3Downloader delegate;
+        // number of outstanding references, or DRAINED once the delegate has been closed
+        private final AtomicInteger refCount;
+
+        RefCountedDownloader(IS3Downloader delegate) {
+            this(delegate, 0);
+        }
+
+        private RefCountedDownloader(IS3Downloader delegate, int initialRefCount) {
+            this.delegate = delegate;
+            this.refCount = new AtomicInteger(initialRefCount);
+        }
+
+        /**
+         * Takes a reference and returns a handle whose {@code close()} gives it back, or returns {@code null} if
+         * this holder has already drained.
+         */
+        IParallelDownloader tryAcquire() {
+            if (!retain()) {
+                return null;
+            }
+            java.util.concurrent.atomic.AtomicBoolean released = new java.util.concurrent.atomic.AtomicBoolean(false);
+            return new IParallelDownloader() {
+                @Override
+                public void downloadFiles(Collection<FileReference> toDownload) throws HyracksDataException {
+                    delegate.downloadFiles(toDownload);
+                }
+
+                @Override
+                public void downloadDirectoriesWithRetry(Collection<FileReference> toDownload)
+                        throws HyracksDataException {
+                    delegate.downloadDirectoriesWithRetry(toDownload);
+                }
+
+                @Override
+                public void close() throws HyracksDataException {
+                    // only the first close() of a given handle gives back its reference
+                    if (released.compareAndSet(false, true)) {
+                        release();
+                    }
+                }
+            };
+        }
+
+        boolean isDrained() {
+            return refCount.get() == DRAINED;
+        }
+
+        S3ClientConfig.S3ParallelDownloaderClientType getClientType() {
+            return delegate.getClientType();
+        }
+
+        /**
+         * Reloads the delegate while holding a temporary reference, so it cannot be drained and closed while the
+         * reload builds replacement clients. Does nothing if this holder has already drained.
+         */
+        void reloadConfiguration(S3ClientConfig newConfig) {
+            if (!retain()) {
+                return;
+            }
+            try {
+                delegate.reloadConfiguration(newConfig);
+            } finally {
+                release();
+            }
+        }
+
+        /** Closes the underlying downloader immediately, regardless of the ref-count. Idempotent. */
+        void forceClose() {
+            if (refCount.getAndSet(DRAINED) != DRAINED) {
+                closeDelegate("error force-closing downloader");
+            }
+        }
+
+        /** Adds a reference unless this holder has drained; returns whether a reference was taken. */
+        private boolean retain() {
+            while (true) {
+                int current = refCount.get();
+                if (current == DRAINED) {
+                    return false;
+                }
+                if (refCount.compareAndSet(current, current + 1)) {
+                    return true;
+                }
+            }
+        }
+
+        /** Drops a reference; the release that takes the count to zero closes the delegate. */
+        private void release() {
+            // once DRAINED (e.g. after forceClose) the count stays there, so late releases never close twice
+            int remaining = refCount.updateAndGet(count -> count == DRAINED ? DRAINED : count - 1);
+            // the CAS fails only if another thread re-acquired (its release drains later) or force-closed meanwhile
+            if (remaining == 0 && refCount.compareAndSet(0, DRAINED)) {
+                closeDelegate("error closing downloader on last release");
+            }
+        }
+
+        private void closeDelegate(String failureMessage) {
+            try {
+                delegate.close();
+            } catch (HyracksDataException e) {
+                LOGGER.warn(failureMessage, e);
+            }
+        }
+    }
 }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
index a64e07b..fe836d5 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
@@ -65,25 +65,52 @@
   * Parallel S3 downloader with optional custom DNS resolution.
   */
 @ThreadSafe
-class S3ParallelDownloader extends AbstractParallelDownloader {
+class S3ParallelDownloader extends AbstractParallelDownloader implements IS3Downloader {
     private static final Logger LOGGER = LogManager.getLogger();
     private final String bucket;
     private final IOManager ioManager;
-    private final S3AsyncClient s3AsyncClient;
-    private final S3TransferManager transferManager;
-    private final S3ClientConfig config;
     private final IRequestProfilerLimiter profiler;
+    // Atomic holder for the async client and transfer manager, swapped together on reload
+    private volatile AsyncClientHolder holder;
     // Shared event loop group (reused across Netty client instances if multiple downloaders are created)
     private static final SdkEventLoopGroup SHARED_EVENT_LOOP = SdkEventLoopGroup.builder()
             .numberOfThreads(Math.max(2, Runtime.getRuntime().availableProcessors())).build();
 
+    /**
+     * Holds the S3AsyncClient and S3TransferManager together so they can be swapped atomically on reload.
+ */ + private record AsyncClientHolder(S3AsyncClient asyncClient, S3TransferManager transferManager, + S3ClientConfig config) { + void close() { + transferManager.close(); + asyncClient.close(); + } + } + S3ParallelDownloader(String bucket, IOManager ioManager, S3ClientConfig config, IRequestProfilerLimiter profiler) { this.bucket = bucket; this.ioManager = ioManager; - this.config = config; this.profiler = profiler; - this.s3AsyncClient = createAsyncClient(config); - this.transferManager = createS3TransferManager(s3AsyncClient); + this.holder = buildHolder(config); + } + + @Override + public S3ClientConfig.S3ParallelDownloaderClientType getClientType() { + return holder.config().getParallelDownloaderClientType(); + } + + @Override + public void reloadConfiguration(S3ClientConfig newConfig) { + AsyncClientHolder oldHolder = holder; + holder = buildHolder(newConfig); + LOGGER.info("reloaded S3ParallelDownloader configuration"); + // TODO: configurable delay before closing the old client to allow in-flight requests to complete. NOTE(review): not synchronized with close(); a reload racing with close() can install a holder that is never closed. + oldHolder.close(); + } + + private static AsyncClientHolder buildHolder(S3ClientConfig config) { + S3AsyncClient asyncClient = createAsyncClient(config); + return new AsyncClientHolder(asyncClient, S3TransferManager.builder().s3Client(asyncClient).build(), config); } @Override @@ -104,15 +131,15 @@ @Override public void close() { - transferManager.close(); - s3AsyncClient.close(); // Do NOT close SHARED_EVENT_LOOP here (shared globally). Provide a separate shutdown hook if needed.
+ holder.close(); } private void downloadFilesAndWait(Collection<FileReference> toDownload) throws IOException, ExecutionException, InterruptedException { + AsyncClientHolder h = holder; List<CompletableFuture<CompletedFileDownload>> downloads = new ArrayList<>(); - int maxPending = config.getRequestsMaxPendingHttpConnections(); + int maxPending = h.config().getRequestsMaxPendingHttpConnections(); for (FileReference fileReference : toDownload) { // multipart download profiler.objectGet(); @@ -123,14 +150,14 @@ // GetObjectRequest GetObjectRequest.Builder requestBuilder = GetObjectRequest.builder(); requestBuilder.bucket(bucket); - requestBuilder.key(config.getPrefix() + fileReference.getRelativePath()); + requestBuilder.key(h.config().getPrefix() + fileReference.getRelativePath()); // Download object DownloadFileRequest.Builder builder = DownloadFileRequest.builder(); builder.getObjectRequest(requestBuilder.build()); builder.destination(fileReference.getFile()); - FileDownload fileDownload = transferManager.downloadFile(builder.build()); + FileDownload fileDownload = h.transferManager().downloadFile(builder.build()); downloads.add(fileDownload.completionFuture()); if (maxPending > 0 && downloads.size() >= maxPending) { waitForFileDownloads(downloads); @@ -151,14 +178,15 @@ private List<CompletableFuture<CompletedDirectoryDownload>> startDownloadingDirectories( Collection<FileReference> toDownload) { + AsyncClientHolder h = holder; List<CompletableFuture<CompletedDirectoryDownload>> downloads = new ArrayList<>(); for (FileReference fileReference : toDownload) { DownloadDirectoryRequest.Builder builder = DownloadDirectoryRequest.builder(); builder.bucket(bucket); builder.destination(fileReference.getFile().toPath()); builder.listObjectsV2RequestTransformer( - l -> l.prefix(config.getPrefix() + fileReference.getRelativePath())); - DirectoryDownload directoryDownload = transferManager.downloadDirectory(builder.build()); + l -> l.prefix(h.config().getPrefix() +
fileReference.getRelativePath())); + DirectoryDownload directoryDownload = h.transferManager().downloadDirectory(builder.build()); downloads.add(directoryDownload.completionFuture()); } return downloads; @@ -264,8 +292,4 @@ return builder.build(); } - private S3TransferManager createS3TransferManager(S3AsyncClient s3AsyncClient) { - return S3TransferManager.builder().s3Client(s3AsyncClient).build(); - } - } diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3SyncDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3SyncDownloader.java index 498c34b..dac4b97 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3SyncDownloader.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3SyncDownloader.java @@ -50,13 +50,13 @@ import software.amazon.awssdk.services.s3.model.S3Object; @ThreadSafe -public class S3SyncDownloader extends AbstractParallelDownloader { +public class S3SyncDownloader extends AbstractParallelDownloader implements IS3Downloader { private static final Logger LOGGER = LogManager.getLogger(); private final String bucket; private final IOManager ioManager; - private final S3Client s3Client; - private final S3ClientConfig config; + private volatile S3Client s3Client; + private volatile S3ClientConfig config; private final IRequestProfilerLimiter profiler; private final ExecutorService executorService; @@ -66,10 +66,27 @@ this.config = config; this.profiler = profiler; this.s3Client = (S3Client) S3CloudClient.buildClient(config).getConsumingClient(); + // TODO: we should use the service context's executor instead of creating a separate one.
If/when we do this, + // remove executorService.shutdown() from close() below this.executorService = Executors.newCachedThreadPool(); } @Override + public S3ClientConfig.S3ParallelDownloaderClientType getClientType() { + return S3ClientConfig.S3ParallelDownloaderClientType.SYNC; + } + + @Override + public void reloadConfiguration(S3ClientConfig newConfig) { + S3Client oldClient = s3Client; + s3Client = (S3Client) S3CloudClient.buildClient(newConfig).getConsumingClient(); + config = newConfig; + LOGGER.info("reloaded S3SyncDownloader configuration"); + // TODO: configurable delay before closing the old client to allow in-flight requests to complete. NOTE(review): s3Client and config are two separate volatile writes, so a concurrent reader may briefly see the new client with the old config. + oldClient.close(); + } + + @Override public void downloadFiles(Collection<FileReference> toDownload) throws HyracksDataException { try { downloadFilesAndWait(toDownload); @@ -196,6 +213,9 @@ @Override public void close() throws HyracksDataException { + // TODO: we should use the service context's executor instead of creating a separate one. If/when we do this, + // remove this executorService.shutdown() + executorService.shutdown(); s3Client.close(); } } -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21154?usp=email To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings?usp=email Gerrit-MessageType: newchange Gerrit-Project: asterixdb Gerrit-Branch: lumina Gerrit-Change-Id: Ibb284aa77f648afece530bcff4cc5eece693afd7 Gerrit-Change-Number: 21154 Gerrit-PatchSet: 1 Gerrit-Owner: Michael Blow <[email protected]>
