This is an automated email from the ASF dual-hosted git repository. leventov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push: new d0c2ede Added CronScheduler support as a proof to clock drift while emitting metrics (#10448) d0c2ede is described below commit d0c2ede50c94bef89b72261fe3e553c60e7f0013 Author: Ayush Kulshrestha <kulshrestha.ayush2...@gmail.com> AuthorDate: Wed Nov 25 17:01:38 2020 +0530 Added CronScheduler support as a proof to clock drift while emitting metrics (#10448) Co-authored-by: Ayush Kulshrestha <ayush.kulshres...@miqdigital.com> --- core/pom.xml | 4 + .../druid/java/util/metrics/AbstractMonitor.java | 17 ++ .../druid/java/util/metrics/CompoundMonitor.java | 15 + .../apache/druid/java/util/metrics/Monitor.java | 7 + .../druid/java/util/metrics/MonitorScheduler.java | 75 +++-- .../java/util/metrics/MonitorSchedulerTest.java | 304 ++++++++++++++++++++- licenses.yaml | 10 + pom.xml | 5 + server/pom.xml | 4 + .../apache/druid/server/metrics/MetricsModule.java | 7 +- 10 files changed, 423 insertions(+), 25 deletions(-) diff --git a/core/pom.xml b/core/pom.xml index e0ccfa1..72fea54 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -224,6 +224,10 @@ <groupId>org.antlr</groupId> <artifactId>antlr4-runtime</artifactId> </dependency> + <dependency> + <groupId>io.timeandspace</groupId> + <artifactId>cron-scheduler</artifactId> + </dependency> diff --git a/core/src/main/java/org/apache/druid/java/util/metrics/AbstractMonitor.java b/core/src/main/java/org/apache/druid/java/util/metrics/AbstractMonitor.java index 029dd47..4fbefb8 100644 --- a/core/src/main/java/org/apache/druid/java/util/metrics/AbstractMonitor.java +++ b/core/src/main/java/org/apache/druid/java/util/metrics/AbstractMonitor.java @@ -22,11 +22,16 @@ package org.apache.druid.java.util.metrics; import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import java.util.concurrent.Future; + + /** */ public abstract class AbstractMonitor implements Monitor { private volatile boolean started = false; + + private volatile Future<?> scheduledFuture; @Override public void start() @@ -51,4 +56,16 @@ public abstract class AbstractMonitor implements Monitor } public abstract boolean doMonitor(ServiceEmitter emitter); + + @Override + public Future<?> getScheduledFuture() + { + return scheduledFuture; + } + + @Override + public void setScheduledFuture(Future<?> scheduledFuture) + { + this.scheduledFuture = scheduledFuture; + } } diff --git a/core/src/main/java/org/apache/druid/java/util/metrics/CompoundMonitor.java b/core/src/main/java/org/apache/druid/java/util/metrics/CompoundMonitor.java index 9811f58..6649312 100644 --- a/core/src/main/java/org/apache/druid/java/util/metrics/CompoundMonitor.java +++ b/core/src/main/java/org/apache/druid/java/util/metrics/CompoundMonitor.java @@ -24,10 +24,13 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter; import java.util.Arrays; import java.util.List; +import java.util.concurrent.Future; public abstract class CompoundMonitor implements Monitor { private final List<Monitor> monitors; + + private volatile Future<?> scheduledFuture; public CompoundMonitor(List<Monitor> monitors) { @@ -61,5 +64,17 @@ public abstract class CompoundMonitor implements Monitor return shouldReschedule(Lists.transform(monitors, monitor -> monitor.monitor(emitter))); } + @Override + public Future<?> getScheduledFuture() + { + return scheduledFuture; + } + + @Override + public void setScheduledFuture(Future<?> scheduledFuture) + { + this.scheduledFuture = scheduledFuture; + } + public abstract boolean shouldReschedule(List<Boolean> reschedules); } diff --git a/core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java b/core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java index 2ccd5db..8a3975e 100644 --- a/core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java +++ b/core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java @@ -21,6 +21,9 @@ package org.apache.druid.java.util.metrics; import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import java.util.concurrent.Future; + + /** */ public interface Monitor @@ -35,4 +38,8 @@ public interface Monitor * @return true if this monitor needs to continue monitoring. False otherwise. */ boolean monitor(ServiceEmitter emitter); + + Future<?> getScheduledFuture(); + + void setScheduledFuture(Future<?> scheduledFuture); } diff --git a/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java b/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java index 2adbe95..961f823 100644 --- a/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java +++ b/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java @@ -20,41 +20,52 @@ package org.apache.druid.java.util.metrics; import com.google.common.collect.Sets; +import io.timeandspace.cronscheduler.CronScheduler; +import io.timeandspace.cronscheduler.CronTask; import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; +import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import java.util.List; import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + /** */ public class MonitorScheduler { + + private static final Logger log = new Logger(MonitorScheduler.class); + private final MonitorSchedulerConfig config; - private final ScheduledExecutorService exec; private final ServiceEmitter emitter; private final Set<Monitor> monitors; private final Object lock = new Object(); + private final CronScheduler scheduler; + private final ExecutorService executorService; private volatile boolean started = false; - + public MonitorScheduler( MonitorSchedulerConfig config, - ScheduledExecutorService exec, + CronScheduler scheduler, ServiceEmitter emitter, - List<Monitor> monitors + List<Monitor> monitors, + ExecutorService executorService ) { this.config = config; - this.exec = exec; + this.scheduler = scheduler; this.emitter = emitter; this.monitors = Sets.newHashSet(monitors); + this.executorService = executorService; } @LifecycleStart @@ -124,24 +135,47 @@ public class MonitorScheduler { synchronized (lock) { monitor.start(); - ScheduledExecutors.scheduleAtFixedRate( - exec, - config.getEmitterPeriod(), - new Callable<ScheduledExecutors.Signal>() + long rate = config.getEmitterPeriod().getMillis(); + Future<?> scheduledFuture = scheduler.scheduleAtFixedRate( + rate, + rate, + TimeUnit.MILLISECONDS, + new CronTask() { + private volatile Future<Boolean> monitorFuture = null; @Override - public ScheduledExecutors.Signal call() + public void run(long scheduledRunTimeMillis) { - // Run one more time even if the monitor was removed, in case there's some extra data to flush - if (monitor.monitor(emitter) && hasMonitor(monitor)) { - return ScheduledExecutors.Signal.REPEAT; - } else { - removeMonitor(monitor); - return ScheduledExecutors.Signal.STOP; + try { + if (monitorFuture != null && monitorFuture.isDone() + && !(monitorFuture.get() && hasMonitor(monitor))) { + removeMonitor(monitor); + monitor.getScheduledFuture().cancel(false); + log.debug("Stopped rescheduling %s (delay %s)", this, rate); + return; + } + log.trace("Running %s (period %s)", this, rate); + monitorFuture = executorService.submit(new Callable<Boolean>() + { + @Override + public Boolean call() + { + try { + return monitor.monitor(emitter); + } + catch (Throwable e) { + log.error(e, "Uncaught exception."); + return Boolean.FALSE; + } + } + }); + } + catch (Throwable e) { + log.error(e, "Uncaught exception."); } } - } - ); + }); + monitor.setScheduledFuture(scheduledFuture); } } @@ -151,4 +185,5 @@ public class MonitorScheduler return monitors.contains(monitor); } } + } diff --git a/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java b/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java index 7667196..da2ba59 100644 --- a/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java +++ b/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java @@ -19,17 +19,41 @@ package org.apache.druid.java.util.metrics; + import com.google.common.collect.ImmutableList; -import org.apache.druid.java.util.common.concurrent.Execs; +import io.timeandspace.cronscheduler.CronScheduler; +import io.timeandspace.cronscheduler.CronTask; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentMatchers; +import org.mockito.Mock; import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import java.time.Duration; import java.util.Optional; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; public class MonitorSchedulerTest { + + @Mock + private CronScheduler cronScheduler; + + @Before + public void setUp() + { + MockitoAnnotations.initMocks(this); + } + @Test public void testFindMonitor() { @@ -45,12 +69,15 @@ public class MonitorSchedulerTest final Monitor1 monitor1 = new Monitor1(); final Monitor2 monitor2 = new Monitor2(); + + ExecutorService executor = Mockito.mock(ExecutorService.class); final MonitorScheduler scheduler = new MonitorScheduler( Mockito.mock(MonitorSchedulerConfig.class), - Execs.scheduledSingleThreaded("monitor-scheduler-test"), + CronScheduler.newBuilder(Duration.ofSeconds(1L)).setThreadName("monitor-scheduler-test").build(), Mockito.mock(ServiceEmitter.class), - ImmutableList.of(monitor1, monitor2) + ImmutableList.of(monitor1, monitor2), + executor ); final Optional<Monitor1> maybeFound1 = scheduler.findMonitor(Monitor1.class); @@ -62,7 +89,264 @@ public class MonitorSchedulerTest Assert.assertFalse(scheduler.findMonitor(Monitor3.class).isPresent()); } + + @Test + public void testStart_RepeatScheduling() + { + ExecutorService executor = Mockito.mock(ExecutorService.class); + + Mockito.doAnswer(new Answer<Future<?>>() + { + private int scheduleCount = 0; + + @SuppressWarnings("unchecked") + @Override + public Future<?> answer(InvocationOnMock invocation) throws Exception + { + final Object originalArgument = (invocation.getArguments())[3]; + CronTask task = ((CronTask) originalArgument); + + Mockito.doAnswer(new Answer<Future<?>>() + { + @Override + public Future<Boolean> answer(InvocationOnMock invocation) throws Exception + { + final Object originalArgument = (invocation.getArguments())[0]; + ((Callable<Boolean>) originalArgument).call(); + return CompletableFuture.completedFuture(Boolean.TRUE); + } + }).when(executor).submit(ArgumentMatchers.any(Callable.class)); + while (scheduleCount < 2) { + scheduleCount++; + task.run(123L); + } + return createDummyFuture(); + } + }).when(cronScheduler).scheduleAtFixedRate(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(), + ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class)); + + Monitor monitor = Mockito.mock(Monitor.class); + + MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class); + Mockito.when(config.getEmitterPeriod()).thenReturn(new org.joda.time.Duration(1000L)); + + final MonitorScheduler scheduler = new MonitorScheduler( + config, + cronScheduler, + Mockito.mock(ServiceEmitter.class), + ImmutableList.of(monitor), + executor); + scheduler.start(); + + Mockito.verify(monitor, Mockito.times(1)).start(); + Mockito.verify(cronScheduler, Mockito.times(1)).scheduleAtFixedRate(ArgumentMatchers.anyLong(), + ArgumentMatchers.anyLong(), + ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class)); + Mockito.verify(executor, Mockito.times(2)).submit(ArgumentMatchers.any(Callable.class)); + Mockito.verify(monitor, Mockito.times(2)).monitor(ArgumentMatchers.any()); + + } + + @Test + public void testStart_RepeatAndStopScheduling() + { + ExecutorService executor = Mockito.mock(ExecutorService.class); + + Mockito.doAnswer(new Answer<Future<?>>() + { + private int scheduleCount = 0; + + @SuppressWarnings("unchecked") + @Override + public Future<?> answer(InvocationOnMock invocation) throws Exception + { + final Object originalArgument = (invocation.getArguments())[3]; + CronTask task = ((CronTask) originalArgument); + Mockito.doAnswer(new Answer<Future<?>>() + { + @Override + public Future<Boolean> answer(InvocationOnMock invocation) throws Exception + { + final Object originalArgument = (invocation.getArguments())[0]; + ((Callable<Boolean>) originalArgument).call(); + return CompletableFuture.completedFuture(Boolean.FALSE); + } + }).when(executor).submit(ArgumentMatchers.any(Callable.class)); + + while (scheduleCount < 2) { + scheduleCount++; + task.run(123L); + } + return createDummyFuture(); + } + }).when(cronScheduler).scheduleAtFixedRate(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(), + ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class)); + + Monitor monitor = Mockito.mock(Monitor.class); + Mockito.when(monitor.getScheduledFuture()).thenReturn(createDummyFuture()); + + MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class); + Mockito.when(config.getEmitterPeriod()).thenReturn(new org.joda.time.Duration(1000L)); + + final MonitorScheduler scheduler = new MonitorScheduler( + config, + cronScheduler, + Mockito.mock(ServiceEmitter.class), + ImmutableList.of(monitor), + executor); + scheduler.start(); + + Mockito.verify(monitor, Mockito.times(1)).start(); + Mockito.verify(cronScheduler, Mockito.times(1)).scheduleAtFixedRate(ArgumentMatchers.anyLong(), + ArgumentMatchers.anyLong(), + ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class)); + Mockito.verify(executor, Mockito.times(1)).submit(ArgumentMatchers.any(Callable.class)); + Mockito.verify(monitor, Mockito.times(1)).monitor(ArgumentMatchers.any()); + Mockito.verify(monitor, Mockito.times(1)).stop(); + + } + + @Test + public void testStart_UnexpectedExceptionWhileMonitoring() + { + ExecutorService executor = Mockito.mock(ExecutorService.class); + Monitor monitor = Mockito.mock(Monitor.class); + Mockito.when(monitor.getScheduledFuture()).thenReturn(createDummyFuture()); + Mockito.when(monitor.monitor(ArgumentMatchers.any(ServiceEmitter.class))).thenThrow(new RuntimeException()); + + MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class); + Mockito.when(config.getEmitterPeriod()).thenReturn(new org.joda.time.Duration(1000L)); + + + Mockito.doAnswer(new Answer<Future<?>>() + { + @SuppressWarnings("unchecked") + @Override + public Future<?> answer(InvocationOnMock invocation) throws Exception + { + final Object originalArgument = (invocation.getArguments())[3]; + CronTask task = ((CronTask) originalArgument); + + Mockito.doAnswer(new Answer<Future<?>>() + { + @Override + public Future<Boolean> answer(InvocationOnMock invocation) throws Exception + { + final Object originalArgument = (invocation.getArguments())[0]; + ((Callable<Boolean>) originalArgument).call(); + return CompletableFuture.completedFuture(Boolean.TRUE); + } + }).when(executor).submit(ArgumentMatchers.any(Callable.class)); + + task.run(123L); + return createDummyFuture(); + } + }).when(cronScheduler).scheduleAtFixedRate(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(), + ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class)); + + + final MonitorScheduler scheduler = new MonitorScheduler( + config, + cronScheduler, + Mockito.mock(ServiceEmitter.class), + ImmutableList.of(monitor), + executor); + scheduler.start(); + + Mockito.verify(monitor, Mockito.times(1)).start(); + Mockito.verify(cronScheduler, Mockito.times(1)).scheduleAtFixedRate(ArgumentMatchers.anyLong(), + ArgumentMatchers.anyLong(), + ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class)); + Mockito.verify(executor, Mockito.times(1)).submit(ArgumentMatchers.any(Callable.class)); + Mockito.verify(monitor, Mockito.times(1)).monitor(ArgumentMatchers.any()); + } + + + @Test + public void testStart_UnexpectedExceptionWhileScheduling() + { + ExecutorService executor = Mockito.mock(ExecutorService.class); + Monitor monitor = Mockito.mock(Monitor.class); + Mockito.when(monitor.getScheduledFuture()).thenReturn(createDummyFuture()); + MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class); + Mockito.when(config.getEmitterPeriod()).thenReturn(new org.joda.time.Duration(1000L)); + + + Mockito.doAnswer(new Answer<Future<?>>() + { + @SuppressWarnings("unchecked") + @Override + public Future<?> answer(InvocationOnMock invocation) throws Exception + { + final Object originalArgument = (invocation.getArguments())[3]; + CronTask task = ((CronTask) originalArgument); + + Mockito.when(executor.submit(ArgumentMatchers.any(Callable.class))).thenThrow(new RuntimeException()); + task.run(123L); + return createDummyFuture(); + } + }).when(cronScheduler).scheduleAtFixedRate(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(), + ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class)); + + + final MonitorScheduler scheduler = new MonitorScheduler( + config, + cronScheduler, + Mockito.mock(ServiceEmitter.class), + ImmutableList.of(monitor), + executor); + scheduler.start(); + + Mockito.verify(monitor, Mockito.times(1)).start(); + Mockito.verify(cronScheduler, Mockito.times(1)).scheduleAtFixedRate(ArgumentMatchers.anyLong(), + ArgumentMatchers.anyLong(), + ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class)); + Mockito.verify(executor, Mockito.times(1)).submit(ArgumentMatchers.any(Callable.class)); + } + + + private Future createDummyFuture() + { + Future<?> future = new Future() + { + + @Override + public boolean cancel(boolean mayInterruptIfRunning) + { + return false; + } + + @Override + public boolean isCancelled() + { + return false; + } + + @Override + public boolean isDone() + { + return false; + } + + @Override + public Object get() + { + return null; + } + + @Override + public Object get(long timeout, TimeUnit unit) + { + return null; + } + + }; + + return future; + } + + private static class NoopMonitor implements Monitor { @Override @@ -82,5 +366,19 @@ public class MonitorSchedulerTest { return true; } + + @Override + public Future<?> getScheduledFuture() + { + // TODO Auto-generated method stub + return null; + } + + @Override + public void setScheduledFuture(Future<?> scheduledFuture) + { + // TODO Auto-generated method stub + + } } } diff --git a/licenses.yaml b/licenses.yaml index 170b3d1..e684073 100644 --- a/licenses.yaml +++ b/licenses.yaml @@ -345,6 +345,16 @@ libraries: --- +name: CronScheduler +license_category: binary +module: java-core +license_name: Apache License version 2.0 +version: 0.1 +libraries: + - io.timeandspace: cron-scheduler + +--- + name: LMAX Disruptor license_category: binary module: java-core diff --git a/pom.xml b/pom.xml index 383de94..ffe9460 100644 --- a/pom.xml +++ b/pom.xml @@ -1255,6 +1255,11 @@ <version>1.19.0</version> <scope>test</scope> </dependency> + <dependency> + <groupId>io.timeandspace</groupId> + <artifactId>cron-scheduler</artifactId> + <version>0.1</version> + </dependency> </dependencies> </dependencyManagement> diff --git a/server/pom.xml b/server/pom.xml index 7fa3a48..caddf48 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -311,6 +311,10 @@ <groupId>io.github.resilience4j</groupId> <artifactId>resilience4j-bulkhead</artifactId> </dependency> + <dependency> + <groupId>io.timeandspace</groupId> + <artifactId>cron-scheduler</artifactId> + </dependency> <!-- Tests --> <dependency> diff --git a/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java b/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java index 71b9945..d7598ae 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java +++ b/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java @@ -27,6 +27,7 @@ import com.google.inject.Key; import com.google.inject.Module; import com.google.inject.Provides; import com.google.inject.name.Names; +import io.timeandspace.cronscheduler.CronScheduler; import org.apache.druid.guice.DruidBinders; import org.apache.druid.guice.JsonConfigProvider; import org.apache.druid.guice.LazySingleton; @@ -42,6 +43,7 @@ import org.apache.druid.java.util.metrics.MonitorScheduler; import org.apache.druid.java.util.metrics.SysMonitor; import org.apache.druid.query.ExecutorServiceMonitor; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -106,9 +108,10 @@ public class MetricsModule implements Module return new MonitorScheduler( config.get(), - Execs.scheduledSingleThreaded("MonitorScheduler-%s"), + CronScheduler.newBuilder(Duration.ofSeconds(1L)).setThreadName("MonitorSchedulerThread").build(), emitter, - monitors + monitors, + Execs.multiThreaded(64, "MonitorThread-%d") ); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org