This is an automated email from the ASF dual-hosted git repository. kfaraz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push: new 8e55eb5c098 Add taskType dimension to task count metrics (#18032) 8e55eb5c098 is described below commit 8e55eb5c098080a08e55e62714fa81834e71ff5d Author: jtuglu-netflix <jtu...@netflix.com> AuthorDate: Mon May 26 00:31:28 2025 -0700 Add taskType dimension to task count metrics (#18032) Changes: - Update methods in `TaskCountStatsProvider` to return map from `RowKey` to count - Include dimension taskType for all task count metrics (pending, running, waiting, successful, failed) --- .../apache/druid/indexing/overlord/TaskMaster.java | 11 ++-- .../apache/druid/indexing/overlord/TaskQueue.java | 60 +++++++++++++--------- .../druid/server/coordinator/stats/Dimension.java | 1 + .../server/metrics/TaskCountStatsMonitor.java | 6 +-- .../server/metrics/TaskCountStatsProvider.java | 21 ++++---- .../server/metrics/TaskCountStatsMonitorTest.java | 47 +++++++++++------ 6 files changed, 87 insertions(+), 59 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java index 105f908869a..db17c0ed866 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java @@ -27,6 +27,7 @@ import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.autoscaling.ScalingStats; import org.apache.druid.indexing.overlord.supervisor.SupervisorManager; import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; +import org.apache.druid.server.coordinator.stats.RowKey; import org.apache.druid.server.metrics.TaskCountStatsProvider; import org.apache.druid.server.metrics.TaskSlotCountStatsProvider; @@ -155,7 +156,7 @@ public class TaskMaster implements TaskCountStatsProvider, TaskSlotCountStatsPro } @Override - public Map<String, Long> getSuccessfulTaskCount() + public Map<RowKey, Long> getSuccessfulTaskCount() { Optional<TaskQueue> taskQueue = getTaskQueue(); if (taskQueue.isPresent()) { @@ -166,7 +167,7 @@ public class TaskMaster implements TaskCountStatsProvider, TaskSlotCountStatsPro } @Override - public Map<String, Long> getFailedTaskCount() + public Map<RowKey, Long> getFailedTaskCount() { Optional<TaskQueue> taskQueue = getTaskQueue(); if (taskQueue.isPresent()) { @@ -177,7 +178,7 @@ public class TaskMaster implements TaskCountStatsProvider, TaskSlotCountStatsPro } @Override - public Map<String, Long> getRunningTaskCount() + public Map<RowKey, Long> getRunningTaskCount() { Optional<TaskQueue> taskQueue = getTaskQueue(); if (taskQueue.isPresent()) { @@ -188,7 +189,7 @@ public class TaskMaster implements TaskCountStatsProvider, TaskSlotCountStatsPro } @Override - public Map<String, Long> getPendingTaskCount() + public Map<RowKey, Long> getPendingTaskCount() { Optional<TaskQueue> taskQueue = getTaskQueue(); if (taskQueue.isPresent()) { @@ -199,7 +200,7 @@ public class TaskMaster implements TaskCountStatsProvider, TaskSlotCountStatsPro } @Override - public Map<String, Long> getWaitingTaskCount() + public Map<RowKey, Long> getWaitingTaskCount() { Optional<TaskQueue> taskQueue = getTaskQueue(); if (taskQueue.isPresent()) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index 50eb61d2866..051817416ac 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -64,6 +64,8 @@ import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.metadata.PasswordProvider; import org.apache.druid.metadata.PasswordProviderRedactionMixIn; import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; +import org.apache.druid.server.coordinator.stats.Dimension; +import org.apache.druid.server.coordinator.stats.RowKey; import org.apache.druid.utils.CollectionUtils; import org.joda.time.DateTime; @@ -153,12 +155,12 @@ public class TaskQueue private static final EmittingLogger log = new EmittingLogger(TaskQueue.class); - private final ConcurrentHashMap<String, AtomicLong> totalSuccessfulTaskCount = new ConcurrentHashMap<>(); - private final ConcurrentHashMap<String, AtomicLong> totalFailedTaskCount = new ConcurrentHashMap<>(); + private final ConcurrentHashMap<RowKey, AtomicLong> totalSuccessfulTaskCount = new ConcurrentHashMap<>(); + private final ConcurrentHashMap<RowKey, AtomicLong> totalFailedTaskCount = new ConcurrentHashMap<>(); @GuardedBy("totalSuccessfulTaskCount") - private Map<String, Long> prevTotalSuccessfulTaskCount = new HashMap<>(); + private Map<RowKey, Long> prevTotalSuccessfulTaskCount = new HashMap<>(); @GuardedBy("totalFailedTaskCount") - private Map<String, Long> prevTotalFailedTaskCount = new HashMap<>(); + private Map<RowKey, Long> prevTotalFailedTaskCount = new HashMap<>(); private final AtomicInteger statusUpdatesInQueue = new AtomicInteger(); private final AtomicInteger handledStatusUpdates = new AtomicInteger(); @@ -807,9 +809,9 @@ public class TaskQueue ); if (status.isSuccess()) { - Counters.incrementAndGetLong(totalSuccessfulTaskCount, task.getDataSource()); + Counters.incrementAndGetLong(totalSuccessfulTaskCount, getMetricKey(task)); } else { - Counters.incrementAndGetLong(totalFailedTaskCount, task.getDataSource()); + Counters.incrementAndGetLong(totalFailedTaskCount, getMetricKey(task)); } } } @@ -892,9 +894,9 @@ public class TaskQueue return rv; } - private Map<String, Long> getDeltaValues(Map<String, Long> total, Map<String, Long> prev) + private Map<RowKey, Long> getDeltaValues(Map<RowKey, Long> total, Map<RowKey, Long> prev) { - final Map<String, Long> deltaValues = new HashMap<>(); + final Map<RowKey, Long> deltaValues = new HashMap<>(); total.forEach( (dataSource, totalCount) -> deltaValues.put( dataSource, @@ -904,58 +906,59 @@ public class TaskQueue return deltaValues; } - public Map<String, Long> getSuccessfulTaskCount() + public Map<RowKey, Long> getSuccessfulTaskCount() { - Map<String, Long> total = CollectionUtils.mapValues(totalSuccessfulTaskCount, AtomicLong::get); + final Map<RowKey, Long> total = CollectionUtils.mapValues(totalSuccessfulTaskCount, AtomicLong::get); synchronized (totalSuccessfulTaskCount) { - Map<String, Long> delta = getDeltaValues(total, prevTotalSuccessfulTaskCount); + Map<RowKey, Long> delta = getDeltaValues(total, prevTotalSuccessfulTaskCount); prevTotalSuccessfulTaskCount = total; return delta; } } - public Map<String, Long> getFailedTaskCount() + public Map<RowKey, Long> getFailedTaskCount() { - Map<String, Long> total = CollectionUtils.mapValues(totalFailedTaskCount, AtomicLong::get); + final Map<RowKey, Long> total = CollectionUtils.mapValues(totalFailedTaskCount, AtomicLong::get); synchronized (totalFailedTaskCount) { - Map<String, Long> delta = getDeltaValues(total, prevTotalFailedTaskCount); + Map<RowKey, Long> delta = getDeltaValues(total, prevTotalFailedTaskCount); prevTotalFailedTaskCount = total; return delta; } } - private Map<String, String> getCurrentTaskDatasources() + private Map<String, RowKey> getCurrentTaskDatasources() { return activeTasks.values().stream() + .filter(entry -> !entry.isComplete) .map(entry -> entry.task) - .collect(Collectors.toMap(Task::getId, Task::getDataSource)); + .collect(Collectors.toMap(Task::getId, TaskQueue::getMetricKey)); } - public Map<String, Long> getRunningTaskCount() + public Map<RowKey, Long> getRunningTaskCount() { - Map<String, String> taskDatasources = getCurrentTaskDatasources(); + final Map<String, RowKey> taskDatasources = getCurrentTaskDatasources(); return taskRunner.getRunningTasks() .stream() .collect(Collectors.toMap( - e -> taskDatasources.getOrDefault(e.getTaskId(), ""), + e -> taskDatasources.getOrDefault(e.getTaskId(), RowKey.empty()), e -> 1L, Long::sum )); } - public Map<String, Long> getPendingTaskCount() + public Map<RowKey, Long> getPendingTaskCount() { - Map<String, String> taskDatasources = getCurrentTaskDatasources(); + final Map<String, RowKey> taskDatasources = getCurrentTaskDatasources(); return taskRunner.getPendingTasks() .stream() .collect(Collectors.toMap( - e -> taskDatasources.getOrDefault(e.getTaskId(), ""), + e -> taskDatasources.getOrDefault(e.getTaskId(), RowKey.empty()), e -> 1L, Long::sum )); } - public Map<String, Long> getWaitingTaskCount() + public Map<RowKey, Long> getWaitingTaskCount() { Set<String> runnerKnownTaskIds = taskRunner.getKnownTasks() .stream() @@ -966,7 +969,7 @@ public class TaskQueue .filter(entry -> !entry.isComplete) .map(entry -> entry.task) .filter(task -> !runnerKnownTaskIds.contains(task.getId())) - .collect(Collectors.toMap(Task::getDataSource, task -> 1L, Long::sum)); + .collect(Collectors.toMap(TaskQueue::getMetricKey, task -> 1L, Long::sum)); } /** @@ -1136,4 +1139,13 @@ public class TaskQueue this.lastUpdatedTime = DateTimes.nowUtc(); } } + + private static RowKey getMetricKey(final Task task) + { + if (task == null) { + return RowKey.empty(); + } + return RowKey.with(Dimension.DATASOURCE, task.getDataSource()) + .and(Dimension.TASK_TYPE, task.getType()); + } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/stats/Dimension.java b/server/src/main/java/org/apache/druid/server/coordinator/stats/Dimension.java index 61bb6054f92..7001f7fa455 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/stats/Dimension.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/stats/Dimension.java @@ -25,6 +25,7 @@ package org.apache.druid.server.coordinator.stats; public enum Dimension { TIER("tier"), + TASK_TYPE("taskType"), DATASOURCE("dataSource"), DUTY("duty"), DUTY_GROUP("dutyGroup"), diff --git a/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsMonitor.java b/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsMonitor.java index 3e120e0ffd4..aca9fec3e4e 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsMonitor.java +++ b/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsMonitor.java @@ -26,6 +26,7 @@ import org.apache.druid.java.util.metrics.AbstractMonitor; import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; import org.apache.druid.server.coordinator.stats.CoordinatorStat; import org.apache.druid.server.coordinator.stats.Dimension; +import org.apache.druid.server.coordinator.stats.RowKey; import java.util.Map; @@ -61,12 +62,12 @@ public class TaskCountStatsMonitor extends AbstractMonitor return true; } - private void emit(ServiceEmitter emitter, String key, Map<String, Long> counts) + private void emit(ServiceEmitter emitter, String key, Map<RowKey, Long> counts) { final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder(); if (counts != null) { counts.forEach((k, v) -> { - builder.setDimension("dataSource", k); + k.getValues().forEach((dim, value) -> builder.setDimension(dim.reportedName(), value)); emitter.emit(builder.setMetric(key, v)); }); } @@ -84,5 +85,4 @@ public class TaskCountStatsMonitor extends AbstractMonitor ); emitter.emit(eventBuilder.setMetric(stat.getMetricName(), value)); } - } diff --git a/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsProvider.java b/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsProvider.java index 1b96047fbdd..f94deb562bf 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsProvider.java +++ b/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsProvider.java @@ -20,40 +20,41 @@ package org.apache.druid.server.metrics; import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; +import org.apache.druid.server.coordinator.stats.RowKey; import java.util.Map; public interface TaskCountStatsProvider { /** - * Return the number of successful tasks for each datasource during emission period. + * Return the number of successful tasks for each datasource and task type during emission period. */ @Deprecated - Map<String, Long> getSuccessfulTaskCount(); + Map<RowKey, Long> getSuccessfulTaskCount(); /** - * Return the number of failed tasks for each datasource during emission period. + * Return the number of failed tasks for each datasource and task type during emission period. */ @Deprecated - Map<String, Long> getFailedTaskCount(); + Map<RowKey, Long> getFailedTaskCount(); /** - * Return the number of current running tasks for each datasource. + * Return the number of current running tasks for each datasource and task type. */ @Deprecated - Map<String, Long> getRunningTaskCount(); + Map<RowKey, Long> getRunningTaskCount(); /** - * Return the number of current pending tasks for each datasource. + * Return the number of current pending tasks for each datasource and task type. */ @Deprecated - Map<String, Long> getPendingTaskCount(); + Map<RowKey, Long> getPendingTaskCount(); /** - * Return the number of current waiting tasks for each datasource. + * Return the number of current waiting tasks for each datasource and task type. */ @Deprecated - Map<String, Long> getWaitingTaskCount(); + Map<RowKey, Long> getWaitingTaskCount(); /** * Collects all task level stats. This method deprecates the other task stats diff --git a/server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java index 46479484f87..93687bc1be8 100644 --- a/server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java +++ b/server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java @@ -23,6 +23,8 @@ import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; import org.apache.druid.server.coordinator.stats.CoordinatorStat; +import org.apache.druid.server.coordinator.stats.Dimension; +import org.apache.druid.server.coordinator.stats.RowKey; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -32,6 +34,12 @@ import java.util.Map; public class TaskCountStatsMonitorTest { private TaskCountStatsProvider statsProvider; + private static final RowKey TASK_METRIC_KEY1 = RowKey.with(Dimension.DATASOURCE, "d1") + .with(Dimension.TASK_TYPE, "index") + .build(); + private static final RowKey TASK_METRIC_KEY2 = RowKey.with(Dimension.DATASOURCE, "d1") + .with(Dimension.TASK_TYPE, "kill") + .build(); @Before public void setUp() @@ -39,33 +47,33 @@ public class TaskCountStatsMonitorTest statsProvider = new TaskCountStatsProvider() { @Override - public Map<String, Long> getSuccessfulTaskCount() + public Map<RowKey, Long> getSuccessfulTaskCount() { - return ImmutableMap.of("d1", 1L); + return ImmutableMap.of(TASK_METRIC_KEY1, 1L); } @Override - public Map<String, Long> getFailedTaskCount() + public Map<RowKey, Long> getFailedTaskCount() { - return ImmutableMap.of("d1", 1L); + return ImmutableMap.of(TASK_METRIC_KEY1, 1L, TASK_METRIC_KEY2, 1L); } @Override - public Map<String, Long> getRunningTaskCount() + public Map<RowKey, Long> getRunningTaskCount() { - return ImmutableMap.of("d1", 1L); + return ImmutableMap.of(TASK_METRIC_KEY1, 1L); } @Override - public Map<String, Long> getPendingTaskCount() + public Map<RowKey, Long> getPendingTaskCount() { - return ImmutableMap.of("d1", 1L); + return ImmutableMap.of(TASK_METRIC_KEY1, 2L); } @Override - public Map<String, Long> getWaitingTaskCount() + public Map<RowKey, Long> getWaitingTaskCount() { - return ImmutableMap.of("d1", 1L); + return ImmutableMap.of(TASK_METRIC_KEY1, 2L, TASK_METRIC_KEY2, 1L); } @Override @@ -85,14 +93,19 @@ public class TaskCountStatsMonitorTest final TaskCountStatsMonitor monitor = new TaskCountStatsMonitor(statsProvider); final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); monitor.doMonitor(emitter); - Assert.assertEquals(7, emitter.getEvents().size()); - emitter.verifyValue("task/success/count", 1L); - emitter.verifyValue("task/failed/count", 1L); - emitter.verifyValue("task/running/count", 1L); - emitter.verifyValue("task/pending/count", 1L); - emitter.verifyValue("task/waiting/count", 1L); + + Assert.assertEquals(9, emitter.getEvents().size()); + + emitter.verifyValue("task/success/count", Map.of("dataSource", "d1", "taskType", "index"), 1L); + emitter.verifyValue("task/failed/count", Map.of("dataSource", "d1", "taskType", "index"), 1L); + emitter.verifyValue("task/failed/count", Map.of("dataSource", "d1", "taskType", "kill"), 1L); + emitter.verifyValue("task/running/count", Map.of("dataSource", "d1", "taskType", "index"), 1L); + emitter.verifyValue("task/pending/count", Map.of("dataSource", "d1", "taskType", "index"), 2L); + emitter.verifyValue("task/waiting/count", Map.of("dataSource", "d1", "taskType", "index"), 2L); + emitter.verifyValue("task/waiting/count", Map.of("dataSource", "d1", "taskType", "kill"), 1L); + emitter.verifyValue(Stat.INFO_1.getMetricName(), 10L); - emitter.verifyValue(Stat.DEBUG_1.getMetricName(), ImmutableMap.of("tier", "hot", "dataSource", "wiki"), 20L); + emitter.verifyValue(Stat.DEBUG_1.getMetricName(), Map.of("tier", "hot", "dataSource", "wiki"), 20L); } private static class Stat --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org