HDDS-240. Implement metrics for EventQueue. Contributed by Elek, Marton.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2403231c Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2403231c Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2403231c Branch: refs/heads/HDDS-48 Commit: 2403231c8c3685ba08cd6bdf715d281cae611e45 Parents: 3c0a66a Author: Anu Engineer <aengin...@apache.org> Authored: Mon Jul 9 13:04:44 2018 -0700 Committer: Anu Engineer <aengin...@apache.org> Committed: Mon Jul 9 13:04:44 2018 -0700 ---------------------------------------------------------------------- .../hadoop/hdds/server/events/EventQueue.java | 108 +++++++++++-------- .../server/events/SingleThreadExecutor.java | 35 ++++-- .../hdds/server/events/TestEventQueue.java | 35 +----- 3 files changed, 91 insertions(+), 87 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/2403231c/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java index 44d85f5..7e29223 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java @@ -18,7 +18,11 @@ package org.apache.hadoop.hdds.server.events; import com.google.common.annotations.VisibleForTesting; + +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Time; + +import com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,6 +46,8 @@ public class EventQueue implements EventPublisher, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(EventQueue.class); + private static final String EXECUTOR_NAME_SEPARATOR = "For"; + private final Map<Event, Map<EventExecutor, List<EventHandler>>> executors = new HashMap<>(); @@ -51,38 +57,74 @@ public class EventQueue implements EventPublisher, AutoCloseable { public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void addHandler( EVENT_TYPE event, EventHandler<PAYLOAD> handler) { - - this.addHandler(event, new SingleThreadExecutor<>( - event.getName()), handler); + this.addHandler(event, handler, generateHandlerName(handler)); } + /** + * Add new handler to the event queue. + * <p> + * By default a separated single thread executor will be dedicated to + * deliver the events to the registered event handler. + * + * @param event Triggering event. + * @param handler Handler of event (will be called from a separated + * thread) + * @param handlerName The name of handler (should be unique together with + * the event name) + * @param <PAYLOAD> The type of the event payload. + * @param <EVENT_TYPE> The type of the event identifier. + */ public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void addHandler( - EVENT_TYPE event, - EventExecutor<PAYLOAD> executor, - EventHandler<PAYLOAD> handler) { + EVENT_TYPE event, EventHandler<PAYLOAD> handler, String handlerName) { + validateEvent(event); + Preconditions.checkNotNull(handler, "Handler name should not be null."); + String executorName = + StringUtils.camelize(event.getName()) + EXECUTOR_NAME_SEPARATOR + + handlerName; + this.addHandler(event, new SingleThreadExecutor<>(executorName), handler); + } - executors.putIfAbsent(event, new HashMap<>()); - executors.get(event).putIfAbsent(executor, new ArrayList<>()); + private <EVENT_TYPE extends Event<?>> void validateEvent(EVENT_TYPE event) { + Preconditions + .checkArgument(!event.getName().contains(EXECUTOR_NAME_SEPARATOR), + "Event name should not contain " + EXECUTOR_NAME_SEPARATOR + + " string."); - executors.get(event) - .get(executor) - .add(handler); + } + + private <PAYLOAD> String generateHandlerName(EventHandler<PAYLOAD> handler) { + if (!"".equals(handler.getClass().getSimpleName())) { + return handler.getClass().getSimpleName(); + } else { + return handler.getClass().getName(); + } } /** - * Creates one executor with multiple event handlers. + * Add event handler with custom executor. + * + * @param event Triggering event. + * @param executor The executor imlementation to deliver events from a + * separated threads. Please keep in your mind that + * registering metrics is the responsibility of the + * caller. + * @param handler Handler of event (will be called from a separated + * thread) + * @param <PAYLOAD> The type of the event payload. + * @param <EVENT_TYPE> The type of the event identifier. */ - public void addHandlerGroup(String name, HandlerForEvent<?>... - eventsAndHandlers) { - SingleThreadExecutor sharedExecutor = - new SingleThreadExecutor(name); - for (HandlerForEvent handlerForEvent : eventsAndHandlers) { - addHandler(handlerForEvent.event, sharedExecutor, - handlerForEvent.handler); - } + public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void addHandler( + EVENT_TYPE event, EventExecutor<PAYLOAD> executor, + EventHandler<PAYLOAD> handler) { + validateEvent(event); + executors.putIfAbsent(event, new HashMap<>()); + executors.get(event).putIfAbsent(executor, new ArrayList<>()); + executors.get(event).get(executor).add(handler); } + + /** * Route an event with payload to the right listener(s). * @@ -183,31 +225,5 @@ public class EventQueue implements EventPublisher, AutoCloseable { }); } - /** - * Event identifier together with the handler. - * - * @param <PAYLOAD> - */ - public static class HandlerForEvent<PAYLOAD> { - - private final Event<PAYLOAD> event; - - private final EventHandler<PAYLOAD> handler; - - public HandlerForEvent( - Event<PAYLOAD> event, - EventHandler<PAYLOAD> handler) { - this.event = event; - this.handler = handler; - } - - public Event<PAYLOAD> getEvent() { - return event; - } - - public EventHandler<PAYLOAD> getHandler() { - return handler; - } - } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/2403231c/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java index a64e3d7..3253f2d 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java @@ -23,13 +23,18 @@ import org.slf4j.LoggerFactory; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; /** * Simple EventExecutor to call all the event handler one-by-one. * * @param <T> */ +@Metrics(context = "EventQueue") public class SingleThreadExecutor<T> implements EventExecutor<T> { public static final String THREAD_NAME_PREFIX = "EventQueue"; @@ -41,14 +46,24 @@ public class SingleThreadExecutor<T> implements EventExecutor<T> { private final ThreadPoolExecutor executor; - private final AtomicLong queuedCount = new AtomicLong(0); + @Metric + private MutableCounterLong queued; - private final AtomicLong successfulCount = new AtomicLong(0); + @Metric + private MutableCounterLong done; - private final AtomicLong failedCount = new AtomicLong(0); + @Metric + private MutableCounterLong failed; + /** + * Create SingleThreadExecutor. + * + * @param name Unique name used in monitoring and metrics. + */ public SingleThreadExecutor(String name) { this.name = name; + DefaultMetricsSystem.instance() + .register("EventQueue" + name, "Event Executor metrics ", this); LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(); executor = @@ -64,31 +79,31 @@ public class SingleThreadExecutor<T> implements EventExecutor<T> { @Override public void onMessage(EventHandler<T> handler, T message, EventPublisher publisher) { - queuedCount.incrementAndGet(); + queued.incr(); executor.execute(() -> { try { handler.onMessage(message, publisher); - successfulCount.incrementAndGet(); + done.incr(); } catch (Exception ex) { LOG.error("Error on execution message {}", message, ex); - failedCount.incrementAndGet(); + failed.incr(); } }); } @Override public long failedEvents() { - return failedCount.get(); + return failed.value(); } @Override public long successfulEvents() { - return successfulCount.get(); + return done.value(); } @Override public long queuedEvents() { - return queuedCount.get(); + return queued.value(); } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/2403231c/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java index 3944409..2bdf705 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java @@ -25,6 +25,8 @@ import org.junit.Test; import java.util.Set; import java.util.stream.Collectors; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; + /** * Testing the basic functionality of the event queue. */ @@ -44,11 +46,13 @@ public class TestEventQueue { @Before public void startEventQueue() { + DefaultMetricsSystem.initialize(getClass().getSimpleName()); queue = new EventQueue(); } @After public void stopEventQueue() { + DefaultMetricsSystem.shutdown(); queue.close(); } @@ -79,35 +83,4 @@ public class TestEventQueue { } - @Test - public void handlerGroup() { - final long[] result = new long[2]; - queue.addHandlerGroup( - "group", - new EventQueue.HandlerForEvent<>(EVENT3, (payload, publisher) -> - result[0] = payload), - new EventQueue.HandlerForEvent<>(EVENT4, (payload, publisher) -> - result[1] = payload) - ); - - queue.fireEvent(EVENT3, 23L); - queue.fireEvent(EVENT4, 42L); - - queue.processAll(1000); - - Assert.assertEquals(23, result[0]); - Assert.assertEquals(42, result[1]); - - Set<String> eventQueueThreadNames = - Thread.getAllStackTraces().keySet() - .stream() - .filter(t -> t.getName().startsWith(SingleThreadExecutor - .THREAD_NAME_PREFIX)) - .map(Thread::getName) - .collect(Collectors.toSet()); - System.out.println(eventQueueThreadNames); - Assert.assertEquals(1, eventQueueThreadNames.size()); - - } - } \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org