This is an automated email from the ASF dual-hosted git repository.

amatya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e3df2d2e9e Clean up stale locks if segment allocation fails (#14966)
0e3df2d2e9e is described below

commit 0e3df2d2e9eb3388d43ac722a76de53a6a263ee8
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Thu Sep 14 14:58:02 2023 +0530

    Clean up stale locks if segment allocation fails (#14966)
    
    * Clean up stale locks if segment allocation fails due to an exception
---
 .../druid/indexing/overlord/TaskLockbox.java       |  33 ++++-
 .../druid/indexing/overlord/TaskLockboxTest.java   | 137 +++++++++++++++++++++
 2 files changed, 167 insertions(+), 3 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
index ae8c3313ec7..a8981688888 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
@@ -494,9 +494,12 @@ public class TaskLockbox
         allocateSegmentIds(dataSource, interval, skipSegmentLineageCheck, 
holderList.getPending());
         holderList.getPending().forEach(holder -> acquireTaskLock(holder, 
false));
       }
-
       holderList.getPending().forEach(holder -> addTaskAndPersistLocks(holder, 
isTimeChunkLock));
     }
+    catch (Exception e) {
+      holderList.clearStaleLocks(this);
+      throw e;
+    }
     finally {
       giant.unlock();
     }
@@ -711,7 +714,8 @@ public class TaskLockbox
    * for the given requests. Updates the holder with the allocated segment if
    * the allocation succeeds, otherwise marks it as failed.
    */
-  private void allocateSegmentIds(
+  @VisibleForTesting
+  void allocateSegmentIds(
       String dataSource,
       Interval interval,
       boolean skipSegmentLineageCheck,
@@ -1598,6 +1602,28 @@ public class TaskLockbox
       return pending;
     }
 
+    /**
+     * When task locks are acquired in an attempt to allocate segments, a new lock posse might be created.
+     * However, the posse is associated with the task only after all the segment allocations have succeeded.
+     * If there is an exception, unlock all such unassociated locks.
+     *
+     * Cleanup is best-effort: a failure while releasing one holder's lock is logged and skipped, so the
+     * remaining stale locks are still released and the exception that triggered the cleanup is not masked.
+     *
+     * @param taskLockbox the lockbox whose {@code unlock} is used to release the stale locks
+     */
+    void clearStaleLocks(TaskLockbox taskLockbox)
+    {
+      for (SegmentAllocationHolder holder : all) {
+        // Re-evaluated per holder (as the original lazy stream did): an earlier unlock may change posse state.
+        if (holder.acquiredLock == null
+            || holder.taskLockPosse == null
+            || holder.taskLockPosse.containsTask(holder.task)) {
+          // Nothing was acquired, or the posse is already held by the task (e.g. a lock taken before this
+          // allocation attempt) and must be kept.
+          continue;
+        }
+        try {
+          // Associate the task with the posse before unlocking; presumably unlock() only releases posses
+          // that contain the task.
+          holder.taskLockPosse.addTask(holder.task);
+          taskLockbox.unlock(
+              holder.task,
+              holder.acquiredLock.getInterval(),
+              holder.acquiredLock instanceof SegmentLock ? ((SegmentLock) holder.acquiredLock).getPartitionId() : null
+          );
+          log.info("Cleared stale lock[%s] for task[%s]", holder.acquiredLock, holder.task.getId());
+        }
+        catch (Exception e) {
+          log.warn(e, "Failed to clear stale lock[%s] for task[%s]", holder.acquiredLock, holder.task.getId());
+        }
+      }
+    }
 
     List<SegmentAllocateResult> getResults()
     {
@@ -1608,7 +1634,8 @@ public class TaskLockbox
   /**
    * Contains the task, request, lock and final result for a segment 
allocation.
    */
-  private static class SegmentAllocationHolder
+  @VisibleForTesting
+  static class SegmentAllocationHolder
   {
     final AllocationHolderList list;
 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
index 6b9b4cd213d..ceb1657f68e 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.jsontype.NamedType;
 import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import org.apache.druid.indexer.TaskStatus;
@@ -34,6 +35,8 @@ import org.apache.druid.indexing.common.TaskLock;
 import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.TimeChunkLock;
+import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
+import org.apache.druid.indexing.common.actions.SegmentAllocateRequest;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.config.TaskStorageConfig;
@@ -46,6 +49,7 @@ import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.metadata.DerbyMetadataStorageActionHandlerFactory;
@@ -1727,6 +1731,117 @@ public class TaskLockboxTest
     validator.expectActiveLocks(conflictingLock, floorLock);
   }
 
+  /**
+   * A lock the task already held before a failed segment allocation must survive the stale-lock cleanup.
+   */
+  @Test
+  public void testDoNotCleanUsedLockAfterSegmentAllocationFailure()
+  {
+    final Task task = NoopTask.create();
+    final Interval theInterval = Intervals.of("2023/2024");
+    taskStorage.insert(task, TaskStatus.running(task.getId()));
+
+    final TaskLockbox testLockbox = new SegmentAllocationFailingTaskLockbox(taskStorage, metadataStorageCoordinator);
+    testLockbox.add(task);
+    final LockResult lockResult = testLockbox.tryLock(task, new TimeChunkLockRequest(
+        TaskLockType.SHARED,
+        task,
+        theInterval,
+        null
+    ));
+    Assert.assertTrue(lockResult.isOk());
+
+    final SegmentAllocateRequest request = new SegmentAllocateRequest(
+        task,
+        new SegmentAllocateAction(
+            task.getDataSource(),
+            DateTimes.of("2023-01-01"),
+            Granularities.NONE,
+            Granularities.YEAR,
+            task.getId(),
+            null,
+            false,
+            null,
+            null,
+            TaskLockType.SHARED
+        ),
+        90
+    );
+
+    // The failing lockbox must surface the allocation error; swallowing it would let this test pass
+    // without ever exercising the cleanup path.
+    Assert.assertThrows(
+        RuntimeException.class,
+        () -> testLockbox.allocateSegments(
+            ImmutableList.of(request),
+            "DS",
+            theInterval,
+            false,
+            LockGranularity.TIME_CHUNK
+        )
+    );
+    Assert.assertFalse(testLockbox.getAllLocks().isEmpty());
+    Assert.assertEquals(
+        lockResult.getTaskLock(),
+        testLockbox.getOnlyTaskLockPosseContainingInterval(task, theInterval).get(0).getTaskLock()
+    );
+  }
+
+  /**
+   * Locks acquired on behalf of a segment allocation batch that fails must not be left behind in the lockbox.
+   */
+  @Test
+  public void testCleanUpLocksAfterSegmentAllocationFailure()
+  {
+    final Task task = NoopTask.create();
+    taskStorage.insert(task, TaskStatus.running(task.getId()));
+
+    final TaskLockbox testLockbox = new SegmentAllocationFailingTaskLockbox(taskStorage, metadataStorageCoordinator);
+    testLockbox.add(task);
+
+    final SegmentAllocateRequest request0 = new SegmentAllocateRequest(
+        task,
+        new SegmentAllocateAction(
+            task.getDataSource(),
+            DateTimes.of("2023-01-01"),
+            Granularities.NONE,
+            Granularities.YEAR,
+            task.getId(),
+            null,
+            false,
+            null,
+            null,
+            TaskLockType.SHARED
+        ),
+        90
+    );
+
+    final SegmentAllocateRequest request1 = new SegmentAllocateRequest(
+        task,
+        new SegmentAllocateAction(
+            task.getDataSource(),
+            DateTimes.of("2023-01-01"),
+            Granularities.NONE,
+            Granularities.MONTH,
+            task.getId(),
+            null,
+            false,
+            null,
+            null,
+            TaskLockType.SHARED
+        ),
+        90
+    );
+
+    // Allocation must fail (otherwise the emptiness check below would be vacuous), and any locks
+    // acquired for these requests must be released again.
+    Assert.assertThrows(
+        RuntimeException.class,
+        () -> testLockbox.allocateSegments(
+            ImmutableList.of(request0, request1),
+            "DS",
+            Intervals.of("2023/2024"),
+            false,
+            LockGranularity.TIME_CHUNK
+        )
+    );
+    Assert.assertTrue(testLockbox.getAllLocks().isEmpty());
+  }
+
 
   private class TaskLockboxValidator
   {
@@ -1953,4 +2068,26 @@ public class TaskLockboxTest
                  .contains("FailingLockAcquisition") ? null : 
super.verifyAndCreateOrFindLockPosse(task, taskLock);
     }
   }
+
+  /**
+   * A {@link TaskLockbox} whose segment ID allocation always throws, so that tests can exercise the lock
+   * cleanup performed when {@code allocateSegments} fails.
+   */
+  private static class SegmentAllocationFailingTaskLockbox extends TaskLockbox
+  {
+    public SegmentAllocationFailingTaskLockbox(
+        TaskStorage taskStorage,
+        IndexerMetadataStorageCoordinator metadataStorageCoordinator
+    )
+    {
+      super(taskStorage, metadataStorageCoordinator);
+    }
+
+    @Override
+    void allocateSegmentIds(
+        String dataSource,
+        Interval interval,
+        boolean skipSegmentLineageCheck,
+        Collection<SegmentAllocationHolder> holders
+    )
+    {
+      throw new RuntimeException("This lockbox cannot allocate segments.");
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to