>From Hussain Towaileb <[email protected]>:
Hussain Towaileb has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19825 )
Change subject: [ASTERIXDB-XXXXX][EXT]: Add retry logic to cloud clients
......................................................................
[ASTERIXDB-XXXXX][EXT]: Add retry logic to cloud clients
Change-Id: Ie9fcea374a4a04286d82e9323c5d677f975707fc
---
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java
M
hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudBeforeRetryRequest.java
M
hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
M
hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
8 files changed, 71 insertions(+), 28 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/25/19825/1
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
index 8fefcf5..d0ff275 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
@@ -204,7 +204,7 @@
}
@Override
- public void restoreStream(CloudInputStream cloudStream) {
+ public void restoreStream(CloudInputStream cloudStream) throws
HyracksDataException {
LOGGER.warn("Restoring stream from cloud, {}", cloudStream);
/*
* This cloud request should not be called using
CloudRetryableRequestUtil as it is the responsibility of the
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
index d332944..754fd88 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
@@ -98,7 +98,7 @@
* @param length length
* @return input stream of requested range
*/
- InputStream getObjectStream(String bucket, String path, long offset, long
length);
+ InputStream getObjectStream(String bucket, String path, long offset, long
length) throws HyracksDataException;
/**
* Writes the content of the byte array into the bucket at the specified
path
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java
index ccd6da1..3ba546d 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java
@@ -90,7 +90,8 @@
}
@Override
- public InputStream getObjectStream(String bucket, String path, long
offset, long length) {
+ public InputStream getObjectStream(String bucket, String path, long
offset, long length)
+ throws HyracksDataException {
return cloudClient.getObjectStream(bucket, path, offset, length);
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
index 01a8b02..b9f8bc5 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
@@ -21,6 +21,7 @@
import static
org.apache.asterix.cloud.clients.aws.s3.S3ClientConfig.DELETE_BATCH_SIZE;
import static org.apache.asterix.cloud.clients.aws.s3.S3ClientUtils.encodeURI;
import static
org.apache.asterix.cloud.clients.aws.s3.S3ClientUtils.listS3Objects;
+import static
org.apache.hyracks.cloud.util.CloudRetryableRequestUtil.runWithNoRetryOnInterruption;
import java.io.FilenameFilter;
import java.io.IOException;
@@ -51,6 +52,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.util.IoUtil;
+import org.apache.hyracks.cloud.io.request.ICloudReturnableRequest;
import org.apache.hyracks.control.nc.io.IOManager;
import org.apache.hyracks.util.annotations.ThreadSafe;
import org.apache.logging.log4j.LogManager;
@@ -126,11 +128,15 @@
}
@Override
- public Set<CloudFile> listObjects(String bucket, String path,
FilenameFilter filter) {
+ public Set<CloudFile> listObjects(String bucket, String path,
FilenameFilter filter) throws HyracksDataException {
guardian.checkReadAccess(bucket, path);
profiler.objectsList();
path = config.isLocalS3Provider() ? encodeURI(path) : path;
- return filterAndGet(listS3Objects(s3Client, bucket, config.getPrefix()
+ path), filter);
+
+ String finalPath = config.getPrefix() + path;
+ ICloudReturnableRequest<List<S3Object>> request = () ->
listS3Objects(s3Client, bucket, finalPath);
+ List<S3Object> objects = runWithNoRetryOnInterruption(request);
+ return filterAndGet(objects, filter);
}
@Override
@@ -145,8 +151,8 @@
int totalRead = 0;
int read;
- // TODO(htowaileb): add retry logic here
- try (ResponseInputStream<GetObjectResponse> response =
s3Client.getObject(rangeGetObjectRequest)) {
+ try (ResponseInputStream<GetObjectResponse> response =
+ runWithNoRetryOnInterruption(() ->
s3Client.getObject(rangeGetObjectRequest))) {
while (buffer.remaining() > 0) {
read = response.read(buffer.array(), buffer.position(),
buffer.remaining());
if (read == -1) {
@@ -172,7 +178,8 @@
profiler.objectGet();
GetObjectRequest getReq =
GetObjectRequest.builder().bucket(bucket).key(config.getPrefix() +
path).build();
- try (ResponseInputStream<GetObjectResponse> stream =
s3Client.getObject(getReq)) {
+ try (ResponseInputStream<GetObjectResponse> stream =
+ runWithNoRetryOnInterruption(() ->
s3Client.getObject(getReq))) {
return stream.readAllBytes();
} catch (NoSuchKeyException e) {
return null;
@@ -182,14 +189,15 @@
}
@Override
- public InputStream getObjectStream(String bucket, String path, long
offset, long length) {
+ public InputStream getObjectStream(String bucket, String path, long
offset, long length)
+ throws HyracksDataException {
guardian.checkReadAccess(bucket, path);
profiler.objectGet();
long readTo = offset + length - 1;
GetObjectRequest getReq = GetObjectRequest.builder().range("bytes=" +
offset + "-" + readTo).bucket(bucket)
.key(config.getPrefix() + path).build();
try {
- return s3Client.getObject(getReq);
+ return runWithNoRetryOnInterruption(() ->
s3Client.getObject(getReq));
} catch (NoSuchKeyException e) {
// This should not happen at least from the only caller of this
method
throw new IllegalStateException(e);
@@ -197,19 +205,20 @@
}
@Override
- public void write(String bucket, String path, byte[] data) {
+ public void write(String bucket, String path, byte[] data) throws
HyracksDataException {
guardian.checkWriteAccess(bucket, path);
profiler.objectWrite();
PutObjectRequest putReq =
PutObjectRequest.builder().bucket(bucket).key(config.getPrefix() +
path).build();
- s3Client.putObject(putReq, RequestBody.fromBytes(data));
+ runWithNoRetryOnInterruption(() -> s3Client.putObject(putReq,
RequestBody.fromBytes(data)));
}
@Override
- public void copy(String bucket, String srcPath, FileReference destPath) {
+ public void copy(String bucket, String srcPath, FileReference destPath)
throws HyracksDataException {
guardian.checkReadAccess(bucket, srcPath);
srcPath = config.getPrefix() + srcPath;
srcPath = config.isLocalS3Provider() ? encodeURI(srcPath) : srcPath;
- List<S3Object> objects = listS3Objects(s3Client, bucket, srcPath);
+ final String finalSrcPath = srcPath;
+ List<S3Object> objects = runWithNoRetryOnInterruption(() ->
listS3Objects(s3Client, bucket, finalSrcPath));
profiler.objectsList();
for (S3Object object : objects) {
@@ -219,7 +228,8 @@
String destKey =
destPath.getChildPath(IoUtil.getFileNameFromPath(srcKey));
CopyObjectRequest copyReq =
CopyObjectRequest.builder().sourceBucket(bucket).sourceKey(srcKey)
.destinationBucket(bucket).destinationKey(config.getPrefix() + destKey).build();
- s3Client.copyObject(copyReq);
+
+ runWithNoRetryOnInterruption(() -> s3Client.copyObject(copyReq));
}
}
@@ -242,7 +252,8 @@
Delete delete =
Delete.builder().objects(objectIdentifiers).build();
DeleteObjectsRequest deleteReq =
DeleteObjectsRequest.builder().bucket(bucket).delete(delete).build();
- DeleteObjectsResponse deleteObjectsResponse =
s3Client.deleteObjects(deleteReq);
+ DeleteObjectsResponse deleteObjectsResponse =
+ runWithNoRetryOnInterruption(() ->
s3Client.deleteObjects(deleteReq));
if (deleteObjectsResponse.hasErrors()) {
List<S3Error> deleteErrors = deleteObjectsResponse.errors();
for (S3Error s3Error : deleteErrors) {
@@ -261,9 +272,11 @@
guardian.checkReadAccess(bucket, path);
profiler.objectGet();
try {
- return s3Client
-
.headObject(HeadObjectRequest.builder().bucket(bucket).key(config.getPrefix() +
path).build())
- .contentLength();
+ return runWithNoRetryOnInterruption(
+ () -> s3Client
+ .headObject(
+
HeadObjectRequest.builder().bucket(bucket).key(config.getPrefix() +
path).build())
+ .contentLength());
} catch (NoSuchKeyException ex) {
return 0;
} catch (Exception ex) {
@@ -276,7 +289,8 @@
guardian.checkReadAccess(bucket, path);
profiler.objectGet();
try {
-
s3Client.headObject(HeadObjectRequest.builder().bucket(bucket).key(config.getPrefix()
+ path).build());
+ runWithNoRetryOnInterruption(() -> s3Client
+
.headObject(HeadObjectRequest.builder().bucket(bucket).key(config.getPrefix() +
path).build()));
return true;
} catch (NoSuchKeyException ex) {
return false;
@@ -288,7 +302,8 @@
@Override
public boolean isEmptyPrefix(String bucket, String path) throws
HyracksDataException {
profiler.objectsList();
- return S3ClientUtils.isEmptyPrefix(s3Client, bucket,
config.getPrefix() + path);
+ return runWithNoRetryOnInterruption(
+ () -> S3ClientUtils.isEmptyPrefix(s3Client, bucket,
config.getPrefix() + path));
}
@Override
@@ -297,8 +312,9 @@
}
@Override
- public JsonNode listAsJson(ObjectMapper objectMapper, String bucket) {
- List<S3Object> objects = listS3Objects(s3Client, bucket,
config.getPrefix());
+ public JsonNode listAsJson(ObjectMapper objectMapper, String bucket)
throws HyracksDataException {
+ List<S3Object> objects =
+ runWithNoRetryOnInterruption(() -> listS3Objects(s3Client,
bucket, config.getPrefix()));
ArrayNode objectsInfo = objectMapper.createArrayNode();
objects.sort((x, y) -> String.CASE_INSENSITIVE_ORDER.compare(x.key(),
y.key()));
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java
index 1de64d0..73345c3 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java
@@ -40,6 +40,9 @@
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.cloud.io.request.ICloudRequest;
+import org.apache.hyracks.cloud.io.request.ICloudReturnableRequest;
+import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil;
import org.apache.hyracks.data.std.primitive.LongPointable;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -138,6 +141,7 @@
while (testClient.exists(bucket, path)) {
path = pathPrefix + random.nextInt();
}
+ final String finalPath = path;
long writeValue = random.nextLong();
byte[] data = new byte[Long.BYTES];
@@ -158,7 +162,9 @@
}
try {
- long readValue =
LongPointable.getLong(testClient.readAllBytes(bucket, path), 0);
+ ICloudReturnableRequest<byte[]> request = () ->
testClient.readAllBytes(bucket, finalPath);
+ byte[] bytes =
CloudRetryableRequestUtil.runWithNoRetryOnInterruption(request);
+ long readValue = LongPointable.getLong(bytes, 0);
if (writeValue != readValue) {
// This should never happen unless S3 is messed up. But log
for sanity check
LOGGER.warn(
@@ -167,7 +173,8 @@
}
} finally {
// Delete the written file
- testClient.deleteObjects(bucket, Collections.singleton(path));
+ ICloudRequest request = () -> testClient.deleteObjects(bucket,
Collections.singleton(finalPath));
+ CloudRetryableRequestUtil.runWithNoRetryOnInterruption(request);
}
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
index 9b6a2ff..272544d 100644
---
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
@@ -58,7 +58,7 @@
*
* @param stream to restore
*/
- void restoreStream(CloudInputStream stream);
+ void restoreStream(CloudInputStream stream) throws HyracksDataException;
/**
* Write to local drive only
diff --git
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudBeforeRetryRequest.java
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudBeforeRetryRequest.java
index 4dab80d..3ace317 100644
---
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudBeforeRetryRequest.java
+++
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudBeforeRetryRequest.java
@@ -18,6 +18,7 @@
*/
package org.apache.hyracks.cloud.io.request;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil;
/**
@@ -30,5 +31,5 @@
/**
* Run pre-retry routine before reattempting {@link ICloudRequest} or
{@link ICloudReturnableRequest}
*/
- void beforeRetry();
+ void beforeRetry() throws HyracksDataException;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
index da9c8fb..2dde96c 100644
---
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
+++
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
@@ -151,7 +151,16 @@
doRun(request, retry);
}
- private static <T> T doRun(ICloudReturnableRequest<T> request,
ICloudBeforeRetryRequest retry)
+ public static <T> T
runWithNoRetryOnInterruption(ICloudReturnableRequest<T> request) throws
HyracksDataException {
+ return runWithNoRetryOnInterruption(request, NO_OP_RETRY);
+ }
+
+ public static <T> T
runWithNoRetryOnInterruption(ICloudReturnableRequest<T> request,
ICloudBeforeRetryRequest retry)
+ throws HyracksDataException {
+ return doRun(request, retry);
+ }
+
+ public static <T> T doRun(ICloudReturnableRequest<T> request,
ICloudBeforeRetryRequest retry)
throws HyracksDataException {
int attempt = 1;
IRetryPolicy retryPolicy = null;
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19825
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: ionic
Gerrit-Change-Id: Ie9fcea374a4a04286d82e9323c5d677f975707fc
Gerrit-Change-Number: 19825
Gerrit-PatchSet: 1
Gerrit-Owner: Hussain Towaileb <[email protected]>
Gerrit-MessageType: newchange