suneet-s commented on code in PR #14131:
URL: https://github.com/apache/druid/pull/14131#discussion_r1178452319


##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java:
##########
@@ -64,6 +70,52 @@ public S3DataSegmentKiller(
     this.inputDataConfig = inputDataConfig;
   }
 
+  @Override
+  public void kill(List<DataSegment> segments) throws SegmentLoadingException
+  {
+    int size = segments.size();
+    if (size == 0) {
+      return;
+    }
+    if (segments.size() == 1) {
+      kill(segments.get(0));
+      return;
+    }
+
+    // we can assume that all segments are in the same bucket.
+    String s3Bucket = MapUtils.getString(segments.get(0).getLoadSpec(), 
"bucket");
+    final ServerSideEncryptingAmazonS3 s3Client = this.s3ClientSupplier.get();
+
+    // 1000 objects is the max amount of objects that can be deleted in s3 at 
a time.
+    List<List<DataSegment>> segmentsChunks = Lists.partition(segments, 1000);
+    for (List<DataSegment> segmentsChunk : segmentsChunks) {
+      try {
+        DeleteObjectsRequest deleteObjectsRequest = new 
DeleteObjectsRequest(s3Bucket);
+        String[] keys = segmentsChunk.stream()
+                     .map(segment -> MapUtils.getString(segment.getLoadSpec(), 
"key"))
+                     .toArray(String[]::new);
+        deleteObjectsRequest = deleteObjectsRequest.withKeys(keys);
+        log.info("Removing indexes from s3, bucket: %s, keys %s", 
deleteObjectsRequest.getBucketName(), 
deleteObjectsRequest.getKeys().toString());

Review Comment:
   ```suggestion
           log.info("Removing indexes from s3, bucket: %s, keys [%s]", 
s3Bucket, keys);
   ```
   
   Can you paste an example of what this line looks like?



##########
processing/src/main/java/org/apache/druid/segment/loading/DataSegmentKiller.java:
##########
@@ -54,6 +55,19 @@ static String descriptorPath(String path)
    */
   void kill(DataSegment segment) throws SegmentLoadingException;
 
+  /**
+   * Removes a list of segment files (indexes and metadata) from deep storage. 
This method can be more efficient if
+   * implementer of this interface leverages batch/bulk deletes.

Review Comment:
   nit:
   ```suggestion
         Removes a list of segment files (indexes and metadata) from deep 
storage. This method can be more efficient if
      * Kills a list of segments from deep storage. The default implementation 
calls kill on the segments in a loop.
      * Implementers of this interface can leverage batch / bulk deletes to be 
more efficient.
   ```



##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java:
##########
@@ -64,6 +70,52 @@ public S3DataSegmentKiller(
     this.inputDataConfig = inputDataConfig;
   }
 
+  @Override
+  public void kill(List<DataSegment> segments) throws SegmentLoadingException
+  {
+    int size = segments.size();
+    if (size == 0) {
+      return;
+    }
+    if (segments.size() == 1) {
+      kill(segments.get(0));
+      return;
+    }
+
+    // we can assume that all segments are in the same bucket.
+    String s3Bucket = MapUtils.getString(segments.get(0).getLoadSpec(), 
"bucket");
+    final ServerSideEncryptingAmazonS3 s3Client = this.s3ClientSupplier.get();
+
+    // 1000 objects is the max amount of objects that can be deleted in s3 at 
a time.
+    List<List<DataSegment>> segmentsChunks = Lists.partition(segments, 1000);
+    for (List<DataSegment> segmentsChunk : segmentsChunks) {
+      try {
+        DeleteObjectsRequest deleteObjectsRequest = new 
DeleteObjectsRequest(s3Bucket);
+        String[] keys = segmentsChunk.stream()
+                     .map(segment -> MapUtils.getString(segment.getLoadSpec(), 
"key"))
+                     .toArray(String[]::new);
+        deleteObjectsRequest = deleteObjectsRequest.withKeys(keys);
+        log.info("Removing indexes from s3, bucket: %s, keys %s", 
deleteObjectsRequest.getBucketName(), 
deleteObjectsRequest.getKeys().toString());
+        DeleteObjectsResult deleteResult = 
s3Client.deleteObjects(deleteObjectsRequest);
+        log.info("deleted objects %s", deleteResult);
+
+        // delete descriptors which are a files to store segment metadata in 
deep storage.
+        // This file is deprecated and not stored anymore, but we still delete 
them if they exist.
+        deleteObjectsRequest = new DeleteObjectsRequest(s3Bucket);
+        deleteObjectsRequest = deleteObjectsRequest.withKeys(
+            
Arrays.stream(keys).map(DataSegmentKiller::descriptorPath).toArray(String[]::new));
+        deleteResult = s3Client.deleteObjects(deleteObjectsRequest);
+        log.info("deleted objects %s", deleteResult);
+      }
+      catch (MultiObjectDeleteException e) {

Review Comment:
   Instead of making 2 `DeleteObjectsRequest`s can we combine all the keys into 
one list of keys and then split the list based on the max 1000 keys that can be 
deleted at a time.
   
   This will also mean that descriptor files will not be left behind if 
deleting one segment fails.
   
   At this point the segments are already removed from the metadata store so 
the files and their descriptors should no longer be referenced anywhere in 
Driud.



##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java:
##########
@@ -64,6 +70,52 @@ public S3DataSegmentKiller(
     this.inputDataConfig = inputDataConfig;
   }
 
+  @Override
+  public void kill(List<DataSegment> segments) throws SegmentLoadingException
+  {
+    int size = segments.size();
+    if (size == 0) {
+      return;
+    }
+    if (segments.size() == 1) {
+      kill(segments.get(0));
+      return;
+    }
+
+    // we can assume that all segments are in the same bucket.
+    String s3Bucket = MapUtils.getString(segments.get(0).getLoadSpec(), 
"bucket");
+    final ServerSideEncryptingAmazonS3 s3Client = this.s3ClientSupplier.get();
+
+    // 1000 objects is the max amount of objects that can be deleted in s3 at 
a time.
+    List<List<DataSegment>> segmentsChunks = Lists.partition(segments, 1000);
+    for (List<DataSegment> segmentsChunk : segmentsChunks) {
+      try {
+        DeleteObjectsRequest deleteObjectsRequest = new 
DeleteObjectsRequest(s3Bucket);
+        String[] keys = segmentsChunk.stream()
+                     .map(segment -> MapUtils.getString(segment.getLoadSpec(), 
"key"))
+                     .toArray(String[]::new);
+        deleteObjectsRequest = deleteObjectsRequest.withKeys(keys);

Review Comment:
   We don't care about the successes since we already have the list of keys in 
the request.
   
   ```suggestion
           deleteObjectsRequest = 
deleteObjectsRequest.withKeys(keys).withQuiet(true);
   ```



##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java:
##########
@@ -64,6 +70,52 @@ public S3DataSegmentKiller(
     this.inputDataConfig = inputDataConfig;
   }
 
+  @Override
+  public void kill(List<DataSegment> segments) throws SegmentLoadingException
+  {
+    int size = segments.size();
+    if (size == 0) {
+      return;
+    }
+    if (segments.size() == 1) {
+      kill(segments.get(0));
+      return;
+    }
+
+    // we can assume that all segments are in the same bucket.
+    String s3Bucket = MapUtils.getString(segments.get(0).getLoadSpec(), 
"bucket");
+    final ServerSideEncryptingAmazonS3 s3Client = this.s3ClientSupplier.get();
+
+    // 1000 objects is the max amount of objects that can be deleted in s3 at 
a time.
+    List<List<DataSegment>> segmentsChunks = Lists.partition(segments, 1000);
+    for (List<DataSegment> segmentsChunk : segmentsChunks) {
+      try {
+        DeleteObjectsRequest deleteObjectsRequest = new 
DeleteObjectsRequest(s3Bucket);
+        String[] keys = segmentsChunk.stream()
+                     .map(segment -> MapUtils.getString(segment.getLoadSpec(), 
"key"))
+                     .toArray(String[]::new);
+        deleteObjectsRequest = deleteObjectsRequest.withKeys(keys);
+        log.info("Removing indexes from s3, bucket: %s, keys %s", 
deleteObjectsRequest.getBucketName(), 
deleteObjectsRequest.getKeys().toString());
+        DeleteObjectsResult deleteResult = 
s3Client.deleteObjects(deleteObjectsRequest);
+        log.info("deleted objects %s", deleteResult);
+
+        // delete descriptors which are a files to store segment metadata in 
deep storage.
+        // This file is deprecated and not stored anymore, but we still delete 
them if they exist.
+        deleteObjectsRequest = new DeleteObjectsRequest(s3Bucket);
+        deleteObjectsRequest = deleteObjectsRequest.withKeys(
+            
Arrays.stream(keys).map(DataSegmentKiller::descriptorPath).toArray(String[]::new));
+        deleteResult = s3Client.deleteObjects(deleteObjectsRequest);
+        log.info("deleted objects %s", deleteResult);

Review Comment:
   ```suggestion
   ```



##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java:
##########
@@ -64,6 +70,52 @@ public S3DataSegmentKiller(
     this.inputDataConfig = inputDataConfig;
   }
 
+  @Override
+  public void kill(List<DataSegment> segments) throws SegmentLoadingException
+  {
+    int size = segments.size();
+    if (size == 0) {
+      return;
+    }
+    if (segments.size() == 1) {
+      kill(segments.get(0));
+      return;
+    }
+
+    // we can assume that all segments are in the same bucket.
+    String s3Bucket = MapUtils.getString(segments.get(0).getLoadSpec(), 
"bucket");
+    final ServerSideEncryptingAmazonS3 s3Client = this.s3ClientSupplier.get();
+
+    // 1000 objects is the max amount of objects that can be deleted in s3 at 
a time.
+    List<List<DataSegment>> segmentsChunks = Lists.partition(segments, 1000);
+    for (List<DataSegment> segmentsChunk : segmentsChunks) {
+      try {
+        DeleteObjectsRequest deleteObjectsRequest = new 
DeleteObjectsRequest(s3Bucket);
+        String[] keys = segmentsChunk.stream()
+                     .map(segment -> MapUtils.getString(segment.getLoadSpec(), 
"key"))
+                     .toArray(String[]::new);
+        deleteObjectsRequest = deleteObjectsRequest.withKeys(keys);
+        log.info("Removing indexes from s3, bucket: %s, keys %s", 
deleteObjectsRequest.getBucketName(), 
deleteObjectsRequest.getKeys().toString());
+        DeleteObjectsResult deleteResult = 
s3Client.deleteObjects(deleteObjectsRequest);
+        log.info("deleted objects %s", deleteResult);

Review Comment:
   ```suggestion
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java:
##########
@@ -129,8 +129,12 @@ public TaskStatus runTask(TaskToolbox toolbox) throws 
Exception
 
     // Kill segments
     toolbox.getTaskActionClient().submit(new SegmentNukeAction(new 
HashSet<>(unusedSegments)));
-    for (DataSegment segment : unusedSegments) {
-      toolbox.getDataSegmentKiller().kill(segment);
+    if (getContextValue("batchDelete", false)) {

Review Comment:
   If we keep this feature flag, it should be enabled by default as this is the 
better method. Since there is no change in behavior with the flag enabled vs 
disabled, I don't see a reason to default disable this behavior when batch 
deleting segments is the more optimal way to delete segments. We should also 
document this in the task context documentation and mention that this feature 
flag will be going away in the very near future.
   
   Is there a way to configure the auto-kill to specify this context so that 
Druid operators who have auto-kill enabled can enable / disable this behavior 
for kill tasks generated from the auto-kill functionality in Druid?



##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java:
##########
@@ -64,6 +70,52 @@ public S3DataSegmentKiller(
     this.inputDataConfig = inputDataConfig;
   }
 
+  @Override
+  public void kill(List<DataSegment> segments) throws SegmentLoadingException
+  {
+    int size = segments.size();
+    if (size == 0) {
+      return;
+    }
+    if (segments.size() == 1) {
+      kill(segments.get(0));
+      return;
+    }
+
+    // we can assume that all segments are in the same bucket.
+    String s3Bucket = MapUtils.getString(segments.get(0).getLoadSpec(), 
"bucket");
+    final ServerSideEncryptingAmazonS3 s3Client = this.s3ClientSupplier.get();
+
+    // 1000 objects is the max amount of objects that can be deleted in s3 at 
a time.
+    List<List<DataSegment>> segmentsChunks = Lists.partition(segments, 1000);
+    for (List<DataSegment> segmentsChunk : segmentsChunks) {
+      try {
+        DeleteObjectsRequest deleteObjectsRequest = new 
DeleteObjectsRequest(s3Bucket);
+        String[] keys = segmentsChunk.stream()
+                     .map(segment -> MapUtils.getString(segment.getLoadSpec(), 
"key"))
+                     .toArray(String[]::new);
+        deleteObjectsRequest = deleteObjectsRequest.withKeys(keys);
+        log.info("Removing indexes from s3, bucket: %s, keys %s", 
deleteObjectsRequest.getBucketName(), 
deleteObjectsRequest.getKeys().toString());
+        DeleteObjectsResult deleteResult = 
s3Client.deleteObjects(deleteObjectsRequest);
+        log.info("deleted objects %s", deleteResult);
+
+        // delete descriptors which are a files to store segment metadata in 
deep storage.
+        // This file is deprecated and not stored anymore, but we still delete 
them if they exist.
+        deleteObjectsRequest = new DeleteObjectsRequest(s3Bucket);
+        deleteObjectsRequest = deleteObjectsRequest.withKeys(
+            
Arrays.stream(keys).map(DataSegmentKiller::descriptorPath).toArray(String[]::new));
+        deleteResult = s3Client.deleteObjects(deleteObjectsRequest);
+        log.info("deleted objects %s", deleteResult);
+      }
+      catch (MultiObjectDeleteException e) {
+        throw new SegmentLoadingException(e, "Couldn't kill all segment but 
deleted[%s]: [%s]", e.getDeletedObjects(), e);

Review Comment:
   I don't think we should throw an exception if deleting some of the segments 
fail. We should run through all the objects to delete and then at the end, 
throw an exception with the list of objects that failed to delete.
   
   ```suggestion
           throw new SegmentLoadingException(e, "Failed to delete keys [%s]", 
e.getDeletedObjects().stream().map(DeleteObjectsResult.DeletedObject::getKey).collect(Collectors.toList())));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to