This is an automated email from the ASF dual-hosted git repository.

amatya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new ef46d882007 Release unneeded append locks after acquiring a new 
superseding append lock (#15682)
ef46d882007 is described below

commit ef46d8820077cf9e7fc436f5c7e3219b42eb3077
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Tue Jan 30 16:51:56 2024 +0530

    Release unneeded append locks after acquiring a new superseding append lock 
(#15682)
    
    * Fix segment transactional append when publishing with multiple 
overlapping locks
---
 .../druid/indexing/overlord/TaskLockbox.java       | 212 ++++++++-------------
 .../concurrent/ConcurrentReplaceAndAppendTest.java |  38 ++++
 .../druid/indexing/overlord/TaskLockboxTest.java   | 191 +++++++------------
 3 files changed, 195 insertions(+), 246 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
index da64198dd00..54e29191cff 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
@@ -175,8 +175,6 @@ public class TaskLockbox
             savedTaskLockWithPriority
         );
         if (taskLockPosse != null) {
-          taskLockPosse.addTask(task);
-
           final TaskLock taskLock = taskLockPosse.getTaskLock();
 
           if 
(savedTaskLockWithPriority.getVersion().equals(taskLock.getVersion())) {
@@ -299,7 +297,7 @@ public class TaskLockbox
           throw new ISE("Unknown lockGranularity[%s]", 
taskLock.getGranularity());
       }
 
-      return createOrFindLockPosse(request);
+      return createOrFindLockPosse(request, task, false);
     }
     catch (Exception e) {
       log.error(e,
@@ -401,7 +399,7 @@ public class TaskLockbox
         convertedRequest = request;
       }
 
-      final TaskLockPosse posseToUse = createOrFindLockPosse(convertedRequest);
+      final TaskLockPosse posseToUse = createOrFindLockPosse(convertedRequest, 
task, true);
       if (posseToUse != null && !posseToUse.getTaskLock().isRevoked()) {
         if (request instanceof LockRequestForNewSegment) {
           final LockRequestForNewSegment lockRequestForNewSegment = 
(LockRequestForNewSegment) request;
@@ -416,36 +414,7 @@ public class TaskLockbox
             newSegmentId = allocateSegmentId(lockRequestForNewSegment, 
posseToUse.getTaskLock().getVersion());
           }
         }
-
-        // Add to existing TaskLockPosse, if necessary
-        if (posseToUse.addTask(task)) {
-          log.info("Added task[%s] to TaskLock[%s]", task.getId(), 
posseToUse.getTaskLock());
-
-          // Update task storage facility. If it fails, revoke the lock.
-          try {
-            taskStorage.addLock(task.getId(), posseToUse.getTaskLock());
-            return LockResult.ok(posseToUse.getTaskLock(), newSegmentId);
-          }
-          catch (Exception e) {
-            log.makeAlert("Failed to persist lock in storage")
-               .addData("task", task.getId())
-               .addData("dataSource", posseToUse.getTaskLock().getDataSource())
-               .addData("interval", posseToUse.getTaskLock().getInterval())
-               .addData("version", posseToUse.getTaskLock().getVersion())
-               .emit();
-            unlock(
-                task,
-                convertedRequest.getInterval(),
-                posseToUse.getTaskLock().getGranularity() == 
LockGranularity.SEGMENT
-                ? ((SegmentLock) posseToUse.taskLock).getPartitionId()
-                : null
-            );
-            return LockResult.fail();
-          }
-        } else {
-          log.info("Task[%s] already present in TaskLock[%s]", task.getId(), 
posseToUse.getTaskLock().getGroupId());
-          return LockResult.ok(posseToUse.getTaskLock(), newSegmentId);
-        }
+        return LockResult.ok(posseToUse.getTaskLock(), newSegmentId);
       } else {
         final boolean lockRevoked = posseToUse != null && 
posseToUse.getTaskLock().isRevoked();
         if (lockRevoked) {
@@ -499,11 +468,7 @@ public class TaskLockbox
         allocateSegmentIds(dataSource, interval, skipSegmentLineageCheck, 
holderList.getPending());
         holderList.getPending().forEach(holder -> acquireTaskLock(holder, 
false));
       }
-      holderList.getPending().forEach(holder -> addTaskAndPersistLocks(holder, 
isTimeChunkLock));
-    }
-    catch (Exception e) {
-      holderList.clearStaleLocks(this);
-      throw e;
+      holderList.getPending().forEach(SegmentAllocationHolder::markSucceeded);
     }
     finally {
       giant.unlock();
@@ -538,7 +503,7 @@ public class TaskLockbox
     }
 
     // Create or find the task lock for the created lock request
-    final TaskLockPosse posseToUse = createOrFindLockPosse(lockRequest);
+    final TaskLockPosse posseToUse = createOrFindLockPosse(lockRequest, 
holder.task, true);
     final TaskLock acquiredLock = posseToUse == null ? null : 
posseToUse.getTaskLock();
     if (posseToUse == null) {
       holder.markFailed("Could not find or create lock posse.");
@@ -549,60 +514,14 @@ public class TaskLockbox
     }
   }
 
-  /**
-   * Adds the task to the found lock posse if not already added and updates
-   * in the metadata store. Marks the segment allocation as failed if the 
update
-   * did not succeed.
-   */
-  private void addTaskAndPersistLocks(SegmentAllocationHolder holder, boolean 
isTimeChunkLock)
-  {
-    final Task task = holder.task;
-    final TaskLock acquiredLock = holder.acquiredLock;
-
-    if (holder.taskLockPosse.addTask(task)) {
-      log.info("Added task [%s] to TaskLock [%s]", task.getId(), acquiredLock);
-
-      // This can also be batched later
-      boolean success = updateLockInStorage(task, acquiredLock);
-      if (success) {
-        holder.markSucceeded();
-      } else {
-        final Integer partitionId = isTimeChunkLock
-                                    ? null : ((SegmentLock) 
acquiredLock).getPartitionId();
-        unlock(task, holder.lockRequestInterval, partitionId);
-        holder.markFailed("Could not update task lock in metadata store.");
-      }
-    } else {
-      log.info("Task [%s] already present in TaskLock [%s]", task.getId(), 
acquiredLock.getGroupId());
-      holder.markSucceeded();
-    }
-  }
-
-  private boolean updateLockInStorage(Task task, TaskLock taskLock)
-  {
-    try {
-      taskStorage.addLock(task.getId(), taskLock);
-      return true;
-    }
-    catch (Exception e) {
-      log.makeAlert("Failed to persist lock in storage")
-         .addData("task", task.getId())
-         .addData("dataSource", taskLock.getDataSource())
-         .addData("interval", taskLock.getInterval())
-         .addData("version", taskLock.getVersion())
-         .emit();
-
-      return false;
-    }
-  }
-
-  private TaskLockPosse createOrFindLockPosse(LockRequest request)
+  private TaskLockPosse createOrFindLockPosse(LockRequest request, Task task, 
boolean persist)
   {
     Preconditions.checkState(!(request instanceof LockRequestForNewSegment), 
"Can't handle LockRequestForNewSegment");
 
     giant.lock();
 
     try {
+      final TaskLockPosse posseToUse;
       final List<TaskLockPosse> foundPosses = findLockPossesOverlapsInterval(
           request.getDataSource(),
           request.getInterval()
@@ -613,7 +532,7 @@ public class TaskLockbox
           .filter(taskLockPosse -> 
taskLockPosse.getTaskLock().conflict(request))
           .collect(Collectors.toList());
 
-      if (conflictPosses.size() > 0) {
+      if (!conflictPosses.isEmpty()) {
         // If we have some locks for dataSource and interval, check they can 
be reused.
         // If they can't be reused, check lock priority and revoke existing 
locks if possible.
         final List<TaskLockPosse> reusablePosses = foundPosses
@@ -621,7 +540,7 @@ public class TaskLockbox
             .filter(posse -> posse.reusableFor(request))
             .collect(Collectors.toList());
 
-        if (reusablePosses.size() == 0) {
+        if (reusablePosses.isEmpty()) {
           // case 1) this task doesn't have any lock, but others do
 
           if ((request.getType().equals(TaskLockType.APPEND) || 
request.getType().equals(TaskLockType.REPLACE))
@@ -631,14 +550,10 @@ public class TaskLockbox
           }
 
           // First, check if the lock can coexist with its conflicting posses
-          if (canLockCoexist(conflictPosses, request)) {
-            return createNewTaskLockPosse(request);
-          }
-
           // If not, revoke all lower priority locks of different types if the 
request has a greater priority
-          if (revokeAllIncompatibleActiveLocksIfPossible(conflictPosses, 
request)) {
-            return createNewTaskLockPosse(request);
-
+          if (canLockCoexist(conflictPosses, request)
+              || revokeAllIncompatibleActiveLocksIfPossible(conflictPosses, 
request)) {
+            posseToUse = createNewTaskLockPosse(request);
           } else {
             // During a rolling update, tasks of mixed versions can be run at 
the same time. Old tasks would request
             // timeChunkLocks while new tasks would ask segmentLocks. The 
below check is to allow for old and new tasks
@@ -653,7 +568,7 @@ public class TaskLockbox
             if (allDifferentGranularity) {
               // Lock collision was because of the different granularity in 
the same group.
               // We can add a new taskLockPosse.
-              return createNewTaskLockPosse(request);
+              posseToUse = createNewTaskLockPosse(request);
             } else {
               log.info(
                   "Cannot create a new taskLockPosse for request[%s] because 
existing locks[%s] have same or higher priorities",
@@ -665,7 +580,7 @@ public class TaskLockbox
           }
         } else if (reusablePosses.size() == 1) {
           // case 2) we found a lock posse for the given request
-          return reusablePosses.get(0);
+          posseToUse = reusablePosses.get(0);
         } else {
           // case 3) we found multiple lock posses for the given task
           throw new ISE(
@@ -677,8 +592,49 @@ public class TaskLockbox
       } else {
         // We don't have any locks for dataSource and interval.
         // Let's make a new one.
-        return createNewTaskLockPosse(request);
+        posseToUse = createNewTaskLockPosse(request);
+      }
+      if (posseToUse == null || posseToUse.getTaskLock() == null) {
+        return null;
+      }
+      // Add to existing TaskLockPosse
+      if (posseToUse.addTask(task)) {
+        log.info("Added task[%s] to TaskLock[%s]", task.getId(), 
posseToUse.getTaskLock());
+
+        // If the task lock can be used instead of the conflicting posses, 
their locks can be released
+        for (TaskLockPosse conflictPosse : conflictPosses) {
+          if (conflictPosse.containsTask(task) && 
posseToUse.supersedes(conflictPosse)) {
+            unlock(task, conflictPosse.getTaskLock().getInterval());
+          }
+        }
+
+        if (persist) {
+          // Update task storage facility. If it fails, unlock it
+          try {
+            taskStorage.addLock(task.getId(), posseToUse.getTaskLock());
+          }
+          catch (Exception e) {
+            log.makeAlert("Failed to persist lock in storage")
+               .addData("task", task.getId())
+               .addData("dataSource", posseToUse.getTaskLock().getDataSource())
+               .addData("interval", posseToUse.getTaskLock().getInterval())
+               .addData("version", posseToUse.getTaskLock().getVersion())
+               .emit();
+            unlock(
+                task,
+                posseToUse.getTaskLock().getInterval(),
+                posseToUse.getTaskLock().getGranularity() == 
LockGranularity.SEGMENT
+                ? ((SegmentLock) posseToUse.taskLock).getPartitionId()
+                : null
+            );
+            return null;
+          }
+        }
+
+      } else {
+        log.info("Task[%s] already present in TaskLock[%s]", task.getId(), 
posseToUse.getTaskLock().getGroupId());
       }
+      return posseToUse;
     }
     finally {
       giant.unlock();
@@ -1144,7 +1100,7 @@ public class TaskLockbox
             dsRunning.remove(interval.getStart());
           }
 
-          if (running.get(dataSource).size() == 0) {
+          if (running.get(dataSource).isEmpty()) {
             running.remove(dataSource);
           }
 
@@ -1525,14 +1481,14 @@ public class TaskLockbox
       }
       switch (type) {
         case EXCLUSIVE:
-          if (posse.getTaskLock().getPriority() >= priority) {
+          if (posse.getTaskLock().getNonNullPriority() >= priority) {
             return false;
           }
           possesToRevoke.add(posse);
           break;
         case SHARED:
           if (!posse.getTaskLock().getType().equals(TaskLockType.SHARED)) {
-            if (posse.getTaskLock().getPriority() >= priority) {
+            if (posse.getTaskLock().getNonNullPriority() >= priority) {
               return false;
             }
             possesToRevoke.add(posse);
@@ -1541,7 +1497,7 @@ public class TaskLockbox
         case REPLACE:
           if (!(posse.getTaskLock().getType().equals(TaskLockType.APPEND)
                 && 
request.getInterval().contains(posse.getTaskLock().getInterval()))) {
-            if (posse.getTaskLock().getPriority() >= priority) {
+            if (posse.getTaskLock().getNonNullPriority() >= priority) {
               return false;
             }
             possesToRevoke.add(posse);
@@ -1551,7 +1507,7 @@ public class TaskLockbox
           if (!(posse.getTaskLock().getType().equals(TaskLockType.APPEND)
                 || (posse.getTaskLock().getType().equals(TaskLockType.REPLACE)
                     && 
posse.getTaskLock().getInterval().contains(request.getInterval())))) {
-            if (posse.getTaskLock().getPriority() >= priority) {
+            if (posse.getTaskLock().getNonNullPriority() >= priority) {
               return false;
             }
             possesToRevoke.add(posse);
@@ -1635,6 +1591,29 @@ public class TaskLockbox
       return taskIds.isEmpty();
     }
 
+    /**
+     * Checks if an APPEND time chunk lock can be reused for another append 
time chunk lock that already exists
+     * and has an interval that strictly contains the other's interval.
+     * We do not expect multiple locks to exist with the same interval as the 
existing lock would be reused.
+     * A new append lock with a strictly encompassing interval can be created 
when a concurrent replace
+     * with a coarser granularity commits its segments and the appending task 
makes subsequent allocations
+     * @param other the conflicting lockPosse that already exists
+     * @return true if the task can be unlocked from the other posse after it 
has been added to the newly created posse.
+     */
+    boolean supersedes(TaskLockPosse other)
+    {
+      final TaskLock otherLock = other.taskLock;
+      return !taskLock.isRevoked()
+             && taskLock.getGranularity() == LockGranularity.TIME_CHUNK
+             && taskLock.getGranularity() == otherLock.getGranularity()
+             && taskLock.getType() == TaskLockType.APPEND
+             && taskLock.getType() == otherLock.getType()
+             && taskLock.getVersion().compareTo(otherLock.getVersion()) >= 0
+             && !taskLock.getInterval().equals(otherLock.getInterval())
+             && taskLock.getInterval().contains(otherLock.getInterval())
+             && taskLock.getGroupId().equals(otherLock.getGroupId());
+    }
+
     boolean reusableFor(LockRequest request)
     {
       if (taskLock.getType() == request.getType() && taskLock.getGranularity() 
== request.getGranularity()) {
@@ -1737,29 +1716,6 @@ public class TaskLockbox
       return pending;
     }
 
-    /**
-     *  When task locks are acquired in an attempt to allocate segments, *  a 
new lock posse might be created.
-     *  However, the posse is associated with the task only after all the 
segment allocations have succeeded.
-     *  If there is an exception, unlock all such unassociated locks.
-     */
-    void clearStaleLocks(TaskLockbox taskLockbox)
-    {
-      all
-          .stream()
-          .filter(holder -> holder.acquiredLock != null
-                            && holder.taskLockPosse != null
-                            && !holder.taskLockPosse.containsTask(holder.task))
-          .forEach(holder -> {
-            holder.taskLockPosse.addTask(holder.task);
-            taskLockbox.unlock(
-                holder.task,
-                holder.acquiredLock.getInterval(),
-                holder.acquiredLock instanceof SegmentLock ? ((SegmentLock) 
holder.acquiredLock).getPartitionId() : null
-            );
-            log.info("Cleared stale lock[%s] for task[%s]", 
holder.acquiredLock, holder.task.getId());
-          });
-    }
-
     List<SegmentAllocateResult> getResults()
     {
       return all.stream().map(holder -> 
holder.result).collect(Collectors.toList());
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
index 3fda953a454..91c7d6a7175 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.indexing.common.task.concurrent;
 
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import org.apache.druid.indexing.common.MultipleFileTaskReportFileWriter;
 import org.apache.druid.indexing.common.TaskLock;
@@ -893,6 +894,43 @@ public class ConcurrentReplaceAndAppendTest extends 
IngestionTestBase
     verifyInputSegments(replaceTask, JAN_23, segment1, segment2, segment3);
   }
 
+  @Test
+  public void testLockAllocateDayReplaceMonthAllocateAppend()
+  {
+    final SegmentIdWithShardSpec pendingSegmentV0
+        = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), 
Granularities.DAY);
+
+    final String v1 = replaceTask.acquireReplaceLockOn(JAN_23).getVersion();
+
+    final DataSegment segmentV10 = createSegment(JAN_23, v1);
+    replaceTask.commitReplaceSegments(segmentV10);
+    verifyIntervalHasUsedSegments(JAN_23, segmentV10);
+
+    final SegmentIdWithShardSpec pendingSegmentV1
+        = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), 
Granularities.DAY);
+    Assert.assertEquals(segmentV10.getVersion(), 
pendingSegmentV1.getVersion());
+
+    final DataSegment segmentV00 = asSegment(pendingSegmentV0);
+    final DataSegment segmentV11 = asSegment(pendingSegmentV1);
+    Set<DataSegment> appendSegments = 
appendTask.commitAppendSegments(segmentV00, segmentV11)
+                                                .getSegments();
+
+    Assert.assertEquals(3, appendSegments.size());
+    // Segment V11 is committed
+    Assert.assertTrue(appendSegments.remove(segmentV11));
+    // Segment V00 is also committed
+    Assert.assertTrue(appendSegments.remove(segmentV00));
+    // Segment V00 is upgraded to v1 with MONTH granularity at the time of 
commit as V12
+    final DataSegment segmentV12 = Iterables.getOnlyElement(appendSegments);
+    Assert.assertEquals(v1, segmentV12.getVersion());
+    Assert.assertEquals(JAN_23, segmentV12.getInterval());
+    Assert.assertEquals(segmentV00.getLoadSpec(), segmentV12.getLoadSpec());
+
+    verifyIntervalHasUsedSegments(JAN_23, segmentV00, segmentV10, segmentV11, 
segmentV12);
+    verifyIntervalHasVisibleSegments(JAN_23, segmentV10, segmentV11, 
segmentV12);
+  }
+
+
   @Nullable
   private DataSegment findSegmentWith(String version, Map<String, Object> 
loadSpec, Set<DataSegment> segments)
   {
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
index 17ced86cfa3..c4ee78ea6a8 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
@@ -36,8 +36,6 @@ import org.apache.druid.indexing.common.TaskLock;
 import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.TimeChunkLock;
-import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
-import org.apache.druid.indexing.common.actions.SegmentAllocateRequest;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.config.TaskStorageConfig;
@@ -51,7 +49,6 @@ import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.metadata.DerbyMetadataStorageActionHandlerFactory;
@@ -152,7 +149,12 @@ public class TaskLockboxTest
 
   private LockResult tryTimeChunkLock(TaskLockType lockType, Task task, 
Interval interval)
   {
-    return lockbox.tryLock(task, new TimeChunkLockRequest(lockType, task, 
interval, null));
+    return tryTimeChunkLock(lockType, task, interval, null);
+  }
+
+  private LockResult tryTimeChunkLock(TaskLockType lockType, Task task, 
Interval interval, String version)
+  {
+    return lockbox.tryLock(task, new TimeChunkLockRequest(lockType, task, 
interval, version));
   }
 
   @Test
@@ -1053,7 +1055,7 @@ public class TaskLockboxTest
     Assert.assertEquals(lockRequest.getDataSource(), 
segmentLock.getDataSource());
     Assert.assertEquals(lockRequest.getInterval(), segmentLock.getInterval());
     Assert.assertEquals(lockRequest.getPartialShardSpec().getShardSpecClass(), 
segmentId.getShardSpec().getClass());
-    Assert.assertEquals(lockRequest.getPriority(), lockRequest.getPriority());
+    Assert.assertEquals(lockRequest.getPriority(), 
segmentLock.getPriority().intValue());
   }
 
   @Test
@@ -1809,114 +1811,89 @@ public class TaskLockboxTest
   }
 
   @Test
-  public void testDoNotCleanUsedLockAfterSegmentAllocationFailure()
+  public void testUnlockSupersededLocks()
   {
     final Task task = NoopTask.create();
-    final Interval theInterval = Intervals.of("2023/2024");
     taskStorage.insert(task, TaskStatus.running(task.getId()));
+    lockbox.add(task);
+    final Task otherTask = NoopTask.create();
+    taskStorage.insert(otherTask, TaskStatus.running(otherTask.getId()));
+    lockbox.add(otherTask);
 
-    final TaskLockbox testLockbox = new 
SegmentAllocationFailingTaskLockbox(taskStorage, metadataStorageCoordinator);
-    testLockbox.add(task);
-    final LockResult lockResult = testLockbox.tryLock(task, new 
TimeChunkLockRequest(
-        TaskLockType.SHARED,
-        task,
-        theInterval,
-        null
-    ));
-    Assert.assertTrue(lockResult.isOk());
-
-    SegmentAllocateRequest request = new SegmentAllocateRequest(
+    // Can coexist and is superseded. Will be unlocked
+    final TaskLock supersededLock = tryTimeChunkLock(
+        TaskLockType.APPEND,
         task,
-        new SegmentAllocateAction(
-            task.getDataSource(),
-            DateTimes.of("2023-01-01"),
-            Granularities.NONE,
-            Granularities.YEAR,
-            task.getId(),
-            null,
-            false,
-            null,
-            null,
-            TaskLockType.SHARED
-        ),
-        90
+        Intervals.of("2024-01-01/2024-01-02"),
+        "v0"
+    ).getTaskLock();
+    Assert.assertEquals(
+        ImmutableSet.copyOf(taskStorage.getLocks(task.getId())),
+        ImmutableSet.of(supersededLock)
     );
 
-    try {
-      testLockbox.allocateSegments(
-          ImmutableList.of(request),
-          "DS",
-          theInterval,
-          false,
-          LockGranularity.TIME_CHUNK
-      );
-    }
-    catch (Exception e) {
-      // do nothing
-    }
-    Assert.assertFalse(testLockbox.getAllLocks().isEmpty());
+    // Can coexist, but is not superseded as the task doesn't belong to this 
posse
+    final TaskLock taskNotInPosse = tryTimeChunkLock(
+        TaskLockType.APPEND,
+        otherTask,
+        Intervals.of("2024-01-01/2024-01-02"),
+        "v0"
+    ).getTaskLock();
     Assert.assertEquals(
-        lockResult.getTaskLock(),
-        testLockbox.getOnlyTaskLockPosseContainingInterval(task, 
theInterval).get(0).getTaskLock()
+        ImmutableSet.copyOf(taskStorage.getLocks(otherTask.getId())),
+        ImmutableSet.of(taskNotInPosse)
     );
-  }
-
-  @Test
-  public void testCleanUpLocksAfterSegmentAllocationFailure()
-  {
-    final Task task = NoopTask.create();
-    taskStorage.insert(task, TaskStatus.running(task.getId()));
 
-    final TaskLockbox testLockbox = new 
SegmentAllocationFailingTaskLockbox(taskStorage, metadataStorageCoordinator);
-    testLockbox.add(task);
+    // Can coexist, but is not superseded as it is not an APPEND lock
+    final TaskLock replaceLock = tryTimeChunkLock(
+        TaskLockType.REPLACE,
+        task,
+        Intervals.of("2024-01-01/2025-01-01"),
+        "v0"
+    ).getTaskLock();
+    Assert.assertEquals(
+        ImmutableSet.copyOf(taskStorage.getLocks(task.getId())),
+        ImmutableSet.of(supersededLock, replaceLock)
+    );
 
-    SegmentAllocateRequest request0 = new SegmentAllocateRequest(
+    // Can coexist, but is not superseded due to higher version
+    final TaskLock higherVersion = tryTimeChunkLock(
+        TaskLockType.APPEND,
         task,
-        new SegmentAllocateAction(
-            task.getDataSource(),
-            DateTimes.of("2023-01-01"),
-            Granularities.NONE,
-            Granularities.YEAR,
-            task.getId(),
-            null,
-            false,
-            null,
-            null,
-            TaskLockType.SHARED
-        ),
-        90
+        Intervals.of("2024-01-11/2024-01-12"),
+        "v1"
+    ).getTaskLock();
+    Assert.assertEquals(
+        ImmutableSet.copyOf(taskStorage.getLocks(task.getId())),
+        ImmutableSet.of(supersededLock, replaceLock, higherVersion)
     );
 
-    SegmentAllocateRequest request1 = new SegmentAllocateRequest(
+    // Can coexist, but is not superseded as interval is not fully contained
+    final TaskLock uncontainedInterval = tryTimeChunkLock(
+        TaskLockType.APPEND,
         task,
-        new SegmentAllocateAction(
-            task.getDataSource(),
-            DateTimes.of("2023-01-01"),
-            Granularities.NONE,
-            Granularities.MONTH,
-            task.getId(),
-            null,
-            false,
-            null,
-            null,
-            TaskLockType.SHARED
-        ),
-        90
+        Intervals.of("2024-01-28/2024-02-04"),
+        "v0"
+    ).getTaskLock();
+    Assert.assertEquals(
+        ImmutableSet.copyOf(taskStorage.getLocks(task.getId())),
+        ImmutableSet.of(supersededLock, replaceLock, higherVersion, 
uncontainedInterval)
     );
 
-    try {
-      testLockbox.allocateSegments(
-          ImmutableList.of(request0, request1),
-          "DS",
-          Intervals.of("2023/2024"),
-          false,
-          LockGranularity.TIME_CHUNK
-      );
-    }
-    catch (Exception e) {
-      // do nothing
-    }
-    Assert.assertTrue(testLockbox.getAllLocks().isEmpty());
+    final TaskLock theLock = tryTimeChunkLock(
+        TaskLockType.APPEND,
+        task,
+        Intervals.of("2024-01-01/2024-02-01"),
+        "v0"
+    ).getTaskLock();
+    Assert.assertEquals(
+        ImmutableSet.copyOf(taskStorage.getLocks(task.getId())),
+        ImmutableSet.of(theLock, replaceLock, higherVersion, 
uncontainedInterval)
+    );
+    Assert.assertEquals(
+        ImmutableSet.copyOf(taskStorage.getLocks(otherTask.getId())),
+        ImmutableSet.of(taskNotInPosse)
+    );
   }
 
   @Test
@@ -2175,26 +2152,4 @@ public class TaskLockboxTest
                  .contains("FailingLockAcquisition") ? null : 
super.verifyAndCreateOrFindLockPosse(task, taskLock);
     }
   }
-
-  private static class SegmentAllocationFailingTaskLockbox extends TaskLockbox
-  {
-    public SegmentAllocationFailingTaskLockbox(
-        TaskStorage taskStorage,
-        IndexerMetadataStorageCoordinator metadataStorageCoordinator
-    )
-    {
-      super(taskStorage, metadataStorageCoordinator);
-    }
-
-    @Override
-    void allocateSegmentIds(
-        String dataSource,
-        Interval interval,
-        boolean skipSegmentLineageCheck,
-        Collection<SegmentAllocationHolder> holders
-    )
-    {
-      throw new RuntimeException("This lockbox cannot allocate segemnts.");
-    }
-  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to