This is an automated email from the ASF dual-hosted git repository.

amatya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 92a40d81699 Add API to fetch conflicting task locks (#16799)
92a40d81699 is described below

commit 92a40d81699742012deb795dabba29fdffe683c1
Author: AmatyaAvadhanula <amatya.avadhan...@imply.io>
AuthorDate: Tue Jul 30 11:40:48 2024 +0530

    Add API to fetch conflicting task locks (#16799)
    
    * Add API to fetch conflicting active locks
---
 .../druid/indexing/overlord/TaskLockbox.java       |  92 +++---
 .../druid/indexing/overlord/TaskQueryTool.java     |  16 +-
 .../indexing/overlord/http/OverlordResource.java   |  17 +-
 .../indexing/overlord/http/TaskLockResponse.java   |  54 ++++
 .../druid/indexing/overlord/TaskLockboxTest.java   | 307 ++++++++++++---------
 .../overlord/http/OverlordResourceTest.java        |  82 +++++-
 .../druid/testsEx/indexer/ITIndexerTest.java       |   5 +-
 .../clients/OverlordResourceTestClient.java        |   7 +-
 .../duty/ITAutoCompactionLockContentionTest.java   |   7 +-
 .../apache/druid/tests/indexer/ITIndexerTest.java  |   5 +-
 .../apache/druid/metadata/LockFilterPolicy.java    |  31 +--
 .../server/coordinator/duty/CompactSegments.java   |   3 +-
 .../druid/rpc/indexing/OverlordClientImplTest.java |   4 +-
 13 files changed, 396 insertions(+), 234 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
index 2155ac2c265..bebb52157d6 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
@@ -42,6 +42,7 @@ import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.Tasks;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.UOE;
@@ -49,6 +50,7 @@ import org.apache.druid.java.util.common.guava.Comparators;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.metadata.LockFilterPolicy;
 import org.apache.druid.metadata.ReplaceTaskLock;
+import org.apache.druid.query.QueryContexts;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
@@ -992,50 +994,76 @@ public class TaskLockbox
   }
 
   /**
-   * Gets a List of Intervals locked by higher priority tasks for each 
datasource.
-   * Here, Segment Locks are being treated the same as Time Chunk Locks i.e.
-   * a Task with a Segment Lock is assumed to lock a whole Interval and not 
just
-   * the corresponding Segment.
-   *
-   * @param minTaskPriority Minimum task priority for each datasource. Only the
-   *                        Intervals that are locked by Tasks with equal or
-   *                        higher priority than this are returned. Locked 
intervals
-   *                        for datasources that are not present in this Map 
are
-   *                        not returned.
-   * @return Map from Datasource to List of Intervals locked by Tasks that have
-   * priority greater than or equal to the {@code minTaskPriority} for that 
datasource.
+   * @param lockFilterPolicies Lock filters for the given datasources
+   * @return Map from datasource to list of non-revoked locks with at least as 
much priority and an overlapping interval
    */
-  public Map<String, List<Interval>> getLockedIntervals(Map<String, Integer> 
minTaskPriority)
+  public Map<String, List<TaskLock>> getActiveLocks(List<LockFilterPolicy> 
lockFilterPolicies)
   {
-    final Map<String, Set<Interval>> datasourceToIntervals = new HashMap<>();
+    final Map<String, List<TaskLock>> datasourceToLocks = new HashMap<>();
 
     // Take a lock and populate the maps
     giant.lock();
+
     try {
-      running.forEach(
-          (datasource, datasourceLocks) -> {
-            // If this datasource is not requested, do not proceed
-            if (!minTaskPriority.containsKey(datasource)) {
+      lockFilterPolicies.forEach(
+          lockFilter -> {
+            final String datasource = lockFilter.getDatasource();
+            if (!running.containsKey(datasource)) {
               return;
             }
 
-            datasourceLocks.forEach(
+            final int priority = lockFilter.getPriority();
+            final List<Interval> intervals;
+            if (lockFilter.getIntervals() != null) {
+              intervals = lockFilter.getIntervals();
+            } else {
+              intervals = Collections.singletonList(Intervals.ETERNITY);
+            }
+
+            final Map<String, Object> context = lockFilter.getContext();
+            final boolean ignoreAppendLocks;
+            final Boolean useConcurrentLocks = QueryContexts.getAsBoolean(
+                Tasks.USE_CONCURRENT_LOCKS,
+                context.get(Tasks.USE_CONCURRENT_LOCKS)
+            );
+            if (useConcurrentLocks == null) {
+              TaskLockType taskLockType = QueryContexts.getAsEnum(
+                  Tasks.TASK_LOCK_TYPE,
+                  context.get(Tasks.TASK_LOCK_TYPE),
+                  TaskLockType.class
+              );
+              if (taskLockType == null) {
+                ignoreAppendLocks = Tasks.DEFAULT_USE_CONCURRENT_LOCKS;
+              } else {
+                ignoreAppendLocks = taskLockType == TaskLockType.APPEND;
+              }
+            } else {
+              ignoreAppendLocks = useConcurrentLocks;
+            }
+
+            running.get(datasource).forEach(
                 (startTime, startTimeLocks) -> startTimeLocks.forEach(
                     (interval, taskLockPosses) -> taskLockPosses.forEach(
                         taskLockPosse -> {
                           if (taskLockPosse.getTaskLock().isRevoked()) {
-                            // Do not proceed if the lock is revoked
-                            return;
+                            // do nothing
                           } else if (taskLockPosse.getTaskLock().getPriority() 
== null
-                                     || 
taskLockPosse.getTaskLock().getPriority() < minTaskPriority.get(datasource)) {
-                            // Do not proceed if the lock has a priority 
strictly less than the minimum
-                            return;
+                                     || 
taskLockPosse.getTaskLock().getPriority() < priority) {
+                            // do nothing
+                          } else if (ignoreAppendLocks
+                                     && taskLockPosse.getTaskLock().getType() 
== TaskLockType.APPEND) {
+                            // do nothing
+                          } else {
+                            for (Interval filterInterval : intervals) {
+                              if (interval.overlaps(filterInterval)) {
+                                datasourceToLocks.computeIfAbsent(datasource, 
ds -> new ArrayList<>())
+                                                 
.add(taskLockPosse.getTaskLock());
+                                break;
+                              }
+                            }
                           }
-
-                          datasourceToIntervals
-                              .computeIfAbsent(datasource, k -> new 
HashSet<>())
-                              .add(interval);
-                        })
+                        }
+                    )
                 )
             );
           }
@@ -1045,11 +1073,7 @@ public class TaskLockbox
       giant.unlock();
     }
 
-    return datasourceToIntervals.entrySet().stream()
-                                .collect(Collectors.toMap(
-                                    Map.Entry::getKey,
-                                    entry -> new ArrayList<>(entry.getValue())
-                                ));
+    return datasourceToLocks;
   }
 
   public void unlock(final Task task, final Interval interval)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java
index f5351d7c6e5..b25bde067c7 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java
@@ -27,6 +27,7 @@ import org.apache.druid.common.config.JacksonConfigManager;
 import org.apache.druid.indexer.TaskInfo;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.indexing.common.TaskLock;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
 import org.apache.druid.indexing.overlord.http.TaskStateLookup;
@@ -94,19 +95,12 @@ public class TaskQueryTool
   }
 
   /**
-   * Gets a List of Intervals locked by higher priority tasks for each 
datasource.
-   *
-   * @param minTaskPriority Minimum task priority for each datasource. Only the
-   *                        Intervals that are locked by Tasks with equal or
-   *                        higher priority than this are returned. Locked 
intervals
-   *                        for datasources that are not present in this Map 
are
-   *                        not returned.
-   * @return Map from Datasource to List of Intervals locked by Tasks that have
-   * priority greater than or equal to the {@code minTaskPriority} for that 
datasource.
+   * @param lockFilterPolicies Requests for active locks for various 
datasources
+   * @return Map from datasource to conflicting lock infos
    */
-  public Map<String, List<Interval>> getLockedIntervals(Map<String, Integer> 
minTaskPriority)
+  public Map<String, List<TaskLock>> getActiveLocks(List<LockFilterPolicy> 
lockFilterPolicies)
   {
-    return taskLockbox.getLockedIntervals(minTaskPriority);
+    return taskLockbox.getActiveLocks(lockFilterPolicies);
   }
 
   public List<TaskInfo<Task, TaskStatus>> getActiveTaskInfo(@Nullable String 
dataSource)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
index 54ada7cb2b4..b62e1de055f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
@@ -241,33 +241,32 @@ public class OverlordResource
     }
   }
 
-  @Deprecated
   @POST
-  @Path("/lockedIntervals")
+  @Path("/lockedIntervals/v2")
   @Produces(MediaType.APPLICATION_JSON)
   @ResourceFilters(StateResourceFilter.class)
-  public Response getDatasourceLockedIntervals(Map<String, Integer> 
minTaskPriority)
+  public Response getDatasourceLockedIntervals(List<LockFilterPolicy> 
lockFilterPolicies)
   {
-    if (minTaskPriority == null || minTaskPriority.isEmpty()) {
-      return Response.status(Status.BAD_REQUEST).entity("No Datasource 
provided").build();
+    if (lockFilterPolicies == null || lockFilterPolicies.isEmpty()) {
+      return Response.status(Status.BAD_REQUEST).entity("No filter 
provided").build();
     }
 
     // Build the response
-    return 
Response.ok(taskQueryTool.getLockedIntervals(minTaskPriority)).build();
+    return 
Response.ok(taskQueryTool.getLockedIntervals(lockFilterPolicies)).build();
   }
 
   @POST
-  @Path("/lockedIntervals/v2")
+  @Path("/activeLocks")
   @Produces(MediaType.APPLICATION_JSON)
   @ResourceFilters(StateResourceFilter.class)
-  public Response getDatasourceLockedIntervalsV2(List<LockFilterPolicy> 
lockFilterPolicies)
+  public Response getActiveLocks(List<LockFilterPolicy> lockFilterPolicies)
   {
     if (lockFilterPolicies == null || lockFilterPolicies.isEmpty()) {
       return Response.status(Status.BAD_REQUEST).entity("No filter 
provided").build();
     }
 
     // Build the response
-    return 
Response.ok(taskQueryTool.getLockedIntervals(lockFilterPolicies)).build();
+    return Response.ok(new 
TaskLockResponse(taskQueryTool.getActiveLocks(lockFilterPolicies))).build();
   }
 
   @GET
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/TaskLockResponse.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/TaskLockResponse.java
new file mode 100644
index 00000000000..df4a5d8b03c
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/TaskLockResponse.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.http;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.indexing.common.TaskLock;
+
+import java.util.List;
+import java.util.Map;
+
+public class TaskLockResponse
+{
+  private final Map<String, List<TaskLock>> datasourceToLocks;
+
+  @JsonCreator
+  public TaskLockResponse(
+      @JsonProperty("datasourceToLocks") final Map<String, List<TaskLock>> 
datasourceToLocks
+  )
+  {
+    this.datasourceToLocks = datasourceToLocks;
+  }
+
+  @JsonProperty
+  public Map<String, List<TaskLock>> getDatasourceToLocks()
+  {
+    return datasourceToLocks;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "TaskLockResponse{" +
+           "datasourceToLocks='" + datasourceToLocks +
+           '}';
+  }
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
index a02b5108767..a8c4b5117b1 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
@@ -75,7 +75,6 @@ import org.junit.Rule;
 import org.junit.Test;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -1174,99 +1173,6 @@ public class TaskLockboxTest
     );
   }
 
-  @Test
-  public void testGetLockedIntervals()
-  {
-    // Acquire locks for task1
-    final Task task1 = NoopTask.forDatasource("ds1");
-    lockbox.add(task1);
-
-    tryTimeChunkLock(
-        TaskLockType.EXCLUSIVE,
-        task1,
-        Intervals.of("2017-01-01/2017-02-01")
-    );
-    tryTimeChunkLock(
-        TaskLockType.EXCLUSIVE,
-        task1,
-        Intervals.of("2017-04-01/2017-05-01")
-    );
-
-    // Acquire locks for task2
-    final Task task2 = NoopTask.forDatasource("ds2");
-    lockbox.add(task2);
-    tryTimeChunkLock(
-        TaskLockType.EXCLUSIVE,
-        task2,
-        Intervals.of("2017-03-01/2017-04-01")
-    );
-
-    // Verify the locked intervals
-    final Map<String, Integer> minTaskPriority = new HashMap<>();
-    minTaskPriority.put(task1.getDataSource(), 10);
-    minTaskPriority.put(task2.getDataSource(), 10);
-    final Map<String, List<Interval>> lockedIntervals = 
lockbox.getLockedIntervals(minTaskPriority);
-    Assert.assertEquals(2, lockedIntervals.size());
-
-    Assert.assertEquals(
-        Arrays.asList(
-            Intervals.of("2017-01-01/2017-02-01"),
-            Intervals.of("2017-04-01/2017-05-01")
-        ),
-        lockedIntervals.get(task1.getDataSource())
-    );
-
-    Assert.assertEquals(
-        Collections.singletonList(
-            Intervals.of("2017-03-01/2017-04-01")),
-        lockedIntervals.get(task2.getDataSource())
-    );
-  }
-
-  @Test
-  public void testGetLockedIntervalsForLowPriorityTask()
-  {
-    // Acquire lock for a low priority task
-    final Task lowPriorityTask = NoopTask.ofPriority(5);
-    lockbox.add(lowPriorityTask);
-    taskStorage.insert(lowPriorityTask, 
TaskStatus.running(lowPriorityTask.getId()));
-    tryTimeChunkLock(
-        TaskLockType.EXCLUSIVE,
-        lowPriorityTask,
-        Intervals.of("2017/2018")
-    );
-
-    final Map<String, Integer> minTaskPriority = new HashMap<>();
-    minTaskPriority.put(lowPriorityTask.getDataSource(), 10);
-
-    Map<String, List<Interval>> lockedIntervals = 
lockbox.getLockedIntervals(minTaskPriority);
-    Assert.assertTrue(lockedIntervals.isEmpty());
-  }
-
-  @Test
-  public void testGetLockedIntervalsForEqualPriorityTask()
-  {
-    // Acquire lock for a low priority task
-    final Task task = NoopTask.ofPriority(5);
-    lockbox.add(task);
-    taskStorage.insert(task, TaskStatus.running(task.getId()));
-    tryTimeChunkLock(
-        TaskLockType.EXCLUSIVE,
-        task,
-        Intervals.of("2017/2018")
-    );
-
-    final Map<String, Integer> minTaskPriority = new HashMap<>();
-    minTaskPriority.put(task.getDataSource(), 5);
-
-    Map<String, List<Interval>> lockedIntervals = 
lockbox.getLockedIntervals(minTaskPriority);
-    Assert.assertEquals(1, lockedIntervals.size());
-    Assert.assertEquals(
-        Collections.singletonList(Intervals.of("2017/2018")),
-        lockedIntervals.get(task.getDataSource())
-    );
-  }
-
   @Test
   public void testGetLockedIntervalsForHigherPriorityExclusiveLock()
   {
@@ -1282,6 +1188,7 @@ public class TaskLockboxTest
     LockFilterPolicy requestForExclusiveLowerPriorityLock = new 
LockFilterPolicy(
         task.getDataSource(),
         75,
+        null,
         null
     );
 
@@ -1305,6 +1212,7 @@ public class TaskLockboxTest
     LockFilterPolicy requestForExclusiveLowerPriorityLock = new 
LockFilterPolicy(
         task.getDataSource(),
         25,
+        null,
         null
     );
 
@@ -1332,6 +1240,7 @@ public class TaskLockboxTest
     LockFilterPolicy requestForReplaceLowerPriorityLock = new LockFilterPolicy(
         task.getDataSource(),
         25,
+        null,
         ImmutableMap.of(Tasks.TASK_LOCK_TYPE, TaskLockType.REPLACE.name())
     );
 
@@ -1355,6 +1264,7 @@ public class TaskLockboxTest
     LockFilterPolicy requestForReplaceLowerPriorityLock = new LockFilterPolicy(
         task.getDataSource(),
         25,
+        null,
         ImmutableMap.of(
             Tasks.TASK_LOCK_TYPE,
             TaskLockType.EXCLUSIVE.name(),
@@ -1369,6 +1279,171 @@ public class TaskLockboxTest
   }
 
 
+  @Test
+  public void testGetActiveLocks()
+  {
+    final Set<TaskLock> expectedLocks = new HashSet<>();
+    final TaskLock overlappingReplaceLock =
+        validator.expectLockCreated(TaskLockType.REPLACE, 
Intervals.of("2024/2025"), 50);
+    expectedLocks.add(overlappingReplaceLock);
+
+    //Lower priority
+    validator.expectLockCreated(TaskLockType.APPEND, 
Intervals.of("2024/2025"), 25);
+
+    final TaskLock overlappingAppendLock =
+        validator.expectLockCreated(TaskLockType.APPEND, 
Intervals.of("2024-01-01/2024-02-01"), 75);
+    expectedLocks.add(overlappingAppendLock);
+
+    // Non-overlapping interval
+    validator.expectLockCreated(TaskLockType.APPEND, 
Intervals.of("2024-12-01/2025-01-01"), 75);
+
+    final TaskLock overlappingExclusiveLock =
+        validator.expectLockCreated(TaskLockType.EXCLUSIVE, 
Intervals.of("2020/2021"), 50);
+    expectedLocks.add(overlappingExclusiveLock);
+
+    LockFilterPolicy policy = new LockFilterPolicy(
+        "none",
+        50,
+        ImmutableList.of(Intervals.of("2020/2021"), 
Intervals.of("2024-01-01/2024-07-01")),
+        null
+    );
+
+    LockFilterPolicy policyForNonExistentDatasource = new LockFilterPolicy(
+        "nonExistent",
+        0,
+        null,
+        null
+    );
+
+    Map<String, List<TaskLock>> activeLocks =
+        lockbox.getActiveLocks(ImmutableList.of(policy, 
policyForNonExistentDatasource));
+    Assert.assertEquals(1, activeLocks.size());
+    Assert.assertEquals(expectedLocks, new HashSet<>(activeLocks.get("none")));
+  }
+
+  @Test
+  public void testGetActiveLocksWithAppendLockIgnoresAppendLocks()
+  {
+    final Set<TaskLock> expectedLocks = new HashSet<>();
+    final TaskLock overlappingReplaceLock =
+        validator.expectLockCreated(TaskLockType.REPLACE, 
Intervals.of("2024/2025"), 50);
+    expectedLocks.add(overlappingReplaceLock);
+
+    //Lower priority
+    validator.expectLockCreated(TaskLockType.APPEND, 
Intervals.of("2024/2025"), 25);
+
+    validator.expectLockCreated(TaskLockType.APPEND, 
Intervals.of("2024-01-01/2024-02-01"), 75);
+
+    // Non-overlapping interval
+    validator.expectLockCreated(TaskLockType.APPEND, 
Intervals.of("2024-12-01/2025-01-01"), 75);
+
+    final TaskLock overlappingExclusiveLock =
+        validator.expectLockCreated(TaskLockType.EXCLUSIVE, 
Intervals.of("2020/2021"), 50);
+    expectedLocks.add(overlappingExclusiveLock);
+
+    LockFilterPolicy policy = new LockFilterPolicy(
+        "none",
+        50,
+        ImmutableList.of(Intervals.of("2020/2021"), 
Intervals.of("2024-01-01/2024-07-01")),
+        ImmutableMap.of(Tasks.TASK_LOCK_TYPE, TaskLockType.APPEND.name())
+    );
+
+    LockFilterPolicy policyForNonExistentDatasource = new LockFilterPolicy(
+        "nonExistent",
+        0,
+        null,
+        null
+    );
+
+    Map<String, List<TaskLock>> activeLocks =
+        lockbox.getActiveLocks(ImmutableList.of(policy, 
policyForNonExistentDatasource));
+    Assert.assertEquals(1, activeLocks.size());
+    Assert.assertEquals(expectedLocks, new HashSet<>(activeLocks.get("none")));
+  }
+
+  @Test
+  public void testGetActiveLocksWithConcurrentLocksIgnoresAppendLocks()
+  {
+    final Set<TaskLock> expectedLocks = new HashSet<>();
+    final TaskLock overlappingReplaceLock =
+        validator.expectLockCreated(TaskLockType.REPLACE, 
Intervals.of("2024/2025"), 50);
+    expectedLocks.add(overlappingReplaceLock);
+
+    //Lower priority
+    validator.expectLockCreated(TaskLockType.APPEND, 
Intervals.of("2024/2025"), 25);
+
+    validator.expectLockCreated(TaskLockType.APPEND, 
Intervals.of("2024-01-01/2024-02-01"), 75);
+
+    // Non-overlapping interval
+    validator.expectLockCreated(TaskLockType.APPEND, 
Intervals.of("2024-12-01/2025-01-01"), 75);
+
+    final TaskLock overlappingExclusiveLock =
+        validator.expectLockCreated(TaskLockType.EXCLUSIVE, 
Intervals.of("2020/2021"), 50);
+    expectedLocks.add(overlappingExclusiveLock);
+
+    LockFilterPolicy policy = new LockFilterPolicy(
+        "none",
+        50,
+        ImmutableList.of(Intervals.of("2020/2021"), 
Intervals.of("2024-01-01/2024-07-01")),
+        ImmutableMap.of(Tasks.USE_CONCURRENT_LOCKS, true, 
Tasks.TASK_LOCK_TYPE, TaskLockType.EXCLUSIVE.name())
+    );
+
+    LockFilterPolicy policyForNonExistentDatasource = new LockFilterPolicy(
+        "nonExistent",
+        0,
+        null,
+        null
+    );
+
+    Map<String, List<TaskLock>> activeLocks =
+        lockbox.getActiveLocks(ImmutableList.of(policy, 
policyForNonExistentDatasource));
+    Assert.assertEquals(1, activeLocks.size());
+    Assert.assertEquals(expectedLocks, new HashSet<>(activeLocks.get("none")));
+  }
+
+  @Test
+  public void testGetActiveLocksWithoutConcurrentLocksConsidersAppendLocks()
+  {
+    final Set<TaskLock> expectedLocks = new HashSet<>();
+    final TaskLock overlappingReplaceLock =
+        validator.expectLockCreated(TaskLockType.REPLACE, 
Intervals.of("2024/2025"), 50);
+
+    expectedLocks.add(overlappingReplaceLock);
+
+    //Lower priority
+    validator.expectLockCreated(TaskLockType.APPEND, 
Intervals.of("2024/2025"), 25);
+
+    final TaskLock overlappingAppendLock =
+        validator.expectLockCreated(TaskLockType.APPEND, 
Intervals.of("2024-01-01/2024-02-01"), 75);
+    expectedLocks.add(overlappingAppendLock);
+
+    // Non-overlapping interval
+    validator.expectLockCreated(TaskLockType.APPEND, 
Intervals.of("2024-12-01/2025-01-01"), 75);
+
+    final TaskLock overlappingExclusiveLock =
+        validator.expectLockCreated(TaskLockType.EXCLUSIVE, 
Intervals.of("2020/2021"), 50);
+    expectedLocks.add(overlappingExclusiveLock);
+
+    LockFilterPolicy policy = new LockFilterPolicy(
+        "none",
+        50,
+        ImmutableList.of(Intervals.of("2020/2021"), 
Intervals.of("2024-01-01/2024-07-01")),
+        ImmutableMap.of(Tasks.USE_CONCURRENT_LOCKS, false, 
Tasks.TASK_LOCK_TYPE, TaskLockType.APPEND.name())
+    );
+
+    LockFilterPolicy policyForNonExistentDatasource = new LockFilterPolicy(
+        "nonExistent",
+        0,
+        null,
+        null
+    );
+
+    Map<String, List<TaskLock>> activeLocks =
+        lockbox.getActiveLocks(ImmutableList.of(policy, 
policyForNonExistentDatasource));
+    Assert.assertEquals(1, activeLocks.size());
+    Assert.assertEquals(expectedLocks, new HashSet<>(activeLocks.get("none")));
+  }
+
   @Test
   public void testExclusiveLockCompatibility()
   {
@@ -1770,50 +1845,6 @@ public class TaskLockboxTest
     validator.expectLockNotGranted(TaskLockType.APPEND, otherGroupTask, 
Intervals.of("2024/2025"));
   }
 
-  @Test
-  public void testGetLockedIntervalsForRevokedLocks()
-  {
-    // Acquire lock for a low priority task
-    final Task lowPriorityTask = NoopTask.ofPriority(5);
-    lockbox.add(lowPriorityTask);
-    taskStorage.insert(lowPriorityTask, 
TaskStatus.running(lowPriorityTask.getId()));
-    tryTimeChunkLock(
-        TaskLockType.EXCLUSIVE,
-        lowPriorityTask,
-        Intervals.of("2017/2018")
-    );
-
-    final Map<String, Integer> minTaskPriority = new HashMap<>();
-    minTaskPriority.put(lowPriorityTask.getDataSource(), 1);
-
-    Map<String, List<Interval>> lockedIntervals = 
lockbox.getLockedIntervals(minTaskPriority);
-    Assert.assertEquals(1, lockedIntervals.size());
-    Assert.assertEquals(
-        Collections.singletonList(
-            Intervals.of("2017/2018")),
-        lockedIntervals.get(lowPriorityTask.getDataSource())
-    );
-
-    // Revoke the lowPriorityTask
-    final Task highPriorityTask = NoopTask.ofPriority(10);
-    lockbox.add(highPriorityTask);
-    tryTimeChunkLock(
-        TaskLockType.EXCLUSIVE,
-        highPriorityTask,
-        Intervals.of("2017-05-01/2017-06-01")
-    );
-
-    // Verify the locked intervals
-    minTaskPriority.put(highPriorityTask.getDataSource(), 1);
-    lockedIntervals = lockbox.getLockedIntervals(minTaskPriority);
-    Assert.assertEquals(1, lockedIntervals.size());
-    Assert.assertEquals(
-        Collections.singletonList(
-            Intervals.of("2017-05-01/2017-06-01")),
-        lockedIntervals.get(highPriorityTask.getDataSource())
-    );
-  }
-
   @Test
   public void testFailedToReacquireTaskLock()
   {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
index e6dee0c7e40..93f80ac9709 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
@@ -36,6 +36,9 @@ import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.indexing.common.TaskLock;
+import org.apache.druid.indexing.common.TaskLockType;
+import org.apache.druid.indexing.common.TimeChunkLock;
 import org.apache.druid.indexing.common.task.KillUnusedSegmentsTask;
 import org.apache.druid.indexing.common.task.NoopTask;
 import org.apache.druid.indexing.common.task.Task;
@@ -61,6 +64,7 @@ import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.RE;
 import org.apache.druid.java.util.common.UOE;
+import org.apache.druid.metadata.LockFilterPolicy;
 import org.apache.druid.metadata.TaskLookup;
 import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup;
 import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup;
@@ -1057,31 +1061,33 @@ public class OverlordResourceTest
   @Test
   public void testGetLockedIntervals() throws Exception
   {
-    final Map<String, Integer> minTaskPriority = 
Collections.singletonMap("ds1", 0);
-    final Map<String, List<Interval>> expectedLockedIntervals = 
Collections.singletonMap(
+    final List<LockFilterPolicy> lockFilterPolicies = ImmutableList.of(
+        new LockFilterPolicy("ds1", 25, null, null)
+    );
+    final Map<String, List<Interval>> expectedIntervals = 
Collections.singletonMap(
         "ds1",
         Arrays.asList(
             Intervals.of("2012-01-01/2012-01-02"),
-            Intervals.of("2012-01-02/2012-01-03")
+            Intervals.of("2012-01-01/2012-01-02")
         )
     );
 
-    EasyMock.expect(taskLockbox.getLockedIntervals(minTaskPriority))
-            .andReturn(expectedLockedIntervals);
+    EasyMock.expect(taskLockbox.getLockedIntervals(lockFilterPolicies))
+            .andReturn(expectedIntervals);
     replayAll();
 
-    final Response response = 
overlordResource.getDatasourceLockedIntervals(minTaskPriority);
+    final Response response = 
overlordResource.getDatasourceLockedIntervals(lockFilterPolicies);
     Assert.assertEquals(200, response.getStatus());
 
     final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
-    Map<String, List<Interval>> observedLockedIntervals = jsonMapper.readValue(
+    Map<String, List<Interval>> observedIntervals = jsonMapper.readValue(
         jsonMapper.writeValueAsString(response.getEntity()),
         new TypeReference<Map<String, List<Interval>>>()
         {
         }
     );
 
-    Assert.assertEquals(expectedLockedIntervals, observedLockedIntervals);
+    Assert.assertEquals(expectedIntervals, observedIntervals);
   }
 
   @Test
@@ -1092,7 +1098,65 @@ public class OverlordResourceTest
     Response response = overlordResource.getDatasourceLockedIntervals(null);
     Assert.assertEquals(400, response.getStatus());
 
-    response = overlordResource.getDatasourceLockedIntervals(Collections.emptyMap());
+    response = overlordResource.getDatasourceLockedIntervals(Collections.emptyList());
+    Assert.assertEquals(400, response.getStatus());
+  }
+
+  @Test
+  public void testGetActiveLocks() throws Exception
+  {
+    final List<LockFilterPolicy> lockFilterPolicies = ImmutableList.of(
+        new LockFilterPolicy("ds1", 25, null, null)
+    );
+    final Map<String, List<TaskLock>> expectedLocks = Collections.singletonMap(
+        "ds1",
+        Arrays.asList(
+            new TimeChunkLock(
+                TaskLockType.REPLACE,
+                "groupId",
+                "datasource",
+                Intervals.of("2012-01-01/2012-01-02"),
+                "version",
+                25
+            ),
+            new TimeChunkLock(
+                TaskLockType.EXCLUSIVE,
+                "groupId",
+                "datasource",
+                Intervals.of("2012-01-02/2012-01-03"),
+                "version",
+                75
+                )
+        )
+    );
+
+    EasyMock.expect(taskLockbox.getActiveLocks(lockFilterPolicies))
+            .andReturn(expectedLocks);
+    replayAll();
+
+    final Response response = overlordResource.getActiveLocks(lockFilterPolicies);
+    Assert.assertEquals(200, response.getStatus());
+
+    final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
+    Map<String, List<TaskLock>> observedLocks = jsonMapper.readValue(
+        jsonMapper.writeValueAsString(response.getEntity()),
+        new TypeReference<TaskLockResponse>()
+        {
+        }
+    ).getDatasourceToLocks();
+
+    Assert.assertEquals(expectedLocks, observedLocks);
+  }
+
+  @Test
+  public void testGetActiveLocksWithEmptyBody()
+  {
+    replayAll();
+
+    Response response = overlordResource.getActiveLocks(null);
+    Assert.assertEquals(400, response.getStatus());
+
+    response = overlordResource.getActiveLocks(Collections.emptyList());
     Assert.assertEquals(400, response.getStatus());
   }
 
diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITIndexerTest.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITIndexerTest.java
index 65b8dc0b1ac..84f2dff1d79 100644
--- a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITIndexerTest.java
+++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITIndexerTest.java
@@ -24,6 +24,7 @@ import com.google.inject.Inject;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.metadata.LockFilterPolicy;
 import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
 import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
 import org.apache.druid.testing.utils.ITRetryUtil;
@@ -343,12 +344,12 @@ public class ITIndexerTest extends AbstractITBatchIndexTest
       submitIndexTask(INDEX_TASK, datasourceName);
 
       // Wait until it acquires a lock
-      final Map<String, Integer> minTaskPriority = Collections.singletonMap(datasourceName, 0);
+      final List<LockFilterPolicy> lockFilterPolicies = Collections.singletonList(new LockFilterPolicy(datasourceName, 0, null, null));
       final Map<String, List<Interval>> lockedIntervals = new HashMap<>();
       ITRetryUtil.retryUntilFalse(
           () -> {
             lockedIntervals.clear();
-            lockedIntervals.putAll(indexer.getLockedIntervals(minTaskPriority));
+            lockedIntervals.putAll(indexer.getLockedIntervals(lockFilterPolicies));
             return lockedIntervals.isEmpty();
           },
           "Verify Intervals are Locked"
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
index 8167b9b64e1..f75dc6043f9 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
@@ -39,6 +39,7 @@ import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.java.util.http.client.Request;
 import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
 import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.metadata.LockFilterPolicy;
 import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
 import org.apache.druid.testing.IntegrationTestingConfig;
 import org.apache.druid.testing.guice.TestClient;
@@ -334,13 +335,13 @@ public class OverlordResourceTestClient
     }
   }
 
-  public Map<String, List<Interval>> getLockedIntervals(Map<String, Integer> minTaskPriority)
+  public Map<String, List<Interval>> getLockedIntervals(List<LockFilterPolicy> lockFilterPolicies)
   {
     try {
-      String jsonBody = jsonMapper.writeValueAsString(minTaskPriority);
+      String jsonBody = jsonMapper.writeValueAsString(lockFilterPolicies);
 
       StatusResponseHolder response = httpClient.go(
-          new Request(HttpMethod.POST, new URL(getIndexerURL() + "lockedIntervals"))
+          new Request(HttpMethod.POST, new URL(getIndexerURL() + "lockedIntervals/v2"))
               .setContent(
                   "application/json",
                   StringUtils.toUtf8(jsonBody)
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java
index 8d980d76f12..79d63cb4550 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java
@@ -19,11 +19,13 @@
 
 package org.apache.druid.tests.coordinator.duty;
 
+import com.google.common.collect.ImmutableList;
 import com.google.inject.Inject;
 import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.metadata.LockFilterPolicy;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
@@ -53,7 +55,6 @@ import org.testng.annotations.Test;
 import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -265,13 +266,13 @@ public class ITAutoCompactionLockContentionTest extends AbstractKafkaIndexingSer
    */
   private void ensureLockedIntervals(Interval... intervals)
   {
-    final Map<String, Integer> minTaskPriority = Collections.singletonMap(fullDatasourceName, 0);
+    final LockFilterPolicy lockFilterPolicy = new LockFilterPolicy(fullDatasourceName, 0, null, null);
     final List<Interval> lockedIntervals = new ArrayList<>();
     ITRetryUtil.retryUntilTrue(
         () -> {
           lockedIntervals.clear();
 
-          Map<String, List<Interval>> allIntervals = indexer.getLockedIntervals(minTaskPriority);
+          Map<String, List<Interval>> allIntervals = indexer.getLockedIntervals(ImmutableList.of(lockFilterPolicy));
           if (allIntervals.containsKey(fullDatasourceName)) {
             lockedIntervals.addAll(allIntervals.get(fullDatasourceName));
           }
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java
index f527135c80d..dfe308e2c1b 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java
@@ -24,6 +24,7 @@ import com.google.inject.Inject;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.metadata.LockFilterPolicy;
 import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
 import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
 import org.apache.druid.testing.guice.DruidTestModuleFactory;
@@ -342,12 +343,12 @@ public class ITIndexerTest extends AbstractITBatchIndexTest
       submitIndexTask(INDEX_TASK, datasourceName);
 
       // Wait until it acquires a lock
-      final Map<String, Integer> minTaskPriority = Collections.singletonMap(datasourceName, 0);
+      final List<LockFilterPolicy> lockFilterPolicies = Collections.singletonList(new LockFilterPolicy(datasourceName, 0, null, null));
       final Map<String, List<Interval>> lockedIntervals = new HashMap<>();
       ITRetryUtil.retryUntilFalse(
           () -> {
             lockedIntervals.clear();
-            lockedIntervals.putAll(indexer.getLockedIntervals(minTaskPriority));
+            lockedIntervals.putAll(indexer.getLockedIntervals(lockFilterPolicies));
             return lockedIntervals.isEmpty();
           },
           "Verify Intervals are Locked"
diff --git a/server/src/main/java/org/apache/druid/metadata/LockFilterPolicy.java b/server/src/main/java/org/apache/druid/metadata/LockFilterPolicy.java
index 88ab4673aa8..019fd22807c 100644
--- a/server/src/main/java/org/apache/druid/metadata/LockFilterPolicy.java
+++ b/server/src/main/java/org/apache/druid/metadata/LockFilterPolicy.java
@@ -21,10 +21,12 @@ package org.apache.druid.metadata;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.joda.time.Interval;
 
+import javax.annotation.Nullable;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 
 /**
  * Specifies a policy to filter active locks held by a datasource
@@ -33,17 +35,20 @@ public class LockFilterPolicy
 {
   private final String datasource;
   private final int priority;
+  private final List<Interval> intervals;
   private final Map<String, Object> context;
 
   @JsonCreator
   public LockFilterPolicy(
       @JsonProperty("datasource") String datasource,
       @JsonProperty("priority") int priority,
-      @JsonProperty("context") Map<String, Object> context
+      @JsonProperty("intervals") @Nullable List<Interval> intervals,
+      @JsonProperty("context") @Nullable Map<String, Object> context
   )
   {
     this.datasource = datasource;
     this.priority = priority;
+    this.intervals = intervals;
     this.context = context == null ? Collections.emptyMap() : context;
   }
 
@@ -65,24 +70,10 @@ public class LockFilterPolicy
     return context;
   }
 
-  @Override
-  public boolean equals(Object o)
-  {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    LockFilterPolicy that = (LockFilterPolicy) o;
-    return Objects.equals(datasource, that.datasource)
-           && priority == that.priority
-           && Objects.equals(context, that.context);
-  }
-
-  @Override
-  public int hashCode()
+  @Nullable
+  @JsonProperty
+  public List<Interval> getIntervals()
   {
-    return Objects.hash(datasource, priority, context);
+    return intervals;
   }
 }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index 01f3bc77e9e..d6e25fc9ac6 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -278,7 +278,8 @@ public class CompactSegments implements CoordinatorCustomDuty
   {
     final List<LockFilterPolicy> lockFilterPolicies = compactionConfigs
         .stream()
-        .map(config -> new LockFilterPolicy(config.getDataSource(), config.getTaskPriority(), config.getTaskContext()))
+        .map(config ->
+                 new LockFilterPolicy(config.getDataSource(), config.getTaskPriority(), null, config.getTaskContext()))
         .collect(Collectors.toList());
     final Map<String, List<Interval>> datasourceToLockedIntervals =
         new HashMap<>(FutureUtils.getUnchecked(overlordClient.findLockedIntervals(lockFilterPolicies), true));
diff --git a/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java b/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java
index 8c3b867e368..5f583746266 100644
--- a/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java
+++ b/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java
@@ -225,7 +225,7 @@ public class OverlordClientImplTest
     final Map<String, List<Interval>> lockMap =
         ImmutableMap.of("foo", Collections.singletonList(Intervals.of("2000/2001")));
     final List<LockFilterPolicy> requests = ImmutableList.of(
-        new LockFilterPolicy("foo", 3, null)
+        new LockFilterPolicy("foo", 3, null, null)
     );
 
     serviceClient.expectAndRespond(
@@ -246,7 +246,7 @@ public class OverlordClientImplTest
   public void test_findLockedIntervals_nullReturn() throws Exception
   {
     final List<LockFilterPolicy> requests = ImmutableList.of(
-        new LockFilterPolicy("foo", 3, null)
+        new LockFilterPolicy("foo", 3, null, null)
     );
 
     serviceClient.expectAndRespond(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


Reply via email to