This is an automated email from the ASF dual-hosted git repository.

xvrl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 995d99d  add ingest/notices/queueSize metric to give visibility into 
supervisor notices queue size (#11417)
995d99d is described below

commit 995d99d9e44145fcb89b2fd6060f4d38f6c39e21
Author: Harini Rajendran <hrajend...@confluent.io>
AuthorDate: Fri Jul 30 09:59:26 2021 -0500

    add ingest/notices/queueSize metric to give visibility into supervisor 
notices queue size (#11417)
---
 docs/operations/metrics.md                         |   2 +
 .../supervisor/SeekableStreamSupervisor.java       |  88 ++++++++++-
 .../SeekableStreamSupervisorStateTest.java         | 170 +++++++++++++++++----
 website/.spelling                                  |   1 +
 4 files changed, 228 insertions(+), 33 deletions(-)

diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index d4ca7ad..0ed0ff4 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -181,6 +181,8 @@ These metrics are only available if the 
RealtimeMetricsMonitor is included in th
 |`ingest/handoff/count`|Number of handoffs that happened.|dataSource, taskId, 
taskType.|Varies. Generally greater than 0 once every segment granular period 
if cluster operating normally|
 |`ingest/sink/count`|Number of sinks not handoffed.|dataSource, taskId, 
taskType.|1~3|
 |`ingest/events/messageGap`|Time gap between the data time in event and 
current system time.|dataSource, taskId, taskType.|Greater than 0, depends on 
the time carried in event |
+|`ingest/notices/queueSize`|Number of pending notices to be processed by the 
supervisor|dataSource.|Typically 0 and occasionally in lower single digits. 
Should not be a very high number. |
+|`ingest/notices/time`|Milliseconds taken to process a notice by the 
supervisor|dataSource, noticeType.| < 1s. |
 
 
 Note: If the JVM does not support CPU time measurement for the current thread, 
ingest/merge/cpu and ingest/persists/cpu will be 0.
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index d5e7257..ec2a526 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -274,6 +274,13 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
    */
   private interface Notice
   {
+    /**
+     * Returns a descriptive label for this notice type. Used for metrics 
emission and logging.
+     *
+     * @return notice type label
+     */
+    String getType();
+
     void handle() throws ExecutionException, InterruptedException, 
TimeoutException;
   }
 
@@ -312,6 +319,8 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
 
   private class RunNotice implements Notice
   {
+    private static final String TYPE = "run_notice";
+
     @Override
     public void handle()
     {
@@ -323,12 +332,19 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
 
       runInternal();
     }
+
+    @Override
+    public String getType()
+    {
+      return TYPE;
+    }
   }
 
   // change taskCount without resubmitting.
   private class DynamicAllocationTasksNotice implements Notice
   {
     Callable<Integer> scaleAction;
+    private static final String TYPE = "dynamic_allocation_tasks_notice";
 
     DynamicAllocationTasksNotice(Callable<Integer> scaleAction)
     {
@@ -382,6 +398,12 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
         }
       }
     }
+
+    @Override
+    public String getType()
+    {
+      return TYPE;
+    }
   }
 
   /**
@@ -458,6 +480,8 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
 
   private class ShutdownNotice implements Notice
   {
+    private static final String TYPE = "shutdown_notice";
+
     @Override
     public void handle() throws InterruptedException, ExecutionException, 
TimeoutException
     {
@@ -468,11 +492,18 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
         stopLock.notifyAll();
       }
     }
+
+    @Override
+    public String getType()
+    {
+      return TYPE;
+    }
   }
 
   private class ResetNotice implements Notice
   {
     final DataSourceMetadata dataSourceMetadata;
+    private static final String TYPE = "reset_notice";
 
     ResetNotice(DataSourceMetadata dataSourceMetadata)
     {
@@ -484,12 +515,19 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     {
       resetInternal(dataSourceMetadata);
     }
+
+    @Override
+    public String getType()
+    {
+      return TYPE;
+    }
   }
 
   protected class CheckpointNotice implements Notice
   {
     private final int taskGroupId;
     private final SeekableStreamDataSourceMetadata<PartitionIdType, 
SequenceOffsetType> checkpointMetadata;
+    private static final String TYPE = "checkpoint_notice";
 
     CheckpointNotice(
         int taskGroupId,
@@ -560,6 +598,12 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
 
       return true;
     }
+
+    @Override
+    public String getType()
+    {
+      return TYPE;
+    }
   }
 
   // Map<{group id}, {actively reading task group}>; see documentation for 
TaskGroup class
@@ -908,7 +952,9 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
                     notice.handle();
                     Instant handleNoticeEndTime = Instant.now();
                     Duration timeElapsed = 
Duration.between(handleNoticeStartTime, handleNoticeEndTime);
-                    log.debug("Handled notice [%s] from notices queue in [%d] 
ms, current notices queue size [%d]", notice.getClass().getName(), 
timeElapsed.toMillis(), getNoticesQueueSize());
+                    String noticeType = notice.getType();
+                    log.debug("Handled notice [%s] from notices queue in [%d] 
ms, current notices queue size [%d] for datasource [%s]", noticeType, 
timeElapsed.toMillis(), getNoticesQueueSize(), dataSource);
+                    emitNoticeProcessTime(noticeType, timeElapsed.toMillis());
                   }
                   catch (Throwable e) {
                     stateManager.recordThrowableEvent(e);
@@ -3588,6 +3634,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
 
   /**
    * default implementation, schedules periodic fetch of latest offsets and 
{@link #emitLag} reporting for Kafka and Kinesis
+   * and periodic reporting of {@link #emitNoticesQueueSize} for various data 
sources.
    */
   protected void scheduleReporting(ScheduledExecutorService reportingExec)
   {
@@ -3610,6 +3657,12 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
         spec.getMonitorSchedulerConfig().getEmitterPeriod().getMillis(),
         TimeUnit.MILLISECONDS
     );
+    reportingExec.scheduleAtFixedRate(
+        this::emitNoticesQueueSize,
+        ioConfig.getStartDelay().getMillis() + 
INITIAL_EMIT_LAG_METRIC_DELAY_MILLIS, // wait for tasks to start up
+        spec.getMonitorSchedulerConfig().getEmitterPeriod().getMillis(),
+        TimeUnit.MILLISECONDS
+    );
   }
 
   /**
@@ -3658,6 +3711,39 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
            && 
makeSequenceNumber(earliestOffset).compareTo(makeSequenceNumber(offsetFromMetadata))
 <= 0;
   }
 
+  protected void emitNoticeProcessTime(String noticeType, long timeInMillis)
+  {
+    try {
+      emitter.emit(
+          ServiceMetricEvent.builder()
+              .setDimension("noticeType", noticeType)
+              .setDimension("dataSource", dataSource)
+              .build("ingest/notices/time", timeInMillis)
+      );
+    }
+    catch (Exception e) {
+      log.warn(e, "Unable to emit notices process time");
+    }
+  }
+
+  protected void emitNoticesQueueSize()
+  {
+    if (spec.isSuspended()) {
+      // don't emit metrics if supervisor is suspended
+      return;
+    }
+    try {
+      emitter.emit(
+          ServiceMetricEvent.builder()
+              .setDimension("dataSource", dataSource)
+              .build("ingest/notices/queueSize", getNoticesQueueSize())
+      );
+    }
+    catch (Exception e) {
+      log.warn(e, "Unable to emit notices queue size");
+    }
+  }
+
   protected void emitLag()
   {
     if (spec.isSuspended() || !stateManager.isSteadyState()) {
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index 91dd10a..5f5d47f 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -88,6 +88,8 @@ import javax.annotation.Nullable;
 import java.io.File;
 import java.math.BigInteger;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -97,6 +99,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 public class SeekableStreamSupervisorStateTest extends EasyMockSupport
 {
@@ -625,7 +628,7 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
   {
     expectEmitterSupervisor(false);
 
-    CountDownLatch latch = new CountDownLatch(1);
+    CountDownLatch latch = new CountDownLatch(2);
     TestEmittingTestSeekableStreamSupervisor supervisor = new 
TestEmittingTestSeekableStreamSupervisor(
         latch,
         ImmutableMap.of("1", 100L, "2", 250L, "3", 500L),
@@ -643,19 +646,23 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
 
 
     latch.await();
-    Assert.assertEquals(6, emitter.getEvents().size());
-    Assert.assertEquals("ingest/test/lag", 
emitter.getEvents().get(0).toMap().get("metric"));
-    Assert.assertEquals(850L, emitter.getEvents().get(0).toMap().get("value"));
-    Assert.assertEquals("ingest/test/maxLag", 
emitter.getEvents().get(1).toMap().get("metric"));
-    Assert.assertEquals(500L, emitter.getEvents().get(1).toMap().get("value"));
-    Assert.assertEquals("ingest/test/avgLag", 
emitter.getEvents().get(2).toMap().get("metric"));
-    Assert.assertEquals(283L, emitter.getEvents().get(2).toMap().get("value"));
-    Assert.assertEquals("ingest/test/lag/time", 
emitter.getEvents().get(3).toMap().get("metric"));
-    Assert.assertEquals(45000L, 
emitter.getEvents().get(3).toMap().get("value"));
-    Assert.assertEquals("ingest/test/maxLag/time", 
emitter.getEvents().get(4).toMap().get("metric"));
-    Assert.assertEquals(20000L, 
emitter.getEvents().get(4).toMap().get("value"));
-    Assert.assertEquals("ingest/test/avgLag/time", 
emitter.getEvents().get(5).toMap().get("metric"));
-    Assert.assertEquals(15000L, 
emitter.getEvents().get(5).toMap().get("value"));
+    List<Event> events = emitter.getEvents();
+    List<String> whitelist = Arrays.asList("ingest/test/lag", 
"ingest/test/maxLag",
+        "ingest/test/avgLag", "ingest/test/lag/time", 
"ingest/test/maxLag/time", "ingest/test/avgLag/time");
+    events = filterMetrics(events, whitelist);
+    Assert.assertEquals(6, events.size());
+    Assert.assertEquals("ingest/test/lag", 
events.get(0).toMap().get("metric"));
+    Assert.assertEquals(850L, events.get(0).toMap().get("value"));
+    Assert.assertEquals("ingest/test/maxLag", 
events.get(1).toMap().get("metric"));
+    Assert.assertEquals(500L, events.get(1).toMap().get("value"));
+    Assert.assertEquals("ingest/test/avgLag", 
events.get(2).toMap().get("metric"));
+    Assert.assertEquals(283L, events.get(2).toMap().get("value"));
+    Assert.assertEquals("ingest/test/lag/time", 
events.get(3).toMap().get("metric"));
+    Assert.assertEquals(45000L, events.get(3).toMap().get("value"));
+    Assert.assertEquals("ingest/test/maxLag/time", 
events.get(4).toMap().get("metric"));
+    Assert.assertEquals(20000L, events.get(4).toMap().get("value"));
+    Assert.assertEquals("ingest/test/avgLag/time", 
events.get(5).toMap().get("metric"));
+    Assert.assertEquals(15000L, events.get(5).toMap().get("value"));
     verifyAll();
   }
 
@@ -664,7 +671,7 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
   {
     expectEmitterSupervisor(false);
 
-    CountDownLatch latch = new CountDownLatch(1);
+    CountDownLatch latch = new CountDownLatch(2);
     TestEmittingTestSeekableStreamSupervisor supervisor = new 
TestEmittingTestSeekableStreamSupervisor(
         latch,
         ImmutableMap.of("1", 100L, "2", 250L, "3", 500L),
@@ -682,13 +689,16 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
 
 
     latch.await();
-    Assert.assertEquals(3, emitter.getEvents().size());
-    Assert.assertEquals("ingest/test/lag", 
emitter.getEvents().get(0).toMap().get("metric"));
-    Assert.assertEquals(850L, emitter.getEvents().get(0).toMap().get("value"));
-    Assert.assertEquals("ingest/test/maxLag", 
emitter.getEvents().get(1).toMap().get("metric"));
-    Assert.assertEquals(500L, emitter.getEvents().get(1).toMap().get("value"));
-    Assert.assertEquals("ingest/test/avgLag", 
emitter.getEvents().get(2).toMap().get("metric"));
-    Assert.assertEquals(283L, emitter.getEvents().get(2).toMap().get("value"));
+    List<Event> events = emitter.getEvents();
+    List<String> whitelist = Arrays.asList("ingest/test/lag", 
"ingest/test/maxLag", "ingest/test/avgLag");
+    events = filterMetrics(events, whitelist);
+    Assert.assertEquals(3, events.size());
+    Assert.assertEquals("ingest/test/lag", 
events.get(0).toMap().get("metric"));
+    Assert.assertEquals(850L, events.get(0).toMap().get("value"));
+    Assert.assertEquals("ingest/test/maxLag", 
events.get(1).toMap().get("metric"));
+    Assert.assertEquals(500L, events.get(1).toMap().get("value"));
+    Assert.assertEquals("ingest/test/avgLag", 
events.get(2).toMap().get("metric"));
+    Assert.assertEquals(283L, events.get(2).toMap().get("value"));
     verifyAll();
   }
 
@@ -697,7 +707,7 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
   {
     expectEmitterSupervisor(false);
 
-    CountDownLatch latch = new CountDownLatch(1);
+    CountDownLatch latch = new CountDownLatch(2);
     TestEmittingTestSeekableStreamSupervisor supervisor = new 
TestEmittingTestSeekableStreamSupervisor(
         latch,
         null,
@@ -715,13 +725,78 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
 
 
     latch.await();
-    Assert.assertEquals(3, emitter.getEvents().size());
-    Assert.assertEquals("ingest/test/lag/time", 
emitter.getEvents().get(0).toMap().get("metric"));
-    Assert.assertEquals(45000L, 
emitter.getEvents().get(0).toMap().get("value"));
-    Assert.assertEquals("ingest/test/maxLag/time", 
emitter.getEvents().get(1).toMap().get("metric"));
-    Assert.assertEquals(20000L, 
emitter.getEvents().get(1).toMap().get("value"));
-    Assert.assertEquals("ingest/test/avgLag/time", 
emitter.getEvents().get(2).toMap().get("metric"));
-    Assert.assertEquals(15000L, 
emitter.getEvents().get(2).toMap().get("value"));
+    List<Event> events = emitter.getEvents();
+    List<String> whitelist = Arrays.asList("ingest/test/lag/time", 
"ingest/test/maxLag/time", "ingest/test/avgLag/time");
+    events = filterMetrics(events, whitelist);
+    Assert.assertEquals(3, events.size());
+    Assert.assertEquals("ingest/test/lag/time", 
events.get(0).toMap().get("metric"));
+    Assert.assertEquals(45000L, events.get(0).toMap().get("value"));
+    Assert.assertEquals("ingest/test/maxLag/time", 
events.get(1).toMap().get("metric"));
+    Assert.assertEquals(20000L, events.get(1).toMap().get("value"));
+    Assert.assertEquals("ingest/test/avgLag/time", 
events.get(2).toMap().get("metric"));
+    Assert.assertEquals(15000L, events.get(2).toMap().get("value"));
+    verifyAll();
+  }
+
+  @Test
+  public void testEmitNoticesQueueSize() throws Exception
+  {
+    expectEmitterSupervisor(false);
+
+    CountDownLatch latch = new CountDownLatch(1);
+    TestEmittingTestSeekableStreamSupervisor supervisor = new 
TestEmittingTestSeekableStreamSupervisor(
+        latch,
+        null,
+        null
+    );
+
+
+    supervisor.start();
+
+    Assert.assertTrue(supervisor.stateManager.isHealthy());
+    Assert.assertEquals(BasicState.PENDING, 
supervisor.stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.PENDING, 
supervisor.stateManager.getSupervisorState().getBasicState());
+    Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
+    Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());
+
+
+    latch.await();
+    List<Event> events = emitter.getEvents();
+    List<String> whitelist = 
Collections.singletonList("ingest/notices/queueSize");
+    events = filterMetrics(events, whitelist);
+    Assert.assertEquals(1, events.size());
+    Assert.assertEquals("ingest/notices/queueSize", 
events.get(0).toMap().get("metric"));
+    Assert.assertEquals(0, events.get(0).toMap().get("value"));
+    Assert.assertEquals("testDS", events.get(0).toMap().get("dataSource"));
+    verifyAll();
+  }
+
+  @Test
+  public void testEmitNoticesTime() throws Exception
+  {
+    expectEmitterSupervisor(false);
+    CountDownLatch latch = new CountDownLatch(2);
+    TestEmittingTestSeekableStreamSupervisor supervisor = new 
TestEmittingTestSeekableStreamSupervisor(
+        latch,
+        null,
+        null
+    );
+    supervisor.start();
+    supervisor.emitNoticesTime();
+    Assert.assertTrue(supervisor.stateManager.isHealthy());
+    Assert.assertEquals(BasicState.PENDING, 
supervisor.stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.PENDING, 
supervisor.stateManager.getSupervisorState().getBasicState());
+    Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
+    Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());
+    latch.await();
+    List<Event> events = emitter.getEvents();
+    List<String> whitelist = Collections.singletonList("ingest/notices/time");
+    events = filterMetrics(events, whitelist);
+    Assert.assertEquals(1, events.size());
+    Assert.assertEquals("ingest/notices/time", 
events.get(0).toMap().get("metric"));
+    Assert.assertEquals(500L, events.get(0).toMap().get("value"));
+    Assert.assertEquals("testDS", events.get(0).toMap().get("dataSource"));
+    Assert.assertEquals("dummyNoticeType", 
events.get(0).toMap().get("noticeType"));
     verifyAll();
   }
 
@@ -730,7 +805,7 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
   {
     expectEmitterSupervisor(true);
 
-    CountDownLatch latch = new CountDownLatch(1);
+    CountDownLatch latch = new CountDownLatch(2);
     TestEmittingTestSeekableStreamSupervisor supervisor = new 
TestEmittingTestSeekableStreamSupervisor(
         latch,
         ImmutableMap.of("1", 100L, "2", 250L, "3", 500L),
@@ -748,10 +823,22 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
 
 
     latch.await();
-    Assert.assertEquals(0, emitter.getEvents().size());
+    List<Event> events = emitter.getEvents();
+    List<String> whitelist = Arrays.asList("ingest/test/lag", 
"ingest/test/maxLag",
+        "ingest/test/avgLag", "ingest/test/lag/time", 
"ingest/test/maxLag/time", "ingest/test/avgLag/time");
+    events = filterMetrics(events, whitelist);
+    Assert.assertEquals(0, events.size());
     verifyAll();
   }
 
+  private List<Event> filterMetrics(List<Event> events, List<String> whitelist)
+  {
+    List<Event> result = events.stream()
+        .filter(e -> whitelist.contains(e.toMap().get("metric").toString()))
+        .collect(Collectors.toList());
+    return result;
+  }
+
   private void expectEmitterSupervisor(boolean suspended) throws 
EntryExistsException
   {
     spec = createMock(SeekableStreamSupervisorSpec.class);
@@ -1250,6 +1337,19 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     }
 
     @Override
+    protected void emitNoticesQueueSize()
+    {
+      super.emitNoticesQueueSize();
+      latch.countDown();
+    }
+
+    public void emitNoticesTime()
+    {
+      super.emitNoticeProcessTime("dummyNoticeType", 500);
+      latch.countDown();
+    }
+
+    @Override
     public LagStats computeLagStats()
     {
       return null;
@@ -1265,6 +1365,12 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
           spec.getMonitorSchedulerConfig().getEmitterPeriod().getMillis(),
           TimeUnit.MILLISECONDS
       );
+      reportingExec.scheduleAtFixedRate(
+          this::emitNoticesQueueSize,
+          ioConfig.getStartDelay().getMillis(),
+          spec.getMonitorSchedulerConfig().getEmitterPeriod().getMillis(),
+          TimeUnit.MILLISECONDS
+      );
     }
   }
 
diff --git a/website/.spelling b/website/.spelling
index d4b568c..0e0ab00 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -1276,6 +1276,7 @@ nativeQueryIds
 netAddress
 netHwaddr
 netName
+noticeType
 numComplexMetrics
 numDimensions
 numMetrics

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to