This is an automated email from the ASF dual-hosted git repository.
amatya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 0e3df2d2e9e Clean up stale locks if segment allocation fails (#14966)
0e3df2d2e9e is described below
commit 0e3df2d2e9eb3388d43ac722a76de53a6a263ee8
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Thu Sep 14 14:58:02 2023 +0530
Clean up stale locks if segment allocation fails (#14966)
* Clean up stale locks if segment allocation fails due to an exception
---
.../druid/indexing/overlord/TaskLockbox.java | 33 ++++-
.../druid/indexing/overlord/TaskLockboxTest.java | 137 +++++++++++++++++++++
2 files changed, 167 insertions(+), 3 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
index ae8c3313ec7..a8981688888 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
@@ -494,9 +494,12 @@ public class TaskLockbox
allocateSegmentIds(dataSource, interval, skipSegmentLineageCheck,
holderList.getPending());
holderList.getPending().forEach(holder -> acquireTaskLock(holder,
false));
}
-
holderList.getPending().forEach(holder -> addTaskAndPersistLocks(holder,
isTimeChunkLock));
}
+ catch (Exception e) {
+ holderList.clearStaleLocks(this);
+ throw e;
+ }
finally {
giant.unlock();
}
@@ -711,7 +714,8 @@ public class TaskLockbox
* for the given requests. Updates the holder with the allocated segment if
* the allocation succeeds, otherwise marks it as failed.
*/
- private void allocateSegmentIds(
+ @VisibleForTesting
+ void allocateSegmentIds(
String dataSource,
Interval interval,
boolean skipSegmentLineageCheck,
@@ -1598,6 +1602,28 @@ public class TaskLockbox
return pending;
}
+ /**
+ * When task locks are acquired in an attempt to allocate segments,
+ * a new lock posse might be created.
+ * However, the posse is associated with the task only after all the segment allocations have succeeded.
+ * If there is an exception, unlock all such unassociated locks.
+ */
+ void clearStaleLocks(TaskLockbox taskLockbox)
+ {
+ all
+ .stream()
+ .filter(holder -> holder.acquiredLock != null
+ && holder.taskLockPosse != null
+ && !holder.taskLockPosse.containsTask(holder.task))
+ .forEach(holder -> {
+ holder.taskLockPosse.addTask(holder.task);
+ taskLockbox.unlock(
+ holder.task,
+ holder.acquiredLock.getInterval(),
+ holder.acquiredLock instanceof SegmentLock ? ((SegmentLock)
holder.acquiredLock).getPartitionId() : null
+ );
+ log.info("Cleared stale lock[%s] for task[%s]",
holder.acquiredLock, holder.task.getId());
+ });
+ }
List<SegmentAllocateResult> getResults()
{
@@ -1608,7 +1634,8 @@ public class TaskLockbox
/**
* Contains the task, request, lock and final result for a segment allocation.
*/
- private static class SegmentAllocationHolder
+ @VisibleForTesting
+ static class SegmentAllocationHolder
{
final AllocationHolderList list;
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
index 6b9b4cd213d..ceb1657f68e 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.apache.druid.indexer.TaskStatus;
@@ -34,6 +35,8 @@ import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TimeChunkLock;
+import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
+import org.apache.druid.indexing.common.actions.SegmentAllocateRequest;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
@@ -46,6 +49,7 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.metadata.DerbyMetadataStorageActionHandlerFactory;
@@ -1727,6 +1731,117 @@ public class TaskLockboxTest
validator.expectActiveLocks(conflictingLock, floorLock);
}
+ @Test
+ public void testDoNotCleanUsedLockAfterSegmentAllocationFailure()
+ {
+ final Task task = NoopTask.create();
+ final Interval theInterval = Intervals.of("2023/2024");
+ taskStorage.insert(task, TaskStatus.running(task.getId()));
+
+ final TaskLockbox testLockbox = new
SegmentAllocationFailingTaskLockbox(taskStorage, metadataStorageCoordinator);
+ testLockbox.add(task);
+ final LockResult lockResult = testLockbox.tryLock(task, new
TimeChunkLockRequest(
+ TaskLockType.SHARED,
+ task,
+ theInterval,
+ null
+ ));
+ Assert.assertTrue(lockResult.isOk());
+
+ SegmentAllocateRequest request = new SegmentAllocateRequest(
+ task,
+ new SegmentAllocateAction(
+ task.getDataSource(),
+ DateTimes.of("2023-01-01"),
+ Granularities.NONE,
+ Granularities.YEAR,
+ task.getId(),
+ null,
+ false,
+ null,
+ null,
+ TaskLockType.SHARED
+ ),
+ 90
+ );
+
+ try {
+ testLockbox.allocateSegments(
+ ImmutableList.of(request),
+ "DS",
+ theInterval,
+ false,
+ LockGranularity.TIME_CHUNK
+ );
+ }
+ catch (Exception e) {
+ // do nothing
+ }
+ Assert.assertFalse(testLockbox.getAllLocks().isEmpty());
+ Assert.assertEquals(
+ lockResult.getTaskLock(),
+ testLockbox.getOnlyTaskLockPosseContainingInterval(task,
theInterval).get(0).getTaskLock()
+ );
+ }
+
+ @Test
+ public void testCleanUpLocksAfterSegmentAllocationFailure()
+ {
+ final Task task = NoopTask.create();
+ taskStorage.insert(task, TaskStatus.running(task.getId()));
+
+ final TaskLockbox testLockbox = new
SegmentAllocationFailingTaskLockbox(taskStorage, metadataStorageCoordinator);
+ testLockbox.add(task);
+
+ SegmentAllocateRequest request0 = new SegmentAllocateRequest(
+ task,
+ new SegmentAllocateAction(
+ task.getDataSource(),
+ DateTimes.of("2023-01-01"),
+ Granularities.NONE,
+ Granularities.YEAR,
+ task.getId(),
+ null,
+ false,
+ null,
+ null,
+ TaskLockType.SHARED
+ ),
+ 90
+ );
+
+ SegmentAllocateRequest request1 = new SegmentAllocateRequest(
+ task,
+ new SegmentAllocateAction(
+ task.getDataSource(),
+ DateTimes.of("2023-01-01"),
+ Granularities.NONE,
+ Granularities.MONTH,
+ task.getId(),
+ null,
+ false,
+ null,
+ null,
+ TaskLockType.SHARED
+ ),
+ 90
+ );
+
+ try {
+ testLockbox.allocateSegments(
+ ImmutableList.of(request0, request1),
+ "DS",
+ Intervals.of("2023/2024"),
+ false,
+ LockGranularity.TIME_CHUNK
+ );
+ }
+ catch (Exception e) {
+ // do nothing
+ }
+ Assert.assertTrue(testLockbox.getAllLocks().isEmpty());
+ }
+
private class TaskLockboxValidator
{
@@ -1953,4 +2068,26 @@ public class TaskLockboxTest
.contains("FailingLockAcquisition") ? null :
super.verifyAndCreateOrFindLockPosse(task, taskLock);
}
}
+
+ private static class SegmentAllocationFailingTaskLockbox extends TaskLockbox
+ {
+ public SegmentAllocationFailingTaskLockbox(
+ TaskStorage taskStorage,
+ IndexerMetadataStorageCoordinator metadataStorageCoordinator
+ )
+ {
+ super(taskStorage, metadataStorageCoordinator);
+ }
+
+ @Override
+ void allocateSegmentIds(
+ String dataSource,
+ Interval interval,
+ boolean skipSegmentLineageCheck,
+ Collection<SegmentAllocationHolder> holders
+ )
+ {
+ throw new RuntimeException("This lockbox cannot allocate segemnts.");
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]