This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 08c01f1dae9 Handle and map errors in delete pending segments API 
(#15673)
08c01f1dae9 is described below

commit 08c01f1dae9732859ee69f12b4612c6df05c4dde
Author: Abhishek Radhakrishnan <[email protected]>
AuthorDate: Sun Jan 14 20:39:01 2024 -0800

    Handle and map errors in delete pending segments API (#15673)
    
    Changes:
    - Handle exception in deletePendingSegments API and map to correct HTTP 
status code
    - Clean up exception message using `DruidException`
    - Add unit tests
---
 .../overlord/IndexerMetadataStorageAdapter.java    |  39 +++---
 .../apache/druid/indexing/overlord/TaskMaster.java |   2 +-
 .../indexing/overlord/http/OverlordResource.java   |  21 +++-
 .../IndexerMetadataStorageAdapterTest.java         |  75 ++++++++++--
 .../overlord/http/OverlordResourceTest.java        | 136 ++++++++++++++++++++-
 5 files changed, 238 insertions(+), 35 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageAdapter.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageAdapter.java
index 4671c5bb7a9..6d23fcec327 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageAdapter.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageAdapter.java
@@ -19,11 +19,12 @@
 
 package org.apache.druid.indexing.overlord;
 
-import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
+import org.apache.druid.error.InvalidInput;
 import org.apache.druid.indexer.TaskInfo;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.java.util.common.DateTimes;
-import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
 import java.util.Comparator;
@@ -46,23 +47,31 @@ public class IndexerMetadataStorageAdapter
 
   public int deletePendingSegments(String dataSource, Interval deleteInterval)
   {
-    // Check the given interval overlaps the 
interval(minCreatedDateOfActiveTasks, MAX)
-    final Optional<DateTime> minCreatedDateOfActiveTasks = 
taskStorageQueryAdapter
+    // Find the earliest active task created for the specified datasource; if 
one exists,
+    // check if its interval overlaps with the delete interval.
+    final Optional<TaskInfo<Task, TaskStatus>> earliestActiveTaskOptional = 
taskStorageQueryAdapter
         .getActiveTaskInfo(dataSource)
         .stream()
-        .map(TaskInfo::getCreatedTime)
-        .min(Comparator.naturalOrder());
+        .min(Comparator.comparing(TaskInfo::getCreatedTime));
 
-    final Interval activeTaskInterval = new Interval(
-        minCreatedDateOfActiveTasks.orElse(DateTimes.MAX),
-        DateTimes.MAX
-    );
+    if (earliestActiveTaskOptional.isPresent()) {
+      final TaskInfo<Task, TaskStatus> earliestActiveTask = 
earliestActiveTaskOptional.get();
+      final Interval activeTaskInterval = new Interval(
+          earliestActiveTask.getCreatedTime(),
+          DateTimes.MAX
+      );
 
-    Preconditions.checkArgument(
-        !deleteInterval.overlaps(activeTaskInterval),
-        "Cannot delete pendingSegments because there is at least one active 
task created at %s",
-        activeTaskInterval.getStart()
-    );
+      if (deleteInterval.overlaps(activeTaskInterval)) {
+        throw InvalidInput.exception(
+            "Cannot delete pendingSegments for datasource[%s] as there is at 
least one active task[%s] created at[%s] "
+            + "that overlaps with the delete interval[%s]. Please retry when 
there are no active tasks.",
+            dataSource,
+            earliestActiveTask.getId(),
+            activeTaskInterval.getStart(),
+            deleteInterval
+        );
+      }
+    }
 
     return 
indexerMetadataStorageCoordinator.deletePendingSegmentsCreatedInInterval(dataSource,
 deleteInterval);
   }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
index e4d26b37573..4798513aafd 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
@@ -238,7 +238,7 @@ public class TaskMaster implements TaskCountStatsProvider, 
TaskSlotCountStatsPro
   }
 
   /**
-   * Returns true if it's the leader and its all services have been properly 
initialized.
+   * Returns true if it's the leader and all its services have been 
initialized.
    */
   public boolean isLeader()
   {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
index 7afaaa90526..b589dc5ffd8 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
@@ -939,10 +939,25 @@ public class OverlordResource
     }
 
     if (taskMaster.isLeader()) {
-      final int numDeleted = 
indexerMetadataStorageAdapter.deletePendingSegments(dataSource, deleteInterval);
-      return Response.ok().entity(ImmutableMap.of("numDeleted", 
numDeleted)).build();
+      try {
+        final int numDeleted = 
indexerMetadataStorageAdapter.deletePendingSegments(dataSource, deleteInterval);
+        return Response.ok().entity(ImmutableMap.of("numDeleted", 
numDeleted)).build();
+      }
+      catch (DruidException e) {
+        return Response.status(e.getStatusCode())
+                       .entity(ImmutableMap.<String, Object>of("error", 
e.getMessage()))
+                       .build();
+      }
+      catch (Exception e) {
+        log.warn(e, "Failed to delete pending segments for datasource[%s] and 
interval[%s].", dataSource, deleteInterval);
+        return Response.status(Status.INTERNAL_SERVER_ERROR)
+                       .entity(ImmutableMap.<String, Object>of("error", 
e.getMessage()))
+                       .build();
+      }
     } else {
-      return Response.status(Status.SERVICE_UNAVAILABLE).build();
+      return Response.status(Status.SERVICE_UNAVAILABLE)
+                     .entity(ImmutableMap.of("error", "overlord is not the 
leader or not initialized yet"))
+                     .build();
     }
   }
 
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageAdapterTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageAdapterTest.java
index 19605634117..633d861410d 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageAdapterTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageAdapterTest.java
@@ -20,6 +20,8 @@
 package org.apache.druid.indexing.overlord;
 
 import com.google.common.collect.ImmutableList;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.DruidExceptionMatcher;
 import org.apache.druid.indexer.TaskInfo;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.task.NoopTask;
@@ -27,21 +29,16 @@ import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
 import org.easymock.EasyMock;
-import org.hamcrest.CoreMatchers;
+import org.hamcrest.MatcherAssert;
 import org.joda.time.Interval;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 
 import java.util.List;
 
 public class IndexerMetadataStorageAdapterTest
 {
-  @Rule
-  public ExpectedException expectedException = ExpectedException.none();
-
   private TaskStorageQueryAdapter taskStorageQueryAdapter;
   private IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
   private IndexerMetadataStorageAdapter indexerMetadataStorageAdapter;
@@ -69,7 +66,7 @@ public class IndexerMetadataStorageAdapterTest
             NoopTask.create()
         ),
         new TaskInfo<>(
-            "id1",
+            "id2",
             DateTimes.of("2017-12-02"),
             TaskStatus.running("id2"),
             "dataSource",
@@ -93,7 +90,7 @@ public class IndexerMetadataStorageAdapterTest
   }
 
   @Test
-  public void testDeletePendingSegmentsOfRunningTasks()
+  public void testDeletePendingSegmentsOfOneOverlappingRunningTask()
   {
     final ImmutableList<TaskInfo<Task, TaskStatus>> taskInfos = 
ImmutableList.of(
         new TaskInfo<>(
@@ -104,7 +101,7 @@ public class IndexerMetadataStorageAdapterTest
             NoopTask.create()
         ),
         new TaskInfo<>(
-            "id1",
+            "id2",
             DateTimes.of("2017-12-02"),
             TaskStatus.running("id2"),
             "dataSource",
@@ -125,8 +122,62 @@ public class IndexerMetadataStorageAdapterTest
         .andReturn(10);
     EasyMock.replay(taskStorageQueryAdapter, 
indexerMetadataStorageCoordinator);
 
-    
expectedException.expect(CoreMatchers.instanceOf(IllegalArgumentException.class));
-    expectedException.expectMessage("Cannot delete pendingSegments because 
there is at least one active task created");
-    indexerMetadataStorageAdapter.deletePendingSegments("dataSource", 
deleteInterval);
+    MatcherAssert.assertThat(
+        Assert.assertThrows(
+            DruidException.class,
+            () -> 
indexerMetadataStorageAdapter.deletePendingSegments("dataSource", 
deleteInterval)
+        ),
+        DruidExceptionMatcher.invalidInput().expectMessageIs(
+            "Cannot delete pendingSegments for datasource[dataSource] as there 
is at least one active task[id1]"
+            + " created at[2017-11-01T00:00:00.000Z] that overlaps with the 
delete "
+            + "interval[2017-01-01T00:00:00.000Z/2017-12-01T00:00:00.000Z]. 
Please retry when there are no active tasks."
+        )
+    );
+  }
+
+  @Test
+  public void testDeletePendingSegmentsOfMultipleOverlappingRunningTasks()
+  {
+    final ImmutableList<TaskInfo<Task, TaskStatus>> taskInfos = 
ImmutableList.of(
+        new TaskInfo<>(
+            "id1",
+            DateTimes.of("2017-12-01"),
+            TaskStatus.running("id1"),
+            "dataSource",
+            NoopTask.create()
+        ),
+        new TaskInfo<>(
+            "id2",
+            DateTimes.of("2017-11-01"),
+            TaskStatus.running("id2"),
+            "dataSource",
+            NoopTask.create()
+        )
+    );
+
+    
EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo("dataSource")).andReturn(taskInfos);
+
+    final Interval deleteInterval = Intervals.of("2017-01-01/2018-12-01");
+    EasyMock
+        .expect(
+            
indexerMetadataStorageCoordinator.deletePendingSegmentsCreatedInInterval(
+                EasyMock.anyString(),
+                EasyMock.eq(deleteInterval)
+            )
+        )
+        .andReturn(10);
+    EasyMock.replay(taskStorageQueryAdapter, 
indexerMetadataStorageCoordinator);
+
+    MatcherAssert.assertThat(
+        Assert.assertThrows(
+            DruidException.class,
+            () -> 
indexerMetadataStorageAdapter.deletePendingSegments("dataSource", 
deleteInterval)
+        ),
+        DruidExceptionMatcher.invalidInput().expectMessageIs(
+            "Cannot delete pendingSegments for datasource[dataSource] as there 
is at least one active task[id2]"
+            + " created at[2017-11-01T00:00:00.000Z] that overlaps with the 
delete"
+            + " interval[2017-01-01T00:00:00.000Z/2018-12-01T00:00:00.000Z]. 
Please retry when there are no active tasks."
+        )
+    );
   }
 }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
index 561e8dd9b19..cdaf5b9b359 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
@@ -28,6 +28,8 @@ import com.google.common.collect.ImmutableSet;
 import org.apache.druid.audit.AuditEntry;
 import org.apache.druid.audit.AuditManager;
 import org.apache.druid.common.config.JacksonConfigManager;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InvalidInput;
 import org.apache.druid.indexer.RunnerTaskState;
 import org.apache.druid.indexer.TaskInfo;
 import org.apache.druid.indexer.TaskLocation;
@@ -983,10 +985,136 @@ public class OverlordResourceTest
         authConfig
     );
 
-    final Map<String, Integer> response = (Map<String, Integer>) 
overlordResource
-        .killPendingSegments("allow", new Interval(DateTimes.MIN, 
DateTimes.nowUtc()).toString(), req)
-        .getEntity();
-    Assert.assertEquals(2, response.get("numDeleted").intValue());
+    Response response = overlordResource
+        .killPendingSegments("allow", new Interval(DateTimes.MIN, 
DateTimes.nowUtc()).toString(), req);
+    Assert.assertEquals(200, response.getStatus());
+    Assert.assertEquals(ImmutableMap.of("numDeleted", 2), 
response.getEntity());
+  }
+
+  @Test
+  public void testKillPendingSegmentsThrowsInvalidInputDruidException()
+  {
+    expectAuthorizationTokenCheck();
+
+    EasyMock.expect(taskMaster.isLeader()).andReturn(true);
+    final String exceptionMsg = "Some exception msg";
+    EasyMock
+        .expect(
+            indexerMetadataStorageAdapter.deletePendingSegments(
+                EasyMock.eq("allow"),
+                EasyMock.anyObject(Interval.class)
+            )
+        )
+        .andThrow(InvalidInput.exception(exceptionMsg))
+        .once();
+
+    EasyMock.replay(
+        taskRunner,
+        taskMaster,
+        taskStorageQueryAdapter,
+        indexerMetadataStorageAdapter,
+        req,
+        workerTaskRunnerQueryAdapter,
+        authConfig
+    );
+
+    Response response = overlordResource
+        .killPendingSegments("allow", new Interval(DateTimes.MIN, 
DateTimes.nowUtc()).toString(), req);
+
+    Assert.assertEquals(400, response.getStatus());
+    Assert.assertEquals(ImmutableMap.of("error", exceptionMsg), 
response.getEntity());
+  }
+
+  @Test
+  public void testKillPendingSegmentsThrowsDefensiveDruidException()
+  {
+    expectAuthorizationTokenCheck();
+
+    EasyMock.expect(taskMaster.isLeader()).andReturn(true);
+    final String exceptionMsg = "An internal defensive exception";
+    EasyMock
+        .expect(
+            indexerMetadataStorageAdapter.deletePendingSegments(
+                EasyMock.eq("allow"),
+                EasyMock.anyObject(Interval.class)
+            )
+        )
+        .andThrow(DruidException.defensive(exceptionMsg))
+        .once();
+
+    EasyMock.replay(
+        taskRunner,
+        taskMaster,
+        taskStorageQueryAdapter,
+        indexerMetadataStorageAdapter,
+        req,
+        workerTaskRunnerQueryAdapter,
+        authConfig
+    );
+
+    Response response = overlordResource
+        .killPendingSegments("allow", new Interval(DateTimes.MIN, 
DateTimes.nowUtc()).toString(), req);
+
+    Assert.assertEquals(500, response.getStatus());
+    Assert.assertEquals(ImmutableMap.of("error", exceptionMsg), 
response.getEntity());
+  }
+
+  @Test
+  public void testKillPendingSegmentsThrowsArbitraryException()
+  {
+    expectAuthorizationTokenCheck();
+
+    EasyMock.expect(taskMaster.isLeader()).andReturn(true);
+    final String exceptionMsg = "An unexpected illegal state exception";
+    EasyMock
+        .expect(
+            indexerMetadataStorageAdapter.deletePendingSegments(
+                EasyMock.eq("allow"),
+                EasyMock.anyObject(Interval.class)
+            )
+        )
+        .andThrow(new IllegalStateException(exceptionMsg))
+        .once();
+
+    EasyMock.replay(
+        taskRunner,
+        taskMaster,
+        taskStorageQueryAdapter,
+        indexerMetadataStorageAdapter,
+        req,
+        workerTaskRunnerQueryAdapter,
+        authConfig
+    );
+
+    Response response = overlordResource
+        .killPendingSegments("allow", new Interval(DateTimes.MIN, 
DateTimes.nowUtc()).toString(), req);
+
+    Assert.assertEquals(500, response.getStatus());
+    Assert.assertEquals(ImmutableMap.of("error", exceptionMsg), 
response.getEntity());
+  }
+
+  @Test
+  public void testKillPendingSegmentsToNonLeader()
+  {
+    expectAuthorizationTokenCheck();
+
+    EasyMock.expect(taskMaster.isLeader()).andReturn(false);
+
+    EasyMock.replay(
+        taskRunner,
+        taskMaster,
+        taskStorageQueryAdapter,
+        indexerMetadataStorageAdapter,
+        req,
+        workerTaskRunnerQueryAdapter,
+        authConfig
+    );
+
+    Response response = overlordResource
+        .killPendingSegments("allow", new Interval(DateTimes.MIN, 
DateTimes.nowUtc()).toString(), req);
+
+    Assert.assertEquals(503, response.getStatus());
+    Assert.assertEquals(ImmutableMap.of("error", "overlord is not the leader 
or not initialized yet"), response.getEntity());
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to