HDDS-224. Create metrics for Event Watcher. Contributed by Elek, Marton.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e12d93bf Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e12d93bf Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e12d93bf Branch: refs/heads/YARN-7402 Commit: e12d93bfc1a0efd007bc84758e60b5149c3aa663 Parents: 895845e Author: Anu Engineer <aengin...@apache.org> Authored: Mon Jul 9 12:02:20 2018 -0700 Committer: Anu Engineer <aengin...@apache.org> Committed: Mon Jul 9 12:10:12 2018 -0700 ---------------------------------------------------------------------- hadoop-hdds/framework/pom.xml | 5 + .../hadoop/hdds/server/events/EventWatcher.java | 43 +++++++- .../hdds/server/events/EventWatcherMetrics.java | 79 ++++++++++++++ .../hdds/server/events/TestEventWatcher.java | 107 ++++++++++++++++--- 4 files changed, 220 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/e12d93bf/hadoop-hdds/framework/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdds/framework/pom.xml b/hadoop-hdds/framework/pom.xml index a497133..6e1927d 100644 --- a/hadoop-hdds/framework/pom.xml +++ b/hadoop-hdds/framework/pom.xml @@ -39,6 +39,11 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd"> <artifactId>hadoop-hdds-common</artifactId> <scope>provided</scope> </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/hadoop/blob/e12d93bf/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java index 19fddde..8c5605a 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java @@ -26,12 +26,17 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.function.Predicate; import java.util.stream.Collectors; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.ozone.lease.Lease; import org.apache.hadoop.ozone.lease.LeaseAlreadyExistException; import org.apache.hadoop.ozone.lease.LeaseExpiredException; import org.apache.hadoop.ozone.lease.LeaseManager; import org.apache.hadoop.ozone.lease.LeaseNotFoundException; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.commons.collections.map.HashedMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,18 +63,39 @@ public abstract class EventWatcher<TIMEOUT_PAYLOAD extends private final LeaseManager<UUID> leaseManager; + private final EventWatcherMetrics metrics; + + private final String name; + protected final Map<UUID, TIMEOUT_PAYLOAD> trackedEventsByUUID = new ConcurrentHashMap<>(); protected final Set<TIMEOUT_PAYLOAD> trackedEvents = new HashSet<>(); - public EventWatcher(Event<TIMEOUT_PAYLOAD> startEvent, + private final Map<UUID, Long> startTrackingTimes = new HashedMap(); + + public EventWatcher(String name, Event<TIMEOUT_PAYLOAD> startEvent, Event<COMPLETION_PAYLOAD> completionEvent, LeaseManager<UUID> leaseManager) { this.startEvent = startEvent; this.completionEvent = completionEvent; this.leaseManager = leaseManager; + this.metrics = new EventWatcherMetrics(); + Preconditions.checkNotNull(name); + if (name.equals("")) { + name = getClass().getSimpleName(); + } + if (name.equals("")) { + //for anonymous inner classes + name = getClass().getName(); + } + this.name = name; + } + public EventWatcher(Event<TIMEOUT_PAYLOAD> startEvent, + Event<COMPLETION_PAYLOAD> completionEvent, + LeaseManager<UUID> leaseManager) { + this("", startEvent, completionEvent, leaseManager); } public void start(EventQueue queue) { @@ -87,11 +113,16 @@ public abstract class EventWatcher<TIMEOUT_PAYLOAD extends } }); + MetricsSystem ms = DefaultMetricsSystem.instance(); + ms.register(name, "EventWatcher metrics", metrics); } private synchronized void handleStartEvent(TIMEOUT_PAYLOAD payload, EventPublisher publisher) { + metrics.incrementTrackedEvents(); UUID identifier = payload.getUUID(); + startTrackingTimes.put(identifier, System.currentTimeMillis()); + trackedEventsByUUID.put(identifier, payload); trackedEvents.add(payload); try { @@ -112,16 +143,21 @@ public abstract class EventWatcher<TIMEOUT_PAYLOAD extends private synchronized void handleCompletion(UUID uuid, EventPublisher publisher) throws LeaseNotFoundException { + metrics.incrementCompletedEvents(); leaseManager.release(uuid); TIMEOUT_PAYLOAD payload = trackedEventsByUUID.remove(uuid); trackedEvents.remove(payload); + long originalTime = startTrackingTimes.remove(uuid); + metrics.updateFinishingTime(System.currentTimeMillis() - originalTime); onFinished(publisher, payload); } private synchronized void handleTimeout(EventPublisher publisher, UUID identifier) { + metrics.incrementTimedOutEvents(); TIMEOUT_PAYLOAD payload = trackedEventsByUUID.remove(identifier); trackedEvents.remove(payload); + startTrackingTimes.remove(payload.getUUID()); onTimeout(publisher, payload); } @@ -154,4 +190,9 @@ public abstract class EventWatcher<TIMEOUT_PAYLOAD extends return trackedEventsByUUID.values().stream().filter(predicate) .collect(Collectors.toList()); } + + @VisibleForTesting + protected EventWatcherMetrics getMetrics() { + return metrics; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e12d93bf/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcherMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcherMetrics.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcherMetrics.java new file mode 100644 index 0000000..1db81a9 --- /dev/null +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcherMetrics.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.server.events; + +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.metrics2.lib.MutableRate; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Metrics for any event watcher. + */ +public class EventWatcherMetrics { + + @Metric() + private MutableCounterLong trackedEvents; + + @Metric() + private MutableCounterLong timedOutEvents; + + @Metric() + private MutableCounterLong completedEvents; + + @Metric() + private MutableRate completionTime; + + public void incrementTrackedEvents() { + trackedEvents.incr(); + } + + public void incrementTimedOutEvents() { + timedOutEvents.incr(); + } + + public void incrementCompletedEvents() { + completedEvents.incr(); + } + + @VisibleForTesting + public void updateFinishingTime(long duration) { + completionTime.add(duration); + } + + @VisibleForTesting + public MutableCounterLong getTrackedEvents() { + return trackedEvents; + } + + @VisibleForTesting + public MutableCounterLong getTimedOutEvents() { + return timedOutEvents; + } + + @VisibleForTesting + public MutableCounterLong getCompletedEvents() { + return completedEvents; + } + + @VisibleForTesting + public MutableRate getCompletionTime() { + return completionTime; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e12d93bf/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventWatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventWatcher.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventWatcher.java index 1731350..38e1554 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventWatcher.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventWatcher.java @@ -21,8 +21,13 @@ import java.util.List; import java.util.Objects; import java.util.UUID; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.ozone.lease.LeaseManager; +import org.apache.hadoop.test.MetricsAsserts; +import static org.apache.hadoop.test.MetricsAsserts.assertCounter; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -46,6 +51,7 @@ public class TestEventWatcher { @Before public void startLeaseManager() { + DefaultMetricsSystem.instance(); leaseManager = new LeaseManager<>(2000l); leaseManager.start(); } @@ -53,12 +59,12 @@ public class TestEventWatcher { @After public void stopLeaseManager() { leaseManager.shutdown(); + DefaultMetricsSystem.shutdown(); } @Test public void testEventHandling() throws InterruptedException { - EventQueue queue = new EventQueue(); EventWatcher<UnderreplicatedEvent, ReplicationCompletedEvent> @@ -139,26 +145,101 @@ public class TestEventWatcher { Assert.assertEquals(0, c1todo.size()); Assert.assertFalse(replicationWatcher.contains(event1)); + } + + @Test + public void testMetrics() throws InterruptedException { + + DefaultMetricsSystem.initialize("test"); + + EventQueue queue = new EventQueue(); + + EventWatcher<UnderreplicatedEvent, ReplicationCompletedEvent> + replicationWatcher = createEventWatcher(); + + EventHandlerStub<UnderreplicatedEvent> underReplicatedEvents = + new EventHandlerStub<>(); + + queue.addHandler(UNDER_REPLICATED, underReplicatedEvents); + + replicationWatcher.start(queue); + + //send 3 event to track 3 in-progress activity + UnderreplicatedEvent event1 = + new UnderreplicatedEvent(UUID.randomUUID(), "C1"); + + UnderreplicatedEvent event2 = + new UnderreplicatedEvent(UUID.randomUUID(), "C2"); + + UnderreplicatedEvent event3 = + new UnderreplicatedEvent(UUID.randomUUID(), "C1"); + + queue.fireEvent(WATCH_UNDER_REPLICATED, event1); + + queue.fireEvent(WATCH_UNDER_REPLICATED, event2); + + queue.fireEvent(WATCH_UNDER_REPLICATED, event3); + + //1st event is completed, don't need to track any more + ReplicationCompletedEvent event1Completed = + new ReplicationCompletedEvent(event1.UUID, "C1", "D1"); + + queue.fireEvent(REPLICATION_COMPLETED, event1Completed); + + + Thread.sleep(2200l); + + //until now: 3 in-progress activities are tracked with three + // UnderreplicatedEvents. The first one is completed, the remaining two + // are timed out (as the timeout -- defined in the leasmanager -- is 2000ms. + EventWatcherMetrics metrics = replicationWatcher.getMetrics(); + + //3 events are received + Assert.assertEquals(3, metrics.getTrackedEvents().value()); + + //one is finished. doesn't need to be resent + Assert.assertEquals(1, metrics.getCompletedEvents().value()); + + //Other two are timed out and resent + Assert.assertEquals(2, metrics.getTimedOutEvents().value()); + + DefaultMetricsSystem.shutdown(); } private EventWatcher<UnderreplicatedEvent, ReplicationCompletedEvent> createEventWatcher() { - return new EventWatcher<UnderreplicatedEvent, ReplicationCompletedEvent>( - WATCH_UNDER_REPLICATED, REPLICATION_COMPLETED, leaseManager) { + return new CommandWatcherExample(WATCH_UNDER_REPLICATED, + REPLICATION_COMPLETED, leaseManager); + } - @Override - void onTimeout(EventPublisher publisher, UnderreplicatedEvent payload) { - publisher.fireEvent(UNDER_REPLICATED, payload); - } + private class CommandWatcherExample + extends EventWatcher<UnderreplicatedEvent, ReplicationCompletedEvent> { - @Override - void onFinished(EventPublisher publisher, UnderreplicatedEvent payload) { - //Good job. We did it. - } - }; + public CommandWatcherExample(Event<UnderreplicatedEvent> startEvent, + Event<ReplicationCompletedEvent> completionEvent, + LeaseManager<UUID> leaseManager) { + super("TestCommandWatcher", startEvent, completionEvent, leaseManager); + } + + @Override + void onTimeout(EventPublisher publisher, UnderreplicatedEvent payload) { + publisher.fireEvent(UNDER_REPLICATED, payload); + } + + @Override + void onFinished(EventPublisher publisher, UnderreplicatedEvent payload) { + //Good job. We did it. + } + + @Override + public EventWatcherMetrics getMetrics() { + return super.getMetrics(); + } } + ; + private static class ReplicationCompletedEvent implements IdentifiableEventPayload { @@ -217,4 +298,4 @@ public class TestEventWatcher { } } -} \ No newline at end of file +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org