>From Hussain Towaileb <[email protected]>: Hussain Towaileb has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20428?usp=email )
Change subject: [ASTERIXDB-3653][EXT]: Properly handle errors when deleting for COPY TO ...................................................................... [ASTERIXDB-3653][EXT]: Properly handle errors when deleting for COPY TO Ext-ref: MB-68654 Change-Id: Ib670a014a5e4378b0a63be740ead72d699a84bb1 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20428 Tested-by: Hussain Towaileb <[email protected]> Reviewed-by: Hussain Towaileb <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Michael Blow <[email protected]> --- M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java 6 files changed, 97 insertions(+), 33 deletions(-) Approvals: Jenkins: Verified Hussain Towaileb: Looks good to me, but someone else must approve; Verified Michael Blow: Looks good to me, approved diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java index 0baebc7..a8cb59b 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java @@ -121,6 +121,15 @@ void copy(String bucket, String srcPath, FileReference destPath) throws HyracksDataException; /** + * Deletes an object at the specified bucket and path + * + * @param bucket bucket + * @param path path of object + * @throws HyracksDataException HyracksDataException + */ + void deleteObject(String bucket, String path) throws HyracksDataException; + + /** * Deletes all objects at the specified bucket and paths * * @param bucket bucket diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java index d81419b..1160c12 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java @@ -106,6 +106,11 @@ } @Override + public void deleteObject(String bucket, String path) throws HyracksDataException { + cloudClient.deleteObject(bucket, path); + } + + @Override public void deleteObjects(String bucket, Collection<String> paths) throws HyracksDataException { cloudClient.deleteObjects(bucket, paths); } diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java index 81b96d7..c9fd485 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java @@ -73,6 +73,7 @@ import software.amazon.awssdk.services.s3.S3ClientBuilder; import software.amazon.awssdk.services.s3.model.CopyObjectRequest; import software.amazon.awssdk.services.s3.model.Delete; +import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse; import software.amazon.awssdk.services.s3.model.GetObjectRequest; @@ -238,6 +239,22 @@ } @Override + public void deleteObject(String bucket, String path) throws HyracksDataException { + try { + if (path.isEmpty()) { + return; + } + guardian.checkWriteAccess(bucket, path); + profiler.objectDelete(); + DeleteObjectRequest request = + DeleteObjectRequest.builder().bucket(bucket).key(config.getPrefix() + path).build(); + s3Client.deleteObject(request); + } catch (Exception ex) { + throw HyracksDataException.create(ex); + } + } + + @Override public void deleteObjects(String bucket, Collection<String> paths) throws HyracksDataException { if (paths.isEmpty()) { return; diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java index bdb551b..4a61f1c 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java @@ -259,6 +259,21 @@ } @Override + public void deleteObject(String bucket, String path) throws HyracksDataException { + try { + if (path.isEmpty()) { + return; + } + guardian.checkWriteAccess(bucket, path); + profiler.objectDelete(); + BlobClient blobClient = blobContainerClient.getBlobClient(config.getPrefix() + path); + blobClient.delete(); + } catch (Exception ex) { + throw HyracksDataException.create(ex); + } + } + + @Override public void deleteObjects(String bucket, Collection<String> paths) throws HyracksDataException { if (paths.isEmpty()) return; @@ -270,21 +285,25 @@ for (List<String> batch : batchedBlobURLs) { PagedIterable<Response<Void>> responses = blobBatchClient.deleteBlobs(batch, null); Iterator<String> deletePathIter = paths.iterator(); - String deletedPath = null; - try { - for (Response<Void> response : responses) { - deletedPath = deletePathIter.next(); - // The response.getStatusCode() method returns: - // - 202 (Accepted) if the delete operation is successful - // - exception if the delete operation fails - int statusCode = response.getStatusCode(); - if (statusCode != SUCCESS_RESPONSE_CODE) { - LOGGER.warn("Failed to delete blob: {} with status code: {} while deleting {}", deletedPath, - statusCode, paths.toString()); + String deletedPath; + String failedDeletedPath = null; + for (Response<Void> response : responses) { + deletedPath = deletePathIter.next(); + // The response.getStatusCode() method returns: + // - 202 (Accepted) if the delete operation is successful + // - exception if the delete operation fails + int statusCode = response.getStatusCode(); + if (statusCode != SUCCESS_RESPONSE_CODE) { + LOGGER.warn("Failed to delete blob: {} with status code: {} while deleting {}", deletedPath, + statusCode, paths.toString()); + if (failedDeletedPath == null) { + failedDeletedPath = deletedPath; } } - } catch (BlobStorageException e) { - throw new RuntimeDataException(ErrorCode.CLOUD_IO_FAILURE, e, "DELETE", deletedPath, paths.toString()); + } + if (failedDeletedPath != null) { + throw new RuntimeDataException(ErrorCode.CLOUD_IO_FAILURE, "DELETE", failedDeletedPath, + paths.toString()); } } } diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java index bd0a044..48bb2cd 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java @@ -75,7 +75,7 @@ private final Storage gcsClient; private final GCSClientConfig config; private final ICloudGuardian guardian; - private final IRequestProfilerLimiter profilerLimiter; + private final IRequestProfilerLimiter profiler; private final int writeBufferSize; public GCSCloudClient(GCSClientConfig config, Storage gcsClient, ICloudGuardian guardian) { @@ -86,9 +86,9 @@ long profilerInterval = config.getProfilerLogInterval(); GCSRequestRateLimiter limiter = new GCSRequestRateLimiter(config); if (profilerInterval > 0) { - profilerLimiter = new CountRequestProfilerLimiter(profilerInterval, limiter); + profiler = new CountRequestProfilerLimiter(profilerInterval, limiter); } else { - profilerLimiter = new RequestLimiterNoOpProfiler(limiter); + profiler = new RequestLimiterNoOpProfiler(limiter); } guardian.setCloudClient(this); } @@ -104,18 +104,18 @@ @Override public IRequestProfilerLimiter getProfilerLimiter() { - return profilerLimiter; + return profiler; } @Override public ICloudWriter createWriter(String bucket, String path, IWriteBufferProvider bufferProvider) { - return new GCSWriter(bucket, config.getPrefix() + path, gcsClient, profilerLimiter, guardian, writeBufferSize); + return new GCSWriter(bucket, config.getPrefix() + path, gcsClient, profiler, guardian, writeBufferSize); } @Override public Set<CloudFile> listObjects(String bucket, String path, FilenameFilter filter) { guardian.checkReadAccess(bucket, path); - profilerLimiter.objectsList(); + profiler.objectsList(); Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(config.getPrefix() + path), BlobListOption.fields(Storage.BlobField.SIZE)); Set<CloudFile> files = new HashSet<>(); @@ -130,7 +130,7 @@ @Override public int read(String bucket, String path, long offset, ByteBuffer buffer) throws HyracksDataException { guardian.checkReadAccess(bucket, path); - profilerLimiter.objectGet(); + profiler.objectGet(); BlobId blobId = BlobId.of(bucket, config.getPrefix() + path); long readTo = offset + buffer.remaining(); int totalRead = 0; @@ -152,7 +152,7 @@ @Override public byte[] readAllBytes(String bucket, String path) throws HyracksDataException { guardian.checkReadAccess(bucket, path); - profilerLimiter.objectGet(); + profiler.objectGet(); BlobId blobId = BlobId.of(bucket, config.getPrefix() + path); try { return gcsClient.readAllBytes(blobId); @@ -167,7 +167,7 @@ @Override public InputStream getObjectStream(String bucket, String path, long offset, long length) { guardian.checkReadAccess(bucket, path); - profilerLimiter.objectGet(); + profiler.objectGet(); ReadChannel reader = null; try { reader = gcsClient.reader(bucket, config.getPrefix() + path).limit(offset + length); @@ -181,7 +181,7 @@ @Override public void write(String bucket, String path, byte[] data) { guardian.checkWriteAccess(bucket, path); - profilerLimiter.objectWrite(); + profiler.objectWrite(); BlobInfo blobInfo = BlobInfo.newBuilder(bucket, config.getPrefix() + path).build(); gcsClient.create(blobInfo, data); } @@ -189,10 +189,10 @@ @Override public void copy(String bucket, String srcPath, FileReference destPath) { guardian.checkReadAccess(bucket, srcPath); - profilerLimiter.objectsList(); + profiler.objectsList(); Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(config.getPrefix() + srcPath)); for (Blob blob : blobs.iterateAll()) { - profilerLimiter.objectCopy(); + profiler.objectCopy(); BlobId source = blob.getBlobId(); String targetName = destPath.getChildPath(IoUtil.getFileNameFromPath(source.getName())); BlobId target = BlobId.of(bucket, targetName); @@ -203,6 +203,21 @@ } @Override + public void deleteObject(String bucket, String path) throws HyracksDataException { + try { + if (path.isEmpty()) { + return; + } + guardian.checkWriteAccess(bucket, path); + profiler.objectDelete(); + BlobId blobId = BlobId.of(bucket, config.getPrefix() + path); + gcsClient.delete(blobId); + } catch (Exception ex) { + throw HyracksDataException.create(ex); + } + } + + @Override public void deleteObjects(String bucket, Collection<String> paths) throws HyracksDataException { if (paths.isEmpty()) { return; @@ -236,14 +251,14 @@ paths.toString()); } } - profilerLimiter.objectDelete(); + profiler.objectDelete(); } } @Override public long getObjectSize(String bucket, String path) { guardian.checkReadAccess(bucket, path); - profilerLimiter.objectGet(); + profiler.objectGet(); Blob blob = gcsClient.get(bucket, config.getPrefix() + path, Storage.BlobGetOption.fields(Storage.BlobField.SIZE)); if (blob == null) { @@ -255,7 +270,7 @@ @Override public boolean exists(String bucket, String path) { guardian.checkReadAccess(bucket, path); - profilerLimiter.objectGet(); + profiler.objectGet(); Blob blob = gcsClient.get(bucket, config.getPrefix() + path, Storage.BlobGetOption.fields(Storage.BlobField.values())); return blob != null && blob.exists(); @@ -264,7 +279,7 @@ @Override public boolean isEmptyPrefix(String bucket, String path) { guardian.checkReadAccess(bucket, path); - profilerLimiter.objectsList(); + profiler.objectsList(); Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(config.getPrefix() + path)); return !blobs.iterateAll().iterator().hasNext(); } @@ -272,13 +287,13 @@ @Override public IParallelDownloader createParallelDownloader(String bucket, IOManager ioManager) throws HyracksDataException { - return new GCSParallelDownloader(bucket, ioManager, config, profilerLimiter); + return new GCSParallelDownloader(bucket, ioManager, config, profiler); } @Override public JsonNode listAsJson(ObjectMapper objectMapper, String bucket) { guardian.checkReadAccess(bucket, "/"); - profilerLimiter.objectsList(); + profiler.objectsList(); Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.fields(Storage.BlobField.SIZE)); ArrayNode objectsInfo = objectMapper.createArrayNode(); diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java index c52e2b6..b6af323 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java @@ -23,7 +23,6 @@ import static org.apache.hyracks.cloud.util.CloudRetryableRequestUtil.runWithNoRetryOnInterruption; import java.io.IOException; -import java.util.Collections; import java.util.Map; import java.util.Optional; import java.util.Random; @@ -179,7 +178,7 @@ } } finally { // Delete the written file - runWithNoRetryOnInterruption(() -> testClient.deleteObjects(bucket, Collections.singleton(finalPath))); + runWithNoRetryOnInterruption(() -> testClient.deleteObject(bucket, finalPath)); } } } -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20428?usp=email To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings?usp=email Gerrit-MessageType: merged Gerrit-Project: asterixdb Gerrit-Branch: phoenix Gerrit-Change-Id: Ib670a014a5e4378b0a63be740ead72d699a84bb1 Gerrit-Change-Number: 20428 Gerrit-PatchSet: 3 Gerrit-Owner: Hussain Towaileb <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Hussain Towaileb <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]>
