This is an automated email from the ASF dual-hosted git repository. himanshug pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push: new 2560bf0 Add new coordinator metrics for coordinator duty runtimes (#10603) 2560bf0 is described below commit 2560bf0a1919c36b824bd0e4f9286e2899deddd3 Author: Lucas Capistrant <capistr...@users.noreply.github.com> AuthorDate: Sun Nov 29 16:47:35 2020 -0600 Add new coordinator metrics for coordinator duty runtimes (#10603) * Add new coordinator metrics for duty runtimes * fix spelling for a constant variable value * add comment clarifying why the global runtime metric is emitted where it is * Remove duty alias in lieu of using the class name for metrics * fix docs * CoordinatorStats tests + add duty stats to accumulate() logic --- docs/operations/metrics.md | 2 + .../java/org/apache/druid/query/DruidMetrics.java | 3 ++ .../druid/server/coordinator/CoordinatorStats.java | 51 ++++++++++++++++++ .../druid/server/coordinator/DruidCoordinator.java | 28 ++++++++-- .../duty/EmitClusterStatsAndMetrics.java | 32 +++++++++++ .../server/coordinator/CoordinatorStatsTest.java | 63 ++++++++++++++++++++++ .../server/coordinator/DruidCoordinatorTest.java | 2 +- 7 files changed, 176 insertions(+), 5 deletions(-) diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index 62c0521..68b7f88 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -254,6 +254,8 @@ These metrics are for the Druid Coordinator and are reset each time the Coordina |`segment/skipCompact/bytes`|Total bytes of this datasource that are skipped (not eligible for auto compaction) by the auto compaction.|datasource.|Varies.| |`segment/skipCompact/count`|Total number of segments of this datasource that are skipped (not eligible for auto compaction) by the auto compaction.|datasource.|Varies.| |`interval/skipCompact/count`|Total number of intervals of this datasource that are skipped (not eligible for auto compaction) by the auto compaction.|datasource.|Varies.| +|`coordinator/time`|Approximate Coordinator duty 
runtime in milliseconds. The `duty` dimension is the fully qualified class name of the Duty that is being run.|duty.|Varies.| +|`coordinator/global/time`|Approximate runtime of a full coordination cycle in milliseconds. The `dutyGroup` dimension indicates what type of coordination this run was, e.g. Historical Management vs Indexing.|`dutyGroup`|Varies.| If `emitBalancingStats` is set to `true` in the Coordinator [dynamic configuration]( ../configuration/index.html#dynamic-configuration), then [log entries](../configuration/logging.md) for class diff --git a/processing/src/main/java/org/apache/druid/query/DruidMetrics.java b/processing/src/main/java/org/apache/druid/query/DruidMetrics.java index b1e49af..1e31c5f 100644 --- a/processing/src/main/java/org/apache/druid/query/DruidMetrics.java +++ b/processing/src/main/java/org/apache/druid/query/DruidMetrics.java @@ -42,6 +42,9 @@ public class DruidMetrics public static final String SERVER = "server"; public static final String TIER = "tier"; + public static final String DUTY = "duty"; + public static final String DUTY_GROUP = "dutyGroup"; + public static int findNumComplexAggs(List<AggregatorFactory> aggs) { int retVal = 0; diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorStats.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorStats.java index fa10f20..357cf45 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorStats.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorStats.java @@ -35,12 +35,14 @@ public class CoordinatorStats { private final Map<String, Object2LongOpenHashMap<String>> perTierStats; private final Map<String, Object2LongOpenHashMap<String>> perDataSourceStats; + private final Map<String, Object2LongOpenHashMap<String>> perDutyStats; private final Object2LongOpenHashMap<String> globalStats; public CoordinatorStats() { perTierStats = new HashMap<>(); perDataSourceStats = new HashMap<>(); + perDutyStats = new
HashMap<>(); globalStats = new Object2LongOpenHashMap<>(); } @@ -54,6 +56,11 @@ public class CoordinatorStats return !perDataSourceStats.isEmpty(); } + public boolean hasPerDutyStats() + { + return !perDutyStats.isEmpty(); + } + public Set<String> getTiers(final String statName) { final Object2LongOpenHashMap<String> theStat = perTierStats.get(statName); @@ -72,6 +79,15 @@ public class CoordinatorStats return Collections.unmodifiableSet(stat.keySet()); } + public Set<String> getDuties(String statName) + { + final Object2LongOpenHashMap<String> stat = perDutyStats.get(statName); + if (stat == null) { + return Collections.emptySet(); + } + return Collections.unmodifiableSet(stat.keySet()); + } + /** * * @param statName the name of the statistics @@ -109,6 +125,21 @@ public class CoordinatorStats } } + public long getDutyStat(String statName, String duty) + { + return perDutyStats.get(statName).getLong(duty); + } + + public void forEachDutyStat(String statName, ObjLongConsumer<String> consumer) + { + final Object2LongOpenHashMap<String> stat = perDutyStats.get(statName); + if (stat != null) { + for (Entry<String> entry : stat.object2LongEntrySet()) { + consumer.accept(entry.getKey(), entry.getLongValue()); + } + } + } + public long getGlobalStat(final String statName) { return globalStats.getLong(statName); @@ -132,6 +163,12 @@ public class CoordinatorStats .addTo(dataSource, value); } + public void addToDutyStat(String statName, String duty, long value) + { + perDutyStats.computeIfAbsent(statName, k -> new Object2LongOpenHashMap<>()) + .addTo(duty, value); + } + public void addToGlobalStat(final String statName, final long value) { globalStats.addTo(statName, value); @@ -166,6 +203,20 @@ public class CoordinatorStats } ); + stats.perDutyStats.forEach( + (statName, urStat) -> { + final Object2LongOpenHashMap<String> myStat = perDutyStats.computeIfAbsent( + statName, + k -> new Object2LongOpenHashMap<>() + ); + + for (Entry<String> entry : urStat.object2LongEntrySet()) 
{ + myStat.addTo(entry.getKey(), entry.getLongValue()); + } + } + + ); + for (final Object2LongMap.Entry<String> entry : stats.globalStats.object2LongEntrySet()) { globalStats.addTo(entry.getKey(), entry.getLongValue()); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index 678f440..2b6e8c2 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -60,8 +60,10 @@ import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.metadata.MetadataRuleManager; import org.apache.druid.metadata.SegmentsMetadataManager; +import org.apache.druid.query.DruidMetrics; import org.apache.druid.server.DruidNode; import org.apache.druid.server.coordinator.duty.BalanceSegments; import org.apache.druid.server.coordinator.duty.CompactSegments; @@ -93,6 +95,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; /** @@ -158,6 +161,10 @@ public class DruidCoordinator private int cachedBalancerThreadNumber; private ListeningExecutorService balancerExec; + private static final String HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP = "HistoricalManagementDuties"; + private static final String INDEXING_SERVICE_DUTIES_DUTY_GROUP = "IndexingServiceDuties"; + private static final String COMPACT_SEGMENTS_DUTIES_DUTY_GROUP = "CompactSegmentsDuties"; + @Inject public 
DruidCoordinator( DruidCoordinatorConfig config, @@ -573,7 +580,7 @@ public class DruidCoordinator public void runCompactSegmentsDuty() { final int startingLeaderCounter = coordLeaderSelector.localTerm(); - DutiesRunnable compactSegmentsDuty = new DutiesRunnable(makeCompactSegmentsDuty(), startingLeaderCounter); + DutiesRunnable compactSegmentsDuty = new DutiesRunnable(makeCompactSegmentsDuty(), startingLeaderCounter, COMPACT_SEGMENTS_DUTIES_DUTY_GROUP); compactSegmentsDuty.run(); } @@ -598,14 +605,14 @@ public class DruidCoordinator final List<Pair<? extends DutiesRunnable, Duration>> dutiesRunnables = new ArrayList<>(); dutiesRunnables.add( Pair.of( - new DutiesRunnable(makeHistoricalManagementDuties(), startingLeaderCounter), + new DutiesRunnable(makeHistoricalManagementDuties(), startingLeaderCounter, HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP), config.getCoordinatorPeriod() ) ); if (indexingServiceClient != null) { dutiesRunnables.add( Pair.of( - new DutiesRunnable(makeIndexingServiceDuties(), startingLeaderCounter), + new DutiesRunnable(makeIndexingServiceDuties(), startingLeaderCounter, INDEXING_SERVICE_DUTIES_DUTY_GROUP), config.getCoordinatorIndexingPeriod() ) ); @@ -706,11 +713,13 @@ public class DruidCoordinator private final long startTimeNanos = System.nanoTime(); private final List<CoordinatorDuty> duties; private final int startingLeaderCounter; + private final String dutiesRunnableAlias; - protected DutiesRunnable(List<CoordinatorDuty> duties, final int startingLeaderCounter) + protected DutiesRunnable(List<CoordinatorDuty> duties, final int startingLeaderCounter, String alias) { this.duties = duties; this.startingLeaderCounter = startingLeaderCounter; + this.dutiesRunnableAlias = alias; } @VisibleForTesting @@ -747,6 +756,7 @@ public class DruidCoordinator public void run() { try { + final long globalStart = System.nanoTime(); synchronized (lock) { if (!coordLeaderSelector.isLeader()) { log.info("LEGGO MY EGGO. 
[%s] is leader.", coordLeaderSelector.getCurrentLeader()); @@ -801,14 +811,24 @@ public class DruidCoordinator && coordLeaderSelector.isLeader() && startingLeaderCounter == coordLeaderSelector.localTerm()) { + final long start = System.nanoTime(); params = duty.run(params); + final long end = System.nanoTime(); if (params == null) { // This duty wanted to cancel the run. No log message, since the duty should have logged a reason. return; + } else { + params.getCoordinatorStats().addToDutyStat("runtime", duty.getClass().getName(), TimeUnit.NANOSECONDS.toMillis(end - start)); } } } + // Emit the runtime of the full DutiesRunnable + params.getEmitter().emit( + new ServiceMetricEvent.Builder() + .setDimension(DruidMetrics.DUTY_GROUP, dutiesRunnableAlias) + .build("coordinator/global/time", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - globalStart)) + ); } catch (Exception e) { log.makeAlert(e, "Caught exception, ignoring so that schedule keeps going.").emit(); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java index 70bfb04..1f6cab3 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java @@ -97,6 +97,35 @@ public class EmitClusterStatsAndMetrics implements CoordinatorDuty ); } + private void emitDutyStat( + final ServiceEmitter emitter, + final String metricName, + final String duty, + final long value + ) + { + emitter.emit( + new ServiceMetricEvent.Builder() + .setDimension(DruidMetrics.DUTY, duty) + .build(metricName, value) + ); + } + + private void emitDutyStats( + final ServiceEmitter emitter, + final String metricName, + final CoordinatorStats stats, + final String statName + ) + { + stats.forEachDutyStat( + statName, + (final String duty, final long count) -> 
{ + emitDutyStat(emitter, metricName, duty, count); + } + ); + } + @Override public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) { @@ -435,6 +464,9 @@ public class EmitClusterStatsAndMetrics implements CoordinatorDuty } ); + // Emit coordinator runtime stats + emitDutyStats(emitter, "coordinator/time", stats, "runtime"); + return params; } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorStatsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorStatsTest.java index 11fcfd1..00dc094 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorStatsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorStatsTest.java @@ -116,12 +116,18 @@ public class CoordinatorStatsTest stats.addToTieredStat("stat1", "tier1", 1); stats.addToTieredStat("stat1", "tier2", 1); stats.addToTieredStat("stat2", "tier1", 1); + stats.addToDutyStat("stat1", "duty1", 1); + stats.addToDutyStat("stat1", "duty2", 1); + stats.addToDutyStat("stat2", "duty1", 1); final CoordinatorStats stats2 = new CoordinatorStats(); stats2.addToGlobalStat("stat1", 1); stats2.addToTieredStat("stat1", "tier2", 1); stats2.addToTieredStat("stat2", "tier2", 1); stats2.addToTieredStat("stat3", "tier1", 1); + stats2.addToDutyStat("stat1", "duty2", 1); + stats2.addToDutyStat("stat2", "duty2", 1); + stats2.addToDutyStat("stat3", "duty1", 1); stats.accumulate(stats2); @@ -132,6 +138,11 @@ public class CoordinatorStatsTest Assert.assertEquals(1, stats.getTieredStat("stat2", "tier1")); Assert.assertEquals(1, stats.getTieredStat("stat2", "tier2")); Assert.assertEquals(1, stats.getTieredStat("stat3", "tier1")); + Assert.assertEquals(1, stats.getDutyStat("stat1", "duty1")); + Assert.assertEquals(2, stats.getDutyStat("stat1", "duty2")); + Assert.assertEquals(1, stats.getDutyStat("stat2", "duty1")); + Assert.assertEquals(1, stats.getDutyStat("stat2", "duty2")); + Assert.assertEquals(1, 
stats.getDutyStat("stat3", "duty1")); } @Test @@ -167,4 +178,56 @@ public class CoordinatorStatsTest Assert.assertEquals(10, stats.getTieredStat("stat1", "tier2")); } + + @Test(expected = NullPointerException.class) + public void testGetNonexistentDutyStat() + { + stats.getDutyStat("stat", "duty"); + } + + @Test + public void testAddToDutyStat() + { + Assert.assertFalse(stats.hasPerDutyStats()); + stats.addToDutyStat("stat1", "duty1", 1); + stats.addToDutyStat("stat1", "duty2", 1); + stats.addToDutyStat("stat1", "duty1", -5); + stats.addToDutyStat("stat2", "duty1", 1); + stats.addToDutyStat("stat1", "duty2", 1); + Assert.assertTrue(stats.hasPerDutyStats()); + + Assert.assertEquals( + Sets.newHashSet("duty1", "duty2"), + stats.getDuties("stat1") + ); + Assert.assertEquals( + Sets.newHashSet("duty1"), + stats.getDuties("stat2") + ); + Assert.assertTrue(stats.getDuties("stat3").isEmpty()); + + Assert.assertEquals(-4, stats.getDutyStat("stat1", "duty1")); + Assert.assertEquals(2, stats.getDutyStat("stat1", "duty2")); + Assert.assertEquals(1, stats.getDutyStat("stat2", "duty1")); + } + + @Test + public void testForEachDutyStat() + { + final Map<String, Long> expected = ImmutableMap.of( + "duty1", 1L, + "duty2", 2L, + "duty3", 3L + ); + final Map<String, Long> actual = new HashMap<>(); + + expected.forEach( + (duty, count) -> stats.addToDutyStat("stat", duty, count) + ); + + stats.forEachDutyStat("stat0", (duty, count) -> Assert.fail()); + stats.forEachDutyStat("stat", actual::put); + + Assert.assertEquals(expected, actual); + } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java index 34fe944..31bfeec 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java @@ -709,7 +709,7 @@ public class DruidCoordinatorTest 
extends CuratorTestBase ZkEnablementConfig.ENABLED ); - DruidCoordinator.DutiesRunnable duty = c.new DutiesRunnable(Collections.emptyList(), 0); + DruidCoordinator.DutiesRunnable duty = c.new DutiesRunnable(Collections.emptyList(), 0, "TEST"); // before initialization Assert.assertEquals(0, c.getCachedBalancerThreadNumber()); Assert.assertNull(c.getBalancerExec()); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org