This is an automated email from the ASF dual-hosted git repository. jonwei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push: new 564d6defd4 Worker level task metrics (#12446) 564d6defd4 is described below commit 564d6defd47749d55dd07e5549d7264cbc1c4019 Author: zachjsh <zach...@gmail.com> AuthorDate: Tue Apr 26 12:44:44 2022 -0400 Worker level task metrics (#12446) * * fix metric name inconsistency * * add task slot metrics for middle managers * * add new WorkerTaskCountStatsMonitor to report task count metrics from worker * * more stuff * * remove unused variable * * more stuff * * add javadocs * * fix checkstyle * * fix hadoop test failure * * cleanup * * add more code coverage in tests * * fix test failure * * add docs * * increase code coverage * * fix spelling * * fix failing tests * * remove dead code * * fix spelling --- docs/configuration/index.md | 2 + docs/operations/metrics.md | 5 + .../main/resources/defaultMetricDimensions.json | 8 +- .../druid/indexing/overlord/ForkingTaskRunner.java | 73 ++++++- .../indexing/overlord/ForkingTaskRunnerTest.java | 16 ++ .../metrics/WorkerTaskCountStatsMonitor.java | 79 ++++++++ .../metrics/WorkerTaskCountStatsProvider.java | 63 ++++++ .../metrics/WorkerTaskCountStatsMonitorTest.java | 216 +++++++++++++++++++++ .../org/apache/druid/cli/CliMiddleManager.java | 2 + website/.spelling | 1 + 10 files changed, 454 insertions(+), 11 deletions(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index f4c268ae9d..e598b36ea0 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -383,6 +383,8 @@ Metric monitoring is an essential part of Druid operations. The following monit |`org.apache.druid.server.metrics.QueryCountStatsMonitor`|Reports how many queries have been successful/failed/interrupted.| |`org.apache.druid.server.emitter.HttpEmittingMonitor`|Reports internal metrics of `http` or `parametrized` emitter (see below). Must not be used with another emitter type. 
See the description of the metrics here: https://github.com/apache/druid/pull/4973.| |`org.apache.druid.server.metrics.TaskCountStatsMonitor`|Reports how many ingestion tasks are currently running/pending/waiting and also the number of successful/failed tasks per emission period.| +|`org.apache.druid.server.metrics.TaskSlotCountStatsMonitor`|Reports metrics about task slot usage per emission period.| +|`org.apache.druid.server.metrics.WorkerTaskCountStatsMonitor`|Reports the number of successful/failed tasks and metrics about task slot usage for the reporting worker, per emission period. Only supported by middleManager node types.| For example, you might configure monitors on all processes for system and JVM information within `common.runtime.properties` as follows: diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index a5894ed005..8124acd1c9 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -214,6 +214,11 @@ Note: If the JVM does not support CPU time measurement for the current thread, i |`taskSlot/lazy/count`|Number of total task slots in lazy marked MiddleManagers and Indexers per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.| |`taskSlot/blacklisted/count`|Number of total task slots in blacklisted MiddleManagers and Indexers per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.| |`task/segmentAvailability/wait/time`|The amount of milliseconds a batch indexing task waited for newly created segments to become available for querying.|dataSource, taskType, taskId, segmentAvailabilityConfirmed|Varies.| +|`worker/task/failed/count`|Number of failed tasks run on the reporting worker per emission period. 
This metric is only available if the WorkerTaskCountStatsMonitor module is included, and is only supported for middleManager nodes.|category, version.|Varies.| +|`worker/task/success/count`|Number of successful tasks run on the reporting worker per emission period. This metric is only available if the WorkerTaskCountStatsMonitor module is included, and is only supported for middleManager nodes.|category, version.|Varies.| +|`worker/taskSlot/idle/count`|Number of idle task slots on the reporting worker per emission period. This metric is only available if the WorkerTaskCountStatsMonitor module is included, and is only supported for middleManager nodes.|category, version.|Varies.| +|`worker/taskSlot/total/count`|Number of total task slots on the reporting worker per emission period. This metric is only available if the WorkerTaskCountStatsMonitor module is included, and is only supported for middleManager nodes.|category, version.|Varies.| +|`worker/taskSlot/used/count`|Number of busy task slots on the reporting worker per emission period. 
This metric is only available if the WorkerTaskCountStatsMonitor module is included.|category, version.|Varies.| ## Shuffle metrics (Native parallel task) diff --git a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json index 6c0fa75d1a..a1f5a85479 100644 --- a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json +++ b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json @@ -63,9 +63,15 @@ "task/pending/count" : { "dimensions" : ["dataSource"], "type" : "gauge" }, "task/waiting/count" : { "dimensions" : ["dataSource"], "type" : "gauge" }, + "worker/task/failed/count" : { "dimensions" : ["category", "version"], "type" : "count" }, + "worker/task/success/count" : { "dimensions" : ["category", "version"], "type" : "count" }, + "worker/taskSlot/idle/count" : { "dimensions" : ["category", "version"], "type" : "gauge" }, + "worker/taskSlot/total/count" : { "dimensions" : ["category", "version"], "type" : "gauge" }, + "worker/taskSlot/used/count" : { "dimensions" : ["category", "version"], "type" : "gauge" }, + "taskSlot/total/count" : { "dimensions" : ["category"], "type" : "gauge" }, "taskSlot/idle/count" : { "dimensions" : ["category"], "type" : "gauge" }, - "taskSlot/busy/count" : { "dimensions" : ["category"], "type" : "gauge" }, + "taskSlot/used/count" : { "dimensions" : ["category"], "type" : "gauge" }, "taskSlot/lazy/count" : { "dimensions" : ["category"], "type" : "gauge" }, "taskSlot/blacklisted/count" : { "dimensions" : ["category"], "type" : "gauge" }, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java index c35e10cb25..77c515502d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java +++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java @@ -63,6 +63,7 @@ import org.apache.druid.query.DruidMetrics; import org.apache.druid.server.DruidNode; import org.apache.druid.server.log.StartupLoggingConfig; import org.apache.druid.server.metrics.MonitorsConfig; +import org.apache.druid.server.metrics.WorkerTaskCountStatsProvider; import org.apache.druid.tasklogs.TaskLogPusher; import org.apache.druid.tasklogs.TaskLogStreamer; import org.joda.time.DateTime; @@ -83,13 +84,14 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; /** * Runs tasks in separate processes using the "internal peon" verb. */ public class ForkingTaskRunner extends BaseRestorableTaskRunner<ForkingTaskRunner.ForkingTaskRunnerWorkItem> - implements TaskLogStreamer + implements TaskLogStreamer, WorkerTaskCountStatsProvider { private static final EmittingLogger LOGGER = new EmittingLogger(ForkingTaskRunner.class); private static final String CHILD_PROPERTY_PREFIX = "druid.indexer.fork.property."; @@ -104,6 +106,11 @@ public class ForkingTaskRunner private volatile boolean stopping = false; + private static final AtomicLong LAST_REPORTED_FAILED_TASK_COUNT = new AtomicLong(); + private static final AtomicLong FAILED_TASK_COUNT = new AtomicLong(); + private static final AtomicLong SUCCESSFUL_TASK_COUNT = new AtomicLong(); + private static final AtomicLong LAST_REPORTED_SUCCESSFUL_TASK_COUNT = new AtomicLong(); + @Inject public ForkingTaskRunner( ForkingTaskRunnerConfig config, @@ -399,7 +406,11 @@ public class ForkingTaskRunner ) ); } - + if (status.isSuccess()) { + SUCCESSFUL_TASK_COUNT.incrementAndGet(); + } else { + FAILED_TASK_COUNT.incrementAndGet(); + } TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status); return status; } @@ -690,18 +701,12 @@ public class ForkingTaskRunner @Override public Map<String, Long> 
getTotalTaskSlotCount() { - if (config.getPorts() != null && !config.getPorts().isEmpty()) { - return ImmutableMap.of(workerConfig.getCategory(), Long.valueOf(config.getPorts().size())); - } - return ImmutableMap.of(workerConfig.getCategory(), Long.valueOf(config.getEndPort() - config.getStartPort() + 1)); + return ImmutableMap.of(workerConfig.getCategory(), getTotalTaskSlotCountLong()); } public long getTotalTaskSlotCountLong() { - if (config.getPorts() != null && !config.getPorts().isEmpty()) { - return config.getPorts().size(); - } - return config.getEndPort() - config.getStartPort() + 1; + return workerConfig.getCapacity(); } @Override @@ -733,6 +738,54 @@ public class ForkingTaskRunner return ImmutableMap.of(workerConfig.getCategory(), 0L); } + @Override + public Long getWorkerFailedTaskCount() + { + long failedTaskCount = FAILED_TASK_COUNT.get(); + long lastReportedFailedTaskCount = LAST_REPORTED_FAILED_TASK_COUNT.get(); + LAST_REPORTED_FAILED_TASK_COUNT.set(failedTaskCount); + return failedTaskCount - lastReportedFailedTaskCount; + } + + @Override + public Long getWorkerIdleTaskSlotCount() + { + return Math.max(getTotalTaskSlotCountLong() - getUsedTaskSlotCountLong(), 0); + } + + @Override + public Long getWorkerUsedTaskSlotCount() + { + return (long) portFinder.findUsedPortCount(); + } + + @Override + public Long getWorkerTotalTaskSlotCount() + { + return getTotalTaskSlotCountLong(); + } + + @Override + public String getWorkerCategory() + { + return workerConfig.getCategory(); + } + + @Override + public String getWorkerVersion() + { + return workerConfig.getVersion(); + } + + @Override + public Long getWorkerSuccessfulTaskCount() + { + long successfulTaskCount = SUCCESSFUL_TASK_COUNT.get(); + long lastReportedSuccessfulTaskCount = LAST_REPORTED_SUCCESSFUL_TASK_COUNT.get(); + LAST_REPORTED_SUCCESSFUL_TASK_COUNT.set(successfulTaskCount); + return successfulTaskCount - lastReportedSuccessfulTaskCount; + } + protected static class ForkingTaskRunnerWorkItem 
extends TaskRunnerWorkItem { private final Task task; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java index ae5c49d4af..2ba8e9d59d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java @@ -231,6 +231,12 @@ public class ForkingTaskRunnerTest @Override int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder, File logFile, File reportsFile) { + WorkerConfig workerConfig = new WorkerConfig(); + Assert.assertEquals(1L, (long) this.getWorkerUsedTaskSlotCount()); + Assert.assertEquals(workerConfig.getCapacity(), (long) this.getWorkerTotalTaskSlotCount()); + Assert.assertEquals(workerConfig.getCapacity() - 1, (long) this.getWorkerIdleTaskSlotCount()); + Assert.assertEquals(workerConfig.getCategory(), this.getWorkerCategory()); + Assert.assertEquals(workerConfig.getVersion(), this.getWorkerVersion()); // Emulate task process failure return 1; } @@ -242,6 +248,8 @@ public class ForkingTaskRunnerTest "Task execution process exited unsuccessfully with code[1]. 
See middleManager logs for more details.", status.getErrorMsg() ); + Assert.assertEquals(1L, (long) forkingTaskRunner.getWorkerFailedTaskCount()); + Assert.assertEquals(0L, (long) forkingTaskRunner.getWorkerSuccessfulTaskCount()); } @Test @@ -294,6 +302,12 @@ public class ForkingTaskRunnerTest @Override int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder, File logFile, File reportsFile) { + WorkerConfig workerConfig = new WorkerConfig(); + Assert.assertEquals(1L, (long) this.getWorkerUsedTaskSlotCount()); + Assert.assertEquals(workerConfig.getCapacity(), (long) this.getWorkerTotalTaskSlotCount()); + Assert.assertEquals(workerConfig.getCapacity() - 1, (long) this.getWorkerIdleTaskSlotCount()); + Assert.assertEquals(workerConfig.getCategory(), this.getWorkerCategory()); + Assert.assertEquals(workerConfig.getVersion(), this.getWorkerVersion()); return 0; } }; @@ -301,6 +315,8 @@ public class ForkingTaskRunnerTest final TaskStatus status = forkingTaskRunner.run(task).get(); Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode()); Assert.assertNull(status.getErrorMsg()); + Assert.assertEquals(0L, (long) forkingTaskRunner.getWorkerFailedTaskCount()); + Assert.assertEquals(1L, (long) forkingTaskRunner.getWorkerSuccessfulTaskCount()); } @Test diff --git a/server/src/main/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitor.java b/server/src/main/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitor.java new file mode 100644 index 0000000000..cfd01610a1 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitor.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.metrics; + +import com.google.inject.Inject; +import com.google.inject.Injector; +import org.apache.druid.discovery.NodeRole; +import org.apache.druid.guice.annotations.Self; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.java.util.metrics.AbstractMonitor; + +import java.util.Set; + +public class WorkerTaskCountStatsMonitor extends AbstractMonitor +{ + private final WorkerTaskCountStatsProvider statsProvider; + private final String workerCategory; + private final String workerVersion; + private final boolean isMiddleManager; + + @Inject + public WorkerTaskCountStatsMonitor( + Injector injector, + @Self Set<NodeRole> nodeRoles + ) + { + this.isMiddleManager = nodeRoles.contains(NodeRole.MIDDLE_MANAGER); + if (isMiddleManager) { + this.statsProvider = injector.getInstance(WorkerTaskCountStatsProvider.class); + this.workerCategory = statsProvider.getWorkerCategory(); + this.workerVersion = statsProvider.getWorkerVersion(); + } else { + this.statsProvider = null; + this.workerCategory = null; + this.workerVersion = null; + } + } + + @Override + public boolean doMonitor(ServiceEmitter emitter) + { + if (isMiddleManager) { + emit(emitter, "worker/task/failed/count", statsProvider.getWorkerFailedTaskCount()); + emit(emitter, 
"worker/task/success/count", statsProvider.getWorkerSuccessfulTaskCount()); + emit(emitter, "worker/taskSlot/idle/count", statsProvider.getWorkerIdleTaskSlotCount()); + emit(emitter, "worker/taskSlot/total/count", statsProvider.getWorkerTotalTaskSlotCount()); + emit(emitter, "worker/taskSlot/used/count", statsProvider.getWorkerUsedTaskSlotCount()); + } + return true; + } + + private void emit(ServiceEmitter emitter, String metricName, Long value) + { + if (value != null) { + final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder(); + builder.setDimension("category", workerCategory); + builder.setDimension("version", workerVersion); + emitter.emit(builder.build(metricName, value)); + } + } +} diff --git a/server/src/main/java/org/apache/druid/server/metrics/WorkerTaskCountStatsProvider.java b/server/src/main/java/org/apache/druid/server/metrics/WorkerTaskCountStatsProvider.java new file mode 100644 index 0000000000..3ca9df5624 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/metrics/WorkerTaskCountStatsProvider.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.server.metrics; + +/** + * Provides task / task count status at the level of individual worker nodes. These metrics + * are reported by workers, like middle-managers. + */ +public interface WorkerTaskCountStatsProvider +{ + /** + * The number of failed tasks run on worker during emission period. + */ + Long getWorkerFailedTaskCount(); + + /** + * The number of successful tasks run on worker during emission period. + */ + Long getWorkerSuccessfulTaskCount(); + + /** + * The number of idle task slots on worker. + */ + Long getWorkerIdleTaskSlotCount(); + + /** + * The number of total task slots on worker. + */ + Long getWorkerTotalTaskSlotCount(); + + /** + * The number of used task slots on worker. + */ + Long getWorkerUsedTaskSlotCount(); + + + /** + * The worker category. + */ + String getWorkerCategory(); + + /** + * The worker version. + */ + String getWorkerVersion(); +} diff --git a/server/src/test/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitorTest.java new file mode 100644 index 0000000000..17702c0fad --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitorTest.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.metrics; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Module; +import org.apache.druid.discovery.NodeRole; +import org.apache.druid.java.util.metrics.StubServiceEmitter; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import javax.annotation.Nullable; + +public class WorkerTaskCountStatsMonitorTest +{ + private Injector injectorForMiddleManager; + private Injector injectorForMiddleManagerNullStats; + private Injector injectorForPeon; + + private WorkerTaskCountStatsProvider statsProvider; + private WorkerTaskCountStatsProvider nullStatsProvider; + + @Before + public void setUp() + { + statsProvider = new WorkerTaskCountStatsProvider() + { + @Override + public Long getWorkerTotalTaskSlotCount() + { + return 5L; + } + + @Override + public Long getWorkerFailedTaskCount() + { + return 4L; + } + + @Override + public Long getWorkerIdleTaskSlotCount() + { + return 3L; + } + + @Override + public Long getWorkerSuccessfulTaskCount() + { + return 2L; + } + + @Override + public Long getWorkerUsedTaskSlotCount() + { + return 1L; + } + + @Override + public String getWorkerCategory() + { + return "workerCategory"; + } + + @Override + public String getWorkerVersion() + { + return "workerVersion"; + } + }; + + nullStatsProvider = new WorkerTaskCountStatsProvider() + { + @Nullable + @Override + public Long getWorkerTotalTaskSlotCount() + { + return 
null; + } + + @Nullable + @Override + public Long getWorkerFailedTaskCount() + { + return null; + } + + @Nullable + @Override + public Long getWorkerIdleTaskSlotCount() + { + return null; + } + + @Nullable + @Override + public Long getWorkerSuccessfulTaskCount() + { + return null; + } + + @Nullable + @Override + public Long getWorkerUsedTaskSlotCount() + { + return null; + } + + @Nullable + @Override + public String getWorkerCategory() + { + return null; + } + + @Nullable + @Override + public String getWorkerVersion() + { + return null; + } + }; + + injectorForMiddleManager = Guice.createInjector( + ImmutableList.of( + (Module) binder -> { + binder.bind(WorkerTaskCountStatsProvider.class).toInstance(statsProvider); + } + ) + ); + + injectorForMiddleManagerNullStats = Guice.createInjector( + ImmutableList.of( + (Module) binder -> { + binder.bind(WorkerTaskCountStatsProvider.class).toInstance(nullStatsProvider); + } + ) + ); + + injectorForPeon = Guice.createInjector( + ImmutableList.of( + (Module) binder -> {} + ) + ); + } + + @Test + public void testMonitor() + { + final WorkerTaskCountStatsMonitor monitor = + new WorkerTaskCountStatsMonitor(injectorForMiddleManager, ImmutableSet.of(NodeRole.MIDDLE_MANAGER)); + final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); + monitor.doMonitor(emitter); + Assert.assertEquals(5, emitter.getEvents().size()); + Assert.assertEquals("worker/task/failed/count", emitter.getEvents().get(0).toMap().get("metric")); + Assert.assertEquals("workerCategory", emitter.getEvents().get(0).toMap().get("category")); + Assert.assertEquals("workerVersion", emitter.getEvents().get(0).toMap().get("version")); + Assert.assertEquals(4L, emitter.getEvents().get(0).toMap().get("value")); + Assert.assertEquals("worker/task/success/count", emitter.getEvents().get(1).toMap().get("metric")); + Assert.assertEquals("workerCategory", emitter.getEvents().get(1).toMap().get("category")); + Assert.assertEquals("workerVersion", 
emitter.getEvents().get(1).toMap().get("version")); + Assert.assertEquals(2L, emitter.getEvents().get(1).toMap().get("value")); + Assert.assertEquals("worker/taskSlot/idle/count", emitter.getEvents().get(2).toMap().get("metric")); + Assert.assertEquals("workerCategory", emitter.getEvents().get(2).toMap().get("category")); + Assert.assertEquals("workerVersion", emitter.getEvents().get(2).toMap().get("version")); + Assert.assertEquals(3L, emitter.getEvents().get(2).toMap().get("value")); + Assert.assertEquals("worker/taskSlot/total/count", emitter.getEvents().get(3).toMap().get("metric")); + Assert.assertEquals("workerCategory", emitter.getEvents().get(3).toMap().get("category")); + Assert.assertEquals("workerVersion", emitter.getEvents().get(3).toMap().get("version")); + Assert.assertEquals(5L, emitter.getEvents().get(3).toMap().get("value")); + Assert.assertEquals("worker/taskSlot/used/count", emitter.getEvents().get(4).toMap().get("metric")); + Assert.assertEquals("workerCategory", emitter.getEvents().get(4).toMap().get("category")); + Assert.assertEquals("workerVersion", emitter.getEvents().get(4).toMap().get("version")); + Assert.assertEquals(1L, emitter.getEvents().get(4).toMap().get("value")); + } + + @Test + public void testMonitorWithNulls() + { + final WorkerTaskCountStatsMonitor monitor = + new WorkerTaskCountStatsMonitor(injectorForMiddleManagerNullStats, ImmutableSet.of(NodeRole.MIDDLE_MANAGER)); + final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); + monitor.doMonitor(emitter); + Assert.assertEquals(0, emitter.getEvents().size()); + } + + @Test + public void testMonitorNotMiddleManager() + { + final WorkerTaskCountStatsMonitor monitor = + new WorkerTaskCountStatsMonitor(injectorForPeon, ImmutableSet.of(NodeRole.PEON)); + final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); + monitor.doMonitor(emitter); + Assert.assertEquals(0, emitter.getEvents().size()); + } +} diff --git 
a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java index a005e3a48d..ed424ba037 100644 --- a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java +++ b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java @@ -78,6 +78,7 @@ import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider; import org.apache.druid.server.DruidNode; import org.apache.druid.server.http.SelfDiscoveryResource; import org.apache.druid.server.initialization.jetty.JettyServerInitializer; +import org.apache.druid.server.metrics.WorkerTaskCountStatsProvider; import org.apache.druid.timeline.PruneLastCompactionState; import org.eclipse.jetty.server.Server; @@ -137,6 +138,7 @@ public class CliMiddleManager extends ServerRunnable binder.bind(TaskRunner.class).to(ForkingTaskRunner.class); binder.bind(ForkingTaskRunner.class).in(LazySingleton.class); + binder.bind(WorkerTaskCountStatsProvider.class).to(ForkingTaskRunner.class); binder.bind(IndexingServiceClient.class).to(HttpIndexingServiceClient.class).in(LazySingleton.class); binder.bind(new TypeLiteral<IndexTaskClientFactory<ParallelIndexSupervisorTaskClient>>() {}) diff --git a/website/.spelling b/website/.spelling index e83d1b8598..89ad85dd32 100644 --- a/website/.spelling +++ b/website/.spelling @@ -1428,6 +1428,7 @@ Sys SysMonitor TaskCountStatsMonitor TaskSlotCountStatsMonitor +WorkerTaskCountStatsMonitor bufferCapacity bufferpoolName cms --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org