>From Ritik Raj <[email protected]>:
Ritik Raj has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20549?usp=email )
Change subject: [NO ISSUE][CLOUD] Introduced config for S3 sync client
......................................................................
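[NO ISSUE][CLOUD] Introduced config for S3 sync client

Replace the boolean CLOUD_STORAGE_S3_ENABLE_CRT_CLIENT option with
CLOUD_STORAGE_S3_PARALLEL_DOWNLOADER_CLIENT_TYPE (crt, async or sync),
which selects the client used by the S3 parallel downloader. The new
sync type uses S3SyncDownloader, which downloads objects one at a time
with the synchronous S3 client. The default changes from crt (no custom
endpoint) or async (custom endpoint) to sync.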
- user model changes: no
- storage format changes: no
- interface changes: no
Ext-ref: MB-69226
Change-Id: Id16c530916e7e223201e2395d7aa38a4640367b6
---
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
A
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3SyncDownloader.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
M
asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
8 files changed, 173 insertions(+), 31 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/49/20549/1
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java
index 22de1e9..6f1c453 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java
@@ -24,6 +24,7 @@
import org.apache.hyracks.api.io.FileReference;
public interface IParallelDownloader extends AutoCloseable {
+ String STORAGE_SUB_DIR = "storage";
/**
* Downloads files in all partitions
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
index b6683ad..28fd78a 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
@@ -47,19 +47,20 @@
private final boolean forcePathStyle;
private final boolean disableSslVerify;
private final boolean storageListEventuallyConsistent;
- private final boolean enableCrtClient;
+ private final S3ParallelDownloaderClientType parallelDownloaderClientType;
+ /**
+ * Creates a config from the given basic settings. Every other setting gets a fixed value: tokenAcquireTimeout 1,
+ * zero for the request-rate and HTTP-connection settings, and false for forcePathStyle, disableSslVerify and
+ * storageListEventuallyConsistent.
+ *
+ * @param parallelDownloaderClientType the client type used to choose the S3 parallel downloader implementation
+ */
public S3ClientConfig(String region, String endpoint, String prefix,
boolean anonymousAuth,
- long profilerLogInterval, int writeBufferSize, boolean
enableCrtClient) {
+ long profilerLogInterval, int writeBufferSize,
+ S3ParallelDownloaderClientType parallelDownloaderClientType) {
this(region, endpoint, prefix, anonymousAuth, profilerLogInterval,
writeBufferSize, 1, 0, 0, 0, false, false,
- false, 0, 0, enableCrtClient);
+ false, 0, 0, parallelDownloaderClientType);
}
private S3ClientConfig(String region, String endpoint, String prefix,
boolean anonymousAuth,
long profilerLogInterval, int writeBufferSize, long
tokenAcquireTimeout, int writeMaxRequestsPerSeconds,
int readMaxRequestsPerSeconds, int requestsMaxHttpConnections,
boolean forcePathStyle,
boolean disableSslVerify, boolean storageListEventuallyConsistent,
int requestsMaxPendingHttpConnections,
- int requestsHttpConnectionAcquireTimeout, boolean enableCrtClient)
{
+ int requestsHttpConnectionAcquireTimeout,
S3ParallelDownloaderClientType parallelDownloaderClientType) {
this.region = Objects.requireNonNull(region, "region");
this.endpoint = endpoint;
this.prefix = Objects.requireNonNull(prefix, "prefix");
@@ -75,7 +76,7 @@
this.forcePathStyle = forcePathStyle;
this.disableSslVerify = disableSslVerify;
this.storageListEventuallyConsistent = storageListEventuallyConsistent;
- this.enableCrtClient = enableCrtClient;
+ this.parallelDownloaderClientType = parallelDownloaderClientType;
}
public static S3ClientConfig of(CloudProperties cloudProperties) {
@@ -87,7 +88,14 @@
cloudProperties.isStorageForcePathStyle(),
cloudProperties.isStorageDisableSSLVerify(),
cloudProperties.isStorageListEventuallyConsistent(),
cloudProperties.getRequestsMaxPendingHttpConnections(),
- cloudProperties.getRequestsHttpConnectionAcquireTimeout(),
cloudProperties.isS3EnableCrtClient());
+ cloudProperties.getRequestsHttpConnectionAcquireTimeout(),
+
S3ParallelDownloaderClientType.valueOf(cloudProperties.getS3ParallelDownloaderClientType()));
+ }
+
+ /**
+ * Client implementation behind the downloader returned by S3CloudClient#createParallelDownloader.
+ */
+ public enum S3ParallelDownloaderClientType {
+ /** S3ParallelDownloader using the AWS CRT-based async client. */
+ CRT,
+ /** S3ParallelDownloader using the standard (non-CRT) async client. */
+ ASYNC,
+ /** S3SyncDownloader: downloads objects one at a time with the synchronous S3Client. */
+ SYNC
}
public static S3ClientConfig of(Map<String, String> configuration, int
writeBufferSize) {
@@ -101,7 +109,8 @@
String prefix = "";
boolean anonymousAuth = false;
- return new S3ClientConfig(region, endPoint, prefix, anonymousAuth,
profilerLogInterval, writeBufferSize, false);
+ return new S3ClientConfig(region, endPoint, prefix, anonymousAuth,
profilerLogInterval, writeBufferSize,
+ S3ParallelDownloaderClientType.ASYNC);
}
public String getRegion() {
@@ -169,8 +178,8 @@
return storageListEventuallyConsistent;
}
- public boolean isCrtClientEnabled() {
- return enableCrtClient;
+ /**
+ * @return the client type used to choose the S3 parallel downloader implementation (the constructor stores it
+ * without a null check)
+ */
+ public S3ParallelDownloaderClientType getParallelDownloaderClientType() {
+ return parallelDownloaderClientType;
}
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
index defffe0..e68f859 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
@@ -329,7 +329,11 @@
+ /**
+ * Creates the downloader selected by the configured parallel downloader client type. Each call builds a new
+ * downloader; the SYNC implementation also builds its own S3Client.
+ */
@Override
public IParallelDownloader createParallelDownloader(String bucket,
IOManager ioManager) {
- return new S3ParallelDownloader(bucket, ioManager, config, profiler);
+ S3ClientConfig.S3ParallelDownloaderClientType
parallelDownloaderClientType = config.getParallelDownloaderClientType();
+ // CRT and ASYNC share S3ParallelDownloader, whose createAsyncClient chooses the CRT or standard async client
+ return switch (parallelDownloaderClientType) {
+ case CRT, ASYNC -> new S3ParallelDownloader(bucket, ioManager,
config, profiler);
+ case SYNC -> new S3SyncDownloader(bucket, ioManager, config,
profiler);
+ };
}
@Override
@@ -364,7 +368,7 @@
return new S3BufferedWriter(s3Client, profiler, guardian, bucket,
config.getPrefix() + path);
}
- private static CloseableAwsClients buildClient(S3ClientConfig config) {
+ public static CloseableAwsClients buildClient(S3ClientConfig config) {
CloseableAwsClients awsClients = new CloseableAwsClients();
S3ClientBuilder builder = S3Client.builder();
AwsCredentialsProvider credentialsProvider =
config.createCredentialsProvider();
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
index 50d05c6..6d85127 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
@@ -181,7 +181,9 @@
private static S3AsyncClient createAsyncClient(S3ClientConfig config) {
// CRT client is not supported by all local S3 providers, but provides
a better performance with AWS S3
- if (config.isCrtClientEnabled()) {
+ S3ClientConfig.S3ParallelDownloaderClientType
parallelDownloaderClientType =
+ config.getParallelDownloaderClientType();
+ if (parallelDownloaderClientType ==
S3ClientConfig.S3ParallelDownloaderClientType.CRT) {
return createS3CrtAsyncClient(config);
}
return createS3AsyncClient(config);
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3SyncDownloader.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3SyncDownloader.java
new file mode 100644
index 0000000..8b99240
--- /dev/null
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3SyncDownloader.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.clients.aws.s3;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.asterix.cloud.clients.IParallelDownloader;
+import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter;
+import org.apache.commons.io.FileUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.util.annotations.ThreadSafe;
+
+import software.amazon.awssdk.core.ResponseInputStream;
+import software.amazon.awssdk.core.exception.SdkException;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
+import software.amazon.awssdk.services.s3.model.S3Object;
+
+/**
+ * {@link IParallelDownloader} that downloads S3 objects one at a time on the calling thread using the blocking
+ * {@link S3Client}. It is the implementation selected for the {@code SYNC} parallel downloader client type.
+ */
+@ThreadSafe
+public class S3SyncDownloader implements IParallelDownloader {
+    /** Separator used to turn a relative directory path into an S3 key prefix. */
+    private static final char KEY_DELIMITER = '/';
+
+    private final String bucket;
+    private final IOManager ioManager;
+    private final S3Client s3Client;
+    private final S3ClientConfig config;
+    private final IRequestProfilerLimiter profiler;
+
+    S3SyncDownloader(String bucket, IOManager ioManager, S3ClientConfig config, IRequestProfilerLimiter profiler) {
+        this.bucket = bucket;
+        this.ioManager = ioManager;
+        this.config = config;
+        this.profiler = profiler;
+        // NOTE(review): only the consuming S3Client is kept (and closed in close()); if CloseableAwsClients owns
+        // other resources, they are never released by this class -- confirm.
+        this.s3Client = (S3Client) S3CloudClient.buildClient(config).getConsumingClient();
+    }
+
+    /**
+     * Downloads each file from the object keyed by the configured prefix followed by the file's relative path,
+     * creating missing parent directories first.
+     *
+     * @param toDownload files to download
+     * @throws HyracksDataException if a local I/O operation or an S3 request fails; later files are not attempted
+     */
+    @Override
+    public void downloadFiles(Collection<FileReference> toDownload) throws HyracksDataException {
+        try {
+            for (FileReference fileReference : toDownload) {
+                profiler.objectGet();
+                FileUtils.createParentDirectories(fileReference.getFile());
+                downloadFile(fileReference);
+            }
+        } catch (IOException | SdkException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    /**
+     * Streams one object to the file's local path, creating the file or truncating any existing content.
+     */
+    private void downloadFile(FileReference fileReference) throws IOException {
+        GetObjectRequest request = GetObjectRequest.builder().bucket(bucket)
+                .key(config.getPrefix() + fileReference.getRelativePath()).build();
+        Path targetPath = fileReference.getFile().toPath();
+        try (ResponseInputStream<GetObjectResponse> response = s3Client.getObject(request);
+                OutputStream outputStream = Files.newOutputStream(targetPath, StandardOpenOption.CREATE,
+                        StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {
+            response.transferTo(outputStream);
+        }
+    }
+
+    /**
+     * Downloads every object stored under each directory, mapping each key back to its local path. If a single
+     * file's download fails, that file is recorded and returned, and the remaining downloads continue.
+     *
+     * @param toDownload directories to download
+     * @return the files that could not be downloaded (empty if every download succeeded)
+     * @throws HyracksDataException if listing objects, resolving a local path or creating a local directory fails
+     */
+    @Override
+    public Collection<FileReference> downloadDirectories(Collection<FileReference> toDownload)
+            throws HyracksDataException {
+        Set<FileReference> failedFiles = new HashSet<>();
+        try {
+            for (FileReference dirRef : toDownload) {
+                profiler.objectMultipartDownload();
+                // Without a trailing delimiter, ".../partition_1" would also match keys under ".../partition_10"
+                String dirPrefix = config.getPrefix() + toDirectoryPrefix(dirRef.getRelativePath());
+                ListObjectsV2Request listReq = ListObjectsV2Request.builder().bucket(bucket).prefix(dirPrefix).build();
+                for (ListObjectsV2Response page : s3Client.listObjectsV2Paginator(listReq)) {
+                    for (S3Object s3Object : page.contents()) {
+                        FileReference targetFile = ioManager.resolve(createDiskSubPath(s3Object.key()));
+                        FileUtils.createParentDirectories(targetFile.getFile());
+                        try {
+                            downloadFile(targetFile);
+                        } catch (IOException | SdkException e) {
+                            // Best effort per file: the caller receives it in the returned collection
+                            failedFiles.add(targetFile);
+                        }
+                    }
+                }
+            }
+        } catch (IOException | SdkException e) {
+            throw HyracksDataException.create(e);
+        }
+        return failedFiles;
+    }
+
+    /**
+     * Appends {@link #KEY_DELIMITER} to a non-empty relative directory path that does not already end with it, so
+     * that a key-prefix listing only matches objects inside that directory.
+     */
+    private static String toDirectoryPrefix(String relativePath) {
+        if (relativePath.isEmpty() || relativePath.charAt(relativePath.length() - 1) == KEY_DELIMITER) {
+            return relativePath;
+        }
+        return relativePath + KEY_DELIMITER;
+    }
+
+    /**
+     * Maps an object key back to the IOManager-relative path from which {@link #downloadFile} rebuilds the same key
+     * ({@code prefix + relativePath}). Keys outside the configured prefix fall back to the sub-path starting at
+     * {@link #STORAGE_SUB_DIR}, or are returned unchanged when that name does not occur in the key.
+     */
+    private String createDiskSubPath(String objectKey) {
+        String prefix = config.getPrefix();
+        if (objectKey.startsWith(prefix)) {
+            // Exact inverse of the key construction; searching for STORAGE_SUB_DIR instead would mis-map keys
+            // whose prefix itself contains "storage"
+            return objectKey.substring(prefix.length());
+        }
+        int storageIndex = objectKey.indexOf(STORAGE_SUB_DIR);
+        return storageIndex > 0 ? objectKey.substring(storageIndex) : objectKey;
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        s3Client.close();
+    }
+}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
index 25bc217..364fb2a 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
@@ -45,7 +45,6 @@
import reactor.core.publisher.Mono;
public class AzureParallelDownloader implements IParallelDownloader {
- public static final String STORAGE_SUB_DIR = "storage";
private final IOManager ioManager;
private final BlobContainerAsyncClient blobContainerAsyncClient;
private final IRequestProfilerLimiter profiler;
diff --git
a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java
b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java
index 1bb0f74..5f8ee8c 100644
---
a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java
+++
b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java
@@ -67,8 +67,8 @@
client.createBucket(CreateBucketRequest.builder().bucket(PLAYGROUND_CONTAINER).build());
LOGGER.info("Client created successfully");
int writeBufferSize = StorageUtil.getIntSizeInBytes(5,
StorageUtil.StorageUnit.MEGABYTE);
- S3ClientConfig config =
- new S3ClientConfig(MOCK_SERVER_REGION, MOCK_SERVER_HOSTNAME,
"", true, 0, writeBufferSize, false);
+ S3ClientConfig config = new S3ClientConfig(MOCK_SERVER_REGION,
MOCK_SERVER_HOSTNAME, "", true, 0,
+ writeBufferSize,
S3ClientConfig.S3ParallelDownloaderClientType.ASYNC);
CLOUD_CLIENT = new S3CloudClient(config,
ICloudGuardian.NoOpCloudGuardian.INSTANCE);
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
index 4b6c928..6449598 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
@@ -77,10 +77,11 @@
CLOUD_STORAGE_FORCE_PATH_STYLE(BOOLEAN, false),
CLOUD_STORAGE_DISABLE_SSL_VERIFY(BOOLEAN, false),
CLOUD_STORAGE_LIST_EVENTUALLY_CONSISTENT(BOOLEAN, false),
- CLOUD_STORAGE_S3_ENABLE_CRT_CLIENT(BOOLEAN,
(Function<IApplicationConfig, Boolean>) app -> {
- String endpoint = app.getString(CLOUD_STORAGE_ENDPOINT);
- return endpoint == null || endpoint.isEmpty();
- });
+ // CLOUD_STORAGE_S3_PARALLEL_DOWNLOADER_CLIENT_TYPE(STRING,
(Function<IApplicationConfig, String>) app -> {
+ // String endpoint = app.getString(CLOUD_STORAGE_ENDPOINT);
+ // return (endpoint == null || endpoint.isEmpty()) ? "crt"
: "async";
+ // }),
+ CLOUD_STORAGE_S3_PARALLEL_DOWNLOADER_CLIENT_TYPE(STRING, "sync");
private final IOptionType interpreter;
private final Object defaultValue;
@@ -196,8 +197,8 @@
case CLOUD_STORAGE_LIST_EVENTUALLY_CONSISTENT:
return "Indicates whether or not deleted objects may be
contained in list operations for some time"
+ "after they are deleted";
- case CLOUD_STORAGE_S3_ENABLE_CRT_CLIENT:
- return "Indicates whether or not to use the AWS CRT S3
client for async requests";
+ case CLOUD_STORAGE_S3_PARALLEL_DOWNLOADER_CLIENT_TYPE:
+ return "The S3 client to use for parallel downloads (crt,
async or sync)";
default:
throw new IllegalStateException("NYI: " + this);
}
@@ -212,14 +213,6 @@
public Object defaultValue() {
return defaultValue;
}
-
- @Override
- public String usageDefaultOverride(IApplicationConfig accessor,
Function<IOption, String> optionPrinter) {
- if (this == CLOUD_STORAGE_S3_ENABLE_CRT_CLIENT) {
- return "true when no custom endpoint is set, otherwise false";
- }
- return IOption.super.usageDefaultOverride(accessor, optionPrinter);
- }
}
public String getStorageScheme() {
@@ -325,7 +318,7 @@
return
accessor.getBoolean(Option.CLOUD_STORAGE_LIST_EVENTUALLY_CONSISTENT);
}
- public boolean isS3EnableCrtClient() {
- return accessor.getBoolean(Option.CLOUD_STORAGE_S3_ENABLE_CRT_CLIENT);
+    /**
+     * Returns the configured S3 parallel downloader client type (crt, async or sync), stripped of surrounding
+     * whitespace and upper-cased so it can be matched against the enum constant names. Upper-casing uses
+     * {@link java.util.Locale#ROOT} so the result does not depend on the JVM default locale.
+     */
+    public String getS3ParallelDownloaderClientType() {
+        return accessor.getString(Option.CLOUD_STORAGE_S3_PARALLEL_DOWNLOADER_CLIENT_TYPE).strip()
+                .toUpperCase(java.util.Locale.ROOT);
}
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20549?usp=email
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings?usp=email
Gerrit-MessageType: newchange
Gerrit-Project: asterixdb
Gerrit-Branch: phoenix
Gerrit-Change-Id: Id16c530916e7e223201e2395d7aa38a4640367b6
Gerrit-Change-Number: 20549
Gerrit-PatchSet: 1
Gerrit-Owner: Ritik Raj <[email protected]>