Repository: eagle Updated Branches: refs/heads/master 0cda01b58 -> 673a81e44
[EAGLE-872] Transform counter metric to rate metric - Add transform bolt using counterToRateFunction in HadoopMetricMonitorApp's storm topology. https://issues.apache.org/jira/browse/EAGLE-872 Author: r7raul1984 <[email protected]> Closes #783 from r7raul1984/EAGLE-872. Project: http://git-wip-us.apache.org/repos/asf/eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/673a81e4 Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/673a81e4 Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/673a81e4 Branch: refs/heads/master Commit: 673a81e440914ca292b14b26ec2eb8b757a01e8f Parents: 0cda01b Author: r7raul1984 <[email protected]> Authored: Tue Mar 21 02:52:14 2017 +0000 Committer: r7raul1984 <[email protected]> Committed: Tue Mar 21 02:52:14 2017 +0000 ---------------------------------------------------------------------- .../environment/builder/CountMetricFilter.java | 26 ++ .../builder/CounterToRateFunction.java | 220 +++++++++++++ .../java/org/apache/eagle/app/utils/Clock.java | 24 ++ .../apache/eagle/app/utils/ClockWithOffset.java | 35 +++ .../org/apache/eagle/app/utils/ManualClock.java | 54 ++++ .../builder/CounterToRateFunctionTest.java | 306 +++++++++++++++++++ .../app/messaging/MetricStreamPersistTest.java | 144 +++++++++ .../mr/running/MRRunningJobApplicationTest.java | 2 - .../jpm/mr/running/MRRunningJobManagerTest.java | 3 - .../parser/MRJobEntityCreationHandlerTest.java | 2 - .../eagle/metric/HadoopMetricMonitorApp.java | 56 ++-- 11 files changed, 840 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/eagle/blob/673a81e4/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CountMetricFilter.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CountMetricFilter.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CountMetricFilter.java new file mode 100644 index 0000000..bed047b --- /dev/null +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CountMetricFilter.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.app.environment.builder; + +import java.io.Serializable; +import java.util.function.Function; + +@FunctionalInterface +interface CountMetricFilter extends Function<String, Boolean>, Serializable { +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/673a81e4/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CounterToRateFunction.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CounterToRateFunction.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CounterToRateFunction.java new file mode 100644 index 0000000..51dad41 --- /dev/null +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CounterToRateFunction.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.app.environment.builder; + +import com.google.common.base.Preconditions; +import org.apache.eagle.app.utils.Clock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public class CounterToRateFunction implements TransformFunction { + private static final Logger LOG = LoggerFactory.getLogger(CounterToRateFunction.class); + private final Map<String, CounterValue> cache; + private MetricDescriptor metricDescriptor; + private Collector collector; + + public CounterToRateFunction(MetricDescriptor metricDescriptor, long heartbeat, TimeUnit unit, final Clock clock) { + final long heartbeatMillis = TimeUnit.MILLISECONDS.convert(heartbeat, unit); + this.cache = new LinkedHashMap<String, CounterValue>(16, 0.75f, true) { + protected boolean removeEldestEntry(Map.Entry<String, CounterValue> eldest) { + final long now = clock.now(); + final long lastMod = eldest.getValue().getTimestamp(); + final boolean expired = (now - lastMod) > heartbeatMillis; + if (expired) { + LOG.debug("heartbeat interval exceeded, expiring {}", eldest.getKey()); + } + return expired; + } + }; + this.metricDescriptor = metricDescriptor; + } + + @Override + public String getName() { + return "CounterToRate"; + } + + @Override + public void open(Collector collector) { + this.collector = collector; + } + + @Override + public void transform(Map event) { + Metric metric = toMetric(event); + LOG.debug("received {} metrics", metric); + if (new DefaultCountMetricFilter().apply(metric.getMetricName())) { + final String metricName = metric.getMetricName(); + final CounterValue prev = cache.get(metricName); + if (prev != null) { + final double rate = prev.computeRate(metric); + event.put(metricDescriptor.getValueField(), rate); + collector.collect(event.toString(), event); + } else { + CounterValue current = new CounterValue(metric); + cache.put(metricName, current); + } + } else { + collector.collect(event.toString(), event); + } + + } + + @Override + public void close() { + cache.clear(); + } + + private Metric toMetric(Map event) { + + String metricName = ""; + for (String dimensionField : metricDescriptor.getDimensionFields()) { + metricName += event.get(dimensionField) + "-"; + } + metricName += metricDescriptor.getMetricNameSelector().getMetricName(event); + + long timestamp = metricDescriptor.getTimestampSelector().getTimestamp(event); + + return new Metric(metricName, timestamp, getCurrentValue(event)); + } + + private double getCurrentValue(Map event) { + double[] values; + if (event.containsKey(metricDescriptor.getValueField())) { + values = new double[] {(double) event.get(metricDescriptor.getValueField())}; + } else { + LOG.warn("Event has no value field '{}': {}, use 0 by default", metricDescriptor.getValueField(), event); + values = new double[] {0}; + } + return values[0]; + } + + protected static class CounterValue { + private long timestamp; + private double value; + + public CounterValue(long timestamp, double value) { + this.timestamp = timestamp; + this.value = value; + } + + public CounterValue(Metric m) { + this(m.getTimestamp(), m.getNumberValue().doubleValue()); + } + + public long getTimestamp() { + return timestamp; + } + + public double computeRate(Metric m) { + final long currentTimestamp = m.getTimestamp(); + final double currentValue = m.getNumberValue().doubleValue(); + + final long durationMillis = currentTimestamp - timestamp; + final double delta = currentValue - value; + + timestamp = currentTimestamp; + value = currentValue; + + return computeRate(durationMillis, delta); + } + + private double computeRate(long durationMillis, double delta) { + final double millisPerSecond = 1000.0; + final double duration = durationMillis / millisPerSecond; + return (duration <= 0.0 || delta <= 0.0) ? 0.0 : delta / duration; + } + + @Override + public String toString() { + return "CounterValue{" + "timestamp=" + timestamp + ", value=" + value + '}'; + } + } + + + protected final class Metric { + private final String metricName; + private final long timestamp; + private final Object value; + + public Metric(String metricName, long timestamp, Object value) { + this.metricName = Preconditions.checkNotNull(metricName, "metricName"); + this.timestamp = timestamp; + this.value = Preconditions.checkNotNull(value, "value"); + } + + public String getMetricName() { + return metricName; + } + + public long getTimestamp() { + return timestamp; + } + + public Object getValue() { + return value; + } + + public Number getNumberValue() { + return (Number) value; + } + + public boolean hasNumberValue() { + return (value instanceof Number); + } + + public boolean isCounter() { + return metricName.endsWith("count"); + } + + @Override + public boolean equals(Object obj) { + if (obj == null || !(obj instanceof Metric)) { + return false; + } + Metric m = (Metric) obj; + return metricName.equals(m.getMetricName()) + && timestamp == m.getTimestamp() + && value.equals(m.getValue()); + } + + @Override + public int hashCode() { + int result = metricName.hashCode(); + result = 31 * result + (int) (timestamp ^ (timestamp >>> 32)); + result = 31 * result + value.hashCode(); + return result; + } + + @Override + public String toString() { + return "Metric{metricName=" + metricName + ", timestamp=" + timestamp + ", value=" + value + '}'; + } + } + + private class DefaultCountMetricFilter implements CountMetricFilter { + @Override + public Boolean apply(String metricName) { + return metricName.endsWith("count"); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/673a81e4/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/Clock.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/Clock.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/Clock.java new file mode 100644 index 0000000..f3deff9 --- /dev/null +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/Clock.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.app.utils; + +public interface Clock { + Clock WALL = System::currentTimeMillis; + + long now(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/673a81e4/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/ClockWithOffset.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/ClockWithOffset.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/ClockWithOffset.java new file mode 100644 index 0000000..62b060f --- /dev/null +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/ClockWithOffset.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.app.utils; + +public enum ClockWithOffset implements Clock { + INSTANCE; + + private volatile long offset = 0L; + + public void setOffset(long offset) { + if (offset >= 0) { + this.offset = offset; + } + } + + @Override + public long now() { + return offset + System.currentTimeMillis(); + } +} http://git-wip-us.apache.org/repos/asf/eagle/blob/673a81e4/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/ManualClock.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/ManualClock.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/ManualClock.java new file mode 100644 index 0000000..cd8fc80 --- /dev/null +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/ManualClock.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.app.utils; + +import java.util.concurrent.atomic.AtomicLong; + +public class ManualClock implements Clock { + + private final AtomicLong time; + + public ManualClock(long init) { + time = new AtomicLong(init); + } + + public void set(long t) { + time.set(t); + } + + public long now() { + return time.get(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ManualClock clock = (ManualClock) o; + return now() == clock.now(); + } + + @Override + public int hashCode() { + return Long.valueOf(now()).hashCode(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/673a81e4/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/environment/builder/CounterToRateFunctionTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/environment/builder/CounterToRateFunctionTest.java b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/environment/builder/CounterToRateFunctionTest.java new file mode 100644 index 0000000..6c00880 --- /dev/null +++ b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/environment/builder/CounterToRateFunctionTest.java @@ -0,0 +1,306 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.app.environment.builder; + +import backtype.storm.task.IOutputCollector; +import backtype.storm.task.OutputCollector; +import backtype.storm.tuple.Tuple; +import org.apache.eagle.app.utils.ClockWithOffset; +import org.apache.eagle.app.utils.ManualClock; +import org.junit.Assert; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.*; +import java.util.concurrent.TimeUnit; + +public class CounterToRateFunctionTest { + + + private Map mkCountTypeEvent(long ts, double value) { + Map event = new HashMap(); + event.put("timestamp", ts); + event.put("metric", "hadoop.hbase.regionserver.server.totalrequestcount"); + event.put("component", "hbasemaster"); + event.put("site", "sandbox"); + event.put("value", value); + event.put("host", "xxx-xxx.int.xxx.com"); + return event; + } + + private Map mkCountTypeEventWithMetricName(long ts, double value, String metric) { + Map event = new HashMap(); + event.put("timestamp", ts); + event.put("metric", metric); + event.put("component", "hbasemaster"); + event.put("site", "sandbox"); + event.put("value", value); + event.put("host", "xxx-xxx.int.xxx.com"); + return event; + } + + private Map mkOtherTypeEvent(long ts, double value) { + Map event = new HashMap(); + event.put("timestamp", ts); + event.put("metric", "hadoop.memory.heapmemoryusage.used"); + event.put("component", "hbasemaster"); + event.put("site", "sandbox"); + event.put("value", value); + event.put("host", "xxx-xxx.int.xxx.com"); + return event; + } + + + @Test + public void testToMetricAndCounterValue() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { + long baseTime = System.currentTimeMillis() + 100000L; + + MetricDescriptor metricDefinition = MetricDescriptor + .metricGroupByField("group") + .siteAs("siteId") + .namedByField("metric") + .eventTimeByField("timestamp") + .dimensionFields("host", "component", "site") + .granularity(Calendar.MINUTE) + .valueField("value"); + CounterToRateFunction counterToRateFunction = new CounterToRateFunction(metricDefinition, 3, TimeUnit.MINUTES, ClockWithOffset.INSTANCE); + + Map event = mkCountTypeEvent((baseTime + 0), 374042741.0); + Method toMetricMethod = counterToRateFunction.getClass().getDeclaredMethod("toMetric", Map.class); + toMetricMethod.setAccessible(true); + CounterToRateFunction.Metric metric = (CounterToRateFunction.Metric) toMetricMethod.invoke(counterToRateFunction, event); + Assert.assertEquals("xxx-xxx.int.xxx.com-hbasemaster-sandbox-hadoop.hbase.regionserver.server.totalrequestcount", metric.getMetricName()); + Assert.assertEquals(374042741.0, Double.valueOf(metric.getValue().toString()), 0.00001); + Assert.assertEquals(374042741.0, metric.getNumberValue().doubleValue(), 0.00001); + Assert.assertTrue(metric.isCounter()); + + + event = mkOtherTypeEvent((baseTime + 0), 100); + metric = (CounterToRateFunction.Metric) toMetricMethod.invoke(counterToRateFunction, event); + Assert.assertEquals("xxx-xxx.int.xxx.com-hbasemaster-sandbox-hadoop.memory.heapmemoryusage.used", metric.getMetricName()); + Assert.assertEquals(100, Double.valueOf(metric.getValue().toString()), 0.00001); + Assert.assertEquals(100, metric.getNumberValue().doubleValue(), 0.00001); + Assert.assertTrue(!metric.isCounter()); + + + } + + @Test + public void testTransformToRate() throws NoSuchFieldException, IllegalAccessException { + List<Map> result = new ArrayList<>(); + OutputCollector collector = new OutputCollector(new IOutputCollector() { + @Override + public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) { + result.add((Map) tuple.get(1)); + return null; + } + + @Override + public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) { + + } + + @Override + public void ack(Tuple input) { + + } + + @Override + public void fail(Tuple input) { + + } + + @Override + public void reportError(Throwable error) { + + } + }); + MetricDescriptor metricDefinition = MetricDescriptor + .metricGroupByField("group") + .siteAs("siteId") + .namedByField("metric") + .eventTimeByField("timestamp") + .dimensionFields("host", "component", "site") + .granularity(Calendar.MINUTE) + .valueField("value"); + CounterToRateFunction counterToRateFunction = new CounterToRateFunction(metricDefinition, 3, TimeUnit.MINUTES, ClockWithOffset.INSTANCE); + counterToRateFunction.open(new StormOutputCollector(collector)); + long baseTime = System.currentTimeMillis() + 100000L; + //put first count sample + Map event = mkCountTypeEvent((baseTime + 0), 374042741.0); + counterToRateFunction.transform(event); + Assert.assertTrue(result.isEmpty()); + + Field cacheField = counterToRateFunction.getClass().getDeclaredField("cache"); + cacheField.setAccessible(true); + Map<String, CounterToRateFunction.CounterValue> cache = (Map<String, CounterToRateFunction.CounterValue>) cacheField.get(counterToRateFunction); + Assert.assertTrue(cache.size() == 1); + + CounterToRateFunction.CounterValue counterValue = cache.get("xxx-xxx.int.xxx.com-hbasemaster-sandbox-hadoop.hbase.regionserver.server.totalrequestcount"); + Assert.assertEquals((long) event.get("timestamp"), counterValue.getTimestamp()); + Field valueField = counterValue.getClass().getDeclaredField("value"); + valueField.setAccessible(true); + double value = (double) valueField.get(counterValue); + Assert.assertEquals(374042741.0, value, 0.00001); + result.clear(); + //put not count sample + event = mkOtherTypeEvent((baseTime + 0), 100); + counterToRateFunction.transform(event); + Assert.assertTrue(result.size() == 1); + Assert.assertTrue(cache.size() == 1); + Assert.assertEquals(baseTime + 0, counterValue.getTimestamp()); + Assert.assertEquals(374042741.0, value, 0.00001); + + Assert.assertEquals("hadoop.memory.heapmemoryusage.used", event.get("metric")); + Assert.assertEquals(100, (Double) event.get("value"), 0.00001); + result.clear(); + + //delta of 10 in 5 seconds + event = mkCountTypeEvent((baseTime + 5000), 374042751.0); + counterToRateFunction.transform(event); + + Assert.assertTrue(result.size() == 1); + Map transedEvent = result.get(0); + Assert.assertEquals(baseTime + 5000, transedEvent.get("timestamp")); + Assert.assertEquals(2.0, (double) transedEvent.get("value"), 0.00001); + Assert.assertEquals(baseTime + 5000, counterValue.getTimestamp()); + value = (double) valueField.get(counterValue); + Assert.assertEquals(374042751.0, value, 0.00001); + result.clear(); + + //delta of 15 in 5 seconds + event = mkCountTypeEvent((baseTime + 10000), 374042766.0); + counterToRateFunction.transform(event); + + Assert.assertTrue(result.size() == 1); + transedEvent = result.get(0); + Assert.assertEquals(baseTime + 10000, transedEvent.get("timestamp")); + Assert.assertEquals(3.0, (double) transedEvent.get("value"), 0.00001); + Assert.assertEquals(baseTime + 10000, counterValue.getTimestamp()); + value = (double) valueField.get(counterValue); + Assert.assertEquals(374042766.0, value, 0.00001); + result.clear(); + + + //No change from previous sample + event = mkCountTypeEvent((baseTime + 15000), 374042766.0); + counterToRateFunction.transform(event); + + Assert.assertTrue(result.size() == 1); + transedEvent = result.get(0); + Assert.assertEquals(baseTime + 15000, transedEvent.get("timestamp")); + Assert.assertEquals(0.0, (double) transedEvent.get("value"), 0.00001); + Assert.assertEquals(baseTime + 15000, counterValue.getTimestamp()); + value = (double) valueField.get(counterValue); + Assert.assertEquals(374042766.0, value, 0.00001); + result.clear(); + + //Decrease from previous sample + event = mkCountTypeEvent((baseTime + 20000), 1.0); + counterToRateFunction.transform(event); + + Assert.assertTrue(result.size() == 1); + transedEvent = result.get(0); + Assert.assertEquals(baseTime + 20000, transedEvent.get("timestamp")); + Assert.assertEquals(0.0, (double) transedEvent.get("value"), 0.00001); + Assert.assertEquals(baseTime + 20000, counterValue.getTimestamp()); + value = (double) valueField.get(counterValue); + Assert.assertEquals(1.0, value, 0.00001); + result.clear(); + } + + @Test + public void testTransformToRateWithExpiration() throws NoSuchFieldException, IllegalAccessException { + + MetricDescriptor metricDefinition = MetricDescriptor + .metricGroupByField("group") + .siteAs("siteId") + .namedByField("metric") + .eventTimeByField("timestamp") + .dimensionFields("host", "component", "site") + .granularity(Calendar.MINUTE) + .valueField("value"); + List<Map> result = new ArrayList<>(); + OutputCollector collector = new OutputCollector(new IOutputCollector() { + @Override + public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) { + result.add((Map) tuple.get(1)); + return null; + } + + @Override + public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) { + + } + + @Override + public void ack(Tuple input) { + + } + + @Override + public void fail(Tuple input) { + + } + + @Override + public void reportError(Throwable error) { + + } + }); + ManualClock manualClock = new ManualClock(0); + manualClock.set(30000L); + CounterToRateFunction counterToRateFunction = new CounterToRateFunction(metricDefinition, 60, TimeUnit.SECONDS, manualClock); + counterToRateFunction.open(new StormOutputCollector(collector)); + Map event = mkCountTypeEventWithMetricName(manualClock.now(), 110, "hadoop.hbase.regionserver.server.totalrequestcount"); + counterToRateFunction.transform(event); + Field cacheField = counterToRateFunction.getClass().getDeclaredField("cache"); + cacheField.setAccessible(true); + Map<String, CounterToRateFunction.CounterValue> cache = (Map<String, CounterToRateFunction.CounterValue>) cacheField.get(counterToRateFunction); + Assert.assertTrue(cache.size() == 1); + + manualClock.set(50000L); + event = mkCountTypeEventWithMetricName(manualClock.now(), 130, "hadoop.hbase.regionserver.server.readerrequestcount"); + counterToRateFunction.transform(event); + + cache = (Map<String, CounterToRateFunction.CounterValue>) cacheField.get(counterToRateFunction); + Assert.assertEquals(2, cache.size()); + Assert.assertEquals("CounterValue{timestamp=30000, value=110.0}", cache.get("xxx-xxx.int.xxx.com-hbasemaster-sandbox-hadoop.hbase.regionserver.server.totalrequestcount").toString()); + Assert.assertEquals("CounterValue{timestamp=50000, value=130.0}", cache.get("xxx-xxx.int.xxx.com-hbasemaster-sandbox-hadoop.hbase.regionserver.server.readerrequestcount").toString()); + + manualClock.set(100000L); + event = mkCountTypeEventWithMetricName(manualClock.now(), 120, "hadoop.hbase.regionserver.server.totalrequestcount"); + counterToRateFunction.transform(event); + + cache = (Map<String, CounterToRateFunction.CounterValue>) cacheField.get(counterToRateFunction); + Assert.assertEquals(2, cache.size()); + Assert.assertEquals("CounterValue{timestamp=100000, value=120.0}", cache.get("xxx-xxx.int.xxx.com-hbasemaster-sandbox-hadoop.hbase.regionserver.server.totalrequestcount").toString()); + Assert.assertEquals("CounterValue{timestamp=50000, value=130.0}", cache.get("xxx-xxx.int.xxx.com-hbasemaster-sandbox-hadoop.hbase.regionserver.server.readerrequestcount").toString()); + + manualClock.set(160001L); + event = mkCountTypeEventWithMetricName(manualClock.now(), 10, "hadoop.hbase.regionserver.server.writerrequestcount"); + counterToRateFunction.transform(event); + Assert.assertEquals(2, cache.size()); + Assert.assertEquals("CounterValue{timestamp=160001, value=10.0}", cache.get("xxx-xxx.int.xxx.com-hbasemaster-sandbox-hadoop.hbase.regionserver.server.writerrequestcount").toString()); + Assert.assertEquals("CounterValue{timestamp=50000, value=130.0}", cache.get("xxx-xxx.int.xxx.com-hbasemaster-sandbox-hadoop.hbase.regionserver.server.readerrequestcount").toString()); + + + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/673a81e4/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/messaging/MetricStreamPersistTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/messaging/MetricStreamPersistTest.java b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/messaging/MetricStreamPersistTest.java new file mode 100644 index 0000000..1561a41 --- /dev/null +++ b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/messaging/MetricStreamPersistTest.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.app.messaging; + +import backtype.storm.Testing; +import backtype.storm.task.IOutputCollector; +import backtype.storm.task.OutputCollector; +import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Values; +import com.typesafe.config.Config; +import org.apache.eagle.app.environment.builder.MetricDescriptor; +import org.apache.eagle.app.utils.StreamConvertHelper; +import org.apache.eagle.log.entity.GenericMetricEntity; +import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity; +import org.apache.eagle.service.client.impl.EagleServiceClientImpl; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.lang.reflect.Field; +import java.util.*; + +import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@RunWith(PowerMockRunner.class) +@PrepareForTest( {MetricStreamPersist.class}) +public class MetricStreamPersistTest { + + @Test + public void testStructuredMetricMapper() throws Exception { + MetricDescriptor metricDefinition = MetricDescriptor + .metricGroupByField("group") + .siteAs("siteId") + .namedByField("metric") + .eventTimeByField("timestamp") + .dimensionFields("host", "component", "site") + .granularity(Calendar.MINUTE) + .valueField("value"); + Config config = mock(Config.class); + MetricStreamPersist metricStreamPersist = new MetricStreamPersist(metricDefinition, config); + Field mapperField = metricStreamPersist.getClass().getDeclaredField("mapper"); + mapperField.setAccessible(true); + + Map event = new HashMap(); + event.put("timestamp", 1482106479564L); + event.put("metric", "hadoop.memory.heapmemoryusage.used"); + event.put("component", "hbasemaster"); + event.put("site", "sandbox"); + event.put("value", 14460904.0); + event.put("host", "xxx-xxx.int.xxx.com"); + + Tuple tuple = Testing.testTuple(new Values("metric", event)); + MetricStreamPersist.MetricMapper mapper = (MetricStreamPersist.MetricMapper) mapperField.get(metricStreamPersist); + + GenericMetricEntity metricEntity = mapper.map(StreamConvertHelper.tupleToEvent(tuple).f1()); + + Assert.assertEquals("prefix:hadoop.memory.heapmemoryusage.used, timestamp:1482106440000, humanReadableDate:2016-12-19 00:14:00,000, tags: component=hbasemaster,site=sandbox,host=xxx-xxx.int.xxx.com,, encodedRowkey:null", metricEntity.toString()); + } + + @Test + public void testMetricStreamPersist() throws Exception { + List<String> result = new ArrayList<>(); + OutputCollector collector = new OutputCollector(new IOutputCollector() { + @Override + public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) { + result.add(String.valueOf(tuple.get(0))); + return null; + } + + @Override + public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) { + + } + + @Override + public void ack(Tuple input) { + + } + + @Override + public void fail(Tuple input) { + + } + + @Override + public void reportError(Throwable error) { + + } + }); + + MetricDescriptor metricDefinition = MetricDescriptor + .metricGroupByField("group") + .siteAs("siteId") + .namedByField("metric") + .eventTimeByField("timestamp") + .dimensionFields("host", "component", "site") + .granularity(Calendar.MINUTE) + .valueField("value"); + Config config = mock(Config.class); + when(config.hasPath("service.batchSize")).thenReturn(false); + + GenericServiceAPIResponseEntity<String> response = mock(GenericServiceAPIResponseEntity.class); + when(response.isSuccess()).thenReturn(true); + + EagleServiceClientImpl client = mock(EagleServiceClientImpl.class); + PowerMockito.whenNew(EagleServiceClientImpl.class).withArguments(config).thenReturn(client); + when(client.create(anyObject())).thenReturn(response); + + MetricStreamPersist metricStreamPersist = new MetricStreamPersist(metricDefinition, config); + metricStreamPersist.prepare(null, null, collector); + Map event = new HashMap(); + event.put("timestamp", 1482106479564L); + event.put("metric", "hadoop.memory.heapmemoryusage.used"); + event.put("component", "hbasemaster"); + event.put("site", "sandbox"); + event.put("value", 14460904.0); + event.put("host", "xxx-xxx.int.xxx.com"); + + Tuple tuple = Testing.testTuple(new Values("metric", event)); + metricStreamPersist.execute(tuple); + Assert.assertTrue(result.size() == 1); + Assert.assertEquals("hadoop.memory.heapmemoryusage.used", result.get(0)); + } +} + http://git-wip-us.apache.org/repos/asf/eagle/blob/673a81e4/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java index a43c956..faf8c8e 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java @@ -65,12 +65,10 @@ public class MRRunningJobApplicationTest { private static final String TUPLE_2 = "[application_1479206441898_35341, AppInfo{id='application_1479206441898_35341', user='yyy', name='insert overwrite table inter...a.xxx(Stage-3)', queue='yyy', state='RUNNING', finalStatus='UNDEFINED', progress=59.545456, trackingUI='ApplicationMaster', trackingUrl='http://host.domain.com:8088/proxy/application_1479206441898_35341/', diagnostics='', clusterId='1479206441898', applicationType='MAPREDUCE', startedTime=1479341511477, finishedTime=0, elapsedTime=77619, amContainerLogs='http://host.domain.com:8042/node/containerlogs/container_e11_1479206441898_35341_01_000005/yyy', amHostHttpAddress='host.domain.com:8042', allocatedMB=27648, allocatedVCores=6, runningContainers=6}, null]"; private static final ObjectMapper OBJ_MAPPER = new ObjectMapper(); private static Config config = ConfigFactory.load(); - private static String siteId; @BeforeClass public static void setupMapper() throws Exception { OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true); - siteId = config.getString("siteId"); } http://git-wip-us.apache.org/repos/asf/eagle/blob/673a81e4/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobManagerTest.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobManagerTest.java b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobManagerTest.java index 4c52e10..eb48ff6 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobManagerTest.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobManagerTest.java @@ -24,7 +24,6 @@ import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.test.TestingServer; import org.apache.curator.utils.CloseableUtils; import org.apache.eagle.jpm.mr.running.recover.MRRunningJobManager; -import org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity; import org.apache.eagle.jpm.util.jobrecover.RunningJobManager; import org.apache.zookeeper.CreateMode; import org.junit.*; @@ -37,8 +36,6 @@ import org.slf4j.LoggerFactory; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; http://git-wip-us.apache.org/repos/asf/eagle/blob/673a81e4/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandlerTest.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandlerTest.java b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandlerTest.java index 0ec1b8e..7ffd30f 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandlerTest.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandlerTest.java @@ -21,8 +21,6 @@ import org.apache.eagle.jpm.mr.running.parser.metrics.JobExecutionMetricsCreatio import org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity; import org.apache.eagle.jpm.util.Constants; import org.apache.eagle.jpm.util.Utils; -import org.apache.eagle.jpm.util.resourcefetch.connection.InputStreamUtils; -import org.apache.eagle.jpm.util.resourcefetch.connection.URLConnectionUtils; import org.apache.eagle.jpm.util.resourcefetch.model.*; import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; import com.fasterxml.jackson.core.JsonParser; http://git-wip-us.apache.org/repos/asf/eagle/blob/673a81e4/eagle-metric/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java ---------------------------------------------------------------------- diff --git a/eagle-metric/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java b/eagle-metric/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java index 05c874d..9c0ac10 100644 --- a/eagle-metric/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java +++ b/eagle-metric/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java @@ -19,41 +19,47 @@ package org.apache.eagle.metric; import backtype.storm.generated.StormTopology; import com.typesafe.config.Config; import org.apache.eagle.app.StormApplication; +import org.apache.eagle.app.environment.builder.CounterToRateFunction; import org.apache.eagle.app.environment.builder.MetricDescriptor; import org.apache.eagle.app.environment.builder.MetricDescriptor.MetricGroupSelector; import org.apache.eagle.app.environment.impl.StormEnvironment; import org.apache.eagle.app.utils.AppConfigUtils; +import org.apache.eagle.app.utils.ClockWithOffset; import java.util.Calendar; +import java.util.concurrent.TimeUnit; public class HadoopMetricMonitorApp extends StormApplication { @Override public StormTopology execute(Config config, StormEnvironment environment) { + + MetricDescriptor hadoopMetricDescriptor = MetricDescriptor.metricGroupAs((MetricGroupSelector) event -> { + if (event.containsKey("component")) { + return String.format("hadoop.%s", ((String) event.get("component")).toLowerCase()); + } else { + return "hadoop.metrics"; + } + }) + .siteAs(AppConfigUtils.getSiteId(config)) + .namedByField("metric") + .eventTimeByField("timestamp") + .dimensionFields("host", "component", "site") + .granularity(Calendar.SECOND) + .valueField("value"); + + MetricDescriptor systemMetricDescriptor = MetricDescriptor.metricGroupByField("group") + .siteAs(AppConfigUtils.getSiteId(config)) + .namedByField("metric") + .eventTimeByField("timestamp") + .dimensionFields("host", "group", "site", "device") + .granularity(Calendar.SECOND) + .valueField("value"); return environment.newApp(config) - .fromStream("HADOOP_JMX_METRIC_STREAM") - .saveAsMetric( - MetricDescriptor.metricGroupAs((MetricGroupSelector) event -> { - if (event.containsKey("component")) { - return String.format("hadoop.%s", ((String) event.get("component")).toLowerCase()); - } else { - return "hadoop.metrics"; - } - }) - .siteAs(AppConfigUtils.getSiteId(config)) - .namedByField("metric") - .eventTimeByField("timestamp") - .dimensionFields("host", "component", "site") - .granularity(Calendar.SECOND) - .valueField("value")) - .fromStream("SYSTEM_METRIC_STREAM") - .saveAsMetric(MetricDescriptor.metricGroupByField("group") - .siteAs(AppConfigUtils.getSiteId(config)) - .namedByField("metric") - .eventTimeByField("timestamp") - .dimensionFields("host", "group", "site", "device") - .granularity(Calendar.SECOND) - .valueField("value") - ) - .toTopology(); + .fromStream("HADOOP_JMX_METRIC_STREAM").transformBy(new CounterToRateFunction(hadoopMetricDescriptor,3, TimeUnit.SECONDS, ClockWithOffset.INSTANCE)) + .saveAsMetric(hadoopMetricDescriptor) + .fromStream("SYSTEM_METRIC_STREAM").transformBy(new CounterToRateFunction(hadoopMetricDescriptor,3, TimeUnit.SECONDS, ClockWithOffset.INSTANCE)) + .saveAsMetric(systemMetricDescriptor + ) + .toTopology(); } } \ No newline at end of file
