shekhars-li commented on code in PR #1676:
URL: https://github.com/apache/samza/pull/1676#discussion_r1286271287
##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java:
##########
@@ -210,6 +216,56 @@ public CompletableFuture<String>
putSnapshotIndex(SnapshotIndex snapshotIndex) {
}, isCauseNonRetriable(), executor, retryPolicyConfig);
}
+ /**
+ * Cleans up a SnapshotIndex by recursively deleting all blobs associated
with files/subdirs inside the SnapshotIndex
+ * and finally deletes SnapshotIndex blob itself. This is done by getting
the SnapshotIndex first.
+ * @param snapshotIndexBlobId Blob id of SnapshotIndex
+ * @param requestMetadata Metadata of the request
+ */
+ public CompletionStage<Void> cleanSnapshotIndex(String snapshotIndexBlobId,
Metadata requestMetadata) {
+ return cleanSnapshotIndex(snapshotIndexBlobId, requestMetadata, false);
+ }
+
+ /**
+ * Cleans up a SnapshotIndex by recursively deleting all blobs associated
with files/subdirs inside the SnapshotIndex
+ * and finally deletes SnapshotIndex blob itself. This is done by getting
the SnapshotIndex first.
+ * @param snapshotIndexBlobId Blob id of SnapshotIndex
+ * @param requestMetadata Metadata of the request
+ * @param getDeletedBlob Gets SnapshotIndex with getDeleted flag set
+ */
+ public CompletionStage<Void> cleanSnapshotIndex(String snapshotIndexBlobId,
Metadata requestMetadata, Boolean getDeletedBlob) {
+ Metadata getSnapshotRequest = new
Metadata(Metadata.SNAPSHOT_INDEX_PAYLOAD_PATH, Optional.empty(),
requestMetadata.getJobName(),
+ requestMetadata.getJobId(), requestMetadata.getTaskName(),
requestMetadata.getStoreName());
+ SnapshotIndex snapshotIndex = getSnapshotIndex(snapshotIndexBlobId,
getSnapshotRequest, getDeletedBlob).join();
+ return cleanSnapshotIndex(snapshotIndexBlobId, snapshotIndex,
requestMetadata);
+ }
+
+ /**
+ * Cleans up a SnapshotIndex by recursively deleting all blobs associated
with files/subdirs inside the SnapshotIndex
+ * and finally deletes SnapshotIndex blob itself.
+ * @param snapshotIndexBlobId Blob id of SnapshotIndex
+ * @param snapshotIndex SnapshotIndex to delete
+ * @param requestMetadata Metadata of the request
+ */
+ public CompletionStage<Void> cleanSnapshotIndex(String snapshotIndexBlobId,
SnapshotIndex snapshotIndex, Metadata requestMetadata) {
+ DirIndex dirIndex = snapshotIndex.getDirIndex();
+ CompletionStage<Void> storeDeletionFuture =
+ cleanUpDir(dirIndex, requestMetadata) // delete files and sub-dirs
previously marked for removal
+ .thenComposeAsync(v ->
+ deleteDir(dirIndex, requestMetadata), executor) // deleted
files and dirs still present
+ .thenComposeAsync(v ->
deleteSnapshotIndexBlob(snapshotIndexBlobId, requestMetadata), executor) //
delete the snapshot index blob
+ .exceptionally(ex -> {
+ if (ex instanceof DeletedException) {
+ LOG.warn("DeletedException received on trying to clean up
SnapshotIndex {}. Ignoring the error.",
Review Comment:
Added the request metadata; that should have all the necessary info.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]