This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 08c01f1dae9 Handle and map errors in delete pending segments API (#15673)
08c01f1dae9 is described below
commit 08c01f1dae9732859ee69f12b4612c6df05c4dde
Author: Abhishek Radhakrishnan <[email protected]>
AuthorDate: Sun Jan 14 20:39:01 2024 -0800
Handle and map errors in delete pending segments API (#15673)
Changes:
- Handle exceptions in the deletePendingSegments API and map them to the correct HTTP status codes
- Clean up the exception message using `DruidException`
- Add unit tests
---
.../overlord/IndexerMetadataStorageAdapter.java | 39 +++---
.../apache/druid/indexing/overlord/TaskMaster.java | 2 +-
.../indexing/overlord/http/OverlordResource.java | 21 +++-
.../IndexerMetadataStorageAdapterTest.java | 75 ++++++++++--
.../overlord/http/OverlordResourceTest.java | 136 ++++++++++++++++++++-
5 files changed, 238 insertions(+), 35 deletions(-)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageAdapter.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageAdapter.java
index 4671c5bb7a9..6d23fcec327 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageAdapter.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageAdapter.java
@@ -19,11 +19,12 @@
package org.apache.druid.indexing.overlord;
-import com.google.common.base.Preconditions;
import com.google.inject.Inject;
+import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexer.TaskInfo;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.java.util.common.DateTimes;
-import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.util.Comparator;
@@ -46,23 +47,31 @@ public class IndexerMetadataStorageAdapter
public int deletePendingSegments(String dataSource, Interval deleteInterval)
{
- // Check the given interval overlaps the
interval(minCreatedDateOfActiveTasks, MAX)
- final Optional<DateTime> minCreatedDateOfActiveTasks =
taskStorageQueryAdapter
+ // Find the earliest active task created for the specified datasource; if
one exists,
+ // check if its interval overlaps with the delete interval.
+ final Optional<TaskInfo<Task, TaskStatus>> earliestActiveTaskOptional =
taskStorageQueryAdapter
.getActiveTaskInfo(dataSource)
.stream()
- .map(TaskInfo::getCreatedTime)
- .min(Comparator.naturalOrder());
+ .min(Comparator.comparing(TaskInfo::getCreatedTime));
- final Interval activeTaskInterval = new Interval(
- minCreatedDateOfActiveTasks.orElse(DateTimes.MAX),
- DateTimes.MAX
- );
+ if (earliestActiveTaskOptional.isPresent()) {
+ final TaskInfo<Task, TaskStatus> earliestActiveTask =
earliestActiveTaskOptional.get();
+ final Interval activeTaskInterval = new Interval(
+ earliestActiveTask.getCreatedTime(),
+ DateTimes.MAX
+ );
- Preconditions.checkArgument(
- !deleteInterval.overlaps(activeTaskInterval),
- "Cannot delete pendingSegments because there is at least one active
task created at %s",
- activeTaskInterval.getStart()
- );
+ if (deleteInterval.overlaps(activeTaskInterval)) {
+ throw InvalidInput.exception(
+ "Cannot delete pendingSegments for datasource[%s] as there is at
least one active task[%s] created at[%s] "
+ + "that overlaps with the delete interval[%s]. Please retry when
there are no active tasks.",
+ dataSource,
+ earliestActiveTask.getId(),
+ activeTaskInterval.getStart(),
+ deleteInterval
+ );
+ }
+ }
return
indexerMetadataStorageCoordinator.deletePendingSegmentsCreatedInInterval(dataSource,
deleteInterval);
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
index e4d26b37573..4798513aafd 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
@@ -238,7 +238,7 @@ public class TaskMaster implements TaskCountStatsProvider,
TaskSlotCountStatsPro
}
/**
- * Returns true if it's the leader and its all services have been properly
initialized.
+ * Returns true if it's the leader and all its services have been
initialized.
*/
public boolean isLeader()
{
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
index 7afaaa90526..b589dc5ffd8 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
@@ -939,10 +939,25 @@ public class OverlordResource
}
if (taskMaster.isLeader()) {
- final int numDeleted =
indexerMetadataStorageAdapter.deletePendingSegments(dataSource, deleteInterval);
- return Response.ok().entity(ImmutableMap.of("numDeleted",
numDeleted)).build();
+ try {
+ final int numDeleted =
indexerMetadataStorageAdapter.deletePendingSegments(dataSource, deleteInterval);
+ return Response.ok().entity(ImmutableMap.of("numDeleted",
numDeleted)).build();
+ }
+ catch (DruidException e) {
+ return Response.status(e.getStatusCode())
+ .entity(ImmutableMap.<String, Object>of("error",
e.getMessage()))
+ .build();
+ }
+ catch (Exception e) {
+ log.warn(e, "Failed to delete pending segments for datasource[%s] and
interval[%s].", dataSource, deleteInterval);
+ return Response.status(Status.INTERNAL_SERVER_ERROR)
+ .entity(ImmutableMap.<String, Object>of("error",
e.getMessage()))
+ .build();
+ }
} else {
- return Response.status(Status.SERVICE_UNAVAILABLE).build();
+ return Response.status(Status.SERVICE_UNAVAILABLE)
+ .entity(ImmutableMap.of("error", "overlord is not the
leader or not initialized yet"))
+ .build();
}
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageAdapterTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageAdapterTest.java
index 19605634117..633d861410d 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageAdapterTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageAdapterTest.java
@@ -20,6 +20,8 @@
package org.apache.druid.indexing.overlord;
import com.google.common.collect.ImmutableList;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.NoopTask;
@@ -27,21 +29,16 @@ import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.easymock.EasyMock;
-import org.hamcrest.CoreMatchers;
+import org.hamcrest.MatcherAssert;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.ExpectedException;
import java.util.List;
public class IndexerMetadataStorageAdapterTest
{
- @Rule
- public ExpectedException expectedException = ExpectedException.none();
-
private TaskStorageQueryAdapter taskStorageQueryAdapter;
private IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
private IndexerMetadataStorageAdapter indexerMetadataStorageAdapter;
@@ -69,7 +66,7 @@ public class IndexerMetadataStorageAdapterTest
NoopTask.create()
),
new TaskInfo<>(
- "id1",
+ "id2",
DateTimes.of("2017-12-02"),
TaskStatus.running("id2"),
"dataSource",
@@ -93,7 +90,7 @@ public class IndexerMetadataStorageAdapterTest
}
@Test
- public void testDeletePendingSegmentsOfRunningTasks()
+ public void testDeletePendingSegmentsOfOneOverlappingRunningTask()
{
final ImmutableList<TaskInfo<Task, TaskStatus>> taskInfos =
ImmutableList.of(
new TaskInfo<>(
@@ -104,7 +101,7 @@ public class IndexerMetadataStorageAdapterTest
NoopTask.create()
),
new TaskInfo<>(
- "id1",
+ "id2",
DateTimes.of("2017-12-02"),
TaskStatus.running("id2"),
"dataSource",
@@ -125,8 +122,62 @@ public class IndexerMetadataStorageAdapterTest
.andReturn(10);
EasyMock.replay(taskStorageQueryAdapter,
indexerMetadataStorageCoordinator);
-
expectedException.expect(CoreMatchers.instanceOf(IllegalArgumentException.class));
- expectedException.expectMessage("Cannot delete pendingSegments because
there is at least one active task created");
- indexerMetadataStorageAdapter.deletePendingSegments("dataSource",
deleteInterval);
+ MatcherAssert.assertThat(
+ Assert.assertThrows(
+ DruidException.class,
+ () ->
indexerMetadataStorageAdapter.deletePendingSegments("dataSource",
deleteInterval)
+ ),
+ DruidExceptionMatcher.invalidInput().expectMessageIs(
+ "Cannot delete pendingSegments for datasource[dataSource] as there
is at least one active task[id1]"
+ + " created at[2017-11-01T00:00:00.000Z] that overlaps with the
delete "
+ + "interval[2017-01-01T00:00:00.000Z/2017-12-01T00:00:00.000Z].
Please retry when there are no active tasks."
+ )
+ );
+ }
+
+ @Test
+ public void testDeletePendingSegmentsOfMultipleOverlappingRunningTasks()
+ {
+ final ImmutableList<TaskInfo<Task, TaskStatus>> taskInfos =
ImmutableList.of(
+ new TaskInfo<>(
+ "id1",
+ DateTimes.of("2017-12-01"),
+ TaskStatus.running("id1"),
+ "dataSource",
+ NoopTask.create()
+ ),
+ new TaskInfo<>(
+ "id2",
+ DateTimes.of("2017-11-01"),
+ TaskStatus.running("id2"),
+ "dataSource",
+ NoopTask.create()
+ )
+ );
+
+
EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo("dataSource")).andReturn(taskInfos);
+
+ final Interval deleteInterval = Intervals.of("2017-01-01/2018-12-01");
+ EasyMock
+ .expect(
+
indexerMetadataStorageCoordinator.deletePendingSegmentsCreatedInInterval(
+ EasyMock.anyString(),
+ EasyMock.eq(deleteInterval)
+ )
+ )
+ .andReturn(10);
+ EasyMock.replay(taskStorageQueryAdapter,
indexerMetadataStorageCoordinator);
+
+ MatcherAssert.assertThat(
+ Assert.assertThrows(
+ DruidException.class,
+ () ->
indexerMetadataStorageAdapter.deletePendingSegments("dataSource",
deleteInterval)
+ ),
+ DruidExceptionMatcher.invalidInput().expectMessageIs(
+ "Cannot delete pendingSegments for datasource[dataSource] as there
is at least one active task[id2]"
+ + " created at[2017-11-01T00:00:00.000Z] that overlaps with the
delete"
+ + " interval[2017-01-01T00:00:00.000Z/2018-12-01T00:00:00.000Z].
Please retry when there are no active tasks."
+ )
+ );
}
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
index 561e8dd9b19..cdaf5b9b359 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
@@ -28,6 +28,8 @@ import com.google.common.collect.ImmutableSet;
import org.apache.druid.audit.AuditEntry;
import org.apache.druid.audit.AuditManager;
import org.apache.druid.common.config.JacksonConfigManager;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.indexer.TaskLocation;
@@ -983,10 +985,136 @@ public class OverlordResourceTest
authConfig
);
- final Map<String, Integer> response = (Map<String, Integer>)
overlordResource
- .killPendingSegments("allow", new Interval(DateTimes.MIN,
DateTimes.nowUtc()).toString(), req)
- .getEntity();
- Assert.assertEquals(2, response.get("numDeleted").intValue());
+ Response response = overlordResource
+ .killPendingSegments("allow", new Interval(DateTimes.MIN,
DateTimes.nowUtc()).toString(), req);
+ Assert.assertEquals(200, response.getStatus());
+ Assert.assertEquals(ImmutableMap.of("numDeleted", 2),
response.getEntity());
+ }
+
+ @Test
+ public void testKillPendingSegmentsThrowsInvalidInputDruidException()
+ {
+ expectAuthorizationTokenCheck();
+
+ EasyMock.expect(taskMaster.isLeader()).andReturn(true);
+ final String exceptionMsg = "Some exception msg";
+ EasyMock
+ .expect(
+ indexerMetadataStorageAdapter.deletePendingSegments(
+ EasyMock.eq("allow"),
+ EasyMock.anyObject(Interval.class)
+ )
+ )
+ .andThrow(InvalidInput.exception(exceptionMsg))
+ .once();
+
+ EasyMock.replay(
+ taskRunner,
+ taskMaster,
+ taskStorageQueryAdapter,
+ indexerMetadataStorageAdapter,
+ req,
+ workerTaskRunnerQueryAdapter,
+ authConfig
+ );
+
+ Response response = overlordResource
+ .killPendingSegments("allow", new Interval(DateTimes.MIN,
DateTimes.nowUtc()).toString(), req);
+
+ Assert.assertEquals(400, response.getStatus());
+ Assert.assertEquals(ImmutableMap.of("error", exceptionMsg),
response.getEntity());
+ }
+
+ @Test
+ public void testKillPendingSegmentsThrowsDefensiveDruidException()
+ {
+ expectAuthorizationTokenCheck();
+
+ EasyMock.expect(taskMaster.isLeader()).andReturn(true);
+ final String exceptionMsg = "An internal defensive exception";
+ EasyMock
+ .expect(
+ indexerMetadataStorageAdapter.deletePendingSegments(
+ EasyMock.eq("allow"),
+ EasyMock.anyObject(Interval.class)
+ )
+ )
+ .andThrow(DruidException.defensive(exceptionMsg))
+ .once();
+
+ EasyMock.replay(
+ taskRunner,
+ taskMaster,
+ taskStorageQueryAdapter,
+ indexerMetadataStorageAdapter,
+ req,
+ workerTaskRunnerQueryAdapter,
+ authConfig
+ );
+
+ Response response = overlordResource
+ .killPendingSegments("allow", new Interval(DateTimes.MIN,
DateTimes.nowUtc()).toString(), req);
+
+ Assert.assertEquals(500, response.getStatus());
+ Assert.assertEquals(ImmutableMap.of("error", exceptionMsg),
response.getEntity());
+ }
+
+ @Test
+ public void testKillPendingSegmentsThrowsArbitraryException()
+ {
+ expectAuthorizationTokenCheck();
+
+ EasyMock.expect(taskMaster.isLeader()).andReturn(true);
+ final String exceptionMsg = "An unexpected illegal state exception";
+ EasyMock
+ .expect(
+ indexerMetadataStorageAdapter.deletePendingSegments(
+ EasyMock.eq("allow"),
+ EasyMock.anyObject(Interval.class)
+ )
+ )
+ .andThrow(new IllegalStateException(exceptionMsg))
+ .once();
+
+ EasyMock.replay(
+ taskRunner,
+ taskMaster,
+ taskStorageQueryAdapter,
+ indexerMetadataStorageAdapter,
+ req,
+ workerTaskRunnerQueryAdapter,
+ authConfig
+ );
+
+ Response response = overlordResource
+ .killPendingSegments("allow", new Interval(DateTimes.MIN,
DateTimes.nowUtc()).toString(), req);
+
+ Assert.assertEquals(500, response.getStatus());
+ Assert.assertEquals(ImmutableMap.of("error", exceptionMsg),
response.getEntity());
+ }
+
+ @Test
+ public void testKillPendingSegmentsToNonLeader()
+ {
+ expectAuthorizationTokenCheck();
+
+ EasyMock.expect(taskMaster.isLeader()).andReturn(false);
+
+ EasyMock.replay(
+ taskRunner,
+ taskMaster,
+ taskStorageQueryAdapter,
+ indexerMetadataStorageAdapter,
+ req,
+ workerTaskRunnerQueryAdapter,
+ authConfig
+ );
+
+ Response response = overlordResource
+ .killPendingSegments("allow", new Interval(DateTimes.MIN,
DateTimes.nowUtc()).toString(), req);
+
+ Assert.assertEquals(503, response.getStatus());
+ Assert.assertEquals(ImmutableMap.of("error", "overlord is not the leader
or not initialized yet"), response.getEntity());
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]