suneet-s commented on code in PR #14131:
URL: https://github.com/apache/druid/pull/14131#discussion_r1178452319
##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java:
##########
@@ -64,6 +70,52 @@ public S3DataSegmentKiller(
this.inputDataConfig = inputDataConfig;
}
+ @Override
+ public void kill(List<DataSegment> segments) throws SegmentLoadingException
+ {
+ int size = segments.size();
+ if (size == 0) {
+ return;
+ }
+ if (segments.size() == 1) {
+ kill(segments.get(0));
+ return;
+ }
+
+ // we can assume that all segments are in the same bucket.
+ String s3Bucket = MapUtils.getString(segments.get(0).getLoadSpec(), "bucket");
+ final ServerSideEncryptingAmazonS3 s3Client = this.s3ClientSupplier.get();
+
+ // 1000 objects is the max amount of objects that can be deleted in s3 at a time.
+ List<List<DataSegment>> segmentsChunks = Lists.partition(segments, 1000);
+ for (List<DataSegment> segmentsChunk : segmentsChunks) {
+ try {
+ DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(s3Bucket);
+ String[] keys = segmentsChunk.stream()
+ .map(segment -> MapUtils.getString(segment.getLoadSpec(), "key"))
+ .toArray(String[]::new);
+ deleteObjectsRequest = deleteObjectsRequest.withKeys(keys);
+ log.info("Removing indexes from s3, bucket: %s, keys %s", deleteObjectsRequest.getBucketName(), deleteObjectsRequest.getKeys().toString());
Review Comment:
```suggestion
log.info("Removing indexes from s3, bucket: %s, keys [%s]", s3Bucket, keys);
```
Can you paste an example of what this line looks like?
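To make the question above concrete: assuming Druid's logger applies `String.format`-style substitution for `%s` placeholders, passing a `String[]` straight through renders the array's `Object.toString()` rather than the key names. The sketch below (with a hypothetical `format` helper, not a Druid API) shows the difference and why wrapping the array with `Arrays.toString` matters:

```java
import java.util.Arrays;

public class ArrayFormatDemo
{
  // Hypothetical helper mirroring the log line's format string; assumes
  // String.format-style %s substitution.
  static String format(Object keysArg)
  {
    return String.format("Removing indexes from s3, bucket: %s, keys %s", "my-bucket", keysArg);
  }

  public static void main(String[] args)
  {
    String[] keys = {"ds/2023/index.zip", "ds/2024/index.zip"};
    // Passing the array itself hits Object.toString(), e.g. "[Ljava.lang.String;@5e91993f".
    System.out.println(format((Object) keys));
    // Arrays.toString renders the actual key names instead.
    System.out.println(format(Arrays.toString(keys)));
  }
}
```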
##########
processing/src/main/java/org/apache/druid/segment/loading/DataSegmentKiller.java:
##########
@@ -54,6 +55,19 @@ static String descriptorPath(String path)
*/
void kill(DataSegment segment) throws SegmentLoadingException;
+ /**
+ * Removes a list of segment files (indexes and metadata) from deep storage. This method can be more efficient if
+ * implementer of this interface leverages batch/bulk deletes.
Review Comment:
nit:
```suggestion
   * Kills a list of segments from deep storage. The default implementation calls kill on the segments in a loop.
   * Implementers of this interface can leverage batch / bulk deletes to be more efficient.
```
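The suggested javadoc describes a default method that loops over the segments. A minimal sketch of that interface shape (simplified: `String` stands in for `DataSegment`, and the checked `SegmentLoadingException` is omitted):

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch of the interface shape the suggested javadoc describes;
// the real Druid interface uses DataSegment and throws SegmentLoadingException.
interface BatchKiller
{
  void kill(String segment);

  // The default implementation calls kill on the segments in a loop;
  // implementations backed by object stores can override it with a bulk delete.
  default void kill(List<String> segments)
  {
    for (String segment : segments) {
      kill(segment);
    }
  }
}

public class BatchKillerDemo
{
  public static void main(String[] args)
  {
    List<String> killed = new ArrayList<>();
    BatchKiller killer = killed::add;
    killer.kill(List.of("seg-a", "seg-b"));
    System.out.println(killed); // [seg-a, seg-b]
  }
}
```

With a default method like this, existing implementers keep working unchanged, and only stores with a bulk-delete primitive (such as S3) need an override.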
##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java:
##########
@@ -64,6 +70,52 @@ public S3DataSegmentKiller(
this.inputDataConfig = inputDataConfig;
}
+ @Override
+ public void kill(List<DataSegment> segments) throws SegmentLoadingException
+ {
+ int size = segments.size();
+ if (size == 0) {
+ return;
+ }
+ if (segments.size() == 1) {
+ kill(segments.get(0));
+ return;
+ }
+
+ // we can assume that all segments are in the same bucket.
+ String s3Bucket = MapUtils.getString(segments.get(0).getLoadSpec(), "bucket");
+ final ServerSideEncryptingAmazonS3 s3Client = this.s3ClientSupplier.get();
+
+ // 1000 objects is the max amount of objects that can be deleted in s3 at a time.
+ List<List<DataSegment>> segmentsChunks = Lists.partition(segments, 1000);
+ for (List<DataSegment> segmentsChunk : segmentsChunks) {
+ try {
+ DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(s3Bucket);
+ String[] keys = segmentsChunk.stream()
+ .map(segment -> MapUtils.getString(segment.getLoadSpec(), "key"))
+ .toArray(String[]::new);
+ deleteObjectsRequest = deleteObjectsRequest.withKeys(keys);
+ log.info("Removing indexes from s3, bucket: %s, keys %s", deleteObjectsRequest.getBucketName(), deleteObjectsRequest.getKeys().toString());
+ DeleteObjectsResult deleteResult = s3Client.deleteObjects(deleteObjectsRequest);
+ log.info("deleted objects %s", deleteResult);
+
+ // delete descriptors which are a files to store segment metadata in deep storage.
+ // This file is deprecated and not stored anymore, but we still delete them if they exist.
+ deleteObjectsRequest = new DeleteObjectsRequest(s3Bucket);
+ deleteObjectsRequest = deleteObjectsRequest.withKeys(
+ Arrays.stream(keys).map(DataSegmentKiller::descriptorPath).toArray(String[]::new));
+ deleteResult = s3Client.deleteObjects(deleteObjectsRequest);
+ log.info("deleted objects %s", deleteResult);
+ }
+ catch (MultiObjectDeleteException e) {
Review Comment:
Instead of making 2 `DeleteObjectsRequest`s, can we combine all the keys into one list and then split that list based on the max of 1000 keys that can be deleted at a time?
This will also mean that descriptor files will not be left behind if deleting one segment fails.
At this point the segments are already removed from the metadata store, so the files and their descriptors should no longer be referenced anywhere in Druid.
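The combining-and-chunking the comment above asks for can be sketched in plain Java. The helper names (`batchKeys`, the `descriptorPath` stand-in) are illustrative, not Druid APIs; the real code could use Guava's `Lists.partition` for the chunking step:

```java
import java.util.ArrayList;
import java.util.List;

public class KeyBatcher
{
  static final int MAX_KEYS_PER_DELETE = 1000; // S3 multi-object delete limit

  // Hypothetical stand-in for DataSegmentKiller.descriptorPath.
  static String descriptorPath(String indexPath)
  {
    return indexPath.replace("index.zip", "descriptor.json");
  }

  // Combine each segment's index key with its descriptor key into one list,
  // then split into chunks of at most 1000 keys. A descriptor can then never
  // be left behind because its index key sat in a different, failed request.
  static List<List<String>> batchKeys(List<String> segmentKeys)
  {
    List<String> allKeys = new ArrayList<>();
    for (String key : segmentKeys) {
      allKeys.add(key);
      allKeys.add(descriptorPath(key));
    }
    List<List<String>> chunks = new ArrayList<>();
    for (int i = 0; i < allKeys.size(); i += MAX_KEYS_PER_DELETE) {
      chunks.add(allKeys.subList(i, Math.min(i + MAX_KEYS_PER_DELETE, allKeys.size())));
    }
    return chunks;
  }

  public static void main(String[] args)
  {
    List<String> keys = new ArrayList<>();
    for (int i = 0; i < 600; i++) {
      keys.add("datasource/" + i + "/index.zip");
    }
    // 600 segments -> 1200 keys -> chunks of 1000 and 200.
    List<List<String>> chunks = batchKeys(keys);
    System.out.println(chunks.size());        // 2
    System.out.println(chunks.get(0).size()); // 1000
    System.out.println(chunks.get(1).size()); // 200
  }
}
```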
##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java:
##########
@@ -64,6 +70,52 @@ public S3DataSegmentKiller(
this.inputDataConfig = inputDataConfig;
}
+ @Override
+ public void kill(List<DataSegment> segments) throws SegmentLoadingException
+ {
+ int size = segments.size();
+ if (size == 0) {
+ return;
+ }
+ if (segments.size() == 1) {
+ kill(segments.get(0));
+ return;
+ }
+
+ // we can assume that all segments are in the same bucket.
+ String s3Bucket = MapUtils.getString(segments.get(0).getLoadSpec(), "bucket");
+ final ServerSideEncryptingAmazonS3 s3Client = this.s3ClientSupplier.get();
+
+ // 1000 objects is the max amount of objects that can be deleted in s3 at a time.
+ List<List<DataSegment>> segmentsChunks = Lists.partition(segments, 1000);
+ for (List<DataSegment> segmentsChunk : segmentsChunks) {
+ try {
+ DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(s3Bucket);
+ String[] keys = segmentsChunk.stream()
+ .map(segment -> MapUtils.getString(segment.getLoadSpec(), "key"))
+ .toArray(String[]::new);
+ deleteObjectsRequest = deleteObjectsRequest.withKeys(keys);
Review Comment:
We don't care about the successes since we already have the list of keys in the request.
```suggestion
deleteObjectsRequest = deleteObjectsRequest.withKeys(keys).withQuiet(true);
```
##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java:
##########
@@ -64,6 +70,52 @@ public S3DataSegmentKiller(
this.inputDataConfig = inputDataConfig;
}
+ @Override
+ public void kill(List<DataSegment> segments) throws SegmentLoadingException
+ {
+ int size = segments.size();
+ if (size == 0) {
+ return;
+ }
+ if (segments.size() == 1) {
+ kill(segments.get(0));
+ return;
+ }
+
+ // we can assume that all segments are in the same bucket.
+ String s3Bucket = MapUtils.getString(segments.get(0).getLoadSpec(), "bucket");
+ final ServerSideEncryptingAmazonS3 s3Client = this.s3ClientSupplier.get();
+
+ // 1000 objects is the max amount of objects that can be deleted in s3 at a time.
+ List<List<DataSegment>> segmentsChunks = Lists.partition(segments, 1000);
+ for (List<DataSegment> segmentsChunk : segmentsChunks) {
+ try {
+ DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(s3Bucket);
+ String[] keys = segmentsChunk.stream()
+ .map(segment -> MapUtils.getString(segment.getLoadSpec(), "key"))
+ .toArray(String[]::new);
+ deleteObjectsRequest = deleteObjectsRequest.withKeys(keys);
+ log.info("Removing indexes from s3, bucket: %s, keys %s", deleteObjectsRequest.getBucketName(), deleteObjectsRequest.getKeys().toString());
+ DeleteObjectsResult deleteResult = s3Client.deleteObjects(deleteObjectsRequest);
+ log.info("deleted objects %s", deleteResult);
+
+ // delete descriptors which are a files to store segment metadata in deep storage.
+ // This file is deprecated and not stored anymore, but we still delete them if they exist.
+ deleteObjectsRequest = new DeleteObjectsRequest(s3Bucket);
+ deleteObjectsRequest = deleteObjectsRequest.withKeys(
+ Arrays.stream(keys).map(DataSegmentKiller::descriptorPath).toArray(String[]::new));
+ deleteResult = s3Client.deleteObjects(deleteObjectsRequest);
+ log.info("deleted objects %s", deleteResult);
Review Comment:
```suggestion
```
##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java:
##########
@@ -64,6 +70,52 @@ public S3DataSegmentKiller(
this.inputDataConfig = inputDataConfig;
}
+ @Override
+ public void kill(List<DataSegment> segments) throws SegmentLoadingException
+ {
+ int size = segments.size();
+ if (size == 0) {
+ return;
+ }
+ if (segments.size() == 1) {
+ kill(segments.get(0));
+ return;
+ }
+
+ // we can assume that all segments are in the same bucket.
+ String s3Bucket = MapUtils.getString(segments.get(0).getLoadSpec(), "bucket");
+ final ServerSideEncryptingAmazonS3 s3Client = this.s3ClientSupplier.get();
+
+ // 1000 objects is the max amount of objects that can be deleted in s3 at a time.
+ List<List<DataSegment>> segmentsChunks = Lists.partition(segments, 1000);
+ for (List<DataSegment> segmentsChunk : segmentsChunks) {
+ try {
+ DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(s3Bucket);
+ String[] keys = segmentsChunk.stream()
+ .map(segment -> MapUtils.getString(segment.getLoadSpec(), "key"))
+ .toArray(String[]::new);
+ deleteObjectsRequest = deleteObjectsRequest.withKeys(keys);
+ log.info("Removing indexes from s3, bucket: %s, keys %s", deleteObjectsRequest.getBucketName(), deleteObjectsRequest.getKeys().toString());
+ DeleteObjectsResult deleteResult = s3Client.deleteObjects(deleteObjectsRequest);
+ log.info("deleted objects %s", deleteResult);
Review Comment:
```suggestion
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java:
##########
@@ -129,8 +129,12 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
// Kill segments
toolbox.getTaskActionClient().submit(new SegmentNukeAction(new HashSet<>(unusedSegments)));
- for (DataSegment segment : unusedSegments) {
- toolbox.getDataSegmentKiller().kill(segment);
+ if (getContextValue("batchDelete", false)) {
Review Comment:
If we keep this feature flag, it should be enabled by default, as this is the better method. Since there is no change in behavior with the flag enabled vs disabled, I don't see a reason to disable batch deleting by default when it is the more optimal way to delete segments. We should also document this in the task context documentation and mention that this feature flag will be going away in the very near future.
Is there a way to configure auto-kill to specify this context, so that Druid operators who have auto-kill enabled can enable / disable this behavior for kill tasks generated from Druid's auto-kill functionality?
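An enabled-by-default context flag, as requested above, reduces to a null check with a `true` fallback. This is a hedged sketch over a plain `Map` rather than Druid's actual `Task.getContextValue` API, and the `"batchDelete"` key is taken from the diff under review:

```java
import java.util.Map;

public class ContextFlagDemo
{
  // Hypothetical reading of a task-context flag with a default of true,
  // mirroring the suggestion that "batchDelete" be on unless explicitly disabled.
  static boolean isBatchDeleteEnabled(Map<String, Object> context)
  {
    Object value = context.get("batchDelete");
    return value == null || Boolean.parseBoolean(value.toString());
  }

  public static void main(String[] args)
  {
    System.out.println(isBatchDeleteEnabled(Map.of()));                     // true
    System.out.println(isBatchDeleteEnabled(Map.of("batchDelete", false))); // false
  }
}
```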
##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java:
##########
@@ -64,6 +70,52 @@ public S3DataSegmentKiller(
this.inputDataConfig = inputDataConfig;
}
+ @Override
+ public void kill(List<DataSegment> segments) throws SegmentLoadingException
+ {
+ int size = segments.size();
+ if (size == 0) {
+ return;
+ }
+ if (segments.size() == 1) {
+ kill(segments.get(0));
+ return;
+ }
+
+ // we can assume that all segments are in the same bucket.
+ String s3Bucket = MapUtils.getString(segments.get(0).getLoadSpec(), "bucket");
+ final ServerSideEncryptingAmazonS3 s3Client = this.s3ClientSupplier.get();
+
+ // 1000 objects is the max amount of objects that can be deleted in s3 at a time.
+ List<List<DataSegment>> segmentsChunks = Lists.partition(segments, 1000);
+ for (List<DataSegment> segmentsChunk : segmentsChunks) {
+ try {
+ DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(s3Bucket);
+ String[] keys = segmentsChunk.stream()
+ .map(segment -> MapUtils.getString(segment.getLoadSpec(), "key"))
+ .toArray(String[]::new);
+ deleteObjectsRequest = deleteObjectsRequest.withKeys(keys);
+ log.info("Removing indexes from s3, bucket: %s, keys %s", deleteObjectsRequest.getBucketName(), deleteObjectsRequest.getKeys().toString());
+ DeleteObjectsResult deleteResult = s3Client.deleteObjects(deleteObjectsRequest);
+ log.info("deleted objects %s", deleteResult);
+
+ // delete descriptors which are a files to store segment metadata in deep storage.
+ // This file is deprecated and not stored anymore, but we still delete them if they exist.
+ deleteObjectsRequest = new DeleteObjectsRequest(s3Bucket);
+ deleteObjectsRequest = deleteObjectsRequest.withKeys(
+ Arrays.stream(keys).map(DataSegmentKiller::descriptorPath).toArray(String[]::new));
+ deleteResult = s3Client.deleteObjects(deleteObjectsRequest);
+ log.info("deleted objects %s", deleteResult);
+ }
+ catch (MultiObjectDeleteException e) {
+ throw new SegmentLoadingException(e, "Couldn't kill all segment but deleted[%s]: [%s]", e.getDeletedObjects(), e);
Review Comment:
I don't think we should throw an exception if deleting some of the segments fails. We should run through all the objects to delete and then, at the end, throw an exception with the list of objects that failed to delete.
```suggestion
throw new SegmentLoadingException(e, "Failed to delete keys [%s]", e.getDeletedObjects().stream().map(DeleteObjectsResult.DeletedObject::getKey).collect(Collectors.toList()));
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]