>From Ritik Raj <[email protected]>:

Ritik Raj has uploaded this change for review. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20549?usp=email )


Change subject: [NO ISSUE][CLOUD] Introduced config for S3 sync client
......................................................................

[NO ISSUE][CLOUD] Introduced config for S3 sync client

- user model changes: no
- storage format changes: no
- interface changes: no

Ext-ref: MB-69226

Change-Id: Id16c530916e7e223201e2395d7aa38a4640367b6
---
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
A 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3SyncDownloader.java
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
M 
asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
8 files changed, 173 insertions(+), 31 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/49/20549/1

diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java
index 22de1e9..6f1c453 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java
@@ -24,6 +24,7 @@
 import org.apache.hyracks.api.io.FileReference;

 public interface IParallelDownloader extends AutoCloseable {
+    String STORAGE_SUB_DIR = "storage";

     /**
      * Downloads files in all partitions
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
index b6683ad..28fd78a 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
@@ -47,19 +47,20 @@
     private final boolean forcePathStyle;
     private final boolean disableSslVerify;
     private final boolean storageListEventuallyConsistent;
-    private final boolean enableCrtClient;
+    private final S3ParallelDownloaderClientType parallelDownloaderClientType;

     public S3ClientConfig(String region, String endpoint, String prefix, 
boolean anonymousAuth,
-            long profilerLogInterval, int writeBufferSize, boolean 
enableCrtClient) {
+            long profilerLogInterval, int writeBufferSize,
+            S3ParallelDownloaderClientType parallelDownloaderClientType) {
         this(region, endpoint, prefix, anonymousAuth, profilerLogInterval, 
writeBufferSize, 1, 0, 0, 0, false, false,
-                false, 0, 0, enableCrtClient);
+                false, 0, 0, parallelDownloaderClientType);
     }

     private S3ClientConfig(String region, String endpoint, String prefix, 
boolean anonymousAuth,
             long profilerLogInterval, int writeBufferSize, long 
tokenAcquireTimeout, int writeMaxRequestsPerSeconds,
             int readMaxRequestsPerSeconds, int requestsMaxHttpConnections, 
boolean forcePathStyle,
             boolean disableSslVerify, boolean storageListEventuallyConsistent, 
int requestsMaxPendingHttpConnections,
-            int requestsHttpConnectionAcquireTimeout, boolean enableCrtClient) 
{
+            int requestsHttpConnectionAcquireTimeout, 
S3ParallelDownloaderClientType parallelDownloaderClientType) {
         this.region = Objects.requireNonNull(region, "region");
         this.endpoint = endpoint;
         this.prefix = Objects.requireNonNull(prefix, "prefix");
@@ -75,7 +76,7 @@
         this.forcePathStyle = forcePathStyle;
         this.disableSslVerify = disableSslVerify;
         this.storageListEventuallyConsistent = storageListEventuallyConsistent;
-        this.enableCrtClient = enableCrtClient;
+        this.parallelDownloaderClientType = parallelDownloaderClientType;
     }

     public static S3ClientConfig of(CloudProperties cloudProperties) {
@@ -87,7 +88,14 @@
                 cloudProperties.isStorageForcePathStyle(), 
cloudProperties.isStorageDisableSSLVerify(),
                 cloudProperties.isStorageListEventuallyConsistent(),
                 cloudProperties.getRequestsMaxPendingHttpConnections(),
-                cloudProperties.getRequestsHttpConnectionAcquireTimeout(), 
cloudProperties.isS3EnableCrtClient());
+                cloudProperties.getRequestsHttpConnectionAcquireTimeout(),
+                
S3ParallelDownloaderClientType.valueOf(cloudProperties.getS3ParallelDownloaderClientType()));
+    }
+
+    public enum S3ParallelDownloaderClientType {
+        CRT,
+        ASYNC,
+        SYNC
     }

     public static S3ClientConfig of(Map<String, String> configuration, int 
writeBufferSize) {
@@ -101,7 +109,8 @@
         String prefix = "";
         boolean anonymousAuth = false;

-        return new S3ClientConfig(region, endPoint, prefix, anonymousAuth, 
profilerLogInterval, writeBufferSize, false);
+        return new S3ClientConfig(region, endPoint, prefix, anonymousAuth, 
profilerLogInterval, writeBufferSize,
+                S3ParallelDownloaderClientType.ASYNC);
     }

     public String getRegion() {
@@ -169,8 +178,8 @@
         return storageListEventuallyConsistent;
     }

-    public boolean isCrtClientEnabled() {
-        return enableCrtClient;
+    public S3ParallelDownloaderClientType getParallelDownloaderClientType() {
+        return parallelDownloaderClientType;
     }

 }
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
index defffe0..e68f859 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
@@ -329,7 +329,11 @@

     @Override
     public IParallelDownloader createParallelDownloader(String bucket, 
IOManager ioManager) {
-        return new S3ParallelDownloader(bucket, ioManager, config, profiler);
+        S3ClientConfig.S3ParallelDownloaderClientType 
parallelDownloaderClientType = config.getParallelDownloaderClientType();
+        return switch (parallelDownloaderClientType) {
+            case CRT, ASYNC -> new S3ParallelDownloader(bucket, ioManager, 
config, profiler);
+            case SYNC -> new S3SyncDownloader(bucket, ioManager, config, 
profiler);
+        };
     }

     @Override
@@ -364,7 +368,7 @@
         return new S3BufferedWriter(s3Client, profiler, guardian, bucket, 
config.getPrefix() + path);
     }

-    private static CloseableAwsClients buildClient(S3ClientConfig config) {
+    public static CloseableAwsClients buildClient(S3ClientConfig config) {
         CloseableAwsClients awsClients = new CloseableAwsClients();
         S3ClientBuilder builder = S3Client.builder();
         AwsCredentialsProvider credentialsProvider = 
config.createCredentialsProvider();
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
index 50d05c6..6d85127 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
@@ -181,7 +181,9 @@

     private static S3AsyncClient createAsyncClient(S3ClientConfig config) {
         // CRT client is not supported by all local S3 providers, but provides 
a better performance with AWS S3
-        if (config.isCrtClientEnabled()) {
+        S3ClientConfig.S3ParallelDownloaderClientType 
parallelDownloaderClientType =
+                config.getParallelDownloaderClientType();
+        if (parallelDownloaderClientType == 
S3ClientConfig.S3ParallelDownloaderClientType.CRT) {
             return createS3CrtAsyncClient(config);
         }
         return createS3AsyncClient(config);
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3SyncDownloader.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3SyncDownloader.java
new file mode 100644
index 0000000..8b99240
--- /dev/null
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3SyncDownloader.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.clients.aws.s3;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.asterix.cloud.clients.IParallelDownloader;
+import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter;
+import org.apache.commons.io.FileUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.util.annotations.ThreadSafe;
+
+import software.amazon.awssdk.core.ResponseInputStream;
+import software.amazon.awssdk.core.exception.SdkException;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
+import software.amazon.awssdk.services.s3.model.S3Object;
+
+/**
+ * An {@link IParallelDownloader} backed by the synchronous {@link S3Client}. Despite the interface name,
+ * objects are downloaded sequentially on the calling thread, one GET request per object.
+ * <p>
+ * Each instance builds its own {@link S3Client} from the given {@link S3ClientConfig} and closes it in
+ * {@link #close()}.
+ */
+@ThreadSafe
+public class S3SyncDownloader implements IParallelDownloader {
+    private final String bucket;
+    private final IOManager ioManager;
+    private final S3Client s3Client;
+    private final S3ClientConfig config;
+    private final IRequestProfilerLimiter profiler;
+
+    S3SyncDownloader(String bucket, IOManager ioManager, S3ClientConfig config, IRequestProfilerLimiter profiler) {
+        this.bucket = bucket;
+        this.ioManager = ioManager;
+        this.config = config;
+        this.profiler = profiler;
+        // NOTE(review): only the S3Client is kept and closed; confirm that nothing else held by the
+        // CloseableAwsClients returned from buildClient needs to be released.
+        this.s3Client = (S3Client) S3CloudClient.buildClient(config).getConsumingClient();
+    }
+
+    /**
+     * Downloads each of the given files, creating parent directories and overwriting any existing local copy.
+     *
+     * @param toDownload files to download; each object key is the configured prefix followed by the file's
+     *                   relative path
+     * @throws HyracksDataException if any download fails; the remaining files are not attempted
+     */
+    @Override
+    public void downloadFiles(Collection<FileReference> toDownload) throws HyracksDataException {
+        try {
+            for (FileReference fileReference : toDownload) {
+                profiler.objectGet();
+                FileUtils.createParentDirectories(fileReference.getFile());
+                downloadFile(fileReference);
+            }
+        } catch (IOException | SdkException e) {
+            // SDK failures are unchecked; wrap them like I/O failures so callers get a HyracksDataException
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    /**
+     * Streams the object keyed by the configured prefix plus the file's relative path into the file's local
+     * path, creating or truncating the local file.
+     */
+    private void downloadFile(FileReference fileReference) throws IOException {
+        GetObjectRequest request = GetObjectRequest.builder().bucket(bucket)
+                .key(config.getPrefix() + fileReference.getRelativePath()).build();
+
+        Path targetPath = fileReference.getFile().toPath();
+        try (ResponseInputStream<GetObjectResponse> response = s3Client.getObject(request);
+                OutputStream outputStream = Files.newOutputStream(targetPath, StandardOpenOption.CREATE,
+                        StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {
+            response.transferTo(outputStream);
+        }
+    }
+
+    /**
+     * Downloads every object stored under each of the given directories.
+     *
+     * @param toDownload directories whose objects should be downloaded
+     * @return the files whose download failed (empty if all succeeded)
+     * @throws HyracksDataException if listing a directory fails
+     */
+    @Override
+    public Collection<FileReference> downloadDirectories(Collection<FileReference> toDownload)
+            throws HyracksDataException {
+        Set<FileReference> failedFiles = new HashSet<>();
+        try {
+            for (FileReference dirRef : toDownload) {
+                profiler.objectMultipartDownload();
+                ListObjectsV2Request listReq = ListObjectsV2Request.builder().bucket(bucket)
+                        .prefix(config.getPrefix() + dirRef.getRelativePath()).build();
+                for (ListObjectsV2Response page : s3Client.listObjectsV2Paginator(listReq)) {
+                    for (S3Object s3Object : page.contents()) {
+                        String key = s3Object.key();
+                        if (key.endsWith("/")) {
+                            // zero-byte "folder" marker objects have no file content to download
+                            continue;
+                        }
+                        FileReference targetFile = ioManager.resolve(createDiskSubPath(key));
+                        try {
+                            // with the sync client every object is its own GET request; account for each one
+                            profiler.objectGet();
+                            FileUtils.createParentDirectories(targetFile.getFile());
+                            downloadFile(targetFile);
+                        } catch (IOException | SdkException e) {
+                            // record and continue so that the caller can retry only the failed files
+                            failedFiles.add(targetFile);
+                        }
+                    }
+                }
+            }
+        } catch (SdkException e) {
+            // listing failures surface as unchecked SDK exceptions
+            throw HyracksDataException.create(e);
+        }
+        return failedFiles;
+    }
+
+    /**
+     * Converts a listed object key into a path relative to the IO manager by removing the configured key
+     * prefix, i.e., the inverse of the key construction in {@link #downloadFile(FileReference)}.
+     */
+    private String createDiskSubPath(String objectKey) {
+        String prefix = config.getPrefix();
+        // Listed keys all start with config.getPrefix() (it begins the listing prefix). Stripping it, rather
+        // than searching for STORAGE_SUB_DIR, neither fails when a key lacks that sub-directory nor
+        // mis-resolves when the prefix itself contains it.
+        if (!objectKey.startsWith(prefix)) {
+            throw new IllegalStateException(
+                    "Object key '" + objectKey + "' does not start with prefix '" + prefix + "'");
+        }
+        return objectKey.substring(prefix.length());
+    }
+
+    /**
+     * Closes the {@link S3Client} owned by this downloader.
+     */
+    @Override
+    public void close() throws HyracksDataException {
+        s3Client.close();
+    }
+}
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
index 25bc217..364fb2a 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
@@ -45,7 +45,6 @@
 import reactor.core.publisher.Mono;

 public class AzureParallelDownloader implements IParallelDownloader {
-    public static final String STORAGE_SUB_DIR = "storage";
     private final IOManager ioManager;
     private final BlobContainerAsyncClient blobContainerAsyncClient;
     private final IRequestProfilerLimiter profiler;
diff --git 
a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java
 
b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java
index 1bb0f74..5f8ee8c 100644
--- 
a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java
+++ 
b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java
@@ -67,8 +67,8 @@
         
client.createBucket(CreateBucketRequest.builder().bucket(PLAYGROUND_CONTAINER).build());
         LOGGER.info("Client created successfully");
         int writeBufferSize = StorageUtil.getIntSizeInBytes(5, 
StorageUtil.StorageUnit.MEGABYTE);
-        S3ClientConfig config =
-                new S3ClientConfig(MOCK_SERVER_REGION, MOCK_SERVER_HOSTNAME, 
"", true, 0, writeBufferSize, false);
+        S3ClientConfig config = new S3ClientConfig(MOCK_SERVER_REGION, 
MOCK_SERVER_HOSTNAME, "", true, 0,
+                writeBufferSize, 
S3ClientConfig.S3ParallelDownloaderClientType.ASYNC);
         CLOUD_CLIENT = new S3CloudClient(config, 
ICloudGuardian.NoOpCloudGuardian.INSTANCE);
     }

diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
index 4b6c928..6449598 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
@@ -77,10 +77,11 @@
         CLOUD_STORAGE_FORCE_PATH_STYLE(BOOLEAN, false),
         CLOUD_STORAGE_DISABLE_SSL_VERIFY(BOOLEAN, false),
         CLOUD_STORAGE_LIST_EVENTUALLY_CONSISTENT(BOOLEAN, false),
-        CLOUD_STORAGE_S3_ENABLE_CRT_CLIENT(BOOLEAN, 
(Function<IApplicationConfig, Boolean>) app -> {
-            String endpoint = app.getString(CLOUD_STORAGE_ENDPOINT);
-            return endpoint == null || endpoint.isEmpty();
-        });
+        //        CLOUD_STORAGE_S3_PARALLEL_DOWNLOADER_CLIENT_TYPE(STRING, 
(Function<IApplicationConfig, String>) app -> {
+        //            String endpoint = app.getString(CLOUD_STORAGE_ENDPOINT);
+        //            return (endpoint == null || endpoint.isEmpty()) ? "crt" 
: "async";
+        //        }),
+        CLOUD_STORAGE_S3_PARALLEL_DOWNLOADER_CLIENT_TYPE(STRING, "sync");

         private final IOptionType interpreter;
         private final Object defaultValue;
@@ -196,8 +197,8 @@
                 case CLOUD_STORAGE_LIST_EVENTUALLY_CONSISTENT:
                     return "Indicates whether or not deleted objects may be 
contained in list operations for some time"
                             + "after they are deleted";
-                case CLOUD_STORAGE_S3_ENABLE_CRT_CLIENT:
-                    return "Indicates whether or not to use the AWS CRT S3 
client for async requests";
+                case CLOUD_STORAGE_S3_PARALLEL_DOWNLOADER_CLIENT_TYPE:
+                    return "The S3 client to use for parallel downloads (crt, 
async or sync)";
                 default:
                     throw new IllegalStateException("NYI: " + this);
             }
@@ -212,14 +213,6 @@
         public Object defaultValue() {
             return defaultValue;
         }
-
-        @Override
-        public String usageDefaultOverride(IApplicationConfig accessor, 
Function<IOption, String> optionPrinter) {
-            if (this == CLOUD_STORAGE_S3_ENABLE_CRT_CLIENT) {
-                return "true when no custom endpoint is set, otherwise false";
-            }
-            return IOption.super.usageDefaultOverride(accessor, optionPrinter);
-        }
     }

     public String getStorageScheme() {
@@ -325,7 +318,7 @@
         return 
accessor.getBoolean(Option.CLOUD_STORAGE_LIST_EVENTUALLY_CONSISTENT);
     }

-    public boolean isS3EnableCrtClient() {
-        return accessor.getBoolean(Option.CLOUD_STORAGE_S3_ENABLE_CRT_CLIENT);
+    public String getS3ParallelDownloaderClientType() {
+        return 
accessor.getString(Option.CLOUD_STORAGE_S3_PARALLEL_DOWNLOADER_CLIENT_TYPE).toUpperCase();
     }
 }

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20549?usp=email
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings?usp=email

Gerrit-MessageType: newchange
Gerrit-Project: asterixdb
Gerrit-Branch: phoenix
Gerrit-Change-Id: Id16c530916e7e223201e2395d7aa38a4640367b6
Gerrit-Change-Number: 20549
Gerrit-PatchSet: 1
Gerrit-Owner: Ritik Raj <[email protected]>

Reply via email to