From Hussain Towaileb <[email protected]>:

Hussain Towaileb has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19825 )


Change subject: [ASTERIXDB-XXXXX][EXT]: Add retry logic to cloud clients
......................................................................

[ASTERIXDB-XXXXX][EXT]: Add retry logic to cloud clients

Change-Id: Ie9fcea374a4a04286d82e9323c5d677f975707fc
---
M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java
M hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudBeforeRetryRequest.java
M hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
M hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java
M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
8 files changed, 71 insertions(+), 28 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/25/19825/1

diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
index 8fefcf5..d0ff275 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
@@ -204,7 +204,7 @@
     }

     @Override
-    public void restoreStream(CloudInputStream cloudStream) {
+    public void restoreStream(CloudInputStream cloudStream) throws 
HyracksDataException {
         LOGGER.warn("Restoring stream from cloud, {}", cloudStream);
         /*
          * This cloud request should not be called using 
CloudRetryableRequestUtil as it is the responsibility of the
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
index d332944..754fd88 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
@@ -98,7 +98,7 @@
      * @param length length
      * @return input stream of requested range
      */
-    InputStream getObjectStream(String bucket, String path, long offset, long 
length);
+    InputStream getObjectStream(String bucket, String path, long offset, long 
length) throws HyracksDataException;

     /**
      * Writes the content of the byte array into the bucket at the specified 
path
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java
index ccd6da1..3ba546d 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java
@@ -90,7 +90,8 @@
     }

     @Override
-    public InputStream getObjectStream(String bucket, String path, long 
offset, long length) {
+    public InputStream getObjectStream(String bucket, String path, long 
offset, long length)
+            throws HyracksDataException {
         return cloudClient.getObjectStream(bucket, path, offset, length);
     }

diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
index 01a8b02..b9f8bc5 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
@@ -21,6 +21,7 @@
 import static 
org.apache.asterix.cloud.clients.aws.s3.S3ClientConfig.DELETE_BATCH_SIZE;
 import static org.apache.asterix.cloud.clients.aws.s3.S3ClientUtils.encodeURI;
 import static 
org.apache.asterix.cloud.clients.aws.s3.S3ClientUtils.listS3Objects;
+import static 
org.apache.hyracks.cloud.util.CloudRetryableRequestUtil.runWithNoRetryOnInterruption;

 import java.io.FilenameFilter;
 import java.io.IOException;
@@ -51,6 +52,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.util.IoUtil;
+import org.apache.hyracks.cloud.io.request.ICloudReturnableRequest;
 import org.apache.hyracks.control.nc.io.IOManager;
 import org.apache.hyracks.util.annotations.ThreadSafe;
 import org.apache.logging.log4j.LogManager;
@@ -126,11 +128,15 @@
     }

     @Override
-    public Set<CloudFile> listObjects(String bucket, String path, 
FilenameFilter filter) {
+    public Set<CloudFile> listObjects(String bucket, String path, 
FilenameFilter filter) throws HyracksDataException {
         guardian.checkReadAccess(bucket, path);
         profiler.objectsList();
         path = config.isLocalS3Provider() ? encodeURI(path) : path;
-        return filterAndGet(listS3Objects(s3Client, bucket, config.getPrefix() 
+ path), filter);
+
+        String finalPath = config.getPrefix() + path;
+        ICloudReturnableRequest<List<S3Object>> request = () -> 
listS3Objects(s3Client, bucket, finalPath);
+        List<S3Object> objects = runWithNoRetryOnInterruption(request);
+        return filterAndGet(objects, filter);
     }

     @Override
@@ -145,8 +151,8 @@
         int totalRead = 0;
         int read;

-        // TODO(htowaileb): add retry logic here
-        try (ResponseInputStream<GetObjectResponse> response = 
s3Client.getObject(rangeGetObjectRequest)) {
+        try (ResponseInputStream<GetObjectResponse> response =
+                runWithNoRetryOnInterruption(() -> 
s3Client.getObject(rangeGetObjectRequest))) {
             while (buffer.remaining() > 0) {
                 read = response.read(buffer.array(), buffer.position(), 
buffer.remaining());
                 if (read == -1) {
@@ -172,7 +178,8 @@
         profiler.objectGet();
         GetObjectRequest getReq = 
GetObjectRequest.builder().bucket(bucket).key(config.getPrefix() + 
path).build();

-        try (ResponseInputStream<GetObjectResponse> stream = 
s3Client.getObject(getReq)) {
+        try (ResponseInputStream<GetObjectResponse> stream =
+                runWithNoRetryOnInterruption(() -> 
s3Client.getObject(getReq))) {
             return stream.readAllBytes();
         } catch (NoSuchKeyException e) {
             return null;
@@ -182,14 +189,15 @@
     }

     @Override
-    public InputStream getObjectStream(String bucket, String path, long 
offset, long length) {
+    public InputStream getObjectStream(String bucket, String path, long 
offset, long length)
+            throws HyracksDataException {
         guardian.checkReadAccess(bucket, path);
         profiler.objectGet();
         long readTo = offset + length - 1;
         GetObjectRequest getReq = GetObjectRequest.builder().range("bytes=" + 
offset + "-" + readTo).bucket(bucket)
                 .key(config.getPrefix() + path).build();
         try {
-            return s3Client.getObject(getReq);
+            return runWithNoRetryOnInterruption(() -> 
s3Client.getObject(getReq));
         } catch (NoSuchKeyException e) {
             // This should not happen at least from the only caller of this 
method
             throw new IllegalStateException(e);
@@ -197,19 +205,20 @@
     }

     @Override
-    public void write(String bucket, String path, byte[] data) {
+    public void write(String bucket, String path, byte[] data) throws 
HyracksDataException {
         guardian.checkWriteAccess(bucket, path);
         profiler.objectWrite();
         PutObjectRequest putReq = 
PutObjectRequest.builder().bucket(bucket).key(config.getPrefix() + 
path).build();
-        s3Client.putObject(putReq, RequestBody.fromBytes(data));
+        runWithNoRetryOnInterruption(() -> s3Client.putObject(putReq, 
RequestBody.fromBytes(data)));
     }

     @Override
-    public void copy(String bucket, String srcPath, FileReference destPath) {
+    public void copy(String bucket, String srcPath, FileReference destPath) 
throws HyracksDataException {
         guardian.checkReadAccess(bucket, srcPath);
         srcPath = config.getPrefix() + srcPath;
         srcPath = config.isLocalS3Provider() ? encodeURI(srcPath) : srcPath;
-        List<S3Object> objects = listS3Objects(s3Client, bucket, srcPath);
+        final String finalSrcPath = srcPath;
+        List<S3Object> objects = runWithNoRetryOnInterruption(() -> 
listS3Objects(s3Client, bucket, finalSrcPath));

         profiler.objectsList();
         for (S3Object object : objects) {
@@ -219,7 +228,8 @@
             String destKey = 
destPath.getChildPath(IoUtil.getFileNameFromPath(srcKey));
             CopyObjectRequest copyReq = 
CopyObjectRequest.builder().sourceBucket(bucket).sourceKey(srcKey)
                     
.destinationBucket(bucket).destinationKey(config.getPrefix() + destKey).build();
-            s3Client.copyObject(copyReq);
+
+            runWithNoRetryOnInterruption(() -> s3Client.copyObject(copyReq));
         }
     }

@@ -242,7 +252,8 @@

             Delete delete = 
Delete.builder().objects(objectIdentifiers).build();
             DeleteObjectsRequest deleteReq = 
DeleteObjectsRequest.builder().bucket(bucket).delete(delete).build();
-            DeleteObjectsResponse deleteObjectsResponse = 
s3Client.deleteObjects(deleteReq);
+            DeleteObjectsResponse deleteObjectsResponse =
+                    runWithNoRetryOnInterruption(() -> 
s3Client.deleteObjects(deleteReq));
             if (deleteObjectsResponse.hasErrors()) {
                 List<S3Error> deleteErrors = deleteObjectsResponse.errors();
                 for (S3Error s3Error : deleteErrors) {
@@ -261,9 +272,11 @@
         guardian.checkReadAccess(bucket, path);
         profiler.objectGet();
         try {
-            return s3Client
-                    
.headObject(HeadObjectRequest.builder().bucket(bucket).key(config.getPrefix() + 
path).build())
-                    .contentLength();
+            return runWithNoRetryOnInterruption(
+                    () -> s3Client
+                            .headObject(
+                                    
HeadObjectRequest.builder().bucket(bucket).key(config.getPrefix() + 
path).build())
+                            .contentLength());
         } catch (NoSuchKeyException ex) {
             return 0;
         } catch (Exception ex) {
@@ -276,7 +289,8 @@
         guardian.checkReadAccess(bucket, path);
         profiler.objectGet();
         try {
-            
s3Client.headObject(HeadObjectRequest.builder().bucket(bucket).key(config.getPrefix()
 + path).build());
+            runWithNoRetryOnInterruption(() -> s3Client
+                    
.headObject(HeadObjectRequest.builder().bucket(bucket).key(config.getPrefix() + 
path).build()));
             return true;
         } catch (NoSuchKeyException ex) {
             return false;
@@ -288,7 +302,8 @@
     @Override
     public boolean isEmptyPrefix(String bucket, String path) throws 
HyracksDataException {
         profiler.objectsList();
-        return S3ClientUtils.isEmptyPrefix(s3Client, bucket, 
config.getPrefix() + path);
+        return runWithNoRetryOnInterruption(
+                () -> S3ClientUtils.isEmptyPrefix(s3Client, bucket, 
config.getPrefix() + path));
     }

     @Override
@@ -297,8 +312,9 @@
     }

     @Override
-    public JsonNode listAsJson(ObjectMapper objectMapper, String bucket) {
-        List<S3Object> objects = listS3Objects(s3Client, bucket, 
config.getPrefix());
+    public JsonNode listAsJson(ObjectMapper objectMapper, String bucket) 
throws HyracksDataException {
+        List<S3Object> objects =
+                runWithNoRetryOnInterruption(() -> listS3Objects(s3Client, 
bucket, config.getPrefix()));
         ArrayNode objectsInfo = objectMapper.createArrayNode();

         objects.sort((x, y) -> String.CASE_INSENSITIVE_ORDER.compare(x.key(), 
y.key()));
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java
index 1de64d0..73345c3 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java
@@ -40,6 +40,9 @@
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.cloud.io.request.ICloudRequest;
+import org.apache.hyracks.cloud.io.request.ICloudReturnableRequest;
+import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil;
 import org.apache.hyracks.data.std.primitive.LongPointable;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -138,6 +141,7 @@
         while (testClient.exists(bucket, path)) {
             path = pathPrefix + random.nextInt();
         }
+        final String finalPath = path;

         long writeValue = random.nextLong();
         byte[] data = new byte[Long.BYTES];
@@ -158,7 +162,9 @@
         }

         try {
-            long readValue = 
LongPointable.getLong(testClient.readAllBytes(bucket, path), 0);
+            ICloudReturnableRequest<byte[]> request = () -> 
testClient.readAllBytes(bucket, finalPath);
+            byte[] bytes = 
CloudRetryableRequestUtil.runWithNoRetryOnInterruption(request);
+            long readValue = LongPointable.getLong(bytes, 0);
             if (writeValue != readValue) {
                 // This should never happen unless S3 is messed up. But log 
for sanity check
                 LOGGER.warn(
@@ -167,7 +173,8 @@
             }
         } finally {
             // Delete the written file
-            testClient.deleteObjects(bucket, Collections.singleton(path));
+            ICloudRequest request = () -> testClient.deleteObjects(bucket, 
Collections.singleton(finalPath));
+            CloudRetryableRequestUtil.runWithNoRetryOnInterruption(request);
         }
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
index 9b6a2ff..272544d 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
@@ -58,7 +58,7 @@
      *
      * @param stream to restore
      */
-    void restoreStream(CloudInputStream stream);
+    void restoreStream(CloudInputStream stream) throws HyracksDataException;

     /**
      * Write to local drive only
diff --git 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudBeforeRetryRequest.java
 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudBeforeRetryRequest.java
index 4dab80d..3ace317 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudBeforeRetryRequest.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudBeforeRetryRequest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.cloud.io.request;

+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil;

 /**
@@ -30,5 +31,5 @@
     /**
      * Run pre-retry routine before reattempting {@link ICloudRequest} or 
{@link ICloudReturnableRequest}
      */
-    void beforeRetry();
+    void beforeRetry() throws HyracksDataException;
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
index da9c8fb..2dde96c 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
@@ -151,7 +151,16 @@
         doRun(request, retry);
     }

-    private static <T> T doRun(ICloudReturnableRequest<T> request, 
ICloudBeforeRetryRequest retry)
+    public static <T> T 
runWithNoRetryOnInterruption(ICloudReturnableRequest<T> request) throws 
HyracksDataException {
+        return runWithNoRetryOnInterruption(request, NO_OP_RETRY);
+    }
+
+    public static <T> T 
runWithNoRetryOnInterruption(ICloudReturnableRequest<T> request, 
ICloudBeforeRetryRequest retry)
+            throws HyracksDataException {
+        return doRun(request, retry);
+    }
+
+    public static <T> T doRun(ICloudReturnableRequest<T> request, 
ICloudBeforeRetryRequest retry)
             throws HyracksDataException {
         int attempt = 1;
         IRetryPolicy retryPolicy = null;

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19825
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: ionic
Gerrit-Change-Id: Ie9fcea374a4a04286d82e9323c5d677f975707fc
Gerrit-Change-Number: 19825
Gerrit-PatchSet: 1
Gerrit-Owner: Hussain Towaileb <[email protected]>
Gerrit-MessageType: newchange

Reply via email to