This is an automated email from the ASF dual-hosted git repository.

leventov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new d0c2ede  Added CronScheduler support as a proof to clock drift while 
emitting metrics (#10448)
d0c2ede is described below

commit d0c2ede50c94bef89b72261fe3e553c60e7f0013
Author: Ayush Kulshrestha <kulshrestha.ayush2...@gmail.com>
AuthorDate: Wed Nov 25 17:01:38 2020 +0530

    Added CronScheduler support as a proof to clock drift while emitting 
metrics (#10448)
    
    Co-authored-by: Ayush Kulshrestha <ayush.kulshres...@miqdigital.com>
---
 core/pom.xml                                       |   4 +
 .../druid/java/util/metrics/AbstractMonitor.java   |  17 ++
 .../druid/java/util/metrics/CompoundMonitor.java   |  15 +
 .../apache/druid/java/util/metrics/Monitor.java    |   7 +
 .../druid/java/util/metrics/MonitorScheduler.java  |  75 +++--
 .../java/util/metrics/MonitorSchedulerTest.java    | 304 ++++++++++++++++++++-
 licenses.yaml                                      |  10 +
 pom.xml                                            |   5 +
 server/pom.xml                                     |   4 +
 .../apache/druid/server/metrics/MetricsModule.java |   7 +-
 10 files changed, 423 insertions(+), 25 deletions(-)

diff --git a/core/pom.xml b/core/pom.xml
index e0ccfa1..72fea54 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -224,6 +224,10 @@
       <groupId>org.antlr</groupId>
       <artifactId>antlr4-runtime</artifactId>
     </dependency>
+    <dependency>
+      <groupId>io.timeandspace</groupId>
+      <artifactId>cron-scheduler</artifactId>
+    </dependency>
 
 
 
diff --git 
a/core/src/main/java/org/apache/druid/java/util/metrics/AbstractMonitor.java 
b/core/src/main/java/org/apache/druid/java/util/metrics/AbstractMonitor.java
index 029dd47..4fbefb8 100644
--- a/core/src/main/java/org/apache/druid/java/util/metrics/AbstractMonitor.java
+++ b/core/src/main/java/org/apache/druid/java/util/metrics/AbstractMonitor.java
@@ -22,11 +22,16 @@ package org.apache.druid.java.util.metrics;
 
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 
+import java.util.concurrent.Future;
+
+
 /**
  */
 public abstract class AbstractMonitor implements Monitor
 {
   private volatile boolean started = false;
+  
+  private volatile Future<?> scheduledFuture;
 
   @Override
   public void start()
@@ -51,4 +56,16 @@ public abstract class AbstractMonitor implements Monitor
   }
 
   public abstract boolean doMonitor(ServiceEmitter emitter);
+
+  @Override
+  public Future<?> getScheduledFuture()
+  {
+    return scheduledFuture;
+  }
+
+  @Override
+  public void setScheduledFuture(Future<?> scheduledFuture)
+  {
+    this.scheduledFuture = scheduledFuture;
+  }
 }
diff --git 
a/core/src/main/java/org/apache/druid/java/util/metrics/CompoundMonitor.java 
b/core/src/main/java/org/apache/druid/java/util/metrics/CompoundMonitor.java
index 9811f58..6649312 100644
--- a/core/src/main/java/org/apache/druid/java/util/metrics/CompoundMonitor.java
+++ b/core/src/main/java/org/apache/druid/java/util/metrics/CompoundMonitor.java
@@ -24,10 +24,13 @@ import 
org.apache.druid.java.util.emitter.service.ServiceEmitter;
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.Future;
 
 public abstract class CompoundMonitor implements Monitor
 {
   private final List<Monitor> monitors;
+  
+  private volatile Future<?> scheduledFuture;
 
   public CompoundMonitor(List<Monitor> monitors)
   {
@@ -61,5 +64,17 @@ public abstract class CompoundMonitor implements Monitor
     return shouldReschedule(Lists.transform(monitors, monitor -> 
monitor.monitor(emitter)));
   }
 
+  @Override
+  public Future<?> getScheduledFuture()
+  {
+    return scheduledFuture;
+  }
+
+  @Override
+  public void setScheduledFuture(Future<?> scheduledFuture)
+  {
+    this.scheduledFuture = scheduledFuture;
+  }
+
   public abstract boolean shouldReschedule(List<Boolean> reschedules);
 }
diff --git a/core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java 
b/core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java
index 2ccd5db..8a3975e 100644
--- a/core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java
+++ b/core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java
@@ -21,6 +21,9 @@ package org.apache.druid.java.util.metrics;
 
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 
+import java.util.concurrent.Future;
+
+
 /**
  */
 public interface Monitor
@@ -35,4 +38,8 @@ public interface Monitor
    * @return true if this monitor needs to continue monitoring. False 
otherwise.
    */
   boolean monitor(ServiceEmitter emitter);
+
+  Future<?> getScheduledFuture();
+  
+  void setScheduledFuture(Future<?> scheduledFuture);
 }
diff --git 
a/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java 
b/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java
index 2adbe95..961f823 100644
--- 
a/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java
+++ 
b/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java
@@ -20,41 +20,52 @@
 package org.apache.druid.java.util.metrics;
 
 import com.google.common.collect.Sets;
+import io.timeandspace.cronscheduler.CronScheduler;
+import io.timeandspace.cronscheduler.CronTask;
 import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
 
 /**
  */
 public class MonitorScheduler
 {
+  
+  private static final Logger log = new Logger(MonitorScheduler.class);
+  
   private final MonitorSchedulerConfig config;
-  private final ScheduledExecutorService exec;
   private final ServiceEmitter emitter;
   private final Set<Monitor> monitors;
   private final Object lock = new Object();
+  private final CronScheduler scheduler;
+  private final ExecutorService executorService;
 
   private volatile boolean started = false;
-
+  
   public MonitorScheduler(
       MonitorSchedulerConfig config,
-      ScheduledExecutorService exec,
+      CronScheduler scheduler,
       ServiceEmitter emitter,
-      List<Monitor> monitors
+      List<Monitor> monitors,
+      ExecutorService executorService
   )
   {
     this.config = config;
-    this.exec = exec;
+    this.scheduler = scheduler;
     this.emitter = emitter;
     this.monitors = Sets.newHashSet(monitors);
+    this.executorService = executorService;
   }
 
   @LifecycleStart
@@ -124,24 +135,47 @@ public class MonitorScheduler
   {
     synchronized (lock) {
       monitor.start();
-      ScheduledExecutors.scheduleAtFixedRate(
-          exec,
-          config.getEmitterPeriod(),
-          new Callable<ScheduledExecutors.Signal>()
+      long rate = config.getEmitterPeriod().getMillis();
+      Future<?> scheduledFuture = scheduler.scheduleAtFixedRate(
+          rate,
+          rate,
+          TimeUnit.MILLISECONDS,
+          new CronTask()
           {
+            private volatile Future<Boolean> monitorFuture = null;
             @Override
-            public ScheduledExecutors.Signal call()
+            public void run(long scheduledRunTimeMillis)
             {
-              // Run one more time even if the monitor was removed, in case 
there's some extra data to flush
-              if (monitor.monitor(emitter) && hasMonitor(monitor)) {
-                return ScheduledExecutors.Signal.REPEAT;
-              } else {
-                removeMonitor(monitor);
-                return ScheduledExecutors.Signal.STOP;
+              try {
+                if (monitorFuture != null && monitorFuture.isDone()
+                    && !(monitorFuture.get() && hasMonitor(monitor))) {
+                  removeMonitor(monitor);
+                  monitor.getScheduledFuture().cancel(false);
+                  log.debug("Stopped rescheduling %s (delay %s)", this, rate);
+                  return;
+                }
+                log.trace("Running %s (period %s)", this, rate);
+                monitorFuture = executorService.submit(new Callable<Boolean>()
+                {
+                  @Override
+                  public Boolean call()
+                  {
+                    try {
+                      return monitor.monitor(emitter);
+                    }
+                    catch (Throwable e) {
+                      log.error(e, "Uncaught exception.");
+                      return Boolean.FALSE;
+                    }
+                  }
+                });
+              }
+              catch (Throwable e) {
+                log.error(e, "Uncaught exception.");
               }
             }
-          }
-      );
+          });
+      monitor.setScheduledFuture(scheduledFuture);
     }
   }
 
@@ -151,4 +185,5 @@ public class MonitorScheduler
       return monitors.contains(monitor);
     }
   }
+  
 }
diff --git 
a/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java
 
b/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java
index 7667196..da2ba59 100644
--- 
a/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java
+++ 
b/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java
@@ -19,17 +19,41 @@
 
 package org.apache.druid.java.util.metrics;
 
+
 import com.google.common.collect.ImmutableList;
-import org.apache.druid.java.util.common.concurrent.Execs;
+import io.timeandspace.cronscheduler.CronScheduler;
+import io.timeandspace.cronscheduler.CronTask;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mock;
 import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
+import java.time.Duration;
 import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 public class MonitorSchedulerTest
 {
+  
+  @Mock
+  private CronScheduler cronScheduler;
+  
+  @Before
+  public void setUp()
+  {
+    MockitoAnnotations.initMocks(this);
+  }
+  
   @Test
   public void testFindMonitor()
   {
@@ -45,12 +69,15 @@ public class MonitorSchedulerTest
 
     final Monitor1 monitor1 = new Monitor1();
     final Monitor2 monitor2 = new Monitor2();
+    
+    ExecutorService executor = Mockito.mock(ExecutorService.class);
 
     final MonitorScheduler scheduler = new MonitorScheduler(
         Mockito.mock(MonitorSchedulerConfig.class),
-        Execs.scheduledSingleThreaded("monitor-scheduler-test"),
+        
CronScheduler.newBuilder(Duration.ofSeconds(1L)).setThreadName("monitor-scheduler-test").build(),
         Mockito.mock(ServiceEmitter.class),
-        ImmutableList.of(monitor1, monitor2)
+        ImmutableList.of(monitor1, monitor2),
+        executor
     );
 
     final Optional<Monitor1> maybeFound1 = 
scheduler.findMonitor(Monitor1.class);
@@ -62,7 +89,264 @@ public class MonitorSchedulerTest
 
     Assert.assertFalse(scheduler.findMonitor(Monitor3.class).isPresent());
   }
+  
+  @Test
+  public void testStart_RepeatScheduling() 
+  {
+    ExecutorService executor = Mockito.mock(ExecutorService.class);
+
+    Mockito.doAnswer(new Answer<Future<?>>()
+    {
+      private int scheduleCount = 0;
+
+      @SuppressWarnings("unchecked")
+      @Override
+      public Future<?> answer(InvocationOnMock invocation) throws Exception
+      {
+        final Object originalArgument = (invocation.getArguments())[3];
+        CronTask task = ((CronTask) originalArgument);
+
+        Mockito.doAnswer(new Answer<Future<?>>()
+        {
+          @Override
+          public Future<Boolean> answer(InvocationOnMock invocation) throws 
Exception
+          {
+            final Object originalArgument = (invocation.getArguments())[0];
+            ((Callable<Boolean>) originalArgument).call();
+            return CompletableFuture.completedFuture(Boolean.TRUE);
+          }
+        }).when(executor).submit(ArgumentMatchers.any(Callable.class));
 
+        while (scheduleCount < 2) {
+          scheduleCount++;
+          task.run(123L);
+        }
+        return createDummyFuture();
+      }
+    }).when(cronScheduler).scheduleAtFixedRate(ArgumentMatchers.anyLong(), 
ArgumentMatchers.anyLong(),
+        ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
+
+    Monitor monitor = Mockito.mock(Monitor.class);
+
+    MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class);
+    Mockito.when(config.getEmitterPeriod()).thenReturn(new 
org.joda.time.Duration(1000L));
+
+    final MonitorScheduler scheduler = new MonitorScheduler(
+        config,
+        cronScheduler,
+        Mockito.mock(ServiceEmitter.class),
+        ImmutableList.of(monitor),
+        executor);
+    scheduler.start();
+
+    Mockito.verify(monitor, Mockito.times(1)).start();
+    Mockito.verify(cronScheduler, 
Mockito.times(1)).scheduleAtFixedRate(ArgumentMatchers.anyLong(),
+        ArgumentMatchers.anyLong(),
+        ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
+    Mockito.verify(executor, 
Mockito.times(2)).submit(ArgumentMatchers.any(Callable.class));
+    Mockito.verify(monitor, Mockito.times(2)).monitor(ArgumentMatchers.any());
+
+  }
+  
+  @Test
+  public void testStart_RepeatAndStopScheduling() 
+  {
+    ExecutorService executor = Mockito.mock(ExecutorService.class);
+
+    Mockito.doAnswer(new Answer<Future<?>>()
+    {
+      private int scheduleCount = 0;
+
+      @SuppressWarnings("unchecked")
+      @Override
+      public Future<?> answer(InvocationOnMock invocation) throws Exception
+      {
+        final Object originalArgument = (invocation.getArguments())[3];
+        CronTask task = ((CronTask) originalArgument);
+        Mockito.doAnswer(new Answer<Future<?>>()
+        {
+          @Override
+          public Future<Boolean> answer(InvocationOnMock invocation) throws 
Exception
+          {
+            final Object originalArgument = (invocation.getArguments())[0];
+            ((Callable<Boolean>) originalArgument).call();
+            return CompletableFuture.completedFuture(Boolean.FALSE);
+          }
+        }).when(executor).submit(ArgumentMatchers.any(Callable.class));
+
+        while (scheduleCount < 2) {
+          scheduleCount++;
+          task.run(123L);
+        }
+        return createDummyFuture();
+      }
+    }).when(cronScheduler).scheduleAtFixedRate(ArgumentMatchers.anyLong(), 
ArgumentMatchers.anyLong(),
+        ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
+
+    Monitor monitor = Mockito.mock(Monitor.class);
+    Mockito.when(monitor.getScheduledFuture()).thenReturn(createDummyFuture());
+
+    MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class);
+    Mockito.when(config.getEmitterPeriod()).thenReturn(new 
org.joda.time.Duration(1000L));
+
+    final MonitorScheduler scheduler = new MonitorScheduler(
+        config,
+        cronScheduler,
+        Mockito.mock(ServiceEmitter.class),
+        ImmutableList.of(monitor),
+        executor);
+    scheduler.start();
+    
+    Mockito.verify(monitor, Mockito.times(1)).start();
+    Mockito.verify(cronScheduler, 
Mockito.times(1)).scheduleAtFixedRate(ArgumentMatchers.anyLong(),
+        ArgumentMatchers.anyLong(),
+        ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
+    Mockito.verify(executor, 
Mockito.times(1)).submit(ArgumentMatchers.any(Callable.class));
+    Mockito.verify(monitor, Mockito.times(1)).monitor(ArgumentMatchers.any());
+    Mockito.verify(monitor, Mockito.times(1)).stop();
+    
+  }
+  
+  @Test
+  public void testStart_UnexpectedExceptionWhileMonitoring() 
+  {
+    ExecutorService executor = Mockito.mock(ExecutorService.class);
+    Monitor monitor = Mockito.mock(Monitor.class);
+    Mockito.when(monitor.getScheduledFuture()).thenReturn(createDummyFuture());
+    
Mockito.when(monitor.monitor(ArgumentMatchers.any(ServiceEmitter.class))).thenThrow(new
 RuntimeException());
+
+    MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class);
+    Mockito.when(config.getEmitterPeriod()).thenReturn(new 
org.joda.time.Duration(1000L));
+
+
+    Mockito.doAnswer(new Answer<Future<?>>()
+    {
+      @SuppressWarnings("unchecked")
+      @Override
+      public Future<?> answer(InvocationOnMock invocation) throws Exception
+      {
+        final Object originalArgument = (invocation.getArguments())[3];
+        CronTask task = ((CronTask) originalArgument);
+
+        Mockito.doAnswer(new Answer<Future<?>>()
+        {
+          @Override
+          public Future<Boolean> answer(InvocationOnMock invocation) throws 
Exception
+          {
+            final Object originalArgument = (invocation.getArguments())[0];
+            ((Callable<Boolean>) originalArgument).call();
+            return CompletableFuture.completedFuture(Boolean.TRUE);
+          }
+        }).when(executor).submit(ArgumentMatchers.any(Callable.class));
+
+        task.run(123L);
+        return createDummyFuture();
+      }
+    }).when(cronScheduler).scheduleAtFixedRate(ArgumentMatchers.anyLong(), 
ArgumentMatchers.anyLong(),
+        ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
+
+    
+    final MonitorScheduler scheduler = new MonitorScheduler(
+        config,
+        cronScheduler,
+        Mockito.mock(ServiceEmitter.class),
+        ImmutableList.of(monitor),
+        executor);
+    scheduler.start();
+    
+    Mockito.verify(monitor, Mockito.times(1)).start();
+    Mockito.verify(cronScheduler, 
Mockito.times(1)).scheduleAtFixedRate(ArgumentMatchers.anyLong(),
+        ArgumentMatchers.anyLong(),
+        ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
+    Mockito.verify(executor, 
Mockito.times(1)).submit(ArgumentMatchers.any(Callable.class));
+    Mockito.verify(monitor, Mockito.times(1)).monitor(ArgumentMatchers.any());
+  }
+  
+  
+  @Test
+  public void testStart_UnexpectedExceptionWhileScheduling() 
+  {
+    ExecutorService executor = Mockito.mock(ExecutorService.class);
+    Monitor monitor = Mockito.mock(Monitor.class);
+    Mockito.when(monitor.getScheduledFuture()).thenReturn(createDummyFuture());
+    MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class);
+    Mockito.when(config.getEmitterPeriod()).thenReturn(new 
org.joda.time.Duration(1000L));
+
+
+    Mockito.doAnswer(new Answer<Future<?>>()
+    {
+      @SuppressWarnings("unchecked")
+      @Override
+      public Future<?> answer(InvocationOnMock invocation) throws Exception
+      {
+        final Object originalArgument = (invocation.getArguments())[3];
+        CronTask task = ((CronTask) originalArgument);
+
+        
Mockito.when(executor.submit(ArgumentMatchers.any(Callable.class))).thenThrow(new
 RuntimeException());
+        task.run(123L);
+        return createDummyFuture();
+      }
+    }).when(cronScheduler).scheduleAtFixedRate(ArgumentMatchers.anyLong(), 
ArgumentMatchers.anyLong(),
+        ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
+
+    
+    final MonitorScheduler scheduler = new MonitorScheduler(
+        config,
+        cronScheduler,
+        Mockito.mock(ServiceEmitter.class),
+        ImmutableList.of(monitor),
+        executor);
+    scheduler.start();
+    
+    Mockito.verify(monitor, Mockito.times(1)).start();
+    Mockito.verify(cronScheduler, 
Mockito.times(1)).scheduleAtFixedRate(ArgumentMatchers.anyLong(),
+        ArgumentMatchers.anyLong(),
+        ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
+    Mockito.verify(executor, 
Mockito.times(1)).submit(ArgumentMatchers.any(Callable.class));
+  }
+  
+  
+  private Future createDummyFuture()
+  {
+    Future<?> future = new Future()
+    {
+
+      @Override
+      public boolean cancel(boolean mayInterruptIfRunning)
+      {
+        return false;
+      }
+
+      @Override
+      public boolean isCancelled()
+      {
+        return false;
+      }
+
+      @Override
+      public boolean isDone()
+      {
+        return false;
+      }
+
+      @Override
+      public Object get()
+      {
+        return null;
+      }
+
+      @Override
+      public Object get(long timeout, TimeUnit unit)
+      {
+        return null;
+      }
+
+    };
+
+    return future;
+  }
+  
+  
   private static class NoopMonitor implements Monitor
   {
     @Override
@@ -82,5 +366,19 @@ public class MonitorSchedulerTest
     {
       return true;
     }
+
+    @Override
+    public Future<?> getScheduledFuture()
+    {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public void setScheduledFuture(Future<?> scheduledFuture)
+    {
+      // TODO Auto-generated method stub
+      
+    }
   }
 }
diff --git a/licenses.yaml b/licenses.yaml
index 170b3d1..e684073 100644
--- a/licenses.yaml
+++ b/licenses.yaml
@@ -345,6 +345,16 @@ libraries:
 
 ---
 
+name: CronScheduler
+license_category: binary
+module: java-core
+license_name: Apache License version 2.0
+version: 0.1
+libraries:
+  - io.timeandspace: cron-scheduler
+
+---
+
 name: LMAX Disruptor
 license_category: binary
 module: java-core
diff --git a/pom.xml b/pom.xml
index 383de94..ffe9460 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1255,6 +1255,11 @@
                 <version>1.19.0</version>
                 <scope>test</scope>
             </dependency>
+            <dependency>
+                <groupId>io.timeandspace</groupId>
+                <artifactId>cron-scheduler</artifactId>
+                <version>0.1</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 
diff --git a/server/pom.xml b/server/pom.xml
index 7fa3a48..caddf48 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -311,6 +311,10 @@
             <groupId>io.github.resilience4j</groupId>
             <artifactId>resilience4j-bulkhead</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.timeandspace</groupId>
+            <artifactId>cron-scheduler</artifactId>
+        </dependency>
 
         <!-- Tests -->
         <dependency>
diff --git 
a/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java 
b/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java
index 71b9945..d7598ae 100644
--- a/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java
+++ b/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java
@@ -27,6 +27,7 @@ import com.google.inject.Key;
 import com.google.inject.Module;
 import com.google.inject.Provides;
 import com.google.inject.name.Names;
+import io.timeandspace.cronscheduler.CronScheduler;
 import org.apache.druid.guice.DruidBinders;
 import org.apache.druid.guice.JsonConfigProvider;
 import org.apache.druid.guice.LazySingleton;
@@ -42,6 +43,7 @@ import org.apache.druid.java.util.metrics.MonitorScheduler;
 import org.apache.druid.java.util.metrics.SysMonitor;
 import org.apache.druid.query.ExecutorServiceMonitor;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -106,9 +108,10 @@ public class MetricsModule implements Module
 
     return new MonitorScheduler(
         config.get(),
-        Execs.scheduledSingleThreaded("MonitorScheduler-%s"),
+        
CronScheduler.newBuilder(Duration.ofSeconds(1L)).setThreadName("MonitorSchedulerThread").build(),
         emitter,
-        monitors
+        monitors,
+        Execs.multiThreaded(64, "MonitorThread-%d")
     );
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to