This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e55eb5c098 Add taskType dimension to task count metrics (#18032)
8e55eb5c098 is described below

commit 8e55eb5c098080a08e55e62714fa81834e71ff5d
Author: jtuglu-netflix <jtu...@netflix.com>
AuthorDate: Mon May 26 00:31:28 2025 -0700

    Add taskType dimension to task count metrics (#18032)
    
    Changes:
    - Update methods in `TaskCountStatsProvider` to return a map from `RowKey` to count
    - Include the `taskType` dimension in all task count metrics
      (pending, running, waiting, successful, failed)
---
 .../apache/druid/indexing/overlord/TaskMaster.java | 11 ++--
 .../apache/druid/indexing/overlord/TaskQueue.java  | 60 +++++++++++++---------
 .../druid/server/coordinator/stats/Dimension.java  |  1 +
 .../server/metrics/TaskCountStatsMonitor.java      |  6 +--
 .../server/metrics/TaskCountStatsProvider.java     | 21 ++++----
 .../server/metrics/TaskCountStatsMonitorTest.java  | 47 +++++++++++------
 6 files changed, 87 insertions(+), 59 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
index 105f908869a..db17c0ed866 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
@@ -27,6 +27,7 @@ import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
 import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.RowKey;
 import org.apache.druid.server.metrics.TaskCountStatsProvider;
 import org.apache.druid.server.metrics.TaskSlotCountStatsProvider;
 
@@ -155,7 +156,7 @@ public class TaskMaster implements TaskCountStatsProvider, 
TaskSlotCountStatsPro
   }
 
   @Override
-  public Map<String, Long> getSuccessfulTaskCount()
+  public Map<RowKey, Long> getSuccessfulTaskCount()
   {
     Optional<TaskQueue> taskQueue = getTaskQueue();
     if (taskQueue.isPresent()) {
@@ -166,7 +167,7 @@ public class TaskMaster implements TaskCountStatsProvider, 
TaskSlotCountStatsPro
   }
 
   @Override
-  public Map<String, Long> getFailedTaskCount()
+  public Map<RowKey, Long> getFailedTaskCount()
   {
     Optional<TaskQueue> taskQueue = getTaskQueue();
     if (taskQueue.isPresent()) {
@@ -177,7 +178,7 @@ public class TaskMaster implements TaskCountStatsProvider, 
TaskSlotCountStatsPro
   }
 
   @Override
-  public Map<String, Long> getRunningTaskCount()
+  public Map<RowKey, Long> getRunningTaskCount()
   {
     Optional<TaskQueue> taskQueue = getTaskQueue();
     if (taskQueue.isPresent()) {
@@ -188,7 +189,7 @@ public class TaskMaster implements TaskCountStatsProvider, 
TaskSlotCountStatsPro
   }
 
   @Override
-  public Map<String, Long> getPendingTaskCount()
+  public Map<RowKey, Long> getPendingTaskCount()
   {
     Optional<TaskQueue> taskQueue = getTaskQueue();
     if (taskQueue.isPresent()) {
@@ -199,7 +200,7 @@ public class TaskMaster implements TaskCountStatsProvider, 
TaskSlotCountStatsPro
   }
 
   @Override
-  public Map<String, Long> getWaitingTaskCount()
+  public Map<RowKey, Long> getWaitingTaskCount()
   {
     Optional<TaskQueue> taskQueue = getTaskQueue();
     if (taskQueue.isPresent()) {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
index 50eb61d2866..051817416ac 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
@@ -64,6 +64,8 @@ import 
org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.metadata.PasswordProvider;
 import org.apache.druid.metadata.PasswordProviderRedactionMixIn;
 import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.Dimension;
+import org.apache.druid.server.coordinator.stats.RowKey;
 import org.apache.druid.utils.CollectionUtils;
 import org.joda.time.DateTime;
 
@@ -153,12 +155,12 @@ public class TaskQueue
 
   private static final EmittingLogger log = new 
EmittingLogger(TaskQueue.class);
 
-  private final ConcurrentHashMap<String, AtomicLong> totalSuccessfulTaskCount 
= new ConcurrentHashMap<>();
-  private final ConcurrentHashMap<String, AtomicLong> totalFailedTaskCount = 
new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<RowKey, AtomicLong> totalSuccessfulTaskCount 
= new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<RowKey, AtomicLong> totalFailedTaskCount = 
new ConcurrentHashMap<>();
   @GuardedBy("totalSuccessfulTaskCount")
-  private Map<String, Long> prevTotalSuccessfulTaskCount = new HashMap<>();
+  private Map<RowKey, Long> prevTotalSuccessfulTaskCount = new HashMap<>();
   @GuardedBy("totalFailedTaskCount")
-  private Map<String, Long> prevTotalFailedTaskCount = new HashMap<>();
+  private Map<RowKey, Long> prevTotalFailedTaskCount = new HashMap<>();
 
   private final AtomicInteger statusUpdatesInQueue = new AtomicInteger();
   private final AtomicInteger handledStatusUpdates = new AtomicInteger();
@@ -807,9 +809,9 @@ public class TaskQueue
                 );
 
                 if (status.isSuccess()) {
-                  Counters.incrementAndGetLong(totalSuccessfulTaskCount, 
task.getDataSource());
+                  Counters.incrementAndGetLong(totalSuccessfulTaskCount, 
getMetricKey(task));
                 } else {
-                  Counters.incrementAndGetLong(totalFailedTaskCount, 
task.getDataSource());
+                  Counters.incrementAndGetLong(totalFailedTaskCount, 
getMetricKey(task));
                 }
               }
             }
@@ -892,9 +894,9 @@ public class TaskQueue
     return rv;
   }
 
-  private Map<String, Long> getDeltaValues(Map<String, Long> total, 
Map<String, Long> prev)
+  private Map<RowKey, Long> getDeltaValues(Map<RowKey, Long> total, 
Map<RowKey, Long> prev)
   {
-    final Map<String, Long> deltaValues = new HashMap<>();
+    final Map<RowKey, Long> deltaValues = new HashMap<>();
     total.forEach(
         (dataSource, totalCount) -> deltaValues.put(
             dataSource,
@@ -904,58 +906,59 @@ public class TaskQueue
     return deltaValues;
   }
 
-  public Map<String, Long> getSuccessfulTaskCount()
+  public Map<RowKey, Long> getSuccessfulTaskCount()
   {
-    Map<String, Long> total = 
CollectionUtils.mapValues(totalSuccessfulTaskCount, AtomicLong::get);
+    final Map<RowKey, Long> total = 
CollectionUtils.mapValues(totalSuccessfulTaskCount, AtomicLong::get);
     synchronized (totalSuccessfulTaskCount) {
-      Map<String, Long> delta = getDeltaValues(total, 
prevTotalSuccessfulTaskCount);
+      Map<RowKey, Long> delta = getDeltaValues(total, 
prevTotalSuccessfulTaskCount);
       prevTotalSuccessfulTaskCount = total;
       return delta;
     }
   }
 
-  public Map<String, Long> getFailedTaskCount()
+  public Map<RowKey, Long> getFailedTaskCount()
   {
-    Map<String, Long> total = CollectionUtils.mapValues(totalFailedTaskCount, 
AtomicLong::get);
+    final Map<RowKey, Long> total = 
CollectionUtils.mapValues(totalFailedTaskCount, AtomicLong::get);
     synchronized (totalFailedTaskCount) {
-      Map<String, Long> delta = getDeltaValues(total, 
prevTotalFailedTaskCount);
+      Map<RowKey, Long> delta = getDeltaValues(total, 
prevTotalFailedTaskCount);
       prevTotalFailedTaskCount = total;
       return delta;
     }
   }
 
-  private Map<String, String> getCurrentTaskDatasources()
+  private Map<String, RowKey> getCurrentTaskDatasources()
   {
     return activeTasks.values().stream()
+                      .filter(entry -> !entry.isComplete)
                       .map(entry -> entry.task)
-                      .collect(Collectors.toMap(Task::getId, 
Task::getDataSource));
+                      .collect(Collectors.toMap(Task::getId, 
TaskQueue::getMetricKey));
   }
 
-  public Map<String, Long> getRunningTaskCount()
+  public Map<RowKey, Long> getRunningTaskCount()
   {
-    Map<String, String> taskDatasources = getCurrentTaskDatasources();
+    final Map<String, RowKey> taskDatasources = getCurrentTaskDatasources();
     return taskRunner.getRunningTasks()
                      .stream()
                      .collect(Collectors.toMap(
-                         e -> taskDatasources.getOrDefault(e.getTaskId(), ""),
+                         e -> taskDatasources.getOrDefault(e.getTaskId(), 
RowKey.empty()),
                          e -> 1L,
                          Long::sum
                      ));
   }
 
-  public Map<String, Long> getPendingTaskCount()
+  public Map<RowKey, Long> getPendingTaskCount()
   {
-    Map<String, String> taskDatasources = getCurrentTaskDatasources();
+    final Map<String, RowKey> taskDatasources = getCurrentTaskDatasources();
     return taskRunner.getPendingTasks()
                      .stream()
                      .collect(Collectors.toMap(
-                         e -> taskDatasources.getOrDefault(e.getTaskId(), ""),
+                         e -> taskDatasources.getOrDefault(e.getTaskId(), 
RowKey.empty()),
                          e -> 1L,
                          Long::sum
                      ));
   }
 
-  public Map<String, Long> getWaitingTaskCount()
+  public Map<RowKey, Long> getWaitingTaskCount()
   {
     Set<String> runnerKnownTaskIds = taskRunner.getKnownTasks()
                                                .stream()
@@ -966,7 +969,7 @@ public class TaskQueue
                       .filter(entry -> !entry.isComplete)
                       .map(entry -> entry.task)
                       .filter(task -> 
!runnerKnownTaskIds.contains(task.getId()))
-                      .collect(Collectors.toMap(Task::getDataSource, task -> 
1L, Long::sum));
+                      .collect(Collectors.toMap(TaskQueue::getMetricKey, task 
-> 1L, Long::sum));
   }
 
   /**
@@ -1136,4 +1139,13 @@ public class TaskQueue
       this.lastUpdatedTime = DateTimes.nowUtc();
     }
   }
+
+  private static RowKey getMetricKey(final Task task)
+  {
+    if (task == null) {
+      return RowKey.empty();
+    }
+    return RowKey.with(Dimension.DATASOURCE, task.getDataSource())
+                 .and(Dimension.TASK_TYPE, task.getType());
+  }
 }
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/stats/Dimension.java 
b/server/src/main/java/org/apache/druid/server/coordinator/stats/Dimension.java
index 61bb6054f92..7001f7fa455 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/stats/Dimension.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/stats/Dimension.java
@@ -25,6 +25,7 @@ package org.apache.druid.server.coordinator.stats;
 public enum Dimension
 {
   TIER("tier"),
+  TASK_TYPE("taskType"),
   DATASOURCE("dataSource"),
   DUTY("duty"),
   DUTY_GROUP("dutyGroup"),
diff --git 
a/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsMonitor.java
 
b/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsMonitor.java
index 3e120e0ffd4..aca9fec3e4e 100644
--- 
a/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsMonitor.java
+++ 
b/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsMonitor.java
@@ -26,6 +26,7 @@ import org.apache.druid.java.util.metrics.AbstractMonitor;
 import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
 import org.apache.druid.server.coordinator.stats.CoordinatorStat;
 import org.apache.druid.server.coordinator.stats.Dimension;
+import org.apache.druid.server.coordinator.stats.RowKey;
 
 import java.util.Map;
 
@@ -61,12 +62,12 @@ public class TaskCountStatsMonitor extends AbstractMonitor
     return true;
   }
 
-  private void emit(ServiceEmitter emitter, String key, Map<String, Long> 
counts)
+  private void emit(ServiceEmitter emitter, String key, Map<RowKey, Long> 
counts)
   {
     final ServiceMetricEvent.Builder builder = new 
ServiceMetricEvent.Builder();
     if (counts != null) {
       counts.forEach((k, v) -> {
-        builder.setDimension("dataSource", k);
+        k.getValues().forEach((dim, value) -> 
builder.setDimension(dim.reportedName(), value));
         emitter.emit(builder.setMetric(key, v));
       });
     }
@@ -84,5 +85,4 @@ public class TaskCountStatsMonitor extends AbstractMonitor
     );
     emitter.emit(eventBuilder.setMetric(stat.getMetricName(), value));
   }
-
 }
diff --git 
a/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsProvider.java
 
b/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsProvider.java
index 1b96047fbdd..f94deb562bf 100644
--- 
a/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsProvider.java
+++ 
b/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsProvider.java
@@ -20,40 +20,41 @@
 package org.apache.druid.server.metrics;
 
 import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.RowKey;
 
 import java.util.Map;
 
 public interface TaskCountStatsProvider
 {
   /**
-   * Return the number of successful tasks for each datasource during emission 
period.
+   * Return the number of successful tasks for each datasource and task type 
during emission period.
    */
   @Deprecated
-  Map<String, Long> getSuccessfulTaskCount();
+  Map<RowKey, Long> getSuccessfulTaskCount();
 
   /**
-   * Return the number of failed tasks for each datasource during emission 
period.
+   * Return the number of failed tasks for each datasource and task type 
during emission period.
    */
   @Deprecated
-  Map<String, Long> getFailedTaskCount();
+  Map<RowKey, Long> getFailedTaskCount();
 
   /**
-   * Return the number of current running tasks for each datasource.
+   * Return the number of current running tasks for each datasource and task 
type.
    */
   @Deprecated
-  Map<String, Long> getRunningTaskCount();
+  Map<RowKey, Long> getRunningTaskCount();
 
   /**
-   * Return the number of current pending tasks for each datasource.
+   * Return the number of current pending tasks for each datasource and task 
type.
    */
   @Deprecated
-  Map<String, Long> getPendingTaskCount();
+  Map<RowKey, Long> getPendingTaskCount();
 
   /**
-   * Return the number of current waiting tasks for each datasource.
+   * Return the number of current waiting tasks for each datasource and task 
type.
    */
   @Deprecated
-  Map<String, Long> getWaitingTaskCount();
+  Map<RowKey, Long> getWaitingTaskCount();
 
   /**
    * Collects all task level stats. This method deprecates the other task stats
diff --git 
a/server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java
 
b/server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java
index 46479484f87..93687bc1be8 100644
--- 
a/server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java
@@ -23,6 +23,8 @@ import com.google.common.collect.ImmutableMap;
 import org.apache.druid.java.util.metrics.StubServiceEmitter;
 import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
 import org.apache.druid.server.coordinator.stats.CoordinatorStat;
+import org.apache.druid.server.coordinator.stats.Dimension;
+import org.apache.druid.server.coordinator.stats.RowKey;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -32,6 +34,12 @@ import java.util.Map;
 public class TaskCountStatsMonitorTest
 {
   private TaskCountStatsProvider statsProvider;
+  private static final RowKey TASK_METRIC_KEY1 = 
RowKey.with(Dimension.DATASOURCE, "d1")
+                                                       
.with(Dimension.TASK_TYPE, "index")
+                                                       .build();
+  private static final RowKey TASK_METRIC_KEY2 = 
RowKey.with(Dimension.DATASOURCE, "d1")
+                                                       
.with(Dimension.TASK_TYPE, "kill")
+                                                       .build();
 
   @Before
   public void setUp()
@@ -39,33 +47,33 @@ public class TaskCountStatsMonitorTest
     statsProvider = new TaskCountStatsProvider()
     {
       @Override
-      public Map<String, Long> getSuccessfulTaskCount()
+      public Map<RowKey, Long> getSuccessfulTaskCount()
       {
-        return ImmutableMap.of("d1", 1L);
+        return ImmutableMap.of(TASK_METRIC_KEY1, 1L);
       }
 
       @Override
-      public Map<String, Long> getFailedTaskCount()
+      public Map<RowKey, Long> getFailedTaskCount()
       {
-        return ImmutableMap.of("d1", 1L);
+        return ImmutableMap.of(TASK_METRIC_KEY1, 1L, TASK_METRIC_KEY2, 1L);
       }
 
       @Override
-      public Map<String, Long> getRunningTaskCount()
+      public Map<RowKey, Long> getRunningTaskCount()
       {
-        return ImmutableMap.of("d1", 1L);
+        return ImmutableMap.of(TASK_METRIC_KEY1, 1L);
       }
 
       @Override
-      public Map<String, Long> getPendingTaskCount()
+      public Map<RowKey, Long> getPendingTaskCount()
       {
-        return ImmutableMap.of("d1", 1L);
+        return ImmutableMap.of(TASK_METRIC_KEY1, 2L);
       }
 
       @Override
-      public Map<String, Long> getWaitingTaskCount()
+      public Map<RowKey, Long> getWaitingTaskCount()
       {
-        return ImmutableMap.of("d1", 1L);
+        return ImmutableMap.of(TASK_METRIC_KEY1, 2L, TASK_METRIC_KEY2, 1L);
       }
 
       @Override
@@ -85,14 +93,19 @@ public class TaskCountStatsMonitorTest
     final TaskCountStatsMonitor monitor = new 
TaskCountStatsMonitor(statsProvider);
     final StubServiceEmitter emitter = new StubServiceEmitter("service", 
"host");
     monitor.doMonitor(emitter);
-    Assert.assertEquals(7, emitter.getEvents().size());
-    emitter.verifyValue("task/success/count", 1L);
-    emitter.verifyValue("task/failed/count", 1L);
-    emitter.verifyValue("task/running/count", 1L);
-    emitter.verifyValue("task/pending/count", 1L);
-    emitter.verifyValue("task/waiting/count", 1L);
+
+    Assert.assertEquals(9, emitter.getEvents().size());
+
+    emitter.verifyValue("task/success/count", Map.of("dataSource", "d1", 
"taskType", "index"), 1L);
+    emitter.verifyValue("task/failed/count", Map.of("dataSource", "d1", 
"taskType", "index"), 1L);
+    emitter.verifyValue("task/failed/count", Map.of("dataSource", "d1", 
"taskType", "kill"), 1L);
+    emitter.verifyValue("task/running/count", Map.of("dataSource", "d1", 
"taskType", "index"), 1L);
+    emitter.verifyValue("task/pending/count", Map.of("dataSource", "d1", 
"taskType", "index"), 2L);
+    emitter.verifyValue("task/waiting/count", Map.of("dataSource", "d1", 
"taskType", "index"), 2L);
+    emitter.verifyValue("task/waiting/count", Map.of("dataSource", "d1", 
"taskType", "kill"), 1L);
+
     emitter.verifyValue(Stat.INFO_1.getMetricName(), 10L);
-    emitter.verifyValue(Stat.DEBUG_1.getMetricName(), ImmutableMap.of("tier", 
"hot", "dataSource", "wiki"), 20L);
+    emitter.verifyValue(Stat.DEBUG_1.getMetricName(), Map.of("tier", "hot", 
"dataSource", "wiki"), 20L);
   }
 
   private static class Stat


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to